1. 程式人生 > >zookeeper(5)客戶端

zookeeper(5)客戶端

  zookeeper客戶端主要負責與使用者進行互動,將命令傳送到伺服器,接收伺服器的響應,反饋給使用者。主要分為以下三層:

使用者命令處理層

   使用者命令處理層的功能是讀取使用者輸入的命令,解析使用者命令和輸入引數,根據命令和引數,進行一些校驗,然後執行節點操作。

原始碼例項(ZooKeeperMain):

  1 public class ZooKeeperMain {
  2     // 命令解析器。用於解析命令
  3     protected MyCommandOptions cl = new MyCommandOptions();
  4
5 // 主函式 6 public static void main(String args[]) throws KeeperException, IOException, InterruptedException { 7 // 執行客戶端 8 ZooKeeperMain main = new ZooKeeperMain(args); 9 main.run(); 10 } 11 12 public ZooKeeperMain(String args[]) throws IOException, InterruptedException {
13 // 解析啟動引數 14 cl.parseOptions(args); 15 // 獲取server引數,連線伺服器 16 connectToZK(cl.getOption("server")); 17 18 } 19 20 // 連線伺服器 21 protected void connectToZK(String newHost) throws InterruptedException, IOException { 22 host = newHost; 23 zk = new
ZooKeeper(host, Integer.parseInt(cl.getOption("timeout")), new MyWatcher()); 24 } 25 26 void run() throws KeeperException, IOException, InterruptedException { 27 // 迴圈讀取命令, 28 BufferedReader br = new BufferedReader(new InputStreamReader(System.in)); 29 String line; 30 while ((line = br.readLine()) != null) { 31 // 執行命令 32 executeLine(line); 33 } 34 } 35 36 public void executeLine(String line) throws InterruptedException, IOException, KeeperException { 37 if (!line.equals("")) { 38 // 解析命令 39 cl.parseCommand(line); 40 // 執行命令 41 processZKCmd(cl); 42 } 43 } 44 45 protected boolean processZKCmd(MyCommandOptions co) throws KeeperException, IOException, InterruptedException { 46 // 讀取命令和引數 47 Stat stat = new Stat(); 48 String[] args = co.getArgArray(); 49 String cmd = co.getCommand(); 50 boolean watch = args.length > 2; 51 String path = null; 52 List<ACL> acl = Ids.OPEN_ACL_UNSAFE; 53 // 執行不同的命令,主要是進行一些校驗,然後呼叫zookeeper方法 54 if (cmd.equals("quit")) { 55 zk.close(); 56 System.exit(0); 57 } else if (cmd.equals("redo") && args.length >= 2) { 58 Integer i = Integer.decode(args[1]); 59 if (commandCount <= i) { 60 return false; 61 } 62 cl.parseCommand(history.get(i)); 63 history.put(commandCount, history.get(i)); 64 processCmd(cl); 65 } else if (cmd.equals("history")) { 66 for (int i = commandCount - 10; i <= commandCount; ++i) { 67 if (i < 0) 68 continue; 69 System.out.println(i + " - " + history.get(i)); 70 } 71 } else if (cmd.equals("printwatches")) { 72 if (args.length == 1) { 73 System.out.println("printwatches is " + (printWatches ? 
"on" : "off")); 74 } else { 75 printWatches = args[1].equals("on"); 76 } 77 } else if (cmd.equals("connect")) { 78 if (args.length >= 2) { 79 connectToZK(args[1]); 80 } else { 81 connectToZK(host); 82 } 83 } 84 if (cmd.equals("create") && args.length >= 3) { 85 int first = 0; 86 CreateMode flags = CreateMode.PERSISTENT; 87 if ((args[1].equals("-e") && args[2].equals("-s")) || (args[1]).equals("-s") && (args[2].equals("-e"))) { 88 first += 2; 89 flags = CreateMode.EPHEMERAL_SEQUENTIAL; 90 } else if (args[1].equals("-e")) { 91 first++; 92 flags = CreateMode.EPHEMERAL; 93 } else if (args[1].equals("-s")) { 94 first++; 95 flags = CreateMode.PERSISTENT_SEQUENTIAL; 96 } 97 if (args.length == first + 4) { 98 acl = parseACLs(args[first + 3]); 99 } 100 path = args[first + 1]; 101 String newPath = zk.create(path, args[first + 2].getBytes(), acl, flags); 102 } else if (cmd.equals("delete") && args.length >= 2) { 103 path = args[1]; 104 zk.delete(path, watch ? Integer.parseInt(args[2]) : -1); 105 } else if (cmd.equals("set") && args.length >= 3) { 106 path = args[1]; 107 stat = zk.setData(path, args[2].getBytes(), args.length > 3 ? Integer.parseInt(args[3]) : -1); 108 printStat(stat); 109 } else if (cmd.equals("aget") && args.length >= 2) { 110 path = args[1]; 111 zk.getData(path, watch, dataCallback, path); 112 } else if (cmd.equals("get") && args.length >= 2) { 113 path = args[1]; 114 byte data[] = zk.getData(path, watch, stat); 115 data = (data == null) ? 
"null".getBytes() : data; 116 System.out.println(new String(data)); 117 printStat(stat); 118 } else if (cmd.equals("ls") && args.length >= 2) { 119 path = args[1]; 120 List<String> children = zk.getChildren(path, watch); 121 System.out.println(children); 122 } else if (cmd.equals("ls2") && args.length >= 2) { 123 path = args[1]; 124 List<String> children = zk.getChildren(path, watch, stat); 125 System.out.println(children); 126 printStat(stat); 127 } else if (cmd.equals("getAcl") && args.length >= 2) { 128 path = args[1]; 129 acl = zk.getACL(path, stat); 130 for (ACL a : acl) { 131 System.out.println(a.getId() + ": " + getPermString(a.getPerms())); 132 } 133 } else if (cmd.equals("setAcl") && args.length >= 3) { 134 path = args[1]; 135 stat = zk.setACL(path, parseACLs(args[2]), args.length > 4 ? Integer.parseInt(args[3]) : -1); 136 printStat(stat); 137 } else if (cmd.equals("stat") && args.length >= 2) { 138 path = args[1]; 139 stat = zk.exists(path, watch); 140 printStat(stat); 141 } else if (cmd.equals("listquota") && args.length >= 2) { 142 path = args[1]; 143 String absolutePath = Quotas.quotaZookeeper + path + "/" + Quotas.limitNode; 144 byte[] data = null; 145 try { 146 data = zk.getData(absolutePath, false, stat); 147 StatsTrack st = new StatsTrack(new String(data)); 148 data = zk.getData(Quotas.quotaZookeeper + path + "/" + Quotas.statNode, false, stat); 149 System.out.println("Output stat for " + path + " " + new StatsTrack(new String(data)).toString()); 150 } catch (KeeperException.NoNodeException ne) { 151 System.err.println("quota for " + path + " does not exist."); 152 } 153 } else if (cmd.equals("setquota") && args.length >= 4) { 154 String option = args[1]; 155 String val = args[2]; 156 path = args[3]; 157 System.err.println("Comment: the parts are " + "option " + option + " val " + val + " path " + path); 158 if ("-b".equals(option)) { 159 // we are setting the bytes quota 160 createQuota(zk, path, Long.parseLong(val), -1); 161 } else if 
("-n".equals(option)) { 162 // we are setting the num quota 163 createQuota(zk, path, -1L, Integer.parseInt(val)); 164 } else { 165 usage(); 166 } 167 168 } else if (cmd.equals("delquota") && args.length >= 2) { 169 // if neither option -n or -b is specified, we delete 170 // the quota node for thsi node. 171 if (args.length == 3) { 172 // this time we have an option 173 String option = args[1]; 174 path = args[2]; 175 if ("-b".equals(option)) { 176 delQuota(zk, path, true, false); 177 } else if ("-n".equals(option)) { 178 delQuota(zk, path, false, true); 179 } 180 } else if (args.length == 2) { 181 path = args[1]; 182 // we dont have an option specified. 183 // just delete whole quota node 184 delQuota(zk, path, true, true); 185 } else if (cmd.equals("help")) { 186 usage(); 187 } 188 } else if (cmd.equals("close")) { 189 zk.close(); 190 } else if (cmd.equals("addauth") && args.length >= 2) { 191 byte[] b = null; 192 if (args.length >= 3) 193 b = args[2].getBytes(); 194 195 zk.addAuthInfo(args[1], b); 196 } else { 197 usage(); 198 } 199 return watch; 200 } 201 }
View Code

  除了基礎的節點操作外,使用者命令層還提供了節點配額的控制。節點配額的控制通過在/zookeeper/quota對應的目錄下記錄當前節點資料大小和限制大小實現。

原始碼例項(ZooKeeperMain.createQuota):

 /**
  * Creates or updates the quota for {@code path}.
  *
  * The quota is kept under Quotas.quotaZookeeper + path in two nodes:
  * Quotas.limitNode (the configured limit) and Quotas.statNode (initialised
  * here to zero bytes / zero count). When the limit node already exists, only
  * the values that are not -1 are overwritten.
  *
  * @param zk       client handle used for all reads and writes
  * @param path     node the quota applies to; must exist
  * @param bytes    byte limit, or -1 to leave an existing value untouched
  * @param numNodes node-count limit, or -1 to leave an existing value untouched
  * @return always true
  * @throws IllegalArgumentException if path does not exist, or if an entry
  *         below its quota path does not start with "zookeeper_"
  */
 1 public static boolean createQuota(ZooKeeper zk, String path,
 2             long bytes, int numNodes)
 3         throws KeeperException, IOException, InterruptedException
 4     {
 5         // Check that the target path exists
 6         Stat initStat = zk.exists(path, false);
 7         if (initStat == null) {
 8             throw new IllegalArgumentException(path + " does not exist.");
 9         }
10         String quotaPath = Quotas.quotaZookeeper;
11         String realPath = Quotas.quotaZookeeper + path;
12         try {
13             // Reject the request if a child of this node already has a quota
14             List<String> children = zk.getChildren(realPath, false);
15             for (String child: children) {
16                 if (!child.startsWith("zookeeper_")) {
17                     throw new IllegalArgumentException(path + " has child " +
18                             child + " which has a quota");
19                 }
20             }
21         } catch(KeeperException.NoNodeException ne) {
22             // this is fine
23         }
24         // Check the parent nodes for an existing quota (helper not shown in this excerpt)
25         checkIfParentQuota(zk, path);
26         // If the quota root does not exist yet, create it level by level
27         if (zk.exists(quotaPath, false) == null) {
28             try {
29                 zk.create(Quotas.procZookeeper, null, Ids.OPEN_ACL_UNSAFE,
30                         CreateMode.PERSISTENT);
31                 zk.create(Quotas.quotaZookeeper, null, Ids.OPEN_ACL_UNSAFE,
32                         CreateMode.PERSISTENT);
33             } catch(KeeperException.NodeExistsException ne) {
34                 // do nothing
35             }
36         }
 // Mirror every component of path below the quota root; index 0 is skipped
 // because it is the empty string before a leading "/".
37         String[] splits = path.split("/");
38         StringBuilder sb = new StringBuilder();
39         sb.append(quotaPath);
40         for (int i=1; i<splits.length; i++) {
41             sb.append("/" + splits[i]);
42             quotaPath = sb.toString();
43             try {
44                 zk.create(quotaPath, null, Ids.OPEN_ACL_UNSAFE ,
45                         CreateMode.PERSISTENT);
46             } catch(KeeperException.NodeExistsException ne) {
47                 //do nothing
48             }
49         }
50         // Create the limit node and the stat node
51         String statPath = quotaPath + "/" + Quotas.statNode;
52         quotaPath = quotaPath + "/" + Quotas.limitNode;
53         StatsTrack strack = new StatsTrack(null);
54         strack.setBytes(bytes);
55         strack.setCount(numNodes);
56         try {
57             zk.create(quotaPath, strack.toString().getBytes(),
58                     Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
59             StatsTrack stats = new StatsTrack(null);
60             stats.setBytes(0L);
61             stats.setCount(0);
62             zk.create(statPath, stats.toString().getBytes(),
63                     Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
64         } catch(KeeperException.NodeExistsException ne) {
 // A node already exists: read the stored limit and overwrite only the
 // values the caller actually supplied.
65             byte[] data = zk.getData(quotaPath, false , new Stat());
66             StatsTrack strackC = new StatsTrack(new String(data));
67             if (bytes != -1L) {
68                 strackC.setBytes(bytes);
69             }
70             if (numNodes != -1) {
71                 strackC.setCount(numNodes);
72             }
73             zk.setData(quotaPath, strackC.toString().getBytes(), -1);
74         }
75         return true;
76     }
View Code

節點處理層

  節點處理層主要是提供節點操作功能,將節點操作引數封裝成資料物件,然後通過網路層傳送資料物件,並返回結果。網路層提供了同步和非同步兩種網路請求方式。

建立節點(ZooKeeper):

/**
 * Asynchronous create: builds a create request for {@code path} and queues it
 * on the connection; the outcome is delivered to {@code cb}.
 *
 * @param path       node path as seen by the client (before the chroot is prepended)
 * @param data       initial node data
 * @param acl        access control list for the new node
 * @param createMode persistence / sequence mode of the new node
 * @param cb         callback queued together with the request
 * @param ctx        opaque context object queued together with the request
 * @throws IllegalArgumentException if the path is rejected by the client-side validation
 */
public void create(final String path, byte data[], List<ACL> acl,
            CreateMode createMode,  StringCallback cb, Object ctx)
    {
        final String clientPath = path;
        // Validate on the client like the other node operations do. The
        // sequential flag is passed along because a sequential node's final
        // name carries a suffix appended after the supplied path.
        PathUtils.validatePath(clientPath, createMode.isSequential());

        // Resolve the client-relative path to the full server path
        final String serverPath = prependChroot(clientPath);

        // Request header
        RequestHeader h = new RequestHeader();
        h.setType(ZooDefs.OpCode.create);

        // Request body for the create operation
        CreateRequest request = new CreateRequest();
        CreateResponse response = new CreateResponse();
        ReplyHeader r = new ReplyHeader();
        request.setData(data);
        request.setFlags(createMode.toFlag());
        request.setPath(serverPath);
        request.setAcl(acl);

        // Hand the request to the network layer
        cnxn.queuePacket(h, r, request, response, cb, clientPath,
                serverPath, ctx, null);
    }
View Code

刪除節點(ZooKeeper):

 1 public void delete(final String path, int version)
 2         throws InterruptedException, KeeperException
 3     {
 4         final String clientPath = path;
 5       //解析client相對路徑到全路徑
 6         final String serverPath = prependChroot(clientPath);
 7        //設定請求頭
 8         RequestHeader h = new RequestHeader();
 9         h.setType(ZooDefs.OpCode.delete);
10         //設定刪除節點請求體
11         DeleteRequest request = new DeleteRequest();
12         request.setPath(serverPath);
13         request.setVersion(version);
14         cnxn.queuePacket(h, new ReplyHeader(), request, null, cb, clientPath,
15                 serverPath, ctx, null);
16     }
View Code

其他方法(ZooKeeper):

  /**
   * Asynchronous exists: validates the path, optionally registers a watch,
   * and queues an exists request whose outcome is delivered to cb.
   *
   * @param path    node path as seen by the client
   * @param watcher watcher to register, or null for none
   * @param cb      callback queued together with the request
   * @param ctx     opaque context object queued together with the request
   */
  1 public void exists(final String path, Watcher watcher,
  2             StatCallback cb, Object ctx)
  3     {
  4         final String clientPath = path;
  5         PathUtils.validatePath(clientPath);
  6 
  7         // the watch contains the un-chroot path
  8         WatchRegistration wcb = null;
  9         if (watcher != null) {
 10             wcb = new ExistsWatchRegistration(watcher, clientPath);
 11         }
 12 
 13         final String serverPath = prependChroot(clientPath);
 14 
 15         RequestHeader h = new RequestHeader();
 16         h.setType(ZooDefs.OpCode.exists);
 17         ExistsRequest request = new ExistsRequest();
 18         request.setPath(serverPath);
 19         request.setWatch(watcher != null);
  // NOTE(review): a SetDataResponse is used as the reply holder for exists,
  // presumably because both replies carry only a Stat - confirm.
 20         SetDataResponse response = new SetDataResponse();
 21         cnxn.queuePacket(h, new ReplyHeader(), request, response, cb,
 22                 clientPath, serverPath, ctx, wcb);
 23     }
 /**
  * Asynchronous getData: validates the path, optionally registers a data
  * watch, and queues a getData request whose outcome is delivered to cb.
  *
  * @param path    node path as seen by the client
  * @param watcher watcher to register, or null for none
  * @param cb      callback queued together with the request
  * @param ctx     opaque context object queued together with the request
  */
 24     public void getData(final String path, Watcher watcher,
 25             DataCallback cb, Object ctx)
 26     {
 27         final String clientPath = path;
 28         PathUtils.validatePath(clientPath);
 29 
 30         // the watch contains the un-chroot path
 31         WatchRegistration wcb = null;
 32         if (watcher != null) {
 33             wcb = new DataWatchRegistration(watcher, clientPath);
 34         }
 35 
 36         final String serverPath = prependChroot(clientPath);
 37 
 38         RequestHeader h = new RequestHeader();
 39         h.setType(ZooDefs.OpCode.getData);
 40         GetDataRequest request = new GetDataRequest();
 41         request.setPath(serverPath);
 42         request.setWatch(watcher != null);
 43         GetDataResponse response = new GetDataResponse();
 44         cnxn.queuePacket(h, new ReplyHeader(), request, response, cb,
 45                 clientPath, serverPath, ctx, wcb);
 46     }
 /**
  * Asynchronous setData: validates the path and queues a setData request
  * carrying the new data and the version; no watch is registered.
  *
  * @param path    node path as seen by the client
  * @param data    new node data
  * @param version version placed in the request
  * @param cb      callback queued together with the request
  * @param ctx     opaque context object queued together with the request
  */
 47     public void setData(final String path, byte data[], int version,
 48             StatCallback cb, Object ctx)
 49     {
 50         final String clientPath = path;
 51         PathUtils.validatePath(clientPath);
 52 
 53         final String serverPath = prependChroot(clientPath);
 54 
 55         RequestHeader h = new RequestHeader();
 56         h.setType(ZooDefs.OpCode.setData);
 57         SetDataRequest request = new SetDataRequest();
 58         request.setPath(serverPath);
 59         request.setData(data);
 60         request.setVersion(version);
 61         SetDataResponse response = new SetDataResponse();
 62         cnxn.queuePacket(h, new ReplyHeader(), request, response, cb,
 63                 clientPath, serverPath, ctx, null);
 64     }
 65 
 /**
  * Asynchronous getACL: validates the path and queues a getACL request whose
  * outcome is delivered to cb. The stat parameter is not referenced in this
  * body.
  *
  * @param path node path as seen by the client
  * @param stat not used by the code shown here
  * @param cb   callback queued together with the request
  * @param ctx  opaque context object queued together with the request
  */
 66     public void getACL(final String path, Stat stat, ACLCallback cb,
 67             Object ctx)
 68     {
 69         final String clientPath = path;
 70         PathUtils.validatePath(clientPath);
 71 
 72         final String serverPath = prependChroot(clientPath);
 73 
 74         RequestHeader h = new RequestHeader();
 75         h.setType(ZooDefs.OpCode.getACL);
 76         GetACLRequest request = new GetACLRequest();
 77         request.setPath(serverPath);
 78         GetACLResponse response = new GetACLResponse();
 79         cnxn.queuePacket(h, new ReplyHeader(), request, response, cb,
 80                 clientPath, serverPath, ctx, null);
 81     }
 /**
  * Asynchronous setACL: validates the path and queues a setACL request
  * carrying the new ACL list and the version.
  *
  * @param path    node path as seen by the client
  * @param acl     ACL list placed in the request
  * @param version version placed in the request
  * @param cb      callback queued together with the request
  * @param ctx     opaque context object queued together with the request
  */
 82     public void setACL(final String path, List<ACL> acl, int version,
 83             StatCallback cb, Object ctx)
 84     {
 85         final String clientPath = path;
 86         PathUtils.validatePath(clientPath);
 87 
 88         final String serverPath = prependChroot(clientPath);
 89 
 90         RequestHeader h = new RequestHeader();
 91         h.setType(ZooDefs.OpCode.setACL);
 92         SetACLRequest request = new SetACLRequest();
 93         request.setPath(serverPath);
 94         request.setAcl(acl);
 95         request.setVersion(version);
 96         SetACLResponse response = new SetACLResponse();
 97         cnxn.queuePacket(h, new ReplyHeader(), request, response, cb,
 98                 clientPath, serverPath, ctx, null);
 99     }
100     public void getChildren(final String path, Watcher watcher,
101             Children2Callback cb, Object ctx)
102     {
103         final String clientPath = path;
104         final String serverPath = prependChroot(clientPath);
105         
106         WatchRegistration wcb = null;
107         if (watcher != null) {
108             wcb = new ChildWatchRegistration(watcher, clientPath);
109         }
110         
111         RequestHeader h = new RequestHeader();
112         h.setType(ZooDefs.OpCode.getChildren2);
113         GetChildren2Request request = new GetChildren2Request();
114         request.setPath(serverPath);
115         request.setWatch(watcher != null);
116         GetChildren2Response response = new GetChildren2Response();
117         cnxn.queuePacket(h, new ReplyHeader(), request, response, cb,
118                 clientPath, serverPath, ctx, wcb);
119     }
/**
 * Asynchronous sync: validates the path and queues a sync request whose
 * outcome is delivered to cb.
 *
 * @param path node path as seen by the client
 * @param cb   callback queued together with the request
 * @param ctx  opaque context object queued together with the request
 */
120     public void sync(final String path, VoidCallback cb, Object ctx){
121         final String clientPath = path;
122         PathUtils.validatePath(clientPath);
123 
124         final String serverPath = prependChroot(clientPath);
125 
126         RequestHeader h = new RequestHeader();
127         h.setType(ZooDefs.OpCode.sync);
128         SyncRequest request = new SyncRequest();
129         SyncResponse response = new SyncResponse();
130         request.setPath(serverPath);
131         cnxn.queuePacket(h, new ReplyHeader(), request, response, cb,
132                 clientPath, serverPath, ctx, null);
133     }
View Code

網路請求層

  網路請求層最為複雜,主要實現nio非同步網路請求以及結果回撥,watcher管理。

  提供了同步和非同步兩種通訊方式。同步通訊其實也是通過非同步通訊實現,首先會使用非同步通訊傳送請求,然後判斷返回結果是否ready,如果沒有則通過wait進入阻塞狀態。當非同步通訊返回結果時,會設定返回結果狀態,並且喚醒阻塞的執行緒。

同步請求(ClientCnxn.submitRequest):

 /**
  * Synchronous request: queues the packet through queuePacket() and blocks on
  * the packet's monitor until its finished flag is set.
  *
  * @param h                 request header
  * @param request           request body
  * @param response          object that receives the reply body
  * @param watchRegistration watch to register with the request, or null
  * @return the reply header that was handed to queuePacket()
  * @throws InterruptedException if the waiting thread is interrupted
  */
 1 public ReplyHeader submitRequest(RequestHeader h, Record request,
 2             Record response, WatchRegistration watchRegistration)
 3             throws InterruptedException {
 4         // Queue the request packet asynchronously (no callback, paths or context)
 5         ReplyHeader r = new ReplyHeader();
 6         Packet packet = queuePacket(h, r, request, response, null, null, null,
 7                     null, watchRegistration);
 8         // Wait until the packet is marked finished; the loop re-checks the flag after every wake-up
 // NOTE(review): the code that sets packet.finished and notifies the
 // monitor is not part of this excerpt - confirm against the reply path.
 9         synchronized (packet) {
10             while (!packet.finished) {
11                 packet.wait();
12             }
13         }
14         return r;
15     }
View Code

  非同步請求的引數會被封裝成一個Packet物件放入outgoingQueue佇列中。會有一個傳送執行緒從outgoingQueue佇列中取出一個可傳送的Packet物件,並傳送序列化資訊,然後把該Packet放入到pendingQueue佇列中,當接收到服務端響應,反序列化出結果資料,然後在pendingQueue中找到對應的Packet,設定結果,最後對於有回撥和watcher的命令封裝成事件放入事件佇列中,會有另一個事件執行緒,從事件佇列中讀取事件訊息,執行回撥和watcher邏輯。

非同步請求(ClientCnxn.queuePacket):

 /**
  * Asynchronous request: wraps the arguments in a Packet, appends it to
  * outgoingQueue while holding the queue's lock, and wakes the send thread.
  *
  * @return the queued packet
  */
 1 Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,
 2             Record response, AsyncCallback cb, String clientPath,
 3             String serverPath, Object ctx, WatchRegistration watchRegistration)
 4     {
 5         
 6         Packet packet = null;
 7         synchronized (outgoingQueue) {
 8             // Give the packet an xid from getXid(); ping and auth requests are left as they are
 9             if (h.getType() != OpCode.ping && h.getType() != OpCode.auth) {
10                 h.setXid(getXid());
11             }
12             // Bundle request header, reply header, request, response and watch registration into one packet
13             packet = new Packet(h, r, request, response, null,
14                     watchRegistration);
15             packet.cb = cb;
16             packet.ctx = ctx;
17             packet.clientPath = clientPath;
18             packet.serverPath = serverPath;
19             // Append the packet to the outgoing queue
20             outgoingQueue.add(packet);
21         }
22         sendThread.wakeup();
23         return packet;
24     }
View Code

  傳送執行緒執行流程如下:

  1.啟動執行緒,建立伺服器連線。(狀態為Connecting)

  2.建立連線後,進行初始化,主要是向伺服器傳送預設watcher命令、auth命令、connect命令。(狀態為Connected) 

  3. 從outgoing佇列中讀取資料包,傳送到服務端。

  4.接收服務端響應,處理返回結果:如果是connect命令,記錄sessionid、sessionpwd、timeout等;如果是其他命令,則在pendingQueue中找到對應的Packet,設定結果。

  5.對於有回撥和watcher的命令封裝成事件放入事件佇列中。

傳送執行緒主流程(ClientCnxn.SendThread.run):

 1 class SendThread extends Thread {
 2         SelectionKey sockKey;
 3         private final Selector selector = Selector.open();
 4         public void run() {
 5             while (zooKeeper.state.isAlive()) {
 6                 //建立連線
 7                 startConnect();
 8                 //獲取註冊通道
 9                 selector.select(1000);
10                 Set<SelectionKey> selected;
11                 synchronized (this) {
12                     selected = selector.selectedKeys();
13                 }
14                 for (SelectionKey k : selected) {
15                     SocketChannel sc = ((SocketChannel) k.channel());
16                     if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
17                         //建立連線
18                         if (sc.finishConnect()) {
19                             primeConnection(k);
20                         }
21                         //讀寫資料
22                     } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
23                         doIO();
24                     }
25                 }
26             }
27             try {
28                 selector.close();
29             } catch (IOException e) {
30                 LOG.warn("Ignoring exception during selector close", e);
31             }
32         }
33 
34       //通過nio建立連線
35         private void startConnect() throws IOException {
36             //從伺服器列表中獲取一臺伺服器地址
37             InetSocketAddress addr = serverAddrs.get(nextAddrToTry);
38             nextAddrToTry++;
39             if (nextAddrToTry == serverAddrs.size()) {
40                 nextAddrToTry = 0;
41             }
42             //通過nio註冊
43             SocketChannel sock;
44             sock = SocketChannel.open();
45             sock.configureBlocking(false);
46             sock.socket().setSoLinger(false, -1);
47             sock.socket().setTcpNoDelay(true);
48             try {
49                 sockKey = sock.register(selector, SelectionKey.OP_CONNECT);
50             } catch (IOException e) {
51                 sock.close();
52                 throw e;
53             }
54             //初始化快取
55             lenBuffer.clear();
56             incomingBuffer = lenBuffer;
57         }
58 }
View Code

建立連線,進行初始化(ClientCnxn.SendThread.primeConnection):

 /**
  * Prepares a freshly connected channel: serialises the connect request,
  * queues watch re-registration, auth packets and the connect packet at the
  * front of outgoingQueue, then switches the key to read/write interest.
  *
  * @param k selection key of the connected channel
  */
 1         private void primeConnection(SelectionKey k) throws IOException {
 2             ConnectRequest conReq = new ConnectRequest(0, lastZxid,
 3                     sessionTimeout, sessionId, sessionPasswd);
 4             // Serialise the connect request
 5             ByteArrayOutputStream baos = new ByteArrayOutputStream();
 6             BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
 // Placeholder for the length prefix; overwritten below once the size is known
 7             boa.writeInt(-1, "len");
 8             conReq.serialize(boa, "connect");
 9             baos.close();
10             ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
 // Length prefix = total size minus the 4 bytes of the prefix itself
11             bb.putInt(bb.capacity() - 4);
12             bb.rewind();
13             synchronized (outgoingQueue) {
14                 // Re-register existing watches: wrap them in a SetWatches request and put it at the front of the queue
15                 if (!disableAutoWatchReset) {
16                     List<String> dataWatches = zooKeeper.getDataWatches();
17                     List<String> existWatches = zooKeeper.getExistWatches();
18                     List<String> childWatches = zooKeeper.getChildWatches();
19                     if (!dataWatches.isEmpty()
20                                 || !existWatches.isEmpty() || !childWatches.isEmpty()) {
21                         SetWatches sw = new SetWatches(lastZxid,
22                                 prependChroot(dataWatches),
23                                 prependChroot(existWatches),
24                                 prependChroot(childWatches));
25                         RequestHeader h = new RequestHeader();
26                         h.setType(ZooDefs.OpCode.setWatches);
27                         h.setXid(-8);
28                         Packet packet = new Packet(h, new ReplyHeader(), sw, null, null,
29                                 null);
30                         outgoingQueue.addFirst(packet);
31                     }
32                 }
33                 // Queue the authentication info (xid -4), ahead of the watch request
34                 for (AuthData id : authInfo) {
35                     outgoingQueue.addFirst(new Packet(new RequestHeader(-4,
36                             OpCode.auth), null, new AuthPacket(0, id.scheme,
37                             id.data), null, null, null));
38                 }
39                 // Queue the connect request; it is added last with addFirst, so it ends up at the head
40                 outgoingQueue.addFirst((new Packet(null, null, null, null, bb,
41                         null)));
42             }
43             // Switch the channel's interest set to read and write
44             synchronized (this) {
45                 k.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
46             }
47         }
View Code

處理讀寫主流程,主要是nio操作(ClientCnxn.SendThread.doIO):

 1 boolean doIO() throws InterruptedException, IOException {
 2             boolean packetReceived = false;
 3             //獲取socketchannel
 4             SocketChannel sock = (SocketChannel) sockKey.channel();
 5             //如果可讀
 6             if (sockKey.isReadable()) {
 7                 //讀取資料到快取中
 8                 int rc = sock.read(incomingBuffer);
 9                 //直到快取讀滿
10                 if (!incomingBuffer.hasRemaining()) {
11                     //重置快取
12                     incomingBuffer.flip();
13                     //如果讀取的是長度資訊,讀取長度資訊,並且分配相應快取
14                     if (incomingBuffer == lenBuffer) {
15                         int len = incomingBuffer.getInt();
16                         incomingBuffer = ByteBuffer.allocate(len);
17                     } else if (!initialized) {
18                       //如果是connect命令的返回值,獲取session,timeout等相關資訊
19                         readConnectResult();
20                         enableRead();
21                         lenBuffer.clear();
22                         //重置快取
23                         incomingBuffer = lenBuffer;
24                         initialized = true;
25                     } else {
26                         //讀取資料內容
27                         readResponse();
28                         //重置快取
29                         lenBuffer.clear();
30                         incomingBuffer = lenBuffer;
31                         packetReceived = true;
32                     }
33                 }
34             }
35             //如果是寫
36             if (sockKey.isWritable()) {
37                 synchronized (outgoingQueue) {
38                     if (!outgoingQueue.isEmpty()) {
39                         //從outgoingQueue佇列中拿資料包寫入通道
40                         ByteBuffer pbb = outgoingQueue.getFirst().bb;
41                         sock.write(pbb);
42                         if (!pbb.hasRemaining()) {
43                             sentCount++;
44                             Packet p = outgoingQueue.removeFirst();
45                             if (p.header != null
46                                     && p.header.getType() != OpCode.ping
47                                     && p.header.getType() != OpCode.auth) {
48                                 pendingQueue.add(p);
49                             }
50                         }
51                     }
52                 }
53             }
54             if (outgoingQueue.isEmpty()) {
55                 disableWrite();
56             } else {
57                 enableWrite();
58             }
59             return packetReceived;
60         }
View Code

處理返回結果,主要處理connect返回結果和其他請求返回結果。

connect命令主要返回sessionId、sessionPasswd、timeout等(ClientCnxn.SendThread.readConnectResult):

 1 //讀取connect命令的結果
 2 void readConnectResult() throws IOException {
 3 //反序列化connect命令結果
 4             ByteBufferInputStream bbis = new ByteBufferInputStream(
 5                     incomingBuffer);
 6             BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
 7             ConnectResponse conRsp = new ConnectResponse();
 8             conRsp.deserialize(bbia, "connect");
 9             //獲取timeout,session等資訊
10             readTimeout = negotiatedSessionTimeout * 2 / 3;
11             connectTimeout = negotiatedSessionTimeout / serverAddrs.size();
12             sessionId = conRsp.getSessionId();
13             sessionPasswd = conRsp.getPasswd();
14             zooKeeper.state = States.CONNECTED;
15 //向訊息佇列放入連線成功訊息
16             eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None,
17                     Watcher.Event.KeeperState.SyncConnected, null));
18         }
View Code

其他返回結果,xid=-2為ping命令的返回結果;xid=-4為auth命令;xid=-1為伺服器傳送的notification;其他命令返回結果。

 1         void readResponse() throws IOException {
 2             //對返回資料進行反序列化
 3             ByteBufferInputStream bbis = new ByteBufferInputStream(
 4                     incomingBuffer);
 5             BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
 6             ReplyHeader replyHdr = new ReplyHeader();
 7             replyHdr.deserialize(bbia, "header");
 8             //根據返回頭資訊,封裝想要的事件,放入事件佇列中,交給eventthread處理
 9 //向訊息佇列放入驗證失敗訊息
10             if (replyHdr.getXid() == -4) {
11                  // -4 is the xid for AuthPacket               
12                 if(replyHdr.getErr() == KeeperException.Code.AUTHFAILED.intValue()) {
13                     zooKeeper.state = States.AUTH_FAILED;                    
14                     eventThread.queueEvent( new WatchedEv