ZK自我複習之Watcher核心機制
1.ZK有個watch事件,是一次性觸發的,當watch監視的資料發生變化時,通知設定了該watch的client,即Watcher。同樣,Watcher是監聽資料發生了某些變化,那麼就一定會有對應的事件型別和狀態型別。
- 事件型別:(znode節點相關的)
EventType.NodeCreated
EventType.NodeDataChanged
EventType.NodeChildrenChanged
EventType.NodeDeleted - 狀態型別:(和客戶端例項相關的)
KeeperState.Disconnected
KeeperState.SyncConnected
KeeperState.AuthFailed
KeeperState.Expired
這個東西說起來太麻煩,我用程式碼的方式來簡單說明下大致的機制。(首先,你得有ZK原生API的瞭解,否則程式碼估計不太能看懂。不會的可以先百度瞭解下,原生的API不太好用,一般都是用curator。)
程式碼案例:(jar包用的zookeeper-3.4.5.jar)
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
public class ZkWatcher implements Watcher {
AtomicInteger seq = new AtomicInteger();
static final int SESSION_TIMEOUT = 10000; // 10秒
static final String ZK_SERVER_ADDR = "ZK伺服器的IP:ZK埠"; // ZK伺服器,我就選擇一臺連線了。
/** zk測試watcher概念父路徑設定 */
private static final String PARENT_PATH = "/testWatch";
/** zk測試watcher概念子路徑設定 */
private static final String CHILDREN_PATH = "/testWatch/children";
/** 主執行緒進入標識 */
private static final String LOG_PREFIX_OF_MAIN = "【Main】";
/** zk變數 */
private ZooKeeper zk = null;
/** 訊號量設定,用於等待zookeeper連線建立之後 通知阻塞程式繼續向下執行 */
private CountDownLatch connectedSemaphore = new CountDownLatch(1);
/**
* 建立ZK連線
*
* @param zkServerAddr
* ZK伺服器連線地址
* @param sessionTimeOut
* ZK超時時間
*/
public void createConnection(String zkServerAddr, int sessionTimeOut) {
releaseConnection(); // 釋放ZK連線
try {
// 這個this應該能看懂吧。 this指這個類,而這個類實現了Wathcher介面,重寫了 process方法
zk = new ZooKeeper(zkServerAddr, sessionTimeOut, this);
System.out.println(LOG_PREFIX_OF_MAIN + "開始連線ZK伺服器");
connectedSemaphore.await(); // 阻塞等待process方法連線上伺服器
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 關閉ZK連線
*/
public void releaseConnection() {
if (zk != null) {
try {
zk.close();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/**
* 重寫Watcher監聽
*/
@Override
public void process(WatchedEvent event) {
System.out.println("進入 process方法,event = " + event);
if (event == null) {
return;
}
// 獲取連線狀態
KeeperState keeperState = event.getState();
// 獲取事件型別
EventType eventType = event.getType();
// 監聽的受影響的path
String path = event.getPath();
String logPrefix = "【Watcher-" + this.seq.incrementAndGet() + "】";
System.out.println(logPrefix + "收到Watcher通知");
System.out.println(logPrefix + "連線狀態:\t" + keeperState.toString());
System.out.println(logPrefix + "事件型別:\t" + eventType.toString());
if (KeeperState.SyncConnected == keeperState) {
if (EventType.None == eventType) {
// 成功連線上ZK伺服器
System.out.println(logPrefix + "成功連線上ZK伺服器");
connectedSemaphore.countDown(); // 放行,繼續執行主執行緒程式碼
} else if (EventType.NodeCreated == eventType) {
// 建立節點事件
System.out.println(logPrefix + "節點建立。節點path:" + path);
// 判斷節點是否存在
Stat stat = this.exists(path, true);
System.out.println(logPrefix + "節點建立狀態:" + stat.toString());
}
}
}
/**
* 判斷指定節點是否存在
*
* @param path
* 節點路徑
*/
public Stat exists(String path, boolean needWatch) {
try {
return this.zk.exists(path, needWatch);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
/**
* 建立節點
*
* @param path
* 節點路徑
* @param data
* 節點資料
* @return
*/
public boolean createPath(String path, String data) {
try {
exists(path, true);
System.out.println(LOG_PREFIX_OF_MAIN + "節點建立成功, Path: "
+ this.zk.create( /** 路徑 */
path,
/** 資料 */
data.getBytes(),
/** 所有可見 */
Ids.OPEN_ACL_UNSAFE,
/** 永久儲存 */
CreateMode.PERSISTENT) + ", content: " + data);
} catch (Exception e) {
e.printStackTrace();
return false;
}
return true;
}
/**
* 刪除所有節點
*/
public void deleteAllTestPath() {
if (this.exists(CHILDREN_PATH, false) != null) {
this.deleteNode(CHILDREN_PATH);
}
if (this.exists(PARENT_PATH, false) != null) {
this.deleteNode(PARENT_PATH);
}
}
/**
* 刪除指定節點
*
* @param path
* 節點path
*/
public void deleteNode(String path) {
try {
this.zk.delete(path, -1);
System.out.println(LOG_PREFIX_OF_MAIN + "刪除節點成功,path:" + path);
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 獲取子節點
*
* @param path
* 節點路徑
*/
private List<String> getChildren(String path, boolean needWatch) {
try {
return this.zk.getChildren(path, needWatch);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
/**
* 更新指定節點資料內容
*
* @param path
* 節點路徑
* @param data
* 資料內容
* @return
*/
public boolean writeData(String path, String data) {
try {
System.out.println(LOG_PREFIX_OF_MAIN + "更新資料成功,path:" + path
+ ", stat: " + this.zk.setData(path, data.getBytes(), -1));
} catch (Exception e) {
e.printStackTrace();
return false
}
return true
}
/**
* 讀取指定節點資料內容
*
* @param path
* 節點路徑
* @return
*/
public String readData(String path, boolean needWatch) {
try {
return new String(this.zk.getData(path, needWatch, null));
} catch (Exception e) {
e.printStackTrace();
return "";
}
}
}
接下來通過寫一個main方法來單獨測試Watcher機制.
1)第一步,就是ZK建立連線。 這個沒什麼好說的,會執行process方法
public static void main(String[] args) {
ZkWatcher zkWatcher=new ZkWatcher();
zkWatcher.createConnection(ZK_SERVER_ADDR, SESSION_TIMEOUT);
System.out.println(LOG_PREFIX_OF_MAIN+"繼續向下執行。");
}
從上面執行結果可以看出,連線成功的狀態是SyncConnected,而建立連線是沒任何的操作的,所以事件型別是None。表示連線了ZK
2)第二步,模擬建立節點。(需求,想建立節點的時候服務端接收到建立的資訊,然後通知客戶端做相關操作)
備註:要牢牢記住watch事件是一次性觸發的,也就是觸發過一次後這個監聽關係就結束了。想再監聽必須再發起
2.1)先看第一種情況,也就是直接建立一個節點的情況。
public static void main(String[] args) {
ZkWatcher zkWatcher=new ZkWatcher();
zkWatcher.createConnection(ZK_SERVER_ADDR, SESSION_TIMEOUT);
System.out.println(LOG_PREFIX_OF_MAIN+"繼續向下執行。");
boolean createNode=zkWatcher.createPath(PARENT_PATH,System.currentTimeMillis() + "");
if(createNode){
System.out.println(LOG_PREFIX_OF_MAIN+"建立節點成功");
}else{
System.out.println(LOG_PREFIX_OF_MAIN+"建立節點失敗");
}
}
從上面結果可以看到,client端並不知道建立節點的事件。
2.2)第二種情況,也就是給path這個節點加個watch,設定為true,表示採用watch監聽。(把上面createPath方法中註釋的那個程式碼開啟),執行結果如下:
從上圖結果可以發現,process方法執行了二次,一次是連線ZK的時候。 而第二次從事件型別是NodeCreated可以看出,是觸發了節點建立,而觸發的節點就是當前監聽的path。
3)第三步,模擬修改節點(其實明白了第二步,其它基本都一樣)
public static void main(String[] args) {
ZkWatcher zkWatcher=new ZkWatcher();
zkWatcher.createConnection(ZK_SERVER_ADDR, SESSION_TIMEOUT);
System.out.println(LOG_PREFIX_OF_MAIN+"繼續向下執行。");
Stat stat=zkWatcher.exists(PARENT_PATH, false);
if(stat==null){
if(zkWatcher.createPath(PARENT_PATH,System.currentTimeMillis() + "")){
System.out.println(LOG_PREFIX_OF_MAIN+"新建立節點成功");
}
}else{
System.out.println(LOG_PREFIX_OF_MAIN+"節點已存在,不再建立");
}
if(zkWatcher.writeData(PARENT_PATH, System.currentTimeMillis() + "")){
System.out.println(LOG_PREFIX_OF_MAIN+"節點資料修改成功");
}else{
System.out.println(LOG_PREFIX_OF_MAIN+"節點資料修改失敗");
}
}
從上面結果可以看出,修改節點沒有被監聽。因為沒有對修改的節點進行監聽。 所以可以對節點進行監聽,比如我用讀的方式先加個監聽,再修改就可以了。
public static void main(String[] args) {
ZkWatcher zkWatcher=new ZkWatcher();
zkWatcher.createConnection(ZK_SERVER_ADDR, SESSION_TIMEOUT);
System.out.println(LOG_PREFIX_OF_MAIN+"繼續向下執行。");
Stat stat=zkWatcher.exists(PARENT_PATH, false);
if(stat==null){
if(zkWatcher.createPath(PARENT_PATH,System.currentTimeMillis() + "")){
System.out.println(LOG_PREFIX_OF_MAIN+"新建立節點成功");
}
}else{
System.out.println(LOG_PREFIX_OF_MAIN+"節點已存在,不再建立");
}
zkWatcher.readData(PARENT_PATH, true); //或者zkWatcher.exists(PARENT_PATH, true);
if(zkWatcher.writeData(PARENT_PATH, System.currentTimeMillis() + "")){
System.out.println(LOG_PREFIX_OF_MAIN+"節點資料修改成功");
}else{
System.out.println(LOG_PREFIX_OF_MAIN+"節點資料修改失敗");
}
}
從上圖結果看一看出,事件型別是NodeDataChanged,也就是節點資料發生變更觸發了process方法。
4.)刪除節點(從上面的例子,應該也知道怎麼用了,就不多說了)
public static void main(String[] args) {
ZkWatcher zkWatcher=new ZkWatcher();
zkWatcher.createConnection(ZK_SERVER_ADDR, SESSION_TIMEOUT);
System.out.println(LOG_PREFIX_OF_MAIN+"繼續向下執行。");
Stat stat=zkWatcher.exists(PARENT_PATH, false);
if(stat==null){
if(zkWatcher.createPath(PARENT_PATH,System.currentTimeMillis() + "")){
System.out.println(LOG_PREFIX_OF_MAIN+"新建立節點成功");
}
}else{
System.out.println(LOG_PREFIX_OF_MAIN+"節點已存在,不再建立");
}
/*zkWatcher.readData(PARENT_PATH, true); //或者zkWatcher.exists(PARENT_PATH, true);
if(zkWatcher.writeData(PARENT_PATH, System.currentTimeMillis() + "")){
System.out.println(LOG_PREFIX_OF_MAIN+"節點資料修改成功");
}else{
System.out.println(LOG_PREFIX_OF_MAIN+"節點資料修改失敗");
}*/
zkWatcher.exists(PARENT_PATH, true);
zkWatcher.deleteNode(PARENT_PATH);
}
當然還有建立子節點什麼的,上面程式碼也封裝了,原理都一樣。自行解讀,不做過多解釋了。
一些知識點的補充:
1)watcher的特性:一次性、客戶端序列執行、輕量。
- 一次性:對於ZK的watcher,你只需要記住一點:ZK有watch事件,是一次性觸發的。當watch監聽的資料發生變化時,通知設定了該watch的client,即watcher,由於ZK的監控都是一次性的所以每次必須設定監控。
- 客戶端序列執行:客戶端Watcher回撥過程時一個串行同步的過程,這為我們保證了順序,同時需要開發人員注意一點,千萬不要因為一個Watcher的處理邏輯影響了整個客戶端Watcher的回撥。
- 輕量:WatchedEvent是ZK整個Watcher通知機制的最小通知單元,整個結構只包含三個部分:通知狀態、事件型別和節點路徑。也就是說Watcher通知非常簡單,只會告訴客戶端發生了事件而不會告知其具體內容,需要客戶自己去進行獲取,比如NodeDataChanged事件。