室内设计师第一网站,电商设计是干嘛的,ui设计做app网站要学什么,在线看私人不收费不登录Spark SQL 中UDF的讲解 User Define Function, 用户自定义函数,简称UDF,存在与很多组件中。 在使用Sparksql的人都遇到了Sparksql所支持的函数太少了的难处#xff0c;除了最基本的函数#xff0c;Sparksql所能支撑的函数很少#xff0c;肯定不能满足正常的项目使用#xf…Spark SQL 中UDF的讲解 User Define Function, 用户自定义函数,简称UDF,存在与很多组件中。 在使用Sparksql的人都遇到了Sparksql所支持的函数太少了的难处除了最基本的函数Sparksql所能支撑的函数很少肯定不能满足正常的项目使用UDF可以解决问题。 SparkSQL中的UDF相当于是1进1出UDAF相当于是多进一出类似于聚合函数。 开窗函数一般分组取topn时常用。 可以理解为自己定义函数来获取自己想要的结果案例借鉴于网络需求计算文本中每一个单词的长度代码Scala版本package com.bynear.Scalaimport org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}object UDF {def main(args: Array[String]): Unit {val conf new SparkConf().setAppName(SparkSQL_UDF).setMaster(local)val sc new SparkContext(conf)val sqlContext new SQLContext(sc)val names Array(刘亦菲, 张柏芝, 冯提模,陈一发儿)val nameRDD sc.parallelize(names, 5)val nameRowRDD nameRDD.map(name Row(name))val structType StructType(Array(StructField(name, StringType, true)))val namesDF sqlContext.createDataFrame(nameRowRDD, structType)namesDF.registerTempTable(names)sqlContext.udf.register(strLen, (str: String) str.length)sqlContext.sql(select name,strLen(name) as length from names).show()sqlContext.sql(select name,strLen(name) as length from names).collect().foreach(println)}
} 运行结果---------- |name|length| ---------- | 刘亦菲| 3| | 张柏芝| 3| | 冯提模| 3| |陈一发儿| 4| ---------- [刘亦菲,3] [张柏芝,3] [冯提模,3] [陈一发儿,4] Java版本package com.bynear.spark_sql;import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;import java.util.ArrayList;
public class JavaUDF {public static void main(String[] args) {SparkConf conf new SparkConf().setAppName(JavaUDF).setMaster(local);JavaSparkContext sc new JavaSparkContext(conf);SQLContext sqlContext new SQLContext(sc.sc());ArrayListString names new ArrayListString();names.add(刘亦菲);names.add(张柏芝);names.add(冯提模);names.add(陈一发儿);JavaRDDString nameRDD sc.parallelize(names);JavaRDDRow nameRowRDD nameRDD.map(new FunctionString, Row() {Overridepublic Row call(String line) throws Exception {return RowFactory.create(String.valueOf(line));}});/*** 使用动态编程方式将RDD转换为Dataframe*/ArrayListStructField fields new ArrayListStructField();fields.add(DataTypes.createStructField(name, DataTypes.StringType, true));StructType structType DataTypes.createStructType(fields);DataFrame nameDF sqlContext.createDataFrame(nameRowRDD, structType);/*** 注册临时表*/nameDF.registerTempTable(user);/*** 根据UDF函数参数的个数来决定是实现哪一个UDF UDF1UDF2, 表明包含几个参数传入* UDF1String, Integer 表示 传入参数 String 输出参数为 Integer* call方法为 自定义的函数* DataTypes.IntegerType 必须与输出参数的类型一致即 Integer*/sqlContext.udf().register(StrLen, new UDF1String, Integer() {Overridepublic Integer call(String s) throws Exception {return s.length();}}, DataTypes.IntegerType);/*** select name ,StrLen(name) as length from user* 在临时表user中 查找name StrLen(name) name的长度* StrLen(name) as length 表示将获取到的name的长度 例如15 15作为一列 as length 列名为 length*/sqlContext.sql(select name ,StrLen(name) as length from user).show();Row[] rows sqlContext.sql(select name ,StrLen(name) as length from user).collect();for (Row row : rows) {System.out.println(row);}sc.close();}
} 输出结果同上Java版本中主要之一到UDFX 方法以及传入参数的个数类型以及输出类型最终要的是文本最后的DataTypes.IntegerType 类型要与输出类型相同