Zookeeper原始碼分析(三)——session管理
阿新 • • 發佈:2019-09-13
一、session的建立
//ZookeeperServer.java //第617行 long createSession(ServerCnxn cnxn, byte passwd[], int timeout) { long sessionId = sessionTracker.createSession(timeout); //省略部分程式碼 } //SessionTrackerImpl.java //第236行 synchronized public long createSession(int sessionTimeout) { addSession(nextSessionId, sessionTimeout); return nextSessionId++; } //SessionTrackerImpl.java //第241行 synchronized public void addSession(long id, int sessionTimeout) { //儲存sessionId和過期時間的關係 sessionsWithTimeout.put(id, sessionTimeout); //如果session不存在,就新建一個 if (sessionsById.get(id) == null) { SessionImpl s = new SessionImpl(id, sessionTimeout, 0); sessionsById.put(id, s); //省略日誌列印 } else { //省略日誌列印 } //將session按照一定規則聚合 touchSession(id, sessionTimeout); } //SessionTrackerImpl.java //第166行 synchronized public boolean touchSession(long sessionId, int timeout) { if (LOG.isTraceEnabled()) { //省略日誌列印 } SessionImpl s = sessionsById.get(sessionId); // Return false, if the session doesn't exists or marked as closing if (s == null || s.isClosing()) { return false; } long expireTime = roundToInterval(Time.currentElapsedTime() + timeout); //如果當前session的過期時間大於這個值,不需要操作 if (s.tickTime >= expireTime) { // Nothing needs to be done return true; } //將session從舊的桶中移出,並放入(剩餘超時時間更長的)新的桶 SessionSet set = sessionSets.get(s.tickTime); if (set != null) { set.sessions.remove(s); } s.tickTime = expireTime; set = sessionSets.get(s.tickTime); if (set == null) { set = new SessionSet(); sessionSets.put(expireTime, set); } set.sessions.add(s); return true; } //SessionTrackerImpl.java //第89行 private long roundToInterval(long time) { //expirationInterval就是zookeeper的心跳週期(tickTime),預設值是3000 //這段計算的意思是將過期時間每3000ms分一個段 //比如200ms、500ms、3000ms返回0,3001ms、5000ms返回3000 //由於這裡的time是加了Time.currentElapsedTime()的,所以不會出現0的情況 return (time / expirationInterval + 1) * expirationInterval; }
二、session啟用
//ZookeeperServer.java //第728行 public void submitRequest(Request si) { //省略部分程式碼 try { touch(si.cnxn); //省略部分程式碼 } catch (MissingSessionException e) { if (LOG.isDebugEnabled()) { LOG.debug("Dropping request: " + e.getMessage()); } } catch (RequestProcessorException e) { LOG.error("Unable to process request:" + e.getMessage(), e); } } //ZookeeperServer.java //第368行 void touch(ServerCnxn cnxn) throws MissingSessionException { //省略部分程式碼 if (!sessionTracker.touchSession(id, to)) { throw new MissingSessionException( "No session with sessionid 0x" + Long.toHexString(id) + " exists, probably expired and removed"); } }
ZooKeeper 服務端在處理客戶端的請求時,都會呼叫 submitRequest 方法,該方法經由 touch 最終呼叫到 touchSession;若重新計算出的過期時間晚於 session 目前所在桶的過期時間,session 就會被移到對應的新桶中,否則不做任何變動。
三、session的過期
// SessionTrackerImpl.java
// (line 142 in the referenced ZooKeeper source)
// Expiration loop. While `running` is set, it sleeps until the next
// expiration boundary, then removes the bucket stored under that boundary,
// marks each of its sessions as closing, hands each one to the expirer, and
// advances the boundary by one expirationInterval.
synchronized public void run() {
    try {
        while (running) {
            currentTime = Time.currentElapsedTime();
            // Per the article, nextExpirationTime is seeded when
            // SessionTrackerImpl is initialised:
            // nextExpirationTime = roundToInterval(Time.currentElapsedTime());
            if (nextExpirationTime > currentTime) {
                // Boundary not reached yet: wait out the remaining time, then
                // re-evaluate from the top of the loop.
                this.wait(nextExpirationTime - currentTime);
            } else {
                // Boundary reached: every session still in this bucket has
                // expired, so the whole bucket is taken out at once.
                SessionSet expiredBucket = sessionSets.remove(nextExpirationTime);
                if (expiredBucket != null) {
                    for (SessionImpl expiredSession : expiredBucket.sessions) {
                        setSessionClosing(expiredSession.sessionId);
                        expirer.expire(expiredSession);
                    }
                }
                nextExpirationTime += expirationInterval;
            }
        }
    } catch (InterruptedException e) {
        handleException(this.getName(), e);
    }
    LOG.info("SessionTrackerImpl exited loop!");
}
// ZookeeperServer.java
// (line 353 in the referenced ZooKeeper source)
// Logs that the given session exceeded its timeout and then closes it.
public void expire(Session session) {
    final long expiredId = session.getSessionId();
    final String hexId = Long.toHexString(expiredId);
    LOG.info("Expiring session 0x" + hexId
            + ", timeout of " + session.getTimeout() + "ms exceeded");
    close(expiredId);
}
// ZookeeperServer.java
// (line 329 in the referenced ZooKeeper source)
// Closes a session by submitting a closeSession request for the given
// session id; every argument other than the id and the op code is passed as
// null or 0.
private void close(long sessionId) {
submitRequest(null, sessionId, OpCode.closeSession, 0, null, null);
}
//PrepRequestProcessor.java
//第294行
protected void pRequest2Txn(int type, long zxid, Request request, Record record, boolean deserialize)
throws KeeperException, IOException, RequestProcessorException {
request.hdr = new TxnHeader(request.sessionId, request.cxid, zxid,
Time.currentWallTime(), type);
switch (type) {
//省略程式碼
case OpCode.closeSession:
// We don't want to do this check since the session expiration thread
// queues up this operation without being the session owner.
// this request is the last of the session so it should be ok
//zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
HashSet<String> es = zks.getZKDatabase().getEphemerals(request.sessionId);
//刪除session關聯的所有臨時節點
synchronized (zks.outstandingChanges) {
//zookeeper的大部分操作都會記錄並放入列表
for (ChangeRecord c : zks.outstandingChanges) {
//c.stat == null表示這是刪除操作
if (c.stat == null) {
es.remove(c.path);
} else if (c.stat.getEphemeralOwner() == request.sessionId) {
es.add(c.path);
}
}
for (String path2Delete : es) {
addChangeRecord(new ChangeRecord(request.hdr.getZxid(),
path2Delete, null, 0, null));
}
zks.sessionTracker.setSessionClosing(request.sessionId);
}
LOG.info("Processed session termination for sessionid: 0x"
+ Long.toHexString(request.sessionId));
break;
//省略程式