Spark Streaming: monitoring an HDFS file directory
Cluster environment: CDH 5.8.0 / Spark 1.6.0 / Scala 2.10.4
The basic usage in Scala is as follows:
package com.egridcloud.sparkstreaming

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.{Durations, StreamingContext}

/**
 * Created by LHX on 2018/3/7 8:06 PM.
 * Monitor a directory, run a word count, and save the results to HDFS.
 */
object SparkStreamingFile {
  def main(args: Array[String]): Unit = {
    val classes: Array[Class[_]] = Array[Class[_]](classOf[LongWritable], classOf[Text])
    val conf = new SparkConf().setAppName("sparkstreamingfile") //.setMaster("local[2]")
    // Remember files modified within the last 30 days, so files already in the
    // directory are not silently skipped
    conf.set("spark.streaming.fileStream.minRememberDuration", "2592000s")
    // Use Kryo serialization and register the Writable classes with it
    conf.set("spark.serializer", classOf[KryoSerializer].getName())
    conf.registerKryoClasses(classes)
    // Set the batch interval
    val streamingContext = new StreamingContext(conf, Durations.seconds(30))
    // val inputPath = "C:/tmp/sparkstreamingfile"
    val inputPath = args(0)
    // val outputPath = "C:/tmp/sparkstreamingfile_save/"
    val outputPath = args(1)
    val hadoopConf = new Configuration()
    // Only accept *.csv files; newFilesOnly = false also picks up existing files
    val fileStream: InputDStream[(LongWritable, Text)] =
      streamingContext.fileStream[LongWritable, Text, TextInputFormat](
        inputPath,
        (path: Path) => { println(path.getName); path.getName.endsWith(".csv") },
        false,
        hadoopConf)
    // Split each line on ","
    val flatMap: DStream[String] = fileStream.flatMap(_._2.toString.split(","))
    // Mark each word with a count of 1
    val mapToPair: DStream[(String, Int)] = flatMap.map((_, 1))
    // Add up the counts of identical words
    val reducerByKey: DStream[(String, Int)] = mapToPair.reduceByKey(_ + _)
    reducerByKey.foreachRDD((a, b) => println(s"count time:${b},${a.collect().toList}"))
    // Write the results to HDFS
    // reducerByKey.saveAsTextFiles(outputPath, "suffix")
    reducerByKey.saveAsTextFiles(outputPath)
    // Whether a job is triggered depends on the configured batch Duration
    streamingContext.start()
    // Wait for the application to terminate
    streamingContext.awaitTermination()
  }
}
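The fileStream call above is the general form: it lets you pick the InputFormat, supply a Path filter (here, only files ending in ".csv"), and decide whether pre-existing files are processed. When no filter is needed, StreamingContext also offers the simpler textFileStream, which wraps fileStream[LongWritable, Text, TextInputFormat] and yields the file contents line by line. A minimal sketch of the same word count with it (the object name and the use of args(0) as the input path are illustrative, not from the original post):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Durations, StreamingContext}

object SparkStreamingFileSimple {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("sparkstreamingfile-simple")
    val ssc = new StreamingContext(conf, Durations.seconds(30))
    // textFileStream monitors the directory but provides no filename filter,
    // so it picks up every new file, not just *.csv
    val lines = ssc.textFileStream(args(0))
    lines.flatMap(_.split(","))   // split each line on ","
      .map((_, 1))                // mark each word with a count of 1
      .reduceByKey(_ + _)         // add up the counts of identical words
      .print()                    // print a sample of each batch to the driver log
    ssc.start()
    ssc.awaitTermination()
  }
}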
Package the application as a jar, upload it to the cluster, run it with the input and output paths as arguments, and then add files to the input directory to trigger processing. For example:
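A typical submission might look like the following; the jar name, paths, and spark-submit options are placeholders that depend on your cluster setup, not values from the original post:

spark-submit \
  --class com.egridcloud.sparkstreaming.SparkStreamingFile \
  --master yarn \
  sparkstreamingfile.jar \
  hdfs:///tmp/sparkstreamingfile \
  hdfs:///tmp/sparkstreamingfile_save/

# Add a CSV file to the monitored directory to trigger a batch:
hdfs dfs -put words.csv /tmp/sparkstreamingfile/

For larger files, it is safer to upload to a staging directory first and then "hdfs dfs -mv" the file into the monitored directory, so the stream never sees a partially written file.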