spark 系列之六 SparkStreaming資料來源之socket流
阿新 • • 發佈:2020-12-31
SparkStreaming 這個名字起的很有意思,就是隻要能流式讀取的資料,都可以作為SparkStreaming的資料來源
下面我們來介紹另一種常見的流,socket流(套接字流)
socket個人理解就像是一部手機(根據時代的不同,之前可以理解成有線電話),通過這部手機,可以進行資訊流的傳遞。
在打電話之前,我們是不是得先有一部電話,然後想給別人打電話的時候,是不是我們得先撥號,然後等待對方接通(就是阻塞的過程),再通話(send和receive的過程)?
對於socket不太瞭解的可以去自行Google,這裡不做詳細解讀。參考如下連結(https://www.jianshu.com/p/007adba06047)
上兩張圖震一下:
圖一:如上圖,在七個層級關係中,我們將的socket屬於傳輸層,其中UDP是一種面向無連線的傳輸層協議。
圖二:如上圖,Socket的通訊方式
切入正題:
首先初始化一個ServerSocket,然後對指定的埠進行繫結,接著對埠及進行監聽,通過呼叫accept方法阻塞。
import java.io.{PrintWriter} import java.net.ServerSocket import scala.io.Source object DataSourceSocket { def index(length: Int) = { val rdm= new java.util.Random rdm.nextInt(length) } def main(args: Array[String]) { /** * 第一個引數是文字檔案路徑,第二個引數是埠地址,第三個引數是時間間隔(單位是毫秒,也就是每隔多少毫秒傳送一次資訊) */ val params = Array("D:/software_download/spark_text/streaming/logfile/score.txt","9999", "1000") val fileName = params(0) val lines = Source.fromFile(fileName).getLines.toList println(lines) val rowCount= lines.length val listener = new ServerSocket(params(1).toInt)
while (true) { //呼叫accept方法,等待客戶端接通 val socket = listener.accept() new Thread() { override def run = { println("Got client connected from: " + socket.getInetAddress) val out = new PrintWriter(socket.getOutputStream(), true) while (true) { Thread.sleep(params(2).toLong) val content = lines(index(rowCount)) println(content) out.write(content + '\n') out.flush() } socket.close() } }.start() } } }
執行結果如下,會阻塞等待客戶端:
其次,使用客戶端接通socket服務端,並且進行通訊,不過該實驗的通訊方式是單向的,即客戶端只接受資料,而不傳送資料。
import org.apache.spark._ import org.apache.spark.streaming._ import org.apache.spark.storage.StorageLevel object SparkStreaming_socket { def main(args: Array[String]): Unit = { /** * 監聽本地的9999埠 */ val params = Array("localhost","9999") val sparkConf = new SparkConf().setAppName("NetworkWordCount").setMaster("local[2]") val ssc = new StreamingContext(sparkConf, Seconds(1)) val lines = ssc.socketTextStream(params(0), params(1).toInt, StorageLevel.MEMORY_AND_DISK_SER) val words = lines.flatMap(_.split(" ")) val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) wordCounts.print() ssc.start() ssc.awaitTermination() } }
該服務一旦啟動,服務端會持續的傳送資料如下:
而客戶端,則根據傳送過來的資料,進行持續的計算操作如下:
以上。