type hchan struct { qcount uint// total data in the queue 当前 channel 中存在多少个元素; dataqsiz uint// size of the circular queue 当前 channel 能存放的元素容量; buf unsafe.Pointer // points to an array of dataqsiz elements channel 中用于存放元素的环形缓冲区; elemsize uint16// channel 元素类型的大小; closed uint32// 标识 channel 是否关闭; elemtype *_type // element type \ channel 元素类型; sendx uint// send index 发送元素进入环形缓冲区的 index; recvx uint// receive index 接收元素所处的环形缓冲区的 index; recvq waitq // list of recv waiters 因接收而陷入阻塞的协程队列;waitq是啥下面会贴 sendq waitq // list of send waiters 因发送而陷入阻塞的协程队列; lock mutex // 并发安全必须有的东西 }
type waitq struct {//阻塞的协程队列 first *sudog //sudog 是啥,下面会贴 last *sudog }
type sudog struct { g *g //goroutine,协程;
next *sudog 队列中的下一个节点; prev *sudog 队列中的前一个节点 elem unsafe.Pointer // data element (may point to stack) 读取/写入 channel 的数据的容器;
// ... mem, overflow := math.MulUintptr(elem.size, uintptr(size)) // mem 大小为 element 类型大小与 element 个数相乘后得到 // 仅当无缓冲型 channel 时,因size 为 0 导致 mem= 0; // 当 elem 是 struct{} 类型,其 elem.size=0 也导致 mem=0 if overflow || mem > maxAlloc-hchanSize || size < 0 { // 判断申请内存空间大小是否越界 panic(plainError("makechan: size out of range")) }
var c *hchan switch { case mem == 0: // Queue or element size is zero. // hchanSize 是源码中写死的一个常量。hchanSize = unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1)) c = (*hchan)(mallocgc(hchanSize, nil, true)) // Race detector uses this location for synchronization. c.buf = c.raceaddr() case elem.ptrdata == 0: // Elements do not contain pointers. 不是指针类型。(是 struct 类型) // Allocate hchan and buf in one call. //有缓冲的 struct 型,则一次性分配好 hchanSize + mem 大小的空间,并且调整 chan 的 buf 指向 mem 的起始位置; c = (*hchan)(mallocgc(hchanSize+mem, nil, true)) c.buf = add(unsafe.Pointer(c), hchanSize) default: // Elements contain pointers. c = new(hchan) // 有缓冲的 pointer 型,则分别申请 chan 和 buf 的空间,两者无需连续; c.buf = mallocgc(mem, elem, true) }
funcchansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr)bool { // ... lock(&c.lock) // ... if c.qcount < c.dataqsiz { // Space is available in the channel buffer. Enqueue the element to send. // 将当前元素添加到环形缓冲区 sendx 对应的位置; qp := chanbuf(c, c.sendx) typedmemmove(c.elemtype, qp, ep)
funcchanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) { if c == nil { gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2) // park 挂起,引起死锁; throw("unreachable") }
lock(&c.lock)
if c.closed != 0 { if c.qcount == 0 { unlock(&c.lock) if ep != nil { // 赋0值 typedmemclr(c.elemtype, ep) } // 关闭且无元素、直接解锁返回即可 returntrue, false } // The channel has been closed, but the channel's buffer have data. // 如果关闭了。但是缓冲区有元素。这个 if 会直接结束。往下走到 case 2 的逻辑里 } }
funcclosechan(c *hchan) { if c == nil { // 关闭未初始化过的 channel 会 panic; panic(plainError("close of nil channel")) }
lock(&c.lock) if c.closed != 0 { unlock(&c.lock) // 重复关闭 channel 会 panic; panic(plainError("close of closed channel")) }
c.closed = 1
var glist gList //将阻塞读协程队列中的协程节点统一添加到 glist; // release all readers for { sg := c.recvq.dequeue() if sg == nil { break } if sg.elem != nil { typedmemclr(c.elemtype, sg.elem) sg.elem = nil } gp := sg.g gp.param = unsafe.Pointer(sg) sg.success = false glist.push(gp) }
// 将阻塞写协程队列中的协程节点统一添加到 glist;(他们会 panic) // release all writers (they will panic) for { sg := c.sendq.dequeue() if sg == nil { break } sg.elem = nil gp := sg.g gp.param = unsafe.Pointer(sg) sg.success = false glist.push(gp) } unlock(&c.lock)
// 唤醒 glist 当中的所有协程. // Ready all Gs now that we've dropped the channel lock. for !glist.empty() { gp := glist.pop() gp.schedlink = 0 goready(gp, 3) // 至于 gp 是怎么 panic 的,可以看上面介绍的写时 gopark 阻塞,又被唤醒的逻辑。