数组排序是一个常见的操作。基于比较的排序算法其性能下限是O(nlog(n)),但在分布式环境下面我们可以并发,从而提高性能。这里展示了Spark中数组排序的实现,并分析了性能,同时尝试找到导致性能提升的原因。
import sys
from pyspark import SparkContext
if __name__ == "__main__":
if len(sys.argv) != 2:
print >> sys.stderr, "Usage: sort <file>"
exit(-1)
sc = SparkContext(appName="PythonSort")
lines = sc.textFile(sys.argv[1], 1)
sortedCount = lines.flatMap(lambda x: x.split(' ')) .map(lambda x: (int(x), 1)) .sortByKey(lambda x: x)
# This is just a demo on how to bring all the sorted data back to a single node.
# In reality, we wouldn't want to collect all the data to the driver node.
output = sortedCount.collect()
for (num, unitcount) in output:
print num
sc.stop()方法textFile读入一个文本文件,并在Spark环境里创建相应的RDD集。这个数据集存放在lines变量中。方法flatMap和map不同,map返回的是一个key,value的对,得到的RDD集和哈希表有点像。而flatMap的输出结果是数组。这个数组是对每个元素调用传入的lambda函数后得到的结果的并。这意味着传入的lambda函数可以返回0个或多个结果,比如:
>>> rdd = sc.parallelize([2, 3, 4]) >>> sorted(rdd.flatMap(lambda x: range(1, x)).collect()) [1, 1, 1, 2, 2, 3]
容易看出,这段程序实际上只是调用了Spark提供的排序接口sortByKey,而不是在代码中实现一个排序算法。方法sortByKey的底层实现如下:
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.size)
: RDD[(K, V)] = {
val part = new RangePartitioner(numPartitions, self, ascending)
new ShuffledRDD[K, V, V](self, part)
.setKeyOrdering(if (ascending) ordering else ordering.reverse)
}访问本人博客获得更多信息:magic01
原文:http://blog.csdn.net/alburthoffman/article/details/43957795