公司网站无法打开,全球速卖通网址,手机手机网站制作,轻创网fe缺点不方便添加新的优化策略线程安全问题Spark SQL支持三种语言javaScalapythonDataFrame大规模数据化结构能历、提高了运算能力从sql到dataFrame的转化#xff0c;支持sql查询RDD是分布式的java对象的集合#xff0c;对象颞部结构不可知dataframe以rdd为基础的分布式数据集…fe缺点不方便添加新的优化策略线程安全问题Spark SQL支持三种语言javaScalapythonDataFrame大规模数据化结构能历、提高了运算能力从sql到dataFrame的转化支持sql查询RDD是分布式的java对象的集合对象颞部结构不可知dataframe以rdd为基础的分布式数据集提供了详细的结构信息DataFrame的创建SparkSessiondataframe的常用操作df spark.read.json(people.json)
df.printSchema() 查看表的结构
df select(df[name],df[age]1).show()df.filter(df[age]20).show()
df.groupby(age).count().show()df.sort(df[age] desc()).show()
df.sort(df[age].desc(),df[name].asc()).show()利用反射机制推断RDD的模式读取Mysql数据库中的数据DataFrame的创建from pyspark import SparkContext,SparkConf
from pyspark.sql import SparkSession
spark SparkSession.builder.config(conf SparkConf()).getOrCreate()# 分布式读取文件
spark.read.text(people.txt) spark.read.format(text).load(people.txt)
spark.read.json(people.json) spark.read.format(json).load(people.json)
spark.read.parquet(people/parquet) spark.read.format(parquet).load(people.parquent)#文件保存 最后保到一个目录
df.write.txt(people.txt)
df.write.json(people.json)
df.write.parquent(people.parquent)# dataFrame的一些常用操作df.printSchema()
dat.select(_c1).show()
df.filter(df[age]20).show()
df.groupby(age).count().show()
df.sort(df[age].desc()).show()
df.sort(df[age].desc(),df[name].asc()).show()RDD转换得到dataFrame利用反射机制推断RDD模式#用ROW对象去封装一行一行的数据
from pyspark.sql import ROW
people spark.sparkContext.textFile(file:///file_path).map(lambda x:x.split(,)).map(lambda x:ROW(NAME P[0],age int(p[1])))schemaPeople spark.createDataFrame(people)
#必须注册为临时表才供下面的查询使用
schemaPeople.createOrReplaceTempView(people)
personDF spark.sql(select name,age from people where age20)
personsDRR personsDF.rdd.map(lambda p:Nmaep.name,age:str()p.age))
personsRDD.foreach(print)