导购网站怎么建,公司注册信息查询,网店运营推广高级实训攻略,响应式网站需要的技术https://www.cnblogs.com/yongjian/p/6425772.html 概述 键值对RDD是Spark操作中最常用的RDD#xff0c;它是很多程序的构成要素#xff0c;因为他们提供了并行操作各个键或跨界点重新进行数据分组的操作接口。 创建 Spark中有许多中创建键值对RDD的方式#xff0c;其中包括…https://www.cnblogs.com/yongjian/p/6425772.html 概述 键值对RDD是Spark操作中最常用的RDD它是很多程序的构成要素因为他们提供了并行操作各个键或跨界点重新进行数据分组的操作接口。 创建 Spark中有许多中创建键值对RDD的方式其中包括 文件读取时直接返回键值对RDD通过List创建键值对RDD在Scala中可通过Map函数生成二元组 1 2 3 4 5 6 7 8 9 10 val listRDD sc.parallelize(List(1,2,3,4,5)) val result listRDD.map(x (x,1)) result.foreach(println) //结果 (1,1) (2,1) (3,1) (4,1) (5,1) 键值对RDD的转化操作 基本RDD转化操作在此同样适用。但因为键值对RDD中包含的是一个个二元组所以需要传递的函数会由原来的操作单个元素改为操作二元组。 下表总结了针对单个键值对RDD的转化操作以 { (1,2) , (3,4) , (3,6) } 为例f表示传入的函数 函数名目的示例结果reduceByKey(f)合并具有相同key的值rdd.reduceByKey( ( x,y) xy ){ (1,2) , (3,10) }groupByKey()对具有相同key的值分组rdd.groupByKey(){ (1,2) , (3, [4,6] ) }mapValues(f)对键值对中的每个值(value)应用一个函数但不改变键(key)rdd.mapValues(x x1){ (1,3) , (3,5) , (3,7) }combineBy Key( createCombiner, mergeValue, mergeCombiners, partitioner)使用不同的返回类型合并具有相同键的值下面有详细讲解-flatMapValues(f)对键值对RDD中每个值应用返回一个迭代器的函数然后对每个元素生成一个对应的键值对。常用语符号化rdd.flatMapValues(x ( x to 5 )) { (1, 2) , (1, 3) , (1, 4) , (1, 5) , (3, 4) , (3, 5) } keys()获取所有keyrdd.keys(){1,3,3}values()获取所有valuerdd.values(){2,4,6}sortByKey()根据key排序rdd.sortByKey(){ (1,2) , (3,4) , (3,6) } 下表总结了针对两个键值对RDD的转化操作以rdd1 { (1,2) , (3,4) , (3,6) } rdd2 { (3,9) } 为例 函数名目的示例结果subtractByKey删掉rdd1中与rdd2的key相同的元素rdd1.subtractByKey(rdd2){ (1,2) }join内连接rdd1.join(rdd2) {(3, (4, 9)), (3, (6, 9))} leftOuterJoin左外链接rdd1.leftOuterJoin (rdd2) {(3,( Some( 4), 9)), (3,( Some( 6), 9))} rightOuterJoin右外链接rdd1.rightOuterJoin(rdd2) {(1,( 2, None)), (3, (4, Some( 9))), (3, (6, Some( 9)))} cogroup将两个RDD钟相同key的数据分组到一起rdd1.cogroup(rdd2){(1,([ 2],[])), (3, ([4, 6],[ 9]))} combineByKey combineByKey( createCombiner, mergeValue, mergeCombiners, partitioner,mapSideCombine) combineByKey( createCombiner, mergeValue, mergeCombiners, partitioner) combineByKey( createCombiner, mergeValue, mergeCombiners) 函数功能 聚合各分区的元素而每个元素都是二元组。功能与基础RDD函数aggregate()差不多可让用户返回与输入数据类型不同的返回值。 combineByKey函数的每个参数分别对应聚合操作的各个阶段。所以理解此函数对Spark如何操作RDD会有很大帮助。 参数解析 createCombiner分区内 创建组合函数 mergeValue分区内 合并值函数 mergeCombiners多分区 合并组合器函数 partitioner自定义分区数默认为HashPartitioner mapSideCombine是否在map端进行Combine操作默认为true 工作流程 combineByKey会遍历分区中的所有元素因此每个元素的key要么没遇到过要么和之前某个元素的key相同。如果这是一个新的元素函数会调用createCombiner创建那个key对应的累加器初始值。如果这是一个在处理当前分区之前已经遇到的key会调用mergeCombiners把该key累加器对应的当前value与这个新的value合并。 代码例子 //统计男女个数 1 2 3 4 5 6 7 8 9 10 val conf new SparkConf ().setMaster (local).setAppName (app_1) val sc new SparkContext (conf) val people List((男, 李四), (男, 张三), (女, 韩梅梅), (女, 李思思), (男, 马云)) val rdd sc.parallelize(people,2) val result rdd.combineByKey( (x: String) (List(x), 1), //createCombiner (peo: (List[String], Int), x : String) (x :: peo._1, peo._2 1), //mergeValue (sex1: (List[String], Int), sex2: (List[String], Int)) (sex1._1 ::: sex2._1, sex1._2 sex2._2)) //mergeCombiners result.foreach(println) 结果 (男, ( List( 张三, 李四, 马云),3 ) )(女, ( List( 李思思, 韩梅梅),2 ) ) 流程分解 解析两个分区分区一按顺序V1、V2、V3遍历 V1发现第一个key男时调用createCombiner即 (x: String) (List(x), 1) V2第二次碰到key男的元素调用mergeValue即 (peo: (List[String], Int), x : String) (x :: peo._1, peo._2 1) V3发现第一个key女继续调用createCombiner即 (x: String) (List(x), 1) … …待各V1、V2分区都计算完后数据进行混洗调用mergeCombiners即 (sex1: (List[String], Int), sex2: (List[String], Int)) (sex1._1 ::: sex2._1, sex1._2 sex2._2)) add by jan 2017-02-27 18:34:39 以下例子都基于此RDD 1 2 3 4 (Hadoop,1) (Spark,1) (Hive,1) (Spark,1) reduceByKey(func) reduceByKey(func)的功能是使用func函数合并具有相同键的值。 比如reduceByKey((a,b) ab)有四个键值对(spark,1)、(spark,2)、(hadoop,3)和(hadoop,5)对具有相同key的键值对进行合并后的结果就是(spark,3)、(hadoop,8)。可以看出(a,b) ab这个Lamda表达式中a和b都是指value比如对于两个具有相同key的键值对(spark,1)、(spark,2)a就是1b就是2。 1 2 3 4 scala pairRDD.reduceByKey((a,b)ab).foreach(println) (Spark,2) (Hive,1) (Hadoop,1) groupByKey() roupByKey()的功能是对具有相同键的值进行分组。比如对四个键值对(spark,1)、(spark,2)、(hadoop,3)和(hadoop,5)采用groupByKey()后得到的结果是(spark,(1,2))和(hadoop,(3,5))。 1 2 3 4 5 6 7 scala pairRDD.groupByKey() res15: org.apache.spark.rdd.RDD[(String, Iterable[Int])] ShuffledRDD[15] at groupByKey at console:34 //从上面执行结果信息中可以看出分组后value被保存到Iterable[Int]中 scala pairRDD.groupByKey().foreach(println) (Spark,CompactBuffer(1, 1)) (Hive,CompactBuffer(1)) (Hadoop,CompactBuffer(1)) keys keys只会把键值对RDD中的key返回形成一个新的RDD。比如对四个键值对(spark,1)、(spark,2)、(hadoop,3)和(hadoop,5)构成的RDD采用keys后得到的结果是一个RDD[Int]内容是{spark,spark,hadoop,hadoop}。 1 2 3 4 5 6 7 scala pairRDD.keys res17: org.apache.spark.rdd.RDD[String] MapPartitionsRDD[17] at keys at console:34 scala pairRDD.keys.foreach(println) Hadoop Spark Hive Spark values values只会把键值对RDD中的value返回形成一个新的RDD。比如对四个键值对(spark,1)、(spark,2)、(hadoop,3)和(hadoop,5)构成的RDD采用keys后得到的结果是一个RDD[Int]内容是{1,2,3,5}。 1 2 3 4 5 6 7 8 scala pairRDD.values res0: org.apache.spark.rdd.RDD[Int] MapPartitionsRDD[2] at values at console:34 scala pairRDD.values.foreach(println) 1 1 1 1 sortByKey() sortByKey()的功能是返回一个根据键排序的RDD。 1 2 3 4 5 6 7 scala pairRDD.sortByKey() res0: org.apache.spark.rdd.RDD[(String, Int)] ShuffledRDD[2] at sortByKey at console:34 scala pairRDD.sortByKey().foreach(println) (Hadoop,1) (Hive,1) (Spark,1) (Spark,1) mapValues(func) 我们经常会遇到一种情形我们只想对键值对RDD的value部分进行处理而不是同时对key和value进行处理。对于这种情形Spark提供了mapValues(func)它的功能是对键值对RDD中的每个value都应用一个函数但是key不会发生变化。比如对四个键值对(spark,1)、(spark,2)、(hadoop,3)和(hadoop,5)构成的pairRDD如果执行pairRDD.mapValues(x x1)就会得到一个新的键值对RDD它包含下面四个键值对(spark,2)、(spark,3)、(hadoop,4)和(hadoop,6)。 1 2 3 4 5 6 7 scala pairRDD.mapValues(x x1) res2: org.apache.spark.rdd.RDD[(String, Int)] MapPartitionsRDD[4] at mapValues at console:34 scala pairRDD.mapValues(x x1).foreach(println) (Hadoop,2) (Spark,2) (Hive,2) (Spark,2) join join(连接)操作是键值对常用的操作。“连接”(join)这个概念来自于关系数据库领域因此join的类型也和关系数据库中的join一样包括内连接(join)、左外连接(leftOuterJoin)、右外连接(rightOuterJoin)等。最常用的情形是内连接所以join就表示内连接。对于内连接对于给定的两个输入数据集(K,V1)和(K,V2)只有在两个数据集中都存在的key才会被输出最终得到一个(K,(V1,V2))类型的数据集。 比如pairRDD1是一个键值对集合{(spark,1)、(spark,2)、(hadoop,3)和(hadoop,5)}pairRDD2是一个键值对集合{(spark,fast)}那么pairRDD1.join(pairRDD2)的结果就是一个新的RDD这个新的RDD是键值对集合{(spark,1,fast),(spark,2,fast)}。对于这个实例我们下面在spark-shell中运行一下 1 2 3 4 5 6 7 8 9 10 11 12 scala val pairRDD1 sc.parallelize(Array((spark,1),(spark,2),(hadoop,3),(hadoop,5))) pairRDD1: org.apache.spark.rdd.RDD[(String, Int)] ParallelCollectionRDD[24] at parallelize at console:27 scala val pairRDD2 sc.parallelize(Array((spark,fast))) pairRDD2: org.apache.spark.rdd.RDD[(String, String)] ParallelCollectionRDD[25] at parallelize at console:27 scala pairRDD1.join(pairRDD2) res9: org.apache.spark.rdd.RDD[(String, (Int, String))] MapPartitionsRDD[28] at join at console:32 scala pairRDD1.join(pairRDD2).foreach(println) (spark,(1,fast)) (spark,(2,fast))