Flink基礎(二十三):FLINK基本題(二)
1 Flink是如何支援批流一體的?
本道面試題考察的其實就是一句話:Flink的開發者認為批處理是流處理的一種特殊情況。批處理是有限的流處理。Flink 使用一個引擎支援了DataSet API 和 DataStream API。
2 Flink是如何做到高效的資料交換的?
在一個Flink Job中,資料需要在不同的task中進行交換,整個資料交換是有 TaskManager 負責的,TaskManager 的網路元件首先從緩衝buffer中收集records,然後再發送。Records 並不是一個一個被髮送的,二是積累一個批次再發送,batch 技術可以更加高效的利用網路資源。
3 Flink是如何做容錯的?
Flink 實現容錯主要靠強大的CheckPoint機制和State機制。Checkpoint 負責定時製作分散式快照、對程式中的狀態進行備份;State 用來儲存計算過程中的中間狀態。
4 Flink 分散式快照的原理是什麼?
Flink的分散式快照是根據Chandy-Lamport演算法量身定做的。簡單來說就是持續建立分散式資料流及其狀態的一致快照。
核心思想是在 input source 端插入 barrier,控制 barrier 的同步來實現 snapshot 的備份和 exactly-once 語義。
5 Flink是如何保證Exactly-once 語義的?
端到端的exactly-once對sink要求比較高,具體實現主要有冪等寫入和事務性寫入兩種方式。冪等寫入的場景依賴於業務邏輯,更常見的是用事務性寫入。
而事務性寫入又有預寫日誌(WAL)和兩階段提交(2PC)兩種方式。
兩階段提交
Flink通過實現兩階段提交和狀態儲存來實現端到端的一致性語義。分為以下幾個步驟:
開始事務(beginTransaction)建立一個臨時資料夾,來寫把資料寫入到這個資料夾裡面
預提交(preCommit)將記憶體中快取的資料寫入檔案並關閉
正式提交(commit)將之前寫完的臨時檔案放入目標目錄下。這代表著最終的資料會有一些延遲
丟棄(abort)丟棄臨時檔案
若失敗發生在預提交成功後,正式提交前。可以根據狀態來提交預提交的資料,也可刪除預提交的資料。
6 Flink 的 kafka 聯結器有什麼特別的地方?
Flink原始碼中有一個獨立的connector模組,所有的其他connector都依賴於此模組,Flink 在1.9版本釋出的全新kafka聯結器,摒棄了之前連線不同版本的kafka叢集需要依賴不同版本的connector這種做法,只需要依賴一個connector即可。
7 說說 Flink的記憶體管理是如何做的?
Flink 並不是將大量物件存在堆上,而是將物件都序列化到一個預分配的記憶體塊上,這個記憶體塊叫做MemorySegment
,它代表了一段固定長度的記憶體(預設大小為 32KB),也是 Flink 中最小的記憶體分配單元,並且提供了非常高效的讀寫方法。每條記錄都會以序列化的形式儲存在一個或多個MemorySegment
中。
如果需要處理的資料超出了記憶體限制,則會將部分資料儲存到硬碟上。Flink 為了直接操作二進位制資料實現了自己的序列化框架。理論上Flink的記憶體管理分為三部分:
Network Buffers:這個是在TaskManager啟動的時候分配的,這是一組用於快取網路資料的記憶體,每個塊是32K,預設分配2048個,可以通過“taskmanager.network.numberOfBuffers”修改
Memory Manage pool:這是一個由MemoryManager
管理的,由眾多MemorySegment
組成的超大集合。Flink 中的演算法(如 sort/shuffle/join)會向這個記憶體池申請 MemorySegment,將序列化後的資料存於其中,使用完後釋放回記憶體池。預設情況下,池子佔了堆記憶體的 70% 的大小。大量的Memory Segment塊,用於執行時的演算法(Sort/Join/Shuffle等),這部分啟動的時候就會分配。記憶體的分配支援預分配和lazy load,預設懶載入的方式。
User Code,這部分是除了Memory Manager之外的記憶體用於User code和TaskManager本身的資料結構。
Flink使用堆外記憶體:
- 啟動超大記憶體(上百GB)的JVM需要很長時間,GC停留時間也會很長(分鐘級)。使用堆外記憶體可以極大地減小堆記憶體(只需要分配Remaining Heap),使得 TaskManager 擴充套件到上百GB記憶體不是問題。
- 進行IO操作時,使用堆外記憶體可以zero-copy,使用堆內記憶體至少要複製一次。
- 堆外記憶體在程序間是共享的。
8 說說 Flink的序列化如何做的?
- 序列化與反序列化可以理解為編碼與解碼的過程。序列化以後的資料希望佔用比較小的空間,而且資料能夠被正確地反序列化出來。為了能正確反序列化,序列化時僅儲存二進位制資料本身肯定不夠,需要增加一些輔助的描述資訊。此處可以採用不同的策略,因而產生了很多不同的序列化方法。Java本身自帶的序列化和反序列化的功能,但是輔助資訊佔用空間比較大,在序列化物件時記錄了過多的類資訊。
- Flink實現了自己的序列化框架,Flink處理的資料流通常是一種型別,所以可以只儲存一份物件Schema資訊,節省儲存空間。又因為物件型別固定,所以可以通過偏移量存取。
- Java支援任意Java或Scala型別,型別資訊由
TypeInformation
類表示,TypeInformation 支援以下幾種型別:BasicTypeInfo
: 任意Java 基本型別或 String 型別。BasicArrayTypeInfo
: 任意Java基本型別陣列或 String 陣列。WritableTypeInfo
: 任意 Hadoop Writable 介面的實現類。TupleTypeInfo
: 任意的 Flink Tuple 型別(支援Tuple1 to Tuple25)。Flink tuples 是固定長度固定型別的Java Tuple實現。CaseClassTypeInfo
: 任意的 Scala CaseClass(包括 Scala tuples)。PojoTypeInfo
: 任意的 POJO (Java or Scala),例如,Java物件的所有成員變數,要麼是 public 修飾符定義,要麼有 getter/setter 方法。GenericTypeInfo
: 任意無法匹配之前幾種型別的類。
- 針對前六種型別資料集,Flink皆可以自動生成對應的TypeSerializer,能非常高效地對資料集進行序列化和反序列化。對於最後一種資料型別,Flink會使用Kryo進行序列化和反序列化。每個TypeInformation中,都包含了serializer,型別會自動通過serializer進行序列化,然後用Java Unsafe介面寫入MemorySegments。如下圖展示 一個內嵌型的Tuple3<integer,double,person>物件的序列化過程:
操縱二進位制資料:
- Flink 提供瞭如 group、sort、join 等操作,這些操作都需要訪問海量資料。以sort為例。
-
首先,Flink 會從 MemoryManager 中申請一批 MemorySegment,用來存放排序的資料。
- 這些記憶體會分為兩部分,一個區域是用來存放所有物件完整的二進位制資料。另一個區域用來存放指向完整二進位制資料的指標以及定長的序列化後的key(key+pointer)。將實際的資料和point+key分開存放有兩個目的。第一,交換定長塊(key+pointer)更高效,不用交換真實的資料也不用移動其他key和pointer。第二,這樣做是快取友好的,因為key都是連續儲存在記憶體中的,可以增加cache命中。 排序會先比較 key 大小,這樣就可以直接用二進位制的 key 比較而不需要反序列化出整個物件。訪問排序後的資料,可以沿著排好序的key+pointer順序訪問,通過 pointer 找到對應的真實資料。
9 Flink中的Window出現了資料傾斜,你有什麼解決辦法?
window產生資料傾斜指的是資料在不同的視窗內堆積的資料量相差過多。本質上產生這種情況的原因是資料來源頭髮送的資料量速度不同導致的。出現這種情況一般通過兩種方式來解決:
在資料進入視窗前做預聚合
重新設計視窗聚合的key
10 Flink中在使用聚合函式 GroupBy、Distinct、KeyBy 等函式時出現數據熱點該如何解決?
資料傾斜和資料熱點是所有大資料框架繞不過去的問題。處理這類問題主要從3個方面入手:
在業務上規避這類問題
例如一個假設訂單場景,北京和上海兩個城市訂單量增長几十倍,其餘城市的資料量不變。這時候我們在進行聚合的時候,北京和上海就會出現資料堆積,我們可以單獨資料北京和上海的資料。
Key的設計上
把熱key進行拆分,比如上個例子中的北京和上海,可以把北京和上海按照地區進行拆分聚合。
引數設定
Flink 1.9.0 SQL(Blink Planner) 效能優化中一項重要的改進就是升級了微批模型,即 MiniBatch。原理是快取一定的資料後再觸發處理,以減少對State的訪問,從而提升吞吐和減少資料的輸出量。
11 Flink任務延遲高,想解決這個問題,你會如何入手?
在Flink的後臺任務管理中,我們可以看到Flink的哪個運算元和task出現了反壓。最主要的手段是資源調優和運算元調優。
資源調優即是對作業中的Operator的併發數(parallelism)、CPU(core)、堆記憶體(heap_memory)等引數進行調優。作業引數調優包括:並行度的設定,State的設定,checkpoint的設定。
12 Flink是如何處理反壓的?
Flink 內部是基於 producer-consumer 模型來進行訊息傳遞的,Flink的反壓設計也是基於這個模型。Flink 使用了高效有界的分散式阻塞佇列,就像 Java 通用的阻塞佇列(BlockingQueue)一樣。下游消費者消費變慢,上游就會受到阻塞。
13 Flink的反壓和Strom有哪些不同?
Storm 是通過監控 Bolt 中的接收佇列負載情況,如果超過高水位值就會將反壓資訊寫到 Zookeeper ,Zookeeper 上的 watch 會通知該拓撲的所有 Worker 都進入反壓狀態,最後 Spout 停止傳送 tuple。Flink中的反壓使用了高效有界的分散式阻塞佇列,下游消費變慢會導致傳送端阻塞。二者最大的區別是Flink是逐級反壓,而Storm是直接從源頭降速。
14 Operator Chains(運算元鏈)這個概念你瞭解嗎?
為了更高效地分散式執行,Flink會盡可能地將operator的subtask連結(chain)在一起形成task。每個task在一個執行緒中執行。
將operators連結成task是非常有效的優化:它能減少執行緒之間的切換,減少訊息的序列化/反序列化,減少資料在緩衝區的交換,減少了延遲的同時提高整體的吞吐量。這就是我們所說的運算元鏈。
15 Flink什麼情況下才會把Operator chain在一起形成運算元鏈?
兩個operator chain在一起的的條件:
上下游的並行度一致
下游節點的入度為1 (也就是說下游節點沒有來自其他節點的輸入)
上下游節點都在同一個 slot group 中(下面會解釋 slot group)
下游節點的 chain 策略為 ALWAYS(可以與上下游連結,map、flatmap、filter等預設是ALWAYS)
上游節點的 chain 策略為 ALWAYS 或 HEAD(只能與下游連結,不能與上游連結,Source預設是HEAD)
兩個節點間資料分割槽方式是 forward(參考理解資料流的分割槽)
使用者沒有禁用 chain
16消費kafka資料的時候,如何處理髒資料?
可以在處理前加一個fliter運算元,將不符合規則的資料過濾出去。