如可建设淘宝链接网站,电子商务网站建设与管理a,大连在哪儿,wordpress表单提交 阿里云邮箱简介#xff1a; 顺丰基于 Flink 建设实时数仓的思路#xff0c;引入 Hudi On Flink 加速数仓宽表#xff0c;以及实时数仓平台化建设的实践。 本⽂由社区志愿者苗文婷整理#xff0c;内容源⾃顺丰科技大数据平台研发工程师龙逸尘在 Flink Forward Asia 2020 分享的《Flink…简介 顺丰基于 Flink 建设实时数仓的思路引入 Hudi On Flink 加速数仓宽表以及实时数仓平台化建设的实践。 本⽂由社区志愿者苗文婷整理内容源⾃顺丰科技大数据平台研发工程师龙逸尘在 Flink Forward Asia 2020 分享的《Flink 在顺丰的应用实践》主要分享内容为顺丰基于 Flink 建设实时数仓的思路引入 Hudi On Flink 加速数仓宽表以及实时数仓平台化建设的实践。分为以下 5 个部分 建设背景建设思路落地实践应用案例未来规划一、建设背景 顺丰是国内领先的快递物流综合服务商经过多年的发展顺丰使用大数据技术支持高质量的物流服务。以下是一票快件的流转过程可以看到从客户下单到最终客户收件的整个过程是非常长的其中涉及的一些处理逻辑也比较复杂。为了应对复杂业务的挑战顺丰进行了数据仓库的探索。 传统数仓主要分为离线和实时两个部分。
离线部分以固定的计算逻辑通过定时调度完成数据抽取清洗计算最后产出报表而实时部分则是需求驱动的用户需要什么就马上着手开发。
这种数仓架构在数据量小、对实时性要求不高的情况下运行得很好。然而随着业务的发展数据规模的扩大和实时需求的不断增长传统数仓的缺点也被放大了。
从业务指标的开发效率来看
实时指标采用的是需求驱动的、纵向烟囱式的开发模式需要用户手写 Flink 任务进行开发这种开发方式效率低门槛高输出的指标很难统一管理与复用。
从技术架构方面来看
离线和实时两套架构是不统一的开发方式、运维方式、元数据方面都存在差异。传统架构整体还是以离线为主实时为辅依赖离线 T1 调度导出报表这些调度任务通常都运行在凌晨导致凌晨时集群压力激增可能会导致报表的产出不稳定如果重要的报表产出有延迟相应的下游的报表产出也会出现延迟。这种以离线为主的架构无法满足精细化、实时化运营的需要。
从平台管理的角度来看
传统数仓的实时指标开发是比较粗放的没有 Schema 的规范没有元数据的管理也没有打通实时和离线数据之间的联系。
为了解决传统数仓的问题顺丰开始了实时数仓的探索。实时数仓和离线数仓实际上解决的都是相同的业务问题最大的区别就在于时效性。
离线数仓有小时级或天级的延迟而实时数仓则是秒级或分钟级的延迟。
其他特性比如数据源、数据存储以及开发方式都是比较相近的。因此我们希望
用户能从传统数仓平滑迁移到实时数仓保持良好的体验同时统一实时和离线架构加快数据产出减少开发的撕裂感加强平台治理降低用户使用门槛提高开发效率也是我们的目标。
二、建设思路
经过总结我们提炼出以下 3 个实时数仓的建设思路。首先是通过统一数仓标准、元数据以及开发流程使得用户达到开发体验上的批流统一。随后引入 Hudi 加速数仓宽表基于 Flink SQL 建设我们的实时数仓。最后是加强平台治理进行数仓平台化建设实现数据统一接入、统一开发、以及统一的元数据管理。 1. 批流统一的实时数仓
建设批流统一的实时数仓可以分为以下 3 个阶段 1.1 统一数仓规范 首先无规矩不成方圆建设数仓必须有统一的数仓规范。统一的数仓规范包括以下几个部分
设计规范命名规范模型规范开发规范存储规范流程规范
统一好数仓规范之后开始数仓层级的划分将实时和离线统一规划数仓层级分为 ODS、DWD、DWS、ADS 层。 1.2 统一元数据
基于以上统一的数仓规范和层级划分模型可以将实时和离线的元数据进行统一管理。下游的数据治理过程比如数据字典、数据血缘、数据质量、权限管理等都可以达到统一。这种统一可以沉淀实时数仓的建设成果使数仓能更好的落地实施。 1.3 基于 SQL 统一开发流程 开发人员都知道使用 DataStream API 开发 Flink 任务是比较复杂的。在数据量比较大的情况下如果用户使用 API 不规范或者开发能力不足可能会导致性能和稳定性的问题。如果我们能将实时开发的过程统一到 SQL 上就可以达到减少用户开发成本、学习成本以及运维成本的目的。
之前提到过我们已经统一了实时和离线的元数据那么就可以将上图左边的异构数据源和数据存储抽象成统一的 Table 然后使用 SQL 进行统一的数仓开发也就是将离线批处理、实时流处理以及 OLAP 查询统一 SQL 化。
1.4 实时数仓方案对比
完成了数仓规范、元数据、开发流程的统一之后我们开始探索数仓架构的具体架构方案。业界目前的主流是 Lambda 架构和 Kappa 架构。
Lambda 架构
Lambda 架构是在原有离线数仓的基础上将对实时性要求比较高的部分剥离出来增加了一个实时速度层。Lambda 架构的缺点是需要维护实时和离线两套架构和两套开发逻辑维护成本比较高另外两套架构带来的资源消耗也是比较大的。 Kappa 架构
为了应对 Lambda 架构的缺陷Jay Kreps 提出了 Kappa 架构Kappa 架构移除了原有的离线部分使用纯流式引擎开发。 Kappa 架构的最大问题是流数据重放处理时的吞吐能力达不到批处理的级别导致重放时产生一定的延时。 实时数仓方案对比与实际需求
在真实的生产实践中并不是一定要严格遵循规范的 Lambda 架构或 Kappa 架构可以是两者的混合。比如大部分指标使用流式引擎开发少部分重要的指标使用批处理开发并增加数据校对的过程。
在顺丰的业务场景中并非所有用户都需要纯实时的表许多用户的报表还是依赖离线 T1 调度产出的宽表如果我们能够加速宽表的产出那么其他报表的时效性也能相应地得到提高。
另外这个离线 T1 调度产出的宽表需要聚合 45 天内多个数据源的全量数据不管是 Lambda 架构还是 Kappa 架构都需要对数据进行全量聚合如果能够直接更新宽表就可以避免全量重新计算大大降低资源消耗和延时。 2. 引入 Hudi 加速宽表
之前说过维护 Lambda 架构的复杂性在于需要同时维护实时和离线两套系统架构。而对于这个缺点我们可以通过批流统一来克服。
经过权衡我们决定改造原有 Lambda 架构通过加速它的离线部分来建设数仓宽表。此时就需要一个工具来实时快速的更新和删除 Hive 表支持 ACID 特性支持历史数据的重放。基于这样的需求我们调研了市面上的三款开源组件Delta Lake、Iceberg、Hudi最后选择 Hudi 来加速宽表。
2.1 Hudi 关键特性 Hudi 的关键特性包括可回溯历史数据支持在大规模数据集中根据主键更新删除数据支持数据增量消费支持 HDFS 小文件压缩。这些特性恰好能满足我们的需求。
2.2 引入 Hudi 加速宽表 引入 Hudi 有两种方式加速数仓。首先在 ODS 层引入 Hudi 实现实时数据接入将 ODS 层 T1 的全量数据抽取改成 T0 的实时接入从数据源头实现 Hive 表的加速。
另外使用 Flink 消费 Kafka 中接入的数据进行清洗聚合通过 Hudi 增量更新 DWD 层的 Hive 宽表将宽表从离线加速成准实时。
2.3 构建实时数仓宽表示例 这里通过一个例子介绍如何构建实时数仓宽表。
假设运单宽表由运单表订单表和用户表组成分别包含运单号、运单状态、订单号、订单状态、用户 ID、用户名等字段。
首先将运单表数据插入宽表运单号作为宽表主键并且将运单号和订单号的映射存入临时表。当订单表数据更新后首先关联用户维表获取用户名再从临时表中获取对应运单号。最后根据运单号将订单表数据增量插入宽表以更新宽表状态。
3. 最终架构
引入 Hudi 后基于 Lambda 架构我们定制化的实时数仓最终架构如下图所示。实时速度层通过 CDC 接入数据到 Kafka采用 Flink SQL 处理 Kafka 中的数据并将 ODS 层 Kafka 数据清洗计算后通过 Hudi 准实时更新 DWD 层的宽表以加速宽表的产出。离线层采用 Hive 存储及处理。最后由 ADS 层提供统一的数据存储与服务。 除了制定数仓标准和构建数仓架构我们还需要构建数仓平台来约束开发规范和流程提升开发效率提高用户体验。
站在数据开发人员的角度我们不仅要提供快速的数据接入能力还需要关注开发效率以及统一的元数据治理。因此可以基于 Table 和 SQL 抽象对数据接入、数据开发、元数据管理这三个主要功能进行平台化为实时数仓用户提供统一、便捷、高效的体验。 三、落地实践
1. Hudi On Flink
顺丰是最早将 Hudi On Flink 引入生产实践的公司顺丰内部使用版本基于 T3 出行的内部分支进行了许多修改和完善大大提升了 Hudi on Flink 的性能和稳定性。
1.1 实现原理 这里介绍下 Hudi On Flink 的原理。Hudi 原先与 Spark 强绑定它的写操作本质上是批处理的过程。为了解耦 Spark 并且统一 API Hudi On Flink 采用的是在 Checkpoint 期间攒批的机制在 Checkpoint 触发时将这一批数据Upsert 到 Hive根据 Upsert 结果统一提交或回滚。
Hudi On Flink 的实现流可以分解为几个步骤
首先使用 Flink 消费 Kafka 中的 Binlog 类型数据将其转化为 Hudi Record。Hudi Record 进入 InstantTime Generator该 Operator 并不对数据做任何处理只负责转发数据。它的作用是每次 Checkpoint 时在 Hudi 的 Timeline 上生成全局唯一且递增的 Instant并下发。随后数据进入 Partitioner 根据分区路径以及主键进行二级分区。分区后数据进入 File Indexer 根据主键找到在 HDFS 上需要更新的对应文件将这个对应关系按文件 id 进行分桶并下发到下游的 WriteProcessOperator 。WriteProcessOperator 在 Checkpoint 期间会积攒一批数据当 Checkpoint 触发时通过 Hudi 的 Client 将这批数据 Upsert 到 HDFS 中并且将 Upsert 的结果下发到下游的 CommitSink 。CommitSink 会收集上游所有算子的 upsert 结果如果成功的个数和上游算子的并行度相等时就认为本次 commit 成功并将 Instant 的状态设置为 success 否则就认为本次 commit 失败并进行回滚。
1.2 优化 顺丰基于社区代码对 Hudi On Flink 进行了一些优化主要目的是增强性能和提升稳定性。
二级分区
对于增量写入的场景大部分的数据都写入当天的分区可能会导致数据倾斜。因此我们使用分区路径和主键 id 实现二级分区避免攒批过程中单个分区数据过多解决数据倾斜问题。
文件索引
Hudi 写入过程的瓶颈在于如何快速找到记录要写入的文件并更新。为此 Hudi 提供了一套索引机制该机制会将一个记录的键 分区路径的组合映射到一个文件 ID. 这个映射关系一旦记录被写入文件组就不会再改变。Hudi 当前提供了 HBase、Bloom Filter 和内存索引 3 种索引机制。然而经过生产实践HBase 索引需要依赖外部的组件内存索引可能存在 OOM 的问题Bloom Filter 存在一定的误算率。我们研究发现在 Hudi 写入的 parquet 文件中存在一个隐藏的列通过读取这个列可以拿到文件中所有数据的主键因此可以通过文件索引获取到数据需要写入的文件路径并保存到 Flink 算子的 state 中也避免了外部依赖和 OOM 的问题。
索引写入分离
原先 Hudi 的 Upsert 过程写入和索引的过程是在一个算子中的算子的并行度只由分区路径来决定。我们将索引和写入的过程进行分离这样可以提高 Upsert 算子的并行度提高写入的吞吐量。
故障恢复
最后我们将整个流程的状态保存到 Flink State 中设计了一套基于 State 的故障恢复机制可以保证端到端的 exactly-once 语义。
2. 实时数仓的产品化
在实时数仓产品化方面我们也做了一些工作。提供了包括数据接入、元数据管理、数据处理在内的数仓开发套件。 2.1 实时数据接入 实时数据接入采用的是表单式的流程接入方式屏蔽了复杂的底层技术用户只需要经过简单的操作就可以将外部数据源接入到数仓体系。以 MySQL 为例用户只需要选择 MySQL 数据源平台就会自动抽取并展示 Schema 用户确认 Schema 之后就会将 Schema 插入到平台元数据中。 随后用户选择有权限的集群设置 Hive 表的主键 ID 和分区字段提交申请之后平台就会自动生成 Flink 任务抽取数据到 Kafka 并自动落入 Hive 表中。对数据库类型的数据源还支持分库分表功能将分库分表的业务数据写入 ODS 层的同一张表。另外也支持采集主从同步的数据库从从库中查询存量数据主库拉取 Binlog在减轻主库压力的同时降低数据同步延迟。
2.2 实时元数据更新 实时元数据更新的过程还是以 MySQL 为例。CDC Source 会抽取数据库中的 Binlog 区分 DDL 和 DML 语句分别处理DDL 语句会上报到元数据中心DML 语句经过转化变成 avro 格式的 Binlog 数据发送到 Kafka 如果下游有写入到 Hive 的需求就消费 Kafka 的数据通过 Hudi Sink 写入到 Hive 。
2.3 数据资产管理体系 基于实时数据的统一接入并将其与现有的离线数仓结合我们构建了数据资产管理体系。包括规范数仓标准统一管理元数据提升数据质量保障数据安全盘点数据资产。
3. 实时计算平台架构 有了数据统一接入的基础和数据资产资产管理体系的保驾护航我们还需要一个数据开发套件将整个数据开发的过程整合到实时计算平台。实时计算平台的最底层是数据接入层支持 Kafka 和 Binlog 等数据源。上一层是数据存储层提供了 Kafka 、ES、HBase、Hive、ClickHouse、MySQL 等存储组件。支持 JStorm 、Spark Streaming、Flink 计算引擎。并进行了框架封装和公共组件打包。
3.1 多种开发模式 - JAR DRAG 实时计算平台提供了多种开发模式供不同用户选择。以 Flink 为例Flink JAR 模式由用户编写 Flink 任务代码打成 jar 包上传到平台满足高级用户的需求。Flink DRAG 模式则是图形化的拖拽式开发由平台封装好公共组件之后用户只需要拖拽公共组件将其组装成一个 Flink 任务提交至集群运行。
3.2 多种开发模式 - SQL 实时计算平台同样提供 SQL 开发模式支持手动建表根据元数据自动识别表及设置表属性。支持创建 UDF、自动识别 UDF、执行 DML 等。
3.3 任务管控 在任务管控方面实时计算平台尽量简化任务的配置屏蔽了一些复杂的配置。用户开发完成之后只需要选择集群填写资源就能将任务提交到集群中运行。对每个任务平台还提供了历史版本控制能力。 当用户操作任务时平台会自动解析任务的配置根据不同的组件提供不同的选项。比如选择了 Kafka 数据源启动的时候可以选择从上次消费位置、最早位置、最新位置或指定位置启动。 任务恢复方面用户可以选择从 Savepoint 启动已停止的 Flink 任务便于快速恢复历史状态。
3.4 任务运维 对实时任务来说任务运维是一个难点也是一个痛点。平台提供了日志查询功能采集历史的启动日志和任务运行日志用户可以方便的进行对比和查询。
当任务启动之后平台会自动采集并上报任务的指标用户可以根据这些指标自定义告警配置当告警规则被触发时平台会通过各种方式告警到用户。最后平台提供了指标的实时监控看板当然用户也可以自行在 Grafana 中配置监控看板。
通过采集日志、指标以及监控告警以及过往的历史经验我们实现了一个智能的机器客服可以实现任务故障的一些自助诊断。这些举措大大降低了任务的运维成本减轻平台研发人员的压力。
3.5 Flink 任务稳定性保障 实时作业运维最关注的是稳定性在保障 Flink 任务稳定性上我们也有一些实践。首先提供多种异常检测和监控告警的功能方便用户快速的发现问题。每个任务都会定时的生成任务快照保存任务历史的 Savepoint以方便任务回滚和故障恢复。任务可能会由于某种异常原因导致任务失败任务失败之后会被平台重新拉新并指定从上次失败的位置开始重新消费。
基于 Zookeeper 的高可用机制以保障 JobManager 的可用性。支持多集群、多机房的容灾切换能力可以将任务一键切换至容灾集群上运行。实现了一套实时离线集群隔离、队列管理的资源隔离系统。
四、应用案例 以业务宽表计算为例需要获取 45 天内的多个数据源的数据进行计算聚合。如果使用离线数仓大概需要 3000 核的 CPU、12000G 的内存耗时 120 150 min 完成计算处理的数据量大概为 450T。如果使用实时数仓大概需要 2500 核的 CPU、1400G 的内存更新宽表大概有 25 min 的延时处理的数据量约为 18T。
五、未来规划
顺丰的实时数仓建设取得了一些成果但未来仍需要进行不断的优化。
1. 增强 SQL 能力 首先希望能够支持更多 SQL 的语法和特性支持更多可用的连接器以及实现 SQL 任务的自动调优等。
2. 精细化资源管理 其次基于 Flink On Kubernets 、任务的自动弹性扩缩容Task 级别的细粒度资源调度实现精细化的资源调度管理使得 Flink 任务达到全面的弹性化和云原生化。
3. 流批一体 最后希望能够实现流批一体通过统一的高度兼容性的 SQL 经过 SQL 解析以及引擎的适配通过 Flink 统一的引擎去处理流和批。
原文链接
本文为阿里云原创内容未经允许不得转载。