1. 程式人生 > >Spark之SparkStreaming-Input DStreams and Receivers

Spark之SparkStreaming-Input DStreams and Receivers

  Input DStreams是表示從源資料接收的資料流的DStream。 在wordcount案例中,lines是一個Input DStream,因為它表示從netcat的9999埠接收的資料流。 每個輸入DStream(檔案流除本節稍後討論)與Receiver(Scala doc,Java doc)物件相關聯,該物件從源接收資料並將其儲存在Spark的記憶體中進行處理。

Spark Streaming提供了兩類內建的streaming sources。

  basic source:StreamingContext API中直接提供的資源。示例:file systems, and socket connections。
  Advanced sources

:像kafak,Flume, Kinesis等等,可以通過外部的實用工具類來獲得。這些需要連結到連結部分中討論的額外依賴關係。
我們將在本節稍後討論每個類別中的一些來源。

請注意,如果要在流式應用程式中並行接收多個數據流,則可以建立多個 Input DStream(在“效能調優”部分進一步討論)。這將建立多個receiver,同時接收多個數據流。但是請注意,Sparkworker/executor是一個長期執行的任務,因此它佔據分配給Spark Streaming應用程式的核心之一。因此,重要的是要記住,Spark Streaming應用程式需要分配足夠的核心(或執行緒,如果在本地執行)來處理接收到的資料,以及執行接收器
(所以將核心數設定多些)。

  當本地執行Spark Streaming程式時,不要使用“local”或“local [1]”作為master URL。 這兩者意味著只有一個執行緒將用於在本地執行任務。 如果您正在使用基於receiver(e.g. sockets, Kafka, Flume, etc)的Input DStream,則單執行緒將用於執行接收器,不會留出執行緒來處理接收到的資料。 因此,當在本地執行時,始終使用“local [n]”作為master URL,其中n>要執行的接收器數量(有關如何設定主機的資訊,請參閱Spark屬性)。
  將邏輯擴充套件到在叢集上執行,分配給Spark Streaming應用程式的core數量必須大於接收器數量。 否則系統將收到資料,但無法處理它。

使用jssc.socketTextStream(...)通過TCP Socket接收的文字資料建立了一個DStream。 除了Socket之外,StreamingContext API提供了從檔案建立DStreams作為Input Source的方法。

FileStream:用於從與HDFS API(即HDFS,S3,NFS等)相容的任何檔案系統上的檔案讀取資料,DStream可以建立為:

 streamingContext.fileStream<KeyClass, ValueClass, InputFormatClass>(dataDirectory);

Spark Streaming將監視目錄dataDirectory並處理在該目錄中建立的任何檔案(不支援巢狀目錄中寫入的檔案)。 注意:
  檔案必須具有相同的資料格式
  必須通過將資料原子移動或重新命名到資料目錄中,在dataDirectory中建立檔案。
  移動後,檔案不能更改。 因此,如果檔案被不斷附加,則不會讀取新的資料。
對於簡單的文字檔案,有一個更容易的方法streamingContext.textFileStream(dataDirectory)。 而檔案流不需要執行接收器,因此不需要分配核心。

Streams based on Custom Receivers:可以通過自定義接收器接收的資料流建立DStream。

Queue of RDDs as a Stream::為了使用測試資料測試Spark Streaming應用程式,還可以使用streamingContext.queueStream(queueOfRDDs)建立基於RDD佇列的DStream。 推送到佇列中的每個RDD將被視為DStream中的一批資料,並像流一樣處理。

  這種類別的源需要與外部非Spark庫進行連線,其中一些具有複雜的依賴關係(例如Kafka和Flume)。 因此,為了最小化與依賴關係的版本衝突相關的問題,從這些源建立DStream的功能已被移動到可以在必要時顯式連結到單獨的庫。

請注意,這些高階源在Spark shell中不可用,因此在shell中無法測試基於這些高階源的應用程式。 如果您真的想在Spark shell中使用它們,則必須下載相應的Maven artifact’s JAR及其依賴項,並將其新增到類路徑中。

也可以通過自定義資料來源建立輸入DStream。 所有你需要做的是實現一個使用者定義的接收器(請參見下一部分來了解是什麼)可以從自定義源接收資料並將其推入到Spark中。 See the Custom Receiver Guide for details.

  基於可靠性可以有兩種資料來源。 來源(如Kafka和Flume)允許傳輸的資料被確認。 如果從這些可靠來源接收資料的系統正確地確認接收到的資料,則可以確保由於任何種類的故障而不會丟失任何資料。 這導致兩種接收器:

Reliable Receiver - 當資料已被接收並且通過複製儲存在Spark中時,可靠的接收器正確地向可靠的源傳送確認。
Unreliable Receiver - 不可靠的接收器不向源傳送確認。 這可以用於不支援確認的源,或者甚至當不想要或需要進入確認的複雜性時,用於可靠的源。
Custom Receiver Guide.中討論瞭如何編寫可靠接收器的細節。