当前位置: 首页 > news >正文

营销网站定位湖北专业网站建设市面价

营销网站定位,湖北专业网站建设市面价,网站运营怎么自学,温州网站设计服务商目录 1. 代码功能概述 2. 代码逐段解析 主程序逻辑 自定义累加器 MyAccumulator 3. Spark累加器原理 累加器的作用 AccumulatorV2 vs AccumulatorV1 累加器执行流程 4. 代码扩展与优化建议 支持多词统计 线程安全优化 使用内置累加器 5. Spark累加器的适用场景 6…目录 1. 代码功能概述 2. 代码逐段解析 主程序逻辑 自定义累加器 MyAccumulator 3. Spark累加器原理 累加器的作用 AccumulatorV2 vs AccumulatorV1 累加器执行流程 4. 代码扩展与优化建议 支持多词统计 线程安全优化 使用内置累加器 5. Spark累加器的适用场景 6. 总结 package core.bcimport org.apache.spark.util.AccumulatorV2 import org.apache.spark.{SparkConf, SparkContext} import scala.collection.mutableobject AccWordCount {def main(args: Array[String]): Unit {val sparkConfnew SparkConf().setMaster(local).setAppName(AccWordCount)val sc new SparkContext(sparkConf)val value sc.makeRDD(List(hello,spark,hello))//累加器WordCount//创建累加器对象val wcAccnew MyAccumulator()//向Spark进行注册sc.register(wcAcc, wordCountAcc)value.foreach(word{//数据的累加使用累加器wcAcc.add(word)})//获取累加器结果println(wcAcc.value)sc.stop()}/*** 自定义数据累加器* 1、继承AccumulatorV2。定义泛型* IN累加器输入的数据类型* OUT返回的数据类型* 2、重写方法*/class MyAccumulator extends AccumulatorV2[String,mutable.Map[String,Long]]{val wcMap mutable.Map[String, Long]()override def isZero: Boolean wcMap.isEmpty//判断知否为初始状态override def copy(): AccumulatorV2[String, mutable.Map[String, Long]] new MyAccumulator()//复制一个新的累加器override def reset(): Unit wcMap.clear()//重置累加器override def add(word: String): Unit { //获取累加器需要计算的值val newcountwcMap.getOrElse(word,0L)1LwcMap.update(word,newcount)}override def merge(other: AccumulatorV2[String, mutable.Map[String, Long]]): Unit {//Driver合并多个累加器val map1this.wcMapval map2other.valuemap2.foreach {case (word, count) {val newCount map1.getOrElse(word, 0L) countwcMap.update(word, newCount)}}}override def value: mutable.Map[String, Long] wcMap //获取累加器结果} }1. 代码功能概述 该代码使用Apache Spark实现了一个基于自定义累加器的单词计数WordCount程序。通过自定义MyAccumulator类继承AccumulatorV2统计RDD中每个单词的出现次数并利用累加器的分布式聚合特性将结果汇总到驱动程序。 2. 代码逐段解析 主程序逻辑 object AccWordCount {def main(args: Array[String]): Unit {val sparkConf new SparkConf().setMaster(local).setAppName(AccWordCount)val sc new SparkContext(sparkConf)val value sc.makeRDD(List(hello, spark, hello))// 创建并注册累加器val wcAcc new MyAccumulator()sc.register(wcAcc, wordCountAcc)// 遍历RDD累加单词value.foreach(word wcAcc.add(word))// 输出结果println(wcAcc.value) // 预期输出Map(hello - 2, spark - 1)sc.stop()} }RDD创建sc.makeRDD生成包含3个单词的RDD。累加器注册MyAccumulator实例通过sc.register注册到SparkContext名称为wordCountAcc。累加操作foreach遍历RDD中的每个单词调用wcAcc.add(word)累加计数。结果获取wcAcc.value返回最终的单词计数Map。 自定义累加器 MyAccumulator class MyAccumulator extends AccumulatorV2[String, mutable.Map[String, Long]] {val wcMap mutable.Map[String, Long]()override def isZero: Boolean wcMap.isEmptyoverride def copy(): AccumulatorV2[String, mutable.Map[String, Long]] new MyAccumulator()override def reset(): Unit wcMap.clear()override def add(word: String): Unit {val newCount wcMap.getOrElse(word, 0L) 1LwcMap.update(word, newCount)}override def merge(other: AccumulatorV2[String, mutable.Map[String, Long]]): Unit {val map1 this.wcMapval map2 other.valuemap2.foreach { case (word, count) val newCount map1.getOrElse(word, 0L) countwcMap.update(word, newCount)}}override def value: mutable.Map[String, Long] wcMap }核心字段wcMap用于存储单词及其计数。关键方法 isZero判断累加器是否为空初始状态。copy创建累加器的副本用于任务节点本地计算。reset清空累加器状态。add累加单个单词的计数。merge合并其他累加器的统计结果分布式汇总。value返回最终结果。 3. Spark累加器原理 累加器的作用 分布式聚合在多个任务节点上独立计算局部结果最后汇总到驱动程序。高效通信避免频繁的Shuffle操作减少网络开销。线程安全Spark保证每个任务节点内的累加器操作是串行的。 AccumulatorV2 vs AccumulatorV1 AccumulatorV1仅支持简单数据类型如Long、Double适用于计数、求和等场景。AccumulatorV2支持复杂数据类型如Map、List需自定义add和merge方法适用于更灵活的聚合需求如WordCount。 累加器执行流程 任务节点本地计算每个任务节点维护累加器的本地副本通过add方法累加数据。结果汇总任务完成后Spark将各节点的累加器副本发送到驱动程序调用merge方法合并结果。驱动程序获取结果通过value方法获取全局聚合结果。 4. 代码扩展与优化建议 支持多词统计 当前代码统计单次出现的单词若需统计多个单词如键值对可修改add方法 override def add(input: String): Unit {val words input.split(\\s) // 按空格分割多词words.foreach(word {val newCount wcMap.getOrElse(word, 0L) 1LwcMap.update(word, newCount)}) }线程安全优化 若add方法可能被多线程并发调用如在复杂算子中需添加同步锁 override def add(word: String): Unit this.synchronized {val newCount wcMap.getOrElse(word, 0L) 1LwcMap.update(word, newCount) }使用内置累加器 对于简单场景如全局计数可直接使用Spark内置的LongAccumulator val countAcc sc.longAccumulator(countAcc) value.foreach(_ countAcc.add(1)) println(countAcc.value) // 输出总记录数5. Spark累加器的适用场景 全局计数统计任务处理的总记录数、错误数等。分组统计如WordCount、用户行为分类统计。指标监控实时计算平均值、最大值等需结合自定义逻辑。调试与日志在不中断作业的情况下收集分布式运行状态。 6. 总结 该代码通过自定义AccumulatorV2实现了分布式单词计数展示了累加器的核心原理任务节点本地计算 驱动程序全局汇总。通过合理设计add和merge方法累加器可支持复杂聚合逻辑是Spark中高效的分布式统计工具。
http://www.zqtcl.cn/news/852454/

相关文章:

  • 哈尔滨市建设安全网站火车头更新wordpress
  • 做亚马逊外国网站需要语言好吗邢台seo
  • jsp在网站开发中的优势国内哪个推广网站做的好
  • 做网站工资高吗精品资料
  • 做农业需关注什么网站热门代理项目
  • 网站开发公司营业范围照片制作视频软件app
  • 做网站怎么qq邮箱验证免费拥有wordpress
  • 校园网站建设资金来源有wordpress权重
  • 魔站网站开发wordpress 3.3.1
  • 东莞个人免费建网站网站后台管理系统 asp
  • 呼和浩特网站制作 建设wordpress怎么改中文
  • 银医网站建设方案公司网站模板免费下载
  • 优秀网站设计案例中国网站建设东莞公司
  • 自己的公网ip可以做网站网页设计与制作课程思政建设内容
  • 静态网站 搜索搭建织梦网站教程
  • idc 网站备案手机电脑网站建设短视频
  • 做搜狗pc网站优化快速亚马逊云搭建WordPress
  • 免费的建网站软件2020做seo还有出路吗
  • 宁波三优互动网站建设公司怎么样网站建设公司管理流程图
  • 网站内文章外链如何做创新设计
  • 西安做网站公网站做友链有行业要求吗
  • 做现金贷网站的公司软件开发设计文档
  • 数据做图网站表示商业网站的域名
  • 网站备案单位备案老域名
  • 黔西南州建设局网站网站建设流程的过程
  • 河南龙王建设集团网站沈阳专业建站
  • 百度网盘app下载安装手机版百度排名优化咨询电话
  • 网站微信公众号链接怎么做免费访问国外网站的应用
  • 东莞网站搜索排名wordpress 小工具居中
  • 网上商城网站源码网站建站中关键字搜索怎么弄