Spark學習筆記:基於HDFS的實時計算WordCount
阿新 • • 發佈:2018-12-10
基於HDFS的實時計算WordCount
基於HDFS檔案的實時計算,其實就是監控一個HDFS目錄,只要有新檔案出現就實時處理 StreamingContext.fileStream(dataDirectory)方法可以從多種檔案系統的檔案中讀取資料,然後建立一個DStream
package StreamingDemo import org.apache.log4j.{Level, Logger} import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} /** * 基於HDFS的實時WordCount */ object HDFSWordCount { def main(args: Array[String]): Unit = { //設定日誌的級別 Logger.getLogger("org").setLevel(Level.WARN) val conf=new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[2]") val ssc=new StreamingContext(conf,Seconds(2)) //從HDFS相應的目錄中獲取資料,建立輸入DStream,監控input目錄 val inputDStream=ssc.textFileStream("hdfs://Hadoop01:9000/input") val wordCountDStram = inputDStream.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_) wordCountDStram.print() ssc.start() ssc.awaitTermination() } }
注意事項 1.所有放入HDFS目錄中的檔案,都必須有相同的格式 2.檔案一旦處理之後,檔案的內容即使改變,也不會再處理了 3.基於HDFS檔案的資料來源是沒有Receiver(自定義的receiver相當於Socket套接字的客戶端程式設計)的,因此不會佔用一個cpu core