关于GraphX的join操作,官网上给出两个方法:
joinVertices：将输入 RDD 与图的顶点进行连接，并返回新的 Graph；RDD 中没有匹配值的顶点保留其原始属性值。如果 RDD 对某个顶点包含多个值，则只会使用其中一个。
outerJoinVertices：与 joinVertices 类似，但会将用户定义的函数应用于所有顶点，并且允许改变顶点属性的类型。对于在输入 RDD 中没有匹配值的顶点，传给用户函数的值为 None（该参数类型为 Option），由用户函数决定该顶点的新属性值。
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, Graph, VertexId}
import org.apache.spark.rdd.RDD
/**
 * Demonstrates GraphX `outerJoinVertices`: an external (VertexId, Boolean) RDD is
 * joined onto a small property graph and each vertex's age is rewritten from the
 * join result.
 */
object JoinDemo {

  /**
   * Entry point. Builds the graph, prints the external RDD and the original vertices,
   * then prints the vertices of the graph produced by the outer join.
   *
   * @param args unused
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SimpleGraphX").setMaster("local")
    val sc = new SparkContext(conf)
    // Release the SparkContext even if one of the jobs below throws.
    try {
      sc.setLogLevel("WARN")

      // Vertex data: (id, (name, age)).
      val users: RDD[(VertexId, (String, Int))] =
        sc.parallelize(Array(
          (1L, ("Alice", 28)), (2L, ("Bob", 27)), (3L, ("Charlie", 65)),
          (4L, ("David", 42)), (5L, ("Ed", 55)), (6L, ("Fran", 50))))

      // Edges with an Int attribute.
      val edges: RDD[Edge[Int]] =
        sc.parallelize(Array(
          Edge(2L, 1L, 7), Edge(2L, 4L, 2), Edge(3L, 2L, 4), Edge(3L, 6L, 3),
          Edge(4L, 1L, 1), Edge(5L, 2L, 2), Edge(5L, 3L, 8), Edge(5L, 6L, 3)))

      val graph = Graph(users, edges)

      // An arbitrary external RDD to join against. Vertex 3 appears twice and vertex 6
      // not at all. NOTE(review): for a vertex with several entries only one value is
      // passed to the join function, and which one is not specified by the API.
      val rdd: RDD[(VertexId, Boolean)] =
        sc.parallelize(Array((3L, true), (3L, false), (2L, true), (1L, true), (5L, false), (4L, false)))
      rdd.collect.foreach(println)
      println("******************************************")
      graph.vertices.collect.foreach(println)

      // Some(true) -> age + 1; Some(false) -> age - 1; no matching entry (None) -> age 0.
      val graph2: Graph[(String, Int), Int] = graph.outerJoinVertices(rdd) { (_, attr, flag) =>
        val (name, age) = attr
        flag match {
          case Some(true)  => (name, age + 1)
          case Some(false) => (name, age - 1)
          case None        => (name, 0)
        }
      }
      println("***********************************************")
      graph2.vertices.collect.foreach(println)
    } finally {
      sc.stop()
    }
  }
}
运行结果（原文中的输出内容未随文本保留）：

原文:https://www.cnblogs.com/WhiteDeer-w/p/12707275.html