广州市建设厅网站首页,做网站办什么营业执照,个人免费设计网站,wordpress 文章 分类 页面目录 一、消息的接收1.1、消息监听器 二、消息监听容器2.1、 实现方法2.1.1、KafkaMessageListenerContainer2.1.1.1、 基本概念2.1.1.2、如何使用 KafkaMessageListenerContainer 2.1.2、ConcurrentMessageListenerContainer 三、偏移 四、监听器容器自动启动 一、消息的接收 … 目录 一、消息的接收1.1、消息监听器 二、消息监听容器2.1、 实现方法2.1.1、KafkaMessageListenerContainer2.1.1.1、 基本概念2.1.1.2、如何使用 KafkaMessageListenerContainer 2.1.2、ConcurrentMessageListenerContainer 三、偏移 四、监听器容器自动启动 一、消息的接收 消息的接收可以通过配置MessageListenerContainer并提供消息侦听器或使用KafkaListener注释来接收消息。本章我们主要说明通过配置MessageListenerContainer并提供消息侦听器的方式接收消息。 1.1、消息监听器
当使用消息监听容器时就必须提供一个监听器来接收数据。目前有八个支持消息侦听器的接口
public interface MessageListenerK, V { // 当使用自动提交或容器管理的提交方法之一时使用此接口处理从 Kafka 消费者 poll() 操作接收到的各个 ConsumerRecord 实例。void onMessage(ConsumerRecordK, V data);
}public interface AcknowledgingMessageListenerK, V { // 当使用手动提交方法之一时使用此接口处理从 Kafka 消费者 poll() 操作接收到的各个 ConsumerRecord 实例。void onMessage(ConsumerRecordK, V data, Acknowledgment acknowledgment);
}public interface ConsumerAwareMessageListenerK, V extends MessageListenerK, V { // 当使用自动提交或容器管理的提交方法之一时使用此接口处理从 Kafka 消费者 poll() 操作接收到的各个 ConsumerRecord 实例。提供对 Consumer 对象的访问。void onMessage(ConsumerRecordK, V data, Consumer?, ? consumer);}public interface AcknowledgingConsumerAwareMessageListenerK, V extends MessageListenerK, V { //当使用手动提交方法之一时使用此接口处理从 Kafka 消费者 poll() 操作接收到的各个 ConsumerRecord 实例。提供对 Consumer 对象的访问。void onMessage(ConsumerRecordK, V data, Acknowledgment acknowledgment, Consumer?, ? consumer);}public interface BatchMessageListenerK, V { //当使用自动提交或容器管理的提交方法之一时使用此接口处理从 Kafka 消费者 poll() 操作接收到的所有 ConsumerRecord 实例。使用此接口时不支持 AckMode.RECORD因为侦听器会获得完整的批次。void onMessage(ListConsumerRecordK, V data);}public interface BatchAcknowledgingMessageListenerK, V { // 当使用手动提交方法之一时使用此接口处理从 Kafka 消费者 poll() 操作接收到的所有 ConsumerRecord 实例。void onMessage(ListConsumerRecordK, V data, Acknowledgment acknowledgment);}public interface BatchConsumerAwareMessageListenerK, V extends BatchMessageListenerK, V { // 当使用自动提交或容器管理的提交方法之一时使用此接口处理从 Kafka 消费者 poll() 操作接收到的所有 ConsumerRecord 实例。使用此接口时不支持 AckMode.RECORD因为侦听器会获得完整的批次。提供对 Consumer 对象的访问。void onMessage(ListConsumerRecordK, V data, Consumer?, ? consumer);}public interface BatchAcknowledgingConsumerAwareMessageListenerK, V extends BatchMessageListenerK, V { //当使用手动提交方法之一时使用此接口处理从 Kafka 消费者 poll() 操作接收到的所有 ConsumerRecord 实例。提供对 Consumer 对象的访问。void onMessage(ListConsumerRecordK, V data, Acknowledgment acknowledgment, Consumer?, ? consumer);}注意1、 Consumer对象不是线程安全的2、不应执行任何Consumer?, ?影响消费者位置和/或监听器中已提交偏移量的方法容器需要管理这些信息。 二、消息监听容器
2.1、 实现方法 MessageListenerContainer 提供了两种实现方式 1、KafkaMessageListenerContainer 2、ConcurrentMessageListenerContainer 2.1.1、KafkaMessageListenerContainer
2.1.1.1、 基本概念
KafkaMessageListenerContainer在单个线程上接收来自所有主题或分区的所有消息。委托ConcurrentMessageListenerContainer给一个或多个KafkaMessageListenerContainer实例以提供多线程消费。
从2.2.7版本开始可以添加一个记录拦截器(RecordInterceptor)监听器容器它将在调用侦听器之前调用以允许检查或修改记录。如果拦截器返回 null则不会调用侦听器。从版本 2.7 开始它具有在侦听器退出后通常或通过抛出异常调用的附加方法。批处理拦截器BatchInterceptor为批量监听器Batch Listeners提供类似的功能。此外ConsumerAwareRecordInterceptor和 BatchInterceptor提供对 Consumer?, ? 的访问。 例如这可以用于访问拦截器中的消费者指标。CompositeRecordInterceptor and CompositeBatchInterceptor可以调用多个拦截器。默认情况下当使用事务时拦截器在事务启动后被调用。从版本 2.3.4 开始可以设置侦听器容器的 interceptBeforeTx 属性在事务开始之前调用拦截器。从版本 2.3.8、2.4.6 开始当并发大于 1 时 ConcurrentMessageListenerContainer 支持静态成员资格。 group.instance.id 后缀为 -n 起始n于1。这与增加 session.timeout.ms 的值 一起可用于减少重新平衡事件例如当应用程序实例重新启动时。静态成员资格是指在提高流应用程序、消费者组和其他构建在组再平衡协议之上的应用程序的可用性。再平衡协议依赖组协调器为组成员分配实体 ID。这些生成的 ID 是短暂的并且会在成员重新启动和重新加入时发生变化。对于基于消费者的应用程序这种“动态成员资格”可能会导致在管理操作例如代码部署、配置更新和定期重新启动期间将大部分任务重新分配给不同的实例。对于大型状态应用程序洗牌任务在处理之前需要很长时间才能恢复其本地状态从而导致应用程序部分或完全不可用。受这一观察的启发Kafka 的组管理协议允许组成员提供持久的实体 ID。根据这些 ID组成员资格保持不变因此不会触发重新平衡。 同样的拦截器中不应该执行任何影响消费者的位置和/或提交的偏移量的方法容器需要管理这些信息。 如果拦截器改变了记录通过创建新记录则topic、partition和offset必须保持不变以避免意外的副作用例如记录丢失。 2.1.1.2、如何使用 KafkaMessageListenerContainer KafkaMessageListenerContainer 构造函数 public KafkaMessageListenerContainer(ConsumerFactoryK, V consumerFactory,ContainerProperties containerProperties)该构造函数接收接收消费者工厂ConsumerFactory有关对象中主题和分区以及其他配置的信息。 容器属性ContainerProperties包含3个构造函数下面我们一个一个介绍它们。 1、以TopicPartitionOffset为参数 public ContainerProperties(TopicPartitionOffset... topicPartitions)该构造函数采用一个主题分区偏移量TopicPartitionOffset参数数组来显式指示容器要使用哪些分区使用消费者assign()方法并带有可选的初始偏移量。默认情况下正值是绝对偏移量负值是相对于分区内当前最后一个偏移量。TopicPartitionOffset提供了一个带有附加参数的构造函boolean如果是true则在容器启动时相对于该消费者的当前位置初始偏移正或负。 2、以String为参数 public ContainerProperties(String... topics)该构造函数采用主题数组Kafka 根据属性分配分区group.id——在组中分配分区 3、以Pattern为参数 public ContainerProperties(Pattern topicPattern)该构造函数使用正则表达式Pattern来选择主题。 如何将监听器分配给容器 监听器有了容器也有了如何将监听器分配给容器呢。要将 MessageListener 分配给容器可以在创建 Container 时使用 ContainerProps.setMessageListener 方法 ContainerProperties containerProps new ContainerProperties(topic1, topic2);
containerProps.setMessageListener(new MessageListenerInteger, String() {...
});
DefaultKafkaConsumerFactoryInteger, String cf new DefaultKafkaConsumerFactory(consumerProps());
KafkaMessageListenerContainerInteger, String container new KafkaMessageListenerContainer(cf, containerProps);
return container;要注意的是在创建 DefaultKafkaConsumerFactory 时使用仅接受上述属性的构造函数意味着从配置中选取键和值反序列化器类。 或者反序列化器实例可以传递到 DefaultKafkaConsumerFactory 构造函数以获取键和/或值在这种情况下所有消费者共享相同的实例。 另一种选择是提供Supplier从版本2.3开始它将用于为每个消费者获取单独的Deserializer实例 DefaultKafkaConsumerFactoryInteger, CustomValue cf new DefaultKafkaConsumerFactory(consumerProps(), null, () - new CustomValueDeserializer());KafkaMessageListenerContainerInteger, String container new KafkaMessageListenerContainer(cf, containerProps);
return container;从版本 2.3.5 开始引入了一个名为authorizationExceptionRetryInterval 的新容器属性。 这会导致容器在从 KafkaConsumer 获取任何 AuthorizationException 后重试获取消息。 例如当配置的用户被拒绝读取特定主题时就会发生这种情况。 定义authorizationExceptionRetryInterval应该有助于应用程序在授予适当的权限后立即恢复。 2.1.2、ConcurrentMessageListenerContainer
ConcurrentMessageListenerContainer只有一个构造函数与构造函数类似 KafkaListenerContainer。
public ConcurrentMessageListenerContainer(ConsumerFactoryK, V consumerFactory,ContainerProperties containerProperties)它有一个concurrency属性这个属性的作用是创建几个 KafkaMessageListenerContainer 实例。例如container.setConcurrency(3) 创建三个 KafkaMessageListenerContainer 实例。 当监听多个主题时默认的分区分布可能不是我们所期望的。 例如如果有 3 个主题每个主题有 5 个分区并且我们想要使用 concurrency15但是我们只会看到 5 个活动使用者每个使用者从每个主题分配一个分区而其他 10 个使用者处于空闲状态。 这是因为默认的 Kafka PartitionAssignor 是 RangeAssignor。 对于这种情况我们需要考虑使用 RoundRobinAssignor它将分区分配给所有使用者。 然后为每个消费者分配一个主题或分区。 我们可以在提供给DefaultKafkaConsumerFactory的属性中设置partition.assignment.strategy消费者属性来更改要更改PartitionAssignor。ConsumerConfigs.PARTITION_ASSIGNMENT_STRATEGY_CONFIG。 在springboot中可以这样 spring.kafka.consumer.properties.partition.assignment.strategy org.apache.kafka.clients.consumer.RoundRobinAssignor 当使用 TopicPartitionOffset 配置容器属性时ConcurrentMessageListenerContainer 会在委托 KafkaMessageListenerContainer 实例之间分发 TopicPartitionOffset 实例。 假设提供了 6 个 TopicPartitionOffset 实例并发度为 3 每个容器有两个分区。 对于五个 TopicPartitionOffset 实例两个容器获得两个分区第三个容器获得一个分区。 如果并发数大于TopicPartition的数量则降低并发数使每个容器获得一个分区。 三、偏移
spring提供了几个偏移选项 如果 enable.auto.commit 消费者属性为 trueKafka会根据其配置自动提交偏移量。 如果为 false则容器支持多种 AckMode 设置。 默认 AckMode 为 BATCH。 从版本 2.3 开始框架将 enable.auto.commit 设置为 false除非在配置中明确设置。以前如果未设置该属性则使用 Kafka 默认值 (true)。 消费者 poll() 方法返回一个或多个 ConsumerRecord。 为每条记录调用 MessageListener。 以下列表描述了容器对每个 AckMode 采取的操作当未使用事务时 RECORD当侦听器处理记录后返回时提交偏移量。 BATCH当 poll() 返回的所有记录都已处理完毕时提交偏移量。 TIME当 poll() 返回的所有记录都处理完毕后只要超过了自上次提交以来的 ackTime就提交偏移量。 COUNT当 poll() 返回的所有记录都已处理完毕时提交偏移量只要自上次提交以来已收到 ackCount 条记录。 COUNT_TIME与 TIME 和 COUNT 类似但如果任一条件为真则执行提交。 MANUAL消息侦听器负责acknowledge() 确认。 之后应用与 BATCH 相同的语义。 MANUAL_IMMEDIATE当侦听器调用 Acknowledgment.acknowledge() 方法时立即提交偏移量。 使用事务transactions时偏移量将发送到事务语义相当于 RECORD 或 BATCH具体取决于侦听器类型记录或批处理。MANUAL 和 MANUAL_IMMEDIATE 要求侦听器是 AcknowledgingMessageListener 或 BatchAcknowledgingMessageListener。 根据syncCommits容器属性使用消费者上的commitSync()或commitAsync()方法。 默认情况下syncCommits 为 true。 作者个人建议建议设置ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG 为 false。 从版本 2.3 开始Acknowledgment 接口增加了两个方法 nack(long sleep) 和 nack(int index, long sleep)。 第一个与记录侦听器一起使用第二个与批处理侦听器一起使用。 为侦听器类型调用错误的方法将引发 IllegalStateException。在此之前他是这样
public interface Acknowledgment {void acknowledge();}如果要提交部分批次使用 nack()。使用事务时将 AckMode 设置为 MANUAL调用 nack() 会将成功处理的记录的偏移量发送到事务。nack() 只能在调用侦听器的消费者线程上调用。当调用 nack() 时将提交所有挂起的偏移量丢弃上次轮询的剩余记录并在其分区上执行查找以便在下一次轮询时重新传递失败的记录和未处理的记录 。通过设置 sleep 参数消费者线程可以在重新交付之前暂停。 这与在容器配置了 SeekToCurrentErrorHandler 时抛出异常的功能类似。 当通过组管理使用分区分配时确保 sleep 参数加上处理先前轮询的记录所花费的时间小于使用者 max.poll.interval.ms属性这个非常重要。 四、监听器容器自动启动
侦听器容器实现 SmartLifecycle并且 autoStartup 默认为 true。 容器在后期启动 (Integer.MAX-VALUE - 100)。 实现 SmartLifecycle 来处理来自侦听器的数据的其他组件应在早期阶段启动。 -100 为后续阶段留出了空间使组件能够在容器之后自动启动。