自学内容网 自学内容网

开源限流组件分析(二):uber-go/ratelimit

漏桶限流算法

漏桶限流算法思路很简单,水(数据或者请求)先进入到漏桶里,漏桶以一定的速度出水,当水流入速度过大会直接溢出,漏桶算法能强行限制请求的处理速度
在这里插入图片描述


相比于令牌桶中只要桶内还有剩余令牌,调用方就可以马上消费令牌的策略。漏桶相对来说更加严格,调用方只能按照预定的间隔顺序消费令牌

例如:假设漏桶出水的间隔是10ms,上次在0ms时刻出水,那么下次是10ms时刻,再下次是20ms时刻

uber的漏桶算法

uber对标准的漏桶算法做了一些优化,加了一些松弛量,这么做了后就能应对突发流量,达到和令牌桶一样的效果

关于什么是松弛量,在介绍获取令牌流程时再详细分析

阅读的源码:https://github.com/uber-go/ratelimit,版本:v0.3.1

使用

下面展示一个没有使用松弛量的漏桶使用示例:

func Example_default() {
// 参数100:代表每秒放行100个请求,即每10ms放行一个请求
rl := ratelimit.New(100)
  
prev := time.Now()
for i := 0; i < 10; i++ {
        // Take获取令牌,返回获取令牌成功的时间
now := rl.Take()
if i > 0 {
            // 打印每次放行间隔
fmt.Println(i, now.Sub(prev))
}
prev = now
}
}

打印结果:

1 10ms
2 10ms
3 10ms
4 10ms
5 10ms
6 10ms
7 10ms
8 10ms
9 10ms

可以看出每次放行间隔10ms,符合预期

uber的漏桶提供了mutex版本atomic版本两种实现,主要区别在于怎么控制并发,以及怎么记录松弛量


mutex版本

数据结构

type mutexLimiter struct {
sync.Mutex

// 上次请求放行的时刻
last time.Time

    // 桶中积累的松弛量
sleepFor time.Duration

// 每个请求之间的间隔
perRequest time.Duration

    // 最大松弛量
maxSlack   time.Duration
clock      Clock
}

初始化桶:

func newMutexBased(rate int, opts ...Option) *mutexLimiter {
config := buildConfig(opts)
perRequest := config.per / time.Duration(rate)
l := &mutexLimiter{
perRequest: perRequest,

// config.slack默认值=10, 例如当perRequest=10ms时
           // maxSlack就是 -10 * 100ms = -1s
maxSlack: -1 * time.Duration(config.slack) * perRequest,
clock:    config.clock,
}
return l
}

func buildConfig(opts []Option) config {
c := config{
clock: clock.New(),
           // 最大松弛量默认是perRequest的10倍
slack: 10,
per:   time.Second,
}

for _, opt := range opts {
opt.apply(&c)
}
return c
}

主要设置两个变量:

  • 每次放行时间间隔perRequest:根据参数rate计算,rate表示每单位时间per能放行多少请求

    • 例如当per=1s,rate=100时,每次放行时间间隔perRequest=10ms
  • 最大松弛量maxSlack:config.slack默认值=10

    • 例如当perRequest=10ms时,maxSlack就是 -10 * 100ms = -1s

获取令牌

要实现标准的漏桶算法,其实比较简单:

记录上次放行时间last,当本次请求到来时, 如果此刻的时间与last相比并没有达到 perRequest 规定的间隔大小, sleep 一段时间即可

在这里插入图片描述

对应代码为(删除了松弛相关代码):

func (t *mutexLimiter) Take() time.Time {
t.Lock()
defer t.Unlock()

now := t.clock.Now()

// 如果第一次请求,直接放行
if t.last.IsZero() {
t.last = now
return t.last
}

// 当前时间距离上次放行时间的间隔,
sleepFor := t.perRequest - now.Sub(t.last)

// 如果比规定的间隔小,需要sleep
if sleepFor > 0 {
t.clock.Sleep(t.sleepFor)
t.last = now.Add(t.sleepFor)
t.sleepFor = 0

return t.last
}

松弛量

如果不引入松弛量,按照标准漏桶算法的流程:

假设现在有三个请求,req1req2req3,获取令牌间隔perRequest规定为 10ms

  1. req1 先到来

  2. req1 完成之后 20msreq2 才到来,此时距离上次放行有20ms,大于10ms,可以对req2 放行

  3. req2 放行后5msreq3 到来,此时距离上次放行才5ms,不足 规定的间隔10ms,因此还需要等待 5ms 才能继续放行req3

在这里插入图片描述

这种策略有什么问题?无法应对任何突发流量,没有弹性,定死了相邻两次请求放行的间隔必须大于等于规定的值

于是引入了松弛量的概念:当较长时间(超过perRequest)没有请求到来时,会在桶中积累一些松弛量,这样接下来的请求可以先消耗松弛量,而不会被漏洞的获取令牌间隔限制。直到把松弛量耗尽为止

当然松弛量也不是无限积累,这样当很长时间没有请求后,就跟没有限流没区别了。因此桶中规定了最多积累多少松弛量maxSlack

加上松弛量后代获取令牌代码如下:

  1. 计算perRequest - now.Sub(t.last)

    1. 如果大于0,说明当前时间距离上次放行时间不足规定间隔,需要等待,或者需要消耗松弛量
    2. 如果小于0,说明当前时间距离上次放行时间超过了规定间隔,那么当前请求一定可以放行,并且可以在桶中积累一些松弛量,给下次请求使用
    3. 将结果累加到t.sleepFor
  2. 假设累加的松弛值超过了maxSlack,修正为maxSlack,默认为10倍的规定间隔

  3. 如果t.sleepFor > 0,说明本次请求应该等待的时间,在使用完桶中的松弛量后还不够,需要sleep这段时间,结束后将sleepFor置为0

  4. 否则t.sleepFor <= 0,说明桶中的存储松弛余量可以满足本次请求,本次请求放行

func (t *mutexLimiter) Take() time.Time {
t.Lock()
defer t.Unlock()

now := t.clock.Now()

// 如果第一次请求,直接放行
if t.last.IsZero() {
t.last = now
return t.last
}

// 假设perRequest=10ms,now.Sub(t.last)=5ms,那么需要睡5ms
// 假设perRequest=10ms,now.Sub(t.last)=15ms,说明距离上传请求的时间超过了漏桶的限流间隔10ms,那么sleepFor变为-5
t.sleepFor += t.perRequest - now.Sub(t.last)


// 默认最多松弛 10倍的perRequest
// 如果累加的松弛值超过了maxSlack,修正为maxSlack
if t.sleepFor < t.maxSlack {
t.sleepFor = t.maxSlack
}

// 需要sleep
if t.sleepFor > 0 {
t.clock.Sleep(t.sleepFor)
t.last = now.Add(t.sleepFor)
t.sleepFor = 0
// 如果sleepFor <= 0,说明还有存储的松弛余量,可以放行
} else {
t.last = now
}

return t.last
}

atomic版本

数据结构

type atomicInt64Limiter struct {
// 上次在哪个时刻发放许可
state      int64
    // 每次请求间隔
perRequest time.Duration
    // 最大松弛量
maxSlack   time.Duration
clock      Clock
}

func newAtomicInt64Based(rate int, opts ...Option) *atomicInt64Limiter {

config := buildConfig(opts)
perRequest := config.per / time.Duration(rate)
l := &atomicInt64Limiter{
perRequest: perRequest,
// 最大松弛量:10倍的perRequest
maxSlack: time.Duration(config.slack) * perRequest,
clock:    config.clock,
}
atomic.StoreInt64(&l.state, 0)
return l
}

获取令牌

atomic版本的漏桶不是用一个变量存储桶中的松弛量,而是记录了上次在哪个时刻发放许可timeOfNextPermissionIssue,那松弛量用什么表示呢?当前时间now减去上次发放许可时间timeOfNextPermissionIssue就代表松弛量

怎么理解?这个值代表应该在什么时刻发放令牌:

  • 如果该值 比now大,说明需要sleep一段时间等待时间流逝,本次才能获取到令牌
  • 如果该值 比now小,说明应该在更早的时间就可以获取令牌,本次请求不需要等待

每次请求时会在timeOfNextPermissionIssue的基础上加上perRequest,并更新timeOfNextPermissionIssue,表示本次发放许可的时间比上次前进了perRequest时间

如果较长时间没有请求,那天然就积累了一些松弛量,因为上次发放许可时间就会比较小,即便加上本次的消耗perRequest也没有now大,就可以直接放行

  • 如果上次发放许可时间,加上固定间隔perRequest小于now,说明本次可以放行,并更新上次发放许可时间 += perRequest

    • 当然timeOfNextPermissionIssue不能比now小太多,否则限流就没意义了。因此规定该值最多比now小10倍的perRequest,表示松弛量最多为perRequest的10倍
  • 否则不能放行,需要等待时间流逝到timeOfNextPermissionIssue + perRequest为止

func (t *atomicInt64Limiter) Take() time.Time {
var (
newTimeOfNextPermissionIssue int64
now                          int64
)
for {
now = t.clock.Now().UnixNano()

// 上次在哪个时刻发放许可
timeOfNextPermissionIssue := atomic.LoadInt64(&t.state)

switch {
// 第一次请求,或者不是第一次请求,且(没有松弛量,且now 比 上次发放许可时间 多了超过perRequest )
case timeOfNextPermissionIssue == 0 || (t.maxSlack == 0 && now-timeOfNextPermissionIssue > int64(t.perRequest)):
// 本次放行,本次在now时刻发放许可
newTimeOfNextPermissionIssue = now
// 可以有松弛量,且 当前时间 - 上次发放许可时间 > 11倍perRequest
case t.maxSlack > 0 && now-timeOfNextPermissionIssue > int64(t.maxSlack)+int64(t.perRequest):
// 本次在 now - 最大松弛量 时刻发放许可
newTimeOfNextPermissionIssue = now - int64(t.maxSlack)
/**
1.没有松弛量,且当前时间距离 上次发放许可时间 超过了 不到perRequest,本次应该在上次发放许可时间 + perRequest 再发放
2.有松弛量,且 当前时间 - 上次发放许可时间 <= 11倍perRequest,那本次应该在上次发放许可时间 + perRequest 再发放
*/
default:
// 更新上次发放许可时间,+= perRequest
newTimeOfNextPermissionIssue = timeOfNextPermissionIssue + int64(t.perRequest)
}

if atomic.CompareAndSwapInt64(&t.state, timeOfNextPermissionIssue, newTimeOfNextPermissionIssue) {
break
}
}

  
sleepDuration := time.Duration(newTimeOfNextPermissionIssue - now)
// 没有余量了,需要sleep
    if sleepDuration > 0 {
t.clock.Sleep(sleepDuration)
// 返回放行时的时间
return time.Unix(0, newTimeOfNextPermissionIssue)
}
return time.Unix(0, now)
}

测试漏桶的松弛量

下面对漏桶的松弛量进行测试:如果先积累一些松弛量,就会起到因对突发流量的效果,例如让时间先走45ms

func Example_default() {
// 每10ms放行一个请求, 最大松弛量=100ms
rl := ratelimit.New(100)

// 在0ms发放许可
rl.Take()
// 此时时间来到45ms
time.Sleep(time.Millisecond * 45)

prev := time.Now()

for i := 0; i < 10; i++ {
now := rl.Take()
if i > 0 {
fmt.Println(i, now.Sub(prev))
}
prev = now
}
}

打印结果:

1 1µs
2 103µs
3 4µs
4 4.79ms
5 10ms
6 10ms
7 10ms
8 10ms
9 10ms

可以看出:前4次都没等,直接获取到令牌,第4次等了大约5ms,之后每次放行间隔都是10ms

执行到for循环时,上次发放许可时间10ms,当前时刻45ms

拆解for循环中的每次take中的漏桶状态:

  1. i=0, 下次应该在10ms发放许可,当前时刻45ms,放行
  2. i=1, 下次应该在20ms发放许可,当前时刻45ms,放行
  3. i=2, 下次应该在30ms发放许可,当前时刻45ms,放行
  4. i=3, 下次应该在40ms发放许可,当前时刻45ms,放行
  5. i=4, 下次应该在50ms发放许可,当前时刻45ms,更新timeOfNextPermissionIssue=50,需要sleep 5ms
  6. i=5, 下次应该在60ms发放许可,当前50ms,需要sleep10ms,更新timeOfNextPermissionIssue=60

如下图所示:

在这里插入图片描述

总结

加上松弛量后,这个漏桶和标准的令牌桶区别就不大了:

  • 都能应对一定的突发流量
  • 当桶中没有松弛量(没有令牌时),都按照固定时间间隔放行请求

这个库有个问题是:对桶中等待中的请求数没有限制,这样当并发量非常大时,会导致请求一直在桶中堆积

因此工程中要使用的话,最好对源码进行改造:当等待中的请求数超过阈值就快速返回失败


原文地址:https://blog.csdn.net/qq_39383767/article/details/143061685

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!