1. 程式人生 > >Zookeeper原始碼分析:Watcher機制

Zookeeper原始碼分析:Watcher機制

1. 設定Watcher

使用Watcher需要先實現Watcher介面,並將實現類物件傳遞到指定方法中,如getChildren, exist等。Zookeeper允許在構造Zookeeper物件時候指定一個預設Watcher物件.getChildren和exit方法可以使用這個預設的Watcher物件,也可以指定一個新Watcher物件。

Code 1: Watcher介面

public interface Watcher {

    /**
     * Event的狀態
     */
    public interface Event {
        /**
         * 在事件發生時,ZooKeeper的狀態
         */         public enum KeeperState {             @Deprecated             Unknown (-1),             Disconnected (0),             @Deprecated             NoSyncConnected (1),             SyncConnected (3),             AuthFailed (4),             ConnectedReadOnly (5),             SaslAuthenticated(6),             Expired (-112);             private
 final int intValue;              KeeperState( int intValue) {                 this.intValue = intValue;             }               ......         }         /**          * ZooKeeper中的事件          */         public enum EventType {             None (-1),             NodeCreated (1),             NodeDeleted (2),             NodeDataChanged (3),             NodeChildrenChanged (4);             private
 final int intValue;     // Integer representation of value                                             // for sending over wire             EventType( int intValue) {                 this.intValue = intValue;             }             ......           }     }     //Watcher的回撥方法     abstract public void process(WatchedEvent event); }

Code 2: Zookeeper.getChildren(final String, Watcher)方法

public List<String> getChildren(final String path, Watcher watcher)
    throws KeeperException, InterruptedException
{
    final String clientPath = path;
    PathUtils. validatePath(clientPath);

    WatchRegistration wcb = null;
    //如果watcher不等於null, 構建WatchRegistration物件,
    //該物件描述了watcher和path之間的關係
    if (watcher != null) {
        wcb = new ChildWatchRegistration(watcher, clientPath);
    }
    
    //在傳入的path加上root path字首,構成伺服器端的絕對路徑
    final String serverPath = prependChroot(clientPath);
    
    //構建RequestHeader物件
    RequestHeader h = new RequestHeader();
    //設定操作型別為OpCode. getChildren
    h.setType(ZooDefs.OpCode. getChildren);
    //構建GetChildrenRequest物件
    GetChildrenRequest request = new GetChildrenRequest();
    //設定path
    request.setPath(serverPath);
    //設定是否使用watcher
    request.setWatch(watcher != null);
    //構建GetChildrenResponse物件
    GetChildrenResponse response = new GetChildrenResponse();
    //提交請求,並阻塞等待結果
    ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
    if (r.getErr() != 0) {
        throw KeeperException.create(KeeperException.Code. get(r.getErr()),
                clientPath);
    }
    return response.getChildren();
}

Follower的NIOServerCnxn類接到了Client的請求,會呼叫ZookeeperServer.processPacket()方法。該方法會構建一個Request物件,並呼叫第一個處理器FollowerRequestProcessor。

由於我們的請求只是一個讀操作,而不是一個Quorum請求或者sync請求,所以FollowerRequestProcessor不需要呼叫Follower.request()方法將請求轉給Leader,只需要將請求傳遞到下一個處理器CommitProcessor。

處理器CommitProcessor執行緒發現請求是讀請求後,直接將Requet物件加入到toProcess佇列中,在接下的迴圈中會呼叫FinalRequestProcessor.processRequest方法進行處理。

FinalRequestProcessor.processRequest方法最終會呼叫ZKDatabase中的讀操作方法(如statNode和getData方法), 而ZKDatabase的這些方法會最終呼叫DataTree類的方法來獲取指定path的znode資訊並返回給Client端,同時也會設定Watcher。

Code 3: FinalRequestProcessor對OpCode.getData請求的處理

case OpCode. getData: {
               lastOp = "GETD";
               GetDataRequest getDataRequest = new GetDataRequest();
               ByteBufferInputStream. byteBuffer2Record(request.request,
                       getDataRequest);
               //獲得znode物件
               DataNode n = zks.getZKDatabase().getNode(getDataRequest.getPath());
               //n為null, 丟擲NoNodeException異常
               if (n == null) {
                   throw new KeeperException.NoNodeException();
               }
               Long aclL;
               synchronized(n) {
                   aclL = n. acl;
               }
               //檢查是否有讀許可權
               PrepRequestProcessor. checkACL(zks, zks.getZKDatabase().convertLong(aclL),
                       ZooDefs.Perms. READ,
                       request. authInfo);
               //構建狀態物件stat
               Stat stat = new Stat();
               //獲得指定path的znode資料,
               //如果GetDataRequest.getWatcher()返回true, 將ServerCnxn型別物件cnxn傳遞進去。
               //ServerCnxn是實現了Watcher介面
               byte b[] = zks.getZKDatabase().getData(getDataRequest.getPath(), stat,
                       getDataRequest. getWatch() ? cnxn : null);
               //構建GetDataResponse物件
               rsp = new GetDataResponse(b, stat);
               break;
           }

Code 4: DataTree.getData()方法

public byte[] getData(String path, Stat stat, Watcher watcher)
        throws KeeperException.NoNodeException {
    //從nodes map中獲取指定path的DataNode物件
    DataNode n = nodes.get(path);
    //如果n為null, 則丟擲NoNodeException異常
    if (n == null) {
        throw new KeeperException.NoNodeException();
    }
    synchronized (n) {
        //將n的狀態copy到stat中
        n.copyStat(stat);
        //如果watcher不會null, 則將(path, watcher)鍵值對放入dataWatchers Map裡
        if (watcher != null) {
            dataWatches.addWatch(path, watcher);
        }
        //返回節點資料
        return n.data ;
    }
}

2. 修改znode資料觸發Watcher

在Zookeeper二階段提交的COMMIT階段。當Follower從Leader那接收到一個寫請求的Leader.COMMIT資料包,會呼叫FinalRequestProcessor.processRequest()方法。Leader本身在傳送完Leader.COMMIT資料包,也會呼叫FinalRequestProcessor.processRequest()方法。

如果是setData修改資料請求,那麼FinalRequestProcessor.processRequest()方法最終會呼叫到DataTree.setData方法將txn應用到指定znode上,同時觸發Watcher,併發送notification給Client端。

其關SetData請求的時序圖如下:

triggerWatcher

Code 5: DataTree.setData()方法

public Stat setData(String path, byte data[], int version, long zxid,
        long time) throws KeeperException.NoNodeException {
    Stat s = new Stat();
    //根據path, 獲得DataNode物件n
    DataNode n = nodes.get(path);
    //如果n為null, 則丟擲NoNodeException異常
    if (n == null) {
        throw new KeeperException.NoNodeException();
    }
    byte lastdata[] = null;
    synchronized (n) {
        lastdata = n. data;
        n. data = data;
        n. stat.setMtime(time);
        n. stat.setMzxid(zxid);
        n. stat.setVersion(version);
        n.copyStat(s);
    }
    // now update if the path is in a quota subtree.
    String lastPrefix = getMaxPrefixWithQuota(path);
    if(lastPrefix != null) {
      this.updateBytes(lastPrefix, (data == null ? 0 : data.length)
          - (lastdata == null ? 0 : lastdata.length ));
    }
    //觸發Watcher
    dataWatches.triggerWatch(path, EventType.NodeDataChanged);
    return s;
}


Code 6: WatchManage.triggerWatcher()方法,觸發Watcher。

Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
    WatchedEvent e = new WatchedEvent(type,
            KeeperState. SyncConnected, path);
    HashSet<Watcher> watchers;
    synchronized (this ) {
        //從watchTable刪除掉path對於的watcher
        watchers = watchTable.remove(path);
        if (watchers == null || watchers.isEmpty()) {
            if (LOG .isTraceEnabled()) {
                ZooTrace. logTraceMessage(LOG,
                        ZooTrace. EVENT_DELIVERY_TRACE_MASK,
                        "No watchers for " + path);
            }
            return null;
        }
        for (Watcher w : watchers) {
            HashSet<String> paths = watch2Paths.get(w);
            if (paths != null) {
                paths.remove(path);
            }
        }
    }
    //迴圈處理所有關於path的Watcher, 這裡Watcher物件實際上就是ServerCnxn型別物件
    for (Watcher w : watchers) {
        if (supress != null && supress.contains(w)) {
            continue;
        }
        w.process(e);
    }
    return watchers;
}

Code 7: NIOServerCnxn.process方法,傳送notification給Client端

synchronized public void process (WatchedEvent event) {
    ReplyHeader h = new ReplyHeader(-1, -1L, 0);
    if (LOG .isTraceEnabled()) {
        ZooTrace. logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK ,
                                 "Deliver event " + event + " to 0x"
                                 + Long. toHexString(this. sessionId)
                                 + " through " + this );
    }

    // Convert WatchedEvent to a type that can be sent over the wire
    WatcherEvent e = event.getWrapper();
    
    //傳送notification給Client端
    sendResponse(h, e, "notification");
}

3. 總結

Watcher具有one-time trigger的特性,在程式碼中我們也可以看到一個watcher被處理後會立即從watchTable中刪掉。

本部落格系博主原創,轉載請附上原部落格地址:http://blog.csdn.net/jeff_fangji/article/details/43910113

相關推薦

Zookeeper原始碼分析Watcher機制

1. 設定Watcher 使用Watcher需要先實現Watcher介面,並將實現類物件傳遞到指定方法中,如getChildren, exist等。Zookeeper允許在構造Zookeeper物件時候指定一個預設Watcher物件.getChildren和exit方法可以

ZooKeeper原始碼分析Quorum請求的整個流程

Quorum請求是轉發給Leader處理,並且需要得一個Follower Quorum確認的請求。這些請求包括: 1)znode的寫操作(OpCode.create,OpCode.delete,OpCode.setData,OpCode.setACL) 2)Sessi

Zookeeper原始碼閱讀分析watcher機制

    客戶端ClientWatchManager,管理由ClientXncn產生的watchers和handle events 在zookeeper的exists、getChildren、getData等這些API中可以註冊watcher物件到ClientWatchMa

Master原理剖析與原始碼分析資源排程機制原始碼分析(schedule(),兩種資源排程演算法)

1、主備切換機制原理剖析與原始碼分析 2、註冊機制原理剖析與原始碼分析 3、狀態改變處理機制原始碼分析 4、資源排程機制原始碼分析(schedule(),兩種資源排程演算法) * Dri

原始碼分析Android 的onTouch事件傳遞機制分析

當用戶觸控式螢幕幕的時候,最先接受到觸控事件的是Activity的dispatchTouchEvent(). 我們就從這裡開始分析事件的分發 Activity原始碼 看下Activity的dispatchTouchEvent()原始碼。

x265原始碼分析sao.cpp 自適應樣點補償

/* 對|num / den|四捨五入,然後前面新增符號 */ inline int32_t roundIBDI(int32_t num, int32_t den) { return num >= 0 ? ((num * 2 + den)/(den * 2)) : -((-

x265原始碼分析main函式及CLIOptions結構體解釋

/** * 返回碼資訊: * 0 – 編碼成功; * 1 – 命令列解釋失敗; * 2 – 編碼器開啟失敗; * 3 – 生成流頭部失敗; * 4 – 編碼出錯; * 5 – 開啟csv檔案失敗. */ int main(int argc, char **argv) {

Dubbo原始碼分析Dubbo自己實現的IOC

  在建立自適應例項時,都會呼叫ExtensionLoader的injectExtension方法: @SuppressWarnings("unchecked") private T createAdaptiveExtension() { try {

Spring Developer Tools 原始碼分析三、重啟自動配置'

接上文 Spring Developer Tools 原始碼分析:二、類路徑監控,接下來看看前面提到的這些類是如何配置,如何啟動的。 spring-boot-devtools 使用了 Spring Boot 的自動配置方式,我們先關注本地開發環境中自動重啟的部分。 在 LocalDevToolsAut

JAVA常用集合原始碼分析HashSet

序言 在上一篇文章中,我們介紹了HashMap,其實本來想自己完成原始碼分析的一系列文章的,但是HashMap的原始碼著實是複雜,看的我腦殼疼。。於是就自己去找了找大牛們的文章反覆看,後面總算有了點門道了,大致知道了HashMap的原理,然後轉載了一篇我認為總結的比較好的文章到我的部落格裡,供大

JAVA常用集合原始碼分析HashMap

我們這篇文章就來試著分析下 HashMap 的原始碼,由於 HashMap 底層涉及到太多方面,一篇文章總是不能面面俱到,所以我們可以帶著面試官常問的幾個問題去看原始碼: 瞭解底層如何儲存資料的 HashMap 的幾個主要方法 HashMap 是如何確定元素儲存位置的以及如何處

JAVA常用集合原始碼分析LinkedList

概述 上一篇我們介紹了ArrayList,我們知道它的底層是基於陣列實現的,提到陣列,我們就馬上會想到它的兄弟連結串列,今天我們要介紹的LinkedList就是基於連結串列實現的。 繼承結構 public class LinkedList<E> extends Abs

JAVA常用集合原始碼分析ArrayList

ArrayList簡介 ArrayList 是一個動態陣列,所謂動態,是相對陣列來說的,我們知道當我們在使用陣列的時候必須指定大小,而且大小隻能是固定的,有時候就很不方便,讓人不爽。而我們的ArrayList恰恰解決了這一痛點,讓我們可以不受束縛地使用陣列。 閱讀方法 看繼承結構與

QT原始碼分析QObject

  QT框架裡面最大的特色就是在C++的基礎上增加了元物件系統(Meta-Object System),而元物件系統裡面最重要的內容就是訊號與槽機制,這個機制是在C++語法的基礎上實現的,使用了函式、函式指標、回撥函式等概念。當然與我們自己去寫函式所不同的是槽與訊號機制會自動幫我們生成部分程式碼,比如我們寫的

Tomcat原始碼分析一、tomcat元件認識

    前言      最近一次上線過程中出現了jekin是自動化部署的web環境跟本地開發環境不一致的情況,導致生產環境應用訪問失敗,因此閱讀tomcat原始碼,以加深對web的認識。 基本元件 在閱讀原始碼之前,最好是對整個應

J.U.C原始碼分析CountDownLatch

        CountDownLoad是在併發程式設計中使用較多的一個類,可以完成多個執行緒之間的相互等待和協作,原始碼內容不多但功能強大且使用場景複雜多樣。         原始碼

SIFT原理與原始碼分析DoG尺度空間構造

《SIFT原理與原始碼分析》系列文章索引:http://blog.csdn.net/xiaowei_cqu/article/details/8069548 尺度空間理論   自然界中的物體隨著觀測尺度不同有不同的表現形態。例如我們形容建築物用“米”,觀測分子、原子等用“納米”。

muduo原始碼分析TcpConnection類

前言 前面學習了TcpServer的實現,TcpServer對每個連線都會新建一個TcpConnection(使用shared_ptr管理)。接下來學習一下TcpConnection的設計細節。   連線狀態 muduo對於一個連線的從生到死進行了狀態的定義,類似一個狀態機

muduo原始碼分析TcpServer類

上篇博文學習了Acceptor class 的實現,它僅僅是對Channel和Socket的簡單封裝,對使用者來說簡單易用。這得益於底層架構Reactor。接下來,開始學習muduo對於建立連線的處理。這屬於muduo提到的三個半事件中的第一個。可以想一下,TcpServer class應該也是對A

muduo原始碼分析Acceptor類

Acceptor用於接受(accept)客戶端的連線,通過設定回撥函式通知使用者。它只在muduo網路庫內部的TcpServer使用,由TcpServer控制它的生命期。 實際上,Acceptor只是對 Channel 的封裝,通過Channel關注listenfd的&nbs