ZooKeeper客戶端與服務端的事件watcher原始碼閱讀
我是懷著無比激動的心情寫這篇部落格的,如果對您有幫助,歡迎給我點個贊
watcher存在的必要性
舉個特容易懂的例子: 假如我的專案是基於dubbo+zookeeper搭建的分散式專案, 我有三個功能相同的服務提供者,用zookeeper當成註冊中心,我的三個專案得註冊進zookeeper才能對外暴露服務,但是問題來了,寫的java程式碼怎麼才能註冊進zookeeper呢?當然加入依賴,寫好配置檔案再啟動就成了,這時,這三個服務體提供者就是zookeeper的客戶端了,zookeeper的客戶端不止一個,我選擇了哪個依賴,就是哪個客戶端,光有服務提供者不成啊,對外提供服務,我得需要服務消費者啊,於是用同樣的方式,把消費者也註冊進zookeeper,zookeeper中就存在了4個node,也就是4個客戶端,服務消費者訂閱zookeeper,向它拉取服務提供者的address,然後把地址快取在本地, 進而可以遠端呼叫服務消費者,那麼問題又來了,萬一哪一臺服務提供者掛了,怎麼辦呢?zookeeper是不是得通知消費者呢? 萬一哪一天服務提供者的address變了,是不是也得通知消費者? 這就是watcher存在的意義,它解決了這件事
實驗場景:
假設我們已經成功啟動了zookeeper的服務端和客戶端,並且預先添加了watcher,然後使用控制檯動態的修改下node的data,我們會發現watcher回撥的現象
新增的鉤子函式程式碼如下:
public class ZookepperClientTest { public static void main(String[] args) throws Exception { ZooKeeper client = new ZooKeeper("localhost", 5000, new Watcher() { @Override public void process(WatchedEvent event) { System.err.println("連線,觸發"); } }); Stat stat = new Stat(); // todo 下面新增的事件監聽器可是實現事件的消費訂閱 String content = new String(client.getData("/node1", new Watcher() { @Override public void process(WatchedEvent event) { // todo 任何連線上這個節點的客戶端修改了這個節點的 data資料,都會引起process函式的回撥 // todo 特點1: watch只能使用1次 if (event.getType().equals(Event.EventType.NodeDataChanged)){ System.err.println("當前節點資料發生了改變"); } } }, stat));
看如上的程式碼, 添加了一個自己的watcher
也就是client.getData("/node1", new Watcher() {}
這是個回撥的鉤子函式,執行時不會執行,當滿足的某個條件時才會執行, 比如: node1被刪除了, node1的data被修改了
getData做了哪些事情?
原始碼如下: getdata,顧名思義,返回服務端的node的data+stat, 當然是當服務端的node發生了變化後呼叫的
主要主流如下幾個工作
- 建立
WatchRegistration wcb= new DataWatchRegistration(watcher, clientPath);
- 其實就是一個簡單的內部類,將path 和 watch 封裝進了一個物件
- 建立一個request,並且初始化這個
request.head=getData=4
- 呼叫ClientCnxn.submitRequest(...) , 將現存的這些資訊進一步封裝
- request.setWatch(watcher != null);說明他並沒有將watcher封裝進去,而是僅僅做了個有沒有watcher的標記
public byte[] getData(final String path, Watcher watcher, Stat stat)
throws KeeperException, InterruptedException
{
// todo 校驗path
final String clientPath = path;
PathUtils.validatePath(clientPath);
// the watch contains the un-chroot path
WatchRegistration wcb = null;
if (watcher != null) {
// todo DataWatchRegistration 繼承了 WatchRegistration
// todo DataWatchRegistration 其實就是一個簡單的內部類,將path 和 watch 封裝進了一個物件
wcb = new DataWatchRegistration(watcher, clientPath);
}
final String serverPath = prependChroot(clientPath);
// todo 建立一個請求頭
RequestHeader h = new RequestHeader();
h.setType(ZooDefs.OpCode.getData);
// todo 建立了一個GetDataRequest
GetDataRequest request = new GetDataRequest();
// todo 給這個請求初始化,path 是傳遞進來的path,但是 watcher不是!!! 如果我們給定了watcher , 這裡面的條件就是 true
request.setPath(serverPath);
request.setWatch(watcher != null); // todo 可看看看服務端接收到請求是怎麼辦的
GetDataResponse response = new GetDataResponse();
// todo 同樣由 clientCnxn 上下文進行提交請求, 這個操作應該同樣是阻塞的
// todo EventThread 和 SendThread 同時使用一份 clientCnxn的 submitRequest()
ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
if (r.getErr() != 0) {
throw KeeperException.create(KeeperException.Code.get(r.getErr()),
clientPath);
}
if (stat != null) {
DataTree.copyStat(response.getStat(), stat);
}
return response.getData();
}
ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
的原始碼我解除安裝下面, 這裡來到這個方法中,一眼能看到,它依然是阻塞的式的,並且requet被進一步封裝進packet
更重要的是 queuePacket()
方法的最後一個引數,存在我們剛剛建立的path+watcher的封裝類
public ReplyHeader submitRequest(RequestHeader h, Record request, Record response, WatchRegistration watchRegistration)
throws InterruptedException {
ReplyHeader r = new ReplyHeader();
// todo 來到這個 queuePacket() 方法在下面, 這個方法就是將 使用者輸入-> string ->>> request ->>> packet 的過程
Packet packet = queuePacket(h, r, request, response, null, null, null,
null, watchRegistration);
// todo 使用同步程式碼塊,在下面的進行 同步阻塞等待, 直到有了Response響應才會跳出這個迴圈, 這個finished狀態就是在客戶端接受到服務端的
// todo 的響應後, 將服務端的響應解析出來,然後放置到 pendingqueue裡時,設定上去的
synchronized (packet) {
while (!packet.finished) {
// todo 這個等待是需要喚醒的
packet.wait();
}
}
// todo 直到上面的程式碼塊被喚醒,才會這個方法才會返回
return r;
}
同樣,在queuePacket()方法中將packet提交到outgoingQueue中,最終被seadThread消費傳送到服務端
服務端如何處理watchRegistration不為空的packet
後續我準備用一整篇部落格詳解單機模式下服務端處理請求的流程,所以這篇部落格只說結論
在服務端,使用者的請求最終會按順序流向三個Processor
,分別是
- PrepRequestProcessor
- 負責進行一些狀態的修改
- SyncRequestProcessor
- 將事務日誌同步到磁碟
- FinalRequestProcessor
- 相應使用者的請求
我們直接去看FinalRequestProcessor
的public void processRequest(Request request) {}方法
,看他針對getData()
方式的請求做出了哪些動作.下面來了個小高潮,zks.getZKDatabase().getData(getDataRequest.getPath(), stat, getDataRequest.getWatch() ? cnxn : null);
跟進watcher的有無給服務端新增不同的Watcher
真的得劃重點了,當我發現這一點時,我的心情是超級激動的,就像發現了新大陸一樣
case OpCode.getData: {
lastOp = "GETD";
GetDataRequest getDataRequest = new GetDataRequest();
ByteBufferInputStream.byteBuffer2Record(request.request,
getDataRequest);
DataNode n = zks.getZKDatabase().getNode(getDataRequest.getPath());
if (n == null) {
throw new KeeperException.NoNodeException();
}
PrepRequestProcessor.checkACL(zks, zks.getZKDatabase().aclForNode(n),
ZooDefs.Perms.READ,
request.authInfo);
Stat stat = new Stat();
// todo 這裡的操作 getDataRequest.getWatch() ? cnxn : null 對應可客戶端的 跟進watcher有沒有而決定往服務端傳遞 true 還是false 相關
// todo 跟進去 getData()
byte b[] = zks.getZKDatabase().getData(getDataRequest.getPath(), stat,
getDataRequest.getWatch() ? cnxn : null);
//todo cnxn的Processor()被回撥, 往客戶端傳送資料 , 什麼時候觸發呢? 就是上面的 處理事務時的回撥 第127行
// todo 構建了一個 rsp ,在本類的最後面將rsp 響應給client
rsp = new GetDataResponse(b, stat);
break;
}
繼續跟進這個getData()
在服務端維護了一份path+watcher的map
public byte[] getData(String path, Stat stat, Watcher watcher)
throws KeeperException.NoNodeException {
DataNode n = nodes.get(path);
if (n == null) {
throw new KeeperException.NoNodeException();
}
synchronized (n) {
n.copyStat(stat);
if (watcher != null) {
// todo 將path 和 watcher 繫結在一起
dataWatches.addWatch(path, watcher);
}
return n.data;
}
}
客戶端開啟命令列,修改服務端node的狀態
書接上回,當客戶單的程式碼去建立ClientCnxn
時,有下面的邏輯 , 它開啟了兩條守護執行緒, sendThread負責向服務端傳送心跳,已經和服務端進行使用者相關的IO交流, EventThread就負責和txn事務相關的處理邏輯,級別上升到針對node
// todo start就是啟動了在構造方法中建立的執行緒
public void start() {
sendThread.start();
eventThread.start();
}
到目前為止,客戶端就有如下三條執行緒了
- 負責處理使用者在控制檯輸入命令的主執行緒
- 守護執行緒1: seadThread
- 守護執行緒2: eventThread
跟進主執行緒的處理使用者輸入部分的邏輯程式碼如下:
下面的程式碼的主要邏輯就是處理使用者輸入的命令,當通過if-else選擇分支判斷使用者到底輸入的啥命令
按照我們的假定的場景,使用者輸入的命令是這樣的 set /path newValue
所以,毫無疑問,經過解析後代碼會去執行下面的stat = zk.setData(path, args[2].getBytes(),
部分
// todo zookeeper客戶端, 處理使用者輸入命令的具體邏輯
// todo 用大白話講,下面其實就是把 從控制檯獲取的使用者的輸入資訊轉換成指定的字元, 然後傳送到服務端
// todo MyCommandOptions 是處理命令列選項和shell指令碼的工具類
protected boolean processZKCmd(MyCommandOptions co) throws KeeperException, IOException, InterruptedException {
// todo 在這個方法中可以看到很多的命令列所支援的命令
Stat stat = new Stat();
// todo 獲取命令列輸入中 0 1 2 3 ... 位置的內容, 比如 0 位置是命令 1 2 3 位置可能就是不同的引數
String[] args = co.getArgArray();
String cmd = co.getCommand();
if (args.length < 1) {
usage();
return false;
}
if (!commandMap.containsKey(cmd)) {
usage();
return false;
}
boolean watch = args.length > 2;
String path = null;
List<ACL> acl = Ids.OPEN_ACL_UNSAFE;
LOG.debug("Processing " + cmd);
if (cmd.equals("quit")) {
System.out.println("Quitting...");
zk.close();
System.exit(0);
} else if (cmd.equals("set") && args.length >= 3) {
path = args[1];
stat = zk.setData(path, args[2].getBytes(),
args.length > 3 ? Integer.parseInt(args[3]) : -1);
printStat(stat);
繼續跟進stat = zk.setData(path, args[2].getBytes(),
下面的邏輯也很簡單,就是將使用者的輸入封裝進來request中,通過ClientCnxn
類的submit方法提交到一個佇列中,等待著sendThread去消費
這次有目的的看一下submitRequest的最後一個引數為null, 這個引數是WatchRegistration的位置,一開始置為null
public Stat setData(final String path, byte data[], int version)
throws KeeperException, InterruptedException
{
final String clientPath = path;
PathUtils.validatePath(clientPath);
final String serverPath = prependChroot(clientPath);
RequestHeader h = new RequestHeader();
h.setType(ZooDefs.OpCode.setData);
SetDataRequest request = new SetDataRequest();
request.setPath(serverPath);
request.setData(data);
request.setVersion(version);
SetDataResponse response = new SetDataResponse();
ReplyHeader r = cnxn.submitRequest(h, request, response, null);
if (r.getErr() != 0) {
throw KeeperException.create(KeeperException.Code.get(r.getErr()),
clientPath);
}
return response.getStat();
}
跟進這個submitRequest()方法, 原始碼如下,不處所料的是,它同樣被阻塞住了,直到服務端給了它響應
當前程式碼的主要邏輯就是將request封裝進packet,然後將packet新增到ClintCnxn維護的outgoingQueue佇列中等待sendThread的消費
這次來到這個方法是因為我們在控制檯輸入的set 命令而觸發的,比較重要的是本次packet攜帶的WatchRegistration==null, 毫無疑問,這次服務端在FinalRequestProcessor中再處理時取出的watcher==null, 也就不會將path+watcher儲存進maptable中
重要:傳送事務訊息
在FinalRequestProcessor
的public void processRequest(Request request) {}
方法中,有如下程式碼
//todo 請求頭不為空
if (request.hdr != null) {
// 獲取請求頭
TxnHeader hdr = request.hdr;
// 獲取事務
Record txn = request.txn;
// todo 跟進這個方法-----<--!!!!!!-----處理事務的邏輯,在這裡面有向客戶端傳送事件的邏輯, 回撥客戶端的watcher----!!!!!!-->
rc = zks.processTxn(hdr, txn);
}
繼續跟進去
// todo 處理事物日誌
public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) {
ProcessTxnResult rc;
int opCode = hdr.getType();
long sessionId = hdr.getClientId();
// todo 繼續跟進去!!!!!!!!!
// todo 跟進 processTxn(hdr, txn)
rc = getZKDatabase().processTxn(hdr, txn);
跟進ZkDatabase.java
中的processTxn(hdr, txn)方法
public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) {
// todo 跟進 processTxn
return dataTree.processTxn(hdr, txn);
}
跟進到DataTree.java
public ProcessTxnResult processTxn(TxnHeader header, Record txn)
{
ProcessTxnResult rc = new ProcessTxnResult();
try {
rc.clientId = header.getClientId();
rc.cxid = header.getCxid();
rc.zxid = header.getZxid();
rc.type = header.getType();
rc.err = 0;
rc.multiResult = null;
switch (header.getType()) { // todo 根據客客戶端傳送過來的type進行switch,
case OpCode.create:
CreateTxn createTxn = (CreateTxn) txn;
rc.path = createTxn.getPath();
// todo 跟進這個建立節點的方法
createNode(
createTxn.getPath(),
根據請求頭的值,進而判斷出走到那個switch的分支,當前我們在控制檯觸發,進入到setData分支如下:跟進這個方法中可以看到它主要做了如下幾件事
- 使用傳遞進來的新值替代舊data
dataWatches.triggerWatch(path, EventType.NodeDataChanged);
觸發指定的事件watch,什麼事件呢? NodeDataChange, 觸發了哪個watcher呢? 跟進去檢視
//todo setData
public Stat setData(String path, byte data[], int version, long zxid,
long time) throws KeeperException.NoNodeException {
Stat s = new Stat();
DataNode n = nodes.get(path);
if (n == null) {
throw new KeeperException.NoNodeException();
}
byte lastdata[] = null;
synchronized (n) {
// todo 修改記憶體的資料
lastdata = n.data;
n.data = data;
n.stat.setMtime(time);
n.stat.setMzxid(zxid);
n.stat.setVersion(version);
n.copyStat(s);
}
// now update if the path is in a quota subtree.
String lastPrefix;
if((lastPrefix = getMaxPrefixWithQuota(path)) != null) {
this.updateBytes(lastPrefix, (data == null ? 0 : data.length)
- (lastdata == null ? 0 : lastdata.length));
}
// todo 終於 看到了 服務端 關於觸發NodeDataChanged的事件
dataWatches.triggerWatch(path, EventType.NodeDataChanged);
return s;
}
跟進去dataWatches.triggerWatch(path, EventType.NodeDataChanged);
,原始碼如下, 主要的邏輯就是取出存放在服務端的watch,然後逐個回撥他們的processor函式,問題來了,到底是哪些watcher呢? 其實就是我們在客戶端啟動時新增getData()時存進去的wather,也就是ServerCnxn
// todo 跟進去服務端的 觸發事件, 但是吧, 很納悶. 就是沒有往客戶端傳送資料的邏輯
public Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
WatchedEvent e = new WatchedEvent(type,
KeeperState.SyncConnected, path);
HashSet<Watcher> watchers;
synchronized (this) {
watchers = watchTable.remove(path);
if (watchers == null || watchers.isEmpty()) {
if (LOG.isTraceEnabled()) {
ZooTrace.logTraceMessage(LOG,
ZooTrace.EVENT_DELIVERY_TRACE_MASK,
"No watchers for " + path);
}
return null;
}
for (Watcher w : watchers) {
HashSet<String> paths = watch2Paths.get(w);
if (paths != null) {
paths.remove(path);
}
}
}
for (Watcher w : watchers) {
if (supress != null && supress.contains(w)) {
continue;
}
// todo 繼續跟進去, 看它如何回撥的
w.process(e);
}
return watchers;
}
懷著激動的心情去看看ServerCnxn的process()方法做了什麼事?
來到ServerCnxn的實現類NIOServerCnxn, 確實很激動,看到了服務端在往客戶端傳送事務型訊息, 並且new ReplyHeader(-1, -1L, 0)第一個位置上的引數是-1, 這一點很重要,因為客戶端在接受到這個xid=-1的標記後,就會將這條響應交給EventThread處理
@Override
synchronized public void process(WatchedEvent event) {
ReplyHeader h = new ReplyHeader(-1, -1L, 0);
if (LOG.isTraceEnabled()) {
ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK,
"Deliver event " + event + " to 0x"
+ Long.toHexString(this.sessionId)
+ " through " + this);
}
// Convert WatchedEvent to a type that can be sent over the wire
WatcherEvent e = event.getWrapper();
// todo 往服務端傳送了 e event型別訊息
sendResponse(h, e, "notification");
}
處理回調回調watch使用的響應
進入到SendThread
的讀就緒原始碼部分,如下: 它根據header.xid=-1就知道了這是事務型別的響應
// todo 服務端丟擲來的事件, 客戶端將把他存在EventThread的 watingEvents 佇列中
// todo 它的實現邏輯也是這樣, 會有另外一個執行緒不斷的消費這個佇列
if (replyHdr.getXid() == -1) {
// -1 means notification
if (LOG.isDebugEnabled()) {
LOG.debug("Got notification sessionid:0x"
+ Long.toHexString(sessionId));
}
// todo 建立watcherEvent 並將服務端傳送回來的資料,反序列化進這個物件中
WatcherEvent event = new WatcherEvent();
event.deserialize(bbia, "response");
// convert from a server path to a client path
// todo 將server path 反轉成 client path
if (chrootPath != null) {
String serverPath = event.getPath();
if (serverPath.compareTo(chrootPath) == 0)
event.setPath("/");
else if (serverPath.length() > chrootPath.length())
event.setPath(serverPath.substring(chrootPath.length()));
else {
LOG.warn("Got server path " + event.getPath()
+ " which is too short for chroot path "
+ chrootPath);
}
WatchedEvent we = new WatchedEvent(event);
if (LOG.isDebugEnabled()) {
LOG.debug("Got " + we + " for sessionid 0x"
+ Long.toHexString(sessionId));
}
//todo 跟進去
eventThread.queueEvent(we);
return;
}
}
在這個方法的最後,將這個相應新增進EventThread
消費的佇列中,跟進 eventThread.queueEvent(we);
// todo
public void queueEvent(WatchedEvent event) {
// todo 如果事件的型別是 none, 或者sessionState = 直接返回
/**
* todo 事件的型別被設計成 watcher 介面的列舉
* None (-1),
* NodeCreated (1),
* NodeDeleted (2),
* NodeDataChanged (3),
* NodeChildrenChanged (4);
*/
if (event.getType() == EventType.None
&& sessionState == event.getState()) {
return;
}
sessionState = event.getState();
// materialize the watchers based on the event
// todo 根據事件的具體型別,將觀察者具體化, 跟進去
// todo 這個類是ClientCnxn的輔助類,作用就是將watcher 和它觀察的事件封裝在一起
WatcherSetEventPair pair = new WatcherSetEventPair(
//todo 跟進這個 materialize方法. 其實就是從map中取出了和當前client關聯的全部 watcher set
watcher.materialize(event.getState(), event.getType(),
event.getPath()),
event);
// queue the pair (watch set & event) for later processing
// todo 將watch集合 和 event 進行排隊(按順序新增到佇列裡了), 以便後續處理 , 怎麼處理呢? 就在EventThread的run迴圈中消費
// todo watingEvent ==> LinkedBlockingQueue<Object>
waitingEvents.add(pair);
}
上面的程式碼主要做了如下幾件事:
- 從map中取出和當前事件相關的全部watcher
- 將watcher set 新增進 waitingEvents佇列中,等待EventThead的消費
跟進 watcher.materialize(event.getState(), event.getType(),
會追到下面的程式碼
case NodeDataChanged: // todo node中的data改變和 nodeCreate 都會來到下面的分支
case NodeCreated:
synchronized (dataWatches) {
// todo dataWatches 就是剛才存放 path : watcher 的map
// todo dataWatches.remove(clientPath) 移除並返回clientPath對應的watcher , 放入 result 中
addTo(dataWatches.remove(clientPath), result);
}
上面的dataWatches 就是儲存path+watcher set的map, 上面的操作是移除並返回指定的watcher,這也說明了,為什麼zk原生客戶端新增的watcher僅僅會回撥一次
EventThread是如何消費waitingEvents的
EventThread是一條守護執行緒, 因此它擁有自己的不斷在執行的run方法,它就是在這個run方法中對這個佇列進行消費的
@Override
public void run() {
try {
isRunning = true;
// todo 同樣是無限的迴圈
while (true) {
// todo 從watingEvnets 中取出一個 WatcherSetEventPair
Object event = waitingEvents.take();
if (event == eventOfDeath) {
wasKilled = true;
} else {
// todo 本類方法,處理這個事件,繼續進入,方法就在下面
processEvent(event);
}
if (wasKilled)
synchronized (waitingEvents) {
if (waitingEvents.isEmpty()) {
isRunning = false;
break;
}
}
}
} catc
繼續跟進它的processEvent(event)
,最終會在這個方法中呼叫下面的程式碼,這裡的watcher就是我在本篇部落格的開始位置新增進去的watcher,至此打完收工
watcher.process(pair.event);
我是懷著無比激動的心情寫這篇部落格的,如果對您有幫助,歡迎給我點個贊