摘要:通常在大厂实际项目中会使用Spark来处理大规模数据下的数据挖掘和分析相关工作。本篇从项目实战中总结常用的Spark特征处理实例,方便小伙伴们更好的使用Spark做数据挖掘相关的工作。
目录
01 特征处理的意义
02 特征提取
03 特征转换
04 特征选择
01 特征处理的意义
在数据挖掘项目中,由于我们获取的原始数据中包含很多噪声,所以在真正提供给模型前需要特征处理处理工作,否则再好的模型也只能“Garbage in,garbage out”。
总的来说,特征处理主要包括三部分,特征提取、特征转换和特征选择。
02 特征提取
?特征提取一般指从原始数据中抽取特征的过程。
1. 计数向量器(Countvectorizer)
importorg.apache.spark.ml.feature.{CountVectorizer, CountVectorizerModel} val df =spark.createDataFrame(Seq( (0, Array("a","e","a","d","b")), (1, Array("a","c","b","d","c","f","a","b")), (2, Array("a","f")) )).toDF("id", "words") var cv_model = newCountVectorizer().setInputCol("words").setOutputCol("features").setVocabSize(10).setMinDF(2).fit(df) val cv1 = cv_model.transform(df) cv1.show(false)
2. 词频-逆向文件频率(TF-IDF)
import org.apache.spark.ml.feature.{HashingTF,IDF, Tokenizer} val wordsData =spark.createDataFrame(Seq( "传奇 游戏 战士".split(" "), "苹果 梨 香蕉".split(" "), "苹果 手机 流畅".split(" ") ).map(Tuple1.apply)).toDF("words") wordsData.show(false) // step1 hashingTF val hashingTF = newHashingTF().setInputCol("words").setOutputCol("rawFeatures").setNumFeatures(2000) val featurizedData =hashingTF.transform(wordsData) // step2 计算IDF val idf = new IDF().setInputCol("rawFeatures").setOutputCol("features") val idfModel =idf.fit(featurizedData) val rescaledData =idfModel.transform(featurizedData) rescaledData.select("words","features").show(false)
3. 词转向量(Word2Vec)
?importorg.apache.spark.ml.feature.Word2Vec val documentDF =spark.createDataFrame(Seq( "传奇 游戏 战士".split(" "), "苹果 梨 香蕉".split(" "), "传奇 游戏 种类多".split(" "), "苹果 手机 流畅".split(" ") ).map(Tuple1.apply)).toDF("text") val word2Vec = newWord2Vec().setInputCol("text").setOutputCol("result").setVectorSize(10).setMinCount(2) val model =word2Vec.fit(documentDF) val result =model.transform(documentDF) result.show(false)
03 特征转换
1. 连续型数据转换成离散数据
1.1 二值化(Binarizer)
1.2 离散化重组(Bucketizer)
?importorg.apache.spark.ml.feature.Bucketizer val data = Array(-8.0, -0.5, -0.3,0.0, 0.2, 9.0) val splits = Array(Double.NegativeInfinity,-0.5, 0.0, 0.5, Double.PositiveInfinity) val dataFrame =spark.createDataFrame(data.map(Tuple1.apply)).toDF("features") val bucketizer = newBucketizer().setInputCol("features").setOutputCol("bucketedFeatures").setSplits(splits) bucketizer.transform(dataFrame).show(false)
1.3 分位数离散化(QuantileDiscretizer)
?importorg.apache.spark.ml.feature.QuantileDiscretizer val data = Array((0, 18.0), (1, 19.0),(2, 8.0), (3, 5.0), (4, 2.2)) var df =spark.createDataFrame(data).toDF("id", "hour") val discretizer = newQuantileDiscretizer().setInputCol("hour").setOutputCol("result").setNumBuckets(3) val result =discretizer.fit(df).transform(df) result.show()
2. 字符串和索引相互转换
2.1 字符串-索引变换(StringIndexer)
importorg.apache.spark.ml.feature.StringIndexer val df = spark.createDataFrame( Seq((0, "a"), (1, "b"), (2, "c"), (3, "a"),(4, "a"), (5, "c")) ).toDF("id", "category") val indexer = newStringIndexer().setInputCol("category").setOutputCol("categoryIndex") val indexed =indexer.fit(df).transform(df) indexed.show(false)
2.2 索引-字符串(IndexToString)
3. 正则化(Normalizer)
?importorg.apache.spark.ml.feature.Normalizer importorg.apache.spark.ml.linalg.{Vector,Vectors} val data=Seq(Vectors.dense(-1,1,1,8,56),Vectors.dense(-1,3,-1,-9,88),Vectors.dense(0,5,1,10,96),Vectors.dense(0,5,1,11,589),Vectors.dense(0,5,1,11,688)) valdf=spark.createDataFrame(data.map(Tuple1.apply)).toDF("features") val normalizer = newNormalizer().setInputCol("features").setOutputCol("normFeatures").setP(1.0) normalizer.transform(df).show(false)
4. 规范化(StandardScaler)
?importorg.apache.spark.ml.feature.StandardScaler importorg.apache.spark.ml.linalg.{Vector,Vectors} val dataFrame =spark.createDataFrame(Seq( (0, Vectors.dense(1.0, 0.5, -1.0)),(1,Vectors.dense(2.0, 1.0, 1.0)), (2, Vectors.dense(4.0, 10.0, 2.0)))).toDF("id","features") val scaler = newStandardScaler().setInputCol("features") .setOutputCol("scaledFeatures").setWithStd(true).setWithMean(false) val scalerModel =scaler.fit(dataFrame) val scaledData =scalerModel.transform(dataFrame) scaledData.show(false)
5. 主成分分析 (PCA)
importorg.apache.spark.ml.feature.PCA importorg.apache.spark.ml.linalg.{Vector,Vectors} val data = Array( Vectors.sparse(5, Seq((1, 1.0), (3, 7.0))), Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0), Vectors.dense(4.0, 0.0, 0.0, 6.0,7.0)) val df =spark.createDataFrame(data.map(Tuple1.apply)).toDF("features") val scaledDataFrame = newStandardScaler().setInputCol("features").setOutputCol("scaledFeatures").fit(df).transform(df) val pca = new PCA().setInputCol("features").setOutputCol("pcaFeatures").setK(3).fit(scaledDataFrame) val pcaDF =pca.transform(scaledDataFrame) pcaDF.select("features","pcaFeatures").show(false)
6. 向量-索引变换(VectorIndexer)
importorg.apache.spark.ml.feature.VectorIndexer importorg.apache.spark.ml.linalg.Vectors val data=Seq(Vectors.dense(-1,1,1,8,56), Vectors.dense(-1,3,-1,-9,88), Vectors.dense(0,5,1,10,96), Vectors.dense(0,5,1,11,589)) valdf=spark.createDataFrame(data.map(Tuple1.apply)).toDF("features") val indexer = newVectorIndexer().setInputCol("features").setOutputCol("indexed").setMaxCategories(3) val indexerModel =indexer.fit(df) indexerModel.transform(df).show(false)
7. SQL转换器(SQLTransformer)
?import org.apache.spark.ml.feature.SQLTransformer val df = spark.createDataFrame( Seq((0, 1.0, 3.0), (2, 2.0, 5.0))).toDF("id", "v1", "v2") val sqlTrans = newSQLTransformer().setStatement( "SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__") sqlTrans.transform(df).show()
8. 独热编码(OneHotEncoder):独热编码将标签指标映射为二值变量。
9. 最大值-最小值缩放(MinMaxScaler):将独立的特征值转换到指定的范围内,通常为[0,1]。
10. 特征向量合并(VectorAssembler):将原始特征和不同特征转换器生成的特征合并为单个特征向量。输入列的值将按指定顺序依次添加到一个新向量中。
04 特征选择
特征选择是从特征向量中选择那些更简单有效的特征。适用于在高维数据分析中剔除冗余特征,提升模型的性能。特征选择后的特征是原来特征的一个子集。
1. 向量机(VectorSlicer) :基于已有的特征库,通过索引或者列名来选择部分需要的特征。
2. R公式(RFormula) :通过R模型公式产生一个特征向量和一个标签列。适合在需要做OneHotEncoder的时候,可以一个简单的代码把所有的离散特征转化成数值化表示。
3. 卡方特征选择(ChiSqSelector)
importorg.apache.spark.ml.feature.ChiSqSelector importorg.apache.spark.ml.feature.VectorIndexer val data = Seq( (7,Vectors.dense(0.0, 0.0, 18.0, 1.0), 1.0), (8, Vectors.dense(0.0, 1.0, 12.0, 0.0), 0.0), (9, Vectors.dense(1.0, 0.0,15.0, 0.2), 0.0)) val df =spark.createDataset(data).toDF("id", "features", "clicked") val selector = newChiSqSelector().setNumTopFeatures(2).setFeaturesCol("features").setLabelCol("clicked").setOutputCol("selectedFeatures") val result =selector.fit(df).transform(df) result.show(false)
参考资料
[1] http://spark.apache.org/docs/latest/ml-features.html
[2] http://www.apache.wiki/pages/viewpage.action?pageId=5505205
[3] https://blog.csdn.net/liulingyuan6/article/details/53413728
特别说明:本篇主要参考资料是Spark官网资料,结合实际项目实例总结出常用的特征处理的实例。有兴趣的小伙伴可以参考上面的资料[1]。
?
喜欢本类型文章的小伙伴可以关注我的微信公众号:数据拾光者。有任何干货我会首先发布在微信公众号,还会同步在知乎、头条、简书、csdn等平台。也欢迎小伙伴多交流。如果有问题,可以在微信公众号随时Q我哈。
原文:https://www.cnblogs.com/wilson0068/p/12389009.html