福州网站建设,淘宝网店设计制作,设计网站一般要多少钱,上杭网站设计公司安装延迟插件
根据rabbitmq的版本下载插件版本
# 延迟队列插件下载地址
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases# 将本地下载好的插件复制到docker里
# docker cp rabbitmq_delayed_message_exchange-3.9.0.ez 容器名:/plugins
docker cp r…安装延迟插件
根据rabbitmq的版本下载插件版本
# 延迟队列插件下载地址
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases# 将本地下载好的插件复制到docker里
# docker cp rabbitmq_delayed_message_exchange-3.9.0.ez 容器名:/plugins
docker cp rabbitmq_delayed_message_exchange-3.9.0.ez rabbitmq:/plugins# 开启延迟队列插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange# 查看插件
rabbitmq-plugins listconfig配置
public class RabbitMqInfo {public static final String EXCHANGE_NAME myDelayedExchange; // 交换机public static final String QUEUE_NAME delayed_queue; // 队列名称public static final String ROUTING_KEY delayed.routing.key; // routing_key
}import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.*;Configuration
public class RabbitMqConfig {Beanpublic ConnectionFactory connectionFactory() {CachingConnectionFactory connectionFactory new CachingConnectionFactory();connectionFactory.setHost(127.0.0.1); // host主机connectionFactory.setPort(5672); // 端口号connectionFactory.setUsername(admin); // 用户名connectionFactory.setPassword(admin); // 密码connectionFactory.setVirtualHost(/); // Virtual Hostsreturn connectionFactory;}/*** 交换机** return*/Beanpublic CustomExchange delayedExchange() {HashMapString, Object map new HashMap();map.put(x-delayed-type, direct); // 看图创建交换机输入的 Argumentsreturn new CustomExchange(RabbitMqInfo.EXCHANGE_NAME, // 交换机名称x-delayed-message, // 消息类型true, // 是否持久化false, // 是否自动删除map);}Beanpublic Queue queue() {/*** 参数1: 队列名* 参数2: durable:是否持久化,默认false* 参数3: exclusive只能被当前创建的连接使用而且当连接关闭后队列即被删除。此参考优先级高于durable 默认false* 参数4: 是否自动删除当没有生产者或者消费者使用此队列该队列会自动删除* 一般设置一下队列的持久化就好,其余两个就是默认false*/return new Queue(RabbitMqInfo.QUEUE_NAME, true, false, false);}Beanpublic Binding binding() {return BindingBuilder.bind(queue()) //队列.to(delayedExchange()) //交换机.with(RabbitMqInfo.ROUTING_KEY) //routing_key.noargs();}
}生成者测试类
import com.example.config.RabbitMqInfo;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;import java.time.LocalDateTime;
import java.util.UUID;SpringBootTest
RunWith(SpringRunner.class)
public class ProduceTest {private static final Logger logger org.slf4j.LoggerFactory.getLogger(ProduceTest.class);Autowiredprivate RabbitTemplate rabbitTemplate;Testpublic void sendProduce(){MessagePostProcessor postProcessor new MessagePostProcessor() {Overridepublic Message postProcessMessage(Message message) throws AmqpException {MessageProperties messageProperties message.getMessageProperties();// 设置延迟消费时间messageProperties.setHeader(x-delay, 10000); // 毫秒 10001秒 1000010秒// 设置消息IDmessageProperties.setMessageId(100); //字符串取消延迟队列return message;}};String content UUID.randomUUID().toString();logger.info(生产者发送消息发送时间{} ,发送内容{},LocalDateTime.now(),content);// 交换机routing_key消息内容rabbitTemplate.convertAndSend(RabbitMqInfo.EXCHANGE_NAME, RabbitMqInfo.ROUTING_KEY, content,postProcessor);}/*** 取消延迟队列消息*/Testpublic void clearProduce() {// 交换机routing_key取消延迟队列中 messageId100的rabbitTemplate.convertAndSend(RabbitMqInfo.EXCHANGE_NAME, RabbitMqInfo.ROUTING_KEY, 100);}}消费者
import com.example.config.RabbitMqInfo;
import org.slf4j.Logger;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.time.LocalDateTime;Component
public class RabbitmqConsumer {private static final Logger logger org.slf4j.LoggerFactory.getLogger(RabbitmqConsumer.class);RabbitListener(queues RabbitMqInfo.QUEUE_NAME)public void OnMessage(String message){logger.info(消费着接受消息接受时间{} ,接受内容{},LocalDateTime.now(),message);}
}