Go singleflight使用以及原理
作者:胡桃姓胡,蝴蝶也姓胡 发布时间:2024-04-27 15:31:09
这个东西很重要,可以经常用在项目当中,所以我们单独拿出来进行讲解。
在使用它之前我们需要导包:
go get golang.org/x/sync/singleflight
golang/sync/singleflight.Group
是 Go 语言扩展包中提供了另一种同步原语,它能够在一个服务中抑制对下游的多次重复请求。一个比较常见的使用场景是:我们在使用 Redis 对数据库中的数据进行缓存,发生缓存击穿时,大量的流量都会打到数据库上进而影响服务的尾延时。
但是 golang/sync/singleflight.Group
能有效地解决这个问题,它能够限制对同一个键值对的多次重复请求,减少对下游的瞬时流量。
使用方法
singleflight
类的使用方法就新建一个singleflight.Group
,使用其方法Do
或者DoChan
来包装方法,被包装的方法在对于同一个key,只会有一个协程执行,其他协程等待那个协程执行结束后,拿到同样的结果。
Group
结构体
代表一类工作,同一个group
中,同样的key
同时只能被执行一次
Do
方法
func (g *Group) Do(key string, fn func() (interface{<!--{cke_protected}{C}%3C!%2D%2D%20%2D%2D%3E-->}, error)) (v interface{<!--{cke_protected}{C}%3C!%2D%2D%20%2D%2D%3E-->}, err error, shared bool)
key
:同一个key
,同时只有一个协程执行
fn
:被包装的函数
v
:返回值,即执行结果。其他等待的协程都会拿到
shared
:表示是否由其他协程得到了这个结果v
DoChan
方法
func (g *Group) DoChan(key string, fn func() (interface{<!--{cke_protected}{C}%3C!%2D%2D%20%2D%2D%3E-->}, error)) <-chan Result
和Do
差不多其实,因此我们就只讲解Do
的实际应用场景了。
具体应用场景
var singleSetCache singleflight.Group
func GetAndSetCache(r *http.Request, cacheKey string) (string, error) {
log.Printf("request %s start to get and set cache...", r.URL)
value, err, _ := singleSetCache.Do(cacheKey, func() (interface{}, error) {
log.Printf("request %s is getting cache...", r.URL)
time.Sleep(3 * time.Second)
log.Printf("request %s get cache success!", r.URL)
return cacheKey, nil
})
return value.(string), err
}
func main() {
r := gin.Default()
r.GET("/sekill/:id", func(context *gin.Context) {
ID := context.Param("id")
cache, err := GetAndSetCache(context.Request, ID)
if err != nil {
log.Println(err)
}
log.Printf("request %s get value: %v", context.Request.URL, cache)
})
r.Run()
}
来看一下执行结果:
2022/12/29 16:21:18 request /sekill/5 start to get and set cache...
2022/12/29 16:21:18 request /sekill/5 is getting cache...
2022/12/29 16:21:18 request /sekill/9 start to get and set cache...
2022/12/29 16:21:18 request /sekill/9 is getting cache...
2022/12/29 16:21:18 request /sekill/9 start to get and set cache...
2022/12/29 16:21:18 request /sekill/5 start to get and set cache...
2022/12/29 16:21:18 request /sekill/5 start to get and set cache...
2022/12/29 16:21:18 request /sekill/9 start to get and set cache...
2022/12/29 16:21:18 request /sekill/9 start to get and set cache...
2022/12/29 16:21:18 request /sekill/5 start to get and set cache...
2022/12/29 16:21:19 request /sekill/9 start to get and set cache...
2022/12/29 16:21:19 request /sekill/5 start to get and set cache...
2022/12/29 16:21:21 request /sekill/9 get cache success!
2022/12/29 16:21:21 request /sekill/5 get cache success!
2022/12/29 16:21:21 request /sekill/5 get value: 5
2022/12/29 16:21:21 request /sekill/5 get value: 5
[GIN] 2022/12/29 - 16:21:21 | 200 | 3.0106529s | 127.0.0.1 | GET "/sekill/5"
2022/12/29 16:21:21 request /sekill/9 get value: 9
[GIN] 2022/12/29 - 16:21:21 | 200 | 2.8090881s | 127.0.0.1 | GET "/sekill/5"
2022/12/29 16:21:21 request /sekill/9 get value: 9
[GIN] 2022/12/29 - 16:21:21 | 200 | 2.2166003s | 127.0.0.1 | GET "/sekill/9"
2022/12/29 16:21:21 request /sekill/9 get value: 9
[GIN] 2022/12/29 - 16:21:21 | 200 | 2.6064069s | 127.0.0.1 | GET "/sekill/9"
2022/12/29 16:21:21 request /sekill/9 get value: 9
[GIN] 2022/12/29 - 16:21:21 | 200 | 2.4178652s | 127.0.0.1 | GET "/sekill/9"
2022/12/29 16:21:21 request /sekill/9 get value: 9
[GIN] 2022/12/29 - 16:21:21 | 200 | 2.8101267s | 127.0.0.1 | GET "/sekill/9"
2022/12/29 16:21:21 request /sekill/5 get value: 5
[GIN] 2022/12/29 - 16:21:21 | 200 | 3.0116892s | 127.0.0.1 | GET "/sekill/9"
2022/12/29 16:21:21 request /sekill/5 get value: 5
[GIN] 2022/12/29 - 16:21:21 | 200 | 2.6074537s | 127.0.0.1 | GET "/sekill/5"
2022/12/29 16:21:21 request /sekill/5 get value: 5
[GIN] 2022/12/29 - 16:21:21 | 200 | 2.4076473s | 127.0.0.1 | GET "/sekill/5"
[GIN] 2022/12/29 - 16:21:21 | 200 | 2.218686s | 127.0.0.1 | GET "/sekill/5"
可以看到确实只有一个协程执行了被包装的函数,并且其他协程都拿到了结果。
接下来我们来看一下它的原理吧!
原理
首先来看一下Group
结构体:
type Group struct {
mu sync.Mutex // 锁保证并发安全
m map[string]*call //保存key对应的函数执行过程和结果的变量。
}
然后我们来看一下call
结构体:
type call struct {
wg sync.WaitGroup //用WaitGroup实现只有一个协程执行函数
val interface{} //函数执行结果
err error
forgotten bool
dups int //含义是duplications,即同时执行同一个key的协程数量
chans []chan<- Result
}
然后我们来看一下Do
方法:
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
// 写Group的m字段时,加锁保证写安全
g.mu.Lock()
if g.m == nil {
g.m = make(map[string]*call)
}
if c, ok := g.m[key]; ok {
// 如果key已经存在,说明已经由协程在执行,则dups++并等待其执行结果,执行结果保存在对应的call的val字段里
c.dups++
g.mu.Unlock()
c.wg.Wait()
if e, ok := c.err.(*panicError); ok {
panic(e)
} else if c.err == errGoexit {
runtime.Goexit()
}
return c.val, c.err, true
}
// 如果key不存在,则新建一个call,并使用WaitGroup来阻塞其他协程,同时在m字段里写入key和对应的call
c := new(call)
c.wg.Add(1)
g.m[key] = c
g.mu.Unlock()
g.doCall(c, key, fn) // 进来的第一个协程就来执行这个函数
return c.val, c.err, c.dups > 0
}
然后我们来分析一下doCall
函数:
func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
c.val, c.err = fn()
c.wg.Done()
g.mu.Lock()
delete(g.m, key)
for _, ch := range c.chans {
ch <- Result{c.val, c.err, c.dups > 0}
}
g.mu.Unlock()
}
运行传入的函数
fn
,该函数的返回值会赋值给c.val
和c.err
;调用
sync.WaitGroup.Done
方法通知所有等待结果的Goroutine
— 当前函数已经执行完成,可以从call
结构体中取出返回值并返回了;获取持有的互斥锁并通过管道将信息同步给使用
golang/sync/singleflight.Group.DoChan
方法的Goroutine
;
问题分析
分析了源码之后,我们得出了一个结论,这个东西是用阻塞来实现的,这就引发了一个问题:如果我们处理的那个请求刚好遇到问题了,那么后面的所有请求都会被阻塞,也就是,我们应该加上适合的超时控制,如果在一定时间内,没有获得结果,那么就当作超时处理。
于是这个适合我们应该使用DoChan()
。两者实现上完全一样,不同的是, DoChan()
通过 channel
返回结果。因此可以使用 select
语句实现超时控制。
var singleSetCache singleflight.Group
func GetAndSetCache(r *http.Request, cacheKey string) (string, error) {
log.Printf("request %s start to get and set cache...", r.URL)
retChan := singleSetCache.DoChan(cacheKey, func() (interface{}, error) {
log.Printf("request %s is getting cache...", r.URL)
time.Sleep(3 * time.Second)
log.Printf("request %s get cache success!", r.URL)
return cacheKey, nil
})
var ret singleflight.Result
timeout := time.After(2 * time.Second)
select {
case <-timeout:
log.Println("time out!")
return "", errors.New("time out")
case ret = <-retChan: // 从chan中获取结果
return ret.Val.(string), ret.Err
}
}
func main() {
r := gin.Default()
r.GET("/sekill/:id", func(context *gin.Context) {
ID := context.Param("id")
cache, err := GetAndSetCache(context.Request, ID)
if err != nil {
log.Println(err)
}
log.Printf("request %s get value: %v", context.Request.URL, cache)
})
r.Run()
}
补充
这里其实还有一个Forget
方法,它可以在映射表中删除某个键,接下来对键的调用就不会等待前面的函数返回了。
来源:https://blog.csdn.net/qq_61039408/article/details/128484679
猜你喜欢
- 如题,今天兜兜转转找了很多网站帖子,一个个环节击破,最后装好费了不少时间。希望这个帖子能帮助有需要的人,教你一篇帖子搞定python+num
- 对于部署在新浪应用引擎SAE上的项目,使用新浪SAE云存储是不错的存储方案。新浪SAE云存储仅能在SAE环境中正常使用,对它进行简单封装后,
- 视频本教程的视频选择图形我们谈到了 Opengameart.org,这是免费游戏艺术的重要来源,也是我们最喜欢的艺术家之一&ldqu
- 本文实例为大家分享了python实现五子棋游戏的具体代码,供大家参考,具体内容如下一、运行截图:二、代码# 用数组定义一个棋盘,棋盘大小为
- 本文实例讲述了python数据结构之图深度优先和广度优先用法。分享给大家供大家参考。具体如下:首先有一个概念:回溯回溯法(探索与回溯法)是一
- 使用matplotlib创建百分比堆积柱状图的思路与堆积柱状图类似,只不过bottom参数累计的不是数值而是百分比,因此,需要事先计算每组柱
- 使用PyQt5开发图形界面,里面使用日期框,这里把这个QDateEdit组件命名为:beginDatefrom PyQt5.QtCore i
- 要下午传上的.结果事一多,忘记了.好不容易回来 . 这个和 dh20156 的那个,是差不多的。 找不到合适的图片,也
- Python中对象方法的定义很怪异,第一个参数一般都命名为self(相当于其它语言的this),用于传递对象本身,而在调用的时候则不必显式传
- 一、创建excel代码备注:封装好了(可直接调用)"""-*- coding:utf-8 -*-@Time :
- 深入理解python try异常处理机制#python的try语句有两种风格#一:种是处理异常(try/except/else)#二:种是无
- 本节在这里主要说的是URLError还有HTTPError,以及对它们的一些处理。1.URLError首先解释下URLError可能产生的原
- 问题介绍在安装torch之后,命令行(Anaconda Powershell Prompt)运行这三行代码:python # pythoni
- 一般写ASP PHP代码的朋友都估计是采用直接操作SQL的吧~ 看以下的代码 <% dim conn,rs&nbs
- 01直接生成这类方法是利用基本程序软件包numpy的随机数产生方法来生成各类用于聚类算法数据集合,也是自行制作轮子的生成方法。一、基础类型1
- 前几天同学要我帮他做个国际聊天室,要求能够将聊天的内容自动翻译成多国语言.本来想用worldlink的翻译服务,但是用ajax很难获得结果,
- 本文实例总结了php处理json格式数据的方法。分享给大家供大家参考,具体如下:1.json简介:何为json?简 单地说,JSON 可以将
- 模板是一个文本,用于分离文档的表现形式和内容。 模板定义了占位符以及各种用于规范文档该如何显示的各部分基本逻辑(模板标签)。 模板通常用于产
- 背景本文主要给大家介绍了关于在Python一段程序中使用多次事件循环的相关内容,我们在Python异步程序编写中经常要用到如下的结构impo
- 一、前言在Python提供了re模块,用于实现正则表达式的操作。在实现时,可以使用re模块提供的方法(如,search()、match()、