1. 程式人生 > >zookeeper如何永久監聽

zookeeper如何永久監聽

一 回撥基礎知識  

znode 可以被監控,包括這個目錄節點中儲存的資料的修改,子節點目錄的變化等,一旦變化可以通知設定監控的客戶端,這個功能是zookeeper對於應用最重要的特性,通過這個特性可以實現的功能包括配置的集中管理,叢集管理,分散式鎖等等。

//建立一個Zookeeper例項,第一個引數為目標伺服器地址和埠,第二個引數為Session超時時間,第三個為節點變化時的回撥方法
ZooKeeper zk = new ZooKeeper("127.0.0.1:2181", 500000,new Watcher() {
          // 監控所有被觸發的事件
           public void process(WatchedEvent event) {
     //dosomething
          }

     });

將APP1的所有配置配置到/APP1 znode下,APP1所有機器一啟動就對/APP1這個節點進行監控(zk.exist("/APP1",true)),並且實現回撥方法Watcher,那麼在zookeeper上/APP1 znode節點下資料發生變化的時候,每個機器都會收到通知,Watcher方法將會被執行,那麼應用再取下資料即可(zk.getData("/APP1",false,null));

下面表格列出了寫操作與ZK內部產生的事件的對應關係:

event For /path

event For /path/child

create(/path)

EventType.NodeCreated

NA

delete(/path)

EventType.NodeDeleted

NA

setData(/path)

EventType.NodeDataChanged

NA

create(/path/child)

EventType.NodeChildrenChanged

EventType.NodeCreated

delete(/path/child)

EventType.NodeChildrenChanged

EventType.NodeDeleted

setData(/path/child)

NA

EventType.NodeDataChanged

ZK內部的寫事件與所觸發的watcher的對應關係如下:

event For /path

defaultWatcher

exists
(/path)

getData
(/path)

getChildren
(/path)

EventType.None

EventType.NodeCreated

EventType.NodeDeleted

(不正常)

EventType.NodeDataChanged

EventType.NodeChildrenChanged

綜合上面兩個表,我們可以總結出各種寫操作可以觸發哪些watcher,如下表所示:

/path

/path/child

exists

getData

getChildren

exists

getData

getChildren

create(/path)

delete(/path)

setData(/path)

create(/path/child)

delete(/path/child)

setData(/path/child)

如果發生session closeauthFailinvalid,那麼所有型別的wather都會被觸發

zkClient除了做了一些便捷包裝之外,對watcher使用做了一點增強。比如subscribeChildChanges實際上是通過existsgetChildren關注了兩個事件。這樣當create(/path)時,對應path上通過getChildren註冊的listener也會被呼叫。另外subscribeDataChanges實際上只是通過exists註冊了事件。因為從上表可以看到,對於一個更新,通過existsgetData註冊的watcher要麼都會觸發,要麼都不會觸發。

getData,getChildren(),exists()這三個方法可以針對引數中的path設定watcher,當path對應的Node 有相應變化時,server端會給對應的設定了watcherclient 傳送一個一次性的觸發通知事件。客戶端在收到這個觸發通知事件後,可以根據自己的業務邏輯進行相應地處理。

注意這個watcher的功能是一次性的,如果還想繼續得到watcher通知,在處理完事件後,要重新register

 二 測試watcher回撥機制

// Watcher例項

Watcher wh = new Watcher() {

public void process(WatchedEvent event) {

    System.out.println("回撥watcher例項: 路徑" + event.getPath() + " 型別:"

    + event.getType());

    }

};

 

ZooKeeper zk = new ZooKeeper("10.15.82.166:3351", 500000, wh);

System.out.println("---------------------");

// 建立一個節點root,資料是mydata,不進行ACL許可權控制,節點為永久性的(即客戶端shutdown了也不會消失)

zk.exists("/root", true);

zk.create("/root", "mydata".getBytes(), Ids.OPEN_ACL_UNSAFE,

CreateMode.PERSISTENT);

System.out.println("---------------------");

// 在root下面建立一個childone znode,資料為childone,不進行ACL許可權控制,節點為永久性的

zk.exists("/root/childone", true);

zk.create("/root/childone", "childone".getBytes(), Ids.OPEN_ACL_UNSAFE,

CreateMode.PERSISTENT);

System.out.println("---------------------");

// 刪除/root/childone這個節點,第二個引數為版本,-1的話直接刪除,無視版本

zk.exists("/root/childone", true);

zk.delete("/root/childone", -1);

System.out.println("---------------------");

zk.exists("/root", true);

zk.delete("/root", -1);

System.out.println("---------------------");

// 關閉session

zk.close();

---------------------

回撥watcher例項: 路徑null 型別:None

回撥watcher例項: 路徑/root 型別:NodeCreated

---------------------

回撥watcher例項: 路徑/root/childone 型別:NodeCreated

---------------------

回撥watcher例項: 路徑/root/childone 型別:NodeDeleted

---------------------

回撥watcher例項: 路徑/root 型別:NodeDeleted

---------------------


三 永久回撥 

3類事件觸發wather後就不再作用,也就是所謂的(一次作用),但是,如何永久監聽呢?這需要我們再程式邏輯上進行控制,網上有更好的辦法,但是,在簡單的應用中,可以再wather方法裡面再設定監聽,這個方法很笨,但是,很有效,達到了預期的效果。

Watcher wh = new Watcher() {
        public void process(WatchedEvent event) {
            Log.AC_DEBUG("觸發回撥watcher例項: 路徑" + event.getPath() + " 型別:"
                    + event.getType());

            if (event.getType() == EventType.None) {
                try { //
                        // 判斷userauth許可權是否能訪問userpath
                    String auth_type = "digest";
                    zk.addAuthInfo(auth_type, userauth.getBytes());
                    zk.getData(userpath, null, null);
                } catch (Exception e) {
//                    e.printStackTrace();
                    Log.AC_ERROR("get node faild:userpath=" + userpath
                            + ",auth=" + userauth + " e:" + e.getMessage());
                    return;
                }
                Log.AC_INFO("userpath=" + userpath + " userauth=" + userauth);
            }

            try {

                switchinfo = getallswitch(); // 更新userpath和匿名使用者路徑下的配置資訊,監聽這些節點

                // 監聽使用者路徑節點
                Log.AC_DEBUG("lesson user=" + userpath + " node...");
                zk.exists(userpath, true); // 監聽匿名使用者路徑節點
                Log.AC_DEBUG("lesson user=" + AnonymousUSERpath + " node...");
                zk.exists(AnonymousUSERpath, true);

                // 監聽使用者路徑下的開關節點
                if (zk.exists(userpath, false) != null) {
                    Log.AC_DEBUG("lesson user=" + userpath
                            + " 's swich node...");
                    List<String> swnodes = zk.getChildren(userpath, true); //
                    // 監聽switch層節點的變化
                    Iterator<String> it_sw = swnodes.iterator();
                    while (it_sw.hasNext()) {
                        String swpath = userpath + "/" + it_sw.next();
                        Log.AC_DEBUG("lesson user=" + swpath + " node...");
                        zk.exists(swpath, true);
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
                Log.AC_ERROR("lesson znode error:" + e.getMessage());
            }
        }
    };