广州市建设厅网站,京东慧采入驻条件及费用2022,滨州建设工程备案网站,各地人社app大全官网目录
一、介绍
1. 概述
2. 作用及优势
3. 工作原理
二、交换机Exchange
1. Direct
2. Topic
3. Fanout
三、代码案例
消费者代码
1. 直连direct
生产者代码
测试
2. 主题topic
生产者代码
测试
3. 扇形fanout
生产者代码
测试
每篇一获 一、介绍
1. …目录
一、介绍
1. 概述
2. 作用及优势
3. 工作原理
二、交换机Exchange
1. Direct
2. Topic
3. Fanout
三、代码案例
消费者代码
1. 直连direct
生产者代码
测试
2. 主题topic
生产者代码
测试
3. 扇形fanout
生产者代码
测试
每篇一获 一、介绍
1. 概述
RabbitMQ中的交换机exchange是消息的分发中心它接收来自生产者的消息并将这些消息路由到一个或多个队列中。交换机根据消息的路由键routing key将消息发送到相应的队列中。
四型交换机 直连交换机direct exchange直连交换机根据消息的路由键将消息发送到与之匹配的队列中。如果消息的路由键与队列的绑定键binding key完全匹配那么消息将被发送到该队列中。 主题交换机topic exchange主题交换机根据消息的路由键和队列的绑定键的模式进行匹配。可以使用通配符*和#来匹配多个路由键从而实现更灵活的消息路由。 扇形交换机fanout exchange扇出交换机将消息发送到所有与之绑定的队列中无论消息的路由键是什么。它实现了一对多的消息分发。 头部交换机headers exchange头部交换机根据消息的头部信息进行匹配而不是路由键。可以根据消息的头部属性来决定消息的路由。 交换机和队列之间通过绑定binding进行关联生产者将消息发送到交换机交换机根据路由键将消息发送到相应的队列中。交换机和队列的绑定关系可以通过管理界面或者命令行工具进行配置。 总的来说交换机在RabbitMQ中扮演着非常重要的角色它负责将消息路由到相应的队列中实现了灵活的消息分发机制。不同类型的交换机可以满足不同的业务需求开发者可以根据实际情况选择合适的交换机类型来实现消息的路由和分发。
2. 作用及优势
2.1 作用 交换机在项目中的主要作用包括 1. 消息路由交换机负责将消息路由到一个或多个队列中根据消息的路由键和交换机的类型进行匹配和分发确保消息能够准确地到达目标队列。 2. 消息分发交换机可以根据不同的规则将消息分发到不同的队列中实现灵活的消息分发机制。这对于实现消息的多播、广播等场景非常有用。 3. 解耦通过交换机生产者和消费者之间可以完全解耦生产者只需要将消息发送到交换机中而不需要关心消息具体发送到哪个队列中消费者也只需要从队列中接收消息而不需要关心消息的来源。 4. 消息过滤通过不同类型的交换机和绑定规则可以实现消息的过滤和选择性接收确保消费者只接收到其关心的消息。 5. 实现消息通道交换机是消息在RabbitMQ中的通道通过交换机可以将消息从生产者传递给消费者实现了消息的传递和通信。 总的来说交换机在项目中起到了消息路由、分发、解耦和过滤等重要作用是实现消息传递和通信的关键组件。通过合理使用不同类型的交换机可以实现灵活、高效的消息传递机制满足不同业务场景的需求。 2.2 优势 交换机在消息传递系统中具有以下优势 1. 灵活的消息路由交换机可以根据消息的路由键将消息发送到不同的队列中实现了灵活的消息路由机制。这样可以根据消息的不同属性将消息发送到不同的消费者或处理逻辑中提高了系统的灵活性和可扩展性。 2. 解耦和分布式系统支持通过交换机生产者和消费者之间可以完全解耦生产者只需要将消息发送到交换机中而不需要关心消息具体发送到哪个队列中消费者也只需要从队列中接收消息而不需要关心消息的来源。这对于构建分布式系统和微服务架构非常有用。 3. 多播和广播支持通过扇出交换机fanout exchange交换机可以将消息发送到所有与之绑定的队列中实现了一对多的消息分发支持了多播和广播的消息传递方式。 4. 消息过滤和选择性接收通过不同类型的交换机和绑定规则可以实现消息的过滤和选择性接收确保消费者只接收到其关心的消息提高了系统的效率和性能。 5. 实现消息通道交换机是消息在消息队列系统中的通道通过交换机可以将消息从生产者传递给消费者实现了消息的传递和通信为系统中的消息传递提供了可靠的通道。 总的来说交换机在消息传递系统中具有灵活的消息路由、解耦和分布式系统支持、多播和广播支持、消息过滤和选择性接收等优势为构建高效、灵活的消息传递系统提供了重要的支持。 3. 工作原理
RabbitMQ的交换机Exchange是消息路由的核心组件负责消息的分发和路由。下面是RabbitMQ交换机的工作原理 1. 发布消息生产者将消息发送到RabbitMQ的交换机中同时指定一个路由键Routing Key。 2. 交换机根据类型进行路由RabbitMQ的交换机有四种类型分别是直连交换机direct exchange、扇出交换机fanout exchange、主题交换机topic exchange和头部交换机headers exchange。不同类型的交换机根据不同的路由规则进行消息的路由和分发。 3. 路由规则直连交换机根据消息的路由键将消息发送到与之绑定的队列中扇出交换机将消息发送到所有与之绑定的队列中主题交换机根据消息的路由键和队列的绑定规则进行匹配将消息发送到匹配的队列中头部交换机根据消息的头部属性进行匹配将消息发送到匹配的队列中。 4. 绑定队列交换机需要和队列进行绑定指定绑定的路由键或者其他条件确保消息能够被正确地路由到目标队列中。 5. 发送到队列一旦消息被交换机路由到目标队列消费者就可以从队列中接收并处理消息。 总的来说RabbitMQ的交换机根据不同的类型和路由规则将消息发送到目标队列中实现了消息的路由和分发。通过合理使用不同类型的交换机和绑定规则可以实现灵活、高效的消息传递机制满足不同业务场景的需求。 二、交换机Exchange
1. Direct 直连交换机Direct Exchange是RabbitMQ中最简单的交换机类型之一它使用消息的路由键Routing Key来决定将消息发送到哪个队列。 案例及实现流程 假设有一个简单的电子商务系统其中包括订单服务和库存服务。订单服务负责接收和处理用户的订单库存服务负责管理商品的库存信息。这两个服务之间需要通过消息队列进行通信。 首先在RabbitMQ中创建一个名为order-exchange的直连交换机类型为direct。 订单服务和库存服务分别创建自己的队列并将它们分别绑定到order-exchange交换机上。订单服务将队列绑定键设置为order.create库存服务将队列绑定键设置为stock.update。 当用户下单时订单服务将订单信息发送到order-exchange交换机并指定路由键为order.create。 直连交换机根据消息的路由键进行匹配发现匹配了order.create的绑定键将消息发送到订单服务的队列中。同时库存服务的队列不会收到该消息。 订单服务从队列中接收订单信息并处理订单库存服务则不会收到该消息因为它的队列没有匹配的路由键。 通过上述实现订单服务和库存服务之间实现了解耦合订单服务只需要将订单相关的消息发送到order-exchange交换机而不需要直接关心库存服务的处理逻辑。同时库存服务也只需要关注与自己相关的消息通过设置不同的绑定键实现了消息的精确路由和分发。 2. Topic 主题交换机Topic Exchange是RabbitMQ中一种灵活且强大的交换机类型它使用消息的路由键和通配符模式来决定将消息发送到哪个队列。 案例及实现流程 假设我们有一个电子商务平台需要实现一个基于主题交换机的消息路由系统。具体的场景是用户下单后需要根据订单的类型和地区信息将订单信息发送给不同的处理服务进行处理。 在RabbitMQ中创建一个名为order-topic-exchange的主题交换机类型为topic。 我们有三个处理服务分别是订单处理服务、支付处理服务和物流处理服务。我们为每个服务创建一个队列并将它们分别绑定到order-topic-exchange交换机上。订单处理服务的队列绑定键为order.支付处理服务的队列绑定键为payment.物流处理服务的队列绑定键为shipping.*。这样订单处理服务将接收所有以order.开头的消息支付处理服务将接收所有以payment.开头的消息物流处理服务将接收所有以shipping.开头的消息。 当用户下单后订单服务将订单信息发送到order-topic-exchange交换机并指定路由键为order.create.us其中us表示美国地区。 主题交换机根据消息的路由键进行匹配发现匹配了order.*的绑定键将消息发送到订单处理服务的队列中。同时支付处理服务和物流处理服务的队列不会收到该消息。 订单处理服务从队列中接收订单信息并处理订单支付处理服务和物流处理服务则不会收到该消息因为它们的队列没有匹配的路由键。 通过上述实现我们实现了基于主题交换机的灵活消息路由系统。订单服务只需要将订单相关的消息发送到order-topic-exchange交换机而不需要直接关心支付处理服务和物流处理服务的处理逻辑。同时支付处理服务和物流处理服务也只需要关注与自己相关的消息通过设置不同的绑定键实现了消息的精确路由和分发。 3. Fanout 扇形交换机是RabbitMQ中的一种消息路由方式它将消息发送到与之绑定的所有队列中不管消息的路由键是什么。 案例及实现流程 假设我们有一个在线商城的系统需要实现一个基于扇out交换机的消息通知系统。具体的场景是当用户下单后需要通知多个服务进行处理比如订单处理服务、支付处理服务和物流处理服务。 在RabbitMQ中创建一个名为order-fanout-exchange的扇out交换机类型为fanout。 我们有三个处理服务分别是订单处理服务、支付处理服务和物流处理服务。我们为每个服务创建一个队列并将它们分别绑定到order-fanout-exchange交换机上。 当用户下单后订单服务将订单信息发送到order-fanout-exchange交换机。 扇out交换机会将消息发送到所有与之绑定的队列中不管消息的路由键是什么。因此订单处理服务、支付处理服务和物流处理服务都会接收到相同的消息。 订单处理服务、支付处理服务和物流处理服务分别从各自的队列中接收订单信息并进行相应的处理。 通过上述实现我们实现了基于扇out交换机的消息通知系统。订单服务只需要将订单相关的消息发送到order-fanout-exchange交换机而不需要关心具体的处理服务。同时处理服务也只需要关注与自己相关的消息而不需要关心其他处理服务的处理逻辑。这样实现了消息的广播通知和解耦。 三、代码案例 以下代码都是基于我博客中的 RabbitMQ的基本使用进行实例案例的消息队列https://blog.csdn.net/SAME_LOVE/article/details/135704883?spm1001.2014.3001.5501 有了前面的安装部署我们只需要开启docker服务如果是开机自启就可以不用输入以下 命令(开启docker服务) systemctl start docker docker run -d \
--name my-rabbitmq \
-p 5672:5672 -p 15672:15672 \
--hostname my-rabbitmq-host \
-e RABBITMQ_DEFAULT_VHOSTmy_vhost \
-e RABBITMQ_DEFAULT_USERadmin \
-e RABBITMQ_DEFAULT_PASSadmin \
--restartalways \
rabbitmq:management 以上是我们创建的容器如果有了就可以直接启动容器即可 docker start my-rabbitmq 消费者代码 创建ReceiverQ1接收交换机中Queue01队列消息的方法 ReceiverQ1 package com.cloudjun.consumer;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;Component
SuppressWarnings(all)
Slf4j
RabbitListener(queues Queue01)
public class ReceiverQ1 {// 接收directExchange01交换机中Queue01队列消息的方法RabbitHandlerpublic void Queue01(String msg) {log.warn(Queue01接收到信息: msg);}}创建ReceiverQ2接收交换机中Queue02队列消息的方法 ReceiverQ2 package com.cloudjun.consumer;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;Component
SuppressWarnings(all)
Slf4j
RabbitListener(queues Queue02)
public class ReceiverQ2 {// 接收directExchange01交换机中Queue02队列消息的方法RabbitHandlerpublic void Queue02(String msg) {log.warn(Queue02接收到信息: msg);}}1. 直连direct
生产者代码 在生产者项目中的RabbitConfig中增加以下代码 /*** 直连交换机* /* 创建两个Binding Bean分别与Queue01和Queue02队列进行绑定* 并都指向directExchange01直连交换机键分别为Key01和Key02*/// 创建队列Beanpublic Queue Queue01() {return new Queue(Queue01);}Beanpublic Queue Queue02() {return new Queue(Queue02);}// 创建直连direct交换机Beanpublic DirectExchange directExchange01() {return new DirectExchange(directExchange01);}// 创建Binding Bean与Queue01和directExchange01绑定键为Key01Beanpublic Binding binding01() {return BindingBuilder.bind(Queue01()).to(directExchange01()).with(Key01);}// 创建Binding Bean与Queue02和directExchange01绑定键为Key02Beanpublic Binding binding02() {return BindingBuilder.bind(Queue02()).to(directExchange01()).with(Key02);} 在生产者项目中的TestController中增加以下代码 RequestMapping(test03)public String test03() {// 发送消息到名为directExchange01的交换机路由键为key01信息内容为Hello, direct exchange!// 这里的directExchange01是RabbitMQ中定义的交换机名称// 这里的key01是RabbitMQ中定义的路由键名称template.convertAndSend(directExchange01,Key01, Hello, direct exchange!);return ;}RequestMapping(test04)public String test04() {// 发送消息到名为directExchange01的交换机路由键为key02信息内容为Hello, direct exchange!// 这里的directExchange01是RabbitMQ中定义的交换机名称// 这里的key02是RabbitMQ中定义的路由键名称template.convertAndSend(directExchange01,Key02, Hello, direct exchange!);return ;} 测试
测试一test03 测试二test04 2. 主题topic
生产者代码 在生产者项目中的RabbitConfig中增加以下代码 /*** 主题交换机* /* binding03将Queue01绑定到topicExchange并使用*.*.Q1作为路由键。* binding04将Queue02绑定到topicExchange并使用*.*.Q2作为路由键。* binding05将Queue01绑定到topicExchange并使用un.#作为路由键。* binding06将Queue02绑定到topicExchange并使用un.#作为路由键。* *代表一个单词* #代表任意数量的字符也代表0个或多个*/// 创建主题交换机Beanpublic TopicExchange topicExchange() {return new TopicExchange(topicExchange);}Beanpublic Binding binding03() {return BindingBuilder.bind(Queue01()).to(topicExchange()).with(*.*.Q1);}Beanpublic Binding binding04() {return BindingBuilder.bind(Queue02()).to(topicExchange()).with(*.*.Q2);}Beanpublic Binding binding05() {return BindingBuilder.bind(Queue01()).to(topicExchange()).with(un.#);}Beanpublic Binding binding06() {return BindingBuilder.bind(Queue02()).to(topicExchange()).with(un.#);} 在生产者项目中的TestController中增加以下代码 RequestMapping(test05)public String test05(String rex) {template.convertAndSend(topicExchange,rex,Hello,topicExchange:Queue!);return ;} 测试 3. 扇形fanout
生产者代码 在生产者项目中的RabbitConfig中增加以下代码 /*** 扇形交换机** 定义了一个FanoutExchange加上Bean注解* 定义了两个Binding加上Bean注解* 将两个队列绑定到FanoutExchange上从而实现广播消息的功能* 扇形交换机会将接收到的消息路由到所有绑定到它上的队列。*/// 创建扇形交换机Beanpublic FanoutExchange fanoutExchange() {return new FanoutExchange(fanoutExchange);}Beanpublic Binding binding07() {return BindingBuilder.bind(Queue01()).to(fanoutExchange());}Beanpublic Binding binding08() {return BindingBuilder.bind(Queue02()).to(fanoutExchange());}在生产者项目中的TestController中增加以下代码 RequestMapping(test06)public String test06() {template.convertAndSend(fanoutExchange,,Hello,fanoutExchange:Queue!);return ;} 测试 整合代码
RabbitConfig
package com.cloudjun.publisher;import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;Configuration
SuppressWarnings(all)
public class RabbitConfig {// 创建队列Beanpublic Queue messageQueue() {return new Queue(messageQueue);}Beanpublic Queue messageUser() {return new Queue(messageUser);}/*** 直连交换机* /* 创建两个Binding Bean分别与Queue01和Queue02队列进行绑定* 并都指向directExchange01直连交换机键分别为Key01和Key02*/// 创建队列Beanpublic Queue Queue01() {return new Queue(Queue01);}Beanpublic Queue Queue02() {return new Queue(Queue02);}// 创建直连direct交换机Beanpublic DirectExchange directExchange01() {return new DirectExchange(directExchange01);}// 创建Binding Bean与Queue01和directExchange01绑定键为Key01Beanpublic Binding binding01() {return BindingBuilder.bind(Queue01()).to(directExchange01()).with(Key01);}// 创建Binding Bean与Queue02和directExchange01绑定键为Key02Beanpublic Binding binding02() {return BindingBuilder.bind(Queue02()).to(directExchange01()).with(Key02);}/*** 主题交换机* /* binding03将Queue01绑定到topicExchange并使用*.*.Q1作为路由键。* binding04将Queue02绑定到topicExchange并使用*.*.Q2作为路由键。* binding05将Queue01绑定到topicExchange并使用un.#作为路由键。* binding06将Queue02绑定到topicExchange并使用un.#作为路由键。* *代表一个单词* #代表任意数量的字符也代表0个或多个*/// 创建主题交换机Beanpublic TopicExchange topicExchange() {return new TopicExchange(topicExchange);}Beanpublic Binding binding03() {return BindingBuilder.bind(Queue01()).to(topicExchange()).with(*.*.Q1);}Beanpublic Binding binding04() {return BindingBuilder.bind(Queue02()).to(topicExchange()).with(*.*.Q2);}Beanpublic Binding binding05() {return BindingBuilder.bind(Queue01()).to(topicExchange()).with(un.#);}Beanpublic Binding binding06() {return BindingBuilder.bind(Queue02()).to(topicExchange()).with(un.#);}/*** 扇形交换机** 定义了一个FanoutExchange加上Bean注解* 定义了两个Binding加上Bean注解* 将两个队列绑定到FanoutExchange上从而实现广播消息的功能* 扇形交换机会将接收到的消息路由到所有绑定到它上的队列。*/// 创建扇形交换机Beanpublic FanoutExchange fanoutExchange() {return new FanoutExchange(fanoutExchange);}Beanpublic Binding binding07() {return BindingBuilder.bind(Queue01()).to(fanoutExchange());}Beanpublic Binding binding08() {return BindingBuilder.bind(Queue02()).to(fanoutExchange());}}TestController
package com.cloudjun.publisher;import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;/*** author CloudJun*/
RestController
public class TestController {Autowiredprivate AmqpTemplate template;Autowiredprivate ObjectMapper objectMapper;RequestMapping(test01)public String test01(){// 发送消息到名为messageQueue的队列// 这里的messageQueue是RabbitMQ中定义的队列名称// 这里的Hello World!是发送的消息内容template.convertAndSend(messageQueue, HelloWorld!);return ;}RequestMapping(test02)public String test02() throws Exception {// 发送消息到名为messageQueue的队列// 这里的messageQueue是RabbitMQ中定义的队列名称User user new User(Jun, 123456);// 序列化对象转换为JSON字符串String json objectMapper.writeValueAsString(user);template.convertAndSend(messageUser, json);return ;}RequestMapping(test03)public String test03() {// 发送消息到名为directExchange01的交换机路由键为key01信息内容为Hello, direct exchange!// 这里的directExchange01是RabbitMQ中定义的交换机名称// 这里的key01是RabbitMQ中定义的路由键名称template.convertAndSend(directExchange01,Key01, Hello, direct exchange!);return ;}RequestMapping(test04)public String test04() {// 发送消息到名为directExchange01的交换机路由键为key02信息内容为Hello, direct exchange!// 这里的directExchange01是RabbitMQ中定义的交换机名称// 这里的key02是RabbitMQ中定义的路由键名称template.convertAndSend(directExchange01,Key02, Hello, direct exchange!);return ;}RequestMapping(test05)public String test05(String rex) {template.convertAndSend(topicExchange,rex,Hello,topicExchange:Queue!);return ;}RequestMapping(test06)public String test06() {template.convertAndSend(fanoutExchange,,Hello,fanoutExchange:Queue!);return ;}}每篇一获 通过学习以上所有的技术您可能会获得以下收获 理解消息队列的概念和作用学习了消息队列的基本概念以及它在系统架构中的作用能够更好地理解消息队列的优势和适用场景。 掌握RabbitMQ的基本原理和使用方法学习了RabbitMQ的基本原理、交换机类型、队列绑定等概念以及如何使用RabbitMQ实现消息的发送、接收和路由。 理解消息队列在微服务架构中的应用学习了消息队列在微服务架构中的重要性和应用场景能够将消息队列应用于实际的微服务系统中实现服务之间的解耦和异步通信。 掌握Docker容器化技术学习了Docker容器化技术的基本原理和使用方法能够使用Docker构建、部署和管理容器化的应用程序实现应用的快速部署和扩展。 理解扇out交换机的应用场景学习了扇out交换机的工作原理和应用场景能够将其应用于实际的系统中实现消息的广播通知和解耦。 这些技术的学习和掌握有助于您更好地理解和应用现代系统架构中的关键技术提升系统设计和开发的能力同时也为您在实际项目中的应用提供了技术支持。