当前位置: 首页 > news >正文

qq免费建网站织梦网站如何上传

qq免费建网站,织梦网站如何上传,微信小程序页面模板,郑州核酸vip服务转自#xff1a; https://juejin.im/post/6844903826953076750 1.RDD 概述 1.1 什么是 RDD ? RDD(Resilient Distributed Dataset) 叫着 弹性分布式数据集 #xff0c;是Spark 中最基本的抽象#xff0c;它代表一个不可变、可分区、里面元素可以并行计算的集合。 RDD …转自  https://juejin.im/post/6844903826953076750  1.RDD 概述 1.1 什么是 RDD ? RDD(Resilient Distributed Dataset) 叫着 弹性分布式数据集 是Spark 中最基本的抽象它代表一个不可变、可分区、里面元素可以并行计算的集合。 RDD 具有数据流模型特点自动容错、位置感知性调度和可伸缩。 RDD 允许用户在执行多个查询时显示地将工作集缓存在内存中后续的查询能够重用工作集这将会极大的提升查询的效率。 我们可以认为 RDD 就是一个代理我们操作这个代理就像操作本地集合一样不需去关心任务调度、容错等问题。 1.2 RDD 的属性 在 RDD 源码中这样来描述 RDD * - A list of partitions * - A function for computing each split * - A list of dependencies on other RDDs * - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned) * - Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)复制代码 一组分片Partition即数据集的基本组成单位。 对于RDD来说每个分片都会被一个计算任务处理并决定并行计算的粒度。用户可以在创建RDD 的时候指定RDD的分片个数如果没有指定那么就会采用默认值。默认值就是程序所分配到的CPU Cores 的数目对于RDD来说每个分片都会被一个计算任务处理并决定并行计算的粒度。用户可以在创建RDD 的时候指定RDD的分片个数如果没有指定那么就会采用默认值。默认值就是程序所分配到的CPU Cores 的数目RDD 之间互相存在依赖关系。 RDD 的每次转换都会生成一个新的 RDD ,所以 RDD 之前就会形成类似于流水线一样的前后依赖关系。在部分分区数据丢失时Spark 可以通过这个依赖关系重新计算丢失部分的分区数据而不是对 RDD 的所有分区进行重新计算。一个Partitioner 即 RDD 的分片函数 。当前Spark 中实现了两种类型的分片函数一个是基于哈希的 HashPartitioner 另外一个是基于范围的 RangePartitioner。只有对于key-value的RDD ,才会有 Partitioner,非 key-value 的RDD 的 Partitioner 的值是None。Partitioner 函数不但决定了RDD 本身的分片数量也决定了 Parent RDD Shuffle 输出时的分片数量。一个列表存储存取每个Partition 的优先位置preferred location。 对于一个HDFS 文件来说这个列表保存的就是每个 Partition 所在的块位置。安装“移动数据不如移动计算”的理念Spark 在进行任务调度的时候会尽可能地将计算任务分配到其所要处理数据块的存储位置。 2 创建 RDD 2.1 由一个存在的 Scala 集合进行创建 #通过并行化scala集合创建RDD一般在测试的时候使用 scala var rdd sc.parallelize(List(1,2,3,4,5,6,7,8,9)) rdd: org.apache.spark.rdd.RDD[Int] ParallelCollectionRDD[0] at parallelize at console:24 复制代码 2.2 由外部的存储系统的数据集创建包括本地的文件系统还有所有 Hadoop 支持的数据集比如 HDFS、Cassandra、Hbase var rdd1 sc.textFile(/root/words.txt) var rdd2 sc.textFile(hdfs:192.168.80.131:9000/words.text) 复制代码 2.3 调用一个已经存在了的RDD 的 Transformation会生成一个新的 RDD。 3 RDD 的编程 API 3.1 Transformation 这种 RDD 中的所有转换都是延迟加载的也就是说他们并不会直接就计算结果。相反的他们只是记住这些应用到基础数据集例如一个文件上的转换动作。只有当发生一个返回结果的 Driver 的动作时这些操作才会真正的运行。这种设计会让Spark 更加有效率的运行。 常用的 Transformation 操作 转换含义map(func)返回一个新的RDD该RDD由每一个输入元素经过func函数转换后组成filter(func)返回一个新的RDD该RDD由经过func函数计算后返回值为true的输入元素组成flatMap(func)类似于map但是每一个输入元素可以被映射为0或多个输出元素所以func应该返回一个序列而不是单一元素mapPartitions(func)类似于map但独立地在RDD的每一个分片上运行因此在类型为T的RDD上运行时func的函数类型必须是Iterator[T] Iterator[U]mapPartitionsWithIndex(func)类似于mapPartitions但func带有一个整数参数表示分片的索引值因此在类型为T的RDD上运行时func的函数类型必须是(Int, Interator[T]) Iterator[U]sample(withReplacement, fraction, seed)根据fraction指定的比例对数据进行采样可以选择是否使用随机数进行替换seed用于指定随机数生成器种子union(otherDataset)对源RDD和参数RDD求并集后返回一个新的RDDintersection(otherDataset)对源RDD和参数RDD求交集后返回一个新的RDDdistinct([numTasks]))对源RDD进行去重后返回一个新的RDDgroupByKey([numTasks])在一个(K,V)的RDD上调用返回一个(K, Iterator[V])的RDDreduceByKey(func, [numTasks])在一个(K,V)的RDD上调用返回一个(K,V)的RDD使用指定的reduce函数将相同key的值聚合到一起与groupByKey类似reduce任务的个数可以通过第二个可选的参数来设置aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])先按分区聚合 再总的聚合 每次要跟初始值交流 例如aggregateByKey(0)(,) 对k/y的RDD进行操作sortByKey([ascending], [numTasks])在一个(K,V)的RDD上调用K必须实现Ordered接口返回一个按照key进行排序的(K,V)的RDDsortBy(func,[ascending], [numTasks])与sortByKey类似但是更灵活join(otherDataset, [numTasks])在类型为(K,V)和(K,W)的RDD上调用返回一个相同key对应的所有元素对在一起的(K,(V,W))的RDDcogroup(otherDataset, [numTasks])在类型为(K,V)和(K,W)的RDD上调用返回一个(K,(Iterable,Iterable))类型的RDDcartesian(otherDataset)笛卡尔积pipe(command, [envVars])调用外部程序coalesce(numPartitions)重新分区 第一个参数是要分多少区第二个参数是否shuffle 默认false 少分区变多分区 true 多分区变少分区 falserepartition(numPartitions)重新分区 必须shuffle 参数是要分多少区 少变多repartitionAndSortWithinPartitions(partitioner)重新分区排序 比先分区再排序效率高 对K/V的RDD进行操作 3.2 Action 触发代码的运行操作我们一个Spark 应用至少需要一个 Action 操作。 动作含义reduce(func)通过func函数聚集RDD中的所有元素这个功能必须是课交换且可并联的collect()在驱动程序中以数组的形式返回数据集的所有元素count()返回RDD的元素个数first()返回RDD的第一个元素类似于take(1)take(n)返回一个由数据集的前n个元素组成的数组takeSample(withReplacement,num, [seed])返回一个数组该数组由从数据集中随机采样的num个元素组成可以选择是否用随机数替换不足的部分seed用于指定随机数生成器种子takeOrdered(n, [ordering]) saveAsTextFile(path)将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统对于每个元素Spark将会调用toString方法将它装换为文件中的文本saveAsSequenceFile(path)将数据集中的元素以Hadoop sequencefile的格式保存到指定的目录下可以使HDFS或者其他Hadoop支持的文件系统。saveAsObjectFile(path) countByKey()针对(K,V)类型的RDD返回一个(K,Int)的map表示每一个key对应的元素个数。foreach(func)在数据集的每一个元素上运行函数func进行更新。foreachPartition(func)在每个分区上运行函数 func 3.3 Spark WordCount 代码示例 执行流程图 pom.xml 依赖 !-- 导入scala的依赖 -- dependencygroupIdorg.scala-lang/groupIdartifactIdscala-library/artifactIdversion2.2.0/version /dependency !-- 导入spark的依赖 -- dependencygroupIdorg.apache.spark/groupIdartifactIdspark-core_2.11/artifactIdversion2.2.0/version /dependency!-- 指定hadoop-client API的版本 -- dependencygroupIdorg.apache.hadoop/groupIdartifactIdhadoop-client/artifactIdversion2.6.0/version /dependency 复制代码 scala 版本代码实现 package com.zhouq.sparkimport org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext}/*** scala 版本实现 wc**/ object ScalaWordCount {def main(args: Array[String]): Unit {//这行代码是因为我在windows 上直接跑需要去读取 hadoop 上的文件设置我的用户名。如果是linux 环境可以不设置。视情况而定System.setProperty(HADOOP_USER_NAME, root)//创建spark 配置,设置应用程序名字 // val conf new SparkConf().setAppName(scalaWordCount)val conf new SparkConf().setAppName(scalaWordCount).setMaster(local[4])// conf.set(spark.testing.memory,102457600)//创建spark 执行的入口val sc new SparkContext(conf)//指定以后从哪里读取数据创建RDD (弹性分布式数据集)//取到一行数据val lines: RDD[String] sc.textFile(args(0))//切分压平val words: RDD[String] lines.flatMap(_.split( ))//按单词和一组合val wordAndOne: RDD[(String, Int)] words.map((_, 1))//按key 进行聚合val reduced: RDD[(String, Int)] wordAndOne.reduceByKey(_ _)// 排序 false 表示倒序val sorted reduced.sortBy(_._2, false)//将结果保存到hdfs中sorted.saveAsTextFile(args(1))//释放资源sc.stop()} } 复制代码 Java7 版本 package com.zhouq.spark;import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.PairFunction; import scala.Tuple2; import java.util.Arrays; import java.util.Iterator;/** * Java 版WordCount */ public class JavaWordCount {public static void main(String[] args) {SparkConf conf new SparkConf().setAppName(JavaWordCount);//创建SparkContextJavaSparkContext jsc new JavaSparkContext(conf);//指定读取数据的位置JavaRDDString lines jsc.textFile(args[0]);//切分压平JavaRDDString words lines.flatMap(new FlatMapFunctionString, String() {Overridepublic IteratorString call(String line) throws Exception{return Arrays.asList(line.split( )).iterator();}});//将单词进行组合 (a,1),(b,1),(c,1),(a,1)JavaPairRDDString, Integer wordAndOne words.mapToPair(new PairFunctionString, String, Integer() {Overridepublic Tuple2String, Integer call(String tp) throws Exception {return new Tuple2(tp, 1);}});//先交换再排序,因为 只有groupByKey 方法JavaPairRDDInteger, String swaped wordAndOne.mapToPair(new PairFunctionTuple2String, Integer, Integer, String() {Overridepublic Tuple2Integer, String call(Tuple2String, Integer tp) throws Exception { // return new Tuple2(tp._2, tp._1);return tp.swap();}});//排序JavaPairRDDInteger, String sorted swaped.sortByKey(false);//再次交换顺序JavaPairRDDString, Integer result sorted.mapToPair(new PairFunctionTuple2Integer, String, String, Integer() {Overridepublic Tuple2String, Integer call(Tuple2Integer, String tp) throws Exception {return tp.swap();}});//输出到hdfsresult.saveAsTextFile(args[1]);jsc.stop();} } 复制代码 Java8 版本 package com.zhouq.spark;import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.PairFunction; import scala.Tuple2; import java.util.Arrays;/** * Java Lambda 表达式版本的 WordCount */ public class JavaLambdaWordCount {public static void main(String[] args) {SparkConf conf new SparkConf().setAppName(JavaWordCount);//创建SparkContextJavaSparkContext jsc new JavaSparkContext(conf);//指定读取数据的位置JavaRDDString lines jsc.textFile(args[0]);//切分压平 // lines.flatMap(line - Arrays.asList(line.split( )).iterator());JavaRDDString words lines.flatMap((FlatMapFunctionString, String) line - Arrays.asList(line.split( )).iterator());//将单词进行组合 (a,1),(b,1),(c,1),(a,1) // words.mapToPair(tp - new Tuple2(tp,1));JavaPairRDDString, Integer wordAndOne words.mapToPair((PairFunctionString, String, Integer) tp - new Tuple2(tp, 1));//先交换再排序,因为 只有groupByKey 方法 // swaped.mapToPair(tp - tp.swap());JavaPairRDDInteger, String swaped wordAndOne.mapToPair((PairFunctionTuple2String, Integer, Integer, String) tp - { // return new Tuple2(tp._2, tp._1);return tp.swap();});//排序JavaPairRDDInteger, String sorted swaped.sortByKey(false);//再次交换顺序 // sorted.mapToPair(tp - tp.swap());JavaPairRDDString, Integer result sorted.mapToPair((PairFunctionTuple2Integer, String, String, Integer) tp - tp.swap());//输出到hdfsresult.saveAsTextFile(args[1]);jsc.stop();} } 复制代码 4 RDD 的依赖关系 RDD 和它依赖的 父 RDD(可能有多个) 的关系有两种不同的类型即 窄依赖narrow dependency和宽依赖wide dependency。 窄依赖窄依赖指的是每一个父 RDD 的 Partition 最多被子 RDD 的一个分区使用。可以比喻为独生子女。 宽依赖宽依赖是多个字 RDD 的Partition 会依赖同一个父 RDD 的 Partition 5 RDD 的持久化 5.1 RDD 的 cache(持久化) Spark中最重要的功能之一是跨操作在内存中持久化或缓存数据集。当您持久保存RDD时每个节点都会存储它在内存中计算的任何分区并在该数据集或从中派生的数据集的其他操作中重用它们。这使得未来的行动更快通常超过10倍。缓存是迭代算法和快速交互使用的关键工具。 您可以使用persist()或cache()方法标记要保留的RDD 。第一次在动作中计算它它将保留在节点的内存中。Spark的缓存是容错的 - 如果丢失了RDD的任何分区它将使用最初创建它的转换自动重新计算。 5.2 什么时候我们需要持久化 要求的计算速度快集群的资源要足够大重要: cache 的数据会多次触发Action建议先进行数据过滤,然后将缩小范围后的数据再cache 到内存中. 5.3 如何使用 使用 rdd.persist()或者rdd.cache() val lines: RDD[String] sc.textFile(hdfs://xxx/user/accrss) //使用cache 方法来缓存数据到内存 val cache lines.cache() //注意查看下面两次count 的时间 cached.count cached.count复制代码 5.4 数据缓存的存储级别 StorageLevel 我们在 StorageLevel.scala 源码中可以看到 val NONE new StorageLevel(false, false, false, false) val DISK_ONLY new StorageLevel(true, false, false, false) val DISK_ONLY_2 new StorageLevel(true, false, false, false, 2) val MEMORY_ONLY new StorageLevel(false, true, false, true) val MEMORY_ONLY_2 new StorageLevel(false, true, false, true, 2) val MEMORY_ONLY_SER new StorageLevel(false, true, false, false) val MEMORY_ONLY_SER_2 new StorageLevel(false, true, false, false, 2) val MEMORY_AND_DISK new StorageLevel(true, true, false, true) val MEMORY_AND_DISK_2 new StorageLevel(true, true, false, true, 2) val MEMORY_AND_DISK_SER new StorageLevel(true, true, false, false) val MEMORY_AND_DISK_SER_2 new StorageLevel(true, true, false, false, 2) val OFF_HEAP new StorageLevel(true, true, true, false, 1) 复制代码 解释一下各个参数的意思 第一个参数表示: 放到磁盘 第二个参数表示: 放到内存 第三个参数表示: 磁盘中的数据是否以Java 对象的方式保存,true 表示是, false表示以序列化的方式存放 第四个参数表示: 内存中的数据是否以Java 对象的方式保存,true 表示是, false表示以序列化的方式存放 第五个参数表示: 存放几份数据(目的是为了怕executor 出现故障导致分区数据丢失,当重新分配任务时,去另外的机器读取备份数据进行重新计算) OFF_HEAP : 堆外内存,以序列化的格式存储RDD到Tachyon(一个分布式内存存储系统)中 5.5 如何选择存储级别 Spark的多个存储级别意味着在内存利用率和cpu利用效率间的不同权衡。我们推荐通过下面的过程选择一个合适的存储级别 如果你的RDD适合默认的存储级别MEMORY_ONLY就选择默认的存储级别。因为这是cpu利用率最高的选项会使RDD上的操作尽可能的快。如果不适合用默认的级别选择MEMORY_ONLY_SER。选择一个更快的序列化库提高对象的空间使用率但是仍能够相当快的访问。除非函数计算RDD的花费较大或者它们需要过滤大量的数据不要将RDD存储到磁盘上否则重复计算一个分区就会和重磁盘上读取数据一样慢。如果你希望更快的错误恢复可以利用重复(replicated)存储级别。所有的存储级别都可以通过重复计算丢失的数据来支持完整的容错但是重复的数据能够使你在RDD上继续运行任务而不需要重复计算丢失的数据。在拥有大量内存的环境中或者多应用程序的环境中OFF_HEAP具有如下优势 它运行多个执行者共享Tachyon中相同的内存池它显著地减少垃圾回收的花费如果单个的执行者崩溃缓存的数据不会丢失 5.6 删除 cache Spark自动的监控每个节点缓存的使用情况利用最近最少使用原则删除老旧的数据。如果你想手动的删除RDD可以使用 RDD.unpersist()方法 5.7 RDD 的 checkpoint机制 我们除了把数据缓存到内存中,还可以把数据缓存到HDFS 中,保证中间数据不丢失. 什么时候我们需要做chechpoint 做复杂的迭代计算,要求保证数据安全,不丢失对速度要求不高(跟 cache 到内存进行对比)将中间结果保存到 hdfs 中 怎么做 checkpoint 首先设置 checkpoint 目录然后再执行计算逻辑再执行 checkpoint() 操作。 下面代码使用cache 和 checkpoint 两种方式实现计算每门课最受欢迎老师的 topN package com.zhouq.sparkimport java.net.URL import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext}/*** 求每门课程最受欢迎老师TopN --2* -- 使用cache* -- 使用checkpoint 一般设置hdfs 目录*/ object GroupFavTeacher2_cache_checkpoint {def main(args: Array[String]): Unit {//前 Nval topN args(1).toInt//学科集合val subjects Array(bigdata, javaee, php)val conf new SparkConf().setAppName(FavTeacher).setMaster(local[4])//创建spark 执行入口val sc new SparkContext(conf)//checkpoint 得先设置 sc 的checkpoint 的dir // sc.setCheckpointDir(hdfs://hdfs://hadoop1:8020/user/root/ck20190215)//指定读取数据val lines: RDD[String] sc.textFile(args(0))val subjectTeacherAndOne: RDD[((String, String), Int)] lines.map(line {val index line.lastIndexOf(/)var teacher line.substring(index 1)var httpHost line.substring(0, index)var subject new URL(httpHost).getHost.split([.])(0)((subject, teacher), 1)})//将学科,老师联合当做keyval reduced: RDD[((String, String), Int)] subjectTeacherAndOne.reduceByKey(_ _)//第一种使用cache RDD 把数据缓存在内存中.标记为cache 的RDD 以后被反复使用,才使用cacheval cached: RDD[((String, String), Int)] reduced.cache()//第二种 使用checkpoint,得先设置 sc 的 checkpointDir // val cached: RDD[((String, String), Int)] reduced.checkpoint()/*** 先对学科进行过滤,然后再进行排序,调用RDD 的sortBy进行排序,避免scala 的排序当数据量大时,内存不足的情况.* take 是Action 操作,每次take 都会进行一次任务提交,具体查看日志打印情况*/for (sub - subjects) {//过滤出当前的学科val filtered: RDD[((String, String), Int)] cached.filter(_._1._1 sub)//使用RDD 的 sortBy ,内存磁盘排序,避免scala 中的排序因内存不足导致异常情况.//take 是Action 的,所以每次循环都会触发一次提交任务,祥见日志打印情况val favTeacher: Array[((String, String), Int)] filtered.sortBy(_._2, false).take(topN)println(favTeacher.toBuffer)}/*** 前面cache的数据已经计算完了后面还有很多其他的指标要计算* 后面计算的指标也要触发很多次Action最好将数据缓存到内存* 原来的数据占用着内存把原来的数据释放掉才能缓存新的数据*///把原来缓存的数据释放掉cached.unpersist(true)sc.stop()} } 复制代码 6 DAG 的生成 DAG(Directed Acyclic Graph)叫做有向无环图原始的RDD通过一系列的转换就就形成了DAG根据RDD之间的依赖关系的不同将DAG划分成不同的Stage对于窄依赖partition的转换处理在Stage中完成计算。对于宽依赖由于有Shuffle的存在只能在parent RDD处理完成后才能开始接下来的计算因此宽依赖是划分Stage的依据。 作者乔二爷 链接https://juejin.im/post/6844903826953076750 来源掘金 著作权归作者所有。商业转载请联系作者获得授权非商业转载请注明出处。
http://www.zqtcl.cn/news/96240/

相关文章:

  • 物流营销型网站案例分析渭南专业做网站
  • 织梦音乐网站接推广任务的平台
  • 网站建设设计团队平面设计主要做什么ui
  • 站长工具seo综合查询广告和京东一样的网站
  • 柳州做网站的企业做黑彩网站
  • 商城网站开发那家好网站建设知识平台
  • 莱州网站定制flash网站cms
  • 经营范围里的网站建设直播系统程序
  • 58同城类似的网站开发wordpress 地方生活
  • wordpress 七牛ossseo系统
  • 郑州做网站 熊掌号太原今天最新通知
  • 文章网站如何与压力做足球比赛直播间在线观看
  • 越秀网站建设优化呼和浩特住房和城乡建设部网站
  • 河南省路桥建设集团网站建网站公司郑州
  • 海沧做网站深圳外贸招聘
  • 网站建设置顶多少钱翻译成英文
  • 柳州正规网站制作公司哪家好怎么学好网站建设
  • 德宏做网站网站的设计思路范文
  • 自己的电脑做网站服务器深圳福田有什么好玩的地方
  • 奕腾网站建设上海十大装修公司排名榜单
  • 简述建设一个网站的基本步骤wordpress欢迎新会员
  • 国外医疗网站模板wordpress主题 科技
  • 海淀企业型网站建设wordpress自定义帖子链接
  • 自己的网站怎么优化做网页的
  • dw设计一个简单网站网页微信版文件传输
  • 网站地图怎么做XML宁波网站建设服务提供商
  • 中石化两学一做网站获取网站域名
  • 吉林长春火车站官网湖北葛洲坝建设工程网站
  • 重庆网站推广服务广告公司女员工深夜兼职
  • 网站的要素是什么wordpress框架解密_day3