zookeeper源碼之客戶端
阿新 • • 發佈:2018-02-10
服務端 run t對象 成對 bool .com 操作 code 分享
zookeeper自身提供了一個簡易的客戶端。主要包括一下幾個模塊:
1.啟動模塊。
2.核心執行模塊。
3.網絡通信模塊。
啟動模塊
啟動程序,接收和解析命令行。詳見zookeeper源碼之客戶端啟動模塊。
核心執行模塊
客戶端操作ZooKeeper服務端的核心類,詳見zookeeper源碼之客戶端核心執行模塊。
類圖
ZooKeeper
ZooKeeper是客戶端操作ZooKeeper服務端的核心類。當用戶向ZooKeeperMain執行相關命令時,最終會交給ZooKeeper執行,其會將用戶請求封裝成對象,然後發送到服務端。內部使用ClientCnxn來提供與服務端的通信。 請求數據會被封裝成RequestHeader、Request對象,相應的返回結果會存儲在Response,ReplyHeader對象。
public String create(final String path, byte data[], List<ACL> acl, CreateMode createMode) throws KeeperException, InterruptedException { final String clientPath = path; PathUtils.validatePath(clientPath, createMode.isSequential()); final String serverPath = prependChroot(clientPath); RequestHeader h= new RequestHeader(); h.setType(ZooDefs.OpCode.create); CreateRequest request = new CreateRequest(); CreateResponse response = new CreateResponse(); request.setData(data); request.setFlags(createMode.toFlag()); request.setPath(serverPath); if (acl != null&& acl.size() == 0) { throw new KeeperException.InvalidACLException(); } request.setAcl(acl); ReplyHeader r = cnxn.submitRequest(h, request, response, null); if (r.getErr() != 0) { throw KeeperException.create(KeeperException.Code.get(r.getErr()), clientPath); } if (cnxn.chrootPath == null) { return response.getPath(); } else { return response.getPath().substring(cnxn.chrootPath.length()); } }
ClientCnxn
為客戶端發送請求到服務端,管理底層IO連接。 將用戶調用的請求對象(RequestHeader、Request)封裝成Packet對象,存入發送隊列。內部有一個線程會不斷讀取發送隊列中的Packet對象,通過NIO將Packet對象發送到服務端,然後將Packet對象放入pending隊列,該線程會不斷讀取服務端的返回信息,並且將結果設置到Packet對象的Response,ReplyHeader對象中。
//等待發送的數據包隊列
private final LinkedList<Packet> outgoingQueue = new LinkedList<Packet>();
//發送後等待結果的數據包隊列
private final LinkedList<Packet> pendingQueue = new LinkedList<Packet>();
class SendThread extends Thread { boolean doIO() throws InterruptedException, IOException { ...if (!outgoingQueue.isEmpty()) { ByteBuffer pbb = outgoingQueue.getFirst().bb; sock.write(pbb); if (!pbb.hasRemaining()) { sentCount++; Packet p = outgoingQueue.removeFirst(); if (p.header != null && p.header.getType() != OpCode.ping && p.header.getType() != OpCode.auth) { pendingQueue.add(p); } } } } ... } ... @Override public void run() { ...while (zooKeeper.state.isAlive()) { ...if (doIO()) { lastHeard = now; } ... }
...
}
zookeeper源碼之客戶端