当前位置: 首页 > news >正文

网站开发(定制)合同 模板自助建站官网

网站开发(定制)合同 模板,自助建站官网,wordpress更新是乱码,建设一个网站怎么赚钱RocketMQ支持的消息类型有三种#xff1a;普通消息、顺序消息、延时消息、事务消息。以下内容的代码部分都是基于rocketmq-spring-boot-starter做的。 普通消息 普通消息是一种无序消息#xff0c;消息分布在各个MessageQueue当中#xff0c;以保证效率为第一使命。这种消息…        RocketMQ支持的消息类型有三种普通消息、顺序消息、延时消息、事务消息。以下内容的代码部分都是基于rocketmq-spring-boot-starter做的。 普通消息 普通消息是一种无序消息消息分布在各个MessageQueue当中以保证效率为第一使命。这种消息适用于对顺序没有要求的基础消费需求。这里的Topic和MessageQueue是多对多关系。 // 生产者 public static void main(String[] args) throws MQClientException {DefaultMQProducer producer new DefaultMQProducer(rmq-group);producer.setNamesrvAddr(172.16.200.38:9876);producer.setInstanceName(producer);producer.start();try {for (int i 0; i 10; i) {Thread.sleep(1000);Message msg new Message(Topic-test, testTag, (new Date() RocketMQ test msg i).getBytes());SendResult sendResult producer.send(msg);System.out.println(sendResult.getMsgId());System.out.println(sendResult.getMessageQueue());System.out.println(sendResult.getSendStatus());System.out.println(sendResult.getOffsetMsgId());System.out.println(sendResult.getQueueOffset());System.out.println();System.out.println();}} catch (Exception e) {e.printStackTrace();}producer.shutdown(); }// 消费者 public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer new DefaultMQPushConsumer(rmq-group);consumer.setNamesrvAddr(172.16.200.38:9876);consumer.setInstanceName(consumer);consumer.subscribe(Topic-test, *);consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.registerMessageListener(new MessageListenerConcurrently() {Overridepublic ConsumeConcurrentlyStatus consumeMessage(ListMessageExt msgs, ConsumeConcurrentlyContext context) {for (MessageExt messageExt : msgs) {System.out.printf(%s Receive New Messages: %s %n, Thread.currentThread().getName(), new String(messageExt.getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//启动消费者consumer.start(); } 顺序消息 当我开始对消息的时序性有要求的时候普通消息就无法满足我们的需求了。当我们要求顺序消费的时候我们的Topic就不能采用多对多的形式存放在多个MessageQueue中而是需要牺牲一部分性能存放在一个MessageQueue中。 // 生产者 public static void main(String[] args) throws MQClientException {DefaultMQProducer producer new DefaultMQProducer(rmq-group);producer.setNamesrvAddr(172.16.200.38:9876);producer.setInstanceName(producer);producer.start();try {for (int i 0; i 10; i) {Thread.sleep(1000);Message msg new Message(Topic-test, testTag, (new Date() RocketMQ test msg i).getBytes());SendResult sendResult producer.send(msg, (mqs, msg1, arg) - mqs.get(0), null);System.out.println(sendResult.getMsgId());System.out.println(sendResult.getMessageQueue());System.out.println(sendResult.getSendStatus());System.out.println(sendResult.getOffsetMsgId());System.out.println(sendResult.getQueueOffset());System.out.println(RocketMQ test msg i);System.out.println();}} catch (Exception e) {e.printStackTrace();}producer.shutdown(); }// 消费者 public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer new DefaultMQPushConsumer(rmq-group);consumer.setNamesrvAddr(172.16.200.38:9876);consumer.setInstanceName(consumer);consumer.subscribe(Topic-test, *);consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);// 跟普通消费不同的地方这里采用了顺序消费方法。consumer.registerMessageListener(new MessageListenerOrderly() {Overridepublic ConsumeOrderlyStatus consumeMessage(ListMessageExt msgs, ConsumeOrderlyContext context) {for (MessageExt messageExt : msgs) {System.out.printf(%s Receive New Messages: %s %n, Thread.currentThread().getName(), new String(messageExt.getBody()));}return ConsumeOrderlyStatus.SUCCESS;}});//启动消费者consumer.start(); } 延时消息 通过指定的通知时间间隔让消息不会立刻被消费者收到。 // 生产者 public static void main(String[] args) throws MQClientException {DefaultMQProducer producer new DefaultMQProducer(rmq-group);producer.setNamesrvAddr(172.16.200.38:9876);producer.setInstanceName(producer);producer.start();try {for (int i 0; i 10; i) {Thread.sleep(1000);Message msg new Message(Topic-test, testTag, (new Date() RocketMQ test msg i).getBytes());// 指定延时等级// DelayTimeLevel对应的延时时间在服务端定义:rocketmq/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java// private String messageDelayLevel 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h;msg.setDelayTimeLevel(4);// 按毫秒延时msg.setDelayTimeMs(1L);// 按秒延时msg.setDelayTimeSec(1);SendResult sendResult producer.send(msg);System.out.println(sendResult.getMsgId());System.out.println(sendResult.getMessageQueue());System.out.println(sendResult.getSendStatus());System.out.println(sendResult.getOffsetMsgId());System.out.println(sendResult.getQueueOffset());System.out.println();System.out.println();}} catch (Exception e) {e.printStackTrace();}producer.shutdown(); }// 消费者 public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer new DefaultMQPushConsumer(rmq-group);consumer.setNamesrvAddr(172.16.200.38:9876);consumer.setInstanceName(consumer);consumer.subscribe(Topic-test, *);consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.registerMessageListener(new MessageListenerConcurrently() {Overridepublic ConsumeConcurrentlyStatus consumeMessage(ListMessageExt msgs, ConsumeConcurrentlyContext context) {for (MessageExt messageExt : msgs) {System.out.printf(Receive New Messages: %s At %s %n, new String(messageExt.getBody()), new Date());}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//启动消费者consumer.start(); } 30秒的延迟 事务消息 事务消息是RocketMQ提供的一种高级的消息类型支持在分布式场景下保障消息生产和本地事务的最终一致性。 事务消息的生命周期 (1)初始化半事务消息被生产者构建并完成初始化待发送到RocketMQ服务端的状态。 (2)事务待提交半事务消息被发送到服务端和普通消息不同半事务消息并不会直接被服务端持久化而是会被单独存储到事务存储系统中等待第二阶段本地事务返回执行结果后再提交。此时消息对下游消费者不可见。 (3)消息回滚第二阶段如果事务执行结果明确为回滚服务端会将半事务消息回滚该事务消息流程终止。 (4)提交事务待消费第二阶段如果事务执行结果明确为提交服务端会将半事务消息重新存储到普通存储系统中此时消息对下游消费者可见。 (5)消费中消息被消费者获取并按照消费者本地的业务逻辑进行处理的过程。此时服务端会等待消费者完成消费并提交消费结果如果一段时间后没有收到消费者的响应服务端会对消息进行重试处理。 (6)消费完成消费者完成了消费动作并向服务端提交了消费结果服务端标记当前消息已经被处理。 (7)消息删除服务端按照消息保存机制滚动清理最早的消息数据将消息从物理文件中删除。 // 生产者 public static void main(String[] args) throws MQClientException {TransactionMQProducer producer new TransactionMQProducer(rmq-group);TransactionListener listener new TransactionListenerImpl();producer.setNamesrvAddr(172.16.200.38:9876);producer.setInstanceName(producer);producer.setTransactionListener(listener);producer.start();try {for (int i 0; i 10; i) {Thread.sleep(1000);Message msg new Message(Topic-test, testTag, (new Date() RocketMQ test msg i).getBytes());SendResult sendResult producer.sendMessageInTransaction(msg, null);System.out.println(new Date());System.out.println(sendResult.getMsgId());System.out.println(sendResult.getMessageQueue());System.out.println(sendResult.getSendStatus());System.out.println(sendResult.getOffsetMsgId());System.out.println(sendResult.getQueueOffset());System.out.println();System.out.println();}} catch (Exception e) {e.printStackTrace();}producer.shutdown(); }// 消费者 public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer new DefaultMQPushConsumer(rmq-group);consumer.setNamesrvAddr(172.16.200.38:9876);consumer.setInstanceName(consumer);consumer.subscribe(Topic-test, *);consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.registerMessageListener(new MessageListenerConcurrently() {Overridepublic ConsumeConcurrentlyStatus consumeMessage(ListMessageExt msgs, ConsumeConcurrentlyContext context) {for (MessageExt messageExt : msgs) {System.out.printf(Receive New Messages: %s At %s %n, new String(messageExt.getBody()), new Date());}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//启动消费者consumer.start(); } 控制台打印 // 生产者控制台打印 Wed Jul 26 17:55:45 CST 2023 RocketMQ test msg 0 : executeLocalTransaction:事务执行失败回滚 Wed Jul 26 17:55:45 CST 2023 7F00000154F073D16E938497DE420000 MessageQueue [topicTopic-test, brokerNamelocalhost.localdomain, queueId1] SEND_OK null 93 Wed Jul 26 17:55:46 CST 2023 RocketMQ test msg 1 : executeLocalTransaction:未知状态 Wed Jul 26 17:55:46 CST 2023 7F00000154F073D16E938497E2340001 MessageQueue [topicTopic-test, brokerNamelocalhost.localdomain, queueId2] SEND_OK null 94 Wed Jul 26 17:55:47 CST 2023 RocketMQ test msg 2 : executeLocalTransaction:事务执行失败回滚 Wed Jul 26 17:55:47 CST 2023 7F00000154F073D16E938497E6230002 MessageQueue [topicTopic-test, brokerNamelocalhost.localdomain, queueId3] SEND_OK null 95 Wed Jul 26 17:55:48 CST 2023 RocketMQ test msg 3 : executeLocalTransaction:事务执行成功提交 Wed Jul 26 17:55:48 CST 2023 7F00000154F073D16E938497EA180003 MessageQueue [topicTopic-test, brokerNamelocalhost.localdomain, queueId0] SEND_OK null 96 Wed Jul 26 17:55:49 CST 2023 RocketMQ test msg 4 : executeLocalTransaction:事务执行失败回滚 Wed Jul 26 17:55:49 CST 2023 7F00000154F073D16E938497EE0B0004 MessageQueue [topicTopic-test, brokerNamelocalhost.localdomain, queueId1] SEND_OK null 97 Wed Jul 26 17:55:50 CST 2023 RocketMQ test msg 5 : executeLocalTransaction:事务执行成功提交 Wed Jul 26 17:55:50 CST 2023 7F00000154F073D16E938497F1F70005 MessageQueue [topicTopic-test, brokerNamelocalhost.localdomain, queueId2] SEND_OK null 98 Wed Jul 26 17:55:51 CST 2023 RocketMQ test msg 6 : executeLocalTransaction:事务执行成功提交 Wed Jul 26 17:55:51 CST 2023 7F00000154F073D16E938497F5EC0006 MessageQueue [topicTopic-test, brokerNamelocalhost.localdomain, queueId3] SEND_OK null 99 Wed Jul 26 17:55:52 CST 2023 RocketMQ test msg 7 : executeLocalTransaction:事务执行失败回滚 Wed Jul 26 17:55:52 CST 2023 7F00000154F073D16E938497F9E50007 MessageQueue [topicTopic-test, brokerNamelocalhost.localdomain, queueId0] SEND_OK null 100 Wed Jul 26 17:55:53 CST 2023 RocketMQ test msg 8 : executeLocalTransaction:事务执行失败回滚 Wed Jul 26 17:55:53 CST 2023 7F00000154F073D16E938497FDD30008 MessageQueue [topicTopic-test, brokerNamelocalhost.localdomain, queueId1] SEND_OK null 101 Wed Jul 26 17:55:54 CST 2023 RocketMQ test msg 9 : executeLocalTransaction:事务执行成功提交 Wed Jul 26 17:55:54 CST 2023 7F00000154F073D16E93849801C90009 MessageQueue [topicTopic-test, brokerNamelocalhost.localdomain, queueId2] SEND_OK null 102// 消费者控制台打印 Receive New Messages: Wed Jul 26 17:55:50 CST 2023 RocketMQ test msg 5 At Wed Jul 26 17:55:50 CST 2023 Receive New Messages: Wed Jul 26 17:55:51 CST 2023 RocketMQ test msg 6 At Wed Jul 26 17:55:51 CST 2023 Receive New Messages: Wed Jul 26 17:55:54 CST 2023 RocketMQ test msg 9 At Wed Jul 26 17:55:54 CST 2023 这里消费者只收到了执行事务成功的5,6,9。
http://www.zqtcl.cn/news/368291/

相关文章:

  • 在手机上如何制作网站qq注册网页入口
  • asp.net程序做的网站安全吗国内什么网站用asp.net
  • 凡科网做网站网站编辑知识
  • c#做交易网站taxonomy wordpress
  • 统一门户网站开发员给我用织梦做的网站
  • 网站上有声的文章是怎么做的深圳市住房和建设局网站和市住宅租赁管理服务中心
  • 如何对网站进行爬虫页面设计存在的问题
  • 知名网站建设加盟合作企业邮箱如何登录
  • asp net mvc做网站软文推广是什么
  • 张家口住房和城乡建设厅网站如何做点击赚钱的网站
  • 网站在建设中无法访问贵州碧江区住房和城乡建设局网站
  • 营销类网站 英文东莞正规的免费网站优化
  • 柳州网站推广最好的公司百度seo优化培训
  • 哈尔滨门户网站建站哪个网站做农产品
  • 网站行业关键词如何建设网站
  • wordpress插件目录504wordpress访问优化插件
  • 固定ip做网站网页源码提取工具
  • php网站模板源码下载公司网络营销推广软件
  • 免费电子版个人简历模板温州快速排名优化
  • 网站修改titlewordpress显示icp备案
  • 中国国际贸易单一窗口登录南京专业网站优化公司
  • 手机网站建设合同wordpress案例分析
  • 深圳做网站什么公司好广州电商小程序开发
  • 郑州高新区做网站的公司如何欣赏网站
  • 网站做维恩图做网站的公司杭州
  • 柳州公司网站制作公司wordpress 网店
  • 网站增加栏目费用在网站开发中如何设置登录
  • 怎样用php做网站百度推广联系人
  • 怎么建立手机网站如何申请公司域名
  • 营销型网站怎么收费邓州企业网站