做运营的网站,自己做网站上传相册,长沙城乡建设网站首页,沙田镇网站建设公司在发布订阅中我们了解到发布订阅模式存在的无法持久化保存消息和对于离线重连的客户端不能读取历史消息的缺陷#xff0c;以下就来了解一下stream是如何解决这个问题的
steam是类似于仅添加log的数据结构#xff0c;提供了以下基本命令 XADD: 添加新条目到stream # 语法xadd…在发布订阅中我们了解到发布订阅模式存在的无法持久化保存消息和对于离线重连的客户端不能读取历史消息的缺陷以下就来了解一下stream是如何解决这个问题的
steam是类似于仅添加log的数据结构提供了以下基本命令 XADD: 添加新条目到stream # 语法xadd {key} {id_generator} {key} {value} [{key} {value} ...]
# id表示生成id的规则。*表示时间戳 序号的方式自动生成ID用户也可以自己指定 ID
xadd skey * name alex age 16XREAD: 读取条目 # xread [Count {count}] [BLOCK {ms}] STREAMS {key} {id}
# Block 0代表一致堵塞
# id采用$代表最新的id
# 查询skey stream id 100001之后的第一条消息
xread Count 1 STREAMS skey 100001XDEL: 根据id删除消息 xdel skey 100001DEL: 删除stream del skeyXRANGE: 返回范围内的条目 # xrange {key} start end [Count {count}]
# - 表示第一条消息 表示最后一条消息
# 查询skey steam的第一条到最后一条消息
xrange skey - XLEN: 返回流的长度 xlen skeyXINFO: 展示stream的信息 # 展示stream skey关于流如何在内部编码的信息还显示了流中的第一个和最后一个消息。另一个可用的信息是与此流关联的消费者组的数量。
XINFO STREAM skey
# 询问更多关于消费者群体的信息
XINFO GROUPS skey消费组 XGROUP: 创建、销毁、管理消费组 # xgroup create {stream_key} {group_key} {offset}
# 0-0代表从第一条消息开始读取$代表读取新消息
# 可以在最后添加MKSTREAM来自动创建不存在的stream
xgroup create skey gname $XREADGROUP: 通过消费组读取消息 # xreadgroup group {group_key} {consumer_key} streams {stream_key} {id}
# 代表到目前为止消息从未传递给其他消费者
xreadgroup group gname cname streams skey 如果ID是特殊ID 则该命令将只返回到目前为止从未传递给其他消费者的新消息并且作为副作用将更新消费者组的最后一个ID。 如果ID是任何其他有效的数字ID那么该命令将允许我们访问pending消息的历史记录。也就是说传递给该指定使用者(由所提供的名称标识)的消息集并且到目前为止从未使用XACK进行确认。 XACK: 标记消息已被处理 # 标记之后的消息就不在pending消息历史中了
XACK skey gname 1692632639151-0消息子集所有权转移 XPENDING: 输出消费组pending消息数量 # XPENDING key groupname [[IDLE min-idle-time] start-id end-id count [consumer-name]]
# 输出stream skey所在消息组gname的信息
xpending skey gnameXCLAIM: 变更消费者消息所有权 # XCLAIM key group consumer min-idle-time ID-1 ID-2 ... ID-N
# 提供了最小空闲时间因此只有当上述消息的空闲时间大于指定的空闲时间时操作才会工作。这很有用因为可能有两个客户端同时试图重新声明一条消息
XCLAIM skey gname cname 60000 1692632647899-06.2之后引入了XAUTOCLAIM用于自动识别空闲的挂起消息并将它们的所有权转移给消费者
与发布订阅的区别
所有消息都无限期地添加到流中(除非用户显式地要求删除条目):不同的消费者将通过记住最后收到的消息的ID从其角度知道什么是新消息。
流消费者组提供了发布订阅无法实现的控制级别对同一流使用不同的组明确确认已处理的项目检查挂起项目的能力声明未处理的消息以及每个单个客户端的连贯历史可见性只能看到其私有的过去的消息历史。
性能
添加一个条目是O(1)、访问任何单条目都是O(n)其中n是ID的长度。
由于流id通常很短且长度固定因此这有效地减少了常量查找时间。
Ref
https://redis.io/docs/data-types/streams/