绍兴市柯桥区建设局网站,wordpress 小组插件,移动端网站和微信网页设计,网站建设教程免费Checkpoint 使用了 Chandy-Lamport 算法
流程
1. 正常流式处理#xff08;尚未Checkpoint#xff09;
如下图#xff0c;Topic 有两个分区#xff0c;并行度也为 2#xff0c;根据奇偶数
我们假设任务从 Kafka 的某个 Topic 中读取数据#xff0c;该Topic 有 2 个 Pa…
Checkpoint 使用了 Chandy-Lamport 算法
流程
1. 正常流式处理尚未Checkpoint
如下图Topic 有两个分区并行度也为 2根据奇偶数
我们假设任务从 Kafka 的某个 Topic 中读取数据该Topic 有 2 个 Partition故任务的并行度为 2。根据读取到数据下面的数据是 offset 的值同时我们把它直接当成数据的奇偶性将数据分发到两个 task 进行 Sum
Source1 记录消费到了第 3 条数据Source2 记录消费到了第 4 条数据并将其发送
同时还有 Source1 正在发送的 2 和 3Source2 在发送的 4
已经处理的有 Source1 的 1 和 Source2 的 1、2、3当前SourceOperator ( Sum) 算子已经sum的结果是 2 和 1135 2. Flink 任务触发 Checkpoint
到了 Checkpoint 的设置的时间间隔jobmanager 触发 checkpoint 操作
此时会给每个 Source 发送一个 barrier 消息消息中的数值表示 Checkpoint 的序号每次启动新的 Checkpoint 该值都会递增 2.2.3 Source启动Checkpoint
当Source接收到barrier消息会将当前的状态Partition、Offset保存到 StateBackend然后向 JobManager 报告Checkpoint 完成。之后Source会将barrier消息广播给下游的每一个 task 2.2.4 task 接收 barrierbarrier 对数据的截断
当task接收到某个上游如这里的Source1发送来的 barrier会将该上游barrier之前的数据继续进行处理而barrier之后发送来的消息不会进行处理会被缓存起来。
也就是说
以 barrier 为节点对 barrier 前后的数据分开barrier 之前的数据属于本次 Checkpointbarrier 之后的数据属于下一次 Checkpoint所以下次 Checkpoint 的数据是不应该在本次 Checkpoint 过程中被计算的因此会将数据进行缓存
不同 Source 的barrier 发送时消费到的 offset 是不一样的barrier 只是区分当前某个时刻已经消费的数据和 barrier 后才来的数据不会去管你的 offset
2.2.5 barrier对齐
但是除了 Operator chains 这种一对一还可能 reblance 算子也就是某个 task 有多个上游输入的情况 sum_even 有两个 Source 源当接收到其中一个 Source 的barrier后会等待其他 Source 的 barrier 到来
在此期间接收到 barrier 的 Source 发来的数据不会处理只会缓存而未接收到 barrier 的 Source 发来的数据依然会进行处理直到接收到该Source 发来的 barrier这个过程称为 barrier的对齐
barrier 对齐主要是为了避免 Checkpoint 时有 barrier 后的数据而 barrier 是否对齐决定了程序实现的是 Exactly Once 还是 At Least Once
如果是一对一的Operator如map、flatMap 或 filter 等则没有对齐这个概念都会实现Exactly Once语义如果是多对一的Operator如 join或者一对多的Operator如 reparation/shuffle时可以通过配置Exactly Once语义时必须进行barrier的对齐而配置了 At Least Once语义时 barrier 可以不对齐 如果不进行barrier对齐那么这里 sum_even 在接收 Source2 的 barrier 之前对于接收到 Source1的数据4不会进行缓存而是直接进行计算sum_even 的状态改为12当接收到 Source2 的barrier会将 sum_even 的状态 sum12 进行持久化。如果本次Checkpoint成功在进行下次 Checkpoint 前任务崩溃会根据本次Checkpoint进行恢复。此时状态如下 Source1的 offset 为3从数据4开始读。 Source2 的 offset 为4从数据5开始读。 sum_even 的状态为 12Souce1的数据2,数据4Source2的数据2,数据4后续接收Source1的数据4数据6...接收Source2的数据6数据8... Source1的数据4被计算了两次 2.2.6 处理缓存数据
task接收到所有上游发送来的 barrier也就代表收到了本次 Checkpoint 的所有数据
但是我们还有 barrier 后的属于下一次 Checkpoint 的被缓存起来但没有处理的数据task 会将 barrier 继续发送给下游如下图 sum 以后的 sink然后处理缓存的数据 2.2.7 上报Checkpoint完成
当sink收到barrier后会向JobManager上报本次Checkpoint完成。至此本次Checkpoint结束各阶段的状态均进行了持久化可以用于后续的故障恢复 两阶段提交
如果开启了exact once 语义sink 写入后采用了两阶段提交比如mysql有事务的就是写入事务然后标记预提交等到checkpoint提交事务并改为标记提交完成
那我没事务怎么办hive、iceberg、paimon这些不能实现exact once当然可以比如hive就是写入临时文件此时数据不可见提交时修改文件名数据可见 他不是从 Source 到 Sink 完成后进行 Checkpoint而是预提交的方式
两阶段提交2PC将分布式事务分成了两个阶段两个阶段分别为提交请求投票和提交执行有兴趣的可以去搜下
异步每次在把快照存储到我们的状态后端时如果是同步进行就会阻塞正常任务从而引入延迟。因此 Flink 在做快照存储时采用异步方式 历史文章迁移未完成还需补充