目 录CONTENT

文章目录
Go

Go并发编程-Channel通信

Sakura
2023-08-19 / 1 评论 / 0 点赞 / 103 阅读 / 0 字 / 正在检测是否收录...
温馨提示:
本文最后更新于408天前,若内容或图片失效,请留言反馈。 部分素材来自网络,若不小心影响到您的利益,请联系我们删除。

一: Channel 通信

不要通过共享内存的方式进行通信,而是应该通过通信的方式共享内存

这是Go语言最核心的设计模式之一。

在很多主流的编程语言中,多个线程传递数据的方式一般都是共享内存,而 Go 语言中多 Goroutine 通信的主要方案是 Channel。Go 语言也可以使用共享内存的方式支持 Goroutine 通信。

Go 语言实现了 CSP 通信模式,CSP 是 Communicating Sequential Processes 的缩写,通信顺序进程。Goroutine 和 Channel 分别对应 CSP 中的实体和传递信息的媒介。CSP 是 Tony Hoare 于1977年提出。

Channel 提供可接收和发送特定类型值的用于并发函数 ( Goroutine ) 通信的数据类型,是满足 FIFO(先进先出)原则的队列类型,先进先出不仅体现在数据类型上,也体现在操作上:

  • channel 类型的元素是先进先出的,先发送到 chann el 的 value 会先被 receive

  • 先向 Channel 发送数据的 Goroutine 会先执行

  • 先从 Channel 接收数据的 Goroutine 会先执行

二: Channel 的操作语法

  • 一个关键字

    • chan

    • chan <-

    • <- chan

  • 两个函数

    • make

    • close

  • 一个语句

    • 发送语句 ch <- expresssion

  • 一个操作符

    • 接收操作符 <- ch

1. Channel 类型

// chan 双向,可收可发
// chan <- 仅发送
// <- chan 仅接收
ChannelType = ( "chan" | "chan" "<-" | "<-" "chan" ) ElementType

任何类型都可以作为 Channel 的元素类型

chan int
chan struct{}
chan <- int
<- chan int

2. 初始化 Channel 值

make(ChannelType, Capacity)
  • ChannelType 是 channel 类型

  • Capacity 是缓冲容量 , 可以省略或为0 , 表示无缓冲 Channel

ch := make(chan int)
var ch = make(chan int)
ch := make(chan int, 10)
ch := make(<-chan int)
ch := make(chan<- int, 10)

未使用 make() 初始化的 channel 为nil , nil channel 不能执行收发通信操作 , 例如:

var ch chan int

3. Send 语句和接收操作符

  • send 语句用于像 Chennel 发送值

  • 接收操作符用于从 Channel 中接收值

// 发送操作
ch <- Expression
ch <- 100
ch <- f()

// 接收操作
<- ch
v1 := <-ch // 声明
v = <-ch // 赋值
f(<-ch) // 函数调用
<-strobe // 等待接收

4. 关闭 Channel

// close()用于关闭channel
close()

关闭 Channel 的意思是记录该 Channel 不能再被发送任何元素了

  • 向已关闭的 Channel 发送回引发 runtime panic

  • 关闭 nil Channel 会引发 runtime panic

  • 不能关闭仅接收 Channel

  • 不能关闭已经关闭的 Channel,否则会引发 runtime panic

当从已关闭的 channel 接收值时候:

  • 可以接收关闭前发送的全部值

  • 若没有已发送的值会返回类型的零值 , 不会被阻塞

使用接收操作符的多值返回结构,可以判断Channel是否已经关闭:

var x, ok = <-ch
x, ok := <-ch
  • ok 为 true,channel 未关闭

  • ok 为 false,channel 已关闭

func ChannelOperate() {
	// 初始化
	ch := make(chan int) //无缓冲channel

	//发送值
	go func() {
		ch <- 100
	}()

	//接收值
	go func() {
		value := <-ch
		fmt.Println("receive from ch : ", value)
	}()
	time.Sleep(time.Second)

	//关闭ch
	close(ch)
}

5. for range channel 持续接收元素

for value := range ch {
    fmt.Println(value)
}
  • 若 ch 为 nil channel 会阻塞

  • 若 ch 没有已发送元素会阻塞

func ChannelFor() {
	//一: 初始化部分数据
	ch := make(chan int)
	wg := sync.WaitGroup{}

	//二:持续发送
	wg.Add(1)
	go func() {
		defer wg.Done()
		for i := 0; i < 5; i++ {
			//随机发送数据
			ch <- rand.Intn(10)
		}
		//关闭
		close(ch)
	}()

	//三:持续接收
	wg.Add(1)
	go func() {
		defer wg.Done()
		for e := range ch {
			fmt.Println("接收到的数据:", e)
		}
	}()

	wg.Wait()
}

这个测试案例中 , 如果没有关闭 ch , 那么接收函数就会一直被阻塞

fatal error: all goroutines are asleep - deadlock!

三: 缓冲与无缓冲 channel

1. 无缓冲 channel

无缓冲 channel , 也叫同步 channel

只有当发送方和接收方都准备就绪时候 , 通信才会就绪

// 通过比对时间来判断接受和发送的时间是否是相互的
func NoBufferChannel() {
	//一: 创建无缓冲channel
	ch := make(chan int)
	wg := sync.WaitGroup{}

	//二: 间隔发送
	wg.Add(1)
	go func() {
		wg.Done()
		for i := 0; i < 5; i++ {
			ch <- i
			// 格式化时间输出
			fmt.Println("发送数据: ", i, " 时间:", time.Now().Format("15:04:05.999999"))
			//间隔时间
			time.Sleep(1 * time.Second)
		}
		//关闭 channel
		close(ch)
	}()

	//三: 间隔接收
	wg.Add(1)
	go func() {
		defer wg.Done()
		for i := range ch {
			fmt.Println("接收数据: ", i, " 时间:", time.Now().Format("15:04:05.999999"))

			// 间隔时间
			time.Sleep(3 * time.Second)
		}
	}()

	wg.Wait()
}

用同步channel,使用两个goroutine完成发送和接收。每次发送和接收的时间间隔不同。我们分别打印发送和接收的值和时间。注意结果:

  • 发送和接收时间一致

  • 间隔以长的为准,可见发送和接收操作为同步操作

2. 缓冲 channel

存在缓冲的 channel非常适合做 goroutine 的数据通信

缓冲 Channel 也称为异步 Channel , 接收和发送方不用等待双方就绪即可成功

  • 接收时,从缓冲中接收元素,只要缓冲不为空,不会阻塞。反之,缓冲为空,会阻塞,goroutine 挂起

  • 发送时,向缓冲中发送元素,只要缓冲未满,不会阻塞。反之,缓冲满了,会阻塞,goroutine 挂起

应用级的缓冲往往会淘汰已存在的缓存项 , 未新缓存腾空间 , 但是 channel 的缓存项 , 必须要被接受 , 不会淘汰旧缓存项

func BufferChannel() {
	//一: 创建缓冲channel
	ch := make(chan int, 5)
	wg := sync.WaitGroup{}

	//二: 间隔发送
	wg.Add(1)
	go func() {
		wg.Done()
		for i := 0; i < 5; i++ {
			ch <- i
			// 格式化时间输出
			fmt.Println("发送数据: ", i, " 时间:", time.Now().Format("15:04:05.999999"))
			//间隔时间
			time.Sleep(1 * time.Second)
		}
		//关闭 channel
		close(ch)
	}()

	//三: 间隔接收
	wg.Add(1)
	go func() {
		defer wg.Done()
		for i := range ch {
			fmt.Println("接收数据: ", i, " 时间:", time.Now().Format("15:04:05.999999"))

			// 间隔时间
			time.Sleep(3 * time.Second)
		}
	}()

	wg.Wait()
}

可以看出发送操作和接收操作为异步操作

3. len() 和 cap()

置函数len()cap()可以分别获取:

  • len()长度,缓冲中元素个数。

  • cap()容量,缓冲的总大小cap()返回0,意味着是无缓冲通道 ( 可以作为判断是否为无缓冲channel 的依据 )

4. 使用 channel 控制并发数量

  • send 操作会在缓冲无空间时阻塞

func ChannelGroutineNumCtl() {
	// 1.独立的 goroutine 输出 goroutine数量
	go func() {
		for {
			fmt.Println("NumGroutine", runtime.NumGoroutine())
            //500毫秒计数一次
			time.Sleep(500 * time.Millisecond)
		}
	}()

	// 2.初始化channel,设置缓存区大小(并发规模)
	const size = 1024
	ch := make(chan struct{}, size)

	// 3.并发的goroutine
	for {
		//一: 启动goroutine前,执行 ch send
		// 当ch的缓冲已满时,阻塞
		ch <- struct{}{}
		go func() {
			time.Sleep(10 * time.Second)
			//二: goroutine结束时,接收一个ch中的元素
			<-ch
		}()
	}
}

四: 单向 channel

单向Channel,指的是仅支持接收或仅支持发送操作的Channel。语法上:

  • chan<- T 仅发送Channel

  • <-chan T 仅接收Channel

仅仅使用单向 channel 时没有意义的 , 单向 channel 最典型的使用方式是 : 使用单向 channel 约束双向 channel 的操作

  • 语法上来说 , 就是会将双向 channel 转换为单向 channel 来使用

func ChannelDirection() {
	// 一: 初始化数据
	ch := make(chan int)
	wg := &sync.WaitGroup{}

	wg.Add(2)
	// 使用双向channel为单向channel赋值
	go getElement(ch, wg)
	go setElement(ch, 45, wg)

	wg.Wait()

}

// 仅接收的channel
func getElement(ch <-chan int, wg *sync.WaitGroup) {
	defer wg.Done()

	fmt.Println("接收的数据为:", <-ch)
}

// 仅发送的channel
func setElement(ch chan<- int, value int, wg *sync.WaitGroup) {
	defer wg.Done()
	ch <- value

	fmt.Println("发送的数据为:", value)
}

函数 getElement 和 setElement,分别使用了单向的接收和发送 channel,在语义上表示只能接收和只能发送操作,同时程序上限定了操作

  • 典型的单向 channel 标准库例子

// signal.Notify() 信号量 , 只能发送数据
func Notify(c chan<- os.Signal, sig ...os.Signal)

// time.After 定时器 , 只能接收数据
func After(d Duration) <-chan Time

五: channel 结构

1. channel 定义结构

Channel的结构定义为 runtime.hchan

// GOROOT/src/runtime/chan.go
type hchan struct {
    qcount   uint           // 元素个数。len()
    dataqsiz uint           // 缓冲队列的长度。cap()
    buf      unsafe.Pointer // 缓冲队列指针,无缓冲队列为nil
    elemsize uint16 // 元素大小
    closed   uint32
    elemtype *_type // 元素类型
    sendx    uint   // send index
    recvx    uint   // receive index
    recvq    waitq  // list of recv waiters
    sendq    waitq  // list of send waiters

    // lock protects all fields in hchan, as well as several
    // fields in sudogs blocked on this channel.
    //
    // Do not change another G's status while holding this lock
    // (in particular, do not ready a G), as this can deadlock
    // with stack shrinking.
    lock mutex
}

2. buffer 缓存数组

缓冲为数组结构,channel记录发送和接收元素的索引:

 sendx    uint   // 发送索引
 recvx    uint   // 接收索引

3. channel 与 goroutine 的关系

recvq 和 sendq 按顺序记录当前有哪些 goroutine 在操作 channel

* sudog : 可以理解为要去操作当前 channel 的 goroutine

接收的 goroutine 如果被阻塞 , 就会记录在 recvq

发送的 goroutine 如果被阻塞 , 就会记录在 sendq

recvq    waitq  // 等待接收goroutine队列
sendq    waitq  // 等待发送goroutine队列

4. 整个 channel 流程

  1. 初始化 channel 流程

make()初始化 channe l时,会根据是否存在缓冲,选择:

  • 存在缓冲,为 channel 和 buffer 分别分配内存,同时 channel.buf 指向 buffer 地址

  • 不存在缓冲,仅为 channel 分配内存,channel.buf 为 nil。

  • 初始化 channel 中其他属性

  1. 向 channel 发送流程

语句 ch <- element 向 channel 发送元素时,大体的执行流程如下:

  • 直接发送:当channel存在等待接受者时,channel.recvq,直接将元素拷贝给等待接受者,并唤醒等待接受者 goroutine 将其放在 M 的 runnext 位置,下次调度立即执行

  • 直接写缓冲区 : 当缓冲区存在空间时,将发送元素直接写入缓冲区,调整 channel.sendx ( 指向发送索引 ) 的位置

  • 阻塞发送 : 当缓冲区已满或无缓冲区时,发送 goroutine 进入 channel.sendq 队列,转为阻塞状态,等待其他 goroutine 从 channel 中接收元素,进而唤醒发送goroutine

  1. 从 channel 接收流程

操作符 <- ch 从 channel 中接收元素,大体流程如下:

  • 当存在等待发送者时,channel.sendq

    • 若无缓冲区,直接将元素从发送者拷贝到接受者,并唤醒发送者 gorutine,进入 runnext 下次调度执行

    • 若存在缓冲区,此时缓冲区是满的,从缓冲区获取元素,并将等待发送者发送元素拷贝到缓冲区,唤醒发送者 goroutine。调整 channel 的 recvx 和 sendx 索引位置

  • 当缓冲区有元素时(无等待发送者),直接从缓冲区读取元素

  • 如果缓冲区不存在或缓冲区没有元素时,接收者 goroutine 进入阻塞状态,进入 channel.recvq 接受者队列,等待发送者发送数据唤醒。

  1. 关闭 channel 流程

close(ch) 关闭 channel,主要工作是:

  • 取消 channel 关联的 sendq 和 recvq 队列 ( 唤醒两个goroutine 队列 )

  • 调度阻塞在 sendq 和 recvq 中的 goroutine

六: 使用无缓冲channel 作为同步信号

原理 :

  1. 无缓冲 channel 是同步的

  2. close 的 channel 是可以接收内容的 ( 如果channle中有数据接收到对应数据 , 没有数据,接收到零值 )

func ChannelClose() {
	// 定义一个无缓冲的channel
	ch := make(chan struct{})
	wg := sync.WaitGroup{}

	// 开启一个goroutine,close channel,表示发送信号
	wg.Add(1)
	go func() {
		defer wg.Done()
		time.Sleep(2 * time.Second)
		fmt.Println("信号已发出,close(ch)")
		close(ch)
	}()

	// 再开启一个goroutine,接收信号
	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			select {
			case <-ch:
				// 假设这里的业务是收到信号之后退出
				fmt.Println("收到信号,程序终止")
				return
			default:

			}
			// 没有收到信号继续进行业务逻辑
			fmt.Println("处理业务逻辑...")
			time.Sleep(300 * time.Millisecond)
		}
	}()
	wg.Wait()
}

七: 监控系统信号signal.Notify()

要求主 goroutine 等待上面的 goroutine 的三种方案

  • wg.wait() 需要 goroutine 可以正常的结束

  • time.sleep() sleeep 的时间要控制合适

  • select{} 持久阻塞

func SelectSignal() {
	// 模拟一段长时间运行的goroutine
	go func() {
		for {
			// 定义当前时间,并且每隔一输出
			fmt.Println(time.Now().Format("15:04:05:.000"))
			time.Sleep(300 * time.Millisecond)
		}
	}()

	// 1.创建channle,传递信号
	OsChan := make(chan os.Signal, 1)

	// 2.设置该channel可以监控哪些信号
	signal.Notify(OsChan, os.Interrupt, os.Kill) //监控中断和kill信号
	//signal.Notify(OsChan) // 监控全部类型的系统信号
	
	// 3.监控channel信号
	select {
	case <-OsChan:
		fmt.Println("监控到了中断信号:程序退出")
	}
}

0

评论区