基於Curator操作ZooKeeper(二)-Watcher操作-補充TreeCache
轉自:https://blog.csdn.net/Leafage_M/article/details/78735485#treecache
Java原生API操作ZooKeeper可參看:
相關內容:
基於Curator操作ZooKeeper(二)-Watcher操作
TreeCache
TreeCache有點像上面兩種Cache的結合體,NodeCache能夠監聽自身節點的資料變化(或者是建立該節點),PathChildrenCache能夠監聽自身節點下的子節點的變化,而TreeCache既能夠監聽自身節點的變化、也能夠監聽子節點的變化。
TreeCache的話只有一種構造方法了:
TreeCache(CuratorFramework client, String path)//Create a TreeCache for the given client and path with default options.
與上面的PathChildrenCache不同的是,如果指定的節點路徑不存在的話,不會自動建立。但是也能夠監聽到一個INITIALIZED型別的事件。
TreeCache可以新增一個節點變化的監聽器,同樣的也可以新增一個異常的監聽器。
假如ZooKeeper伺服器中還是有如下的節點內容:
使用如下程式碼進行測試,就是當節點的內容發生變化時,輸出節點的變化型別和變化的節點路徑。
public class TreeCacheTest { private static final String zkAddress = "centos3"; private static final int sessionTimeout = 2000; private static String parentPath = "/Curator-Recipes1";//父節點 private static CuratorFramework client = CuratorFrameworkFactory.builder() .connectString(zkAddress) .sessionTimeoutMs(sessionTimeout) .retryPolicy(new ExponentialBackoffRetry(1000,3)) .build(); public static void main(String[] args) throws InterruptedException { client.start(); final TreeCache treeCache = new TreeCache(client, parentPath); try { treeCache.start(); } catch (Exception e) { e.printStackTrace(); } //新增錯誤監聽器 treeCache.getUnhandledErrorListenable().addListener(new UnhandledErrorListener() { public void unhandledError(String s, Throwable throwable) { System.out.println(".錯誤原因:" + throwable.getMessage() + "\n==============\n"); } }); //節點變化的監聽器 treeCache.getListenable().addListener(new TreeCacheListener() { public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent) throws Exception { System.out.println("treeCache ------ Type:" + treeCacheEvent.getType() + ","); System.out.println(treeCacheEvent.getData().getPath()); } }); Thread.sleep(Integer.MAX_VALUE); } }
執行時輸出的內容如下:
可以看到快取父節點和子節點的時候觸發了NODE_ADDED事件,同時也會觸發一個INITIALIZED事件,並且會在異常監聽器中發生回撥事件,異常的原因就是我們回撥函式因為INITIALIZED事件被觸發了,但是此時執行第條輸出語句的時候發現treeCacheEvent.getData()是null,而我們在null上呼叫了getPath方法,所以會觸發異常監聽。
注意:
- 無論如何都會收到一個INITIALIZED事件的。
- 無論是TreeCache、PathChildrenCache,所謂的監聽都是本地檢視和ZooKeeper伺服器進行對比。所以如果ZooKeeper節點不為空的話,才會在快取開始的時候監聽到NODE_ADDED事件,這是因為剛開始本地快取並沒有內容,然後本地快取和伺服器快取進行對比,發現ZooKeeper伺服器有節點而本地快取沒有,這才將伺服器的節點快取到本地,所以才會觸發NODE_ADDED事件。
示例
來一個綜合性的示例,該示例中先使用init進行初始化操作,會建立一個父節點“/Curator-Recipes”以及父節點下的一個子節點“c1”。然後分別使用TreeCache、PathChildrenCache、NodeCache三種快取方式進行快取。最後再建立c2、c3兩個節點並進行修改和刪除觀察監聽器的輸出內容。
package com.leafage.zk;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.UnhandledErrorListener;
import org.apache.curator.framework.recipes.cache.*;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
/**
* @Author Leafage
* @Date 2017/12/6 14:03
**/
public class CuratorRecipes {
private static int num = 1;
private static final String zkAddress = "centos3";
private static final int sessionTimeout = 2000;
private static String parentPath = "/Curator-Recipes";//父節點
private static CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString(zkAddress)
.sessionTimeoutMs(sessionTimeout)
.retryPolicy(new ExponentialBackoffRetry(1000,3))
.build();
public static void main(String[] args) throws InterruptedException {
init();
treeCache();
pathChildrenCache();
nodeCache();
testData();
Thread.sleep(Integer.MAX_VALUE);
}
/**
* 初始化操作,建立父節點
*/
public static void init() {
client.start();
try {
//可以用用來確保父節點存在,2.9之後棄用
// EnsurePath ensurePath = new EnsurePath(parentPath);
// ensurePath.ensure(client.getZookeeperClient());
if (client.checkExists().forPath(parentPath) == null) {
client.create().creatingParentContainersIfNeeded().withMode(CreateMode.PERSISTENT).forPath(parentPath, "This is Parent Data!".getBytes());
}
client.create().withMode(CreateMode.EPHEMERAL).forPath(parentPath + "/c1","This is C1.".getBytes());//建立第一個節點
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 監聽子節點變化
*/
public static void pathChildrenCache() {
final PathChildrenCache pathChildrenCache = new PathChildrenCache(client, parentPath, true);
try {
pathChildrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);//啟動模式
} catch (Exception e) {
e.printStackTrace();
}
//新增監聽
pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
System.out.println( num++ + ".pathChildrenCache------發生的節點變化型別為:" + pathChildrenCacheEvent.getType() + ",發生變化的節點內容為:" + new String(pathChildrenCacheEvent.getData().getData()) + "\n======================\n");
}
});
}
/**
* 監聽節點資料變化
*/
public static void nodeCache() {
final NodeCache nodeCache = new NodeCache(client, parentPath, false);
try {
nodeCache.start(true);//true代表快取當前節點
} catch (Exception e) {
e.printStackTrace();
}
if (nodeCache.getCurrentData() != null) {//只有start中的設定為true才能夠直接得到
System.out.println( num++ + ".nodeCache-------CurrentNode Data is:" + new String(nodeCache.getCurrentData().getData()) + "\n===========================\n");//輸出當前節點的內容
}
//新增節點資料監聽
nodeCache.getListenable().addListener(new NodeCacheListener() {
public void nodeChanged() throws Exception {
System.out.println( num++ + ".nodeCache------節點資料發生了改變,發生的路徑為:" + nodeCache.getCurrentData().getPath() + ",節點資料發生了改變 ,新的資料為:" + new String(nodeCache.getCurrentData().getData()) + "\n===========================\n");
}
});
}
/**
* 同時監聽資料變化和子節點變化
*/
public static void treeCache() {
final TreeCache treeCache = new TreeCache(client, parentPath);
try {
treeCache.start();
} catch (Exception e) {
e.printStackTrace();
}
//新增錯誤
treeCache.getUnhandledErrorListenable().addListener(new UnhandledErrorListener() {
public void unhandledError(String s, Throwable throwable) {
System.out.println(num++ + ".錯誤原因:" + throwable.getMessage() + "\n==============\n");
}
});
treeCache.getListenable().addListener(new TreeCacheListener() {
public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent) throws Exception {
System.out.println( num++ + ".treeCache------當前發生的變化型別為:" + treeCacheEvent.getType() + ",發生變化的節點內容為:" + new String(treeCacheEvent.getData().getData()) + "\n=====================\n");
}
});
}
/**
* 建立節點、修改資料、刪除節點等操作,用來給其他的監聽器測試使用
*/
public static void testData() {
try {
client.create().withMode(CreateMode.EPHEMERAL).forPath(parentPath + "/c2","This is C2.".getBytes());//建立第一個節點
client.create().withMode(CreateMode.EPHEMERAL).forPath(parentPath + "/c3","This is C3.".getBytes());//建立第一個節點
client.setData().forPath(parentPath + "/c2", "This is New C2.".getBytes());//修改節點資料
client.delete().forPath(parentPath + "/c3");//刪除一個節點
client.delete().deletingChildrenIfNeeded().forPath(parentPath);//將父節點下所有內容刪除
} catch (Exception e) {
e.printStackTrace();
}
}
}
輸出內容(每次執行結果輸出順序可能不一致):
1.treeCache------當前發生的變化型別為:NODE_ADDED,發生變化的節點內容為:This is Parent Data!
=====================
2.treeCache------當前發生的變化型別為:NODE_ADDED,發生變化的節點內容為:This is C1.
=====================
4.錯誤原因:null
==============
5.nodeCache-------CurrentNode Data is:This is Parent Data!
===========================
6.pathChildrenCache------發生的節點變化型別為:CHILD_ADDED,發生變化的節點內容為:This is C1.
======================
8.nodeCache------節點資料發生了改變,發生的路徑為:/Curator-Recipes,節點資料發生了改變 ,新的資料為:This is Parent Data!
===========================
9.treeCache------當前發生的變化型別為:NODE_ADDED,發生變化的節點內容為:This is C2.
=====================
10.treeCache------當前發生的變化型別為:NODE_ADDED,發生變化的節點內容為:This is C3.
=====================
11.pathChildrenCache------發生的節點變化型別為:CHILD_ADDED,發生變化的節點內容為:This is C3.
======================
12.pathChildrenCache------發生的節點變化型別為:CHILD_ADDED,發生變化的節點內容為:This is C2.
======================
13.treeCache------當前發生的變化型別為:NODE_UPDATED,發生變化的節點內容為:This is New C2.
=====================
14.pathChildrenCache------發生的節點變化型別為:CHILD_UPDATED,發生變化的節點內容為:This is New C2.
======================
15.pathChildrenCache------發生的節點變化型別為:CHILD_REMOVED,發生變化的節點內容為:This is C3.
======================
16.treeCache------當前發生的變化型別為:NODE_REMOVED,發生變化的節點內容為:This is C3.
=====================
17.treeCache------當前發生的變化型別為:NODE_REMOVED,發生變化的節點內容為:This is C1.
=====================
18.pathChildrenCache------發生的節點變化型別為:CHILD_REMOVED,發生變化的節點內容為:This is C1.
======================
19.pathChildrenCache------發生的節點變化型別為:CHILD_REMOVED,發生變化的節點內容為:This is New C2.
======================
20.treeCache------當前發生的變化型別為:NODE_REMOVED,發生變化的節點內容為:This is New C2.
=====================
21.treeCache------當前發生的變化型別為:NODE_REMOVED,發生變化的節點內容為:This is Parent Data!
=====================
從輸出內容可以看到:
1、2 :代表是TreeCache監聽到了父節點和c1節點的建立快取事件。
3、4 :同時會發現並沒有3這條語句,而是直接跳到了4,這是因為接收到的事件為:INITIALIZED,所以使用getData會得到null,而我們試圖在null上呼叫getPath,所以才會觸發異常。
5:NodeCache快取進行start的時候傳入true引數,所以能夠直接得到當前節點的內容。
6: PathChildrenCache快取成功c1的時候接收到的事件。
7:會發現沒有7,因為PathChildrenCache的啟動模式是:INITIALIZED,此時也是試圖在null上呼叫GetPath,但是PathChildrenCache沒有提供異常監聽器,所以沒辦法獲取。
8:第八點最讓人疑惑了,因為上面的程式碼中並沒有對父節點的資料進行改變,但是卻監聽到了這個事件,做了很多的測試發現,觸發這個事件的原因為後面的testData方法中呼叫create導致的,並且只會監聽到一次,這一點的具體原因還不太清楚。
9、10、11、12:建立c2、c3節點是TreeCache和PathChildrenCache監聽到的事件。
13、14:修改c2節點資料,TreeCache和PathChildrenCache監聽到的事件。
15、16、17、18、19、20:刪除c2、c1、c3節點時,TreeCache和PathChildrenCache監聽到的事件。
21:刪除根節點時接收到的監聽事件,此時只有TreeCache能夠監聽到。