网站怎么找,渭南市建设项目,微信平台制作网站开发,做外贸常用的网站本文主要介绍两个包Uber漏桶#xff0c;time/rate令牌桶 可以了解到#xff1a; 使用方法漏桶/令牌桶 两种限流思想 and 实现原理区别及适用场景应用Case 背景
我们为了保护系统资源#xff0c;防止过载#xff0c;常常会使用限流器。
使用场景#xff1a;
API速率限制… 本文主要介绍两个包Uber漏桶time/rate令牌桶 可以了解到 使用方法漏桶/令牌桶 两种限流思想 and 实现原理区别及适用场景应用Case 背景
我们为了保护系统资源防止过载常常会使用限流器。
使用场景
API速率限制防止用户恶意刷接口当然这部分现在有公司风控部门来管控例如whale刷数据控制对数据库的访问速率避免瞬时大流量打崩数据库保护实例服务降级例如节假日对一些非关键服务进行服务降级保证关键服务的可用性爬虫大多数网站具有反爬能力需要控制爬虫速率
限流器通用实现方案
固定窗口限流 在这种方法中时间被分割成固定的窗口每个窗口都有一个请求的最大限制。这种方法简单易实现但可能在窗口交界处出现请求的瞬时峰值。滑动窗口限流 滑动窗口限流是对固定窗口限流的改进它将时间分割成更小的窗口从而使请求分布更加均匀。这种方法可以更好地控制请求的速率但实现起来更复杂。并发限流 并发限流是限制系统中同时处理的请求的数量。这种方法可以防止系统过载但需要注意防止长时间的请求阻塞其他请求。漏桶限流Leaky Bucket 系统将请求放入桶中然后以固定的速率从桶中取出请求进行处理就像水从漏桶中以固定的速率漏出一样。如果桶已满则新的请求会等待。漏桶的优点是输出数据的速率总是恒定的无论输入数据的速率如何变化。令牌桶限流Token Bucket 系统以固定的速率向桶中添加令牌请求在处理前需要从桶中获取令牌只有获取到令牌的请求才会被处理。如果桶中没有足够的令牌则请求需要等待或被拒绝。令牌桶的优点是能够应对突发流量因为在低流量时期未使用的令牌可以积攒起来用于应对突发的高流量。自适应限流 根据系统能够承载的最大吞吐量来进行限流当当前的流量大于最大吞吐的时候就限制流量进入反之则允许通过需要一个机器指标来做触发阈值常见的选择一般是机器负载、CPU 使用率总体平均响应时间、入口 QPS 和并发线程数等。
本文我们主要聚焦于 漏桶限流 和 令牌桶限流。这两种方法分别有两种代表性的包
漏桶限流Uber开源的第三方包目前4k StarsMIT license 主要用于服务限流GitHub - uber-go/ratelimit: A Go blocking leaky-bucket rate limit implementation令牌桶限流Go标准库应用广泛golang.org/x/time/rate Uber “改良版”的漏桶限流
漏桶的基本思想是将该组件想象成一个出口大小固定的桶输出的数据流只能以固定的速率输出。 GitHub - uber-go/ratelimit: A Go blocking leaky-bucket rate limit implementation 包采用的是“改良版的漏桶”方式实现。 为什么这么说呢因为有两部分做了相关的改进
漏桶实现方案“改良版的漏桶”方式实现待处理队列Queue无待处理队列直接处理 or 等待(或者可以理解为待处理队列长度1)最大输出速率固定通过松弛的方式保证整体上的QPS输出速率可能瞬时增高允许短时间的突发流量
1. 使用方法
1.1 初始化
rl : ratelimit.New(100) // per second第一个参数代表每秒可以处理多少个请求。
1.2 限流
now : rl.Take()如果不满足条件请求速率过快Take方法将会阻塞一段时间直至满足条件。如果时间条件满足则直接返回。可以类比下文标准库中的 Wait 方法
1.3 举个
import (fmttimego.uber.org/ratelimit
)func main() {rl : ratelimit.New(100) // per secondprev : time.Now()for i : 0; i 10; i {now : rl.Take()fmt.Println(i, now.Sub(prev))prev now}
}// Output
0 0s
1 10ms
2 10ms
3 10ms
4 10ms
5 10ms
6 10ms
7 10ms
8 10ms
9 10ms可以看到每次执行的间隔时间为10ms
2. 原理分析
下文介绍2023.7更新的V3版本本人认为该思路更好理解网上也有许多介绍V1/V2版本的做法参考文档
2.1 初始化New方法
// New returns a Limiter that will limit to the given RPS.
func New(rate int, opts ...Option) Limiter {return newAtomicInt64Based(rate, opts...)
}type Limiter interface {// Take should block to make sure that the RPS is met.Take() time.Time
}创建了一个Limiter Interface库中有四个实现如下所示
atomicInt64Limiter: 使用sync/atomic并发安全高性能的限流版本V3版本限流atomicLimiter: 使用sync/atomic并发安全的限流版本mutexLimiter: 使用sync.Mutex并发安全的限流版本unlimited: 不限流
2.2 atomicInt64Limiter 对象
type atomicInt64Limiter struct {prepadding [64]byte // 防止伪共享缓存state int64 // 理论上本次应该执行的时间从0开始累计纳秒postpadding [56]byte // 防止伪共享缓存perRequest time.Duration // 每次请求理论间隔时间maxSlack time.Duration // 最大松弛量默认值是10个请求理论间隔时间V3版本这里是正值clock Clock // 休眠时间
}// 初始化
func newAtomicInt64Based(rate int, opts ...Option) *atomicInt64Limiter {config : buildConfig(opts)perRequest : config.per / time.Duration(rate)l : atomicInt64Limiter{perRequest: perRequest,maxSlack: time.Duration(config.slack) * perRequest,clock: config.clock,}// 原子操作保证对state值修改过程的并发安全atomic.StoreInt64(l.state, 0)return l
}// 默认一些通用的配置
func buildConfig(opts []Option) config {c : config{clock: clock.New(),slack: 10,per: time.Second,}for _, opt : range opts {opt.apply(c)}return c
}atomicInt64Limiter 对象中prepadding和postpadding 字段主要是用于填充 CPU 缓存行防止伪共享缓存的。不了解的朋友可以参考这篇文章 CPU缓存体系对Go程序的影响。
2.3 限流Take 方法
该方法可以阻塞请求保证每次请求的时间间隔。
func (t *atomicInt64Limiter) Take() time.Time {var (newTimeOfNextPermissionIssue int64 // 下一次请求理论执行时间now int64 // 当前时间纳秒)for {now t.clock.Now().UnixNano()timeOfNextPermissionIssue : atomic.LoadInt64(t.state) // 上一次理论执行时间switch {case timeOfNextPermissionIssue 0 || (t.maxSlack 0 now-timeOfNextPermissionIssue int64(t.perRequest)):// 第一个请求 or 无最大松弛时间不控制速度newTimeOfNextPermissionIssue nowcase t.maxSlack 0 now-timeOfNextPermissionIssue int64(t.maxSlack):// 当前请求-上一次请求时间中间的间隔过长大于最大松弛时间下一次理论执行时间定义为now-最大松弛时间即保留了最大松弛时间的长度用来调整newTimeOfNextPermissionIssue now - int64(t.maxSlack)default:// 最正常的情况下一次请求理论执行时间 上一次理论执行时间 理论实践间隔newTimeOfNextPermissionIssue timeOfNextPermissionIssue int64(t.perRequest)}// 更新t.state 执行时间的状态改为newTimeOfNextPermissionIssue如果修改成功跳出死循环保证并发安全if atomic.CompareAndSwapInt64(t.state, timeOfNextPermissionIssue, newTimeOfNextPermissionIssue) {break}}// 计算需要休眠的时间如果是正数那么需要休眠对应的时间sleepDuration : time.Duration(newTimeOfNextPermissionIssue - now)if sleepDuration 0 {t.clock.Sleep(sleepDuration)return time.Unix(0, newTimeOfNextPermissionIssue)}return time.Unix(0, now)
}刚看到这么复杂的逻辑一定心里发怵但是说实话已经比V2版本的清晰多了。下面画个图跟大家讲一下。
3. Case分析
假定限定 100ms 执行一个请求 Case1如果请求2提前到来 结论会等待到理论时间间隔再执行 1. 请求1到来命中switch中第一个条件 case timeOfNextPermissionIssue 0 为(t *atomicInt64Limiter).State now 0sleepDuration0请求1直接执行 2. 请求2在50ms时到来命中switch中default条件计算得到请求2的理论执行时间newTimeOfNextPermissionIssue timeOfNextPermissionIssue int64(t.perRequest) 0100ms 100ms并更新(t *atomicInt64Limiter).State newTimeOfNextPermissionIssus 100ms由于sleepDuration : time.Duration(newTimeOfNextPermissionIssue - now) 100ms - 50ms 50ms系统休眠50ms即阻塞执行最后返回请求2实际执行时间 100ms Case2如果请求2延迟到来请求3提前到来 结论请求2直接执行请求3仅需要等待到200ms时执行使三个请求整体上看间隔时间平均100ms具有一定的松弛度 1. 请求1到来同上(t *atomicInt64Limiter).State now 0sleepDuration0请求1直接执行 2. 请求2在130ms时到来命中switch中default条件计算得到请求2的理论执行时间newTimeOfNextPermissionIssue timeOfNextPermissionIssue int64(t.perRequest) 0100ms 100ms并更新(t *atomicInt64Limiter).State newTimeOfNextPermissionIssus 100ms由于sleepDuration : time.Duration(newTimeOfNextPermissionIssue - now) 100ms - 130ms -30ms休眠时间为负数即请求2直接在到来时刻130ms时执行 3. 请求3在180ms时到来命中switch中default条件计算得到请求2的理论执行时间newTimeOfNextPermissionIssue timeOfNextPermissionIssue int64(t.perRequest) 100ms100ms 200ms并更新(t *atomicInt64Limiter).State newTimeOfNextPermissionIssus 200ms由于sleepDuration : time.Duration(newTimeOfNextPermissionIssue - now) 200ms - 180ms 20ms休眠时间为正数系统休眠20ms即阻塞执行最后返回请求3实际执行时间 200ms这里比较关键请求3与请求2的间隔实际为200ms-130ms70ms并不是理论上每个间隔严格保证100ms仅是保证整体上间隔平均时间为100ms Case3如果请求2与请求1间隔时间非常长请求3及其后续密集到来 这个场景非常极端由于请求2-请求1过长后续都会在请求发送时刻执行导致后续会限流失效因为并没有违背整体上的10QPS的要求。 所以这里引入了一个概念最大松弛时间10倍的请求间隔时间用来保证后续限流的正常执行。 结论请求2直接执行请求3-请求12在请求发送时刻执行请求13休眠开始限流。 1. 请求2在2100ms时到来命中switch中t.maxSlack 0 now-timeOfNextPermissionIssue int64(t.maxSlack)2100ms-01000ms条件给予最大松弛时间的额度newTimeOfNextPermissionIssue now - int64(t.maxSlack) 2100ms - 1000ms 1100ms远大于正常用default计算出来的100ms。由于sleepDuration : time.Duration(newTimeOfNextPermissionIssue - now) 1100ms - 2100ms -1000ms休眠时间为负数即请求2直接在到来时刻2100ms时执行 2. 请求3在2105ms到来命中switch中的default条件newTimeOfNextPermissionIssue timeOfNextPermissionIssue int64(t.perRequest)1100ms100ms 1200ms由于sleepDuration : time.Duration(newTimeOfNextPermissionIssue - now) 1200ms - 2105ms -905ms休眠时间为负数即请求3直接在到来时刻2105ms时执行 3. 请求4-请求12与请求3类似命中switch中的default条件直接执行。假设间隔5ms到来 4. 请求13在2155ms到来命中switch中的default条件newTimeOfNextPermissionIssue timeOfNextPermissionIssue int64(t.perRequest)1200ms10*100ms 2200ms由于sleepDuration : time.Duration(newTimeOfNextPermissionIssue - now) 2200ms - 2155ms 45ms休眠45ms在2200ms时执行 time/rate令牌桶Plus
标准令牌桶方案想象有一个固定大小的桶系统会以恒定速率向桶中放Token桶满则暂时不放。 而用户则从桶中取Token如果有剩余Token就可以一直取。如果没有剩余Token则需要等到系统中被放置了Token才行。
golang.org/x/time/rate 包也做了对应的改进
令牌桶实现方案“改良版的令牌桶”方式实现待处理队列Queue无待处理队列直接处理 or 等待(或者可以理解为待处理队列长度1)定时产生token懒加载方式新请求到来时才根据时间差更新token数目
1. 使用方法
1.1 初始化
limiter : NewLimiter(10, 1);这里有两个参数
第一个参数是r LimitQPS代表每秒可以向Token桶中产生多少token。第二个参数是b int。b代表Token桶的容量大小。通过设置较大的 桶容量 值服务可以在短时间内处理大量的请求从而更好地应对请求峰值。然后当请求峰值过去后你的服务可以以 limit 指定的速率继续处理请求从而避免长期占用过多的系统资源——换句话说这个参数就是“瞬时能处理的上限”
1.2 限流
Wait/WaitN
func (lim *Limiter) Wait(ctx context.Context) (err error)
func (lim *Limiter) WaitN(ctx context.Context, n int) (err error)Wait实际上就是WaitN(ctx,1)。该方法可以类比上文中的Take方法。当使用Wait方法消费Token时如果此时桶内Token数组不足(小于N)那么Wait方法将会阻塞一段时间直至Token满足条件。如果充足则直接返回。这里可以看到Wait方法有一个context参数。 我们可以设置context的Deadline或者Timeout来决定此次Wait的最长时间。
Allow/AllowN func (lim *Limiter) Allow() bool func (lim *Limiter) AllowN(now time.Time, n int) bool
跟 wait 方法的区别是返回了bool值如果为true则允许本次操作如果为false则不允许本次操作
Reserve/ReserveN func (lim *Limiter) Reserve() *Reservation func (lim *Limiter) ReserveN(now time.Time, n int) *Reservation
必定会返回结构体 *Reservation通过它的 Delay() 方法判断等待时间
1.3 举个
控制循环的速率保证每秒只能处理1个高峰期最高能同时处理3个。 针对Allow方法可以看到最开始同时处理了三个请求后两个因为没有令牌导致不处理 针对Wait和Reserve方法可以看到每次间隔1s左右即可执行一个。
package mainimport (fmtgolang.org/x/time/ratetime
)func main() {limiter : rate.NewLimiter(1, 3) // 每秒生成1个令牌桶的容量为3// 使用 Allow 方法for i : 0; i 5; i {if limiter.Allow() {fmt.Println(Allow: Allowed)} else {fmt.Println(Allow: Not allowed)}}// 使用 Wait 方法for i : 0; i 5; i {lastTime : time.Now()if err : limiter.Wait(context.Background()); err ! nil {fmt.Println(Error:, err)} else {fmt.Printf(Wait: Need to wait for %v\n, time.Now().Sub(lastTime))lastTime time.Now()fmt.Println(Wait: Allowed)}}// 使用 Reserve 方法for i : 0; i 5; i {reservation : limiter.Reserve()if !reservation.OK() {fmt.Println(Reserve: 桶数量需要大于0)} else {delay : reservation.Delay()fmt.Printf(Reserve: Need to wait for %v\n, delay)time.Sleep(delay)fmt.Println(Reserve: Allowed)}}
}Output
Allow: Allowed
Allow: Allowed
Allow: Allowed
Allow: Not allowed
Allow: Not allowed
Wait: wait 1.000398173s
Wait: Allowed
Wait: wait 999.442232ms
Wait: Allowed
Wait: wait 1.000598372s
Wait: Allowed
Wait: wait 999.837132ms
Wait: Allowed
Wait: wait 999.69229ms
Wait: Allowed
Reserve: wait 999.572574ms
Reserve: Allowed
Reserve: wait 999.376376ms
Reserve: Allowed
Reserve: wait 999.708594ms
Reserve: Allowed
Reserve: wait 999.769212ms
Reserve: Allowed
Reserve: wait 999.736995ms
Reserve: Allowed2. 原理分析
2.1 初始化NewLimiter方法
func NewLimiter(r Limit, b int) *Limiter {return Limiter{limit: r,burst: b,}
}type Limiter struct {mu sync.Mutex // 锁并发安全limit Limit // 每秒产生多少tokenQPSburst int // 桶的最大值tokens float64 // 此刻桶内token数量// last is the last time the limiters tokens field was updatedlast time.Time // 上一次tokens更新的时间// lastEvent is the latest time of a rate-limited event (past or future)lastEvent time.Time
}2.2 限流方法
包中提供的限流算法本质都是一个reserveN
为了方便理解我们重点看Wait方法更方便和上文Take方法作比较
func (lim *Limiter) Wait(ctx context.Context) (err error) {return lim.WaitN(ctx, 1)
}func (lim *Limiter) WaitN(ctx context.Context, n int) (err error) {// The test code calls lim.wait with a fake timer generator.// This is the real timer generator.newTimer : func(d time.Duration) (-chan time.Time, func() bool, func()) {timer : time.NewTimer(d)return timer.C, timer.Stop, func() {}}return lim.wait(ctx, n, time.Now(), newTimer)
}// Params
// ctx 用于计算最长超时不超过ctx存活时间
// n 每一个请求需要多少令牌数一般为1
// t 当前时间
// newTimer 休眠定时函数
func (lim *Limiter) wait(ctx context.Context, n int, t time.Time, newTimer func(d time.Duration) (-chan time.Time, func() bool, func())) error {// 赋值上锁lim.mu.Lock()burst : lim.burstlimit : lim.limitlim.mu.Unlock()// 边界校验if n burst limit ! Inf {return fmt.Errorf(rate: Wait(n%d) exceeds limiters burst %d, n, burst)}// 根据ctx判断需要等待的最长时间select {case -ctx.Done():return ctx.Err()default:}waitLimit : InfDurationif deadline, ok : ctx.Deadline(); ok {waitLimit deadline.Sub(t)}// 调用的核心方法返回Reservation对象r : lim.reserveN(t, n, waitLimit)if !r.ok {return fmt.Errorf(rate: Wait(n%d) would exceed context deadline, n)}// 如果需要限流休眠对应的时间delay : r.DelayFrom(t) // 下次理论执行时间 - 当前时间if delay 0 {return nil}ch, stop, advance : newTimer(delay)defer stop()advance() // only has an effect when testingselect {case -ch:return nilcase -ctx.Done():r.Cancel()return ctx.Err()}
}进一步观察reserveN方法它最后会返回Reservation对象
type Reservation struct {ok bool // 是否预占令牌成功,可以理解成是否执行成功lim *Limiter // 额外存储一份外面的限流结构体tokens int // 本次请求需要的token数量timeToAct time.Time // 请求执行的时间// This is the Limit at reservation time, it can change later.limit Limit // QPS每秒往桶中放置的数量Limit是float64的别名
}// Params
// t 当前时间
// n 每一个请求需要多少令牌数一般为1
// maxFutureReserve 最长等待时间
func (lim *Limiter) reserveN(t time.Time, n int, maxFutureReserve time.Duration) Reservation {// 实际处理过程中上锁lim.mu.Lock()defer lim.mu.Unlock()// 异常校验if lim.limit Inf {return Reservation{ok: true,lim: lim,tokens: n,timeToAct: t,}} else if lim.limit 0 {var ok boolif lim.burst n {ok truelim.burst - n}return Reservation{ok: ok,lim: lim,tokens: lim.burst,timeToAct: t,}}// 计算得到从当前时间桶内tokens数具体见下面t, tokens : lim.advance(t)// 减去本次请求需要的token数当前桶内还剩余的tokens数tokens - float64(n)// 如果令牌数为负数说明需要进行等待。// 根据令牌数(token/limit)计算等待时长var waitDuration time.Durationif tokens 0 {waitDuration lim.limit.durationFromTokens(-tokens)}// 边界校验是否预占成功ok : n lim.burst waitDuration maxFutureReserver : Reservation{ok: ok,lim: lim,limit: lim.limit,}if ok {r.tokens nr.timeToAct t.Add(waitDuration) // 需要等待的时间// Update statelim.last tlim.tokens tokenslim.lastEvent r.timeToAct}return r
}advance计算得到 此刻桶内tokens数
func (lim *Limiter) advance(t time.Time) (newT time.Time, newTokens float64) {last : lim.lastif t.Before(last) {last t}// 当前时间 - 上次请求时间 * 每秒产生的tokens数量 桶内原有tokens数 当前桶内tokens数elapsed : t.Sub(last)delta : lim.limit.tokensFromDuration(elapsed)tokens : lim.tokens deltaif burst : float64(lim.burst); tokens burst { // 不超过最大桶内tokens数量tokens burst}return t, tokens
}tokensFromDuration计算一段时间内产生的tokens数量
// 核心计算 tokens 数量的函数秒数*每秒产生的tokens数
func (limit Limit) tokensFromDuration(d time.Duration) float64 {if limit 0 {return 0}return d.Seconds() * float64(limit)
}总结一下:
懒加载每次请求时根据时间差本次减去上次保存的时间戳位置计算 Token 数目判断是否需要等待使用sync.Mutex锁保证并发安全当令牌桶数量请求所需的令牌数时可以获取令牌。当令牌桶数量请求所需的令牌数时需要看等待时间先到还是ctx超时时间哪个短。如果等待时间短可以获取令牌。如果ctx超时时间短则无法获取令牌需要回退令牌。
3. Case分析
分析Wait方法假定限定 100ms 执行一个请求QPS10桶容量为1 Case1如果请求2提前到来 结论会等待到理论时间间隔再执行 1. 请求1到来lim.advance(t)返回此刻tokens数为1消耗一个token不需要等待。 2. 请求2在50ms时到来lim.advance(t)计算逻辑当前时间 - 上次请求时间 * 每秒产生的tokens数量 桶内原有tokens数 50ms-0/ 1000 * 100 0.5 即当前时刻tokens0.5tokens - float64(n) 消耗掉本次token需要的数目tokens -0.5waitDuration lim.limit.durationFromTokens(-tokens)0.5 / 10 s 50ms通过r.DelayFrom(t)方法得到需要等待50ms最后返回请求2实际执行时间 100ms Case2如果请求2延迟到来请求3提前到来 结论请求2直接执行请求3仍需要等待与请求2间隔100ms再执行——由于没有松弛度的概念跟Uber方法的区别 1. 请求1直接执行 2. 请求2在50ms时到来lim.advance(t)计算逻辑当前时间 - 上次请求时间 * 每秒产生的tokens数量 桶内原有tokens数 130ms-0/ 1000 * 100 1.3 即当前时刻tokens1.3tokens - float64(n) 消耗掉本次token需要的数目tokens 0.3不需要等待直接执行。 3. 请求3在180ms时到来lim.advance(t)计算逻辑当前时间 - 上次请求时间 * 每秒产生的tokens数量 桶内原有tokens数 180ms-130ms/ 1000 * 100 0.5 即当前时刻tokens0.5tokens - float64(n) 消耗掉本次token需要的数目tokens -0.5waitDuration lim.limit.durationFromTokens(-tokens)0.5 / 10 s 50ms通过r.DelayFrom(t)方法得到需要等待50ms最后返回请求3实际执行时间 230ms Case3桶容量3短时间内来了5个请求每隔5ms 结论请求1-3直接执行请求4-5需要等待——令牌桶的优越性 1. 请求1在0ms到来lim.advance(t)计算逻辑当前时间 - 上次请求时间 * 每秒产生的tokens数量 桶内原有tokens数 0-0/ 1000 * 103 3 即当前时刻tokens3tokens - float64(n) 消耗掉本次token需要的数目tokens 2不需要等待直接执行。 2. 请求2在5ms时到来lim.advance(t)计算逻辑当前时间 - 上次请求时间 * 每秒产生的tokens数量 桶内原有tokens数 5ms-0/ 1000 * 102 2.05 即当前时刻tokens2.05tokens - float64(n) 消耗掉本次token需要的数目tokens 1.05不需要等待直接执行。 3. 请求3在10ms时到来lim.advance(t)计算逻辑当前时间 - 上次请求时间 * 每秒产生的tokens数量 桶内原有tokens数 10ms-5ms/ 1000 * 101.05 1.1 即当前时刻tokens1.1tokens - float64(n) 消耗掉本次token需要的数目tokens 0.1不需要等待直接执行。 4. 请求4在15ms时到来lim.advance(t)计算逻辑当前时间 - 上次请求时间 * 每秒产生的tokens数量 桶内原有tokens数 15ms-10ms/ 1000 * 100.1 0.15 即当前时刻tokens0.15tokens - float64(n) 消耗掉本次token需要的数目tokens -0.85waitDuration lim.limit.durationFromTokens(-tokens)0.85 / 10 *1000ms 85ms通过r.DelayFrom(t)方法得到需要等待85ms最后返回请求3实际执行时间 100ms
对比一下
共同点
无待处理队列直接处理 or 等待(或者可以理解为待处理队列长度1)可以支持限制请求的固定间隔并发安全
不同点
Ubertime/rate限流结构体较为简单 (设计了防止伪共享缓存的字段)较为复杂计算方式时间转化为int64仅涉及加减float64形式计算涉及乘法成本较高计算精确度较高一般高处理突发流量不支持支持通过桶最大容量松弛度支持不支持灵活度可以设置是否采用最大松弛度可以设置桶容量三种限流判定方法 wait, allow, reserve
用哪个好呢
用哪个好从来没有标准答案只有哪种方法最适合哪种场景。
请求QPS极高要求按照时间平均 由于令牌桶的时间计算方式是根据时间差*QPS获取桶的数目这种float64带乘法的计算过程由于是float64形式的计算高QPS的情况下例如1000000会存在一定的时间误差。因此这种情况更推荐采用Uber的包采用int64的计算方式且没有乘法操作相比会有更高一些的精度。 当然通过本人实测如果QPS1000以内我感觉这两种方法没有本质的差别需要具备一定量的突发流量处理能力 由于漏桶算法和令牌桶算法本质的区别令牌桶算法能够很好地处理突发流量因此这种场景推荐使用time/rate可以通过设置比较大的brust桶最大值来保证处理突发流量的能力。严格的限制请求的固定间隔 都可以其中Uber方法时间把控更为严格可以通过Uber方法中ratelimit.WithoutSlack参数取消松弛量的影响。 limiter : ratelimit.New(100, ratelimit.WithoutSlack)
⭐️其他限流
分布式限流例如Go Redis分布式限流库 https://github.com/go-redis/redis_rate
也有很多限流的包还没来得及仔细研究后面会做一个更加详细的对比