网络编程
位置:首页>> 网络编程>> Go语言>> Golang实现常见的限流算法的示例代码

Golang实现常见的限流算法的示例代码

作者:jxwu  发布时间:2024-04-25 13:22:35 

标签:Go,限流,算法

限流是项目中经常需要使用到的一种工具,一般用于限制用户的请求的频率,也可以避免瞬间流量过大导致系统崩溃,或者稳定消息处理速率

这个文章主要是使用Go实现常见的限流算法,代码参考了文章面试官:来,年轻人!请手撸5种常见限流算法! 和面试必备:4种经典限流算法讲解如果需要Java实现或更详细的算法介绍可以看这两篇文章

固定窗口

每开启一个新的窗口,在窗口时间大小内,可以通过窗口请求上限个请求。

该算法主要是会存在临界问题,如果流量都集中在两个窗口的交界处,那么突发流量会是设置上限的两倍。

package limiter

import (
  "sync"
  "time"
)

// FixedWindowLimiter 固定窗口限流器
type FixedWindowLimiter struct {
  limit    int           // 窗口请求上限
  window   time.Duration // 窗口时间大小
  counter  int           // 计数器
  lastTime time.Time     // 上一次请求的时间
  mutex    sync.Mutex    // 避免并发问题
}

func NewFixedWindowLimiter(limit int, window time.Duration) *FixedWindowLimiter {
  return &FixedWindowLimiter{
     limit:    limit,
     window:   window,
     lastTime: time.Now(),
  }
}

func (l *FixedWindowLimiter) TryAcquire() bool {
  l.mutex.Lock()
  defer l.mutex.Unlock()
  // 获取当前时间
  now := time.Now()
  // 如果当前窗口失效,计数器清0,开启新的窗口
  if now.Sub(l.lastTime) > l.window {
     l.counter = 0
     l.lastTime = now
  }
  // 若到达窗口请求上限,请求失败
  if l.counter >= l.limit {
     return false
  }
  // 若没到窗口请求上限,计数器+1,请求成功
  l.counter++
  return true
}

滑动窗口

滑动窗口类似于固定窗口,它只是把大窗口切分成多个小窗口,每次向右移动一个小窗口,它可以避免两倍的突发流量。

固定窗口可以说是滑动窗口的一种特殊情况,只要滑动窗口里面的小窗口和大窗口大小一样。

窗口算法都有一个问题,当流量达到上限,后面的请求都会被拒绝。

package limiter

import (
  "errors"
  "sync"
  "time"
)

// SlidingWindowLimiter 滑动窗口限流器
type SlidingWindowLimiter struct {
  limit        int           // 窗口请求上限
  window       int64         // 窗口时间大小
  smallWindow  int64         // 小窗口时间大小
  smallWindows int64         // 小窗口数量
  counters     map[int64]int // 小窗口计数器
  mutex        sync.Mutex    // 避免并发问题
}

// NewSlidingWindowLimiter 创建滑动窗口限流器
func NewSlidingWindowLimiter(limit int, window, smallWindow time.Duration) (*SlidingWindowLimiter, error) {
  // 窗口时间必须能够被小窗口时间整除
  if window%smallWindow != 0 {
     return nil, errors.New("window cannot be split by integers")
  }

return &SlidingWindowLimiter{
     limit:        limit,
     window:       int64(window),
     smallWindow:  int64(smallWindow),
     smallWindows: int64(window / smallWindow),
     counters:     make(map[int64]int),
  }, nil
}

func (l *SlidingWindowLimiter) TryAcquire() bool {
  l.mutex.Lock()
  defer l.mutex.Unlock()

// 获取当前小窗口值
  currentSmallWindow := time.Now().UnixNano() / l.smallWindow * l.smallWindow
  // 获取起始小窗口值
  startSmallWindow := currentSmallWindow - l.smallWindow*(l.smallWindows-1)

// 计算当前窗口的请求总数
  var count int
  for smallWindow, counter := range l.counters {
     if smallWindow < startSmallWindow {
        delete(l.counters, smallWindow)
     } else {
        count += counter
     }
  }

// 若到达窗口请求上限,请求失败
  if count >= l.limit {
     return false
  }
  // 若没到窗口请求上限,当前小窗口计数器+1,请求成功
  l.counters[currentSmallWindow]++
  return true
}

漏桶算法

漏桶是模拟一个漏水的桶,请求相当于往桶里倒水,处理请求的速度相当于水漏出的速度。

主要用于请求处理速率较为稳定的服务,需要使用生产者消费者模式把请求放到一个队列里,让消费者以一个较为稳定的速率处理。

package limiter

import (
  "sync"
  "time"
)

// LeakyBucketLimiter 漏桶限流器
type LeakyBucketLimiter struct {
  peakLevel       int        // 最高水位
  currentLevel    int        // 当前水位
  currentVelocity int        // 水流速度/秒
  lastTime        time.Time  // 上次放水时间
  mutex           sync.Mutex // 避免并发问题
}

func NewLeakyBucketLimiter(peakLevel, currentVelocity int) *LeakyBucketLimiter {
  return &LeakyBucketLimiter{
     peakLevel:       peakLevel,
     currentVelocity: currentVelocity,
     lastTime:        time.Now(),
  }
}

func (l *LeakyBucketLimiter) TryAcquire() bool {
  l.mutex.Lock()
  defer l.mutex.Unlock()

// 尝试放水
  now := time.Now()
  // 距离上次放水的时间
  interval := now.Sub(l.lastTime)
  if interval >= time.Second {
     // 当前水位-距离上次放水的时间(秒)*水流速度
     l.currentLevel = maxInt(0, l.currentLevel-int(interval/time.Second)*l.currentVelocity)
     l.lastTime = now
  }

// 若到达最高水位,请求失败
  if l.currentLevel >= l.peakLevel {
     return false
  }
  // 若没有到达最高水位,当前水位+1,请求成功
  l.currentLevel++
  return true
}

func maxInt(a, b int) int {
  if a > b {
     return a
  }
  return b
}

令牌桶

与漏桶算法的相反,令牌桶会不断地把令牌添加到桶里,而请求会从桶中获取令牌,只有拥有令牌地请求才能被接受。

因为桶中可以提前保留一些令牌,所以它允许一定地突发流量通过。

package limiter

import (
  "sync"
  "time"
)

// TokenBucketLimiter 令牌桶限流器
type TokenBucketLimiter struct {
  capacity      int        // 容量
  currentTokens int        // 令牌数量
  rate          int        // 发放令牌速率/秒
  lastTime      time.Time  // 上次发放令牌时间
  mutex         sync.Mutex // 避免并发问题
}

func NewTokenBucketLimiter(capacity, rate int) *TokenBucketLimiter {
  return &TokenBucketLimiter{
     capacity: capacity,
     rate:     rate,
     lastTime: time.Now(),
  }
}

func (l *TokenBucketLimiter) TryAcquire() bool {
  l.mutex.Lock()
  defer l.mutex.Unlock()

// 尝试发放令牌
  now := time.Now()
  // 距离上次发放令牌的时间
  interval := now.Sub(l.lastTime)
  if interval >= time.Second {
     // 当前令牌数量+距离上次发放令牌的时间(秒)*发放令牌速率
     l.currentTokens = minInt(l.capacity, l.currentTokens+int(interval/time.Second)*l.rate)
     l.lastTime = now
  }

// 如果没有令牌,请求失败
  if l.currentTokens == 0 {
     return false
  }
  // 如果有令牌,当前令牌-1,请求成功
  l.currentTokens--
  return true
}

func minInt(a, b int) int {
  if a < b {
     return a
  }
  return b
}

滑动日志

滑动日志与滑动窗口算法类似,但是滑动日志主要用于多级限流的场景,比如短信验证码1分钟1次,1小时10次,1天20次这种业务。

算法流程与滑动窗口相同,只是它可以指定多个策略,同时在请求失败的时候,需要通知调用方是被哪个策略所拦截。

package limiter

import (
  "errors"
  "fmt"
  "sort"
  "sync"
  "time"
)

// ViolationStrategyError 违背策略错误
type ViolationStrategyError struct {
  Limit  int           // 窗口请求上限
  Window time.Duration // 窗口时间大小
}

func (e *ViolationStrategyError) Error() string {
  return fmt.Sprintf("violation strategy that limit = %d and window = %d", e.Limit, e.Window)
}

// SlidingLogLimiterStrategy 滑动日志限流器的策略
type SlidingLogLimiterStrategy struct {
  limit        int   // 窗口请求上限
  window       int64 // 窗口时间大小
  smallWindows int64 // 小窗口数量
}

func NewSlidingLogLimiterStrategy(limit int, window time.Duration) *SlidingLogLimiterStrategy {
  return &SlidingLogLimiterStrategy{
     limit:  limit,
     window: int64(window),
  }
}

// SlidingLogLimiter 滑动日志限流器
type SlidingLogLimiter struct {
  strategies  []*SlidingLogLimiterStrategy // 滑动日志限流器策略列表
  smallWindow int64                        // 小窗口时间大小
  counters    map[int64]int                // 小窗口计数器
  mutex       sync.Mutex                   // 避免并发问题
}

func NewSlidingLogLimiter(smallWindow time.Duration, strategies ...*SlidingLogLimiterStrategy) (*SlidingLogLimiter, error) {
  // 复制策略避免被修改
  strategies = append(make([]*SlidingLogLimiterStrategy, 0, len(strategies)), strategies...)

// 不能不设置策略
  if len(strategies) == 0 {
     return nil, errors.New("must be set strategies")
  }

// 排序策略,窗口时间大的排前面,相同窗口上限大的排前面
  sort.Slice(strategies, func(i, j int) bool {
     a, b := strategies[i], strategies[j]
     if a.window == b.window {
        return a.limit > b.limit
     }
     return a.window > b.window
  })
  fmt.Println(strategies[0], strategies[1])

for i, strategy := range strategies {
     // 随着窗口时间变小,窗口上限也应该变小
     if i > 0 {
        if strategy.limit >= strategies[i-1].limit {
           return nil, errors.New("the smaller window should be the smaller limit")
        }
     }
     // 窗口时间必须能够被小窗口时间整除
     if strategy.window%int64(smallWindow) != 0 {
        return nil, errors.New("window cannot be split by integers")
     }
     strategy.smallWindows = strategy.window / int64(smallWindow)
  }

return &SlidingLogLimiter{
     strategies:  strategies,
     smallWindow: int64(smallWindow),
     counters:    make(map[int64]int),
  }, nil
}

func (l *SlidingLogLimiter) TryAcquire() error {
  l.mutex.Lock()
  defer l.mutex.Unlock()

// 获取当前小窗口值
  currentSmallWindow := time.Now().UnixNano() / l.smallWindow * l.smallWindow
  // 获取每个策略的起始小窗口值
  startSmallWindows := make([]int64, len(l.strategies))
  for i, strategy := range l.strategies {
     startSmallWindows[i] = currentSmallWindow - l.smallWindow*(strategy.smallWindows-1)
  }

// 计算每个策略当前窗口的请求总数
  counts := make([]int, len(l.strategies))
  for smallWindow, counter := range l.counters {
     if smallWindow < startSmallWindows[0] {
        delete(l.counters, smallWindow)
        continue
     }
     for i := range l.strategies {
        if smallWindow >= startSmallWindows[i] {
           counts[i] += counter
        }
     }
  }

// 若到达对应策略窗口请求上限,请求失败,返回违背的策略
  for i, strategy := range l.strategies {
     if counts[i] >= strategy.limit {
        return &ViolationStrategyError{
           Limit:  strategy.limit,
           Window: time.Duration(strategy.window),
        }
     }
  }

// 若没到窗口请求上限,当前小窗口计数器+1,请求成功
  l.counters[currentSmallWindow]++
  return nil
}

来源:https://juejin.cn/post/7056068978862456846

0
投稿

猜你喜欢

手机版 网络编程 asp之家 www.aspxhome.com