目 录CONTENT

文章目录

Go并发编程-同步和锁

Sakura
2023-09-23 / 0 评论 / 0 点赞 / 23 阅读 / 20035 字 / 正在检测是否收录...

一: 基本介绍

同步是并发编程的基本要素之一,通过 channel 可以完成多个 goroutine 间数据和信号的同步。

除了 channel 外,还可以使用go的官方同步包 sync,sync/atomic 完成一些基础的同步功能。主要包含同步数据、锁、原子操作等

  • 没有进行同步的案例

func Mutex() {
	// 定义计数器
	counter := 0

	// 开启多个goroutine
	// 每个goroutine对counter++
	n := 1
	for i := 0; i < n; i++ {
		go func() {
			for j := 0; j < 100; j++ {
				counter++
			}
		}()
	}
	fmt.Println("counter:", counter)
	time.Sleep(300 * time.Millisecond)
}

counter++ 操作不是原子的

  1. 拿到 counter 变量

  2. 执行++

  3. 对counter赋值

  • 进行同步的案例

func Mutex() {
	wg := sync.WaitGroup{}
	// 定义计数器
	counter := 0

	// 开启多个goroutine
	// 每个goroutine对counter++
	// 创建锁
	lock := sync.Mutex{}
	n := 100
	wg.Add(n)
	for i := 0; i < n; i++ {
		go func() {
			defer wg.Done()
			for j := 0; j < 100; j++ {
				// 申请锁
				lock.Lock()
				counter++
				// 释放锁
				lock.Unlock()
			}
		}()
	}
	wg.Wait()
	fmt.Println("counter:", counter)
}

二: 互斥锁 Mutex

sync包提供了两种锁:

  • 互斥锁,Mutex

  • 读写互斥锁,RWMutex

互斥锁,同一时刻只能有一个 goroutine 申请锁定成功,不区分读、写操作。也称为:独占锁、排它锁

type Mutex
// 锁定锁m, 若锁m已是锁定状态,调用的goroutine会被阻塞,直到可以锁定
func (m *Mutex) Lock()
// 解锁锁m,若m不是锁定状态,会导致运行时错误
func (m *Mutex) Unlock()
// 尝试是否可以加锁,返回是否成功
func (m *Mutex) TryLock() bool

注意 :

锁与函数和 goroutine 都没有直接关联 , 谁能拿到 lock 变量 , 谁就有资格加锁解锁

  1. 可以在一个函数加锁 , 另一个函数中解锁

  2. 甚至可以在一个goroutine中加锁 , 另一个goroutine中解锁

var lck sync.Mutex
func () {
    lck.Lock()
    // 互斥执行的代码
    defer lck.Unlock()
}
  • 流程图

代码实现

func SyncMutex() {
    wg := sync.WaitGroup{}
    var lck sync.Mutex
    for i := 0; i < 4; i++ {
        wg.Add(1)
        go func(n int) {
            defer wg.Done()
            fmt.Println("before lock: ", n)
            lck.Lock()
            fmt.Println("locked: ", n)
            time.Sleep(1 * time.Second)
            lck.Unlock()
            fmt.Println("after lock: ", n)
        }(i)
    }
    wg.Wait()
}

1. 不受锁限制的情况

注意,如果其他goroutine没有通过相同的锁(1没用锁,2用了其他锁)去操作资源,那么是不受锁限制的

  • 两组 goroutine 用的不是同一把锁

func SyncLockAndNo() {
    n := 0
    wg := sync.WaitGroup{}
    lk := sync.Mutex{}
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for i := 0; i < 100; i++ {
                lk.Lock()
                n++
                lk.Unlock()
            }
        }()
    }
    wg.Add(1)
    go func() {
        defer wg.Done()
        for i := 0; i < 10000; i++ {
            n++
        }
    }()

    // 其他锁
    var lk2 sync.Mutex
    go func() {
        defer wg.Done()
        for i := 0; i < 10000; i++ {
            lk2.Lock()
            n++
            lk2.Unlock()
        }
    }()

    wg.Wait()
    fmt.Println("n:", n)
}

2. 定义在结构体中

实操时,锁除了直接调用外,还经常性出现在结构体中,以某个字段的形式出现,用于包含struct字段不会被多gorutine同时修改,例如 cancelCtx

type cancelCtx struct {
    Context

    mu       sync.Mutex            // protects following fields
    done     atomic.Value          // of chan struct{}, created lazily, closed by first cancel call
    children map[canceler]struct{} // set to nil by the first cancel call
    err      error                 // set to non-nil by the first cancel call
}
type Post struct {
    Subject string
    // 赞
    Likes   int
    // 操作锁定
    mu sync.Mutex
}

func (p *Post) IncrLikes() *Post {
    p.mu.Lock()
    defer p.mu.Unlock()
    p.Likes++

    return p
}

func (p *Post) DecrLikes() *Post {
    p.mu.Lock()
    defer p.mu.Unlock()
    p.Likes--

    return p
}

三: 读写锁 RWMutex

读写互斥锁,将锁操作类型做了区分,分为读锁写锁,由sync.RWMutex类型实现:

  • 读锁,Read Lock,共享读,阻塞写

  • 写锁,Lock,独占操作,阻塞读写

写锁和之前的互斥锁是没有区别的 , 是独占操作 , 区别在于读锁

type RWMutex
// 写锁定
func (rw *RWMutex) Lock()
// 写解锁
func (rw *RWMutex) Unlock()

// 读锁定
func (rw *RWMutex) RLock()
// 读解锁
func (rw *RWMutex) RUnlock()

// 尝试加写锁定
func (rw *RWMutex) TryLock() bool
// 尝试加读锁定
func (rw *RWMutex) TryRLock() bool

1. 使用

  • 读写锁中的 Lock()Unlock() 和之前的互斥锁没有什么区别

主要区别在于读锁

func SyncRWLock() {
	wg := sync.WaitGroup{}

	// 申请读写锁
	Lock := sync.RWMutex{}

	// 开启多个 goroutine
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// 加锁
			//Lock.Lock()

			// 加读锁
			Lock.RLock()
			fmt.Println(time.Now())
			time.Sleep(1 * time.Second)
			// 解锁
			//Lock.Unlock()

			// 接锁
			Lock.RUnlock()
		}()
	}
	wg.Wait()
}

读锁不会互斥访问 , 是共享读

2. goroutine 底层对读写锁的优化

如果共同定义了读锁和写锁 , 那么 goroutine 底层会让写锁尽可能先调度 ( 执行 )

因为逻辑上应该让之后的用户看到更新后的数据

func SyncRWLock() {
	wg := sync.WaitGroup{}
	// 申请读写锁
	Lock := sync.RWMutex{}
	// 开启多个 goroutine
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// 加锁
			Lock.RLock()
			fmt.Println(time.Now())
			time.Sleep(1 * time.Second)
			// 解锁
			Lock.RUnlock()
		}()
	}

	wg.Add(1)
	go func() {
		defer wg.Done()
		// 写锁
		Lock.Lock()
		fmt.Println(time.Now(), " Writing...")
		Lock.Unlock()
	}()
	wg.Wait()
}

3. 实际应用

Likes() 获取点赞数方法中 , 加了写锁

IncrLikes() 增加点赞数方法中 , 加了读锁

type Article struct {
    Subject string
    // 赞
    likes int
    // 操作锁定
    mu sync.RWMutex
}

func (a Article) Likes() int {
    a.mu.RLock()
    defer a.mu.RUnlock()
    return a.likes
}

func (a *Article) IncrLikes() *Article {
    a.mu.Lock()
    defer a.mu.Unlock()
    a.likes++
    return a
}

四: 同步 Map Sync.Map

Go 中 Ma p是非线程(goroutine)安全的。并发操作 Map 类型时,会导致 fatal error: concurrent map read and map write错误:

Go 认为 Map 的典型使用场景不需要在多个 Goroutine 间并发安全操作Map。

func SyncMapErr() {
	m := map[string]int{}
	// 并发map写
	go func() {
		for {
			m["key"] = 0
		}
	}()
	// 并发map读
	go func() {
		for {
			_ = m["key"]
		}
	}()
	// 阻塞
	select {}
}

并发安全操作Map的方案:

  • 锁 + Map,自定义Map操作,增加锁的控制,可以选择 Mutex和RWMutex。

  • sync.Map,sync包提供的安全Map.

  1. 锁+Map示例,在结构体内嵌入sync.Mutex:

func SyncMapLock() {
    myMap := struct {
        sync.RWMutex
        Data map[string]int
    }{
        Data: map[string]int{},
    }

    // write
    myMap.Lock()
    myMap.Data["key"] = 0
    myMap.Unlock()

    // read
    myMap.RLock()
    _ = myMap.Data["key"]
    myMap.RUnlock()
}
  1. sync.Map 的使用

type Map
// 最常用的4个方法:
// 存储
func (m *Map) Store(key, value any)
// 遍历 map
func (m *Map) Range(f func(key, value any) bool)
// 删除某个key元素
func (m *Map) Delete(key any)
// 返回key的值。存在key,返回value,true,不存在返回 nil, false
func (m *Map) Load(key any) (value any, ok bool)

// 若m[key]==old,执行删除。key不存在,返回false
func (m *Map) CompareAndDelete(key, old any) (deleted bool)
// 若m[key]==old,执行交换, m[key] = new
func (m *Map) CompareAndSwap(key, old, new any) bool

// 返回值后删除元素。loaded 表示是否load成功,key不存在,loaded为false
func (m *Map) LoadAndDelete(key any) (value any, loaded bool)
// 加载,若加载失败则存储。返回加载或存储的值和是否加载
func (m *Map) LoadOrStore(key, value any) (actual any, loaded bool)

// 存储新值,返回之前的值。loaded表示key是否存在
func (m *Map) Swap(key, value any) (previous any, loaded bool)

五: 原子操作 sync/automic

原子操作即是进行过程中不能被中断的操作,针对某个值的原子操作在被进行的过程中,CPU 绝不会再去进行其他的针对该值的操作。为了实现这样的严谨性,原子操作仅会由一个独立的 CPU 指令代表和完成。原子操作是无锁的,常常直接通过 CPU 指令直接实现。 事实上,其它同步技术的实现常常依赖于原子操作。

sync/automic包提供了原子操作的支持 , 用于同步操作整型(和指针类型)int32 int64 uint32 uint64 uintptr unsafe.Pointer

1. 整型类型

// Type 是以上的类型之一
// 比较相等后交换 CAS
func CompareAndSwapType(addr *Type, old, new Type) (swapped bool)
// 交换
func SwapType(addr *Type, new Type) (old Type)
// 累加
func AddType(addr *Type, delta Type) (new Type)
// 获取
func LoadType(addr *Type) (val Type)
// 存储
func StoreType(addr *Type, val Type)

2. 布尔类型

  • bool 类型也提供了原子操作

type Int32
func (x *Int32) Add(delta int32) (new int32)
func (x *Int32) CompareAndSwap(old, new int32) (swapped bool)
func (x *Int32) Load() int32
func (x *Int32) Store(val int32)
func (x *Int32) Swap(new int32) (old int32)

加锁操作是 Go 语言层级上的 , 原子操作是 CPU 指令层级上直接实现

3. 原子操作整型应用

由于counter.Add() 是原子操作 , 就不会发送多个 goroutine 同时操作 counter 变量了

func AutomicTest() {
	wg := sync.WaitGroup{}

	// 定义一个并发安全的Int64整数
	counter := atomic.Int64{}

	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := 0; j < 1000; j++ {
				// 因为counter是并发安全的
				// 所以这里的Add操作是原子操作
				counter.Add(1)
			}
		}()
	}

	wg.Wait()
	fmt.Println("counter:", counter.Load()) // 10000
}

4. 其他类型

除了预定义的整型的支持,还可以使用atomic.Value类型,完成其他类型的原子操作:

type Value
func (v *Value) CompareAndSwap(old, new any) (swapped bool)
func (v *Value) Load() (val any)
func (v *Value) Store(val any)
func (v *Value) Swap(new any) (old any)

每秒加载配置文件 , 每次加载的配置文件都不一样

每 400 毫秒加载配置文件

配置的加载和读取操作必须是并发安全的原子操作

func OtherValue() {
	// 将匿名函数赋值给一个变量
	// 模拟加载文件配置,返回解析的配置信息
	var loadConfig = func() map[string]string {
		// 每次加载的配置文件都不一样
		return map[string]string{
			"title": "Sakura",
			"num":   fmt.Sprintf("%d", rand.Intn(2048)),
		}
	}

	// config 的操作应该要是并发安全的,所以应该使用原子操作
	var config atomic.Value

	// 每N秒加载一次配置文件
	go func() {
		for {
			config.Store(loadConfig())
			fmt.Println("latest config was loaded", time.Now().Format("15:04:05.99999999"))
			time.Sleep(time.Second)
		}
	}()

	// 使用配置
	// 不能在加载的过程中使用配置
	for {
		go func() {
			c := config.Load()
			fmt.Println(c, time.Now().Format("15:04:05.99999999"))
		}()
		time.Sleep(400 * time.Millisecond)
	}
	select {}
}

六: sync pool 并发安全池

sync.pool 并发安全的临时资源池 , 通常用这个池子存储可以复用的对象 ( 缓冲对象 , 缓存对象 ) , 比如: 数据库的连接

如果资源是可以复用的临时对象 , 就可以使用功能并发安全池

如果想要使用某个对象 , 直接去池子中拿 , 用完之后再放回去

典型特征:

  • sync.Pool 是并发安全的

  • 池中的对象由 Go 负责删除,内存由Go自己回收

  • 池中元素的数量由 Go 负责管理,用户无法干预

  • 池中元素应该是临时的,不应该是持久的。例如长连接不适合放入 sync.Pool 中

1. 操作

池由 sync.Pool类型实现,具体三个操作:

  • 初始化Pool实例,需要提供池中缓存元素的New方法。

  • 申请元素,func (p *Pool) Get() any

  • 交回对象,func (p *Pool) Put(x any)

// 并发安全池联系
func SyncPool() {
	wg := sync.WaitGroup{}
	// 原子计数器,通过counter判断调用了多少次counter
	var counter int64 = 0

	// 定义元素的Newer,创建器
	ByteBuffer := func() any {
		atomic.AddInt64(&counter, 1)

		// 池中元素推荐(强烈)是指针类型
		return new(bytes.Buffer)
	}

	// 初始化Pool
	Pool := sync.Pool{
		New: ByteBuffer,
	}

	// 并发的申请和交回元素
	for i := 0; i < 100000; i++ {
		wg.Add(1)
		go func() {
			// 使用并发池里的资源,需要做类型断言
			buffer := Pool.Get().(*bytes.Buffer)
			// 交回元素
			defer Pool.Put(buffer)
			defer wg.Done()
			//使用元素
			_ = buffer.String()
		}()
	}
	wg.Wait()

	fmt.Println("池子中的资源数量: ", counter)
}

池的目的是缓存已分配但未使用的项目以供以后重用,从而减轻垃圾收集器的压力。也就是说,它使构建高效、线程安全的自由元素变得容易。

池的一个适当用途是管理一组临时项,这些临时项在包的并发独立客户端之间默默共享,并可能被其重用。池提供了一种在许多客户机上分摊分配开销的方法。

2.

七: DATA RACE 现象

该选项用于在开发阶段 , 检测数据竞争的情况

出现 Data Race 的情况 , 可以使用锁, ,原子操作来解决

// 检测运行时可能出现的竞争问题。 
go run -race ./main.go
func main() {
	wg := sync.WaitGroup{}
	// 定义计数器
	counter := 0

	// 开启多个goroutine
	// 每个goroutine对counter++
	//lock := sync.Mutex{}
	n := 10000
	wg.Add(n)
	for i := 0; i < n; i++ {
		go func() {
			defer wg.Done()
			for j := 0; j < 100; j++ {
				//lock.Lock()
				counter++
				//lock.Unlock()
			}
		}()
	}
	wg.Wait()
	fmt.Println("counter:", counter)
}
  • 检测结果

# 没有使用 -race
PS D:\apps\goExample\concurrency> go run .\syncRace.go
n: 94077

# 使用 -race
PS D:\apps\goExample\concurrency> go run -race .\syncRace.go
==================
WARNING: DATA RACE  
Read at 0x00c00000e0f8 by goroutine 9:
  main.main.func1()
      D:/apps/goExample/concurrency/syncMain.go:16 +0xa8

Previous write at 0x00c00000e0f8 by goroutine 7:
Goroutine 9 (running) created at:
  main.main()
      D:/apps/goExample/concurrency/syncMain.go:13 +0x84

Goroutine 7 (finished) created at:
  main.main()
      D:/apps/goExample/concurrency/syncMain.go:13 +0x84
==================
n: 98807
Found 1 data race(s)
exit status 66

八: sync.Once 保证代码仅执行一次

若需要保证多个并发 goroutine 中,某段代码仅仅执行一次,就可以使用 sync.Once 结构实现

例如,在获取配置的时候,往往仅仅需要获取一次,然后去使用。在多个goroutine并发时,要保证能够获取到配置,同时仅获取一次配置,就可以使用sync.Once结构:

func SyncOnce() {

    // 初始化config变量
    config := make(map[string]string)

    // 1. 初始化 sync.Once
    once := sync.Once{}

    // 加载配置的函数
    loadConfig := func() {
        // 2. 利用 once.Do() 来执行
        once.Do(func() {
            // 保证执行一次
            config = map[string]string{
                "varInt": fmt.Sprintf("%d", rand.Int31()),
            }
            fmt.Println("config loaded")
        })
    }

    // 模拟多个goroutine,多次调用加载配置
    // 测试加载配置操作,执行了几次
    workers := 10
    wg := sync.WaitGroup{}
    wg.Add(workers)
    for i := 0; i < workers; i++ {
        go func() {
            defer wg.Done()
            // 并发的多次加载配置
            loadConfig()
            // 使用配置
            _ = config

        }()
    }
    wg.Wait()
}
0

评论区