1. 程式人生 > 實用技巧 >zookeeper之事件觸發

zookeeper之事件觸發

前面這麼長的說明,只是為了清洗的說明事件的註冊流程,最終的觸發,還得需要通過事務型操作來完成。 在我們最開始的案例中,通過如下程式碼去完成了事件的觸發。
zookeeper.setData(“/mic”, “1”.getByte(),-1) ; //修改節點的值觸發監聽
前面的客戶端和服務端對接的流程就不再重複講解了,互動流程是一樣的,唯一的差別在於事件觸發了。 服務端的事件響應 DataTree.setData()
public Stat setData(String path, byte data[], int version, long zxid, long time)throws KeeperException.NoNodeException {
		Stat s = new Stat();
		DataNode n = nodes.get(path);
		if (n == null) {
			throw new KeeperException.NoNodeException();
		}
		byte lastdata[] = null;
		synchronized (n) {
			lastdata = n.data;
			n.data = data;
			n.stat.setMtime(time);
			n.stat.setMzxid(zxid);
			n.stat.setVersion(version);
			n.copyStat(s);
		}
		// now update if the path is in a quota subtree.
		String lastPrefix = getMaxPrefixWithQuota(path);
		if (lastPrefix != null) {
			this.updateBytes(lastPrefix, (data == null ? 0 : data.length) - (lastdata == null ? 0 : lastdata.length));
		}
		dataWatches.triggerWatch(path, EventType.NodeDataChanged); // 觸發對應節點的NodeDataChanged事件
		return s;
	}

WatcherManager. triggerWatch

Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
		WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path); // 根據事件型別、連線狀態、節點路徑建立
																					// WatchedEvent
		HashSet<Watcher> watchers;
		synchronized (this) {
			watchers = watchTable.remove(path); // 從 watcher 表中移除 path,並返回其對應的
												// watcher 集合
			if (watchers == null || watchers.isEmpty()) {
				if (LOG.isTraceEnabled()) {
					ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK, "No watchers for " + path);
				}
				return null;
			}
			for (Watcher w : watchers) { // 遍歷 watcher 集合
				HashSet<String> paths = watch2Paths.get(w); // 根據 watcher 從
															// watcher 表中取出路徑集合
				if (paths != null) {
					paths.remove(path); // 移除路徑
				}
			}
		}
		for (Watcher w : watchers) { // 遍歷 watcher 集合
			if (supress != null && supress.contains(w)) {
				continue;
			}
			w.process(e); // OK,重點又來了,w.process 是做什麼呢?
		}
		return watchers;
	}

w.process(e);

還記得我們在服務端繫結事件的時候,watcher 繫結是是什麼?是 ServerCnxn,所以 w.process(e),其實呼叫的應該是 ServerCnxn 的 process 方法。而servercnxn 又是一個抽象方法,有兩個實現類,分別是:NIOServerCnxn 和NIOServerCnxn。那接下來我們扒開 NIOServerCnxn 這個類的 process 方法看看究竟。
public void process(WatchedEvent event) {
		ReplyHeader h = new ReplyHeader(-1, -1L, 0);
		if (LOG.isTraceEnabled()) {
			ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK,
					"Deliver event " + event + " to 0x" + Long.toHexString(this.sessionId) + " through " + this);
		}
		// Convert WatchedEvent to a type that can be sent over the wire
		WatcherEvent e = event.getWrapper();
		try {
			sendResponse(h, e, "notification"); // look, 這個地方傳送了一個事件,事件物件為
												// WatcherEvent。完美
		} catch (IOException e1) {
			if (LOG.isDebugEnabled()) {
				LOG.debug("Problem sending to " + getRemoteSocketAddress(), e1);
			}
			close();
		}
	}
那接下里,客戶端會收到這個 response,觸發 SendThread.readResponse 方法。