一文搞懂 Flink 中的鎖
阿新 • • 發佈:2020-12-29
技術標籤:Apache Flinkflinkflink原始碼flink timerflink lock
之前在介紹 flink timer 的時候( 一文搞懂 Flink Timer ) 官網有這樣的一句話
Flink synchronizes invocations of onTimer() and processElement(). Hence, users do not have to worry about concurrent modification of state.
當時覺得特別奇怪,今天我們就一起來看一下,flink 是如何保證 onTimer 與 processElement 同步的以及其他使用 lock 的地方
由 一文搞定 Flink 消費訊息的全流程 我們可以知道,當運算元處理 msg 時,保持同步
// 這裡就是真正的,使用者的程式碼即將被執行的地方
// now we can do the actual processing
StreamRecord<IN> record = recordOrMark.asRecord();
//處理每條 record lock
// 所以如果是 window 由 processElement 導致的 window fire 也會被 lock 住
synchronized (lock) {
numRecordsIn. inc();
//throught KeySelector set KeyContext setCurrentKey
streamOperator.setKeyContextElement1(record);
//處理資料
streamOperator.processElement(record);
}
由 一文搞懂 flink 處理水印全過程 我們可以知道下游運算元處理水印時,會保持同步
public void handleWatermark(Watermark watermark) {
try {
// event time lock
synchronized (lock) {
watermarkGauge.setCurrentWatermark(watermark.getTimestamp());//gauge
//處理 watermark 的入口
operator.processWatermark(watermark);
}
} catch (Exception e) {
throw new RuntimeException("Exception occurred while processing valve output watermark: ", e);
}
}
這是 event timer觸發的過程, 同理 process timer
@Override
public void run() {
// process timer lock
synchronized (lock) {
try {
if (serviceStatus.get() == STATUS_ALIVE) {
target.onProcessingTime(timestamp);
}
} catch (Throwable t) {
TimerException asyncException = new TimerException(t);
exceptionHandler.handleAsyncException("Caught exception while processing timer.", asyncException);
}
}
}
其中 lock 均來自於 StreamTask 的
private final Object lock = new Object();
另外 lock 除了應用於 ontimer() 與 processElement() 方法外,還應用於
處理水印、處理 record、triggerCheckpoint、kafka 傳送 msg、update offset