Zookeeper原始碼閱讀(九) ZK Client-Server(1)
前言
Watcher部分的程式碼量總的來說還是比較多的,但是整個邏輯流程還是相對來說比較清晰的。不過還是需要常在腦子裡過一過,zk的watcher的相關的架構的設計還是挺精妙的。
從這一篇起開始說ZK client端-server端互動相關的程式碼,主要是從client本身,client和server的連線和會話以及server端這三個大點來說。這一篇主要說說大致流程和client端的初始化等。
結構
在網上看到了一張圖片非常好的描述了zk工作的大致結構,對理解zk client端以至於整體的程式碼都挺有幫助的,這裡貼出來:
上圖主要描述了ZK Client和Server端互動的過程:
- client端把request傳遞到Zookeeper類中(以Packet形式);
- Zookeeper類處理request並放入outgoingqueue中(sendthread做的);
- sendthread把發出的request移到pendingqueue;
- 收到回覆後,sendthread從pendingqueue中取出request,並生成event;
- eventthread處理event並觸發watchManager中的watcher,呼叫callback。
Client端程式碼結構
其實client端的很多重要的類在之前說watcher,快照和log的時候就已經接觸了很多了,這裡也是系統地總結下。
其中主要幾個類的功能:
Zookeeper:客戶端核心類之一,也是入口;
ClientCnxn:客戶端連線核心類,包含SendThread和EventThread兩個執行緒。SendThread為I/O執行緒,主要負責Zookeeper客戶端和伺服器之間的網路I/O通訊;EventThread為事件執行緒,主要負責對服務端事件進行處理;
ClientWatchManager:客戶端watcher管理器;
HostProvider:客戶端地址列表管理器。
上圖是Zookeeper類及其相關的類的互動UML圖,可以通過上圖來理解下整個Zookeeper各個功能類之間的關係和協作流程。
主要流程
zk client和server端建立連線從client來說主要分為以下三個階段:
- 初始化階段:上面介紹的幾個主要功能類的例項化;
- 建立階段:啟動及建立連線;
- 響應請求:響應及接收。
逐個介紹:
初始化階段
從上圖中能看出,第一步就是從Zookeeper類的例項化開始,我們選取一個Zookeeper類的構造器開始分析:
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
boolean canBeReadOnly)
throws IOException
{
LOG.info("Initiating client connection, connectString=" + connectString
+ " sessionTimeout=" + sessionTimeout + " watcher=" + watcher);
//設定預設watcher,之前講watcher的時候說過
watchManager.defaultWatcher = watcher;
//負責解析配置的server地址串
//主要有兩個功能:1.加chroot(預設prefix,之前有介紹過);2.讀字串並把多個server地址分開
ConnectStringParser connectStringParser = new ConnectStringParser(
connectString);
//根據之前的字串解析hostname,ip等,並不一定會按照原來的順序,在構造器中會將順序打散
HostProvider hostProvider = new StaticHostProvider(
connectStringParser.getServerAddresses());
//例項化clientCnxn物件
cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
hostProvider, sessionTimeout, this, watchManager,
getClientCnxnSocket(), canBeReadOnly);
//啟動sendThread和eventThread
cnxn.start();
}
其實總結下可以看出整個初始化階段分為四步:
為預設Watcher賦值
- 解析並設定Zookeeper伺服器地址列表
- 例項化ClientCnxn物件
啟動clientCnxn物件裡的sendThread和eventThread執行緒。
在這裡講一下構造器中的幾個重要類:
ConnectStringParser
public final class ConnectStringParser {
private static final int DEFAULT_PORT = 2181;//預設port
private final String chrootPath;//預設字首
private final ArrayList<InetSocketAddress> serverAddresses = new ArrayList<InetSocketAddress>();//地址list
ConnectStringParser的構造器很簡單,主要就是解析chrootPath和生成上面的serverAddresses地址列表。
StaticHostProvider
有一張圖很好的形容了StaticHostProvider的工作原理。
在StaticHostProvider類中呼叫next方法會在迴圈佇列中不斷獲取,特別要注意的是這個迴圈佇列本身就已經是打亂過的。
在StaticHostProvider構造器中,把前面ConnectStringParser的server地址會再次解析一遍並生成一個佇列(因為上一步解析的結果有的沒有address),然後就會如下打亂。
Collections.shuffle(this.serverAddresses);
public InetSocketAddress next(long spinDelay) {
//這個部分主要是迴圈
++currentIndex;
if (currentIndex == serverAddresses.size()) {
currentIndex = 0;
}
//如果這一次的server地址和上一次一樣,那麼就睡眠spinDelay時間
if (currentIndex == lastIndex && spinDelay > 0) {
try {
Thread.sleep(spinDelay);
} catch (InterruptedException e) {
LOG.warn("Unexpected exception", e);
}
} else if (lastIndex == -1) {//如果是第一次訪問,就不要等待
// We don't want to sleep on the first ever connect attempt.
lastIndex = 0;
}
return serverAddresses.get(currentIndex);
}
建立階段
其實在sendThread和eventThread兩個執行緒啟動之後,建立和響應階段也就開始了。具體的流程會再後面詳細說,大致的流程是先從hostprovider獲取server地址,然後建立連線並構造請求傳送。
詳細流程:
- 獲取伺服器地址(從hostprovider中可以獲得),並建立TCP連線;
- 構造ConnectRequest請求。前面的TCP連線建立後,client和server的會話並沒有完全建立。SendThread會根據響應的引數構造ConnectRequest,幷包裝成Packet物件放入outgoingqueue中傳送到server端,這就是實際意義上的client和server的一個會話。這部分在之前的watcher傳送時有提到。
- ClientCnxnSocket從queue中取出Packet並序列化部分屬性發送到server。
這裡先把幾個基礎且比較重要的部分說下:
sendThread
功能:
- 維護client和server的心跳連線,一旦失去連線會立即重連;
- 管理了客戶端所有的請求傳送和響應接收操作,其將上層客戶端API操作轉換成相應的請求協議併發送到服務端,並完成對同步呼叫的返回和非同步呼叫的回撥;
- 接受請求的返回並傳遞給eventThread去處理。
上面的圖大致描述了outgoingqueue(客戶端請求等待發送的佇列)和pendingQueue(已經發送等待響應處理的佇列)的關係。
EventThread
EventThread是客戶端ClientCnxn內部的一個事件處理執行緒,負責客戶端的事件處理,並觸發客戶端註冊的Watcher監聽。EventThread中的watingEvents佇列用於臨時存放那些需要被觸發的Object,包括客戶端註冊的Watcher和非同步介面中註冊的回撥器AsyncCallback。同時,EventThread會不斷地從watingEvents中取出Object,識別具體型別(Watcher或AsyncCallback),並分別呼叫process和processResult介面方法來實現對事件的觸發和回撥。
Packet
其實之前就已經看過Packet的一些處理了,最重要的就是Packet序列化的時候createBB方法裡只有部分屬性序列化了,包括watcher在內的很多變數都沒有序列化,這也是watcher輕量特性的保證。
outgoingqueue和pendingqueue之前提到了主要的作用,而他們內部放置的物件都是Packet。在傳送時,sendThread從outgoingqueue取出Packet序列化(帶有生成的請求序號XID在請求頭中)併發送,然後這個Packet就被轉移到pendingqueue中,等待響應處理。
響應階段
同樣的,響應階段的程式碼也比較多,後面具體說,這裡說下大致流程:
- ClientCnxnSocket接收到響應後,首先判斷客戶端狀態是否初始化,若未初始化,那說明當前客戶端與服務端之間正在進行會話建立並反序列化response,生成ConnectResponse(帶有sessionid),然後會通知sendThread和HostProvider進行相應的設定;
- 如果為初始化狀態,且收到的為事件,那麼會反序列化為WatcherEvent,並放到EventThread的等待佇列中;
- 如果是常規的請求,如getdata,exists等,那麼會從pendingQueue中取出一個Packet來處理。
思考
Outgoingqueue, pendingQueue和EventThread的event等待佇列關係:
outgoingqueue就是所有要傳送的客戶端的請求,pendingqueue就是傳送過的等待響應的,如果客戶端收到了server端的回覆,就會從pendingqueue中取出請求Packet並處理;而event等待佇列是為了處理server段主動發起的事件,也就是節點發生了change,server主動傳送請求到客戶端,client把這類的通知放到event等待佇列中。
notification event,非notification event
客戶端需要接受伺服器傳送過來的訊息,第一種訊息是類似於Watcher回掉這種的,我們叫做notification,他的特點是伺服器主動傳送訊息給客戶端的,比如客戶端a在資料節點a上設定了getData監聽,當客戶端b修改了節點a後,伺服器主動傳送NodeDataChanged訊息給客戶端a。第二中訊息是類似於create,getData這種,他們向伺服器傳送對應的請求後,然後將請求放進到pendingQueue中,然後等待伺服器的響應,當接受到伺服器的響應後,再從pendingQueue中取出請求,然後進行回掉。
參考
https://www.cnblogs.com/francisYoung/p/5225703.html 可以多看下理解
http://www.cnblogs.com/leesf456/p/6098255.html
https://www.jianshu.com/p/cbad04b12950
《Paxos到ZK》