【zookeeper】事件 watch 機制 原理
zk作為一款成熟的分散式協調框架,訂閱-釋出功能是很重要的一個。所謂訂閱釋出功能,其實說白了就是觀察者模式。觀察者會訂閱一些感興趣的主題,然後這些主題一旦變化了,就會自動通知到這些觀察者。
zk的訂閱釋出也就是watch機制,是一個輕量級的設計。因為它採用了一種推拉結合的模式。一旦服務端感知主題變了,那麼只會傳送一個事件型別和節點資訊給關注的客戶端,而不會包括具體的變更內容,所以事件本身是輕量級的,這就是所謂的“推”部分。然後,收到變更通知的客戶端需要自己去拉變更的資料,這就是“拉”部分。
訂閱-釋出在zk中是通過事件註冊和回撥機制實現的,下面看下這部分內容。
整個註冊回撥過程分為三個大的部分:客戶端註冊,服務端發回事件,客戶端回撥
1.客戶端註冊:
回撥介面:
public interface Watcher {
abstract public void process(WatchedEvent event);
}
所有的事件回撥介面都需要實現這個介面,並在process內部實現回撥邏輯。event封裝了事件的資訊。event有兩個層級,第一個是state,第二個是evetType。不同的state有不同的type。
下面是對應關係:
zk的事件註冊介面:
zk的事件註冊介面主要有有以下的四類:
1.預設watch,也就是在new一個ZooKeeper例項代表了一個zk客戶端去連線伺服器的時候,在構造方法裡面傳入的一個預設watch的回撥介面,這個主要解決連線事件。在event中對應了syncConnected的state和none的type。
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)
2.通過getData,getChildren和exist三個介面。每一種又有同步和非同步兩種版本。下面只看getData版本的:
public byte[] getData(final String path, Watcher watcher, Stat stat) public void getData(final String path, Watcher watcher, DataCallback cb, Object ctx)
第一個有返回值的是同步的,第二個無返回值有回撥cb的是非同步的。當然,每一個又有幾個過載版本,這裡只貼了其中的一種。
所以註冊的介面基本上是我們先實現一個watch介面,作為回撥處理邏輯,然後呼叫以上的介面來註冊感興趣的事件。那麼這個註冊過程是怎樣的?
我們重點以getData同步版本來說明,非同步的其實在註冊這一塊是一樣的,都是通過構造packet來完成。
WatchRegistration wcb = null;
if (watcher != null) {
wcb = new DataWatchRegistration(watcher, clientPath);
}
。。。
ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
在getData內部,首先構建了一個watchRegistration例項,這個類後面說,總之它封裝了了回撥介面和關注節點。然後把這個註冊物件和packetheader一起傳入了submit方法。再看submit方法:
Packet packet = queuePacket(h, r, request, response, null, null, null,
null, watchRegistration);
裡面構造了一個packet,再看是如何構造的:
Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,
Record response, AsyncCallback cb, String clientPath,
String serverPath, Object ctx, WatchRegistration watchRegistration)
{
Packet packet = null;
synchronized (outgoingQueue) {
if (h.getType() != OpCode.ping && h.getType() != OpCode.auth) {
h.setXid(getXid());
}
packet = new Packet(h, r, request, response, null,
watchRegistration);
packet.cb = cb;
packet.ctx = ctx;
packet.clientPath = clientPath;
packet.serverPath = serverPath;
if (!zooKeeper.state.isAlive() || closing) {
conLossPacket(packet);
} else {
// If the client is asking to close the session then
// mark as closing
if (h.getType() == OpCode.closeSession) {
closing = true;
}
outgoingQueue.add(packet);
}
}
sendThread.wakeup();
return packet;
}
主要就是設定了packet的屬性,然後把這個請求packet送入了傳送佇列。要知道我們註冊回撥的介面本來是用來獲取資料的,所以回撥依附在了獲取這個過程中,這裡的packet構造主要是為了獲取一次資料,構建的一個請求包,我們的事件回撥依附了這個過程,然後作為了這個請求packet的屬性儲存了起來。因為我們的是同步版本,所以packet的非同步介面cb在上一步設定為了null。這裡和回撥相關的就是設定了packet的watchRegistration屬性,也就是我們傳入的回撥介面,這是通過packet的構造方法完成的。所以有必要看下一個請求packet的內部:
static class Packet {
RequestHeader header;
ByteBuffer bb;
/** Client's view of the path (may differ due to chroot) **/
String clientPath;
/** Servers's view of the path (may differ due to chroot) **/
String serverPath;
ReplyHeader replyHeader;
Record request;
Record response;
boolean finished;
AsyncCallback cb;
Object ctx;
WatchRegistration watchRegistration;
這是packet的屬性,這裡的wathRegistration就是回撥介面,cb是getData的非同步版本的回撥,在得到資料以後的回撥函式,也就是上面我們談到的設為null的屬性,因為我們看的是getData的同步版本,所以為null。需要明確兩個回撥的區別。
到這裡,我們的事件回撥函式已經和這次getData請求的packet關聯起來的。
那麼,最後這個packet就會進入到outgoingQueue中被髮送。
也就是在SendThread的一次write過程中。
然後getData請求的資料就會被伺服器返回,在SendThread的一次read過程中,具體在readResponse函式中的最後部分,也就是finishPacket函式中,完成最後的註冊:
private void finishPacket(Packet p) {
if (p.watchRegistration != null) {
p.watchRegistration.register(p.replyHeader.getErr());
}
if (p.cb == null) {
synchronized (p) {
p.finished = true;
p.notifyAll();
}
} else {
p.finished = true;
eventThread.queuePacket(p);
}
}
可以看到這裡呼叫了一個register的方法。
下面需要了解下zk客戶端與註冊有關的資料結構:
在ZooKeeper類中,有一個內部類ZKWatchManager,是客戶端儲存所有的事件註冊的類,裡面有以下幾個重要的屬性,儲存回撥:
private static class ZKWatchManager implements ClientWatchManager {
private final Map<String, Set<Watcher>> dataWatches =
new HashMap<String, Set<Watcher>>();
private final Map<String, Set<Watcher>> existWatches =
new HashMap<String, Set<Watcher>>();
private final Map<String, Set<Watcher>> childWatches =
new HashMap<String, Set<Watcher>>();
private volatile Watcher defaultWatcher;
從名字上就可以看出各個屬性的作用,正好對應了我們開始所說的4種回撥。
map中的key就是節點的path,set就是該節點上所有的回撥。因為預設的回撥處理只有一個,所以就不是map,其餘的事件,每一個節點都可能會有多個,所以是一個set。
再看一直出現的WatchRegistration結構:
abstract class WatchRegistration {
private Watcher watcher;
private String clientPath;
public WatchRegistration(Watcher watcher, String clientPath)
{
this.watcher = watcher;
this.clientPath = clientPath;
}
是一個抽象類,其實就是封裝了一個事件註冊,包括了感興趣的節點和回撥函式。data,children和exist三種事件都有一個對應的實現類。這個抽象類有一個非抽象方法register,負責將packet裡面的watchRegistration放到之前的watchmanager中:
public void register(int rc) {
if (shouldAddWatch(rc)) {
Map<String, Set<Watcher>> watches = getWatches(rc);
synchronized(watches) {
Set<Watcher> watchers = watches.get(clientPath);
if (watchers == null) {
watchers = new HashSet<Watcher>();
watches.put(clientPath, watchers);
}
watchers.add(watcher);
}
}
}
首先根據事件型別拿到正確的map,然後把watch回撥放入map裡面。
至此客戶端註冊一個事件回撥的邏輯就清晰了,總結就是,通過註冊函式來設定回撥介面為packet的屬性。然後在註冊函式收到其自身希望得到的資料的時候,來把回撥函式註冊到manager上。
服務端處理:
主要分為了兩部分,服務端新增事件,服務端觸發事件以後的處理。
先看服務端新增事件:
還是以剛才的getData為例,服務端的process收到了getData請求,就會返回資料,這個procesor是FinalRequestProcessor,其中處理getData請求的部分程式碼:
case OpCode.getData: {
lastOp = "GETD";
GetDataRequest getDataRequest = new GetDataRequest();
ZooKeeperServer.byteBuffer2Record(request.request,
getDataRequest);
DataNode n = zks.getZKDatabase().getNode(getDataRequest.getPath());
if (n == null) {
throw new KeeperException.NoNodeException();
}
Long aclL;
synchronized(n) {
aclL = n.acl;
}
PrepRequestProcessor.checkACL(zks, zks.getZKDatabase().convertLong(aclL),
ZooDefs.Perms.READ,
request.authInfo);
Stat stat = new Stat();
byte b[] = zks.getZKDatabase().getData(getDataRequest.getPath(), stat,
getDataRequest.getWatch() ? cnxn : null);
rsp = new GetDataResponse(b, stat);
break;
}
重點是getData函式的呼叫,檢測客戶端是否註冊了watch,如果註冊了,那麼就傳cnxn,否則就傳null。這裡的cnxn其實是服務端處理io的執行緒類,後面說。getData最終會到dataTree的getData函式:
public byte[] getData(String path, Stat stat, Watcher watcher)
throws KeeperException.NoNodeException {
DataNode n = nodes.get(path);
if (n == null) {
throw new KeeperException.NoNodeException();
}
synchronized (n) {
n.copyStat(stat);
if (watcher != null) {
dataWatches.addWatch(path, watcher);
}
return n.data;
}
}
會在datawatches裡面新增watch,因為我們是data型別的watch。
在Datatree類有兩個和watch相關的屬性:
private final WatchManager dataWatches = new WatchManager();
private final WatchManager childWatches = new WatchManager();
分別儲存了資料的子節點的watch。再看WatchManager結構:
private final HashMap<String, HashSet<Watcher>> watchTable =
new HashMap<String, HashSet<Watcher>>();
private final HashMap<Watcher, HashSet<String>> watch2Paths =
new HashMap<Watcher, HashSet<String>>();
主要有兩個map,儲存了節點到watch 和 watch到節點的雙向對映,這也是服務端儲存事件的結構。這樣服務端就在相應的節點上添加了一個watch。
再看服務端觸發watch事件邏輯,比如通過setData改變資料:
在datatree的
public Stat setData(String path, byte data[], int version, long zxid,
long time) throws KeeperException.NoNodeException {
函式的最後有一段:
dataWatches.triggerWatch(path, EventType.NodeDataChanged);
會觸發事件:
public Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
WatchedEvent e = new WatchedEvent(type,
KeeperState.SyncConnected, path);
HashSet<Watcher> watchers;
synchronized (this) {
watchers = watchTable.remove(path);
if (watchers == null || watchers.isEmpty()) {
if (LOG.isTraceEnabled()) {
ZooTrace.logTraceMessage(LOG,
ZooTrace.EVENT_DELIVERY_TRACE_MASK,
"No watchers for " + path);
}
return null;
}
for (Watcher w : watchers) {
HashSet<String> paths = watch2Paths.get(w);
if (paths != null) {
paths.remove(path);
}
}
}
for (Watcher w : watchers) {
if (supress != null && supress.contains(w)) {
continue;
}
w.process(e);
}
return watchers;
}
最終會呼叫process函式,這裡process函式是watch介面的實現,但是這個只有客戶端才有啊。實際上,服務端這裡的實現類就是服務端的執行緒類:NIOServerCnxn。
public class NIOServerCnxn implements Watcher, ServerCnxn
再看它的process方法:
synchronized public void process(WatchedEvent event) {
ReplyHeader h = new ReplyHeader(-1, -1L, 0);
if (LOG.isTraceEnabled()) {
ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK,
"Deliver event " + event + " to 0x"
+ Long.toHexString(this.sessionId)
+ " through " + this);
}
// Convert WatchedEvent to a type that can be sent over the wire
WatcherEvent e = event.getWrapper();
sendResponse(h, e, "notification");
}
可以看到,只是發了一個事件型別的資訊,header為-1。
客戶端執行回撥:
從上面可以看到,服務端觸發了時間以後會發送一個-1為header的相應。
那麼客戶端就會在io執行緒的read部分讀到這個資訊,最後會到readResponse函式裡處理:
if (replyHdr.getXid() == -1) {
// -1 means notification
if (LOG.isDebugEnabled()) {
LOG.debug("Got notification sessionid:0x"
+ Long.toHexString(sessionId));
}
WatcherEvent event = new WatcherEvent();
event.deserialize(bbia, "response");
// convert from a server path to a client path
if (chrootPath != null) {
String serverPath = event.getPath();
if(serverPath.compareTo(chrootPath)==0)
event.setPath("/");
else
event.setPath(serverPath.substring(chrootPath.length()));
}
WatchedEvent we = new WatchedEvent(event);
if (LOG.isDebugEnabled()) {
LOG.debug("Got " + we + " for sessionid 0x"
+ Long.toHexString(sessionId));
}
eventThread.queueEvent( we );
return;
}
把事件event反序列化出來,構建一個watchedevent物件,然後把這個event扔進eventQueue裡面,通過的是queueEvent函式:
public void queueEvent(WatchedEvent event) {
if (event.getType() == EventType.None
&& sessionState == event.getState()) {
return;
}
sessionState = event.getState();
// materialize the watchers based on the event
WatcherSetEventPair pair = new WatcherSetEventPair(
watcher.materialize(event.getState(), event.getType(),
event.getPath()),
event);
// queue the pair (watch set & event) for later processing
waitingEvents.add(pair);
}
這個函式會從之前的WatchManager中恢復出之前的回撥註冊。然後就會等待eventThread來處理。
EventThread也是一個執行緒,會週期性的處理佇列裡的事件。
public void run() {
try {
isRunning = true;
while (true) {
Object event = waitingEvents.take();
if (event == eventOfDeath) {
wasKilled = true;
} else {
processEvent(event);
}
if (wasKilled)
synchronized (waitingEvents) {
if (waitingEvents.isEmpty()) {
isRunning = false;
break;
}
}
}
調動事件的process函式即可。