手机网站开发教程视频,hao爱做网站,主页面设计图片,哈尔滨百度推广联系人第 5 章 Spark Shuffle 解析 5.1 Shuffle 的核心要点1. 数据分区#xff1a;2.数据传输#xff1a;3. 数据排序#xff1a;4.数据聚合#xff1a;5. 数据重分发#xff1a;6.数据持久化#xff1a;5.1.1 ShuffleMapStage 与 ResultStage 5.2 HashShuffle 解析5.2.1 未优化… 第 5 章 Spark Shuffle 解析 5.1 Shuffle 的核心要点1. 数据分区2.数据传输3. 数据排序4.数据聚合5. 数据重分发6.数据持久化5.1.1 ShuffleMapStage 与 ResultStage 5.2 HashShuffle 解析5.2.1 未优化的 HashShuffle5.2.2 优化后的 HashShuffle 5.3 SortShuffle 解析5.3.1 普通 SortShuffle5.3.2 bypass SortShuffle 5.1 Shuffle 的核心要点
Shuffle 是在数据处理和计算过程中的一个重要操作主要用于打乱数据的顺序和重新分配数据。在大数据处理和分布式计算中Shuffle 是实现数据传输、数据聚合和任务分发的关键步骤之一。以下是 Shuffle 的核心要点
1. 数据分区
Shuffle 首先将原始数据根据某种规则如哈希函数划分为多个数据分区以便后续的数据处理和计算操作。每个数据分区通常包含一部分原始数据且可以被并行处理。 数据分区是在分布式计算中将数据划分为多个逻辑块或分区的过程。数据分区是为了将大规模数据集拆分成小块以便在计算节点上并行处理和分配计算任务。数据分区的目的是实现数据的并行处理、负载均衡和提高计算性能。
在数据分区过程中数据根据一定的规则或策略被分配到不同的分区中。每个分区通常包含一部分数据可以由一个或多个计算节点进行处理。常见的数据分区方法包括 哈希分区Hash Partitioning根据数据的哈希值将数据均匀地分配到不同的分区中。这样可以确保具有相同哈希值的数据项在同一个分区中从而便于后续的聚合和处理操作。 范围分区Range Partitioning根据数据的排序或范围条件将数据划分为不同的分区。例如将数据按照数值的大小或时间的先后顺序进行范围分区。 列分区Column Partitioning根据数据的列值将数据分配到不同的分区中。例如在关系型数据库中可以根据表的某个列进行分区将具有相同列值的数据放在同一个分区中。 轮流分区Round-robin Partitioning按照轮询的方式将数据依次分配到不同的分区中。每个分区依次接收一个数据项直到所有数据项都被分配完毕。 自定义分区Custom Partitioning根据特定的业务需求自定义实现数据分区策略。例如根据地理位置、用户ID等自定义规则进行数据分区。
数据分区的目标是将数据均匀地分配到不同的计算节点上以便实现并行处理和负载均衡。通过合理的数据分区策略可以提高分布式计算的性能和效率并更好地利用计算资源。在具体的分布式计算框架中可以根据需求和场景选择合适的数据分区方法和分区策略。
2.数据传输
在 Shuffle 过程中数据分区需要从计算节点传输到不同的节点以便进行后续的数据处理或计算。数据传输可能发生在网络中涉及节点间的数据交换。
数据传输是指在计算系统中将数据从一个节点传输到另一个节点的过程。在分布式计算和数据处理中数据传输是一个重要的环节涉及计算节点之间的数据交换和通信。
数据传输可以发生在以下场景中 数据分发在分布式计算中数据需要从一个节点传输到多个节点以便并行处理。这种数据传输通常发生在任务启动时将输入数据分发给各个计算节点。 Shuffle传输Shuffle是在数据处理过程中的一个重要操作涉及数据重新分区、排序和合并。在Shuffle过程中数据需要从Map节点传输到Reduce节点以进行数据重分区和分组操作。 数据合并在一些情况下多个节点上的计算结果需要进行合并以生成最终的计算结果。这涉及将数据从各个节点传输到一个节点并进行合并操作。
数据传输可以通过多种方式进行包括但不限于 网络传输数据可以通过计算节点之间的网络进行传输。这可能涉及节点之间的数据交换、消息传递和网络通信。 磁盘传输数据可以通过将数据写入磁盘并在其他节点上读取磁盘上的数据来进行传输。这通常用于将数据从一个节点传输到另一个节点特别是在Shuffle过程中。 内存传输如果计算节点之间有共享的内存或高速连接数据可以通过内存进行传输。这种方式通常比网络传输更快并且在某些情况下可以减少数据的序列化和反序列化开销。
在设计和实现分布式计算系统时需要考虑数据传输的效率和性能。合理的数据传输策略和机制可以减少数据传输的开销、降低网络延迟并提高整体的计算性能和效率。这包括优化网络带宽利用、数据压缩、缓存机制和并行传输等技术。 数据持久化是指将数据存储到长期存储介质如磁盘、数据库、文件系统等中以便在程序运行结束后仍然能够访问和使用数据的过程。
数据持久化的目的是确保数据在程序运行结束后的持久性和可靠性以便后续的读取、查询、分析或其他操作。通过数据持久化数据可以长期保存随时可以被读取和处理即使在系统重启或程序重新运行后也能保留数据状态。 3. 数据排序
在 Shuffle 过程中对于需要排序的数据会对每个数据分区内的数据进行排序。排序操作确保后续的聚合和计算操作能够有效地处理数据。
数据排序是将一组数据按照指定的排序规则进行重新排列的过程。在计算和数据处理中数据排序是常见的操作用于整理数据、提取有序子集或为后续计算提供有序输入。
数据排序通常基于某个特定的排序键或排序规则按照指定的顺序对数据进行排列。常见的排序规则包括升序从小到大和降序从大到小。排序键可以是任何可比较的数据类型如整数、浮点数、字符串等。
在排序过程中通常使用排序算法来实现。常见的排序算法包括冒泡排序、插入排序、选择排序、快速排序、归并排序等。这些算法在时间复杂度和空间复杂度上有所差异适用于不同规模和类型的数据集。
数据排序的目的主要有两个 数据整理排序可以将数据按照指定的顺序进行整理使其更易于处理和分析。有序的数据可以提供更好的查询性能和更高效的数据操作。 数据分析和计算在一些计算场景中有序的数据可以更有效地进行计算。例如排序后的数据可以更好地利用二分查找、归并操作和其他分析算法。
数据排序在各种领域和应用中都有广泛的应用。在数据库系统中排序用于执行ORDER BY查询、索引维护和连接操作。在大数据处理和分布式计算中排序常用于数据预处理、Shuffle阶段和结果合并等操作。
对于大规模的数据集数据排序可能需要考虑性能、资源消耗和分布式环境下的并行处理。一些优化技术如外部排序、并行排序和分布式排序可以用于处理大规模数据的排序需求。
总之数据排序是对数据按照指定规则进行重新排列的过程它在数据处理和计算中起到重要的作用提供了有序数据的基础。
4.数据聚合
在 Shuffle 过程中根据业务需求可以进行数据的合并和聚合操作。例如对相同键的数据进行合并计算键对应的总和、平均值等统计量。
数据聚合是将多个数据项合并为一个或多个汇总结果的过程。在计算和数据处理中数据聚合是一种常见的操作用于计算统计信息、生成摘要信息或将数据集合并为更小的集合。
数据聚合可以应用于不同类型的数据包括数值数据、文本数据和结构化数据等。聚合操作可以根据具体需求进行不同的计算如求和、平均值、最大值、最小值、计数、频率统计、分组统计等。
以下是一些常见的数据聚合操作 求和Sum将一组数值相加得到总和。 平均值Average将一组数值相加后除以数量得到平均值。 最大值Maximum从一组数值中找到最大值。 最小值Minimum从一组数值中找到最小值。 计数Count统计一组数据中的元素数量。 频率统计Frequency Count统计一组数据中不同元素的出现次数。 分组统计Group By根据某个特定的属性对数据进行分组然后对每个组进行聚合操作。
数据聚合可以应用于不同的数据处理场景包括数据分析、数据挖掘、报告生成和业务决策等。在大规模数据处理和分布式计算中数据聚合通常涉及对分布在不同节点上的数据进行并行计算和合并。
对于大规模数据集和复杂的聚合操作需要考虑性能、资源消耗和分布式环境下的并行计算。一些优化技术如局部聚合、合并树、混合聚合等可以用于提高聚合操作的性能和效率。
总之数据聚合是将多个数据项合并为一个或多个汇总结果的过程。它在数据处理和计算中起到重要的作用用于计算统计信息、生成摘要信息或将数据集合并为更小的集合。 5. 数据重分发
在 Shuffle 过程中经过排序和聚合后需要将数据重新分发到不同的计算节点上以便进一步的计算操作。数据重分发通常涉及将数据分发到正确的节点以满足后续计算任务的需求。
数据重分发是在分布式计算中将数据从一个节点重新分发到另一个节点的过程。它通常发生在Shuffle阶段用于重新分配和组合数据以便进行后续的计算和处理。
数据重分发的主要目的是将具有相同key的数据项聚集到同一个节点上以便进行分组、合并或聚合操作。它是实现分布式计算中数据重组和数据交换的重要环节。
数据重分发的过程如下 Map阶段在Map阶段输入数据被映射为(key, value)对并根据指定的分区器将它们分配到不同的分区中。每个分区的数据被分散存储在计算节点的内存或磁盘中。 Shuffle阶段 数据分区在Shuffle阶段根据分区器的规则计算节点上的每个分区会被划分为多个数据块chunk。每个数据块包含具有相同key的数据项。数据重分发在数据重分发过程中各个分区的数据项将根据key的哈希值或其他分区规则重新分配到不同的节点上。这通常涉及数据的网络传输或磁盘写入和读取操作。 Reduce阶段在Reduce阶段重新分发的数据项到达相应的节点并被组合到相同key的数据集中。这样可以在每个节点上进行后续的分组、合并或聚合操作。
数据重分发的实现方式可以根据具体的分布式计算框架和算法进行选择和优化。在分布式计算系统中数据重分发的效率和性能对整体计算的速度和可靠性有重要影响。因此需要考虑网络带宽、数据传输延迟、数据量、节点负载等因素选择合适的数据重分发策略和机制。一些优化技术如数据压缩、数据本地化、数据合并等可以用于提高数据重分发的效率和性能。 6.数据持久化
在 Shuffle 过程中为了保持数据的可靠性和容错性通常需要将数据持久化存储以便在出现故障或需要恢复时使用。
Shuffle 的性能和效率对于大数据处理和分布式计算来说非常重要。优化 Shuffle 过程可以提高整体的计算性能和效率减少数据传输和网络开销并避免数据倾斜和资源浪费问题。
需要根据具体的数据处理框架和场景来实现 Shuffle 操作。一些常见的大数据处理框架如Apache Hadoop、Apache Spark等提供了内置的 Shuffle 支持以简化和优化数据处理过程中的 Shuffle 操作。
数据持久化是指将数据存储到长期存储介质如磁盘、数据库、文件系统等中以便在程序运行结束后仍然能够访问和使用数据的过程。
数据持久化的目的是确保数据在程序运行结束后的持久性和可靠性以便后续的读取、查询、分析或其他操作。通过数据持久化数据可以长期保存随时可以被读取和处理即使在系统重启或程序重新运行后也能保留数据状态。
数据持久化的方式和方法可以根据具体的应用场景和需求进行选择常见的数据持久化方式包括 关系型数据库使用关系型数据库管理系统如MySQL、Oracle、PostgreSQL等进行数据的持久化存储和管理。关系型数据库提供结构化的数据存储和丰富的查询功能适用于需要事务支持和复杂数据模型的应用。 NoSQL数据库使用NoSQL数据库如MongoDB、Redis、Cassandra等进行数据的持久化存储。NoSQL数据库提供灵活的数据模型和高吞吐量的数据访问适用于大规模数据存储和快速读写的应用场景。 文件系统将数据以文件的形式存储在文件系统中可以使用文本文件、JSON文件、XML文件等格式进行存储。文件系统提供简单的数据持久化方式适用于小型数据集或简单数据结构的应用。 分布式存储系统使用分布式存储系统如Hadoop HDFS、Amazon S3、Google Cloud Storage等进行大规模数据的分布式持久化存储。分布式存储系统提供高容量、高可靠性和可扩展性的数据存储解决方案。 内存数据库将数据存储在内存中进行快速访问和处理常见的内存数据库包括Redis、Memcached等。内存数据库提供低延迟的数据访问和高性能的数据处理适用于对实时性要求较高的应用。
在选择数据持久化方式时需要考虑数据的性质、访问模式、数据量、性能需求、可靠性要求等因素。不同的数据持久化方式有不同的优势和限制需要根据具体的业务需求和系统要求进行权衡和选择。 5.1.1 ShuffleMapStage 与 ResultStage
ShuffleMapStage和ResultStage是Apache Spark中两个关键的概念用于表示Spark中的任务执行流程和数据流转过程。
ShuffleMapStage ShuffleMapStage是Spark中的一个执行阶段Stage用于执行包含Shuffle操作的任务。在Spark的执行过程中当需要进行Shuffle操作如reduceByKey、groupBy等时会触发一个ShuffleMapStage。ShuffleMapStage将数据进行重新分区和排序并将结果写入磁盘上的中间文件shuffle文件。
ShuffleMapStage包含一组任务Tasks每个任务负责处理输入数据的一个分区并生成Shuffle输出数据的一部分。每个任务在一个计算节点上执行可以并行处理多个分区以提高执行效率。执行ShuffleMapStage的任务之间通常是相互独立的可以并行执行。
ResultStage ResultStage是Spark中的另一个执行阶段Stage用于执行不包含Shuffle操作的任务。ResultStage接收ShuffleMapStage的输出数据中间文件并根据任务的计算逻辑进行计算和处理。ResultStage的输入数据已经根据键进行了分组和排序可以直接进行后续的聚合、过滤或其他操作。
ResultStage中的任务Tasks执行并行处理数据每个任务负责处理一部分输入数据可以在不同的计算节点上并行执行。任务之间相互独立可以同时进行计算和处理。
ShuffleMapStage和ResultStage是Spark中任务执行流程中的两个关键阶段。ShuffleMapStage负责处理包含Shuffle操作的任务并生成Shuffle输出数据而ResultStage接收ShuffleMapStage的输出数据进行进一步的计算和处理。这两个阶段的协同工作实现了数据的重新分区、排序和计算过程以提供高性能和高效率的数据处理能力。 在划分 stage 时最后一个 stage 称为 finalStage它本质上是一个 ResultStage 对象前 面的所有 stage 被称为 ShuffleMapStage。ShuffleMapStage 的结束伴随着 shuffle 文件的写磁盘。ResultStage 基本上对应代码中的 action 算子即将一个函数应用在 RDD 的各个 partition 的数据集上意味着一个 job 的运行结束。 5.2 HashShuffle 解析
HashShuffle是一种常见的Shuffle实现策略用于在分布式计算中重新分配和组合数据。在HashShuffle中数据的重新分发和分组是基于哈希函数的。
HashShuffle的过程如下 Map阶段在Map阶段输入数据被映射为(key, value)对并根据指定的分区器将它们分配到不同的分区中。每个分区的数据被分散存储在计算节点的内存中。 Shuffle阶段 数据分区在Shuffle阶段计算节点上的每个分区会被划分为多个数据块chunk。每个数据块包含相同key的数据项。数据排序对于每个数据块数据项根据key进行排序。排序的目的是将具有相同key的数据项相邻存储以方便后续的分组操作。 Reduce阶段在Reduce阶段数据项被重新组合具有相同key的数据项被分组到同一个计算节点上的Reducer任务中进行处理。Reducer任务对每个key的数据项进行聚合、计算等操作生成最终的结果。
HashShuffle的优点是简单且高效适用于处理大规模数据集。它通过使用哈希函数将数据重新分配到不同的计算节点上并通过数据块和排序操作提高了后续分组操作的效率。然而HashShuffle也有一些限制例如需要足够的内存来存储数据块和排序操作可能引入的磁盘IO开销。
值得注意的是Spark和其他分布式计算框架通常提供多种Shuffle实现策略例如SortShuffle、TungstenSort等以根据不同的场景和需求选择最合适的Shuffle策略。这些策略可能在性能、内存消耗和网络开销等方面有所不同开发人员可以根据具体的应用场景选择合适的Shuffle实现方式。
这里我们先明确一个假设前提每个 Executor 只有 1 个 CPU core也就是说无论这个 Executor 上分配多少个 task 线程同一时间都只能执行一个 task 线程。如下图中有 3 个 Reducer从 Task 开始那边各自把自己进行 Hash 计算(分区器hash/numreduce 取模)分类出 3 个不同的类别每个 Task 都分成 3 种类别的数据想把不同的数据汇聚然后计算出最终的结果所以 Reducer 会在每个 Task 中把属于自己类别的数据收集过来汇聚成一个同类别的大集合每 1 个 Task 输出 3 份本地文件这里有 4 个Mapper Tasks所以总共输出了 4 个 Tasks x 3 个分类文件 12 个本地小文件。 5.2.1 未优化的 HashShuffle
未优化的HashShuffle是指在分布式计算中使用基本的HashShuffle实现策略没有进行额外的优化措施。它可以被视为最简单和最基本的Shuffle实现方式但可能存在性能瓶颈和资源利用不足的问题。
未优化的HashShuffle的特点如下 数据传输在未优化的HashShuffle中所有的数据都会通过网络传输包括从Mapper节点到Reducer节点的数据传输和中间数据的传输。这可能导致网络带宽和延迟成为性能瓶颈并且可能造成资源的浪费。 数据持久化未优化的HashShuffle通常需要将中间数据写入磁盘以便在Reducer节点上进行排序和合并操作。这可能导致磁盘IO成为瓶颈并且可能导致额外的存储开销。 数据排序在未优化的HashShuffle中中间数据的排序操作可能是基于磁盘的即需要将数据读取到内存中进行排序。这可能会导致大量的磁盘IO操作和额外的计算开销。 资源利用未优化的HashShuffle可能没有充分利用计算节点的资源如内存和CPU。由于数据传输、磁盘IO和排序操作的开销可能导致资源利用率低下降低了整体的计算性能和效率。
对于大规模的数据集和复杂的计算任务未优化的HashShuffle可能无法满足性能和可扩展性的要求。因此在实际的分布式计算环境中通常需要进行Shuffle的优化以减少数据传输、磁盘IO和排序操作的开销提高计算性能和资源利用率。这可以通过采用更高级的Shuffle实现策略如SortShuffle、TungstenSort、调整Shuffle的参数和配置以及使用缓存和内存管理等技术来实现。 5.2.2 优化后的 HashShuffle
优化后的HashShuffle是指在分布式计算中对HashShuffle进行了一系列的优化措施以提高性能、减少资源消耗和优化数据传输过程。以下是一些常见的HashShuffle优化技术 拷贝优化为了减少数据传输的网络开销可以采用拷贝优化技术。在Map阶段可以在Map任务本地生成中间结果并将其拷贝到Reducer任务所在的节点上。这样可以减少数据的传输量提高数据传输的效率。 聚合优化在Shuffle阶段可以进行本地聚合操作即在Mapper节点上对具有相同键的数据项进行局部聚合。这样可以减少中间数据量并减少数据传输的开销。 压缩优化在数据传输过程中可以使用压缩技术对数据进行压缩。压缩后的数据可以减少网络传输的带宽消耗并降低数据传输的延迟。 内存优化在Shuffle阶段可以针对中间数据的排序和合并操作进行内存优化。例如使用基于磁盘的排序算法避免将所有数据加载到内存中进行排序以减少内存消耗。 磁盘IO优化对于磁盘IO操作可以采用批量写入和异步IO等技术减少磁盘读写的次数提高IO的效率。 基于内存的Shuffle使用内存进行数据传输和排序操作可以极大地提高Shuffle的性能。这可以通过使用堆外内存或内存映射文件来实现。 动态分区优化根据数据分布和负载情况动态调整分区策略和分区数量以实现负载均衡和提高计算性能。 数据倾斜处理对于可能出现的数据倾斜问题可以采用一些技术如数据重分区、局部聚合、采样和随机化等以均衡数据分布并提高计算效率。
这些优化措施可以根据具体的分布式计算框架和应用场景进行选择和应用。通过对HashShuffle的优化可以提高数据处理的性能和效率减少资源的消耗并提升分布式计算系统的整体可靠性和可扩展性。
优化的 HashShuffle 过程就是启用合并机制合并机制就是复用 buffer开启合并机制的配置是spark.shuffle.consolidateFiles。该参数默认值为 false将其设置为 true 即可开启优化机制。通常来说如果我们使用 HashShuffleManager那么都建议开启这个选项。这里还是有 4 个 Tasks数据类别还是分成 3 种类型因为 Hash 算法会根据你的 Key 进行分类在同一个进程中无论是有多少过 Task都会把同样的 Key 放在同一个 Buffer里然后把 Buffer 中的数据写入以 Core 数量为单位的本地文件中(一个 Core 只有一种类型的 Key 的数据)每 1 个 Task 所在的进程中分别写入共同进程中的 3 份本地文件这里有 4 个 Mapper Tasks所以总共输出是 2 个 Cores x 3 个分类文件 6 个本地小文件。 5.3 SortShuffle 解析
SortShuffle是Apache Spark中的一种Shuffle实现策略用于在分布式计算中重新分配和组合数据。SortShuffle通过排序操作对数据进行重新分区和组合以提高数据传输和聚合操作的效率。
SortShuffle的过程如下 Map阶段在Map阶段输入数据被映射为(key, value)对并根据指定的分区器将它们分配到不同的分区中。每个分区的数据被分散存储在计算节点的内存中。 Shuffle阶段 数据分区在Shuffle阶段计算节点上的每个分区会被划分为多个数据块chunk。每个数据块包含相同key的数据项。数据排序对于每个数据块数据项根据key进行排序。排序的目的是将具有相同key的数据项相邻存储以方便后续的分组操作。 Reduce阶段在Reduce阶段数据项被重新组合具有相同key的数据项被分组到同一个计算节点上的Reducer任务中进行处理。Reducer任务对每个key的数据项进行聚合、计算等操作生成最终的结果。
SortShuffle相较于未优化的HashShuffle具有以下优点 数据局部性SortShuffle通过排序操作将具有相同key的数据项在内存中相邻存储。这样可以提高数据的局部性减少数据传输的网络开销。 内存效率SortShuffle利用排序操作可以将具有相同key的数据项紧密存储从而减少内存的使用量。这可以提高内存的利用效率并减少额外的存储开销。 聚合效率由于具有相同key的数据项已经相邻存储Reduce阶段的聚合操作可以更加高效地进行。Reducer任务可以逐个读取具有相同key的数据项并在内存中进行聚合计算而无需频繁地访问磁盘。
尽管SortShuffle提供了优化的数据传输和聚合操作但它也存在一些限制。在数据倾斜的情况下具有相同key的数据项可能不均匀地分布在不同的节点上从而导致性能不均衡。此外SortShuffle可能在内存不足的情况下导致大量的磁盘IO操作。
在实际应用中可以根据数据集的特点和应用需求选择合适的Shuffle实现策略。除了SortShuffle还有其他的Shuffle实现策略可供选择如HashShuffle、TungstenSort等。选择合适的Shuffle策略可以根据数据的大小、计算负载、可用资源等因素进行评估和权衡。
5.3.1 普通 SortShuffle
普通的SortShuffle是指在分布式计算中基于排序的Shuffle实现方式用于重新分配和组合数据。它通过对数据进行排序操作以提高数据传输和聚合操作的效率。
普通SortShuffle的过程如下 Map阶段在Map阶段输入数据被映射为(key, value)对并根据指定的分区器将它们分配到不同的分区中。每个分区的数据被分散存储在计算节点的内存中。 Shuffle阶段 数据分区在Shuffle阶段计算节点上的每个分区会被划分为多个数据块chunk。每个数据块包含相同key的数据项。数据排序对于每个数据块数据项根据key进行排序。排序的目的是将具有相同key的数据项相邻存储以方便后续的分组操作。 Reduce阶段在Reduce阶段数据项被重新组合具有相同key的数据项被分组到同一个计算节点上的Reducer任务中进行处理。Reducer任务对每个key的数据项进行聚合、计算等操作生成最终的结果。
普通SortShuffle的主要特点是基于全排序的方式进行数据的重新分区和排序。它可以通过并行化和排序优化来提高数据的传输效率和聚合效率减少数据传输的网络开销和磁盘IO操作。
然而普通SortShuffle也存在一些潜在的限制。由于需要对所有数据项进行全局排序因此它对内存的需求较高可能会导致内存溢出或额外的磁盘交换开销。在处理大规模数据集或数据倾斜的情况下普通SortShuffle可能会面临性能瓶颈和资源消耗过大的问题。
为了克服这些限制一些优化的SortShuffle实现策略被提出如TungstenSort、SortMergeShuffle等。这些优化策略通常使用更高效的排序算法、内存管理技术和数据分区策略以提高性能、降低资源消耗和应对特殊情况如数据倾斜。
在实际应用中选择合适的Shuffle实现策略需要根据具体的场景和需求进行评估和权衡以获得最佳的性能和可靠性。
在该模式下数据会先写入一个数据结构reduceByKey 写入 Map一边通过 Map 局部聚合一遍写入内存。Join 算子写入 ArrayList 直接写入内存中。然后需要判断是否达到阈值如果达到就会将内存数据结构的数据写入到磁盘清空内存数据结构。在溢写磁盘前先根据 key 进行排序排序过后的数据会分批写入到磁盘文件中。默认批次为 10000 条数据会以每批一万条写入到磁盘文件。写入磁盘文件通过缓冲区溢写的方式每次溢写都会产生一个磁盘文件 也就是说一个 Task 过程会产生多个临时文件。 最后在每个 Task 中将所有的临时文件合并这就是 merge 过程此过程将所有临时文件读取出来一次写入到最终文件。 意味着一个 Task 的所有数据都在这一个文件中。同时单独写一份索引文件标识下游各个Task的数据在文件中的索引start offset和end offset。 5.3.2 bypass SortShuffle
Bypass SortShuffle是一种优化的Shuffle实现策略旨在减少不必要的排序操作和数据传输开销。它适用于某些特定的场景其中不需要对Shuffle的输出进行排序或合并。
在传统的Shuffle过程中数据在Map阶段被划分为多个分区并在Shuffle阶段进行排序和数据传输。然后在Reduce阶段对具有相同key的数据项进行合并和处理。这种方式在某些情况下可能会导致不必要的开销尤其是在不需要排序和合并的情况下。
Bypass SortShuffle通过跳过排序和合并操作来减少这些开销从而提高性能和效率。它在Map阶段生成分区文件并在Reduce阶段直接读取分区文件而不需要排序和合并。
以下是Bypass SortShuffle的主要步骤 Map阶段在Map阶段输入数据被映射为(key, value)对并根据指定的分区器将其分配到不同的分区中。每个分区的数据被写入磁盘上的分区文件中而不进行排序。 Shuffle阶段在Shuffle阶段Reducer任务根据分区文件的位置和分区信息直接读取分区文件并对具有相同key的数据项进行处理。这个过程不涉及排序和合并操作因为数据已经按分区存储。
Bypass SortShuffle的优点是减少了排序和合并操作的开销提高了Shuffle过程的性能和效率。它特别适用于一些场景如对已经预先有序的数据进行处理或需要保留原始数据顺序的情况。
需要注意的是Bypass SortShuffle并不适用于所有的场景尤其是需要排序和合并操作的情况。在选择Shuffle实现策略时需要根据具体的需求和场景来评估和权衡各种选项以获得最佳的性能和结果。
bypass 运行机制的触发条件如下
shuffle reduce task 数量小于等于 spark.shuffle.sort.bypassMergeThreshold 参数的值默认 为 200。不是聚合类的 shuffle 算子比如 reduceByKey。此时 task 会为每个 reduce 端的 task 都创建一个临时磁盘文件并将数据按 key 进行hash 然后根据 key 的 hash 值将 key 写入对应的磁盘文件之中。当然写入磁盘文件时也是先写入内存缓冲缓冲写满之后再溢写到磁盘文件的。最后同样会将所有临时磁盘文件都合并成一个磁盘文件并创建一个单独的索引文件。该过程的磁盘写机制其实跟未经优化的 HashShuffleManager 是一模一样的因为都要创建数量惊人的磁盘文件只是在最后会做一个磁盘文件的合并而已。因此少量的最终磁盘文件也让该机制相对未经优化的 HashShuffleManager 来说shuffle read 的性能会更好。而该机制与普通 SortShuffleManager 运行机制的不同在于不会进行排序。也就是说启用该机制的最大好处在于shuffle write 过程中不需要进行数据的排序操作也就节省掉了这部分的性能开销。