zookeeper原始碼 — 四、session建立
目錄
- session建立的主要過程
- 客戶端發起連線
- 服務端建立session
session建立的主要過程
用一張圖來說明session建立過程中client和server的互動
主要流程
- 服務端啟動,客戶端啟動
- 客戶端發起socket連線
- 服務端accept socket連線,socket連線建立
- 客戶端傳送ConnectRequest給server
- server收到後初始化ServerCnxn,代表一個和客戶端的連線,即session,server傳送ConnectResponse給client
- client處理ConnectResponse,session建立完成
客戶發起連線
和server建立socket連線
客戶端要發起連線要先啟動,不論是使用curator client還是zkClient,初始化的都是初始化org.apache.zookeeper.ZooKeeper#ZooKeeper
。
Zookeeper初始化的主要工作是初始化自己的一些關鍵元件
- Watcher,外部構造好傳入
- 初始化StaticHostProvider,決定客戶端選擇連線哪一個server
- ClientCnxn,客戶端網路通訊的元件,主要啟動邏輯就是啟動這個類
ClientCnxn包含兩個執行緒
- SendThread,負責client端訊息的傳送和接收
- EventThread,負責處理event
ClientCnxn初始化的過程就是初始化啟動這兩個執行緒,客戶端發起連線的主要邏輯在SendThread執行緒中
// org.apache.zookeeper.ClientCnxn.SendThread#run @Override public void run() { clientCnxnSocket.introduce(this,sessionId); clientCnxnSocket.updateNow(); clientCnxnSocket.updateLastSendAndHeard(); int to; long lastPingRwServer = System.currentTimeMillis(); final int MAX_SEND_PING_INTERVAL = 10000; //10 seconds while (state.isAlive()) { try { // client是否連線到server,如果沒有連線到則連線server if (!clientCnxnSocket.isConnected()) { if(!isFirstConnect){ try { Thread.sleep(r.nextInt(1000)); } catch (InterruptedException e) { LOG.warn("Unexpected exception", e); } } // don't re-establish connection if we are closing if (closing || !state.isAlive()) { break; } // 這個裡面去連線server startConnect(); clientCnxnSocket.updateLastSendAndHeard(); } // 省略中間程式碼... clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue, ClientCnxn.this); // 省略中間程式碼... }
SendThread#run是一個while迴圈,只要client沒有被關閉會一直迴圈,每次迴圈判斷當前client是否連線到server,如果沒有則發起連線,發起連線呼叫了startConnect
private void startConnect() throws IOException {
state = States.CONNECTING;
InetSocketAddress addr;
if (rwServerAddress != null) {
addr = rwServerAddress;
rwServerAddress = null;
} else {
// 通過hostProvider來獲取一個server地址
addr = hostProvider.next(1000);
}
// 省略中間程式碼...
// 建立client與server的連線
clientCnxnSocket.connect(addr);
}
到這裡client發起了socket連線,server監聽的埠收到client的連線請求後和client建立連線。
通過一個request來建立session連線
socket連線建立後,client會向server傳送一個ConnectRequest來建立session連線。兩種情況會發送ConnectRequest
- 在上面的connect方法中會判斷是否socket已經建立成功,如果建立成功就會發送ConnectRequest
- 如果socket沒有立即建立成功(socket連線建立是非同步的),則傳送這個packet要延後到doTransport中
傳送ConnectRequest是在下面的方法中
// org.apache.zookeeper.ClientCnxn.SendThread#primeConnection
void primeConnection() throws IOException {
LOG.info("Socket connection established to "
+ clientCnxnSocket.getRemoteSocketAddress()
+ ", initiating session");
isFirstConnect = false;
long sessId = (seenRwServerBefore) ? sessionId : 0;
ConnectRequest conReq = new ConnectRequest(0, lastZxid,
sessionTimeout, sessId, sessionPasswd);
// 省略中間程式碼...
// 將conReq封裝為packet放入outgoingQueue等待發送
outgoingQueue.addFirst(new Packet(null, null, conReq,
null, null, readOnly));
}
clientCnxnSocket.enableReadWriteOnly();
if (LOG.isDebugEnabled()) {
LOG.debug("Session establishment request sent on "
+ clientCnxnSocket.getRemoteSocketAddress());
}
}
請求中帶的引數
- lastZxid:上一個事務的id
- sessionTimeout:client端配置的sessionTimeout
- sessId:sessionId,如果之前建立過連線取的是上一次連線的sessionId
- sessionPasswd:session的密碼
服務端建立session
和client建立socket連線
在server啟動的過程中除了會啟動用於選舉的網路元件還會啟動用於處理client請求的網路元件
org.apache.zookeeper.server.NIOServerCnxnFactory
主要啟動了三個執行緒:
- AcceptThread:用於接收client的連線請求,建立連線後交給SelectorThread執行緒處理
- SelectorThread:用於處理讀寫請求
- ConnectionExpirerThread:檢查session連線是否過期
client發起socket連線的時候,server監聽了該埠,接收到client的連線請求,然後把建立練級的SocketChannel放入佇列裡面,交給SelectorThread處理
// org.apache.zookeeper.server.NIOServerCnxnFactory.SelectorThread#addAcceptedConnection
public boolean addAcceptedConnection(SocketChannel accepted) {
if (stopped || !acceptedQueue.offer(accepted)) {
return false;
}
wakeupSelector();
return true;
}
建立session連線
SelectorThread是一個不斷迴圈的執行緒,每次迴圈都會處理剛剛建立的socket連線
// org.apache.zookeeper.server.NIOServerCnxnFactory.SelectorThread#run
while (!stopped) {
try {
select();
// 處理對立中的socket
processAcceptedConnections();
processInterestOpsUpdateRequests();
} catch (RuntimeException e) {
LOG.warn("Ignoring unexpected runtime exception", e);
} catch (Exception e) {
LOG.warn("Ignoring unexpected exception", e);
}
}
// org.apache.zookeeper.server.NIOServerCnxnFactory.SelectorThread#processAcceptedConnections
private void processAcceptedConnections() {
SocketChannel accepted;
while (!stopped && (accepted = acceptedQueue.poll()) != null) {
SelectionKey key = null;
try {
// 向該socket註冊讀事件
key = accepted.register(selector, SelectionKey.OP_READ);
// 建立一個NIOServerCnxn維護session
NIOServerCnxn cnxn = createConnection(accepted, key, this);
key.attach(cnxn);
addCnxn(cnxn);
// 省略中間程式碼...
}
}
說了這麼久,我們說的session究竟是什麼還沒有解釋,session中文翻譯是會話,在這裡就是zk的server和client維護的一個具有一些特別屬性的網路連線,網路連線這裡就是socket連線,一些特別的屬性包括
- sessionId:唯一標示一個會話
- sessionTimeout:這個連線的超時時間,超過這個時間server就會把連線斷開
所以session建立的兩步就是
- 建立socket連線
- client發起建立session請求,server建立一個例項來維護這個連線
server收到ConnectRequest之後,按照正常處理io的方式處理這個request,server端的主要操作是
- 反序列化為ConnectRequest
- 根據request中的sessionId來判斷是新的session連線還是session重連
- 如果是新連線
- 生成sessionId
- 建立新的SessionImpl並放入org.apache.zookeeper.server.SessionTrackerImpl#sessionExpiryQueue
- 封裝該請求為新的request在processorChain中傳遞,最後交給FinalRequestProcessor處理
- 如果是重連
- 關閉sessionId對應的原來的session
- 關閉原來的socket連線
- sessionImp會在sessionExpiryQueue中由於過期被清理
- 重新開啟一個session
- 將原來的sessionId設定到當前的NIOServerCnxn例項中,作為新的連線的sessionId
- 校驗密碼是否正確密碼錯誤的時候直接返回給客戶端,不可用的session
- 密碼正確的話,新建SessionImpl
- 返回給客戶端sessionId
- 關閉sessionId對應的原來的session
- 如果是新連線
總體流程是
其中有一個session生成演算法我們來看下
public static long initializeNextSession(long id) {
// sessionId是long型別,共8個位元組,64位
long nextSid;
// 取時間戳的的低40位作為初始化sessionId的第16-55位,這裡使用的是無符號右移,不會出現負數
nextSid = (Time.currentElapsedTime() << 24) >>> 8;
// 使用serverId(配置檔案中指定的myid)作為高8位
nextSid = nextSid | (id <<56);
// nextSid為long的最小值,這中情況不可能出現,這裡只是作為一個case列在這裡
if (nextSid == EphemeralType.CONTAINER_EPHEMERAL_OWNER) {
++nextSid; // this is an unlikely edge case, but check it just in case
}
return nextSid;
}
初始化sessionId的組成
myid(1位元組)+擷取的時間戳低40位(5個位元組)+2個位元組(初始化都是0)
每個server再基於這個id不斷自增,這樣的演算法就保證了每個server的sessionId是全域性唯一的。
總結
session在zk框架中是一個重要概念,很多功能都依賴於session,比如臨時節點,session關閉後就自動刪除了。本文主要介紹了session的建立過程中client和server各自的處理方式