网络编程
位置:首页>> 网络编程>> Go语言>> Go语言实现的可读性更高的并发神库详解

Go语言实现的可读性更高的并发神库详解

作者:asong2020  发布时间:2023-07-20 08:03:49 

标签:Go,可读性,并发,库

前言

前几天逛github发现了一个有趣的并发库-conc,其目标是:

  • 更难出现goroutine泄漏

  • 处理panic更友好

  • 并发代码可读性高

从简介上看主要封装功能如下:

  • waitGroup进行封装,避免了产生大量重复代码,并且也封装recover,安全性更高

  • 提供panics.Catcher封装recover逻辑,统一捕获panic,打印调用栈一些信息

  • 提供一个并发执行任务的worker池,可以控制并发度、goroutine可以进行复用,支持函数签名,同时提供了stream方法来保证结果有序

  • 提供ForEachmap方法优雅的处理切片

接下来就区分模块来介绍一下这个库;

仓库地址:github.com/sourcegraph…

WaitGroup的封装

Go语言标准库有提供sync.waitGroup控制等待goroutine,我们一般会写出如下代码:

func main(){
   var wg sync.WaitGroup
   for i:=0; i < 10; i++{
       wg.Add(1)
       go func() {
           defer wg.Done()
           defer func() {
               // recover panic
               err := recover()
               if err != nil {
                   fmt.Println(err)
               }
           }
           // do something
           handle()
       }
   }
   wg.Wait()
}

上述代码我们需要些一堆重复代码,并且需要单独在每一个func中处理recover逻辑,所以conc库对其进行了封装,代码简化如下:

func main() {
wg := conc.NewWaitGroup()
for i := 0; i &lt; 10; i++ {
wg.Go(doSomething)
}
wg.Wait()
}
func doSomething() {
fmt.Println("test")
}

conc库封装也比较简单,结构如下:

type WaitGroup struct {
wg sync.WaitGroup
pc panics.Catcher
}

其自己实现了Catcher类型对recover逻辑进行了封装,封装思路如下:

type Catcher struct {
recovered atomic.Pointer[RecoveredPanic]
}

recovered是原子指针类型,RecoveredPanic是捕获的recover封装,封装了堆栈等信息:

type RecoveredPanic struct {
// The original value of the panic.
Value any
// The caller list as returned by runtime.Callers when the panic was
// recovered. Can be used to produce a more detailed stack information with
// runtime.CallersFrames.
Callers []uintptr
// The formatted stacktrace from the goroutine where the panic was recovered.
// Easier to use than Callers.
Stack []byte
}

提供了Try方法执行方法,只会记录第一个panic的goroutine信息:

func (p *Catcher) Try(f func()) {
defer p.tryRecover()
f()
}
func (p *Catcher) tryRecover() {
if val := recover(); val != nil {
rp := NewRecoveredPanic(1, val)
       // 只会记录第一个panic的goroutine信息
p.recovered.CompareAndSwap(nil, &amp;rp)
}
}

提供了Repanic()方法用来重放捕获的panic:

func (p *Catcher) Repanic() {
if val := p.Recovered(); val != nil {
panic(val)
}
}
func (p *Catcher) Recovered() *RecoveredPanic {
return p.recovered.Load()
}

waitGroup对此也分别提供了Wait()WaitAndRecover()方法:

func (h *WaitGroup) Wait() {
h.wg.Wait()
// Propagate a panic if we caught one from a child goroutine.
h.pc.Repanic()
}
func (h *WaitGroup) WaitAndRecover() *panics.RecoveredPanic {
h.wg.Wait()
// Return a recovered panic if we caught one from a child goroutine.
return h.pc.Recovered()
}

wait方法只要有一个goroutine发生panic就会向上抛出panic,比较简单粗暴;

waitAndRecover方法只有有一个goroutine发生panic就会返回第一个recover的goroutine信息;

总结:conc库对waitGrouop的封装总体是比较不错的,可以减少重复的代码;

worker池

conc提供了几种类型的worker池:

  • ContextPool:可以传递context的pool,若有goroutine发生错误可以cancel其他goroutine

  • ErrorPool:通过参数可以控制只收集第一个error还是所有error

  • ResultContextPool:若有goroutine发生错误会cancel其他goroutine并且收集错误

  • RestultPool:收集work池中每个任务的执行结果,并不能保证顺序,保证顺序需要使用stream或者iter.map;

我们来看一个简单的例子:

import "github.com/sourcegraph/conc/pool"
func ExampleContextPool_WithCancelOnError() {
p := pool.New().
WithMaxGoroutines(4).
WithContext(context.Background()).
WithCancelOnError()
for i := 0; i < 3; i++ {
i := i
p.Go(func(ctx context.Context) error {
if i == 2 {
return errors.New("I will cancel all other tasks!")
}
<-ctx.Done()
return nil
})
}
err := p.Wait()
fmt.Println(err)
// Output:
// I will cancel all other tasks!
}

在创建pool时有如下方法可以调用:

  • p.WithMaxGoroutines()配置pool中goroutine的最大数量

  • p.WithErrors:配置pool中的task是否返回error

  • p.WithContext(ctx):配置pool中运行的task当遇到第一个error要取消

  • p.WithFirstError:配置pool中的task只返回第一个error

  • p.WithCollectErrored:配置pool的task收集所有error

pool的基础结构如下:

type Pool struct {
handle   conc.WaitGroup
limiter  limiter
tasks    chan func()
initOnce sync.Once
}

limiter是控制器,用chan来控制goroutine的数量:

type limiter chan struct{}
func (l limiter) limit() int {
return cap(l)
}
func (l limiter) release() {
if l != nil {
&lt;-l
}
}

pool的核心逻辑也比较简单,如果没有设置limiter,那么就看有没有空闲的worker,否则就创建一个新的worker,然后投递任务进去;

如果设置了limiter,达到了limiter worker数量上限,就把任务投递给空闲的worker,没有空闲就阻塞等着;

func (p *Pool) Go(f func()) {
p.init()
if p.limiter == nil {
// 没有限制
select {
case p.tasks <- f:
// A goroutine was available to handle the task.
default:
// No goroutine was available to handle the task.
// Spawn a new one and send it the task.
p.handle.Go(p.worker)
p.tasks <- f
}
} else {
select {
case p.limiter <- struct{}{}:
// If we are below our limit, spawn a new worker rather
// than waiting for one to become available.
p.handle.Go(p.worker)
// We know there is at least one worker running, so wait
// for it to become available. This ensures we never spawn
// more workers than the number of tasks.
p.tasks <- f
case p.tasks <- f:
// A worker is available and has accepted the task.
return
}
}
}

这里work使用的是一个无缓冲的channel,这种复用方式很巧妙,如果goroutine执行很快避免创建过多的goroutine;

使用pool处理任务不能保证有序性,conc库又提供了Stream方法,返回结果可以保持顺序;

Stream

Steam的实现也是依赖于pool,在此基础上做了封装保证结果的顺序性,先看一个例子:

func ExampleStream() {
times := []int{20, 52, 16, 45, 4, 80}
stream := stream2.New()
for _, millis := range times {
dur := time.Duration(millis) * time.Millisecond
stream.Go(func() stream2.Callback {
time.Sleep(dur)
// This will print in the order the tasks were submitted
return func() { fmt.Println(dur) }
})
}
stream.Wait()
// Output:
// 20ms
// 52ms
// 16ms
// 45ms
// 4ms
// 80ms
}

stream的结构如下:

type Stream struct {
pool             pool.Pool
callbackerHandle conc.WaitGroup
queue            chan callbackCh
initOnce sync.Once
}

queue是一个channel类型,callbackCh也是channel类型 - chan func():

type callbackCh chan func()

在提交goroutine时按照顺序生成callbackCh传递结果:

func (s *Stream) Go(f Task) {
s.init()
// Get a channel from the cache.
ch := getCh()
// Queue the channel for the callbacker.
s.queue <- ch
// Submit the task for execution.
s.pool.Go(func() {
defer func() {
// In the case of a panic from f, we don't want the callbacker to
// starve waiting for a callback from this channel, so give it an
// empty callback.
if r := recover(); r != nil {
ch <- func() {}
panic(r)
}
}()
// Run the task, sending its callback down this task's channel.
callback := f()
ch <- callback
})
}
var callbackChPool = sync.Pool{
New: func() any {
return make(callbackCh, 1)
},
}
func getCh() callbackCh {
return callbackChPool.Get().(callbackCh)
}
func putCh(ch callbackCh) {
callbackChPool.Put(ch)
}

ForEach和map

ForEach

conc库提供了ForEach方法可以优雅的并发处理切片,看一下官方的例子:

Go语言实现的可读性更高的并发神库详解

conc库使用泛型进行了封装,我们只需要关注handle代码即可,避免冗余代码,我们自己动手写一个例子:

func main() {
input := []int{1, 2, 3, 4}
iterator := iter.Iterator[int]{
MaxGoroutines: len(input) / 2,
}
iterator.ForEach(input, func(v *int) {
if *v%2 != 0 {
*v = -1
}
})
fmt.Println(input)
}

ForEach内部实现为Iterator结构及核心逻辑如下:

type Iterator[T any] struct {
MaxGoroutines int
}
func (iter Iterator[T]) ForEachIdx(input []T, f func(int, *T)) {
if iter.MaxGoroutines == 0 {
// iter is a value receiver and is hence safe to mutate
iter.MaxGoroutines = defaultMaxGoroutines()
}
numInput := len(input)
if iter.MaxGoroutines > numInput {
// No more concurrent tasks than the number of input items.
iter.MaxGoroutines = numInput
}
var idx atomic.Int64
// 通过atomic控制仅创建一个闭包
task := func() {
i := int(idx.Add(1) - 1)
for ; i < numInput; i = int(idx.Add(1) - 1) {
f(i, &input[i])
}
}
var wg conc.WaitGroup
for i := 0; i < iter.MaxGoroutines; i++ {
wg.Go(task)
}
wg.Wait()
}

可以设置并发的goroutine数量,默认取的是GOMAXPROCS ,也可以自定义传参;

并发执行这块设计的很巧妙,仅创建了一个闭包,通过atomic控制idx,避免频繁触发GC;

map

conc库提供的map方法可以得到对切片中元素结果,官方例子:

Go语言实现的可读性更高的并发神库详解

使用map可以提高代码的可读性,并且减少了冗余代码,自己写个例子:

func main() {
input := []int{1, 2, 3, 4}
mapper := iter.Mapper[int, bool]{
MaxGoroutines: len(input) / 2,
}
results := mapper.Map(input, func(v *int) bool { return *v%2 == 0 })
fmt.Println(results)
// Output:
// [false true false true]
}

map的实现也依赖于Iterator,也是调用的ForEachIdx方法,区别于ForEach是记录处理结果;

  • conc.WatiGroup对Sync.WaitGroup进行了封装,对Add、Done、Recover进行了封装,提高了可读性,避免了冗余代码

  • ForEach、Map方法可以更优雅的并发处理切片,代码简洁易读,在实现上Iterator中的并发处理使用atomic来控制只创建一个闭包,避免了GC性能问题

  • pool是一个并发的协程队列,可以控制协程的数量,实现上也很巧妙,使用一个无缓冲的channel作为worker,如果goroutine执行速度快,避免了创建多个goroutine

  • stream是一个保证顺序的并发协程队列,实现上也很巧妙,使用sync.Pool在提交goroutine时控制顺序,值得我们学习;

来源:https://juejin.cn/post/7194459956525924408

0
投稿

猜你喜欢

  • 我们知道,Diango 接收的 HTTP 请求信息里带有 Cookie 信息。Cookie的作用是为了识别当前用户的身份,通过以下例子来说明
  • 很多朋友使用Dreamweaver一段时间后,开始热衷于寻找各式各样的插件,追求各种各样的特效,而对于Dreamweaver中的基本功能反而
  • GreatSQL社区原创内容未经授权不得随意使用,转载请联系小编并注明来源。GreatSQL是MySQL的国产分支版本,使用上与MySQL一
  • 前言:在转换操作中,我们执行各种操作,例如更改系列的数据类型,将系列更改为列表等。为了执行转换操作,我们有各种有助于转换的功能,例如.ast
  • 一、hook在PyTorch中,提供了一个专用的接口使得网络在前向传播过程中能够获取到特征图,这个接口的名称非常形象,叫做hook。可以想象
  • 注意:如果您尚未阅读过原来那篇老文章《悟透JavaScript》,请先行阅读该文,以了解上下文关系。在上面的示例中,我们定义了两个语法甘露,
  • 本文实例讲述了PHP实现判断二叉树是否对称的方法。分享给大家供大家参考,具体如下:问题请实现一个函数,用来判断一颗二叉树是不是对称的。注意,
  • 目录原生 JS怎么发送一个 get 请求怎么发送一个 post 请求发送一个带有参数的 get 请求发送一个带有参数的 post 请求jQu
  • python中查找指定的字符串的方法如下:code#查询def selStr():  sStr1 = 'jsjtt.com
  • 环境与开发工具在抓包的时候,开始使用的是Chrome开发工具中的Network,结果没有抓到,后来使用Fiddler成功抓取数据。下面逐步来
  • 改编自详解利用OpenCV提取图像中的矩形区域(PPT屏幕等) 原文是c++版,我改成了python版,供大家参考学习。主要思想:边缘检测—
  • 一,PHP脚本与动态页面。  PHP脚本是一种服务器端脚本程序,可通过嵌入等方  法与HTML文件混合,也可以类,函数封
  • 保存的文件有4个: checkpointmodel-parameters.bin-46000.data-00000-of-00001mode
  •    解决MySQL中文乱码的问题看到从数据库中取出的数据全部是“?????”,太让人郁闷了。网上有很多方法来解决这个问
  • 今天看到了mlxtend的包,看了下example集成得非常简洁。还有一个吸引我的地方是自带了一些data直接可以用,省去了自己造数据或者找
  • 前言SQL Server 是数据库软件中比较常见且实用的软件,它的集成度很高,且功能非常强大。很多类型的网站系统后台数据库都依赖于SQL S
  • 开局一张图:可以看出,在项目部署后,我们的资源文件请求都会保持原本大小,如果文件过大,并且很多的情况下,会导致网络请求耗时,严重点可能阻塞后
  • python是一种跨平台的计算机程序设计语言。python是一个高层次的结合了解释性、编译性、互动性和面向对象的脚本语言,可以应用于科学计算
  • python输入错误怎么删除?python常用的输入函数raw_input()在输入的过程中如果输错了,不能像在命令行下那样backspac
  • 简介Python中布尔值(Booleans)表示以下两个值之一:True或False。布尔值在编程中,通常需要知道表达式是 True 还是
手机版 网络编程 asp之家 www.aspxhome.com