精美驾校企业网站模板,物业网站建设方案,wordpress主机服务器销售源码,招聘网页制作人员Spring boot 使用Redis 消息发布订阅 文章目录 Spring boot 使用Redis 消息发布订阅Redis 消息发布订阅Redis 发布订阅 命令 Spring boot 实现消息发布订阅发布消息消息监听主题订阅 Spring boot 监听 Key 过期事件消息监听主题订阅 最近在做请求风控的时候#xff0c;在网上搜…Spring boot 使用Redis 消息发布订阅 文章目录 Spring boot 使用Redis 消息发布订阅Redis 消息发布订阅Redis 发布订阅 命令 Spring boot 实现消息发布订阅发布消息消息监听主题订阅 Spring boot 监听 Key 过期事件消息监听主题订阅 最近在做请求风控的时候在网上搜集了大量的解决方案最后使用Redis 消息发布订阅 比较符合业务。做一下记录 Redis 消息发布订阅 Redis 发布订阅 命令redis命令手册 1、Redis 中pub/sub的消息为即发即失server 不会保存消息如果 publish 的消息没有任何 client 处于 “subscribe” 状态消息将会被丢弃如果 client 在 subcribe 时链接断开后重连那在么此期间的消息也将丢失。 2、Redis server 将会尽力将消息发送给处于 subscribe 状态的 client但是仍不会保证每条消息都能被正确接收。 **优点**支持发布订阅支持多组生产者、消费者处理消息
缺点 消费者下线数据会丢失 不支持数据持久化Redis宕机则数据也会丢失 消息堆积缓存区溢出消费者会被强制踢下线数据也会丢失
Redis 发布订阅 命令
命令描述Redis Unsubscribe 命令指退订给定的频道。Redis Subscribe 命令订阅给定的一个或多个频道的信息。Redis Pubsub 命令查看订阅与发布系统状态。Redis Punsubscribe 命令退订所有给定模式的频道。Redis Publish 命令将信息发送到指定的频道。Redis Psubscribe 命令订阅一个或多个符合给定模式的频道。
Spring boot 实现消息发布订阅
1、引入 Redis 依赖 !--Spring Boot redis 启动器--dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-data-redis/artifactId/dependency2、Redis 数据库配置
spring:data:redis:database: 0host: localhostport: 6379password:发布消息 /*** redis 将信息发送到指定的频道* param topic 消息所属的主题/频道* param context 消息内容* return*/redisTemplate.convertAndSend(topic, context);
RequiredArgsConstructor
Service
public class RequestRateLimiterService {private final RedisTemplateString, Object redisTemplate;// Redis 中的 key 前缀private static final String REDIS_KEY_PREFIX select_rate_limit:;// Redis 中的通道名称private static final String REDIS_CHANNEL select_rate_limit_channel;// 根据用户名 请求风控public boolean allowRequest(String username) {// 每分钟最大请求次数Long MAX_REQUESTS_PER_MINUTE 60L;String key REDIS_KEY_PREFIX username;Long currentRequests redisTemplate.opsForValue().increment(key);if (currentRequests ! null currentRequests MAX_REQUESTS_PER_MINUTE) {redisTemplate.convertAndSend(REDIS_CHANNEL, username);return false; // 超过阈值拒绝请求}if (currentRequests ! null currentRequests 1) {redisTemplate.expire(key, 1, TimeUnit.MINUTES); // 设置过期时间为1分钟}return true; // 允许请求}}消息监听
1、 Redis 消息订阅-消息监听器当收到阅订的消息时会将消息交给这个类处理。
/*** Redis 消息订阅-消息监听器当收到阅订的消息时会将消息交给这个类处理* p* 1、可以直接实现 MessageListener 接口也可以继承它的实现类 MessageListenerAdapter.* 2、自动多线程处理打印日志即可看出即使手动延迟也不会影响后面消息的接收。**/
Component
public class RequestRateLimitSubscriber implements MessageListener {// 直接从容器中获取Resourceprivate RedisTemplateString, Object redisTemplate;/*** 监听到的消息必须进行与发送时相同的方式进行反序列* 1、订阅端与发布端 Redis 序列化的方式必须相同否则会乱码。** param message 消息实体* param pattern 匹配模式*/Overridepublic void onMessage(Message message, byte[] pattern) {// 消息订阅的匹配规则如 new PatternTopic(basic-*) 中的 basic-*String msgPattern new String(pattern);// 消息所属的通道可以根据不同的通道做不同的业务逻辑String channel (String) redisTemplate.getStringSerializer().deserialize(message.getChannel());// 接收的消息内容可以根据自己需要强转为自己需要的对象但最好先使用 instanceof 判断一下Object body redisTemplate.getValueSerializer().deserialize(message.getBody());log.info(收到 Redis 订阅消息: channel{} body{} pattern{} , channel, body, msgPattern);// 模拟数据处理 ********// 发送警告通知可以通过邮件、短信等方式进行通知log.info(------------数据处理完成.......);}
}主题订阅
1、自定义 RedisTemplate 序列化方式(发布者和订阅者必须相同)。
2、配置主题订阅 - Redis 消息监听器绑定监听指定通道。
/*** 自定义 RedisTemplate 序列化方式* 配置主题订阅 - Redis 消息监听器绑定监听指定通道*/
Configuration
public class RedisConfig {// 自定义的消息订阅监听器当收到阅订的消息时会将消息交给这个类处理Resourceprivate RequestRateLimitSubscriber requestRateLimitSubscriber;// 自定义 RedisTemplate 序列化方式 Beanpublic RedisTemplateString, Object redisTemplate(RedisConnectionFactory factory) {RedisTemplateString, Object redisTemplate new RedisTemplate();redisTemplate.setKeySerializer(RedisSerializer.string());// key 序列化规则redisTemplate.setHashKeySerializer(RedisSerializer.string());// hash key 序列化规则redisTemplate.setValueSerializer(RedisSerializer.java());// value 序列化规则redisTemplate.setHashValueSerializer(RedisSerializer.java()); // hash value 序列化规则redisTemplate.setConnectionFactory(factory); //绑定 RedisConnectionFactoryreturn redisTemplate; //返回设置好的 RedisTemplate}/*** 配置主题订阅* RedisMessageListenerContainer - Redis 消息监听器绑定监听指定通道* 1、可以添加多个监听器监听多个通道只需要将消息监听器与订阅的通道/主题绑定即可。* 2、订阅的通道可以配置在全局配置文件中也可以配置在数据库中* p* addMessageListener(MessageListener listener, Collection? extends Topic topics)将消息监听器与多个订阅的通道/主题绑定* addMessageListener(MessageListener listener, Topic topic)将消息监听器与订阅的通道/主题绑定** param connectionFactory* return*/Beanpublic RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory factory) {RedisMessageListenerContainer container new RedisMessageListenerContainer();// 设置连接工厂RedisConnectionFactory 可以直接从容器中取也可以从 RedisTemplate 中取container.setConnectionFactory(factory);// 订阅名称叫 select_rate_limit_channel 的通道, 类似 Redis 中的 subscribe 命令container.addMessageListener(requestRateLimitSubscriber, new ChannelTopic(*));// 订阅名称以 basic- 开头的全部通道, 类似 Redis 的 pSubscribe 命令container.addMessageListener(requestRateLimitSubscriber, new PatternTopic(*));return container;}
}Spring boot 监听 Key 过期事件
1、Redis 数据库可以通过命令设置 Key 的有效时间当一个 Key 过期后会自动从数据库中删除释放空间。得益于于这个特性可以很轻松地实现诸多类似于 “Session” 管理、数据缓存等功能。它们都有一个共同点就是数据不会永久保存
2、在有些场景中可能希望在某些 Key 过期的时候获取到通知进行一些业务处理。或者是干脆用于 “定时通知/任务” 功能例如下单 30 分钟后未支付则取消订单。那么可以在用户下单的时候使用订单号作为 key 设置到 Redis 数据库中并且设置过期时间为 30 分钟。当超时后可以在 “key 过期通知” 中获取到 key 也就是订单号判断用户是否已经支付从而是否取消订单。
3、Redis 的 Key 过期通知功能本质上是通过 发布/订阅 功能实现的所以它「不能保证通知消息的交付」当 Key 过期时如果服务器停机、重启后则该通知消息会永久丢失。
消息监听
1、Spring Data Redis 专门提供了一个密钥过期事件消息侦听器KeyExpirationEventMessageListener自定义监听器类继承它然后覆写 doHandleMessage(Message message) 方法即可。
2、doHandleMessage 方法用于处理 Redis Key 过期通知事件其中 Message 参数表示通知消息只有 2 属性分别表示消息正文在这里就是过期的 Key 名称以及来自于哪个 channel。
3、在 Redis Key 过期事件中「只能获取到已过期的 Key 的名称不能获取到值。」
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.listener.KeyExpirationEventMessageListener;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.stereotype.Component;
/*** Redis 缓存 Key 过期监听器* Spring Data Redis 专门提供了一个密钥过期事件消息侦听器KeyExpirationEventMessageListener* 自定义监听器类继承它然后覆写 doHandleMessage(Message message) 方法即可。*/
Component
public class KeyExpireListener extends KeyExpirationEventMessageListener {private static final Logger logger LoggerFactory.getLogger(KeyExpireListener.class);/*** 通过构造函数注入 RedisMessageListenerContainer 给 KeyExpirationEventMessageListener** param listenerContainer Redis消息侦听器容器*/public KeyExpireListener(RedisMessageListenerContainer listenerContainer) {super(listenerContainer);}/*** doHandleMessage 方法用于处理 Redis Key 过期通知事件* 在 Redis Key 过期事件中「只能获取到已过期的 Key 的名称不能获取到值。」** param message通知消息只有 2 属性分别表示消息正文在这里就是过期的 Key 名称以及来自于哪个 channel。*/Overridepublic void doHandleMessage(Message message) {// 过期的 keyString key new String(message.getBody());// 消息通道String channel new String(message.getChannel());logger.info(过期key{} 消息通道(channel){}, key, channel);}
}主题订阅
1、与上面稍微有点不同因为 key 过期事件属于 Redis 内部消息内部频道/通道所以只需要往容器中注入 RedisMessageListenerContainer 就行不需要 addMessageListener 手动设置监听器 监听指定的通道/频道(topic 表达式)。
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
Configuration
public class RedisConfig {Beanpublic RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory factory) {RedisMessageListenerContainer container new RedisMessageListenerContainer();container.setConnectionFactory(factory);// container.setTaskExecutor(null); // 设置用于执行监听器方法的 Executor// container.setErrorHandler(null); // 设置监听器方法执行过程中出现异常的处理器// container.addMessageListener(null, null); // 手动设置监听器 监听的 topic 表达式return container;}
}