1. 程式人生 > >Apache ZooKeeper Watcher 機制原始碼解釋

Apache ZooKeeper Watcher 機制原始碼解釋

分散式系統從根本上來說就是不同節點上的程序併發執行,並且相互之間對程序的行為進行協調處理的過程。不同節點上的程序互相協調行為的過程叫做分散式同步。許多分散式系統需要一個程序作為任務的協調者,執行一些其他程序並不執行的特殊的操作,一般情況下哪個程序擔當任務的協調者都無所謂,但是必須有一個程序作為協調者,自動選舉出一個協調者的過程就是分散式選舉。ZooKeeper 正是為了解決這一系列問題而生的。上一篇我們介紹了 ZooKeeper 服務啟動原理和原始碼剖析,這一講我們來談談 Watcher 機制,首先介紹一個監控示例,然後我們再來聊聊 Watcher 機制原理。

ZooKeeper Watcher 機制

叢集狀態監控示例

為了確保叢集能夠正常執行,ZooKeeper 可以被用來監視叢集狀態,這樣就可以提供叢集高可用性。使用 ZooKeeper 的瞬時(ephemeral)節點概念可以設計一個叢集機器狀態檢測機制:

1. 每一個運行了 ZooKeeper 客戶端的生產環境機器都是一個終端程序,我們可以在它們連線到 ZooKeeper 服務端後在 ZooKeeper 服務端建立一系列對應的瞬時節點,可以用/hostname 來進行區分。

2. 這裡還是採用監聽(Watcher)方式來完成對節點狀態的監視,通過對/hostname 節點的 NodeChildrenChanged 事件的監聽來完成這一目標。監聽程序是作為一個獨立的服務或者程序執行的,它覆蓋了 process 方法來實現應急措施。

3. 由於是一個瞬時節點,所以每次客戶端斷開時 znode 會立即消失,這樣我們就可以監聽到叢集節點異常。

4.NodeChildrenChanged 事件觸發後我們可以呼叫 getChildren 方法來知道哪臺機器發生了異常。

清單 1.ClusterMonitor 類
import java.io.IOException;
import java.util.List;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;

public class ClusterMonitor implements Runnable{
 private static String membershipRoot = "/Members";
 private final Watcher connectionWatcher;
 private final Watcher childrenWatcher;
 private ZooKeeper zk;
 boolean alive = true;
 public ClusterMonitor(String HostPort) throws IOException,InterruptedException,KeeperException{
 connectionWatcher = new Watcher(){
 @Override
 public void process(WatchedEvent event) {
 // TODO Auto-generated method stub
 if(event.getType() == Watcher.Event.EventType.None && 
 event.getState() == Watcher.Event.KeeperState.SyncConnected){
 System.out.println("\nconnectionWatcher Event Received:%s"+event.toString());
 }
 } 
 };
 
 childrenWatcher = new Watcher(){
 @Override
 public void process(WatchedEvent event) {
 // TODO Auto-generated method stub
 System.out.println("\nchildrenWatcher Event Received:%s"+event.toString());
 if(event.getType()==Event.EventType.NodeChildrenChanged){
 try{
 //Get current list of child znode and reset the watch
 List<String> children = zk.getChildren(membershipRoot, this);
 System.out.println("Cluster Membership change,Members: "+children);
 }catch(KeeperException ex){
 throw new RuntimeException(ex);
 }catch(InterruptedException ex){
 Thread.currentThread().interrupt();
 alive = false;
 throw new RuntimeException(ex);
 }
 }
 }
 };
 
 zk = new ZooKeeper(HostPort,2000,connectionWatcher);
 //Ensure the parent znode exists
 if(zk.exists(membershipRoot, false) == null){
 zk.create(membershipRoot, "ClusterMonitorRoot".getBytes(), 
 Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
 }
 //Set a watch on the parent znode
 List<String> children = zk.getChildren(membershipRoot, childrenWatcher);
 System.err.println("Members:"+children);
 }
 
 public synchronized void close(){
 try{
 zk.close();
 }catch(InterruptedException ex){
 ex.printStackTrace();
 }
 }
 
 @Override
 public void run() {
 // TODO Auto-generated method stub
 try{
 synchronized(this){
 while(alive){
 wait();
 }
 }
 }catch(InterruptedException ex){
 ex.printStackTrace();
 Thread.currentThread().interrupt();
 }finally{
 this.close();
 }
 }

 public static void main(String[] args) throws IOException,InterruptedException,KeeperException{
 if(args.length != 1){
 System.err.println("Usage:ClusterMonitor<Host:Port>");
 System.exit(0);
 }
 String hostPort = args[0];
 new ClusterMonitor(hostPort).run();
 }

}
清單 2.ClusterClient 類
 import java.io.IOException;
import java.lang.management.ManagementFactory;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;

public class ClusterClient implements Watcher,Runnable{
 private static String membershipRoot = "/Members";
 ZooKeeper zk;
 public ClusterClient(String hostPort,Long pid){
 String processId = pid.toString();
 try{
 zk = new ZooKeeper(hostPort,2000,this);
 }catch(IOException ex){
 ex.printStackTrace();
 }
 if(zk!=null){
 try{
 zk.create(membershipRoot+'/'+processId, processId.getBytes(), 
 Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
 }catch(KeeperException | InterruptedException ex){
 ex.printStackTrace();
 }
 }
 }
 
 public synchronized void close(){
 try{
 zk.close();
 }catch(InterruptedException ex){
 ex.printStackTrace();
 }
 }
 
 @Override
public void process(WatchedEvent event) {
// TODO Auto-generated method stub
System.out.println("\nEvent Received:%s"+event.toString());
}

@Override
public void run() {
// TODO Auto-generated method stub
try{
synchronized(this){
while(true){
wait();
}
}
}catch(InterruptedException ex){
ex.printStackTrace();
Thread.currentThread().interrupt();
}finally{
this.close();
}
}

public static void main(String[] args){
if(args.length!=1){
System.err.println("Usage:ClusterClient<Host:Port>");
System.exit(0);
}

String hostPort=args[0];
//Get the process id
String name = ManagementFactory.getRuntimeMXBean().getName();
int index = name.indexOf('@');
Long processId = Long.parseLong(name.substring(0,index));
new ClusterClient(hostPort,processId).run();
}
 
}
清單 3.Eclipse 執行輸出
childrenWatcher Event Received:%sWatchedEvent state:SyncConnected type:NodeChildrenChanged path:/Members
Cluster Membership change,Members: [dweref0000000009, test100000000003, dsdawqeqw0000000008, 
test111110000000004, test22220000000005, dsda32130000000007, dsda0000000006, test10000000002]

我們通過 zkCli 方式對被監聽的/Members 這個 ZNODE 操作,增加一個子節點,您會在 zkCli 裡看到如清單 4 所示輸出。

清單 4.ZKCli 建立 ZNode 子節點
[zk: localhost:2181(CONNECTED) 0] create -s /Members/dweref rew23rf
Created /Members/dweref0000000009 [zk: localhost:2181(CONNECTED) 4]

上面的示例我們演示瞭如何發起對於一個 ZNODE 的監聽,當該 ZNODE 被改變後,我們會觸發對應的方法進行處理,這類方式可以被用在資料監聽、叢集狀態監聽等用途。

回撥函式

由於 Watcher 機制涉及到回撥函式,所以我們先來介紹一下回調函式的基礎知識。

打個比方,有一家旅館提供叫醒服務,但是要求旅客自己決定叫醒的方法。可以是打客房電話,也可以是派服務員去敲門,睡得死怕耽誤事的,還可以要求往自己頭上澆盆水。這裡,“叫醒”這個行為是旅館提供的,相當於庫函式,但是叫醒的方式是由旅客決定並告訴旅館的,也就是回撥函式。而旅客告訴旅館怎麼叫醒自己的動作,也就是把回撥函式傳入庫函式的動作,稱為登記回撥函式(to register a callback function)。

乍看起來,回撥似乎只是函式間的呼叫,但仔細一琢磨,可以發現兩者之間的一個關鍵的不同:在回撥中,我們利用某種方式,把回撥函式像引數一樣傳入中間函式。可以這麼理解,在傳入一個回撥函式之前,中間函式是不完整的。換句話說,程式可以在執行時,通過登記不同的回撥函式,來決定、改變中間函式的行為。這就比簡單的函式呼叫要靈活太多了。

回撥實際上有兩種:阻塞式回撥和延遲式回撥。兩者的區別在於:阻塞式回撥裡,回撥函式的呼叫一定發生在起始函式返回之前;而延遲式回撥裡,回撥函式的呼叫有可能是在起始函式返回之後。我們來看一個簡單的示例。

清單 5.Caller 類
public class Caller 
{ 
 public MyCallInterface mc; 
 
 public void setCallfuc(MyCallInterface mc) 
 { 
 this.mc= mc; 
 } 
 
 public void call(){ 
 this.mc.method(); 
 } 
}
清單 6.MyCallInterface 介面
public interface MyCallInterface {
public void method(); 
}
清單 7.CallbackClass 類
public class CallbackClass implements MyCallInterface{
public void method() 
 { 
 System.out.println("回撥函式"); 
 } 
 
 public static void main(String args[]) 
 { 
 Caller call = new Caller(); 
 call.setCallfuc(new CallbackClass()); 
 call.call(); 
 } 
}
清單 8. 執行結果
回撥函式

原理及原始碼解釋

實現原理

ZooKeeper 允許客戶端向服務端註冊一個 Watcher 監聽,當服務端的一些指定事件觸發了這個 Watcher,那麼就會向指定客戶端傳送一個事件通知來實現分散式的通知功能。

ZooKeeper 的 Watcher 機制主要包括客戶端執行緒、客戶端 WatchManager 和 ZooKeeper 伺服器三部分。在具體工作流程上,簡單地講,客戶端在向 ZooKeeper 伺服器註冊 Watcher 的同時,會將 Watcher 物件儲存在客戶端的 WatchManager 中。當 ZooKeeper 伺服器端觸發 Watcher 事件後,會向客戶端傳送通知,客戶端執行緒從 WatchManager 中取出對應的 Watcher 物件來執行回撥邏輯。如清單 9 所示,WatchManager 建立了一個 HashMap,這個 HashMap 被用來存放 Watcher 物件。

清單 9.WatchManager 類
private final HashMap<String, HashSet<Watcher>> watchTable =
 new HashMap<String, HashSet<Watcher>>();
public synchronized void addWatch(String path, Watcher watcher) {
 HashSet<Watcher> list = watchTable.get(path);
 if (list == null) {
 // don't waste memory if there are few watches on a node
 // rehash when the 4th entry is added, doubling size thereafter
 // seems like a good compromise
 list = new HashSet<Watcher>(4);
 watchTable.put(path, list);
 }
 list.add(watcher);

 HashSet<String> paths = watch2Paths.get(watcher);
 if (paths == null) {
 // cnxns typically have many watches, so use default cap here
 paths = new HashSet<String>();
 watch2Paths.put(watcher, paths);
 }
 paths.add(path);
}

整個 Watcher 註冊和通知流程如圖 1 所示。

圖 1.Watcher 註冊和通知流程圖

Watcher 介面

Watcher 的理念是啟動一個客戶端去接收從 ZooKeeper 服務端發過來的訊息並且同步地處理這些資訊。ZooKeeper 的 Java API 提供了公共介面 Watcher,具體操作類通過實現這個介面相關的方法來實現從所連線的 ZooKeeper 服務端接收資料。如果要處理這個訊息,需要為客戶端註冊一個 CallBack(回撥)物件。Watcher 介面定義在 org.apache.zookeeper 包裡面,程式碼如清單 10 所示。

清單 10.Watcher 介面
public interface Watcher {
abstract public void process(WatchedEvent event);
}

在 Watcher 接口裡面,除了回撥函式 process 以外,還包含 KeeperState 和 EventType 兩個列舉類,分別代表了通知狀態和事件型別,如圖 2 所示。

圖 2.Watcher通知狀態和事件型別表

process 方法是 Watcher 介面中的一個回撥方法,當 ZooKeeper 向客戶端傳送一個 Watcher 事件通知時,客戶端就會對相應的 process 方法進行回撥,從而實現對事件的處理。

process 方法包含 WatcherEvent 型別的引數,WatchedEvent 包含了每一個事件的三個基本屬性:通知狀態(KeeperState)、事件型別(EventType)和節點路徑(Path),ZooKeeper 使用 WatchedEvent 物件來封裝服務端事件並傳遞給 Watcher,從而方便回撥方法 process 對服務端事件進行處理。

WatchedEvent 和 WatcherEvent 都表示的是同一個事物,都是對一個服務端事件的封裝。不同的是,WatchedEvent 是一個邏輯事件,用於服務端和客戶端程式執行過程中所需的邏輯物件,而 WatcherEvent 因為實現了序列化介面,因此可以用於網路傳輸。

服務端線上程 WatchedEvent 事件之後,會呼叫 getWrapper 方法將自己包裝成一個可序列化的 WatcherEvent 事件,如清單 7 所示,以便通過網路傳輸到客戶端。客戶端在接收到服務端的這個事件物件後,首先會將 WatcherEvent 事件還原成一個 WatchedEvent 事件,並傳遞給 process 方法處理,回撥方法 process 根據入參就能夠解析出完整的服務端事件了。

清單 11. 可序列化的事件
public WatcherEvent getWrapper() {
 return new WatcherEvent(eventType.getIntValue(), 
 keeperState.getIntValue(), 
 path);
}

客戶端註冊 Watcher 流程

清單 1 所示程式碼中採用了 ZooKeeper 建構函式來傳入一個 Watcher,如程式碼 zk = new ZooKeeper(HostPort,2000,connectionWatcher);在這行程式碼裡,第三個引數是連線到 ZooKeeper 服務端的 connectionWatcher 事件監聽,這個 Watcher 將作為整個 ZooKeeper 會話期間的預設 Watcher,會一直被儲存在客戶端 ZKWatchManager 的 defaultWatcher 裡面。

客戶端的請求基本都是在 ClientCnxn 裡面進行操作,當收到請求後,客戶端會對當前客戶端請求進行標記,將其設定為使用 Watcher 監聽,同時會封裝一個 Watcher 的註冊資訊 WatchRegistration 物件,用於暫時儲存資料節點的路徑和 Watcher 的對應關係。

清單 12.getChildren 方法新增 watch 事件
public byte[] getData(final String path, Watcher watcher, Stat stat)
 throws KeeperException, InterruptedException
 {
 final String clientPath = path;
 PathUtils.validatePath(clientPath);

 // the watch contains the un-chroot path
 WatchRegistration wcb = null;
 if (watcher != null) {
 wcb = new DataWatchRegistration(watcher, clientPath);
 }

 final String serverPath = prependChroot(clientPath);

 RequestHeader h = new RequestHeader();
 h.setType(ZooDefs.OpCode.getData);
 GetDataRequest request = new GetDataRequest();
 request.setPath(serverPath);
 request.setWatch(watcher != null);
 GetDataResponse response = new GetDataResponse();
 ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
 if (r.getErr() != 0) {
 throw KeeperException.create(KeeperException.Code.get(r.getErr()),
 clientPath);
 }
 if (stat != null) {
 DataTree.copyStat(response.getStat(), stat);
 }
 return response.getData();
 }

在 ZooKeeper 中,Packet 是一個最小的通訊協議單元,即資料包。Pakcet 用於進行客戶端與服務端之間的網路傳輸,任何需要傳輸的物件都需要包裝成一個 Packet 物件。在 ClientCnxn 中 WatchRegistration 也會被封裝到 Pakcet 中,然後由 SendThread 執行緒呼叫 queuePacke 方法把 Packet 放入傳送佇列中等待客戶端傳送,這又是一個非同步過程,分散式系統採用非同步通訊是一個普遍認同的觀念。隨後,SendThread 執行緒會通過 readResponse 方法接收來自服務端的響應,非同步地呼叫 finishPacket 方法從 Packet 中取出對應的 Watcher 並註冊到 ZKWatchManager 中去,如清單 13 所示。

清單 13.getChildren 方法新增 watch 事件
private void finishPacket(Packet p) {
 if (p.watchRegistration != null) {
 p.watchRegistration.register(p.replyHeader.getErr());
 }

 if (p.cb == null) {
 synchronized (p) {
 p.finished = true;
 p.notifyAll();
 }
 } else {
 p.finished = true;
 eventThread.queuePacket(p);
 }
 }

除了上面介紹的方式以外,ZooKeeper 客戶端也可以通過 getData、getChildren 和 exist 三個介面來向 ZooKeeper 伺服器註冊 Watcher,無論使用哪種方式,註冊 Watcher 的工作原理是一致的。如清單 14 所示,getChildren 方法呼叫了 WatchManager 類的 addWatch 方法添加了 watcher 事件。

清單 14.getChildren 方法新增 watcher 事件
public ArrayList<String> getChildren(String path, Stat stat, 
                    Watcher watcher) throws KeeperException.NoNodeException {
 DataNodeV1 n = nodes.get(path);
 if (n == null) {
 throw new KeeperException.NoNodeException();
 }
 synchronized (n) {
 ArrayList<String> children = new ArrayList<String>();
 children.addAll(n.children);
 if (watcher != null) {
 childWatches.addWatch(path, watcher);
 }
 return children;
 }
 }

如清單 15 所示,現在需要從這個封裝物件中再次提取出 Watcher 物件來,在 register 方法裡面,客戶端將 Watcher 物件轉交給 ZKWatchManager,並最終儲存在一個 Map 型別的資料結構 dataWatches 裡面,用於將資料節點的路徑和 Watcher 物件進行一一對映後管理起來。

注意,WatcherRegistation 除了 Header 和 request 兩個屬性被傳遞到了服務端,其他都沒有到服務端,否則服務端就容易出現記憶體緊張甚至溢位的危險,因為資料量太大了。這就是 ZooKeeper 為什麼適用於分散式環境的原因,它在網路中傳輸的是訊息,而不是資料包實體。

清單 15.processRequest 程式碼
public void register(int rc) {
 if (shouldAddWatch(rc)) {
 Map<String, Set<Watcher>> watches = getWatches(rc);
 synchronized(watches) {
 Set<Watcher> watchers = watches.get(clientPath);
 if (watchers == null) {
 watchers = new HashSet<Watcher>();
 watches.put(clientPath, watchers);
 }
 watchers.add(watcher);
 }
 }
 }

服務端處理 Watcher 流程

如圖 3 所示是服務端處理 Watcher 的一個完整序列圖。

圖 3. 服務端處理 Watcher 序列圖

注意,以下所有程式碼均為精簡版,去除了日誌、判斷分支,只在原始碼上保留了主線程式碼。

FinalRequestProcessor 類接收到客戶端請求後,會呼叫 processRequest 方法進行處理,會進一步轉向 ZooKeeperServer 的 processRequest 進行進一步處理,處理結由 ZKDatabase 類返回,如清單 16-18 所示。

清單 16.processRequest 程式碼
public void processRequest(Request request) {
if (request.hdr != null) {
 TxnHeader hdr = request.hdr;
 Record txn = request.txn;

 rc = zks.processTxn(hdr, txn);
 }
清單 17.ZooKeeperServer 程式碼
public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) {
ProcessTxnResult rc;
 int opCode = hdr.getType();
 long sessionId = hdr.getClientId();
 rc = getZKDatabase().processTxn(hdr, txn);
 if (opCode == OpCode.createSession) {
 if (txn instanceof CreateSessionTxn) {
 CreateSessionTxn cst = (CreateSessionTxn) txn;
 sessionTracker.addSession(sessionId, cst
 .getTimeOut());
 } else {
 LOG.warn("*****>>>>> Got "
 + txn.getClass() + " "
 + txn.toString());
 }
 } else if (opCode == OpCode.closeSession) {
 sessionTracker.removeSession(sessionId);
 }
 return rc;
}
清單 18.ZKDatabase 程式碼
public ProcessTxnResult processTxn(TxnHeader header, Record txn)
 {
switch (header.getType()) {
case OpCode.setData:
 SetDataTxn setDataTxn = (SetDataTxn) txn;
 rc.path = setDataTxn.getPath();
 rc.stat = setData(setDataTxn.getPath(), setDataTxn
 .getData(), setDataTxn.getVersion(), header
 .getZxid(), header.getTime());
 break;

對於註冊 Watcher 請求,FinalRequestProcessor 的 ProcessRequest 方法會判斷當前請求是否需要註冊 Watcher,如果為 true,就會將當前的 ServerCnxn 物件和資料節點路徑傳入 getData 方法中去。ServerCnxn 是一個 ZooKeeper 客戶端和伺服器之間的連線介面,代表了一個客戶端和伺服器的連線,我們後面講到的 process 回撥方法,實際上也是從這裡回撥的,所以可以把 ServerCnxn 看作是一個 Watcher 物件。資料節點的節點路徑和 ServerCnxn 最終會被儲存在 WatchManager 的 watchTable 和 watch2Paths 中。

清單 19. 判斷是否註冊 Watcher 程式碼
case OpCode.getData: {
 lastOp = "GETD";
 GetDataRequest getDataRequest = new GetDataRequest();
 ByteBufferInputStream.byteBuffer2Record(request.request,
 getDataRequest);
 DataNode n = zks.getZKDatabase().getNode(getDataRequest.getPath());
 if (n == null) {
 throw new KeeperException.NoNodeException();
 }
 Long aclL;
 synchronized(n) {
 aclL = n.acl;
 }
 PrepRequestProcessor.checkACL(zks, zks.getZKDatabase().convertLong(aclL),
 ZooDefs.Perms.READ,
 request.authInfo);
 Stat stat = new Stat();
 byte b[] = zks.getZKDatabase().getData(getDataRequest.getPath(), stat,
 getDataRequest.getWatch() ? cnxn : null);
 rsp = new GetDataResponse(b, stat);
 break;
 }

如前所述,WatchManager 負責 Watcher 事件的觸發,它是一個統稱,在服務端 DataTree 會託管兩個 WatchManager,分別是 dataWatches 和 childWatches,分別對應資料變更 Watcher 和子節點變更 Watcher。

清單 20.WatchManger 兩個佇列
private final HashMap<String, HashSet<Watcher>> watchTable =
 new HashMap<String, HashSet<Watcher>>();

 private final HashMap<Watcher, HashSet<String>> watch2Paths =
 new HashMap<Watcher, HashSet<String>>();

回到主題,如清單 21 到 23 所示,當發生 Create、Delete、NodeChange(資料變更)這樣的事件後,DataTree 會呼叫相應方法去觸發 WatchManager 的 triggerWatch 方法,該方法返回 ZNODE 的資訊,自此進入到回撥本地 process 的序列。

清單 21.processTxn 程式碼
public ProcessTxnResult processTxn(TxnHeader header, Record txn)
 {
 ProcessTxnResult rc = new ProcessTxnResult();

 try {
switch (header.getType()) {
case OpCode.setData:
 SetDataTxn setDataTxn = (SetDataTxn) txn;
 rc.path = setDataTxn.getPath();
 rc.stat = setData(setDataTxn.getPath(), setDataTxn
 .getData(), setDataTxn.getVersion(), header
 .getZxid(), header.getTime());
 break;
清單 22.setData 程式碼
public Stat setData(String path, byte data[], int version, long zxid,
 long time) throws KeeperException.NoNodeException {
 Stat s = new Stat();
 DataNodeV1 n = nodes.get(path);
 if (n == null) {
 throw new KeeperException.NoNodeException();
 }
 synchronized (n) {
 n.data = data;
 n.stat.setMtime(time);
 n.stat.setMzxid(zxid);
 n.stat.setVersion(version);
 n.copyStat(s);
 }
 dataWatches.triggerWatch(path, EventType.NodeDataChanged);
 return s;
 }
清單 23.triggerWatch 程式碼
public Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
WatchedEvent e = new WatchedEvent(type,
 KeeperState.SyncConnected, path);
//將事件型別(EventType)、通知狀態(WatchedEvent)、節點路徑封裝成一個 WatchedEvent 物件
 HashSet<Watcher> watchers;
 synchronized (this) {
//根據資料節點的節點路徑從 watchTable 裡面取出對應的 Watcher。如果沒有找到 Watcher 物件,
說明沒有任何客戶端在該資料節點上註冊過 Watcher,直接退出。如果找打了 Watcher 就將其提取出來,
同時會直接從 watchTable 和 watch2Paths 裡刪除 Watcher,即 Watcher 是一次性的,觸發一次就失效了。
 watchers = watchTable.remove(path);
for (Watcher w : watchers) {
 HashSet<String> paths = watch2Paths.get(w);
 }
 }
 for (Watcher w : watchers) {
 if (supress != null && supress.contains(w)) {
 continue;
 }
//對於需要註冊 Watcher 的請求,ZooKeeper 會把請求對應的惡 ServerCnxn 作為一個 Watcher 儲存,
所以這裡呼叫的 process 方法實質上是 ServerCnxn 的對應方法
 w.process(e);
 }
 return watchers;
}

從上面的程式碼我們可以總結出,如果想要處理一個 Watcher,需要執行的步驟如下所示:

1. 將事件型別(EventType)、通知狀態(WatchedEvent)、節點路徑封裝成一個 WatchedEvent 物件。

2. 根據資料節點的節點路徑從 watchTable 裡面取出對應的 Watcher。如果沒有找到 Watcher 物件,說明沒有任何客戶端在該資料節點上註冊過 Watcher,直接退出。如果找到了 Watcher 就將其提取出來,同時會直接從 watchTable 和 watch2Paths 裡刪除 Watcher,即 Watcher 是一次性的,觸發一次就失效了。

3. 對於需要註冊 Watcher 的請求,ZooKeeper 會把請求對應的 ServerCnxn 作為一個 Watcher 儲存,所以這裡呼叫的 process 方法實質上是 ServerCnxn 的對應方法,如清單 24 所示,在請求頭標記“-1”表示當前是一個通知,將 WatchedEvent 包裝成 WatcherEvent 用於網路傳輸序列化,向客戶端傳送通知,真正的回撥方法在客戶端,就是我們清單 10 裡面定義的 process() 方法。

清單 24.ServerCnxn 類程式碼
synchronized public void process(WatchedEvent event) {
 ReplyHeader h = new ReplyHeader(-1, -1L, 0);
 if (LOG.isTraceEnabled()) {
 ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK,
 "Deliver event " + event + " to 0x"
 + Long.toHexString(this.sessionId)
 + " through " + this);
 }

 // Convert WatchedEvent to a type that can be sent over the wire
 WatcherEvent e = event.getWrapper();

 sendResponse(h, e, "notification");
 }

如清單 24 所示,客戶端收到訊息後,會呼叫 ClientCnxn 的 SendThread.readResponse 方法來進行統一處理,如清單所示。如果響應頭 replyHdr 中標識的 Xid 為 02,表示是 ping,如果為-4,表示是驗證包,如果是-1,表示這是一個通知型別的響應,然後進行反序列化、處理 chrootPath、還原 WatchedEvent、回撥 Watcher 等步驟,其中回撥 Watcher 步驟將 WacthedEvent 物件交給 EventThread 執行緒,在下一個輪詢週期中進行 Watcher 回撥。

清單 25.SendThread 執行緒程式碼
class SendThread extends ZooKeeperThread {
 private long lastPingSentNs;
 private final ClientCnxnSocket clientCnxnSocket;
 private Random r = new Random(System.nanoTime()); 
 private boolean isFirstConnect = true;

 void readResponse(ByteBuffer incomingBuffer) throws IOException {
 ByteBufferInputStream bbis = new ByteBufferInputStream(
 incomingBuffer);
 BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
 ReplyHeader replyHdr = new ReplyHeader();

 replyHdr.deserialize(bbia, "header");
 if (replyHdr.getXid() == -2) {

如清單 25 所示,SendThread 接收到服務端的通知事件後,會通過呼叫 EventThread 類的 queueEvent 方法將事件傳給 EventThread 執行緒,queueEvent 方法根據該通知事件,從 ZKWatchManager 中取出所有相關的 Watcher,如清單 26 所示。

清單 26.EventThread 執行緒程式碼
class EventThread extends ZooKeeperThread {
public void queueEvent(WatchedEvent event) {
 if (event.getType() == EventType.None
 && sessionState == event.getState()) {
 return;
 }
 sessionState = event.getState();

 // materialize the watchers based on the event
 WatcherSetEventPair pair = new WatcherSetEventPair(
 watcher.materialize(event.getState(), event.getType(),
 event.getPath()),
 event);
 // queue the pair (watch set & event) for later processing
 waitingEvents.add(pair);
 }

客戶端在識別出事件型別 EventType 之後,會從相應的 Watcher 儲存中刪除對應的 Watcher,獲取到相關的 Watcher 之後,會將其放入 waitingEvents 佇列,該佇列從字面上就能理解是一個待處理佇列,執行緒的 run 方法會不斷對該該佇列進行處理,這就是一種非同步處理思維的實現。

清單 27.ZKWatchManager 取出 Watcher
public Set<Watcher> materialize(Watcher.Event.KeeperState state,
 Watcher.Event.EventType type,
 String clientPath)
 {
 Set<Watcher> result = new HashSet<Watcher>();
case NodeCreated:
 synchronized (dataWatches) {
 addTo(dataWatches.remove(clientPath), result);
 }
 synchronized (existWatches) {
 addTo(existWatches.remove(clientPath), result);
 }
 break;
清單 28.EventThread 執行緒的 run 方法
public void run() {
 try {
 isRunning = true;
 while (true) {
 Object event = waitingEvents.take();
 if (event == eventOfDeath) {
 wasKilled = true;
 } else {
 processEvent(event);
 }
 if (wasKilled)
 synchronized (waitingEvents) {
 if (waitingEvents.isEmpty()) {
 isRunning = false;
 break;
 }
 }
 }

ZooKeeper Watcher 特性總結

1. 註冊只能確保一次消費

無論是服務端還是客戶端,一旦一個 Watcher 被觸發,ZooKeeper 都會將其從相應的儲存中移除。因此,開發人員在 Watcher 的使用上要記住的一點是需要反覆註冊。這樣的設計有效地減輕了服務端的壓力。如果註冊一個 Watcher 之後一直有效,那麼針對那些更新非常頻繁的節點,服務端會不斷地向客戶端傳送事件通知,這無論對於網路還是服務端效能的影響都非常大。

2. 客戶端序列執行

客戶端 Watcher 回撥的過程是一個串行同步的過程,這為我們保證了順序,同時,需要開發人員注意的一點是,千萬不要因為一個 Watcher 的處理邏輯影響了整個客戶端的 Watcher 回撥。

3. 輕量級設計

WatchedEvent 是 ZooKeeper 整個 Watcher 通知機制的最小通知單元,這個資料結構中只包含三部分的內容:通知狀態、事件型別和節點路徑。也就是說,Watcher 通知非常簡單,只會告訴客戶端發生了事件,而不會說明事件的具體內容。例如針對 NodeDataChanged 事件,ZooKeeper 的 Watcher 只會通知客戶指定資料節點的資料內容發生了變更,而對於原始資料以及變更後的新資料都無法從這個事件中直接獲取到,而是需要客戶端主動重新去獲取資料,這也是 ZooKeeper 的 Watcher 機制的一個非常重要的特性。另外,客戶端向服務端註冊 Watcher 的時候,並不會把客戶端真實的 Watcher 物件傳遞到服務端,僅僅只是在客戶端請求中使用 boolean 型別屬性進行了標記,同時服務端也僅僅只是儲存了當前連線的 ServerCnxn 物件。這樣輕量級的 Watcher 機制設計,在網路開銷和服務端記憶體開銷上都是非常廉價的。