新网网站空间,php响应式个人博客网站设计,微信认证 网站,什么招聘网最好找工作文章目录1.什么是Shuffle?2.Shuffle解决什么问题?3.Shuffle Write与Shuffle Read4.Shuffle的计算需求4.1 计算需求表4.2 partitionby4.3 groupByKey4.4 reduceByKey4.5 sortByKey5.Shuffle Write框架设计与实现5.1 Shuffle Write框架实现的功能5.2 Shuffle Write的多种情况5.…
文章目录1.什么是Shuffle?2.Shuffle解决什么问题?3.Shuffle Write与Shuffle Read4.Shuffle的计算需求4.1 计算需求表4.2 partitionby4.3 groupByKey4.4 reduceByKey4.5 sortByKey5.Shuffle Write框架设计与实现5.1 Shuffle Write框架实现的功能5.2 Shuffle Write的多种情况5.2.1 不需要combine和sort5.2.1.1 操作流程5.2.1.2 优缺点5.2.1.3 适用性5.2.2 不需要combine需要sort5.2.2.1 操作流程5.2.2.2 优缺点5.2.2.3 适用性5.2.3 需要combile需要/不需要sort5.2.3.1 操作流程5.2.3.2 优缺点5.2.3.3 适用性6.Shuffle Read框架设计与实现6.1 Shuffle Read框架实现的功能6.2 Shuffle Read的不同情况6.2.1 不需要combine和sort6.2.1.1 操作流程6.2.1.2 优缺点6.2.1.3 适用性6.2.2 不需要combine需要sort6.2.2.1 操作流程6.2.2.2 优缺点6.2.2.3 适用性6.2.3 需要combine需要/不需要sort6.2.3.1 操作流程6.2.3.2 优缺点6.2.3.3 适用性阅读本篇文章前需要阅读
Spark执行计划与UI分析1.什么是Shuffle?
运行在不同stage、不同节点上的task间如何进行数据传递。这个数据传递过程通常被称为Shuffle机制。
2.Shuffle解决什么问题?
如果是单纯的数据传递则只需要将数据进行分区、通过网络传输即可没有太大难度但Shuffle机制还需要进行各种类型的计算如聚合、排序而且数据量一般会很大。如何支持这些不同类型的计算如何提高Shuffle的性能都是Shuffle机制设计的难点问题。
3.Shuffle Write与Shuffle Read
Shuffle Write上游stage预先将输出数据进行划分按照分区存放分区个数与下游task个数一致这个过程被称为Shuffle Write。Shuffle Read上游数据按照分区存放完成后下游的task将属于自己分区的数据通过网络传输获取然后将来自上游不同分区的数据聚合再一起处理这个过程称为Shuffle Read。
4.Shuffle的计算需求
4.1 计算需求表
所谓计算需求也就是Shuffle要解决具体算子的哪些计算需求 这里我来分析几个例子
4.2 partitionby 可以看到partitionby操作只进行了数据分区操作并没有涉及到数据的聚合和排序操作。
4.3 groupByKey 可以看到groupByKey的操作既需要分区又需要做聚合并且在Shuffle Read阶段做的聚合。
4.4 reduceByKey 可以看到reduceByKey做了两步聚合在Shuffle Write中先执行func聚合一次由spark内部执行不生成新的rdd然后进行分区数据传输最后再在每个分区聚合一次执行相同的func函数。同时func需要满足交换律和结合律。两次聚合多了Shuffle Write端聚合的优点是优化Shuffle的性能一是传输的数据量大大减少二是降低Shuffle Read端的内存消耗。
4.5 sortByKey 分区后在ShuffleRead端进行排序。sortByKey 为了保证生成的RDD中的数据是全局有序按照Key排序 的 采用Range划分来分发数据。 Range划分可以保证在生成的RDD中 partition 1中的所有record的Key小于或大于 partition 2中所有的record的Key。 可以看到当前并没有算子需要在Shuffle Write端进行排序的但不能保证用户实现的算子不会在Shuffle Write端进行排序因此在spark实现Shuffle框架的时候保留了在Shuffle Write端进行排序的功能。
5.Shuffle Write框架设计与实现
5.1 Shuffle Write框架实现的功能
如第四节中的图所示每个数据操作只需要其中的一个或两个功能。Spark为了支持所有的情况设计了一个通用的Shuffle Write框架框架的计算顺序为“map输出→数据聚合→排序→分区”输出。 map task每计算出一个record及其partitionId就将record放入类似HashMap的数据结构中进行聚合聚合完成后再将HashMap中的数据放入类似Array的数据结构中进行排序既可按照partitionId也可以按照partitionIdKey进行排序最后根据partitionId将数据写入不同的数据分区中存放到本地磁盘上。partitionIdHashKey% 下游分区数。
5.2 Shuffle Write的多种情况
5.2.1 不需要combine和sort 这种Shuffle Write方式称为BypassMergeSortShuffleWriter 这种情况最简单,只需要实现分区功能
5.2.1.1 操作流程
map()依次输出KV record并计算其partitionId(PID)Spark根据 partitionId将record依次输出到不同的buffer中每当buffer填满就将record溢写到磁盘上的分区文件中。分配buffer的原因是map()输出record的速度很快需要进行缓冲来减少磁盘I/O。
5.2.1.2 优缺点
该模式的优点是速度快直接将record输出到不同文件中。缺点是资源消耗过高每个分区都需要有一个buffer默认大小为32KB由spark.Shuffle.file.buffer进行控制当分区数过大时内存消耗会很高。
5.2.1.3 适用性
适用于Shuffle Write端不需要聚合和排序且分区个数较少小于spark.Shuffle.sort.bypassMergeThreshold默认值为200例如groupBy(100)partitionBy(100)sortByKey(100)。
5.2.2 不需要combine需要sort 这种Shuffle模式被命名为SortShuffleWriter(KeyOrderingtrue)使用的Array被命名为PartitionedPairBuffer。
5.2.2.1 操作流程
这种情况需要使用partitionIdkey进行排序Spark采用的实现方法是建立一个ArrayPartitionedPairBuffer来存放map()输出的record并将每个K,Vrecord转化为(PartitionId,K),Vrecord然后按照PartitionIdKey对record进行排序最后将所有record写入写入一个文件中通过建立索引来标示每个分区。如果Array存放不下则会先扩容如果还存放不下就将Array中的record排序后spill到磁盘上等待map()输出完以后再将Array中的record与磁盘上已排序的record进行全局排序得到最终有序的record并写入文件中。
5.2.2.2 优缺点
优点是只需要一个Array结构就可以支持按照partitionIdKey进行排序Array大小可控而且具有扩容和spill到磁盘上的功能支持从小规模到大规模数据的排序。同时输出的数据已经按照partitionId进行排序因此只需要一个分区文件存储即可标示不同的分区数据克服了BypassMergeSortShuffleWriter中建立文件数过多的问题适用于分区个数很大的情况。缺点是排序增加计算时延。
5.2.2.3 适用性
map()端不需要聚合combine、Key需要排序、分区个数无限制。目前Spark本身没有提供这种排序类型的数据操作但不排除用户会自定义或者系统未来会提供这种类型的操作。sortByKey操作虽然需要按Key进行排序但这个排序过程在Shuffle Read端完成即可不需要在Shuffle Write端进行排序。
最后使用这种Shuffle如何解决BypassMergeSortShuffleWriter存在的buffer分配过多的问题我们只需要将“按PartitionIdKey排序”改为“只按PartitionId排序”就可以支持“不需要map()端combine、不需要按照Key进行排序分区个数过大”的操作。例如groupByKey(300)、partitionBy(300)、sortByKey(300)。
5.2.3 需要combile需要/不需要sort 这种Shuffle模式被称为sort-based Shuffle Write哈希表为PartitionedAppendOnlyMap
5.2.3.1 操作流程
需要实现按Key进行聚合combine的功能Spark采用的实现方法是建立一个类似HashMap的数据结构对map()输出的record进行聚合。HashMap中的Key是“partitionIdKey”HashMap中的Value是经过相同combine的聚合结果。在图中combine是sum函数那么Value中存放的是多个record对应的Value相加的结果。聚合完成后Spark对HashMap中的record进行排序。如果不需要按Key进行排序如上图所示那么只按partitionId进行排序如果需要按Key进行排序如图6.7的下图所示那么按partitionIdKey进行排序。最后将排序后的record写入一个分区文件中。其中使用的hash表既可以实现聚合功能也可以实现排序功能。如果HashMap存放不下则会先扩容为两倍大小如果还存放不下就将HashMap中的record排序后spill到磁盘上。此时HashMap被清空可以继续对map()输出的record进行聚合如果内存再次不够用那么继续spill到磁盘上此过程可以重复多次。当map()输出完成以后将此时HashMap中的reocrd与磁盘上已排序的record进行再次聚合(merge)得到最终的record输出到分区文件中。
5.2.3.2 优缺点
优缺点同5.4.2
5.2.3.3 适用性
适合map()端聚合combine、需要或者不需要按Key进行排序、分区个数无限制的应用如reduceByKey、aggregateByKey等。
6.Shuffle Read框架设计与实现
6.1 Shuffle Read框架实现的功能 reduce task不断从各个map task的分区文件中获取数据Fetch records然后使用类似HashMap的结构来对数据进行聚aggregate该过程是边获取数据边聚合。聚合完成后将HashMap中的数据放入类似Array的数据结构中按照Key进行排序sort byKey最后将排序结果输出或者传递给下一个操作。
6.2 Shuffle Read的不同情况
6.2.1 不需要combine和sort 6.2.1.1 操作流程
这种情况最简单只需要实现数据获取功能即可。等待所有的map task结束后reduce task开始不断从各个map task获取K,Vrecord并将record输出到一个buffer中大小为spark.reducer.maxSizeInFlight48MB下一个操作直接从buffer中获取数据即可。
6.2.1.2 优缺点
优点是逻辑和实现简单内存消耗很小。缺点是不支持聚合、排序等复杂功能。
6.2.1.3 适用性
适合既不需要聚合也不需要排序的应用如partitionBy等。
6.2.2 不需要combine需要sort 使用的Array结构PartitionedPairBuffer
6.2.2.1 操作流程
获取数据后将buffer中的record依次输出到一个Array结构PartitionedPairBuffer中。由于这里采用了本来用于Shuffle Write端的PartitionedPairBuffer结构所以还保留了每个record的partitionId。然后对Array中的record按照Key进行排序并将排序结果输出或者传递给下一步操作。当内存无法存下所有的record时PartitionedPairBuffer将record排序后spill到磁盘上最后将内存中和磁盘上的record进行全局排序得到最终排序后的record。
6.2.2.2 优缺点
优点是只需要一个Array结构就可以支持按照Key进行排序Array大小可控而且具有扩容和spill到磁盘上的功能不受数据规模限制。缺点是排序增加计算时延。
6.2.2.3 适用性
适合reduce端不需要聚合但需要按Key进行排序的操作如sortByKey()、sortBy()等。
6.2.3 需要combine需要/不需要sort 哈希表ExternalAppendOnlyMap
6.2.3.1 操作流程
获取record后Spark建立一个类似HashMap的数据结构ExternalAppendOnlyMap对buffer中的record进行聚合HashMap中的Key是record中的KeyHashMap中的Value是经过相同聚合函数func计算后的结果。聚合函数是sum函数那么Value中存放的是多个record对应Value相加后的结果。之后如果需要按照Key进行排序如下图所示则建立一个Array结构读取HashMap中的record并对record按Key进行排序排序完成后将结果输出或者传递给下一步操作。如果HashMap存放不下则会先扩容为两倍大小如果还存放不下就将HashMap中的record排序后spill到磁盘上。此时HashMap被清空可以继续对buffer中的record进行聚合。如果内存再次不够用那么继续spill到磁盘上此过程可以重复多次。当聚合完成以后将此时HashMap中的reocrd与磁盘上已排序的record进行再次聚合得到最终的record输出到分区文件中。 注意这里的排序和聚合依然使用的同一个数据结构。
6.2.3.2 优缺点
同上一节。
6.2.3.3 适用性
适合reduce端需要聚合、不需要或需要按Key进行排序的操作如reduceByKey、aggregateByKey等。