当前位置: 首页 > news >正文

开封市网站开发公司广饶网站定制

开封市网站开发公司,广饶网站定制,网站制作毕业设计论文,软件开发税率是13%还是6目录 1、初识 RabbitMQ 消息队列 1.1 MQ 四大核心概念 1.2 消息的发送#xff08;无交换机态#xff09; 1.3 关于消息自动重新入队 1.3.1 消息的常见应答方法#xff08;R#xff09; 1.4 关于 RabbitMQ 的持久化、不公平分发以及预取值 2、RabbitMQ 消息的发布确认…目录 1、初识 RabbitMQ 消息队列 1.1 MQ 四大核心概念 1.2 消息的发送无交换机态 1.3 关于消息自动重新入队 1.3.1 消息的常见应答方法R 1.4 关于 RabbitMQ 的持久化、不公平分发以及预取值 2、RabbitMQ 消息的发布确认 2.1 MQ的单个确认发布 2.2 MQ的批量确认发布 2.3 MQ的异步确认发布重点 3、关于 Exchanges 交换机 4、死信队列重点 5、延迟队列整合SpringBoot 6、备份交换机重点 什么是 RabbitMQ ? RabbitMQ 是流行的消息队列服务软件是开源的AMQP高级消息队列协议实现支持多种客户端如Java、Python、C、PHP、Ruby、JavaScript等用于在分布式系统中存储转发消息可以实现异步处理、流量削峰、系统解耦在易用性、扩展性、高可用等方面表现优异 1、初识 RabbitMQ 消息队列 1.1 MQ 四大核心概念 生产者 产生数据发送消息的程序 交换机 交换机是 RabbitMQ 非常重要的一个部件一方面它接收来自生产者的消息另一方面它将消息推送到队列中交换机必须确切知道如何处理它接收到的消息是将这些消息推送到特定队列还是推送到多个队列亦或者是把消息丢弃这个得有交换机类型决定 队列 队列是 RabbitMQ 内部使用的一种数据结构尽管消息流经 RabbitMQ 和应用程序但它们只能存储在队列中队列仅受主机的内存和磁盘限制的约束本质上是一个大的消息缓冲区许多生产者可以将消息发送到一个队列许多消费者可以尝试从一个队列接收数据这就是我们使用队列的方式 消费者 消费与接收具有相似的含义消费者大多时候是一个等待接收消息的程序生产者消费者和消息中间件很多时候并不在同一机器上同一个应用程序既可以是生产者又是可以是消费者 以下是 RabbitMQ 的原理图 1.2 消息的发送无交换机态 这里使用MQ中间件进行简单的消息发送大致流程图如下所示 这里需要注意的是当一次性有多条消息发送到队列时这时需要多个消费者工作线程消费者进行消费信息是根据轮询的方式进行消费 创建一个Utils工具类与 MQ 进行交互连接 /*** 这里是与 MQ 交互的工具类*/ public class RabbitMQUtils {public static Channel getChannel() throws IOException, TimeoutException {ConnectionFactory factory new ConnectionFactory(); //创建连接工厂factory.setHost(192.168.101.65);factory.setUsername(admin);factory.setPassword(123);Connection connection factory.newConnection(); //创建连接return connection.createChannel(); //获取连接信道} }【消息生产者】代码如下所示 /*** 生产者*/ public class Produce {public static final String QUEUE_NAME hello; //队列名称public static void main(String[] args) throws IOException, TimeoutException {//这里创建一个工厂与 RabbitMQ 进行交互Channel channel01 RabbitMQUtils.getChannel();//1.队列名称 2.队列是否持久化 3.消息是否供多个消费者消费 4.消息是否自动删除 5.其他参数channel01.queueDeclare(QUEUE_NAME,false,false,false,null);String message hello mq; //发消息//1.对应的交换机 2.路由的KEY值本次是队列名 3.其他参数 4.发送消息的消息体channel01.basicPublish(,QUEUE_NAME,null,message.getBytes());System.out.println(消息发送完毕!);} } 消息栏 RabbitMQ 中以上创建的 hello 队列 【消息消费者】代码如下所示 /*** 消费者*/ public class Consumer {public static final String QUEUE_NAME hello; //要进行消费消息的队列public static void main(String[] args) throws IOException, TimeoutException {//创建连接工厂与MQ进行交互Channel channel RabbitMQUtils.getChannel();//接收消息的回调DeliverCallback deliverCallback (consumerTag,message)-{System.out.println(成功接收消息new String(message.getBody())); //接收其消息的消息体才能显示对应的消息};//取消消息时的回调CancelCallback cancelCallback (consumerTag) -{System.out.println(consumerTag 消费者的消息被中断!);};/*** 1.要被消费信息的队列* 2.消费成功之后是否需要自动应答* 3.消费成功时的回调* 4.取消消息发送时的回调*///消费者消费信息channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);} }消息栏 MQ 中的消息已经被消费 1.3 关于消息自动重新入队 如果消费者由于某些原因失去连接(其通道已关闭连接已关闭或 TCP 连接丢失)导致消息 未发送 ACK 确认RabbitMQ 将了解到消息未完全处理并将对其重新排队如果此时其他消费者可以处理它将很快将其重新分发给另一个消费者这样即使某个消费者偶尔死亡也可以确 保不会丢失任何消息 1.3.1 消息的常见应答方法R Channel.basicAck (用于肯定确认) RabbitMQ 已知道该消息并且成功的处理消息可以将其丢弃了 Channel.basicNack (用于否定确认) Channel.basicReject (用于否定确认) 与 Channel.basicNack 相比少一个参数         不处理该消息了直接拒绝可以将其丢弃了 丢失的消息重新入队传递给正常工作的消费者进行消费的大致图 由于生产者的代码没有改变这里就不写了以下是消费者两个消费者只有 sleep 的时间不一样关于 ACK 手动应答消息的代码 /*** 这里是消费者手动接受消息 ACK使发送失败的消息重新排队*/ public class Consumer01 {public static final String QUEUE_NAME ack_queue;public static void main(String[] args) throws IOException, TimeoutException {Channel channel RabbitMQUtils.getChannel();SleepUtils.sleep(8); //模拟消息多的情况//1、接收到消息的回调DeliverCallback deliverCallback (consumerTag,message) -{System.out.println(接收到消息 new String(message.getBody(), StandardCharsets.UTF_8));/*** 手动应答* 1. 消息的标记* 2. 是否批量应答信道中的消息*/channel.basicAck(message.getEnvelope().getDeliveryTag(), false);};//2.消息中断的回调CancelCallback cancelCallback (consumerTag) - {System.out.println(consumerTag 消费者取消了消息的接收!);};//3.使用手动应答boolean autoACK false;channel.basicConsume(QUEUE_NAME,autoACK,deliverCallback,cancelCallback);} }首先创建两个消费者分别为C1和C2这里生产者连续发送四条消息  消费者一 处于正常状态消费者二 接收了一条消息后就宕机了这时消费者一 将发送失败的消息从信道中取出并进行消费结果图如下所示 消费者一         消费者二 可见 就算消费者二突然宕机RabbitMQ 依然采用轮询方式将发送失败的消息轮询给正常工作的消费者 1.4 关于 RabbitMQ 的持久化、不公平分发以及预取值 队列的持久化 平时消息队列都是保存在内存中若 RabbitMQ服务 突然停止则之前的队列都会消失所以为了减少损失的可能性通常将消息队列保存到磁盘上即持久化 boolean durable true; //将队列进行持久化//1.队列名称 2.队列是否持久化 3.消息是否供多个消费者消费 4.消息是否自动删除 5.其他参数channel01.queueDeclare(QUEUE_NAME,durable,false,false,null);消息的持久化 将 MessageProperties.PERSISTENT_TEXT_PLAIN 标识放入 basicPublish消息发送方法的第三个参数中以开启消息持久化 将消息标记为持久化并不能完全保证不会丢失消息尽管它告诉 RabbitMQ 将消息保存到磁盘但是这里依然存在当消息刚准备存储在磁盘的时候还没有存储完消息还在缓存的一个间隔点此时并没有真正写入磁盘持久性保证并不强 while (sc.hasNext()){String message sc.next(); //发消息//1.对应的交换机 2.路由的KEY值本次是队列名 3.其他参数 4.发送消息的消息体channel01.basicPublish(,QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());System.out.println(消息发送完毕!);}不公平分发在消费者处开启 相对于轮询分发不公平分发采用能者多劳的策略谁干的快消息就先给谁发送避免慢进程拖慢整个服务的进度 预取值 不公平分发的值设置为1若设置的数值大于1则表示为预取值所谓的预取值是设置消费者缓冲信道中最大存储的数量 比如消费者C1设置预取值为2消费者C2设置预取值为5假设有8条消息进来时C1有可能消费了3条因为已经消费的消息不算入“预取值”内而C2信道中存入5条消息若这五条消息即使还未被C2消费C1也不能将其消费因为这5条消息已经放入C2的信道中进行等待排队了 MQ 中 可见这里明确标明了对应消费者的预取值 2、RabbitMQ 消息的发布确认 在设置发布确认时一般有三个步骤 设置队列的持久化  ---  设置队列中的消息进行持久化  ---   通过MQ将消息保存在磁盘上然后MQ跟生产者说明一声 “已经保存在磁盘上”这里就是消息确认 2.1 MQ的单个确认发布 定义 这是一种简单的确认方式它是一种同步确认发布的方式也就是发布一个消息之后只有它被确认发布后续的消息才能继续发布如果没有确认发布的消息就会阻塞所有后续消息的发布 这里是模拟消息单个确认的代码 waitForConfirmsOrDie(long) 这个方法只有在消息被确认的时候才返回如果在指定时间范围内这个消息没有被确认那么它将抛出异常也可以不指明时间范围 /*** 单个消息确认发布*/public static void SingleConfirmMessage () throws Exception{Channel channel RabbitMQUtils.getChannel();String QUEUE_NAME UUID.randomUUID().toString();channel.queueDeclare(QUEUE_NAME,durable,false,false,null); //开启队列的持久化//开启消息的发布确认channel.confirmSelect();long begin System.currentTimeMillis(); //开始时间for (int i 0;iMESSAGE_COUNT;i){String message i ;channel.basicPublish(,QUEUE_NAME,null,message.getBytes());//消息单个确认发送成功一次就确认一次boolean singleRes channel.waitForConfirms();if(singleRes) {System.out.println(消息发布成功);}}long end System.currentTimeMillis(); //结束时间System.out.println(总耗时为(end-begin)ms);}运行结果 可见总耗时时间为 1269 ms虽然保证了消息的可靠性但是性能下来了需要一条条确认 2.2 MQ的批量确认发布 定义 这是也一种同步确认发布消息的方式先发布一批消息然后一起确认可以极大地提高吞吐量当然这种方式的缺点就是由于是批量确认发布当发生故障导致发布出现问题时不知道是哪个消息出现问题了我们必须将整个批处理保存在内存中以记录重要的信息而后重新发布消息 这里是模拟批量确认发布的代码 /*** 批量消息确认发布*/public static void BatchConfirmMessage() throws Exception{Channel channel RabbitMQUtils.getChannel();String QUEUE_NAME UUID.randomUUID().toString();channel.queueDeclare(QUEUE_NAME,durable,false,false,null); //开启队列的持久化//开启消息的发布确认channel.confirmSelect();long begin System.currentTimeMillis(); //开始时间int batchSize 100; //批量确认的长度for (int i0;iMESSAGE_COUNT;i){String message i ;channel.basicPublish(,QUEUE_NAME,null,message.getBytes());//每发送一百条数据就进行批量发布确认if(i % batchSize 0){boolean batchRes channel.waitForConfirms();if(batchRes) {System.out.println(批量发送消息成功!);}}}long end System.currentTimeMillis(); //结束时间System.out.println(总耗时为(end-begin)ms);}运行结果 可见总耗时为 199 ms 相比于单个确认发布在性能方面有了很大的提升但是容错率相对来说就升高了因为由于批量很难确定是哪一条消息出现了错误 2.3 MQ的异步确认发布重点 定义 很显然这是一种异步确认发布消息的方式异步虽然编程逻辑比上两个要复杂但是性价比最高无论是可靠性还是效率都没得说它是利用回调函数来达到消息可靠性传递的这个中间件也是通过函数回调来保证是否投递成功 大致流程图 异步发送消息时不需要等待当前消息经过确认后才能将之后的消息发送出去我们要做的只是发布消息其余的交给 broker 中间人处理而最终的消息是否发布成功取决于之后的回调确认消息的函数由于每一个发出去的消息都有 KEY 和 VALUE 因此我们能很快的找到对应发送失败的消息 存在问题 由于监听器是在发布消息完成后执行的所以采用一般的方法是检测不到发送失败的消息 解决方案 最好的解决的解决方案就是把未确认的消息放到一个基于内存的能被发布线程访问的队列 比如说用 ConcurrentLinkedQueue 这个队列在确认回调与发布线程之间进行消息的传递 ConcurrentLinkedQueue是Java中的一个并发队列实现它提供了线程安全的无界队列unbounded queue功能它是基于链表实现的队列可以在多线程环境下高效地进行元素插入和删除操作 这里是模拟异步确认发布的代码 /*** 异步确认发布消息*/public static void AsynchronousConfirmMessage() throws Exception{Channel channel RabbitMQUtils.getChannel();String QUEUE_NAME UUID.randomUUID().toString();channel.queueDeclare(QUEUE_NAME,durable,false,false,null); //开启队列的持久化//1.开启消息的发布确认channel.confirmSelect();/*** 线程安全有序的一个哈希表适用于高并发的情况* 【优点】* 1.将消息与对应的序号相关联* 2.批量的根据序号删除条目* 3.支持高并发*/ConcurrentSkipListMapLong,String concurrentSkipListMap new ConcurrentSkipListMap();//2.这里准备消息监听器以便于监听消息的成功与否 (delivery消息的编号multiple用来判断是否为批量)ConfirmCallback ACK_callback (deliveryTag,multiple) -{ //消息确认成功 回调函数//2.2【第二步】若是批量发消息则进行批量的删除//【注意这里只有已经确认的消息不会干扰到未确认的消息】if(multiple) {ConcurrentNavigableMapLong, String concurrentNavigableMap concurrentSkipListMap.headMap(deliveryTag);concurrentNavigableMap.clear();}else {//2.3 若是单个发消息则单个删除concurrentSkipListMap.remove(deliveryTag);}log.info(确认的消息编号deliveryTag);};ConfirmCallback NACK_callback (deliveryTag,multiple)-{ //消息确认失败 回调函数log.error(未确认的消息编号deliveryTag);};channel.addConfirmListener(ACK_callback,NACK_callback); //将以上回调确认函数添加到监听器中long begin System.currentTimeMillis(); //开始时间//【第一步】这里为模拟消息的发送for (int i0;iMESSAGE_COUNT;i){String message i ;channel.basicPublish(,QUEUE_NAME,null,message.getBytes());//3.记录消息的总和往里面存入信道的序号以及对应序号的信息//channel.getNextPublishSeqNo() 表示获取当前消息的下一个消息编号concurrentSkipListMap.put(channel.getNextPublishSeqNo(),message);}long end System.currentTimeMillis(); //结束时间System.out.println(总耗时为(end-begin)ms);}关于其中使用 concurrentSkipListMap.headMap(deliveryTag) 进行批量删除的解释说明 批量删除的目的是在消息确认成功时删除已经确认的消息以便只保留未确认的消息。为了实现这一点代码使用了 headMap(deliveryTag)方法来获取 concurrentSkipListMap 中小于 deliveryTag的所有键值对子集 这是为什么使用 headMap(deliveryTag)的原因 ConcurrentSkipListMap是一个有序映射按照键的自然顺序排序并且支持高效的范围查询headMap(deliveryTag)方法返回一个新的ConcurrentNavigableMap该映射包含了所有键小于deliveryTag的键值对子集 批量确认通常涉及到确认多个消息这些消息的deliveryTag可能是连续的例如如果要确认deliveryTag为1到5的消息那么可以使用headMap(deliveryTag)来一次性获取deliveryTag小于6的所有消息这样可以高效地删除多个消息而不需要单独操作每个消息 使用 concurrentSkipListMap.headMap(deliveryTag)可以确保只删除已确认的消息而不会影响未确认的消息 批量删除的实现方法非常简单只需调用concurrentNavigableMap.clear()它会清除concurrentNavigableMap中的所有键值对这些键值对对应于已确认的消息 总结使用 headMap(deliveryTag)来获取小于 deliveryTag的消息是为了高效地批量删除已确认的消息批量删除的方法是调用 clear()来清除这些已确认的消息以确保只保留未确认的消息在 concurrentSkipListMap中这有助于有效地管理消息状态并节省操作的开销 运行结果 可见异步确认发布消息效率比以上两种方式都要高由于是异步发送的消息所以顺序会很不一致 3、关于 Exchanges 交换机 四种 MQ 交换机未按先后排序 直连交换机Direct Exchange 直连交换机是最简单的交换机类型它通过将消息的路由键Routing Key与绑定键Binding Key进行精确匹配来进行消息路由消息生产者将消息发送到具有匹配路由键的交换机上然后交换机将消息传递给与绑定键匹配的队列 【消息生产者】 public static final String Exchange_Name exchange_direct; //交换机public static void main(String[] args) throws Exception{Channel channel RabbitMQUtils.getChannel();//1.声明一个交换机channel.exchangeDeclare(Exchange_Name, BuiltinExchangeType.DIRECT);Scanner sc new Scanner(System.in);while (sc.hasNext()){String message sc.next();channel.basicPublish(Exchange_Name,error,null,message.getBytes());System.out.println(消息发送成功!);}}【消息消费者一】 public static final String Exchange_Name exchange_direct; //交换机public static final String QUEUE_NAME disk; //队列名称public static void main(String[] args) throws Exception {Channel channel RabbitMQUtils.getChannel();//1.声明一个交换机channel.exchangeDeclare(Exchange_Name, BuiltinExchangeType.DIRECT);//2.声明队列channel.queueDeclare(QUEUE_NAME,false,false,false,null);//3.将交换机与队列进行绑定channel.queueBind(QUEUE_NAME,Exchange_Name,error);System.out.println(【消费者二】等待接收信息...);//4.对应的回调函数DeliverCallback deliverCallback (consumerTag,message)-{System.out.println(【消费者二】接收到信息new String(message.getBody(), StandardCharsets.UTF_8));};CancelCallback cancelCallback (consumerTag)-{System.out.println(consumerTag【消费者二】取消了消息的发送);};//5.使用自动应答channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);}【消息消费者二】 这里是消费者二设置的 Routing Key //3.将交换机与队列进行绑定channel.queueBind(QUEUE_NAME,Exchange_Name,info);channel.queueBind(QUEUE_NAME,Exchange_Name,warning);    运行结果 可见消息生产者将消息发送到对应的 Routing Key 处即消费者二而消费者一没有接收到消息 主题交换机Topic Exchange 主题交换机允许使用 “通配符匹配的方式” 进行消息路由其中路由键可以使用 * 代表单个单词# 代表多个单词它能够根据消息的路由键模式与队列的绑定键模式进行匹配以实现更灵活的消息路由 需要注意的是 当一个队列绑定键是 “ # ” 那么这个队列将接收所有数据类似 FANOUT交换机如果队列绑定键中没有 “*” 和 “#” 出现则类似 DIRECT交换机 由此可发现TOPIC交换机包括了扇出和直接交换机功能更加强大 【消息生产者】 这里使用 Map 集合将绑定键以及对应的消息进行封装然后再依次遍历取出 public static final String Exchange_Name exchange_topic; //交换机public static void main (String[] args) throws Exception {try (Channel channel RabbitUtils.getChannel()) {//1.声明一个交换机channel.exchangeDeclare(Exchange_Name, BuiltinExchangeType.TOPIC);/*** Q1--绑定的是* 中间带 orange 带 3 个单词的字符串(*.orange.*)* Q2--绑定的是* 最后一个单词是 rabbit 的 3 个单词(*.*.rabbit)* 第一个单词是 lazy 的多个单词(lazy.#)**/MapString, String bindingKeyMap new HashMap();bindingKeyMap.put(quick.orange.rabbit, 被队列 Q1Q2 接收到);bindingKeyMap.put(lazy.orange.elephant, 被队列 Q1Q2 接收到);bindingKeyMap.put(quick.orange.fox, 被队列 Q1 接收到);bindingKeyMap.put(lazy.brown.fox, 被队列 Q2 接收到);bindingKeyMap.put(lazy.pink.rabbit, 虽然满足两个绑定但只被队列 Q2 接收一次);bindingKeyMap.put(quick.brown.fox, 不匹配任何绑定不会被任何队列接收到会被丢弃);bindingKeyMap.put(quick.orange.male.rabbit, 是四个单词不匹配任何绑定会被丢弃);bindingKeyMap.put(lazy.orange.male.rabbit, 是四个单词但匹配 Q2);//2.map 中进行 key-value 值的遍历操作依次放入信道中进行消息的发送for (Map.EntryString, String bindingKeyEntry : bindingKeyMap.entrySet()) {String bindingKey bindingKeyEntry.getKey();String message bindingKeyEntry.getValue();channel.basicPublish(Exchange_Name, bindingKey, null,message.getBytes(UTF-8));System.out.println(生产者发出消息 message);}} } 【消息消费者一】 public static final String Exchange_Name exchange_topic; //交换机public static final String QUEUE_NAME Q1; //队列名称public static void main(String[] args) throws Exception {Channel channel RabbitMQUtils.getChannel();//1.声明一个交换机channel.exchangeDeclare(Exchange_Name, BuiltinExchangeType.TOPIC);//2.声明队列channel.queueDeclare(QUEUE_NAME,false,false,false,null);//3.将交换机与队列进行绑定channel.queueBind(QUEUE_NAME,Exchange_Name,*.*.rabbit);channel.queueBind(QUEUE_NAME,Exchange_Name,lazy.#);System.out.println(【消费者一】等待接收信息...);//4.对应的回调函数DeliverCallback deliverCallback (consumerTag,message)-{System.out.println(【消费者一】接收到信息new String(message.getBody(), StandardCharsets.UTF_8));};CancelCallback cancelCallback (consumerTag)-{System.out.println(consumerTag【消费者一】取消了消息的发送);};//5.使用自动应答channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);}【消息消费者二】 只有绑定键与消费者一不同 //3.将交换机与队列进行绑定channel.queueBind(QUEUE_NAME,Exchange_Name,*.orange.*);运行结果 消费者一消费者二 扇形交换机Fanout Exchange 扇形交换机将消息广播到所有绑定到该交换机上的队列中无需考虑路由键的匹配无论绑定键的数量和绑定队列的数量如何扇形交换机都会将消息复制并发送到所有队列中 【消息生产者】 这里只展示需要另外添加的部分语句        //1.声明一个交换机channel.exchangeDeclare(Exchange_Name, BuiltinExchangeType.FANOUT);【消息消费者】 这里只展示需要另外添加的部分语句  //1.声明一个交换机channel.exchangeDeclare(Exchange_Name, BuiltinExchangeType.FANOUT);//2.声明队列channel.queueDeclare(QUEUE_NAME,false,false,false,null);//3.将交换机与队列进行绑定channel.queueBind(QUEUE_NAME,Exchange_Name,);标题交换机Headers Exchange 标题交换机使用消息的标头Headers进行匹配并根据标头的匹配结果将消息路由到相关的队列它不依赖于路由键或绑定键而是使用消息的标头信息进行匹配过滤 4、死信队列重点 定义死信顾名思义死掉的信息即无法被消费的消息由于存在有该类型的消息所以对应保存该类型的队列随即产生即死信队列 应用场景为了保证订单业务的消息数据不丢失需要使用到 RabbitMQ 的死信队列机制当消息 消费发生异常时将消息投入死信队列中例用户在商城下单成功并点击去支付后在指定时间未支付则自动失效 死信队列大致流程图 正常情况下生产者producer将消息通过普通交换机normal_exchange所绑定的普通队列normal_queue发送到消费者 C1中而异常情况下将异常的消息保存到死信队列dead_queue并发送到消费者 C2 中 【消费者C1】 C1需要不仅需要处理正常消息的发送还需要处理失效消息往死信队列中的传递以及普通队列与死信队列之间的绑定关系 /*** 模拟死信队列** 消费者 C1*/ public class Receive01 {public static final String Normal_Exchange normal_exchange; //普通交换机public static final String Dead_Exchange dead_exchange; //死信交换机public static final String Normal_Queue normal_queue; //普通队列public static final String Dead_Queue dead_queue; //死信队列public static void main(String[] args) throws Exception {Channel channel RabbitMQUtils.getChannel();//1.声明死信以及普通队列的交换机channel.exchangeDeclare(Normal_Exchange,BuiltinExchangeType.DIRECT);channel.exchangeDeclare(Dead_Exchange,BuiltinExchangeType.DIRECT);//2.声明死信队列以及普通队列HashMapString, Object deadLetters new HashMap();//2.1 普通队列设置死信交换机注意这里的 KEY 是固定的deadLetters.put(x-dead-letter-exchange,Dead_Exchange);//2.2 设置死信的 Routing KeydeadLetters.put(x-dead-letter-routing-key,list);//2.3 设置过期时间这里不进行设置 // deadLetters.put(x-message-ttl,10000);channel.queueDeclare(Normal_Queue,false,false,false,deadLetters);channel.queueDeclare(Dead_Queue,false,false,false,null);//3.将队列与交换机进行绑定channel.exchangeBind(Normal_Queue,Normal_Exchange,zhangsan);channel.exchangeBind(Dead_Queue,Dead_Exchange,lisi);System.out.println(等待接收消息中.....);//4.接收到消息的回调DeliverCallback deliverCallback (consumerTag,message) -{System.out.println(消费者一接收到消息 new String(message.getBody(), StandardCharsets.UTF_8));};//5.消息中断的回调CancelCallback cancelCallback (consumerTag) - {System.out.println(consumerTag 消费者取消了消息的接收!);};channel.basicConsume(Normal_Queue,true,deliverCallback,cancelCallback);} }【生产者Producer】 这里模拟的是消息被拒 生产者不需要管理队列的消息是否发送成功只需要将消息发送到普通队列中 public static final String Normal_Exchange normal_exchange; //普通交换机public static final int MESSAGE_COUNT 10; //消息的总数public static final String TTL_TIME 10000; //过期时间public static void main(String[] args) throws Exception {Channel channel RabbitMQUtils.getChannel();//1.声明一个交换机channel.exchangeDeclare(Normal_Exchange, BuiltinExchangeType.DIRECT);//2.死信消息设置 TTL 消息过期时间过期则传送到死信队列中AMQP.BasicProperties properties new AMQP.BasicProperties().builder().expiration(TTL_TIME).build(); //10s//3.这里进行依次发送消息同时设置了消息过期时间for (int i0;iMESSAGE_COUNT;i){String message infoi;channel.basicPublish(Normal_Exchange,zhangsan,properties,message.getBytes());System.out.println(消息生产者发送消息message);}}【消费者C2】 消费者C2的任务是只需要消费死信队列中的消息 public static final String DEAD_QUEUE dead_queue;public static void main(String[] args) throws Exception {Channel channel RabbitMQUtils.getChannel();//1.声明死信队列channel.queueDeclare(DEAD_QUEUE,false,false,false,null);System.out.println(消费者二等待接收消息...);DeliverCallback deliverCallback (consumerTag,message)-{System.out.println(消费者二接收到消息 new String(message.getBody(), StandardCharsets.UTF_8));};CancelCallback cancelCallback (consumerTag)-{System.out.println(consumerTag消费者中断了消息..);};//2.消费死信队列中的消息channel.basicConsume(DEAD_QUEUE,true,deliverCallback,cancelCallback);}运行结果 MQ 中 这里进行中断普通队列的接收模拟死信队列场景 当死信队列中存在未被消费的消息时C2感应到存在的消息并将之前发送失败的消息进行消费 5、延迟队列整合SpringBoot 定义 延时队列队列内部是有序的最重要的特性就体现在它的延时属性上延时队列中的元素是希望 在指定时间到了以后或之前取出和处理简单来说延时队列就是用来存放需要在指定时间被处理的元素的队列 延迟队列一般使用的场景 订单在十分钟之内未支付则自动取消新创建的店铺如果在十天内都没有上传过商品则自动发送消息提醒用户注册成功后如果三天内没有登陆则进行短信提醒用户发起退款如果三天内没有得到处理则通知相关运营人员预定会议后需要在预定的时间点前十分钟通知各个与会人员参加会议 代码架构图 创建两个队列 QA 和 QB两者队列 TTL 分别设置为 10S 和 40S然后在创建一个交换机 X 和死信 交换机 Y它们的类型都是 Direct创建一个 死信队列 QD它们的绑定关系如下 当然这里进行整合 SpringBoot 进行使用 【这里是 application.yaml 文件中的配置】 spring:#这里是 RabbitMQ 的配置rabbitmq:port: 5672 #指定的 rabbitMQ 服务器端口号username: adminpassword: 123host: 192.168.101.65【这里是延迟队列队列以及交换机的配置类】 这里是图中的中间那一段也就是队列以及交换机的声明以及绑定 /*** 延迟队列中【队列】与【交换机】的 声明 以及 绑定 的配置类*/ Configuration public class TTLQueueConfig {//普通交换机public static final String Normal_Exchange X;//死信交换机public static final String Dead_Exchange Y;//普通队列public static final String Normal_QueueA QA;public static final String Normal_QueueB QB;//死信队列public static final String Dead_Queue QD;//1.声明交换机//1.1声明普通交换机Bean(Normal_Exchange)public DirectExchange Normal_Exchange(){return new DirectExchange(Normal_Exchange);}//1.2 声明死信交换机Bean(Dead_Exchange)public DirectExchange Dead_Exchange(){return new DirectExchange(Dead_Exchange);}//2.声明普通队列与死信交换机进行绑定并声明过期时间Bean(Normal_QueueA) //【队列A】public Queue QA(){HashMapString, Object arguments new HashMap(2); //这里初始化 map 的长度加快编译速度//2.1 设置死信交换机arguments.put(x-dead-letter-exchange,Dead_Exchange);//2.2 设置死信 Routing Keyarguments.put(x-dead-letter-routing-key,YD);//2.3 设置 TTL 过期时间 // arguments.put(x-message-ttl,10000);return QueueBuilder.durable(Normal_QueueA) //开启队列持久化.withArguments(arguments).ttl(10000).build();}Bean(Normal_QueueB) //【队列B】public Queue QB(){HashMapString, Object arguments new HashMap(2); //这里初始化 map 的长度加快编译速度arguments.put(x-dead-letter-exchange,Dead_Exchange);arguments.put(x-dead-letter-routing-key,YD); // arguments.put(x-message-ttl,30000);return QueueBuilder.durable(Normal_QueueB).withArguments(arguments).ttl(30000).build();}//3.声明死信队列Bean(Dead_Queue)public Queue QD(){return QueueBuilder.durable(Dead_Queue).build();}//4.将普通交换机与队列A进行绑定Beanpublic Binding QA_Binding_NormalQueue(Qualifier(Normal_QueueA) Queue queueA,Qualifier(Normal_Exchange) DirectExchange normalExchange){return BindingBuilder.bind(queueA).to(normalExchange).with(XA);}//4.1将普通交换机与队列B进行绑定Beanpublic Binding QB_Binding_NormalQueue(Qualifier(Normal_QueueB) Queue queueB,Qualifier(Normal_Exchange) DirectExchange normalExchange){return BindingBuilder.bind(queueB).to(normalExchange).with(XB);}//4.2将死信交换机与死信队列进行绑定Beanpublic Binding QD_Binding_DeadExchange(Qualifier(Dead_Queue)Queue deadQueue,Qualifier(Dead_Exchange)DirectExchange deadExchange){return BindingBuilder.bind(deadQueue).to(deadExchange).with(YD);}}【消息生产者】 这里打算发送一个消息请求分别给不同的 TTL 队列 /*** 消息生产者** 这里进行发送 http://localhost:8080/ttl/sendMsg/小白*/ Slf4j RestController RequestMapping(/ttl) public class SendMsgController {ResourceRabbitTemplate rabbitTemplate;//发送消息GetMapping(/sendMsg/{message})public void sendMsg(PathVariable(message)String message){log.info(当前时间{},发送了一条信息{}给两个 TTL 队列,new Date(),message);rabbitTemplate.convertAndSend(X,XA,消息来自10s的队列message);rabbitTemplate.convertAndSend(X,XB,消息来自30s的队列message);} }【消息消费者】 使用 Listener 监听器进行死信队列的监听 /*** 这里是消息的消费者*/ Slf4j Component public class DeadLetterConsumer {//死信队列中进行接收 TTL 延迟消息RabbitListener(queuesQD) //调用监听器监听死信队列public void DeadQueue_consumer(Message message, Channel channel){String msg new String(message.getBody());log.info(当前时间{}消费者死信队列接收到消费的消息{},new Date().toString(),msg);} }运行结果 发现问题 若以后需要多个不同的 TTL 消息那么就需要建立多个消息队列以达到传递不同 TTL 的消息这样导致耦合度升高不符合开闭原则所以接下来进行延迟队列的优化 解决问题 新增一个 QC 队列这个队列不设置延迟时间而是让 Producer消息生产者 发送消息的时候进行设置消息的 TTL 时间这样就不用频繁改动队列的 TTL 时间 做绑定的代码跟上面一样不做展示这里是生产者的代码进行设置发送消息的 TTL时间 GetMapping(/sendMsg/ttl/{message}/{ttlTime})public void QC_sendMsgByTTL(PathVariable(message) String message,PathVariable(ttlTime) String ttlTime){log.info(当前时间{},发送了一条消息给QC{},new Date().toString(),message);rabbitTemplate.convertAndSend(X,QC,message,msg-{//这里进行设置消息的 TTL 时间msg.getMessageProperties().setExpiration(ttlTime);return msg;});}运行结果这里使用的是低版本的 RabbitMQ 由结果可知由于队列的先进先出特性先发的 TTL消息若时间设置大于后发的 TTL消息那么后发的消息就会被堵塞直到先发的 TTL消息发送完毕后发的 TTL 消息才能继续发送这是一个弊端 这里需要注意的是版本高一些的 RabbitMQ 已经修复了以上问题 6、备份交换机重点 定义 在消息无法被路由到任何队列时将这些消息发送到备份交换机指定的交换机中而不是直接丢弃这些消息这增加了消息传递的可靠性可以用于处理路由失败或未匹配的消息确保消息不会因无法路由而丢失并允许进行相应的处理或日志记录备份交换机通常用于处理消息路由失败的情况以提供额外的消息处理机制 大致流程图如下 【application.yaml配置类】 这里需要手动的开启消息回调与失败消息的回退 spring:#这里是 RabbitMQ 的配置rabbitmq:port: 5672 #指定的 rabbitMQ 服务器端口号username: adminpassword: 123host: 192.168.101.65#这里进行开启发布确认以及消息的回调publisher-confirm-type: correlated#将发送失败的消息回退给生产者publisher-returns: true【消息回调接口】 由于 MyCallBack 方法重写了 RabbitTemplate 接口中的 ConfirmCallback 以及 ReturnCallback 方法需要将 RabbitTemplate 对象进行注入才能进行调用自己重写的RabbitTemplate 内部方法其中PostConstruct   是一个在Spring Bean初始化完成后执行的注解init()方法 被标记为 PostConstruct因此它将在该Bean的初始化阶段被调用 需要注意的是RabbitTemplate 注入必须在 init 前不然会报未注入异常 Slf4j Component public class MyCallBack implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback {ResourceRabbitTemplate rabbitTemplate;/*** 由于这里重写继承接口中的方法所以需要进行注入操作*/PostConstructpublic void init(){rabbitTemplate.setConfirmCallback(this);rabbitTemplate.setReturnsCallback(this);}/*** 交换机回调信息的接口* param correlationData 保存回调消息的 id 以及相关的信息* param Ack_Message 消息确认* param reason 消息发送失败回调的原因*/Overridepublic void confirm(CorrelationData correlationData, boolean Ack_Message, String reason) {String messageId ;if (ObjectUtil.isNotNull(correlationData)) { //判断是否为空防止空指针异常messageId correlationData.getId();}if (Ack_Message) {log.info(成功接收到消息消息ID为{}, messageId);} else {log.info(接收消息失败消息ID为{},失败的原因为{}, messageId, reason);}}/*** 将发送失败的消息进行回退*/Overridepublic void returnedMessage(ReturnedMessage returnedMessage) {log.error(消息“{} 被交换机{} 退回退回的原因{}消息的 RoutingKey{},new String(returnedMessage.getMessage().getBody()),returnedMessage.getExchange(),returnedMessage.getReplyText(),returnedMessage.getRoutingKey());} }
http://www.zqtcl.cn/news/525166/

相关文章:

  • 关于电子商务网站建设的现状企业公示信息查询系统山西
  • 网站开发 翻译长春建站企业
  • dedecms网站网站解析一般什么时候
  • 制作网站的技术北京律师24小时电话
  • 可拖拽 网站建设如何做自媒体和网站签约赚点击
  • 做网站选哪个语言怎么登录百度app
  • 国发网站建设网站优化主要优化哪些地方
  • 快速微信网站开发定制网站建设费用预算
  • 网站制作叫什么知名网站建设制作
  • 网络营销网站建设公司h5应用
  • 网站开发合同要上印花税吗南江红鱼洞水库建设管理局网站
  • 疏通下水道网站怎么做wordpress 恢复初始化
  • 电脑商业网站怎的做软文推广渠道
  • 自己做网站需要买什么如何做微信商城网站
  • 有了网站开发app是不是更容易自建网站管理
  • 网站将要准备建设的内容有哪些做外贸有效的网站
  • 网站设计博客网站内容添加
  • 网站建站行业新闻微盟开店怎么收费
  • 网站的建设参考文献郑州网站建设中国建设建设银行
  • 重庆那些公司的网站是网易做的电信100m光纤做网站
  • 网站怎么设计产品营销策略包括哪些内容
  • 天元建设集团有限公司破产重组河源seo排名
  • 网站权重什么意思seo的搜索排名影响因素有
  • 建设报名系统是正规网站吗计算机培训班出来好找工作吗
  • 网站上的文章用秀米可以做吗宁波外客网络科技有限公司
  • 网站底部导航代码成品视频直播软件推荐哪个好一点ios
  • 上海电商网站开发公司垫江网站建设价格
  • 门户网站建设存在问题与不足商城网站开发项目文档
  • wordpress建站方便吗wordpress加入海报功能
  • 网站名称注册保护2018wordpress主题