Channel

Channel

Go 语言 Channel 实现原理精要 | Go 语言设计与实现 (draveness.me)

不要通过共享内存来通信,而要通过通信来实现内存共享

Channel 用于 goroutine 之间通信,充当一个先进先出的队列

语法

// 初始化

var readOnlyChan <-chan int  // 只读 channel
var writeOnlyChan chan<- int // 只写 channel
var ch chan int // 可读可写

readOnlyChan1 := make(<-chan int, 2)  // 只读且带缓存区的 channel
readOnlyChan2 := make(<-chan int)   // 只读且不带缓存区 channel

writeOnlyChan3 := make(chan<- int, 4) // 只写且带缓存区 channel
writeOnlyChan4 := make(chan<- int) // 只写且不带缓存区 channel

ch := make(chan int, 10)  // 可读可写且带缓存区

// 写入
ch <- 20
// 读取
value := <-ch      // 仅接收数据
value,ok := <-ch   // 接收数据以及代表操作成功与否的 bool 值

被关闭的有缓冲 channel 缓冲区不为空时仍能读出有效数据(ok == true),缓冲区读完后的后续读取返回元素类型的零值(ok==false)

发生 panic 的情况

数据结构

Channel 是一个引用类型,用 make 函数创建初始化 channel 时会在堆上分配一个 runtime.hchan 类型的数据结构,并返回指向这块内存区域的指针

type hchan struct {
   qcount   uint           // channel 中的元素数量
   dataqsiz uint           // 循环队列大小
   buf      unsafe.Pointer // 指向循环队列的指针
   elemsize uint16         // 循环队列中的每个元素的大小
   closed   uint32         // 标记位,标记channel是否关闭
   elemtype *_type         // 循环队列中的元素类型
   sendx    uint           // 已发送/待接收元素在循环队列中的索引
   recvx    uint           // 已接收/待发送元素在循环队列中的索引
   recvq    waitq          // 等待从channel接收消息的goroutine队列
   sendq    waitq          // 等待向channel写入消息的goroutine队列
   
   lock mutex              // 互斥锁,对channel的数据读写操作加锁,保证并发安全
}

// waitq是sudog的双向链表
type waitq struct {
    first *sudog           // sudog队列的队头指针
    last  *sudog           // sudog队列的队尾指针
}

// sudog是对goroutine的封装,存储了两个分别指向前后sudog的指针以构成链表
type sudog struct {
   g *g                    // 绑定的goroutine
   next *sudog             // 指向sudog链表中的下一个节点
   prev *sudog             // 指向sudog链表中的下一个节点
   elem unsafe.Pointer     // 数据对象
   acquiretime int64     
   releasetime int64
   ticket      uint32
   isSelect bool
   success  bool
   parent   *sudog         // semaRoot binary tree
   waitlink *sudog         // g.waiting list or semaRoot
   waittail *sudog         // semaRoot
   c        *hchan         // channel
}

无缓冲 channel 在读写的时候是阻塞的
有缓冲 channel 在缓冲队列满了之后写入阻塞,队列空时读取阻塞

...
...
buf
buf
sendx
sendx
recvx
recvx
recvq
recvq
sendq
sendq
...
...
1
1
3
3
4
4
6
6
5
5
123
123
456
456
789
789
waitq
waitq
waitq
waitq
sudog
sudog
sudog
sudog
sudog
sudog
sudog
sudog
sudog
sudog
sudog
sudog
sudog
sudog
first
first
first
first
last
last
last
last
0
0
2
2
7
7
hchan
hchan
Text is not SVG - cannot display

channel 的工作原理为通过 hchan 结构体的 buf,并使用 copy 内存的方式进行通信,最后达到了共享内存的目的

Channel 结束使用后需要 close,避免程序一直在等待以及资源的浪费
可以从被 close 的 channel 中接收数据,缓存数据接收完后将继续接收到零值,此时 ok 为 false

初始化

Channel 通过 runtime.makechan 函数初始化

func makechan(t *chantype, size int) *hchan {
	elem := t.elem

	// compiler checks this but be safe.
	if elem.size >= 1<<16 {
		throw("makechan: invalid channel element type")
	}
	if hchanSize%maxAlign != 0 || elem.align > maxAlign {
		throw("makechan: bad alignment")
	}

	mem, overflow := math.MulUintptr(elem.size, uintptr(size))
	if overflow || mem > maxAlloc-hchanSize || size < 0 {
		panic(plainError("makechan: size out of range"))
	}

  // 如果 hchan 结构体中不含指针,GC 就不会扫描 chan 中的元素
	var c *hchan
	switch {
	// 不存在缓冲区,只为hchan分配一段内存空间
	case mem == 0: 
		c = (*hchan)(mallocgc(hchanSize, nil, true))
		// Race detector uses this location for synchronization.
		c.buf = c.raceaddr()
	// 元素类型不含指针,一次性分配 hchan 和 buf 的内存
	case elem.ptrdata == 0: 
		c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
		c.buf = add(unsafe.Pointer(c), hchanSize)
	default: // 包含指针情况下,单独为hchan和缓冲区分配内存
		c = new(hchan)
		c.buf = mallocgc(mem, elem, true)
	}

	c.elemsize = uint16(elem.size)
	c.elemtype = elem
	c.dataqsiz = uint(size)  // 循环数组长度
	lockInit(&c.lock, lockRankHchan)

	if debugChan {
		print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
	}
	return c  // 返回 hchan 指针
}

如果 hchan 结构体中不含指针,GC 就不会扫描 chan 中的元素
为 channel 开辟内存分为三种情况:

操作

发送

往 channel 发送数据分为三种方式:

读取

关闭

如何优雅地关闭 channel | Go 程序员面试笔试宝典 (golang.design)

Close 逻辑比较简单,对于一个 channel,recvq 和 sendq 中分别保存了阻塞的等待发送者和等待接收者
Close 函数先上锁,接着把所有挂在这个 channel 上的 sender 和 receiver 全都连成一个 sudog 链表,再解锁,最后将所有的 sudog 全都唤醒
关闭 channel 后,对于等待接收者而言,会收到一个相应类型的零值,对于等待发送者,会直接 panic

推荐关闭原则
不要从一个 receiver 侧关闭 channel,也不要在有多个 sender 时,关闭 channel
本质原则
不要关闭或写入已关闭 channel

在 Go 语言中,对于一个 channel,如果最终没有任何 goroutine 引用它,不管 channel 有没有被关闭,最终都会被 GC 回收

顺序

  1. 第 n 个 send 一定先于第 n 个 receive finished,无论是缓冲型还是非缓冲型的 channel
  2. 对于容量为 m 的缓冲型 channel,第 n 个 receive 一定先于第 n+m 个 send finished
  3. 对于非缓冲型的 channel,第 n 个 receive 一定先于第 n 个 send finished
  4. Channel close 一定先于 receiver 得到通知

并发安全

Channel 是并发安全的,多个 goroutine 同时读取 channel 中的数据,不会产生并发安全问题

当使用 send (ch <- xx) 或者 recv ( <-ch) 的时候,首先通过 hchan.lock 锁住 hchan 这个结构体,然后再读写

当有缓冲 channel 中已经有 receiver 在等待时,新到来的向 channel 发送数据的 sender 直接从源地址将数据拷贝到目的地址,通过增加写屏障避免问题出现

Select

Select 语句只能用于 channel 操作,每个 case 必须是一个通道操作,要么是发送要么是接收

Select 语句会监听所有指定的通道上的操作

select 的随机执行避免了饥饿问题,当多个 case 满足条件时避免因为顺序 case 导致后面的 case 过久得不到调用

对于 select 语句,在进入该语句时,会按源码的顺序对每一个 case 子句进行求值:这个求值只针对发送或接收操作的额外表达式

原理

go 语言设计与实现
我们简单总结一下 select 结构的执行过程与实现原理,首先在编译期间,Go 语言会对 select 语句进行优化,它会根据 select 中 case 的不同选择不同的优化路径:

  1. 空的 select 语句会被转换成调用 runtime.block 直接挂起当前 Goroutine;
  2. 如果 select 语句中只包含一个 case,编译器会将其转换成 if ch == nil { block }; n; 表达式;
    • 首先判断操作的 Channel 是不是空的;
    • 然后执行 case 结构中的内容;
  3. 如果 select 语句中只包含两个 case 并且其中一个是 default,那么会使用 runtime.selectnbrecv 和 runtime.selectnbsend 非阻塞地执行收发操作;
  4. 在默认情况下会通过 runtime.selectgo 获取执行 case 的索引,并通过多个 if 语句执行对应 case 中的代码;

在编译器已经对 select 语句进行优化之后,Go 语言会在运行时执行编译期间展开的 runtime.selectgo 函数,该函数会按照以下的流程执行:

  1. 随机生成一个遍历的轮询顺序 pollOrder 并根据 Channel 地址生成锁定顺序 lockOrder
  2. 根据 pollOrder 遍历所有的 case 查看是否有可以立刻处理的 Channel;
    1. 如果存在,直接获取 case 对应的索引并返回;
    2. 如果不存在,创建 runtime.sudog 结构体,将当前 Goroutine 加入到所有相关 Channel 的收发队列,并调用 runtime.gopark 挂起当前 Goroutine 等待调度器的唤醒;
  3. 当调度器唤醒当前 Goroutine 时,会再次按照 lockOrder 遍历所有的 case,从中查找需要被处理的 runtime.sudog 对应的索引;

select 关键字是 Go 语言特有的控制结构,它的实现原理比较复杂,需要编译器和运行时函数的通力合作。

Channel 实现互斥锁

package main

import (
    "log"
    "time"
)

type Mutex struct {
    ch chan struct{}
}

// 初始化锁
func NewMutex() *Mutex {
    mu := &Mutex{make(chan struct{}, 1)}
    mu.ch <- struct{}{}
    return mu
}

// 加锁,阻塞获取
func (m *Mutex) Lock()  {
    <- m.ch
}

// 释放锁
func (m *Mutex) Unlock()  {
    select {
    // 成功写入 channel 代表释放成功
    case m.ch <- struct{}{}:
    default:
        panic("unlock of unlocked mutex")
    }
}

// 尝试获取锁
func (m *Mutex) TryLock() bool {
    select {
    case <-m.ch:
        return true
    default:

    }
    return false
}

func (m *Mutex) LockTimeout(timeout time.Duration) bool {
    timer := time.NewTimer(timeout)

    select {
    case <-m.ch:
    // 成功获取锁关闭定时器
        timer.Stop()
        return true
    case <-timer.C:

    }
  // 获取锁超时
    return false
}

// 是否上锁
func (m *Mutex) IsLocked() bool {
    return len(m.ch) == 0
}


func main()  {
    m := NewMutex()
    ok := m.TryLock()
    log.Printf("locked v %v\n", ok)
    ok = m.TryLock()
    log.Printf("locked v %v\n", ok)

    go func() {
        time.Sleep(5*time.Second)
        m.Unlock()
    }()

    ok = m.LockTimeout(10*time.Second)
    log.Printf("LockTimeout v %v\n", ok)
}