当前位置: 首页 > news >正文

没有网站怎么做cpa成都百度推广公司地址

没有网站怎么做cpa,成都百度推广公司地址,郑州销售网站,网站建设中敬请期待2. PySpark——RDD编程入门 文章目录 2. PySpark——RDD编程入门2.1 程序执行入口SparkContext对象2.2 RDD的创建2.2.1 并行化创建2.2.2 获取RDD分区数2.2.3 读取文件创建 2.3 RDD算子2.4 常用Transformation算子2.4.1 map算子2.4.2 flatMap算子2.4.3 reduceByKey算子2.4.4 Wor…2. PySpark——RDD编程入门 文章目录 2. PySpark——RDD编程入门2.1 程序执行入口SparkContext对象2.2 RDD的创建2.2.1 并行化创建2.2.2 获取RDD分区数2.2.3 读取文件创建 2.3 RDD算子2.4 常用Transformation算子2.4.1 map算子2.4.2 flatMap算子2.4.3 reduceByKey算子2.4.4 WordCount回顾2.4.5 groupBy算子2.4.6 Filter算子2.4.7 distinct算子2.4.8 union算子2.4.9 join算子2.4.10 intersection 算子2.4.11 glom算子2.4.12 groupByKey算子2.4.13 sortBy算子2.4.14 sortByKey2.4.15 综合案例2.4.16 将案例提交到yarn运行 2.5 常用Action算子2.5.1 countByKey算子2.5.2 collect算子2.5.3 reduce算子2.5.4 fold算子2.5.5 first算子2.5.6 take算子2.5.7 top算子2.5.8 count算子2.5.9 takeSample算子2.5.10 takeOrdered2.5.11 foreach算子2.5.12 saveAsTextFile2.5.13 注意点 2.6 分区操作算子2.6.1 mapPartitions算子2.6.2 foreachPartition算子2.6.3 partitionBy算子2.6.4 repartition算子2.6.5 coalesce算子2.6.6 mapValues算子2.6.7 join算子 2.7 面试题2.8 总结 3. RDD的持久化3.1 RDD的数据是过程数据3.2 RDD的缓存3.2.1 缓存3.2.2 缓存特点3.2.3 缓存是如何保存的 3.3 RDD的CheckPoint3.3.1 RDD CheckPoint3.3.2 CheckPoint是如何保存数据的3.3.3 缓存和CheckPoint的对比3.3.4 代码3.3.5 注意3.3.6 总结 4. Spark案例练习4.1 搜索引擎日志分析案例4.2 提交到集群运行4.3 作业 5. 共享变量5.1 广播变量5.1.1 问题引出5.1.2 解决方案-广播变量 5.2 累加器5.2.1 需求5.2.2 没有累加器的代码演示5.2.3 解决方法-累加器5.2.4 累加器的注意事项 5.3 综合案例5.3.1 需求 5.4 总结 6.Spark内核调度重点理解6.1 DAG6.1.1 DAG6.1.2 Job和Action6.1.3 DAG和分区 6.2 DAG的宽窄依赖和阶段划分6.2.1 窄依赖6.2.2 宽依赖6.2.3 阶段划分 6.3 内存迭代计算6.3.1 面试题 6.4 Spark并行度6.4.1 如何设置并行度6.4.2 全局并行度-推荐6.4.3 针对RDD的并行度设置-不推荐6.4.4 集群中如何规划并行度 6.5 Spark任务调度6.5.1 Drivcer内的两个组件 6.6 拓展-Spark概念名词大全6.6.1 Spark运行中的概念名词大全 6.7 SparkShuffle6.7.1MR Shuffle回顾6.7.2 简介6.7.3 Sort Shuffle bypass机制6.7.4 Shuflle的配置选项 6.8 总结 2.1 程序执行入口SparkContext对象 Spark RDD 编程的程序入口对象是SparkContext对象(不论何种编程语言) 只有构建出SparkContext, 基于它才能执行后续的API调用和计算 本质上, SparkContext对编程来说, 主要功能就是创建第一个RDD出来 代码演示 # coding:utf8# 导入Spark相关包 from pyspark import SparkConf, SparkContext if __name__ __main__:# 构建SparkConf对象conf SparkConf().setAppName (helloSpark).setMaster(local[*])# 构建SparkContext执行环境入口对象sc SparkContext(confconf)master的种类 locallocal[N]:表示以N核CPU执行local[*]:给予local进程 所有CPU核心的使用权standlonespark//node1:7077yarn 模式 2.2 RDD的创建 RDD的创建主要有2种方式: • 通过并行化集合创建 ( 本地对象 转 分布式RDD ) • 读取外部数据源 ( 读取文件 ) 2.2.1 并行化创建 概念并行化创建是指将本地集合转向分布式RDD这一步就是分布式的开端本地转分布式 API rdd spakcontext.parallelize(参数1参数2) 参数1 集合对象即可比如list 参数2 分区数 完整代码 # coding:utf8 from pyspark import SparkConf,SparkContextif __name__ __main__:# 0. 构建Spark执行环境conf SparkConf().setAppName(create rdd).setMaster(local[*])sc SparkContext(confconf)# sc 对象的parallelize方法可以将本地集合转换成RDD返回给你data [1,2,3,4,5,6,7,8,9]rdd sc.parallelize(data,numSlices3)print(rdd.collect())执行结果 Setting default log level to WARN. To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). [1, 2, 3, 4, 5, 6, 7, 8, 9]Process finished with exit code 02.2.2 获取RDD分区数 getNumPartitions API :获取RDD分区数量返回值是Int数字 用法rdd.getNumPartitions() 例如基于上述代码设置了3为分区数调用以下代码 print(rdd.getNumPartitions())则会输出结果3 完整案例代码01_create_parallelize.py # coding:utf8# 导入Spark相关包 from pyspark import SparkConf, SparkContextif __name__ __main__:# 0. 初始化执行环境 构建SparkContext对象,本地集合-- 分布式对象RDDconf SparkConf().setAppName (test).setMaster(local[*])sc SparkContext(confconf)# 演示通过并行化集合的方式去创建RDDrdd sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9])# parallelize方法没有给定分区数默认分区数是多少 根据CPU核心来定print(默认分区数, rdd.getNumPartitions())rdd sc.parallelize([1, 2, 3], 3)print(分区数, rdd.getNumPartitions())# collect方法是将RDD分布式对象中每个分区的数据都发送到Driver中形成一个Python List对象# collect分布式 转-- 本地集合print(rdd的内容是, rdd.collect())print(type(rdd.collect())) 输出结果 默认分区数 8 分区数 3 rdd的内容是 [1, 2, 3] class list2.2.3 读取文件创建 textFileAPI 这个API可以读取本地数据也可以读取hdfs数据 使用方法 sparkcontext.textFile(参数1,参数2) 参数1必填文件路径 支持本地文件 支持HDFS 也支持一些比如S3协议 参数2 可选表示最小分区数量 注意参数2 话语权不足spark有自己的判断在它允许的范围内参数2有效果超出spark允许的范围参数2失效 案例代码02_create_textFile.py # coding : utf8 from pyspark import SparkConf, SparkContextif __name__ __main__:conf SparkConf().setAppName(02_create_textFile).setMaster(local[*])sc SparkContext(confconf)# 通过textFile API 读取数据# 读取本地文件数据file_rdd1 sc.textFile(../data/input/words.txt)print(默认读取分区数, file_rdd1.getNumPartitions())print(file_rdd1 内容, file_rdd1.collect())## # 加最小分区数的测试file_rdd2 sc.textFile(../data/input/words.txt,3)file_rdd3 sc.textFile(../data/input/words.txt,100)print(file_rdd2 分区数, file_rdd2.getNumPartitions())print(file_rdd3 分区数, file_rdd3.getNumPartitions())# 读取hdfs文件数据测试hdfs_rdd sc.textFile(hdfs://Tnode1:8020/input/words.txt)print(hdfs_rdd 分区数, hdfs_rdd.getNumPartitions())print(hdfs_rdd 内容, hdfs_rdd.collect())输出结果 默认读取分区数 2 file_rdd1 内容 [hello spark, hello hadoop, hello flink] file_rdd2 内容 4 file_rdd3 内容 38 hdfs_rdd 分区 2 hdfs_rdd 内容 [hello spark, hello hadoop, hello flink]wholeTextFile 读取文件的API有个适用场景适合读取一堆小文件 这个API是小文件读取专用 用法 sparkcontext.textFile(参数1,参数2)# 参数1必填文件路径 支持本地文件 支持HDFS 也支持一些比如S3协议# 参数2 可选表示最小分区数量# 注意参数2 话语权不足spark有自己的判断在它允许的范围内参数2有效果超出spark允许的范围参数2失效这个API偏向于少量分区读取数据 因为这个API表明了自己是小文件读取专用那么文件的数据很小、分区很多 导致shuffle的几率更高所以尽量少分区读取数据 案例代码03_create_wholeTextFile.py # coding:utf8 from pyspark import SparkConf, SparkContextif __name__ __main__:conf SparkConf().setAppName(test).setMaster(local[*])sc SparkContext(confconf)# 读取小文件文件夹rdd sc.wholeTextFiles(../data/input/tiny_files)print(rdd.collect())print(rdd.map(lambda x: x[1]).collect()) 输出结果 [(file:/tmp/pycharm_project_937/PySpark01/data/input/tiny_files/1.txt, hello spark\r\nhello hadoop\r\nhello flink), (file:/tmp/pycharm_project_937/PySpark01/data/input/tiny_files/2.txt, hello spark\r\nhello hadoop\r\nhello flink), (file:/tmp/pycharm_project_937/PySpark01/data/input/tiny_files/3.txt, hello spark\r\nhello hadoop\r\nhello flink), (file:/tmp/pycharm_project_937/PySpark01/data/input/tiny_files/4.txt, hello spark\r\nhello hadoop\r\nhello flink), (file:/tmp/pycharm_project_937/PySpark01/data/input/tiny_files/5.txt, hello spark\r\nhello hadoop\r\nhello flink)] [hello spark\r\nhello hadoop\r\nhello flink, hello spark\r\nhello hadoop\r\nhello flink, hello spark\r\nhello hadoop\r\nhello flink, hello spark\r\nhello hadoop\r\nhello flink, hello spark\r\nhello hadoop\r\nhello flink] 2.3 RDD算子 算子是什么 ​ 算子分布式集合对象上的API称之为算子 ​ 方法、函数本地对象的API叫做方法、函数 ​ 算子分布式对象的API叫做算子 算子分类 ​ RDD的算子 分成2类 ​ Transformation转换算子​ Action动作行动算子 Transformation 算子 ​ 定义RDD的算子返回值任然是一个RDD的称之为转换算子 ​ 特性这类算子lazy 懒加载的如果没有action算子Transformation算子是不工作的 Action算子 ​ 定义返回值不是rdd的就是action算子 对于这两类算子来说Transformation算子相当于在构建执行计划action是一个指令让这个执行计划开始工作。 如果没有actionTransformation算子之间的迭代关系就是一个没有通电的流水线 只有action到来这个数据处理的流水线才开始工作 2.4 常用Transformation算子 2.4.1 map算子 演示代码04_operators_map.py # coding:utf8 from pyspark import SparkConf, SparkContextdef addNum(data):return data * 10if __name__ __main__:conf SparkConf().setAppName(test).setMaster(local[*])sc SparkContext(confconf)rdd1 sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9], 4)rdd2 rdd1.map(lambda x: x * 10)rdd3 rdd1.map(addNum)result rdd2.collect()print(result)print(rdd3.collect()) 输出结果 [10, 20, 30, 40, 50, 60, 70, 80, 90] [10, 20, 30, 40, 50, 60, 70, 80, 90]对于传入参数的lambda表达式 传入方法作为传参的时候可以选择 定义方法传入其方法名使用lambda 匿名方法的方式 一般如果方法体可以一行写完用lambda方便。 如果方法体复杂就直接定义方法更方便 2.4.2 flatMap算子 功能对rdd执行map操作然后进行解除嵌套操作 解除嵌套 演示代码05_operators_flatMap.py # coding:utf8 from pyspark import SparkConf, SparkContextdef addNum(data):return data * 10if __name__ __main__:conf SparkConf().setAppName(test).setMaster(local[*])sc SparkContext(confconf)rdd1 sc.parallelize([hadoop spark hadoop, spark hadoop hadoop, hadoop flink spark])# 得到所有的单词组成rdd,flatMap的传入参数和map一致就是给map逻辑用的解除嵌套无需逻辑传参rdd2 rdd1.flatMap(lambda line: line.split( ))print(rdd2.collect())输出结果 [hadoop, spark, hadoop, spark, hadoop, hadoop, hadoop, flink, spark]注意flatMap只适合用于有“嵌套”的rdd直接用于没有嵌套的rdd会报错 2.4.3 reduceByKey算子 功能针对KV型的RDD自动按照key分组然后根据你提供的聚合逻辑完成组内数据value的聚合操作。 用法 rdd.reduceByKey(func) # func:(V,V) ——V # 接收2个传入参数类型要一致返回一个返回值类型和传入要求一致。reduceByKey的聚合逻辑是 比如有[1,2,3,4,5],然后聚合函数是lambda a,b: a b 注意reduceByKey中接收的函数只负责聚合不理会分组 分组是自动 byKey来分组的。 代码演示06_operators_reduceByKey.py # coding:utf8 from pyspark import SparkConf, SparkContextdef addNum(data):return data * 10if __name__ __main__:conf SparkConf().setAppName(test).setMaster(local[*])sc SparkContext(confconf)rdd sc.parallelize([(a, 1), (a, 1), (b, 1), (b, 1), (a, 1)])rdd2 sc.parallelize([(a, 1), (a, 11), (b, 3), (b, 1), (a, 5)])rdd3 sc.parallelize([(a, 1), (a, 11), (b, 3), (b, 1), (a, 5)])rdd rdd.reduceByKey(lambda a, b: a b)rdd2 rdd2.map(lambda x: (x[0], x[1] * 10))# 只操作value的算子rdd3 rdd3.mapValues(lambda value: value * 10)# recudeByKey 对相同key的数据执行聚合相加print(rdd.collect())print(rdd2.collect())print(rdd3.collect())输出结果 [(a, 3), (b, 2)] [(a, 10), (a, 110), (b, 30), (b, 10), (a, 50)] [(a, 10), (a, 110), (b, 30), (b, 10), (a, 50)]2.4.4 WordCount回顾 代码演示07_wordcount_example.py # coding:utf8from pyspark import SparkContext, SparkConfif __name__ __main__:# 构建SparkConf对象conf SparkConf().setAppName(test).setMaster(local[*])# 构建SparkContext执行环境入口对象sc SparkContext(confconf)# 1.读取文件获取数据 构建RDDfile_rdd sc.textFile(r../data/input/words.txt)# 2. 通过flatMap API取出所有的单词word_rdd file_rdd.flatMap(lambda x: x.split( ))# 3.将单词转换成元组key是单词value是1word_with_one_rdd word_rdd.map(lambda word:(word,1))# 4. 用reduceByKey 对单词进行分组并进行value的聚合result_rdd word_with_one_rdd.reduceByKey(lambda a,b:ab)# 5. 通过collect算子将rdd的数据收集到Driver中打印输出print(result_rdd.collect())输出结果 [(hadoop, 1), (hello, 3), (spark, 1), (flink, 1)]2.4.5 groupBy算子 功能将rdd的数据进行分组 语法 rdd.groupBy(func) # func 函数 # func:(T)——k # 函数要求传入一个参数返回一个返回值类型无所谓 # 这个函数是 拿到你返回值后将所有相同返回值的放入一个组中 # 分组完成后每一个组是一个二元元组key就是返回值所有同组的数据放入一个迭代器对象中作为value代码演示08_oprators_groupBy.py # coding:utf8from pyspark import SparkConf, SparkContextif __name__ __main__:conf SparkConf().setAppName(test).setMaster(local[*])sc SparkContext(confconf)rdd sc.parallelize([(a, 1), (a, 1), (b, 1), (b, 1), (b, 1)])# 通过groupBy对数据进行分组# groupBy传入的函数的意思是通过这个函数确定按照谁来分组(返回谁即可)# 分组规则和SQL是一致的也就是相同的在一个组Hash分组result rdd.groupBy(lambda t: t[0])print(result.collect())print(hello)print(result.map(lambda t: (t[0], list(t[1]))).collect())输出结果 [(a, pyspark.resultiterable.ResultIterable object at 0x7f85fa80eca0), (b, pyspark.resultiterable.ResultIterable object at 0x7f85fa80ebb0)] hello [(a, [(a, 1), (a, 1)]), (b, [(b, 1), (b, 1), (b, 1)])]2.4.6 Filter算子 功能过滤把想要的数据进行保留 语法 rdd.filter(func) # func:(T)——bool 传入1个随意类型参数进来返回值必须是True or False返回值是True的数据被保留False的数据被丢弃 代码演示09_operators_filter.py # coding:utf8from pyspark import SparkConf, SparkContextif __name__ __main__:conf SparkConf().setAppName(test).setMaster(local[*])sc SparkContext(confconf)rdd sc.parallelize([1, 2, 3, 4, 5, 6])# 通过Filter算子过滤奇数,filter 只返回true的值result rdd.filter(lambda x: x % 2 1)print(result.collect())输出结果 [1, 3, 5]2.4.7 distinct算子 功能对RDD数据进行去重返回新的RDD 语法 rdd.distinct(参数1) # 参数1去重分区数量一般不用传演示代码10_operators_distinct.py # coding:utf8from pyspark import SparkConf, SparkContextif __name__ __main__:conf SparkConf().setAppName(test).setMaster(local[*])sc SparkContext(confconf)rdd sc.parallelize([1, 1, 1, 2, 2, 2, 3, 3, 3])# distinct 进行RDD数据去重操作print(rdd.distinct().collect())rdd2 sc.parallelize([(a, 1), (a, 1), (a, 3)])print(rdd2.distinct().collect())输出结果 [1, 2, 3] [(a, 3), (a, 1)]2.4.8 union算子 功能2个rdd合并成1个rdd返回 用法rdd.union(other_rdd) 注意只合并不会去重 代码演示11_operators_union.py # coding:utf8from pyspark import SparkConf, SparkContextif __name__ __main__:conf SparkConf().setAppName(test).setMaster(local[*])sc SparkContext(confconf)rdd1 sc.parallelize([1, 1, 3, 3])rdd2 sc.parallelize([a,b,a])rdd3 rdd1.union(rdd2)print(rdd3.collect())print(rdd3.distinct().collect()) 1. 可以看到union算子是不会去重的 2. RDD的类型不同也是可以合并的输出结果 [1, 1, 3, 3, a, b, a] [1, 3, b, a]2.4.9 join算子 功能对两个RDD执行JOIN操作可实现SQL的内、外连接 注意join算子只能用于二元元组 语法 rdd.join(other_rdd) #内连接 rdd.leftOuterJoin(other_rdd) # 左外 rdd.rightOuterJoin(other_rdd) # 右外代码演示12_operators_join.py # coding:utf8from pyspark import SparkConf, SparkContextif __name__ __main__:conf SparkConf().setAppName(test).setMaster(local[*])sc SparkContext(confconf)rdd1 sc.parallelize([(1001, 张三), (1002, 李四), (1003, 王五), (1004, 赵六)])rdd2 sc.parallelize([(1001, 销售部), (1002, 科技部)])# 通过join算子来进行rdd之间的关联# 对于join算子来说 关联条件 按照二元元组的key来进行关联# 内连接print(rdd1.join(rdd2).collect())# 左外连接print(rdd1.leftOuterJoin(rdd2).collect())# 右外连接print(rdd1.rightOuterJoin(rdd2).collect()) 输出结果 [(1001, (张三, 销售部)), (1002, (李四, 科技部))] [(1001, (张三, 销售部)), (1002, (李四, 科技部)), (1003, (王五, None)), (1004, (赵六, None))] [(1001, (张三, 销售部)), (1002, (李四, 科技部))]2.4.10 intersection 算子 功能求2个rdd的交集返回一个新rdd 用法rdd.intersection(other_rdd) 代码演示13_operators_intersection.py # coding:utf8from pyspark import SparkConf, SparkContextif __name__ __main__:conf SparkConf().setAppName(test).setMaster(local[*])sc SparkContext(confconf)rdd1 sc.parallelize([(a,1),(a,3)])rdd2 sc.parallelize([(a,1),(b,3)])# 通过intersection算子求RDD之间的交集将交集取出返回新RDDrdd3 rdd1.intersection(rdd2)print(rdd3.collect())输出结果 [(a, 1)]2.4.11 glom算子 功能将RDD的数据加上嵌套这个嵌套按照分区来进行 比如RDD数据[1,2,3,4,5]有两个分区 那么被glom后数据变成[[1,2,3],[4,5]] 使用方法rdd.glom() 代码演示14_operators_glom.py # coding:utf8from pyspark import SparkConf, SparkContextif __name__ __main__:conf SparkConf().setAppName(test).setMaster(local[*])sc SparkContext(confconf)rdd sc.parallelize([1,2,3,4,5,6,7,8,9,10],2)rdd2 sc.parallelize([1,2,3,4,5,6,7,8,9,10])print(rdd.glom().collect())print(rdd.glom().flatMap(lambda x:x).collect()) # 用flatMap解嵌套print(rdd2.glom().collect())输出结果 [[1, 2, 3, 4, 5], [6, 7, 8, 9, 10]] [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] [[1], [2], [3], [4, 5], [6], [7], [8], [9, 10]]2.4.12 groupByKey算子 功能针对KV型RDD自动按照key分组 用法rdd.groupByKey() 自动按照key分组 代码演示15_operators_groupByKey.py # coding:utf8from pyspark import SparkConf, SparkContextif __name__ __main__:conf SparkConf().setAppName(test).setMaster(local[*])sc SparkContext(confconf)rdd sc.parallelize([(a, 1), (a, 1), (a, 1,), (b, 1), (b, 1)])rdd2 rdd.groupByKey()print(rdd2.map(lambda x:(x[0],list(x[1]))).collect())输出结果 [(a, [1, 1, 1]), (b, [1, 1])]2.4.13 sortBy算子 功能对RDD数据进行排序基于你指定的排序依据 语法 rdd.sortBy(func,ascendingFalse,numPartitions1) # func:(T)——U:告知按照rdd中的哪个数据进行排序比如lambda x:x[1] 表示按照rdd中的第二列元素进行排序 # ascending True升序False 降序 # numPartition用多少分区来排序注意如果要全局有序排序分区数请设置为1因为生产环境下分区数大于1很可能只得到局部有序的结果 代码演示16_operators_sortBy.py # coding:utf8from pyspark import SparkConf, SparkContextif __name__ __main__:conf SparkConf().setAppName(test).setMaster(local[*])sc SparkContext(confconf)rdd sc.parallelize([(g, 3), (c, 1), (b, 2,), (a, 9), (h, 10), (i, 4), (l, 26,), (o, 1), (d, 7)])# 使用sortBy对rdd进行排序# 参数1 函数表示的是告诉spark按照数据的哪个列进行排序# 参数2 boolTrue表示升序False表示降序# 参数3 分区数设置注意如果要全局有序排序分区数请设置为1因为生产环境下分区数大于1很可能只得到局部有序的结果rdd2 rdd.sortBy(lambda x:x[1],ascendingTrue,numPartitions3)rdd3 rdd.sortBy(lambda x:x[0],ascendingTrue,numPartitions8)print(rdd2.collect())print(rdd3.collect())输出结果 [(c, 1), (o, 1), (b, 2), (g, 3), (i, 4), (d, 7), (a, 9), (h, 10), (l, 26)] [(a, 9), (b, 2), (c, 1), (d, 7), (g, 3), (h, 10), (i, 4), (l, 26), (o, 1)]2.4.14 sortByKey 功能针对KV型RDD按照key进行排序 语法 sortByKey(ascendingTrue,numPartitionsNone,keyfuncfunction RDD,lambda) ascending:升序或降序True升序False降序默认是升序numPartitions按照几个分区进行排序如果全局有序设置为1keyfunc在排序前对key进行处理语法是(k)——U,一个参数传入返回一个值 代码演示17_operators_sortByKey.py # coding:utf8from pyspark import SparkConf, SparkContextif __name__ __main__:conf SparkConf().setAppName(test).setMaster(local[*])sc SparkContext(confconf)rdd sc.parallelize([(g, 3), (A, 1), (B, 2,), (A, 9), (h, 10), (i, 4), (l, 26,), (o, 1), (d, 7)])# 调用了忽略大小写的函数print(rdd.sortByKey(ascendingTrue, numPartitions1, keyfunclambda key: str(key).lower()).collect())输出结果 [(A, 1), (A, 9), (B, 2), (d, 7), (g, 3), (h, 10), (i, 4), (l, 26), (o, 1)]2.4.15 综合案例 代码演示18_operators_demo.py # coding:utf8import jsonfrom pyspark import SparkConf, SparkContextif __name__ __main__:conf SparkConf().setAppName(test).setMaster(local[*])sc SparkContext(confconf)# 读取数据文件file_rdd sc.textFile(../data/input/order.text)# 进行rdd数据的split 按照|符号进行得到一个json数据jsons_rdd file_rdd.flatMap(lambda line: line.split(|))# 通过python内置的json库完成json字符串到字典对象的转换dict_rdd jsons_rdd.map(lambda json_str: json.loads(json_str))# 过滤数据只保留北京的数据beijing_rdd dict_rdd.filter(lambda d: d[areaName] 北京)# 组合北京和商品类型形成的字符串category_rdd beijing_rdd.map(lambda x: x[areaName] _ x[category])# 对结果集进行去重操作result_rdd category_rdd.distinct()# 输出print(result_rdd.collect())输出结果 [北京_平板电脑, 北京_家具, 北京_书籍, 北京_食品, 北京_服饰, 北京_手机, 北京_家电, 北京_电脑]2.4.16 将案例提交到yarn运行 改动1加入环境变量让pycharm运行yarn的时候知道hadoop的配置在哪可以去读取yarn的信息 import os from defs_19 import city_with_category # 导入自己写的函数时把文件夹设置为SourceRoot就不会报错了 os.environ[HADOOP_CONF_DIR] /export/server/hadoop/etc/hadoop改动2在集群上运行本地文件就不可以用了需要用hdfs文件 # 在集群中运行我们需要用HDFS路径不能用本地路径file_rdd sc.textFile(hdfs://Tnode1:8020/input/order.text)改动3 如果提交到集群运行除了主代码以外还依赖了其它的代码文件需要设置一个参数来告知spark还有依赖文件要同步上传到集群中参数叫做spark.submit.pyFiles参数的值可以是单个.py文件也可以是.zip压缩包有多个依赖文件的时候可以用zip压缩后上传conf.set(spark.submit.pyFiles,defs_19.py)完整代码19_operators_runOnYarn.py # coding:utf8import jsonfrom pyspark import SparkConf, SparkContext import os from defs_19 import city_with_category # 导入自己写的函数时把文件夹设置为SourceRoot就不会报错了 os.environ[HADOOP_CONF_DIR] /export/server/hadoop/etc/hadoop if __name__ __main__:# 提交到yarn集群master设置为yarnconf SparkConf().setAppName(SparkDemo01).setMaster(yarn)如果提交到集群运行除了主代码以外还依赖了其它的代码文件需要设置一个参数来告知spark还有依赖文件要同步上传到集群中参数叫做spark.submit.pyFiles参数的值可以是单个.py文件也可以是.zip压缩包有多个依赖文件的时候可以用zip压缩后上传conf.set(spark.submit.pyFiles,defs_19.py)sc SparkContext(confconf)# 在集群中运行我们需要用HDFS路径不能用本地路径file_rdd sc.textFile(hdfs://Tnode1:8020/input/order.text)# 进行rdd数据的split 按照|符号进行得到一个json数据jsons_rdd file_rdd.flatMap(lambda line: line.split(|))# 通过python内置的json库完成json字符串到字典对象的转换dict_rdd jsons_rdd.map(lambda json_str: json.loads(json_str))# 过滤数据只保留北京的数据beijing_rdd dict_rdd.filter(lambda d: d[areaName] 北京)# 组合北京和商品类型形成的字符串category_rdd beijing_rdd.map(city_with_category)# 对结果集进行去重操作result_rdd category_rdd.distinct()# 输出print(result_rdd.collect())依赖代码defs_19.py # coding:utf8def city_with_category(data):return data[areaName] _ data[category] 输出结果 [北京_书籍, 北京_食品, 北京_服饰, 北京_平板电脑, 北京_家具, 北京_手机, 北京_家电, 北京_电脑]在服务器上通过spark-submit 提交到集群运行 # --py-files 可以帮你指定你依赖的其它python代码支持.zip(一堆)也可以单个.py文件都行。 /export/server/spark/bin/spark-submit --master yarn --py-files ./defs.py ./main.py 服务器上程序运行结果 注意在服务器上跑时需要把conf中的setMaster去掉 即conf SparkConf().setAppName(“SparkDemo01”).setMaster(“yarn”)改为 conf SparkConf().setAppName(“SparkDemo01”) 2.5 常用Action算子 2.5.1 countByKey算子 功能统计key出现的次数一般适用于KV型的RDD 代码演示20_operators_countByKey.py # coding:utf8import jsonfrom pyspark import SparkConf, SparkContextif __name__ __main__:conf SparkConf().setAppName(test).setMaster(local[*])sc SparkContext(confconf)rdd sc.textFile(../data/input/words.txt)rdd2 rdd.flatMap(lambda x:x.split( )).map(lambda x: (x, 1))# 通过countByKey来对key进行计数这是一个Action算子result rdd2.countByKey()print(result)print(list(result))print(result[hello])print(type(result))输出结果 defaultdict(class int, {hello: 3, spark: 1, hadoop: 1, flink: 1}) [hello, spark, hadoop, flink] 3 class collections.defaultdict2.5.2 collect算子 功能将RDD各个分区内的数据统一收集到Driver中形成一个List对象 用法rdd.collect() 这个算子是将RDD各个分区数据都拉取到Driver 注意的是RDD是分布式对象其数据量可以很大 所以用这个算子之前要心知肚明地了解 结果数据集不会太大。 不然会把Driver内存撑爆 2.5.3 reduce算子 功能对RDD数据集按照你传入的逻辑进行聚合 语法 rdd.reduce(func) # func:(T,T)——T # 2参数传入1个返回值返回值要和参数要求类型一致代码演示21_operators_reduce.py # coding:utf8import jsonfrom pyspark import SparkConf, SparkContextif __name__ __main__:conf SparkConf().setAppName(test).setMaster(local[*])sc SparkContext(confconf)rdd sc.parallelize([1,2,3,4,5])print(rdd.reduce(lambda a, b: a b))输出结果 152.5.4 fold算子 功能和reduce一样接收传入逻辑进行聚合聚合是带有初始值的 这个初始值聚合会作用在 分区内聚合分区间聚合 比如[[1,2,3],[4,5,6],[7,8,9]] 数据量分布在3个分区 分区1 1、2、3 聚合的时候带上10作为初始值得到16 分区3 4、5、6 聚合的时候带上10作为初始值得到25 分区4 7、8、9 聚合的时候带上10作为初始值得到34 3个分区的结果做聚合也带上初始值10所以结果是10162534 85 代码演示22_operators_fold.py # coding:utf8import jsonfrom pyspark import SparkConf, SparkContextif __name__ __main__:conf SparkConf().setAppName(test).setMaster(local[*])sc SparkContext(confconf)rdd sc.parallelize([1,2,3,4,5,6,7,8,9],3)print(rdd.glom().collect())print(rdd.fold(10, lambda a, b: a b))输出结果 [[1, 2, 3], [4, 5, 6], [7, 8, 9]] 852.5.5 first算子 功能取出RDD的第一个元素 用法 sc.parallelize([3,2,1]).first() 输出32.5.6 take算子 功能取RDD的前N个元素。组合成list返回给你 用法 sc.parallelize([3,2,1,4,5,6]).take(5) [3, 2, 1, 4, 5]2.5.7 top算子 功能对RDD数据集进行降序排序取前N个 用法 sc.parallelize([3,2,1,4,5,6]).top(3) # 表示取降序前3个 [6, 5, 4]2.5.8 count算子 功能计算RDD有多少条数据返回值是一个数字 用法 sc.parallelize([3,2,1,4,5,6]).count() 62.5.9 takeSample算子 功能随机抽样RDD的数据 用法 takeSample(参数1True or False参数2采样数参数3随机数种子) - 参数1True表示允许取同一个数据False表示不允许取同一个数据和数据内容无关是否重复表示的是同一个位置的数据有、无放回抽样 - 参数2抽样要几个 - 参数3随机数种子这个参数传入一个数字即可随意给随机数种子 数字可以随便传如果传同一个数字 那么取出的结果是一致的。 一般参数3 我们不传Spark会自动给与随机的种子。 代码演示23_operators_takeSample.py # coding:utf8import jsonfrom pyspark import SparkConf, SparkContextif __name__ __main__:conf SparkConf().setAppName(test).setMaster(local[*])sc SparkContext(confconf)rdd sc.parallelize([1, 3, 5, 3, 1, 3, 2, 6, 7, 8, 6],1)result rdd.takeSample(False,5,1)# 随机抽样可以抽出相同的数据只是位置不同而已# 随机数种子能让随机数不再继续发生变化print(result)输出结果 [2, 7, 6, 6, 3]注意 随机抽样可以抽出相同的数据只是位置不同而已 随机数种子能让随机数不再继续发生变化 2.5.10 takeOrdered 功能对RDD进行排序取前N个 用法 rdd.takeOrdered(参数1参数2) - 参数1 要几个数据 - 参数2 对排序的数据进行更改不会更改数据本身只是在排序的时候换个样子 这个方法按照元素自然顺序升序排序如果你想玩倒叙需要参数2 来对排序的数据进行处理代码演示24_operators_takeOrdered.py # coding:utf8import jsonfrom pyspark import SparkConf, SparkContextif __name__ __main__:conf SparkConf().setAppName(test).setMaster(local[*])sc SparkContext(confconf)rdd sc.parallelize([1, 3, 2, 4, 7, 9, 6], 1)print(rdd.takeOrdered(3))print(rdd.takeOrdered(3,lambda x:-x))输出结果 [1, 2, 3] [9, 7, 6]2.5.11 foreach算子 功能对RDD的每一个元素执行你提供的逻辑的操作和map一个思想但是这个方法没有返回值 用法 rdd.foreach(func) # func:(T) —— None代码演示25_operators_foreach.py # coding:utf8import jsonfrom pyspark import SparkConf, SparkContextif __name__ __main__:conf SparkConf().setAppName(test).setMaster(local[*])sc SparkContext(confconf)rdd sc.parallelize([1, 3, 2, 4, 7, 9, 6], 1)rdd.foreach(lambda x: print(x * 10))输出结果 10 30 20 40 70 90 602.5.12 saveAsTextFile 功能将RDD的数据写入文本文件中 支持本地写出hdfs等文件系统 代码演示 # coding:utf8import jsonfrom pyspark import SparkConf, SparkContextif __name__ __main__:conf SparkConf().setAppName(test).setMaster(local[*])sc SparkContext(confconf)rdd sc.parallelize([1,3,2,4,7,9,6],3)rdd.saveAsTextFile(hdfs://Tnode1:8020/test/output/out1)运行结果 注意保存文件API是分布式执行的 这个API的执行数据是不经过driver的 如图写出的时候每个分区所在的Executor直接控制数据写出到目标文件系统中 所有才会一个分区产生一个结果文件 2.5.13 注意点 我们学习的action中 foreachsaveAsTextFile 这两个算子是分区Executor直接执行的跳过Driver由分区所在的Executor直接执行 反之其余的Action算子都会将结果发送至Driver 2.6 分区操作算子 2.6.1 mapPartitions算子 transformation算子 图解 如图mapPartition一次被传递的是一整个分区的数据 作为一个迭代器一次性list对象传入过来。 代码演示27_operators_mapPartitions.py # coding:utf8import jsonfrom pyspark import SparkConf, SparkContextimport timeif __name__ __main__:start_time time.time()conf SparkConf().setAppName(test).setMaster(local[*])sc SparkContext(confconf)# 效果和map一样但是性能比map好cpu计算没有省但是网络IO少很多rdd sc.parallelize([1,3,2,4,7,9,6],3)def process(iter):result []for it in iter:result.append(it*10)return resultprint(rdd.mapPartitions(process).collect())# print(rdd.map(lambda x:x*10).collect())end_time time.time()gap_time (end_time - start_time)gap_time round(gap_time, 4) # 保留四位小数print(执行本程序共耗时 str(gap_time) s)输出结果 [10, 30, 20, 40, 70, 90, 60] 执行本程序共耗时8.0515s注意效果和map一样但是性能比map好cpu计算没有省但是网络IO少很多 2.6.2 foreachPartition算子 Action算子 功能和普通foreach一致一次处理的是一整个分区数据 代码演示28_operators_foreachPartitions.py # coding:utf8import jsonfrom pyspark import SparkConf, SparkContextif __name__ __main__:conf SparkConf().setAppName(test).setMaster(local[*])sc SparkContext(confconf)rdd sc.parallelize([1,3,2,4,7,9,6],3)def process(iter):result []for it in iter:result.append(it*10)print(result)rdd.foreachPartition(process)输出结果 [70, 90, 60] [10, 30] [20, 40]foreachPartition 就是一个没有返回值的mapPartitions 2.6.3 partitionBy算子 transformation算子 功能对RDD进行自定义分区操作 用法 rdd.partitionBy(参数1参数2) - 参数1 重新分区后有几个分区 - 参数2 自定义分区规则函数传入参数2(K)——int 一个传入参数进来类型无所谓但是返回值一定是int类型 将key传给这个函数你自己写逻辑决定返回一个分区编号分区编号从0开始不要超出分区数-1代码演示29_operators_partitionBy.py # coding:utf8import jsonfrom pyspark import SparkConf, SparkContextif __name__ __main__:conf SparkConf().setAppName(test).setMaster(local[*])sc SparkContext(confconf)rdd sc.parallelize([(hadoop, 1), (spark, 1), (hello, 1), (flink, 1), (hadoop, 1), (spark, 1)])# 使用partitionBy 自定义 分区def process(k):if hadoop k or hello k: return 0if spark k: return 1return 2print(rdd.partitionBy(3, process).glom().collect())输出结果分区依次为0、1、2 [[(hadoop, 1), (hello, 1), (hadoop, 1)], [(spark, 1), (spark, 1)], [(flink, 1)]]分区号不要超标你设置3个分区分区号只能是0 1 2 设置5个分区 分区号只能是0 1 2 3 4 2.6.4 repartition算子 transformation算子 功能对RDD的分区执行重新分区仅数量 用法 rdd.repartition(N) 传入N 决定新的分区数代码演示30_operators_repartition_and_coalesce.py # coding:utf8import jsonfrom pyspark import SparkConf, SparkContextif __name__ __main__:conf SparkConf().setAppName(test).setMaster(local[*])sc SparkContext(confconf)rdd sc.parallelize([1, 2, 3, 4, 5], 3)# repartition 修改分区print(rdd.repartition(1).getNumPartitions())print(rdd.repartition(5).getNumPartitions())# coalesce 修改分区print(rdd.coalesce(1).getNumPartitions())print(rdd.coalesce(5,shuffleTrue).getNumPartitions())输出结果 1 5 1 5注意对分区的数量进行操作一定要慎重 一般情况下我们写spark代码除了要求全局排序设置为1个分区外 多数时候所有API中关于分区相关的代码我们都不太理会 因为如果你改分区了 会影响并行计算内存迭代的并行管道数量后面学分区如果增加极大可能导致shuffle 2.6.5 coalesce算子 transformation算子 功能对分区进行数量增减 用法 rdd.coalesce(参数1参数2) - 参数1分区数 - 参数2True or False True表示允许shuffle也就是可以加分区 False表示不允许shuffle也就是不能加分区False是默认代码见2.6.4 对比repartition一般使用coalesce较多因为加分区要写参数2 这样避免写repartition的时候手抖了加分区了 2.6.6 mapValues算子 Transformation算子 功能针对二元元组RDD对其内部的二元元组的Value执行map操作 语法 rdd.mapValues(func) # func: (V)—— U # 注意传入的参数是二元元组的 value值 # 我们这个传入的方法只对value进行处理代码演示 # coding:utf8 from pyspark import SparkConf, SparkContextif __name__ __main__:conf SparkConf().setAppName(create rdd).setMaster(local[*])sc SparkContext(confconf)rdd sc.parallelize([(a, 1), (a, 11), (a, 6), (b, 3), (b, 5)])# rdd.map(lambda x:(x[0],x[1]*10))# 将二元元组的所有value都乘以10进行处理print(rdd.mapValues(lambda x: x * 10).collect())输出结果 [(a, 10), (a, 110), (a, 60), (b, 30), (b, 50)]2.6.7 join算子 Transformation算子 功能对两个RDD执行join操作可以实现SQL的内、外连接 注意join算子只能用于二元元组 代码见 2.4.9 2.7 面试题 groupByKey和reduceByKey的区别 在功能上的区别 groupByKey仅仅只有分组功能而已reduceByKey除了有ByKey的分组功能外还有reduce聚合功能所以是一个分组聚合一体化的算子 如果对数据执行分组聚合那么使用这2个算子的性能差别是很大的 reduceByKey的性能是远大于groupByKey聚合逻辑的 因为 如图这是groupByKey聚合逻辑的执行流程。 因为groupByKey只能分组所以执行上是先分组shuffle后聚合 再来看reduceByKey 如图reduceByKey由于自带聚合逻辑所以可以完成 先在分区内做预聚合然后再走分组流程shuffle分组后再做最终聚合 对于groupByKeyreduceByKey最大的提升在于分组前进行了预聚合那么在shuffle分组节点被shuffle的数据可以极大地减少 这就极大地提升了性能 分组聚合首选reduceByKey数据越大对groupByKey的优势就越高 2.8 总结 RDD创建方式有哪几种方法 ​ 通过并行化集合的方式本地集合转分布式集合 ​ 或者读取数据的方式创建TextFile、WholeTextFile RDD分区数如何查看 ​ 通过getNumPartitions API查看返回值Int Transformation和Action的区别 转换算子的返回值100%是RDD而Action算子的返回值100%不是RDD 转换算子是懒加载的只有遇到Action才会执行Action就是转换算子处理链条的开关。 哪两个Action算子的结果不经过Driver直接输出 ​ foreach和saveAsTextFile 直接由Executor执行后输出不会将结果发送到Driver上去 reduceByKey和groupByKey的区别 reduceByKey自带聚合逻辑groupByKey不带 如果做数据聚合reduceByKey的效率更好因为可以先聚合后shuffle在最终聚合传输的IO小 mapPartitions和foreachPartition的区别 mapPartitions带有返回值 foreachPartition不带 对于分区操作有什么要注意的地方 尽量不要增加分区可能破坏内存迭代的计算管道 3. RDD的持久化 3.1 RDD的数据是过程数据 RDD之间进行相互迭代计算Transformation的转换当执行开启后新的RDD生成代表老RDD的消失。 RDD的数据是过程数据只在处理的过程中存在一旦处理完成就不见了。 这个特性可以最大化地利用资源老旧RDD没用了 就从内存中清理给后续的计算腾出内存空间。 如上图rdd3被2次使用第一次使用之后其实RDD3就不存在了。 第二次使用的时候只能基于RDD的血缘关系从RDD1重新执行构建出来RDD3供RDD5使用。 3.2 RDD的缓存 3.2.1 缓存 对于上述的场景肯定要执行优化优化就是 RDD3如果不消失那么RDD1——RDD2——RDD3这个链条就不会执行2次或者更多次 RDD的缓存技术Spark提供了缓存API可以让我们通过调用APi将指定的RDD数据保留在内存或者硬盘上 缓存的API # RDD3 被2次使用可以加入缓存进行优化 rdd3.cache() # 缓存到内存中 rdd3.persist(StorageLevel.MEMORY_ONLY) # 仅内存缓存 rdd3.persist(StorageLevel.MEMORY_ONLY_2) # 仅内存缓存,2个副本 rdd3.persist(StorageLevel.DISK_ONLY) # 仅缓存硬盘上 rdd3.persist(StorageLevel.DISK_ONLY_2) # 仅缓存硬盘上,2个副本 rdd3.persist(StorageLevel.DISK_ONLY_3) # 仅缓存硬盘上,3个副本 rdd3.persist(StorageLevel.MEMORY_AND_DISK) # 先放内存不够放硬盘 rdd3.persist(StorageLevel.MEMORY_AND_DISK_2) # 先放内存不够放硬盘2个副本 rdd3.persist(StorageLevel.OFF_HEAP) # 堆外内存系统内存# 如上API自行选择使用即可 # 一般建议使用rdd3.persist(StorageLevel.MEMORY_AND_DISK) # 如果内存比较小的集群建议使用rdd3.persist(StorageLevel.DISK_ONLY)或者就别用缓存了 用CheckPoint# 主动清理缓存的API rdd.unpersist()3.2.2 缓存特点 缓存技术可以将过程RDD数据持久化保存到内存或者硬盘上但是这个保存在设定上是认为不安全的。 缓存的数据在设计上是认为有丢失风险的。 所以缓存有一个特点就是其保留RDD之间的血缘依赖关系 一旦缓存丢失可以基于血缘关系的记录重新计算这个RDD的数据 缓存如何丢失 在内存中的缓存是不安全的比如断电、计算任务内存不足把缓存清理给计算让路硬盘中因为硬盘损坏也是可能丢失的。 代码演示31_cache.py # coding:utf8from pyspark.storagelevel import StorageLevelfrom pyspark import SparkConf, SparkContextimport timeif __name__ __main__:conf SparkConf().setAppName(test).setMaster(local[*])sc SparkContext(confconf)rdd1 sc.textFile(../data/input/words.txt)rdd2 rdd1.flatMap(lambda x: x.split( ))rdd3 rdd2.map(lambda x: (x, 1))# 给rdd3加缓存# rdd3.cache()rdd3.persist(StorageLevel.MEMORY_AND_DISK_2) # 设置缓存级别rdd4 rdd3.reduceByKey(lambda a, b: a b)result rdd4.collect()print(result)rdd5 rdd3.groupByKey()rdd6 rdd5.mapValues(lambda x:sum(x))print(rdd6.collect())# 取消缓存rdd3.unpersist()time.sleep(10000000)输出结果 [(hadoop, 1), (hello, 3), (spark, 1), (flink, 1)] [(hadoop, 1), (hello, 3), (spark, 1), (flink, 1)]3.2.3 缓存是如何保存的 [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-EP3ykOn3-1692435339403)(https://cdn.jsdelivr.net/gh/Sql88/BlogImgmain/img/%E7%BC%93%E5%AD%98%E6%98%AF%E5%A6%82%E4%BD%95%E4%BF%9D%E5%AD%98%E7%9A%842.png)] 如图RDD是将自己分区的数据每个分区自行将其数据保存在其所在的Executor内存和硬盘上。 这是分散存储 3.3 RDD的CheckPoint 3.3.1 RDD CheckPoint CheckPoint技术也是将RDD的数据保存起来。 但是它仅支持硬盘存储 并且 它被设计认为是安全的不保留血缘关系 3.3.2 CheckPoint是如何保存数据的 如图CheckPoint存储RDD数据是集中收集各个分区数据进行存储。而缓存是分散存储 3.3.3 缓存和CheckPoint的对比 CheckPoint不管分区数量多少风险是一样的缓存分区越多风险越高CheckPoint支持写入HDFS缓存不行HDFS是高可靠存储CheckPoint被认为是安全的CheckPoint不支持内存缓存可以缓存如果写内存性能比CheckPoint要好一些CheckPoint因为设计是安全的所以不保留血缘关系而缓存因为设计上认为不安全所以保留 3.3.4 代码 # 设置CheckPoint第一件事情选择CP的保存路径 # 如果是Local模式可以支持本地文件系统如果在集群运行千万要用HDFS sc.setCheckpointDir(hdfs://node1:8020/output/bj52ckp) # 用的时候直接调用checkPoint算子即可。 rdd.checkpoint()完整代码演示32_checkPoint.py # coding:utf8import jsonfrom pyspark.storagelevel import StorageLevelfrom pyspark import SparkConf, SparkContextimport timeif __name__ __main__:conf SparkConf().setAppName(test).setMaster(local[*])sc SparkContext(confconf)# 1.告知spark开启checkPoint功能sc.setCheckpointDir(hdfs://Tnode1:8020/output/ckp)rdd1 sc.textFile(../data/input/words.txt)rdd2 rdd1.flatMap(lambda x: x.split( ))rdd3 rdd2.map(lambda x: (x, 1))# 调用checkPoint API 保存数据即可rdd3.checkpoint()rdd4 rdd3.reduceByKey(lambda a, b: a b)result rdd4.collect()print(result)rdd5 rdd3.groupByKey()rdd6 rdd5.mapValues(lambda x:sum(x))print(rdd6.collect())# 取消缓存rdd3.unpersist()time.sleep(10000000)输出结果 [(hadoop, 1), (hello, 3), (spark, 1), (flink, 1)] [(hadoop, 1), (hello, 3), (spark, 1), (flink, 1)]3.3.5 注意 CheckPoint是一种重量级的使用也就是RDD的重新计算成本很高的时候我们采用CheckPoint比较合适。 或者数据量很大用CheckPoint比较合适。 如果数据量小或者RDD重新计算是非常快的用CheckPoint没啥必要 Cache和CheckPoint两个API都不是Action类型 所以想要它俩工作必须在后面接上Action 接上Action的目的是让RDD有数据而不是为了CheckPoint和Cache工作。 3.3.6 总结 1.Cache和CheckPoint的区别 Cache是轻量化保存RDD数据可存储在内存和硬盘是分散存储设计上数据是不安全的保留RDD血缘关系CheckPoint是重量级保存RDD数据是集中存储只能存储在硬盘HDFS上设计上是安全的不保留RDD血缘关系 2.Cache和CheckPoint的性能对比 Cache性能更好因为是分散存储各个Executor并行执行效率高可以保存到内存中占内存更快CheckPoint比较慢因为是集中存储涉及到网络IO但是存储到HDFS上更加安全多副本 4. Spark案例练习 4.1 搜索引擎日志分析案例 数据格式 需求 用户搜索的关键词分析用户和关键词组合分析热门搜索时间段分析 案例实现代码 # coding:utf8from pyspark import SparkConf, SparkContext from pyspark.storagelevel import StorageLevel import jiebafrom operator import adddef context_jieba(data):通过jieba分词工具 进行分词操作seg jieba.cut_for_search(data)l []for word in seg:l.append(word)return ldef filter_words(data):过滤不要的 谷、帮、客 湖return data not in [谷, 帮, 客, 湖]def append_words(data):修订某些关键词的内容if data 传智播: data 传智播客if data 院校: data 院校帮if data 博学: data 博学谷if data 数据: data 数据湖return (data, 1)def extract_user_and_word(data):传入数据是 元组(1,我喜欢传智播客)user_id data[0]content data[1]# 对content进行分词words context_jieba(content)return_list []for word in words:# 不要忘记过滤 \谷\帮\客\湖if filter_words(word):return_list.append((user_id _ append_words(word)[0], 1))return return_listif __name__ __main__:conf SparkConf().setAppName(SparkDemo2)sc SparkContext(confconf)# 1.读取文件file_rdd sc.textFile(hdfs://Tnode1/input/SogouQ.txt)# 2. 对数据进行切分 \tsplit_rdd file_rdd.map(lambda x: x.split(\t))# 3. 因为要做多个需求split_rdd 作为基础的rdd 会被多次使用split_rdd.persist(StorageLevel.DISK_ONLY)# TODO:需求1用户搜索的关键‘词’分析# 主要分析热点词# 将所有的搜索内容取出# print(split_rdd.takeSample(True, 3))context_rdd split_rdd.map(lambda x: x[2])# 对搜索的内容进行分词分析words_rdd context_rdd.flatMap(context_jieba)# print(words_rdd.collect())# 异常的数据# 数据 湖 —— 数据湖# 院校 帮 —— 院校帮# 博学 谷 —— 博学谷# 传智播 客—— 传智播客filtered_rdd words_rdd.filter(filter_words)# 将关键词转换传智播 -- 传智播客final_words_rdd filtered_rdd.map(append_words)# 对单词进行分组、聚合、排序 求出前五名result1 final_words_rdd.reduceByKey(lambda a, b: a b). \sortBy(lambda x: x[1], ascendingFalse, numPartitions1). \take(5)print(需求1结果, result1)# TODO需求2用户和关键词组合分析# 1我喜欢传智播客# 1 我 1喜欢 1传智播客user_content_rdd split_rdd.map(lambda x: (x[1], x[2]))# 对用户的搜索内容进行分词分词后和用户ID再次组合user_word_with_one_rdd user_content_rdd.flatMap(extract_user_and_word)# 对内容进行分组、聚合、排序、求前5result2 user_word_with_one_rdd.reduceByKey(lambda a, b: a b). \sortBy(lambda x: x[1], ascendingFalse, numPartitions1). \take(5)print(需求2结果, result2)# TODO:需求3热门搜索时间段分析# 取出来所有的时间time_rdd split_rdd.map(lambda x: x[0])# 对时间进行处理只保留小时精度即可hour_with_one_rdd time_rdd.map(lambda x: (x.split(:)[0], 1))# 分组、聚合、排序result3 hour_with_one_rdd.reduceByKey(add). \sortBy(lambda x: x[1], ascendingFalse, numPartitions1). \collect()print(需求3结果, result3) 输出结果 需求1结果 [(scala, 2310), (hadoop, 2268), (博学谷, 2002), (传智汇, 1918), (itheima, 1680)] 需求2结果 [(6185822016522959_scala, 2016), (41641664258866384_博学谷, 1372), (44801909258572364_hadoop, 1260), (7044693659960919_仓库, 1120), (15984948747597305_传智汇, 1120)] 需求3结果 [(20, 3479), (23, 3087), (21, 2989), (22, 2499), (01, 1365)4.2 提交到集群运行 # 普通提交 /export/server/spark/bin/spark-submit --master yarn SparkDemo2.py# 压榨集群式提交 # 每个executor吃14g内存8核cpu总共3个executor /export/server/spark/bin/spark-submit --master yarn --executor-memory 14g --executor-cores 8 --num-executors 3 ./SparkDemo2.py输出结果 要注意代码中 master部分删除读取的文件路径改为hdfs才可以 4.3 作业 代码演示 # coding:utf8from pyspark import SparkContext, StorageLevel from pyspark import SparkConfif __name__ __main__:conf SparkConf().setAppName(sparkHomeWork01).setMaster(local[*])sc SparkContext(confconf)file_rdd sc.textFile(../../data/input/apache.log)file_rdd.persist(StorageLevel.MEMORY_AND_DISK_2)# 需求1TODO计算当前网站访问的PV被访问次数visit_Num file_rdd.count()print(当前网站的被访问次数, visit_Num) # 14# 需求2TODO当前网站访问的用户数userNum file_rdd.distinct().count()print(当前网站的访问用户数, userNum) ## 需求3TODO有哪些IP访问了本网站Ip_rdd1 file_rdd.map(lambda x: x.split( ))Ip_rdd1.cache()Ip_rdd2 Ip_rdd1.map(lambda x: x[0]).distinct()# print(IP_rdd2.collect())print(有哪些IP访问了本网站, Ip_rdd2.collect())# 需求4 TODO哪个页面访问量最高page_rdd1 Ip_rdd1.map(lambda x:x[-1])page_rdd2 page_rdd1.map(lambda x:(x,1))page_rdd3 page_rdd2.reduceByKey(lambda a,b:ab)# page page_rdd3.sortBy(lambda x:x[1],ascendingFalse,numPartitions1).take(1)page page_rdd3.takeOrdered(1,lambda x:-x[1])page page[0]print(page)print(访问量最高的页面是,page[0],共被访问,page[1],次)输出结果sparkHomeWork01.py 当前网站的被访问次数 14 当前网站的访问用户数 9 有哪些IP访问了本网站 [83.149.9.216, 10.0.0.1, 86.149.9.216] (/presentations/logstash-monitorama-2013/css/print/paper.css, 13) 访问量最高的页面是 /presentations/logstash-monitorama-2013/css/print/paper.css 共被访问 13 次5. 共享变量 5.1 广播变量 5.1.1 问题引出 有如下代码 上述代码本地list对象和分布式对象RDD有了关联。如下图 本地list对象被发送到每个分区的处理线程上使用也就是一个executor内其实存放了2份一样的数据。 executor是进程进程内资源共享这2份数据没有必要造成了内存浪费。 5.1.2 解决方案-广播变量 如果本地list对象标记为广播变量对象那么 当上述场景出现的时候Spark只会 给每个Executor来一份数据而不像原本那样每一个分区的处理线程都来一份节省内存。 如图使用广播变量后每个Executor只会收到一份数据集。 内部的各个线程分区共享这一份数据集。 使用方式 # 1. 将本地list 标记成广播变量即可 broadcast sc.broadcast(stu_info_list)# 2. 使用广播变量从broadcast对象中取出本地list对象即可 value broadcast.value# 也就是 先放进去broadcast内部然后从broadcast内部在取出来用中间传输的是broadcast这个对象了 # 只要中间传输的是broadcast对象spark就会留意只会给每个Executor发一份了而不是傻傻的哪个分区都要给代码演示33_broadcast.py # coding:utf8import jsonfrom pyspark.storagelevel import StorageLevelfrom pyspark import SparkConf, SparkContextimport timeif __name__ __main__:conf SparkConf().setAppName(33_broadcast.py).setMaster(local[*])sc SparkContext(confconf)stu_info_list [(1, 张大仙, 11),(2, 王晓晓, 13),(3, 张甜甜, 11),(4, 王大力, 11)]# 1.将本地Python List对象标记为广播变量broadcast sc.broadcast(stu_info_list)score_info_rdd sc.parallelize([(1, 语文, 99),(2, 数学, 99),(3, 英语, 99),(4, 编程, 99),(1, 语文, 99),(2, 编程, 99),(3, 语文, 99),(4, 英语, 99),(1, 语文, 99),(3, 英语, 99),(2, 编程, 99)])def map_func(data):name id data[0]# 匹配本地list和分布式rdd中的学生ID 匹配成功后 即可获得当前学生的姓名# 2.在使用到本地集合对象的地方从广播变量中取出来用即可for stu_info in broadcast.value:if stu_info[0] id:name stu_info[1]breakreturn (name, data[1], data[2])print(score_info_rdd.map(map_func).collect()) 广播变量使用场景本地集合对象和 分布式集合对象(RDD) 进行关联的时候 需要将本地集合对象封装为广播变量 可以节省 1. 网络IO的次数 2. Executor的内存占用输出结果 [(张大仙, 语文, 99), (王晓晓, 数学, 99), (张甜甜, 英语, 99), (王大力, 编程, 99), (张大仙, 语文, 99), (王晓晓, 编程, 99), (张甜甜, 语文, 99), (王大力, 英语, 99), (张大仙, 语文, 99), (张甜甜, 英语, 99), (王晓晓, 编程, 99)]5.2 累加器 5.2.1 需求 想要对map算子计算中的数据进行数据累加得到全部数据计算完后的累加结果 5.2.2 没有累加器的代码演示 # coding:utf8 from pyspark import SparkConf, SparkContext# 演示spark的accumulator累加器 if __name__ __main__:conf SparkConf().setAppName(create rdd).setMaster(local[*])sc SparkContext(confconf)rdd sc.parallelize([1,2,3,4,5,6,7,8,9,10],2)count 0def map_func(data):global countcount 1print(count)rdd.map(map_func).collect()print(count)# 代码中count 最后打印结果是0输出结果 1 2 3 4 5 1 2 3 4 5 0代码的问题在于 count来自driver对象当在分布式的map算子中需要count对象的时候driver会将count对象发送给每一个executor一份复制发送每个executor各自收到一个在最后执行print(count) 的时候这个被打印的count依旧是driver那个所以不管executor中累加到多少都和driver这个count无关 5.2.3 解决方法-累加器 代码演示 # coding:utf8import jsonfrom pyspark.storagelevel import StorageLevelfrom pyspark import SparkConf, SparkContextimport timeif __name__ __main__:conf SparkConf().setAppName(33_broadcast.py).setMaster(local[*])sc SparkContext(confconf)# 10条数据 2个分区rdd sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], 2)# count 0# Spark 提供的累加器变量参数是初始值acmlt sc.accumulator(0)def map_func(data):global acmltacmlt 1print(acmlt)rdd.map(map_func).collect()# rdd2 rdd.map(map_func)# rdd2.cache()# rdd2.collect()# rdd3 rdd2.map(lambda x:x)# rdd3.collect()print(acmlt) # 结果是10累加器使用的注意点某个rdd使用完后再被新的rdd重新调用有可能会产生和想象中不一样的结果避免方法给需要重用的rdd加缓存如上代码将全部的count对象都替换成acmlt对象即可 这个对象就是累加器对象构建方式sc.accumulator(初始值)即可构建。 这个对象唯一和前面提到的count不同的是这个对象可以从各个Executor中收集它们的执行结果作用回自己身上。 输出结果 1 2 3 4 5 1 2 3 4 5 105.2.4 累加器的注意事项 如上代码第一次rdd2被action后累加器值是10然后rdd2就没有了没数据了 当rdd3构建出来的时候是依赖rdd2rdd2没数据那么rdd2就要重新生成 重新生成就导致累加器累加数据的代码再次被执行 所以代码的结果是20 也就是说使用累加器的时候要注意因为rdd是过程数据如果rdd被多次使用 可能重新构建此rdd 如果累加器累加代码存在重新构建的步骤中 累加器累加代码就可能被多次执行。 如何解决加缓存或者CheckPoint即可 5.3 综合案例 5.3.1 需求 对上面的数据执行 正常的单词进行单词计数特殊字符统计出现有多少个 特殊字符定义如下 abnormal_char [,,.,!,#,$,%]代码演示 # coding:utf8import jsonfrom pyspark.storagelevel import StorageLevelfrom pyspark import SparkConf, SparkContextimport time import reif __name__ __main__:conf SparkConf().setAppName(35_demo.py).setMaster(local[*])sc SparkContext(confconf)# 1.读取数据文件file_rdd sc.textFile(../data/input/accumulator_broadcast_data.txt)# 特殊字符的List定义abnormal_char [,, ., !, #, $, %]# 2. 特殊字符list 包装成广播变量broadcast sc.broadcast(abnormal_char)# 3.对特殊字符出现次数做累加累加使用累加器最好acmlt sc.accumulator(0)# 4.数据处理先处理数据的空行,在Python中有内容就是True None就是Falselines_rdd file_rdd.filter(lambda line: line.strip())# 5.去除前后的空格data_rdd lines_rdd.map(lambda line: line.strip())# 6.对数据进行切分按照正则表达式切分因为空格分隔符某些单词之间是两个或多个空格# 正则表达式 \s 表示 不确定多少个空格最少一个空格words_rdd data_rdd.flatMap(lambda line: re.split(\s, line))# 7. 当前words_rdd中有正常单词也有特殊符号# 现在需要过滤数据保留正常单词用于单词计数在过滤的过程中 对特殊符号做计数def filter_func(data):global acmlt# 取出广播变量中存储的特殊符号lsitabnormal_chars broadcast.valueif data in abnormal_chars:# 表示这个是特殊字符acmlt 1return Falseelse:return Truenormal_words_rdd words_rdd.filter(filter_func)# 8. 正常单词的单词计数逻辑result_rdd normal_words_rdd.map(lambda x: (x, 1)). \reduceByKey(lambda a, b: a b)print(正常单词计数结果,result_rdd.collect())print(特殊字符数量,acmlt)输出结果 正常单词计数结果 [(hadoop, 3), (hive, 6), (hdfs, 2), (spark, 11), (mapreduce, 4), (sql, 2)] 特殊字符数量 85.4 总结 广播变量解决了什么问题 分布式集合RDD和本地集合进行关联使用的时候降低内存占用以及减少网络IO传输提高性能。 累加器解决了什么问题 分布式代码执行中进行全局累加 6.Spark内核调度重点理解 6.1 DAG 6.1.1 DAG Spark的核心是根据RDD来实现的Spark Scheduler则为Spark核心实现的重要一环其作用就是任务调度。Spark 的任务调度就是如何组织任务去处理RDD中每个分区的数据根据RDD的依赖关系构建DAG基于DAG划分Stage 将每个Stage中的任务发到指定节点运行。基于Spark的任务调度原理可以合理规划资源利用做到尽可能用最少的 资源高效地完成任务计算。 以词频统计WordCount程序为例DAG图 DAG有向无环图拓扑结构 有向有方向 无环没有闭环 DAG有方向没有形成闭环的一个执行流程图 比如 此图就是一个典型的DAG图。 有方向RDD1——RDD2——…——collect结束 无闭环以action(collect) 结束了没有形成闭环循环 作用标识代码的逻辑执行流程 6.1.2 Job和Action Action返回值不是RDD的算子 它的作用是一个触发开关会将action算子之前的一串rdd依赖执行起来 如图我们前面写的搜索引擎日志分析案例中前两个需求就是2个action就产生了2个DAG 结论 1个Action会产生1个DAG如果在代码中有3个Action就产生3个DAG 一个Action产生的一个DAG会在程序运行中产生一个JOB 所以1个ACTION 1个DAG 1个JOB 如果一个代码中写了3个Action那么这个代码运行起来产生3个JOB每个JOB有自己的DAG 一个代码运行起来在Spark中称之为Application 层级关系 1个Application中可以有多个JOB每一个JOB内含一个DAG同时每一个JOB都是由一个Action产生的。 6.1.3 DAG和分区 DAG是Spark代码的逻辑执行图这个DAG的最终作用是为了构建物理上的Spark详细执行计划而生。 所以由于Spark是分布式多分区的那么DAG和分区之间也是有关联的。 rdd1 sc.textFile() rdd2 rdd1.flatMap() rdd3 rdd2.map() rdd4 rdd3.reduceByKey() rdd4.action()假设全部RDD都是3个分区在执行 如图就得到了带有分区关系的DAG图 6.2 DAG的宽窄依赖和阶段划分 在Spark RDD前后之间的关系为 窄依赖宽依赖 窄依赖父RDD的一个分区全部将数据发送给子RDD的一个分区 宽依赖父RDD的一个分区将数据发送给子RDD的多个分区 宽依赖还有一个别名shuffle 6.2.1 窄依赖 6.2.2 宽依赖 6.2.3 阶段划分 对于Spark来说会根据DAG按照宽依赖划分不同的DAG阶段 划分依据从后向前遇到宽依赖就划分出一个阶段称之为stage 如图可以看到在DAG中基于宽依赖将DAG划分成了2个stage 在stage的内部一定都是窄依赖 6.3 内存迭代计算 如图基于带有分区的DAG以及阶段划分。可以从图中得到 逻辑上最优的task分配一个task是一个线程来具体执行 那么如上图task1中rdd1 rdd2 rdd3的迭代计算都是由一个task(线程完成)这一阶段的这一条线是纯内存计算。 如上图task1 task2 task3就形成了三个并行的 内存计算管道。 Spark默认受到全局并行度的限制除了个别算子有特殊分区情况大部分的算子都会遵循全局并行度的要求来规划自己的分区数。 如果全局并行度是3其实大部分算子分区都是3 注意Spark 我们一般推荐只设置全局并行度不要在算子上设置并行度。 除了一些排序算子外计算算子就让他默认开分区就可以了 6.3.1 面试题 面试题1Spark是怎么做内存计算的DAG的作用Stage阶段划分的作用 Spark会产生DAG图DAG图会基于分区和宽窄依赖关系划分阶段一个阶段的内部都是窄依赖窄依赖内如果形成前后1:1的分区对应关系就可以产生许多内存迭代计算的管道这些内存迭代计算的管道就是一个个具体的执行Task一个Task是一个具体的线程任务跑在一个线程内就是走内存计算了。 面试题2Spark为什么比MapReduce快 Spark的算子丰富MapReduce算子匮乏Map和ReduceMapReduce这个编程模型很难在一套MR中处理复杂的任务。很多复杂任务是需要写多个MapReduce进行串联。多个MR串联通过磁盘交互数据。Spark可以执行内存迭代算子之间形成DAG基于依赖划分阶段后在阶段内形成内存迭代管道。但是MapReduce的Map和Reduce之间的交互依旧是通过硬盘来交互的。 总结 编程模型上Spark占优算子够多算子交互上和计算行可以尽量多的内存计算而非磁盘迭代 6.4 Spark并行度 Spark的并行度在同一时间内有多少个task在同时运行 并行度并行能力的设置 比如设置并行度6其实就是要6个task并行在跑 在有了6个task并行的前提下rdd的分区就被规划成6个分区了。 6.4.1 如何设置并行度 可以在代码中配置文件中以及提交程序的客户端参数中设置 优先级从高到低 代码中客户端提交参数中配置文件中默认1但是不会全部以1来跑多数时候基于读取文件的分片数量来作为默认并行度 全局并行度配置的参数spark.default.parallelism 6.4.2 全局并行度-推荐 配置文件中 conf/spark-defaults.conf 中设置 spark.default.parallelism 100在客户端提交参数中 bin/spark-submit --conf spark.default.parallelism100在代码中设置 conf SparkConf() conf.set(spark.default.parallelism,100)全局并行度是推荐设置不要针对RDD改分区可能会影响内存迭代管道的构建或者会产生额外的shuffle 6.4.3 针对RDD的并行度设置-不推荐 只能在代码中写算子 repartition算子coalesce算子partitionBy算子 6.4.4 集群中如何规划并行度 结论设置为CPU总核心的2~10倍 比如集群可用CPU核心是100个我们建议并行度是200~1000 确保是CPU核心的整数倍即可最小是2倍最大一般10倍或者更高适量均可 为什么要设置最少2倍 CPU的一个核心同一时间只能干一件事。 所以在100个核心的情况下设置100个并行就能让CPU100%出力。 这种设置下如果task的压力不均衡某个task先执行完了就导致某个CPU核心空闲 所以我们将Task并行分配的数量变多比如100个并行同一时间只有100个在运行700个在等待 但是可以确保某个task运行完了后续有task补上不让cpu闲下来最大程度利用集群的资源。 规划并行度只看集群总CPU核数 6.5 Spark任务调度 Spark的任务由Driver进行调度这个工作包含 逻辑DAG产生分区DAG产生Task划分将Task分配给Executor并监控其工作 如图Spark程序的调度流程如图 Driver被构建出来构建SparkContext执行环境入口对象基于DAG SchedulerDAG调度器构建逻辑Task分配基于TaskScheduleTask调度器将逻辑Task分配到各个Executor上干活并监控它们。WorkerExecutor被TaskScheduler管理监控听从它们的指令干活并定期汇报进度。 12,3,4都是Driver的工作 5是Worker的工作 6.5.1 Drivcer内的两个组件 DAG调度器 工作内容将逻辑的DAG图进行处理最终得到逻辑上的Task划分 Task调度器 工作内容基于DAG Scheduler的产出来规划这些逻辑的task应该在哪些物理的executor上运行以及监控管理它们的运行。 6.6 拓展-Spark概念名词大全 6.6.1 Spark运行中的概念名词大全 层级关系梳理 一个Spark环境可以运行多个Application一个代码运行起来会成为一个ApplicationApplication内部可以有多个Job每个Job由一个Action产生并且每个Job有自己的DAG执行图一个Job的DAG图会基于宽窄依赖划分成不同的阶段不同阶段内基于分区数量形成多个并行的内存迭代管道每一个内存迭代管道形成一个TaskDAG调度器划分将Job内划分出具体的task任务一个Job被划分出来的task在逻辑上称之为这个job的taskset 6.7 SparkShuffle 6.7.1MR Shuffle回顾 首先回顾MapReduce框架中Shuffle过程整体流程图如下 6.7.2 简介 Spark在DAG调度阶段会将一个Job划分为多个Stage上游Stage做map工作下游Stage做reduce工作其本质上 还是MapReduce计算框架。Shuffle是连接map和reduce之间的桥梁它将map的输出对应到reduce输入中涉及 到序列化反序列化、跨节点网络IO以及磁盘读写IO等。 Spark的Shuffle分为Write和Read两个阶段分属于两个不同的Stage前者是Parent Stage的最后一步后者是 Child Stage的第一步。 执行Shuffle的主体是Stage中的并发任务这些任务分ShuffleMapTask和ResultTask两种ShuffleMapTask要进行 ShuffleResultTask负责返回计算结果一个Job中只有最后的Stage采用ResultTask其他的均为ShuffleMapTask 。如果要按照map端和reduce端来分析的话ShuffleMapTask可以即是map端任务又是reduce端任务因为 Spark中的Shuffle是可以串行的ResultTask则只能充当reduce端任务的角色。 Spark在1.1以前的版本一直是采用Hash Shuffle的实现的方式到1.1版本时参考Hadoop MapReduce的实现开始引 入Sort Shuffle在1.5版本时开始Tungsten钨丝计划引入UnSafe Shuffle优化内存及CPU的使用在1.6中将 Tungsten统一到Sort Shuffle中实现自我感知选择最佳Shuffle方式到的2.0版本Hash Shuffle已被删除所有 Shuffle方式全部统一到Sort Shuffle一个实现中。 在Spark的中负责shuffle过程的执行、计算和处理的组件主要就是ShuffleManager也即shuffle管理器。ShuffleManager随着 Spark的发展有两种实现的方式分别为HashShuffleManager和SortShuffleManager因此spark的Shuffle有Hash Shuffle和Sort Shuffle两种。 在Spark 1.2以前默认的shuffle计算引擎是HashShuffleManager。该ShuffleManager而HashShuffleManager有着一个非常严重 的弊端就是会产生大量的中间磁盘文件进而由大量的磁盘IO操作影响了性能。 因此在Spark 1.2以后的版本中默认的ShuffleManager改成了SortShuffleManager。SortShuffleManager相较于 HashShuffleManager来说有了一定的改进。主要就在于每个Task在进行shuffle操作时虽然也会产生较多的临时磁盘文件但 是最后会将所有的临时文件合并(merge)成一个磁盘文件因此每个Task就只有一个磁盘文件。在下一个stage的shuffle read task拉 取自己的数据时只要根据索引读取每个磁盘文件中的部分数据即可。 6.7.3 Sort Shuffle bypass机制 bypass运行机制的触发条件如下 shuffle map task数量小于spark.shuffle.sort.bypassMergeThreshold200参数的值。不是map combine聚合的shuffle算子(比如reduceByKey有map combie)。 bypass运行机制的触发条件如下 1)shuffle map task数量小于spark.shuffle.sort.bypassMergeThreshold200参数的值。 2)不是map combine聚合的shuffle算子(比如reduceByKey有map combie)。 此时task会为每个reduce端的task都创建一个临时磁盘文件并将数据按key进行hash然后根据key的hash值 将key写入对应的磁盘文件之中。当然写入磁盘文件时也是先写入内存缓冲缓冲写满之后再溢写到磁盘文件的 。最后同样会将所有临时磁盘文件都合并成一个磁盘文件并创建一个单独的索引文件。 该过程的磁盘写机制其实跟未经优化的HashShuffleManager是一模一样的因为都要创建数量惊人的磁盘文件 只是在最后会做一个磁盘文件的合并而已。因此少量的最终磁盘文件也让该机制相对未经优化的 HashShuffleManager来说shuffle read的性能会更好。 而该机制与普通SortShuffleManager运行机制的不同在于 第一磁盘写机制不同; 第二不会进行排序。也就是说启用该机制的最大好处在于shuffle write过程中不需要进行数据的排序操作 也就节省掉了这部分的性能开销 总结 SortShuffle也分为普通机制和bypass机制普通机制在内存数据结构(默认为5M)完成排序会产生2M个磁盘小文件。而当shuffle map task数量小于spark.shuffle.sort.bypassMergeThreshold参数的值。或者算子不是聚合类的 shuffle算子(比如reduceByKey)的时候会触发SortShuffle的bypass机制SortShuffle的bypass机制不会进行排序 极大的提高了其性能。 6.7.4 Shuflle的配置选项 Shuffle阶段划分 shuffle writemapper阶段上一个stage得到最后的结果写出 shuffle read reduce阶段下一个stage拉取上一个stage进行合并 spark 的shuffle调优主要是调整缓冲的大小拉取次数重试重试次数与等待时间内存比例分配是否进行排序操作等等 spark.shuffle.file.buffer 参数说明该参数用于设置shuffle write task的BufferedOutputStream的buffer缓冲大小默认是32K。将数据写到磁盘文件之前会先写入buffer缓冲中待缓冲写满之后才会溢写 到磁盘。 调优建议如果作业可用的内存资源较为充足的话可以适当增加这个参数的大小比如64k从而减少shuffle write过程中溢写磁盘文件的次数也就可以减少磁盘IO次数进而提升性 能。在实践中发现合理调节该参数性能会有1%~5%的提升。 spark.reducer.maxSizeInFlight 参数说明该参数用于设置shuffle read task的buffer缓冲大小而这个buffer缓冲决定了每次能够拉取多少数据。(默认48M) 调优建议如果作业可用的内存资源较为充足的话可以适当增加这个参数的大小比如96m从而减少拉取数据的次数也就可以减少网络传输的次数进而提升性能。在实践中发现 合理调节该参数性能会有1%~5%的提升。 spark.shuffle.io.maxRetries and spark.shuffle.io.retryWait spark.shuffle.io.maxRetries shuffle read task从shuffle write task所在节点拉取属于自己的数据时如果因为网络异常导致拉取失败是会自动进行重试的。该参数就代表了可以重试 的最大次数。默认是3次 spark.shuffle.io.retryWait该参数代表了每次重试拉取数据的等待间隔。默认为5s 调优建议一般的调优都是将重试次数调高不调整时间间隔。 spark.shuffle.memoryFraction 参数说明该参数代表了Executor内存中分配给shuffle read task进行聚合操作内存比例。 spark.shuffle.manager 参数说明该参数用于设置shufflemanager的类型默认为sort.Spark1.5x以后有三个可选项 Hashspark1.x版本的默认值HashShuffleManager Sortspark2.x版本的默认值普通机制当shuffle read task 的数量小于等于spark.shuffle.sort.bypassMergeThreshold参数自动开启bypass 机制 spark.shuffle.sort.bypassMergeThreshold 参数说明当ShuffleManager为SortShuffleManager时如果shuffle read task的数量小于这个阈值默认是200则shuffle write过程中不会进行排序操作。 调优建议当你使用SortShuffleManager时如果的确不需要排序操作那么建议将这个参数调大一些 6.8 总结 DAG是什么有什么用 DAG是有向无环图用以描述任务执行流程主要作用就是协助DAG调度器构建Task分配用以做任务管理 内存迭代、阶段划分 基于DAG的宽窄依赖划分阶段阶段内部都是窄依赖可以构建内存迭代的管道 DAG调度器是 构建Task分配以做任务管理
http://www.zqtcl.cn/news/949469/

相关文章:

  • 龙湖地产 网站建设高端上海网站设计公司
  • 触屏手机网站模板装修设计软件排名
  • 怎么做盗文网站郑州建设教育培训中心
  • 网站安全解决方案嵌入式软件工程师培训
  • 怎么做一种网站为别人宣传网站界面切片做程序
  • 麻涌网站建设河北网站建设联系方式
  • 建设银行官方网站打不开啊寮步仿做网站
  • 一个人可做几次网站备案峰峰网站建设
  • 怎么盗号网站怎么做北京高端网站设计外包公司
  • 著名的淘宝客网站wordpress博客内容预览
  • 成都网站seo公司甘肃网站建设推广
  • 做网站加班网站项目意义
  • 在虚拟机中如何做二级域名网站个人网站做哪种能赚钱
  • 贵州建设水利厅考试网站wordpress主查询翻页
  • 网站优化网络推广seo天津建设工程信息网几点更新
  • 兰州网站seo技术厂家比较实用的h5网页建设网站
  • 怎样让自己做的网站被百度收录动漫制作软件
  • 西安网站制作哪家公司好怎么向企业推销网站建设
  • 电子商务网站建设新闻深圳坂田网站设计公司有哪些
  • 上海电子商城网站制作wordpress循环该分类子分类
  • 茶山做网站教育网站建设计划书
  • 成品门户网站源码免费海外网络加速器免费
  • 企业网站怎么建设公司深圳企业招聘信息最新招聘信息
  • 天津网站经营性备案下载网站上的表格 怎么做
  • 胶州企业网站设计十大互联网营销公司
  • 视频解析wordpresswordpress 优化版本
  • 柳州网站建设哪家便宜广东省建设厅三库一平台
  • 云南城市建设官方网站wordpress和织梦哪个好
  • 国外企业招聘网站专门做外贸的网站有哪些
  • 陕西交通建设集团网站营销公司是什么意思