闲来无事,整理一下算法。今天整理一下PageRank。
网上搜了搜感觉这篇文章还不错
http://www.cnblogs.com/fengfenggirl/p/pagerank-introduction.html
本文对这篇文章进行修改加工,加入了一些自己的思想,后面代码实现用了Spark而不是原文的MR。
PageRank作用是给出网页的重要性,它的思想是这样的:
根据“民主投票”来确定的重要性,越重要的话语权越大,不断的投票获得新的重要值,直到收敛。(能力有限就不推导证明收敛了)
PageRank让网页通过链接去投票。如果网页A有三个链接B、C、D,其重要性为PRa,那么BCD这轮迭代分别从A获得了1/3 PRa的投票值。一个网页把所有获得的投票值累加起来,就是新的网页重要值。
还有一种解释就是概率的解释。我们最开始认为所有网页被访问的概率都是一样的,假设有M个网页,那么每个网页被访问的概率是1/M。一个网页如果有N个链接,那么从这个网页到一个被链接的网页的概率就是1/M*1/N。
但是迭代过程中有一个问题,就是没有出度的节点,最后会使每个网页被访问的概率都是零,用上面的说法解释就是,我进入了这个网页后我没有概率去访问任何网站。另一个问题就是出度wei1,自己链接自己,这样迭代下去会变成其他网页概率为零,只有自己是1,也就是进入这个网站后只能访问自己了。
模拟下人浏览网站,用户不一定非要点击页面中的链接进入下一个页面,还可以通过随机输入网址获得下一页。所以为了解决上诉两个问题,加入了一个?概率。
其中M为网页链接关系矩阵,V为旧的PageRank值(就是重要性值),e为初始值。
下面是spark实现的pagerank,根据官网example代码修改
import org.apache.spark.sql.SparkSession
/*
我们的数据是这种格式的
url 链接的url
中间用空格隔开,每行两个url
**/
object SparkPageRank {
def main(args: Array[String]) {
if (args.length < 1) {
System.err.println("Usage: SparkPageRank <file> <iter>")
System.exit(1)
}
showWarning()
//spark 2.0版本
val spark = SparkSession
.builder
.appName("SparkPageRank")
.getOrCreate()
// 循环次数
val iters = if (args.length > 1) args(1).toInt else 10
val lines = spark.read.textFile(args(0)).rdd
// 生成(url,[a,b,c...])
val links = lines.map{ s =>
val parts = s.split("\\s+")
(parts(0), parts(1))
}.distinct().groupByKey().cache()//以后还用先存起来
// 生成rank表 (url,rank)初始值都为1.0,也就是e=[1,1,1,..]T
var ranks = links.mapValues(v => 1.0)
for (i <- 1 to iters) {
val contribs = links.join(ranks).values.flatMap{ case (urls, rank) =>
val size = urls.size
urls.map(url => (url, rank / size))
}
//(url1,r1),(url1,r2),(url2,r3)....
ranks =contribs.reduceByKey(_+_).mapValues(0.15 + 0.85 * _ )
}
val output = ranks.collect()
output.foreach(tup => println(tup._1 + " has rank: " + tup._2 + "."))
spark.stop()
}
}
原文:http://www.cnblogs.com/matianchi/p/7010773.html