当前位置: 首页 > news >正文

营销型网站建设公司哪里有计算机大专生的出路

营销型网站建设公司哪里有,计算机大专生的出路,网站域名怎么查询,网站建设公司 中企动力公司基础知识 基本简介 Kafka 是一个分布式流式处理平台#xff0c;是一种分布式的#xff0c;基于发布/订阅的消息系统。 Kafka特点#xff1a; 1. 同时为发布和订阅提供高吞吐量 Kafka 的设计目标是以时间复杂度为 O(1) 的方式提供消息持久化能力#xff0c;即使对 TB 级以…基础知识 基本简介 Kafka 是一个分布式流式处理平台是一种分布式的基于发布/订阅的消息系统。 Kafka特点 1. 同时为发布和订阅提供高吞吐量 Kafka 的设计目标是以时间复杂度为 O(1) 的方式提供消息持久化能力即使对 TB 级以上数据也能保证常数时间的访问性能。即使在非常廉价的商用机器上也能做到单机支持每秒 100K 条消息的传输。 2. 消息持久化 将消息持久化到磁盘因此可用于批量消费例如 ETL 以及实时应用程序。通过将数据持久化到硬盘以及 replication 防止数据丢失。 3. 分布式 支持 Server 间的消息分区及分布式消费同时保证每个 partition 内的消息顺序传输。这样易于向外扩展所有的 producer、broker 和 consumer 都会有多个均为分布式的。无需停机即可扩展机器。 4. 消费消息采用 pull 模式 消息被处理的状态是在 consumer 端维护而不是由 server 端维护broker 无状态consumer 自己保存 offset。 5. 支持 online 和 offline 的场景 同时支持离线数据处理和实时数据处理。 基础概念 1. Broker Kafka 集群中的一台或多台服务器统称为 Broker多个 Kafka Broker 组成一个 Kafka Cluster。 2. Producer 消息和数据的生产者可以理解为往 Kafka 发消息的客户端 3. Consumer 消息和数据的消费者可以理解为从 Kafka 取消息的客户端 4. Topic 每条发布到 Kafka 的消息都有一个类别即为 Topic 。 Producer 将消息发送到指定的TopicConsumer 通过订阅特定的Topic来消费消息。 物理上不同 Topic 的消息分开存储。一个 Topic 的消息可以保存于一个或多个broker上但用户只需指定消息的 Topic 即可生产或消费数据。 5. Partition 是Topic 物理上的分组一个 Topic 可以分为多个 Partition 同一 Topic 下的 Partition 可以分布在不同的 Broker 上这也说明一个 Topic 可以横跨多个 Broker 。 每个 Partition 是一个有序的队列Partition 中的每条消息都会被分配一个有序的 idoffset所有的partition当中的数据全部合并起来就是一个topic当中的所有的数据。 每一个分区内的数据是有序的但全局的数据不能保证是有序的。有序是指生产什么样顺序消费时也是什么样的顺序 生产者生产的每条消息只会被发送到一个分区中也就是说如果向一个双分区的主题发送一条消息这条消息要么在分区 0 中要么在分区 1 中。 6. Consumer Group 每个 Consumer 属于一个特定的 Consumer Group若不指定则属于默认的 Group。 这是 Kafka 用来实现一个 Topic 消息的广播发给所有的 Consumer 和单播发给任意一个 Consumer 的手段。 一个 Topic 可以有多个 Consumer Group。Topic 的消息会复制不是真的复制是概念上的到所有的 Consumer Group但每个 Consumer Group 只会把消息发给该 Consumer Group 中的一个 Consumer。 如果要实现广播只要让每个 Consumer 有一个独立的 Consumer Group 即可。如果要实现单播只要所有的 Consumer 在同一个 Consumer Group 。 用 Consumer Group 还可以将 Consumer 进行自由的分组而不需要多次发送消息到不同的 Topic 。 从分区的角度看每个分区只能由同一个消费组内的一个消费者来消费可以由不同的消费组来消费partition数量决定了每个consumer group中并发消费者的最大数量。 消费某一主题的一个消费组下的消费者数量应该小于等于该主题下的分区数。 如某一个主题有4个分区那么消费组中的消费者应该小于等于4而且最好与分区数成整数倍 1 2 4 这样。同一个分区下的数据在同一时刻不能由同一个消费组的不同消费者消费。 7. 备份机制 就是把相同的数据拷贝到多台机器上而这些相同的数据拷贝被称为副本。 Kafka定义了两类副本领导者副本和追随者副本前者对外提供服务即与客户端程序进行交互而后者从 leader 副本中拉取消息进行同步不与外界进行交互。 8. segment文件 一个partition由多个segment文件组成每个segment文件包含多类文件比较主要的有两部分分别是 .log 文件和 .index 文件。 其中 .log 文件包含了我们发送的数据存储而.index 文件记录的是我们.log文件的数据索引值以便于我们加快数据的查询速度.index文件中元数据指向对应 .log 文件中message的物理偏移地址。 index文件采用了稀疏存储的方式对每隔一定字节的数据建立一条索引。 这样避免了索引文件占用过多的空间从而可以将索引文件保留在内存中。 但缺点是没有建立索引的Message也不能一次定位到其在数据文件的位置从而需要做一次顺序扫描。 生产者分区以及策略 Kafka的消息组织方式实际上是三级结构主题-分区-消息。 主题下的每条消息只会保存在某一个分区中不同的分区能够被放置到不同的机器上。而数据的读写操作也都是针对分区这个粒度而进行的这样每个节点的机器都能独立地执行各自分区的读写请求处理并且我们还可以通过添加新的节点机器来增加整体系统的吞吐量。 其实分区的作用就是提供负载均衡的能力或者说对数据进行分区的主要原因就是为了实现系统的高伸缩性Scalability。 分区策略 自定义分区策略 在编写生产者程序时可以编写一个具体的类实现org.apache.kafka.clients.producer.Partitioner接口。 这个接口只定义了两个方法partition()和close()通常只需要实现partition()方法。 同时设置partitioner.class参数为实现类的Full Qualified Name那么生产者程序就会按照自定义的代码逻辑对消息进行分区 轮询策略 轮询策略有非常优秀的负载均衡表现它总是能保证消息最大限度地被平均分配到所有分区上是最常用的分区策略之一。 如果一个主题下有3个分区那么第一条消息被发送到分区0第二条被发送到分区1第三条被发送到分区2以此类推。 随机策略 所谓随机就是将消息放置到任意一个分区上。 如果要实现随机策略版的partition方法需要两行代码 List partitions cluster.partitionsForTopic(topic); return ThreadLocalRandom.current().nextInt(partitions.size());先计算出该主题总的分区数然后随机地返回一个小于它的正整数。 按消息键保序策略 Kafka允许为每条消息定义消息键简称为Key。 Key可以是一个有着明确业务含义的字符串比如客户代码、部门编号或是业务ID等也可以用来表征消息元数据。 一旦消息被定义了Key就可以保证同一个Key的所有消息都进入到相同的分区里面由于每个分区下的消息处理都是有顺序的故这个策略被称为按消息键保序策略。 实现这个策略的partition方法只需要下面两行代码 List partitions cluster.partitionsForTopic(topic); return Math.abs(key.hashCode()) % partitions.size();Kafka默认分区策略实际上同时实现了两种策略如果指定了Key那么默认实现按消息键保序策略如果没有指定Key则使用轮询策略。 基于地理位置的分区策略 这种策略一般只针对那些大规模的Kafka集群特别是跨城市、跨国家甚至是跨大洲的集群。 可以根据Broker所在的IP地址实现定制化的分区策略。比如下面这段代码: List partitions cluster.partitionsForTopic(topic); return partitions.stream().filter(p - isSouth(p.leader().host())).map(PartitionInfo::partition).findAny().get();生产者压缩算法 在Kafka中压缩可能发生在两个地方生产者端和Broker端。 生产者程序中配置compression.type参数即表示启用指定类型的压缩算法。 比如下面这段程序代码展示了如何构建一个开启GZIP的Producer对象 Properties props new Properties(); props.put(bootstrap.servers, localhost:9092); props.put(acks, all); props.put(key.serializer, org.apache.kafka.common.serialization.StringSerializer); props.put(value.serializer, org.apache.kafka.common.serialization.StringSerializer); // 开启GZIP压缩 props.put(compression.type, gzip); Producer producer new KafkaProducer(props);这样Producer启动后生产的每个消息集合都是经GZIP压缩过的故而能很好地节省网络传输带宽以及Kafka Broker端的磁盘占用。 有两种例外情况就可能让Broker重新压缩消息 「情况一Broker端指定了和Producer端不同的压缩算法。」 在Broker端设置了不同的compression.type值可能会发生预料之外的压缩/解压缩操作通常表现为Broker端CPU使用率飙升。 「情况二Broker端发生了消息格式转换。」 所谓的消息格式转换主要是为了兼容老版本的消费者程序。 在一个生产环境中Kafka集群可能同时存在多种版本的消息格式。为了兼容老版本的格式Broker端会对新版本消息执行向老版本格式的转换。 这个过程中会涉及消息的解压缩和重新压缩。一般情况下这种消息格式转换对性能是有很大影响的除了这里的压缩之外它还让Kafka丧失了Zero Copy特性。 「何时解压缩」 通常来说解压缩发生在消费者程序中也就是说Producer发送压缩消息到Broker后Broker照单全收并原样保存起来。当Consumer程序请求这部分消息时Broker依然原样发送出去当消息到达Consumer端后由Consumer自行解压缩还原成之前的消息。 「基本过程Producer端压缩、Broker端保持、Consumer端解压缩。」 注意除了在Consumer端解压缩Broker端也会进行解压缩。 每个压缩过的消息集合在Broker端写入时都要发生解压缩操作目的就是为了对消息执行各种验证。 我们必须承认这种解压缩对Broker端性能是有一定影响的特别是对CPU的使用率而言。 **启用压缩比较合适的时机**启用压缩的一个条件就是Producer程序运行机器上的CPU资源要很充足。如果环境中带宽资源有限那么建议开启压缩。 消费者组 Consumer Group是Kafka提供的可扩展且具有容错性的消费者机制。 组内可以有多个消费者或消费者实例它们共享一个公共的ID这个ID被称为Group ID。组内的所有消费者协调在一起来消费订阅主题的所有分区。 Consumer Group三个特性: Consumer Group下可以有一个或多个Consumer实例这里的实例可以是一个单独的进程也可以是同一进程下的线程。Group ID是一个字符串在一个Kafka集群中它标识唯一的一个Consumer Group。Consumer Group下所有实例订阅的主题的单个分区只能分配给组内的某个Consumer实例消费这个分区当然也可以被其他的Group消费。 当Consumer Group订阅了多个主题后组内的每个实例不要求一定要订阅主题的所有分区它只会消费部分分区中的消息。 各个Consumer Group之间彼此独立互不影响它们能够订阅相同的一组主题而互不干涉。 Kafka使用Consumer Group这一种机制同时实现了传统消息引擎系统的两大模型: 如果所有实例都属于同一个Group那么它实现的就是消息队列模型如果所有实例分别属于不同的Group那么它实现的就是发布/订阅模型。 理想情况下Consumer实例的数量应该等于该Group订阅主题的分区总数。假设一个Consumer Group订阅了3个主题分别是A、B、C它们的分区数依次是1、2、3那么通常情况下为该Group设置6个Consumer实例是比较理想的情形因为它能最大限度地实现高伸缩性。 针对Consumer GroupKafka管理位移的方式老版本的Consumer Group把位移保存在ZooKeeper中在新版本中采用了将位移保存在Kafka内部主题的方法这个内部主题就是__consumer_offsets。 ConsumerOffsets Kafka将Consumer的位移数据作为一条条普通的Kafka消息提交到__consumer_offsets中。__consumer_offsets的主要作用是保存Kafka消费者的位移信息。 __consumer_offsets主题就是普通的Kafka主题但它的消息格式却是Kafka自己定义的用户不能修改。其有3种消息格式 用于保存Consumer Group信息的消息。用于删除Group过期位移以及删除Group的消息保存了位移值。 第2种格式名为tombstone消息即墓碑消息也称delete mark它的主要特点是它的消息体是null即空消息体。 一旦某个Consumer Group下的所有Consumer实例都停止了而且它们的位移数据都已被删除时Kafka会向__consumer_offsets主题的对应分区写入tombstone消息表明要彻底删除这个Group的信息。 当Kafka集群中的第一个Consumer程序启动时Kafka会自动创建位移主题。默认该主题的分区数是50副本数是3。 目前Kafka Consumer提交位移的方式有两种自动提交位移和手动提交位移。 如果你选择的是自动提交位移那么就可能存在一个问题只要Consumer一直启动着它就会无限期地向位移主题写入消息。 假设Consumer当前消费到了某个主题的最新一条消息位移是100之后该主题没有任何新消息产生故Consumer无消息可消费了所以位移永远保持在100。由于是自动提交位移位移主题中会不停地写入位移100的消息。 Kafka使用Compact策略来删除__consumer_offsets主题中的过期消息避免该主题无限期膨胀。Compact的过程就是扫描日志的所有消息剔除那些过期的消息然后把剩下的消息整理在一起。 Kafka提供了专门的后台线程Log Cleaner定期地巡检待Compact的主题看看是否存在满足条件的可删除数据。 位移提交 因为Consumer能够同时消费多个分区的数据所以位移的提交实际上是在分区粒度上进行的即Consumer需要为分配给它的每个分区提交各自的位移数据。 位移提交分为自动提交和手动提交从Consumer端的角度来说位移提交分为同步提交和异步提交。 开启自动提交位移的方法Java Consumer默认就是自动提交位移的控制的参数是enable.auto.commit。 如果启用了自动提交可以通过参数auto.commit.interval.ms控制提交频率。它的默认值是5秒表明Kafka每5秒会为你自动提交一次位移。 Kafka会保证在开始调用poll方法时提交上次poll返回的所有消息。从顺序上来说poll方法的逻辑是先提交上一批消息的位移再处理下一批消息因此它能保证不出现消费丢失的情况。 但自动提交位移的一个问题在于它可能会出现重复消费。而手动提交的好处就在于更加灵活你完全能够把控位移提交的时机和频率。 开启手动提交位移的方法就是设置enable.auto.commit为false还需要调用相应的API手动提交位移。 最简单的API就是KafkaConsumer#commitSync()。该方法会提交KafkaConsumer#poll()返回的最新位移。它是一个同步操作即该方法会一直等待直到位移被成功提交才会返回。如果提交过程中出现异常该方法会将异常信息抛出。实例如下 while (true) {ConsumerRecords records consumer.poll(Duration.ofSeconds(1));process(records); // 处理消息try {consumer.commitSync();} catch (CommitFailedException e) {handle(e); // 处理提交失败异常} }在调用commitSync()时Consumer程序会处于阻塞状态直到远端的Broker返回提交结果这个状态才会结束。 鉴于这个问题可以使用异步提交KafkaConsumer#commitAsync()Kafka提供了回调函数callback处理提交之后的逻辑比如记录日志或处理异常等。 while (true) {ConsumerRecords records consumer.poll(Duration.ofSeconds(1));process(records); // 处理消息consumer.commitAsync((offsets, exception) - {if (exception ! null)handle(exception);}); }commitAsync的问题在于出现问题时不会自动重试。如果是手动提交需要将commitSync和commitAsync组合使用才能到达最理想的效果 try {while(true) {ConsumerRecords records consumer.poll(Duration.ofSeconds(1));process(records); // 处理消息consumer.commitAysnc(); // 使用异步提交规避阻塞} } catch(Exception e) {handle(e); // 处理异常 } finally {try {consumer.commitSync(); // 最后一次提交使用同步阻塞式提交} finally {consumer.close();} }Kafka Consumer API为手动提交提供了这样的方法 commitSync(Map)和commitAsync(Map)。在poll返回大量消息时可以控制处理完部分消息就提交位移。它们的参数是一个Map对象键就是TopicPartition即消费的分区而值是一个OffsetAndMetadata对象保存的主要是位移数据。 private Map offsets new HashMap(); int count 0; …… while (true) {ConsumerRecords records consumer.poll(Duration.ofSeconds(1));for (ConsumerRecord record: records) {process(record); // 处理消息offsets.put(new TopicPartition(record.topic(), record.partition()),new OffsetAndMetadata(record.offset() 1)ifcount % 100 0consumer.commitAsync(offsets, null); // 回调处理逻辑是nullcount;} }但实际目前一般使用Spring Kafka在这种Kafka消费者配置中消息的拉取和提交是逐条进行的: KafkaListener(topics DemoMessage.TOPIC,groupId demo-consumer-group- DemoMessage.TOPIC)public void onMessage(DemoMessage message, Acknowledgment acknowledgment) {log.info([onMessage][线程编号:{} 消息内容{}], Thread.currentThread().getId(), message);// 提交消费进度acknowledgment.acknowledge();}消费者策略 Round 默认也叫轮询说的是对于同一组消费者来说使用轮询分配的方式决定消费者消费的分区 Range 消费方式是以分区总数除以消费者总数来决定一般如果不能整除往往是从头开始将剩余的分区分配好 Sticky Round和Range方式当同组内有新的消费者加入或者旧的消费者退出的时候会从新开始决定消费者消费方式。 但是Sticky在同组中有新的新的消费者加入或者旧的消费者退出时不会直接开始新的分配而是保留现有消费者原来的消费策略将退出的消费者所消费的分区平均分配给现有消费者新增消费者同理同其他现存消费者的消费策略中分离。 重平衡 Rebalance本质上是一种协议规定了一个Consumer Group下的所有Consumer如何达成一致来分配订阅Topic的每个分区。 比如某个Group下有20个Consumer实例它订阅了一个具有100个分区的Topic。正常情况下Kafka平均会为每个Consumer分配5个分区。这个分配的过程就叫Rebalance。 Rebalance的触发条件 组成员数发生变更。比如有新的Consumer实例加入组或者离开组或是有Consumer实例崩溃被踢出组。订阅主题数发生变更。Consumer Group可以使用正则表达式的方式订阅主题比如consumer.subscribe(Pattern.compile(“t.*c”))就表明该Group订阅所有以字母t开头、字母c结尾的主题在Consumer Group的运行过程中你新创建了一个满足这样条件的主题那么该Group就会发生Rebalance。订阅主题的分区数发生变更会触发订阅该主题的所有Group开启Rebalance。 Rebalance发生时Group下所有的Consumer实例都会协调在一起共同参与。 多副本机制 Kafka 为分区Partition引入了多副本Replica机制上节的备份机制中也对此有一些介绍。 副本角色 一个Partition可以有一个 leader 副本和多个 follower副本这些副本分散保存在不同的Broker上而一个topic的不同Partition的leader副本也会尽量被分配在不同的broker针对同一个Partition在同一个broker节点上不可能出现它的多个副本。 生产者发送的消息会被发送到 leader 副本随后 follower 副本从 leader 副本中拉取消息进行同步也即生产者和消费者只与 leader 副本交互而同一个Partition下的所有副本保存有相同的消息序列。 也可以理解为其他副本只是 leader 副本的拷贝它们的存在只是为了保证消息存储的安全性。 当 leader 副本发生故障时会从 follower 中选举出一个 leader但是 follower 中如果有和 leader 同步程度达不到要求的参加不了 leader 的竞选。旧的Leader副本恢复后只能作为follower副本加入到集群中。 ISR副本集合 ISR中的副本都是与Leader同步的副本不在ISR中的follower副本就被认为是与Leader不同步的Leader副本默认在ISR中。 只要一个Follower副本落后Leader副本的时间不连续超过replica.lag.time.max.ms秒那么Kafka就认为该Follower副本与Leader是同步的即使此时Follower副本中的消息少于Leader副本。 其中参数replica.lag.time.max.ms是在Broker端设置的默认为10s。 Unclean领导者选举 所有不在ISR中的存活副本都称为非同步副本在Kafka中选举这种副本的过程称为Unclean领导者选举。 开启Unclean领导者选举可能会造成数据丢失优势在于它使得分区Leader副本一直存在不至于停止对外提供服务因此提升了高可用性。反之禁止Unclean领导者选举的好处在于维护了数据的一致性避免了消息丢失但牺牲了高可用性。 其中可以通过Broker端参数unclean.leader.election.enable控制是否允许Unclean领导者选举。 副本选举 kafka引入了优先副本的概念即在副本集合列表中的第一个副本在理想状态下就是该分区的leader副本。 例如kafka集群由3台broker组成创建了一个topic设置partition为3副本数为3partition0中副本集合列表为 [1,2,0]那么分区0的优先副本为1。 当 leader 副本发生故障时会从 follower 中选举出一个 leader当原来leader的节点恢复之后它只能成为一个follower节点此时就导致了集群负载不均衡。 为了解决上问题kafka支持了优先副本选举此时只要再触发一次优先副本选举就能保证分区负载均衡。 kafka支持自动优先副本选举功能默认每5分钟触发一次优先副本选举操作。 优势 Kafka 通过给特定 Topic 指定多个 Partition, 而各个 Partition 可以分布在不同的 Broker 上, 这样便能提供比较好的并发能力负载均衡。 Partition 可以指定对应的 Replica 数提高了消息存储的安全性和容灾能力不过也相应的增加了所需要的存储空间。 幂等性producer 在Kafka中Producer默认不是幂等性的但0.11.0.0版本引入了幂等性Producer仅需要设置一个参数即可 props.put(“enable.idempotence”, ture) 或 props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG true)。底层具体的原理很简单就是经典的用空间去换时间的优化思路即在Broker端多保存一些字段。当Producer发送了具有相同字段值的消息后Broker能够自动知晓这些消息已经重复了于是可以在后台默默地把它们丢弃掉。 幂等性Producer的作用范围: 它只能保证单分区上的幂等性即一个幂等性Producer能够保证某个主题的一个分区上不出现重复消息它无法实现多个分区的幂等性。它只能实现单会话上的幂等性不能实现跨会话的幂等性。这里的会话可以理解为Producer进程的一次运行当你重启了Producer进程之后这种幂等性保证就丧失了。 事务 目前隔离级别主要是read committed。事务型Producer能保证多条消息原子性地写入到目标分区。 这批消息要么全部写入成功要么全部失败即使Producer发生了重启Kafka依然保证它们发送消息的精确一次处理。 设置事务型Producer的方法也很简单满足两个要求即可 和幂等性Producer一样开启enable.idempotence true。设置Producer端参数transactional.id最好为其设置一个有意义的名字。 此外需要在Producer代码中做一些调整如这段代码所示 producer.initTransactions(); try {producer.beginTransaction();producer.send(record1);producer.send(record2);producer.commitTransaction(); } catch (KafkaException e) {producer.abortTransaction(); }事务型Producer的调用了一些事务API如initTransaction、beginTransaction、commitTransaction和abortTransaction分别对应事务的初始化、事务开始、事务提交以及事务终止。 这段代码能够保证Record1和Record2被当作一个事务统一提交到Kafka要么它们全部提交成功要么全部写入失败。 实际上即使写入失败Kafka也会把它们写入到底层的日志中也就是说Consumer还是会看到这些消息。 有一个isolation.level参数这个参数有两个取值 read_uncommitted这是默认值表明Consumer能够读取到Kafka写入的任何消息不论事务型Producer提交事务还是终止事务其写入的消息都可以读取如果你用了事务型Producer那么对应的Consumer就不要使用这个值。read_committed表明Consumer只会读取事务型Producer成功提交事务写入的消息它也能看到非事务型Producer写入的所有消息。 拦截器 Kafka拦截器分为生产者拦截器和消费者拦截器。 生产者拦截器支持在发送消息前以及消息提交成功后编写特定逻辑消费者拦截器支持在消费消息前以及提交位移后编写特定逻辑。 可以将一组拦截器串连成一个大的拦截器Kafka会按照添加顺序依次执行拦截器逻辑。 Kafka拦截器的设置方法是通过参数配置完成的生产者和消费者两端有一个相同的参数interceptor.classes它指定的是一组类的列表每个类就是特定逻辑的拦截器实现类。 Properties props new Properties(); List interceptors new ArrayList(); interceptors.add(com.yourcompany.kafkaproject.interceptors.AddTimestampInterceptor); // 拦截器1 interceptors.add(com.yourcompany.kafkaproject.interceptors.UpdateCounterInterceptor); // 拦截器2 props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors); 这两个类以及编写的所有Producer端拦截器实现类都要继承org.apache.kafka.clients.producer.ProducerInterceptor接口里面有两个核心的方法。 onSend该方法会在消息发送之前被调用。onAcknowledgement该方法会在消息成功提交或发送失败之后被调用。onAcknowledgement的调用要早于callback的调用。值得注意的是这个方法和onSend不是在同一个线程中被调用的因此如果在这两个方法中调用了某个共享可变对象要保证线程安全。 同理指定消费者拦截器也是同样的方法只是具体的实现类要实现org.apache.kafka.clients.consumer.ConsumerInterceptor接口这里面也有两个核心方法。 onConsume该方法在消息返回给Consumer程序之前调用。onCommitConsumer在提交位移之后调用该方法。 一定要注意的是指定拦截器类时要指定它们的全限定名。 控制器 控制器组件Controller它的主要作用是在Apache ZooKeeper的帮助下管理和协调整个Kafka集群。Kafka控制器大量使用ZooKeeper的Watch功能实现对集群的协调管理。 集群中任意一台Broker都能充当控制器的角色但是在运行过程中只能有一个Broker成为控制器行使其管理和协调的职责。 Broker在启动时会尝试去ZooKeeper中创建/controller节点。Kafka当前选举控制器的规则是第一个成功创建/controller节点的Broker会被指定为控制器。 控制器的职责大致可以分为5种 主题管理创建、删除、增加分区控制器帮助我们完成对Kafka主题的创建、删除以及分区增加的操作。分区重分配Preferred领导者选举Preferred领导者选举主要是Kafka为了避免部分Broker负载过重而提供的一种换Leader的方案。集群成员管理 包括自动检测新增Broker、Broker主动关闭及被动宕机。这种自动检测是依赖于Watch功能和ZooKeeper临时节点组合实现的。 比如控制器组件会利用Watch机制检查ZooKeeper的/brokers/ids节点下的子节点数量变更。 目前当有新Broker启动后它会在/brokers下创建专属的znode节点。一旦创建完毕ZooKeeper会通过Watch机制将消息通知推送给控制器这样控制器就能自动地感知到这个变化进而开启后续的新增Broker作业。 侦测Broker存活性则是依赖于刚刚提到的另一个机制临时节点。 每个Broker启动后会在/brokers/ids下创建一个临时znode。当Broker宕机或主动关闭后该Broker与ZooKeeper的会话结束这个znode会被自动删除。 同理ZooKeeper的Watch机制将这一变更推送给控制器这样控制器就能知道有Broker关闭或宕机了从而进行善后。数据服务控制器上保存了最全的集群元数据信息其他所有Broker会定期接收控制器发来的元数据更新请求从而更新其内存中的缓存数据。 日志存储 Kafka中的消息是以主题为基本单位进行归类的每个主题在逻辑上相互独立。每个主题又可以分为一个或多个分区在不考虑副本的情况下一个分区会对应一个日志。 但随着时间推移日志文件会不断扩大因此引入了日志分段LogSegment的概念将Log切分为多个LogSegment便于后续的消息维护和清理工作。 下图描绘了主题、分区、副本、Log、LogSegment五者之间的关系。 在Kafka中每个Log对象可以划分为多个LogSegment文件每个LogSegment文件包括数据文件、索引文件、时间戳文件等。 其中每个LogSegment中的日志数据文件大小均相等该日志数据文件的大小可以通过在Kafka Broker的config/server.properties配置文件的中的log.segment.bytes进行设置默认为1G大小1073741824字节在顺序写入消息时如果超出该设定的阈值将会创建一组新的日志数据和索引文件。 Kafka 如何保证消息的消费顺序 kafka的topic是无序的但是一个topic包含多个partition每个partition内部是有序的。 生产者控制 消息在被追加到 Partition(分区)的时候都会分配一个特定的偏移量offsetKafka 通过偏移量offset来保证消息在分区内的顺序性。 所以1 个 Topic 只对应一个 Partition就可以保证消息消费顺序但是破坏了 Kafka 的设计初衷。 Kafka 中发送 1 条消息的时候可以指定 topic, partition, key,data数据 4 个参数。如果发送消息的时候指定了 Partition 的话所有消息都会被发送到指定的 Partition。同一个 key 的消息可以保证只发送到同一个 partition可以采用表/对象的 id 来作为 key 。 因此对于如何保证 Kafka 中消息消费的顺序有下面两种方法 不推荐1 个 Topic 只对应一个 Partition。发送消息的时候指定 key/Partition。 消费者控制 对于同一业务进入了同一个消费者组之后用了多线程来处理消息会导致消息的乱序。 消费者内部根据线程数量创建等量的内存队列对于需要顺序的一系列业务数据根据key或者业务数据放到同一个内存队列中然后线程从对应的内存队列中取出并操作 通过设置相同key来保证消息有序性会有一点缺陷 例如消息发送设置了重试机制并且异步发送消息A和B设置相同的key业务上A先发B后发由于网络或者其他原因A发送失败B发送成功A由于发送失败就会重试且重试成功这时候消息顺序B在前A在后与业务发送顺序不一致。 如果需要解决这个问题需要设置参数max.in.flight.requests.per.connection1其含义是限制客户端在单个连接上能够发送的未响应请求的个数设置此值是1表示在 broker响应请求之前producer不能再向同一个broker发送请求这个参数默认值是5。 Kafka 如何保证消息不丢失 生产者丢失消息的情况 生产者调用send方法发送消息之后消息可能因为网络问题并没有发送过去。所以不能默认在调用send方法发送消息之后消息发送成功了。 为了确定消息是发送成功要判断消息发送的结果。但是要注意的是 生产者使用 send 方法发送消息实际上是异步的操作可以通过 get()方法获取调用结果但是这样也让它变为了同步操作示例代码如下 SendResultString, Object sendResult kafkaTemplate.send(topic, o).get(); if (sendResult.getRecordMetadata() ! null) {logger.info(生产者成功发送消息到 sendResult.getProducerRecord().topic() - sendResult.getProducerRecord().value().toString()); }所以一般不推荐这么做可以采用为其添加回调函数的形式示例代码如下 ListenableFutureSendResultString, Object future kafkaTemplate.send(topic, o); future.addCallback(result - logger.info(生产者成功发送消息到topic:{} partition:{}的消息, result.getRecordMetadata().topic(), result.getRecordMetadata().partition()),ex - logger.error(生产者发送消失败原因{}, ex.getMessage()));另外推荐为 Producer 的retries重试次数设置一个比较合理的值一般是 3 但是为了保证消息不丢失的话一般会设置比较大一点。设置完成之后当出现网络问题之后能够自动重试消息发送避免消息丢失。 另外建议还要设置重试间隔因为间隔太小的话重试的效果就不明显了。 消费者丢失消息的情况 我们知道消息在被追加到 Partition(分区)的时候都会分配一个特定的偏移量offset。偏移量offset)表示 Consumer 当前消费到的 Partition(分区)的所在的位置。Kafka 通过偏移量offset可以保证消息在分区内的顺序性。 当消费者拉取到了分区的某个消息之后消费者会自动提交了 offset。自动提交的话会有一个问题当消费者刚拿到这个消息准备进行真正消费的时候突然挂掉了消息实际上并没有被消费但是 offset 却被自动提交了。 可以手动关闭自动提交 offset每次在真正消费完消息之后再自己手动提交 offset 。 但是这样会带来消息被重新消费的问题。比如刚消费完消息之后提交 offset前服务挂掉了那么这个消息理论上就会被消费两次所以还要考虑消费幂等性。 Kafka 弄丢了消息 Kafka 为分区引入了多副本机制。分区中的多个副本之间会有一个 leader 和多个follower。生产者发送的消息会被发送到 leader 副本然后 follower 副本才能从 leader 副本中拉取消息进行同步。 假如 leader 副本所在的 broker 突然挂掉那么就要从 follower 副本重新选出一个 leader 但是 leader 的数据还有一些没有被 follower 副本的同步的话就会造成消息丢失。以下是解决方式 设置 acks all 解决办法就是我们设置 acks all。acks 的默认值即为 1代表消息被 leader 副本接收之后就算被成功发送。 配置 acks all 表示只有所有 ISR 列表的副本全部收到消息时生产者才会接收到来自服务器的响应。这种模式是最高级别的也是最安全的可以确保不止一个 Broker 接收到了消息。该模式的延迟会很高。设置 replication.factor 3 为了保证 leader 副本能有 follower 副本能同步消息一般为 topic 设置 replication.factor 3。这样就可以保证每个分区至少有 3 个副本。设置 min.insync.replicas 1 一般情况下还需要设置 min.insync.replicas 1 这样配置代表消息至少要被写入到 2 个副本才算是被成功发送。min.insync.replicas 的默认值为 1 在实际生产中应尽量避免默认值 1。设置replication.factor min.insync.replicas 假如两者相等的话只要是有一个副本挂掉整个分区就无法正常工作了。一般推荐设置成 replication.factor min.insync.replicas 1。设置 unclean.leader.election.enable false 它控制的是哪些Broker有资格竞选分区的Leader当 leader 副本发生故障时就不会从 follower 副本中和 leader 同步程度达不到要求的副本中选择出 leader 这样降低了消息丢失的可能性。 Kafka 如何保证消息不重复消费 kafka 出现消息重复消费的原因 服务端侧已经消费的数据没有成功提交 offset根本原因例如consumer 在消费过程中应用进程被强制kill掉或发生异常退出。Kafka 侧由于服务端处理业务时间长或者网络链接等等原因让 Kafka 认为服务假死触发了分区 rebalance。 举例单次拉取11条消息每条消息耗时30s11条消息耗时5分钟30秒由于max.poll.interval.ms 默认值5分钟所以消费者无法在5分钟内消费完consumer会离开组导致rebalance。 在消费完11条消息后consumer会重新连接broker再次rebalance因为上次消费的offset未提交再次拉取的消息是之前消费过的消息造成重复消费。 解决方案 消费消息服务做幂等校验比如 Redis 的 set、MySQL 的主键等天然的幂等功能。这种方法最有效。提高消费能力提高单条消息的处理速度根据实际场景可讲max.poll.interval.ms值设置大一点避免不必要的rebalance可适当减小max.poll.records的值默认值是500可根据实际消息速率适当调小。将 enable.auto.commit 参数设置为 false关闭自动提交开发者在代码中手动提交 offset。那么这里会有个问题什么时候提交 offset 合适 处理完消息再提交依旧有消息重复消费的风险和自动提交一样拉取到消息即提交会有消息丢失的风险。允许消息延时的场景会采用这种方式通过定时任务在业务不繁忙的时候做数据兜底。 Kafka 重试机制 消费失败会怎么样 生产者代码 for (int i 0; i 10; i) {kafkaTemplate.send(KafkaConst.TEST_TOPIC, String.valueOf(i))}消费者消代码 KafkaListener(topics {KafkaConst.TEST_TOPIC},groupId apple)private void customer(String message) throws InterruptedException {log.info(kafka customer:{},message);Integer n Integer.parseInt(message);if (n % 50){throw new RuntimeException();}}在默认配置下当消费异常会进行重试重试多次后会跳过当前消息继续进行后续消息的消费不会一直卡在当前消息。 默认会重试多少次 默认配置下消费异常会进行重试。看源码 FailedRecordTracker 类有个 recovered 函数返回 Boolean 值判断是否要进行重试下面是这个函数中判断是否重试的逻辑 Overridepublic boolean recovered(ConsumerRecord ? , ? record, Exception exception,Nullable MessageListenerContainer container,Nullable Consumer ? , ? consumer) throws InterruptedException {if (this.noRetries) {// 不支持重试attemptRecovery(record, exception, null, consumer);return true;}// 取已经失败的消费记录集合Map TopicPartition, FailedRecord map this.failures.get();if (map null) {this.failures.set(new HashMap ());map this.failures.get();}// 获取消费记录所在的Topic和PartitionTopicPartition topicPartition new TopicPartition(record.topic(), record.partition());FailedRecord failedRecord getFailedRecordInstance(record, exception, map, topicPartition);// 通知注册的重试监听器消息投递失败this.retryListeners.forEach(rl - rl.failedDelivery(record, exception, failedRecord.getDeliveryAttempts().get()));// 获取下一次重试的时间间隔long nextBackOff failedRecord.getBackOffExecution().nextBackOff();if (nextBackOff ! BackOffExecution.STOP) {this.backOffHandler.onNextBackOff(container, exception, nextBackOff);return false;} else {attemptRecovery(record, exception, topicPartition, consumer);map.remove(topicPartition);if (map.isEmpty()) {this.failures.remove();}return true;}}其中 BackOffExecution.STOP 的值为 -1。 FunctionalInterface public interface BackOffExecution {long STOP -1;long nextBackOff();}nextBackOff 的值调用 BackOff 类的 nextBackOff() 函数。如果当前执行次数大于最大执行次数则返回 STOP既超过这个最大执行次数后才会停止重试。 public long nextBackOff() {this.currentAttempts;if (this.currentAttempts getMaxAttempts()) {return getInterval();}else {return STOP;} }那么这个 getMaxAttempts 的值又是多少呢回到最开始当执行出错会进入 DefaultErrorHandler 。DefaultErrorHandler 默认的构造函数是 public DefaultErrorHandler() {this(null, SeekUtils.DEFAULT_BACK_OFF); }SeekUtils.DEFAULT_BACK_OFF 定义的是: public static final int DEFAULT_MAX_FAILURES 10;public static final FixedBackOff DEFAULT_BACK_OFF new FixedBackOff(0, DEFAULT_MAX_FAILURES - 1);DEFAULT_MAX_FAILURES 的值是 10currentAttempts 从 0 到 9所以总共会执行 10 次每次重试的时间间隔为 0。 最后简单总结一下Kafka 消费者在默认配置下会进行最多 10 次 的重试每次重试的时间间隔为 0即立即进行重试。如果在 10 次重试后仍然无法成功消费消息则不再进行重试消息将被视为消费失败 自定义重试次数以及时间间隔 自定义重试次数以及时间间隔只需要在 DefaultErrorHandler 初始化的时候传入自定义的 FixedBackOff 即可。重新实现一个 KafkaListenerContainerFactory 调用 setCommonErrorHandler 设置新的自定义的错误处理器就可以实现。 Bean public KafkaListenerContainerFactory kafkaListenerContainerFactory(ConsumerFactoryString, String consumerFactory) {ConcurrentKafkaListenerContainerFactory factory new ConcurrentKafkaListenerContainerFactory();// 自定义重试时间间隔以及次数FixedBackOff fixedBackOff new FixedBackOff(1000, 5);factory.setCommonErrorHandler(new DefaultErrorHandler(fixedBackOff));factory.setConsumerFactory(consumerFactory);return factory; }在重试失败后进行告警 自定义重试失败后逻辑需要手动实现以下是一个简单的例子重写 DefaultErrorHandler 的 handleRemaining 函数加上自定义的告警等操作。 Slf4j public class DelErrorHandler extends DefaultErrorHandler {public DelErrorHandler(FixedBackOff backOff) {super(null,backOff);}Overridepublic void handleRemaining(Exception thrownException, ListConsumerRecord?, ? records, Consumer?, ? consumer, MessageListenerContainer container) {super.handleRemaining(thrownException, records, consumer, container);log.info(重试多次失败);// 自定义操作} }DefaultErrorHandler 只是默认的一个错误处理器Spring Kafka 还提供了 CommonErrorHandler 接口。手动实现 CommonErrorHandler 就可以实现更多的自定义操作有很高的灵活性。例如根据不同的错误类型实现不同的重试逻辑以及业务逻辑等。 重试失败后的数据如何再次处理? 死信队列Dead Letter Queue简称 DLQ 是消息中间件中的一种特殊队列。它主要用于处理无法被消费者正确处理的消息通常是因为消息格式错误、处理失败、消费超时等情况导致的消息被丢弃或死亡的情况。 当消息进入队列后如果超过一定的重试次数仍无法被成功处理消息可以发送到死信队列中而不是被永久性地丢弃。 RetryableTopic 是 Spring Kafka 中的一个注解它用于配置某个 Topic 支持消息重试更推荐使用这个注解来完成重试。 // 重试 5 次重试间隔 100 毫秒,最大间隔 1 秒 RetryableTopic(attempts 5,backoff Backoff(delay 100, maxDelay 1000) ) KafkaListener(topics {KafkaConst.TEST_TOPIC}, groupId apple) private void customer(String message) {log.info(kafka customer:{}, message);Integer n Integer.parseInt(message);if (n % 5 0) {throw new RuntimeException();}System.out.println(n); }当达到最大重试次数后如果仍然无法成功处理消息消息会被发送到对应的死信队列中。对于死信队列的处理既可以用 DltHandler 处理也可以使用 KafkaListener 重新消费。 高性能原因 顺序读写 kafka的消息是不断追加到文件中的这个特性使kafka可以充分利用磁盘的顺序读写性能。 顺序读写不需要硬盘磁头的寻道时间只需很少的扇区旋转时间所以速度远快于随机读写。 Kafka 可以配置异步刷盘不开启同步刷盘异步刷盘不需要等写入磁盘后返回消息投递的 ACK所以它提高了消息发送的吞吐量降低了请求的延时。 零拷贝 传统的 IO 流程需要先把数据拷贝到内核缓冲区再从内核缓冲拷贝到用户空间应用程序处理完成以后再拷贝回内核缓冲区。 这个过程中发生了多次数据拷贝为了减少不必要的拷贝Kafka 依赖 Linux 内核提供的 Sendfile 系统调用。 在 Sendfile 方法中数据在内核缓冲区完成输入和输出不需要拷贝到用户空间处理这也就避免了重复的数据拷贝。 在具体的操作中Kafka 把所有的消息都存放在单独的文件里在消息投递时直接通过 Sendfile 方法发送文件减少了上下文切换因此大大提高了性能。 MMAP技术 除了 Sendfile 之外还有一种零拷贝的实现技术即 Memory Mapped Files。 Kafka 使用 Memory Mapped Files 完成内存映射Memory Mapped Files 对文件的操作不是 write/read而是直接对内存地址的操作如果是调用文件的 read 操作则把数据先读取到内核空间中然后再复制到用户空间但 MMAP可以将文件直接映射到用户态的内存空间省去了用户空间到内核空间复制的开销。 Producer生产的数据持久化到broker采用mmap文件映射实现顺序的快速写入。 Consumer从broker读取数据采用sendfile将磁盘文件读到OS内核缓冲区后直接转到socket buffer进行网络发送。 批量发送读取 Kafka 的批量包括批量写入、批量发布等它在消息投递时会将消息缓存起来然后批量发送。 同样消费端在消费消息时也不是一条一条处理的而是批量进行拉取提高了消息的处理速度。 数据压缩 Kafka还支持对消息集合进行压缩Producer可以通过GZIP或Snappy格式对消息集合进行压缩来减少传输的数据量减轻对网络传输的压力。 Producer压缩之后在Consumer需进行解压虽然增加了CPU的工作但在对大数据处理上瓶颈在网络上而不是CPU所以这个成本很值得。 分区机制 kafka中的topic中的内容可以被分为多partition存在每个partition又分为多个段segment所以每次操作都是针对一小部分做操作增加并行操作的能力。 常用参数 broker端配置 broker.id 每个 kafka broker 都有一个唯一的标识来表示这个唯一的标识符即是 broker.id它的默认值是 0。 这个值在 kafka 集群中必须是唯一的这个值可以任意设定 port 如果使用配置样本来启动 kafka它会监听 9092 端口修改 port 配置参数可以把它设置成任意的端口。 要注意如果使用 1024 以下的端口需要使用 root 权限启动 kakfa。 zookeeper.connect 用于保存 broker 元数据的 Zookeeper 地址是通过 zookeeper.connect 来指定的。 比如可以这么指定 localhost:2181 表示这个 Zookeeper 是运行在本地 2181 端口上的。 也可以通过 zk1:2181,zk2:2181,zk3:2181 来指定 zookeeper.connect 的多个参数值。 该配置参数是用冒号分割的一组 hostname:port/path 列表其含义如下 hostname 是 Zookeeper 服务器的机器名或者 ip 地址。port 是 Zookeeper 客户端的端口号/path 是可选择的 Zookeeper 路径Kafka 路径是使用了 chroot 环境如果不指定默认使用跟路径。 如果你有两套 Kafka 集群假设分别叫它们 kafka1 和 kafka2那么两套集群的zookeeper.connect参数可以这样指定zk1:2181,zk2:2181,zk3:2181/kafka1和zk1:2181,zk2:2181,zk3:2181/kafka2 log.dirs Kafka 把所有的消息都保存到磁盘上存放这些日志片段的目录是通过 log.dirs 来制定的它是用一组逗号来分割的本地系统路径log.dirs 是没有默认值的必须手动指定他的默认值。 还有一个参数是 log.dir这个配置是没有 s 的默认情况下只用配置 log.dirs 就好了比如可以通过 /home/kafka1,/home/kafka2,/home/kafka3 这样来配置这个参数的值。 auto.create.topics.enable 默认情况下kafka 会自动创建主题auto.create.topics.enable参数建议最好设置成 false即不允许自动创建 Topic。 主题相关配置 num.partitions num.partitions 参数指定了新创建的主题需要包含多少个分区该参数的默认值是 1。 default.replication.factor 表示 kafka保存消息的副本数。 log.retention.ms Kafka 通常根据时间来决定数据可以保留多久。默认使用log.retention.hours参数来配置时间默认是 168 个小时即一周。 除此之外还有两个参数log.retention.minutes 和log.retentiion.ms 。 这三个参数作用是一样的都是决定消息多久以后被删除推荐使用log.retention.ms。 message.max.bytes broker 通过设置message.max.bytes参数来限制单个消息的大小默认是 1000 000 也就是 1MB如果生产者尝试发送的消息超过这个大小不仅消息不会被接收还会收到 broker 返回的错误消息。 retention.ms 规定了该主题消息被保存的时常默认是7天即该主题只能保存7天的消息一旦设置了这个值它会覆盖掉 Broker 端的全局参数值。 常见面试题 Kafka是Push还是Pull模式 Kafka遵循了一种大部分消息系统共同的传统的设计producer将消息推送到brokerconsumer从broker拉取消息。 push模式由broker决定消息推送的速率对于不同消费速率的consumer就不太好处理了。 消息系统都致力于让consumer以最大的速率最快速的消费消息push模式下当broker推送的速率远大于consumer消费的速率时consumer处理会有问题。 Kafka中的Producer和Consumer采用的是Push-and-Pull模式即Producer向Broker Push消息Consumer从Broker Pull消息。 Pull有个缺点是如果broker没有可供消费的消息将导致consumer不断在循环中轮询直到新消息到达。 Kafka如何保证高可用? 面试题Kafka如何保证高可用有图有真相 Kafka的使用场景 异步通信 消息中间件在异步通信中用的最多很多业务流程中如果所有步骤都同步进行可能会导致核心流程耗时非常长更重要的是所有步骤都同步进行一旦非核心步骤失败会导致核心流程整体失败因此在很多业务流程中Kafka就充当了异步通信角色。 日志同步 大规模分布式系统中的机器非常多而且分散在不同机房中分布式系统带来的一个明显问题就是业务日志的查看、追踪和分析等行为变得十分困难对于集群规模在百台以上的系统查询线上日志很恐怖。 为了应对这种场景统一日志系统应运而生日志数据都是海量数据通常为了不给系统带来额外负担一般会采用异步上报这里Kafka以其高吞吐量在日志处理中得到了很好的应用。 实时计算 随着据量的增加离线的计算会越来越慢难以满足用户在某些场景下的实时性要求因此很多解决方案中引入了实时计算。 很多时候即使是海量数据我们也希望即时去查看一些数据指标实时流计算应运而生。 实时流计算有两个特点一个是实时随时可以看数据另一个是流。 kafka的作用 缓冲和削峰上游数据时有突发流量下游可能扛不住或者下游没有足够多的机器来保证冗余kafka在中间可以起到一个缓冲的作用把消息暂存在kafka中下游服务就可以按照自己的节奏进行慢慢处理。 解耦和扩展性消息队列可以作为一个接口层解耦重要的业务流程。只需要遵守约定针对数据编程即可获取扩展能力。 冗余可以采用一对多的方式一个生产者发布消息可以被多个订阅topic的服务消费到供多个毫无关联的业务使用。 健壮性消息队列可以堆积请求所以消费端业务即使短时间死掉也不会影响主要业务的正常进行。 异步通信很多时候用户不想也不需要立即处理消息。消息队列提供了异步处理机制允许用户把一个消息放入队列但并不立即处理它。想向队列中放入多少消息就放多少然后在需要的时候再去处理它们。 Kafka消费过的消息如何再消费 kafka消费消息的offset是定义在zookeeper中的 如果想重复消费kafka的消息可以在redis中自己记录offset的checkpoint点n个当想重复消费消息时通过读取redis中的checkpoint点进行zookeeper的offset重设这样就可以达到重复消费消息的目的了 kafka的数据是放在磁盘上还是内存上为什么速度会快 kafka使用的是磁盘存储。 速度快是因为 顺序写入因为硬盘是机械结构每次读写都会寻址-写入其中寻址是一个“机械动作”它是耗时的。所以硬盘 “讨厌”随机I/O 喜欢顺序I/O。为了提高读写硬盘的速度Kafka就是使用顺序I/O。Memory Mapped Files内存映射文件64位操作系统中一般可以表示20G的数据文件它的工作原理是直接利用操作系统的Page来实现文件到物理内存的直接映射。完成映射之后你对物理内存的操作会被同步到硬盘上。Kafka高效文件存储设计 Kafka把topic中一个parition大文件分成多个小文件段通过多个小文件段就容易定期清除或删除已经消费完文件减少磁盘占用。 通过索引信息可以快速定位 message和确定response的大小。通过index元数据全部映射到memory内存映射文件 可以避免segment file的IO磁盘操作。通过索引文件稀疏存储可以大幅降低index文件元数据占用空间大小。 注 Kafka解决查询效率的手段之一是将数据文件分段比如有100条Message它们的offset是从0到99。假设将数据文件分成5段第一段为0-19第二段为20-39以此类推每段放在一个单独的数据文件里面数据文件以该段中小的offset命名。这样在查找指定offset的Message的时候用二分查找就可以定位到该Message在哪个段中。为数据文件建索引虽然数据文件分段使得可以在一个较小的数据文件中查找对应offset的Message但是这依然需要顺序扫描才能找到对应offset的Message。 为了进一步提高查找的效率Kafka为每个分段后的数据文件建立了索引文件文件名与数据文件的名字是一样的只是文件扩展名为.index。 Kafka数据怎么保障不丢失 参考上面章节 kafka 重启是否会导致数据丢失 kafka是将数据写到磁盘的一般数据不会丢失。 但是在重启kafka过程中如果有消费者消费消息那么kafka如果来不及提交offset可能会造成数据的不准确丢失或者重复消费。 kafka 宕机了如何解决 kafka 宕机了首先考虑的问题是所提供的服务是否因为宕机的机器而受到影响如果服务提供没问题事前做好了集群的容灾机制那么就不用担心了。 想要恢复集群的节点主要的步骤就是通过日志分析来查看节点宕机的原因从而解决并重新恢复节点。 为什么Kafka不支持读写分离 在 Kafka 中生产者写入消息、消费者读取消息的操作都是与 leader 副本进行交互的从而实现的是一种主写主读的生产消费模型。 Kafka 并不支持主写从读因为主写从读有 2 个很明显的缺点: 数据一致性问题数据从主节点转到从节点必然会有一个延时的时间窗口这个时间窗口会导致主从节点之间的数据不一致。延时问题类似 Redis 这种组件数据从写入主节点到同步至从节点中的过程需要经历 网络→主节点内存→网络→从节点内存 这几个阶段整个过程会耗费一定的时间。而在 Kafka 中主从同步会比 Redis 更加耗时它需要经历 网络→主节点内存→主节点磁盘→网络→从节 点内存→从节点磁盘 这几个阶段。对延时敏感的应用而言主写从读的功能并不太适用。 而kafka的主写主读的优点就很多了 可以简化代码的实现逻辑减少出错的可能;将负载粒度细化均摊与主写从读相比不仅负载效能更好而且对用户可控;没有延时的影响;在副本稳定的情况下不会出现数据不一致的情况。 kafka数据分区和消费者的关系 每个分区只能由同一个消费组内的一个消费者(consumer)来消费可以由不同的消费组的消费者来消费同组的消费者则起到并发的效果。 kafka的数据offset读取流程 连接ZK集群从ZK中拿到对应topic的partition信息和partition的Leader的相关信息连接到对应Leader对应的brokerconsumer将⾃自⼰己保存的offset发送给LeaderLeader根据offset等信息定位到segment索引⽂文件和⽇日志⽂文件根据索引⽂文件中的内容定位到⽇日志⽂文件中该偏移量量对应的开始位置读取相应⻓长度的数据并返回给consumer。 kafka内部如何保证顺序结合外部组件如何保证消费者的顺序 参考上面章节 Kafka消息数据积压Kafka消费能力不足怎么处理 如果是Kafka消费能力不足则可以考虑增加Topic的分区数并且同时提升消费组的消费者数量消费者数分区数。两者缺一不可如果是下游的数据处理不及时提高每批次拉取的数量。批次拉取数据过少拉取数据/处理时间生产速度使处理的数据小于生产的数据也会造成数据积压。 Kafka单条日志传输大小 kafka对于消息体的大小默认为单条最大值是1M但是在我们应用场景中, 常常会出现一条消息大于1M如果不对kafka进行配置。则会出现生产者无法将消息推送到kafka或消费者无法去消费kafka里面的数据, 这时我们就要以下参数进行配置 replica.fetch.max.bytes: 1048576 broker可复制的消息的最大字节数, 默认为1M message.max.bytes: 1000012 kafka 会接收单个消息size的最大限制 默认为1M左右注意message.max.bytes必须小于等于replica.fetch.max.bytes否则就会导致replica之间数据同步失败。 参考 四万字32图Kafka知识体系保姆级教程宝典 Kafka常见问题总结 Kafka核心知识点大梳理
http://www.zqtcl.cn/news/298824/

相关文章:

  • 奥迪网站建设策划书wordpress取消评论审核
  • 无锡百度正规公司专业seo网站优化推广排名教程
  • 湖南城乡建设厅网站青岛网站推广招商
  • 网站备案信息加到哪里国际要闻军事新闻
  • 商河县做网站公司如何仿制国外网站
  • 网站如何跟域名绑定唐山正规做网站的公司哪家好
  • 网站建设wang.cdwordpress文章链接插件
  • 本地进wordpress后台搜索优化师
  • 网站备案证书下载失败法国 wordpress
  • 海南平台网站建设企业优秀的设计案例
  • 拿别的公司名字做网站合肥网页设计培训班
  • 到哪个网站做任务太原百度seo优化推广
  • 北京外贸网站开发广东智慧团建系统入口
  • 做百度网站接到多少客户电话阿里云服务器win系统建站教程
  • 天空在线网站建设深圳外贸网站怎么建
  • 网站的交流的功能怎么做小商品网站建设
  • 求职招聘网站建设投标书怎样在手机上面建设网站
  • 重庆工厂网站建设备案域名出售平台
  • 免费网站优化校园电商平台网站建设
  • 宁波市住房和城乡建设局网站成都网站建设网站制作
  • 网站制作还花钱建设银行网站查询密码是啥
  • 周到的做pc端网站产品图册设计公司
  • 淘宝客新增网站网页设计板式类型
  • 怎么使用wordpress建站吃什么补肾气效果好
  • 建设网站中期wordpress做分类信息网站
  • 百色住房和城乡建设部网站江苏交通建设监理协会网站
  • 常州网站建设哪儿好薇有哪些做外贸网站
  • ip域名找网站一级域名和二级域名的区别
  • 手机网站 底部菜单网站切换效果
  • 珠海公司做网站wordpress最近访客