沈阳哪家做网站好,忻州网站建设哪家好,做外贸收费的网站,邢台做外贸网站目录
一、现象
编辑
二、工具
1、AtomicInteger,AtomicLong 原子类操作
2、RedisLua
3、Google Guava的RateLimiter
1#xff09; 使用
2#xff09; Demo
3#xff09; 优化demo
4、阿里开源的Sentinel
三、算法
1、计数…目录
一、现象
编辑
二、工具
1、AtomicInteger,AtomicLong 原子类操作
2、RedisLua
3、Google Guava的RateLimiter
1 使用
2 Demo
3 优化demo
4、阿里开源的Sentinel
三、算法
1、计数限流
1原理
2实现
3优缺点
2、固定窗口限流
1原理
2实现
3优缺点
3、滑动窗口限流
1原理
2实现
3优缺点
4、漏桶算法
1原理
2实现
3优缺点
5、令牌桶算法
1原理
2实现
3优缺点 一、现象
为什么要限流用于在高并发环境中保护系统资源,避免因过多请求导致系统崩溃
线上服务运行中偶尔会遇见如Api服务瞬时请求流量过高服务被压垮数据处理服务处理消息队列数据消费速度过快导致处理性能下降甚至崩溃。
限流前数据推送给服务处理时速度过快服务未限流导致CPU突然暴涨达到临界值处理性能底下 限流后消费速度平稳CPU平稳未超限内存上涨也未超限 可见限流是非常重要的
二、工具
1、AtomicInteger,AtomicLong 原子类操作
优点
性能高AtomicInteger 和 AtomicLong 基于 CASCompare-And-Swap操作能够实现高效的并发访问适用于高并发场景。轻量级这些类作为 Java 标准库的一部分无需引入额外的依赖使用简单方便。内存操作AtomicInteger 和 AtomicLong 的操作都在内存中完成避免了网络开销。
缺点
单机限流AtomicInteger 和 AtomicLong 主要适用于单机环境下的限流对于分布式系统或微服务架构来说可能需要额外的机制来实现全局限流。功能相对单一AtomicInteger 和 AtomicLong 的功能较为单一可能无法满足复杂的限流需求。CAS 的潜在问题在高并发场景下CAS 操作可能导致自旋等待增加 CPU 开销。此外如果多个线程同时修改同一个值可能导致性能下降。
2、RedisLua
优点
灵活性高RedisLua 的限流方案允许根据业务需求灵活定制限流规则如限制特定来源 IP 或 API 的访问频率。分布式支持Redis 作为分布式缓存使得限流策略可以跨多个实例或节点生效非常适合微服务架构或分布式系统。原子性操作Lua 脚本在 Redis 中执行时具有原子性避免了并发访问时可能出现的数据不一致问题。
缺点
网络开销虽然 RedisLua 减少了部分网络请求但仍然存在网络 IO 开销尤其在高并发场景下可能会成为性能瓶颈。依赖 Redis该方案高度依赖 Redis 的稳定性和可用性一旦 Redis 出现故障限流策略可能失效。学习成本Redis 和 Lua 的使用需要一定的学习成本尤其是对于那些不熟悉这两个技术的开发者来说。
3、Google Guava的RateLimiter
优点
基于令牌桶算法RateLimiter采用了令牌桶算法进行限流该算法允许突发流量的处理当请求空闲时可以预先生成一部分令牌从而在新请求到达时无需等待。实现简单RateLimiter的使用相对简单可以方便地集成到现有系统中。
缺点
功能相对单一RateLimiter主要关注限流功能对于熔断降级等复杂场景的处理能力相对较弱。缺乏实时监测和报警机制RateLimiter没有提供实时的系统监测和报警功能对于系统问题的发现和解决可能不够及时。
1 使用
引入pom
dependencygroupIdcom.google.guava/groupIdartifactIdguava/artifactIdversion27.1-jre/version
/dependency
常用API介绍
RateLimiter.create(permitsPerSecond)设置当前接口的QPSrateLimiter.tryAcquire(timeout, timeUnit)尝试在一定时间内获取令牌超时则退出rateLimiter.acquire():尝试获取对应数量令牌默认一个令牌如果没有可以用的则该方法将一直阻塞线程直到RateLimiter允许获得新的许可证
2 Demo
/*** author : YY-帆S* since : 2024/2/26*/
Slf4j
Service
ConditionalOnProperty(value xxxxx, havingValue true)
public class ForwardAndCustomDetailConsumer {// 创建RateLimiter这里设置每秒最多处理300条消息private final RateLimiter rateLimiter RateLimiter.create(300.0);KafkaListener(topics #{${xxxxx}.split(,)}, groupId ${xxxxx}, containerFactory ActCommonKafkaConfig)public void consumer(ConsumerRecord?, ? record) {MDCHelper.fillMDC();String value null;try {// 通过RateLimiter进行限流rateLimiter.acquire();} catch (Exception e) {xxxx;}}
}3 优化demo
第二点的demo配置不好变动策略修改时需要修改代码重新编译服务
因此优化后通过依赖注入的方式创建RateLimiter实例
1. 创建RateLimit的配置类
/*** author YY-帆S* Date 2024/3/8 11:42*/
Configuration
public class RateLimiterConfig {Resourceprivate Act71225Properties act71225Properties;Bean(name act72183RateLimiter)public RateLimiter act72183OfflineRateLimiter() {return RateLimiter.create(act71225Properties.getOfflineDataRateLimit());}
}
2.修改RateLimiter实现方式
/*** author : YY-帆S* since : 2024/2/26*/
Slf4j
Service
ConditionalOnProperty(value xxxxx, havingValue true)
public class ForwardAndCustomDetailConsumer {// 创建RateLimiter这里设置每秒最多处理300条消息Resource(name act72183RateLimiter)private RateLimiter rateLimiter; KafkaListener(topics #{${xxxxx}.split(,)}, groupId ${xxxxx}, containerFactory ActCommonKafkaConfig)public void consumer(ConsumerRecord?, ? record) {MDCHelper.fillMDC();String value null;try {// 通过RateLimiter进行限流rateLimiter.acquire();} catch (Exception e) {xxxx;}}
}4、阿里开源的Sentinel
文档introduction | Sentinel
优点
功能丰富Sentinel不仅支持限流功能还提供了熔断降级、系统保护、热点参数限流等多种应用场景的支持。细粒度的控制Sentinel可以实现细粒度的控制满足复杂场景下对流量和资源的精确管理需求。强大的实时监测和报警机制Sentinel提供了实时的系统监测和报警功能可以及时发现并解决系统问题。易于扩展和集成Sentinel为开发者提供了简单易用的扩展接口支持多种开发框架的集成方便开发者根据需求进行定制和扩展。
缺点
学习成本可能较高由于Sentinel的功能丰富对于初学者来说可能需要一定的学习成本来熟悉和掌握。可能增加系统复杂度引入Sentinel可能会增加系统的复杂度特别是在大型项目中需要仔细规划和管理相关的配置和规则。
三、算法
1、计数限流
1原理
系统同时只能处理N个请求保存一个计数器开始处理时计数器1处理完成后计数器-1
每次请求查看计数器的值超过阈值就拒绝
2实现
实现类
如下以AtomicInteger 为工具实现计数限流即同时只能处理N个数据
import lombok.extern.slf4j.Slf4j;import java.util.concurrent.atomic.AtomicInteger;/*** author YY-帆S* Date 2024/3/25 21:07*/
Slf4j
public class AtomicCounterRateLimiter {private final AtomicInteger counter;private final int limit;public AtomicCounterRateLimiter(int limit) {this.counter new AtomicInteger(0);this.limit limit;}/*** 限流** return*/public synchronized boolean tryAcquire() {//获取计数器不超过限制则则返回true并追加数值if (counter.get() limit) {counter.incrementAndGet();return true;}return false;}/*** 释放** return*/public synchronized boolean tryRelease() {//获取计数器不超过限制则则返回true并追加数值if (counter.get() 0) {counter.decrementAndGet();return true;}return false;}
} 2. 测试类
import org.junit.Test;/*** author YY-帆S* Date 2024/3/25 21:27*/
public class AtomicCounterRateLimiterTest {Testpublic void testLimit() {//创建一个限流器允许同时只能处理10个请求AtomicCounterRateLimiter atomicCounterRateLimiter new AtomicCounterRateLimiter(10);// 模拟15个请求每个请求间隔100毫秒for (int i 0; i 15; i) {new Thread(() - {// 尝试获取许可if (atomicCounterRateLimiter.tryAcquire()) {System.out.println(Thread.currentThread().getName() 获得了许可执行操作。);// 模拟请求之间的间隔try {Thread.sleep((long) (Math.random() * 100));} catch (InterruptedException e) {e.printStackTrace();}atomicCounterRateLimiter.tryRelease();} else {System.out.println(Thread.currentThread().getName() 请求被拒绝。);}}).start();}}}执行结果 Thread-0 获得了许可执行操作。 Thread-1 获得了许可执行操作。 Thread-4 获得了许可执行操作。 Thread-2 获得了许可执行操作。 Thread-3 获得了许可执行操作。 Thread-5 获得了许可执行操作。 Thread-7 获得了许可执行操作。 Thread-6 获得了许可执行操作。 Thread-8 获得了许可执行操作。 Thread-9 获得了许可执行操作。 Thread-10 请求被拒绝。 Thread-11 请求被拒绝。 Thread-13 请求被拒绝。 Thread-14 请求被拒绝。 Thread-12 请求被拒绝。 3优缺点
优点简单代码好实现单机可用Atomic 等原子类、分布式集群可以用Redis
缺点扛不住突发性的流量假设阈值1w即服务器可以同时处理1w个请求当1w个请求在1s内同时涌进来时服务有可能扛不住
2、固定窗口限流
1原理
固定窗口限流是在计数限流的概念上加上了时间窗口的概念计数器每过一个时间窗口就重置为0限流规则如在N秒内只允许处理M个请求
2实现
如下以RedisLua为工具实现固定时间窗口限流即 N秒内同时只能处理M个数据
import com.bigo.web.yummy.center.dao.redis.RedisDao;
import com.bigo.web.yummy.inner.Application;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.test.context.junit4.SpringRunner;import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;/*** author YY-帆S* Date 2024/3/25 18:30*/
Slf4j
RunWith(SpringRunner.class)
SpringBootTest(classes Application.class)
public class RedisFixedTimeRateLimit {ResourceRedisDao redisDao;private boolean tryAcquire() {String key limit:test;//同时只能处理10个请求int limitCount 10;//1s内int second 1;String luaCode local key KEYS[1]\n local limit tonumber(ARGV[1])\n local expire_time ARGV[2]\n local is_exists redis.call(\EXISTS\, key)\n if is_exists 1 then\n if redis.call(\INCR\, key) limit then\n return false\n else\n return true\n end\n else\n redis.call(\INCRBY\, key, 1)\n redis.call(\EXPIRE\, key, expire_time)\n return true\n end;ListString keys new ArrayList();keys.add(key);ListString values new ArrayList();values.add(String.valueOf(limitCount));values.add(String.valueOf(second));RedisTemplateString, String redisTemplate redisDao.getLongCodis().getRedisTemplate();DefaultRedisScriptBoolean redisScript new DefaultRedisScript(luaCode, Boolean.class);return redisTemplate.execute(redisScript, keys, String.valueOf(limitCount), String.valueOf(second));}Testpublic void testLimit() {// 模拟15个请求for (int i 0; i 15; i) {new Thread(() - {// 尝试获取许可if (tryAcquire()) {System.out.println(Thread.currentThread().getName() 获得了许可执行操作。);} else {System.out.println(Thread.currentThread().getName() 请求被拒绝。);}}).start();}}
}3优缺点
优点简单代码好实现单机可用Atomic 等原子类、分布式集群可以用Redis
缺点限流机制不够平滑如每秒允许请求100个请求在第一毫秒内就请求了100个请求此后都开始限流导致剩余窗口内的所有请求都会被拒绝 又如在最后1个毫秒内请求了100个请求下一个毫秒开始新的时间窗口计数清0此时又涌入了100个请求虽说固定时间窗口内没有超过阈值但是全局看来这两个毫秒内就涌入了200个请求对于限流100的概念是不可接受的 3、滑动窗口限流
1原理
在固定时间窗口的基础上进行优化对大的时间窗口进行划分每个小窗口对应大窗口中的不同时间点每个窗口独立计数。随时间的变化小窗口随之平移并且重置/舍弃过期的小窗口每个小窗口的计数器相加不超过大窗口的限流limit即限流阈值之内。 2实现
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;/*** author YY-帆S* Date 2024/3/26 15:04*/
public class SlidingWindowRateLimiter {private int windowSize; //时间窗口大小, Unit: sprivate int slotNum; //用于统计的子窗口数量默认为10private int slotTime; //子窗口的时间长度, Unit: msprivate int limit; //限流阈值/*** 存放子窗口统计结果的数组* note: counters[0]记为数组左边, counters[size-1]记为数组右边*/private AtomicInteger[] counters;private long lastTime;//初始化public SlidingWindowRateLimiter(int windowSize, int limit, int slotNum) {this.windowSize windowSize;this.limit limit;this.slotNum slotNum;// 计算子窗口的时间长度: 时间窗口 / 子窗口数量this.slotTime windowSize * 1000 / slotNum;this.lastTime System.currentTimeMillis();this.counters new AtomicInteger[slotNum];resetCounters();}public SlidingWindowRateLimiter(int windowSize, int limit) {this(windowSize, limit, 10);}private void resetCounters() {for (int i 0; i counters.length; i) {counters[i] new AtomicInteger(0); // 每个数组元素都是一个新的AtomicInteger实例}}/*** 限流请求* return*/public synchronized boolean tryAcquire() {long currentTime System.currentTimeMillis();// 计算滑动数, 子窗口统计时所对应的时间范围为左闭右开区间, 即[a,b)int slideNum (int) Math.floor((currentTime - lastTime) / slotTime);// 滑动窗口slideWindow(slideNum);// 统计滑动后的数组之和int sum Arrays.stream(counters).mapToInt(AtomicInteger::get).sum();// 以达到当前时间窗口的请求阈值, 故被限流直接返回falseif (sum limit) {return false;} else { // 未达到限流, 故返回truecounters[slotNum - 1].incrementAndGet();return true;}}/*** 将数组元素全部向左移动num个位置** param num*/private void slideWindow(int num) {if (num 0) {return;}// 数组中所有元素都会被移出, 故直接全部清零if (num slotNum) {resetCounters();} else {// 对于a[0]~a[num-1]而言, 向左移动num个位置后, 则直接被移出了// 故从a[num]开始移动即可for (int index num; index slotNum; index) {// 计算a[index]元素向左移动num个位置后的新位置索引int newIndex index - num;counters[newIndex] counters[index];counters[index].getAndSet(0);}}// 更新时间lastTime lastTime num * slotTime;}public static void main(String[] args) throws InterruptedException {//例子5s内只能有50个请求SlidingWindowRateLimiter rateLimiter new SlidingWindowRateLimiter(5, 50);int allNum 3; // 请求总数int passNum 0; // 通过数int blockNum 0; // 被限流数//模拟连续请求for (int i 0; i allNum; i) {if (rateLimiter.tryAcquire()) {passNum;} else {blockNum;}}System.out.println(请求总数: allNum , 通过数: passNum , 被限流数: blockNum);// 延时以准备下一次测试Thread.sleep(5000);allNum 100;passNum 0;blockNum 0;//模拟连续请求for (int i 0; i allNum; i) {if (rateLimiter.tryAcquire()) {passNum;} else {blockNum;}}System.out.println(请求总数: allNum , 通过数: passNum , 被限流数: blockNum);}
}执行结果 请求总数: 3, 通过数: 3, 被限流数: 0 请求总数: 100, 通过数: 50, 被限流数: 50 3优缺点
优点避免了固定窗口算法可能出现的窗口切换时的流量峰值使得流量控制更为平滑缺点
对时间区间精度要求越高算法所需的空间容量越大需要更多的计算和存储资源还是存在限流不够平滑的问题。例如限流是每秒100个在第一毫秒发送了100个请求达到限流剩余窗口时间的请求都将会被拒绝
4、漏桶算法
1原理
该算法使用“桶”来比喻不断有水请求进入桶内并以固定速率进行处理模拟桶中的“泄漏”当加水速度漏水速度时直到某一个时刻存储桶己满新的请求将被丢弃直到有可用空间。 2实现
import lombok.extern.slf4j.Slf4j;import java.util.concurrent.atomic.AtomicInteger;/*** author YY-帆S* Date 2024/3/26 22:46*/
Slf4j
public class LeakyBucketRateLimiter {private AtomicInteger bucketLevel; // 当前桶中的请求数量private int capacity; // 桶的容量private long leakRate; // 漏水速率单位请求/秒private long lastLeakTime; // 上一次漏水的时间戳public LeakyBucketRateLimiter(int capacity, long leakRate) {this.capacity capacity;this.leakRate leakRate;this.bucketLevel new AtomicInteger(0);this.lastLeakTime System.currentTimeMillis();}public synchronized boolean tryAcquire() {// 获取当前时间long currentTime System.currentTimeMillis();//流出时间long elapsedTime currentTime - lastLeakTime;//计算流出的水量 当前时间 - 上次时间 * 出水速率long leaked (long) (elapsedTime * (leakRate / 1000.0));//只有有流出水才更新时间戳不然会漏不出水if (leaked 0) {//计算桶内水量 桶内当前水量 - 流出的水量int newLevel Math.max(0, bucketLevel.get() - (int) leaked);bucketLevel.set(newLevel);//更新上次漏水时间戳lastLeakTime currentTime;}// 尝试将请求加入桶中if (bucketLevel.get() capacity) {bucketLevel.incrementAndGet();return true;} else {return false;}}public static void main(String[] args) throws InterruptedException {LeakyBucketRateLimiter limiter new LeakyBucketRateLimiter(1, 1); // 容量为1漏水速率为1请求/秒// 模拟发送请求for (int i 0; i 20; i) {new Thread(() - {if (limiter.tryAcquire()) {log.info(Thread.currentThread().getName() 获得了许可执行操作。);} else {log.info(Thread.currentThread().getName() 请求被拒绝。);}}).start();//模拟执行时间Thread.sleep(500);}}}例子是1s/1个请求执行结果一致 23:05:15.200 INFO - Thread-0 获得了许可执行操作。 23:05:15.705 INFO - Thread-1 请求被拒绝。 23:05:16.215 INFO - Thread-2 获得了许可执行操作。 23:05:16.724 INFO - Thread-3 请求被拒绝。 23:05:17.233 INFO - Thread-4 获得了许可执行操作。 23:05:17.741 INFO - Thread-5 请求被拒绝。 23:05:18.252 INFO - Thread-6 获得了许可执行操作。 23:05:18.762 INFO - Thread-7 请求被拒绝。 23:05:19.273 INFO - Thread-8 获得了许可执行操作。 23:05:19.785 INFO - Thread-9 请求被拒绝。 23:05:20.299 INFO - Thread-10 获得了许可执行操作。 23:05:20.813 INFO - Thread-11 请求被拒绝。 23:05:21.327 INFO - Thread-12 获得了许可执行操作。 23:05:21.840 INFO - Thread-13 请求被拒绝。 23:05:22.353 INFO - Thread-14 获得了许可执行操作。 23:05:22.867 INFO - Thread-15 请求被拒绝。 23:05:23.382 INFO - Thread-16 获得了许可执行操作。 23:05:23.896 INFO - Thread-17 请求被拒绝。 23:05:24.411 INFO - Thread-18 获得了许可执行操作。 23:05:24.925 INFO - Thread-19 请求被拒绝。 3优缺点
优点
平滑流量输出漏桶算法可以有效地平滑网络上的突发流量为网络提供一个稳定的流量输出。通过将流量注入到漏桶中并根据桶的漏水速率来控制流量的输出可以确保流量的平稳性。防止流量冲击由于漏桶具有缓存功能当流量突发超过设定阈值时超出的部分可以被暂存在桶中/直接丢弃从而避免了流量冲击对系统造成的压力。
缺点
灵活性相对较差漏桶算法的速率是恒定的不能根据实际需要动态调整。这可能导致在某些情况下系统无法充分利用网络资源造成一定的资源浪费。无法应对突发流量由于漏桶的出口速度是固定的在面对突发流量时即使是在流量较小的情况下仍然是以固定速率处理也无法以更快的速度处理请求
5、令牌桶算法
1原理 与漏桶算法相反系统以固定的速率往桶里放入令牌称为令牌桶如果有请求需要这个令牌这可以从桶里拿一个拿到了令牌即允许放行直到令牌被拿完即令牌不足则请求需等待或被丢弃。 令牌的数量与时间和发放速率强相关随着时间流逝系统会不断往桶里放入更多的令牌如果放令牌的速度比拿令牌的速度快则令牌桶最终会被放满。 2实现
推荐用Google-Guava的RateLimiter比较成熟如上第二点工具有例子但是Google-Guava的RateLimiter是基于单机的限流工具。
如下基于Redislua实现一个令牌桶限流算法可以实现分布式集群限流的目的。
实现代码
lua限流脚本和初始化脚本
-- KEYS[1]: 令牌桶的key格式为 rate_limiter:{bucket_key}
-- ARGV[1]: 请求的令牌数量
-- ARGV[2]: 当前时间戳可选用于避免Redis服务器与客户端时间不同步的问题local key KEYS[1] --令牌桶的key
local tokens_requested tonumber(ARGV[1]) --请求的令牌数量
local now tonumber(ARGV[2]) or redis.call(TIME)[1] --当前时间戳可选用于避免Redis服务器与客户端时间不同步的问题-- 获取当前桶中的令牌数和上次刷新时间
local ratelimit_info redis.call(HMGET, key, last_refreshed, current_permits, capacity, rate)
local last_refreshed ratelimit_info[1]
local current_permits tonumber(ratelimit_info[2])
local capacity tonumber(ratelimit_info[3]) --桶的容量
local fill_rate tonumber(ratelimit_info[4]) --每秒添加的令牌数量即速率-- 初始化当前桶容量
local local_curr_permits capacity;-- 如果上次刷新时间不存在则初始化为当前时间
if (last_refreshed nil or type(last_refreshed) boolean) thenlast_refreshed nowredis.call(HMSET, key, last_refreshed, now);
else--计算该时间间隔内加入了多少令牌local reverse_permits (now - last_refreshed) * fill_rateif (reverse_permits 0) thenredis.call(HMSET, key, last_refreshed, now);end--计算当前总共有多少令牌期望值local expect_curr_permits reverse_permits current_permits-- 与桶最大容量做对比local_curr_permits math.min(expect_curr_permits, capacity);
endif (local_curr_permits tokens_requested) thenlocal_curr_permits local_curr_permits - tokens_requestedredis.call(HMSET, key, current_permits, local_curr_permits);return 1 -- 表示成功获取令牌
elseredis.call(HMSET, key, current_permits, local_curr_permits);return 0 -- 表示获取令牌失败
end------ 初始化lua ------
local result 1
redis.pcall(HMSET, KEYS[1],last_refreshed, ARGV[1],current_permits, ARGV[2],capacity, ARGV[3],rate, ARGV[4])
return resultRedisLua限流核心逻辑
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;import java.util.ArrayList;
import java.util.List;/*** author YY-帆S* Date 2024/3/27 11:17*/
Slf4j
public class TokenBucketRateLimiter {//Redis缓存key前缀private static final String KEY_PREFIX TokenRateLimiter:;/*** 初始化lua脚本*/private static final String rateLimitInitLuaCode local result true\n redis.pcall(\HMSET\, KEYS[1],\n \capacity\, ARGV[1],\n \rate\, ARGV[2])\n return true;/*** 令牌桶锁lua脚本*/private static final String rateLimitLuaCode -- KEYS[1]: 令牌桶的key格式为 \rate_limiter:{bucket_key}\\n -- ARGV[1]: 请求的令牌数量\n -- ARGV[2]: 当前时间戳可选用于避免Redis服务器与客户端时间不同步的问题\n \n local key KEYS[1] --令牌桶的key\n local tokens_requested tonumber(ARGV[1]) --请求的令牌数量\n local now tonumber(ARGV[2]) or redis.call(TIME)[1] --当前时间戳可选用于避免Redis服务器与客户端时间不同步的问题\n \n -- 获取当前桶中的令牌数和上次刷新时间\n local ratelimit_info redis.call(\HMGET\, key, \last_refreshed\, \current_permits\, \capacity\, \rate\)\n local last_refreshed ratelimit_info[1]\n local current_permits tonumber(ratelimit_info[2])\n local capacity tonumber(ratelimit_info[3]) --桶的容量\n local fill_rate tonumber(ratelimit_info[4]) --每秒添加的令牌数量即速率\n \n -- 初始化当前桶容量\n local local_curr_permits capacity;\n \n -- 如果上次刷新时间不存在则初始化为当前时间\n if (last_refreshed nil or type(last_refreshed) boolean) then\n last_refreshed now\n redis.call(\HMSET\, key, \last_refreshed\, now);\n else\n --计算该时间间隔内加入了多少令牌\n local reverse_permits (now - last_refreshed)* fill_rate\n if (reverse_permits 0) then\n redis.call(\HMSET\, key, \last_refreshed\, now);\n end\n \n --计算当前总共有多少令牌期望值\n local expect_curr_permits reverse_permits current_permits\n -- 与桶最大容量做对比\n local_curr_permits math.min(expect_curr_permits, capacity);\n end\n \n if(local_curr_permits tokens_requested) then\n local_curr_permits local_curr_permits - tokens_requested\n redis.call(\HMSET\, key, \current_permits\, local_curr_permits);\n return true -- 表示成功获取令牌\n else\n redis.call(\HMSET\, key, \current_permits\, local_curr_permits);\n return false -- 表示获取令牌失败\n end\n;RedisTemplate redisTemplate;//初始化构造器public TokenBucketRateLimiter(RedisTemplate redisTemplate, int permitsPerSecond, String bucketKey) {//初始化构造信息ListString keys new ArrayList();keys.add(getRateLimiterKey(bucketKey));this.redisTemplate redisTemplate;DefaultRedisScriptBoolean redisScript new DefaultRedisScript(rateLimitInitLuaCode, Boolean.class);this.redisTemplate.execute(redisScript, keys, String.valueOf(permitsPerSecond), String.valueOf(permitsPerSecond));}//构造令牌桶缓存keypublic String getRateLimiterKey(String bucketKey) {return KEY_PREFIX bucketKey;}//默认一次拿一个令牌public boolean tryAcquire(String bucketKey) {return tryAcquire(1, bucketKey);}/*** 核心逻辑获取令牌桶数据* param request* param bucketKey* return*/public boolean tryAcquire(int request, String bucketKey) {DefaultRedisScriptBoolean redisScript new DefaultRedisScript(rateLimitLuaCode, Boolean.class);ListString keys new ArrayList();keys.add(getRateLimiterKey(bucketKey));return (boolean) this.redisTemplate.execute(redisScript, keys, String.valueOf(request), String.valueOf(System.currentTimeMillis() / 1000));}
}2. 测试代码
import com.bigo.web.live.TimeUtil;
import com.bigo.web.yummy.center.dao.redis.RedisDao;
import com.bigo.web.yummy.inner.Application;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;import javax.annotation.Resource;/*** author YY-帆S* Date 2024/3/27 13:25*/
RunWith(SpringRunner.class)
SpringBootTest(classes Application.class)
Slf4j
public class TokenBucketRateLimiterTest {ResourceRedisDao redisDao;Testpublic void testTokenBucketRateLimiter() throws InterruptedException {//令牌桶String bucketKey yyfsTest;// 每秒新增3个桶TokenBucketRateLimiter limiter new TokenBucketRateLimiter(redisDao.getLongCodis().getRedisTemplate(), 3, bucketKey);// 模拟发送请求int allNum 4; // 请求总数//模拟连续请求for (int i 0; i allNum; i) {if (limiter.tryAcquire(bucketKey)) {System.out.println(TimeUtil.parseTimestampToString(yyyy-MM-dd HH:mm:ss, System.currentTimeMillis()) 获得了许可执行操作。);} else {System.out.println(TimeUtil.parseTimestampToString(yyyy-MM-dd HH:mm:ss, System.currentTimeMillis()) 请求被拒绝。);}}// 延时以准备下一次测试Thread.sleep(5000);allNum 20;for (int i 0; i allNum; i) {if (limiter.tryAcquire(bucketKey)) {System.out.println(TimeUtil.parseTimestampToString(yyyy-MM-dd HH:mm:ss, System.currentTimeMillis()) 获得了许可执行操作。);} else {System.out.println(TimeUtil.parseTimestampToString(yyyy-MM-dd HH:mm:ss, System.currentTimeMillis()) 请求被拒绝。);}Thread.sleep(200);}}
}初始化结果 执行过程结果 执行输出结果 3优缺点
优点
应对突发流量令牌桶算法允许流量突发当桶满时系统能以最大的速度处理请求灵活性算法允许根据实际需求调整令牌生成速率和令牌桶大小等参数限制平均速度长期运行的服务数据处理速度最终会动态平衡限制在预定义的平均速率即生成令牌的速率
缺点
导致过载的可能性要控制令牌的产生速度如果令牌产生的速度过快可能会导致大量的突发流量这可能会使网络或服务过载。内存资源限制令牌桶需要一定的存储空间来保存令牌可能会导致内存资源的浪费。且对于特别频繁的请求令牌桶算法可能会占用较多的计算资源增加系统负担。实现稍复杂相比于计数器等其他限流算法令牌桶算法的实现稍微复杂一些 参考文档
https://zhuanlan.zhihu.com/p/494782784
https://blog.csdn.net/weixin_45583158/article/details/135664278