Zookeeper 原始碼(三)Zookeeper 客戶端原始碼
阿新 • • 發佈:2018-11-09
Zookeeper 原始碼(三)Zookeeper 客戶端原始碼
Zookeeper 客戶端由以下幾個核心元件組成:
類 | 說明 |
---|---|
Zookeeper | Zookeeper 客戶端入口 |
ClientWatchManager | 客戶端 Watcher 管理器 |
HostProvider | 客戶端地址列表管理器 |
ClientCnxn | 客戶端核心執行緒,其內部又包含兩個執行緒,即 SendThread 和 EventThread。前者是一個 IO 執行緒,主要負責 ZooKeeper 客戶端和服務端之間的網路通訊;後者是一個事件執行緒,主要負責對服務端事件進行處理。 |
ClientCnxnSocketNetty | 最底層的通訊 netty |
客戶端整體結構如下圖:
一、Zookeeper
客戶端在構造階段建立 ClientCnxn 與服務端連線,後續命令都是通過 ClientCnxn 傳送給服務端。ClientCnxn 是客戶端與服務端通訊的底層介面,它和 ClientCnxnSocketNetty 一起工作提供網路通訊服務。
服務端是 ZookeeperServer 類,收到 ClientCnxn 的請求處理後再通過 ClientCnxn 返回到客戶端。
ClientCnxn 連線時可以同時指定多臺伺服器地址,根據指定的演算法連線一臺伺服器,當某個伺服器發生故障無法連線時,會自動連線到其他的伺服器。實現這一機制的是 StaticHostProvider 類。
(1) 客戶端使用:
ZooKeeper zooKeeper = new ZooKeeper("127.0.0.1", 5000, new Watcher() { @Override public void process(WatchedEvent event) { if (event.getState() == Event.KeeperState.SyncConnected) { if (type == Event.EventType.None) { // 如果連線建立成功才能繼續執行 countDownLatch.countDown(); } } } }); countDownLatch.await(); zooKeeper.create( "/testRoot", // 節點路徑,不允許遞迴建立節點 "testRoot".getBytes(), // 節點內容 ZooDefs.Ids.OPEN_ACL_UNSAFE, // 節點許可權,一般情況下不用關注 CreateMode.PERSISTENT); // 節點型別 }
(2) ZooKeeper 建立
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
long sessionId, byte[] sessionPasswd, boolean canBeReadOnly,
HostProvider aHostProvider) throws IOException {
// 1. watcher 儲存在 ZKWatchManager 的 defaultWatcher 中,作為整個會話的預設 watcher
watchManager = defaultWatchManager();
watchManager.defaultWatcher = watcher;
// 2. 解析 server 獲取 IP 以及 PORT
ConnectStringParser connectStringParser = new ConnectStringParser(
connectString);
hostProvider = aHostProvider;
// 3. 建立 ClientCnxn 例項
cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
hostProvider, sessionTimeout, this, watchManager,
getClientCnxnSocket(), sessionId, sessionPasswd, canBeReadOnly);
cnxn.seenRwServerBefore = true; // since user has provided sessionId
// 4. 啟動 SendThread 和 EventThread 執行緒,這兩個執行緒均為守護執行緒
cnxn.start();
}
建立底層通訊 ClientCnxnSocketNIO 或 ClientCnxnSocketNetty
public static final String ZOOKEEPER_CLIENT_CNXN_SOCKET = "zookeeper.clientCnxnSocket";
private static ClientCnxnSocket getClientCnxnSocket() throws IOException {
String clientCnxnSocketName = System
.getProperty(ZOOKEEPER_CLIENT_CNXN_SOCKET);
if (clientCnxnSocketName == null) {
clientCnxnSocketName = ClientCnxnSocketNIO.class.getName();
}
try {
return (ClientCnxnSocket) Class.forName(clientCnxnSocketName)
.newInstance();
} catch (Exception e) {
IOException ioe = new IOException("Couldn't instantiate "
+ clientCnxnSocketName);
ioe.initCause(e);
throw ioe;
}
}
(3) ClientCnxn 建立
Packet | 所有的請求都會封裝成 packet
outgoingQueue | 即將傳送的請求 packets
pendingQueue | 已經發送等待響應的 packets
ClientCnxn 建立時建立了兩個執行緒 SendThread 和 EventThread,這兩個執行緒都是守護執行緒,主執行緒結束時即關閉執行緒。
public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,
ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket,
long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) {
this.zooKeeper = zooKeeper;
this.watcher = watcher;
this.sessionId = sessionId;
this.sessionPasswd = sessionPasswd;
this.sessionTimeout = sessionTimeout;
this.hostProvider = hostProvider;
this.chrootPath = chrootPath;
connectTimeout = sessionTimeout / hostProvider.size();
readTimeout = sessionTimeout * 2 / 3;
readOnly = canBeReadOnly;
sendThread = new SendThread(clientCnxnSocket);
eventThread = new EventThread();
}
SendThread(ClientCnxnSocket clientCnxnSocket) {
super(makeThreadName("-SendThread()"));
state = States.CONNECTING;
this.clientCnxnSocket = clientCnxnSocket;
setDaemon(true);
}
EventThread() {
super(makeThreadName("-EventThread"));
setDaemon(true);
}