当前位置: 首页 > news >正文

网站建设有限公司六安百度推广公司

网站建设有限公司,六安百度推广公司,网站备案期,做网站烧钱吗SparkSql 结构化数据与非结构化数据 结构化数据就类似于excel表中的数据#xff08;统计的都是结构化的数据#xff09;一般都使用sparkSql处理结构化的数据 结构化的文件#xff1a;JSON、CSV【以逗号分隔】、TSV【以制表符分隔】、parquet、orc 结构化的表#xff1a;…SparkSql 结构化数据与非结构化数据 结构化数据就类似于excel表中的数据统计的都是结构化的数据一般都使用sparkSql处理结构化的数据 结构化的文件JSON、CSV【以逗号分隔】、TSV【以制表符分隔】、parquet、orc 结构化的表数据库中表的数据MySQL、Oracle、Hive 我们在sparkcore中导入数据使用的是textFile而在sparksql中怎么导入数据呢 使用的是DataFrame进行数据的导入 将一些结构化的数据进行sql查询需要将数据变为表是表就必须有表结构表结构就是Schema。 一个经典的wordcount案例 代码如下里面有sql和dsl两种写法 import osfrom pyspark import SparkConf, SparkContext from pyspark.sql import SparkSession import pyspark.sql.functions as Fif __name__ __main__:os.environ[JAVA_HOME] C:/Program Files/Java/jdk1.8.0_131# 配置Hadoop的路径就是前面解压的那个路径os.environ[HADOOP_HOME] rD:\BigDate\05-Hadoop\software\hadoop-3.3.1# 配置base环境Python解析器的路径os.environ[PYSPARK_PYTHON] C:/ProgramData/Miniconda3/python.exe # 配置base环境Python解析器的路径os.environ[PYSPARK_DRIVER_PYTHON] C:/ProgramData/Miniconda3/python.exe# 创建spark对象spark SparkSession.builder.master(local[2]).appName(SparkSQL-wordcount案例).config(spark.sql.shuffle.partitions, 2).getOrCreate()print(spark)# 创建一个DataFrame对象,读取数据df spark.read.text(../../datas/wordcount/data.txt)# 创建一个临时表表名为 wordcountdf.createOrReplaceTempView(wordcount)# 第一种写法使用sparksqlspark.sql(with t as ( select word from wordcount lateral view explode(split(value, )) wordtemp as word),t2 as (select trim(word) word from t where trim(word) ! )select word,count(1) countNum from t2 group by word order by countNum desc).show()# 第二种写法使用 dsldf.select(F.explode(F.split(value, )).alias(word)) \.where( trim(word) ! ).groupby(word).count().orderBy(count,ascendingFalse).show()#这里的where(F.trim(word) ! ) 还可以写成 where( trim(word) ! )# 还可以这样写df.select(F.explode(F.split(value, )).alias(word)) \.where(F.trim(word) ! ).groupby(F.col(word)).agg(F.count(F.col(word)).alias(cou)).orderBy(F.col(cou),ascendingFalse).show()spark.stop() 以上的代码还可以使用with进行优化 补充 with的作用: 我们在创建对象的时候经常需要关闭close、stop 如果忘记关闭太多对象的话就会影响性能使用with自动帮我们关闭 什么时候可以使用with呢 源码中有 __enter__ 和 __exit__ 的时候就可以使用with进行优化 优化过后的代码 此时就不需要在手动stop关闭了 import osfrom pyspark import SparkConf, SparkContext from pyspark.sql import SparkSession import pyspark.sql.functions as Fif __name__ __main__:os.environ[JAVA_HOME] C:/Program Files/Java/jdk1.8.0_131# 配置Hadoop的路径就是前面解压的那个路径os.environ[HADOOP_HOME] rD:\BigDate\05-Hadoop\software\hadoop-3.3.1# 配置base环境Python解析器的路径os.environ[PYSPARK_PYTHON] C:/ProgramData/Miniconda3/python.exe # 配置base环境Python解析器的路径os.environ[PYSPARK_DRIVER_PYTHON] C:/ProgramData/Miniconda3/python.exe# 创建spark对象with SparkSession.builder.master(local[2]).appName(SparkSQL-wordcount案例).config(spark.sql.shuffle.partitions, 2).getOrCreate() as spark:# 创建一个DataFrame对象,读取数据df spark.read.text(../../datas/wordcount/data.txt)# 创建一个临时表表名为 wordcountdf.createOrReplaceTempView(wordcount)# 第一种写法使用sparksqlspark.sql(with t as ( select word from wordcount lateral view explode(split(value, )) wordtemp as word),t2 as (select trim(word) word from t where trim(word) ! )select word,count(1) countNum from t2 group by word order by countNum desc).show()# 第二种写法使用 dsldf.select(F.explode(F.split(value, )).alias(word)) \.where( trim(word) ! ).groupby(word).count().orderBy(count,ascendingFalse).show()#这里的where(F.trim(word) ! ) 还可以写成 where( trim(word) ! )# 还可以这样写df.select(F.explode(F.split(value, )).alias(word)) \.where(F.trim(word) ! ).groupby(F.col(word)).agg(F.count(F.col(word)).alias(cou)).orderBy(F.col(cou),ascendingFalse).show() 一个案例 需求统计评分次数大于2000次的所有电影中平均评分最高的Top10结果显示电影名称、电影平均评分、电影评分次数。 电影评分数据datas/movie/ratings.dat【用户id、电影id、评分、评分时间】 数据如下 1::1193::5::978300760 1::661::3::978302109 1::914::3::978301968 1::3408::4::978300275 1::2355::5::978824291 1::1197::3::978302268 1::1287::5::978302039 1::2804::5::978300719 1::594::4::978302268 1::919::4::978301368 1::595::5::978824268 电影信息数据datas/movie/movies.dat【电影id、电影名称、分类】 1::Toy Story (1995)::Animation|Childrens|Comedy 2::Jumanji (1995)::Adventure|Childrens|Fantasy 3::Grumpier Old Men (1995)::Comedy|Romance 4::Waiting to Exhale (1995)::Comedy|Drama 5::Father of the Bride Part II (1995)::Comedy 6::Heat (1995)::Action|Crime|Thriller 7::Sabrina (1995)::Comedy|Romance 8::Tom and Huck (1995)::Adventure|Childrens 9::Sudden Death (1995)::Action 首先给定的数据不是我们所经常使用的格式化数据所以需要先将数据进行格式化 可以使用RDD的算子将数据改为我们想要的格式化数据 也可以直接利用sql将非格式化的数据修改为我们需要的格式的数据 写这个案例我们可以利用前面所学的 RDD 和 sparkSQL一起完成这个案例 使用RDDSparkSQL 代码如下 import os import refrom pyspark import SparkConf, SparkContext from pyspark.sql import SparkSessionif __name__ __main__:os.environ[JAVA_HOME] C:/Program Files/Java/jdk1.8.0_131# 配置Hadoop的路径就是前面解压的那个路径os.environ[HADOOP_HOME] rD:\BigDate\05-Hadoop\software\hadoop-3.3.1# 配置base环境Python解析器的路径os.environ[PYSPARK_PYTHON] C:/ProgramData/Miniconda3/python.exe # 配置base环境Python解析器的路径os.environ[PYSPARK_DRIVER_PYTHON] C:/ProgramData/Miniconda3/python.exe# 创建spark对象with SparkSession.builder.master(local[2]).appName(MovieTop10).config(spark.sql.shuffle.partitions, 2).getOrCreate() as spark:print(spark)rating_df spark.sparkContext.textFile(../../datas/movie/ratings.dat).map(lambda line:re.split(::,line)) \.filter(lambda item:len(item) 4).map(lambda item:(item[0],item[1],item[2],item[3])) \.toDF([user_id,movie_id,score,score_time]).createOrReplaceTempView(rating)# spark.sql(# select * from rating# ).show()movie_df spark.sparkContext.textFile(../../datas/movie/movies.dat) \.map(lambda line:(line.split(::)[0],line.split(::)[1],line.split(::)[2])) \.toDF([movie_id, movie_name, movie_categry]).createOrReplaceTempView(movie)# spark.sql(# select * from movie# ).show(truncateFalse)#统计评分次数大于2000次的所有电影中平均评分最高的Top10结果显示电影名称、电影平均评分、电影评分次数spark.sql(select m.movie_name,round(avg(r.score),2) avgRate,count(1) countNum from movie m join rating r on m.movie_id r.movie_idgroup by m.movie_name having countNum 2000 order by avgRate desc limit 10).show(truncateFalse)# 保留两位小数后结果可能有重复的想要获取重复排名也只算一位的可以使用排名函数dense_rank()spark.sql(with t as (select m.movie_name,round(avg(r.score),2) avgRate,count(1) countNum from movie m join rating r on m.movie_id r.movie_idgroup by m.movie_name having countNum 2000),t2 as (select *,dense_rank() over(order by avgRate desc) paiming from t) select * from t2 where paiming 10).show() 复习 排名函数 1、row_number() row_number从1开始按照顺序生成分组内记录的序列,row_number()的值不会存在重复,当排序的值相同时,按照表中记录的顺序进行排列 效果如下 98 1 97 2 97 3 96 4 95 5 95 6没有并列名次情况顺序递增 2、rank() 生成数据项在分组中的排名排名相等会在名次中留下空位 效果如下 98 1 97 2 97 2 96 4 95 5 95 5 94 7 有并列名次情况顺序跳跃递增 3、dense_rank() 生成数据项在分组中的排名排名相等会在名次中不会留下空位 效果如下 98 1 97 2 97 2 96 3 95 4 95 4 94 5 有并列名次情况顺序递增 只使用 SparkSQL 以上是RDD sparkSQL的写法 还可以通过 sparkSQL的写法硬写出来 通过split()方法根据非格式化数据的分隔符将数据切成我们需要的DataFrame类型的数据 df1 spark.read.text(../../datas/movie/movies.dat).createOrReplaceTempView(movie1) df2 spark.read.text(../../datas/movie/ratings.dat).createOrReplaceTempView(rating1)#统计评分次数大于2000次的所有电影中平均评分最高的Top10结果显示电影名称、电影平均评分、电影评分次数 spark.sql(with m1 as (select split(value,::)[0] movie_id,split(value,::)[1] movie_name,split(value,::)[2] movie_categary from movie1),r1 as ( select split(value,::)[0] user_id,split(value,::)[1] movie_id,split(value,::)[2] score,split(value,::)[3] score_time from rating1)select m1.movie_name,round(avg(r1.score),2) avgRote,count(1) countNum from m1 join r1 on m1.movie_id r1.movie_idgroup by m1.movie_name having countNum 2000 order by avgRote desc limit 10 ).show(truncateFalse)# 同样也可以写成排名函数 spark.sql(with m1 as (select split(value,::)[0] movie_id,split(value,::)[1] movie_name,split(value,::)[2] movie_categary from movie1),r1 as ( select split(value,::)[0] user_id,split(value,::)[1] movie_id,split(value,::)[2] score,split(value,::)[3] score_time from rating1),t as ( select m1.movie_name,round(avg(r1.score),2) avgRote,count(1) countNum from m1 join r1 on m1.movie_id r1.movie_idgroup by m1.movie_name having countNum 2000),t2 as ( select *,dense_rank() over(order by avgRote desc) paiming from t)select * from t2 where paiming 10 ).show(truncateFalse)
http://www.zqtcl.cn/news/19368/

相关文章:

  • 如果网站没有做icp备案吗备案域名批量查询
  • wordpress注册邮箱配置windows优化大师收费吗
  • 拍卖网站模版wordpress 屏蔽ftp
  • 网站宣传平台合肥正规的seo公司
  • 高端建站的公司昆明网络建设
  • 纪检网站建设计划企业年金是1比3还是1比4
  • 网站建设工具哪个好用手机做照片下载网站
  • 搭建网站的软件网站如何做词
  • 网站建设的财务计划书wordpress邮件设置
  • 网站成功秘诀架设网站 软件
  • 昆明seo公司网站网站开发费用明细
  • 建立一个个人介绍的网站网店怎么做
  • 大唐网站设计天津艺匠做网站怎么样
  • 重庆南昌网站建设亚马逊网站托管怎么做
  • app网站欣赏网站建设合伙合同
  • 营销网站更受用户欢迎的原因是p2p网贷网站建设
  • 哪个网站做马代路线好扁平化企业网站模板
  • 怀远县建设局网站wordpress取消邮件
  • asp系统网站源码下载网站软件免费安装
  • 一个网站两个页面湘潭知名网站建设
  • 做网站遇到的问题及解决方法aso优化推广
  • php网站开发流程逻辑牌具网站广告怎么做
  • 做网站配置服务器在广州开发一个营销网站多少钱
  • 360免费建站连接做化学科普网站的目的
  • 网站空间管理权限设计制作简单的手机网站
  • 自己做网站哪种好做深圳信息公司做关键词
  • 任丘住房建设局网站微信小程序注册是免费的吗
  • 网站建设销售人才简历建网页
  • 苏州高端网站建设kguwordpress 修改入口文件
  • 南京自助建站模板网站死链接