title 镇江网站建设,大庆市萨尔图区建设局网站,手机 网站,怎样做能让招聘网站记住密码Kafka事务使用和编程示例/实例_JobShow裁员加班实况-微信小程序-CSDN博客一、概述 Kafka事务特性是指一系列的生产者生产消息和消费者提交偏移量的操作在一个事务中#xff0c;或者说是一个原子操作#xff0c;生产消息和提交偏移量同时成功或者失败。注意#xff1a;kafk…Kafka事务使用和编程示例/实例_JobShow裁员加班实况-微信小程序-CSDN博客一、概述 Kafka事务特性是指一系列的生产者生产消息和消费者提交偏移量的操作在一个事务中或者说是一个原子操作生产消息和提交偏移量同时成功或者失败。注意kafka事务和DB事务。在理解消息的事务时一直处于一个错误理解是把操作db的业务逻辑跟操作消息当成是一个事务如下所示void kakfa_in_tranction(){ // 1.kafa的操作读取消息或生产消息 kafkaOperation(); // 2.db操作 dbOperation()https://blog.csdn.net/u010002184/article/details/113933973 一、概述
Kafka事务特性是指一系列的生产者生产消息和消费者提交偏移量的操作在一个事务中或者说是一个原子操作生产消息和提交偏移量同时成功或者失败。 注意kafka事务和DB事务。 在理解消息的事务时一直处于一个错误理解是把操作db的业务逻辑跟操作消息当成是一个事务如下所示 void kakfa_in_tranction(){// 1.kafa的操作读取消息或生产消息kafkaOperation();// 2.db操作dbOperation();
} 操作DB数据库的数据源是DB消息数据源是kfaka这是完全不同两个数据。一种数据源如mysqlkafka对应一个事务所以它们是两个独立的事务。kafka事务指kafka一系列 生产、消费消息等操作组成一个原子操作db事务是指操作数据库的一系列增删改操作组成一个原子操作。
二、事务的使用
Kafka中的事务特性主要用于以下两种场景 生产者发送多条消息可以封装在一个事务中形成一个原子操作。多条消息要么都发送成功要么都发送失败。 read-process-write模式将消息消费和生产封装在一个事务中形成一个原子操作。在一个**流式处理**的应用中常常一个服务需要从上游接收消息然后经过处理后送达到下游这就对应着消息的消费和生成。 当事务中仅仅存在Consumer消费消息的操作时它和Consumer手动提交Offset并没有区别。因此单纯的消费消息并不是Kafka引入事务机制的原因单纯的消费消息也没有必要存在于一个事务中。三、事务相关的API
1 api /*** 初始化事务*/public void initTransactions();/*** 开启事务*/public void beginTransaction() throws ProducerFencedException ;/*** 在事务内提交已经消费的偏移量*/public void sendOffsetsToTransaction(MapTopicPartition, OffsetAndMetadata offsets, String consumerGroupId) ;/*** 提交事务*/public void commitTransaction() throws ProducerFencedException;/*** 丢弃事务*/public void abortTransaction() throws ProducerFencedException ;
2事务配置
2,1 生产者 需要设置transactional.id属性。 设置了transactional.id属性后enable.idempotence属性会自动设置为true。 2.2 消费者 需要设置isolation.level read_committed这样Consumer只会读取已经提交了事务的消息。另外需要设置enable.auto.commit false来关闭自动提交Offset功能。 四、事务使用示例
1 需求 在Kafka的topicods_user中有一些用户数据数据格式如下 姓名,性别,出生日期 张三,1,1980-10-09 李四,0,1985-11-01 我们需要编写程序将用户的性别转换为男、女1-男0-女转换后将数据写入到topicdwd_user中。要求使用事务保障要么消费了数据同时写入数据到 topic提交offset。要么全部失败。 2控制台模拟数据
# 创建名为ods_user和dwd_user的主题
bin/kafka-topics.sh --create --zookeeper node-1:2181 --topic ods_user --partitions 3 --replication-factor 2
bin/kafka-topics.sh --create --zookeeper node-1:2181 --topic dwd_user --partitions 3 --replication-factor 2
# 生产数据到 ods_user
bin/kafka-console-producer.sh --broker-list node-1:9092 --topic ods_user
# 从dwd_user消费数据
bin/kafka-console-consumer.sh --bootstrap-server node-1:9092 --topic dwd_user --from-beginning
3 详细代码
public class TransUse {public static void main(String[] args) {ConsumerString, String consumer createConsumer();ProducerString, String producer createProduceer();// 初始化事务producer.initTransactions();while(true) {try {// 1. 开启事务producer.beginTransaction();// 2. 定义Map结构用于保存分区对应的offsetMapTopicPartition, OffsetAndMetadata offsetCommits new HashMap();// 2. 拉取消息ConsumerRecordsString, String records consumer.poll(2000);for (ConsumerRecordString, String record : records) {// 3. 保存偏移量offsetCommits.put(new TopicPartition(record.topic(), record.partition()),new OffsetAndMetadata(record.offset() 1));// 4. 进行转换处理String[] fields record.value().split(,);fields[1] fields[1].equalsIgnoreCase(1) ? 男:女;String message fields[0] , fields[1] , fields[2];// 5. 生产消息到dwd_userproducer.send(new ProducerRecord(dwd_user, message));}// 6. 提交偏移量到事务producer.sendOffsetsToTransaction(offsetCommits, ods_user);// 7. 提交事务producer.commitTransaction();} catch (Exception e) {// 8. 放弃事务producer.abortTransaction();}}}// 1. 创建消费者public static ConsumerString, String createConsumer() {// 1. 创建Kafka消费者配置Properties props new Properties();props.setProperty(bootstrap.servers, node-1:9092);props.setProperty(group.id, ods_user);props.put(isolation.level,read_committed);props.setProperty(enable.auto.commit, false);props.setProperty(key.deserializer, org.apache.kafka.common.serialization.StringDeserializer);props.setProperty(value.deserializer, org.apache.kafka.common.serialization.StringDeserializer);// 2. 创建Kafka消费者KafkaConsumerString, String consumer new KafkaConsumer(props);// 3. 订阅要消费的主题consumer.subscribe(Arrays.asList(ods_user));return consumer;}// 2. 创建生产者public static ProducerString, String createProduceer() {// 1. 创建生产者配置Properties props new Properties();props.put(bootstrap.servers, node-1:9092);props.put(transactional.id, dwd_user);props.put(key.serializer, org.apache.kafka.common.serialization.StringSerializer);props.put(value.serializer, org.apache.kafka.common.serialization.StringSerializer);// 2. 创建生产者ProducerString, String producer new KafkaProducer(props);return producer;}}
4异常模拟
// 3. 保存偏移量
offsetCommits.put(new TopicPartition(record.topic(), record.partition()),new OffsetAndMetadata(record.offset() 1));
// 4. 进行转换处理
String[] fields record.value().split(,);
fields[1] fields[1].equalsIgnoreCase(1) ? 男:女;
String message fields[0] , fields[1] , fields[2];// 模拟异常
int i 1/0;// 5. 生产消息到dwd_user
producer.send(new ProducerRecord(dwd_user, message)); 我们发现可以消费到消息但如果中间出现异常的话offset是不会被提交的除非消费、生产消息都成功才会提交事务。
转自
kafka事务使用和编程示例_青眼酷白龙的博客-CSDN博客_kafka事务使用 -------------------------------------------
一、事务场景
最简单的需求是producer发的多条消息组成一个事务这些消息需要对consumer同时可见或者同时不可见 。producer可能会给多个topic多个partition发消息这些消息也需要能放在一个事务里面这就形成了一个典型的分布式事务。kafka的应用场景经常是应用先消费一个topic然后做处理再发到另一个topic这个consume-transform-produce过程需要放到一个事务里面比如在消息处理或者发送的过程中如果失败了消费位点也不能提交。producer或者producer所在的应用可能会挂掉新的producer启动以后需要知道怎么处理之前未完成的事务 。流式处理的拓扑可能会比较深如果下游只有等上游消息事务提交以后才能读到可能会导致rt非常长吞吐量也随之下降很多所以需要实现read committed和read uncommitted两种事务隔离级别。二、几个关键概念和推导
1.因为producer发送消息可能是分布式事务所以引入了常用的2PC所以有事务协调者(Transaction Coordinator)。Transaction Coordinator和之前为了解决脑裂和惊群问题引入的Group Coordinator在选举和failover上面类似。
2.事务管理中事务日志是必不可少的kafka使用一个内部topic来保存事务日志这个设计和之前使用内部topic保存位点的设计保持一致。事务日志是Transaction Coordinator管理的状态的持久化因为不需要回溯事务的历史状态所以事务日志只用保存最近的事务状态。 3.因为事务存在commit和abort两种操作而客户端又有read committed和read uncommitted两种隔离级别所以消息队列必须能标识事务状态这个被称作Control Message。 4.producer挂掉重启或者漂移到其它机器需要能关联的之前的未完成事务所以需要有一个唯一标识符来进行关联这个就是TransactionalId一个producer挂了另一个有相同TransactionalId的producer能够接着处理这个事务未完成的状态。注意不要把TransactionalId和数据库事务中常见的transaction id搞混了kafka目前没有引入全局序所以也没有transaction id这个TransactionalId是用户提前配置的。 5. TransactionalId能关联producer也需要避免两个使用相同TransactionalId的producer同时存在所以引入了producer epoch来保证对应一个TransactionalId只有一个活跃的producer epoch 三、事务语义 2.1. 多分区原子写入
事务能够保证Kafka topic下每个分区的原子写入。事务中所有的消息都将被成功写入或者丢弃。例如处理过程中发生了异常并导致事务终止这种情况下事务中的消息都不会被Consumer读取。现在我们来看下Kafka是如何实现原子的“读取-处理-写入”过程的。
首先我们来考虑一下原子“读取-处理-写入”周期是什么意思。简而言之这意味着如果某个应用程序在某个topic tp0的偏移量X处读取到了消息A并且在对消息A进行了一些处理如B FA之后将消息B写入topic tp1则只有当消息A和B被认为被成功地消费并一起发布或者完全不发布时整个读取过程写入操作是原子的。
现在只有当消息A的偏移量X被标记为消耗时消息A才被认为是从topic tp0消耗的消费到的数据偏移量record offset将被标记为提交偏移量Committing offset。在Kafka中我们通过写入一个名为__consumer_offsets topic的内部Kafka topic来记录offset commit。消息仅在其offset被提交给__consumer_offsets topic时才被认为成功消费。
由于offset commit只是对Kafkatopic的另一次写入并且由于消息仅在提交偏移量时被视为成功消费所以跨多个主题和分区的原子写入也启用原子“读取-处理-写入”循环提交偏移量X到offset topic和消息B到tp1的写入将是单个事务的一部分所以整个步骤都是原子的。
kafka系列九、kafka事务原理、事务API和使用场景 - 小人物的奋斗 - 博客园