Golang协程池gopool设计与实现
作者:ag9920 发布时间:2024-05-28 15:23:01
Goroutine
Goroutine 是 Golang 提供的一种轻量级线程,我们通常称之为「协程」,相比较线程,创建一个协程的成本是很低的。所以你会经常看到 Golang 开发的应用出现上千个协程并发的场景。
Goroutine 的优势:
与线程相比,Goroutines 成本很低。
它们的堆栈大小只有几 kb,堆栈可以根据应用程序的需要增长和缩小,context switch 也很快,而在线程的情况下,堆栈大小必须指定并固定。
Goroutine 被多路复用到更少数量的 OS 线程。
一个包含数千个 Goroutine 的程序中可能只有一个线程。如果该线程中的任何 Goroutine 阻塞等待用户输入,则创建另一个 OS 线程并将剩余的 Goroutine 移动到新的 OS 线程。所有这些都由运行时处理,作为开发者无需耗费心力关心,这也使得我们有很干净的 API 来支持并发。
Goroutines 使用 channel 进行通信。
channel 的设计有效防止了在使用 Goroutine 访问共享内存时发生竞争条件(race conditions) 。channel 可以被认为是 Goroutine 进行通信的管道。
下文中我们会以「协程」来代指 Goroutine。
协程池
在高并发场景下,我们可能会启动大量的协程来处理业务逻辑。协程池是一种利用池化技术,复用对象,减少内存分配的频率以及协程创建开销,从而提高协程执行效率的技术。
最近抽空了解了字节官方开源的 gopkg 库提供的 gopool
协程池实现,感觉还是很高质量的,代码也非常简洁清晰,而且 Kitex
底层也在使用 gopool
来管理协程,这里我们梳理一下设计和实现。
gopool
Repository:https://github.com/bytedance/gopkg/tree/develop/util/gopool
gopool
is a high-performance goroutine pool which aims to reuse goroutines and limit the number of goroutines. It is an alternative to thego
keyword.
了解官方 README 就会发现gopool
的用法其实非常简单,将曾经我们经常使用的 go func(){...}
替换为 gopool.Go(func(){...})
即可。
此时 gopool
将会使用默认的配置来管理你启动的协程,你也可以选择针对业务场景配置池子大小,以及扩容上限。
old:
go func() {
// do your job
}()
new:
import (
"github.com/bytedance/gopkg/util/gopool"
)
gopool.Go(func(){
/// do your job
})
核心实现
下面我们来看看gopool
是怎样实现协程池管理的。
Pool
Pool
是一个定义了协程池能力的接口。
type Pool interface {
// 池子的名称
Name() string
// 设置池子内Goroutine的容量
SetCap(cap int32)
// 执行 f 函数
Go(f func())
// 带 ctx,执行 f 函数
CtxGo(ctx context.Context, f func())
// 设置发生panic时调用的函数
SetPanicHandler(f func(context.Context, interface{}))
}
gopool
提供了这个接口的默认实现(即下面即将介绍的pool
),当我们直接调用 gopool.CtxGo 时依赖的就是这个。
这样的设计模式在 Kitex
中也经常出现,所有的依赖均设计为接口,便于随后扩展,底层提供一个默认的实现暴露出去,这样对调用方也很友好。
type pool struct {
// 池子名称
name string
// 池子的容量, 即最大并发工作的 goroutine 的数量
cap int32
// 池子配置
config *Config
// task 链表
taskHead *task
taskTail *task
taskLock sync.Mutex
taskCount int32
// 记录当前正在运行的 worker 的数量
workerCount int32
// 当 worker 出现panic时被调用
panicHandler func(context.Context, interface{})
}
// NewPool 创建一个新的协程池,初始化名称,容量,配置
func NewPool(name string, cap int32, config *Config) Pool {
p := &pool{
name: name,
cap: cap,
config: config,
}
return p
}
调用 NewPool
获取了以 Pool
的形式返回的 pool
结构体。
Task
type task struct {
ctx context.Context
f func()
next *task
}
task
是一个链表结构,可以把它理解为一个待执行的任务,它包含了当前节点需要执行的函数f
, 以及指向下一个task
的指针。
综合前一节 pool
的定义,我们可以看到,一个协程池 pool
对应了一组task
。
pool
维护了指向链表的头尾的两个指针:taskHead
和 taskTail
,以及链表的长度taskCount
和对应的锁 taskLock
。
Worker
type worker struct {
pool *pool
}
一个 worker
就是逻辑上的一个执行器,它唯一对应到一个协程池 pool
。当一个worker
被唤起,将会开启一个goroutine
,不断地从 pool
中的 task
链表获取任务并执行。
func (w *worker) run() {
go func() {
for {
// 声明即将执行的 task
var t *task
// 操作 pool 中的 task 链表,加锁
w.pool.taskLock.Lock()
if w.pool.taskHead != nil {
// 拿到 taskHead 准备执行
t = w.pool.taskHead
// 更新链表的 head 以及数量
w.pool.taskHead = w.pool.taskHead.next
atomic.AddInt32(&w.pool.taskCount, -1)
}
// 如果前一步拿到的 taskHead 为空,说明无任务需要执行,清理后返回
if t == nil {
w.close()
w.pool.taskLock.Unlock()
w.Recycle()
return
}
w.pool.taskLock.Unlock()
// 执行任务,针对 panic 会recover,并调用配置的 handler
func() {
defer func() {
if r := recover(); r != nil {
msg := fmt.Sprintf("GOPOOL: panic in pool: %s: %v: %s", w.pool.name, r, debug.Stack())
logger.CtxErrorf(t.ctx, msg)
if w.pool.panicHandler != nil {
w.pool.panicHandler(t.ctx, r)
}
}
}()
t.f()
}()
t.Recycle()
}
}()
}
整体来看
看到这里,其实就能把整个流程串起来了。我们来看看对外的接口 CtxGo(context.Context, f func())
到底做了什么?
func Go(f func()) {
CtxGo(context.Background(), f)
}
func CtxGo(ctx context.Context, f func()) {
defaultPool.CtxGo(ctx, f)
}
func (p *pool) CtxGo(ctx context.Context, f func()) {
// 创建一个 task 对象,将 ctx 和待执行的函数赋值
t := taskPool.Get().(*task)
t.ctx = ctx
t.f = f
// 将 task 插入 pool 的链表的尾部,更新链表数量
p.taskLock.Lock()
if p.taskHead == nil {
p.taskHead = t
p.taskTail = t
} else {
p.taskTail.next = t
p.taskTail = t
}
p.taskLock.Unlock()
atomic.AddInt32(&p.taskCount, 1)
// 以下两个条件满足时,创建新的 worker 并唤起执行:
// 1. task的数量超过了配置的限制
// 2. 当前运行的worker数量小于上限(或无worker运行)
if (atomic.LoadInt32(&p.taskCount) >= p.config.ScaleThreshold && p.WorkerCount() < atomic.LoadInt32(&p.cap)) || p.WorkerCount() == 0 {
// worker数量+1
p.incWorkerCount()
// 创建一个新的worker,并把当前 pool 赋值
w := workerPool.Get().(*worker)
w.pool = p
// 唤起worker执行
w.run()
}
}
相信看了代码注释,大家就能理解发生了什么。
gopool
会自行维护一个 defaultPool
,这是一个默认的 pool
结构体,在引入包的时候就进行初始化。当我们直接调用 gopool.CtxGo()
时,本质上是调用了 defaultPool
的同名方法
func init() {
defaultPool = NewPool("gopool.DefaultPool", 10000, NewConfig())
}
const (
defaultScalaThreshold = 1
)
// Config is used to config pool.
type Config struct {
// 控制扩容的门槛,一旦待执行的 task 超过此值,且 worker 数量未达到上限,就开始启动新的 worker
ScaleThreshold int32
}
// NewConfig creates a default Config.
func NewConfig() *Config {
c := &Config{
ScaleThreshold: defaultScalaThreshold,
}
return c
}
defaultPool
的名称为 gopool.DefaultPool
,池子容量一万,扩容下限为 1。
当我们调用 CtxGo
时,gopool
就会更新维护的任务链表,并且判断是否需要扩容 worker
:
若此时已经有很多
worker
启动(底层一个worker
对应一个goroutine
),不需要扩容,就直接返回。若判断需要扩容,就创建一个新的
worker
,并调用worker.run()
方法启动,各个worker
会异步地检查pool
里面的任务链表是否还有待执行的任务,如果有就执行。
三个角色的定位
task
是一个待执行的任务节点,同时还包含了指向下一个任务的指针,链表结构;worker
是一个实际执行任务的执行器,它会异步启动一个goroutine
执行协程池里面未执行的task
;pool
是一个逻辑上的协程池,对应了一个task
链表,同时负责维护task
状态的更新,以及在需要的时候创建新的worker
。
使用 sync.Pool 进行性能优化
其实到这个地方,gopool
已经是一个代码简洁清晰的协程池库了,但是性能上显然有改进空间,所以gopool
的作者应用了多次 sync.Pool
来池化对象的创建,复用woker和task对象。
这里建议大家直接看源码,其实在上面的代码中已经有所涉及。
task 池化
var taskPool sync.Pool
func init() {
taskPool.New = newTask
}
func newTask() interface{} {
return &task{}
}
func (t *task) Recycle() {
t.zero()
taskPool.Put(t)
}
worker 池化
var workerPool sync.Pool
func init() {
workerPool.New = newWorker
}
func newWorker() interface{} {
return &worker{}
}
func (w *worker) Recycle() {
w.zero()
workerPool.Put(w)
}
来源:https://juejin.cn/post/7086443265309818894


猜你喜欢
- --禁用 alter table tb disable trigger tir_name --啟用 alter table tb enabl
- 本文重在实践和测试,如果你还不了解Data URI,推荐先阅读秦歌的Data URI 和 MHTML。旺旺点灯(JS)实践经过:因为要对SR
- 本文实例讲述了PHP实现向关联数组指定的Key之前插入元素的方法。分享给大家供大家参考,具体如下:PHP 关联数组可以通过三种方式插入新元素
- 前言 大家周末好,今天给大家带来的是Python当中生成器和迭代器的使用。我当初第一次学到迭代器和生成器的时候,并没有太在意,只是觉得这是一
- vue2.x前置概念:路由钩子分类一共分3类, 7个钩子路由和组件的概念(方便理解钩子函数)路由和组件是2个概念, 可以粗犷的认为:路由是浏
- 建表,主键自增create table aaa(id bigint identity(1,1) not null PRIMARY key,n
- 概述要访问一个变量的内容,可以直接使用其名称。如果该变量是一个数组,可以使用变量名称和关键字或索引的组合来访问其内容。像其他变量一样,使用运
- 一、DataLoader理解在深度学习模型训练中,数据的预处理和读取是一个非常重要的问题。PyTorch作为深度学习框架之一,提供了Data
- 从Web查询数据库:Web数据库架构的工作原理 一个用户的浏览器发出一个HTTP请求,请求特定的Web页面,在该页面中出发form表单提交到
- 附上代码与运行结果截图:import time# 获取当前时间now = time.localtime()# 格式化日期now_ = tim
- Jupyter notebook 更改文件打开的默认路径第一步:修改图标- 找到 Jupyter notebook 桌面图标- 对着 Jup
- 本文实例讲述了Python画图的基本方法。分享给大家供大家参考,具体如下:Python:使用matplotlib绘制图表python绘制图表
- 本文为大家分享了python实现扫雷游戏的具体代码,供大家参考,具体内容如下本文实例借鉴mvc模式,核心数据为model,维护1个矩阵,0表
- 日志文件满而造成SQL数据库无法写入文件时,可用两种方法:一种方法:清空日志。1.打开查询分析器,输入命令DUMP TRANSAC
- 小贤是一条可爱的小狗(Dog), 它的叫声很好听(wow), 每次看到主人的时候就会乖乖叫一声(yelp).从这段描述可以得到以下对象:fu
- 方法一、尽量使用复杂的SQL来代替简单的一堆 SQL.同样的事务,一个复杂的SQL完成的效率高于一堆简单SQL完成的效率。有多个查询时,要善
- 在IE下,获取Param的时候有个诡异现象(不知道算不算bug)。为了清晰起见,下面用最简单的HTML和JavaScript来说明。有这么一
- 我们都知道Jupyter notebook更换主题后看着会很舒服,但是有个问题主题更换后工具栏不显示了。usename$ jt -lAvai
- Python虚拟机注:本篇是根据教程学习记录的笔记,部分内容与教程是相同的,因为转载需要填链接,但是没有,所以填的原创,如果侵权会直接删除。
- 黑夜可能漫长,但总会迎来温暖的阳光,三月如期而至,武大的樱花又一次盛开。那么今天就一起来看看怎样在python中画一棵美丽的樱花树~说到用p