当前位置: 首页 > news >正文

现货电子交易平台冬镜seo

现货电子交易平台,冬镜seo,wordpress 搜索栏,中色十二冶金建设集团有限公司网站1. Spark SQL概述 1.1 什么是Spark SQL Spark SQL是用于结构化数据处理的Spark模块。与基本的Spark RDD API不同#xff0c;Spark SQL提供的接口为Spark提供了有关数据结构和正在执行的计算的更多信息。在内部#xff0c;Spark SQL使用这些额外的信息来执行额外的优化。与Spa… 1. Spark SQL概述 1.1 什么是Spark SQL Spark SQL是用于结构化数据处理的Spark模块。与基本的Spark RDD API不同Spark SQL提供的接口为Spark提供了有关数据结构和正在执行的计算的更多信息。在内部Spark SQL使用这些额外的信息来执行额外的优化。与Spark SQL交互的方式有多种包括SQL和Dataset API。计算结果时使用相同的执行引擎与您用于表达计算的API/语言无关。 1.2 为什么要有Spark SQL 1.3 SparkSQL的发展 1发展历史 RDDSpark1.0》DataframeSpark1.3》DatasetSpark1.6 如果同样的数据都给到这三个数据结构他们分别计算之后都会给出相同的结果。不同的是他们的执行效率和执行方式。在现在的版本中dataSet性能最好已经成为了唯一使用的接口。其中Dataframe已经在底层被看做是特殊泛型的DataSetRow。 2三者的共性 1RDD、DataFrame、DataSet全都是Spark平台下的分布式弹性数据集为处理超大型数据提供便利。 2三者都有惰性机制在进行创建、转换如map方法时不会立即执行只有在遇到Action行动算子如foreach时三者才会开始遍历运算。 3三者有许多共同的函数如filter排序等。 4三者都会根据Spark的内存情况自动缓存运算。 5三者都有分区的概念。 1.4 Spark SQL的特点 1易整合 无缝的整合了SQL查询和Spark编程。 2统一的数据访问方式 使用相同的方式连接不同的数据源。 3兼容Hive 在已有的仓库上直接运行SQL或者HQL。 4标准的数据连接 通过JDBC或者ODBC来连接。 1.5 SparkSession新的起始点 在老的版本中SparkSQL提供两种SQL查询起始点 一个叫SQLContext用于Spark自己提供的SQL查询一个叫HiveContext用于连接Hive的查询。 SparkSession是Spark最新的SQL查询起始点实质上是SQLContext和HiveContext的组合所以在SQLContext和HiveContext上可用的API在SparkSession上同样是可以使用的。 SparkSession内部封装了SparkContext所以计算实际上是由SparkContext完成的。当我们使用spark-shell的时候Spark框架会自动的创建一个名称叫做Spark的SparkSession就像我们以前可以自动获取到一个sc来表示SparkContext。 [atguiguhadoop102 spark-local]$ bin/spark-shell 20/09/12 11:16:35 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable Using Sparks default log4j profile: org/apache/spark/log4j-defaults.properties Setting default log level to WARN. To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). Spark context Web UI available at http://hadoop102:4040 Spark context available as sc (master  local[*], app id  local-1599880621394). Spark session available as spark. Welcome to       ____              __      / __/__  ___ _____/ /__     _\ \/ _ \/ _ / __/  _/    /___/ .__/\_,_/_/ /_/\_\   version 3.3.1       /_/ Using Scala version 2.12.10 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_212) Type in expressions to have them evaluated. Type :help for more information. 2 常用方式 2.1 方法调用 1创建一个maven工程SparkSQL 2创建包名为com.atguigu.sparksql 3输入文件夹准备在新建的SparkSQL项目名称上右键》新建input文件夹》在input文件夹上右键》新建user.json。并输入如下内容 {age:20,name:qiaofeng}{age:19,name:xuzhu}{age:18,name:duanyu} {age:22,name:qiaofeng} {age:11,name:xuzhu} {age:12,name:duanyu} 5在pom.xml文件中添加spark-sql的依赖 dependencies     dependency        groupIdorg.apache.spark/groupId        artifactIdspark-sql_2.12/artifactId        version3.3.1/version     /dependency     dependency        groupIdorg.projectlombok/groupId        artifactIdlombok/artifactId        version1.18.22/version     /dependency /dependencies 6代码实现 添加javaBean的User package com.atguigu.sparksql.Bean; import lombok.Data; import java.io.Serializable; Data public class User implements Serializable {     public Long age;     public String name;     public User() {     }     public User(Long age, String name) {         this.age  age;         this.name  name;     } } 代码编写 package com.atguigu.sparksql; import com.atguigu.sparksql.Bean.User; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.ReduceFunction; import org.apache.spark.sql.*; import scala.Tuple2; public class Test01_Method {     public static void main(String[] args) {         //1. 创建配置对象         SparkConf conf  new SparkConf().setAppName(sparksql).setMaster(local[*]);         //2. 获取sparkSession         SparkSession spark  SparkSession.builder().config(conf).getOrCreate();         //3. 编写代码         // 按照行读取         DatasetRow lineDS  spark.read().json(input/user.json);         // 转换为类和对象         DatasetUser userDS  lineDS.as(Encoders.bean(User.class)); //        userDS.show();         // 使用方法操作         // 函数式的方法         DatasetUser userDataset  lineDS.map(new MapFunctionRow, User() {             Override             public User call(Row value) throws Exception {                 return new User(value.getLong(0), value.getString(1));             }         },                 // 使用kryo在底层会有部分算子无法使用                 Encoders.bean(User.class));         // 常规方法         DatasetUser sortDS  userDataset.sort(new Column(age));         sortDS.show();         // 区分         RelationalGroupedDataset groupByDS  userDataset.groupBy(name);         // 后续方法不同         DatasetRow count  groupByDS.count();         // 推荐使用函数式的方法  使用更灵活         KeyValueGroupedDatasetString, User groupedDataset  userDataset.groupByKey(new MapFunctionUser, String() {             Override             public String call(User value) throws Exception {                 return value.name;             }         }, Encoders.STRING());         // 聚合算子都是从groupByKey开始         // 推荐使用reduceGroup         DatasetTuple2String, User result  groupedDataset.reduceGroups(new ReduceFunctionUser() {             Override             public User call(User v1, User v2) throws Exception {                 // 取用户的大年龄                 return new User(Math.max(v1.age, v2.age), v1.name);             }         });         result.show();         //4. 关闭sparkSession         spark.close();     } } 在sparkSql中DS直接支持的转换算子有map底层已经优化为mapPartition、mapPartition、flatMap、groupByKey聚合算子全部由groupByKey开始、filter、distinct、coalesce、repartition、sort和orderBy不是函数式的算子不过不影响使用。 2.2 SQL使用方式 package com.atguigu.sparksql; import org.apache.spark.SparkConf; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; public class Test02_SQL {     public static void main(String[] args) {         //1. 创建配置对象         SparkConf conf  new SparkConf().setAppName(sparksql).setMaster(local[*]);         //2. 获取sparkSession         SparkSession spark  SparkSession.builder().config(conf).getOrCreate();         //3. 编写代码         DatasetRow lineDS  spark.read().json(input/user.json);         // 创建视图  转换为表格 填写表名         // 临时视图的生命周期和当前的sparkSession绑定         // orReplace表示覆盖之前相同名称的视图         lineDS.createOrReplaceTempView(t1);         // 支持所有的hive sql语法,并且会使用spark的又花钱         DatasetRow result  spark.sql(select * from t1 where age  18);         result.show();         //4. 关闭sparkSession         spark.close();     } }} 2.3 DSL特殊语法扩展 package com.atguigu.sparksql; import org.apache.spark.SparkConf; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; import static org.apache.spark.sql.functions.col; public class Test03_DSL {     public static void main(String[] args) {         //1. 创建配置对象         SparkConf conf  new SparkConf().setAppName(sparksql).setMaster(local[*]);         //2. 获取sparkSession         SparkSession spark  SparkSession.builder().config(conf).getOrCreate();         //3. 编写代码         // 导入特殊的依赖 import static org.apache.spark.sql.functions.col;         DatasetRow lineRDD  spark.read().json(input/user.json);         DatasetRow result  lineRDD.select(col(name).as(newName),col(age).plus(1).as(newAge))                 .filter(col(age).gt(18));         result.show();         //4. 关闭sparkSession         spark.close();     } } 3 SQL语法的用户自定义函数 3.1 UDF 用户自定义函数 1UDF一行进入一行出 2代码实现 package com.atguigu.sparksql; import org.apache.spark.SparkConf; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.api.java.UDF1; import org.apache.spark.sql.expressions.UserDefinedFunction; import org.apache.spark.sql.types.DataTypes; import static org.apache.spark.sql.functions.udf; public class Test04_UDF {     public static void main(String[] args) {         //1. 创建配置对象         SparkConf conf  new SparkConf().setAppName(sparksql).setMaster(local[*]);         //2. 获取sparkSession         SparkSession spark  SparkSession.builder().config(conf).getOrCreate();         //3. 编写代码         DatasetRow lineRDD  spark.read().json(input/user.json);         lineRDD.createOrReplaceTempView(user);         // 定义一个函数         // 需要首先导入依赖import static org.apache.spark.sql.functions.udf;         UserDefinedFunction addName  udf(new UDF1String, String() {             Override             public String call(String s) throws Exception {                 return s   大侠;             }         }, DataTypes.StringType);         spark.udf().register(addName,addName);         spark.sql(select addName(name) newName from user)                 .show();         // lambda表达式写法         spark.udf().register(addName1,(UDF1String,String) name - name   大侠,DataTypes.StringType);         //4. 关闭sparkSession         spark.close();     } } 3.2 UDAF 用户自定义聚合函数 1UDAF输入多行返回一行。通常和groupBy一起使用如果直接使用UDAF函数默认将所有的数据合并在一起。 2Spark3.x推荐使用extends Aggregator自定义UDAF属于强类型的Dataset方式。 3Spark2.x使用extends UserDefinedAggregateFunction属于弱类型的DataFrame 4案例实操 需求实现求平均年龄自定义UDAFMyAvg(age) 1自定义聚合函数实现-强类型 package com.atguigu.sparksql; import org.apache.spark.SparkConf; import org.apache.spark.sql.Encoder; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.expressions.Aggregator; import java.io.Serializable; import static org.apache.spark.sql.functions.udaf; public class Test05_UDAF {     public static void main(String[] args) {         //1. 创建配置对象         SparkConf conf  new SparkConf().setAppName(sparksql).setMaster(local[*]);         //2. 获取sparkSession         SparkSession spark  SparkSession.builder().config(conf).getOrCreate();         //3. 编写代码         spark.read().json(input/user.json).createOrReplaceTempView(user);         // 注册需要导入依赖 import static org.apache.spark.sql.functions.udaf;         spark.udf().register(avgAge,udaf(new MyAvg(),Encoders.LONG()));         spark.sql(select avgAge(age) newAge from user).show();         //4. 关闭sparkSession         spark.close();     }     public static class Buffer implements Serializable {         private Long sum;         private Long count;         public Buffer() {         }         public Buffer(Long sum, Long count) {             this.sum  sum;             this.count  count;         }         public Long getSum() {             return sum;         }         public void setSum(Long sum) {             this.sum  sum;         }         public Long getCount() {             return count;         }         public void setCount(Long count) {             this.count  count;         }     }     public static class MyAvg extends AggregatorLong,Buffer,Double{         Override         public Buffer zero() {             return new Buffer(0L,0L);         }         Override         public Buffer reduce(Buffer b, Long a) {             b.setSum(b.getSum()  a);             b.setCount(b.getCount()  1);             return b;         }         Override         public Buffer merge(Buffer b1, Buffer b2) {             b1.setSum(b1.getSum()  b2.getSum());             b1.setCount(b1.getCount()  b2.getCount());             return b1;         }         Override         public Double finish(Buffer reduction) {             return reduction.getSum().doubleValue() / reduction.getCount();         }         Override         public EncoderBuffer bufferEncoder() {             // 可以用kryo进行优化             return Encoders.kryo(Buffer.class);         }         Override         public EncoderDouble outputEncoder() {             return Encoders.DOUBLE();         }     } } 3.3 UDTF没有 输入一行返回多行Hive。 SparkSQL中没有UDTF需要使用算子类型的flatMap先完成拆分。
http://www.zqtcl.cn/news/794706/

相关文章:

  • 试客那个网站做的好seo管理平台
  • 增加网站关键词库网盟推广合作
  • 企业门户网站内容建设濮阳网络培训基地
  • 做亚马逊运营要看哪些网站免费咨询电脑问题
  • 如何用html制作网站app开发要多少钱
  • 中国搜索提交网站信息网络犯罪
  • 网站服务器做下载链接分销平台系统源码
  • 网站管理助手建站沈阳专业网站建设企业
  • 企业网站开发公司大全建筑工程培训
  • 免费网站开发模板云南省网站开发软件
  • dede小游戏php网站源码广州网站vi设计报价
  • 邯郸建设局网站资质申报wordpress 前端 插件
  • 关于asp_sql网站开发的书籍小程序跳转网页方法
  • 昆明网站开发公司电话建设手机银行的网站
  • 福州建设注册中心网站怎么做公司展示网站
  • 网络营销网站建设知识平面设计初中毕业能学吗
  • 2019销售网站开发与设计现状怎么在网上卖东西赚钱
  • 做网站前后端的发布流程如何管理wordpress网站模板下载
  • 网站历史频道怎么做网站修改标题有影响吗
  • 做自己的卡盟网站做技术一般逛那些网站
  • 网站建设自学多长时间做网站原型的软件
  • 营销型的物流网站模板北京楼市最新消息
  • 宁波模版建站公司湘潭建设网站公司
  • 世界十大网站排名出炉最新军事新闻最新消息视频
  • 医疗电子的网站建设城市建设管理
  • win10建站wordpress商城网站结算页面怎么做
  • 电商网站模板引擎惠阳做网站公司
  • 如何在百度做网站推广中企动力企业邮箱手机邮箱
  • extjs做的网站开发公司宣传语
  • 长安做外贸网站关于阅读类网站的建设规划书