网站备案被恶意注销,网站群建设的目的,公司网站传图片,甘肃启航网络科技有限公司文章目录 前言核心组成Rabbitmq 消息模式3.1 Simple 模式ProductorCustomer 3.2 Fanout 模式ProductorCustomer 3.3 Direct 模式Productor 3.4 Topic 模式Productor 3.5 Work 模式3.5.1 轮询分发ProductorWorker1 3.5.2 公平分发Worker1 防止消息丢失机制4.1 消息确认4.2 持久化… 文章目录 前言核心组成Rabbitmq 消息模式3.1 Simple 模式ProductorCustomer 3.2 Fanout 模式ProductorCustomer 3.3 Direct 模式Productor 3.4 Topic 模式Productor 3.5 Work 模式3.5.1 轮询分发ProductorWorker1 3.5.2 公平分发Worker1 防止消息丢失机制4.1 消息确认4.2 持久化 使用场景解耦削峰异步 前言
Rabbitmq 是使用 Erlang 语言开发的开源消息队列系统基于 AMQP 实现是一种应用程序对应用程序的通信方法应用程序通过读写出入队列的消息来通信而无需专用连接来链接它们。消息传递指的是应用程序之间通过在消息中发送数据进行通信而不是通过直接调用彼此通信直接调用通常是指远程过程调用的技术。 核心组成 Server又称 Broker接收客户端的连接实现 AMQP 实体服务安装 rabbitmq-serverConnection连接应用程序与Broker的网络连接TCP/IP/三次握手和四次挥手Channel网络信道几乎所有操作都在 Channel 中进行Channel 是进行消息读写的通道客户端可以建立多个 Channel每个 Channel 代表一个会话任务。Message消息服务与应用程序之间传送的数据由 Properties 和 Body 组成Properties 可以对消息进行修饰比如消息的优先级延迟等高级特性Body 则是消息体的内容。Virtual Host虚拟地址用于进行逻辑隔离最上层的消息路由一个虚拟主机可以有若干个 exchange 和 queue同一个虚拟主机里面不能有相同名称的 exchangeExchange交换机接收消息根据路由键发送消息到绑定的队列不具备消息存储能力Bindingsexchange 和 queue 之间的虚拟连接binding 中可以保存多个 routing keyRouting key是一个路由规则虚拟机可以用它来确定如何路由一个特定消息Queue队列也称为 Message Queue消息队列保存消息并将它们转发给消费者
Rabbitmq 消息模式
3.1 Simple 模式 Simple 模式是最简单的一个模式由一个生产者一个队列一个消费者组成生产者将消息通过交换机此时图中并没有交换机的概念如不定义交换机会使用默认的交换机把消息存储到队列消费者从队列中取出消息进行处理。
Productor
public class Send {private final static String QUEUE_NAME queue1;public static void main(String[] args) {// 1、创建连接工程ConnectionFactory factory new ConnectionFactory();factory.setHost(192.168.96.109);factory.setVirtualHost(/);Connection connection null;Channel channel null;try {// 2、创建连接、通道connection factory.newConnection();channel connection.createChannel();// 3、声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 消息内容String message Hello world;// 4、发送消息到指定队列channel.basicPublish(, QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));System.out.println( [x] Sent message );} catch (TimeoutException | IOException e) {e.printStackTrace();} finally {// 关闭通道if (channel ! null channel.isOpen()) {try {channel.close();} catch (Exception e) {e.printStackTrace();}}// 关闭连接if (connection ! null connection.isOpen()) {try {connection.close();} catch (Exception e) {e.printStackTrace();}}}}
}Customer
public class Recv {private final static String QUEUE_NAME queue1;public static void main(String[] args) throws IOException, TimeoutException {// 1、创建连接工程ConnectionFactory factory new ConnectionFactory();factory.setHost(192.168.96.109);factory.setVirtualHost(/);// 2、获取 Connection和 ChannelConnection connection factory.newConnection();Channel channel connection.createChannel();// 3、声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);System.out.println( [*] Waiting for messages. To exit press CTRLC);DeliverCallback deliverCallback (consumerTag, delivery) - {String message new String(delivery.getBody(), UTF-8);System.out.println( [x] Received message );};channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag - {});}
}观察可视化界面会看到消息先会被写入到队列中随后又被消费者消费了。
3.2 Fanout 模式 Fanout——发布订阅模式是一种广播机制。
此模式包括一个生产者、一个交换机 (exchange)、多个队列、多个消费者。生产者将消息发送到交换机交换机不存储消息将消息存储到队列消费者从队列中取消息。如果生产者将消息发送到没有绑定队列的交换机上消息将丢失。
用 Java demo 实现此模式
Productor
public class Productor {private static final String EXCHANGE_NAME fanout_exchange;public static void main(String[] args) {// 1、创建连接工程ConnectionFactory factory new ConnectionFactory();factory.setHost(192.168.96.109);factory.setUsername(admin);factory.setPassword(admin);factory.setVirtualHost(/);Connection connection null;Channel channel null;try {// 2、获取连接、通道connection factory.newConnection();channel connection.createChannel();// 消息内容String message hello fanout mode;// 指定路由keyString routeKey ;String type fanout;// 3、声明交换机channel.exchangeDeclare(EXCHANGE_NAME, type);// 4、声明队列channel.queueDeclare(queue1, true, false, false, null);channel.queueDeclare(queue2, true, false, false, null);channel.queueDeclare(queue3, true, false, false, null);channel.queueDeclare(queue4, true, false, false, null);// 5、绑定 channel 与 queuechannel.queueBind(queue1, EXCHANGE_NAME, routeKey);channel.queueBind(queue2, EXCHANGE_NAME, routeKey);channel.queueBind(queue3, EXCHANGE_NAME, routeKey);channel.queueBind(queue4, EXCHANGE_NAME, routeKey);// 6、发布消息channel.basicPublish(EXCHANGE_NAME, routeKey, null, message.getBytes(UTF-8));System.out.println(消息发送成功!);} catch (IOException | TimeoutException e) {e.printStackTrace();System.out.println(消息发送异常);}finally {// 关闭通道和连接......}}
}Customer
public class Customer {private static Runnable runnable new Runnable() {Overridepublic void run() {// 创建连接工厂ConnectionFactory factory new ConnectionFactory();factory.setHost(192.168.96.109);factory.setUsername(admin);factory.setPassword(admin);factory.setVirtualHost(/);final String queueName Thread.currentThread().getName();Connection connection null;Channel channel null;try {// 获取连接、通道connection factory.newConnection();channel connection.createChannel();Channel finalChannel channel;finalChannel.basicConsume(queueName, true, new DeliverCallback() {Overridepublic void handle(String consumerTag, Delivery delivery) throws IOException {System.out.println(delivery.getEnvelope().getDeliveryTag());System.out.println(queueName :收到消息是 new String(delivery.getBody(), UTF-8));}}, new CancelCallback() {Overridepublic void handle(String consumerTag) throws IOException {}});System.out.println(queueName :开始接收消息);} catch (IOException |TimeoutException e) {e.printStackTrace();} finally {// 关闭通道和连接......}}};public static void main(String[] args) throws IOException, TimeoutException {// 创建线程分别从四个队列中获取消息new Thread(runnable, queue1).start();new Thread(runnable, queue2).start();new Thread(runnable, queue3).start();new Thread(runnable, queue4).start();}
}执行完 Productor 发现四个队列中分别增加了一条消息而执行完 Customer 后四个队列中的消息都被消费者消费了。
3.3 Direct 模式 Direct 模式是在 Fanout 模式基础上添加了 routing keyFanout发布/订阅模式是交换机将消息存储到所有绑定的队列中而 Direct 模式是在此基础上添加了过滤条件交换机只会将消息存储到满足 routing key 的队列中。
在上图中我们可以看到交换机绑定了两个队列其中队列 Q1绑定的 routing key 为 “orange” 队列Q2绑定的routing key 为 “black” 和 “green”。在这样的设置中发布 routing key 为 “orange” 的消息将被路由到 Q1routing key 为 “black” 或 “green” 的消息将被路由到 Q2
在 rabbitmq 中给队列绑定 routing_keyrouting_key 必须是单词列表。
用 Java demo 实现此模式
Productor
public class Productor {private static final String EXCHANGE_NAME direct_exchange;public static void main(String[] args) {// 1、创建连接工程ConnectionFactory factory new ConnectionFactory();factory.setHost(192.168.96.109);factory.setUsername(admin);factory.setPassword(admin);factory.setVirtualHost(/);Connection connection null;Channel channel null;try {// 2、获取连接、通道connection factory.newConnection();channel connection.createChannel();// 消息内容String message hello direct mode;// 指定路由keyString routeKey email;String type direct;// 3、声明交换机channel.exchangeDeclare(EXCHANGE_NAME, type);// 4、声明队列channel.queueDeclare(queue1, true, false, false, null);channel.queueDeclare(queue2, true, false, false, null);channel.queueDeclare(queue3, true, false, false, null);// 5、绑定 channel 与 queuechannel.queueBind(queue1, EXCHANGE_NAME, email);channel.queueBind(queue2, EXCHANGE_NAME, sms);channel.queueBind(queue3, EXCHANGE_NAME, vx);// 6、发布消息channel.basicPublish(EXCHANGE_NAME, routeKey, null, message.getBytes(UTF-8));System.out.println(消息发送成功!);} catch (IOException | TimeoutException e) {e.printStackTrace();System.out.println(消息发送异常);} finally {// 关闭通道和连接......}}
}可以通过可视化页面查看各队列绑定的 routing_key 由于设置的 routing_key为 “email”所以应该只有 queue1 存储了一条消息。 Customer 与上述 fanout 示例一致。
3.4 Topic 模式 Topic 模式是生产者通过交换机将消息存储到队列后交换机根据绑定队列的 routing key 的值进行通配符匹配如果匹配通过消息将被存储到该队列如果 routing key 的值匹配到了多个队列消息将会被发送到多个队列如果一个队列也没匹配上该消息将丢失。
routing_key 必须是单词列表用点分隔其中 * 和 # 的含义为
*1个单词#0个或多个单词
用Java demo 实现此模式
Productor
public class Productor {private static final String EXCHANGE_NAME topic_exchange;public static void main(String[] args) {// 1、创建连接工程ConnectionFactory factory new ConnectionFactory();factory.setHost(192.168.96.109);factory.setUsername(admin);factory.setPassword(admin);factory.setVirtualHost(/);Connection connection null;Channel channel null;try {// 2、获取连接、通道connection factory.newConnection();channel connection.createChannel();// 消息内容String message hello topic mode;// 指定路由keyString routeKey com.order.test.xxx;String type topic;// 3、声明交换机channel.exchangeDeclare(EXCHANGE_NAME, type);// 4、声明队列channel.queueDeclare(queue5,true,false,false,null);channel.queueDeclare(queue6,true,false,false,null);// 5、绑定 channel 与 queuechannel.queueBind(queue5, EXCHANGE_NAME, *.order.#);channel.queueBind(queue6, EXCHANGE_NAME, #.test.*);// 6、发布消息channel.basicPublish(EXCHANGE_NAME, routeKey, null, message.getBytes(UTF-8));System.out.println(消息发送成功!);} catch (IOException | TimeoutException e) {e.printStackTrace();System.out.println(消息发送异常);} finally {// 关闭通道和连接......}}
}执行完 Productor 后通过可视化页面查看到queue 绑定的 routing_key 由于上述例子中routing_key为“com.order.test.xxx”那么 queue5 和 queue6 都将接收到消息。 Customer 与上述实例一样执行完 Customer 后再次查看队列信息queue5 和 queue6 的消息都被消费了。
3.5 Work 模式
当有多个消费者时如何均衡消息者消费消息的多少主要有两种模式
轮询模式分发按顺序轮询分发每个消费者获得相同数量的消息公平分发根据消费者消费能力公平分发处理快的处理的多处理慢的处理的少按劳分配
3.5.1 轮询分发
在这种模式下rabbitmq 采用轮询的方式将任务分配给多个消费者但可能出现一种情况当分配给某一个消费者的任务很复杂时而有些消费者接收的任务较轻量会出现有的消费者很忙而有的消费者处于空闲的状态而 rabbitmq 不会感知到这种情况的发生rabbitmq 不考虑消费者未确认消息的数量只是盲目的分配任务。
用 Java demo 实现此模式
Productor
public class Productor {public static void main(String[] args) {// 1、创建连接工程ConnectionFactory factory new ConnectionFactory();factory.setHost(192.168.96.109);factory.setUsername(admin);factory.setPassword(admin);factory.setVirtualHost(/);Connection connection null;Channel channel null;try {// 2、获取连接、通道connection factory.newConnection();channel connection.createChannel();// 3、向 Queue1 发布20个消息for (int i 0; i 20; i) {String msg feiyangyang: i;channel.basicPublish(, queue1, null, msg.getBytes(StandardCharsets.UTF_8));}System.out.println(消息发送成功!);} catch (IOException | TimeoutException e) {e.printStackTrace();System.out.println(消息发送异常);} finally {// 关闭通道和连接......}}
}Worker1
public class Worker1 {public static void main(String[] args) {// 1、创建连接工厂ConnectionFactory factory new ConnectionFactory();factory.setHost(192.168.96.109);factory.setUsername(admin);factory.setPassword(admin);factory.setVirtualHost(/);Connection connection null;Channel channel null;try {// 获取连接、通道connection factory.newConnection();channel connection.createChannel();Channel finalChannel channel;finalChannel.basicConsume(queue1, true, new DeliverCallback() {Overridepublic void handle(String consumerTag, Delivery delivery) throws IOException {System.out.println(Worker1 :收到消息是 new String(delivery.getBody(), UTF-8));try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}}}, new CancelCallback() {Overridepublic void handle(String consumerTag) throws IOException {}});System.out.println(Worker1 开始接收消息);System.in.read();} catch (IOException |TimeoutException e) {e.printStackTrace();} finally {// 关闭通道和连接......}}
}Worker2 与 Worker1 相同
我们看下消息分发结果
Worker1 开始接收消息
Worker1:收到消息是feiyangyang: 0
Worker1:收到消息是feiyangyang: 2
Worker1:收到消息是feiyangyang: 4
Worker1:收到消息是feiyangyang: 6
Worker1:收到消息是feiyangyang: 8
Worker1:收到消息是feiyangyang: 10
Worker1:收到消息是feiyangyang: 12
Worker1:收到消息是feiyangyang: 14
Worker1:收到消息是feiyangyang: 16
Worker1:收到消息是feiyangyang: 18Worker2 开始接收消息
Worker2:收到消息是feiyangyang: 1
Worker2:收到消息是feiyangyang: 3
Worker2:收到消息是feiyangyang: 5
Worker2:收到消息是feiyangyang: 7
Worker2:收到消息是feiyangyang: 9
Worker2:收到消息是feiyangyang: 11
Worker2:收到消息是feiyangyang: 13
Worker2:收到消息是feiyangyang: 15
Worker2:收到消息是feiyangyang: 17
Worker2:收到消息是feiyangyang: 19可以看出轮询分发模式就是将消息均衡的分配所有消费者。
3.5.2 公平分发 为了解决 Work 轮询分发模式 这个问题rabbitmq 使用带有 perfetchCount 1 设置的 basicQos 方法。当消费者接受处理并确认前一条消息前不向此消费者发送新消息会分配给其他空闲的消费者。
Productor 代码与上述轮询模式相同而 Customer 中稍作修改
Worker1
// Channel 使用 Qos 机制
finalChannel.basicQos(1);
finalChannel.basicConsume(queue1, false, new DeliverCallback() {Overridepublic void handle(String consumerTag, Delivery delivery) throws IOException {System.out.println(Worker1 :收到消息是 new String(delivery.getBody(), UTF-8));try {Thread.sleep(1000);// 改成手动应答finalChannel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);} catch (InterruptedException e) {e.printStackTrace();}}
}, new CancelCallback() {Overridepublic void handle(String consumerTag) throws IOException {}
});上述实例相较于轮询分发模式添加了 Qos 机制设置值为1代表消费者每次从队列中获取几条消息将 Worker1 的 sleep 时间设置为 1s将 Worker2 的 sleep 时间设置为 2s查看消息分发结果
Worker1 开始接收消息
Worker1:收到消息是feiyangyang: 0
Worker1:收到消息是feiyangyang: 2
Worker1:收到消息是feiyangyang: 4
Worker1:收到消息是feiyangyang: 5
Worker1:收到消息是feiyangyang: 7
Worker1:收到消息是feiyangyang: 8
Worker1:收到消息是feiyangyang: 10
Worker1:收到消息是feiyangyang: 11
Worker1:收到消息是feiyangyang: 13
Worker1:收到消息是feiyangyang: 14
Worker1:收到消息是feiyangyang: 16
Worker1:收到消息是feiyangyang: 17
Worker1:收到消息是feiyangyang: 19
Worker2 开始接收消息
Worker2:收到消息是feiyangyang: 1
Worker2:收到消息是feiyangyang: 3
Worker2:收到消息是feiyangyang: 6
Worker2:收到消息是feiyangyang: 9
Worker2:收到消息是feiyangyang: 12
Worker2:收到消息是feiyangyang: 15
Worker2:收到消息是feiyangyang: 18当使用 Work 公平分发模式时要设置消费者为手动应答并且开启 Qos 机制。
防止消息丢失机制
4.1 消息确认
消费者完成一项任务可能需要几秒钟如果其中一个消费者开始了一项长期任务并且只完成了部分任务而死亡如果将 autoAck 设置为 true 一旦 RabbitMQ 将消息传递给消费者它会立即将其标记为删除在这种情况下我们将丢失所有已分派给该特定消费者但尚未处理的消息。
如果其中一个消费者宕了rabbitmq 可以将其消息分配给其他消费者。为了确保消息不会丢失rabbitmq 采用消息确认消费者发回确认消息告诉 rabbitmq 消息已经被接收并处理此时rabbitmq 可以放心的删除这条消息。
如果消费者在没有发送 ack 的情况下宕了rabbitmq 将理解为该条消息未被消费者处理完如果有其他消费者在线将迅速重新交付给其他消费者这样就可以确保不会丢失消息了。
默认情况下rabbitmq 会启用手动消息确认也就是 autoAck 默认为 false一旦我们完成了一项任务需要手动的进行消息确认所以 autoAck 需要保持为默认值 false并使用如下方法进行手动应答。
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);4.2 持久化
rabbitmq 的消息确认机制可以保证消息不会丢失但是如果 rabbitmq 服务器停止我们的任务仍然会丢失。
当 rabbitmq 退出或崩溃时如果不进行持久化队列和消息都会消失。需要做两件事来确保消息不会丢失将队列和消息都标记为持久的。
设置队列持久
boolean durable true;
channel.queueDeclare(hello, durable, false, false, null);设置消息持久
channel.basicPublish(, task_queue, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());将消息标记为持久性并不能完全保证消息不会丢失当 rabbitmq 接收到消息并且还没保存时仍然有很短的时间窗口会使消息丢失如果需要更强的保证可以使用发布者确认机制。
使用场景
解耦、削峰、异步
解耦
在微服务架构体系中微服务A需要与微服务B进行通信传统的做法是A调用B的接口。但这样做如果系统B无法访问或连接超时系统A需要等待直到系统B做出响应并且A与B存在严重的耦合现象。如果引入消息队列进行系统AB的通信流程是这样的
系统A将消息存储到消息队列中返回成功信息 系统B从队列中获取消息进行处理操作 系统A将消息放到队列中就不用关心系统B是否可以获取等其他事情了实现了两个系统间的解耦。
使用场景 短信、邮件通知 削峰
系统A每秒请求100个系统可以稳定运行但如果在秒杀活动中每秒并发达到1w个但系统最大处理能力只能每秒处理 1000 个所以在秒杀活动中系统服务器会出现宕机的现象。如果引入 MQ 可以解决这个问题。每秒 1w个请求会导致系统崩溃那我们让用户发送的请求都存储到队列中由于系统最大处理能力是每秒1000个请求让系统A每秒只从队列中拉取1000个请求保证系统能稳定运行在秒杀期间请求大量进入到队列积压到MQ中而系统每秒只从队列中取1000个请求处理。这种短暂的高峰期积压是没问题的因为高峰期一旦过去每秒请求数迅速递减而系统每秒还是从队列中取1000个请求进行处理系统会快速将积压的消息消费掉。
使用场景 秒杀活动团抢活动 异步
用户注册需要发送注册邮件和注册短信传统的做法有两种串行、并行。
串行方式将注册信息写库后50ms发送邮件50ms再发送短信50ms任务完成后返回客户端共耗时150ms 并行方式将注册信息写库后50ms开启子线程让发送邮件和发送短信同时进行50ms返回客户端共耗时100ms 引入MQ将注册信息写库50ms将发送邮件和短信的操作写入队列5s返回客户端而消费者什么时候从队列中取消息进行处理不用关心共耗时55ms 使用场景 将不是必须等待响应结果的业务逻辑进行异步处理