山东有实力的网站开发多少钱,企业法治建设工作报告,wordpress科技公司主题,住房建设厅网站1、背景
Hudi程序中upsert操作频繁#xff0c;过多的删除和回滚操作,导致集群RPC持续偏高
2、描述
hudi采用的是mvcc设计#xff0c;提供了清理工具cleaner来把旧版本的文件分片删除#xff0c;默认开启了清理功能#xff0c;可以防止文件系统的存储空间和文件数量的无限…1、背景
Hudi程序中upsert操作频繁过多的删除和回滚操作,导致集群RPC持续偏高
2、描述
hudi采用的是mvcc设计提供了清理工具cleaner来把旧版本的文件分片删除默认开启了清理功能可以防止文件系统的存储空间和文件数量的无限增长。
3、清理保留策略
清理旧文件需要考虑数据查询的情况有些长查询会占用着旧版本的文件需要设置合适的清理策略来保留一定数量的commit或者文件版本以提高系统的容错性
KEEP_LATEST_COMMITS默认策略表示保留最后n次提交默认为10通过参数hoodie.cleaner.commits.retained或clean.retain_commits(flink)设置KEEP_LATEST_FILE_VERSIONS保留最后n个文件版本默认为3通过参数hoodie.cleaner.fileversions.retained设置KEEP_LATEST_BY_HOURS保留最后n小时默认24小时通过参数hoodie.cleaner.hours.retained设置这是0.11版本后新增的
4、 清理触发策略
目前仅支持一种触发清理的策略CleaningTriggerStrategy#NUM_COMMITS即根据提交的次数默认为1可以通过设置参数hoodie.clean.max.commits进行修改在flink job的每次checkpoint时都会进行触发策略的条件判断所以在两次chekpoint之间发生过1次或n次提交都会触发清理动作。
5、清理流程分析
5.1、清理器初始化
清理逻辑是被包装成一个flink sink在HoodieTableSink#getSinkRuntimeProvider中进行初始化
如果是mor表且开启了异步合并(compaction.async.enabled)则创建CompactionCommitSink继承了CleanFunction所以包含了清理逻辑这是由于SQL API中一个SinkRuntimeProvider不支持多个sink. 否则直接将CleanFunction作为sink这种情况必需启用异步清理配置clean.async.enabled因为CleanFunction的主要方法都判断了是否为异步清理。
5.2、清理启动入口
compact成功后同步清理 需要满足条件1mor表2启用异步合并compaction.async.enabled3禁用异步清理clean.async.enabled。入代码在CompactionCommitSink#doCommit中
if (!conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED)) {this.writeClient.clean();
}
checkpoint时异步清理 需要满足条件1非mor表或启用异步合并compaction.async.enabled2启用异步清理clean.async.enabled。入口代码在CleanFunction#snapshotState中
if (conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED) !isCleaning) {this.writeClient.startAsyncCleaning();this.isCleaning true;
} 6、清理逻辑执行
清理逻辑的流程主要包含有三个步骤生成清理计划、刷新ActiveTimeline、执行清理计划
如果处理的instant状态为requested需要先转换为inflight状态(生成xxx.clean.inflight文件)表示开始清理。执行清理clean(context, cleanerPlan)根据清理计划的数据进行文件删除即可首先删除每个分区下需要清理的文件然后删除需清理的分区目录最后收集统计数据返回。清理成功后将infight状态转换为completed状态表示清理完成。 参考
All Configurations | Apache Hudi
hudi系列-旧文件清理clean-天翼云开发者社区 - 天翼云 (ctyun.cn)