当前位置: 首页 > news >正文

河西网站建设优化seo宣传手册word模板

河西网站建设优化seo,宣传手册word模板,城乡建设管理局网站,分类信息网址导航实验目的#xff1a;掌握Scala开发工具消费Kafka数据#xff0c;并将结果保存到关系型数据库中 实验方法#xff1a;消费Kafka数据保存到MySQL中 实验步骤#xff1a; 一、创建Job_ClickData_Process 代码如下#xff1a; package examsimport org.apache.kafka.clien…实验目的掌握Scala开发工具消费Kafka数据并将结果保存到关系型数据库中 实验方法消费Kafka数据保存到MySQL中 实验步骤 一、创建Job_ClickData_Process 代码如下 package examsimport org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.serialization.StringDeserializer import org.apache.spark.streaming.dstream.InputDStream import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies, OffsetRange} import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.streaming.{Seconds, StreamingContext}import java.sql.{Connection, DriverManager, PreparedStatement} import scala.collection.mutable/*** projectName sparkGNU2023 * package exams * className exams.Job_ClickData_Process * description ${description} * author pblh123* date 2023/12/20 15:42* version 1.0**/object Job_ClickData_Process {def main(args: Array[String]): Unit {// 1. 创建spark,sc,sparkstreaming对象if (args.length ! 3) {println(您需要输入三个参数)System.exit(5)}val musrl: String args(0)val conf new SparkConf().setAppName(s${this.getClass.getSimpleName}).setMaster(musrl)val sc: SparkContext new SparkContext(conf)sc.setLogLevel(WARN)val ckeckpointdir: String args(1)val ssc new StreamingContext(sc, Seconds(5)) //连续流批次处理的大小// 2. 代码主体 // 设置ckeckpoint目录ssc.checkpoint(ckeckpointdir)//准备kafka的连接参数val kfkbst: String args(2)val kafkaParams: Map[String, Object] Map[String, Object](bootstrap.servers - kfkbst,group.id - SparkKafka,//latest表示如果记录了偏移量的位置就从记录的位置开始消费如果没有记录就从最新/或最后的位置开始消费//earliest表示如果记录了偏移量的位置就从记录的位置开始消费如果没有记录就从最开始/最早的位置开始消费//none示如果记录了偏移量的位置就从记录的位置开始消费如果没有记录则报错auto.offset.reset - latest, //偏移量的重置位置enable.auto.commit - (false: java.lang.Boolean), //是否自动提交偏移量key.deserializer - classOf[StringDeserializer],value.deserializer - classOf[StringDeserializer])val topics: Array[String] Array(RealDataTopic)//从mysql中查询出offsets:Map[TopicPartition, Long]val offsetsMap: mutable.Map[TopicPartition, Long] OffsetUtils.getOffsetMap(SparkKafka, RealDataTopic)val kafkaDS: InputDStream[ConsumerRecord[String, String]] if (offsetsMap.size 0) {println(MySql记录了offset信息,从offset处开始消费)//连接kafka的消息KafkaUtils.createDirectStream[String, String](ssc,LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, offsetsMap))} else {println(MySql没有记录了offset信息,从latest处开始消费)//连接kafka的消息KafkaUtils.createDirectStream[String, String](ssc,LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String, String](topics, kafkaParams))}//实时处理数据并手动维护offsetval valueDS kafkaDS.map(_.value()) //_表示从kafka中消费出来的每一条数据valueDS.print()kafkaDS.map(_.value())valueDS.foreachRDD(rdd {rdd.foreachPartition(lines {//将处理分析的结果存入mysql/*DROP TABLE IF EXISTS job_real_time;CREATE TABLE job_real_time (datetime varchar(8) DEFAULT NULL COMMENT 日期,job_type int(2) DEFAULT NULL COMMENT 1代表新招聘岗位0代表找工作的人,job_id int(8) DEFAULT NULL COMMENT 岗位ID匹配岗位名称,count int(8) DEFAULT NULL COMMENT 企业新增岗位数和找工作的人数) ENGINEInnoDB DEFAULT CHARSETutf8;*///1.开启连接val conn: Connection DriverManager.getConnection(jdbc:mysql://192.168.137.110:3306/bigdata19?characterEncodingUTF-8serverTimezoneUTC, lh, Lh123456!)//2.编写sql并获取psval sql: String replace into job_real_time(datetime,job_type,job_id,count) values(?,?,?,?)val ps: PreparedStatement conn.prepareStatement(sql)//3.设置参数并执行for (line - lines) {var item line.split( )ps.setString(1, item(0).toString)ps.setInt(2, item(1).toInt)ps.setInt(3, item(2).toInt)ps.setInt(4, item(3).toInt)ps.executeUpdate()}//4.关闭资源ps.close()conn.close()})})//手动提交偏移量kafkaDS.foreachRDD(rdd {if (rdd.count() 0) {//获取偏移量val offsets: Array[OffsetRange] rdd.asInstanceOf[HasOffsetRanges].offsetRangesOffsetUtils.saveOffsets(groupId SparkKafka, offsets)}})//开启sparkstreaming任务并等待结束关闭ssc,scssc.start()ssc.awaitTermination()ssc.stop()sc.stop()}}二、编写模拟点击量并消费Kafka数据 启动zookeeper集群 zk.sh start 启动kafka集群 kf.sh start 检查模拟的实时数据是否正常更新 不断正常更新的情况下启动flume采集real-time-data.log的实时数据 启动flume 在mysql数据库中准备偏移表与实时数据表 启动Job_ClickData_Process方法消费kafka数据并保存到mysql中 检查mysql表是否存入数据 实验结果通过scala开发spark代码实现消费kafka数据存储到MySQL中
http://www.zqtcl.cn/news/613271/

相关文章:

  • 全国房地产网站企管宝app下载
  • 无线网络网站dns解析失败南通模板建站多少钱
  • h5手机网站建设哪家好北京海淀房管局网站
  • 制作一个简单的网站冬奥会网页设计代码
  • 如何做网站 百度西充建设部门投诉网站
  • 怎么创建自己的博客网站网站优化主要内容
  • 太原网站建设推广建设网站观澜
  • 网站开发员名称是什么网站制作教程及流程
  • 建设财经资讯网站的目的移动端网站模板怎么做的
  • 受欢迎的赣州网站建设青岛建站
  • 青海网站制作哪家好烟台龙口网站建设
  • 婚恋网站排名前十名网站建设的论坛
  • 进行网站建设有哪些重要意义上海浦东建筑建设网站污水处理工程
  • 自己做qq代刷网站要钱吗瑞安网站建设优化推广
  • 建设网站招标定制高端网站建设报价
  • 商城网站建设code521广州安全教育平台登录入囗
  • 如何做网站系统安庆网站建设公司简
  • 北京做网站电话的公司网站怎么做外链
  • 手工艺品外贸公司网站建设方案复古风格网站
  • 企业网站后端模板如何编写手机程序
  • 泰州网站建设服务好wordpress 子分类
  • 做个企业网站要多少钱php mysql怎么编写视频网站
  • 精仿手表网站做网站为什么要做备案接入
  • 哈什么网一个网站做ppt清新区城乡建设局网站
  • 重庆专业网站建设首页排名网站模板广告去除
  • 河南省建设行业证书查询网站怎么用ps做网站首页背景图片
  • 如何取一个大气的名字的做网站青岛北方现货交易平台
  • 关于做书的网站购物网站建设资讯
  • 运营网站开发工作招聘做装修有什么好网站可以做
  • 免费自学平面设计的网站直播网站开发源码