如何做免费网站制作,郑州网站建设搜索优化,提高工作效率英语,百度推广好不好做文章目录 Redis消息队列实现异步秒杀1. jvm阻塞队列问题2. 什么是消息队列3. Redis实现消息队列1. 基于List结构模拟消息队列操作优缺点 2. 基于PubSub发布订阅的消息队列操作优缺点spring 结合redis的pubsub使用示例1. 引入依赖2. 配置文件3. RedisConfig4. CustomizeMessageL… 文章目录 Redis消息队列实现异步秒杀1. jvm阻塞队列问题2. 什么是消息队列3. Redis实现消息队列1. 基于List结构模拟消息队列操作优缺点 2. 基于PubSub发布订阅的消息队列操作优缺点spring 结合redis的pubsub使用示例1. 引入依赖2. 配置文件3. RedisConfig4. CustomizeMessageListener5. RedisMessageReceiver6. 监听原理简析7. 监听redis的key修改redis.confKeyspaceEventMessageListenerKeyExpirationEventMessageListener修改RedisConfig 3. 基于Stream的消息队列1. 单消费者xaddxread操作示例XREAD命令特点 2. 消费者组特点要点创建消费者组从消费者组读取消息图示操作过程消费者监听消息的基本思路XREADGROUP命令特点 Redis消息队列实现异步秒杀
1. jvm阻塞队列问题
java使用阻塞队列实现异步秒杀存在问题
jvm内存限制问题jvm内存不是无限的在高并发的情况下当有大量的订单需要创建时就有可能超出jvm阻塞队列的上限。数据安全问题jvm的内存没有持久化机制当服务重启或宕机时阻塞队列中的订单都会丢失。或者当我们从阻塞队列中拿到订单任务但是尚未处理时如果此时发生了异常这个订单任务就没有机会处理了也就丢失了。
2. 什么是消息队列
消息队列Message Queue字面意思就是存放消息的队列。最简单的消息队列模型包括3个角色
消息队列存储和管理消息也被称为消息代理Message Broker生产者发送消息到消息队列消费者从消息队列获取消息并处理消息 正常下单我们需要将订单消息写入数据库。但由于秒杀并发访问量大数据库本身并发处理能力不强因此在处理秒杀业务时可以将部分业务在生产者这边做校验然后将消息写入消息队列而消费者处理该消息队列中的消息从而实现双方解耦更快的处理秒杀业务
3. Redis实现消息队列
我们可以使用一些现成的mq比如kafkarabbitmq等等但是呢如果没有安装mq我们也可以直接使用redis提供的mq方案降低我们的部署和学习成本。Redis提供了三种不同的方式来实现消息队列
list结构基于List结构模拟消息队列PubSub基本的点对点消息模型Stream比较完善的消息队列模型
1. 基于List结构模拟消息队列
消息队列Message Queue字面意思就是存放消息的队列。而Redis的list数据结构是一个双向链表很容易模拟出队列效果。
队列是入口和出口不在一边因此我们可以利用LPUSH 结合 RPOP、或者 RPUSH 结合 LPOP来实现。
不过要注意的是当队列中没有消息时RPOP或LPOP操作会返回null并不像JVM的阻塞队列那样会阻塞并等待消息。因此这里应该使用BRPOP或者BLPOP来实现阻塞效果。 操作
命令介绍如下 优缺点
优点
利用Redis存储不受限于JVM内存上限基于Redis的持久化机制数据安全性有保证可以满足消息有序性
缺点
无法避免消息丢失如果消费者获取消息后然后立马就宕机了这个消息就得不到处理等同于丢失了只支持单消费者1个消息只能被1个消费者取走其它消费者会收不到此消息
2. 基于PubSub发布订阅的消息队列
PubSub发布订阅是Redis2.0版本引入的消息传递模型。顾名思义消费者可以订阅一个或多个channel生产者向对应channel发送消息后所有订阅者都能收到相关消息。
SUBSCRIBE channel [channel] 订阅一个或多个频道PUBLISH channel msg 向一个频道发送消息PSUBSCRIBE pattern [pattern] 订阅与pattern格式匹配的所有频道 ?匹配1个字符h?llo subscribes to hello, hallo and hxllo*匹配0个或多个字符h*llo subscribes to hllo and heeeello[]指定字符h[ae]llo subscribes to hello and hallo, but not hillo 操作 优缺点
优点
采用发布订阅模型支持多生产、多消费
缺点
不支持数据持久化如果发送消息时这个消息的频道没有被任何人订阅那这个消息就丢失了也消息就是不会被保存无法避免消息丢失发完了没人收直接就丢了消息堆积有上限超出时数据丢失当我们发送消息时如果有消费者在监听消费者会有1个缓存区去缓存这个消息数据如果消费者处理的慢那么客户端的缓存区中的消息会不断堆积而这个缓存区是有大小限制的如果超出了就会丢失
spring 结合redis的pubsub使用示例
1. 引入依赖
?xml version1.0 encodingUTF-8?
project xmlnshttp://maven.apache.org/POM/4.0.0xmlns:xsihttp://www.w3.org/2001/XMLSchema-instancexsi:schemaLocationhttp://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsdmodelVersion4.0.0/modelVersionparentgroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-parent/artifactIdversion2.1.8.RELEASE/versionrelativePath/ !-- lookup parent from repository --/parentgroupIdcom.zzhua/groupIdartifactIddemo-redis-pubsub/artifactIdversion1.0-SNAPSHOT/versiondependenciesdependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-web/artifactId/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-data-redis/artifactId/dependency!-- 如果使用lettuce-core作为连接redis的实现, 不引入此依赖会报错: Caused by: java.lang.ClassNotFoundException:org.apache.commons.pool2.impl.GenericObjectPoolConfig --dependencygroupIdorg.apache.commons/groupIdartifactIdcommons-pool2/artifactId/dependencydependencygroupIdorg.projectlombok/groupIdartifactIdlombok/artifactId/dependency/dependenciesbuildpluginsplugingroupIdorg.springframework.boot/groupIdartifactIdspring-boot-maven-plugin/artifactId/plugin/plugins/build/project
2. 配置文件
spring:redis:host: 127.0.0.1port: 6379database: 0password:lettuce:pool:min-idle: 2max-active: 8max-idle: 83. RedisConfig
spring-data-redis提供了2种处理redis消息的方法 自己实现MessageListener接口 public interface MessageListener {// 处理消息的方法// 第1个参数封装了: 消息发布到哪1个具体频道 和 消息的内容// 第2个参数封装了: // 1. 如果当前是通过普通模式去订阅的频道, 那么收到消息时该pattern就是消息发送的具体频道// 2. 如果当前是通过pattern通配符匹配去订阅的频道, 那么收到消息时, 该pattern就是订阅的频道void onMessage(Message message, Nullable byte[] pattern);
}指定MessageListenerAdapter适配器该适配器指定特定对象的特定方法来处理消息对特定的方法有参数方面的要求
Slf4j
Configuration
public class RedisConfig {Autowiredprivate RedisMessageReceiver redisMessageReceiver;Autowiredprivate CustomizeMessageListener customizeMessageListener;Beanpublic RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {RedisMessageListenerContainer container new RedisMessageListenerContainer();container.setConnectionFactory(connectionFactory);// 监听order.q通道不带通配符匹配channelcontainer.addMessageListener(customizeMessageListener, new ChannelTopic(order.q));// 监听order.*通道带通配符匹配channelcontainer.addMessageListener(listenerAdapter(), new PatternTopic(order.*));return container;}Beanpublic MessageListenerAdapter listenerAdapter() {// 交给receiver的receiveMessage方法, 对于这个方法的参数有如下要求:// 2个参数: 第一个参数是Object-即消息内容默认由RedisSerializer#deserialize处理,见MessageListenerAdapter#onMessage, // 第二个参数是String-即订阅的通道, 详细看上面MessageListener接口中第二个参数的解释// 1个参数: 参数是Object-即消息内容return new MessageListenerAdapter(redisMessageReceiver, receiveMessage);}}
4. CustomizeMessageListener
Slf4j
Component
public class CustomizeMessageListener implements MessageListener {Overridepublic void onMessage(Message message, byte[] pattern) {byte[] bodyBytes message.getBody();byte[] channelBytes message.getChannel();log.info(order.q - 消息订阅频道: {}, new String(channelBytes));log.info(order.q - 消息内容: {}, new String(bodyBytes));log.info(order.q - 监听频道: {}, new String(channelBytes));}
}5. RedisMessageReceiver
Slf4j
Component
public class RedisMessageReceiver {public void receiveMessage(String msg, String topic) {log.info(order.* - 消息的订阅频道: {}, topic);log.info(order.* - 消息的内容: {}, msg);}}
6. 监听原理简析
spring-data-redis的lettuce-core是基于netty的消息监听处理过程如下 PubSubCommandHandlernetty中的ChannelHandler处理器-PubSubEndpoint根据消息类型调用LettuceMessageListener 的不同方法-LettuceMessageListener - RedisMessageListenerContainer$DispatchMessageListener如果是pattern则从patternMapping中获取所有的listener如果不是pattern则从channelMapping中获取所有的listener。至于怎么判断是不是pattern?-使用异步线程池对上一步获取的所有listener执行onMessage方法
至于怎么判断是不是pattern这个是根据订阅关系来的如果订阅的是pattern那么如果这个向这个pattern中发送了消息那么就会收到1次消息并且是pattern。如果订阅的是普通channel那么如果向这个普通channel发送了消息那么又会收到1次消息不是pattern。如果向1个channel中发送消息这个channel既符合订阅的pattern也符合订阅的普通channel那么会收到2次消息并且这2次消息1次是pattern1次不是pattern的
7. 监听redis的key
既然已经说到了监听redis发布消息了那么也补充一下监听redis的key过期。因为监听redis的key过期也是通过redis的发布订阅实现的。
修改redis.conf
############################# EVENT NOTIFICATION ############################### Redis能够将在keyspace中发生的事件通知给 发布/订阅 客户端# Redis can notify Pub/Sub clients about events happening in the key space.
# This feature is documented at http://redis.io/topics/notifications# 例如如果开启了keyspace事件通知注意了,必须是开启了keyspace事件通知才可以开启的方式就是添加参数K
# 一个客户端在数据库0对一个叫foo的key执行了删除操作
# 那么redis将会通过 发布订阅 机制发布2条消息
# PUBLISH __keyspace0__:foo del
# PUBLISH __keyevent0__:del foo# For instance if keyspace events notification is enabled, and a client
# performs a DEL operation on key foo stored in the Database 0, two
# messages will be published via Pub/Sub:
#
# PUBLISH __keyspace0__:foo del
# PUBLISH __keyevent0__:del foo# 也可以指定一组 类名 来选择 Redis 会通知的一类事件。
# 每类事件 都通过一个字符定义# It is possible to select the events that Redis will notify among a set
# of classes. Every class is identified by a single character:# keySpace事件 以 __keyspace数据库序号__ 为前缀 发布事件
# K Keyspace events, published with __keyspacedb__ prefix. # Keyevent事件 以 __keyevent数据库序号__ 为前缀 发布事件
# E Keyevent events, published with __keyeventdb__ prefix. # 执行常规命令,比如del、expire、rename
# g Generic commands (non-type specific) like DEL, EXPIRE, RENAME, ... # 执行 String 命令
# $ String commands # 执行 List 命令
# l List commands # 执行 Set 命令
# s Set commands # 执行 Hash 命令
# h Hash commands 执行 Hash 命令# 执行 ZSet 命令
# z Sorted set commands # key过期事件(每个key失效都会触发这类事件)
# x Expired events (events generated every time a key expires) # key驱逐事件(当key在内存满了被清除时生成)
# e Evicted events (events generated when a key is evicted for maxmemory) # A是g$lshzxe的别名因此AKE就意味着所有的事件
# A Alias for g$lshzxe, so that the AKE string means all the events.
## 配置中的notify-keyspace-events这个参数由0个或多个字符组成
# 如果配置为空字符串表示禁用通知
# The notify-keyspace-events takes as argument a string that is composed
# of zero or multiple characters. The empty string means that notifications
# are disabled.
## 比如要开启list命令和generic常规命令的事件通知
# 应该配置成 notify-keyspace-events Elg
# Example: to enable list and generic events, from the point of view of the
# event name, use:
#
# notify-keyspace-events Elg
#
# 比如订阅了__keyevent0__:expired频道的客户端要收到key失效的时间
# 应该配置成 notify-keyspace-events Ex
# Example 2: to get the stream of the expired keys subscribing to channel name __keyevent0__:expired use:
#
# notify-keyspace-events Ex
## 默认情况下所有的通知都被禁用了并且这个特性有性能上的开销。
# 注意K和E必须至少指定其中一个否则将收不到任何事件。
# By default all notifications are disabled because most users dont need
# this feature and the feature has some overhead. Note that if you dont
# specify at least one of K or E, no events will be delivered.
notify-keyspace-events Ex############################### ADVANCED CONFIG ###############################
KeyspaceEventMessageListener
通过实现InitializingBean接口在afterPropertiesSet方法中调用初始化init方法从redis中获取notify-keyspace-events配置项对应的值如果未设置任何值则改为EA结合上面的redis.conf节选可知表示的是开启所有的事件通知使用redisMessageListenerContainer通过pattern通配符匹配的方式订阅__keyevent*频道它是个抽象类实现了MessageListener接口处理消息的方法是个抽象方法它有1个子类KeyExpirationEventMessageListener订阅的pattern的频道是__keyevent*__:expired通过重写doRegister修改了订阅的频道。并且重写了处理消息的方法通过将消息内容包装成RedisKeyExpiredEvent事件对象然后通过事件发布器将事件发布出去。
public abstract class KeyspaceEventMessageListener implements MessageListener, InitializingBean, DisposableBean {private static final Topic TOPIC_ALL_KEYEVENTS new PatternTopic(__keyevent*);private final RedisMessageListenerContainer listenerContainer;private String keyspaceNotificationsConfigParameter EA;/*** Creates new {link KeyspaceEventMessageListener}.** param listenerContainer must not be {literal null}.*/public KeyspaceEventMessageListener(RedisMessageListenerContainer listenerContainer) {Assert.notNull(listenerContainer, RedisMessageListenerContainer to run in must not be null!);this.listenerContainer listenerContainer;}/** (non-Javadoc)* see org.springframework.data.redis.connection.MessageListener#onMessage(org.springframework.data.redis.connection.Message, byte[])*/Overridepublic void onMessage(Message message, Nullable byte[] pattern) {if (message null || ObjectUtils.isEmpty(message.getChannel()) || ObjectUtils.isEmpty(message.getBody())) {return;}doHandleMessage(message);}/*** Handle the actual message** param message never {literal null}.*/protected abstract void doHandleMessage(Message message);/*** Initialize the message listener by writing requried redis config for {literal notify-keyspace-events} and* registering the listener within the container.*/public void init() {if (StringUtils.hasText(keyspaceNotificationsConfigParameter)) {RedisConnection connection listenerContainer.getConnectionFactory().getConnection();try {Properties config connection.getConfig(notify-keyspace-events);if (!StringUtils.hasText(config.getProperty(notify-keyspace-events))) {connection.setConfig(notify-keyspace-events, keyspaceNotificationsConfigParameter);}} finally {connection.close();}}doRegister(listenerContainer);}/*** Register instance within the container.** param container never {literal null}.*/protected void doRegister(RedisMessageListenerContainer container) {listenerContainer.addMessageListener(this, TOPIC_ALL_KEYEVENTS);}/** (non-Javadoc)* see org.springframework.beans.factory.DisposableBean#destroy()*/Overridepublic void destroy() throws Exception {listenerContainer.removeMessageListener(this);}/*** Set the configuration string to use for {literal notify-keyspace-events}.** param keyspaceNotificationsConfigParameter can be {literal null}.* since 1.8*/public void setKeyspaceNotificationsConfigParameter(String keyspaceNotificationsConfigParameter) {this.keyspaceNotificationsConfigParameter keyspaceNotificationsConfigParameter;}/** (non-Javadoc)* see org.springframework.beans.factory.InitializingBean#afterPropertiesSet()*/Overridepublic void afterPropertiesSet() throws Exception {init();}
}
KeyExpirationEventMessageListener
public class KeyExpirationEventMessageListener extends KeyspaceEventMessageListener implementsApplicationEventPublisherAware {private static final Topic KEYEVENT_EXPIRED_TOPIC new PatternTopic(__keyevent*__:expired);private Nullable ApplicationEventPublisher publisher;/*** Creates new {link MessageListener} for {code __keyevent*__:expired} messages.** param listenerContainer must not be {literal null}.*/public KeyExpirationEventMessageListener(RedisMessageListenerContainer listenerContainer) {super(listenerContainer);}/** (non-Javadoc)* see org.springframework.data.redis.listener.KeyspaceEventMessageListener#doRegister(org.springframework.data.redis.listener.RedisMessageListenerContainer)*/Overrideprotected void doRegister(RedisMessageListenerContainer listenerContainer) {listenerContainer.addMessageListener(this, KEYEVENT_EXPIRED_TOPIC);}/** (non-Javadoc)* see org.springframework.data.redis.listener.KeyspaceEventMessageListener#doHandleMessage(org.springframework.data.redis.connection.Message)*/Overrideprotected void doHandleMessage(Message message) {publishEvent(new RedisKeyExpiredEvent(message.getBody()));}/*** Publish the event in case an {link ApplicationEventPublisher} is set.** param event can be {literal null}.*/protected void publishEvent(RedisKeyExpiredEvent event) {if (publisher ! null) {this.publisher.publishEvent(event);}}/** (non-Javadoc)* see org.springframework.context.ApplicationEventPublisherAware#setApplicationEventPublisher(org.springframework.context.ApplicationEventPublisher)*/Overridepublic void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {this.publisher applicationEventPublisher;}
}修改RedisConfig
Slf4j
Configuration
public class RedisConfig {Autowiredprivate RedisMessageReceiver redisMessageReceiver;Autowiredprivate CustomizeMessageListener customizeMessageListener;Beanpublic RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {RedisMessageListenerContainer container new RedisMessageListenerContainer();container.setConnectionFactory(connectionFactory);// 注意以下测试在redis.confi配置文件中设置了: notify-keyspace-events 为 AKE, // 也可以参照KeyspaceEventMessageListener在代码中设置这个配置项/*redis提供的事件通知发布消息示例如下:K PUBLISH __keyspace0__:foo delE PUBLISH __keyevent0__:del foo参照上述示例去写这个topic即可*/// 监听key删除事件container.addMessageListener(new MessageListener() {/*执行命令: del order:1234输出如下:监听key删除事件 - 消息的发布频道: __keyevent0__:del监听key删除事件 - 消息内容: order:1234监听key删除事件 - 消息的订阅频道: __keyevent*__:del*/Overridepublic void onMessage(Message message, byte[] pattern) {byte[] bodyBytes message.getBody();byte[] channelBytes message.getChannel();log.info(监听key删除事件 - 消息的发布频道: {}, new String(channelBytes));log.info(监听key删除事件 - 消息内容: {}, new String(bodyBytes));log.info(监听key删除事件 - 消息的订阅频道: {}, new String(pattern));}}, new PatternTopic(__keyevent*__:del));// 监听指定前缀的keycontainer.addMessageListener(new MessageListener() {Overridepublic void onMessage(Message message, byte[] pattern) {byte[] bodyBytes message.getBody();byte[] channelBytes message.getChannel();/*执行命令: set order:1234 a输出如下:监听指定前缀的key - 消息的发布频道: __keyspace0__:order:1234监听指定前缀的key - 消息内容: set监听指定前缀的key - 消息的订阅频道: __keyspace0__:order:**/log.info(监听指定前缀的key - 消息的发布频道: {}, new String(channelBytes));log.info(监听指定前缀的key - 消息内容: {}, new String(bodyBytes));log.info(监听指定前缀的key - 消息的订阅频道: {}, new String(pattern));}}, new PatternTopic(__keyspace0__:order:*));return container;}/* 借助了1. 这个KeyspaceEventMessageListener的bean中的对redis的配置修改2. 监听patter的topic*/Beanpublic KeyspaceEventMessageListener keyspaceEventMessageListener(RedisMessageListenerContainer container) {return new KeyspaceEventMessageListener(container){/* __keyevent* */Overrideprotected void doHandleMessage(Message message) {log.info(监听所有key命令事件, 消息内容:{}, {},// set name zzhua; expire name 5;// 消息内容就是key的名称, 比如: namenew String(message.getBody()),// 消息所发布的频道, 比如: __keyevent0__:set, __keyevent0__:expire等new String(message.getChannel()));}};}Beanpublic KeyExpirationEventMessageListener keyExpirationEventMessageListener(RedisMessageListenerContainer container) {return new KeyExpirationEventMessageListener(container){/* __keyevent*__:expired */Overrideprotected void doHandleMessage(Message message) {log.info(监听所有key失效, 消息内容:{}, {},// 消息内容就是key的名称, 比如: namenew String(message.getBody()),// 消息所发布的频道, 比如: __keyevent0__:expirednew String(message.getChannel()));}};}}
3. 基于Stream的消息队列
Stream 是 Redis 5.0 引入的一种新的数据类型因此支持持久化可以实现一个功能非常完善的消息队列专门为消息队列设计的Redis streams官网介绍
1. 单消费者
xadd
发送消息的命令 不指定消息队列的的最大消息数量就是不限制消息数量 消息唯一id建议使用*让redis自动生成消息唯一id (上面命令介绍中的大写表示照着抄就行小写的是需要我们自己提供的参数中括号表示可选参数)
示例
## 创建名为 users 的队列并向其中发送一个消息内容是{namejack,age21}并且使用Redis自动生成ID
127.0.0.1:6379 XADD users * name jack age 21
1644805700523-0xread
读取消息的方式之一 不指定阻塞时间就是直接返回不阻塞设置为0表示阻塞到有值为止stream中消息读取之后不会被删除$ 表示读取最新的消息但是如果之前消息都已经被读过了那么当前继续去读的话是读不到的尽管当前stream中仍然有消息
示例
## 从users的队列中读取1条消息, 从第1条开始读
127.0.0.1:6379 XREAD COUNT 1 STREAMS users 0
1) 1) users2) 1) 1) 1708522812423-02) 1) name2) jack3) age4) 21操作示例
查看当前redis版本是否支持stream数据结构 xadd与xread使用示例 在上面还有1点没有体现出来在stream中的每1个消息被当前客户端读了1遍还可以被当前客户端读1遍然后这个消息还可以被其它客户端读1遍。
xread读取最新数据要使用阻塞的方法才可以 我们发现只有在阻塞期间使用$才能读取到最新消息如果不使用阻塞想要读取最新数据是不可能的。
在业务开发中我们可以循环的调用XREAD阻塞方式来查询最新消息从而实现持续监听队列的效果伪代码如下 但是这会存在消息漏读的问题由于只有在阻塞期间使用$才能读取到最新消息假设在处理消息的时候此时消息队列中发来了消息那么这些消息就会被错过只有当执行XREAD COUNT 1 BLOCK 2000 STREAMS users $开始时收到的第1个消息才会被处理。
XREAD命令特点
STREAM类型消息队列的XREAD命令特点
消息可回溯消息读取完之后不会消失永久的保留在我们的队列当中随时想看都可以回去读一个消息可以被多个消费者读取因为消息读取之后不会消失可以阻塞读取有消息漏读的风险在处理消息的过程中如果来了多条消息则只能看到最后一条消息即最新的那1条
2. 消费者组
上面我们知道通过xread命令你阻塞读取最新消息有消息漏读的风险下面我们看看消费者组是如何解决这个问题的。
特点
消费者组Consumer Group将多个消费者划分到一个组中监听同一个队列。具备下列特点 消息分流
队列中的消息会分流给组内的不同消费者而不是重复消费从而加快消息处理的速度
消息标示
消费者组会维护一个标示记录最后一个被处理的消息哪怕消费者宕机重启还会从标示之后读取消息。确保每一个消息都会被消费
消息确认
消费者获取消息后消息处于pending状态并存入一个pending-list。当处理完成后需要通过XACK来确认消息标记消息为已处理才会从pending-list移除。
要点
redis服务器维护了多个消费者组
可以给1个stream指定多个消费者组
把这里的消费者组当成上节中的消费者即可1个stream绑定的的多个消费者组都会收到消息
消息发给消费者组
即多个消费者共同加入到消费者组中形成1个消费者而消息就分给消费者中中的消息来消费
消费者加入消费者组
消费者从消费者组中拉取消息拉取到的消息进入消费者组中的pending-list
消费者消费完消息后向消费者组确认消息已处理已确认处理的消息会从pending-list中删除
消费者组总会用1个标识来记录最后1个被处理的消息
创建消费者组
XGROUP CREATE key groupName ID [MKSTREAM]key队列名称groupName消费者组名称ID起始ID标示$代表队列中最后一个消息0则代表队列中第一个消息 建议如果不想处理队列中已存在的消息就可以使用$如果要处理已存在的消息就是用0 MKSTREAM队列不存在时自动创建队列不指定的话当不存在时不会创建
其它常见命令
# 删除指定的消费者组
XGROUP DESTORY key groupName# 给指定的消费者组添加消费者
#一般情况下我们并不需要自己添加消费者因为当我们从这个消费者组当中指定1个消费者
# 并且监听消息的时候如果这个消费者不存在则会自动创建消费者
XGROUP CREATECONSUMER key groupname consumername# 删除消费者组中的指定消费者
XGROUP DELCONSUMER key groupname consumername从消费者组读取消息
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]group消费组名称consumer消费者名称如果消费者不存在会自动创建一个消费者count本次查询的最大数量BLOCK milliseconds当没有消息时最长等待时间若未指定则不阻塞NOACK无需手动ACK即获取到消息后自动确认一般不建议使用STREAMS key指定队列名称ID获取消息的起始ID “”从消费者组的标记找到最后1个处理的消息注意不是已处理的消息是处理的消息也就是说它有可能被消费者获取了但还没被消费者确认掉的下一个未处理的消息开始其它除了以外的所有根据指定id从pending-list中获取已消费但未确认的消息。例如0是从pending-list中的第一个消息开始一直拿0就是一直从pending-list中拿第1个消息
图示操作过程 消费者监听消息的基本思路 XREADGROUP命令特点
消息可回溯可以多消费者争抢消息加快消费速度可以阻塞读取没有消息漏读的风险有消息确认机制保证消息至少被消费一次内存不受jvm限制消息可做持久化消息确认机制