[原始碼分析] 從例項和原始碼入手看 Flink 之廣播 Broadcast
阿新 • • 發佈:2020-03-29
# [原始碼分析] 從例項和原始碼入手看 Flink 之廣播 Broadcast
## 0x00 摘要
本文將通過原始碼分析和例項講解,帶領大家熟悉Flink的廣播變數機制。
## 0x01 業務需求
### 1. 場景需求
對黑名單中的IP進行檢測過濾。IP黑名單的內容會隨時增減,因此是可以隨時動態配置的。
該黑名單假設存在mysql中,Flink作業啟動時候會把這個黑名單從mysql載入,作為一個變數由Flink運算元使用。
### 2. 問題
我們不想重啟作業以便重新獲取這個變數。所以就需要一個能夠動態修改運算元裡變數的方法。
### 3. 解決方案
使用廣播的方式去解決。去做配置的動態更新。
廣播和普通的流資料不同的是:廣播流的1條流資料能夠被運算元的所有分割槽所處理,而資料流的1條流資料只能夠被運算元的某一分割槽處理。因此廣播流的特點也決定適合做配置的動態更新。
## 0x02 概述
廣播這部分有三個難點:使用步驟;如何自定義函式;如何存取狀態。下面就先為大家概述下。
### 1. broadcast的使用步驟
- 建立MapStateDescriptor
- 通過DataStream.broadcast方法返回廣播資料流BroadcastStream
- 通過DataStream.connect方法,把業務資料流和BroadcastStream進行連線,返回BroadcastConnectedStream
- 通過BroadcastConnectedStream.process方法分別進行processElement及processBroadcastElement處理
### 2. 使用者自定義處理函式
- BroadcastConnectedStream.process接收兩種型別的function:KeyedBroadcastProcessFunction 和 BroadcastProcessFunction
- 兩種型別的function都定義了processElement、processBroadcastElement抽象方法,只是KeyedBroadcastProcessFunction多定義了一個onTimer方法,預設是空操作,允許子類重寫
- processElement處理業務資料流
- processBroadcastElement處理廣播資料流
### 3. Broadcast State
- Broadcast State始終表示為MapState,即map format。這是Flink提供的最通用的狀態原語。是託管狀態的一種,託管狀態是由Flink框架管理的狀態,如ValueState, ListState, MapState等。
- 使用者必須建立一個 `MapStateDescriptor`,才能得到對應的狀態控制代碼。 這儲存了狀態名稱, 狀態所持有值的型別,並且可能包含使用者指定的函式
- checkpoint的時候也會checkpoint broadcast state
- Broadcast State只在記憶體有,沒有RocksDB state backend
- Flink 會將state廣播到每個task,注意該state並不會跨task傳播,對其修改僅僅是作用在其所在的task
- downstream tasks接收到broadcast event的順序可能不一樣,所以依賴其到達順序來處理element的時候要小心
## 0x03. 示例程式碼
### 1. 示例程式碼
我們直接從Flink原始碼入手可以找到理想的示例。 以下程式碼直接摘錄 Flink 原始碼 StatefulJobWBroadcastStateMigrationITCase,我會在裡面加上註釋說明。
```scala
@Test
def testRestoreSavepointWithBroadcast(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// 以下兩個變數是為了確定廣播流發出的資料型別,廣播流可以同時發出多種型別的資料
lazy val firstBroadcastStateDesc = new MapStateDescriptor[Long, Long](
"broadcast-state-1",
BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]],
BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]])
lazy val secondBroadcastStateDesc = new MapStateDescriptor[String, String](
"broadcast-state-2",
BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO)
env.setStateBackend(new MemoryStateBackend)
env.enableCheckpointing(500)
env.setParallelism(4)
env.setMaxParallelism(4)
// 資料流,這裡資料流和廣播流的Source都是同一種CheckpointedSource。資料流這裡做了一系列運算元操作,比如flatMap
val stream = env
.addSource(
new CheckpointedSource(4)).setMaxParallelism(1).uid("checkpointedSource")
.keyBy(
new KeySelector[(Long, Long), Long] {
override def getKey(value: (Long, Long)): Long = value._1
}
)
.flatMap(new StatefulFlatMapper)
.keyBy(
new KeySelector[(Long, Long), Long] {
override def getKey(value: (Long, Long)): Long = value._1
}
)
// 廣播流
val broadcastStream = env
.addSource(
new CheckpointedSource(4)).setMaxParallelism(1).uid("checkpointedBroadcastSource")
.broadcast(firstBroadcastStateDesc, secondBroadcastStateDesc)
// 把資料流和廣播流結合起來
stream
.connect(broadcastStream)
.process(new VerifyingBroadcastProcessFunction(expectedFirstState, expectedSecondState))
.addSink(new AccumulatorCountingSink)
}
}
// 使用者自定義的處理函式
class TestBroadcastProcessFunction
extends KeyedBroadcastProcessFunction
[Long, (Long, Long), (Long, Long), (Long, Long)] {
// 重點說明,這裡的 firstBroadcastStateDesc,secondBroadcastStateDesc 其實和之前廣播流的那兩個MapStateDescriptor無關。
// 這裡兩個MapStateDescriptor是為了存取BroadcastState,這樣在 processBroadcastElement和processElement之間就可以傳遞變量了。我們完全可以定義新的MapStateDescriptor,只要processBroadcastElement和processElement之間認可就行。
// 這裡引數 "broadcast-state-1" 是name, flink就是用這個 name 來從Flink執行時系統中存取MapStateDescriptor
lazy val firstBroadcastStateDesc = new MapStateDescriptor[Long, Long](
"broadcast-state-1",
BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]],
BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]])
val secondBroadcastStateDesc = new MapStateDescriptor[String, String](
"broadcast-state-2",
BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO)
override def processElement(
value: (Long, Long),
ctx: KeyedBroadcastProcessFunction
[Long, (Long, Long), (Long, Long), (Long, Long)]#ReadOnlyContext,
out: Collector[(Long, Long)]): Unit = {
// 這裡Flink原始碼中是直接把接受到的業務變數直接再次轉發出去
out.collect(value)
}
override def processBroadcastElement(
value: (Long, Long),
ctx: KeyedBroadcastProcessFunction
[Long, (Long, Long), (Long, Long), (Long, Long)]#Context,
out: Collector[(Long, Long)]): Unit = {
// 這裡是把最新傳來的廣播變數儲存起來,processElement中可以取出再次使用. 具體是通過firstBroadcastStateDesc 的 name 來獲取 BroadcastState
ctx.getBroadcastState(firstBroadcastStateDesc).put(value._1, value._2)
ctx.getBroadcastState(secondBroadcastStateDesc).put(value._1.toString, value._2.toString)
}
}
// 廣播流和資料流的Source
private class CheckpointedSource(val numElements: Int)
extends SourceFunction[(Long, Long)] with CheckpointedFunction {
private var isRunning = true
private var state: ListState[CustomCaseClass] = _
// 就是簡單的定期傳送
override def run(ctx: SourceFunction.SourceContext[(Long, Long)]) {
ctx.emitWatermark(new Watermark(0))
ctx.getCheckpointLock synchronized {
var i = 0
while (i < numElements) {
ctx.collect(i, i)
i += 1
}
}
// don't emit a final watermark so that we don't trigger the registered event-time
// timers
while (isRunning) Thread.sleep(20)
}
}
```
### 2. 技術難點
#### MapStateDescriptor
首先要說明一些概念:
- Flink中包含兩種基礎的狀態:Keyed State和Operator State。
- Keyed State和Operator State又可以 以兩種形式存在:原始狀態和託管狀態。
- 託管狀態是由Flink框架管理的狀態,如ValueState, ListState, MapState等。
- raw state即原始狀態,由使用者自行管理狀態具體的資料結構,框架在做checkpoint的時候,使用byte[]來讀寫狀態內容,對其內部資料結構一無所知。
- MapState是託管狀態的一種:即狀態值為一個map。使用者通過`put`或`putAll`方法新增元素。
回到我們的例子,廣播變數就是OperatorState的一部分,是以託管狀態的MapState形式儲存的。具體getBroadcastState函式就是DefaultOperatorStateBackend中的實現
所以我們需要用MapStateDescriptor描述broadcast state,這裡MapStateDescriptor的使用比較靈活,因為是key,value類似使用,所以個人覺得value直接使用類,這樣更方便,尤其是對於從其他語言轉到scala的同學。
#### processBroadcastElement
```scala
// 因為主要起到控制作用,所以這個函式的處理相對簡單
override def processBroadcastElement(): Unit = {
// 這裡可以把最新傳來的廣播變數儲存起來,processElement中可以取出再次使用,比如
ctx.getBroadcastState(firstBroadcastStateDesc).put(value._1, value._2)
}
```
#### processElement
```java
// 這個函式需要和processBroadcastElement配合起來使用
override def processElement(): Unit = {
// 可以取出processBroadcastElement之前儲存的廣播變數,然後用此來處理業務變數,比如
val secondBroadcastStateDesc = new MapStateDescriptor[String, String](
"broadcast-state-2",
BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO)
var actualSecondState = Map[String, String]()
for (entry <- ctx.getBroadcastState(secondBroadcastStateDesc).immutableEntries()) {
val v = secondExpectedBroadcastState.get(entry.getKey).get
actualSecondState += (entry.getKey -> entry.getValue)
}
// 甚至這裡只要和processBroadcastElement一起關聯好,可以儲存任意型別的變數。不必須要和廣播變數的型別一致。重點是宣告新的對應的MapStateDescriptor
// MapStateDescriptor繼承了StateDescriptor,其中state為MapState型別,value為Map型別
}
```
#### 結合起來使用
因為某些限制,所以下面只能從網上找一個例子給大家講講。
```java
// 模式始終儲存在MapState中,並將null作為鍵。broadcast state始終表示為MapState,這是Flink提供的最通用的狀態原語。
MapStateDe