网站cn和com有什么区别,自动登录网站的小程序,短租网站那家做的好处,网站建设类型报价表RabbitMQ 是一个 消息队列中间件#xff08;Message Broker#xff09;#xff0c;实现了 AMQP 协议#xff0c;常用于服务之间解耦、异步处理、流量削峰等场景。
我帮你分成两个部分来讲#xff1a;核心原理 常见用法。#x1f9e9; 一、核心原理
RabbitMQ 的核心是 生…RabbitMQ 是一个 消息队列中间件Message Broker实现了 AMQP 协议常用于服务之间解耦、异步处理、流量削峰等场景。
我帮你分成两个部分来讲核心原理 常见用法。一、核心原理
RabbitMQ 的核心是 生产者Producer → 交换机Exchange → 队列Queue → 消费者Consumer 这一条链路。
1. 基础概念
Producer消息的发送方。Consumer消息的接收方。Queue存储消息的队列。Exchange交换机接收生产者的消息按照规则路由到队列。Binding交换机和队列之间的绑定规则。Routing Key消息的“路由标识”交换机根据它决定发给哪个队列。BrokerRabbitMQ 服务本身。
2. 交换机类型Exchange Types
RabbitMQ 支持几种路由模式
direct按 routing key 精确匹配把消息路由到对应队列。fanout广播模式消息发送到所有绑定的队列。topic模糊匹配路由如 order.*支持通配符。headers根据消息头部的键值对匹配较少用。
3. ACK 消息可靠性
ACK消费者处理完消息后确认acknowledge。未 ACKRabbitMQ 会重新投递消息保证至少一次投递。持久化队列持久化 消息持久化确保 RabbitMQ 重启后消息不丢失。死信队列DLX处理无法投递或过期的消息。
4. 工作模式
简单队列Simple Queue一个生产者 → 一个消费者。工作队列Work Queue一个生产者 → 多个消费者任务分摊。发布订阅Pub/Sub广播消息fanout exchange。路由模式Routing按 key 精确路由direct exchange。主题模式Topic模糊匹配路由topic exchange。️ 二、常见用法Python 示例
安装依赖
pip install pika1. 简单队列
生产者
import pikaconnection pika.BlockingConnection(pika.ConnectionParameters(localhost))
channel connection.channel()channel.queue_declare(queuehello)channel.basic_publish(exchange, routing_keyhello, bodyHello World!)
print( [x] Sent Hello World!)
connection.close()消费者
import pikadef callback(ch, method, properties, body):print(f [x] Received {body})connection pika.BlockingConnection(pika.ConnectionParameters(localhost))
channel connection.channel()channel.queue_declare(queuehello)channel.basic_consume(queuehello, on_message_callbackcallback, auto_ackTrue)print( [*] Waiting for messages. To exit press CTRLC)
channel.start_consuming()2. 工作队列任务分摊
一个生产者发送很多任务。多个消费者并发消费每个消息只会被一个消费者处理。常用在 后台任务处理。
设置 prefetch_count1 可以让 RabbitMQ 公平分发任务
channel.basic_qos(prefetch_count1)3. 发布/订阅fanout
生产者
channel.exchange_declare(exchangelogs, exchange_typefanout)
channel.basic_publish(exchangelogs, routing_key, bodylog message)消费者
channel.exchange_declare(exchangelogs, exchange_typefanout)
result channel.queue_declare(queue, exclusiveTrue)
queue_name result.method.queuechannel.queue_bind(exchangelogs, queuequeue_name)这样消费者会自动生成临时队列接收广播。4. 路由模式direct
生产者
channel.exchange_declare(exchangedirect_logs, exchange_typedirect)
channel.basic_publish(exchangedirect_logs, routing_keyerror, bodyError log)消费者
channel.exchange_declare(exchangedirect_logs, exchange_typedirect)
queue_name channel.queue_declare(queue, exclusiveTrue).method.queue
channel.queue_bind(exchangedirect_logs, queuequeue_name, routing_keyerror)总结
RabbitMQ 生产者 → 交换机 → 队列 → 消费者。交换机类型决定了 消息的路由方式direct、fanout、topic、headers。可靠性依赖 ACK 持久化 死信队列。常见场景异步任务、削峰填谷、解耦微服务、日志广播。
好问题 这几个是 RabbitMQ 保证消息可靠性 的关键机制。我帮你逐个拆开讲1. ACK消息确认机制
默认行为消费者从队列里拿到消息后RabbitMQ 就认为它“已消费”会立即从队列里删除。风险如果消费者拿到消息后宕机/异常消息就丢了。ACK 就是解决这个问题的机制自动 ACK (auto_ackTrue)
一旦消费者收到消息就立刻确认哪怕还没处理完。风险消费者挂了消息丢失。手动 ACK (auto_ackFalse)推荐
消费者处理完任务后再调用 channel.basic_ack() 确认。如果消费者挂了RabbitMQ 会把消息重新投递给别的消费者。例子
def callback(ch, method, properties, body):print(处理消息:, body)# 处理完成后手动确认ch.basic_ack(delivery_tagmethod.delivery_tag)channel.basic_consume(queuetask_queue, on_message_callbackcallback, auto_ackFalse)作用确保消息至少被处理一次不会因为消费者挂掉而丢失。2. 持久化Persistence
RabbitMQ 的数据默认存在内存里服务一旦重启消息就没了。持久化保证 RabbitMQ 重启后消息不丢。
持久化分三层队列持久化声明时加 durableTrue
channel.queue_declare(queuetask_queue, durableTrue)→ RabbitMQ 重启后这个队列还在。消息持久化生产者发送时设置 delivery_mode2
channel.basic_publish(exchange,routing_keytask_queue,bodyHello,propertiespika.BasicProperties(delivery_mode2, # 2 表示持久化消息))→ RabbitMQ 重启后消息仍然在队列里。交换机持久化声明时加 durableTrue。作用保证即使 RabbitMQ 崩溃或重启消息不会丢失。3. 死信队列Dead Letter Queue, DLQ
当某些消息 无法被正常消费 时RabbitMQ 可以把它们转移到另一个队列里死信队列避免消息丢失。
死信队列触发的几种情况
消费者 拒绝消息nack/reject 且 requeueFalse。消息在队列里 过期TTL 超时。队列满了无法再接收新消息。配置死信队列的方法
args {x-dead-letter-exchange: dlx_exchange, # 指定死信交换机x-dead-letter-routing-key: dlx_key # 指定路由 key
}
channel.queue_declare(queuetask_queue, durableTrue, argumentsargs)然后消息会被转发到 死信队列便于后续人工排查或重试。作用防止消息丢失 提供兜底处理机制。总结
ACK保证消费者挂掉时消息不会丢至少投递一次。持久化保证 RabbitMQ 崩溃/重启时消息不会丢。死信队列保证异常消息有去处过期/拒绝/无法投递。
这三个机制配合起来RabbitMQ 就能实现 高可靠消息传递。
好问题 RabbitMQ 里的 队列满了或者说消息堆积过多是一个常见的情况处理思路分两类1. 队列为什么会满
队列本质上是内存磁盘结构如果消费者消费不过来就会导致消息积压。几种常见原因
消费者处理能力不足速度比不上生产者。没有限制队列长度消息无限堆积。消费者挂掉了没人消费。某些消息过大占满内存/磁盘。2. RabbitMQ 的应对机制
(1) 设置队列最大长度/容量防止无限堆积
channel.queue_declare(queuetask_queue,durableTrue,arguments{x-max-length: 1000, # 最大消息数x-max-length-bytes: 10485760 # 最大字节数 (10MB)}
)超过限制后旧消息会被丢弃FIFO或者转发到死信队列推荐。(2) 配置死信队列DLQ
当队列满了时新来的消息可以自动进入死信队列
channel.queue_declare(queuetask_queue,durableTrue,arguments{x-max-length: 1000,x-dead-letter-exchange: dlx_exchange,x-dead-letter-routing-key: dlx_key}
)新消息进不来时直接进入 DLQ避免消息丢失。(3) 限流QoS
消费者可以设置一次最多处理多少条消息避免被“压垮”
channel.basic_qos(prefetch_count1) # 一次只取 1 条处理完再取这样 RabbitMQ 会 公平调度不会把大量消息推给一个消费者。(4) 水平扩展消费者
如果是消费能力不足最直接的办法就是多开几个消费者。
RabbitMQ 会按照 Round Robin轮询 或 公平分发 把消息分配下去。(5) 生产端限流 / 拒绝
RabbitMQ 本身不对生产者限流但你可以在应用层做
使用 发布确认Publisher Confirms如果消息积压可以选择暂停生产。用 消息速率控制Rate Limit比如令牌桶算法减缓生产速度。3. 总结
当队列满了可以这样处理
预防堆积 → 设置 x-max-length / x-max-length-bytes。兜底方案 → 配置死信队列把溢出的消息转移出来。消费优化 → basic_qos 增加消费者实例。生产端调节 → 启用发布确认动态调整生产速度。最佳实践
设置合理的队列长度 消息 TTL。配死信队列确保不会无声丢失。消费端横向扩展必要时加缓存层Kafka 更适合高吞吐