赣州酒店网站设计,装修设计公司咨询,网站维护与更新,加强部门网站建设工作概述 spark 版本为 3.2.4#xff0c;注意 RDD 转 DataFrame 的代码出现的问题及解决方案 本文目标如下#xff1a;
RDD ,Datasets,DataFrames 之间的区别入门 SparkSession创建 DataFramesDataFrame 操作编程方式运行 sql 查询创建 DatasetsDataFrames 与 RDDs 互相转换 使用…概述 spark 版本为 3.2.4注意 RDD 转 DataFrame 的代码出现的问题及解决方案 本文目标如下
RDD ,Datasets,DataFrames 之间的区别入门 SparkSession创建 DataFramesDataFrame 操作编程方式运行 sql 查询创建 DatasetsDataFrames 与 RDDs 互相转换 使用反射推断模式编程指定 Schema
参考 Spark 官网
相关文章链接如下
文章链接spark standalone环境安装地址Spark的工作与架构原理地址使用spark开发第一个程序WordCount程序及多方式运行代码地址RDD编程指南地址RDD持久化地址
RDD ,Datasets,DataFrames 之间的区别
Datasets , DataFrames和 RDD
Dataset 是一个分布式的数据集合Dataset 是 Spark 1.6 中添加的一个新接口它增益了 RDD (强类型可以使用 lambda 函数的能力) 和 Spark sql 优化执行引擎的优势。Dataset 可以由JVM对象构建然后使用函数转换map、flatMap、filter等进行操作。数据集API有Scala和Java版本。Python不支持数据集API。
DataFrame是组织成命名列的数据集。它在概念上等同于关系数据库中的表DataFrame API在Scala、Java、Python和R中可用。在Scala API中DataFrame只是Dataset[Row]的一个类型别名。而在Java API中用户需要使用DatasetRow来表示DataFrame。
DataFrameRDDSchemaRDD可以认为是表中的数据Schema是表结构信息。DataFrame可以通过很多来源进行构建包括结构化的数据文件Hive中的表外部的关系型数据库以及RDD
入门
Spark SQL是一个用于结构化数据处理的Spark模块。与基本的Spark RDD API不同Spark SQL提供的接口为Spark提供了更多关于正在执行的数据结构信息。在内部Spark SQL使用这些额外的信息来执行额外的优化。有几种方法可以与SparkSQL进行交互包括SQL和 Dataset API。计算结果时使用相同的执行引擎与用于表示计算的API/语言无关。方便用户切换不同的方式进行操作
people.json
people.json文件准备
SparkSession
Spark sql 中所有功能入口点是 SparkSession类。创建一个基本的 SparkSession只需使用 SparkSession.builder()
import org.apache.spark.sql.SparkSessionval spark SparkSession.builder().appName(Spark SQL basic example).config(spark.some.config.option, some-value).getOrCreate()创建 DataFrames
使用 SparkSession通过存在的RDDhive 表或其它的Spark data sources 程序创建 DataFrames
val df spark.read.json(/tmp/people.json)
df.show()执行如下图
DataFrame 操作
使用数据集进行结构化数据处理的基本示例如下
// 需要引入 spark.implicits._ 才可使用 $
// This import is needed to use the $-notation
import spark.implicits._
// 打印schema 以树格式
// Print the schema in a tree format
df.printSchema()
// root
// |-- age: long (nullable true)
// |-- name: string (nullable true)// 仅显示 name 列
// Select only the name column
df.select(name).show()
// -------
// | name|
// -------
// |Michael|
// | Andy|
// | Justin|
// -------
// 显示所有age 加1
// Select everybody, but increment the age by 1
df.select($name, $age 1).show()
// ----------------
// | name|(age 1)|
// ----------------
// |Michael| null|
// | Andy| 31|
// | Justin| 20|
// ----------------// 过滤 人的 age 大于 21
// Select people older than 21
df.filter($age 21).show()
// -------
// |age|name|
// -------
// | 30|Andy|
// -------// 按 age 分组统计
// Count people by age
df.groupBy(age).count().show()
// ---------
// | age|count|
// ---------
// | 19| 1|
// |null| 1|
// | 30| 1|
// ---------spark-shell 执行如下图
编程方式运行 sql 查询
df.createOrReplaceTempView(people)val sqlDF spark.sql(SELECT * FROM people)
sqlDF.show()执行如下
scala df.createOrReplaceTempView(people)scala val sqlDF spark.sql(SELECT * FROM people)
sqlDF: org.apache.spark.sql.DataFrame [age: bigint, name: string]scala sqlDF.show()
-----------
| age| name|
-----------
|null|Michael|
| 30| Andy|
| 19| Justin|
-----------创建 Datasets
Datasets类似于RDD不是使用Java序列化或Kryo而是使用专门的编码器来序列化对象以便通过网络进行处理或传输。使用的格式允许Spark执行许多操作如过滤、排序和哈希而无需将字节反序列化为对象。
case class Person(name: String, age: Long)// 为 case classes 创建编码器
// Encoders are created for case classes
val caseClassDS Seq(Person(Andy, 32)).toDS()
caseClassDS.show()// 为能用类型创建编码器并提供 spark.implicits._ 引入
// Encoders for most common types are automatically provided by importing spark.implicits._
val primitiveDS Seq(1, 2, 3).toDS()
primitiveDS.map(_ 1).collect() // Returns: Array(2, 3, 4)// 通过定义类将按照名称映射DataFrames 能被转成 Dataset
// DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name
val path /tmp/people.json
val peopleDS spark.read.json(path).as[Person]
peopleDS.show()执行如下
scala case class Person(name: String, age: Long)
defined class Personscala val caseClassDS Seq(Person(Andy, 32)).toDS()
caseClassDS: org.apache.spark.sql.Dataset[Person] [name: string, age: bigint]scala caseClassDS.show()
-------
|name|age|
-------
|Andy| 32|
-------scala val primitiveDS Seq(1, 2, 3).toDS()
primitiveDS: org.apache.spark.sql.Dataset[Int] [value: int]scala primitiveDS.map(_ 1).collect()
res1: Array[Int] Array(2, 3, 4)scala val path /tmp/people.json
path: String /tmp/people.jsonscala val peopleDS spark.read.json(path).as[Person]
peopleDS: org.apache.spark.sql.Dataset[Person] [age: bigint, name: string]scala peopleDS.show()
-----------
| age| name|
-----------
|null|Michael|
| 30| Andy|
| 19| Justin|
-----------DataFrames 与 RDDs 互相转换
Spark SQL支持两种不同的方法将现有RDD转换为Datasets。
第一种方法使用反射来推断包含特定类型对象的RDD的模式。这种基于反射的方法可以生成更简洁的代码当知道 schema 结构的时间会有更好的效果。第二种方法是通过编程接口构造 schema然后将其应用于现有的RDD。虽然此方法更详细直至运行时才能知道他们的字段和类型用于构造 Datasets。
使用反射推断模式
代码如下
object RddToDataFrameByReflect {def main(args: Array[String]): Unit {val spark SparkSession.builder().appName(RddToDataFrameByReflect).master(local).getOrCreate()// 用于从RDD到DataFrames的隐式转换// For implicit conversions from RDDs to DataFramesimport spark.implicits._// Create an RDD of Person objects from a text file, convert it to a Dataframeval peopleDF spark.sparkContext.textFile(/Users/hyl/Desktop/fun/sts/spark-demo/people.txt).map(_.split(,)).map(attributes Person(attributes(0), attributes(1).trim.toInt)).toDF()// Register the DataFrame as a temporary viewpeopleDF.createOrReplaceTempView(people)// SQL statements can be run by using the sql methods provided by Sparkval teenagersDF spark.sql(SELECT name, age FROM people WHERE age BETWEEN 13 AND 19)// The columns of a row in the result can be accessed by field indexteenagersDF.map(teenager Name: teenager(0)).show()// or by field nameteenagersDF.map(teenager Name: teenager.getAs[String](name)).show()}case class Person(name: String, age: Long)
}执行如下图
编码问题
关于 Spark 官网 上复杂类型编码问题直接加下面一句代码
teenagersDF.map(teenager teenager.getValuesMap[Any](List(name, age))).collect().foreach(println(_))报以下图片错误 将原有代码改变如下 // 没有为 Dataset[Map[K,V]] 预先定义编码器需要自己定义// No pre-defined encoders for Dataset[Map[K,V]], define explicitlyimplicit val mapEncoder org.apache.spark.sql.Encoders.kryo[Map[String, Any]]// 也可以如下操作// Primitive types and case classes can be also defined as// implicit val stringIntMapEncoder: Encoder[Map[String, Any]] ExpressionEncoder()// row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T]teenagersDF.map(teenager teenager.getValuesMap[Any](List(name, age))).collect().foreach(println(_))// Array(Map(name - Justin, age - 19))通过这一波操作就可以理解什么情况下需要编码器以及编码器的作用
编程指定 Schema
代码如下
object RddToDataFrameByProgram {def main(args: Array[String]): Unit {val spark SparkSession.builder().master(local).getOrCreate()import org.apache.spark.sql.Rowimport org.apache.spark.sql.types._// 加上此解决报错问题import spark.implicits._// Create an RDDval peopleRDD spark.sparkContext.textFile(/Users/hyl/Desktop/fun/sts/spark-demo/people.txt)// The schema is encoded in a stringval schemaString name age// Generate the schema based on the string of schemaval fields schemaString.split( ).map(fieldName StructField(fieldName, StringType, nullable true))val schema StructType(fields)// Convert records of the RDD (people) to Rowsval rowRDD peopleRDD.map(_.split(,)).map(attributes Row(attributes(0), attributes(1).trim))// Apply the schema to the RDDval peopleDF spark.createDataFrame(rowRDD, schema)// Creates a temporary view using the DataFramepeopleDF.createOrReplaceTempView(people)// SQL can be run over a temporary view created using DataFramesval results spark.sql(SELECT name FROM people)// The results of SQL queries are DataFrames and support all the normal RDD operations// The columns of a row in the result can be accessed by field index or by field nameresults.map(attributes Name: attributes(0)).show()}
}执行如下图
官方文档的代码不全问题 Unable to find encoder for type String. An implicit Encoder[String] is needed to store String instances in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases. results.map(attributes Name: attributes(0)).show() 加下以下代码
// 加上此解决报错问题
import spark.implicits._如下图解决
结束
spark sql 至此结束如有问题欢迎评论区留言。