1. 程式人生 > 實用技巧 >spark 系列之六 SparkStreaming資料來源之socket流

spark 系列之六 SparkStreaming資料來源之socket流

SparkStreaming 這個名字起的很有意思,就是隻要能流式讀取的資料,都可以作為SparkStreaming的資料來源

下面我們來介紹另一種常見的流,socket流(套接字流)

socket個人理解就像是一部手機(根據時代的不同,之前可以理解成有線電話),通過這部手機,可以進行資訊流的傳遞。

在打電話之前,我們是不是得先有一部電話,然後想給別人打電話的時候,是不是我們得先撥號,然後等待對方接通(就是阻塞的過程),再通話(send和receive的過程)?

對於socket不太瞭解的可以去自行Google,這裡不做詳細解讀。參考如下連結(https://www.jianshu.com/p/007adba06047)

上兩張圖震一下:

圖一:如上圖,在七個層級關係中,我們將的socket屬於傳輸層,其中UDP是一種面向無連線的傳輸層協議。

圖二:如上圖,Socket的通訊方式

切入正題:

首先初始化一個ServerSocket,然後對指定的埠進行繫結,接著對埠及進行監聽,通過呼叫accept方法阻塞。

import java.io.{PrintWriter}
import java.net.ServerSocket
import scala.io.Source

object DataSourceSocket {
  def index(length: Int) = {
    val rdm 
= new java.util.Random rdm.nextInt(length) } def main(args: Array[String]) { /** * 第一個引數是文字檔案路徑,第二個引數是埠地址,第三個引數是時間間隔(單位是毫秒,也就是每隔多少毫秒傳送一次資訊) */ val params = Array("D:/software_download/spark_text/streaming/logfile/score.txt","9999", "1000") val fileName = params(0) val lines = Source.fromFile(fileName).getLines.toList println(lines) val rowCount
= lines.length val listener = new ServerSocket(params(1).toInt)
while (true) { //呼叫accept方法,等待客戶端接通 val socket = listener.accept() new Thread() { override def run = { println("Got client connected from: " + socket.getInetAddress) val out = new PrintWriter(socket.getOutputStream(), true) while (true) { Thread.sleep(params(2).toLong) val content = lines(index(rowCount)) println(content) out.write(content + '\n') out.flush() } socket.close() } }.start() } } }

執行結果如下,會阻塞等待客戶端:

其次,使用客戶端接通socket服務端,並且進行通訊,不過該實驗的通訊方式是單向的,即客戶端只接受資料,而不傳送資料。

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.storage.StorageLevel

object SparkStreaming_socket {

  def main(args: Array[String]): Unit = {
    /**
     * 監聽本地的9999埠
     */
    val params = Array("localhost","9999")

    val sparkConf = new SparkConf().setAppName("NetworkWordCount").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(1))
    val lines = ssc.socketTextStream(params(0), params(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }

}

該服務一旦啟動,服務端會持續的傳送資料如下:

而客戶端,則根據傳送過來的資料,進行持續的計算操作如下:

以上。