linux主机做网站,wordpress 页面 404,成都网站建设开发,网站建设引流刘贺稳1简介#xff1a;阿里云EMR 自2020年推出 Remote Shuffle Service(RSS)以来#xff0c;帮助了诸多客户解决 Spark 作业的性能、稳定性问题#xff0c;并使得存算分离架构得以实施。为了更方便大家使用和扩展#xff0c;RSS 在2022年初开源(https://github.com/alibaba/Remot…简介阿里云EMR 自2020年推出 Remote Shuffle Service(RSS)以来帮助了诸多客户解决 Spark 作业的性能、稳定性问题并使得存算分离架构得以实施。为了更方便大家使用和扩展RSS 在2022年初开源(https://github.com/alibaba/RemoteShuffleService)欢迎各路开发者共建: )
阿里云RemoteShuffleService 新功能AQE 和流控
阿里云EMR 自2020年推出 Remote Shuffle Service(RSS) 以来帮助了诸多客户解决 Spark 作业的性能、稳定性问题并使得存算分离架构得以实施。为了更方便大家使用和扩展RSS 在2022年初开源(https://github.com/alibaba/RemoteShuffleService)欢迎各路开发者共建: ) RSS的整体架构请参考[1]本文将介绍 RSS 最新的两个重要功能支持 Adaptive Query Execution(AQE)以及流控。
RSS 支持 AQE
AQE 简介
自适应执行(Adaptive Query Execution AQE)是 Spark3 的重要功能[2]通过收集运行时 Stats来动态调整后续的执行计划从而解决由于 Optimizer 无法准确预估 Stats导致生成的执行计划不够好的问题。AQE 主要有三个优化场景: Partition 合并(Partition Coalescing), Join 策略切换(Switch Join Strategy)以及倾斜 Join 优化(Optimize Skew Join)。这三个场景都对 Shuffle 框架的能力提出了新的需求。
Partition 合并
Partition 合并的目的是尽量让 reducer 处理的数据量适中且均匀做法是首先 Mapper按较多的 Partition 数目进行 Shuffle WriteAQE 框架统计每个 Partition 的 Size若连续多个 Partition 的数据量都比较小则将这些 Partition 合并成一个交由一个 Reducer 去处理。过程如下所示。 由上图可知优化后的 Reducer2 需读取原属于 Reducer2-4 的数据对 Shuffle 框架的需求是 ShuffleReader 需要支持范围 Partition:
def getReader[K, C](handle: ShuffleHandle,startPartition: Int,endPartition: Int,context: TaskContext): ShuffleReader[K, C]Join 策略切换
Join 策略切换的目的是修正由于 Stats 预估不准导致 Optimizer 把本应做的 Broadcast Join 错误的选择了 SortMerge Join 或 ShuffleHash Join。具体而言在 Join 的两张表做完 Shuffle Write 之后AQE 框架统计了实际大小若发现小表符合 Broadcast Join 的条件则将小表 Broadcast 出去跟大表的本地 Shuffle 数据做 Join。流程如下 Join 策略切换有两个优化1. 改写成 Broadcast Join; 2. 大表的数据通过LocalShuffleReader 直读本地。其中第2点对 Shuffle 框架提的新需求是支持 Local Read。
倾斜Join优化
倾斜Join优化的目的是让倾斜的 Partition 由更多的 Reducer 去处理从而避免长尾。具体而言在 Shuffle Write 结束之后AQE 框架统计每个 Partition 的 Size接着根据特定规则判断是否存在倾斜若存在则把该 Partition 分裂成多个 Split每个 Split 跟另外一张表的对应 Partition 做 Join。如下所示。 Partiton 分裂的做法是按照 MapId 的顺序累加他们 Shuffle Output 的 Size累加值超过阈值时触发分裂。对 Shuffle 框架的新需求是 ShuffleReader 要能支持范围 MapId。综合 Partition 合并优化对范围 Partition 的需求ShuffleReader 的接口演化为:
def getReader[K, C](handle: ShuffleHandle,startMapIndex: Int,endMapIndex: Int,startPartition: Int,endPartition: Int,context: TaskContext,metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C]RSS 架构回顾
RSS 的核心设计是 Push Shuffle Partition 数据聚合即不同的 Mapper 把属于同一个 Partition 的数据推给同一个 Worker 做聚合Reducer 直读聚合后的文件。如下图所示。 在核心设计之外RSS 还实现了多副本全链路容错Master HA磁盘容错自适应Pusher滚动升级等特性详见[1]。
RSS 支持 Partition 合并
Partition 合并对 Shuffle 框架的需求是支持范围 Partition在 RSS 中每个 Partition 对应着一个文件因此天然支持如下图所示。 RSS 支持 Join 策略切换
Join 策略切换对 Shuffle 框架的需求是能够支持 LocalShuffleReader。由于 RSS 的 Remote 属性数据存放在 RSS 集群仅当 RSS 和计算集群混部的场景下才会存在在本地因此暂不支持 Local Read(将来会优化混部场景并加以支持)。需要注意的是尽管不支持 Local Read但并不影响 Join 的改写RSS 支持 Join 改写优化如下图所示。 RSS 支持 Join 倾斜优化
在 AQE 的三个场景中RSS 支持 Join 倾斜优化是最为困难的一点。RSS 的核心设计是 Partition 数据聚合目的是把 Shuffle Read 的随机读转变为顺序读从而提升性能和稳定性。多个 Mapper 同时推送给 RSS WorkerRSS 在内存聚合后刷盘因此 Partition 文件中来自不同 Mapper 的数据是无序的如下图所示。 Join 倾斜优化需要读取范围 Map例如读 Map1-2的数据常规的做法有两种
读取完整文件并丢弃范围之外的数据。引入索引文件记录每个 Block 的位置及所属 MapId仅读取范围内的数据。
这两种做法的问题显而易见。方法1会导致大量冗余的磁盘读方法2本质上回退成了随机读丧失了 RSS 最核心的优势并且创建索引文件成为通用的 Overhead即使是针对非倾斜的数据( Shuffle Write 过程中难以准确预测是否存在倾斜)。
为了解决以上两个问题我们提出了新的设计主动 Split Sort On Read。
主动Split
倾斜的 Partition 大概率 Size 非常大极端情况会直接打爆磁盘即使在非倾斜场景出现大 Partition 的几率依然不小。因此从磁盘负载均衡的角度监控 Partition 文件的 Size 并做主动 Split (默认阈值256m)是非常必要的。
Split 发生时RSS 会为当前 Partition 重新分配一对 Worker(主副本)后续数据将推给新的 Worker。为了避免 Split 对正在运行的 Mapper 产生影响我们提出了 Soft Split 的方法即当触发 Split 时RSS 异步去准备新的 WorkerReady 之后去热更新 Mapper 的 PartitionLocation 信息因此不会对 Mapper 的 PushData 产生任何干扰。整体流程如下图所示。 Sort On Read
为了避免随机读的问题RSS 采用了 Sort On Read 的策略。具体而言File Split 的首次 Range 读会触发排序(非 Range 读不会触发)排好序的文件连同其位置索引写回磁盘。后续的 Range 读即可保证是顺序读取。如下图所示。 为了避免多个 Sub-Reducer 等待同一个 File Split 的排序我们打散了各个 Sub-Reducer 读取 Split 的顺序如下图所示。 Sort 优化
Sort On Read 可以有效避免冗余读和随机读但需要对 Split File(256m)做排序本节讨论排序的实现及开销。文件排序包括3个步骤读文件对 MapId 做排序写文件。RSS 的 Block 默认256kBlock 的数量大概是1000因此排序的过程非常快主要开销在文件读写。整个排序过程大致有三种方案
预先分配文件大小的内存文件整体读入解析并排序 MapId按 MapId 顺序把 Block 写回磁盘。不分配内存Seek 到每个 Block 的位置解析并排序 MapId按 MapId 顺序把原文件的 Block transferTo 新文件。分配小块内存(如256k)顺序读完整个文件并解析和排序MapId按MapId顺序把原文件的Block transferTo新文件。
从 IO 的视角乍看之下方案1通过使用足量内存不存在顺序读写方案2存在随机读和随机写方案3存在随机写直观上方案1性能更好。然而由于 PageCache 的存在方案3在写文件时原文件大概率缓存在 PageCache 中因此实测下来方案3的性能更好如下图所示。 同时方案3无需占用进程额外内存故 RSS 采用方案3的算法。我们同时还测试了 Sort On Read 跟上述的不排序、仅做索引的随机读方法的对比如下图所示。 整体流程
RSS 支持 Join 倾斜优化的整体流程如下图所示。 RSS流控
流控的主要目的是防止 RSS Worker 内存被打爆。流控通常有两种方式
Client 在每次 PushData 前先向 Worker 预留内存预留成功才触发 Push。Worker 端反压。
由于 PushData 是非常高频且性能关键的操作若每次推送都额外进行一次 RPC 交互则开销太大因此我们采用了反压的策略。以 Worker 的视角流入数据有两个源
Client 推送的数据主副本发送的数据
如下图所示Worker2 既接收来自 Mapper 推送的 Partition3 的数据也接收 Worker1发送的 Partition1 的副本数据同时会把 Partition3 的数据发给对应的从副本。 其中来自 Mapper 推送的数据当且仅当同时满足以下条件时才会释放内存
Replication 执行成功数据写盘成功
来自主副本推送的数据当且仅当满足以下条件时才会释放内存
数据写盘成功
我们在设计流控策略时不仅要考虑限流(降低流入的数据)更要考虑泄流(内存能及时释放)。具体而言高水位我们定义了两档内存阈值(分别对应85%和95%内存使用)低水位只有一档(50%内存使用)。达到高水位一档阈值时触发流控暂停接收 Mapper 推送的数据同时强制刷盘从而达到泄流的目标。仅限制来自 Mapper 的流入并不能控制来自主副本的流量因此我们定义了高水位第二档达到此阈值时将同时暂停接收主副本发送的数据。当水位低于低水位后恢复正常状态。整体流程如下图所示。 性能测试
我们对比了 RSS 和原生的 External Shufle Service(ESS) 在 Spark3.2.0 开启 AQE 的性能。RSS 采用混部的方式没有额外占用任何机器资源。此外RSS 所使用的内存为8g仅占机器内存的2.3%(机器内存352g)。具体环境如下。
测试环境
硬件
header 机器组 1x ecs.g5.4xlarge
worker 机器组 8x ecs.d2c.24xlarge96 CPU352 GB12x 3700GB HDD。
Spark AQE 相关配置:
spark.sql.adaptive.enabled true
spark.sql.adaptive.coalescePartitions.enabled true
spark.sql.adaptive.coalescePartitions.initialPartitionNum 1000
spark.sql.adaptive.skewJoin.enabled true
spark.sql.adaptive.localShuffleReader.enabled falseRSS 相关配置:
RSS_MASTER_MEMORY2g
RSS_WORKER_MEMORY1g
RSS_WORKER_OFFHEAP_MEMORY7gTPCDS 10T测试集
我们测试了10T的 TPCDSE2E 来看ESS 耗时11734sRSS 单副本/两副本分别耗时8971s/10110s分别比 ESS 快了23.5%/13.8%如下图所示。我们观察到 RSS 开启两副本时网络带宽达到上限这也是两副本比单副本低的主要因素。 具体每个 Query 的时间对比如下: 原文链接
本文为阿里云原创内容未经允许不得转载。