手机怎么做钓鱼网站,有没有做家居服设计师看的网站,自己的公网ip可以做网站,中国工商注册网官网入口Overview
从高层次来看#xff0c;每个 Spark 应用程序都包含一个driver program#xff0c;该程序运行用户的main方法并在集群上执行各种并行操作。
Spark 提供的主要抽象是 resilient distributed dataset#xff08;RDD)#xff0c;它是跨集群节点分区的元素集合…Overview
从高层次来看每个 Spark 应用程序都包含一个driver program该程序运行用户的main方法并在集群上执行各种并行操作。
Spark 提供的主要抽象是 resilient distributed datasetRDD)它是跨集群节点分区的元素集合可以并行操作。RDD 是通过从 Hadoop 文件系统中的文件开始创建的。用户还可以要求 Spark 将 RDD 持久保存在内存中从而使其能够在并行操作中高效地重用。最后RDD 会自动从节点故障中恢复。
Spark 中提供的第二抽象是 shared variables 他可以用在并行操作中。默认情况下当 Spark 将函数作为一组任务task在不同节点上并行运行时它会将函数中使用的每个变量的副本携带给每个任务。有时变量需要在任务之间共享或者在driver program和任务之间共享。Spark 支持两种类型的 shared variables一是 broadcast variables 可用于在所有节点的内存中缓存一个值二是 accumulators
他是仅“added”的变量例如counters和sums。
Resilient Distributed Datasets (RDDs)
Spark围绕 RDD 的概念展开RDD是可以并行操作的元素的容错集合。有两种方法可以创建RDD在driver program中并行化现有集合或者引用外部存储系统中的数据集例如共享文件系统、HDFS、HBase或任何提供Hadoop InputFormat的数据源。
Parallelized Collections
并行化集合是通过在驱动程序中的现有集合上调用JavaSparkContext的并行化方法创建的。集合的元素被复制以形成可以并行操作的分布式数据集。例如以下是如何创建一个包含数字1到5的并行化集合
ListInteger data Arrays.asList(1, 2, 3, 4, 5);
JavaRDDInteger distData sc.parallelize(data);一旦创建分布式数据集distData就可以并行操作。并行集合的一个重要参数是将数据集划分为的分区数量。 Spark 将为集群的每个分区partition运行一个任务task任务将分配给节点执行。可以手动指定分区数或使用默认值。
RDD Operations
RDD 支持两种类型的操作
transformations从现有dataset创建新dataset。例如map 是一种transformations它将每个dataset每个元素传递给函数并返回表示结果的新 RDDactions在对dataset运行计算后将值返回给driver program。例如reduce 是一个使用某个函数聚合 RDD 的所有元素并将最终结果返回给driver program的操作
Spark中的所有transformations都是 lazy 的因为它们不会立即计算结果。相反它们只记住应用于某些基本 dataset例如文件的transformations。只有当操作需要将结果返回给driver program时transformations 才会被计算。这样的设计使得Spark能够更高效地运行。例如我们可以意识到通过map创建的dataset将在reduce中使用并且只将reduce的结果返回给driver program而不是更大的映射dataset。
默认情况下每次对transform后的RDD运行操作时都可能会被重新计算。但是您也可以使用持久或缓存方法将RDD持久化在内存中在这种情况下Spark将保留集群中的元素以便在您下次查询时更快地访问它。还支持在磁盘上持久化RDD或跨多个节点复制。如下图所示如果不cache/persist 任何内容那么每次您需要输出时当您调用诸如“count”之类的操作时都会从磁盘读取数据并完成操作。您可以在读取后进行缓存然后所有其他操作都会跳过读取并从缓存的数据开始。 为了说明 RDD 基础知识请看下面的简单程序
JavaRDDString lines sc.textFile(data.txt);
JavaRDDInteger lineLengths lines.map(s - s.length());
int totalLength lineLengths.reduce((a, b) - a b);第一行定义了来自外部文件的基本RDD。该数据集没有加载到内存中也没有以其他方式对其进行操作行只是指向文件的指针。第二行将lineLengths定义为map transformation的结果。同样由于 lazylineLengths不会立即计算。最后我们运行 reduce这是一个操作。此时Spark将计算分解为在不同机器上运行的任务每台机器都运行其 map 和本地数据的 reduce只将其答案返回给driver program。
如果 lineLengths 可能被再次使用可以增加下面代码
lineLengths.persist(StorageLevel.MEMORY_ONLY());在reduce之前这会导致lineLengths在第一次计算后保存在内存中。
Understanding closures
关于Spark更难的事情之一是理解跨集群执行代码时变量和方法的范围和生命周期。修改超出其范围的变量的RDD操作可能是混淆的常见来源。在下面的示例中我们将查看使用foreach增加计数器的代码但其他操作也可能出现类似的问题。
Example
考虑下面简单的计算将RDD元素sum。根据是否在同一JVM中执行它的行为可能会有所不同。一个常见的例子是在本地模式下运行Spark–masterlocal[n]与将Spark应用程序部署到集群例如通过Spark提交到YARN
int counter 0;
JavaRDDInteger rdd sc.parallelize(data);// Wrong: Dont do this!!
rdd.foreach(x - counter x);println(Counter value: counter);Local vs. cluster modes
上述代码的行为是未定义的可能无法按预期工作。为了执行作业Spark将RDD操作的处理分解为任务每个任务都由执行器执行。在执行之前Spark计算任务的 closures。closures 是执行器在RDD上执行计算在本例中为foreach()时必须可见的变量和方法。此closures 被序列化并发送给每个执行器。
发送给每个excutor的closure中的变量现在是副本因此当在foreach函数中引用counter时它不再是driver program上的count。driver program的内存中仍然有一个counter但不再对excutor可见
在本地模式下在某些情况下foreach 函数实际上将在与driver program相同的 JVM 中执行并且将引用相同的原始counter并且可能会更新它。
为了确保在这些场景中定义良好的行为应该使用 Accumulator.。Spark中的 Accumulator专门用于提供一种机制用于在集群中跨worker node 执行时安全地更新变量.
一般来说closure——像循环或本地定义的方法这样的构造——不应该被用来改变一些全局状态。Spark不定义或保证对从闭包外部引用的对象的更改行为。执行此操作的一些代码可能在本地模式下工作但这只是偶然的这样的代码在分布式模式下不会按预期运行。如果需要一些全局聚合请使用Accumulator。
Printing elements of an RDD
另一个常见的习惯用法是尝试使用rdd. foreachprintln或rdd.mapprintln打印出RDD的元素。在单台机器上这将生成预期的输出并打印RDD的所有元素。但是在集群模式下excutor 调用的stdout的输出现在写入执行程序的stdout而不是driver program上的stdout因此driver program上的stdout不会显示这些要在驱动程序上打印所有元素可以使用 collect() 首先将RDD带到 driver program 节点因此使用rdd.collect().foreach(println).
Working with Key-Value Pairs
虽然大多数Spark操作适用于包含任何类型对象的RDD但一些特殊操作仅适用于键值对的RDD。最常见的是分布式“shuffle”操作例如通过键对元素进行分组或聚合reduceByKey和sortByKey等。
Shuffle operations
Spark中的某些操作会触发称为shuffle的事件。shuffle是Spark重新分配数据的机制以便在分区之间以不同的方式分组。这通常涉及跨executor和机器复制数据这使得shuffle成为一项复杂且成本高昂的操作。
为了理解在shuffle过程中会发生什么我们可以考虑一个例子这个例子中有一个reduceByKey 操作它生成一个新的RDD其中一个键的所有值都被组合成一个tuple这个tuple就是键和对与该键相关的所有值执行一个reduce函数的结果。挑战在于单个键的所有值不一定都位于同一分区甚至同一台机器上但它们必须位于同一位置才能计算结果。
对于大多数操作Spark不会自动地将数据重新分布到特定的节点或分区以满足特定操作的需要。相反每个任务通常只处理一个分区内的数据。然而对于像reduceByKey这样的操作Spark需要将具有相同键(key)的所有值(value)聚合在一起以进行计算。这意味着如果这些值分布在不同的分区中Spark必须执行一个全局的重组操作all-to-all operation这个过程被称为shuffle。在shuffle过程中Spark会执行以下步骤
读取所有分区的数据以找出每个键对应的所有值。将具有相同键的值跨分区传输到相同的节点以便可以对它们进行聚合。在每个节点上对每个键的所有值进行最终的聚合计算得到每个键的最终结果。
可能导致随机播放的操作包括repartition operations like repartition and coalesce、‘ByKey operations (except for counting) like groupByKey and reduceByKey, and join operations like cogroup and join.
Performance Impact
Shuffle是一项昂贵的操作因为它涉及磁盘I/O、数据序列化和网络I/O。为了组织shuffle的数据Spark生成一组任务——map 任务来组织数据以及一组reduce任务来聚合数据。这个术语来自MapReduce与Spark的map和reduce操作没有直接关系。
从内部来看单个map任务的结果保存在内存中直到内存放不下。然后根据目标分区对它们进行排序并写入单个文件。在reduce端任务读取相关的排序block
某些shuffle操作可能会消耗大量的堆内存因为它们使用内存中的数据结构在传输数据之前或之后组织数据。具体来说reduceByKey和aggregateByKey在map端创建这些结构而’ByKey操作在reduce端生成这些结构。当数据在内存放不下时spark会将这些数据spill到磁盘从而导致磁盘IO的额外开销和垃圾回收机制的增加。
Shuffle还会在磁盘上生成大量中间文件。从Spark 1.3开始这些文件将被保留直到相应的RDD不再使用并被垃圾收集掉。这样做是为了在重新计算lineage时不需要重新创建随机文件。垃圾收集可能只在很长一段时间后发生如果应用程序保留对这些RDD的引用或者如果GC不经常启动。这意味着长时间运行的Spark作业可能会消耗大量磁盘空间。临时存储目录在配置Spark上下文时由park. local.dir配置参数指定。
RDD Persistence
Spark中最重要的功能之一是跨操作在内存中持久化或缓存dataset。当您持久化RDD时每个节点都将其计算的任何分区存储在内存中并在该dataset或从该dataset派生的dataset的其他操作中重用它们。这使得未来的操作更快通常超过10倍。缓存是迭代算法和快速交互使用的关键工具。
您可以使用RDD上的persist() cache()方法将其标记为持久化。第一次在操作中计算时它将保存在节点的内存中。Spark的缓存是容错的——如果RDD的任何分区丢失它将使用最初创建它的转换自动重新计算。
此外每个持久化的RDD都可以使用不同的storage level,来存储例如允许您将数据集持久化在磁盘上将其持久化在内存中但作为序列化的Java对象以节省空间跨节点复制它。cache()方法是使用默认存储级别的简写即StorageLevel.MEMORY_ONLY在内存中存储反序列化的对象。
Spark还会在shuffle操作中自动持久化一些中间数据例如reduceByKey即使用户没有调用persist。这样做是为了避免在shuffle期间节点发生故障时重新计算整个input。如果用户计划重用新生成的RDD我们仍然建议他们在生成的RDD上调用persist。
Which Storage Level to Choose?
Spark的存储级别旨在在内存使用和CPU效率之间提供不同的权衡。我们建议通过以下过程来选择一个
如果您的RDD适合默认存储级别MEMORY_ONLY请保持这样。这是CPU效率最高的选项允许RDD上的操作尽可能快地运行。如果没有请尝试使用MEMORY_ONLY_SER并选择一个快速序列化库以使对象更加节省空间但访问速度仍然相当快。Java和Scala不要spill到磁盘除非计算数据集的函数很重或者它们过滤了大量数据。否则重新计算分区可能与从磁盘读取分区一样快。如果您想要快速故障恢复例如如果使用Spark处理来自Web应用程序的请求请使用replicated 的存储级别。所有存储级别都通过重新计算丢失的数据提供完全的容错能力但复制的存储级别允许您继续在RDD上运行任务而无需等待重新计算丢失的分区。
Removing Data
Spark会自动监视每个节点上的缓存使用情况并以最近最少使用LRU的方式删除旧的数据分区。如果您想手动删除RDD而不是等待它从缓存中删除请使用RDD.unpersist()方法。请注意此方法默认不阻塞。要在释放资源之前阻塞请在调用此方法时指定blockingtrue。
Shared Variables
通常当传递给Spark操作如map或reduce的函数在远程集群节点上执行时它会在函数中使用的所有变量的单独副本上工作。这些变量被复制到每台机器上并且远程机器上的变量的更新不会传播回driver program。跨任务支持通用的读写共享变量将是低效的。然而Spark确实为两种常见的使用模式提供了两种有限类型的共享变量broadcast variables and accumulators.
Broadcast Variables
广播变量允许程序员将只读变量缓存在每台机器上而不是将其副本与task一起发送。例如它们可以用来以有效的方式为每个节点提供大型输入数据集的副本减少了数据传输的开销从task粒度下降到节点粒度。Spark还尝试使用有效的广播算法来分发广播变量以降低通信成本。
Spark action通过一组stage执行由分布式“shuffle”操作分隔。Spark自动广播每个stage内task所需的公共数据。以这种方式广播的数据以序列化形式缓存并在运行每个任务之前进行反序列化。这意味着显式创建广播变量仅在跨多个stage的task需要相同数据或以反序列化形式缓存数据很重要时才有用。
广播变量是通过调用SparkContext.broadcast(v)从变量v创建的。广播变量是v的包装器可以通过调用value方法访问它的值。下面的代码显示了这一点
Broadcastint[] broadcastVar sc.broadcast(new int[] {1, 2, 3});broadcastVar.value();
// returns [1, 2, 3]创建广播变量后应该在集群上运行的任何函数中使用它而不是值v这样v就不会多次发送到节点。此外对象v在广播后不应该被修改以确保所有节点都获得相同的广播变量值例如如果变量稍后传送到新加入的节点。
要释放广播变量复制到执行器上的资源请调用.unpersist()。如果广播之后再次使用它将被重新广播。要永久释放广播变量使用的所有资源请调用.destroy()。之后广播变量就不能使用了。请注意这些方法默认情况下不会阻塞。要阻塞直到资源被释放请在调用它们时指定blockingtrue。
Accumulators
Accumulators是仅通过关联和交换运算“added”的变量因此可以有效地支持并行。它们可用于实现计数器如在 MapReduce 中或求和。 Spark 原生支持数字类型的累加器程序员可以添加对新类型的支持。
作为用户您可以创建命名或未命名的累加器。如下图所示修改该累加器的阶段将在Web UI中显示一个命名累加器在本例中为计数器。Spark在“任务”表中显示由任务修改的每个累加器的值。 然后在集群上运行的task可以使用add方法add到Accumulators。但是他们无法读取其值。只有driver program可以使用其value方法读取累加器的值。
LongAccumulator accum jsc.sc().longAccumulator();sc.parallelize(Arrays.asList(1, 2, 3, 4)).foreach(x - accum.add(x));
// ...
// 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 saccum.value();
// returns 10对于仅在action中执行的Accumulators更新Spark保证每个任务对Accumulators的更新只会应用一次即重新启动的任务不会更新值。在transformations中用户应该知道如果重新执行任务或作业阶段每个任务的更新可能会应用不止一次。
累加器不会改变 Spark 的惰性求值模型。如果它们是在 RDD 的操作中更新的则只有当 RDD 作为action的一部分进行计算时它们的值才会更新。因此在像 map() 这样的惰性转换中进行累加器更新时不能保证执行。下面的代码片段演示了这个属性
LongAccumulator accum jsc.sc().longAccumulator();
data.map(x - { accum.add(x); return f(x); });
// Here, accum is still 0 because no actions have caused the map to be computed.reference
https://spark.apache.org/docs/latest/rdd-programming-guide.html#resilient-distributed-datasets-rdds