搭建公司介绍网站,wordpress安装显示英文,南京建设网站方案,12345微信公众号第 1 章#xff1a;spark sql概述
1.1 什么是spark sql
1、spark sql是spark用于结构化数据处理的spark模块 1#xff09;半结构化数据#xff08;日志数据#xff09;
2#xff09;结构化数据#xff08;数据库数据#xff09;
1.2 为什么要有sparksql hive on s…第 1 章spark sql概述
1.1 什么是spark sql
1、spark sql是spark用于结构化数据处理的spark模块 1半结构化数据日志数据
2结构化数据数据库数据
1.2 为什么要有sparksql hive on sparkhive既作为存储元数据又负责sql的解析优化语法是hql语法执行引擎编程了sparkspark负责采用rdd执行。
spark on hivehive只作为存储元数据spark负责sql解析优化语法是spark sql语法spark底层采用优化后的df或者ds执行。
1.3 spark sql原理
spark sql它提供了2个编程抽象dataframe、dataset类似spark core中的rdd
1.3.1 什么是dataframe
1、dataframe是一种类似rdd的分布式数据集类似于传统数据库中的二维表格。 2、dataframe与rdd的主要区别在于dataframe带有schema元信息即dataframe所表示的二维表数据集的每一列都带有名称和类型。
左侧的rdd[person]虽然person为类型参数但spark框架本身不了解person类的内部结构。而右侧的dataframe却提供了详细的结构信息使得spark sql可以清楚的指导这些数据集中包含哪些列每列的名称和类型各是什么。 3、spark sql性能上比rdd要高。因为spark sql了解数据内部结构从而对藏于dataframe背后的数据源以及作用域dataframe之上的变换进行了针对性的优化最终达到大幅提升运行时效率的目标。反观rdd由于无从得知所存数据元素的具体内部结构spark core只能在stage层面进行简单、通用的流水线优化。
1.3.2 什么是dataset
dataset是分布式数据集。 dataset是强类型的。比如可以有dataset[car]dataset[user]。具有类型安全检查。 dataframe是dataset的特例type dataframedataset[row]row是一个类型跟car、user这些的类型一样所有的表结构信息都用row来表示。
1.3.3 rdd、dataframe和dataset之间关系
1、发展历史
如果同样的数据都给到这三种数据结构他们分别计算之后都会给出相同的结果。不同的是他们的执行效率和执行方式。在后期的spark版本中dataset有可能会逐步取代rdd和dataframe成为唯一的api接口。 2、三者的共性 1rdd、dataframe、dataset全都是spark平台下的分布式弹性数据集为处理超大型数据提供便利。 2三者都是惰性机制在进行创建、转换如map方法时不会立即执行只有在遇到action行动算子如foreach时三者才会开始遍历运算 3三者有许多共同的函数如filter排序等 4三者都会根据spark的内存情况自动缓存运算 5三者都有分区概念
1.4 spark sql的特点
1、易整合 无缝的整合了sql查询和spark编程。
2、统一的数据访问方式 使用相同的方式连接不同的数据源
3、兼容hive 在已有的仓库上直接运行sql或者hql
4、标准的数据连接 通过jdbc或者odbc来连接
第2 章spark sql编程
本章重点学习如何使用dataframe和dataset进行编程以及他们之间的关系和转换关于具体的sql书写不是本章的重点。
2.1 sparksession新的起始点
在老的版本中sparksql提供两种sql查询起始点 1、一个是sqlcontext用于spark自己提供的sql查询 2、一个叫hivecontext用于连接hive的查询 sparksession是spark最新的sql查询起始点实质上是sqlcontext和hivecontext的组合所以在sqlcontext和hivecontext上可用的api在sparksession上同样是可用使用的。 sparksession内部封装了sparkcontext所以计算实际上是由sparkcontext完成的。当我们使用spark-shell的时候spark框架会自动地创建一个名称叫做spark的sparksession就像我们以前可以自动获取到一个sc来表示sparkcontext。
[atguiguhadoop102 spark-local]$ bin/spark-shell20/09/12 11:16:35 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Sparks default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to WARN.
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://hadoop102:4040
Spark context available as sc (master local[*], app id local-1599880621394).
Spark session available as spark.
Welcome to____ __/ __/__ ___ _____/ /___\ \/ _ \/ _ / __/ _//___/ .__/\_,_/_/ /_/\_\ version 3.0.0/_/Using Scala version 2.12.10 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_212)
Type in expressions to have them evaluated.
Type :help for more information.
2.2 dataframe
dataframe是一种类似于rdd的分布式数据集类似于传统数据库中的二维表格
2.2.1 创建dataframe
在spark sql中sparksession是创建dataframe和执行sql的入口创建dataframe有三种方式 通过spark的数据源进行创建 从一个存在的rdd进行转换 还可以从hive table进行查询返回 1、从spark数据源进行创建 1数据准备在/opt/module/spark-local目录下创建一个user.json文件
{age:20,name:qiaofeng}
{age:19,name:xuzhu}
{age:18,name:duanyu}
2查看spark支持创建文件的数据源格式使用tab键查看
scala spark.read.
csv format jdbc json load option options orc parquet schema table text textFile
3读取json文件创建dataframe
scala val df spark.read.json(/opt/module/spark-local/user.json)
df: org.apache.spark.sql.DataFrame [age: bigint name: string]
注意如果从内存种获取数据spark可以指导数据类型具体是什么如果是数字默认作为int处理但是从文件种读取的数字不能确定是什么类型所以用bigint接收可以和long类型转换但是和int不能进行转换。 4查看dataframe算子
scala df.5展示结果
scala df.show
-----------
|age| name|
-----------
| 20|qiaofeng|
| 19| xuzhu|
| 18| duanyu|
-----------
2、从rdd进行转换 3、hive table进行查询返回
2.2.2 sql风格语法
sql语法风格是指我们查询数据的时候使用sql语句来查询这种风格的查询必须要有临时视图或者全局视图来辅助。 视图对特定表的数据的查询结果重复使用。view只能查询不能修改和插入。
1、临时视图 1创建一个dataframe
scala val df spark.read.json(/opt/module/spark-local/user.json)
df: org.apache.spark.sql.DataFrame [age: bigint name: string]
2对dataframe创建一个临时视图
scala df.createOrReplaceTempView(user)3通过sql语句实现查询全表
scala val sqlDF spark.sql(SELECT * FROM user)
sqlDF: org.apache.spark.sql.DataFrame [age: bigint name: string]
4结果展示
4结果展示
scala sqlDF.show
-----------
|age| name|
-----------
| 20|qiaofeng|
| 19| xuzhu|
| 18| duanyu|
-----------
5求年龄的平均值
scala val sqlDF spark.sql(SELECT avg(age) from user)
sqlDF: org.apache.spark.sql.DataFrame [avg(age): double]
6结果展示
scala sqlDF.show
--------
|avg(age)|
--------
| 19.0|
--------
7创建一个新会话再执行发现视图找不到
scala spark.newSession().sql(SELECT avg(age) from user ).show()
org.apache.spark.sql.AnalysisException: Table or view not found: user; line 1 pos 14;
注意普通临时视图是session范围内的如果向全局有效可以创建全局临时视图。 2、全局视图 1对于dataframe创建一个全局视图
scala df.createOrReplaceGlobalTempView (user2)2通过sql语句查询全表
scala spark.sql(SELECT * FROM global_temp.user2).show()
-----------
|age| name|
-----------
| 20|qiaofeng|
| 19| xuzhu|
| 18| duanyu|
-----------
3新建session通过sql语句实现查询全表
scala spark.newSession().sql(SELECT * FROM global_temp.user2).show()
-----------
|age| name|
-----------
| 20|qiaofeng|
| 19| xuzhu|
| 18| duanyu|
-----------
2.2.3 dsl风格语法
dataframe提供一个特定领域语言去管理格式化的数据可以在scalajavapython和r种使用dsl使用dsl语法风格不必去创建临时视图了。 1、创建一个dataframe
scala val df spark.read.json(/opt/module/spark-local/user.json)
df: org.apache.spark.sql.DataFrame [age: bigint name: string]
2、查看dataframe的schema信息
scala df.printSchema
root|-- age: Long (nullable true)|-- name: string (nullable true)
3、只查看“name”列数据 注意列名要用双括号引起来如果是单引号的话只能在前面加一个单引号
scala df.select(name).show()
--------
| name|
--------
|qiaofeng|
| xuzhu|
| duanyu|
--------scala df.select(name).show
--------
| name|
--------
|qiaofeng|
| xuzhu|
| duanyu|
--------
4、查看年龄和姓名且年龄大于18
scala df.select(age,name).where(age18).show
-----------
|age| name|
-----------
| 20|qiaofeng|
| 19| xuzhu|
-----------
5、查看所有列
scala df.select(*).show
-----------
|age| name|
-----------
| 20| qiaofeng|
| 19| xuzhu|
| 18| duanyu|
-----------
6、查看name列数据以及“age1”数据 注意涉及到运算的时候每列都必须使用$或者采用单引号表达式单引号字段名
scala df.select($name,$age 1).show
scala df.select(name, age 1).show()
scala df.select(name, age 1 as newage).show()-----------------
| name |(age 1)|
-----------------
|qiaofeng| 21|
| xuzhu| 20|
| duanyu| 19|
-----------------
7、查看”age“大于”19“的数据
scala df.filter(age19).show
-----------
|age | name|
-----------
| 20|qiaofeng|
-----------
8、按照”age“分组查看数据条数
scala df.groupBy(age).count.show
--------
|age|count|
--------
| 19| 1|
| 18| 1|
| 20| 1|
--------
9、求平均年龄avg(age)
scala df.agg(avg(age)).show
--------
|avg(age)|
--------
| 19.0|
--------
10、求年龄总和sum(age)
scala df.agg(max(age)).show
--------
|max(age)|
--------
| 20|
--------
2.3 dataset
dataset是具有强类型的数据集合需要提供对应的类型信息。
2.3.1 创建dataset基本数据类型
使用基本类型的序列创建dataset。 1、将集合转换为dataset
scala val ds Seq(1,2,3,4,5,6).toDS
ds: org.apache.spark.sql.Dataset[Int] [value: int]
2、查看dataset的值
scala ds.show
-----
|value|
-----
| 1|
| 2|
| 3|
| 4|
| 5|
| 6|
-----
2.3.2 创建dataset样例类序列
使用样例类序列创建dataset。 1、创建一个user的样例类
scala case class User(name: String, age: Long)
defined class User
2、将集合转换为dataset
scala val caseClassDS Seq(User(wangyuyan,18)).toDS()
caseClassDS: org.apache.spark.sql.Dataset[User] [name: string, age: bigint]
3、查看dataset的值
scala caseClassDS.show
------------
| name|age|
------------
|wangyuyan| 18|
------------
注意在实际开发的时候很少会把序列转换成dataset更多是通过rdd和dataframe转换来得到dataset
2.4 rdd、dataframe、dataset相互转换 2.4.1 idea创建sparksql工程
1、创建一个maven工程sparksqltest 2、在项目sparksqltest上点击右键add framework support-勾选scala 3、在main下创建scala文件夹并右键mark directory as sources root-在Scala下创建包名com.atguigu.sparksql 4、输入文件夹准备在新建的sparksqltest项目上右键-新建input文件夹-在input文件夹上右键-新建user.json。并输入如下内容
{age:20,name:qiaofeng}
{age:19,name:xuzhu}
{age:18,name:duanyu}
5、在pom.xml文件中添加spark-sql的依赖和scala的编译插件
dependenciesdependencygroupIdorg.apache.spark/groupIdartifactIdspark-sql_2.12/artifactIdversion3.0.0/version/dependency
/dependenciesbuild
finalNameSparkSQLTest/finalName
pluginsplugingroupIdnet.alchim31.maven/groupIdartifactIdscala-maven-plugin/artifactIdversion3.4.6/versionexecutionsexecutiongoalsgoalcompile/goalgoaltestCompile/goal/goals/execution/executions/plugin/plugins
/build
6、代码实现
package com.atguigu.sparksqlimport org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}object SparkSQL01_input {def main(args: Array[String]): Unit {// 1 创建上下文环境配置对象val conf: SparkConf new SparkConf().setAppName(SparkSQLTest).setMaster(local[*])// 2 创建SparkSession对象val spark: SparkSession SparkSession.builder().config(conf).getOrCreate()// 3 读取数据val df: DataFrame spark.read.json(input/user.json)// 4 可视化df.show()// 5 释放资源spark.stop()}
}
2.4.2 rdd与dataframe相互转换
1、rdd转换为dataframe 手动转换rdd.todf(“列名1”,“列名2”) 通过样例类反射转换userrdd.map{x-user(x._1,x._2)}.todf() 2、dataframe转换为rdd dataframe.rdd 3、在Input/目录下准备user.txt
qiaofeng,20
xuzhu,19
duanyu,18
4、代码实现
package com.atguigu.sparksqlimport org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.{SparkConf, SparkContext}object SparkSQL02_RDDAndDataFrame {def main(args: Array[String]): Unit {//TODO 1 创建SparkConf配置文件,并设置App名称val conf new SparkConf().setAppName(SparkCoreTest).setMaster(local[*])//TODO 2 利用SparkConf创建sc对象val sc new SparkContext(conf)val lineRDD: RDD[String] sc.textFile(input\\user.txt)//普通rdd,数据只有类型,没有列名(缺少元数据)val rdd: RDD[(String, Long)] lineRDD.map {line {val fileds: Array[String] line.split(,)(fileds(0), fileds(1).toLong)}}//TODO 3 利用SparkConf创建sparksession对象val spark: SparkSession SparkSession.builder().config(conf).getOrCreate()//RDD和DF、DS转换必须要导的包(隐式转换),spark指的是上面的sparkSessionimport spark.implicits._//TODO RDDDF//普通rdd转换成DF,需要手动为每一列补上列名(补充元数据)val df: DataFrame rdd.toDF(name, age)df.show()//样例类RDD,数据是一个个的样例类,有类型,有属性名(列名),不缺元数据val userRDD: RDD[User] rdd.map {t {User(t._1, t._2)}}//样例类RDD转换DF,直接toDF转换即可,不需要补充元数据val userDF: DataFrame userRDD.toDF()userDF.show()//TODO DFRDD//DF转换成RDD,直接.rdd即可,但是要注意转换出来的rdd数据类型会变成Rowval rdd1: RDD[Row] df.rddval userRDD2: RDD[Row] userDF.rddrdd1.collect().foreach(println)userRDD2.collect().foreach(println)//如果想获取到row里面的数据,直接row.get(索引)即可val rdd2: RDD[(String, Long)] rdd1.map {row {(row.getString(0), row.getLong(1))}}rdd2.collect().foreach(println)//TODO 4 关闭资源sc.stop()}
}
case class User(name:String,age:Long)
2.4.3 rdd与dataset相互转换
1、rdd转换为dataset rdd.map{x-user(x._1,x._2)},tods() sparksql能够自动将包含有样例类的rdd转换成dataset样例类定义了table的结构样例类属性通过反射编程了表的列名。样例类可以包含诸如seq或者array等复杂的结构。 2、dataset转换为rdd ds.rdd 3、代码实现
package com.atguigu.sparksqlimport org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.{SparkConf, SparkContext}object SparkSQL03_RDDAndDataSet {def main(args: Array[String]): Unit {//TODO 1 创建SparkConf配置文件,并设置App名称val conf new SparkConf().setAppName(SparkCoreTest).setMaster(local[*])//TODO 2 利用SparkConf创建sc对象val sc new SparkContext(conf)val lineRDD: RDD[String] sc.textFile(input\\user.txt)//普通rdd,数据只有类型,没有列名(缺少元数据)val rdd: RDD[(String, Long)] lineRDD.map {line {val fileds: Array[String] line.split(,)(fileds(0), fileds(1).toLong)}}//TODO 3 利用SparkConf创建sparksession对象val spark: SparkSession SparkSession.builder().config(conf).getOrCreate()//RDD和DF、DS转换必须要导的包(隐式转换),spark指的是上面的sparkSessionimport spark.implicits._//TODO RDDDS//普通rdd转DS,没办法补充元数据,因此一般不用val ds: Dataset[(String, Long)] rdd.toDS()ds.show()//样例类RDD,数据是一个个的样例类,有类型,有属性名(列名),不缺元数据val userRDD: RDD[User] rdd.map {t {User(t._1, t._2)}}//样例类RDD转换DS,直接toDS转换即可,不需要补充元数据,因此转DS一定要用样例类RDDval userDs: Dataset[User] userRDD.toDS()userDs.show()//TODO DSRDD//ds转成rdd,直接.rdd即可,并且ds不会改变rdd里面的数据类型val rdd1: RDD[(String, Long)] ds.rddval userRDD2: RDD[User] userDs.rdd//TODO 4 关闭资源sc.stop()}
}
2.4.4 dataframe与dataset相互转换
1、dataframe转为dataset df.as[user] 2、dataset转换为dataframe ds.todf 3、代码实现
package com.atguigu.sparksqlimport org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}object SparkSQL04_DataFrameAndDataSet {def main(args: Array[String]): Unit {// 1 创建上下文环境配置对象val conf: SparkConf new SparkConf().setMaster(local[*]).setAppName(SparkSQLTest)// 2 创建SparkSession对象val spark: SparkSession SparkSession.builder().config(conf).getOrCreate()// 3 读取数据val df: DataFrame spark.read.json(input/user.json)//4.1 RDD和DataFrame、DataSet转换必须要导的包import spark.implicits._// 4.2 DataFrame 转换为DataSetval userDataSet: Dataset[User] df.as[User]userDataSet.show()// 4.3 DataSet转换为DataFrameval userDataFrame: DataFrame userDataSet.toDF()userDataFrame.show()// 5 释放资源spark.stop()}
}case class User(name: String,age: Long)
2.5 用户自定义函数
2.5.1 udf
1、udf一行进入一行出 2、代码实现
package com.atguigu.sparksqlimport org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}object SparkSQL05_UDF{def main(args: Array[String]): Unit {// 1 创建上下文环境配置对象val conf: SparkConf new SparkConf().setMaster(local[*]).setAppName(SparkSQLTest)// 2 创建SparkSession对象val spark: SparkSession SparkSession.builder().config(conf).getOrCreate()// 3 读取数据val df: DataFrame spark.read.json(input/user.json)// 4 创建DataFrame临时视图df.createOrReplaceTempView(user)// 5 注册UDF函数。功能在数据前添加字符串“Name:”spark.udf.register(addName, (x:String) Name: x)// 6 调用自定义UDF函数spark.sql(select addName(name), age from user).show()// 7 释放资源spark.stop()}
}
2.5.2 udaf
1、udaf输入多行返回一行 2、spark3.x推荐使用extends aggregator自定义udaf属于强类型的dataset方式 3、spark2.x使用extends userdefinedaggregatefunction数以弱类型的dataframe 4、案例 需求实现求平均年龄自定义udafmyavg(age) 1自定义聚合函数实现-强类型
package com.atguigu.sparksqlimport org.apache.spark.SparkConf
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.{DataFrame, Encoder, Encoders, SparkSession, functions}object SparkSQL06_UDAF {def main(args: Array[String]): Unit {// 1 创建上下文环境配置对象val conf: SparkConf new SparkConf().setMaster(local[*]).setAppName(SparkSQLTest)// 2 创建SparkSession对象val spark: SparkSession SparkSession.builder().config(conf).getOrCreate()// 3 读取数据val df: DataFrame spark.read.json(input/user.json)// 4 创建DataFrame临时视图df.createOrReplaceTempView(user)// 5 注册UDAFspark.udf.register(myAvg, functions.udaf(new MyAvgUDAF()))// 6 调用自定义UDAF函数spark.sql(select myAvg(age) from user).show()// 7 释放资源spark.stop()}
}//输入数据类型
case class Buff(var sum: Long, var count: Long)/*** 1,20岁 2,19岁 3,18岁* IN:聚合函数的输入类型Long* Buff : sum (181920) count 111* OUT:聚合函数的输出类型Double (181920) / 3*/
class MyAvgUDAF extends Aggregator[Long, Buff, Double] {// 初始化缓冲区override def zero: Buff Buff(0L, 0L)// 将输入的年龄和缓冲区的数据进行聚合override def reduce(buff: Buff, age: Long): Buff {buff.sum buff.sum agebuff.count buff.count 1buff}// 多个缓冲区数据合并override def merge(buff1: Buff, buff2: Buff): Buff {buff1.sum buff1.sum buff2.sumbuff1.count buff1.count buff2.countbuff1}// 完成聚合操作获取最终结果override def finish(buff: Buff): Double {buff.sum.toDouble / buff.count}// SparkSQL对传递的对象的序列化操作编码// 自定义类型就是product 自带类型根据类型选择override def bufferEncoder: Encoder[Buff] Encoders.productoverride def outputEncoder: Encoder[Double] Encoders.scalaDouble
}
第 3 章sparksql数据的加载和保存
3.1 加载数据
1、加载数据通用方法 spark.read.load是加载数据的通用方式 2、代码实现
package com.atguigu.sparksqlimport org.apache.spark.SparkConf
import org.apache.spark.sql._
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._object SparkSQL08_Load{def main(args: Array[String]): Unit {// 1 创建上下文环境配置对象val conf: SparkConf new SparkConf().setMaster(local[*]).setAppName(SparkSQLTest)// 2 创建SparkSession对象val spark: SparkSession SparkSession.builder().config(conf).getOrCreate()// 3.1 spark.read直接读取数据csv format jdbc json load option// options orc parquet schema table text textFile// 注意加载数据的相关参数需写到上述方法中// 如textFile需传入加载数据的路径jdbc需传入JDBC相关参数。spark.read.json(input/user.json).show()// 3.2 format指定加载数据类型// spark.read.format(…)[.option(…)].load(…)// format(…)指定加载的数据类型包括csv、jdbc、json、orc、parquet和text// load(…)在csv、jdbc、json、orc、parquet和text格式下需要传入加载数据路径// option(…)在jdbc格式下需要传入JDBC相应参数url、user、password和dbtablespark.read.format(json).load (input/user.json).show// 4 释放资源spark.stop()}
}
3.2 保存数据
1、保存数据通用方法 df.write.save是保存数据的通用方法 2、代码实现
package com.atguigu.sparksqlimport org.apache.spark.SparkConf
import org.apache.spark.sql._object SparkSQL09_Save{def main(args: Array[String]): Unit {// 1 创建上下文环境配置对象val conf: SparkConf new SparkConf().setMaster(local[*]).setAppName(SparkSQLTest)// 2 创建SparkSession对象val spark: SparkSession SparkSession.builder().config(conf).getOrCreate()// 3 获取数据val df: DataFrame spark.read.json(input/user.json)// 4.1 df.write.保存数据csv jdbc json orc parquet text// 注意保存数据的相关参数需写到上述方法中。如text需传入加载数据的路径JDBC需传入JDBC相关参数。// 默认保存为parquet文件可以修改conf.set(spark.sql.sources.default,json)df.write.save(output)// 默认读取文件parquetspark.read.load(output).show()// 4.2 format指定保存数据类型// df.write.format(…)[.option(…)].save(…)// format(…)指定保存的数据类型包括csv、jdbc、json、orc、parquet和text。// save (…)在csv、orc、parquet和text(单列DF)格式下需要传入保存数据的路径。// option(…)在jdbc格式下需要传入JDBC相应参数url、user、password和dbtabledf.write.format(json).save(output2)// 4.3 可以指定为保存格式直接保存不需要再调用save了df.write.json(output1)// 4.4 如果文件已经存在则追加df.write.mode(append).json(output2)// 如果文件已经存在则忽略(文件存在不报错,也不执行;文件不存在,创建文件)df.write.mode(ignore).json(output2)// 如果文件已经存在则覆盖df.write.mode(overwrite).json(output2)// 默认default:如果文件已经存在则抛出异常// path file:/E:/ideaProject2/SparkSQLTest/output2 already exists.;df.write.mode(error).json(output2)// 5 释放资源spark.stop()}
}
3.3 与mysql交互
1、导入依赖
dependencygroupIdmysql/groupIdartifactIdmysql-connector-java/artifactIdversion5.1.27/version
/dependency
2、从mysql读数据
package com.atguigu.sparksqlimport org.apache.spark.SparkConf
import org.apache.spark.sql._object SparkSQL10_MySQL_Read{def main(args: Array[String]): Unit {// 1 创建上下文环境配置对象val conf: SparkConf new SparkConf().setMaster(local[*]).setAppName(SparkSQLTest)// 2 创建SparkSession对象val spark: SparkSession SparkSession.builder().config(conf).getOrCreate()// 3.1 通用的load方法读取mysql的表数据val df: DataFrame spark.read.format(jdbc).option(url, jdbc:mysql://hadoop102:3306/gmall).option(driver, com.mysql.jdbc.Driver).option(user, root).option(password, 000000).option(dbtable, user_info).load()// 3.2 创建视图df.createOrReplaceTempView(user)// 3.3 查询想要的数据spark.sql(select id, name from user).show()// 4 释放资源spark.stop()}
}
3、向mysql写数据
package com.atguigu.sparksqlimport org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._object SparkSQL11_MySQL_Write {def main(args: Array[String]): Unit {// 1 创建上下文环境配置对象val conf: SparkConf new SparkConf().setMaster(local[*]).setAppName(SparkSQLTest)// 2 创建SparkSession对象val spark: SparkSession SparkSession.builder().config(conf).getOrCreate()// 3 准备数据// 注意id是主键不能和MySQL数据库中的id重复val rdd: RDD[User] spark.sparkContext.makeRDD(List(User(3000, zhangsan), User(3001, lisi)))val ds: Dataset[User] rdd.toDS// 4 向MySQL中写入数据ds.write.format(jdbc).option(url, jdbc:mysql://hadoop102:3306/gmall).option(driver, com.mysql.jdbc.Driver).option(user, root).option(password, 000000).option(dbtable, user_info).mode(SaveMode.Append).save()// 5 释放资源spark.stop()}case class User(id: Int, name: String)
}
3.4 与hive交互
sparksql可以采用内嵌hive也可以采用外部hive。企业开发中通常采用外部hive。
3.4.1 内嵌hive应用
内嵌hive元数据存储在derby数据库 1、如果使用spark内嵌的hive则什么都不用做直接使用即可。
[atguiguhadoop102 spark-local]$ bin/spark-shellscala spark.sql(show tables).show
注意执行完后发现多了$spark_home/metastore_db和derby.log用于存储元数据。 2、创建一个表
scala spark.sql(create table user(id int, name string))注意执行完后发现多了$spark_home/spark-warehouse/user用于存储数据库数据。 3、查看数据库
scala spark.sql(show tables).show4、向表中插入数据
scala spark.sql(insert into user values(1,zs))5、查询数据
scala spark.sql(select * from user).show注意然而在实际使用中几乎没有任何人会使用内置的hive因为元数据存储在derby数据库不支持多客户端访问。
3.4.2 外部hive应用
如果spark要接管hive外部已经部署好的hive需要通过一下几个步骤。 1、为了说明内嵌hive和外部hive区别删除内嵌hive的metastore_db和spark-warehouse
[atguiguhadoop102 spark-local]$ rm -rf metastore_db/ spark-warehouse/2、确定原有hive是正常工作的
[atguiguhadoop102 hadoop-3.1.3]$ sbin/start-dfs.sh
[atguiguhadoop103 hadoop-3.1.3]$ sbin/start-yarn.sh[atguiguhadoop102 hive]$ bin/hive
3、需要把hive-site.xml拷贝到spark的conf/目录下
[atguiguhadoop102 conf]$ cp hive-site.xml /opt/module/spark-local/conf/4、如果以前hive-site.xml文件中配置过tez相关信息注释掉不是必须 5、把mysql的驱动copy到spark的jars/目录下
[atguiguhadoop102 software]$ cp mysql-connector-java-5.1.48.jar /opt/module/spark-local/jars/6、需要提前启动hive服务/opt/module/hive/bin/hiveservices.sh start不是必须 7、如果访问不到hdfs则需把core-site.xml和hdfs-site.xml拷贝到conf/目录不是必须 8、启动spark-shell
[atguiguhadoop102 spark-local]$ bin/spark-shell9、查询表
scala spark.sql(show tables).show10、创建一个表
scala spark.sql(create table student(id int, name string))11、向表中插入数据
scala spark.sql(insert into student values(1,zs))12、查询数据
scala spark.sql(select * from student).show3.4.3 运行spark sql cli
spark sql cli可以方便的在本地下运行hive元数据服务以及从命令行执行查询任务。在spark目录下执行如下命令启动spark sql cli直接执行sql语句类型hive窗口。
[atguiguhadoop102 spark-local]$ bin/spark-sqlspark-sql (default) show tables;
3.4.4 idea操作外部hive
1、添加依赖
dependenciesdependencygroupIdorg.apache.spark/groupIdartifactIdspark-sql_2.12/artifactIdversion3.0.0/version/dependencydependencygroupIdmysql/groupIdartifactIdmysql-connector-java/artifactIdversion5.1.27/version/dependencydependencygroupIdorg.apache.spark/groupIdartifactIdspark-hive_2.12/artifactIdversion3.0.0/version/dependency
/dependencies
2、拷贝hive-site.xml到resources目录如果需要操作hadoop需要拷贝hdfs-site.xml、core-site.xml、yarn-site.xml 3、代码实现
package com.atguigu.sparksqlimport org.apache.spark.SparkConf
import org.apache.spark.sql._object SparkSQL12_Hive {def main(args: Array[String]): Unit {System.setProperty(HADOOP_USER_NAME,atguigu)// 1 创建上下文环境配置对象val conf: SparkConf new SparkConf().setMaster(local[*]).setAppName(SparkSQLTest)// 2 创建SparkSession对象val spark: SparkSession SparkSession.builder().enableHiveSupport().config(conf).getOrCreate()// 3 连接外部Hive并进行操作spark.sql(show tables).show()spark.sql(create table user3(id int, name string))spark.sql(insert into user3 values(1,zs))spark.sql(select * from user3).show// 4 释放资源spark.stop()}
}