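Zookeeper Source Code Reading (11): ZK Client-Server (3)
Preface
The previous post covered how the client initializes and establishes its connection to the server. Both stages are closely tied to SendThread. This post covers the response stage, which involves both SendThread and EventThread.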
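Getting responses
For SendThread, getting a response comes down to the readResponse method, which the previous post covered in detail. The main flow is:
- Deserialize the response;
- Dispatch on the reply header: ping, auth and SASL replies are handled on the spot and the method returns, so they are never added to the waitingEvents queue;
- If the reply is a notification from the server, i.e. an event, add it to the queue;
- Otherwise, finish the matching packet that was already sent and is waiting in pendingQueue.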
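The dispatch order above can be sketched as a small classifier. This is a simplified illustration, not ClientCnxn's code. The xid values -1 (notification), -2 (ping) and -4 (auth) come from the snippets in this post. The `ReplyKind` enum, the `classify` method and the `saslInProgress` flag are hypothetical names that stand in for the real checks inside readResponse.

```java
/** Hypothetical categories a reply can fall into (not a ZooKeeper type). */
enum ReplyKind { PING, AUTH, NOTIFICATION, SASL, PENDING_PACKET }

class ReplyDispatcher {
    static final int NOTIFICATION_XID = -1; // server-pushed watch event
    static final int PING_XID = -2;         // heartbeat reply
    static final int AUTH_XID = -4;         // reply to an AuthPacket

    /**
     * Decides how a reply would be handled. Ping and auth return early,
     * notifications go to the event queue, SASL replies are answered
     * immediately (assumed to be keyed on a flag, not an xid), and
     * everything else completes the head of pendingQueue.
     */
    static ReplyKind classify(int xid, boolean saslInProgress) {
        if (xid == PING_XID) return ReplyKind.PING;
        if (xid == AUTH_XID) return ReplyKind.AUTH;
        if (xid == NOTIFICATION_XID) return ReplyKind.NOTIFICATION;
        if (saslInProgress) return ReplyKind.SASL;
        return ReplyKind.PENDING_PACKET;
    }
}
```

Only the last category touches pendingQueue. The first two never reach the waiting queue, and a notification is the one kind that does.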
The handling of ping, however, deserves a closer look.
if (replyHdr.getXid() == -2) { // ping reply: receiving it at all means the ping got through
    // -2 is the xid for pings
    if (LOG.isDebugEnabled()) {
        LOG.debug("Got ping response for sessionid: 0x"
                + Long.toHexString(sessionId)
                + " after "
                + ((System.nanoTime() - lastPingSentNs) / 1000000)
                + "ms"); // just log it
    }
    return;
}
if (replyHdr.getXid() == -4) { // reply header for auth
    // -4 is the xid for AuthPacket
    if (replyHdr.getErr() == KeeperException.Code.AUTHFAILED.intValue()) { // auth failed: queue an event for the EventThread
        state = States.AUTH_FAILED;
        eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None,
                Watcher.Event.KeeperState.AuthFailed, null));
    }
    if (LOG.isDebugEnabled()) {
        LOG.debug("Got auth sessionid:0x" + Long.toHexString(sessionId)); // log
    }
    return;
}
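As mentioned earlier, ping and auth requests are not added to pendingQueue. Their replies are handled directly when they arrive.

One point is worth stressing: readResponse plays no part in the client's heartbeat detection. For a ping reply it only writes a log line. Whether the connection is considered lost is decided in the send loop covered in the previous post, by checking how long it has been since the client last heard from the server.

How does the client know when that was? doIO is the method that handles the client's I/O with the server. It calls updateLastHeard() whenever it has read a message from the server.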
void updateLastHeard() { this.lastHeard = now; }
As you can see, this method simply records when the client last received something from the server.
to = readTimeout - clientCnxnSocket.getIdleRecv();
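Here `to` is readTimeout minus getIdleRecv(), the time elapsed since the client last received anything from the server. When the client is not yet connected, connectTimeout is used instead of readTimeout.

While connected, a non-positive `to` means the server has been silent for longer than readTimeout. The client then treats the session as timed out and throws an exception. Every received message refreshes lastHeard, ping replies included. That is why ping replies keep the connection alive through this check even though readResponse only logs them.

This raises a new question: how does the server know whether it is still connected to the client? That will be covered separately when we discuss sessions.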
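The check described above can be modeled with an explicit clock. This is a hypothetical, simplified class, not ClientCnxn code. It assumes readTimeout is two thirds of the negotiated session timeout, which matches our reading of ClientCnxn. It uses `IllegalStateException` as a stand-in for the client's own session-timeout exception, and it leaves out ping scheduling.

```java
/** Hypothetical model of the client-side read timeout check. */
class ReadTimeoutCheck {
    private final int readTimeoutMs;
    private long lastHeardMs; // refreshed whenever anything arrives from the server

    ReadTimeoutCheck(int negotiatedSessionTimeoutMs, long nowMs) {
        // Assumption: readTimeout = sessionTimeout * 2 / 3.
        this.readTimeoutMs = negotiatedSessionTimeoutMs * 2 / 3;
        this.lastHeardMs = nowMs;
    }

    /** Counterpart of updateLastHeard(): any reply, ping replies included, resets the clock. */
    void updateLastHeard(long nowMs) {
        lastHeardMs = nowMs;
    }

    /** Counterpart of getIdleRecv(): time since the last message from the server. */
    long idleRecv(long nowMs) {
        return nowMs - lastHeardMs;
    }

    /** The remaining budget `to`; zero or less means the session timed out. */
    long remaining(long nowMs) {
        return readTimeoutMs - idleRecv(nowMs);
    }

    /** Throws once `to` drops to zero or below, as the send loop does. */
    void check(long nowMs) {
        if (remaining(nowMs) <= 0) {
            // Stand-in for the real session-timeout exception.
            throw new IllegalStateException("have not heard from server in " + idleRecv(nowMs) + "ms");
        }
    }
}
```

Take a 30000 ms session timeout, which gives a 20000 ms readTimeout. A reply at 15000 ms restores the full 20000 ms budget. If nothing else arrives, check(36000) throws.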
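Events added to the waiting queue
From the analysis so far, SendThread keeps some of the Packets it has sent waiting in pendingQueue. When replies come back from the server, it adds events to the waiting queue. Processing these events is EventThread's main job. First, let's list which events get added to the waiting queue.
- Events for failed auth.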
// -4 is the xid for AuthPacket
if(replyHdr.getErr() == KeeperException.Code.AUTHFAILED.intValue()) { //ClientCnxn 756
state = States.AUTH_FAILED;
eventThread.queueEvent( new WatchedEvent(Watcher.Event.EventType.None,
Watcher.Event.KeeperState.AuthFailed, null) );
}
- Events for triggered watchers
eventThread.queueEvent( we );//ClientCnxn 794
- Events for SASL configuration failure
// An authentication error occurred when the SASL client tried to initialize:
// for Kerberos this means that the client failed to authenticate with the KDC.
// This is different from an authentication error that occurs during communication
// with the Zookeeper server, which is handled below.
LOG.warn("SASL configuration failed: " + e + " Will continue connection to Zookeeper server without "
+ "SASL authentication, if Zookeeper server allows it.");
eventThread.queueEvent(new WatchedEvent(//ClientCnxn 1012
Watcher.Event.EventType.None,
Watcher.Event.KeeperState.AuthFailed, null));
- SASL authentication state events
eventThread.queueEvent(new WatchedEvent(//ClientCnxn 1094
Watcher.Event.EventType.None,
authState,null));
- Disconnection events
eventThread.queueEvent(new WatchedEvent(//ClientCnxn 1175 1188
Event.EventType.None,
Event.KeeperState.Disconnected,
null));
- Session expiration events
eventThread.queueEvent(new WatchedEvent(//ClientCnxn 1280
Watcher.Event.EventType.None,
Watcher.Event.KeeperState.Expired, null));
- Connection events (SyncConnected, or ConnectedReadOnly for a read-only connection)
KeeperState eventState = (isRO) ?
KeeperState.ConnectedReadOnly : KeeperState.SyncConnected;
eventThread.queueEvent(new WatchedEvent( //ClientCnxn 1309
Watcher.Event.EventType.None,
eventState, null));
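In a sense, these 7 kinds of events are all notifications originating from the server side, so they must be handled by EventThread. Many Packets that SendThread sends out (create, getData and so on) are also added to the waiting queue, but with a restriction: only requests made through the asynchronous API are queued.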
EventThread
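EventThread is an event-processing thread inside the client's ClientCnxn. It handles client-side events and triggers the Watchers the client has registered.

Its waitingEvents queue temporarily holds the objects that need to be fired. These include the Watchers registered by the client and the AsyncCallback callbacks registered through the asynchronous API.

EventThread keeps taking objects out of waitingEvents and determines their concrete type (Watcher or AsyncCallback). It then calls process or processResult respectively to fire the event or run the callback.
Code structure:
Fields: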
private final LinkedBlockingQueue<Object> waitingEvents =
new LinkedBlockingQueue<Object>();
/** This is really the queued session state until the event
* thread actually processes the event and hands it to the watcher.
* But for all intents and purposes this is the state.
*/
private volatile KeeperState sessionState = KeeperState.Disconnected;
private volatile boolean wasKilled = false;
private volatile boolean isRunning = false;
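The most important field here is waitingEvents, the FIFO queue referred to earlier as the waiting queue. Below are the kinds of data the queue handles.
Adding server notifications to the queue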
public void queueEvent(WatchedEvent event) {
if (event.getType() == EventType.None
&& sessionState == event.getState()) { // an event of type None whose session state has not changed is not queued
return;
}
sessionState = event.getState(); // record the new session state
// materialize the watchers based on the event
WatcherSetEventPair pair = new WatcherSetEventPair(// pair the watchers for this state/type/path with the event (materialize was covered earlier)
watcher.materialize(event.getState(), event.getType(),
event.getPath()),
event);// which watchers are returned depends on the event type
// queue the pair (watch set & event) for later processing
waitingEvents.add(pair);// add to the queue to await processing
}
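The filtering rule at the top of queueEvent can be isolated in a small model. This is a hypothetical sketch. `StateEventFilter`, its two enums and the string queue entries are invented for illustration. They replace WatchedEvent and watcher materialization, which are left out.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, trimmed-down model of the state-event filtering in queueEvent. */
class StateEventFilter {
    enum Type { NONE, NODE_DATA_CHANGED }
    enum State { DISCONNECTED, SYNC_CONNECTED, EXPIRED }

    private State sessionState = State.DISCONNECTED; // same initial value as EventThread
    final List<String> queued = new ArrayList<String>();

    void queueEvent(Type type, State state, String path) {
        // A pure state event (type NONE) that repeats the current state is dropped.
        if (type == Type.NONE && sessionState == state) {
            return;
        }
        sessionState = state; // remember the latest state seen
        queued.add(type + "/" + state + "/" + path);
    }
}
```

The effect is that repeated connection-state notifications reach watchers only once per actual state change. Node events such as NODE_DATA_CHANGED always pass through.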
Adding Packets of asynchronous requests to the queue
private void finishPacket(Packet p) {
if (p.watchRegistration != null) {
p.watchRegistration.register(p.replyHeader.getErr());
}
if (p.cb == null) {// synchronous mode
synchronized (p) {
p.finished = true;
p.notifyAll();// a synchronous call blocks in submitRequest, and sync APIs have no callback, so the packet is never queued
// the part of submitRequest that waits:
//synchronized (packet) {
// while (!packet.finished) {
// packet.wait();
// }
//}
}
} else {// asynchronous mode
p.finished = true;
eventThread.queuePacket(p);// async call: add the packet to the waiting queue
}
}
The code above explains why only calls made through the asynchronous API add the packet to the queue.
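The hand-off between the synchronous and asynchronous paths can be sketched with plain Java monitors. The classes `MiniPacket` and `MiniFinisher` are hypothetical stand-ins for Packet and ClientCnxn, and a `Runnable` plays the role of the callback. Only the null-callback test and the wait/notifyAll pattern are taken from the code above.

```java
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical stand-in for Packet: only the fields this sketch needs. */
class MiniPacket {
    final Runnable cb;        // null for synchronous calls
    boolean finished = false; // guarded by the packet's own monitor on the sync path

    MiniPacket(Runnable cb) {
        this.cb = cb;
    }
}

class MiniFinisher {
    final LinkedBlockingQueue<Object> waitingEvents = new LinkedBlockingQueue<Object>();

    /** Mirrors finishPacket: wake a blocked caller, or hand the packet to the event queue. */
    void finishPacket(MiniPacket p) {
        if (p.cb == null) {
            synchronized (p) {
                p.finished = true;
                p.notifyAll(); // releases the thread blocked in awaitFinished()
            }
        } else {
            p.finished = true;
            waitingEvents.add(p); // an event thread would run p.cb later
        }
    }

    /** Mirrors the wait loop in submitRequest; the loop guards against spurious wakeups. */
    static void awaitFinished(MiniPacket p) throws InterruptedException {
        synchronized (p) {
            while (!p.finished) {
                p.wait();
            }
        }
    }
}
```

Because `finished` is checked in a loop under the same monitor, the caller cannot miss the wakeup. This holds even if finishPacket runs before the caller starts waiting.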
public void queuePacket(Packet packet) {
if (wasKilled) {// has the EventThread been marked for death?
synchronized (waitingEvents) {
if (isRunning) waitingEvents.add(packet);// still running: queue it
else processEvent(packet);// thread no longer running: process it directly in the caller's thread
}
} else {
waitingEvents.add(packet);// add to the waiting queue
}
}
Two variables here need explaining, wasKilled and isRunning. Both are updated in EventThread's run method.
@Override
public void run() {
try {
isRunning = true;
while (true) {
Object event = waitingEvents.take();// take the head of the queue (blocks while empty)
if (event == eventOfDeath) {// eventOfDeath means the EventThread should be killed
wasKilled = true;// only sets the flag; the thread is not actually killed yet
} else {
processEvent(event);// not the death marker, so process it
}
if (wasKilled)
synchronized (waitingEvents) {// once marked for death, isRunning becomes false only after the queue is drained
if (waitingEvents.isEmpty()) {
isRunning = false;
break;
}
}
}
} catch (InterruptedException e) {
LOG.error("Event thread exiting due to interruption", e);
}
LOG.info("EventThread shut down for session: 0x{}",
Long.toHexString(getSessionId()));
}
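The shutdown protocol can be reproduced in a self-contained thread. It works through a poison-pill object: the thread drains its queue before exiting, and packets that arrive afterwards are handled inline. `MiniEventThread` is hypothetical, and `processed.add(...)` stands in for processEvent. The logging the real EventThread does on interruption and exit is omitted.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical miniature of EventThread: drains its queue before exiting on a poison pill. */
class MiniEventThread extends Thread {
    private static final Object EVENT_OF_DEATH = new Object(); // poison pill, like eventOfDeath
    private final LinkedBlockingQueue<Object> waitingEvents = new LinkedBlockingQueue<Object>();
    private volatile boolean wasKilled = false;
    private volatile boolean isRunning = false;
    // Stand-in for processEvent; touched by two threads, hence the synchronized wrapper.
    final List<Object> processed = Collections.synchronizedList(new ArrayList<Object>());

    void queuePacket(Object packet) {
        if (wasKilled) {
            synchronized (waitingEvents) {
                if (isRunning) {
                    waitingEvents.add(packet); // still draining: let the thread handle it
                } else {
                    processed.add(packet);     // thread is gone: handle it in the caller
                }
            }
        } else {
            waitingEvents.add(packet);
        }
    }

    void queueEventOfDeath() {
        waitingEvents.add(EVENT_OF_DEATH);
    }

    @Override
    public void run() {
        try {
            isRunning = true;
            while (true) {
                Object event = waitingEvents.take();
                if (event == EVENT_OF_DEATH) {
                    wasKilled = true;          // mark only; keep draining what is left
                } else {
                    processed.add(event);
                }
                if (wasKilled) {
                    synchronized (waitingEvents) {
                        if (waitingEvents.isEmpty()) {
                            isRunning = false; // from now on queuePacket processes inline
                            break;
                        }
                    }
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Take a queue holding "a", the pill and "b". The thread processes both packets before exiting. A packet queued after exit is processed directly by the caller, so async callbacks are not silently lost.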
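All event handling happens in processEvent. It covers two things: running watchers once they have been triggered, and invoking the callbacks of the various asynchronous APIs.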
private void processEvent(Object event) {
try {
if (event instanceof WatcherSetEventPair) {// watcher events
// each watcher will process the event
WatcherSetEventPair pair = (WatcherSetEventPair) event;
for (Watcher watcher : pair.watchers) {
try {
watcher.process(pair.event);// run the watcher's callback
} catch (Throwable t) {
LOG.error("Error while calling watcher ", t);
}
}
} else {// callbacks of the async APIs
Packet p = (Packet) event;
int rc = 0;
String clientPath = p.clientPath;
if (p.replyHeader.getErr() != 0) {
rc = p.replyHeader.getErr();
}
if (p.cb == null) {
LOG.warn("Somehow a null cb got to EventThread!");
} else if (p.response instanceof ExistsResponse
|| p.response instanceof SetDataResponse
|| p.response instanceof SetACLResponse) {
...
} else if (p.response instanceof GetDataResponse) {
...
} else if (p.response instanceof GetACLResponse) {
...
} else if (p.response instanceof GetChildrenResponse) {
...
} else if (p.response instanceof GetChildren2Response) {
...
} else if (p.response instanceof CreateResponse) {
...
} else if (p.response instanceof MultiResponse) {
...
} else if (p.cb instanceof VoidCallback) {
...
}
}
} catch (Throwable t) {
LOG.error("Caught unexpected throwable", t);
}
}
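The response types checked in each branch (plus the callback type in the final VoidCallback branch) make it clear that the callbacks of all the asynchronous APIs run here.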
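The two-way dispatch in processEvent can be reduced to a sketch. `MiniWatcher`, `MiniCallback`, `MiniPair`, `MiniReply` and `MiniDispatcher` are hypothetical simplifications of Watcher, the AsyncCallback family, WatcherSetEventPair and Packet. The real method also branches on the response type, which is collapsed here into a single callback shape.

```java
import java.util.List;

/** Hypothetical watcher shape (the real Watcher receives a WatchedEvent). */
interface MiniWatcher {
    void process(String event);
}

/** Hypothetical callback shape (the real AsyncCallback subtypes have richer signatures). */
interface MiniCallback {
    void processResult(int rc, String path);
}

/** Stand-in for WatcherSetEventPair: the watchers to fire plus the event. */
class MiniPair {
    final List<MiniWatcher> watchers;
    final String event;

    MiniPair(List<MiniWatcher> watchers, String event) {
        this.watchers = watchers;
        this.event = event;
    }
}

/** Stand-in for an async Packet: callback, error code from the reply header, path. */
class MiniReply {
    final MiniCallback cb;
    final int err;
    final String clientPath;

    MiniReply(MiniCallback cb, int err, String clientPath) {
        this.cb = cb;
        this.err = err;
        this.clientPath = clientPath;
    }
}

class MiniDispatcher {
    static void processEvent(Object event) {
        if (event instanceof MiniPair) {
            MiniPair pair = (MiniPair) event;
            for (MiniWatcher watcher : pair.watchers) {
                try {
                    watcher.process(pair.event);
                } catch (RuntimeException e) {
                    // A failing watcher must not stop the others (the real code logs and continues).
                }
            }
        } else {
            MiniReply reply = (MiniReply) event;
            if (reply.cb != null) { // the real code warns about a null callback
                reply.cb.processResult(reply.err, reply.clientPath);
            }
        }
    }
}
```

Catching per watcher matters because all watchers share the single EventThread. One faulty callback would otherwise starve every later event.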
Adding the EventThread's death marker to the queue
public void queueEventOfDeath() {
waitingEvents.add(eventOfDeath);
}
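The EventThread is killed in only two situations:
- While the client is establishing a connection, the server reports a negotiated session timeout of 0 or less, meaning the session has expired.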
void onConnected(int _negotiatedSessionTimeout, long _sessionId,
byte[] _sessionPasswd, boolean isRO) throws IOException {
negotiatedSessionTimeout = _negotiatedSessionTimeout;// session timeout negotiated with the server
if (negotiatedSessionTimeout <= 0) {// the server rejected the session: it has expired
state = States.CLOSED;
eventThread.queueEvent(new WatchedEvent(
Watcher.Event.EventType.None,
Watcher.Event.KeeperState.Expired, null));
eventThread.queueEventOfDeath();// kill the EventThread
// ... (rest of this branch omitted)
}
// ... (rest of onConnected omitted)
}
- When the client disconnects from the server:
/**
* Shutdown the send/event threads. This method should not be called
* directly - rather it should be called as part of close operation. This
* method is primarily here to allow the tests to verify disconnection
* behavior.
*/
public void disconnect() {
if (LOG.isDebugEnabled()) {
LOG.debug("Disconnecting client for session: 0x"
+ Long.toHexString(getSessionId()));//log
}
sendThread.close();// close the SendThread
eventThread.queueEventOfDeath();// shut down the EventThread
}
Thoughts
ZK's session mechanism
References
From Paxos to Zookeeper (《從Paxos到Zookeeper》)
http://www.cnblogs.com/leesf456/p/6098255.html
https://www.jianshu.com/p/4a1902a44439