当前位置: 首页 > news >正文

鞍山网站设计制作网站建设整体设计流程

鞍山网站设计制作,网站建设整体设计流程,懒人免费建站模板,鞍山高新区网站1.RocketMQ简介 官网#xff1a; http://rocketmq.apache.org/ RocketMQ是阿里巴巴2016年MQ中间件#xff0c;使用Java语言开发#xff0c;RocketMQ 是一款开源的分布式消息系统#xff0c;基于高可用分布式集群技术#xff0c;提供低延时的、高可靠的消息发布与订阅服…1.RocketMQ简介 官网 http://rocketmq.apache.org/ RocketMQ是阿里巴巴2016年MQ中间件使用Java语言开发RocketMQ 是一款开源的分布式消息系统基于高可用分布式集群技术提供低延时的、高可靠的消息发布与订阅服务。同时广泛应用于多个领域包括异步通信解耦、企业解决方案、金融支付、电信、电子商务、快递物流、广告营销、社交、即时通信、移动应用、手游、视频、物联网、车联网等。 具有以下特点 l能够保证严格的消息顺序 l提供丰富的消息拉取模式 l高效的订阅者水平扩展能力 l实时的消息订阅机制 l亿级消息堆积能力 2.为什么要使用MQ 1要做到系统解耦当新的模块进来时可以做到代码改动最小; 能够解耦 2设置流程缓冲池可以让后端系统按自身吞吐能力进行消费不被冲垮; 能够削峰限流 3强弱依赖梳理能把非关键调用链路的操作异步化并提升整体系统的吞吐能力;能够异步 Mq的作用 削峰限流 异步 解耦合 2.1 定义 中间件缓存中间件 redis memcache 数据库中间件 mycat canal 消息中间件mq 面向消息的中间件(message-oriented middleware0) MOM能够很好的解决以上的问题。 是指利用高效可靠的消息传递机制进行与平台无关跨平台的数据交流并基于数据通信来进行分布式系统的集成。 通过提供消息传递和消息排队模型在分布式环境下提供应用解耦弹性伸缩冗余存储流量削峰异步通信数据同步等 大致流程 发送者把消息发给消息服务器消息服务器把消息存放在若干队列/主题中在合适的时候消息服务器会把消息转发给接受者。在这个过程中发送和接受是异步的,也就是发送无需等待发送者和接受者的生命周期也没有必然关系在发布pub/订阅sub模式下也可以完成一对多的通信可以让一个消息有多个接受者[微信订阅号就是这样的] 2.2 特点 2.2.1 异步处理模式 消息发送者可以发送一个消息而无需等待响应。消息发送者把消息发送到一条虚拟的通道(主题或队列)上; 消息接收者则订阅或监听该通道。一条信息可能最终转发给一个或多个消息接收者这些接收者都无需对消息发送者做出回应。整个过程都是异步的。 案例 也就是说一个系统和另一个系统间进行通信的时候假如系统A希望发送一个消息给系统B让它去处理但是系统A不关注系统B到底怎么处理或者有没有处理好所以系统A把消息发送给MQ然后就不管这条消息的“死活” 了接着系统B从MQ里面消费出来处理即可。至于怎么处理是否处理完毕什么时候处理都是系统B的事与系统A无关。 这样的一种通信方式就是所谓的“异步”通信方式对于系统A来说只要把消息发给MQ,然后系统B就会异步处去进行处理了系统A不能“同步”的等待系统B处理完。这样的好处是什么呢解耦 2.2.2 应用系统的解耦 发送者和接收者不必了解对方只需要确认消息 发送者和接收者不必同时在线 2.2.3 现实中的业务 3.各个MQ产品的比较 4.RocketMQ重要概念【重点】 Producer消息的发送者生产者举例发件人 Consumer消息接收者消费者举例收件人 Broker暂存和传输消息的通道举例快递 NameServer管理Broker举例各个快递公司的管理机构 相当于broker的注册中心保留了broker的信息 Queue队列消息存放的位置一个Broker中可以有多个队列 Topic主题消息的分类 ProducerGroup生产者组 ConsumerGroup消费者组多个消费者组可以同时消费一个主题的消息 消息发送的流程是Producer询问NameServerNameServer分配一个broker 然后Consumer也要询问NameServer得到一个具体的broker然后消费消息 5.生产和消费理解【重点】 6.RocketMQ安装 了解了mq的基本概念和角色以后我们开始安装rocketmq建议在linux上 6.1 下载RocketMQ 下载地址https://rocketmq.apache.org/dowloading/releases/ 注意选择版本这里我们选择4.9.2的版本后面使用alibaba时对应 下载地址 https://archive.apache.org/dist/rocketmq/4.9.2/rocketmq-all-4.9.2-bin-release.zip 6.2 上传服务器 在root目录下创建文件夹 mkdir rocketmq 将下载后的压缩包上传到阿里云服务器或者虚拟机中去 6.3 解压 unzip rocketmq-all-4.9.2-bin-release.zip 如果你的服务器没有unzip命令则下载安装一个 yum install unzip 目录分析 Benchmark包含一些性能测试的脚本 Bin可执行文件目录 Conf配置文件目录 Lib第三方依赖 LICENSE授权信息; NOTICE版本公告 6.4 配置环境变量 vi /etc/profile 在文件末尾添加 export NAMESRV_ADDR阿里云公网IP:9876 刷新环境变量 source /etc/profile 6.5 修改nameServer的运行脚本 进入bin目录下修改runserver.sh文件,将71行和76行的Xms和Xmx等改小一点 vi runserver.sh 保存退出 6.6 修改broker的运行脚本 进入bin目录下修改runbroker.sh文件,修改67行 保存退出 6.7 修改broker的配置文件 进入conf目录下修改broker.conf文件 brokerClusterName DefaultCluster brokerName broker-a brokerId 0 deleteWhen 04 fileReservedTime 48 brokerRole ASYNC_MASTER flushDiskType ASYNC_FLUSH namesrvAddrlocalhost:9876 autoCreateTopicEnabletrue brokerIP1阿里云公网IP 添加参数解释 namesrvAddrnameSrv地址 可以写localhost因为nameSrv和broker在一个服务器 autoCreateTopicEnable自动创建主题不然需要手动创建出来 brokerIP1broker也需要一个公网ip如果不指定那么是阿里云的内网地址我们再本地无法连接使用 6.8 启动 首先在安装目录下创建一个logs文件夹用于存放日志 mkdir logs 一次运行两条命令 启动nameSrv nohup sh bin/mqnamesrv ./logs/namesrv.log 启动broker 这里的-c是指定使用的配置文件 nohup sh bin/mqbroker -c conf/broker.conf ./logs/broker.log 查看启动结果 6.9 RocketMQ控制台的安装RocketMQ-Console Rocketmq 控制台可以可视化MQ的消息发送 旧版本源码是在rocketmq-external里的rocketmq-console新版本已经单独拆分成dashboard 网址 https://github.com/apache/rocketmq-dashboard 下载地址 https://github.com/apache/rocketmq-dashboard/archive/refs/tags/rocketmq-dashboard-1.0.0.zip 下载后解压出来在跟目录下执行 mvn clean package -Dmaven.test.skiptrue 将jar包上传到服务器上去 然后运行 nohup java -jar ./rocketmq-dashboard-1.0.0.jar ./rocketmq-4.9.2/logs/dashboard.log 命令拓展:–server.port指定运行的端口 –rocketmq.config.namesrvAddr127.0.0.1:9876 指定namesrv地址 访问 http://localhost:8001 运行访问端口是8001如果从官网拉下来打包的话默认端口是8080 nohup java -jar rocketmq-dashboard-1.0.0.jar --server.port8081 --rocketmq.config.namesrvAddr47.100.238.122:9876 rocketmq-dashboard.log 7.RocketMQ安装之docker 7.1 下载RockerMQ需要的镜像 docker pull rocketmqinc/rocketmq docker pull styletang/rocketmq-console-ng 7.2 启动NameServer服务 7.2.1 创建NameServer数据存储路径 mkdir -p /home/rocketmq/data/namesrv/logs /home/rocketmq/data/namesrv/store 7.2.2 启动NameServer容器 docker run -d --name rmqnamesrv -p 9876:9876 -v /home/rocketmq/data/namesrv/logs:/root/logs -v /home/rocketmq/data/namesrv/store:/root/store -e “MAX_POSSIBLE_HEAP100000000” rocketmqinc/rocketmq sh mqnamesrv 7.3 启动Broker服务 7.3.1 创建Broker数据存储路径 mkdir -p /home/rocketmq/data/broker/logs /home/rocketmq/data/broker/store 7.3.2 创建conf配置文件目录 mkdir /home/rocketmq/conf 7.3.3 在配置文件目录下创建broker.conf配置文件 # 所属集群名称如果节点较多可以配置多个 brokerClusterName DefaultCluster #broker名称master和slave使用相同的名称表明他们的主从关系 brokerName broker-a #0表示Master大于0表示不同的slave brokerId 0 #表示几点做消息删除动作默认是凌晨4点 deleteWhen 04 #在磁盘上保留消息的时长单位是小时 fileReservedTime 48 #有三个值SYNC_MASTERASYNC_MASTERSLAVE同步和异步表示Master和Slave之间同步数据的机制 brokerRole ASYNC_MASTER #刷盘策略取值为ASYNC_FLUSHSYNC_FLUSH表示同步刷盘和异步刷盘SYNC_FLUSH消息写入磁盘后才返回成功状态ASYNC_FLUSH不需要 flushDiskType ASYNC_FLUSH # 设置broker节点所在服务器的ip地址 brokerIP1 你服务器外网ip7.3.4 启动Broker容器 docker run -d --name rmqbroker --link rmqnamesrv:namesrv -p 10911:10911 -p 10909:10909 -v /home/rocketmq/data/broker/logs:/root/logs -v /home/rocketmq/data/broker/store:/root/store -v /home/rocketmq/conf/broker.conf:/opt/rocketmq-4.4.0/conf/broker.conf --privilegedtrue -e NAMESRV_ADDRnamesrv:9876 -e MAX_POSSIBLE_HEAP200000000 rocketmqinc/rocketmq sh mqbroker -c /opt/rocketmq-4.4.0/conf/broker.conf7.4 启动控制台 docker run -d --name rmqadmin -e JAVA_OPTS-Drocketmq.namesrv.addr你的外网地址:9876 \-Dcom.rocketmq.sendMessageWithVIPChannelfalse \-Duser.timezoneAsia/Shanghai -v /etc/localtime:/etc/localtime -p 9999:8080 styletang/rocketmq-console-ng7.5 正常启动后的docker ps 7.6 访问控制台 http://你的服务器外网ip:9999/ 8.RocketMQ快速入门 RocketMQ提供了发送多种发送消息的模式例如同步消息异步消息顺序消息延迟消息事务消息等我们一一学习 8.1 消息发送和监听的流程 我们先搞清楚消息发送和监听的流程然后我们在开始敲代码 8.1.1 消息生产者 1.创建消息生产者producer并制定生产者组名 2.指定Nameserver地址 3.启动producer 4.创建消息对象指定主题Topic、Tag和消息体等 5.发送消息 6.关闭生产者producer 8.1.2 消息消费者 1.创建消费者consumer制定消费者组名 2.指定Nameserver地址 3.创建监听订阅主题Topic和Tag等 4.处理消息 5.启动消费者consumer 8.2 搭建Rocketmq-demo 8.2.1 加入依赖 dependenciesdependencygroupIdorg.apache.rocketmq/groupIdartifactIdrocketmq-client/artifactIdversion4.9.2/version!--docker的用下面这个版本--version4.4.0/version/dependencydependencygroupIdjunit/groupIdartifactIdjunit/artifactIdversion4.12/version/dependencydependencygroupIdorg.projectlombok/groupIdartifactIdlombok/artifactIdversion1.18.22/version/dependency /dependencies8.2.2 编写生产者 /*** 测试生产者** throws Exception*/ Test public void testProducer() throws Exception {// 创建默认的生产者DefaultMQProducer producer new DefaultMQProducer(test-group);// 设置nameServer地址producer.setNamesrvAddr(localhost:9876);// 启动实例producer.start();for (int i 0; i 10; i) {// 创建消息// 第一个参数主题的名字// 第二个参数消息内容Message msg new Message(TopicTest, (Hello RocketMQ i).getBytes());SendResult send producer.send(msg);System.out.println(send);}// 关闭实例producer.shutdown(); }8.2.3 编写消费者 /*** 测试消费者** throws Exception*/Testpublic void testConsumer() throws Exception {// 创建默认消费者组DefaultMQPushConsumer consumer new DefaultMQPushConsumer(consumer-group);// 设置nameServer地址consumer.setNamesrvAddr(localhost:9876);// 订阅一个主题来消费 *表示没有过滤参数 表示这个主题的任何消息consumer.subscribe(TopicTest, *);// 注册一个消费监听 MessageListenerConcurrently 是多线程消费默认20个线程可以参看consumer.setConsumeThreadMax()consumer.registerMessageListener(new MessageListenerConcurrently() {Overridepublic ConsumeConcurrentlyStatus consumeMessage(ListMessageExt msgs,ConsumeConcurrentlyContext context) {System.out.println(Thread.currentThread().getName() ---- msgs);// 返回消费的状态 如果是CONSUME_SUCCESS 则成功若为RECONSUME_LATER则该条消息会被重回队列重新被投递// 重试的时间为messageDelayLevel 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h// 也就是第一次1s 第二次5s 第三次10s .... 如果重试了18次 那么这个消息就会被终止发送给消费者 // return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;return ConsumeConcurrentlyStatus.RECONSUME_LATER;}});// 这个start一定要写在registerMessageListener下面consumer.start();System.in.read();}8.2.4 测试 启动生产者和消费者进行测试 9.RocketMQ发送同步消息* 上面的快速入门就是发送同步消息发送过后会有一个返回值也就是mq服务器接收到消息后返回的一个确认这种方式非常安全但是性能上并没有这么高而且在mq集群中也是要等到所有的从机都复制了消息以后才会返回所以针对重要的消息可以选择这种方式 10.RocketMQ发送异步消息* 异步消息通常用在对响应时间敏感的业务场景即发送端不能容忍长时间地等待Broker的响应。发送完以后会有一个异步消息通知 10.1 异步消息生产者 Test public void testAsyncProducer() throws Exception {// 创建默认的生产者DefaultMQProducer producer new DefaultMQProducer(test-group);// 设置nameServer地址producer.setNamesrvAddr(localhost:9876);// 启动实例producer.start();Message msg new Message(TopicTest, (异步消息).getBytes());producer.send(msg, new SendCallback() {Overridepublic void onSuccess(SendResult sendResult) {System.out.println(发送成功);}Overridepublic void onException(Throwable e) {System.out.println(发送失败);}});System.out.println(看看谁先执行);// 挂起jvm 因为回调是异步的不然测试不出来System.in.read();// 关闭实例producer.shutdown(); }10.2 异步消息消费者 Test public void testAsyncConsumer() throws Exception {// 创建默认消费者组DefaultMQPushConsumer consumer new DefaultMQPushConsumer(consumer-group);// 设置nameServer地址consumer.setNamesrvAddr(localhost:9876);// 订阅一个主题来消费 *表示没有过滤参数 表示这个主题的任何消息consumer.subscribe(TopicTest, *);// 注册一个消费监听 MessageListenerConcurrently是并发消费// 默认是20个线程一起消费可以参看 consumer.setConsumeThreadMax()consumer.registerMessageListener(new MessageListenerConcurrently() {Overridepublic ConsumeConcurrentlyStatus consumeMessage(ListMessageExt msgs,ConsumeConcurrentlyContext context) {// 这里执行消费的代码 默认是多线程消费System.out.println(Thread.currentThread().getName() ---- msgs);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.in.read(); }11.RocketMQ发送单向消息* 这种方式主要用在不关心发送结果的场景这种方式吞吐量很大但是存在消息丢失的风险例如日志信息的发送 11.1 单向消息生产者 Test public void testOnewayProducer() throws Exception {// 创建默认的生产者DefaultMQProducer producer new DefaultMQProducer(test-group);// 设置nameServer地址producer.setNamesrvAddr(localhost:9876);// 启动实例producer.start();Message msg new Message(TopicTest, (单向消息).getBytes());// 发送单向消息producer.sendOneway(msg);// 关闭实例producer.shutdown(); }11.2 单向消息消费者 消费者和上面一样 12.RocketMQ发送延迟消息* 消息放入mq后过一段时间才会被监听到然后消费 比如下订单业务提交了一个订单就可以发送一个延时消息30min后去检查这个订单的状态如果还是未付款就取消订单释放库存。 12.1 延迟消息生产者 Test public void testDelayProducer() throws Exception {// 创建默认的生产者DefaultMQProducer producer new DefaultMQProducer(test-group);// 设置nameServer地址producer.setNamesrvAddr(localhost:9876);// 启动实例producer.start();Message msg new Message(TopicTest, (延迟消息).getBytes());// 给这个消息设定一个延迟等级// messageDelayLevel 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2hmsg.setDelayTimeLevel(3);// 发送单向消息producer.send(msg);// 打印时间System.out.println(new Date());// 关闭实例producer.shutdown(); }12.2 延迟消息消费者 消费者和上面一样 这里注意的是RocketMQ不支持任意时间的延时 只支持以下几个固定的延时等级等级1就对应1s以此类推最高支持2h延迟 private String messageDelayLevel “1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”; 13.RocketMQ发送顺序消息** 消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序可以分为分区有序或者全局有序。 可能大家会有疑问mq不就是FIFO吗 rocketMq的broker的机制导致了rocketMq会有这个问题 因为一个broker中对应了四个queue 顺序消费的原理解析在默认的情况下消息发送会采取Round Robin轮询方式把消息发送到不同的queue(分区队列)而消费消息的时候从多个queue上拉取消息这种情况发送和消费是不能保证顺序。但是如果控制发送的顺序消息只依次发送到同一个queue中消费的时候只从这个queue上依次拉取则就保证了顺序。当发送和消费参与的queue只有一个则是全局有序如果多个queue参与则为分区有序即相对每个queue消息都是有序的。 下面用订单进行分区有序的示例。一个订单的顺序流程是下订单、发短信通知、物流、签收。订单顺序号相同的消息会被先后发送到同一个队列中消费时同一个顺序获取到的肯定是同一个队列。 13.1 场景分析 模拟一个订单的发送流程创建两个订单发送的消息分别是 订单号111 消息流程 下订单-物流-签收 订单号112 消息流程 下订单-物流-拒收 13.2 创建一个订单对象 Data AllArgsConstructor NoArgsConstructor public class Order {/*** 订单id*/private Integer orderId;/*** 订单编号*/private Integer orderNumber;/*** 订单价格*/private Double price;/*** 订单号创建时间*/private Date createTime;/*** 订单描述*/private String desc;}13.3 顺序消息生产者 Test public void testOrderlyProducer() throws Exception {// 创建默认的生产者DefaultMQProducer producer new DefaultMQProducer(test-group);// 设置nameServer地址producer.setNamesrvAddr(localhost:9876);// 启动实例producer.start();ListOrder orderList Arrays.asList(new Order(1, 111, 59D, new Date(), 下订单),new Order(2, 111, 59D, new Date(), 物流),new Order(3, 111, 59D, new Date(), 签收),new Order(4, 112, 89D, new Date(), 下订单),new Order(5, 112, 89D, new Date(), 物流),new Order(6, 112, 89D, new Date(), 拒收));// 循环集合开始发送orderList.forEach(order - {Message message new Message(TopicTest, order.toString().getBytes());try {// 发送的时候 相同的订单号选择同一个队列producer.send(message, new MessageQueueSelector() {Overridepublic MessageQueue select(ListMessageQueue mqs, Message msg, Object arg) {// 当前主题有多少个队列int queueNumber mqs.size();// 这个arg就是后面传入的 order.getOrderNumber()Integer i (Integer) arg;// 用这个值去%队列的个数得到一个队列int index i % queueNumber;// 返回选择的这个队列即可 那么相同的订单号 就会被放在相同的队列里 实现FIFO了return mqs.get(index);}}, order.getOrderNumber());} catch (Exception e) {System.out.println(发送异常);}});// 关闭实例producer.shutdown(); }13.4 顺序消息消费者测试时等一会即可有延迟 Test public void testOrderlyConsumer() throws Exception {// 创建默认消费者组DefaultMQPushConsumer consumer new DefaultMQPushConsumer(consumer-group);// 设置nameServer地址consumer.setNamesrvAddr(localhost:9876);// 订阅一个主题来消费 *表示没有过滤参数 表示这个主题的任何消息consumer.subscribe(TopicTest, *);// 注册一个消费监听 MessageListenerOrderly 是顺序消费 单线程消费consumer.registerMessageListener(new MessageListenerOrderly() {Overridepublic ConsumeOrderlyStatus consumeMessage(ListMessageExt msgs, ConsumeOrderlyContext context) {MessageExt messageExt msgs.get(0);System.out.println(new String(messageExt.getBody()));return ConsumeOrderlyStatus.SUCCESS;}});consumer.start();System.in.read(); }14.RocketMQ发送批量消息 Rocketmq可以一次性发送一组消息那么这一组消息会被当做一个消息消费 14.1 批量消息生产者 Test public void testBatchProducer() throws Exception {// 创建默认的生产者DefaultMQProducer producer new DefaultMQProducer(test-group);// 设置nameServer地址producer.setNamesrvAddr(localhost:9876);// 启动实例producer.start();ListMessage msgs Arrays.asList(new Message(TopicTest, 我是一组消息的A消息.getBytes()),new Message(TopicTest, 我是一组消息的B消息.getBytes()),new Message(TopicTest, 我是一组消息的C消息.getBytes()));SendResult send producer.send(msgs);System.out.println(send);// 关闭实例producer.shutdown(); }14.2 批量消息消费者 Test public void testBatchConsumer() throws Exception {// 创建默认消费者组DefaultMQPushConsumer consumer new DefaultMQPushConsumer(consumer-group);// 设置nameServer地址consumer.setNamesrvAddr(localhost:9876);// 订阅一个主题来消费 表达式默认是*consumer.subscribe(TopicTest, *);// 注册一个消费监听 MessageListenerConcurrently是并发消费// 默认是20个线程一起消费可以参看 consumer.setConsumeThreadMax()consumer.registerMessageListener(new MessageListenerConcurrently() {Overridepublic ConsumeConcurrentlyStatus consumeMessage(ListMessageExt msgs,ConsumeConcurrentlyContext context) {// 这里执行消费的代码 默认是多线程消费System.out.println(Thread.currentThread().getName() ---- new String(msgs.get(0).getBody()));return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.in.read(); }15.RocketMQ发送事务消息**** 15.1 事务消息的发送流程 它可以被认为是一个两阶段的提交消息实现以确保分布式系统的最终一致性。事务性消息确保本地事务的执行和消息的发送可以原子地执行。 上图说明了事务消息的大致方案其中分为两个流程正常事务消息的发送及提交、事务消息的补偿流程。 事务消息发送及提交 发送消息half消息。 服务端响应消息写入结果。 根据发送结果执行本地事务如果写入失败此时half消息对业务不可见本地逻辑不执行。 根据本地事务状态执行Commit或RollbackCommit操作生成消息索引消息对消费者可见 事务补偿 对没有Commit/Rollback的事务消息pending状态的消息从服务端发起一次“回查” Producer收到回查消息检查回查消息对应的本地事务的状态 根据本地事务状态重新Commit或者Rollback 其中补偿阶段用于解决消息UNKNOW或者Rollback发生超时或者失败的情况。 事务消息状态 事务消息共有三种状态提交状态、回滚状态、中间状态 l TransactionStatus.CommitTransaction: 提交事务它允许消费者消费此消息。 l TransactionStatus.RollbackTransaction: 回滚事务它代表该消息将被删除不允许被消费。 l TransactionStatus.Unknown: 中间状态它代表需要检查消息队列来确定状态。 15.2 事务消息生产者 /*** TransactionalMessageCheckService的检测频率默认1分钟可通过在broker.conf文件中设置transactionCheckInterval的值来改变默认值单位为毫秒。* 从broker配置文件中获取transactionTimeOut参数值。* 从broker配置文件中获取transactionCheckMax参数值表示事务的最大检测次数如果超过检测次数消息会默认为丢弃即回滚消息。** throws Exception*/ Test public void testTransactionProducer() throws Exception {// 创建一个事务消息生产者TransactionMQProducer producer new TransactionMQProducer(test-group);producer.setNamesrvAddr(localhost:9876);// 设置事务消息监听器producer.setTransactionListener(new TransactionListener() {// 这个是执行本地业务方法Overridepublic LocalTransactionState executeLocalTransaction(Message msg, Object arg) {System.out.println(new Date());System.out.println(new String(msg.getBody()));// 这个可以使用try catch对业务代码进行性包裹// COMMIT_MESSAGE 表示允许消费者消费该消息// ROLLBACK_MESSAGE 表示该消息将被删除不允许消费// UNKNOW表示需要MQ回查才能确定状态 那么过一会 代码会走下面的checkLocalTransaction(msg)方法return LocalTransactionState.UNKNOW;}// 这里是回查方法 回查不是再次执行业务操作而是确认上面的操作是否有结果// 默认是1min回查 默认回查15次 超过次数则丢弃打印日志 可以通过参数设置// transactionTimeOut 超时时间// transactionCheckMax 最大回查次数// transactionCheckInterval 回查间隔时间单位毫秒// 触发条件// 1.当上面执行本地事务返回结果UNKNOW时,或者下面的回查方法也返回UNKNOW时 会触发回查// 2.当上面操作超过20s没有做出一个结果也就是超时或者卡主了也会进行回查Overridepublic LocalTransactionState checkLocalTransaction(MessageExt msg) {System.err.println(new Date());System.err.println(new String(msg.getBody()));// 这里return LocalTransactionState.UNKNOW;}});producer.start();Message message new Message(TopicTest2, 我是一个事务消息.getBytes());// 发送消息producer.sendMessageInTransaction(message, null);System.out.println(new Date());System.in.read(); }15.3 事务消息消费者 Test public void testTransactionConsumer() throws Exception {// 创建默认消费者组DefaultMQPushConsumer consumer new DefaultMQPushConsumer(consumer-group);// 设置nameServer地址consumer.setNamesrvAddr(localhost:9876);// 订阅一个主题来消费 *表示没有过滤参数 表示这个主题的任何消息consumer.subscribe(TopicTest2, *);// 注册一个消费监听 MessageListenerConcurrently是并发消费// 默认是20个线程一起消费可以参看 consumer.setConsumeThreadMax()consumer.registerMessageListener(new MessageListenerConcurrently() {Overridepublic ConsumeConcurrentlyStatus consumeMessage(ListMessageExt msgs,ConsumeConcurrentlyContext context) {// 这里执行消费的代码 默认是多线程消费System.out.println(Thread.currentThread().getName() ---- new String(msgs.get(0).getBody()));return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.in.read(); }15.4 测试结果 16.RocketMQ发送带标签的消息消息过滤 Rocketmq提供消息过滤功能通过tag或者key进行区分 我们往一个主题里面发送消息的时候根据业务逻辑可能需要区分比如带有tagA标签的被A消费带有tagB标签的被B消费还有在事务监听的类里面只要是事务消息都要走同一个监听我们也需要通过过滤才区别对待 16.1 标签消息生产者 Test public void testTagProducer() throws Exception {// 创建默认的生产者DefaultMQProducer producer new DefaultMQProducer(test-group);// 设置nameServer地址producer.setNamesrvAddr(localhost:9876);// 启动实例producer.start();Message msg new Message(TopicTest,tagA, 我是一个带标记的消息.getBytes());SendResult send producer.send(msg);System.out.println(send);// 关闭实例producer.shutdown(); }16.2 标签消息消费者 Test public void testTagConsumer() throws Exception {// 创建默认消费者组DefaultMQPushConsumer consumer new DefaultMQPushConsumer(consumer-group);// 设置nameServer地址consumer.setNamesrvAddr(localhost:9876);// 订阅一个主题来消费 表达式默认是*,支持tagA || tagB || tagC 这样或者的写法 只要是符合任何一个标签都可以消费consumer.subscribe(TopicTest, tagA || tagB || tagC);// 注册一个消费监听 MessageListenerConcurrently是并发消费// 默认是20个线程一起消费可以参看 consumer.setConsumeThreadMax()consumer.registerMessageListener(new MessageListenerConcurrently() {Overridepublic ConsumeConcurrentlyStatus consumeMessage(ListMessageExt msgs,ConsumeConcurrentlyContext context) {// 这里执行消费的代码 默认是多线程消费System.out.println(Thread.currentThread().getName() ---- new String(msgs.get(0).getBody()));System.out.println(msgs.get(0).getTags());return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.in.read(); }17.RocketMQ中消息的Key(业务相关) 在rocketmq中的消息默认会有一个messageId当做消息的唯一标识我们也可以给消息携带一个key用作唯一标识或者业务标识包括在控制面板查询的时候也可以使用messageId或者key来进行查询 17.1 带key消息生产者 Test public void testKeyProducer() throws Exception {// 创建默认的生产者DefaultMQProducer producer new DefaultMQProducer(test-group);// 设置nameServer地址producer.setNamesrvAddr(localhost:9876);// 启动实例producer.start();Message msg new Message(TopicTest,tagA,key, 我是一个带标记和key的消息.getBytes());SendResult send producer.send(msg);System.out.println(send);// 关闭实例producer.shutdown(); }17.2 带key消息消费者 Test public void testKeyConsumer() throws Exception {// 创建默认消费者组DefaultMQPushConsumer consumer new DefaultMQPushConsumer(consumer-group);// 设置nameServer地址consumer.setNamesrvAddr(localhost:9876);// 订阅一个主题来消费 表达式默认是*,支持tagA || tagB || tagC 这样或者的写法 只要是符合任何一个标签都可以消费consumer.subscribe(TopicTest, tagA || tagB || tagC);// 注册一个消费监听 MessageListenerConcurrently是并发消费// 默认是20个线程一起消费可以参看 consumer.setConsumeThreadMax()consumer.registerMessageListener(new MessageListenerConcurrently() {Overridepublic ConsumeConcurrentlyStatus consumeMessage(ListMessageExt msgs,ConsumeConcurrentlyContext context) {// 这里执行消费的代码 默认是多线程消费System.out.println(Thread.currentThread().getName() ---- new String(msgs.get(0).getBody()));System.out.println(msgs.get(0).getTags());System.out.println(msgs.get(0).getKeys());return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.in.read(); } 18.RocketMQ重试机制 18.1 生产者重试 // 失败的情况重发3次producer.setRetryTimesWhenSendFailed(3);// 消息在1S内没有发送成功就会重试producer.send(msg, 1000);18.2 消费者重试 在消费者放return ConsumeConcurrentlyStatus.RECONSUME_LATER;后就会执行重试 上图代码中说明了我们再实际生产过程中一般重试5-7次如果还没有消费成功则可以把消息签收了通知人工等处理 messageDelayLevel 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h /*** 测试消费者** throws Exception*/ Test public void testConsumer() throws Exception {// 创建默认消费者组DefaultMQPushConsumer consumer new DefaultMQPushConsumer(consumer-group);// 设置nameServer地址consumer.setNamesrvAddr(localhost:9876);// 订阅一个主题来消费 *表示没有过滤参数 表示这个主题的任何消息consumer.subscribe(TopicTest, *);// 注册一个消费监听consumer.registerMessageListener(new MessageListenerConcurrently() {Overridepublic ConsumeConcurrentlyStatus consumeMessage(ListMessageExt msgs,ConsumeConcurrentlyContext context) {try {// 这里执行消费的代码System.out.println(Thread.currentThread().getName() ---- msgs);// 这里制造一个错误int i 10 / 0;} catch (Exception e) {// 出现问题 判断重试的次数MessageExt messageExt msgs.get(0);// 获取重试的次数 失败一次消息中的失败次数会累加一次int reconsumeTimes messageExt.getReconsumeTimes();if (reconsumeTimes 3) {// 则把消息确认了可以将这条消息记录到日志或者数据库 通知人工处理return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;} else {return ConsumeConcurrentlyStatus.RECONSUME_LATER;}}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.in.read(); }19.RocketMQ死信消息 当消费重试到达阈值以后消息不会被投递给消费者了而是进入了死信队列 19.1 消息生产者 Test public void testDeadMsgProducer() throws Exception {DefaultMQProducer producer new DefaultMQProducer(dead-group);producer.setNamesrvAddr(localhost:9876);producer.start();Message message new Message(dead-topic, 我是一个死信消息.getBytes());producer.send(message);producer.shutdown(); }19.2 消息消费者 Test public void testDeadMsgConsumer() throws Exception {DefaultMQPushConsumer consumer new DefaultMQPushConsumer(dead-group);consumer.setNamesrvAddr(localhost:9876);consumer.subscribe(dead-topic, *);// 设置最大消费重试次数 2 次consumer.setMaxReconsumeTimes(2);consumer.registerMessageListener(new MessageListenerConcurrently() {Overridepublic ConsumeConcurrentlyStatus consumeMessage(ListMessageExt msgs, ConsumeConcurrentlyContext context) {System.out.println(msgs);// 测试消费失败return ConsumeConcurrentlyStatus.RECONSUME_LATER;}});consumer.start();System.in.read(); }19.3 信消费者 注意权限问题 perm 2读 4写 6读写 Test public void testDeadMq() throws Exception{DefaultMQPushConsumer consumer new DefaultMQPushConsumer(dead-group);consumer.setNamesrvAddr(localhost:9876);// 消费重试到达阈值以后消息不会被投递给消费者了而是进入了死信队列// 队列名称 默认是 %DLQ% 消费者组名consumer.subscribe(%DLQ%dead-group, *);consumer.registerMessageListener(new MessageListenerConcurrently() {Overridepublic ConsumeConcurrentlyStatus consumeMessage(ListMessageExt msgs, ConsumeConcurrentlyContext context) {System.out.println(msgs);// 处理消息 签收了return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.in.read(); }19.4 控制台显示
http://www.zqtcl.cn/news/411874/

相关文章:

  • 微信链接的微网站怎么做西安企业网站制作价格
  • uniapp怎么做淘客网站表格布局的网站
  • wordpress侧栏图片插件提升seo搜索排名
  • 如何查询网站的域名注册邹城建设银行网站
  • 招生门户网站建设方案国家企业信用信息公示信息查询网
  • 用dw做淘客网站的步骤移动互联网应用技术
  • 企业合作的响应式网站石家庄网站建设推广
  • 成都网站排名优化开发广告传媒公司简介模板
  • 中山网站建设企业网站内容建设
  • 免费网站建站页面wordpress的主题在哪个文件夹
  • 国企网站建设要求站长之家排行榜
  • 做视频网站利润如何处理旅游电子商务网站建设技术规范
  • 做网站架构网页浏览器怎么卸载
  • 做甜品的网站网页传奇游戏排行榜比亚迪
  • 广州网站建设菲利宾百度关键词优化排名
  • 南昌网站建设业务wordpress添加购买按钮
  • 个人现在可以做哪些网站企业所得税是多少
  • 网站建设招标信息科技企业网站建设
  • 怎样弄网站站长工具综合查询
  • 表白网站在线制作软件合肥seo按天收费
  • 襄阳企业网站建设免费行情的软件入口下载
  • 对百度网站进行分析中国机械加工网18易0下6拉en
  • 一般做网站都在什么网做wordpress轮播图设置
  • 深圳装饰公司网站thinkphp 网站根目录地址
  • 购物网站建设资讯原创文章代写
  • 门票预订网站建设wordpress siren主题
  • 单位建设网站装修公司需要什么资质
  • 做做做网站做网站赚外快
  • 网站备案后应该做什么网站流量监测
  • 开发网站用什么语言做名片的网站叫什么来着