当前位置: 首页 > news >正文

学校网站结构图银州铁岭做网站

学校网站结构图,银州铁岭做网站,浙江网站建设优化,用手机做自己的网站一、RocketMq有3中消息类型1.普通消费2. 顺序消费3.事务消费顺序消费场景在网购的时候#xff0c;我们需要下单#xff0c;那么下单需要假如有三个顺序#xff0c;第一、创建订单 #xff0c;第二#xff1a;订单付款#xff0c;第三#xff1a;订单完成。也就是这个三个…一、RocketMq有3中消息类型1.普通消费2. 顺序消费3.事务消费顺序消费场景在网购的时候我们需要下单那么下单需要假如有三个顺序第一、创建订单 第二订单付款第三订单完成。也就是这个三个环节要有顺序这个订单才有意义。RocketMQ可以保证顺序消费。rocketMq实现顺序消费的原理produce在发送消息的时候把消息发到同一个队列(queue)中,消费者注册消息监听器为MessageListenerOrderly这样就可以保证消费端只有一个线程去消费消息注意是把把消息发到同一个队列(queue)不是同一个topic默认情况下一个topic包括4个queue单个节点(Producer端1个、Consumer端1个)1、Producer.javapackageorder;importjava.util.List;importcom.alibaba.rocketmq.client.exception.MQBrokerException;importcom.alibaba.rocketmq.client.exception.MQClientException;importcom.alibaba.rocketmq.client.producer.DefaultMQProducer;importcom.alibaba.rocketmq.client.producer.MessageQueueSelector;importcom.alibaba.rocketmq.client.producer.SendResult;importcom.alibaba.rocketmq.common.message.Message;importcom.alibaba.rocketmq.common.message.MessageQueue;importcom.alibaba.rocketmq.remoting.exception.RemotingException;/*** Producer发送顺序消息*/public classProducer {public static voidmain(String[] args) {try{DefaultMQProducer producer new DefaultMQProducer(order_Producer);producer.setNamesrvAddr(192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876);producer.start();//String[] tags new String[] { TagA, TagB, TagC, TagD,//TagE };for (int i 1; i 5; i) {Message msg new Message(TopicOrderTest, order_1, KEY i, (order_1 i).getBytes());SendResult sendResult producer.send(msg, newMessageQueueSelector() {public MessageQueue select(Listmqs, Message msg, Object arg) {Integer id(Integer) arg;int index id %mqs.size();returnmqs.get(index);}},0);System.out.println(sendResult);}producer.shutdown();}catch(MQClientException e) {e.printStackTrace();}catch(RemotingException e) {e.printStackTrace();}catch(MQBrokerException e) {e.printStackTrace();}catch(InterruptedException e) {e.printStackTrace();}}}2、Consumer.javapackageorder;importjava.util.List;importjava.util.concurrent.TimeUnit;importjava.util.concurrent.atomic.AtomicLong;importcom.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;importcom.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyContext;importcom.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;importcom.alibaba.rocketmq.client.consumer.listener.MessageListenerOrderly;importcom.alibaba.rocketmq.client.exception.MQClientException;importcom.alibaba.rocketmq.common.consumer.ConsumeFromWhere;importcom.alibaba.rocketmq.common.message.MessageExt;/*** 顺序消息消费带事务方式(应用可控制Offset什么时候提交)*/public classConsumer1 {public static void main(String[] args) throwsMQClientException {DefaultMQPushConsumer consumer new DefaultMQPushConsumer(order_Consumer);consumer.setNamesrvAddr(192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876);/*** 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费* 如果非第一次启动那么按照上次消费的位置继续消费*/consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.subscribe(TopicOrderTest, *);consumer.registerMessageListener(newMessageListenerOrderly() {AtomicLong consumeTimes new AtomicLong(0);public ConsumeOrderlyStatus consumeMessage(Listmsgs, ConsumeOrderlyContext context) {//设置自动提交context.setAutoCommit(true);for(MessageExt msg : msgs) {System.out.println(msg ,内容 newString(msg.getBody()));}try{TimeUnit.SECONDS.sleep(5L);}catch(InterruptedException e) {e.printStackTrace();};returnConsumeOrderlyStatus.SUCCESS;}});consumer.start();System.out.println(Consumer1 Started.);}}结果如下图所示这个五条数据被顺序消费了多个节点(Producer端1个、Consumer端2个)Producer.javapackageorder;importjava.util.List;importcom.alibaba.rocketmq.client.exception.MQBrokerException;importcom.alibaba.rocketmq.client.exception.MQClientException;importcom.alibaba.rocketmq.client.producer.DefaultMQProducer;importcom.alibaba.rocketmq.client.producer.MessageQueueSelector;importcom.alibaba.rocketmq.client.producer.SendResult;importcom.alibaba.rocketmq.common.message.Message;importcom.alibaba.rocketmq.common.message.MessageQueue;importcom.alibaba.rocketmq.remoting.exception.RemotingException;/*** Producer发送顺序消息*/public classProducer {public static voidmain(String[] args) {try{DefaultMQProducer producer new DefaultMQProducer(order_Producer);producer.setNamesrvAddr(192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876);producer.start();//String[] tags new String[] { TagA, TagB, TagC, TagD,//TagE };for (int i 1; i 5; i) {Message msg new Message(TopicOrderTest, order_1, KEY i, (order_1 i).getBytes());SendResult sendResult producer.send(msg, newMessageQueueSelector() {public MessageQueue select(Listmqs, Message msg, Object arg) {Integer id(Integer) arg;int index id %mqs.size();returnmqs.get(index);}},0);System.out.println(sendResult);}for (int i 1; i 5; i) {Message msg new Message(TopicOrderTest, order_2, KEY i, (order_2 i).getBytes());SendResult sendResult producer.send(msg, newMessageQueueSelector() {public MessageQueue select(Listmqs, Message msg, Object arg) {Integer id(Integer) arg;int index id %mqs.size();returnmqs.get(index);}},1);System.out.println(sendResult);}for (int i 1; i 5; i) {Message msg new Message(TopicOrderTest, order_3, KEY i, (order_3 i).getBytes());SendResult sendResult producer.send(msg, newMessageQueueSelector() {public MessageQueue select(Listmqs, Message msg, Object arg) {Integer id(Integer) arg;int index id %mqs.size();returnmqs.get(index);}},2);System.out.println(sendResult);}producer.shutdown();}catch(MQClientException e) {e.printStackTrace();}catch(RemotingException e) {e.printStackTrace();}catch(MQBrokerException e) {e.printStackTrace();}catch(InterruptedException e) {e.printStackTrace();}}}Consumer1.java/*** 顺序消息消费带事务方式(应用可控制Offset什么时候提交)*/public classConsumer1 {public static void main(String[] args) throwsMQClientException {DefaultMQPushConsumer consumer new DefaultMQPushConsumer(order_Consumer);consumer.setNamesrvAddr(192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876);/*** 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费* 如果非第一次启动那么按照上次消费的位置继续消费*/consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.subscribe(TopicOrderTest, *);/*** 实现了MessageListenerOrderly表示一个队列只会被一个线程取到*第二个线程无法访问这个队列*/consumer.registerMessageListener(newMessageListenerOrderly() {AtomicLong consumeTimes new AtomicLong(0);public ConsumeOrderlyStatus consumeMessage(Listmsgs, ConsumeOrderlyContext context) {//设置自动提交context.setAutoCommit(true);for(MessageExt msg : msgs) {System.out.println(msg ,内容 newString(msg.getBody()));}try{TimeUnit.SECONDS.sleep(5L);}catch(InterruptedException e) {e.printStackTrace();};returnConsumeOrderlyStatus.SUCCESS;}});consumer.start();System.out.println(Consumer1 Started.);}}Consumer2.java/*** 顺序消息消费带事务方式(应用可控制Offset什么时候提交)*/public classConsumer2 {public static void main(String[] args) throwsMQClientException {DefaultMQPushConsumer consumer new DefaultMQPushConsumer(order_Consumer);consumer.setNamesrvAddr(192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876);/*** 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费* 如果非第一次启动那么按照上次消费的位置继续消费*/consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.subscribe(TopicOrderTest, *);/*** 实现了MessageListenerOrderly表示一个队列只会被一个线程取到*第二个线程无法访问这个队列*/consumer.registerMessageListener(newMessageListenerOrderly() {AtomicLong consumeTimes new AtomicLong(0);public ConsumeOrderlyStatus consumeMessage(Listmsgs, ConsumeOrderlyContext context) {//设置自动提交context.setAutoCommit(true);for(MessageExt msg : msgs) {System.out.println(msg ,内容 newString(msg.getBody()));}try{TimeUnit.SECONDS.sleep(5L);}catch(InterruptedException e) {e.printStackTrace();};returnConsumeOrderlyStatus.SUCCESS;}});consumer.start();System.out.println(Consumer2 Started.);}}先启动Consumer1和Consumer2然后启动ProducerProducer会发送15条消息Consumer1消费情况如图都按照顺序执行了Consumer2消费情况如图都按照顺序执行了二、事务消费这里说的主要是分布式事物。下面的例子的数据库分别安装在不同的节点上。事物消费需要先说说什么是事务。比如说我们跨行转账从工商银行转到建设银行也就是我从工商银行扣除1000元之后我的建设银行也必须加1000元。这样才能保证数据的一致性。假如工商银行转1000元之后建设银行的服务器突然宕机那么我扣除了1000但是并没有在建设银行给我加1000就出现了数据的不一致。因此加1000和减1000才行减1000和减1000必须一起成功一起失败。再比如我们进行网购的时候我们下单之后订单提交成功仓库商品的数量必须减一。但是订单可能是一个数据库仓库数量可能又是在另个数据库里面。有可能订单提交成功之后仓库数量服务器突然宕机。这样也出现了数据不一致的问题。使用消息队列来解决分布式事物现在我们去外面饭店吃饭很多时候都不会直接给了钱之后直接在付款的窗口递饭菜而是付款之后他会给你一张小票你拿着这个小票去出饭的窗口取饭。这里和我们的系统类似提高了吞吐量。即使你到第二个窗口师傅告诉你已经没饭了你可以拿着这个凭证去退款即使中途由于出了意外你无法到达窗口进行取饭但是只要凭证还在可以将钱退给你。这样就保证了数据的一致性。如何保证凭证(消息)有2种方法1、在工商银行扣款的时候余额表扣除1000同时记录日志而且这2个表是在同一个数据库实例中可以使用本地事物解决。然后我们通知建设银行需要加1000给该用户建设银行收到之后给我返回已经加了1000给用户的确认信息之后我再标记日志表里面的日志为已经完成。2、通过消息中间件
http://www.zqtcl.cn/news/256292/

相关文章:

  • 宠物网站制作内容正规货源网站大全
  • 网站建设pc端软件公司简介
  • 科技公司企业网站源码如何免费建购物网站
  • 用动物做网站名甘肃省城乡建设网站
  • 重庆网站制作长沙榆林网站建设
  • 加快政务公开网站建设在中企动力工作的感受
  • 佛山网站搜索排名宿迁新站seo
  • 上海免费网站建设公司南通高端网站
  • 网站被镜像 站长学院那个网站都有做莱的图片
  • 个人简历 网站开发做同城网站需要哪些手续
  • 建网站的公司南京网站权重是什么
  • 网站建设策略百度云域名没有备案怎么做网站
  • 档案网站建设图片网站名查找
  • 九亭镇村镇建设办官方网站好看的网站设计公司
  • 怎样建立门户网站怎么用wordpress模板
  • 潍坊专业建站wordpress建个人博客
  • 手把手网站开发网站建设违法行为
  • 网站模板插件做网站要审批吗
  • 建立网站如何盈利有哪些做室内设计好用的网站有哪些
  • 商城网站设计服务商网站开发时的闭包写法
  • 福建永安建设局网站如何在百度免费发布广告
  • 网站建设要用到哪些应用工具国际新闻最新消息今天2024年
  • 网站代码怎么打开门户网站建设目的
  • 个人网站开发项目总结做网站模板的网页名称是m开头
  • 响水哪家专业做网站win wordpress
  • 做图标去什么网站找微网页制作软件手机版
  • 网站开发源程序网页宣传方案
  • 做婚礼设计在哪个网站下载素材西安企业网站建设
  • 灵犀科技网站开发佼佼者门户网站建设和检务公开整改
  • php mysql做网站登录免费素材哪里找