苏州专业高端网站建设网络公司,不用写代码可以做网站的软件,网络推广产品公司,wordpress大侠目录
一、同步通信 VS 异步通信
二、MQ——消息队列
RabbitMQ
RabbitMQ安装
RabbitMQ的整体架构
常见消息模型 基本消息队列#xff08;BasicQueue#xff09;
工作消息队列#xff08;WorkQueue#xff09; 发布、订阅#xff08;Publish、Subscribe#xff0…目录
一、同步通信 VS 异步通信
二、MQ——消息队列
RabbitMQ
RabbitMQ安装
RabbitMQ的整体架构
常见消息模型 基本消息队列BasicQueue
工作消息队列WorkQueue 发布、订阅Publish、Subscribe Fanout Exchange
Direct Exchange
Topic Exchange
SpringAMQP-消息转换器 一、同步通信 VS 异步通信 同步通信双方在同一个时钟信号的控制下进行数据的接收和发送来一个时钟发送端发送接收端接收他们彼此之间的工作状态是一致的例如直播、打电话。 优点 时效性强能够立即得到结果 缺点 耦合性较高每次加入新的需求都需要修改原有代码性能下降调用者需要等待服务提供者响应若调用链过长则响应时间等于每次调用时间之和资源利用率低调用链中的每个服务在等待响应的过程中不能释放请求占用的资源高并发的情况下会造成资源的极度浪费级联失败如果服务提供者出现问题所有的调用方也会跟着出问题 适用场景业务要求时效性高 异步通信异步通信在发送字符时所发送的字符之间的时间间隔可以是任意的。例如微信聊天。 在异步调用过程常见的实现就是事件驱动模式系统中发生的事件会触发相应的事件处理器或监听器 从而实现特定的业务逻辑或功能。 例如在如下的支付场景中当有请求发送给支付服务时支付服务就会通知Broker接着后续的订阅事件就会接收到请求开始同时处理业务但是支付服务不用等到后续订阅事件完成后再返回而是将请求通知给Broker之后支付服务就会返回结果。 优点 服务解耦性能提升吞吐量提高服务之间没有强依赖不用担心级联失败问题故障隔离流量削峰 缺点 依赖于Broker的可靠性、安全性和吞吐能力结构复杂后业务没有了明显的流水线难以追踪管理 适用场景对于并发和吞吐量的要求高时效性的要求低 二、MQ——消息队列 MQ消息队列存放消息的队列也是事件驱动架构的Broker。 常见的消息队列实现对比 RabbitMQ RabbitMQ是基于Erlang语言开发的消息通信中间件RabbitMQ的性能以及可用性较好国内应用较为广泛所以对RabbitMQ进行重点学习。 RabbitMQ的官网地址https://www.rabbitmq.com RabbitMQ安装
可以根据自己的需求在RabbitMQ的官网进行查看下载和安装 RabbitMQ — 兔子MQ
RabbitMQ的整体架构 首先Publisher会把消息发送给exchange交换机,exchange负责路由再把消息投递到queue队列queue负责暂存消息Consumer会从队列中获取消息并处理消息。 RabbitMQ中的几个概念 • channel 操作 MQ 的工具 • exchange 路由消息到队列中 • queue 缓存消息 • virtual host 虚拟主机是对 queue 、 exchange 等资源的逻辑分组 常见消息模型 RabbitMQ的官方文档中给出了5个MQ的Demo实例可以分为如下 基本消息队列BasicQueue工作消息队列WorkQueue发布订阅Publish、Subscribe又根据交换机类型不同分为三种 Fanout Exchange广播 Direct Exchange路由 Topic Exchange主题 基本消息队列BasicQueue 官方的HelloWorld是基于最基础的消息队列模型来实现的只包括三个角色 publisher消息发布者将消息发送到队列queuequeue消息队列负责接受并缓存消息consumer订阅队列处理队列中的消息 在RabbitMQ中需要了解的端口 在使用端口时需要在云服务器上开放所用的端口 基本消息队列的消息发送流程 建立Connection创建Channel利用Channel声明队列利用Channel向队列中发送消息 代码实现 public class PublisherTest {Testpublic void testSendMessage() throws IOException, TimeoutException {// 1.建立连接ConnectionFactory factory new ConnectionFactory();// 1.1.设置连接参数分别是主机名、端口号、vhost、用户名、密码factory.setHost(x.x.x.x);factory.setPort(5672);factory.setVirtualHost(/);factory.setUsername(xx);factory.setPassword(xx);// 1.2.建立连接Connection connection factory.newConnection();// 2.创建通道ChannelChannel channel connection.createChannel();// 3.创建队列String queueName simple.queue;channel.queueDeclare(queueName, false, false, false, null);// 4.发送消息String message hello, rabbitmq!;channel.basicPublish(, queueName, null, message.getBytes());System.out.println(发送消息成功【 message 】);// 5.关闭通道和连接channel.close();connection.close();}
}运行结果 基本消息队列的消息接收流程 建立Connection创建Channel利用Channel声明队列定义Consumer的消费行为handleDelivery()利用Channel将消费者与队列进行绑定 代码实现 public class ConsumerTest {public static void main(String[] args) throws IOException, TimeoutException {// 1.建立连接ConnectionFactory factory new ConnectionFactory();// 1.1.设置连接参数分别是主机名、端口号、vhost、用户名、密码factory.setHost(x.x.x.x);factory.setPort(5672);factory.setVirtualHost(/);factory.setUsername(xx);factory.setPassword(xx);// 1.2.建立连接Connection connection factory.newConnection();// 2.创建通道ChannelChannel channel connection.createChannel();// 3.创建队列String queueName simple.queue;channel.queueDeclare(queueName, false, false, false, null);// 4.订阅消息channel.basicConsume(queueName, true, new DefaultConsumer(channel){Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) throws IOException {// 5.处理消息String message new String(body);System.out.println(接收到消息【 message 】);}});System.out.println(等待接收消息。。。。);}
} 运行结果 上述实现方式相对比较复杂就引入了SpringAMQP来实现。 AMQP是用于在应用程序之间传递业务消息的开放标准。该协议与语言和平台无关更符合微服务中独立性的要求。 SpringAMQPSpringAMQP是基于AMQP协议定义的一套API规范提供了模板来发送和接收消息。包含两部分其中spring-amqp是基础抽象spring-rabbit是底层的默认实现。 SpringAMQP的官方地址 那么利用SpringAMQP来实现基本消息队列的流程如下 在父工程中引入spring-amqp的依赖在publisher服务中利用RabbitTemplate发送消息到simple.queue这个队列在consumer服务中编写消费逻辑绑定simple.queue这个队列 具体实现 1、在父工程中引入spring-amqp的依赖 !--AMQP依赖包含RabbitMQ--dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId/dependency 2、在publisher中编写测试方法向simple.queue发送消息 在publisher服务的配置文件中添加mq的连接信息 spring:rabbitmq:host: # rabbitMQ的ip地址port: 5672 # 端口username: # 用户名password: # 密码virtual-host: # 虚拟主机 在publisher服务中新建一个测试类编写测试方法 RunWith(SpringRunner.class)
SpringBootTest
public class SpringAmqpTest {Autowiredprivate RabbitTemplate rabbitTemplate;Testpublic void testSendMessage2SimpleQueue() {String queueName simple.queue;String message hello, spring amqp!;rabbitTemplate.convertAndSend(queueName, message);}
} 在RabbitMQ中的simple队列中查询信息 3、在consumer服务中编写消费逻辑监听simple.queue 在consumer服务的配置文件中添加mq连接信息 spring:rabbitmq:host: # rabbitMQ的ip地址port: 5672 # 端口username: # 用户名password: # 密码virtual-host: # 虚拟主机 在consumer服务中新建一个类编写具体的消费逻辑 Component
public class SpringRabbitListener { RabbitListener(queues simple.queue)public void listenSimpleQueue(String msg) throws InterruptedException {System.out.println(消费者接收到消息【 msg 】 LocalTime.now());Thread.sleep(20);}
} 运行启动类 工作消息队列WorkQueue 下面场景中如果queue中有50条请求消息但是consumer1只能处理40条剩余的10条就可以由consumer进行处理所以说工作消息队列可以提高消息的处理速度避免队列消息堆积 模拟Workqueue实现一个队列绑定多个消费者基本实现思路如下 在publisher服务中定义测试方法每秒产生50条消息发送到simple.queue中在consumer服务中定义两个消息监听者都监听simple.queue队列消费者1每秒处理50条消息消费者2每秒处理10条消息 代码实现 在publisher服务中定义测试方法每秒产生50条消息发送到simple.queue中 public void testSendMessage2WorkQueue() throws InterruptedException {String queueName simple.queue;String message hello, message__;for (int i 1; i 50; i) {rabbitTemplate.convertAndSend(queueName, message i);Thread.sleep(20);}} 在consumer服务中定义两个消息监听者都监听simple.queue队列设置消费者1每秒处理50条消息消费者2每秒处理10条消息 RabbitListener(queues simple.queue)public void listenWorkQueue1(String msg) throws InterruptedException {System.out.println(消费者1接收到消息【 msg 】 LocalTime.now());Thread.sleep(20);}RabbitListener(queues simple.queue)public void listenWorkQueue2(String msg) throws InterruptedException {System.err.println(消费者2........接收到消息【 msg 】 LocalTime.now());Thread.sleep(200);} 修改application.yml文件设置preFetch这个值可以控制预取消息的上限确保消费者2取消息时只能取一条提高效率“能者多劳” spring:rabbitmq:listener:simple:prefetch: 1 运行结果 发布、订阅Publish、Subscribe 发布订阅模式与之前案例的区别就是允许将同一消息发送给多个消费者。实现方式是加入了exchange交换机。 常见exchange类型包括 Fanout广播Direct路由Topic话题 exchange负责消息路由而不是存储路由失败则消息丢失 Fanout Exchange Fanout Exchange会将接收到的消息路由到每一个跟其绑定的queue中如下 基本实现思路如下 在consumer中利用代码声明队列、交换机将二者进行绑定在consumer中编写两个消费方法分别监听fanout.queue1和fanout.queue2在publisher中编写测试方法向fanout发送消息 代码实现 在consumer中利用代码声明队列、交换机将二者进行绑定 Configuration
public class FanoutConfig {// itcast.fanoutBeanpublic FanoutExchange fanoutExchange(){return new FanoutExchange(itcast.fanout);}// fanout.queue1Beanpublic Queue fanoutQueue1(){return new Queue(fanout.queue1);}// 绑定队列1到交换机Beanpublic Binding fanoutBinding1(Queue fanoutQueue1, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}// fanout.queue2Beanpublic Queue fanoutQueue2(){return new Queue(fanout.queue2);}// 绑定队列2到交换机Beanpublic Binding fanoutBinding2(Queue fanoutQueue2, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}
}在consumer中编写两个消费方法分别监听fanout.queue1和fanout.queue2 RabbitListener(queues fanout.queue1)public void listenFanoutQueue1(String msg) {System.out.println(消费者接收到fanout.queue1的消息【 msg 】);}RabbitListener(queues fanout.queue2)public void listenFanoutQueue2(String msg) {System.out.println(消费者接收到fanout.queue2的消息【 msg 】);}在publisher中编写测试方法向fanout发送消息 Testpublic void testSendFanoutExchange() {// 交换机名称String exchangeName itcast.fanout;// 消息String message hello, every one!;// 发送消息rabbitTemplate.convertAndSend(exchangeName, , message);}运行结果 Direct Exchange Direct Exchange会将接收到的消息根据规则路由到指定的Queue因此被称为路由模式 l每一个Queue都与Exchange设置一个BindingKeyl发布者发送消息时指定消息的RoutingKeylExchange将消息路由到BindingKey与消息RoutingKey一致的队列 基本实现思路如下 利用RabbitListener声明Exchange、Queue、RoutingKey在consumer服务中编写两个消费者方法分别监听direct.queue1和direct.queue2在publisher中编写测试方法向itcast. direct发送消息 代码实现 在consumer服务中编写两个消费者方法分别监听direct.queue1和direct.queue2并利用RabbitListener声明Exchange、Queue、RoutingKey RabbitListener(bindings QueueBinding(value Queue(name direct.queue1),exchange Exchange(name itcast.direct, type ExchangeTypes.DIRECT),key {red, blue}))public void listenDirectQueue1(String msg){System.out.println(消费者接收到direct.queue1的消息【 msg 】);}RabbitListener(bindings QueueBinding(value Queue(name direct.queue2),exchange Exchange(name itcast.direct, type ExchangeTypes.DIRECT),key {red, yellow}))public void listenDirectQueue2(String msg){System.out.println(消费者接收到direct.queue2的消息【 msg 】);}在publisher服务发送消息到DirectExchange Testpublic void testSendDirectExchange() {// 交换机名称String exchangeName itcast.direct;// 消息String message hello, red!;// 发送消息rabbitTemplate.convertAndSend(exchangeName, red, message);} 运行结果 Topic Exchange Topic Exchange与Direct Exchange类似区别在于Topic Exchange的routingKey必须是多个单词的列表并且以.分割 Queue与Exchange指定BindingKey时可以使用通配符 #代指0个或多个单词 *代指一个单词 基本实现思路如下 利用RabbitListener声明Exchange、Queue、RoutingKey在consumer服务中编写两个消费者方法分别监听topic.queue1和topic.queue2在publisher中编写测试方法向itcast. topic发送消息 代码实现 利用RabbitListener声明Exchange、Queue、RoutingKey在consumer服务中编写两个消费者方法分别监听topic.queue1和topic.queue2 RabbitListener(bindings QueueBinding(value Queue(name topic.queue1),exchange Exchange(name itcast.topic, type ExchangeTypes.TOPIC),key china.#))public void listenTopicQueue1(String msg){System.out.println(消费者接收到topic.queue1的消息【 msg 】);}RabbitListener(bindings QueueBinding(value Queue(name topic.queue2),exchange Exchange(name itcast.topic, type ExchangeTypes.TOPIC),key #.news))public void listenTopicQueue2(String msg){System.out.println(消费者接收到topic.queue2的消息【 msg 】);}在publisher中编写测试方法向itcast. topic发送消息 Testpublic void testSendTopicExchange() {// 交换机名称String exchangeName itcast.topic;// 消息String message 今天天气不错我的心情好极了!;// 发送消息rabbitTemplate.convertAndSend(exchangeName, china.weather, message);} 运行结果 SpringAMQP-消息转换器 在SpringAMQP的发送方法中接收消息的类型是Object也就是说我们可以发送任意对象类型的消息SpringAMQP会帮我们序列化为字节后发送。 Spring的对消息对象的处理是由org.springframework.amqp.support.converter.MessageConverter来处理的。而默认实现是SimpleMessageConverter基于JDK的ObjectOutputStream完成序列化。 如果要修改只需要定义一个MessageConverter 类型的Bean即可。 推荐用JSON方式序列化实现步骤如下 在父工程中引入依赖 dependencygroupIdcom.fasterxml.jackson.core/groupIdartifactIdjackson-databind/artifactId/dependency 在publisher和consumer服务中声明MessageConverter Beanpublic MessageConverter messageConverter(){return new Jackson2JsonMessageConverter();}