网站开发我嵌入式开发,个人身份调查网站,个人主题网站设计论文,青海省建设厅网站姚宽一王振华#xff0c;趣头条大数据总监#xff0c;趣头条大数据负责人曹佳清#xff0c;趣头条大数据离线团队高级研发工程师#xff0c;曾就职于饿了么大数据INF团队负责存储层和计算层组件研发#xff0c;目前负责趣头条大数据计算层组件Spark的建设范振#xff0c;花名辰…王振华趣头条大数据总监趣头条大数据负责人曹佳清趣头条大数据离线团队高级研发工程师曾就职于饿了么大数据INF团队负责存储层和计算层组件研发目前负责趣头条大数据计算层组件Spark的建设范振花名辰繁阿里云计算平台EMR高级技术专家目前主要关注开源大数据技术以及云原生技术。
1. 业务场景与现状
趣头条是一家依赖大数据的科技公司在2018-2019年经历了业务的高速发展主App和其他创新App的日活增加了10倍以上相应的大数据系统也从最初的100台机器增加到了1000台以上规模。多个业务线依赖于大数据平台展开业务大数据系统的高效和稳定成了公司业务发展的基石在大数据的架构上我们使用了业界成熟的方案存储构建在HDFS上、计算资源调度依赖Yarn、表元数据使用Hive管理、用Spark进行计算具体如图1所示 图1 趣头条离线大数据平台架构图 其中Yarn集群使用了单一大集群的方案HDFS使用了联邦的方案同时基于成本因素HDFS和Yarn服务在ECS上进行了DataNode和NodeManager的混部。 在趣头条每天有6W的Spark任务跑在Yarn集群上每天新增的Spark任务稳定在100左右公司的迅速发展要求需求快速实现积累了很多治理欠债种种问题表现出来集群稳定性需要提升其中Shuffle的稳定性越来越成为集群的桎梏亟需解决。
2. 当前大数据平台的挑战与思考
近半年大数据平台主要的业务指标是降本增效一方面业务方希望离线平台每天能够承载更多的作业另一方面我们自身有降本的需求如何在降本的前提下支撑更多地业务量对于每个技术人都是非常大地挑战。熟悉Spark的同学应该非常清楚在大规模集群场景下Spark Shuffle在实现上有比较大的缺陷体现在以下的几个方面
Spark Shuffle Fetch过程存在大量的网络小包现有的External Shuffle Service设计并没有非常细致的处理这些RPC请求大规模场景下会有很多connection reset发生导致FetchFailed从而导致stage重算。Spark Shuffle Fetch过程存在大量的随机读大规模高负载集群条件下磁盘IO负载高、CPU满载时常发生极容易发生FetchFailed从而导致stage重算。重算过程会放大集群的繁忙程度抢占机器资源导致恶性循环严重SLA完不成需要运维人员手动将作业跑在空闲的Label集群。计算和Shuffle过程架构不能拆开不能把Shuffle限定在指定的集群内不能利用部分SSD机器。M*N次的shuffle过程对于10K mapper5K reducer级别的作业基本跑不完。NodeManager和Spark Shuffle Service是同一进程Shuffle过程太重经常导致NodeManager重启从而影响Yarn调度稳定性。
以上的这些问题对于Spark研发同学是非常痛苦的好多作业每天运行时长方差会非常大而且总有一些无法完成的作业要么业务进行拆分要么跑到独有的Yarn集群中。除了现有面临的挑战之外我们也在积极构建下一代基础架构设施随着云原生Kubernetes概念越来越火Spark社区也提供了Spark on Kubernetes版本相比较于Yarn来说Kubernetes能够更好的利用云原生的弹性提供更加丰富的运维、部署、隔离等特性。但是Spark on Kubernetes目前还存在很多问题没有解决包括容器内的Shuffle方式、动态资源调度、调度性能有限等等。我们针对Kubernetes在趣头条的落地主要有以下几个方面的需求
实时集群、OLAP集群和Spark集群之前都是相互独立的怎样能够将这些资源形成统一大数据资源池。通过Kubernetes的天生隔离特性更好的实现离线业务与实时业务混部达到降本增效目的。公司的在线业务都运行在Kubernetes集群中如何利用在线业务和大数据业务的不同特点进行错峰调度达成ECS的总资源量最少。希望能够基于Kubernetes来包容在线服务、大数据、AI等基础架构做到运维体系统一化。
因为趣头条的大数据业务目前全都部署在阿里云上阿里云EMR团队和趣头条的大数据团队进行了深入技术共创共同研发了Remote Shuffle Service以下简称RSS旨在解决Spark on Yarn层面提到的所有问题并为Spark跑在Kubernetes上提供Shuffle基础组件。
3. Remote Shuffle Service设计与实现
3.1 Remote Shuffle Service的背景
早在2019年初我们就关注到了社区已经有相应的讨论如SPARK-25299。该Issue主要希望解决的问题是在云原生环境下Spark需要将Shuffle数据写出到远程的服务中。但是我们经过调研后发现Spark 3.0之前的master分支只支持了部分的接口而没有对应的实现。该接口主要希望在现有的Shuffle代码框架下将数据写到远程服务中。如果基于这种方式实现比如直接将Shuffle以流的方式写入到HDFS或者Alluxio等高速内存系统会有相当大的性能开销趣头条也做了一些相应的工作并进行了部分的Poc性能与原版Spark Shuffle实现相差特别多最差性能可下降3倍以上。同时我们也调研了一部分其他公司的实现方案例如Facebook的Riffle方案以及LinkedIn开源的Magnet这些实现方案是首先将Shuffle文件写到本地然后在进行Merge或者Upload到远程的服务上这和后续我们的Kubernetes架构是不兼容的因为Kubernetes场景下本地磁盘Hostpath或者LocalPV并不是一个必选项而且也会存在隔离和权限的问题。 基于上述背景我们与阿里云EMR团队共同开发了Remote Shuffle Service。RSS可以提供以下的能力完美的解决了Spark Shuffle面临的技术挑战为我们集群的稳定性和容器化的落地提供了强有力的保证主要体现在以下几个方面
高性能服务器的设计思路不同于Spark原有Shuffle ServiceRPC更轻量、通用和稳定。两副本机制能够保证的Shuffle fetch极小概率低于0.01%失败。合并shuffle文件从M*N次shuffle变成N次shuffle顺序读HDD磁盘会显著提升shuffle heavy作业性能。减少Executor计算时内存压力避免map过程中Shuffle Spill。计算与存储分离架构可以将Shuffle Service部署到特殊硬件环境中例如SSD机器可以保证SLA极高的作业。完美解决Spark on Kubernetes方案中对于本地磁盘的依赖。
3.2 Remote Shuffle Service的实现
3.2.1 整体设计
Spark RSS架构包含三个角色: Master, Worker, Client。Master和Worker构成服务端Client以不侵入的方式集成到Spark ShuffleManager里RssShuffleManager实现了ShuffleManager接口。
Master的主要职责是资源分配与状态管理。Worker的主要职责是处理和存储Shuffle数据。Client的主要职责是缓存和推送Shuffle数据。
整体流程如下所示(其中ResourceManager和MetaService是Master的组件)如图2。 图2 RSS整体架构图
3.2.2 实现流程
下面重点来讲一下实现的流程
RSS采用Push Style的shuffle模式每个Mapper持有一个按Partition分界的缓存区Shuffle数据首先写入缓存区每当某个Partition的缓存满了即触发PushData。Driver先和Master发生StageStart的请求Master接受到该RPC后会分配对应的Worker Partition并返回给DriverShuffle Client得到这些元信息后进行后续的推送数据。Client开始向主副本推送数据。主副本Worker收到请求后把数据缓存到本地内存同时把该请求以Pipeline的方式转发给从副本从而实现了2副本机制。为了不阻塞PushData的请求Worker收到PushData请求后会以纯异步的方式交由专有的线程池异步处理。根据该Data所属的Partition拷贝到事先分配的buffer里若buffer满了则触发flush。RSS支持多种存储后端包括DFS和Local。若后端是DFS则主从副本只有一方会flush依靠DFS的双副本保证容错若后端是Local则主从双方都会flush。在所有的Mapper都结束后Driver会触发StageEnd请求。Master接收到该RPC后会向所有Worker发送CommitFiles请求Worker收到后把属于该Stage buffer里的数据flush到存储层close文件并释放buffer。Master收到所有响应后记录每个partition对应的文件列表。若CommitFiles请求失败则Master标记此Stage为DataLost。在Reduce阶段reduce task首先向Master请求该Partition对应的文件列表若返回码是DataLost则触发Stage重算或直接abort作业。若返回正常则直接读取文件数据。
总体来讲RSS的设计要点总结为3个层面
采用PushStyle的方式做shuffle避免了本地存储从而适应了计算存储分离架构。按照reduce做聚合避免了小文件随机读写和小数据量网络请求。做了2副本提高了系统稳定性。
3.2.3 容错
对于RSS系统容错性是至关重要的我们分为以下几个维度来实现 PushData失败 当PushData失败次数(Worker挂了网络繁忙CPU繁忙等)超过MaxRetry后Client会给Master发消息请求新的Partition Location此后本Client都会使用新的Location地址该阶段称为Revive。若Revive是因为Client端而非Worker的问题导致则会产生同一个Partition数据分布在不同Worker上的情况Master的Meta组件会正确处理这种情形。若发生WorkerLost则会导致大量PushData同时失败此时会有大量同一Partition的Revive请求打到Master。为了避免给同一个Partition分配过多的LocationMaster保证仅有一个Revive请求真正得到处理其余的请求塞到pending queue里待Revive处理结束后返回同一个Location。 Worker宕机 当发生WorkerLost时对于该Worker上的副本数据Master向其peer发送CommitFile的请求然后清理peer上的buffer。若Commit Files失败则记录该Stage为DataLost若成功则后续的PushData通过Revive机制重新申请Location。 数据去重 Speculation task和task重算会导致数据重复。解决办法是每个PushData的数据片里编码了所属的mapIdattemptId和batchId并且Master为每个map task记录成功commit的attemtpId。read端通过attemptId过滤不同的attempt数据并通过batchId过滤同一个attempt的重复数据。 多副本 RSS目前支持DFS和Local两种存储后端。在DFS模式下ReadPartition失败会直接导致Stage重算或abort job。在Local模式ReadPartition失败会触发从peer location读若主从都失败则触发Stage重算或abort job。
3.2.4 高可用
大家可以看到RSS的设计中Master是一个单点虽然Master的负载很小不会轻易地挂掉但是这对于线上稳定性来说无疑是一个风险点。在项目的最初上线阶段我们希望可以通过SubCluster的方式进行workaround即通过部署多套RSS来承载不同的业务这样即使RSS Master宕机也只会影响有限的一部分业务。但是随着系统的深入使用我们决定直面问题引进高可用Master。主要的实现如下
首先Master目前的元数据比较多我们可以将一部分与ApplDShuffleId本身相关的元数据下沉到Driver的ShuffleManager中由于元数据并不会很多Driver增加的内存开销非常有限。另外关于全局负载均衡的元数据和调度相关的元数据我们利用Raft实现了Master组件的高可用这样我们通过部署3或5台Master真正的实现了大规模可扩展的需求。
4. 实际效果与分析
4.1 性能与稳定性
团队针对TeraSortTPC-DS以及大量的内部作业进行了测试在Reduce阶段减少了随机读的开销任务的稳定性和性能都有了大幅度提升。 图3是TeraSort的benchmark以10T Terasort为例Shuffle量压缩后大约5.6T。可以看出该量级的作业在RSS场景下由于Shuffle read变为顺序读性能会有大幅提升。 图3 TeraSort性能测试RSS性能更好 图4是一个线上实际脱敏后的Shuffle heavy大作业之前在混部集群中很小概率可以跑完每天任务SLA不能按时达成分析原因主要是由于大量的FetchFailed导致stage进行重算。使用RSS之后每天可以稳定的跑完2.1T的shuffle也不会出现任何FetchFailed的场景。在更大的数据集性能和SLA表现都更为显著。 图4 实际业务的作业stage图使用RSS保障稳定性和性能
4.2 业务效果
在大数据团队和阿里云EMR团队的共同努力下经过近半年的上线、运营RSS以及和业务部门的长时间测试业务价值主要体现在以下方面
降本增效效果明显在集群规模小幅下降的基础上支撑了更多的计算任务TCO成本下降20%。SLA显著提升大规模Spark Shuffle任务从跑不完到能跑完我们能够将不同SLA级别作业合并到同一集群减小集群节点数量达到统一管理缩小成本的目的。原本业务方有一部分SLA比较高的作业在一个独有的Yarn集群B中运行由于主Yarn集群A的负载非常高如果跑到集群A中会经常的挂掉。利用RSS之后可以放心的将作业跑到主集群A中从而释放掉独有Yarn集群B。作业执行效率显著提升跑的慢 - 跑的快。我们比较了几个典型的Shuffle heavy作业一个重要的业务线作业原本需要3小时RSS版本需要1.6小时。抽取线上5~10个作业大作业的性能提升相当明显不同作业平均下来有30%以上的性能提升即使是shuffle量不大的作业由于比较稳定不需要stage重算长期运行平均时间也会减少10%-20%。架构灵活性显著提升升级为计算与存储分离架构。Spark在容器中运行的过程中将RSS作为基础组件可以使得Spark容器化能够大规模的落地为离线在线统一资源、统一调度打下了基础。
5. 未来展望
趣头条大数据平台和阿里云EMR团队后续会继续保持深入共创将探索更多的方向。主要有以下的一些思路
RSS存储能力优化包括将云的对象存储作为存储后端。RSS多引擎支持例如MapReduce、Tez等提升历史任务执行效率。加速大数据容器化落地配合RSS能力解决K8s调度器性能、调度策略等一系列挑战。持续优化成本配合EMR的弹性伸缩功能一方面Spark可以使用更多的阿里云ECS/ECI抢占式实例来进一步压缩成本另一方面将已有机器包括阿里云ACKECI等资源形成统一大池子将大数据的计算组件和在线业务进行错峰调度以及混部。
原文链接 本文为阿里云原创内容未经允许不得转载。