做外链选择那些网站,东莞做网站的网络公司,晋城 网站建设,专业网络营销外包公司1、Topic的分片和副本机制
分片作用#xff1a;
解决单台节点容量有限的问题#xff0c;节点多#xff0c;效率提升#xff0c;吞吐量提升。通过分片#xff0c;将一个大的容器分解为多个小的容器#xff0c;分布在不同的节点上#xff0c;从而实现分布式存储。
分片…1、Topic的分片和副本机制
分片作用
解决单台节点容量有限的问题节点多效率提升吞吐量提升。通过分片将一个大的容器分解为多个小的容器分布在不同的节点上从而实现分布式存储。
分片的数量没有限制与节点数量没有关系分片数量不会超过总节点数量的三倍。
副本作用
提升数据的可靠性副本越多数据越可靠但是数据冗余越高。
副本数量有限制最多和节点的数量相等但是一般构建1~3个之间。
2、Kafka如何保证数据不丢失
数据传输的三个阶段
生产者生产数据到broker
broker存储数据
消费者从broker上消费数据
1 生产端如何保证数据不丢失
生产数据到broker之后的响应机制
当生产者生产数据到Broker后Broker应该给于确认响应ack。
ack 确认机制主要有三种方案分别为0 1 -1(ALL)
0生产者只管将数据生产到Borker 不等待Broker返回的ack 信息
1生产者将数据生产到Broker需要等待Broker端Topic的对应分片上的主副本接收到消息后即为成功发送消息。
-1: 生产者将数据生产到broker需要等待broker端Topic的对应分片所有副本都接收到消息即为成功发送
生产中一般根据消息重要情况以及生成和消费速率来选择相应的级别。一般来说重要程度越高的安全级别越高速率越高优先保证安全性在此基础上保持平衡。
相关问题思考
1-生产者发送一条数据到BrokerBroker给于一次响应如果Broker迟迟不予响应怎么办
先等待然后重试最后报错先等待一段时间当超时后然后触发充重试策略进行重试操作当重试后依然没有响应最后程序报错停止发送。
2-生产者发送一条数据Broker就要给予一次响应那么这样是否会占用更多的带宽如果占用如何解决
肯定会影响可以引入缓存池生产者在生产数据的时候底层先将其放置到一个缓存池当池子中消息数据达到一批数据大小后会专门有一个子线程触发执行将数据生产到Broker此时Broker只需要对这一批数据给予一次响应即可异步发送。
3- 如果采用一批一批的发送如果Broker又没有给予响应但是此时缓存池中数据已经满了如何解决
可以选择直接清空缓存池或者不清空如果数据可以重复读直接报错清空即可后续重新读取数据即可如果数据不可重复读可以提前设置处理方案将每一个消息提前先找一个容器进行备份存储自己维护数据当发送成功删除一部分数据如果出错重启后先从这个容器将剩余的数据发送即可当然如果选择不清空那么一直等待即可。
相关的一些参数设置
buffer.memory 设置缓存池大小默认值3355443232M
retries 重试的次数默认值2147483647最终的重试策略取决于超时设置
batch.size 表示一批数据大小默认值1638416KB
delivery.timeout.ms 总超时时间默认值120000120S
requesst.timeout.ms 每一次请求后的超时时间等待时间默认值 3000030s
2Broker 如何保证数据不丢失
Broker可以将每个分片的副本数量设置为多个提供数据的可靠性同时还需要生产端将ACK设置为-1
3消费端如何保证数据不丢失
消费者连接kafka集群kafka收到请求后首先会根据group_id 查询上一次消费到了哪个消息偏移量如果没有找到默认从当前的位置开始消费数据之前的消息默认不处理如果找到了就从记录的消息偏移量位置继续消费数据即可
消费者消费完数据后会把对应的消息的偏移量信息重新提交给Broker记录
在提交偏移量的时候有两种提交方式自动提交偏移量手动提交偏移量
配置自动提交 consumer KafkaConsumer(test,bootstrap_servers[localhost:9092],group_idg_2,enable_auto_commitTrue,auto_commit_interval_ms1000)
手动提交 consumer KafkaConsumer(test,bootstrap_servers[localhost:9092],group_idg_2,enable_auto_commitFalse,auto_commit_interval_ms1000)consumer.commit() #同步提交
consumer.commit_async() #异步提交
3、Kafka中生产者的数据分发策略
分发策略生产者生产数据到Broker的某一Topic这个数据最终落入到那个分片的副本即是分发策略
1、Hash策略 -- 支持
def send(self, topic, valueNone, keyNone, headersNone, partitionNone, timestamp_msNone):
如果发送数据时指定了topic ,value , key, 即是采用hash 策略
相同key 的hash是一样的会分发到同一个分区 2、随机分发策略---python客户端支持Java客户端不支持
发送数据时如果只传递了Topic和value即是随机分发
3、轮询策略 -- 2.4版本以上修改为粘性策略2.4版本以下支持但是这两种方式Java客户端支持python不支持
4、指定分区的策略 -- 支持
当发送数据时如果指定了partition参数即是采用指定分区策略分区的编号从0 开始
5、自定义分区策略 -- 支持
from kafka.partitioner import DefaultPartitioner
参考Kafka的默认分发策略方法DefaultPartitioner def __call__(cls, key, all_partitions, available):Get the partition corresponding to key:param key: partitioning key:param all_partitions: list of all partitions sorted by partition ID:param available: list of available partitions in no particular order:return: one of the values from all_partitions or availableif key is None:if available:return random.choice(available)return random.choice(all_partitions)idx murmur2(key)idx 0x7fffffffidx % len(all_partitions)return all_partitions[idx]
自定义实现
class MyPartitioner(object):def __call__(self, key, all_partitions, available):# 实现分发策略return all_partitions[i]kafkaProducer KafkaProducer(bootstrap_servers[],ack-1,partitionerMyPartitioner()
)
4、kakfa的存储和查询机制
1、存储
数据存储在磁盘中位置取决于配置log.dirs参数
在对应目录下以topic名称分片号创建目录目录下有2个主要文件log文件和index文件
log文件存储消息数据前面的数字代表消息从哪个偏移量开始存储
index文件存索引数据用于加速查询
默认情况下当log文件达到1G时会拆分数据滚动形成一个新的log文件index文件也会随之产生
问题为什么不放在同一个文件
一个文件过大打开和关闭可能很耗费资源从一个log中检索数据相对也很慢影响效率。
Kafka本质上是一个消息队列的中间件仅仅负责消息的临时存储当消息超过一定的时间Kafka就会执行删除数据操作默认168小时
如果保存到一个文件删除数据就会挺慢。当一个log永远不会达到1G时就永远不会被删除。 2、查询
首先确定要读取的offset在那个segment 片段中
查询这个片段的index文件根据offset确定这个消息在log文件中的什么位置
读取log文件检索对应位置下的内容即可底层是基于磁盘顺序查询
获取最终的消息数据
5、kafka的消费者的负载均衡机制
假设生产者速率400条/分钟 消费者速率400条/分钟
随着发展生产者速率达到1200条/分钟但是消费者还是400条/分钟会造成什么问题
会造成大量的数据在Broker积压影响消息处理的时效性问题。增加消费者必须保证都在同一个消费者组内
随着发展生产者速率达到1600条/分钟但是消费者还是400条/分钟会造成什么问题
会造成大量的数据在Broker积压影响消息处理的时效性问题。此时再添加一个消费者但是总有一个消费者无法消费数据这是因为Kafka的消费者的负载均衡机制只能增加topic的分片数量。
消费者的负载均衡机制
1、在一个消费者组内消费者的数量最多和所监听的topic 分片数量是相等的如果有多余的消费者那么会出现某些消费者处于限制的状态
2、在一个消费者组内topic的一个分片的数据只能被一个消费者所消费不允许出现一个分片被组内的多个消费者消费的情况但一个消费者可以消费多个分区的数据 所以点对点和发布订阅的实现
点对点把监听这个topic的消费者全部放到同一个消费者组内这样消息必然只能有一个消费者消费
发布订阅把监听这个topic的消费者放置在不同的消费者组内这样消息就可以被多个消费者消费了