百度网站上做推广受骗,.net开发的大型网站,永久域名注册多少钱,永济网站建设Redis stream 是 Redis 5 引入的一种新的数据结构#xff0c;它是一个高性能、高可靠性的消息队列#xff0c;主要用于异步消息处理和流式数据处理。在此之前#xff0c;想要使用 Redis 实现消息队列#xff0c;通常可以使用例如#xff1a;列表#xff0c;有序集合、发布… Redis stream 是 Redis 5 引入的一种新的数据结构它是一个高性能、高可靠性的消息队列主要用于异步消息处理和流式数据处理。在此之前想要使用 Redis 实现消息队列通常可以使用例如列表有序集合、发布与订阅 3 种数据结构。但是 stream 相比它们具有以下的优势 支持范围查找内置的索引功能可以通过索引来对消息进行范围查找支持阻塞操作避免低效的反复轮询查找消息支持 ACK可以通过确认机制来告知已经成功处理了消息保证可靠性支持多个消费者多个消费者可以同时消费同一个流Redis 会确保每个消费者都可以独立地消费流中的消息 情况
当前项目是在window server 上 部署 rocketmq ps 单体使用 消息队列使用三方exe实现 注册到 window 服务中实现开机自启-自行搜索实现吧。
问题
当前发现一个问题window 被关机主动、断电等mq的broker无法启动原因是 delayOffset.json文件损坏造成文件内容变成如下图。 修改为重新启动没问题了 每次重新都需要处理虽然可以考虑使用脚本 处理但是徒增成本。就想着 使用redis实现 进行替换项目规模较小。redis-stream 需要将 版本升级为 5当前是4。 Redis for Windows redis下载 直接替换就行。
死信问题
网上看到 处理的方式有好几种。定时调度处理pending信息直接ack、重新消费、转移消费组等人工运维处理 等。看了很多结合 spring 提供的函数。 StringRedisTemplate.redisTemplate.opsForStream()返回的接口类StreamOperations 参考spring文档可以用的方法有 acknowledge、 add、 delete、 createGroup、 deleteConsumer、 destroyGroup、 consumers、 groups、 info、 pending、 size、 range、 read、 reverseRange、 trim等。 没有包含命令转移消费 xClaim实际在RedisStreamCommands接口中通过代码也能解决
redisTemplate.execute((RedisCallbackListByteRecord) connection - connection.streamCommands().xClaim(stream.getBytes(), group, consumer, Duration.ofSeconds(10), RecordId.of(streamId)))也没有包含命令设置消费组的起始消息 ID xgroupSetid实际在RedisStreamAsyncCommands接口中代码在比较底层不能拿来就用。
RedisFutureString xgroupSetid(StreamOffsetK streamOffset, K group);当前方式
当前实现的方式。在项目 重启 或 定时调度暂无代码demo通过 pending 获取消息 streamId先复制消息add在确认旧消息acknowledge最后删除旧消息delete。 ps自己感觉还行吧迷之自信O(∩_∩)O哈哈~不要拿来就用啊(⊙o⊙)… redis桌面管理
下载redis可视化管理工具AnotherRedisDesktopManager
参考
理解 Redis 新特性Stream springboot redis stream做轻量级消息队列 Redis Stream实现消息队列 redis Stream消息队列 redision redis stream消息队列pending 使用redis流和spring数据获取挂起的消息 在SpringBoot中使用RedisTemplate重新消费Redis Stream中未ACK的消息
说明
redis-stream命令
XADD向流中添加新的消息。XREAD从流中读取消息。XREADGROUP从消费组中读取消息。XRANGE根据消息 ID 范围读取流中的消息。XREVRANGE与 XRANGE 类似但以相反顺序返回结果。XDEL从流中删除消息。XTRIM根据 MAXLEN 参数修剪流的长度。XLEN获取流的长度。XGROUP管理消费组包括创建、删除和修改。XACK确认消费组中的消息已被处理。XPENDING查询消费组中挂起未确认的消息。XCLAIM将挂起的消息从一个消费者转移到另一个消费者。XINFO获取流、消费组或消费者的详细信息。
pom
spring-boot 加 redis依赖简单项目测试用
parentartifactIdspring-boot-starter-parent/artifactIdgroupIdorg.springframework.boot/groupIdversion2.7.0/versionrelativePath/
/parent
propertiesjava.version1.8/java.versionproject.build.sourceEncodingUTF-8/project.build.sourceEncodingproject.reporting.outputEncodingUTF-8/project.reporting.outputEncodingspring-boot.version2.7.0/spring-boot.version
/propertiesdependenciesdependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-web/artifactId/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-data-redis/artifactId/dependencydependencygroupIdorg.apache.commons/groupIdartifactIdcommons-pool2/artifactId/dependency
/dependenciesdependencyManagementdependenciesdependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-dependencies/artifactIdversion${spring-boot.version}/versiontypepom/typescopeimport/scope/dependency/dependencies
/dependencyManagement配置
server:port: 8888servlet:context-path: /
spring:redis:msg:listener: falsehost: 127.0.0.1port: 6379database: 1client-type: lettucelettuce:pool:max-active: 8代码结构
引入spring boot的 StringRedisTemplate private static StringRedisTemplate redisTemplate;Autowiredpublic void setRedisTemplate(StringRedisTemplate redisTemplate) {RedisStreamConfig.redisTemplate redisTemplate;}声明变量。redis 的 key键名称消费者 组名消费者 客户端名。
final static String STREAM tsp, GROUP tsp-g-1, CONSUMER tsp-c-1;生产者-发送消息
这块比较简单直接调用就行
/** 生产者-发送消息 */public static void push(String msg){// 创建消息记录, 以及指定streamStringRecord record StreamRecords.string(Collections.singletonMap(data, msg)).withStreamKey(STREAM);// 将消息添加至消息队列中 XADD stream [MAXLEN len] id field value [field value ...]redisTemplate.opsForStream().add(record);log.info(redis-消息队列-stream, {} ,send msg: {}, STREAM,msg);}消费者
消费者-监听类实际的业务逻辑处理内容。消息队列 创建 会有测试消息需要跳过。实际业务逻辑需要考虑挂起消息 重新消费情况解决 死信问题的简单方法
/** 消费者 监听 */public static class TspStreamListener implements StreamListenerString, ObjectRecordString,String {Overridepublic void onMessage(ObjectRecordString, String message) {RecordId messageId message.getId();// 消息的key和valueString string message.getValue();if(T.equals(string)){log.info(消费者-监听测试消息-ack. msgId{}, stream{}, body{}, messageId, message.getStream(), string);redisTemplate.opsForStream().acknowledge(GROUP, message);return;}log.info(消费者-监听get msg. msgId{}, stream{}, body{}, messageId, message.getStream(), string);//业务逻辑,需要考虑 被挂起的消息 重新消费情况try {log.info(消费者-监听睡眠10s,模拟耗时逻辑-start);TimeUnit.SECONDS.sleep(10);log.info(消费者-监听睡眠10s,模拟耗时逻辑-end);} catch (InterruptedException e) {e.printStackTrace();}log.info(消费者-监听手动确认消息);redisTemplate.opsForStream().acknowledge(GROUP, message);log.info(消费者-监听end);}}消费者-注册关闭自动ack重连自动消费上次处理已经在pending队列里面了的下一个消息拉取消息超时时间50s每批数量1使用默认线程池传递数据类型 String异常处理 打印错误日志。最后 start 启动消费。 BeanConditionalOnProperty(name spring.redis.msg.listener,havingValue true)public StreamMessageListenerContainerString, ObjectRecordString, String tspConsumerListener(RedisConnectionFactory factory){//spring-data-redis 2.3.1.RELEASE及更高版本中createGroup如果不存在则会自动使用创建流return streamContainer(StreamOffset.create(STREAM, ReadOffset.lastConsumed()),Consumer.from(GROUP,CONSUMER),factory,new TspStreamListener());}private StreamMessageListenerContainerString, ObjectRecordString, String streamContainer(StreamOffsetString offset,Consumer consumer, RedisConnectionFactory factory, StreamListenerString, ObjectRecordString, String listener) {// pollTimeout 拉取消息超时时间,targetType 传递的数据类型, executor 线程池StreamMessageListenerContainerString, ObjectRecordString, String container StreamMessageListenerContainer.create(factory, StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder().pollTimeout(Duration.ofSeconds(50)).batchSize(1).targetType(String.class).build());//指定消费最新的消息try {//创建消费者,接收上次处理未ACK消费的消息,指定消费者对象,autoAcknowledge 关闭自动ack确认container.register(StreamMessageListenerContainer.StreamReadRequest.builder(offset).errorHandler((error) - log.error(error.getMessage())).cancelOnError(e - false).consumer(consumer).autoAcknowledge(false).build(), listener);} catch (Exception e) {log.error(e.getMessage());}container.start();return container;}预处理
类加载完毕执行方法。预处理操作可以使用 spring 启动后处理类实现。本demo就先这样了。 本方法 是为了 解决项目启动报错无消息队列、无消费组和程序重启死信问题(꒦_꒦) PostConstructprivate void start(){}如果redis 没有 对应key 的 stream 消息队列消费者启动后会频繁报错。如下代码会在 项目启动 发送测试消息创建 消息队列并限制 消息队列 长度限制内存消耗。 ps我好菜呀不知道spring是否允许消费者 自动创建 消息队列o(╥﹏╥)o // 生产者 创建 消息队列防止启动 消费者 报错if(!(Boolean.TRUE.equals(redisTemplate.hasKey(STREAM)))){log.info(没有key,测试发送(创建key));push(T);//限制 消息队列 容量 XTRIM stream MAXLEN lenredisTemplate.opsForStream().trim(STREAM,1000L);}判断消息队列是否存在消费组。没有创建消费组
// 创建 消息队列-消费者if (redisTemplate.opsForStream().groups(STREAM).isEmpty()) {log.info(redis-消息队列-stream,createGroup,{} {},STREAM,GROUP);//XGROUP CREATE stream group [id|$|0] [MKSTREAM],使用 $ 表示仅消费新消息或者使用 0 表示消费流中的所有消息redisTemplate.opsForStream().createGroup(STREAM,GROUP);}死信处理代码
private void handlerPending(String key,String group){//判断是否存在挂起的消息 XPENDING stream group [start stop count] [consumer]PendingMessagesSummary pending redisTemplate.opsForStream().pending(key, group);long size;if(pending null || (size pending.getTotalPendingMessages()) 0 ) {log.debug(redis-消息队列,{},{},暂无挂起消息pending, key, group);return;}//---------------------String minId pending.minMessageId(),maxId pending.maxMessageId(),id;// 从挂起消息开始处理 [1],minId : 1706178903044-0,maxId : 1706178903044-0log.info(redis-消息队列,{},{},pending-挂起消息[{}],minId : {},maxId : {},key,group,size,minId,maxId);// - 4 ---------------------获取挂起所有信息 streamIdPendingMessages msgIds redisTemplate.opsForStream().pending(key, group, Range.closed(-, ), size);log.info(redis-消息队列,{},{},pending-挂起消息,{},key,group,msgIds);ListMapRecordString, Object, Object list;// ---------------------循环处理可以考虑 异步执行for (PendingMessage msgId : msgIds) {id msgId.getId().getValue();//PendingMessage{id1706178903044-0, consumertsp-g-1:tsp-c-1, elapsedTimeSinceLastDeliveryMS66775, totalDeliveryCount1}log.info(redis-消息队列,{},{},pending-挂起消息,{},key,group,msgId);//XRANGE key start end [COUNT count]list redisTemplate.opsForStream().range(key, Range.just(msgId.getIdAsString()));log.info(redis-消息队列,{},{},range-挂起消息 {},{},key,group,id,list);if(list null || list.isEmpty()){continue;}// 开始 结束 id相同只返回 一个消息结果MapRecordString, Object, Object record list.get(0);//MapBackedRecord{recordId1706233950802-0, kvMap{data401}}if(1706233940011-0.equals(id)){MapRecordString, Object, Object copy record.withId(RecordId.autoGenerate());log.info(redis-消息队列,{},{},range-挂起消息0addcopy {},{},key,group,copy.getStream(),copy);//XADD stream-name id field value [field value]RecordId add redisTemplate.opsForStream().add(copy);log.info(redis-消息队列,{},{},range-挂起消息0addcopy {},{},key,group,add,copy);log.info(redis-消息队列,{},{},range-挂起消息0ack^^^^^old {},{},key,group,record.getStream(),record);//XACK stream group id [id id ...]Long siz redisTemplate.opsForStream().acknowledge(GROUP, record);log.info(redis-消息队列,{},{},range-挂起消息0ack^^^^^old [{}],{},key,group,siz,record);log.info(redis-消息队列,{},{},range-挂起消息0del-----old {},{},key,group,record.getStream(),record);//XDEL key ID [ID ...]siz redisTemplate.opsForStream().delete(record);log.info(redis-消息队列,{},{},range-挂起消息0del-----old [{}],{},key,group,siz,record);}else{log.info(redis-消息队列,{},{},range-挂起消息0 {},{},{},key,group,record.getStream(),id,record);}}log.info(redis-消息队列,{},{},pending-挂起消息-end,key,group);}汇总
代码
package demo;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.autoconfigure.data.redis.RedisAutoConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.context.annotation.Lazy;
import org.springframework.data.domain.Range;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.*;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;import javax.annotation.PostConstruct;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;/*** author z.y.l* version v1.0* date 2024/1/25*/
Configuration
Import (RedisAutoConfiguration.class)
public class RedisStreamConfig {private static final Logger log LoggerFactory.getLogger(RedisStreamConfig.class);final static String STREAM tsp, GROUP tsp-g-1, CONSUMER tsp-c-1;private static StringRedisTemplate redisTemplate;Autowiredpublic void setRedisTemplate(StringRedisTemplate redisTemplate) {RedisStreamConfig.redisTemplate redisTemplate;}/** 生产者-发送消息 */public static void push(String msg){// 创建消息记录, 以及指定streamStringRecord record StreamRecords.string(Collections.singletonMap(data, msg)).withStreamKey(STREAM);// 将消息添加至消息队列中 XADD stream [MAXLEN len] id field value [field value ...]redisTemplate.opsForStream().add(record);log.info(redis-消息队列-stream, {} ,send msg: {}, STREAM,msg);}/** 项目启动 预处理无消息队列消费者-启动报错、限制容量、手动创建消费组、处理异常挂起的消息 */PostConstructprivate void start(){// 生产者 创建 消息队列防止启动 消费者 报错if(!(Boolean.TRUE.equals(redisTemplate.hasKey(STREAM)))){log.info(没有key,测试发送(创建key));push(T);//限制 消息队列 容量 XTRIM stream MAXLEN lenredisTemplate.opsForStream().trim(STREAM,1000L);}// 创建 消息队列-消费者if (redisTemplate.opsForStream().groups(STREAM).isEmpty()) {log.info(redis-消息队列-stream,createGroup,{} {},STREAM,GROUP);//XGROUP CREATE stream group [id|$|0] [MKSTREAM],使用 $ 表示仅消费新消息或者使用 0 表示消费流中的所有消息redisTemplate.opsForStream().createGroup(STREAM,GROUP);}else{// 处理 挂起消息也可以 使用调度 定时处理// 当前 挂起消息处理方式复制新消息-ack旧消息-删除旧消息// 也可以使用 转移 消费组handlerPending(STREAM,GROUP);}}private void handlerPending(String key,String group){//判断是否存在挂起的消息 XPENDING stream group [start stop count] [consumer]PendingMessagesSummary pending redisTemplate.opsForStream().pending(key, group);long size;if(pending null || (size pending.getTotalPendingMessages()) 0 ) {log.debug(redis-消息队列,{},{},暂无挂起消息pending, key, group);return;}String minId pending.minMessageId(),maxId pending.maxMessageId(),id;// 从挂起消息开始处理 [1],minId : 1706178903044-0,maxId : 1706178903044-0log.info(redis-消息队列,{},{},pending-挂起消息[{}],minId : {},maxId : {},key,group,size,minId,maxId);// - 4 所有PendingMessages msgIds redisTemplate.opsForStream().pending(key, group, Range.closed(-, ), size);log.info(redis-消息队列,{},{},pending-挂起消息,{},key,group,msgIds);ListMapRecordString, Object, Object list;for (PendingMessage msgId : msgIds) {id msgId.getId().getValue();//PendingMessage{id1706178903044-0, consumertsp-g-1:tsp-c-1, elapsedTimeSinceLastDeliveryMS66775, totalDeliveryCount1}log.info(redis-消息队列,{},{},pending-挂起消息,{},key,group,msgId);//XRANGE key start end [COUNT count]list redisTemplate.opsForStream().range(key, Range.just(msgId.getIdAsString()));log.info(redis-消息队列,{},{},range-挂起消息 {},{},key,group,id,list);if(list null || list.isEmpty()){continue;}// 开始 结束 id相同只返回 一个消息结果MapRecordString, Object, Object record list.get(0);//MapBackedRecord{recordId1706233950802-0, kvMap{data401}}if(1706233940011-0.equals(id)){MapRecordString, Object, Object copy record.withId(RecordId.autoGenerate());log.info(redis-消息队列,{},{},range-挂起消息0addcopy {},{},key,group,copy.getStream(),copy);//XADD stream-name id field value [field value]RecordId add redisTemplate.opsForStream().add(copy);log.info(redis-消息队列,{},{},range-挂起消息0addcopy {},{},key,group,add,copy);log.info(redis-消息队列,{},{},range-挂起消息0ack^^^^^old {},{},key,group,record.getStream(),record);//XACK stream group id [id id ...]Long siz redisTemplate.opsForStream().acknowledge(GROUP, record);log.info(redis-消息队列,{},{},range-挂起消息0ack^^^^^old [{}],{},key,group,siz,record);log.info(redis-消息队列,{},{},range-挂起消息0del-----old {},{},key,group,record.getStream(),record);//XDEL key ID [ID ...]siz redisTemplate.opsForStream().delete(record);log.info(redis-消息队列,{},{},range-挂起消息0del-----old [{}],{},key,group,siz,record);}else{log.info(redis-消息队列,{},{},range-挂起消息0 {},{},{},key,group,record.getStream(),id,record);}}log.info(redis-消息队列,{},{},pending-挂起消息-end,key,group);}BeanConditionalOnProperty(name spring.redis.msg.listener,havingValue true)public StreamMessageListenerContainerString, ObjectRecordString, String tspConsumerListener(RedisConnectionFactory factory){//spring-data-redis 2.3.1.RELEASE及更高版本中createGroup如果不存在则会自动使用创建流return streamContainer(StreamOffset.create(STREAM, ReadOffset.lastConsumed()),Consumer.from(GROUP,CONSUMER),factory,new TspStreamListener());}private StreamMessageListenerContainerString, ObjectRecordString, String streamContainer(StreamOffsetString offset,Consumer consumer, RedisConnectionFactory factory, StreamListenerString, ObjectRecordString, String listener) {// pollTimeout 拉取消息超时时间,targetType 传递的数据类型, executor 线程池StreamMessageListenerContainerString, ObjectRecordString, String container StreamMessageListenerContainer.create(factory, StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder().pollTimeout(Duration.ofSeconds(50)).batchSize(1).targetType(String.class).build());//指定消费最新的消息try {//创建消费者,接收上次处理未ACK消费的消息,指定消费者对象,autoAcknowledge 关闭自动ack确认container.register(StreamMessageListenerContainer.StreamReadRequest.builder(offset).errorHandler((error) - log.error(error.getMessage())).cancelOnError(e - false).consumer(consumer).autoAcknowledge(false).build(), listener);} catch (Exception e) {log.error(e.getMessage());}container.start();return container;}/** 消费者 监听 */public static class TspStreamListener implements StreamListenerString, ObjectRecordString,String {Overridepublic void onMessage(ObjectRecordString, String message) {RecordId messageId message.getId();// 消息的key和valueString string message.getValue();if(T.equals(string)){log.info(消费者-监听测试消息-ack. msgId{}, stream{}, body{}, messageId, message.getStream(), string);redisTemplate.opsForStream().acknowledge(GROUP, message);return;}log.info(消费者-监听get msg. msgId{}, stream{}, body{}, messageId, message.getStream(), string);//业务逻辑,需要考虑 被挂起的消息 重新消费情况try {log.info(消费者-监听睡眠10s,模拟耗时逻辑-start);TimeUnit.SECONDS.sleep(10);log.info(消费者-监听睡眠10s,模拟耗时逻辑-end);} catch (InterruptedException e) {e.printStackTrace();}log.info(消费者-监听手动确认消息);redisTemplate.opsForStream().acknowledge(GROUP, message);log.info(消费者-监听end);}}
}
redis 日志
2024-01-26 10:33:54.566 - [main] INFO o.s.b.w.s.c.ServletWebServerApplicationContext.prepareWebApplicationContext(292) - Root WebApplicationContext: initialization completed in 1162 ms
2024-01-26 10:33:58.388 - [main] INFO demo.RedisStreamConfig.handlerPending(85) - redis-消息队列,tsp,tsp-g-1,pending-挂起消息[3],minId : 1706178903044-0,maxId : 1706233950802-0
2024-01-26 10:33:58.396 - [main] INFO demo.RedisStreamConfig.handlerPending(88) - redis-消息队列,tsp,tsp-g-1,pending-挂起消息,PendingMessages{groupNametsp-g-1, range[--], pendingMessages[PendingMessage{id1706178903044-0, consumertsp-g-1:tsp-c-1, elapsedTimeSinceLastDeliveryMS57524430, totalDeliveryCount1}, PendingMessage{id1706233940011-0, consumertsp-g-1:tsp-c-1, elapsedTimeSinceLastDeliveryMS2477103, totalDeliveryCount1}, PendingMessage{id1706233950802-0, consumertsp-g-1:tsp-c-1, elapsedTimeSinceLastDeliveryMS1968963, totalDeliveryCount1}]}
2024-01-26 10:33:58.397 - [main] INFO demo.RedisStreamConfig.handlerPending(93) - redis-消息队列,tsp,tsp-g-1,pending-挂起消息,PendingMessage{id1706178903044-0, consumertsp-g-1:tsp-c-1, elapsedTimeSinceLastDeliveryMS57524430, totalDeliveryCount1}
2024-01-26 10:33:58.404 - [main] INFO demo.RedisStreamConfig.handlerPending(95) - redis-消息队列,tsp,tsp-g-1,range-挂起消息 1706178903044-0,[]
2024-01-26 10:33:58.405 - [main] INFO demo.RedisStreamConfig.handlerPending(93) - redis-消息队列,tsp,tsp-g-1,pending-挂起消息,PendingMessage{id1706233940011-0, consumertsp-g-1:tsp-c-1, elapsedTimeSinceLastDeliveryMS2477103, totalDeliveryCount1}
2024-01-26 10:33:58.409 - [main] INFO demo.RedisStreamConfig.handlerPending(95) - redis-消息队列,tsp,tsp-g-1,range-挂起消息 1706233940011-0,[MapBackedRecord{recordId1706233940011-0, kvMap{data303}}]
2024-01-26 10:33:58.409 - [main] INFO demo.RedisStreamConfig.handlerPending(104) - redis-消息队列,tsp,tsp-g-1,range-挂起消息0addcopy tsp,MapBackedRecord{recordId*, kvMap{data303}}
2024-01-26 10:33:58.413 - [main] INFO demo.RedisStreamConfig.handlerPending(106) - redis-消息队列,tsp,tsp-g-1,range-挂起消息0addcopy 1706236438413-0,MapBackedRecord{recordId*, kvMap{data303}}
2024-01-26 10:33:58.413 - [main] INFO demo.RedisStreamConfig.handlerPending(107) - redis-消息队列,tsp,tsp-g-1,range-挂起消息0ack^^^^^old tsp,MapBackedRecord{recordId1706233940011-0, kvMap{data303}}
2024-01-26 10:33:58.418 - [main] INFO demo.RedisStreamConfig.handlerPending(109) - redis-消息队列,tsp,tsp-g-1,range-挂起消息0ack^^^^^old [1],MapBackedRecord{recordId1706233940011-0, kvMap{data303}}
2024-01-26 10:33:58.419 - [main] INFO demo.RedisStreamConfig.handlerPending(110) - redis-消息队列,tsp,tsp-g-1,range-挂起消息0del-----old tsp,MapBackedRecord{recordId1706233940011-0, kvMap{data303}}
2024-01-26 10:33:58.420 - [main] INFO demo.RedisStreamConfig.handlerPending(112) - redis-消息队列,tsp,tsp-g-1,range-挂起消息0del-----old [1],MapBackedRecord{recordId1706233940011-0, kvMap{data303}}
2024-01-26 10:33:58.421 - [main] INFO demo.RedisStreamConfig.handlerPending(93) - redis-消息队列,tsp,tsp-g-1,pending-挂起消息,PendingMessage{id1706233950802-0, consumertsp-g-1:tsp-c-1, elapsedTimeSinceLastDeliveryMS1968963, totalDeliveryCount1}
2024-01-26 10:33:58.422 - [main] INFO demo.RedisStreamConfig.handlerPending(95) - redis-消息队列,tsp,tsp-g-1,range-挂起消息 1706233950802-0,[MapBackedRecord{recordId1706233950802-0, kvMap{data401}}]
2024-01-26 10:33:58.422 - [main] INFO demo.RedisStreamConfig.handlerPending(114) - redis-消息队列,tsp,tsp-g-1,range-挂起消息0 tsp,1706233950802-0,MapBackedRecord{recordId1706233950802-0, kvMap{data401}}
2024-01-26 10:33:58.422 - [main] INFO demo.RedisStreamConfig.handlerPending(117) - redis-消息队列,tsp,tsp-g-1,pending-挂起消息-end仅供参考-请结合实际情况使用