
[Recommender Systems] User-based Collaborative Filtering with Spark

Posted: 2016-01-10 11:33:25
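
The Spark job below computes pairwise user similarity from (user, video, score) ratings using cosine similarity:

    sim(u1, u2) = dot(u1, u2) / (||u1|| * ||u2||)

where dot(u1, u2) sums score1 * score2 over the videos both users rated, and ||u|| = sqrt(s1^2 + s2^2 + ... + sn^2) is the norm of a user's full rating vector. The pipeline: parse the ratings, compute each user's norm, self-join on video to enumerate co-rating user pairs, sum the score products per pair to get each pair's dot product, then join both norms back in and divide.
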
import breeze.numerics.{pow, sqrt}
import org.apache.spark.{SparkContext, SparkConf}


/**
  * Created by gavinzjchao on 2016/1/8.
  */
object SparkTest008 {
  def main(args: Array[String]) {
    val conf = new SparkConf().setMaster("local").setAppName("UserSimilarity")
    val sc = new SparkContext(conf)

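    // toy ratings in "user,video,score" format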
    val data = sc.parallelize(List("u1,v1,2", "u1,v2,1", "u1,v3,2", "u2,v1,3", "u2,v3,4", "u2,v4,1", "u2,v2,9", "u3,v2,9"))

    // parse each "user,video,score" line into a (user, video, score) triple
    val rddUserRating = data.map { line =>
      line.trim().split(",") match {
        case Array(user, video, score) => (user, video, score)
      }
    }

    // compute each user's rating-vector norm: sqrt(s1^2 + s2^2 + ... + sn^2)
    val rddUserScoreSum = rddUserRating
      .map { case (user, _, score) => (user, pow(score.toDouble, 2)) }
      .reduceByKey(_ + _)
      .map { case (user, squareSum) => (user, sqrt(squareSum)) }
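    // e.g. u1 rated (2, 1, 2), so its norm is sqrt(4 + 1 + 4) = 3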

    // get <video, (user, score)>
    val rddVideoInfo = rddUserRating.map { case (user, video, score) => video -> (user, score) }

    // self-join on video to get <video, ((user1, score1), (user2, score2))>;
    // the join emits each co-rating pair in both orders plus the (u, u) diagonal,
    // so keep only user1 < user2 to count each pair exactly once
    val rddUserPairs = rddVideoInfo.join(rddVideoInfo)
      .filter { case (_, ((user1, _), (user2, _))) => user1 < user2 }

    // multiply co-ratings and sum per (user1, user2) pair: the dot product over shared videos
    val rddUserPairScore = rddUserPairs
      .map { case (_, ((user1, score1), (user2, score2))) => (user1, user2) -> score1.toDouble * score2.toDouble }
      .reduceByKey(_ + _)
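    // e.g. u1 and u2 share v1, v2, v3: dot(u1, u2) = 2*3 + 1*9 + 2*4 = 23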

    // cosine similarity needs both norms; the first join attaches user1's norm
    val rddSimilarityTmp = rddUserPairScore
      .map { case ((user1, user2), productScore) => user1 -> (user2, productScore) }
      .join(rddUserScoreSum)

    // re-key by user2 and join again to attach user2's norm
    val rddSimilarity = rddSimilarityTmp
      .map { case (user1, ((user2, productScore), norm1)) => user2 -> ((user1, norm1), productScore) }
      .join(rddUserScoreSum)

    // divide the dot product by the product of the two norms
    val userSimilarity = rddSimilarity.map {
      case (user2, (((user1, norm1), productScore), norm2)) => (user1, user2) -> productScore / (norm1 * norm2)
    }

    // collect to the driver and print each ((user1, user2), similarity) pair
    userSimilarity.collect().foreach(println)

    sc.stop()
  }
}
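
With the sample data above, the three pairs and their (hand-checked) cosine similarities are:

    ((u1,u2), 23 / (3 * sqrt(107)) ≈ 0.741)
    ((u1,u3), 9 / (3 * 9) ≈ 0.333)
    ((u2,u3), 81 / (9 * sqrt(107)) ≈ 0.870)

The exact formatting and ordering of the printed lines may differ from run to run.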

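The post stops at the similarity matrix. As a minimal sketch of the next step (not part of the original code), one could rank unseen videos for each user by the similarity-weighted sum of neighbours' scores; the snippet below assumes it is added inside main() after userSimilarity is computed:

    // similarity is symmetric, so emit both directions: user -> (neighbour, sim)
    val neighbours = userSimilarity.flatMap {
      case ((u1, u2), sim) => Seq(u1 -> (u2, sim), u2 -> (u1, sim))
    }

    // each user's ratings, keyed by user for the join below
    val ratingsByUser = rddUserRating.map { case (user, video, score) => user -> (video, score.toDouble) }

    // for every (user, neighbour) edge, pull in the neighbour's ratings and
    // accumulate sim * score per (user, video) candidate
    val candidateScores = neighbours
      .map { case (user, (neighbour, sim)) => neighbour -> (user, sim) }
      .join(ratingsByUser)
      .map { case (_, ((user, sim), (video, score))) => (user, video) -> sim * score }
      .reduceByKey(_ + _)

    // drop videos the user has already rated, leaving true recommendations
    val seen = rddUserRating.map { case (user, video, _) => (user, video) -> () }
    val recommendations = candidateScores.subtractByKey(seen)

For a calibrated predicted rating (rather than a ranking score), the weighted sum would also be divided by the sum of the similarity weights.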


Original article: http://www.cnblogs.com/alexander-chao/p/5117889.html
