唯品会网站开发费用,包头网站制作公司,wordpress板块,视频网站架构在大数据计算领域#xff0c;Spark已经成为了越来越流行、越来越受欢迎的计算平台之一。Spark的功能涵盖了大数据领域的离线批处理、SQL类处理、流式/实时计算、机器学习、图计算等各种不同类型的计算操作#xff0c;应用范围与前景非常广泛。在美团•大众点评#xff0c;已… 在大数据计算领域Spark已经成为了越来越流行、越来越受欢迎的计算平台之一。Spark的功能涵盖了大数据领域的离线批处理、SQL类处理、流式/实时计算、机器学习、图计算等各种不同类型的计算操作应用范围与前景非常广泛。在美团•大众点评已经有很多同学在各种项目中尝试使用Spark。大多数同学包括笔者在内最初开始尝试使用Spark的原因很简单主要就是为了让大数据计算作业的执行速度更快、性能更高。 然而通过Spark开发出高性能的大数据计算作业并不是那么简单的。如果没有对Spark作业进行合理的调优Spark作业的执行速度可能会很慢这样就完全体现不出Spark作为一种快速大数据计算引擎的优势来。因此想要用好Spark就必须对其进行合理的性能优化。 Spark的性能调优实际上是由很多部分组成的不是调节几个参数就可以立竿见影提升作业性能的。我们需要根据不同的业务场景以及数据情况对Spark作业进行综合性的分析然后进行多个方面的调节和优化才能获得最佳性能。 笔者根据之前的Spark作业开发经验以及实践积累总结出了一套Spark作业的性能优化方案。整套方案主要分为开发调优、资源调优、数据倾斜调优、shuffle调优几个部分。开发调优和资源调优是所有Spark作业都需要注意和遵循的一些基本原则是高性能Spark作业的基础数据倾斜调优主要讲解了一套完整的用来解决Spark作业数据倾斜的解决方案shuffle调优面向的是对Spark的原理有较深层次掌握和研究的同学主要讲解了如何对Spark作业的shuffle运行过程以及细节进行调优。 本文作为Spark性能优化指南的基础篇主要讲解开发调优以及资源调优。 调优概述 Spark性能优化的第一步就是要在开发Spark作业的过程中注意和应用一些性能优化的基本原则。开发调优就是要让大家了解以下一些Spark基本开发原则包括RDD lineage设计、算子的合理使用、特殊操作的优化等。在开发过程中时时刻刻都应该注意以上原则并将这些原则根据具体的业务以及实际的应用场景灵活地运用到自己的Spark作业中。 原则一避免创建重复的RDD 通常来说我们在开发一个Spark作业时首先是基于某个数据源比如Hive表或HDFS文件创建一个初始的RDD接着对这个RDD执行某个算子操作然后得到下一个RDD以此类推循环往复直到计算出最终我们需要的结果。在这个过程中多个RDD会通过不同的算子操作比如map、reduce等串起来这个“RDD串”就是RDD lineage也就是“RDD的血缘关系链”。 我们在开发过程中要注意对于同一份数据只应该创建一个RDD不能创建多个RDD来代表同一份数据。 一些Spark初学者在刚开始开发Spark作业时或者是有经验的工程师在开发RDD lineage极其冗长的Spark作业时可能会忘了自己之前对于某一份数据已经创建过一个RDD了从而导致对于同一份数据创建了多个RDD。这就意味着我们的Spark作业会进行多次重复计算来创建多个代表相同数据的RDD进而增加了作业的性能开销。 一个简单的例子 // 需要对名为“hello.txt”的HDFS文件进行一次map操作再进行一次reduce操作。也就是说需要对一份数据执行两次算子操作。// 错误的做法对于同一份数据执行多次算子操作时创建多个RDD。
// 这里执行了两次textFile方法针对同一个HDFS文件创建了两个RDD出来然后分别对每个RDD都执行了一个算子操作。
// 这种情况下Spark需要从HDFS上两次加载hello.txt文件的内容并创建两个单独的RDD第二次加载HDFS文件以及创建RDD的性能开销很明显是白白浪费掉的。
val rdd1 sc.textFile(hdfs://192.168.0.1:9000/hello.txt)
rdd1.map(...)
val rdd2 sc.textFile(hdfs://192.168.0.1:9000/hello.txt)
rdd2.reduce(...)// 正确的用法对于一份数据执行多次算子操作时只使用一个RDD。
// 这种写法很明显比上一种写法要好多了因为我们对于同一份数据只创建了一个RDD然后对这一个RDD执行了多次算子操作。
// 但是要注意到这里为止优化还没有结束由于rdd1被执行了两次算子操作第二次执行reduce操作的时候还会再次从源头处重新计算一次rdd1的数据因此还是会有重复计算的性能开销。
// 要彻底解决这个问题必须结合“原则三对多次使用的RDD进行持久化”才能保证一个RDD被多次使用时只被计算一次。
val rdd1 sc.textFile(hdfs://192.168.0.1:9000/hello.txt)
rdd1.map(...)
rdd1.reduce(...)原则二尽可能复用同一个RDD 除了要避免在开发过程中对一份完全相同的数据创建多个RDD之外在对不同的数据执行算子操作时还要尽可能地复用一个RDD。比如说有一个RDD的数据格式是key-value类型的另一个是单value类型的这两个RDD的value数据是完全一样的。那么此时我们可以只使用key-value类型的那个RDD因为其中已经包含了另一个的数据。对于类似这种多个RDD的数据有重叠或者包含的情况我们应该尽量复用一个RDD这样可以尽可能地减少RDD的数量从而尽可能减少算子执行的次数。 一个简单的例子 // 错误的做法。// 有一个Long, String格式的RDD即rdd1。
// 接着由于业务需要对rdd1执行了一个map操作创建了一个rdd2而rdd2中的数据仅仅是rdd1中的value值而已也就是说rdd2是rdd1的子集。
JavaPairRDDLong, String rdd1 ...
JavaRDDString rdd2 rdd1.map(...)// 分别对rdd1和rdd2执行了不同的算子操作。
rdd1.reduceByKey(...)
rdd2.map(...)// 正确的做法。// 上面这个case中其实rdd1和rdd2的区别无非就是数据格式不同而已rdd2的数据完全就是rdd1的子集而已却创建了两个rdd并对两个rdd都执行了一次算子操作。
// 此时会因为对rdd1执行map算子来创建rdd2而多执行一次算子操作进而增加性能开销。// 其实在这种情况下完全可以复用同一个RDD。
// 我们可以使用rdd1既做reduceByKey操作也做map操作。
// 在进行第二个map操作时只使用每个数据的tuple._2也就是rdd1中的value值即可。
JavaPairRDDLong, String rdd1 ...
rdd1.reduceByKey(...)
rdd1.map(tuple._2...)// 第二种方式相较于第一种方式而言很明显减少了一次rdd2的计算开销。
// 但是到这里为止优化还没有结束对rdd1我们还是执行了两次算子操作rdd1实际上还是会被计算两次。
// 因此还需要配合“原则三对多次使用的RDD进行持久化”进行使用才能保证一个RDD被多次使用时只被计算一次。原则三对多次使用的RDD进行持久化 当你在Spark代码中多次对一个RDD做了算子操作后恭喜你已经实现Spark作业第一步的优化了也就是尽可能复用RDD。此时就该在这个基础之上进行第二步优化了也就是要保证对一个RDD执行多次算子操作时这个RDD本身仅仅被计算一次。 Spark中对于一个RDD执行多次算子的默认原理是这样的每次你对一个RDD执行一个算子操作时都会重新从源头处计算一遍计算出那个RDD来然后再对这个RDD执行你的算子操作。这种方式的性能是很差的。 因此对于这种情况我们的建议是对多次使用的RDD进行持久化。此时Spark就会根据你的持久化策略将RDD中的数据保存到内存或者磁盘中。以后每次对这个RDD进行算子操作时都会直接从内存或磁盘中提取持久化的RDD数据然后执行算子而不会从源头处重新计算一遍这个RDD再执行算子操作。 对多次使用的RDD进行持久化的代码示例 // 如果要对一个RDD进行持久化只要对这个RDD调用cache()和persist()即可。// 正确的做法。
// cache()方法表示使用非序列化的方式将RDD中的数据全部尝试持久化到内存中。
// 此时再对rdd1执行两次算子操作时只有在第一次执行map算子时才会将这个rdd1从源头处计算一次。
// 第二次执行reduce算子时就会直接从内存中提取数据进行计算不会重复计算一个rdd。
val rdd1 sc.textFile(hdfs://192.168.0.1:9000/hello.txt).cache()
rdd1.map(...)
rdd1.reduce(...)// persist()方法表示手动选择持久化级别并使用指定的方式进行持久化。
// 比如说StorageLevel.MEMORY_AND_DISK_SER表示内存充足时优先持久化到内存中内存不充足时持久化到磁盘文件中。
// 而且其中的_SER后缀表示使用序列化的方式来保存RDD数据此时RDD中的每个partition都会序列化成一个大的字节数组然后再持久化到内存或磁盘中。
// 序列化的方式可以减少持久化的数据对内存/磁盘的占用量进而避免内存被持久化数据占用过多从而发生频繁GC。
val rdd1 sc.textFile(hdfs://192.168.0.1:9000/hello.txt).persist(StorageLevel.MEMORY_AND_DISK_SER)
rdd1.map(...)
rdd1.reduce(...)对于persist()方法而言我们可以根据不同的业务场景选择不同的持久化级别。 Spark的持久化级别 持久化级别含义解释MEMORY_ONLY使用未序列化的Java对象格式将数据保存在内存中。如果内存不够存放所有的数据则数据可能就不会进行持久化。那么下次对这个RDD执行算子操作时那些没有被持久化的数据需要从源头处重新计算一遍。这是默认的持久化策略使用cache()方法时实际就是使用的这种持久化策略。MEMORY_AND_DISK使用未序列化的Java对象格式优先尝试将数据保存在内存中。如果内存不够存放所有的数据会将数据写入磁盘文件中下次对这个RDD执行算子时持久化在磁盘文件中的数据会被读取出来使用。MEMORY_ONLY_SER基本含义同MEMORY_ONLY。唯一的区别是会将RDD中的数据进行序列化RDD的每个partition会被序列化成一个字节数组。这种方式更加节省内存从而可以避免持久化的数据占用过多内存导致频繁GC。MEMORY_AND_DISK_SER基本含义同MEMORY_AND_DISK。唯一的区别是会将RDD中的数据进行序列化RDD的每个partition会被序列化成一个字节数组。这种方式更加节省内存从而可以避免持久化的数据占用过多内存导致频繁GC。DISK_ONLY使用未序列化的Java对象格式将数据全部写入磁盘文件中。MEMORY_ONLY_2, MEMORY_AND_DISK_2, 等等.对于上述任意一种持久化策略如果加上后缀_2代表的是将每个持久化的数据都复制一份副本并将副本保存到其他节点上。这种基于副本的持久化机制主要用于进行容错。假如某个节点挂掉节点的内存或磁盘中的持久化数据丢失了那么后续对RDD计算时还可以使用该数据在其他节点上的副本。如果没有副本的话就只能将这些数据从源头处重新计算一遍了。如何选择一种最合适的持久化策略 默认情况下性能最高的当然是MEMORY_ONLY但前提是你的内存必须足够足够大可以绰绰有余地存放下整个RDD的所有数据。因为不进行序列化与反序列化操作就避免了这部分的性能开销对这个RDD的后续算子操作都是基于纯内存中的数据的操作不需要从磁盘文件中读取数据性能也很高而且不需要复制一份数据副本并远程传送到其他节点上。但是这里必须要注意的是在实际的生产环境中恐怕能够直接用这种策略的场景还是有限的如果RDD中数据比较多时比如几十亿直接用这种持久化级别会导致JVM的OOM内存溢出异常。如果使用MEMORY_ONLY级别时发生了内存溢出那么建议尝试使用MEMORY_ONLY_SER级别。该级别会将RDD数据序列化后再保存在内存中此时每个partition仅仅是一个字节数组而已大大减少了对象数量并降低了内存占用。这种级别比MEMORY_ONLY多出来的性能开销主要就是序列化与反序列化的开销。但是后续算子可以基于纯内存进行操作因此性能总体还是比较高的。此外可能发生的问题同上如果RDD中的数据量过多的话还是可能会导致OOM内存溢出的异常。如果纯内存的级别都无法使用那么建议使用MEMORY_AND_DISK_SER策略而不是MEMORY_AND_DISK策略。因为既然到了这一步就说明RDD的数据量很大内存无法完全放下。序列化后的数据比较少可以节省内存和磁盘的空间开销。同时该策略会优先尽量尝试将数据缓存在内存中内存缓存不下才会写入磁盘。通常不建议使用DISK_ONLY和后缀为_2的级别因为完全基于磁盘文件进行数据的读写会导致性能急剧降低有时还不如重新计算一次所有RDD。后缀为_2的级别必须将所有数据都复制一份副本并发送到其他节点上数据复制以及网络传输会导致较大的性能开销除非是要求作业的高可用性否则不建议使用。原则四尽量避免使用shuffle类算子 如果有可能的话要尽量避免使用shuffle类算子。因为Spark作业运行过程中最消耗性能的地方就是shuffle过程。shuffle过程简单来说就是将分布在集群中多个节点上的同一个key拉取到同一个节点上进行聚合或join等操作。比如reduceByKey、join等算子都会触发shuffle操作。 shuffle过程中各个节点上的相同key都会先写入本地磁盘文件中然后其他节点需要通过网络传输拉取各个节点上的磁盘文件中的相同key。而且相同key都拉取到同一个节点进行聚合操作时还有可能会因为一个节点上处理的key过多导致内存不够存放进而溢写到磁盘文件中。因此在shuffle过程中可能会发生大量的磁盘文件读写的IO操作以及数据的网络传输操作。磁盘IO和网络数据传输也是shuffle性能较差的主要原因。 因此在我们的开发过程中能避免则尽可能避免使用reduceByKey、join、distinct、repartition等会进行shuffle的算子尽量使用map类的非shuffle算子。这样的话没有shuffle操作或者仅有较少shuffle操作的Spark作业可以大大减少性能开销。 Broadcast与map进行join代码示例 // 传统的join操作会导致shuffle操作。
// 因为两个RDD中相同的key都需要通过网络拉取到一个节点上由一个task进行join操作。
val rdd3 rdd1.join(rdd2)// Broadcastmap的join操作不会导致shuffle操作。
// 使用Broadcast将一个数据量较小的RDD作为广播变量。
val rdd2Data rdd2.collect()
val rdd2DataBroadcast sc.broadcast(rdd2Data)// 在rdd1.map算子中可以从rdd2DataBroadcast中获取rdd2的所有数据。
// 然后进行遍历如果发现rdd2中某条数据的key与rdd1的当前数据的key是相同的那么就判定可以进行join。
// 此时就可以根据自己需要的方式将rdd1当前数据与rdd2中可以连接的数据拼接在一起String或Tuple。
val rdd3 rdd1.map(rdd2DataBroadcast...)// 注意以上操作建议仅仅在rdd2的数据量比较少比如几百M或者一两G的情况下使用。
// 因为每个Executor的内存中都会驻留一份rdd2的全量数据。原则五使用map-side预聚合的shuffle操作 如果因为业务需要一定要使用shuffle操作无法用map类的算子来替代那么尽量使用可以map-side预聚合的算子。 所谓的map-side预聚合说的是在每个节点本地对相同的key进行一次聚合操作类似于MapReduce中的本地combiner。map-side预聚合之后每个节点本地就只会有一条相同的key因为多条相同的key都被聚合起来了。其他节点在拉取所有节点上的相同key时就会大大减少需要拉取的数据数量从而也就减少了磁盘IO以及网络传输开销。通常来说在可能的情况下建议使用reduceByKey或者aggregateByKey算子来替代掉groupByKey算子。因为reduceByKey和aggregateByKey算子都会使用用户自定义的函数对每个节点本地的相同key进行预聚合。而groupByKey算子是不会进行预聚合的全量的数据会在集群的各个节点之间分发和传输性能相对来说比较差。 比如如下两幅图就是典型的例子分别基于reduceByKey和groupByKey进行单词计数。其中第一张图是groupByKey的原理图可以看到没有进行任何本地聚合时所有数据都会在集群节点之间传输第二张图是reduceByKey的原理图可以看到每个节点本地的相同key数据都进行了预聚合然后才传输到其他节点上进行全局聚合。 原则六使用高性能的算子 除了shuffle相关的算子有优化原则之外其他的算子也都有着相应的优化原则。 使用reduceByKey/aggregateByKey替代groupByKey 详情见“原则五使用map-side预聚合的shuffle操作”。 使用mapPartitions替代普通map mapPartitions类的算子一次函数调用会处理一个partition所有的数据而不是一次函数调用处理一条性能相对来说会高一些。但是有的时候使用mapPartitions会出现OOM内存溢出的问题。因为单次函数调用就要处理掉一个partition所有的数据如果内存不够垃圾回收时是无法回收掉太多对象的很可能出现OOM异常。所以使用这类操作时要慎重 使用foreachPartitions替代foreach 原理类似于“使用mapPartitions替代map”也是一次函数调用处理一个partition的所有数据而不是一次函数调用处理一条数据。在实践中发现foreachPartitions类的算子对性能的提升还是很有帮助的。比如在foreach函数中将RDD中所有数据写MySQL那么如果是普通的foreach算子就会一条数据一条数据地写每次函数调用可能就会创建一个数据库连接此时就势必会频繁地创建和销毁数据库连接性能是非常低下但是如果用foreachPartitions算子一次性处理一个partition的数据那么对于每个partition只要创建一个数据库连接即可然后执行批量插入操作此时性能是比较高的。实践中发现对于1万条左右的数据量写MySQL性能可以提升30%以上。 使用filter之后进行coalesce操作 通常对一个RDD执行filter算子过滤掉RDD中较多数据后比如30%以上的数据建议使用coalesce算子手动减少RDD的partition数量将RDD中的数据压缩到更少的partition中去。因为filter之后RDD的每个partition中都会有很多数据被过滤掉此时如果照常进行后续的计算其实每个task处理的partition中的数据量并不是很多有一点资源浪费而且此时处理的task越多可能速度反而越慢。因此用coalesce减少partition数量将RDD中的数据压缩到更少的partition之后只要使用更少的task即可处理完所有的partition。在某些场景下对于性能的提升会有一定的帮助。 使用repartitionAndSortWithinPartitions替代repartition与sort类操作 repartitionAndSortWithinPartitions是Spark官网推荐的一个算子官方建议如果需要在repartition重分区之后还要进行排序建议直接使用repartitionAndSortWithinPartitions算子。因为该算子可以一边进行重分区的shuffle操作一边进行排序。shuffle与sort两个操作同时进行比先shuffle再sort来说性能可能是要高的。 原则七广播大变量 有时在开发过程中会遇到需要在算子函数中使用外部变量的场景尤其是大变量比如100M以上的大集合那么此时就应该使用Spark的广播Broadcast功能来提升性能。 在算子函数中使用到外部变量时默认情况下Spark会将该变量复制多个副本通过网络传输到task中此时每个task都有一个变量副本。如果变量本身比较大的话比如100M甚至1G那么大量的变量副本在网络中传输的性能开销以及在各个节点的Executor中占用过多内存导致的频繁GC都会极大地影响性能。 因此对于上述情况如果使用的外部变量比较大建议使用Spark的广播功能对该变量进行广播。广播后的变量会保证每个Executor的内存中只驻留一份变量副本而Executor中的task执行时共享该Executor中的那份变量副本。这样的话可以大大减少变量副本的数量从而减少网络传输的性能开销并减少对Executor内存的占用开销降低GC的频率。 广播大变量的代码示例 // 以下代码在算子函数中使用了外部的变量。
// 此时没有做任何特殊操作每个task都会有一份list1的副本。
val list1 ...
rdd1.map(list1...)// 以下代码将list1封装成了Broadcast类型的广播变量。
// 在算子函数中使用广播变量时首先会判断当前task所在Executor内存中是否有变量副本。
// 如果有则直接使用如果没有则从Driver或者其他Executor节点上远程拉取一份放到本地Executor内存中。
// 每个Executor内存中就只会驻留一份广播变量副本。
val list1 ...
val list1Broadcast sc.broadcast(list1)
rdd1.map(list1Broadcast...)原则八使用Kryo优化序列化性能 在Spark中主要有三个地方涉及到了序列化 * 在算子函数中使用到外部变量时该变量会被序列化后进行网络传输见“原则七广播大变量”中的讲解。 * 将自定义的类型作为RDD的泛型类型时比如JavaRDD Student是自定义类型所有自定义类型对象都会进行序列化。因此这种情况下也要求自定义的类必须实现Serializable接口。 * 使用可序列化的持久化策略时比如MEMORY_ONLY_SERSpark会将RDD中的每个partition都序列化成一个大的字节数组。 对于这三种出现序列化的地方我们都可以通过使用Kryo序列化类库来优化序列化和反序列化的性能。Spark默认使用的是Java的序列化机制也就是ObjectOutputStream/ObjectInputStream API来进行序列化和反序列化。但是Spark同时支持使用Kryo序列化库Kryo序列化类库的性能比Java序列化类库的性能要高很多。官方介绍Kryo序列化机制比Java序列化机制性能高10倍左右。Spark之所以默认没有使用Kryo作为序列化类库是因为Kryo要求最好要注册所有需要进行序列化的自定义类型因此对于开发者来说这种方式比较麻烦。 以下是使用Kryo的代码示例我们只要设置序列化类再注册要序列化的自定义类型即可比如算子函数中使用到的外部变量类型、作为RDD泛型类型的自定义类型等 // 创建SparkConf对象。
val conf new SparkConf().setMaster(...).setAppName(...)
// 设置序列化器为KryoSerializer。
conf.set(spark.serializer, org.apache.spark.serializer.KryoSerializer)
// 注册要序列化的自定义类型。
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))原则九优化数据结构 Java中有三种类型比较耗费内存 * 对象每个Java对象都有对象头、引用等额外的信息因此比较占用内存空间。 * 字符串每个字符串内部都有一个字符数组以及长度等额外信息。 * 集合类型比如HashMap、LinkedList等因为集合类型内部通常会使用一些内部类来封装集合元素比如Map.Entry。 因此Spark官方建议在Spark编码实现中特别是对于算子函数中的代码尽量不要使用上述三种数据结构尽量使用字符串替代对象使用原始类型比如Int、Long替代字符串使用数组替代集合类型这样尽可能地减少内存占用从而降低GC频率提升性能。 但是在笔者的编码实践中发现要做到该原则其实并不容易。因为我们同时要考虑到代码的可维护性如果一个代码中完全没有任何对象抽象全部是字符串拼接的方式那么对于后续的代码维护和修改无疑是一场巨大的灾难。同理如果所有操作都基于数组实现而不使用HashMap、LinkedList等集合类型那么对于我们的编码难度以及代码可维护性也是一个极大的挑战。因此笔者建议在可能以及合适的情况下使用占用内存较少的数据结构但是前提是要保证代码的可维护性。 调优概述 在开发完Spark作业之后就该为作业配置合适的资源了。Spark的资源参数基本都可以在spark-submit命令中作为参数设置。很多Spark初学者通常不知道该设置哪些必要的参数以及如何设置这些参数最后就只能胡乱设置甚至压根儿不设置。资源参数设置的不合理可能会导致没有充分利用集群资源作业运行会极其缓慢或者设置的资源过大队列没有足够的资源来提供进而导致各种异常。总之无论是哪种情况都会导致Spark作业的运行效率低下甚至根本无法运行。因此我们必须对Spark作业的资源使用原理有一个清晰的认识并知道在Spark作业运行过程中有哪些资源参数是可以设置的以及如何设置合适的参数值。 Spark作业基本运行原理 详细原理见上图。我们使用spark-submit提交一个Spark作业之后这个作业就会启动一个对应的Driver进程。根据你使用的部署模式deploy-mode不同Driver进程可能在本地启动也可能在集群中某个工作节点上启动。Driver进程本身会根据我们设置的参数占有一定数量的内存和CPU core。而Driver进程要做的第一件事情就是向集群管理器可以是Spark Standalone集群也可以是其他的资源管理集群美团•大众点评使用的是YARN作为资源管理集群申请运行Spark作业需要使用的资源这里的资源指的就是Executor进程。YARN集群管理器会根据我们为Spark作业设置的资源参数在各个工作节点上启动一定数量的Executor进程每个Executor进程都占有一定数量的内存和CPU core。 在申请到了作业执行所需的资源之后Driver进程就会开始调度和执行我们编写的作业代码了。Driver进程会将我们编写的Spark作业代码分拆为多个stage每个stage执行一部分代码片段并为每个stage创建一批task然后将这些task分配到各个Executor进程中执行。task是最小的计算单元负责执行一模一样的计算逻辑也就是我们自己编写的某个代码片段只是每个task处理的数据不同而已。一个stage的所有task都执行完毕之后会在各个节点本地的磁盘文件中写入计算中间结果然后Driver就会调度运行下一个stage。下一个stage的task的输入数据就是上一个stage输出的中间结果。如此循环往复直到将我们自己编写的代码逻辑全部执行完并且计算完所有的数据得到我们想要的结果为止。 Spark是根据shuffle类算子来进行stage的划分。如果我们的代码中执行了某个shuffle类算子比如reduceByKey、join等那么就会在该算子处划分出一个stage界限来。可以大致理解为shuffle算子执行之前的代码会被划分为一个stageshuffle算子执行以及之后的代码会被划分为下一个stage。因此一个stage刚开始执行的时候它的每个task可能都会从上一个stage的task所在的节点去通过网络传输拉取需要自己处理的所有key然后对拉取到的所有相同的key使用我们自己编写的算子函数执行聚合操作比如reduceByKey()算子接收的函数。这个过程就是shuffle。 当我们在代码中执行了cache/persist等持久化操作时根据我们选择的持久化级别的不同每个task计算出来的数据也会保存到Executor进程的内存或者所在节点的磁盘文件中。 因此Executor的内存主要分为三块第一块是让task执行我们自己编写的代码时使用默认是占Executor总内存的20%第二块是让task通过shuffle过程拉取了上一个stage的task的输出后进行聚合等操作时使用默认也是占Executor总内存的20%第三块是让RDD持久化时使用默认占Executor总内存的60%。 task的执行速度是跟每个Executor进程的CPU core数量有直接关系的。一个CPU core同一时间只能执行一个线程。而每个Executor进程上分配到的多个task都是以每个task一条线程的方式多线程并发运行的。如果CPU core数量比较充足而且分配到的task数量比较合理那么通常来说可以比较快速和高效地执行完这些task线程。 以上就是Spark作业的基本运行原理的说明大家可以结合上图来理解。理解作业基本原理是我们进行资源参数调优的基本前提。 资源参数调优 了解完了Spark作业运行的基本原理之后对资源相关的参数就容易理解了。所谓的Spark资源参数调优其实主要就是对Spark运行过程中各个使用资源的地方通过调节各种参数来优化资源使用的效率从而提升Spark作业的执行性能。以下参数就是Spark中主要的资源参数每个参数都对应着作业运行原理中的某个部分我们同时也给出了一个调优的参考值。 num-executors 参数说明该参数用于设置Spark作业总共要用多少个Executor进程来执行。Driver在向YARN集群管理器申请资源时YARN集群管理器会尽可能按照你的设置来在集群的各个工作节点上启动相应数量的Executor进程。这个参数非常之重要如果不设置的话默认只会给你启动少量的Executor进程此时你的Spark作业的运行速度是非常慢的。参数调优建议每个Spark作业的运行一般设置50~100个左右的Executor进程比较合适设置太少或太多的Executor进程都不好。设置的太少无法充分利用集群资源设置的太多的话大部分队列可能无法给予充分的资源。executor-memory 参数说明该参数用于设置每个Executor进程的内存。Executor内存的大小很多时候直接决定了Spark作业的性能而且跟常见的JVM OOM异常也有直接的关联。参数调优建议每个Executor进程的内存设置4G~8G较为合适。但是这只是一个参考值具体的设置还是得根据不同部门的资源队列来定。可以看看自己团队的资源队列的最大内存限制是多少num-executors乘以executor-memory是不能超过队列的最大内存量的。此外如果你是跟团队里其他人共享这个资源队列那么申请的内存量最好不要超过资源队列最大总内存的1/3~1/2避免你自己的Spark作业占用了队列所有的资源导致别的同学的作业无法运行。executor-cores 参数说明该参数用于设置每个Executor进程的CPU core数量。这个参数决定了每个Executor进程并行执行task线程的能力。因为每个CPU core同一时间只能执行一个task线程因此每个Executor进程的CPU core数量越多越能够快速地执行完分配给自己的所有task线程。参数调优建议Executor的CPU core数量设置为2~4个较为合适。同样得根据不同部门的资源队列来定可以看看自己的资源队列的最大CPU core限制是多少再依据设置的Executor数量来决定每个Executor进程可以分配到几个CPU core。同样建议如果是跟他人共享这个队列那么num-executors * executor-cores不要超过队列总CPU core的1/3~1/2左右比较合适也是避免影响其他同学的作业运行。driver-memory 参数说明该参数用于设置Driver进程的内存。参数调优建议Driver的内存通常来说不设置或者设置1G左右应该就够了。唯一需要注意的一点是如果需要使用collect算子将RDD的数据全部拉取到Driver上进行处理那么必须确保Driver的内存足够大否则会出现OOM内存溢出的问题。spark.default.parallelism 参数说明该参数用于设置每个stage的默认task数量。这个参数极为重要如果不设置可能会直接影响你的Spark作业性能。参数调优建议Spark作业的默认task数量为500~1000个较为合适。很多同学常犯的一个错误就是不去设置这个参数那么此时就会导致Spark自己根据底层HDFS的block数量来设置task的数量默认是一个HDFS block对应一个task。通常来说Spark默认设置的数量是偏少的比如就几十个task如果task数量偏少的话就会导致你前面设置好的Executor的参数都前功尽弃。试想一下无论你的Executor进程有多少个内存和CPU有多大但是task只有1个或者10个那么90%的Executor进程可能根本就没有task执行也就是白白浪费了资源因此Spark官网建议的设置原则是设置该参数为num-executors * executor-cores的2~3倍较为合适比如Executor的总CPU core数量为300个那么设置1000个task是可以的此时可以充分地利用Spark集群的资源。spark.storage.memoryFraction 参数说明该参数用于设置RDD持久化数据在Executor内存中能占的比例默认是0.6。也就是说默认Executor 60%的内存可以用来保存持久化的RDD数据。根据你选择的不同的持久化策略如果内存不够时可能数据就不会持久化或者数据会写入磁盘。参数调优建议如果Spark作业中有较多的RDD持久化操作该参数的值可以适当提高一些保证持久化的数据能够容纳在内存中。避免内存不够缓存所有的数据导致数据只能写入磁盘中降低了性能。但是如果Spark作业中的shuffle类操作比较多而持久化操作比较少那么这个参数的值适当降低一些比较合适。此外如果发现作业由于频繁的gc导致运行缓慢通过spark web ui可以观察到作业的gc耗时意味着task执行用户代码的内存不够用那么同样建议调低这个参数的值。spark.shuffle.memoryFraction 参数说明该参数用于设置shuffle过程中一个task拉取到上个stage的task的输出后进行聚合操作时能够使用的Executor内存的比例默认是0.2。也就是说Executor默认只有20%的内存用来进行该操作。shuffle操作在进行聚合时如果发现使用的内存超出了这个20%的限制那么多余的数据就会溢写到磁盘文件中去此时就会极大地降低性能。参数调优建议如果Spark作业中的RDD持久化操作较少shuffle操作较多时建议降低持久化操作的内存占比提高shuffle操作的内存占比比例避免shuffle过程中数据过多时内存不够用必须溢写到磁盘上降低了性能。此外如果发现作业由于频繁的gc导致运行缓慢意味着task执行用户代码的内存不够用那么同样建议调低这个参数的值。资源参数的调优没有一个固定的值需要同学们根据自己的实际情况包括Spark作业中的shuffle操作数量、RDD持久化操作数量以及spark web ui中显示的作业gc情况同时参考本篇文章中给出的原理以及调优建议合理地设置上述参数。 资源参数参考示例 以下是一份spark-submit命令的示例大家可以参考一下并根据自己的实际情况进行调节 ./bin/spark-submit \--master yarn-cluster \--num-executors 100 \--executor-memory 6G \--executor-cores 4 \--driver-memory 1G \--conf spark.default.parallelism1000 \--conf spark.storage.memoryFraction0.5 \--conf spark.shuffle.memoryFraction0.3 \根据实践经验来看大部分Spark作业经过本次基础篇所讲解的开发调优与资源调优之后一般都能以较高的性能运行了足以满足我们的需求。但是在不同的生产环境和项目背景下可能会遇到其他更加棘手的问题比如各种数据倾斜也可能会遇到更高的性能要求。为了应对这些挑战需要使用更高级的技巧来处理这类问题。在后续的《Spark性能优化指南——高级篇》中我们会详细讲解数据倾斜调优以及Shuffle调优。