当前位置: 首页 > news >正文

哪里有免费网站空间邢台网站招聘员工123

哪里有免费网站空间,邢台网站招聘员工123,龙岗坑梓网站建设,优质外链平台Redis队列与Stream、Redis 6多线程详解 Redis队列与StreamStream总述常用操作命令生产端消费端单消费者消费组消息消费 Redis队列几种实现的总结基于List的 LPUSHBRPOP 的实现基于Sorted-Set的实现PUB/SUB#xff0c;订阅/发布模式基于Stream类型的实现与Java的集成消息队列问… Redis队列与Stream、Redis 6多线程详解 Redis队列与StreamStream总述常用操作命令生产端消费端单消费者消费组消息消费 Redis队列几种实现的总结基于List的 LPUSHBRPOP 的实现基于Sorted-Set的实现PUB/SUB订阅/发布模式基于Stream类型的实现与Java的集成消息队列问题Stream 消息太多怎么办?消息如果忘记 ACK 会怎样?PEL 如何避免消息丢失?死信问题Stream 的高可用分区 Partition Stream小结 Redis队列与Stream Redis5.0 最大的新特性就是多出了一个数据结构 Stream它是一个新的强大的支持多播的可持久化的 消息队列作者声明Redis Stream地借鉴了 Kafka 的设计。 Stream总述 Redis Stream 的结构如上图所示,每一个Stream都有一个消息链表将所有加入的消息都串起来每个消息都有一个唯一的 ID 和对应的内容。消息是持久化的Redis 重启后内容还在。 每个 Stream 都有唯一的名称它就是 Redis 的 key在我们首次使用xadd指令追加消息时自动创建。 每个 Stream 都可以挂多个消费组每个消费组会有个游标last_delivered_id在 Stream 数组之上往前移动表示当前消费组已经消费到哪条消息了。每个消费组都有一个 Stream 内唯一的名称消费组不会自动创建它需要单独的指令xgroup create进行创建需要指定从 Stream 的某个消息 ID 开始消费这个 ID 用来初始化last_delivered_id变量。 每个消费组 (Consumer Group) 的状态都是独立的相互不受影响。也就是说同一份 Stream 内部的消息会被每个消费组都消费到。 同一个消费组 (Consumer Group) 可以挂接多个消费者 (Consumer)这些消费者之间是竞争关系任意一个消费者读取了消息都会使游标last_delivered_id往前移动。每个消费者有一个组内唯一名称。 消费者 (Consumer) 内部会有个状态变量pending_ids它记录了当前已经被客户端读取,但是还没有 ack的消息。如果客户端没有 ack这个变量里面的消息 ID 会越来越多一旦某个消息被 ack它就开始减少。这个 pending_ids 变量在 Redis 官方被称之为PEL也就是Pending Entries List这是一个很核心的数据结构它用来确保客户端至少消费了消息一次而不会在网络传输的中途丢失了没处理。 消息 ID 的形式是timestampInMillis-sequence例如1527846880572-5它表示当前的消息在毫米时间戳1527846880572时产生并且是该毫秒内产生的第 5 条消息。消息 ID 可以由服务器自动生成也可以由客户端自己指定但是形式必须是整数-整数而且必须是后面加入的消息的 ID 要大于前面的消息 ID。 消息内容就是键值对形如 hash 结构的键值对这没什么特别之处。 常用操作命令 生产端 xadd 追加消息 xdel 删除消息这里的删除仅仅是设置了标志位不会实际删除消息。 xrange 获取消息列表会自动过滤已经删除的消息 xlen 消息长度 del 删除 Stream xadd streamtest * name mark age 18 streamtest 表示当前这个队列的名字也就是我们一般意义上Redis中的key* 号表示服务器自动生成 ID后面顺序跟着“name mark age 18”是我们存入当前streamtest 这个队列的消息采用的也是 key/value的存储形式 返回值1626705954593-0 则是生成的消息 ID由两部分组成时间戳-序号。时间戳时毫秒级单位是生成消息的Redis服务器时间它是个64位整型。序号是在这个毫秒时间点内的消息序号。它也是个64位整型。 为了保证消息是有序的因此Redis生成的ID是单调递增有序的。由于ID中包含时间戳部分为了避免服务器时间错误而带来的问题例如服务器时间延后了Redis的每个Stream类型数据都维护一个latest_generated_id属性用于记录最后一个消息的ID。若发现当前时间戳退后小于latest_generated_id所记录的则采用时间戳不变而序号递增的方案来作为新消息ID这也是序号为什么使用int64的原因保证有足够多的的序号从而保证ID的单调递增性质。 如果不是非常特别的需求强烈建议使用Redis的方案生成消息ID因为这种时间戳序号的单调递增的ID方案几乎可以满足全部的需求但ID是支持自定义的。 xrange streamtest - 其中-表示最小值 , 表示最大值 或者我们可以指定消息 ID 的列表 xdel streamtest 1626706380924-0 xlen streamtest del streamtest 删除整个 Stream 消费端 单消费者 虽然Stream中有消费者组的概念但是可以在不定义消费组的情况下进行 Stream 消息的独立消费当 Stream 没有新消息时甚至可以阻塞等待。Redis 设计了一个单独的消费指令xread可以将 Stream 当成普通的消息队列 (list) 来使用。使用 xread 时我们可以完全忽略消费组 (Consumer Group) 的存在就好比 Stream 就是一个普通的列表 (list) xread count 1 streams stream2 0-0 “count 1”表示从 Stream 读取1条消息缺省当然是头部“streams”可以理解为Redis关键字“stream2”指明了要读取的队列名称“0-0”指从头开始 xread count 2 streams stream2 1626710882927-0 也可以指定从streams的消息Id开始(不包括命令中的消息id) xread count 1 streams stream2 $ $代表从尾部读取上面的意思就是从尾部读取最新的一条消息,此时默认不返回任何消息 所以最好以阻塞的方式读取尾部最新的一条消息直到新的消息的到来 xread block 0 count 1 streams stream2 $ block后面的数字代表阻塞时间单位毫秒 此时我们新开一个客户端往stream2中写入一条消息 可以看到阻塞解除了返回了新的消息内容而且还显示了一个等待时间这里我们等待了127.87s 一般来说客户端如果想要使用 xread 进行顺序消费一定要记住当前消费到哪里了也就是返回的消息 ID。下次继续调用 xread 时将上次返回的最后一个消息 ID 作为参数传递进去就可以继续消费后续的消息。 消费组 创建消费组 Stream 通过xgroup create指令创建消费组 (Consumer Group)需要传递起始消息 ID 参数用来初始化last_delivered_id变量。 xgroup create stream2 cg1 0-0 “stream2”指明了要读取的队列名称“cg1”表示消费组的名称“0-0”表示从头开始消费 xgroup create stream2 cg2 $ $ 表示从尾部开始消费只接受新消息当前 Stream 消息会全部忽略 现在我们可以用xinfo命令来看看stream2的情况 xinfo stream stream2 xinfo groups stream2 消息消费 有了消费组自然还需要消费者Stream 提供了 xreadgroup 指令可以进行消费组的组内消费需要提供消费组名称、消费者名称和起始消息 ID。 它同 xread 一样也可以阻塞等待新消息。读到新消息后对应的消息 ID 就会进入消费者的PEL(正在处理的消息) 结构里客户端处理完毕后使用 xack 指令通知服务器本条消息已经处理完毕该消息 ID 就会从 PEL 中移除。 xreadgroup GROUP cg1 c1 count 1 streams stream2 “GROUP”属于关键字“cg1”是消费组名称“c1”是消费者名称“count 1”指明了消费数量 号表示从当前消费组的 last_delivered_id 后面开始读每当消费者读取一条消息last_delivered_id 变量就会前进 前面我们定义cg1的时候是从头开始消费的自然就获得Stream2中第一条消息 再执行一次上面的命令 自然就读取到了下条消息。 我们将Stream2中的消息读取完 xreadgroup GROUP cg1 c1 count 2 streams stream2 很自然就没有消息可读了 xreadgroup GROUP cg1 c1 count 1 streams stream2 然后设置阻塞等待 xreadgroup GROUP cg1 c1 block 0 count 1 streams stream2 我们新开一个客户端发送消息到stream2 xadd stream2 * name lison score 98 回到原来的客户端发现阻塞解除收到新消息 我们来观察一下观察消费组状态 如果同一个消费组有多个消费者我们还可以通过 xinfo consumers 指令观察每个消费者的状态 xinfo consumers stream2 cg1 可以看到目前c1这个消费者有 5 条待ACK的消息空闲了441340 ms 没有读取消息。 如果我们确认一条消息 xack stream2 cg1 1626751586744-0 就可以看到待确认消息变成了4条 xack允许带多个消息id比如 同时Stream还提供了命令XPENDIING 用来获消费组或消费内消费者的未处理完毕的消息每个Pending的消息有4个属性 消息ID 所属消费者 IDLE已读取时长 delivery counter消息被读取次数 命令XCLAIM用以进行消息转移的操作将某个消息转移到自己的Pending列表中。需要设置组、转移的目标消费者和消息ID同时需要提供IDLE已被读取时长只有超过这个时长才能被转移。 更多的Redis的Stream命令请大家参考Redis官方文档 https://redis.io/topics/streams-intro https://redis.io/commands 同时Redis文档中在每个命令的详情页右边会显示“Related commands”可以通过这个列表快速了解相关的命令和进入具体命令的详情页。 Redis队列几种实现的总结 基于List的 LPUSHBRPOP 的实现 足够简单消费消息延迟几乎为零但是需要处理空闲连接的问题。 如果线程一直阻塞在那里Redis客户端的连接就成了闲置连接闲置过久服务器一般会主动断开连接减少闲置资源占用这个时候blpop和brpop或抛出异常所以在编写客户端消费者的时候要小心如果捕获到异常需要重试。 其他缺点包括 做消费者确认ACK麻烦不能保证消费者消费消息后是否成功处理的问题宕机或处理异常等通常需要维护一个Pending列表保证消息处理确认不能做广播模式如pub/sub消息发布/订阅模型不能重复消费一旦消费就会被删除不支持分组消费。 基于Sorted-Set的实现 多用来实现延迟队列当然也可以实现有序的普通的消息队列但是消费者无法阻塞的获取消息只能轮询不允许重复消息。 PUB/SUB订阅/发布模式 优点 典型的广播模式一个消息可以发布到多个消费者多信道订阅消费者可以同时订阅多个信道从而接收多类消息消息即时发送消息不用等待消费者读取消费者会自动接收到信道发布的消息。 缺点 消息一旦发布不能接收。换句话就是发布时若客户端不在线则消息丢失不能寻回不能保证每个消费者接收的时间是一致的若消费者客户端出现消息积压到一定程度会被强制断开导致消息意外丢失。通常发生在消息的生产远大于消费速度时可见Pub/Sub 模式不适合做消息存储消息积压类的业务而是擅长处理广播即时通讯即时反馈的业务。 基于Stream类型的实现 基本上已经有了一个消息中间件的雏形可以考虑在生产过程中使用当然真正要在生产中应用要做的事情还很多比如消息队列的管理和监控就需要花大力气去实现而专业消息队列都已经自带或者存在着很好的第三方方案和插件。 与Java的集成 可以参见cn.tuling.redis.redismq.StreamVer 消息队列问题 从我们上面对Stream的使用表明Stream已经具备了一个消息队列的基本要素生产者API、消费者API消息Broker消息的确认机制等等所以在使用消息中间件中产生的问题这里一样也会遇到。 Stream 消息太多怎么办? 要是消息积累太多Stream 的链表岂不是很长内容会不会爆掉?xdel 指令又不会删除消息它只是给消息做了个标志位。 Redis 自然考虑到了这一点所以它提供了一个定长 Stream 功能。在 xadd 的指令提供一个定长长度 maxlen就可以将老的消息干掉确保最多不超过指定长度。 消息如果忘记 ACK 会怎样? Stream 在每个消费者结构中保存了正在处理中的消息 ID 列表 PEL如果消费者收到了消息处理完了但是没有回复 ack就会导致 PEL 列表不断增长如果有很多消费组的话那么这个 PEL 占用的内存就会放大。所以消息要尽可能的快速消费并确认。 PEL 如何避免消息丢失? 在客户端消费者读取 Stream 消息时Redis 服务器将消息回复给客户端的过程中客户端突然断开了连接消息就丢失了。但是 PEL 里已经保存了发出去的消息 ID。待客户端重新连上之后可以再次收到 PEL 中的消息 ID 列表。不过此时 xreadgroup 的起始消息 ID 不能为参数而必须是任意有效的消息 ID一般将参数设为 0-0表示读取所有的 PEL 消息以及自last_delivered_id之后的新消息。 死信问题 如果某个消息不能被消费者处理也就是不能被XACK这是要长时间处于Pending列表中即使被反复的转移给各个消费者也是如此。此时该消息的delivery counter通过XPENDING可以查询到就会累加当累加到某个我们预设的临界值时我们就认为是坏消息也叫死信DeadLetter无法投递的消息由于有了判定条件我们将坏消息处理掉即可删除即可。删除一个消息使用XDEL语法注意这个命令并没有删除Pending中的消息因此查看Pending消息还会在可以在执行执行XDEL之后XACK这个消息标识其处理完毕。 Stream 的高可用 Stream 的高可用是建立主从复制基础上的它和其它数据结构的复制机制没有区别也就是说在 Sentinel 和 Cluster 集群环境下 Stream 是可以支持高可用的。不过鉴于 Redis 的指令复制是异步的在 failover 发生时Redis 可能会丢失极小部分数据这点 Redis 的其它数据结构也是一样的。 分区 Partition 分区 Partition Stream小结 Stream 的消费模型借鉴了 Kafka 的消费分组的概念它弥补了 Redis Pub/Sub 不能持久化消息的缺陷。但是它又不同于 kafkaKafka 的消息可以分 partition而 Stream 不行。如果非要分 parition 的话得在客户端做提供不同的 Stream 名称对消息进行 hash 取模来选择往哪个 Stream 里塞。 总的来说如果是中小项目和企业在工作中已经使用了Redis在业务量不是很大而又需要消息中间件功能的情况下可以考虑使用Redis的Stream功能。但是如果并发量很高资源足够支持下还是以专业的消息中间件比如RocketMQ、Kafka等来支持业务更好。
http://www.zqtcl.cn/news/912694/

相关文章:

  • 给网站做选题计算机网络技术电商网站建设与运营方向
  • 网站如何做熊掌号并绑定wordpress pdf
  • wordpress页面构建器中文文山seo公司
  • 凡科免费做网站蜂箱尺寸与制作图片
  • 完全不收费的聊天软件班级优化大师下载安装app
  • 合肥网站改版360免费建站永久免费
  • 商业网站建设案例课程 下载工信部企业网站认证
  • 泉州网站设计哪家公司好沈阳seo代理计费
  • 做景观素材有哪几个网站国内建网站费用
  • 驻马店重点项目建设网站wordpress常规选项
  • 网站开发 英文网站策划建设阶段的推广
  • 建立网站一般多少钱wordpress评论跳过验证
  • 南京每月做社保明细在哪个网站查看设计作品的网站软件
  • html怎么做网站如何在腾讯云上网站建设
  • 网站建设怎么链接表格手机做外贸有什么好的网站
  • 深圳开发网站建设哪家好外贸网络营销培训
  • 广州智迅网络做网站免费下载ps素材网站
  • 什么网站时候做伪静态开发软件定制
  • 找人做网站 多少钱西宁市公司网站建设
  • 网页设计 教程网站找权重高的网站方法
  • 网站建设本地还是外地重庆seo排名方法
  • 那个网站做网编好昨晚兰州发生了什么事
  • 温州建设局网站首页哪里可以学做资料员的网站
  • 网站怎样在360做优化wordpress文章图片在线裁剪
  • 彭州建设网站建设网站哪间公司比较好
  • qq空间网站根目录慧聪网首页
  • 制作小程序和网站的公司杭州品牌设计公司
  • 显示网站翻页代码wordpress 金融 模板下载
  • 用双语网站做seo会不会phpmysql网站
  • 长沙专业网站建设公司优惠券怎么做自己的网站