当前位置: 首页 > news >正文

美发企业网站模板南开网站建设

美发企业网站模板,南开网站建设,东莞公司官网推广,做美足网站违法吗1 Transformation转换算子 1.1 Value类型 1#xff09;创建包名#xff1a;com.shangjack.value 1.1.1 map()映射 参数f是一个函数可以写作匿名子类#xff0c;它可以接收一个参数。当某个RDD执行map方法时#xff0c;会遍历该RDD中的每一个数据项#xff0c;并依次应用f函… 1 Transformation转换算子 1.1 Value类型 1创建包名com.shangjack.value 1.1.1 map()映射 参数f是一个函数可以写作匿名子类它可以接收一个参数。当某个RDD执行map方法时会遍历该RDD中的每一个数据项并依次应用f函数从而产生一个新的RDD。即这个新RDD中的每一个元素都是原来RDD中每一个元素依次应用f函数而得到的。 1具体实现 package com.shangjack.value; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; public class Test01_Map {     public static void main(String[] args) {         // 1.创建配置对象         SparkConf conf  new SparkConf().setMaster(local[*]).setAppName(sparkCore);         // 2. 创建sparkContext         JavaSparkContext sc  new JavaSparkContext(conf);         // 3. 编写代码         JavaRDDString lineRDD  sc.textFile(input/1.txt);         // 需求:每行结尾拼接||         // 两种写法  lambda表达式写法(匿名函数)          JavaRDDString mapRDD  lineRDD.map(s - s  ||);         // 匿名函数写法          JavaRDDString mapRDD1  lineRDD.map(new FunctionString, String() {             Override             public String call(String v1) throws Exception {                 return v1  ||;             }         });         for (String s : mapRDD.collect()) {             System.out.println(s);         }         // 输出数据的函数写法         mapRDD1.collect().forEach(a - System.out.println(a));         mapRDD1.collect().forEach(System.out::println);         // 4. 关闭sc         sc.stop();     } } 1.1.2 flatMap()扁平化 1功能说明 与map操作类似将RDD中的每一个元素通过应用f函数依次转换为新的元素并封装到RDD中。 区别在flatMap操作中f函数的返回值是一个集合并且会将每一个该集合中的元素拆分出来放到新的RDD中。 2需求说明创建一个集合集合里面存储的还是子集合把所有子集合中数据取出放入到一个大的集合中。 4具体实现 package com.shangjack.value; import org.apache.commons.collections.ListUtils; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.FlatMapFunction; import java.util.ArrayList; import java.util.Arrays; import java.util.Iterator; import java.util.List; public class Test02_FlatMap {     public static void main(String[] args) {         // 1.创建配置对象         SparkConf conf  new SparkConf().setMaster(local[*]).setAppName(sparkCore);         // 2. 创建sparkContext         JavaSparkContext sc  new JavaSparkContext(conf);         // 3. 编写代码         ArrayListListString arrayLists  new ArrayList();         arrayLists.add(Arrays.asList(1,2,3));         arrayLists.add(Arrays.asList(4,5,6));         JavaRDDListString listJavaRDD  sc.parallelize(arrayLists,2);         // 对于集合嵌套的RDD 可以将元素打散         // 泛型为打散之后的元素类型         JavaRDDString stringJavaRDD  listJavaRDD.flatMap(new FlatMapFunctionListString, String() {             Override             public IteratorString call(ListString strings) throws Exception {                 return strings.iterator();             }         });         stringJavaRDD. collect().forEach(System.out::println);         // 通常情况下需要自己将元素转换为集合         JavaRDDString lineRDD  sc.textFile(input/2.txt);         JavaRDDString stringJavaRDD1  lineRDD.flatMap(new FlatMapFunctionString, String() {             Override             public IteratorString call(String s) throws Exception {                 String[] s1  s.split( );                 return Arrays.asList(s1).iterator();             }         });         stringJavaRDD1. collect().forEach(System.out::println);         // 4. 关闭sc         sc.stop();     } } 1.1.3 groupBy()分组 1功能说明分组按照传入函数的返回值进行分组。将相同的key对应的值放入一个迭代器。 2需求说明创建一个RDD按照元素模以2的值进行分组。 3具体实现 package com.shangjack.value; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import java.util.Arrays; public class Test03_GroupBy {     public static void main(String[] args) {         // 1.创建配置对象         SparkConf conf  new SparkConf().setMaster(local[*]).setAppName(sparkCore);         // 2. 创建sparkContext         JavaSparkContext sc  new JavaSparkContext(conf);         // 3. 编写代码         JavaRDDInteger integerJavaRDD  sc.parallelize(Arrays.asList(1, 2, 3, 4),2);         // 泛型为分组标记的类型         JavaPairRDDInteger, IterableInteger groupByRDD  integerJavaRDD.groupBy(new FunctionInteger, Integer() {             Override             public Integer call(Integer v1) throws Exception {                 return v1 % 2;             }         });         groupByRDD.collect().forEach(System.out::println);         // 类型可以任意修改         JavaPairRDDBoolean, IterableInteger groupByRDD1  integerJavaRDD.groupBy(new FunctionInteger, Boolean() {             Override             public Boolean call(Integer v1) throws Exception {                 return v1 % 2  0;             }         });         groupByRDD1. collect().forEach(System.out::println); Thread.sleep(600000);         // 4. 关闭sc         sc.stop();     } } groupBy会存在shuffle过程shuffle将不同的分区数据进行打乱重组的过程shuffle一定会落盘。可以在local模式下执行程序通过4040看效果。 1.1.4 filter()过滤 1功能说明 接收一个返回值为布尔类型的函数作为参数。当某个RDD调用filter方法时会对该RDD中每一个元素应用f函数如果返回值类型为true则该元素会被添加到新的RDD中。 2需求说明创建一个RDD过滤出对2取余等于0的数据 3代码实现 package com.shangjack.value; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import java.util.Arrays; public class Test04_Filter {     public static void main(String[] args) {         // 1.创建配置对象         SparkConf conf  new SparkConf().setMaster(local[*]).setAppName(sparkCore);         // 2. 创建sparkContext         JavaSparkContext sc  new JavaSparkContext(conf);         // 3. 编写代码         JavaRDDInteger integerJavaRDD  sc.parallelize(Arrays.asList(1, 2, 3, 4),2);         JavaRDDInteger filterRDD  integerJavaRDD.filter(new FunctionInteger, Boolean() {             Override             public Boolean call(Integer v1) throws Exception {                 return v1 % 2  0;             }         });         filterRDD. collect().forEach(System.out::println);         // 4. 关闭sc         sc.stop();     } } 1.1.5 distinct()去重 1功能说明对内部的元素去重并将去重后的元素放到新的RDD中。 2代码实现 package com.shangjack.value; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import java.util.Arrays; public class Test05_Distinct {     public static void main(String[] args) {         // 1.创建配置对象         SparkConf conf  new SparkConf().setMaster(local[*]).setAppName(sparkCore);         // 2. 创建sparkContext         JavaSparkContext sc  new JavaSparkContext(conf);         // 3. 编写代码         JavaRDDInteger integerJavaRDD  sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6), 2);         // 底层使用分布式分组去重  所有速度比较慢,但是不会OOM         JavaRDDInteger distinct  integerJavaRDD.distinct();         distinct. collect().forEach(System.out::println);         // 4. 关闭sc         sc.stop();     } } 注意distinct会存在shuffle过程。 1.1.6 sortBy()排序 1功能说明 该操作用于排序数据。在排序之前可以将数据通过f函数进行处理之后按照f函数处理的结果进行排序默认为正序排列。排序后新产生的RDD的分区数与原RDD的分区数一致。Spark的排序结果是全局有序。 2需求说明创建一个RDD按照数字大小分别实现正序和倒序排序 3代码实现 package com.shangjack.value; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import java.util.Arrays; public class Test6_SortBy {     public static void main(String[] args) {         // 1.创建配置对象         SparkConf conf  new SparkConf().setMaster(local[*]).setAppName(sparkCore);         // 2. 创建sparkContext         JavaSparkContext sc  new JavaSparkContext(conf);         // 3. 编写代码         JavaRDDInteger integerJavaRDD  sc.parallelize(Arrays.asList(5, 8, 1, 11, 20), 2);         // (1)泛型为以谁作为标准排序  (2) true为正序  (3) 排序之后的分区个数         JavaRDDInteger sortByRDD  integerJavaRDD.sortBy(new FunctionInteger, Integer() {             Override             public Integer call(Integer v1) throws Exception {                 return v1;             }         }, true, 2);         sortByRDD. collect().forEach(System.out::println);         // 4. 关闭sc         sc.stop();     } } 1.2 Key-Value类型 1创建包名com.shangjack.keyvalue 要想使用Key-Value类型的算子首先需要使用特定的方法转换为PairRDD package com.shangjack.keyValue; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.PairFunction; import scala.Tuple2; import java.util.Arrays; public class Test01_pairRDD{     public static void main(String[] args) {         // 1.创建配置对象         SparkConf conf  new SparkConf().setMaster(local[*]).setAppName(sparkCore);         // 2. 创建sparkContext         JavaSparkContext sc  new JavaSparkContext(conf);         // 3. 编写代码         JavaRDDInteger integerJavaRDD  sc.parallelize(Arrays.asList(1, 2, 3, 4),2);         JavaPairRDDInteger, Integer pairRDD  integerJavaRDD.mapToPair(new PairFunctionInteger, Integer, Integer() {             Override             public Tuple2Integer, Integer call(Integer integer) throws Exception {                 return new Tuple2(integer, integer);             }         });         pairRDD. collect().forEach(System.out::println);         // 4. 关闭sc         sc.stop();     } } 1.2.1 mapValues()只对V进行操作 1功能说明针对于(K,V)形式的类型只对V进行操作 2需求说明创建一个pairRDD并将value添加字符串||| 4代码实现 package com.shangjack.keyValue; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import scala.Tuple2; import java.util.Arrays; public class Test02_MapValues {     public static void main(String[] args) {         // 1.创建配置对象         SparkConf conf  new SparkConf().setMaster(local[*]).setAppName(sparkCore);         // 2. 创建sparkContext         JavaSparkContext sc  new JavaSparkContext(conf);         // 3. 编写代码         JavaPairRDDString, String javaPairRDD  sc.parallelizePairs(Arrays.asList(new Tuple2(k, v), new Tuple2(k1, v1), new Tuple2(k2, v2)));         // 只修改value 不修改key         JavaPairRDDString, String mapValuesRDD  javaPairRDD.mapValues(new FunctionString, String() {             Override             public String call(String v1) throws Exception {                 return v1  |||;             }         });         mapValuesRDD. collect().forEach(System.out::println);         // 4. 关闭sc         sc.stop();     } } 1.2.2 groupByKey()按照K重新分组 1功能说明 groupByKey对每个key进行操作但只生成一个seq并不进行聚合。 该操作可以指定分区器或者分区数默认使用HashPartitioner 2需求说明统计单词出现次数 4代码实现 package com.shangjack.keyValue; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.PairFunction; import scala.Tuple2; import java.util.Arrays; public class Test03_GroupByKey {     public static void main(String[] args) {         // 1.创建配置对象         SparkConf conf  new SparkConf().setMaster(local[*]).setAppName(sparkCore);         // 2. 创建sparkContext         JavaSparkContext sc  new JavaSparkContext(conf);         // 3. 编写代码         JavaRDDString integerJavaRDD  sc.parallelize(Arrays.asList(hi,hi,hello,spark ),2);         // 统计单词出现次数         JavaPairRDDString, Integer pairRDD  integerJavaRDD.mapToPair(new PairFunctionString, String, Integer() {             Override             public Tuple2String, Integer call(String s) throws Exception {                 return new Tuple2(s, 1);             }         });         // 聚合相同的key         JavaPairRDDString, IterableInteger groupByKeyRDD  pairRDD.groupByKey();         // 合并值         JavaPairRDDString, Integer result  groupByKeyRDD.mapValues(new FunctionIterableInteger, Integer() {             Override             public Integer call(IterableInteger v1) throws Exception {                 Integer sum  0;                 for (Integer integer : v1) {                     sum  integer;                 }                 return sum;             }         });         result. collect().forEach(System.out::println);         // 4. 关闭sc         sc.stop();     } }} 1.2.3 reduceByKey()按照K聚合V 1功能说明该操作可以将RDD[K,V]中的元素按照相同的K对V进行聚合。其存在多种重载形式还可以设置新RDD的分区数。 2需求说明统计单词出现次数 3代码实现 package com.shangjack.keyValue; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; import scala.Tuple2; import java.util.Arrays; public class Test04_ReduceByKey {     public static void main(String[] args) {         // 1.创建配置对象         SparkConf conf  new SparkConf().setMaster(local[*]).setAppName(sparkCore);         // 2. 创建sparkContext         JavaSparkContext sc  new JavaSparkContext(conf);         // 3. 编写代码         JavaRDDString integerJavaRDD  sc.parallelize(Arrays.asList(hi,hi,hello,spark ),2);         // 统计单词出现次数         JavaPairRDDString, Integer pairRDD  integerJavaRDD.mapToPair(new PairFunctionString, String, Integer() {             Override             public Tuple2String, Integer call(String s) throws Exception {                 return new Tuple2(s, 1);             }         });         // 聚合相同的key         JavaPairRDDString, Integer result  pairRDD.reduceByKey(new Function2Integer, Integer, Integer() {             Override             public Integer call(Integer v1, Integer v2) throws Exception {                 return v1  v2;             }         });         result. collect().forEach(System.out::println);         // 4. 关闭sc         sc.stop();     } } 1.2.4 reduceByKey和groupByKey区别 1reduceByKey按照key进行聚合在shuffle之前有combine预聚合操作返回结果是RDD[K,V]。 2groupByKey按照key进行分组直接进行shuffle。 3开发指导在不影响业务逻辑的前提下优先选用reduceByKey。求和操作不影响业务逻辑求平均值影响业务逻辑。影响业务逻辑时建议先对数据类型进行转换再合并。 package com.shangjack.keyValue; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; import scala.Tuple2; import java.util.Arrays; public class Test06_ReduceByKeyAvg {     public static void main(String[] args) throws InterruptedException {         // 1.创建配置对象         SparkConf conf  new SparkConf().setMaster(local[*]).setAppName(sparkCore);         // 2. 创建sparkContext         JavaSparkContext sc  new JavaSparkContext(conf);         // 3. 编写代码         JavaPairRDDString, Integer javaPairRDD  sc.parallelizePairs(Arrays.asList(new Tuple2(hi, 96), new Tuple2(hi, 97), new Tuple2(hello, 95), new Tuple2(hello, 195)));         // (hi,(96,1))         JavaPairRDDString, Tuple2Integer, Integer tuple2JavaPairRDD  javaPairRDD.mapValues(new FunctionInteger, Tuple2Integer, Integer() {             Override             public Tuple2Integer, Integer call(Integer v1) throws Exception {                 return new Tuple2(v1, 1);             }         });         // 聚合RDD         JavaPairRDDString, Tuple2Integer, Integer reduceRDD  tuple2JavaPairRDD.reduceByKey(new Function2Tuple2Integer, Integer, Tuple2Integer, Integer, Tuple2Integer, Integer() {             Override             public Tuple2Integer, Integer call(Tuple2Integer, Integer v1, Tuple2Integer, Integer v2) throws Exception {                 return new Tuple2(v1._1  v2._1, v1._2  v2._2);             }         });         // 相除         JavaPairRDDString, Double result  reduceRDD.mapValues(new FunctionTuple2Integer, Integer, Double() {             Override             public Double call(Tuple2Integer, Integer v1) throws Exception {                 return (new Double(v1._1) / v1._2);             }         });         result. collect().forEach(System.out::println);         // 4. 关闭sc         sc.stop();     } } 1.2.5 sortByKey()按照K进行排序 1功能说明 在一个(K,V)的RDD上调用K必须实现Ordered接口返回一个按照key进行排序的(K,V)的RDD。 2需求说明创建一个pairRDD按照key的正序和倒序进行排序 3代码实现 package com.shangjack.keyValue; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaSparkContext; import scala.Tuple2; import java.util.Arrays; public class Test05_SortByKey {     public static void main(String[] args) {         // 1.创建配置对象         SparkConf conf  new SparkConf().setMaster(local[*]).setAppName(sparkCore);         // 2. 创建sparkContext         JavaSparkContext sc  new JavaSparkContext(conf);         // 3. 编写代码         JavaPairRDDInteger, String javaPairRDD  sc.parallelizePairs(Arrays.asList(new Tuple2(4, a), new Tuple2(3, c), new Tuple2(2, d)));         // 填写布尔类型选择正序倒序         JavaPairRDDInteger, String pairRDD  javaPairRDD.sortByKey(false);         pairRDD. collect().forEach(System.out::println);         // 4. 关闭sc         sc.stop();     } } 2 Action行动算子 行动算子是触发了整个作业的执行。因为转换算子都是懒加载并不会立即执行。 1创建包名com.shangjack.action 2.1 collect()以数组的形式返回数据集 1功能说明在驱动程序中以数组Array的形式返回数据集的所有元素。 注意所有的数据都会被拉取到Driver端慎用。 2需求说明创建一个RDD并将RDD内容收集到Driver端打印 package com.shangjack.action; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import java.util.Arrays; import java.util.List; public class Test01_Collect {     public static void main(String[] args) {         // 1.创建配置对象         SparkConf conf  new SparkConf().setMaster(local[*]).setAppName(sparkCore);         // 2. 创建sparkContext         JavaSparkContext sc  new JavaSparkContext(conf);         // 3. 编写代码         JavaRDDInteger integerJavaRDD  sc.parallelize(Arrays.asList(1, 2, 3, 4),2);         ListInteger collect  integerJavaRDD.collect();         for (Integer integer : collect) {             System.out.println(integer);         }         // 4. 关闭sc         sc.stop();     } } 2.2 count()返回RDD中元素个数 1功能说明返回RDD中元素的个数 3需求说明创建一个RDD统计该RDD的条数 package com.shangjack.action; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import java.util.Arrays; public class Test02_Count {     public static void main(String[] args) {         // 1.创建配置对象         SparkConf conf  new SparkConf().setMaster(local[*]).setAppName(sparkCore);         // 2. 创建sparkContext         JavaSparkContext sc  new JavaSparkContext(conf);         // 3. 编写代码         JavaRDDInteger integerJavaRDD  sc.parallelize(Arrays.asList(1, 2, 3, 4),2);         long count  integerJavaRDD.count();         System.out.println(count);         // 4. 关闭sc         sc.stop();     } } 2.3 first()返回RDD中的第一个元素 1功能说明返回RDD中的第一个元素 2需求说明创建一个RDD返回该RDD中的第一个元素 package com.shangjack.action; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import java.util.Arrays; public class Test03_First {     public static void main(String[] args) {         // 1.创建配置对象         SparkConf conf  new SparkConf().setMaster(local[*]).setAppName(sparkCore);         // 2. 创建sparkContext         JavaSparkContext sc  new JavaSparkContext(conf);         // 3. 编写代码         JavaRDDInteger integerJavaRDD  sc.parallelize(Arrays.asList(1, 2, 3, 4),2);         Integer first  integerJavaRDD.first();         System.out.println(first);         // 4. 关闭sc         sc.stop();     } } 2.4 take()返回由RDD前n个元素组成的数组 1功能说明返回一个由RDD的前n个元素组成的数组 2需求说明创建一个RDD取出前两个元素 package com.shangjack.action; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import java.util.Arrays; import java.util.List; public class Test04_Take {     public static void main(String[] args) {         // 1.创建配置对象         SparkConf conf  new SparkConf().setMaster(local[*]).setAppName(sparkCore);         // 2. 创建sparkContext         JavaSparkContext sc  new JavaSparkContext(conf);         // 3. 编写代码         JavaRDDInteger integerJavaRDD  sc.parallelize(Arrays.asList(1, 2, 3, 4),2);         ListInteger list  integerJavaRDD.take(3);         list.forEach(System.out::println);         // 4. 关闭sc         sc.stop();     } } 2.5 countByKey()统计每种key的个数 1功能说明统计每种key的个数 2需求说明创建一个PairRDD统计每种key的个数 package com.shangjack.action; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaSparkContext; import scala.Tuple2; import java.util.Arrays; import java.util.Map; public class Test05_CountByKey {     public static void main(String[] args) {         // 1.创建配置对象         SparkConf conf  new SparkConf().setMaster(local[*]).setAppName(sparkCore);         // 2. 创建sparkContext         JavaSparkContext sc  new JavaSparkContext(conf);         // 3. 编写代码         JavaPairRDDString, Integer pairRDD  sc.parallelizePairs(Arrays.asList(new Tuple2(a, 8), new Tuple2(b, 8), new Tuple2(a, 8), new Tuple2(d, 8)));         MapString, Long map  pairRDD.countByKey();         System.out.println(map);         // 4. 关闭sc         sc.stop();     } } 2.6 save相关算子 1saveAsTextFile(path)保存成Text文件 功能说明将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统对于每个元素Spark将会调用toString方法将它装换为文件中的文本 2saveAsObjectFile(path) 序列化成对象保存到文件 功能说明用于将RDD中的元素序列化成对象存储到文件中。 3代码实现 package com.shangjack.action; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import scala.Tuple2; import java.util.Arrays; public class Test06_Save {     public static void main(String[] args) {         // 1.创建配置对象         SparkConf conf  new SparkConf().setMaster(local[*]).setAppName(sparkCore);         // 2. 创建sparkContext         JavaSparkContext sc  new JavaSparkContext(conf);         // 3. 编写代码         JavaRDDInteger integerJavaRDD  sc.parallelize(Arrays.asList(1, 2, 3, 4),2);         integerJavaRDD.saveAsTextFile(output);         integerJavaRDD.saveAsObjectFile(output1);         // 4. 关闭sc         sc.stop();     } } 2.7 foreach()遍历RDD中每一个元素 2需求说明创建一个RDD对每个元素进行打印 package com.shangjack.action; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.VoidFunction; import java.util.Arrays; public class Test07_Foreach {     public static void main(String[] args) {         // 1.创建配置对象         SparkConf conf  new SparkConf().setMaster(local[*]).setAppName(sparkCore);         // 2. 创建sparkContext         JavaSparkContext sc  new JavaSparkContext(conf);         // 3. 编写代码         JavaRDDInteger integerJavaRDD  sc.parallelize(Arrays.asList(1, 2, 3, 4),4);         integerJavaRDD.foreach(new VoidFunctionInteger() {             Override             public void call(Integer integer) throws Exception {                 System.out.println(integer);             }         });         // 4. 关闭sc         sc.stop();     } } 2.8 foreachPartition ()遍历RDD中每一个分区 package com.shangjack.spark.action; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.VoidFunction; import java.util.Arrays; import java.util.Iterator; public class Test08_ForeachPartition {     public static void main(String[] args) {         // 1. 创建配置对象         SparkConf conf  new SparkConf().setAppName(core).setMaster(local[*]);         // 2. 创建sc环境         JavaSparkContext sc  new JavaSparkContext(conf);         // 3. 编写代码         JavaRDDInteger parallelize  sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6), 2);         // 多线程一起计算   分区间无序  单个分区有序         parallelize.foreachPartition(new VoidFunctionIteratorInteger() {             Override             public void call(IteratorInteger integerIterator) throws Exception {                 // 一次处理一个分区的数据                 while (integerIterator.hasNext()) {                     Integer next  integerIterator.next();                     System.out.println(next);                 }             }         });         // 4. 关闭sc         sc.stop();     } }
http://www.zqtcl.cn/news/99617/

相关文章:

  • 龙华建站公司seo研究中心怎么样
  • 网站的大图标怎么做项目网站
  • 南京网站设计机构wap网站设计方案
  • 建站点怎么做网站wordpress 重写规则
  • 泰州做网站优化服装网站建设方案ppt
  • wordpress怎么设计网站微商城科技
  • 昆山营销型网站建设旅游网页制作模板教程
  • 企业网站开发时间淘客网站开发源代码
  • 传奇世界新开服网站html静态网页模板代码
  • 门户网站app开发网络服务提供者发现未成年通过网络发布
  • 编辑网站在线注册系统行业网站制作
  • 国外建设网站的软件西宁设计网站建设
  • 云服务器网站配置在线设计免费logo
  • 怎么在手机上做企业网站北京大学两学一做网站
  • 社区网站建设方案书服务型网站建设的主题
  • 做淘推广的网站如何制作表白链接
  • 外贸网站代码中国建设银行招聘网站甘肃分行
  • 免费ai设计logo网站西安网站开发外包公司有
  • 2017优秀网站设计欣赏如何做建议的网站
  • 获取网站访问qq怎么做链接
  • 最简单的网站建设中英文自助网站建设
  • vps 做网站品牌网站建设可信大蝌蚪
  • 怎样在百度建网站怎么建设课题网站
  • 广西网站设计欣赏企业网站建设的管理制度
  • 网站建设与管理提纲免费编程教学视频
  • 做效果图的网站有哪些推广网站详细教程
  • 2.0网站线上建设什么意思WordPress怎么设置分类
  • 湖南众诚建设 官方网站开发者模式是干什么的
  • o2o平台都有哪些网站公司莱芜网站优化方案
  • 个人或主题网站建设 实验体会网站开发可退税