石柱网站制作,网站搭建徐州百度网络搭建,湛江自助建站模板,eclipse视频网站开发1. 基于JVM的阻塞队列的局限 JVM内存限制问题#xff0c;大量订单出现时#xff0c;可能会超过JVM阻塞队列上限#xff1b;阻塞队列并不能持久化#xff0c;因为内存不能持久化#xff0c;出现异常或者宕机之类的故障时#xff0c;出现数据丢失#xff1b; 所以引出消息…1. 基于JVM的阻塞队列的局限 JVM内存限制问题大量订单出现时可能会超过JVM阻塞队列上限阻塞队列并不能持久化因为内存不能持久化出现异常或者宕机之类的故障时出现数据丢失 所以引出消息队列的概念 消息队列的两个优点 消息队列在JVM以外的独立服务不受JVM的内存限制消息队列不仅仅做数据存储确保数据安全会做数据的持久化并且消费者取数据要做消息确认如果没有确认那么消息会在队列中依旧存在下一次会再投递给消费者让它继续处理直到确认为止确保消息至少消费一次 2. 基于List结构模拟消息队列 假设从队列里取到消息取到还未处理就发生了异常这是消息就无法处理了因为pop相当于remove发送消息一旦被消费者拿走之后别的消费者就无法获得了无法解决一条消息多个消费者使用 3. 基于PubSub的消息队列不建议使用 PSUBSCRIBE 订阅的格式匹配的三种规则 PubSub消息队列在传递消息时并不会将消息持久化到硬盘上而是将消息存储在内存中当服务重启或者发生故障时可能会导致消息丢失。 4. 基于Stream的消息队列 $:返回最新的消息前提是该条信息并没有被消费者读过否则就是返回nil 阻塞等待读取最新的消息阻塞时间设置为0表示永久等待直到有新等待消息 重点这种读取方式存在着弊端当指定其实ID为$时代表读取最新等待消息此时处理一条消息的过程中又来了一条以上的信息到队列则下次获取也只能获取最新的一条可能就会出现漏读消息的问题 当消费者读取一次之后再生产k4、k5此时消费者再次阻塞读取最新的消息再生产消息k6此时消费者只能获取消息k6出现了消息漏读
5. 基于Stream的消息队列问题优化 示例 消息确认k1…k5都在pending-list中等待确认、 查看pending-list中所有未确认的元素 从pendin-list确认未确认的消息此时消息的起始ID为0 所以可以得出处理消息的大致流程先利用的方式去获取所有未消费的消息然后确认如果出现异常在Java中catch采用0的方法去获取pending-list的消息异常消息处理完毕再确认pending-list清空之后再使用的方式继续获取未消费的消息直到阻塞时间过后返回为nil 6. 基于Redis的Stream结构实现异步秒杀 创建消息队列 修改lua脚本认定可以抢购直接发送消息到消息队列中 注意
如果 redis.call(get, stockKey) 返回的结果是空值nil那么尝试将空值转换为数字时会出现错误。 因为无法将空值转换为数字。 为了避免这种错误可以在进行比较之前先检查返回结果是否为非空值。 这样如果 redis.call(get, stockKey) 返回的结果是空值就不会进行比较从而避免了错误。
-- 1.参数列表
-- 1.1 优惠券id
local voucherId ARGV[1]
-- 1.2 用户id
local userId ARGV[2]
-- 1.3 订单id
local orderId ARGV[3]-- 2.数据key
-- 2.1 库存key
local stockKey seckill:stock .. voucherId
-- 2.2 订单key
local orderKey seckill:order .. voucherId-- 3.脚本业务
-- 3.1 判断库存是否充足
local stock redis.call(get, stockKey)
if stock and tonumber(stock) 0 then-- 3.2 库存不足 返回1return 1
end
-- 3.2 判断用户是否下单
if(redis.call(sismember, orderKey, userId) 1) then-- 3.3 存在说明是重复下单返回2return 2
end
-- 3.4 扣库存
redis.call(incrby, stockKey, -1)
-- 3.5 下单保存用户
redis.call(sadd, orderKey, userId)
-- 3.6 发送消息到队列中,XDD stream.orders * k1 v1 k2 v2...
redis.call(xadd, stream.orders, *, userId, userId, voucherId, voucherId, id, orderId)return 0秒杀的逻辑修改修改之前的阻塞队列方法 Override
public Result seckillVoucher(Long voucherId) {// 获取用户idLong userId UserHolder.getUser().getId();// 订单id生成唯一IDlong orderId redisIdWorker.nextId(order);//1. 执行lua脚本(判断购买资格发送订单信息到消息队列)Long result stringRedisTemplate.execute(SECKILL_SCRIPT,Collections.emptyList(),// key参数为0所以参数传空集合voucherId.toString(),userId.toString(),String.valueOf(orderId));//2. 判断是否为0int i result.intValue();if(i ! 0) {//2.1 不为0没有购买资格return Result.fail(i 1 ? 库存不足 : 不能重复下单);}//3. 获取代理对象proxy (IVoucherOrderService) AopContext.currentProxy();//4. 返回订单idreturn Result.ok(0);
}开启线程任务 /*** 异步线程从消息队列中取出订单信息执行保存订单到数据库*/
private class VoucherOrderHandler implements Runnable{String queueName stream.orders;Overridepublic void run() {while (true){try {// 1. 获取消息队列中的订单信息// XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS stream.order ListMapRecordString, Object, Object list stringRedisTemplate.opsForStream().read(Consumer.from(g1, c1),StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),StreamOffset.create(queueName, ReadOffset.lastConsumed()));// 2. 判断消息获取是否成功if(list null || list.isEmpty()) {// 2.1 如果获取失败说明没有消息继续下一次循环continue;}// 3. 解析消息中的订单信息,键值类型参考脚本中的userId, userId, voucherId, voucherId, id, orderId// 前者String指的是消息ID Object, Object指的是上述键值对MapRecordString, Object, Object record list.get(0);MapObject, Object values record.getValue();// 3.1 转为voucherOrder对象VoucherOrder voucherOrder BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);// 4. 如果有获取成功可以下单handleVoucherOrder(voucherOrder);// 5. ACK确认// SACK stream.orders g1 idstringRedisTemplate.opsForStream().acknowledge(queueName, g1, record.getId());} catch (Exception e) {log.error(订单异常信息,e);handlePendingList();}}}/*** 订单异常消息。采用0的方法去处理pending-list中的消息进行ACK确认*/private void handlePendingList() {while (true){try {// 1. 获取消息队列中的订单信息// XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS stream.order 0ListMapRecordString, Object, Object list stringRedisTemplate.opsForStream().read(Consumer.from(g1, c1),StreamReadOptions.empty().count(1),StreamOffset.create(queueName, ReadOffset.from(0)));// 2. 判断消息获取是否成功if(list null || list.isEmpty()) {// 2.1 如果获取失败说明pending-list没有异常消息继续下一次循环break;}// 3. 解析消息中的订单信息,键值类型参考脚本中的userId, userId, voucherId, voucherId, id, orderId// 前者String指的是消息ID Object, Object指的是上述键值对MapRecordString, Object, Object record list.get(0);MapObject, Object values record.getValue();// 3.1 转为voucherOrder对象VoucherOrder voucherOrder BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);// 4. 如果有获取成功可以下单handleVoucherOrder(voucherOrder);// 5. ACK确认// SACK stream.orders g1 idstringRedisTemplate.opsForStream().acknowledge(queueName, g1, record.getId());} catch (Exception e) {log.error(订单异常信息,e);try {Thread.sleep(20);} catch (InterruptedException ex) {throw new RuntimeException(ex);}}}}
}