一、前述
经过之前的训练数据的构建可以得到所有特征值为1的模型文件,本文将继续构建训练数据特征并构建模型。
二、详细流程

将处理完成后的训练数据导出用做线下训练的源数据(可以用Spark_Sql对数据进行处理)
insert overwrite local directory ‘/opt/data/traindata‘ row format delimited fields terminated by ‘\t‘ select * from dw_rcm_hitop_prepare2train_dm;
注:这里是将数据导出到本地,方便后面再本地模式跑数据,导出模型数据。这里是方便演示真正的生产环境是直接用脚本提交spark任务,从hdfs取数据结果仍然在hdfs,再用ETL工具将训练的模型结果文件输出到web项目的文件目录下,用来做新的模型,web项目设置了定时更新模型文件,每天按时读取新模型文件
三、代码详解
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 | packagecom.bjsxt.dataimportjava.io.PrintWriterimportorg.apache.log4j.{ Level, Logger }importorg.apache.spark.mllib.classification.{ LogisticRegressionWithLBFGS, LogisticRegressionModel, LogisticRegressionWithSGD }importorg.apache.spark.mllib.linalg.SparseVectorimportorg.apache.spark.mllib.optimization.SquaredL2Updaterimportorg.apache.spark.mllib.regression.LabeledPointimportorg.apache.spark.mllib.util.MLUtilsimportorg.apache.spark.rdd.RDDimportorg.apache.spark.{ SparkContext, SparkConf }importscala.collection.Map/** * Created by root on 2016/5/12 0012. */classRecommonder {}objectRecommonder {  defmain(args:Array[String]) {    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)    valconf =newSparkConf().setAppName("recom").setMaster("local[*]")    valsc =newSparkContext(conf)    //加载数据,用\t分隔开    valdata:RDD[Array[String]] =sc.textFile("d:/result").map(_.split("\t"))    println("data.getNumPartitions:"+ data.getNumPartitions) //如果文件在本地的话,默认是32M的分片//    -1    Item.id,hitop_id85:1,Item.screen,screen2:1 一行数据格式    //得到第一列的值,也就是label    vallabel:RDD[String] =data.map(_(0))    println(label)    //sample这个RDD中保存的是每一条记录的特征名    valsample:RDD[Array[String]] =data.map(_(1)).map(x => {      valarr:Array[String] =x.split(";").map(_.split(":")(0))      arr    })    println(sample)//    //将所有元素压平,得到的是所有分特征,然后去重,最后索引化,也就是加上下标,最后转成map是为了后面查询用    valdict:Map[String, Long] =sample.flatMap(x =>x).distinct().zipWithIndex().collectAsMap()    //得到稀疏向量    valsam:RDD[SparseVector] =sample.map(sampleFeatures => {      //index中保存的是,未来在构建训练集时,下面填1的索引号集合      valindex:Array[Int] =sampleFeatures.map(feature => {        //get出来的元素程序认定可能为空,做一个类型匹配        valrs:Long =dict.get(feature) match{          caseSome(x) => x        }        //非零元素下标,转int符合SparseVector的构造函数        rs.toInt      })      //SparseVector创建一个向量      newSparseVector(dict.size, index, Array.fill(index.length)(1.0)) //通过这行代码,将哪些地方填1,哪些地方填0    })    //mllib中的逻辑回归只认1.0和0.0,这里进行一个匹配转换    valla:RDD[LabeledPoint] =label.map(x => {      x match{        case"-1"=> 0.0        case"1"=> 1.0      }      //标签组合向量得到labelPoint    }).zip(sam).map(x => newLabeledPoint(x._1, x._2))//    val splited = la.randomSplit(Array(0.1, 0.9), 10)////    la.sample(true, 0.002).saveAsTextFile("trainSet")//    la.sample(true, 0.001).saveAsTextFile("testSet")//    println("done")    //逻辑回归训练,两个参数,迭代次数和步长,生产常用调整参数     vallr =newLogisticRegressionWithSGD()    // 设置W0截距    lr.setIntercept(true)//    // 设置正则化//    lr.optimizer.setUpdater(new SquaredL2Updater)//    // 看中W模型推广能力的权重//    lr.optimizer.setRegParam(0.4)    // 最大迭代次数    lr.optimizer.setNumIterations(10)    // 设置梯度下降的步长,学习率    lr.optimizer.setStepSize(0.1)    valmodel:LogisticRegressionModel =lr.run(la)    //模型结果权重    valweights:Array[Double] =model.weights.toArray    //将map反转,weights相应下标的权重对应map里面相应下标的特征名    valmap:Map[Long, String] =dict.map(_.swap)    //模型保存    //    LogisticRegressionModel.load()    //    model.save()    //输出    valpw =newPrintWriter("model");    //遍历    for(i<- 0until weights.length){      //通过map得到每个下标相应的特征名      valfeatureName =map.get(i)match{        caseSome(x) => x        caseNone => ""      }      //特征名对应相应的权重      valstr =featureName+"\t"+ weights(i)      pw.write(str)      pw.println()    }    pw.flush()    pw.close()  }} | 
model文件截图如下:
各个特征下面对应的权重:

将模型文件和用户历史数据,和商品表数据加载到redis中去。
代码如下:
# -*- coding=utf-8 -*-
import redis
pool = redis.ConnectionPool(host=‘node05‘, port=‘6379‘,db=2)
r = redis.Redis(connection_pool=pool)
f1 = open(‘../data/ModelFile.txt‘)
f2 = open(‘../data/UserItemsHistory.txt‘)
f3 = open(‘../data/ItemList.txt‘)
for i in list:
    lines = i.readlines(100)
    if not lines:
        break
    for line in lines:
        kv = line.split(‘\t‘)
        if i==f1:
          r.hset("rcmd_features_score", kv[0], kv[1])
        if i == f2:
          r.hset(‘rcmd_user_history‘, kv[0], kv[1])
        if i==f3:
          r.hset(‘rcmd_item_list‘, kv[0], line[:-2])
f1.close()
最终redis文件中截图如下:

原文:https://www.cnblogs.com/timssd/p/12578821.html