苏州网站公司排名前十,最好看的视频免费下载,有什么设计网站,如何在阿里云做网站说明#xff1a; 生产者P 往交换机X#xff08;typedirect#xff09;会发送两种消息#xff1a;一、routingKeyXA的消息#xff08;消息存活周期10s#xff09;#xff0c;被队列QA队列绑定入列#xff1b;一、routingKeyXB的消息#xff08;消息存活周期40s#xf… 说明 生产者P 往交换机Xtypedirect会发送两种消息一、routingKeyXA的消息消息存活周期10s被队列QA队列绑定入列一、routingKeyXB的消息消息存活周期40s被队列Q B队列绑定入列。QA、QB两个队列消息在失活变成死信消息以routingKeyYD发送到交换机Ytypedirect。队列QD用routingKey绑定交换机Y消息入列。消费者监听处理QD的消息。 这个设计模型达到了消息从生产者到消费者延迟10s、40s不等的延迟队列处理。 这里用SpringBoot mavendependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId/dependency在封装工具类中 其中【交换机】【队列】【绑定器】 可直接使用工具类这里对案例图所用到组件器声明注解出来。
框内的组件和关系 可以在SpringBoot配置类中做出如下的组件声明与关系绑定
package com.esint.configs;import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;/*** TTL延迟队列配置文件类**/
Configuration
public class TtlQueueConfig {////普通交换机的名称 Xpublic static final String X_EXCHANGE X;//死信交换机名称 Ypublic static final String Y_DEAD_LETTER_EXCHANGE Y;//普通队列QA QBpublic static final String QUEUE_A QA;public static final String QUEUE_B QB;//死信队列名称QDpublic static final String DEAD_LETTER_QUEUE QD;////声明X_EXCHANGEBean(xExchange)public DirectExchange xExchange(){return new DirectExchange(X_EXCHANGE);}//声明死信交换Y_DEAD_LETTER_EXCHANGEBean(yExchange)public DirectExchange yExchange(){return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);}//声明队列 QABean(queueA)public Queue queueA(){MapString, Object arguments new HashMap(3);//设置死信交换机arguments.put(x-dead-letter-exchange,Y_DEAD_LETTER_EXCHANGE);//设置死信RoutingKey 死信后充当了消费者的发送路由arguments.put(x-dead-letter-routing-key,YD);//消息过期时间arguments.put(x-message-ttl,10000);return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build();}//声明队列 QBBean(queueB)public Queue queueB(){MapString, Object arguments new HashMap(3);//设置死信交换机arguments.put(x-dead-letter-exchange,Y_DEAD_LETTER_EXCHANGE);//设置死信RoutingKey 死信后充当了消费者的发送路由arguments.put(x-dead-letter-routing-key,YD);//消息过期时间arguments.put(x-message-ttl,40000);return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build();}//声明死信队列QDBean(queueD)public Queue queueD(){return QueueBuilder.durable(DEAD_LETTER_QUEUE).build();}//捆绑//绑定队列QA与交换机X_EXCHANGEBeanpublic Binding queueABingXExchange(Qualifier(queueA) Queue queueA,Qualifier(xExchange) DirectExchange xExchange){return BindingBuilder.bind(queueA).to(xExchange).with(XA);}//绑定队列QB与交换机X_EXCHANGEBeanpublic Binding queueBBingXExchange(Qualifier(queueB) Queue queueB,Qualifier(xExchange) DirectExchange xExchange){return BindingBuilder.bind(queueB).to(xExchange).with(XB);}//绑定队列QD与交换机Y_ExchangeBeanpublic Binding queueDBingYExchange(Qualifier(queueD) Queue queueD,Qualifier(yExchange)DirectExchange yExchange){return BindingBuilder.bind(queueD).to(yExchange).with(YD);}
}
生产者与交换机X这里方便测试 我们把生产者放在一个Controller逻辑里
package com.esint.controller;//发送延迟消息import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.util.Date;Slf4j
RestController
RequestMapping(/ttl)
public class SendMesController {Autowiredprivate RabbitTemplate rabbitTemplate;GetMapping(/senMsg/{message})public void sendMes(PathVariable String message){log.info(当前时间{},发送一条消息给两个TTL队列{},new Date().toString(),message);rabbitTemplate.convertAndSend(X,XA,消息来自ttl为10s的队列message);rabbitTemplate.convertAndSend(X,XB,消息来自ttl为40s的队列message);}
}
消费者与死信队列创建一个监听者示例
package com.esint.consumer;import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.util.Date;/*** 队列TTL消费者*/Slf4j
Component
public class DeadLetterQueueConsumer {//接受消息RabbitListener(queues QD)public void receiveD(Message message, Channel channel) throws Exception{String msg new String(message.getBody());log.info(当前时间{},收到私信队列的消息{},new Date().toString(),msg);}
}
rabbitmq的配置文件
spring:rabbitmq:host: *.*.*.*port: 5672username: guestpassword: guest
接下来可以启动SpringBoot: 启动后配置方法类会把交换机/队列/绑定器初始化配置
队列
交换机 点开详细后也能考到他们之间的绑定关系 消息发布测试
生产者发送消息
浏览器
http://127.0.0.1:19092/ttl/senMsg/nice通过生产者发送nice
当前时间Tue Nov 21 14:50:05 CST 2023,发送一条消息给两个TTL队列nice消费者在10s后和40秒分别收到了消息 拓展是不是有一种可能如果再队列中不设置过期时间在生产者发送消息时设置过期时间 来实现过期时间自由设定而延迟自由 结论是不能 rabbitMQ队列只会检查第一个消息是否过期。举例如果第一个消息的ttl为30s第二个消息ttl为3s。第二个消息不会再3s后到达而是会在第一个过期后再第二个到达。 示例验证 增加一个无过期时间约束的队列以routing-key为XC绑定X交换机过期后以routing-key为YD绑定Y交换机。 过期时间放生产者发送时设定。 在的rabbitMQ配置类中增加QC 绑定前X routing-keyXC后Y routing-keyYD交换机 // 优化新增队列 队列不设置TTL过期时间 把过期时间放到生产者发送消息时public static final String QUEUE_C QC;//声明队列 QC 优化新增队列 队列不设置TTL过期时间 把过期时间放到生产者发送消息时Bean(queueC)public Queue queueC(){MapString,Object arguments new HashMap(2);//设置死信交换机arguments.put(x-dead-letter-exchange,Y_DEAD_LETTER_EXCHANGE);//设置routing-keyarguments.put(x-dead-letter-routing-key,YD);return QueueBuilder.durable(QUEUE_C).withArguments(arguments).build();}
//绑定队列QC与交换机X_EXCHANGE 优化新增队列 队列不设置TTL过期时间 把过期时间放到生产者发送消息时Beanpublic Binding queueCBindXExchange(Qualifier(queueC) Queue queueC,Qualifier(xExchange)DirectExchange xExchange){return BindingBuilder.bind(queueC).to(xExchange).with(XC);}
生产者
GetMapping(/sendttl/{message}/{ttlTime})public void sendMes(PathVariable String message,PathVariable String ttlTime){
/*** 死信队列做延迟时的缺陷* rabbitMQ只会检查第一个消息是否过期带来的问题就是如果第一个消息的ttl为30s第二个消息ttl为3s。第二个消息不会再3s后到达而是会在第一个过期后再第二个到达。*/log.info(当前时间{},发送一条ttl为{}ms的消息给QC队列{},new Date().toString(),ttlTime,message);rabbitTemplate.convertAndSend(X,XC,message,mes-{mes.getMessageProperties().setExpiration(ttlTime);return mes;});}消费者不变启动服务
生产者发送消息第一条 3000ms http://127.0.0.1:19092/ttl/sendttl/第一条30000ms消息/30000 http://127.0.0.1:19092/ttl/sendttl/第二条3000ms消息/3000 结论第二条虽然早早过期它依然需要等待第一条过期后才能排到他。rabbitMQ的队列过期检查机制。