大型网站设计,南海网官网,WordPress注册小工具,电子商务网站建设与策划在 go 的 sync 包中#xff0c;有一个 singleflight 包#xff0c;里面有一个 singleflight.go 文件#xff0c;代码加注释#xff0c;一共 200 行出头。内容包括以下几块儿#xff1a;
Group 结构体管理一组相关的函数调用工作,它包含一个互斥锁和一个 map,map 的 key 是…在 go 的 sync 包中有一个 singleflight 包里面有一个 singleflight.go 文件代码加注释一共 200 行出头。内容包括以下几块儿
Group 结构体管理一组相关的函数调用工作,它包含一个互斥锁和一个 map,map 的 key 是函数的名称,value 是对应的 call 结构体。call 结构体表示一个 inflight 或已完成的函数调用,包含等待组件 WaitGroup、调用结果 val 和 err、调用次数 dups 和通知通道 chans。Do 方法接收一个 key 和函数 fn,它会先查看 map 中是否已经有这个 key 的调用在 inflight,如果有则等待并返回已有结果,如果没有则新建一个 call 并执行函数调用。DoChan 类似 Do 但返回一个 channel 来接收结果。doCall 方法包含了具体处理调用的逻辑,它会在函数调用前后添加 defer 来 recover panic 和区分正常 return 与 runtime.Goexit。如果发生 panic,会将 panicwraps 成错误返回给等待的 channel,如果是 goexit 会直接退出。正常 return 时会将结果发送到所有通知 channel。Forget 方法可以忘记一个 key 的调用,下次 Do 时会重新执行函数。
这个包通过互斥锁和 map 实现了对相同 key 的函数调用去重,可以避免对已有调用的重复计算,同时通过 channel 机制可以通知调用者函数执行结果。在一些需要确保单次执行的场景中可以使用这个包中的方法。
通过 singleflight 可以很容易实现缓存和去重的效果避免重复计算接下来我们来模拟一下并发请求可能导致的缓存穿透场景,以及如何用 singleflight 包来解决这个问题
package mainimport (contextfmtgolang.org/x/sync/singleflightsync/atomictime)type Result string
// 模拟查询数据库
func find(ctx context.Context, query string) (Result, error) {return Result(fmt.Sprintf(result for %q, query)), nil
}func main() {var g singleflight.Groupconst n 200waited : int32(n)done : make(chan struct{})key : this is keyfor i : 0; i n; i {go func(j int) {v, _, shared : g.Do(key, func() (interface{}, error) {ret, err : find(context.Background(), key)return ret, err})if atomic.AddInt32(waited, -1) 0 {close(done)}fmt.Printf(index: %d, val: %v, shared: %v\n, j, v, shared)}(i)}select {case -done:case -time.After(time.Second):fmt.Println(Do hangs)}time.Sleep(time.Second * 4)
}在这段程序中如果重复使用查询结果shared 会返回 true穿透查询会返回 false
上面的设计中还有一个问题就是在 Do 阻塞时所有请求都会阻塞内存可能会出现大的问题。
此时Do 可以更换为DoChan两者实现上完全一样不同的是DoChan() 通过 channel 返回结果。因此可以使用 select 语句实现超时控制
ch : g.DoChan(key, func() (interface{}, error) {ret, err : find(context.Background(), key)return ret, err
})
// Create our timeout
timeout : time.After(500 * time.Millisecond)var ret singleflight.Result
select {
case -timeout: // Timeout elapsedfmt.Println(Timeout)return
case ret -ch: // Received result from channelfmt.Printf(index: %d, val: %v, shared: %v\n, j, ret.Val, ret.Shared)
}在超时时主动返回不阻塞。
此时又引入了另一个问题这样的每一次的请求并不是高可用的成功率是无法保证的。这时候可以增加一定的请求饱和度来保证业务的最终成功率此时一次请求还是多次请求对于下游服务而言并没有太大区别此时使用 singleflight 只是为了降低请求的数量级那么可以使用 Forget() 来提高下游请求的并发。
ch : g.DoChan(key, func() (interface{}, error) {go func() {time.Sleep(10 * time.Millisecond)fmt.Printf(Deleting key: %v\n, key)g.Forget(key)}()ret, err : find(context.Background(), key)return ret, err
})当然这种做法依然无法保证100%的成功如果单次的失败无法容忍在高并发的场景下需要使用更好的处理方案比如牺牲一部分实时性、完全使用缓存查询 异步更新等。