Go singleflight使用以及原理
作者:胡桃姓胡,蝴蝶也姓胡 发布时间:2024-04-27 15:31:09
这个东西很重要,可以经常用在项目当中,所以我们单独拿出来进行讲解。
在使用它之前我们需要导包:
go get golang.org/x/sync/singleflight
golang/sync/singleflight.Group
是 Go 语言扩展包中提供了另一种同步原语,它能够在一个服务中抑制对下游的多次重复请求。一个比较常见的使用场景是:我们在使用 Redis 对数据库中的数据进行缓存,发生缓存击穿时,大量的流量都会打到数据库上进而影响服务的尾延时。
但是 golang/sync/singleflight.Group
能有效地解决这个问题,它能够限制对同一个键值对的多次重复请求,减少对下游的瞬时流量。
使用方法
singleflight
类的使用方法就新建一个singleflight.Group
,使用其方法Do
或者DoChan
来包装方法,被包装的方法在对于同一个key,只会有一个协程执行,其他协程等待那个协程执行结束后,拿到同样的结果。
Group
结构体
代表一类工作,同一个group
中,同样的key
同时只能被执行一次
Do
方法
func (g *Group) Do(key string, fn func() (interface{<!--{cke_protected}{C}%3C!%2D%2D%20%2D%2D%3E-->}, error)) (v interface{<!--{cke_protected}{C}%3C!%2D%2D%20%2D%2D%3E-->}, err error, shared bool)
key
:同一个key
,同时只有一个协程执行
fn
:被包装的函数
v
:返回值,即执行结果。其他等待的协程都会拿到
shared
:表示是否由其他协程得到了这个结果v
DoChan
方法
func (g *Group) DoChan(key string, fn func() (interface{<!--{cke_protected}{C}%3C!%2D%2D%20%2D%2D%3E-->}, error)) <-chan Result
和Do
差不多其实,因此我们就只讲解Do
的实际应用场景了。
具体应用场景
var singleSetCache singleflight.Group
func GetAndSetCache(r *http.Request, cacheKey string) (string, error) {
log.Printf("request %s start to get and set cache...", r.URL)
value, err, _ := singleSetCache.Do(cacheKey, func() (interface{}, error) {
log.Printf("request %s is getting cache...", r.URL)
time.Sleep(3 * time.Second)
log.Printf("request %s get cache success!", r.URL)
return cacheKey, nil
})
return value.(string), err
}
func main() {
r := gin.Default()
r.GET("/sekill/:id", func(context *gin.Context) {
ID := context.Param("id")
cache, err := GetAndSetCache(context.Request, ID)
if err != nil {
log.Println(err)
}
log.Printf("request %s get value: %v", context.Request.URL, cache)
})
r.Run()
}
来看一下执行结果:
2022/12/29 16:21:18 request /sekill/5 start to get and set cache...
2022/12/29 16:21:18 request /sekill/5 is getting cache...
2022/12/29 16:21:18 request /sekill/9 start to get and set cache...
2022/12/29 16:21:18 request /sekill/9 is getting cache...
2022/12/29 16:21:18 request /sekill/9 start to get and set cache...
2022/12/29 16:21:18 request /sekill/5 start to get and set cache...
2022/12/29 16:21:18 request /sekill/5 start to get and set cache...
2022/12/29 16:21:18 request /sekill/9 start to get and set cache...
2022/12/29 16:21:18 request /sekill/9 start to get and set cache...
2022/12/29 16:21:18 request /sekill/5 start to get and set cache...
2022/12/29 16:21:19 request /sekill/9 start to get and set cache...
2022/12/29 16:21:19 request /sekill/5 start to get and set cache...
2022/12/29 16:21:21 request /sekill/9 get cache success!
2022/12/29 16:21:21 request /sekill/5 get cache success!
2022/12/29 16:21:21 request /sekill/5 get value: 5
2022/12/29 16:21:21 request /sekill/5 get value: 5
[GIN] 2022/12/29 - 16:21:21 | 200 | 3.0106529s | 127.0.0.1 | GET "/sekill/5"
2022/12/29 16:21:21 request /sekill/9 get value: 9
[GIN] 2022/12/29 - 16:21:21 | 200 | 2.8090881s | 127.0.0.1 | GET "/sekill/5"
2022/12/29 16:21:21 request /sekill/9 get value: 9
[GIN] 2022/12/29 - 16:21:21 | 200 | 2.2166003s | 127.0.0.1 | GET "/sekill/9"
2022/12/29 16:21:21 request /sekill/9 get value: 9
[GIN] 2022/12/29 - 16:21:21 | 200 | 2.6064069s | 127.0.0.1 | GET "/sekill/9"
2022/12/29 16:21:21 request /sekill/9 get value: 9
[GIN] 2022/12/29 - 16:21:21 | 200 | 2.4178652s | 127.0.0.1 | GET "/sekill/9"
2022/12/29 16:21:21 request /sekill/9 get value: 9
[GIN] 2022/12/29 - 16:21:21 | 200 | 2.8101267s | 127.0.0.1 | GET "/sekill/9"
2022/12/29 16:21:21 request /sekill/5 get value: 5
[GIN] 2022/12/29 - 16:21:21 | 200 | 3.0116892s | 127.0.0.1 | GET "/sekill/9"
2022/12/29 16:21:21 request /sekill/5 get value: 5
[GIN] 2022/12/29 - 16:21:21 | 200 | 2.6074537s | 127.0.0.1 | GET "/sekill/5"
2022/12/29 16:21:21 request /sekill/5 get value: 5
[GIN] 2022/12/29 - 16:21:21 | 200 | 2.4076473s | 127.0.0.1 | GET "/sekill/5"
[GIN] 2022/12/29 - 16:21:21 | 200 | 2.218686s | 127.0.0.1 | GET "/sekill/5"
可以看到确实只有一个协程执行了被包装的函数,并且其他协程都拿到了结果。
接下来我们来看一下它的原理吧!
原理
首先来看一下Group
结构体:
type Group struct {
mu sync.Mutex // 锁保证并发安全
m map[string]*call //保存key对应的函数执行过程和结果的变量。
}
然后我们来看一下call
结构体:
type call struct {
wg sync.WaitGroup //用WaitGroup实现只有一个协程执行函数
val interface{} //函数执行结果
err error
forgotten bool
dups int //含义是duplications,即同时执行同一个key的协程数量
chans []chan<- Result
}
然后我们来看一下Do
方法:
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
// 写Group的m字段时,加锁保证写安全
g.mu.Lock()
if g.m == nil {
g.m = make(map[string]*call)
}
if c, ok := g.m[key]; ok {
// 如果key已经存在,说明已经由协程在执行,则dups++并等待其执行结果,执行结果保存在对应的call的val字段里
c.dups++
g.mu.Unlock()
c.wg.Wait()
if e, ok := c.err.(*panicError); ok {
panic(e)
} else if c.err == errGoexit {
runtime.Goexit()
}
return c.val, c.err, true
}
// 如果key不存在,则新建一个call,并使用WaitGroup来阻塞其他协程,同时在m字段里写入key和对应的call
c := new(call)
c.wg.Add(1)
g.m[key] = c
g.mu.Unlock()
g.doCall(c, key, fn) // 进来的第一个协程就来执行这个函数
return c.val, c.err, c.dups > 0
}
然后我们来分析一下doCall
函数:
func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
c.val, c.err = fn()
c.wg.Done()
g.mu.Lock()
delete(g.m, key)
for _, ch := range c.chans {
ch <- Result{c.val, c.err, c.dups > 0}
}
g.mu.Unlock()
}
运行传入的函数
fn
,该函数的返回值会赋值给c.val
和c.err
;调用
sync.WaitGroup.Done
方法通知所有等待结果的Goroutine
— 当前函数已经执行完成,可以从call
结构体中取出返回值并返回了;获取持有的互斥锁并通过管道将信息同步给使用
golang/sync/singleflight.Group.DoChan
方法的Goroutine
;
问题分析
分析了源码之后,我们得出了一个结论,这个东西是用阻塞来实现的,这就引发了一个问题:如果我们处理的那个请求刚好遇到问题了,那么后面的所有请求都会被阻塞,也就是,我们应该加上适合的超时控制,如果在一定时间内,没有获得结果,那么就当作超时处理。
于是这个适合我们应该使用DoChan()
。两者实现上完全一样,不同的是, DoChan()
通过 channel
返回结果。因此可以使用 select
语句实现超时控制。
var singleSetCache singleflight.Group
func GetAndSetCache(r *http.Request, cacheKey string) (string, error) {
log.Printf("request %s start to get and set cache...", r.URL)
retChan := singleSetCache.DoChan(cacheKey, func() (interface{}, error) {
log.Printf("request %s is getting cache...", r.URL)
time.Sleep(3 * time.Second)
log.Printf("request %s get cache success!", r.URL)
return cacheKey, nil
})
var ret singleflight.Result
timeout := time.After(2 * time.Second)
select {
case <-timeout:
log.Println("time out!")
return "", errors.New("time out")
case ret = <-retChan: // 从chan中获取结果
return ret.Val.(string), ret.Err
}
}
func main() {
r := gin.Default()
r.GET("/sekill/:id", func(context *gin.Context) {
ID := context.Param("id")
cache, err := GetAndSetCache(context.Request, ID)
if err != nil {
log.Println(err)
}
log.Printf("request %s get value: %v", context.Request.URL, cache)
})
r.Run()
}
补充
这里其实还有一个Forget
方法,它可以在映射表中删除某个键,接下来对键的调用就不会等待前面的函数返回了。
来源:https://blog.csdn.net/qq_61039408/article/details/128484679


猜你喜欢
- Django中集成jquery首先,静态的资源通常放入static文件夹中:static/ css/
- 问题有一个元素序列,想知道在序列中出现次数最多的元素是什么解决方案collections 模块中的 Counter 类转让给女士为此问题所设
- 处理数据时我们经常需要从数组中随机抽取元素,这时候我们可以考虑使用np.random.choice()函数语法格式numpy.random.
- 我就废话不多说了,直接上代码吧!#方法一def list_cut(mylist,count): length=len(mylis
- 前言很多朋友在使用Jetbrains系列软件的时候,可能都会有一个问题,那就是鼠标右击出现的Open Folder as PyCharm P
- one: declare @s varchar(20) declare @i varchar(20) set @i='' s
- 普通MySQL运行,数据量和访问量不大的话,是足够快的,但是当数据量和访问量剧增的时候,那么就会明显发现MySQL很慢,甚至do
- 【作者翻译】结构和层次降低了复杂性并提高了可读性。你的文章或站点组织的越深入,用户就越容易理解你观点和得到你想传达的信息。在网页上,这点被通
- 前言不要在用手敲生成Excel数据报表了,用Python自动生成Excel数据报表!废话不多说让我们愉快地开始吧~开发工具Python版本:
- 代码如下所示:$num = 10.4567; //第一种:利用round()对浮点数进行四舍五入 &n
- tensorflow中tf.concat的axis的使用我一直理解的比较模糊,这次做个笔记理下自己的思路。import tensorflow
- 这里介绍一个nii文件保存为png格式的方法。这篇文章是介绍多个nii文件保存为png格式的方法:https://www.jb51.net/
- 学过 Python 的朋友应该都知道 f-strings 是用来非常方便的格式化输出的,觉得它的使用方法无外乎就是 print(f'
- TypeScript 这些年越来越火,可以说是前端工程师的必备技能了,各大框架都基于它实现。那么,TypeScript 的出现和爆火是偶然发
- 而随着设备硬件配置的不断提升,对中小型应用程序来说,对算法的空间复杂度的要求也宽松了不少。不过,在当今 Web2.0 时代,对应用程序的时间
- 今天在一个QQ群中看到有人在问一个进度条的实现方式,当时因为工作时间,需求相对也比较紧,只是简单的说了一下可以通过CSS的边框属性和背景属性
- 我就废话不多说了,大家还是直接看代码吧~def list_dict(list_data): dict_data = {} &nb
- 本文实例讲述了python使用PyGame模块播放声音的方法。分享给大家供大家参考。具体实现方法如下:import pygamepygame
- 这是个删除非空目录的例子test.asp要执行删除你需要对该目录具有修改权限<% dim fso,tmpfold
- Django2.1 + Python3.6 + nginx + uwsgi 部署到Ubuntu18.04材料准备准备一个Django项目准备