广州做大型网站建设,购物网站建设特色,辽阳做网站的公司,建设手机网站的公司Reids消息队列#xff08;Message Queue#xff09;
消息队列 是指利用 高效可靠 的 消息传递机制 进行与平台无关的 数据交流#xff0c;并基于数据通信来进行分布式系统的集成。 消息队列具有 低耦合、可靠投递、广播、流量控制、最终一致性 等功能。 常见的消息队列 有 …Reids消息队列Message Queue
消息队列 是指利用 高效可靠 的 消息传递机制 进行与平台无关的 数据交流并基于数据通信来进行分布式系统的集成。 消息队列具有 低耦合、可靠投递、广播、流量控制、最终一致性 等功能。 常见的消息队列 有 RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、MetaMQ 等。 通过提供 消息传递 和 消息排队 模型它可以在 分布式环境 下提供 应用解耦、弹性伸缩、冗余存储、流量削峰、异步通信、数据同步 等功能。
基于List结构的模拟消息队列
Redis的list数据结构是一个双向链表使用出队入队即可实现消息队列。
LPUSH 、RPOP 或 RPUSH 、 LPOP
LPUSH将一个或多个值value插入到列表key的表头如果有多个value值那么各个value值按从左到右的顺序依次插入到表头。 RPOP移除并返回列表key的尾元素。 RPUSH与LPOP同理。 通过 LPUSHRPOP 这样的方式会存在一个性能风险点就是消费者如果想要及时的处理数据就要在程序中写个类似 while(true) 这样的逻辑不停地去调用 RPOP 或 LPOP 命令这就会给消费者程序带来些不必要的性能损失。
LPUSH、BRPOP 或 RPUSH、BLPOP
Redis 还提供了 BLPOP、BRPOP 这种阻塞式读取的命令带 B-Bloking的都是阻塞式客户端在没有读到队列数据时自动阻塞直到有新的数据写入队列再开始读取新数据。这种方式就节省了不必要的 CPU 开销。
数据存入格式lpush listname v1 v2 v3 v 表示存入链表的值 阻塞等待指令格式blpop list_name timeout listname 为 取出内容的列表名 timeout为等待超时时间如果为0则可无限等待 优点
利用Redis存储不受限于JVM内存基于Redis的持久化机制数据安全性有保证可以满足消息有序性
缺点
无法避免消息丢失从redis中取出消息后如果尚未处理完出现异常取出的消息就丢失了只支持单消费者一条消息被取走后其他消费者无法再获取。
基于PubSub的消息队列
发布/订阅模式包含两种角色分别是发布者和订阅者。订阅者可以订阅一个或者多个频道(channel)而发布者可以向指定的频道(channel)发送消息所有订阅此频道的订阅者都会收到此消息。
常用的指令 subscribe channel1 [channel...] 订阅一个或多个频道 publish channel1 msg向一个频道发送消息 psubscribe pattern [pattern]订阅与pattern格式匹配的所有频道 pattern通配符 h?llo?可以替换为任意一个其他字母比如hello而hllo和hkkllo不行 hllo 可以替换为0个或多个其他字母比如hllo、hello、heeeello h[ae]llo可以替换为任意一个中括号中的字母比如hallo、hello而hillo不行 优点
采用发布订阅模型支持多生产、多消费
缺点
不支持数据持久化如果出现网络断开、Redis 宕机等消息就会被丢弃。假设一个消费者都没有那消息就直接被丢弃了。无法避免消息丢失。消息堆积有上限、超出时数据丢失。
基于Stream的消息队列——单消费方式
Redis 5.0 版本新增了一个更强大的数据结构——Stream。它提供了消息的持久化和主备复制功能可以让任何客户端访问任何时刻的数据并且能记住每一个客户端的访问位置还能保证消息不丢失。
存入消息的方式之一_XADD
命令格式XADD key [NOMKSTREAM] [MAXLEN | MINID [ | ~] threshold [LIMIT count]] * | id field value [field value ...] NOMKSTREAM:如果队列不存在是否自动创建队列默认自动创建 MAXLEN | MINID [ | ~] threshold [LIMIT count] 设置消息队列的最大消息数量 * | id 消息的唯一id* 代表由redis自动生成。* 格式是时间戳-递增数字例如1644804662707-0 field value [field value …]发送到队列中的消息队列称为Entry。格式就是多个key-value键值对。 示例
# 创建名为 users 的队列并向其中发送一个消息内容是{namejack,age21},并且使用Redis自动生成id
XADD users * name jack age 21读取消息的方式之一——XREAD
命令格式XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...] [COUNT count]每次读取消息的最大数量 [BLOCK milliseconds]当没有消息时是否阻塞、阻塞时长 STREAMS key [key …]要从哪个队列读取消息key就是队列名 起始id只返回大于该id的消息0代表从第一个消息开始$代表从最新的消息开始 读取最新的消息示例
XREAD COUNT 1 BLOCK 1000 STREAMS users $优点
消息可回溯一个消息可被多个消费者读取可以阻塞读取
缺点
有消息漏读风险
基于Stream的消息队列——消费者组
消费者组将多个消费者划分到一个组里监听同一个队列。
特点
消费分流队列中的消息会分流给组内的不同消费者而不是重复消费加快消息的处理速度。消息标识消费者组会维护一个标示记录组内最后一个被处理的消息哪怕消费者宕机重启还是能够从标示之后读取消息确保每一个消息都被消费。消息确认消费者获取消息之后消息处于pending状态并存入一个pending-list。当处理完后需要XACK来确认消息标记消息已处理之后才会从pending-list移除。
创建消费者组
XGROUP CREATE key groupName ID [MKSTREAM]key:队列名称 groupName:消费者组名称 ID起始ID标示$代表队列中最后一个消息0代表队列中第一个消息 MKSTREAM队列不存在时自动创建队列 其他对应指令
# 删除指定的消费者组
XGROUP DESTROY key group# 给指定的消费者组添加消费者
XGROUP CREATECONSUMER key group consumer# 删除消费者组中的指定消费者
XGROUP DELCONSUMER key group consumer从消费者组读取消息
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds][NOACK] STREAMS key [key ...] id [id ...]group消费者组名称 consumer消费者名称如果消费者不存在会自动创建一个消费者 count本次查询的最大数量 BLOCK milliseconds当前消息等待最大时长 NOACK无需手动ACK获取消息后自动确认 STREAMS key指定监听一个或多个队列名称 ID获取消息的起始ID “ ” 从下一个未消费的消息开始其他根据指定ID从pending-list中获取一个消费但未确认的消息例如0是从pending-list中第一个消息开始 确认消息
XACK key group id [id ...]key消息队列名称 group组名 id确认的消息ID 查看未确认的消息
XPENDING key group [[IDLE min-idle-time] start end count [consumer]]总结
如果业务要求较高可以考虑使用更加专业的 Kafka、RocketMQ、RabbitMQ。