【Go底层】singleflight包原理
1、背景
在处理同一时刻接口的并发请求时,常见的有这几种情况:一个请求正在执行,相同的其它请求等待顺序执行,使用互斥锁就能完成、一个请求正在执行,相同的其它请求都丢弃、一个请求正在执行,相同的其它请求等待拿取相同的结果。使用singleflight包就能达到一个请求正在执行,相同的其它请求过来等待第一个请求执行完,然后共享第一个请求的结果,在处理并发场景时非常好用。
2、下载
go get -u golang.org/x/sync/singleflight
3、原理解释
singleflight底层结构:
type Group struct {
mu sync.Mutex //保护map对象m的并发安全
m map[string]*call //key-请求的唯一标识,val-请求唯一标识对应要执行的函数
}
call底层结构:
type call struct {
wg sync.WaitGroup //用来阻塞相同标识对应的请求中的第一个请求之外的请求
val interface{} //第一个请求的执行结果
err error //第一个请求返回的错误
dups int //第一个请求之外的其它请求数
chans []chan<- Result //请求结果写入通道
}
关键函数:
//
// Do
// @Description: 一个唯一标识对应的请求在执行过程中,相同唯一标识对应的请求会被阻塞,等待第一个请求执行完并共享结果
// @receiver g
// @param key 请求唯一标识
// @param fn 请求要执行的函数
// @return v 请求要执行函数返回的结果
// @return err 请求要执行的函数返回的错误
// @return shared 是否有多个请求共享结果
//
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
g.mu.Lock() //保护map对象m的并发安全
if g.m == nil {
g.m = make(map[string]*call) //初始化m对象
}
if c, ok := g.m[key]; ok { //map中key存在说明这个key对应的请求正在执行中,这次请求不是第一个请求
c.dups++ //等待共享结果数+1
g.mu.Unlock()
c.wg.Wait() //阻塞等待第一个请求执行完
if e, ok := c.err.(*panicError); ok {
panic(e)
} else if c.err == errGoexit {
runtime.Goexit()
}
return c.val, c.err, true //返回第一个请求的执行结果
}
//第一个请求的处理逻辑
c := new(call)
c.wg.Add(1) //计数+1
g.m[key] = c //唯一标识关联对应的函数对象
g.mu.Unlock()
g.doCall(c, key, fn) //执行请求对应的函数
return c.val, c.err, c.dups > 0 //返回执行结果
}
上面Do函数中g.doCall函数也需要大概理解一下,就是会将key对应函数的执行结果写的call对象c里,然后清空wg计数,相同key对应的其它请求就会跳出c.wg.Wait()阻塞,直接从call对象c中读取第一个请求的执行结果和错误信息并返回,源码如下:
func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
normalReturn := false
recovered := false
defer func() { //fn函数执行完之后执行
if !normalReturn && !recovered {
c.err = errGoexit
}
g.mu.Lock()
defer g.mu.Unlock()
c.wg.Done() //释放计数
if g.m[key] == c {
delete(g.m, key) //删除此次请求的唯一标识相关信息
}
if e, ok := c.err.(*panicError); ok {
if len(c.chans) > 0 {
go panic(e)
select {}
} else {
panic(e)
}
} else if c.err == errGoexit {
// Already in the process of goexit, no need to call again
} else {
for _, ch := range c.chans { //执行结果写入通道
ch <- Result{c.val, c.err, c.dups > 0}
}
}
}()
func() {
defer func() {
if !normalReturn {
if r := recover(); r != nil {
c.err = newPanicError(r)
}
}
}()
c.val, c.err = fn() //执行fn函数
normalReturn = true
}()
if !normalReturn {
recovered = true
}
}
singleflight中还提供了与Do函数功能相同的函数DoChan函数,唯一区别就是将请求对应的函数执行结果放到通道中进行返回,这两函数一个用于同步场景,一个用于异步场景。还有一个Forget函数:
func (g *Group) Forget(key string) {
g.mu.Lock()
delete(g.m, key) //删除map中的key,相同key对应请求进来会重新执行,不等待第一个key对应请求的执行结果
g.mu.Unlock()
}
4、代码示例
示例如下:
func main() {
var singleFlight singleflight.Group //初始化一个单次执行对象
var count uint64 //用于测试是否被修改
//为了并发执行,这里测试唯一标识都为xxx
go func() {
val1, _, shared1 := singleFlight.Do("xxx", func() (interface{}, error) {
logger.Info("first count +1")
atomic.AddUint64(&count, 1) //第一次执行,将count+1
time.Sleep(5 * time.Second) //增加第一次执行时间
return count, nil
})
//打印第一次执行结果
logger.Info("first count info", zap.Any("val1", val1), zap.Bool("shared1", shared1), zap.Uint64("count", count))
}()
time.Sleep(2 * time.Second) //为了防止下面的Do函数先执行
val2, _, shared2 := singleFlight.Do("xxx", func() (interface{}, error) {
logger.Info("second count +1")
atomic.AddUint64(&count, 1) //第2次执行count+1
return count, nil
})
//打印第二次执行结果
logger.Info("second count info", zap.Any("val2", val2), zap.Bool("shared2", shared2), zap.Uint64("count", count))
}
控制台输出:
$ go run ./singlefight_demo/main.go
[2025-01-09 17:08:11.169] | INFO | Goroutine:6 | [singlefight_demo/main.go:19] | first count +1
[2025-01-09 17:08:16.261] | INFO | Goroutine:6 | [singlefight_demo/main.go:28] | first count info | {"val1": 1, "shared1": true, "count": 1}
[2025-01-09 17:08:16.261] | INFO | Goroutine:1 | [singlefight_demo/main.go:41] | second count info | {"val2": 1, "shared2": true, "count": 1}
5、总结
看singleflight原码之后,要实现一个请求正在执行,相同的其它请求进来时直接报错的功能也很简单,将singleflight中等待第一个请求的逻辑改为直接返回错误就可以。
原文地址:https://blog.csdn.net/qq_45795794/article/details/145038514
免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!