
Spark Code Reading Notes


Spark's execution layer consists of three modules: the master, the worker, and the client.

The master manages the worker processes; workers execute tasks and report their results to the master; the client submits jobs to the master. The master and workers run as long-lived background daemons, while the client is started when the SparkContext is initialized at the beginning of a job and then registers the job with the master. Master, workers, and client exchange task state information through an event-driven RPC library, which is built on the Akka framework.
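
As a rough illustration of this event-driven style, here is a minimal sketch of a master and a worker exchanging registration and heartbeat messages with Akka classic actors. The actor and message names (MasterActor, RegisterWorker, Heartbeat, and so on) are invented for illustration and are far simpler than Spark's actual deploy protocol.

import akka.actor.{Actor, ActorRef, ActorSystem, Props}

// Illustrative messages; Spark's real deploy protocol is much richer than this.
case class RegisterWorker(host: String, cores: Int)
case object RegisteredWorker
case class Heartbeat(host: String)

// The master tracks registered workers and reacts to their messages.
class MasterActor extends Actor {
  private var workers = Map.empty[String, Int]
  def receive = {
    case RegisterWorker(host, cores) =>
      workers += host -> cores
      sender() ! RegisteredWorker
    case Heartbeat(host) =>
      println(s"heartbeat from $host; known workers: ${workers.keys.mkString(", ")}")
  }
}

// The worker registers itself on startup and then reports back to the master.
class WorkerActor(master: ActorRef, host: String, cores: Int) extends Actor {
  override def preStart(): Unit = master ! RegisterWorker(host, cores)
  def receive = {
    case RegisteredWorker => master ! Heartbeat(host)
  }
}

object RpcSketch extends App {
  val system = ActorSystem("sketch")
  val master = system.actorOf(Props[MasterActor], "master")
  system.actorOf(Props(new WorkerActor(master, "worker-1", 4)), "worker-1")
}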

The shuffle operation on Spark RDDs involves transferring intermediate data. During the map-reduce process, each mapper allocates one bucket data structure per reducer to buffer its output. A reducer then fetches this data in one of two ways: over socket connections built with NIO, which is the default, or from a Netty server using OIO. Unlike Hadoop MapReduce, Spark does not force a merge-sort on the reduce side; instead it uses a hash map to associate keys with their values, trading memory for the elimination of "unnecessary" work.
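
A minimal sketch of this hash-map-based, reduce-side aggregation (plain Scala, not Spark's actual Aggregator implementation): each fetched (key, value) pair is folded into an in-memory map, so no sorting step is ever needed.

import scala.collection.mutable

// Fold fetched (key, value) pairs into a hash map, merging values that share a key.
def combineValuesByKey[K, V](fetched: Iterator[(K, V)], mergeValue: (V, V) => V): Iterator[(K, V)] = {
  val combiners = new mutable.HashMap[K, V]
  for ((k, v) <- fetched) {
    combiners(k) = combiners.get(k) match {
      case Some(c) => mergeValue(c, v) // key already seen: merge the new value in
      case None    => v                // first value for this key
    }
  }
  combiners.iterator
}

// e.g. combineValuesByKey(Iterator("a" -> 1, "b" -> 2, "a" -> 3), (x: Int, y: Int) => x + y)
// yields ("a", 4) and ("b", 2), in no particular order.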

Analysis of the Spark k-means example

// The entry point of Spark, which provides the operations for creating RDDs, accumulators, broadcast variables, etc.
val sc = new SparkContext(args(0), "SparkLocalKMeans",
  System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
// Load the data from the input path (e.g. HDFS or a local disk); the result is an RDD
val lines = sc.textFile(args(1))
// Parse each line of text into a vector of doubles; the result is an RDD, cached in memory
val data = lines.map(parseVector _).cache()
// Number of clusters k
val K = args(2).toInt
// Termination threshold
val convergeDist = args(3).toDouble

// Initialize the k cluster centers from a random sample of the data
var kPoints = data.takeSample(false, K, 42).toArray
var tempDist = 1.0

while(tempDist > convergeDist) {
  // Label each vector with the index of its closest center; the output is structured as (label, (vector, 1))
  var closest = data.map (p => (closestPoint(p, kPoints), (p, 1)))
  // Sum the vectors assigned to each center; the output is structured as (label, (sum of vectors, number of vectors))
  var pointStats = closest.reduceByKey{case ((x1, y1), (x2, y2)) => (x1 + x2, y1 + y2)}
  // Compute the average vector of each cluster; the output is structured as (label, new center given by the average vector)
  var newPoints = pointStats.map {pair => (pair._1, pair._2._1 / pair._2._2)}.collectAsMap()

  tempDist = 0.0
  // Accumulate the squared distance between the previous and the new centers
  for (i <- 0 until K) {
    tempDist += kPoints(i).squaredDist(newPoints(i))
  }

  // Replace the old centers with the new ones
  for (newP <- newPoints) {
    kPoints(newP._1) = newP._2
  }
  println("Finished iteration (delta = " + tempDist + ")")
}
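
The example relies on two helpers defined next to it in the Spark examples, parseVector and closestPoint. They look roughly as reconstructed below; the Vector type is the simple vector class bundled with the examples of that era (it provides +, / and squaredDist), and details may differ between releases.

def parseVector(line: String): Vector = {
  // one whitespace-separated vector of doubles per input line
  new Vector(line.split(' ').map(_.toDouble))
}

def closestPoint(p: Vector, centers: Array[Vector]): Int = {
  // return the index of the center with the smallest squared distance to p
  var bestIndex = 0
  var closest = Double.PositiveInfinity
  for (i <- 0 until centers.length) {
    val tempDist = p.squaredDist(centers(i))
    if (tempDist < closest) {
      closest = tempDist
      bestIndex = i
    }
  }
  bestIndex
}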

Viewing the k-means application through the lens of the Hadoop MapReduce programming model, the part worth examining is how Spark carries out the reduceByKey operation in a distributed environment.

  /**
   * Merge the values for each key using an associative reduce function. This will also perform
   * the merging locally on each mapper before sending results to a reducer, similarly to a
   * "combiner" in MapReduce. Output will be hash-partitioned with the existing partitioner/
   * parallelism level.
   */
  def reduceByKey(func: (V, V) => V): RDD[(K, V)] = {
    reduceByKey(defaultPartitioner(self), func)
  }
 
  /**
   * Merge the values for each key using an associative reduce function. This will also perform
   * the merging locally on each mapper before sending results to a reducer, similarly to a
   * "combiner" in MapReduce.
   */
  def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = {
    combineByKey[V]((v: V) => v, func, func, partitioner)
  }
 
class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)])
...
...
  /**
   * Generic function to combine the elements for each key using a custom set of aggregation
   * functions. Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined type" C
   * Note that V and C can be different -- for example, one might group an RDD of type
   * (Int, Int) into an RDD of type (Int, Seq[Int]). Users provide three functions:
   *
   * - `createCombiner`, which turns a V into a C (e.g., creates a one-element list)
   * - `mergeValue`, to merge a V into a C (e.g., adds it to the end of a list)
   * - `mergeCombiners`, to combine two C's into a single one.
   *
   * In addition, users can control the partitioning of the output RDD, and whether to perform
   * map-side aggregation (if a mapper can produce multiple items with the same key).
   */
  def combineByKey[C](createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C,
      partitioner: Partitioner,
      mapSideCombine: Boolean = true,
      serializerClass: String = null): RDD[(K, C)] = {
    if (getKeyClass().isArray) {
      if (mapSideCombine) {
        throw new SparkException("Cannot use map-side combining with array keys.")
      }
      if (partitioner.isInstanceOf[HashPartitioner]) {
        throw new SparkException("Default partitioner cannot partition array keys.")
      }
    }
    val aggregator = new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners)
    if (self.partitioner == Some(partitioner)) {
      self.mapPartitionsWithContext((context, iter) => {
        new InterruptibleIterator(context, aggregator.combineValuesByKey(iter))
      }, preservesPartitioning = true)
    } else if (mapSideCombine) {
      val combined = self.mapPartitions(aggregator.combineValuesByKey, preservesPartitioning = true)
      val partitioned = new ShuffledRDD[K, C, (K, C)](combined, partitioner)
        .setSerializer(serializerClass)
      partitioned.mapPartitionsWithContext((context, iter) => {
        new InterruptibleIterator(context, aggregator.combineCombinersByKey(iter))
      }, preservesPartitioning = true)
    } else {
      // Don't apply map-side combiner.
      // A sanity check to make sure mergeCombiners is not defined.
      assert(mergeCombiners == null)
      val values = new ShuffledRDD[K, V, (K, V)](self, partitioner).setSerializer(serializerClass)
      values.mapPartitionsWithContext((context, iter) => {
        new InterruptibleIterator(context, aggregator.combineValuesByKey(iter))
      }, preservesPartitioning = true)
    }
  }
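
For the reduceByKey call in the k-means job, createCombiner is the identity function and both mergeValue and mergeCombiners are the summing function, so partial (sum of vectors, count) pairs are first combined inside each map partition and then merged again after the shuffle. The following self-contained, plain-Scala sketch (no Spark, hypothetical toy data) traces that flow for two small partitions:

object CombineByKeySketch extends App {
  type V = (Array[Double], Int) // (partial sum of vectors, count), as in the k-means job

  def addVec(a: Array[Double], b: Array[Double]): Array[Double] =
    a.zip(b).map { case (x, y) => x + y }
  val merge: (V, V) => V = { case ((x1, n1), (x2, n2)) => (addVec(x1, x2), n1 + n2) }

  // Two "map partitions" of (clusterLabel, (vector, 1)) records.
  val part1 = Seq((0, (Array(1.0, 1.0), 1)), (0, (Array(3.0, 1.0), 1)))
  val part2 = Seq((0, (Array(2.0, 4.0), 1)), (1, (Array(9.0, 9.0), 1)))

  // Map-side combine: one partial combiner per key within each partition.
  def combineLocally(part: Seq[(Int, V)]): Map[Int, V] =
    part.groupBy(_._1).map { case (k, vs) => k -> vs.map(_._2).reduce(merge) }

  // Reduce side: merge the partial combiners fetched from all map tasks.
  val merged = (combineLocally(part1).toSeq ++ combineLocally(part2).toSeq)
    .groupBy(_._1).map { case (k, cs) => k -> cs.map(_._2).reduce(merge) }

  merged.foreach { case (k, (sum, n)) =>
    println(s"cluster $k -> new center ${sum.map(_ / n).mkString("[", ", ", "]")}")
  }
}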

  

