公司国外网站建设,珠海网站建设公司电话,搜索引擎免费下载,vs2008 做网站用户画像之Spark ML实现
1 Spark ML简单介绍
Spark ML是面向DataFrame编程的。Spark的核心开发是基于RDD#xff08;弹性分布式数据集#xff09;#xff0c;但是RDD#xff0c;但是RDD的处理并不是非常灵活#xff0c;如果要做一些结构化的处理#xff0c;将RDD转换成… 用户画像之Spark ML实现
1 Spark ML简单介绍
Spark ML是面向DataFrame编程的。Spark的核心开发是基于RDD弹性分布式数据集但是RDD但是RDD的处理并不是非常灵活如果要做一些结构化的处理将RDD转换成DataFrameDataFrame实际上就是行对象的RDDschema类似于原本的文本数据加上schema做一下结构的转换就变成数据库里面的表表是有元数据的有字段有类型。所以DataFrame处理起来更加灵活。
要进行机器学习是有一系列的流程通常离线的处理现有一组数据集然后进行预处理特征工程完成之后分成训练集合测试集基于训练集训练模型然后选择算法进行评估..这是可以形成一个管道的整体是一个DAG有向无环图。
其实整个进行模型算法训练的过程就是一个管道管道中就会有各种各样的组件这些组件总体来说可以分成两类①第一个是Transformerstransform()用于转换把一个DataFrame转换为另一个DataFrame如把原本的数据集拆分成测试集那就是DataFrame的转换像分词抽样模型的测试都是非常常见的转换操作②第二种类型就是Estimatorsfit()应用在DF上生成一个转换器算法Estimators评估器用到的函数是fit()Estimators是为了生成一个转换器在机器学习中会用到一些算法需要去建模根据训练集得到模型模型本质上就是转换器进行预测是用的这个模型进行预测所以转换是基于这个模型进行预测所以转换就是基于这个模型的转换器转换时他的实例来进行转换。
2 Spark ML的工作流程 首先进行预处理包括模型训练的整个过程是一个管道pipline这个pipline的目的是为了得到一个Estimator即得到一个模型假如说用逻辑回归输入的数据是普通的文本首先进行Toknizer分词分完次后计算他的词频这两个本质上否是transform的操作接下来就要创建一个逻辑回归的实例本质上就是一个Estimator得到一个转换器。 模型有了接下来就要做预测不管是训练集还是测试集都是要进行分词计算词频的这个piplineModel整个都是transform操作这个模型逻辑回归就是上一步通过训练的到的模型。
参数是所有转换器和评估器共享的一个公共api参数名Param是一个参数可以通过setter单独定义也可以通过ParamMap定义一个参数的集合parametervalue传递参数的两种方式①通过setter为实例设置参数②传递ParamMap给fit或者transform方法
3 EstimatorTransformerParam使用案例
1准备带标签和特征的数据
2创建逻辑回归的评估器
3使用setter方法设置参数
4使用存储在lr中的参数来训练一个模型
5使用ParamMap选择指定的参数
6准备测试数据
7预测结果
代码具体实现
1准备带标签和特征的数据
任何应用首先要把需要的类通过import引入性别预测是分类问题选择逻辑回归
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.linalg.{Vector,Vectors}
import org.apache.spark.sql.Row
定义一个初始的DataFrame通过sqlContext创建用Seq序列的方式创建一个集合第一个参数是标签即目标值后面的为特征
val sqlContextnew org.apache.spark.sql.SQLContext(sc)
val training sqlContext.createDataFrame(Seq((1.0, Vectors.dense(1.0,2.1,1.1)),(0.0, Vectors.dense(3.0,2.0,-2.0)),(0.0, Vectors.dense(3.0,0.3,1.0)),(1.0, Vectors.dense(1.0,1.2,-1.5))
)).toDF(label,features)
2创建逻辑回归的评估器设置参数
val lr new LogisticRegression()
//评估器会带一些默认的参数通过explainParams()查看
println(lr.explainParams())
//通过set方式修改迭代次数和正则化参数
lr.setMaxIter(10).setRegParam(0.01)//定义模型
val model1 lr.fit(training)
//查看模型的参数
model1.parent.extractParamMap//通过ParamMap设置参数
val paramMap ParamMap(lr.maxIter - 20).
put(lr.maxIter,30).
put(lr.regParam - 0.1, lr.threshold - 0.55)val paramMap2 ParamMap(lr.probabilityCol - myProbability)
//将两个ParamMap对象合并
val paramMapCombined paramMap paramMap2//根据ParamMap设置的参数定义模型
val model2 lr.fit(training, paramMapCombined)
model2.parent.extractParamMap 3准备测试数据
val test sqlContext.createDataFrame(Seq((1.0, Vectors.dense(-1.2,1.8,1.3)),(0.0, Vectors.dense(4.0,1.8,-0.1)),(1.0, Vectors.dense(0.0,1.9,-1.5))
)).toDF(label,features)
4预测结果
//调用模型1
model1.transform(test).select(label,features,probability,prediction).collect().foreach{case Row(label: Double, features: Vector, probability: Vector, prediction: Double) println(s($features, $label) - probability$probability, prediction$prediction)} 4 构建Pipline和保存Pipline
步骤
1准备训练的文档
2配置ML管道包含三个stageTokenizerHashingTF和LR
3安装管道到数据上
4保存管道到磁盘包括安装好的和未安装好的
5加载管道
6准备测试文档
7预测结果
代码实现
1引入需要的类
//用的数逻辑回归
import org.apache.spark.ml.classification.LogisticRegression
//因为特征工程处理的是特征向量所以需要Vector输入输出会用到
import org.apache.spark.ml.linalg.Vector
//行对象为了输出美化
import org.apache.spark.sql.Row
//需要分词需要Tokenizer需要转换计算词频需要HashingTF
import org.apache.spark.ml.feature.{Tokenizer,HashingTF}
//需要Pipeline将多个Transformers和Estimators连接起来以确定一个ML工作流程
import org.apache.spark.ml.{Pipeline,PipelineModel}
2准备数据集
//含Sprak的为一类
val training sqlContext.createDataFrame(Seq((0L, Spark Write applications quickly in Java, Scala, Python, R, and SQL., 1.0),(1L, Live and learn, 0.0),(2L, Spark Run workloads 100x faster., 1.0),(3L, study hard and make progress every day, 0.0)
)).toDF(id,text,label)
3定义管道中的TokenizerHashingTFLR这三个组件
//创建tokenizer分词器
//setInputCol指明输入DataFrame中的哪一列是被处理的输入参数是Dataframe中存在的列名
//setOutputCol设置新增加列的名字及对输入的列变换后会产生一个新列该方法设置增加新列的列名
val tokenizer new Tokenizer().
setInputCol(text).
setOutputCol(words)//创建hashingTF词频统计他的inputcolumn是tokenizerget出来的
//setNumFeatures设置特征值的数量
val hashingTF new HashingTF().
setNumFeatures(1000).
setInputCol(tokenizer.getOutputCol).
setOutputCol(features)//创建逻辑回归对象setMaxIter设置逻辑回归的迭代次数setRegParam设置正则化
val lr new LogisticRegression().
setMaxIter(10).setRegParam(0.01)
4定义管道
//创建管道setStages将各个计算阶段按照tokenizer,hashingTF,lr顺序pipeline是没有安装好的管道
val pipeline new Pipeline().
setStages(Array(tokenizer,hashingTF,lr))//使用pipeline构建模型model是安装好的管道
val model pipeline.fit(training)
5保存管道到磁盘
pipeline.save(/portrait/sparkML-LRpipeline)
model.save(/portrait/sparkML-LRmodel)
6加载模型
//加载保存到磁盘中模型
val model2 PipelineModel.load(/portrait/sparkML-LRmodel)
7准备测试文档通过回归预测没有测试集
val test sqlContext.createDataFrame(Seq((4L, learn Spark),(5L, hadoop hive),(6L, bigdata hdfs a),(7L, apache Spark)
)).toDF(id,text)
8预测结果
model.transform(test).select(id,text,probability,prediction).collect().foreach{case Row(id: Long, text: String, probability: Vector, prediction: Double) println(s($id, $text) - probability$probability, prediction$prediction)}
5 通过网格参数和交叉验证进行模型调优
所谓的调优就是怎样根据数据选择好的模型或者为整个模型整个管道选择好的参数这里是关注参数的调优模型就选择逻辑回归。参数调优就是给一组参数而不是一个参数让模型自己选择。调优是基于管道整体进行调优。
1准备训练的文档
2配置ML管道包含三个stageTokenizerHashingTF和LR
3使用ParamGridBuilder构建一个参数网格
4使用CrossValidator来选择模型和参数CrossValidator需要一个estimator一个评估器参数集合和一个evaluator
5运行交叉验证选择最好的参数集
6准备测试数据
7预测结果
代码实现过程
1引入需要的包
//用的数逻辑回归
import org.apache.spark.ml.classification.LogisticRegression
//因为特征工程处理的是特征向量所以需要Vector输入输出会用到
import org.apache.spark.ml.linalg.Vector
//行对象为了输出美化
import org.apache.spark.sql.Row
//需要分词需要Tokenizer需要转换计算词频需要HashingTF
import org.apache.spark.ml.feature.{Tokenizer,HashingTF}
//需要Pipeline将多个Transformers和Estimators连接起来以确定一个ML工作流程
import org.apache.spark.ml.{Pipeline,PipelineModel}
//因为是二元的所以用BinaryClassificationEvaluator评估器
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
//使用交叉校验CrossValidator把所有参数排列组合交叉进行校验。ParamGridBuilder参数网格
import org.apache.spark.ml.tuning.{CrossValidator,ParamGridBuilder}
//需要引入SQLContext
import org.apache.spark.sql.SQLContext
2准备数据
val sqlContextnew SQLContext(sc)
val training sqlContext.createDataFrame(Seq((0L, Spark Write applications quickly in Java, Scala, Python, R, and SQL., 1.0),(1L, Live and learn, 0.0),(2L, Spark Run workloads 100x faster., 1.0),(3L, study hard and make progress every day, 0.0),(4L, Rdd Spark who, 1.0),(5L, good good study, 0.0),(6L, Spark faster, 1.0),(7L, day day up, 0.0),(8L, Spark program, 1.0),(9L, hello world, 0.0),(10L, hello Spark, 1.0),(11L, hi how are you, 0.0)
)).toDF(id,text,label)
3构建管道
//创建tokenizer分词器
//setInputCol指明输入DataFrame中的哪一列是被处理的输入参数是Dataframe中存在的列名
//setOutputCol设置新增加列的名字及对输入的列变换后会产生一个新列该方法设置增加新列的列名
val tokenizer new Tokenizer().
setInputCol(text).
setOutputCol(words)
//创建hashingTF词频统计他的inputcolumn是tokenizerget出来的
//特征值的数量网格调优
val hashingTF new HashingTF().
setInputCol(tokenizer.getOutputCol).
setOutputCol(features)
//创建逻辑回归对象setMaxIter设置正则化参数网格调优
val lr new LogisticRegression().
setMaxIter(10)
//创建管道setStages将各个计算阶段按照tokenizer,hashingTF,lr顺序pipeline是没有安装好的管道
val pipeline new Pipeline().
setStages(Array(tokenizer,hashingTF,lr))
4构建网格参数
//构建网格参数addGrid添加网格hashingTF.numFeatures设置hashingTF特征数量
val paramGrid new ParamGridBuilder().
addGrid(hashingTF.numFeatures, Array(10,100,1000)).
addGrid(lr.regParam, Array(0.1,0.01)).
build()
5创建交叉验证CrossValidator对象
//创建CrossValidator交叉验证对象setEstimator设置评估器setEstimatorParamMaps设置参数集setEvaluator设置评估器setNumFolds创建交叉验证器他会把训练集分成NumFolds份实际生产要比2大
val cv new CrossValidator().
setEstimator(pipeline).
setEstimatorParamMaps(paramGrid).
setEvaluator(new BinaryClassificationEvaluator()).
setNumFolds(2)
6根据最优参数构建模型
//构借助参数网格交叉验证选择最优的参数构建模型
val cvModel cv.fit(training)
7添加测试数据
//添加测试集
val test sqlContext.createDataFrame(Seq((12L, learn Spark),(13L, hadoop hive),(14L, bigdata hdfs a),(15L, apache Spark)
)).toDF(id,text)
8预测结果
cvModel.transform(test).select(id,text,probability,prediction).collect().foreach{case Row(id: Long, text: String, probability: Vector, prediction: Double) println(s($id, $text) - probability$probability, prediction$prediction)} 6 通过训练校验分类来调优模型
前面交叉验证是把数据分成多份每一份把所有参数组合计算一次。而校验分类只需要把每一组参数计算一次把数据自动分成训练集合校验集这种方式依赖于比较大的数据量如果数量不够生成的结果是不可信任的。不像校验验证数据集小没关系会交叉验证多次所以即使数据量少但是计算多次多次的结果足够评估选出最优的参数。所以TrainValidationSplit需要的数据量就要大只会计算一次。这个例子采用线性回归。
与CrossValidator不同TrainValidationSplit创建一个训练测试数据集对。 它使用trainRatio参数将数据集分成这两个部分。 例如trainRatio 0.75TrainValidationSplit将生成训练和测试数据集对其中75的数据用于训练25用于验证。
步骤
1准备训练和测试数据
2使用ParamGridBuilder构建一个参数网格
3使用TrainValidationSplit来选择模型和参数CrossValidator需要一个estimator一个评估器参数集合和一个evaluator
4运行校验分类选择最好的参数
5在测试数据上做测试模型是参数组合中执行最好的一个 //使用线性回归求解
import org.apache.spark.ml.regression.LinearRegression
因为是回归问题所以用RegressionEvaluator回归评估器
import org.apache.spark.ml.evaluation.RegressionEvaluator
//使用ParamGridBuilder参数网格和TrainValidationSplit
import org.apache.spark.ml.tuning.{TrainValidationSplit,ParamGridBuilder}
//需要引入SQLContext
import org.apache.spark.sql.SQLContextval sqlContext new SQLContextsc
val data sqlContext.read.format(libsvm).load(file:/data/sample_linear_regression_data.txt)//randomSplits随机拆分seed随机种子
val Array(training, test) data.randomSplit(Array(0.75, 0.25), seed12345)//创建线性回归
val lr new LinearRegression()//elasticNetParam是Elastic net 回归参数取值介于0和1之间。
//fitIntercept是否允许阶段默认是true。regParam参数定义规范化项的权重
val paramGrid new ParamGridBuilder().
addGrid(lr.elasticNetParam, Array(0.0,0.5,1.0)).
addGrid(lr.fitIntercept).
addGrid(lr.regParam, Array(0.1,0.01)).
build()//训练校验的比例setTrainRatio
val trainValidationSplit new TrainValidationSplit().
setEstimator(lr).
setEstimatorParamMaps(paramGrid).
setEvaluator(new RegressionEvaluator).
setTrainRatio(0.8)val model trainValidationSplit.fit(training)model.transform(test).select(features,label,prediction).show()