1. 程式人生 > >ZK自我複習之Watcher核心機制

ZK自我複習之Watcher核心機制

1.ZK有個watch事件,是一次性觸發的,當watch監視的資料發生變化時,通知設定了該watch的client,即Watcher。同樣,Watcher是監聽資料發生了某些變化,那麼就一定會有對應的事件型別和狀態型別。

  • 事件型別:(znode節點相關的)
    EventType.NodeCreated
    EventType.NodeDataChanged
    EventType.NodeChildrenChanged
    EventType.NodeDeleted
  • 狀態型別:(和客戶端例項相關的)
    KeeperState.Disconnected
    KeeperState.SyncConnected
    KeeperState.AuthFailed
    KeeperState.Expired

這個東西說起來太麻煩,我用程式碼的方式來簡單說明下大致的機制。(首先,你得有ZK原生API的瞭解,否則程式碼估計不太能看懂。不會的可以先百度瞭解下,原生的API不太好用,一般都是用curator。)

程式碼案例:(jar包用的zookeeper-3.4.5.jar)

import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.zookeeper.CreateMode;
import
org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.EventType; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; public
class ZkWatcher implements Watcher { AtomicInteger seq = new AtomicInteger(); static final int SESSION_TIMEOUT = 10000; // 10秒 static final String ZK_SERVER_ADDR = "ZK伺服器的IP:ZK埠"; // ZK伺服器,我就選擇一臺連線了。 /** zk測試watcher概念父路徑設定 */ private static final String PARENT_PATH = "/testWatch"; /** zk測試watcher概念子路徑設定 */ private static final String CHILDREN_PATH = "/testWatch/children"; /** 主執行緒進入標識 */ private static final String LOG_PREFIX_OF_MAIN = "【Main】"; /** zk變數 */ private ZooKeeper zk = null; /** 訊號量設定,用於等待zookeeper連線建立之後 通知阻塞程式繼續向下執行 */ private CountDownLatch connectedSemaphore = new CountDownLatch(1); /** * 建立ZK連線 * * @param zkServerAddr * ZK伺服器連線地址 * @param sessionTimeOut * ZK超時時間 */ public void createConnection(String zkServerAddr, int sessionTimeOut) { releaseConnection(); // 釋放ZK連線 try { // 這個this應該能看懂吧。 this指這個類,而這個類實現了Wathcher介面,重寫了 process方法 zk = new ZooKeeper(zkServerAddr, sessionTimeOut, this); System.out.println(LOG_PREFIX_OF_MAIN + "開始連線ZK伺服器"); connectedSemaphore.await(); // 阻塞等待process方法連線上伺服器 } catch (Exception e) { e.printStackTrace(); } } /** * 關閉ZK連線 */ public void releaseConnection() { if (zk != null) { try { zk.close(); } catch (InterruptedException e) { e.printStackTrace(); } } } /** * 重寫Watcher監聽 */ @Override public void process(WatchedEvent event) { System.out.println("進入 process方法,event = " + event); if (event == null) { return; } // 獲取連線狀態 KeeperState keeperState = event.getState(); // 獲取事件型別 EventType eventType = event.getType(); // 監聽的受影響的path String path = event.getPath(); String logPrefix = "【Watcher-" + this.seq.incrementAndGet() + "】"; System.out.println(logPrefix + "收到Watcher通知"); System.out.println(logPrefix + "連線狀態:\t" + keeperState.toString()); System.out.println(logPrefix + "事件型別:\t" + eventType.toString()); if (KeeperState.SyncConnected == keeperState) { if (EventType.None == eventType) { // 成功連線上ZK伺服器 System.out.println(logPrefix + "成功連線上ZK伺服器"); connectedSemaphore.countDown(); // 放行,繼續執行主執行緒程式碼 } else if (EventType.NodeCreated == eventType) { // 建立節點事件 System.out.println(logPrefix + "節點建立。節點path:" + path); // 判斷節點是否存在 Stat stat = this.exists(path, true); System.out.println(logPrefix + "節點建立狀態:" + stat.toString()); } } } /** * 判斷指定節點是否存在 * * @param path * 節點路徑 */ public Stat exists(String path, boolean needWatch) { try { return this.zk.exists(path, needWatch); } catch (Exception e) { e.printStackTrace(); return null; } } /** * 建立節點 * * @param path * 節點路徑 * @param data * 節點資料 * @return */ public boolean createPath(String path, String data) { try { exists(path, true); System.out.println(LOG_PREFIX_OF_MAIN + "節點建立成功, Path: " + this.zk.create( /** 路徑 */ path, /** 資料 */ data.getBytes(), /** 所有可見 */ Ids.OPEN_ACL_UNSAFE, /** 永久儲存 */ CreateMode.PERSISTENT) + ", content: " + data); } catch (Exception e) { e.printStackTrace(); return false; } return true; } /** * 刪除所有節點 */ public void deleteAllTestPath() { if (this.exists(CHILDREN_PATH, false) != null) { this.deleteNode(CHILDREN_PATH); } if (this.exists(PARENT_PATH, false) != null) { this.deleteNode(PARENT_PATH); } } /** * 刪除指定節點 * * @param path * 節點path */ public void deleteNode(String path) { try { this.zk.delete(path, -1); System.out.println(LOG_PREFIX_OF_MAIN + "刪除節點成功,path:" + path); } catch (Exception e) { e.printStackTrace(); } } /** * 獲取子節點 * * @param path * 節點路徑 */ private List<String> getChildren(String path, boolean needWatch) { try { return this.zk.getChildren(path, needWatch); } catch (Exception e) { e.printStackTrace(); return null; } } /** * 更新指定節點資料內容 * * @param path * 節點路徑 * @param data * 資料內容 * @return */ public boolean writeData(String path, String data) { try { System.out.println(LOG_PREFIX_OF_MAIN + "更新資料成功,path:" + path + ", stat: " + this.zk.setData(path, data.getBytes(), -1)); } catch (Exception e) { e.printStackTrace(); return false } return true } /** * 讀取指定節點資料內容 * * @param path * 節點路徑 * @return */ public String readData(String path, boolean needWatch) { try { return new String(this.zk.getData(path, needWatch, null)); } catch (Exception e) { e.printStackTrace(); return ""; } } }

接下來通過寫一個main方法來單獨測試Watcher機制.
1)第一步,就是ZK建立連線。 這個沒什麼好說的,會執行process方法

public static void main(String[] args) {
        ZkWatcher zkWatcher=new ZkWatcher();
        zkWatcher.createConnection(ZK_SERVER_ADDR, SESSION_TIMEOUT);
        System.out.println(LOG_PREFIX_OF_MAIN+"繼續向下執行。");
    }

這裡寫圖片描述
從上面執行結果可以看出,連線成功的狀態是SyncConnected,而建立連線是沒任何的操作的,所以事件型別是None。表示連線了ZK

2)第二步,模擬建立節點。(需求,想建立節點的時候服務端接收到建立的資訊,然後通知客戶端做相關操作)
備註:要牢牢記住watch事件是一次性觸發的,也就是觸發過一次後這個監聽關係就結束了。想再監聽必須再發起
2.1)先看第一種情況,也就是直接建立一個節點的情況。

這裡寫圖片描述

public static void main(String[] args) {
        ZkWatcher zkWatcher=new ZkWatcher();
        zkWatcher.createConnection(ZK_SERVER_ADDR, SESSION_TIMEOUT);
        System.out.println(LOG_PREFIX_OF_MAIN+"繼續向下執行。");
        boolean createNode=zkWatcher.createPath(PARENT_PATH,System.currentTimeMillis() + "");
        if(createNode){
            System.out.println(LOG_PREFIX_OF_MAIN+"建立節點成功");
        }else{
            System.out.println(LOG_PREFIX_OF_MAIN+"建立節點失敗");
        }
    }

這裡寫圖片描述
從上面結果可以看到,client端並不知道建立節點的事件。

2.2)第二種情況,也就是給path這個節點加個watch,設定為true,表示採用watch監聽。(把上面createPath方法中註釋的那個程式碼開啟),執行結果如下:
這裡寫圖片描述
從上圖結果可以發現,process方法執行了二次,一次是連線ZK的時候。 而第二次從事件型別是NodeCreated可以看出,是觸發了節點建立,而觸發的節點就是當前監聽的path。

3)第三步,模擬修改節點(其實明白了第二步,其它基本都一樣)

public static void main(String[] args) {
        ZkWatcher zkWatcher=new ZkWatcher();
        zkWatcher.createConnection(ZK_SERVER_ADDR, SESSION_TIMEOUT);
        System.out.println(LOG_PREFIX_OF_MAIN+"繼續向下執行。");
        Stat stat=zkWatcher.exists(PARENT_PATH, false);
        if(stat==null){
           if(zkWatcher.createPath(PARENT_PATH,System.currentTimeMillis() + "")){
               System.out.println(LOG_PREFIX_OF_MAIN+"新建立節點成功");
           }
        }else{
            System.out.println(LOG_PREFIX_OF_MAIN+"節點已存在,不再建立");
        }
        if(zkWatcher.writeData(PARENT_PATH, System.currentTimeMillis() + "")){
            System.out.println(LOG_PREFIX_OF_MAIN+"節點資料修改成功");
        }else{
            System.out.println(LOG_PREFIX_OF_MAIN+"節點資料修改失敗");
        }

    }

這裡寫圖片描述
從上面結果可以看出,修改節點沒有被監聽。因為沒有對修改的節點進行監聽。 所以可以對節點進行監聽,比如我用讀的方式先加個監聽,再修改就可以了。

public static void main(String[] args) {
        ZkWatcher zkWatcher=new ZkWatcher();
        zkWatcher.createConnection(ZK_SERVER_ADDR, SESSION_TIMEOUT);
        System.out.println(LOG_PREFIX_OF_MAIN+"繼續向下執行。");
        Stat stat=zkWatcher.exists(PARENT_PATH, false);
        if(stat==null){
           if(zkWatcher.createPath(PARENT_PATH,System.currentTimeMillis() + "")){
               System.out.println(LOG_PREFIX_OF_MAIN+"新建立節點成功");
           }
        }else{
            System.out.println(LOG_PREFIX_OF_MAIN+"節點已存在,不再建立");
        }
        zkWatcher.readData(PARENT_PATH, true);  //或者zkWatcher.exists(PARENT_PATH, true);
        if(zkWatcher.writeData(PARENT_PATH, System.currentTimeMillis() + "")){
            System.out.println(LOG_PREFIX_OF_MAIN+"節點資料修改成功");
        }else{
            System.out.println(LOG_PREFIX_OF_MAIN+"節點資料修改失敗");
        }

    }

這裡寫圖片描述
從上圖結果看一看出,事件型別是NodeDataChanged,也就是節點資料發生變更觸發了process方法。

4.)刪除節點(從上面的例子,應該也知道怎麼用了,就不多說了)

public static void main(String[] args) {
        ZkWatcher zkWatcher=new ZkWatcher();
        zkWatcher.createConnection(ZK_SERVER_ADDR, SESSION_TIMEOUT);
        System.out.println(LOG_PREFIX_OF_MAIN+"繼續向下執行。");
        Stat stat=zkWatcher.exists(PARENT_PATH, false);
        if(stat==null){
           if(zkWatcher.createPath(PARENT_PATH,System.currentTimeMillis() + "")){
               System.out.println(LOG_PREFIX_OF_MAIN+"新建立節點成功");
           }
        }else{
            System.out.println(LOG_PREFIX_OF_MAIN+"節點已存在,不再建立");
        }
        /*zkWatcher.readData(PARENT_PATH, true);  //或者zkWatcher.exists(PARENT_PATH, true);
        if(zkWatcher.writeData(PARENT_PATH, System.currentTimeMillis() + "")){
            System.out.println(LOG_PREFIX_OF_MAIN+"節點資料修改成功");
        }else{
            System.out.println(LOG_PREFIX_OF_MAIN+"節點資料修改失敗");
        }*/
        zkWatcher.exists(PARENT_PATH, true);
        zkWatcher.deleteNode(PARENT_PATH);

    }

這裡寫圖片描述

當然還有建立子節點什麼的,上面程式碼也封裝了,原理都一樣。自行解讀,不做過多解釋了。

一些知識點的補充:
1)watcher的特性:一次性、客戶端序列執行、輕量。

  1. 一次性:對於ZK的watcher,你只需要記住一點:ZK有watch事件,是一次性觸發的。當watch監聽的資料發生變化時,通知設定了該watch的client,即watcher,由於ZK的監控都是一次性的所以每次必須設定監控。
  2. 客戶端序列執行:客戶端Watcher回撥過程時一個串行同步的過程,這為我們保證了順序,同時需要開發人員注意一點,千萬不要因為一個Watcher的處理邏輯影響了整個客戶端Watcher的回撥。
  3. 輕量:WatchedEvent是ZK整個Watcher通知機制的最小通知單元,整個結構只包含三個部分:通知狀態、事件型別和節點路徑。也就是說Watcher通知非常簡單,只會告訴客戶端發生了事件而不會告知其具體內容,需要客戶自己去進行獲取,比如NodeDataChanged事件。