企业酒店的网站建设,和县网站定制,互动平台网站,校园 网站建设 知乎Manjaro安装RabbitMQ
安装
sudo pacman -S rabbitmq rabbitmqadmin启动管理模块
sudo rabbitmq-plugins enable rabbitmq_managementsudo rabbitmq-server管理界面 http://127.0.0.1:15672/ 默认用户名和密码都是guest。 要使用 rabbitmqctl 命令添加用户并分配权限#xf…Manjaro安装RabbitMQ
安装
sudo pacman -S rabbitmq rabbitmqadmin启动管理模块
sudo rabbitmq-plugins enable rabbitmq_managementsudo rabbitmq-server管理界面 http://127.0.0.1:15672/ 默认用户名和密码都是guest。 要使用 rabbitmqctl 命令添加用户并分配权限您可以按照以下步骤进行操作
添加用户
rabbitmqctl add_user mingcai password请将 password 替换为您想要设置的实际密码。
分配权限
rabbitmqctl set_permissions -p / mingcai .* .* .*这个命令将用户 mingcai 授予对所有虚拟主机的所有资源的读、写和管理权限。如果您只想给予特定权限请适当调整正则表达式 .*以授予适当的权限。例如如果您只想给予读取权限可以使用 ^amq\.。
可选步骤设置用户角色
您可以将用户分配给不同的角色以便更好地管理权限。例如您可以将用户添加到 administrator 角色以获取管理员权限
rabbitmqctl set_user_tags mingcai administrator这样用户 mingcai 就被赋予了管理员权限。
请确保您具有适当的权限来执行这些操作并确保替换示例中的用户名和密码为您自己的实际值。
死信队列 标题利用RabbitMQ死信队列处理消息的三种情况
在消息队列的应用中处理异常情况和消息的延迟成为了一项重要的任务。RabbitMQ作为一款流行的消息队列服务提供了死信队列Dead Letter Exchange功能能够有效地处理消息被拒绝、消息过期以及队列达到最大长度等情况。本文将介绍如何利用RabbitMQ的死信队列来处理这三种情况并提供了TypeScript示例代码。
1. 消息被拒绝
当消费者无法处理某条消息时可以选择将其标记为“被拒绝”。这种情况下我们可以配置RabbitMQ将被拒绝的消息发送到一个死信队列以后再处理。
// 引入amqplib库
import * as amqp from amqplib;// 连接到RabbitMQ服务器
const connection await amqp.connect(amqp://localhost);// 创建Channel
const channel await connection.createChannel();// 定义队列
const queueName my_queue;
await channel.assertQueue(queueName, {// 设置死信交换机deadLetterExchange: my_dead_letter_exchange
});// 消费消息
channel.consume(queueName, (msg) {// 处理消息if (msg) {// 处理失败拒绝消息并将其重新放回队列// channel.reject(msg, true); // 第二个参数设为 true 表示将消息重新放回队列// 处理失败拒绝消息channel.reject(msg, false); // 第二个参数设为 false 表示将消息投递到死信队列// or 处理失败拒绝消息并将其重新放回死信队列channel.nack(msg, false, false); // 第二个参数设为 false 表示不将消息重新放回原队列第三个参数设为 false 表示不拒绝当前和之前所有未确认的消息}
});2. 消息过期
有时候我们希望消息在一定时间内被处理如果超过了这个时间就认为消息已经过期。RabbitMQ允许我们设置消息的过期时间并在消息过期后将其发送到死信队列。
// 发布消息
await channel.sendToQueue(queueName, Buffer.from(Hello), {expiration: 60000 // 设置过期时间为60秒
});3. 队列达到最大长度
为了避免队列过载我们可以限制队列的最大长度。当队列达到最大长度时新的消息将被拒绝并发送到死信队列。
// 定义队列
await channel.assertQueue(queueName, {maxLength: 100, // 设置最大队列长度为100deadLetterExchange: my_dead_letter_exchange
});通过以上配置我们可以利用RabbitMQ的死信队列来处理消息被拒绝、消息过期以及队列达到最大长度等情况保证消息系统的稳定性和可靠性。
以上是利用TypeScript示例代码演示了如何在RabbitMQ中使用死信队列。希望这篇文章对你有所帮助
延时队列
什么是延时队列?顾名思义首先它要具有队列的特性再给它附加一个延迟消费队列消息的功能也就是说可以指定队列中的消息在哪个时间点被消费。 延时队列在项目中的应用还是比较多的尤其像电商类平台 1、订单成功后在30分钟内没有支付自动取消订单 2、外卖平台发送订餐通知下单成功后60s给用户推送短信。 3、如果订单一直处于某一个未完结状态时及时处理关单并退还库存 4、淘宝新建商户一个月内还没上传商品信息将冻结商铺等 npm install amqplib --save
npm install types/amqplib --save-dev总结 rabbitmqadmin 使用入门
rabbitmqadmin 是 RabbitMQ 的命令行管理工具可以用于执行各种管理任务如创建队列、交换机查看队列状态等。以下是一些基本的用法示例
export RABBITMQ_SERVER127.0.0.1
export RABBITMQ_PORT5672
export RABBITMQ_USERmingcai
export RABBITMQ_PASSWORDpasswordrabbitmqadmin list exchanges查看 RabbitMQ 服务器信息
rabbitmqadmin status列出所有交换机
rabbitmqadmin list exchanges列出所有队列
rabbitmqadmin list queues创建一个交换机
rabbitmqadmin declare exchange namemy_exchange typedirect创建一个队列
rabbitmqadmin declare queue namemy_queue绑定队列到交换机
rabbitmqadmin declare binding sourcemy_exchange destinationmy_queue routing_keymy_routing_key发送消息到指定交换机
rabbitmqadmin publish exchangemy_exchange routing_keymy_routing_key payloadHello, RabbitMQ!获取队列消息
rabbitmqadmin get queuemy_queue这些命令只是一些基本用法示例rabbitmqadmin 工具支持更多功能和选项。你可以通过运行 rabbitmqadmin help 命令来获取更详细的帮助信息或者查看官方文档以了解更多选项和使用方法。
延时3秒和8秒全部代码
// delayProducer.ts
import * as amqp from amqplib;async function setupRouting() {const connection await amqp.connect(amqp://mingcai:password127.0.0.1);const channel await connection.createChannel();const exchange routing_exchange;// 定义 dlx-exchangeconst dlxExchangeName dlx-exchange;// 声明交换机await channel.assertExchange(exchange, direct, {durable: true});await channel.assertExchange(dlxExchangeName, direct, { durable: true });//消息防止丢失const dlxqueueBindings [{dlxQueueName: dlx-3_second_queue, routingKey: fast,},{dlxQueueName: dlx-8_second_queue, routingKey: slow}];for (const binding of dlxqueueBindings) {// 绑定延迟死信队列await channel.assertQueue(binding.dlxQueueName );//死信交换机和死信队列绑定 Routing key fast 的消息await channel.bindQueue(binding.dlxQueueName, dlxExchangeName, binding.routingKey); // 将 dlx-queue 绑定到死信交换机}// 定义队列和路由键的映射const queueBindings [{queue: 3_second_queue, routingKey: fast, arguments: {x-message-ttl: 3000, // TTL 设置为 3 秒 消息被拒绝或过期时将重新发布到的交换器的可选名称。x-dead-letter-exchange: dlx-exchange//消息被拒绝或过期时将重新发布到的交换器的可选名称}},{queue: 8_second_queue, routingKey: slow, arguments: {x-message-ttl: 8000, // TTL 设置为 8 秒 消息被拒绝或过期时将重新发布到的交换器的可选名称。x-dead-letter-exchange: dlx-exchange//消息被拒绝或过期时将重新发布到的交换器的可选名称}}];// 声明队列并将队列绑定到交换机上for (const binding of queueBindings) {await channel.assertQueue(binding.queue, {durable: true, arguments: binding.arguments});await channel.bindQueue(binding.queue, exchange, binding.routingKey);}for (let i 0; i 10; i) {await new Promise((resolve) {setTimeout(() {resolve(1)}, 1000)})const chinaTime new Date().toLocaleString(zh-CN, { timeZone: Asia/Shanghai });console.log(当前中国时间, chinaTime);// 发送消息到交换机并设置不同的路由键await sendMessage(channel, exchange, fast, [${i}] ${chinaTime} Message for the fast queue);await sendMessage(channel, exchange, slow, [${i}] ${chinaTime} Message for the slow queue);}// 关闭连接setTimeout(async () {await channel.close();await connection.close();}, 10000); // 在 10 秒后关闭连接
}async function sendMessage(channel: amqp.Channel, exchange: string, routingKey: string, message: string) {channel.publish(exchange, routingKey, Buffer.from(message));console.log(Sent message ${message} with routing key ${routingKey});
}setupRouting().catch(console.error);
//消费者 dlx-3_second_queue.ts
import * as amqp from amqplib;async function setupRouting() {const connection await amqp.connect(amqp://mingcai:password127.0.0.1);const channel await connection.createChannel();let queue dlx-3_second_queue// 定义队列和路由键的映射await channel.consume(queue, (msg) {if (msg ! null) {const chinaTime new Date().toLocaleString(zh-CN, { timeZone: Asia/Shanghai });console.log(Received message ${chinaTime}${msg.content.toString()} from queue ${queue});channel.ack(msg); // 确认消息已被处理}});}setupRouting().catch(console.error);
//dlx-8_second_queue.ts
import * as amqp from amqplib;async function setupRouting() {const connection await amqp.connect(amqp://mingcai:password127.0.0.1);const channel await connection.createChannel();let queue dlx-8_second_queue// 定义队列和路由键的映射await channel.consume(queue, (msg) {if (msg ! null) {const chinaTime new Date().toLocaleString(zh-CN, { timeZone: Asia/Shanghai });console.log(Received message ${chinaTime}${msg.content.toString()} from queue ${queue});channel.ack(msg); // 确认消息已被处理}});}setupRouting().catch(console.error);