当前位置: 首页 > news >正文

做背景图 网站安装网站模版视频

做背景图 网站,安装网站模版视频,东莞推广宣传短视频,wordpress 情侣博客一、场景分析Kafka采用的是主写主读的方式#xff0c;即客户端的读写请求都由分区的Leader副本处理#xff0c;那么Follower副本要想保证和Leader副本数据一致#xff0c;就需要不断地从Leader副本拉取消息来进行同步。由于同一个分区的Leader副本和Follower副本分布在不同的… 一、场景分析Kafka采用的是主写主读的方式即客户端的读写请求都由分区的Leader副本处理那么Follower副本要想保证和Leader副本数据一致就需要不断地从Leader副本拉取消息来进行同步。由于同一个分区的Leader副本和Follower副本分布在不同的节点上所以同步的过程可以简单概括为Follower副本所在节点封装拉取数据的请求并发送给Leader副本所在节点 → Leader副本所在节点接收拉取数据的请求并进行处理然后返回响应 → Follower副本所在节点接收到返回的响应并进行处理。这个过程中封装拉取请求和处理返回的响应是Follower副本所在节点的一个单独的线程完成的。二、图示说明    假设某主题只有1个分区该分区有两个副本Leader 副本在 Broker1 上Follower 副本在 Broker2 上其 Leader 副本写入数据和 Follower 副本同步数据的流程如下图三、源码分析Kafka分区的Leader副本接收客户端生产的数据写入本地存储然后Follower副本拉取数据写入本地存储并更新一系列关键的偏移量。整个流程比较复杂这里先通过一个简单的方法调用流程来看一下这个过程1.leader 副本将数据写入本地磁盘 KafkaApis.handleProduceRequest(){ replicaManager.appendRecords(){ appendToLocalLog(){ Partition.appendRecordsToLeader(){ Log.appendAsLeader(){ Log.append(){ //通过LogSegment.append()方法写入磁盘 LogSegment.append() } } } } } }2.leader 副本更新LEO KafkaApis.handleProduceRequest(){ replicaManager.appendRecords(){ appendToLocalLog(){ Partition.appendRecordsToLeader(){ Log.appendAsLeader(){ Log.append(){ //更新Leader副本的LEO值 updateLogEndOffset(appendInfo.lastOffset 1) } } } } } }3.follower 副本同步数据携带自身的LEO AbstractFetchThread.doWork(){ maybeFetch(){ buildFetch(fetchStates){ //这里的fetchState.fetchOffset 就是Follower副本的LEO值 builder.add(topicPartition, new FetchRequest.PartitionData( fetchState.fetchOffset, logStartOffset, fetchSize, Optional.of(fetchState.currentLeaderEpoch))) } } }4.leader 副本更新本地保存的Follower副本的LEO ReplicaManager.fetchMessages(){ //获取读取结果 val logReadResults readFromLog(){ if (isFromFollower) updateFollowerLogReadResults(replicaId, result){ //TODO 更新leader保存的各个follower副本的LEO partition.updateReplicaLogReadResult(replica, readResult){ //TODO 最终更新所有的replica的LEO的值 replica.updateLogReadResult(logReadResult){ //更新LEO对象 logEndOffsetMetadata logReadResult.info.fetchOffsetMetadata } } } } }5.leader 副本尝试更新ISR列表 ReplicaManager.fetchMessages(){ //获取读取结果 val logReadResults readFromLog(){ if (isFromFollower) updateFollowerLogReadResults(replicaId, result){ //TODO 尝试更新ISR列表 val leaderHWIncremented maybeExpandIsr(replicaId, logReadResult){ //更新ISR列表 updateIsr(newInSyncReplicas) } } } }6.leader 副本更新HW ReplicaManager.fetchMessages(){ //获取读取结果 val logReadResults readFromLog(){ if (isFromFollower) updateFollowerLogReadResults(replicaId, result){ //TODO 尝试更新ISR列表及Leader副本的HW val leaderHWIncremented maybeExpandIsr(replicaId, logReadResult){ //TODO 尝试更新leader的HW maybeIncrementLeaderHW(leaderReplica, logReadResult.fetchTimeMs){ //取ISR列表中副本的最小的LEO作为新的HW val newHighWatermark allLogEndOffsets.min(new LogOffsetMetadata.OffsetOrdering) //获取旧的HW val oldHighWatermark leaderReplica.highWatermark //如果新的HW值大于旧的HW值就更新 if (oldHighWatermark.messageOffset newHighWatermark.messageOffset || (oldHighWatermark.messageOffset newHighWatermark.messageOffset oldHighWatermark.onOlderSegment(newHighWatermark))) { //更新 Leader 副本的 HW                            leaderReplica.highWatermark  newHighWatermark } } } } } }7.leader 副本给 follower副本 返回数据携带leader 副本的 HW 值 ReplicaManager.fetchMessages(){ //获取读取结果 val logReadResults readFromLog(){ readFromLocalLog(){ read(){ val readInfo partition.readRecords(){ //获取Leader Replica的高水位 val initialHighWatermark localReplica.highWatermark.messageOffset } } } } }8.follower 副本写入数据更新自身LEO、 ReplicaFetcherThread.processPartitionData(){ partition.appendRecordsToFollowerOrFutureReplica(records, isFuture false){ doAppendRecordsToFollowerOrFutureReplica(){ Log.appendAsFollower(){ Log.append(){ //更新Follower副本的LEO值 updateLogEndOffset(appendInfo.lastOffset 1) } } } } }9.follower 副本更新本地的 HW 值 ReplicaFetcherThread.processPartitionData(){ //根据leader返回的HW更新Follower本地的HW取Follower本地LEO 和 Leader HW 的较小值 val followerHighWatermark replica.logEndOffset.min(partitionData.highWatermark) //TODO 更新Follower副本的 HW 对象 replica.highWatermark new LogOffsetMetadata(followerHighWatermark)  }注意对于HWLeader 副本和 Follower 副本只保存自身的对于LEOFollower 副本只保存自身的但是 Leader 副本除了保存自身的外还会保存所有 Follower 副本的 LEO 值无论是Leader副本所在节点还是Follower副本所在节点分区对应的Partition 对象都会保存所有的副本对象但是只有本地副本对象有对应的日志文件整个数据写入及同步的过程分为九个步骤leader 副本将数据写入本地磁盘leader 副本更新 LEOfollower 副本发送同步数据请求携带自身的 LEOleader 副本更新本地保存的其它副本的 LEOleader 副本尝试更新 ISR 列表leader 副本更新 HWleader 副本给 follower 副本返回数据携带 leader 副本的 HW 值follower 副本接收响应并写入数据更新自身 LEOfollower 副本更新本地的 HW 值   下面具体分析这几个步骤。第一、二步在分析日志对象的写数据流程时已经详细介绍过这里不再赘述(《深入理解Kafka服务端之日志对象的读写数据流程》)。 对于后面的几个步骤由于发生在不同的节点上并没有按照这个顺序进行分析而是分成了Follower副本的相关操作即 第三步、第八步、第九步Leader副本的相关操作即 第四步、第五步、第六步、第七步上面提到Follower副本拉取数据是通过一个单独的线程完成的所以在分析这几个步骤之前先看一下这个线程相关的类抽象类AbstractFetcherThread实现类ReplicaFetcherThread先看一下 AbstractFetcherThread 类的定义abstract class AbstractFetcherThread(name: String,//线程名称 clientId: String,//Cliend ID用于日志输出 val sourceBroker: BrokerEndPoint,//数据源Broker地址 failedPartitions: FailedPartitions,//线程处理过程报错的分区集合 fetchBackOffMs: Int 0,//拉取的重试间隔默认是 Broker 端参数 replica.fetch.backoff.ms 值。 isInterruptible: Boolean true)//是否允许线程中断 extends ShutdownableThread(name, isInterruptible) { type FetchData FetchResponse.PartitionData[Records] type EpochData OffsetsForLeaderEpochRequest.PartitionData //泛型 PartitionFetchState表征分区读取状态包含已读取偏移量和对应的副本读取状态 //副本状态由 ReplicaState 接口定义包含 读取中 和 截断中 两个 private val partitionStates new PartitionStates[PartitionFetchState] ...}其中type 的用法是给指定的类起一个别名如type FetchData FetchResponse.PartitionData[Records]后面就可以用 FetchData 来表示 FetchResponse.PartitionData[Records] 类EpochData 同理。    FetchResponse.PartitionDataFetchResponse是封装的FETCH请求的响应类PartitionData是一个嵌套类表示响应中单个分区的拉取信息包括对应Leader副本的高水位分区日志的起始偏移量拉取到的消息集合等。public static final class PartitionDataT extends BaseRecords { public final Errors error;//错误码 public final long highWatermark;//从Leader返回的分区的高水位值 public final long lastStableOffset;// 最新LSO值 public final long logStartOffset;//日志起始偏移量 public final Optional preferredReadReplica;// 期望的Read ReplicaKAFKA 2.4之后支持部分Follower副本可以对外提供读服务 public final List abortedTransactions;// 该分区对应的已终止事务列表 public final T records;//消息集合}OffsetsForLeaderEpochRequest.PartitionData里面包含了Follower副本在本地保存的leader epoch 和从Leader副本获取到的leader epochpublic static class PartitionData { public final Optional currentLeaderEpoch; public final int leaderEpoch;}分区读取的状态    PartitionFetchState样例类用来表征分区的读取状态。包含已拉取的偏移量当前leader的epoch副本读取状态等case class PartitionFetchState(fetchOffset: Long,//已拉取的偏移量 currentLeaderEpoch: Int,//当前epoch delay: DelayedItem, state: ReplicaState//副本读取状态 ) { //表征分区的读取状态 //1.可拉取表明副本获取线程当前能够读取数据。判断条件是副本处于Fetching且未被推迟执行 def isReadyForFetch: Boolean state Fetching !isDelayed //2.截断中表明分区副本正在执行截断操作(比如该副本刚刚成为 Follower 副本)。判断条件是副本处于Truncating且未被推迟执行 def isTruncating: Boolean state Truncating !isDelayed //3.被推迟表明副本获取线程获取数据时出现错误需要等待一段时间后重试。判断条件是存在未过期的延迟任务 def isDelayed: Boolean delay.getDelay(TimeUnit.MILLISECONDS) 0}分区读取状态分为三种isReadyForFetch可拉取表明副本获取线程当前能够读取数据。判断条件是副本处于Fetching且未被推迟执行isTruncating截断中表明分区副本正在执行截断操作(比如该副本刚刚成为 Follower 副本)。判断条件是副本处于Truncating且未被推迟执行isDelayed被推迟表明副本获取线程获取数据时出现错误需要等待一段时间后重试。判断条件是存在未过期的延迟任务副本读取的状态    ReplicaState特质用来表征副本读取状态。sealed trait ReplicaState//截断中case object Truncating extends ReplicaState//拉取中case object Fetching extends ReplicaState副本读取状态分为两种Truncating截断中Fetching拉取中对应上面的拉取数据流程AbstractFetchThread定义了相关的方法buildFetch封装拉取数据的请求truncate进行日志截断processPartitionData处理返回的响应doWork将上面定义的三个方法串联起来形成一个闭环并不断地重复执行。从而实现从Leader副本所在的节点同步消息在 AbstractFetchThread 中前三个定义的都是抽象方法具体的方法实现在其实现类 ReplicaFetcherThread其定义如下class ReplicaFetcherThread(name: String, fetcherId: Int,//Follower 拉取的线程 Id也就是线程的编号。 // 单台 Broker 上允许存在多个 ReplicaFetcherThread 线程。 // Broker 端参数 num.replica.fetchers决定了 Kafka 到底创建多少个 Follower 拉取线程。 sourceBroker: BrokerEndPoint, brokerConfig: KafkaConfig,//服务端配置类用来获取配置信息 failedPartitions: FailedPartitions, replicaMgr: ReplicaManager,//副本管理器。该线程类通过副本管理器来获取分区对象、副本对象以及它们下面的日志对象。 metrics: Metrics, time: Time, quota: ReplicaQuota,//用做限流。作用是控制 Follower 副本拉取速度 leaderEndpointBlockingSend: Option[BlockingSend] None//用于实现同步发送请求的类。 // 所谓的同步发送是指该线程使用它给指定 Broker 发送请求然后线程处于阻塞状态直到接收到 Broker 返回的 Response。 )extends AbstractFetcherThread( name name, clientId name, sourceBroker sourceBroker, failedPartitions, fetchBackOffMs brokerConfig.replicaFetchBackoffMs, isInterruptible false) { //Follower副本所在Broker的Id private val replicaId brokerConfig.brokerId //用于执行请求发送的类 private val leaderEndpoint leaderEndpointBlockingSend.getOrElse( new ReplicaFetcherBlockingSend(sourceBroker, brokerConfig, metrics, time, fetcherId, sbroker-$replicaId-fetcher-$fetcherId, logContext)) //Follower发送的FETCH请求被处理返回前的最长等待时间由参数replica.fetch.wait.max.ms 配置默认 500 毫秒 private val maxWait brokerConfig.replicaFetchWaitMaxMs //每个FETCH Response返回前必须要累积的最少字节数由参数replica.fetch.min.bytes 配置默认 1 字节 private val minBytes brokerConfig.replicaFetchMinBytes //每个合法FETCH Response的最大字节数由参数replica.fetch.response.max.bytes 配置默认 10 M private val maxBytes brokerConfig.replicaFetchResponseMaxBytes //单个分区能够获取到的最大字节数由参数replica.fetch.max.bytes 配置默认 1 M private val fetchSize brokerConfig.replicaFetchMaxBytes ...}buildFetch() 方法为指定分区集合构建对应的FetchRequest.Builder 对象而该对象是构建 FetchRequest 的核心组件。这个方法中有一个重要的操作封装拉取请求时携带了Follower副本的 LogStartOffset 和 LEO 值(对应同步数据的第三步) override def buildFetch(partitionMap: Map[TopicPartition, PartitionFetchState]): ResultWithPartitions[Option[FetchRequest.Builder]] { //定义一个保存出错分区的集合 val partitionsWithError mutable.Set[TopicPartition]() val builder fetchSessionHandler.newBuilder() // 遍历每个分区将处于可获取状态的分区添加到builder后续统一处理 // 对于有错误的分区加入到出错分区集合 partitionMap.foreach { case (topicPartition, fetchState) //如果分区的状态是可拉取的且该分区未对follower限流 if (fetchState.isReadyForFetch !shouldFollowerThrottle(quota, topicPartition)) { try {          //获取本地Follower副本保存的分区日志的logStartOffset val logStartOffset replicaMgr.localReplicaOrException(topicPartition).logStartOffset /**将分区和对应的PartitionData添加到builder注意这里的PartitionData对应的是拉取请求FetchRequest里面封装了拉取请求的元数据信息如 * fetchOffset拉取消息的起始偏移量也就是Follower副本的LEO * currentLeaderEpochFollower副本保存的leader epoch值 */ builder.add(topicPartition, new FetchRequest.PartitionData( fetchState.fetchOffset, logStartOffset, fetchSize, Optional.of(fetchState.currentLeaderEpoch))) } catch { case _: KafkaStorageException //如果有异常将该分区添加到出错分区的集合 partitionsWithError topicPartition } } } val fetchData builder.build() val fetchRequestOpt if (fetchData.sessionPartitions.isEmpty fetchData.toForget.isEmpty) { None } else { //构造FETCH请求的Builder对象 val requestBuilder FetchRequest.Builder .forReplica(fetchRequestVersion, replicaId, maxWait, minBytes, fetchData.toSend) .setMaxBytes(maxBytes) .toForget(fetchData.toForget) .metadata(fetchData.metadata) Some(requestBuilder) } //构建返回结果返回Builder对象以及出错分区列表 ResultWithPartitions(fetchRequestOpt, partitionsWithError) }truncate() 方法用于将指定分区的日志截断到指定的偏移量override def truncate(tp: TopicPartition, offsetTruncationState: OffsetTruncationState): Unit { //根据分区获取本地副本 val replica replicaMgr.localReplicaOrException(tp) val partition replicaMgr.getPartition(tp).get //调用Partition.truecateTo方法进行日志截断 // offsetTruncationState.offset要截断到的偏移量 partition.truncateTo(offsetTruncationState.offset, isFuture false) if (offsetTruncationState.offset replica.highWatermark.messageOffset) warn(sTruncating $tp to offset ${offsetTruncationState.offset} below high watermark s${replica.highWatermark.messageOffset}) if (offsetTruncationState.truncationCompleted) replicaMgr.replicaAlterLogDirsManager.markPartitionsForTruncation(brokerConfig.brokerId, tp, offsetTruncationState.offset)}    这个方法内部依次调用了Partition.truncateTo - LogManager.truncateTo - Log.truncateTo - LogSegment.truncateTo 进行日志截断操作processPartitionData方法用于处理指定分区从Leader副本所在节点返回的响应将获取的消息写入本地存储并返回写入消息的元数据这里有两个个重要的操作写入消息更新 Follower 副本的 LEO(对应同步数据的第八步)更新 Follower 副本本地的 HW 值(对应同步数据的第九步)override def processPartitionData(topicPartition: TopicPartition, // 拉取数据的分区 fetchOffset: Long, // 拉取的消息集合的起始位移 partitionData: FetchData // 读取到的分区消息数据 ): Option[LogAppendInfo] { // 返回值写入已读取消息数据前的元数据 //从副本管理器获取副本对象Replica val replica replicaMgr.localReplicaOrException(topicPartition) //从副本管理器获取指定主题分区对象 val partition replicaMgr.getPartition(topicPartition).get //将获取的消息封装成MemoryRecords val records toMemoryRecords(partitionData.records) //判断获取的消息集合是否超限 maybeWarnIfOversizedRecords(records, topicPartition) //如果获取消息的起始位移值不是本地日志LEO值则视为异常情况 if (fetchOffset ! replica.logEndOffset) throw new IllegalStateException(Offset mismatch for partition %s: fetched offset %d, log end offset %d..format( topicPartition, fetchOffset, replica.logEndOffset)) if (isTraceEnabled) trace(Follower has replica log end offset %d for partition %s. Received %d messages and leader hw %d .format(replica.logEndOffset, topicPartition, records.sizeInBytes, partitionData.highWatermark)) //TODO 写入Follower副本本地日志更新自身的LEO val logAppendInfo partition.appendRecordsToFollowerOrFutureReplica(records, isFuture false) if (isTraceEnabled) trace(Follower has replica log end offset %d after appending %d bytes of messages for partition %s .format(replica.logEndOffset, records.sizeInBytes, topicPartition)) //根据leader返回的HW更新Follower本地的HW取Follower本地LEO 和 Leader HW 的较小值 val followerHighWatermark replica.logEndOffset.min(partitionData.highWatermark) //获取从leader返回的LogStartOffset val leaderLogStartOffset partitionData.logStartOffset //TODO 更新Follower副本的HW对象 replica.highWatermark new LogOffsetMetadata(followerHighWatermark) //尝试更新Follower副本的LogStartOffset replica.maybeIncrementLogStartOffset(leaderLogStartOffset) if (isTraceEnabled) trace(sFollower set replica high watermark for partition $topicPartition to $followerHighWatermark) // 副本消息拉取限流 if (quota.isThrottled(topicPartition)) quota.record(records.sizeInBytes) replicaMgr.brokerTopicStats.updateReplicationBytesIn(records.sizeInBytes) //返回写入消息的元数据 logAppendInfo }AbstractFetchThread.doWork() 方法将上面的三个方法串联起来形成闭环达到 Follower 副本从 Leader 副本同步数据的目的。override def doWork() { //尝试日志截断 maybeTruncate() //尝试拉取数据 maybeFetch()}这个方法很简单只在内部调用了两个方法maybeTruncate()尝试进行日志截断private def maybeTruncate(): Unit { // 将所有处于截断中状态的分区依据有无Leader Epoch值进行分组 val (partitionsWithEpochs, partitionsWithoutEpochs) fetchTruncatingPartitions() // 对于有Leader Epoch值的分区将日志截断到Leader Epoch值对应的位移值处 if (partitionsWithEpochs.nonEmpty) { truncateToEpochEndOffsets(partitionsWithEpochs) } // 对于没有Leader Epoch值的分区将日志截断到高水位值处 if (partitionsWithoutEpochs.nonEmpty) { truncateToHighWatermark(partitionsWithoutEpochs) }}这里先看对于没有Leader Epoch的分区将日志截断到高水位处private[server] def truncateToHighWatermark(partitions: Set[TopicPartition]): Unit inLock(partitionMapLock) { val fetchOffsets mutable.HashMap.empty[TopicPartition, OffsetTruncationState] // 遍历每个要执行截断操作的分区对象 for (tp // 获取分区的分区读取状态 val partitionState partitionStates.stateValue(tp) if (partitionState ! null) { // 取出高水位值。 val highWatermark partitionState.fetchOffset //封装截断状态 val truncationState OffsetTruncationState(highWatermark, truncationCompleted true) info(sTruncating partition $tp to local high watermark $highWatermark) // 执行截断到高水位值 if (doTruncate(tp, truncationState)) //保存分区和对应的截取状态 fetchOffsets.put(tp, truncationState) } } // 更新这组分区的分区读取状态 updateFetchOffsetAndMaybeMarkTruncationComplete(fetchOffsets)}其中 doTruncate(tp, truncationState) 方法内部就调用了实现类ReplicaFetcherThread.truncate() 方法maybeFetch()尝试从Leader副本拉取数据private def maybeFetch(): Unit { //获取分区状态集合和对应的拉取请求的集合 val (fetchStates, fetchRequestOpt) inLock(partitionMapLock) { //获取要拉取消息的分区和分区对应状态的集合 val fetchStates partitionStates.partitionStateMap.asScala // TODO 第一步为集合中的分区构造FetchRequest.builder对象这里的返回结果有两个对象 //fetchRequestOpt要读取的分区核心信息 FetchRequest.Builder 对象。 // 而这里的核心信息就是指要读取哪个分区从哪个位置开始读最多读多少字节等等。 //partitionsWithError一组出错的分区 val ResultWithPartitions(fetchRequestOpt, partitionsWithError) buildFetch(fetchStates) //TODO 第二步处理出错的分区处理方式主要是将这个分区加入到有序Map末尾等待后续重试 handlePartitionsWithErrors(partitionsWithError, maybeFetch) // 如果当前没有可读取的分区则等待fetchBackOffMs时间等候后续重试 if (fetchRequestOpt.isEmpty) { trace(sThere are no active partitions. Back off for $fetchBackOffMs ms before sending a fetch request) partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS) } (fetchStates, fetchRequestOpt) } //TODO 第三步遍历FETCH请求发送FETCH请求给Leader副本并处理Response fetchRequestOpt.foreach { fetchRequest processFetchRequest(fetchStates, fetchRequest) }}这个方法可以划分为关键的三个步骤a为集合中的分区构造FetchRequest.builder对象val ResultWithPartitions(fetchRequestOpt, partitionsWithError) buildFetch(fetchStates)这里调用了实现类ReplicaFetcherThread.buildFetch() 方法返回结果有两个对象fetchRequestOpt要读取的分区核心信息 FetchRequest.Builder 对象。而这里的核心信息就是指要读取哪个分区从哪个位置开始读最多读多少字节等等。partitionsWithError一组出错的分区b处理出错的分区。处理方式主要是将这个分区加入到有序Map末尾等待后续重试handlePartitionsWithErrors(partitionsWithError, maybeFetch)这个方法最后调用了PartitionStates.updateAndMoveToEnd() 方法其作用就是把给定的分区从数据结构的头部移除然后放到尾部从而达到轮询的目的//将给定的分区从map头部移除然后再加到尾部以达到轮询的目的//这里的LinkedHashMap对于插入元素是有顺序的加入插入顺序是abcde先读取了a// 为了保证公平性会将a从集合中先移除然后放到尾部那么下次就从b开始读public void updateAndMoveToEnd(TopicPartition topicPartition, S state) { map.remove(topicPartition); map.put(topicPartition, state); updateSize();}c遍历并发送FETCH请求给Leader副本然后处理ResponsefetchRequestOpt.foreach { fetchRequest processFetchRequest(fetchStates, fetchRequest)}private def processFetchRequest(fetchStates: Map[TopicPartition, PartitionFetchState], fetchRequest: FetchRequest.Builder): Unit { //定义出错分区的集合 val partitionsWithError mutable.Set[TopicPartition]() //定义接收响应数据的集合 var responseData: Seq[(TopicPartition, FetchData)] Seq.empty try { trace(sSending fetch request $fetchRequest) //给Leader发送FETCH请求获取响应数据 responseData fetchFromLeader(fetchRequest) } catch { case t: Throwable if (isRunning) { warn(sError in response for fetch request $fetchRequest, t) inLock(partitionMapLock) { partitionsWithError partitionStates.partitionSet.asScala partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS) } } } //更新请求发送速率指标 fetcherStats.requestRate.mark() //如果接收到了响应 if (responseData.nonEmpty) { inLock(partitionMapLock) { //遍历响应结果中的分区和分区对应的数据 responseData.foreach { case (topicPartition, partitionData) Option(partitionStates.stateValue(topicPartition)).foreach { currentFetchState //获取分区对应的拉取状态 val fetchState fetchStates(topicPartition) // 处理Response的条件 // 1. 获取的消息集合的起始偏移量和之前已保存的下一条待写入偏移量相等 // 2. 当前分区处于可获取状态 if (fetchState.fetchOffset currentFetchState.fetchOffset currentFetchState.isReadyForFetch) { //获取请求中携带的Follower副本保存的 leader epoch 值 val requestEpoch if (fetchState.currentLeaderEpoch 0) Some(fetchState.currentLeaderEpoch) else None partitionData.error match { // 如果没有错误 case Errors.NONE try { // 交由子类完成Response的处理 val logAppendInfoOpt processPartitionData(topicPartition, currentFetchState.fetchOffset, partitionData) logAppendInfoOpt.foreach { logAppendInfo val validBytes logAppendInfo.validBytes val nextOffset if (validBytes 0) logAppendInfo.lastOffset 1 else currentFetchState.fetchOffset fetcherLagStats.getAndMaybePut(topicPartition).lag Math.max(0L, partitionData.highWatermark - nextOffset) if (validBytes 0 partitionStates.contains(topicPartition)) { val newFetchState PartitionFetchState(nextOffset, fetchState.currentLeaderEpoch, state Fetching) // 将该分区放置在有序Map读取顺序的末尾保证公平性 partitionStates.updateAndMoveToEnd(topicPartition, newFetchState) fetcherStats.byteRate.mark(validBytes) } } } catch { case ime: CorruptRecordException error(sFound invalid messages during fetch for partition $topicPartition soffset ${currentFetchState.fetchOffset}, ime) partitionsWithError topicPartition case e: KafkaStorageException error(sError while processing data for partition $topicPartition sat offset ${currentFetchState.fetchOffset}, e) markPartitionFailed(topicPartition) case t: Throwable error(sUnexpected error occurred while processing data for partition $topicPartition sat offset ${currentFetchState.fetchOffset}, t) markPartitionFailed(topicPartition) } // 如果读取位移值越界通常是因为Leader发生变更 case Errors.OFFSET_OUT_OF_RANGE //调整越界主要办法是做截断 if (handleOutOfRangeError(topicPartition, currentFetchState, requestEpoch)) //如果依然不能成功将该分区添加到出错分区集合 partitionsWithError topicPartition //如果Follower本地保存的Leader Epoch值比Leader所在Broker上的Epoch值要新 case Errors.UNKNOWN_LEADER_EPOCH debug(sRemote broker has a smaller leader epoch for partition $topicPartition than sthis replicas current leader epoch of ${fetchState.currentLeaderEpoch}.) // 加入到出错分区集合 partitionsWithError topicPartition // 如果Follower本地保存的Leader Epoch值比Leader所在Broker上的Epoch值要旧 case Errors.FENCED_LEADER_EPOCH //将该分区标记为失效从分区拉取状态集合中移除并加入到失效分区集合 if (onPartitionFenced(topicPartition, requestEpoch)) partitionsWithError topicPartition // 如果Leader发生变更 case Errors.NOT_LEADER_FOR_PARTITION debug(sRemote broker is not the leader for partition $topicPartition, which could indicate that the partition is being moved) // 加入到出错分区列表 partitionsWithError topicPartition case _ error(sError for partition $topicPartition at offset ${currentFetchState.fetchOffset}, partitionData.error.exception) // 加入到出错分区集合 partitionsWithError topicPartition } } } } } } // 处理出错分区集合主要就是将该分区放到map数据结构的末尾 if (partitionsWithError.nonEmpty) { handlePartitionsWithErrors(partitionsWithError, processFetchRequest) } }Leader 副本如何处理拉取数据的请求前面提到过发送给服务端的各种请求都是由KafkaApis类处理的处理FETCH请求的方法是handleFetchRequest()内部调用了ReplicaManager.fetchMessages() 方法def handleFetchRequest(request: RequestChannel.Request) { ... //TODO 这里是处理Follower Replica 拉取消息请求的具体方法 replicaManager.fetchMessages( fetchRequest.maxWait.toLong, fetchRequest.replicaId, fetchRequest.minBytes, fetchRequest.maxBytes, versionId 2, interesting, replicationQuota(fetchRequest), processResponseCallback, fetchRequest.isolationLevel) ...}fetchMessages() 方法def fetchMessages(timeout: Long, replicaId: Int, fetchMinBytes: Int, fetchMaxBytes: Int, hardMaxBytesLimit: Boolean, fetchInfos: Seq[(TopicPartition, PartitionData)], quota: ReplicaQuota UnboundedQuota, responseCallback: Seq[(TopicPartition, FetchPartitionData)] Unit, isolationLevel: IsolationLevel) { val isFromFollower Request.isValidBrokerId(replicaId) val fetchOnlyFromLeader replicaId ! Request.DebuggingConsumerId replicaId ! Request.FutureLocalReplicaId val fetchIsolation if (isFromFollower || replicaId Request.FutureLocalReplicaId) FetchLogEnd else if (isolationLevel IsolationLevel.READ_COMMITTED) FetchTxnCommitted else FetchHighWatermark //从本地磁盘读取数据 def readFromLog(): Seq[(TopicPartition, LogReadResult)] { val result readFromLocalLog( replicaId replicaId, fetchOnlyFromLeader fetchOnlyFromLeader, fetchIsolation fetchIsolation, fetchMaxBytes fetchMaxBytes, hardMaxBytesLimit hardMaxBytesLimit, readPartitionInfo fetchInfos, quota quota) if (isFromFollower) updateFollowerLogReadResults(replicaId, result) else result } //获取读取结果 val logReadResults readFromLog() var bytesReadable: Long 0 var errorReadingData false val logReadResultMap new mutable.HashMap[TopicPartition, LogReadResult] logReadResults.foreach { case (topicPartition, logReadResult) if (logReadResult.error ! Errors.NONE) errorReadingData true bytesReadable bytesReadable logReadResult.info.records.sizeInBytes logReadResultMap.put(topicPartition, logReadResult) } if (timeout 0 || fetchInfos.isEmpty || bytesReadable fetchMinBytes || errorReadingData) { val fetchPartitionData logReadResults.map { case (tp, result) tp - FetchPartitionData(result.error, result.highWatermark, result.leaderLogStartOffset, result.info.records, result.lastStableOffset, result.info.abortedTransactions) } responseCallback(fetchPartitionData) } else { val fetchPartitionStatus new mutable.ArrayBuffer[(TopicPartition, FetchPartitionStatus)] fetchInfos.foreach { case (topicPartition, partitionData) logReadResultMap.get(topicPartition).foreach(logReadResult { val logOffsetMetadata logReadResult.info.fetchOffsetMetadata fetchPartitionStatus (topicPartition - FetchPartitionStatus(logOffsetMetadata, partitionData)) }) } val fetchMetadata FetchMetadata(fetchMinBytes, fetchMaxBytes, hardMaxBytesLimit, fetchOnlyFromLeader, fetchIsolation, isFromFollower, replicaId, fetchPartitionStatus) val delayedFetch new DelayedFetch(timeout, fetchMetadata, this, quota, responseCallback) val delayedFetchKeys fetchPartitionStatus.map { case (tp, _) new TopicPartitionOperationKey(tp) } delayedFetchPurgatory.tryCompleteElseWatch(delayedFetch, delayedFetchKeys) } }该方法内部定义了一个readFromLog()方法其作用有两个    a. 调用readFromLocalLog() 读取 Leader 副本的本地日志    b. 调用 updateFollowerLogReadResults() 更新Leader副本的HW、Leader副本保存的对应Follower副本的LEO以及尝试调整ISR列表等readFromLocalLog() 方法和内部定义的 read() 方法如下用于从Leader副本的日志文件读取数据def readFromLocalLog(replicaId: Int, fetchOnlyFromLeader: Boolean, fetchIsolation: FetchIsolation, fetchMaxBytes: Int, hardMaxBytesLimit: Boolean, readPartitionInfo: Seq[(TopicPartition, PartitionData)], quota: ReplicaQuota): Seq[(TopicPartition, LogReadResult)] { def read(tp: TopicPartition, fetchInfo: PartitionData, limitBytes: Int, minOneMessage: Boolean): LogReadResult { //读取的起始偏移量 val offset fetchInfo.fetchOffset //读取的大小 val partitionFetchSize fetchInfo.maxBytes //follower Replica 的LogStartOffset val followerLogStartOffset fetchInfo.logStartOffset brokerTopicStats.topicStats(tp.topic).totalFetchRequestRate.mark() brokerTopicStats.allTopicsStats.totalFetchRequestRate.mark() val adjustedMaxBytes math.min(fetchInfo.maxBytes, limitBytes) try { trace(sFetching log segment for partition $tp, offset $offset, partition fetch size $partitionFetchSize, sremaining response limit $limitBytes (if (minOneMessage) s, ignoring response/partition size limits else )) val partition getPartitionOrException(tp, expectLeader fetchOnlyFromLeader) val fetchTimeMs time.milliseconds //读取数据获取读取结果里面包含了读取到的消息LEOHWLogStartOffset等信息 val readInfo partition.readRecords( //读取的起始偏移量 fetchOffset fetchInfo.fetchOffset, //Follower副本保存的Leader epoch currentLeaderEpoch fetchInfo.currentLeaderEpoch, maxBytes adjustedMaxBytes, fetchIsolation fetchIsolation, fetchOnlyFromLeader fetchOnlyFromLeader, minOneMessage minOneMessage) //获取读到的数据 val fetchDataInfo if (shouldLeaderThrottle(quota, tp, replicaId)) { //如果分区被限流了那么返回一个空集合 FetchDataInfo(readInfo.fetchedData.fetchOffsetMetadata, MemoryRecords.EMPTY) } else if (!hardMaxBytesLimit readInfo.fetchedData.firstEntryIncomplete) { //如果返回的消息集合不完整也返回一个空集合 FetchDataInfo(readInfo.fetchedData.fetchOffsetMetadata, MemoryRecords.EMPTY) } else { //正常返回 readInfo.fetchedData } //根据获取到的数据封装返回结果 LogReadResult(info fetchDataInfo, highWatermark readInfo.highWatermark,//Leader的HW leaderLogStartOffset readInfo.logStartOffset,//Leader的LogStartOffset leaderLogEndOffset readInfo.logEndOffset,//Leader的LEO followerLogStartOffset followerLogStartOffset,//Follower的LogStartOffset fetchTimeMs fetchTimeMs, readSize adjustedMaxBytes, lastStableOffset Some(readInfo.lastStableOffset), exception None//异常信息 ) } catch { case e (_: UnknownTopicOrPartitionException | _: NotLeaderForPartitionException | _: UnknownLeaderEpochException | _: FencedLeaderEpochException | _: ReplicaNotAvailableException | _: KafkaStorageException | _: OffsetOutOfRangeException) LogReadResult(info FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY), highWatermark -1L, leaderLogStartOffset -1L, leaderLogEndOffset -1L, followerLogStartOffset -1L, fetchTimeMs -1L, readSize 0, lastStableOffset None, exception Some(e)) case e: Throwable brokerTopicStats.topicStats(tp.topic).failedFetchRequestRate.mark() brokerTopicStats.allTopicsStats.failedFetchRequestRate.mark() val fetchSource Request.describeReplicaId(replicaId) error(sError processing fetch with max size $adjustedMaxBytes from $fetchSource son partition $tp: $fetchInfo, e) LogReadResult(info FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY), highWatermark -1L, leaderLogStartOffset -1L, leaderLogEndOffset -1L, followerLogStartOffset -1L, fetchTimeMs -1L, readSize 0, lastStableOffset None, exception Some(e)) } } //读取的最大字节 var limitBytes fetchMaxBytes //封装结果对象 val result new mutable.ArrayBuffer[(TopicPartition, LogReadResult)] //是否至少返回一条消息 var minOneMessage !hardMaxBytesLimit //遍历分区进行读取 readPartitionInfo.foreach { case (tp, fetchInfo) //获取读取的结果 val readResult read(tp, fetchInfo, limitBytes, minOneMessage) //获取每个分区读取的字节数 val recordBatchSize readResult.info.records.sizeInBytes if (recordBatchSize 0) minOneMessage false //更新还可以读取的字节数 limitBytes math.max(0, limitBytes - recordBatchSize) //将分区的读取结果保存到结果集合中 result (tp - readResult) } //返回结果集 result }其中read() 方法中通过调用Partition. readRecords() 方法就获取了 Leader 副本的高水位值//获取Leader Replica的高水位val initialHighWatermark localReplica.highWatermark.messageOffset从这里可以看出每个分区的读取结果中都包含了 Leader 副本的 LEO、HW、LogStartOffset以及 Follower 副本的LogStartOffset等信息。updateFollowerLogReadResults() 方法如下private def updateFollowerLogReadResults(replicaId: Int, readResults: Seq[(TopicPartition, LogReadResult)]): Seq[(TopicPartition, LogReadResult)] { debug(sRecording follower broker $replicaId log end offsets: $readResults) readResults.map { case (topicPartition, readResult) var updatedReadResult readResult nonOfflinePartition(topicPartition) match { //如果找到了对应的分区 case Some(partition) //根据副本id获取Partition对象中保存的副本对象 //Partition.allReplicasMap结构中保存了当前分区的所有副本对象。其中key是brokeridvalue是对应的Replica对象 partition.getReplica(replicaId) match { //如果获取到了Replica对象 case Some(replica) //TODO 更新leader保存的各个follower副本的LEO partition.updateReplicaLogReadResult(replica, readResult) case None warn(sLeader $localBrokerId failed to record follower $replicaIds position s${readResult.info.fetchOffsetMetadata.messageOffset} since the replica is not recognized to be sone of the assigned replicas ${partition.assignedReplicas.map(_.brokerId).mkString(,)} sfor partition $topicPartition. Empty records will be returned for this partition.) updatedReadResult readResult.withEmptyFetchInfo } //如果对应的分区没有被创建 case None warn(sWhile recording the replica LEO, the partition $topicPartition hasnt been created.) } topicPartition - updatedReadResult } }Partition.updateReplicaLogReadResult() 方法def updateReplicaLogReadResult(replica: Replica, logReadResult: LogReadResult): Boolean { val replicaId replica.brokerId val oldLeaderLW if (replicaManager.delayedDeleteRecordsPurgatory.delayed 0) lowWatermarkIfLeader else -1L //TODO 最终更新Leader副本保存的Follower副本的LEO的值 replica.updateLogReadResult(logReadResult) val newLeaderLW if (replicaManager.delayedDeleteRecordsPurgatory.delayed 0) lowWatermarkIfLeader else -1L val leaderLWIncremented newLeaderLW oldLeaderLW //TODO 尝试更新ISR列表在这个方法中会更新Leader副本对象的HW对象和分区对应的Log对象的HW值 val leaderHWIncremented maybeExpandIsr(replicaId, logReadResult) val result leaderLWIncremented || leaderHWIncremented if (result) tryCompleteDelayedRequests() debug(sRecorded replica $replicaId log end offset (LEO) position ${logReadResult.info.fetchOffsetMetadata.messageOffset}.) result }Replica.updateLogReadResult() 方法用于更新Partition保存的Follower副本的LEO(对应同步数据的第四步)def updateLogReadResult(logReadResult: LogReadResult) { if (logReadResult.info.fetchOffsetMetadata.messageOffset logReadResult.leaderLogEndOffset) _lastCaughtUpTimeMs math.max(_lastCaughtUpTimeMs, logReadResult.fetchTimeMs) else if (logReadResult.info.fetchOffsetMetadata.messageOffset lastFetchLeaderLogEndOffset) _lastCaughtUpTimeMs math.max(_lastCaughtUpTimeMs, lastFetchTimeMs) //更新Follower副本的日志起始偏移量即 _logStartOffset 变量 logStartOffset logReadResult.followerLogStartOffset //更新Follower副本的LEO元数据对象即 _logEndOffsetMetadata 变量 logEndOffsetMetadata logReadResult.info.fetchOffsetMetadata //最后一次拉取时Leader副本的LEO lastFetchLeaderLogEndOffset logReadResult.leaderLogEndOffset lastFetchTimeMs logReadResult.fetchTimeMs}maybeExpandIsr() 方法尝试更新ISR列表(对应同步数据的第五步)def maybeExpandIsr(replicaId: Int, logReadResult: LogReadResult): Boolean { inWriteLock(leaderIsrUpdateLock) { // 检查给定的副本对象是否需要添加到ISR列表 leaderReplicaIfLocal match { case Some(leaderReplica) //获取给定节点的Replica对象 val replica getReplica(replicaId).get //获取leader副本的HW值 val leaderHW leaderReplica.highWatermark //获取Follower副本的LEO val fetchOffset logReadResult.info.fetchOffsetMetadata.messageOffset //判断是否需要更新ISR列表的条件 //1.该节点不在ISR列表且replica.logEndOffsetMetadata.offsetDiff(leaderHW) //2.给定Follower副本的LEO大于等于leader副本的HW //3.给定的Follower副本属于该分区 //4.leader epoch对应的起始偏移量存在且小于Follower副本的LEO //满足这4个条件说明这个Follower副本已经和leader副本保持同步了把这个Follower副本加入到ISR列表中 if (!inSyncReplicas.contains(replica) assignedReplicas.map(_.brokerId).contains(replicaId) replica.logEndOffsetMetadata.offsetDiff(leaderHW) 0 leaderEpochStartOffsetOpt.exists(fetchOffset _)) { //将该副本加入集合 val newInSyncReplicas inSyncReplicas replica info(sExpanding ISR from ${inSyncReplicas.map(_.brokerId).mkString(,)} sto ${newInSyncReplicas.map(_.brokerId).mkString(,)}) // update ISR in ZK and cache //更新ISR列表 updateIsr(newInSyncReplicas) replicaManager.isrExpandRate.mark() } // check if the HW of the partition can now be incremented // since the replica may already be in the ISR and its LEO has just incremented //TODO 尝试更新leader副本的HW对象及分区对应的Log对象的HW值 maybeIncrementLeaderHW(leaderReplica, logReadResult.fetchTimeMs) case None false // nothing to do if no longer leader } } }maybeIncrementLeaderHW() 方法尝试更新 leader 副本的 HW 对象及分区对应的Log 对象的 HW 值(对应同步数据的第六步)private def maybeIncrementLeaderHW(leaderReplica: Replica, curTime: Long time.milliseconds): Boolean { val allLogEndOffsets assignedReplicas.filter { replica curTime - replica.lastCaughtUpTimeMs replicaLagTimeMaxMs || inSyncReplicas.contains(replica) }.map(_.logEndOffsetMetadata) //取ISR列表中副本的最小的LEO作为新的HW val newHighWatermark allLogEndOffsets.min(new LogOffsetMetadata.OffsetOrdering) //获取旧的HW val oldHighWatermark leaderReplica.highWatermark //如果新的HW值大于旧的HW值就更新 if (oldHighWatermark.messageOffset newHighWatermark.messageOffset || (oldHighWatermark.messageOffset newHighWatermark.messageOffset oldHighWatermark.onOlderSegment(newHighWatermark))) { //更新Replica的hightWatermark对象以及对应Log对象的高水位值 leaderReplica.highWatermark newHighWatermark debug(sHigh watermark updated to $newHighWatermark) true } else { def logEndOffsetString(r: Replica) sreplica ${r.brokerId}: ${r.logEndOffsetMetadata} debug(sSkipping update high watermark since new hw $newHighWatermark is not larger than old hw $oldHighWatermark. sAll current LEOs are ${assignedReplicas.map(logEndOffsetString)}) false } }在前面分析Log日志对象的主要操作时其中有一项是进行高水位操作的管理。在Log类中操作高水位值的方法只有一个onHighWatermarkIncrementeddef onHighWatermarkIncremented(highWatermark: Long): Unit { lock synchronized { //更新高水位值      replicaHighWatermark  Some(highWatermark)      producerStateManager.onHighWatermarkUpdated(highWatermark) updateFirstUnstableOffset() } }这个方法就是将 Log 中的 replicaHightWatermark 变量修改为给定的值。那么什么时候会修改呢查看调用该方法的地方Replica.highWatermark_def highWatermark_(newHighWatermark: LogOffsetMetadata) { //如果是本地副本 if (isLocal) { if (newHighWatermark.messageOffset 0) throw new IllegalArgumentException(High watermark offset should be non-negative) //高水位的元数据对象 highWatermarkMetadata newHighWatermark //更新Log对象保存的高水位值 log.foreach(_.onHighWatermarkIncremented(newHighWatermark.messageOffset)) trace(sSetting high watermark for replica $brokerId partition $topicPartition to [$newHighWatermark]) } else { throw new KafkaException(sShould not set high watermark on partition $topicPartitions non-local replica $brokerId) } }在尝试更新Leader副本的高水位时会进行highWatermark_的调用//更新Replica的hightWatermark对象以及对应Log对象的高水位值leaderReplica.highWatermark newHighWatermark最后会将多个分区的读取结果(包含Leader副本 HW)放到集合中然后在合适的时机返回给Follower副本所在的节点(对应同步数据第七步)def fetchMessages(){ ... logReadResults.foreach { case (topicPartition, logReadResult) //如果读取发生错误 if (logReadResult.error ! Errors.NONE) errorReadingData true bytesReadable bytesReadable logReadResult.info.records.sizeInBytes //将读取结果放入集合 logReadResultMap.put(topicPartition, logReadResult) }    ...}    上面所说的合适的时机分为 立即返回 和 延时返回当满足下面四个条件之一时便立即返回否则会进行延时处理拉取等待的时间到了拉取请求中没有拉取分区的信息已经拉取到了足够多的数据拉取过程中发生错误总结Leader副本写入数据Follower副本进行同步的过程分为9个步骤leader 副本将数据写入本地磁盘leader 副本更新 LEOfollower 副本发送同步数据请求携带自身的 LEOleader 副本更新本地保存的其它副本的 LEOleader 副本尝试更新 ISR 列表leader 副本更新 HWleader 副本给 follower 副本返回数据携带 leader 副本的 HW 值follower 副本接收响应并写入数据更新自身 LEOfollower 副本更新本地的 HW 值关于 HW 和 LEO 的保存对于HWLeader 副本和 Follower 副本只保存自身的对于LEOFollower 副本只保存自身的但是 Leader 副本除了保存自身的外还会保存所有 Follower 副本的 LEO 值无论是Leader副本所在节点还是Follower副本所在节点分区对应的Partition 对象都会保存所有的副本对象但是只有本地副本对象有对应的日志文件
http://www.zqtcl.cn/news/35523/

相关文章:

  • 代理网站备案收钱seo按天计费系统
  • 贵州营销型网站“设计网站”
  • 做下载类网站前景黄骅港属于哪个区
  • 专门做app的网站wordpress post-type
  • 哈尔滨企业网站seo网页怎么弄
  • 网站建设的域名注册怎么做网站卖机床
  • 深圳沙头网站建设网站首页设计素材
  • 网站开发 在html标记后出现乱码 查看源文件显示是问好用python做电商网站
  • 工程网站怎么做中国建设银行官网站e路通下载
  • 建立网站 知乎金塔网站建设
  • wordpress搭建教育网站58直聘招聘网
  • wordpress主题 微信绍兴seo外包公司
  • 免费网站备案象山经济开发区建设有限公司网站
  • 广西网站建设软件推广汕头企业模板建站
  • PHP做克隆网站虚拟货币交易网站建设
  • 公司网站开发报价建e网手机版
  • 网站做统计分析学网站建设需要学多久
  • 用dedecms做的网站 脚本是什么做淘宝代销哪个网站好
  • 在线做qq空间的网站吗做ps兼职的网站有哪些
  • 如何做个购物网站安监局网站建设
  • 备案网站应用服务本地打开WordPress慢
  • 苏州网站开发建设方案中国重庆网站建设
  • 如何在12366网站上做实名认证做网站优化要多少钱
  • 设计彩票网站开发宣传片拍摄制作报价明细
  • 如何建设淘宝网站wordpress 多重搜索
  • wordpress搭建子网站wordpress版本要求
  • 做的网站响应速度慢微信服务号绑定网站吗
  • 合肥seo网站推广外包网络服务提供商是指什么
  • 招标文件免费下载网站福建中国建设工程造价管理协会网站
  • 兰州网站建设lst0931网站换主推关键词会怎么样