学校网站结构图,银州铁岭做网站,浙江网站建设优化,用手机做自己的网站一、RocketMq有3中消息类型1.普通消费2. 顺序消费3.事务消费顺序消费场景在网购的时候#xff0c;我们需要下单#xff0c;那么下单需要假如有三个顺序#xff0c;第一、创建订单 #xff0c;第二#xff1a;订单付款#xff0c;第三#xff1a;订单完成。也就是这个三个…一、RocketMq有3中消息类型1.普通消费2. 顺序消费3.事务消费顺序消费场景在网购的时候我们需要下单那么下单需要假如有三个顺序第一、创建订单 第二订单付款第三订单完成。也就是这个三个环节要有顺序这个订单才有意义。RocketMQ可以保证顺序消费。rocketMq实现顺序消费的原理produce在发送消息的时候把消息发到同一个队列(queue)中,消费者注册消息监听器为MessageListenerOrderly这样就可以保证消费端只有一个线程去消费消息注意是把把消息发到同一个队列(queue)不是同一个topic默认情况下一个topic包括4个queue单个节点(Producer端1个、Consumer端1个)1、Producer.javapackageorder;importjava.util.List;importcom.alibaba.rocketmq.client.exception.MQBrokerException;importcom.alibaba.rocketmq.client.exception.MQClientException;importcom.alibaba.rocketmq.client.producer.DefaultMQProducer;importcom.alibaba.rocketmq.client.producer.MessageQueueSelector;importcom.alibaba.rocketmq.client.producer.SendResult;importcom.alibaba.rocketmq.common.message.Message;importcom.alibaba.rocketmq.common.message.MessageQueue;importcom.alibaba.rocketmq.remoting.exception.RemotingException;/*** Producer发送顺序消息*/public classProducer {public static voidmain(String[] args) {try{DefaultMQProducer producer new DefaultMQProducer(order_Producer);producer.setNamesrvAddr(192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876);producer.start();//String[] tags new String[] { TagA, TagB, TagC, TagD,//TagE };for (int i 1; i 5; i) {Message msg new Message(TopicOrderTest, order_1, KEY i, (order_1 i).getBytes());SendResult sendResult producer.send(msg, newMessageQueueSelector() {public MessageQueue select(Listmqs, Message msg, Object arg) {Integer id(Integer) arg;int index id %mqs.size();returnmqs.get(index);}},0);System.out.println(sendResult);}producer.shutdown();}catch(MQClientException e) {e.printStackTrace();}catch(RemotingException e) {e.printStackTrace();}catch(MQBrokerException e) {e.printStackTrace();}catch(InterruptedException e) {e.printStackTrace();}}}2、Consumer.javapackageorder;importjava.util.List;importjava.util.concurrent.TimeUnit;importjava.util.concurrent.atomic.AtomicLong;importcom.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;importcom.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyContext;importcom.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;importcom.alibaba.rocketmq.client.consumer.listener.MessageListenerOrderly;importcom.alibaba.rocketmq.client.exception.MQClientException;importcom.alibaba.rocketmq.common.consumer.ConsumeFromWhere;importcom.alibaba.rocketmq.common.message.MessageExt;/*** 顺序消息消费带事务方式(应用可控制Offset什么时候提交)*/public classConsumer1 {public static void main(String[] args) throwsMQClientException {DefaultMQPushConsumer consumer new DefaultMQPushConsumer(order_Consumer);consumer.setNamesrvAddr(192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876);/*** 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费* 如果非第一次启动那么按照上次消费的位置继续消费*/consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.subscribe(TopicOrderTest, *);consumer.registerMessageListener(newMessageListenerOrderly() {AtomicLong consumeTimes new AtomicLong(0);public ConsumeOrderlyStatus consumeMessage(Listmsgs, ConsumeOrderlyContext context) {//设置自动提交context.setAutoCommit(true);for(MessageExt msg : msgs) {System.out.println(msg ,内容 newString(msg.getBody()));}try{TimeUnit.SECONDS.sleep(5L);}catch(InterruptedException e) {e.printStackTrace();};returnConsumeOrderlyStatus.SUCCESS;}});consumer.start();System.out.println(Consumer1 Started.);}}结果如下图所示这个五条数据被顺序消费了多个节点(Producer端1个、Consumer端2个)Producer.javapackageorder;importjava.util.List;importcom.alibaba.rocketmq.client.exception.MQBrokerException;importcom.alibaba.rocketmq.client.exception.MQClientException;importcom.alibaba.rocketmq.client.producer.DefaultMQProducer;importcom.alibaba.rocketmq.client.producer.MessageQueueSelector;importcom.alibaba.rocketmq.client.producer.SendResult;importcom.alibaba.rocketmq.common.message.Message;importcom.alibaba.rocketmq.common.message.MessageQueue;importcom.alibaba.rocketmq.remoting.exception.RemotingException;/*** Producer发送顺序消息*/public classProducer {public static voidmain(String[] args) {try{DefaultMQProducer producer new DefaultMQProducer(order_Producer);producer.setNamesrvAddr(192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876);producer.start();//String[] tags new String[] { TagA, TagB, TagC, TagD,//TagE };for (int i 1; i 5; i) {Message msg new Message(TopicOrderTest, order_1, KEY i, (order_1 i).getBytes());SendResult sendResult producer.send(msg, newMessageQueueSelector() {public MessageQueue select(Listmqs, Message msg, Object arg) {Integer id(Integer) arg;int index id %mqs.size();returnmqs.get(index);}},0);System.out.println(sendResult);}for (int i 1; i 5; i) {Message msg new Message(TopicOrderTest, order_2, KEY i, (order_2 i).getBytes());SendResult sendResult producer.send(msg, newMessageQueueSelector() {public MessageQueue select(Listmqs, Message msg, Object arg) {Integer id(Integer) arg;int index id %mqs.size();returnmqs.get(index);}},1);System.out.println(sendResult);}for (int i 1; i 5; i) {Message msg new Message(TopicOrderTest, order_3, KEY i, (order_3 i).getBytes());SendResult sendResult producer.send(msg, newMessageQueueSelector() {public MessageQueue select(Listmqs, Message msg, Object arg) {Integer id(Integer) arg;int index id %mqs.size();returnmqs.get(index);}},2);System.out.println(sendResult);}producer.shutdown();}catch(MQClientException e) {e.printStackTrace();}catch(RemotingException e) {e.printStackTrace();}catch(MQBrokerException e) {e.printStackTrace();}catch(InterruptedException e) {e.printStackTrace();}}}Consumer1.java/*** 顺序消息消费带事务方式(应用可控制Offset什么时候提交)*/public classConsumer1 {public static void main(String[] args) throwsMQClientException {DefaultMQPushConsumer consumer new DefaultMQPushConsumer(order_Consumer);consumer.setNamesrvAddr(192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876);/*** 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费* 如果非第一次启动那么按照上次消费的位置继续消费*/consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.subscribe(TopicOrderTest, *);/*** 实现了MessageListenerOrderly表示一个队列只会被一个线程取到*第二个线程无法访问这个队列*/consumer.registerMessageListener(newMessageListenerOrderly() {AtomicLong consumeTimes new AtomicLong(0);public ConsumeOrderlyStatus consumeMessage(Listmsgs, ConsumeOrderlyContext context) {//设置自动提交context.setAutoCommit(true);for(MessageExt msg : msgs) {System.out.println(msg ,内容 newString(msg.getBody()));}try{TimeUnit.SECONDS.sleep(5L);}catch(InterruptedException e) {e.printStackTrace();};returnConsumeOrderlyStatus.SUCCESS;}});consumer.start();System.out.println(Consumer1 Started.);}}Consumer2.java/*** 顺序消息消费带事务方式(应用可控制Offset什么时候提交)*/public classConsumer2 {public static void main(String[] args) throwsMQClientException {DefaultMQPushConsumer consumer new DefaultMQPushConsumer(order_Consumer);consumer.setNamesrvAddr(192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876);/*** 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费* 如果非第一次启动那么按照上次消费的位置继续消费*/consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.subscribe(TopicOrderTest, *);/*** 实现了MessageListenerOrderly表示一个队列只会被一个线程取到*第二个线程无法访问这个队列*/consumer.registerMessageListener(newMessageListenerOrderly() {AtomicLong consumeTimes new AtomicLong(0);public ConsumeOrderlyStatus consumeMessage(Listmsgs, ConsumeOrderlyContext context) {//设置自动提交context.setAutoCommit(true);for(MessageExt msg : msgs) {System.out.println(msg ,内容 newString(msg.getBody()));}try{TimeUnit.SECONDS.sleep(5L);}catch(InterruptedException e) {e.printStackTrace();};returnConsumeOrderlyStatus.SUCCESS;}});consumer.start();System.out.println(Consumer2 Started.);}}先启动Consumer1和Consumer2然后启动ProducerProducer会发送15条消息Consumer1消费情况如图都按照顺序执行了Consumer2消费情况如图都按照顺序执行了二、事务消费这里说的主要是分布式事物。下面的例子的数据库分别安装在不同的节点上。事物消费需要先说说什么是事务。比如说我们跨行转账从工商银行转到建设银行也就是我从工商银行扣除1000元之后我的建设银行也必须加1000元。这样才能保证数据的一致性。假如工商银行转1000元之后建设银行的服务器突然宕机那么我扣除了1000但是并没有在建设银行给我加1000就出现了数据的不一致。因此加1000和减1000才行减1000和减1000必须一起成功一起失败。再比如我们进行网购的时候我们下单之后订单提交成功仓库商品的数量必须减一。但是订单可能是一个数据库仓库数量可能又是在另个数据库里面。有可能订单提交成功之后仓库数量服务器突然宕机。这样也出现了数据不一致的问题。使用消息队列来解决分布式事物现在我们去外面饭店吃饭很多时候都不会直接给了钱之后直接在付款的窗口递饭菜而是付款之后他会给你一张小票你拿着这个小票去出饭的窗口取饭。这里和我们的系统类似提高了吞吐量。即使你到第二个窗口师傅告诉你已经没饭了你可以拿着这个凭证去退款即使中途由于出了意外你无法到达窗口进行取饭但是只要凭证还在可以将钱退给你。这样就保证了数据的一致性。如何保证凭证(消息)有2种方法1、在工商银行扣款的时候余额表扣除1000同时记录日志而且这2个表是在同一个数据库实例中可以使用本地事物解决。然后我们通知建设银行需要加1000给该用户建设银行收到之后给我返回已经加了1000给用户的确认信息之后我再标记日志表里面的日志为已经完成。2、通过消息中间件