免费素材库网站,微信里的小程序怎么找出来,gonzo wordpress,深圳网站建设制作哪家便宜文章目录 前言基本概念消息和主题相关发送普通消息 发送顺序消息RocketMQTemplate的API介绍参考资料#xff1a; 前言
本文主要有以下内容#xff1a;
简单消息的发送顺序消息的发送RocketMQTemplate的API介绍
环境搭建#xff1a; RocketMQ的安装教程#xff1a;在官网… 文章目录 前言基本概念消息和主题相关发送普通消息 发送顺序消息RocketMQTemplate的API介绍参考资料 前言
本文主要有以下内容
简单消息的发送顺序消息的发送RocketMQTemplate的API介绍
环境搭建 RocketMQ的安装教程在官网上下载bin文件解压到本地并配置环境变量如下图所示
在 Spring boot 项目中引入 RocketMQ 依赖
dependencygroupIdorg.apache.rocketmq/groupIdartifactIdrocketmq-spring-boot-starter/artifactIdversion2.2.3/version
/dependency在application.yml增加相关配置
server:port: 10001
rocketmq:name-server: 127.0.0.1:9876producer:group: springboot_produce_group # 必须指定groupsend-message-timeout: 3000 # 消息发送超时时长默认3sretry-times-when-send-failed: 3 # 同步发送消息失败重试次数默认2retry-times-when-send-async-failed: 3 # 异步发送消息失败重试次数默认2consumer:group: springboot_consumer_group在 Spring Boot 中使用RocketMQ很简单直接注入RocketMQTemplate对象即可:
Resource
private RocketMQTemplate rocketMQTemplate;基本概念
消息和主题相关
消息 message通信交互的载体分为事务消息半事务消息延迟消息顺序消息等。 主题 topic一类消息的集合逻辑概念。 队列 queue主题由一个队列或者多个队列构成当消息发送到某一个主题时需要选择某一个队列。 偏移量 offset消息追加到主题的队列后会分配一个数值表示该队列的几条消息。 消费者相关 消费组 consume group消费组用于订阅主题消费消息可以订阅多个主题一个消费组可以有多个消费者。 广播模式同一个消费组内的所有消费者都会消费订阅主题的所有消息。即一条消息会被该消费者组的所有消费者消费。 集群模式同一个消费组内的所有消费者只消费订阅主题的一部分消息即一条消息只会被改消费组的一个消费者消费。 并发消费同一个队列的消息由多线程消费且不保证消息的顺序。 顺序消费保证同一队列的消息按顺序消费。
发送普通消息
创建MsgController代码如下
RestController
RequestMapping(send/)
CrossOrigin(allowedHeaders *, origins *)
Slf4j
public class MsgController {Resourceprivate RocketMQTemplate rocketMQTemplate;GetMapping(normal)public void sendNormalMsg() {MessageString msg MessageBuilder.withPayload(Hello,RocketMQ Normal_msg).build();rocketMQTemplate.send(normal_msg, msg);}
}创建消息的消费者只需要实现RocketMQListener接口中的方法即可代码如下
Component
RocketMQMessageListener(topic normal_msg, consumerGroup consumer_normal)
Slf4j
public class NormalMsgConsumer implements RocketMQListenerString {Overridepublic void onMessage(String message) {log.info(Receive Normal Msg: {},message);}
}RocketMQMessageListener注解用在消费者类上指定当前类消费的主题。 topic指定消费者的主题 comsumerGroup指定消费者组Consumer Group名称用于区分不同的消费者。 启动项目运行结果如下图所示
发送顺序消息
顺序消息保证同一队列的消息按顺序消费。 在MsgController 中添加如下代码
GetMapping(order)
public void sendOrderMsg(){
log.info(开始发送顺序消息);for (int j 0; j 10; j) {MessageString sendOrderMsg MessageBuilder.withPayload(Send Order Msg j time: LocalDateTime.now()).build();try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}rocketMQTemplate.convertAndSend(msg:order, sendOrderMsg);}log.info(顺序消息发送结束);
}创建对应topic消息的消费者代码如下所示
Component
RocketMQMessageListener(topic msg,consumerGroup consumer_order_group,selectorExpression order,messageModel MessageModel.CLUSTERING,selectorType SelectorType.TAG)
Slf4j
public class OrderMsgConsumer implements RocketMQListenerString {Overridepublic void onMessage(String message) {log.info(Receive Order Msg: {},message);}
}RocketMQMessageListener其他属性介绍
selectorExpression 消息选择表达式用于过滤消息只有满足表达式条件的消息才会被消费。默认值为 *表示订阅所有消息。
全匹配*默认值。 属性匹配指定tag ‘tagName’上面的代码就可以改写为tag ‘order’ 表达式匹配需要指定selectType SelectorType.SQL92见下面。
selectorType指明了消息选择通过tag的方式默认值SelectorType.TAG。可选值有SelectorType.SQL92
TAG支持tagName的方式配置如果有多个标签则用||进行连接 SQL92关键字有AND, OR, NOT, BETWEEN, IN, TRUE, FALSE, IS, NULL。支持的数据类型有Boolean, String, Decimal, Float number等。使用方式如(a 10 AND a 100) OR (b IS NOT NULL AND bTRUE)
messageModel消息模式可选值为 MessageModel.CLUSTERING默认或 MessageModel.BROADCASTING分别表示集群模式和广播模式。
重新启动项目运行结果如下图所示
RocketMQTemplate的API介绍
在上面的api使用中都没有去关注是否消息发送的状态如是否成功发送到了哪一个队列等。接下来就介绍一下相关API的使用
带返回值的发送普通消息SendResult syncSend(String destination, Message? message);
在MsgController添加如下代码
GetMapping(normal_result)
public void sendNormalResultMsg() {MessageString msg MessageBuilder.withPayload(normal_return_result).build();SendResult normalMsg rocketMQTemplate.syncSend(normal_msg, msg);log.info(normalMsg {},normalMsg);
}如log所示可以看到发送状态等信息。
发送异步消息在MsgController中添加如下代码
GetMapping(callback)
public void sendNormalResultMsgWithCallback(){MessageString msg MessageBuilder.withPayload(normal_return_result).build();rocketMQTemplate.asyncSend(normal_msg, msg, new SendCallback() {Overridepublic void onSuccess(SendResult sendResult) {log.info(success);}Overridepublic void onException(Throwable throwable) {log.info(error);}});
}运行结果如下所示
发送顺序消息在第二部分以及展示过了也可以用如下代码替换
rocketMQTemplate.convertAndSend(msg:order, sendOrderMsg);
// 替换为
rocketMQTemplate.syncSendOrderly(msg:order, sendOrderMsg,String.valueOf(j));发送单向消息
GetMapping(oneway)
public void sendOneWay(){MessageString oneWay MessageBuilder.withPayload(Send Order Msg time: LocalDateTime.now()).build();rocketMQTemplate.sendOneWay(normal_msg,oneWay);
}运行结果如下图所示
发送事务消息暂不举例后续补充 发送事务消息带回调和syncSend()类似后续补充相关用法。
参考资料
《RocketMQ 实战》