Channel 的种类

在 Golang 中,Channel 作为多个 goroutine 之间通信的桥梁,大致可以分为两类:

  • 缓冲
  • 非缓冲

Channel 在使用之间,必须要通过make来进行创建。

非缓冲 Channel

若在两个 gorountine 中使用了非缓冲 Channel,就说明在通信双方的中间,留给它们可以存放「通信消息」的位置只有一个。此时,如果该 Channel 中没有消息,那么「读取」操作将会被阻塞。同样,如果该 Channel 中有消息但是没有被取走,那么「写入」操作将会被阻塞。

在使用了 Channel 之后,通信双方和 Channel 一起可以构成一个相当标准的「生产者—消费者」模型。

缓冲 Channel

缓冲 Channel 与非缓冲 Channel 唯一不同的地方就在于存放「通信消息」的位置可以有多个。「写入」和「读取」操作被阻塞的条件也随之变成了:

  • 当缓冲区满的时候,「写入」操作被阻塞
  • 当缓冲区空的时候,「读取」操作被阻塞

Channel 的关闭操作

Channel 的操作和队列一样,无非就三个:写入,读取,删除(关闭)。其中写入和读取操作相对来说比较简单。接下来我们看下,如果某个 Channel 不想使用,准备关闭它,需要通过调用一个名为close 的函数。若一个 Channel 已经被关闭,再对它进行一些操作可能会引起不同的结果:

  • 重复关闭:程序会 panic
  • 向关闭的 Channel 继续写入数据:程序会 panic
  • 从关闭的 Channel 中读取数据:程序会正常读取 Channel 中余下的消息。当 Channel 中不再有消息的时候,它会返回一个 Channel 存储消息类型的 0 值。假设我们现在有一个存储 int 类型消息的 Channel。当 Channel 已经关闭之后,我们继续读取它内部的消息。由于 int 的默认值是0,且 Channel 里面也完全有可能有 0 这个数据。所以我们要通过额外的一个变量来判断这个 0 到底是标识 Channel 内没有消息了还是数据就是 0。
1
2
3
4
5
msg, ok := <- ch
if !ok {
	log.Info("There is no msg")
}
log.Info("You get a new msg from channel")

ok 是一个 bool 类型的变量,它会告诉我们此次读取出来的消息是默认的0值,还是缓冲区内真正保存的内容。

Channel 的常用姿势

多个 goroutine 间通信

完全可以在 Golang 中将 Channel 当做消息队列来使用。通过 Channel 在多个 goroutine 间共享数据是非常方便的,减少了很多使用者的心智负担,不需要再担心锁的问题。

select

Golang 中有一个名为select的原语,它的作用类似 Linux 系统调用中的select。我们可以将多个 Channel 的读取或者写入操作放在 select 的 Case 中进行使用。通常情况下,我们用非缓冲的 Channel 和 select 一起实现一个「超时处理机制」:

1
2
3
4
5
6
select {
  case value := <- ch:
		handle(value)
  case time.After(5 * time.Second):
      handleTimeout()
}

range

range 通常用于遍历一个集合内的元素,它同样可以用于 Channel,无论是缓冲版本还是非缓冲版本。(这里埋了一个小伏笔)。若 Channel 中有数据,那么 range 其实就相当于执行一个「读取」操作。关键是,当 channel 中没有数据的时候,range 它会发生什么情况呢?

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
package main

import (
	"fmt"
)

func main() {
	fmt.Println("Hello, playground")
	ch := make(chan int)
	go func() {
		ch <- 1
	}()

	count := 0
	for v := range ch {
		fmt.Print(v)
		count++
		if count == 1 {
			close(ch)
		}
	}
}

上述实例程序最终会顺利退出。最主要的原因是我们在读取了 Channel 中的消息之后,主动的关闭了这个 Channel。而 range 一个关闭了 Channel 会自动退出循环。如果不执行 close 操作的话,那么程序会报错:dead lock。因为 range 执行的读取操作会一直阻塞在那。

Channel 的内部实现

数据结构

Channel 的本质其实是一个指针,它指向了一个名为hchan的结构体,所以我们在传递 Channel 变量的时候不需要对其进行取地址操作:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
type hchan struct {
    qcount   uint           // total data in the queue
    dataqsiz uint           // size of the circular queue
    buf      unsafe.Pointer // points to an array of dataqsiz elements
    elemsize uint16
    closed   uint32
    elemtype *_type // element type
    sendx    uint   // send index
    recvx    uint   // receive index
    recvq    waitq  // list of recv waiters
    sendq    waitq  // list of send waiters

    // lock protects all fields in hchan, as well as several
    // fields in sudogs blocked on this channel.
    //
    // Do not change another G's status while holding this lock
    // (in particular, do not ready a G), as this can deadlock
    // with stack shrinking.
    lock mutex
}

通过注释其实我们就可以大概的了解 Channel 是如何来实现的:

首先,qcount 和 dataqsiz 分别代表了缓冲区内元素的个数以及缓冲区本身的大小,注释中是通过 queue 来标识的。那就说明,Channel 底层的缓冲区可能是通过一个队列实现的。紧接着,我们看到了三个比较关键的成员:

  • buf
  • sendx
  • recvx

其中 buf 所指向的内存区域应该就是我们前面说到的 queue 了。而且,我们又发现了sendxsendx两个成员。他们似乎标识了缓冲区中读取和写入要操作的索引信息。这三个成员在一起,不得不让我们怀疑,Channel 底层的缓冲区是通过一个「循环队列」的数据结构来实现的。因为 Channel 中既有读取操作也有写入操作,如果一直使用线性增长的缓冲区,势必会造成资源的浪费,并且在元素顺序的整理上也会付出相当大的代价。

既然是对内存中的数据操作,并且还会涉及到多个 goroutine,所以不可避免的要通过锁来避免「竞争」:lock。另外,之前提到过,如果一个 Channel 的缓冲区没有元素或者已经满了,那么执行读取操作或者写入操作的 goroutine 会被组塞住。而 recvq 和 sendq 就是用来保存它们的。以便在方便的时候将它们唤醒。

写入和读取操作

Golang 使用 groutine 和 Channel 实现了CSP(Communicating Sequential Processes)模型。在 CSP 模型中有一个非常重要的理念:

1
不要通过共享内存的方式进行通信要通过通信的方式共享内存

这个理念在 Channel 的写入和读取操作上得到了充分展示。

无论是写入还是读取操作,涉及到 hchan 数据结构中的成员有如下几个:

  • lock
  • sendx/recvx
  • buf
  • recvq/sendq

非阻塞场景

我们先把阻塞写入和读取操作的场景放在一边,只讨论下 Channel 写入和读取数据的过程。因为这是最基本的,也是最主要的部分。

写入

当我们向一个 Channel 发送一个数据的时候,写入操作会首先尝试获取 lock,将 buf 锁住,防止出现竞争。然后根据 sendx 给出的索引值,将消息 Copy 一份,塞入其中。为什么一定要 Copy 一份呢?直接使用原始数据不行么?很显然,如果我们将原始数据即生产者写入的数据放到 buf 内,这相当于让消费者和生产者共享内存了。这是违背了 CSP 模型提出的那个核心理念的。最后,在写入操作完成的时候,它会释放 lock。

读取

读取操作是写入操作的逆过程。在获取 Channel 元素的时候,也同样会执行一个 Copy 操作。

阻塞场景

当 buf 已经存满消息的时候,触发写入操作的 goroutine 将会被阻塞住。此时这个阻塞的 goroutine 将会被存储到我们前面说的 hchan对象的 sendq 成员内。直到 buf 内再次有空位即有读取操作发生之后,该 goroutine 才会被唤醒,执行完之前的写入操作。

goroutine—用户态实现的线程模型

既然说到了 goroutine 的唤醒与阻塞,我们就不得不回忆起操作系统中线程和进程的相关概念。实际上,goroutine 就是一种 Golang 在用户态下实现的轻量级线程,它由用户态中的 go runtime 进行管理。但是,goroutine 最终还是跑在一个线程上的。因为可被 CPU 调度执行的最小单位是进程创建的线程。goroutine 和 线程的关系类似于「多路复用」,一个线程可能会携带多个 goroutine。Golang Scheduler 将会依照一定的调度策略,将线程和 goroutine 进行绑定,以便线程和 goroutine 一起被 CPU 调度运行。

线程,goroutine,调度上下文三者之间的关系如下图所示:

回到刚才那个触发了「阻塞」操作的 goroutine,我们将它标记为 G1。

此时它将从 M 上被调度下来,放入一个等待队列中,即我们上面提到过的 sendq 内。Golang Scheduler 将会从 runable queue 中取出一个可执行的 goroutine,将其调度到线程上执行。

如果有另外一个 goroutine 从 Channel 中读取一个消息,当消息成功的被读取之后,这次「读取」操作仍然没有结束。它还会执行接下来的两个步骤:

  1. 从 sendq 中将 G1 要写入的元素直接塞入 queue 中
  2. 通过使用 Golang Scheduler 提供的一些接口,将 G1 塞入我们上面提到的 runnable queue 中

上面我们提到的是写入操作被阻塞,接下来我们来看看当读取操作被阻塞的时候会发生什么。

读取操作被阻塞过程的前半部分和写入操作一样

一共发生了三件事:

  1. 创建一个 sudog 对象,保存 G2
  2. 将 sudog 对象塞入 recvq 中
  3. Golang Scheduler 将 G2 调离线程,从 runnable queue 中找一个新的且可运行的 goroutine 调度执行

按照上面「读取操作唤醒写入操作」过程的描述,写入操作 G1 应该是先获取锁,然后将消息写入 buf 中,然后释放锁。最终将 G2 唤醒。G2 下次再被调度执行的时候就可以拿到 buf 中的消息了。但是,Golang 对这一过程进行了优化:

  1. 去掉 G1 对锁的操作
  2. 直接将 G1 当中要写入的数据塞入 G2 sudog 中的 elem 成员

第一个操作不用说了,减少对锁的操作肯定会提升程序的执行效率。第二个操作避免了 G2 再一次访问 buf,也提升了程序运行的效率。