当前位置: 首页 > news >正文

赣州酒店网站设计装修设计公司咨询

赣州酒店网站设计,装修设计公司咨询,网站维护与更新,加强部门网站建设工作概述 spark 版本为 3.2.4#xff0c;注意 RDD 转 DataFrame 的代码出现的问题及解决方案 本文目标如下#xff1a; RDD ,Datasets,DataFrames 之间的区别入门 SparkSession创建 DataFramesDataFrame 操作编程方式运行 sql 查询创建 DatasetsDataFrames 与 RDDs 互相转换 使用…概述 spark 版本为 3.2.4注意 RDD 转 DataFrame 的代码出现的问题及解决方案 本文目标如下 RDD ,Datasets,DataFrames 之间的区别入门 SparkSession创建 DataFramesDataFrame 操作编程方式运行 sql 查询创建 DatasetsDataFrames 与 RDDs 互相转换 使用反射推断模式编程指定 Schema 参考 Spark 官网 相关文章链接如下 文章链接spark standalone环境安装地址Spark的工作与架构原理地址使用spark开发第一个程序WordCount程序及多方式运行代码地址RDD编程指南地址RDD持久化地址 RDD ,Datasets,DataFrames 之间的区别 Datasets , DataFrames和 RDD Dataset 是一个分布式的数据集合Dataset 是 Spark 1.6 中添加的一个新接口它增益了 RDD (强类型可以使用 lambda 函数的能力) 和 Spark sql 优化执行引擎的优势。Dataset 可以由JVM对象构建然后使用函数转换map、flatMap、filter等进行操作。数据集API有Scala和Java版本。Python不支持数据集API。 DataFrame是组织成命名列的数据集。它在概念上等同于关系数据库中的表DataFrame API在Scala、Java、Python和R中可用。在Scala API中DataFrame只是Dataset[Row]的一个类型别名。而在Java API中用户需要使用DatasetRow来表示DataFrame。 DataFrameRDDSchemaRDD可以认为是表中的数据Schema是表结构信息。DataFrame可以通过很多来源进行构建包括结构化的数据文件Hive中的表外部的关系型数据库以及RDD 入门 Spark SQL是一个用于结构化数据处理的Spark模块。与基本的Spark RDD API不同Spark SQL提供的接口为Spark提供了更多关于正在执行的数据结构信息。在内部Spark SQL使用这些额外的信息来执行额外的优化。有几种方法可以与SparkSQL进行交互包括SQL和 Dataset API。计算结果时使用相同的执行引擎与用于表示计算的API/语言无关。方便用户切换不同的方式进行操作 people.json people.json文件准备 SparkSession Spark sql 中所有功能入口点是 SparkSession类。创建一个基本的 SparkSession只需使用 SparkSession.builder() import org.apache.spark.sql.SparkSessionval spark SparkSession.builder().appName(Spark SQL basic example).config(spark.some.config.option, some-value).getOrCreate()创建 DataFrames 使用 SparkSession通过存在的RDDhive 表或其它的Spark data sources 程序创建 DataFrames val df spark.read.json(/tmp/people.json) df.show()执行如下图 DataFrame 操作 使用数据集进行结构化数据处理的基本示例如下 // 需要引入 spark.implicits._ 才可使用 $ // This import is needed to use the $-notation import spark.implicits._ // 打印schema 以树格式 // Print the schema in a tree format df.printSchema() // root // |-- age: long (nullable true) // |-- name: string (nullable true)// 仅显示 name 列 // Select only the name column df.select(name).show() // ------- // | name| // ------- // |Michael| // | Andy| // | Justin| // ------- // 显示所有age 加1 // Select everybody, but increment the age by 1 df.select($name, $age 1).show() // ---------------- // | name|(age 1)| // ---------------- // |Michael| null| // | Andy| 31| // | Justin| 20| // ----------------// 过滤 人的 age 大于 21 // Select people older than 21 df.filter($age 21).show() // ------- // |age|name| // ------- // | 30|Andy| // -------// 按 age 分组统计 // Count people by age df.groupBy(age).count().show() // --------- // | age|count| // --------- // | 19| 1| // |null| 1| // | 30| 1| // ---------spark-shell 执行如下图 编程方式运行 sql 查询 df.createOrReplaceTempView(people)val sqlDF spark.sql(SELECT * FROM people) sqlDF.show()执行如下 scala df.createOrReplaceTempView(people)scala val sqlDF spark.sql(SELECT * FROM people) sqlDF: org.apache.spark.sql.DataFrame [age: bigint, name: string]scala sqlDF.show() ----------- | age| name| ----------- |null|Michael| | 30| Andy| | 19| Justin| -----------创建 Datasets Datasets类似于RDD不是使用Java序列化或Kryo而是使用专门的编码器来序列化对象以便通过网络进行处理或传输。使用的格式允许Spark执行许多操作如过滤、排序和哈希而无需将字节反序列化为对象。 case class Person(name: String, age: Long)// 为 case classes 创建编码器 // Encoders are created for case classes val caseClassDS Seq(Person(Andy, 32)).toDS() caseClassDS.show()// 为能用类型创建编码器并提供 spark.implicits._ 引入 // Encoders for most common types are automatically provided by importing spark.implicits._ val primitiveDS Seq(1, 2, 3).toDS() primitiveDS.map(_ 1).collect() // Returns: Array(2, 3, 4)// 通过定义类将按照名称映射DataFrames 能被转成 Dataset // DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name val path /tmp/people.json val peopleDS spark.read.json(path).as[Person] peopleDS.show()执行如下 scala case class Person(name: String, age: Long) defined class Personscala val caseClassDS Seq(Person(Andy, 32)).toDS() caseClassDS: org.apache.spark.sql.Dataset[Person] [name: string, age: bigint]scala caseClassDS.show() ------- |name|age| ------- |Andy| 32| -------scala val primitiveDS Seq(1, 2, 3).toDS() primitiveDS: org.apache.spark.sql.Dataset[Int] [value: int]scala primitiveDS.map(_ 1).collect() res1: Array[Int] Array(2, 3, 4)scala val path /tmp/people.json path: String /tmp/people.jsonscala val peopleDS spark.read.json(path).as[Person] peopleDS: org.apache.spark.sql.Dataset[Person] [age: bigint, name: string]scala peopleDS.show() ----------- | age| name| ----------- |null|Michael| | 30| Andy| | 19| Justin| -----------DataFrames 与 RDDs 互相转换 Spark SQL支持两种不同的方法将现有RDD转换为Datasets。 第一种方法使用反射来推断包含特定类型对象的RDD的模式。这种基于反射的方法可以生成更简洁的代码当知道 schema 结构的时间会有更好的效果。第二种方法是通过编程接口构造 schema然后将其应用于现有的RDD。虽然此方法更详细直至运行时才能知道他们的字段和类型用于构造 Datasets。 使用反射推断模式 代码如下 object RddToDataFrameByReflect {def main(args: Array[String]): Unit {val spark SparkSession.builder().appName(RddToDataFrameByReflect).master(local).getOrCreate()// 用于从RDD到DataFrames的隐式转换// For implicit conversions from RDDs to DataFramesimport spark.implicits._// Create an RDD of Person objects from a text file, convert it to a Dataframeval peopleDF spark.sparkContext.textFile(/Users/hyl/Desktop/fun/sts/spark-demo/people.txt).map(_.split(,)).map(attributes Person(attributes(0), attributes(1).trim.toInt)).toDF()// Register the DataFrame as a temporary viewpeopleDF.createOrReplaceTempView(people)// SQL statements can be run by using the sql methods provided by Sparkval teenagersDF spark.sql(SELECT name, age FROM people WHERE age BETWEEN 13 AND 19)// The columns of a row in the result can be accessed by field indexteenagersDF.map(teenager Name: teenager(0)).show()// or by field nameteenagersDF.map(teenager Name: teenager.getAs[String](name)).show()}case class Person(name: String, age: Long) }执行如下图 编码问题 关于 Spark 官网 上复杂类型编码问题直接加下面一句代码 teenagersDF.map(teenager teenager.getValuesMap[Any](List(name, age))).collect().foreach(println(_))报以下图片错误 将原有代码改变如下 // 没有为 Dataset[Map[K,V]] 预先定义编码器需要自己定义// No pre-defined encoders for Dataset[Map[K,V]], define explicitlyimplicit val mapEncoder org.apache.spark.sql.Encoders.kryo[Map[String, Any]]// 也可以如下操作// Primitive types and case classes can be also defined as// implicit val stringIntMapEncoder: Encoder[Map[String, Any]] ExpressionEncoder()// row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T]teenagersDF.map(teenager teenager.getValuesMap[Any](List(name, age))).collect().foreach(println(_))// Array(Map(name - Justin, age - 19))通过这一波操作就可以理解什么情况下需要编码器以及编码器的作用 编程指定 Schema 代码如下 object RddToDataFrameByProgram {def main(args: Array[String]): Unit {val spark SparkSession.builder().master(local).getOrCreate()import org.apache.spark.sql.Rowimport org.apache.spark.sql.types._// 加上此解决报错问题import spark.implicits._// Create an RDDval peopleRDD spark.sparkContext.textFile(/Users/hyl/Desktop/fun/sts/spark-demo/people.txt)// The schema is encoded in a stringval schemaString name age// Generate the schema based on the string of schemaval fields schemaString.split( ).map(fieldName StructField(fieldName, StringType, nullable true))val schema StructType(fields)// Convert records of the RDD (people) to Rowsval rowRDD peopleRDD.map(_.split(,)).map(attributes Row(attributes(0), attributes(1).trim))// Apply the schema to the RDDval peopleDF spark.createDataFrame(rowRDD, schema)// Creates a temporary view using the DataFramepeopleDF.createOrReplaceTempView(people)// SQL can be run over a temporary view created using DataFramesval results spark.sql(SELECT name FROM people)// The results of SQL queries are DataFrames and support all the normal RDD operations// The columns of a row in the result can be accessed by field index or by field nameresults.map(attributes Name: attributes(0)).show()} }执行如下图 官方文档的代码不全问题 Unable to find encoder for type String. An implicit Encoder[String] is needed to store String instances in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases. results.map(attributes Name: attributes(0)).show() 加下以下代码 // 加上此解决报错问题 import spark.implicits._如下图解决 结束 spark sql 至此结束如有问题欢迎评论区留言。
http://www.zqtcl.cn/news/618072/

相关文章:

  • 网上有做口译的网站么怎样手机做网站教程
  • 孵化器网站平台建设网站一直建设中
  • 企业网站建设的方案书网站镜像 cdn
  • 淘宝做网站的都是模板泉州模板建站公司
  • 清理网站数据库网站服务器租一个月
  • wordpress免费简约主题搜索引擎优化的英文
  • 瑞安门户网站建设怎么建设自己网站首页
  • 网站建设岗位周计划thinkphp微网站开发
  • 如何修改asp网站栏目帝国cms网站搬家教程
  • 网站建设与网页制作小团队兼职做网站
  • 嘉兴做网站的公司网红营销价值
  • scala做网站广州化妆品网站制作
  • 网站建设小组五类成员在线购物网站功能模块
  • 网站建设开发详细步骤流程图网站建设与管理实训报告总结
  • 网站设计的素材旅游网站建设标书
  • 做网站还得备案大企业网站建设多少钱
  • 一般做网站空间大概多少钱电商网站开发公司
  • 海报模板在线制作免费网站如何建设个人网站
  • 网站集群建设的意义如何优化推广网站
  • 怎么给公司做免费网站服装品牌网页设计图片
  • 中国通信建设协会网站新手建网站教程
  • 做网站页面的需要哪些技巧wordpress 网址导航
  • 如何做美食网站设计广州网页设计招聘
  • 中国商标网商标查询官方网站页面模板怎么添加文章
  • 建设基础化学网站的经验如何建设网站pdf下载
  • 外贸公司网站设计公司做网站能挣钱不
  • 免费网站ppt模板下载济南建设网站公司
  • 网站建设技术托管免费空间域名注册免备案
  • 威海住房建设部官方网站专科网站开发就业方向
  • 做外贸网站多少钱成都网页设计专业