1. 程式人生 > >深入瞭解ZooKeeper的Watcher機制的工作機制

深入瞭解ZooKeeper的Watcher機制的工作機制

        Zookeeper提供了分散式資料的釋出/訂閱功能,多個訂閱者同時監聽某一個主題物件,當這個主題物件自身狀態變化時,會通知所有訂閱者,使它們作出相應的處理,而ZooKeeper實現這一功能的根本就是Watcher機制。

        ZooKeeper的Watcher機制主要包括客戶端執行緒、客戶端WatchManager和ZooKeeper伺服器三部分。具體的流程主要是客戶端向ZooKeeper伺服器註冊Watcher的同時,會將Watcher物件儲存在客戶端的WatchManager中。當ZooKeeper伺服器端觸發Watcher事件後,會向客戶端傳送通知,客戶端執行緒從WatchManager中取出對應的Watcher物件來執行回撥邏輯。

1.客戶端註冊Watcher

        首先,註冊一個Watcher物件可以通過建立一個ZooKeeper客戶端物件例項時或者使用ZooKeeper客戶端的getData、getChildren和exist三個介面來向ZooKeeper伺服器註冊Watcher。在註冊Wacher後,客戶端首先會將當前客戶端請求request進行標記,將其設定為Watcher監聽,同時會封裝一個Watcher的註冊資訊WatchRegistration物件,用於暫時儲存資料節點路徑和Watcher的對應關係。

        在ZooKeeper中,Packet可以看作一個最小通訊協議單元,用於進行客戶端與服務端之間的網路傳輸,任何需要傳輸的物件都需要包裝成一個Packet物件。在ZooKeeper原始碼中,當使用非同步操作時,都會出現一行程式碼:

cnxn.queuePacket(h, new ReplyHeader(), request, null, cb, clientPath,
                serverPath, ctx, null);

這說明操作結束之後會呼叫cnxn物件的queuePacket方法:

Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,
            Record response, AsyncCallback cb, String clientPath,
            String serverPath, Object ctx, WatchRegistration watchRegistration)
    {
        Packet packet = null;
        synchronized (outgoingQueue) {
            if (h.getType() != OpCode.ping && h.getType() != OpCode.auth) {
                h.setXid(getXid());
            }
            packet = new Packet(h, r, request, response, null,
                    watchRegistration);
            packet.cb = cb;
            packet.ctx = ctx;
            packet.clientPath = clientPath;
            packet.serverPath = serverPath;
            if (!zooKeeper.state.isAlive() || closing) {
                conLossPacket(packet);
            } else {
                // If the client is asking to close the session then
                // mark as closing
                if (h.getType() == OpCode.closeSession) {
                    closing = true;
                }
                outgoingQueue.add(packet);
            }
        }

        sendThread.wakeup();
        return packet;
    }

        從以上可以看出通過這個方法,WatchRegistration會被封裝到Packet中,然後放入傳送佇列中等待客戶端傳送。隨後,ZooKeeper客戶端就會向服務端傳送這個請求,同時等待請求的返回。這時,激活了客戶端的sendThreand執行緒,該執行緒的readResponse方法負責接收來自服務端的響應,然後將Watcher註冊到ZKWatchManager中進行管理。這裡需要注意的是,在底層實際的網路傳輸序列化過程中,並沒有將WatchRegistration物件完全地序列化到底層位元組陣列中去。這樣可以減輕服務端資源開銷以及網路傳輸開銷。

2.服務端處理Watcher

Watcher觸發

1.封裝WatchedEvent

首先將通知狀態、事件型別以及節點路徑封裝成一個WatchedEvent。

 

2.查詢Watcher

根據資料節點的節點路徑從watchTable中取出對應的Watcher。如果沒有找到Watcher,說明沒有任何客戶端在該資料節點上註冊過Watcher,直接退出。如果找到了Watcher,將其提取出來,同時會直接從watchTable和watch2Paths中將其刪除。

注意:WatchManager是ZooKeeper服務端Watcher的管理者,其內部管理的watchTable和watch2Paths兩個儲存結構,其中watchTable是從資料節點路徑的粒度來託管Watcher。watch2Paths是從Watcher的粒度來控制事件觸發需要觸發的資料節點。

3.呼叫process方法來觸發Watcher

依次呼叫步驟2中找出的所有Watcher的process方法,這是ServerCnxn對應的process方法,而該process方法中,主要工作如下:

a.在請求頭中標記"-1",表明當前是一個通知。

b.將WatchedEvent包裝成WatcherEvent,以便進行網路傳輸序列化。

c.向客戶端傳送該通知。

因此可以看出,該方法並不是處理客戶端Watcher真正的業務邏輯,而是藉助當前客戶端連線的ServerCnxn物件來實現對客戶端的WatchedEvent傳遞,真正的客戶端Watcher回撥與業務邏輯執行都在客戶端。

注意:以上服務端只是給客戶端傳送一個通知,內容僅包括說明這是什麼引起的Watcher。而WatchedEvent會被包裝成可序列化的WatcherEvent,可以在網路中進行傳輸。

3.客戶端回撥Watcher

SendThread接收事件通知

void readResponse() throws IOException {
            ByteBufferInputStream bbis = new ByteBufferInputStream(
                    incomingBuffer);
            BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
            ReplyHeader replyHdr = new ReplyHeader();

            replyHdr.deserialize(bbia, "header");
            if (replyHdr.getXid() == -2) {
                // -2 is the xid for pings
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Got ping response for sessionid: 0x"
                            + Long.toHexString(sessionId)
                            + " after "
                            + ((System.nanoTime() - lastPingSentNs) / 1000000)
                            + "ms");
                }
                return;
            }
            if (replyHdr.getXid() == -4) {
            	 // -4 is the xid for AuthPacket               
                if(replyHdr.getErr() == KeeperException.Code.AUTHFAILED.intValue()) {
                    zooKeeper.state = States.AUTH_FAILED;                    
                    eventThread.queueEvent( new WatchedEvent(Watcher.Event.EventType.None, 
                            Watcher.Event.KeeperState.AuthFailed, null) );            		            		
                }
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Got auth sessionid:0x"
                            + Long.toHexString(sessionId));
                }
                return;
            }
            if (replyHdr.getXid() == -1) {
                // -1 means notification
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Got notification sessionid:0x"
                        + Long.toHexString(sessionId));
                }
                WatcherEvent event = new WatcherEvent();
                event.deserialize(bbia, "response");

                // convert from a server path to a client path
                if (chrootPath != null) {
                    String serverPath = event.getPath();
                    if(serverPath.compareTo(chrootPath)==0)
                        event.setPath("/");
                    else
                        event.setPath(serverPath.substring(chrootPath.length()));
                }

                WatchedEvent we = new WatchedEvent(event);
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Got " + we + " for sessionid 0x"
                            + Long.toHexString(sessionId));
                }

                eventThread.queueEvent( we );
                return;
            }
            if (pendingQueue.size() == 0) {
                throw new IOException("Nothing in the queue, but got "
                        + replyHdr.getXid());
            }
            Packet packet = null;
            synchronized (pendingQueue) {
                packet = pendingQueue.remove();
            }
            /*
             * Since requests are processed in order, we better get a response
             * to the first request!
             */
            try {
                if (packet.header.getXid() != replyHdr.getXid()) {
                    packet.replyHeader.setErr(
                            KeeperException.Code.CONNECTIONLOSS.intValue());
                    throw new IOException("Xid out of order. Got "
                            + replyHdr.getXid() + " expected "
                            + packet.header.getXid());
                }

                packet.replyHeader.setXid(replyHdr.getXid());
                packet.replyHeader.setErr(replyHdr.getErr());
                packet.replyHeader.setZxid(replyHdr.getZxid());
                if (replyHdr.getZxid() > 0) {
                    lastZxid = replyHdr.getZxid();
                }
                if (packet.response != null && replyHdr.getErr() == 0) {
                    packet.response.deserialize(bbia, "response");
                }

                if (LOG.isDebugEnabled()) {
                    LOG.debug("Reading reply sessionid:0x"
                            + Long.toHexString(sessionId) + ", packet:: " + packet);
                }
            } finally {
                finishPacket(packet);
            }
        }

SendThread的response()方法負責接收這個客戶端事件通知。首先會對replyHdr中xid進行判斷,如果是-1,則說明是一個通知型別的響應。作以下處理:

1.反序列化:ZooKeeper客戶端接到請求後,首先將位元組流轉換成WatcherEvent物件。

2.處理chrootPath:如果客戶端設定了chrootPath屬性,那麼需要對服務端傳過來的完整的節點路徑進行chrootPath處理,生成客戶端的一個相對節點路徑。

3.還原WatchedEvent:將WatcherEvent物件轉換成WatchedEvent。

4.回撥Watcher:將WatchedEvent物件交給EventThread執行緒,在下一個輪詢週期中進行Watcher回撥。

EventThread處理事件通知

SendThread通過呼叫EventThread.queueEvent方法將事件傳給EventThread執行緒。queueEvent方法會根據通知事件,從ZKWatchManager中取出所有相關的watcher,並將其放入waitingEvents佇列中,進行串行同步處理。

 

4.總結

        瞭解了Watcher機制的工作機制後,有以下幾點總結比較重要。

1.Watcher具有一次性,無論是服務端還是客戶端,一旦一個Watcher被觸發,ZooKeeper都會將其從相應的儲存中移除。因此Watcher需要反覆註冊。

2.客戶端序列執行:最終Watcher會被放入一個佇列中序列執行。

3.WatchedEvent是ZooKeeper整個Watcher通知機制的最小單元,結構只包含:通知狀態、事件型別和節點路徑。Watcher只會通知客戶端發生的事件,不會說明事件的具體內容。具體內容的邏輯都在客戶端進行回撥獲取。同時,客戶端向服務端註冊Watcher的時候,並不會把真實的Watcher物件傳遞給服務端,僅僅只是在客戶端請求中使用boolean型別屬性進行了標記,同時服務端也僅僅儲存了當前連線的ServerCnxn物件。這樣設計大大減輕了網路開銷和服務端記憶體開銷。

4.WatchedEvent需要先轉換為WatcherEvent序列化進行網路傳輸,然後客戶端獲取之後要反序列化然後再轉換成WatchedEvent進行後續邏輯操作。