使用场景:
我需要完成一项任务,但是这项任务需要满足一定条件才可以执行,否则我就等着。
那我可以怎么获取这个条件呢?一种是循环去获取,一种是条件满足的时候通知我就可以了。显然第二种效率高很多。
通知的方式的话,golang里面通知可以用channel的方式

1
2
3
4
5
6
7
	var mail = make(chan string)
    go func() {
        <- mail
        fmt.Println("get chance to do something")
    }()
    time.Sleep(5*time.Second)
    mail <- "moximoxi"

但是channel的方式还是比较适合一对一,一对多并不是很适合。下面就来介绍一下另一种方式:sync.Cond sync.Cond就是用于实现条件变量的,是基于sync.Mutex的基础上,增加了一个通知队列,通知的线程会从通知队列中唤醒一个或多个被通知的线程。
主要有以下几个方法:

1
2
3
4
	sync.NewCond(&mutex)生成一个cond需要传入一个mutex因为阻塞等待通知的操作以及通知解除阻塞的操作就是基于sync.Mutex来实现的
	sync.Wait()用于等待通知
	sync.Signal()用于发送单个通知
	sync.Broadcat()用于广播

先找到一个sync.Cond的基本用法

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package main

import (
	"fmt"
	"sync"
	"time"
)

var locker sync.Mutex
var cond = sync.NewCond(&locker)

// NewCond(l Locker)里面定义的是一个接口,拥有lock和unlock方法。
// 看到sync.Mutex的方法,func (m *Mutex) Lock(),可以看到是指针有这两个方法,所以应该传递的是指针
func main() {
	for i := 0; i < 10; i++ {
		go func(x int) {
			cond.L.Lock()         // 获取锁
			defer cond.L.Unlock() // 释放锁
			cond.Wait()           // 等待通知,阻塞当前 goroutine
			// 通知到来的时候, cond.Wait()就会结束阻塞, do something. 这里仅打印
			fmt.Println(x)
		}(i)
	}
	time.Sleep(time.Second * 1) // 睡眠 1 秒,等待所有 goroutine 进入 Wait 阻塞状态
	fmt.Println("Signal...")
	cond.Signal() // 1 秒后下发一个通知给已经获取锁的 goroutine
	time.Sleep(time.Second * 1)
	fmt.Println("Signal...")
	cond.Signal() // 1 秒后下发下一个通知给已经获取锁的 goroutine
	time.Sleep(time.Second * 1)
	cond.Broadcast() // 1 秒后下发广播给所有等待的goroutine
	fmt.Println("Broadcast...")
	time.Sleep(time.Second * 5) // 睡眠 1 秒,等待所有 goroutine 执行完毕

}

上述代码实现了主线程对多个goroutine的通知的功能。
抛出一个问题:
主线程执行的时候,如果并不想触发所有的协程,想让不同的协程可以有自己的触发条件,应该怎么使用?
下面就是一个具体的需求:
有四个worker和一个master,worker等待master去分配指令,master一直在计数,计数到5的时候通知第一个worker,计数到10的时候通知第二个和第三个worker。
首先列出几种解决方式
1、所有worker循环去查看master的计数值,计数值满足自己条件的时候,触发操作 »»»»>弊端:无谓的消耗资源
2、用channel来实现,几个worker几个channel,eg:worker1的协程里<-channel(worker1)进行阻塞,计数值到5的时候,给worker1的channel放入值,
阻塞解除,worker1开始工作。 »»»>弊端:channel还是比较适用于一对一的场景,一对多的时候,需要起很多的channel,不是很美观
3、用条件变量sync.Cond,针对多个worker的话,用broadcast,就会通知到所有的worker。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {

	mutex := sync.Mutex{}
	var cond = sync.NewCond(&mutex)
	mail := 1
	go func() {
		for count := 0; count <= 15; count++ {
			time.Sleep(time.Second)
			mail = count
			cond.Broadcast()
		}
	}()
	// worker1
	go func() {
		for mail != 5 { // 触发的条件,如果不等于5,就会进入cond.Wait()等待,此时cond.Broadcast()通知进来的时候,wait阻塞解除,进入下一个循环,此时发现mail != 5,跳出循环,开始工作。
			cond.L.Lock()
			cond.Wait()
			cond.L.Unlock()
		}
		fmt.Println("worker1 started to work")
		time.Sleep(3 * time.Second)
		fmt.Println("worker1 work end")
	}()
	// worker2
	go func() {
		for mail != 10 {
			cond.L.Lock()
			cond.Wait()
			cond.L.Unlock()
		}
		fmt.Println("worker2 started to work")
		time.Sleep(3 * time.Second)
		fmt.Println("worker2 work end")
	}()
	// worker3
	go func() {
		for mail != 10 {
			cond.L.Lock()
			cond.Wait()
			cond.L.Unlock()
		}
		fmt.Println("worker3 started to work")
		time.Sleep(3 * time.Second)
		fmt.Println("worker3 work end")
	}()

	time.Sleep(20 * time.Second)
}