糖果网站建设策划书模板,国内外电子政务网站建设差距,wordpress图片类主题,人人车的网站找谁做的这是Mysql系列第26篇。
本篇我们使用mysql实现一个分布式锁。
分布式锁的功能 分布式锁使用者位于不同的机器中#xff0c;锁获取成功之后#xff0c;才可以对共享资源进行操作 锁具有重入的功能#xff1a;即一个使用者可以多次获取某个锁 获取锁有超时的功能#xff…这是Mysql系列第26篇。
本篇我们使用mysql实现一个分布式锁。
分布式锁的功能 分布式锁使用者位于不同的机器中锁获取成功之后才可以对共享资源进行操作 锁具有重入的功能即一个使用者可以多次获取某个锁 获取锁有超时的功能即在指定的时间内去尝试获取锁超过了超时时间如果还未获取成功则返回获取失败 能够自动容错比如A机器获取锁lock1之后在释放锁lock1之前A机器挂了导致锁lock1未释放结果会lock1一直被A机器占有着遇到这种情况时分布式锁要能够自动解决可以这么做持有锁的时候可以加个持有超时时间超过了这个时间还未释放的其他机器将有机会获取锁
预备技能乐观锁
通常我们修改表中一条数据过程如下
t1select获取记录R1
t2对R1进行编辑
t3update R1我们来看一下上面的过程存在的问题
如果A、B两个线程同时执行到t1他们俩看到的R1的数据一样然后都对R1进行编辑然后去执行t3最终2个线程都会更新成功后面一个线程会把前面一个线程update的结果给覆盖掉这就是并发修改数据存在的问题。
我们可以在表中新增一个版本号每次更新数据时候将版本号作为条件并且每次更新时候版本号1过程优化一下如下
t1打开事务start transaction
t2select获取记录R1,声明变量vR1.version
t3对R1进行编辑
t4执行更新操作update R1 set version version 1 where user_id#user_id# and version #v#;
t5t4中的update会返回影响的行数我们将其记录在count中然后根据count来判断提交还是回滚if(count1){//提交事务commit;}else{//回滚事务rollback;}上面重点在于步骤t4当多个线程同时执行到t1他们看到的R1是一样的但是当他们执行到t4的时候数据库会对update的这行记录加锁确保并发情况下排队执行所以只有第一个的update会返回1其他的update结果会返回0然后后面会判断count是否为1进而对事务进行提交或者回滚。可以通过count的值知道修改数据是否成功了。
上面这种方式就乐观锁。我们可以通过乐观锁的方式确保数据并发修改过程中的正确性。
使用mysql实现分布式锁
建表 我们创建一个分布式锁表如下 DROP DATABASE IF EXISTS javacode2018;
CREATE DATABASE javacode2018;
USE javacode2018;
DROP TABLE IF EXISTS t_lock;
create table t_lock(lock_key varchar(32) PRIMARY KEY NOT NULL COMMENT 锁唯一标志,request_id varchar(64) NOT NULL DEFAULT COMMENT 用来标识请求对象的,lock_count INT NOT NULL DEFAULT 0 COMMENT 当前上锁次数,timeout BIGINT NOT NULL DEFAULT 0 COMMENT 锁超时时间,version INT NOT NULL DEFAULT 0 COMMENT 版本号每次更新1
)COMMENT 锁信息表;分布式锁工具类
package com.itsoku.sql;import lombok.Builder;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;import java.sql.*;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.TimeUnit;/*** 工作10年的前阿里P7分享Java、算法、数据库方面的技术干货坚信用技术改变命运让家人过上更体面的生活* 喜欢的请关注公众号路人甲Java*/
Slf4j
public class LockUtils {//将requestid保存在该变量中static ThreadLocalString requestIdTL new ThreadLocal();/*** 获取当前线程requestid** return*/public static String getRequestId() {String requestId requestIdTL.get();if (requestId null || .equals(requestId)) {requestId UUID.randomUUID().toString();requestIdTL.set(requestId);}log.info(requestId:{}, requestId);return requestId;}/*** 获取锁** param lock_key 锁key* param locktimeout(毫秒) 持有锁的有效时间防止死锁* param gettimeout(毫秒) 获取锁的超时时间这个时间内获取不到将重试* return*/public static boolean lock(String lock_key, long locktimeout, int gettimeout) throws Exception {log.info(start);boolean lockResult false;String request_id getRequestId();long starttime System.currentTimeMillis();while (true) {LockModel lockModel LockUtils.get(lock_key);if (Objects.isNull(lockModel)) {//插入一条记录,重新尝试获取锁LockUtils.insert(LockModel.builder().lock_key(lock_key).request_id().lock_count(0).timeout(0L).version(0).build());} else {String reqid lockModel.getRequest_id();//如果reqid为空字符表示锁未被占用if (.equals(reqid)) {lockModel.setRequest_id(request_id);lockModel.setLock_count(1);lockModel.setTimeout(System.currentTimeMillis() locktimeout);if (LockUtils.update(lockModel) 1) {lockResult true;break;}} else if (request_id.equals(reqid)) {//如果request_id和表中request_id一样表示锁被当前线程持有者此时需要加重入锁lockModel.setTimeout(System.currentTimeMillis() locktimeout);lockModel.setLock_count(lockModel.getLock_count() 1);if (LockUtils.update(lockModel) 1) {lockResult true;break;}} else {//锁不是自己的并且已经超时了则重置锁继续重试if (lockModel.getTimeout() System.currentTimeMillis()) {LockUtils.resetLock(lockModel);} else {//如果未超时休眠100毫秒继续重试if (starttime gettimeout System.currentTimeMillis()) {TimeUnit.MILLISECONDS.sleep(100);} else {break;}}}}}log.info(end);return lockResult;}/*** 释放锁** param lock_key* throws Exception*/public static void unlock(String lock_key) throws Exception {//获取当前线程requestIdString requestId getRequestId();LockModel lockModel LockUtils.get(lock_key);//当前线程requestId和库中request_id一致 lock_count0表示可以释放锁if (Objects.nonNull(lockModel) requestId.equals(lockModel.getRequest_id()) lockModel.getLock_count() 0) {if (lockModel.getLock_count() 1) {//重置锁resetLock(lockModel);} else {lockModel.setLock_count(lockModel.getLock_count() - 1);LockUtils.update(lockModel);}}}/*** 重置锁** param lockModel* return* throws Exception*/public static int resetLock(LockModel lockModel) throws Exception {lockModel.setRequest_id();lockModel.setLock_count(0);lockModel.setTimeout(0L);return LockUtils.update(lockModel);}/*** 更新lockModel信息内部采用乐观锁来更新** param lockModel* return* throws Exception*/public static int update(LockModel lockModel) throws Exception {return exec(conn - {String sql UPDATE t_lock SET request_id ?,lock_count ?,timeout ?,version version 1 WHERE lock_key ? AND version ?;PreparedStatement ps conn.prepareStatement(sql);int colIndex 1;ps.setString(colIndex, lockModel.getRequest_id());ps.setInt(colIndex, lockModel.getLock_count());ps.setLong(colIndex, lockModel.getTimeout());ps.setString(colIndex, lockModel.getLock_key());ps.setInt(colIndex, lockModel.getVersion());return ps.executeUpdate();});}public static LockModel get(String lock_key) throws Exception {return exec(conn - {String sql select * from t_lock t WHERE t.lock_key?;PreparedStatement ps conn.prepareStatement(sql);int colIndex 1;ps.setString(colIndex, lock_key);ResultSet rs ps.executeQuery();if (rs.next()) {return LockModel.builder().lock_key(lock_key).request_id(rs.getString(request_id)).lock_count(rs.getInt(lock_count)).timeout(rs.getLong(timeout)).version(rs.getInt(version)).build();}return null;});}public static int insert(LockModel lockModel) throws Exception {return exec(conn - {String sql insert into t_lock (lock_key, request_id, lock_count, timeout, version) VALUES (?,?,?,?,?);PreparedStatement ps conn.prepareStatement(sql);int colIndex 1;ps.setString(colIndex, lockModel.getLock_key());ps.setString(colIndex, lockModel.getRequest_id());ps.setInt(colIndex, lockModel.getLock_count());ps.setLong(colIndex, lockModel.getTimeout());ps.setInt(colIndex, lockModel.getVersion());return ps.executeUpdate();});}public static T T exec(SqlExecT sqlExec) throws Exception {Connection conn getConn();try {return sqlExec.exec(conn);} finally {closeConn(conn);}}FunctionalInterfacepublic interface SqlExecT {T exec(Connection conn) throws Exception;}GetterSetterBuilderpublic static class LockModel {private String lock_key;private String request_id;private Integer lock_count;private Long timeout;private Integer version;}private static final String url jdbc:mysql://localhost:3306/javacode2018?useSSLfalse; //数据库地址private static final String username root; //数据库用户名private static final String password root123; //数据库密码private static final String driver com.mysql.jdbc.Driver; //mysql驱动/*** 连接数据库** return*/public static Connection getConn() {Connection conn null;try {Class.forName(driver); //加载数据库驱动try {conn DriverManager.getConnection(url, username, password); //连接数据库} catch (SQLException e) {e.printStackTrace();}} catch (ClassNotFoundException e) {e.printStackTrace();}return conn;}/*** 关闭数据库链接** return*/public static void closeConn(Connection conn) {if (conn ! null) {try {conn.close(); //关闭数据库链接} catch (SQLException e) {e.printStackTrace();}}}
}上面代码中实现了文章开头列的分布式锁的所有功能大家可以认真研究下获取锁的方法lock释放锁的方法unlock。 测试用例
package com.itsoku.sql;import lombok.extern.slf4j.Slf4j;
import org.junit.Test;import static com.itsoku.sql.LockUtils.lock;
import static com.itsoku.sql.LockUtils.unlock;/*** 工作10年的前阿里P7分享Java、算法、数据库方面的技术干货坚信用技术改变命运让家人过上更体面的生活* 喜欢的请关注公众号路人甲Java*/
Slf4j
public class LockUtilsTest {//测试重复获取和重复释放Testpublic void test1() throws Exception {String lock_key key1;for (int i 0; i 10; i) {lock(lock_key, 10000L, 1000);}for (int i 0; i 9; i) {unlock(lock_key);}}//获取之后不释放超时之后被thread1获取Testpublic void test2() throws Exception {String lock_key key2;lock(lock_key, 5000L, 1000);Thread thread1 new Thread(() - {try {try {lock(lock_key, 5000L, 7000);} finally {unlock(lock_key);}} catch (Exception e) {e.printStackTrace();}});thread1.setName(thread1);thread1.start();thread1.join();}
}test1方法测试了重入锁的效果。 test2测试了主线程获取锁之后一直未释放持有锁超时之后被thread1获取到了。 留给大家一个问题
上面分布式锁还需要考虑一个问题比如A机会获取了key1的锁并设置持有锁的超时时间为10秒但是获取锁之后执行了一段业务操作业务操作耗时超过10秒了此时机器B去获取锁时可以获取成功的此时会导致A、B两个机器都获取锁成功了都在执行业务操作这种情况应该怎么处理大家可以思考一下然后留言我们一起讨论一下。