zookeeper之事件觸發
阿新 • • 發佈:2020-09-02
前面這麼長的說明,只是為了清洗的說明事件的註冊流程,最終的觸發,還得需要通過事務型操作來完成。
在我們最開始的案例中,通過如下程式碼去完成了事件的觸發。
那接下里,客戶端會收到這個 response,觸發 SendThread.readResponse 方法。
zookeeper.setData(“/mic”, “1”.getByte(),-1) ; //修改節點的值觸發監聽前面的客戶端和服務端對接的流程就不再重複講解了,互動流程是一樣的,唯一的差別在於事件觸發了。 服務端的事件響應 DataTree.setData()
public Stat setData(String path, byte data[], int version, long zxid, long time)throws KeeperException.NoNodeException { Stat s = new Stat(); DataNode n = nodes.get(path); if (n == null) { throw new KeeperException.NoNodeException(); } byte lastdata[] = null; synchronized (n) { lastdata = n.data; n.data = data; n.stat.setMtime(time); n.stat.setMzxid(zxid); n.stat.setVersion(version); n.copyStat(s); } // now update if the path is in a quota subtree. String lastPrefix = getMaxPrefixWithQuota(path); if (lastPrefix != null) { this.updateBytes(lastPrefix, (data == null ? 0 : data.length) - (lastdata == null ? 0 : lastdata.length)); } dataWatches.triggerWatch(path, EventType.NodeDataChanged); // 觸發對應節點的NodeDataChanged事件 return s; }
WatcherManager. triggerWatch
Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) { WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path); // 根據事件型別、連線狀態、節點路徑建立 // WatchedEvent HashSet<Watcher> watchers; synchronized (this) { watchers = watchTable.remove(path); // 從 watcher 表中移除 path,並返回其對應的 // watcher 集合 if (watchers == null || watchers.isEmpty()) { if (LOG.isTraceEnabled()) { ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK, "No watchers for " + path); } return null; } for (Watcher w : watchers) { // 遍歷 watcher 集合 HashSet<String> paths = watch2Paths.get(w); // 根據 watcher 從 // watcher 表中取出路徑集合 if (paths != null) { paths.remove(path); // 移除路徑 } } } for (Watcher w : watchers) { // 遍歷 watcher 集合 if (supress != null && supress.contains(w)) { continue; } w.process(e); // OK,重點又來了,w.process 是做什麼呢? } return watchers; }
w.process(e);
還記得我們在服務端繫結事件的時候,watcher 繫結是是什麼?是 ServerCnxn,所以 w.process(e),其實呼叫的應該是 ServerCnxn 的 process 方法。而servercnxn 又是一個抽象方法,有兩個實現類,分別是:NIOServerCnxn 和NIOServerCnxn。那接下來我們扒開 NIOServerCnxn 這個類的 process 方法看看究竟。public void process(WatchedEvent event) { ReplyHeader h = new ReplyHeader(-1, -1L, 0); if (LOG.isTraceEnabled()) { ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK, "Deliver event " + event + " to 0x" + Long.toHexString(this.sessionId) + " through " + this); } // Convert WatchedEvent to a type that can be sent over the wire WatcherEvent e = event.getWrapper(); try { sendResponse(h, e, "notification"); // look, 這個地方傳送了一個事件,事件物件為 // WatcherEvent。完美 } catch (IOException e1) { if (LOG.isDebugEnabled()) { LOG.debug("Problem sending to " + getRemoteSocketAddress(), e1); } close(); } }