
Zookeeper Source Code Reading (9): ZK Client-Server (2)


Preface

The previous post described the client-server connection setup mostly at the level of the overall flow; this post and the next one expand on and fill in the parts that were not covered in much detail last time.

Initialization Phase

The initialization phase is mainly about instantiating the important helper classes inside the ZooKeeper class; the previous post already covered this process in some detail. A few points to add here:

  1. ClientCnxn initialization

cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
        hostProvider, sessionTimeout, this, watchManager,
        getClientCnxnSocket(), canBeReadOnly);

// the call above ends up in this full constructor (sessionId and sessionPasswd default to 0 and an empty password)
public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,
        ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket,
        long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) {

Note that one very important parameter of the ClientCnxn constructor is the ClientCnxnSocket, the class that actually establishes the connection between client and server. Let's see how it is obtained.

private static ClientCnxnSocket getClientCnxnSocket() throws IOException {
    // read the system property
    String clientCnxnSocketName = System
            .getProperty(ZOOKEEPER_CLIENT_CNXN_SOCKET);
    // if nothing was configured, fall back to the NIO implementation as the default
    if (clientCnxnSocketName == null) {
        clientCnxnSocketName = ClientCnxnSocketNIO.class.getName();
    }
    try {
        // instantiate the implementation via reflection
        return (ClientCnxnSocket) Class.forName(clientCnxnSocketName).getDeclaredConstructor()
                .newInstance();
    } catch (Exception e) {
        IOException ioe = new IOException("Couldn't instantiate "
                + clientCnxnSocketName);
        ioe.initCause(e);
        throw ioe;
    }
}

As you can see, the ClientCnxnSocket instance is obtained via reflection.

If you recall, it was through ClientCnxn (and its ClientCnxnSocket) that watcher registrations were sent earlier; in fact all traffic between client and server goes through this class, via methods such as doTransport and doIO, which use NIO under the hood. I'll come back and fill in this part once I have a better grip on NIO and Netty.
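
For reference, the implementation can be swapped without touching any code by setting that system property before the client is created. A minimal sketch (the property key is the value of ZOOKEEPER_CLIENT_CNXN_SOCKET, i.e. "zookeeper.clientCnxnSocket"; the Netty class named below only exists in releases that ship it, otherwise the NIO default is used):

// Minimal sketch: pick the ClientCnxnSocket implementation via the system property.
// Assumes a ZooKeeper release that ships ClientCnxnSocketNetty; otherwise the NIO default is used.
import org.apache.zookeeper.ZooKeeper;

public class CnxnSocketSelection {
    public static void main(String[] args) throws Exception {
        // same key that getClientCnxnSocket() reads (ZOOKEEPER_CLIENT_CNXN_SOCKET)
        System.setProperty("zookeeper.clientCnxnSocket",
                "org.apache.zookeeper.ClientCnxnSocketNetty");
        // getClientCnxnSocket() will now reflectively instantiate the Netty variant
        ZooKeeper zk = new ZooKeeper("127.0.0.1:2181", 30000, event -> { });
        System.out.println("client state: " + zk.getState());
        zk.close();
    }
}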

  2. The wait inside StaticHostProvider

public InetSocketAddress next(long spinDelay) {
    // every attempt advances currentIndex by one
    ++currentIndex;
    // all servers have been tried, wrap around
    if (currentIndex == serverAddresses.size()) {
        currentIndex = 0;
    }
    // after a full pass currentIndex catches up with lastIndex
    if (currentIndex == lastIndex && spinDelay > 0) {
        try {
            Thread.sleep(spinDelay);
        } catch (InterruptedException e) {
            LOG.warn("Unexpected exception", e);
        }
    } else if (lastIndex == -1) {
        // We don't want to sleep on the first ever connect attempt.
        lastIndex = 0;
    }

    return serverAddresses.get(currentIndex);
}

At first this method looks odd: a next() method usually takes no argument, yet this one takes a delay. Looking closer, there is a specific reason: if every server has been tried and none could be reached, the method sleeps for spinDelay milliseconds before the next attempt.
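
To make the round-robin plus back-off behaviour concrete, here is a small sketch (my own demo, assuming the StaticHostProvider constructor that takes a collection of addresses; the three loopback addresses are placeholders, and the provider may shuffle the list, so the exact order can differ):

import java.net.InetSocketAddress;
import java.util.Arrays;
import org.apache.zookeeper.client.StaticHostProvider;

public class HostProviderDemo {
    public static void main(String[] args) throws Exception {
        StaticHostProvider hp = new StaticHostProvider(Arrays.asList(
                new InetSocketAddress("127.0.0.1", 2181),
                new InetSocketAddress("127.0.0.1", 2182),
                new InetSocketAddress("127.0.0.1", 2183)));
        // The first pass over the three addresses returns immediately; once the index
        // wraps back around (a full pass without onConnected() having been called),
        // next(1000) sleeps about one second before handing out the next address.
        for (int i = 0; i < 5; i++) {
            InetSocketAddress addr = hp.next(1000);
            System.out.println(i + " -> " + addr);
        }
    }
}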

Creation Phase

As the previous post explained, the creation process begins when the ZooKeeper constructor calls clientCnxn's start method, i.e. when the SendThread and EventThread start running.

public void start() {
    sendThread.start();
    eventThread.start();
}

Establishing the connection is mostly the SendThread's job: calling start() effectively kicks off SendThread's run method. Since run() is fairly involved, we'll start with SendThread's other methods and go through run() last, which keeps things clearer. SendThread's responsibilities were described before; here is someone else's summary, quoted so you can cross-check it while reading the methods below:

  1. It maintains the session lifecycle between client and server (by sending PING packets to the server at a regular interval as a heartbeat). If the TCP connection breaks within the session, it automatically and transparently reconnects.
  2. It manages all request sending and response receiving on the client side, translating high-level client API calls into the corresponding request protocol, sending them to the server, and completing the return of synchronous calls and the callbacks of asynchronous ones.
  3. It passes events coming from the server on to the EventThread for processing.


Let's walk through the more important methods:

sendPing

This method implements the first responsibility of SendThread: it keeps the ping traffic, i.e. the heartbeat, going between client and server.

private void sendPing() {
    lastPingSentNs = System.nanoTime();  // remember when this ping was sent (nano time)
    RequestHeader h = new RequestHeader(-2, OpCode.ping);  // build the special ping request header (xid -2)
    queuePacket(h, null, null, null, null, null, null, null, null);  // put the packet on the outgoingQueue
}

One thing worth noting here:

if (p.requestHeader != null
        && p.requestHeader.getType() != OpCode.ping
        && p.requestHeader.getType() != OpCode.auth) {
    synchronized (pendingQueue) {
        pendingQueue.add(p);
    }
}

This is a fragment from ClientCnxnSocketNIO's doIO method. The point is that if the request header is for a ping or auth, or is null, the packet is not added to the pendingQueue after it has been sent.

In SendThread's readResponse, replies to ping and auth get special treatment; that method was analysed in part 8.

if (replyHdr.getXid() == -2) {
    // -2 is the xid for pings
    ...
}
if (replyHdr.getXid() == -4) {
    // -4 is the xid for AuthPacket
    ...
}

pingRwServer

This method is used when the client is connected to a read-only server: it probes the servers from the hostProvider looking for a read/write one.

private void pingRwServer() throws RWServerFoundException {
    String result = null;
    InetSocketAddress addr = hostProvider.next(0);  // next server address
    LOG.info("Checking server " + addr + " for being r/w." +
            " Timeout " + pingRwTimeout);

    Socket sock = null;
    BufferedReader br = null;
    try {
        // set up the probe socket
        sock = new Socket(addr.getHostName(), addr.getPort());
        sock.setSoLinger(false, -1);
        sock.setSoTimeout(1000);
        sock.setTcpNoDelay(true);
        sock.getOutputStream().write("isro".getBytes());
        sock.getOutputStream().flush();
        sock.shutdownOutput();
        br = new BufferedReader(
                new InputStreamReader(sock.getInputStream()));  // reader for the server's reply
        result = br.readLine();  // read the reply ("rw" or "ro")
    } catch (ConnectException e) {
        // ignore, this just means server is not up
    } catch (IOException e) {
        // some unexpected error, warn about it
        LOG.warn("Exception while seeking for r/w server " +
                e.getMessage(), e);
    } finally {
        // guard clauses: always close the socket and the reader
        if (sock != null) {
            try {
                sock.close();
            } catch (IOException e) {
                LOG.warn("Unexpected exception", e);
            }
        }
        if (br != null) {
            try {
                br.close();
            } catch (IOException e) {
                LOG.warn("Unexpected exception", e);
            }
        }
    }

    // a read/write server was found
    if ("rw".equals(result)) {
        pingRwTimeout = minPingRwTimeout;
        // save the found address so that it's used during the next
        // connection attempt
        rwServerAddress = addr;  // remember this server, update rwServerAddress
        throw new RWServerFoundException("Majority server found at "
                + addr.getHostName() + ":" + addr.getPort());  // found a r/w server; run() catches this and the client reconnects to rwServerAddress
    }
}

The part of run() that catches and handles this exception looks like this:

else if (e instanceof RWServerFoundException) {
    LOG.info(e.getMessage());
} else {
    LOG.warn(
            "Session 0x"
                    + Long.toHexString(getSessionId())
                    + " for server "
                    + clientCnxnSocket.getRemoteSocketAddress()
                    + ", unexpected error"
                    + RETRY_CONN_MSG, e);
}
cleanup();
if (state.isAlive()) {
    eventThread.queueEvent(new WatchedEvent(
            Event.EventType.None,
            Event.KeeperState.Disconnected,
            null));  // put a Disconnected event on the waitingEvents queue
}

So after the RWServerFoundException is caught, the connection is cleaned up and a Disconnected event is queued for the EventThread (so the application sees the disconnect); on the next pass of the run() loop the SendThread reconnects, and the server it reconnects to is the rwServerAddress saved earlier.

startConnect

As the name makes obvious, this method is used to establish the connection.

private void startConnect() throws IOException {
    // initializing it for new connection
    saslLoginFailed = false;
    state = States.CONNECTING;

    InetSocketAddress addr;  // address to connect the socket to
    if (rwServerAddress != null) {
        addr = rwServerAddress;  // use the read/write server found earlier
        rwServerAddress = null;  // clear it so that after a disconnect the next attempt can pick a different server
    } else {
        addr = hostProvider.next(1000);  // no r/w address cached, take the next one from hostProvider
    }

    setName(getName().replaceAll("\\(.*\\)",
            "(" + addr.getHostName() + ":" + addr.getPort() + ")"));  // rename the thread after the target server
    if (ZooKeeperSaslClient.isEnabled()) {  // SASL is enabled (worth a closer look some other time)
        try {
            // initialize the principal user name and the SASL client
            String principalUserName = System.getProperty(
                    ZK_SASL_CLIENT_USERNAME, "zookeeper");
            zooKeeperSaslClient =
                new ZooKeeperSaslClient(
                        principalUserName+"/"+addr.getHostName());
        } catch (LoginException e) {
            // An authentication error occurred when the SASL client tried to initialize:
            // for Kerberos this means that the client failed to authenticate with the KDC.
            // This is different from an authentication error that occurs during communication
            // with the Zookeeper server, which is handled below.
            LOG.warn("SASL configuration failed: " + e + " Will continue connection to Zookeeper server without "
              + "SASL authentication, if Zookeeper server allows it.");
            eventThread.queueEvent(new WatchedEvent(
              Watcher.Event.EventType.None,
              Watcher.Event.KeeperState.AuthFailed, null));
            saslLoginFailed = true;
        }
    }
    logStartConnect(addr);  // log the attempt

    clientCnxnSocket.connect(addr);  // start the socket connection
}

To summarize, the main steps are:

  1. Initialize the relevant fields;
  2. Pick the server address to connect to;
  3. Handle SASL setup, including its fields and exception handling;
  4. Log;
  5. Connect.

primeConnection

First a quick look at the code:

void primeConnection() throws IOException {
    LOG.info("Socket connection established to "
             + clientCnxnSocket.getRemoteSocketAddress()
             + ", initiating session");
    isFirstConnect = false;
    // seenRwServerBefore is set to true the first time we connect to a r/w server
    long sessId = (seenRwServerBefore) ? sessionId : 0;  // reuse the previous sessionId if we have been on a r/w server before, otherwise 0
    ConnectRequest conReq = new ConnectRequest(0, lastZxid,
            sessionTimeout, sessId, sessionPasswd);  // build the ConnectRequest
    synchronized (outgoingQueue) {
        // We add backwards since we are pushing into the front
        // Only send if there's a pending watch
        // TODO: here we have the only remaining use of zooKeeper in
        // this class. It's to be eliminated!
        // after reconnecting to a r/w server, put all watch and auth info into outgoingQueue so the server can be brought back in sync
        if (!disableAutoWatchReset) {  // is automatic watch reset enabled?
            List<String> dataWatches = zooKeeper.getDataWatches();
            List<String> existWatches = zooKeeper.getExistWatches();
            List<String> childWatches = zooKeeper.getChildWatches();
            if (!dataWatches.isEmpty()
                        || !existWatches.isEmpty() || !childWatches.isEmpty()) {

                Iterator<String> dataWatchesIter = prependChroot(dataWatches).iterator();
                Iterator<String> existWatchesIter = prependChroot(existWatches).iterator();
                Iterator<String> childWatchesIter = prependChroot(childWatches).iterator();
                long setWatchesLastZxid = lastZxid;

                // iterate over all registered watches
                while (dataWatchesIter.hasNext()
                               || existWatchesIter.hasNext() || childWatchesIter.hasNext()) {
                    List<String> dataWatchesBatch = new ArrayList<String>();
                    List<String> existWatchesBatch = new ArrayList<String>();
                    List<String> childWatchesBatch = new ArrayList<String>();
                    int batchLength = 0;

                    // Note, we may exceed our max length by a bit when we add the last
                    // watch in the batch. This isn't ideal, but it makes the code simpler.
                    // each batch is capped at 128 kB
                    while (batchLength < SET_WATCHES_MAX_LENGTH) {
                        final String watch;
                        if (dataWatchesIter.hasNext()) {
                            watch = dataWatchesIter.next();
                            dataWatchesBatch.add(watch);
                        } else if (existWatchesIter.hasNext()) {
                            watch = existWatchesIter.next();
                            existWatchesBatch.add(watch);
                        } else if (childWatchesIter.hasNext()) {
                            watch = childWatchesIter.next();
                            childWatchesBatch.add(watch);
                        } else {
                            break;
                        }
                        batchLength += watch.length();
                    }

                    // build the SetWatches request
                    SetWatches sw = new SetWatches(setWatchesLastZxid,
                            dataWatchesBatch,
                            existWatchesBatch,
                            childWatchesBatch);
                    RequestHeader h = new RequestHeader();
                    h.setType(ZooDefs.OpCode.setWatches);  // request type
                    h.setXid(-8);
                    Packet packet = new Packet(h, new ReplyHeader(), sw, null, null);
                    outgoingQueue.addFirst(packet);  // push onto the outgoingQueue
                }
            }
        }

        // put the auth info onto the outgoingQueue
        for (AuthData id : authInfo) {
            outgoingQueue.addFirst(new Packet(new RequestHeader(-4,
                    OpCode.auth), null, new AuthPacket(0, id.scheme,
                    id.data), null, null));
        }
        outgoingQueue.addFirst(new Packet(null, null, conReq,
                    null, null, readOnly));
    }
    // enable reads and writes on the socket (start sending)
    clientCnxnSocket.enableReadWriteOnly();
    if (LOG.isDebugEnabled()) {
        LOG.debug("Session establishment request sent on "
                + clientCnxnSocket.getRemoteSocketAddress());  // log
    }
}

As you can see, primeConnection is mainly about re-synchronizing the watch and auth information after (re)connecting to a r/w server. There are three main steps: 1. set the session id; 2. build the data to synchronize and push it onto the outgoingQueue; 3. enable reads and writes.

One line deserves special emphasis:

outgoingQueue.addFirst(new Packet(null, null, conReq, null, null, readOnly));

After the auth data has been queued above, the packet carrying the ConnectRequest is pushed to the very front of the queue, which makes sure the connect request is the first thing sent (and therefore that the first response is handled by ClientCnxnSocket#readConnectResult). This point is taken from the referenced blog, and at first I wondered how the first reply received is guaranteed to belong to this connect packet. As far as I can tell the answer is that the server handles requests from one connection in order and replies in the same order, and on the client side ClientCnxnSocketNIO routes incoming data to readConnectResult for as long as the connection is not yet marked initialized, so the first reply off the wire is always treated as the connect result.

The relationship between startConnect and primeConnection (quoted):

The difference between the two lies in the NIO SelectionKey:
the former is limited to connect and accept;
the latter runs once the connection has completed and starts write and read, ready to do socket I/O with the zk server.
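
As an illustration of that SelectionKey difference, here is a generic NIO sketch (not ZooKeeper's actual code): the channel is first registered for OP_CONNECT only; once finishConnect() succeeds, the interest set is switched to OP_READ | OP_WRITE, which roughly corresponds to what startConnect and primeConnection/enableReadWriteOnly do:

import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

public class NioConnectSketch {
    public static void main(String[] args) throws Exception {
        Selector selector = Selector.open();
        SocketChannel sock = SocketChannel.open();
        sock.configureBlocking(false);
        // phase 1 (startConnect): non-blocking connect, interested only in OP_CONNECT
        sock.connect(new InetSocketAddress("127.0.0.1", 2181));
        SelectionKey key = sock.register(selector, SelectionKey.OP_CONNECT);

        // poll a few times for the connect to complete (a real client loops forever)
        for (int i = 0; i < 10 && key.interestOps() == SelectionKey.OP_CONNECT; i++) {
            selector.select(1000);
            if (key.isConnectable() && sock.finishConnect()) {
                // phase 2 (primeConnection): connected, now watch read/write readiness
                key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
                System.out.println("connected, switched to OP_READ | OP_WRITE");
            }
            selector.selectedKeys().clear();
        }
        sock.close();
        selector.close();
    }
}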

One blog post describes the overall creation process quite well; quoting it here:

  1. Start the SendThread.
  2. Connect to the server (see SendThread.startConnect).
  3. Create the actual socket, see ClientCnxnSocketNIO.createSock.
  4. Register an OP_CONNECT event with the selector and connect to the server. Because the connect is non-blocking it may not complete immediately; if it does, SendThread.primeConnection is called to initialize the connection and register read/write events, otherwise the connect event is handled in a later select poll.
  5. Reset the socket's incomingBuffer.
  6. Once connected, a connect-type request is sent to the server to obtain the sessionId for this connection.
  7. Enter the loop waiting for requests from the application; if there are none, ping the server based on the elapsed time.

The reason for quoting this is that it explains the difference between startConnect and primeConnection well: in step 2, startConnect establishes the connection and primeConnection is then called. startConnect is about making the connection itself, while primeConnection() mainly initializes the session, watch and auth information, and registers ClientCnxnSocketNIO's interest in read and write events.

onConnected

As the comment and the method name make clear, this is the callback invoked once the socket connection has been established.

/**
 * Callback invoked by the ClientCnxnSocket once a connection has been
 * established.
 * 
 * @param _negotiatedSessionTimeout
 * @param _sessionId
 * @param _sessionPasswd
 * @param isRO
 * @throws IOException
 */
void onConnected(int _negotiatedSessionTimeout, long _sessionId,
        byte[] _sessionPasswd, boolean isRO) throws IOException {
    negotiatedSessionTimeout = _negotiatedSessionTimeout;  // store the negotiated session timeout
    if (negotiatedSessionTimeout <= 0) {  // the session could not be (re-)established
        state = States.CLOSED;  // state -> CLOSED

        eventThread.queueEvent(new WatchedEvent(
                Watcher.Event.EventType.None,
                Watcher.Event.KeeperState.Expired, null));
        eventThread.queueEventOfDeath();  // queue the Expired event and then the "event of death" onto the waitingEvents queue

        String warnInfo;
        warnInfo = "Unable to reconnect to ZooKeeper service, session 0x"
            + Long.toHexString(sessionId) + " has expired";
        LOG.warn(warnInfo);
        throw new SessionExpiredException(warnInfo);  // log and throw
    }
    if (!readOnly && isRO) {
        LOG.error("Read/write client got connected to read-only server");
    }
    readTimeout = negotiatedSessionTimeout * 2 / 3;  // why 2/3 of the real timeout? so the client notices a silent server before the session actually expires
    connectTimeout = negotiatedSessionTimeout / hostProvider.size();  // spread the timeout evenly over all servers
    hostProvider.onConnected();  // update the hostProvider's state
    sessionId = _sessionId;
    sessionPasswd = _sessionPasswd;
    state = (isRO) ?
            States.CONNECTEDREADONLY : States.CONNECTED;  // record the connection state and session info
    seenRwServerBefore |= !isRO;
    LOG.info("Session establishment complete on server "
            + clientCnxnSocket.getRemoteSocketAddress()
            + ", sessionid = 0x" + Long.toHexString(sessionId)
            + ", negotiated timeout = " + negotiatedSessionTimeout
            + (isRO ? " (READ-ONLY mode)" : ""));  // log
    KeeperState eventState = (isRO) ?
            KeeperState.ConnectedReadOnly : KeeperState.SyncConnected;  // read-only or normal connection?
    eventThread.queueEvent(new WatchedEvent(
            Watcher.Event.EventType.None,
            eventState, null));  // queue the "connection established" event
}
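
A quick worked example of those two assignments (the numbers are made up): with a negotiated session timeout of 30000 ms and three servers in the connect string, readTimeout = 30000 * 2 / 3 = 20000 ms and connectTimeout = 30000 / 3 = 10000 ms, so the client notices a silent server well before the session actually expires, and a full round of connect attempts over all servers fits inside one session timeout.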

readResponse

This method is fairly long, so let's analyse it piece by piece.

ByteBufferInputStream bbis = new ByteBufferInputStream(
        incomingBuffer);
BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);  // archive used for deserialization
ReplyHeader replyHdr = new ReplyHeader();

replyHdr.deserialize(bbia, "header");  // parse the reply header

Once the reply header has been parsed, the handling logic begins.

if (replyHdr.getXid() == -2) {  // reply to a ping
    // -2 is the xid for pings
    if (LOG.isDebugEnabled()) {
        LOG.debug("Got ping response for sessionid: 0x"
                + Long.toHexString(sessionId)
                + " after "
                + ((System.nanoTime() - lastPingSentNs) / 1000000)
                + "ms");  // log
    }
    return;
}
if (replyHdr.getXid() == -4) {  // reply to an auth packet
    // -4 is the xid for AuthPacket
    if(replyHdr.getErr() == KeeperException.Code.AUTHFAILED.intValue()) {  // authentication failed
        state = States.AUTH_FAILED;  // record the state
        eventThread.queueEvent( new WatchedEvent(Watcher.Event.EventType.None, 
            Watcher.Event.KeeperState.AuthFailed, null) );  // put an AuthFailed event on the waitingEvents queue
    }
    if (LOG.isDebugEnabled()) {
        LOG.debug("Got auth sessionid:0x"
                + Long.toHexString(sessionId));  // log
    }
    return;
}
if (replyHdr.getXid() == -1) {  // a notification pushed by the server
    // -1 means notification
    if (LOG.isDebugEnabled()) {
        LOG.debug("Got notification sessionid:0x"
                  + Long.toHexString(sessionId));  // log
    }
    WatcherEvent event = new WatcherEvent();
    event.deserialize(bbia, "response");

    // convert from a server path to a client path
    if (chrootPath != null) {
        String serverPath = event.getPath();
        if(serverPath.compareTo(chrootPath)==0)
            event.setPath("/");  // the chroot root itself maps to "/" on the client
        else if (serverPath.length() > chrootPath.length())
            event.setPath(serverPath.substring(chrootPath.length()));  // strip the chroot prefix
        else {
            LOG.warn("Got server path " + event.getPath()
                     + " which is too short for chroot path "
                     + chrootPath);  // the server path is shorter than chrootPath, which should not happen; log it
        }
    }

    WatchedEvent we = new WatchedEvent(event);  // turn the WatcherEvent into a WatchedEvent
    if (LOG.isDebugEnabled()) {
        LOG.debug("Got " + we + " for sessionid 0x"
                  + Long.toHexString(sessionId));
    }

    eventThread.queueEvent( we );  // put it on the waitingEvents queue
    return;
}
// If SASL authentication is currently in progress, construct and
// send a response packet immediately, rather than queuing a
// response as with other packets.
// (i.e. while SASL authentication is still in progress the token is answered directly instead of being queued)
if (clientTunneledAuthenticationInProgress()) {
    GetSASLRequest request = new GetSASLRequest();
    request.deserialize(bbia,"token");
    zooKeeperSaslClient.respondToServer(request.getToken(),
      ClientCnxn.this);
    return;
}
Packet packet;
synchronized (pendingQueue) {  // everything else is matched against the pendingQueue
    // as analysed above, auth and ping packets and in-progress SASL exchanges never get here (they are not in the pendingQueue),
    // and triggered watches are not in the pendingQueue either: they are server-initiated notifications looked up in the
    // watch manager; packets for asynchronous calls (with an AsyncCallback), however, do sit in the pendingQueue
    if (pendingQueue.size() == 0) {
        throw new IOException("Nothing in the queue, but got "
                + replyHdr.getXid());  // nothing to match this reply against
    }
    packet = pendingQueue.remove();  // take the oldest pending packet
}
/*
 * Since requests are processed in order, we better get a response
 * to the first request!
 */
try {
    if (packet.requestHeader.getXid() != replyHdr.getXid()) {  // replies arrived out of order
        packet.replyHeader.setErr(
                KeeperException.Code.CONNECTIONLOSS.intValue());
        throw new IOException("Xid out of order. Got Xid "
                + replyHdr.getXid() + " with err " +
                + replyHdr.getErr() +
                " expected Xid "
                + packet.requestHeader.getXid()
                + " for a packet with details: "
                + packet );
    }

    // copy the reply header fields onto the packet
    packet.replyHeader.setXid(replyHdr.getXid());
    packet.replyHeader.setErr(replyHdr.getErr());
    packet.replyHeader.setZxid(replyHdr.getZxid());
    if (replyHdr.getZxid() > 0) {
        lastZxid = replyHdr.getZxid();
    }
    if (packet.response != null && replyHdr.getErr() == 0) {
        packet.response.deserialize(bbia, "response");  // deserialize the response body
    }

    if (LOG.isDebugEnabled()) {
        LOG.debug("Reading reply sessionid:0x"
                + Long.toHexString(sessionId) + ", packet:: " + packet);  // log
    }
} finally {
    finishPacket(packet);  // finish the packet: as described in the watcher-mechanism post, this is where packets with registered watchers/callbacks get queued onto waitingEvents
}

Summary:

  1. Deserialize the response;
  2. Dispatch on the reply header: ping, auth and SASL replies are handled and returned immediately, without being put on the waitingEvents queue;
  3. A notification from the server represents an event and is queued;
  4. Everything else is matched against the already-sent packets in the pendingQueue. (The reserved xids involved are recapped right after this list.)
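
For quick reference, the reserved xids that readResponse dispatches on (all of them appear in the code above):

  1. xid -1: a watch notification pushed by the server;
  2. xid -2: the reply to a ping;
  3. xid -4: the reply to an AuthPacket;
  4. xid -8: the xid primeConnection assigns to its SetWatches requests.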

run

run is the most central method of SendThread, and of connection establishment as a whole. It is quite long, so let's go through it section by section.

@Override
public void run() {
    clientCnxnSocket.introduce(this,sessionId);
    clientCnxnSocket.updateNow();
    clientCnxnSocket.updateLastSendAndHeard();
    int to;
    long lastPingRwServer = Time.currentElapsedTime();
    final int MAX_SEND_PING_INTERVAL = 10000; //10 seconds

Clearly, this opening part is just initialization work related to clientCnxnSocket.

while (state.isAlive()) {
    try {
        if (!clientCnxnSocket.isConnected()) {  // not connected yet
            if(!isFirstConnect){
                try {
                    Thread.sleep(r.nextInt(1000));
                } catch (InterruptedException e) {
                    LOG.warn("Unexpected exception", e);
                }
            }
            // don't re-establish connection if we are closing
            if (closing || !state.isAlive()) {  // see note (1) at the end of this section
                break;
            }
            startConnect();  // connect
            clientCnxnSocket.updateLastSendAndHeard();
        }

Next it checks the connection state with the server: if there is no connection, startConnect() is called to connect; if the connection is already up, pings are sent periodically as a heartbeat.

if (state.isConnected()) {
    // determine whether we need to send an AuthFailed event.
    if (zooKeeperSaslClient != null) {
        boolean sendAuthEvent = false;
        if (zooKeeperSaslClient.getSaslState() == ZooKeeperSaslClient.SaslState.INITIAL) {  // SASL state
            try {
                zooKeeperSaslClient.initialize(ClientCnxn.this);  // SASL initialization, to be studied separately
            } catch (SaslException e) {
               LOG.error("SASL authentication with Zookeeper Quorum member failed: " + e);
                state = States.AUTH_FAILED;
                sendAuthEvent = true;
            }
        }
        KeeperState authState = zooKeeperSaslClient.getKeeperState();  // authentication state
        if (authState != null) {
            if (authState == KeeperState.AuthFailed) {  // authentication failed
                // An authentication error occurred during authentication with the Zookeeper Server.
                state = States.AUTH_FAILED;
                sendAuthEvent = true;
            } else {
                if (authState == KeeperState.SaslAuthenticated) {
                    sendAuthEvent = true;
                }
            }
        }

        if (sendAuthEvent == true) {
            eventThread.queueEvent(new WatchedEvent(
                  Watcher.Event.EventType.None,
                  authState,null));
        }
    }
    // readTimeout = 2/3 of the session timeout
    to = readTimeout - clientCnxnSocket.getIdleRecv();  // connected: read timeout minus the time elapsed since the last read
} else {
    to = connectTimeout - clientCnxnSocket.getIdleRecv();  // not connected: connect timeout minus the time since the last read; either way "to" is how much of the budget is left
}

if (to <= 0) {  // timed out
    String warnInfo;
    warnInfo = "Client session timed out, have not heard from server in "
        + clientCnxnSocket.getIdleRecv()
        + "ms"
        + " for sessionid 0x"
        + Long.toHexString(sessionId);
    LOG.warn(warnInfo);
    throw new SessionTimeoutException(warnInfo);  // log and throw
}
if (state.isConnected()) {  // connected, so keep the heartbeat going
   //1000(1 second) is to prevent race condition missing to send the second ping
   //also make sure not to send too many pings when readTimeout is small 
// compute the time until the next ping; the 1000 ms adjustment keeps a small readTimeout from producing too many pings
    int timeToNextPing = readTimeout / 2 - clientCnxnSocket.getIdleSend() - 
          ((clientCnxnSocket.getIdleSend() > 1000) ? 1000 : 0);
    //send a ping request either time is due or no packet sent out within MAX_SEND_PING_INTERVAL
    if (timeToNextPing <= 0 || clientCnxnSocket.getIdleSend() > MAX_SEND_PING_INTERVAL) {  // half of readTimeout has passed, or nothing has been sent for over ten seconds
        sendPing();  // send a ping
        clientCnxnSocket.updateLastSend();
    } else {
        if (timeToNextPing < to) {  // wake up in time for the next ping if it is due before the timeout
            to = timeToNextPing;
        }
    }
}
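
A worked example with made-up numbers: if the session timeout is 30000 ms, readTimeout is 20000 ms; if 6000 ms have passed since the last send, timeToNextPing = 20000 / 2 - 6000 - 1000 = 3000 ms, so the next select() waits at most 3 s before a ping goes out. And if nothing has been sent for more than MAX_SEND_PING_INTERVAL (10 s), a ping is sent immediately regardless.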

Next comes a decision based on the kind of server we are connected to: if it is a read-only server, the client automatically goes looking for a read/write one.

// If we are in read-only mode, seek for read/write server
if (state == States.CONNECTEDREADONLY) {  // CONNECTEDREADONLY means the server we are on is read-only
    long now = Time.currentElapsedTime();
    int idlePingRwServer = (int) (now - lastPingRwServer);  // time since the last probe for a r/w server
    if (idlePingRwServer >= pingRwTimeout) {
        lastPingRwServer = now;  // remember when we last probed
        idlePingRwServer = 0;
        pingRwTimeout =
            Math.min(2*pingRwTimeout, maxPingRwTimeout);
        pingRwServer();  // try to find a read/write server
    }
    to = Math.min(to, pingRwTimeout - idlePingRwServer);
}
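
Note the exponential back-off here: pingRwTimeout doubles after every probe, capped at maxPingRwTimeout, so (assuming, say, a 100 ms minimum and a 60 s cap) the probe intervals go 100, 200, 400, ... up to 60000 ms, and a client stuck on a read-only server probes less and less aggressively while still eventually finding a read/write server.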

Then the requests in the outgoingQueue are sent and the server's replies are read.

clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue, ClientCnxn.this);

Finally comes some cleanup and the handling of the disconnect. At this point we have already left the loop above; a few things are worth noting:

cleanup();  // clean up the connection and the queues
clientCnxnSocket.close();  // close the socket
if (state.isAlive()) {  // see note (1) below
    eventThread.queueEvent(new WatchedEvent(Event.EventType.None,
            Event.KeeperState.Disconnected, null));  // queue a Disconnected event
}
ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(),
        "SendThread exited loop for session: 0x"
               + Long.toHexString(getSessionId()));  // log

(1) The loop condition above is while (state.isAlive()), so you might wonder why the state is checked again after leaving the loop. The reason lies in this check inside the loop:

// don't re-establish connection if we are closing
if (closing || !state.isAlive()) {
    break;
}

At the top of each iteration, run() checks the closing flag. closing is only set to true after the client has actively asked to close the connection, so the only way to break out of the while loop with state still alive is exactly that case: the client initiated the disconnect. When that happens, a Disconnected event is handed to the EventThread for processing.

To summarize, the run method does the following:

  1. Initializes clientCnxnSocket and the related fields;
  2. If the client is not connected to a server, it tries to connect;
  3. Connected or not, it checks whether the connection has timed out;
  4. Once connected, it periodically sends heartbeats to check the connection to the server;
  5. If it is connected to a read-only server, it actively looks for a read/write server;
  6. It sends the requests in the outgoingQueue and receives the server's replies;
  7. It also covers the cleanup work when the connection is closed.

That pretty much wraps up the creation phase. I now have a rough understanding of the main flow and how things are handled, but there are a lot of details along the way that will need a closer look later, when I actually run into them.

Thoughts

primeConnection and the ordering of requests and responses

As discussed above: how is the ordering guaranteed? (See the note in the primeConnection section.)

RW and read-only modes

Under what conditions does a server run in each of these two modes?

How sessions work

sessId, sessionId

sessId is really just the sessionId. seenRwServerBefore is set to true the first time the client connects to a read/write server; before any connection exists sessId is 0, so the ConnectRequest built for the very first connection carries a sessionId of 0.

Requests that are not in the pendingQueue

auth, ping and in-progress SASL packets are never added to the pendingQueue, and triggered watches are not in it either. As explained in the first reference of the previous post about notification events, a triggered watch is a notification pushed by the server, so it never lives in the pendingQueue. As for the special handling of auth and ping: in part 8 the meaning of replyHdr's xid wasn't clear to me and I raised the question in that post's "Thoughts" section; it is now clear that those values are reserved for auth and ping.

sendPing and pingRwServer

The former is the heartbeat; the latter is the attempt to find a read/write server after connecting to a read-only one.

The overall connection process

Connecting to a ZooKeeper server involves establishing two layers of connection:

  1. The TCP connection between client and server.

  2. The session association on top of the TCP connection. Once the TCP connection is up, the client sends a ConnectRequest asking to establish a session; the server assigns a sessionId and password to the client and starts checking whether the session times out.
  3. If the TCP connection breaks within sessionTimeout, i.e. before the session expires, the server still considers that sessionId alive. The client then picks the next ZooKeeper server address and establishes a new TCP connection; once it is up, the client sends a ConnectRequest carrying the previous sessionId and password, and if the session has not yet timed out the reconnection succeeds automatically. This is transparent to the user: everything happens in the background and the ZooKeeper object stays valid.
  4. If by the time the new TCP connection is established the sessionId has already expired (the server will have cleaned up all data associated with it), the server returns a sessionTimeout of 0, a sessionId of 0 and an empty password. When the client receives this it checks whether the negotiated sessionTimeout is less than or equal to 0; if so, the EventThread first delivers a KeeperState.Expired event to the relevant watchers and then exits its loop, winding things down. At that point the ZooKeeper object is no longer usable; a new ZooKeeper object must be created, which gets a new sessionId. (A sketch of the usual way to handle this follows this list.)
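
To make point 4 concrete, here is a sketch of the usual client-side pattern (my own illustration, not ZooKeeper code): the Disconnected state is handled transparently by the SendThread's reconnect loop, but Expired is final, so the application has to close the dead handle and create a new ZooKeeper object, which gets a fresh sessionId:

import java.io.IOException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

public class ReconnectingClient implements Watcher {
    private static final String CONNECT_STRING = "127.0.0.1:2181";
    private volatile ZooKeeper zk;

    public void start() throws IOException {
        // a new handle means a new session negotiation (ConnectRequest with sessionId 0)
        zk = new ZooKeeper(CONNECT_STRING, 30000, this);
    }

    @Override
    public void process(WatchedEvent event) {
        if (event.getState() == Event.KeeperState.Expired) {
            try {
                zk.close();  // the expired handle can never become valid again
                start();     // rebuild; the server assigns a brand-new sessionId
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        // Disconnected/SyncConnected are left to the client library's own reconnect logic
    }

    public static void main(String[] args) throws Exception {
        new ReconnectingClient().start();
        Thread.sleep(Long.MAX_VALUE);  // keep the demo process alive
    }
}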

References

https://www.jianshu.com/p/f69e6de5f169

http://www.cnblogs.com/leesf456/p/6098255.html

https://my.oschina.net/pingpangkuangmo/blog/486780

https://blog.csdn.net/cnh294141800/article/details/53039482

https://www.cnblogs.com/shangxiaofei/p/7171882.html

《從Paxos到Zookeeper》 (From Paxos to ZooKeeper)
