## ZooKeeper Source Code Reading (6): Watcher

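Preface

It has been a while since I last updated this blog. The past stretch has been stressful: I have finally gotten on track preparing for a job change, while my current work is both very busy and full of trivial tasks, which has been wearing on me. I hope this phase ends soon.

Following the previous post on ZK's ACL code, the next very important feature in ZK is the Watcher mechanism. In my view, as far as using ZK goes, it is no exaggeration to call the Watcher mechanism its most central feature. This post gives a brief introduction to the entity classes and interfaces related to watchers.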

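The Watcher mechanism

In ZK, a client can register a watch on a znode with the server. When the specified event on that znode fires, the server sends a notification to the client, and the client runs its response logic on receipt. In outline: register, notify, call back.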

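Watcher properties:

  1. One-time: a registered watcher fires at most once and is then removed; to keep receiving notifications, a new watcher has to be registered.
  2. Serial: the client executes watcher callbacks serially, one after another.
  3. Lightweight: a watcher notification carries only the notification state, the event type, and the node path.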
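
The one-time property is easy to see in a toy model. The following is a minimal sketch, not ZooKeeper code: `TinyWatchRegistry`, `RenewingWatch`, and `OneTimeWatchDemo` are hypothetical names invented for illustration. A watch is removed at the moment it fires, so a callback that wants further notifications has to register itself again.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical toy model of one-time watches; not part of ZooKeeper.
class TinyWatchRegistry {
    // path -> callbacks waiting for the next event on that path
    private final Map<String, Set<Consumer<String>>> watches = new HashMap<>();

    // Register a callback for the next event on the given path.
    void watch(String path, Consumer<String> callback) {
        watches.computeIfAbsent(path, p -> new HashSet<>()).add(callback);
    }

    // Fire an event: detach the callbacks first, then invoke them.
    // Returns how many callbacks were notified.
    int fire(String path) {
        Set<Consumer<String>> triggered = watches.remove(path);
        if (triggered == null) {
            return 0;
        }
        for (Consumer<String> callback : triggered) {
            callback.accept(path);
        }
        return triggered.size();
    }
}

// Hypothetical callback that registers itself again every time it fires.
class RenewingWatch implements Consumer<String> {
    private final TinyWatchRegistry registry;
    private final List<String> log;

    RenewingWatch(TinyWatchRegistry registry, List<String> log) {
        this.registry = registry;
        this.log = log;
    }

    @Override
    public void accept(String path) {
        log.add("renewed " + path);
        registry.watch(path, this); // register again for the next event
    }
}

class OneTimeWatchDemo {
    // Fires two events on a plain watch and two on a self-renewing watch.
    static List<String> run() {
        TinyWatchRegistry registry = new TinyWatchRegistry();
        List<String> log = new ArrayList<>();

        registry.watch("/once", path -> log.add("once " + path));
        registry.watch("/renewed", new RenewingWatch(registry, log));

        registry.fire("/once");    // notifies the plain watch
        registry.fire("/once");    // nothing left to notify
        registry.fire("/renewed"); // notifies, and the watch re-registers
        registry.fire("/renewed"); // notifies again
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In this model the plain watch is logged once and the self-renewing watch twice, which is the behaviour item 1 describes.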

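Overall Watcher framework

I consulted quite a lot of material while reading the Watcher code and writing this post. First, here is the overall structure of the Watcher-related code:

The two diagrams above show clearly how the Watcher-related interfaces and classes are structured. A brief description of what each one does: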

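Watcher and Event interfaces: anyone who has used ZK will know the Watcher interface. Its process method is the one ZK calls when a watcher is triggered. Watcher also contains the nested interface Event, which defines the kinds of events (the connection state KeeperState and the znode change type EventType);

WatchedEvent class: holds the connection state, the znode change type, and the znode path;

WatcherEvent class: carries exactly the same content as WatchedEvent, but it is the form used for network transmission and is generated by jute.

ClientWatchManager interface: returns the watchers that correspond to an event ("Return a set of watchers that should be notified of the event."). It defines the materialize method, and ZKWatchManager implements it.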

Code analysis

The Watcher interface

This is the class diagram relating the Watcher interface to its two nested enums. Now the code in detail:

/**
 * This interface specifies the public interface an event handler class must
 * implement. A ZooKeeper client will get various events from the ZooKeeper
 * server it connects to. An application using such a client handles these
 * events by registering a callback object with the client. The callback object
 * is expected to be an instance of a class that implements Watcher interface.
 * 
 */
@InterfaceAudience.Public
public interface Watcher {

    /**
     * This interface defines the possible states an Event may represent
     */
    @InterfaceAudience.Public
    // Nested interface Event: describes the state of an event
    public interface Event {
        /**
         * Enumeration of states the ZooKeeper may be at the event
         */
        @InterfaceAudience.Public
        // Nested enum: connection state
        public enum KeeperState {
            /** Unused, this state is never generated by the server */
            @Deprecated
            Unknown (-1),

            /** The client is in the disconnected state - it is not connected
             * to any server in the ensemble. */
             // Disconnected: 0
            Disconnected (0),

            /** Unused, this state is never generated by the server */
            @Deprecated
            NoSyncConnected (1),

            /** The client is in the connected state - it is connected
             * to a server in the ensemble (one of the servers specified
             * in the host connection parameter during ZooKeeper client
             * creation). */
             // Connected: 3
            SyncConnected (3),

            /**
             * Auth failed state
             */
             // Authentication failed: 4
            AuthFailed (4),

            /**
             * The client is connected to a read-only server, that is the
             * server which is not currently connected to the majority.
             * The only operations allowed after receiving this state is
             * read operations.
             * This state is generated for read-only clients only since
             * read/write clients aren't allowed to connect to r/o servers.
             */
             // Connected to a read-only server: 5
            ConnectedReadOnly (5),

            /**
              * SaslAuthenticated: used to notify clients that they are SASL-authenticated,
              * so that they can perform Zookeeper actions with their SASL-authorized permissions.
              */
              // SASL authentication succeeded: 6
            SaslAuthenticated(6),

            /** The serving cluster has expired this session. The ZooKeeper
             * client connection (the session) is no longer valid. You must
             * create a new client connection (instantiate a new ZooKeeper
             * instance) if you wish to access the ensemble. */
             // Session expired: -112
            Expired (-112);

            // intValue represents the current connection state
            private final int intValue;     // Integer representation of value
                                            // for sending over wire
            // Constructor
            KeeperState(int intValue) {
                this.intValue = intValue;
            }

            // Returns the integer value
            public int getIntValue() {
                return intValue;
            }

            // int -> state
            public static KeeperState fromInt(int intValue) {
                switch(intValue) {
                    case   -1: return KeeperState.Unknown;
                    case    0: return KeeperState.Disconnected;
                    case    1: return KeeperState.NoSyncConnected;
                    case    3: return KeeperState.SyncConnected;
                    case    4: return KeeperState.AuthFailed;
                    case    5: return KeeperState.ConnectedReadOnly;
                    case    6: return KeeperState.SaslAuthenticated;
                    case -112: return KeeperState.Expired;

                    default:
                        throw new RuntimeException("Invalid integer value for conversion to KeeperState");
                }
            }
        }

        /**
         * Enumeration of types of events that may occur on the ZooKeeper
         */
        @InterfaceAudience.Public
        // Event types
        public enum EventType {
            None (-1),                // no event
            NodeCreated (1),          // node created
            NodeDeleted (2),          // node deleted
            NodeDataChanged (3),      // node data changed
            NodeChildrenChanged (4);  // the node's list of children changed

            // Integer value of the event type
            private final int intValue;     // Integer representation of value
                                            // for sending over wire
            // Constructor
            EventType(int intValue) {
                this.intValue = intValue;
            }
            // Returns the integer value
            public int getIntValue() {
                return intValue;
            }
            // int -> event type
            public static EventType fromInt(int intValue) {
                switch(intValue) {
                    case -1: return EventType.None;
                    case  1: return EventType.NodeCreated;
                    case  2: return EventType.NodeDeleted;
                    case  3: return EventType.NodeDataChanged;
                    case  4: return EventType.NodeChildrenChanged;

                    default:
                        throw new RuntimeException("Invalid integer value for conversion to EventType");
                }
            }           
        }
    }

    // Callback method
    abstract public void process(WatchedEvent event);
}
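
Both enums follow the same pattern: each constant carries an int for the wire, and `fromInt` maps that int back to a constant. Below is a minimal standalone sketch of the pattern. `DemoEventType` is a hypothetical stand-in that copies the codes shown above (it is not the ZooKeeper class), and it uses a loop over `values()` rather than a `switch`, and `IllegalArgumentException` rather than `RuntimeException`, purely as illustrative choices.

```java
// Hypothetical stand-in that mirrors the codes of Watcher.Event.EventType.
enum DemoEventType {
    None(-1),
    NodeCreated(1),
    NodeDeleted(2),
    NodeDataChanged(3),
    NodeChildrenChanged(4);

    private final int intValue; // integer sent over the wire

    DemoEventType(int intValue) {
        this.intValue = intValue;
    }

    int getIntValue() {
        return intValue;
    }

    // Map a wire integer back to the enum constant, rejecting unknown codes.
    static DemoEventType fromInt(int intValue) {
        for (DemoEventType type : values()) {
            if (type.intValue == intValue) {
                return type;
            }
        }
        throw new IllegalArgumentException(
                "Invalid integer value for conversion to DemoEventType: " + intValue);
    }
}

class EventTypeRoundTrip {
    public static void main(String[] args) {
        // Each constant survives the enum -> int -> enum round trip.
        for (DemoEventType type : DemoEventType.values()) {
            int wire = type.getIntValue();
            System.out.println(type + " -> " + wire + " -> " + DemoEventType.fromInt(wire));
        }
    }
}
```

Because the wire codes are not the ordinals (None is -1 and there is no 0), indexing `values()` by the code would not be a valid shortcut; an explicit mapping is needed.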

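To show more concisely how the two enums in the Event interface combine in practice, here is the table from the book "From Paxos to Zookeeper":

A few notes:

  1. NodeDataChanged fires when either the znode's data or its dataVersion changes, so writing identical content still triggers the watcher, because dataVersion is updated anyway. NodeChildrenChanged means the list of child nodes changed, for example when a child is added or deleted.
  2. AuthFailed and NoAuth are two different conditions: the former means the auth scheme is wrong (for example digest1 was chosen instead of the correct digest scheme), the latter means the auth information is wrong.
  3. The process callback is very important. When ZK sends a watcher notification to the client, the client calls back the corresponding process method, and that is how the event gets handled.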

WatchedEvent and WatcherEvent

/**
 *  A WatchedEvent represents a change on the ZooKeeper that a Watcher
 *  is able to respond to.  The WatchedEvent includes exactly what happened,
 *  the current state of the ZooKeeper, and the path of the znode that
 *  was involved in the event.
 */
@InterfaceAudience.Public
public class WatchedEvent {
    final private KeeperState keeperState; // connection state
    final private EventType eventType;     // event type
    private String path;                   // path
    // ... (remaining members omitted)
}

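WatchedEvent wraps a server-side event and passes it to the Watcher so that the process callback can handle it.

WatcherEvent means the same thing as WatchedEvent in terms of content; the difference is that WatcherEvent implements the Record interface, so it can be serialized for network transmission.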

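public class WatcherEvent implements Record {
  private int type;    // event type
  private int state;   // connection state
  private String path; // path
  // ... (remaining members omitted)
}

Notably, WatchedEvent's getWrapper method is what wraps a WatchedEvent into a WatcherEvent. The code is simple: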

/**
 *  Convert WatchedEvent to type that can be sent over network
 */
public WatcherEvent getWrapper() {
    return new WatcherEvent(eventType.getIntValue(), // calls the WatcherEvent constructor
                            keeperState.getIntValue(), 
                            path);
}
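
To make the "flattened for the network" idea concrete, here is a minimal sketch under stated assumptions. `WireEvent` is a hypothetical class, and the byte layout (two ints followed by a `writeUTF` string) is an illustrative choice built on `java.io`, not jute's actual encoding. It only shows why the wire-side type holds plain ints and a string: those can be written and read back field by field.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical wire-side event: plain ints and a string, like WatcherEvent's fields.
class WireEvent {
    final int type;
    final int state;
    final String path;

    WireEvent(int type, int state, String path) {
        this.type = type;
        this.state = state;
        this.path = path;
    }

    // Write the three fields in a fixed order (illustrative layout, not jute's).
    byte[] toBytes() {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            out.writeInt(type);
            out.writeInt(state);
            out.writeUTF(path);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    // Read the fields back in the same order they were written.
    static WireEvent fromBytes(byte[] bytes) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
            int type = in.readInt();
            int state = in.readInt();
            String path = in.readUTF();
            return new WireEvent(type, state, path);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}

class WireEventDemo {
    public static void main(String[] args) {
        // 3 and 3 are the codes of NodeDataChanged and SyncConnected in the listing above.
        WireEvent sent = new WireEvent(3, 3, "/app/config");
        WireEvent received = WireEvent.fromBytes(sent.toBytes());
        System.out.println(received.type + " " + received.state + " " + received.path);
    }
}
```

On the receiving side, the ints would be turned back into enum constants with the `fromInt` methods shown earlier.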

ClientWatchManager and ZKWatchManager

ClientWatchManager maps a received Event to the watchers that need to be notified.

public interface ClientWatchManager {
    /**
     * Return a set of watchers that should be notified of the event. The 
     * manager must not notify the watcher(s), however it will update its 
     * internal structure as if the watches had triggered. The intent being 
     * that the callee is now responsible for notifying the watchers of the 
     * event, possibly at some later time.
     * 
     * @param state event state
     * @param type event type
     * @param path event path
     * @return may be empty set but must not be null
     */
    public Set<Watcher> materialize(Watcher.Event.KeeperState state,
        Watcher.Event.EventType type, String path);
}

Its implementation, ZKWatchManager, is a nested class inside the ZooKeeper class:

/**
 * Manage watchers & handle events generated by the ClientCnxn object.
 * (In other words, this class exists to handle the events obtained from ClientCnxn and their related watchers.)
 * We are implementing this as a nested class of ZooKeeper so that
 * the public methods will not be exposed as part of the ZooKeeper client
 * API.
 */
private static class ZKWatchManager implements ClientWatchManager {
    private final Map<String, Set<Watcher>> dataWatches =
        new HashMap<String, Set<Watcher>>(); // watches set through getData
    private final Map<String, Set<Watcher>> existWatches =
        new HashMap<String, Set<Watcher>>(); // watches set through exists
    private final Map<String, Set<Watcher>> childWatches =
        new HashMap<String, Set<Watcher>>(); // watches set through getChildren

    private volatile Watcher defaultWatcher; // the watcher the client passed in when connecting to ZooKeeper

    final private void addTo(Set<Watcher> from, Set<Watcher> to) {
        if (from != null) {
            to.addAll(from);
        }
    }

    /* (non-Javadoc)
     * @see org.apache.zookeeper.ClientWatchManager#materialize(Event.KeeperState, 
     *                                                        Event.EventType, java.lang.String)
     */
    @Override
    public Set<Watcher> materialize(Watcher.Event.KeeperState state,
                                    Watcher.Event.EventType type,
                                    String clientPath)
    {
        // The set of watchers to notify
        Set<Watcher> result = new HashSet<Watcher>();

        switch (type) {
        case None: // event type is None
            result.add(defaultWatcher); // add the defaultWatcher
            // Whether to clear the maps: true only when zookeeper.disableAutoWatchReset
            // is set and the state is not SyncConnected
            boolean clear = ClientCnxn.getDisableAutoResetWatch() &&
                    state != Watcher.Event.KeeperState.SyncConnected;

            // synchronized block on dataWatches
            synchronized(dataWatches) {
                for(Set<Watcher> ws: dataWatches.values()) {
                    // add every watcher; presumably this is so that all watchers
                    // can be cleared if an established connection drops or times out
                    result.addAll(ws);
                }
                if (clear) { // if the maps should be cleared
                    dataWatches.clear();
                }
            }

            synchronized(existWatches) { // same as above
                for(Set<Watcher> ws: existWatches.values()) {
                    result.addAll(ws);
                }
                if (clear) {
                    existWatches.clear();
                }
            }

            synchronized(childWatches) { // same as above
                for(Set<Watcher> ws: childWatches.values()) {
                    result.addAll(ws);
                }
                if (clear) {
                    childWatches.clear();
                }
            }

            return result;
        case NodeDataChanged:
        case NodeCreated: // node data changed, or node created
            synchronized (dataWatches) {
                addTo(dataWatches.remove(clientPath), result); // remove the data watches at clientPath and add them to result
            }
            synchronized (existWatches) {
                addTo(existWatches.remove(clientPath), result); // remove the exists watches at clientPath and add them to result
            }
            break;
        case NodeChildrenChanged: // the node's children changed
            synchronized (childWatches) {
                addTo(childWatches.remove(clientPath), result); // remove the child watches at clientPath and add them to result
            }
            break;
        case NodeDeleted: // the node was deleted
            synchronized (dataWatches) {
                addTo(dataWatches.remove(clientPath), result); // remove the data watches at clientPath and add them to result
            }
            // XXX This shouldn't be needed, but just in case
            synchronized (existWatches) { // should not happen; but why not?
                Set<Watcher> list = existWatches.remove(clientPath);
                if (list != null) {
                    // the key was already removed on the line above, so this second
                    // remove() returns null and addTo adds nothing to result
                    addTo(existWatches.remove(clientPath), result);
                    LOG.warn("We are triggering an exists watch for delete! Shouldn't happen!");
                }
            }
            synchronized (childWatches) {
                addTo(childWatches.remove(clientPath), result); // remove the child watches at clientPath and add them to result
            }
            break;
        default: // any other event type
            String msg = "Unhandled watch event type " + type
                + " with state " + state + " on path " + clientPath;
            LOG.error(msg);
            throw new RuntimeException(msg);
        }

        return result;
    }
}

As you can see, this code reflects the one-time nature of watchers: when a node event fires, the watchers it triggers are removed from the maps.
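
One detail of the NodeDeleted branch can be checked in isolation. `Map.remove` returns the previous value and deletes the key, so calling it a second time for the same key returns `null`. The sketch below reproduces just that pattern with plain `java.util` collections, using strings in place of watchers. `DoubleRemoveDemo` and its method names are hypothetical; only the shape of the remove/addTo calls is taken from the listing.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

class DoubleRemoveDemo {
    // Same null-tolerant helper shape as ZKWatchManager.addTo, on strings.
    static void addTo(Set<String> from, Set<String> to) {
        if (from != null) {
            to.addAll(from);
        }
    }

    // Builds a map with one watcher name registered on "/node".
    static Map<String, Set<String>> sampleWatches() {
        Map<String, Set<String>> watches = new HashMap<>();
        Set<String> onNode = new HashSet<>();
        onNode.add("watcherA");
        watches.put("/node", onNode);
        return watches;
    }

    // Pattern from the listing: remove, test for null, then remove again.
    static Set<String> collectWithSecondRemove(Map<String, Set<String>> watches, String path) {
        Set<String> result = new HashSet<>();
        Set<String> list = watches.remove(path);
        if (list != null) {
            addTo(watches.remove(path), result); // key already gone: remove() returns null
        }
        return result;
    }

    // Variant that passes the set from the single remove straight to addTo.
    static Set<String> collectWithSingleRemove(Map<String, Set<String>> watches, String path) {
        Set<String> result = new HashSet<>();
        addTo(watches.remove(path), result);
        return result;
    }

    public static void main(String[] args) {
        System.out.println(collectWithSecondRemove(sampleWatches(), "/node").size());
        System.out.println(collectWithSingleRemove(sampleWatches(), "/node").size());
    }
}
```

In this toy run the first variant collects nothing and the second collects the one registered name. Read against the listing, that suggests the exists watchers removed in the NodeDeleted branch are dropped without reaching result, at least in the version of the source quoted here.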

WatcherSetEventPair

This class has a simple purpose: to bind an Event to its Watchers. It lives in ClientCnxn.

private static class WatcherSetEventPair {
    private final Set<Watcher> watchers; // the watchers to notify
    private final WatchedEvent event;    // the event

    // Constructor
    public WatcherSetEventPair(Set<Watcher> watchers, WatchedEvent event) {
        this.watchers = watchers;
        this.event = event;
    }
}

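Summary

This post covered some of the Watcher-related interfaces and entity classes. Even after consulting a lot of material, there are still a few points I am not sure about:

  1. Why are the exists-related watchers not triggered after a NodeDeleted event?

  2. Why is the WatcherSetEventPair class needed? (The explanation in the references is very good, and later posts will come back to this.)

    Watcher's process method takes an event parameter, so once ClientWatchManager has found the watchers for an event, it could simply call watcher.process(event) directly.

    However, ClientCnxn.EventThread handles events asynchronously, using a producer/consumer queue. processEvent has to take a single object off the queue that contains both the set of watchers and the event, so the two are combined, and that is where WatcherSetEventPair comes from;

  3. Watcher registration and triggering.

  4. In materialize, all watchers are added for the None type, presumably so that all watchers can be removed when the connection state changes.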
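
The hand-off described in item 2 can be sketched with a blocking queue. This is a simplified illustration, not the ClientCnxn code: `DemoPair`, `DemoEventThread`, and `EventQueueDemo` are hypothetical names, events are plain strings, and a sentinel object is used here as one simple way to stop the thread.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Hypothetical pair: an event together with the watchers that must see it.
class DemoPair {
    final List<Consumer<String>> watchers;
    final String event;

    DemoPair(List<Consumer<String>> watchers, String event) {
        this.watchers = watchers;
        this.event = event;
    }
}

// Hypothetical consumer thread: takes queued pairs and runs callbacks one by one.
class DemoEventThread extends Thread {
    private static final DemoPair STOP =
            new DemoPair(Collections.<Consumer<String>>emptyList(), "stop");
    private final BlockingQueue<DemoPair> queue = new LinkedBlockingQueue<>();

    // Producer side: queue one object that carries both watchers and event.
    void enqueue(List<Consumer<String>> watchers, String event) {
        queue.add(new DemoPair(watchers, event));
    }

    void shutdown() {
        queue.add(STOP);
    }

    @Override
    public void run() {
        try {
            while (true) {
                DemoPair pair = queue.take();
                if (pair == STOP) {
                    return;
                }
                for (Consumer<String> watcher : pair.watchers) {
                    watcher.accept(pair.event); // callbacks run serially on this thread
                }
            }
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
        }
    }
}

class EventQueueDemo {
    // Queues two events for two watchers and returns the order of callbacks.
    static List<String> run() {
        List<String> log = Collections.synchronizedList(new ArrayList<String>());
        DemoEventThread thread = new DemoEventThread();
        thread.start();

        List<Consumer<String>> watchers = new ArrayList<>();
        watchers.add(event -> log.add("w1 saw " + event));
        watchers.add(event -> log.add("w2 saw " + event));

        thread.enqueue(watchers, "NodeCreated:/a");
        thread.enqueue(watchers, "NodeDeleted:/a");
        thread.shutdown();
        try {
            thread.join();
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(log);
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Since the queue element has to be a single object, the watchers and the event travel together, and because one thread drains the queue, the callbacks run in order, which also matches the "serial" property listed at the start.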

References

https://www.jianshu.com/p/4d09cc083571

https://www.cnblogs.com/leesf456/p/6286827.html