Spark實戰----(1)使用Scala開發本地測試的Spark WordCount程式
阿新 • • 發佈:2018-12-25
第一步:JDk的安裝
第二步:Scala的安裝 不會的可以看這裡 Scala環境安裝
鑑於以上兩步較為簡單,不再詳細贅述
第三步:去Spark官方網站下載Spark包 我下載的檔名是spark-1.6.2-bin-hadoop2.6
點選DownLoad就可以下載了,下載完並解壓
第四步:IDE選擇
我用的是 intellij IDEA ,不過我學習的時候用的是Scala for Eclipse,用法嘛大同小異的,個人推薦IDEA
第五步:建立工程
在eclipse中點選File->New->Scala Project ,填上Project name 然後點選finish
第六步:
更改Scala Library container的版本
第七步:匯入Spark 的jar包
在專案上右鍵,找到Build Path -> Configure Build Path
彈出這個視窗後,點選 Add External JARs ,找到Spark 包的位置,點選lib檔案,找到spark-assembly-1.6.2-hadoop2.6.0.jar
點選ok可以看到專案裡面多了一個Referenced Libraries,點開發現下面就是我們剛剛新增的包
第八步:建立包和scala檔案
在src下右鍵 點選Package,新建一個包,我命名的是cn.limbo.spark,至此專案的結構如下所示
其中WordCount是我們需要編寫的檔案,(上面那個別管 = =)
第九步:編寫WordCount.scala,程式碼如下
之後就可以看到控制檯的列印結果了~package cn.limbo.spark import org.apache.spark.SparkConf import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD /** * 使用Scala開發本地測試的Spark WordCount程式 */ object WordCount { def main(args: Array[String]): Unit = { /** * 第一步:建立Spark的配置物件SparkConf,設定Spark程式的執行時的配置資訊 * 例如說通過setMaster來設定程式要連線的Spark叢集的Master的URL * 如果設定為local,則代表Spark程式在本地執行,特別適合於配置條件的較差的人 * */ val conf = new SparkConf() conf.setAppName("MyFirstSparkApplication") //設定應用程式的名稱,在程式執行的監控介面可以看到名稱 conf.setMaster("local") //此時程式在本地執行,無需安裝Spark的任何叢集 /** * 第二步:建立SparkContext物件 * SparkContext是Spark程式所有功能的唯一入口,無論是採用Scala,Java,Python等都必須有一個SparkContext * SparkContext核心作用:初始化Spark應用程式執行所需要的核心元件,包括DAGScheduler,TaskScheduler,Scheduler * 同時還會負責Spark程式往Master註冊程式等 * SparkContext是整個Spark應用程式中最為至關重要的一個物件。 */ val sc = new SparkContext(conf) //建立SparkContext物件,通過傳入SparkConf例項來定製Spark執行的具體引數和配置資訊 /** * 第三步:根據具體的資料來源(HDFS,HBase,Local FS(本地檔案系統) ,DB,S3(雲上)等)通過SparkContext來建立RDD * RDD的建立基本有三種方式,根據外部的資料來源(例如HDFS),根據Scala集合,由其他的RDD操作產生 * 資料會被RDD劃分成為一系列的Partitions,分配到每個Partition的資料屬於一個Task的處理範疇 */ //檔案的路徑,最小並行度(根據機器數量來決定) //val lines:RDD[String]= sc.textFile("F://spark//spark-1.6.2-bin-hadoop2.6//README.md", 1) //讀取本地檔案,並設定Partition = 1 val lines= sc.textFile("F://spark//spark-1.6.2-bin-hadoop2.6//README.md", 1) //讀取本地檔案,並設定Partition = 1 //型別推導得出lines為RDD /** * 第四步:對初始的RDD進行Transformation級別的處理,例如map,filter等高階函式等的程式設計,來進行具體的資料計算 * 4.1:將每一行的字串拆分成單個的單詞 * 4.2:在單詞拆分的基礎上對每個單詞的例項計數為1,也就是word =>(word,1) * 4.3:在每個單詞例項計數為1基礎之上統計每個單詞在檔案出現的總次數 */ //對每一行的字串進行單詞的拆分並把所有行的拆分結果通過flat合併成為一個大的單詞集合 val words = lines.flatMap { line => line.split(" ") } //words同樣是RDD型別 val pairs = words.map { word => (word,1) } val wordCounts = pairs.reduceByKey(_+_) //對相同的key,進行value的累加(包括Local和Reducer級別同時Reduce) wordCounts.foreach(wordNumberPair => println(wordNumberPair._1 + " : " + wordNumberPair._2)) sc.stop() //注意一定要將SparkContext的物件停止,因為SparkContext執行時會建立很多的物件 /*這個程式執行之後一定會有一個錯誤,因為 沒有hadoop環境,這個不是程式錯誤,也不影響任何功能*/ } }
至此,Spark本地的部署就結束了
下一章介紹叢集部署