合肥做网站可以吗,家政服务网站开发的依据,外贸网站建设推广方案,wordpress农历插件简介#xff1a; 滴滴实时计算引擎从 Flink-1.4 无缝升级到 Flink-1.10 版本#xff0c;做到了完全对用户透明。并且在新版本的指标、调度、SQL 引擎等进行了一些优化#xff0c;在性能和易用性上相较旧版本都有很大提升。 一、 背景
在本次升级之前#xff0c;我们使用的… 简介 滴滴实时计算引擎从 Flink-1.4 无缝升级到 Flink-1.10 版本做到了完全对用户透明。并且在新版本的指标、调度、SQL 引擎等进行了一些优化在性能和易用性上相较旧版本都有很大提升。 一、 背景
在本次升级之前我们使用的主要版本为 Flink-1.4.2并且在社区版本上进行了一些增强提供了 StreamSQL 和低阶 API 两种服务形式。现有集群规模达到了 1500 台物理机运行任务数超过 12000 日均处理数据 3 万亿条左右。
不过随着社区的发展尤其是 Blink 合入 master 后有很多功能和架构上的升级我们希望能通过版本升级提供更好的流计算服务。今年 2 月份里程碑版本 Flink-1.10 发布我们开始在新版上上进行开发工作踏上了充满挑战的升级之路。
二、 Flink-1.10 新特性
作为 Flink 社区至今为止的最大的一次版本升级加入的新特性解决了之前遇到很多的痛点。
1. 原生 DDL 语法与 Catalog 支持
Flink SQL 原生支持了 DDL 语法比如 CREATE TABLE/CREATE FUNCTION可以使用 SQL 进行元数据的注册而不需要使用代码的方式。
也提供了 Catalog 的支持默认使用 InMemoryCatalog 将信息临时保存在内存中同时也提供了 HiveCatalog 可以与 HiveMetastore 进行集成。也可以通过自己拓展 Catalog 接口实现自定义的元数据管理。
2.Flink SQL 的增强
基于 ROW_NUMBER 实现的 TopN 和去重语法拓展了 StreamSQL 的使用场景。实现了 BinaryRow 类型作为内部数据交互将数据直接以二进制的方式构建而不是对象数组比如使用一条数据中的某个字段时可以只反序列其中部分数据减少了不必要的序列化开销。新增了大量内置函数例如字符串处理、FIRST/LAST_VALUE 等等由于不需要转换为外部类型相较于自定义函数效率更高。增加了 MiniBatch 优化通过微批的处理方式提升任务的吞吐
3.内存配置优化
之前对 Flink 内存的管理一直是一个比较头疼的问题尤其是在使用 RocksDB 时因为一个 TaskManager 中可能存在多个 RocksDB 实例不好估算内存使用量就导致经常发生内存超过限制被杀。
在新版上增加了一些内存配置例如 state.backend.rocksdb.memory.fixed-per-slot 可以轻松限制每个 slot的RocksDB 内存的使用上限避免了 OOM 的风险。
三、挑战与应对
本次升级最大的挑战是如何保证 StreamSQL 的兼容性。StreamSQL 的目的就是为了对用户屏蔽底层细节能够更加专注业务逻辑而我们可以通过版本升级甚至更换引擎来提供更好的服务。保证任务的平滑升级是最基本的要求。
1. 内部 patch 如何兼容
由于跨越多个版本架构差距巨大内部 patch 基本无法直接合入需要在新版本上重新实现。我们首先整理了所有的历史 commit筛选出那些必要的修改并且在新版上进行重新实现目的是能覆盖已有的所有功能确保新版本能支持现有的所有任务需求。
例如
新增或修改 Connectors 以支持公司内部需要例如 DDMQ滴滴开源消息队列产品权限认证功能等。新增 Formats 实现例如 binlog内部日志采集格式的解析等。增加 ADD JAR 语法可以在 SQL 任务中引用外部依赖比如 UDF JAR自定义 Source/Sink。增加 SET 语法可以在 SQL 中设置 TableConfig指导执行计划的生成
2. StreamSQL 语法兼容
社区在 1.4 版本时FlinkSQL还处于比较初始的阶段也没有原生的 DDL 语法支持我们使用 Antlr 实现了一套自定义的 DDL 语法。但是在 Flink1.10 版本上社区已经提供了原生的 DDL 支持而且与我们内部的语法差别较大。现在摆在我们面前有几条路可以选择
放弃内部语法的支持修改全部任务至新语法。违背了平滑迁移的初衷而且对已有用户学习成本高修改 Flink 内语法解析的模块sql-parser支持对内部语法的解析。实现较为复杂且不利于后续的版本升级在 sql-parser 之上封装一层语法转换层将原本的 SQL 解析提取有效信息后通过字符串拼接的方式组织成社区语法再运行。
最终我们选用了第三种方案这样可以最大限度的减少和引擎的耦合作为插件运行未来再有引擎升级完全可以复用现有的逻辑能够降低很多的开发成本。
例如我们在旧版本上使用 json-path 的库实现了 json 解析通过在建表语句里定义类似 $.status 的表达式表示如何提取此字段。 新版本上原生的 json 类型解析可以使用 ROW 类型来表示嵌套结构在转换为新语法的过程中将原本的表达是解析为树并构建出新的字段类型再使用计算列的方式提取出原始表中的字段确保表结构与之前一致。类型名称、配置属性也通过映射转换为社区语法。 3. 兼容性测试
最后是测试阶段需要进行完善的测试确保所有任务都能做到平滑升级。我们原本的计划是准备进行回归测试对已有的所有任务替换配置后进行回放但是在实际操作中有很多问题
测试流程过长一次运行可能需要数个小时。出现问题时不好定位可能发生在任务的整个生命周期的任何阶段。无法验证计算结果即新旧版本语义是否一致
所以我们按任务的提交流程分成多个阶段进行测试只有在当前阶段能够全部测试通过后后进入下一个阶段测试提前发现问题将问题定位范围缩小到当前阶段提高测试效率。 转换测试对所有任务进行转换测试结果符合预期抽象典型场景为单元测试。编译测试确保所有任务可以通过 TablePlanner 生成执行计划在编译成 JobGraph真正提交运行前结束。回归测试在测试环境对任务替换配置后进行回放确认任务可以提交运行对照测试对采样数据以文件的形式提交至新旧两个版本中运行对比结果是否完全一致因为部分任务结果不具有确定性所以使用旧版本连续运行 2 次筛选出确定性任务作为测试用例
四、引擎增强
除了对旧版本的兼容我们也结合了新版本的特性对引擎进行了增强。
1. Task-Load 指标
我们一直希望能精确衡量任务的负载状况使用反压指标指标只能粗略的判断任务的资源够或者不够。
结合新版的 Mailbox 线程模型所有互斥操作全部运行在 TaskThread 中只需统计出线程的占用时间就可以精确计算任务负载的百分比。
未来可以使用指标进行任务的资源推荐让任务负载维持在一个比较健康的水平。 2. SubTask 均衡调度
在 FLIP-6 后Flink 修改了资源调度模型移除了--container 参数slot 按需申请确保不会有闲置资源。但是这也导致了一个问题Source 的并发数常常是小于最大并发数的而 SubTask 调度是按 DAG 的拓扑顺序调度这样 SourceTask 就会集中在某些 TaskManager 中导致热点。 我们加入了最小 slot 数的配置保证在 Flink session 启动后立即申请相应数量的 slot且闲置时也不主动退出搭配 cluster.evenly-spread-out-slots 参数可以保证在 slot 数充足的情况下SubTask 会均匀分布在所有的 TaskManager 上。 3. 窗口函数增强
以滚动窗口为例 TUMBLE(time_attr, INTERVAL 1 DAY)窗口为一天时开始和结束时间固定为每天 0 点 -24 点无法做到生产每天 12 点-次日 12 点的窗口。
对于代码可以通过指定偏移量实现但是 SQL 目前还未实现通过增加参数 TUMBLE(time_attr, INTERVAL 1 DAY, TIME 12:00:00) 表示偏移时间为 12 小时。
还有另外一种场景比如统计一天的 UV同时希望展示当前时刻的计算结果例如每分钟触发窗口计算。对于代码开发的方式可以通过自定义 Trigger 的方式决定窗口的触发逻辑而且 Flink 也内置了一些 Tigger 实现比如 ContinuousTimeTrigger 就很适合这种场景。所以我们又在窗口函数里增加了一种可选参数代表窗口的触发周期TUMBLE(time_attr, INTERVAL 1 DAY, INTERVAL 1 MINUTES) 。
通过增加 offset 和 tiggger 周期参数TUMBLE(time_attr, size[,offset_time][,trigger_interval])拓展了 SQL 中窗口的使用场景类似上面的场景可以直接使用 SQL 开发而不需要使用代码的方式。
4. RexCall 结果复用
在很多 SQL 的使用场景里会多次使用上一个计算结果比如将 JSON 解析成 Map 并提取多个字段 。 虽然通过子查询看起来 json 解析只调用一次但是经过引擎的优化后通过结果表的投影 (Projection) 生成函数调用链 (RexCall)结果类似 这样会导致 json 解析的计算重复运行了3次即使使用视图分割成两步操作经过 Planner 的优化一样会变成上边的样子。
对于确定性 (isDeterministictrue) 的函数来说相同的输入一定代表相同的结果重复执行 3 次 json 解析其实是没有意义的如何优化才能实现对函数结果的复用呢
在代码生成时将 RexCall 生成的唯一标识Digest和变量符号的映射保存在 CodeGenContext 中如果遇到 Digest 相同的函数调用则可以复用已经存在的结果变量这样解析 JSON 只需要执行第一次之后就可以复用第一次的结果。
五、总结
通过几个月的努力新版本已经上线运行,并且作为 StreamSQL 的默认引擎任务重启后直接使用新版本运行。兼容性测试的通过率达到 99.9%可以基本做到对用户的透明升级。对于新接触 StreamSQL 用户可以使用社区 SQL 语法进行开发已有任务也可以修改 DML 部分语句来使用新特性。现在新版本已经支持了公司内许多业务场景例如公司实时数据仓库团队依托于新版本更强的表达能力和性能承接了多种多样的数据需求做到稳定运行且与离线口径保持一致。
版本升级不是我们的终点随着实时计算的发展公司内也有越来越多团队需要使用 Flink 引擎, 也向我们提出了更多的挑战例如与 Hive 的整合做到将结果直接写入 Hive 或直接使用 Flink 作为批处理引擎这些也是我们探索和发展的方向通过不断的迭代向用户提供更加简单好用的流计算服务。
五、总结
通过几个月的努力新版本已经上线运行,并且作为 StreamSQL 的默认引擎任务重启后直接使用新版本运行。兼容性测试的通过率达到 99.9%可以基本做到对用户的透明升级。对于新接触 StreamSQL 用户可以使用社区 SQL 语法进行开发已有任务也可以修改 DML 部分语句来使用新特性。现在新版本已经支持了公司内许多业务场景例如公司实时数据仓库团队依托于新版本更强的表达能力和性能承接了多种多样的数据需求做到稳定运行且与离线口径保持一致。
版本升级不是我们的终点随着实时计算的发展公司内也有越来越多团队需要使用 Flink 引擎, 也向我们提出了更多的挑战例如与 Hive 的整合做到将结果直接写入 Hive 或直接使用 Flink 作为批处理引擎这些也是我们探索和发展的方向通过不断的迭代向用户提供更加简单好用的流计算服务。
作者Alan
原文链接
本文为阿里云原创内容未经允许不得转载