一: Channel 通信
不要通过共享内存的方式进行通信,而是应该通过通信的方式共享内存
这是Go语言最核心的设计模式之一。
在很多主流的编程语言中,多个线程传递数据的方式一般都是共享内存,而 Go 语言中多 Goroutine 通信的主要方案是 Channel。Go 语言也可以使用共享内存的方式支持 Goroutine 通信。
Go 语言实现了 CSP 通信模式,CSP 是 Communicating Sequential Processes 的缩写,通信顺序进程。Goroutine 和 Channel 分别对应 CSP 中的实体和传递信息的媒介。CSP 是 Tony Hoare 于1977年提出。
Channel 提供可接收和发送特定类型值的用于并发函数 ( Goroutine ) 通信的数据类型,是满足 FIFO(先进先出)原则的队列类型,先进先出不仅体现在数据类型上,也体现在操作上:
channel 类型的元素是先进先出的,先发送到 chann el 的 value 会先被 receive
先向 Channel 发送数据的 Goroutine 会先执行
先从 Channel 接收数据的 Goroutine 会先执行
二: Channel 的操作语法
一个关键字
chan
chan <-
<- chan
两个函数
make
close
一个语句
发送语句 ch <- expresssion
一个操作符
接收操作符 <- ch
1. Channel 类型
// chan 双向,可收可发
// chan <- 仅发送
// <- chan 仅接收
ChannelType = ( "chan" | "chan" "<-" | "<-" "chan" ) ElementType
任何类型都可以作为 Channel 的元素类型
chan int
chan struct{}
chan <- int
<- chan int
2. 初始化 Channel 值
make(ChannelType, Capacity)
ChannelType 是 channel 类型
Capacity 是缓冲容量 , 可以省略或为0 , 表示无缓冲 Channel
ch := make(chan int)
var ch = make(chan int)
ch := make(chan int, 10)
ch := make(<-chan int)
ch := make(chan<- int, 10)
未使用
make()
初始化的 channel 为nil , nil channel 不能执行收发通信操作 , 例如:
var ch chan int
3. Send 语句和接收操作符
send 语句用于像 Chennel 发送值
接收操作符用于从 Channel 中接收值
// 发送操作
ch <- Expression
ch <- 100
ch <- f()
// 接收操作
<- ch
v1 := <-ch // 声明
v = <-ch // 赋值
f(<-ch) // 函数调用
<-strobe // 等待接收
4. 关闭 Channel
// close()用于关闭channel
close()
关闭 Channel 的意思是记录该 Channel 不能再被发送任何元素了
向已关闭的 Channel 发送回引发 runtime panic
关闭 nil Channel 会引发 runtime panic
不能关闭仅接收 Channel
不能关闭已经关闭的 Channel,否则会引发 runtime panic
当从已关闭的 channel 接收值时候:
可以接收关闭前发送的全部值
若没有已发送的值会返回类型的零值 , 不会被阻塞
使用接收操作符的多值返回结构,可以判断Channel是否已经关闭:
var x, ok = <-ch
x, ok := <-ch
ok 为 true,channel 未关闭
ok 为 false,channel 已关闭
func ChannelOperate() {
// 初始化
ch := make(chan int) //无缓冲channel
//发送值
go func() {
ch <- 100
}()
//接收值
go func() {
value := <-ch
fmt.Println("receive from ch : ", value)
}()
time.Sleep(time.Second)
//关闭ch
close(ch)
}
5. for range channel 持续接收元素
for value := range ch {
fmt.Println(value)
}
若 ch 为 nil channel 会阻塞
若 ch 没有已发送元素会阻塞
func ChannelFor() {
//一: 初始化部分数据
ch := make(chan int)
wg := sync.WaitGroup{}
//二:持续发送
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < 5; i++ {
//随机发送数据
ch <- rand.Intn(10)
}
//关闭
close(ch)
}()
//三:持续接收
wg.Add(1)
go func() {
defer wg.Done()
for e := range ch {
fmt.Println("接收到的数据:", e)
}
}()
wg.Wait()
}
这个测试案例中 , 如果没有关闭 ch , 那么接收函数就会一直被阻塞
fatal error: all goroutines are asleep - deadlock!
三: 缓冲与无缓冲 channel
1. 无缓冲 channel
无缓冲 channel , 也叫同步 channel
只有当发送方和接收方都准备就绪时候 , 通信才会就绪
// 通过比对时间来判断接受和发送的时间是否是相互的
func NoBufferChannel() {
//一: 创建无缓冲channel
ch := make(chan int)
wg := sync.WaitGroup{}
//二: 间隔发送
wg.Add(1)
go func() {
wg.Done()
for i := 0; i < 5; i++ {
ch <- i
// 格式化时间输出
fmt.Println("发送数据: ", i, " 时间:", time.Now().Format("15:04:05.999999"))
//间隔时间
time.Sleep(1 * time.Second)
}
//关闭 channel
close(ch)
}()
//三: 间隔接收
wg.Add(1)
go func() {
defer wg.Done()
for i := range ch {
fmt.Println("接收数据: ", i, " 时间:", time.Now().Format("15:04:05.999999"))
// 间隔时间
time.Sleep(3 * time.Second)
}
}()
wg.Wait()
}
用同步channel,使用两个goroutine完成发送和接收。每次发送和接收的时间间隔不同。我们分别打印发送和接收的值和时间。注意结果:
发送和接收时间一致
间隔以长的为准,可见发送和接收操作为同步操作
2. 缓冲 channel
存在缓冲的 channel非常适合做 goroutine 的数据通信
缓冲 Channel 也称为异步 Channel , 接收和发送方不用等待双方就绪即可成功
接收时,从缓冲中接收元素,只要缓冲不为空,不会阻塞。反之,缓冲为空,会阻塞,goroutine 挂起
发送时,向缓冲中发送元素,只要缓冲未满,不会阻塞。反之,缓冲满了,会阻塞,goroutine 挂起
应用级的缓冲往往会淘汰已存在的缓存项 , 未新缓存腾空间 , 但是 channel 的缓存项 , 必须要被接受 , 不会淘汰旧缓存项
func BufferChannel() {
//一: 创建缓冲channel
ch := make(chan int, 5)
wg := sync.WaitGroup{}
//二: 间隔发送
wg.Add(1)
go func() {
wg.Done()
for i := 0; i < 5; i++ {
ch <- i
// 格式化时间输出
fmt.Println("发送数据: ", i, " 时间:", time.Now().Format("15:04:05.999999"))
//间隔时间
time.Sleep(1 * time.Second)
}
//关闭 channel
close(ch)
}()
//三: 间隔接收
wg.Add(1)
go func() {
defer wg.Done()
for i := range ch {
fmt.Println("接收数据: ", i, " 时间:", time.Now().Format("15:04:05.999999"))
// 间隔时间
time.Sleep(3 * time.Second)
}
}()
wg.Wait()
}
可以看出发送操作和接收操作为异步操作
3. len() 和 cap()
置函数len()
和cap()
可以分别获取:
len()
长度,缓冲中元素个数。cap()
容量,缓冲的总大小。cap()
返回0,意味着是无缓冲通道 ( 可以作为判断是否为无缓冲channel 的依据 )
4. 使用 channel 控制并发数量
send 操作会在缓冲无空间时阻塞
func ChannelGroutineNumCtl() {
// 1.独立的 goroutine 输出 goroutine数量
go func() {
for {
fmt.Println("NumGroutine", runtime.NumGoroutine())
//500毫秒计数一次
time.Sleep(500 * time.Millisecond)
}
}()
// 2.初始化channel,设置缓存区大小(并发规模)
const size = 1024
ch := make(chan struct{}, size)
// 3.并发的goroutine
for {
//一: 启动goroutine前,执行 ch send
// 当ch的缓冲已满时,阻塞
ch <- struct{}{}
go func() {
time.Sleep(10 * time.Second)
//二: goroutine结束时,接收一个ch中的元素
<-ch
}()
}
}
四: 单向 channel
单向Channel,指的是仅支持接收或仅支持发送操作的Channel。语法上:
chan<- T
仅发送Channel<-chan T
仅接收Channel
仅仅使用单向 channel 时没有意义的 , 单向 channel 最典型的使用方式是 : 使用单向 channel 约束双向 channel 的操作
语法上来说 , 就是会将双向 channel 转换为单向 channel 来使用
func ChannelDirection() {
// 一: 初始化数据
ch := make(chan int)
wg := &sync.WaitGroup{}
wg.Add(2)
// 使用双向channel为单向channel赋值
go getElement(ch, wg)
go setElement(ch, 45, wg)
wg.Wait()
}
// 仅接收的channel
func getElement(ch <-chan int, wg *sync.WaitGroup) {
defer wg.Done()
fmt.Println("接收的数据为:", <-ch)
}
// 仅发送的channel
func setElement(ch chan<- int, value int, wg *sync.WaitGroup) {
defer wg.Done()
ch <- value
fmt.Println("发送的数据为:", value)
}
函数 getElement 和 setElement,分别使用了单向的接收和发送 channel,在语义上表示只能接收和只能发送操作,同时程序上限定了操作
典型的单向 channel 标准库例子
// signal.Notify() 信号量 , 只能发送数据
func Notify(c chan<- os.Signal, sig ...os.Signal)
// time.After 定时器 , 只能接收数据
func After(d Duration) <-chan Time
五: channel 结构
1. channel 定义结构
Channel的结构定义为 runtime.hchan
:
// GOROOT/src/runtime/chan.go
type hchan struct {
qcount uint // 元素个数。len()
dataqsiz uint // 缓冲队列的长度。cap()
buf unsafe.Pointer // 缓冲队列指针,无缓冲队列为nil
elemsize uint16 // 元素大小
closed uint32
elemtype *_type // 元素类型
sendx uint // send index
recvx uint // receive index
recvq waitq // list of recv waiters
sendq waitq // list of send waiters
// lock protects all fields in hchan, as well as several
// fields in sudogs blocked on this channel.
//
// Do not change another G's status while holding this lock
// (in particular, do not ready a G), as this can deadlock
// with stack shrinking.
lock mutex
}
2. buffer 缓存数组
缓冲为数组结构,channel记录发送和接收元素的索引:
sendx uint // 发送索引
recvx uint // 接收索引
3. channel 与 goroutine 的关系
recvq 和 sendq 按顺序记录当前有哪些 goroutine 在操作 channel
* sudog
: 可以理解为要去操作当前 channel 的 goroutine
接收的 goroutine 如果被阻塞 , 就会记录在 recvq
发送的 goroutine 如果被阻塞 , 就会记录在 sendq
recvq waitq // 等待接收goroutine队列
sendq waitq // 等待发送goroutine队列
4. 整个 channel 流程
初始化 channel 流程
make()
初始化 channe l时,会根据是否存在缓冲,选择:
存在缓冲,为 channel 和 buffer 分别分配内存,同时 channel.buf 指向 buffer 地址
不存在缓冲,仅为 channel 分配内存,channel.buf 为 nil。
初始化 channel 中其他属性
向 channel 发送流程
语句 ch <- element 向 channel 发送元素时,大体的执行流程如下:
直接发送:当channel存在等待接受者时,channel.recvq,直接将元素拷贝给等待接受者,并唤醒等待接受者 goroutine 将其放在 M 的 runnext 位置,下次调度立即执行
直接写缓冲区 : 当缓冲区存在空间时,将发送元素直接写入缓冲区,调整 channel.sendx ( 指向发送索引 ) 的位置
阻塞发送 : 当缓冲区已满或无缓冲区时,发送 goroutine 进入 channel.sendq 队列,转为阻塞状态,等待其他 goroutine 从 channel 中接收元素,进而唤醒发送goroutine
从 channel 接收流程
操作符 <- ch 从 channel 中接收元素,大体流程如下:
当存在等待发送者时,channel.sendq
若无缓冲区,直接将元素从发送者拷贝到接受者,并唤醒发送者 gorutine,进入 runnext 下次调度执行
若存在缓冲区,此时缓冲区是满的,从缓冲区获取元素,并将等待发送者发送元素拷贝到缓冲区,唤醒发送者 goroutine。调整 channel 的 recvx 和 sendx 索引位置
当缓冲区有元素时(无等待发送者),直接从缓冲区读取元素
如果缓冲区不存在或缓冲区没有元素时,接收者 goroutine 进入阻塞状态,进入 channel.recvq 接受者队列,等待发送者发送数据唤醒。
关闭 channel 流程
close(ch) 关闭 channel,主要工作是:
取消 channel 关联的 sendq 和 recvq 队列 ( 唤醒两个goroutine 队列 )
调度阻塞在 sendq 和 recvq 中的 goroutine
六: 使用无缓冲channel 作为同步信号
原理 :
无缓冲 channel 是同步的
close 的 channel 是可以接收内容的 ( 如果channle中有数据接收到对应数据 , 没有数据,接收到零值 )
func ChannelClose() {
// 定义一个无缓冲的channel
ch := make(chan struct{})
wg := sync.WaitGroup{}
// 开启一个goroutine,close channel,表示发送信号
wg.Add(1)
go func() {
defer wg.Done()
time.Sleep(2 * time.Second)
fmt.Println("信号已发出,close(ch)")
close(ch)
}()
// 再开启一个goroutine,接收信号
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case <-ch:
// 假设这里的业务是收到信号之后退出
fmt.Println("收到信号,程序终止")
return
default:
}
// 没有收到信号继续进行业务逻辑
fmt.Println("处理业务逻辑...")
time.Sleep(300 * time.Millisecond)
}
}()
wg.Wait()
}
七: 监控系统信号signal.Notify()
要求主 goroutine 等待上面的 goroutine 的三种方案
wg.wait()
需要 goroutine 可以正常的结束time.sleep()
sleeep 的时间要控制合适select{}
持久阻塞
func SelectSignal() {
// 模拟一段长时间运行的goroutine
go func() {
for {
// 定义当前时间,并且每隔一输出
fmt.Println(time.Now().Format("15:04:05:.000"))
time.Sleep(300 * time.Millisecond)
}
}()
// 1.创建channle,传递信号
OsChan := make(chan os.Signal, 1)
// 2.设置该channel可以监控哪些信号
signal.Notify(OsChan, os.Interrupt, os.Kill) //监控中断和kill信号
//signal.Notify(OsChan) // 监控全部类型的系统信号
// 3.监控channel信号
select {
case <-OsChan:
fmt.Println("监控到了中断信号:程序退出")
}
}
评论区