Spark 执行层包括三个模块,master、worker、client。
master 负责管理 worker 进程,worker 负责任务的执行并将结果提交给 master,client 负责向 master 提交作业。其中,master 和 worker 是后台常驻进程。client 在作业运行过程中由 SparkContext 初始化的时候启动,然后,client 向 master 注册作业。master、worker、client 由事件驱动的 RPC 库来负责任务状态信息的交换,这个库由 akka 框架负责。
Spark 中 RDD 的 shuffle 操作涉及了中间数据的传输。在 map-reduce 过程中,每一个 mapper 为每一个 reducer 分配了一个 bucket 的数据结构用来缓存数据;reducer 通过两种方式获得数据,一种是使用 NIO 建立 socket 连接去 fetch 数据,这种方式是默认方式;一种是 OIO 通过 netty server 去 fetch 数据。与 Hadoop MapReduce 不同的是 Spark 在 Reduce 端没有强制的 merge-sort 操作,而是通过采用了 hashmap 数据结构建立了 key 和 value 之间的对应关系,通过消耗内存的方式减少了“不必要”的操作。
Spark kmeans example 分析
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35 |
// the entrance of spark which contains the operations of creating RDD, accumulation, broadcast, etc. val sc = new
SparkContext(args( 0 ), "SparkLocalKMeans" , System.getenv( "SPARK_HOME" ), Seq(System.getenv( "SPARK_EXAMPLES_JAR" ))) // load the data from input path, such as HDFS or local disks, it is a RDD val lines = sc.textFile(args( 1 )) // convert the string format of the data to the double, it is a RDD val data = lines.map(parseVector _).cache() // k clusters val K = args( 2 ).toInt // termination threshold val convergeDist = args( 3 ).toDouble // initialize the centers of k clusters var kPoints = data.takeSample( false , K, 42 ).toArray var tempDist = 1.0 while (tempDist > convergeDist) { // choose the closest center to label each vector, the output is structured of (label, (vector, 1)) var closest = data.map (p => (closestPoint(p, kPoints), (p, 1 ))) // calculate the sum of vectors in each center, the output is structured (label, (sum of vector, number of vectors)) var pointStats = closest.reduceByKey{ case
((x1, y1), (x2, y2)) => (x1 + x2, y1 + y2)} // calculate the average vector in each cluster, the output is structured (label, new centers defined by average vector) var newPoints = pointStats.map {pair => (pair._1, pair._2._1 / pair._2._2)}.collectAsMap() tempDist = 0.0 // calculate the delta between current and previous vectors for
(i <- 0
until K) { tempDist += kPoints(i).squaredDist(newPoints(i)) } for
(newP <- newPoints) { kPoints(newP._1) = newP._2 } println( "Finished iteration (delta = "
+ tempDist + ")" ) } |
将 K-means 应用类比于 Hadoop MR 编程模型,我们主要观察执行 reduceByKey 操作时,Spark 是如何在分布式环境中进行计算的。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71 |
/** * Merge the values for each key using an associative reduce function. This will also perform * the merging locally on each mapper before sending results to a reducer, similarly to a * "combiner" in MapReduce. Output will be hash-partitioned with the existing partitioner/ * parallelism level. */ def reduceByKey(func: (V, V) => V): RDD[(K, V)] = { reduceByKey(defaultPartitioner(self), func) } /** * Merge the values for each key using an associative reduce function. This will also perform * the merging locally on each mapper before sending results to a reducer, similarly to a * "combiner" in MapReduce. */ def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = { combineByKey[V]((v: V) => v, func, func, partitioner) } class
PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) ... ... /** * Generic function to combine the elements for each key using a custom set of aggregation * functions. Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined type" C * Note that V and C can be different -- for example, one might group an RDD of type * (Int, Int) into an RDD of type (Int, Seq[Int]). Users provide three functions: * * - `createCombiner`, which turns a V into a C (e.g., creates a one-element list) * - `mergeValue`, to merge a V into a C (e.g., adds it to the end of a list) * - `mergeCombiners`, to combine two C‘s into a single one. * * In addition, users can control the partitioning of the output RDD, and whether to perform * map-side aggregation (if a mapper can produce multiple items with the same key). */ def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner, mapSideCombine: Boolean = true , serializerClass: String = null ): RDD[(K, C)] = { if
(getKeyClass().isArray) { if
(mapSideCombine) { throw
new SparkException( "Cannot use map-side combining with array keys." ) } if
(partitioner.isInstanceOf[HashPartitioner]) { throw
new SparkException( "Default partitioner cannot partition array keys." ) } } val aggregator = new
Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners) if
(self.partitioner == Some(partitioner)) { self.mapPartitionsWithContext((context, iter) => { new
InterruptibleIterator(context, aggregator.combineValuesByKey(iter)) }, preservesPartitioning = true ) } else
if (mapSideCombine) { val combined = self.mapPartitions(aggregator.combineValuesByKey, preservesPartitioning = true ) val partitioned = new
ShuffledRDD[K, C, (K, C)](combined, partitioner) .setSerializer(serializerClass) partitioned.mapPartitionsWithContext((context, iter) => { new
InterruptibleIterator(context, aggregator.combineCombinersByKey(iter)) }, preservesPartitioning = true ) } else
{ // Don‘t apply map-side combiner. // A sanity check to make sure mergeCombiners is not defined. assert (mergeCombiners == null ) val values = new
ShuffledRDD[K, V, (K, V)](self, partitioner).setSerializer(serializerClass) values.mapPartitionsWithContext((context, iter) => { new
InterruptibleIterator(context, aggregator.combineValuesByKey(iter)) }, preservesPartitioning = true ) } } |
[1]. Spark源码解析 – Shuffle http://blog.csdn.net/mango_song/article/details/17933115
[2]. Spark源码分析 – Shuffle http://www.cnblogs.com/fxjwind/p/3522219.html
[3]. Spark reference http://jerryshao.me/tags.html#spark-ref
原文:http://www.cnblogs.com/lf1205/p/3575000.html