1. 程式人生 > >ZooKeeper客戶端與服務端的事件watcher原始碼閱讀

ZooKeeper客戶端與服務端的事件watcher原始碼閱讀

我是懷著無比激動的心情寫這篇部落格的,如果對您有幫助,歡迎給我點個贊

watcher存在的必要性

舉個特容易懂的例子: 假如我的專案是基於dubbo+zookeeper搭建的分散式專案, 我有三個功能相同的服務提供者,用zookeeper當成註冊中心,我的三個專案得註冊進zookeeper才能對外暴露服務,但是問題來了,寫的java程式碼怎麼才能註冊進zookeeper呢?當然加入依賴,寫好配置檔案再啟動就成了,這時,這三個服務體提供者就是zookeeper的客戶端了,zookeeper的客戶端不止一個,我選擇了哪個依賴,就是哪個客戶端,光有服務提供者不成啊,對外提供服務,我得需要服務消費者啊,於是用同樣的方式,把消費者也註冊進zookeeper,zookeeper中就存在了4個node,也就是4個客戶端,服務消費者訂閱zookeeper,向它拉取服務提供者的address,然後把地址快取在本地, 進而可以遠端呼叫服務消費者,那麼問題又來了,萬一哪一臺服務提供者掛了,怎麼辦呢?zookeeper是不是得通知消費者呢? 萬一哪一天服務提供者的address變了,是不是也得通知消費者? 這就是watcher存在的意義,它解決了這件事

實驗場景:

假設我們已經成功啟動了zookeeper的服務端和客戶端,並且預先添加了watcher,然後使用控制檯動態的修改下node的data,我們會發現watcher回撥的現象

新增的鉤子函式程式碼如下:

public class ZookepperClientTest {
    public static void main(String[] args) throws Exception {
        ZooKeeper client = new ZooKeeper("localhost", 5000, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                System.err.println("連線,觸發");
            }
        });

    Stat stat = new Stat();
    
     //   todo 下面新增的事件監聽器可是實現事件的消費訂閱
      String content = new String(client.getData("/node1", new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                // todo 任何連線上這個節點的客戶端修改了這個節點的 data資料,都會引起process函式的回撥

                // todo 特點1:  watch只能使用1次
                if (event.getType().equals(Event.EventType.NodeDataChanged)){
                    System.err.println("當前節點資料發生了改變");
                }
            }
        }, stat));

看如上的程式碼, 添加了一個自己的watcher也就是client.getData("/node1", new Watcher() {} 這是個回撥的鉤子函式,執行時不會執行,當滿足的某個條件時才會執行, 比如: node1被刪除了, node1的data被修改了

getData做了哪些事情?

原始碼如下: getdata,顧名思義,返回服務端的node的data+stat, 當然是當服務端的node發生了變化後呼叫的

主要主流如下幾個工作

  • 建立WatchRegistration wcb= new DataWatchRegistration(watcher, clientPath);
    • 其實就是一個簡單的內部類,將path 和 watch 封裝進了一個物件
  • 建立一個request,並且初始化這個request.head=getData=4
  • 呼叫ClientCnxn.submitRequest(...) , 將現存的這些資訊進一步封裝
  • request.setWatch(watcher != null);說明他並沒有將watcher封裝進去,而是僅僅做了個有沒有watcher的標記
 public byte[] getData(final String path, Watcher watcher, Stat stat)
        throws KeeperException, InterruptedException
     {
         // todo 校驗path
        final String clientPath = path;
        PathUtils.validatePath(clientPath);

        // the watch contains the un-chroot path
        WatchRegistration wcb = null;
        if (watcher != null) {
            // todo DataWatchRegistration 繼承了 WatchRegistration
            // todo DataWatchRegistration 其實就是一個簡單的內部類,將path 和 watch 封裝進了一個物件
            wcb = new DataWatchRegistration(watcher, clientPath);
        }

        final String serverPath = prependChroot(clientPath);
        // todo 建立一個請求頭
        RequestHeader h = new RequestHeader();
        h.setType(ZooDefs.OpCode.getData);

        // todo 建立了一個GetDataRequest
        GetDataRequest request = new GetDataRequest();
        // todo 給這個請求初始化,path 是傳遞進來的path,但是 watcher不是!!! 如果我們給定了watcher , 這裡面的條件就是  true
        request.setPath(serverPath);
        request.setWatch(watcher != null); // todo 可看看看服務端接收到請求是怎麼辦的

        GetDataResponse response = new GetDataResponse();

        // todo 同樣由 clientCnxn 上下文進行提交請求, 這個操作應該同樣是阻塞的
         // todo EventThread 和 SendThread 同時使用一份 clientCnxn的 submitRequest()
        ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);

        if (r.getErr() != 0) {
            throw KeeperException.create(KeeperException.Code.get(r.getErr()),
                    clientPath);
        }
        if (stat != null) {
            DataTree.copyStat(response.getStat(), stat);
        }
        return response.getData();
    }

ReplyHeader r = cnxn.submitRequest(h, request, response, wcb); 的原始碼我解除安裝下面, 這裡來到這個方法中,一眼能看到,它依然是阻塞的式的,並且requet被進一步封裝進packet

更重要的是 queuePacket()方法的最後一個引數,存在我們剛剛建立的path+watcher的封裝類

public ReplyHeader submitRequest(RequestHeader h, Record request, Record response, WatchRegistration watchRegistration)
        throws InterruptedException {
    ReplyHeader r = new ReplyHeader();
    // todo 來到這個 queuePacket() 方法在下面, 這個方法就是將  使用者輸入-> string ->>> request ->>> packet 的過程
    Packet packet = queuePacket(h, r, request, response, null, null, null,
            null, watchRegistration);


    // todo 使用同步程式碼塊,在下面的進行    同步阻塞等待, 直到有了Response響應才會跳出這個迴圈, 這個finished狀態就是在客戶端接受到服務端的
    // todo 的響應後, 將服務端的響應解析出來,然後放置到 pendingqueue裡時,設定上去的
    synchronized (packet) {
        while (!packet.finished) {
            // todo 這個等待是需要喚醒的
            packet.wait();
        }
    }
    // todo 直到上面的程式碼塊被喚醒,才會這個方法才會返回
    return r;
}

同樣,在queuePacket()方法中將packet提交到outgoingQueue中,最終被seadThread消費傳送到服務端

服務端如何處理watchRegistration不為空的packet

後續我準備用一整篇部落格詳解單機模式下服務端處理請求的流程,所以這篇部落格只說結論

在服務端,使用者的請求最終會按順序流向三個Processor,分別是

  • PrepRequestProcessor
    • 負責進行一些狀態的修改
  • SyncRequestProcessor
    • 將事務日誌同步到磁碟
  • FinalRequestProcessor
    • 相應使用者的請求

我們直接去看FinalRequestProcessorpublic void processRequest(Request request) {}方法,看他針對getData()方式的請求做出了哪些動作.下面來了個小高潮,zks.getZKDatabase().getData(getDataRequest.getPath(), stat, getDataRequest.getWatch() ? cnxn : null);跟進watcher的有無給服務端新增不同的Watcher

真的得劃重點了,當我發現這一點時,我的心情是超級激動的,就像發現了新大陸一樣

case OpCode.getData: {
        lastOp = "GETD";
        GetDataRequest getDataRequest = new GetDataRequest();
        ByteBufferInputStream.byteBuffer2Record(request.request,
                getDataRequest);
        DataNode n = zks.getZKDatabase().getNode(getDataRequest.getPath());
        if (n == null) {
            throw new KeeperException.NoNodeException();
        }
        PrepRequestProcessor.checkACL(zks, zks.getZKDatabase().aclForNode(n),
                ZooDefs.Perms.READ,
                request.authInfo);
        Stat stat = new Stat();
        // todo 這裡的操作    getDataRequest.getWatch() ? cnxn : null 對應可客戶端的  跟進watcher有沒有而決定往服務端傳遞 true 還是false 相關
        // todo 跟進去 getData()
        byte b[] = zks.getZKDatabase().getData(getDataRequest.getPath(), stat,
                getDataRequest.getWatch() ? cnxn : null);
        //todo  cnxn的Processor()被回撥, 往客戶端傳送資料 , 什麼時候觸發呢? 就是上面的  處理事務時的回撥 第127行

        // todo 構建了一個 rsp ,在本類的最後面將rsp 響應給client
        rsp = new GetDataResponse(b, stat);
        break;
    }

繼續跟進這個getData()在服務端維護了一份path+watcher的map

public byte[] getData(String path, Stat stat, Watcher watcher)
        throws KeeperException.NoNodeException {
    DataNode n = nodes.get(path);
    if (n == null) {
        throw new KeeperException.NoNodeException();
    }
    synchronized (n) {
        n.copyStat(stat);
        if (watcher != null) {
            // todo 將path 和 watcher 繫結在一起
            dataWatches.addWatch(path, watcher);
        }
        return n.data;
    }
}

客戶端開啟命令列,修改服務端node的狀態

書接上回,當客戶單的程式碼去建立ClientCnxn時,有下面的邏輯 , 它開啟了兩條守護執行緒, sendThread負責向服務端傳送心跳,已經和服務端進行使用者相關的IO交流, EventThread就負責和txn事務相關的處理邏輯,級別上升到針對node

    // todo start就是啟動了在構造方法中建立的執行緒
    public void start() {
        sendThread.start();
        eventThread.start();
    }

到目前為止,客戶端就有如下三條執行緒了

  • 負責處理使用者在控制檯輸入命令的主執行緒
  • 守護執行緒1: seadThread
  • 守護執行緒2: eventThread

跟進主執行緒的處理使用者輸入部分的邏輯程式碼如下:

下面的程式碼的主要邏輯就是處理使用者輸入的命令,當通過if-else選擇分支判斷使用者到底輸入的啥命令

按照我們的假定的場景,使用者輸入的命令是這樣的 set /path newValue 所以,毫無疑問,經過解析後代碼會去執行下面的stat = zk.setData(path, args[2].getBytes(),部分

  // todo zookeeper客戶端, 處理使用者輸入命令的具體邏輯
    // todo  用大白話講,下面其實就是把 從控制檯獲取的使用者的輸入資訊轉換成指定的字元, 然後傳送到服務端
    // todo MyCommandOptions 是處理命令列選項和shell指令碼的工具類
    protected boolean processZKCmd(MyCommandOptions co) throws KeeperException, IOException, InterruptedException {
        // todo 在這個方法中可以看到很多的命令列所支援的命令
        Stat stat = new Stat();
        // todo 獲取命令列輸入中 0 1 2 3 ... 位置的內容, 比如 0 位置是命令  1 2 3 位置可能就是不同的引數
        String[] args = co.getArgArray();
        String cmd = co.getCommand();
        if (args.length < 1) {
            usage();
            return false;
        }

        if (!commandMap.containsKey(cmd)) {
            usage();
            return false;
        }

        boolean watch = args.length > 2;
        String path = null;
        List<ACL> acl = Ids.OPEN_ACL_UNSAFE;
        LOG.debug("Processing " + cmd);

        if (cmd.equals("quit")) {
            System.out.println("Quitting...");
            zk.close();
            System.exit(0);
        } else if (cmd.equals("set") && args.length >= 3) {
            path = args[1];
            stat = zk.setData(path, args[2].getBytes(),
                    args.length > 3 ? Integer.parseInt(args[3]) : -1);
            printStat(stat);

繼續跟進stat = zk.setData(path, args[2].getBytes(), 下面的邏輯也很簡單,就是將使用者的輸入封裝進來request中,通過ClientCnxn類的submit方法提交到一個佇列中,等待著sendThread去消費

這次有目的的看一下submitRequest的最後一個引數為null, 這個引數是WatchRegistration的位置,一開始置為null

 public Stat setData(final String path, byte data[], int version)
        throws KeeperException, InterruptedException
    {
        final String clientPath = path;
        PathUtils.validatePath(clientPath);

        final String serverPath = prependChroot(clientPath);

        RequestHeader h = new RequestHeader();
        h.setType(ZooDefs.OpCode.setData);
        SetDataRequest request = new SetDataRequest();
        request.setPath(serverPath);
        request.setData(data);
        request.setVersion(version);
        
        SetDataResponse response = new SetDataResponse();
        ReplyHeader r = cnxn.submitRequest(h, request, response, null);
        if (r.getErr() != 0) {
            throw KeeperException.create(KeeperException.Code.get(r.getErr()),
                    clientPath);
        }
        return response.getStat();
    }

跟進這個submitRequest()方法, 原始碼如下,不處所料的是,它同樣被阻塞住了,直到服務端給了它響應

當前程式碼的主要邏輯就是將request封裝進packet,然後將packet新增到ClintCnxn維護的outgoingQueue佇列中等待sendThread的消費

這次來到這個方法是因為我們在控制檯輸入的set 命令而觸發的,比較重要的是本次packet攜帶的WatchRegistration==null, 毫無疑問,這次服務端在FinalRequestProcessor中再處理時取出的watcher==null, 也就不會將path+watcher儲存進maptable中

重要:傳送事務訊息

FinalRequestProcessorpublic void processRequest(Request request) {}方法中,有如下程式碼

//todo 請求頭不為空
    if (request.hdr != null) {
        // 獲取請求頭
       TxnHeader hdr = request.hdr;
       // 獲取事務
       Record txn = request.txn;
        // todo 跟進這個方法-----<--!!!!!!-----處理事務的邏輯,在這裡面有向客戶端傳送事件的邏輯, 回撥客戶端的watcher----!!!!!!-->
       rc = zks.processTxn(hdr, txn);
    }

繼續跟進去

// todo 處理事物日誌
public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) {
    ProcessTxnResult rc;
    int opCode = hdr.getType();
    long sessionId = hdr.getClientId();
    // todo 繼續跟進去!!!!!!!!!
    // todo 跟進 processTxn(hdr, txn)
    rc = getZKDatabase().processTxn(hdr, txn);

跟進ZkDatabase.java中的processTxn(hdr, txn)方法

public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) {
    // todo 跟進 processTxn
    return dataTree.processTxn(hdr, txn);
}

跟進到DataTree.java

  public ProcessTxnResult processTxn(TxnHeader header, Record txn)
    {
        ProcessTxnResult rc = new ProcessTxnResult();

        try {
            rc.clientId = header.getClientId();
            rc.cxid = header.getCxid();
            rc.zxid = header.getZxid();
            rc.type = header.getType();
            rc.err = 0;
            rc.multiResult = null;
            switch (header.getType()) { // todo 根據客客戶端傳送過來的type進行switch,
                case OpCode.create:
                    CreateTxn createTxn = (CreateTxn) txn;
                    rc.path = createTxn.getPath();
                    // todo  跟進這個建立節點的方法
                    createNode(
                            createTxn.getPath(),

根據請求頭的值,進而判斷出走到那個switch的分支,當前我們在控制檯觸發,進入到setData分支如下:跟進這個方法中可以看到它主要做了如下幾件事

  • 使用傳遞進來的新值替代舊data
  • dataWatches.triggerWatch(path, EventType.NodeDataChanged);觸發指定的事件watch,什麼事件呢? NodeDataChange, 觸發了哪個watcher呢? 跟進去檢視


    //todo  setData
    public Stat setData(String path, byte data[], int version, long zxid,
            long time) throws KeeperException.NoNodeException {
        Stat s = new Stat();
        DataNode n = nodes.get(path);
        if (n == null) {
            throw new KeeperException.NoNodeException();
        }
        byte lastdata[] = null;
        synchronized (n) {
            // todo 修改記憶體的資料
            lastdata = n.data;
            n.data = data;
            n.stat.setMtime(time);
            n.stat.setMzxid(zxid);
            n.stat.setVersion(version);
            n.copyStat(s);
        }
        // now update if the path is in a quota subtree.
        String lastPrefix;
        if((lastPrefix = getMaxPrefixWithQuota(path)) != null) {
          this.updateBytes(lastPrefix, (data == null ? 0 : data.length)
              - (lastdata == null ? 0 : lastdata.length));
        }
        // todo 終於 看到了   服務端 關於觸發NodeDataChanged的事件
        dataWatches.triggerWatch(path, EventType.NodeDataChanged);
        return s;
    }

跟進去dataWatches.triggerWatch(path, EventType.NodeDataChanged);,原始碼如下, 主要的邏輯就是取出存放在服務端的watch,然後逐個回撥他們的processor函式,問題來了,到底是哪些watcher呢? 其實就是我們在客戶端啟動時新增getData()時存進去的wather,也就是ServerCnxn


 // todo 跟進去服務端的 觸發事件,  但是吧, 很納悶. 就是沒有往客戶端傳送資料的邏輯
    public Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
        WatchedEvent e = new WatchedEvent(type,
                KeeperState.SyncConnected, path);
        HashSet<Watcher> watchers;
        synchronized (this) {
            watchers = watchTable.remove(path);
            if (watchers == null || watchers.isEmpty()) {
                if (LOG.isTraceEnabled()) {
                    ZooTrace.logTraceMessage(LOG,
                            ZooTrace.EVENT_DELIVERY_TRACE_MASK,
                            "No watchers for " + path);
                }
                return null;
            }
            for (Watcher w : watchers) {
                HashSet<String> paths = watch2Paths.get(w);
                if (paths != null) {
                    paths.remove(path);
                }
            }
        }
        for (Watcher w : watchers) {
            if (supress != null && supress.contains(w)) {
                continue;
            }
            // todo 繼續跟進去, 看它如何回撥的
            w.process(e);
        }
        return watchers;
    }

懷著激動的心情去看看ServerCnxn的process()方法做了什麼事?

來到ServerCnxn的實現類NIOServerCnxn, 確實很激動,看到了服務端在往客戶端傳送事務型訊息, 並且new ReplyHeader(-1, -1L, 0)第一個位置上的引數是-1, 這一點很重要,因為客戶端在接受到這個xid=-1的標記後,就會將這條響應交給EventThread處理

    @Override
    synchronized public void process(WatchedEvent event) {
        ReplyHeader h = new ReplyHeader(-1, -1L, 0);
        if (LOG.isTraceEnabled()) {
            ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK,
                                     "Deliver event " + event + " to 0x"
                                     + Long.toHexString(this.sessionId)
                                     + " through " + this);
        }

        // Convert WatchedEvent to a type that can be sent over the wire
        WatcherEvent e = event.getWrapper();
        // todo  往服務端傳送了 e event型別訊息
        sendResponse(h, e, "notification");
    }

處理回調回調watch使用的響應

進入到SendThread的讀就緒原始碼部分,如下: 它根據header.xid=-1就知道了這是事務型別的響應

// todo 服務端丟擲來的事件, 客戶端將把他存在EventThread的 watingEvents 佇列中
// todo 它的實現邏輯也是這樣, 會有另外一個執行緒不斷的消費這個佇列
if (replyHdr.getXid() == -1) {
    // -1 means notification
    if (LOG.isDebugEnabled()) {
        LOG.debug("Got notification sessionid:0x"
                + Long.toHexString(sessionId));
    }
    // todo 建立watcherEvent 並將服務端傳送回來的資料,反序列化進這個物件中
    WatcherEvent event = new WatcherEvent();
    event.deserialize(bbia, "response");

    // convert from a server path to a client path
    // todo 將server path 反轉成 client path
    if (chrootPath != null) {
        String serverPath = event.getPath();
        if (serverPath.compareTo(chrootPath) == 0)
            event.setPath("/");
        else if (serverPath.length() > chrootPath.length())
            event.setPath(serverPath.substring(chrootPath.length()));
        else {
            LOG.warn("Got server path " + event.getPath()
                    + " which is too short for chroot path "
                    + chrootPath);
        }
              WatchedEvent we = new WatchedEvent(event);
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Got " + we + " for sessionid 0x"
                            + Long.toHexString(sessionId));
                }
                //todo 跟進去
                eventThread.queueEvent(we);
                return;
            }
    }

在這個方法的最後,將這個相應新增進EventThread消費的佇列中,跟進 eventThread.queueEvent(we);

// todo
public void queueEvent(WatchedEvent event) {
    // todo 如果事件的型別是 none, 或者sessionState =  直接返回
    /**
     *   todo 事件的型別被設計成 watcher 介面的列舉
     *   None (-1),
     *   NodeCreated (1),
     *   NodeDeleted (2),
     *   NodeDataChanged (3),
     *   NodeChildrenChanged (4);
     */
    if (event.getType() == EventType.None
            && sessionState == event.getState()) {
        return;
    }
    sessionState = event.getState();

    // materialize the watchers based on the event
    // todo 根據事件的具體型別,將觀察者具體化, 跟進去
    // todo 這個類是ClientCnxn的輔助類,作用就是將watcher 和它觀察的事件封裝在一起
    WatcherSetEventPair pair = new WatcherSetEventPair(
            //todo 跟進這個 materialize方法. 其實就是從map中取出了和當前client關聯的全部 watcher set

            watcher.materialize(event.getState(), event.getType(),
                    event.getPath()),
            event);
    // queue the pair (watch set & event) for later processing
    // todo 將watch集合 和 event 進行排隊(按順序新增到佇列裡了), 以便後續處理 , 怎麼處理呢?  就在EventThread的run迴圈中消費
    // todo watingEvent ==>  LinkedBlockingQueue<Object>
    waitingEvents.add(pair);
}

上面的程式碼主要做了如下幾件事:

  • 從map中取出和當前事件相關的全部watcher
  • 將watcher set 新增進 waitingEvents佇列中,等待EventThead的消費

跟進 watcher.materialize(event.getState(), event.getType(), 會追到下面的程式碼

case NodeDataChanged: // todo node中的data改變和 nodeCreate 都會來到下面的分支
        case NodeCreated:
            synchronized (dataWatches) {
                // todo dataWatches 就是剛才存放  path : watcher 的map
                // todo dataWatches.remove(clientPath) 移除並返回clientPath對應的watcher , 放入 result 中
                addTo(dataWatches.remove(clientPath), result);
            }

上面的dataWatches 就是儲存path+watcher set的map, 上面的操作是移除並返回指定的watcher,這也說明了,為什麼zk原生客戶端新增的watcher僅僅會回撥一次

EventThread是如何消費waitingEvents的

EventThread是一條守護執行緒, 因此它擁有自己的不斷在執行的run方法,它就是在這個run方法中對這個佇列進行消費的


        @Override
        public void run() {
            try {
                isRunning = true;
                // todo 同樣是無限的迴圈
                while (true) {
                    // todo 從watingEvnets 中取出一個 WatcherSetEventPair
                    Object event = waitingEvents.take();
                    if (event == eventOfDeath) {
                        wasKilled = true;
                    } else {
                        // todo 本類方法,處理這個事件,繼續進入,方法就在下面
                        processEvent(event);
                    }
                    if (wasKilled)
                        synchronized (waitingEvents) {
                            if (waitingEvents.isEmpty()) {
                                isRunning = false;
                                break;
                            }
                        }
                }
            } catc
        

繼續跟進它的processEvent(event),最終會在這個方法中呼叫下面的程式碼,這裡的watcher就是我在本篇部落格的開始位置新增進去的watcher,至此打完收工

 watcher.process(pair.event);

我是懷著無比激動的心情寫這篇部落格的,如果對您有幫助,歡迎給我點個贊