做网站看好金石网络,常州建设网站公司,免费发布推广信息,wordpress 禁止twitter偏移量offset是消费者消费数据的一个非常重要的属性。默认情况下#xff0c;消费者如果不指定消费主题数据的偏移量#xff0c;那么消费者启动消费时#xff0c;无论当前主题之前存储了多少历史数据#xff0c;消费者只能从连接成功后当前主题最新的数据偏移位置读取#…偏移量offset是消费者消费数据的一个非常重要的属性。默认情况下消费者如果不指定消费主题数据的偏移量那么消费者启动消费时无论当前主题之前存储了多少历史数据消费者只能从连接成功后当前主题最新的数据偏移位置读取而无法读取之前的任何数据。如果想要获取之前的数据就需要设定配置参数或指定数据偏移量。
【1】起始偏移量
在消费者的配置中我们可以增加偏移量相关参数auto.offset.reset用于从最开始获取主题数据。 参数取值有3个latest、earliest、none。
① earliest
earliest对于同一个消费者组从头开始消费。就是说如果这个topic有历史消息存在现在新启动了一个消费者组且auto.offset.resetearliest那将会从头开始消费未提交偏移量的场合。
② latest
latest对于同一个消费者组消费者只能消费到连接topic后新产生的数据未提交偏移量的场合。 none生产环境不使用。
【2】 指定偏移量消费
除了从最开始的偏移量或最后的偏移量读取数据以外Kafka还支持从指定的偏移量的位置开始消费数据。
【3】偏移量提交
生产环境中消费者可能因为某些原因或故障重新启动消费那么如果不知道之前消费数据的位置重启后再消费就可能重复消费earliest或漏消费latest。所以Kafka提供了保存消费者偏移量的功能而这个功能需要由消费者进行提交操作。这样消费者重启后就可以根据之前提交的偏移量进行消费了。
注意一旦消费者提交了偏移量那么kafka会优先使用提交的偏移量进行消费。此时auto.offset.reset参数是不起作用的。
① 自动提交
所谓的自动提交就是消费者消费完数据后无需告知kafka当前消费数据的偏移量而是由消费者客户端API周期性地将消费的偏移量提交到Kafka中。这个周期默认为5000ms可以通过配置进行修改。 ② 手动提交
基于时间周期的偏移量提交是我们无法控制的一旦参数设置的不合理或单位时间内数据量消费的很多却没有来及的自动提交那么数据就会重复消费。所以Kafka也支持消费偏移量的手动提交也就是说当消费者消费完数据后自行通过API进行提交。
不过为了考虑效率和安全kafka同时提供了异步提交和同步提交两种方式供我们选择。注意需要禁用自动提交auto.offset.resetfalse才能开启手动提交 。
异步提交
向Kafka发送偏移量offset提交请求后就可以直接消费下一批数据因为无需等待kafka的提交确认所以无法知道当前的偏移量一定提交成功所以安全性比较低但相对消费性能会提高。 同步提交
必须等待Kafka完成offset提交请求的响应后才可以消费下一批数据。一旦提交失败会进行重试处理尽可能保证偏移量提交成功(但是依然可能因为意外情况导致提交请求失败)。此种方式消费效率比较低但是安全性高。
【4】偏移量保存
由于消费者在消费消息的时候可能会由于各种原因而断开消费当重新启动消费者时我们需要让它接着上次消费的位置offset继续消费因此消费者需要实时的记录自己以及消费的位置。
0.90版本之前这个信息是记录在zookeeper内的在0.90之后的版本offset保存在__consumer_offsets这个topic内。
每个consumer会定期将自己消费分区的offset提交给kafka内部topic__consumer_offsets提交过去的时候key是consumerGroupIdtopic分区号 。 value就是当前offset的值kafka会定期清理topic里的消息最后就保留最新的那条数据。 因为__consumer_offsets可能会接收高并发的请求kafka默认给其分配50个分区(可以通过offsets.topic.num.partitions设置)均匀分配到Kafka集群的多个Broker中。Kafka采用hash(consumerGroupId) % __consumer_offsets主题的分区数来计算我们的偏移量提交到哪一个分区。
因为偏移量也是保存到主题中的所以保存的过程和生产者生产数据的过程基本相同。
【5】消费者事务
无论偏移量使用自动提交还是手动提交特殊场景中数据都有可能会出现重复消费。 如果提前提交偏移量再处理业务又可能出现数据丢失的情况。
对于单独的Consumer来讲事务保证会比较弱尤其是无法保证提交的信息被精确消费。主要原因就是消费者可以通过偏移量访问信息而不同的数据文件生命周期不同同一事务的信息可能会因为重启导致被删除的情况。
所以一般情况下想要完成kafka消费者端的事务处理需要将数据消费过程和偏移量提交过程进行原子性绑定。也就是说数据处理完了必须要保证偏移量正确提交才可以做下一步的操作。如果偏移量提交失败那么数据就恢复成处理之前的效果。
对于生产者事务而言消费者消费的数据也会受到限制。默认情况下消费者只能消费到生产者提交的数据也就是未提交完成的数据消费者是看不到的。