MLlib, one of the products in the Spark ecosystem, implements the classic machine learning algorithms. Its source tree is split into eight folders: classification contains the NB, LR and SVM implementations; clustering contains K-means; linalg contains SVD (and the sparse-matrix representation); recommendation contains ALS, a matrix-factorization implementation; regression implements linear regression along with its L2-regularized and L1-regularized variants; util contains files that generate toy data for the various algorithms, plus a DataValidators.scala file; api contains PythonMLLibAPI.scala; and the last folder, the subject of this post, is optimization, the optimization-algorithm module, which consists of four files: Gradient.scala, GradientDescent.scala, Optimizer.scala and Updater.scala. As a newcomer to Scala, and as the title says, I have only skimmed these four source files, trying to work out how the optimization module is structured, which algorithms it implements, and what parallelization strategy it uses; as for the Scala language features used in the source, I will have to reread the code once I am more familiar with the language. If you spot any mistakes in this post, please point them out, thanks. Below are my notes from the reading. (Note: the source contains a great many comments; to save space I have removed some of them here, so please refer to the source for the full comments. Also, cnblogs does not seem to have a code-insertion template for Scala.)
The Gradient.scala file
Part 1: the Gradient abstract class
package org.apache.spark.mllib.optimization

import org.jblas.DoubleMatrix

/**
 * Class used to compute the gradient for a loss function, given a single data point.
 */
abstract class Gradient extends Serializable {
  /**
   * Compute the gradient and loss given the features of a single data point.
   * @param data - Feature values for one data point. Column matrix of size dx1
   *               where d is the number of features.
   * @param label - Label for this data item.
   * @param weights - Column matrix containing weights for every feature.
   * @return A tuple of 2 elements. The first element is a column matrix containing the computed
   *         gradient and the second element is the loss computed at this data point.
   */
  def compute(data: DoubleMatrix, label: Double, weights: DoubleMatrix): (DoubleMatrix, Double)
}
As the comments above explain, compute's parameter data holds the feature values of a single data point (a d*1 column matrix, d being the number of features), label is a Double holding that data point's label, and weights is the d*1 column of regression coefficients for the features. The function returns two things: the first is the gradient computed at this sample point, the second is the loss at this sample point.
Part 2: Gradient subclasses for three different loss functions (log loss, least-squares loss, hinge loss)
For the log-loss function, override the abstract class's compute function:
/**
 * Compute gradient and loss for a logistic loss function, as used in binary classification.
 * See also the documentation for the precise formulation.
 */
class LogisticGradient extends Gradient {
  override def compute(data: DoubleMatrix, label: Double, weights: DoubleMatrix):
      (DoubleMatrix, Double) = {
    val margin: Double = -1.0 * data.dot(weights)
    val gradientMultiplier = (1.0 / (1.0 + math.exp(margin))) - label
    val gradient = data.mul(gradientMultiplier)
    val loss =
      if (label > 0) {
        math.log(1 + math.exp(margin))
      } else {
        math.log(1 + math.exp(margin)) - margin
      }
    (gradient, loss)
  }
}
In the code above, margin holds -wx (I am not sure why it is named margin; the functional margin? But the functional margin would be y*g(wx)). With g(z) = 1/(1+exp(-z)) and labels y in {0,1}, the per-point log loss is -[y*log(g(wx)) + (1-y)*log(1-g(wx))] and its gradient with respect to w is (g(wx)-y)*x, so gradientMultiplier computes the scalar factor g(wx)-y and gradient is the gradient at this point. Finally the loss: when label = 1, the log loss is -log(g(wx)) = -log[1/(1+exp(-wx))] = log(1+exp(margin)); when label = 0, it is -log(1-g(wx)) = -log[1 - 1/(1+exp(-wx))] = -log[exp(-wx)/(1+exp(-wx))] = log(1+exp(-wx)) + wx = log(1+exp(margin)) - margin.
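To make that concrete, here is a minimal sketch (toy numbers of my own, assuming the LogisticGradient class above is on the classpath) that calls compute on a single point and compares it with the hand computation:

import org.jblas.DoubleMatrix

object LogisticGradientExample {
  def main(args: Array[String]): Unit = {
    val x = new DoubleMatrix(Array(1.0, 2.0))    // one sample with d = 2 features, as a 2x1 column
    val w = new DoubleMatrix(Array(0.5, -0.25))  // current weights
    val (grad, loss) = new LogisticGradient().compute(x, 1.0, w)
    // Hand computation: w.x = 0, so margin = 0 and g(w.x) = 0.5;
    // gradient = (0.5 - 1.0) * x = [-0.5; -1.0], loss = log(1 + exp(0)) = 0.693...
    println(grad)
    println(loss)
  }
}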
For the least-squares loss function, override the abstract class's compute function:
/**
 * Compute gradient and loss for a Least-squared loss function, as used in linear regression.
 * This is correct for the averaged least squares loss function (mean squared error)
 *              L = 1/n ||A weights-y||^2
 * See also the documentation for the precise formulation.
 */
class LeastSquaresGradient extends Gradient {
  override def compute(data: DoubleMatrix, label: Double, weights: DoubleMatrix):
      (DoubleMatrix, Double) = {
    val diff: Double = data.dot(weights) - label
    val loss = diff * diff
    val gradient = data.mul(2.0 * diff)
    (gradient, loss)
  }
}
The least-squares loss is the one given in the comment, L = 1/n ||A*weights - y||^2, with n = 1 here since compute sees a single point. The variable diff in the code is the prediction w·x minus the label y, the loss is diff*diff, and the gradient is data.mul(2.0 * diff). Note that .mul is a DoubleMatrix (jblas) method that multiplies by a scalar (or element-wise by another matrix), .mmul is matrix-matrix multiplication, and .dot is the vector inner product.
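A tiny jblas sketch (toy numbers of my own) illustrating the mul / mmul / dot distinction mentioned above:

import org.jblas.DoubleMatrix

object JblasOps {
  def main(args: Array[String]): Unit = {
    val x = new DoubleMatrix(Array(1.0, 2.0, 3.0))   // 3x1 column vector
    val w = new DoubleMatrix(Array(0.5, 0.5, 0.5))
    println(x.mul(2.0))             // scalar times each element: [2; 4; 6]
    println(x.mul(w))               // element-wise product: [0.5; 1; 1.5]
    println(x.dot(w))               // inner product, a Double: 3.0
    println(x.transpose().mmul(w))  // matrix product, a 1x1 matrix holding the same 3.0
  }
}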
For the hinge-loss function, override the abstract class's compute function:
/**
 * Compute gradient and loss for a Hinge loss function, as used in SVM binary classification.
 * See also the documentation for the precise formulation.
 * NOTE: This assumes that the labels are {0,1}
 */
class HingeGradient extends Gradient {
  override def compute(data: DoubleMatrix, label: Double, weights: DoubleMatrix):
      (DoubleMatrix, Double) = {
    val dotProduct = data.dot(weights)
    // Our loss function with {0, 1} labels is max(0, 1 - (2y - 1) (f_w(x)))
    // Therefore the gradient is -(2y - 1)*x
    val labelScaled = 2 * label - 1.0
    if (1.0 > labelScaled * dotProduct) {
      (data.mul(-labelScaled), 1.0 - labelScaled * dotProduct)
    } else {
      (DoubleMatrix.zeros(1, weights.length), 0.0)
    }
  }
}
For labels in {-1, 1} the two-class hinge loss is max(0, 1 - y*f(x)); this code works with {0, 1} labels, so it maps them via 2y-1 and the loss becomes max(0, 1 - (2y-1)*f(x)). When the sample is misclassified or falls inside the margin (i.e. labelScaled * dotProduct < 1), the gradient is data.mul(-labelScaled) and the loss is 1 - labelScaled * dotProduct; otherwise both are zero.
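A quick sketch of the two branches (toy numbers of my own, assuming the HingeGradient class above is on the classpath):

import org.jblas.DoubleMatrix

object HingeGradientExample {
  def main(args: Array[String]): Unit = {
    val w = new DoubleMatrix(Array(1.0, -1.0))
    val hinge = new HingeGradient()

    // label 1, w.x = 3.0, so labelScaled * dot = 3.0 >= 1: outside the margin, zero gradient and loss
    println(hinge.compute(new DoubleMatrix(Array(2.0, -1.0)), 1.0, w))

    // label 0 (labelScaled = -1), w.x = 0.5, so labelScaled * dot = -0.5 < 1:
    // gradient = data.mul(1.0) = x, loss = 1 - (-0.5) = 1.5
    println(hinge.compute(new DoubleMatrix(Array(1.0, 0.5)), 0.0, w))
  }
}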
The Updater.scala file
Part 1: the Updater abstract class
package org.apache.spark.mllib.optimization

import scala.math._
import org.jblas.DoubleMatrix

/**
 * Class used to perform steps (weight update) using Gradient Descent methods.
 * For general minimization problems, or for regularized problems of the form
 *         min L(w) + regParam * R(w),
 * the compute function performs the actual update step, when given some
 * (e.g. stochastic) gradient direction for the loss L(w),
 * and a desired step-size (learning rate).
 *
 * The updater is responsible to also perform the update coming from the
 * regularization term R(w) (if any regularization is used).
 */
abstract class Updater extends Serializable {
  /**
   * Compute an updated value for weights given the gradient, stepSize, iteration number and
   * regularization parameter. Also returns the regularization value regParam * R(w)
   * computed using the *updated* weights.
   * @param weightsOld - Column matrix of size dx1 where d is the number of features.
   * @param gradient - Column matrix of size dx1 where d is the number of features.
   * @param stepSize - step size across iterations
   * @param iter - Iteration number
   * @param regParam - Regularization parameter
   *
   * @return A tuple of 2 elements. The first element is a column matrix containing updated weights,
   *         and the second element is the regularization value computed using updated weights.
   */
  def compute(weightsOld: DoubleMatrix, gradient: DoubleMatrix, stepSize: Double, iter: Int,
      regParam: Double): (DoubleMatrix, Double)
}
compute's parameter weightsOld is the coefficient vector before the update (d*1), gradient is the current gradient computed with the chosen loss function, stepSize is the step size (i.e. the learning rate), iter is the iteration number, and regParam is the regularization parameter. The function returns two things: the first is the updated coefficient vector, the second is the value regParam * R(w) computed with the updated weights.
Part 2: Updater subclasses for three regularization modes (no regularization, L1, L2)
For no regularization, override the abstract class's compute function:
/**
 * A simple updater for gradient descent *without* any regularization.
 * Uses a step-size decreasing with the square root of the number of iterations.
 */
class SimpleUpdater extends Updater {
  override def compute(weightsOld: DoubleMatrix, gradient: DoubleMatrix,
      stepSize: Double, iter: Int, regParam: Double): (DoubleMatrix, Double) = {
    val thisIterStepSize = stepSize / math.sqrt(iter)
    val step = gradient.mul(thisIterStepSize)
    (weightsOld.sub(step), 0)
  }
}
This is plain gradient descent, w -= a*gradient: the learning rate a corresponds to thisIterStepSize in the code (so the step is large at first and shrinks as the iteration count grows), a*gradient corresponds to step, and finally weightsNew = weightsOld.sub(step).
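A minimal sketch of one such step with toy numbers of my own (stepSize 1.0 at iteration 4 gives thisIterStepSize = 0.5), assuming the SimpleUpdater class above is on the classpath:

import org.jblas.DoubleMatrix

object SimpleUpdaterExample {
  def main(args: Array[String]): Unit = {
    val wOld = new DoubleMatrix(Array(1.0, 1.0))
    val grad = new DoubleMatrix(Array(0.4, -0.2))
    val (wNew, regVal) = new SimpleUpdater().compute(wOld, grad, 1.0, 4, 0.0)
    println(wNew)    // [1.0 - 0.5*0.4; 1.0 + 0.5*0.2] = [0.8; 1.1]
    println(regVal)  // 0.0, since there is no regularizer
  }
}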
For L1 regularization, override the abstract class's compute function:
/**
 * Updater for L1 regularized problems.
 *          R(w) = ||w||_1
 * Uses a step-size decreasing with the square root of the number of iterations.
 * Instead of subgradient of the regularizer, the proximal operator for the
 * L1 regularization is applied after the gradient step. This is known to
 * result in better sparsity of the intermediate solution.
 * The corresponding proximal operator for the L1 norm is the soft-thresholding
 * function. That is, each weight component is shrunk towards 0 by shrinkageVal.
 * If w >  shrinkageVal, set weight component to w-shrinkageVal.
 * If w < -shrinkageVal, set weight component to w+shrinkageVal.
 * If -shrinkageVal < w < shrinkageVal, set weight component to 0.
 * Equivalently, set weight component to signum(w) * max(0.0, abs(w) - shrinkageVal)
 */
class L1Updater extends Updater {
  override def compute(weightsOld: DoubleMatrix, gradient: DoubleMatrix,
      stepSize: Double, iter: Int, regParam: Double): (DoubleMatrix, Double) = {
    val thisIterStepSize = stepSize / math.sqrt(iter)
    val step = gradient.mul(thisIterStepSize)
    // Take gradient step
    val newWeights = weightsOld.sub(step)
    // Apply proximal operator (soft thresholding)
    val shrinkageVal = regParam * thisIterStepSize
    (0 until newWeights.length).foreach { i =>
      val wi = newWeights.get(i)
      newWeights.put(i, signum(wi) * max(0.0, abs(wi) - shrinkageVal))
    }
    (newWeights, newWeights.norm1 * regParam)
  }
}
With the regularization term added, the first few steps are the same; the key is what happens afterwards (I do not yet fully understand the theory behind it; see http://freemind.pluskid.org/machine-learning/sparsity-and-some-basics-of-l1-regularization/), so let me just describe the code. The variable shrinkageVal = regParam * thisIterStepSize (note the factor thisIterStepSize: in w -= a*gradient the gradient covers both L(w) and the regularizer R(w)); then every element of newWeights, the weights after the plain gradient step, is overwritten with newWeights.put(i, signum(wi) * max(0.0, abs(wi) - shrinkageVal)), which is exactly the soft-thresholding rule spelled out in the class comment.
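A small sketch of the soft-thresholding operator in isolation (toy numbers and names are my own): each component keeps its sign, is shrunk towards 0 by shrinkageVal, and collapses to exactly 0 if its magnitude is below shrinkageVal, which is where the sparsity comes from.

import scala.math.{abs, max, signum}

object SoftThresholdExample {
  // The soft-thresholding operator used by L1Updater
  def softThreshold(w: Double, shrinkageVal: Double): Double =
    signum(w) * max(0.0, abs(w) - shrinkageVal)

  def main(args: Array[String]): Unit = {
    val shrinkageVal = 0.05                       // regParam * thisIterStepSize
    val afterGradientStep = Array(0.30, 0.02, -0.04)
    println(afterGradientStep.map(softThreshold(_, shrinkageVal)).mkString(", "))
    // prints 0.25, 0.0, -0.0  -- the two small components are zeroed out
  }
}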
For L2 regularization, override the abstract class's compute function:
/**
 * Updater for L2 regularized problems.
 *          R(w) = 1/2 ||w||^2
 * Uses a step-size decreasing with the square root of the number of iterations.
 */
class SquaredL2Updater extends Updater {
  override def compute(weightsOld: DoubleMatrix, gradient: DoubleMatrix,
      stepSize: Double, iter: Int, regParam: Double): (DoubleMatrix, Double) = {
    val thisIterStepSize = stepSize / math.sqrt(iter)
    val step = gradient.mul(thisIterStepSize)
    // add up both updates from the gradient of the loss (= step) as well as
    // the gradient of the regularizer (= regParam * weightsOld)
    val newWeights = weightsOld.mul(1.0 - thisIterStepSize * regParam).sub(step)
    (newWeights, 0.5 * pow(newWeights.norm2, 2.0) * regParam)
  }
}
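Briefly, since the gradient of regParam * (1/2)||w||^2 is regParam * w, the combined step here is w_new = w_old - a*(gradient + regParam*w_old) = (1 - a*regParam)*w_old - a*gradient, which is exactly weightsOld.mul(1.0 - thisIterStepSize * regParam).sub(step) in the code, and the returned regularization value is 0.5 * ||w_new||^2 * regParam. A toy jblas check (numbers and names are my own) that the two forms agree:

import org.jblas.DoubleMatrix

object L2UpdateCheck {
  def main(args: Array[String]): Unit = {
    val wOld = new DoubleMatrix(Array(0.5, -1.0, 2.0))
    val grad = new DoubleMatrix(Array(0.1, 0.3, -0.2))
    val a = 1.0 / math.sqrt(4)  // thisIterStepSize for stepSize 1.0, iteration 4
    val regParam = 0.1

    // Form used in SquaredL2Updater: shrink the old weights, then subtract the step
    val form1 = wOld.mul(1.0 - a * regParam).sub(grad.mul(a))
    // Equivalent "w - a*(grad + regParam*w)" form
    val form2 = wOld.sub(grad.add(wOld.mul(regParam)).mul(a))

    println(form1)
    println(form2)  // same vector as form1
  }
}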
The GradientDescent.scala file
Part 1: the GradientDescent class
package org.apache.spark.mllib.optimization

import org.apache.spark.Logging
import org.apache.spark.rdd.RDD

import org.jblas.DoubleMatrix

import scala.collection.mutable.ArrayBuffer

/**
 * Class used to solve an optimization problem using Gradient Descent.
 * @param gradient Gradient function to be used.
 * @param updater Updater to be used to update weights after every iteration.
 */
class GradientDescent(var gradient: Gradient, var updater: Updater)
  extends Optimizer with Logging
{
  private var stepSize: Double = 1.0
  private var numIterations: Int = 100
  private var regParam: Double = 0.0
  private var miniBatchFraction: Double = 1.0

  /**
   * Set the initial step size of SGD for the first step. Default 1.0.
   * In subsequent steps, the step size will decrease with stepSize/sqrt(t)
   */
  def setStepSize(step: Double): this.type = {
    this.stepSize = step
    this
  }

  /**
   * Set fraction of data to be used for each SGD iteration. Default 1.0
   * (corresponding to deterministic/classical gradient descent)
   */
  def setMiniBatchFraction(fraction: Double): this.type = {
    this.miniBatchFraction = fraction
    this
  }

  /**
   * Set the number of iterations for SGD. Default 100.
   */
  def setNumIterations(iters: Int): this.type = {
    this.numIterations = iters
    this
  }

  /**
   * Set the regularization parameter. Default 0.0.
   */
  def setRegParam(regParam: Double): this.type = {
    this.regParam = regParam
    this
  }

  /**
   * Set the gradient function (of the loss function of one single data example)
   * to be used for SGD.
   */
  def setGradient(gradient: Gradient): this.type = {
    this.gradient = gradient
    this
  }

  /**
   * Set the updater function to actually perform a gradient step in a given direction.
   * The updater is responsible to perform the update from the regularization term as well,
   * and therefore determines what kind or regularization is used, if any.
   */
  def setUpdater(updater: Updater): this.type = {
    this.updater = updater
    this
  }

  def optimize(data: RDD[(Double, Array[Double])], initialWeights: Array[Double])
    : Array[Double] = {

    val (weights, stochasticLossHistory) = GradientDescent.runMiniBatchSGD(
        data,
        gradient,
        updater,
        stepSize,
        numIterations,
        regParam,
        miniBatchFraction,
        initialWeights)
    weights
  }
}
The class takes two constructor parameters: gradient, which selects the loss function whose gradient is computed (one of the Gradient subclasses above), and updater, which selects the regularization mode (one of the Updater subclasses). At the top, default values are set for stepSize, numIterations, regParam and miniBatchFraction, each with a corresponding setter. The last function, optimize, takes the RDD of data together with the initial coefficient vector and returns the learned weights.
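Put together, a hypothetical caller that trains an L2-regularized logistic model might wire things up roughly like the sketch below (parameter values are made up; in MLlib itself the algorithm classes such as LogisticRegressionWithSGD do this wiring internally):

import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.optimization._

object OptimizeSketch {
  // data: RDD of (label, feature array); returns the learned weights
  def train(data: RDD[(Double, Array[Double])], numFeatures: Int): Array[Double] = {
    val optimizer = new GradientDescent(new LogisticGradient(), new SquaredL2Updater())
      .setStepSize(1.0)
      .setNumIterations(200)
      .setRegParam(0.01)
      .setMiniBatchFraction(0.1)  // use roughly 10% of the data per iteration
    optimizer.optimize(data, Array.fill(numFeatures)(0.0))
  }
}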
Part 2: the GradientDescent companion object
// Top-level method to run gradient descent.
object GradientDescent extends Logging {
  /**
   * Run stochastic gradient descent (SGD) in parallel using mini batches.
   * In each iteration, we sample a subset (fraction miniBatchFraction) of the total data
   * in order to compute a gradient estimate.
   * Sampling, and averaging the subgradients over this subset is performed using one standard
   * spark map-reduce in each iteration.
   *
   * @param data - Input data for SGD. RDD of the set of data examples, each of
   *               the form (label, [feature values]).
   * @param gradient - Gradient object (used to compute the gradient of the loss function of
   *                   one single data example)
   * @param updater - Updater function to actually perform a gradient step in a given direction.
   * @param stepSize - initial step size for the first step
   * @param numIterations - number of iterations that SGD should be run.
   * @param regParam - regularization parameter
   * @param miniBatchFraction - fraction of the input data set that should be used for
   *                            one iteration of SGD. Default value 1.0.
   *
   * @return A tuple containing two elements. The first element is a column matrix containing
   *         weights for every feature, and the second element is an array containing the
   *         stochastic loss computed for every iteration.
   */
  def runMiniBatchSGD(
    data: RDD[(Double, Array[Double])],
    gradient: Gradient,
    updater: Updater,
    stepSize: Double,
    numIterations: Int,
    regParam: Double,
    miniBatchFraction: Double,
    initialWeights: Array[Double]) : (Array[Double], Array[Double]) = {

    val stochasticLossHistory = new ArrayBuffer[Double](numIterations)

    val nexamples: Long = data.count()
    val miniBatchSize = nexamples * miniBatchFraction

    // Initialize weights as a column vector
    var weights = new DoubleMatrix(initialWeights.length, 1, initialWeights:_*)
    var regVal = 0.0

    for (i <- 1 to numIterations) {
      // Sample a subset (fraction miniBatchFraction) of the total data
      // compute and sum up the subgradients on this subset (this is one map-reduce)
      val (gradientSum, lossSum) = data.sample(false, miniBatchFraction, 42 + i).map {
        case (y, features) =>
          val featuresCol = new DoubleMatrix(features.length, 1, features:_*)
          val (grad, loss) = gradient.compute(featuresCol, y, weights)
          (grad, loss)
      }.reduce((a, b) => (a._1.addi(b._1), a._2 + b._2))

      /**
       * NOTE(Xinghao): lossSum is computed using the weights from the previous iteration
       * and regVal is the regularization value computed in the previous iteration as well.
       */
      stochasticLossHistory.append(lossSum / miniBatchSize + regVal)
      val update = updater.compute(
        weights, gradientSum.div(miniBatchSize), stepSize, i, regParam)
      weights = update._1
      regVal = update._2
    }

    logInfo("GradientDescent.runMiniBatchSGD finished. Last 10 stochastic losses %s".format(
      stochasticLossHistory.takeRight(10).mkString(", ")))

    (weights.toArray, stochasticLossHistory.toArray)
  }
}
This object carries out the whole optimization and returns the coefficient vector together with the loss at each iteration; what is implemented here is parallel mini-batch SGD. The earlier line var weights = new DoubleMatrix(initialWeights.length, 1, initialWeights:_*) turns the Array into a d*1 column matrix. The key code sits inside for (i <- 1 to numIterations): data is a Spark RDD; data.sample's first argument says whether to sample with replacement, the second is the sampling fraction and the third is the random seed, and it returns the sampled RDD; the subsequent RDD.map and RDD.reduce then form one complete map-reduce pass. The resulting gradientSum is divided by miniBatchSize and handed to the updater to update the weights. For the parallelization strategy behind mini-batch SGD, see algorithm 3 in my earlier post 《常见数据挖掘算法的Map-Reduce策略(2)》 (Map-Reduce strategies for common data-mining algorithms, part 2).
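Stripped of the surrounding loop, the per-iteration map-reduce pattern looks roughly like the sketch below (a simplified version of my own, not the actual MLlib code): sample a fraction of the rows, map each sampled point to its (gradient, loss) pair, and reduce by summing, so only a single d-dimensional vector and a scalar loss come back to the driver, which then runs the cheap updater step locally.

import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.optimization.Gradient
import org.jblas.DoubleMatrix

object MiniBatchStep {
  // One mini-batch estimate: returns (sum of per-point gradients, sum of per-point losses)
  def oneStep(data: RDD[(Double, Array[Double])],
              gradient: Gradient,
              weights: DoubleMatrix,
              miniBatchFraction: Double,
              seed: Int): (DoubleMatrix, Double) = {
    data.sample(false, miniBatchFraction, seed)          // sample without replacement
      .map { case (y, features) =>
        val x = new DoubleMatrix(features.length, 1, features: _*)
        gradient.compute(x, y, weights)                  // (grad, loss) for one point
      }
      .reduce((a, b) => (a._1.addi(b._1), a._2 + b._2))  // sum gradients and losses
  }
}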
Original article: http://www.cnblogs.com/kobedeshow/p/3622997.html