基於Curator操作ZooKeeper(二)-Watcher操作
Java原生API操作ZooKeeper可參看:
相關內容:
基於Curator操作ZooKeeper(二)-Watcher操作-補充TreeCache
usingWatcher()方法
在ZooKeeper Watcher監聽機制(資料變更的通知)(一)(應用)中介紹過,繫結事件只有三個操作:getData、exists、getChildren。
這個方法有兩個過載的方法,實現這兩個介面其實都差不多:
使用usingWatcher()方法監聽只會觸發一次,監聽完畢後就會銷燬。
import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.api.CuratorWatcher; import org.apache.curator.retry.ExponentialBackoffRetry; public class CuratorOperator { public CuratorFramework client = null; public static final String zkServerPath = "192.168.220.135:2181,192.168.220.136:2181,192.168.220.137:2181"; /** * 例項化zk客戶端 */ public CuratorOperator() { /** * 同步建立zk示例,原生api是非同步的 * * curator連結zookeeper的重試策略: * * 1>ExponentialBackoffRetry【推薦】 * baseSleepTimeMs:初始sleep時間(ms) * maxRetries:最大重試次數,超過時間就不連結了 * maxSleepMs:最大重試時間(ms) * * 給定一個初始sleep時間base5leep丁imeMs,在這個基礎上結合重試次數,通過以下公式計算出當前需要sleep的時間: 當前sleep時間=baseSleepTimeMs*Math.max(1, random.nextInt(1<<(retryCount+1))) 可以看出,隨著重試次數的增加,計算出的sleep時間會越來越大。如果該sleep時間在maxSleepMs的範圍之內,那麼就使用該sleep時間,否則使用maxSleepMs。另外, maxRetries引數控制了最大重試次數,以避免無限制的重試。 */ RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 5); /** * curator連結zookeeper的策略: * 2>RetryNTimes【推薦】 * n:重試的次數 * sleepMsBetweenRetries:每次重試間隔的時間(ms) */ // RetryPolicy retryPolicy = new RetryNTimes(3, 5000); /** * curator連結zookeeper的策略: * 3>RetryOneTime * sleepMsBetweenRetry:只重試一次,重試間隔的時間 */ // RetryPolicy retryPolicy2 = new RetryOneTime(3000); /** * 4> * 永遠重試,不推薦使用 */ // RetryPolicy retryPolicy3 = new RetryForever(retryIntervalMs) /** * curator連結zookeeper的策略: * 5>RetryUntilElapsed * maxElapsedTimeMs:最大重試時間 * sleepMsBetweenRetries:每次重試間隔 * 重試時間超過maxElapsedTimeMs後,就不再重試 */ // RetryPolicy retryPolicy4 = new RetryUntilElapsed(2000, 3000); //建立客戶端 client = CuratorFrameworkFactory.builder() //builder .connectString(zkServerPath) .sessionTimeoutMs(10000) //session超時時間 .retryPolicy(retryPolicy) //重試策略 //namespace: .namespace("testCRUD") .build(); /** * CuratorFrameworkFactory工廠在創建出一個客戶端CuratorFramework例項之後,實質上並沒有完成會話的建立,而是需要呼叫 CuratorFramework的sta rt)方法來完成會話的建立。 */ client.start(); } /** * * @Description: 關閉zk客戶端連線 */ public void closeZKClient() { if (client != null) { this.client.close(); } } public static void main(String[] args) throws Exception { // 例項化 CuratorOperator cto = new CuratorOperator(); boolean isZkCuratorStarted = cto.client.isStarted(); System.out.println("當前客戶的狀態:" + (isZkCuratorStarted ? "連線中" : "已關閉")); String nodePath = "/dongguabai/a"; //建立節點 /* byte[] data = "abcd".getBytes(); cto.client.create() .creatingParentContainersIfNeeded() //遞迴建立節點 .withMode(CreateMode.PERSISTENT) //節點模式 .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE) //ACL .forPath(nodePath,data); //不指定內容,則內容為空*/ //獲取節點 /* byte[] bytes = cto.client.getData().forPath(nodePath); System.out.println("第一次獲取節點資料為:"+new String(bytes)); Stat stat = new Stat(); byte[] bytes1 = cto.client.getData().storingStatIn(stat).forPath(nodePath); System.out.println("第二次獲取節點資料為:"+new String(bytes1)); System.out.println("獲取的Stat為:"+ JsonUtil.toJSON(stat)); */ cto.client.getData().usingWatcher((CuratorWatcher) event -> { System.out.println("觸發了watcher事件,節點路徑為:"+event.getPath()+",事件型別為:"+event.getType()); }).forPath(nodePath); //獲取子節點 /* List<String> list = cto.client.getChildren().forPath(nodePath); System.out.println("開始列印子節點:"); list.forEach(result-> System.out.println(result)); System.out.println("列印結束!");*/ //修改節點 /* Stat stat = cto.client.setData().forPath(nodePath,"new1".getBytes()); System.out.println("第一次獲取節點資料為:"+new String(cto.client.getData().forPath(nodePath))); Stat stat1 = cto.client.setData().withVersion(stat.getVersion()).forPath(nodePath, "new2".getBytes()); System.out.println("第二次獲取節點資料為:"+new String(cto.client.getData().forPath(nodePath)));*/ //刪除節點 /* Stat stat = new Stat(); byte[] bytes1 = cto.client.getData().storingStatIn(stat).forPath(nodePath); System.out.println("獲取節點資料為:"+new String(bytes1)); cto.client.delete() .guaranteed() //防止網路抖動,只要客戶端會話有效,那麼Curator 會在後臺持續進行刪除操作,直到節點刪除成功 .deletingChildrenIfNeeded() //如果有子節點會刪除,注意除非人為刪除namespace,否則namespace不會刪除 .withVersion(stat.getVersion()) .forPath(nodePath);*/ Thread.sleep(300000); cto.closeZKClient(); boolean isZkCuratorStarted2 = cto.client.isStarted(); System.out.println("當前客戶的狀態:" + (isZkCuratorStarted2 ? "連線中" : "已關閉")); } }
執行程式後,在客戶端執行:
可以看到控制檯輸出了,可以看出沒有輸出namespace:
但是再在客戶端執行相同的操作,控制檯沒有輸出,說明事件只會觸發一次。
ZooKeeper原生支援通過註冊Watcher來進行事件監聽,但是其使用並不是特別方便,需要開發人員自己反覆註冊Watcher,比較繁瑣。Curator引入了Cache來實現對ZooKeeper服務端事件的監聽。Cache是Curator中對事件監聽的包裝,其對事件的監聽其實可以近似看作是一個本地快取檢視和遠端ZooKeeper檢視的對比過程。同時Curator能夠自動為開發人員處理反覆註冊監聽,從而大大簡化了原生API開發的繁瑣過程。Cache分為兩類監聽型別:節點監聽和子節點監聽。
NodeCache的使用
NodeCache有兩個建構函式:
//構造NodeCache例項
NodeCache nodeCache = new NodeCache(cto.client,nodePath);
//建立Cache
//該方法有個boolean型別的引數,預設是false,如果設定為true,那麼NodeCache在第一次啟動的時候就會立刻從ZooKeeper上讀取對應節點的資料內容,並儲存在Cache中。一般在開發中我們會設定為true。
nodeCache.start();
通過程式碼測試看看:
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.CuratorWatcher;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.retry.ExponentialBackoffRetry;
public class CuratorOperator2 {
public CuratorFramework client = null;
public static final String zkServerPath = "192.168.220.135:2181,192.168.220.136:2181,192.168.220.137:2181";
public CuratorOperator2() {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 5);
client = CuratorFrameworkFactory.builder() //builder
.connectString(zkServerPath)
.sessionTimeoutMs(10000) //session超時時間
.retryPolicy(retryPolicy) //重試策略
//namespace:
.namespace("testCRUD")
.build();
client.start();
//client.start(true);
}
public void closeZKClient() {
if (client != null) {
this.client.close();
}
}
public static void main(String[] args) throws Exception {
// 例項化
CuratorOperator2 cto = new CuratorOperator2();
boolean isZkCuratorStarted = cto.client.isStarted();
System.out.println("當前客戶的狀態:" + (isZkCuratorStarted ? "連線中" : "已關閉"));
String nodePath = "/dongguabai/a";
cto.client.getData().usingWatcher((CuratorWatcher) event -> {
System.out.println("【使用usingWatcher】觸發了watcher事件,節點路徑為:"+event.getPath()+",事件型別為:"+event.getType());
}).forPath(nodePath);
//NodeCache:監聽資料節點的變更,會觸發事件
//構造NodeCache例項
NodeCache nodeCache = new NodeCache(cto.client,nodePath);
//建立Cache
//該方法有個boolean型別的引數,預設是false,如果設定為true,那麼NodeCache在第一次啟動的時候就會立刻從ZooKeeper上讀取對應節點的資料內容,並儲存在Cache中。
nodeCache.start();
if(nodeCache.getCurrentData()!=null){
System.out.println("節點初始化資料為:"+new String(nodeCache.getCurrentData().getData()));
}else {
System.out.println("節點資料為空!");
}
cto.closeZKClient();
boolean isZkCuratorStarted2 = cto.client.isStarted();
System.out.println("當前客戶的狀態:" + (isZkCuratorStarted2 ? "連線中" : "已關閉"));
}
}
執行結果:
測試事件觸發:
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.CuratorWatcher;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.retry.ExponentialBackoffRetry;
public class CuratorOperator2 {
public CuratorFramework client = null;
public static final String zkServerPath = "192.168.220.136:2181,192.168.220.137:2181";
public CuratorOperator2() {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 5);
client = CuratorFrameworkFactory.builder() //builder
.connectString(zkServerPath)
.sessionTimeoutMs(10000) //session超時時間
.retryPolicy(retryPolicy) //重試策略
//namespace:
.namespace("testCRUD")
.build();
client.start();
}
public void closeZKClient() {
if (client != null) {
this.client.close();
}
}
public static void main(String[] args) throws Exception {
// 例項化
CuratorOperator2 cto = new CuratorOperator2();
boolean isZkCuratorStarted = cto.client.isStarted();
System.out.println("當前客戶的狀態:" + (isZkCuratorStarted ? "連線中" : "已關閉"));
String nodePath = "/dongguabai/a";
cto.client.getData().usingWatcher((CuratorWatcher) event -> {
System.out.println("【使用usingWatcher】觸發了watcher事件,節點路徑為:"+event.getPath()+",事件型別為:"+event.getType());
}).forPath(nodePath);
//NodeCache:監聽資料節點的變更,會觸發事件
//構造NodeCache例項
NodeCache nodeCache = new NodeCache(cto.client,nodePath);
//建立Cache
//該方法有個boolean型別的引數,預設是false,如果設定為true,那麼NodeCache在第一次啟動的時候就會立刻從ZooKeeper上讀取對應節點的資料內容,並儲存在Cache中。
nodeCache.start(true);
if(nodeCache.getCurrentData()!=null){
System.out.println("節點初始化資料為:"+new String(nodeCache.getCurrentData().getData()));
}else {
System.out.println("節點資料為空!");
}
//新增事件(也有remove),還可以知道Excutor
nodeCache.getListenable().addListener(() -> {
String data = new String(nodeCache.getCurrentData().getData());
System.out.println("節點路徑:"+nodeCache.getCurrentData().getPath()+",節點資料為:"+data);
});
System.in.read();
cto.closeZKClient();
boolean isZkCuratorStarted2 = cto.client.isStarted();
System.out.println("當前客戶的狀態:" + (isZkCuratorStarted2 ? "連線中" : "已關閉"));
}
}
執行後在客戶端操作:
控制檯輸出:
如果在客戶端執行刪除操作:
控制檯輸出,空指標是因為節點被刪除了:
NodeCache不僅可以用於監聽資料節點的內容變更,也能監聽指定節點是否存在。如果原本節點不存在,那麼Cache就會在節點被建立後觸發NodeCacheListenera。但是,如果該資料節點被刪除,那麼Curator就無法觸發NodeCacheListener了(後面版本已經修復了這個問題,網上有的資料這麼說是有問題的)。
程式碼測試:
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import java.time.LocalDateTime;
public class CuratorOperator2 {
public CuratorFramework client = null;
public static final String zkServerPath = "192.168.220.136:2181,192.168.220.137:2181";
public CuratorOperator2() {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 5);
client = CuratorFrameworkFactory.builder() //builder
.connectString(zkServerPath)
.sessionTimeoutMs(10000) //session超時時間
.retryPolicy(retryPolicy) //重試策略
//namespace:
.namespace("testCRUD")
.build();
client.start();
}
public void closeZKClient() {
if (client != null) {
this.client.close();
}
}
public static void main(String[] args) throws Exception {
// 例項化
CuratorOperator2 cto = new CuratorOperator2();
boolean isZkCuratorStarted = cto.client.isStarted();
System.out.println("當前客戶的狀態:" + (isZkCuratorStarted ? "連線中" : "已關閉"));
String nodePath = "/dongguabai/a";
/* cto.client.getData().usingWatcher((CuratorWatcher) event -> {
System.out.println("【使用usingWatcher】觸發了watcher事件,節點路徑為:"+event.getPath()+",事件型別為:"+event.getType());
}).forPath(nodePath);*/
//NodeCache:監聽資料節點的變更,會觸發事件
//構造NodeCache例項
NodeCache nodeCache = new NodeCache(cto.client,nodePath);
//建立Cache
//該方法有個boolean型別的引數,預設是false,如果設定為true,那麼NodeCache在第一次啟動的時候就會立刻從ZooKeeper上讀取對應節點的資料內容,並儲存在Cache中。
nodeCache.start(true);
if(nodeCache.getCurrentData()!=null){
System.out.println("節點初始化資料為:"+new String(nodeCache.getCurrentData().getData()));
}else {
System.out.println("節點資料為空!");
}
//新增事件(也有remove)
nodeCache.getListenable().addListener(() -> {
/*String data = new String(nodeCache.getCurrentData().getData());
System.out.println("節點路徑:"+nodeCache.getCurrentData().getPath()+",節點資料為:"+data);*/
System.out.println(LocalDateTime.now()+" |觸發節點事件!!!");
});
System.out.println("開始建立節點!");
//建立節點
cto.client.create()
.creatingParentContainersIfNeeded() //遞迴建立節點
.withMode(CreateMode.PERSISTENT) //節點模式
.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE) //ACL
.forPath(nodePath,"new Time".getBytes());
System.in.read();
cto.closeZKClient();
boolean isZkCuratorStarted2 = cto.client.isStarted();
System.out.println("當前客戶的狀態:" + (isZkCuratorStarted2 ? "連線中" : "已關閉"));
}
}
控制檯輸出:
在客戶端操作:
控制檯輸出:
PathChildrenCache
有時候我們需要監聽某一個節點具體的操作情況,這時候PathChildrenCache就派上用場了。PathChildrenCache用於監聽指定ZooKeeper資料節點的子節點變化情況。我們要監聽一個節點也沒有被刪除、新增、修改,我們只需要監聽這個節點的父節點即可,即監聽這個父節點以下所有的子節點,當某一個子節點發生了增刪改操作的時候都會被監聽到。
跟NodeCache類似,PathChildrenCache也需要呼叫start()來初始化:
資料準備:
同步非同步測試:
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.retry.ExponentialBackoffRetry;
import java.util.List;
public class CuratorOperator3 {
public CuratorFramework client = null;
public static final String zkServerPath = "192.168.220.136:2181,192.168.220.137:2181";
public CuratorOperator3() {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 5);
client = CuratorFrameworkFactory.builder() //builder
.connectString(zkServerPath)
.sessionTimeoutMs(10000) //session超時時間
.retryPolicy(retryPolicy) //重試策略
//namespace:
.namespace("testCRUD")
.build();
client.start();
}
public void closeZKClient() {
if (client != null) {
this.client.close();
}
}
public static void main(String[] args) throws Exception {
// 例項化
CuratorOperator3 cto = new CuratorOperator3();
boolean isZkCuratorStarted = cto.client.isStarted();
System.out.println("當前客戶的狀態:" + (isZkCuratorStarted ? "連線中" : "已關閉"));
String nodePath = "/dongguabai/a";
//為子節點新增watcher
PathChildrenCache childrenCache = new PathChildrenCache(cto.client,nodePath,true);
/**
* StartMode:初始化方式
* NORMAL:普通非同步初始化
BUILD_INITIAL_CACHE:同步初始化
POST_INITIALIZED_EVENT:非同步初始化,初始化之後會觸發事件
*/
childrenCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
// childrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
// childrenCache.start(PathChildrenCache.StartMode.NORMAL);
List<ChildData> list = childrenCache.getCurrentData();
System.out.println("獲取子節點列表:");
//如果是BUILD_INITIAL_CACHE可以獲取這個資料,如果不是就不行
list.forEach(childData -> {
System.out.println(new String(childData.getData()));
});
System.in.read();
cto.closeZKClient();
boolean isZkCuratorStarted2 = cto.client.isStarted();
System.out.println("當前客戶的狀態:" + (isZkCuratorStarted2 ? "連線中" : "已關閉"));
}
}
註冊事件測試:
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.retry.ExponentialBackoffRetry;
import java.time.LocalDateTime;
import java.util.concurrent.atomic.AtomicInteger;
public class CuratorOperator3 {
public CuratorFramework client = null;
public static final String zkServerPath = "192.168.220.136:2181,192.168.220.137:2181";
public CuratorOperator3() {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 5);
client = CuratorFrameworkFactory.builder() //builder
.connectString(zkServerPath)
.sessionTimeoutMs(10000) //session超時時間
.retryPolicy(retryPolicy) //重試策略
//namespace:
.namespace("testCRUD")
.build();
client.start();
}
public void closeZKClient() {
if (client != null) {
this.client.close();
}
}
public static void main(String[] args) throws Exception {
// 例項化
CuratorOperator3 cto = new CuratorOperator3();
boolean isZkCuratorStarted = cto.client.isStarted();
System.out.println("當前客戶的狀態:" + (isZkCuratorStarted ? "連線中" : "已關閉"));
String nodePath = "/dongguabai/a";
//為子節點新增watcher
PathChildrenCache childrenCache = new PathChildrenCache(cto.client,nodePath,true);
/**
* StartMode:初始化方式
* NORMAL:普通非同步初始化
BUILD_INITIAL_CACHE:同步初始化
POST_INITIALIZED_EVENT:非同步初始化,初始化之後會觸發事件,而且所有的子節點的add操作都會來一遍這個也是比較坑的地方
*/
// childrenCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
childrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
// childrenCache.start(PathChildrenCache.StartMode.NORMAL);
/*List<ChildData> list = childrenCache.getCurrentData();
System.out.println("獲取子節點列表:");
//如果是BUILD_INITIAL_CACHE可以獲取這個資料,如果不是就不行
list.forEach(childData -> {
System.out.println(new String(childData.getData()));
});*/
//註冊事件,也可以加Excutor
/**
* 當指定節點的子節點發生變化時,就會回撥該方法oPathChildrenCacheEvent
類中定義了所有的事件型別,主要包括新增子節點(CHILD_ADDED)、子節點資料
變更(CHILD_UPDATED)和子節點刪除(CHILD_REMOVED)三類。
*/
AtomicInteger atomicInteger = new AtomicInteger(0);
childrenCache.getListenable().addListener(((client1, event) -> {
atomicInteger.getAndIncrement();
System.out.println("----- "+LocalDateTime.now()+" "+event.getType());
if(event.getType().equals(PathChildrenCacheEvent.Type.INITIALIZED)){
System.out.println("子節點初始化成功...");
}else if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)){
String path = event.getData().getPath();
System.out.println("新增子節點:" + event.getData().getPath());
System.out.println("子節點資料:" + new String(event.getData().getData()));
}else if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)){
System.out.println("刪除子節點:" + event.getData().getPath());
}else if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){
System.out.println("修改子節點路徑:" + event.getData().getPath());
System.out.println("修改子節點資料:" + new String(event.getData().getData()));
}
//如果想指定節點可以判斷路徑
/*String path = event.getData().getPath();
if (path.equals(ADD_PATH)) {
System.out.println("新增子節點:" + event.getData().getPath());
System.out.println("子節點資料:" + new String(event.getData().getData()));
} else if (path.equals("/super/imooc/e")) {
System.out.println("新增不正確...");
}*/
}));
Thread.sleep(1000);
System.out.println("結果:"+atomicInteger);
System.in.read();
cto.closeZKClient();
boolean isZkCuratorStarted2 = cto.client.isStarted();
System.out.println("當前客戶的狀態:" + (isZkCuratorStarted2 ? "連線中" : "已關閉"));
}
}
啟動專案,先看控制檯:
在客戶端執行一些操作:
再看控制檯: