在因特网上建设网站可选择的方案,网站建设的功能需求分析,常州网站建设优质商家,可信验证网站欢迎来到我的博客#xff0c;代码的世界里#xff0c;每一行都是一个故事 Redis Streams在Spring Boot中的应用#xff1a;构建可靠的消息队列解决方案 引言前言Redis Streams的基本概念和特性1. 日志数据结构2. 消息和字段3. 消费者组4. 消息ID5. 实时和历史数据处理6. 性能… 欢迎来到我的博客代码的世界里每一行都是一个故事 Redis Streams在Spring Boot中的应用构建可靠的消息队列解决方案 引言前言Redis Streams的基本概念和特性1. 日志数据结构2. 消息和字段3. 消费者组4. 消息ID5. 实时和历史数据处理6. 性能和可靠性 实战maven依赖配置StreamConfig(监听)配置生产者配置消费者(组)配置初始化方法实现效果 基于List和专业消息队列对比相比于Redis List解决的痛点相比于专业高级队列的不足 总结 引言
Redis Stream解密探秘数据流处理的黑科技【一】
解锁Redis Stream新境界高级用法大揭秘【二】
Redis List打造高效消息队列的秘密武器【redis实战 一】
前言
在快节奏的技术世界中消息队列是连接不同服务和组件的关键。而在这个领域Redis Streams作为一种新兴的消息队列解决方案以其高性能和易用性吸引了众多开发者的目光。当这项技术遇到了Spring Boot —— 当今最受欢迎的Java开发框架它们的结合将如何开启新的可能性让我们开始这趟探索之旅深入了解如何将这两种强大的技术融合在一起打造出优雅而强大的消息队列系统。
Redis Streams的基本概念和特性
Redis Streams是Redis数据库的一个强大类型于Redis 5.0中引入。它主要用于消息队列和事件流的存储与传递是一个高性能、持久化的日志数据结构。以下是Redis Streams的一些基本概念和核心特性
1. 日志数据结构
持久化的消息日志Redis Streams是一个按时间排序的消息日志。每条消息都存储在它被插入时的顺序位置并且有一个唯一的ID标识。可追溯性由于其日志特性Redis Streams允许你访问历史消息这对于消息的追溯、重放和延迟处理非常有用。
2. 消息和字段
消息结构每条消息都可以包含一个或多个字段field和值value。这类似于一个小的哈希结构使得每条消息可以携带多个相关的数据点。灵活的数据模型你可以根据应用的需要自由定义每条消息包含的字段和数据格式。
3. 消费者组
支持多消费者Redis Streams可以被多个消费者或多个消费者组同时读取每个消费者组都会跟踪其成员的进度。消息确认消费者读取并处理消息后可以发送确认表示消息已被处理。未确认的消息可以被再次处理确保消息不会因消费者失败而丢失。故障处理支持挂起的消息列表和消费者超时检测使得在消费者失败时可以由其他消费者接手处理消息。
4. 消息ID
自动生成或指定消息ID通常由Redis自动生成保证了全局唯一性和顺序性。你也可以手动指定ID以实现更复杂的场景。组成结构ID由一个时间戳部分和一个序列号部分组成格式为时间戳-序列号。
5. 实时和历史数据处理
实时消息处理通过XREAD或XREADGROUP命令你可以实时监听并处理新添加到流中的消息。历史消息查询通过XRANGE、XREVRANGE等命令可以查询流中的历史消息这对于数据分析、审计和消息重放非常有用。
6. 性能和可靠性
高性能Redis Streams设计用于处理高吞吐量的消息能够支持每秒数百万消息的读写。持久化与Redis的其他数据类型一样Streams的数据可以持久化到磁盘保证了数据的持久性和可靠性。
实战
maven依赖
dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-data-redis/artifactId
/dependencydependencygroupIdorg.projectlombok/groupIdartifactIdlombok/artifactId
/dependency
配置StreamConfig(监听)
package fun.bo.config;import fun.bo.consumer.MessageConsumer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;import java.time.Duration;/*** author xiaobo*/
Configuration
public class StreamConfig {Beanpublic StreamMessageListenerContainerString, ObjectRecordString, String streamMessageListenerContainer(RedisConnectionFactory connectionFactory, MessageConsumer messageConsumer) {// 用于配置消息监听容器的选项。在这个方法中通过设置不同的选项如轮询超时时间和消息的目标类型可以对消息监听容器进行个性化的配置。StreamMessageListenerContainer.StreamMessageListenerContainerOptionsString, ObjectRecordString, String options StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()// 设置了轮询超时的时间为100毫秒。这意味着当没有新的消息时容器将每隔100毫秒进行一次轮询。.pollTimeout(Duration.ofMillis(100))// 指定了消息的目标类型为 String。这意味着容器会将接收到的消息转换为 String 类型以便在后续的处理中使用。.targetType(String.class).build();// 创建一个可用于监听Redis流的消息监听容器。StreamMessageListenerContainerString, ObjectRecordString, String listenerContainer StreamMessageListenerContainer.create(connectionFactory, options);// 方法配置了容器来接收来自特定消费者组和消费者名称的消息。它还指定了要读取消息的起始偏移量以确定从哪里开始读取消息。listenerContainer.receive(Consumer.from(your-consumer-group, your-consumer-name),StreamOffset.create(your-stream-name, ReadOffset.lastConsumed()), messageConsumer);// 方法启动了消息监听容器使其开始监听消息。一旦容器被启动它将开始接收并处理来自Redis流的消息。listenerContainer.start();return listenerContainer;}
}
配置生产者
package fun.bo.produce;import lombok.RequiredArgsConstructor;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;import java.util.HashMap;
import java.util.Map;/*** author xiaobo*/
Service
RequiredArgsConstructor
public class MessageProducer {private final RedisTemplateString, String redisTemplate;public void sendMessage(String streamKey, String messageKey, String message) {MapString, String messageMap new HashMap();messageMap.put(messageKey, message);RecordId recordId redisTemplate.opsForStream().add(streamKey, messageMap);if (recordId ! null) {System.out.println(Message sent to Stream streamKey with RecordId: recordId);}}
}
配置消费者(组)
package fun.bo.consumer;import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.stereotype.Service;/*** author xiaobo*/
Service
public class MessageConsumer implements StreamListenerString, ObjectRecordString, String {Overridepublic void onMessage(ObjectRecordString, String message) {String stream message.getStream();String messageId message.getId().toString();String messageBody message.getValue();System.out.println(Received message from Stream stream with messageId: messageId);System.out.println(Message body: messageBody);}
}
配置初始化方法 如果是已经存在stream则可以不配置这个主要是为了防止启动报错org.springframework.data.redis.RedisSystemException: Error in execution; nested exception is io.lettuce.core.RedisCommandExecutionException: NOGROUP No such key ‘your-stream-name’ or consumer group ‘your-consumer-group’ in XREADGROUP with GROUP option public void initializeStream() {StreamOperationsString, Object, Object streamOperations redisTemplate.opsForStream();// 创建一个流try {streamOperations.createGroup(your-stream-name, ReadOffset.from(0), your-consumer-group);} catch (Exception e) {// 流可能已存在忽略异常}
}实现效果 基于List和专业消息队列对比
Redis Streams作为消息队列相比于使用传统的Redis List类型引入了一系列改进和新功能同时也与专业的高级消息队列系统如RabbitMQ、Kafka等相比存在一些差距。以下是详细的分析
相比于Redis List解决的痛点 更好的消息顺序保证 List虽然List可以保持插入顺序但在高并发情况下确保生产者和消费者的顺序一致性较为复杂。Streams提供了全局唯一的、基于时间的ID来标识消息确保了消息的全局顺序。 消费者组支持 List原生List类型不支持消费者组的概念实现多消费者协调处理同一任务队列较为复杂。Streams原生支持消费者组允许多个消费者共享负载并跟踪各自的进度。 消息持久化和读取 List读取或消费消息后需要显式删除否则会一直保留在List中处理大量消息时可能会导致内存问题。Streams消息即使被消费仍然保留在Stream中可以随时查询历史消息且不会因消费而被移除。 复杂的读取操作 ListList提供的操作相对简单复杂的读取逻辑如按时间范围查询需要额外的逻辑来实现。Streams提供了复杂的查询命令如XRANGE、XREVRANGE可以按ID范围时间范围查询消息。 消息确认和重试 List需要手动实现消息确认和重试机制管理起来较为复杂。Streams提供了消息确认(XACK)和挂起消息查询(XPENDING)的功能使得消息的重试和故障处理更加容易。
相比于专业高级队列的不足 事务和消息持久性保证 Redis Streams虽然提供持久化但在处理复杂事务和确保消息持久性方面不如一些专业的消息队列系统如Kafka的WAL日志。 集群和分区 Redis Streams在集群环境下使用稍显复杂且对于数据分区和扩展性的支持不如专业的消息队列系统如Kafka的分区机制。 管理和监控工具 Redis Streams虽然有基本的监控命令但没有一些高级消息队列系统提供的丰富的管理界面和监控工具。 高级消息路由和过滤 Redis Streams缺乏一些高级消息队列提供的消息路由和过滤功能如RabbitMQ的Exchange和Binding。 消息传递语义 Redis Streams提供了基础的至少一次处理语义但可能不像某些系统那样支持严格的只处理一次语义。
总结
Redis Streams提供了一个轻量级、高性能且功能丰富的消息队列实现解决了使用List作为队列时的许多痛点特别适合需要快速部署、低延迟和简单可靠的场景。然而对于需要复杂事务处理、高级路由和过滤、或更丰富管理工具的复杂应用场景专业的消息队列系统可能更加适合。选择哪种方案应根据你的具体需求、资源和技术栈来决定。