培训网站大全,智慧团建登录官网,网站建设培训 ppt,网页平面设计模板目录 一、单个RabbitMQ配置 1.1、导入Maven坐标 1.2、yaml配置 1.3、java配置类 1.3.1、交换机配置 1.3.2、队列配置 1.3.3、绑定配置 1.3.4、连接配置 1.4、生产者与消费者操作配置 1.4.1、生产者操作配置 1.4.2、消费者操作配置
二、多个RabbitMQ配置 2.1、yaml配置 2.2、j…目录 一、单个RabbitMQ配置 1.1、导入Maven坐标 1.2、yaml配置 1.3、java配置类 1.3.1、交换机配置 1.3.2、队列配置 1.3.3、绑定配置 1.3.4、连接配置 1.4、生产者与消费者操作配置 1.4.1、生产者操作配置 1.4.2、消费者操作配置
二、多个RabbitMQ配置 2.1、yaml配置 2.2、java配置类 2.3、生产者与消费者操作配置 2.3.1、生产者操作配置 2.3.1、消费者操作配置
三、总结 需求描述原SpringBoot工程已经配置了一个RabbitMQ现需求是再配置一个RabbitMQ实现效果是不同RabbitMQ推送到不同的队列中且互不干扰影响使用。 一、单个RabbitMQ配置 1.1、导入Maven坐标 dependencygroupIdcom.rabbitmq/groupIdartifactIdamqp-client/artifactIdversion5.10.0/version/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactIdversion2.4.4/version/dependency 1.2、yaml配置 rabbitmq:host: xx.xxx.xxx.xxxport: xxxxusername: xxxxpassword: xxxxxxvirtual-host: xxxxpublisher-returns: truepublisher-confirms: truelistener:simple:default-requeue-rejected: trueretry:enabled: falsemax-attempts: 3initial-interval: 5000 1.3、java配置类 1.3.1、交换机配置
package com.ruoyi.report.config;import org.springframework.amqp.core.DirectExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;Component
public class ExchangeConfig {public static final String ecoa_exchange ecoaExchange;/*** 1.定义direct exchange* 2.durabletrue rabbitmq重启的时候不需要创建新的交换机* 3.direct交换器相对来说比较简单匹配规则为如果路由键匹配消息就被投送到相关的队列*/Beanpublic DirectExchange ecoaExchange() {DirectExchange directExchange new DirectExchange(ecoa_exchange, true, false);return directExchange;}} 1.3.2、队列配置
package com.ruoyi.report.config;import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;/*** ClassName QueueConfig* Description* Author Mr.Huang* Date 2023/9/22 16:26* Version 1.0**/
Component
public class QueueConfig {private static final String ecoa_file_upload_queue ecoa_file_upload_queue;Beanpublic Queue ecoaFileUploadDispatchQueue() {/**durabletrue 持久化 rabbitmq重启的时候不需要创建新的队列auto-delete 表示消息队列没有在使用时将被自动删除 默认是falseexclusive 表示该消息队列是否只在当前connection生效,默认是false*/return new Queue(ecoa_file_upload_queue, true, false, false);}
} 1.3.3、绑定配置
package com.ruoyi.report.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;/*** ClassName BindingConfig* Description* Author Mr.Huang* Date 2023/9/22 16:31* Version 1.0**/
Component
public class BindingConfig {Autowiredprivate QueueConfig queueConfig;Autowiredprivate ExchangeConfig exchangeConfig;public static final String ECOA_file_upload_key ecoa_file_upload_key;Beanpublic Binding ecoaFileUploadDispatchBinding() {return BindingBuilder.bind(queueConfig.ecoaFileUploadDispatchQueue()).to(exchangeConfig.ecoaExchange()).with(ECOA_file_upload_key);}
}1.3.4、连接配置
package com.ruoyi.report.config;import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** ClassName RabbitMqConfig* Description* Author Mr.Huang* Date 2023/9/22 16:14* Version 1.0**/
Configuration
public class RabbitMqConfig {/*** 连接工厂*/Autowiredprivate ConnectionFactory connectionFactory;/*** 自定义rabbit template用于数据的接收和发送* 可以设置消息确认机制和回调** return*/Beanpublic RabbitTemplate rabbitTemplate() {RabbitTemplate template new RabbitTemplate(connectionFactory);return template;}
} 1.4、生产者与消费者操作配置 1.4.1、生产者操作配置
package com.ruoyi.report.utils;import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.ruoyi.common.utils.StringUtils;
import com.ruoyi.report.config.BindingConfig;
import com.ruoyi.report.config.ExchangeConfig;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.util.UUID;/*** ClassName MessageUtils* Description* Author Mr.Huang* Date 2023/9/22 16:36* Version 1.0**/
Component
public class MessageUtils {Autowiredprivate RabbitTemplate rabbitTemplate;/*** 发送消息* 发送随货单信息* param message 消息*/public void sendMessage(Object message) {String uuid UUID.randomUUID().toString();CorrelationData correlationId new CorrelationData(uuid);Message msg MessageBuilder.withBody(JSON.toJSONStringWithDateFormat(message, yyyy-MM-dd HH:mm:ss, SerializerFeature.WriteNullStringAsEmpty).getBytes()).setContentType(MessageProperties.CONTENT_TYPE_JSON).build();rabbitTemplate.convertAndSend(ExchangeConfig.ecoa_exchange, BindingConfig.ECOA_file_upload_key, msg, correlationId);}
} 1.4.2、消费者操作配置
package com.ruoyi.report.consumer;import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import com.ruoyi.report.config.RabbitMqConfig;
import com.ruoyi.report.entity.open.PrintResult;
import com.ruoyi.report.service.open.PrintSendLogService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.io.IOException;/*** ClassName PrintFeedbackConsumer* Description* Author Mr.Huang* Date 2024/4/30 10:23* Version 1.0**/
Slf4j
Component
public class PrintFeedbackConsumer {Autowiredprivate PrintSendLogService printSendLogService;RabbitListener(queues {RabbitMqConfig.print_4pl_dispatch_info_feedback_queue}, containerFactory printContainerFactory)public void receiveMq(Message message, Channel channel) {try {String body new String(message.getBody());log.info(接受【Print结果推送】RabbitMQ消息body);JSONObject objJson JSONObject.parseObject(body);Thread.sleep(1000);PrintResult printResult JSONObject.toJavaObject(objJson, PrintResult.class);printSendLogService.updatePrintSendLog(printResult);}catch (Exception e){log.error(,e);try {channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);} catch (IOException ex) {ex.printStackTrace();}}}
}
二、多个RabbitMQ配置 Maven坐标与上面单个RabbitMQ配置一致 2.1、yaml配置 rabbitmq:first:host: xx.xxx.xxx.xxxport: xxxxusername: xxxxpassword: xxxxxxvirtual-host: xxxxpublisher-returns: truepublisher-confirms: truelistener:simple:default-requeue-rejected: trueretry:enabled: falsemax-attempts: 3initial-interval: 5000 second:host: xx.xxx.xxx.xxxport: xxxxusername: xxxxpassword: xxxxxxpublisher-returns: truepublisher-confirms: truevirtual-host: xxxxlistener:simple:default-requeue-rejected: trueretry:enabled: falsemax-attempts: 3initial-interval: 5000 2.2、java配置类
package com.ruoyi.report.config;import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.Connection;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;/*** ClassName RabbitMqConfig* Description* Author Mr.Huang* Date 2023/9/22 16:14* Version 1.0**/
Configuration
public class RabbitMqConfig {// 第一个MQ电子药检队列与keypublic static final String ECOA_file_upload_queue ecoa_file_upload_queue;public static final String ECOA_file_upload_key ecoa_file_upload_key;// 第二个MQ单据打印平台队列与keypublic static final String print_tms_dispatch_info_queue print_tms_dispatch_info_queue;public static final String print_4pl_dispatch_info_feedback_queue print_4pl_dispatch_info_feedback_queue;public static final String print_tms_dispatch_info_key print_tms_dispatch_info_key;public static final String print_4pl_dispatch_info_feedback_key print_4pl_dispatch_info_feedback_key;/** 交换机名称 */public static final String EXCHANGE ecoaExchange;public static final String EXCHANGE2 tms_exchange;/** 第一个rabbitMq队列 */Bean(name ECOAConnectionFactory)Primarypublic ConnectionFactory ECOAConnectionFactory(Value(${spring.rabbitmq.first.host}) String host,Value(${spring.rabbitmq.first.port}) int port,Value(${spring.rabbitmq.first.username}) String username,Value(${spring.rabbitmq.first.password}) String password,Value(${spring.rabbitmq.first.virtual-host}) String virtualHost) {CachingConnectionFactory connectionFactory new CachingConnectionFactory();connectionFactory.setHost(host);connectionFactory.setPort(port);connectionFactory.setUsername(username);connectionFactory.setPassword(password);connectionFactory.setVirtualHost(virtualHost);return connectionFactory;}/** 第二个rabbitMq队列 */Bean(name printConnectionFactory)public ConnectionFactory printConnectionFactory(Value(${spring.rabbitmq.second.host}) String host,Value(${spring.rabbitmq.second.port}) int port,Value(${spring.rabbitmq.second.username}) String username,Value(${spring.rabbitmq.second.password}) String password,Value(${spring.rabbitmq.second.virtual-host}) String virtualHost) {CachingConnectionFactory connectionFactory new CachingConnectionFactory();connectionFactory.setHost(host);connectionFactory.setPort(port);connectionFactory.setUsername(username);connectionFactory.setPassword(password);connectionFactory.setVirtualHost(virtualHost);return connectionFactory;}/** 第一个rabbitMq操作模板 */Bean(nameECOARabbitTemplate)Primarypublic RabbitTemplate fplRabbitTemplate(Qualifier(ECOAConnectionFactory) ConnectionFactory connectionFactory){RabbitTemplate firstRabbitTemplate new RabbitTemplate(connectionFactory);return firstRabbitTemplate;}/** 第二个rabbitMq操作模板 */Bean(nameprintRabbitTemplate)public RabbitTemplate tcscRabbitTemplate(Qualifier(printConnectionFactory) ConnectionFactory connectionFactory){RabbitTemplate secondRabbitTemplate new RabbitTemplate(connectionFactory);return secondRabbitTemplate;}/** 第一个rabbitMq连接工厂 */Bean(nameECOAContainerFactory)public SimpleRabbitListenerContainerFactory ECOAContainerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer,Qualifier(ECOAConnectionFactory) ConnectionFactory connectionFactory) {SimpleRabbitListenerContainerFactory factory new SimpleRabbitListenerContainerFactory();factory.setMaxConcurrentConsumers(5);factory.setConcurrentConsumers(1);factory.setPrefetchCount(1);configurer.configure(factory, connectionFactory);return factory;}/** 第二个rabbitMq连接工厂 */Bean(nameprintContainerFactory)public SimpleRabbitListenerContainerFactory printContainerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer,Qualifier(printConnectionFactory) ConnectionFactory connectionFactory) {SimpleRabbitListenerContainerFactory factory new SimpleRabbitListenerContainerFactory();factory.setMaxConcurrentConsumers(5);factory.setConcurrentConsumers(1);factory.setPrefetchCount(1);configurer.configure(factory, connectionFactory);return factory;}/** 第一个mq绑定队列绑定交换机 */Beanpublic String runECOAQueue(Qualifier(ECOAConnectionFactory) ConnectionFactory connectionFactory) {System.out.println(configuration ECOAQueue ........................);Connection connection connectionFactory.createConnection();Channel channel connection.createChannel(false);try {channel.exchangeDeclare(EXCHANGE, direct, true, false, null);// 单据推送电子药检队列channel.queueDeclare(ECOA_file_upload_queue, true, false, false, null);channel.queueBind(ECOA_file_upload_queue, EXCHANGE, ECOA_file_upload_key);} catch (Exception e) {e.printStackTrace();} finally {return ECOAQueue;}}/** 第二个mq绑定队列绑定交换机 */Beanpublic String runPrintQueue(Qualifier(printConnectionFactory) ConnectionFactory connectionFactory) {System.out.println(configuration printQueue ........................);Connection connection connectionFactory.createConnection();Channel channel connection.createChannel(false);try {channel.exchangeDeclare(EXCHANGE2, direct, true, false, null);// 单据推送单据打印平台队列channel.queueDeclare(print_tms_dispatch_info_queue, true, false, false, null);channel.queueBind(print_tms_dispatch_info_queue, EXCHANGE2, print_tms_dispatch_info_key);// 单据打印平台反馈队列channel.queueDeclare(print_4pl_dispatch_info_feedback_queue,true,false,false,null);channel.queueBind(print_4pl_dispatch_info_feedback_queue,EXCHANGE2,print_4pl_dispatch_info_feedback_key);} catch (Exception e) {e.printStackTrace();} finally {return printQueue;}}
} 注意需将原MQ交换机、队列、绑定配置类注释掉只留这一个配置文件即可这个配置文件已经将对应的交换机、队列绑定好只是需要注意队列名字、交换机不要绑定错了 2.3、生产者与消费者操作配置 2.3.1、生产者操作配置
package com.ruoyi.report.utils;import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.ruoyi.report.config.RabbitMqConfig;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;import javax.annotation.Resource;
import java.util.UUID;/*** ClassName MessageUtils* Description* Author Mr.Huang* Date 2023/9/22 16:36* Version 1.0**/
Component
public class MessageUtils {Resource(name ECOARabbitTemplate)private RabbitTemplate ECOARabbitTemplate;Resource(name printRabbitTemplate)private RabbitTemplate printRabbitTemplate;/*** 向ECOA发送消息* 发送随货单信息* param message 消息*/public void sendMessage(Object message) {String uuid UUID.randomUUID().toString();CorrelationData correlationId new CorrelationData(uuid);Message msg MessageBuilder.withBody(JSON.toJSONStringWithDateFormat(message, yyyy-MM-dd HH:mm:ss, SerializerFeature.WriteNullStringAsEmpty).getBytes()).setContentType(MessageProperties.CONTENT_TYPE_JSON).build();ECOARabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE, RabbitMqConfig.ECOA_file_upload_key, msg, correlationId);}/*** 向print发送消息* 发送派车单信息* param message 消息*/public void sendPrintMessage(Object message) {String uuid UUID.randomUUID().toString();CorrelationData correlationId new CorrelationData(uuid);Message msg MessageBuilder.withBody(JSON.toJSONStringWithDateFormat(message, yyyy-MM-dd HH:mm:ss, SerializerFeature.WriteNullStringAsEmpty).getBytes()).setContentType(MessageProperties.CONTENT_TYPE_JSON).build();printRabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE2, RabbitMqConfig.print_tms_dispatch_info_key, msg, correlationId);}
}2.3.1、消费者操作配置
package com.ruoyi.report.consumer;import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import com.ruoyi.report.config.RabbitMqConfig;
import com.ruoyi.report.entity.open.PrintResult;
import com.ruoyi.report.service.open.PrintSendLogService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.io.IOException;/*** ClassName PrintFeedbackConsumer* Description* Author Mr.Huang* Date 2024/4/30 10:23* Version 1.0**/
Slf4j
Component
public class PrintFeedbackConsumer {Autowiredprivate PrintSendLogService printSendLogService;RabbitListener(queues {RabbitMqConfig.print_4pl_dispatch_info_feedback_queue}, containerFactory printContainerFactory)public void receiveMq(Message message, Channel channel) {try {String body new String(message.getBody());log.info(接受【Print结果推送】RabbitMQ消息body);JSONObject objJson JSONObject.parseObject(body);Thread.sleep(1000);PrintResult printResult JSONObject.toJavaObject(objJson, PrintResult.class);printSendLogService.updatePrintSendLog(printResult);}catch (Exception e){log.error(,e);try {channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);} catch (IOException ex) {ex.printStackTrace();}}}
} 与单个RabbitMQ消费者操作一致只是注意要消费的队列和连接工厂不要搞错了 三、总结 配置单个RabbitMQ时不需要关心底层的连接工厂是如何配置的当把yaml内容填好它会自动配置连接工厂只需要把交换机、队列、配置绑定起来即可。 当需要配置多个mq时才需要自己手动配置连接工厂并不是只能配置两个RabbitMQ可以按这个格式配置更多个。唯一注意的是不要把这些队列和交换机搞混了即可。