为何要网站优化,邯郸制作网站的公司,模拟网站开发,证书兼职网【Spring连载】使用Spring Data访问Redis#xff08;九#xff09;----Redis流 Streams 一、追加Appending二、消费Consuming2.1 同步接收Synchronous reception2.2 通过消息监听器容器进行异步接收Asynchronous reception through Message Listener Containers2.2.1 命令式I… 【Spring连载】使用Spring Data访问Redis九----Redis流 Streams 一、追加Appending二、消费Consuming2.1 同步接收Synchronous reception2.2 通过消息监听器容器进行异步接收Asynchronous reception through Message Listener Containers2.2.1 命令式Imperative StreamMessageListenerContainer2.2.2 反应式Reactive StreamReceiver 2.3 确认策略Acknowledge strategies2.4 读取偏移量策略ReadOffset strategies 三、序列化Serialization四、对象映射Object Mapping4.1 简单值Simple Values4.2 复杂值Complex Values Redis Streams以抽象的方法对日志数据结构进行建模。通常日志是仅追加append-only的数据结构并且从一开始就在随机位置或通过流式传输新消息来消费。 在
Redis参考文档中了解有关Redis Streams的更多信息。 Redis Streams大致可以分为两个功能领域 追加记录消费记录
尽管这种模式与Pub/Sub有相似之处但主要区别在于消息的持久性以及消息的消费方式。 Pub/Sub依赖于瞬态消息的广播即如果你不听你就会错过消息而Redis Stream使用了一种持久的、仅追加的数据类型它会保留消息直到流被修剪。消费方面的另一个区别是Pub/Sub注册服务器端订阅。Redis将到达的消息推送到客户端而Redis Streams需要活动轮询active polling。 org.springframework.data.redis.connection 和 org.springframework.data.redis.stream包为Redis Streams提供了核心功能。
一、追加Appending
要发送记录你可以像使用其他操作一样使用低级low-levelRedisConnection或高级StreamOperations。这两个实体都提供add (xAdd)方法该方法接受记录和目标流作为参数。RedisConnection需要原始数据字节数组而StreamOperations允许任意对象作为记录传入如以下示例所示
// append message through connection
RedisConnection con …
byte[] stream …
ByteRecord record StreamRecords.rawBytes(…).withStreamKey(stream);
con.xAdd(record);// append message through RedisTemplate
RedisTemplate template …
StringRecord record StreamRecords.string(…).withStreamKey(my-stream);
template.opsForStream().add(record);流记录携带一个Map键值元组作为它们的payload。将记录附加到流中会返回可作为进一步引用的RecordId。
二、消费Consuming
在消费端你可以消费一个或多个流。Redis Streams提供读取命令允许从已知流的任意位置随机访问消费流和从流的结束消费新的流记录。 在底层RedisConnection提供了xRead和xReadGroup方法它们分别映射Redis命令以在消费者组中进行各自读取。请注意可以将多个流用作参数。 Redis中的订阅命令可能会被阻塞。也就是说在连接(connection)上调用xRead会导致当前线程在开始等待消息时阻塞。只有当读取命令超时或收到消息时线程才会被释放。 要消费流消息可以在应用程序代码中轮询poll消息也可以通过消息监听器容器使用两个异步接收中的一个2.2章节命令式或反应式。每次新记录到达时容器都会通知应用程序代码。
2.1 同步接收Synchronous reception
虽然流消费通常与异步处理相关联但也可以同步消费消息。重载的StreamOperations.read(…)方法提供了这个功能。在同步接收期间调用线程可能会阻塞直到消息可用为止。属性StreamReadOptions.block指定接收者在放弃等待消息之前应该等待多长时间。
// Read message through RedisTemplate
RedisTemplate template …ListMapRecordK, HK, HV messages template.opsForStream().read(StreamReadOptions.empty().count(2),StreamOffset.latest(my-stream));ListMapRecordK, HK, HV messages template.opsForStream().read(Consumer.from(my-group, my-consumer),StreamReadOptions.empty().count(2),StreamOffset.create(my-stream, ReadOffset.lastConsumed()))2.2 通过消息监听器容器进行异步接收Asynchronous reception through Message Listener Containers
由于其阻塞性低级别轮询low-level polling没有吸引力因为它需要为每个消费者进行连接和线程管理。为了缓解这个问题SpringData提供了消息侦听器它完成了所有繁重的工作。如果您熟悉EJB和JMS您应该会发现这些概念很熟悉因为它的设计尽可能接近Spring Framework及其消息驱动的POJOMDP中的支持。
Spring Data提供了两种针对所用编程模型量身定制的实现
StreamMessageListenerContainer充当命令式编程模型的消息侦听器容器。它用于使用Redis流中的记录并驱动注入其中的StreamListener实例。
StreamReceiver提供了消息侦听器的反应式变体。它用于将Redis流中的消息作为潜在的无限流使用并通过Flux发出流消息。
StreamMessageListenerContainer和StreamReceiver负责消息接收和调度到侦听器中进行处理的所有线程。消息侦听器容器/接收器是MDP和消息传递提供者之间的中介负责注册接收消息、资源获取和释放、异常转换等。这使您作为应用程序开发人员能够编写与接收消息并对其作出反应相关联的可能复杂的业务逻辑并将Redis基础设施的样板问题委托给框架。
这两个容器都允许更改运行时配置以便在应用程序运行时添加或删除订阅而无需重新启动。此外容器使用延迟订阅方法仅在需要时使用RedisConnection。如果所有侦听器都被取消订阅它会自动执行清理线程就会被释放。
2.2.1 命令式Imperative StreamMessageListenerContainer
2.2.2 反应式Reactive StreamReceiver
2.3 确认策略Acknowledge strategies
2.4 读取偏移量策略ReadOffset strategies
三、序列化Serialization
四、对象映射Object Mapping
4.1 简单值Simple Values
4.2 复杂值Complex Values