Spark流程式設計指引(四)---------------------------DStreams基本模型,輸入DStreams和接收者
離散流(DStreams)
離散流或者稱為DStreams是Spark流程式設計提供的基本抽象。它代表了持續的資料流,從一個數據源接收到的資料流或者是在一個輸入流上應用轉變操作處理後的資料流。
在內部實現上,DStream代表了一系列連續的RDDs.RDDs是Spark對不可變的,分散式資料集的抽象。DStream中的每個RDD包含了一定間隔內的資料,正如下圖所示:
任何應用在DStream上的操作都會被轉換成應用在底層RDD上的操作。比如,在前面介紹的轉換一個行流到單詞流的的例子裡,flatMap操作作用在lines DStream上的每個RDDs上,然後產生了words DStream的RDDs。正如下圖所示:
這些底層的RDD轉換操作被Spark引擎計算。DStreams隱藏了大部分的細節,提供給開始者便捷地高層次的API。
輸入DStreams和接收者
DStreams中的輸入DStreams代表了從源流中接收到的輸入資料流。在前面的例子中,lines就是一個輸入DStreams,它代表了從網路伺服器中接收到的資料流。
每個輸入DStreams都與一個接收物件相關。這個接收物件從一個源中接收資料並存儲在Spark記憶體中用於以後的處理。
SPark流提供了兩種內建的源流:
1.基本的源:StreamingContext API直接可用的源,比如:檔案系統,socket連線,和Akka actors
2.高階源:一些源比如:Kafka, Flume, Kinesis, Twitter。需要通過額外的實用工具類來支援。在上一節的連結章節中有說明。
接下來,我們將要討論兩種型別中的一些源。
注意:如果你想要在你的流程式裡並行地接收多個流的資料,你可以建立多個輸入的DStreams.這將會建立多個接收者同時從多個數據流中接收資料。但是注意SPark worker/executor作為一個長期執行的任務,這樣做將會佔用分配給Spark流程式的一個cpu核心。所以,要記住給Spark流程式分配足夠的CPU(執行緒數,如果是本地模式),以便處理接收到的資料和執行接收者程式。
需要記住的關鍵點:
1.當在本地執行Spark流程式時,不要使用"local"或者"local[1]"作為master URL。因為,這意味著只用一個執行緒執行本地任務。如果你在使用一個基於接收者(如sockets, Kafka, Flume)的輸入DStream,這個執行緒將會被用來執行接收者任務,就不會有執行緒來處理接收到的資料。所以,當執行在本地模式時,總是應該使用"local[n]"作為master URL,n要大於執行的接收都個數。
2.同樣的道理,當執行在叢集模式時,分配給Spark流程式的核心數也必須大於接收者的個數。否則,系統將只會接收資料,但是確不能處理資料。
基本的源:
在上一節的例子中,我們建立了一個DStream從一個TCP連線中接收文字資料。在例子中,我們看到了ssc.socketTextStream(...)
的用法。除sockets外,Spark的Streaming API還提供了以檔案和Akka actors作為輸入源來建立DStreams的方法。
檔案流:
為了從任何與HDFS API相容的檔案系統(如HDFS,S3,NFS等)的檔案中讀取資料,一個DStream可以用以下的方式建立:
streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory)
Spark流將會監控dataDirectory指明的目錄,並處理目錄下任何檔案的建立事件(巢狀目錄還不支援)。注意以下幾點:
1.所有檔案必須有相同的資料格式
2.在dataDirectory下建立的檔案必須是通過原子的移動或重新命名操作。
3.一旦被移動了,檔案就不能被修改。就是說如果檔案正在被持續的寫入,新的資料將不會被讀取。
對於簡單的文字檔案,有一個更簡單的方法streamingContext.textFileStream(dataDirectory)
。另外,檔案流不需要執行一個接收者,所以不需要分配核心。
對於pythonAPI,檔案流還不支援,只支援textFileStream方法。
基於典型的Actors的流:
可以通過使用streamingContext.actorStream(actorProps, actor-name)
方法建立從Akka actors接收資料流的DStreams。
對於pythonAPI,由於actors只在JAVA和Scala庫裡支援,所以actorStream還不支援python.
將RDDs隊列當作流:
為了用測試資料測試Spark Streaming應用程式,我們可以基於一個RDDs佇列來建立一個DStream,通過使用streamingContext.queueStream(queueOfRDDs)
方法。加入到佇列中的每個RDD都會被當成DStream的一個批次的資料,以流的方式被處理。
想要了解更多的關於基於sockets, files, 和actors的流的細節。可以檢視相關的API文件:StreamingContext for Scala, JavaStreamingContext for Java, and StreamingContext for Python.
高階源:
使用這類的源需要非Spark的外部介面,其中的一些源需要複雜的依懶(如Kafka 和 Flume)。所以,為了儘量減少依懶間的版本衝突問題,從這些源建立DStream的功能被劃分到不同的庫中。這樣就可以根據需要明確地連結不同的庫。比如,想要從Twitter的tweets流中建立一個DStream,你需要做如下幾步:
1.連結:向SBT或Maven工程新增座標為spark-streaming-twitter_2.10
的依懶。
2.程式設計:匯入TwitterUtils
類,並照下面的方法用TwitterUtils.createStream
建立一個DStream。
3.部署:生成一個包含所有依懶的JAR包,然後部署應用。我們將在後面“部署應用”一節中詳細介紹部署相關的知識。
import org.apache.spark.streaming.twitter._
TwitterUtils.createStream(ssc, None)
注意:這些高階源在Spark Shell裡不可用,所以不能在shell裡測試基於這些源的應用程式。如果你真的想在Spark Shell裡使用它們,你必須下載依懶的相關Maven JAR包,並新增到CLASSPATH。
下面是關於一些高階源的說明:
-
Kafka: Spark Streaming 1.4.0 與Kafka 0.8.1.1相容. 更多資訊 Kafka Integration Guide .
-
Flume: Spark Streaming 1.4.0 與Flume 1.4.0相容. 更多資訊 Flume Integration Guide .
-
Twitter: Spark Streaming的TwitterUtils使用Twitter4j 3.0.3 來獲取用Twitter’s Streaming API建立的tweets的公共流。Twitter4J庫提供的任何一個方法都提供了認證資訊.你可以獲取公共流或者基於關鍵字過濾後的流. 檢視API文件(Scala,Java) 和例子(TwitterPopularTags 和 TwitterAlgebirdCMS).
自定義源:
python還不支援。
也可以通過自定義源來建立輸入DStreams.你需要實現一個receiver方法用來從自定義的源中接收資料,並將其壓入Spark。檢視Custom ReceiverGuide以獲取更多資訊。
接收者的可靠性:
基於可靠性,有兩種資料來源。一些源(如Kafka 和 Flume)允許對傳送的資料進行確認。如果從這些可靠資料來源接收資料的系統正確地確認了接收到的資料,就可以保證沒有因為任何錯誤而丟失資料。這就產生了兩種型別的接收者:
1.可靠的接收者:一個可靠的接收者正確地確認了一個可靠的源,資料已經被接收並以備份地形式儲存到Spark中。
2.不可靠的接收者:這些源的接收者不支援確認。甚至對於一個可靠的源,可以實現一個不進行復雜確認的不可靠的接收者。