1. 程式人生 > >Zookeeper原始碼閱讀(十三) Seesion(2)

Zookeeper原始碼閱讀(十三) Seesion(2)

前言

前一篇主要介紹了zookeeper的session的狀態,狀態之間的切換以及和session有關的實體session介面和sessiontrackimpl類的相關屬性。這一篇主要詳細說下session相關的流程。

session的建立

在ZookeeperServer的processConnectRequest方法中處理了session的建立和重新啟用的請求。

public void processConnectRequest(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException {
    ...
    cnxn.setSessionTimeout(sessionTimeout);
    // We don't want to receive any packets until we are sure that the
    // session is setup
    cnxn.disableRecv();
    long sessionId = connReq.getSessionId();
    if (sessionId != 0) {//如果sessionId已經存在
        long clientSessionId = connReq.getSessionId();
        LOG.info("Client attempting to renew session 0x"
                 + Long.toHexString(clientSessionId)
                 + " at " + cnxn.getRemoteSocketAddress());
        serverCnxnFactory.closeSession(sessionId);
        cnxn.setSessionId(sessionId);
        reopenSession(cnxn, sessionId, passwd, sessionTimeout);//重新啟用session
    } else {
        LOG.info("Client attempting to establish new session at "
                 + cnxn.getRemoteSocketAddress());
        createSession(cnxn, passwd, sessionTimeout);//建立session
    }
}

之後經過ZookeeperServer(createSession)->SessionTrackerImpl(createSession)->SessionTrackerImpl(addSession),在addSession方法中新建了session物件。

synchronized public void addSession(long id, int sessionTimeout) {
    sessionsWithTimeout.put(id, sessionTimeout);
    if (sessionsById.get(id) == null) {
        SessionImpl s = new SessionImpl(id, sessionTimeout, 0);//建立session物件,id的初始化和生成在前一篇有詳細講過
        sessionsById.put(id, s);//設定屬性
        if (LOG.isTraceEnabled()) {
            ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK,//打log
                    "SessionTrackerImpl --- Adding session 0x"
                    + Long.toHexString(id) + " " + sessionTimeout);
        }
    } else {
        if (LOG.isTraceEnabled()) {
            ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK,
                    "SessionTrackerImpl --- Existing session 0x"
                    + Long.toHexString(id) + " " + sessionTimeout);
        }
    }
    touchSession(id, sessionTimeout);//session啟用,後面詳細介紹
}

除了sessionid的生成外,關於timeout也需要強調下,timeout由客戶端確定(在客戶端),但必須在伺服器規定的最大的(20ticktime)和最小的timeout(ticktime2)之間。

可以看到,session建立的場景是比較簡單的,但是這是zookeeper server端處理session請求的部分,而zookeeper client端更新session的場景主要分為兩大類:

  1. 普通的讀寫請求,從上面可以看到,server在每次處理客戶端的讀寫請求時都會有啟用session的操作;
  2. ping請求。在第十篇client-server(2)中有簡單提到過。
if (state.isConnected()) {
   //1000(1 second) is to prevent race condition missing to send the second ping
   //also make sure not to send too many pings when readTimeout is small 
    int timeToNextPing = readTimeout / 2 - clientCnxnSocket.getIdleSend() - ((clientCnxnSocket.getIdleSend() > 1000) ? 1000 : 0);//根據和上次傳送時間的間隔和根據api傳入的sessiontimeout來決定,readtimeout = 1/2的sessiontimeout(SendThread的onconnected方法中賦值)
    //send a ping request either time is due or no packet sent out within MAX_SEND_PING_INTERVAL
    if (timeToNextPing <= 0 || clientCnxnSocket.getIdleSend() > MAX_SEND_PING_INTERVAL) {
        sendPing();
        clientCnxnSocket.updateLastSend();
    } else {
        if (timeToNextPing < to) {
            to = timeToNextPing;
        }
    }
}

可以看到,如果沒有其他的請求,client端大約會在1/3的sessiontimeout時間後會去嘗試ping server,而這個請求在server端也會被processconnectrequest處理,也會達到更新session的作用。

在client端傳送請求後,server端會建立/啟用session,在啟用session的過程中會有一些策略的處理,首先先看下session的分桶策略處理。

分桶

Zookeeper的會話管理主要是通過SessionTracker來負責,其採用了分桶策略(將類似的會話放在同一區塊中進行管理)進行管理,以便Zookeeper對會話進行不同區塊的隔離處理以及同一區塊的統一處理。

在之前提到的,在SessionTracker中主要存了三分和session有關的資料,sessionById:這是一個HashMap<Long,SessionImpl>型別的資料結構,用於根據sessionID來管理Session實體。sessionWithTimeout:這是一個ConcurrentHashMap<Long,Integer>型別的資料結構,用於根據sessionID來管理會話的超時時間;sessionSets:這是一個HashMap<Long,SessionSet>型別的資料結構,用於根據下次會話超時時間點來歸檔會話,便於進行會話管理和超時檢查。

而其實所謂的分桶策略,就是按照超時時間把不同的session放到一起統一管理,對session超時等判斷或處理也是按超時時間為單位進行操作,這樣也能大大提高效率,也就是說sessionSets中,每一個SessionSet(實際上是一個SessionImpl的hashset)就是一個桶,而桶的標識就是過期時間。

過期時間計算
long expireTime = roundToInterval(Time.currentElapsedTime() + timeout);//timeout = 介面傳入的sessiontimeout

過期時間是在roundToInterval方法中計算的。

private long roundToInterval(long time) {
    // We give a one interval grace period
    return (time / expirationInterval + 1) * expirationInterval;//expirationInterval一般情況是ticktime
}

這裡計算出的expireTime就是sessionSets中每一個桶的key,可以知道的是,每一個桶的expireTime最終計算結果一定是expirationInterval的倍數,而不同的session會根據他們啟用時間的不同放到不同的桶裡。

Session啟用

在建立session後,每次client發起請求(ping或者讀寫請求),server端都會重新啟用session,而這個過程就是session的啟用,也就是所謂的touchSession。會話啟用的過程使server可以檢測到client的存活並讓client保持連線狀態。整個過程的流程圖如下:

在程式碼中主要由SessionTrackerImpl的touchSession方法處理:

synchronized public boolean touchSession(long sessionId, int timeout) {
    if (LOG.isTraceEnabled()) {//打log
        ZooTrace.logTraceMessage(LOG,
                                 ZooTrace.CLIENT_PING_TRACE_MASK,
                                 "SessionTrackerImpl --- Touch session: 0x"
                + Long.toHexString(sessionId) + " with timeout " + timeout);
    }
    SessionImpl s = sessionsById.get(sessionId);//根據id從(id, session)mapping關係中取出對應的session
    // Return false, if the session doesn't exists or marked as closing
    if (s == null || s.isClosing()) {//判斷session的狀態,如果已經關閉就返回
        return false;
    }
    long expireTime = roundToInterval(Time.currentElapsedTime() + timeout);//計算新的超時時間
    if (s.tickTime >= expireTime) {//表明和上次啟用還在同一個桶中
        // Nothing needs to be done
        return true;
    }
    SessionSet set = sessionSets.get(s.tickTime);//從老桶中刪除
    if (set != null) {
        set.sessions.remove(s);
    }
    s.tickTime = expireTime;//更新過期時間
    set = sessionSets.get(s.tickTime);
    if (set == null) {
        set = new SessionSet();
        sessionSets.put(expireTime, set);
    }
    set.sessions.add(s);//放入新的桶中
    return true;
}

這樣整個session啟用的流程就很清晰了,client會在讀寫或者ping時更新session,而server端收到請求後啟用session,啟用的過程分為下面幾步:

  1. 檢查會話是否已經被關閉;
  2. 計算新的超時時間;
  3. 定位session當前的桶,並從老桶中刪除;
  4. 根據新的超時時間把session遷移到新的桶中。

session超時檢查

Zk內部利用SessionTracker管理Session的整個會話生命週期。SessionTracker的實現類SessionTrackerImpl本身就是一個session的超時檢查執行緒逐個地對會話桶中剩下的會話進行清理。之前已經說過,session被啟用後會被轉移到新的桶中,現在按時間先後有t1, t2兩個時間點,如果到了t2時間點,t1時間對應的桶內還有元素,那麼代表t1對應的桶內的session全部過期,需要進行相關的清理工作,而實際上zk就是這麼工作的,也即SessionTrackerImpl執行緒的檢查內容。

@Override
synchronized public void run() {
    try {
        while (running) {
            currentTime = Time.currentElapsedTime();
            if (nextExpirationTime > currentTime) {//如果還沒有到過期檢測時間
                this.wait(nextExpirationTime - currentTime);//wait住
                continue;
            }
            SessionSet set;
            set = sessionSets.remove(nextExpirationTime);//把nextExpirationTime對應桶的session全部刪掉,已經全部過期
            if (set != null) {
                for (SessionImpl s : set.sessions) {
                    setSessionClosing(s.sessionId);//設定關閉狀態
                    expirer.expire(s);//發起會話關閉請求
                }
            }
            nextExpirationTime += expirationInterval;//更新下一次session超時檢查時間
             }
    } catch (InterruptedException e) {
        handleException(this.getName(), e);
    }
    LOG.info("SessionTrackerImpl exited loop!");
}

注意:之前提到session桶的key一定是expirationInterval的整數倍,而在SessionTrackerImpl中超時檢測的過程中,是以expirationInterval為單位去進行檢測的,每次增加一個expirationInterval的時間,這兩邊互相的配合保證了能檢測到所有session的桶。

會話清理

在進行了session的超時檢查後,除了把session從SessionTrackerImpl中刪除,還需要針對每個session進行清理。SessionTrackerImpl中的expirer.expire(s)便發起了session清理的請求,開始了這個過程。

標記session狀態為closing

因為進行會話清理的工作相對來說較為耗時,而setSessionClosing(s.sessionId)把isClosing設定為了false,這樣在server收到來自客戶端的請求時在PrepRequestProcessor的pRequest2Txn和pRequest中會去checkSession來檢查session的狀態,發現session處於closing狀態後便不會處理響應的請求了。

case OpCode.create:                
    zks.sessionTracker.checkSession(request.sessionId, request.getOwner());

這是收到客戶端create請求後檢查的程式碼,其他的請求也是一樣,無法通過checksession狀態。

發起關閉session的請求

為了使對該會話的關閉操作在整個服務端叢集都生效,Zookeeper使用了提交會話關閉請求的方式,並立即交付給PreRequestProcessor進行處理。

public void expire(Session session) {//zookeeperserver
    long sessionId = session.getSessionId();
    LOG.info("Expiring session 0x" + Long.toHexString(sessionId)
            + ", timeout of " + session.getTimeout() + "ms exceeded");
    close(sessionId);
}
private void close(long sessionId) {//zookeeperserver
    submitRequest(null, sessionId, OpCode.closeSession, 0, null, null);//通過zookeeper server的submitrequest方法加入到佇列中等待PrepRequestProcessor處理
}
public void submitRequest(Request si) {
    ...
    try {
            touch(si.cnxn);
            boolean validpacket = Request.isValid(si.type);
            if (validpacket) {
                firstProcessor.processRequest(si);//加入佇列
    ...
public void processRequest(Request request) {//
    // request.addRQRec(">prep="+zks.outstandingChanges.size());
    submittedRequests.add(request);
}

通過發起關閉請求的過程可以看到這是一個非同步的請求過程,和client-server間的非同步互動有異曲同工之妙,這裡完成了生產者的部分,把請求加入到了一個待處理佇列中。

清理臨時節點

熟悉Zk的同學都知道,zk的session一旦失效,那麼在失效session建立的臨時節點資訊就會全部丟失。因此,在這裡做session相關的清理工作時需要把session相關的臨時節點資訊全部清除。Zookeeper在記憶體資料庫中會為每個會話都單獨儲存了一份由該會話維護的所有臨時節點集合,因此只需要根據失效session的id把臨時節點列表取到並刪除即可。但是在實際場景中有兩種特殊情況:

  1. 節點刪除請求,且刪除的節點正是上述臨時節點列表中的一個;
  2. 臨時節點更新(建立,修改)請求,建立目標節點正好是上述臨時節點列表中的一個。

對於第一類請求,需要將所有請求對應的資料節點路徑從當前臨時節點列表中移出,以避免重複刪除,對於第二類請求,需要將所有這些請求對應的資料節點路徑新增到當前臨時節點列表中,以刪除這些即將被建立但是尚未儲存到記憶體資料庫中的臨時節點。

前面在發起關閉session請求中把請求加入到了submittedRequests佇列中,在PrepRequestProcessor的run -> pRequest方法中進行了消費。

protected void pRequest(Request request) throws RequestProcessorException {
    // LOG.info("Prep>>> cxid = " + request.cxid + " type = " +
    // request.type + " id = 0x" + Long.toHexString(request.sessionId));
    request.hdr = null;
    request.txn = null;
    ...
    case OpCode.closeSession:
        pRequest2Txn(request.type, zks.getNextZxid(), request, null, true);
        break;
case OpCode.closeSession:
    // We don't want to do this check since the session expiration thread
    // queues up this operation without being the session owner.
    // this request is the last of the session so it should be ok
    //zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
    HashSet<String> es = zks.getZKDatabase()
            .getEphemerals(request.sessionId);//從zk的記憶體資料庫中取出臨時節點集合
    synchronized (zks.outstandingChanges) {
        for (ChangeRecord c : zks.outstandingChanges) {//遍歷zk的事務佇列
            //addChangeRecord(new ChangeRecord(request.hdr.getZxid(), path,
            //            null, -1, null));
            //這是zk delete節點是增加的changeRecord,根據stat是null可以知道有事務是刪除節點的,需要把此節點從臨時節點列表中刪除
            if (c.stat == null) {
                // Doing a delete
                es.remove(c.path);//從臨時節點列表中刪除
        } else if (c.stat.getEphemeralOwner() == request.sessionId) {//建立和修改時stat不為null,且只要sessionid為當前session的id
                es.add(c.path);//把臨時節點路徑加入set
            }
        }
        for (String path2Delete : es) {//遍歷
            addChangeRecord(new ChangeRecord(request.hdr.getZxid(),//新增臨時節點刪除的事件
                    path2Delete, null, 0, null));
        }

        zks.sessionTracker.setSessionClosing(request.sessionId);//把
    }

    LOG.info("Processed session termination for sessionid: 0x"
            + Long.toHexString(request.sessionId));
    break;
void addChangeRecord(ChangeRecord c) {
    synchronized (zks.outstandingChanges) {
        zks.outstandingChanges.add(c);
        zks.outstandingChangesForPath.put(c.path, c);
    }
}

這樣可以看到事務變更的過程也是一個非同步的過程,這裡添加了臨時節點刪除的事件到佇列中,但是這裡其實並不是典型的佇列,因為outstandingChanges並不是嚴格意義上的佇列,而是一個arraylist。

而FinalRequestProcessor是此佇列的消費者,在processRequest方法中完成了事務佇列的處理。

synchronized (zks.outstandingChanges) {
    ...
    while (!zks.outstandingChanges.isEmpty()
        && zks.outstandingChanges.get(0).zxid <= request.zxid) {//遍歷outstandingChanges
        ChangeRecord cr = zks.outstandingChanges.remove(0);
        if (cr.zxid < request.zxid) {//zxid比request的版本要早,過期
            LOG.warn("Zxid outstanding "
                    + cr.zxid
                    + " is less than current " + request.zxid);
        }
        if (zks.outstandingChangesForPath.get(cr.path) == cr) {//路徑和事務記錄mapping關係刪除
            zks.outstandingChangesForPath.remove(cr.path);
        }
    }
    if (request.hdr != null) {
       TxnHeader hdr = request.hdr;
       Record txn = request.txn;

       rc = zks.processTxn(hdr, txn);//根據事務型別處理事務,closeseesion就是在zookeeperserver中sessionTracker.removeSession(sessionId);
    }
    // do not add non quorum packets to the queue.
    if (Request.isQuorum(request.type)) {
        zks.getZKDatabase().addCommittedProposal(request);//叢集操作,提出proposal,保證對叢集中所有例項可見
    }
}

在processRequest中可以看到在消費事務的記錄過程中,會根據request的型別進行事務處理,呼叫了zookeeperserver的processTxn方法。

public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) {
    ProcessTxnResult rc;
    int opCode = hdr.getType();
    long sessionId = hdr.getClientId();//獲取sessionid
    //case OpCode.closeSession:
    //killSession(header.getClientId(), header.getZxid());
    //從datatree中刪除session相關的臨時節點
    rc = getZKDatabase().processTxn(hdr, txn);
    if (opCode == OpCode.createSession) {//createsession
        if (txn instanceof CreateSessionTxn) {
            CreateSessionTxn cst = (CreateSessionTxn) txn;
            sessionTracker.addSession(sessionId, cst//增加session
                    .getTimeOut());
        } else {
            LOG.warn("*****>>>>> Got "
                    + txn.getClass() + " "
                    + txn.toString());
        }
    } else if (opCode == OpCode.closeSession) {
        sessionTracker.removeSession(sessionId);//利用sessionTracker刪除session
    }
    return rc;
}

具體刪除session的操作:

synchronized public void removeSession(long sessionId) {
    SessionImpl s = sessionsById.remove(sessionId);//從sessionsById中移除
    sessionsWithTimeout.remove(sessionId);//移除sessionid和對應的timeout
    if (LOG.isTraceEnabled()) {
        ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK,//打log
                "SessionTrackerImpl --- Removing session 0x"
                + Long.toHexString(sessionId));
    }
    if (s != null) {
        SessionSet set = sessionSets.get(s.tickTime);//把session從桶裡刪除
        // Session expiration has been removing the sessions   
        if(set != null){
            set.sessions.remove(s);
        }
    }
}

到這裡session及相關的臨時節點的有關資料就被刪除完了。其實清理session臨時節點的過程還是比較長的,總結下可以分為下面幾步:

  1. 收集需要清理的臨時節點,收集的方法是從zkdatabase中利用mapping關係取出,並對兩種特殊情況有相應的處理;
  2. 新增節點刪除事務變更,這裡是把outstandingChanges當做了一個佇列(也可以理解為不是,但是作用類似),在outstandingChanges儲存了新來的事務變更的記錄;
  3. 真正刪除臨時節點,在zkdatabase中把datatree中的和失效session有關的臨時節點全部刪除;
  4. 刪除失效session,這是通過把SessionTrackerImpl中的sessionsById, sessionSets, sessionsWithTimeout失效session的記錄都刪除做到的。

關閉NIOServerCnxn

在FinalRequestProcessor的processRequest方法中會根據請求型別處理serverCnxn,如果是closeSession就會通過NIOServerCnxnFactory找到對應的NIOServerCnxn並將其刪除。

if (request.hdr != null && request.hdr.getType() == OpCode.closeSession) {
    ServerCnxnFactory scxn = zks.getServerCnxnFactory();
    // this might be possible since
    // we might just be playing diffs from the leader
    if (scxn != null && request.cnxn == null) {
        // calling this if we have the cnxn results in the client's
        // close session response being lost - we've already closed
        // the session/socket here before we can send the closeSession
        // in the switch block below
        scxn.closeSession(request.sessionId);//關閉connection
        return;
    }
}

在server端關閉連線後,server還會把buffer中的關閉連線請求傳送給客戶端。

case OpCode.closeSession: {
    lastOp = "CLOS";
    closeSession = true;//設定closeSession狀態
    err = Code.get(rc.err);
    break;
}
try {
    cnxn.sendResponse(hdr, rsp, "response");
    if (closeSession) {
        cnxn.sendCloseSession();//傳送關閉連線請求給客戶端
    }

session重連

當客戶端與服務端之間的網路連線斷開時,Zookeeper客戶端會自動進行反覆的重連,直到最終成功連線上Zookeeper叢集中的一臺機器。此時,再次連線上服務端的客戶端有可能處於以下兩種狀態之一。

  1. connected:在會話超時時間內連線上了叢集中的任意一臺server;
  2. expired:連線上server時已經超時,服務端其實已經進行了會話清理操作,再次連線上的session會被視為非法會話。

在客戶端與服務端之間維持的是一個長連線,在sessionTimeout時間內,服務端會不斷地檢測該客戶端是否還處於正常連線(SessionTrackerImpl),服務端會將客戶端的每次操作視為一次有效的心跳檢測來反覆地進行會話啟用。因此,在正常情況下,客戶端會話時一直有效的。然而,當客戶端與服務端之間的連線斷開後,使用者在客戶端可能主要看到兩類異常:CONNECTION_LOSS(連線斷開)和SESSION_EXPIRED(會話過期)。

CONNECTION_LOSS

若客戶端在setData時出現了CONNECTION_LOSS現象,此時客戶端會收到None-Disconnected通知,同時會丟擲異常。應用程式需要捕捉異常並且等待Zookeeper客戶端自動完成重連,一旦重連成功,那麼客戶端會收到None-SyncConnected通知,之後就可以重試setData操作。

這裡給出客戶端判斷connection丟失重連的情景:

  1. 無法連線上server
to = readTimeout - clientCnxnSocket.getIdleRecv();
} else {
    to = connectTimeout - clientCnxnSocket.getIdleRecv();
}

if (to <= 0) {
    String warnInfo;
    warnInfo = "Client session timed out, have not heard from server in "
        + clientCnxnSocket.getIdleRecv()
        + "ms"
        + " for sessionid 0x"
        + Long.toHexString(sessionId);
    LOG.warn(warnInfo);
    throw new SessionTimeoutException(warnInfo);//丟擲session超時異常
}

在clientCnxn的sendthread的run方法中對幾種exception進行了處理:

// this is ugly, you have a better way speak up
if (e instanceof SessionExpiredException) {
    LOG.info(e.getMessage() + ", closing socket connection");
} else if (e instanceof SessionTimeoutException) {
    LOG.info(e.getMessage() + RETRY_CONN_MSG);
} else if (e instanceof EndOfStreamException) {
    LOG.info(e.getMessage() + RETRY_CONN_MSG);
} else if (e instanceof RWServerFoundException) {
    LOG.info(e.getMessage());
} else if (e instanceof SocketException) {
    LOG.info("Socket error occurred: {}: {}", serverAddress, e.getMessage());
} else {
    LOG.warn("Session 0x{} for server {}, unexpected error{}",
                    Long.toHexString(getSessionId()),
                    serverAddress,
                    RETRY_CONN_MSG,
                    e);
}
cleanup();//清理客戶端的佇列等的殘留
private void cleanup() {
    clientCnxnSocket.cleanup();
    synchronized (pendingQueue) {//清除pendingqueue中的request
        for (Packet p : pendingQueue) {
            conLossPacket(p);
        }
        pendingQueue.clear();
    }
    synchronized (outgoingQueue) {//清除outgoingqueue中的packet
        for (Packet p : outgoingQueue) {
            conLossPacket(p);
        }
        outgoingQueue.clear();
    }

Session Expired

客戶端與服務端斷開連線後,重連時間耗時太長,超過了會話超時時間限制後沒有成功連上伺服器,伺服器會進行會話清理,此時,客戶端不知道會話已經失效,狀態還是DISCONNECTED,如果客戶端重新連上了伺服器,此時狀態為SESSION_EXPIRED,用於需要重新例項化Zookeeper物件,並且看應用的複雜情況,重新恢復臨時資料。

  1. 連線server時間過長:
void onConnected(int _negotiatedSessionTimeout, long _sessionId,
        byte[] _sessionPasswd, boolean isRO) throws IOException {
    negotiatedSessionTimeout = _negotiatedSessionTimeout;
    if (negotiatedSessionTimeout <= 0) {
        state = States.CLOSED;
       ...
        warnInfo = "Unable to reconnect to ZooKeeper service, session 0x"
            + Long.toHexString(sessionId) + " has expired";
        LOG.warn(warnInfo);
        throw new SessionExpiredException(warnInfo);//丟擲session過期異常
    }

這裡跑出了session超時的異常,也會被上面的函式捕捉並清理佇列重新進行連線。

Session Moved

客戶端會話從一臺伺服器轉移到另一臺伺服器,即客戶端與服務端S1斷開連線後,重連上了服務端S2,此時會話就從S1轉移到了S2。當多個客戶端使用相同的sessionId/sessionPasswd建立會話時,會收到SessionMovedException異常。因為一旦有第二個客戶端連線上了服務端,就被認為是會話轉移了。

synchronized public void checkSession(long sessionId, Object owner) throws KeeperException.SessionExpiredException, KeeperException.SessionMovedException {
    SessionImpl session = sessionsById.get(sessionId);
    if (session == null || session.isClosing()) {
        throw new KeeperException.SessionExpiredException();
    }
    if (session.owner == null) {
        session.owner = owner;
    } else if (session.owner != owner) {//session的owner不一致
        throw new KeeperException.SessionMovedException();
    }
}

思考

針對zkdatabase 臨時節點列表的兩種特殊處理

針對刪除的請求對應的節點直接把節點從臨時節點列表刪除,這樣在刪除的事務中會把節點真正刪掉;而更新的則不需要判斷,因為除了刪除請求外其他事務請求中只要sessionid和失效sessionid一致的都加入待刪除佇列就OK。而之所以要處理更新的節點是因為如果增加節點那麼很有可能這些幾點還沒有寫入記憶體資料庫中,需要在這裡再掃描一次事務佇列,確保不會有臨時節點的殘留。

 for (ChangeRecord c : zks.outstandingChanges) {//遍歷zk的事務佇列
            //addChangeRecord(new ChangeRecord(request.hdr.getZxid(), path,
            //            null, -1, null));
            //這是zk delete節點是增加的changeRecord,根據stat是null可以知道有事務是刪除節點的,需要把此節點從臨時節點列表中刪除
            if (c.stat == null) {
                // Doing a delete
                es.remove(c.path);//從臨時節點列表中刪除
        } else if (c.stat.getEphemeralOwner() == request.sessionId) {//建立和修改時stat不為null,且只要sessionid為當前session的id
                es.add(c.path);//把臨時節點路徑加入set
            }
        }

session owner

每一個ServerCnxn物件都有一個me屬性(object)標識自己,在zookeeperserver中si.setOwner(ServerCnxn.me);設定了request的owner,也初始化為session的owner。

參考

https://www.jianshu.com/p/594129a44814

http://www.cnblogs.com/leesf456/p/6103870.html

https://blog.csdn.net/pwlazy/article/details/8157368

https://www.jianshu.com/p/3b7f9a032ded

https://www.cnblogs.com/davidwang456/p/5009659.html

zk session程式碼在3.5之後有修改,可參考https://blog.csdn.net/jpf254/article/details/80804458

《從paxos到zookeeper》