Background
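GenericWriteAheadSink can be used in scenarios that need almost exactly-once output. Why only "almost" exactly-once? Let's analyze it from the source code.
Why GenericWriteAheadSink cannot achieve exactly-once output
First, let's look at the code that runs after a Flink checkpoint completes and Flink notifies GenericWriteAheadSink: it emits the records buffered for each pending checkpoint segment and then commits the transaction.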
```java
public void notifyCheckpointComplete(long checkpointId) throws Exception {
    super.notifyCheckpointComplete(checkpointId);

    synchronized (pendingCheckpoints) {
        Iterator<PendingCheckpoint> pendingCheckpointIt = pendingCheckpoints.iterator();
        while (pendingCheckpointIt.hasNext()) {
            PendingCheckpoint pendingCheckpoint = pendingCheckpointIt.next();

            long pastCheckpointId = pendingCheckpoint.checkpointId;
            int subtaskId = pendingCheckpoint.subtaskId;
            long timestamp = pendingCheckpoint.timestamp;
            StreamStateHandle streamHandle = pendingCheckpoint.stateHandle;

            if (pastCheckpointId <= checkpointId) {
                try {
                    if (!committer.isCheckpointCommitted(subtaskId, pastCheckpointId)) {
                        try (FSDataInputStream in = streamHandle.openInputStream()) {
                            // start emitting the records buffered for this checkpoint segment
                            boolean success =
                                    sendValues(
                                            new ReusingMutableToRegularIteratorWrapper<>(
                                                    new InputViewIterator<>(
                                                            new DataInputViewStreamWrapper(in),
                                                            serializer),
                                                    serializer),
                                            pastCheckpointId,
                                            timestamp);
                            if (success) {
                                // commit the transaction once the segment's records were emitted successfully
                                committer.commitCheckpoint(subtaskId, pastCheckpointId);
                                streamHandle.discardState();
                                pendingCheckpointIt.remove();
                            }
                        }
                    } else {
                        streamHandle.discardState();
                        pendingCheckpointIt.remove();
                    }
                } catch (Exception e) {
                    // we have to break here to prevent a new (later) checkpoint
                    // from being committed before this one
                    LOG.error("Could not commit checkpoint.", e);
                    break;
                }
            }
        }
    }
}
```

As the source above shows, the sendValues call and the commitCheckpoint call are not atomic. If sendValues has run partially or completely and commitCheckpoint then fails, the transaction for that checkpoint is effectively not completed. On the next checkpoint-complete notification, the still-pending earlier checkpoint is passed to sendValues again and then committed. The same records therefore go through sendValues twice, and this is why GenericWriteAheadSink cannot guarantee exactly-once output.
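To make the replay concrete, here is a minimal, self-contained simulation of that control flow. It is only a sketch and does not use Flink: the class `WriteAheadReplayDemo`, its fields, and the one-time simulated commit failure are all hypothetical. The method names merely mirror `sendValues`, `commitCheckpoint`, and `notifyCheckpointComplete` for readability.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for the sink; NOT the Flink class.
class WriteAheadReplayDemo {
    // Records that reached the (imaginary) external system, in arrival order.
    final List<String> externalStore = new ArrayList<>();
    // Checkpoint ids whose commit succeeded.
    final Set<Long> committed = new HashSet<>();
    // Buffered records per pending checkpoint id (plays the role of the write-ahead log).
    final Map<Long, List<String>> pending = new LinkedHashMap<>();
    // Assumption for the demo: the first commit attempt fails exactly once.
    boolean failNextCommit = true;

    // Writes the records to the external system; always succeeds in this sketch.
    boolean sendValues(List<String> values) {
        externalStore.addAll(values);
        return true;
    }

    // Marks the checkpoint as committed, unless the simulated failure fires.
    void commitCheckpoint(long checkpointId) {
        if (failNextCommit) {
            failNextCommit = false;
            throw new IllegalStateException("simulated commit failure");
        }
        committed.add(checkpointId);
    }

    // Same shape as the loop above: send first, commit afterwards, retry later on failure.
    void notifyCheckpointComplete(long checkpointId) {
        Iterator<Map.Entry<Long, List<String>>> it = pending.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, List<String>> entry = it.next();
            long pastCheckpointId = entry.getKey();
            if (pastCheckpointId > checkpointId) {
                continue;
            }
            try {
                if (!committed.contains(pastCheckpointId)) {
                    boolean success = sendValues(entry.getValue());
                    if (success) {
                        commitCheckpoint(pastCheckpointId);
                        it.remove();
                    }
                } else {
                    it.remove();
                }
            } catch (RuntimeException e) {
                // Stop here so a later checkpoint is not committed before this one.
                break;
            }
        }
    }

    // Runs the failure scenario and returns everything the external system received.
    static List<String> runScenario() {
        WriteAheadReplayDemo sink = new WriteAheadReplayDemo();
        sink.pending.put(1L, Arrays.asList("a", "b"));
        sink.notifyCheckpointComplete(1L); // records are sent, but the commit fails
        sink.pending.put(2L, Arrays.asList("c"));
        sink.notifyCheckpointComplete(2L); // checkpoint 1 is sent again, then checkpoint 2
        return sink.externalStore;
    }

    public static void main(String[] args) {
        System.out.println(runScenario()); // prints [a, b, a, b, c]
    }
}
```

The records of checkpoint 1 reach the external store twice, because the first attempt had already written them before the commit failed. If the external write happens to be idempotent, the replay is harmless; otherwise the result is at-least-once rather than exactly-once.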