优秀的定制网站建设制作商,深圳制作小程序,html 音乐网站,松原网站推广一.Spark SQL基本介绍
1.什么是Spark SQL
Spark SQL是Spark多种组件中其中一个,主要是用于处理大规模的[结构化数据]
Spark SQL的特点:
1).融合性:既可以使用SQL语句,也可以编写代码,同时支持两者混合使用.
2).统一的数据访问:Spark SQL用统一的API对接不同的数据源
3).H…一.Spark SQL基本介绍
1.什么是Spark SQL
Spark SQL是Spark多种组件中其中一个,主要是用于处理大规模的[结构化数据]
Spark SQL的特点:
1).融合性:既可以使用SQL语句,也可以编写代码,同时支持两者混合使用.
2).统一的数据访问:Spark SQL用统一的API对接不同的数据源
3).Hive的兼容性:Spark SQL可以和Hive进行整合,合并后将执行引擎换成Spark,核心是基于hive的metastore来处理.
4).标准化连接:Spark SQL支持JDBC/ODBC连接
2.Spark SQL和Hive的异同点
相同点:
①都是分布式SQL计算引擎
②都可以处理大规模结构化数据
③都可以建立在Yarn集群上运行
不同点:
①Spark SQL的底层是RDD,Hive SQL的底层是MapReduce
②Spark SQL既可以编写SQL语句,又可以编写代码,而Hive SQL只可以编写SQL语句
③Spark SQL没有元数据管理服务,而Hive SQL有metastore管理元数据服务
④Spark SQL是基于内存运行的,Hive SQL是基于磁盘运行的
3.Spark SQL的数据结构对比 说明: pandas的DataFrame:二维表 处理单机结构数据 Spark Core:处理任何的数据结构,处理大规模的分布式数据 Spark SQL:二维表,处理大规模的分布式结构数据 RDD:存储直接就是对象,比如在图中,存储就是一个Person的对象,但是里面是什么数据内容,不太清楚. DataFrame:将Person中各个字段数据,进行结构化存储,形成一个DataFrame,可以直接看到数据 Dataset:将Person对象中数据都按照结构化的方式存储好,同时保留对象的类型,从而知道来源于一个Person对象 由于Python不支持泛型,所以无法使用Dataset类型,客户端仅支持DataFrame类型 二.DataFrame详解
1.DataFrame基本介绍 DataFrame表示的是一个二维的表,二维表,必然存在行,列等表结构描述信息. 表结构描述信息(元数据Schema) :StructType对象 字段:StructField对象,可以描述字段名称,字段数据类型,是否可以为空 行:Row对象 列:Column对象,包含字段名称和字段值 在一个StructType对象下,由多个StructField组成,构建成一个完整的元数据信息 2.DataFrame的构建方式
2.1 通过RDD得到一个DataFrame
from pyspark import SparkConf, SparkContext
import os
from pyspark.sql import SparkSession# 绑定指定的Python解释器
from pyspark.sql.types import StructType, IntegerType, StringType, StructFieldos.environ[SPARK_HOME] /export/server/spark
os.environ[PYSPARK_PYTHON] /root/anaconda3/bin/python3
os.environ[PYSPARK_DRIVER_PYTHON] /root/anaconda3/bin/python3if __name__ __main__:# 1- 创建SparkSession对象spark SparkSession.builder\.appName(rdd_2_dataframe)\.master(local[*])\.getOrCreate()# 通过SparkSession得到SparkContextsc spark.sparkContext# 2- 数据输入# 2.1- 创建一个RDDinit_rdd sc.parallelize([1,李白,20,2,安其拉,18])# 2.2- 将RDD的数据结构转换成二维结构new_rdd init_rdd.map(lambda line: (int(line.split(,)[0]),line.split(,)[1],int(line.split(,)[2])))# 将RDD转成DataFrame方式一# schema方式一schema StructType()\.add(id,IntegerType(),False)\.add(name,StringType(),False)\.add(age,IntegerType(),False)# schema方式二schema StructType([StructField(id,IntegerType(),False),StructField(name,StringType(),False),StructField(age,IntegerType(),False)])# schema方式三schema id:int,name:string,age:int# schema方式四schema [id,name,age]init_df spark.createDataFrame(datanew_rdd,schemaschema)# 将RDD转成DataFrame方式二toDF中的schema既可以传List也可以传字符串形式的schema信息# init_df new_rdd.toDF(schema[id,name,age])init_df new_rdd.toDF(schemaid:int,name:string,age:int)# 3- 数据处理# 4- 数据输出init_df.show()init_df.printSchema()# 5- 释放资源sc.stop()spark.stop()
场景:RDD可以存储任意结构的数据而DataFrame只能处理二维表数据。在使用Spark处理数据的初期可能输入进来的数据是半结构化或者是非结构化的数据那么我可以先通过RDD对数据进行ETL处理成结构化数据再使用开发效率高的SparkSQL来对后续数据进行处理分析。
2.2 内部初始化数据得到DataFrame
from pyspark import SparkConf, SparkContext
import os# 绑定指定的Python解释器
from pyspark.sql import SparkSessionos.environ[SPARK_HOME] /export/server/spark
os.environ[PYSPARK_PYTHON] /root/anaconda3/bin/python3
os.environ[PYSPARK_DRIVER_PYTHON] /root/anaconda3/bin/python3if __name__ __main__:print(内部初始化数据得到DataFrame。类似SparkCore中的parallelize)# 1- 创建SparkSession顶级对象spark SparkSession.builder\.appName(inner_create_dataframe)\.master(local[*])\.getOrCreate()# 2- 数据输入通过createDataFrame创建DataFrameschema数据类型可以是DataType、字符串、List字符串格式要求格式一 字段1 字段类型,字段2 字段类型格式二推荐 字段1:字段类型,字段2:字段类型List格式要求[字段1,字段2]# 内部初始化数据得到DataFrameinit_df spark.createDataFrame(data[(1,张三,18),(2,李四,30)],schemaid:int,name:string,age:int)# init_df spark.createDataFrame(# data[(1, 张三, 18), (2, 李四, 30)],# schemaid int,name string,age int# )# init_df spark.createDataFrame(# data[(1, 张三, 18), (2, 李四, 30)],# schema[id,name,age]# )# init_df spark.createDataFrame(# data[(1, 张三, 18), (2, 李四, 30)],# schema[id:int, name:string, age:int]# )# 3- 数据处理# 4- 数据输出# 输出dataframe的数据内容init_df.show()# 输出dataframe的schema信息init_df.printSchema()# 5- 释放资源spark.stop()
场景:一般用在开发和测试中,因为只能处理少量的数据
Schema总结 通过createDataFrame创建DataFrame,schema数据类型可以是:DataType,字符串,List 1:字符串 格式一 字段1 字段类型,字段2 字段类型 格式二 字段1:字段类型,字段2:字段类型 2:List [字段1,字段2] 3:DataType 格式一 schema StructType().add(id,IntegerType(),False) .add(id,IntegerType(),False).add(id,IntegerType(),False) 格式二 schema StructType([StructField(id,IntegerType,False), StructField(id,IntegerType,False), StructField(id,IntegerType,False)]) 2.3 读取外部文件
复杂API 统一API格式: sparksession.read .format(text|csv|json|parquet|orc|avro|jdbc|...) .option(k,v) .schema(StructType | String) .load(加载数据路径) #读取外部文件的路径,支持HDFS也支持本地 简写API 请注意: 以上所有的外部读取方式都有简单的写法。spark内置了一些常用的读取方案的简写 格式:spark.read.读取方式() 例如: df spark.read.csv( path file:///export/data/_03_spark_sql/data/stu.txt,headerTrue,sep ,inferSchemaTrue,encodingutf-8) 2.3.1 Text方式读取 from pyspark import SparkConf, SparkContext import os from pyspark.sql import SparkSession # 绑定指定的Python解释器 os.environ[SPARK_HOME] /export/server/spark os.environ[PYSPARK_PYTHON] /root/anaconda3/bin/python3 os.environ[PYSPARK_DRIVER_PYTHON] /root/anaconda3/bin/python3 if __name__ __main__: print(text方式读取文件) # 1- 创建SparkSession对象 spark SparkSession.builder\ .appName(text_demo)\ .master(local[*])\ .getOrCreate() # 2- 数据输入 load支持读取HDFS文件系统和本地文件系统 HDFS文件系统hdfs://node1:8020/文件路径 本地文件系统file:///文件路径 text方式读取文件总结 1- 不管文件中内容是什么样的text会将所有内容全部放到一个列中处理 2- 默认生成的列名叫value数据类型string 3- 我们只能够在schema中修改字段value的名称其他任何内容不能修改 init_df spark.read\ .format(text)\ .schema(my_field string)\ .load(file:///export/data/gz16_pyspark/02_spark_sql/data/stu.txt) # 3- 数据处理 # 4- 数据输出 init_df.show() init_df.printSchema() # 5- 释放资源 spark.stop() text方式读取文件总结: 1-不管文件中内容是什么样的,text会将所有内容全部放到一个列中处理 2-默认生成的列名叫value,数据类型string 3-我们只能够在schema中修改字段value的名称,其他任何内容不能修改 2.3.2 CSV方式读取
from pyspark import SparkConf, SparkContext
import os
from pyspark.sql import SparkSession# 绑定指定的Python解释器
os.environ[SPARK_HOME] /export/server/spark
os.environ[PYSPARK_PYTHON] /root/anaconda3/bin/python3
os.environ[PYSPARK_DRIVER_PYTHON] /root/anaconda3/bin/python3if __name__ __main__:print(csv方式读取文件)# 1- 创建SparkSession对象spark SparkSession.builder\.appName(csv_demo)\.master(local[*])\.getOrCreate()# 2- 数据输入csv格式读取外部文件总结1- 复杂API和简写API都必须掌握2- 相关参数作用说明2.1- path指定读取的文件路径。支持HDFS和本地文件路径2.2- schema手动指定元数据信息2.3- sep指定字段间的分隔符2.4- encoding指定文件的编码方式2.5- header指定文件中的第一行是否是字段名称2.6- inferSchema根据数据内容自动推断数据类型。但是推断结果可能不精确# 复杂API写法init_df spark.read\.format(csv)\.schema(id int,name string,address string,sex string,age int)\.option(sep, )\.option(encoding,UTF-8)\.option(header,True)\.load(file:///export/data/gz16_pyspark/02_spark_sql/data/stu.txt)# 简写API写法# init_df spark.read.csv(# pathfile:///export/data/gz16_pyspark/02_spark_sql/data/stu.txt,# schemaid int,name string,address string,sex string,age int,# sep ,# encodingUTF-8,# headerTrue# )# init_df spark.read.csv(# pathfile:///export/data/gz16_pyspark/02_spark_sql/data/stu.txt,# sep ,# encodingUTF-8,# headerTrue,# inferSchemaTrue# )# 3- 数据处理# 4- 数据输出init_df.show()init_df.printSchema()# 5- 释放资源spark.stop() csv格式读取外部文件总结: 1-相关参数说明: 1.1 path:文件路径,HDFS和本地 1.2 schema:手动指定元数据信息 1.3 sep:指定字段间的分隔符 1.4 encoding:指定文件的编码方式 1.5 header:指定文件中的第一行是否是字段名称 1.6 inferSchema:根据数据内容自动推断数据类型,但是推断结果可能不精确 2.3.3 JSON方式读取
json的数据内容 {id: 1,name: 张三,age: 20} {id: 2,name: 李四,age: 23,address: 北京} {id: 3,name: 王五,age: 25} {id: 4,name: 赵六,age: 29} 代码实现:
from pyspark import SparkConf, SparkContext
import os
from pyspark.sql import SparkSession# 绑定指定的Python解释器
os.environ[SPARK_HOME] /export/server/spark
os.environ[PYSPARK_PYTHON] /root/anaconda3/bin/python3
os.environ[PYSPARK_DRIVER_PYTHON] /root/anaconda3/bin/python3if __name__ __main__:# 1- 创建SparkSession对象spark SparkSession.builder\.appName(json_demo)\.master(local[*])\.getOrCreate()# 2- 数据输入json读取数据总结1- 需要手动指定schema信息。如果手动指定的时候字段名称与json中的key名称不一致会解析不成功以null值填充2- csv/json中schema的结构如果是字符串类型那么字段名称和字段数据类型间只能以空格分隔# init_df spark.read.json(# pathfile:///export/data/gz16_pyspark/02_spark_sql/data/data.txt,# schemaid2 int,name string,age int,address string,# encodingUTF-8# )# init_df spark.read.json(# pathfile:///export/data/gz16_pyspark/02_spark_sql/data/data.txt,# schemaid:int,name:string,age:int,address:string,# encodingUTF-8# )init_df spark.read.json(pathfile:///export/data/gz16_pyspark/02_spark_sql/data/data.txt,schemaid int,name string,age int,address string,encodingUTF-8)# 3- 数据输出init_df.show()init_df.printSchema()# 4- 释放资源spark.stop() json读取数据总结: 1-需要手动指定schema信息,如果手动指定的时候,字段名称与json中的key名称不一致,会解析不成功,以null值填充 2-csv/json中schema的结构,如果是字符串类型,那么字段名称和字段数据类型间,只能以空格分隔 3.DataFrame的相关API
操作DataFrame一般有两种操作方案:一种为DSL方式,另一种为SQL方式 SQL方式: 通过编写SQL语句完成统计分析操作 DSL方式: 特定领域语言使用DataFrame特有的API完成计算操作也就是代码形式 从使用角度来说: SQL可能更加的方便一些当适应了DSL写法后你会发现DSL要比SQL更好用 从Spark角度来说: 更推荐使用DSL方案此种方案更加利于Spark底层的优化处理 3.1 SQL相关的API
创建一个视图/表 df.createTempView(视图名称): 创建一个临时的视图(表名) df.createOrReplaceTempView(视图名称): 创建一个临时的视图(表名)如果视图存在直接替换 临时视图仅能在当前这个Spark Session的会话中使用 df.createGlobalTempView(视图名称): 创建一个全局视图运行在一个Spark应用中多个spark会话中都可以使用。在使用的时候必须通过 global_temp.视图名称 方式才可以加载到。较少使用 执行SQL语句
spark.sql(书写SQL)
3.2 DSL相关的API
show():用于展示DF中数据,默认仅展示前20行
参数1:设置默认展示多少行,默认为20
参数2:是否为阶段列,默认仅展示前20个字符数据,如果过长,不展示
printSchema():用于打印当前这个DF的表结构信息
select():类似于SQL中select, SQL中select后面可以写什么, 这样同样也一样 filter()和 where()用于对数据进行过滤操作, 一般在spark SQL中主要使用where groupBy()用于执行分组操作 orderBy()用于执行排序操作
DSL主要支持以下几种传递的方式: str | Column对象 | 列表 str格式: 字段 Column对象: DataFrame含有的字段 df[字段] 执行过程新产生: F.col(字段) 列表: [字段1,字段2...] [df[字段1],df[字段2]] 为了能够支持在编写Spark SQL的DSL时候在DSL中使用SQL函数专门提供一个SQL的函数库。直接加载使用即可 导入这个函数库: import pyspark.sql.functions as F 通过F调用对应的函数即可。SparkSQL中所支持的函数都可以通过以下地址查询到: https://spark.apache.org/docs/3.1.2/api/sql/index.html