ZooKeeper中的會話機制
在本文中將對zk的會話機制進行總結
相關的類
- SessionTracker
- SessionTrackerImpl
會話狀態
常見的幾種會話狀態如下:
- CONNECTING,正在連線
- CONNECTED, 已連線
- RECONNECTING,正在重連
- RECONNECTED,已重連
- CLOSE,會話關閉
連線建立的初始化階段,客戶端的狀態會變成CONNECTING,同時客戶端會從伺服器地址列表中隨機獲取一個ip地址嘗試進行網路連線,知道成功建立連線,這時候,客戶端的狀態就會變成CONNECTED。但是,在通常情況下,由於網路的不可靠性,時常會伴隨這網路中斷的出現,這時候,客戶端和服務端之間會出現斷開連線的情況,一旦出現這種情況,客戶端會嘗試去重新連線服務端,這時,客戶端的狀態會再一次變成CONNECTING,直到重新連線上伺服器後,客戶端的狀態又會再次變成CONNECTED。
因此,在通常情況下,客戶端的會話狀態始終在CONNECTED和CONNECTING之間變化。
出現CLOSE的情況:
- 會話超時
- 許可權檢查失敗
- 客戶端主動退出
會話屬性
會話session是ZooKeeper中的會話實體,代表了一個客戶端的會話。其定義在org.apache.zookeeper.server.SessionTracker.Session中。介面的定義如下:
public interface SessionTracker {
public static interface Session {
long getSessionId();
int getTimeout();
boolean isClosing();
}
}
其實現類為org.apache.zookeeper.server.SessionTrackerImpl.SessionImpl,程式碼如下:
public static class SessionImpl implements Session {
final long sessionId;
final int timeout;
boolean isClosing;
Object owner;
public long getSessionId() { return sessionId; }
public int getTimeout() { return timeout; }
public boolean isClosing() { return isClosing; }
}
從上述程式碼中可以看出,Session主要由以下三個屬性組成:
- sessionId,這是一個64位的long型整數,代表一個唯一的會話。每次客戶端建立會話的時候,ZooKeeper都會為其分配一個全域性唯一的一個sessionId。
- timeout,會話的超時時間。客戶端在構造ZooKeeper例項的時候,會為本次會話配置一個會話的超時時間。客戶端在向ZooKeeper伺服器傳送這個超時時間後,服務端會根據自己的配置最終確定本次會話的超時時間。
- isClosing,這是一個標誌位,表示本次會話是否已經關閉。在服務端的“會話超時檢查”執行緒在檢查到該會話已經失效的時候,會第一時間將這個標誌位置為true,只要這個標誌位為true,那麼服務端就不會在處理該會話的請求了。
會話初始化
在講session的初始化之前,首先先看看SessionTrackerImpl的實現
public class SessionTrackerImpl extends ZooKeeperCriticalThread implements SessionTracker
{
protected final ConcurrentHashMap<Long, SessionImpl> sessionsById =
new ConcurrentHashMap<Long, SessionImpl>();
private final ConcurrentMap<Long, Integer> sessionsWithTimeout;
private final AtomicLong nextSessionId = new AtomicLong();
//建構函式
public SessionTrackerImpl(SessionExpirer expirer,
ConcurrentMap<Long, Integer> sessionsWithTimeout, int tickTime,
long serverId, ZooKeeperServerListener listener)
{
super("SessionTracker", listener);
this.expirer = expirer;
this.sessionExpiryQueue = new ExpiryQueue<SessionImpl>(tickTime);
this.sessionsWithTimeout = sessionsWithTimeout;
//初始化sessionId
this.nextSessionId.set(initializeNextSession(serverId));
for (Entry<Long, Integer> e : sessionsWithTimeout.entrySet()) {
addSession(e.getKey(), e.getValue());
}
EphemeralType.validateServerId(serverId);
}
}
從上面的建構函式中可以看出,對session進行初始化,是由方法initializeNextSession來完成的。那麼,下面我們就來看看該方法的具體實現細節。
public static long initializeNextSession(long id) {
long nextSid;
nextSid = (Time.currentElapsedTime() << 24) >>> 8;
nextSid = nextSid | (id << 56);
if (nextSid == EphemeralType.CONTAINER_EPHEMERAL_OWNER) {
++nextSid; // this is an unlikely edge case, but check it just in case
}
return nextSid;
}
從上面的初始化方法中可以看出,該方法的入參是一個long的整數id,該id表示的是服務端的機器編號,這個引數是在部署ZooKeeper伺服器的時候,配置在myid檔案中的。
初始化的步驟如下:
- 生成系統當前時間的時間戳,64位的long型整數
- 將時間戳左移24位,在無符號右移8位
- 經過上一步,該時間戳的高8位全部為0,低56位不為0
- 接著,將機器編號左移56位,那麼機器編號的高8位不為0,低56位全為0
- 最後,將上面得到的機器編號和時間戳進行或運算
為什麼要這樣做運算?
時間戳經過這樣的運算之後,高8位全部位0,和機器編號的高8位進行或運算之後,其結果完全取決於機器編號的高8位;同理,低56位由時間戳決定。因此,可以看出,sessionId其實是由機器編號+時間戳唯一決定的。可以保證在單機環境下的唯一性。
之所以右移8位的時候採用無符號,是因為防止前面左移24位的時候,可能出現負數的情況,因此為了消除產生的負數的影響,採用無符號的右移。
會話啟用
在ZooKeeper的設計過程中,只要客戶端有請求傳送到服務端,那麼服務端就會觸發一次會話啟用。以下情況會發生的會話啟用:
- 客戶端向服務端傳送讀寫請求
- 如果客戶端在sessionTimeout / 3的時間內都沒有與服務端有過互動,那麼客戶端會主動的向服務端傳送ping請求(心跳檢測),服務端收到請求之後,會觸發一次會話啟用。
與會話啟用相關的方法和類由:
- org.apache.zookeeper.server.ExpiryQueue
- SessionTrackerImpl中的touchSession方法
首先我們來看看org.apache.zookeeper.server.ExpiryQueue
//記錄的是: Session -> 超時時間
private final ConcurrentHashMap<E, Long> elemMap =
new ConcurrentHashMap<E, Long>();
//記錄的是: 下一個超時時間 -> session的集合
private final ConcurrentHashMap<Long, Set<E>> expiryMap =
new ConcurrentHashMap<Long, Set<E>>();
private final AtomicLong nextExpirationTime = new AtomicLong();
private final int expirationInterval;
// 服務端計算超時時間的方法,expirationInterval預設等於tickTime,2000ms
private long roundToNextInterval(long time) {
return (time / expirationInterval + 1) * expirationInterval;
}
從上面的的ExpiryQueue程式碼中可以看出:
- 分桶策略中的buckets其實就是一個set集合,每個超時時間對應一個會話的集合
- 維護另外一個map,管理單個session對應的超時時間
- 相應的計算超時時間的方法
在SessionTrackerImpl中對touchSession方法描述:
synchronized public boolean touchSession(long sessionId, int timeout) {
SessionImpl s = sessionsById.get(sessionId);
if (s == null) {
logTraceTouchInvalidSession(sessionId, timeout);
return false;
}
if (s.isClosing()) {
logTraceTouchClosingSession(sessionId, timeout);
return false;
}
//會話啟用的主要執行邏輯
updateSessionExpiry(s, timeout);
return true;
}
執行流程如下:
- 根據sessionId獲取到對應的會話實體
- 判斷該會話是否已經關閉,如果是的話,那麼就不需要啟用,直接返回
- 從ExpiryQueue中的elemMap獲取本次會話以前的超時時間prevExpiryTime
- 計算新的超時時間,計算邏輯為roundToNextInterval方法
- 根據新的超時時間,將session實體放入到新的超時時間對應的expiryMap,並且設定新的elemMap
- 將session實體從以前的expiryMap中刪除,並且更新對應的elemMap
會話超時檢查
會話超時檢查是由SessionTracker負責的。程式碼如下:
public void run() {
try {
while (running) {
//sessionExpiryQueue是一個ExpiryQueue物件
long waitTime = sessionExpiryQueue.getWaitTime();
if (waitTime > 0) {
Thread.sleep(waitTime);
continue;
}
// sessionExpiryQueue.poll()是獲取expiryMap中超時的會話
for (SessionImpl s : sessionExpiryQueue.poll()) {
setSessionClosing(s.sessionId);
expirer.expire(s);
}
}
} catch (InterruptedException e) {
handleException(this.getName(), e);
}
LOG.info("SessionTrackerImpl exited loop!");
}
整個程式碼的核心邏輯在:
for (SessionImpl s : sessionExpiryQueue.poll()) {
setSessionClosing(s.sessionId); //將會話的isClosing標誌位置為true
expirer.expire(s); //會話清理過程, 在後面的文章中會詳細介紹該過程的細節
}
超時檢查的策略:逐個檢查會話bucket中剩下的,超過超時時間的會話。程式碼如下
public Set<E> poll() {
long now = Time.currentElapsedTime();
long expirationTime = nextExpirationTime.get();
if (now < expirationTime) {
return Collections.emptySet();
}
Set<E> set = null;
long newExpirationTime = expirationTime + expirationInterval;
if (nextExpirationTime.compareAndSet(
expirationTime, newExpirationTime)) {
set = expiryMap.remove(expirationTime);
}
if (set == null) {
return Collections.emptySet();
}
return set;
}