网站开发美工,温州网站改版,wordpress分类页标题,网站建设百灵鸟优化文章目录 前言一、Redisson介绍二、Redisson的使用1.1 引入依赖1.2 编写配置1.3 示例测试_011.4 示例测试_02 三、Redisson源码分析2.1 加锁源码2.2 看门狗机制 前言
分布式锁主要是解决分布式系统下数据一致性的问题。在单机的环境下#xff0c;应用是在同一进程下的#x… 文章目录 前言一、Redisson介绍二、Redisson的使用1.1 引入依赖1.2 编写配置1.3 示例测试_011.4 示例测试_02 三、Redisson源码分析2.1 加锁源码2.2 看门狗机制 前言
分布式锁主要是解决分布式系统下数据一致性的问题。在单机的环境下应用是在同一进程下的只需要保证单进程多线程环境中的线程安全性通过 Java 提供的 volatile、ReentrantLock、synchronized 以及 concurrent 并发包下一些线程安全的类等就可以做到。
一、Redisson介绍
Redisson 是一个基于 Redis 的分布式 Java 客户端。它提供了丰富的功能和工具帮助开发者在分布式系统中解决数据共享、并发控制和任务调度等问题。通过使用Redisson开发者可以轻松地操作 Redis 的分布式对象如集合、映射、队列等实现可靠的分布式锁机制以及管理和调度分布式环境中的任务和服务。
Redisson 的分布式锁的特点
● 线程安全分布式锁可以确保在多线程和多进程环境下的数据一致性和可靠性。 ● 可重入性 同一个线程可以多次获取同一个锁避免死锁的问题。 ● 锁超时 支持设置锁的有效期防止锁被长时间占用而导致系统出现问题。 ● 阻塞式获取锁 当某个线程尝试获取锁时如果锁已经被其他线程占用则该线程可以选择等待直到锁释放。 ● 无阻塞式获取锁 当某个线程尝试获取锁时如果锁已经被其他线程占用则该线程不会等待而是立即返回获取锁失败的信息。
redisson 实现分布式官网文档https://github.com/redisson/redisson/wiki/8.-distributed-locks-and-synchronizers
二、Redisson的使用
Redisson 支持单点模式、主从模式、哨兵模式、集群模式本文以单点模式为例说明。
1.1 引入依赖
!-- redisson --
dependencygroupIdorg.redisson/groupIdartifactIdredisson/artifactIdversion3.12.0/version
/dependency
!-- redis --
dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-data-redis/artifactIdversion2.1.21.RELEASE/version
/dependency1.2 编写配置
spring:redis:host: 192.168.57.129port: 6379Configuration
public class RedissonConfig {Value(${spring.redis.host})private String host;Value(${spring.redis.port})private String port;/*** 对所有redisson的使用都是通过redissonClient对象* return*/Bean(destroyMethod shutdown)public RedissonClient redissonClient() {Config config new Config();config.useSingleServer().setAddress(redis:// host : port);return Redisson.create(config);}
}1.3 示例测试_01
Autowired
private RedissonClient redissonClient;// redisson分布式锁的key
private static final String LOCK_TEST_KEY redisson_lock;// redisson分布式锁的key
private static int TICKET_NUMBER 10;/*** 分布式锁测试用例* 模拟开启11个用户去抢车票*/
Test
public void lockTest() {// 利用循环多线程 模仿高并发请求for (int i 0; i 11; i) {CompletableFuture.runAsync(() - {if (TICKET_NUMBER 0) {// 这里获取公平锁遵循先进先出原则方便测试RLock fairLock redissonClient.getFairLock(LOCK_TEST_KEY);try {// 尝试加锁// waitTimeout 尝试获取锁的最大等待时间,超过这个值,则认为获取锁失败// leaseTime 锁的持有时间,超过这个时间锁会自动失效(值应设置为大于业务处理的时间确保在锁有效期内业务能处理完)boolean lock fairLock.tryLock(3000, 30, TimeUnit.MILLISECONDS);if (lock){log.info(线程: Thread.currentThread().getName() 获得了锁);log.info(车票剩余数量:{}, --TICKET_NUMBER);}} catch (InterruptedException e) {e.printStackTrace();} finally {log.info(线程: Thread.currentThread().getName() 准备释放锁);//注意: 无论出现任何情况都要主动解锁fairLock.unlock();}}else {log.info(车票已售罄);}});try {// -_- 这里使当前方法占用的线程休息10秒不要立即结束Thread.sleep(3000L);} catch (InterruptedException e) {e.printStackTrace();}}
}日志信息输出
2023-11-18 15:27:00.834 INFO 5800 --- [onPool-worker-6] c.a.g.p.GulimallProductApplicationTests : 线程:ForkJoinPool.commonPool-worker-6获得了锁
2023-11-18 15:27:00.835 INFO 5800 --- [onPool-worker-6] c.a.g.p.GulimallProductApplicationTests : 车票剩余数量:9
2023-11-18 15:27:00.835 INFO 5800 --- [onPool-worker-6] c.a.g.p.GulimallProductApplicationTests : 线程:ForkJoinPool.commonPool-worker-6准备释放锁
2023-11-18 15:27:03.749 INFO 5800 --- [onPool-worker-6] c.a.g.p.GulimallProductApplicationTests : 线程:ForkJoinPool.commonPool-worker-6获得了锁
2023-11-18 15:27:03.749 INFO 5800 --- [onPool-worker-6] c.a.g.p.GulimallProductApplicationTests : 车票剩余数量:8
2023-11-18 15:27:03.749 INFO 5800 --- [onPool-worker-6] c.a.g.p.GulimallProductApplicationTests : 线程:ForkJoinPool.commonPool-worker-6准备释放锁
2023-11-18 15:27:06.759 INFO 5800 --- [onPool-worker-6] c.a.g.p.GulimallProductApplicationTests : 线程:ForkJoinPool.commonPool-worker-6获得了锁
2023-11-18 15:27:06.759 INFO 5800 --- [onPool-worker-6] c.a.g.p.GulimallProductApplicationTests : 车票剩余数量:7
2023-11-18 15:27:06.759 INFO 5800 --- [onPool-worker-6] c.a.g.p.GulimallProductApplicationTests : 线程:ForkJoinPool.commonPool-worker-6准备释放锁
2023-11-18 15:27:09.749 INFO 5800 --- [onPool-worker-6] c.a.g.p.GulimallProductApplicationTests : 线程:ForkJoinPool.commonPool-worker-6获得了锁
2023-11-18 15:27:09.750 INFO 5800 --- [onPool-worker-6] c.a.g.p.GulimallProductApplicationTests : 车票剩余数量:6
2023-11-18 15:27:09.750 INFO 5800 --- [onPool-worker-6] c.a.g.p.GulimallProductApplicationTests : 线程:ForkJoinPool.commonPool-worker-6准备释放锁
2023-11-18 15:27:12.759 INFO 5800 --- [onPool-worker-6] c.a.g.p.GulimallProductApplicationTests : 线程:ForkJoinPool.commonPool-worker-6获得了锁
2023-11-18 15:27:12.759 INFO 5800 --- [onPool-worker-6] c.a.g.p.GulimallProductApplicationTests : 车票剩余数量:5
2023-11-18 15:27:12.759 INFO 5800 --- [onPool-worker-6] c.a.g.p.GulimallProductApplicationTests : 线程:ForkJoinPool.commonPool-worker-6准备释放锁
2023-11-18 15:27:15.752 INFO 5800 --- [onPool-worker-6] c.a.g.p.GulimallProductApplicationTests : 线程:ForkJoinPool.commonPool-worker-6获得了锁
2023-11-18 15:27:15.752 INFO 5800 --- [onPool-worker-6] c.a.g.p.GulimallProductApplicationTests : 车票剩余数量:4
2023-11-18 15:27:15.752 INFO 5800 --- [onPool-worker-6] c.a.g.p.GulimallProductApplicationTests : 线程:ForkJoinPool.commonPool-worker-6准备释放锁
2023-11-18 15:27:18.762 INFO 5800 --- [onPool-worker-6] c.a.g.p.GulimallProductApplicationTests : 线程:ForkJoinPool.commonPool-worker-6获得了锁
2023-11-18 15:27:18.762 INFO 5800 --- [onPool-worker-6] c.a.g.p.GulimallProductApplicationTests : 车票剩余数量:3
2023-11-18 15:27:18.762 INFO 5800 --- [onPool-worker-6] c.a.g.p.GulimallProductApplicationTests : 线程:ForkJoinPool.commonPool-worker-6准备释放锁
2023-11-18 15:27:21.754 INFO 5800 --- [onPool-worker-6] c.a.g.p.GulimallProductApplicationTests : 线程:ForkJoinPool.commonPool-worker-6获得了锁
2023-11-18 15:27:21.754 INFO 5800 --- [onPool-worker-6] c.a.g.p.GulimallProductApplicationTests : 车票剩余数量:2
2023-11-18 15:27:21.754 INFO 5800 --- [onPool-worker-6] c.a.g.p.GulimallProductApplicationTests : 线程:ForkJoinPool.commonPool-worker-6准备释放锁
2023-11-18 15:27:24.763 INFO 5800 --- [onPool-worker-6] c.a.g.p.GulimallProductApplicationTests : 线程:ForkJoinPool.commonPool-worker-6获得了锁
2023-11-18 15:27:24.763 INFO 5800 --- [onPool-worker-6] c.a.g.p.GulimallProductApplicationTests : 车票剩余数量:1
2023-11-18 15:27:24.763 INFO 5800 --- [onPool-worker-6] c.a.g.p.GulimallProductApplicationTests : 线程:ForkJoinPool.commonPool-worker-6准备释放锁
2023-11-18 15:27:27.757 INFO 5800 --- [onPool-worker-6] c.a.g.p.GulimallProductApplicationTests : 线程:ForkJoinPool.commonPool-worker-6获得了锁
2023-11-18 15:27:27.757 INFO 5800 --- [onPool-worker-6] c.a.g.p.GulimallProductApplicationTests : 车票剩余数量:0
2023-11-18 15:27:27.757 INFO 5800 --- [onPool-worker-6] c.a.g.p.GulimallProductApplicationTests : 线程:ForkJoinPool.commonPool-worker-6准备释放锁
2023-11-18 15:27:30.753 INFO 5800 --- [onPool-worker-6] c.a.g.p.GulimallProductApplicationTests : 车票已售罄1.4 示例测试_02
ResponseBody
GetMapping(/hello)
public String hello(){//1、获取一把锁只要锁的名字一样就是同一把锁String lockKey my-lock;RLock lock redissonClient.getLock(lockKey);//2、加锁阻塞式等待默认加的锁都是30s。lock.lock();//10秒自动解锁自动解锁时间一定要大于业务的执行时间。问题在锁时间到了以后不会自动续期。//lock.lock(10, TimeUnit.SECONDS);//最佳实战省掉了整个续期操作。手动解锁//1、lock.lock(30, TimeUnit.SECONDS);try {log.info(加锁成功执行业务ing, 线程ID {}, Thread.currentThread().getId());Thread.sleep(10000);}catch (Exception e){e.printStackTrace();}finally {//3、解锁 假设解锁代码没有运行redisson会不会出现死锁log.info(释放锁, 线程ID {}, Thread.currentThread().getId());lock.unlock();}return hello;
}浏览器执行两个 hello 请求只有当第一个请求业务执行完第二个才能正常执行不然第二个处于阻塞式等待状态。 控制台打印日志信息
2023-11-18 16:01:00.784 INFO 3916 --- [io-10000-exec-4] c.a.g.product.web.IndexController : 加锁成功执行业务ing, 线程ID 116
2023-11-18 16:01:10.785 INFO 3916 --- [io-10000-exec-4] c.a.g.product.web.IndexController : 释放锁, 线程ID 116
2023-11-18 16:01:10.794 INFO 3916 --- [io-10000-exec-2] c.a.g.product.web.IndexController : 加锁成功执行业务ing, 线程ID 114
2023-11-18 16:01:20.794 INFO 3916 --- [io-10000-exec-2] c.a.g.product.web.IndexController : 释放锁, 线程ID 114redisson 实现分布式锁解决了 redis 实现分布式锁的两个问题
锁的自动续期如果业务超长运行期间自动给锁续上新的30s。不用担心业务时间长锁自动过期被删掉。加锁的业务只要运行完成就不会给当前锁续期即使不手动解锁锁默认在30s以后自动删除。
三、Redisson源码分析
redisson 这个框架的实现依赖了 Lua 脚本和 Netty以及各种 Future 及FutureListener 的异步、同步操作转换加锁和解锁过程中还巧妙地利用了 redis 的发布订阅功能。
2.1 加锁源码
无参加锁方法
Override
public void lock() {try {lock(-1, null, false);} catch (InterruptedException e) {throw new IllegalStateException();}
}有参加锁方法
Override
public void lock(long leaseTime, TimeUnit unit) {try {lock(leaseTime, unit, false);} catch (InterruptedException e) {throw new IllegalStateException();}
}2.2 看门狗机制
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {long threadId Thread.currentThread().getId();//尝试获取锁Long ttl tryAcquire(leaseTime, unit, threadId);// lock acquiredif (ttl null) {return;}RFutureRedissonLockEntry future subscribe(threadId);if (interruptibly) {commandExecutor.syncSubscriptionInterrupted(future);} else {commandExecutor.syncSubscription(future);}try {while (true) {ttl tryAcquire(leaseTime, unit, threadId);// lock acquiredif (ttl null) {break;}// waiting for messageif (ttl 0) {try {future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);} catch (InterruptedException e) {if (interruptibly) {throw e;}future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);}} else {if (interruptibly) {future.getNow().getLatch().acquire();} else {future.getNow().getLatch().acquireUninterruptibly();}}}} finally {unsubscribe(future, threadId);}
}//尝试获取锁
private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {return get(tryAcquireAsync(leaseTime, unit, threadId));
}异步的方式尝试获取锁
private T RFutureLong tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) {if (leaseTime ! -1) {//如果我们传递了锁的超时时间就发送给redis执行脚本进行占锁默认超时就是我们指定的时间。return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);}//如果我们未指定锁的超时时间就使用30*1000【LockWatchdogTimeout看门狗的默认时间】RFutureLong ttlRemainingFuture tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);//占锁成功ttlRemainingFuture.onComplete((ttlRemaining, e) - {//发生异常直接返回若无异常执行下面逻辑if (e ! null) {return;}// lock acquiredif (ttlRemaining null) {scheduleExpirationRenewal(threadId);}});return ttlRemainingFuture;
}//默认自动续期时间30s看门狗时间
private long lockWatchdogTimeout 30 * 1000;T RFutureT tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommandT command) {internalLockLeaseTime unit.toMillis(leaseTime);return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,if (redis.call(exists, KEYS[1]) 0) then redis.call(hset, KEYS[1], ARGV[2], 1); redis.call(pexpire, KEYS[1], ARGV[1]); return nil; end; if (redis.call(hexists, KEYS[1], ARGV[2]) 1) then redis.call(hincrby, KEYS[1], ARGV[2], 1); redis.call(pexpire, KEYS[1], ARGV[1]); return nil; end; return redis.call(pttl, KEYS[1]);,Collections.ObjectsingletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}重新设置超时时间
private void scheduleExpirationRenewal(long threadId) {ExpirationEntry entry new ExpirationEntry();ExpirationEntry oldEntry EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);if (oldEntry ! null) {oldEntry.addThreadId(threadId);} else {entry.addThreadId(threadId);renewExpiration();}
}开启定时任务发送 LUA 脚本锁的超时时间达到1/3就重新设为30s
private void renewExpiration() {ExpirationEntry ee EXPIRATION_RENEWAL_MAP.get(getEntryName());if (ee null) {return;}Timeout task commandExecutor.getConnectionManager().newTimeout(new TimerTask() {Overridepublic void run(Timeout timeout) throws Exception {ExpirationEntry ent EXPIRATION_RENEWAL_MAP.get(getEntryName());if (ent null) {return;}Long threadId ent.getFirstThreadId();if (threadId null) {return;}RFutureBoolean future renewExpirationAsync(threadId);future.onComplete((res, e) - {if (e ! null) {log.error(Cant update lock getName() expiration, e);return;}if (res) {// reschedule itselfrenewExpiration();}});}//只要占锁成功就会启动一个定时任务【重新给锁设置过期时间新的过期时间就是看门狗的默认时间】每隔10s都会自动再次续期续成30s internalLockLeaseTime【看门狗时间】 / 310s}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);ee.setTimeout(task);
}重新设置超时时间 LUA 脚本
protected RFutureBoolean renewExpirationAsync(long threadId) {return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,if (redis.call(hexists, KEYS[1], ARGV[2]) 1) then redis.call(pexpire, KEYS[1], ARGV[1]); return 1; end; return 0;,Collections.ObjectsingletonList(getName()), internalLockLeaseTime, getLockName(threadId));}