
Spark Streaming File Monitoring and Data Reading

Code

package main.scala

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.slf4j.LoggerFactory
/**
  * Created by silentwolf on 2016/5/23.
  */
object FileRead {

  // `classOf[HdfsCount]` referenced a class that does not exist here;
  // use this object's own class for the logger instead.
  val log = LoggerFactory.getLogger(getClass)
  def main(args: Array[String]) {
    if (args.length < 1) {
      System.err.println("Usage: HdfsWordCount <directory>")
      System.exit(1)
    }

    val sparkConf = new SparkConf().setAppName("HdfsCount").setMaster("local[2]")
    // Create the context
    val ssc = new StreamingContext(sparkConf, Seconds(10))

    val lines = ssc.textFileStream(args(0))
    lines.map { line =>
      val json = line + "2222222222222"
      println("-----------------")
      println(json)
      println("-----------------")
      json // return the transformed line so print() shows it, not Unit
    }.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
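A point worth noting: `textFileStream` only picks up files that appear in the monitored directory *after* the stream has started, and it expects each file to show up complete. The usual pattern is to write the file somewhere else first and then move (rename) it into the directory atomically, so the stream never reads a half-written file. A minimal local sketch of that pattern (the paths here are made up for illustration; with HDFS you would use `hdfs dfs -put`/`-mv` instead):

```shell
# Hypothetical local stand-in for the monitored directory.
WATCH_DIR=$(mktemp -d)/sparkStreaming/data
mkdir -p "$WATCH_DIR"

# 1. Write the file OUTSIDE the watched directory first...
TMP_FILE=$(mktemp)
echo "hello spark streaming" > "$TMP_FILE"

# 2. ...then move it in atomically; only now should the stream see it.
mv "$TMP_FILE" "$WATCH_DIR/batch-001.txt"
cat "$WATCH_DIR/batch-001.txt"
```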

Launch

(Note: check how /spark/bin/./spark-submit is invoked)

/spark/bin/./spark-submit --class main.scala.FileRead SparkSteamStudy.jar /user/yuhui/sparkStreaming/data

How the data is processed

When a batch of data arrives, its lines are stored one by one in a list-like collection, so the data can be taken out line by line.
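Concretely, each batch interval produces one RDD of lines, and the per-line access described above can be made explicit with `foreachRDD`. A sketch, assuming the same `lines` DStream as in the code above (`collect()` is only safe here because the batches are small):

```scala
// Each batch arrives as an RDD[String], one element per input line.
lines.foreachRDD { rdd =>
  // Pull the batch back to the driver as a local list...
  val batch: List[String] = rdd.collect().toList
  // ...and read it out line by line.
  batch.foreach(line => println(s"line: $line"))
}
```

For large batches you would keep the work distributed (e.g. `rdd.foreach` or further transformations) instead of collecting to the driver.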