1. 程式人生 > >Spark學習筆記:基於HDFS的實時計算WordCount

Spark學習筆記:基於HDFS的實時計算WordCount

基於HDFS的實時計算WordCount

基於HDFS檔案的實時計算,其實就是監控一個HDFS目錄,只要有新檔案出現就實時處理 StreamingContext.fileStream(dataDirectory)方法可以從多種檔案系統的檔案中讀取資料,然後建立一個DStream

package StreamingDemo

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * 基於HDFS的實時WordCount
  */
object HDFSWordCount {
  def main(args: Array[String]): Unit = {
    //設定日誌的級別
    Logger.getLogger("org").setLevel(Level.WARN)
    val conf=new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[2]")
    val ssc=new StreamingContext(conf,Seconds(2))

    //從HDFS相應的目錄中獲取資料,建立輸入DStream,監控input目錄
    val inputDStream=ssc.textFileStream("hdfs://Hadoop01:9000/input")
    val wordCountDStram = inputDStream.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)

    wordCountDStram.print()

    ssc.start()
    ssc.awaitTermination()

  }
}

注意事項 1.所有放入HDFS目錄中的檔案,都必須有相同的格式 2.檔案一旦處理之後,檔案的內容即使改變,也不會再處理了 3.基於HDFS檔案的資料來源是沒有Receiver(自定義的receiver相當於Socket套接字的客戶端程式設計)的,因此不會佔用一個cpu core