关于GraphX的join操作,官网上给出两个方法:
joinVertices:将顶点与输入RDD进行连接,并通过用户定义的map函数计算新的顶点属性,返回新的Graph;RDD中没有匹配值的顶点保留其原始属性值。输入RDD中每个顶点最多应只对应一个值,如果对于给定的顶点包含多个值,则只会使用其中一个。
outerJoinVertices:与joinVertices类似,但用户定义的map函数会应用于所有顶点,并且可以改变顶点属性的类型。由于并非所有顶点都能在输入RDD中找到匹配值,map函数接收的RDD值是Option类型:有匹配值时为Some(值),没有匹配值时为None,由map函数决定此时的顶点属性。
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, Graph, VertexId}
import org.apache.spark.rdd.RDD
object JoinDemo {
  /**
   * Demonstrates GraphX `outerJoinVertices`: joins an external RDD of boolean
   * flags onto the vertices of a small graph and derives new vertex attributes.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SimpleGraphX").setMaster("local")
    val sc = new SparkContext(conf)
    // Always release the SparkContext, even if a job below fails.
    try {
      sc.setLogLevel("WARN")
      // Vertex data: (vertexId, (name, age))
      val users: RDD[(VertexId, (String, Int))] =
        sc.parallelize(Array((1L, ("Alice", 28)), (2L, ("Bob", 27)), (3L, ("Charlie", 65)), (4L, ("David", 42)), (5L, ("Ed", 55)), (6L, ("Fran", 50))))
      // Edges: Edge(srcId, dstId, attr)
      val edges: RDD[Edge[Int]] =
        sc.parallelize(Array(Edge(2L, 1L, 7), Edge(2L, 4L, 2), Edge(3L, 2L, 4), Edge(3L, 6L, 3), Edge(4L, 1L, 1), Edge(5L, 2L, 2), Edge(5L, 3L, 8), Edge(5L, 6L, 3)))
      val graph = Graph(users, edges)
      // An arbitrary external RDD keyed by vertex id.
      // NOTE: vertex 3 appears twice (true and false). GraphX expects at most one
      // entry per vertex, so which value vertex 3 receives is not guaranteed.
      // Vertex 6 has no entry and therefore exercises the None branch below.
      val rdd: RDD[(VertexId, Boolean)] =
        sc.parallelize(Array((3L, true), (3L, false), (2L, true), (1L, true), (5L, false), (4L, false)))
      rdd.collect.foreach(println(_))
      println("******************************************")
      graph.vertices.collect.foreach(println(_))
      // Flag true => age + 1; flag false => age - 1; no matching entry => age set to 0.
      val graph2: Graph[(String, Int), Int] = graph.outerJoinVertices(rdd) { (_, attr, flag) =>
        flag match {
          case Some(true)  => (attr._1, attr._2 + 1)
          case Some(false) => (attr._1, attr._2 - 1)
          case None        => (attr._1, 0)
        }
      }
      println("***********************************************")
      graph2.vertices.collect.foreach(println(_))
    } finally {
      sc.stop()
    }
  }
}
运行结果:

原文:https://www.cnblogs.com/WhiteDeer-w/p/12707275.html