想在土巴兔做装修网站找谁,动图生成器在线制作,免费网站建设绑定域名,外贸营销渠道在现代分布式架构的开发中#xff0c;消息队列扮演着至关重要的角色#xff0c;用于解耦系统组件、保障可靠性以及实现异步通信。RocketMQ作为一款开源的分布式消息中间件#xff0c;凭借其高性能、高可用性和良好的扩展性#xff0c;成为了众多企业在构建高可靠性、高吞吐… 在现代分布式架构的开发中消息队列扮演着至关重要的角色用于解耦系统组件、保障可靠性以及实现异步通信。RocketMQ作为一款开源的分布式消息中间件凭借其高性能、高可用性和良好的扩展性成为了众多企业在构建高可靠性、高吞吐量应用系统时的首选。 对于Spring Cloud Alibaba的用户来说集成RocketMQ并进行消息的发送与消费是常见的任务。本篇博客将深入介绍RocketMQ的基础使用方法带大家一步步学习如何在Spring Cloud Alibaba中发送和消费消息。 在开始之前确保已经完成了以下准备工作
安装RocketMQ确保已经在系统中成功安装了RocketMQ并启动了相关服务。教程可以查看我上一篇博客 【Spring Cloud Alibaba】Linux安装RocketMQ以及RocketMQ Dashboard可视化工具JDK安装了JDK 1.8及以上版本以便于运行Java应用程序。 文章目录 第一步搭建rocketmq项目环境 第二步生产者代码普通消息普通消息发送同步发送异步发送单向模式发送 普通消息接收 顺序消息顺序消息发送顺序消息接收 延迟消息 延迟消息发送延时消息接收 批量消息批量消息发送批量消息接收 事务消息 事务消息发送在实际中遇到的问题 rocketmq官网地址 https://rocketmq.apache.org/zh/docs/4.x/ 这里我们讲的是4.x的版本 第一步搭建rocketmq项目环境 dependencygroupIdorg.apache.rocketmq/groupIdartifactIdrocketmq-spring-boot-starter/artifactIdversion2.2.1/version/dependency我们使用的是rocketmq-spring-boot-starter2.2.1其中的rocketmq版本为4.9.1 第二步生产者代码
普通消息
普通消息发送
同步发送 同步发送是最常用的方式是指消息发送方发出一条消息后会在收到服务端同步响应之后才发下一条消息的通讯方式可靠的同步传输被广泛应用于各种场景如重要的通知消息、短消息通知等。 /*** 普通消息发送 https://rocketmq.apache.org/zh/docs/4.x/producer/02message1#31-%E5%90%8C%E6%AD%A5%E5%8F%91%E9%80%81** return*/GetMapping(/syncSend)public SendResult syncSend(String message) {MessageString stringMessage MessageBuilder.createMessage(message, new MessageHeaders(null));return rocketMQTemplate.syncSend(my-topic:*, stringMessage);}异步发送 异步发送是指发送方发出一条消息后不等服务端返回响应接着发送下一条消息的通讯方式。 /*** 普通消息发送 https://rocketmq.apache.org/zh/docs/4.x/producer/02message1#32-%E5%BC%82%E6%AD%A5%E5%8F%91%E9%80%81** return*/GetMapping(/asyncSend)public String asyncSend(String message) {MessageString stringMessage MessageBuilder.createMessage(message, new MessageHeaders(null));rocketMQTemplate.asyncSend(my-topic:*, stringMessage, new SendCallback() {Overridepublic void onSuccess(SendResult sendResult) {System.out.println(异步发送服务器返回信息成功);}Overridepublic void onException(Throwable throwable) {System.out.println(异步发送服务器返回信息失败);}});return 异步发送成功;}
单向模式发送 发送方只负责发送消息不等待服务端返回响应且没有回调函数触发即只发送请求不等待应答。此方式发送消息的过程耗时非常短一般在微秒级别。适用于某些耗时非常短但对可靠性要求并不高的场景例如日志收集。 /*** 普通消息发送 https://rocketmq.apache.org/zh/docs/4.x/producer/02message1#33-%E5%8D%95%E5%90%91%E6%A8%A1%E5%BC%8F%E5%8F%91%E9%80%81** return*/GetMapping(/sendOneway)public String sendOneway(String message) {MessageString stringMessage MessageBuilder.createMessage(message, new MessageHeaders(null));rocketMQTemplate.sendOneWay(my-topic:*, stringMessage);return 发送成功;} 普通消息接收
MQ的消费模式可以大致分为两种一种是推Push一种是拉Pull。
Push是服务端主动推送消息给客户端优点是及时性较好但如果客户端没有做好流控一旦服务端推送大量消息到客户端时就会导致客户端消息堆积甚至崩溃。
Pull是客户端需要主动到服务端取数据优点是客户端可以依据自己的消费能力进行消费但拉取的频率也需要用户自己控制拉取频繁容易造成服务端和客户端的压力拉取间隔长又容易造成消费不及时。
Apache RocketMQ既提供了Push模式也提供了Pull模式。
该博客中所有消费者都使用默认的push模式
Component
RocketMQMessageListener(topic my-topic, consumerGroup my-group, consumeMode ConsumeMode.CONCURRENTLY)
public class NormalRocketMQListener implements RocketMQListenerString {Overridepublic void onMessage(String message) {System.out.println(Received message:message);}
}
messageModel MessageModel.BROADCASTING即广播消息每个消费者都会去消费 但是即使都消费了但是trackType都会显示NOT_CONSUME_YET
顺序消息
顺序消息发送
顺序消息是一种对消息发送和消费顺序有严格要求的消息。 对于一个指定的Topic消息严格按照先进先出FIFO的原则进行消息发布和消费即先发布的消息先消费后发布的消息后消费
/*** 顺序消息发送 https://rocketmq.apache.org/zh/docs/4.x/producer/03message2** return*/GetMapping(/syncSendOrderly)public SendResult syncSendOrderly(String message) {String[] split message.split(,);ListMessageString list new ArrayList();for (String mes : split) {list.add(MessageBuilder.createMessage(mes, new MessageHeaders(null)));}return rocketMQTemplate.syncSendOrderly(order-topic:*, list, String.valueOf(System.currentTimeMillis()));}顺序消息接收
消费者这里要设置consumeMode ConsumeMode.ORDERLY才能实现顺序接收
Component
RocketMQMessageListener(topic order-topic, consumerGroup order-topic, consumeMode ConsumeMode.ORDERLY)
public class OrderRocketMQListener implements RocketMQListenerString {Overridepublic void onMessage(String message) {System.out.println(Received message:message);}
} 延迟消息 延迟消息发送
延迟消息发送是指消息发送到Apache RocketMQ后并不期望立马投递这条消息而是延迟一定时间后才投递到Consumer进行消费。 /*** 延迟消息发送 https://rocketmq.apache.org/zh/docs/4.x/producer/04message3** return*/GetMapping(/send)public SendResult send(String message) {MessageString stringMessage MessageBuilder.createMessage(message, new MessageHeaders(null));return rocketMQTemplate.syncSend(delay-topic:*, stringMessage,1000,2);}延时消息接收
Component
RocketMQMessageListener(topic delay-topic, consumerGroup delay-topic)
public class DelayRocketMQListener implements RocketMQListenerString {Overridepublic void onMessage(String message) {System.out.println(Received message:message);}
}
批量消息
批量消息发送
在对吞吐率有一定要求的情况下Apache RocketMQ可以将一些消息聚成一批以后进行发送可以增加吞吐率并减少API和网络调用次数。 /*** 批量消息发送 https://rocketmq.apache.org/zh/docs/4.x/producer/05message4** return*/GetMapping(/send)public SendResult send(String message) {String[] split message.split(,);ListMessageString list new ArrayList();for (String mes : split) {list.add(MessageBuilder.createMessage(mes, new MessageHeaders(null)));}return rocketMQTemplate.syncSend(list-topic:*, list);}批量消息接收
Component
RocketMQMessageListener(topic list-topic, consumerGroup list-topic)
public class ListRocketMQListener implements RocketMQListenerString {Overridepublic void onMessage(String message) {System.out.println(Received message:message);}
} 事务消息 事务消息发送 /*** 事务消息发送 https://rocketmq.apache.org/zh/docs/4.x/producer/06message5** return*/GetMapping(/send)public SendResult send(String message) {MessageString message1 MessageBuilder.createMessage(message, new MessageHeaders(null));return rocketMQTemplate.sendMessageInTransaction(transA-topic:*, message1, null);} rocketmq-springboot 提供了一个注解RocketMQTransactionListener 使用方法实现RocketMQLocalTransactionListener接口并且类上加注解RocketMQTransactionListener
RocketMQTransactionListener
public class TopicATransactionalMessageService implements RocketMQLocalTransactionListener {private MapString, RocketMQTransactionStrategy strategyMap;//策略模式Autowiredpublic TopicATransactionalMessageService(TopicAStrategy topicAStrategy,TopicBStrategy topicBStrategy) {strategyMap new HashMap();strategyMap.put(transA-topic, topicAStrategy);strategyMap.put(TopicB, topicBStrategy);// ...}/*** param msg* param arg* return*/Overridepublic RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {String topic msg.getHeaders().get(RocketMQHeaders.PREFIXRocketMQHeaders.TOPIC).toString();RocketMQTransactionStrategy strategy strategyMap.get(topic);if (strategy null) {// 如果没有对应的策略可以抛出异常或者返回一个默认的事务状态}return strategy.executeLocalTransaction(msg, arg);}/*** param msg* return*/Overridepublic RocketMQLocalTransactionState checkLocalTransaction(Message msg) {String topic msg.getHeaders().get(RocketMQHeaders.PREFIXRocketMQHeaders.TOPIC).toString();RocketMQTransactionStrategy strategy strategyMap.get(topic);if (strategy null) {// 如果没有对应的策略可以抛出异常或者返回一个默认的事务状态}return strategy.checkLocalTransaction(msg);}
} 这里可以使用策略模式来实现对每个topic的自定义策略
每个topic处理类需要实现RocketMQTransactionStrategy接口
Service
public class TopicAStrategy implements RocketMQTransactionStrategy {Overridepublic RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {msg.getHeaders().get(RocketMQHeaders.PREFIX RocketMQHeaders.TOPIC);Object payload msg.getPayload();String mes new String((byte[]) payload);if (mes.equals(1)) {return RocketMQLocalTransactionState.COMMIT;} else if (mes.equals(2)) {return RocketMQLocalTransactionState.ROLLBACK;} else {return RocketMQLocalTransactionState.UNKNOWN;}// ...}Overridepublic RocketMQLocalTransactionState checkLocalTransaction(Message msg) {return RocketMQLocalTransactionState.COMMIT;// ...}
}
这样发送不同的topic都会有不同的处理策略 更多消息可查看官网 https://rocketmq.apache.org/zh/docs/4.x/introduction/02quickstart/ 在实际中遇到的问题 在conf/broker.conf 中配置autoCreateTopicEnabletrue ,如果没有对应的topic则会在生产者有消息发送到mq的时候自动创建对应的topic 如果不配置该属性且开始没有topic的时候生产者发送消息到topic会报错org.apache.rocketmq.client.exception.MQBrokerException: CODE: 17 DESC: topic[delay-topic] not exist, apply first please! 不管有没有配置autoCreateTopicEnabletrue都会出现以下的情况 添加了消费者注解如RocketMQMessageListener(topic delay-topic, consumerGroup delay-topic)程序会自动创建一个名称为%RETRY%delay-topic的topic 如果没有对应的topic则会一直报错org.apache.rocketmq.client.exception.MQClientException: CODE: 17 DESC: No topic route info in name server for the topic: delay-topic 解决方法 在mq中手动添加对应的topic即可 如果你修改了autoCreateTopicEnabletrue没有效果删除rocketmq存储数据的文件夹store即可默认存放位置/root/store 博客中涉及到的仓库地址 https://gitee.com/WangFuGui-Ma/spring-cloud-alibaba