spark streaming監控HDFS檔案目錄

spark streaming監控HDFS檔案目錄

叢集環境:CDH5.8.0 / spark1.6.0 / scala2.10.4

基於Scala的基本使用方式如下:

package com.egridcloud.sparkstreaming

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.{ Durations, StreamingContext}
/**
  * Created by LHX on 2018/3/7 下午 8:06.
  * 監控資料夾,實現單詞統計,結果儲存到HDFS
  */
/**
  * Monitors an HDFS (or local) directory for .csv files, word-counts the
  * comma-separated tokens of each batch, and writes the per-batch results
  * back out as text files.
  *
  * Usage: SparkStreamingFile &lt;inputPath&gt; &lt;outputPath&gt;
  *   args(0) — directory to monitor for new .csv files
  *   args(1) — output path prefix (one directory is produced per batch)
  */
object SparkStreamingFile {
  def main(args: Array[String]): Unit = {
    // Fail fast with a clear usage message instead of an opaque
    // ArrayIndexOutOfBoundsException when paths are missing.
    require(args.length >= 2, "Usage: SparkStreamingFile <inputPath> <outputPath>")

    // Hadoop writable types flowing through the file stream; registered with Kryo below.
    val classes: Array[Class[_]] = Array[Class[_]](classOf[LongWritable], classOf[Text])
    val conf = new SparkConf().setAppName("sparkstreamingfile")//.setMaster("local[2]")
    // Remember files modified within the last 30 days (2592000s) so that files
    // already present in the directory are picked up on startup.
    conf.set("spark.streaming.fileStream.minRememberDuration", "2592000s")
    // BUG FIX: the correct config key is "spark.serializer" (the original used
    // "spark.serialize", which Spark silently ignores — Kryo was never enabled,
    // making the registerKryoClasses call below a no-op).
    conf.set("spark.serializer", classOf[KryoSerializer].getName())
    conf.registerKryoClasses(classes)

    // Batch interval: one micro-batch every 30 seconds.
    val streamingContext = new StreamingContext(conf, Durations.seconds(30))
    val inputPath = args(0)   // e.g. "C:/tmp/sparkstreamingfile"
    val outputPath = args(1)  // e.g. "C:/tmp/sparkstreamingfile_save/"
    val hadoopConf = new Configuration()

    // Accept only files ending in ".csv"; newFilesOnly = false so files already
    // in the directory (within minRememberDuration) are processed too.
    // The println is debug output on the driver for each candidate file.
    val fileStream: InputDStream[(LongWritable, Text)] =
      streamingContext.fileStream[LongWritable, Text, TextInputFormat](
        inputPath,
        (path: Path) => { println(path.getName); path.getName.endsWith(".csv") },
        false,
        hadoopConf)

    // Split every line on "," into individual words.
    val flatMap: DStream[String] = fileStream.flatMap(_._2.toString.split(","))
    // Pair each word with an initial count of 1.
    val mapToPair: DStream[(String, Int)] = flatMap.map((_, 1))
    // Sum the counts for identical words within each batch.
    val reducerByKey: DStream[(String, Int)] = mapToPair.reduceByKey(_ + _)

    // Driver-side debug print per batch. NOTE: collect() pulls the whole RDD to
    // the driver — acceptable only for small demo batches.
    reducerByKey.foreachRDD((rdd, time) => println(s"count time:${time},${rdd.collect().toList}"))
    // Persist each batch's result to HDFS (a "<outputPath>-<timestamp>" dir per batch).
    //  reducerByKey.saveAsTextFiles(outputPath, "suffix")
    reducerByKey.saveAsTextFiles(outputPath)

    // Jobs are triggered once per configured batch Duration after start().
    streamingContext.start()
    // Block the main thread until the streaming application terminates.
    streamingContext.awaitTermination()
  }
}

打包上傳叢集,指定輸入輸出路徑執行,往輸入目錄新增檔案即可。