1. 程式人生 > 其它 >ZooKeeper的Watcher機制以及原始碼分析

ZooKeeper的Watcher機制以及原始碼分析

技術標籤:Javazookeeper

ZooKeeper 的 Watcher 機制,總的來說可以分為三個過程:客戶端註冊 Watcher、伺服器處理 Watcher 和客戶端回撥 Watcher客戶端。註冊 watcher 有 3 種方式,getData、exists、getChildren;

如何觸發事件? 凡是事務型別的操作,都會觸發監聽事件。create /delete /setData,來看以下程式碼簡單實現

        final CountDownLatch countDownLatch = new CountDownLatch(1);
		zooKeeper = new ZooKeeper(zkAddress, sessionTimeout, new Watcher() {
			@Override
			public void process(WatchedEvent watchedEvent) {
				if (watchedEvent.getState() == SyncConnected) {
                    //如果收到響應事件,表明連線成功.
					countDownLatch.countDown();
				}
			}
		});
		boolean await = countDownLatch.await(connectTimeout, TimeUnit.SECONDS);
		if (await) {
			LOGGER.info("connected zookeeper success ~");
		} else {
			throw new Exception("connected zookeeper failed ~");
		}

Watcher實現由三個部分組成:

  • Zookeeper服務端;
  • Zookeeper客戶端;
  • 客戶端的ZKWatchManager物件;

  客戶端首先將Watcher註冊到服務端,同時將Watcher物件儲存到客戶端的Watch管理器中。當ZooKeeper服務端監聽的資料狀態發生變化時,服務端會主動通知客戶端,接著客戶端的Watch管理器會觸發相關Watcher來回調相應處理邏輯,從而完成整體的資料釋出/訂閱流程。

zookeeper物件建立原始碼如下:

public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
            boolean canBeReadOnly)
        throws IOException
    {
        LOG.info("Initiating client connection, connectString=" + connectString
                + " sessionTimeout=" + sessionTimeout + " watcher=" + watcher);
        //watcherManager接收管理watcher物件
        watchManager.defaultWatcher = watcher;
        //設定連線引數
        ConnectStringParser connectStringParser = new ConnectStringParser(
                connectString);
        HostProvider hostProvider = new StaticHostProvider(
                connectStringParser.getServerAddresses());
        cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
                hostProvider, sessionTimeout, this, watchManager,
                getClientCnxnSocket(), canBeReadOnly);
        //啟動2個執行緒,一個是請求執行緒,一個是事件非同步通知執行緒
        cnxn.start();
    }
    public void start() {
        sendThread.start();
        eventThread.start();
    }

其實這兩個都是非同步執行緒,第一個是請求建立連線的執行緒,connectString是可以是多個host:port,用逗號分隔,Zookeeper服務端會亂序選擇其中的一個建立連線,建立失敗再隨機選擇剩下的建立連線,不等建立完成,就會返回.服務端建立完成連線後,watcher會監聽到狀態的變化,然後通過事件通知執行緒執行客戶端的watcher的process方法.

英文原文如下:

* To create a ZooKeeper client object, the application needs to pass a
* connection string containing a comma separated list of host:port pairs,
* each corresponding to a ZooKeeper server.
* <p>
* Session establishment is asynchronous. This constructor will initiate
* connection to the server and return immediately
- potentially (usually) * before the session is fully established. The watcher argument specifies * the watcher that will be notified of any changes in state. This * notification can come at any point before or after the constructor call * has returned. * <p> * The instantiated ZooKeeper client object will pick an arbitrary server * from the connectString and attempt to connect to it. If establishment of * the connection fails, another server in the connect string will be tried * (the order is non-deterministic, as we random shuffle the list), until a * connection is established. The client will continue attempts until the * session is explicitly closed.