网站开发(定制)合同 模板,自助建站官网,wordpress更新是乱码,建设一个网站怎么赚钱RocketMQ支持的消息类型有三种#xff1a;普通消息、顺序消息、延时消息、事务消息。以下内容的代码部分都是基于rocketmq-spring-boot-starter做的。
普通消息 普通消息是一种无序消息#xff0c;消息分布在各个MessageQueue当中#xff0c;以保证效率为第一使命。这种消息… RocketMQ支持的消息类型有三种普通消息、顺序消息、延时消息、事务消息。以下内容的代码部分都是基于rocketmq-spring-boot-starter做的。
普通消息 普通消息是一种无序消息消息分布在各个MessageQueue当中以保证效率为第一使命。这种消息适用于对顺序没有要求的基础消费需求。这里的Topic和MessageQueue是多对多关系。 // 生产者
public static void main(String[] args) throws MQClientException {DefaultMQProducer producer new DefaultMQProducer(rmq-group);producer.setNamesrvAddr(172.16.200.38:9876);producer.setInstanceName(producer);producer.start();try {for (int i 0; i 10; i) {Thread.sleep(1000);Message msg new Message(Topic-test, testTag, (new Date() RocketMQ test msg i).getBytes());SendResult sendResult producer.send(msg);System.out.println(sendResult.getMsgId());System.out.println(sendResult.getMessageQueue());System.out.println(sendResult.getSendStatus());System.out.println(sendResult.getOffsetMsgId());System.out.println(sendResult.getQueueOffset());System.out.println();System.out.println();}} catch (Exception e) {e.printStackTrace();}producer.shutdown();
}// 消费者
public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer new DefaultMQPushConsumer(rmq-group);consumer.setNamesrvAddr(172.16.200.38:9876);consumer.setInstanceName(consumer);consumer.subscribe(Topic-test, *);consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.registerMessageListener(new MessageListenerConcurrently() {Overridepublic ConsumeConcurrentlyStatus consumeMessage(ListMessageExt msgs, ConsumeConcurrentlyContext context) {for (MessageExt messageExt : msgs) {System.out.printf(%s Receive New Messages: %s %n, Thread.currentThread().getName(), new String(messageExt.getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//启动消费者consumer.start();
} 顺序消息 当我开始对消息的时序性有要求的时候普通消息就无法满足我们的需求了。当我们要求顺序消费的时候我们的Topic就不能采用多对多的形式存放在多个MessageQueue中而是需要牺牲一部分性能存放在一个MessageQueue中。 // 生产者
public static void main(String[] args) throws MQClientException {DefaultMQProducer producer new DefaultMQProducer(rmq-group);producer.setNamesrvAddr(172.16.200.38:9876);producer.setInstanceName(producer);producer.start();try {for (int i 0; i 10; i) {Thread.sleep(1000);Message msg new Message(Topic-test, testTag, (new Date() RocketMQ test msg i).getBytes());SendResult sendResult producer.send(msg, (mqs, msg1, arg) - mqs.get(0), null);System.out.println(sendResult.getMsgId());System.out.println(sendResult.getMessageQueue());System.out.println(sendResult.getSendStatus());System.out.println(sendResult.getOffsetMsgId());System.out.println(sendResult.getQueueOffset());System.out.println(RocketMQ test msg i);System.out.println();}} catch (Exception e) {e.printStackTrace();}producer.shutdown();
}// 消费者
public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer new DefaultMQPushConsumer(rmq-group);consumer.setNamesrvAddr(172.16.200.38:9876);consumer.setInstanceName(consumer);consumer.subscribe(Topic-test, *);consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);// 跟普通消费不同的地方这里采用了顺序消费方法。consumer.registerMessageListener(new MessageListenerOrderly() {Overridepublic ConsumeOrderlyStatus consumeMessage(ListMessageExt msgs, ConsumeOrderlyContext context) {for (MessageExt messageExt : msgs) {System.out.printf(%s Receive New Messages: %s %n, Thread.currentThread().getName(), new String(messageExt.getBody()));}return ConsumeOrderlyStatus.SUCCESS;}});//启动消费者consumer.start();
} 延时消息 通过指定的通知时间间隔让消息不会立刻被消费者收到。
// 生产者
public static void main(String[] args) throws MQClientException {DefaultMQProducer producer new DefaultMQProducer(rmq-group);producer.setNamesrvAddr(172.16.200.38:9876);producer.setInstanceName(producer);producer.start();try {for (int i 0; i 10; i) {Thread.sleep(1000);Message msg new Message(Topic-test, testTag, (new Date() RocketMQ test msg i).getBytes());// 指定延时等级// DelayTimeLevel对应的延时时间在服务端定义:rocketmq/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java// private String messageDelayLevel 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h;msg.setDelayTimeLevel(4);// 按毫秒延时msg.setDelayTimeMs(1L);// 按秒延时msg.setDelayTimeSec(1);SendResult sendResult producer.send(msg);System.out.println(sendResult.getMsgId());System.out.println(sendResult.getMessageQueue());System.out.println(sendResult.getSendStatus());System.out.println(sendResult.getOffsetMsgId());System.out.println(sendResult.getQueueOffset());System.out.println();System.out.println();}} catch (Exception e) {e.printStackTrace();}producer.shutdown();
}// 消费者
public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer new DefaultMQPushConsumer(rmq-group);consumer.setNamesrvAddr(172.16.200.38:9876);consumer.setInstanceName(consumer);consumer.subscribe(Topic-test, *);consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.registerMessageListener(new MessageListenerConcurrently() {Overridepublic ConsumeConcurrentlyStatus consumeMessage(ListMessageExt msgs, ConsumeConcurrentlyContext context) {for (MessageExt messageExt : msgs) {System.out.printf(Receive New Messages: %s At %s %n, new String(messageExt.getBody()), new Date());}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//启动消费者consumer.start();
}
30秒的延迟 事务消息 事务消息是RocketMQ提供的一种高级的消息类型支持在分布式场景下保障消息生产和本地事务的最终一致性。
事务消息的生命周期 (1)初始化半事务消息被生产者构建并完成初始化待发送到RocketMQ服务端的状态。 (2)事务待提交半事务消息被发送到服务端和普通消息不同半事务消息并不会直接被服务端持久化而是会被单独存储到事务存储系统中等待第二阶段本地事务返回执行结果后再提交。此时消息对下游消费者不可见。 (3)消息回滚第二阶段如果事务执行结果明确为回滚服务端会将半事务消息回滚该事务消息流程终止。 (4)提交事务待消费第二阶段如果事务执行结果明确为提交服务端会将半事务消息重新存储到普通存储系统中此时消息对下游消费者可见。 (5)消费中消息被消费者获取并按照消费者本地的业务逻辑进行处理的过程。此时服务端会等待消费者完成消费并提交消费结果如果一段时间后没有收到消费者的响应服务端会对消息进行重试处理。 (6)消费完成消费者完成了消费动作并向服务端提交了消费结果服务端标记当前消息已经被处理。 (7)消息删除服务端按照消息保存机制滚动清理最早的消息数据将消息从物理文件中删除。 // 生产者
public static void main(String[] args) throws MQClientException {TransactionMQProducer producer new TransactionMQProducer(rmq-group);TransactionListener listener new TransactionListenerImpl();producer.setNamesrvAddr(172.16.200.38:9876);producer.setInstanceName(producer);producer.setTransactionListener(listener);producer.start();try {for (int i 0; i 10; i) {Thread.sleep(1000);Message msg new Message(Topic-test, testTag, (new Date() RocketMQ test msg i).getBytes());SendResult sendResult producer.sendMessageInTransaction(msg, null);System.out.println(new Date());System.out.println(sendResult.getMsgId());System.out.println(sendResult.getMessageQueue());System.out.println(sendResult.getSendStatus());System.out.println(sendResult.getOffsetMsgId());System.out.println(sendResult.getQueueOffset());System.out.println();System.out.println();}} catch (Exception e) {e.printStackTrace();}producer.shutdown();
}// 消费者
public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer new DefaultMQPushConsumer(rmq-group);consumer.setNamesrvAddr(172.16.200.38:9876);consumer.setInstanceName(consumer);consumer.subscribe(Topic-test, *);consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.registerMessageListener(new MessageListenerConcurrently() {Overridepublic ConsumeConcurrentlyStatus consumeMessage(ListMessageExt msgs, ConsumeConcurrentlyContext context) {for (MessageExt messageExt : msgs) {System.out.printf(Receive New Messages: %s At %s %n, new String(messageExt.getBody()), new Date());}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//启动消费者consumer.start();
} 控制台打印
// 生产者控制台打印
Wed Jul 26 17:55:45 CST 2023 RocketMQ test msg 0 : executeLocalTransaction:事务执行失败回滚
Wed Jul 26 17:55:45 CST 2023
7F00000154F073D16E938497DE420000
MessageQueue [topicTopic-test, brokerNamelocalhost.localdomain, queueId1]
SEND_OK
null
93
Wed Jul 26 17:55:46 CST 2023 RocketMQ test msg 1 : executeLocalTransaction:未知状态
Wed Jul 26 17:55:46 CST 2023
7F00000154F073D16E938497E2340001
MessageQueue [topicTopic-test, brokerNamelocalhost.localdomain, queueId2]
SEND_OK
null
94
Wed Jul 26 17:55:47 CST 2023 RocketMQ test msg 2 : executeLocalTransaction:事务执行失败回滚
Wed Jul 26 17:55:47 CST 2023
7F00000154F073D16E938497E6230002
MessageQueue [topicTopic-test, brokerNamelocalhost.localdomain, queueId3]
SEND_OK
null
95
Wed Jul 26 17:55:48 CST 2023 RocketMQ test msg 3 : executeLocalTransaction:事务执行成功提交
Wed Jul 26 17:55:48 CST 2023
7F00000154F073D16E938497EA180003
MessageQueue [topicTopic-test, brokerNamelocalhost.localdomain, queueId0]
SEND_OK
null
96
Wed Jul 26 17:55:49 CST 2023 RocketMQ test msg 4 : executeLocalTransaction:事务执行失败回滚
Wed Jul 26 17:55:49 CST 2023
7F00000154F073D16E938497EE0B0004
MessageQueue [topicTopic-test, brokerNamelocalhost.localdomain, queueId1]
SEND_OK
null
97
Wed Jul 26 17:55:50 CST 2023 RocketMQ test msg 5 : executeLocalTransaction:事务执行成功提交
Wed Jul 26 17:55:50 CST 2023
7F00000154F073D16E938497F1F70005
MessageQueue [topicTopic-test, brokerNamelocalhost.localdomain, queueId2]
SEND_OK
null
98
Wed Jul 26 17:55:51 CST 2023 RocketMQ test msg 6 : executeLocalTransaction:事务执行成功提交
Wed Jul 26 17:55:51 CST 2023
7F00000154F073D16E938497F5EC0006
MessageQueue [topicTopic-test, brokerNamelocalhost.localdomain, queueId3]
SEND_OK
null
99
Wed Jul 26 17:55:52 CST 2023 RocketMQ test msg 7 : executeLocalTransaction:事务执行失败回滚
Wed Jul 26 17:55:52 CST 2023
7F00000154F073D16E938497F9E50007
MessageQueue [topicTopic-test, brokerNamelocalhost.localdomain, queueId0]
SEND_OK
null
100
Wed Jul 26 17:55:53 CST 2023 RocketMQ test msg 8 : executeLocalTransaction:事务执行失败回滚
Wed Jul 26 17:55:53 CST 2023
7F00000154F073D16E938497FDD30008
MessageQueue [topicTopic-test, brokerNamelocalhost.localdomain, queueId1]
SEND_OK
null
101
Wed Jul 26 17:55:54 CST 2023 RocketMQ test msg 9 : executeLocalTransaction:事务执行成功提交
Wed Jul 26 17:55:54 CST 2023
7F00000154F073D16E93849801C90009
MessageQueue [topicTopic-test, brokerNamelocalhost.localdomain, queueId2]
SEND_OK
null
102// 消费者控制台打印
Receive New Messages: Wed Jul 26 17:55:50 CST 2023 RocketMQ test msg 5 At Wed Jul 26 17:55:50 CST 2023
Receive New Messages: Wed Jul 26 17:55:51 CST 2023 RocketMQ test msg 6 At Wed Jul 26 17:55:51 CST 2023
Receive New Messages: Wed Jul 26 17:55:54 CST 2023 RocketMQ test msg 9 At Wed Jul 26 17:55:54 CST 2023
这里消费者只收到了执行事务成功的5,6,9。