ZooKeeper原始碼學習筆記(1)--client端解析
前言
ZooKeeper是一個相對簡單的分散式協調服務,通過閱讀原始碼我們能夠更進一步的清楚分散式的原理。
環境
ZooKeeper 3.4.9
入口函式
在bin/zkCli.sh
中,我們看到client端的真實入口其實是一個org.apache.zookeeper.ZooKeeperMain
的Java類
"$JAVA" "-Dzookeeper.log.dir=${ZOO_LOG_DIR}" "-Dzookeeper.root.logger=${ZOO_LOG4J_PROP}" \
-cp "$CLASSPATH" $CLIENT_JVMFLAGS $JVMFLAGS \
org.apache.zookeeper.ZooKeeperMain "[email protected]"
通過原始碼走讀,看到在ZooKeeperMain
中主要由兩部分構成
connectToZK(cl.getOption("server"));
while ((line = (String)readLine.invoke(console, getPrompt())) != null) {
executeLine(line);
}
- 構造一個
ZooKeeper
物件,同ZooKeeperServer進行建立通訊連線 - 通過反射呼叫
jline.ConsoleReader
ZooKeeper
介面。
如上所述,client端其實是對 zookeeper.jar 的簡單封裝,在構造出一個ZooKeeper物件後,通過解析使用者輸入,呼叫 ZooKeeper 介面和 Server 進行互動。
ZooKeeper 類
剛才我們看到 client 端同 ZooKeeper Server 之間的互動其實是通過 ZooKeeper 物件進行的,接下來我們詳細深入到 ZooKeeper 類中,看看其和服務端的互動邏輯。
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
boolean canBeReadOnly)
throws IOException
{
ConnectStringParser connectStringParser = new ConnectStringParser(connectString);
HostProvider hostProvider = new StaticHostProvider( connectStringParser.getServerAddresses());
cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
hostProvider, sessionTimeout, this, watchManager, getClientCnxnSocket(), canBeReadOnly);
cnxn.start();
}
在 ZooKeeper的構造方法中,可以看到 ZooKeeper 中使用 Server 的伺服器地址構建了一個 ClientCnxn
類,在這個類中,系統新建了兩個執行緒
sendThread = new SendThread(clientCnxnSocket);
eventThread = new EventThread();
其中,SendThread
負責將ZooKeeper
的請求資訊封裝成一個Packet
,傳送給 Server ,並維持同Server的心跳,EventThread
負責解析通過通過SendThread
得到的Response
,之後傳送給Watcher::processEvent
進行詳細的事件處理。
如上圖所示,Client中在終端輸入指令後,會被封裝成一個Request
請求,通過submitRequest
,進一步被封裝成Packet
包,提交給SendThread
處理。
SendThread
通過doTransport
將Packet
傳送給Server,並通過readResponse
獲取結果,解析成一個Event
,再將Event
加入EventThread
的佇列中等待執行。
EventThread
通過processEvent
消費佇列中的Event
事件。
SendThread
SendThread
的主要作用除了將Packet
包傳送給Server之外,還負責維持Client和Server之間的心跳,確保 session 存活。
現在讓我們從原始碼出發,看看SendThread
究竟是如何執行的。
SendThread
是一個執行緒類,因此我們進入其run()
方法,看看他的啟動流程。
while (state.isAlive()) {
if (!clientCnxnSocket.isConnected()) {
// 啟動和server的socket連結
startConnect();
}
// 根據上次的連線時間,判斷是否超時
if (state.isConnected()) {
to = readTimeout - clientCnxnSocket.getIdleRecv();
} else {
to = connectTimeout - clientCnxnSocket.getIdleRecv();
}
if (to <= 0) {
throw new SessionTimeoutException(warnInfo);
}
// 傳送心跳包
if (state.isConnected()) {
if (timeToNextPing <= 0 || clientCnxnSocket.getIdleSend() > MAX_SEND_PING_INTERVAL) {
sendPing();
clientCnxnSocket.updateLastSend();
}
}
// 將指令資訊傳送給 Server
clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue, ClientCnxn.this);
}
從上面的程式碼中,可以看出SendThread
的主要任務如下:
1. 建立同 Server 之間的 socket 連結
2. 判斷連結是否超時
3. 定時傳送心跳任務
4. 將ZooKeeper指令傳送給Server
與 Server 的長連結
ZooKeeper
通過獲取ZOOKEEPER_CLIENT_CNXN_SOCKET
變數構造了一個ClientCnxnSocket
物件,預設情況下是ClientCnxnSocketNIO
類
String clientCnxnSocketName = System
.getProperty(ZOOKEEPER_CLIENT_CNXN_SOCKET);
if (clientCnxnSocketName == null) {
clientCnxnSocketName = ClientCnxnSocketNIO.class.getName();
}
在ClientCnxnSocketNIO::connect
中我們可以看到這裡同Server之間建立了一個socket連結。
SocketChannel sock = createSock();
registerAndConnect(sock, addr);
超時與心跳
在SendThread::run
中,可以看到針對連結是否建立分別有readTimeout
和connetTimeout
兩種超時時間,一旦發現連結超時,則丟擲異常,終止 SendThread
。
在沒有超時的情況下,如果判斷距離上次心跳時間超過了1/2個超時時間,會再次傳送心跳資料,避免訪問超時。
傳送 ZooKeeper 指令
在時序圖中,我們看到從終端輸入指令後,我們會將其解析成一個Packet
包,等待SendThread
進行傳送。
以ZooKeeper::create
為例
RequestHeader h = new RequestHeader();
h.setType(ZooDefs.OpCode.create);
CreateRequest request = new CreateRequest();
CreateResponse response = new CreateResponse();
request.setData(data);
request.setFlags(createMode.toFlag());
request.setPath(serverPath);
if (acl != null && acl.size() == 0) {
throw new KeeperException.InvalidACLException();
}
request.setAcl(acl);
ReplyHeader r = cnxn.submitRequest(h, request, response, null);
在這裡create指令,被封裝成了一個 CreateRequest
,通過submitRequest
被轉成了一個Packet
包
public ReplyHeader submitRequest(RequestHeader h, Record request,
Record response, WatchRegistration watchRegistration)
throws InterruptedException {
ReplyHeader r = new ReplyHeader();
Packet packet = queuePacket(h, r, request, response, null, null, null, null, watchRegistration);
synchronized (packet) {
while (!packet.finished) {
packet.wait();
}
}
return r;
}
Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,
Record response, AsyncCallback cb, String clientPath,
String serverPath, Object ctx, WatchRegistration watchRegistration) {
Packet packet = null;
// Note that we do not generate the Xid for the packet yet. It is
// generated later at send-time, by an implementation of ClientCnxnSocket::doIO(),
// where the packet is actually sent.
synchronized (outgoingQueue) {
packet = new Packet(h, r, request, response, watchRegistration);
packet.cb = cb;
packet.ctx = ctx;
packet.clientPath = clientPath;
packet.serverPath = serverPath;
if (!state.isAlive() || closing) {
conLossPacket(packet);
} else {
// If the client is asking to close the session then
// mark as closing
if (h.getType() == OpCode.closeSession) {
closing = true;
}
outgoingQueue.add(packet);
}
}
sendThread.getClientCnxnSocket().wakeupCnxn();
return packet;
}
在submitRequest
中,我們進一步看到Request
被封裝成一個Packet
包,並加入SendThread::outgoingQueue
佇列中,等待執行。
Note:在這裡我們還看到,ZooKeeper方法中所謂的同步方法其實就是在Packet
被提交到SendThread
之後,陷入一個while
迴圈,等待處理完成後再跳出的過程
在SendThread::run
的while
迴圈中,ZooKeeper通過doTransport
將存放在outgoingQueue
中的Packet
包傳送給 Server。
void doIO(List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue, ClientCnxn cnxn) {
if (sockKey.isReadable()) {
// 讀取response資訊
sendThread.readResponse(incomingBuffer);
}
if (sockKey.isWritable()) {
Packet p = findSendablePacket(outgoingQueue, cnxn.sendThread.clientTunneledAuthenticationInProgress());
sock.write(p.bb);
}
}
在doIO
傳送socket資訊之前,先從socket中獲取返回資料,通過readResonse
進行處理。
void readResponse(ByteBuffer incomingBuffer) throws IOException {
ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
ReplyHeader replyHdr = new ReplyHeader();
replyHdr.deserialize(bbia, "header");
if (replyHdr.getXid() == -1) {
WatcherEvent event = new WatcherEvent();
event.deserialize(bbia, "response");
WatchedEvent we = new WatchedEvent(event);
eventThread.queueEvent( we );
}
}
在readReponse
中,通過解析資料,我們可以得到WatchedEvent
物件,並將其壓入EventThread
的訊息佇列,等待分發
EventThread
public void run() {
while (true) {
Object event = waitingEvents.take();
if (event == eventOfDeath) {
wasKilled = true;
} else {
processEvent(event);
}
}
在EventThread
中通過processEvent
對佇列中的事件進行消費,並分發給不同的Watcher
watch事件註冊和分發
通常在ZooKeeper中,我們會為指定節點新增一個Watcher
,用於監聽節點變化情況,以ZooKeeper:exist
為例
// the watch contains the un-chroot path
WatchRegistration wcb = null;
if (watcher != null) {
wcb = new ExistsWatchRegistration(watcher, clientPath);
}
final String serverPath = prependChroot(clientPath);
RequestHeader h = new RequestHeader();
h.setType(ZooDefs.OpCode.exists);
ExistsRequest request = new ExistsRequest();
request.setPath(serverPath);
request.setWatch(watcher != null);
SetDataResponse response = new SetDataResponse();
ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
程式碼的大致邏輯和create
類似,但是對wathcer做了一層ExistWatchRegistration
的包裝,當packet
物件完成請求之後,呼叫register
方法,根據不同包裝的WatchRegistration
將watch註冊到不同watch列表中,等待回撥。
if (p.watchRegistration != null) {
p.watchRegistration.register(p.replyHeader.getErr());
}
在 ZooKeeper 中一共有三種類型的WatchRegistration
,分別對應DataWatchRegistration
,ChildWatchRegistration
,ExistWatchRegistration
。 並在ZKWatchManager
類中根據每種型別的WatchRegistration
,分別有一張map表負責存放。
private final Map<String, Set<Watcher>> dataWatches =
new HashMap<String, Set<Watcher>>();
private final Map<String, Set<Watcher>> existWatches =
new HashMap<String, Set<Watcher>>();
private final Map<String, Set<Watcher>> childWatches =
new HashMap<String, Set<Watcher>>();
當 EventThread::processEvent
時,根據event
的所屬路徑,從三張map中獲取對應的watch列表進行訊息通知及處理。
總結
client 端的原始碼分析就到此為止了。
ZooKeeper Client 的原始碼很簡單,擁有三個獨立執行緒分別對命令進行處理,分發和響應操作,在保證各個執行緒相互獨立的基礎上,儘可能避免了多執行緒操作中出現鎖的情況。