建设营销型网站不足之处,途牛网站建设的基本特点,安阳网站怎么优化,制作公众号网站开发《引言》 #xff08;中#xff09;篇将接着记录 Redis 实战篇 ——《黑马点评》#xff08;上#xff09;篇之后的学习内容与笔记#xff0c;希望大家能够点赞、收藏支持一下 ᕦ(#xff65;ㅂ#xff65;)ᕤ#xff0c;谢谢大家。 传送门#xff08;上#xff09;中篇将接着记录 Redis 实战篇 ——《黑马点评》上篇之后的学习内容与笔记希望大家能够点赞、收藏支持一下 ᕦ(ㅂ)ᕤ谢谢大家。 传送门上Redis 实战篇 ——《黑马点评》上 传送门中当-前-页 传送门下NULL 三、优惠卷秒杀
1. 全局唯一 ID 与秒杀 在如秒杀等业务场景中并发问题是其中的主要问题因为此时会产生许多并发的抢购操作请求而如果在表结构中使用自增主键 id 来保存购物信息会导致 id 的规律性太明显且在大用户量的基础下会产生数以亿计的数据量而单表是无法保存如此庞大的数据的。且订单 id 应该具有唯一性但将表进行拆分出现了分布式的存储时会违背其唯一性。 所以在这种情况下我们就要用到全局 ID 生成器其能在分布式系统下用来生成全局唯一 ID 的工具虽然我们是单体架构但因为数据量大所以也可以使用全局 ID 生成器。其要满足下列特性
唯一性ID 必须唯一如订单业务中不能出现重复 ID。高可用确保任何时候都能生成可用的 ID。高性能能够正确生成 ID且生成效率足够快。递增性确保整体是逐渐变大的尽量符合数据库 ID 自增的特性便于数据库创建索引提高查询速度。安全性ID 的规律需要确保不被猜测出来。、 而 Redis 几乎满足了所有的特性但其安全性无法通过其自增数值来保证但可以不直接使用 Redis 自增的数值而是通过拼接字符串的形式充当 ID。 ID使用 Java 中的 Long 类型占 8 字节64 个 bit 位。
符号位1 bit第一个 bit 位表示永远是正数。时间戳31 bit定义一个初始时间单位秒记录下单时间与初始时间相差多少秒用于增加 ID 的复杂度。序列号32 bit防止时间戳重复的情况增加 ID 的复杂度。 1.1. Redis 实现全局唯一 ID 想要使用 Redis 实现全局唯一 ID 的功能我们需要分成三步实现
生成时间戳生成序列号进行拼接 // 2024-1-1 00:00:00
private static final long BEGIN_TIMESTAMP 1704067200L;//1.生成时间戳
LocalDateTime now LocalDateTime.now();
long nowSecond now.toEpochSecond(ZoneOffset.UTC);
long timeStamp nowSecond - BEGIN_TIMESTAMP; ● 第一步使用 LocalDateTime 获取到当前时间后使用 toEpochSecond 方法转化为自 1970-1-1 00:00:00 到当前时间为止的秒数最后用当前的秒数减去我们规定好的起始时间就得到了时间戳。 //2.生成序列号
String date now.format(DateTimeFormatter.ofPattern(yyyy:MM:dd));
long increment stringRedisTemplate.opsForValue().increment(icr: keyPrefix : date); ● 第二步使用 Redis 里 String 类型中的自增长方法increment来确保 ID 的递增性且因为键 icr:业务名 以业务名为 key但一个 key 的 value 不断自增存在达到上限的风险所以我们需要在其后跟上时间来完善其还能起到以时间为分隔统计的作用。 //需要左移的位数
private static final int COUNT_BITS 32;//3,拼接
return timeStamp COUNT_BITS | increment; ● 第三步利用位运算进行拼接因为序列号共有 32 个bit位所以要左移 32 位且左移后位上全为 0做或运算相当于加上 increment。 完整代码如下 Resource
private StringRedisTemplate stringRedisTemplate;
private static final long BEGIN_TIMESTAMP 1704067200L;
private static final int COUNT_BITS 32;
public long nextId(String keyPrefix) {//1.生成时间戳LocalDateTime now LocalDateTime.now();long nowSecond now.toEpochSecond(ZoneOffset.UTC);long timeStamp nowSecond - BEGIN_TIMESTAMP;//2.生成序列号String date now.format(DateTimeFormatter.ofPattern(yyyy:MM:dd));long increment stringRedisTemplate.opsForValue().increment(icr: keyPrefix : date);//3,拼接return timeStamp COUNT_BITS | increment;
} 1.2. 优惠卷的分类 在项目中优惠卷分为普通优惠券与限时秒杀优惠券一个是永久有效的、一个是限时限量抢购的。而在数据库中与之对应的存在两个表一个是普通优惠券的信息表、一个是与此表的 id 一一对应关联的特殊优惠券表其在普通优惠券的信息基础上又多了数量、开始时间、结束时间等等信息。 我们可以通过接口 http://localhost:8081/voucher/seckill 来新增优惠券其中金额的单位为分添加一个限时优惠券。注意设定的时间过期后将无法显示且与之绑定商铺 Id 为 1 最终效果如下 可以看到成功的添加了限时代金券。 1.3. 实现下单功能 同样的想要实现下单的功能也需要我们分步进行。但这只是简单的实现下单功能其中还存在着诸多的多线程并发问题。
查询优惠券信息判断是否过期或未开始判断库存是否充足充足则扣减库存创建订单后返回订单ID //1.查询优惠券
SeckillVoucher voucher seckillVoucherServices.getById(voucherId)
//2.判断是否结束
if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {//尚未开始return Result.fail(秒杀尚未开始);
}
//3.判断是否开始
if (voucher.getEndTime().isBefore(LocalDateTime.now())){return Result.fail(秒杀已经结束);
} ● 第一步根据传来的优惠券 Id 查询相关信息后分别判断其是否尚未开始或者已经过期。如果不符合则返回相应的错误信息。 //4.判断开始后库存是否充足
if (voucher.getStock() 1) {//4.1 库存不足return Result.fail(库存不足);
}
//4.2 库存充足扣减库存
boolean success seckillVoucherServices.update().setSql(stock stock - 1).eq(voucher_id, voucherId).update();
if (!success){return Result.fail(库存不足);
} ● 第二步判断库存是否充足如果库存数小于 1则表示库存不足需返回错误信息反之则扣减库存。其中 setSql 用于自定义一个 SQL 语句在 voucher_id 相等时将库存 stock 减 1。并对返回的结果进行判断如果失败则返回错误信息。 //5.创建订单
VoucherOrder voucherOrder new VoucherOrder();
//5.1 订单Id
long orderId redisIdWorker.nextId(order);
voucherOrder.setId(orderId);
//5.2 用户Id
Long userId UserHolder.getUser().getId();
voucherOrder.setUserId(userId);
//5.3 代金券Id
voucherOrder.setVoucherId(voucherId);
save(voucherOrder);
//返回订单Id
return Result.ok(orderId); ● 第三步扣减库存后创建订单信息并写入数据库中其中的订单 Id 使用之前写好的使用 Redis 实现的全局唯一 ID 生成器来生成用户 Id 则从 UserHolder 工具类中获取其用于在用户登录成功后保存用户信息代金券 Id 就是将传入的参数直接使用即可最后保存订单信息到数据库后返回生成的订单 Id 号。 至此我们成功的实现了简单的下单功能。但其内部存在着许多的多线程并发问题尤其是当大量线程抢购时会出现超卖问题就是订单数超出了库存数。如下所示当线程 2 在线程 1 扣减前进行查询时就会导致数据不同步造成超卖问题。 想要解决这个问题首先想到的方案就是加锁。而锁分为悲观锁与乐观锁两种方案。其中悲观锁认为线程安全问题一定会发生所以会将所有线程串行执行这样会导致性能降低在高并发的情况下不适用而乐观锁则认为线程安全问题不一定会发生它只会在更新数据时去判断数据有没有被修改来确定是否存在线程安全问题所以性能相较于悲观锁会好一点。 所以在比对二者之间的优异后选择使用乐观锁来解决而乐观锁常见的方式有两种
版本号法使用版本号来进行校验每次修改之前对版本号进行比对并在修改后将版本号加一。CAS 法运用数据本身有无变化来判断线程是否安全就是使用数据本身来替代版本号省去了版本号。 最终选择使用 CAS 法来解决多线程并发的问题。
1.将原先扣减库存的语句新增对 stock 库存的判断判断其是否在扣减前被修改过。
//4.2 库存充足扣减库存
boolean success seckillVoucherServices.update().setSql(stock stock - 1).eq(voucher_id, voucherId).eq(stock, voucher.getStock()).update(); 但在经过 jmter 测试后发现产生了大量的异常情况。其原因是由于线程之间存在误差如果线程 1 在线程 2 扣减前提前扣减了库存则会导致线程 2 无法扣减库存。究其原因还是对数据的校验太过于严谨等值对于该业务我们可以只用判断其是否还大于 0 即可。
//4.2 库存充足扣减库存
boolean success seckillVoucherServices.update().setSql(stock stock - 1).eq(voucher_id, voucherId).gt(stock, 0).update(); 最后完美达到预期解决了高并发下的超卖问题。 但对还是有问题在业务要求中限量代金券应当只允许一人一·单一个用户不能购买多次因为限量代金券的目的是为了吸引新客户购买所以需要对每个用户的购买数进行校验如果订单号中存在对应的用户 Id则返回错误信息拒绝收售卖。
//优化一人一单
Long userId UserHolder.getUser().getId(); //将后面创建订单处的获取 userId 移到此处
Integer count query().eq(user_id, userId).eq(voucher_id, voucherId).count();
if (count 0) {//用户已购买过return Result.fail(用户已购买过);
} 在扣减库存操作前对订单是否存在中用户 Id 的订单数量进行校验如果查询出数量大于 0则代表该用户已经购买过代金券了直接返回错误信息。 但是!ψ(*ー´)ψ在进行校验后依然存在问题仍然是在多线程下产生的并发问题。因为在多线程的操作下仍会存在线程同时去操作。所以这里就需要使用悲观锁插入数据来确保线程的安全性。 ● 第一步将对订单的校验和创建操作封装为一个方法。CtrlAtlM ● 第二步将 seckillVoucher 方法上的 Transactional 更改到生成的方法上。 ● 第三步对用户 Id 加锁与锁定整个方法而言可以减小锁定资源范围。但在以 Id 为锁的情况下需要比较的是 Id 的值而非每次调用时生成的新的 Id 对象且因为 toString() 方法其底层是返回新创建的 String 字符串每次调用返回的都是新的字符串对象。所以就需要 intern 方法字符串方法来将字符串规范化就是返回在字符串常量池中与字符串的值一样的字符串的地址。保证 Id 一样时锁也一样。 ● 第四步对于锁锁定的范围如果只锁定提取的方法中的操作则会导致在锁释放会事务才会提交这样同样会产生并发问题。所以需要在事务提交之后再释放锁在调用方法外去加锁方法结束也就是事务提交之后才会释放锁。
Long userId UserHolder.getUser().getId();
synchronized (userId.toString().intern()) {//提取的方法return createVoucherOrder(voucherId);
} 但注意此时会导致 Spring 事务失效原因是此时调用方法的是 this.createVoucherOrder此处的 this 并不是代理对象但事务被非代理对象调用时会导致 Spring 事务失效因为其依赖于代理对象去管理事务。所以我们需要拿到当前对象的代理对象。 Spring 事务失效的几种情况通常包括 方法没有被Spring管理。 方法不是public的。 目标对象没有被Spring容器管理即没有通过Autowired/Inject等注解注入。 接口的实现类上没有配置事务注解。 事务方法内调用了本类中未标记事务的方法。 异常类型不被当前的事务设置所识别或配置错误例如rollbackFor属性设置不当。 数据库本身不支持事务或者不在同一个事务上下文中。 事务方法被非代理对象调用即没有通过Spring生成的代理对象调用。 异步方法Async内部不支持事务。 异常发生在事务提交阶段导致事务已经被提交无法回滚。 我们使用 AopContext 中的 currentProxy 方法来获取到代理对象后使用该对象调用提取的方法即可保障事务生效。
Long userId UserHolder.getUser().getId();
synchronized (userId.toString().intern()) {//获取与事务有关的代理对象IVoucherOrderService proxy (IVoucherOrderService) AopContext.currentProxy();return proxy.createVoucherOrder(voucherId);
} 之后需要在接口中创建好提取的方法并且添加新的依赖——aspectjweaver以及在启动类上添加 EnableAspectJAutoProxy(exposeProxy true) 去暴露代理对象默认不暴露无法获取代理对象。
!-- aspectJ --
dependencygroupIdorg.aspectjartifactIdaspectjw
/dependency 最后经过 jmter 的测试记得需要及时更新请求头 authorization后虽然我这里未知原因异常百分比是 100%不知道啥原因 ┗( ▔, ▔ )┛但在数据库中确实只扣减了一次库存且只有一个订单。 但~~┓(;´_)┏以上情况仅限于单机模式下才会成功起到效果而在集群模式或分布式系统的情况下因为锁的原理是在 JVM 内部维护了一个锁的监视器对象但由于集群模式下不同的部署都有自己的 JVM且不同 JVM 下锁监视器不同。导致出现并发问题。而下面我们就需要使用分布式锁来解决集群模式下的并发问题。注意开启了 2. 分布式锁 而在集群模式或分布式系统下上面实现的锁就不会起到太大的作用了这就需要用分布式锁了。分布式锁就是满足分布式系统或集群模式下多进程可见并且互斥的锁。其中重点就是多线程可见而常见的实现分布式锁的方法就是使用 MySQL、Redis、Zookeeper 等等来实现
MySQLRedisZookeeper互斥利用自身的互斥锁机制利用 setnx 这样的互斥命令利用节点的唯一性和有序性实现高可用好好好高性能一般好一般安全性断开连接后自动释放锁利用锁的超时时间到期后释放临时节点断开连接后自动释放 而在分析其优点缺点后选择使用 Redis 来实现分布式锁这不本来也是学 Redis 的...。 我们需要使用 Redis 中的 setnx 命令来实现分布式锁详细实现思路如下所示
1. 获取锁使用 setnx 命令保证只有一个线程能操作成功且在成功时返回 true失败是返回 false 保证是非阻塞的。其中使用 NX不存在时创建保证互斥EX设置过期时间保证锁能被及时释放。
2. 释放锁我们可以通过手动释放手动删除也可以等过期时间自动释放。 2.1. 分布式锁初级版本 这里只是简单的使用 Redis 来实现分布式锁的功能我们定义一个接口其中包括获取锁和释放锁两个方法并在实现类中实现相应的方法。
定义的接口如下所示
public interface ILock {/*** 尝试获取锁* param timeoutSec 锁的超时时间过期后自动释放* return true代表获取锁成功false代表获取锁失败*/boolean tryLock(long timeoutSec);/*** 释放锁*/void unlock();
}
尝试获取锁的方法如下所示
private String name;private StringRedisTemplate stringRedisTemplate;public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {this.name name;this.stringRedisTemplate stringRedisTemplate;}private static final String KEY_PREFIX lock:;Overridepublic boolean tryLock(long timeoutSec) {Long threadId Thread.currentThread().getId();Boolean success stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX name, threadId , timeoutSec, TimeUnit.SECONDS);return BooleanUtil.isTrue(success);} 其中因为此类不是由 Spring 管理的所以 StringRedisTemplate 只能使用构造函数为其赋值而不能注入。下面的 KEY_PREFIX 是定义好的键的前缀。在方法内我们先获取到当前线程的 ID 标示使用 setIfAbsent 方法获取锁并将线程 ID 保存到锁中其保证了互斥同时设置了过期时间。且此方法对返回结果进行了判断无需判断是 ok 或是 nil。但由于直接返回包装类存在拆箱问题所以这里使用 isTrue 方法进行判断后返回结果其方法底层实现如下图所示。 释放锁的方法如下所示 Overridepublic void unlock() {stringRedisTemplate.delete(KEY_PREFIX name);} 相对而言释放锁的实现逻辑很简单只需将其删除即可。但后面肯定会出问题要改造的o(╥﹏╥)o
紧接着我们就需要对原先的锁逻辑进行改造了。
Long userId UserHolder.getUser().getId();
//创建锁对象
SimpleRedisLock lock new SimpleRedisLock(order: userId, stringRedisTemplate);
//获取锁
boolean isLock lock.tryLock(1200);
//判断是否获取锁成功
if (!isLock) {//获取锁失败返回失败或者重试return Result.fail(不允许重复下单);
}
try {//获取与事务有关的代理对象IVoucherOrderService proxy (IVoucherOrderService) AopContext.currentProxy();return proxy.createVoucherOrder(voucherId);
} finally {//释放锁lock.unlock();
} 首先将我们写好的类创建出来其中使用业务名 order 拼接上用户 Id 来确保不同的用户能获取到不同的锁。获取锁失败后返回错误信息获取锁成功后执行后续逻辑将其使用 try finally 包围保证锁最终会被释放。 这样就实现了分布式锁的初级版本。耶()V 2.2. 改进分布式锁——误删问题 在一般情况下我们实现的初级版本并不会产生问题但~在一些十分巧合的情况下存在误删的问题。 在线程一获取到锁后业务如果阻塞了造成锁超时释放此时线程二获取锁就可以成功此时的锁是线程二的锁但如果此时线程一完成业务后会将线程二的锁释放从而导致误删使其他线程还能再次获取锁成功。究其原因还是因为在释放锁的逻辑中只是简单的删除并没有进行校验。 所以在解决误删问题时我们就需要将线程的标示存入锁中以便于在释放锁时对线程进行校验如果标示一致则继续释放如果标示不一致则不释放。且在集群模式下每一个 JVM 内部都会为维护一个递增的数字来做为线程 Id所以不同 JVM 下的线程 Id 可能出现相同的情况。所以还需要使用 UUID 来区分不同的线程。 改造获取锁
private static final String ID_PREFIX UUID.randomUUID().toString(true) -; 使用 hutool 包下的 UUID 工具类来生成相应的 UUID且由于其生成的字符串中会带有 - 所以在其 toString 方法内传入 true 表示去掉其中的 “ - ” 符号。 //获取线程标示
String threadId ID_PREFIX Thread.currentThread().getId(); 最后只需在获取线程标示时将其拼接在前面即可。
改造释放锁 在改造释放锁时我们只需在释放前加上对线程 Id 的校验即可
//获取线程标示
String threadId ID_PREFIX Thread.currentThread().getId();
//获取锁中标示
String id stringRedisTemplate.opsForValue().get(KEY_PREFIX name);
//判断是否一致
if (threadId.equals(id)) {stringRedisTemplate.delete(KEY_PREFIX name);
} 首先还是先获取到线程标示随后再获取到锁中标示并对两者进行比对如果相同则进行删除操作如果不同则不做任何操作。 至此成功解决了误删问题只需在释放时对线程标示进行校验即可防止线程误删不属于自己的锁。测试就免了吧我偷个懒(ェ;) 2.3. 改进分布式锁——原子性问题 虽然上面的改造已经杜绝了绝大多数问题的发生但是在极其极端的状况下还是会出现问题。好好好... 根本原因是因为我们释放锁中的校验与删除操作不是原子性的当校验完毕后由于 JVM 的垃圾回收机制可能导致线程阻塞在此如果阻塞的时间足够长使锁超时自动释放那么其他线程还是可以获取到锁从而再次产生误删问题的发生究其根本是因为校验与删除操作不具有原子性。 想要保证原子性可以使用 Redis 的事务去解决该问题但这种做法需要使用事务配合乐观锁进行校验实现操作较为复杂。我们可以使用 Lua 脚本解决该问题其原理是将校验与删除操作写入脚本中使用脚本执行多条命令从而保证其原子性。 其中 Lua 是一种轻量级的语言我们只需进行了解其基础语法即可。Lua 教程 | 菜鸟教程这里我就直接跳过不过多描述 Lua 的语法只需要知道其中可以通过 redis.call() 来执行 Redis 的命令如 redis.call(set, name, jack) 就等同于 set name jack。 Lua 脚本的内容如下所示可以在 IDEA 中下载一个插件 EmmyLua 来创建脚本文件。 -- 比较线程标示与锁中的标示
if (redis.call(get, KEYS[1]) ARGV[1]) then--释放锁return redis.call(del, KEYS[1])
end
return 0 在 Java 代码中我们可以先创建一个 DefaultRedisScript 并进行相应的配置来初始化脚本其中 setLocation 用于指定脚本文件setResultType 用于指定返回值类型。且使用静态代码块来初始化随着类的加载完成初始化无需重复加载提高了性能。
private static final DefaultRedisScriptLong UNLOCK_SCRIPT;static {UNLOCK_SCRIPT new DefaultRedisScript();UNLOCK_SCRIPT.setLocation(new ClassPathResource(unlock.lua));UNLOCK_SCRIPT.setResultType(Long.class);} 最后只需在原先释放锁的方法中使用 execute() 方法来调用初始化好的 lua 脚本即可其中传入参数分别为脚本对象、key 与锁的标示其中 key 需要转为 List 集合传入。
//调用 lua 脚本
stringRedisTemplate.execute(UNLOCK_SCRIPT,Collections.singletonList(KEY_PREFIX name),ID_PREFIX Thread.currentThread().getId()); 最后就成功解决了分布式锁的原子性问题其校验与删除操作在一行代码中完成保证了其原子性。 2.3. Redisson 组件 虽然已经解决了一些潜在的问题但其实上述的分布式锁还是存在着许多的问题如不可重入、不可重试、超时释放、主从一致性等问题解决起来较为麻烦。所以就需要我们使用一个全新的组件——Redisson它包含了许多的分布式服务其中就包括分布式锁。 想要使用 Redisson第一步就是引入对应的依赖
!-- redisson --
dependencygroupIdorg.redisson/groupIdartifactIdredisson/artifactIdversion3.16.2/version
/dependency 第二步就是将其配置成一个 Bean并在其中进行对应的配置
Configuration
public class RedissonConfig {Beanpublic RedissonClient redissonClient(){//配置Config config new Config();config.useSingleServer().setAddress(redis://127.0.0.1:6379).setPassword(123456);return Redisson.create(config);}
} 其中在配置类上加 Configuration 注解其中的 Config 类是 redisson 提供的使用其来设置地址与密码配置完成后就可以在代码中去使用它了。 第一步注入 RedissonClient 后获取锁对象
Resource
private RedissonClient redissonClient;
//Redisson 改造
//创建锁对象
RLock lock redissonClient.getLock(lock:order: userId); 第二步尝试获取锁
//获取锁
boolean isLock lock.tryLock(); 其中可以传入三个参数如果无参则 waitTime 默认为 -1失败后不等待立即返回且默认过期时间为 30 秒。 这里就不记录 Redisson 组件各功能的底层原理了偷懒偷懒ψ(*ー´)ψ 3. 秒杀优化 为了在代码中进行优惠券秒杀时进一步提高并发性能需要经过多个步骤来进行改进其中又分为简单操作与复杂操作各自的耗时不同而在业务中其串行执行对效率存在一定的影响。 我们可以将其进行拆分耗时短的校验操作的由当前线程去处理耗时长的读写数据库操作可以交给另一线程去处理这样就可以大大提高业务的性能同时在校验时又将读取数据库改为在 Redis 中进行校验因 Redis 的性能要高于 MySQL也提高了校验操作的性能。将同步的数据库操作变为了异步的操作大大提高了性能。 3.1. 基于 Redis 实现秒杀优化 基于 Redis 进行秒杀优化需要分为四步进行同时使用了异步操作提高了并发的性能。
在新增秒杀优惠券的同时将其信息保存到 Redis 中基于 Lua 脚本保证原子性对库存进行判断一人一单抢购成功后将优惠券 Id 和用户 Id 封装后存入阻塞队列中开启新线程不断从阻塞队列中获取信息实现下单功能 // 保存优惠券
save(voucher);
// 保存秒杀信息
SeckillVoucher seckillVoucher new SeckillVoucher();
seckillVoucher.setVoucherId(voucher.getId());
seckillVoucher.setStock(voucher.getStock());
seckillVoucher.setBeginTime(voucher.getBeginTime());
seckillVoucher.setEndTime(voucher.getEndTime());
seckillVoucherService.save(seckillVoucher);
// 保存秒杀信息到 Redis 中
stringRedisTemplate.opsForValue().set(SECKILL_STOCK_KEY voucher.getId(), voucher.getStock().toString()); ● 第一步在原先保存秒杀优惠券的代码下将其信息中的库存数保存到 Redis 中便于在查询时直接查询 Redis提高效率。key 是前缀 优惠券 Idvalue 是优惠券的库存数量。 lua 脚本seckill.lua
local voucherId ARGV[1]
local userId ARGV[2]
-- 库存key
local stockKey seckill:stock: .. voucherId
-- 订单key
local orderKey seckill:order: .. voucherId
-- 判断库存是否充足
if (tonumber(redis.call(get, stockKey)) 0) then-- 库存不足return 1
end
-- 判断用户是否存在于已购买记录中
if (redis.call(sismember, orderKey, userId) 1) then-- 用户已经购买过return 2
end
-- 库存 -1
redis.call(incrby, stockKey, -1)
-- 添加用户 Id 到已购买集合
redis.call(sadd, orderKey, userId)
-- 秒杀成功
return 0
VoucherOrderServiceImpl 类
//封装 Lua 脚本对象
private static final DefaultRedisScriptLong SECKILL_SCRIPT;
static {SECKILL_SCRIPT new DefaultRedisScript();SECKILL_SCRIPT.setLocation(new ClassPathResource(seckill.lua));SECKILL_SCRIPT.setResultType(Long.class);
}
seckillVoucher 方法
Long userId UserHolder.getUser().getId();
// 1.执行 Lua 脚本
Long result stringRedisTemplate.execute(SECKILL_SCRIPT,Collections.emptyList(),voucherId.toString(),userId.toString());
int r result.intValue();
if (r ! 0) {return Result.fail(r 1 ? 库存不足 : 不能重复下单);
} ● 第二步在原先的优惠券秒杀方法中先去执行 Lua 脚本判断用户是否具有购买资格最后使用三元运算符对返回结果进行判断并返回相应错误信息。 //声明阻塞队列 传入队列容量
private BlockingQueueVoucherOrder orderTasks new ArrayBlockingQueue(1024 * 1024);
//声明 proxy 成员变量
private IVoucherOrderService proxy;
// 返回值为 0表示有购买资格,生成订单信息
VoucherOrder voucherOrder new VoucherOrder();
long orderId redisIdWorker.nextId(order);
voucherOrder.setId(orderId);
voucherOrder.setUserId(userId);
voucherOrder.setVoucherId(voucherId);
// 放入阻塞队里
orderTasks.add(voucherOrder);
//获取代理对象
proxy (IVoucherOrderService) AopContext.currentProxy();
return Result.ok(orderId); ● 第三步判断为具有购买资格后生成订单信息并将其放入阻塞队列中同时因为在子线程中无法获取到父线程的代理对象获取代理对象通过 ThreadLocal 实现但每个线程都有自己独立的 ThreadLocal而 Spring 的事务管理需要父线程代理对象实现所以需要在父线程中提前获取到代理对象这里将其声明为成员变量使其可以在子线程中使用。最后直接返回订单 Id代表秒杀成功提高并发性能。 private static final ExecutorService SECKILL_ORDER_EXECUTOR Executors.newSingleThreadExecutor();PostConstruct
private void init() {SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
}private class VoucherOrderHandler implements Runnable {Overridepublic void run() {while (true){try {//1.获取队列中的订单信息VoucherOrder voucherOrder orderTasks.take();//2.创建订单handleVoucherOrder(voucherOrder);} catch (Exception e) {log.error(获取订单信息出错, e);}}}
}private void handleVoucherOrder(VoucherOrder voucherOrder) {Long userId voucherOrder.getUserId();Long voucherId voucherOrder.getVoucherId();//创建锁对象RLock lock redissonClient.getLock(lock:order: userId);//获取锁boolean isLock lock.tryLock();//判断是否获取锁成功if (!isLock) {//获取锁失败返回失败或者重试log.error(不允许重复下单);}try {proxy.createVoucherOrder(voucherOrder);} finally {//释放锁lock.unlock();}
} ● 第四步创建一个线程池 ExecutorService之后再创建一个线程任务Runnable其中不断的读取阻塞队列中的订单信息take() 就是实现阻塞等待的方法。同时为了在秒杀开始之前执行该操作通过在 init 方法上加 PostConstruct 注解使其在当前类初始化完毕后执行 init 方法。 take() 方法 将创建订单封装为一个函数handleVoucherOrder流程与之前的步骤相同区别在于需要从传入的订单信息中获取对应的信息且出错只需记录错误日志即可。而方法中调用的 createVoucherOrder() 方法也需要改造。
Transactional
public void createVoucherOrder(VoucherOrder voucherOrder) {Long userId voucherOrder.getUserId();//一人一单Integer count query().eq(user_id, userId).eq(voucher_id, voucherOrder.getVoucherId()).count();if (count 0) {//用户已购买过log.error(用户已购买过);}//库存充足扣减库存boolean success seckillVoucherServices.update().setSql(stock stock - 1).eq(voucher_id, voucherOrder.getVoucherId()).gt(stock, 0).update();if (!success) {log.error(库存不足);}//创建订单save(voucherOrder);
} 其中不用返回结果只需记录错误日志即可。同时不再从 ThreadLocal 中获取用户 Id 而是从传入的订单信息中获取。 效果展示 这里仅使用 ApiFox 单用户测试演示jmter 的多人秒杀测试因为需要大量 token 比较麻烦就不进行了。 第一次抢购成功返回订单 Id同时 Redis 与 MySQL数据库中的库存数减一并保存了对应的订单信息。 再次点击发送抢购请求时返回错误信息“不能重复下单”因在 Lua 脚本中会对传入的用户 Id 判断其是否存在于已购买用户的行列中从而保证一人一单。 注意测试时 Redis 中需要有对应优惠券的库存数如果测试的是代码修改前已存在的优惠券需要手动写到 Redis 中。 3.2. Redis 消息队列实现秒杀优化 虽然已经利用 Redis 实现了异步秒杀的功能但主要是利用 JVM 的阻塞队列实现的异步秒杀功能其会存在一些问题。由于 JVM 具有内存限制在高并发的情况下可能会导致内存超出上限如果从阻塞队列取出数据时出现异常会导致任务未处理和数据丢失的问题。 而解决以上问题的最佳方法就是使用消息队列加一层ㄟ( ▔, ▔ )ㄏ 其能发送、存放、接收消息分别对应其中的三个角色生产者、消息队列、消费者。最常见的消息队列中间件就是 RabbitMQ 在我的文章 【MQ】学习笔记——消息队列MQ之 黑马RabbitMQ 中就记录了学习内容。 而 Redis 中也提供了实现消息队列的方式一共有三种
1. List基于 List 结构来模拟消息队列双向链表先进先出 如左进右出 可以看做是利用 List 来模拟阻塞队列满足基本需求的消息队列且通过 BRPOP 或 BLPOP 来实现阻塞效果。
优点
利用 Redis 进行存储不再受限于 JVM 的内存上限基于 Redis 的持久化机制数据的安全线有保障可以满足消息的有序性
缺点
无法避免消息丢失在取出消息的同时出现异常会导致数据丢失仅支持单消费者 2. PubSub基本的点对点模型于 Redis 2.0 版本引入的模型 除了上图中的两种命令发送、订阅PSUBSCRIBE pattern [pattern] 还可以订阅与通配符格式一致的所有频道其通配符格式要求如下所示 优点
采取发布订阅的模型支持多生产、多消费
缺点
不支持数据持久化无法避免消息丢失消息堆积有上限超出时数据会丢失 3. Stram较完善的消息队列模型于 Redis 5.0 版本引入的新数据类型
◆ 单消费者模式
发送消息——xadd 读取消息——xread 其中阻塞时间为 0 时代表一直阻塞直到读取到消息且消息可以被重复读取。但当想要使用读取最新消息$来实现持续监听队列时如果此时业务正在处理时又发送了许多消息那么可以获取到最新消息但前面的消息就会丢失。
优点
消息可以被重复读取到一个消息可以被多个消费者读取可以阻塞读取
缺点
有消息漏读的风险 ◆ 消费者组模式 消费者组就是将多个消费者划分到一个组中共同监听同一个队列。
特点
消息分流消费者组内具有竞争关系消息会分流给不同的消费者可以提高处理消息的速度。而如果想要消息被多个消费者消费可以添加多个消费者组。消息标示消费者组会维护一个标示记录最后一个被处理的消息即使消费者宕机重启仍能从标示之后读取到消息确保每一个消息都能被消费消息确认消费者获取消息后消息会处于 pending 状态并存入一个 pending-list 中当处理完成后需要通过 XACK 来确认消息标记消息为已处理后才会将其从 pending-list 中移除。避免消息丢失的问题。
语法 创建消费者组 XGROUP CREATE key groupName ID [MKSTREAM] key队列名称 groupName消费者组名称 ID起始 ID 标示$ 表示队列中最后一个消息0 表示队列中第一个消息 MKSTREAM队列不存在时自动创建队列 删除指定的消费者组 XGROUP DESTORY key groupName 给指定的消费者组添加消费者 XGROUP CREATECONSUMER key groupName consumerName 删除消费者组中指定的消费者 XGROUP DELCONSUMER key groupName consumerName 从消费者组读取消息 XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREANS key [key ...] ID [ID ...] group消费者组名 consumer消费者名称如果消费者不存在则自动创建一个 count本次查询的最大数量 BLOCK milliseconds当没有消息时的最长等待时间 NOACK无需手动 ACK获取消息后自动确认不建议设置 STREAMS key指定队列名称 ID获取消息的起始 ID 表示从下一个未消费的开始其它 则表示根据指定 ID 从pending-list 队列中获取已消费但未确认的消息 将未处理消息处理 XACK key group ID [ID ...] key队列名称 group消费者组名 ID消息的唯一 ID 获取 pending-list 中一定范围内未确认的消息 XPENDING key group [ [IDLE min-idle-time] start end count [consumer] ] key队列名称 group消费者组名 IDLE min-idle-time空闲时间获取以后确认之前 start end消息范围最大 ID ~ 最小 ID- 表示都获取 count获取消息的数量 consumer消费者名称每个消费者都有自己的 pending-list 3.3. 基于 Stream 结构实现秒杀优化 想要基于 Redis 中的 Stream 结构来实现秒杀优化。首先要做的就是在 Redis 中提前创建好一个消息队列。
XGROUP CREATE stream.order g1 0 MKSTREAM 这里我们创建一个名为 stream.order 的消息队列。其中的消费者组名为 g1起始 ID 标识为 0MKSTREAM 表示队列不存在时自动创建。 local voucherId ARGV[1]
local userId ARGV[2]
local orderId ARGV[3]
-- 库存key
local stockKey seckill:stock: .. voucherId
-- 订单key
local orderKey seckill:order: .. voucherId
-- 判断库存是否充足
if (tonumber(redis.call(get, stockKey)) 0) then-- 库存不足return 1
end
-- 判断用户是否存在于已购买记录中
if (redis.call(sismember, orderKey, userId) 1) then-- 用户已经购买过return 2
end
-- 库存 -1
redis.call(incrby, stockKey, -1)
-- 添加用户 Id 到已购买集合
redis.call(sadd, orderKey, userId)
-- 发送消息到消息队列中
redis.call(xadd, stream.orders, *, userId, userId, voucherId, voucherId, id, orderId)
-- 秒杀成功
return 0 ● 第一步在原先秒杀条件校验的 Lua 脚本中添加发送消息到消息队列的操作在判断秒杀成功的条件下将对应的订单信息用户 Id、优惠券 Id、订单 Id发送到消息队列中。这里需要新增一个参数接收订单 Id Override
public Result seckillVoucher(Long voucherId) {Long userId UserHolder.getUser().getId();long orderId redisIdWorker.nextId(order);// 1.执行 Lua 脚本Long result stringRedisTemplate.execute(SECKILL_SCRIPT,Collections.emptyList(),voucherId.toString(),userId.toString(),String.valueOf(orderId));int r result.intValue();if (r ! 0) {return Result.fail(r 1 ? 库存不足 : 不能重复下单);}//获取代理对象proxy (IVoucherOrderService) AopContext.currentProxy();return Result.ok(orderId);
} ● 第二步在原先的秒杀方法中执行 Lua 脚本时传入生成的订单 Id且由于是基于 Stream 结构实现的消息队列所以不再需要将订单信息添加到阻塞队列中。 private class VoucherOrderHandler implements Runnable {Overridepublic void run() {while (true){try {//1.获取消息队列中的订单信息ListMapRecordString, Object, Object list stringRedisTemplate.opsForStream().read(Consumer.from(g1, c1),StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),StreamOffset.create(QUEUE_NAME, ReadOffset.lastConsumed()));if (list null || list.isEmpty()){//获取失败continue;}MapRecordString, Object, Object record list.get(0);MapObject, Object value record.getValue();VoucherOrder voucherOrder BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);//2.创建订单handleVoucherOrder(voucherOrder);// ACK确认stringRedisTemplate.opsForStream().acknowledge(QUEUE_NAME, g1, record.getId());} catch (Exception e) {log.error(获取订单信息出错, e);handlePandingList();}}}
} ● 第三步在原先的线程任务中不断尝试获取到消息队列中订单信息调用对应 Stream 类型的 read 方法。Consumer.from() 中传入消费者组名和消费者名。StreamReadOptions 代表可选参数其中 count 设置获取消息的个数block 设置阻塞的时间。StreamOffset 中指定队列名和读取消息的位置。 因为指定只取出一个消息所以只用获取返回集合中的第一个元素即可。取出的结构如上图所示因存入了三个键值对用户 Id、优惠券 Id、订单 Id所以使用 MapRecord 类型存储通过 getValue 方法取出后再使用 hutool 工具包中的 BeanUtill 将其内容填入订单对象中。接着在用之前的 handleVoucherOrder方法创建好订单后进行 ACK 手动确认已处理消息。
private void handlePandingList() {while (true){try {//1.获取消息队列中的订单信息ListMapRecordString, Object, Object pendingList stringRedisTemplate.opsForStream().read(Consumer.from(g1, c1),StreamReadOptions.empty().count(1),StreamOffset.create(QUEUE_NAME, ReadOffset.from(0)));if (pendingList null || pendingList.isEmpty()){//pending-list 中不存在未处理消息结束循环break;}MapRecordString, Object, Object record pendingList.get(0);MapObject, Object value record.getValue();VoucherOrder voucherOrder BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);//2.创建订单handleVoucherOrder(voucherOrder);// ACK确认stringRedisTemplate.opsForStream().acknowledge(QUEUE_NAME, g1, record.getId());} catch (Exception e) {log.error(处理pending-list订单异常, e);try {Thread.sleep(20);} catch (InterruptedException ex) {ex.printStackTrace();}}}
} 如果线程任务中抛出了异常导致已处理消息未被确认就会进入 handlePandingList 方法去检查 pending-list 队列中是否存在未确认消息只在获取消息时不再设置阻塞时间且从 0 开始获取。如果未获取到则推出循环如果出现异常则在休眠后继续尝试获取直到没有异常确保异常订单一定会被处理。 效果展示 这里仅使用 ApiFox 单用户测试演示还是一样的原因|ूω )记得将之前测试时的订单数据删除。 第一次发送请求后成功获取到订单 Id。 第二次发送请求后提示不能重复下单。 四、达人探店
1. 发布探店笔记 在该业务中我们只需完善已经实现好的两个接口即可分别是上传图片与发布接口。在 UploadController 中找到 uploadImag 方法注意其中的 IMAGE_UPLOAD_DIR 常量我们需要将其设置为本地对应 Nginx 目录下存放静态资源的 imgs 地址。 然后就没有了!!!∑(Дノ)ノ成功实现了发布探店笔记的功能这是学 Redis 的还学别的干什么大傻春┓(;´_)┏ 效果展示 注意开启 Nginx 的负载均衡后需要将两个服务全部启动否则会出现问题 可以看到成功将图片上传成功并且成功发布探店笔记。 但此时查看笔记的接口还没有实现所以要接着实现查看笔记的接口
GetMapping(/{id})
public Result queryBlogById(PathVariable(id) Long id) {return blogService.queryBlogById(id);
} 其接收笔记 Id利用笔记 Id 去查询对应笔记信息并将其返回。可以根据与之功能相同的根据热度分页查询笔记来实现。
Override
public Result queryBlogById(Long id) {Blog blog getById(id);if (blog null) {return Result.fail(笔记不存在);}queryBlogUser(blog);return Result.ok(blog);
}private void queryBlogUser(Blog blog) {Long userId blog.getUserId();User user userService.getById(userId);blog.setName(user.getNickName());blog.setIcon(user.getIcon());
} 通过 Id 查询到对应笔记信息并通过笔记中的用户 Id 查询用户的头像和昵称用于展示。最后返回笔记对象。 效果展示 注意店铺不显示是因为在前面做活动预热时没有将此店铺信息保存到 Redis 中从而导致店铺信息查询不到显示异常。 2. 点赞 我们知道点赞功能这一类操作频繁且简单的操作需要保证每人只能对一个笔记点赞一次。如果这种校验如果通过数据库来实现由于频繁的操作会对数据库造成很大的压力所以实现校验功能最好的方法就是通过 Redis 来实现类似一人一单的校验。
PutMapping(/like/{id})
public Result likeBlog(PathVariable(id) Long id) {return blogService.likeBlog(id);
} 首先将方法转移到 Impl 层去实现主要是利用 Set 数据类型保存已点赞的用户 Id在下次重复操作时从中使用 SISMEMBER 命令判断其是否存在。
//1.获取当前用户 Id
Long userId UserHolder.getUser().getId();
//2.判断当前用户是否已经点赞
String key BLOG_LIKED_KEY id;
Boolean isMember stringRedisTemplate.opsForSet().isMember(key, userId.toString()); 获取当前用户的 Id从 Redis 中判断该用户是否已经点赞过了。
if (BooleanUtil.isFalse(isMember)){//3.如果未点赞可以点赞boolean isSuccess update().setSql(liked liked 1).eq(id, id).update();if (isSuccess){stringRedisTemplate.opsForSet().add(key,userId.toString());}
}else {//4.如果已点赞则不点赞boolean isSuccess update().setSql(liked liked - 1).eq(id, id).update();if (isSuccess){stringRedisTemplate.opsForSet().remove(key,userId.toString());}
}
return Result.ok(); 结果一未点赞需要将点赞数 1 并存入数据库中接着将用户 Id 存入 Redis 中 结果二已点赞需要将点赞数 - 1 并存入数据库中接着将用户 Id 从 Redis 删除。 以上判断完成后返回成功。同时在判断完点赞操作后在查询时还需展示点赞成功后标志变为醒目红色标志。Blog 类中有字段 isLike 记录是否点赞。
/*** 是否点赞过了*/
TableField(exist false)
private Boolean isLike; 在查看笔记与分页查询的方法中添加 isBlogLiked 方法判断笔记是否被当前用户点赞。
//查询 blog 是否被点赞
isBlogLiked(blog);
------------------------------
// 查询用户
records.forEach(blog - {this.queryBlogUser(blog);this.isBlogLiked(blog);
});
private void isBlogLiked(Blog blog) {if (UserHolder.getUser() null) {//用户未登录时无需查询是否点赞return;}Long userId UserHolder.getUser().getId();String key BLOG_LIKED_KEY blog.getId();Boolean isMember stringRedisTemplate.opsForSet().isMember(key, userId.toString());blog.setIsLike(BooleanUtil.isTrue(isMember));
} 同理获取到当前用户的 Id 从 Redis 中查询是否已点赞如果已点赞则将 isLike 字段设为 true。注意在用户未登录时也会进行分页查询而此时无需对是否点赞进行判断否则会出现空指针异常 效果展示 可以看到最后成功实现了点赞的功能一个用户只能对一个笔记点赞一次。 3.点赞排行榜 想要实现点赞排行榜的功能就需要我们将原先保存用户 Id 的 Set 数据类型更改为 Sorted Set 数据类型进行存储我们可以通过将每个用户点赞时间设置为分数来进行排序展示而点赞时间越早展示则越靠前。
Override
public Result likeBlog(Long id) {//1.获取当前用户 IdLong userId UserHolder.getUser().getId();//2.判断当前用户是否已经点赞String key BLOG_LIKED_KEY id;Double score stringRedisTemplate.opsForZSet().score(key, userId.toString());if (score null){//3.如果未点赞可以点赞boolean isSuccess update().setSql(liked liked 1).eq(id, id).update();if (isSuccess){stringRedisTemplate.opsForZSet().add(key,userId.toString(), System.currentTimeMillis());}}else {//4.如果已点赞则不点赞boolean isSuccess update().setSql(liked liked - 1).eq(id, id).update();if (isSuccess){stringRedisTemplate.opsForZSet().remove(key,userId.toString());}}return Result.ok();
} ● 第一步对原先的点赞方法进行改造存储类型变为 ZSetJava 中 Sorted Set 为 ZSet其中 add 方法再添加时加上当前时间作为 score删除方法变为 remove同时使用 score 方法查询对应键值对是否存在分数来判断是否已点赞。 Override
public Result queryBlogLikes(Long id) {//1.查询 top5 的点赞用户SetString userTop5 stringRedisTemplate.opsForZSet().range(BLOG_LIKED_KEY id, 0, 4);if (userTop5 null || userTop5.isEmpty()){return Result.ok(Collections.emptyList());}//2.解析出其中的 idListLong userIds userTop5.stream().map(Long::valueOf).collect(Collectors.toList());String idStr StrUtil.join(,, userIds);//3.根据 id 查询用户ListUserDTO userDTOS userService.query().in(id, userIds).last(order by field(id, idStr )).list().stream().map(user - BeanUtil.copyProperties(user, UserDTO.class)).collect(Collectors.toList());//4.返回return Result.ok(userDTOS);
} ● 第二步在 Impl 层中实现查看点赞排行的功能。其中使用 range 方法范围查询获取排序在前五的用户 Id如果为空则返回空集合避免空指针问题将其用 stream 流解析后再从数据库中查询出相应用户信息再次利用 stream 流收集为集合返回。注意如果利用 mp 中的 list 方法会导致查询出的用户顺序不会通过从 Redis 中已经过时间排序得到的用户 Id 顺序进行返回所以需要手写 sql 语句利用 order by field 来返回对应 Id 顺序的用户信息 效果展示 可以看到成功展示了点赞的用户并且利用时间对点赞的前后顺序进行了展示。 【中】完结 传送门上Redis 实战篇 ——《黑马点评》上 传送门下NULL