[原始碼解析] 從TimeoutException看Flink的心跳機制
[原始碼解析] 從TimeoutException看Flink的心跳機制
目錄
- [原始碼解析] 從TimeoutException看Flink的心跳機制
0x00 摘要
本文從一個除錯時候常見的異常 "TimeoutException: Heartbeat of TaskManager timed out"切入,為大家剖析Flink的心跳機制。文中程式碼基於Flink 1.10。
0x01 緣由
大家如果經常除錯Flink,當進入斷點看到了堆疊和變數內容之後,你容易陷入了沉思。當你發現了問題可能所在,高興的讓程式Resume的時候,你發現程式無法執行,有如下提示:
Caused by: java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id 93aa1740-cd2c-4032-b74a-5f256edb3217 timed out.
這實在是很鬱悶的事情。作為程式猿不能忍啊,既然異常提示中有 Heartbeat 字樣,於是我們就來一起看看Flink的心跳機制,看看有沒有可以修改的途徑。
0x02 背景概念
2.1 四大模組
Flink有核心四大元件:Dispatcher,JobMaster,ResourceManager,TaskExecutor。
- Dispatcher(Application Master)用於接收client提交的任務和啟動相應的JobManager。其提供REST介面來接收client的application提交,負責啟動JM和提交application,同時執行Web UI。
- ResourceManager:主要用於資源的申請和分配。當TM有空閒的slot就會告訴JM,沒有足夠的slot也會啟動新的TM。kill掉長時間空閒的TM。
- JobMaster :功能主要包括(舊版本中JobManager的功能在新版本中以JobMaster形式出現,可能本文中會混淆這兩個詞,請大家諒解):
- 將JobGraph轉化為ExecutionGraph(physical dataflow graph,並行化)。
- 向RM申請資源、schedule tasks、儲存作業的元資料。
- TaskManager:類似Spark的executor,會跑多個執行緒的task、資料快取與交換。Flink 架構遵循 Master - Slave 架構設計原則,JobMaster 為 Master 節點,TaskManager 為Slave節點。
這四大元件彼此之間的通訊需要依賴RPC實現。
2.2 Akka
Flink底層RPC基於Akka實現。Akka是一個開發併發、容錯和可伸縮應用的框架。它是Actor Model的一個實現,和Erlang的併發模型很像。在Actor模型中,所有的實體被認為是獨立的actors。actors和其他actors通過傳送非同步訊息通訊。
Actor模型的強大來自於非同步。它也可以顯式等待響應,這使得可以執行同步操作。但是強烈不建議同步訊息,因為它們限制了系統的伸縮性。
2.3 RPC機制
RPC作用是:讓非同步呼叫看起來像同步呼叫。
Flink基於Akka構建了其底層通訊系統,引入了RPC呼叫,各節點通過GateWay方式回撥,隱藏通訊元件的細節,實現解耦。Flink整個通訊框架的元件主要由RpcEndpoint、RpcService、RpcServer、AkkaInvocationHandler、AkkaRpcActor等構成。
RPC相關的主要介面如下:
- RpcEndpoint
- RpcService
- RpcGateway
2.3.1 RpcEndpoint:RPC的基類
RpcEndpoint是Flink RPC終端的基類,所有提供遠端過程呼叫的分散式元件必須擴充套件RpcEndpoint,其功能由RpcService支援。
RpcEndpoint的子類只有四類元件:Dispatcher,JobMaster,ResourceManager,TaskExecutor,即Flink中只有這四個元件有RPC的能力,換句話說只有這四個元件有RPC的這個需求。
每個RpcEndpoint對應了一個路徑(endpointId和actorSystem共同確定),每個路徑對應一個Actor,其實現了RpcGateway介面,
RpcService:RPC服務提供者
RpcServer是RpcEndpoint的成員變數,為RpcService提供RPC服務/連線遠端Server,其只有一個子類實現:AkkaRpcService(可見目前Flink的通訊方式依然是Akka)。
RpcServer用於啟動和連線到RpcEndpoint, 連線到rpc伺服器將返回一個RpcGateway,可用於呼叫遠端過程。
Flink四大元件Dispatcher,JobMaster,ResourceManager,TaskExecutor,都是RpcEndpoint的實現,所以構建四大元件時,同步需要初始化RpcServer。如JobManager的構造方式,第一個引數就是需要知道RpcService。
RpcGateway:RPC呼叫的閘道器
Flink的RPC協議通過RpcGateway來定義;由前面可知,若想與遠端Actor通訊,則必須提供地址(ip和port),如在Flink-on-Yarn模式下,JobMaster會先啟動ActorSystem,此時TaskExecutor的Container還未分配,後面與TaskExecutor通訊時,必須讓其提供對應地址。
Dispatcher,JobMaster,ResourceManager,TaskExecutor 這四大元件通過各種方式實現了Gateway。以JobMaster為例,JobMaster實現JobMasterGateway介面。各元件類的成員變數都有需要通訊的其他元件的GateWay實現類,這樣可通過各自的Gateway實現RPC呼叫。
2.4 常見心跳機制
常見的心跳檢測有兩種:
- socket 套接字SO_KEEPALIVE本身帶有的心跳機制,定期向對方傳送心跳包,對方收到心跳包後會自動回覆;
- 應用自身實現心跳機制,同樣也是使用定期傳送請求的方式;
Flink實現的是第二種方案。
0x03 Flink心跳機制
3.1 程式碼和機制
Flink的心跳機製程式碼在:
Flink-master/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat
四個介面:
HeartbeatListener.java HeartbeatManager.java HeartbeatTarget.java HeartbeatMonitor.java
以及如下幾個類:
HeartbeatManagerImpl.java HeartbeatManagerSenderImpl.java HeartbeatMonitorImpl.java
HeartbeatServices.java NoOpHeartbeatManager.java
Flink叢集有多種業務流程,比如Resource Manager, Task Manager, Job Manager。每種業務流程都有自己的心跳機制。Flink的心跳機制只是提供介面和基本功能,具體業務功能由各業務流程自己實現。
我們首先設定 心跳系統中有兩種節點:sender和receiver。心跳機制是sender和receivers彼此相互檢測。但是檢測動作是Sender主動發起,即Sender主動傳送請求探測receiver是否存活,因為Sender已經傳送過來了探測心跳請求,所以這樣receiver同時也知道Sender是存活的,然後Reciver給Sender迴應一個心跳錶示自己也是活著的。
因為Flink的幾個名詞和我們常見概念有所差別,所以流程上需要大家仔細甄別,即:
- Flink Sender 主動傳送Request請求給Receiver,要求Receiver迴應一個心跳;
- Flink Receiver 收到Request之後,通過Receive函式迴應一個心跳請求給Sender;
3.2 靜態架構
3.2.1 HeartbeatTarget :監控目標抽象
HeartbeatTarget是對監控目標的抽象。心跳機制在行為上而言有兩種動作:
- 向某個節點傳送請求。
- 處理某個節點發來的請求。
HeartbeatTarget的函式就是這兩個動作:
- receiveHeartbeat :向某個節點(Sender)傳送心跳回應,其引數heartbeatOrigin 就是 Receiver。
- requestHeartbeat :向某個節點(Receiver)要求其迴應一個心跳,其引數requestOrigin 就是 Sender。requestHeartbeat這個函式是Sender的函式,其中Sender通過RPC直接呼叫到Receiver。
這兩個函式的引數也很簡單:分別是請求的傳送放和接收方,還有Payload載荷。對於一個確定節點而言,接收的和傳送的載荷是同一型別的。
public interface HeartbeatTarget<I> {
/**
* Sends a heartbeat response to the target.
* @param heartbeatOrigin Resource ID identifying the machine for which a heartbeat shall be reported.
*/
// heartbeatOrigin 就是 Receiver
void receiveHeartbeat(ResourceID heartbeatOrigin, I heartbeatPayload);
/**
* Requests a heartbeat from the target.
* @param requestOrigin Resource ID identifying the machine issuing the heartbeat request.
*/
// requestOrigin 就是 Sender
void requestHeartbeat(ResourceID requestOrigin, I heartbeatPayload);
}
3.2.2 HeartbeatMonitor : 管理heartbeat target的心跳狀態
對HeartbeatTarget的封裝,這樣Manager對Target的操作是通過對Monitor完成,後續會在其繼承類中詳細說明。
public interface HeartbeatMonitor<O> {
// Gets heartbeat target.
HeartbeatTarget<O> getHeartbeatTarget();
// Gets heartbeat target id.
ResourceID getHeartbeatTargetId();
// Report heartbeat from the monitored target.
void reportHeartbeat();
//Cancel this monitor.
void cancel();
//Gets the last heartbeat.
long getLastHeartbeat();
}
3.2.3 HeartbeatManager :心跳管理者
HeartbeatManager負責管理心跳機制,比如啟動/停止/報告一個HeartbeatTarget。此介面繼承HeartbeatTarget。
除了HeartbeatTarget的函式之外,這介面有4個函式:
- monitorTarget,把和某資源對應的節點加入到心跳監控列表;
- unmonitorTarget,從心跳監控列表刪除某資源對應的節點;
- stop,停止心跳管理服務,釋放資源;
- getLastHeartbeatFrom,獲取某節點的最後一次心跳資料。
public interface HeartbeatManager<I, O> extends HeartbeatTarget<I> {
void monitorTarget(ResourceID resourceID, HeartbeatTarget<O> heartbeatTarget);
void unmonitorTarget(ResourceID resourceID);
void stop();
long getLastHeartbeatFrom(ResourceID resourceId);
}
3.2.4 HearbeatListener 處理心跳結果
使用者業務邏輯需要繼承這個介面以處理心跳結果。其可以看做服務的輸出,實現了三個回撥函式。
- notifyHeartbeatTimeout,處理節點心跳超時
- reportPayload,處理節點發來的Payload載荷
- retrievePayLoad。獲取對某節點發下一次心跳請求的Payload載荷
public interface HeartbeatListener<I, O> {
void notifyHeartbeatTimeout(ResourceID resourceID);
void reportPayload(ResourceID resourceID, I payload);
O retrievePayload(ResourceID resourceID);
}
3.3 動態執行機制
之前提到Sender和Receiver,下面兩個類就對應上述概念。
- HeartbeatManagerImpl :Receiver,存在於JobMaster與TaskExecutor中;
- HeartbeatManagerSenderImpl :Sender,繼承 HeartbeatManagerImpl類,用於週期傳送心跳要求,存在於JobMaster、ResourceManager中。
幾個關鍵問題:
如何判定心跳超時?
心跳服務啟動後,Flink在Monitor中通過 ScheduledFuture 會啟動一個執行緒來處理心跳超時事件。在設定的心跳超時時間到達後才執行執行緒。
如果在設定的心跳超時時間內接收到元件的心跳訊息,會先將該執行緒取消而後重新開啟,重置心跳超時事件的觸發。
如果在設定的心跳超時時間內沒有收到元件的心跳,則會通知元件:你超時了。
何時"呼叫雙方"發起心跳檢查?
心跳檢查是雙向的,一方(Sender)會主動發起心跳請求,而另一方(Receiver)則是對心跳做出響應,兩者通過RPC相互呼叫,重置對方的 Monitor 超時執行緒。
以JobMaster和TaskManager為例,JM在啟動時會開啟週期排程,向已經註冊到JM中的TM發起心跳檢查,通過RPC呼叫TM的requestHeartbeat方法,重置TM中對JM超時執行緒的呼叫,表示當前JM狀態正常。在TM的requestHeartbeat方法被呼叫後,通過RPC呼叫JM的receiveHeartbeat,重置 JM 中對TM超時執行緒的呼叫,表示TM狀態正常。
如何處理心跳超時?
心跳服務依賴 HeartbeatListener,當在timeout時間範圍內未接收到心跳響應,則會觸發超時處理執行緒,該執行緒通過呼叫
HeartbeatListener.notifyHeartbeatTimeout
方法做後續重連操作或者直接斷開。
下面是一個概要(以RM & TM為例):
RM : 實現了ResourceManagerGateway (可以直接被RPC呼叫)
TM : 實現了TaskExecutorGateway (可以直接被RPC呼叫)
RM :有一個Sender HM : taskManagerHeartbeatManager,Sender HM 擁有使用者定義的 TaskManagerHeartbeatListener
TM :有一個Receiver HM :resourceManagerHeartbeatManager,Receiver HM 擁有使用者定義的ResourceManagerHeartbeatListener。
HeartbeatManager 有一個ConcurrentHashMap<ResourceID, HeartbeatMonitor> heartbeatTargets,這個Map是它監控的所有Target。
對於RM的每一個需要監控的TM, 其生成一個HeartbeatTarget,進而被構造成一個HeartbeatMonitor,放置到ResourceManager.taskManagerHeartbeatManager中。
每一個Target對應的Monitor中,有自己的非同步任務ScheduledFuture,這個ScheduledFuture不停的被取消/重新生成。如果在某個期間內沒有被取消,則通知使用者定義的listener出現了timeout。
3.3.1 HearbeatManagerImpl : Receiver
HearbeatManagerImpl是receiver的具體實現。它由 心跳 被髮起方(就是Receiver,例如TM) 建立,接收 發起方(就是Sender,例如 JM)的心跳傳送請求。心跳超時 會觸發 heartbeatListener.notifyHeartbeatTimeout方法。
注意:被髮起方監控執行緒(Monitor)的開啟是在接收到請求心跳(requestHeartbeat被呼叫後)以後才觸發的,屬於被動觸發。
HearbeatManagerImpl主要維護了
一個心跳監控列表 map :
<ResourceID, HeartbeatMonitor<O>> heartbeatTargets;
。這是一個KV關聯。key代表要傳送心跳元件(例如:TM)的ID,value則是為當前元件建立的觸發心跳超時的執行緒HeartbeatMonitor,兩者一一對應。
當一個從所聯絡的machine發過來的心跳被收到時候,對應的monitor的狀態會被更新(重啟一個新ScheduledFuture)。當一個monitor發現了一個 heartbeat timed out,它會通知自己的HeartbeatListener。
一個 ScheduledExecutor mainThreadExecutor 負責heartbeat timeout notifications。
heartbeatListener :處理心跳結果。
HearbeatManagerImpl 資料結構如下:
@ThreadSafe
public class HeartbeatManagerImpl<I, O> implements HeartbeatManager<I, O> {
/** Heartbeat timeout interval in milli seconds. */
private final long heartbeatTimeoutIntervalMs;
/** Resource ID which is used to mark one own's heartbeat signals. */
private final ResourceID ownResourceID;
/** Heartbeat listener with which the heartbeat manager has been associated. */
private final HeartbeatListener<I, O> heartbeatListener;
/** Executor service used to run heartbeat timeout notifications. */
private final ScheduledExecutor mainThreadExecutor;
/** Map containing the heartbeat monitors associated with the respective resource ID. */
private final ConcurrentHashMap<ResourceID, HeartbeatMonitor<O>> heartbeatTargets;
/** Running state of the heartbeat manager. */
protected volatile boolean stopped;
}
HearbeatManagerImpl實現的主要函式有:
- monitorTarget :把一個節點加入到心跳監控列表。
- 傳入引數有:ResourceId和HearbeatTarget,monitorTarget根據這兩個引數,生成一個HeartbeatMonitor物件,然後把這個物件跟ResrouceId做kv關聯,存入到heartbeatTargets。 一個節點可能參與多個業務流程,因此一個節點參與多個心跳流程,一個節點上執行多個不同型別的HearbeatTarget。所以一個ResourceID可能會跟不同型別的HearbeatTarget物件關聯,分別加入到多個HeartbeatManager,進行不同型別的心跳監控。也因此這個函式入參是兩個引數。
- requestHeartbeat :Sender通過RPC非同步呼叫到Receiver的這個函式 以要求receiver向requestOrigin節點(就是Sender)發起一次心跳響應,載荷是heartbeatPayLoad。其內部流程如下:
- 首先會呼叫reportHeartbeat函式,作用是 通過Monitor 記錄發起請求的這個時間點,然後建立一個ScheduleFuture。如果到期後,requestOrigin沒有作出響應,那麼就將requestOrigin節點對應的HeartbeatMonitor的state設定成TIMEOUT狀態,如果到期內requestOrigin響應了,ScheduleFuture會被取消,HeartbeatMonitor的state仍然是RUNNING。
- 其次呼叫reportPayload函式,把requestOrigin節點的最新的heartbeatPayload通知給heartbeatListener。heartbeatListener是外部傳入的,它根據所有節點的心跳記錄做監聽管理。
- 最後呼叫receiveHearbeat函式,響應一個心跳給Sender。
3.3.2 HeartbeatManagerSenderImpl : Sender
繼承HearbeatManagerImpl,由心跳管理的一方(例如JM)建立,實現了run函式(即它可以作為一個單獨執行緒執行),建立後立即開啟週期排程執行緒,每次遍歷自己管理的heartbeatTarget,觸發heartbeatTarget.requestHeartbeat,要求 Target 返回一個心跳響應。屬於主動觸發心跳請求。
public class HeartbeatManagerSenderImpl<I, O> extends HeartbeatManagerImpl<I, O> implements Runnable {
public void run() {
if (!stopped) {
for (HeartbeatMonitor<O> heartbeatMonitor : getHeartbeatTargets().values()) {
requestHeartbeat(heartbeatMonitor);
}
// 週期排程
getMainThreadExecutor().schedule(this, heartbeatPeriod, TimeUnit.MILLISECONDS);
}
}
// 主動發起心跳檢查
private void requestHeartbeat(HeartbeatMonitor<O> heartbeatMonitor) {
O payload = getHeartbeatListener().retrievePayload(heartbeatMonitor.getHeartbeatTargetId());
final HeartbeatTarget<O> heartbeatTarget = heartbeatMonitor.getHeartbeatTarget();
// 呼叫 Target 的 requestHeartbeat 函式
heartbeatTarget.requestHeartbeat(getOwnResourceID(), payload);
}
}
3.3.3 HeartbeatMonitorImpl
Heartbeat monitor管理心跳目標,它啟動一個ScheduledExecutor。
- 如果在timeout時間內沒有接收到心跳訊號,則判定心跳超時,通知給HeartbeatListener。
- 如果在timeout時間內接收到心跳訊號,則重置當前ScheduledExecutor。
public class HeartbeatMonitorImpl<O> implements HeartbeatMonitor<O>, Runnable {
/** Resource ID of the monitored heartbeat target. */
private final ResourceID resourceID; // 被監控的resource ID
/** Associated heartbeat target. */
private final HeartbeatTarget<O> heartbeatTarget; //心跳目標
private final ScheduledExecutor scheduledExecutor;
/** Listener which is notified about heartbeat timeouts. */
private final HeartbeatListener<?, ?> heartbeatListener; // 心跳監聽器
/** Maximum heartbeat timeout interval. */
private final long heartbeatTimeoutIntervalMs;
private volatile ScheduledFuture<?> futureTimeout;
// AtomicReference 使用
private final AtomicReference<State> state = new AtomicReference<>(State.RUNNING);
// 最近一次接收到心跳的時間
private volatile long lastHeartbeat;
// 報告心跳
public void reportHeartbeat() {
// 保留最近一次接收心跳時間
lastHeartbeat = System.currentTimeMillis();
// 接收心跳後,重置timeout執行緒
resetHeartbeatTimeout(heartbeatTimeoutIntervalMs);
}
// 心跳超時,觸發lister的notifyHeartbeatTimeout
public void run() {
// The heartbeat has timed out if we're in state running
if (state.compareAndSet(State.RUNNING, State.TIMEOUT)) {
heartbeatListener.notifyHeartbeatTimeout(resourceID);
}
}
// 重置TIMEOUT
void resetHeartbeatTimeout(long heartbeatTimeout) {
if (state.get() == State.RUNNING) {
//先取消執行緒,在重新開啟
cancelTimeout();
// 啟動超時執行緒
futureTimeout = scheduledExecutor.schedule(this, heartbeatTimeout, TimeUnit.MILLISECONDS);
// Double check for concurrent accesses (e.g. a firing of the scheduled future)
if (state.get() != State.RUNNING) {
cancelTimeout();
}
}
}
3.3.3 HeartbeatServices
建立heartbeat receivers and heartbeat senders,主要是對外提供服務。這裡我們可以看到:
- HeartbeatManagerImpl就是receivers。
- HeartbeatManagerSenderImpl就是senders。
public class HeartbeatServices {
// Creates a heartbeat manager which does not actively send heartbeats.
public <I, O> HeartbeatManager<I, O> createHeartbeatManager(...) {
return new HeartbeatManagerImpl<>(...);
}
// Creates a heartbeat manager which actively sends heartbeats to monitoring targets.
public <I, O> HeartbeatManager<I, O> createHeartbeatManagerSender(...) {
return new HeartbeatManagerSenderImpl<>(...);
}
}
0x04 初始化
4.1 心跳服務建立
心跳管理服務在Cluster入口建立。因為我們是除錯,所以在MiniCluster.start呼叫。
public void start() throws Exception {
......
heartbeatServices = HeartbeatServices.fromConfiguration(configuration);
......
}
HeartbeatServices.fromConfiguration會從Configuration中獲取配置資訊:
- 心跳間隔 heartbeat.interval
- 心跳超時時間 heartbeat.timeout
這個就是我們解決最開始問題的思路:從配置資訊入手,擴大心跳間隔。
public HeartbeatServices(long heartbeatInterval, long heartbeatTimeout) {
this.heartbeatInterval = heartbeatInterval;
this.heartbeatTimeout = heartbeatTimeout;
}
public static HeartbeatServices fromConfiguration(Configuration configuration) {
long heartbeatInterval = configuration.getLong(HeartbeatManagerOptions.HEARTBEAT_INTERVAL);
long heartbeatTimeout = configuration.getLong(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT);
return new HeartbeatServices(heartbeatInterval, heartbeatTimeout);
}
0x05 Flink中具體應用
5.1 總述
5.1.1 RM, JM, TM之間關係
系統中有幾個ResourceManager?整個 Flink 叢集中只有一個 ResourceManager。
系統中有幾個JobManager?JobManager 負責管理作業的執行。預設情況下,每個 Flink 叢集只有一個 JobManager 例項。JobManager 相當於整個叢集的 Master 節點,負責整個叢集的任務管理和資源管理。
系統中有幾個TaskManager?這個由具體啟動方式決定。比如Flink on Yarn,Session模式能夠指定拉起多少個TaskManager。 Per job模式中TaskManager數量是在提交作業時根據併發度動態計算,即Number of TM = Parallelism/numberOfTaskSlots。比如:有一個作業,Parallelism為10,numberOfTaskSlots為1,則TaskManager為10。
5.1.2 三者間心跳機制
Flink中ResourceManager、JobMaster、TaskExecutor三者之間存在相互檢測的心跳機制:
- ResourceManager會主動傳送請求探測JobMaster、TaskExecutor是否存活。
- JobMaster也會主動傳送請求探測TaskExecutor是否存活,以便進行任務重啟或者失敗處理。
我們之前講過,HeartbeatManagerSenderImpl屬於Sender,HeartbeatManagerImpl屬於Receiver。
- HeartbeatManagerImpl所處位置可以理解為client,存在於JobMaster與TaskExecutor中;
- HeartbeatManagerSenderImpl類,繼承 HeartbeatManagerImpl類,用於週期傳送心跳請求,所處位置可以理解為server, 存在於JobMaster、ResourceManager中。
ResourceManager 級別最高,所以兩個HM都是Sender,監控taskManager和jobManager
public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
extends FencedRpcEndpoint<ResourceManagerId>
implements ResourceManagerGateway, LeaderContender {
taskManagerHeartbeatManager = heartbeatServices.createHeartbeatManagerSender
jobManagerHeartbeatManager = heartbeatServices.createHeartbeatManagerSender
}
JobMaster級別中等,一個Sender, 一個Receiver,受到ResourceManager的監控,監控taskManager。
public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMasterGateway, JobMasterService {
taskManagerHeartbeatManager = heartbeatServices.createHeartbeatManagerSender
resourceManagerHeartbeatManager = heartbeatServices.createHeartbeatManager
}
TaskExecutor級別最低,兩個Receiver,分別被JM和RM疾控。
public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
this.jobManagerHeartbeatManager = return heartbeatServices.createHeartbeatManager
this.resourceManagerHeartbeatManager = return heartbeatServices.createHeartbeatManager
}
以JobManager和TaskManager為例。JM在啟動時會開啟週期排程,向已經註冊到JM中的TM發起心跳檢查,通過RPC呼叫TM的requestHeartbeat方法,重置對JM超時執行緒的呼叫,表示當前JM狀態正常。在TM的requestHeartbeat方法被呼叫後,通過RPC呼叫JM的receiveHeartbeat,重置對TM超時執行緒的呼叫,表示TM狀態正常。
5.2 初始化過程
5.2.1 TaskExecutor初始化
TM初始化生成了兩個Receiver HM。
public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
/** The heartbeat manager for job manager in the task manager. */
private final HeartbeatManager<AllocatedSlotReport, AccumulatorReport> jobManagerHeartbeatManager;
/** The heartbeat manager for resource manager in the task manager. */
private final HeartbeatManager<Void, TaskExecutorHeartbeatPayload> resourceManagerHeartbeatManager;
//初始化函式
this.jobManagerHeartbeatManager = createJobManagerHeartbeatManager(heartbeatServices, resourceId);
this.resourceManagerHeartbeatManager = createResourceManagerHeartbeatManager(heartbeatServices, resourceId);
}
生成HeartbeatManager時,就註冊了ResourceManagerHeartbeatListener和JobManagerHeartbeatListener。
此時,兩個HeartbeatManagerImpl中已經建立好對應monitor執行緒,只有在JM或者RM執行requestHeartbeat後,才會觸發該執行緒的執行。
5.2.2 JobMaster的初始化
JM生成了一個Sender HM,一個Receiver HM。這裡會註冊 TaskManagerHeartbeatListener 和 ResourceManagerHeartbeatListener
public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMasterGateway, JobMasterService {
private HeartbeatManager<AccumulatorReport, AllocatedSlotReport> taskManagerHeartbeatManager;
private HeartbeatManager<Void, Void> resourceManagerHeartbeatManager;
private void startHeartbeatServices() {
taskManagerHeartbeatManager = heartbeatServices.createHeartbeatManagerSender(
resourceId,
new TaskManagerHeartbeatListener(),
getMainThreadExecutor(),
log);
resourceManagerHeartbeatManager = heartbeatServices.createHeartbeatManager(
resourceId,
new ResourceManagerHeartbeatListener(),
getMainThreadExecutor(),
log);
}
}
5.2.3 ResourceManager初始化
JobMaster在啟動時候,會在startHeartbeatServices函式中生成兩個Sender HeartbeatManager。
taskManagerHeartbeatManager :HeartbeatManagerSenderImpl物件,會反覆啟動一個定時器,定時掃描需要探測的物件並且傳送心跳請求。
jobManagerHeartbeatManager :HeartbeatManagerSenderImpl,會反覆啟動一個定時器,定時掃描需要探測的物件並且傳送心跳請求。
taskManagerHeartbeatManager = heartbeatServices.createHeartbeatManagerSender(
resourceId,
new TaskManagerHeartbeatListener(),
getMainThreadExecutor(),
log);
jobManagerHeartbeatManager = heartbeatServices.createHeartbeatManagerSender(
resourceId,
new JobManagerHeartbeatListener(),
getMainThreadExecutor(),
log);
5.3 註冊過程
我們以TM與RM互動為例。TaskExecutor啟動之後,需要註冊到RM和JM中。
流程圖如下:
* 1. Run in Task Manager
*
* TaskExecutor.onStart //Life cycle
* |
* +----> [email protected]
* | //開始TM服務
* |
* +----> resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
* | // 開始連線到RM
* | // start by connecting to the ResourceManager
* |
* +----> [email protected]
* | // 當RM狀態變化之後,將回撥到這裡
* | // The listener for leader changes of the resource manager.
* |
* +----> [email protected]
* | // 以下三步呼叫是漸進的,就是與RM聯絡。
* |
* +----> [email protected]
* |
* +----> connectToResourceManager()@TaskExecutor
* | // 主要作用是生成了 TaskExecutorToResourceManagerConnection
* |
* +----> [email protected]
* | // 開始RPC呼叫,將會呼叫到其基類RegisteredRpcConnection的start
* |
* +----> [email protected]
* | // RegisteredRpcConnection實現了元件之間註冊聯絡的基本RPC
* |
* ~~~~~~~~ 這裡是 Akka RPC
* 2. Run in Resource Manager
* 現在程式執行序列到達了RM, 主要是新增一個Target到RM 的 Sender HM;
*
* [email protected]
* |
* +----> taskExecutorGatewayFuture.handleAsync
* | // 非同步呼叫到這裡
* |
* +----> [email protected]
* | // RM的內部實現,將把TM註冊到RM自己這裡
* |
* +----> taskManagerHeartbeatManager.monitorTarget
* | // 生成HeartbeatMonitor,
* |
* +----> heartbeatTargets.put(resourceID,heartbeatMonitor);
* | // 把Monitor放到 HM in TM之中,就是說TM開始監控了RM
* |
* ~~~~~~~~ 這裡是 Akka RPC
* 3. Run in Task Manager
* 現在程式回到了TM, 主要是新增一個Target到 TM 的 Receiver HM;
*
* [email protected]
* |
* |
* +----> [email protected]
* | // 回撥函式
* |
* +----> runAsync(establishResourceManagerConnection)
* | // 非同步執行
* |
* +----> [email protected]
* | // 說明已經和RM建立了聯絡,所以可以開始監控RM了
* |
* +----> resourceManagerHeartbeatManager.monitorTarget
* | // 生成HeartbeatMonitor,
* |
* +----> heartbeatTargets.put(resourceID,heartbeatMonitor);
* | // 把 RM 也註冊到 TM了
* | // monitor the resource manager as heartbeat target
下面是具體文字描述。
5.3.1 TM註冊到RM中
5.3.1.1 TM的操作
- TaskExecutor啟動之後,呼叫onStart,開始其生命週期。
- onStart直接呼叫startTaskExecutorServices。
- 啟動服務的第一步就是與ResourceManager取得聯絡,這裡註冊了一個ResourceManagerLeaderListener(),用來監聽RM Leader的變化。
private final LeaderRetrievalService resourceManagerLeaderRetriever;
// resourceManagerLeaderRetriever其實是EmbeddedLeaderService的實現,A simple leader election service, which selects a leader among contenders and notifies listeners.
resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
- 當得到RM Leader的地址之後,會呼叫到回撥函式[email protected],然後呼叫notifyOfNewResourceManagerLeader。
- notifyOfNewResourceManagerLeader中獲取到RM地址後,就通過reconnectToResourceManager與RM聯絡。
- reconnectToResourceManager中間接呼叫到TaskExecutorToResourceManagerConnection。其作用是建立TaskExecutor 和 ResourceManager之間的聯絡。因為知道 ResourceManagerGateway所以才能進行RPC操作。
- 然後在 TaskExecutorToResourceManagerConnection中,就通過RPC與RM聯絡。
5.3.1.2 RM的操作
- RPC呼叫後,程式就來到了RM中,RM做如下操作:
- 會註冊一個新的TaskExecutor到自己的taskManagerHeartbeatManager中。
- [email protected]會通過非同步呼叫到registerTaskExecutorInternal。
- registerTaskExecutorInternal中首先看看是否這個TaskExecutor的ResourceID之前註冊過,如果註冊過就移除再新增一個新的TaskExecutor。
- 通過 taskManagerHeartbeatManager.monitorTarget 開始進行心跳機制的註冊。
taskManagerHeartbeatManager.monitorTarget(taskExecutorResourceId, new HeartbeatTarget<Void>() {
public void receiveHeartbeat(ResourceID resourceID, Void payload) {
// the ResourceManager will always send heartbeat requests to the
// TaskManager
}
public void requestHeartbeat(ResourceID resourceID, Void payload) {
taskExecutorGateway.heartbeatFromResourceManager(resourceID);
}
});
當註冊完成後,RM中的Sender HM內部結構如下,能看出來多了一個Target:
taskManagerHeartbeatManager = {[email protected]}
heartbeatPeriod = 10000
heartbeatTimeoutIntervalMs = 50000
ownResourceID = {[email protected]} "040709f36ebf38f309fed518a88946af"
heartbeatListener = {[email protected]}
mainThreadExecutor = {[email protected]}
heartbeatTargets = {[email protected]} size = 1
{[email protected]} "630c15c9-4861-4b41-9c95-92504f458b71" -> {[email protected]}
key = {[email protected]} "630c15c9-4861-4b41-9c95-92504f458b71"
value = {[email protected]}
resourceID = {[email protected]} "630c15c9-4861-4b41-9c95-92504f458b71"
heartbeatTarget = {[email protected]}
scheduledExecutor = {[email protected]}
heartbeatListener = {[email protected]}
heartbeatTimeoutIntervalMs = 50000
futureTimeout = {[email protected]}
state = {[email protected]} "RUNNING"
lastHeartbeat = 0
5.3.1.3 返回到TM
RM會通過RPC再次回到TaskExecutor,其新執行序列如下:
- 首先RPC呼叫到了 [email protected]。
- 然後[email protected]中通過非同步執行呼叫到了establishResourceManagerConnection。這說明TM已經和RM建立了聯絡,所以可以開始監控RM了。
- 然後和RM操作類似,通過resourceManagerHeartbeatManager.monitorTarget 來把RM註冊到自己這裡。
HeartbeatMonitor<O> heartbeatMonitor = heartbeatMonitorFactory.createHeartbeatMonitor
heartbeatTargets.put(resourceID, heartbeatMonitor);
當註冊完成後,其Receiver HM結構如下:
resourceManagerHeartbeatManager = {[email protected]}
heartbeatTimeoutIntervalMs = 50000
ownResourceID = {[email protected]} "96a9b80c-dd97-4b63-9049-afb6662ea3e2"
heartbeatListener = {[email protected]}
mainThreadExecutor = {[email protected]}
heartbeatTargets = {[email protected]} size = 1
{[email protected]} "122fa66685133b11ea26ee1b1a6cef75" -> {[email protected]}
key = {[email protected]} "122fa66685133b11ea26ee1b1a6cef75"
value = {[email protected]}
resourceID = {[email protected]} "122fa66685133b11ea26ee1b1a6cef75"
heartbeatTarget = {[email protected]}
scheduledExecutor = {[email protected]}
heartbeatListener = {[email protected]}
heartbeatTimeoutIntervalMs = 50000
futureTimeout = {[email protected]}
state = {[email protected]} "RUNNING"
lastHeartbeat = 0
5.3.2 TM註冊到 JM
其呼叫基本思路與之前相同,就是TM和JM之間互相註冊一個代表對方的monitor:
JobLeaderListenerImpl ----> establishJobManagerConnection
訊息到了JM中,做如下操作。
registerTaskManager ----> taskManagerHeartbeatManager.monitorTarget
// monitor the task manager as heartbeat target
5.4 心跳過程
在任務提交之後,我們就進入了正常的心跳監控流程。我們依然用 TM 和 RM進行演示。
我們先給出一個流程圖。
* 1. Run in Resouce Manager
*
* HeartbeatManagerSender in RM
* |
* +----> [email protected]
* | //遍歷所有監控的Monitor(Target),逐一在Target上呼叫requestHeartbeat
* |
* +----> [email protected]
* | // 將呼叫具體監控物件的自定義函式
* | // heartbeatTarget.requestHeartbeat(getOwnResourceID(), payload);
* |
* +----> getHeartbeatListener().retrievePayload
* | // 呼叫到[email protected]
* | // 這裡是return null;,因為RM不會是任何人的Receiver
* |
* +----> [email protected]
* | // 呼叫到Target這裡,程式碼在ResourceManager這裡,就是生成Target時候賦值的
* |
* +----> taskExecutorGateway.heartbeatFromResourceManager
* | // 會通過gateway RPC 呼叫到TM,這就是主動對TM發起了心跳請求
* |
* ~~~~~~~~ 這裡是 Akka RPC
* 2. Run in Task Manager
* 現在程式執行序列到達了TM, 主要是 1. 重置TM的Monitor執行緒; 2.返回一些負載資訊;
*
* [email protected]
* |
* +----> resourceManagerHeartbeatManager.requestHeartbeat(resourceID, null);
* | //開始要呼叫到 Receiver HM in Task Manager
* |
* +----> [email protected] in TM
* | // 在Receiver HM in Task Manager 這裡執行
* |
* +----> [email protected]
* | //reportHeartbeat : 記錄發起請求的這個時間點,然後resetHeartbeatTimeout
* |
* +----> [email protected]
* | // 如果Monitor狀態依然是RUNNING,則取消之前設定的ScheduledFuture。
* | // 重新建立一個ScheduleFuture。因為如果不取消,則之前那個ScheduleFuture執行時
* | // 會呼叫HeartbeatMonitorImpl.run函式,run直接compareAndSet後,通知目標函式
* | // 目前已經超時,即呼叫heartbeatListener.notifyHeartbeatTimeout。
* | // 這裡代表 JM 狀態正常。
* |
* +----> heartbeatListener.reportPayload
* | // 把Target節點的最新的heartbeatPayload通知給heartbeatListener。
* | // heartbeatListerner是外部傳入的,它根據所擁有的節點的心跳記錄做監聽管理。
* |
* +----> heartbeatTarget.receiveHeartbeat(getOwnResourceID(), heartbeatListener.retrievePayload(requestOrigin));
* |
* |
* +----> [email protected] in TM
* | // heartbeatTarget.receiveHeartbeat引數呼叫的
* |
* +----> return new TaskExecutorHeartbeatPayload
* |
* |
* +----> receiveHeartbeat in TM
* | // 回到 heartbeatTarget.receiveHeartbeat,這就是TM生成Target的時候的自定義函式
* | // 就是響應一個心跳訊息回給RM
* |
* +----> resourceManagerGateway.heartbeatFromTaskManager
* | // 會通過gateway RPC 呼叫到 ResourcManager
* |
* ~~~~~~~~ 這裡是 Akka RPC
* 3. Run in Resouce Manager
* 現在程式回到了RM, 主要是 1.重置RM的Monitor執行緒;2. 上報收到TaskExecutor的負載資訊
*
* heartbeatFromTaskManager in RM
* |
* |
* +----> taskManagerHeartbeatManager.receiveHeartbeat
* | // 這是個Sender HM
* |
* +----> HeartbeatManagerImpl.receiveHeartbeat
* |
* |
* +----> HeartbeatManagerImpl.reportHeartbeat(heartbeatOrigin);
* |
* |
* +----> heartbeatMonitor.reportHeartbeat();
* | // 這裡就是重置RM 這裡對應的Monitor。在reportHeartbeat重置 JM monitor執行緒的觸發,即cancelTimeout取消註冊時候的超時定時任務,並且註冊下一個超時檢測futureTimeout;這代表TM正常執行。
* |
* +----> heartbeatListener.reportPayload
* | //把Target節點的最新的heartbeatPayload通知給 TaskManagerHeartbeatListener。heartbeatListerner是外部傳入的,它根據所擁有的節點的心跳記錄做監聽管理。
* |
* +----> slotManager.reportSlotStatus(instanceId, payload.getSlotReport());
* | // TaskManagerHeartbeatListener中呼叫,上報收到TaskExecutor的負載資訊
* |
下面是具體文字描述。
5.4.1 ResourceManager主動發起
5.4.1.1 Sender遍歷所有監控的Monitor(Target)
心跳機制是由Sender主動發起的。這裡就是 ResourceManager 的HeartbeatManagerSenderImpl中定時schedual呼叫,這裡會遍歷所有監控的Monitor(Target),逐一在Target上呼叫requestHeartbeat。
// HeartbeatManagerSenderImpl中的程式碼
@Override
public void run() {
if (!stopped) {
for (HeartbeatMonitor<O> heartbeatMonitor : getHeartbeatTargets().values()) {
// 這裡向被監控物件節點發起一次心跳請求,載荷是heartbeatPayLoad,要求被監控物件迴應心跳
requestHeartbeat(heartbeatMonitor);
}
getMainThreadExecutor().schedule(this, heartbeatPeriod, TimeUnit.MILLISECONDS);
}
}
}
// 執行時候的變數
this = {[email protected]}
heartbeatPeriod = 10000
heartbeatTimeoutIntervalMs = 50000
ownResourceID = {[email protected]} "d349506cae32cadbe99b9f9c49a01c95"
heartbeatListener = {[email protected]}
mainThreadExecutor = {[email protected]}
// 呼叫棧如下
requestHeartbeat:711, ResourceManager$2 (org.apache.flink.runtime.resourcemanager)
requestHeartbeat:702, ResourceManager$2 (org.apache.flink.runtime.resourcemanager)
requestHeartbeat:92, HeartbeatManagerSenderImpl (org.apache.flink.runtime.heartbeat)
run:81, HeartbeatManagerSenderImpl (org.apache.flink.runtime.heartbeat)
call:511, Executors$RunnableAdapter (java.util.concurrent)
run$$$capture:266, FutureTask (java.util.concurrent)
run:-1, FutureTask (java.util.concurrent)
5.4.1.2 Target進行具體操作
具體監控物件 Target 會呼叫自定義的requestHeartbeat。
HeartbeatManagerSenderImpl
private void requestHeartbeat(HeartbeatMonitor<O> heartbeatMonitor) {
O payload = getHeartbeatListener().retrievePayload(heartbeatMonitor.getHeartbeatTargetId());
final HeartbeatTarget<O> heartbeatTarget = heartbeatMonitor.getHeartbeatTarget();
// 這裡就是具體監控物件
heartbeatTarget.requestHeartbeat(getOwnResourceID(), payload);
}
heartbeatTarget = {[email protected]}
taskExecutorGateway = {[email protected]} "[email protected]"
this$0 = {[email protected]}
請注意,每一個Target都是由ResourceManager生成的。ResourceManager之前註冊成為Monitor時候就註冊了這個HeartbeatTarget。
這個HeartbeatTarget的定義如下,兩個函式是:
receiveHeartbeat :這個是空,因為RM沒有自己的Sender。
requestHeartbeat :這個針對TM,就是呼叫TM的heartbeatFromResourceManager,當然是通過RPC呼叫。
5.4.1.3 RPC呼叫
會呼叫到ResourceManager定義的函式requestHeartbeat,而requestHeartbeat會通過gateway呼叫到TM,這就是主動對TM發起了心跳請求。
taskManagerHeartbeatManager.monitorTarget(taskExecutorResourceId, new HeartbeatTarget<Void>() {
@Override
public void receiveHeartbeat(ResourceID resourceID, Void payload) {
// the ResourceManager will always send heartbeat requests to the TaskManager
}
@Override
public void requestHeartbeat(ResourceID resourceID, Void payload) {
//就是呼叫到這裡
taskExecutorGateway.heartbeatFromResourceManager(resourceID);
}
});
5.4.2 RM通過RPC呼叫TM
通過taskExecutorGateway。心跳程式執行就通過RPC從RM跳躍到了TM。
taskExecutorGateway.heartbeatFromResourceManager
的意義就是:通過RPC呼叫回到TaskExecutor。這個是在TaskExecutorGateway就定義好的。
// TaskExecutor RPC gateway interface.
public interface TaskExecutorGateway extends RpcGateway
TaskExecutor實現了TaskExecutorGateway,所以具體在TaskExecutor內部實現了介面函式。
@Override
public void heartbeatFromResourceManager(ResourceID resourceID) {
//呼叫到了這裡 ...........
resourceManagerHeartbeatManager.requestHeartbeat(resourceID, null);
}
TM中,resourceManagerHeartbeatManager 定義如下。
/** The heartbeat manager for resource manager in the task manager. */
private final HeartbeatManager<Void, TaskExecutorHeartbeatPayload> resourceManagerHeartbeatManager;
所以下面就是執行TM中的Receiver HM。在這個過程中有兩個處理步驟:
- 呼叫對應HeartbeatMonitor的reportHeartbeat方法,cancelTimeout取消註冊時候的超時定時任務,並且註冊下一個超時檢測futureTimeout;
- 呼叫monitorTarget的receiveHeartbeat方法,也就是會通過rpc呼叫JobMaster的heartbeatFromTaskManager方法返回一些負載資訊;
具體是呼叫 [email protected]。在其中會
- 呼叫[email protected],記錄發起請求的這個時間點,然後resetHeartbeatTimeout。
- 在[email protected]之中,如果Monitor狀態依然是RUNNING,則取消之前設定的ScheduledFuture。重新建立一個ScheduleFuture。因為如果不取消,則之前那個ScheduleFuture執行時會呼叫HeartbeatMonitorImpl.run函式,run直接compareAndSet後,通知目標函式目前已經超時,即呼叫heartbeatListener.notifyHeartbeatTimeout。
- 呼叫 heartbeatListener.reportPayload,把Target節點的最新的heartbeatPayload通知給heartbeatListener。
- 呼叫 heartbeatTarget.receiveHeartbeat(getOwnResourceID(), heartbeatListener.retrievePayload(requestOrigin)); 就是響應一個心跳訊息回給RM。
@Override
public void requestHeartbeat(final ResourceID requestOrigin, I heartbeatPayload) {
if (!stopped) {
log.debug("Received heartbeat request from {}.", requestOrigin);
final HeartbeatTarget<O> heartbeatTarget = reportHeartbeat(requestOrigin);
if (heartbeatTarget != null) {
if (heartbeatPayload != null) {
heartbeatListener.reportPayload(requestOrigin, heartbeatPayload);
}
heartbeatTarget.receiveHeartbeat(getOwnResourceID(), heartbeatListener.retrievePayload(requestOrigin));
}
}
}
最後會通過resourceManagerGateway.heartbeatFromTaskManager
呼叫到 ResourcManager。
5.4.3 TM 通過RPC回到 RM
JobMaster在接收到rpc請求後呼叫其heartbeatFromTaskManager方法,會呼叫taskManagerHeartbeatManager的receiveHeartbeat方法,在這個過程中同樣有兩個處理步驟:
- 呼叫對應HeartbeatMonitor的reportHeartbeat方法,cancelTimeout取消註冊時候的超時定時任務,並且註冊下一個超時檢測futureTimeout;
- 呼叫TaskManagerHeartbeatListener的reportPayload方法,上報收到TaskExecutor的負載資訊
至此一次完成心跳過程已經完成,會根據heartbeatInterval執行下一次心跳。
5.5 超時處理
5.5.1 TaskManager
首先,在HeartbeatMonitorImpl中,如果超時,會呼叫Listener。
public void run() {
// The heartbeat has timed out if we're in state running
if (state.compareAndSet(State.RUNNING, State.TIMEOUT)) {
heartbeatListener.notifyHeartbeatTimeout(resourceID);
}
}
這就來到了ResourceManagerHeartbeatListener,會嘗試再次連線RM。
private class ResourceManagerHeartbeatListener implements HeartbeatListener<Void, TaskExecutorHeartbeatPayload> {
@Override
public void notifyHeartbeatTimeout(final ResourceID resourceId) {
validateRunsInMainThread();
// first check whether the timeout is still valid
if (establishedResourceManagerConnection != null && establishedResourceManagerConnection.getResourceManagerResourceId().equals(resourceId)) {
reconnectToResourceManager(new TaskManagerException(
String.format("The heartbeat of ResourceManager with id %s timed out.", resourceId)));
} else {
.....
}
}
5.5.2 ResourceManager
RM就直接簡單粗暴,關閉連線。
private class TaskManagerHeartbeatListener implements HeartbeatListener<TaskExecutorHeartbeatPayload, Void> {
@Override
public void notifyHeartbeatTimeout(final ResourceID resourceID) {
validateRunsInMainThread();
closeTaskManagerConnection(
resourceID,
new TimeoutException("The heartbeat of TaskManager with id " + resourceID + " timed out."));
}
}
0x06 解決問題
心跳機制我們講解完了,但是我們最初提到的異常應該如何解決呢?在程式最開始生成環境變數時候,通過設定環境變數的配置即可搞定:
Configuration conf = new Configuration();
conf.setString("heartbeat.timeout", "18000000");
final LocalEnvironment env = ExecutionEnvironment.createLocalEnvironment(conf);