云龙微网站开发,二次元网站开发的意义,wordpress网址导航开源,做h5的网站页面1.前言
随着微服务的快速推进#xff0c;分布式架构也得到蓬勃的发展#xff0c;那么如何保证多进程之间的并发则成为需要考虑的问题。因为服务是分布式部署模式#xff0c;本地锁Reentrantlock和Synchnorized就无法使用了#xff0c;当然很多同学脱口而出的基于Redis的se…1.前言
随着微服务的快速推进分布式架构也得到蓬勃的发展那么如何保证多进程之间的并发则成为需要考虑的问题。因为服务是分布式部署模式本地锁Reentrantlock和Synchnorized就无法使用了当然很多同学脱口而出的基于Redis的setnx锁由于上手简单所以也被广泛使用但是Redis的setnx锁存在无法保证原子性所以Redisson目前备受推崇今天我们一起来了解一下并且用十分优雅的方式实现它。
当然实现分布式锁的方式有很多像基于数据库表主键、基于表字段版本号、基于Redis的SETNX、REDLOCK、REDISSON以及Zookeeper等方式来实现本文对以上锁的实现以及优缺点不在讨论有兴趣的可以移步至此《分布式锁》
本文重点讲解一下Redisson分布式锁的实现
2.Redisson是如何基于Redis实现分布式锁的原理
先看一下最简单的实现方式 Testvoid test1() {// 1、创建配置Config config new Config();config.useSingleServer().setAddress(redis://127.0.0.1:6379);// 2、根据 Config 创建出 RedissonClient 实例RedissonClient redissonClient Redisson.create(config);//获取锁RLock lock redissonClient.getLock(xxx-lock);try {// 2.加锁lock.lock();} finally {// 3.解锁lock.unlock();System.out.println(Finally释放锁成功);}}通过上面这段代码我们看一下Redisson是如何基于Redis实现分布式锁的 下面的原理分析来自《分布式锁-8.基于 REDISSON 做分布式锁》
2.1 加锁原理
通过上面的这段简单的代码可以看出其加锁的方法主要依赖于其lock()方法对于应的源码如下 可以看到调用getLock()方法后实际返回一个RedissonLock对象在RedissonLock对象的lock()方法主要调用tryAcquire()方法 由于leaseTime -1于是走tryLockInnerAsync()方法这个方法才是关键 首先看一下evalWriteAsync方法的定义
T, R RFuture evalWriteAsync(String key, Codec codec, RedisCommand evalCommandType, String script, List keys, Object … params);最后两个参数分别是keys和params
evalWriteAsync具体如何调用的呢
commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,if (redis.call(exists, KEYS[1]) 0) then redis.call(hset, KEYS[1], ARGV[2], 1); redis.call(pexpire, KEYS[1], ARGV[1]); return nil; end; if (redis.call(hexists, KEYS[1], ARGV[2]) 1) then redis.call(hincrby, KEYS[1], ARGV[2], 1); redis.call(pexpire, KEYS[1], ARGV[1]); return nil; end; return redis.call(pttl, KEYS[1]);,Collections.ObjectsingletonList(getName()), internalLockLeaseTime, getLockName(threadId));
结合上面的参数声明我们可以知道这里
KEYS[1]就是getName()ARGV[2]是getLockName(threadId)
假设前面获取锁时传的name是“abc”假设调用的线程ID是Thread-1假设成员变量UUID类型的id是6f0829ed-bfd3-4e6f-bba3-6f3d66cd176c
那么KEYS[1]abcARGV[2]6f0829ed-bfd3-4e6f-bba3-6f3d66cd176c:Thread-1
因此这段代码想表达什么呢
1、判断有没有一个叫“abc”的key
2、如果没有则在其下设置一个字段为“6f0829ed-bfd3-4e6f-bba3-6f3d66cd176c:Thread-1”值为“1”的键值对 并设置它的过期时间
3、如果存在则进一步判断“6f0829ed-bfd3-4e6f-bba3-6f3d66cd176c:Thread-1”是否存在若存在则其值加1并重新设置过期时间
4、返回“abc”的生存时间毫秒
这里用的数据结构是hashhash的结构是 key 字段1 值1 字段2 值2 。。。
用在锁这个场景下key就表示锁的名称也可以理解为临界资源字段就表示当前获得锁的线程
所有竞争这把锁的线程都要判断在这个key下有没有自己线程的字段如果没有则不能获得锁如果有则相当于重入字段值加1次数 算法原理如下图所示
2.1 解锁原理
protected RFutureBoolean unlockInnerAsync(long threadId) {return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,if (redis.call(exists, KEYS[1]) 0) then redis.call(publish, KEYS[2], ARGV[1]); return 1; end; if (redis.call(hexists, KEYS[1], ARGV[3]) 0) then return nil; end; local counter redis.call(hincrby, KEYS[1], ARGV[3], -1); if (counter 0) then redis.call(pexpire, KEYS[1], ARGV[2]); return 0; else redis.call(del, KEYS[1]); redis.call(publish, KEYS[2], ARGV[1]); return 1; end; return nil;,Arrays.ObjectasList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));}我们还是假设nameabc假设线程ID是Thread-1
同理我们可以知道
KEYS[1]是getName()即KEYS[1]abc
KEYS[2]是getChannelName()即KEYS[2]redisson_lock__channel:{abc}
ARGV[1]是LockPubSub.unlockMessage即ARGV[1]0
ARGV[2]是生存时间
ARGV[3]是getLockName(threadId)即ARGV[3]6f0829ed-bfd3-4e6f-bba3-6f3d66cd176c:Thread-1
因此上面脚本的意思是
1、判断是否存在一个叫“abc”的key
2、如果不存在向Channel中广播一条消息广播的内容是0并返回1
3、如果存在进一步判断字段6f0829ed-bfd3-4e6f-bba3-6f3d66cd176c:Thread-1是否存在
4、若字段不存在返回空若字段存在则字段值减1
5、若减完以后字段值仍大于0则返回0
6、减完后若字段值小于或等于0则广播一条消息广播内容是0并返回1
可以猜测广播0表示资源可用即通知那些等待获取锁的线程现在可以获得锁了
2.3 等待
上面的加锁解锁均是 可以获取到锁资源的情况那么当无法立即获取锁资源时就需要等待
Override
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {long threadId Thread.currentThread().getId();Long ttl tryAcquire(leaseTime, unit, threadId);// lock acquiredif (ttl null) {return;}// 订阅RFutureRedissonLockEntry future subscribe(threadId);commandExecutor.syncSubscription(future);try {while (true) {ttl tryAcquire(leaseTime, unit, threadId);// lock acquiredif (ttl null) {break;}// waiting for messageif (ttl 0) {getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);} else {getEntry(threadId).getLatch().acquire();}}} finally {unsubscribe(future, threadId);}
// get(lockAsync(leaseTime, unit));
}protected static final LockPubSub PUBSUB new LockPubSub();protected RFutureRedissonLockEntry subscribe(long threadId) {return PUBSUB.subscribe(getEntryName(), getChannelName(), commandExecutor.getConnectionManager().getSubscribeService());
}protected void unsubscribe(RFutureRedissonLockEntry future, long threadId) {PUBSUB.unsubscribe(future.getNow(), getEntryName(), getChannelName(), commandExecutor.getConnectionManager().getSubscribeService());
}这里会订阅Channel当资源可用时可以及时知道并抢占防止无效的轮询而浪费资源
当资源可用用的时候循环去尝试获取锁由于多个线程同时去竞争资源所以这里用了信号量对于同一个资源只允许一个线程获得锁其它的线程阻塞
3.Redisson分布式锁常规使用
本章讲主要讲述加锁的常规使用Redisson分布式锁是基于Redis的Rlock锁实现了JavaJUC包下的Lock接口。
3.1 添加maven依赖 dependencygroupIdorg.redisson/groupIdartifactIdredisson/artifactIdversion3.8.2/version/dependency3.2 REDISSON的牛刀小试
还是原理中的那段代码稍作修改 GetMapping(test1)public String test1() {// 1、创建配置Config config new Config();config.useSingleServer().setAddress(redis://127.0.0.1:6379);// 2、根据 Config 创建出 RedissonClient 实例RedissonClient redissonClient Redisson.create(config);//获取锁RLock lock redissonClient.getLock(xxx-lock);try {// 2.加锁lock.lock();System.out.println(new Date()获取锁成功);//业务代码Thread.sleep(1000 * 3);} catch (InterruptedException e) {throw new RuntimeException(e);} finally {// 3.解锁lock.unlock();System.out.println(Finally释放锁成功);}System.out.println(finish);return finish;}上面这段代码做的事情很简单 getLock获取锁lock.lock进行加锁会出现的问题就是lock拿不到锁一直等待会进入阻塞状态显然这样是不好的。 1.TryLock
返回boolean类型和Reentrantlock的tryLock是一个意思尝试获取锁获取到就返回true获取失败就返回false不会使获不到锁的线程一直处于等待状态返回false可以继续执行下面的业务逻辑当然Ression锁内部也涉及到watchDog看门狗机制主要作用就是给快过期的锁进行续期主要用途就是使拿到锁的有限时间让业务执行完再进行锁释放。
为了避免频繁的去书写创建redis连接的代码所以我们将获取锁和释放锁的过程简单封装一下
import org.redisson.Redisson;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;Component
public class LockUtil {static MapString, RLock lockMap new ConcurrentHashMap();/*** 获取redisson客户端** return*/public static final RedissonClient getClient() {// 1、创建配置Config config new Config();config.useSingleServer().setAddress(redis://127.0.0.1:6379);// 2、根据 Config 创建出 RedissonClient 实例RedissonClient redissonClient Redisson.create(config);return redissonClient;}/*** 获取锁** param lockName* return*/public static boolean getLock(String lockName) {//获取锁RLock lock getClient().getLock(lockName);try {if (lock.tryLock(2, 10, TimeUnit.SECONDS)) {lockMap.put(lockName, lock);return true;}return false;} catch (InterruptedException e) {throw new RuntimeException(e);}}public static boolean getLock(String lockName, long waitTime, long leaseTime, TimeUnit timeUnit) {//获取锁RLock lock getClient().getLock(lockName);try {if (lock.tryLock(waitTime, leaseTime, timeUnit)) {lockMap.put(lockName, lock);return true;}return false;} catch (InterruptedException e) {throw new RuntimeException(e);}}/*** 解锁** param lockName*/public static void unLock(String lockName) {RLock lock lockMap.get(lockName);if (Objects.nonNull(lock) lock.isHeldByCurrentThread()) {lock.unlock();lockMap.remove(lockName);}}
}
使用方式如下 GetMapping(test2)public void test2() {try {if (LockUtil.getLock(ninesun)) {//执行业务代码System.out.println(业务代码);}} catch (Exception e) {System.out.println(获取锁失败);e.printStackTrace();} finally {//释放锁LockUtil.unLock(ninesun);}}为了使我们实现的方式更加优雅下面我们通过注解来实现
2.自定义注解实现锁机制
通常我们都会将redisson实例注入到方法类里面然后调用加锁方法进行加锁如果其他业务方法也需要加锁执行将会产生很多重复代码由此采用AOP切面的方式只需要通过注解的方式就能将方法进行加锁处理。
2.1 添加切面依赖 dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-aop/artifactId/dependency2.2 自定义注解
import java.lang.annotation.*;
import java.util.concurrent.TimeUnit;/*** author ninesun* ClassName RedissonDistributedLock* description: TODO* date 2023年11月27日* version: 1.0*/
Documented
Inherited
Retention(RetentionPolicy.RUNTIME)
Target({ElementType.METHOD})
public interface RedissonDistributedLock {String key() default ;int leaseTime() default 10;boolean autoRelease() default true;String errorDesc() default 系统正常处理请稍后提交;int waitTime() default 1;TimeUnit timeUnit() default TimeUnit.SECONDS;
}2.3 切面类实现
import com.example.demo.Utils.LockUtil;
import com.example.demo.annoation.RedissonDistributedLock;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.reflect.MethodSignature;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.DefaultParameterNameDiscoverer;
import org.springframework.expression.EvaluationContext;
import org.springframework.expression.Expression;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.expression.spel.support.StandardEvaluationContext;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;import java.lang.reflect.Method;
import java.util.concurrent.TimeUnit;/*** author ninesun* ClassName RedisonDistributedLockHandler* description: TODO* date 2023年11月27日* version: 1.0*/
Aspect
Component
public class RedisonDistributedLockHandler {private static final Logger log LoggerFactory.getLogger(RedisonDistributedLockHandler.class);public RedisonDistributedLockHandler() {}Around(annotation(distributedLock))public Object around(ProceedingJoinPoint joinPoint, RedissonDistributedLock distributedLock) throws Throwable {String lockName this.getRedisKey(joinPoint, distributedLock);int leaseTime distributedLock.leaseTime();String errorDesc distributedLock.errorDesc();int waitTime distributedLock.waitTime();TimeUnit timeUnit distributedLock.timeUnit();Object var8;try {boolean lock LockUtil.getLock(lockName, leaseTime, waitTime, timeUnit);if (!lock) {throw new RuntimeException(errorDesc);}var8 joinPoint.proceed();} catch (Throwable var12) {log.error(执行业务方法异常, var12);throw var12;} finally {LockUtil.unLock(lockName);}return var8;}/*** 获取加锁的key** param joinPoint* param distributedLock* return*/private String getRedisKey(ProceedingJoinPoint joinPoint, RedissonDistributedLock distributedLock) {String key distributedLock.key();Object[] parameterValues joinPoint.getArgs();MethodSignature signature (MethodSignature) joinPoint.getSignature();Method method signature.getMethod();DefaultParameterNameDiscoverer nameDiscoverer new DefaultParameterNameDiscoverer();String[] parameterNames nameDiscoverer.getParameterNames(method);if (StringUtils.isEmpty(key)) {if (parameterNames ! null parameterNames.length 0) {StringBuffer sb new StringBuffer();int i 0;for (int len parameterNames.length; i len; i) {sb.append(parameterNames[i]).append( ).append(parameterValues[i]);}key sb.toString();} else {key redissionLock;}return key;} else {SpelExpressionParser parser new SpelExpressionParser();Expression expression parser.parseExpression(key);if (parameterNames ! null parameterNames.length ! 0) {EvaluationContext evaluationContext new StandardEvaluationContext();for (int i 0; i parameterNames.length; i) {evaluationContext.setVariable(parameterNames[i], parameterValues[i]);}try {Object expressionValue expression.getValue(evaluationContext);return expressionValue ! null !.equals(expressionValue.toString()) ? expressionValue.toString() : key;} catch (Exception var13) {return key;}} else {return key;}}}
}2.4具体使用 GetMapping(test3)RedissonDistributedLock(key updateUserInfo:#id, errorDesc 请勿重复提交)public void test3(RequestParam(value id) String id) {//业务代码}方法头加自定义注解
key参数代表需要加锁的keyerrorDesc获取锁失败提示报错信息 上面的演示示例是单机模式我们线上使用的可能是redis集群以及哨兵模式这个只需控制我们redis的连接方式即可。
3.3 分布式集群
1.集群模式
这个需要我们redis中开启cluster nodes Config config new Config();config.useClusterServers().setScanInterval(2000) // cluster state scan interval in milliseconds.addNodeAddress(redis://127.0.0.1:7000, redis://127.0.0.1:7001).addNodeAddress(redis://127.0.0.1:7002);RedissonClient redisson Redisson.create(config);2.哨兵模式
在使用哨兵模式时需要创建SentinelServersConfig对象并将其设置为Config对象的配置信息。代码创建SentinelServersConfig对象的方式如下
SentinelServersConfig sentinelConfig new SentinelServersConfig();
sentinelConfig.setMasterName(mymaster);
sentinelConfig.addSentinelAddress(redis://127.0.0.1:26379);
sentinelConfig.addSentinelAddress(redis://127.0.0.1:26380);
sentinelConfig.addSentinelAddress(redis://127.0.0.1:26381);
config.useSentinelServers().setMasterName(mymaster).addSentinelAddress(redis://127.0.0.1:26379).addSentinelAddress(redis://127.0.0.1:26380).addSentinelAddress(redis://127.0.0.1:26381);根据Redisson的官方文档可以根据自己的需要来调整Redisson的各种参数以达到最优的性能表现。以下是一些常用的配置参数及其说明。
connectTimeout连接超时时间单位毫秒timeout读写超时时间单位毫秒retryAttempts连接失败重试次数-1表示不限制重试次数retryInterval重试时间间隔单位毫秒threads响应请求线程数最大为16
3.Redisson配置了集群不生效
3.4 Redisson配置序列化
为了提高Redisson的性能表现Redisson在数据存储时使用了高效的序列化机制。在Redisson中默认使用的是JDK序列化机制但是考虑到JDK的序列化机制在序列化性能、序列化结果可读性、可靠性等方面存在一些问题因此Redisson提供了多种序列化方式供用户选择。
常用的序列化方式有三种JDK序列化、FastJSON序列化和Kryo序列化。其中Kryo序列化是性能最高的一种序列化方式但是需要注意的是Kryo序列化与JDK序列化不兼容因此在使用Kryo序列化时需要注意操作系统的类型及JDK的版本。
如果要对Redisson的序列化机制进行定制可以通过以下方式来实现。
// 基于Jackson序列化
SerializationConfig serialConfig config.getCodec().getSerializationConfig();
serialConfig.setJacksonObjectMapper(new ObjectMapper());// 基于FastJSON序列化
SerializationConfig serialConfig config.getCodec().getSerializationConfig();
serialConfig.setSerializer(com.alibaba.fastjson.JSON).setDecoder(com.alibaba.fastjson.JSON);// 基于Kryo序列化
SerializationConfig serialConfig config.getCodec().getSerializationConfig();
Kryo kryo new Kryo();
kryo.register(User.class);
kryo.register(Order.class);
kryo.register(Item.class);
kryo.register(ArrayList.class);
kryo.register(LinkedList.class);
kryo.register(RedisCommand.class);
UnicornKryoPool pool new UnicornKryoPoolImpl(kryo);
serialConfig.setKryoPool(pool);具体使用方式如下 //使用json序列化方式Codec codec new JsonJacksonCodec();config.setCodec(codec);至此单机模式下的基于Redission和注解实现的幂等控制就实现了后面会将redis集群以及哨兵模式下的实现方式进行实现。 git地址https://gitee.com/ninesuntec/distributed-locks.git