1. 程式人生 > 其它 >一文搞懂 Flink 中的鎖

一文搞懂 Flink 中的鎖

技術標籤:Apache Flinkflinkflink原始碼flink timerflink lock

之前在介紹 flink timer 的時候( 一文搞懂 Flink Timer ) 官網有這樣的一句話

Flink synchronizes invocations of onTimer() and processElement(). Hence, users do not have to worry about concurrent modification of state.

當時覺得特別奇怪,今天我們就一起來看一下,flink 是如何保證 onTimer 與 processElement 同步的以及其他使用 lock 的地方

一文搞定 Flink 消費訊息的全流程 我們可以知道,當運算元處理 msg 時,保持同步

// 這裡就是真正的,使用者的程式碼即將被執行的地方
						// now we can do the actual processing
						StreamRecord<IN> record = recordOrMark.asRecord();
						//處理每條 record lock
						// 所以如果是 window 由 processElement 導致的 window fire 也會被 lock 住
						synchronized (lock) {
							numRecordsIn.
inc(); //throught KeySelector set KeyContext setCurrentKey streamOperator.setKeyContextElement1(record); //處理資料 streamOperator.processElement(record); }

一文搞懂 flink 處理水印全過程 我們可以知道下游運算元處理水印時,會保持同步

public void handleWatermark(Watermark watermark) {
			try {
				// event time lock
synchronized (lock) { watermarkGauge.setCurrentWatermark(watermark.getTimestamp());//gauge //處理 watermark 的入口 operator.processWatermark(watermark); } } catch (Exception e) { throw new RuntimeException("Exception occurred while processing valve output watermark: ", e); } }

這是 event timer觸發的過程, 同理 process timer

@Override
		public void run() {
			// process timer lock
			synchronized (lock) {
				try {
					if (serviceStatus.get() == STATUS_ALIVE) {
						target.onProcessingTime(timestamp);
					}
				} catch (Throwable t) {
					TimerException asyncException = new TimerException(t);
					exceptionHandler.handleAsyncException("Caught exception while processing timer.", asyncException);
				}
			}
		}

其中 lock 均來自於 StreamTask 的

private final Object lock = new Object();

另外 lock 除了應用於 ontimer() 與 processElement() 方法外,還應用於
處理水印、處理 record、triggerCheckpoint、kafka 傳送 msg、update offset