Contents

I. Invoking StreamTask to perform the checkpoint
  1. Overall code flow of a checkpoint
    1.1. StreamTask.checkpointState()
    1.2. executeCheckpointing
    1.3. Wrapping operator state snapshots in OperatorSnapshotFutures
    1.4. Snapshotting operator state
    1.5. Persisting the state snapshot
II. How CheckpointCoordinator manages checkpoints
  1. Confirmation process after a checkpoint finishes
  2. Triggering and completing the checkpoint
  3. Notifying the TaskExecutor of CheckpointComplete
III. Takeaways from studying state management

The previous article covered CheckpointBarrier alignment. Once alignment completes, the notifyCheckpoint() method triggers the checkpoint on the StreamTask node.
I. Invoking StreamTask to perform the checkpoint
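As the code below shows, notifyCheckpoint() does the following:

1. Checks that toNotifyOnCheckpoint is not null.
2. Creates a CheckpointMetaData instance and a CheckpointMetrics instance. CheckpointMetaData holds the checkpoint's meta information; CheckpointMetrics records the checkpoint's monitoring metrics.
3. Triggers the checkpoint on the operators in the StreamTask.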
```java
protected void notifyCheckpoint(
        CheckpointBarrier checkpointBarrier,
        long bufferedBytes,
        long alignmentDurationNanos) throws Exception {
    if (toNotifyOnCheckpoint != null) {
        // Create the CheckpointMetaData object that holds the meta information
        CheckpointMetaData checkpointMetaData =
            new CheckpointMetaData(checkpointBarrier.getId(), checkpointBarrier.getTimestamp());
        // Create the CheckpointMetrics object that records monitoring metrics
        CheckpointMetrics checkpointMetrics = new CheckpointMetrics()
            .setBytesBufferedInAlignment(bufferedBytes)
            .setAlignmentDurationNanos(alignmentDurationNanos);
        // Trigger the checkpoint via toNotifyOnCheckpoint.triggerCheckpointOnBarrier()
        toNotifyOnCheckpoint.triggerCheckpointOnBarrier(
            checkpointMetaData,
            checkpointBarrier.getCheckpointOptions(),
            checkpointMetrics);
    }
}
```

Note that StreamTask is the only subclass that implements the checkpoint methods, so only a StreamTask can trigger the checkpoint in the current Task instance.

Next, let's look at how the checkpoint is actually executed.
1. Overall code flow of a checkpoint

A checkpoint is triggered in one of two ways: either the CheckpointCoordinator periodically triggers the checkpoint on the source nodes, or a downstream operator triggers its own checkpoint once it has aligned the CheckpointBarrier events.

Whichever way it is triggered, the call ends up in StreamTask.performCheckpoint(), which persists the state of the StreamTask instance.

StreamTask.performCheckpoint() first checks whether the current Task is running normally and then runs the checkpoint logic through actionExecutor. The actual steps are:

1. Preparation: every Operator in the OperatorChain performs its pre-barrier work.
2. The CheckpointBarrier event is sent to the downstream nodes.
3. Operator state is snapshotted: checkpointState() snapshots the state of all operators in the StreamTask's OperatorChain. This step is asynchronous and non-blocking, so it does not interfere with regular data processing. When it finishes, true is returned to CheckpointInputGate.
4. Handling a task that is no longer running: if isRunning is false, the Task is not in the running state, and a CancelCheckpointMarker has to be sent for the operators in the OperatorChain. This is done by broadcasting the event to downstream operators with recordWriter.broadcastEvent(message). A downstream operator that receives the CancelCheckpointMarker cancels its checkpoint immediately, but only if the operators in its OperatorChain have not yet finished that checkpoint.

```java
private boolean performCheckpoint(
        CheckpointMetaData checkpointMetaData,
        CheckpointOptions checkpointOptions,
        CheckpointMetrics checkpointMetrics,
        boolean advanceToEndOfTime) throws Exception {
    LOG.debug("Starting checkpoint ({}) {} on task {}",
        checkpointMetaData.getCheckpointId(), checkpointOptions.getCheckpointType(), getName());
    final long checkpointId = checkpointMetaData.getCheckpointId();
    if (isRunning) {
        // Run the checkpoint logic through actionExecutor
        actionExecutor.runThrowing(() -> {
            if (checkpointOptions.getCheckpointType().isSynchronous()) {
                setSynchronousSavepointId(checkpointId);
                if (advanceToEndOfTime) {
                    advanceToEndOfEventTime();
                }
            }
            // Preparation work before the checkpoint
            operatorChain.prepareSnapshotPreBarrier(checkpointId);
            // Send the checkpoint barrier to the downstream streams
            operatorChain.broadcastCheckpointBarrier(
                checkpointId,
                checkpointMetaData.getTimestamp(),
                checkpointOptions);
            // Snapshot the operator state. This step is asynchronous and does not
            // affect regular data processing in the streaming topology
            checkpointState(checkpointMetaData, checkpointOptions, checkpointMetrics);
        });
        return true;
    } else {
        // The Task is in some other state: broadcast a CancelCheckpointMarker downstream
        actionExecutor.runThrowing(() -> {
            final CancelCheckpointMarker message =
                new CancelCheckpointMarker(checkpointMetaData.getCheckpointId());
            recordWriter.broadcastEvent(message);
        });
        return false;
    }
}
```

1.1. StreamTask.checkpointState()
Next, let's look at the implementation of StreamTask.checkpointState(), shown in the code below.

1. Create the CheckpointStreamFactory instance, which is used to create the CheckpointStateOutputStream. There are two main stream implementations:
   - FsCheckpointStateOutputStream: output to a file system.
   - MemoryCheckpointOutputStream: output to an in-memory stream.
2. Create the CheckpointingOperation instance. CheckpointingOperation wraps the concrete checkpoint procedure together with the environment it needs: checkpointMetaData, checkpointOptions, storage, and checkpointMetrics.
3. Call CheckpointingOperation.executeCheckpointing() to execute the checkpoint.

```java
private void checkpointState(
        CheckpointMetaData checkpointMetaData,
        CheckpointOptions checkpointOptions,
        CheckpointMetrics checkpointMetrics) throws Exception {
    // Create the CheckpointStreamFactory instance
    CheckpointStreamFactory storage = checkpointStorage.resolveCheckpointStorageLocation(
        checkpointMetaData.getCheckpointId(),
        checkpointOptions.getTargetLocation());
    // Create the CheckpointingOperation instance
    CheckpointingOperation checkpointingOperation = new CheckpointingOperation(
        this,
        checkpointMetaData,
        checkpointOptions,
        storage,
        checkpointMetrics);
    // Execute the checkpoint
    checkpointingOperation.executeCheckpointing();
}
```

1.2. executeCheckpointing
As the code shows, CheckpointingOperation.executeCheckpointing() does the following:

1. Iterates over all StreamOperators and calls checkpointStreamOperator() for each one, creating an OperatorSnapshotFutures object per operator. The results are stored in the operatorSnapshotsInProgress map, where the key is the OperatorID and the value is the OperatorSnapshotFutures object for that operator's state snapshot.
2. Creates the AsyncCheckpointRunnable, which holds the OperatorSnapshotFutures collection built in the previous step.
3. Submits asyncCheckpointRunnable to the StreamTask.asyncOperationsThreadPool thread pool, which runs the asynchronous snapshot operations collected in operatorSnapshotsInProgress.

```java
public void executeCheckpointing() throws Exception {
    // Create the OperatorSnapshotFutures that perform the snapshot for each operator
    for (StreamOperator<?> op : allOperators) {
        checkpointStreamOperator(op);
    }
    // Some code omitted here
    startAsyncPartNano = System.nanoTime();
    checkpointMetrics.setSyncDurationMillis(
        (startAsyncPartNano - startSyncPartNano) / 1_000_000);
    AsyncCheckpointRunnable asyncCheckpointRunnable = new AsyncCheckpointRunnable(
        owner,
        operatorSnapshotsInProgress,
        checkpointMetaData,
        checkpointMetrics,
        startAsyncPartNano);
    // Register the runnable as a Closeable
    owner.cancelables.registerCloseable(asyncCheckpointRunnable);
    // Run asyncCheckpointRunnable
    owner.asyncOperationsThreadPool.execute(asyncCheckpointRunnable);
}
```

1.3. Wrapping operator state snapshots in OperatorSnapshotFutures
As the code below shows, AbstractStreamOperator.snapshotState() wraps the current operator's state snapshot operations in an OperatorSnapshotFutures object. All OperatorSnapshotFutures are later triggered asynchronously on the asyncOperationsThreadPool thread pool. The main steps are:

1. Create the OperatorSnapshotFutures object that wraps the snapshot operations of the current operator.
2. Create the snapshotContext object, which holds the context needed during the snapshot, and call snapshotState(snapshotContext) to perform the snapshot. This method is implemented by StreamOperator subclasses. AbstractUdfStreamOperator, for example, calls StreamingFunctionUtils.snapshotFunctionState(context, getOperatorStateBackend(), userFunction) to snapshot the state held by the function.
3. Set KeyedStateRawFuture and OperatorStateRawFuture on snapshotInProgress. These handle the snapshot of raw state.
4. If operatorStateBackend is not null, assign the result of operatorStateBackend.snapshot() to OperatorStateManagedFuture, registering it in snapshotInProgress for later execution.
5. If keyedStateBackend is not null, assign the result of keyedStateBackend.snapshot() to KeyedStateManagedFuture, registering it in snapshotInProgress for later execution.
6. Return snapshotInProgress, which now wraps every snapshot operation the current operator has to perform.

```java
public final OperatorSnapshotFutures snapshotState(
        long checkpointId,
        long timestamp,
        CheckpointOptions checkpointOptions,
        CheckpointStreamFactory factory) throws Exception {
    // Get the KeyGroupRange
    KeyGroupRange keyGroupRange = null != keyedStateBackend ?
        keyedStateBackend.getKeyGroupRange() : KeyGroupRange.EMPTY_KEY_GROUP_RANGE;
    // Create the OperatorSnapshotFutures object
    OperatorSnapshotFutures snapshotInProgress = new OperatorSnapshotFutures();
    // Create the snapshotContext object
    StateSnapshotContextSynchronousImpl snapshotContext = new StateSnapshotContextSynchronousImpl(
        checkpointId,
        timestamp,
        factory,
        keyGroupRange,
        getContainingTask().getCancelables());
    try {
        snapshotState(snapshotContext);
        // Set KeyedStateRawFuture and OperatorStateRawFuture
        snapshotInProgress.setKeyedStateRawFuture(snapshotContext.getKeyedStateStreamFuture());
        snapshotInProgress.setOperatorStateRawFuture(snapshotContext.getOperatorStateStreamFuture());
        // If operatorStateBackend is not null, set OperatorStateManagedFuture
        if (null != operatorStateBackend) {
            snapshotInProgress.setOperatorStateManagedFuture(
                operatorStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
        }
        // If keyedStateBackend is not null, set KeyedStateManagedFuture
        if (null != keyedStateBackend) {
            snapshotInProgress.setKeyedStateManagedFuture(
                keyedStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
        }
    } catch (Exception snapshotException) {
        // Some code omitted here
    }
    return snapshotInProgress;
}
```
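The snapshotState() code shows that raw state and managed state obtain their RunnableFuture objects in different ways:

- Raw state (RawState) uses the raw futures obtained from snapshotContext.
- Managed state (ManagedState) is snapshotted through operatorStateBackend and keyedStateBackend, which write the state data to memory or to an external file system depending on the StateBackend implementation.

1.4. Snapshotting operator state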
As we have seen, all state snapshot operations are wrapped in OperatorSnapshotFutures objects and are ultimately executed by the AsyncCheckpointRunnable thread.
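The split between "wrap the snapshot work in a future now" and "run it later on another thread" can be illustrated with plain java.util.concurrent classes. The following is only a minimal sketch under stated assumptions: LazySnapshotSketch, prepare(), and finalizeAll() are hypothetical names, the "state" is just a string, and runIfNotDoneAndGet() is a simplified re-implementation of the idea, not Flink's FutureUtils code.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;

/** Toy model: snapshot work is wrapped in futures first and executed later. */
class LazySnapshotSketch {

    /** Simplified "run the future if nobody has run it yet, then fetch the result". */
    static <T> T runIfNotDoneAndGet(RunnableFuture<T> future) {
        if (future == null) {
            return null;
        }
        if (!future.isDone()) {
            future.run(); // executes the wrapped work on the calling thread
        }
        try {
            return future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    /** "Synchronous" phase: only registers the work per operator, nothing runs yet. */
    static Map<String, RunnableFuture<String>> prepare(String... operatorIds) {
        Map<String, RunnableFuture<String>> inProgress = new LinkedHashMap<>();
        for (String id : operatorIds) {
            inProgress.put(id, new FutureTask<String>(() -> "state-of-" + id));
        }
        return inProgress;
    }

    /** "Asynchronous" phase: runs every registered future and collects the results. */
    static Map<String, String> finalizeAll(Map<String, RunnableFuture<String>> inProgress) {
        Map<String, String> result = new LinkedHashMap<>();
        inProgress.forEach((id, future) -> result.put(id, runIfNotDoneAndGet(future)));
        return result;
    }

    public static void main(String[] args) {
        Map<String, RunnableFuture<String>> inProgress = prepare("source", "map");
        // Nothing has been executed during prepare()
        System.out.println(inProgress.get("source").isDone());
        // The futures run only when they are finalized
        System.out.println(finalizeAll(inProgress));
    }
}
```

In this toy version finalizeAll() runs on the caller's thread; in the article's flow the equivalent work happens inside the runnable submitted to asyncOperationsThreadPool.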
Now let's look at the definition of AsyncCheckpointRunnable. As the code shows, AsyncCheckpointRunnable.run() works as follows:

1. Call FileSystemSafetyNet.initializeSafetyNetForThread() to initialize the file system safety net for the current thread. The safety net tracks the streams the thread opens so that they can be closed when the thread finishes.
2. Create two TaskStateSnapshot instances, jobManagerTaskOperatorSubtaskStates and localTaskOperatorSubtaskStates. The first stores the checkpoint data that is sent to the JobManager; the second stores the state kept locally on the TaskExecutor.
3. Execute all state snapshot futures: iterate over operatorSnapshotsInProgress, get each OperatorSnapshotFutures, and create an OperatorSnapshotFinalizer for it. OperatorSnapshotFinalizer calls FutureUtils.runIfNotDoneAndGet() to run the KeyedState and OperatorState snapshot operations.
4. Get the JobManagerOwnedState and the TaskLocalState from finalizedSnapshots and store them in jobManagerTaskOperatorSubtaskStates and localTaskOperatorSubtaskStates respectively.
5. Record the checkpoint's execution time in checkpointMetrics, from where it is reported to the metrics system.
6. If asyncCheckpointState can be switched from RUNNING to COMPLETED, call reportCompletedSnapshotStates() to report the checkpoint result to the JobManager.
7. If an exception occurs, call handleExecutionException() to handle it.

```java
public void run() {
    FileSystemSafetyNet.initializeSafetyNetForThread();
    try {
        // Create the TaskStateSnapshot instances
        TaskStateSnapshot jobManagerTaskOperatorSubtaskStates =
            new TaskStateSnapshot(operatorSnapshotsInProgress.size());
        TaskStateSnapshot localTaskOperatorSubtaskStates =
            new TaskStateSnapshot(operatorSnapshotsInProgress.size());
        for (Map.Entry<OperatorID, OperatorSnapshotFutures> entry : operatorSnapshotsInProgress.entrySet()) {
            OperatorID operatorID = entry.getKey();
            OperatorSnapshotFutures snapshotInProgress = entry.getValue();
            // Create the OperatorSnapshotFinalizer, which runs the snapshot futures
            OperatorSnapshotFinalizer finalizedSnapshots =
                new OperatorSnapshotFinalizer(snapshotInProgress);
            jobManagerTaskOperatorSubtaskStates.putSubtaskStateByOperatorID(
                operatorID,
                finalizedSnapshots.getJobManagerOwnedState());
            localTaskOperatorSubtaskStates.putSubtaskStateByOperatorID(
                operatorID,
                finalizedSnapshots.getTaskLocalState());
        }
        final long asyncEndNanos = System.nanoTime();
        final long asyncDurationMillis = (asyncEndNanos - asyncStartNanos) / 1_000_000L;
        checkpointMetrics.setAsyncDurationMillis(asyncDurationMillis);
        if (asyncCheckpointState.compareAndSet(
                CheckpointingOperation.AsyncCheckpointState.RUNNING,
                CheckpointingOperation.AsyncCheckpointState.COMPLETED)) {
            reportCompletedSnapshotStates(
                jobManagerTaskOperatorSubtaskStates,
                localTaskOperatorSubtaskStates,
                asyncDurationMillis);
        } else {
            LOG.debug("{} - asynchronous part of checkpoint {} could not be completed because it was closed before.",
                owner.getName(),
                checkpointMetaData.getCheckpointId());
        }
    } catch (Exception e) {
        handleExecutionException(e);
    } finally {
        owner.cancelables.unregisterCloseable(this);
        FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();
    }
}
```

This essentially completes the logic for snapshotting operator state. The managed state of an operator is handled mainly by KeyedStateBackend and OperatorStateBackend.
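One detail of AsyncCheckpointRunnable.run() is worth isolating: the result is reported only if compareAndSet(RUNNING, COMPLETED) succeeds, so a runnable that was closed in the meantime reports nothing. The following is a minimal sketch of such a guard, not Flink's implementation. The class AsyncStateGuardSketch, its method names, and the DISCARDED state used for the close path are assumptions made for illustration.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Toy model of a completion guard built on compareAndSet. */
class AsyncStateGuardSketch {

    // RUNNING and COMPLETED appear in the article's code; DISCARDED is assumed here
    enum State { RUNNING, DISCARDED, COMPLETED }

    private final AtomicReference<State> state = new AtomicReference<>(State.RUNNING);
    private volatile boolean reported;

    /** End of the async part: report only if nobody closed the operation first. */
    boolean tryComplete() {
        if (state.compareAndSet(State.RUNNING, State.COMPLETED)) {
            reported = true; // stands in for reporting the snapshot to the coordinator
            return true;
        }
        return false;
    }

    /** Cancellation path: wins only if completion has not happened yet. */
    boolean close() {
        return state.compareAndSet(State.RUNNING, State.DISCARDED);
    }

    boolean isReported() {
        return reported;
    }

    public static void main(String[] args) {
        AsyncStateGuardSketch finished = new AsyncStateGuardSketch();
        System.out.println(finished.tryComplete()); // completion wins the race
        System.out.println(finished.close());       // closing afterwards has no effect

        AsyncStateGuardSketch cancelled = new AsyncStateGuardSketch();
        System.out.println(cancelled.close());       // close wins the race
        System.out.println(cancelled.tryComplete()); // so nothing is reported
    }
}
```

Because both transitions start from RUNNING, exactly one of them can succeed, whichever thread gets there first.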
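Both KeyedStateBackend and OperatorStateBackend implement the SnapshotStrategy interface, which provides the method for snapshotting state. Depending on the type of storage backend, there are two main SnapshotStrategy implementations: HeapSnapshotStrategy and RocksDBSnapshotStrategy.

1.5. Persisting the state snapshot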
Taking HeapSnapshotStrategy as the example, this section walks through how a StateBackend persists a state snapshot. As the code shows, HeapSnapshotStrategy.processSnapshotMetaInfoForAllStates() defines the processing logic for both KeyedState and OperatorState:

1. Iterate over every StateSnapshotRestore.
2. Call StateSnapshotRestore.stateSnapshot(), which creates a StateSnapshot object.
3. Add the meta information of the StateSnapshot to metaInfoSnapshots and the StateSnapshot itself to cowStateStableSnapshots. This completes the snapshot of heap-backed KvState.

```java
private void processSnapshotMetaInfoForAllStates(
        List<StateMetaInfoSnapshot> metaInfoSnapshots,
        Map<StateUID, StateSnapshot> cowStateStableSnapshots,
        Map<StateUID, Integer> stateNamesToId,
        Map<String, ? extends StateSnapshotRestore> registeredStates,
        StateMetaInfoSnapshot.BackendStateType stateType) {
    for (Map.Entry<String, ? extends StateSnapshotRestore> kvState : registeredStates.entrySet()) {
        final StateUID stateUid = StateUID.of(kvState.getKey(), stateType);
        stateNamesToId.put(stateUid, stateNamesToId.size());
        StateSnapshotRestore state = kvState.getValue();
        if (null != state) {
            // Create the snapshot and register it together with its meta information
            final StateSnapshot stateSnapshot = state.stateSnapshot();
            metaInfoSnapshots.add(stateSnapshot.getMetaInfoSnapshot());
            cowStateStableSnapshots.put(stateUid, stateSnapshot);
        }
    }
}
```

II. How CheckpointCoordinator manages checkpoints
1. Confirmation process after a checkpoint finishes
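Once all operators in a StreamTask have finished snapshotting their state, the Task instance immediately sends the TaskStateSnapshot message to the CheckpointCoordinator on the management node, where the remaining work takes place. As shown in the figure, the confirmation process after a checkpoint finishes is as follows.

1. Call StreamTask.reportCompletedSnapshotStates(). When all operators in the StreamTask have completed their snapshots, StreamTask.reportCompletedSnapshotStates() sends the TaskStateSnapshot and the other Ack information to the TaskStateManager. TaskStateManager wraps the CheckpointCoordinatorGateway, so it can talk to the CheckpointCoordinator directly over RPC.
2. Message passing.
   - To the CheckpointCoordinatorGateway: TaskStateManager passes the acknowledgedTaskStateSnapshot message through CheckpointResponder.acknowledgeCheckpoint() to the implementer of the CheckpointCoordinatorGateway interface, which is in fact the JobMaster RPC service.
   - To the CheckpointCoordinator: when the JobMaster receives the Ack message from RpcCheckpointResponder, it calls SchedulerNG.acknowledgeCheckpoint() to hand the message to the scheduler. The scheduler wraps the Ack in an AcknowledgeCheckpoint and passes it to the CheckpointCoordinator for further processing.
3. Manage the PendingCheckpoint. When the CheckpointCoordinator receives an AcknowledgeCheckpoint, it looks up the matching PendingCheckpoint in the pendingCheckpoints collection and checks whether every Task instance in the notYetAcknowledgedTasks collection has sent its Ack. If notYetAcknowledgedTasks is empty, it calls completePendingCheckpoint() to complete the PendingCheckpoint and removes it from pendingCheckpoints.
4. Add the CompletedCheckpoint. The PendingCheckpoint is then converted into a CompletedCheckpoint, which the CheckpointCoordinator adds to completedCheckpointStore.
5. Announce that the checkpoint has finished. The CheckpointCoordinator iterates over the ExecutionVertex nodes in tasksToCommitTo, gets the Execution object of each, and uses it to send a CheckpointComplete message to the TaskManagerGateway, telling all Task instances that this checkpoint is done.
6. Propagate the notification. When a TaskExecutor receives the CheckpointComplete message, it looks up the Task instance in TaskSlotTable and forwards the message to it. Every component or operator that implements CheckpointListener receives the completion message and carries out its own follow-up work.

2. Triggering and completing the checkpoint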
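Before reading the coordinator code, it may help to see the acknowledgement bookkeeping from the confirmation process in miniature: a pending checkpoint remembers which tasks have not acknowledged yet and is promoted to "completed" once that set is empty. This is a toy model under stated assumptions. AckTrackingSketch, Pending, trigger(), and receiveAcknowledge() are hypothetical names, tasks are identified by plain strings, and no state handles, RPC, or failure handling are modeled.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Toy model of acknowledgement bookkeeping; not Flink's actual classes. */
class AckTrackingSketch {

    /** Hypothetical stand-in for a pending checkpoint. */
    static final class Pending {
        final long checkpointId;
        final Set<String> notYetAcknowledgedTasks;

        Pending(long checkpointId, Set<String> tasksToWaitFor) {
            this.checkpointId = checkpointId;
            this.notYetAcknowledgedTasks = new HashSet<>(tasksToWaitFor);
        }
    }

    private final Map<Long, Pending> pendingCheckpoints = new HashMap<>();
    private final List<Long> completedCheckpointIds = new ArrayList<>();

    /** Registers a new pending checkpoint that waits for the given tasks. */
    void trigger(long checkpointId, Set<String> tasksToWaitFor) {
        pendingCheckpoints.put(checkpointId, new Pending(checkpointId, tasksToWaitFor));
    }

    /** Returns true if this acknowledgement completed the checkpoint. */
    boolean receiveAcknowledge(long checkpointId, String taskId) {
        Pending pending = pendingCheckpoints.get(checkpointId);
        if (pending == null) {
            return false; // unknown or already completed checkpoint
        }
        pending.notYetAcknowledgedTasks.remove(taskId);
        if (!pending.notYetAcknowledgedTasks.isEmpty()) {
            return false; // still waiting for other tasks
        }
        // All tasks acknowledged: move the checkpoint from pending to completed
        pendingCheckpoints.remove(checkpointId);
        completedCheckpointIds.add(checkpointId);
        return true;
    }

    List<Long> completed() {
        return completedCheckpointIds;
    }

    int pendingCount() {
        return pendingCheckpoints.size();
    }

    public static void main(String[] args) {
        AckTrackingSketch coordinator = new AckTrackingSketch();
        coordinator.trigger(1L, new HashSet<>(Arrays.asList("source-0", "sink-0")));
        System.out.println(coordinator.receiveAcknowledge(1L, "source-0"));
        System.out.println(coordinator.receiveAcknowledge(1L, "sink-0"));
        System.out.println(coordinator.completed());
    }
}
```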
After the CheckpointCoordinator has received the Ack messages from the Task instances (meaning their snapshots are done), it triggers the completion of the checkpoint. The code below is CheckpointCoordinator.completePendingCheckpoint(), which in turn calls PendingCheckpoint.finalizeCheckpoint(). It works as follows:

1. Register the operatorStates with the sharedStateRegistry.
2. Finalize the checkpoint held by pendingCheckpoint and produce a CompletedCheckpoint.
3. Add the completedCheckpoint to completedCheckpointStore.
4. Remove the PendingCheckpoint for checkpointId from pendingCheckpoints and trigger the checkpoint requests waiting in the queue.
5. Send a CheckpointComplete message to all ExecutionVertex nodes to tell the Task instances that this checkpoint has completed.

```java
private void completePendingCheckpoint(PendingCheckpoint pendingCheckpoint) throws CheckpointException {
    final long checkpointId = pendingCheckpoint.getCheckpointId();
    final CompletedCheckpoint completedCheckpoint;
    // First register the operatorStates with the sharedStateRegistry
    Map<OperatorID, OperatorState> operatorStates = pendingCheckpoint.getOperatorStates();
    sharedStateRegistry.registerAll(operatorStates.values());
    // Finalize the pending checkpoint and produce a CompletedCheckpoint
    try {
        try {
            completedCheckpoint = pendingCheckpoint.finalizeCheckpoint();
            failureManager.handleCheckpointSuccess(pendingCheckpoint.getCheckpointId());
        } catch (Exception e1) {
            // On failure, abort and throw a CheckpointException
            if (!pendingCheckpoint.isDiscarded()) {
                failPendingCheckpoint(
                    pendingCheckpoint, CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE, e1);
            }
            throw new CheckpointException(
                "Could not finalize the pending checkpoint " + checkpointId + '.',
                CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE, e1);
        }
        // After finalization the PendingCheckpoint must be discarded
        Preconditions.checkState(pendingCheckpoint.isDiscarded() && completedCheckpoint != null);
        // Add the completedCheckpoint to completedCheckpointStore
        try {
            completedCheckpointStore.addCheckpoint(completedCheckpoint);
        } catch (Exception exception) {
            // Storing the completed checkpoint failed: clean it up
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        completedCheckpoint.discardOnFailedStoring();
                    } catch (Throwable t) {
                        LOG.warn("Could not properly discard completed checkpoint {}.",
                            completedCheckpoint.getCheckpointID(), t);
                    }
                }
            });
            throw new CheckpointException(
                "Could not complete the pending checkpoint " + checkpointId + '.',
                CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE, exception);
        }
    } finally {
        // Finally remove the PendingCheckpoint for checkpointId from pendingCheckpoints
        pendingCheckpoints.remove(checkpointId);
        // Trigger the checkpoint requests waiting in the queue
        triggerQueuedRequests();
    }
    // Remember the checkpointId
    rememberRecentCheckpointId(checkpointId);
    // Drop the checkpoints subsumed by this one
    dropSubsumedCheckpoints(checkpointId);
    // Record the completion time, used to enforce the minimum pause between checkpoints
    lastCheckpointCompletionRelativeTime = clock.relativeTimeMillis();
    LOG.info("Completed checkpoint {} for job {} ({} bytes in {} ms).", checkpointId, job,
        completedCheckpoint.getStateSize(), completedCheckpoint.getDuration());
    // Notify all ExecutionVertex nodes that the checkpoint has completed
    final long timestamp = completedCheckpoint.getTimestamp();
    for (ExecutionVertex ev : tasksToCommitTo) {
        Execution ee = ev.getCurrentExecutionAttempt();
        if (ee != null) {
            ee.notifyCheckpointComplete(checkpointId, timestamp);
        }
    }
}
```

3. Notifying the TaskExecutor of CheckpointComplete
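When the TaskExecutor receives the CheckpointComplete message from the CheckpointCoordinator, it calls Task.notifyCheckpointComplete() to pass the message to the target Task instance. The Task thread then forwards the CheckpointComplete message to the operators in the StreamTask.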
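The notification is not executed by whoever delivers it. It is queued for the task and run by the task's own processing loop. As a rough illustration of that hand-off only, here is a toy single-queue mailbox. MailboxSketch, submit(), and drain() are hypothetical names, and priorities are deliberately left out, so this is not a model of Flink's TaskMailbox or MailboxProcessor.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.FutureTask;

/** Toy mailbox: callers enqueue actions, the owner of the mailbox runs them later. */
class MailboxSketch {

    private final Queue<Runnable> mails = new ArrayDeque<>();

    /** Called by other components: enqueue the action instead of running it. */
    synchronized FutureTask<Void> submit(Runnable action) {
        FutureTask<Void> mail = new FutureTask<Void>(action, null);
        mails.add(mail);
        return mail;
    }

    private synchronized Runnable poll() {
        return mails.poll();
    }

    /** Called by the task's own loop: run everything queued so far, return the count. */
    int drain() {
        int processed = 0;
        Runnable mail;
        while ((mail = poll()) != null) {
            mail.run();
            processed++;
        }
        return processed;
    }

    public static void main(String[] args) {
        MailboxSketch mailbox = new MailboxSketch();
        List<Long> notifiedCheckpoints = new ArrayList<>();

        // The "RPC side" only enqueues the notification for checkpoint 42
        FutureTask<Void> mail = mailbox.submit(() -> notifiedCheckpoints.add(42L));
        System.out.println(mail.isDone());

        // The "task side" later drains its mailbox and executes the notification
        mailbox.drain();
        System.out.println(mail.isDone() + " " + notifiedCheckpoints);
    }
}
```

The returned future lets the submitter observe when the task has processed the notification without blocking the task's own loop.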
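The code of StreamTask.notifyCheckpointCompleteAsync() is shown below:

```java
/**
 * Wraps notifyCheckpointComplete() in a RunnableWithException and submits it to the
 * Mailbox, where it is fetched and executed with the highest priority in the
 * MailboxExecutor threading model.
 * notifyCheckpointComplete() ultimately runs in the MailboxProcessor.
 */
public Future<Void> notifyCheckpointCompleteAsync(long checkpointId) {
    return mailboxProcessor.getMailboxExecutor(TaskMailbox.MAX_PRIORITY).submit(
        () -> notifyCheckpointComplete(checkpointId),
        "checkpoint %d complete", checkpointId);
}
```

Next, let's look at StreamTask.notifyCheckpointComplete() in detail. As the code below shows, it does the following: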
1. Get the operators of the operator chain in the current Task and send each one the checkpoint-complete message.
2. Get the TaskStateManager and notify it that the checkpoint has completed. This mainly calls TaskLocalStateStore to clean up local checkpoint data that is no longer needed.
3. If the checkpoint is a synchronous savepoint, finish and terminate the current Task instance, then call resetSynchronousSavepointId() to reset syncSavepointId to empty.

```java
private void notifyCheckpointComplete(long checkpointId) {
    try {
        boolean success = actionExecutor.call(() -> {
            if (isRunning) {
                LOG.debug("Notification of complete checkpoint for task {}", getName());
                // Notify every Operator in the Task's operatorChain that the checkpoint succeeded
                for (StreamOperator<?> operator : operatorChain.getAllOperators()) {
                    if (operator != null) {
                        operator.notifyCheckpointComplete(checkpointId);
                    }
                }
                return true;
            } else {
                LOG.debug("Ignoring notification of complete checkpoint for not-running task {}", getName());
                return true;
            }
        });
        // Get the TaskStateManager and notify it that the checkpoint has completed
        getEnvironment().getTaskStateManager().notifyCheckpointComplete(checkpointId);
        // For a synchronous savepoint, finish the current Task right away
        if (success && isSynchronousSavepointId(checkpointId)) {
            finishTask();
            // Reset to "notify" the internal synchronous savepoint mailbox loop.
            resetSynchronousSavepointId();
        }
    } catch (Exception e) {
        handleException(new RuntimeException("Error while confirming checkpoint", e));
    }
}
```

After an operator receives the checkpoint-complete message, it performs whatever follow-up work it needs. By default, the base implementation class AbstractStreamOperator notifies keyedStateBackend so that it can carry out its own follow-up work.
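A common use of this notification is a "stage at snapshot time, commit at completion time" pattern: a component remembers what it captured for each checkpoint and makes it externally visible only once that checkpoint is confirmed. The following is a minimal sketch of the pattern under stated assumptions. OffsetCommitSketch and all of its members are hypothetical, the "commit" is just a field assignment, and the code does not reproduce any Flink connector.

```java
import java.util.TreeMap;

/** Toy model: offsets are staged per checkpoint and committed on completion. */
class OffsetCommitSketch {

    private long currentOffset;
    // checkpointId -> offset captured when that checkpoint was taken
    private final TreeMap<Long, Long> pendingOffsets = new TreeMap<>();
    private long committedOffset = -1L;

    /** Regular processing moves the read position forward. */
    void advanceTo(long offset) {
        currentOffset = offset;
    }

    /** Snapshot time: remember the offset that belongs to this checkpoint. */
    void snapshotState(long checkpointId) {
        pendingOffsets.put(checkpointId, currentOffset);
    }

    /** Completion time: commit the staged offset and forget older checkpoints. */
    void notifyCheckpointComplete(long checkpointId) {
        Long offset = pendingOffsets.get(checkpointId);
        if (offset == null) {
            return; // unknown checkpoint, or one already covered by a newer commit
        }
        committedOffset = offset; // stands in for the external commit
        pendingOffsets.headMap(checkpointId, true).clear();
    }

    long committedOffset() {
        return committedOffset;
    }

    int pendingCount() {
        return pendingOffsets.size();
    }

    public static void main(String[] args) {
        OffsetCommitSketch source = new OffsetCommitSketch();
        source.advanceTo(10);
        source.snapshotState(1);
        source.advanceTo(20);
        source.snapshotState(2);
        System.out.println(source.committedOffset()); // nothing committed before completion
        source.notifyCheckpointComplete(2);
        System.out.println(source.committedOffset());
    }
}
```

Committing only on completion means the externally visible position never runs ahead of a checkpoint that the job could actually be restored from.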
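An AbstractUdfStreamOperator instance checks whether its userFunction implements CheckpointListener and, if so, notifies the user function that the checkpoint has completed.

FlinkKafkaConsumerBase, for example, uses the completion message to commit offsets to the Kafka cluster, which confirms that the consumed data has been fully processed. See FlinkKafkaConsumerBase.notifyCheckpointComplete() for the details.

```java
public void notifyCheckpointComplete(long checkpointId) throws Exception {
    super.notifyCheckpointComplete(checkpointId);
    // Forward the notification to the user function if it listens for checkpoints
    if (userFunction instanceof CheckpointListener) {
        ((CheckpointListener) userFunction).notifyCheckpointComplete(checkpointId);
    }
}
```

III. Takeaways from studying state management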
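Having worked through the state management source code, we can come back to a few practical questions. By now it should feel a little like the proverbial butcher Pao Ding carving up an ox, with every joint in plain view.

Why does state exist in Flink, and which scenarios involve it?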
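- Real-time aggregation: to compute, say, the average sales amount over the past hour, Flink state is needed to store all sales data from that hour.
- Window operations: Flink SQL supports tumbling, sliding, and session windows. All of them rely on Flink state to hold the data that falls within the window.
- State persistence and job recovery: when a real-time job fails, savepoints and checkpoints make it possible to resume quickly from the last recorded point.
- Multi-stream joins: Flink stores the data of at least one stream so that newly arriving records can be matched against it.

Beyond that, studying the Flink state management source code gives a closer view of how state is actually handled and lays the theoretical groundwork for solving more complex problems:

- Deeper understanding: how the state of each operator moves through the system while a job runs.
- Faster troubleshooting: when a real problem shows up, you can quickly tell which piece of logic is responsible.
- Handling failures: state management is closely tied to Flink's fault tolerance, so you learn how Flink keeps state consistent and recoverable when a failure occurs.
- Custom development: you can write your own state backend or extend and optimize an existing one, such as the RocksDB state backend.
- Performance tuning: once you understand how Flink processes and manages state efficiently, you can tune job performance and reduce resource consumption.

Reference: 《Flink设计与实现:核心原理与源码解析》 by Zhang Libing (张利兵)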