1. 程式人生 > 其它 >Stream Processing with Apache Flink中文版-- 第8章 與外部系統的讀寫互動

Stream Processing with Apache Flink中文版-- 第8章 與外部系統的讀寫互動

資料可以儲存在許多不同的系統中,比如檔案系統、物件儲存、關係資料庫系統、鍵值儲存、搜尋索引、事件日誌、訊息佇列等等。每一類系統都是為特定的訪問模式設計的,並且擅長於服務於特定的目的。因此,今天的資料基礎設施通常由許多不同的儲存系統組成。在向架構中新增新元件之前,應該問一個合乎邏輯的問題:“它與架構中的其他元件的協作情況如何?”

新增資料處理系統(如Apache Flink)需要仔細考慮,因為它不包括儲存層,而是依賴外部儲存系統來獲取和儲存資料。因此,對於像Flink這樣的資料處理引擎來說,為從外部系統讀取資料和向外部系統寫入資料,提供一個裝備良好的聯結器庫以及實現定製聯結器的API是非常重要的。但是,僅能夠將資料讀寫到外部資料儲存庫對於流處理引擎來說是不夠的,流處理引擎希望在發生故障時提供有意義的一致性保證。

在本章中,我們將討論源和接收聯結器如何影響Flink流應用程式的一致性保證,並介紹Flink用於讀寫資料的最流行的聯結器。您將瞭解如何實現自定義源和接收聯結器,以及如何實現向外部資料儲存傳送非同步讀或寫請求的函式。

應用程式一致性保證

在“檢查點、儲存點和狀態恢復”章節中,您瞭解到Flink的檢查點和恢復機制定期接受應用程式狀態的一致檢查點。如果出現故障,應用程式的狀態將從最新完成的檢查點恢復並繼續處理。但是,能夠將應用程式的狀態重置為一致的檢查點,不足以為應用程式實現一致的處理保證。相反,應用程式的源和接收聯結器需要與Flink的檢查點和恢復機制整合,並提供某些屬性來提供有意義的保證。

為了為應用程式提供精確一次的狀態一致性保證,應用程式的每個源聯結器都需要能夠將其讀位置設定為以前的檢查點位置。在採取檢查點時,源操作符將保持其讀取位置,並在恢復期間恢復這些位置。支援讀取位置檢查點的源聯結器的示例是基於檔案的源,這些源將讀取偏移量儲存在檔案的位元組流中,或者Kafka源將讀取偏移量儲存在它所使用的主題分割槽中。如果應用程式從無法儲存和重置讀取位置的源聯結器接收資料,則在發生故障時,應用程式可能會遭受資料丟失,並且只能提供最多一次的一致性保證。

Flink的檢查點和恢復機制以及可重新設定的源聯結器的組合保證了應用程式不會丟失任何資料。但是,應用程式可能會兩次輸出結果,因為在最後一個成功的檢查點之後發出的所有結果(在恢復的情況下,應用程式退回到該檢查點)將再次發出。因此,可重新設定的源和Flink的恢復機制不足以提供端到端的精確一次保證,即使應用程式狀態是精確一次的。

旨在提供端到端(一次)保證的應用程式需要特殊的接收器聯結器。有兩種技術可以應用於不同的情況以實現精確一次的一致性保證:冪等寫和事務性寫。

冪等寫

冪等運算可以執行多次,但只會導致一次更改。例如,重複地將相同的鍵值對插入到hashmap中是冪等操作,因為第一個插入操作將鍵值新增到map中,並且所有後續插入操作都不會更改map,因為它已經包含了鍵值對。另一方面,追加操作不是冪等操作,因為多次追加一個元素會導致多次追加。冪等寫操作對於流應用程式來說很有趣,因為它們可以多次執行而不改變結果。因此,它們可以在一定程度上減輕由Flink的檢查點機制引起的重放結果的影響。

應該注意的是,依賴冪等性的接收器來實現精確一次的應用程式必須保證重播時覆蓋以前寫的結果。例如,如果應用程式有一個接收器,它要將資料更新到鍵值儲存中,則必須確保它能準確地計算用於更新的鍵值。此外,從sink系統讀取資料的應用程式可能會在應用程式恢復期間觀察到意外的結果。當重播開始時,先前發出的結果可能被先前的結果覆蓋。因此,一個

使用恢復應用程式的輸出的應用程式,可能會看到時間上的跳躍,例如,讀取比以前更小的計數。此外,在重播過程中,流應用程式的整體結果將處於不一致的狀態,因為一些結果將被覆蓋,而另一些則沒有。一旦重播完成,應用程式通過了先前失敗的點,結果將再次保持一致。

事務性寫

實現端到端一致性的第二種方法是基於事務寫。這裡的思想是隻將這些結果(在最後一個成功的檢查點之前計算過的結果)寫入外部接收系統。此方法確保端到端精確一次,因為在出現故障時,應用程式將重置到最後一個檢查點,並且在該檢查點之後沒有向接收系統傳送任何結果。通過只在完成一個檢查點後才寫資料,事務處理方法不會遭遇冪等寫的重播不一致性。但是,它增加了延遲,因為結果只有在檢查點完成時才可見。Flink提供了兩個構建塊來實現事務性的接收聯結器—一個通用的write-ahead-log (WAL)接收器和一個two-phase-commit(2PC)接收器。WAL sink將所有結果記錄寫入應用程式狀態,並在接收到完成檢查點的通知後將它們傳送到sink系統。由於接收器緩衝記錄在狀態後端儲存中,所以WAL接收器可以用於任何型別的接收器系統。然而,它並不是精確地提供精確一次保證的銀彈,增加應用程式的狀態大小,接收系統必須處理一個峰值的寫入模式。

相反,2PC sink需要一個接收器系統,該系統提供事務支援或公開構建塊以模擬事務。對於每個檢查點,接收器啟動一個事務並將所有接收到的記錄附加到事務中,將它們寫入接收器系統而不提交它們。當它收到一個檢查點完成的通知時,它提交事務並實現結果持久化。該機制依賴於sink從在完成檢查點之前開啟的故障中恢復後,提交事務的能力。

2PC協議利用了Flink現有的檢查點機制。檢查點barriers是啟動新事務的通知,所有操作符關於其單個檢查點成功的通知是提交投票,而通知檢查點成功的JobManager訊息是提交事務的指令。與WAL sink相比,2PC sink可以根據sink系統和sink的實現實現精確的一次輸出。此外,一個2PC sink不斷寫記錄到sink系統,不會出現WAL sink那種峰值的寫入模式。

表8-1顯示了在最佳情況下可以實現的不同型別的source和sink聯結器的端到端一致性保證;根據sink的實現,實際的一致性可能會較差。

Nonresettable源Resettable源
任意sink 最多一次 至少一次
冪等性sink 最多一次 精確一次(恢復期間暫時不一致)
WAL sink 最多一次 至少一次
2PC sink 最多一次 精確一次

內建聯結器

Apache Flink提供聯結器,用於從各種儲存系統讀取資料並將資料寫入各種儲存系統。訊息佇列和事件日誌(如Apache Kafka、Kinesis或RabbitMQ)是讀取資料流的常見源。在批處理為主的環境中,資料流也常常通過監視檔案系統目錄並在檔案出現時讀取它們來接收資料。

在sink端,資料流往往產生到訊息佇列,用於後續的事件流處理應用,寫入檔案系統歸檔或使資料可用於離線分析或批處理應用程式,或插入鍵值儲存或關係資料庫系統,如Cassandra,ElasticSearch,或MySQL,是資料可搜尋和可查詢,或服務於指示面板應用程式。

不幸的是,除了用於關係DBMS的JDBC外,大多數這些儲存系統都沒有標準介面。相反,每個系統都有自己的帶有專用協議的聯結器類庫。因此,像Flink這樣的處理系統需要維護幾個專用的聯結器,以便能夠從最常用的訊息佇列、事件日誌、檔案系統、鍵值儲存和資料庫系統中讀取事件並將事件寫入其中。

Flink為Apache Kafka、Kinesis、RabbitMQ、Apache Nifi、各種檔案系統、Cassandra、ElasticSearch和JDBC提供聯結器。此外,Apache Bahir專案還為ActiveMQ、Akka、Flume、Netty和Redis提供了額外的Flink聯結器。

為了在應用程式中使用提供的聯結器,需要將其依賴項新增到專案的構建檔案中。我們在“引入外部和Flink依賴項”章節中解釋瞭如何新增聯結器依賴項。

在下一節中,我們將討論Apache Kafka、基於檔案的源和sink以及Apache Cassandra的聯結器。這些是最廣泛使用的聯結器,它們也代表了源和sink系統的重要型別。您可以在Apache Flink或Apache Bahir的文件中找到關於其他聯結器的更多資訊。

Apache Kafka源聯結器

Apache Kafka是一個分散式流平臺。它的核心是一個分散式釋出-訂閱訊息傳遞系統,廣泛用於接收和分發事件流。在深入研究Flink的Kafka聯結器之前,我們簡要地解釋一下Kafka的主要概念。

Kafka將事件流組織為所謂的主題(Topic)。主題是一個事件日誌,它保證事件按寫入的順序讀取。為了擴充套件主題的寫和讀,可以將主題劃分為分佈在叢集中的分割槽。順序保證僅限於分割槽—kafka在從不同分割槽讀取時不提供順序保證。Kafka分割槽中的讀取位置稱為偏移量。

Flink為所有常見的Kafka版本提供源聯結器。通過Kafka 0.11,客戶端庫的API得到了改進,並添加了新的特性。例如,Kafka 0.10增加了對記錄時間戳的支援。自發布1.0以來,API一直保持穩定。Flink提供了一個通用的Kafka聯結器,適用於0.11以後的所有Kafka版本。Flink還為Kafka的0.8、0.9、0.10和0.11版本提供了特定於版本的聯結器。對於本節的其餘部分,我們將重點討論通用聯結器,而對於特定於版本的聯結器,我們建議您參考Flink的文件。

通用 Flink Kafka聯結器的依賴項新增到Maven專案中,如下圖所示:

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.12</artifactId>
<version>1.7.1</version>
</dependency>

Flink Kafka聯結器並行地接收事件流。每個並行源任務可以從一個或多個分割槽讀取資料。任務跟蹤每個分割槽的當前讀取偏移量,並將其包含到檢查點資料中。從失敗中恢復時,將恢復偏移量,並且源例項將繼續從檢查點偏移量讀取資料。Flink Kafka聯結器不依賴Kafka自己的偏移跟蹤機制,該機制基於所謂的消費者組。圖8-1顯示了對源例項的分割槽分配。

圖8-1 讀取Kafka Topic分割槽的偏移量

建立一個Kafka源聯結器,如示例8-1所示。

valproperties=newProperties()
properties.setProperty("bootstrap.servers","localhost:9092")
properties.setProperty("group.id","test")
valstream:DataStream[String]=env.addSource(
newFlinkKafkaConsumer[String]("topic",newSimpleStringSchema(),properties))

建構函式有三個引數。第一個引數定義要讀取的主題。可以是單個主題、主題列表,也可以是匹配所有要讀取的主題的正則表示式。當從多個主題讀取時,Kafka聯結器將所有主題的所有分割槽都視為相同的,並將它們的事件多路複用到單個流中。

第二個引數是DeserializationSchema 或 KeyedDeserializationSchema。Kafka訊息儲存為原始位元組訊息,需要反序列化為Java或Scala物件。在例8-1中使用的SimpleStringSchema是一個內建的DeserializationSchema,它只是將位元組陣列反序列化為字串。此外,Flink還為Apache Avro和基於文字的JSON編碼提供了實現。

DeserializationSchema 和KeyedDeserializationSchema是公共介面,因此您可以始終實現自定義的反序列化邏輯。

第三個引數是一個Properties物件,它配置用於連線和讀取Kafka的Kafka客戶端。一個最小的屬性配置包含兩個屬性,"bootstrap.servers" 和"group.id"。有關其他配置屬性,請參閱Kafka文件。為了獲取事件時間時間戳並生成水印,可以通過呼叫FlinkKafkaConsumer.assignTimestampsAndWatermark()向Kafka 消費者提供一個AssignerWithPeriodicWatermark o或an AssignerWithPunctuatedWatermark。將分配程式應用於每個分割槽,以利用每個分割槽的排序保證,並且源例項根據水印傳播協議合併分割槽水印(請參閱“水印傳播和事件時間”)。

                     注意
請注意,如果一個分割槽處於不活動狀態(不提供訊息),則源例項的水印將不起作用。因此,一個不活動的分割槽會導致整個應用程式停頓,因為應用程式的水印不可用。

從0.10.0版本開始,Kafka支援訊息時間戳。當從Kafka版本0.10或更高版本讀取訊息時,如果應用程式以事件時間模式執行,消費者將自動提取訊息時間戳作為事件時間戳。在這種情況下,您仍然需要生成水印,並且應該應用AssignerWithPeriodicWatermark 或 AssignerWithPunctuatedWatermark來轉發之前分配的Kafka時間戳。

還有一些需要注意的配置選項。如可以配置最初讀取Topic分割槽的起始位置。有效的選項是:

  • 對於一個消費者組而言,kafka通過group.id知道最後一次的消費位置,這也是預設配置:

FlinkKafkaConsumer.setStartFromGroupOffsets()
  • 從每個分割槽的最開始位置消費:

FlinkKafkaConsumer.setStartFromEarliest()
  • 從每個分割槽的最新位置消費:

FlinkKafkaConsumer.setStartFromLatest()
  • 消費大於指定時間戳的所有記錄(Kafka版本0.10或更高版本):

FlinkKafkaConsumer.setStartFromTimestamp(long)
  • 使用map物件,指定每個分割槽的消費起始位置:

FlinkKafkaConsumer.setStartFromSpecificOffsets(Map)
                     注意
注意,這種配置隻影響第一次讀位置。在進行恢復或從儲存點開始時,應用程式將從儲存在檢查點或儲存點中的偏移量開始讀取。

可以將Flink Kafka消費者配置為自動發現與正則表示式匹配的新Topic或新增到Topic中的新分割槽。這些特性在預設情況下是禁用的,可以通過向Properties物件新增具有非負值的引數flink.partitiondiscovery.interval-millis來啟用。

Apache Kafka sink聯結器

Flink為0.8以後的所有Kafka版本提供sink聯結器。從Kafka 0.11,客戶端的API得到了改進,並添加了新的特性,比如Kafka 0.10支援記錄時間戳,Kafka 0.11支援事務性寫。自發布1.0以來,API一直保持穩定。Flink提供了一個通用的Kafka聯結器,適用於0.11以後的所有Kafka版本。Flink還提供了針對Kafka 0.8、0.9、0.10和0.11版本的特定於版本的聯結器。對於本節的其餘部分,我們將重點介紹通用聯結器,並向您介紹Flink的文件,以獲得特定於版本的聯結器。Flink的通用Kafka聯結器的依賴項被新增到Maven專案中,如下圖所示:

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.12</artifactId>
<version>1.7.1</version>
</dependency>

將Kafka sink新增到DataStream應用程式中,如例8-2所示

valstream:DataStream[String]=...
valmyProducer=newFlinkKafkaProducer[String](
"localhost:9092",// broker list
"topic",// target topic
newSimpleStringSchema)// serialization schema
stream.addSink(myProducer)

例8-2中使用的建構函式接收三個引數。第一個引數是一個逗號分隔的Kafka broker地址字串。第二個是寫入資料的topic的名稱,最後一個是SerializationSchema,它將sink的輸入型別(例8-2中的字串)轉換為位元組陣列。SerializationSchema是我們在Kafka源聯結器部分討論的DeserializationSchema的對應版本。

FlinkKafkaProducer提供了更多具有不同引數組合的建構函式,如下:

與Kafka源聯結器類似,可以為Kafka客戶端傳遞一個Properties物件來提供定製選項。在使用Properties時,必須將brokers列表作為“bootstrap.servers”屬性。檢視Kafka文件以獲得完整的引數列表。

您可以指定一個FlinkKafkaPartitioner來控制記錄如何對映到Kafka分割槽。我們將在本節後面更深入地討論這個特性。

您還可以指定KeyedSerializationSchema,而不是使用SerializationSchema將記錄轉換為位元組陣列,KeyedSerializationSchema將記錄序列化為兩個位元組陣列—一個用於鍵,另一個用於Kafka訊息的值。此外,KeyedSerializationSchema還公開了更多的功能,比如覆蓋目標主題以寫入多個主題。

kafka sink 至少一次的保證

Flink的Kafka sink提供的一致性保證取決於它的配置。Kafka sink在以下條件下提供至少一次的保證:

  • 啟用了Flink的檢查點,應用程式的所有sources都可以重新設定。

  • 如果寫操作不成功,sink聯結器將丟擲異常,導致應用程式失敗並恢復。這是預設的行為。通過將retries屬性設定為大於0的值(預設值),可以將Kafka客戶端配置為在寫操作失敗之前進行重試。您還可以通過在接收器物件上呼叫setLogFailuresOnly(true)來將接收器配置為只記錄寫故障。注意,這將使應用程式的輸出保證無效。

  • sink聯結器等待Kafka在完成其檢查點之前確認輸出記錄。這是預設的行為。通過呼叫sink物件上的setFlushOnCheckpoint(false),可以禁用這種等待。但是,這也將禁用任何輸出保證。

kafka sink 精確一次的保證

Kafka 0.11引入了對事務性寫的支援。由於這個特性,Flink的Kafka接收器也能夠提供精確的一次輸出保證,只要接收器和Kafka配置正確。同樣,Flink應用程式必須啟用檢查點並從可重置的源消費。此外,FlinkKafkaProducer提供了一個語義引數的建構函式,該引數控制接收器提供的一致性保證。可能的一致性值為:

  • Semantic.NONE,它不提供任何保證——記錄可能丟失或多次寫入。

  • Semantic.AT_LEAST_ONCE,它保證沒有寫操作丟失,但是可能會重複。這是預設設定。

  • Semantic.EXACTLY_ONCE,它構建在Kafka的事務上,將每個記錄精確地寫入一次。

當使用Kafka接收器以精確一次模式執行Flink應用程式時,需要考慮一些事情,這有助於大致瞭解Kafka如何處理事務。簡而言之,Kafka的事務將所有訊息新增到分割槽日誌中,並將開啟的事務標記為未提交。一旦事務被提交,標記就被更改為已提交。從topic讀取資料的消費者可以配置一個隔離級別(通過isolation.level屬性)。宣告它是否可以讀取未提交的訊息(預設的read_uncommitted)。如果消費者被配置為read_committed,那麼一旦它遇到未提交的訊息,它就停止從一個分割槽消費,並在提交訊息時繼續使用。因此,開啟的事務可能會阻止消費者讀取分割槽訊息並帶來顯著的延遲。Kafka通過在超時間隔後拒絕和關閉事務來防止這種情況,超時間隔使用transaction.timeout.ms的屬性進行配置。

在Flink的Kafka sink上下文中,這很重要,因為由於恢復週期過長而超時的事務會導致資料丟失。因此,正確配置事務超時屬性非常重要。預設情況下,Flink Kafka sink設定transaction.timeout.ms為一小時。這意味著您可能需要調整kafka本身的transaction.max.timeout.ms屬性,預設設定為15分鐘。此外,提交訊息的可見性取決於Flink應用程式的檢查點間隔。請參閱Flink文件,瞭解在啟用精確一次一致性時的其他一些情況。

                  檢查kafka叢集的配置
Kafka叢集的預設配置仍然會導致資料丟失,即使在確認寫操作之後也是如此。您應該仔細修改Kafka設定的配置,特別注意以下引數:
ack
log.flush.interval.messages
log.flush.interval.ms
log.flush。*
我們建議您參考Kafka文件,以獲得關於它的配置引數的詳細資訊和適用配置的指導原則。

自定義分割槽和寫入訊息時間戳

當將訊息寫入Kafka topic時,Flink Kafka sink任務可以選擇寫入topic的哪個分割槽。FlinkKafkaPartitioner可以在Flink Kafka sink的一些建構函式中定義。如果沒有指定,預設的分割槽程式將每個sink任務對映到一個Kafka分割槽—----由同一個sink任務發出的所有記錄都被寫到同一個分割槽,如果任務多於分割槽,單個分割槽可能包含多個sink任務的記錄。如果分割槽的數量大於子任務的數量,則預設配置將導致空分割槽,事件時間模式下的Flink應用程式,消費此時的topic是,可能會出現問題。

通過提供一個自定義FlinkKafkaPartitioner,您可以控制如何將記錄路由到topic分割槽。例如,可以根據記錄的key屬性建立分割槽程式,或者建立迴圈分割槽程式以實現均勻分佈。還可以根據訊息key將分割槽委託給Kafka。這需要一個KeyedSerializationSchema來提取訊息key,並使用null配置FlinkKafkaPartitioner引數來禁用預設分割槽程式。

最後,可以將Flink的Kafka sink配置為寫入訊息時間戳,這是Kafka 0.10所支援的。通過在sink物件上呼叫setWriteTimestampToKafka(true),可以將記錄的事件時間戳寫入Kafka。

Filesystem源聯結器

Filesystems(檔案系統)通常用於以一種經濟有效的方式儲存大量資料。在大資料架構中,它們通常充當批處理應用程式的資料來源和資料接收器。與高階檔案格式(如Apache Parquet或Apache ORC)相結合,檔案系統可以有效地為分析查詢引擎(如Apache Hive、Apache Impala或Presto)提供服務。因此,檔案系統通常用於“連線”流和批處理應用程式。

Apache Flink提供了一個可重置的源聯結器,可以將檔案中的資料作為流獲取。檔案系統源是flinkstreaming- java模組的一部分。因此,您不需要新增任何其他依賴項來使用此功能。Flink支援不同型別的檔案系統,比如本地檔案系統(包括本地裝載的NFS或SAN共享、Hadoop HDFS、Amazon S3和OpenStack Swift FS)。請參閱“檔案系統配置”以瞭解如何在Flink中配置檔案系統。示例8-3顯示瞭如何通過按行讀取文字檔案來讀取流。

vallineReader=newTextInputFormat(null)
vallineStream:DataStream[String]=env.readFile[String](
lineReader,// The FileInputFormat
"hdfs:///path/to/my/data",// The path to read
FileProcessingMode.PROCESS_CONTINUOUSLY,// The processing mode
30000L)// The monitoring interval in ms

StreamExecutionEnvironment.readFile()方法的引數為:

  • 負責讀取檔案內容的FileInputFormat。我們將在本節的後面討論這個介面的細節。例8-3中的TextInputFormat的null引數定義了單獨設定的路徑。

  • 應該讀取的路徑。如果路徑引用一個檔案,則讀取單個檔案。如果它引用一個目錄,FileInputFormat將掃描該目錄以查詢要讀取的檔案。

  • 讀取路徑的模式。該模式可以是PROCESS_ONCE 或 PROCESS_CONTINUOUSLY。在PROCESS_ONCE模式中,當作業啟動並讀取所有匹配的檔案時,讀取路徑將被掃描一次。在 PROCESS_CONTINUOUSLY中,將定期掃描路徑(在初始掃描之後),並不斷讀取新的和修改過的檔案。

  • 週期性掃描路徑的毫秒間隔。在PROCESS_ONCE模式中忽略該引數。

FileInputFormat是一種專門用於從檔案系統中讀取檔案的InputFormat。FileInputFormat分兩個步驟讀取檔案。首先,它掃描檔案系統路徑,併為所有匹配的檔案建立所謂的輸入分片。輸入分片定義檔案上的範圍,通常通過起始偏移量和長度定義。在將一個大檔案分成多個分段之後,可以將這些分段分配給多個讀取器任務來並行地讀取檔案。根據檔案的編碼,可能需要只生成一個分割來讀取整個檔案。FileInputFormat的第二步是接收輸入分割,讀取分割定義的檔案範圍,並返回所有相應的記錄。

DataStream應用程式中使用的FileInputFormat還應該實現CheckpointableInputFormat介面,該介面定義了檢查點的方法,並在檔案分割中重置InputFormat的當前讀取位置。如果FileInputFormat沒有實現CheckpointableInputFormat介面,則檔案系統源聯結器僅在啟用檢查點時至少提供一次保證,因為輸入格式將從上次執行完整檢查點時處理的分割開始讀取。

在1.7版本中,Flink提供了一些擴充套件FileInputFormat和實現CheckpointableInputFormat的類。TextInputFormat按行讀取文字檔案(按換行字元分隔),CsvInputFormat的子類按逗號分隔值讀取檔案,AvroInputFormat按avro編碼記錄讀取檔案。

在PROCESS_CONTINUOUSLY模式下,檔案系統(filesystem)源聯結器根據修改時間戳識別新檔案。這意味著如果一個檔案被修改,它將被完全重新處理,因為修改時間戳發生了變化。這包括由於追加內容而引起的修改。因此,持續讀取檔案的一種常見技術是將它們寫入臨時目錄,並在完成後自動將它們移到受監控的目錄中。當一個檔案被完全讀取並且完成了一個檢查點時,就可以從目錄中刪除它。如果您使用最終一致的列表操作(如S3)從檔案儲存中讀取資料,那麼通過跟蹤修改時間戳來監控讀取的檔案也會產生影響。由於檔案可能不會按照修改時間戳的順序出現,檔案系統源聯結器可能會忽略它們。

請注意,在PROCESS_ONCE模式中,在掃描檔案系統路徑並建立所有的分片之後,不會採取任何檢查點。

如果你想使用一個檔案系統源聯結器在事件時間應用程式中,您應該清楚,生成水印會是一個挑戰,因為輸入分片由三個程序產生,然後輪詢分發給並行讀取程序。為了生成令人滿意的水印,您需要對包含在稍後由任務處理的分片中的記錄的最小時間戳進行推斷。

Filesystem Sink 聯結器

將流寫入檔案是一個常見的需求,例如,為離線互動分析準備低延遲的資料。由於大多數應用程式只有在檔案完成寫入,並流應用程式長時間執行之後才能讀取檔案,所以流 sink 聯結器通常會將其輸出分塊到多個檔案中。此外,將記錄組織到所謂的bucket中是很常見的,這樣消費應用程式可以更好地控制讀取哪些資料。

與檔案系統源聯結器一樣,Flink的StreamingFileSink聯結器也包含在flink-streaming-java模組中。因此,不需要向構建檔案新增依賴項。

StreamingFileSink為應用程式提供端到端的精確一次保證,前提是應用程式配置了精確一次檢查點,並且在出現故障時重置所有源。我們將在本節後面更詳細地討論恢復機制。示例8-4展示瞭如何使用最少的配置建立StreamingFileSink並將其附加到流中。

valinput:DataStream[String]=…
valsink:StreamingFileSink[String]=StreamingFileSink.forRowFormat(
newPath("/base/path"),
newSimpleStringEncoder[String]("UTF-8")
).build()

input.addSink(sink)

當StreamingFileSink接收到一條記錄時,將該記錄分配給一個bucket。bucket是基路徑的一個子目錄,在示例8-4中使用StreamingFileSink構建器配置了“/base/path”。

bucket是由BucketAssigner選擇的,它是一個公共介面,併為每個記錄返回一個BucketId,該BucketId確定記錄將被寫入的目錄。可以使用withBucketAssigner()方法在構建器上配置BucketAssigner()。如果沒有顯式指定BucketAssigner,則使用DateTimeBucketAssigner,根據記錄寫入時的處理時間將記錄分配到每小時的bucket。

每個bucket目錄包含多個分片檔案,這些檔案由StreamingFileSink的多個並行例項併發生成。此外,每個並行例項將其輸出分割成多個分片檔案。分片檔案的路徑格式如下:

[base-path]/[bucket-path]/part-[task-idx]-[id]

例如,給定一個“/johndoe/demo”的基路徑和一個part字首“part”,這個路徑“/johndoe/demo/2018-07-22-17/part-4-8”指向由第五個(下標從0開始)sink任務寫入bucket“2018-07-22-17”的8個檔案,即:2018年7月22日下午5點。

                 提交檔案的id可能不是連續的
非連續檔案id(提交檔名稱中的最後一個數字)不表示資料丟失。StreamingFileSink只是增加檔案id。當丟棄掛起的檔案時,它不會重用它們的id。

RollingPolicy確定任務何時建立新分片檔案。可以使用構建器上的withRollingPolicy()方法來配置RollingPolicy。預設情況下,StreamingFileSink使用一個DefaultRollingPolicy,配置為當分片檔案超過128 MB或超過60秒時滾動生成它們。還可以配置一個不活動的時間間隔,在此之後將滾動生成分片檔案。

StreamingFileSink支援將記錄寫入分片檔案的兩種模式:row編碼和buik編碼。在row編碼模式中,每個記錄都單獨編碼並附加到一個分片檔案中。在bulk編碼中,記錄是成批收集和寫入的。Apache Parquet以列格式組織和壓縮記錄,是一種需要bulk編碼的檔案格式。

例8-4通過提供一個將單個記錄寫入分片檔案的Encoder,使用row編碼建立StreamingFileSink。在例8-4中,我們使用了SimpleStringEncoder,它呼叫了記錄的toString()方法,並將記錄的字串表示形式寫入檔案。Encoder是一個簡單的介面,只有一個方法,可以很容易地實現。

例8-5所示,建立了一個bulk編碼的StreamingFileSink。

valinput:DataStream[String]=…
valsink:StreamingFileSink[String]=StreamingFileSink.forBulkFormat(
newPath("/base/path"),
ParquetAvroWriters.forSpecificRecord(classOf[AvroPojo])
).build()

input.addSink(sink)

bulk編碼模式下的StreamingFileSink需要BulkWriter.Factory。在例8-5中,我們對Avro檔案使用了Parquet寫入器。請注意,Parquet寫入器包含在flinkparquet模組中,需要將其作為依賴項新增。像往常一樣,BulkWriter.Factory是一個可以實現自定義檔案格式(如Apache Orc)的介面。

                      請注意
bulk編碼模式下的StreamingFileSink不能選擇RollingPolicy。bulk編碼格式只能與OnCheckpointRollingPolicy相結合,OnCheckpointRollingPolicy在每個檢查點上滾動生成分片檔案。

StreamingFileSink提供了精確的一次輸出保證。StreamingFileSink通過一個提交協議來實現這一點,該協議將檔案通過不同的stages,處理中狀態,掛起狀態和完成狀態進行移動,基於Flink的檢查點機制。當sink寫入檔案時,檔案處於處理中狀態。當RollingPolicy決定滾動檔案時,將關閉該檔案並通過重新命名將其移動到掛起狀態。當下一個檢查點完成時,掛起的檔案將移動到完成狀態(再次通過重新命名)。

                掛起的檔案可能永遠不會被提交
在某些情況下,永遠不會提交掛起檔案。StreamingFileSink確保這不會導致資料丟失。但是,這些檔案不會自動清除。
在手動刪除一個掛起檔案之前,您需要檢查它是在延遲還是即將提交。找到具有相同任務索引和更大ID的提交檔案後,可以安全地刪除掛起檔案。

在失敗的情況下,sink任務需要將當前正在處理的檔案重置為最近一次成功檢查點處的寫偏移量。這是通過關閉當前正在處理的檔案並丟棄檔案末尾的無效部分來實現的,例如,通過使用檔案系統的truncate操作。

                STREAMINGFILESINK需要啟用檢查點
如果應用程式沒有啟用檢查點,那麼StreamingFileSink將永遠不會將檔案從掛起狀態移動到完成狀態。

Apache Cassandra Sink 聯結器

略。

實現自定義源函式

DataStream API提供了兩個介面來實現源聯結器和相應的RichFunction抽象類:

  • SourceFunction和RichSourceFunction可用於定義非並行源連線—與單個任務一起執行的源。

  • ParallelSourceFunction和RichParallelSourceFunction可用於定義執行多個並行任務例項的源聯結器。

除了非並行和並行之外,這兩個介面是相同的。與處理函式的豐富變體一樣,RichSourceFunction和RichParallelSourceFunction的子類可以覆蓋open()和close()方法,並訪問RuntimeContext,其中提供並行任務例項的數量和當前例項的索引。

SourceFunction 和 ParallelSourceFunction定義了兩個方法:

  • void run(SourceContext<T> ctx)

  • void cancel()

run()方法執行讀取或接收記錄並將其放入到Flink應用程式中的實際工作。根據接收資料的系統,可以推或拉資料。run()方法由Flink呼叫一次,並在專用的源執行緒中執行,通常在一個無限迴圈(無限流)中讀取或接收資料併發出記錄。可以在某個時間點顯式地取消任務,或者在有限流的情況下,當輸入被完全消耗時終止任務。

當應用程式被取消和關閉時,Flink呼叫cancel()方法。為了執行適當的關閉,在單獨的執行緒中執行的run()方法應該在呼叫cancel()方法時立即終止。示例8-10顯示了一個簡單的源函式,其計數範圍從0到Long.MaxValue。

classCountSourceextendsSourceFunction[Long] {
varisRunning:Boolean=true
overridedefrun(ctx:SourceFunction.SourceContext[Long])={
varcnt:Long=-1
while(isRunning&&cnt<Long.MaxValue) {
cnt+=1
ctx.collect(cnt)
}
}
overridedefcancel()=isRunning=false
}

重置源函式

在本章的前面,我們解釋了Flink只能為使用源聯結器的應用程式提供令人滿意的一致性保證,這些源聯結器可以重放它們的輸出資料。如果提供資料的外部系統公開API來檢索和重置讀取偏移量,則源函式可以重播其輸出。此類系統的示例包括提供檔案流偏移量的檔案系統和將檔案流移動到特定位置的seek方法,或者Apache Kafka,後者為主題的每個分割槽提供偏移量,可以設定分割槽的讀取位置。一個反例是一個從網路套接字讀取資料的源聯結器,它會立即丟棄傳輸的資料。

支援輸出回放的源函式需要與Flink的檢查點機制整合,並且必須在採取檢查點時持久化當前的所有讀取位置。當應用程式從儲存點啟動或從故障中恢復時,將從最新的檢查點或儲存點檢索讀取偏移量。如果應用程式在沒有現有狀態的情況下啟動,則必須將讀取偏移量設定為預設值。復位源函式需要實現CheckpointedFunction介面,儲存讀取偏移和所有相關的元資料資訊,如檔案路徑或分割槽ID,在操作符列表狀態或操作符 union list狀態,取決於offsets應該如何分佈給並行的task例項。有關操作符列表狀態和union list狀態的分發行為的詳細資訊,請參閱“縮放有狀態操作符(Scaling Stateful Operators)”章節。

此外,確保在單獨的執行緒中執行的SourceFunction.run()方法不會提前讀取偏移量並在採取檢查點時發出資料,這一點非常重要;換句話說,當呼叫CheckpointedFunction.snapshotState()方法時。這是通過保護run()中的程式碼來實現的,run()將讀取位置提前,並在一個塊中發出記錄,該塊在一個鎖物件上進行同步,該物件是從SourceContext.getCheckpointLock()方法獲得的。例8-11使例8-10的CountSource可重新設定。

classResettableCountSourceextendsSourceFunction[Long]withCheckpointedFunction{
varisRunning:Boolean=true
varcnt:Long=_
varoffsetState:ListState[Long]=_
overridedefrun(ctx:SourceFunction.SourceContext[Long])={
while(isRunning&&cnt<Long.MaxValue) {
// synchronize data emission and checkpoints
ctx.getCheckpointLock.synchronized{
cnt+=1
ctx.collect(cnt)
}
}
}
overridedefcancel()=isRunning=false
overridedefsnapshotState(snapshotCtx:FunctionSnapshotContext):Unit={
// remove previous cnt
offsetState.clear()
// add current cnt
offsetState.add(cnt)
}
overridedefinitializeState(initCtx:FunctionInitializationContext):Unit={
valdesc=newListStateDescriptor[Long]("offset",classOf[Long])
offsetState=initCtx.getOperatorStateStore.getListState(desc)
// initialize cnt variable
valit=offsetState.get()
cnt=if(null==it||!it.iterator().hasNext) {
-1L
}else{
it.iterator().next()
}
}
}

源函式、時間戳和水印

源函式的另一個重要方面是時間戳和水印。正如在“事件時間處理”和“分配時間戳和生成水印”中指出的,DataStream API提供了兩個選項來分配時間戳和生成水印。時間戳和水印可以由專用的TimestampAssigner分配和生成(詳細資訊請參閱“分配時間戳和生成水印”),也可以由源函式分配和生成。

源函式分配時間戳並通過其SourceContext物件發出水印。SourceContext提供了以下方法:

  • def collectWithTimestamp(T record, longtimestamp): Unit

  • def emitWatermark(Watermark watermark):Unit

collectWithTimestamp()輸出帶有相關時間戳的記錄,emitWatermark()輸出提供的水印。

如果源函式的一個並行例項使用來自多個流分割槽(例如Kafka主題的分割槽)的記錄,那麼除了不需要額外的操作符外,在源函式中分配時間戳和生成水印也是有益的。通常,外部系統(如Kafka)只保證流分割槽中的訊息順序。給定一個並行度為2的源函式操作符,它用6個分割槽從一個Kafka主題讀取資料,源函式的每個並行例項將從3個Kafka主題分割槽讀取記錄。因此,源函式的每個例項多路複用三個流分割槽的記錄來輸出它們。多路複用記錄很可能會在事件時間戳方面引入額外的無序性,這樣下游時間戳分配者可能會產生比預期更多的延遲記錄。

為了避免這種行為,源函式可以獨立地為每個流分割槽生成水印,並且始終將其分割槽的最小水印作為水印。通過這種方式,它可以確保利用每個分割槽上的順序保證,並且不會輸出不必要的延遲記錄。

源函式必須處理的另一個問題是例項變得空閒並且不再輸出任何資料。這是非常有可能產生問題的,因為它可能會阻止整個應用程式前進其水印,從而導致一個停滯的應用程式。由於水印應該是資料驅動的,所以如果沒有接收到輸入記錄,水印生成器(整合在源函式或時間戳分配程式中)將不會發出新的水印。如果您檢視一下Flink是如何傳播和更新水印的(請參閱“水印傳播和事件時間”),您就會發現,如果應用程式涉及到一個shuffle操作(keyBy()、rebalance()等),那麼一個不提前使用水印的操作符就可以停止應用程式的所有水印。

Flink提供了一種機制,通過將源函式標記為臨時空閒來避免這種情況。當處於空閒狀態時,Flink的水印傳播機制將忽略空閒流分割槽。一旦源再次開始發出記錄,它就會被自動設定為活動的。源函式可以通過呼叫SourceContext.markAsTemporarilyIdle()方法來決定何時將自己標記為空閒。

實現自定義接收器函式

在Flink的DataStream API中,任何操作符或函式都可以將資料傳送到外部系統或應用程式。資料流最終不必流到接收器操作符中。例如,您可以實現一個FlatMapFunction,它通過HTTP POST呼叫而不是通過它的收集器來輸出每個傳入的記錄。儘管如此,DataStream API提供了一個專用的SinkFunction介面和一個相應的RichSinkFunction抽象類。SinkFunction介面提供了一個單一的方法:

voidinvoke(INvalue,Contextctx)

SinkFunction的上下文物件提供了對當前處理時間、當前水印以及記錄的時間戳的訪問。

例8-12顯示了一個簡單的SinkFunction,它將感測器讀數寫入socket。注意,在啟動程式之前,您需要啟動一個監聽socket的程序。否則,由於無法開啟到socket的連線,程式會因ConnectException異常而失敗。在Linux上執行命令nc -l localhost 9191來監聽localhost:9191。

valreadings:DataStream[SensorReading]=???
// write the sensor readings to a socket
readings.addSink(newSimpleSocketSink("localhost",9191))
// set parallelism to 1 because only one thread can write to a socket
.setParallelism(1)
// -----
classSimpleSocketSink(valhost:String,valport:Int)
extendsRichSinkFunction[SensorReading] {
varsocket:Socket=_
varwriter:PrintStream=_
overridedefopen(config:Configuration):Unit={
// open socket and writer
socket=newSocket(InetAddress.getByName(host),port)
writer=newPrintStream(socket.getOutputStream)
}
overridedefinvoke(
value:SensorReading,
ctx:SinkFunction.Context[_]):Unit={
// write sensor reading to socket
writer.println(value.toString)
writer.flush()
}
overridedefclose():Unit={
// close writer and socket
writer.close()
socket.close()
}
}

如前所述,應用程式的端到端精確一致性保證取決於其sink聯結器的屬性。為了實現端到端的精確一次語義,應用程式需要冪等或事務接收聯結器。例8-12中的SinkFunction既不執行冪等寫,也不提供事務性寫。由於套接字的僅提供追加特性,因此無法執行冪等寫操作。由於套接字沒有內建的事務支援,所以只能使用Flink的通用WAL sink完成事務寫。在接下來的部分中,您將瞭解如何實現冪等或事務接收聯結器。

冪等sink聯結器

對於許多應用程式,SinkFunction介面足以實現冪等接收器聯結器。這是可能的,如果滿足以下兩點:

  1. 結果資料具有一個確定性(複合)key,可以對其執行冪等更新。對於計算每個感測器和分鐘的平均溫度的應用程式,確定性key可以是感測器的ID和每分鐘的時間戳。確定性key對於確保在發生恢復時正確地覆蓋所有寫操作非常重要。

  2. 外部系統支援每個鍵的更新,比如關係資料庫系統或鍵值儲存。

示例8-13說明了如何實現和使用向JDBC資料庫(在本例中是嵌入式Apache Derby資料庫)寫入的冪等SinkFunction。

valreadings:DataStream[SensorReading]=???
// write the sensor readings to a Derby table
readings.addSink(newDerbyUpsertSink)
// -----
classDerbyUpsertSinkextendsRichSinkFunction[SensorReading] {
varconn:Connection=_
varinsertStmt:PreparedStatement=_
varupdateStmt:PreparedStatement=_
overridedefopen(parameters:Configuration):Unit={
// connect to embedded in-memory Derby
conn=DriverManager.getConnection("jdbc:derby:memory:flinkExample",newProperties())
// prepare insert and update statements
insertStmt=conn.prepareStatement("INSERT INTO Temperatures (sensor, temp) VALUES (?, ?)")
updateStmt=conn.prepareStatement("UPDATE Temperatures SET temp = ? WHERE sensor = ?")
}
overridedefinvoke(r:SensorReading,context:Context[_]):Unit={
// set parameters for update statement and execute it
updateStmt.setDouble(1,r.temperature)
updateStmt.setString(2,r.id)
updateStmt.execute()
// execute insert statement if update statement did not update any row
if(updateStmt.getUpdateCount==0) {
// set parameters for insert statement
insertStmt.setString(1,r.id)
insertStmt.setDouble(2,r.temperature)
// execute insert statement
insertStmt.execute()
}
}
overridedefclose():Unit={
insertStmt.close()
updateStmt.close()
conn.close()
}
}

由於Apache Derby不提供內建的UPSERT語句,因此示例接收器首先嚐試更新一行並插入新行(如果不存在具有給定鍵的行),從而執行UPSERT寫操作。當未啟用WAL時,Cassandra sink聯結器遵循相同的方法。

事務性sink聯結器

當冪等接收器聯結器不適合時,無論是應用程式輸出的特性、所需接收器系統的屬性,還是由於更嚴格的一致性要求,事務性接收器聯結器都可以作為替代。如前所述,事務接收聯結器需要與Flink的檢查點機制整合,因為它們可能只在檢查點成功完成時才向外部系統提交資料。

為了簡化事務接收的實現,Flink的DataStream API提供了兩個模板,可以擴充套件它們來實現自定義接收操作符。兩個模板都實現了CheckpointListener介面來接收來自JobManager關於完成的檢查點的通知(有關介面的詳細資訊,請參閱“接收關於完成的檢查點的通知”):

  • GenericWriteAheadSink模板收集每個檢查點的所有輸出記錄,並將它們儲存在sink任務的操作符狀態。在失敗的情況下,狀態被檢查並恢復。當任務收到檢查點完成通知時,它將完成的檢查點的記錄寫入外部系統。帶有WAL- enabled的Cassandra sink聯結器實現了這個介面。

  • TwoPhaseCommitSinkFunction模板利用了外部接收器系統的事務特性。對於每個檢查點,它啟動一個新事務,並在當前事務的上下文中將所有後續記錄寫入sink系統。接收器在接收到相應檢查點的完成通知時提交事務。

在下面,我們將描述介面及其一致性保證。

GENERICWRITEAHEADSINK

GenericWriteAheadSink通過改進一致性屬性簡化了sink操作符的實現。該操作符與Flink的檢查點機制整合,目標是將每條記錄精確地寫入外部系統一次。但是,您應該知道存在這樣的失敗場景,即提前寫日誌接收器輸出的記錄不止一次。因此,一個GenericWriteAheadSink不能提供精確一次保證,只能提供至少一次的保證。我們將在本節後面更詳細地討論這些場景。

GenericWriteAheadSink的工作方式是將所有接收到的記錄附加到一個寫前日誌中,這個日誌由檢查點分割。每次sink操作符接收到一個檢查點barrier時,它都會啟動一個新分片,並將所有後續記錄附加到新分片中。WAL被儲存並作為操作符狀態進行檢查。由於日誌將被恢復,所以在失敗的情況下不會丟失任何記錄。

當GenericWriteAheadSink接收到關於完成的檢查點的通知時,它會輸出儲存在對應於成功的檢查點的segment中的所有記錄。根據sink操作符的具體實現,可以將記錄寫入任何型別的儲存或訊息系統。當所有記錄都成功輸出後,必須在內部提交相應的檢查點。

檢查點通過兩個步驟提交。首先,接收器持續儲存提交的檢查點資訊,然後從WAL中刪除記錄。無法將提交資訊儲存在Flink的應用程式狀態中,因為它不是永續性的,並且在出現故障時將被重置。相反,GenericWriteAheadSink依賴於一個名為CheckpointCommitter的可插入元件來儲存和查詢關於外部持久儲存中提交的檢查點的資訊。例如,Cassandra sink聯結器預設使用一個向Cassandra寫入的CheckpointCommitter。

由於GenericWriteAheadSink的內建邏輯,實現一個利用WAL的sink並不困難。擴充套件GenericWriteAheadSink的操作符需要提供三個建構函式引數:

  • 一個CheckpointCommitter,見前面章節介紹。

  • 一個TypeSerializer用於序列化輸入記錄。

  • 傳遞給CheckpointCommitter的作業ID,以標識跨應用程式重新啟動的提交資訊

此外,write-ahead運算子需要實現一個單一的方法:

booleansendValues(Iterable<IN>values,longchkpntId,longtimestamp)

GenericWriteAheadSink呼叫sendValues()方法將完成的檢查點的記錄寫入外部儲存系統。該方法接收一個Iterable(包括檢查點的所有記錄)、一個檢查點的ID和一個生成檢查點的時間戳。如果所有寫操作都成功,則該方法必須返回true;如果寫操作失敗,則返回false。

示例8-14展示了一個寫到標準輸出的WriteAhead sink實現。它使用FileCheckpointCommitter,我們在這裡不討論它。您可以在包含該書示例的程式碼倉庫中查詢它的實現。

注意
GenericWriteAheadSink不實現SinkFunction介面。因此,不能使用DataStream.addSink()新增擴充套件GenericWriteAheadSink的sink,而是使用DataStream.transform()方法附加它。
valreadings:DataStream[SensorReading]=???
// write the sensor readings to the standard out via a writeahead log
readings.transform("WriteAheadSink",newSocketWriteAheadSink)
// -----
classStdOutWriteAheadSinkextendsGenericWriteAheadSink[SensorReading](
// CheckpointCommitter that commits checkpoints to the local filesystem
newFileCheckpointCommitter(System.getProperty("java.io.tmpdir")),
// Serializer for records
createTypeInformation[SensorReading].createSerializer(newExecutionConfig),
// Random JobID used by the CheckpointCommitter
UUID.randomUUID.toString) {
overridedefsendValues(
readings:Iterable[SensorReading],
checkpointId:Long,
timestamp:Long):Boolean={
for(r<-readings.asScala) {
// write record to standard out
println(r)
}
true
}
}

示例程式碼庫包含一個應用程式,該應用程式在發生故障時定期進行故障恢復,以演示StdOutWriteAheadSink和一個常規的DataStream.print() sink的行為。

如前所述,GenericWriteAheadSink不能提供精確一次保證。有兩種失敗情況會導致記錄被輸出不止一次:

  • 當任務執行sendValues()方法時,程式失敗。如果外部sink系統不能自動地寫入多個記錄(要麼全部寫入,要麼沒有寫入),那麼可能已經寫入了部分記錄。由於檢查點尚未提交,所以在恢復期間sink將再次寫入所有記錄。

  • 所有記錄都正確寫入,sendValues()方法返回true;但是,在呼叫CheckpointCommitter或CheckpointCommitter未能提交檢查點之前,程式會失敗。在恢復期間,所有尚未提交的檢查點記錄將被重新寫入。

TWOPHASECOMMITSINKFUNCTION

Flink提供了TwoPhaseCommitSinkFunction介面,以簡化sink函式的實現,這些sink函式提供端到端的精確一次保證。但是,2PC sink函式是否提供這種保證取決於實現細節。我們從一個問題開始討論這個介面:“2PC協議是不是太昂貴?”

通常,2PC是確保分散式系統一致性的昂貴方法。但是,在Flink上下文中,協議對於每個檢查點只執行一次。此外,TwoPhaseCommitSinkFunction協議利用了Flink的常規檢查點機制,因此增加的開銷很小。TwoPhaseCommitSinkFunction的工作原理與WAL sink非常相似,但它不會收集Flink應用狀態下的記錄;相反,它將它們以開放事務的形式寫入外部接收器系統。

TwoPhaseCommitSinkFunction實現以下協議。在sink任務發出第一個記錄之前,它在外部sink系統上啟動一個事務。所有隨後收到的記錄都是在事務的上下文中寫入的。當JobManager啟動一個檢查點並在應用程式的源中注入barriers時,2PC協議的投票階段就開始了。當操作符接收到barrier時,它會checkpoint狀態,並在完成之後向JobManager傳送確認訊息。當sink任務接收到barrier時,它將持久化其狀態,準備提交當前事務,並在JobManager上確認檢查點。JobManager的確認訊息類似於2PC協議的提交投票。sink任務必須尚未提交事務,因為不能保證作業的所有任務都將完成它們的檢查點。sink任務還為在下一個檢查點barrier之前到達的所有記錄啟動一個新事務。

當JobManager從所有任務例項接收到成功的檢查點通知時,它將檢查點完成通知傳送給所有感興趣的任務。此通知對應於2PC協議的提交命令。當接收任務接收到通知時,它提交以前檢查點的所有開啟的事務。sink任務一旦確認其檢查點,就必須能夠提交相應的事務,即使在出現故障的情況下也是如此。如果不能提交事務,接收器將丟失資料。當所有sink任務提交它們的事務時,2PC協議的迭代就成功了。

我們來總結一下外部sink系統的要求:

  • 外部sink系統必須提供事務支援,或者sink必須能夠模擬外部系統上的事務。因此,sink應該能夠向sink系統寫入資料,但是寫入的資料在提交之前不能對外公開。

  • 在檢查點間隔期間,事務必須開啟並接受寫操作。

  • 事務必須等到接收到檢查點完成通知時,再提交。在恢復週期的情況下,這可能需要一些時間。如果sink系統關閉事務(例如,一個超時),未提交的資料將丟失。

  • 處理一旦失敗,sink必須能夠恢復事務。一些sink系統提供一個事務ID可用於提交或中止一個開啟的事務。

  • 提交一個事務必須是一個冪等操作,sink或外部系統應該能夠做到:一個事務已經提交或重複提交,沒有影響。

通過一個具體的例子,可以更容易地理解sink系統的協議和需求。例8-15顯示了一個TwoPhaseCommitSinkFunction,它只向檔案系統寫一次(精確一次)。實際上,這是前面討論的BucketingFileSink的簡化版本。

classTransactionalFileSink(valtargetPath:String,valtempPath:String)
extendsTwoPhaseCommitSinkFunction[(String,Double),String,Void](
createTypeInformation[String].createSerializer(newExecutionConfig),
createTypeInformation[Void].createSerializer(newExecutionConfig)) {
vartransactionWriter:BufferedWriter=_
// Creates a temporary file for a transaction into which the records are written.
overridedefbeginTransaction():String={
// path of transaction file is built from current time and task index
valtimeNow=LocalDateTime.now(ZoneId.of("UTC")).format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)
valtaskIdx=this.getRuntimeContext.getIndexOfThisSubtask
valtransactionFile=s"$timeNow-$taskIdx"
// create transaction file and writer
valtFilePath=Paths.get(s"$tempPath/$transactionFile")
Files.createFile(tFilePath)
this.transactionWriter=Files.newBufferedWriter(tFilePath)
println(s"Creating Transaction File: $tFilePath")
// name of transaction file is returned to later identify the transaction
transactionFile
}
/** Write record into the current transaction file. */
overridedefinvoke(
transaction:String,
value: (String,Double),
context:Context[_]):Unit={
transactionWriter.write(value.toString)
transactionWriter.write('\n')
}
/** Flush and close the current transaction file. */
overridedefpreCommit(transaction:String):Unit={
transactionWriter.flush()
transactionWriter.close()
}
/** Commit a transaction by moving the precommitted transaction file
* to the target directory.
*/
overridedefcommit(transaction:String):Unit={
valtFilePath=Paths.get(s"$tempPath/$transaction")
// check if the file exists to ensure that the commit is idempotent
if(Files.exists(tFilePath)) {
valcFilePath=Paths.get(s"$targetPath/$transaction")
Files.move(tFilePath,cFilePath)
}
}
/** Aborts a transaction by deleting the transaction file. */
overridedefabort(transaction:String):Unit={
valtFilePath=Paths.get(s"$tempPath/$transaction")
if(Files.exists(tFilePath)) {
Files.delete(tFilePath)
}
}
}

TwoPhaseCommitSinkFunction[IN, TXN, CONTEXT]有三個型別引數:

  • IN指定輸入記錄的型別。在例8-15中,這是一個帶有String和Double的Tuple2。

  • TXN定義了一個事務識別符號,可用於在失敗後識別和恢復事務。在例8-15中,這是一個包含事務檔名稱的字串。

  • CONTEXT定義了一個可選的自定義上下文。例8-15中的TransactionalFileSink不需要上下文,因此將型別設定為Void。

TwoPhaseCommitSinkFunction的建構函式需要兩個TypeSerializer----—一個用於TXN型別,另一個用於CONTEXT型別。

最後,TwoPhaseCommitSinkFunction定義了五個需要實現的功能:

  • beginTransaction(): TXN啟動一個新的事務並返回事務識別符號。例8-15中的TransactionalFileSink建立一個新的事務檔案,並將其名稱作為識別符號返回。

  • invoke(txn: TXN, value: IN, context: Context[_]): Unit 將一個值寫入當前事務。示例8-15中的sink將該值作為字串追加到事務檔案。

  • preCommit(txn: TXN): Unit 預提交一個事務。預提交事務可能不會收到進一步的寫操作。例8-15中的實現重新整理並關閉事務檔案。

  • commit(txn: TXN): Unit 提交一個事務。此操作必須是冪等的—如果此方法被呼叫兩次,不能將記錄寫入輸出系統兩次。在例8-15中,我們檢查事務檔案是否仍然存在,並將其移動到目標目錄(如果存在的話)。

  • abort(txn: TXN): Unit 中止一個事務。對於一個事務,此方法也可能被呼叫兩次。例8-15中的TransactionalFileSink檢查事務檔案是否仍然存在,如果仍然存在,則刪除它。

正如您所看到的,該介面的實現並不太複雜。然而,實現的複雜性和一致性保證取決於sink系統的特性和功能。例如,Flink的Kafka生成器實現了TwoPhaseCommitSinkFunction介面。如前所述,如果由於超時而回滾事務,聯結器可能會丟失資料。因此,即使它實現了TwoPhaseCommitSinkFunction介面,也不能提供精確一次保證。

非同步訪問外部系統

除了接收或傳送資料流之外,通過在遠端資料庫中查詢資訊來豐富資料流是另一個需要與外部儲存系統互動的常見用例。一個例子就是著名的雅虎流處理基準測試,它基於需要用儲存在鍵值儲存中的相應活動的詳細資訊豐富廣告點選流。

對於這些用例,最直接的方法是實現一個MapFunction,它為每個處理過的記錄查詢資料儲存,等待查詢返回結果,豐富記錄,並輸出結果。雖然這種方法很容易實現,但它存在一個主要問題:對外部資料儲存的每個請求都增加了顯著的延遲(一個請求/響應包含兩條網路訊息),而MapFunction將大部分時間花在等待查詢結果上。

Apache Flink提供AsyncFunction來減少遠端I/O呼叫的延遲。AsyncFunction併發傳送多個查詢並非同步處理它們的結果。可以將其配置為保留記錄的順序(請求返回的順序可能與傳送它們的順序不同),或者按照查詢結果的順序返回結果,以進一步減少延遲。該函式還與Flink的檢查點機制進行了適當的整合——當前正在等待響應的輸入記錄是檢查點的,在恢復的情況下,查詢是重複的。此外,AsyncFunction可以正確地處理事件時間處理,因為它可以確保即使啟用了無序結果,水印也不會被記錄覆蓋。

為了利用AsyncFunction,外部系統應該提供一個支援非同步呼叫的客戶端,這是許多系統的情況。如果系統只提供同步客戶端,則可以建立執行緒來發送請求並處理它們。AsyncFunction的介面如下圖所示:

traitAsyncFunction[IN,OUT]extendsFunction{
defasyncInvoke(input:IN,resultFuture:ResultFuture[OUT]):Unit
}

函式的型別引數定義其輸入和輸出型別。使用兩個引數為每個輸入記錄呼叫asyncInvoke()方法。第一個引數是輸入記錄,第二個引數是返回函式結果或異常的回撥物件。在示例8-16中,我們展示瞭如何在DataStream上應用AsyncFunction。

valreadings:DataStream[SensorReading]=???
valsensorLocations:DataStream[(String,String)]=AsyncDataStream.orderedWait(
readings,
newDerbyAsyncFunction,
5,
TimeUnit.SECONDS,// timeout requests after 5 seconds
100)// at most 100 concurrent requests

使用了AsyncFunction的非同步操作符用AsyncDataStream物件配置,它提供了兩個靜態方法:orderedWait() 和unorderedWait()。這兩個方法是過載方法的,使用不同的引數組合。orderedWait()應用一個非同步操作符,它按照輸入記錄的順序發出結果,而unorderWait()操作符只確保水印和檢查點barrier保持對齊。其他引數指定記錄的非同步呼叫何時超時,以及啟動多少併發請求。示例8-17顯示了DerbyAsyncFunction,它通過JDBC介面查詢嵌入式Derby資料庫。

classDerbyAsyncFunctionextendsAsyncFunction[SensorReading, (String,String)] {
// caching execution context used to handle the query threads
privatelazyvalcachingPoolExecCtx=ExecutionContext.fromExecutor(Executors.newCachedThreadPool())
// direct execution context to forward result future to callback object
privatelazyvaldirectExecCtx=
ExecutionContext.fromExecutor(org.apache.flink.runtime.concurrent.Executors.directExecutor())
/**
* Executes JDBC query in a thread and handles the resulting Future
* with an asynchronous callback.
*/
overridedefasyncInvoke(
reading:SensorReading,
resultFuture:ResultFuture[(String,String)]):Unit={
valsensor=reading.id
// get room from Derby table as Future
valroom:Future[String]=Future{
// Creating a new connection and statement for each record.
// Note: This is NOT best practice!
// Connections and prepared statements should be cached.
valconn=DriverManager.getConnection("jdbc:derby:memory:flinkExample",newProperties())
valquery=conn.createStatement()
// submit query and wait for result; this is a synchronous call
valresult=query.executeQuery(s"SELECT room FROM SensorLocations WHERE sensor ='$sensor'")
// get room if there is one
valroom=if(result.next()) {
result.getString(1)
}else{
"UNKNOWN ROOM"
}
// close resultset, statement, and connection
result.close()
query.close()
conn.close()
// return room
room
}(cachingPoolExecCtx)
// apply result handling callback on the room future
room.onComplete{
caseSuccess(r)=>resultFuture.complete(Seq((sensor,r)))
caseFailure(e)=>resultFuture.completeExceptionally(e)
}(directExecCtx)
}
}

示例8-17中的DerbyAsyncFunction的asyncInvoke()方法在Future中封裝了阻塞JDBC查詢,它是通過CachedThreadPool執行的。為了保持示例的簡潔,我們為每個記錄建立一個新的JDBC連線,當然,這是非常低效的,應該避免。Future[String]儲存JDBC查詢的結果。

最後,我們對Future應用一個onComplete()回撥,並將結果(或可能的異常)傳遞給ResultFuture處理程式。與JDBC查詢Future不同,onComplete()回撥由DirectExecutor處理,因為將結果傳遞給ResultFuture是一個輕量級操作,不需要專門的執行緒。注意,所有操作都是以非阻塞方式完成的。

  • 需要指出的是,AsyncFunction例項是按順序呼叫其每個輸入記錄的——函式例項不是以多執行緒方式呼叫的。因此,asyncInvoke()方法應該通過啟動非同步請求並使用將結果轉發到ResultFuture的回撥來處理結果,從而快速返回。必須避免的常見反模式包括:

  • 傳送一個阻塞asyncInvoke()方法的請求。

  • 傳送非同步請求,但在asyncInvoke()方法中等待請求完成。

結束語

在本章中,您將瞭解Flink DataStream應用程式如何從外部系統讀取資料並將資料寫入外部系統,以及應用程式實現不同端到端一致性保證的要求。我們介紹了Flink最常用的內建源和sink聯結器,它們也代表不同型別的儲存系統,如訊息佇列、檔案系統和鍵值儲存。

隨後,我們向您展示瞭如何實現自定義源和sink聯結器,包括WAL和2PC接收器聯結器,並提供了詳細的示例。最後,您瞭解了Flink的AsyncFunction,它可以通過非同步執行和處理請求來顯著提高與外部系統互動的效能。