Spark Streaming file monitoring and data reading
阿新 · Published: 2018-12-30
Code
package main.scala

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.slf4j.LoggerFactory

/**
 * Created by silentwolf on 2016/5/23.
 */
object FileRead {

  // The original referenced classOf[HdfsCount], which is not defined in this
  // file; logging against this object's own class compiles and works the same.
  val log = LoggerFactory.getLogger(getClass)

  def main(args: Array[String]): Unit = {
    if (args.length < 1) {
      System.err.println("Usage: HdfsWordCount <directory>")
      System.exit(1)
    }

    val sparkConf = new SparkConf().setAppName("HdfsCount").setMaster("local[2]")
    // Create the streaming context with a 10-second batch interval
    val ssc = new StreamingContext(sparkConf, Seconds(10))

    // Monitor the given directory; new files are read as a stream of lines
    val lines = ssc.textFileStream(args(0))

    lines.map { line =>
      // Append a visible marker to each line so the transformation is obvious
      val json = line + "2222222222222"
      println("-----------------")
      println(json)
      println("-----------------")
      json // return the transformed line so print() has something to show
    }.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
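One point worth noting (from the Spark Streaming documentation, not the original post): textFileStream only picks up files that appear in the monitored directory after the job starts, and files should be moved or renamed into the directory atomically; a file written in place may be read while still incomplete.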
Launch
(Note: pay attention to how /spark/bin/./spark-submit is invoked.)
/spark/bin/./spark-submit --class main.scala.FileRead SparkSteamStudy.jar /user/yuhui/sparkStreaming/data
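To produce a batch once the job is running, drop a new file into the monitored directory. The source path and file name below are hypothetical; moving a file within HDFS makes it appear atomically, which is what textFileStream expects:

hdfs dfs -mv /user/yuhui/tmp/words.txt /user/yuhui/sparkStreaming/data/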
Execution process and data
When a batch of data arrives, the lines are stored one by one into a list-like collection, so the data can be taken out one line at a time, as the sketch below shows.
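This behavior can be made concrete with foreachRDD: each batch arrives as an RDD of lines, and collect() materializes it on the driver as a collection that can be consumed one line at a time. The following is a minimal sketch, not the original post's code; the object name BatchToList is illustrative, and collect() is only appropriate for small test batches, since it pulls the whole batch to the driver:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object BatchToList {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("BatchToList").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(10))

    // Same monitored-directory source as in the post
    val lines = ssc.textFileStream(args(0))

    lines.foreachRDD { rdd =>
      // collect() brings the batch to the driver as one element per line;
      // fine for small demo data, not for production-sized batches
      val batch: List[String] = rdd.collect().toList
      batch.foreach(line => println(line)) // take the lines out one by one
    }

    ssc.start()
    ssc.awaitTermination()
  }
}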