松桃县住房和城乡建设局网站,wordpress做什么,长沙县好的建站按效果付费,移动网站与pc网站这里写目录标题 一、序言二、配置文件application.yml三、RabbitMQ交换机和队列配置1、定义4个队列2、定义Fanout交换机和队列绑定关系2、定义Direct交换机和队列绑定关系3、定义Topic交换机和队列绑定关系4、定义Header交换机和队列绑定关系 四、RabbitMQ消费者配置五、Rabbit… 这里写目录标题 一、序言二、配置文件application.yml三、RabbitMQ交换机和队列配置1、定义4个队列2、定义Fanout交换机和队列绑定关系2、定义Direct交换机和队列绑定关系3、定义Topic交换机和队列绑定关系4、定义Header交换机和队列绑定关系 四、RabbitMQ消费者配置五、RabbitMQ生产者六、测试用例1、发送到FanoutExchage2、发送到DirectExchage3、发送到TopicExchange4、发动到HeadersExchage 七、结语 一、序言
在上一节 RabbitMQ中的核心概念和交换机类型 中我们介绍了RabbitMQ中的一些核心概念尤其是各种交换机的类型接下来我们将具体讲解各种交换机的配置和消息订阅实操。 二、配置文件application.yml
我们先上应用启动配置文件application.yml如下
server:port: 8080
spring:rabbitmq:addresses: localhost:5672username: adminpassword: adminvirtual-host: /listener:type: simplesimple:acknowledge-mode: autoconcurrency: 5max-concurrency: 20prefetch: 5备注这里我们指定了RabbitListenerContainerFactory的类型为SimpleRabbitListenerContainerFactory并且指定消息确认模式为自动确认。 三、RabbitMQ交换机和队列配置
Spring官方提供了一套 流式API 来定义队列、交换机和绑定关系非常的方便接下来我们定义4种类型的交换机和相应队列的绑定关系。
1、定义4个队列
/*** 定义4个队列*/
Configuration
protected static class QueueConfig {Beanpublic Queue queue1() {return QueueBuilder.durable(queue-1).build();}Beanpublic Queue queue2() {return QueueBuilder.durable(queue-2).build();}Beanpublic Queue queue3() {return QueueBuilder.durable(queue-3).build();}Beanpublic Queue queue4() {return QueueBuilder.durable(queue-4).build();}
}2、定义Fanout交换机和队列绑定关系
/*** 定义Fanout交换机和对应的绑定关系*/
Configuration
protected static class FanoutExchangeBindingConfig {Beanpublic FanoutExchange fanoutExchange() {return ExchangeBuilder.fanoutExchange(fanout-exchange).build();}/*** 定义多个Fanout交换机和队列的绑定关系* param fanoutExchange* param queue1* param queue2* param queue3* param queue4* return*/Beanpublic Declarables bindQueueToFanoutExchange(FanoutExchange fanoutExchange, Queue queue1, Queue queue2, Queue queue3, Queue queue4) {Binding queue1Binding BindingBuilder.bind(queue1).to(fanoutExchange);Binding queue2Binding BindingBuilder.bind(queue2).to(fanoutExchange);Binding queue3Binding BindingBuilder.bind(queue3).to(fanoutExchange);Binding queue4Binding BindingBuilder.bind(queue4).to(fanoutExchange);return new Declarables(queue1Binding, queue2Binding, queue3Binding, queue4Binding);}}备注这里我们将4个队列绑定到了名为fanout-exchange的交换机上。 2、定义Direct交换机和队列绑定关系
Configuration
protected static class DirectExchangeBindingConfig {Beanpublic DirectExchange directExchange() {return ExchangeBuilder.directExchange(direct-exchange).build();}Beanpublic Binding bindingQueue3ToDirectExchange(DirectExchange directExchange, Queue queue3) {return BindingBuilder.bind(queue3).to(directExchange).with(queue3-route-key);}
}备注这里我们定义了名为direct-exchange的交换机并通过路由keyqueue3-route-key将queue-3绑定到了该交换机上。 3、定义Topic交换机和队列绑定关系
Configuration
protected static class TopicExchangeBindingConfig {Beanpublic TopicExchange topicExchange() {return ExchangeBuilder.topicExchange(topic-exchange).build();}Beanpublic Declarables bindQueueToTopicExchange(TopicExchange topicExchange, Queue queue1, Queue queue2) {Binding queue1Binding BindingBuilder.bind(queue1).to(topicExchange).with(com.order.*);Binding queue2Binding BindingBuilder.bind(queue2).to(topicExchange).with(com.#);return new Declarables(queue1Binding, queue2Binding);}
}这里我们定义了名为topic-exchange类型的交换机该类型交换机支持路由key通配符匹配*代表一个任意字符#代表一个或多个任意字符。 备注 通过路由keycom.order.*将queue-1绑定到了该交换机上。通过路由key com.#将queue-2也绑定到了该交换机上。 4、定义Header交换机和队列绑定关系
Configuration
protected static class HeaderExchangeBinding {Beanpublic HeadersExchange headersExchange() {return ExchangeBuilder.headersExchange(headers-exchange).build();}Beanpublic Binding bindQueueToHeadersExchange(HeadersExchange headersExchange, Queue queue4) {return BindingBuilder.bind(queue4).to(headersExchange).where(function).matches(logging);}
}备注这里我们定义了名为headers-exchange类型的交换机并通过参数functionlogging将queue-4绑定到了该交换机上。 四、RabbitMQ消费者配置
Spring RabbitMQ中支持注解式监听端点配置用于异步接收消息如下
Slf4j
Component
public class RabbitMqConsumer {RabbitListener(queues queue-1)public void handleMsgFromQueue1(String msg) {log.info(Message received from queue-1, message body: {}, msg);}RabbitListener(queues queue-2)public void handleMsgFromQueue2(String msg) {log.info(Message received from queue-2, message body: {}, msg);}RabbitListener(queues queue-3)public void handleMsgFromQueue3(String msg) {log.info(Message received from queue-3, message body: {}, msg);}RabbitListener(queues queue-4)public void handleMsgFromQueue4(String msg) {log.info(Message received from queue-4, message body: {}, msg);}
}备注这里我们分别定义了4个消费者分别用来接受4个队列的消息。 五、RabbitMQ生产者
Slf4j
Component
RequiredArgsConstructor
public class RabbitMqProducer {private final RabbitTemplate rabbitTemplate;public void sendMsgToFanoutExchange(String body) {log.info(开始发送消息到fanout-exchange, 消息体:{}, body);Message message MessageBuilder.withBody(body.getBytes(StandardCharsets.UTF_8)).build();rabbitTemplate.send(fanout-exchange, StringUtils.EMPTY, message);}public void sendMsgToDirectExchange(String body) {log.info(开始发送消息到direct-exchange, 消息体:{}, body);Message message MessageBuilder.withBody(body.getBytes(StandardCharsets.UTF_8)).build();rabbitTemplate.send(direct-exchange, queue3-route-key, message);}public void sendMsgToTopicExchange(String routingKey, String body) {log.info(开始发送消息到topic-exchange, 消息体:{}, body);Message message MessageBuilder.withBody(body.getBytes(StandardCharsets.UTF_8)).build();rabbitTemplate.send(topic-exchange, routingKey, message);}public void sendMsgToHeadersExchange(String body) {log.info(开始发送消息到headers-exchange, 消息体:{}, body);MessageProperties messageProperties MessagePropertiesBuilder.newInstance().setHeader(function, logging).build();Message message MessageBuilder.withBody(body.getBytes(StandardCharsets.UTF_8)).andProperties(messageProperties).build();rabbitTemplate.send(headers-exchange, StringUtils.EMPTY, message);}} 六、测试用例
这里写了个简单的Controller用来测试如下
RestController
RequiredArgsConstructor
public class RabbitMsgController {private final RabbitMqProducer rabbitMqProducer;RequestMapping(/exchange/fanout)public ResponseEntityString sendMsgToFanoutExchange(String body) {rabbitMqProducer.sendMsgToFanoutExchange(body);return ResponseEntity.ok(广播消息发送成功);}RequestMapping(/exchange/direct)public ResponseEntityString sendMsgToDirectExchange(String body) {rabbitMqProducer.sendMsgToDirectExchange(body);return ResponseEntity.ok(消息发送到Direct交换成功);}RequestMapping(/exchange/topic)public ResponseEntityString sendMsgToTopicExchange(String routingKey, String body) {rabbitMqProducer.sendMsgToTopicExchange(routingKey, body);return ResponseEntity.ok(消息发送到Topic交换机成功);}RequestMapping(/exchange/headers)public ResponseEntityString sendMsgToHeadersExchange(String body) {rabbitMqProducer.sendMsgToHeadersExchange(body);return ResponseEntity.ok(消息发送到Headers交换机成功);}}1、发送到FanoutExchage
直接访问http://localhost:8080/exchange/fanout?bodyhello可以看到该消息广播到了4个队列上。
2023-11-07 17:41:12.959 INFO 39460 --- [nio-8080-exec-9] c.u.r.i.producer.RabbitMqProducer : 开始发送消息到fanout-exchange, 消息体:hello
2023-11-07 17:41:12.972 INFO 39460 --- [ntContainer#1-5] c.u.r.i.consumer.RabbitMqConsumer : Message received from queue-1, message body: hello
2023-11-07 17:41:12.972 INFO 39460 --- [ntContainer#0-4] c.u.r.i.consumer.RabbitMqConsumer : Message received from queue-4, message body: hello
2023-11-07 17:41:12.972 INFO 39460 --- [ntContainer#3-3] c.u.r.i.consumer.RabbitMqConsumer : Message received from queue-3, message body: hello
2023-11-07 17:41:12.972 INFO 39460 --- [ntContainer#2-4] c.u.r.i.consumer.RabbitMqConsumer : Message received from queue-2, message body: hello2、发送到DirectExchage
访问http://localhost:8080/exchange/direct?bodyhello可以看到消息通过路由keyqueue3-route-key发送到了queue-3上。
2023-11-07 17:43:26.804 INFO 39460 --- [nio-8080-exec-1] c.u.r.i.producer.RabbitMqProducer : 开始发送消息到direct-exchange, 消息体:hello
2023-11-07 17:43:26.822 INFO 39460 --- [ntContainer#3-5] c.u.r.i.consumer.RabbitMqConsumer : Message received from queue-3, message body: hello3、发送到TopicExchange
访问http://localhost:8080/exchange/topic?bodyhelloroutingKeycom.order.create路由key为 com.order.create的消息分别发送到了queue-1和queue-2上。
2023-11-07 17:44:45.301 INFO 39460 --- [nio-8080-exec-4] c.u.r.i.producer.RabbitMqProducer : 开始发送消息到topic-exchange, 消息体:hello
2023-11-07 17:44:45.312 INFO 39460 --- [ntContainer#1-3] c.u.r.i.consumer.RabbitMqConsumer : Message received from queue-1, message body: hello
2023-11-07 17:44:45.312 INFO 39460 --- [ntContainer#2-3] c.u.r.i.consumer.RabbitMqConsumer : Message received from queue-2, message body: hello4、发动到HeadersExchage
访问http://localhost:8080/exchange/headers?bodyhello消息通过头部信息functionlogging发送到了headers-exchange上。
2023-11-07 17:47:21.736 INFO 39460 --- [nio-8080-exec-9] c.u.r.i.producer.RabbitMqProducer : 开始发送消息到headers-exchange, 消息体:hello
2023-11-07 17:47:21.749 INFO 39460 --- [ntContainer#0-3] c.u.r.i.consumer.RabbitMqConsumer : Message received from queue-4, message body: hello七、结语
下一节我们将会介绍通过两种方式借由RabbitMQ实现延迟消息发送和订阅敬请期待。