当前位置: 首页 > news >正文

马云是做网站的加盟代理好项目哪家好

马云是做网站的,加盟代理好项目哪家好,互联网工程有限公司,旅行社手机网站建设成1. RocketMq基本概念 1. NameServer 每个NameServer结点之间是相互独立#xff0c;彼此没有任何信息交互 启动NameServer。NameServer启动后监听端口#xff0c;等待Broker、Producer、Consumer连接#xff0c; 相当于一个路由控制中心。主要是用来保存topic路由信息#…1. RocketMq基本概念 1. NameServer 每个NameServer结点之间是相互独立彼此没有任何信息交互 启动NameServer。NameServer启动后监听端口等待Broker、Producer、Consumer连接 相当于一个路由控制中心。主要是用来保存topic路由信息管理Broker 2. Broker 消息存储和中转角色负责存储和转发消息 在启动时会向NameServer进行注册并且定时发送心跳包。心跳包中包含当前 Broker 信息 以及存储所有 Topic 信息。注册成功后NameServer 集群中就有 Topic跟Broker 的映射关系。 3. topic 一个消息的集合的名字 创建 Topic 时需要指定该 Topic 要存储在哪些 Broker 上也可以在发送消息时自动创建Topic。 4. 生产者 生产者发送消息。启动时先从 NameServer 集群中的其中一台拉取到路由表缓存到本地 并从 NameServer 中获取当前发送的 Topic存在于哪些 Broker 上 轮询从队列列表中选择一个队列默认轮询 5. 消费者 消费者跟其中一台NameServer建立连接获取当前订阅Topic存在哪些Broker上 然后直接跟Broker建立连接通道然后开始消费消息 2. maven 引入starter dependency groupIdorg.apache.rocketmq/groupId artifactIdrocketmq-spring-boot-starter/artifactId version2.2.2/version /dependency 3.yml配置 3.1 生产者yml 配置 rocketmq:name-server: 127.0.0.1:9876producer:group: my-group# 发送消息超时时间send-message-timeout: 5000# 发送消息失败重试次数retry-times-when-send-failed: 2retry-times-when-send-async-failed: 3 # 异步发送消息失败重试次数默认2 3.2 消费者yml 配置 rocketmq:name-server: 127.0.0.1:9876consumer:topic: topic_testgroup: consumer_my-group 4.生产者发送消息 4.1 一般消息 Resourceprivate RocketMQTemplate rocketMQTemplate;/*** 一般消息* Topic 与 Tag 都是业务上用来归类的标识区别在于 Topic 是一级分类而 Tag 可以理解为是二级分类。* 使用 Tag 可以实现对 Topic 中的消息进行过滤。* **/GetMapping(/send)public String send(){rocketMQTemplate.convertAndSend(topic_test, Hello, World!);rocketMQTemplate.convertAndSend(topic_test:tagB,Hello, World222--tagB);return rocketMq普通消息发送完成;} 4.2 顺序消息 /** 支持消费者按照发送消息的先后顺序获取消息 */GetMapping(/send/orderly)public String sendOrder(){//发送顺序消息参数topic消息hashkey相同hashkey发送至同一个队列rocketMQTemplate.syncSendOrderly(topic_test:tagA, MessageBuilder.withPayload(消息编号 1).build(),queue);rocketMQTemplate.syncSendOrderly(topic_test:tagA, MessageBuilder.withPayload(消息编号 2).build(),queue);return rocketMq顺序-消息发送成功;} 4.3 同步消息 GetMapping(/send/sync)public String sendMsg() {String message 我是同步消息: LocalDateTime.now();SendResult result rocketMQTemplate.syncSend(topic_test:tagA, MessageBuilder.withPayload(message).build());log.info(同步-消息发送成功: LocalDateTime.now());return rocketMq 同步-消息发送成功 result.getSendStatus();} 4.4 异步消息 /** 发送异步消息 */GetMapping(/send/async)public String asyncSendMsg(){String message 我是异步消息: LocalDateTime.now();rocketMQTemplate.asyncSend(topic_test:tagA,message,new SendCallback() {Overridepublic void onSuccess(SendResult sendResult) {log.info(发送成功 (后执行),SendStatus {},sendResult.getSendStatus());}Overridepublic void onException(Throwable throwable) {log.info(发送失败 (后执行));}});return rocketMq 异步-消息发送成功 LocalDateTime.now();} 4.5 单向消息:一般用来发送日志等不重要的消息 GetMapping(/send/oneWay)public String sendOneWayMessage() {String message 我是单向消息LocalDateTime.now();this.rocketMQTemplate.sendOneWay(topic_test:tagA, message);log.info(单向发送消息完成message {}, message);return rocketMq 单向-消息发送成功 LocalDateTime.now();} 4.6 延时消息 /** 延时消息 */GetMapping(/sendDelay)public String sendDelay(){String message 我是延时消息: LocalDateTime.now();// 第四个参数为延时级别分为1-18:1、5、10、30、1m、2m、3m、...10m、20m、30m、1h、2hrocketMQTemplate.syncSend(topic_test:tagC, MessageBuilder.withPayload(message).build(), 3000, 2);return rocketMq延时-消息发送成功;} 4.7 事务消息 4.7.1 事务消息发送代码 /** 事务消息 */GetMapping(/send/transaction/{id})public void sendTransactionMessage(PathVariable(id) Integer id){//发送事务消息:采用的是sendMessageInTransaction方法返回结果为TransactionSendResult对象该对象中包含了事务发送的状态、本地事务执行的状态等//参数一topic;参数二消息// 事务idString[] tags {tagA, tagB, tagC};int i id%3;String transactionId UUID.randomUUID().toString();String message 我是事务消息: LocalDateTime.now();TransactionSendResult result rocketMQTemplate.sendMessageInTransaction(topic_test: tags[i], MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.TRANSACTION_ID,transactionId).build(),// 给本地事务的参数2);//发送状态String sendStatus result.getSendStatus().name();//本地事务执行状态String localState result.getLocalTransactionState().name();log.info(发送状态:sendStatus;本地事务执行状态localState);} 4.7.2 继承 RocketMQLocalTransactionListener Slf4j RocketMQTransactionListener public class MyTransactionListener implements RocketMQLocalTransactionListener {Overridepublic RocketMQLocalTransactionState executeLocalTransaction(org.springframework.messaging.Message message, Object o) {MessageHeaders headers message.getHeaders();//获取事务IDString transactionId (String) headers.get(RocketMQHeaders.TRANSACTION_ID);log.info(执行本地事务 ,transactionId is {}, orderId is {},transactionId, message.getHeaders().get(rocketmq_TAGS));try{//模拟网络波动Thread.sleep(3000);/**** 首先发送一个半消息half message这个消息不会立即投递给消费者然后执行本地事务比如数据库操作。* 根据本地事务的执行结果决定是提交commit还是回滚rollback这个消息。* 如果本地事务成功消息会被提交并发送给消费者* 如果失败消息会被回滚消费者不会接收到这个消息*/}catch (Exception e){return RocketMQLocalTransactionState.ROLLBACK;}// 执行本地事务String tag String.valueOf(message.getHeaders().get(rocketmq_TAGS));if (StringUtils.equals(tagA, tag)){//这里只讲TAGA消息提交状态为可执行return RocketMQLocalTransactionState.COMMIT;}else if (StringUtils.equals(tagB, tag)) {return RocketMQLocalTransactionState.ROLLBACK;} else if (StringUtils.equals(tagC,tag)) {return RocketMQLocalTransactionState.UNKNOWN;}log.info(事务提交消息正常处理: LocalDateTime.now());//执行成功可以提交事务return RocketMQLocalTransactionState.COMMIT;}Overridepublic RocketMQLocalTransactionState checkLocalTransaction(org.springframework.messaging.Message message) {MessageHeaders headers message.getHeaders();//获取事务IDString transactionId (String) headers.get(RocketMQHeaders.TRANSACTION_ID);log.info(transactionId 消息回查 LocalDateTime.now());return RocketMQLocalTransactionState.ROLLBACK;} }tagA、tagB、tagC 三种事务消息只有Commit的才能发送到broker  5. 消费端 /*** topic指定消费的主题consumerGroup指定消费组,* 一个主题可以有多个消费者组,一个消息可以被多个不同的组的消费者都消费* 2.实现RocketMQListener接口* 如果想拿到消息的其他参数可以写成MessageExt* selectorExpression tagA || tagB 指定tag 的消费*/ Service Slf4j RocketMQMessageListener(topic ${rocketmq.consumer.topic}, consumerGroup ${rocketmq.consumer.group}) public class RocketMqConsumer implements RocketMQListenerString{Overridepublic void onMessage(String s) {log.info(topic_test: 所有的收到消息s);}} 6.广播消费模式 生产端是一样的但是消费端需要增加一个参数 messageModel设置消费模式,取值范围CLUSTERING(集群消费)、BROADCASTING(广播消费) Service Slf4j RocketMQMessageListener(topic ${rocketmq.consumer.topic}, consumerGroup ${rocketmq.consumer.group}, messageModel MessageModel.BROADCASTING) public class RocketMqConsumer implements RocketMQListenerString{Overridepublic void onMessage(String s) {log.info(consumer2---topic_test: 所有的收到消息s);}}// 第2个消费者类他们都是一样的代码 //为了表示广播就是一个消息会被这两个消费者消费Service Slf4j RocketMQMessageListener(topic ${rocketmq.consumer.topic}, consumerGroup ${rocketmq.consumer.group}, messageModel MessageModel.BROADCASTING) public class RocketMqConsumer implements RocketMQListenerString{Overridepublic void onMessage(String s) {log.info(consumer1--topic_test: 所有的收到消息s);}}7.其他 RocketMQ 通过消费者组Consumer Group来维护不同消费者的消费进度。每个消费者组都有一个消费进度offset用于标记该组下的消费者在某个主题Topic和队列Queue上已经消费到的位置。所以不同的消费者组会被视为不同的消费者
http://www.zqtcl.cn/news/933741/

相关文章:

  • 表白网页制作免费网站制作西安网站快速优化
  • 如何破解网站后台管理做网站前端用什么软件好
  • 网站建设业务客户来源建德建设局官方网站
  • 网站设计 网站开发 优化网页设计一般尺寸
  • 好的版式设计网站网站建设商标属于哪个类别
  • 做淘宝素材网站哪个好用中国广告公司100强
  • 海拉尔网站建设平台wordpress的插件下载地址
  • 企业服务类网站常用python编程软件
  • 有哪些漫画做的好的网站西安seo建站
  • 在建设部网站如何查询注册信息网站开发项目的前端后端数据库
  • 自助建站网站seo公司wordpress 相册 免费模板
  • 搜索建站网在线crm管理系统
  • 旅游网站管理系统源码wordpress 禁止爬虫
  • 会员登录系统网站建设wordpress 二级页面
  • 北京网站建设公司代理记账代理公司注册
  • 网站建设需要提供的资料物流企业网站建设与管理规划书
  • .net 手机网站开发wordpress下载链接框
  • 省直部门门户网站建设网站视频点播怎么做
  • 广西网站建设-好发信息网做信息图的网站
  • 网站建设费用怎么算遵义市住房和城乡建设局官方网站
  • 网站部分网页乱码手把手教建设网站
  • 电商网站开发目的举报网站建设运行情况
  • 网站专业设计在线科技成都网站推广公司
  • 怎么建设幸运28网站seo工作是什么意思
  • 人工智能和网站开发如何做网站栏目
  • 设计有什么网站推荐ppt大全免费模板
  • 建站点wordpress百度云
  • 微信朋友圈的网站连接怎么做公众号小程序制作步骤
  • 做移动互联网站点网站建设完工确认书
  • 网站建设英语翻译资料潼南国外免费自助建站