Go语言并发编程-同步和锁
同步和锁
概述
同步是并发编程的基本要素之一,我们通过channel可以完成多个goroutine间数据和信号的同步。
除了channel外,我们还可以使用go的官方同步包sync,sync/atomic 完成一些基础的同步功能。主要包含同步数据、锁、原子操作等。
一个同步失败的示例:
func SyncErr() { wg := sync.WaitGroup{} // 计数器 counter := 0 // 多个goroutine并发的累加计数器 gs := 100 wg.Add(gs) for i := 0; i < gs; i++ { go func() { defer wg.Done() // 累加 for k := 0; k < 100; k++ { counter++ // ++ 操作不是原子的 // counter = counter + 1 // 1. 获取当前的counter变量 // 2. +1 // 3. 赋值新值到counter } }() } // 统计计数结果 wg.Wait() fmt.Println("Counter:", counter) }
Lock解决方案:
func SyncLock() { n := 0 wg := sync.WaitGroup{} lk := sync.Mutex{} for i := 0; i < 1000; i++ { wg.Add(1) go func() { defer wg.Done() for i := 0; i < 100; i++ { lk.Lock() n++ lk.Unlock() } }() } wg.Wait() fmt.Println("n:", n) } // run n: 100000
互斥锁Mutex的使用
sync包提供了两种锁:
-
互斥锁,Mutex
-
读写互斥锁,RWMutex
互斥锁,同一时刻只能有一个goroutine申请锁定成功,不区分读、写操作。也称为:独占锁、排它锁。
提供了如下方法完成锁操作:
type Mutex // 锁定锁m, 若锁m已是锁定状态,调用的goroutine会被阻塞,直到可以锁定 func (m *Mutex) Lock() // 解锁锁m,若m不是锁定状态,会导致运行时错误 func (m *Mutex) Unlock() // 尝试是否可以加锁,返回是否成功 func (m *Mutex) TryLock() bool
注意:锁与goroutine没有关联,意味着允许一个goroutine加锁,在另一个goroutine中解锁。但是不是最典型的用法。
典型的锁用法:
var lck sync.Mutex func () { lck.Lock() // 互斥执行的代码 defer lck.Unlock() }
锁的流程:
示例:
func SyncMutex() { wg := sync.WaitGroup{} var lck sync.Mutex for i := 0; i < 4; i++ { wg.Add(1) go func(n int) { defer wg.Done() fmt.Println("before lock: ", n) lck.Lock() fmt.Println("locked: ", n) time.Sleep(1 * time.Second) lck.Unlock() fmt.Println("after lock: ", n) }(i) } wg.Wait() }
某次输出结果:
before lock: 3 locked: 3 before lock: 2 before lock: 1 before lock: 0 after lock: 3 locked: 2 after lock: 2 locked: 1 after lock: 1 locked: 0 after lock: 0
可以发现,before lock 都是先执行的,而Lock() 操作,必须要等到其他goroutineUnlock()才能成功。
注意,如果其他goroutine没有通过相同的锁(1没用锁,2用了其他锁)去操作资源,那么是不受锁限制的,例如:
func SyncLockAndNo() { n := 0 wg := sync.WaitGroup{} lk := sync.Mutex{} for i := 0; i < 1000; i++ { wg.Add(1) go func() { defer wg.Done() for i := 0; i < 100; i++ { lk.Lock() n++ lk.Unlock() } }() } wg.Add(1) go func() { defer wg.Done() for i := 0; i < 10000; i++ { n++ } }() // 其他锁 //var lk2 sync.Mutex //go func() { // defer wg.Done() // for i := 0; i < 10000; i++ { // lk2.Lock() // n++ // lk2.Unlock() // } //}() wg.Wait() fmt.Println("n:", n) } // 其中一次结果 n: 109876
我们在第一个counter的例子上,增加了一个goroutine同去累加计数器counter,但没有使用前面的Mutex(不使用或使其他锁)。可见,出现了资源争用的情况。因此要注意:如果要限制资源的并发争用,要全部的资源操作都使用同一个锁。
实操时,锁除了直接调用外,还经常性出现在结构体中,以某个字段的形式出现,用于包含struct字段不会被多gorutine同时修改,例如我们 cancelCtx:
type cancelCtx struct { Context mu sync.Mutex // protects following fields done atomic.Value // of chan struct{}, created lazily, closed by first cancel call children map[canceler]struct{} // set to nil by the first cancel call err error // set to non-nil by the first cancel call }
我们通常也会这么做,示例:
type Post struct { Subject string // 赞 Likes int // 操作锁定 mu sync.Mutex } func (p *Post) IncrLikes() *Post { p.mu.Lock() defer p.mu.Unlock() p.Likes++ return p } func (p *Post) DecrLikes() *Post { p.mu.Lock() defer p.mu.Unlock() p.Likes-- return p }
读写RWMutex的使用
读写互斥锁,将锁操作类型做了区分,分为读锁和写锁,由sync.RWMutex类型实现:
-
读锁,Read Lock,共享读,阻塞写
-
写锁,Lock,独占操作,阻塞读写
并发 | 读 | 写 |
---|---|---|
读 | 支持 | 不支持 |
写 | 不支持 | 不支持 |
之所以减小锁的粒度,因为实际操作中读操作的比例要远高于写操作的比例,增加了共享读操作锁后,可以更大程度的提升读的并发能力。
sync.RWMutex 提供了如下方法完成操作:
type RWMutex // 写锁定 func (rw *RWMutex) Lock() // 写解锁 func (rw *RWMutex) Unlock() // 读锁定 func (rw *RWMutex) RLock() // 读解锁 func (rw *RWMutex) RUnlock() // 尝试加写锁定 func (rw *RWMutex) TryLock() bool // 尝试加读锁定 func (rw *RWMutex) TryRLock() bool
写锁定,与互斥锁Mutex的语法和操作结果一致,都是保证互斥的独占操作。
读锁定,可以在已经存在读锁的情况下,加锁成功。
如图所示:
读锁示例:
func SyncRLock() { wg := sync.WaitGroup{} // 模拟多个goroutine var rwlck sync.RWMutex for i := 0; i < 10; i++ { wg.Add(1) go func() { defer wg.Done() // //rwlck.Lock() rwlck.RLock() // 输出一段内容 fmt.Println(time.Now()) time.Sleep(1 * time.Second) // //rwlck.Unlock() rwlck.RUnlock() }() } wg.Add(1) go func() { defer wg.Done() // rwlck.Lock() //rwlck.RLock() // 输出一段内容 fmt.Println(time.Now(), "Lock") time.Sleep(1 * time.Second) // rwlck.Unlock() //rwlck.RUnlock() }() wg.Wait() }
其中,使用读锁,输出操作会全部立即执行,然后集体sleep1s后全部结束。使用写锁,输出和Sleep会间隔1s依次执行。
实操示例:
type Article struct { Subject string // 赞 likes int // 操作锁定 mu sync.RWMutex } func (a Article) Likes() int { a.mu.RLock() defer a.mu.RUnlock() return a.likes } func (a *Article) IncrLikes() *Article { a.mu.Lock() defer a.mu.Unlock() a.likes++ return a }
同步Map sync.Map
Go中Map是非线程(goroutine)安全的。并发操作 Map 类型时,会导致 fatal error: concurrent map read and map write
错误:
func SyncMapErr() { m := map[string]int{} // 并发map写 go func() { for { m["key"] = 0 } }() // 并发map读 go func() { for { _ = m["key"] } }() // 阻塞 select {} }
之所以Go不支持Map的并发安全,是因为Go认为Map的典型使用场景不需要在多个Goroutine间并发安全操作Map。
并发安全操作Map的方案:
-
锁 + Map,自定义Map操作,增加锁的控制,可以选择 Mutex和RWMutex。
-
sync.Map,sync包提供的安全Map.
锁+Map示例,在结构体内嵌入sync.Mutex:
func SyncMapLock() { myMap := struct { sync.RWMutex Data map[string]int }{ Data: map[string]int{}, } // write myMap.Lock() myMap.Data["key"] = 0 myMap.Unlock() // read myMap.RLock() _ = myMap.Data["key"] myMap.RUnlock() }
sync.Map 的使用
type Map // 最常用的4个方法: // 存储 func (m *Map) Store(key, value any) // 遍历 map func (m *Map) Range(f func(key, value any) bool) // 删除某个key元素 func (m *Map) Delete(key any) // 返回key的值。存在key,返回value,true,不存在返回 nil, false func (m *Map) Load(key any) (value any, ok bool) // 若m[key]==old,执行删除。key不存在,返回false func (m *Map) CompareAndDelete(key, old any) (deleted bool) // 若m[key]==old,执行交换, m[key] = new func (m *Map) CompareAndSwap(key, old, new any) bool // 返回值后删除元素。loaded 表示是否load成功,key不存在,loaded为false func (m *Map) LoadAndDelete(key any) (value any, loaded bool) // 加载,若加载失败则存储。返回加载或存储的值和是否加载 func (m *Map) LoadOrStore(key, value any) (actual any, loaded bool) // 存储新值,返回之前的值。loaded表示key是否存在 func (m *Map) Swap(key, value any) (previous any, loaded bool)
sync.Map 不需要类型初始化,即可使用,可以理解为map[comparable]any。
使用示例,不会触发 fatal error: concurrent map read and map write
:
func SyncSyncMap() { var m sync.Map go func() { for { m.Store("key", 0) } }() go func() { for { _, _ = m.Load("key") } }() select {} }
使用示例:
func SyncSyncMapMethod() { wg := sync.WaitGroup{} var m sync.Map for i := 0; i < 10; i++ { wg.Add(1) go func(n int) { defer wg.Done() m.Store(n, fmt.Sprintf("value: %d", n)) }(i) } for i := 0; i < 10; i++ { wg.Add(1) go func(n int) { defer wg.Done() fmt.Println(m.Load(n)) }(i) } wg.Wait() m.Range(func(key, value any) bool { fmt.Println(key, value) return true }) // m.Delete(4) }
并发安全操作Map的方案的选择,统计的压测数据显示,相对而言:
-
锁 + Map,写快,读慢
-
sync.Map,读快,写慢,删快,适合读多写少的场景
原子操作 sync/atomic
原子操作即是进行过程中不能被中断的操作,针对某个值的原子操作在被进行的过程中,CPU绝不会再去进行其他的针对该值的操作。为了实现这样的严谨性,原子操作仅会由一个独立的CPU指令代表和完成。原子操作是无锁的,常常直接通过CPU指令直接实现。 事实上,其它同步技术的实现常常依赖于原子操作。
原子操作是CPU指令级别实现的,比如在Intel的CPU上主要是使用总线锁的方式,AMD的CPU架构机器上就是使用MESI一致性协议的方式来保证原子操作。
go中 sync/atomic 包提供了原子操作的支持,用于同步操作整型(和指针类型):
-
int32
-
int64
-
uint32
-
uint64
-
uintptr
-
unsafe.Pointer
针对于以上类型,提供了如下操作:
// Type 是以上的类型之一 // 比较相等后交换 CAS func CompareAndSwapType(addr *Type, old, new Type) (swapped bool) // 交换 func SwapType(addr *Type, new Type) (old Type) // 累加 func AddType(addr *Type, delta Type) (new Type) // 获取 func LoadType(addr *Type) (val Type) // 存储 func StoreType(addr *Type, val Type)
除了以上函数,还提供了对应的类型方法操作,以Int32为例:
type Int32 func (x *Int32) Add(delta int32) (new int32) func (x *Int32) CompareAndSwap(old, new int32) (swapped bool) func (x *Int32) Load() int32 func (x *Int32) Store(val int32) func (x *Int32) Swap(new int32) (old int32)
除了以上几个整型,bool类型也提供了类型上的原子操作:
type Bool func (x *Bool) CompareAndSwap(old, new bool) (swapped bool) func (x *Bool) Load() bool func (x *Bool) Store(val bool) func (x *Bool) Swap(new bool) (old bool)
示例:
func SyncAtomicAdd() { // 并发的过程,没有加锁,Lock //var counter int32 = 0 // type // atomic 原子的Int32, counter := 0 counter := atomic.Int32{} wg := sync.WaitGroup{} for i := 0; i < 1000; i++ { wg.Add(1) go func() { defer wg.Done() for i := 0; i < 100; i++ { //atomic.AddInt32(&counter, 1) // type // 原子累加操作 , counter ++ counter.Add(1) } }() } wg.Wait() //fmt.Println("counter:", atomic.LoadInt32(&counter)) // type fmt.Println("counter:", counter.Load()) }
以上示例不会出现不到10000的情况了。
除了预定义的整型的支持,还可以使用 atomic.Value 类型,完成其他类型的原子操作:
type Value func (v *Value) CompareAndSwap(old, new any) (swapped bool) func (v *Value) Load() (val any) func (v *Value) Store(val any) func (v *Value) Swap(new any) (old any)
使用方法:
func SyncAtomicValue() { var loadConfig = func() map[string]string { return map[string]string{ // some config "title": "马士兵Go并发编程", "varConf": fmt.Sprintf("%d", rand.Int63()), } } var config atomic.Value // 每N秒加载一次配置文件 go func() { for { config.Store(loadConfig()) fmt.Println("latest config was loaded", time.Now().Format("15:04:05.99999999")) time.Sleep(time.Second) } }() // 使用配置 // 不能在加载的过程中使用配置 for { go func() { c := config.Load() fmt.Println(c, time.Now().Format("15:04:05.99999999")) }() time.Sleep(400 * time.Millisecond) } select {} }
sync.Pool 并发安全池
池是一组可以单独保存和检索的可以复用的临时对象。存储在池中的任何项目可随时自动删除,无需通知。一个池可以安全地同时被多个goroutine使用。
典型特征:
-
sync.Pool 是并发安全的
-
池中的对象由Go负责删除,内存由Go自己回收
-
池中元素的数量由Go负责管理,用户无法干预
-
池中元素应该是临时的,不应该是持久的。例如长连接不适合放入 sync.Pool 中
池的目的是缓存已分配但未使用的项目以供以后重用,从而减轻垃圾收集器的压力。也就是说,它使构建高效、线程安全的自由元素变得容易。
池的一个适当用途是管理一组临时项,这些临时项在包的并发独立客户端之间默默共享,并可能被其重用。池提供了一种在许多客户机上分摊分配开销的方法。
一个很好地使用池的例子是fmt包,它维护了临时输出缓冲区的动态大小存储。
池由 sync.Pool类型实现,具体三个操作:
-
初始化Pool实例,需要提供池中缓存元素的New方法。
-
申请元素,func (p *Pool) Get() any
-
交回对象,func (p *Pool) Put(x any)
操作示例:
func SyncPool() { // 原子的计数器 var counter int32 = 0 // 定义元素的Newer,创建器 elementNewer := func() any { // 原子的计数器累加 atomic.AddInt32(&counter, 1) // 池中元素推荐(强烈)是指针类型 return new(bytes.Buffer) } // Pool的初始化 pool := sync.Pool{ New: elementNewer, } // 并发的申请和交回元素 workerNum := 1024 * 1024 wg := sync.WaitGroup{} wg.Add(workerNum) for i := 0; i < workerNum; i++ { go func() { defer wg.Done() // 申请元素,通常需要断言为特定类型 buffer := pool.Get().(*bytes.Buffer) // 不用Pool //buffer := elementNewer().(*bytes.Buffer) // 交回元素 defer pool.Put(buffer) // 使用元素 _ = buffer.String() }() } // wg.Wait() // 测试创建元素的次数 fmt.Println("elements number is :", counter) } // elements number is : 12
测试的时候,大家可以发现创建的元素数量远远低于goroutine的数量。
DATA RACE 现象
当程序运行时,由于并发的原因会导致数据竞争使用,有时在编写代码时很难发现,要经过大量测试才会发现。可以用 go run -race
,增加-race选项,检测运行时可能出现的竞争问题。
测试之前的计数器累加代码:本例子需要 main.main 来演示,因为是 go run:
package main import ( "fmt" "sync" ) func main() { wg := sync.WaitGroup{} // 计数器 counter := 0 // 多个goroutine并发的累加计数器 gs := 1000 wg.Add(gs) for i := 0; i < gs; i++ { go func() { defer wg.Done() // 累加 for k := 0; k < 100; k++ { counter++ // ++ 操作不是原子的 // counter = counter + 1 // 1. 获取当前的counter变量 // 2. +1 // 3. 赋值新值到counter } }() } // 统计计数结果 wg.Wait() fmt.Println("Counter:", counter) }
结果:
# 没有使用 -race PS D:\apps\goExample\concurrency> go run .\syncRace.go n: 94077 # 使用 -race PS D:\apps\goExample\concurrency> go run -race .\syncRace.go ================== WARNING: DATA RACE Read at 0x00c00000e0f8 by goroutine 9: main.main.func1() D:/apps/goExample/concurrency/syncMain.go:16 +0xa8 Previous write at 0x00c00000e0f8 by goroutine 7: Goroutine 9 (running) created at: main.main() D:/apps/goExample/concurrency/syncMain.go:13 +0x84 Goroutine 7 (finished) created at: main.main() D:/apps/goExample/concurrency/syncMain.go:13 +0x84 ================== n: 98807 Found 1 data race(s) exit status 66
该选项用于在开发阶段,检测数据竞争情况。
出现 data race情况,可以使用锁,或原子操作的来解决。
sync.Once
若需要保证多个并发goroutine中,某段代码仅仅执行一次,就可以使用 sync.Once 结构实现。
例如,在获取配置的时候,往往仅仅需要获取一次,然后去使用。在多个goroutine并发时,要保证能够获取到配置,同时仅获取一次配置,就可以使用sync.Once结构:
func SyncOnce() { // 初始化config变量 config := make(map[string]string) // 1. 初始化 sync.Once once := sync.Once{} // 加载配置的函数 loadConfig := func() { // 2. 利用 once.Do() 来执行 once.Do(func() { // 保证执行一次 config = map[string]string{ "varInt": fmt.Sprintf("%d", rand.Int31()), } fmt.Println("config loaded") }) } // 模拟多个goroutine,多次调用加载配置 // 测试加载配置操作,执行了几次 workers := 10 wg := sync.WaitGroup{} wg.Add(workers) for i := 0; i < workers; i++ { go func() { defer wg.Done() // 并发的多次加载配置 loadConfig() // 使用配置 _ = config }() } wg.Wait() }
核心逻辑:
-
初始化 sync.Once
-
once.Do(func()) 可以确保func()仅仅执行一次
sync.Once 的实现很简单:
type Once struct { // 是否已处理,保证一次 done uint32 // 锁,保证并发安全 m Mutex }
sync.Cond
sync.Cond是sync包提供的基于条件(Condition)的通知结构。
该结构提供了4个方法:
// 创建Cond func NewCond(l Locker) *Cond // 全部唤醒 func (c *Cond) Broadcast() // 唤醒1个 func (c *Cond) Signal() // 等待唤醒 func (c *Cond) Wait()
其中,创建时,需要1个Locker作为参数,通常是 sync.Mutext或sync.RWMutex。然后两个方法用来通知,一个方法用来等待。
使用逻辑很简单,通常是一个goroutine负责通知,多个goroutine等待处理,如图:
创建Cond,sync.NewCond() 需要提供锁,同时在等待操作和广播(信号)操作中,通常需要先申请锁,其中等待操作是必须的,而官博(信号)操作是可选的。,例如:
cond := sync.NewCond(&sync.Mutex{}) cond := sync.NewCond(&sync.RWMutex{})
还有,cond的广播和信号通知操作是并发安全的,可以重复调用的。
要注意Wait()操作,是会先解锁,等到广播信号后,再加锁。因此,Wait()操作前,要加锁。
示例代码:
-
一个goroutine负责接收数据,完毕后,广播给处理数据的goroutine
-
多个goroutine处理数据,在数据未处理完前,等待广播信号。信号来了,处理数据
func SyncCond() { wg := sync.WaitGroup{} dataCap := 1024 * 1024 var data []int cond := sync.NewCond(&sync.Mutex{}) for i := 0; i < 8; i++ { wg.Add(1) go func(c *sync.Cond) { defer wg.Done() c.L.Lock() for len(data) < dataCap { c.Wait() } fmt.Println("listen", len(data), time.Now()) c.L.Unlock() }(cond) } wg.Add(1) go func(c *sync.Cond) { defer wg.Done() c.L.Lock() defer c.L.Unlock() for i := 0; i < dataCap; i++ { data = append(data, i*i) } fmt.Println("Broadcast") c.Broadcast() //c.Signal() }(cond) // 为什么 for { wait() } // 另外的广播goroutine //wg.Add(1) //go func(c *sync.Cond) { // defer wg.Done() // c.Broadcast() //}(cond) wg.Wait() }
示例代码要点:
-
wait所在的goroutine要判定是否需要wait,所以wait要出现在条件中,因为goroutine调用的关系,不能保证wait在broadcast前面执行
-
wait要使用for进行条件判定,因为在wait返回后,条件不一定成立。因为Broadcast()操作可能被提前调用(通常是在其他的goroutine中。
-
Broadcast() 操作可选的是否加锁解锁
-
Wait() 操作前,一定要加锁。因为Wait()操作,会先解锁,接收到信号后,再加锁。
sync.Cond 基本原理
sync.Cond结构:
type Cond struct { // 锁 L Locker // 等待通知goroutine列表 notify notifyList // 限制不能被拷贝 noCopy noCopy checker copyChecker }
结构上可见,Cond记录了等待的goroutine列表,这样就可以做到,广播到全部的等待goroutine。这也是Cond应该被复制的原因,否则这些goroutine可能会被意外唤醒。
Wait() 操作:
func (c *Cond) Wait() { // 检查是否被复制 c.checker.check() // 更新 notifyList 中需要等待的 waiter 的数量 // 返回当前需要插入 notifyList 的编号 t := runtime_notifyListAdd(&c.notify) // 解锁 c.L.Unlock() // 挂起,直到被唤醒 runtime_notifyListWait(&c.notify, t) // 唤醒之后,重新加锁。 // 因为阻塞之前解锁了。 c.L.Lock() }
核心工作就是,记录当goroutine到Cond的notifyList。之后解锁,挂起,加锁。因此要在Wait()前加锁,后边要解锁。
Broadcast()操作:
func (c *Cond) Broadcast() { // 检查 sync.Cond 是否被复制了 c.checker.check() // 唤醒 notifyList 中的所有 goroutine runtime_notifyListNotifyAll(&c.notify) }
核心工作就是唤醒 notifyList 中全部的 goroutine。
小结
同步类型:
-
数据同步,保证数据操作的原子性
-
sync/atomic
-
sync.Map
-
sync.Mutex, sync.RWMutex
-
-
操作同步
-
sync.Mutex, sync.RWMutex
-
锁的类型:
-
互斥锁 sync.Mutex,完全独占
-
读写互斥锁 sync.RWMutex,可以共享读操作
锁的不锁资源,只是锁定申请锁本身的操作。
sync包总结
-
锁:sync.Mutex, sync.RWMutex
-
数据:sync.Map, sync/atomic
-
sync.Pool
-
sync.Once
-
sync.Cond
使用Channel完成数据和信号的同步!
原文地址:https://blog.csdn.net/weixin_51568389/article/details/140535152
免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!