1. 程式人生 > >zookeeper的永久監聽

zookeeper的永久監聽

回撥基礎知識

znode 可以被監控,包括這個目錄節點中儲存的資料的修改,子節點目錄的變化等,一旦變化可以通知設定監控的客戶端,這個功能是zookeeper對於應用最重要的特性,通過這個特性可以實現的功能包括配置的集中管理,叢集管理,分散式鎖等等

 // 監控所有被觸發的事件
 public void process(WatchedEvent event) {
     //dosomething
          }

 });

將APP1的所有配置配置到/APP1 znode下,APP1所有機器一啟動就對/APP1這個節點進行監控(zk.exist(“/APP1”,true)),並且實現回撥方法Watcher,那麼在zookeeper上/APP1 znode節點下資料發生變化的時候,每個機器都會收到通知,Watcher方法將會被執行,那麼應用再取下資料即可(zk.getData(“/APP1”,false,null));

下面表格列出了寫操作與ZK內部產生的事件的對應關係:

路徑 event For “/path” event For “/path/child”
create(“/path”) EventType.NodeCreated NA
delete(“/path”) EventType.NodeDeleted NA
setData(“/path”) EventType.NodeDataChanged NA
create(“/path/child”) EventType.NodeChildrenChanged EventType.NodeCreated
delete(“/path/child”) EventType.NodeChildrenChanged EventType.NodeDeleted
setData(“/path/child”) NA EventType.NodeDataChanged

而ZK內部的寫事件與所觸發的watcher的對應關係如下:
這裡寫圖片描述

綜合上面兩個表,我們可以總結出各種寫操作可以觸發哪些watcher,如下表所示:
這裡寫圖片描述

如果發生session close、authFail和invalid,那麼所有型別的wather都會被觸發

zkClient除了做了一些便捷包裝之外,對watcher使用做了一點增強。比如subscribeChildChanges實際上是通過exists和getChildren關注了兩個事件。這樣當create(“/path”)時,對應path上通過getChildren註冊的listener也會被呼叫。另外subscribeDataChanges實際上只是通過exists註冊了事件。因為從上表可以看到,對於一個更新,通過exists和getData註冊的watcher要麼都會觸發,要麼都不會觸發。

getData,getChildren(),exists()這三個方法可以針對引數中的path設定watcher,當path對應的Node 有相應變化時,server端會給對應的設定了watcher的client 傳送一個一次性的觸發通知事件。客戶端在收到這個觸發通知事件後,可以根據自己的業務邏輯進行相應地處理。

注意這個watcher的功能是一次性的,如果還想繼續得到watcher通知,在處理完事件後,要重新register。

watcher回撥機制

// Watcher例項

Watcher wh = new Watcher() {

public void process(WatchedEvent event) {

System.out.println("回撥watcher例項: 路徑" + event.getPath() + " 型別:"

+ event.getType());

}

};



ZooKeeper zk = new ZooKeeper("10.15.82.166:3351", 500000, wh);

System.out.println("---------------------");

// 建立一個節點root,資料是mydata,不進行ACL許可權控制,節點為永久性的(即客戶端shutdown了也不會消失)

zk.exists("/root", true);

zk.create("/root", "mydata".getBytes(), Ids.OPEN_ACL_UNSAFE,

CreateMode.PERSISTENT);

System.out.println("---------------------");

// 在root下面建立一個childone znode,資料為childone,不進行ACL許可權控制,節點為永久性的

zk.exists("/root/childone", true);

zk.create("/root/childone", "childone".getBytes(), Ids.OPEN_ACL_UNSAFE,

CreateMode.PERSISTENT);

System.out.println("---------------------");

// 刪除/root/childone這個節點,第二個引數為版本,-1的話直接刪除,無視版本

zk.exists("/root/childone", true);

zk.delete("/root/childone", -1);

System.out.println("---------------------");

zk.exists("/root", true);

zk.delete("/root", -1);

System.out.println("---------------------");

// 關閉session

zk.close();

上面程式碼執行結果演示:

---------------------

回撥watcher例項: 路徑null 型別:None

回撥watcher例項: 路徑/root 型別:NodeCreated

---------------------

回撥watcher例項: 路徑/root/childone 型別:NodeCreated

---------------------

回撥watcher例項: 路徑/root/childone 型別:NodeDeleted

---------------------

回撥watcher例項: 路徑/root 型別:NodeDeleted

---------------------

永久回撥

3類事件觸發wather後就不再作用,也就是所謂的(一次作用),但是,如何永久監聽呢?這需要我們再程式邏輯上進行控制,網上有更好的辦法,但是,在簡單的應用中,可以再wather方法裡面再設定監聽,這個方法很笨,但是,很有效,達到了預期的效果

Watcher wh = new Watcher() {
        public void process(WatchedEvent event) {
            Log.AC_DEBUG("觸發回撥watcher例項: 路徑" + event.getPath() + " 型別:"
                    + event.getType());

            if (event.getType() == EventType.None) {
                try { //
                        // 判斷userauth許可權是否能訪問userpath
                    String auth_type = "digest";
                    zk.addAuthInfo(auth_type, userauth.getBytes());
                    zk.getData(userpath, null, null);
                } catch (Exception e) {
//                    e.printStackTrace();
                    Log.AC_ERROR("get node faild:userpath=" + userpath
                            + ",auth=" + userauth + " e:" + e.getMessage());
                    return;
                }
                Log.AC_INFO("userpath=" + userpath + " userauth=" + userauth);
            }

            try {

                switchinfo = getallswitch(); // 更新userpath和匿名使用者路徑下的配置資訊,監聽這些節點

                // 監聽使用者路徑節點
                Log.AC_DEBUG("lesson user=" + userpath + " node...");
                zk.exists(userpath, true); // 監聽匿名使用者路徑節點
                Log.AC_DEBUG("lesson user=" + AnonymousUSERpath + " node...");
                zk.exists(AnonymousUSERpath, true);

                // 監聽使用者路徑下的開關節點
                if (zk.exists(userpath, false) != null) {
                    Log.AC_DEBUG("lesson user=" + userpath
                            + " 's swich node...");
                    List<String> swnodes = zk.getChildren(userpath, true); //
                    // 監聽switch層節點的變化
                    Iterator<String> it_sw = swnodes.iterator();
                    while (it_sw.hasNext()) {
                        String swpath = userpath + "/" + it_sw.next();
                        Log.AC_DEBUG("lesson user=" + swpath + " node...");
                        zk.exists(swpath, true);
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
                Log.AC_ERROR("lesson znode error:" + e.getMessage());
            }
        }
    };