当前位置: 首页 > news >正文

广州网站设计 信科网络wordpress shortcode土豆 视频

广州网站设计 信科网络,wordpress shortcode土豆 视频,计算机信息网络系统,建设营销型网站的步骤Spark学习–spark算子介绍 1.基本概念 spark算子#xff1a;为了提供方便的数据处理和计算#xff0c;spark提供了一系列的算子来进行数据处理。一般算子分为action#xff08;执行算子#xff09;算子Transformation#xff08;懒执行#xff09;算子。2.Transformatio…Spark学习–spark算子介绍 1.基本概念 spark算子为了提供方便的数据处理和计算spark提供了一系列的算子来进行数据处理。一般算子分为action执行算子算子Transformation懒执行算子。2.Transformation算子基本介绍 简介transformation被称为懒执行算子如果没有action算子则代码是不会执行的一般分为 map算子map算子是将rdd中的数据一条一条传递给后面的函数将函数的返回值构建成一个新的rdd。map算子是不会生成shuffle。后面的分区数等于map算子的分区数。 object Demo2Map {def main(args: Array[String]): Unit {//saprk代码的入口val conf new SparkConf()conf.setMaster(local).setAppName(map)val sc new SparkContext(conf)/*** 构建rdd的方法* 1.读取文件* 2.基于scala的集合构建rdd ---- 用于测试**/val listRDD: RDD[Int] sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9),2)/*** map算子* 将rdd中的数据一条一条传递给后面的函数将函数的返回值构建成一个新的rdd* map 不会产生shufflemap之后的分区数等于map之前rdd的分区数**如果一个算子是一个新的rdd那么这个算子就是转换算子。*/val mapRDD: RDD[Int] listRDD.map{i i * 2}//一次遍历整个分区的数据,将每一个分区的数据传递给后面的函数函数需要返回一个迭代器再构建一个新的rdd。val mapPartitionRDD: RDD[Int] listRDD.mapPartitions {case iter: Iterator[Int] iter}val mapPartitionRDD2: RDD[Int] listRDD.mapPartitions {case iter: Iterator[Int] val iterator: Iterator[Int] iter.map(i i * 2)//最后一行作为返回值iterator}mapPartitionRDD2.foreach(println)mapPartitionRDD.foreach(println)val mapPartitionsWithIndexRDD: RDD[Int] listRDD.mapPartitionsWithIndex((index: Int, iter: Iterator[Int]) {println(smapPartitionsWithIndexRDD的分区为:$index)iter})mapPartitionsWithIndexRDD.foreach(println)} }flat算子:对RDD中的数据进行过滤通过返回true保留数据函数返回false过滤数据。转换算子懒执行 import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext}object Demo4Filter {def main(args: Array[String]): Unit {val conf new SparkConf()conf.setMaster(local).setAppName(filter)val sc new SparkContext(conf)val ListRDD: RDD[Int] sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 0), 2)/*** filter: 对RDD中的数据进行过滤通过返回true保留数据函数返回false过滤数据** filter: 转换算子懒执行*/val filterRDD: RDD[Int] ListRDD.filter(i {i % 2 1})filterRDD.foreach(println)} }flatmap算子将rdd的数据一条一条传递给后面的函数函数的返回值是一个集合最后将这个集合拆分出来构建成新的rdd import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.rdd.RDDobject Demo5Flatmap {def main(args: Array[String]): Unit {val conf new SparkConf()conf.setMaster(local).setAppName(filter)val sc new SparkContext(conf)val listRDD: RDD[String] sc.parallelize(List(java,spark,java,spark,scala,hadoop))/*** 将rdd的数据一条一条传递给后面的函数函数的返回值是一个集合* 最后将这个集合拆分出来构建成新的rdd*/val wordsRDD: RDD[String] listRDD.flatMap(line {val arr: Array[String] line.split(,)//返回值可以是一个数组listset map必须是scala中的集合arr.toList})wordsRDD.foreach(println)} }Sample算子:抽样,withReplacement:是否放回。fraction抽样比例。 package com.zjlimport org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext}object Demo6Sample {def main(args: Array[String]): Unit {val conf new SparkConf()conf.setMaster(local).setAppName(Demo6Sample)val sc new SparkContext(conf)val listRDD: RDD[Int] sc.parallelize(List(1, 2, 3, 4, 45, 6, 7, 8, 9, 0))val studentRDD: RDD[String] sc.textFile(D:\\data\\data\\ideadata\\idea-project\\big_data\\data\\students.txt)/*** sample:抽样。* withReplacement:是否放回。* fraction抽样比例。*/val sampleRDD: RDD[String] {studentRDD.sample(false, 0.1)}} }groupByKey算子:按照key进行分组必须是kv格式的才能用将同一个key的value放在迭代器中。相对比groupBy指定一个分组的罗列返回的RDD的value包含所有的列。shuffle过程中需要传输的数据量groupByKey要多性能差一点 package com.zjlimport org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext}object Demo8GroupByKey {def main(args: Array[String]): Unit {val conf: SparkConf {new SparkConf()}conf.setMaster(local).setAppName(groupByKey)val sc: SparkContext new SparkContext(conf)val linesRDD: RDD[String] sc.textFile(D:\\data\\data\\ideadata\\idea-project\\big_data\\spark\\data\\words.txt)val wordsRDD: RDD[String] linesRDD.flatMap(i i.split(,))val mapWordRDD: RDD[(String, Int)] wordsRDD.map(word (word, 1))/*** 按照key进行分组必须是kv格式的才能用将同一个key的value放在迭代器中*/val groupByKeyRDD: RDD[(String, Iterable[Int])] mapWordRDD.groupByKey()groupByKeyRDD.map({case(words:String, ints:Iterable[Int]) ints.sum})groupByKeyRDD.foreach(println)/*** groupBy:指定一个分组的罗列返回的RDD的value包含所有的列* shuffle过程中需要传输的数据量groupByKey要多性能差一点*/val groupByRDD: RDD[(String, Iterable[(String, Int)])] mapWordRDD.groupBy(kv kv._1)groupByRDD.foreach(println)}} reduceByKey算子按照key进行聚合计算会在map端进行预聚合只能做简单的聚合计算。 package com.zjlimport org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext}object Demo9ReduceByKey {def main(args: Array[String]): Unit {val conf new SparkConf().setAppName(reduceByKey).setMaster(local)val sc: SparkContext new SparkContext(conf)val linesRDD: RDD[String] sc.textFile(D:\\data\\data\\ideadata\\idea-project\\big_data\\spark\\data\\words.txt)val wordsRDD: RDD[String] linesRDD.flatMap(i i.split(,))val mapRDD: RDD[(String, Int)] wordsRDD.map(i (i, 1))/*** reduceByKey按照key进行聚合计算会在map端进行预聚合* 只能做简单的聚合计算*///统计单词数量val reducrByKeyRDD: RDD[(String, Int)] mapRDD.reduceByKey((x: Int, y: Int) x y)reducrByKeyRDD.foreach(println)}} union算子:合并两个rdd两个rdd的数据类型要一致但是只是代码层面的合并底层没有合并。这个属于并集如果取交集可以使用intersection算子。 import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext}object Demo10Union {def main(args: Array[String]): Unit {val conf: SparkConf new SparkConf()conf.setMaster(local).setAppName(union)val sc: SparkContext new SparkContext(conf)val rdd1: RDD[Int] sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9))val rdd2: RDD[Int] sc.parallelize(List( 4, 5, 6, 7, 8, 9,10))/*** union:合并两个rdd两个rdd的数据类型要一致* union只是代码层面的合并底层没有合并* union不会产生shuffle*/val unionRDD: RDD[Int] rdd1.union(rdd2)unionRDD.foreach(println)/*** distinctRDD去重会产生shuffle* distinct:会先在map端局部去重再到reduce端全局去重*/val distinctRDD: RDD[Int] unionRDD.distinct()distinctRDD.foreach(println)/*** 所有会产生shuffle的算子都可以指定分区数。反过来也成立。*//*** intersection:取两个rdd的交集*/val interRDD: RDD[Int] rdd1.intersection(rdd2)interRDD.foreach(println)} } join算子:inner join:通过rdd的key进行关联必须是kv格式的rddleft join:以左表为主如果右表没有数据就会补一个nullright join和left join相反full join:两边都可能没有关联上如果是没关联上补null import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext}object Demo11Join {def main(args: Array[String]): Unit {val conf: SparkConf new SparkConf()conf.setMaster(local).setAppName(join)val sc: SparkContext new SparkContext(conf)val nameRDD: RDD[(String, String)] sc.makeRDD(List((001, 张三), (002, 李四), (003, 王五)))val ageRDD: RDD[(String, String)] sc.makeRDD(List((001, 23), (002, 35), (004, 19)))/*** inner join:通过rdd的key进行关联必须是kv格式的rdd*/ // //关联之后处理数据方法1--下划线方法val innerJoinRDD: RDD[(String, (String, String))] nameRDD.join(ageRDD) // innerJoinRDD.map(i{ // val id: String i._1 // val name: String i._2._1 // val age: String i._2._1 // })//关联之后处理数据2--模式匹配val rdd1: RDD[(String, String, Int)] innerJoinRDD.map(i {case (id: String, (name: String, age: String)) (id, name, age)})rdd1.foreach(println)/*** left join:以左表为主如果右表没有数据就会补一个null* 数据中右表没有003所有会补一个null* Option[String]:没有值就是None* right join:和left join相反*/val leftRDD: RDD[(String, (String, Option[Int]))] nameRDD.leftOuterJoin(ageRDD)leftRDD.foreach(println)//整理数据val rdd2: RDD[(String, String, Int)] leftRDD.map({//匹配关联成功的数据case (id: String, (name: String, Some(age))) (id, name, age)//匹配未关联成功的数据case (id: String, (name: String, None)) (id, name, 0)})rdd2.foreach(println)/*** full join:两边都可能没有关联上如果是没关联上补null*/val fullRDD: RDD[(String, (Option[String], Option[Int]))] nameRDD.fullOuterJoin(ageRDD)//整理数据val rdd3: RDD[(String, String, Int)] fullRDD.map {case (id: String, (Some(name), Some(age))) (id, name, age)case (id: String, (None, Some(age))) (id, 0, age)case (id: String, (Some(name), None)) (id, name, 0)case (id: String, (None, None)) (id, 0, 0)}}} mapValue算子:只对value进行处理key不变 import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext}object Demo12MapValues {def main(args: Array[String]): Unit {val conf: SparkConf new SparkConf()conf.setMaster(local).setAppName(mapValues)val sc: SparkContext new SparkContext()//使用mapval ageRDD: RDD[(String, Int)] sc.makeRDD(List((001, 23), (002, 35), (004, 19)))val linesRDD: RDD[(String, Int)] ageRDD.map {case (id: String, age: Int) (id, age 1)}/***mapValue:只对value进行处理key不变*///使用mapValueval mapValuesRDD: RDD[(String, Int)] ageRDD.mapValues(v v 1)}} sort算子:指定一个排序的列默认是升序,ascending是控制排序方式。 package com.zjlimport org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext}object Demo13Sort {def main(args: Array[String]): Unit {val conf: SparkConf new SparkConf()conf.setMaster(local).setAppName(sort)val sc: SparkContext new SparkContext(conf)val studentsRDD: RDD[String] sc.textFile(D:\\data\\data\\ideadata\\idea-project\\big_data\\data\\students.txt)/*** sortBy:指定一个排序的列默认是升序* ascending控制排序方式*/val sortByRDD: RDD[String] studentsRDD.sortBy(student {val age: Int student.split(,)(2).toIntage},false)val ageRDD: RDD[(String, String)] sc.makeRDD(List((001, 23), (002, 35), (004, 19)))val dataRDD: RDD[Int] sc.makeRDD(List(1, 2, 3, 4, 5, 6))val kvRDD: RDD[(Int, Int)] dataRDD.map(i (i, 1))kvRDD.foreach(println)/*** 通过key排序默认升序*/val sortByKeyRDD: RDD[(Int, Int)] kvRDD.sortByKey()sortByKeyRDD.foreach(println)}} .AGG算子 import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext}object Demo14Agg {def main(args: Array[String]): Unit {val conf: SparkConf new SparkConf()conf.setMaster(local).setAppName(Agg)val sc: SparkContext new SparkContext(conf)val linesRDD: RDD[String] sc.textFile(D:\\data\\data\\ideadata\\idea-project\\big_data\\spark\\data\\words.txt)val wordsRDD: RDD[String] linesRDD.flatMap(i i.split(,))val mapRDD: RDD[(String, Int)] wordsRDD.map(i (i, 1))/*** reduceByKey:在map端进行预聚合聚合函数会应用在map端和reduce端聚合函数会应用在分区内的聚合和分区间的聚合*/val reduceByKeyRDD: RDD[(String, Int)] mapRDD.reduceByKey((x, y) x y)val aggRDD: RDD[(String, Int)] mapRDD.aggregateByKey(0)( //初始值(u: Int, i: Int) u i, //分区键的聚合函数map端的聚合函数(u1: Int, u2: Int) u1 u2 //分区间的聚合reduce的聚合函数)aggRDD.foreach(println)}} 求平均年龄案例使用aggregateByKey package com.zjlimport org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.rdd.RDDobject Demo15AggAvgAge {def main(args: Array[String]): Unit {val conf: SparkConf new SparkConf()conf.setMaster(local).setAppName(Agg)val sc: SparkContext new SparkContext(conf)val linesRDD: RDD[String] sc.textFile(D:\\data\\data\\ideadata\\idea-project\\big_data\\spark\\data\\students.txt)/*** 计算班级的平均年龄*///val studentsRDD: RDD[String] linesRDD.flatMap(i i.split(,))val classAndAge: RDD[(String,Double)] linesRDD.map(student {val split: Array[String] student.split(,)( split(4),split(2).toDouble)})classAndAge.foreach(println)/*** 使用groupbykey*/val groupByKeyRDD: RDD[(String, Iterable[Double])] classAndAge.groupByKey()val avgAgeRDD: RDD[(String,Double)] groupByKeyRDD.map({case (clazz: String, age: Iterable[Double]) val avgAge: Double age.sum / age.size(clazz, avgAge)})/*** 大数据计算中最耗时间的就是shuffleshuffle过程中数据是落地到磁盘中的。* aggregateByKey:会在map端做预聚合性能高* 1.初始值可以有多个* 2.map端的聚合函数* 3.reduce端的聚合函数*/val avgAge: RDD[(String, (Double, Int))] classAndAge.aggregateByKey((0.0, 0))((u:(Double,Int), age:Double) (u._1 age, u._2 1),//map端的聚合函数(u1:(Double,Int), u2:(Double,Int)) (u1._1 u2._1, u1._2 u2._2)//reduce端的聚合函数)avgAge.foreach(println)//计算平均年龄val avgAgeMapRDD: RDD[(String, Double)] avgAge.map({case (clazz: String, (totalAge: Double, sumPerpon: Int)) (clazz, totalAge / sumPerpon)})avgAgeMapRDD.foreach(println)while(true){}} }cartesian算子笛卡尔积很少使用。 package com.zjlimport org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext}object Demo16Cartesian {def main(args: Array[String]): Unit {val conf: SparkConf new SparkConf()conf.setMaster(local).setAppName(Agg)val sc: SparkContext new SparkContext(conf)val nameRDD: RDD[(String, String)] sc.makeRDD(List((001, 张三), (002, 李四), (003, 王五)))val ageRDD: RDD[(String, String)] sc.makeRDD(List((001, 23), (002, 35), (004, 19)))/*** 笛卡尔积*/val cartesianRDD: RDD[((String, String), (String, String))] nameRDD.cartesian(ageRDD)}} reduce算子全局聚合是个action算子。 package com.zjlimport org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext}object Demo17Reduce {def main(args: Array[String]): Unit {val conf: SparkConf new SparkConf()conf.setMaster(local).setAppName(Agg)val sc: SparkContext new SparkContext(conf)val LinesRDD: RDD[Int] sc.makeRDD(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 0))/*** sum:求和只能用于int或者double或者null类型的求和action算子*/val sumRDD: Double LinesRDD.sum()/*** reduce:全局聚合action算子* reduceByKey:通过key进行聚合*/val reducrRDD: Int LinesRDD.reduce((x, y) (x y))}} take算子取top值是一个action算子。如果是取第一条数据使用first。 package com.zjlimport org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.rdd.RDDobject Demo18Take {def main(args: Array[String]): Unit {val conf: SparkConf new SparkConf()conf.setMaster(local).setAppName(Agg)val sc: SparkContext new SparkContext(conf)val linesRDD: RDD[String] sc.textFile(D:\\data\\data\\ideadata\\idea-project\\big_data\\spark\\data\\words.txt)/*** take:取top是一个action算子*/val top100: Array[String] linesRDD.take(100)//获取第一条数据val first: String linesRDD.first()}} 17.案例 统计总分大于年级平均分的学生 package com.zjlimport org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.rdd.RDDobject Demo19Student1 {def main(args: Array[String]): Unit {val conf: SparkConf new SparkConf()conf.setMaster(local).setAppName(Agg)val sc: SparkContext new SparkContext(conf)val linesRDD: RDD[String] sc.textFile(D:\\data\\data\\ideadata\\idea-project\\big_data\\spark\\data\\score.txt)/*** 统计总分大于年级平均分的学生*///1、计算学生的总分val total: RDD[(String, Double)] linesRDD.map(student {val splitRDD: Array[String] student.split(,)(splitRDD(0), splitRDD(2).toDouble)})total.foreach(println)val totalScore: RDD[(String, Double)] total.reduceByKey((x, y) (x y))totalScore.foreach(println)val totalAllRDD: RDD[Double] totalScore.map(kv kv._2)val avgScore: Double totalAllRDD.sum() / totalAllRDD.count()//取出总分大于平均分val endRDD: RDD[(String, Double)] totalScore.filter {case (id: String, score: Double) score avgScore}}} 3.action算子基本介绍 action算子在Spark中action 算子是一类触发 Spark 作业执行的操作。action 算子会导致计算结果被返回到驱动程序或者将计算结果保存到外部存储系统。与 transformation 算子不同action 算子会触发 Spark 作业的执行而不仅仅是定义计算逻辑。foreach遍历rddcount统计rdd的行数sum求和collect将rdd转换成scala的集合 object Demo7Action { //spark代码的入口def main(args: Array[String]): Unit {/*** spark任务的层级关系* application --- job --- stages ---task*/val conf: SparkConf {new SparkConf()}conf.setMaster(local).setAppName(action)val sc: SparkContext new SparkContext(conf)/*** action 算子 --触发任务执行每一个action算子都会触发一个job任务* 1、foreach遍历rdd* 2、saveAsTextFile保存数据* 3、count统计rdd的行数* 4、sum求和* 5、collect将rdd转换成scala的集合*/val studentRDD: RDD[String] sc.textFile(D:\\data\\data\\ideadata\\idea-project\\big_data\\spark\\data\\students.txt)//一次遍历一个数据studentRDD.foreach(println)//一次遍历一个分区studentRDD.foreachPartition((iter:Iterator[String]) println(iter.toList))//保存数据/*** saveAsTextFile将数据保存到hdfs中* 1、输出的目录不能存在* 2、rdd一个分区对应一个文件*/studentRDD.saveAsTextFile(D:\\data\\data\\ideadata\\idea-project\\big_data\\spark\\data)//统计行数val count: Long studentRDD.count()println(sstudentRDD的行数$count)//将rdd的数据拉取到内存中如果数据量很大会出现内存溢出val studentArr: Array[String] studentRDD.collect()}}
http://www.zqtcl.cn/news/70650/

相关文章:

  • 网站ui设计基础创业先做网站
  • 深圳网站建设的公司招聘现在的报税网站怎么做更正申报
  • 网站定制公司排行榜网站域名做链接怎么做
  • 网站建设廴金手指花总壹柒单页网站cms
  • 酒店建设网站的优势有哪些上海正规网站制作价格
  • 怀柔石家庄网站建设有哪些程序做的网站
  • 网站 方案黄骅市原来叫什么名字
  • 永仁县工程建设信息网站天津网站建设制作设计
  • 推广自己的网站需要怎么做网上建设银行网站首页
  • 免费个人网站怎么建立区总工会网站建设流程
  • 外贸机械网站杭州软装公司排名前十强
  • 网站拉圈圈接口怎么做西安短视频代运营
  • 网站源代码下载工具php网站开发培训
  • 吉林有做网站的吗域名解析错误怎么解决
  • 做外贸主要在那些网站找单凡科网可以免费做网站吗
  • 自助手机网站建站软件深圳通公司网站
  • 用dw做的网站怎么上线网站如何转做app
  • 怎么用自己电脑做服务器发布网站吗中国icp备案网站
  • 建设工程平台网站网站系统设计方案
  • 网站建设的开题报告如何做微信朋友圈网站
  • 网站开发使用的技术有哪些找国外人做网站
  • 可以免费发帖的网站织梦 网站首页
  • 百度云做.net网站做cpc不做网站可以吗
  • flash网站制作教程怎么找到网站的空间服务商
  • 抚州建设网站邢台网站制作哪家好
  • 网站后台怎么上传文章wordpress4.7.4漏洞
  • 简介网站建设流程百度搜不到网站
  • 西餐甜点网站建设做网站需要什么技术人员
  • 英孚做网络作业的网站深圳龙岗区地图全图
  • 大型网站开发教程wordpress连接oss