Zookeeper客戶端原始碼分析
1. 從ZooKeeper構造方法開始:
public ZooKeeper(String connectString, intsessionTimeout, Watcher watcher,
long sessionId, byte[]sessionPasswd, boolean canBeReadOnly)
throws IOException
{
LOG.info("Initiating clientconnection, connectString=" + connectString
+ " sessionTimeout="+ sessionTimeout
+ " watcher=" +watcher
+ " sessionId=" +Long.toHexString(sessionId)
+ " sessionPasswd="
+ (sessionPasswd == null ?"<null>" : "<hidden>"));
watchManager.defaultWatcher = watcher;
ConnectStringParser connectStringParser= new ConnectStringParser(
connectString);
HostProvider hostProvider = newStaticHostProvider(
connectStringParser.getServerAddresses());
cnxn = newClientCnxn(connectStringParser.getChrootPath(),
hostProvider, sessionTimeout, this, watchManager,
getClientCnxnSocket(), sessionId, sessionPasswd, canBeReadOnly);
cnxn.seenRwServerBefore = true; //since user has provided sessionId
cnxn.start();
}
--構造方法中主要作了4件事:
(1)把引數中的watcher物件加入了ZooKeeper物件的成員watchManager中,關於ZKWatchManager這個類,實際上主要功能是承擔watcher管理容器的作用,從它的成員變數可看出
private final Map<String,Set<Watcher>> dataWatches =
new HashMap<String, Set<Watcher>>();
private final Map<String, Set<Watcher>> existWatches =
new HashMap<String, Set<Watcher>>();
private final Map<String, Set<Watcher>> childWatches =
new HashMap<String, Set<Watcher>>();
private volatile Watcher defaultWatcher;
裡面包含了三個map容器,分別儲存data,exist,child型別的watcher,還有一個單獨的defaultWatcher,它們的區別是前三者是一次性。後者是在zookeeper物件生命週期內都生效,並且是在構造方法時傳入。
(2)建立一個ClientCnxn物件,這個物件承擔了客戶端的職能,看看裡面的構造方法原始碼
this.zooKeeper = zooKeeper;
this.watcher = watcher;
this.sessionId = sessionId;
this.sessionPasswd = sessionPasswd;
this.sessionTimeout = sessionTimeout;
this.hostProvider = hostProvider;
this.chrootPath = chrootPath;
connectTimeout = sessionTimeout / hostProvider.size();
readTimeout = sessionTimeout * 2 / 3;
readOnly = canBeReadOnly;
sendThread = new SendThread(clientCnxnSocket);
eventThread = new EventThread();
--可見裡面先是根據使用者傳來的引數賦值給自己的各個成員,這些都是比較重要的配置項,還有new出了兩個非常重要的執行緒sendThread和eventThread,前者負責跟服務端之間的io互動(也就是傳送請求和接受回覆),後者是一個輪詢執行緒,用來回調watcher的處理方法。
(3)在構造方法中有個重要的引數getClientCnxnSocket(),返回的是一個ClientCnxnSocket物件,這個物件非常關鍵,是底層的和服務端進行網路io互動的類物件,看看它的原始碼
String clientCnxnSocketName = System
.getProperty(ZOOKEEPER_CLIENT_CNXN_SOCKET);
if (clientCnxnSocketName == null) {
clientCnxnSocketName = ClientCnxnSocketNIO.class.getName();
}
try {
return (ClientCnxnSocket)Class.forName(clientCnxnSocketName)
.newInstance();
} catch (Exception e) {
IOException ioe = new IOException("Couldn't instantiate "
+ clientCnxnSocketName);
ioe.initCause(e);
throw ioe;
}
--看紅色部分,這是一個比較少見的利用Class物件呼叫構造方法建立類物件的用法,為啥能這樣做呢?因為Class物件是這個類所有方法,成員等資訊的入口,有了它就能訪問這個類的相關方法或成員資訊。為什麼不直接使用ClientCnxnSocket類的構造方法而多此一舉呢?這是為了執行時能根據需要來動態地載入相關類,構建該類的物件。在這裡實際建立的是ClientCnxnSocket的子類ClientCnxnSocketNIO物件,這種套路有個高大上的名字叫“反射”。
來看看ClientCnxnSocketNIO的構造方法都幹了什麼事
private final Selectorselector = Selector.open();
private SelectionKey sockKey;
ClientCnxnSocketNIO() throws IOException {
super();
}
--為什麼除了構造方法裡面的程式碼還把它前面的兩行成員變數的定義貼出來呢?因為真正完整的構造方法是包括非靜態成員的初始化動作的。所以這一步最關鍵的動作就是紅色部分,
最後創建出來的這個ClientCnxnSocketNIO物件被SendThread的構造方法當實參使用了,看看裡面幹了什麼
SendThread(ClientCnxnSocket clientCnxnSocket) {
super(makeThreadName("-SendThread()"));
state = States.CONNECTING;
this.clientCnxnSocket = clientCnxnSocket;
setDaemon(true);
}
--其實也沒幹了什麼,只是把clientCnxnSocket賦值給了SendThread類的clientCnxnSocket成員,也就是這個跟服務端互動的網路io類最終是被SendThread拿來線上程中使用了。
(4)啟動上一步創建出來的sendThread和eventThread執行緒,原始碼很簡單
sendThread.start();
eventThread.start();
--到此為止,好像zookeeper的構造方法分析就結束了
2.sendThread執行緒啟動後幹了什麼
public void run() {
clientCnxnSocket.introduce(this,sessionId);
clientCnxnSocket.updateNow();
clientCnxnSocket.updateLastSendAndHeard();
int to;
long lastPingRwServer = System.currentTimeMillis();
final int MAX_SEND_PING_INTERVAL = 10000; //10 seconds
while (state.isAlive()) {
try {
if(!clientCnxnSocket.isConnected()) {
if(!isFirstConnect){
try {
Thread.sleep(r.nextInt(1000));
} catch(InterruptedException e) {
LOG.warn("Unexpected exception", e);
}
}
// don't re-establishconnection if we are closing
if (closing ||!state.isAlive()) {
break;
}
startConnect();
clientCnxnSocket.updateLastSendAndHeard();
}
if (state.isConnected()) {
// determine whether weneed to send an AuthFailed event.
if (zooKeeperSaslClient!= null) {
booleansendAuthEvent = false;
if(zooKeeperSaslClient.getSaslState() == ZooKeeperSaslClient.SaslState.INITIAL) {
try {
zooKeeperSaslClient.initialize(ClientCnxn.this);
} catch (SaslException e) {
LOG.error("SASL authentication with Zookeeper Quorum member failed:" + e);
state =States.AUTH_FAILED;
sendAuthEvent= true;
}
}
KeeperStateauthState = zooKeeperSaslClient.getKeeperState();
if (authState !=null) {
if (authState== KeeperState.AuthFailed) {
// Anauthentication error occurred during authentication with the Zookeeper Server.
state =States.AUTH_FAILED;
sendAuthEvent = true;
} else {
if(authState == KeeperState.SaslAuthenticated) {
sendAuthEvent = true;
}
}
}
if (sendAuthEvent== true) {
eventThread.queueEvent(new WatchedEvent(
Watcher.Event.EventType.None,
authState,null));
}
}
to = readTimeout -clientCnxnSocket.getIdleRecv();
} else {
to = connectTimeout -clientCnxnSocket.getIdleRecv();
}
if (to <= 0) {
String warnInfo;
warnInfo = "Clientsession timed out, have not heard from server in "
+clientCnxnSocket.getIdleRecv()
+ "ms"
+ " forsessionid 0x"
+Long.toHexString(sessionId);
LOG.warn(warnInfo);
throw newSessionTimeoutException(warnInfo);
}
if (state.isConnected()) {
//1000(1 second) is to prevent race condition missing tosend the second ping
//also make sure not to send too many pings when readTimeoutis small
int timeToNextPing =readTimeout / 2 - clientCnxnSocket.getIdleSend() -
((clientCnxnSocket.getIdleSend() > 1000) ? 1000: 0);
//send a ping requesteither time is due or no packet sent out within MAX_SEND_PING_INTERVAL
if (timeToNextPing<= 0 || clientCnxnSocket.getIdleSend() > MAX_SEND_PING_INTERVAL) {
sendPing();
clientCnxnSocket.updateLastSend();
} else {
if (timeToNextPing< to) {
to =timeToNextPing;
}
}
}
// If we are in read-onlymode, seek for read/write server
if (state ==States.CONNECTEDREADONLY) {
long now = System.currentTimeMillis();
int idlePingRwServer =(int) (now - lastPingRwServer);
if (idlePingRwServer>= pingRwTimeout) {
lastPingRwServer =now;
idlePingRwServer =0;
pingRwTimeout =
Math.min(2*pingRwTimeout, maxPingRwTimeout);
pingRwServer();
}
to = Math.min(to,pingRwTimeout - idlePingRwServer);
}
clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue,ClientCnxn.this);
} catch (Throwable e) {
if (closing) {
if (LOG.isDebugEnabled()){
// closing so thisis expected
LOG.debug("Anexception was thrown while closing send thread for session 0x"
+Long.toHexString(getSessionId())
+ " :" + e.getMessage());
}
break;
} else {
// this is ugly, youhave a better way speak up
if (e instanceofSessionExpiredException) {
LOG.info(e.getMessage() + ", closing socket connection");
} else if (e instanceofSessionTimeoutException) {
LOG.info(e.getMessage() + RETRY_CONN_MSG);
} else if (e instanceofEndOfStreamException) {
LOG.info(e.getMessage() + RETRY_CONN_MSG);
} else if (e instanceofRWServerFoundException) {
LOG.info(e.getMessage());
} else {
LOG.warn(
"Session 0x"
+Long.toHexString(getSessionId())
+" for server "
+clientCnxnSocket.getRemoteSocketAddress()
+", unexpected error"
+ RETRY_CONN_MSG, e);
}
cleanup();
if (state.isAlive()) {
eventThread.queueEvent(new WatchedEvent(
Event.EventType.None,
Event.KeeperState.Disconnected,
null));
}
clientCnxnSocket.updateNow();
clientCnxnSocket.updateLastSendAndHeard();
}
}
}
cleanup();
clientCnxnSocket.close();
if (state.isAlive()) {
eventThread.queueEvent(newWatchedEvent(Event.EventType.None,
Event.KeeperState.Disconnected, null));
}
ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(),
"SendThread exitedloop for session: 0x"
+ Long.toHexString(getSessionId()));
}
--注意紅色部分,如果socket還沒連線到伺服器,則進行連線,最終調了SocketChannel.connect(SocketAddress remote)進行連線,然後更新相關狀態。如果已經連線上了,那麼執行clientCnxnSocket.doTransport(to, pendingQueue,outgoingQueue,ClientCnxn.this);,看看doTransport方法裡面的原始碼
voiddoTransport(int waitTimeOut, List<Packet> pendingQueue,LinkedList<Packet> outgoingQueue,
ClientCnxn cnxn)
throws IOException, InterruptedException {
selector.select(waitTimeOut);
Set<SelectionKey> selected;
synchronized (this) {
selected = selector.selectedKeys();
}
// Everything below and until we get back to the select is
// non blocking, so time is effectively a constant. That is
// Why we just have to do this once, here
updateNow();
for (SelectionKey k : selected) {
SocketChannel sc = ((SocketChannel) k.channel());
if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
if (sc.finishConnect()) {
updateLastSendAndHeard();
sendThread.primeConnection();
}
} else if ((k.readyOps() & (SelectionKey.OP_READ |SelectionKey.OP_WRITE)) != 0) {
doIO(pendingQueue, outgoingQueue, cnxn);
}
}
if (sendThread.getZkState().isConnected()) {
synchronized(outgoingQueue) {
if (findSendablePacket(outgoingQueue,
cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) {
enableWrite();
}
}
}
selected.clear();
}
--看紅色部分,其中doIO(pendingQueue, outgoingQueue, cnxn)是進行網路io讀寫操作,而findSendablePacket(outgoingQueue,cnxn.sendThread.clientTunneledAuthenticationInProgress()是把請求從outgoingQueue對列中提取出來傳送給服務端。看看doIO的原始碼
if (sockKey.isReadable()) {
int rc = sock.read(incomingBuffer);
……….
sendThread.readResponse(incomingBuffer);
}
if (sockKey.isWritable()) {
synchronized(outgoingQueue) {
Packet p =findSendablePacket(outgoingQueue,
cnxn.sendThread.clientTunneledAuthenticationInProgress());
...........
sock.write(p.bb);
..........
outgoingQueue.removeFirstOccurrence(p);
.........
pendingQueue.add(p);
}
--如果有可讀事件,那麼就讀取到快取中,然後呼叫readResponse方法,如果有可寫事件,那麼就是用socket傳送到服務端,並且從outgoingQueue佇列移出掉該事件包,並加入pendingQueue佇列中。再看看readResponse方法的原始碼
WatchedEvent we = new WatchedEvent(event);
eventThread.queueEvent( we );
--主要就是構造了WatchedEvent物件,並加入了waitingEvents佇列,供eventThread執行緒使用。
3.eventThread執行緒啟動後幹了什麼
public void run() {
try {
isRunning = true;
while(true) {
Object event = waitingEvents.take();
if (event == eventOfDeath) {
wasKilled = true;
} else {
processEvent(event);
}
if (wasKilled)
synchronized(waitingEvents) {
if(waitingEvents.isEmpty()) {
isRunning = false;
break;
}
}
}
} catch (InterruptedException e) {
LOG.error("Event threadexiting due to interruption", e);
}
LOG.info("EventThread shut down for session: 0x{}",
Long.toHexString(getSessionId()));
}
--注意紅色部分,執行緒會迴圈去從waitingEvents對列中消費event物件(生產者是sendThread執行緒),並且呼叫processEvent方法來處理。看看processEvent的原始碼
private void processEvent(Object event){
try {
if (event instanceof WatcherSetEventPair) {
// each watcher will processthe event
WatcherSetEventPair pair =(WatcherSetEventPair) event;
for (Watcher watcher :pair.watchers) {
try {
watcher.process(pair.event);
} catch (Throwable t) {
LOG.error("Errorwhile calling watcher ", t);
}
}
} else {
Packet p = (Packet) event;
int rc = 0;
String clientPath =p.clientPath;
if (p.replyHeader.getErr() !=0) {
rc =p.replyHeader.getErr();
}
if (p.cb == null) {
LOG.warn("Somehow anull cb got to EventThread!");
} else if (p.responseinstanceof ExistsResponse
|| p.responseinstanceof SetDataResponse
|| p.responseinstanceof SetACLResponse) {
StatCallback cb =(StatCallback) p.cb;
if (rc == 0) {
if (p.responseinstanceof ExistsResponse) {
cb.processResult(rc, clientPath, p.ctx,
((ExistsResponse) p.response)
.getStat());
} else if (p.responseinstanceof SetDataResponse) {
cb.processResult(rc,clientPath, p.ctx,
((SetDataResponse) p.response)
.getStat());
} else if (p.responseinstanceof SetACLResponse) {
cb.processResult(rc, clientPath, p.ctx,
((SetACLResponse) p.response)
.getStat());
}
} else {
cb.processResult(rc,clientPath, p.ctx, null);
}
} else if (p.responseinstanceof GetDataResponse) {
DataCallback cb =(DataCallback) p.cb;
GetDataResponse rsp =(GetDataResponse) p.response;
if (rc == 0) {
cb.processResult(rc,clientPath, p.ctx, rsp
.getData(),rsp.getStat());
} else {
cb.processResult(rc,clientPath, p.ctx, null,
null);
}
} else if (p.responseinstanceof GetACLResponse) {
ACLCallback cb =(ACLCallback) p.cb;
GetACLResponse rsp =(GetACLResponse) p.response;
if (rc == 0) {
cb.processResult(rc,clientPath, p.ctx, rsp
.getAcl(),rsp.getStat());
} else {
cb.processResult(rc,clientPath, p.ctx, null,
null);
}
} else if (p.responseinstanceof GetChildrenResponse) {
ChildrenCallback cb =(ChildrenCallback) p.cb;
GetChildrenResponse rsp =(GetChildrenResponse) p.response;
if (rc == 0) {
cb.processResult(rc,clientPath, p.ctx, rsp
.getChildren());
} else {
cb.processResult(rc,clientPath, p.ctx, null);
}
} else if (p.responseinstanceof GetChildren2Response) {
Children2Callback cb =(Children2Callback) p.cb;
GetChildren2Response rsp= (GetChildren2Response) p.response;
if (rc == 0) {
cb.processResult(rc,clientPath, p.ctx, rsp
.getChildren(), rsp.getStat());
} else {
cb.processResult(rc,clientPath, p.ctx, null, null);
}
} else if (p.responseinstanceof CreateResponse) {
StringCallback cb =(StringCallback) p.cb;
CreateResponse rsp =(CreateResponse) p.response;
if (rc == 0) {
cb.processResult(rc,clientPath, p.ctx,
(chrootPath== null
?rsp.getPath()
:rsp.getPath()
.substring(chrootPath.length())));
} else {
cb.processResult(rc,clientPath, p.ctx, null);
}
} else if (p.responseinstanceof MultiResponse) {
MultiCallback cb =(MultiCallback) p.cb;
MultiResponse rsp =(MultiResponse) p.response;
if (rc == 0) {
List<OpResult> results = rsp.getResultList();
int newRc =rc;
for (OpResult result :results) {
if(result instanceof ErrorResult
&& KeeperException.Code.OK.intValue()
!= (newRc = ((ErrorResult)result).getErr())) {
break;
}
}
cb.processResult(newRc, clientPath, p.ctx, results);
} else {
cb.processResult(rc, clientPath, p.ctx, null);
}
} else if (p.cb instanceof VoidCallback) {
VoidCallback cb =(VoidCallback) p.cb;
cb.processResult(rc,clientPath, p.ctx);
}
}
} catch (Throwable t) {
LOG.error("Caught unexpectedthrowable", t);
}
}
}
--對event事件的處理實際分兩大類,一類是我們很熟悉的watcher事件,呼叫我們重寫後的process方法處理。另一大類是不大熟悉的AsyncCallback事件,而對於這種事件,要根據具體回覆包裡面的內容選擇執行AsyncCallback類的子類的processResult方法處理該事件。對於後一大類沒用過,引用網上的一些知識點
Watcher 和 AsyncCallback 的區別
Watcher: Watcher 是用於監聽節點,session 狀態的,比如 getData 對資料節點 a 設定了 watcher ,那麼當 a 的資料內容發生改變時,客戶端會收到 NodeDataChanged 通知,然後進行 watcher 的回撥。
AsyncCallback : AsyncCallback 是在以非同步方式使用 ZooKeeper API 時,用於處理返回結果的。例如:getData 同步呼叫的版本是: byte[] getData(String path, boolean watch,Statstat) ,非同步呼叫的版本是: voidgetData(String path,Watcher watcher,AsyncCallback.DataCallback cb,Object ctx) ,可以看到,前者是直接返回獲取的結果,後者是通過 AsyncCallback 回撥處理結果的。
--可見AsyncCallback是用於非同步呼叫的回撥方法。
--到此,zookeeper客戶端原始碼就算分析完了,由於本人對socket方面不熟悉,所以這塊講的不是太清晰,後續要把這塊知識點認真學習好。
相關推薦
Zookeeper客戶端原始碼分析
1. 從ZooKeeper構造方法開始: public ZooKeeper(String connectString, intsessionTimeout, Watcher watcher, long sessionId, byte[]ses
Zookeeper客戶端原始碼分析(一)建立連線
本文基於zookeeper-3.4.14,由於zookeeper的很多構造方法都是呼叫的另一個構造方法,所以分析程式碼的時候直接分
Zookeeper 原始碼(三)Zookeeper 客戶端原始碼
Zookeeper 原始碼(三)Zookeeper 客戶端原始碼 Zookeeper 客戶端由以下幾個核心元件組成: 類 說明 Zookeeper Zookeeper 客戶端入口 ClientWatch
MQTT再學習 -- MQTT 客戶端原始碼分析
MQTT 原始碼分析,搜尋了一下發現網路上講的很少,多是逍遙子的那幾篇。 參看:逍遙子_mosquitto原始碼分析系列 參看:MQTT libmosquitto原始碼分析 參看:Mosquitto學習筆記 一、目錄結構首先我們還是來看一下 mosquitto-1.4.14 的原始碼目錄結構
RabbitMQ客戶端原始碼分析之BlockingCell.md
RabbitMQ-java-client版本 com.rabbitmq:amqp-client:4.3.0 RabbitMQ版本宣告: 3.6.15 BlockingCell BlockingCell,程式碼文件註釋描述為”簡單的一次性IPC機制“,
RabbitMQ客戶端原始碼分析(三)之Command
RabbitMQ-java-client版本 com.rabbitmq:amqp-client:4.3.0 RabbitMQ版本宣告: 3.6.15 Command Command介面是AMQP方法-引數的容器介面,帶有可選的內容頭(content
RabbitMQ客戶端原始碼分析(五)之ConsumerWorkSerivce與WorkPool
RabbitMQ-java-client版本 com.rabbitmq:amqp-client:4.3.0 RabbitMQ版本宣告: 3.6.15 WorkPool WorkPool可以認
RabbitMQ客戶端原始碼分析(六)之IntAllocator
RabbitMQ-java-client版本 com.rabbitmq:amqp-client:4.3.0 RabbitMQ版本宣告: 3.6.15 IntAllocator 用於分配給定範
RabbitMQ客戶端原始碼分析(七)之Channel與ChannelManager
RabbitMQ-java-client版本 com.rabbitmq:amqp-client:4.3.0 RabbitMQ版本宣告: 3.6.15 Channel uml圖 tran
RabbitMQ客戶端原始碼分析(九)之RPC請求響應
宣告 Queue宣告、exchange宣告、bind等,這些都是通過同步RPC呼叫 channel.queueDeclare(queueName, durable
hbase客戶端原始碼分析--deletetable
–hbase 刪除表 HBaseAdmin admin = new HBaseAdmin(conf); 可以檢視原始碼,其實低層也是呼叫建立 HConnectionImplementation 物件進行連線管理的 admin.disableTable(t
hbase客戶端原始碼分析--put流程
—client 的呼叫流程 table.put(put); 操作 HTable table = new HTable(conf, Bytes.toBytes(tableName)); 呼叫流程如上面的delete流程一樣 首先建立一個muti的操作物件
zookeeper客戶端和服務端互動分析
原文連結 ZkClient 在使用ZooKeeper的Java客戶端時,經常需要處理幾個問題:重複註冊watcher、session失效重連、異常處理。
原始碼走讀-Yarn-ResourceManager04-MR任務提交-客戶端側分析
0x05 RM排程-MR任務提交-客戶端側分析 5.1 mapreduce.job org.apache.hadoop.mapreduce.Job 我們都知道,MR任務的一般結尾會有一句話是job.waitForCompletion(true),這行程式碼
BeeGFS原始碼分析2-客戶端概要分析
註冊檔案系統型別 init_fhgfs_client 核心模組初始化: // fhgfs_client_module\source\program\Main.c #define BEEGFS_LICENSE "GPL v2" static int __init init_fhgf
ZooKeeper客戶端與服務端的事件watcher原始碼閱讀
我是懷著無比激動的心情寫這篇部落格的,如果對您有幫助,歡迎給我點個贊 watcher存在的必要性 舉個特容易懂的例子: 假如我的專案是基於dubbo+zookeeper搭建的分散式專案, 我有三個功能相同的服務提供者,用zookeeper當成註冊中心,我的三個專案得註冊進zookeeper才能對外暴露服務
zookeeper客戶端命令行查看dubbo服務的生產者和消費者
ide 切換 microsoft ons -s sof 生產 size zkcli 假設zookeeper安裝在192.168.5.130這臺服務器上,現在我們通過命令行查看dubbo在zookeeper註冊服務的生產者和消費者信息 首先通過命令切換到/usr/zookee
7.5 zookeeper客戶端curator的基本使用
serve server 超時 one c-c tlist result 強制 car 使用zookeeper原生API實現一些復雜的東西比較麻煩。所以,出現了兩款比較好的開源客戶端,對zookeeper的原生API進行了包裝:zkClient和curator。後者是Net
zookeeper客戶端命令詳解
需要 des absolut 時間 its 認證 del 就會 ons 今天同事突然向看一下zookeeper中都創建了哪些節點,而我本人對zookeeper的客服端命令了解的很少,有些操作竟然不知道怎麽用,於是乎就索性整理一下zookeeper客服端命令的使用,並再此
zookeeper客戶端相關命令
server 同步 window iss lpad dig 技術分享 localhost gpo windows環境: 本機 直接 點機zkcli.cmd linux環境: 連接到zookeeper server ./zkCli.sh -server localh