简单大方网站,个人简历网页设计,景观毕业设计作品网站,系统开发和网站开发现如今的互联网应用大都是采用 分布式系统架构 设计的#xff0c;所以 消息队列 已经逐渐成为企业应用系统 内部通信 的核心手段#xff0c;它具有 低耦合、可靠投递、广播、流量控制、最终一致性 等一系列功能。当前使用较多的 消息队列 有 RabbitMQ、RocketMQ、ActiveMQ、K… 现如今的互联网应用大都是采用 分布式系统架构 设计的所以 消息队列 已经逐渐成为企业应用系统 内部通信 的核心手段它具有 低耦合、可靠投递、广播、流量控制、最终一致性 等一系列功能。当前使用较多的 消息队列 有 RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、MetaMQ 等而部分数据库 如 Redis、MySQL 以及 phxsql 如果硬搞的话其实也可实现消息队列的功能。可能有人觉得各种开源的 MQ 已经足够使用了为什么需要用 Redis 实现 MQ 呢有些简单的业务场景可能不需要重量级的 MQ 组件相比 Redis 来说Kafka 和 RabbitMQ 都算是重量级的消息队列那你有考虑过用 Redis 做消息队列吗这一章我会结合消息队列的特点和 Redis 做消息队列的使用方式以及实际项目中的使用来和大家探讨下 Redis 消息队列的方案。一、回顾消息队列消息队列 是指利用 高效可靠 的 消息传递机制 进行与平台无关的 数据交流并基于数据通信来进行分布式系统的集成。通过提供 消息传递 和 消息排队 模型它可以在 分布式环境 下提供 应用解耦、弹性伸缩、冗余存储、流量削峰、异步通信、数据同步 等等功能其作为 分布式系统架构 中的一个重要组件有着举足轻重的地位。mq_overview现在回顾下我们使用的消息队列一般都有什么样的特点三个角色生产者、消费者、消息处理中心异步处理模式生产者 将消息发送到一条 虚拟的通道消息队列上而无须等待响应。消费者 则 订阅 或是 监听 该通道取出消息。两者互不干扰甚至都不需要同时在线也就是我们说的 松耦合可靠性消息要可以保证不丢失、不重复消费、有时可能还需要顺序性的保证撇开我们常用的消息中间件不说你觉得 Redis 的哪些数据类型可以满足 MQ 的常规需求~~二、Redis 实现消息队列思来想去只有 List 和 Streams 两种数据类型可以实现消息队列的这些需求当然Redis 还提供了发布、订阅(pub/sub) 模式。我们逐一看下这 3 种方式的使用和场景。2.1 List 实现消息队列Redis 列表是简单的字符串列表按照插入顺序排序。你可以添加一个元素到列表的头部左边或者尾部右边。所以常用来做异步队列使用。将需要延后处理的任务结构体序列化成字符串塞进 Redis 的列表另一个线程从这个列表中轮询数据进行处理。Redis 提供了好几对 List 指令先大概看下这些命令混个眼熟List 常用命令命令用法描述LPUSHLPUSH key value [value ...]将一个或多个值 value 插入到列表 key 的表头如果有多个 value 值那么各个 value 值按从左到右的顺序依次插入到表头RPUSHRPUSH key value [value ...]将一个或多个值 value 插入到列表 key 的表尾(最右边)LPOPLPOP key移除并返回列表 key 的头元素。BLPOPBLPOP key [key ...] timeout移出并获取列表的第一个元素 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止RPOPRPOP key移除并返回列表 key 的尾元素。BRPOPBRPOP key [key ...] timeout移出并获取列表的最后一个元素 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止。BRPOPLPUSHBRPOPLPUSH source destination timeout从列表中弹出一个值将弹出的元素插入到另外一个列表中并返回它如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止。RPOPLPUSHRPOPLPUSH source destinationb命令 RPOPLPUSH 在一个原子时间内执行以下两个动作将列表 source 中的最后一个元素(尾元素)弹出并返回给客户端。将 source 弹出的元素插入到列表 destination 作为 destination 列表的的头元素LLENLLEN key返回列表 key 的长度。如果 key 不存在则 key 被解释为一个空列表返回 0 .如果 key 不是列表类型返回一个错误LRANGELRANGE key start stop返回列表 key 中指定区间内的元素区间以偏移量 start 和 stop 指定挑几个弹入、弹出的命令就可以组合出很多姿势LPUSH、RPOP 左进右出RPUSH、LPOP 右进左出127.0.0.1:6379 lpush mylist a a b c d e
(integer) 6
127.0.0.1:6379 rpop mylist
a
127.0.0.1:6379 rpop mylist
a
127.0.0.1:6379 rpop mylist
b
127.0.0.1:6379
redis-RPOP即时消费问题通过 LPUSHRPOP 这样的方式会存在一个性能风险点就是消费者如果想要及时的处理数据就要在程序中写个类似 while(true) 这样的逻辑不停的去调用 RPOP 或 LPOP 命令这就会给消费者程序带来些不必要的性能损失。所以Redis 还提供了 BLPOP、BRPOP 这种阻塞式读取的命令带 B-Bloking的都是阻塞式客户端在没有读到队列数据时自动阻塞直到有新的数据写入队列再开始读取新数据。这种方式就节省了不必要的 CPU 开销。LPUSH、BRPOP 左进右阻塞出RPUSH、BLPOP 右进左阻塞出127.0.0.1:6379 lpush yourlist a b c d
(integer) 4
127.0.0.1:6379 blpop yourlist 10
1) yourlist
2) d
127.0.0.1:6379 blpop yourlist 10
1) yourlist
2) c
127.0.0.1:6379 blpop yourlist 10
1) yourlist
2) b
127.0.0.1:6379 blpop yourlist 10
1) yourlist
2) a
127.0.0.1:6379 blpop yourlist 10
(nil)
(10.02s)
如果将超时时间设置为 0 时即可无限等待直到弹出消息因为 Redis 单线程的特点所以在消费数据时同一个消息会不会同时被多个 consumer 消费掉但是需要我们考虑消费不成功的情况。可靠队列模式 | ack 机制以上方式中 List 队列中的消息一经发送出去便从队列里删除。如果由于网络原因消费者没有收到消息或者消费者在处理这条消息的过程中崩溃了就再也无法还原出这条消息。究其原因就是缺少消息确认机制。为了保证消息的可靠性消息队列都会有完善的消息确认机制Acknowledge即消费者向队列报告消息已收到或已处理的机制。Redis List 怎么搞一搞呢再看上边的表格中有两个命令 RPOPLPUSH、BRPOPLPUSH 阻塞从一个 list 中获取消息的同时把这条消息复制到另一个 list 里可以当做备份而且这个过程是原子的。这样我们就可以在业务流程安全结束后再删除队列元素实现消息确认机制。127.0.0.1:6379 rpush myqueue one
(integer) 1
127.0.0.1:6379 rpush myqueue two
(integer) 2
127.0.0.1:6379 rpush myqueue three
(integer) 3
127.0.0.1:6379 rpoplpush myqueue queuebak
three
127.0.0.1:6379 lrange myqueue 0 -1
1) one
2) two
127.0.0.1:6379 lrange queuebak 0 -1
1) three
redis-rpoplpush之前做过的项目中就有用到这样的方式去处理数据数据标识从一个 List 取出后放入另一个 List业务操作安全执行完成后再去删除 List 中的数据如果有问题的话很好回滚。当然还有更特殊的场景可以通过 zset 来实现延时消息队列原理就是将消息加到 zset 结构后将要被消费的时间戳设置为对应的 score 即可只要业务数据不会是重复数据就 OK。2.2 订阅与发布实现消息队列我们都知道消息模型有两种点对点Point-to-Point(P2P)发布订阅Publish/Subscribe(Pub/Sub)List 实现方式其实就是点对点的模式下边我们再看下 Redis 的发布订阅模式消息多播这才是“根正苗红”的 Redis MQredis-pub_sub发布/订阅模式同样可以实现进程间的消息传递其原理如下:发布/订阅模式包含两种角色分别是发布者和订阅者。订阅者可以订阅一个或者多个频道(channel)而发布者可以向指定的频道(channel)发送消息所有订阅此频道的订阅者都会收到此消息。Redis 通过 PUBLISH 、 SUBSCRIBE 等命令实现了订阅与发布模式 这个功能提供两种信息机制 分别是订阅/发布到频道和订阅/发布到模式。这个 频道 和 模式 有什么区别呢频道我们可以先理解为是个 Redis 的 key 值而模式可以理解为是一个类似正则匹配的 Key只是个可以匹配给定模式的频道。这样就不需要显式的去订阅多个名称了可以通过模式订阅这种方式一次性关注多个频道。我们启动三个 Redis 客户端看下效果redis-subscribe先启动两个客户端订阅subscribe 名字叫 framework 的频道然后第三个客户端往 framework 发消息可以看到前两个客户端都会接收到对应的消息redis-publish我们可以看到订阅的客户端每次可以收到一个 3 个参数的消息分别为消息的种类始发频道的名称实际的消息再来看下订阅符合给定模式的频道这回订阅的命令是 PSUBSCRIBEredis-psubscribe我们往 java.framework 这个频道发送了一条消息不止订阅了该频道的 Consumer1 和 Consumer2 可以接收到消息订阅了模式 java.* 的 Consumer3 和 Consumer4 也可以接收到消息。redis-psubscribe1Pub/Sub 常用命令命令用法描述PSUBSCRIBEPSUBSCRIBE pattern [pattern ...]订阅一个或多个符合给定模式的频道PUBSUBPUBSUB subcommand [argument [argument ...]]查看订阅与发布系统状态PUBLISHPUBLISH channel message将信息发送到指定的频道PUNSUBSCRIBEPUNSUBSCRIBE [pattern [pattern ...]]退订所有给定模式的频道SUBSCRIBESUBSCRIBE channel [channel ...]订阅给定的一个或多个频道的信息UNSUBSCRIBEUNSUBSCRIBE [channel [channel ...]]指退订给定的频道2.3 Streams 实现消息队列Redis 发布订阅 (pub/sub) 有个缺点就是消息无法持久化如果出现网络断开、Redis 宕机等消息就会被丢弃。而且也没有 Ack 机制来保证数据的可靠性假设一个消费者都没有那消息就直接被丢弃了。后来 Redis 的父亲 Antirez又单独开启了一个叫 Disque 的项目来完善这些问题但是没有做起来github 的更新也定格在了 5 年前所以我们就不讨论了。Redis 5.0 版本新增了一个更强大的数据结构——Stream。它提供了消息的持久化和主备复制功能可以让任何客户端访问任何时刻的数据并且能记住每一个客户端的访问位置还能保证消息不丢失。它就像是个仅追加内容的消息链表把所有加入的消息都串起来每个消息都有一个唯一的 ID 和对应的内容。而且消息是持久化的。redis-stream每个 Stream 都有唯一的名称它就是 Redis 的 key在我们首次使用 xadd 指令追加消息时自动创建。Streams 是 Redis 专门为消息队列设计的数据类型所以提供了丰富的消息队列操作命令。Stream 常用命令描述用法添加消息到末尾保证有序可以自动生成唯一IDXADD key ID field value [field value ...]对流进行修剪限制长度XTRIM key MAXLEN [~] count删除消息XDEL key ID [ID ...]获取流包含的元素数量即消息长度XLEN key获取消息列表会自动过滤已经删除的消息XRANGE key start end [COUNT count]以阻塞或非阻塞方式获取消息列表XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]创建消费者组XGROUP [CREATE key groupname id-or-] [DESTROY key groupname] [DELCONSUMER key groupname consumername]读取消费者组中的消息XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]将消息标记为已处理XACK key group ID [ID ...]为消费者组设置新的最后递送消息IDXGROUP SETID [CREATE key groupname id-or-] [DESTROY key groupname]删除消费者XGROUP DELCONSUMER [CREATE key groupname id-or-] [DESTROY key groupname]删除消费者组XGROUP DESTROY [CREATE key groupname id-or-] [DESTROY key groupname] [DEL显示待处理消息的相关信息XPENDING key group [start end count] [consumer]查看流和消费者组的相关信息XINFO [CONSUMERS key groupname] [GROUPS key] [STREAM key] [HELP]打印流信息XINFO STREAM [CONSUMERS key groupname] [GROUPS key] [STREAM key] [HELP]CRUD 工程师上线增删改查来一波# * 号表示服务器自动生成 ID后面顺序跟着一堆 key/value
127.0.0.1:6379 xadd mystream * f1 v1 f2 v2 f3 v3
1609404470049-0 ## 生成的消息 ID有两部分组成毫秒时间戳-该毫秒内产生的第1条消息# 消息ID 必须要比上个 ID 大
127.0.0.1:6379 xadd mystream 123 f4 v4
(error) ERR The ID specified in XADD is equal or smaller than the target stream top item# 自定义ID
127.0.0.1:6379 xadd mystream 1609404470049-1 f4 v4
1609404470049-1# -表示最小值 , 表示最大值,也可以指定最大消息ID或最小消息ID配合 -、 使用
127.0.0.1:6379 xrange mystream -
1) 1) 1609404470049-02) 1) f12) v13) f24) v25) f36) v3
2) 1) 1609404470049-12) 1) f42) v4127.0.0.1:6379 xdel mystream 1609404470049-1
(integer) 1
127.0.0.1:6379 xlen mystream
(integer) 1
# 删除整个 stream
127.0.0.1:6379 del mystream
(integer) 1
独立消费xread 以阻塞或非阻塞方式获取消息列表指定 BLOCK 选项即表示阻塞超时时间 0 毫秒意味着永不超时# 从ID是0-0的开始读前2条
127.0.0.1:6379 xread count 2 streams mystream 0
1) 1) mystream2) 1) 1) 1609405178536-02) 1) f52) v52) 1) 1609405198676-02) 1) f12) v13) f24) v2# 阻塞的从尾部读取流开启新的客户端xadd后发现这里就读到了,block 0 表示永久阻塞
127.0.0.1:6379 xread block 0 streams mystream $
1) 1) mystream2) 1) 1) 1609408791503-02) 1) f62) v6
(42.37s)
可以看到我并没有给流 mystream 传入一个常规的 ID而是传入了一个特殊的 ID $这个特殊的 ID 意思是 XREAD 应该使用流 mystream 已经存储的最大 ID 作为最后一个 ID。以便我们仅接收从我们开始监听时间以后的新消息。这在某种程度上相似于 Unix 命令tail -f。当然也可以指定任意有效的 ID。而且 XREAD 的阻塞形式还可以同时监听多个 Strema只需要指定多个键名即可。127.0.0.1:6379 xread block 0 streams mystream yourstream $ $
创建消费者组xread 虽然可以扇形分发到 N 个客户端然而在某些问题中我们想要做的不是向许多客户端提供相同的消息流而是从同一流向许多客户端提供不同的消息子集。比如下图这样三个消费者按轮训的方式去消费一个 Stream。redis-stream-cgRedis Stream 借鉴了很多 Kafka 的设计。Consumer Group有了消费组的概念每个消费组状态独立互不影响一个消费组可以有多个消费者last_delivered_id 每个消费组会有个游标 last_delivered_id 在数组之上往前移动表示当前消费组已经消费到哪条消息了pending_ids 消费者的状态变量作用是维护消费者的未确认的 id。pending_ids 记录了当前已经被客户端读取的消息但是还没有 ack。如果客户端没有 ack这个变量里面的消息 ID 会越来越多一旦某个消息被 ack它就开始减少。这个 pending_ids 变量在 Redis 官方被称之为 PEL也就是 Pending Entries List这是一个很核心的数据结构它用来确保客户端至少消费了消息一次而不会在网络传输的中途丢失了没处理。redis-group-strucureStream 不像 Kafak 那样有分区的概念如果想实现类似分区的功能就要在客户端使用一定的策略将消息写到不同的 Stream。xgroup create创建消费者组xgreadgroup读取消费组中的消息xackack 掉指定消息# 创建消费者组的时候必须指定 ID, ID 为 0 表示从头开始消费为 $ 表示只消费新的消息也可以自己指定
127.0.0.1:6379 xgroup create mystream mygroup $
OK# 查看流和消费者组的相关信息可以查看流、也可以单独查看流下的某个组的信息
127.0.0.1:6379 xinfo stream mystream1) length2) (integer) 4 # 共 4 个消息3) radix-tree-keys4) (integer) 15) radix-tree-nodes6) (integer) 27) last-generated-id8) 1609408943089-09) groups
10) (integer) 1 # 一个消费组
11) first-entry # 第一个消息
12) 1) 1609405178536-02) 1) f52) v5
13) last-entry # 最后一个消息
14) 1) 1609408943089-02) 1) f62) v6
127.0.0.1:6379
按消费组消费Stream 提供了 xreadgroup 指令可以进行消费组的组内消费需要提供消费组名称、消费者名称和起始消息 ID。它同 xread 一样也可以阻塞等待新消息。读到新消息后对应的消息 ID 就会进入消费者的 PEL(正在处理的消息) 结构里客户端处理完毕后使用 xack 指令通知服务器本条消息已经处理完毕该消息 ID 就会从 PEL 中移除。# 消费组 mygroup1 中的 消费者 c1 从 mystream 中 消费组数据
# 号表示从当前消费组的 last_delivered_id 后面开始读
# 每当消费者读取一条消息last_delivered_id 变量就会前进
127.0.0.1:6379 xreadgroup group mygroup1 c1 count 1 streams mystream
1) 1) mystream2) 1) 1) 1609727806627-02) 1) f12) v13) f24) v25) f36) v3
127.0.0.1:6379 xreadgroup group mygroup1 c1 count 1 streams mystream
1) 1) mystream2) 1) 1) 1609727818650-02) 1) f42) v4
# 已经没有消息可读了
127.0.0.1:6379 xreadgroup group mygroup1 c1 count 2 streams mystream
(nil)# 还可以阻塞式的消费
127.0.0.1:6379 xreadgroup group mygroup1 c2 block 0 streams mystream
µ1) 1) mystream2) 1) 1) 1609728270632-02) 1) f52) v5
(89.36s)# 观察消费组信息
127.0.0.1:6379 xinfo groups mystream
1) 1) name2) mygroup13) consumers4) (integer) 2 # 2个消费者5) pending6) (integer) 3 # 共 3 条正在处理的信息还没有 ack7) last-delivered-id8) 1609728270632-0127.0.0.1:6379 xack mystream mygroup1 1609727806627-0 # ack掉指定消息
(integer) 1
尝鲜到此结束就不继续深入了。个人感觉就目前来说Stream 还是不能当做主流的 MQ 来使用的而且使用案例也比较少慎用。写在最后当然还有需要注意的就是业务上避免过度复用一个 Redis。既用它做缓存、做计算还拿它做任务队列这样的话 Redis 会很累的。没有绝对好的技术、只有对业务最友好的技术共勉以梦为马越骑越傻。诗和远方越走越慌。不忘初心是对的但切记要出发加油吧程序员。参考《Redis 设计与实现》Redis 官网https://segmentfault.com/a/1190000012244418https://www.cnblogs.com/williamjie/p/11201654.html
往期推荐
Spring Boot集成Redis这个坑把我害惨了硬核Redis总结看这篇就够了Socket粘包问题终极解决方案—Netty版2W字关注我每天陪你进步一点点