Spark實現WordCount
首先,編寫第一個Spark應用程式 ,我們是如何建立起來的,其入口在哪裡呢,需要建立兩個物件。
一:val conf = new SparkConf()
.setAppName("WordCount")
.setMaster("local")
建立SparkConf物件,設定Spark應用的配置資訊。setAppName() 設定Spark應用程式在執行中的名字;如果是叢集執行,就可以在監控頁面直觀看到我們執行的job任務。setMaster() 設定執行模式、是本地執行,設定為local即可;如果是叢集執行,就可以設定程式要連線的Spark叢集的master節點的url。
二:val sc = new SparkContext(conf)
建立SparkContext物件, 在Spark中,SparkContext是Spark所有功能的一個入口,你無論是用java、scala,甚至是python編寫,都必須要有一個SparkContext,它的主要作用,包括初始化Spark應用程式所需的一些核心元件,包括排程器(DAGSchedule、TaskScheduler),還會去Spark Master節點上進行註冊等。所以SparkContext在Spark應用中是很重要的一個物件。
現在假設我們HDFS上有一個數據檔案data.txt檔案,需要對其進行WordCount統計計算
第A步:val lines = sc.textFile("hdfs://") ,主要功能是載入HDFS中的資料檔案進入Spark本地或是叢集計算,這裡我們使用的是SparkContext的textFile運算元,載入後的資料將以每行記錄組成元素,元素型別為String。
第B步:val words = lines.flatMap { line => line.split(" ") } ,主要是對每一行進行操作。這裡使用transformation中的flatMap運算元,作用是可以將一個map資料集轉變成為flap資料集,即資料扁平化處理。這裡也就是將輸入檔案的每一行資料,按空格(" ")進行拆分,得到單詞陣列,再將陣列進行扁平化後形成單詞字串,在flatMapRDD中.
第C步:val pairs = words.map { word => (word, 1) },主要是將第二步的單詞陣列flatMapRDD中的資料進行標記,即每個行的格式由單個單詞轉變成的形式。
第D步:val wordCounts = pairs.reduceByKey { _ + _ },主要是將第三步產生的pairs元素的不同RDD中相同key值拉到一起進行value的歸併操作。
第E步:wordCounts.foreach(wordCount => println("")),對結果資料進行action操作遍歷輸出到客戶端控制檯。