网站如何导入织梦cms,提升学历的正规机构有哪些,五星花园网站建设兼职,海南住房与建设厅网站这篇文章来源于稀土掘金#xff0c;来源#xff1a;https://juejin.cn/post/7132268340541653005#xff0c;主要用来收藏学习。 常见的消息队列很多#xff0c;主要包括 RabbitMQ、Kafka、RocketMQ 和 ActiveMQ#xff0c;相关的选型可以看我之前的系列#xff0c;这篇文… 这篇文章来源于稀土掘金来源https://juejin.cn/post/7132268340541653005主要用来收藏学习。 常见的消息队列很多主要包括 RabbitMQ、Kafka、RocketMQ 和 ActiveMQ相关的选型可以看我之前的系列这篇文章只讲 RabbitMQ先讲原理后搞实战。 1. 消息队列 1.1 消息队列模式 消息队列目前主要 2 种模式分别为“点对点模式”和“发布/订阅模式”。 1.1.1 点对点模式
一个具体的消息只能由一个消费者消费多个生产者可以向同一个消息队列发送消息但是一个消息在被一个消息者处理的时候这个消息在队列上会被锁住或者被移除并且其他消费者无法处理该消息。需要额外注意的是如果消费者处理一个消息失败了消息系统一般会把这个消息放回队列这样其他消费者可以继续处理。 1.1.2 发布/订阅模式
单个消息可以被多个订阅者并发的获取和处理。一般来说订阅有两种类型
临时ephemeral订阅 这种订阅只有在消费者启动并且运行的时候才存在。一旦消费者退出相应的订阅以及尚未处理的消息就会丢失。持久durable订阅 这种订阅会一直存在除非主动去删除。消费者退出后消息系统会继续维护该订阅并且后续消息可以被继续处理。 1.2 衡量标准
对消息队列进行技术选型时需要通过以下指标衡量你所选择的消息队列是否可以满足你的需求
消息顺序 发送到队列的消息消费时是否可以保证消费的顺序比如A先下单B后下单应该是A先去扣库存B再去扣顺序不能反。消息路由 根据路由规则只订阅匹配路由规则的消息比如有A/B两者规则的消息消费者可以只订阅A消息B消息不会消费。消息可靠性 是否会存在丢消息的情况比如有A/B两个消息最后只有B消息能消费A消息丢失。消息时序 主要包括“消息存活时间”和“延迟/预定的消息”“消息存活时间”表示生产者可以对消息设置TTL如果超过该TTL消息会自动消失“延迟/预定的消息”指的是可以延迟或者预订消费消息比如延时5分钟那么消息会5分钟后才能让消费者消费时间未到的话是不能消费的。消息留存 消息消费成功后是否还会继续保留在消息队列。容错性 当一条消息消费失败后是否有一些机制保证这条消息是一种能成功比如异步第三方退款消息需要保证这条消息消费掉才能确定给用户退款成功所以必须保证这条消息消费成功的准确性。伸缩 当消息队列性能有问题比如消费太慢是否可以快速支持库容当消费队列过多浪费系统资源是否可以支持缩容。吞吐量 支持的最高并发数。 2. RabbitMQ 原理初探
RabbitMQ 2007 年发布是使用 Erlang 语言开发的开源消息队列系统基于 AMQP 协议来实现。 2.1 基本概念 提到RabbitMQ就不得不提AMQP协议。AMQP协议是具有现代特征的二进制协议。是一个提供统一消息服务的应用层标准高级消息队列协议是应用层协议的一个开放标准为面向消息的中间件设计。
先了解一下AMQP协议中间的几个重要概念
Server 接收客户端的连接实现AMQP实体服务。Connection 连接应用程序与Server的网络连接TCP连接。Channel 信道消息读写等操作在信道中进行。客户端可以建立多个信道每个信道代表一个会话任务。Message 消息应用程序和服务器之间传送的数据消息可以非常简单也可以很复杂。由Properties和Body组成。Properties为外包装可以对消息进行修饰比如消息的优先级、延迟等高级特性Body就是消息体内容。Virtual Host 虚拟主机用于逻辑隔离。一个虚拟主机里面可以有若干个Exchange和Queue同一个虚拟主机里面不能有相同名称的Exchange或Queue。Exchange 交换器接收消息按照路由规则将消息路由到一个或者多个队列。如果路由不到或者返回给生产者或者直接丢弃。RabbitMQ常用的交换器常用类型有direct、topic、fanout、headers四种后面详细介绍。Binding 绑定交换器和消息队列之间的虚拟连接绑定中可以包含一个或者多个RoutingKey。RoutingKey 路由键生产者将消息发送给交换器的时候会发送一个RoutingKey用来指定路由规则这样交换器就知道把消息发送到哪个队列。路由键通常为一个“.”分割的字符串例如“com.rabbitmq”。Queue 消息队列用来保存消息供消费者消费。 2.2 工作原理 AMQP 协议模型由三部分组成生产者、消费者和服务端执行流程如下
生产者是连接到 Server建立一个连接开启一个信道。生产者声明交换器和队列设置相关属性并通过路由键将交换器和队列进行绑定。消费者也需要进行建立连接开启信道等操作便于接收消息。生产者发送消息发送到服务端中的虚拟主机。虚拟主机中的交换器根据路由键选择路由规则发送到不同的消息队列中。订阅了消息队列的消费者就可以获取到消息进行消费。
2.3 常用交换机 RabbitMQ常用的交换器类型有direct、topic、fanout、headers四种
**Direct Exchange**见文知意直连交换机意思是此交换机需要绑定一个队列要求该消息与一个特定的路由键完全匹配。简单点说就是一对一的点对点的发送。 Fanout Exchange这种类型的交换机需要将队列绑定到交换机上。一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。很像子网广播每台子网内的主机都获得了一份复制的消息。简单点说就是发布订阅。Topic Exchange直接翻译的话叫做主题交换机如果从用法上面翻译可能叫通配符交换机会更加贴切。这种交换机是使用通配符去匹配路由到对应的队列。通配符有两种“*” 、 “#”。需要注意的是通配符前面必须要加上.符号。 “_”“符号有且只匹配一个词。比如 a._可以匹配到a.b”、“a.c”但是匹配不了a.b.c。#“符号匹配一个或多个词。比如rabbit.#“既可以匹配到rabbit.a.b”、“rabbit.a”也可以匹配到rabbit.a.b.c”。 Headers Exchange这种交换机用的相对没这么多。它跟上面三种有点区别它的路由不是用routingKey进行路由匹配而是在匹配请求头中所带的键值进行路由。创建队列需要设置绑定的头部信息有两种模式全部匹配和部分匹配。如上图所示交换机会根据生产者发送过来的头部信息携带的键值去匹配队列绑定的键值路由到对应的队列。 2.4 消费原理 我们先看几个基本概念
broker 每个节点运行的服务程序功能为维护该节点的队列的增删以及转发队列操作请求。master queue 每个队列都分为一个主队列和若干个镜像队列。mirror queue 镜像队列作为master queue的备份。在master queue所在节点挂掉之后系统把mirror queue提升为-- master queue负责处理客户端队列操作请求。注意mirror queue只做镜像设计目的不是为了承担客户端读写压力。
集群中有两个节点每个节点上有一个broker每个broker负责本机上队列的维护并且borker之间可以互相通信。集群中有两个队列A和B每个队列都分为master queue和mirror queue备份。那么队列上的生产消费怎么实现的呢
对于消费队列如下图有两个consumer消费队列A这两个consumer连在了集群的不同机器上。RabbitMQ集群中的任何一个节点都拥有集群上所有队列的元信息所以连接到集群中的任何一个节点都可以主要区别在于有的consumer连在master queue所在节点有的连在非master queue节点上。
因为mirror queue要和master queue保持一致故需要同步机制正因为一致性的限制导致所有的读写操作都必须都操作在master queue上想想为啥读也要从master queue中读和数据库读写分离是不一样的然后由master节点同步操作到mirror queue所在的节点。即使consumer连接到了非master queue节点该consumer的操作也会被路由到master queue所在的节点上这样才能进行消费。
对于生成队列原理和消费一样如果连接到非 master queue 节点则路由过去。 所以到这里小伙伴们就可以看到 RabbitMQ的不足由于master queue单节点导致性能瓶颈吞吐量受限。虽然为了提高性能内部使用了Erlang这个语言实现但是终究摆脱不了架构设计上的致命缺陷。 2.5 高级特性 2.5.1 过期时间
Time To Live也就是生存时间是一条消息在队列中的最大存活时间单位是毫秒下面看看RabbitMQ过期时间特性
RabbitMQ可以对消息和队列设置TTL。RabbitMQ支持设置消息的过期时间在消息发送的时候可以进行指定每条消息的过期时间可以不同。RabbitMQ支持设置队列的过期时间从消息入队列开始计算直到超过了队列的超时时间配置那么消息会变成死信自动清除。如果两种方式一起使用则过期时间以两者中较小的那个数值为准。当然也可以不设置TTL不设置表示消息不会过期如果设置为0则表示除非此时可以直接将消息投递到消费者否则该消息将被立即丢弃。 2.5.2 消息确认
为了保证消息从队列可靠地到达消费者RabbitMQ提供了消息确认机制。
消费者订阅队列的时候可以指定autoAck参数当autoAck为true的时候RabbitMQ采用自动确认模式RabbitMQ自动把发送出去的消息设置为确认然后从内存或者硬盘中删除而不管消费者是否真正消费到了这些消息。
当autoAck为false的时候RabbitMQ会等待消费者回复的确认信号收到确认信号之后才从内存或者磁盘中删除消息。
消息确认机制是RabbitMQ消息可靠性投递的基础只要设置autoAck参数为false消费者就有足够的时间处理消息不用担心处理消息的过程中消费者进程挂掉后消息丢失的问题。 2.5.3 持久化
消息的可靠性是RabbitMQ的一大特色那么RabbitMQ是如何保证消息可靠性的呢答案就是消息持久化。持久化可以防止在异常情况下丢失数据。RabbitMQ的持久化分为三个部分交换器持久化、队列持久化和消息的持久化。
交换器持久化可以通过在声明队列时将durable参数设置为true。如果交换器不设置持久化那么在RabbitMQ服务重启之后相关的交换器元数据会丢失不过消息不会丢失只是不能将消息发送到这个交换器了。
队列的持久化能保证其本身的元数据不会因异常情况而丢失但是不能保证内部所存储的消息不会丢失。要确保消息不会丢失需要将其设置为持久化。队列的持久化可以通过在声明队列时将durable参数设置为true。
设置了队列和消息的持久化当RabbitMQ服务重启之后消息依然存在。如果只设置队列持久化或者消息持久化重启之后消息都会消失。
当然也可以将所有的消息都设置为持久化但是这样做会影响RabbitMQ的性能因为磁盘的写入速度比内存的写入要慢得多。
对于可靠性不是那么高的消息可以不采用持久化处理以提高整体的吞吐量。鱼和熊掌不可兼得关键在于选择和取舍。在实际中需要根据实际情况在可靠性和吞吐量之间做一个权衡。 2.5.4 死信队列
当消息在一个队列中变成死信之后他能被重新发送到另一个交换器中这个交换器成为死信交换器与该交换器绑定的队列称为死信队列。消息变成死信有下面几种情况
消息被拒绝。消息过期队列达到最大长度
DLX也是一个正常的交换器和一般的交换器没有区别他能在任何的队列上面被指定实际上就是设置某个队列的属性。当这个队列中有死信的时候RabbitMQ会自动将这个消息重新发送到设置的交换器上进而被路由到另一个队列我们可以监听这个队列中消息做相应的处理。死信队列有什么用当发生异常的时候消息不能够被消费者正常消费被加入到了死信队列中。后续的程序可以根据死信队列中的内容分析当时发生的异常进而改善和优化系统。 2.5.5 延迟队列
一般的队列消息一旦进入队列就会被消费者立即消费。延迟队列就是进入该队列的消息会被消费者延迟消费延迟队列中存储的对象是的延迟消息“延迟消息”是指当消息被发送以后等待特定的时间后消费者才能拿到这个消息进行消费。
延迟队列用于需要延迟工作的场景。最常见的使用场景淘宝或者天猫我们都使用过用户在下单之后通常有30分钟的时间进行支付如果这30分钟之内没有支付成功那么订单就会自动取消。
除了延迟消费延迟队列的典型应用场景还有延迟重试。比如消费者从队列里面消费消息失败了可以延迟一段时间以后进行重试。 2.6 特性分析
这里才是内容的重点不仅需要知道Rabbit的特性还需要知道支持这些特性的原因
消息路由支持 RabbitMQ可以通过不同的交换器支持不同种类的消息路由消息有序不支持 当消费消息时如果消费失败消息会被放回队列然后重新消费这样会导致消息无序消息时序非常好 通过延时队列可以指定消息的延时时间过期时间TTL等容错处理非常好 通过交付重试和死信交换器DLX来处理消息处理故障伸缩一般 伸缩其实没有非常智能因为即使伸缩了master queue还是只有一个负载还是只有这一个master queue去抗所以我理解RabbitMQ的伸缩很弱个人理解。持久化不太好 没有消费的消息可以支持持久化这个是为了保证机器宕机时消息可以恢复但是消费过的消息就会被马上删除因为RabbitMQ设计时就不是为了去存储历史数据的。消息回溯支持 因为消息不支持永久保存所以自然就不支持回溯。高吞吐中等 因为所有的请求的执行最后都是在master queue它的这个设计导致单机性能达不到十万级的标准。 3. RabbitMQ环境搭建
因为我用的是Mac所以直接可以参考官网
https://www.rabbitmq.com/install-homebrew.html执行
brew update brew install rabbitmq之前没有执行brew update直接执行brew install rabbitmq时会报各种各样奇怪的错误其中“403 Forbidde”居多。 但是在执行“brew install rabbitmq”会自动安装其它的程序如果你使用源码安装Rabbitmq因为启动该服务依赖erlang环境所以你还需手动安装erlang但是目前官方已经一键给你搞定会自动安装Rabbitmq依赖的所有程序是不是很棒 执行成功输出
启动服务
# 启动方式1后台启动
brew services start rabbitmq
# 启动方式2当前窗口启动
cd /usr/local/Cellar/rabbitmq/3.8.19
rabbitmq-server在浏览器输入
http://localhost:15672/会出现RabbitMQ后台管理界面用户名和密码都为guest 通过brew安装一行命令搞定真香 4. RabbitMQ测试 4.1 添加账号 首先得启动mq
## 添加账号
rabbitmqctl add_user admin admin
## 添加访问权限
rabbitmqctl set_permissions -p / admin .* .* .*
## 设置超级权限
rabbitmqctl set_user_tags admin administrator4.2 JAVA代码示例 因为代码中引入了java 8的特性pom引入依赖
dependencygroupIdcom.rabbitmq/groupIdartifactIdamqp-client/artifactIdversion5.5.1/version
/dependencypluginsplugingroupIdorg.apache.maven.plugins/groupIdartifactIdmaven-compiler-plugin/artifactIdconfigurationsource8/sourcetarget8/target/configuration/plugin
/plugins测试代码
public class RabbitMqTest {//消息队列名称private final static String QUEUE_NAME hello;Testpublic void send() throws java.io.IOException, TimeoutException {//创建连接工程ConnectionFactory factory new ConnectionFactory();factory.setHost(127.0.0.1);factory.setPort(5672);factory.setUsername(admin);factory.setPassword(admin);//创建连接Connection connection factory.newConnection();//创建消息通道Channel channel connection.createChannel();//生成一个消息队列channel.queueDeclare(QUEUE_NAME, true, false, false, null);for (int i 0; i 10; i) {String message Hello World RabbitMQ count: i;//发布消息第一个参数表示路由Exchange名称为则表示使用默认消息路由channel.basicPublish(, QUEUE_NAME, null, message.getBytes());System.out.println( [x] Sent message );}//关闭消息通道和连接channel.close();connection.close();}Testpublic void consumer() throws java.io.IOException, TimeoutException {//创建连接工厂ConnectionFactory factory new ConnectionFactory();factory.setHost(127.0.0.1);factory.setPort(5672);factory.setUsername(admin);factory.setPassword(admin);//创建连接Connection connection factory.newConnection();//创建消息信道final Channel channel connection.createChannel();//消息队列channel.queueDeclare(QUEUE_NAME, true, false, false, null);System.out.println([*] Waiting for message. To exist press CTRLC);DeliverCallback deliverCallback (consumerTag, delivery) - {String message new String(delivery.getBody(), UTF-8);System.out.println( [x] Received message );};channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag - {});}
}执行send()后控制台输出
[x] Sent Hello World RabbitMQ count: 0
[x] Sent Hello World RabbitMQ count: 1
[x] Sent Hello World RabbitMQ count: 2
[x] Sent Hello World RabbitMQ count: 3
[x] Sent Hello World RabbitMQ count: 4
[x] Sent Hello World RabbitMQ count: 5
[x] Sent Hello World RabbitMQ count: 6
[x] Sent Hello World RabbitMQ count: 7
[x] Sent Hello World RabbitMQ count: 8
[x] Sent Hello World RabbitMQ count: 9执行consumer()后
5. 基本使用姿势
5.1 公共代码封装 封装工厂类
public class RabbitUtil {public static ConnectionFactory getConnectionFactory() {//创建连接工程下面给出的是默认的caseConnectionFactory factory new ConnectionFactory();factory.setHost(127.0.0.1);factory.setPort(5672);factory.setUsername(admin);factory.setPassword(admin);factory.setVirtualHost(/);return factory;}
}封装生成者
public class MsgProducer {public static void publishMsg(String exchange, BuiltinExchangeType exchangeType, String toutingKey, String message) throws IOException, TimeoutException {ConnectionFactory factory RabbitUtil.getConnectionFactory();//创建连接Connection connection factory.newConnection();//创建消息通道Channel channel connection.createChannel();// 声明exchange中的消息为可持久化不自动删除channel.exchangeDeclare(exchange, exchangeType, true, false, null);// 发布消息channel.basicPublish(exchange, toutingKey, null, message.getBytes());System.out.println(Sent message );channel.close();connection.close();}
}5.2 Direct方式 5.2.1 Direct示例
生产者
public class DirectProducer {private static final String EXCHANGE_NAME direct.exchange;public void publishMsg(String routingKey, String msg) {try {MsgProducer.publishMsg(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, routingKey, msg);} catch (Exception e) {e.printStackTrace();}}public static void main(String[] args) throws InterruptedException {DirectProducer directProducer new DirectProducer();String[] routingKey new String[]{aaa, bbb, ccc};String msg hello ;for (int i 0; i 10; i) {directProducer.publishMsg(routingKey[i % 3], msg i);}System.out.println(----over-------);Thread.sleep(1000 * 60 * 100);}
}执行生产者往消息队列中放入10条消息其中key分别为“aaa”、“bbb”和“ccc”分别放入qa、qb、qc三个队列 下面是qa队列的信息
消费者
public class DirectConsumer {private static final String exchangeName direct.exchange;public void msgConsumer(String queueName, String routingKey) {try {MsgConsumer.consumerMsg(exchangeName, queueName, routingKey);} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();}}public static void main(String[] args) throws InterruptedException {DirectConsumer consumer new DirectConsumer();String[] routingKey new String[]{aaa, bbb, ccc};String[] queueNames new String[]{qa, qb, qc};for (int i 0; i 3; i) {consumer.msgConsumer(queueNames[i], routingKey[i]);}Thread.sleep(1000 * 60 * 100);}
}执行后输出
[*] Waiting for message. To exist press CTRLC[x] Received hello 0[x] Done[x] Received hello 3[x] Done[x] Received hello 6[x] Done[x] Received hello 9[x] Done
[*] Waiting for message. To exist press CTRLC[x] Received hello 1[x] Done[x] Received hello 4[x] Done[x] Received hello 7[x] Done
[*] Waiting for message. To exist press CTRLC[x] Received hello 2[x] Done[x] Received hello 5[x] Done[x] Received hello 8[x] Done可以看到分别从qa、qb、qc中将不同的key的数据消费掉。 5.2.2 问题探讨 有个疑问这个队列的名称qa、qb和qc是RabbitMQ自动生成的么我们可以指定队列名称么 我做了个简单的实验我把消费者代码修改了一下
public static void main(String[] args) throws InterruptedException {DirectConsumer consumer new DirectConsumer();String[] routingKey new String[]{aaa, bbb, ccc};String[] queueNames new String[]{qa, qb, qc1}; // 将qc修改为qc1for (int i 0; i 3; i) {consumer.msgConsumer(queueNames[i], routingKey[i]);}Thread.sleep(1000 * 60 * 100);
}执行后如下图所示
我们可以发现多了一个qc1所以可以判断这个界面中的queues是消费者执行时会将消费者指定的队列名称和direct.exchange绑定绑定的依据就是key。
当我们把队列中的数据全部消费掉然后重新执行生成者后会发现qc和qc1中都有3条待消费的数据因为绑定的key都是“ccc”所以两者的数据是一样的绑定关系如下 注意当没有Queue绑定到Exchange时往Exchange中写入的消息也不会重新分发到之后绑定的queue上。 思考不执行消费者看不到这个Queres中信息我其实可以把这个界面理解为消费者信息界面。不过感觉还是怪怪的这个queues如果是消费者信息就不应该叫queues我理解queues应该是RabbitMQ中实际存放数据的queues难道是我理解错了 5.3 Fanout方式指定队列
生产者封装
public class FanoutProducer {private static final String EXCHANGE_NAME fanout.exchange;public void publishMsg(String routingKey, String msg) {try {MsgProducer.publishMsg(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, routingKey, msg);} catch (Exception e) {e.printStackTrace();}}public static void main(String[] args) {FanoutProducer directProducer new FanoutProducer();String msg hello ;for (int i 0; i 10; i) {directProducer.publishMsg(, msg i);}}
}消费者
public class FanoutConsumer {private static final String EXCHANGE_NAME fanout.exchange;public void msgConsumer(String queueName, String routingKey) {try {MsgConsumer.consumerMsg(EXCHANGE_NAME, queueName, routingKey);} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();}}public static void main(String[] args) {FanoutConsumer consumer new FanoutConsumer();String[] queueNames new String[]{qa-2, qb-2, qc-2};for (int i 0; i 3; i) {consumer.msgConsumer(queueNames[i], );}}
}执行生成者结果如下我们发现生产者生产的10条数据在每个消费者中都可以消费这个是和Direct不同的地方但是使用Fanout方式时有几个点需要注意一下
生产者的routkey可以为空因为生产者的所有数据会下放到每一个队列所以不会通过routkey去路由消费者需要指定queues因为消费者需要绑定到指定的queues才能消费。 这幅图就画出了Fanout的精髓之处exchange会和所有的queue进行绑定不区分路由消费者需要绑定指定的queue才能发起消费。 注意往队列塞数据时可能通过界面看不到消息个数的增加可能是你之前已经开启了消费进程导致增加的消息马上被消费了。 5.4 Fanout方式随机获取队列
上面我们是指定了队列这个方式其实很不友好比如对于Fanout我其实根本无需关心队列的名字如果还指定对应队列进行消费感觉这个很冗余所以我们这里就采用随机获取队列名字的方式下面代码直接Copy官网。
生成者封装
public static void publishMsgV2(String exchange, BuiltinExchangeType exchangeType, String message) throws IOException, TimeoutException {ConnectionFactory factory RabbitUtil.getConnectionFactory();//创建连接Connection connection factory.newConnection();//创建消息通道Channel channel connection.createChannel();// 声明exchange中的消息channel.exchangeDeclare(exchange, exchangeType);// 发布消息channel.basicPublish(exchange, , null, message.getBytes(UTF-8));System.out.println(Sent message );channel.close();connection.close();
}消费者封装
public static void consumerMsgV2(String exchange) throws IOException, TimeoutException {ConnectionFactory factory RabbitUtil.getConnectionFactory();Connection connection factory.newConnection();final Channel channel connection.createChannel();channel.exchangeDeclare(exchange, fanout);String queueName channel.queueDeclare().getQueue();channel.queueBind(queueName, exchange, );System.out.println( [*] Waiting for messages. To exit press CTRLC);DeliverCallback deliverCallback (consumerTag, delivery) - {String message new String(delivery.getBody(), UTF-8);System.out.println( [x] Received message );};channel.basicConsume(queueName, true, deliverCallback, consumerTag - { });
}生产者
public class FanoutProducer {private static final String EXCHANGE_NAME fanout.exchange.v2;public void publishMsg(String msg) {try {MsgProducer.publishMsgV2(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, msg);} catch (Exception e) {e.printStackTrace();}}public static void main(String[] args) {FanoutProducer directProducer new FanoutProducer();String msg hello ;for (int i 0; i 10000; i) {directProducer.publishMsg(msg i);}}
}消费者
public class FanoutConsumer {private static final String EXCHANGE_NAME fanout.exchange.v2;public void msgConsumer() {try {MsgConsumer.consumerMsgV2(EXCHANGE_NAME);} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();}}public static void main(String[] args) {FanoutConsumer consumer new FanoutConsumer();for (int i 0; i 3; i) {consumer.msgConsumer();}}
}执行后管理界面如下
5.5 Topic方式 代码详见官网https://www.rabbitmq.com/tutorials/tutorial-five-java.html 更多方式请查看官网https://www.rabbitmq.com/getstarted.html 6. RabbitMQ 进阶
6.1 durable 和 autoDeleted
在定义Queue时可以指定这两个参数
/*** Declare an exchange.* see com.rabbitmq.client.AMQP.Exchange.Declare* see com.rabbitmq.client.AMQP.Exchange.DeclareOk* param exchange the name of the exchange* param type the exchange type* param durable true if we are declaring a durable exchange (the exchange will survive a server restart)* param autoDelete true if the server should delete the exchange when it is no longer in use* param arguments other properties (construction arguments) for the exchange* return a declaration-confirm method to indicate the exchange was successfully declared* throws java.io.IOException if an error is encountered*/
Exchange.DeclareOk exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete,MapString, Object arguments) throws IOException;/**
* Declare a queue
* see com.rabbitmq.client.AMQP.Queue.Declare
* see com.rabbitmq.client.AMQP.Queue.DeclareOk
* param queue the name of the queue
* param durable true if we are declaring a durable queue (the queue will survive a server restart)
* param exclusive true if we are declaring an exclusive queue (restricted to this connection)
* param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use)
* param arguments other properties (construction arguments) for the queue
* return a declaration-confirm method to indicate the queue was successfully declared
* throws java.io.IOException if an error is encountered
*/
Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,MapString, Object arguments) throws IOException;6.1.1 durable
持久化保证RabbitMQ在退出或者crash等异常情况下数据没有丢失需要将queueexchange和Message都持久化。
若是将queue的持久化标识durable设置为true则代表是一个持久的队列那么在服务重启之后会重新读取之前被持久化的queue。
虽然队列可以被持久化但是里面的消息是否为持久化还要看消息的持久化设置。即重启queue但是queue里面还没有发出去的消息那队列里面还存在该消息么这个取决于该消息的设置。 6.1.2 autoDeleted
自动删除如果该队列没有任何订阅的消费者的话该队列会被自动删除。这种队列适用于临时队列。
当一个Queue被设置为自动删除时当消费者断掉之后queue会被删除这个主要针对的是一些不是特别重要的数据不希望出现消息积累的情况。
6.1.3 小节
当一个Queue已经声明好了之后不能更新durable或者autoDelted值当需要修改时需要先删除再重新声明消费的Queue声明应该和投递的Queue声明的 durable,autoDelted属性一致否则会报错对于重要的数据一般设置 durabletrue, autoDeletedfalse对于设置 autoDeletedtrue 的队列当没有消费者之后队列会自动被删除 6.2 ACK
执行一个任务可能需要花费几秒钟你可能会担心如果一个消费者在执行任务过程中挂掉了。一旦RabbitMQ将消息分发给了消费者就会从内存中删除。在这种情况下如果正在执行任务的消费者宕机会丢失正在处理的消息和分发给这个消费者但尚未处理的消息。
但是我们不想丢失任何任务如果有一个消费者挂掉了那么我们应该将分发给它的任务交付给另一个消费者去处理。
为了确保消息不会丢失RabbitMQ支持消息应答。消费者发送一个消息应答告诉RabbitMQ这个消息已经接收并且处理完毕了。RabbitMQ就可以删除它了。
因此手动ACK的常见手段
// 接收消息之后主动ack/nak
Consumer consumer new DefaultConsumer(channel) {Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException {String message new String(body, UTF-8);try {System.out.println( [ queue ] Received message);channel.basicAck(envelope.getDeliveryTag(), false);} catch (Exception e) {channel.basicNack(envelope.getDeliveryTag(), false, true);}}
};
// 取消自动ack
channel.basicConsume(queue, false, consumer);