go实现一个分布式限流器的方法步骤
作者:CocoAdapter 发布时间:2024-04-28 10:46:36
项目中需要对 api 的接口进行限流,但是麻烦的是,api 可能有多个节点,传统的本地限流无法处理这个问题。限流的算法有很多,比如计数器法,漏斗法,令牌桶法,等等。各有利弊,相关博文网上很多,这里不再赘述。
项目的要求主要有以下几点:
支持本地/分布式限流,接口统一
支持多种限流算法的切换
方便配置,配置方式不确定
go 语言不是很支持 OOP,我在实现的时候是按 Java 的思路走的,所以看起来有点不伦不类,希望能抛砖引玉。
1. 接口定义
package ratelimit
import "time"
// 限流器接口
type Limiter interface {
Acquire() error
TryAcquire() bool
}
// 限流定义接口
type Limit interface {
Name() string
Key() string
Period() time.Duration
Count() int32
LimitType() LimitType
}
// 支持 burst
type BurstLimit interface {
Limit
BurstCount() int32
}
// 分布式定义的 burst
type DistLimit interface {
Limit
ClusterNum() int32
}
type LimitType int32
const (
CUSTOM LimitType = iota
IP
)
Limiter 接口参考了 Google 的 guava 包里的 Limiter 实现。Acquire 接口是阻塞接口,其实还需要加上 context 来保证调用链安全,因为实际项目中并没有用到 Acquire 接口,所以没有实现完善;同理,超时时间的支持也可以通过添加新接口继承自 Limiter 接口来实现。TryAcquire 会立即返回。
Limit 抽象了一个限流定义,Key() 方法返回这个 Limit 的唯一标识,Name() 仅作辅助,Period() 表示周期,单位是秒,Count() 表示周期内的最大次数,LimitType()表示根据什么来做区分,如 IP,默认是 CUSTOM.
BurstLimit 提供突发的能力,一般是配合令牌桶算法。DistLimit 新增 ClusterNum() 方法,因为 mentor 要求分布式遇到错误的时候,需要退化为单机版本,退化的策略即是:2 节点总共 100QPS,如果出现分区,每个节点需要调整为各 50QPS
2. LocalCounterLimiter
package ratelimit
import (
"errors"
"fmt"
"math"
"sync"
"sync/atomic"
"time"
)
// todo timer 需要 stop
type localCounterLimiter struct {
limit Limit
limitCount int32 // 内部使用,对 limit.count 做了 <0 时的转换
ticker *time.Ticker
quit chan bool
lock sync.Mutex
newTerm *sync.Cond
count int32
}
func (lim *localCounterLimiter) init() {
lim.newTerm = sync.NewCond(&lim.lock)
lim.limitCount = lim.limit.Count()
if lim.limitCount < 0 {
lim.limitCount = math.MaxInt32 // count 永远不会大于 limitCount,后面的写法保证溢出也没问题
} else if lim.limitCount == 0 {
// 禁止访问, 会无限阻塞
} else {
lim.ticker = time.NewTicker(lim.limit.Period())
lim.quit = make(chan bool, 1)
go func() {
for {
select {
case <- lim.ticker.C:
fmt.Println("ticker .")
atomic.StoreInt32(&lim.count, 0)
lim.newTerm.Broadcast()
//lim.newTerm.L.Unlock()
case <- lim.quit:
fmt.Println("work well .")
lim.ticker.Stop()
return
}
}
}()
}
}
// todo 需要机制来防止无限阻塞, 不超时也应该有个极限时间
func (lim *localCounterLimiter) Acquire() error {
if lim.limitCount == 0 {
return errors.New("rate limit is 0, infinity wait")
}
lim.newTerm.L.Lock()
for lim.count >= lim.limitCount {
// block instead of spinning
lim.newTerm.Wait()
//fmt.Println(count, lim.limitCount)
}
lim.count++
lim.newTerm.L.Unlock()
return nil
}
func (lim *localCounterLimiter) TryAcquire() bool {
count := atomic.AddInt32(&lim.count, 1)
if count > lim.limitCount {
return false
} else {
return true
}
}
代码很简单,就不多说了
3. LocalTokenBucketLimiter
golang 的官方库里提供了一个 ratelimiter,就是采用令牌桶的算法。所以这里并没有重复造轮子,直接代理了 ratelimiter。
package ratelimit
import (
"context"
"golang.org/x/time/rate"
"math"
)
type localTokenBucketLimiter struct {
limit Limit
limiter *rate.Limiter // 直接复用令牌桶的
}
func (lim *localTokenBucketLimiter) init() {
burstCount := lim.limit.Count()
if burstLimit, ok := lim.limit.(BurstLimit); ok {
burstCount = burstLimit.BurstCount()
}
count := lim.limit.Count()
if count < 0 {
count = math.MaxInt32
}
f := float64(count) / lim.limit.Period().Seconds()
if f < 0 {
f = float64(rate.Inf) // 无限
} else if f == 0 {
panic("为 0 的时候,底层实现有问题")
}
lim.limiter = rate.NewLimiter(rate.Limit(f), int(burstCount))
}
func (lim *localTokenBucketLimiter) Acquire() error {
err := lim.limiter.Wait(context.TODO())
return err
}
func (lim *localTokenBucketLimiter) TryAcquire() bool {
return lim.limiter.Allow()
}
4. RedisCounterLimiter
package ratelimit
import (
"math"
"sync"
"xg-go/log"
"xg-go/xg/common"
)
type redisCounterLimiter struct {
limit DistLimit
limitCount int32 // 内部使用,对 limit.count 做了 <0 时的转换
redisClient *common.RedisClient
once sync.Once // 退化为本地计数器的时候使用
localLim Limiter
//script string
}
func (lim *redisCounterLimiter) init() {
lim.limitCount = lim.limit.Count()
if lim.limitCount < 0 {
lim.limitCount = math.MaxInt32
}
//lim.script = buildScript()
}
//func buildScript() string {
// sb := strings.Builder{}
//
// sb.WriteString("local c")
// sb.WriteString("\nc = redis.call('get',KEYS[1])")
// // 调用不超过最大值,则直接返回
// sb.WriteString("\nif c and tonumber(c) > tonumber(ARGV[1]) then")
// sb.WriteString("\nreturn c;")
// sb.WriteString("\nend")
// // 执行计算器自加
// sb.WriteString("\nc = redis.call('incr',KEYS[1])")
// sb.WriteString("\nif tonumber(c) == 1 then")
// sb.WriteString("\nredis.call('expire',KEYS[1],ARGV[2])")
// sb.WriteString("\nend")
// sb.WriteString("\nif tonumber(c) == 1 then")
// sb.WriteString("\nreturn c;")
//
// return sb.String()
//}
func (lim *redisCounterLimiter) Acquire() error {
panic("implement me")
}
func (lim *redisCounterLimiter) TryAcquire() (success bool) {
defer func() {
// 一般是 redis 连接断了,会触发空指针
if err := recover(); err != nil {
//log.Errorw("TryAcquire err", common.ERR, err)
//success = lim.degradeTryAcquire()
//return
success = true
}
// 没有错误,判断是否开启了 local 如果开启了,把它停掉
//if lim.localLim != nil {
// // stop 线程安全
// lim.localLim.Stop()
//}
}()
count, err := lim.redisClient.IncrBy(lim.limit.Key(), 1)
//panic("模拟 redis 出错")
if err != nil {
log.Errorw("TryAcquire err", common.ERR, err)
panic(err)
}
// *2 是为了保留久一点,便于观察
err = lim.redisClient.Expire(lim.limit.Key(), int(2 * lim.limit.Period().Seconds()))
if err != nil {
log.Errorw("TryAcquire error", common.ERR, err)
panic(err)
}
// 业务正确的情况下 确认超限
if int32(count) > lim.limitCount {
return false
}
return true
//keys := []string{lim.limit.Key()}
//
//log.Errorw("TryAcquire ", keys, lim.limit.Count(), lim.limit.Period().Seconds())
//count, err := lim.redisClient.Eval(lim.script, keys, lim.limit.Count(), lim.limit.Period().Seconds())
//if err != nil {
// log.Errorw("TryAcquire error", common.ERR, err)
// return false
//}
//
//
//typeName := reflect.TypeOf(count).Name()
//log.Errorw(typeName)
//
//if count != nil && count.(int32) <= lim.limitCount {
//
// return true
//}
//return false
}
func (lim *redisCounterLimiter) Stop() {
// 判断是否开启了 local 如果开启了,把它停掉
if lim.localLim != nil {
// stop 线程安全
lim.localLim.Stop()
}
}
func (lim *redisCounterLimiter) degradeTryAcquire() bool {
lim.once.Do(func() {
count := lim.limit.Count() / lim.limit.ClusterNum()
limit := LocalLimit {
name: lim.limit.Name(),
key: lim.limit.Key(),
count: count,
period: lim.limit.Period(),
limitType: lim.limit.LimitType(),
}
lim.localLim = NewLimiter(&limit)
})
return lim.localLim.TryAcquire()
}
代码里回退的部分注释了,因为线上为了稳定,实习生的代码毕竟,所以先不跑。
本来原有的思路是直接用 lua 脚本在 redis 上保证原子操作,但是底层封装的库对于直接调 eval 跑的时候,会抛错,而且 source 是 go-redis 里面,赶 ddl 没有时间去 debug,所以只能用 incrBy + expire 分开来。
5. RedisTokenBucketLimiter
令牌桶的状态变量得放在一个 线程安全/一致 的地方,redis 是不二人选。但是令牌桶的算法核心是个延迟计算得到令牌数量,这个是一个很长的临界区,所以要么用分布式锁,要么直接利用 redis 的单线程以原子方式跑。一般业界是后者,即 lua 脚本维护令牌桶的状态变量、计算令牌。代码类似这种
local tokens_key = KEYS[1]
local timestamp_key = KEYS[2]
--redis.log(redis.LOG_WARNING, "tokens_key " .. tokens_key)
local rate = tonumber(ARGV[1])
local capacity = tonumber(ARGV[2])
local now = tonumber(ARGV[3])
local requested = tonumber(ARGV[4])
local intval = tonumber(ARGV[5])
local fill_time = capacity/rate
local ttl = math.floor(fill_time*2) * intval
local last_tokens = tonumber(redis.call("get", tokens_key))
if last_tokens == nil then
last_tokens = capacity
end
local last_refreshed = tonumber(redis.call("get", timestamp_key))
if last_refreshed == nil then
last_refreshed = 0
end
local delta = math.max(0, now-last_refreshed)
local filled_tokens = math.min(capacity, last_tokens+(delta*rate))
local allowed = filled_tokens >= requested
local new_tokens = filled_tokens
if allowed then
new_tokens = filled_tokens - requested
end
redis.call("setex", tokens_key, ttl, new_tokens)
redis.call("setex", timestamp_key, ttl, now)
return { allowed, new_tokens }
来源:https://www.jianshu.com/p/709090b86018


猜你喜欢
- 之前用 copy 不多,本以为它是个很方便的函数,没想到在做练习题时竟还是被它坑了。是我对他期望太多了。func copy(dst, src
- 在学MVC过程中,我们一般都是利用layui插件里的layui数据表格加载数据库中的数据,而layui表格里有许多的事件监听,比如监听行的单
- request获取post请求中的json数据def hello(request): data = json.loads(request.b
- 目录简介创建线程构造器方式继承方式守护线程线程本地数据定时器简介Python 通过 _thread 和 threading 模块提供了对多线
- pandas每次多Sheet写入文件,只能一次性存入,不然每次会重写文件,最后只保留最后一次的写入。# !usr/bin env pytho
- 第一:mysql服务的启动和停止net stop mysqlnet start mysql第二:登陆mysql –u用户名 [–h主机名或者
- 这里主要是解决multipart/form-data这种格式的文件上传,基本现在http协议上传文件基本上都是通过这种格式上传1 思路一般情
- 前言:在页面操作过程中有时候点击某个链接会弹出新的窗口,但由于Selenium的所有操作都是在第一个打开的页面进行的,这时就需要主机切换到新
- 本文将介绍8个简洁的Python技巧,若非经验十足的程序员,你肯定有些从未见过。向着更简洁更高效,出发吧!1.通过多个键值将对象进行排序 假
- 1、文档使用numpy的 concatenate 拼接矩阵,文档里面这样解释:numpy.concatenate((a1, a2, ...)
- 导语哈喽铁汁们好久不见吖~小编已经复工了于是马不停蹄赶来给大家准备新年礼物算开工礼物吧!海龟来作图虎年就是要画老虎2022不用纸和笔~今晚画
- 1. Single array iteration>>> a = np.arange(6).reshape(2,3)>
- 示例1我们将要请求五个不同的url:单线程import timeimport urllib2defget_responses(): &nbs
- 平时经常看php的错误日志,很少有机会去自己动手写日志,看了王健的《最佳日志实践》觉得写一个清晰明了,结构分明的日志还是非常有必要的。在写日
- 文章介绍OpenCV 库中包含很多运算函数,这里着重介绍按位运算的基本原理并举例说明。本篇文章中主要涉及到的函数有:按位与:bitwise_
- 导入模块import numpy as npimport pandas as pd1.读取测试数据data=pd.read_csv(r
- Numpy模块被广泛用于科学和数值计算,自然有它的强大之处,之前对于特征处理中需要进行数据列表或者矩阵拼接的时候都是自己写的函数来完成的,今
- TTS简介TTS(Text To Speech)是一种语音合成技术,可以让机器将输入文本以语音的方式播放出来,实现机器说话的效果。TTS分成
- 一、前言关于什么是Dapper(详细入口),在此不做赘述;本文仅对Dapper在.Net Core中的使用作扼要说明,所陈代码以示例讲解为主
- OpenCV图像处理一、图像入门1.读取图像使用 cv.imread() 函数读取一张图像,图片应该在工作目录中,或者应该提供完整的图像路径