目录
引入依赖
配置文件
不同模式下使用springboot收发消息
直连模式 生产者 消费者
Fanout模式
生产者
消费者
Topic主题模式
生产者
消费者
Headers模式
生产者 消费者
补充Quorum队列
生产者
消费者 引入依赖
dependencygroupIdorg.springf…目录
引入依赖
配置文件
不同模式下使用springboot收发消息
直连模式 生产者 消费者
Fanout模式
生产者
消费者
Topic主题模式
生产者
消费者
Headers模式
生产者 消费者
补充Quorum队列
生产者
消费者 引入依赖
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
注意下版本。不同版本下的配置方式会有变化。
配置文件
所有的基础运行环境都在application.properties中进行配置。所有配置以spring.rabbitmq开头。通常按照示例进行一些基础的必要配置就可以跑了。
server.port=8080
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/mirror
# 单次推送消息数量
spring.rabbitmq.listener.simple.prefetch=1
# 消费者的消费线程数量
spring.rabbitmq.listener.simple.concurrency=5
# 消费者的最大线程数量
spring.rabbitmq.listener.simple.max-concurrency=10
# 消息确认模式：none 表示不做确认（消息投递后即视为已确认）；如需手动确认请改为 manual
spring.rabbitmq.listener.simple.acknowledge-mode=none
不同模式下使用springboot收发消息
工具类
/**
 * Names of the queues and exchanges shared by the RabbitMQ configuration
 * classes, the producers and the listeners in this example.
 */
public final class MyConstants {

    public static final String QUEUE_QUORUM = "quorumQueue";
    public static final String QUEUE_STREAM = "streamQueue";

    // direct mode: messages are sent straight to the queue
    public static final String QUEUE_DIRECT = "directqueue";

    // fanout mode
    public static final String EXCHANGE_FANOUT = "fanoutExchange";
    public static final String QUEUE_FANOUT_Q1 = "fanout.q1";
    public static final String QUEUE_FANOUT_Q2 = "fanout.q2";
    public static final String QUEUE_FANOUT_Q3 = "fanout.q3";
    public static final String QUEUE_FANOUT_Q4 = "fanout.q4";

    // topic mode
    public static final String EXCHANGE_TOPIC = "topicExchange";
    public static final String QUEUE_TOPIC1 = "hunan.eco";
    public static final String QUEUE_TOPIC2 = "hunan.IT";
    public static final String QUEUE_TOPIC3 = "hebei.eco";
    public static final String QUEUE_TOPIC4 = "hebei.IT";

    // header mode
    public static final String EXCHANGE_HEADER = "headerExchange";
    public static final String QUEUE_TXTYP1 = "txTyp1";
    public static final String QUEUE_BUSTYP1 = "busTyp1";
    public static final String QUEUE_TXBUSTYP1 = "txbusTyp1";

    /** Constants holder; not meant to be instantiated. */
    private MyConstants() {
    }
}
直连模式
/**
 * Direct mode: only a queue has to be declared; every message is delivered
 * through that queue.
 */
@Configuration
public class DirectConfig {

    /** Declares the queue used by the direct-mode producer and listeners. */
    @Bean
    public Queue directQueue() {
        return new Queue(MyConstants.QUEUE_DIRECT);
    }
}

生产者

/**
 * Sends {@code message} as a UTF-8 text/plain message, passing the
 * direct-mode queue name as the routing key.
 *
 * @param message text to send
 * @return a confirmation string echoing the message
 */
@ApiOperation(value="direct发送接口",notes="直接发送到队列。task模式")
@GetMapping(value="/directSend")
public Object directSend(String message) throws AmqpException, UnsupportedEncodingException {
    // Set a few message properties.
    MessageProperties messageProperties = new MessageProperties();
    messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
    messageProperties.setPriority(2);
    // The original swapped the converter on the shared RabbitTemplate on every
    // request. send() takes a ready-made Message and never consults the
    // converter, so that call only mutated shared state. To publish objects as
    // JSON, configure a Jackson2JsonMessageConverter once on the RabbitTemplate
    // bean and call convertAndSend(...) instead.
    rabbitTemplate.send(MyConstants.QUEUE_DIRECT, new Message(message.getBytes("UTF-8"), messageProperties));
    return "message sended : " + message;
}

消费者

/**
 * Direct-mode listener bound to the "qos_4" container factory. Several
 * listeners on the same queue compete for messages (work-queue style), so a
 * given message reaches only one of them.
 */
@RabbitListener(queues=MyConstants.QUEUE_DIRECT,containerFactory="qos_4")
public void directReceive22(Message message, Channel channel, String messageStr) throws java.io.IOException {
    System.out.println("consumer1 received message : " + messageStr);
    // The "qos_4" factory uses AcknowledgeMode.MANUAL, so the delivery must be
    // acknowledged here; otherwise it stays unacked on the broker.
    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}

/** Direct-mode listener that uses the default container factory. */
@RabbitListener(queues=MyConstants.QUEUE_DIRECT)
public void directReceive2(String message) {
    System.out.println("consumer2 received message : " + message);
}
Configuration
public class RabbitmqConfig {Bean(nameqos_4)public SimpleRabbitListenerContainerFactory getSimpleRabbitListenerContainerFactory(ConnectionFactory connectionFactory) {SimpleRabbitListenerContainerFactory factory new SimpleRabbitListenerContainerFactory();factory.setMaxConcurrentConsumers(4);factory.setConnectionFactory(connectionFactory);factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);//手动确认return factory;}
}
Fanout模式
/*** Fanout模式需要声明exchange并绑定queue由exchange负责转发到queue上。*/
Configuration
public class FanoutConfig {//声明队列Beanpublic Queue fanoutQ1() {return new Queue(MyConstants.QUEUE_FANOUT_Q1);}Beanpublic Queue fanoutQ2() {return new Queue(MyConstants.QUEUE_FANOUT_Q2);}Beanpublic Queue fanoutQ3() {return new Queue(MyConstants.QUEUE_FANOUT_Q3);}Beanpublic Queue fanoutQ4() {return new Queue(MyConstants.QUEUE_FANOUT_Q4);}//声明exchangeBeanpublic FanoutExchange setFanoutExchange() {return new FanoutExchange(MyConstants.EXCHANGE_FANOUT);}//声明Binding,exchange与queue的绑定关系Beanpublic Binding bindQ1() {return BindingBuilder.bind(fanoutQ1()).to(setFanoutExchange());}Beanpublic Binding bindQ2() {return BindingBuilder.bind(fanoutQ2()).to(setFanoutExchange());}Beanpublic Binding bindQ3() {return BindingBuilder.bind(fanoutQ3()).to(setFanoutExchange());}Beanpublic Binding bindQ4() {return BindingBuilder.bind(fanoutQ4()).to(setFanoutExchange());}}
生产者 ApiOperation(valuefanout发送接口,notes发送到fanoutExchange。消息将往该exchange下的所有queue转发)GetMapping(value/fanoutSend)public Object fanoutSend(String message) throws AmqpException, UnsupportedEncodingException {MessageProperties messageProperties new MessageProperties();messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);//fanout模式只往exchange里发送消息。分发到exchange下的所有queuerabbitTemplate.send(MyConstants.EXCHANGE_FANOUT, , new Message(message.getBytes(UTF-8),messageProperties));Message message2 MessageBuilder.withBody(message.getBytes()).setMessageId(UUID.randomUUID().toString()).build();rabbitTemplate.send(message2);return message sended : message;}
消费者 //fanout模式接收还是只指定队列RabbitListener(queuesMyConstants.QUEUE_FANOUT_Q1)public void fanoutReceiveq1(String message) {System.out.println(fanoutReceive q1 received message : message);}RabbitListener(queuesMyConstants.QUEUE_FANOUT_Q2)public void fanoutReceiveq2(String message) {System.out.println(fanoutReceive q2 received message : message);}RabbitListener(queuesMyConstants.QUEUE_FANOUT_Q3)public void fanoutReceiveq3(String message) {System.out.println(fanoutReceive q3 received message : message);}RabbitListener(queuesMyConstants.QUEUE_FANOUT_Q4)public void fanoutReceiveq4(String message) {System.out.println(fanoutReceive q4 received message : message);}
Topic主题模式
Configuration
public class TopicConfig {//声明队列Beanpublic Queue topicQ1() {return new Queue(MyConstants.QUEUE_TOPIC1);}Beanpublic Queue topicQ2() {return new Queue(MyConstants.QUEUE_TOPIC2);}Beanpublic Queue topicQ3() {return new Queue(MyConstants.QUEUE_TOPIC3);}Beanpublic Queue topicQ4() {return new Queue(MyConstants.QUEUE_TOPIC4);}//声明exchangeBeanpublic TopicExchange setTopicExchange() {return new TopicExchange(MyConstants.EXCHANGE_TOPIC);}//声明binding需要声明一个roytingKeyBeanpublic Binding bindTopicHebei1() {return BindingBuilder.bind(topicQ1()).to(setTopicExchange()).with(hunan.*);}Beanpublic Binding bindTopicHebei2() {return BindingBuilder.bind(topicQ2()).to(setTopicExchange()).with(*.IT);}Beanpublic Binding bindTopicHebei3() {return BindingBuilder.bind(topicQ3()).to(setTopicExchange()).with(*.eco);}Beanpublic Binding bindTopicHebei4() {return BindingBuilder.bind(topicQ4()).to(setTopicExchange()).with(hebei.*);}}
生产者
ApiOperation(valuetopic发送接口,notes发送到topicExchange。exchange转发消息时会往routingKey匹配的queue发送*代表一个单词#代表0个或多个单词。)ApiImplicitParam(nameroutingKey,value路由关键字)GetMapping(value/topicSendHunanIT)public Object topicSend(String routingKey,String message) throws AmqpException, UnsupportedEncodingException {if(null routingKey) {routingKeyhebei.IT;}MessageProperties messageProperties new MessageProperties();messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);//fanout模式只往exchange里发送消息。分发到exchange下的所有queuerabbitTemplate.send(topicExchange, routingKey, new Message(message.getBytes(UTF-8),messageProperties));return message sended : routingKey routingKey;message message;}
消费者 //topic Receiver//注意这个模式会有优先匹配原则。例如发送routingKeyhunan.IT,那匹配到hunan.*(hunan.IT,hunan.eco),之后就不会再去匹配*.IT(hebei.IT)RabbitListener(queuesMyConstants.QUEUE_TOPIC1)public void topicReceiveq1(String message) {System.out.println(topic hunan.eco received message : message);}RabbitListener(queuesMyConstants.QUEUE_TOPIC2)public void topicReceiveq2(String message) {System.out.println(topic hunan.IT received message : message);}RabbitListener(queuesMyConstants.QUEUE_TOPIC3)public void topicReceiveq3(String message) {System.out.println(topic hebei.eco received message : message);}RabbitListener(queuesMyConstants.QUEUE_TOPIC4)public void topicReceiveq4(String message) {System.out.println(topic hebei.IT received message : message);}
Headers模式
Configuration
public class HeaderConfig {//声明queueBeanpublic Queue headQueueTxTyp1() {return new Queue(MyConstants.QUEUE_TXTYP1);}Beanpublic Queue headQueueBusTyp1() {return new Queue(MyConstants.QUEUE_BUSTYP1);}Beanpublic Queue headQueueTxBusTyp() {return new Queue(MyConstants.QUEUE_TXBUSTYP1);}//声明exchangeBeanpublic HeadersExchange setHeaderExchange() {return new HeadersExchange(MyConstants.EXCHANGE_HEADER);}//声明Binding//绑定header中txtyp1的队列。header的队列匹配可以用mathces和exisitsBeanpublic Binding bindHeaderTxTyp1() {return BindingBuilder.bind(headQueueTxTyp1()).to(setHeaderExchange()).where(txTyp).matches(1);}//绑定Header中busTyp1的队列。Bean public Binding bindHeaderBusTyp1() {return BindingBuilder.bind(headQueueBusTyp1()).to(setHeaderExchange()).where(busTyp).matches(1);}//绑定Header中txtyp1或者busTyp1的队列。Bean public Binding bindHeaderTxBusTyp1() {MapString,Object condMap new HashMap();condMap.put(txTyp, 1);condMap.put(busTyp, 1);
// return BindingBuilder.bind(headQueueTxBusTyp()).to(setHeaderExchange()).whereAny(new String[] {txTyp,busTyp}).exist();return BindingBuilder.bind(headQueueTxBusTyp()).to(setHeaderExchange()).whereAny(condMap).match();}
}
生产者
ApiOperation(valueheader发送接口,notes发送到headerExchange。exchange转发消息时不再管routingKey而是根据header条件进行转发。)GetMapping(value/headerSend)public Object headerSend(String txTyp,String busTyp,String message) throws AmqpException, UnsupportedEncodingException {if(null txTyp) {txTyp0;}if(null busTyp) {busTyp0;}MessageProperties messageProperties new MessageProperties();messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);messageProperties.setHeader(txTyp, txTyp);messageProperties.setHeader(busTyp, busTyp);//fanout模式只往exchange里发送消息。分发到exchange下的所有queuerabbitTemplate.send(headerExchange, uselessRoutingKey, new Message(message.getBytes(UTF-8),messageProperties));return message sended : txTyp txTyp;busTyp busTyp;} 消费者 //header receiver//这个模式不再根据routingKey转发而是根据header中的匹配条件进行转发RabbitListener(queuesMyConstants.QUEUE_TXTYP1)public void headerReceiveq1(String message) {System.out.println(header txTyp1 received message : message);}RabbitListener(queuesMyConstants.QUEUE_BUSTYP1)public void headerReceiveq2(String message) {System.out.println(header busTyp1 received message : message);}RabbitListener(queuesMyConstants.QUEUE_TXBUSTYP1)public void headerReceiveq3(String message) {System.out.println(header txbusTyp1 received message : message);}
补充Quorum队列
/*** desc 声明一个Quorum队列*/
Configuration
public class QuorumConfig {Beanpublic Queue quorumQueue() {MapString,Object params new HashMap();params.put(x-queue-type,quorum);return new Queue(MyConstants.QUEUE_QUORUM,true,false,false,params);}
}
生产者 ApiOperation(valuequorum队列发送接口,notes直接发送到队列。Quorum队列)GetMapping(value/directQuorum)public Object directQuorum(String message) throws AmqpException, UnsupportedEncodingException {//设置部分请求参数MessageProperties messageProperties new MessageProperties();messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);messageProperties.setPriority(2);//设置消息转换器如jsonrabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());//将对象转换成json再发送。
// rabbitTemplate.convertandsend(,Object);//发消息rabbitTemplate.send(MyConstants.QUEUE_QUORUM,new Message(message.getBytes(UTF-8),messageProperties));return message sended : message;}
消费者 RabbitListener(queues MyConstants.QUEUE_QUORUM)public void quorumReceiver(String message){System.out.println(quorumReceiver received message : message);}