Article overview:
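Component introduction
Workflow
Text messages with a custom channel
Multi-topic text messages with custom channels
Tag filtering and reading headers
Targeted and global exception handling
Ordered messages
    Globally ordered messages
    Partially ordered messages
Transactional messages

When choosing a message queue, weigh the actual requirements: Kafka may be the ideal choice for big-data scenarios, RocketMQ may be the more mature choice for transactional or delayed messages, and RabbitMQ may be a good choice for other conventional high-performance business scenarios. This article studies RocketMQ in order to understand and use its transactional and delayed-message features.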
Versions used in this article:
Spring-Cloud-Stream: 2.2.10-C1
Spring-Boot: 2.3.12.RELEASE
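1. Component Introduction

Producer: the message producer. Supports distributed cluster deployment and can generate and deliver messages quickly.
Consumer: the message consumer. Supports distributed cluster deployment, both Push and Pull consumption modes, and both clustering and broadcasting consumption.
NameServer: the topic registry. Supports dynamic registration and discovery of Brokers.
Broker: responsible for storing, delivering, and querying messages, and for the high availability of the service.

Other terms:

Topic: a subject that classifies messages.
Message: the message body.
MessageID: a globally unique identifier, generated automatically by the system.
Tag: a secondary message type that distinguishes message categories within one Topic.
Producer instance: one object instance of a producer.
Consumer instance: one object instance of a consumer.
Group: a class of producers or consumers.
Group ID: the identifier of a group.
Queue: each Topic corresponds to one or more queues that store its messages.
Exactly-Once semantics: a message can be consumed by a consumer only once; even with retries it is not consumed multiple times. RocketMQ's Exactly-Once delivery semantics apply to the flow "receive message -> process message -> persist the result to a database" and ensure that the final processing result of each message is written to your database once and only once, which makes message consumption idempotent.
Clustering consumption: consumers under the same groupId share the messages evenly; a message is delivered to only one of the consumers.
Broadcasting consumption: every consumer under the same groupId consumes each message; a message is delivered to all of the consumers.
Scheduled message: a message delivered to the consumer at a specified time.
Delayed message: a message delivered to the consumer after a delay.
Transactional message: provides eventual consistency for distributed transactions.
Ordered message: a message that is published and consumed in order.
Globally ordered message: a special case of partition-ordered messages in which publishing and consumption strictly follow first-in, first-out.
Partition-ordered message: a topic has several partitions, selected by a shardingKey; first-in, first-out holds within each partition, and using several partitions increases concurrency and improves performance.
Message backlog: consumers fail to consume all the data within a short time.
Message filtering: consumers can filter messages by Tag.
Message trace: the complete chain assembled from the time, location, and other data of every related node a message passes through on its way from producer to consumer.
Resetting the consumer offset: within the retention period of persisted messages, the consumption progress is set again; once the reset succeeds, consumers receive the messages that producers sent to the server after the chosen point in time.
Dead-letter queue: handles messages that cannot be consumed normally. When a message fails on first consumption it is retried automatically; if it still fails once the retry limit is reached, it is placed in the Dead Letter Queue, a special queue that stores dead-letter messages.
Message routing: synchronization of messages between different regions.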
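The dead-letter definition above can be made concrete with a small in-memory sketch. It is not RocketMQ code: the class `DeadLetterSketch`, its `maxAttempts` parameter, and the `deliver` method are hypothetical names introduced only to illustrate "retry automatically, then park the message in a dead-letter queue once the limit is reached".

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

/** In-memory illustration of "retry, then dead-letter"; not RocketMQ's implementation. */
final class DeadLetterSketch {
    /** Messages that still failed after the last allowed attempt. */
    final List<String> deadLetterQueue = new ArrayList<>();
    /** Hypothetical retry limit, chosen by the caller. */
    private final int maxAttempts;

    DeadLetterSketch(int maxAttempts) {
        this.maxAttempts = maxAttempts;
    }

    /**
     * Calls the handler up to maxAttempts times; the handler returns true on success.
     * Returns the number of attempts that were made.
     */
    int deliver(String message, Predicate<String> handler) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (handler.test(message)) {
                return attempt;
            }
        }
        // Every attempt failed: move the message to the dead-letter queue.
        deadLetterQueue.add(message);
        return maxAttempts;
    }

    public static void main(String[] args) {
        DeadLetterSketch sketch = new DeadLetterSketch(3);
        sketch.deliver("ok", m -> true);      // succeeds on the first attempt
        sketch.deliver("poison", m -> false); // fails three times
        System.out.println(sketch.deadLetterQueue); // prints [poison]
    }
}
```

A real broker also spaces the retries out over time and keeps the dead-letter queue per consumer group; the sketch leaves both out to keep the control flow visible.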
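2. Workflow

1. Start the NameServer.
2. Start the Broker.
3. Broker, Producer, and Consumer register with the NameServer and obtain Topic and other information from one another.
4. Create the Topic before sending messages.
5. The Producer starts and opens a long-lived connection to the NameServer. It obtains Broker information from the NameServer, opens a long-lived connection to the Broker, and sends messages to the Broker.
6. The Consumer starts and opens a long-lived connection to the NameServer. It obtains Broker information from the NameServer, opens a long-lived connection to the Broker, and fetches messages from the Broker.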
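The registration and lookup steps above can be sketched with a toy registry. This is only an illustration of the idea, assuming a single in-process map; `NameServerSketch`, `register`, and `lookup` are made-up names, not RocketMQ APIs, and the broker address is just the one that appears in the logs later in this article.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy registry standing in for the NameServer; all names are illustrative. */
final class NameServerSketch {
    // topic -> addresses of the brokers that serve it
    private final Map<String, List<String>> routes = new HashMap<>();

    /** A broker registers itself for a topic. */
    void register(String topic, String brokerAddress) {
        routes.computeIfAbsent(topic, t -> new ArrayList<>()).add(brokerAddress);
    }

    /** Producers and consumers look up the brokers of a topic before connecting. */
    List<String> lookup(String topic) {
        return routes.getOrDefault(topic, Collections.emptyList());
    }

    public static void main(String[] args) {
        NameServerSketch nameServer = new NameServerSketch();
        nameServer.register("text-topic", "127.0.0.1:10911");
        System.out.println(nameServer.lookup("text-topic")); // prints [127.0.0.1:10911]
    }
}
```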
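3. Text Messages with a Custom Channel

Configuration (application.yml):

server:
  port: 8090
spring:
  cloud:
    stream:
      rocketmq:
        binder:
          name-server: 127.0.0.1:9876
          group: rmq-grop
      bindings:
        textoutput:
          destination: text-topic
          contentType: text/plain
          group: text-group
        textinput:
          destination: text-topic
          contentType: text/plain
          group: text-group
# log level
logging:
  level:
    com.alibaba.cloud.stream.binder.rocketmq: info

Consumer side:

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.SubscribableChannel;

public interface SelfSink {
    public static String TEXT_INPUT = "textinput";

    // Declares the input channel bound to bindings.textinput
    @Input(TEXT_INPUT)
    SubscribableChannel textInput();
}

@Configuration
@EnableBinding(SelfSink.class)
public class RmqConsumerConfig {
}

@Service
public class ReceiveService {
    @StreamListener(value = SelfSink.TEXT_INPUT)
    public void textInput(String message) {
        System.out.println("receive content:" + message);
    }
}

Producer side:

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;

@Configuration
@EnableBinding(SelfSource.class)
public class RmqProducerConfig {
}

public interface SelfSource {
    public static String TEXT_OUTPUT = "textoutput";

    // Declares the output channel bound to bindings.textoutput
    @Output(TEXT_OUTPUT)
    MessageChannel textOutput();
}

@Service
public class SendService {
    @Autowired
    private SelfSource source;

    public void sendText(String msg) {
        Message<String> message = MessageBuilder.withPayload(msg).build();
        source.textOutput().send(message);
    }
}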
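How the pieces of this section relate (channel name, destination topic, listener) can be sketched without Spring. The class below is a plain in-memory stand-in, not the binder: `BindingSketch`, `bind`, `listen`, and `send` are hypothetical names, and delivery happens synchronously in the same process.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** In-memory stand-in for a binder: channels are bound to topics by name. */
final class BindingSketch {
    private final Map<String, String> channelToTopic = new HashMap<>();
    private final Map<String, List<Consumer<String>>> listenersByTopic = new HashMap<>();

    /** Mirrors "bindings.<channel>.destination: <topic>" in the YAML. */
    void bind(String channel, String topic) {
        channelToTopic.put(channel, topic);
    }

    /** Plays the role of a listener method attached to an input channel. */
    void listen(String inputChannel, Consumer<String> listener) {
        String topic = channelToTopic.get(inputChannel);
        listenersByTopic.computeIfAbsent(topic, t -> new ArrayList<>()).add(listener);
    }

    /** Plays the role of sending on an output channel. */
    void send(String outputChannel, String payload) {
        String topic = channelToTopic.get(outputChannel);
        List<Consumer<String>> listeners =
                listenersByTopic.getOrDefault(topic, Collections.emptyList());
        for (Consumer<String> listener : listeners) {
            listener.accept(payload);
        }
    }

    public static void main(String[] args) {
        BindingSketch binder = new BindingSketch();
        binder.bind("textoutput", "text-topic");
        binder.bind("textinput", "text-topic");
        binder.listen("textinput", m -> System.out.println("receive content:" + m));
        binder.send("textoutput", "hello"); // prints receive content:hello
    }
}
```

The output and input channels never reference each other directly; they meet only through the shared destination, which is why the two bindings in the YAML carry the same `destination` value.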
4. Multi-Topic Text Messages with Custom Channels

Configuration (application.yml):

server:
  port: 8090
spring:
  cloud:
    stream:
      rocketmq:
        binder:
          name-server: 127.0.0.1:9876
          group: rmq-grop
      bindings:
        textoutput:
          destination: text-topic
          contentType: text/plain
          group: text-group
        textoutput2:
          destination: text-topic2
          contentType: text/plain
          group: text-group2
        textinput:
          destination: text-topic
          contentType: text/plain
          group: text-group
        textinput2:
          destination: text-topic2
          contentType: text/plain
          group: text-group2
# log level
logging:
  level:
    com.alibaba.cloud.stream.binder.rocketmq: info

Producer side:

public interface SelfSource {
    public static String TEXT_OUTPUT = "textoutput";
    public static String TEXT_OUTPUT2 = "textoutput2";

    @Output(TEXT_OUTPUT)
    MessageChannel textOutput();

    @Output(TEXT_OUTPUT2)
    MessageChannel textOutput2();
}

@Service
public class SendService {
    @Autowired
    private SelfSource source;

    // Sends to text-topic
    public void sendText(String msg) {
        Message<String> message = MessageBuilder.withPayload(msg).build();
        source.textOutput().send(message);
    }

    // Sends to text-topic2
    public void sendText2(String msg) {
        Message<String> message = MessageBuilder.withPayload(msg).build();
        source.textOutput2().send(message);
    }
}

Consumer side:

public interface SelfSink {
    public static String TEXT_INPUT = "textinput";
    public static String TEXT_INPUT2 = "textinput2";

    @Input(TEXT_INPUT)
    SubscribableChannel textInput();

    @Input(TEXT_INPUT2)
    SubscribableChannel textInput2();
}

@Service
public class ReceiveService {
    @StreamListener(value = SelfSink.TEXT_INPUT)
    public void textInput(String message) {
        System.out.println("receive group1 content:" + message);
    }

    @StreamListener(value = SelfSink.TEXT_INPUT2)
    public void textInput2(String message) {
        System.out.println("receive group2 content:" + message);
    }
}
5. Tag Filtering and Reading Headers

Configuration (application.yml):

server:
  port: 8090
spring:
  cloud:
    stream:
      rocketmq:
        binder:
          name-server: 127.0.0.1:9876
          group: rmq-group
        bindings:
          textinput3:
            consumer:
              subscription: tagfilter
      bindings:
        textoutput2:
          destination: text-topic2
          contentType: text/plain
          group: text-group2
        textinput3:
          destination: text-topic2
          contentType: text/plain
          group: text-group2
# log level
logging:
  level:
    com.alibaba.cloud.stream.binder.rocketmq: info

Producer side:

public interface SelfSource {
    // The channels from section 4 are unchanged and omitted here.
    public static String TEXT_OUTPUT2 = "textoutput2";

    @Output(TEXT_OUTPUT2)
    MessageChannel textOutput2();
}

@Service
public class SendService {
    @Autowired
    private SelfSource source;

    // sendText and sendText2 are the same as in section 4.

    // Sets the RocketMQ tag and the content type as message headers.
    public void sendWithTag(String msg, String tag) {
        Message<String> message = MessageBuilder.withPayload(msg)
                .setHeader(MessageConst.PROPERTY_TAGS, tag)
                .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
                .build();
        source.textOutput2().send(message);
    }
}

Consumer side:

public interface SelfSink {
    public static String TEXT_INPUT = "textinput";
    public static String TEXT_INPUT2 = "textinput2";
    public static String TEXT_INPUT3 = "textinput3";

    @Input(TEXT_INPUT3)
    SubscribableChannel textInput3();
}

@Service
public class ReceiveService {
    // The condition only lets through messages whose ROCKET_TAGS header equals "tagfilter".
    // @Headers injects all headers; @Header injects a single one.
    @StreamListener(value = SelfSink.TEXT_INPUT3, condition = "headers['ROCKET_TAGS'] == 'tagfilter'")
    public void textInput3WithTag(String message, @Headers Map headers, @Header(name = "ROCKET_TAGS") String name) {
        System.out.println("receive group2 tagfilter content:" + message + ", headers=" + headers + ", name=" + name);
    }
}

Test output:

send tagfilter group2 index:0
send tagfilter group2 index:1
send tagfilter group2 index:2
send tagfilter group2 index:3
receive group2 tagfilter content:group2-index:0, headers{ROCKET_TAGStagfilter, ROCKET_MQ_BORN_TIMESTAMP1698235938769, ROCKET_MQ_FLAG0, ROCKET_MQ_MESSAGE_IDC6120001C9F42C13DA157FEE87CD0000, ROCKET_MQ_TOPICtext-topic2, ROCKET_MQ_BORN_HOST172.17.0.1, idc13ac4ee-f498-a558-c6b7-30ed7563220f, ROCKET_MQ_SYS_FLAG0, contentTypeapplication/json, ROCKET_MQ_QUEUE_ID3, timestamp1698235938796}, nametagfilter
receive group2 tagfilter content:group2-index:1, headers{ROCKET_TAGStagfilter, ROCKET_MQ_BORN_TIMESTAMP1698235938793, ROCKET_MQ_FLAG0, ROCKET_MQ_MESSAGE_IDC6120001C9F42C13DA157FEE87E90002, ROCKET_MQ_TOPICtext-topic2, ROCKET_MQ_BORN_HOST172.17.0.1, iddf97a997-8437-c3de-7802-80313be691ed, ROCKET_MQ_SYS_FLAG0, contentTypeapplication/json, ROCKET_MQ_QUEUE_ID0, timestamp1698235938818}, nametagfilter
send tagfilter group2 index:4
receive group2 tagfilter content:group2-index:4, headers{ROCKET_TAGStagfilter, ROCKET_MQ_BORN_TIMESTAMP1698235938816, ROCKET_MQ_FLAG0, ROCKET_MQ_MESSAGE_IDC6120001C9F42C13DA157FEE88000009, ROCKET_MQ_TOPICtext-topic2, ROCKET_MQ_BORN_HOST172.17.0.1, id020522c8-6c29-b4db-c3ca-9987bd108625, ROCKET_MQ_SYS_FLAG0, contentTypeapplication/json, ROCKET_MQ_QUEUE_ID3, timestamp1698235938825}, nametagfilter
receive group2 tagfilter content:group2-index:3, headers{ROCKET_TAGStagfilter, ROCKET_MQ_BORN_TIMESTAMP1698235938808, ROCKET_MQ_FLAG0, ROCKET_MQ_MESSAGE_IDC6120001C9F42C13DA157FEE87F80007, ROCKET_MQ_TOPICtext-topic2, ROCKET_MQ_BORN_HOST172.17.0.1, idaa7d5d49-4178-f621-38ed-70b77bbfffc3, ROCKET_MQ_SYS_FLAG0, contentTypeapplication/json, ROCKET_MQ_QUEUE_ID2, timestamp1698235939676}, nametagfilter
receive group2 tagfilter content:group2-index:2, headers{ROCKET_TAGStagfilter, ROCKET_MQ_BORN_TIMESTAMP1698235938801, ROCKET_MQ_FLAG0, ROCKET_MQ_MESSAGE_IDC6120001C9F42C13DA157FEE87F10005, ROCKET_MQ_TOPICtext-topic2, ROCKET_MQ_BORN_HOST172.17.0.1, id76d60399-c127-adab-e262-59d01f50baab, ROCKET_MQ_SYS_FLAG0, contentTypeapplication/json, ROCKET_MQ_QUEUE_ID1, timestamp1698235939677}, nametagfilter

6. Targeted and Global Exception Handling

Both listeners below throw on purpose. handleError listens on the error channel named <destination>.<group>.errors, so it is targeted at the text-topic / text-group binding; the errorChannel listener is the global handler.

    @StreamListener(value = SelfSink.TEXT_INPUT)
    public void textInput(String message) {
        System.out.println("receive group1 content:" + message);
        // "异常信息1" means "exception message 1"
        throw new IllegalStateException("异常信息1");
    }

    @StreamListener(value = SelfSink.TEXT_INPUT3, condition = "headers['ROCKET_TAGS'] == 'tagfilter'")
    public void textInput3WithTag(String message, @Headers Map headers, @Header(name = "ROCKET_TAGS") String name) {
        System.out.println("receive group2 tagfilter content:" + message + ", headers=" + headers + ", name=" + name);
        // "异常信息3" means "exception message 3"
        throw new IllegalArgumentException("异常信息3");
    }

    // Targeted handler: only errors from destination text-topic, group text-group
    @ServiceActivator(inputChannel = "text-topic.text-group.errors")
    public void handleError(ErrorMessage errorMessage) {
        Throwable throwable = errorMessage.getPayload();
        // "定向异常" means "targeted exception"
        System.out.println("定向异常" + throwable);
        Message<?> originalMessage = errorMessage.getOriginalMessage();
        // "空" means "empty"
        System.out.println(Objects.nonNull(originalMessage) ? originalMessage : "空");
    }

    // Global handler
    @StreamListener("errorChannel")
    public void error(Message<?> message) {
        ErrorMessage errorMessage = (ErrorMessage) message;
        // "其他异常" means "other exception"
        System.out.println("其他异常:" + errorMessage);
    }

Test output:

send group1 index:0
receive group1 content:group1-index:0
send tagfilter group2 index:0
receive group2 tagfilter content:group2-index:0, headers{ROCKET_TAGStagfilter, ROCKET_MQ_BORN_TIMESTAMP1698236994357, ROCKET_MQ_FLAG0, ROCKET_MQ_MESSAGE_IDC6120001D1122C13DA157FFEA3350003, ROCKET_MQ_TOPICtext-topic2, ROCKET_MQ_BORN_HOST172.17.0.1, idff93d975-c880-fbf0-0168-6b6e0fab9653, ROCKET_MQ_SYS_FLAG0, contentTypeapplication/json, ROCKET_MQ_QUEUE_ID0, timestamp1698236995233}, nametagfilter
receive group1 content:group1-index:0
receive group2 tagfilter content:group2-index:0, headers{ROCKET_TAGStagfilter, ROCKET_MQ_BORN_TIMESTAMP1698236994357, ROCKET_MQ_FLAG0, ROCKET_MQ_MESSAGE_IDC6120001D1122C13DA157FFEA3350003, ROCKET_MQ_TOPICtext-topic2, ROCKET_MQ_BORN_HOST172.17.0.1, idff93d975-c880-fbf0-0168-6b6e0fab9653, ROCKET_MQ_SYS_FLAG0, contentTypeapplication/json, ROCKET_MQ_QUEUE_ID0, timestamp1698236995233}, nametagfilter
receive group1 content:group1-index:0
INFO 53522 --- [ad_text-group_1] o.s.i.h.s.MessagingMethodInvokerHelper : Overriding default instance of MessageHandlerMethodFactory with provided one.
定向异常org.springframework.messaging.MessagingException: Exception thrown while invoking ReceiveService#textInput[1 args]; nested exception is java.lang.IllegalStateException: 异常信息1, failedMessageGenericMessage [payloadbyte[14], headers{ROCKET_MQ_BORN_TIMESTAMP1698236994326, ROCKET_MQ_FLAG0, ROCKET_MQ_MESSAGE_IDC6120001D1122C13DA157FFEA3130000, ROCKET_MQ_TOPICtext-topic, ROCKET_MQ_BORN_HOST172.17.0.1, id5672d0c2-04ae-6012-ae06-5cd84c2781fb, ROCKET_MQ_SYS_FLAG0, contentTypetext/plain, ROCKET_MQ_QUEUE_ID3, timestamp1698236994353}]
空
receive group2 tagfilter content:group2-index:0, headers{ROCKET_TAGStagfilter, ROCKET_MQ_BORN_TIMESTAMP1698236994357, ROCKET_MQ_FLAG0, ROCKET_MQ_MESSAGE_IDC6120001D1122C13DA157FFEA3350003, ROCKET_MQ_TOPICtext-topic2, ROCKET_MQ_BORN_HOST172.17.0.1, idff93d975-c880-fbf0-0168-6b6e0fab9653, ROCKET_MQ_SYS_FLAG0, contentTypeapplication/json, ROCKET_MQ_QUEUE_ID0, timestamp1698236995233}, nametagfilter
其他异常:ErrorMessage [payloadorg.springframework.messaging.MessagingException: Exception thrown while invoking ReceiveService#textInput3WithTag[3 args]; nested exception is java.lang.IllegalArgumentException: 异常信息3, failedMessageGenericMessage [payloadbyte[14], headers{ROCKET_TAGStagfilter, ROCKET_MQ_BORN_TIMESTAMP1698236994357, ROCKET_MQ_FLAG0, ROCKET_MQ_MESSAGE_IDC6120001D1122C13DA157FFEA3350003, ROCKET_MQ_TOPICtext-topic2, ROCKET_MQ_BORN_HOST172.17.0.1, idff93d975-c880-fbf0-0168-6b6e0fab9653, ROCKET_MQ_SYS_FLAG0, contentTypeapplication/json, ROCKET_MQ_QUEUE_ID0, timestamp1698236995233}], headers{id266dcc54-fb75-0a90-e456-917b8743ff5b, timestamp1698236998247}]
ERROR 53522 --- [d_text-group2_1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessagingException: Exception thrown while invoking ReceiveService#textInput3WithTag[3 args]; nested exception is java.lang.IllegalArgumentException: 异常信息3, failedMessageGenericMessage [payloadbyte[14], headers{ROCKET_TAGStagfilter, ROCKET_MQ_BORN_TIMESTAMP1698236994357, ROCKET_MQ_FLAG0, ROCKET_MQ_MESSAGE_IDC6120001D1122C13DA157FFEA3350003, ROCKET_MQ_TOPICtext-topic2, ROCKET_MQ_BORN_HOST172.17.0.1, idff93d975-c880-fbf0-0168-6b6e0fab9653, ROCKET_MQ_SYS_FLAG0, contentTypeapplication/json, ROCKET_MQ_QUEUE_ID0, timestamp1698236995233}]
    at org.springframework.cloud.stream.binding.StreamListenerMessageHandler.handleRequestMessage(StreamListenerMessageHandler.java:64)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:134)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:69)
    at org.springframework.cloud.stream.binding.DispatchingStreamListenerMessageHandler.handleRequestMessage(DispatchingStreamListenerMessageHandler.java:96)

Reading the payload of the failed message: this works once Spring Cloud Stream's built-in retry mechanism is turned off. The steps above use no special configuration, so by default a message that raises an exception is redelivered, and as a result System.out.println(Objects.nonNull(originalMessage) ? originalMessage : "空"); always prints 空 (empty): the original message cannot be obtained. After adjusting the configuration as follows:

server:
  port: 8090
spring:
  cloud:
    stream:
      rocketmq:
        binder:
          name-server: 127.0.0.1:9876
          group: rmq-group
        bindings:
          textinput3:
            consumer:
              subscription: tagfilter
      bindings:
        textoutput:
          destination: text-topic
          contentType: text/plain
          group: text-group
        textoutput2:
          destination: text-topic2
          contentType: text/plain
          group: text-group2
        textinput:
          destination: text-topic
          contentType: text/plain
          group: text-group
          consumer:
            maxAttempts: 1 # default is 3; 1 means no retry
        textinput2:
          destination: text-topic2
          contentType: text/plain
          group: text-group2
        textinput3:
          destination: text-topic2
          contentType: text/plain
          group: text-group2
# log level
logging:
  level:
    com.alibaba.cloud.stream.binder.rocketmq: info

the original message can be obtained:

    @ServiceActivator(inputChannel = "text-topic.text-group.errors")
    public void handleError(ErrorMessage errorMessage) {
        Throwable throwable = errorMessage.getPayload();
        System.out.println("定向异常" + throwable);
        Message<?> originalMessage = errorMessage.getOriginalMessage();
        // System.out.println(Objects.nonNull(originalMessage) ? "处理定向异常原始信息:" + originalMessage : "无");
        assert originalMessage != null;
        // The payload arrives as a byte array; "处理定向异常原始信息" means
        // "original message of the targeted exception"
        System.out.println("处理定向异常原始信息" + new String((byte[]) originalMessage.getPayload()));
    }

定向异常org.springframework.messaging.MessagingException: Exception thrown while invoking ReceiveService#textInput[1 args]; nested exception is java.lang.IllegalStateException: 异常信息1, failedMessageGenericMessage [payloadbyte[14], headers{ROCKET_MQ_BORN_TIMESTAMP1698237547339, ROCKET_MQ_FLAG0, ROCKET_MQ_MESSAGE_IDC6120001D46F2C13DA15800713490000, ROCKET_MQ_TOPICtext-topic, ROCKET_MQ_BORN_HOST172.17.0.1, id1d05982a-38f7-5208-076d-7b84a2555bf1, ROCKET_MQ_SYS_FLAG0, contentTypetext/plain, ROCKET_MQ_QUEUE_ID0, timestamp1698237547362}]
处理定向异常原始信息group1-index:0

7. Ordered Messages
Globally ordered messages

Configuration (application.yml):

server:
  port: 8090
spring:
  cloud:
    stream:
      rocketmq:
        binder:
          name-server: 127.0.0.1:9876
          group: rmq-group
        bindings:
          pojooutput:
            producer:
              sendType: Sync
              group: pojo-group
          pojoinput:
            consumer:
              orderly: true # how this parameter is written depends on the version you use; see the official documentation
      bindings:
        pojooutput:
          destination: pojo-topic
          contentType: application/json
          group: pojo-group
        pojoinput:
          destination: pojo-topic
          contentType: application/json
          group: pojo-group
# log level
logging:
  level:
    com.alibaba.cloud.stream.binder.rocketmq: info

Message class and the runner that sends five messages at startup:

public class MessageDto {
    private static final long serialVersionUID = 1L;
    private String index;
    private String title;
    private String content;

    public String getTitle() { return title; }
    public void setTitle(String title) { this.title = title; }
    public String getContent() { return content; }
    public void setContent(String content) { this.content = content; }
    public String getIndex() { return index; }
    public void setIndex(String index) { this.index = index; }

    @Override
    public String toString() {
        return "MessageDto{" +
                "index='" + index + '\'' +
                ", title='" + title + '\'' +
                ", content='" + content + '\'' +
                '}';
    }
}

@Component
public class ProducerRunner implements CommandLineRunner {
    @Autowired
    private SendService sendService;

    @Override
    public void run(String... args) throws Exception {
        for (int i = 0; i < 5; i++) {
            MessageDto messageDto = new MessageDto();
            messageDto.setIndex("num:" + i);
            messageDto.setTitle("title");
            messageDto.setContent("content");
            sendService.sendPojoOrderly(messageDto);
        }
    }
}

Producer side:

public interface SelfSource {
    public static String POJO_OUTPUT = "pojooutput";

    @Output(POJO_OUTPUT)
    MessageChannel pojoOutput();
}

@Service
public class SendService {
    @Autowired
    private SelfSource source;

    public void sendPojoOrderly(MessageDto messageDto) {
        Message<MessageDto> message = MessageBuilder.withPayload(messageDto)
                .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
                .build();
        System.out.println(message);
        source.pojoOutput().send(message);
    }
}

Consumer side:

public interface SelfSink {
    public static String POJO_INPUT = "pojoinput";

    @Input(POJO_INPUT)
    SubscribableChannel pojoInput();
}

@Service
public class ReceiveService {
    @StreamListener(value = SelfSink.POJO_INPUT)
    public void pojoInout(@Payload MessageDto messageDto) {
        // "线程编号" means "thread id"; "消息内容" means "message content"
        System.out.println("[onMessage][线程编号:{} 消息内容{}]" + Thread.currentThread().getId() + ", " + messageDto.toString());
    }
}

Test result: the messages are sent normally and consumed in order (without orderly, consumption is out of order):
GenericMessage [payloadMessageDto{indexnum:0, titletitle, contentcontent}, headers{contentTypeapplication/json, id00641aae-71d2-3a01-35f4-b3ec42886e9c, timestamp1698745880467}]
GenericMessage [payloadMessageDto{indexnum:1, titletitle, contentcontent}, headers{contentTypeapplication/json, id4d4a8cc2-d861-cf76-f0f5-33fdc06b2942, timestamp1698745880521}]
GenericMessage [payloadMessageDto{indexnum:2, titletitle, contentcontent}, headers{contentTypeapplication/json, id009ebc60-de06-adda-c44e-82faaae155c7, timestamp1698745880525}]
GenericMessage [payloadMessageDto{indexnum:3, titletitle, contentcontent}, headers{contentTypeapplication/json, idcad3af6c-df76-bc54-49c5-61cd27ef5b08, timestamp1698745880528}]
GenericMessage [payloadMessageDto{indexnum:4, titletitle, contentcontent}, headers{contentTypeapplication/json, id9beea414-7a78-a2e6-58f8-27729a2c7300, timestamp1698745880531}]
[onMessage][线程编号:{} 消息内容{}]119, MessageDto{indexnum:2, titletitle, contentcontent}
[onMessage][线程编号:{} 消息内容{}]119, MessageDto{indexnum:0, titletitle, contentcontent}
[onMessage][线程编号:{} 消息内容{}]119, MessageDto{indexnum:4, titletitle, contentcontent}
[onMessage][线程编号:{} 消息内容{}]119, MessageDto{indexnum:1, titletitle, contentcontent}
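[onMessage][线程编号:{} 消息内容{}]119, MessageDto{indexnum:3, titletitle, contentcontent}

With ordered consumption configured, keep in mind that one RocketMQ topic has several queues internally (4 by default) and that ordering holds within each queue. The log therefore still looks shuffled on the surface, yet the messages are in fact ordered within each queue. Strict global ordering would require the topic to use a single queue. To observe strict ordering according to some criterion, use partition ordering.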
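Why the log looks shuffled although each queue is ordered can be seen in a few lines of plain Java. The sketch assumes that messages are spread over the queues round-robin, which is an assumption made for illustration; `QueueOrderSketch` and `distribute` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrates "ordered within a queue, not across queues"; round-robin is assumed. */
final class QueueOrderSketch {
    /** Distributes messages 0..messageCount-1 round-robin over queueCount FIFO queues. */
    static List<List<Integer>> distribute(int messageCount, int queueCount) {
        List<List<Integer>> queues = new ArrayList<>();
        for (int q = 0; q < queueCount; q++) {
            queues.add(new ArrayList<>());
        }
        for (int i = 0; i < messageCount; i++) {
            queues.get(i % queueCount).add(i);
        }
        return queues;
    }

    public static void main(String[] args) {
        // Four queues: each queue keeps send order, but the queues are drained independently.
        System.out.println(distribute(5, 4)); // prints [[0, 4], [1], [2], [3]]
        // One queue: the only arrangement that preserves a global order.
        System.out.println(distribute(5, 1)); // prints [[0, 1, 2, 3, 4]]
    }
}
```

With four queues the consumer may drain them in any interleaving, so 2, 0, 4, 1, 3 is a legal result as long as 0 still comes before 4.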
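Partially ordered messages

The plan is to use two partitions and to route messages by the index field (values 0 and 1) to different queues, so that the ordering within each queue becomes visible.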
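Routing by a key can be sketched as "hash the key, take it modulo the partition count". This is a common scheme and is assumed here for illustration; it is not claimed to be the exact selector the binder uses. `PartitionSketch` and `selectPartition` are hypothetical names.

```java
/** Hash-based partition selection; an assumed scheme for illustration. */
final class PartitionSketch {
    /** Maps a key to a partition in the range [0, partitionCount). */
    static int selectPartition(Object key, int partitionCount) {
        // The remainder lies in (-partitionCount, partitionCount), so abs is safe here.
        return Math.abs(key.hashCode() % partitionCount);
    }

    public static void main(String[] args) {
        // "0".hashCode() is 48 and "1".hashCode() is 49
        System.out.println(selectPartition("0", 2)); // prints 0
        System.out.println(selectPartition("1", 2)); // prints 1
    }
}
```

Because the partition depends only on the key, every message with the same index lands in the same partition, which is what makes per-key ordering possible.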
server:
  port: 8090
spring:
  cloud:
    stream:
      bindings:
        pojooutput:
          destination: pojo-topic
          content-type: application/json
          group: pojo-group
          producer:
            partitionCount: 2 # number of partitions the produced messages are spread over
            partitionKeyExpression: payload.index # Spring EL expression that extracts the partition key from the message
        pojoinput:
          destination: pojo-topic
          content-type: application/json
          group: pojo-group
          consumer:
            partitioned: true
      rocketmq:
        binder:
          name-server: 127.0.0.1:9876
          group: rmq-group
        bindings:
          pojooutput:
            producer:
              sendMsgTimeout: 3000
              sendType: Sync
          pojoinput:
            consumer:
              enabled: true
              subscription: myTag||look
              messageModel: CLUSTERING
              push:
                orderly: true
      instance-count: 2
      instance-index: 0
# log level
logging:
  level:
    com.alibaba.cloud.stream.binder.rocketmq: info

In the end the messages are partitioned by index, and each partition is consumed in order:
GenericMessage [payloadMessageDto{index0, titletitle0, contentcontent}, headers{ide06aef14-e094-ab5e-32a6-c8ba574eb7cf, contentTypeapplication/json, TAGSmyTag, timestamp1698810537731}]
GenericMessage [payloadMessageDto{index0, titletitle1, contentcontent}, headers{idbdb2d72e-4502-380a-6324-e49af58e6082, contentTypeapplication/json, TAGSmyTag, timestamp1698810537790}]
GenericMessage [payloadMessageDto{index0, titletitle2, contentcontent}, headers{id5b2ea555-a3f5-648f-99e5-ac4183fd939c, contentTypeapplication/json, TAGSmyTag, timestamp1698810537794}]
GenericMessage [payloadMessageDto{index0, titletitle3, contentcontent}, headers{idba9ff31d-68ef-bcdb-3e86-f92cb88804c2, contentTypeapplication/json, TAGSmyTag, timestamp1698810537796}]
GenericMessage [payloadMessageDto{index1, titletitle4, contentcontent}, headers{id1c4b47c3-3f65-cf65-fa8e-143d873602f3, contentTypeapplication/json, TAGSmyTag, timestamp1698810537800}]
GenericMessage [payloadMessageDto{index1, titletitle5, contentcontent}, headers{id030732b8-1792-5899-b704-90462774ee76, contentTypeapplication/json, TAGSmyTag, timestamp1698810537803}]
GenericMessage [payloadMessageDto{index1, titletitle6, contentcontent}, headers{id2e4cdcc9-1b46-fddc-9df8-1c000d66648c, contentTypeapplication/json, TAGSmyTag, timestamp1698810537808}]
GenericMessage [payloadMessageDto{index1, titletitle7, contentcontent}, headers{id9f45f277-bab6-9abe-1618-6755ec3715b4, contentTypeapplication/json, TAGSmyTag, timestamp1698810537811}]
[onMessage][线程编号:{} 消息内容{}]120, MessageDto{index1, titletitle4, contentcontent}
[onMessage][线程编号:{} 消息内容{}]120, MessageDto{index1, titletitle5, contentcontent}
[onMessage][线程编号:{} 消息内容{}]120, MessageDto{index1, titletitle6, contentcontent}
[onMessage][线程编号:{} 消息内容{}]120, MessageDto{index1, titletitle7, contentcontent}
[onMessage][线程编号:{} 消息内容{}]120, MessageDto{index0, titletitle0, contentcontent}
[onMessage][线程编号:{} 消息内容{}]120, MessageDto{index0, titletitle1, contentcontent}
[onMessage][线程编号:{} 消息内容{}]120, MessageDto{index0, titletitle2, contentcontent}
[onMessage][线程编号:{} 消息内容{}]120, MessageDto{index0, titletitle3, contentcontent}

8. Transactional Messages

A custom listener handles executing the local transaction and committing or rolling back the message.

Configuration (application.yml):

server:
  port: 8090
spring:
  cloud:
    stream:
      bindings:
        transoutput:
          destination: trans-topic
          content-type: application/json
          group: trans-group
        transinput:
          destination: trans-topic
          content-type: application/json
          group: trans-group
      rocketmq:
        binder:
          name-server: 127.0.0.1:9876
          group: rmq-group
        bindings:
          transoutput:
            producer:
              producerType: Trans
              transactionListener: rocketMQTransactionListener
              group: trans-group
          transinput:
            consumer:
              group: trans-group
# log level
logging:
  level:
    com.alibaba.cloud.stream.binder.rocketmq: info

Data class and producer side:

public class TransData {
    private String param1;
    private String param2;

    public String getParam1() { return param1; }
    public void setParam1(String param1) { this.param1 = param1; }
    public String getParam2() { return param2; }
    public void setParam2(String param2) { this.param2 = param2; }

    @Override
    public String toString() {
        return "TransData{" +
                "param1='" + param1 + '\'' +
                ", param2='" + param2 + '\'' +
                '}';
    }
}

public interface SelfSource {
    public static String TRANS_OUTPUT = "transoutput";

    @Output(TRANS_OUTPUT)
    MessageChannel transOutput();
}

@Service
public class SendService {
    @Autowired
    private SelfSource source;

    // Passes the transaction arguments as a JSON string in the "args" header.
    public void sendTrans(String transMessage) {
        TransData data = new TransData();
        data.setParam1("data1");
        data.setParam2("data2");
        Message<String> message = MessageBuilder.withPayload(transMessage)
                .setHeader("args", JSON.toJSONString(data))
                .build();
        source.transOutput().send(message);
    }
}

Transaction listener (its default bean name matches transactionListener in the configuration):

import com.alibaba.fastjson.JSON;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

@Component
public class RocketMQTransactionListener implements TransactionListener {
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        // The Chinese strings are kept so that they match the log below:
        // "本地事务开始执行" = "local transaction starts", "原始消息体" = "original message body",
        // "事务消息id" = "transactional message id", "本地事务执行参数" = "local transaction arguments"
        System.out.println("本地事务开始执行");
        String message = new String(msg.getBody());
        System.out.println("原始消息体{}" + message);
        String tid = msg.getTransactionId();
        System.out.println("事务消息id{}" + tid);
        // Simulated local transaction: begin
        System.out.println("本地事务执行参数start......----------------------");
        TransData args = JSON.parseObject(msg.getProperty("args"), TransData.class);
        // rollback, commit or unknown
        System.out.println("[executeLocalTransaction][执行本地事务消息{} args{}]" + msg + "--" + args.toString());
        System.out.println("本地事务执行参数end......------------------------");
        // Simulated local transaction: end
        // TODO return a state that reflects the result of the local transaction:
        // LocalTransactionState.COMMIT_MESSAGE   confirms the message; consumers can then consume it
        // LocalTransactionState.ROLLBACK_MESSAGE rolls back; the Broker deletes the half message
        // LocalTransactionState.UNKNOW           the Broker will check back later
        return LocalTransactionState.UNKNOW;
    }

    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        // "回查接口" = "check-back interface", "回查消息" = "check-back message"
        System.out.println("回查接口");
        // rollback, commit or unknown
        System.out.println("[checkLocalTransaction][回查消息{}]" + msg);
        String tid = msg.getTransactionId();
        System.out.println("[checkLocalTransaction][事务消息id{}]" + tid);
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}

Consumer side:

public interface SelfSink {
    public static String TRANS_INPUT = "transinput";

    @Input(TRANS_INPUT)
    SubscribableChannel transInput();
}

@Service
public class ReceiveService {
    @StreamListener(value = SelfSink.TRANS_INPUT)
    public void transInput(String message, @Header(name = "args") String args) {
        System.out.println("receive trans content:" + message + ", args=" + args);
    }
}

Test result. Because executeLocalTransaction returns UNKNOW, the Broker calls checkLocalTransaction for every message; that method returns COMMIT_MESSAGE, and only then does the consumer receive the messages:
本地事务开始执行
原始消息体{}transdata
事务消息id{}C6120001B9C9077556FD02F051300000
本地事务执行参数start......----------------------
[executeLocalTransaction][执行本地事务消息{} args{}]Message{topictrans-topic, flag0, properties{args{param1:data1,param2:data2}, TRAN_MSGtrue, id5283b92f-8e67-c398-a63b-fd90ee7b577e, UNIQ_KEYC6120001B9C9077556FD02F051300000, WAITtrue, contentTypeapplication/json, PGROUPtrans-group, timestamp1698817303846}, body[116, 114, 97, 110, 115, 100, 97, 116, 97], transactionIdC6120001B9C9077556FD02F051300000}--TransData{param1data1, param2data2}
本地事务执行参数end......------------------------
本地事务开始执行
原始消息体{}transdata
事务消息id{}C6120001B9C9077556FD02F051440002
本地事务执行参数start......----------------------
[executeLocalTransaction][执行本地事务消息{} args{}]Message{topictrans-topic, flag0, properties{args{param1:data1,param2:data2}, TRAN_MSGtrue, id0cebbc2e-f6af-b6ab-a013-bc19d3d1b670, UNIQ_KEYC6120001B9C9077556FD02F051440002, WAITtrue, contentTypeapplication/json, PGROUPtrans-group, timestamp1698817303875}, body[116, 114, 97, 110, 115, 100, 97, 116, 97], transactionIdC6120001B9C9077556FD02F051440002}--TransData{param1data1, param2data2}
本地事务执行参数end......------------------------
本地事务开始执行
原始消息体{}transdata
事务消息id{}C6120001B9C9077556FD02F051470004
本地事务执行参数start......----------------------
[executeLocalTransaction][执行本地事务消息{} args{}]Message{topictrans-topic, flag0, properties{args{param1:data1,param2:data2}, TRAN_MSGtrue, id1ba5b5ae-c041-558f-b97e-5eff2093f477, UNIQ_KEYC6120001B9C9077556FD02F051470004, WAITtrue, contentTypeapplication/json, PGROUPtrans-group, timestamp1698817303879}, body[116, 114, 97, 110, 115, 100, 97, 116, 97], transactionIdC6120001B9C9077556FD02F051470004}--TransData{param1data1, param2data2}
本地事务执行参数end......------------------------
本地事务开始执行
原始消息体{}transdata
事务消息id{}C6120001B9C9077556FD02F0514D0006
本地事务执行参数start......----------------------
[executeLocalTransaction][执行本地事务消息{} args{}]Message{topictrans-topic, flag0, properties{args{param1:data1,param2:data2}, TRAN_MSGtrue, idf03f772b-0a49-d85c-4a53-9a2dbaf91c17, UNIQ_KEYC6120001B9C9077556FD02F0514D0006, WAITtrue, contentTypeapplication/json, PGROUPtrans-group, timestamp1698817303885}, body[116, 114, 97, 110, 115, 100, 97, 116, 97], transactionIdC6120001B9C9077556FD02F0514D0006}--TransData{param1data1, param2data2}
本地事务执行参数end......------------------------
本地事务开始执行
原始消息体{}transdata
事务消息id{}C6120001B9C9077556FD02F051520008
本地事务执行参数start......----------------------
[executeLocalTransaction][执行本地事务消息{} args{}]Message{topictrans-topic, flag0, properties{args{param1:data1,param2:data2}, TRAN_MSGtrue, id3bc809a8-2f33-40cb-edaf-9b69c68282c8, UNIQ_KEYC6120001B9C9077556FD02F051520008, WAITtrue, contentTypeapplication/json, PGROUPtrans-group, timestamp1698817303890}, body[116, 114, 97, 110, 115, 100, 97, 116, 97], transactionIdC6120001B9C9077556FD02F051520008}--TransData{param1data1, param2data2}
本地事务执行参数end......------------------------
回查接口
[checkLocalTransaction][回查消息{}]MessageExt [brokerNamenull, queueId1, storeSize416, queueOffset18, sysFlag0, bornTimestamp1698817303885, bornHost/172.17.0.1:60840, storeTimestamp1698817303907, storeHost/127.0.0.1:10911, msgId7F00000100002A9F000000000012DDF4, commitLogOffset1236468, bodyCRC147427201, reconsumeTimes0, preparedTransactionOffset0, toString()Message{topictrans-topic, flag0, properties{args{param1:data1,param2:data2}, REAL_TOPICtrans-topic, TRANSACTION_CHECK_TIMES1, TRAN_MSGtrue, idf03f772b-0a49-d85c-4a53-9a2dbaf91c17, UNIQ_KEYC6120001B9C9077556FD02F0514D0006, CLUSTERDefaultRmqCluster, contentTypeapplication/json, PGROUPtrans-group, WAITfalse, timestamp1698817303885, REAL_QID1}, body[116, 114, 97, 110, 115, 100, 97, 116, 97], transactionIdC6120001B9C9077556FD02F0514D0006}]
[checkLocalTransaction][事务消息id{}]C6120001B9C9077556FD02F0514D0006
回查接口
[checkLocalTransaction][回查消息{}]MessageExt [brokerNamenull, queueId3, storeSize416, queueOffset16, sysFlag0, bornTimestamp1698817303876, bornHost/172.17.0.1:60840, storeTimestamp1698817303897, storeHost/127.0.0.1:10911, msgId7F00000100002A9F000000000012DA9C, commitLogOffset1235612, bodyCRC147427201, reconsumeTimes0, preparedTransactionOffset0, toString()Message{topictrans-topic, flag0, properties{args{param1:data1,param2:data2}, REAL_TOPICtrans-topic, TRANSACTION_CHECK_TIMES1, TRAN_MSGtrue, id0cebbc2e-f6af-b6ab-a013-bc19d3d1b670, UNIQ_KEYC6120001B9C9077556FD02F051440002, CLUSTERDefaultRmqCluster, contentTypeapplication/json, PGROUPtrans-group, WAITfalse, timestamp1698817303875, REAL_QID3}, body[116, 114, 97, 110, 115, 100, 97, 116, 97], transactionIdC6120001B9C9077556FD02F051440002}]
[checkLocalTransaction][事务消息id{}]C6120001B9C9077556FD02F051440002
回查接口
[checkLocalTransaction][回查消息{}]MessageExt [brokerNamenull, queueId0, storeSize416, queueOffset17, sysFlag0, bornTimestamp1698817303879, bornHost/172.17.0.1:60840, storeTimestamp1698817303902, storeHost/127.0.0.1:10911, msgId7F00000100002A9F000000000012DC48, commitLogOffset1236040, bodyCRC147427201, reconsumeTimes0, preparedTransactionOffset0, toString()Message{topictrans-topic, flag0, properties{args{param1:data1,param2:data2}, REAL_TOPICtrans-topic, TRANSACTION_CHECK_TIMES1, TRAN_MSGtrue, id1ba5b5ae-c041-558f-b97e-5eff2093f477, UNIQ_KEYC6120001B9C9077556FD02F051470004, CLUSTERDefaultRmqCluster, contentTypeapplication/json, PGROUPtrans-group, WAITfalse, timestamp1698817303879, REAL_QID0}, body[116, 114, 97, 110, 115, 100, 97, 116, 97], transactionIdC6120001B9C9077556FD02F051470004}]
[checkLocalTransaction][事务消息id{}]C6120001B9C9077556FD02F051470004
回查接口
[checkLocalTransaction][回查消息{}]MessageExt [brokerNamenull, queueId2, storeSize416, queueOffset15, sysFlag0, bornTimestamp1698817303860, bornHost/172.17.0.1:60840, storeTimestamp1698817303887, storeHost/127.0.0.1:10911, msgId7F00000100002A9F000000000012D8F0, commitLogOffset1235184, bodyCRC147427201, reconsumeTimes0, preparedTransactionOffset0, toString()Message{topictrans-topic, flag0, properties{args{param1:data1,param2:data2}, REAL_TOPICtrans-topic, TRANSACTION_CHECK_TIMES1, TRAN_MSGtrue, id5283b92f-8e67-c398-a63b-fd90ee7b577e, UNIQ_KEYC6120001B9C9077556FD02F051300000, CLUSTERDefaultRmqCluster, contentTypeapplication/json, PGROUPtrans-group, WAITfalse, timestamp1698817303846, REAL_QID2}, body[116, 114, 97, 110, 115, 100, 97, 116, 97], transactionIdC6120001B9C9077556FD02F051300000}]
[checkLocalTransaction][事务消息id{}]C6120001B9C9077556FD02F051300000
回查接口
[checkLocalTransaction][回查消息{}]MessageExt [brokerNamenull, queueId2, storeSize416, queueOffset19, sysFlag0, bornTimestamp1698817303890, bornHost/172.17.0.1:60840, storeTimestamp1698817303912, storeHost/127.0.0.1:10911, msgId7F00000100002A9F000000000012DFA0, commitLogOffset1236896, bodyCRC147427201, reconsumeTimes0, preparedTransactionOffset0, toString()Message{topictrans-topic, flag0, properties{args{param1:data1,param2:data2}, REAL_TOPICtrans-topic, TRANSACTION_CHECK_TIMES1, TRAN_MSGtrue, id3bc809a8-2f33-40cb-edaf-9b69c68282c8, UNIQ_KEYC6120001B9C9077556FD02F051520008, CLUSTERDefaultRmqCluster, contentTypeapplication/json, PGROUPtrans-group, WAITfalse, timestamp1698817303890, REAL_QID2}, body[116, 114, 97, 110, 115, 100, 97, 116, 97], transactionIdC6120001B9C9077556FD02F051520008}]
[checkLocalTransaction][事务消息id{}]C6120001B9C9077556FD02F051520008
receive trans content:transdata, args{param1:data1,param2:data2}
receive trans content:transdata, args{param1:data1,param2:data2}
receive trans content:transdata, args{param1:data1,param2:data2}
receive trans content:transdata, args{param1:data1,param2:data2}
receive trans content:transdata, args{param1:data1,param2:data2}
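The sequence visible in the log above (local transaction, check-back, then delivery) can be reduced to a small state sketch. It is a simplification under stated assumptions: `HalfMessageSketch` and its `State` enum are hypothetical, the check-back is asked only once, and no broker or network is involved. The real client constant is spelled `UNKNOW`; the sketch uses its own enum.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

/** Simplified half-message flow: deliver only after a COMMIT decision. */
final class HalfMessageSketch {
    enum State { COMMIT, ROLLBACK, UNKNOWN }

    /** Messages that have become visible to consumers. */
    final List<String> delivered = new ArrayList<>();

    /**
     * Runs the local transaction for a half message. If the result is UNKNOWN,
     * the check-back callback is asked once for the final state.
     */
    State send(String message, Supplier<State> localTransaction, Supplier<State> checkBack) {
        State state = localTransaction.get();
        if (state == State.UNKNOWN) {
            state = checkBack.get();
        }
        if (state == State.COMMIT) {
            delivered.add(message);
        }
        // ROLLBACK (or a still UNKNOWN result) leaves the message undelivered.
        return state;
    }

    public static void main(String[] args) {
        HalfMessageSketch broker = new HalfMessageSketch();
        // Same shape as the listener in this section: UNKNOWN first, COMMIT on check-back.
        broker.send("transdata", () -> State.UNKNOWN, () -> State.COMMIT);
        broker.send("rolled-back", () -> State.ROLLBACK, () -> State.COMMIT);
        System.out.println(broker.delivered); // prints [transdata]
    }
}
```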