马云是做网站的,加盟代理好项目哪家好,互联网工程有限公司,旅行社手机网站建设成1. RocketMq基本概念
1. NameServer
每个NameServer结点之间是相互独立#xff0c;彼此没有任何信息交互
启动NameServer。NameServer启动后监听端口#xff0c;等待Broker、Producer、Consumer连接#xff0c;
相当于一个路由控制中心。主要是用来保存topic路由信息#…1. RocketMq基本概念
1. NameServer
每个NameServer结点之间是相互独立彼此没有任何信息交互
启动NameServer。NameServer启动后监听端口等待Broker、Producer、Consumer连接
相当于一个路由控制中心。主要是用来保存topic路由信息管理Broker
2. Broker
消息存储和中转角色负责存储和转发消息
在启动时会向NameServer进行注册并且定时发送心跳包。心跳包中包含当前 Broker 信息
以及存储所有 Topic 信息。注册成功后NameServer 集群中就有 Topic跟Broker 的映射关系。
3. topic 一个消息的集合的名字
创建 Topic 时需要指定该 Topic 要存储在哪些 Broker 上也可以在发送消息时自动创建Topic。
4. 生产者
生产者发送消息。启动时先从 NameServer 集群中的其中一台拉取到路由表缓存到本地
并从 NameServer 中获取当前发送的 Topic存在于哪些 Broker 上
轮询从队列列表中选择一个队列默认轮询
5. 消费者
消费者跟其中一台NameServer建立连接获取当前订阅Topic存在哪些Broker上
然后直接跟Broker建立连接通道然后开始消费消息
2. maven 引入starter
dependency
groupIdorg.apache.rocketmq/groupId
artifactIdrocketmq-spring-boot-starter/artifactId
version2.2.2/version
/dependency
3.yml配置
3.1 生产者yml 配置
rocketmq:name-server: 127.0.0.1:9876producer:group: my-group# 发送消息超时时间send-message-timeout: 5000# 发送消息失败重试次数retry-times-when-send-failed: 2retry-times-when-send-async-failed: 3 # 异步发送消息失败重试次数默认2
3.2 消费者yml 配置
rocketmq:name-server: 127.0.0.1:9876consumer:topic: topic_testgroup: consumer_my-group
4.生产者发送消息
4.1 一般消息
Resourceprivate RocketMQTemplate rocketMQTemplate;/*** 一般消息* Topic 与 Tag 都是业务上用来归类的标识区别在于 Topic 是一级分类而 Tag 可以理解为是二级分类。* 使用 Tag 可以实现对 Topic 中的消息进行过滤。* **/GetMapping(/send)public String send(){rocketMQTemplate.convertAndSend(topic_test, Hello, World!);rocketMQTemplate.convertAndSend(topic_test:tagB,Hello, World222--tagB);return rocketMq普通消息发送完成;} 4.2 顺序消息
/** 支持消费者按照发送消息的先后顺序获取消息 */GetMapping(/send/orderly)public String sendOrder(){//发送顺序消息参数topic消息hashkey相同hashkey发送至同一个队列rocketMQTemplate.syncSendOrderly(topic_test:tagA, MessageBuilder.withPayload(消息编号 1).build(),queue);rocketMQTemplate.syncSendOrderly(topic_test:tagA, MessageBuilder.withPayload(消息编号 2).build(),queue);return rocketMq顺序-消息发送成功;}
4.3 同步消息
GetMapping(/send/sync)public String sendMsg() {String message 我是同步消息: LocalDateTime.now();SendResult result rocketMQTemplate.syncSend(topic_test:tagA, MessageBuilder.withPayload(message).build());log.info(同步-消息发送成功: LocalDateTime.now());return rocketMq 同步-消息发送成功 result.getSendStatus();} 4.4 异步消息
/** 发送异步消息 */GetMapping(/send/async)public String asyncSendMsg(){String message 我是异步消息: LocalDateTime.now();rocketMQTemplate.asyncSend(topic_test:tagA,message,new SendCallback() {Overridepublic void onSuccess(SendResult sendResult) {log.info(发送成功 (后执行),SendStatus {},sendResult.getSendStatus());}Overridepublic void onException(Throwable throwable) {log.info(发送失败 (后执行));}});return rocketMq 异步-消息发送成功 LocalDateTime.now();} 4.5 单向消息:一般用来发送日志等不重要的消息
GetMapping(/send/oneWay)public String sendOneWayMessage() {String message 我是单向消息LocalDateTime.now();this.rocketMQTemplate.sendOneWay(topic_test:tagA, message);log.info(单向发送消息完成message {}, message);return rocketMq 单向-消息发送成功 LocalDateTime.now();} 4.6 延时消息
/** 延时消息 */GetMapping(/sendDelay)public String sendDelay(){String message 我是延时消息: LocalDateTime.now();// 第四个参数为延时级别分为1-18:1、5、10、30、1m、2m、3m、...10m、20m、30m、1h、2hrocketMQTemplate.syncSend(topic_test:tagC, MessageBuilder.withPayload(message).build(), 3000, 2);return rocketMq延时-消息发送成功;} 4.7 事务消息
4.7.1 事务消息发送代码
/** 事务消息 */GetMapping(/send/transaction/{id})public void sendTransactionMessage(PathVariable(id) Integer id){//发送事务消息:采用的是sendMessageInTransaction方法返回结果为TransactionSendResult对象该对象中包含了事务发送的状态、本地事务执行的状态等//参数一topic;参数二消息// 事务idString[] tags {tagA, tagB, tagC};int i id%3;String transactionId UUID.randomUUID().toString();String message 我是事务消息: LocalDateTime.now();TransactionSendResult result rocketMQTemplate.sendMessageInTransaction(topic_test: tags[i], MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.TRANSACTION_ID,transactionId).build(),// 给本地事务的参数2);//发送状态String sendStatus result.getSendStatus().name();//本地事务执行状态String localState result.getLocalTransactionState().name();log.info(发送状态:sendStatus;本地事务执行状态localState);}
4.7.2 继承 RocketMQLocalTransactionListener
Slf4j
RocketMQTransactionListener
public class MyTransactionListener implements RocketMQLocalTransactionListener {Overridepublic RocketMQLocalTransactionState executeLocalTransaction(org.springframework.messaging.Message message, Object o) {MessageHeaders headers message.getHeaders();//获取事务IDString transactionId (String) headers.get(RocketMQHeaders.TRANSACTION_ID);log.info(执行本地事务 ,transactionId is {}, orderId is {},transactionId, message.getHeaders().get(rocketmq_TAGS));try{//模拟网络波动Thread.sleep(3000);/**** 首先发送一个半消息half message这个消息不会立即投递给消费者然后执行本地事务比如数据库操作。* 根据本地事务的执行结果决定是提交commit还是回滚rollback这个消息。* 如果本地事务成功消息会被提交并发送给消费者* 如果失败消息会被回滚消费者不会接收到这个消息*/}catch (Exception e){return RocketMQLocalTransactionState.ROLLBACK;}// 执行本地事务String tag String.valueOf(message.getHeaders().get(rocketmq_TAGS));if (StringUtils.equals(tagA, tag)){//这里只讲TAGA消息提交状态为可执行return RocketMQLocalTransactionState.COMMIT;}else if (StringUtils.equals(tagB, tag)) {return RocketMQLocalTransactionState.ROLLBACK;} else if (StringUtils.equals(tagC,tag)) {return RocketMQLocalTransactionState.UNKNOWN;}log.info(事务提交消息正常处理: LocalDateTime.now());//执行成功可以提交事务return RocketMQLocalTransactionState.COMMIT;}Overridepublic RocketMQLocalTransactionState checkLocalTransaction(org.springframework.messaging.Message message) {MessageHeaders headers message.getHeaders();//获取事务IDString transactionId (String) headers.get(RocketMQHeaders.TRANSACTION_ID);log.info(transactionId 消息回查 LocalDateTime.now());return RocketMQLocalTransactionState.ROLLBACK;}
}tagA、tagB、tagC 三种事务消息只有Commit的才能发送到broker 5. 消费端
/*** topic指定消费的主题consumerGroup指定消费组,* 一个主题可以有多个消费者组,一个消息可以被多个不同的组的消费者都消费* 2.实现RocketMQListener接口* 如果想拿到消息的其他参数可以写成MessageExt* selectorExpression tagA || tagB 指定tag 的消费*/
Service
Slf4j
RocketMQMessageListener(topic ${rocketmq.consumer.topic}, consumerGroup ${rocketmq.consumer.group})
public class RocketMqConsumer implements RocketMQListenerString{Overridepublic void onMessage(String s) {log.info(topic_test: 所有的收到消息s);}}
6.广播消费模式
生产端是一样的但是消费端需要增加一个参数
messageModel设置消费模式,取值范围CLUSTERING(集群消费)、BROADCASTING(广播消费)
Service
Slf4j
RocketMQMessageListener(topic ${rocketmq.consumer.topic}, consumerGroup ${rocketmq.consumer.group}, messageModel MessageModel.BROADCASTING)
public class RocketMqConsumer implements RocketMQListenerString{Overridepublic void onMessage(String s) {log.info(consumer2---topic_test: 所有的收到消息s);}}// 第2个消费者类他们都是一样的代码
//为了表示广播就是一个消息会被这两个消费者消费Service
Slf4j
RocketMQMessageListener(topic ${rocketmq.consumer.topic}, consumerGroup ${rocketmq.consumer.group}, messageModel MessageModel.BROADCASTING)
public class RocketMqConsumer implements RocketMQListenerString{Overridepublic void onMessage(String s) {log.info(consumer1--topic_test: 所有的收到消息s);}}7.其他
RocketMQ 通过消费者组Consumer Group来维护不同消费者的消费进度。每个消费者组都有一个消费进度offset用于标记该组下的消费者在某个主题Topic和队列Queue上已经消费到的位置。所以不同的消费者组会被视为不同的消费者