zookeeper的永久監聽
回撥基礎知識
znode 可以被監控,包括這個目錄節點中儲存的資料的修改,子節點目錄的變化等,一旦變化可以通知設定監控的客戶端,這個功能是zookeeper對於應用最重要的特性,通過這個特性可以實現的功能包括配置的集中管理,叢集管理,分散式鎖等等
// 監控所有被觸發的事件
public void process(WatchedEvent event) {
//dosomething
}
});
將APP1的所有配置配置到/APP1 znode下,APP1所有機器一啟動就對/APP1這個節點進行監控(zk.exist(“/APP1”,true)),並且實現回撥方法Watcher,那麼在zookeeper上/APP1 znode節點下資料發生變化的時候,每個機器都會收到通知,Watcher方法將會被執行,那麼應用再取下資料即可(zk.getData(“/APP1”,false,null));
下面表格列出了寫操作與ZK內部產生的事件的對應關係:
路徑 | event For “/path” | event For “/path/child” |
---|---|---|
create(“/path”) | EventType.NodeCreated | NA |
delete(“/path”) | EventType.NodeDeleted | NA |
setData(“/path”) | EventType.NodeDataChanged | NA |
create(“/path/child”) | EventType.NodeChildrenChanged | EventType.NodeCreated |
delete(“/path/child”) | EventType.NodeChildrenChanged | EventType.NodeDeleted |
setData(“/path/child”) | NA | EventType.NodeDataChanged |
而ZK內部的寫事件與所觸發的watcher的對應關係如下:
綜合上面兩個表,我們可以總結出各種寫操作可以觸發哪些watcher,如下表所示:
如果發生session close、authFail和invalid,那麼所有型別的wather都會被觸發
zkClient除了做了一些便捷包裝之外,對watcher使用做了一點增強。比如subscribeChildChanges實際上是通過exists和getChildren關注了兩個事件。這樣當create(“/path”)時,對應path上通過getChildren註冊的listener也會被呼叫。另外subscribeDataChanges實際上只是通過exists註冊了事件。因為從上表可以看到,對於一個更新,通過exists和getData註冊的watcher要麼都會觸發,要麼都不會觸發。
getData,getChildren(),exists()這三個方法可以針對引數中的path設定watcher,當path對應的Node 有相應變化時,server端會給對應的設定了watcher的client 傳送一個一次性的觸發通知事件。客戶端在收到這個觸發通知事件後,可以根據自己的業務邏輯進行相應地處理。
注意這個watcher的功能是一次性的,如果還想繼續得到watcher通知,在處理完事件後,要重新register。
watcher回撥機制
// Watcher例項
Watcher wh = new Watcher() {
public void process(WatchedEvent event) {
System.out.println("回撥watcher例項: 路徑" + event.getPath() + " 型別:"
+ event.getType());
}
};
ZooKeeper zk = new ZooKeeper("10.15.82.166:3351", 500000, wh);
System.out.println("---------------------");
// 建立一個節點root,資料是mydata,不進行ACL許可權控制,節點為永久性的(即客戶端shutdown了也不會消失)
zk.exists("/root", true);
zk.create("/root", "mydata".getBytes(), Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
System.out.println("---------------------");
// 在root下面建立一個childone znode,資料為childone,不進行ACL許可權控制,節點為永久性的
zk.exists("/root/childone", true);
zk.create("/root/childone", "childone".getBytes(), Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
System.out.println("---------------------");
// 刪除/root/childone這個節點,第二個引數為版本,-1的話直接刪除,無視版本
zk.exists("/root/childone", true);
zk.delete("/root/childone", -1);
System.out.println("---------------------");
zk.exists("/root", true);
zk.delete("/root", -1);
System.out.println("---------------------");
// 關閉session
zk.close();
上面程式碼執行結果演示:
---------------------
回撥watcher例項: 路徑null 型別:None
回撥watcher例項: 路徑/root 型別:NodeCreated
---------------------
回撥watcher例項: 路徑/root/childone 型別:NodeCreated
---------------------
回撥watcher例項: 路徑/root/childone 型別:NodeDeleted
---------------------
回撥watcher例項: 路徑/root 型別:NodeDeleted
---------------------
永久回撥
3類事件觸發wather後就不再作用,也就是所謂的(一次作用),但是,如何永久監聽呢?這需要我們再程式邏輯上進行控制,網上有更好的辦法,但是,在簡單的應用中,可以再wather方法裡面再設定監聽,這個方法很笨,但是,很有效,達到了預期的效果
Watcher wh = new Watcher() {
public void process(WatchedEvent event) {
Log.AC_DEBUG("觸發回撥watcher例項: 路徑" + event.getPath() + " 型別:"
+ event.getType());
if (event.getType() == EventType.None) {
try { //
// 判斷userauth許可權是否能訪問userpath
String auth_type = "digest";
zk.addAuthInfo(auth_type, userauth.getBytes());
zk.getData(userpath, null, null);
} catch (Exception e) {
// e.printStackTrace();
Log.AC_ERROR("get node faild:userpath=" + userpath
+ ",auth=" + userauth + " e:" + e.getMessage());
return;
}
Log.AC_INFO("userpath=" + userpath + " userauth=" + userauth);
}
try {
switchinfo = getallswitch(); // 更新userpath和匿名使用者路徑下的配置資訊,監聽這些節點
// 監聽使用者路徑節點
Log.AC_DEBUG("lesson user=" + userpath + " node...");
zk.exists(userpath, true); // 監聽匿名使用者路徑節點
Log.AC_DEBUG("lesson user=" + AnonymousUSERpath + " node...");
zk.exists(AnonymousUSERpath, true);
// 監聽使用者路徑下的開關節點
if (zk.exists(userpath, false) != null) {
Log.AC_DEBUG("lesson user=" + userpath
+ " 's swich node...");
List<String> swnodes = zk.getChildren(userpath, true); //
// 監聽switch層節點的變化
Iterator<String> it_sw = swnodes.iterator();
while (it_sw.hasNext()) {
String swpath = userpath + "/" + it_sw.next();
Log.AC_DEBUG("lesson user=" + swpath + " node...");
zk.exists(swpath, true);
}
}
} catch (Exception e) {
e.printStackTrace();
Log.AC_ERROR("lesson znode error:" + e.getMessage());
}
}
};