网站logo代码,seo建站,发布外链,上海网站建设-目前企业网站所面临的困惑1.Redisson实现分布式锁
在分布式系统中#xff0c;涉及到多个实例对同一资源加锁的情况#xff0c;传统的synchronized、ReentrantLock等单进程加锁的API就不再适用#xff0c;此时就需要使用分布式锁来保证多服务之间加锁的安全性。 常见的分布式锁的实现方式有#xff…1.Redisson实现分布式锁
在分布式系统中涉及到多个实例对同一资源加锁的情况传统的synchronized、ReentrantLock等单进程加锁的API就不再适用此时就需要使用分布式锁来保证多服务之间加锁的安全性。 常见的分布式锁的实现方式有zookeeper、Redis。Redis分布式锁相对简单Redis分布式锁常用于业务场景中Redisson是Redis实现分布式锁常用方式
1.1.Redisson锁使用
Redis分布式锁中setnx命令可保证一个key同时只能有一个线程设置成功这样可实现加锁的互斥性。但是Redisson并未通过setnx命令实现加锁。
引入依赖 dependencygroupIdorg.redisson/groupIdartifactIdredisson-spring-boot-starter/artifactIdversion3.15.6/version
/dependency配置类
package com.hong.springbootjwt.config.redission;import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.redisson.config.SingleServerConfig;
import org.redisson.config.TransportMode;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** author: hong* date: 2023/2/16 20:09* description RedissonClient*/
Configuration
public class RedissonConfig {Beanpublic RedissonClient redissonClient(){Config config new Config();config.setTransportMode(TransportMode.NIO);SingleServerConfig singleServerConfig config.useSingleServer();//可以用rediss://来启用SSL连接singleServerConfig.setAddress(redis://127.0.0.1:6379);singleServerConfig.setPassword(123456);return Redisson.create(config);}
}Redisson加锁使用
package com.hong.springbootjwt.service.impl;import com.hong.springbootjwt.service.UserService;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.stereotype.Service;/*** author: hong* date: 2023/2/16 20:08* description*/
Service
public class UserServiceImpl implements UserService {private final RedissonClient redissonClient;public UserServiceImpl(RedissonClient redissonClient) {this.redissonClient redissonClient;}Overridepublic void redissionLock() {// 获取锁对象RLock myLock redissonClient.getLock(myLock);try {// 加锁myLock.lock();...}catch (Exception e) {}finally {// 释放锁myLock.unlock();}}
}1.1.1.Redisson加锁原理
通过RedissonClient传入锁的名称获取到RLock获得RLock接口的实现是RedissonLock然后通过RLock实现加锁和释放锁
public RLock getLock(String name) {return new RedissonLock(this.commandExecutor, name);
}RedissonLock对lock()方法的实现
public void lock() {try {this.lock(-1L, (TimeUnit)null, false);} catch (InterruptedException var2) {throw new IllegalStateException();}
}重载lock方法传入leaseTime为-1之后调用tryAcquire实现加锁
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {long threadId Thread.currentThread().getId();Long ttl this.tryAcquire(-1L, leaseTime, unit, threadId);...
}tryAcquire最后调用到tryAcquireAsync方法传入leaseTime和当前加锁线程IDtryAcquire和tryAcquireAsync的区别在于tryAcquireAsync是异步执行而tryAcquire是同步等待tryAcquireAsync的结果也即异步转同步的过程
private T RFutureLong tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {RFuture ttlRemainingFuture;if (leaseTime ! -1L) {ttlRemainingFuture this.tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);} else {ttlRemainingFuture this.tryLockInnerAsync(waitTime, this.internalLockLeaseTime, TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);}...
}tryAcquireAsync方法会根据leaseTime是否为-1判断使用哪个分支加锁不论走哪个分支最后都调用tryLockInnerAsync方法实现加锁只是参数不同。此处leaseTime-1走下面分支 虽然传入tryAcquireAsync的leaseTime是-1但在调用tryLockInnerAsync方法传入的leaseTime参数是this.internalLockLeaseTime也即默认的30s进入到tryLockInnerAsync方法最终加锁是通过一段LUA脚本来实现的Redis在执行LUA脚本时可保证加锁的原子性所以Redisson实现加锁的原子性是依赖LUA脚本实现的。
T RFutureT tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommandT command) {return this.evalWriteAsync(this.getRawName(), LongCodec.INSTANCE, command,if (redis.call(exists, KEYS[1]) 0) then redis.call(hincrby, KEYS[1], ARGV[2], 1); redis.call(pexpire, KEYS[1], ARGV[1]); return nil; end; if (redis.call(hexists, KEYS[1], ARGV[2]) 1) then redis.call(hincrby, KEYS[1], ARGV[2], 1); redis.call(pexpire, KEYS[1], ARGV[1]); return nil; end; return redis.call(pttl, KEYS[1]);,Collections.singletonList(this.getRawName()), new Object[]{unit.toMillis(leaseTime), this.getLockName(threadId)});
}最后对于RedissonLock这个实现类来说最终实现加锁的逻辑都是通过tryLockInnerAsync来实现 1.1.2.LUA脚本实现加锁
Redis通过执行LUA脚本实现加锁保证加锁的原子性分析上述LUA脚本
KEY[1]加锁的名称此处demo就是myLockARGV[1]锁的过期时间不指定的话默认是30sARGV[2]代表加锁的唯一标识由UUID和线程ID组成一个Redisson客户端一个UUID代表唯一的客户端所以由UUID和线程ID组成加锁的唯一标识可理解为某个客户端的某个线程加锁
这些参数是如何传过去的呢其实就是在这里
getName获取锁的名称leaseTime传入的锁的过期时间没指定默认是30sgetLockName就是获取加锁的客户端线程的唯一标识。
这段LUA的加锁逻辑
调用Redis的exists命令判断加锁的key是否存在如果不存在进入if。不存在的话就是没有某个客户端的线程来加锁第一次加锁肯定没有加锁于是第一次if条件成立接着调用Redis的hincrby命令设置加锁的key和加锁的某个客户端的某线程加锁次数设置为1。加锁次数很重要是实现可重入锁特性的关键数据用hash数据结构保存。hincrby命令完成后形成如下数据结构
myLock:{b983c153-7421-469a-addb-44fb92259a1b:1:1
}最后调用Redis的pexpire命令将锁的过期时间设置为30s
总结第一个客户端线程加锁的逻辑还是挺简单的就是判断有无加过锁没有的话自己去加锁设置加锁的key保存加锁的线程和加锁次数设置锁的过期时间为30s 问题为何要设置加锁key的过期时间 主要原因是为了防止死锁当某客户端获取到锁还未来得及释放锁当客户端宕机了或者释放失败了一旦没设置过期时间那么这个锁key会一直存在当其他线程来加锁的话发生key已经被加锁了那么其他线程会一直加锁失败从而造成死锁问题。 1.2.延长加锁时间
在加锁过程中没指定锁的过期时间Redisson也会默认给锁设置30s的过期时间来防止死锁 虽然设置默认过期时间可防止死锁但若在30s内任务还未结束但是锁已经释放失效了一旦其他线程加锁成功就可能出现线程安全数据错乱问题。所以Redisson针对这种未指定超时时间的加锁实现了一个watchdog机制即“看门狗机制”自动延长加锁时间 在客户端通过tryLockInnerAsync方法加锁后如果没指定锁过期时间那么客户端会起一个定时任务来定时延长加锁时间默认10s执行一次所以watchdog的本质就是一个定时任务 最后定期执行一段LUA脚本实现加锁时间的延长 脚本中参数解释同加锁的参数
KEYS[1]锁的名称此demo为“myLock”ARGV[1]就是锁的过期时间ARGV[2]代表了加锁的唯一标识b983c153-7421-469a-addb-44fb92259a1b:1
这段LUA脚本意思是判断续约的线程和加锁的线程是否为同一个若为同一个将锁的过期时间延长30s然后返回1代表续约成功不是的话就返回0续约失败下一次定时任务就不会执行 注意因为有了看门狗机制所以若没有设置过期时间并且没有主动释放锁那么这个锁就永远不会释放因为定时任务会不断延长锁的过期时间造成死锁问题。 但是如果发生宕机是不会造成死锁的因为宕机了服务也就没有了那么看门狗的定时任务就没了自然不会续约等锁自动过期了也就自动释放锁了。 1.3.实现可重入锁
可重入锁的意思就是同一个客户端同一个线程多次对同一个锁进行加锁。 在Redisson中可以执行多次lock方法流程都是一样的最后调用到LUA脚本所以可重入锁的逻辑也是通过加锁的LUA脚本实现 下半部分就是可重入锁的逻辑 下面这段if的意思是判断当前已经加锁的key对应的加锁线程跟要加锁的线程是否为同一个如果是则将该线程对应的加锁次数加1也即实现了可重入加锁同时返回nil 可重入锁加锁成功后加锁key和对应值可能是这样
myLock:{b983c153-7421-469a-addb-44fb92259a1b:1:2
}1.4.主动释放锁
当业务执行完毕后需要主动释放锁为什么需要主动释放锁呢
当任务执行完未手动释放锁如果没有指定锁的超时时间那么因为看门狗机制会导致这个锁无法释放可能造成死锁问题如果指定了超时时间虽然不会造成死锁问题但会造成资源浪费。假设设置超时时间为30s但任务只执行了2s就完成那么这个锁会还会被占用28s这28s内其他线程就无法成功加锁。
Redisson如何主动释放锁以及避免其他线程释放自己加的锁呢 主动释放锁是通过unlock方法实现分析unlock方法的实现
unlock调用unlockAsync()方法传入当前释放线程的ID代表当前线程来释放锁unlock其实也是将unlockAsync的异步操作转为同步操作
public RFutureVoid unlockAsync(long threadId) {RPromiseVoid result new RedissonPromise();RFutureBoolean future this.unlockInnerAsync(threadId);future.onComplete((opStatus, e) - {this.cancelExpirationRenewal(threadId);if (e ! null) {result.tryFailure(e);} else if (opStatus null) {IllegalMonitorStateException cause new IllegalMonitorStateException(attempt to unlock lock, not locked by current thread by node id: this.id thread-id: threadId);result.tryFailure(cause);} else {result.trySuccess((Object)null);}});return result;
}unlockAsync最后会调用RedissonLock的unlockInnerAsync()实现释放锁的逻辑也是执行一段LUA脚本
protected RFutureBoolean unlockInnerAsync(long threadId) {return this.evalWriteAsync(this.getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,if (redis.call(hexists, KEYS[1], ARGV[3]) 0) then return nil; end; local counter redis.call(hincrby, KEYS[1], ARGV[3], -1); if (counter 0) then redis.call(pexpire, KEYS[1], ARGV[2]); return 0; else redis.call(del, KEYS[1]); redis.call(publish, KEYS[2], ARGV[1]); return 1; end; return nil;, Arrays.asList(this.getRawName(), this.getChannelName()), new Object[]{LockPubSub.UNLOCK_MESSAGE, this.internalLockLeaseTime, this.getLockName(threadId)});
}LUA脚本解释
先判断来释放锁的线程是不是加锁的线程如果不是直接返回nil可看出此处是通过一个if条件防止线程释放了其他线程加的锁如果释放锁的线程是加锁的线程那么将锁次数减1然后拿到加锁次数counter变量若counter大于0说明有重入锁锁还未完全释放完那么设置一下过期时间然后返回0若counter未大于0说明此锁已经释放完成将锁对应的key删除然后发布一个锁已经释放的消息然后返回1 1.5.超时自动释放锁
已知如果不指定超时时间的话存在看门狗线程不断延长加锁时间不会导致锁超时释放自动过期那么指定超时时间的话是如何实现指定时间释放的呢 能够设置超时时间的方法
// 通过传入leaseTime参数可指定锁超时时间
void lock(long leaseTime, TimeUnit unit)boolean tryLock(long waitTime, long leaseTime, TimeUnit unit)已知无论是否设置锁超时时间最终都会调用tryAcquireAsync方法进行加锁。只是不指定超时时间的话传入的leaseTime值是-1也即不指定超时时间但是Redisson默认还是会设置30s过期时间当指定超时时间那么leaseTime就是自己指定的时间最终也是通过一个LUA脚本进行加锁逻辑 是否指定超时时间的区别
不指定超时时间会开启watchdog后台线程不断续约加锁时间指定超时时间就不会开启watchdog定时任务这样就不会续约加锁key到了过期时间就会自动删除即达到释放锁的目的 总结 指定超时时间达到超时释放锁的功能主要通过Redis自动过期来实现因为指定了超时时间加锁成功后就不会开启watchdog机制来延长加锁时间 实际项目中若能比较准确预估代码执行时间那么可以指定锁超时释放时间来防止业务执行错误导致无法释放锁的问题若不能预估代码执行时间那么可以不指定超时时间在finally代码块中采用unlock手动释放。
1.6.实现不同线程加锁的互斥
前面已经分析过第一次加锁逻辑和可重入锁的逻辑因为LUA脚本加锁的逻辑同时只有一个线程能够执行Redis是单线程的原因所以一旦有线程加锁成功那么另一线程来加锁前面两个if条件都不成立最后通过调用Redis的pttl命令返回锁的剩余过期时间回去。 这样客户端就可根据返回值判断是否加锁成功 因为第一次加锁和可重入锁的返回值都是nil而加锁失败就返回了锁的剩余过期时间
// 第一次加锁
if (redis.call(exists, KEYS[1]) 0) then redis.call(hincrby, KEYS[1], ARGV[2], 1);redis.call(pexpire, KEYS[1], ARGV[1]);return nil;
end;
// 可重入锁
if (redis.call(hexists, KEYS[1], ARGV[2]) 1) then redis.call(hincrby, KEYS[1], ARGV[2], 1);redis.call(pexpire, KEYS[1], ARGV[1]);return nil;
end;
// 无法加锁成功
return redis.call(pttl, KEYS[1]);所以加锁的LUA脚本通过条件判断就可实现加锁的互斥操作保证其他线程无法加锁成功。 总结加锁的LUA脚本实现了第一次加锁、可重入锁、加锁互斥的逻辑
1.7.加锁失败如何实现阻塞等待加锁
上面分析已知加锁失败后会走如下代码 这里可看出最终会执行死循环自旋的方式不停的通过tryAcquire()方法来实现加锁直到加锁成功后才会跳出死循环如果一直没有加锁成功那么就会一直旋转下去所谓阻塞就是自旋加锁的方式 但是这种阻塞可能产生问题如果其他线程释放锁失败那么这个阻塞加锁的线程会一直阻塞加锁肯定会出问题的所以需要设置超过一定时间还未加锁成功的话就放弃加锁。
超时放弃加锁的方法
boolean tryLock(long waitTime, long leaseTime, TimeUnit unit)
boolean tryLock(long time, TimeUnit unit)通过waitTime参数或time参数来指定超时时间这两个方法的主要区别在于是否支持指定锁超时时间
do {long currentTime System.currentTimeMillis();ttl this.tryAcquire(waitTime, leaseTime, unit, threadId);if (ttl null) {var16 true;return var16;}// 超过尝试时间加锁失败返回falsetime - System.currentTimeMillis() - currentTime;if (time 0L) {this.acquireFailed(waitTime, unit, threadId);var16 false;return var16;}currentTime System.currentTimeMillis();if (ttl 0L ttl time) {((RedissonLockEntry)subscribeFuture.getNow()).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);} else {((RedissonLockEntry)subscribeFuture.getNow()).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);}time - System.currentTimeMillis() - currentTime;
} while(time 0L);从源码可看出实现一定时间内还未获取到锁就放弃加锁的逻辑其实相比于一直自旋获取锁主要是加了超时判断如果超时了就退出循环放弃加锁
1.8.实现公平锁
什么是公平锁 公平锁就是指线程成功加锁的顺序跟线程请求加锁的顺序一样实现了先来先成功加锁的特点不插队才叫公平 前面所说的RedissonLock的实现都是非公平锁但里面有些机制如watchdog机制是公平的 公平锁和非公平锁比较 公平锁
优点按顺序平均分配锁资源不会出现线程饿死即某一线程长时间未获得锁的情况缺点按顺序唤醒线程的开销大执行性能不高
非公平锁
优点执行效率高谁先获得锁谁就先执行无需按顺序唤醒缺点资源分配随机性强可能出现线程饿死的情况
如何使用公平锁 通过RedissonClient的getFairLock可获取到公平锁。Redisson对于公平锁的实现是RedissonFairLock类通过RedissonFairLock来加锁可实现公平锁的特性
public void redissionLock() {// 获取锁对象RLock myLock redissonClient.getFairLock(myLock);try {// 加锁myLock.lock(30,TimeUnit.SECONDS);} catch (Exception e) {} finally {// 释放锁myLock.unlock();}
}RedissonFairLock继承了RedissonLock主要重写了tryLockInnerAsync方法也就是加锁逻辑的方法。 概述这段LUA的作用
当线程来加锁的时候如果加锁失败将线程放置到一个set中这样就按照加锁顺序给线程排队set集合的头部的线程就代表接下来要加锁成功的线程当有线程释放锁之后其他加锁失败的线程就会继续来实现加锁加锁前判断一下set集合的头部线程跟当前要加锁的线程是否同一个如果是同一个那么加锁成功如果不是的话就加锁失败这样就实现了加锁的顺序性
1.9.实现读写锁
在实际开发中会有很多“读多写少”的场景对于这种场景使用独占锁加锁在高并发情况下会导致大量线程加锁失败阻塞对系统吞吐量有一定影响为了适配这种“读多写少”的场景Redisson也实现了读写锁的功能 读写锁的特点
读与读是共享的不互斥读与写互斥写写互斥
Redisson中使用读写锁
public void redissionLock() {// 获取读写锁RReadWriteLock readWriteLock redissonClient.getReadWriteLock(readWriteLock);RLock readLock readWriteLock.readLock();try {readLock.lock();// 业务操作...} catch (Exception e) {} finally {// 释放锁readLock.unlock();}RLock writeLock readWriteLock.writeLock();try {writeLock.lock();// 业务操作...} catch (Exception e) {} finally {// 释放锁writeLock.unlock();}
}Redisson通过RedissonReadWriteLock类实现读写锁功能。通过这个类可以获取到读锁和写锁所以真正的加锁逻辑是由读锁和写锁实现的 Redisson是如何具体实现读写锁的 前面已知加锁成功后会在Redis中维护一个hash的数据结构存储加锁线程和加锁次数。在读写锁的实现中会往hash数据结构中多维护一个mode字段来表示当前加锁的模式。 所以能够实现读写锁最主要是因为维护了一个加锁模式的字段mode这样当线程来加锁的时候就能根据当前加锁模式结合读写的特性来判断要不要让当前线程加锁成功
若没有加锁那么不论读锁还是写锁都能加锁成功成功后根据加锁类型维护mode字段若模式是读锁加锁线程也是加读锁的就让它加锁成功若模式是读锁加锁线程是加写锁的就让它加锁失败若模式是写锁不论线程是加写锁还是读锁都让它加锁失败加锁线程自己除外可重入特性
1.10.实现批量加锁联锁
批量加锁的意思是同时加几个锁只有这些锁都加成功了才算真正的加锁成功 比如一个下单业务中同时需要锁定订单、库存、商品基于这种需要锁多种资源的场景中Redisson提供了批量加锁的实现对应的实现类是RedissonMultiLock 使用联锁
public void redissionLock() {// 获取读写锁RLock myLock1 redissonClient.getLock(myLock1);RLock myLock2 redissonClient.getLock(myLock2);RLock myLock3 redissonClient.getLock(myLock3);RLock multiLock redissonClient.getMultiLock(myLock1,myLock2,myLock3);try {multiLock.lock();// 业务操作...} catch (Exception e) {} finally {// 释放锁multiLock.unlock();}
}Redisson对于批量加锁的实现也很简单源码如下 就是根据顺序依次调用tryLock传入myLock1,myLock2,myLock3加锁方法如果都成功加锁了那么multiLock就算加锁成功
1.11.RedLock算法
对于单Redis实例来说如果Redis宕机了那么整个系统就无法运行所以为了保证Redis的高可用一般都会采用主从或哨兵模式但是一旦使用了主从或哨兵模式此时Redis的分布式锁就可能出现问题 例如使用哨兵模式 基于这种模式Redis客户端会在master节点上加锁然后异步复制到slave节点上。但是一旦master节点宕机那么哨兵感知到就会从slave节点选择一个节点作为主节点。 假设客户端对原主节点加锁加锁成功后还未来得及同步到从节点主节点宕机了从节点变为了主节点此时从节点是没有加锁信息的如果其他客户端来加锁是能够加锁成功的 针对此问题Redis官方提供一种RedLock算法Redisson刚好实现了这种算法
RedLock算法 在Redis分布式环境中假设有N个master节点这些节点相互独立不存在主从复制或其他集群协调机制。 前面描述过在Redis单例下怎么安全获取和释放锁需要确保将在N个实例上使用此方法获取和释放锁。为了获取锁客户端应该执行以下操作
获取当前Unix时间以ms为单位依次尝试从N个实例使用相同key和随机值获取锁当向Redis设置锁时客户端应该设置一个网络连接和响应超时时间这个超时时间小于锁的失效时间。这样可避免服务器端Redis已经挂掉情况下客户端还在等待响应结果如果服务器端没有在规定时间内响应客户端应尽快尝试其他Redis实例客户端使用当前时间减去开始获取锁的时间步骤1记录的时间就得到获取锁使用的时间并且仅当从大多数3个节点共5个的Redis节点中获取到锁并且使用时间小于锁失效时间时锁才算获取成功如果获取到锁key的真正有效时间等于有效时间减去获取锁所使用时间步骤3计算所得结果若因为某些原因获取锁失败没有在至少N/21个Redis实例上获取到锁或取锁时间已经超过有效时间客户端应该在所有Redis实例上进行解锁即使某些Redis实例根本没有加锁成功
Redisson对RedLock算法的实现 RLock lock1 redissonInstance1.getLock(lock1);
RLock lock2 redissonInstance2.getLock(lock2);
RLock lock3 redissonInstance3.getLock(lock3);RedissonRedLock lock new RedissonRedLock(lock1, lock2, lock3);
// 同时加锁lock1 lock2 lock3
// 红锁在大部分节点上加锁成功就算成功。
lock.lock();
...
lock.unlock();RedissonRedLock加锁过程如下
获取所有Redisson Node节点信息循环向所有Node节点加锁假设节点数为N一个Redisson Node代表一个主从节点若在N个节点中有 N/2 1个节点加锁成功那么整个RedissonRedLock加锁成功若在N个节点中小于N/2 1个节点加锁成功那么整个RedissonRedLock加锁失败若中途发现各节点加锁总耗时大于等于设置的最大等待时间则直接返回失败
RedissonRedLock底层其实也是基于RedissonMultiLock实现的RedissonMultiLock要求所有的加锁成功才算成功RedissonRedLock要求只要有N/21个成功就算成功