当前位置: 首页 > news >正文

wordpress占用多大内存网站在线seo

wordpress占用多大内存,网站在线seo,wordpress 获取分类链接,应用软件开发公司本文转发自技术世界#xff0c;原文链接 http://www.jasongj.com/spark/skew/ Spark性能优化之道——解决Spark数据倾斜#xff08;Data Skew#xff09;的N种姿势 发表于 2017-02-28 | 更新于 2017-10-17 | 本文结合实例详细阐明了Spark数据倾斜的几种场景以及对应的解…本文转发自技术世界原文链接 http://www.jasongj.com/spark/skew/ Spark性能优化之道——解决Spark数据倾斜Data Skew的N种姿势  发表于 2017-02-28 |  更新于 2017-10-17 |   本文结合实例详细阐明了Spark数据倾斜的几种场景以及对应的解决方案包括避免数据源倾斜调整并行度使用自定义Partitioner使用Map侧Join代替Reduce侧Join给倾斜Key加上随机前缀等。 摘要 本文结合实例详细阐明了Spark数据倾斜的几种场景以及对应的解决方案包括避免数据源倾斜调整并行度使用自定义Partitioner使用Map侧Join代替Reduce侧Join给倾斜Key加上随机前缀等。 为何要处理数据倾斜Data Skew 什么是数据倾斜 对Spark/Hadoop这样的大数据系统来讲数据量大并不可怕可怕的是数据倾斜。 何谓数据倾斜数据倾斜指的是并行处理的数据集中某一部分如Spark或Kafka的一个Partition的数据显著多于其它部分从而使得该部分的处理速度成为整个数据集处理的瓶颈。 对于分布式系统而言理想情况下随着系统规模节点数量的增加应用整体耗时线性下降。如果一台机器处理一批大量数据需要120分钟当机器数量增加到三时理想的耗时为120 / 3 40分钟如下图所示。  但是上述情况只是理想情况实际上将单机任务转换成分布式任务后会有overhead使得总的任务量较之单机时有所增加所以每台机器的执行时间加起来比单台机器时更大。这里暂不考虑这些overhead假设单机任务转换成分布式任务后总任务量不变。  但即使如此想做到分布式情况下每台机器执行时间是单机时的1 / N就必须保证每台机器的任务量相等。不幸的是很多时候任务的分配是不均匀的甚至不均匀到大部分任务被分配到个别机器上其它大部分机器所分配的任务量只占总得的小部分。比如一台机器负责处理80%的任务另外两台机器各处理10%的任务如下图所示。  在上图中机器数据增加为三倍但执行时间只降为原来的80%远低于理想值。    数据倾斜的危害 从上图可见当出现数据倾斜时小量任务耗时远高于其它任务从而使得整体耗时过大未能充分发挥分布式系统的并行计算优势。  另外当发生数据倾斜时部分任务处理的数据量过大可能造成内存不足使得任务失败并进而引进整个应用失败。    数据倾斜是如何造成的 在Spark中同一个Stage的不同Partition可以并行处理而具有依赖关系的不同Stage之间是串行处理的。假设某个Spark Job分为Stage 0和Stage 1两个Stage且Stage 1依赖于Stage 0那Stage 0完全处理结束之前不会处理Stage 1。而Stage 0可能包含N个Task这N个Task可以并行进行。如果其中N-1个Task都在10秒内完成而另外一个Task却耗时1分钟那该Stage的总时间至少为1分钟。换句话说一个Stage所耗费的时间主要由最慢的那个Task决定。 由于同一个Stage内的所有Task执行相同的计算在排除不同计算节点计算能力差异的前提下不同Task之间耗时的差异主要由该Task所处理的数据量决定。 Stage的数据来源主要分为如下两类 从数据源直接读取。如读取HDFSKafka读取上一个Stage的Shuffle数据如何缓解/消除数据倾斜 避免数据源的数据倾斜 ———— 读Kafka 以Spark Stream通过DirectStream方式读取Kafka数据为例。由于Kafka的每一个Partition对应Spark的一个TaskPartition所以Kafka内相关Topic的各Partition之间数据是否平衡直接决定Spark处理该数据时是否会产生数据倾斜。 如《Kafka设计解析一- Kafka背景及架构介绍》一文所述Kafka某一Topic内消息在不同Partition之间的分布主要由Producer端所使用的Partition实现类决定。如果使用随机Partitioner则每条消息会随机发送到一个Partition中从而从概率上来讲各Partition间的数据会达到平衡。此时源Stage直接读取Kafka数据的Stage不会产生数据倾斜。 但很多时候业务场景可能会要求将具备同一特征的数据顺序消费此时就需要将具有相同特征的数据放于同一个Partition中。一个典型的场景是需要将同一个用户相关的PV信息置于同一个Partition中。此时如果产生了数据倾斜则需要通过其它方式处理。 避免数据源的数据倾斜 ———— 读文件 原理 Spark以通过textFile(path, minPartitions)方法读取文件时使用TextFileFormat。 对于不可切分的文件每个文件对应一个Split从而对应一个Partition。此时各文件大小是否一致很大程度上决定了是否存在数据源侧的数据倾斜。另外对于不可切分的压缩文件即使压缩后的文件大小一致它所包含的实际数据量也可能差别很多因为源文件数据重复度越高压缩比越高。反过来即使压缩文件大小接近但由于压缩比可能差距很大所需处理的数据量差距也可能很大。 此时可通过在数据生成端将不可切分文件存储为可切分文件或者保证各文件包含数据量相同的方式避免数据倾斜。 对于可切分的文件每个Split大小由如下算法决定。其中goalSize等于所有文件总大小除以minPartitions。而blockSize如果是HDFS文件由文件本身的block大小决定如果是Linux本地文件且使用本地模式由fs.local.block.size决定。 1 protected long computeSplitSize(long goalSize, long minSize, long blockSize) { 2 return Math.max(minSize, Math.min(goalSize, blockSize)); 3 } 默认情况下各Split的大小不会太大一般相当于一个Block大小在Hadoop 2中默认值为128MB所以数据倾斜问题不明显。如果出现了严重的数据倾斜可通过上述参数调整。  案例 现通过脚本生成一些文本文件并通过如下代码进行简单的单词计数。为避免Shuffle只计单词总个数不须对单词进行分组计数。 1 SparkConf sparkConf new SparkConf() 2 .setAppName(ReadFileSkewDemo); 3 JavaSparkContext javaSparkContext new JavaSparkContext(sparkConf); 4 long count javaSparkContext.textFile(inputFile, minPartitions) 5 .flatMap((String line) - Arrays.asList(line.split( )).iterator()).count(); 6 System.out.printf(total words : %s, count); 7 javaSparkContext.stop(); 总共生成如下11个csv文件其中10个大小均为271.9MB另外一个大小为8.5GB。  之后将8.5GB大小的文件使用gzip压缩压缩后大小仅为25.3MB。 使用如上代码对未压缩文件夹进行单词计数操作。Split大小为 max(minSize, min(goalSize, blockSize) max(1 B, min((271.9 108.5 1024) / 1 MB, 128 MB) 128MB。无明显数据倾斜。 使用同样代码对包含压缩文件的文件夹进行同样的单词计数操作。未压缩文件的Split大小仍然为128MB而压缩文件gzip压缩由于不可切分且大小仅为25.3MB因此该文件作为一个单独的Split/Partition。虽然该文件相对较小但是它由8.5GB文件压缩而来包含数据量是其它未压缩文件的32倍因此处理该Split/Partition/文件的Task耗时为4.4分钟远高于其它Task的10秒。 由于上述gzip压缩文件大小为25.3MB小于128MB的Split大小不能证明gzip压缩文件不可切分。现将minPartitions从默认的1设置为229从而目标Split大小为max(minSize, min(goalSize, blockSize) max(1 B, min((271.9 * 1025.3) / 229 MB, 128 MB) 12 MB。如果gzip压缩文件可切分则所有Split/Partition大小都不会远大于12。反之如果仍然存在25.3MB的Partition则说明gzip压缩文件确实不可切分在生成不可切分文件时需要如上文所述保证各文件数量大大致相同。 如下图所示gzip压缩文件对应的Split/Partition大小为25.3MB其它Split大小均为12MB左右。而该Task耗时4.7分钟远大于其它Task的4秒。 总结 适用场景 数据源侧存在不可切分文件且文件内包含的数据量相差较大。 解决方案 尽量使用可切分的格式代替不可切分的格式或者保证各文件实际包含数据量大致相同。 优势 可撤底消除数据源侧数据倾斜效果显著。 劣势 数据源一般来源于外部系统需要外部系统的支持。 调整并行度分散同一个Task的不同Key 原理 Spark在做Shuffle时默认使用HashPartitioner非Hash Shuffle对数据进行分区。如果并行度设置的不合适可能造成大量不相同的Key对应的数据被分配到了同一个Task上造成该Task所处理的数据远大于其它Task从而造成数据倾斜。 如果调整Shuffle时的并行度使得原本被分配到同一Task的不同Key发配到不同Task上处理则可降低原Task所需处理的数据量从而缓解数据倾斜问题造成的短板效应。 案例 现有一张测试表名为student_external内有10.5亿条数据每条数据有一个唯一的id值。现从中取出id取值为9亿到10.5亿的共1.5亿条数据并通过一些处理使得id为9亿到9.4亿间的所有数据对12取模后余数为8即在Shuffle并行度为12时该数据集全部被HashPartition分配到第8个Task其它数据集对其id除以100取整从而使得id大于9.4亿的数据在Shuffle时可被均匀分配到所有Task中而id小于9.4亿的数据全部分配到同一个Task中。处理过程如下 1 INSERT OVERWRITE TABLE test 2 SELECT CASE WHEN id 940000000 THEN (9500000 (CAST (RAND() * 8 AS INTEGER)) * 12 ) 3 ELSE CAST(id/100 AS INTEGER) 4 END, 5 name 6 FROM student_external 7 WHERE id BETWEEN 900000000 AND 1050000000; 通过上述处理一份可能造成后续数据倾斜的测试数据即以准备好。接下来使用Spark读取该测试数据并通过groupByKey(12)对id分组处理且Shuffle并行度为12。代码如下   1 public class SparkDataSkew {2 public static void main(String[] args) {3 SparkSession sparkSession SparkSession.builder()4 .appName(SparkDataSkewTunning)5 .config(hive.metastore.uris, thrift://hadoop1:9083)6 .enableHiveSupport()7 .getOrCreate();8 9 DatasetRow dataframe sparkSession.sql( select * from test); 10 dataframe.toJavaRDD() 11 .mapToPair((Row row) - new Tuple2Integer, String(row.getInt(0),row.getString(1))) 12 .groupByKey(12) 13 .mapToPair((Tuple2Integer, IterableString tuple) - { 14 int id tuple._1(); 15 AtomicInteger atomicInteger new AtomicInteger(0); 16 tuple._2().forEach((String name) - atomicInteger.incrementAndGet()); 17 return new Tuple2Integer, Integer(id, atomicInteger.get()); 18 }).count(); 19 20 sparkSession.stop(); 21 sparkSession.close(); 22 } 23 24 }       本次实验所使用集群节点数为4每个节点可被Yarn使用的CPU核数为16内存为16GB。使用如下方式提交上述应用将启动4个Executor每个Executor可使用核数为12该配置并非生产环境下的最优配置仅用于本文实验可用内存为12GB。 1 spark-submit --queue ambari --num-executors 4 --executor-cores 12 --executor-memory 12g --class com.jasongj.spark.driver.SparkDataSkew --master yarn --deploy-mode client SparkExample-with-dependencies-1.0.jar   GroupBy Stage的Task状态如下图所示Task 8处理的记录数为4500万远大于9倍于其它11个Task处理的500万记录。而Task 8所耗费的时间为38秒远高于其它11个Task的平均时间16秒。整个Stage的时间也为38秒该时间主要由最慢的Task 8决定。 在这种情况下可以通过调整Shuffle并行度使得原来被分配到同一个Task即该例中的Task 8的不同Key分配到不同Task从而降低Task 8所需处理的数据量缓解数据倾斜。 通过groupByKey(48)将Shuffle并行度调整为48重新提交到Spark。新的Job的GroupBy Stage所有Task状态如下图所示。 从上图可知记录数最多的Task 20处理的记录数约为1125万相比于并行度为12时Task 8的4500万降低了75%左右而其耗时从原来Task 8的38秒降到了24秒。 在这种场景下调整并行度并不意味着一定要增加并行度也可能是减小并行度。如果通过groupByKey(11)将Shuffle并行度调整为11重新提交到Spark。新Job的GroupBy Stage的所有Task状态如下图所示。 从上图可见处理记录数最多的Task 6所处理的记录数约为1045万耗时为23秒。处理记录数最少的Task 1处理的记录数约为545万耗时12秒。 总结 适用场景大量不同的Key被分配到了相同的Task造成该Task数据量过大。 解决方案调整并行度。一般是增大并行度但有时如本例减小并行度也可达到效果。 优势实现简单可在需要Shuffle的操作算子上直接设置并行度或者使用spark.default.parallelism设置。如果是Spark SQL还可通过SET spark.sql.shuffle.partitions[num_tasks]设置并行度。可用最小的代价解决问题。一般如果出现数据倾斜都可以通过这种方法先试验几次如果问题未解决再尝试其它方法。 劣势适用场景少只能将分配到同一Task的不同Key分散开但对于同一Key倾斜严重的情况该方法并不适用。并且该方法一般只能缓解数据倾斜没有彻底消除问题。从实践经验来看其效果一般。 自定义Partitioner 原理 使用自定义的Partitioner默认为HashPartitioner将原本被分配到同一个Task的不同Key分配到不同Task。 案例 以上述数据集为例继续将并发度设置为12但是在groupByKey算子上使用自定义的Partitioner实现如下 1 .groupByKey(new Partitioner() {2 Override3 public int numPartitions() {4 return 12;5 }6 7 Override8 public int getPartition(Object key) {9 int id Integer.parseInt(key.toString()); 10 if(id 9500000 id 9500084 ((id - 9500000) % 12) 0) { 11 return (id - 9500000) / 12; 12 } else { 13 return id % 12; 14 } 15 } 16 }) 由下图可见使用自定义Partition后耗时最长的Task 6处理约1000万条数据用时15秒。并且各Task所处理的数据集大小相当。  总结 适用场景大量不同的Key被分配到了相同的Task造成该Task数据量过大。 解决方案使用自定义的Partitioner实现类代替默认的HashPartitioner尽量将所有不同的Key均匀分配到不同的Task中。 优势不影响原有的并行度设计。如果改变并行度后续Stage的并行度也会默认改变可能会影响后续Stage。 劣势适用场景有限只能将不同Key分散开对于同一Key对应数据集非常大的场景不适用。效果与调整并行度类似只能缓解数据倾斜而不能完全消除数据倾斜。而且需要根据数据特点自定义专用的Partitioner不够灵活。 将Reduce side Join转变为Map side Join 原理 通过Spark的Broadcast机制将Reduce侧Join转化为Map侧Join避免Shuffle从而完全消除Shuffle带来的数据倾斜。 案例 通过如下SQL创建一张具有倾斜Key且总记录数为1.5亿的大表test。 1 INSERT OVERWRITE TABLE test 2 SELECT CAST(CASE WHEN id 980000000 THEN (95000000 (CAST (RAND() * 4 AS INT) 1) * 48 ) 3 ELSE CAST(id/10 AS INT) END AS STRING), 4 name 5 FROM student_external 6 WHERE id BETWEEN 900000000 AND 1050000000; 使用如下SQL创建一张数据分布均匀且总记录数为50万的小表test_new。  1 INSERT OVERWRITE TABLE test_new 2 SELECT CAST(CAST(id/10 AS INT) AS STRING), 3 name 4 FROM student_delta_external 5 WHERE id BETWEEN 950000000 AND 950500000; 直接通过Spark Thrift Server提交如下SQL将表test与表test_new进行Join并将Join结果存于表test_join中。  1 INSERT OVERWRITE TABLE test_join 2 SELECT test_new.id, test_new.name 3 FROM test 4 JOIN test_new 5 ON test.id test_new.id; 该SQL对应的DAG如下图所示。从该图可见该执行过程总共分为三个Stage前两个用于从Hive中读取数据同时二者进行Shuffle通过最后一个Stage进行Join并将结果写入表test_join中。  从下图可见Join Stage各Task处理的数据倾斜严重处理数据量最大的Task耗时7.1分钟远高于其它无数据倾斜的Task约2秒的耗时。 接下来尝试通过Broadcast实现Map侧Join。实现Map侧Join的方法并非直接通过CACHE TABLE test_new将小表test_new进行cache。现通过如下SQL进行Join。 1 CACHE TABLE test_new; 2 INSERT OVERWRITE TABLE test_join 3 SELECT test_new.id, test_new.name 4 FROM test 5 JOIN test_new 6 ON test.id test_new.id; 通过如下DAG图可见该操作仍分为三个Stage且仍然有Shuffle存在唯一不同的是小表的读取不再直接扫描Hive表而是扫描内存中缓存的表。  并且数据倾斜仍然存在。如下图所示最慢的Task耗时为7.1分钟远高于其它Task的约2秒。 正确的使用Broadcast实现Map侧Join的方式是通过SET spark.sql.autoBroadcastJoinThreshold104857600;将Broadcast的阈值设置得足够大。 再次通过如下SQL进行Join。 1 SET spark.sql.autoBroadcastJoinThreshold104857600; 2 INSERT OVERWRITE TABLE test_join 3 SELECT test_new.id, test_new.name 4 FROM test 5 JOIN test_new 6 ON test.id test_new.id; 通过如下DAG图可见该方案只包含一个Stage。  并且从下图可见各Task耗时相当无明显数据倾斜现象。并且总耗时为1.5分钟远低于Reduce侧Join的7.3分钟。 总结 适用场景参与Join的一边数据集足够小可被加载进Driver并通过Broadcast方法广播到各个Executor中。 解决方案在Java/Scala代码中将小数据集数据拉取到Driver然后通过Broadcast方案将小数据集的数据广播到各Executor。或者在使用SQL前将Broadcast的阈值调整得足够大从而使用Broadcast生效。进而将Reduce侧Join替换为Map侧Join。 优势避免了Shuffle彻底消除了数据倾斜产生的条件可极大提升性能。 劣势要求参与Join的一侧数据集足够小并且主要适用于Join的场景不适合聚合的场景适用条件有限。 为skew的key增加随机前/后缀 原理 为数据量特别大的Key增加随机前/后缀使得原来Key相同的数据变为Key不相同的数据从而使倾斜的数据集分散到不同的Task中彻底解决数据倾斜问题。Join另一则的数据中与倾斜Key对应的部分数据与随机前缀集作笛卡尔乘积从而保证无论数据倾斜侧倾斜Key如何加前缀都能与之正常Join。 案例 通过如下SQL将id为9亿到9.08亿共800万条数据的id转为9500048或者9500096其它数据的id除以100取整。从而该数据集中id为9500048和9500096的数据各400万其它id对应的数据记录数均为100条。这些数据存于名为test的表中。 对于另外一张小表test_new取出50万条数据并将id递增且唯一除以100取整使得所有id都对应100条数据。 1 INSERT OVERWRITE TABLE test2 SELECT CAST(CASE WHEN id 908000000 THEN (9500000 (CAST (RAND() * 2 AS INT) 1) * 48 )3 ELSE CAST(id/100 AS INT) END AS STRING),4 name5 FROM student_external6 WHERE id BETWEEN 900000000 AND 1050000000;7 8 INSERT OVERWRITE TABLE test_new9 SELECT CAST(CAST(id/100 AS INT) AS STRING), 10 name 11 FROM student_delta_external 12 WHERE id BETWEEN 950000000 AND 950500000;  通过如下代码读取test表对应的文件夹内的数据并转换为JavaPairRDD存于leftRDD中同样读取test表对应的数据存于rightRDD中。通过RDD的join算子对leftRDD与rightRDD进行Join并指定并行度为48。 1 public class SparkDataSkew{2 public static void main(String[] args) {3 SparkConf sparkConf new SparkConf();4 sparkConf.setAppName(DemoSparkDataFrameWithSkewedBigTableDirect);5 sparkConf.set(spark.default.parallelism, String.valueOf(parallelism));6 JavaSparkContext javaSparkContext new JavaSparkContext(sparkConf);7 8 JavaPairRDDString, String leftRDD javaSparkContext.textFile(hdfs://hadoop1:8020/apps/hive/warehouse/default/test/)9 .mapToPair((String row) - { 10 String[] str row.split(,); 11 return new Tuple2String, String(str[0], str[1]); 12 }); 13 14 JavaPairRDDString, String rightRDD javaSparkContext.textFile(hdfs://hadoop1:8020/apps/hive/warehouse/default/test_new/) 15 .mapToPair((String row) - { 16 String[] str row.split(,); 17 return new Tuple2String, String(str[0], str[1]); 18 }); 19 20 leftRDD.join(rightRDD, parallelism) 21 .mapToPair((Tuple2String, Tuple2String, String tuple) - new Tuple2String, String(tuple._1(), tuple._2()._2())) 22 .foreachPartition((IteratorTuple2String, String iterator) - { 23 AtomicInteger atomicInteger new AtomicInteger(); 24 iterator.forEachRemaining((Tuple2String, String tuple) - atomicInteger.incrementAndGet()); 25 }); 26 27 javaSparkContext.stop(); 28 javaSparkContext.close(); 29 } 30 } 从下图可看出整个Join耗时1分54秒其中Join Stage耗时1.7分钟。 通过分析Join Stage的所有Task可知在其它Task所处理记录数为192.71万的同时Task 32的处理的记录数为992.72万故它耗时为1.7分钟远高于其它Task的约10秒。这与上文准备数据集时将id为9500048为9500096对应的数据量设置非常大其它id对应的数据集非常均匀相符合。 现通过如下操作实现倾斜Key的分散处理 将leftRDD中倾斜的key即9500048与9500096对应的数据单独过滤出来且加上1到24的随机前缀并将前缀与原数据用逗号分隔以方便之后去掉前缀形成单独的leftSkewRDD将rightRDD中倾斜key对应的数据抽取出来并通过flatMap操作将该数据集中每条数据均转换为24条数据每条分别加上1到24的随机前缀形成单独的rightSkewRDD将leftSkewRDD与rightSkewRDD进行Join并将并行度设置为48且在Join过程中将随机前缀去掉得到倾斜数据集的Join结果skewedJoinRDD将leftRDD中不包含倾斜Key的数据抽取出来作为单独的leftUnSkewRDD对leftUnSkewRDD与原始的rightRDD进行Join并行度也设置为48得到Join结果unskewedJoinRDD通过union算子将skewedJoinRDD与unskewedJoinRDD进行合并从而得到完整的Join结果集具体实现代码如下 1 public class SparkDataSkew{2 public static void main(String[] args) {3 int parallelism 48;4 SparkConf sparkConf new SparkConf();5 sparkConf.setAppName(SolveDataSkewWithRandomPrefix);6 sparkConf.set(spark.default.parallelism, parallelism );7 JavaSparkContext javaSparkContext new JavaSparkContext(sparkConf);8 9 JavaPairRDDString, String leftRDD javaSparkContext.textFile(hdfs://hadoop1:8020/apps/hive/warehouse/default/test/) 10 .mapToPair((String row) - { 11 String[] str row.split(,); 12 return new Tuple2String, String(str[0], str[1]); 13 }); 14 15 JavaPairRDDString, String rightRDD javaSparkContext.textFile(hdfs://hadoop1:8020/apps/hive/warehouse/default/test_new/) 16 .mapToPair((String row) - { 17 String[] str row.split(,); 18 return new Tuple2String, String(str[0], str[1]); 19 }); 20 21 String[] skewedKeyArray new String[]{9500048, 9500096}; 22 SetString skewedKeySet new HashSetString(); 23 ListString addList new ArrayListString(); 24 for(int i 1; i 24; i) { 25 addList.add(i ); 26 } 27 for(String key : skewedKeyArray) { 28 skewedKeySet.add(key); 29 } 30 31 BroadcastSetString skewedKeys javaSparkContext.broadcast(skewedKeySet); 32 BroadcastListString addListKeys javaSparkContext.broadcast(addList); 33 34 JavaPairRDDString, String leftSkewRDD leftRDD 35 .filter((Tuple2String, String tuple) - skewedKeys.value().contains(tuple._1())) 36 .mapToPair((Tuple2String, String tuple) - new Tuple2String, String((new Random().nextInt(24) 1) , tuple._1(), tuple._2())); 37 38 JavaPairRDDString, String rightSkewRDD rightRDD.filter((Tuple2String, String tuple) - skewedKeys.value().contains(tuple._1())) 39 .flatMapToPair((Tuple2String, String tuple) - addListKeys.value().stream() 40 .map((String i) - new Tuple2String, String( i , tuple._1(), tuple._2())) 41 .collect(Collectors.toList()) 42 .iterator() 43 ); 44 45 JavaPairRDDString, String skewedJoinRDD leftSkewRDD 46 .join(rightSkewRDD, parallelism) 47 .mapToPair((Tuple2String, Tuple2String, String tuple) - new Tuple2String, String(tuple._1().split(,)[1], tuple._2()._2())); 48 49 JavaPairRDDString, String leftUnSkewRDD leftRDD.filter((Tuple2String, String tuple) - !skewedKeys.value().contains(tuple._1())); 50 JavaPairRDDString, String unskewedJoinRDD leftUnSkewRDD.join(rightRDD, parallelism).mapToPair((Tuple2String, Tuple2String, String tuple) - new Tuple2String, String(tuple._1(), tuple._2()._2())); 51 52 skewedJoinRDD.union(unskewedJoinRDD).foreachPartition((IteratorTuple2String, String iterator) - { 53 AtomicInteger atomicInteger new AtomicInteger(); 54 iterator.forEachRemaining((Tuple2String, String tuple) - atomicInteger.incrementAndGet()); 55 }); 56 57 javaSparkContext.stop(); 58 javaSparkContext.close(); 59 } 60 } 通过分析Join Stage的所有Task可知从下图可看出整个Join耗时58秒其中Join Stage耗时33秒。 由于Join分倾斜数据集Join和非倾斜数据集Join而各Join的并行度均为48故总的并行度为96由于提交任务时设置的Executor个数为4每个Executor的core数为12故可用Core数为48所以前48个Task同时启动其Launch时间相同后48个Task的启动时间各不相同等待前面的Task结束才开始由于倾斜Key被加上随机前缀原本相同的Key变为不同的Key被分散到不同的Task处理故在所有Task中未发现所处理数据集明显高于其它Task的情况 实际上由于倾斜Key与非倾斜Key的操作完全独立可并行进行。而本实验受限于可用总核数为48可同时运行的总Task数为48故而该方案只是将总耗时减少一半效率提升一倍。如果资源充足可并发执行Task数增多该方案的优势将更为明显。在实际项目中该方案往往可提升数倍至10倍的效率。 总结 适用场景两张表都比较大无法使用Map则Join。其中一个RDD有少数几个Key的数据量过大另外一个RDD的Key分布较为均匀。 解决方案将有数据倾斜的RDD中倾斜Key对应的数据集单独抽取出来加上随机前缀另外一个RDD每条数据分别与随机前缀结合形成新的RDD相当于将其数据增到到原来的N倍N即为随机前缀的总个数然后将二者Join并去掉前缀。然后将不包含倾斜Key的剩余数据进行Join。最后将两次Join的结果集通过union合并即可得到全部Join结果。 优势相对于Map则Join更能适应大数据集的Join。如果资源充足倾斜部分数据集与非倾斜部分数据集可并行进行效率提升明显。且只针对倾斜部分的数据做数据扩展增加的资源消耗有限。 劣势如果倾斜Key非常多则另一侧数据膨胀非常大此方案不适用。而且此时对倾斜Key与非倾斜Key分开处理需要扫描数据集两遍增加了开销。 大表随机添加N种随机前缀小表扩大N倍 原理 如果出现数据倾斜的Key比较多上一种方法将这些大量的倾斜Key分拆出来意义不大。此时更适合直接对存在数据倾斜的数据集全部加上随机前缀然后对另外一个不存在严重数据倾斜的数据集整体与随机前缀集作笛卡尔乘积即将数据量扩大N倍。 案例 这里给出示例代码读者可参考上文中分拆出少数倾斜Key添加随机前缀的方法自行测试。 1 public class SparkDataSkew {2 public static void main(String[] args) {3 SparkConf sparkConf new SparkConf();4 sparkConf.setAppName(ResolveDataSkewWithNAndRandom);5 sparkConf.set(spark.default.parallelism, parallelism );6 JavaSparkContext javaSparkContext new JavaSparkContext(sparkConf);7 8 JavaPairRDDString, String leftRDD javaSparkContext.textFile(hdfs://hadoop1:8020/apps/hive/warehouse/default/test/)9 .mapToPair((String row) - { 10 String[] str row.split(,); 11 return new Tuple2String, String(str[0], str[1]); 12 }); 13 14 JavaPairRDDString, String rightRDD javaSparkContext.textFile(hdfs://hadoop1:8020/apps/hive/warehouse/default/test_new/) 15 .mapToPair((String row) - { 16 String[] str row.split(,); 17 return new Tuple2String, String(str[0], str[1]); 18 }); 19 20 ListString addList new ArrayListString(); 21 for(int i 1; i 48; i) { 22 addList.add(i ); 23 } 24 25 BroadcastListString addListKeys javaSparkContext.broadcast(addList); 26 27 JavaPairRDDString, String leftRandomRDD leftRDD.mapToPair((Tuple2String, String tuple) - new Tuple2String, String(new Random().nextInt(48) , tuple._1(), tuple._2())); 28 29 JavaPairRDDString, String rightNewRDD rightRDD 30 .flatMapToPair((Tuple2String, String tuple) - addListKeys.value().stream() 31 .map((String i) - new Tuple2String, String( i , tuple._1(), tuple._2())) 32 .collect(Collectors.toList()) 33 .iterator() 34 ); 35 36 JavaPairRDDString, String joinRDD leftRandomRDD 37 .join(rightNewRDD, parallelism) 38 .mapToPair((Tuple2String, Tuple2String, String tuple) - new Tuple2String, String(tuple._1().split(,)[1], tuple._2()._2())); 39 40 joinRDD.foreachPartition((IteratorTuple2String, String iterator) - { 41 AtomicInteger atomicInteger new AtomicInteger(); 42 iterator.forEachRemaining((Tuple2String, String tuple) - atomicInteger.incrementAndGet()); 43 }); 44 45 javaSparkContext.stop(); 46 javaSparkContext.close(); 47 } 48 } 总结  适用场景一个数据集存在的倾斜Key比较多另外一个数据集数据分布比较均匀。 优势对大部分场景都适用效果不错。 劣势需要将一个数据集整体扩大N倍会增加资源消耗。 总结 对于数据倾斜并无一个统一的一劳永逸的方法。更多的时候是结合数据特点数据集大小倾斜Key的多少等综合使用上文所述的多种方法。转载于:https://www.cnblogs.com/zuizui1204/p/7920692.html
http://www.zqtcl.cn/news/118292/

相关文章:

  • 石家庄网站建设招聘珠海快速网站建设
  • 网站建设代理ai制作网页
  • 微网站平台怎样做网站wordpress侧栏跟随
  • 手机网站建设好吗湖南省专业建设公司网站的机构
  • 网站代码 字体好用的cms网站
  • 美食网站首页设计用手机怎么看自己做的网站
  • 平台类网站开发怎样做永久网站二维码
  • 网站开发客户挖掘php网站开发心得3500字
  • 检察院做网站的目的青岛网站推广优化
  • dede替换网站模板定制网站建设的流程
  • 天津专业网站制作网站开发模板
  • 做二手车网站需要什么怎样建立门户网站
  • 宁波做网站首荐荣盛网络网站建设太仓
  • 购物网站公司要花费多少钱wordpress 菜单 字体加粗
  • 网站模板如何编辑软件crm免费客户管理系统
  • 微信制作网站设计重庆关键词优化软件
  • 网站的设计与应用论文平台推广计划书模板范文
  • 网站备案用户名忘了怎么办网站做301排名会掉
  • 厦门制作网站企业网站子域名怎么做
  • 青岛微网站开发品牌建设青之见
  • 淄博哪有培训做网站的湖南营销型网站建设企业
  • 动物网站建设深圳最好的营销网站建设公司
  • 各种网站制作陕西建设厅证件查询网站
  • 如何提高一个网站如何做简单网站
  • 游戏网站开发找什么人可建智慧园区设计方案
  • 重庆网站设计公司推荐福州移动网站建设
  • 移动网站功能做网站fjfzwl
  • 食品网站建设的目的中级经济师考试成绩查询
  • 普宁建设局网站免费的网站开发平台
  • 网站域名主机空间区别网站上传系统