解决Golang并发工具Singleflight的问题
作者:北戴河游泳 发布时间:2024-04-25 15:07:17
前言
前段时间在一个项目里使用到了分布式锁进行共享资源的访问限制,后来了解到Golang里还能够使用singleflight对共享资源的访问做限制,于是利用空余时间了解,将知识沉淀下来,并做分享
文章尽量用通俗的语言表达自己的理解,从入门demo开始,结合源码分析singleflight的重点方法,最后分享singleflight的实际使用方式与需要注意的“坑“。
定义
按照官方文档的定义,singleflight 提供了一个重复的函数调用抑制机制
Package singleflight provides a duplicate function call suppression
用途
通俗的来说就是 singleflight将相同的并发请求合并成一个请求,进而减少对下层服务的压力,通常用于解决缓存击穿的问题
缓存击穿是指: 在高并发的场景中,大量的request同时请求查询一个共享资源(例如Redis缓存的key) ,如果这个共享资源正好过期失效了,就会导致大量相同的request都打到Redis下游的数据库,导致数据库的负载上升。
简单Demo
var (
sfKey1 = "key1"
wg *sync.WaitGroup
sf singleflight.Group
nums = 10
)
func getValueService(key string) { //service
var val string
wg = &sync.WaitGroup{}
wg.Add(nums)
for idx := 0; idx < nums; idx++ { // 模拟多协程同时请求
go func(idx int) { // 注意for的一个小坑
defer wg.Done()
value, _ := getAndSetCacheNoChan(idx, key) //简化代码,不处理error
log.Printf("request %v get value: %v", idx, value)
val = value
}(idx)
}
wg.Wait()
log.Println("val: ", val)
return
}
// getValueBySingleflight 使用singleflight取cacheKey对应的value值
func getValueBySingleflight(idx int, cacheKey string) (string, error) {
log.Printf("idx %v into-cache...", idx)
// 调用singleflight的Do()方法
value, _, _ := sf.Do(cacheKey, func() (ret interface{}, err error) {
log.Printf("idx %v is-setting-cache", idx)
// 休眠0.1s以捕获并发的相同请求
time.Sleep(100 * time.Millisecond)
log.Printf("idx %v set-cache-success!", idx)
return "myValue", nil
})
return value.(string), nil
}
看看实际效果
由结果图可以看到,索引=8的协程第一个进入了Do()方法,其他协程则阻塞住,等到idx=8的协程拿到执行结果后,协程以乱序的形式返回执行结果。
相同key的情况下,singleflight将我们的多个请求合并成1个请求。由1个请求去执行对共享资源的操作。
源码分析
结构
type (
Group struct { // singleflight实体
mu sync.Mutex // 互斥锁
m map[string]*call // 懒加载
}
call struct {
wg sync.WaitGroup
// 存储 调用singleflight.Do()方法返回的结果
val interface{}
err error
// 调用singleflight.Forget(key)时将对应的key从Group.m中删除
forgotten bool
// 通俗的理解成singleflight合并的并发请求数
dups int
// 存储 调用singleflight.DoChan()方法返回的结果
chans []chan<- Result
}
Result struct {
Val interface{}
Err error
Shared bool
}
)
对外暴露的方法
func Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool)
func DoChan(key string, fn func() (interface{}, error)) <-chan Result)
// 将key从Group.m中删除
func Forget(key string)
DoChan()和Do()最大的区别是DoChan()属于异步调用,返回一个channel,解决同步调用时的阻塞问题
重点方法分析
Do
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
g.mu.Lock() // 加互斥锁
if g.m == nil { // 懒加载map
g.m = make(map[string]*call)
}
if c, ok := g.m[key]; ok { // 检查相同的请求已经是否进入过singleflight
c.dups++
g.mu.Unlock()
c.wg.Wait() // 调用waitGroup的wait()方法阻塞住本次调用,等待第一个进入singleflight的请求执行完毕拿到结果,将本次请求唤醒.
if e, ok := c.err.(*panicError); ok { //如果调用完成,发生error ,将error上抛
panic(e)
} else if c.err == errGoexit {
runtime.Goexit()
}
// 返回调用结果
return c.val, c.err, true
}
c := new(call) // 相同的请求第一次进入singleflight
c.wg.Add(1)
g.m[key] = c // new一个call实体,放入singleflight.call这个map
g.mu.Unlock()
g.doCall(c, key, fn) //实际执行的函数
return c.val, c.err, c.dups > 0
}
流程图
由源码可以分析出,最后实际执行我们业务逻辑的函数其实是放到了doCall() 里,我们稍后分析这个函数
Forget
再简单看看Forget()函数,很短.
func (g *Group) Forget(key string) {
g.mu.Lock()
if c, ok := g.m[key]; ok {
c.forgotten = true // key的forgotten标志位记为true
}
delete(g.m, key) // Group.m中删除对应的key
g.mu.Unlock()
}
doCall
func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
normalReturn := false
recovered := false
//使用双重defer来区分error的类型: panic && runtime.error
defer func() {
if !normalReturn && !recovered {
// fn()发生了panic且fn()中的panic没有被recover掉
// errGoexit连接runtime.Goexit错误
c.err = errGoexit
}
c.wg.Done()
g.mu.Lock()
defer g.mu.Unlock()
if !c.forgotten { // 检查key是否调用了Forget()
delete(g.m, key)
}
if e, ok := c.err.(*panicError); ok {
// 如果返回的是 panic 错误,为了避免channel被永久阻塞,我们需要确保这个panic无法被recover
if len(c.chans) > 0 {
go panic(e) // panic无法被恢复
select {} // 阻塞本goroutinue.
} else {
panic(e)
}
} else {
// 将结果正常地返回
for _, ch := range c.chans {
ch <- Result{c.val, c.err, c.dups > 0}
}
}
}()
func() {
defer func() {
if !normalReturn {
// 表示fn()发生了panic()
// 此时与panic相关的堆栈已经被丢弃(调用的fn()) ,无法通过堆栈跟踪去确定error类型
if r := recover(); r != nil {
c.err = newPanicError(r) //new一个新的自定义panic err,往第一个defer抛
}
}
}()
// 执行我们实际的业务逻辑,并将业务方法的返回值赋给singleflight.call
c.val, c.err = fn()的val和err属性
// 如果fn()发生panic,normalReturn无法被赋值为true,而是进入doCall()的第二个defer()
normalReturn = true
}()
// 如果normalResult为false时,表示fn()发生了panic
// 但是执行到了这一步,表示fn()中的panic被recover了
if !normalReturn {
recovered = true // recovered标志位置为true
}
}
由以上分析可以得出几个重要的结论
singleflight主要使用sync.Mutex和sync.WaitGroup进行并发控制.
对于key相同的请求, singleflight只会处理的一个进入的请求,后续的请求都会使用waitGroup.wait()将请求阻塞
使用双重defer()区分了panic和runtime.Goexit错误,如果返回的是一个panic错误,group.c.chans会发生阻塞,那么需要抛出这个panic且确保其无法被recover
实际使用
分享一段实际项目中使用singleflight结合本地缓存的代码模版
func (s Service) getDataBySingleFlight(ctx context.Context) (entity.List, error) {
// 1. 从localCache查
resData, err := local_cache.Get(ctx, key)
if err != nil {
log.Fatalln()
return resData, err
}
if resData != nil {
return resData, nil
}
// 2. localCache无数据,从redis查
resData, err = srv.rdsRepo.Get()
if err != nil && err != redis.Nil {
// redis错误
log.Fatalln()
return resData, err
} else if redis.Nil == err {
// redis无数据 ,查db
resData, err, _ = singleFlight.Do(key, func() (interface{}, error) {
// 构建db查询条件
searchConn := entity.SearchInfo{}
// 建议休眠0.1s 捕获0.1s内的重复请求
time.Sleep(100 * time.Millisecond)
// 4. 查db
data, err := srv.dBRepo.GetByConn(ctx, searchConn)
if err != nil {
log.Fatalln()
return data, err
}
// 5. 回写localCache && redisCache
err = local_cache.Set(ctx, data)
if err != nil {
log.Fatalln()
}
err = srv.rdsRepo.Set(ctx, data)
if err != nil {
log.Fatalln()
}
// 返回db数据,回写cache的error不上抛
return data, nil
})
return resData, err
}
return resData, nil
弊端与解决方案
singleflight当然不是解决问题的银弹,在使用的过程中有一些“坑”需要我们注意
Do()方法是一个同步调用的方法,无法处理下游服务调用的超时情况
解决方案:
使用singleflight的doChan()方法,在service层使用 channel+select 做超时控制.
func enterGetAndSetCacheWithChan(ctx context.Context, key string) (str string, err error) {
tag := "enterGetAndSetCacheWithChan"
sonCtx, _ := context.WithTimeout(ctx, 2 * time.Second)
val := ""
nums := 10 //协程数
wg = &sync.WaitGroup{}
wg.Add(nums)
for idx := 0; idx < nums; idx++ {
go func() {
defer wg.Done()
val, err = getAndSetCacheWithChan(sonCtx, idx, key)
if err != nil {
log.Printf("err:[%+v]", err)
return
}
str = val
}()
}
wg.Wait()
log.Printf("tag:[%s] val:[%s]", tag, val)
return
}
func getAndSetCacheWithChan(ctx context.Context, idx int, cacheKey string) (string, error) {
tag := "getAndSetCacheWithChan"
log.Printf("tag: %s ;idx %d into-cache...", tag, idx)
ch := sf.DoChan(cacheKey, func() (ret interface{}, err error) { // do的入参key,可以直接使用缓存的key,这样同一个缓存,只有一个协程会去读DB
log.Printf("idx %v is-setting-cache", idx)
time.Sleep(100 * time.Millisecond)
log.Printf("idx %v set-cache-success!", idx)
return "myValue", nil
})
for { // 选择 context + select 超时控制
select {
case <-ctx.Done():
return "", errors.New("ctx-timeout") // 根据业务逻辑选择上抛 error
case data, _ := <-ch:
return data.Val.(string), nil
default:
}
}
}
如果第一个请求失败了,那么所有等待的请求都会返回同一个error
解决方案
根据实际情况,结合下游服务调用耗时与下游实际能支持的QPS等数据,对key做定时Forget()。
go func() {
time.Sleep(100 * time.Millisecond)
g.Forget(key)
}()
参考文章
singleflight双重defer: developer.51cto.com/article/652…
来源:https://juejin.cn/post/7093859835694809125
猜你喜欢
- time()方法返回时间,在UTC时代以秒表示浮点数。注意:尽管在时间总是返回作为一个浮点数,并不是所有的系统提供时间超过1秒精
- 一 概念Django的ORM中存在查询集的概念。查询集,也称查询结果集、QuerySet,表示从数据库中获取的对象集合。当调用如下过滤器方法
- 近期Github开源了一款基于Python开发、名为Textshot的截图工具,刚开源不到半个月已经500+Star。这两天抽空看了一下Te
- 问题:由于自己做项目的时候,需要循环的绘制数据,假设有100个样本,每个样本包含两个坐标点(A, B),我需要对这两个点标上不同的颜色,同时
- '数据库连接 Public Sub connectionDB() Try serverUrl = readFromIni(My.Ap
- JS无法进行精确计算的bug在做CRM,二代审核需求审核详情页面时。需要按比例(后端传类似0.8的小数)把用户输入的数字显示在不同的地方。
- MySQL支持的两种主要表存储格式MyISAM,InnoDB,上个月做个项目时,先使用了InnoDB,结果速度特别慢,1秒钟只能插入10几条
- jQuery的选择器可谓异常强大,没有什么DOM里的任何数据能逃出它的掌心,这点是我非常喜欢的,以前获取NODE要用getElementBy
- 本文实例讲述了Python编程实现的简单神经网络算法。分享给大家供大家参考,具体如下:python实现二层神经网络包括输入层和输出层# -*
- 写在前面Go语言在很多方面天然的具备很多便捷性,譬如网络编程,并发编程。而通道则又是Go语言实现并发编程的重要工具,因为其承担着通道之间互相
- pycharm中导入selenium报错现象: pycharm中输入from selenium import webdriver, sele
- 1、执行cmd指令,在cmd输出的内容会直接在控制台输出,返回结果为0表示执行成功。2、在调用完shell脚本后,返回一个16位的二进制数,
- 以上述图片举例,要求 相对 的顺时针夹角。注意:这里使用图像坐标系1 定义求顺时针角度的函数 import numpy as npdef
- 主程序import pygamefrom pygame.sprite import Groupfrom settings import Se
- 1、查看数据库中有哪些用户? select username from all_users;2、查看数据库中有哪些DBA用户? select
- 定义: 何为触发器?在SQL Server里面也就是对某一个表的一定的操作,触发某种条件,从而执行的一段程序。触发器是一个特殊的存储过程。
- 写在前面其实我之前写过一个简单的识别手写数字的程序,但是因为逻辑比较简单,而且要求比较严苛,是在50x50大小像素的白底图上手写黑色数字,并
- 如何检测某个对象是否有某个属性?第一想到的——没错,in:"prop" in obj这很完美,不过,还有不少人热衷下面的
- 相信很多人在使用Ajax与后台php页面进行交互的时候都碰到过中文乱码的问题。JSON作为一种轻量级的数据交换格式,备受亲睐,但是用PHP作
- 本文实例为大家分享了js图片加载淡入淡出效果展示的具体代码,供大家参考,具体内容如下HTML代码首先是图片标记的写法:<img dat