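Looking at the source of the longAccumulator() method, it does val acc = new LongAccumulator and then calls register(acc) to register the accumulator with Spark. Ctrl + left-click into LongAccumulator and you can see that it extends AccumulatorV2[jl.Long, jl.Long]. A custom accumulator can be implemented by following LongAccumulator as a model.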
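To make the two steps described above concrete, here is a minimal sketch that sums a few numbers twice: once through the sc.longAccumulator helper and once by creating a LongAccumulator by hand and registering it. The object name LongAccumulatorSketch, the method run, and the accumulator names "viaHelper" and "manual" are made up for illustration; local mode is assumed.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.util.LongAccumulator

// Hypothetical demo object, not part of the original post.
object LongAccumulatorSketch {

  // Adds 1..4 to two accumulators and returns both totals.
  def run(sc: SparkContext): (Long, Long) = {
    // Helper path: creates a LongAccumulator and registers it in one call.
    val viaHelper = sc.longAccumulator("viaHelper")

    // Manual path: create the accumulator first, then register it.
    val manual = new LongAccumulator
    sc.register(manual, "manual")

    sc.makeRDD(List(1, 2, 3, 4), 2).foreach { x =>
      viaHelper.add(x)
      manual.add(x)
    }

    (viaHelper.sum, manual.sum)
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("LongAccumulatorSketch").setMaster("local")
    val sc = new SparkContext(conf)
    try println(run(sc)) finally sc.stop()
  }
}
```

Both totals should come out the same, which suggests the helper is only a shortcut for create plus register.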
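Implementation class
// 1. Extend the parent class AccumulatorV2[IN, OUT]: IN is the type of the values added on the Executors, OUT is the type of the result read back on the Driver
// 2. Implement the abstract methods
// 3. Create the accumulator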
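class WordAccumulator extends AccumulatorV2[String, util.ArrayList[String]] {
  val list = new util.ArrayList[String]()

  // Whether the accumulator is in its initial state (here: whether the list is empty)
  override def isZero: Boolean = list.isEmpty

  // Copy the accumulator object, including the words collected so far
  override def copy(): AccumulatorV2[String, util.ArrayList[String]] = {
    val acc = new WordAccumulator
    acc.list.addAll(list)
    acc
  }

  // Reset the accumulator object (clearing the list is enough here)
  override def reset(): Unit = list.clear()

  // Add data to the accumulator; only words containing "h" are kept
  override def add(v: String): Unit = {
    if (v.contains("h")) {
      list.add(v)
    }
  }

  // Merge accumulators: the results returned by different executors are combined here
  override def merge(other: AccumulatorV2[String, util.ArrayList[String]]): Unit = {
    list.addAll(other.value)
  }

  // Get the accumulator's result
  override def value: util.ArrayList[String] = list
}

Then the main function:

def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setAppName("CheckPoint").setMaster("local")
  // Create the context object
  val sc = new SparkContext(conf)
  val dataRDD: RDD[String] = sc.makeRDD(List("chun1", "chun2", "chun3", "chun4"), 2)
  // TODO Create the accumulator
  val wordAccumulator = new WordAccumulator()
  // TODO Register the accumulator
  sc.register(wordAccumulator)
  dataRDD.foreach {
    case word => {
      // TODO Run the accumulator's add operation
      wordAccumulator.add(word)
    }
  }
  // TODO Read the accumulated result
  println(wordAccumulator.value)
}

Result: [chun1, chun2, chun3, chun4]

Complete code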
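package date_9_23

import java.util

import org.apache.spark.rdd.RDD
import org.apache.spark.util.AccumulatorV2
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Custom accumulator
 */
object Spark4_LongAccumulator {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("CheckPoint").setMaster("local")
    // Create the context object
    val sc = new SparkContext(conf)
    val dataRDD: RDD[String] = sc.makeRDD(List("chun1", "chun2", "chun3", "chun4"), 2)
    // TODO Create the accumulator
    val wordAccumulator = new WordAccumulator()
    // TODO Register the accumulator
    sc.register(wordAccumulator)
    dataRDD.foreach {
      case word => {
        // TODO Run the accumulator's add operation
        wordAccumulator.add(word)
      }
    }
    // TODO Read the accumulated result
    println(wordAccumulator.value)
  }
}

// Declare the accumulator
// 1. Extend the parent class AccumulatorV2[IN, OUT]: IN is the type of the values added on the Executors, OUT is the type of the result read back on the Driver
// 2. Implement the abstract methods
// 3. Create the accumulator
class WordAccumulator extends AccumulatorV2[String, util.ArrayList[String]] {
  val list = new util.ArrayList[String]()

  // Whether the accumulator is in its initial state (here: whether the list is empty)
  override def isZero: Boolean = list.isEmpty

  // Copy the accumulator object, including the words collected so far
  override def copy(): AccumulatorV2[String, util.ArrayList[String]] = {
    val acc = new WordAccumulator
    acc.list.addAll(list)
    acc
  }

  // Reset the accumulator object (clearing the list is enough here)
  override def reset(): Unit = list.clear()

  // Add data to the accumulator; only words containing "h" are kept
  override def add(v: String): Unit = {
    if (v.contains("h")) {
      list.add(v)
    }
  }

  // Merge accumulators: the results returned by different executors are combined here
  override def merge(other: AccumulatorV2[String, util.ArrayList[String]]): Unit = {
    list.addAll(other.value)
  }

  // Get the accumulator's result
  override def value: util.ArrayList[String] = list
}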
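The merge step mentioned in the comments can also be watched on the driver alone, without running a job. The sketch below is only an illustration under a few assumptions: it uses Spark's built-in CollectionAccumulator[String] as a stand-in that follows the same AccumulatorV2 contract (so the snippet stays self-contained), and it imitates by hand what would normally happen across tasks. The names MergeSketch, mergedWords, task1 and task2 are hypothetical.

```scala
import java.util.{List => JList}

import org.apache.spark.util.CollectionAccumulator

// Hypothetical demo object, not part of the original post.
object MergeSketch {

  // Imitates two tasks filling their own copies, then merging back on the driver.
  def mergedWords(): JList[String] = {
    val driverSide = new CollectionAccumulator[String]

    // copyAndReset() returns an empty copy, similar to what each task starts with.
    val task1 = driverSide.copyAndReset()
    val task2 = driverSide.copyAndReset()

    // Each "task" only touches its own copy.
    task1.add("chun1")
    task1.add("chun2")
    task2.add("chun3")
    task2.add("chun4")

    // The per-task results are folded into the driver-side accumulator.
    driverSide.merge(task1)
    driverSide.merge(task2)

    driverSide.value
  }

  def main(args: Array[String]): Unit = println(mergedWords())
}
```

In a real job Spark performs the copy and merge calls itself; the hand-written calls here are only meant to show why copy, reset and merge all have to be implemented in a custom accumulator.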