一: 基本介绍
同步是并发编程的基本要素之一,通过 channel 可以完成多个 goroutine 间数据和信号的同步。
除了 channel 外,还可以使用go的官方同步包 sync,sync/atomic 完成一些基础的同步功能。主要包含同步数据、锁、原子操作等
没有进行同步的案例
func Mutex() {
// 定义计数器
counter := 0
// 开启多个goroutine
// 每个goroutine对counter++
n := 1
for i := 0; i < n; i++ {
go func() {
for j := 0; j < 100; j++ {
counter++
}
}()
}
fmt.Println("counter:", counter)
time.Sleep(300 * time.Millisecond)
}
counter++ 操作不是原子的
拿到 counter 变量
执行++
对counter赋值
进行同步的案例
func Mutex() {
wg := sync.WaitGroup{}
// 定义计数器
counter := 0
// 开启多个goroutine
// 每个goroutine对counter++
// 创建锁
lock := sync.Mutex{}
n := 100
wg.Add(n)
for i := 0; i < n; i++ {
go func() {
defer wg.Done()
for j := 0; j < 100; j++ {
// 申请锁
lock.Lock()
counter++
// 释放锁
lock.Unlock()
}
}()
}
wg.Wait()
fmt.Println("counter:", counter)
}
二: 互斥锁 Mutex
sync包提供了两种锁:
互斥锁,Mutex
读写互斥锁,RWMutex
互斥锁,同一时刻只能有一个 goroutine 申请锁定成功,不区分读、写操作。也称为:独占锁、排它锁
type Mutex
// 锁定锁m, 若锁m已是锁定状态,调用的goroutine会被阻塞,直到可以锁定
func (m *Mutex) Lock()
// 解锁锁m,若m不是锁定状态,会导致运行时错误
func (m *Mutex) Unlock()
// 尝试是否可以加锁,返回是否成功
func (m *Mutex) TryLock() bool
注意 :
锁与函数和 goroutine 都没有直接关联 , 谁能拿到 lock 变量 , 谁就有资格加锁解锁
可以在一个函数加锁 , 另一个函数中解锁
甚至可以在一个goroutine中加锁 , 另一个goroutine中解锁
var lck sync.Mutex
func () {
lck.Lock()
// 互斥执行的代码
defer lck.Unlock()
}
流程图
代码实现
func SyncMutex() {
wg := sync.WaitGroup{}
var lck sync.Mutex
for i := 0; i < 4; i++ {
wg.Add(1)
go func(n int) {
defer wg.Done()
fmt.Println("before lock: ", n)
lck.Lock()
fmt.Println("locked: ", n)
time.Sleep(1 * time.Second)
lck.Unlock()
fmt.Println("after lock: ", n)
}(i)
}
wg.Wait()
}
1. 不受锁限制的情况
注意,如果其他goroutine没有通过相同的锁(1没用锁,2用了其他锁)去操作资源,那么是不受锁限制的
两组 goroutine 用的不是同一把锁
func SyncLockAndNo() {
n := 0
wg := sync.WaitGroup{}
lk := sync.Mutex{}
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < 100; i++ {
lk.Lock()
n++
lk.Unlock()
}
}()
}
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < 10000; i++ {
n++
}
}()
// 其他锁
var lk2 sync.Mutex
go func() {
defer wg.Done()
for i := 0; i < 10000; i++ {
lk2.Lock()
n++
lk2.Unlock()
}
}()
wg.Wait()
fmt.Println("n:", n)
}
2. 定义在结构体中
实操时,锁除了直接调用外,还经常性出现在结构体中,以某个字段的形式出现,用于包含struct字段不会被多gorutine同时修改,例如 cancelCtx:
type cancelCtx struct {
Context
mu sync.Mutex // protects following fields
done atomic.Value // of chan struct{}, created lazily, closed by first cancel call
children map[canceler]struct{} // set to nil by the first cancel call
err error // set to non-nil by the first cancel call
}
type Post struct {
Subject string
// 赞
Likes int
// 操作锁定
mu sync.Mutex
}
func (p *Post) IncrLikes() *Post {
p.mu.Lock()
defer p.mu.Unlock()
p.Likes++
return p
}
func (p *Post) DecrLikes() *Post {
p.mu.Lock()
defer p.mu.Unlock()
p.Likes--
return p
}
三: 读写锁 RWMutex
读写互斥锁,将锁操作类型做了区分,分为读锁和写锁,由sync.RWMutex
类型实现:
读锁,Read Lock,共享读,阻塞写
写锁,Lock,独占操作,阻塞读写
写锁和之前的互斥锁是没有区别的 , 是独占操作 , 区别在于读锁
type RWMutex
// 写锁定
func (rw *RWMutex) Lock()
// 写解锁
func (rw *RWMutex) Unlock()
// 读锁定
func (rw *RWMutex) RLock()
// 读解锁
func (rw *RWMutex) RUnlock()
// 尝试加写锁定
func (rw *RWMutex) TryLock() bool
// 尝试加读锁定
func (rw *RWMutex) TryRLock() bool
1. 使用
读写锁中的
Lock()
和Unlock()
和之前的互斥锁没有什么区别
主要区别在于读锁
func SyncRWLock() {
wg := sync.WaitGroup{}
// 申请读写锁
Lock := sync.RWMutex{}
// 开启多个 goroutine
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
// 加锁
//Lock.Lock()
// 加读锁
Lock.RLock()
fmt.Println(time.Now())
time.Sleep(1 * time.Second)
// 解锁
//Lock.Unlock()
// 接锁
Lock.RUnlock()
}()
}
wg.Wait()
}
读锁不会互斥访问 , 是共享读
2. goroutine 底层对读写锁的优化
如果共同定义了读锁和写锁 , 那么 goroutine 底层会让写锁尽可能先调度 ( 执行 )
因为逻辑上应该让之后的用户看到更新后的数据
func SyncRWLock() {
wg := sync.WaitGroup{}
// 申请读写锁
Lock := sync.RWMutex{}
// 开启多个 goroutine
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
// 加锁
Lock.RLock()
fmt.Println(time.Now())
time.Sleep(1 * time.Second)
// 解锁
Lock.RUnlock()
}()
}
wg.Add(1)
go func() {
defer wg.Done()
// 写锁
Lock.Lock()
fmt.Println(time.Now(), " Writing...")
Lock.Unlock()
}()
wg.Wait()
}
3. 实际应用
在
Likes()
获取点赞数方法中 , 加了写锁在
IncrLikes()
增加点赞数方法中 , 加了读锁
type Article struct {
Subject string
// 赞
likes int
// 操作锁定
mu sync.RWMutex
}
func (a Article) Likes() int {
a.mu.RLock()
defer a.mu.RUnlock()
return a.likes
}
func (a *Article) IncrLikes() *Article {
a.mu.Lock()
defer a.mu.Unlock()
a.likes++
return a
}
四: 同步 Map Sync.Map
Go 中 Ma p是非线程(goroutine)安全的。并发操作 Map 类型时,会导致 fatal error: concurrent map read and map write
错误:
Go 认为 Map 的典型使用场景不需要在多个 Goroutine 间并发安全操作Map。
func SyncMapErr() {
m := map[string]int{}
// 并发map写
go func() {
for {
m["key"] = 0
}
}()
// 并发map读
go func() {
for {
_ = m["key"]
}
}()
// 阻塞
select {}
}
并发安全操作Map的方案:
锁 + Map,自定义Map操作,增加锁的控制,可以选择 Mutex和RWMutex。
sync.Map,sync包提供的安全Map.
锁+Map示例,在结构体内嵌入sync.Mutex:
func SyncMapLock() {
myMap := struct {
sync.RWMutex
Data map[string]int
}{
Data: map[string]int{},
}
// write
myMap.Lock()
myMap.Data["key"] = 0
myMap.Unlock()
// read
myMap.RLock()
_ = myMap.Data["key"]
myMap.RUnlock()
}
sync.Map 的使用
type Map
// 最常用的4个方法:
// 存储
func (m *Map) Store(key, value any)
// 遍历 map
func (m *Map) Range(f func(key, value any) bool)
// 删除某个key元素
func (m *Map) Delete(key any)
// 返回key的值。存在key,返回value,true,不存在返回 nil, false
func (m *Map) Load(key any) (value any, ok bool)
// 若m[key]==old,执行删除。key不存在,返回false
func (m *Map) CompareAndDelete(key, old any) (deleted bool)
// 若m[key]==old,执行交换, m[key] = new
func (m *Map) CompareAndSwap(key, old, new any) bool
// 返回值后删除元素。loaded 表示是否load成功,key不存在,loaded为false
func (m *Map) LoadAndDelete(key any) (value any, loaded bool)
// 加载,若加载失败则存储。返回加载或存储的值和是否加载
func (m *Map) LoadOrStore(key, value any) (actual any, loaded bool)
// 存储新值,返回之前的值。loaded表示key是否存在
func (m *Map) Swap(key, value any) (previous any, loaded bool)
五: 原子操作 sync/automic
原子操作即是进行过程中不能被中断的操作,针对某个值的原子操作在被进行的过程中,CPU 绝不会再去进行其他的针对该值的操作。为了实现这样的严谨性,原子操作仅会由一个独立的 CPU 指令代表和完成。原子操作是无锁的,常常直接通过 CPU 指令直接实现。 事实上,其它同步技术的实现常常依赖于原子操作。
sync/automic
包提供了原子操作的支持 , 用于同步操作整型(和指针类型)int32
int64
uint32
uint64
uintptr
unsafe.Pointer
1. 整型类型
// Type 是以上的类型之一
// 比较相等后交换 CAS
func CompareAndSwapType(addr *Type, old, new Type) (swapped bool)
// 交换
func SwapType(addr *Type, new Type) (old Type)
// 累加
func AddType(addr *Type, delta Type) (new Type)
// 获取
func LoadType(addr *Type) (val Type)
// 存储
func StoreType(addr *Type, val Type)
2. 布尔类型
bool 类型也提供了原子操作
type Int32
func (x *Int32) Add(delta int32) (new int32)
func (x *Int32) CompareAndSwap(old, new int32) (swapped bool)
func (x *Int32) Load() int32
func (x *Int32) Store(val int32)
func (x *Int32) Swap(new int32) (old int32)
加锁操作是 Go 语言层级上的 , 原子操作是 CPU 指令层级上直接实现
3. 原子操作整型应用
由于counter.Add()
是原子操作 , 就不会发送多个 goroutine 同时操作 counter 变量了
func AutomicTest() {
wg := sync.WaitGroup{}
// 定义一个并发安全的Int64整数
counter := atomic.Int64{}
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for j := 0; j < 1000; j++ {
// 因为counter是并发安全的
// 所以这里的Add操作是原子操作
counter.Add(1)
}
}()
}
wg.Wait()
fmt.Println("counter:", counter.Load()) // 10000
}
4. 其他类型
除了预定义的整型的支持,还可以使用atomic.Value
类型,完成其他类型的原子操作:
type Value
func (v *Value) CompareAndSwap(old, new any) (swapped bool)
func (v *Value) Load() (val any)
func (v *Value) Store(val any)
func (v *Value) Swap(new any) (old any)
每秒加载配置文件 , 每次加载的配置文件都不一样
每 400 毫秒加载配置文件
配置的加载和读取操作必须是并发安全的原子操作
func OtherValue() {
// 将匿名函数赋值给一个变量
// 模拟加载文件配置,返回解析的配置信息
var loadConfig = func() map[string]string {
// 每次加载的配置文件都不一样
return map[string]string{
"title": "Sakura",
"num": fmt.Sprintf("%d", rand.Intn(2048)),
}
}
// config 的操作应该要是并发安全的,所以应该使用原子操作
var config atomic.Value
// 每N秒加载一次配置文件
go func() {
for {
config.Store(loadConfig())
fmt.Println("latest config was loaded", time.Now().Format("15:04:05.99999999"))
time.Sleep(time.Second)
}
}()
// 使用配置
// 不能在加载的过程中使用配置
for {
go func() {
c := config.Load()
fmt.Println(c, time.Now().Format("15:04:05.99999999"))
}()
time.Sleep(400 * time.Millisecond)
}
select {}
}
六: sync pool 并发安全池
sync.pool
并发安全的临时资源池 , 通常用这个池子存储可以复用的对象 ( 缓冲对象 , 缓存对象 ) , 比如: 数据库的连接
如果资源是可以复用的临时对象 , 就可以使用功能并发安全池
如果想要使用某个对象 , 直接去池子中拿 , 用完之后再放回去
典型特征:
sync.Pool 是并发安全的
池中的对象由 Go 负责删除,内存由Go自己回收
池中元素的数量由 Go 负责管理,用户无法干预
池中元素应该是临时的,不应该是持久的。例如长连接不适合放入 sync.Pool 中
1. 操作
池由 sync.Pool类型实现,具体三个操作:
初始化Pool实例,需要提供池中缓存元素的New方法。
申请元素,
func (p *Pool) Get() any
交回对象,
func (p *Pool) Put(x any)
// 并发安全池联系
func SyncPool() {
wg := sync.WaitGroup{}
// 原子计数器,通过counter判断调用了多少次counter
var counter int64 = 0
// 定义元素的Newer,创建器
ByteBuffer := func() any {
atomic.AddInt64(&counter, 1)
// 池中元素推荐(强烈)是指针类型
return new(bytes.Buffer)
}
// 初始化Pool
Pool := sync.Pool{
New: ByteBuffer,
}
// 并发的申请和交回元素
for i := 0; i < 100000; i++ {
wg.Add(1)
go func() {
// 使用并发池里的资源,需要做类型断言
buffer := Pool.Get().(*bytes.Buffer)
// 交回元素
defer Pool.Put(buffer)
defer wg.Done()
//使用元素
_ = buffer.String()
}()
}
wg.Wait()
fmt.Println("池子中的资源数量: ", counter)
}
池的目的是缓存已分配但未使用的项目以供以后重用,从而减轻垃圾收集器的压力。也就是说,它使构建高效、线程安全的自由元素变得容易。
池的一个适当用途是管理一组临时项,这些临时项在包的并发独立客户端之间默默共享,并可能被其重用。池提供了一种在许多客户机上分摊分配开销的方法。
2.
七: DATA RACE 现象
该选项用于在开发阶段 , 检测数据竞争的情况
出现 Data Race 的情况 , 可以使用锁, ,原子操作来解决
// 检测运行时可能出现的竞争问题。
go run -race ./main.go
报错首先 :
go env -w CGO_ENABLED=1
然后安装 GCC 编译器 : 常见问题之Golang——cgo: C compiler “gcc“ not found: exec: “gcc“: executable file not found in %PATH%错误..._cgo gcc-CSDN博客
func main() {
wg := sync.WaitGroup{}
// 定义计数器
counter := 0
// 开启多个goroutine
// 每个goroutine对counter++
//lock := sync.Mutex{}
n := 10000
wg.Add(n)
for i := 0; i < n; i++ {
go func() {
defer wg.Done()
for j := 0; j < 100; j++ {
//lock.Lock()
counter++
//lock.Unlock()
}
}()
}
wg.Wait()
fmt.Println("counter:", counter)
}
检测结果
# 没有使用 -race
PS D:\apps\goExample\concurrency> go run .\syncRace.go
n: 94077
# 使用 -race
PS D:\apps\goExample\concurrency> go run -race .\syncRace.go
==================
WARNING: DATA RACE
Read at 0x00c00000e0f8 by goroutine 9:
main.main.func1()
D:/apps/goExample/concurrency/syncMain.go:16 +0xa8
Previous write at 0x00c00000e0f8 by goroutine 7:
Goroutine 9 (running) created at:
main.main()
D:/apps/goExample/concurrency/syncMain.go:13 +0x84
Goroutine 7 (finished) created at:
main.main()
D:/apps/goExample/concurrency/syncMain.go:13 +0x84
==================
n: 98807
Found 1 data race(s)
exit status 66
八: sync.Once 保证代码仅执行一次
若需要保证多个并发 goroutine 中,某段代码仅仅执行一次,就可以使用 sync.Once 结构实现
例如,在获取配置的时候,往往仅仅需要获取一次,然后去使用。在多个goroutine并发时,要保证能够获取到配置,同时仅获取一次配置,就可以使用sync.Once结构:
func SyncOnce() {
// 初始化config变量
config := make(map[string]string)
// 1. 初始化 sync.Once
once := sync.Once{}
// 加载配置的函数
loadConfig := func() {
// 2. 利用 once.Do() 来执行
once.Do(func() {
// 保证执行一次
config = map[string]string{
"varInt": fmt.Sprintf("%d", rand.Int31()),
}
fmt.Println("config loaded")
})
}
// 模拟多个goroutine,多次调用加载配置
// 测试加载配置操作,执行了几次
workers := 10
wg := sync.WaitGroup{}
wg.Add(workers)
for i := 0; i < workers; i++ {
go func() {
defer wg.Done()
// 并发的多次加载配置
loadConfig()
// 使用配置
_ = config
}()
}
wg.Wait()
}
评论区