目 录CONTENT

文章目录

Goalng并发编程-select语句

Sakura
2023-08-27 / 0 评论 / 0 点赞 / 24 阅读 / 9544 字 / 正在检测是否收录...

一: 基本介绍

select 语句能够从多个可读或者可写的Channel中选择一个继续执行 ,若没有Channel发生读写操作,select 会一直阻塞当前Goroutine。

select 中 , 除了 default 之外 , 其他操作都必须要对 channel 进行操作

func SelectTest() {
	//1.声明需要的变量
	var array [3]int
	var c1, c2, c3, c4 = make(chan int), make(chan int), make(chan int), make(chan int)
	var i1, i2 int

	//2.用于select的goroutine
	go func() {
		c1 <- 10
	}()
	go func() {
		<-c2
	}()
	go func() {
		close(c3)
	}()
	go func() {
		c4 <- 40
	}()

	//3.多路监听的select
	go func() {
		select {
		//监听是否可以从c1中获取值
		case i1 = <-c1:
			fmt.Println("接收到了C1的值:", i1)
		//监听对c2的写操作
		case c2 <- i2:
			fmt.Println("向c2写入了:", i2)
		//c3是否被关闭
		case i3, ok := <-c3:
			if ok {
				fmt.Println("从接收到了:", i3)
			} else {
				fmt.Println("c3已被关闭")
			}
		//测试左值表达式的执行实际
		case array[f()] = <-c4:
			fmt.Println("从c4收到", array[f()])
		//默认
		default:
			fmt.Println("没有channel被操作")
		}
	}()

	//简单阻塞
	time.Sleep(500 * time.Millisecond)
}

func f() int {
	fmt.Println("f()执行")
	return 2
}

二: 执行流程

第一步: 收集 case 涉及的 channel

  1. 计算操作数 , 计算结果是一组从中接收或者发送到的 channel , 以及要发送的相应值 , 计算内容如下

  2. receive 操作的 channel 操作数 case value := <- channel

  3. send语句的 channel 和右表达式 case channel <- func()

  4. Receive 的左侧带有短变量声明或赋值的表达式不会被计算 case func() <- channel

第二步: 判断是否存在可操作的 channel

  1. 如果存在 , 随机选择可执行 channel 的 case , (在第一步的时候做了一个伪随机数的随机排序)

  2. 如果不存在可操作的 case , 判断是否有可操作的 default , 并执行

第三步: 如果没有可操作的 case , 并且也没有可执行的 default case , 那么就会被阻塞

三: for + select

select匹配到可操作的case或者是defaultcase后,就执行完毕了。实操时,我们通常需要持续监听某些channel的操作,因此典型的select使用会配合for完成

func ForSelect() {
	// 定义channel
	ch := make(chan int)

	// 向 channel 发送数据
	go func() {
		//实际工作中,ch可能来自缓存,网络,数据库
		for {
			ch <- rand.Intn(1000)
			time.Sleep(200 * time.Millisecond)
		}
	}()

	// 持续监听 channel
	go func() {
		for {
			select {
			case value := <-ch:
				fmt.Println("接收到了来之channel的数据:", value)
			}
		}
	}()

	time.Sleep(3000 * time.Millisecond)
}

四: 阻塞 select

1. 空 select 阻塞

func BlockChannel() {
	// 空select阻塞
	fmt.Println("before select")
	select {}
	fmt.Println("after select")

}

2. nil channel 阻塞

nil channel 不能读写

func BlockChannel() {

	//nil channel
	var channel chan int //nil channel 不能读写
	go func() {
		channel <- rand.Intn(1000)
	}()

	fmt.Println("before select")
	select {
	case <- channel:
	case channel <- 100:
	}
	fmt.Println("after select")
}

五: nil channel 的 case

nil channel 不能读写,因此通过将channel设置为nil,可以控制某个case不再被执行。

例如,3秒后,不再接受ch的数据:

func NilChannel() {

	// 一:初始化channel
	channel := make(chan int)

	// 二:操作 channel 的 goroutine
	go func() {
		//向 channel 中写入随机数
		for {
			channel <- rand.Intn(1000)
			time.Sleep(400 * time.Millisecond)
		}
	}()

	// 三:select 处理内部的 goroutine
	go func() {
		// 设置定时器
		after := time.After(3 * time.Second)
		sum := 0
		// 持续监听 channel
		for {
			select {
			case value := <-channel:
				fmt.Println("接收到channel的数据:", value)
				sum++
			// 定时器时间到了之后,就可以从里面读出内容
			case <-after:
				// 将channel设置为nil channel
				channel = nil
				fmt.Println("channel已设置为nil,sum为:", sum)
			}
		}
	}()
	// 让主 goroutine 阻塞
	time.Sleep(5 * time.Second)
}

六: 带有 default 的 select

当 select 语句存在 default case 时:

  • 若没有可操作的 channel,会执行 default case

  • 若有可操作的 channel,会执行对应的 case

这样 select 语句不会进入 block 状态,称之为非阻塞 ( non-block ) 的收发 ( channel ) 的接收和发送

示例:多人猜数字游戏,是否有人猜中数字:

func SelectNonBlock() {
	// 初始化数据
	counter := 10 // 参与人数
	max := 20     // [0, 19] // 最大范围
	rand.Seed(time.Now().UnixMilli())
	answer := rand.Intn(max) // 随机答案
	println("The answer is ", answer)
	println("------------------------------")

	// 正确答案channel
	bingoCh := make(chan int, counter)
	// wg
	wg := sync.WaitGroup{}
	wg.Add(counter)
	for i := 0; i < counter; i++ {
		// 每个goroutine代表一个猜数字的人
		go func() {
			defer wg.Done()
			result := rand.Intn(max)
			println("someone guess ", result)
			// 答案争取,写入channel
			if result == answer {
				bingoCh <- result
			}
		}()
	}
	wg.Wait()

	println("------------------------------")
	// 是否有人发送了正确结果
	// 可以是0或多个人
	// 核心问题是是否有人猜中,而不是几个人
	select {
	case result := <-bingoCh:
		println("some one hint the answer ", result)
	default:
		println("no one hint the answer")
	}
}

特别的情况是存在两个 case,其中一个是 default ,另一个是 channel case,那么 go 的优化器会优化内部这个 select 。内部会以 if 结构完成处理。因为这种情况,不用考虑随机性的问题

select {
    case result := <-bingoCh:
    println("some one hint the answer ", result)
    default:
    // 非阻塞的保证,存在default case
    println("no one hint the answer")
}

// 优化伪代码
if selectnbrecv(bingoCh) {
    println("some one hint the answer ", result)
} else {
    println("no one hint the answer")
}

七: 并发执行模式

1. Race 模式

Race模式,典型的并发执行模式之一,多路同时操作资源,哪路先操作成功,优先使用,同时放弃其他路的等待。简而言之,从多个操作中选择一个最快的。核心工作:

  • 选择最快的

  • 停止其他未完成的

func RaceTest() {
	// 初始化数据
	type Rows struct {
		// 数据字段

		// 索引标识
		Index int
	}

	// 定义查询器数量
	const QueryNum = 8

	// 数据通信channel 和 停止信号channel
	ResultChannel := make(chan Rows, 1)
	StopChannel := [QueryNum]chan struct{}{}
	for key := range StopChannel {
		StopChannel[key] = make(chan struct{})
	}

	//wg,rand
	wg := sync.WaitGroup{}
	//rand1 := rand.Intn(100)

	// 模拟查询 , 等待第一个结果反馈
	// 基于定义的查询器的数量,开启goroutine
	wg.Add(QueryNum)
	for i := 0; i < QueryNum; i++ {
		// 开启一个查询器
		go func(i int) {
			defer wg.Done()
			// 模拟执行时间
			RandTime := rand.Intn(1000)
			fmt.Println("查询器", i, "获取数据,需要 ", RandTime, " ms 执行")

			// 每个goroutine的结果channel
			chrst := make(chan Rows, 1)

			// 模拟执行查询
			go func() {
				//模拟时长
				time.Sleep(time.Duration(RandTime) * time.Millisecond)
				chrst <- Rows{
					Index: i,
				}
			}()

			// 监听查询结果和停止信号channel
			select {
			case result := <-chrst:
				fmt.Println("查询器 ", i, " 获得数据")
				// 保证没有其他结果写入
				if len(ResultChannel) == 0 {
					ResultChannel <- result
				}

			//停止信号
			case <-StopChannel[i]:
				fmt.Println("查询器 ", i, " 停止")
				return
			}
		}(i)
	}

	// 三:等待第一个查询结果的反馈
	wg.Add(1)
	go func() {
		defer wg.Done()
		//等待ch中传递的结果
		select {
		case value := <-ResultChannel:
			fmt.Println("得到了结果", value.Index, "停止其他查询")
			// 循环遍历,通知其他查询器结束
			for key := range StopChannel {
				if key == value.Index {
					continue
				}
				StopChannel[key] <- struct{}{}
			}
		//计划一个超时时间
		case <-time.After(5 * time.Second):
			fmt.Println("所有查询器停止")
			for key := range StopChannel {
				StopChannel[key] <- struct{}{}
			}
		}
	}()

	wg.Wait()
}

2. All 模式

0

评论区