现货电子交易平台,冬镜seo,wordpress 搜索栏,中色十二冶金建设集团有限公司网站1. Spark SQL概述 1.1 什么是Spark SQL Spark SQL是用于结构化数据处理的Spark模块。与基本的Spark RDD API不同#xff0c;Spark SQL提供的接口为Spark提供了有关数据结构和正在执行的计算的更多信息。在内部#xff0c;Spark SQL使用这些额外的信息来执行额外的优化。与Spa… 1. Spark SQL概述 1.1 什么是Spark SQL Spark SQL是用于结构化数据处理的Spark模块。与基本的Spark RDD API不同Spark SQL提供的接口为Spark提供了有关数据结构和正在执行的计算的更多信息。在内部Spark SQL使用这些额外的信息来执行额外的优化。与Spark SQL交互的方式有多种包括SQL和Dataset API。计算结果时使用相同的执行引擎与您用于表达计算的API/语言无关。 1.2 为什么要有Spark SQL 1.3 SparkSQL的发展 1发展历史 RDDSpark1.0》DataframeSpark1.3》DatasetSpark1.6 如果同样的数据都给到这三个数据结构他们分别计算之后都会给出相同的结果。不同的是他们的执行效率和执行方式。在现在的版本中dataSet性能最好已经成为了唯一使用的接口。其中Dataframe已经在底层被看做是特殊泛型的DataSetRow。 2三者的共性 1RDD、DataFrame、DataSet全都是Spark平台下的分布式弹性数据集为处理超大型数据提供便利。 2三者都有惰性机制在进行创建、转换如map方法时不会立即执行只有在遇到Action行动算子如foreach时三者才会开始遍历运算。 3三者有许多共同的函数如filter排序等。 4三者都会根据Spark的内存情况自动缓存运算。 5三者都有分区的概念。 1.4 Spark SQL的特点 1易整合 无缝的整合了SQL查询和Spark编程。 2统一的数据访问方式 使用相同的方式连接不同的数据源。 3兼容Hive 在已有的仓库上直接运行SQL或者HQL。 4标准的数据连接 通过JDBC或者ODBC来连接。 1.5 SparkSession新的起始点 在老的版本中SparkSQL提供两种SQL查询起始点 一个叫SQLContext用于Spark自己提供的SQL查询一个叫HiveContext用于连接Hive的查询。 SparkSession是Spark最新的SQL查询起始点实质上是SQLContext和HiveContext的组合所以在SQLContext和HiveContext上可用的API在SparkSession上同样是可以使用的。 SparkSession内部封装了SparkContext所以计算实际上是由SparkContext完成的。当我们使用spark-shell的时候Spark框架会自动的创建一个名称叫做Spark的SparkSession就像我们以前可以自动获取到一个sc来表示SparkContext。 [atguiguhadoop102 spark-local]$ bin/spark-shell 20/09/12 11:16:35 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable Using Sparks default log4j profile: org/apache/spark/log4j-defaults.properties Setting default log level to WARN. To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). Spark context Web UI available at http://hadoop102:4040 Spark context available as sc (master local[*], app id local-1599880621394). Spark session available as spark. Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ / __/ _/ /___/ .__/\_,_/_/ /_/\_\ version 3.3.1 /_/ Using Scala version 2.12.10 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_212) Type in expressions to have them evaluated. Type :help for more information. 2 常用方式 2.1 方法调用 1创建一个maven工程SparkSQL 2创建包名为com.atguigu.sparksql 3输入文件夹准备在新建的SparkSQL项目名称上右键》新建input文件夹》在input文件夹上右键》新建user.json。并输入如下内容 {age:20,name:qiaofeng}{age:19,name:xuzhu}{age:18,name:duanyu} {age:22,name:qiaofeng} {age:11,name:xuzhu} {age:12,name:duanyu} 5在pom.xml文件中添加spark-sql的依赖 dependencies dependency groupIdorg.apache.spark/groupId artifactIdspark-sql_2.12/artifactId version3.3.1/version /dependency dependency groupIdorg.projectlombok/groupId artifactIdlombok/artifactId version1.18.22/version /dependency /dependencies 6代码实现 添加javaBean的User package com.atguigu.sparksql.Bean; import lombok.Data; import java.io.Serializable; Data public class User implements Serializable { public Long age; public String name; public User() { } public User(Long age, String name) { this.age age; this.name name; } } 代码编写 package com.atguigu.sparksql; import com.atguigu.sparksql.Bean.User; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.ReduceFunction; import org.apache.spark.sql.*; import scala.Tuple2; public class Test01_Method { public static void main(String[] args) { //1. 创建配置对象 SparkConf conf new SparkConf().setAppName(sparksql).setMaster(local[*]); //2. 获取sparkSession SparkSession spark SparkSession.builder().config(conf).getOrCreate(); //3. 编写代码 // 按照行读取 DatasetRow lineDS spark.read().json(input/user.json); // 转换为类和对象 DatasetUser userDS lineDS.as(Encoders.bean(User.class)); // userDS.show(); // 使用方法操作 // 函数式的方法 DatasetUser userDataset lineDS.map(new MapFunctionRow, User() { Override public User call(Row value) throws Exception { return new User(value.getLong(0), value.getString(1)); } }, // 使用kryo在底层会有部分算子无法使用 Encoders.bean(User.class)); // 常规方法 DatasetUser sortDS userDataset.sort(new Column(age)); sortDS.show(); // 区分 RelationalGroupedDataset groupByDS userDataset.groupBy(name); // 后续方法不同 DatasetRow count groupByDS.count(); // 推荐使用函数式的方法 使用更灵活 KeyValueGroupedDatasetString, User groupedDataset userDataset.groupByKey(new MapFunctionUser, String() { Override public String call(User value) throws Exception { return value.name; } }, Encoders.STRING()); // 聚合算子都是从groupByKey开始 // 推荐使用reduceGroup DatasetTuple2String, User result groupedDataset.reduceGroups(new ReduceFunctionUser() { Override public User call(User v1, User v2) throws Exception { // 取用户的大年龄 return new User(Math.max(v1.age, v2.age), v1.name); } }); result.show(); //4. 关闭sparkSession spark.close(); } } 在sparkSql中DS直接支持的转换算子有map底层已经优化为mapPartition、mapPartition、flatMap、groupByKey聚合算子全部由groupByKey开始、filter、distinct、coalesce、repartition、sort和orderBy不是函数式的算子不过不影响使用。 2.2 SQL使用方式 package com.atguigu.sparksql; import org.apache.spark.SparkConf; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; public class Test02_SQL { public static void main(String[] args) { //1. 创建配置对象 SparkConf conf new SparkConf().setAppName(sparksql).setMaster(local[*]); //2. 获取sparkSession SparkSession spark SparkSession.builder().config(conf).getOrCreate(); //3. 编写代码 DatasetRow lineDS spark.read().json(input/user.json); // 创建视图 转换为表格 填写表名 // 临时视图的生命周期和当前的sparkSession绑定 // orReplace表示覆盖之前相同名称的视图 lineDS.createOrReplaceTempView(t1); // 支持所有的hive sql语法,并且会使用spark的又花钱 DatasetRow result spark.sql(select * from t1 where age 18); result.show(); //4. 关闭sparkSession spark.close(); } }} 2.3 DSL特殊语法扩展 package com.atguigu.sparksql; import org.apache.spark.SparkConf; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; import static org.apache.spark.sql.functions.col; public class Test03_DSL { public static void main(String[] args) { //1. 创建配置对象 SparkConf conf new SparkConf().setAppName(sparksql).setMaster(local[*]); //2. 获取sparkSession SparkSession spark SparkSession.builder().config(conf).getOrCreate(); //3. 编写代码 // 导入特殊的依赖 import static org.apache.spark.sql.functions.col; DatasetRow lineRDD spark.read().json(input/user.json); DatasetRow result lineRDD.select(col(name).as(newName),col(age).plus(1).as(newAge)) .filter(col(age).gt(18)); result.show(); //4. 关闭sparkSession spark.close(); } } 3 SQL语法的用户自定义函数 3.1 UDF 用户自定义函数 1UDF一行进入一行出 2代码实现 package com.atguigu.sparksql; import org.apache.spark.SparkConf; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.api.java.UDF1; import org.apache.spark.sql.expressions.UserDefinedFunction; import org.apache.spark.sql.types.DataTypes; import static org.apache.spark.sql.functions.udf; public class Test04_UDF { public static void main(String[] args) { //1. 创建配置对象 SparkConf conf new SparkConf().setAppName(sparksql).setMaster(local[*]); //2. 获取sparkSession SparkSession spark SparkSession.builder().config(conf).getOrCreate(); //3. 编写代码 DatasetRow lineRDD spark.read().json(input/user.json); lineRDD.createOrReplaceTempView(user); // 定义一个函数 // 需要首先导入依赖import static org.apache.spark.sql.functions.udf; UserDefinedFunction addName udf(new UDF1String, String() { Override public String call(String s) throws Exception { return s 大侠; } }, DataTypes.StringType); spark.udf().register(addName,addName); spark.sql(select addName(name) newName from user) .show(); // lambda表达式写法 spark.udf().register(addName1,(UDF1String,String) name - name 大侠,DataTypes.StringType); //4. 关闭sparkSession spark.close(); } } 3.2 UDAF 用户自定义聚合函数 1UDAF输入多行返回一行。通常和groupBy一起使用如果直接使用UDAF函数默认将所有的数据合并在一起。 2Spark3.x推荐使用extends Aggregator自定义UDAF属于强类型的Dataset方式。 3Spark2.x使用extends UserDefinedAggregateFunction属于弱类型的DataFrame 4案例实操 需求实现求平均年龄自定义UDAFMyAvg(age) 1自定义聚合函数实现-强类型 package com.atguigu.sparksql; import org.apache.spark.SparkConf; import org.apache.spark.sql.Encoder; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.expressions.Aggregator; import java.io.Serializable; import static org.apache.spark.sql.functions.udaf; public class Test05_UDAF { public static void main(String[] args) { //1. 创建配置对象 SparkConf conf new SparkConf().setAppName(sparksql).setMaster(local[*]); //2. 获取sparkSession SparkSession spark SparkSession.builder().config(conf).getOrCreate(); //3. 编写代码 spark.read().json(input/user.json).createOrReplaceTempView(user); // 注册需要导入依赖 import static org.apache.spark.sql.functions.udaf; spark.udf().register(avgAge,udaf(new MyAvg(),Encoders.LONG())); spark.sql(select avgAge(age) newAge from user).show(); //4. 关闭sparkSession spark.close(); } public static class Buffer implements Serializable { private Long sum; private Long count; public Buffer() { } public Buffer(Long sum, Long count) { this.sum sum; this.count count; } public Long getSum() { return sum; } public void setSum(Long sum) { this.sum sum; } public Long getCount() { return count; } public void setCount(Long count) { this.count count; } } public static class MyAvg extends AggregatorLong,Buffer,Double{ Override public Buffer zero() { return new Buffer(0L,0L); } Override public Buffer reduce(Buffer b, Long a) { b.setSum(b.getSum() a); b.setCount(b.getCount() 1); return b; } Override public Buffer merge(Buffer b1, Buffer b2) { b1.setSum(b1.getSum() b2.getSum()); b1.setCount(b1.getCount() b2.getCount()); return b1; } Override public Double finish(Buffer reduction) { return reduction.getSum().doubleValue() / reduction.getCount(); } Override public EncoderBuffer bufferEncoder() { // 可以用kryo进行优化 return Encoders.kryo(Buffer.class); } Override public EncoderDouble outputEncoder() { return Encoders.DOUBLE(); } } } 3.3 UDTF没有 输入一行返回多行Hive。 SparkSQL中没有UDTF需要使用算子类型的flatMap先完成拆分。