威海相亲网站,青岛建设大学招聘信息网站,html代码查看,wordpress 搜索 自定义字段Flink是一个带状态的数据处理系统#xff1b;系统在处理数据的过程中#xff0c;各算子所记录的状态会随着数据的处理而不断变化#xff1b;
1. 状态
所谓状态State#xff0c;一般指一个具体的 Task 的状态#xff0c;即线程处理过程中需要保存的历史数据或历史累计数据…Flink是一个带状态的数据处理系统系统在处理数据的过程中各算子所记录的状态会随着数据的处理而不断变化
1. 状态
所谓状态State一般指一个具体的 Task 的状态即线程处理过程中需要保存的历史数据或历史累计数据默认保存在 Java 的堆内存中。 根据算子是否存在按照Key进行分区State可以划分为keyed state 和 Non-keyed stateOperator State、算子状态
operator state是task级别的state说白了就是每个task对应一个state, 在逻辑上由算子task下所有subtask共享 Operator State的经常被用在Source或Sink算子上用来保存流入数据的偏移量或对输出数据做缓存以保证Flink应用的Exactly-Once语义。keyed state 是基于KeyedStream上的状态这个状态是跟特定的Key 绑定的。KeyedStream流上的每一个Key都对应一个State
2. 状态数据结构
状态数据由Flink内置状态机制管理。keyed state提供了5种数据结构
2.1 keyed state 数据结构
状态状态描述ValueState保存一个可以更新和检索的值如上所述每个值都对应到当前的输入数据的keyListState保存一个元素的列表。可以往这个列表中追加数据并在当前的列表上进行检索MapState维护了一个映射列表ReducingState保存一个单值表示添加到状态的所有值的聚合AggregateState保留一个单值表示添加到状态的所有值的聚合。和 ReducingState 相反的是, 聚合类型可能与添加到状态的元素的类型不同
2.2 operator state
状态状态描述ListStateListState的快照存储数据系统重启后list数据的重分配模式为 round-robin 轮询平均分配UnionListStateUnionListState的快照存储数据在系统重启后list数据的重分配模式为 广播模式 在每个subtask上都拥有一份完整的数据
3. 状态后端
默认情况下state会保存在taskmanager的JVM堆内存checkpoint会存储在JobManager的内存中。然而状态数据的存储和checkpoint的存储位置可以改变由state Backend(状态后端)配置实现
老版本flink-1.12版及以前 Fsstatebackend MemoryStatebackend RocksdbStateBackend
flink1.12版本之后可用的状态后端类型有两种 HashMapStateBackend、EmbeddedRocksDBStateBackend 而且其所生成的快照文件也统一了格式因而在job重新部署或者版本升级时可以任意替换statebackend HashMapStateBackend ※ 状态数据是以java对象形式存储在heap内存中 ※ 内存空间不够时也会溢出部分数据到本地磁盘文件 ※ 可以支撑大规模的状态数据只不过在状态数据规模超出内存空间时读写效率就会明显降低 EmbeddedRocksDBStateBackend ※ RocksDB使用一套日志结构的数据库引擎它是Flink中内置的第三方状态管理器, 为了更好的性能这套引擎是用C编写的。 Key和value是任意大小的字节流。 ※ 它需要配置一个远端的filesystem uri一般是HDFS在做checkpoint的时候会把本地的数据直接复制到fileSystem中。fail over的时候从fileSystem中恢复到本地, RocksDB克服了state受内存限制的缺点同时又能够持久化到远端文件系统中比较适合在生产中使用。 ※ 使用RocksDB HDFS进行state存储首先state先在taskManger的本地存储到RocksDB然后异步写入到HDFS中状态数量仅仅受限于本地磁盘容量限制。
4. 状态数据容错
Flink是一个stateful带状态的数据处理系统系统在处理数据的过程中各算子所记录的状态会随着数据的处理而不断变化
一旦系统崩溃需要重启后能够恢复出崩溃前的状态才能进行数据的接续处理因此必须要一种机制能对系统内的各种状态进行持久化容错Flink用checkpoint机制实现状态数据的容错
4.1 checkpoint
Checkpoint是Flink实现容错机制最核心的功能它能够根据配置周期性地基于Stream中各个Operator/task的状态来生成快照从而将这些状态数据定期持久化存储下来当Flink程序一旦意外崩溃时重新运行程序时可以有选择地从这些快照进行恢复从而修正因为故障带来的程序数据异常。 checkpoint默认关闭需要手工开启。开启后默认Exactly-once快照模式。还有一种快照模式为At-least-once checkPoint的位置设置flink-conf.yaml#state.checkpoints.dir Flink支持不同的重启策略以在故障发生时控制作业如何重启集群在启动时会伴随一个默认的重启策略。配置参数flink-conf.yaml#restart-strategy ♦ 如果没有启用 checkpointing则使用无重启 (no restart) 策略 ♦ 如果启用了 checkpointing但没有配置重启策略则使用固定 间隔 (fixed-delay) 策略尝试重启次数默认值是Integer.MAX_VALUE。 ♦ 另一种重启策略为Failure rate某时间段内失败了N次就重启 全局配置
# 每隔3s重启一次重试间隔为10s
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10 s
#5分钟内若失败了3次则认为该job失败重试间隔为10s
restart-strategy: failure-rate
restart-strategy.failure-rate.max-failures-per-interval: 3
restart-strategy.failure-rate.failure-rate-interval: 5 min
restart-strategy.failure-rate.delay: 10 s
# 不重启
restart-strategy: none单个JOB内配置
Configuration conf new Configuration();
StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment(conf);
//开启状态检查点机制(它将会定期对整个系统中各个task的状态进行快照持久化以便失败重启后还能从失败之前的状态恢复env.enableCheckpointing(1000,CheckpointingMode.EXACTLY_ONCE);
// checkpoint机制触发后持久化保存各task状态数据的存储位置生产中用hdfs
env.getCheckpointConfig().setCheckpointStorage(hdfs://node01:8020/tmp/flink/state);
// 指定状态后端存储内存
env.setStateBackend(new HashMapStateBackend());
// 开启自动重启策略
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.milliseconds(2000)));
// env.setRestartStrategy(RestartStrategies.failureRateRestart(3, org.apache.flink.api.common.time.Time.seconds(100), org.apache.flink.api.common.time.Time.seconds(10)));
// env.setRestartStrategy(RestartStrategies.noRestart())checkpoint个数默认保留最近成功生成的一个支持保留多个通过参数flink-conf.yaml#state.checkpoints.num-retained控制如果希望回退到某个Checkpoint点只需要指定对应的某个Checkpoint路径即可实现
flink run -t yarn-per-job -yjm 1024 -ytm 1024 -s hdfs://node01:8020/tmp/flink/state/715b120fe8736a3af7842ea0a5264c46/chk-6/_metadata4.2 savePoint
savePoint是检查点一种特殊实现底层其实也是使用Checkpoint的机制。
savePoint是用户以手工命令的方式触发checkpoint并将结果持久化到指定的存储目录中。
作业升级、代码修改、任务迁移和维护,都可以使用savePoint savePoint的存储位置 savePoint的存储位置flink-conf.yaml#state.savepoints.dir不是必须设置但设置了后, 后面创建指定Job的Savepoint时可以不用在手动执行命令时指定Savepoint的位置。 savePoint的手动触发 #【针对on standAlone模式】 bin/flink savepoint jobId [targetDirectory] #【针对on yarn模式需要指定-yid参数】 bin/flink savepoint jobId [targetDirectory] -yid yarnAppId jobId 需要触发savepoint的jobId编号 targetDirectory 指定savepoint存储数据目录 -yid 指定yarnAppId 例如 flink savepoint 84e766231bbe4b9ff3667f9a0d80b867 -yid application_1619059559839_0001 查看HDFS上savepoint目录 #Savepoint directory /flink/savepoints/savepoint-:shortjobid-:savepointid/ #Savepoint file contains the checkpoint meta data /savepoints/savepoint-:shortjobid-:savepointid/_metadata 4. 触发savepoint并且停止作业 ##语法 bin/flink stop jobId -yid yarnAppId ##例如 flink stop 84e766231bbe4b9ff3667f9a0d80b867 -yid application_1619059559839_0001 从指定的savepoint启动job ##语法 bin/flink run -s savepointPath [runArgs] ##例如 flink run -t yarn-per-job -yjm 1024 -ytm 1024 -s hdfs://node01:8020/flink/savepoints/savepoint-84e766-0591f3377ad0 -c com.loess.checkpoint.TestCheckPoint flink-study-1.0-SNAPSHOT.jar 清除savepoint数据 bin/flink savepoint -d savepointPath ##也可以手动删除某个savepoint这通过常规的文件系统操作就可以做到不影响其它的savepoints和checkpoints savePoint使用建议 为了能够在作业的不同版本之间以及 Flink 的不同版本之间顺利升级推荐通过 uid(String) 方法手动的给算子赋予 ID这些 ID 将用于确定每一个算子的状态范围。 不手动给各算子指定 ID则会由 Flink 自动给每个算子生成一个 ID。只要这些 ID 没有改变就能从保存点 savepoint将程序恢复回来。而这些自动生成的 ID 依赖于程序的结构并且对代码的更改敏感。当程序改变时ID会随之变化所以建议用户手动设置 ID DataStreamString stream env.// Stateful source (e.g. Kafka) with ID.addSource(new StatefulSource()).uid(source-id) // ID for the source operator.shuffle()// Stateful mapper with ID.map(new StatefulMapper()).uid(mapper-id) // ID for the mapper// Stateless printing sink.print(); // Auto-generated ID4.3 savePoint与checkPoint的区别
checkpoint的侧重点是“容错”即Flink作业意外失败并重启之后能够直接从checkpoint来恢复运行且不影响作业逻辑的准确性。而savepoint的侧重点是“维护”即Flink作业需要在人工干预下手动重启、升级、迁移或A/B测试时先将状态整体写入可靠存储维护完毕之后再从savepoint恢复现场。savepoint是通过checkpoint机制创建的所以savepoint本质上是特殊的checkpoint。checkpoint面向Flink Runtime本身由Flink的各个TaskManager定时触发快照并自动清理一般不需要用户干预savepoint面向用户完全根据用户的需要触发与清理。checkpoint是支持增量的通过RocksDB特别是对于超大状态的作业而言可以降低写入成本。savepoint并不会连续自动触发所以savepoint没有必要支持增量。
5. checkpoint机制原理
checkPoint是所有 Operator / Task 的状态在某个时间点的一份拷贝一份快照, 这个时间点应该是所有 Operator / Task 任务都恰好处理完一个相同的输入数据的时候。 若某个subTask挂了则此时的状态都被清空从checkpoint恢复最近一次的状态重新启动应用程序计算输入流
5.1 Barrier机制
Barrier是一种特殊事件用来作为快照信号由checkpoint 协调器向数据流中注入该信号subtask任务收到该信号后就会执行状态的快照。
首先是JobManager中的checkpoint Coordinator(协调器) 向任务中的所有source Task周期性发送barrier栅栏进行快照请求。source Task接受到barrier后 会把当前自己的状态进行snapshot(可以保存在HDFS上)。source向checkpoint coordinator确认snapshot已经完成。source继续向下游transformation operator发送 barrier。transformation operator重复source的操作直到sink operator向协调器确认snapshot完成。coordinator确认完成本周期的snapshot已经完成。
5.2 Barrier对齐
对于下游算子来说可能有多个与之相连的上游输入我们将算子之间的边 称为通道。Source要将一个ID为n的Checkpoint Barrier向所有下游算子广播这也意味着下游算子的多个输入里都有同一个Checkpoint Barrier而且 不同输入里Checkpoint Barrier的流入进度可能不同。因此Checkpoint Barrier传播的过程需要进行对齐Barrier Alignment 算子对齐分为四部 (1). 算子子任务在某个输入通道中收到第一个ID为n的Checkpoint Barrier但是其他输入通道中ID为n的Checkpoint Barrier还未到达该算子子任务开始准备进行对齐。
(2). 算子子任务将第一个输入通道的数据缓存下来同时继续处理其他输入通道的数据这个过程被称为对齐。
(3). 第二个输入通道的Checkpoint Barrier抵达该算子子任务该算子子任务执行快照将状态写入State Backend然后将ID为n的Checkpoint Barrier向下游所有输出通道广播。
(4). 对于这个算子子任务快照执行结束继续处理各个通道中新流入数据包括刚才缓存起来的数据。
6.快照性能优化方案
每次进行Checkpoint前都需要暂停处理新流入数据然后开始执行快照假如状态比较大一次快照可能长达几秒甚至几分钟。
Checkpoint Barrier对齐时必须等待所有上游通道都处理完假如某个上游通道处理很慢这可能造成整个数据流堵塞。
两种优化方案 ① Flink提供了异步快照Asynchronous Snapshot的机制。当实际执行快照时Flink可以立即向下广播Checkpoint Barrier表示自己已经执行完自己部分的快照。一旦数据同步完成再给Checkpoint Coordinator发送确认信息。通过基于 Chandy-Lamport 算法的分布式快照将检查点的保存和数据处理分离开不暂停整个应用。 ② Flink允许跳过对齐这一步或者说一个算子子任务不需要等待所有上游通道的Checkpoint Barrier直接将Checkpoint Barrier广播执行快照并继续处理后续流入数据。为了保证数据一致性Flink必须将那些较慢的数据流中的元素也一起快照一旦重启这些元素会被重新处理一遍