云主机怎么安装网站,南京企业官网建设,管理咨询人员的基本素质,佛山市专注网站建设报价星光下的赶路人star的个人主页 大鹏一日同风起#xff0c;扶摇直上九万里 文章目录 1、状态后端#xff08;State Backends#xff09;1.1 状态后端的分类#xff08;HashMapStateBackend/RocksDB#xff09;1.2 如何选择正确的状态后端1.3 状态后端的配置 1、状态后端扶摇直上九万里 文章目录 1、状态后端State Backends1.1 状态后端的分类HashMapStateBackend/RocksDB1.2 如何选择正确的状态后端1.3 状态后端的配置 1、状态后端State Backends
在Flink中状态的存储、访问以及维护都是由一个可插拔的组件决定的这个·组件就叫状态后端state backend。状态后端主要负责管理本地状态的储存方式和位置。
1.1 状态后端的分类HashMapStateBackend/RocksDB
状态后端是一个“开箱即用”的组件可以在不改变应用程序逻辑的情况下独立配置。Flink中提供了两类不同的状态后端一种是“哈希表状态后端”HashMapStateBackend另一种是“内嵌RocksDB状态后端”EmbeddedRocksDBStateBackend。如果没有特别配置系统默认的状态后端是HashMapStateBackend。
1哈希表状态后端HashMapStateBackend HashMapStateBackend是把状态存放在内存里。具体实现上哈希表状态后端在内部会直接把状态当作对象objects保存在Taskmanager的JVM堆上。普通的状态以及窗口中收集的数据和触发器都会以键值对的形式存储起来所以底层是一个哈希表HashMap这种状态后端也因此得名。
2内嵌RocksDB状态后端EmbeddedRocksDBStateBackend RocksDB是一种内嵌的key-value存储介质可以把数据持久化到本地硬盘。配置EmbeddedRocksDBStateBackend后会将处理中的数据全部放入RocksDB数据库中RocksDB默认存储在TaskManager的本地数据目录里。
RocksDB的状态数据被存储为序列化的字节数组读写操作需要序列化/反序列化因此状态的访问性能要差一些。另外因为做了序列化key的比较也会按照字节进行而不是直接调用.hashCode()和.equals()方法。 EmbeddedRocksDBStateBackend始终执行的是异步快照所以不会因为保存检查点而阻塞数据的处理而且它还提供了增量式保存检查点的机制这在很多情况下可以大大提升保存效率。
1.2 如何选择正确的状态后端
HashMap和RocksDB两种状态后端最大的区别就在于本地状态存放在哪里。
HashMapStateBackend是内存计算读写速度非常快但是状态的大小会受到集群可用内存的限制如果应用的状态随着时间不停地增长就会耗尽内存资源。
而RocksDB是硬盘存储所以可以根据可用的磁盘空间进行扩展所以它非常适合于超级海量状态的存储。不过由于每个状态的读写都需要做序列化/反序列化而且可能需要直接从磁盘读取数据这就会导致性能的降低平均读写性能要比HashMapStateBackend慢一个数量级。
1.3 状态后端的配置
在不做配置的时候应用程序使用的默认状态后端是由集群配置文件flink-conf.yaml中指定的配置的键名称为state.backend。这个默认配置对集群上运行的所有作业都有效我们可以通过更改配置值来改变默认的状态后端。另外我们还可以在代码中为当前作业单独配置状态后端这个配置会覆盖掉集群配置文件的默认值。
1配置默认的状态后端 在flink-conf.yaml中可以使用state.backend来配置默认状态后端。 配置项的可能值为hashmap这样配置的就是HashMapStateBackend如果配置项的值是rocksdb这样配置的就是EmbeddedRocksDBStateBackend。
下面是一个配置HashMapStateBackend的例子
# 默认状态后端
state.backend: hashmap# 存放检查点的文件路径
state.checkpoints.dir: hdfs://hadoop102:8020/flink/checkpoints这里的state.checkpoints.dir配置项定义了检查点和元数据写入的目录。
2为每个作业Per-job/Application单独配置状态后端 通过执行环境设置HashMapStateBackend。
StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setStateBackend(new HashMapStateBackend());通过执行环境设置EmbeddedRocksDBStateBackend。
StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setStateBackend(new EmbeddedRocksDBStateBackend());需要注意如果想在IDE中使用EmbeddedRocksDBStateBackend需要为Flink项目添加依赖
dependencygroupIdorg.apache.flink/groupIdartifactIdflink-statebackend-rocksdb/artifactIdversion${flink.version}/version
/dependency而由于Flink发行版中默认就包含了RocksDB(服务器上解压的Flink)所以只要我们的代码中没有使用RocksDB的相关内容就不需要引入这个依赖。 您的支持是我创作的无限动力 希望我能为您的未来尽绵薄之力 如有错误谢谢指正若有收获谢谢赞美