建设厅国网查询网站,网站建设php招聘,到哪里做网站,福永网站的建设使用Redission分布式锁与Kafka消息队列#xff0c;实现学生抢课系统#xff08;高并发秒杀场景#xff09;。 目录 一、思路1.为频繁访问的信息设置缓存#xff08;1#xff09;登陆#xff08;2#xff09;课程任务信息#xff08;3#xff09;用户抢课记录 2.消息队…使用Redission分布式锁与Kafka消息队列实现学生抢课系统高并发秒杀场景。 目录 一、思路1.为频繁访问的信息设置缓存1登陆2课程任务信息3用户抢课记录 2.消息队列和分布式锁1抢课消息队列2锁定缓存抢夺3数量缓存操作 二、具体流程1.抢课任务设置2.用户抢课 三、实体类表四、核心代码1.消费端2.数量缓存操作逻辑 五、两种分布式锁1.基于redis命令的分布式锁a.加锁b.解锁c.局限性 2. redission分布式锁 参考文章 一、思路
1.为频繁访问的信息设置缓存
后台设置抢课任务配置抢课时间范围。一般情况下用户会在抢课时间开始前至抢课前期一段的时间集中访问系统。
1登陆
系统在登陆时会查询数据库返回用户信息所以需要对登陆接口进行改造将用户信息比如姓名、年级、班级、联系方式等保存至缓存中。当用户信息发生变更时才将信息从缓存中清除。
2课程任务信息
用户登陆进入系统后进入到抢课任务模块对应的抢课任务中进行课程选择。在抢课任务期间用户会为频繁的访问该模块信息抢课任务、关联课程、课程库存等后台在进行信息校验时也会用到以上数据。
3用户抢课记录
在抢课期间用户抢课退课操作和查询比较频繁可将该数据保存至缓存方便查询。
2.消息队列和分布式锁
为Kafka的抢课消息队列设置了一个主题五个分区可处理大量并发抢课消息的情况。然而在每个分区内消息是有序的不同分区中的消息无序会出现多个进程同时进行的情况而多个进程必须以互斥的方式独占共享资源课程库存。
为保证每一课程库存操作的独占性为课程库存设置了锁定缓存(LockKey)和数量缓存(StockKey)。 注意当消息抢夺到锁定缓存时才可对数量缓存进行扣除-1的操作。
1抢课消息队列
对用户抢课数据进行校验是否符合年级、已选课程日期冲突炖、课程班级人数限制、课程库存余量等校验通过后将请求数据发送至Kafka抢课消息队列。
2锁定缓存抢夺
此处使用了Redission分布式锁当同时有多个用户发送同一课程的消息时消费端接收到消息在5秒内尝试获取锁定缓存(LockKey)若获取成功加锁30秒否则失效。
3数量缓存操作
获取锁定缓存(LockKey)成功后查询数量缓存(StockKey)。此时需要对各项业务数据再次进行校验因为在数据进入消息队列前进程仍是并发的可能会出现数据已变动的情况。在满足抢课条件后取出数量缓存(StockKey)库存进行数量减一的操作操作完毕最终释放锁定缓存(LockKey)。
二、具体流程
1.抢课任务设置
1后台管理人员配置抢课任务信息开始时间、结束时间、课程、课程时间等。 2课程任务正式发布保存信息任务、关联课程、自定义课程可选人数等到缓存当任务取消发布时需要将对应的缓存删除。
2.用户抢课
1首次登陆查询用户信息并保存至缓存。 2进入抢课任务信息查看可选课程列表。 3点击抢课按钮发送请求。 4选课成功保存学生对应任务已选课程集合到缓存更新任务课程库存数量等缓存信息。 5退课更新学生已选课程缓存清除任务课程库存数量缓存。
三、实体类表
此处列举部分核心数据库表设计。
1.课程表
id课程名称课程编号课程教室课程简介教师id教师名称1舞蹈兴趣班C00011号楼6楼舞蹈教室面向0基础学生10001王老师2画图兴趣班C00021号楼2楼美术教室面向0基础学生10002李老师3音乐兴趣班C00033号楼1楼音乐教室面向0基础学生10003陈老师
2.选课任务表
id任务名称可选年级id集合可选班级id集合学生可选课程总数开始时间结束时间发布状态发布时间任务状态12023年秋季选课2019级id,2020级id2020级-1班id,2020级-2班id,2019级-3班id22023-08-01 09:00:002023-08-07 20:00:00已发布2023-07-20 09:00:00已结束22024年春季选课2019级id,2020级id32024-01-30 09:00:002024-01-07 20:00:00已发布2024-01-20 09:00:00进行中
3.选课任务课程关联表
id任务id课程id课程可选人数开始日期结束日期课程表json111502023-09-202023-12-30[{“name”:“周一”, “section”:[{“name”:“第5节”,“state”:“1”}]},{“name”:“周二”, “section”:[{“name”:“第3节”,“state”:“1”}]}]212502023-09-202023-12-30[{“name”:“周二”, “section”:[{“name”:“第3节”,“state”:“1”}]},{“name”:“周四”, “section”:[{“name”:“第5节”,“state”:“1”}]}]313502023-09-202023-12-30[{“name”:“周三”, “section”:[{“name”:“第5节”,“state”:“1”}]},{“name”:“周五”, “section”:[{“name”:“第5节”,“state”:“1”}]}]421502024-03-012024-05-30[{“name”:“周一”, “section”:[{“name”:“第5节”,“state”:“1”}]},{“name”:“周二”, “section”:[{“name”:“第3节”,“state”:“1”}]}]522502024-03-012024-05-30[{“name”:“周二”, “section”:[{“name”:“第3节”,“state”:“1”}]},{“name”:“周四”, “section”:[{“name”:“第5节”,“state”:“1”}]}]633502024-03-012024-05-30[{“name”:“周三”, “section”:[{“name”:“第5节”,“state”:“1”}]},{“name”:“周五”, “section”:[{“name”:“第5节”,“state”:“1”}]}]
4.学生选课关联表
id任务id课程id学生id选课状态老师帮选表示选课时间1111已选课是2023-08-01 09:01:002112已取消否2023-08-01 09:01:01
四、核心代码
1.消费端
1Kafka配置主题、分区初始化 2用户抢课数据初步通过校验发送到消息队列中的处理逻辑。
Component
Slf4j
public class KafkaConsumer {
/*** 初始化学生选课主题分区 5个* 通过注入一个 NewTopic 类型的 Bean 来创建 topic如果 topic 已存在则会忽略。*/Beanpublic NewTopic courseSelectionBatchTopic() {log.info(创建学生选课主题courseSelectionBatchTopic : szxy_oa_course_selection_student_add_topic分区5副本数1 );NewTopic newTopic new NewTopic(OaConstant.COURSE_SELECTION_STUDENT_ADD_TOPIC, 5, (short) 1);log.info(newTopictopicName{}分区: {} , newTopic.name(), newTopic.numPartitions());return newTopic;}/***添加学生选课主题消息*/KafkaListener(topics OaConstant.COURSE_SELECTION_STUDENT_ADD_TOPIC,groupId KafkaProducer.TOPIC_GROUP)public void courseSelectionStudentAddMsg(ConsumerRecord?, ? record, Acknowledgment ack, Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {log.info(COURSE_SELECTION_STUDENT_ADD_TOPIC-学生选课队列消费端 topic:{}, 收到消息, topic);try {Optional message Optional.ofNullable(record.value());if (message.isPresent()) {Object msg message.get();// 先判断消息是否已经已经消费过5sString fullKey2 redisLockUtil.getFullKey(COURSE_SELECTION_STUDENT_CONSUME_LOCK_PREFIX , String.valueOf(msg));if(redisLockUtil.getLock(fullKey2 , 5000)){// 获得学生选课入参CourseSelectionStudentParam param objectMapper.readValue(String.valueOf(msg), CourseSelectionStudentParam.class);// 选课任务课程 key 任务id 任务课程idLong taskId param.getTaskId(); // 选课任务idLong taskcourseId param.getTaskcourseId(); // 选课任务课程idString key taskId :: taskcourseId;// 获得课程容量库存锁任务id 任务课程id才可以操作库存缓存String fullKey redisLockUtil.getFullKey(COURSE_SELECTION_STUDENT_LOCK_PREFIX, key);final RLock lock redissonClient.getLock(fullKey);// 尝试抢【课程容量库存锁】锁时间调整 5sboolean bool lock.tryLock(5, 30, TimeUnit.SECONDS); // 5s内尝试加锁加锁成功后30s失效if (bool) {tCourseSelectionStudentService.handleCourseSelectionStudent(param, lock);log.info(courseSelectionStudentAddMsg 取得更新库存锁消费了 Topic: topic ,Message: String.valueOf(msg));}}else {log.info(courseSelectionStudentAddMsg 已经被消费 Topic: topic ,Message: String.valueOf(msg));}}} catch (Exception e) {e.printStackTrace();log.error(解析 OaConstant.COURSE_SELECTION_STUDENT_ADD_MESSAGE_KAFKA_TOPIC 数据异常);} finally {ack.acknowledge();}log.info(COURSE_SELECTION_STUDENT_ADD_TOPIC-学生选课队列消费端 消费结束 );}
}2.数量缓存操作逻辑
通过Redission分布式锁抢夺锁定缓存后的处理逻辑。 /*** 处理学生抢课消息* 入库并修改库存缓存* 编辑学生选课列表redis* 通过验证后发送消费消息需要再次校验redis是否已选课程* 加入锁对象*/Overridepublic void handleCourseSelectionStudent(CourseSelectionStudentParam param, RLock lock) {Date current new Date();// 库存余量判断 - 查询库存keyString stockKey COURSE_SELECTION_STUDENT_STOCK_PREFIX param.getTaskId() :: param.getTaskcourseId();String stockNumStr redisTemplate.opsForValue().get(stockKey);log.info(handleCourseSelectionStudent-学生抢课添加处理开始 key{}stockNum{}stuIndentityId{}, stockKey, stockNumStr, param.getStuIdentityId());if (StringUtils.isBlank(stockNumStr)) {log.error(handleCourseSelectionStudent-学生抢课添加处理未查询到库存 key{}stockNum{}, stockKey, stockNumStr);// 释放锁if (lock.isLocked() lock.isHeldByCurrentThread()) {// 是当前执行线程的锁lock.unlock();}return;}int num Integer.parseInt(stockNumStr) - 1;if (num 0) {log.error(handleCourseSelectionStudent-学生抢课添加处理库存余量不足key{}stockNum{}studentNum{}, stockKey, stockNumStr, 1);// 释放锁if (lock.isLocked() lock.isHeldByCurrentThread()) {// 是当前执行线程的锁lock.unlock();}return;}// 入库前再次校验是否已选该课程// 查询对该课程的选课结果 redis key taskcourseId classId stuIdentityId防止多次重复点击选课String couseSelectionStudentPkIdKey2 CourseSelectionConstants.COURSE_SELECTION_CLASS_STUDENT_PKID_PREFIX param.getTaskcourseId() : param.getClassId() : param.getStuIdentityId();String courseSelectionPkId redisTemplate.opsForValue().get(couseSelectionStudentPkIdKey2);if (StringUtils.isNotBlank(courseSelectionPkId)) {log.error(handleCourseSelectionStudent-学生抢课添加处理已存在学生选课记录key{}taskcourseId{}stuIndentityId{}, couseSelectionStudentPkIdKey2, param.getTaskcourseId(), param.getStuIdentityId());// 释放锁
// redisLockUtil.delLock(redisLockUtil.getFullKey(COURSE_SELECTION_STUDENT_LOCK_PREFIX, param.getTaskId() :: param.getTaskcourseId()));if (lock.isLocked() lock.isHeldByCurrentThread()) {// 是当前执行线程的锁lock.unlock();}return;}// 入库前再次校验所属班级是否有剩余人数// 查询是否超过班级可选限制if (!0.equals(param.getClassLimit())) {// 校验是否超过班级可选人数班级可选人数配置从taskcourse中获取int classSelectNum 0;String classLimitKey CourseSelectionConstants.COURSE_SELECTION_TASKCOURSE_CLASSLIMIT_PREFIX param.getOrgId() : param.getTaskId() : param.getTaskcourseId() : param.getClassId();String classLimitStr redisTemplate.opsForValue().get(classLimitKey);if (StringUtils.isNotBlank(classLimitStr)) {classSelectNum Integer.parseInt(classLimitStr);}// 校验是否超过任务选课数量上限// 查询学生当前任务的课程记录列表已选 已取消 redis中获取String stuCourseListKey3 CourseSelectionConstants.COURSE_SELECTION_STUDENT_COURSELIST_PREFIX param.getOrgId() : param.getTaskId() : param.getStuIdentityId();String stuCourseStr3 redisTemplate.opsForValue().get(stuCourseListKey3);ListCourseSelectionTaskcourseVo selectionCourseList new ArrayList();if (StringUtils.isNotBlank(stuCourseStr3)) {selectionCourseList JSONUtil.toList(JSONUtil.toJsonStr(stuCourseStr3), CourseSelectionTaskcourseVo.class);if (CollectionUtil.isNotEmpty(selectionCourseList)) {// 筛选出是已选状态的课程列表selectionCourseList selectionCourseList.stream().filter(c - 1.equals(c.getSelectionStatus())).collect(Collectors.toList());}}if (CollectionUtil.isNotEmpty(selectionCourseList)) {// 筛选学生已选当前任务的课程数量、是否存在当前课程的选课记录、是否存在重复上课时间int taskCourseNum 0; // 学生在该任务中已选的任务数量for (CourseSelectionTaskcourseVo taskcourseVo : selectionCourseList) {String taskId taskcourseVo.getTaskId();if (taskId.equals(param.getTaskId().toString())) {taskCourseNum ;}}if (taskCourseNum param.getMaxSelectNum()) {log.error(handleCourseSelectionStudent-当前选课任务选课数量已达到上限任务id{}学生id{}, param.getTaskId(), param.getStuIdentityId());return;}}// 添加选课信息TCourseSelectionStudent selectionStudent new TCourseSelectionStudentDTO();BeanUtil.copyProperties(param, selectionStudent);selectionStudent.setSelectionStatus(1); //1 选中selectionStudent.setDelFlag(0);selectionStudent.setCreatorId(UserHandle.getUserId()); // 学生id/教师idselectionStudent.setCreateTime(new Date());getBaseMapper().insert(selectionStudent);// 修改redis缓存库存释放锁redisTemplate.opsForValue().decrement(stockKey, 1); // 库存量-1// 添加选课结果 redis key taskcourseId classId stuIdentityIdvalue pkId保存30天退课时删除。String couseSelectionStudentPkIdKey CourseSelectionConstants.COURSE_SELECTION_CLASS_STUDENT_PKID_PREFIX selectionStudent.getTaskcourseId() : selectionStudent.getClassId() : selectionStudent.getStuIdentityId();redisTemplate.opsForValue().set(couseSelectionStudentPkIdKey, selectionStudent.getPkId().toString(), 30, TimeUnit.DAYS);// 构造个人选课记录缓存String stuCourseListKey CourseSelectionConstants.COURSE_SELECTION_STUDENT_COURSELIST_PREFIX selectionStudent.getOrgId() : selectionStudent.getTaskId() : selectionStudent.getStuIdentityId();String stuCourseStr redisTemplate.opsForValue().get(stuCourseListKey);ListCourseSelectionTaskcourseVo taskcourseList new ArrayList();if (StringUtils.isNotBlank(stuCourseStr)) {taskcourseList JSONUtil.toList(JSONUtil.toJsonStr(stuCourseStr), CourseSelectionTaskcourseVo.class);}// 构造学生选课记录列表对象CourseSelectionTaskcourseVo courseVo new CourseSelectionTaskcourseVo();courseVo.setTaskId(param.getTaskId().toString());courseVo.setPkId(param.getTaskcourseId().toString());courseVo.setCourseName(param.getTaskcourseName());courseVo.setCourseNo(param.getTaskcourseNo());courseVo.setWeekSection(param.getWeekSection());courseVo.setCourseNumber(param.getCourseNumber());courseVo.setCourseSelectionStudentId(selectionStudent.getPkId().toString()); // 学生选课记录idcourseVo.setSelectionStatus(1); //设置为1已选courseVo.setTeacherSelection(param.getTeacherSelection().toString());courseVo.setCreateTime(current);taskcourseList.add(courseVo);// 保存个人选课记录30天redisTemplate.opsForValue().set(stuCourseListKey, JSONUtil.toJsonStr(taskcourseList), 30, TimeUnit.DAYS);// 最后才释放锁if (lock.isLocked() lock.isHeldByCurrentThread()) {// 是当前执行线程的锁lock.unlock();}}五、两种分布式锁
1.基于redis命令的分布式锁
a.加锁
1setnx(lockKey, expireTime) set if not exist如果不存在就设置锁。原子方法返回1代表成功存入锁。
2get(lockKey) 获取值oldExpireTime与当前系统时间比较判断锁是否已超时。若已超时允许其他请求重新获取。
3getset(lockKey, newValue) 返回当前锁的过期时间。如果与2的oldExpireTime相等说明获取到了锁否则失败。
b.解锁
delete(lockKey) 在锁定时间内完成操作主动调用delete解锁
c.局限性
多服务器并发进行getset会出现过期时间覆盖问题。 锁不具备拥有者表示任何客户端都可解锁。 不支持阻塞等待和重入。
2. redission分布式锁
1lua脚本原子性执行加锁、解锁、广播解锁消息。 2可重入锁通过redis的hash结构实现内含一对键值对。锁名为hash的名称UUID线程ID作为hash的key锁被重入的次数为value。 3等待锁订阅解锁消息获取解锁时间阻塞待唤醒或者超时。
参考文章
Redis分布式锁-这一篇全了解(Redission实现分布式锁完美方案) redis分布式锁RedissonLock的实现细节 Redis 分布式锁实现的一些方法 setnx()、get()、getset()