Channel
Channel
不要通过共享内存来通信,而要通过通信来实现内存共享
Channel 用于 goroutine 之间通信,充当一个先进先出的队列
- 先从 Channel 读取数据的 Goroutine 会先接收到数据
- 先向 Channel 发送数据的 Goroutine 会得到先发送数据的权利
语法
// 初始化
var readOnlyChan <-chan int // 只读 channel
var writeOnlyChan chan<- int // 只写 channel
var ch chan int // 可读可写
readOnlyChan1 := make(<-chan int, 2) // 只读且带缓存区的 channel
readOnlyChan2 := make(<-chan int) // 只读且不带缓存区 channel
writeOnlyChan3 := make(chan<- int, 4) // 只写且带缓存区 channel
writeOnlyChan4 := make(chan<- int) // 只写且不带缓存区 channel
ch := make(chan int, 10) // 可读可写且带缓存区
// 写入
ch <- 20
// 读取
value := <-ch // 仅接收数据
value,ok := <-ch // 接收数据以及代表操作成功与否的 bool 值
被关闭的有缓冲 channel 缓冲区不为空时仍能读出有效数据(ok == true),缓冲区读完后的后续读取返回元素类型的零值(ok==false)
- 对一个 nil channel 进行读写会引发协程的永久阻塞
- Channel 读端和写端都可以有多个 goroutine 操作,在一端关闭 channel 的时候,该 channel 读端的所有 goroutine 都会收到 channel 已关闭的消息
- Channel 可以相互比较,比较两个不是 nil 的 channel 实际上是比较它们引用的对象是否是同一个
发生 panic 的情况
- 向一个关闭的 channel 进行写操作
- Close 一个 nil 的 channel
- Close 一个已关闭的 channel
数据结构
Channel 是一个引用类型,用 make 函数创建初始化 channel 时会在堆上分配一个 runtime.hchan 类型的数据结构,并返回指向这块内存区域的指针
type hchan struct {
qcount uint // channel 中的元素数量
dataqsiz uint // 循环队列大小
buf unsafe.Pointer // 指向循环队列的指针
elemsize uint16 // 循环队列中的每个元素的大小
closed uint32 // 标记位,标记channel是否关闭
elemtype *_type // 循环队列中的元素类型
sendx uint // 已发送/待接收元素在循环队列中的索引
recvx uint // 已接收/待发送元素在循环队列中的索引
recvq waitq // 等待从channel接收消息的goroutine队列
sendq waitq // 等待向channel写入消息的goroutine队列
lock mutex // 互斥锁,对channel的数据读写操作加锁,保证并发安全
}
// waitq是sudog的双向链表
type waitq struct {
first *sudog // sudog队列的队头指针
last *sudog // sudog队列的队尾指针
}
// sudog是对goroutine的封装,存储了两个分别指向前后sudog的指针以构成链表
type sudog struct {
g *g // 绑定的goroutine
next *sudog // 指向sudog链表中的下一个节点
prev *sudog // 指向sudog链表中的下一个节点
elem unsafe.Pointer // 数据对象
acquiretime int64
releasetime int64
ticket uint32
isSelect bool
success bool
parent *sudog // semaRoot binary tree
waitlink *sudog // g.waiting list or semaRoot
waittail *sudog // semaRoot
c *hchan // channel
}
无缓冲 channel 在读写的时候是阻塞的
有缓冲 channel 在缓冲队列满了之后写入阻塞,队列空时读取阻塞
channel 的工作原理为通过 hchan 结构体的 buf,并使用 copy 内存的方式进行通信,最后达到了共享内存的目的
Channel 结束使用后需要 close,避免程序一直在等待以及资源的浪费
可以从被 close 的 channel 中接收数据,缓存数据接收完后将继续接收到零值,此时 ok 为 false
初始化
Channel 通过 runtime.makechan 函数初始化
func makechan(t *chantype, size int) *hchan {
elem := t.elem
// compiler checks this but be safe.
if elem.size >= 1<<16 {
throw("makechan: invalid channel element type")
}
if hchanSize%maxAlign != 0 || elem.align > maxAlign {
throw("makechan: bad alignment")
}
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError("makechan: size out of range"))
}
// 如果 hchan 结构体中不含指针,GC 就不会扫描 chan 中的元素
var c *hchan
switch {
// 不存在缓冲区,只为hchan分配一段内存空间
case mem == 0:
c = (*hchan)(mallocgc(hchanSize, nil, true))
// Race detector uses this location for synchronization.
c.buf = c.raceaddr()
// 元素类型不含指针,一次性分配 hchan 和 buf 的内存
case elem.ptrdata == 0:
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default: // 包含指针情况下,单独为hchan和缓冲区分配内存
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size) // 循环数组长度
lockInit(&c.lock, lockRankHchan)
if debugChan {
print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
}
return c // 返回 hchan 指针
}
如果 hchan 结构体中不含指针,GC 就不会扫描 chan 中的元素
为 channel 开辟内存分为三种情况:
- 没有缓冲区 buf,即创建无缓冲区的 channel,只分配 hchan 本身结构体大小的内存
- 有缓冲区 buf,但元素类型不含指针,一次性为当前的 hchan 结构和 buf 数组分配一块连续的内存空间
- 有缓冲区,且元素包含指针类型,分两次分配内存,先为 hchan 结构和分配内存,再为 buf 数组元素分配内存
操作
发送
往 channel 发送数据分为三种方式:
- 直接发送:当前 channel 的 recvq 不为空(有正在阻塞等待接收数据的 goroutine)时,出队 recvq 里 first 指针指向的 sudog,将其加入到 P 的可运行 goroutine 队列中,然后,sender 把发送元素直接 copy 到该接收 goroutine 中(不经过缓冲区),最后会调用
goready将该 goroutine 唤醒,状态变为 runnable - 缓冲发送:会判定缓冲区的剩余空间,如果有剩余空间,则将数据拷贝到 channel 中 sendx 索引处,之后 sendx 索引自行自增 1(若 sendx 等于 dataqsiz ,则将 sendx 置 0,因为 buf 是一个环形数组),自增完成之后,队列总数自增 1
- 阻塞发送:当前 channel 的 recvq 为空(没有正在阻塞等待接收数据的 goroutine) 且缓冲区已满时,发送的 goroutine 被阻塞,首先获取 sudog ,将该 goroutine 绑定到 sudog 上,加入到当前 channel 的 sendq 队列中,然后调用 gopark 方法挂起当前 goroutine,等待被唤醒
读取
- 从一个空 channel 接收数据,goroutine 会被挂起并阻塞等待
- 当前 channel 的 sendq 有 goroutine 等待发送数据时
- 如果是无缓冲的 channel,当有接收者到来时,会直接从阻塞等待发送的 goroutine 拷贝数据到接收 goroutine 的接收区
- 如果是有缓冲的 channel,此时缓冲区满,当有接收者到来时,会先从缓冲区把数据拷贝到接收者(注意,此时 recvx 和 sendx 相等,拷贝完之后,recvx 和 sendx 都自增 1),然后把等待的发送者的数据拷贝到缓冲区
- 当 channel 有缓冲区,并且缓冲区为空,且没有发送者时,这时 channel 阻塞,接收的 goroutine 被挂起,等待被唤醒
- 当 channel 有缓冲区,并且缓冲区有数据但未满,当有接收者来接收数据时,直接把缓冲区把数据拷贝到接收者
关闭
Close 逻辑比较简单,对于一个 channel,recvq 和 sendq 中分别保存了阻塞的等待发送者和等待接收者
Close 函数先上锁,接着把所有挂在这个 channel 上的 sender 和 receiver 全都连成一个 sudog 链表,再解锁,最后将所有的 sudog 全都唤醒
关闭 channel 后,对于等待接收者而言,会收到一个相应类型的零值,对于等待发送者,会直接 panic
推荐关闭原则
不要从一个 receiver 侧关闭 channel,也不要在有多个 sender 时,关闭 channel
本质原则
不要关闭或写入已关闭 channel
在 Go 语言中,对于一个 channel,如果最终没有任何 goroutine 引用它,不管 channel 有没有被关闭,最终都会被 GC 回收
顺序
- 第 n 个
send一定先于第 n 个receive finished,无论是缓冲型还是非缓冲型的 channel - 对于容量为 m 的缓冲型 channel,第 n 个
receive一定先于第 n+m 个send finished - 对于非缓冲型的 channel,第 n 个
receive一定先于第 n 个send finished - Channel close 一定先于 receiver 得到通知
并发安全
Channel 是并发安全的,多个 goroutine 同时读取 channel 中的数据,不会产生并发安全问题
当使用 send (ch <- xx) 或者 recv ( <-ch) 的时候,首先通过 hchan.lock 锁住 hchan 这个结构体,然后再读写
当有缓冲 channel 中已经有 receiver 在等待时,新到来的向 channel 发送数据的 sender 直接从源地址将数据拷贝到目的地址,通过增加写屏障避免问题出现
Select
Select 语句只能用于 channel 操作,每个 case 必须是一个通道操作,要么是发送要么是接收
Select 语句会监听所有指定的通道上的操作
- 一旦其中一个通道准备好就会执行相应的代码块
- 如果多个通道都准备好,那么 select 语句会随机选择一个通道执行
- 如果所有通道都没有准备好,那么执行 default 中的代码
- 如果没有 default 分支,select 会阻塞在多个 channel 上,对多个 channel 的读/写事件进行监控
select 的随机执行避免了饥饿问题,当多个 case 满足条件时避免因为顺序 case 导致后面的 case 过久得不到调用
对于 select 语句,在进入该语句时,会按源码的顺序对每一个 case 子句进行求值:这个求值只针对发送或接收操作的额外表达式
原理
go 语言设计与实现
我们简单总结一下select结构的执行过程与实现原理,首先在编译期间,Go 语言会对select语句进行优化,它会根据select中case的不同选择不同的优化路径:
- 空的
select语句会被转换成调用runtime.block直接挂起当前 Goroutine;- 如果
select语句中只包含一个case,编译器会将其转换成if ch == nil { block }; n;表达式;
- 首先判断操作的 Channel 是不是空的;
- 然后执行
case结构中的内容;- 如果
select语句中只包含两个case并且其中一个是default,那么会使用runtime.selectnbrecv和runtime.selectnbsend非阻塞地执行收发操作;- 在默认情况下会通过
runtime.selectgo获取执行case的索引,并通过多个if语句执行对应case中的代码;在编译器已经对
select语句进行优化之后,Go 语言会在运行时执行编译期间展开的runtime.selectgo函数,该函数会按照以下的流程执行:
- 随机生成一个遍历的轮询顺序
pollOrder并根据 Channel 地址生成锁定顺序lockOrder;- 根据
pollOrder遍历所有的case查看是否有可以立刻处理的 Channel;
- 如果存在,直接获取
case对应的索引并返回;- 如果不存在,创建
runtime.sudog结构体,将当前 Goroutine 加入到所有相关 Channel 的收发队列,并调用runtime.gopark挂起当前 Goroutine 等待调度器的唤醒;- 当调度器唤醒当前 Goroutine 时,会再次按照
lockOrder遍历所有的case,从中查找需要被处理的runtime.sudog对应的索引;
select关键字是 Go 语言特有的控制结构,它的实现原理比较复杂,需要编译器和运行时函数的通力合作。
Channel 实现互斥锁
package main
import (
"log"
"time"
)
type Mutex struct {
ch chan struct{}
}
// 初始化锁
func NewMutex() *Mutex {
mu := &Mutex{make(chan struct{}, 1)}
mu.ch <- struct{}{}
return mu
}
// 加锁,阻塞获取
func (m *Mutex) Lock() {
<- m.ch
}
// 释放锁
func (m *Mutex) Unlock() {
select {
// 成功写入 channel 代表释放成功
case m.ch <- struct{}{}:
default:
panic("unlock of unlocked mutex")
}
}
// 尝试获取锁
func (m *Mutex) TryLock() bool {
select {
case <-m.ch:
return true
default:
}
return false
}
func (m *Mutex) LockTimeout(timeout time.Duration) bool {
timer := time.NewTimer(timeout)
select {
case <-m.ch:
// 成功获取锁关闭定时器
timer.Stop()
return true
case <-timer.C:
}
// 获取锁超时
return false
}
// 是否上锁
func (m *Mutex) IsLocked() bool {
return len(m.ch) == 0
}
func main() {
m := NewMutex()
ok := m.TryLock()
log.Printf("locked v %v\n", ok)
ok = m.TryLock()
log.Printf("locked v %v\n", ok)
go func() {
time.Sleep(5*time.Second)
m.Unlock()
}()
ok = m.LockTimeout(10*time.Second)
log.Printf("LockTimeout v %v\n", ok)
}