网站开发好,如何做单页网站视频,海口网站建设服务,携程网站官网RabbitMQ的消费模式分两种#xff1a;推模式和拉模式#xff0c;推模式采用Basic.Consume进行消费#xff0c;拉模式则是调用Basic.Get进行消费。 消费者通过订阅队列从RabbitMQ中获取消息进行消费#xff0c;为避免消息丢失可采用消费确认机制 消费者 拉模式拉模式的实… RabbitMQ的消费模式分两种推模式和拉模式推模式采用Basic.Consume进行消费拉模式则是调用Basic.Get进行消费。 消费者通过订阅队列从RabbitMQ中获取消息进行消费为避免消息丢失可采用消费确认机制 消费者 拉模式拉模式的实现 推模式消费确认与拒绝消息确认的实现消息拒绝的实现basicRecover basicQos 限制消费总结 拉模式 顾名思义拉模式就是消费者主动的从RabbitMQ中获取数据通过拉模式每次获取数据只能获取一条。拉模式的时序图如下图所示。 RabbitMQ每次接收到Get请求后会将队列中即将被消费的消息发送给消费者消费者接收处理消息后向RabbitMQ发送消费应答然后该消息将从队列中移除。 需要注意的是拉模式普遍仅适用用从RabbitMQ中获取一条数据的场景如果以循环的方式获取批量数据将影响RabbitMQ的性能。
拉模式的实现 拉模式通过以下方法实现
/**
* queue 队列名称
* autoAck 是否开启自动应答
*/
GetResponse basicGet(String queue,boolean autoAck)如上述代码所示channel.basicGet方法返回的是一个GetResponse在GetResponse对象中包含了一条消息内容消费者可以获取该消息并进行处理。
推模式 推模式是指RabbitMQ将消息主动推送给订阅监听队列的消费者。在RabbitMQ推送消息的过程中其并不关心该消费者是否完成上一条消息的消费只要队列中存在消息则向消费者推送当然推送消息的个数会受Basic.Qos的限制。Basic.Qos指定了某个消费者可以保持的未应答的消息数量。 /*** Start a consumer. Calls the consumers {link Consumer#handleConsumeOk}* method.* Provide access to codebasic.deliver(Broker推送消息)/code, codebasic.cancel/code* and shutdown signal callbacks (which is sufficient* for most cases). See methods with a {link Consumer} argument* to have access to all the application callbacks.* param queue 队列名称* param autoAck 是否自动确认* param consumerTag 消费者标签,消费者的唯一标识符* param noLocal 是否可以接收同Connection中生产者的消息true不能接收* param exclusive 是否设置排他* param arguments 其他参数* param deliverCallback 消息接收回调* param cancelCallback 消费取消回调* param shutdownSignalCallback 连接或者信道关闭回调* return the consumerTag associated with the new consumer*/String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, MapString, Object arguments, DeliverCallback deliverCallback, CancelCallback cancelCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException;String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, MapString, Object arguments, Consumer callback) throws IOException;可以通过上述两种方法设置参数最多的实现声明消费者。其中Consumer的定义如下 public interface Consumer {/*** 消费者通过basicConsume被注册后调用*/void handleConsumeOk(String consumerTag);/*** 消费者通过basicCancel取消时调用*/void handleCancelOk(String consumerTag);/*** 消费者不通过basicCancel取消时调用*/void handleCancel(String consumerTag) throws IOException;/*** 通道或者连接关闭时调用*/void handleShutdownSignal(String consumerTag, ShutdownSignalException sig);/*** 接收重新发送的未被确认的消息时调用*/void handleRecoverOk(String consumerTag);/*** 接收消息时调用* param consumerTag 消费者标签* param envelope 打包消息的数据* param properties 消息的内容标头数据* param body 消息内容*/void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throws IOException;
}
消费确认与拒绝 为了保障消息从队列可靠地到达消费者RabbitMQ提供了消息确认机制。消费者在订阅队列时可以指定autoAck参数当autoAck为true时RabbitMQ会自动的把发送出去的消息设置为确认然后从队列中删除当autoACK为false时RabbitMQ会等待消费者显式回复确认信号后才从内存中移去消息先标记再删除。 autoAck参数意为自动应答但是如果该参数为true时则rabbitMQ将自动将发送的消息标记确认无需消费者进行应答。 当autoAck参数为false时对于RabbitMQ服务器而言队列中的消息分成两部分一部分时等待投递给消费者的消息一部分时已经投递给消费者但是还未收到消费者确认消息的消息 RabbitMQ不会为未确认的消息设置过期时间如果一个消息一直未被消费者确认那么这个消息再RabbitMQ中将一直保存为投递未确认状态指导消费者确认或者消费者断开连接如果消费者断开连接则该消费者接收但未确认的消息将重新入队。
消息确认的实现 消息的显式确认需要消费者再声明的过程中设置autoAckfalse。然后该消费者消费的消息可以显式的进行确认应答。确认应答方法如下 /*** param 消息的标签可通过Delivery.getEnvelope().getDeliveryTag()获取* param 如果为true则将发送给该消费者的该消息之前的所有未应答的消息进行应答如果为false则仅应答一条消息*/void basicAck(long deliveryTag, boolean multiple) throws IOException;当进行消息的批量确认时将所有发送给该消费者未确认的消息进行确认而针对监听同一队列的其他消费者的未确认消息并不进行处理。
消息拒绝的实现 RabbitMQ提供了两种消息拒绝的方法Basic.Reject和Basic.Nack命令其两者的区别时Nack可以进行批量拒绝。 /*** param deliveryTag 消息标签* param requeue 为true时被拒绝的消息重新入队否则将成为死信* throws java.io.IOException if an error is encountered*/void basicReject(long deliveryTag, boolean requeue) throws IOException;/*** param deliveryTag 消息标签* param multiple 如果为true则批量拒绝自该消息之前所有未确认的发送给该消费者的消息* param requeue 为true时被拒绝的消息重新入队否则将成为死信* throws java.io.IOException if an error is encountered*/void basicNack(long deliveryTag, boolean multiple, boolean requeue)throws IOException;basicRecover
该方法可以将某个消费者未应答确认或者拒绝的消息重新入队该方法会导致
投递而未被应答的消息可以重新发送给消费者进行处理消费者的消息队列被清空可以重新接收到其他消息 /*** p* Ask the broker to resend unacknowledged messages. In 0-8* basic.recover is asynchronous; in 0-9-1 it is synchronous, and* the new, deprecated method basic.recover_async is asynchronous.* /p* Equivalent to calling codebasicRecover(true)/code, messages* will be requeued and possibly delivered to a different consumer.* see #basicRecover(boolean)*/Basic.RecoverOk basicRecover() throws IOException;/*** Ask the broker to resend unacknowledged messages. In 0-8* basic.recover is asynchronous; in 0-9-1 it is synchronous, and* the new, deprecated method basic.recover_async is asynchronous.* param requeue If true, messages will be requeued and possibly* delivered to a different consumer. If false, messages will be* redelivered to the same consumer.*/Basic.RecoverOk basicRecover(boolean requeue) throws IOException;basicQos 限制消费 默认情况下消费者对于接收的消息数量并未限制也就是说一旦RabbitMQ中接收到消息并且存在消费者则RabbitMQ将把消息发送到相关的消费者中并不关心消费者是否消息完信息。 轮询的默认消息分发机制会导致消费者资源不能合理利用、消费者消息积压导致内存溢出等问题。为解决上述问题可以使用basicQos方法实现限制信道上消费者所能保持的最大未确认消息数量。该方法如下 /*** param prefetchSize 消息大小* param prefetchCount 消息数量* param global 是否全局* throws java.io.IOException if an error is encountered*/void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;针对global参数需要注意一下内容
当globaltrue时信道上所有的消费者都需要遵从消息数量限定值某个信道上所有消费者未确认消息数量prefetchCount)等globalfalse时新的消费者需要遵从消息数量的限定值。可以调用两次basicQos方法并使用不同的global参数这种情况下两次配置都可以生效。
总结 消费者就是针对某个队列进行消息监听和消息消费的。消费者消费消息存在拉模式和推模式推模式的是使用场景相对比较多。 为确保消息被合法的消费RabbitMQ提供了消费确认机制投递的消息并不能被理解完成了消费仅消费者确认消费该消息才会被移除队列。 默认的消息投递机制时轮询轮询的消息分发并会关系消费者的性能以及消息积压的问题因此需要限制每个消费者所能保持的最大未确认的消息数量。