Spark Streaming Basic Input Streams
File Streams
Under the spark/mycode/streaming/logfile directory, create two log files, log1.txt and log2.txt, and type in some arbitrary content. For example, enter the following in log1.txt:
I love Hadoop
I love Spark
Spark is fast
Open another terminal window, start spark-shell, and create the file stream inside it:
import org.apache.spark.streaming._
val ssc = new StreamingContext(sc, Seconds(20))
val lines = ssc.textFileStream("file:///usr/local/spark/mycode/streaming/logfile")
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x=>(x, 1)).reduceByKey(_+_)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
In the spark-shell session above, once you type ssc.start(), the program automatically enters a looping listening state, and a stream of log messages appears on the screen.
If you now create a new file log3.txt under the "/usr/local/spark/mycode/streaming/logfile" directory, the word-count results appear in the listening window.
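The flatMap / map / reduceByKey chain above can be sketched with plain Scala collections (no Spark required) to see what each batch computes; the sample lines are the ones from log1.txt:

```scala
// Plain-collections sketch of the flatMap -> map -> reduceByKey word count.
// groupBy + sum plays the role Spark's reduceByKey plays on a DStream.
val lines = List("I love Hadoop", "I love Spark", "Spark is fast")
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1))
  .groupBy(_._1)
  .map { case (w, pairs) => (w, pairs.map(_._2).sum) }
// e.g. wordCounts("Spark") == 2 and wordCounts("love") == 2
```

Spark applies the same logic per batch interval, so every 20-second batch prints the counts for files that arrived in that interval only.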
Next, we implement the same directory-monitoring functionality as a standalone application:
$ cd /usr/local/spark/mycode
$ mkdir streaming
$ cd streaming
$ mkdir -p src/main/scala
$ cd src/main/scala
$ vim TestStreaming.scala
Use the vim editor to create a new code file TestStreaming.scala with the following content:
import org.apache.spark._
import org.apache.spark.streaming._
object WordCountStreaming {
def main(args: Array[String])
{
//Run in local mode with 2 threads: one listens, the other processes data
val sparkConf = new SparkConf().setAppName("WordCountStreaming").setMaster("local[2]")
val ssc = new StreamingContext(sparkConf, Seconds(2)) //batch interval of 2 seconds
//A local file is used here; an HDFS file works as well
val lines = ssc.textFileStream("file:///usr/local/spark/mycode/streaming/logfile")
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x=>(x, 1)).reduceByKey(_+_)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
}
}
Then, in the streaming directory:
vim simple.sbt
Enter the following into the simple.sbt file:
name := "Simple Project"
version := "1.0"
scalaVersion := "2.11.8"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.11" % "2.3.0"
Run the sbt packaging command:
$ /usr/local/sbt/sbt package
After packaging succeeds, start the program with the following commands:
$ cd /usr/local/spark/mycode/streaming
$ /usr/local/spark/bin/spark-submit --class "WordCountStreaming" /usr/local/spark/mycode/streaming/target/scala-2.11/simple-project_2.11-1.0.jar
After running the command above, the program enters the listening state (we call the window running this listening program the listening window).
Switch to another shell window, create a new file log5.txt under the "/usr/local/spark/mycode/streaming/logfile" directory, type some arbitrary words into it, then save the file and exit vim.
Switch back to the listening window. After about 20 seconds, press Ctrl+C or Ctrl+D to stop the listening program; you will see the word-count statistics printed on the listening window's screen.
Socket Streams
Spark Streaming can listen on a socket port, receive data, and then process it accordingly.
$ cd /usr/local/spark/mycode
$ mkdir streaming
$ cd streaming
$ mkdir -p src/main/scala
$ cd src/main/scala
$ vim NetworkWordCount.scala
The contents of NetworkWordCount.scala are as follows:
package org.apache.spark.examples.streaming
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.storage.StorageLevel
object NetworkWordCount {
def main(args:Array[String]) {
if(args.length < 2)
{
System.err.println("Usage: NetworkWordCount <hostname> <port>")
System.exit(1)
}
StreamingExamples.setStreamingLogLevels()
val sparkConf = new SparkConf().setAppName("NetworkWordCount").setMaster("local[2]")
val ssc = new StreamingContext(sparkConf, Seconds(1))
val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x=>(x, 1)).reduceByKey(_+_)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
}
}
In the same directory, create another file, StreamingExamples.scala, with the following content:
package org.apache.spark.examples.streaming
import org.apache.spark.internal.Logging
import org.apache.log4j.{Level, Logger}
/** Utility functions for Spark Streaming examples. */
object StreamingExamples extends Logging {
/** Set reasonable logging levels for streaming if the user has not configured log4j. */
def setStreamingLogLevels() {
val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements
if(!log4jInitialized){
//We first log something to initialize Spark's default logging, then we override the logging level
logInfo("Setting log level to [WARN] for streaming example."+" To override add a custom log4j.properties to the classpath.")
Logger.getRootLogger.setLevel(Level.WARN)
}
}
}
Then, in the streaming directory:
vim simple.sbt
Enter the following into the simple.sbt file:
name := "Simple Project"
version := "1.0"
scalaVersion := "2.11.8"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.11" % "2.3.0"
Run the sbt packaging command:
$ /usr/local/sbt/sbt package
After packaging succeeds, start the program with the following commands:
$ cd /usr/local/spark/mycode/streaming
$ /usr/local/spark/bin/spark-submit --class "org.apache.spark.examples.streaming.NetworkWordCount" /usr/local/spark/mycode/streaming/target/scala-2.11/simple-project_2.11-1.0.jar localhost 9999
Open a new window to serve as the nc window and start the nc program:
$ nc -lk 9999
Type some arbitrary words in the nc window; the listening window automatically receives the word data stream and prints word-count statistics every second.
Next, we change the way the data source is produced: instead of using the nc program, we write our own program to generate the socket data source.
$ cd /usr/local/spark/mycode/streaming/src/main/scala
$ vim DataSourceSocket.scala
package org.apache.spark.examples.streaming
import java.io.PrintWriter
import java.net.ServerSocket
import scala.io.Source
object DataSourceSocket {
def index(length: Int) = {
val rdm = new java.util.Random
rdm.nextInt(length)
}
def main(args:Array[String]){
if(args.length != 3){
System.err.println("Usage: <filename> <port> <milliseconds>")
System.exit(1)
}
val fileName = args(0)
val lines = Source.fromFile(fileName).getLines.toList
val rowCount = lines.length
val listener = new ServerSocket(args(1).toInt)
while(true){
val socket = listener.accept()
new Thread(){
override def run = {
println("Got client connected from:"+socket.getInetAddress)
val out = new PrintWriter(socket.getOutputStream(), true)
while(true){
Thread.sleep(args(2).toLong)
val content = lines(index(rowCount))
println(content)
out.write(content+"\n")
out.flush()
}
socket.close()
}
}.start()
}
}
}
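The random-line selection that drives DataSourceSocket can be checked on its own in plain Scala; the sample lines below are placeholders:

```scala
// Sketch of the index() helper: draw a uniformly random row index in [0, rowCount)
val lines = List("hello spark", "hello streaming", "spark is fast")
val rowCount = lines.length
val rdm = new java.util.Random
def index(length: Int): Int = rdm.nextInt(length)
val picked = lines(index(rowCount))
// picked is always one of the sample lines
```

In the full program, one such line is written to the client socket after every sleep interval, so clients see an endless stream of randomly repeated lines from the input file.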
Run sbt to package and compile:
$ cd /usr/local/spark/mycode/streaming
$ /usr/local/sbt/sbt package
The DataSourceSocket program takes a text file as an input argument, so before starting it, create a text file word.txt and type a few lines of arbitrary content into it.
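For example, word.txt can be created non-interactively (the words below are placeholders; in the tutorial the file lives under /usr/local/spark/mycode/streaming/):

```shell
# Write a few sample lines into word.txt in the current directory
printf 'hello spark\nhello streaming\nspark is fast\n' > word.txt
cat word.txt
```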
Start the DataSourceSocket program:
$ cd /usr/local/spark/mycode/streaming
$ /usr/local/spark/bin/spark-submit --class "org.apache.spark.examples.streaming.DataSourceSocket" /usr/local/spark/mycode/streaming/target/scala-2.11/simple-project_2.11-1.0.jar /usr/local/spark/mycode/streaming/word.txt 9999 1000
This window keeps printing randomly selected lines of text; these lines form the socket data source and are captured by the listening program.
Start the listening program in another window:
$ /usr/local/spark/bin/spark-submit --class "org.apache.spark.examples.streaming.NetworkWordCount" /usr/local/spark/mycode/streaming/target/scala-2.11/simple-project_2.11-1.0.jar localhost 9999
Once it starts successfully, you will see word-count statistics printed continuously on the screen.
RDD Queue Streams
When debugging a Spark Streaming application, we can use streamingContext.queueStream(queueOfRDDs) to create a DStream based on a queue of RDDs.
Below we create a new file TestRDDQueueStream.scala. The required functionality: create one RDD every second, while Spark Streaming processes the data every 2 seconds.
package org.apache.spark.examples.streaming
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.{Seconds, StreamingContext}
object QueueStream {
def main(args:Array[String])
{
val sparkConf = new SparkConf().setAppName("TestRDDQueue").setMaster("local[2]")
val ssc = new StreamingContext(sparkConf, Seconds(2))
val rddQueue = new scala.collection.mutable.SynchronizedQueue[RDD[Int]]()
val queueStream = ssc.queueStream(rddQueue)
val mappedStream = queueStream.map(r => (r % 10, 1))
val reducedStream = mappedStream.reduceByKey(_+_)
reducedStream.print()
ssc.start()
for(i <- 1 to 10)
{
rddQueue += ssc.sparkContext.makeRDD(1 to 100, 2)
Thread.sleep(1000)
}
ssc.stop()
}
}
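Since the queued values are mapped with r => (r % 10, 1), each RDD of 1 to 100 contributes exactly 10 occurrences per remainder class 0..9. That arithmetic can be checked with plain Scala collections, without Spark:

```scala
// Counts per remainder class for one queued RDD's worth of data (1 to 100)
val mapped = (1 to 100).map(r => (r % 10, 1))
val reduced = mapped.groupBy(_._1).map { case (k, vs) => (k, vs.map(_._2).sum) }
// reduced has the 10 keys 0..9, each with count 10
```

With a 2-second batch interval and one RDD queued per second, each printed batch should combine roughly two RDDs' worth of these counts.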
After sbt packaging succeeds, run the following commands to execute the program:
$ cd /usr/local/spark/mycode/streaming
$ /usr/local/spark/bin/spark-submit --class "org.apache.spark.examples.streaming.QueueStream" /usr/local/spark/mycode/streaming/target/scala-2.11/simple-project_2.11-1.0.jar
After running the command above, the program starts running.
Custom Receivers
Spark Streaming can receive streaming data of any type, not only from the built-in sources such as Flume, Kafka, Kinesis, files, and sockets. To support other kinds of data, the developer must write a custom program to receive the corresponding data source.
To define a custom receiver class, you normally extend an existing base class; here that is Receiver. This abstract base class has two methods that must be overridden:
- onStart(): invoked when the receiver starts running; inside it, start a thread that receives the data.
- onStop(): invoked when the receiver stops running; inside it, make sure data reception stops.
Reception may of course also be terminated while data is still being received; the thread started by onStart() can call isStopped() to decide whether it should stop receiving data.
Below is a custom receiver that receives a text stream over a socket. It splits the stream into '\n'-delimited lines and stores them in Spark. If the receiving thread hits any connection or reception error, the receiver restarts itself.
import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket
import java.nio.charset.StandardCharsets
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class MyReceiver(host: String, port: Int) extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2){
override def onStart() {
// Start the thread that receives data over a connection
new Thread("Socket Receiver") {
override def run() { receive() }
}.start()
}
override def onStop() {
// There is nothing much to do as the thread calling receive()
// is designed to stop by itself if isStopped() returns false
}
/** Create a socket connection and receive data until receiver is stopped */
private def receive() {
var socket: Socket = null
var userInput: String = null
try {
// Connect to host:port
socket = new Socket(host, port)
// Until stopped or connection broken continue reading
val reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8))
userInput = reader.readLine()
while(!isStopped && userInput != null) {
store(userInput)
userInput = reader.readLine()
}
reader.close()
socket.close()
// Restart in an attempt to connect again when server is active again
restart("Trying to connect again")
} catch {
case e: java.net.ConnectException =>
// restart if could not connect to server
restart("Error connecting to " + host + ":" + port, e)
case t: Throwable =>
// restart if there is any other error
restart("Error receiving data", t)
}
}
}
Using the custom receiver:
val stream = ssc.receiverStream(new MyReceiver("218.193.154.155",9999))
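The read-until-null loop inside receive() can be exercised with only the JDK, against an in-process ServerSocket standing in for the data source; all names here are illustrative, and the buffer models what store() would receive:

```scala
import java.io.{BufferedReader, InputStreamReader, PrintWriter}
import java.net.{ServerSocket, Socket}
import java.nio.charset.StandardCharsets
import scala.collection.mutable.ArrayBuffer

// A tiny server that sends two lines and closes the connection
val server = new ServerSocket(0) // port 0: the OS picks a free port
val writer = new Thread {
  override def run(): Unit = {
    val client = server.accept()
    val out = new PrintWriter(client.getOutputStream, true)
    out.println("hello")
    out.println("world")
    client.close()
  }
}
writer.start()

// The client side mirrors the receiver's loop: readLine() until null,
// with store() modeled by appending each line to a buffer
val socket = new Socket("localhost", server.getLocalPort)
val reader = new BufferedReader(
  new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))
val stored = ArrayBuffer[String]()
var line = reader.readLine()
while (line != null) { stored += line; line = reader.readLine() }
reader.close(); socket.close(); writer.join(); server.close()
```

When the server closes the connection, readLine() returns null and the loop ends, which is exactly the point at which MyReceiver calls restart() to reconnect.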