中国国家城乡建设部网站,网站开发与建设,小制作大全简单又漂亮,网站建设发布ps科技感目录 前言producer配置consumer 配置listener 配置 前言
在 Spring Kafka 中#xff0c;主要的配置分为三大块#xff0c;分别是producer、consumer、listener#xff0c;下面我们就按模块介绍各个模块的常用配置
producer配置
在 Spring Kafka 中#xff0c;spring.kaf… 目录 前言producer配置consumer 配置listener 配置 前言
在 Spring Kafka 中主要的配置分为三大块分别是producer、consumer、listener下面我们就按模块介绍各个模块的常用配置
producer配置
在 Spring Kafka 中spring.kafka.producer 用于配置 Kafka 生产者相关属性。下面是一些常用的 spring.kafka.producer 配置项的详解
spring.kafka.producer.bootstrap-servers 指定 Kafka 服务器的地址列表格式为 host:port多个地址使用逗号分隔。
spring.kafka.producer.key-serializer 用于配置 Kafka 生产者发送消息中键key的序列化器。可以是字符串形式的完全限定类名也可以是一个实现 org.apache.kafka.common.serialization.Serializer 接口的自定义序列化器类。
常见的键序列化器包括
org.apache.kafka.common.serialization.StringSerializer将键对象作为字符串进行序列化。 org.apache.kafka.common.serialization.IntegerSerializer将键对象作为整数进行序列化。 org.apache.kafka.common.serialization.ByteArraySerializer将键对象直接作为字节数组进行序列化。 自定义的序列化器根据自己的需求实现键的序列化逻辑。
在 Kafka 中每条消息都由一个键key和一个值value组成。键是一个可选的、用于标识消息的数据而值则是实际的消息内容。在发送消息时Kafka 生产者需要将键和值进行序列化以便能够在网络上传输和存储到 Kafka 服务器。
spring.kafka.producer.value-serializer 用于配置 Kafka 生产者发送消息中值value的序列化器类用法同上。
spring.kafka.producer.acks生产者发送消息的确认模式。可选的值有 “0”不需要任何确认、“1”只需要 Leader 确认和 “all”需要 Leader 和所有副本确认。
spring.kafka.producer.retries配置生产者在发生错误时的重试次数。
spring.kafka.producer.retry-backoff-ms配置重试之间的延迟时间默认为 100 毫秒。重试的间隔时间会随着重试次数的增加而指数级增长以避免过度负载和大量的重复请求。
spring.kafka.producer.batch-size配置每个批次中包含的消息大小。当应用程序使用 Kafka 生产者发送消息时发送单个消息可能会带来一些性能开销。为了减少这种开销可以将多个消息进行批量发送。spring.kafka.producer.batch-size 参数就是用来指定每个批次中包含的消息大小。
spring.kafka.producer.buffer-memory用于配置 Kafka 生产者的缓冲区内存大小的属性Kafka 生产者在发送消息时不会立即将消息发送到服务器而是先将消息缓存在生产者的缓冲区中。当缓冲区中的消息达到一定大小或达到一定时间限制时生产者才会批量地将消息发送到 Kafka 服务器。 该参数的单位是字节默认值是 33554432 字节32MB。
spring.kafka.producer.client-id配置生产者的客户端 ID如果你没有显式地设置该属性则 Kafka 生产者会自动生成一个随机的客户端 ID。使用自定义的客户端 ID 可以帮助你更好地追踪和监控不同的生产者实例
spring.kafka.producer.compression-type指定生产者使用的消息压缩类型
常见的压缩类型包括 none表示不使用压缩消息以原始的形式发送。 gzip表示使用 GZIP 压缩算法对消息内容进行压缩。 snappy表示使用 Snappy 压缩算法对消息内容进行压缩。 lz4表示使用 LZ4 压缩算法对消息内容进行压缩。
spring.kafka.producer.enable-idempotence启用生产者的幂等性确保消息的唯一性和顺序性。
在消息系统中幂等性是指多次执行同一个操作所产生的影响与执行一次操作的影响相同。而在 Kafka 中启用幂等性可以确保生产者发送的消息具有幂等性特性即无论发送多少次相同的消息最终的影响都是一样的。
启用幂等性可以提供以下好处
1、消息去重当生产者发送重复的消息时Kafka 会自动去重保证只有一条消息被写入。 2、顺序保证Kafka 会确保相同键的消息按照发送顺序进行处理保证消息的顺序性。 3、提高可靠性当发生网络故障或生产者重试时启用幂等性可以确保消息不会被重复发送避免出现重复消费的问题。
需要注意的是启用幂等性会对性能产生一定的影响因为 Kafka 生产者会为每个分区维护序列号和重试缓冲区。因此在性能和可靠性之间需要进行权衡根据具体的业务需求来决定是否启用幂等性。
spring.kafka.producer.max-in-flight-requests-per-connection指定在单个连接上允许的未完成请求的最大数目。
consumer 配置
pring.kafka.consumer 用于配置 Kafka 消费者相关属性下面是一些常见的 spring.kafka.consumer 配置属性及其作用
spring.kafka.consumer.bootstrap-servers指定 Kafka 服务器的地址列表格式为 host:port多个地址使用逗号分隔。
spring.kafka.consumer.group-id指定消费者所属的消费组的唯一标识符。
在 Kafka 中每个消费者都必须加入一个消费组Consumer Group才能进行消息的消费。消费组的作用在于协调多个消费者对消息的处理以实现负载均衡和容错机制。
具体来说spring.kafka.consumer.group-id 的作用包括以下几点
消费者协调Kafka 会根据 group-id 将不同的消费者分配到不同的消费组中不同的消费组之间相互独立。消费组内的消费者协调工作由 Kafka 服务器自动完成确保消息在消费组内得到均匀地分发。
负载均衡当多个消费者加入同一个消费组时Kafka 会自动对订阅的主题进行分区分配以实现消费者之间的负载均衡。每个分区只会分配给消费组内的一个消费者进行处理从而实现并行处理和提高整体的消息处理能力。
容错机制在消费组内如果某个消费者出现故障或者新的消费者加入Kafka 会自动重新平衡分区的分配确保各个分区的消息能够被有效地消费。
需要注意的是同一个消费组内的消费者共享消费位移offset即每个分区的消息只会被消费组内的一个消费者处理。因此同一个主题下的不同消费组是相互独立的不会进行负载均衡和消费位移的共享。
spring.kafka.consumer.key-deserializer指定键key的反序列化器。将从 Kafka 中读取的键字节流反序列化为对象。
spring.kafka.consumer.value-deserializer指定值value的反序列化器。将从 Kafka 中读取的值字节流反序列化为对象。
spring.kafka.consumer.enable-auto-commit指定是否开启自动提交消费位移offset的功能。设置为 true 则开启自动提交设置为 false 则需要手动调用 Acknowledgment 接口的 acknowledge() 方法进行位移提交。
spring.kafka.consumer.auto-commit-interval当开启自动提交时指定自动提交的间隔时间以毫秒为单位。
spring.kafka.consumer.auto-offset-reset指定当消费者加入一个新的消费组或者偏移量无效时的重置策略。常见的取值有 earliest从最早的偏移量开始消费和 latest从最新的偏移量开始消费。
spring.kafka.consumer.auto-offset-reset 属性有以下几种取值
latest表示从当前分区的最新位置开始消费即只消费从启动之后生产的消息不消费历史消息。 earliest表示从该分区的最早位置开始消费即包含历史消息和当前的消息。 none表示如果没有找到先前的消费者偏移量则抛出异常。
需要注意的是spring.kafka.consumer.auto-offset-reset 的默认值是 latest如果不设置该属性则新加入消费组的消费者将从该主题的最新位置开始消费。
spring.kafka.consumer.max-poll-records指定每次拉取的最大记录数。用于控制每次消费者向服务器拉取数据的数量默认为 500。
listener 配置
在 Spring 中是使用 Kafka 监听器来进行消息消费的spring.kafka.listener用来配置监听器的相关配置以下是一些常见的 spring.kafka.listener 相关配置及作用
spring.kafka.listener.concurrency指定监听器容器中并发消费者的数量。默认值为 1。通过设置并发消费者的数量可以实现多个消费者同时处理消息提高消息处理的吞吐量。
spring.kafka.listener .autoStartup指定容器是否在启动时自动启动。默认值为 true。可以通过设置为 false 来在应用程序启动后手动启动容器。
spring.kafka.listener.clientIdPrefix指定用于创建消费者的客户端 ID 的前缀。默认值为 “spring”.
spring.kafka.listener .ackMode指定消息确认模式包括 RECORD、BATCH 和 MANUAL_IMMEDIATE等。可根据需求选择不同的确认模式用于控制消息的确认方式。
spring.kafka.listener.ackCount当ackMode为COUNT”或者COUNT_TIME时处理多少个消息后才进行消息确认。
ackMode的详细介绍可以看我上一篇文章: ackMode详解
spring.kafka.listener.missing-topics-fatal配置当消费者订阅的主题不存在时的行为
当将 spring.kafka.listener.missing-topics-fatal 设置为 true 时如果消费者订阅的主题在 Kafka 中不存在应用程序会立即失败并抛出异常阻止消费者启动。这意味着应用程序必须依赖于确保所有订阅的主题都存在否则应用程序将无法正常运行。
当将 spring.kafka.listener.missing-topics-fatal 设置为 false 时如果消费者订阅的主题在 Kafka 中不存在应用程序将继续启动并等待主题出现。一旦主题出现消费者将开始正常地消费消息。这种情况下应用程序需要能够处理主题缺失的情况并在主题出现后自动适应。
默认情况下spring.kafka.listener.missing-topics-fatal 属性的值为 false这意味着如果消费者订阅的主题不存在应用程序将会等待主题出现而不会立刻失败。
spring.kafka.listener.syncCommits指定是否在关闭容器时同步提交偏移量。默认值为 false。可以通过设置为 true 来确保在关闭容器时同步提交偏移量。