免费建设小说网站,网站广告推广平台,西安房产网站制作公司,html5官方网站开发流程文章目录 前言一、配置1. 添加依赖2. 配置文件/类3. 注入redission3. 封装工具类 二、应用1. RedisUtils工具类的基本使用 三、队列1. 工具类2. 普通队列3. 有界队列#xff08;限制数据量#xff09;4. 延迟队列#xff08;延迟获取数据#xff09;5. 优先队列#xff08… 文章目录 前言一、配置1. 添加依赖2. 配置文件/类3. 注入redission3. 封装工具类 二、应用1. RedisUtils工具类的基本使用 三、队列1. 工具类2. 普通队列3. 有界队列限制数据量4. 延迟队列延迟获取数据5. 优先队列数据可插队 前言
redission是一个开源的java redis的客户端在其基础上进行了进一步扩展。这些扩展极大地丰富了Redis的应用场景尤其是在构建分布式系统时。 一、配置
1. 添加依赖
!--redisson--
dependencygroupIdorg.redisson/groupIdartifactIdredisson-spring-boot-starter/artifactIdversion${redisson.version}/versionexclusionsexclusiongroupIdorg.redisson/groupIdartifactIdredisson-spring-data-30/artifactId/exclusion/exclusions
/dependency
dependencygroupIdorg.redisson/groupIdartifactIdredisson-spring-data-27/artifactIdversion${redisson.version}/version
/dependency
dependencygroupIdcom.baomidou/groupIdartifactIdlock4j-redisson-spring-boot-starter/artifactIdversion${lock4j.version}/version
/dependency2. 配置文件/类
spring:redis:# 地址host: localhost# 端口默认为6379port: 6379# 数据库索引database: 0# 密码(如没有密码请注释掉)password: asd60787533# 连接超时时间timeout: 10s# 是否开启sslssl: falseredisson:# redis key前缀keyPrefix: demo# 线程池数量threads: 4# Netty线程池数量nettyThreads: 8# 单节点配置singleServerConfig:# 客户端名称clientName: demo# 最小空闲连接数connectionMinimumIdleSize: 8# 连接池大小connectionPoolSize: 32# 连接空闲超时单位毫秒idleConnectionTimeout: 10000# 命令等待超时单位毫秒timeout: 3000# 发布和订阅连接池大小subscriptionConnectionPoolSize: 50Data
Component
ConfigurationProperties(prefix redisson)
public class RedissonProperties {/*** redis缓存key前缀*/private String keyPrefix;/*** 线程池数量,默认值 当前处理核数量 * 2*/private int threads;/*** Netty线程池数量,默认值 当前处理核数量 * 2*/private int nettyThreads;/*** 单机服务配置*/private SingleServerConfig singleServerConfig;/*** 集群服务配置*/private ClusterServersConfig clusterServersConfig;DataNoArgsConstructorpublic static class SingleServerConfig {/*** 客户端名称*/private String clientName;/*** 最小空闲连接数*/private int connectionMinimumIdleSize;/*** 连接池大小*/private int connectionPoolSize;/*** 连接空闲超时单位毫秒*/private int idleConnectionTimeout;/*** 命令等待超时单位毫秒*/private int timeout;/*** 发布和订阅连接池大小*/private int subscriptionConnectionPoolSize;}DataNoArgsConstructorpublic static class ClusterServersConfig {/*** 客户端名称*/private String clientName;/*** master最小空闲连接数*/private int masterConnectionMinimumIdleSize;/*** master连接池大小*/private int masterConnectionPoolSize;/*** slave最小空闲连接数*/private int slaveConnectionMinimumIdleSize;/*** slave连接池大小*/private int slaveConnectionPoolSize;/*** 连接空闲超时单位毫秒*/private int idleConnectionTimeout;/*** 命令等待超时单位毫秒*/private int timeout;/*** 发布和订阅连接池大小*/private int subscriptionConnectionPoolSize;/*** 读取模式*/private ReadMode readMode;/*** 订阅模式*/private SubscriptionMode subscriptionMode;}}3. 注入redission
Slf4j
Configuration
EnableCaching
EnableConfigurationProperties(RedissonProperties.class)
public class RedisConfig {Autowiredprivate RedissonProperties redissonProperties;Autowiredprivate ObjectMapper objectMapper;Beanpublic RedissonAutoConfigurationCustomizer redissonCustomizer() {return config - {config.setThreads(redissonProperties.getThreads()).setNettyThreads(redissonProperties.getNettyThreads()).setCodec(new JsonJacksonCodec(objectMapper));RedissonProperties.SingleServerConfig singleServerConfig redissonProperties.getSingleServerConfig();if (ObjectUtil.isNotNull(singleServerConfig)) {// 使用单机模式config.useSingleServer()//设置redis key前缀.setNameMapper(new KeyPrefixHandler(redissonProperties.getKeyPrefix())).setTimeout(singleServerConfig.getTimeout()).setClientName(singleServerConfig.getClientName()).setIdleConnectionTimeout(singleServerConfig.getIdleConnectionTimeout()).setSubscriptionConnectionPoolSize(singleServerConfig.getSubscriptionConnectionPoolSize()).setConnectionMinimumIdleSize(singleServerConfig.getConnectionMinimumIdleSize()).setConnectionPoolSize(singleServerConfig.getConnectionPoolSize());}// 集群配置方式 参考下方注释RedissonProperties.ClusterServersConfig clusterServersConfig redissonProperties.getClusterServersConfig();if (ObjectUtil.isNotNull(clusterServersConfig)) {config.useClusterServers()//设置redis key前缀.setNameMapper(new KeyPrefixHandler(redissonProperties.getKeyPrefix())).setTimeout(clusterServersConfig.getTimeout()).setClientName(clusterServersConfig.getClientName()).setIdleConnectionTimeout(clusterServersConfig.getIdleConnectionTimeout()).setSubscriptionConnectionPoolSize(clusterServersConfig.getSubscriptionConnectionPoolSize()).setMasterConnectionMinimumIdleSize(clusterServersConfig.getMasterConnectionMinimumIdleSize()).setMasterConnectionPoolSize(clusterServersConfig.getMasterConnectionPoolSize()).setSlaveConnectionMinimumIdleSize(clusterServersConfig.getSlaveConnectionMinimumIdleSize()).setSlaveConnectionPoolSize(clusterServersConfig.getSlaveConnectionPoolSize()).setReadMode(clusterServersConfig.getReadMode()).setSubscriptionMode(clusterServersConfig.getSubscriptionMode());}log.info(初始化 redis 配置);};}/*** redis集群配置 yml** --- # redis 集群配置(单机与集群只能开启一个另一个需要注释掉)* spring:* redis:* cluster:* nodes:* - 192.168.0.100:6379* - 192.168.0.101:6379* - 192.168.0.102:6379* # 密码* password:* # 连接超时时间* timeout: 10s* # 是否开启ssl* ssl: false** redisson:* # 线程池数量* threads: 16* # Netty线程池数量* nettyThreads: 32* # 集群配置* clusterServersConfig:* # 客户端名称* clientName: ${ruoyi.name}* # master最小空闲连接数* masterConnectionMinimumIdleSize: 32* # master连接池大小* masterConnectionPoolSize: 64* # slave最小空闲连接数* slaveConnectionMinimumIdleSize: 32* # slave连接池大小* slaveConnectionPoolSize: 64* # 连接空闲超时单位毫秒* idleConnectionTimeout: 10000* # 命令等待超时单位毫秒* timeout: 3000* # 发布和订阅连接池大小* subscriptionConnectionPoolSize: 50* # 读取模式* readMode: SLAVE* # 订阅模式* subscriptionMode: MASTER*/}public class KeyPrefixHandler implements NameMapper {private final String keyPrefix;public KeyPrefixHandler(String keyPrefix) {//前缀为空 则返回空前缀this.keyPrefix StringUtils.isBlank(keyPrefix) ? : keyPrefix :;}/*** 增加前缀*/Overridepublic String map(String name) {if (StringUtils.isBlank(name)) {return null;}if (StringUtils.isNotBlank(keyPrefix) !name.startsWith(keyPrefix)) {return keyPrefix name;}return name;}/*** 去除前缀*/Overridepublic String unmap(String name) {if (StringUtils.isBlank(name)) {return null;}if (StringUtils.isNotBlank(keyPrefix) name.startsWith(keyPrefix)) {return name.substring(keyPrefix.length());}return name;}
}3. 封装工具类
NoArgsConstructor(access AccessLevel.PRIVATE)
SuppressWarnings(value {unchecked, rawtypes})
public class RedisUtils {private static final RedissonClient CLIENT SpringUtils.getBean(RedissonClient.class);/*** 限流** param key 限流key* param rateType 限流类型* param rate 速率* param rateInterval 速率间隔* return -1 表示失败*/public static long rateLimiter(String key, RateType rateType, int rate, int rateInterval) {RRateLimiter rateLimiter CLIENT.getRateLimiter(key);rateLimiter.trySetRate(rateType, rate, rateInterval, RateIntervalUnit.SECONDS);if (rateLimiter.tryAcquire()) {return rateLimiter.availablePermits();} else {return -1L;}}/*** 获取客户端实例*/public static RedissonClient getClient() {return CLIENT;}/*** 发布通道消息** param channelKey 通道key* param msg 发送数据* param consumer 自定义处理*/public static T void publish(String channelKey, T msg, ConsumerT consumer) {RTopic topic CLIENT.getTopic(channelKey);topic.publish(msg);consumer.accept(msg);}public static T void publish(String channelKey, T msg) {RTopic topic CLIENT.getTopic(channelKey);topic.publish(msg);}/*** 订阅通道接收消息** param channelKey 通道key* param clazz 消息类型* param consumer 自定义处理*/public static T void subscribe(String channelKey, ClassT clazz, ConsumerT consumer) {RTopic topic CLIENT.getTopic(channelKey);topic.addListener(clazz, (channel, msg) - consumer.accept(msg));}/*** 缓存基本的对象Integer、String、实体类等** param key 缓存的键值* param value 缓存的值*/public static T void setCacheObject(final String key, final T value) {setCacheObject(key, value, false);}/*** 缓存基本的对象保留当前对象 TTL 有效期** param key 缓存的键值* param value 缓存的值* param isSaveTtl 是否保留TTL有效期(例如: set之前ttl剩余90 set之后还是为90)* since Redis 6.X 以上使用 setAndKeepTTL 兼容 5.X 方案*/public static T void setCacheObject(final String key, final T value, final boolean isSaveTtl) {RBucketT bucket CLIENT.getBucket(key);if (isSaveTtl) {try {bucket.setAndKeepTTL(value);} catch (Exception e) {long timeToLive bucket.remainTimeToLive();setCacheObject(key, value, Duration.ofMillis(timeToLive));}} else {bucket.set(value);}}/*** 缓存基本的对象Integer、String、实体类等** param key 缓存的键值* param value 缓存的值* param duration 时间*/public static T void setCacheObject(final String key, final T value, final Duration duration) {RBatch batch CLIENT.createBatch();RBucketAsyncT bucket batch.getBucket(key);bucket.setAsync(value);bucket.expireAsync(duration);batch.execute();}/*** 注册对象监听器* p* key 监听器需开启 notify-keyspace-events 等 redis 相关配置** param key 缓存的键值* param listener 监听器配置*/public static T void addObjectListener(final String key, final ObjectListener listener) {RBucketT result CLIENT.getBucket(key);result.addListener(listener);}/*** 设置有效时间** param key Redis键* param timeout 超时时间* return true设置成功false设置失败*/public static boolean expire(final String key, final long timeout) {return expire(key, Duration.ofSeconds(timeout));}/*** 设置有效时间** param key Redis键* param duration 超时时间* return true设置成功false设置失败*/public static boolean expire(final String key, final Duration duration) {RBucket rBucket CLIENT.getBucket(key);return rBucket.expire(duration);}/*** 获得缓存的基本对象。** param key 缓存键值* return 缓存键值对应的数据*/public static T T getCacheObject(final String key) {RBucketT rBucket CLIENT.getBucket(key);return rBucket.get();}/*** 获得key剩余存活时间** param key 缓存键值* return 剩余存活时间*/public static T long getTimeToLive(final String key) {RBucketT rBucket CLIENT.getBucket(key);return rBucket.remainTimeToLive();}/*** 删除单个对象** param key 缓存的键值*/public static boolean deleteObject(final String key) {return CLIENT.getBucket(key).delete();}/*** 删除集合对象** param collection 多个对象*/public static void deleteObject(final Collection collection) {RBatch batch CLIENT.createBatch();collection.forEach(t - {batch.getBucket(t.toString()).deleteAsync();});batch.execute();}/*** 检查缓存对象是否存在** param key 缓存的键值*/public static boolean isExistsObject(final String key) {return CLIENT.getBucket(key).isExists();}/*** 缓存List数据** param key 缓存的键值* param dataList 待缓存的List数据* return 缓存的对象*/public static T boolean setCacheList(final String key, final ListT dataList) {RListT rList CLIENT.getList(key);return rList.addAll(dataList);}/*** 注册List监听器* p* key 监听器需开启 notify-keyspace-events 等 redis 相关配置** param key 缓存的键值* param listener 监听器配置*/public static T void addListListener(final String key, final ObjectListener listener) {RListT rList CLIENT.getList(key);rList.addListener(listener);}/*** 获得缓存的list对象** param key 缓存的键值* return 缓存键值对应的数据*/public static T ListT getCacheList(final String key) {RListT rList CLIENT.getList(key);return rList.readAll();}/*** 缓存Set** param key 缓存键值* param dataSet 缓存的数据* return 缓存数据的对象*/public static T boolean setCacheSet(final String key, final SetT dataSet) {RSetT rSet CLIENT.getSet(key);return rSet.addAll(dataSet);}/*** 注册Set监听器* p* key 监听器需开启 notify-keyspace-events 等 redis 相关配置** param key 缓存的键值* param listener 监听器配置*/public static T void addSetListener(final String key, final ObjectListener listener) {RSetT rSet CLIENT.getSet(key);rSet.addListener(listener);}/*** 获得缓存的set** param key 缓存的key* return set对象*/public static T SetT getCacheSet(final String key) {RSetT rSet CLIENT.getSet(key);return rSet.readAll();}/*** 缓存Map** param key 缓存的键值* param dataMap 缓存的数据*/public static T void setCacheMap(final String key, final MapString, T dataMap) {if (dataMap ! null) {RMapString, T rMap CLIENT.getMap(key);rMap.putAll(dataMap);}}/*** 注册Map监听器* p* key 监听器需开启 notify-keyspace-events 等 redis 相关配置** param key 缓存的键值* param listener 监听器配置*/public static T void addMapListener(final String key, final ObjectListener listener) {RMapString, T rMap CLIENT.getMap(key);rMap.addListener(listener);}/*** 获得缓存的Map** param key 缓存的键值* return map对象*/public static T MapString, T getCacheMap(final String key) {RMapString, T rMap CLIENT.getMap(key);return rMap.getAll(rMap.keySet());}/*** 获得缓存Map的key列表** param key 缓存的键值* return key列表*/public static T SetString getCacheMapKeySet(final String key) {RMapString, T rMap CLIENT.getMap(key);return rMap.keySet();}/*** 往Hash中存入数据** param key Redis键* param hKey Hash键* param value 值*/public static T void setCacheMapValue(final String key, final String hKey, final T value) {RMapString, T rMap CLIENT.getMap(key);rMap.put(hKey, value);}/*** 获取Hash中的数据** param key Redis键* param hKey Hash键* return Hash中的对象*/public static T T getCacheMapValue(final String key, final String hKey) {RMapString, T rMap CLIENT.getMap(key);return rMap.get(hKey);}/*** 删除Hash中的数据** param key Redis键* param hKey Hash键* return Hash中的对象*/public static T T delCacheMapValue(final String key, final String hKey) {RMapString, T rMap CLIENT.getMap(key);return rMap.remove(hKey);}/*** 获取多个Hash中的数据** param key Redis键* param hKeys Hash键集合* return Hash对象集合*/public static K, V MapK, V getMultiCacheMapValue(final String key, final SetK hKeys) {RMapK, V rMap CLIENT.getMap(key);return rMap.getAll(hKeys);}/*** 设置原子值** param key Redis键* param value 值*/public static void setAtomicValue(String key, long value) {RAtomicLong atomic CLIENT.getAtomicLong(key);atomic.set(value);}/*** 获取原子值** param key Redis键* return 当前值*/public static long getAtomicValue(String key) {RAtomicLong atomic CLIENT.getAtomicLong(key);return atomic.get();}/*** 递增原子值** param key Redis键* return 当前值*/public static long incrAtomicValue(String key) {RAtomicLong atomic CLIENT.getAtomicLong(key);return atomic.incrementAndGet();}/*** 递减原子值** param key Redis键* return 当前值*/public static long decrAtomicValue(String key) {RAtomicLong atomic CLIENT.getAtomicLong(key);return atomic.decrementAndGet();}/*** 获得缓存的基本对象列表** param pattern 字符串前缀* return 对象列表*/public static CollectionString keys(final String pattern) {StreamString stream CLIENT.getKeys().getKeysStreamByPattern(pattern);return stream.collect(Collectors.toList());}/*** 删除缓存的基本对象列表** param pattern 字符串前缀*/public static void deleteKeys(final String pattern) {CLIENT.getKeys().deleteByPattern(pattern);}/*** 检查redis中是否存在key** param key 键*/public static Boolean hasKey(String key) {RKeys rKeys CLIENT.getKeys();return rKeys.countExists(key) 0;}
}二、应用
1. RedisUtils工具类的基本使用 创建接口 GetMapping(key)
public String getKey(String key){return RedisUtils.getCacheObject(key);
}GetMapping(setKey)
public String setKey(String key,String value){RedisUtils.setCacheObject(key,value);return success;
}设置key 获取key对应的值 其他方法的作用可以自行测试。这里就不再演示使用
三、队列
redission也支持队列下面封装了一些队列的相关方法。可以处理了一些简单的队列任务如果业务复杂可以选择mq
1. 工具类
NoArgsConstructor(access AccessLevel.PRIVATE)
public class QueueUtils {private static final RedissonClient CLIENT SpringUtil.getBean(RedissonClient.class);/*** 获取客户端实例*/public static RedissonClient getClient() {return CLIENT;}/*** 添加普通队列数据** param queueName 队列名* param data 数据*/public static T boolean addQueueObject(String queueName, T data) {RBlockingQueueT queue CLIENT.getBlockingQueue(queueName);return queue.offer(data);}/*** 通用获取一个队列数据 没有数据返回 null(不支持延迟队列)** param queueName 队列名*/public static T T getQueueObject(String queueName) {RBlockingQueueT queue CLIENT.getBlockingQueue(queueName);return queue.poll();}/*** 通用删除队列数据(不支持延迟队列)*/public static T boolean removeQueueObject(String queueName, T data) {RBlockingQueueT queue CLIENT.getBlockingQueue(queueName);return queue.remove(data);}/*** 通用销毁队列 所有阻塞监听 报错(不支持延迟队列)*/public static T boolean destroyQueue(String queueName) {RBlockingQueueT queue CLIENT.getBlockingQueue(queueName);return queue.delete();}/*** 添加延迟队列数据 默认毫秒** param queueName 队列名* param data 数据* param time 延迟时间*/public static T void addDelayedQueueObject(String queueName, T data, long time) {addDelayedQueueObject(queueName, data, time, TimeUnit.MILLISECONDS);}/*** 添加延迟队列数据** param queueName 队列名* param data 数据* param time 延迟时间* param timeUnit 单位*/public static T void addDelayedQueueObject(String queueName, T data, long time, TimeUnit timeUnit) {RBlockingQueueT queue CLIENT.getBlockingQueue(queueName);RDelayedQueueT delayedQueue CLIENT.getDelayedQueue(queue);delayedQueue.offer(data, time, timeUnit);}/*** 获取一个延迟队列数据 没有数据返回 null** param queueName 队列名*/public static T T getDelayedQueueObject(String queueName) {RBlockingQueueT queue CLIENT.getBlockingQueue(queueName);RDelayedQueueT delayedQueue CLIENT.getDelayedQueue(queue);return delayedQueue.poll();}/*** 删除延迟队列数据*/public static T boolean removeDelayedQueueObject(String queueName, T data) {RBlockingQueueT queue CLIENT.getBlockingQueue(queueName);RDelayedQueueT delayedQueue CLIENT.getDelayedQueue(queue);return delayedQueue.remove(data);}/*** 销毁延迟队列 所有阻塞监听 报错*/public static T void destroyDelayedQueue(String queueName) {RBlockingQueueT queue CLIENT.getBlockingQueue(queueName);RDelayedQueueT delayedQueue CLIENT.getDelayedQueue(queue);delayedQueue.destroy();}/*** 添加优先队列数据** param queueName 队列名* param data 数据*/public static T boolean addPriorityQueueObject(String queueName, T data) {RPriorityBlockingQueueT priorityBlockingQueue CLIENT.getPriorityBlockingQueue(queueName);return priorityBlockingQueue.offer(data);}/*** 优先队列获取一个队列数据 没有数据返回 null(不支持延迟队列)** param queueName 队列名*/public static T T getPriorityQueueObject(String queueName) {RPriorityBlockingQueueT queue CLIENT.getPriorityBlockingQueue(queueName);return queue.poll();}/*** 优先队列删除队列数据(不支持延迟队列)*/public static T boolean removePriorityQueueObject(String queueName, T data) {RPriorityBlockingQueueT queue CLIENT.getPriorityBlockingQueue(queueName);return queue.remove(data);}/*** 优先队列销毁队列 所有阻塞监听 报错(不支持延迟队列)*/public static T boolean destroyPriorityQueue(String queueName) {RPriorityBlockingQueueT queue CLIENT.getPriorityBlockingQueue(queueName);return queue.delete();}/*** 尝试设置 有界队列 容量 用于限制数量** param queueName 队列名* param capacity 容量*/public static T boolean trySetBoundedQueueCapacity(String queueName, int capacity) {RBoundedBlockingQueueT boundedBlockingQueue CLIENT.getBoundedBlockingQueue(queueName);return boundedBlockingQueue.trySetCapacity(capacity);}/*** 尝试设置 有界队列 容量 用于限制数量** param queueName 队列名* param capacity 容量* param destroy 已存在是否销毁*/public static T boolean trySetBoundedQueueCapacity(String queueName, int capacity, boolean destroy) {RBoundedBlockingQueueT boundedBlockingQueue CLIENT.getBoundedBlockingQueue(queueName);if (boundedBlockingQueue.isExists() destroy) {destroyQueue(queueName);}return boundedBlockingQueue.trySetCapacity(capacity);}/*** 添加有界队列数据** param queueName 队列名* param data 数据* return 添加成功 true 已达到界限 false*/public static T boolean addBoundedQueueObject(String queueName, T data) {RBoundedBlockingQueueT boundedBlockingQueue CLIENT.getBoundedBlockingQueue(queueName);return boundedBlockingQueue.offer(data);}/*** 有界队列获取一个队列数据 没有数据返回 null(不支持延迟队列)** param queueName 队列名*/public static T T getBoundedQueueObject(String queueName) {RBoundedBlockingQueueT queue CLIENT.getBoundedBlockingQueue(queueName);return queue.poll();}/*** 有界队列删除队列数据(不支持延迟队列)*/public static T boolean removeBoundedQueueObject(String queueName, T data) {RBoundedBlockingQueueT queue CLIENT.getBoundedBlockingQueue(queueName);return queue.remove(data);}/*** 有界队列销毁队列 所有阻塞监听 报错(不支持延迟队列)*/public static T boolean destroyBoundedQueue(String queueName) {RBoundedBlockingQueueT queue CLIENT.getBoundedBlockingQueue(queueName);return queue.delete();}/*** 订阅阻塞队列(可订阅所有实现类 例如: 延迟 优先 有界 等)*/public static T void subscribeBlockingQueue(String queueName, ConsumerT consumer, boolean isDelayed) {RBlockingQueueT queue CLIENT.getBlockingQueue(queueName);if (isDelayed) {// 订阅延迟队列CLIENT.getDelayedQueue(queue);}queue.subscribeOnElements(consumer);}}2. 普通队列 添加数据到队列 GetMapping(add)
public String add(){QueueUtils.addQueueObject(queue:simple,1);QueueUtils.addQueueObject(queue:simple,2);QueueUtils.addQueueObject(queue:simple,3);return ok;
}消费队列数据 遵循先进先出获取数据后就会删除。如果队列中没有数据获取到的就为null
GetMapping(get)
public Integer get(){return QueueUtils.getQueueObject(queue:simple);
}移除队列数据 GetMapping(remove)
public String remove(){QueueUtils.removeQueueObject(queue:simple,3);return ok;
}销毁队列 GetMapping(destroy)
public String destroy(){QueueUtils.destroyQueue(queue:simple);return ok;
}订阅队列消息 订阅的消息一般在项目启动的时候使用只能订阅一次当监听到队列新增数据的时候会立即取出来进行消费
PostConstruct
public void sub(){QueueUtils.subscribeBlockingQueue(queue:simple,(o)-{System.out.println(接收到消息o);},false);
}我们再次调用新增 3. 有界队列限制数据量 设置队列最大容量 有界队列在使用前必须设置容量
GetMapping(set)
public String set(){boolean b QueueUtils.trySetBoundedQueueCapacity(queue:bound, 10);return ok;
}新增有界队列数据 GetMapping(add)
public String add(){QueueUtils.addBoundedQueueObject(queue:bound,1);return ok;
}新增完毕后我们可以发现我们直接设置的最大容量变成来了9。每次添加数据都会查询当前最大容量是否0如果大于0添加成功并且减一否则添加失败 获取有界队列数据 GetMapping(get)
public Integer get(){return QueueUtils.getBoundedQueueObject(queue:bound);
}我们可以看到当获取数据的时候容量1数据从redis中删除 其他用法与普通队列类似就不再演示了
4. 延迟队列延迟获取数据 添加延迟数据 延迟队列的实现原理是将数据添加到另一个缓存队列中当到达指定时间才会转移到普通队列中
GetMapping(add)
public String add(){QueueUtils.addDelayedQueueObject(queue:belay,1,10, TimeUnit.SECONDS);return ok;
}获取延迟数据 必须达到指定时间后才能获取
GetMapping(get)
public Integer get(){return QueueUtils.getDelayedQueueObject(queue:belay);
}删除延迟数据 GetMapping(remove)
public String remove(){QueueUtils.removeQueueObject(queue:belay,3);return ok;
}清空延迟数据 GetMapping(destroy)
public String destroy(){QueueUtils.destroyDelayedQueue(queue:belay);return ok;
}订阅消息使用方法同普通队列类似第三个参数需要改为true
5. 优先队列数据可插队
插入优先队列的数据我们需要先实现比较接口
Data
Accessors(chain true)
class Order implements ComparableOrder{private Long id;Overridepublic int compareTo(Order o) {return Long.compare(getId(), o.id);}
}新增优先数据 GetMapping(add)
public String add(){QueueUtils.addPriorityQueueObject(queue:priority,new Order().setId(1L));QueueUtils.addPriorityQueueObject(queue:priority,new Order().setId(6L));QueueUtils.addPriorityQueueObject(queue:priority,new Order().setId(2L));QueueUtils.addPriorityQueueObject(queue:priority,new Order().setId(5L));QueueUtils.addPriorityQueueObject(queue:priority,new Order().setId(22L));QueueUtils.addPriorityQueueObject(queue:priority,new Order().setId(3L));return ok;
}我们可以看到插入的数据是有序的 获取优先队列数据 GetMapping(get)
public Integer get(){return QueueUtils.getPriorityQueueObject(queue:priority);
}删除优先队列数据 GetMapping(remove)
public String remove(){QueueUtils.removeQueueObject(queue:priority,3);return ok;
}清空优先队列数据 GetMapping(destroy)
public String destroy(){QueueUtils.destroyDelayedQueue(queue:priority);return ok;
}订阅消息使用方法同普通队列一样