旅游网站开发意义和价值,社团网站开发模板,企业网站开发制作,阿里云域名注册口令1. FlinkKafka保证精确一次消费相关问题#xff1f;
Fink的检查点和恢复机制和可以重置读位置的source连接器结合使用#xff0c;比如kafka#xff0c;可以保证应用程序不会丢失数据。尽管如此#xff0c;应用程序可能会发出两次计算结果#xff0c;因为从上一次检查点恢…1. FlinkKafka保证精确一次消费相关问题
Fink的检查点和恢复机制和可以重置读位置的source连接器结合使用比如kafka可以保证应用程序不会丢失数据。尽管如此应用程序可能会发出两次计算结果因为从上一次检查点恢复的应用程序所计算的结果将会被重新发送一次一些结果已经发送出去了这时任务故障然后从上一次检查点恢复这些结果将被重新计算一次然后发送出去。这个时候需要下层sink做到幂等性或者事务。 所以
· souce使用执行ExactlyOnce的数据源比如kafka等
· 内部使用FlinkKafakConsumer并开启CheckPoint偏移量会保存到StateBackend中并且默认会将偏移量写入到topic中去即_consumer_offsets Flink设置CheckepointingModel.EXACTLY_ONCE
· sink
存储系统支持覆盖也即幂等性如Redis,Hbase,ES等
存储系统不支持覆需要支持事务(预写式日志或者两阶段提交),两阶段提交可参考Flink集成的kafka sink的实现。
2. 你们的Flink怎么提交的使用的per-job模式吗 模式 生命周期 资源隔离 优点 缺点 main方法 Session 关闭会话,才会停止 共用JM和TM 预先启动启动作业不再启动。资源充分共享 资源隔离比较差TM不容易扩展 在客户端执行 Per-job Job停止集群停止 单个Job独享JM和TM 充分隔离资源根据job按需申请 job启动慢,每个job需要启动一个JobManager 在客户端执行 Application 当Application全部执行完集群才会停止 Application使用一套JM和TM Client负载低Application之间实现资源隔离Application内实现资源共享 对per-job模式和session模式的优化部署模式(优点) 在Cluster中
欢迎关注一起学习 3. 了解过Flink的两阶段提交策略吗讲讲详细过程。如果第一阶段宕机了会怎么办第二阶段呢
顾名思义2PC将分布式事务分成了两个阶段两个阶段分别为提交请求和提交。协调者根据参与者的响应来决定是否需要真正地执行事务
提交请求阶段
· 协调者向所有参与者发送prepare请求与事务内容询问是否可以准备事务提交并等待参与者的响应。
· 参与者执行事务中包含的操作并记录undo日志用于回滚和redo日志用于重放但不真正提交。
· 参与者向协调者返回事务操作的执行结果执行成功返回yes否则返回no
提交执行阶段
分为成功与失败两种情况。
若所有参与者都返回yes说明事务可以提交
· 协调者向所有参与者发送commit请求。
· 参与者收到commit请求后将事务真正地提交上去并释放占用的事务资源并向协调者返回ack。
· 协调者收到所有参与者的ack消息事务成功完成。
若有参与者返回no或者超时未返回说明事务中断需要回滚
· 协调者向所有参与者发送rollback请求
· 参与者收到rollback请求后根据undo日志回滚到事务执行前的状态释放占用的事务资源并向协调者返回ack
· 协调者收到所有参与者的ack消息事务回滚完成 对于Flink sink是kafka为例
每当需要做checkpoint时JobManager就在数据流中打入一个屏障barrier作为检查点的界限。屏障随着算子链向下游传递每到达一个算子都会触发将状态快照写入状态后端(state BackEnd)的动作。当屏障到达Kafka sink后触发preCommit(实际上是KafkaProducer.flush())方法刷写消息数据但还未真正提交。接下来还是需要通过检查点来触发提交阶段。
第一阶段宕机这个时候offset没有提交重新启动会按offset继续消费和从状态中恢复状态值。
第二阶段宕机分两种情况在提交阶段后宕机因为这个链路已经处理完重新启动会按offset继续消费。在checkpint完成后宕机还没有来得及触发提交阶段这个时候可能会出现丢数据情况这个时候有学者提出了三阶段提交。
4. 你是如何通过Flink实现uv的
用户id刚好是数字可以使用bitmap去重简单原理是把 user_id 作为 bit 的偏移量 offset设置为 1 表示有访问使用 1 MB的空间就可以存放 800 多万用户的一天访问计数情况。
val bloomFilter new Bloom(129)
// 先定义redis中存储位图的key
val storedBitMapKey xxxxx
// 去重判断当前userId的hash值对应的位图位置是否为0
val userId elements.last._2.toString
// 计算hash值就对应着位图中的偏移量
val offset bloomFilter.hash(userId, 61)
val isExist jedis.getbit(storedBitMapKey, offset) class Bloom(size: Long) extends Serializable{ private val cap size // 默认cap应该是2的整次幂 //hash函数 value即userid,seed随机数种子 def hash(value: String, seed: Int): Long { var result 0 //遍历userid对每一位进行随机数种子的处理 for( i - 0 until value.length ){ result result * seed value.charAt(i) } // 返回hash值要映射到cap范围内 (cap - 1) result }
}
当然还有Flink自带BloomFilter, google现成的布隆过滤器。
5. Flink中的双流join怎么实现
Flink双流JOIN主要分为两大类。一类是基于原生State的Connect算子操作另一类是基于窗口的JOIN操作。其中基于窗口的JOIN可细分为window join和interval join两种。 实现原理底层原理依赖Flink的State状态存储通过将数据存储到State中进行关联join, 最终输出结果。 Window join
将两条实时流中元素分配到同一个时间窗口中完成Join。底层原理: 两条实时流数据缓存在Window State中当窗口触发计算时执行join操作。
源码核心总结windows窗口 state存储 双层for循环执行join() joinwhereequalTo 算子实现inner join coGroupwhereequalTo 可算子实现left/right join
这个时候有个问题某流数据可能晚到导致窗口关闭了都没有join上 Interval Join的双流JOIN实现机制
Interval Join根据右流相对左流偏移的时间区间(interval)作为关联窗口在偏移区间窗口中完成join操作。满足数据流stream2在数据流stream1的 interval(low, high)偏移区间内关联join。interval越大关联上的数据就越多超出interval的数据不再关联。 实现原理interval join也是利用Flink的state存储数据不过此时存在state失效机制ttl触发数据清理操作。比如
orderStream.keyBy(_.1)// 调用intervalJoin关联.intervalJoin(orderDetailStream._2)// 设定时间上限和下限.between(Time.milliseconds(-30), Time.milliseconds(30)) .process(new ProcessWindowFunction())class ProcessWindowFunction extends ProcessJoinFunction...{override def processElement(...) {collector.collect((r1, r2) r1 : r2)}
} 基于Connect的双流JOIN实现机制
对两个DataStream执行connect操作将其转化为ConnectedStreams, 生成的Streams可以调用不同方法在两个实时流上执行且双流之间可以共享状态。这个时候结合状态如果某流数据没有过来先存状态后流过来去状态去找没有再存状态。
orderStream.connect(orderDetailStream).keyBy(orderId, orderId).process(new orderProcessFunc());
6. Flink的checkpoint文件是保存在哪里, 可以选择哪些
MemoryStateBackend
在 CheckPoint 时State Backend 对状态进行快照并将快照信息作为 CheckPoint 应答消息的一部分发送给 JobManager(master)同时 JobManager 也将快照信息存储在堆内存中。
FsStateBackend
FsStateBackend 将正在运行中的状态数据保存在 TaskManager 的内存中。CheckPoint 时将状态快照写入到配置的文件系统目录中。少量的元数据信息存储到 JobManager 的内存中
RocksDBStateBackend
RocksDBStateBackend 将正在运行中的状态数据保存在 RocksDB 数据库中RocksDB 数据库默认将数据存储在 TaskManager 的数据目录。CheckPoint 时整个 RocksDB 数据库被 checkpoint 到配置的文件系统目录中。
7. Flink 维表关联怎么做的应该是开发必做建议提前准备
1 查找关联同步异步
2 状态编程,预加载数据到状态中按需取
3 冷热数据
4 广播维表
5 Temporal Table Join
8. Flink 数据倾斜是怎么解决的
1定位反压
Flink Web UI 自带的反压监控直接方式、Flink Task Metrics间接方式。通过监控反压的信息可以获取到数据处理瓶颈的 Subtask。
2确定数据倾斜
Flink Web UI 自带Subtask 接收和发送的数据量。当 Subtasks 之间处理的数据量有较大的差距则该 Subtask 出现数据倾斜。
解决
1数据源 source 消费不均匀
通过调整Flink并行度解决数据源消费不均匀或者数据源反压的情况。我们常常例如kafka数据源调整并行度的原则Source并行度与 kafka分区数是一样的或者 kafka 分区数是KafkaSource 并发度的整数倍。建议是并行度等于分区数。
2key 分布不均匀
上游数据分布不均匀使用keyBy来打散数据的时候出现倾斜。通过添加随机前缀打散 key 的分布使得数据不会集中在几个 Subtask。
两阶段聚合解决 KeyBy加盐局部聚合去盐全局聚合
预聚合加盐局部聚合在原来的 key 上加随机的前缀或者后缀。
聚合去盐全局聚合删除预聚合添加的前缀或者后缀然后进行聚合统计。
8.Flink如何处理乱序数据?
比如我们现在设置了每5秒一次的滚动窗口比如我们从Kafka中的读取到的第一个事件时间为10:00:00
以此从kafka读取数据如下
A10:00:00B10:00:01C10:00:05D10:00:06E10:00:03F10:00:04
当D10:00:06时间到了就会触发【10:00:00-10:00:05窗口只是简单举个例子没有严格意义上的按照源码公式去划分窗口后续的EF数据就会被抛弃会被忽略计算。
为了解决根据事件时间计算可能会产生这种问题Flink 提供了WaterMarker机制利用一定的延迟容忍可一定程度上避免因消息乱序导致的错误计算或者数据丢失。
单数据流情况并行度1WaterMarker当前数据流中当前元素最大事件时间 - 最大允许的延迟时间或乱序时间。
对于多流而言并行度1的source task),它每个独立的subtask都会生成各自的watermark。这些watermark会随着流数据一起分发到下游算子并覆盖掉之前的watermark。当有多个watermark同时到达下游算子的时候flink会选择较小的watermark进行更新。当一个task的watermark大于窗口结束时间时就会立马触发窗口操作。 watermark可以在一定程度上解决事件乱序问题但严重的乱序问题依然无法解决我们可以结合侧位输出来收集更为延迟的数据避免延迟数据丢失。所以不可过度依赖WaterMarker帮助我们解决乱序问题如果发生过多乱序问题应注重检查生产数据的生产端问题。还有一点要注意的watermark是一个全局的值不是某一个key下的值所以即使不是同一个key的数据其watermark也会不断增加。 9. Flink内存溢出怎么办
当Flink程序在运行过程中发生内存溢出一种可能的原因是任务需要处理的数据量超过了可以保存在内存中的数据导致运算符将部分数据溢出到磁盘。对于这种情况我们可以尝试以下几种解决方法
优化程序逻辑减小数据的处理量。例如我们可以使用更高效的算法或者对数据进行预处理来减少数据的复杂性。调整Flink程序的并行度。根据具体的问题和硬件环境增加或减少并行度可能会带来更好的性能。调整Flink程序的内存配置。我们可以根据程序的实际需求和系统资源情况提高或降低Flink程序可以使用的内存量。如果上述方法都无法解决问题那么可能需要考虑升级硬件资源增加服务器的内存。
10. Flink试过哪些优化
优化的话可以参考下面几点
GC的配置 1调整老年代与新生代的比值 或者 更换垃圾收集器 2增加JVM内存数据倾斜 1需要重新设计key以更小粒度的key使得task大小合理化。
2当分区导致数据倾斜时需要考虑优化分区。避免非并行度操作有些对DataStream的操作会导致无法并行例如WindowAll。
3调用rebalance操作使数据分区均匀。
4自定义分区使用一个用户自定义的Partitioner对每一个元素选择目标task由于用户对自己的数据更加熟悉可以按照某个特征进行分区从而优化任务执行。 3. checkpoint 1频率不宜过高 2超时时间不要过长一般在频率一半 3使用异步 4.其他配置 1配置JobManager内存 2配置TaskManager个数 3配置TaskManager Slot数
5.其他
(1)背压的时候大家往往忽略了数据的序列化和反序列化过程所造成的性能问题。
(2) 一些数据结构 比如 HashMap 和 HashSet 这种 key 需要经过 hash 计算的数据结构在数据量大的时候使用 keyby 进行操作 造成的性能影响是非常大的。
(3) 如果我们的下游是 MySQLHBase这种我们都会进行一个批处理的操作就是让数据存储到一个 buffer 里面在达到某些条件的时候再进行发送这样做的目的就是减少和外部系统的交互降低网络开销的成本。
(4) 频繁GC 无论是 CMS 也好G1也好在进行 GC 的时候都会停止整个作业的运行GC 时间较长还会导致 JobManager 和 TaskManager 没有办法准时发送心跳此时 JobManager 就会认为此 TaskManager 失联它就会另外开启一个新的 TaskManager。
6. 场景 产生背压的时候如果定位下游计算不过来导致上游挤压严重这个时候想着怎么去增加并行度也好或者利用多线程也好目的就是增加计算能力。如果多线程计算这个时候更多关注cpu核数来分配更多的时间片提高计算能力。 11. Flink的重启策略怎么设置的?
如果启用了checkpoint并且没有显式配置重启策略会默认使用fixeddelay策略最大重试次数为Integer.MAX_VALUE。
固定延迟重启策略
固定延迟重启策略是尝试给定次数重新启动作业。如果超过最大尝试次数则作业失败。在两次连续重启尝试之间会有一个固定的延迟等待时间
// 配置文件设置
restart-strategy: fixed-delay # fixed-delay:固定延迟策略restart-strategy.fixed-delay.attempts: 5 # 尝试5次默认Integer.MAX_VALUErestart-strategy.fixed-delay.delay: 10s # 设置延迟时间10s默认为 akka.ask.timeout时间// 代码设置固定延迟重启策略
env.setRestartStrategy(RestartStrategies
.fixedDelayRestart(3,Time.seconds(3)));
故障率重启策略
故障率重启策略在故障后重新作业当设置的故障率failure rate每个时间间隔内发生故障的次数超过设定的限制时作业最终失败。在两次连续重启尝试之间重启策略延迟等待一段时间。
restart-strategy: failure-rate # 设置重启策略为failure-raterestart-strategy.failure-rate.max-failures-per-interval: 3 # 失败作业之前的给定时间间隔内的最大重启次数默认1restart-strategy.failure-rate.failure-rate-interval: 5min # 测量故障率的时间间隔。默认1minrestart-strategy.failure-rate.delay: 10s # 两次连续重启尝试之间的延迟默认akka.ask.timeout时间失败后5分钟内重启3次每次重启间隔10s如果第3次还是失败则任务最终是失败不再重启无重启策略
restart-strategy: none 12. Flink重大版本差别
Flink 1.9
阿里内部版本Blink首次合并入Flink
重构 Flink WebUIFlink 1.10
原生 Kubernetes 的初步集成beta 版本以及对 Python 支持PyFlink的重大优化。
Flink 1.11
非对齐的 Checkpoint 机制。这一机制是对 Flink 容错机制的一个重要改进它可以提高严重反压作业的 Checkpoint 速度。
Flink SQL 引入了对 CDC PyFlink 优化了多个部分的性能包括对向量化的用户自定义函数Python UDF的支持。
Application 部署模式
Flink 1.12
DataStream API 上添加了高效的批执行模式的支持。这是批处理和流处理实现真正统一的运行时的一个重要里程碑。
扩展了 Kafka SQL connector使其可以在 upsert 模式下工作。
SQL 中 支持 Temporal Table Join 剩下的版本在原有的基础上优化等等。
13. Flink的怎么和RocksDB交互的。怎样一个流程
Flink和RocksDB的交互主要通过Java Native接口JNI实现。具体来说Flink作业运行时RocksDB会被内嵌到TaskManager进程中并以本地线程方式运行来读写本地文件。
14. Flink topN的实现?
统计最近10 秒钟内最热门的两个 url 链接并且每 5 秒钟更新一次
stream.map(data - data.getUserName())
.windowAll(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.aggregate(new UserHashMapCountAgg(), new UserCountWindowResult())
.print();public static class UserHashMapCountAgg implements AggregateFunctionString, HashMapString, Long, ListTuple2String, Long{Overridepublic HashMapString, Long createAccumulator() {return new HashMap();}Overridepublic HashMapString, Long add(String value, HashMapString, Long accumulator) {if (accumulator.containsKey(value)){accumulator.put(value, accumulator.get(value) 1L);}else{accumulator.put(value, 1L);}return accumulator;}Overridepublic ListTuple2String, Long getResult(HashMapString, Long accumulator) {ListTuple2String, Long resultList new ArrayList();accumulator.forEach((key, count) - {resultList.add(Tuple2.of(key, count));});//排序resultList.sort(new ComparatorTuple2String, Long() {Overridepublic int compare(Tuple2String, Long o1, Tuple2String, Long o2) {return o2.f1.intValue() - o1.f1.intValue();}});
// resultList.sort(Comparator.comparing((key_1, key_2) - key_1.f1.compareTo(key_1.f1)));
// Collections.sort(resultList, Comparator.comparing(key - key.f1));return resultList;}Overridepublic HashMapString, Long merge(HashMapString, Long a, HashMapString, Long b) {//do nothingreturn null;}} 15. Flink监控怎么做的
Web UI方式: Flink提供了一个web UI来观察、监视和调试正在运行的应用服务。并且还可以执行或取消组件或任务的执行。
Prometheus GrafanaFlink提供了一个复杂的度量系统来收集和报告系统和用户定义的度量指标信息。flink-metrics-prometheus 的相关jar放到lib下然后在fink-conf中配置相关信息即可。然后配置相关的指标信息和报警即可。
16. Flink关闭后状态端数据恢复得慢怎么办
1选用合理的state数据结构和 statebackend 2并行度合理设置
17. Flink的序列化讲讲呢?
目前绝大多数的大数据计算框架都是基于JVM实现的为了快速地计算数据需要将数据加载到内存中进行处理。当大量数据需要加载到内存中时如果使用Java序列化方式来存储对象占用的空间会较大降低存储传输效率。
例如一个只包含布尔类型的对象需要占用16个字节的内存对象头要占8个字节、boolean属性占用1个字节、对齐填充还要占用7个字节。
Java序列化方式存储对象存储密度是很低的。也是基于此Flink框架实现了自己的内存管理系统在Flink自定义内存池分配和回收内存然后将自己实现的序列化对象存储在内存块中。 所谓序列化和反序列化的含义
序列化就是将一个内存对象转换成二进制串形成网络传输或者持久化的数据流。
反序列化将二进制串转换为内存对。
TypeInformation 是 Flink 类型系统的核心类
在Flink中当数据需要进行序列化时会使用TypeInformation的生成序列化器接口调用一个 createSerialize() 方法创建出TypeSerializerTypeSerializer提供了序列化和反序列化能力。如下图所示Flink 的序列化过程 对于大多数数据类型 Flink 可以自动生成对应的序列化器能非常高效地对数据集进行序列化和反序列化 如下图 比如BasicTypeInfo、WritableTypeIno 但针对 GenericTypeInfo 类型Flink 会使用 Kyro 进行序列化和反序列化。其中Tuple、Pojo 和 CaseClass 类型是复合类型它们可能嵌套一个或者多个数据类型。在这种情况下它们的序列化器同样是复合的。它们会将内嵌类型的序列化委托给对应类型的序列化器。
通过一个案例介绍Flink序列化和反序列化 如上图所示当创建一个Tuple 3 对象时包含三个层面一是 int 类型一是 double 类型还有一个是 Person。Person对象包含两个字段一是 int 型的 ID另一个是 String 类型的 name
1在序列化操作时会委托相应具体序列化的序列化器进行相应的序列化操作。从图中可以看到 Tuple 3 会把 int 类型通过 IntSerializer 进行序列化操作此时 int 只需要占用四个字节。
2Person 类会被当成一个 Pojo 对象来进行处理PojoSerializer 序列化器会把一些属性信息使用一个字节存储起来。同样其字段则采取相对应的序列化器进行相应序列化在序列化完的结果中可以看到所有的数据都是由 MemorySegment 去支持。
MemorySegment 具有什么作用呢
MemorySegment 在 Flink 中会将对象序列化到预分配的内存块上它代表 1 个固定长度的内存默认大小为 32 kb。MemorySegment 代表 Flink 中的一个最小的内存分配单元相当于是 Java 的一个 byte 数组。每条记录都会以序列化的形式存储在一个或多个 MemorySegment 中。
18. Flink的异步有了解吗
// 创建一个原始的流
val stream: DataStream[String] ...// 添加一个 async I/O
val resultStream: DataStream[(String, String)] AsyncDataStream.(un)orderedWait(stream, new AsyncDatabaseRequest(),500, TimeUnit.MILLISECONDS, // 超时时间120) // 进行中的异步请求的最大数量 无序模式 异步请求一结束就立刻发出结果记录。 流中记录的顺序在经过异步 I/O 算子之后发生了改变。 当使用 处理时间 作为基本时间特征时这个模式具有最低的延迟和最少的开销。 此模式使用 AsyncDataStream.unorderedWait(...) 方法。
有序模式: 这种模式保持了流的顺序。发出结果记录的顺序与触发异步请求的顺序记录输入算子的顺序相同。
19. Flink的boardcast join 的原理是什么
利用 broadcast State 将维度数据流广播到下游所有 task 中。这个 broadcast 的流可以与我们的事件流进行 connect然后在后续的 process 算子中进行关联操作即可。
当维度信息修改后flink 的 broadcast 流实时消费 MQ 中数据就可以实时读取到维表的更新然后配置就会在 Flink 任务生效通过这种方法及时的修改了维度信息。broadcast 可以动态实时更新配置。
20. 你们有用过Flink的背压吗怎么做优化和调整
简单来说就是下游处理速率 跟不上 上游发送数据的速率下游来不及消费导致队列被占满后上游的生产会被阻塞最终导致数据源的摄入被阻塞。
反压会影响到两项指标: checkpoint 时长和 state 大小
1前者是因为 checkpoint barrier 是不会越过普通数据的数据处理被阻塞也会导致 checkpoint barrier 流经整个数据管道的时长变长因而 checkpoint 总体时间End to End Duration变长。
2后者是因为为保证 EOSExactly-Once-Semantics准确一次对于有两个以上输入管道的 Operatorcheckpoint barrier 需要对齐Alignment接受到较快的输入管道的 barrier 后它后面数据会被缓存起来但不处理直到较慢的输入管道的 barrier 也到达这些被缓存的数据会被放到state 里面导致 checkpoint 变大。
Flink反压如何解决
1定位反压节点
要解决反压首先要做的是定位到造成反压的节点这主要有两种办法:
1. 通过 Flink Web UI 自带的反压监控面板
2. 通过 Flink Task Metrics。
1反压监控面板
Flink Web UI 的反压监控提供了 SubTask 级别的反压监控原理是通过周期性对 Task 线程的栈信息采样得到线程被阻塞在请求 Buffer意味着被下游队列阻塞的频率来判断该节点是否处于反压状态。默认配置下这个频率在 0.1 以下则为 OK0.1 至 0.5 为 LOW而超过 0.5 则为 HIGH。 2Task Metrics
Flink 提供的 Task Metrics 是更好的反压监控手段
如果一个 Subtask 的发送端 Buffer 占用率很高则表明它被下游反压限速了
如果一个 Subtask 的接受端 Buffer 占用很高则表明它将反压传导至上游。
21. Flink的CBO物理执行计划和逻辑执行计划?
Flink的优化执行其实是借鉴的数据库的优化器来生成的执行计划。
CBO成本优化器代价最小的执行计划就是最好的执行计划。传统的数据库成本优化器做出最优化的执行计划是依据统计信息来计算的。Flink 的成本优化器也一样。Flink 在提供最终执行前优化每个查询的执行逻辑和物理执行计划。这些优化工作是交给底层来完成的。根据查询成本执行进一步的优化从而产生潜在的不同决策如何排序连接执行哪种类型的连接并行度等等。
22. 你们用Flink怎么去开发一些checkpoint的超时问题
Flink 的 Checkpoint 包括如下几个部分
● JM trigger checkpoint
● Source 收到 trigger checkpoint 的 PRC自己开始做 snapshot并往下游发送 barrier
● 下游接收 barrier需要 barrier 都到齐才会开始做 checkpoint
● Task 开始同步阶段 snapshot
● Task 开始异步阶段 snapshot
● Task snapshot 完成汇报给 JM
上面的任何一个步骤不成功整个 checkpoint 都会失败。 从webui上可以看到Acknowledged 一列表示有多少个 subtask 对这个 Checkpoint 进行了 ack失败的情况大多数总有几个subtask 失败。
1Checkpoint Decline
当前 Flink 中如果较小的 Checkpoint 还没有对齐的情况下收到了更大的 Checkpoint则会把较小的 Checkpoint 给取消掉。
2Checkpoint Expire
Checkpoint 做的非常慢超过了 timeout 还没有完成则整个 Checkpoint 也会失败。
生产比如
1采用短连接方式获取数据库连接每次来一波数据都创建连接发送完断开连接。因此很容易因为获取不到连接而使得processElement方法处于阻塞状态。而processElement方法阻塞进而影响Barrier的流动所以导致了Checkpoint发生超时。
2Checkpoint状态比较大增 量 Checkpoint 则 只 备 份 上 一 次 Checkpoint 中 不 存 在 的 state。
3作业存在反压或者数据倾斜barrier 发送慢从而整体影响 Checkpoint 的时间。
4主线程太忙导致没机会做 snapshot。
在 task 端所有的处理都是单线程的数据处理和 barrier 处理都由主线程处理如果主线程在处理太慢也会影响整体 Checkpoint 的进度。