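Root-cause analysis of a split-brain during YARN ResourceManager active/standby failover
The split-brain incident
Sequence of events:
The ResourceManager hit an exception while communicating with ZooKeeper: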
2018-10-19 09:17:49,981 INFO org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore: Error storing info for AMRMTokenSecretManager
java.io.IOException: Wait for ZKClient creation timed out
at org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore$ZKAction.runWithCheck(ZKRMStateStore.java:1119)
at org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore$ZKAction.runWithRetries(ZKRMStateStore.java:1155)
at org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore.doStoreMultiWithRetries(ZKRMStateStore.java:947)
at org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore.doStoreMultiWithRetries(ZKRMStateStore.java:961)
at org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore.setDataWithRetries(ZKRMStateStore.java:1002)
at org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore.storeOrUpdateAMRMTokenSecretManagerState(ZKRMStateStore.java:1244)
at org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager.rollMasterKey(AMRMTokenSecretManager.java:148)
at org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager$MasterKeyRoller.run(AMRMTokenSecretManager.java:134)
at java.util.TimerThread.mainLoop(Timer.java:555)
at java.util.TimerThread.run(Timer.java:505)
This is where it happens in the code:
public synchronized void storeOrUpdateAMRMTokenSecretManagerState(
    AMRMTokenSecretManagerState amrmTokenSecretManagerState, boolean isUpdate) {
    AMRMTokenSecretManagerState data =
        AMRMTokenSecretManagerState.newInstance(amrmTokenSecretManagerState);
    byte[] stateData = data.getProto().toByteArray();
    try {
        setDataWithRetries(amrmTokenSecretManagerRoot, stateData, -1);
    } catch (Exception ex) {
        // the exception surfaces here
        LOG.info("Error storing info for AMRMTokenSecretManager", ex);
        notifyStoreOperationFailed(ex);
    }
}
The catch block calls notifyStoreOperationFailed to handle the exception:
/**
 * This method is called to notify the ResourceManager that the store
 * operation has failed.
 * @param failureCause the exception due to which the operation failed
 */
protected void notifyStoreOperationFailed(Exception failureCause) {
    if (failureCause instanceof StoreFencedException) {
        // the failure is a StoreFencedException
        Thread standByTransitionThread =
            new Thread(new StandByTransitionThread());
        standByTransitionThread.setName("StandByTransitionThread Handler");
        standByTransitionThread.start();
    } else {
        // not a StoreFencedException: it cannot be handled and is treated as fatal
        rmDispatcher.getEventHandler().handle(
            new RMFatalEvent(RMFatalEventType.STATE_STORE_OP_FAILED, failureCause));
    }
}
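Note that, according to the log, the exception above is an IOException, not a StoreFencedException. The else branch is therefore taken: an RMFatalEvent is created and handed to the ResourceManager's dispatcher, rmDispatcher.
The handle() method called here does not actually process the event. It only puts the event on the AsyncDispatcher's eventQueue: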
class GenericEventHandler implements EventHandler<Event> {
    public void handle(Event event) {
        if (blockNewEvents) {
            return;
        }
        drained = false;
        /* all this method does is enqueue all the events onto the queue */
        int qSize = eventQueue.size();
        if (qSize != 0 && qSize % 1000 == 0) {
            LOG.info("Size of event-queue is " + qSize);
        }
        int remCapacity = eventQueue.remainingCapacity();
        if (remCapacity < 1000) {
            LOG.warn("Very low remaining capacity in the event-queue: "
                + remCapacity);
        }
        try {
            eventQueue.put(event); // put the event on eventQueue
        } catch (InterruptedException e) {
            if (!stopped) {
                LOG.warn("AsyncDispatcher thread interrupted", e);
            }
            // Need to reset drained flag to true if event queue is empty,
            // otherwise dispatcher will hang on stop.
            drained = eventQueue.isEmpty();
            throw new YarnRuntimeException(e);
        }
    };
}
A separate thread created by AsyncDispatcher then takes events off this eventQueue and processes them:
Runnable createThread() {
    return new Runnable() {
        @Override
        public void run() {
            while (!stopped && !Thread.currentThread().isInterrupted()) {
                drained = eventQueue.isEmpty(); // record whether the event queue is empty
                // blockNewEvents is only set when dispatcher is draining to stop,
                // adding this check is to avoid the overhead of acquiring the lock
                // and calling notify every time in the normal run of the loop.
                if (blockNewEvents) {
                    synchronized (waitForDrained) {
                        if (drained) {
                            waitForDrained.notify();
                        }
                    }
                }
                Event event;
                try {
                    event = eventQueue.take(); // block until an event is available
                } catch (InterruptedException ie) {
                    if (!stopped) {
                        LOG.warn("AsyncDispatcher thread interrupted", ie);
                    }
                    return;
                }
                if (event != null) {
                    dispatch(event); // process the event
                }
            }
        }
    };
}
The type of this event is RMFatalEventType.STATE_STORE_OP_FAILED, and the handler registered for that event type is ResourceManager.RMFatalEventDispatcher. The thread dump shows that the dispatcher thread is currently inside dispatch():
"AsyncDispatcher event handler" #62 prio=5 os_prio=0 tid=0x00007fdb7ca8e000 nid=0x3084 in Object.wait() [0x00007fcf19fc2000]
java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
at java.lang.Thread.join(Thread.java:1249)
- locked <0x00007fcfd48d9da0> (a org.apache.hadoop.util.ShutdownHookManager$1)
at java.lang.Thread.join(Thread.java:1323)
at java.lang.ApplicationShutdownHooks.runHooks(ApplicationShutdownHooks.java:106)
at java.lang.ApplicationShutdownHooks$1.run(ApplicationShutdownHooks.java:46)
at java.lang.Shutdown.runHooks(Shutdown.java:123)
at java.lang.Shutdown.sequence(Shutdown.java:167)
at java.lang.Shutdown.exit(Shutdown.java:212)
- locked <0x00007fcfd48a01e8> (a java.lang.Class for java.lang.Shutdown)
at java.lang.Runtime.exit(Runtime.java:109)
at java.lang.System.exit(System.java:971)
at org.apache.hadoop.util.ExitUtil.terminate(ExitUtil.java:133)
at org.apache.hadoop.yarn.server.resourcemanager.ResourceManager$RMFatalEventDispatcher.handle(ResourceManager.java:752)
at org.apache.hadoop.yarn.server.resourcemanager.ResourceManager$RMFatalEventDispatcher.handle(ResourceManager.java:743)
at org.apache.hadoop.yarn.event.AsyncDispatcher.dispatch(AsyncDispatcher.java:176)
at org.apache.hadoop.yarn.event.AsyncDispatcher$1.run(AsyncDispatcher.java:108)
at java.lang.Thread.run(Thread.java:745)
Here is the RMFatalEventDispatcher.handle() method:
@Private
public static class RMFatalEventDispatcher
    implements EventHandler<RMFatalEvent> {
    @Override
    public void handle(RMFatalEvent event) {
        LOG.fatal("Received a " + RMFatalEvent.class.getName() + " of type " +
            event.getType().name() + ". Cause:\n" + event.getCause());
        ExitUtil.terminate(1, event.getCause());
    }
}
The handler writes a FATAL log line, which we also found in the RM log:
2018-10-19 09:17:49,983 FATAL org.apache.hadoop.yarn.server.resourcemanager.ResourceManager: Received a org.apache.hadoop.yarn.server.resourcemanager.RMFatalEvent of type STATE_STORE_OP_FAILED. Cause:
The handler then calls
ExitUtil.terminate(1, event.getCause());
which is implemented as:
/**
 * Terminate the current process. Note that terminate is the *only* method
 * that should be used to terminate the daemon processes.
 *
 * @param status
 *          exit code
 * @param msg
 *          message used to create the {@code ExitException}
 * @throws ExitException
 *           if System.exit is disabled for test purposes
 */
public static void terminate(int status, String msg) throws ExitException {
    LOG.info("Exiting with status " + status);
    if (systemExitDisabled) {
        ExitException ee = new ExitException(status, msg);
        LOG.fatal("Terminate called", ee);
        if (null == firstExitException) {
            firstExitException = ee;
        }
        throw ee;
    }
    System.exit(status);
}
The RM log contains the following line, which confirms that this is the path that was executed:
2018-10-19 09:17:49,984 INFO org.apache.hadoop.util.ExitUtil: Exiting with status 1
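System.exit() causes the shutdown hook that the ResourceManager registered at startup to run. The thread dump taken at the time confirms that the shutdown hook was indeed invoked: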
"Thread-2" #25 prio=5 os_prio=0 tid=0x00007fdb5c7f2800 nid=0x1d5d waiting for monitor entry [0x00007fcf0a3c2000]
java.lang.Thread.State: BLOCKED (on object monitor)
at org.apache.hadoop.ha.ActiveStandbyElector.quitElection(ActiveStandbyElector.java:355)
- waiting to lock <0x00007fcfd3a4a450> (a org.apache.hadoop.ha.ActiveStandbyElector)
at org.apache.hadoop.yarn.server.resourcemanager.EmbeddedElectorService.serviceStop(EmbeddedElectorService.java:113)
at org.apache.hadoop.service.AbstractService.stop(AbstractService.java:221)
- locked <0x00007fcfd40795c8> (a java.lang.Object)
at org.apache.hadoop.service.ServiceOperations.stop(ServiceOperations.java:52)
at org.apache.hadoop.service.ServiceOperations.stopQuietly(ServiceOperations.java:80)
at org.apache.hadoop.service.CompositeService.stop(CompositeService.java:157)
at org.apache.hadoop.service.CompositeService.serviceStop(CompositeService.java:131)
at org.apache.hadoop.yarn.server.resourcemanager.AdminService.serviceStop(AdminService.java:155)
at org.apache.hadoop.service.AbstractService.stop(AbstractService.java:221)
- locked <0x00007fcfd4079538> (a java.lang.Object)
at org.apache.hadoop.service.ServiceOperations.stop(ServiceOperations.java:52)
at org.apache.hadoop.service.ServiceOperations.stopQuietly(ServiceOperations.java:80)
at org.apache.hadoop.service.CompositeService.stop(CompositeService.java:157)
at org.apache.hadoop.service.CompositeService.serviceStop(CompositeService.java:131)
at org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.serviceStop(ResourceManager.java:1121)
at org.apache.hadoop.service.AbstractService.stop(AbstractService.java:221)
- locked <0x00007fcfd407c228> (a java.lang.Object)
at org.apache.hadoop.service.ServiceOperations.stop(ServiceOperations.java:52)
at org.apache.hadoop.service.ServiceOperations.stopQuietly(ServiceOperations.java:80)
at org.apache.hadoop.service.ServiceOperations.stopQuietly(ServiceOperations.java:65)
at org.apache.hadoop.service.CompositeService$CompositeServiceShutdownHook.run(CompositeService.java:183)
at org.apache.hadoop.util.ShutdownHookManager$1.run(ShutdownHookManager.java:54)
But this thread is blocked. It is waiting for the monitor of the ActiveStandbyElector object, because quitElection() is a synchronized method:
public synchronized void quitElection(boolean needFence) {
    LOG.info("Yielding from election");
    if (!needFence && state == State.ACTIVE) {
        // If active is gracefully going back to standby mode, remove
        // our permanent znode so no one fences us.
        tryDeleteOwnBreadCrumbNode();
    }
    reset();
    wantToBeInElection = false;
}
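A short aside on lock scope, as a minimal sketch: a synchronized instance method such as quitElection() takes the monitor of the object it is invoked on, which is why the thread dump reports the lock as "a org.apache.hadoop.ha.ActiveStandbyElector" rather than a java.lang.Class. The Elector class below is a hypothetical stand-in, not Hadoop code; it only uses Thread.holdsLock to show which monitor is held inside such a method.

```java
final class MonitorScopeDemo {
    // Hypothetical stand-in for a class with a synchronized instance method.
    static final class Elector {
        // A synchronized instance method locks the monitor of `this`.
        synchronized boolean[] probe() {
            return new boolean[] {
                Thread.holdsLock(this),          // monitor of this instance
                Thread.holdsLock(Elector.class)  // monitor of the Class object
            };
        }
    }

    // Returns {holdsInstanceMonitor, holdsClassMonitor} as seen inside probe().
    static boolean[] probe() {
        return new Elector().probe();
    }

    public static void main(String[] args) {
        boolean[] held = probe();
        System.out.println("holds instance monitor: " + held[0]);
        System.out.println("holds Class monitor:    " + held[1]);
    }
}
```

Two threads therefore contend only when they call synchronized methods on the same instance, which is the case here: the dump shows the same address, 0x00007fcfd3a4a450, on both sides.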
So the key questions are:
- Who took this lock?
- Why does the thread that took it never finish?
The thread dump shows who already holds the lock:
"main-EventThread" #897510 daemon prio=5 os_prio=0 tid=0x00007fdb65be1800 nid=0x5d01 waiting for monitor entry [0x00007fcf12f41000]
java.lang.Thread.State: BLOCKED (on object monitor)
at org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.transitionToStandby(ResourceManager.java:1066)
- waiting to lock <0x00007fcfd3b299e0> (a org.apache.hadoop.yarn.server.resourcemanager.ResourceManager)
at org.apache.hadoop.yarn.server.resourcemanager.AdminService.transitionToStandby(AdminService.java:322)
- locked <0x00007fcfd3a7b930> (a org.apache.hadoop.yarn.server.resourcemanager.AdminService)
at org.apache.hadoop.yarn.server.resourcemanager.EmbeddedElectorService.becomeStandby(EmbeddedElectorService.java:131)
at org.apache.hadoop.ha.ActiveStandbyElector.becomeStandby(ActiveStandbyElector.java:918)
at org.apache.hadoop.ha.ActiveStandbyElector.processResult(ActiveStandbyElector.java:429)
- locked <0x00007fcfd3a4a450> (a org.apache.hadoop.ha.ActiveStandbyElector)
at org.apache.zookeeper.ClientCnxn$EventThread.processEvent(ClientCnxn.java:605)
at org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:498)
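The main-EventThread thread already holds this monitor. processResult() is also synchronized on the same object, and the thread is already executing inside it, so it must have acquired the lock: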
/**
 * interface implementation of Zookeeper callback for create
 */
@Override
public synchronized void processResult(int rc, String path, Object ctx,
    String name) {
    if (isStaleClient(ctx)) return;
    LOG.debug("CreateNode result: " + rc + " for path: " + path
        + " connectionState: " + zkConnectionState +
        " for " + this);
    ...
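As an aside, the "who holds this lock?" question that the thread dump answers can also be put to a live JVM through the standard java.lang.management API. The sketch below is illustrative only: the thread names and the monitor object are made up for the example, and it merely shows that ThreadInfo.getLockOwnerName() reports the owner of the monitor a BLOCKED thread is waiting for.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

final class LockOwnerDemo {
    /** Blocks a thread on a held monitor and asks the JVM who owns that monitor. */
    static String ownerOfContendedMonitor() {
        final Object monitor = new Object();            // hypothetical contended lock
        final CountDownLatch held = new CountDownLatch(1);
        final CountDownLatch release = new CountDownLatch(1);

        Thread owner = new Thread(() -> {
            synchronized (monitor) {
                held.countDown();
                try {
                    release.await();                    // keep the monitor until told to let go
                } catch (InterruptedException ignored) {
                    // fall through and release the monitor
                }
            }
        }, "owner-thread");
        Thread waiter = new Thread(() -> {
            synchronized (monitor) {
                // entered only after owner-thread leaves its synchronized block
            }
        }, "waiter-thread");

        try {
            owner.start();
            held.await();
            waiter.start();
            // Poll until the waiter is parked on the monitor (bounded to 5 seconds).
            long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
            while (waiter.getState() != Thread.State.BLOCKED
                    && System.nanoTime() < deadline) {
                Thread.sleep(10);
            }
            ThreadInfo info =
                ManagementFactory.getThreadMXBean().getThreadInfo(waiter.getId());
            return info == null ? null : info.getLockOwnerName();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            release.countDown();                        // let both threads finish
        }
    }

    public static void main(String[] args) {
        System.out.println("waiter-thread is blocked by: " + ownerOfContendedMonitor());
    }
}
```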
At the same time, main-EventThread is itself in the BLOCKED state. The code shows that it is waiting for the monitor of the ResourceManager instance, because ResourceManager.transitionToStandby() is a synchronized method:
synchronized void transitionToStandby(boolean initialize)
    throws Exception {
    if (rmContext.getHAServiceState() ==
        HAServiceProtocol.HAServiceState.STANDBY) {
        LOG.info("Already in standby state");
        return;
    }
    LOG.info("Transitioning to standby state");
    if (rmContext.getHAServiceState() ==
        HAServiceProtocol.HAServiceState.ACTIVE) {
        stopActiveServices();
        reinitialize(initialize);
    }
    rmContext.setHAServiceState(HAServiceProtocol.HAServiceState.STANDBY);
    LOG.info("Transitioned to standby state");
}
The same question again: who already holds the ResourceManager monitor and has not released it? The thread dump answers this too:
"StandByTransitionThread Handler" #897459 daemon prio=5 os_prio=0 tid=0x0000000004cd0000 nid=0x5cce in Object.wait() [0x00007fcf191b9000]
java.lang.Thread.State: TIMED_WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
at org.apache.hadoop.yarn.event.AsyncDispatcher.serviceStop(AsyncDispatcher.java:143)
- locked <0x00007fcfd444f200> (a java.lang.Object)
at org.apache.hadoop.service.AbstractService.stop(AbstractService.java:221)
- locked <0x00007fcfd444f1e8> (a java.lang.Object)
at org.apache.hadoop.service.ServiceOperations.stop(ServiceOperations.java:52)
at org.apache.hadoop.service.ServiceOperations.stopQuietly(ServiceOperations.java:80)
at org.apache.hadoop.service.CompositeService.stop(CompositeService.java:157)
at org.apache.hadoop.service.CompositeService.serviceStop(CompositeService.java:131)
at org.apache.hadoop.service.AbstractService.stop(AbstractService.java:221)
- locked <0x00007fcfd444f1a0> (a java.lang.Object)
at org.apache.hadoop.service.ServiceOperations.stop(ServiceOperations.java:52)
at org.apache.hadoop.service.ServiceOperations.stopQuietly(ServiceOperations.java:80)
at org.apache.hadoop.service.CompositeService.stop(CompositeService.java:157)
at org.apache.hadoop.service.CompositeService.serviceStop(CompositeService.java:131)
at org.apache.hadoop.service.AbstractService.stop(AbstractService.java:221)
- locked <0x00007fcfd4430550> (a java.lang.Object)
at org.apache.hadoop.service.ServiceOperations.stop(ServiceOperations.java:52)
at org.apache.hadoop.service.ServiceOperations.stopQuietly(ServiceOperations.java:80)
at org.apache.hadoop.service.CompositeService.stop(CompositeService.java:157)
at org.apache.hadoop.service.CompositeService.serviceStop(CompositeService.java:131)
at org.apache.hadoop.yarn.server.resourcemanager.ResourceManager$RMActiveServices.serviceStop(ResourceManager.java:612)
at org.apache.hadoop.service.AbstractService.stop(AbstractService.java:221)
- locked <0x00007fcfd42ca768> (a java.lang.Object)
at org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.stopActiveServices(ResourceManager.java:1020)
at org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.transitionToStandby(ResourceManager.java:1075)
- locked <0x00007fcfd3b299e0> (a org.apache.hadoop.yarn.server.resourcemanager.ResourceManager)
at org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.handleTransitionToStandBy(ResourceManager.java:761)
at org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore$StandByTransitionThread.run(RMStateStore.java:722)
at java.lang.Thread.run(Thread.java:745)
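This thread went through ResourceManager.handleTransitionToStandBy() into transitionToStandby(). Since transitionToStandby() is a synchronized method, the thread already holds the monitor of the ResourceManager instance; the dump marks it as locked <0x00007fcfd3b299e0>, the same address main-EventThread is waiting for. The log shows this as well: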
2018-10-19 02:35:29,518 INFO org.apache.hadoop.yarn.server.resourcemanager.ResourceManager: Transitioning to standby state
So why does this thread hold the ResourceManager monitor without ever releasing it? Its state in the thread dump is TIMED_WAITING: it is sitting in a timed Object.wait() inside AsyncDispatcher.serviceStop():
@Override
protected void serviceStop() throws Exception {
    if (drainEventsOnStop) {
        blockNewEvents = true;
        LOG.info("AsyncDispatcher is draining to stop, igonring any new events.");
        synchronized (waitForDrained) {
            while (!drained && eventHandlingThread.isAlive()) {
                // keep waiting; the loop exits only once drained == true
                // or eventHandlingThread.isAlive() == false
                waitForDrained.wait(1000);
                LOG.info("Waiting for AsyncDispatcher to drain. Thread state is :" +
                    eventHandlingThread.getState());
            }
        }
    }
    stopped = true;
    if (eventHandlingThread != null) {
        eventHandlingThread.interrupt();
        try {
            eventHandlingThread.join();
        } catch (InterruptedException ie) {
            LOG.warn("Interrupted Exception while stopping", ie);
        }
    }
    // stop all the components
    super.serviceStop();
}
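A thread parked in Object.wait(timeout) is reported as TIMED_WAITING, the same state Thread.sleep() produces, so the state alone does not tell the two apart; the Object.wait frame at the top of the stack does. The following is a minimal sketch with hypothetical names (it borrows only the waitForDrained name from the code above) that puts a thread into the same kind of wait(1000) loop and observes its state from outside.

```java
import java.util.concurrent.TimeUnit;

final class TimedWaitStateDemo {
    /** Starts a thread that loops on wait(1000) and returns the state observed for it. */
    static Thread.State observeStateDuringTimedWait() {
        final Object waitForDrained = new Object();
        final boolean[] done = {false};                 // guarded by waitForDrained

        Thread waiter = new Thread(() -> {
            synchronized (waitForDrained) {
                while (!done[0]) {
                    try {
                        waitForDrained.wait(1000);      // timed wait, not sleep
                    } catch (InterruptedException e) {
                        return;
                    }
                }
            }
        }, "timed-waiter");

        try {
            waiter.start();
            Thread.State seen = waiter.getState();
            // Poll until the waiter is parked in wait(1000) (bounded to 5 seconds).
            long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
            while (seen != Thread.State.TIMED_WAITING && System.nanoTime() < deadline) {
                Thread.sleep(10);
                seen = waiter.getState();
            }
            // Let the waiter leave its loop so the demo terminates.
            synchronized (waitForDrained) {
                done[0] = true;
                waitForDrained.notifyAll();
            }
            waiter.join(5000);
            return seen;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("state while in wait(1000): " + observeStateDuringTimedWait());
    }
}
```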
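The thread stays in TIMED_WAITING, that is, wait(1000) is called over and over. This means the condition in while (!drained && eventHandlingThread.isAlive()) { always evaluates to true and the loop never ends. In other words:
drained = false
and eventHandlingThread.isAlive() = true
Let us look at these two predicates in turn: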
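- Why is drained = false? drained indicates whether the queue has been emptied: false means it has not, true means it has. From AsyncDispatcher.createThread(), listed above, the sequence is:
  - The queue still holds one last event, the RMFatalEvent above, so drained = eventQueue.isEmpty(); sets drained to false.
  - event = eventQueue.take(); removes the event from the queue.
  - dispatch() is called to process the event.
  Note that although the queue is now empty, drained has not been updated yet; that only happens on the next iteration of the while loop. Unfortunately, dispatch() is blocked, so drained stays false.
- Why is eventHandlingThread.isAlive() = true?
  - eventHandlingThread is exactly the thread created by createThread(). As explained above, this thread is currently inside dispatch(), in the WAITING state, so it is of course alive.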
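The stale-flag effect can be reproduced in isolation. The sketch below is a simplified model written for this article, not the Hadoop AsyncDispatcher: the class name, the latches, and the string event are all hypothetical. It keeps only the two relevant steps of the loop (refresh drained at the top, then take and handle one event) and blocks the handler, so the queue is observably empty while drained is still false.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;

final class StaleDrainedDemo {
    private final BlockingQueue<String> eventQueue = new LinkedBlockingQueue<>();
    private volatile boolean drained = true;
    private final CountDownLatch handlerEntered = new CountDownLatch(1);
    private final CountDownLatch unblockHandler = new CountDownLatch(1);

    // Simplified dispatcher loop: drained is refreshed only at the top of each iteration.
    private void dispatchLoop() {
        while (!Thread.currentThread().isInterrupted()) {
            drained = eventQueue.isEmpty();
            try {
                eventQueue.take();          // the queue becomes empty here
                handlerEntered.countDown();
                unblockHandler.await();     // the "handler" blocks, like dispatch() did
            } catch (InterruptedException e) {
                return;
            }
        }
    }

    /** Returns {queueIsEmpty, drainedFlag} observed while the handler is blocked. */
    static boolean[] run() {
        StaleDrainedDemo demo = new StaleDrainedDemo();
        demo.eventQueue.add("FATAL_EVENT"); // enqueue before the loop starts
        Thread dispatcher = new Thread(demo::dispatchLoop, "dispatcher");
        dispatcher.setDaemon(true);
        dispatcher.start();
        try {
            demo.handlerEntered.await();
            boolean queueEmpty = demo.eventQueue.isEmpty();
            boolean drainedFlag = demo.drained;
            // Clean up so the demo terminates.
            demo.unblockHandler.countDown();
            dispatcher.interrupt();
            dispatcher.join(5000);
            return new boolean[] {queueEmpty, drainedFlag};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        boolean[] r = run();
        System.out.println("queue empty: " + r[0] + ", drained flag: " + r[1]);
    }
}
```

Anyone who waits on the flag rather than on the queue itself sees "not drained" for as long as the handler stays blocked.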
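To summarize, the problem arose as follows:
- The ZooKeeper connection failed with an IOException that the state store could not handle, which raised an RMFatalEvent.
- The handler for this event is ResourceManager.RMFatalEventDispatcher. Its handling is blunt: it calls System.exit(1) to terminate the JVM.
- System.exit(1) runs the JVM shutdown hooks, and the calling thread (the AsyncDispatcher event handler thread) waits for them to finish.
- The shutdown hook calls ResourceManager.serviceStop() to stop the ResourceManager's services, but blocks while acquiring the monitor of the ActiveStandbyElector object, which means another thread already holds that lock.
- That other thread is a ZooKeeper callback. The callback fired earlier, so it obtained the ActiveStandbyElector monitor and started the transition to standby. The transition has to call ResourceManager.transitionToStandby, a synchronized method, which requires the monitor of the ResourceManager instance. That monitor, however, is already held by the StandByTransitionThread Handler thread.
- The thread holding the ResourceManager monitor loops, waiting for eventQueue to be drained.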
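As the above shows, the whole situation boils down to this:
- Thread A can never acquire lock L, so it hangs; because it hangs, the condition drained stays false.
- The thread that already holds lock L uses a while loop to keep checking and waiting for the condition drained to become true.
The two wait on each other, so neither can ever finish!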
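The whole cycle can be modeled with four plain threads. The sketch below is a hypothetical reconstruction for illustration and contains no Hadoop code: hook.join() stands in for System.exit() waiting on its shutdown hooks, electorLock and rmLock stand in for the ActiveStandbyElector and ResourceManager monitors, and the abort flag is a demo-only escape hatch (absent from the real code) so the program can terminate after taking a snapshot of the thread states.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

final class ShutdownCycleDemo {
    /** Builds the four-thread cycle and returns the thread states seen while stuck. */
    static Map<String, Thread.State> run() {
        final Object electorLock = new Object();   // stand-in for the elector monitor
        final Object rmLock = new Object();        // stand-in for the ResourceManager monitor
        final Object waitForDrained = new Object();
        final boolean[] drained = {false};         // never refreshed: the dispatcher is stuck
        final boolean[] abort = {false};           // demo-only escape hatch
        final CountDownLatch rmLocked = new CountDownLatch(1);
        final CountDownLatch electorLocked = new CountDownLatch(1);

        Thread hook = new Thread(() -> {
            synchronized (electorLock) { /* quitElection() would run here */ }
        }, "shutdown-hook");

        Thread dispatcher = new Thread(() -> {
            try {
                electorLocked.await();
                hook.start();
                hook.join();                       // like System.exit() waiting for its hooks
            } catch (InterruptedException ignored) { }
        }, "dispatcher");

        Thread standby = new Thread(() -> {
            synchronized (rmLock) {
                rmLocked.countDown();
                synchronized (waitForDrained) {
                    while (!drained[0] && dispatcher.isAlive() && !abort[0]) {
                        try {
                            waitForDrained.wait(1000);
                        } catch (InterruptedException e) {
                            return;
                        }
                    }
                }
            }
        }, "standby-transition");

        Thread zkCallback = new Thread(() -> {
            try {
                rmLocked.await();
            } catch (InterruptedException e) {
                return;
            }
            synchronized (electorLock) {
                electorLocked.countDown();
                synchronized (rmLock) { /* transitionToStandby() would run here */ }
            }
        }, "zk-callback");

        Thread[] all = {dispatcher, hook, zkCallback, standby};
        Map<String, Thread.State> states = new LinkedHashMap<>();
        try {
            dispatcher.start();
            standby.start();
            zkCallback.start();
            // Poll (bounded to 5 seconds) until all four threads are stuck.
            long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
            while (System.nanoTime() < deadline) {
                states.clear();
                for (Thread t : all) {
                    states.put(t.getName(), t.getState());
                }
                if (states.get("dispatcher") == Thread.State.WAITING
                        && states.get("shutdown-hook") == Thread.State.BLOCKED
                        && states.get("zk-callback") == Thread.State.BLOCKED
                        && states.get("standby-transition") == Thread.State.TIMED_WAITING) {
                    break;
                }
                Thread.sleep(20);
            }
            // Break one edge of the cycle so every thread can unwind.
            synchronized (waitForDrained) {
                abort[0] = true;
                waitForDrained.notifyAll();
            }
            for (Thread t : all) {
                t.join(5000);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return states;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The snapshot mirrors the four threads in the dump: one WAITING in join, two BLOCKED on monitors, and one TIMED_WAITING in the drain loop. Only part of the cycle consists of monitor waits (the rest is a join and a flag-polling loop), so the cycle has to be traced by hand through the stacks, as was done above.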