
Spark Streaming Basic Input Streams

File Streams

Create two new log files, log1.txt and log2.txt, in the spark/mycode/streaming/logfile directory and type some arbitrary content into them. For example, put the following into log1.txt:

I love Hadoop
I love Spark
Spark is fast
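
If the logfile directory does not exist yet, it can be created first. A minimal command sketch, assuming the /usr/local/spark/mycode paths used throughout this article:

$ mkdir -p /usr/local/spark/mycode/streaming/logfile
$ cd /usr/local/spark/mycode/streaming/logfile
$ vim log1.txt
$ vim log2.txt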

Enter spark-shell to create the file stream. Open a separate terminal window and start spark-shell:

import org.apache.spark.streaming._
val ssc = new StreamingContext(sc, Seconds(20))
val lines = ssc.textFileStream("file:///usr/local/spark/mycode/streaming/logfile")
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()
ssc.awaitTermination()

For the program above running in spark-shell, once you type ssc.start(), it automatically enters a looping listening state, and a large amount of log output appears on the screen.
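
If the log output is too noisy, you can optionally lower the log level in spark-shell before creating the StreamingContext; this is just a convenience and not required by the example:

sc.setLogLevel("WARN")   // reduce INFO noise so the word-count output is easier to see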

Create another new file, log3.txt, in the "/usr/local/spark/mycode/streaming/logfile" directory, and the word-count results will be displayed in the listening window.
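
For example, the new file can be created from another terminal window (an illustrative sketch; any content works, and the same approach applies to the log5.txt step later on):

$ cd /usr/local/spark/mycode/streaming/logfile
$ echo "I love Spark Streaming" > log3.txt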

Next, the same folder-monitoring functionality is implemented as a standalone application.

$ cd /usr/local/spark/mycode
$ mkdir streaming
$ cd streaming
$ mkdir -p src/main/scala
$ cd src/main/scala
$ vim TestStreaming.scala

Use the vim editor to create a new code file TestStreaming.scala with the following code:

import org.apache.spark._
import org.apache.spark.streaming._

object WordCountStreaming {
    def main(args: Array[String]) {
        // Run in local mode with 2 threads: one to listen, the other to process the data
        val sparkConf = new SparkConf().setAppName("WordCountStreaming").setMaster("local[2]")
        val ssc = new StreamingContext(sparkConf, Seconds(2))  // batch interval of 2 seconds
        // A local file path is used here; an HDFS path would also work
        val lines = ssc.textFileStream("file:///usr/local/spark/mycode/streaming/logfile")
        val words = lines.flatMap(_.split(" "))
        val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
        wordCounts.print()
        ssc.start()
        ssc.awaitTermination()
    }
}

Then, in the streaming directory:

$ vim simple.sbt

Enter the following into simple.sbt:

name:="Simple Project"
version:="1.0"
scalaVersion:="2.11.8"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.11" % "2.3.0"
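
Before packaging, you can optionally confirm from the streaming directory that the source layout matches what sbt expects (just a sanity check):

$ cd /usr/local/spark/mycode/streaming
$ find .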

Run the sbt packaging command:

$ /usr/local/sbt/sbt package

After packaging succeeds, start the program with the following commands:

$ cd /usr/local/spark/mycode/streaming
$ /usr/local/spark/bin/spark-submit --class "WordCountStreaming" /usr/local/spark/mycode/streaming/target/scala-2.11/simple-project_2.11-1.0.jar

After the command above runs, the program enters the listening state (we will call the window running this listening program the "listening window").
Switch to another shell window, create a new file log5.txt in the "/usr/local/spark/mycode/streaming/logfile" directory, type some arbitrary words into it, then save the file and quit vim.
Switch back to the "listening window", wait a few seconds (the batch interval here is 2 seconds), then press Ctrl+C or Ctrl+D to stop the listening program; the word-count statistics will have been printed on the listening window's screen.

Socket Streams

Spark Streaming can listen on a socket port, receive data from it, and then process it accordingly.

$ cd /usr/local/spark/mycode
$ mkdir streaming
$ cd streaming
$ mkdir -p src/main/scala
$ cd src/main/scala
$ vim NetworkWordCount.scala

The contents of NetworkWordCount.scala are as follows:

package org.apache.spark.examples.streaming
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.storage.StorageLevel
object NetworkWordCount {
    def main(args:Array[String]) {
        if(args.length < 2)
        { 
            System.err.println("Usage:NetworkWordCount<hostname><port>")
            System.exit(1)
        }
        StreamingExamples.setStreamingLogLevels()
        val sparkConf = new SparkConf().setAppName("NetworkWordCount").setMaster("local[2]")
        val ssc = new StreamingContext(sparkConf, Seconds(1))
        val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
        val words = lines.flatMap(_.split(" "))
        val wordCounts = words.map(x=>(x, 1)).reduceByKey(_+_)
        wordCounts.print()
        ssc.start()
        ssc.awaitTermination()
    }
}

In the same directory, create another file, StreamingExamples.scala, with the following contents:

package org.apache.spark.examples.streaming
import org.apache.spark.internal.Logging
import org.apache.log4j.{Level, Logger}
/** Utility functions for Spark Streaming examples. */ 
object StreamingExamples extends Logging {
    /** Set reasonable logging levels for streaming if the user has not configured log4j. */
    def setStreamingLogLevels() {
        val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements
        if(!log4jInitialized){
            // We first log something to initialize Spark's default logging, then we override the logging level
            logInfo("Setting log level to [WARN] for streaming example."+" To override add a custom log4j.properties to the classpath.")
            Logger.getRootLogger.setLevel(Level.WARN)       
        }
    }
}

Then, in the streaming directory:

$ vim simple.sbt

Enter the following into simple.sbt:

name:="Simple Project"
version:="1.0"
scalaVersion:="2.11.8"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.11" % "2.3.0"

Run the sbt packaging command:

$ /usr/local/sbt/sbt package

After packaging succeeds, start the program with the following commands:

$ cd /usr/local/spark/mycode/streaming
$ /usr/local/spark/bin/spark-submit --class "org.apache.spark.examples.streaming.NetworkWordCount" /usr/local/spark/mycode/streaming/target/scala-2.11/simple-project_2.11-1.0.jar localhost 9999

Open a new window to serve as the nc window and start the nc program:

$ nc -lk 9999

You can type arbitrary words into the nc window; the listening window automatically receives the word data stream and prints word-count statistics every second.

Next, let's change how the data source is generated: instead of using the nc program, we write our own program to produce the socket data source.

$ cd /usr/local/spark/mycode/streaming/src/main/scala
$ vim DataSourceSocket.scala

The contents of DataSourceSocket.scala are as follows:
package org.apache.spark.examples.streaming
import java.io.PrintWriter
import java.net.ServerSocket
import scala.io.Source
object DataSourceSocket {
    def index(length: Int) = {
        val rdm = new java.util.Random
        rdm.nextInt(length)
    }
    def main(args:Array[String]){
        if(args.length != 3){
            System.err.println("Usage: <filename> <port> <milliseconds>")
            System.exit(1)
        }
        val fileName = args(0)
        val lines = Source.fromFile(fileName).getLines.toList
        val rowCount = lines.length
        val listener = new ServerSocket(args(1).toInt)
        while(true){
            val socket = listener.accept()
            new Thread(){
                override def run = {
                    println("Got client connected from:"+socket.getInetAddress)
                    val out = new PrintWriter(socket.getOutputStream(), true)
                    while(true){
                        Thread.sleep(args(2).toLong)
                        val content = lines(index(rowCount))
                        println(content)
                        out.write(content+"\n")
                        out.flush()
                    }
                    socket.close()
                }
            }.start()       
        }

    }
}

Run sbt to package and compile:

$ cd /usr/local/spark/mycode/streaming
$ /usr/local/sbt/sbt package

The DataSourceSocket program takes a text file as an input argument, so before starting it you need to create a text file word.txt and type a few lines of arbitrary content into it.
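
For example (an illustrative sketch; any few lines of words will do):

$ cd /usr/local/spark/mycode/streaming
$ printf "hello spark\nhello streaming\nhello world\n" > word.txt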

Start the DataSourceSocket program:

$ cd /usr/local/spark/mycode/streaming
$ /usr/local/spark/bin/spark-submit --class "org.apache.spark.examples.streaming.DataSourceSocket" /usr/local/spark/mycode/streaming/target/scala-2.11/simple-project_2.11-1.0.jar /usr/local/spark/mycode/streaming/word.txt 9999 1000

This window keeps printing randomly selected lines of text; these lines form the socket data source and will be captured by the listening program.
Start the listening program in another window:

$ /usr/local/spark/bin/spark-submit --class "org.apache.spark.examples.streaming.NetworkWordCount" /usr/local/spark/mycode/streaming/target/scala-2.11/simple-project_2.11-1.0.jar localhost 9999

After it starts successfully, you will see word-count statistics continuously printed on the screen.

RDD Queue Streams

When debugging a Spark Streaming application, we can use streamingContext.queueStream(queueOfRDDs) to create a DStream based on a queue of RDDs.

Next, create a new file TestRDDQueueStream.scala. The required functionality is: create a new RDD every second, while Spark Streaming processes the data every 2 seconds.

package org.apache.spark.examples.streaming
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext._
import org.apache.spark.streaming.{Seconds, StreamingContext}

object QueueStream {
    def main(args:Array[String])
    {
        val sparkConf = new SparkConf().setAppName("TestRDDQueue").setMaster("local[2]")
        val ssc = new StreamingContext(sparkConf, Seconds(2))
        val rddQueue = new scala.collection.mutable.SynchronizedQueue[RDD[Int]]()
        val queueStream = ssc.queueStream(rddQueue)
        // Map each element to a (value % 10, 1) pair so reduceByKey can count occurrences per remainder
        val mappedStream = queueStream.map(r => (r % 10, 1))
        val reducedStream = mappedStream.reduceByKey(_+_)
        reducedStream.print()
        ssc.start()
        for(i <- 1 to 10)
        {
            rddQueue += ssc.sparkContext.makeRDD(1 to 100, 2)
            Thread.sleep(1000)
        }
        ssc.stop()
    }
}

After the sbt packaging succeeds, run the program with the following commands:

$ cd /usr/local/spark/mycode/streaming
$ /usr/local/spark/bin/spark-submit --class "org.apache.spark.examples.streaming.QueueStream" /usr/local/spark/mycode/streaming/target/scala-2.11/simple-project_2.11-1.0.jar

After running the command above, the program starts executing.

Custom Receivers

Spark Streaming can receive streaming data of any type, not only from the built-in sources such as Flume, Kafka, Kinesis, files, and sockets. To support such custom data, the developer needs to write a custom program to receive the corresponding data source.

To define a custom receiver class, you normally extend an existing base class; here that base class is Receiver. This abstract base class has two methods that must be overridden:

  • onStart(): called when the receiver starts running; inside this method you need to start a thread that receives the data.
  • onStop(): called when the receiver stops running; inside this method you need to make sure that data reception is stopped.

Of course, reception may also have to be terminated while data is still arriving; in that case, the receiving logic started in onStart() can call isStopped() to decide whether it should stop receiving data.

The following is a custom receiver that receives a text stream over a socket. It splits the stream into "\n"-delimited lines and stores them in Spark. If the receiving thread encounters any connection or reception error, the receiver restarts.

import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket
import java.nio.charset.StandardCharsets

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class MyReceiver(host: String, port: Int) extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {
  override def onStart() {
    // Start the thread that receives data over a connection
    new Thread("Socket Receiver") {
      override def run() { receive() }
    }.start()
  }

  override def onStop() {
    // There is nothing much to do, as the thread calling receive()
    // is designed to stop by itself once isStopped() returns true
  }
  /** Create a socket connection and receive data until receiver is stopped */
  private def receive() {
    var socket: Socket = null
    var userInput: String = null
    try {
      // Connect to host:port
      socket = new Socket(host, port)
      // Until stopped or connection broken continue reading
      val reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8))
      userInput = reader.readLine()
      while(!isStopped && userInput != null) {
        store(userInput)
        userInput = reader.readLine()
      }
      reader.close()
      socket.close()
      // Restart in an attempt to connect again when server is active again
      restart("Trying to connect again")
    } catch {
      case e: java.net.ConnectException =>
        // restart if could not connect to server
        restart("Error connecting to " + host + ":" + port, e)
      case t: Throwable =>
        // restart if there is any other error
        restart("Error receiving data", t)
    }
  }
}

Using the custom receiver:

val stream = ssc.receiverStream(new MyReceiver("218.193.154.155",9999))
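
As a fuller illustration, here is a minimal sketch of plugging the custom receiver into a streaming word count, in the same way as the socket examples above. The object name CustomReceiverWordCount, the host localhost, and the port 9999 are placeholders, and MyReceiver is assumed to be defined in the same file or package:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object CustomReceiverWordCount {
    def main(args: Array[String]) {
        val sparkConf = new SparkConf().setAppName("CustomReceiverWordCount").setMaster("local[2]")
        val ssc = new StreamingContext(sparkConf, Seconds(2))
        // Create an input DStream backed by the custom receiver defined above
        val lines = ssc.receiverStream(new MyReceiver("localhost", 9999))
        val wordCounts = lines.flatMap(_.split(" ")).map(x => (x, 1)).reduceByKey(_ + _)
        wordCounts.print()
        ssc.start()
        ssc.awaitTermination()
    }
}

A socket data source such as the nc program or the DataSourceSocket program above can then feed text to this receiver.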