同心食品厂网站建设项目任务分解,wordpress linux 伪静态,网络营销的解释,asp网站安装到空间1. 背景
一个主库和N个应用库的数据源#xff0c;并且会同时操作主库和应用库的数据#xff0c;需要解决以下两个问题#xff1a; 如何动态管理多个数据源以及切换#xff1f; 如何保证多数据源场景下的数据一致性(事务)#xff1f;
本文主要探讨这两个问题的解决方案…1. 背景
一个主库和N个应用库的数据源并且会同时操作主库和应用库的数据需要解决以下两个问题 如何动态管理多个数据源以及切换 如何保证多数据源场景下的数据一致性(事务)
本文主要探讨这两个问题的解决方案希望能对读者有一定的启发。
2. 数据源切换原理
通过扩展Spring提供的抽象类AbstractRoutingDataSource可以实现切换数据源。其类结构如下图所示 targetDataSourcesdefaultTargetDataSource
项目上需要使用的所有数据源和默认数据源。 resolvedDataSourcesresolvedDefaultDataSource
当Spring容器创建AbstractRoutingDataSource对象时通过调用afterPropertiesSet复制上述目标数据源。由此可见一旦数据源实例对象创建完毕业务无法再添加新的数据源。 determineCurrentLookupKey
此方法为抽象方法通过扩展这个方法来实现数据源的切换。目标数据源的结构为MapObject, DataSource其key为lookup key。
我们来看官方对这个方法的注释 lookup key通常是绑定在线程上下文中根据这个key去resolvedDataSources中取出DataSource。
根据目标数据源的管理方式不同可以使用基于配置文件和数据库表两种方式。基于配置文件管理方案无法后续添加新的数据源而基于数据库表方案管理则更加灵活。关注公z号码猿技术专栏回复关键词1111 获取阿里内部Java性能调优手册
3. 配置文件解决方案
根据上面的分析我们可以按照下面的步骤去实现 定义DynamicDataSource类继承AbstractRoutingDataSource重写determineCurrentLookupKey()方法。 配置多个数据源注入targetDataSources和defaultTargetDataSource通过afterPropertiesSet()方法将数据源写入resolvedDataSources和resolvedDefaultDataSource。 调用AbstractRoutingDataSource的getConnection()方法时determineTargetDataSource()方法返回DataSource执行底层的getConnection()。
其流程如下图所示 3.1 创建数据源
DynamicDataSource数据源的注入目前业界主流实现步骤如下
在配置文件中定义数据源
spring.datasource.typecom.alibaba.druid.pool.DruidDataSource
spring.datasource.driverClassNamecom.mysql.jdbc.Driver
# 主数据源
spring.datasource.druid.master.urljdbcUrl
spring.datasource.druid.master.username***
spring.datasource.druid.master.password***
# 其他数据源
spring.datasource.druid.second.urljdbcUrl
spring.datasource.druid.second.username***
spring.datasource.druid.second.password***在代码中配置Bean
Configuration
public class DynamicDataSourceConfig {BeanConfigurationProperties(spring.datasource.druid.master)public DataSource firstDataSource(){return DruidDataSourceBuilder.create().build();}BeanConfigurationProperties(spring.datasource.druid.second)public DataSource secondDataSource(){return DruidDataSourceBuilder.create().build();}BeanPrimarypublic DynamicDataSource dataSource(DataSource firstDataSource, DataSource secondDataSource) {MapObject, Object targetDataSources new HashMap(5);targetDataSources.put(DataSourceNames.FIRST, firstDataSource);targetDataSources.put(DataSourceNames.SECOND, secondDataSource);return new DynamicDataSource(firstDataSource, targetDataSources);}
}3.2 AOP处理
通过DataSourceAspect切面技术来简化业务上的使用只需要在业务方法添加SwitchDataSource注解即可完成动态切换
Documented
Retention(RetentionPolicy.RUNTIME)
Target({ElementType.METHOD})
public interface SwitchDataSource {String value();
}DataSourceAspect拦截业务方法更新当前线程上下文DataSourceContextHolder中存储的key即可实现数据源切换。
3.3 方案不足
基于AbstractRoutingDataSource的多数据源动态切换有个明显的缺点无法动态添加和删除数据源。在我们的产品中不能把应用数据源写死在配置文件。接下来分享一下基于数据库表的实现方案。
4. 数据库表解决方案
我们需要实现可视化的数据源管理并实时查看数据源的运行状态。所以我们不能把数据源全部配置在文件中应该将数据源定义保存到数据库表。参考AbstractRoutingDataSource的设计思路实现自定义数据源管理。
4.1 设计数据源表
主库的数据源信息仍然配置在项目配置文件中应用库数据源配置参数则设计对应的数据表。表结构如下所示 这个表主要就是DataSource的相关配置参数其相应的ORM操作代码在此不再赘述主要是实现数据源的增删改查操作。
4.2 自定义数据源管理
4.2.1 定义管理接口
通过继承AbstractDataSource即可实现DynamicDataSource。为了方便对数据源进行操作我们定义一个接口DataSourceManager为业务提供操作数据源的统一接口。
public interface DataSourceManager {void put(String var1, DataSource var2);DataSource get(String var1);Boolean hasDataSource(String var1);void remove(String var1);void closeDataSource(String var1);CollectionDataSource all();
}该接口主要是对数据表中定义的数据源提供基础管理功能。
4.2.2 自定义数据源
DynamicDataSource的实现如下图所示 根据前面的分析AbstractRoutingDataSource是在容器启动的时候执行afterPropertiesSet注入数据源对象完成之后无法对数据源进行修改。DynamicDataSource则实现DataSourceManager接口可以将数据表中的数据源加载到dataSources。
4.2.3 切面处理
这一块的处理跟配置文件数据源方案处理方式相同都是通过AOP技术切换lookup key。
public DataSource determineTargetDataSource() {String lookupKey DataSourceContextHolder.getKey();DataSource dataSource Optional.ofNullable(lookupKey).map(dataSources::get).orElse(defaultDataSource);if (dataSource null) {throw new IllegalStateException(Cannot determine DataSource for lookup key [ lookupKey ]);}return dataSource;}4.2.4 管理数据源状态
在项目启动的时候加载数据表中的所有数据源并执行初始化。初始化操作主要是使用SpringBoot提供的DataSourceBuilder类根据数据源表的定义创建DataSource。在项目运行过程中可以使用定时任务对数据源进行保活为了提升性能再添加一层缓存。 AbstractRoutingDataSource 只支持单库事务切换数据源是在开启事务之前执行。Spring使用 DataSourceTransactionManager进行事务管理。开启事务会将数据源缓存到DataSourceTransactionObject对象中后续的commit和 rollback事务操作实际上是使用的同一个数据源。
如何解决切库事务问题借助Spring的声明式事务处理我们可以在多次切库操作时强制开启新的事务
SwitchDataSource
Transactional(rollbackFor Exception.class, propagation Propagation.REQUIRES_NEW)这样的话执行切库操作的时候强制启动新事务便可实现多次切库而且事务能够生效。但是这种事务方式存在数据一致性问题 假若ServiceB正常执行提交事务接着返回ServiceA执行并且发生异常。因为两次处理是不同的事务ServiceA这个事务执行回滚而ServiceA事务已经提交。这样的话数据就不一致了。接下来我们主要讨论如何解决多库的事务问题。
6. 多库事务处理
6.1 关于事务的理解
首先有必要理解事务的本质。
1.提到Spring事务就离不开事务的四大特性和隔离级别、七大传播特性。
事务特性和离级别是属于数据库范畴。Spring事务的七大传播特性是什么呢它是Spring在当前线程内处理多个事务操作时的事务应用策略数据库事务本身并不存在传播特性。 2.Spring事务的定义包括begin、commit、rollback、close、suspend、resume等动作。 begin(事务开始) 可以认为存在于数据库的命令中比如Mysql的start transaction命令但是在JDBC编程方式中不存在。 close(事务关闭) Spring事务的close()方法是把Connection对象归还给数据库连接池与事务无关。 suspend(事务挂起) Spring中事务挂起的语义是需要新事务时将现有的Connection保存起来(还有尚未提交的事务)然后创建新的Connection2Connection2提交、回滚、关闭完毕后再把Connection1取出来继续执行。 resume(事务恢复) 嵌套事务执行完毕返回上层事务重新绑定连接对象到事务管理器的过程。
实际上只有commit、rollback、close是在JDBC真实存在的而其他动作都是应用的语意而非JDBC事务的真实命令。因此事务真实存在的方法是setAutoCommit()、commit()、rollback()。
close()语义为 关闭一个数据库连接这已经不再是事务的方法了。
使用DataSource并不会执行物理关闭只是归还给连接池。
6.2 自定义管理事务
为了保证在多个数据源中事务的一致性我们可以手动管理Connetion的事务提交和回滚。考虑到不同ORM框架的事务管理实现差异要求实现自定义事务管理不影响框架层的事务。
这可以通过使用装饰器设计模式对Connection进行包装重写commit和rolllback屏蔽其默认行为这样就不会影响到原生Connection和ORM框架的默认事务行为。其整体思路如下图所示 这里并没有使用前面提到的SwitchDataSource这是因为我们在TransactionAop中已经执行了lookupKey的切换。
6.2.1 定义多事务注解
Target({ElementType.METHOD})
Retention(RetentionPolicy.RUNTIME)
Documented
public interface MultiTransaction {String transactionManager() default multiTransactionManager;// 默认数据隔离级别随数据库本身默认值IsolationLevel isolationLevel() default IsolationLevel.DEFAULT;// 默认为主库数据源String datasourceId() default default;// 只读事务若有更新操作会抛出异常boolean readOnly() default false;业务方法只需使用该注解即可开启事务datasourceId指定事务用到的数据源不指定默认为主库。
6.2.3 包装Connection
自定义事务我们使用包装过的Connection屏蔽其中的commitrollback方法。这样我们就可以在主事务里进行统一的事务提交和回滚操作。
public class ConnectionProxy implements Connection {private final Connection connection;public ConnectionProxy(Connection connection) {this.connection connection;}Overridepublic void commit() throws SQLException {// connection.commit();}public void realCommit() throws SQLException {connection.commit();}Overridepublic void close() throws SQLException {//connection.close();}public void realClose() throws SQLException {if (!connection.getAutoCommit()) {connection.setAutoCommit(true);}connection.close();}Overridepublic void rollback() throws SQLException {if(!connection.isClosed())connection.rollback();}...
}这里commitclose方法不执行操作rollback执行的前提是连接执行close才生效。这样不管是使用哪个ORM框架其自身事务管理都将失效。事务的控制就交由MultiTransaction控制了。
6.2.4 事务上下文管理
public class TransactionHolder {// 是否开启了一个MultiTransactionprivate boolean isOpen;// 是否只读事务private boolean readOnly;// 事务隔离级别private IsolationLevel isolationLevel;// 维护当前线程事务ID和连接关系private ConcurrentHashMapString, ConnectionProxy connectionMap;// 事务执行栈private StackString executeStack;// 数据源切换栈private StackString datasourceKeyStack;// 主事务IDprivate String mainTransactionId;// 执行次数private AtomicInteger transCount;// 事务和数据源key关系private ConcurrentHashMapString, String executeIdDatasourceKeyMap;}每开启一个事物生成一个事务ID并绑定一个ConnectionProxy。事务嵌套调用保存事务ID和lookupKey至栈中当内层事务执行完毕执行pop。这样的话外层事务只需在栈中执行peek即可获取事务ID和lookupKey。
6.2.5 数据源兼容处理
为了不影响原生事务的使用需要重写getConnection方法。当前线程没有启动自定义事务则直接从数据源中返回连接。
Overridepublic Connection getConnection() throws SQLException {TransactionHolder transactionHolder MultiTransactionManager.TRANSACTION_HOLDER_THREAD_LOCAL.get();if (Objects.isNull(transactionHolder)) {return determineTargetDataSource().getConnection();}ConnectionProxy ConnectionProxy transactionHolder.getConnectionMap().get(transactionHolder.getExecuteStack().peek());if (ConnectionProxy null) {// 没开跨库事务直接返回return determineTargetDataSource().getConnection();} else {transactionHolder.addCount();// 开了跨库事务从当前线程中拿包装过的Connectionreturn ConnectionProxy;}}6.2.6 切面处理
切面处理的核心逻辑是维护一个嵌套事务栈当业务方法执行结束或者发生异常时判断当前栈顶事务ID是否为主事务ID。如果是的话这时候已经到了最外层事务这时才执行提交和回滚。详细流程如下图所示 package com.github.mtxn.transaction.aop;
Aspect
Component
Slf4j
Order(99999)
public class MultiTransactionAop {Pointcut(annotation(com.github.mtxn.transaction.annotation.MultiTransaction))public void pointcut() {if (log.isDebugEnabled()) {log.debug(start in transaction pointcut...);}}Around(pointcut())public Object aroundTransaction(ProceedingJoinPoint point) throws Throwable {MethodSignature signature (MethodSignature) point.getSignature();// 从切面中获取当前方法Method method signature.getMethod();MultiTransaction multiTransaction method.getAnnotation(MultiTransaction.class);if (multiTransaction null) {return point.proceed();}IsolationLevel isolationLevel multiTransaction.isolationLevel();boolean readOnly multiTransaction.readOnly();String prevKey DataSourceContextHolder.getKey();MultiTransactionManager multiTransactionManager Application.resolve(multiTransaction.transactionManager());// 切数据源如果失败使用默认库if (multiTransactionManager.switchDataSource(point, signature, multiTransaction)) return point.proceed();// 开启事务栈TransactionHolder transactionHolder multiTransactionManager.startTransaction(prevKey, isolationLevel, readOnly, multiTransactionManager);Object proceed;try {proceed point.proceed();multiTransactionManager.commit();} catch (Throwable ex) {log.error(execute method:{}#{},err:, method.getDeclaringClass(), method.getName(), ex);multiTransactionManager.rollback();throw ExceptionUtils.api(ex, 系统异常%s, ex.getMessage());} finally {// 当前事务结束出栈String transId multiTransactionManager.getTrans().getExecuteStack().pop();transactionHolder.getDatasourceKeyStack().pop();// 恢复上一层事务DataSourceContextHolder.setKey(transactionHolder.getDatasourceKeyStack().peek());// 最后回到主事务关闭此次事务multiTransactionManager.close(transId);}return proceed;}}7.总结
本文主要介绍了多数据源管理的解决方案(应用层事务而非XA二段提交保证)以及对多个库同时操作的事务管理。
需要注意的是这种方式只适用于单体架构的应用。
因为多个库的事务参与者都是运行在同一个JVM进行。
如果是在微服务架构的应用中则需要使用分布式事务管理(譬如Seata)。
最后说一句(求关注!别白嫖)
如果这篇文章对您有所帮助或者有所启发的话求一键三连点赞、转发、在看。
关注公众号woniuxgg在公众号中回复笔记 就可以获得蜗牛为你精心准备的java实战语雀笔记回复面试、开发手册、有超赞的粉丝福利