Apache Flink原始碼解析之stream-sink
上一篇我們談論了Flink stream source,它作為流的資料入口是整個DAG(有向無環圖)拓撲的起點。那麼與此對應的,流的資料出口就是跟source對應的Sink。這是我們本篇解讀的內容。
SinkFunction
跟SourceFunction
對應,Flink針對Sink的根介面被稱為SinkFunction
。繼承自Function
這一標記介面。SinkFunction
介面只提供了一個方法:
void invoke(IN value) throws Exception;
該方法提供基於記錄級別的呼叫(也就是每個被輸出的記錄都會呼叫該介面一次)。上面方法的引數value
SinkFunction
相對來說比較簡潔,下面我們來看一下它的實現者。
內建的SinkFunction
同樣,我們先來看一下完整的型別繼承體系:
DiscardingSink
這是最簡單的SinkFunction的實現,它的實現等同於沒有實現(其實現為空方法)。它的作用就是將記錄丟棄掉。它的主要場景應該是那些無需最終處理結果的記錄。
WriteSinkFunction
WriteSinkFunction
是一個抽象類。該類的主要作用是將需要輸出的tuples
(元組)作為簡單的文字輸出到指定路徑的檔案中去,元組被收集到一個list中去,然後週期性得寫入檔案。
WriteSinkFunction
的構造器接收兩個引數:
- path : 需要寫入的檔案路徑
- format :
WriteFormat
的例項,用於指定寫入資料的格式
在構造器中,它呼叫方法cleanFile
,該方法用於初始化指定path的檔案。初始化的行為是:如果不存在則建立,如果存在則清空。
invoke方法的實現:
public void invoke(IN tuple) {
tupleList.add(tuple);
if (updateCondition()) {
format.write(path, tupleList);
resetParameters();
}
}
從實現來看,其先將需要sink的元組加入內部集合。然後呼叫updateCondition
方法。該方法是WriteSinkFunction
定義的抽象方法。用於實現判斷將tupleList寫入檔案以及清空tupleList的條件。接著將集合中的tuple寫入到指定的檔案中。最後又呼叫了resetParameters
方法。該方法同樣是一個抽象方法,它的主要用途是當寫入的場景是批量寫入時,可能會有一些狀態引數,該方法就是用於對狀態進行reset。
WriteSinkFunctionByMillis
該類是WriteSinkFunction
的實現類。它支援以指定的毫秒數間隔將tuple批量寫入檔案。間隔由構造器引數millis
指定。在內部,WriteSinkFunction
以lastTime
維護上一次寫入的時間狀態。它主要涉及上面提到的兩個抽象方法的實現:
protected boolean updateCondition() {
return System.currentTimeMillis() - lastTime >= millis;
}
updateCondition
的實現很簡單,拿當前主機的當前時間戳跟上一次的執行時間戳狀態作對比:如果大於指定的間隔,則條件為真,觸發寫入。
protected void resetParameters() {
tupleList.clear();
lastTime = System.currentTimeMillis();
}
resetParameters
實現是先清空tupleList,然後將lastTime老的時間戳狀態覆蓋為最新時間戳。
WriteFormat
一個寫入格式的抽象類,提供了兩種實現:
- WriteFormatAsText : 以原樣文字的形式寫入指定路徑的檔案
- WriteFormatAsCsv : 以csv格式寫入指定檔案
RichSinkFunction
RichSinkFunction
通過繼承AbstractRichFunction
為實現一個rich SinkFunction提供基礎(AbstractRichFunction
提供了一個open/close方法對,以及獲取執行時上下文物件手段)。RichSinkFunction
也是抽象類,它有三個具體實現。
SocketClientSink
支援以socket的方式將資料傳送到特定目標主機所在的伺服器作為flink stream的sink。資料被序列化為byte array然後寫入到socket。該sink支援失敗重試模式的訊息傳送。該sink 可啟用autoFlush
,如果啟用,那麼會導致吞吐量顯著下降,但延遲也會降低。該方法的構造器,提供的引數:
- hostName : 待連線的server的host name
- port : server的埠
- schema :
SerializationSchema
的例項,用於序列化物件。 - maxNumRetries : 最大重試次數(-1為無限重試)
- autoflush : 是否自動flush
重試的策略在invoke
方法中,當傳送失敗時進入到異常捕捉塊中進行。
OutputFormatSinkFunction
一個將記錄寫入OutputFormat
的SinkFunction的實現。
OutputFormat :定義被消費記錄的輸出介面。指定了最終的記錄如何被儲存,比如檔案就是一種儲存實現。
PrintSinkFunction
該實現用於將每條記錄輸出到標準輸出流(stdOut)或標準錯誤流(stdErr)。在輸出時,如果當前task的並行subtask例項個數大於1,也就是說當前task是並行執行的(同時存在多個例項),那麼在輸出每條記錄之前會輸出一個prefix
字首。prefix為在全域性上下文中當前subtask的位置。
常見聯結器中的Sink
Flink自身提供了一些針對第三方主流開源系統的聯結器支援,它們有:
- elasticsearch
- flume
- kafka(0.8/0.9版本)
- nifi
- rabbitmq
這些第三方系統(除了twitter)的sink,無一例外都是繼承自RichSinkFunction
。
小結
這篇文章我們主要談及了Flink的stream sink相關的設計、實現。當然這個主題還沒有完全談完,還會有後續篇幅繼續解讀。
微信掃碼關注公眾號:Apache_Flink