深入瞭解ZooKeeper的Watcher機制的工作機制
Zookeeper提供了分散式資料的釋出/訂閱功能,多個訂閱者同時監聽某一個主題物件,當這個主題物件自身狀態變化時,會通知所有訂閱者,使它們作出相應的處理,而ZooKeeper實現這一功能的根本就是Watcher機制。
ZooKeeper的Watcher機制主要包括客戶端執行緒、客戶端WatchManager和ZooKeeper伺服器三部分。具體的流程主要是客戶端向ZooKeeper伺服器註冊Watcher的同時,會將Watcher物件儲存在客戶端的WatchManager中。當ZooKeeper伺服器端觸發Watcher事件後,會向客戶端傳送通知,客戶端執行緒從WatchManager中取出對應的Watcher物件來執行回撥邏輯。
1.客戶端註冊Watcher
首先,註冊一個Watcher物件可以通過建立一個ZooKeeper客戶端物件例項時或者使用ZooKeeper客戶端的getData、getChildren和exist三個介面來向ZooKeeper伺服器註冊Watcher。在註冊Wacher後,客戶端首先會將當前客戶端請求request進行標記,將其設定為Watcher監聽,同時會封裝一個Watcher的註冊資訊WatchRegistration物件,用於暫時儲存資料節點路徑和Watcher的對應關係。
在ZooKeeper中,Packet可以看作一個最小通訊協議單元,用於進行客戶端與服務端之間的網路傳輸,任何需要傳輸的物件都需要包裝成一個Packet物件。在ZooKeeper原始碼中,當使用非同步操作時,都會出現一行程式碼:
cnxn.queuePacket(h, new ReplyHeader(), request, null, cb, clientPath,
serverPath, ctx, null);
這說明操作結束之後會呼叫cnxn物件的queuePacket方法:
Packet queuePacket(RequestHeader h, ReplyHeader r, Record request, Record response, AsyncCallback cb, String clientPath, String serverPath, Object ctx, WatchRegistration watchRegistration) { Packet packet = null; synchronized (outgoingQueue) { if (h.getType() != OpCode.ping && h.getType() != OpCode.auth) { h.setXid(getXid()); } packet = new Packet(h, r, request, response, null, watchRegistration); packet.cb = cb; packet.ctx = ctx; packet.clientPath = clientPath; packet.serverPath = serverPath; if (!zooKeeper.state.isAlive() || closing) { conLossPacket(packet); } else { // If the client is asking to close the session then // mark as closing if (h.getType() == OpCode.closeSession) { closing = true; } outgoingQueue.add(packet); } } sendThread.wakeup(); return packet; }
從以上可以看出通過這個方法,WatchRegistration會被封裝到Packet中,然後放入傳送佇列中等待客戶端傳送。隨後,ZooKeeper客戶端就會向服務端傳送這個請求,同時等待請求的返回。這時,激活了客戶端的sendThreand執行緒,該執行緒的readResponse方法負責接收來自服務端的響應,然後將Watcher註冊到ZKWatchManager中進行管理。這裡需要注意的是,在底層實際的網路傳輸序列化過程中,並沒有將WatchRegistration物件完全地序列化到底層位元組陣列中去。這樣可以減輕服務端資源開銷以及網路傳輸開銷。
2.服務端處理Watcher
Watcher觸發
1.封裝WatchedEvent
首先將通知狀態、事件型別以及節點路徑封裝成一個WatchedEvent。
2.查詢Watcher
根據資料節點的節點路徑從watchTable中取出對應的Watcher。如果沒有找到Watcher,說明沒有任何客戶端在該資料節點上註冊過Watcher,直接退出。如果找到了Watcher,將其提取出來,同時會直接從watchTable和watch2Paths中將其刪除。
注意:WatchManager是ZooKeeper服務端Watcher的管理者,其內部管理的watchTable和watch2Paths兩個儲存結構,其中watchTable是從資料節點路徑的粒度來託管Watcher。watch2Paths是從Watcher的粒度來控制事件觸發需要觸發的資料節點。
3.呼叫process方法來觸發Watcher
依次呼叫步驟2中找出的所有Watcher的process方法,這是ServerCnxn對應的process方法,而該process方法中,主要工作如下:
a.在請求頭中標記"-1",表明當前是一個通知。
b.將WatchedEvent包裝成WatcherEvent,以便進行網路傳輸序列化。
c.向客戶端傳送該通知。
因此可以看出,該方法並不是處理客戶端Watcher真正的業務邏輯,而是藉助當前客戶端連線的ServerCnxn物件來實現對客戶端的WatchedEvent傳遞,真正的客戶端Watcher回撥與業務邏輯執行都在客戶端。
注意:以上服務端只是給客戶端傳送一個通知,內容僅包括說明這是什麼引起的Watcher。而WatchedEvent會被包裝成可序列化的WatcherEvent,可以在網路中進行傳輸。
3.客戶端回撥Watcher
SendThread接收事件通知
void readResponse() throws IOException {
ByteBufferInputStream bbis = new ByteBufferInputStream(
incomingBuffer);
BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
ReplyHeader replyHdr = new ReplyHeader();
replyHdr.deserialize(bbia, "header");
if (replyHdr.getXid() == -2) {
// -2 is the xid for pings
if (LOG.isDebugEnabled()) {
LOG.debug("Got ping response for sessionid: 0x"
+ Long.toHexString(sessionId)
+ " after "
+ ((System.nanoTime() - lastPingSentNs) / 1000000)
+ "ms");
}
return;
}
if (replyHdr.getXid() == -4) {
// -4 is the xid for AuthPacket
if(replyHdr.getErr() == KeeperException.Code.AUTHFAILED.intValue()) {
zooKeeper.state = States.AUTH_FAILED;
eventThread.queueEvent( new WatchedEvent(Watcher.Event.EventType.None,
Watcher.Event.KeeperState.AuthFailed, null) );
}
if (LOG.isDebugEnabled()) {
LOG.debug("Got auth sessionid:0x"
+ Long.toHexString(sessionId));
}
return;
}
if (replyHdr.getXid() == -1) {
// -1 means notification
if (LOG.isDebugEnabled()) {
LOG.debug("Got notification sessionid:0x"
+ Long.toHexString(sessionId));
}
WatcherEvent event = new WatcherEvent();
event.deserialize(bbia, "response");
// convert from a server path to a client path
if (chrootPath != null) {
String serverPath = event.getPath();
if(serverPath.compareTo(chrootPath)==0)
event.setPath("/");
else
event.setPath(serverPath.substring(chrootPath.length()));
}
WatchedEvent we = new WatchedEvent(event);
if (LOG.isDebugEnabled()) {
LOG.debug("Got " + we + " for sessionid 0x"
+ Long.toHexString(sessionId));
}
eventThread.queueEvent( we );
return;
}
if (pendingQueue.size() == 0) {
throw new IOException("Nothing in the queue, but got "
+ replyHdr.getXid());
}
Packet packet = null;
synchronized (pendingQueue) {
packet = pendingQueue.remove();
}
/*
* Since requests are processed in order, we better get a response
* to the first request!
*/
try {
if (packet.header.getXid() != replyHdr.getXid()) {
packet.replyHeader.setErr(
KeeperException.Code.CONNECTIONLOSS.intValue());
throw new IOException("Xid out of order. Got "
+ replyHdr.getXid() + " expected "
+ packet.header.getXid());
}
packet.replyHeader.setXid(replyHdr.getXid());
packet.replyHeader.setErr(replyHdr.getErr());
packet.replyHeader.setZxid(replyHdr.getZxid());
if (replyHdr.getZxid() > 0) {
lastZxid = replyHdr.getZxid();
}
if (packet.response != null && replyHdr.getErr() == 0) {
packet.response.deserialize(bbia, "response");
}
if (LOG.isDebugEnabled()) {
LOG.debug("Reading reply sessionid:0x"
+ Long.toHexString(sessionId) + ", packet:: " + packet);
}
} finally {
finishPacket(packet);
}
}
SendThread的response()方法負責接收這個客戶端事件通知。首先會對replyHdr中xid進行判斷,如果是-1,則說明是一個通知型別的響應。作以下處理:
1.反序列化:ZooKeeper客戶端接到請求後,首先將位元組流轉換成WatcherEvent物件。
2.處理chrootPath:如果客戶端設定了chrootPath屬性,那麼需要對服務端傳過來的完整的節點路徑進行chrootPath處理,生成客戶端的一個相對節點路徑。
3.還原WatchedEvent:將WatcherEvent物件轉換成WatchedEvent。
4.回撥Watcher:將WatchedEvent物件交給EventThread執行緒,在下一個輪詢週期中進行Watcher回撥。
EventThread處理事件通知
SendThread通過呼叫EventThread.queueEvent方法將事件傳給EventThread執行緒。queueEvent方法會根據通知事件,從ZKWatchManager中取出所有相關的watcher,並將其放入waitingEvents佇列中,進行串行同步處理。
4.總結
瞭解了Watcher機制的工作機制後,有以下幾點總結比較重要。
1.Watcher具有一次性,無論是服務端還是客戶端,一旦一個Watcher被觸發,ZooKeeper都會將其從相應的儲存中移除。因此Watcher需要反覆註冊。
2.客戶端序列執行:最終Watcher會被放入一個佇列中序列執行。
3.WatchedEvent是ZooKeeper整個Watcher通知機制的最小單元,結構只包含:通知狀態、事件型別和節點路徑。Watcher只會通知客戶端發生的事件,不會說明事件的具體內容。具體內容的邏輯都在客戶端進行回撥獲取。同時,客戶端向服務端註冊Watcher的時候,並不會把真實的Watcher物件傳遞給服務端,僅僅只是在客戶端請求中使用boolean型別屬性進行了標記,同時服務端也僅僅儲存了當前連線的ServerCnxn物件。這樣設計大大減輕了網路開銷和服務端記憶體開銷。
4.WatchedEvent需要先轉換為WatcherEvent序列化進行網路傳輸,然後客戶端獲取之後要反序列化然後再轉換成WatchedEvent進行後續邏輯操作。