1. 程式人生 > >Spark示例之陣列排序

Spark示例之陣列排序

陣列排序是一個常見的操作。基於比較的排序演算法其效能下限是O(nlog(n)),但在分散式環境下面我們可以併發,從而提高效能。這裡展示了Spark中陣列排序的實現,並分析了效能,同時嘗試找到導致效能提升的原因。

官方示例

import sys
 
from pyspark import SparkContext
 
if __name__ == "__main__":
    if len(sys.argv) != 2:
        print >> sys.stderr, "Usage: sort <file>"
        exit(-1)
    sc = SparkContext(appName="PythonSort")
    lines = sc.textFile(sys.argv[1], 1)
    sortedCount = lines.flatMap(lambda x: x.split(' ')) \
        .map(lambda x: (int(x), 1)) \
        .sortByKey(lambda x: x)
    # This is just a demo on how to bring all the sorted data back to a single node.
    # In reality, we wouldn't want to collect all the data to the driver node.
    output = sortedCount.collect()
    for (num, unitcount) in output:
        print num
 
    sc.stop()

所有Spark應用的入口都是SparkContext的例項。它是整個Spark環境的一個抽象。考慮都分佈環境的複雜性,如果程式設計時還要考慮資料集的劃分以及哪臺機子上面計算,那麼Spark的可用性將大大降低。SparkContext的例項就是整個環境的入口,就想汽車的操作介面一樣,程式設計時只要呼叫相應的介面,傳入資料,它就會在分散式系統中分發和執行,並儘可能達到最大的效能。在程式的最後,還要呼叫其stop方法來斷開環境。

方法textFile讀入一個文字檔案,並在Spark環境裡建立相應的RDD集。這個資料集存放在lines變數中。方法flatMapmap不同,map返回的是一個key,value

的對,得到的RDD集和雜湊表有點像。而flatMap的輸出結果是陣列。這個陣列是對每個元素呼叫傳入的lambda函式後得到的結果的並。這意味著傳入的lambda函式可以返回0個或多個結果,比如:

>>> rdd = sc.parallelize([2, 3, 4])
>>> sorted(rdd.flatMap(lambda x: range(1, x)).collect())
[1, 1, 1, 2, 2, 3]

接著呼叫map方法把每個陣列都變成(key,value)的形式,最後使用sortByKey來進行排序。排好序後得RDD集是sortedCount,呼叫其
collect方法即可返回集裡面的資料。

容易看出,這段程式實際上只是呼叫了Spark提供的排序介面sortByKey,而不是在程式碼中實現一個排序演算法。方法sortByKey的底層實現如下:

def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.size)      
	: RDD[(K, V)] =  {    
	val part = new RangePartitioner(numPartitions, self, ascending)    
	new ShuffledRDD[K, V, V](self, part)      
		.setKeyOrdering(if (ascending) ordering else ordering.reverse)  
}

其思路就是將資料分成不想交的區間(區間的數目預設為RDD分割槽的個數),然後再單獨對每個區間使用排序演算法。這個實現十分優雅,只用了兩行程式碼。假設分割槽的個數是m,資料集的大小為n,可以預期其時間複雜度為O((n/m)log(n))Spark的一個分割槽會執行一個task,最終效果跟並行執行是一樣的。

訪問本人部落格獲得更多資訊:magic01