Golang中channel的原理解读(推荐)
作者:可问春风丶 发布时间:2024-02-08 15:41:38
数据结构
channel的数据结构在$GOROOT/src/runtime/chan.go
文件下:
type hchan struct {
qcount uint // 当前队列中剩余元素个数
dataqsiz uint // 环形队列长度,即可以存放的元素个数
buf unsafe.Pointer // 环形队列指针
elemsize uint16 // 每个元素的大小
closed uint32 // 标记是否关闭
elemtype *_type // 元素类型
sendx uint // 队列下标,指向元素写入时存放到队列中的位置
recvx uint // 队列下标,指向元素从队列中读出的位置
recvq waitq // 等待读消息的groutine队列
sendq waitq // 等待写消息的groutine队列
lock mutex // 互斥锁
}
chan内部实现了一个环形队列作为缓冲区,队列的长度在创建chan时指定:
等待队列(recvq/sendq)使用双向链表 runtime.waitq 表示,链表中所有的元素都是 runtime.sudog
结构:
type waitq struct {
first *sudog
last *sudog
}
type sudog struct {
g *g
next *sudog
prev *sudog
elem unsafe.Pointer // data element (may point to stack)
acquiretime int64
releasetime int64
ticket uint32
isSelect bool
parent *sudog // semaRoot binary tree
waitlink *sudog // g.waiting list or semaRoot
waittail *sudog // semaRoot
c *hchan // channel
}
创建channel
通常使用make(channel string, 0)
的方式创建无缓存的channel,使用make(channel string, 10)
创建有缓存的channel。
源码:
func makechan(t *chantype, size int) *hchan {
elem := t.elem
// compiler checks this but be safe.
if elem.size >= 1<<16 {
throw("makechan: invalid channel element type")
}
if hchanSize%maxAlign != 0 || elem.align > maxAlign {
throw("makechan: bad alignment")
}
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError("makechan: size out of range"))
}
var c *hchan
switch {
case mem == 0:
// 如果当前 Channel 中不存在缓冲区,那么就只会为 runtime.hchan 分配一段内存空间;
c = (*hchan)(mallocgc(hchanSize, nil, true))
c.buf = c.raceaddr()
case elem.ptrdata == 0:
// 如果当前 Channel 中存储的类型不是指针类型,会为当前的 Channel 和底层的数组分配一块连续的内存空间;
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
//单独为 runtime.hchan 和缓冲区分配内存;
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)
lockInit(&c.lock, lockRankHchan)
// 在函数的最后会统一更新elemsize、elemtype 和 dataqsiz 几个字段;
if debugChan {
print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
}
return c
}
channel读写
写
当有新数据来时,首先判断recvq中是否有groutine存在,如果recvq不为空,则说明缓冲区为空,或者没有缓冲区,因为如果缓冲区有数据会被recvq里面的groutine消费。此时从recvq中拿出一个groutine并绑定数据,唤醒该groutine执行任务,这个过程跳过了将数据写入缓冲区的过程。
如果缓冲区有数据并有空余位置,将数据放入缓冲区。
如果缓冲区有数据但没有空余位置,当前groutine绑定数据并放入sendx,进入睡眠,等待被唤醒。
源码:
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
.....
lock(&c.lock)
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
// 如果Channel 没有被关闭并且已经有处于读等待的 Goroutine,
// 那么从接收队列 recvq 中取出最先陷入等待的 Goroutine 并直接向它发送数据
if sg := c.recvq.dequeue(); sg != nil {
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
// 如果recvq为空且缓冲区中还有剩余空间
if c.qcount < c.dataqsiz {
// 计算出下一个可以存储数据的位置,
qp := chanbuf(c, c.sendx)
// raceenabled: 是否启用数据竞争检测,在编译时指定,默认为false
if raceenabled {
// 发出数据竞争警告
raceacquire(qp)
racerelease(qp)
}
// 将发送的数据拷贝到缓冲区中,产生内存拷贝
typedmemmove(c.elemtype, qp, ep)
// 增加 sendx 索引
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
// 增加计数器
c.qcount++
unlock(&c.lock)
return true
}
if !block {
unlock(&c.lock)
return false
}
// 将channel数据绑定到当前groutine并使groutine休眠
// 获取发送数据使用的 Goroutine
gp := getg()
// 获取 runtime.sudog 结构并设置这一次阻塞发送的相关信息,
// 例如发送的 Channel、是否在 select 中和待发送数据的内存地址等
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// 将刚刚创建并初始化的 mysg 加入发送等待队列,并设置到当前 Goroutine的waiting上,
// 表示 Goroutine 正在等待该sudog准备就绪
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
c.sendq.enqueue(mysg)
// 休眠groutine
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
// 保证传入的数据不被GC
KeepAlive(ep)
// someone woke us up.
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
gp.activeStackChans = false
if gp.param == nil {
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
panic(plainError("send on closed channel"))
}
gp.param = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
mysg.c = nil
releaseSudog(mysg)
return true
}
读
如果sendx不为空且缓冲区不为空,从缓冲区头部读出数据并在当前G执行任务,在sendx中拿出一个G,将其数据写入缓冲区尾部并唤醒该G。
如果sendx不为空且缓冲区为空,直接从sendx中拿出一个G,将G中数据取出并唤醒该G。
如果sendx为空且缓冲区不为空,则从缓冲区头部拿出一个数据。
如果sendx为空且缓冲区为空,将该G放入recvq,进入休眠,等待被唤醒。
源码:
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// block:这次接收是否阻塞
if debugChan {
print("chanrecv: chan=", c, "\n")
}
if c == nil {
if !block {
return
}
// 从一个空 Channel 接收数据时会直接让出处理器的使用权
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
throw("unreachable")
}
// Fast path: check for failed non-blocking operation without acquiring the lock.
if !block && empty(c) {
// 如果channel为空并且未关闭,直接返回
if atomic.Load(&c.closed) == 0 {
return
}
if empty(c) {
// The channel is irreversibly closed and empty.
if raceenabled {
raceacquire(c.raceaddr())
}
if ep != nil {
// 手动标记清楚对象
typedmemclr(c.elemtype, ep)
}
return true, false
}
}
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
lock(&c.lock)
//如果channel为空,并且已关闭,说明对象不可达
if c.closed != 0 && c.qcount == 0 {
if raceenabled {
raceacquire(c.raceaddr())
}
unlock(&c.lock)
if ep != nil {
// 手动标记清除
typedmemclr(c.elemtype, ep)
}
return true, false
}
// 如果sendq不为空,直接消费,避免sendq --> queue --> recvx的过程
if sg := c.sendq.dequeue(); sg != nil {
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
// 当 Channel 的缓冲区中已经包含数据时,从 Channel 中接收数据会直接从缓冲区中
// recvx 的索引位置中取出数据进行处理
if c.qcount > 0 {
// Receive directly from queue
qp := chanbuf(c, c.recvx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
}
// 如果接收数据的内存地址不为空,那么会使用 runtime.typedmemmove将缓冲区中的数据拷贝到内存中
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
// 使用 runtime.typedmemclr清除队列中的数据并完成收尾工作
typedmemclr(c.elemtype, qp)
c.recvx++
// recvx位置归零
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount-- // 计数减一
unlock(&c.lock)
return true, true
}
if !block {
unlock(&c.lock)
return false, false
}
// 当 sendq不为空 并且缓冲区中也不存在任何数据时,阻塞并休眠当前groutine
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
c.recvq.enqueue(mysg)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
// someone woke us up
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
gp.activeStackChans = false
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
closed := gp.param == nil
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true, !closed
}
来源:https://blog.csdn.net/qq_41768400/article/details/120711768


猜你喜欢
- SQLite是一款轻型的数据库,是遵守ACID的关系型数据库管理系统。不像常见的客户-服务器范例,SQLite引擎不是个程序与之通信的独立进
- 1、定义路由// 阿里云文件储存Route::group(['prefix'=>'aliyun'],
- 在登陆界面中,通常,最重要的部分为登陆的Form表。一个非常棒的提升体验的做法是,在载入页面时自动聚焦到第一个提供用户输入的表单框,让用户不
- 查询重写插件从MySQL 5.7.6开始,MySQL Server支持查询重写插件,可以在服务器执行之前检查并可能修改服务器接收的语句。以下
- 前言最近需要读取Excel中的内容,然后进行后续操作,对于这块知识,博主以前以为自己不会涉及到,但是现在一涉及到,第一步就错了,搞了好久。真
- 教程前先给大家看看小编的实现成果吧!图1:图2:图3:教程:实现这个功能我们需要五个php文件:login.php(登录界面,如图2)<
- 简要讨论为什么它不提供++作为运算符 正常情况下,当有人问起++原因而不是Python中的运算符时,这一行引起了我的注意。如果您想知道最初的
- <%dim total(7,1) total(1,0)="中国经营报"
- 1. 前言分形几何是几何数学中的一个分支,也称大自然几何学,由著名数学家本华曼德勃罗( 法语:BenoitB.Mandelbrot)在 19
- 当我们打开一个图片编辑软件时,基本上都会提供几个操作:平移、缩放和旋转。特别目前在手机里拍照时,由于位置传感器的失灵,也许是软件的BUG,有
- 在这个由两部分组成的系列文章的第二部分中,我们将继续探索如何将函数式编程方法中的好想法引入到 Python中,以实现两全其美。在上一篇文章中
- 有时候,我们需要将TensorFlow的模型导出为单个文件(同时包含模型架构定义与权重),方便在其他地方使用(如在c++中部署网络)。利用t
- 列表推导(list comprehensions)场景1:将一个三维列表中所有一维数据为a的元素合并,组成新的二维列表。最简单的方法:新建列
- 一、前期配置 1. 加入依赖<dependency> <groupId>co
- 前言最近在工作中遇到一个需求,就是要把SQLite数据中没有存储的文件名的文件删除掉,想来想去还是决定用python。所以也就花了一天半的时
- 这两天为了这个问题, Gitter 上问, Twitter 上问, GitHub 上问, 两天没反应原来写博客的 jlongster 不理我
- 在日常工作中;经常会遇到这样的需求:Oracle 数据表跟文本或者文件格式进行交互;即将指定文件内容导入对应的 Oracle 数据表中;或者
- 引言承接上篇 parseHTML 函数源码解析拿到返回值后的处理接下来我们将会讲解当 textEnd === 0 解析器遇到结束标签,par
- 以下的文章主要介绍的是SQL Serve数据库到DB2连接服务器的实现过程,我们大家都知道不同数据库平台的互连,一般对其称之为数据库的异构服
- 文章前言每周五上午十二点前需要将项目上各组开发分支合并软集仓库分支, 需要在十个项目上进行 merge程序员一般都是 很讨厌麻烦, 所以编写