更简的并发代码,更强的并发控制
hxl Go语言中文网 昨天有没感觉 Go 的 sync 包不够用?有没遇到类型没有 sync/atomic 支持?我们一起看看 go-zero 的 syncx 包对标准库的一些增值补充。https://github.com/tal-tech/go-zero/tree/master/core/syncxname作用AtomicBoolbool类型 原子类AtomicDurationDuration有关 原子类AtomicFloat64float64类型 原子类Barrier栏栅【将加锁解锁包装】Cond条件变量DoneChan优雅通知关闭ImmutableResource创建后不会修改的资源Limit控制请求数LockedCalls确保方法的串行调用ManagedResource资源管理Once提供 once funcOnceGuard一次性使用的资源管理Poolpool,简单的池RefResource引用计数的资源ResourceManager资源管理器SharedCalls类似 singflight 的功能SpinLock自旋锁:自旋+CASTimeoutLimitLimit + timeout 控制atomic因为没有 泛型 支持,所以才会出现多种类型的原子类支持。以下采用 float64 作为例子:func (f *AtomicFloat64) Add(val float64) float64 {for {old := f.Load()nv := old + valif f.CompareAndSwap(old, nv) {return nv}}}func (f *AtomicFloat64) CompareAndSwap(old, val float64) bool {return atomic.CompareAndSwapUint64((*uint64)(f), math.Float64bits(old), math.Float64bits(val))}func (f *AtomicFloat64) Load() float64 {return math.Float64frombits(atomic.LoadUint64((*uint64)(f)))}func (f *AtomicFloat64) Set(val float64) {atomic.StoreUint64((*uint64)(f), math.Float64bits(val))}Add(val):如果 CAS 失败,不断for循环重试,获取 old val,并set old+val;CompareAndSwap(old, new):调用底层 atomic 的 CAS;Load():调用 atomic.LoadUint64 ,然后转换Set(val):调用 atomic.StoreUint64至于其他类型,开发者想自己扩展自己想要的类型,可以依照上述,基本上调用原始 atomic 操作,然后转换为需要的类型,比如:遇到 bool 可以借助 0, 1 来分辨对应的 false, true。Barrier这里 Barrier 只是将业务函数操作封装,作为闭包传入,内部将 lock 操作的加锁解锁自行解决了【防止开发者加锁了忘记解锁】func (b *Barrier) Guard(fn func()) {b.lock.Lock()defer b.lock.Unlock()// 自己的业务逻辑fn()}Cond/Limit/TimeoutLimit这个数据结构和 Limit 一起组成了 TimeoutLimit ,这里将这3个一起讲:func NewTimeoutLimit(n int) TimeoutLimit {return TimeoutLimit{limit: NewLimit(n),cond: NewCond(),}}func NewLimit(n int) Limit {return Limit{pool: make(chan lang.PlaceholderType, n),}}limit 这里是有缓冲的 channel;cond 是无缓冲的;所以这里结合名字来理解:因为 Limit 是限制某一种资源的使用,所以需要预先在资源池中放入预置数量的资源;Cond 类似阀门,需要两边都准备好,才能进行数据交换,所以使用无缓冲,同步控制。这里我们看看 stores/mongo 中关于 session 的管理,来理解 资源控制:func (cs *concurrentSession) takeSession(opts ...Option) (*mgo.Session, error) {// 选项参数注入...// 看 limit 中是否还能取出资源if err := cs.limit.Borrow(o.timeout); err != nil {return nil, err} else {return cs.Copy(), nil}}func (l TimeoutLimit) Borrow(timeout time.Duration) error {// 1. 如果还有 limit 中还有资源,取出一个,返回if l.TryBorrow() {return nil}// 2. 如果 limit 中资源已经用完了var ok boolfor {// 只有 cond 可以取出一个【无缓存,也只有 cond <- 此条才能通过】timeout, ok = l.cond.WaitWithTimeout(timeout)// 尝试取出一个【上面 cond 通过时,就有一个资源返回了】// 看 `Return()`if ok && l.TryBorrow() {return nil}// 超时控制if timeout <= 0 {return ErrTimeout}}}func (l TimeoutLimit) Return() error {// 返回去一个资源if err := l.limit.Return(); err != nil {return err}// 同步通知另一个需要资源的协程【实现了阀门,两方交换】l.cond.Signal()return nil}资源管理同文件夹中还有 ManagedResource &ResourceManager,从名字上类似,这里将两个组件放在一起讲解。先从结构上:type ManagedResource struct {// 资源resource interface{}lock sync.RWMutex// 生成资源的逻辑,由开发者自己控制generate func() interface{}// 对比资源equals func(a, b interface{}) bool}type ResourceManager struct {// 资源:这里看得出来是 I/O,resources map[string]io.ClosersharedCalls SharedCalls// 对资源map互斥访问lock sync.RWMutex}然后来看获取资源的方法签名:func (manager *ResourceManager) GetResource(key, create func() (io.Closer, error)) (io.Closer, error)// 获取一个资源(有就直接获取,没有生成一个)func (mr *ManagedResource) Take() interface{}// 判断这个资源是否不符合传入的判断要求,不符合则重置func (mr *ManagedResource) MarkBroken(resource interface{})ResourceManager 使用 SharedCalls 做防重复请求,并将资源缓存在内部的 sourMap;另外传入的 create func 和 IO 操作有关,常见用在网络资源的缓存;ManagedResource 缓存资源没有 map 而是单一的 interface ,说明只有一份,但是它提供了 Take() 和传入 generate()说明可以让开发者自行更新 resource;所以在用途上:ResourceManager:用在网络资源的管理。如:数据库连接管理;ManagedResource:用在一些变化资源,可以做资源前后对比,达到更新资源。如:token 管理和验证RefResource这个就和 GC 中引用计数类似:Use() -> ref++Clean() -> ref--; if ref == 0 -> ref cleanfunc (r *RefResource) Use() error {// 互斥访问r.lock.Lock()defer r.lock.Unlock()// 清除标记if r.cleaned {return ErrUseOfCleaned}// 引用 +1r.ref++return nil}SharedCalls一句话形容:使用SharedCalls可以使得同时多个请求只需要发起一次拿结果的调用,其他请求"坐享其成",这种设计有效减少了资源服务的并发压力,可以有效防止缓存击穿。这个组件被反复应用在其他组件中,上面说的 ResourceManager。类似当需要高频并发访问一个资源时,就可以使用 SharedCalls 缓存。// 当多个请求同时使用Do方法请求资源时func (g *sharedGroup) Do(key string, fn func() (interface{}, error)) (interface{}, error) {// 先申请加锁g.lock.Lock()// 根据key,获取对应的call结果,并用变量c保存if c, ok := g.calls[key]; ok {// 拿到call以后,释放锁,此处call可能还没有实际数据,只是一个空的内存占位g.lock.Unlock()// 调用wg.Wait,判断是否有其他goroutine正在申请资源,如果阻塞,说明有其他goroutine正在获取资源c.wg.Wait()// 当wg.Wait不再阻塞,表示资源获取已经结束,可以直接返回结果return c.val, c.err}// 没有拿到结果,则调用makeCall方法去获取资源,注意此处仍然是锁住的,可以保证只有一个goroutine可以调用makecallc := g.makeCall(key, fn)// 返回调用结果return c.val, c.err}总结不重复造轮子,一直是 go-zero 设计主旨之一;也同时将平时业务沉淀到组件中,这才是框架和组件的意义。关于 go-zero 更多的设计和实现文章,可以持续关注我们。欢迎大家去关注和使用。推荐阅读Go并发编程 — sync.Once 单实例模式的思考福利我为大家整理了一份从入门到进阶的Go学习资料礼包,包含学习建议:入门看什么,进阶看什么。关注公众号 「polarisxu」,回复 ebook 获取;还可以回复「进群」,和数万 Gopher 交流学习。阅读 698赞2在看2写下你的留言