华泰保险公司官方网站电话,ps怎么在dw上做网站,看视频的app有哪些,网站分站代理加盟1. RabbitMQ简介
RabbitMQ是一个可靠、高效的开源消息代理服务器#xff0c;基于AMQP协议。它具备以下特点#xff1a;
可以支持多种消息协议#xff0c;如AMQP、STOMP和MQTT等。提供了持久化、可靠性和灵活的路由等功能。支持消息的发布和订阅模式。具备高可用性和可扩展…1. RabbitMQ简介
RabbitMQ是一个可靠、高效的开源消息代理服务器基于AMQP协议。它具备以下特点
可以支持多种消息协议如AMQP、STOMP和MQTT等。提供了持久化、可靠性和灵活的路由等功能。支持消息的发布和订阅模式。具备高可用性和可扩展性。
RabbiMQ的核心概念包括生产者、消费者、队列、交换机和绑定。生产者将消息发送到交换机交换机根据其类型和绑定规则将消息路由到队列然后消费者从队列中获取消息进行处理。
RabbitMQ相关概念
Broker接收和分发消息的应用RabbitMQ Server就是Message Broker。Virtual host出于多租户和安全因素的设计把AMQP的基本组件划分到一个虚拟的分组中类似于网络中的namespace概念当多个不同的用户使用同一个RabbitMQ Server提供的服务时可以划分出多个vhost每个用户在自己的vhost创建exchange/queue等。Connectionpublisher/consumer和broker之间的TCP连接。Channel如果每一次访问RabbitMQ都建立一个Connection在消息量大的时候建立TCP Connection的开销都将是巨大的效率也是非常低的。Channel是在Connection内部建立的逻辑连接如果应用程序支持多线程通常每个thread会创建单独的Channel进行通信AMQP的method包含了channel id帮助客户端和message broker识别channel所以channel之间是完全隔离的。Channel作为轻量级的Connection极大减少了操作系统建立TCP连接的开销。
相关术语
producer生产者向队列中发送消息的程序。在图表中通常使用P表示queue队列用于存储消息定义在RabbitMQ内部queue本质上是一个消息缓存buffer生产者可以往里发送消息消费者也可以从里面获取消息。在图表中通常使用Q表示consumer消费者等待并从消息队列中获取消息的程序。在图表中通常使用C表示exchange交换机用于将producer发送来的消息发送到queue事实上producer是不能直接将message发送到queue必须先发送到exchange再由exchange发送到queue。
注生产者和消费者可能在不同的程序或主机中当然也有可能一个程序有可能既是生产者也是消费者。
2. pika简介
在Python中pika是一个用于处理RabbitMQ消息队列的第三方库它允许开发者在Python应用程序中发送和接收消息实现应用程序之间的异步通信。
主要功能
连接管理pika提供了与RabbitMQ服务器建立连接的功能。通道管理通过连接可以创建多个通道channel每个通道代表一个独立的通信流。消息发送与接收开发者可以使用pika发送消息到指定的队列queue并从队列中接收消息。交换机与队列声明支持声明交换机exchange、队列以及它们之间的绑定binding关系。消息确认支持消息的自动确认auto-ack或手动确认manual ack以确保消息的可靠传递。
使用流程
创建连接使用pika.BlockingConnection或pika.SelectConnection等类创建与RabbitMQ服务器的连接。创建通道通过连接对象的channel()方法创建通道。声明交换机与队列使用通道对象的exchange_declare()和queue_declare()方法声明交换机和队列。绑定交换机与队列使用通道对象的queue_bind()方法将队列绑定到交换机。发送消息使用通道对象的basic_publish()方法发送消息到指定的交换机和路由键routing key。接收消息使用通道对象的basic_consume()方法开始消费队列中的消息并通过回调函数处理接收到的消息。关闭连接在不再需要时使用连接对象的close()方法关闭连接。
安装pika
pip install -i https://pypi.tuna.tsinghua.edu.cn/simple pika
3. pika应用入门
3.1. 生产者
import pika# 1.连接rabbit
credentials pika.PlainCredentials(rabbit, *****) # rabbit用户名和密码
connection pika.BlockingConnection(pika.ConnectionParameters(192.168.17.61,port 5671,virtual_host /typc-fpd-dev,credentials credentials))
channel connection.channel()# 2.创建持久化队列
# 注意非持久化队列不能变持久化队列反之也是这样的所有创建队列中不能创建和非持久化队列重名的队列
channel.queue_declare(queuehello_world, durableTrue)# 3.向指定队列插入数据
poiid xxxxxx
channel.basic_publish(exchange, # 简单模式routing_keyhello_world, # 指定队列bodypoiid, # 向队列中添加的数据propertiespika.BasicProperties(delivery_mode2, # make message persistent))
print( [x] Sent Hello World!) 查看虚拟主机virtual-host: /typc-fpd-dev下队列hello_world。
3.2. 侦听消费者
import pika# 1.连接rabbit
credentials pika.PlainCredentials(rabbit, *****) # rabbit用户名和密码
connection pika.BlockingConnection(pika.ConnectionParameters(192.168.17.61,port 5671,virtual_host /typc-fpd-dev,credentials credentials))# 2.创建持久化队列
# 注意非持久化队列不能变持久化队列反之也是这样的所有创建队列中不能创建和非持久化队列重名的队列
# 注意这一步不是必须的但是如果消费者先启动而不是生成者先启动时这时队列中还没有hello_world队列这时就会报错
channel.queue_declare(queuehello_world, durableTrue)# 3.确定回调函数
def callback(ch, method, properties, body):print( [x] Received %r % body)# 手动应答poiid body.decode(utf-8) # 将 bytes 转换为字符串 Core.setPIO(poiid) # 输入数据Core.task_process() # 处理数据ch.basic_ack(delivery_tagmethod.delivery_tag)# 4.确定监听队列参数
channel.basic_consume(queuehello_world, # 指定队列auto_ackFalse, # 手动应答方式on_message_callbackcallback)print( [*] Waiting for messages. To exit press CTRLC)# 5.正式监听
channel.start_consuming()3.3. 主动处理消费消息
在Pika中basic_get方法确实可以用于从队列中直接获取消息但通常不推荐在生产环境中使用因为它不是高效的消息处理方式。不过如果你确实需要这种方法以下是如何使用basic_get的示例
# 1.连接rabbit
credentials pika.PlainCredentials(rabbit, *****) # rabbit用户名和密码
connection pika.BlockingConnection(pika.ConnectionParameters(192.168.17.61,port 5671,virtual_host /typc-fpd-dev,credentials credentials))
channel connection.channel()time.sleep(1)# 2.创建持久化队列
# 注意非持久化队列不能变持久化队列反之也是这样的所有创建队列中不能创建和非持久化队列重名的队列
# 注意这一步不是必须的但是如果消费者先启动而不是生成者先启动时这时队列中还没有hello2队列这时就会报错
channel.queue_declare(queuehello_world, durableTrue)
count 5
for i in range(count): print(取消息开始时间) method_frame, header_frame, body channel.basic_get(queuehello_world, auto_ackFalse) if method_frame: # 处理消息体 print(body:,body) poiid body.decode(utf-8) # 将 bytes 转换为字符串 Core.setPIO(poiid)Core.task_process()time.sleep(2) # 如果你设置了auto_ackFalse则需要手动确认消息 channel.basic_ack(delivery_tagmethod_frame.delivery_tag) else: print(没有消息可以获取,, str(i))time.sleep(1)
print(取消息完成时间)
connection.close()#来关闭连接 参考
三只松鼠. python 操作RabbitMq详解. 博客园. 2019.03
卫玠_juncheng. Python三方库PikaRabbitMQ基础使用. CSDN博客. 2024.03