Spark示例之陣列排序
阿新 • • 發佈:2019-01-31
陣列排序是一個常見的操作。基於比較的排序演算法其效能下限是O(nlog(n)),但在分散式環境下面我們可以併發,從而提高效能。這裡展示了Spark中陣列排序的實現,並分析了效能,同時嘗試找到導致效能提升的原因。
官方示例
import sys from pyspark import SparkContext if __name__ == "__main__": if len(sys.argv) != 2: print >> sys.stderr, "Usage: sort <file>" exit(-1) sc = SparkContext(appName="PythonSort") lines = sc.textFile(sys.argv[1], 1) sortedCount = lines.flatMap(lambda x: x.split(' ')) \ .map(lambda x: (int(x), 1)) \ .sortByKey(lambda x: x) # This is just a demo on how to bring all the sorted data back to a single node. # In reality, we wouldn't want to collect all the data to the driver node. output = sortedCount.collect() for (num, unitcount) in output: print num sc.stop()
所有Spark應用的入口都是SparkContext的例項。它是整個Spark環境的一個抽象。考慮都分佈環境的複雜性,如果程式設計時還要考慮資料集的劃分以及哪臺機子上面計算,那麼Spark的可用性將大大降低。SparkContext的例項就是整個環境的入口,就想汽車的操作介面一樣,程式設計時只要呼叫相應的介面,傳入資料,它就會在分散式系統中分發和執行,並儘可能達到最大的效能。在程式的最後,還要呼叫其stop方法來斷開環境。
方法textFile讀入一個文字檔案,並在Spark環境裡建立相應的RDD集。這個資料集存放在lines變數中。方法flatMap和map不同,map返回的是一個key,value
>>> rdd = sc.parallelize([2, 3, 4])
>>> sorted(rdd.flatMap(lambda x: range(1, x)).collect())
[1, 1, 1, 2, 2, 3]
接著呼叫map方法把每個陣列都變成(key,value)的形式,最後使用sortByKey來進行排序。排好序後得RDD集是sortedCount,呼叫其
容易看出,這段程式實際上只是呼叫了Spark提供的排序介面sortByKey,而不是在程式碼中實現一個排序演算法。方法sortByKey的底層實現如下:
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.size)
: RDD[(K, V)] = {
val part = new RangePartitioner(numPartitions, self, ascending)
new ShuffledRDD[K, V, V](self, part)
.setKeyOrdering(if (ascending) ordering else ordering.reverse)
}
其思路就是將資料分成不想交的區間(區間的數目預設為RDD分割槽的個數),然後再單獨對每個區間使用排序演算法。這個實現十分優雅,只用了兩行程式碼。假設分割槽的個數是m,資料集的大小為n,可以預期其時間複雜度為O((n/m)log(n))。Spark的一個分割槽會執行一個task,最終效果跟並行執行是一樣的。
訪問本人部落格獲得更多資訊:magic01