控制并发有两种经典的方式,一种是WaitGroup,另外一种就是Context。

WaitGroup

直接看例子:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
func main() {
	var wg sync.WaitGroup

	wg.Add(2)
	go func() {
		time.Sleep(2 * time.Second)
		fmt.Println("1号完成")
		wg.Done()
	}()
	go func() {
		time.Sleep(2 * time.Second)
		fmt.Println("2号完成")
		wg.Done()
	}()
	wg.Wait()
	fmt.Println("好了,大家都干完了,收工")
}

例子中的2个goroutine同时做完才算是完成,先做好的就要等着其他未完成的,所有的goroutine要都全部完成才可以。

实际上我们可能会有这么一种场景,需要我们主动的通知某一个goroutine结束。比如我们开启一个后台goroutine一直做事情,比如监控,现在不需要了,就需要通知这个监控goroutine结束,不然它会一直跑。

chan通知

一个goroutine启动后我们是无法控制他的,大部分情况是等待它自己结束,那么如果这个goroutine是一个不会自己结束的后台goroutine呢?

一种傻瓜式的办法是全局变量,其他地方通过修改这个变量完成结束通知,然后后台goroutine不停的检查这个变量,如果发现被通知关闭了,就自我结束。这种方式也可以,但是首先我们要保证这个变量在多线程下的安全,基于此,有一种更好的方式:chan + select 。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
func main() {
	stop := make(chan bool)

	go func() {
		for {
			select {
			case <-stop:
				fmt.Println("监控退出,停止了...")
				return
			default:
				fmt.Println("goroutine监控中...")
				time.Sleep(2 * time.Second)
			}
		}
	}()

	time.Sleep(10 * time.Second)
	fmt.Println("可以了,通知监控停止")
	stop <- true
	// 为了检测监控过是否停止,如果没有监控输出,就表示停止了
	time.Sleep(5 * time.Second)
}

这里我们定义一个stop的chan,通知他结束后台goroutine。使用select判断stop是否可以接收到值,如果可以接收到,就表示可以退出停止了;如果没有接收到,就会执行default里的监控逻辑,继续监控,直到收到stop的通知。

发送了stop<- true结束的指令后,如果成功,不会再有goroutine监控中...的输出;如果没有成功,监控goroutine就会继续打印goroutine监控中...

新问题

这种chan+select的方式,是比较优雅的结束一个goroutine的方式,不过这种方式也有局限性,如果有很多goroutine都需要控制结束怎么办呢?如果这些goroutine又衍生了其他更多的goroutine怎么办呢?如果一层层的无穷尽的goroutine呢?这就非常复杂了,即使我们定义很多chan也很难解决这个问题,因为goroutine的关系链就导致了这种场景非常复杂。

Context接口

Go 1.7 标准库引入context包,中文译作“上下文”,准确说它是 goroutine 的上下文,包含 goroutine 的运行状态、环境、现场等信息。context 主要用来在 goroutine 之间传递上下文信息,包括:取消信号、超时时间、截止时间、K-V 等。

随着 context 包的引入,标准库中很多接口因此加上了 context 参数,例如 database/sql 包。context 几乎成为了并发控制和超时控制的标准做法。

Context控制单个goroutine

我们先看看用Context重写上面的示例:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
func main() {
	ctx, cancel := context.WithCancel(context.Background())
	go func(ctx context.Context) {
		for {
			select {
			case <-ctx.Done():
				fmt.Println("监控退出,停止了...")
				return
			default:
				fmt.Println("goroutine监控中...")
				time.Sleep(2 * time.Second)
			}
		}
	}(ctx)

	time.Sleep(10 * time.Second)
	fmt.Println("可以了,通知监控停止")
	cancel()
	//为了检测监控过是否停止,如果没有监控输出,就表示停止了
	time.Sleep(5 * time.Second)
}

重写就是把原来的chan stop 换成Context,使用Context跟踪goroutine,以便进行控制。

context.Background() 返回一个空的Context,这个空的Context一般用于整个Context树的根节点。然后我们使用context.WithCancel(parent)函数,创建一个可取消的子Context,然后当作参数传给goroutine使用,这样就可以使用这个子Context跟踪这个goroutine。

在goroutine中,使用select调用<-ctx.Done()判断是否要结束,如果接受到值的话,就可以返回结束goroutine了;如果接收不到,就会继续进行监控。用cancel()函数发送结束指令。

Context控制多个goroutine

下面我们看看控制多个goroutine的例子:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
func main() {
	ctx, cancel := context.WithCancel(context.Background())
	go watch(ctx, "[监控1]")
	go watch(ctx, "[监控2]")
	go watch(ctx, "[监控3]")

	time.Sleep(10 * time.Second)
	fmt.Println("可以了,通知监控停止")
	cancel()
	// 为了检测监控过是否停止,如果没有监控输出,就表示停止了
	time.Sleep(5 * time.Second)
}

func watch(ctx context.Context, name string) {
	for {
		select {
		case <-ctx.Done():
			fmt.Println(name, "监控退出,停止了...")
			return
		default:
			fmt.Println(name, "goroutine监控中...")
			time.Sleep(2 * time.Second)
		}
	}
}

这里启动3个监控goroutine进行不断的监控,每一个都使用了Context进行跟踪,当我们使用cancel函数通知取消时,这3个goroutine都会被结束。

Context定义

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
type Context interface {
    // 当 context 被取消或者到了 deadline,返回一个被关闭的 channel
    Done() <-chan struct{}
    // 在 channel Done 关闭后,返回 context 取消原因
    Err() error
    // 返回 context 是否会被取消以及自动取消时间(即 deadline)
    Deadline() (deadline time.Time, ok bool)
    // 获取 key 对应的 value
    Value(key interface{}) interface{}
}

Context 是一个接口,定义了 4 个方法,它们都是幂等的。也就是说连续多次调用同一个方法,得到的结果都是相同的。

Done() 返回一个 channel,可以表示 context 被取消的信号:当这个 channel 被关闭时,说明 context 被取消了。注意,这是一个只读的channel。 我们又知道,读一个关闭的 channel 会读出相应类型的零值。并且源码里没有地方会向这个 channel 里面塞入值。换句话说,这是一个 receive-only 的 channel。因此在子协程里读这个 channel,除非被关闭,否则读不出来任何东西。也正是利用了这一点,子协程从 channel 里读出了值(零值)后,就可以做一些收尾工作,尽快退出。

Err() 返回一个错误,表示 channel 被关闭的原因。例如是被取消,还是超时。

Deadline() 返回 context 的截止时间,通过此时间,函数就可以决定是否进行接下来的操作,如果时间太短,就可以不往下做了,否则浪费系统资源。当然,也可以用这个 deadline 来设置一个 I/O 操作的超时时间。

Value() 获取之前设置的 key 对应的 value。

Context接口并不需要我们实现,Go内置已经帮我们实现了2个,我们代码中最开始都是以这两个内置的作为最顶层的partent context,衍生出更多的子Context。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
var (
	background = new(emptyCtx)
	todo       = new(emptyCtx)
)
func Background() Context {
	return background
}
func TODO() Context {
	return todo
}

他们两个本质上都是emptyCtx结构体类型,是一个不可取消,没有设置截止时间,没有携带任何值的Context。

Context的继承衍生

有了根Context,那么是如何衍生更多的子Context的呢?这就要靠context包为我们提供的With系列的函数了。

1
2
3
4
func WithCancel(parent Context) (ctx Context, cancel CancelFunc)
func WithDeadline(parent Context, deadline time.Time) (Context, CancelFunc)
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)
func WithValue(parent Context, key, val interface{}) Context

这四个With函数,接收的都有一个partent参数,就是父Context,我们要基于这个父Context创建出子Context的意思,这种方式可以理解为子Context对父Context的继承,也可以理解为基于父Context的衍生。

通过这些函数,就创建了一颗Context树,树的每个节点都可以有任意多个子节点,节点层级可以有任意多个。

WithCancel函数,传递一个父Context作为参数,返回子Context,以及一个取消函数用来取消Context。 WithDeadline函数,和WithCancel差不多,它会多传递一个截止时间参数,意味着到了这个时间点,会自动取消Context,当然我们也可以不等到这个时候,可以提前通过取消函数进行取消。WithTimeoutWithDeadline基本上一样,这个表示是超时自动取消,是多少时间后自动取消Context的意思。

WithValue函数和取消Context无关,它是为了生成一个绑定了一个键值对数据的Context,这个绑定的数据可以通过Context.Value方法访问到,后面我们会专门讲。

大家可能留意到,前三个函数都返回一个取消函数CancelFunc,这是一个函数类型,它的定义非常简单。

type CancelFunc func()

这就是取消函数的类型,该函数可以取消一个Context,以及这个节点Context下所有的所有的Context,不管有多少层级。

WithValue传递元数据

通过Context我们也可以传递一些必须的元数据,这些数据会附加在Context上以供使用。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
var key string = "name"

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	// 附加值
	valueCtx := context.WithValue(ctx, key, "[监控1]")
	go watch(valueCtx)
	time.Sleep(10 * time.Second)
	fmt.Println("可以了,通知监控停止")
	cancel()
	// 为了检测监控过是否停止,如果没有监控输出,就表示停止了
	time.Sleep(5 * time.Second)
}

func watch(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			// 取出值
			fmt.Println(ctx.Value(key), "监控退出,停止了...")
			return
		default:
			// 取出值
			fmt.Println(ctx.Value(key), "goroutine监控中...")
			time.Sleep(2 * time.Second)
		}
	}
}

前面的例子中我们通过传递参数的方式,把name的值传递给监控函数。在这里我们实现一样的效果,但是通过的是Context的Value的方式。

记住,使用WithValue传值,一般是必须的值,不要什么值都传递。

防止 goroutine 泄漏

案例

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
func gen() <-chan int {
    ch := make(chan int)
    go func() {
        var n int
        for {
            ch <- n
            n++
            time.Sleep(time.Second)
        }
    }()
    return ch
}

这是一个可以生成无限整数的协程,但如果我只需要它产生的前 3 个数,那么就会发生 goroutine 泄漏:

1
2
3
4
5
6
7
8
9
func main() {
    for n := range gen() {
        fmt.Println(n)
        if n == 3 {
            break
        }
    }
	time.Sleep(3600 * time.Second)
}

当 n == 3 的时候,直接 break 掉。那么 gen 函数的协程就会执行无限循环,永远不会停下来,发生了 goroutine 泄漏。

什么情况可能会 goroutine 泄漏

goroutine作为独立的运行单元,如果自己本身阻塞了,他就像一个被挂起的线程,永远无法正常退出并释放资源。只要goroutine代码中存在阻塞的调用,而阻塞一直没有解除,那么goroutine就无法正常的运行完成自动消亡。

比如:IO调用,还有更常见的无缓冲通道读取或写入。

改进方式

用 context 改进这个例子:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
func gen(ctx context.Context) <-chan int {
    ch := make(chan int)
    go func() {
        var n int
        for {
            select {
            case <-ctx.Done():
                return
            case ch <- n:
                n++
                time.Sleep(time.Second)
            }
        }
    }()
    return ch
}
func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel() // 避免其他地方忘记 cancel,重复调用不影响
    for n := range gen(ctx) {
        fmt.Println(n)
        if n == 5 {
            cancel()
            break
        }
    }
}

增加一个 context,在 break 前调用 cancel 函数,取消 goroutine。gen 函数在接收到取消信号后,直接退出,系统回收资源。

Context 使用原则

  1. 不要把Context放在结构体中,要以参数的方式传递
  2. 以Context作为参数的函数方法,应该把Context作为第一个参数,放在第一位。
  3. 给一个函数方法传递Context的时候,不要传递nil,如果不知道传递什么,就使用context.TODO
  4. Context的Value相关方法应该传递必须的数据,不要什么数据都使用这个传递
  5. Context是线程安全的,可以放心的在多个goroutine中传递

(完)