## Zookeeper原始碼閱讀(六) Watcher
前言
好久沒有更新部落格了,最近這段時間過得很壓抑,終於開始踏上為換工作準備的正軌了,工作又真的很忙而且很瑣碎,讓自己有點煩惱,希望能早點結束這種狀態。
繼上次分析了ZK的ACL相關程式碼後,ZK裡非常重要的另一個特性就是Watcher機制了。其實在我看來,就ZK的使用而言,Watche機制是最核心的特性也不為過了!這一篇先簡單介紹下watcher相關的實體類和介面。
Watcher機制
在ZK中,客戶端可以為znode向服務端註冊監聽,當相應znode的指定事件被觸發時,服務端就會向客戶端傳送通知,而客戶端收到通知後也會執行相應的響應邏輯。整體邏輯如下:
Watcher特性:
- 一次性:每次註冊的watcher只能被觸發一次,之後就會被刪除,如果想繼續觸發需要註冊新的watcher;
- 序列性:客戶端執行watcher是一個序列過程;
- 輕量性:watcher僅包含通知狀態、事件型別和節點路徑。
Watcher總體框架
看Watcher相關的程式碼和寫這篇部落格參考了很多資料,首先列一下Watcher的總體的框架:
上面兩幅圖能很清楚的看到和Watcher有關的介面和類的結構,簡單介紹下它們的功能:
Watcher和Event介面:使用過ZK的人對Watcher介面應該很熟悉,內部定義的process方法是watcher被觸發是zk呼叫的方法。同時,watcher內部有內部介面Event,定義了事件的型別(連線狀態KeeperState和znode的變化狀態EventType);
WatchedEvent介面:連線狀態和znode的變化狀態以及znode的路徑;
WatcherEvent介面:內容和WatchedEvent一模一樣,但是是負責網路傳輸的,由jute生成。
ClientWatchManager介面:根據event返回相應的watcher(Return a set of watchers that should be notified of the event.)。定義了materialize方法。ZKWatchManager實現了這個介面。
程式碼分析
Watcher介面
這是watcher介面和兩個內部列舉之間的類圖關係,詳細看下程式碼:
/**
* This interface specifies the public interface an event handler class must
* implement. A ZooKeeper client will get various events from the ZooKeeper
* server it connects to. An application using such a client handles these
* events by registering a callback object with the client. The callback object
* is expected to be an instance of a class that implements Watcher interface.
*
*/
@InterfaceAudience.Public
public interface Watcher {
/**
* This interface defines the possible states an Event may represent
*/
@InterfaceAudience.Public
//內部介面Event,表示事件的狀態
public interface Event {
/**
* Enumeration of states the ZooKeeper may be at the event
*/
@InterfaceAudience.Public
//內部列舉,連線狀態
public enum KeeperState {
/** Unused, this state is never generated by the server */
@Deprecated
Unknown (-1),
/** The client is in the disconnected state - it is not connected
* to any server in the ensemble. */
//斷開連線-0
Disconnected (0),
/** Unused, this state is never generated by the server */
@Deprecated
NoSyncConnected (1),
/** The client is in the connected state - it is connected
* to a server in the ensemble (one of the servers specified
* in the host connection parameter during ZooKeeper client
* creation). */
//連線狀態-3
SyncConnected (3),
/**
* Auth failed state
*/
//認證失敗
AuthFailed (4),
/**
* The client is connected to a read-only server, that is the
* server which is not currently connected to the majority.
* The only operations allowed after receiving this state is
* read operations.
* This state is generated for read-only clients only since
* read/write clients aren't allowed to connect to r/o servers.
*/
//連線到read-only的server
ConnectedReadOnly (5),
/**
* SaslAuthenticated: used to notify clients that they are SASL-authenticated,
* so that they can perform Zookeeper actions with their SASL-authorized permissions.
*/
//Sasl認證通過-6
SaslAuthenticated(6),
/** The serving cluster has expired this session. The ZooKeeper
* client connection (the session) is no longer valid. You must
* create a new client connection (instantiate a new ZooKeeper
* instance) if you with to access the ensemble. */
//session過期狀態-(-112)
Expired (-112);
//intValue用來表示當前的連線狀態
private final int intValue; // Integer representation of value
// for sending over wire
//構造器
KeeperState(int intValue) {
this.intValue = intValue;
}
//返回整型值
public int getIntValue() {
return intValue;
}
//int->狀態
public static KeeperState fromInt(int intValue) {
switch(intValue) {
case -1: return KeeperState.Unknown;
case 0: return KeeperState.Disconnected;
case 1: return KeeperState.NoSyncConnected;
case 3: return KeeperState.SyncConnected;
case 4: return KeeperState.AuthFailed;
case 5: return KeeperState.ConnectedReadOnly;
case 6: return KeeperState.SaslAuthenticated;
case -112: return KeeperState.Expired;
default:
throw new RuntimeException("Invalid integer value for conversion to KeeperState");
}
}
}
/**
* Enumeration of types of events that may occur on the ZooKeeper
*/
@InterfaceAudience.Public
//事件型別
public enum EventType {
None (-1),//無事件
NodeCreated (1),//建立節點
NodeDeleted (2),//刪除節點
NodeDataChanged (3),//節點資料改變
NodeChildrenChanged (4);//節點的孩子節點發生change
//時間整型值
private final int intValue; // Integer representation of value
// for sending over wire
//構造器
EventType(int intValue) {
this.intValue = intValue;
}
//獲取整型值
public int getIntValue() {
return intValue;
}
//int->事件型別
public static EventType fromInt(int intValue) {
switch(intValue) {
case -1: return EventType.None;
case 1: return EventType.NodeCreated;
case 2: return EventType.NodeDeleted;
case 3: return EventType.NodeDataChanged;
case 4: return EventType.NodeChildrenChanged;
default:
throw new RuntimeException("Invalid integer value for conversion to EventType");
}
}
}
}
//回撥方法
abstract public void process(WatchedEvent event);
}
為了更簡潔的說明Event介面中兩種狀態在實際使用時的情況,用《從zk到paxos》中的表來表示下:
特別的說明:
- NodeDataChanged事件包含znode的dataversion和data本身的修改均會觸發Watcher,所以即使用相同內容來更新data,dataversion依然會更新;NodeChildrenDataChanged則指的是子節點列表發生變化,如節點增加或刪除時會觸發。
- AuthFailed和NoAuth是兩種狀態,前者是auth的模式不對(例如選擇了digest1,而不是正確的digest模式),後者是表示auth資訊不對。
- process這個回撥方法非常重要。當zk向客戶端傳送一個watcher通知時,客戶端就會對相應的process方法進行回撥,從而實現對事件的處理。
WatchedEvent和WatcherEvent
/**
* A WatchedEvent represents a change on the ZooKeeper that a Watcher
* is able to respond to. The WatchedEvent includes exactly what happened,
* the current state of the ZooKeeper, and the path of the znode that
* was involved in the event.
*/
@InterfaceAudience.Public
public class WatchedEvent {
final private KeeperState keeperState;//連線狀態
final private EventType eventType;//事件型別
private String path;//路徑
WatchedEvent用來封裝服務端事件並傳遞給Watcher,從而方便回撥方法process來處理。
WatcherEvetn從內容含義上來說和WatchedEvent是一樣的,只是WatcherEvent實現了Record介面,方便序列化來進行網路傳輸。
public class WatcherEvent implements Record {
private int type;//事件型別
private int state;//連線狀態
private String path;//路徑
特別的是,WatchedEvent中的getWrapper方法就是把WatchedEvent包裝成WatcherEvent,程式碼很簡單:
/**
* Convert WatchedEvent to type that can be sent over network
*/
public WatcherEvent getWrapper() {
return new WatcherEvent(eventType.getIntValue(), //呼叫watcherevent的建構函式
keeperState.getIntValue(),
path);
}
ClientWatchManager和ZKWatchManager
ClientWatchManager是根據獲得的Event得到需要通知的watcher。
public interface ClientWatchManager {
/**
* Return a set of watchers that should be notified of the event. The
* manager must not notify the watcher(s), however it will update it's
* internal structure as if the watches had triggered. The intent being
* that the callee is now responsible for notifying the watchers of the
* event, possibly at some later time.
*
* @param state event state
* @param type event type
* @param path event path
* @return may be empty set but must not be null
*/
public Set<Watcher> materialize(Watcher.Event.KeeperState state,
Watcher.Event.EventType type, String path);
其實現類ZKWatchManager在Zookeeper類中:
/**
* Manage watchers & handle events generated by the ClientCnxn object.
* 可以看到這個類的目的是為了處理有clientCnxn獲取的events和相關的watcher
* We are implementing this as a nested class of ZooKeeper so that
* the public methods will not be exposed as part of the ZooKeeper client
* API.
*/
private static class ZKWatchManager implements ClientWatchManager {
private final Map<String, Set<Watcher>> dataWatches =
new HashMap<String, Set<Watcher>>();//通過getdata設定的watch
private final Map<String, Set<Watcher>> existWatches =
new HashMap<String, Set<Watcher>>();//通過exists設定的watch
private final Map<String, Set<Watcher>> childWatches =
new HashMap<String, Set<Watcher>>();//通過getchildren設定的watch
private volatile Watcher defaultWatcher;//client在和zookeeper建立連線時傳遞的watcher
final private void addTo(Set<Watcher> from, Set<Watcher> to) {
if (from != null) {
to.addAll(from);
}
}
/* (non-Javadoc)
* @see org.apache.zookeeper.ClientWatchManager#materialize(Event.KeeperState,
* Event.EventType, java.lang.String)
*/
@Override
public Set<Watcher> materialize(Watcher.Event.KeeperState state,
Watcher.Event.EventType type,
String clientPath)
{
//相關的watcher集合
Set<Watcher> result = new HashSet<Watcher>();
switch (type) {
case None://事件型別是none
result.add(defaultWatcher);//把defaultWatcher加入
//是否清空,根據zookeeper.disableAutoWatchReset欄位進行配置的值、Zookeeper的狀態是否為同步連線來判斷
boolean clear = ClientCnxn.getDisableAutoResetWatch() &&
state != Watcher.Event.KeeperState.SyncConnected;//
//datawatches同步塊
synchronized(dataWatches) {
for(Set<Watcher> ws: dataWatches.values()) {
result.addAll(ws);//把所有的watcher加入...含義應該是建立連線後如果連線斷開或超時會清空所有的watcher
}
if (clear) {//如果要清空
dataWatches.clear();
}
}
synchronized(existWatches) {//同上
for(Set<Watcher> ws: existWatches.values()) {
result.addAll(ws);
}
if (clear) {
existWatches.clear();
}
}
synchronized(childWatches) {//同上
for(Set<Watcher> ws: childWatches.values()) {
result.addAll(ws);
}
if (clear) {
childWatches.clear();
}
}
return result;
case NodeDataChanged:
case NodeCreated://nodedatachange和新建node
synchronized (dataWatches) {
addTo(dataWatches.remove(clientPath), result);//把所有clientPath位置的datawatch移除並加入result
}
synchronized (existWatches) {
addTo(existWatches.remove(clientPath), result);//把所有clientPath位置的existwatch移除並加入result
}
break;
case NodeChildrenChanged://node的孩子節點發生改變
synchronized (childWatches) {
addTo(childWatches.remove(clientPath), result);//把所有clientPath位置的childwatch移除並加入result
}
break;
case NodeDeleted://node節點被刪除
synchronized (dataWatches) {
addTo(dataWatches.remove(clientPath), result);//把所有clientPath位置的datawatch移除並加入result
}
// XXX This shouldn't be needed, but just in case
synchronized (existWatches) {//不應該發生,為什麼呢?
Set<Watcher> list = existWatches.remove(clientPath);
if (list != null) {
addTo(existWatches.remove(clientPath), result);
LOG.warn("We are triggering an exists watch for delete! Shouldn't happen!");
}
}
synchronized (childWatches) {
addTo(childWatches.remove(clientPath), result);//把所有clientPath位置的childwatch移除並加入result
}
break;
default://預設處理
String msg = "Unhandled watch event type " + type
+ " with state " + state + " on path " + clientPath;
LOG.error(msg);
throw new RuntimeException(msg);
}
return result;
}
}
可以看到,這裡的程式碼裡體現了watcher的一次性,每次觸發之後原來的watcher就會被刪除。
WatcherSetEventPair
這個類的目的很簡單,就是為了把Event和Watcher繫結起來,此類在clientCnxn中。
private static class WatcherSetEventPair {
private final Set<Watcher> watchers;//watchers
private final WatchedEvent event;//事件
//構造器
public WatcherSetEventPair(Set<Watcher> watchers, WatchedEvent event) {
this.watchers = watchers;
this.event = event;
}
}
總結
這一篇主要先說一下Watcher相關的一些介面和實體類,但是儘管參考了許多資料,但還是有幾處不太理解:
為什麼nodedelete後出發的watcher中exist相關的不會被觸發呢?
為什麼需要WatcherSetEventPair 這個類(參考,講得很好,後面會涉及到這裡)
因為watcher介面process函式需要event引數 那麼在ClientWatchManager完成了根據event找到對應的watchers之後 就可以直接呼叫watcher.process(event)了
但是!!!由於ClientCnxn.EventThread是非同步處理的,通過生產消費完成 在processEvent的函式中,要取出一個數據結構Object,既包含watchers集合,又要包含event,所以就把兩者組合在一起出現了WatcherSetEventPair 這個類;
Watcher的註冊和觸發
materialize方法中none時加入所有的watcher應該是為了在連線狀態發生變化時刪除所有的watcher。