用eclipse做jsp网站,住房和城乡建设部主网站,wordpress 文章 模型,胶州国际网站建设效果RabbitMQ实战指南#xff08;三#xff09;—— 高级特性 RabbitMQ是一个功能强大的消息队列系统#xff0c;提供了许多高级特性来满足各种消息传递的需求。下面是一些常用的高级特性的详细描述和代码示例#xff1a; 详细描述
1.TTL#xff08;Time-To-Live#xff09;…RabbitMQ实战指南三—— 高级特性 RabbitMQ是一个功能强大的消息队列系统提供了许多高级特性来满足各种消息传递的需求。下面是一些常用的高级特性的详细描述和代码示例 详细描述
1.TTLTime-To-Live是指消息的生存时间它可以设定在消息发送时或队列创建时。一旦消息的生存时间超过了设定的 TTL就会被标记为过期消息并被丢弃或转发到死信队列。
2.死信Dead Letter队列是用于存放被标记为过期消息或被拒绝的消息的特殊队列。当消息被标记为过期或者被拒绝时可以将其转发到死信队列以备后续处理例如记录日志或进一步分析。
3.延迟队列Delay Queue是一种特殊的队列它可以延迟消息的传递。当消息被发送到延迟队列时不会立即传递给消费者而是在设定的延迟时间后才会被传递。这种机制可以用于处理需要延迟处理的任务如定时任务或订单超时处理。
4.优先级队列Priority Queue是一种按照消息优先级排序的队列。通过设置消息的优先级可以确保高优先级的消息在队列中被优先处理避免低优先级的消息长时间阻塞在队列中。
5.RPCRemote Procedure Call是一种远程过程调用的机制它允许一个应用程序向另一个应用程序请求执行某个特定的操作。在使用RabbitMQ进行RPC时客户端发送请求消息到队列并等待服务器返回响应消息从而实现应用程序之间的通信。
6.消息持久化是指将消息存储到磁盘以防止消息在RabbitMQ重启或崩溃时丢失。在RabbitMQ中默认情况下消息是非持久化的但通过将消息的delivery mode设置为2可以实现消息的持久化。当生产者发送持久化消息时RabbitMQ会将其存储到磁盘上的日志文件中以确保消息的安全。
7.在生产端消息确认Publish Confirm是一种机制用于确保消息成功发送到RabbitMQ服务器。通过设置消息的confirm模式生产者可以在消息成功到达服务器后得到确认。
代码示例:
TTLTime-To-Live消息的生存时间可以在消息发送时设置。当消息的生存时间超过设定的时间后消息将被删除。这对于一些需要限定消息处理时间的场景非常有用。
import pika# 创建连接和通道
connection pika.BlockingConnection(pika.ConnectionParameters(localhost))
channel connection.channel()# 设置队列的TTL为10秒
args {x-message-ttl: 10000
}
channel.queue_declare(queuemy_queue, argumentsargs)# 发送消息时设置消息的TTL为5秒
channel.basic_publish(exchange, routing_keymy_queue, bodyHello, RabbitMQ!, propertiespika.BasicProperties(expiration5000))# 关闭连接
connection.close()死信Dead Letter Exchange当消息被拒绝、过期或达到最大重试次数时可以将其发送到一个指定的交换机这个交换机被称为死信交换机。通过使用死信交换机可以对无法处理的消息进行统一处理。
import pika# 创建连接和通道
connection pika.BlockingConnection(pika.ConnectionParameters(localhost))
channel connection.channel()# 声明死信交换机和队列
channel.exchange_declare(exchangedlx_exchange, exchange_typedirect)
channel.queue_declare(queuedlx_queue)
channel.queue_bind(exchangedlx_exchange, queuedlx_queue, routing_keydlx_key)# 设置队列的死信交换机
args {x-dead-letter-exchange: dlx_exchange,x-dead-letter-routing-key: dlx_key
}
channel.queue_declare(queuemy_queue, argumentsargs)# 发送消息时设置消息的TTL为5秒
channel.basic_publish(exchange, routing_keymy_queue, bodyHello, RabbitMQ!, propertiespika.BasicProperties(expiration5000))# 关闭连接
connection.close()延迟队列通过TTL和死信结合可以实现延迟队列的功能。可以将需要延迟发送的消息发送到一个队列中并设置相应的TTL当消息过期时会被发送到死信队列中。
import pika# 创建连接和通道
connection pika.BlockingConnection(pika.ConnectionParameters(localhost))
channel connection.channel()# 声明死信交换机和队列
channel.exchange_declare(exchangedlx_exchange, exchange_typedirect)
channel.queue_declare(queuedlx_queue)
channel.queue_bind(exchangedlx_exchange, queuedlx_queue, routing_keydlx_key)# 设置队列的TTL和死信交换机
args {x-dead-letter-exchange: dlx_exchange,x-dead-letter-routing-key: dlx_key,x-message-ttl: 10000
}
channel.queue_declare(queuedelay_queue, argumentsargs)# 发送消息到延迟队列
channel.basic_publish(exchange, routing_keydelay_queue, bodyHello, RabbitMQ!)# 关闭连接
connection.close()优先级队列可以为消息设置优先级高优先级的消息会被先消费。默认情况下RabbitMQ不支持优先级队列需要安装插件rabbitmq-priority-queue。
import pika# 创建连接和通道
connection pika.BlockingConnection(pika.ConnectionParameters(localhost))
channel connection.channel()# 声明插件
channel.exchange_declare(exchange_typex-delayed-message, exchangedelayed_exchange, arguments{x-delayed-type: direct
})# 声明队列
channel.queue_declare(queuepriority_queue, arguments{x-max-priority: 10 # 设置队列的优先级
})# 绑定队列和交换机
channel.queue_bind(queuepriority_queue, exchangedelayed_exchange, routing_keypriority_key)# 发送消息时设置优先级为1
channel.basic_publish(exchangedelayed_exchange, routing_keypriority_key, bodyHello, RabbitMQ!, propertiespika.BasicProperties(priority1))# 关闭连接
connection.close()RPCRemote Procedure Call允许客户端发送请求消息到服务器端服务器端处理请求并返回结果消息给客户端。RabbitMQ提供了一个实现RPC的示例。
import pika# 创建连接和通道
connection pika.BlockingConnection(pika.ConnectionParameters(localhost))
channel connection.channel()# 声明队列
channel.queue_declare(queuerpc_queue)# 接收到请求时的回调函数
def on_request(ch, method, props, body):n int(body)# 处理请求response fibonacci(n)# 返回结果ch.basic_publish(exchange, routing_keyprops.reply_to,propertiespika.BasicProperties(correlation_idprops.correlation_id),bodystr(response))ch.basic_ack(delivery_tagmethod.delivery_tag)# 定义斐波那契数列函数
def fibonacci(n):if n 0:return 0elif n 1:return 1else:return fibonacci(n - 1) fibonacci(n - 2)# 监听队列
channel.basic_qos(prefetch_count1)
channel.basic_consume(queuerpc_queue, on_message_callbackon_request)# 开始监听
channel.start_consuming()示例代码
import pika
import uuid# 创建连接和通道
connection pika.BlockingConnection(pika.ConnectionParameters(localhost))
channel connection.channel()# 声明队列
channel.queue_declare(queuerpc_queue) 在消费端消息确认Message Acknowledgement是一种机制用于确保消息成功处理并从队列中删除。RabbitMQ提供了两种消息确认模式自动确认模式和手动确认模式。在自动确认模式下消费者在收到消息后立即确认在手动确认模式下消费者需要明确地发送确认消息给RabbitMQ以告诉它该消息已被正确处理。手动确认模式可以确保消息不会在消费者崩溃或网络故障时丢失。