Golang sync包中errgroup的使用详解
作者:人艰不拆_zmc 发布时间:2024-02-08 23:19:08
1、初识 errgroup
WaitGroup 主要用于控制任务组下的并发子任务。它的具体做法就是,子任务 goroutine 执行前通过 Add 方法添加任务数目,子任务 goroutine 结束时调用 Done 标记已完成任务数,主任务 goroutine 通过 Wait 方法等待所有的任务完成后才能执行后续逻辑。
package main
import (
"net/http"
"sync"
)
func main() {
var wg sync.WaitGroup
var urls = []string{
"http://www.golang.org/",
"http://www.baidu.com/",
"http://www.bokeyuan12111.com/",
}
for _, url := range urls {
wg.Add(1)
go func(url string) {
defer wg.Done()
resp, err := http.Get(url)
if err != nil {
return
}
resp.Body.Close()
}(url)
}
wg.Wait()
}
在以上示例代码中,我们通过三个 goroutine 去并发的请求 url,直到所有的子任务 goroutine 均完成访问,主任务 goroutine 下的 wg.Wait 才会停止阻塞。
但在实际的项目代码中,子任务 goroutine 的执行并不总是顺风顺水,它们也许会产生 error。而 WaitGroup 并没有告诉我们在子 goroutine 发生错误时,如何将其抛给主任务 groutine。
这个时候可以考虑使用 errgroup
package main
import (
"fmt"
"net/http"
"golang.org/x/sync/errgroup"
)
func main() {
var urls = []string{
"http://www.golang.org/",
"http://www.baidu.com/",
"http://www.bokeyuan12111.com/",
}
g := new(errgroup.Group)
for _, url := range urls {
url := url
g.Go(func() error {
resp, err := http.Get(url)
if err != nil {
fmt.Println(err)
return err
}
fmt.Printf("get [%s] success: [%d] \n", url, resp.StatusCode)
return resp.Body.Close()
})
}
if err := g.Wait(); err != nil {
fmt.Println(err)
} else {
fmt.Println("All success!")
}
}
结果如下:
get [http://www.baidu.com/] success: [200]
Get "http://www.bokeyuan12111.com/": dial tcp: lookup www.bokeyuan12111.com: no such host
Get "http://www.golang.org/": dial tcp 142.251.42.241:80: i/o timeout
Get "http://www.bokeyuan12111.com/": dial tcp: lookup www.bokeyuan12111.com: no such host
可以看到,执行获取www.bokeyuan12111.com和www.golang.org两个 url 的子 groutine 均发生了错误,在主任务 goroutine 中成功捕获到了第一个错误信息。
除了 拥有 WaitGroup 的控制能力 和 错误传播 的功能之外,errgroup 还有最重要的 context 反向传播机制,我们来看一下它的设计。
2、errgroup 源码解析
errgroup 的设计非常精练,全部代码如下
type Group struct {
cancel func()
wg sync.WaitGroup
errOnce sync.Once
err error
}
func WithContext(ctx context.Context) (*Group, context.Context) {
ctx, cancel := context.WithCancel(ctx)
return &Group{cancel: cancel}, ctx
}
func (g *Group) Wait() error {
g.wg.Wait()
if g.cancel != nil {
g.cancel()
}
return g.err
}
func (g *Group) Go(f func() error) {
g.wg.Add(1)
go func() {
defer g.wg.Done()
if err := f(); err != nil {
g.errOnce.Do(func() {
g.err = err
if g.cancel != nil {
g.cancel()
}
})
}
}()
}
可以看到,errgroup 的实现依靠于结构体 Group,它通过封装 sync.WaitGroup,继承了 WaitGroup 的特性,在 Go() 方法中新起一个子任务 goroutine,并在 Wait() 方法中通过 sync.WaitGroup 的 Wait 进行阻塞等待。
同时 Group 利用 sync.Once 保证了它有且仅会保留第一个子 goroutine 错误。
最后,Group 通过嵌入 context.WithCancel 方法产生的 cancel 函数,能够在子 goroutine 发生错误时,及时通过调用 cancle 函数,将 Context 的取消信号及时传播出去。当然,这一特性需要用户代码的配合。
3、errgroup 上下文取消
在 errgroup 的文档(https://pkg.go.dev/golang.org/x/sync@v0.0.0-20210220032951-036812b2e83c/errgroup#example-Group-Pipeline)中,它基于 Go 官方文档的 pipeline( https://blog.golang.org/pipelines) ,实现了一个任务组 goroutine 中上下文取消(Context cancelation)演示的示例。但该 Demo 的前提知识略多,本文这里基于其思想,提供一个易于理解的使用示例。
package main
import (
"context"
"fmt"
"golang.org/x/sync/errgroup"
)
func main() {
g, ctx := errgroup.WithContext(context.Background())
dataChan := make(chan int, 20)
// 数据生产端任务子 goroutine
g.Go(func() error {
defer close(dataChan)
for i := 1; ; i++ {
if i == 10 {
return fmt.Errorf("data 10 is wrong")
}
dataChan <- i
fmt.Println(fmt.Sprintf("sending %d", i))
}
})
// 数据消费端任务子 goroutine
for i := 0; i < 3; i++ {
g.Go(func() error {
for j := 1; ; j++ {
select {
case <-ctx.Done():
return ctx.Err()
case number := <-dataChan:
fmt.Println(fmt.Sprintf("receiving %d", number))
}
}
})
}
// 主任务 goroutine 等待 pipeline 结束数据流
err := g.Wait()
if err != nil {
fmt.Println(err)
}
fmt.Println("main goroutine done!")
}
在以上示例中,我们模拟了一个数据传送管道。在数据的生产与消费任务集中,有四个子任务 goroutine:一个生产数据的 goroutine,三个消费数据的 goroutine。当数据生产方存在错误数据时(数据等于 10 ),我们停止数据的生产与消费,并将错误抛出,回到 main goroutine 的执行逻辑中。
可以看到,因为 errgroup 中的 Context cancle 函数的嵌入,我们在子任务 goroutine 中也能反向控制任务上下文。
程序的某一次运行,输出结果如下:
sending 1
sending 2
sending 3
sending 4
sending 5
sending 6
sending 7
sending 8
sending 9
receiving 1
receiving 3
receiving 2
receiving 4
data 10 is wrong
main goroutine done!
4、总结
errgroup 是 Go 官方的并发原语补充库,相对于标准库中提供的原语而言,显得没那么核心。这里总结一下 errgroup 的特性。
继承了 WaitGroup 的功能
错误传播:能够返回任务组中发生的第一个错误,但有且仅能返回该错误
context 信号传播:如果子任务 goroutine 中有循环逻辑,则可以添加 ctx.Done 逻辑,此时通过 context 的取消信号,提前结束子任务执行。
来源:https://www.cnblogs.com/zhangmingcheng/p/16494665.html


猜你喜欢
- 本文实例讲述了python转换字符串为摩尔斯电码的方法。分享给大家供大家参考。具体实现方法如下:chars = ",.012345
- 在ASP与ASP.NET之间共享对话状态(1)ASP实现原来的ASP对话只能将对话数据保存在内存中。为了将对话数据保存到SQL Server
- 前言:现在写爬虫,入门已经不是一件门槛很高的事情了,网上教程一大把,但很多爬虫新手在爬取数据的过程中依旧会遇到这样那样的问题。今天整理了部分
- 1、获取文件的创建、修改、访问时间# -*- encoding=utf-8 -*-import osimport timedef get_f
- 以这个为例: yyyy-MM-dd HH:mm:ss首先得写好你需要的模板options.sign =
- 问题公司项目使用Laravel的开发的两个项目在同一个测试服务器部署,公用同一个redis。在使用laravel中的队列时,产生冲突干扰。查
- 导语每日游戏更新系列——今天带大家来看看扫雷小游戏!它是许多人接触到的第一款游戏,大概也是广大办公族和无网学生无聊时消遣的最佳游戏。在那些还
- 一,索引的重要性索引用于快速找出在某个列中有一特定值的行。不使用索引,MySQL必须从第1条记录开始然后读完整个表直到找出相关的行。表越大,
- 1、下载安装 python https://www.python.org/downloads/windows/web-based insta
- 前言前面几个章节我们学习了对于普通文件的操作,比如说文件的创建、复制粘贴、裁剪粘贴、文件名的重命名、删除等等。另外还学习了一些基本练习,如何
- 我也一一试过,结果是:中文乱码问题没解决,mysql服务却不能启动了, 汗颜了,还是自己动手解决吧,我这里也截图了,方便参观。我用的是app
- 前言最近需要做一个浏览器的, 支持大体积文件上传且要支持断点续传的上传组件, 本来以为很容易的事情, 结果碰到了一个有意思的问题:循环执行连
- 我之前想写路由器的密码暴力破解器,我手上只有极路由,发现极路由有安全限制,只能允许连续10密码错误,所以我改拿博客园练手。博客园的博客有个功
- 一、变量1.变量Python 中的变量不需要声明。每个变量在使用前都必须赋值,变量赋值以后该变量才会被创建。在 Python 中,变量就是变
- 在当今企业环境中,保证数据安全不是可有可无的工作。频繁曝光的入侵和欺骗事件、萨班斯•奥克斯利法案、HIPAA法案规定和爱国
- 一、背景协助产品部门提取10000份产品log信息中的SN号、IMEI号、ICCID号到Excel表格中。1.l原始的og内容:2.提取后的
- viper作为配置框架,其功能非常的强大,我们没有理由不去了解一下。我们先看官网对它的功能简介:viper是完整配置解决方案,他可以处理所有
- 迪杰斯特拉(Dijkstra)算法主要是针对没有负值的有向图,求解其中的单一起点到其他顶点的最短路径算法。1 算法原理迪杰斯特拉(Dijks
- 在 Go 中,有几种不同的方法来检查一个值是null还是empty。您应该使用的具体方法取决于您正在检查的值的类型。例如,如果你正在检查一个
- 学习了vue.js一段时间,拿它来做2个小组件,练习一下。我这边是用webpack进行打包,也算熟悉一下它的运用。源码放在文末的 githu