大話Spark(3)-一圖深入理解WordCount程式在Spark中的執行過程
阿新 • • 發佈:2019-05-24
本文以WordCount為例, 畫圖說明spark程式的執行過程
WordCount就是統計一段資料中每個單詞出現的次數,
例如hello spark hello you
這段文字中hello
出現2次, spark
出現1次, you
出現1次.
先上完整程式碼:
object WordCount { def main(args: Array[String]) { val conf = new SparkConf().setAppName("WordCount"); val sc = new SparkContext(conf) val lines = sc.textFile("hdfs://xxx:9000/spark.txt", 3); val words = lines.flatMap { line => line.split("\s+") } val pairs = words.map { word => (word, 1) } val wordCounts = pairs.reduceByKey { _ + _ } wordCounts.foreach(wordCount => println(wordCount._1 + " appeared " + wordCount._2 + " times.")) } }
上面幾行程式碼就把hdfs上的spark.txt
中每個單詞出現的個數計算完成.
Spark叢集的執行單位是Application,任何提交的任務都會產生一個Application。一個Application只會關聯上一個Spark上下文,也就是SparkContext。構建SparkContext時可以傳入Spark相關配置,也就是SparkConf,它可以用來指定Application的名稱,任務需要的CPU核數/記憶體大小,調優需要的配置等等. 以下兩行建立了SparkContext:
val conf = new SparkConf().setAppName("WordCount"); val sc = new SparkContext(conf)
建立完SparkContext之後, spark.txt的檔案數如何被spark處理的呢,讓我們一起看一下:
首先我們假設spark.txt在hdfs上對應著3個檔案,檔案內容都一樣,sc.textFile("hdfs://xxx:9000/spark.txt", 3)
也執行了最小分割槽數為3.
然後wordcount執行過程如下:

說明:
- 綠,紅,黃色箭頭的地方發生了`Shuffer,把整個任務分成了2個Stage(2個藍色虛線框)
- 紅色虛線框代表一個Partition窄依賴(每個分割槽只被子RDD的一個分割槽所使用)的執行過程, 多個partition是並行執行的
- reduceByKey會先把每個Partition中的資料預聚合(groupByKey不會)
- Stage中的資料都是在記憶體中,不像MapReduce會頻繁寫磁碟,速度很快.
- 補充:其實
textFile,flatMap,map,reduceByKey
等transformation操作都是lazy的,程式執行到這裡不會立即執行,只有再觸發action操作的時候才會執行,此例中為wordCounts.foreach
這個action操作.
&n