2核4g做网站,网站开发工程师待遇淄博,石家庄网络营销哪家好做,商城推广 网站建设1、State Backends 概述
Flink 提供了多种 state backends#xff0c;它用于指定状态的存储方式和位置。
状态可以位于 Java 的堆或堆外内存#xff1b;取决于你的 state backend#xff0c;Flink 也可以自己管理应用程序的状态。
为了让应用程序可以维护非常大的状态它用于指定状态的存储方式和位置。
状态可以位于 Java 的堆或堆外内存取决于你的 state backendFlink 也可以自己管理应用程序的状态。
为了让应用程序可以维护非常大的状态Flink 可以自己管理内存如果有必要可以溢写到磁盘默认情况下所有 Flink Job 会使用 Flink 配置文件中指定的 state backend但配置文件中指定的 state backend 会被 Job 中指定的 state backend 覆盖。
Configuration config new Configuration();
config.set(StateBackendOptions.STATE_BACKEND, rocksdb);
env.configure(config);2、使用State Backends
1.State Backends 作用
用 Data Stream API 编写的程序通常以各种形式保存状态
在 Window 触发之前要么收集元素、要么聚合转换函数可以使用 key/value 格式的状态接口来存储状态转换函数可以实现 CheckpointedFunction 接口使其本地变量具有容错能力
**在启动 CheckPoint 机制时状态会随着 CheckPoint 而持久化以防止数据丢失、保障恢复时的一致性**状态内部的存储格式、状态在 CheckPoint 时如何持久化以及持久化在哪里均取决于选择的 State Backend。
2.可用的 State Backends
Flink 内置了以下 state backends
HashMapStateBackendEmbeddedRocksDBStateBackend
默认使用 HashMapStateBackend。
aHashMapStateBackend
在 HashMapStateBackend 内部数据以 Java 对象的形式存储在堆中Key/value 形式的状态和窗口算子会持有一个 hash table其中存储着状态值、触发器。
HashMapStateBackend 的适用场景
有较大 state较长 window 和较大 key/value 状态的 Job。所有的高可用场景。
建议将 managed memory 设为0以保证将最大限度的内存分配给 JVM 上的用户代码。
与 EmbeddedRocksDBStateBackend 不同的是由于 HashMapStateBackend 将数据以对象形式存储在堆中因此重用这些对象数据是不安全的。
bEmbeddedRocksDBStateBackend
EmbeddedRocksDBStateBackend 将正在运行中的状态数据保存在 RocksDB 数据库中RocksDB 数据库默认将数据存储在 TaskManager 的数据目录不同于 HashMapStateBackend 中的 java 对象数据被以序列化字节数组的方式存储这种方式由序列化器决定因此 key 之间的比较是以字节序的形式进行而不是使用 Java 的 hashCode 或 equals() 方法。
EmbeddedRocksDBStateBackend 会使用异步的方式生成 snapshots。
EmbeddedRocksDBStateBackend 的局限
由于 RocksDB 的 JNI API 构建在 byte[] 数据结构之上, 所以每个 key 和 value 最大支持 2^31 字节RocksDB 合并操作的状态例如ListState累积数据量大小可以超过 2^31 字节会在下一次获取数据时失败这是当前 RocksDB JNI 的限制。
EmbeddedRocksDBStateBackend 的适用场景
状态非常大、窗口非常长、key/value 状态非常大的 Job。所有高可用的场景。
注意
保留的状态大小仅受磁盘空间的限制与状态存储在内存中的 HashMapStateBackend 相比EmbeddedRocksDBStateBackend 允许存储非常大的状态使用 EmbeddedRocksDBStateBackend 将会使应用程序的最大吞吐量降低所有的读写都必须序列化、反序列化比基于堆内存的 state backend 的效率要低很多因为需要序列化、反序列化重用放入 EmbeddedRocksDBStateBackend 的对象是安全的。
EmbeddedRocksDBStateBackend 是目前唯一支持增量 CheckPoint 的 State Backend。
可以使用 RocksDB 的本地指标(metrics)但默认是关闭的每个 slot 中的 RocksDB instance 的内存大小是有限制的。
3.选择合适的 State Backend
在选择 HashMapStateBackend 和 RocksDB 时是在性能与可扩展性之间权衡
HashMapStateBackend 非常快因为每个状态的读取和算子对于 objects 的更新都是在 Java 的 heap 上但是状态的大小受限于集群中可用的内存。RocksDB 可以根据可用的 disk 空间扩展并且只有它支持增量 snapshot然而每个状态的读取和更新都需要(反)序列化而且在 disk 上进行读操作的性能要比基于内存的 state backend 慢一个数量级。
在 Flink 1.13 版本中统一了 savepoints 的二进制格式可以生成 savepoint 并且之后使用另一种 state backend 读取它。
4.设置 State Backend
默认使用 jobmanager 做为 state backend每一个 Job 的 state backend 配置会覆盖默认的 state backend 配置。
设置每个 Job 的 State Backend
StreamExecutionEnvironment 可以对每个 Job 的 State Backend 进行设置如下所示
Configuration config new Configuration();
config.set(StateBackendOptions.STATE_BACKEND, hashmap);
env.configure(config);在 IDE 中使用 EmbeddedRocksDBStateBackend需要添加以下依赖到 Flink 项目中。
dependencygroupIdorg.apache.flink/groupIdartifactIdflink-statebackend-rocksdb/artifactIdversion1.19.0/versionscopeprovided/scope
/dependency5.设置默认的全局的 State Backend
在 Flink 配置文件中通过键 state.backend.type 设置默认的 State Backend。
可选值包括 jobmanager (HashMapStateBackend)rocksdb (EmbeddedRocksDBStateBackend) 或使用实现了 state backend 工厂 StateBackendFactory 的类的全限定类名 例如 EmbeddedRocksDBStateBackend 对应为 org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackendFactory。
state.checkpoints.dir 选项指定了所有 State Backend 写 CheckPoint 数据和写元数据文件的目录。
配置文件的部分示例如下所示
# 存储 operator state 快照的 State Backendstate.backend: hashmap# 存储快照的目录state.checkpoints.dir: hdfs://namenode:40010/flink/checkpoints6.RocksDB State Backend 进阶
a增量快照
RocksDB 支持增量快照不同于产生一个包含所有数据的全量备份增量快照中只包含自上一次快照完成之后被修改的记录可以显著减少快照完成的耗时。
一个增量快照是基于通常多个前序快照构建的由于 RocksDB 内部存在 compaction 机制对 sst 文件进行合并Flink 的增量快照也会定期重新设立起点rebase因此增量链条不会一直增长旧快照包含的文件也会逐渐过期并被自动清理。
和基于全量快照的恢复时间相比如果网络带宽是瓶颈那么基于增量快照恢复可能会消耗更多时间因为增量快照包含的 sst 文件之间可能存在数据重叠导致需要下载的数据量变大而当 CPU 或者 IO 是瓶颈的时候基于增量快照恢复会更快因为从增量快照恢复不需要解析 Flink 的统一快照格式来重建本地的 RocksDB 数据表而是可以直接基于 sst 文件加载。
状态数据量很大时推荐使用增量快照但这并不是默认的快照机制需要通过配置手动开启该功能
在 Flink 配置文件中设置state.backend.incremental: true 或者在代码中按照右侧方式配置来覆盖默认配置EmbeddedRocksDBStateBackend backend new EmbeddedRocksDBStateBackend(true);
注意一旦启用了增量快照网页上展示的 Checkpointed Data Size 只代表增量上传的数据量而不是一次快照的完整数据量。
b内存管理
Flink 致力于控制整个进程的内存消耗以确保 Flink 任务管理器TaskManager有良好的内存使用保证既不会在容器Docker/Kubernetes, Yarn等环境中由于内存超用被杀掉也不会因为内存利用率过低导致不必要的数据落盘或是缓存命中率下降致使性能下降。
因此Flink 默认将 RocksDB 的可用内存配置为任务管理器的单槽per-slot托管内存量即大多数应用程序不需要调整 RocksDB 配置简单的增加 Flink 的托管内存即可改善内存相关性能问题。
也可以选择不使用 Flink 自带的内存管理而是手动为 RocksDB 的每个列族ColumnFamily分配内存每个算子的每个 state 都对应一个列族提供了对 RocksDB 进行更细粒度控制的途径但是需要自行保证总内存消耗不会超过尤其是容器环境的限制。
RocksDB 使用托管内存
默认打开可以通过 state.backend.rocksdb.memory.managed 控制。
Flink 并不直接控制 RocksDB 的 native 内存分配而是通过配置 RocksDB 来确保其使用的内存正好与 Flink 的托管内存预算相同这是在任务槽per-slot级别上完成的托管内存以任务槽为粒度计算。
为了设置 RocksDB 实例的总内存使用量Flink 对同一个任务槽上的所有 RocksDB 实例使用共享的 cache 以及 write buffer manager共享 cache 将对 RocksDB 中内存消耗的三个主要来源块缓存、索引和bloom过滤器、MemTables设置上限。
Flink还提供了两个参数来控制写路径MemTable和读路径索引及过滤器读缓存之间的内存分配当看到 RocksDB 由于缺少写缓冲内存频繁刷新或读缓存未命中而性能不佳时可以使用这些参数调整读写间的内存分配。
state.backend.rocksdb.memory.write-buffer-ratio默认值 0.5即 50% 的给定内存会分配给写缓冲区使用。state.backend.rocksdb.memory.high-prio-pool-ratio默认值 0.1即 10% 的 block cache 内存会优先分配给索引及过滤器强烈建议不要将此值设置为零以防止索引和过滤器被频繁踢出缓存而导致性能问题此外默认将L0级的过滤器和索引固定到缓存中以提高性能。
注意 上述机制开启时将覆盖用户在 PredefinedOptions 和 RocksDBOptionsFactory中对 block cache 和 write buffer 进行的配置。
注意 仅面向专业用户若要手动控制内存可以将 state.backend.rocksdb.memory.managed 设置为 false并通过 ColumnFamilyOptions配置 RocksDB或者复用上述 cache/write-buffer-manager 机制但将内存大小设置为与 Flink 的托管内存大小无关的固定大小通过 state.backend.rocksdb.memory.fixed-per-slot/state.backend.rocksdb.memory.fixed-per-tm 选项在这两种情况下用户都需要确保在 JVM 之外有足够的内存可供 RocksDB 使用。
c计时器内存 vs. RocksDB
当选择 RocksDB 作为 State Backend 时默认情况下计时器也存储在 RocksDB 中这是一种健壮且可扩展的方式允许应用程序使用很多个计时器。
另一方面在 RocksDB 中维护计时器会有一定的成本因此 Flink 也提供了将计时器存储在 JVM 堆上而使用 RocksDB 存储其他状态的选项当计时器数量较少时基于堆的计时器可以有更好的性能。
通过将 state.backend.rocksdb.timer-service.factory 配置项设置为 heap而不是默认的 rocksdb来将计时器存储在堆上。
注意 在 RocksDB state backend 中使用基于堆的计时器的组合当前不支持计时器状态的异步快照其他状态如 keyed state可以被异步快照。
d开启 RocksDB 原生监控指标
可以使用 Flink 的监控指标系统来汇报 RocksDB 的原生指标也可以选择性的指定特定指标进行汇报。 注意 启用 RocksDB 的原生指标可能会对应用程序的性能产生负面影响。 列族ColumnFamily级别的预定义选项
注意 在引入 RocksDB 使用托管内存 功能后此机制应限于在专家调优或故障处理中使用。
使用预定义选项可以在每个 RocksDB 列族上应用一些预定义的配置例如配置内存使用、线程、Compaction 设置等目前每个算子的每个状态都在 RocksDB 中有专门的一个列族存储。
选择要应用的预定义选项
通过 state.backend.rocksdb.predefined-options 配置项将选项名称设置进 Flink 配置文件。通过程序设置EmbeddedRocksDBStateBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM) 。
该选项的默认值是 DEFAULT 对应 PredefinedOptions.DEFAULT 。
从 Flink 配置文件中读取列族选项
RocksDB State Backend 会将【Advanced RocksDB State Backends Options】的所有配置项全部加载可以通过关闭 RocksDB 使用托管内存的功能并将需要的设置选项加入配置文件来配置底层的列族选项。
通过 RocksDBOptionsFactory 配置 RocksDB 选项
注意 在引入 RocksDB 使用托管内存功能后此机制应限于在专家调优或故障处理中使用。
通过配置一个 RocksDBOptionsFactory 来手动控制 RocksDB 的选项您可以对列族的设置进行细粒度控制例如内存使用、线程、Compaction 设置等目前每个算子的每个状态都在 RocksDB 中有专门的一个列族存储。
将 RocksDBOptionsFactory 传递给 RocksDB State Backend
通过 state.backend.rocksdb.options-factory 选项将工厂实现类的名称设置到 Flink 配置文件。通过程序设置例如 EmbeddedRocksDBStateBackend.setRocksDBOptions(new MyOptionsFactory()); 。
注意 通过程序设置的 RocksDBOptionsFactory 将覆盖 Flink 配置文件的设置且 RocksDBOptionsFactory 设置的优先级高于预定义选项PredefinedOptions。
注意 RocksDB 是一个本地库它直接从进程分配内存 而不是从JVM分配内存分配给 RocksDB 的任何内存都必须被考虑在内通常需要将这部分内存从任务管理器TaskManager的JVM堆中减去否则可能会导致JVM进程由于分配的内存超过申请值而被 YARN 等资源管理框架终止。
自定义 ConfigurableRocksDBOptionsFactory 示例 (需要将实现类全名设置到 state.backend.rocksdb.options-factory)
public class MyOptionsFactory implements ConfigurableRocksDBOptionsFactory {public static final ConfigOptionInteger BLOCK_RESTART_INTERVAL ConfigOptions.key(my.custom.rocksdb.block.restart-interval).intType().defaultValue(16).withDescription( Block restart interval. RocksDB has default block restart interval as 16. );private int blockRestartInterval BLOCK_RESTART_INTERVAL.defaultValue();Overridepublic DBOptions createDBOptions(DBOptions currentOptions,CollectionAutoCloseable handlesToClose) {return currentOptions.setIncreaseParallelism(4).setUseFsync(false);}Overridepublic ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions,CollectionAutoCloseable handlesToClose) {return currentOptions.setTableFormatConfig(new BlockBasedTableConfig().setBlockRestartInterval(blockRestartInterval));}Overridepublic RocksDBOptionsFactory configure(ReadableConfig configuration) {this.blockRestartInterval configuration.get(BLOCK_RESTART_INTERVAL);return this;}
}7.开启 Changelog
a介绍
Changelog 旨在减少 checkpointing 的时间也可以减少 exactly-once 模式下的端到端延迟。
一般 checkpoint 的持续时间受如下因素影响
Barrier 到达和对齐时间可以通过 Unaligned checkpoints 和 Buffer debloating 解决。快照制作时间所谓同步阶段可以通过异步快照解决。快照上传时间异步阶段可以用增量 checkpoints 来减少上传时间但是大多数支持增量 checkpoint 的状态后端会定期执行合并类型的操作这会导致除了新的变更之外还要重新上传旧状态在大规模部署中每次 checkpoint 中至少有一个 task 上传大量数据的可能性往往非常高。
开启 Changelog 功能之后Flink 会不断上传状态变更并形成 changelog创建 checkpoint 时只有 changelog 中的相关部分需要上传而配置的状态后端则会定期在后台进行快照快照成功上传后相关的changelog 将会被截断。
基于此异步阶段的持续时间减少另外因为不需要将数据刷新到磁盘同步阶段持续时间也减少了特别是长尾延迟得到了改善同时还可以获得以下好处
更稳定、更低的端到端时延。Failover 后数据重放更少。资源利用更加稳定。
但是资源使用会变得更高
将会在 DFS 上创建更多文件将使用更多的 IO 带宽用来上传状态变更将使用更多 CPU 资源来序列化状态变更Task Managers 将会使用更多内存来缓存状态变更
虽然 Changelog 增加了少量的日常 CPU 和网络带宽资源使用 但会降低峰值的 CPU 和网络带宽使用量。
恢复时间取决于 state.backend.changelog.periodic-materialize.interval 的设置changelog 可能会变得冗长因此重放会花费更多时间但是恢复时间加上 checkpoint 持续时间仍然可能低于不开启 changelog 功能的时间从而在故障恢复的情况下也能提供更低的端到端延迟当然取决于上述时间的实际比例有效恢复时间也有可能会增加。
b安装
标准的 Flink 发行版包含 Changelog 所需要的 JAR包请确保添加所需的文件系统插件。
c配置
YAML 中的示例配置
state.backend.changelog.enabled: true
state.backend.changelog.storage: filesystem # 当前只支持 filesystem 和 memory仅供测试用
dstl.dfs.base-path: s3://bucket-name # 类似于 state.checkpoints.dir请将如下配置保持默认值
execution.checkpointing.max-concurrent-checkpoints: 1通过编程方式为每个作业开启或关闭 Changelog
StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();
env.enableChangelogStateBackend(true);d监控
如果 task 因写状态变更而被反压它将在 UI 中被显示为忙碌红色。
e升级现有作业
开启 Changelog
支持从 savepoint 或 checkpoint 恢复
给定一个没有开启 Changelog 的作业创建一个 savepoint 或一个 checkpoint更改配置开启 Changelog从创建的 snapshot 恢复
关闭 Changelog
支持从 savepoint 或 checkpoint 恢复
给定一个开启 Changelog 的作业创建一个 savepoint 或一个 checkpoint更改配置关闭 Changelog从创建的 snapshot 恢复
f限制
最多同时创建一个 checkpoint到 Flink 1.15 为止只有 filesystem changelog 实现可用尚不支持 NO_CLAIM 模式
8.自旧版本迁移
a概述
从 Flink 1.13 开始社区改进了 state backend 的公开类帮助理解本地状态存储和 checkpoint 存储这个变化并不会影响 state backend 和 checkpointing 过程的运行时实现和机制用户可以将现有作业迁移到新的 API同时不会损失原有 state。
bMemoryStateBackend
旧版本的 MemoryStateBackend 等价于使用 HashMapStateBackend 和 JobManagerCheckpointStorage。
使用 Flink 配置文件
state.backend: hashmap# Optional, Flink will automatically default to JobManagerCheckpointStorage
# when no checkpoint directory is specified.
state.checkpoint-storage: jobmanager代码配置
Configuration config new Configuration();
config.set(StateBackendOptions.STATE_BACKEND, hashmap);
config.set(CheckpointingOptions.CHECKPOINT_STORAGE, jobmanager);
env.configure(config);cFsStateBackend
旧版本的 FsStateBackend 等价于使用 HashMapStateBackend 和 FileSystemCheckpointStorage。
使用 Flink 配置文件
state.backend: hashmap
state.checkpoints.dir: file:///checkpoint-dir/# Optional, Flink will automatically default to FileSystemCheckpointStorage
# when a checkpoint directory is specified.
state.checkpoint-storage: filesystem代码配置
Configuration config new Configuration();
config.set(StateBackendOptions.STATE_BACKEND, hashmap);
config.set(CheckpointingOptions.CHECKPOINT_STORAGE, filesystem);
config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, file:///checkpoint-dir);
env.configure(config);// Advanced FsStateBackend configurations, such as write buffer size
// can be set manually by using CheckpointingOptions.
config.set(CheckpointingOptions.FS_WRITE_BUFFER_SIZE, 4 * 1024);
env.configure(config);dRocksDBStateBackend
旧版本的 RocksDBStateBackend 等价于使用 EmbeddedRocksDBStateBackend 和 FileSystemCheckpointStorage。
使用 Flink 配置文件
state.backend: rocksdb
state.checkpoints.dir: file:///checkpoint-dir/# Optional, Flink will automatically default to FileSystemCheckpointStorage
# when a checkpoint directory is specified.
state.checkpoint-storage: filesystem代码配置
Configuration config new Configuration();
config.set(StateBackendOptions.STATE_BACKEND, rocksdb);
config.set(CheckpointingOptions.CHECKPOINT_STORAGE, filesystem);
config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, file:///checkpoint-dir);
env.configure(config);// If you manually passed FsStateBackend into the RocksDBStateBackend constructor
// to specify advanced checkpointing configurations such as write buffer size,
// you can achieve the same results by using CheckpointingOptions.
config.set(CheckpointingOptions.FS_WRITE_BUFFER_SIZE, 4 * 1024);
env.configure(config);9.总结
1.状态可以存储在 TM 的内存/rocksdb[文件系统]中进行 Checkpoint 时可以存储在 JM [内存]/文件系统中
2.开启 Changelog 可以减少 checkpointing 的时间也可以减少 exactly-once 模式下的端到端延迟。
3.开启 ChangelogStateBackend 限制[最多同时创建一个 checkpoint\只有 filesystem changelog 可用\不支持 NO_CLAIM 模式]
4.选择 HashMapStateBackend 和 RocksDB 时是在性能与可扩展性之间权衡
- HashMapStateBackend 非常快因为每个状态的读取和算子对于 objects 的更新都是在 Java 的 heap 上但是状态的大小受限于集群中可用的内存。
- RocksDB 可以根据可用的 disk 空间扩展并且只有它支持增量 snapshot但每个状态的读取和更新都需要(反)序列化而且在 disk 上进行读操作的性能要比基于内存的 state backend 慢一个数量级。