富阳区住房与建设局网站,市场营销策划书,html做音乐网站模板,档案馆网站建设文章目录 1. PartitionInfo 分区源码2. Partitioner 分区器接口源码3. 自定义分区策略4. 轮询策略 RoundRobinPartitioner5. 黏性分区策略 UniformStickyPartitioner6. hash分区策略7. 默认分区策略 DefaultPartitioner 分区的作用就是提供负载均衡的能力#xff0c;或者说对数… 文章目录 1. PartitionInfo 分区源码2. Partitioner 分区器接口源码3. 自定义分区策略4. 轮询策略 RoundRobinPartitioner5. 黏性分区策略 UniformStickyPartitioner6. hash分区策略7. 默认分区策略 DefaultPartitioner 分区的作用就是提供负载均衡的能力或者说对数据进行分区的主要原因就是为了实现系统的高伸缩性。不同的分区能够被放置到不同节点的机器上而数据的读写操作也都是针对分区这个粒度而进行的这样每个节点的机器都能独立地执行各自分区的读写请求处理。并且我们还可以通过添加新的节点机器来增加整体系统的吞吐量。
除了提供负载均衡这种最核心的功能之外利用分区也可以实现其他一些业务级别的需求比如实现业务级别的消息顺序的问题。
生产者发送的消息实体 ProducerRecord 的构造方法 我们发送消息时可以指定分区号如果不指定那就需要分区器这个很重要一条消息该发往哪一个分区关系到顺序消息问题。下面我们说说 Kafka 生产者的分区策略。所谓分区策略是决定生产者将消息发送到哪个分区的算法。Kafka 为我们提供了默认的分区策略同时它也支持你自定义分区策略。
1. PartitionInfo 分区源码
/*** This is used to describe per-partition state in the MetadataResponse.*/
public class PartitionInfo {// 表示该分区所属的主题名称。private final String topic;// 表示该分区的编号。private final int partition;// 表示该分区的领导者节点。private final Node leader;// 表示该分区的所有副本节点。private final Node[] replicas;// 表示该分区的所有同步副本节点。private final Node[] inSyncReplicas;// 表示该分区的所有离线副本节点。private final Node[] offlineReplicas;public PartitionInfo(String topic, int partition, Node leader, Node[] replicas, Node[] inSyncReplicas) {this(topic, partition, leader, replicas, inSyncReplicas, new Node[0]);}public PartitionInfo(String topic,int partition,Node leader,Node[] replicas,Node[] inSyncReplicas,Node[] offlineReplicas) {this.topic topic;this.partition partition;this.leader leader;this.replicas replicas;this.inSyncReplicas inSyncReplicas;this.offlineReplicas offlineReplicas;}// ....
}2. Partitioner 分区器接口源码
Kafka的Partitioner接口是用来决定消息被分配到哪个分区的。它定义了一个方法partition该方法接收三个参数topic、key和value返回一个int类型的分区号表示消息应该被分配到哪个分区。
public interface Partitioner extends Configurable {/*** Compute the partition for the given record.** param topic The topic name* param key The key to partition on (or null if no key)* param keyBytes The serialized key to partition on( or null if no key)* param value The value to partition on or null* param valueBytes The serialized value to partition on or null* param cluster The current cluster metadata*/int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);/*** This is called when partitioner is closed.*/default void close() {}
}Partitioner接口的实现类可以根据不同的业务需求来实现不同的分区策略例如根据消息的键、值、时间戳等信息来决定分区。
这里的topic、key、keyBytes、value和valueBytes都属于消息数据cluster则是集群信息。Kafka 给你这么多信息就是希望让你能够充分地利用这些信息对消息进行分区计算出它要被发送到哪个分区中。
3. 自定义分区策略
只要你自己的实现类定义好了 partition 方法同时设置partitioner.class 参数为你自己实现类的 Full Qualified Name那么生产者程序就会按照你的代码逻辑对消息进行分区。
① 实现自定义分区策略 DefinePartitioner
public class MyPartitioner implements Partitioner {private final AtomicInteger counter new AtomicInteger(0);Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {// 获取该 topic 可用的所有分区信息ListPartitionInfo partitionInfos cluster.availablePartitionsForTopic(topic);int size partitionInfos.size();if(keyBytesnull){// 如果 keyBytes 为 null表示该消息没有 key此时采用 round-robin 的方式将消息均匀地分配到不同的分区中。// 每次调用 getAndIncrement() 方法获取计数器的当前值并自增然后对可用分区数取模得到该消息应该被分配到的分区编号。return counter.getAndIncrement() % size;}else{// 如果 keyBytes 不为 null表示该消息有 key此时采用 murmur2 哈希算法将 key 转换为一个整数值并对可用分区数取模得到该消息应该被分配到的分区编号。return Utils.toPositive(Utils.murmur2(keyBytes) % size);}}Overridepublic void close() {}Overridepublic void configure(MapString, ? map) {}
}② 显式地配置生产者端的参数 partitioner.class
public class CustomProducer01 {private static final String brokerList 10.65.132.2:9093;private static final String topic test;public static Properties initConfig(){Properties properties new Properties();properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,brokerList);properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 使用自定义分区器properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class.getName());return properties;}public static void main(String[] args) {// kafka生产者属性配置Properties properties initConfig();// kafka生产者发送消息默认是异步发送方式KafkaProducerString, String kafkaProducer new KafkaProducerString, String(properties);ProducerRecordString, String producerRecord new ProducerRecord(topic, 你好kafka,使用自定义分区器);kafkaProducer.send(producerRecord, new Callback() {Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if(enull){System.out.println(recordMetadata发送的分区为recordMetadata.partition());}}});// 关闭资源kafkaProducer.close();}
}4. 轮询策略 RoundRobinPartitioner
也称 Round-robin 策略即顺序分配。比如一个主题下有 3 个分区那么第一条消息被发送到分区 0第二条被发送到分区 1第三条被发送到分区 2以此类推。当生产第 4 条消息时又会重新开始即将其分配到分区 0就像下面这张图展示的那样。 这就是所谓的轮询策略轮询策略是 Kafka Java 生产者 API 默认提供的分区策略。如果你未指定partitioner.class参数那么你的生产者程序会按照轮询的方式在主题的所有分区间均匀地“码放”消息。
轮询策略有非常优秀的负载均衡表现它总是能保证消息最大限度地被平均分配到所有分区上故默认情况下它是最合理的分区策略也是我们最常用的分区策略之一。
轮询策略实现类为 RoundRobinPartitioner实现源码
/*** The Round-Robin partitioner* * This partitioning strategy can be used when user wants * to distribute the writes to all partitions equally. This* is the behaviour regardless of record key hash. **/
public class RoundRobinPartitioner implements Partitioner {private final ConcurrentMapString, AtomicInteger topicCounterMap new ConcurrentHashMap();public void configure(MapString, ? configs) {}/*** Compute the partition for the given record.** param topic The topic name* param key The key to partition on (or null if no key)* param keyBytes serialized key to partition on (or null if no key)* param value The value to partition on or null* param valueBytes serialized value to partition on or null* param cluster The current cluster metadata*/Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {// 获取该 topic 所有的分区ListPartitionInfo partitions cluster.partitionsForTopic(topic);int numPartitions partitions.size();int nextValue nextValue(topic);// 获取该 topic 所有可用的分区ListPartitionInfo availablePartitions cluster.availablePartitionsForTopic(topic);if (!availablePartitions.isEmpty()) {// 取模这样获取的就是一个轮询的方式从可用的分区列表中获取分区// Utils.toPositive(nextValue) 的作用是将传入的参数 nextValue 转换为正数。// 如果 nextValue 是负数则返回 0否则返回 nextValue 的值。int part Utils.toPositive(nextValue) % availablePartitions.size();return availablePartitions.get(part).partition();} else {// no partitions are available, give a non-available partition// 取模这样获取的就是一个轮询的方式从分区列表中获取分区return Utils.toPositive(nextValue) % numPartitions;}}// 在ConcurrentMap中插入一个键值对如果该键不存在则使用提供的函数计算值并将其插入到Map中。// 如果该键已经存在则返回与该键关联的值。private int nextValue(String topic) {// 在ConcurrentMap中插入一个键值对如果该键不存在则使用AtomicInteger的默认值0初始化值// 如果该键已经存在则返回与该键关联的AtomicInteger对象。AtomicInteger counter topicCounterMap.computeIfAbsent(topic, k - {return new AtomicInteger(0);});// 使用返回的AtomicInteger对象对值进行原子操作增加值return counter.getAndIncrement();}public void close() {}}Kafka的RoundRobinPartitioner是一种分区策略它将消息依次分配到可用的分区中。具体来说它会维护一个计数器每次将消息分配到下一个分区直到计数器达到分区总数然后重新从第一个分区开始分配。这种策略可以确保消息在所有分区中均匀分布但可能会导致某些分区负载过重因为它无法考虑分区的实际负载情况。
5. 黏性分区策略 UniformStickyPartitioner
黏性分区策略会随机选择一个分区并尽可能一直使用该分区待该分区的batch已满或者已完成Kafka再随机选一个分区进行使用(和上一次的分区不同)。 Sticky Partitioning Strategy 会随机地选择一个分区并会尽可能地坚持使用该分区——即所谓的粘住这个分区。
kafka 在发送消息的时候 , 采用批处理方案 , 当达到一批后进行分送 , 但是如果一批数据中有不同分区的数据 , 就无法放置到一个批处理中, 而老版本2.4版本之前的轮询策略方案 , 就会导致一批数据被分到多个小的批次中 , 从而影响效率 , 故在新版本中 , 采用这种粘性的划分策略。
UniformStickyPartitioner 实现源码
/*** The partitioning strategy:* ul* liIf a partition is specified in the record, use it* liOtherwise choose the sticky partition that changes when the batch is full.* * NOTE: In constrast to the DefaultPartitioner, the record key is NOT used as part of the partitioning strategy in this * partitioner. Records with the same key are not guaranteed to be sent to the same partition.* * See KIP-480 for details about sticky partitioning.*/
public class UniformStickyPartitioner implements Partitioner {private final StickyPartitionCache stickyPartitionCache new StickyPartitionCache();public void configure(MapString, ? configs) {}/*** Compute the partition for the given record.** param topic The topic name* param key The key to partition on (or null if no key)* param keyBytes serialized key to partition on (or null if no key)* param value The value to partition on or null* param valueBytes serialized value to partition on or null* param cluster The current cluster metadata*/public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {return stickyPartitionCache.partition(topic, cluster);}public void close() {}/*** If a batch completed for the current sticky partition, change the sticky partition. * Alternately, if no sticky partition has been determined, set one.*/public void onNewBatch(String topic, Cluster cluster, int prevPartition) {stickyPartitionCache.nextPartition(topic, cluster, prevPartition);}
}分析 StickyPartitionCache 源码
/*** An internal class that implements a cache used for sticky partitioning behavior. The cache tracks the current sticky* partition for any given topic. This class should not be used externally. */
public class StickyPartitionCache {// ConcurrentMap类型的indexCache成员变量用于存储主题和其对应的粘性分区。private final ConcurrentMapString, Integer indexCache;public StickyPartitionCache() {this.indexCache new ConcurrentHashMap();}// 获取给定主题的当前粘性分区。如果该主题的粘性分区尚未设置则返回下一个分区。public int partition(String topic, Cluster cluster) {Integer part indexCache.get(topic);if (part null) {return nextPartition(topic, cluster, -1);}return part;}// 获取给定主题的下一个粘性分区。 public int nextPartition(String topic, Cluster cluster, int prevPartition) {ListPartitionInfo partitions cluster.partitionsForTopic(topic);// 获取给定主题的粘性分区Integer oldPart indexCache.get(topic);Integer newPart oldPart;// 如果该主题的粘性分区尚未设置则计算粘性分区if (oldPart null || oldPart prevPartition) {// 1. 计算分区号// 如果没有可用分区则从所有分区列表中随机选择一个可用分区ListPartitionInfo availablePartitions cluster.availablePartitionsForTopic(topic);if (availablePartitions.size() 1) {Integer random Utils.toPositive(ThreadLocalRandom.current().nextInt());newPart random % partitions.size();// 如果只有一个可用分区则选择该分区} else if (availablePartitions.size() 1) {newPart availablePartitions.get(0).partition();// 从可用分区列表中随机选择一个分区} else {while (newPart null || newPart.equals(oldPart)) {int random Utils.toPositive(ThreadLocalRandom.current().nextInt());newPart availablePartitions.get(random % availablePartitions.size()).partition();}}// 2. 填充 indexCacheif (oldPart null) {indexCache.putIfAbsent(topic, newPart);} else {indexCache.replace(topic, prevPartition, newPart);}return indexCache.get(topic);}return indexCache.get(topic);}
}6. hash分区策略
Kafka 允许为每条消息定义消息键简称为 Key。这个 Key 的作用非常大它可以是一个有着明确业务含义的字符串比如客户代码、部门编号或是业务 ID 等也可以用来表征消息元数据。特别是在 Kafka 不支持时间戳的年代在一些场景中工程师们都是直接将消息创建时间封装进 Key 里面的。一旦消息被定义了 Key那么你就可以保证同一个 Key 的所有消息都进入到相同的分区里面由于每个分区下的消息处理都是有顺序的故这个策略被称为按消息键保序策略如下图所示。 7. 默认分区策略 DefaultPartitioner
Kafka 默认使用的分区器为 DefaultPartitioner这是一个默认的分区策略实现类其分区策略如下
如果记录中指定了分区则使用该分区不会调用分区器接口实现类。如果记录中没有指定分区但有key则使用hash分区策略。如果记录中既没有指定分区也没有key则 kafka 2.4版本前使用轮询策略2.4版本后使用粘性分区策略。
/**The default partitioning strategy:If a partition is specified in the record, use itIf no partition is specified but a key is present choose a partition based on a hash of the keyIf no partition or key is present choose the sticky partition that changes when the batch is full.*/
public class DefaultPartitioner implements Partitioner {private final StickyPartitionCache stickyPartitionCache new StickyPartitionCache();public void configure(MapString, ? configs) {}/*** Compute the partition for the given record.** param topic The topic name* param key The key to partition on (or null if no key)* param keyBytes serialized key to partition on (or null if no key)* param value The value to partition on or null* param valueBytes serialized value to partition on or null* param cluster The current cluster metadata*/public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {return partition(topic, key, keyBytes, value, valueBytes, cluster, cluster.partitionsForTopic(topic).size());}public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster,int numPartitions) {// 如果没有指定key,则使用粘性分区策略if (keyBytes null) {return stickyPartitionCache.partition(topic, cluster);}// hash the keyBytes to choose a partition// Utils.murmur2(keyBytes) 是一个使用 MurmurHash2 算法计算给定字节数组的哈希值的方法。// 如果制定了key,则使用key的hash值对分区数取模得到分区。return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;}public void close() {}/*** If a batch completed for the current sticky partition, change the sticky partition. * Alternately, if no sticky partition has been determined, set one.*/public void onNewBatch(String topic, Cluster cluster, int prevPartition) {stickyPartitionCache.nextPartition(topic, cluster, prevPartition);}
}基于 kafka 3.0 版本
① 如果记录中指定了分区则使用该分区此时不会进入任何分区器 public class KafkaProducerK, V implements ProducerK, V {// ...Overridepublic FutureRecordMetadata send(ProducerRecordK, V record, Callback callback) {ProducerRecordK, V interceptedRecord this.interceptors.onSend(record);return doSend(interceptedRecord, callback);}private FutureRecordMetadata doSend(ProducerRecordK, V record, Callback callback) {TopicPartition tp null;try {// ...int partition partition(record, serializedKey, serializedValue, cluster);// ...} }/*** computes partition for given record.* if the record has partition returns the value otherwise calls configured partitioner class to compute the partition.*/private int partition(ProducerRecordK, V record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {// 如果记录中指定了分区则使用该分区不会继续调用partitioner.partition()方法Integer partition record.partition();return partition ! null ?partition :partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);}
}② 如果记录中没有指定分区但有key则会使用hash分区策略计算分区 public class DefaultPartitioner implements Partitioner {private final StickyPartitionCache stickyPartitionCache new StickyPartitionCache();public void configure(MapString, ? configs) {}public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {return partition(topic, key, keyBytes, value, valueBytes, cluster, cluster.partitionsForTopic(topic).size());}public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster,int numPartitions) {if (keyBytes null) {return stickyPartitionCache.partition(topic, cluster);}// hash the keyBytes to choose a partition// 使用key的hash值对分区数取模得到分区return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;}public void close() {}public void onNewBatch(String topic, Cluster cluster, int prevPartition) {stickyPartitionCache.nextPartition(topic, cluster, prevPartition);}
}③ 如果记录中既没有指定分区也没有key则会使用粘性分区策略计算分区