ZooKeeper的Watcher機制以及原始碼分析
阿新 • • 發佈:2021-01-01
ZooKeeper 的 Watcher 機制,總的來說可以分為三個過程:客戶端註冊 Watcher、伺服器處理 Watcher 和客戶端回撥 Watcher客戶端。註冊 watcher 有 3 種方式,getData、exists、getChildren;
如何觸發事件? 凡是事務型別的操作,都會觸發監聽事件。create /delete /setData,來看以下程式碼簡單實現
final CountDownLatch countDownLatch = new CountDownLatch(1); zooKeeper = new ZooKeeper(zkAddress, sessionTimeout, new Watcher() { @Override public void process(WatchedEvent watchedEvent) { if (watchedEvent.getState() == SyncConnected) { //如果收到響應事件,表明連線成功. countDownLatch.countDown(); } } }); boolean await = countDownLatch.await(connectTimeout, TimeUnit.SECONDS); if (await) { LOGGER.info("connected zookeeper success ~"); } else { throw new Exception("connected zookeeper failed ~"); }
Watcher實現由三個部分組成:
- Zookeeper服務端;
- Zookeeper客戶端;
- 客戶端的ZKWatchManager物件;
客戶端首先將Watcher註冊到服務端,同時將Watcher物件儲存到客戶端的Watch管理器中。當ZooKeeper服務端監聽的資料狀態發生變化時,服務端會主動通知客戶端,接著客戶端的Watch管理器會觸發相關Watcher來回調相應處理邏輯,從而完成整體的資料釋出/訂閱流程。
zookeeper物件建立原始碼如下:
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) throws IOException { LOG.info("Initiating client connection, connectString=" + connectString + " sessionTimeout=" + sessionTimeout + " watcher=" + watcher); //watcherManager接收管理watcher物件 watchManager.defaultWatcher = watcher; //設定連線引數 ConnectStringParser connectStringParser = new ConnectStringParser( connectString); HostProvider hostProvider = new StaticHostProvider( connectStringParser.getServerAddresses()); cnxn = new ClientCnxn(connectStringParser.getChrootPath(), hostProvider, sessionTimeout, this, watchManager, getClientCnxnSocket(), canBeReadOnly); //啟動2個執行緒,一個是請求執行緒,一個是事件非同步通知執行緒 cnxn.start(); }
public void start() {
sendThread.start();
eventThread.start();
}
其實這兩個都是非同步執行緒,第一個是請求建立連線的執行緒,connectString是可以是多個host:port,用逗號分隔,Zookeeper服務端會亂序選擇其中的一個建立連線,建立失敗再隨機選擇剩下的建立連線,不等建立完成,就會返回.服務端建立完成連線後,watcher會監聽到狀態的變化,然後通過事件通知執行緒執行客戶端的watcher的process方法.
英文原文如下:
* To create a ZooKeeper client object, the application needs to pass a * connection string containing a comma separated list of host:port pairs, * each corresponding to a ZooKeeper server. * <p> * Session establishment is asynchronous. This constructor will initiate * connection to the server and return immediately- potentially (usually) * before the session is fully established. The watcher argument specifies * the watcher that will be notified of any changes in state. This * notification can come at any point before or after the constructor call * has returned. * <p> * The instantiated ZooKeeper client object will pick an arbitrary server * from the connectString and attempt to connect to it. If establishment of * the connection fails, another server in the connect string will be tried * (the order is non-deterministic, as we random shuffle the list), until a * connection is established. The client will continue attempts until the * session is explicitly closed.