域名没到期 网站打不开,东莞市塘厦镇,创建全国文明城市宣传栏,灵台县门户网前言
在上一篇文章中#xff0c;我们使用了springboot的AOP功能实现了kafka的分布式事务#xff0c;但是那样实现的kafka事务是不完美的#xff0c;因为请求进来之后分配的是不同线程#xff0c;但不同线程使用的kafka事务却是同一个#xff0c;这样会造成多请求情况下的…前言
在上一篇文章中我们使用了springboot的AOP功能实现了kafka的分布式事务但是那样实现的kafka事务是不完美的因为请求进来之后分配的是不同线程但不同线程使用的kafka事务却是同一个这样会造成多请求情况下的事务失效。
而解决这个问题的方法就是每个线程都使用一个新的事务生产者去发送一条新的事务消息然后这个事务还要和当前线程进行绑定实现不同线程之间的事务隔离。
通常来说这个繁杂的过程虽然我们可以实现但是始终没有框架研发者做的那么完美所以我们首先要去看一下框架的作者有没有实现这个功能。
幸运地是上述功能在kafka之中是有实现的而且首次实现的时间是在2017年所以我们可以直接使用作者提供的基于springboot的事务管理功能。
注入kafka事务
在springboot中启用kafka的事务有两种方式第一种方式为使用springboot提供的自动配置第二种是自己往容器中注入。
方式一springboot自动注入
想要使用自动注入我们只需要在配置文件中加入transaction-id-prefix即可配置文件如下
spring:kafka:producer:bootstrap-servers: localhost:9092#bootstrap-servers: localhost:9010key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializertransaction-id-prefix: test这样配置之后就开启了kafka的事务。
方式一弊端
这样虽然可以直接使用springboot自动装配功能但是却有下面两个弊端
只能使用一个kafka的集群地址全局开启了事务有的方法并不需要全局开启事务 所以一旦有多个kafka的地址需要配置或者只想让部分方法使用事务那么就可以使用第二种方法来解决那就是自己往容器里面添加kafka的事务管理器。
方式二向spring容器中添加自定义kafka事务管理器
在kafka事务管理器中有三个重要的对象分别是ProducerFactory、KafkaTemplate、KafkaTransactionManager他们的作用如下
ProducerFactory用来创建kafka的生产者对象KafkaTemplatespringboot封装的kafka模版KafkaTransactionManagerkafka的事务管理器 想要往spring容器中添加自定义的kafka事务管理器其实就是添加一个自定义的KafkaTransactionManager对象那么我们只需要想办法构造一个KafkaTransactionManager就好。
利用springboot的配置类我们能很轻松的做到这一点。 第一步构造一个配置类KafkaAndDataTransactionConfig加上Configuration注解。
Configuration
public class KafkaAndDataTransactionConfig {
}第二步构建一个ProducerFactory对象的Bean交给spring容器。 ResourceNacosDiscoveryProperties nacosDiscoveryProperties;/*** 注入一个kafka生产者这个生产者的transactional.id自定义避免导致多个生产者的事务id相同* param props yaml文件中的定义属性*/BeanProducerFactoryString, String pf1(KafkaProperties props) {MapString, Object pProps props.buildProducerProperties();pProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, product-transactional-id- nacosDiscoveryProperties.getIp() - nacosDiscoveryProperties.getPort());pProps.put(ProducerConfig.CLIENT_ID_CONFIG, product-client-id- nacosDiscoveryProperties.getIp() - nacosDiscoveryProperties.getPort());return new DefaultKafkaProducerFactory(pProps);}注意其中的nacosDiscoveryProperties变量这是用来获取实例在nacos中的ip地址因为在多实例的情况下需要保证每一个事务id的唯一才不会被kafka的事务管理器识别为失效事务生产者从而导致事务冲突失效。 第三步创建一个KafkaTransactionManager对象的Bean添加到spring容器。 /*** 注入一个kafka事务管理器这个事务管理器使用事务id* param pf1* return*/BeanKafkaTransactionManagerString, String kafkaTransactionManagerWithTxId(ProducerFactoryString, String pf1) {return new KafkaTransactionManager(pf1);}只需要将创建好的生产者bean作为构造参数传入即可。 通过以上三步我们就得到了一个支持事务的kafka事务管理器了不过此时我们还少创建了一个KafkaTemplate没有这个对象我们将完不成事务发送的管控。
第四步创建KafkaTemplate /*** 注入一个使用事务id的kafkaTemplate,这个kafkaTemplate可以使用事务* param pf1* return*/BeanKafkaTemplateString, String kafkaTemplateWithTxId(ProducerFactoryString, String pf1) {return new KafkaTemplate(pf1);}经过以上代码我们就得到了一个完整的kafka事务管理器了。 全部代码如下
Configuration
public class KafkaAndDataTransactionConfig {ResourceNacosDiscoveryProperties nacosDiscoveryProperties;/*** 注入一个kafka生产者这个生产者的transactional.id自定义避免导致多个生产者的事务id相同* param props yaml文件中的定义属性*/BeanProducerFactoryString, String pf1(KafkaProperties props) {MapString, Object pProps props.buildProducerProperties();pProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, product-transactional-id- nacosDiscoveryProperties.getIp() - nacosDiscoveryProperties.getPort());pProps.put(ProducerConfig.CLIENT_ID_CONFIG, product-client-id- nacosDiscoveryProperties.getIp() - nacosDiscoveryProperties.getPort());return new DefaultKafkaProducerFactory(pProps);/*** 注入一个kafka事务管理器这个事务管理器使用事务id* param pf1* return*/BeanKafkaTransactionManagerString, String kafkaTransactionManagerWithTxId(ProducerFactoryString, String pf1) {return new KafkaTransactionManager(pf1);}/*** 注入一个使用事务id的kafkaTemplate,这个kafkaTemplate可以使用事务* param pf1* return*/BeanKafkaTemplateString, String kafkaTemplateWithTxId(ProducerFactoryString, String pf1) {return new KafkaTemplate(pf1);}}增加DataSourceTransaction事务管理器
默认情况DataSourceTransaction事务管理器springboot会帮我们自动配置但是在使用了kafka的事务之后会存在一个类的加载冲突导致DataSourceTransaction没有被springboot自动加载到所以我们还需要自己将DataSourceTransaction事务管理加入进来。 在上面的代码中再加入以下代码 //构造器注入DataSource和transactionManagerCustomizersprivate final DataSource dataSource;private final TransactionManagerCustomizers transactionManagerCustomizers;KafkaAndDataTransactionConfig(DataSource dataSource,ObjectProviderTransactionManagerCustomizers transactionManagerCustomizers) {this.dataSource dataSource;this.transactionManagerCustomizers transactionManagerCustomizers.getIfAvailable();}/*** Bean 去掉了ConditionalOnMissingBean 避免注入了kafka事务管理器后springboot不再注入DataSourceTransactionManager* Primary 作为主事务管理器这样在使用Transactional时就会使用DataSourceTransactionManager* param properties* return*/BeanPrimarypublic DataSourceTransactionManager dstm(DataSourceProperties properties) {DataSourceTransactionManager transactionManager new DataSourceTransactionManager(this.dataSource);if (this.transactionManagerCustomizers ! null) {this.transactionManagerCustomizers.customize(transactionManager);}return transactionManager;}增加ChainedKafkaTransactionManager管理器
在实际开发中有时候一个方法需要既支持kafka的事务又需要支持JDBC的事务这个时候为了兼容两者的事务我们需要将两者的事务放到同一个事务管理器中让他们两个构成一个事务。kafka的作者为我们提供了ChainedKafkaTransactionManager这个对象来支持这个操作只需要加入以下代码即可 //多个事务管理器构成一个事务使用ChainedKafkaTransactionManager管理是因为可以自动偏移kafka事务给消费者Bean public ChainedKafkaTransactionManager kafkaAndDataSourceTransactionManager(DataSourceTransactionManager transactionManager,Autowired Qualifier(kafkaTransactionManagerWithTxId) KafkaTransactionManager?, ? kafkaTransactionManager){return new ChainedKafkaTransactionManager(transactionManager, kafkaTransactionManager);}以上就是kafka集成springboot的方案接下来看看怎么使用
使用
基于以上的配置一共有三种使用方式
只使用kafka事务只使用JDBC事务同时使用kafka和JDBC事务
针对于上面的三种情况的切换其实就是使用不同Transactional注解中的value值切换不同的事务管理器事务的指定都在service层的实现类中。
只使用kafka事务 //指定事务模版为自定义模版Resource(name kafkaTemplateWithTxId)private KafkaTemplateString, String kafkaTemplate;Transactional(rollbackFor Exception.class,value kafkaAndDataSourceTransactionManager)public void transation() {ProducerRecordString, String stringStringProducerRecord new ProducerRecord(test-topic, test);kafkaTemplate.send(stringStringProducerRecord);}只使用JDBC事务
不需要指定任何的事务管理器 OverrideTransactional(rollbackFor Exception.class)public void transationOfJdbc() {xxxService.update(user);}同时使用kafka和JDBC事务
指定自定义的事务管理器 //指定事务模版为自定义模版Resource(name kafkaTemplateWithTxId)private KafkaTemplateString, String kafkaTemplate;Transactional(rollbackFor Exception.class,value kafkaAndDataSourceTransactionManager)public void transationAll() {xxxService.update(user);spreadMonitorService.sendMsg();ProducerRecordString, String stringStringProducerRecord new ProducerRecord(test-topic, test);kafkaTemplate.send(stringStringProducerRecord);}结语
以上就是在springboot中生产端实现事务的方法总结一下一共分为以下几步
增加kafka事务管理器增加JDBC事务管理器增加事务链事务管理器使用三种事务管理器
下一篇将写springboot中消费端如何配置。 引用资料 kafka官网 kafka的github spring-kafka官网