1. 程式人生 > >zookeeper原始碼 — 四、session建立

zookeeper原始碼 — 四、session建立

目錄

  • session建立的主要過程
  • 客戶端發起連線
  • 服務端建立session

session建立的主要過程

用一張圖來說明session建立過程中client和server的互動

主要流程

  • 服務端啟動,客戶端啟動
  • 客戶端發起socket連線
  • 服務端accept socket連線,socket連線建立
  • 客戶端傳送ConnectRequest給server
  • server收到後初始化ServerCnxn,代表一個和客戶端的連線,即session,server傳送ConnectResponse給client
  • client處理ConnectResponse,session建立完成

客戶發起連線

和server建立socket連線

客戶端要發起連線要先啟動,不論是使用curator client還是zkClient,初始化的都是初始化org.apache.zookeeper.ZooKeeper#ZooKeeper

Zookeeper初始化的主要工作是初始化自己的一些關鍵元件

  • Watcher,外部構造好傳入
  • 初始化StaticHostProvider,決定客戶端選擇連線哪一個server
  • ClientCnxn,客戶端網路通訊的元件,主要啟動邏輯就是啟動這個類

ClientCnxn包含兩個執行緒

  • SendThread,負責client端訊息的傳送和接收
  • EventThread,負責處理event

ClientCnxn初始化的過程就是初始化啟動這兩個執行緒,客戶端發起連線的主要邏輯在SendThread執行緒中

// org.apache.zookeeper.ClientCnxn.SendThread#run
@Override
public void run() {
    clientCnxnSocket.introduce(this,sessionId);
    clientCnxnSocket.updateNow();
    clientCnxnSocket.updateLastSendAndHeard();
    int to;
    long lastPingRwServer = System.currentTimeMillis();
    final int MAX_SEND_PING_INTERVAL = 10000; //10 seconds
    while (state.isAlive()) {
        try {
            // client是否連線到server,如果沒有連線到則連線server
            if (!clientCnxnSocket.isConnected()) {
                if(!isFirstConnect){
                    try {
                        Thread.sleep(r.nextInt(1000));
                    } catch (InterruptedException e) {
                        LOG.warn("Unexpected exception", e);
                    }
                }
                // don't re-establish connection if we are closing
                if (closing || !state.isAlive()) {
                    break;
                }
                // 這個裡面去連線server
                startConnect();
                clientCnxnSocket.updateLastSendAndHeard();
            }

            // 省略中間程式碼...
            clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue, ClientCnxn.this);
            // 省略中間程式碼...
}

SendThread#run是一個while迴圈,只要client沒有被關閉會一直迴圈,每次迴圈判斷當前client是否連線到server,如果沒有則發起連線,發起連線呼叫了startConnect

private void startConnect() throws IOException {
    state = States.CONNECTING;

    InetSocketAddress addr;
    if (rwServerAddress != null) {
        addr = rwServerAddress;
        rwServerAddress = null;
    } else {
        // 通過hostProvider來獲取一個server地址
        addr = hostProvider.next(1000);
    }
    // 省略中間程式碼...

    // 建立client與server的連線
    clientCnxnSocket.connect(addr);
}

到這裡client發起了socket連線,server監聽的埠收到client的連線請求後和client建立連線。

通過一個request來建立session連線

socket連線建立後,client會向server傳送一個ConnectRequest來建立session連線。兩種情況會發送ConnectRequest

  • 在上面的connect方法中會判斷是否socket已經建立成功,如果建立成功就會發送ConnectRequest
  • 如果socket沒有立即建立成功(socket連線建立是非同步的),則傳送這個packet要延後到doTransport中

傳送ConnectRequest是在下面的方法中

// org.apache.zookeeper.ClientCnxn.SendThread#primeConnection
void primeConnection() throws IOException {
    LOG.info("Socket connection established to "
             + clientCnxnSocket.getRemoteSocketAddress()
             + ", initiating session");
    isFirstConnect = false;
    long sessId = (seenRwServerBefore) ? sessionId : 0;
    ConnectRequest conReq = new ConnectRequest(0, lastZxid,
                                               sessionTimeout, sessId, sessionPasswd);
        // 省略中間程式碼...
        // 將conReq封裝為packet放入outgoingQueue等待發送
        outgoingQueue.addFirst(new Packet(null, null, conReq,
                                          null, null, readOnly));
    }
    clientCnxnSocket.enableReadWriteOnly();
    if (LOG.isDebugEnabled()) {
        LOG.debug("Session establishment request sent on "
                  + clientCnxnSocket.getRemoteSocketAddress());
    }
}

請求中帶的引數

  • lastZxid:上一個事務的id
  • sessionTimeout:client端配置的sessionTimeout
  • sessId:sessionId,如果之前建立過連線取的是上一次連線的sessionId
  • sessionPasswd:session的密碼

服務端建立session

和client建立socket連線

在server啟動的過程中除了會啟動用於選舉的網路元件還會啟動用於處理client請求的網路元件

org.apache.zookeeper.server.NIOServerCnxnFactory

主要啟動了三個執行緒:

  • AcceptThread:用於接收client的連線請求,建立連線後交給SelectorThread執行緒處理
  • SelectorThread:用於處理讀寫請求
  • ConnectionExpirerThread:檢查session連線是否過期

client發起socket連線的時候,server監聽了該埠,接收到client的連線請求,然後把建立練級的SocketChannel放入佇列裡面,交給SelectorThread處理

// org.apache.zookeeper.server.NIOServerCnxnFactory.SelectorThread#addAcceptedConnection
public boolean addAcceptedConnection(SocketChannel accepted) {
    if (stopped || !acceptedQueue.offer(accepted)) {
        return false;
    }
    wakeupSelector();
    return true;
}

建立session連線

SelectorThread是一個不斷迴圈的執行緒,每次迴圈都會處理剛剛建立的socket連線

// org.apache.zookeeper.server.NIOServerCnxnFactory.SelectorThread#run
while (!stopped) {
    try {
        select();
        //  處理對立中的socket
        processAcceptedConnections();
        processInterestOpsUpdateRequests();
    } catch (RuntimeException e) {
        LOG.warn("Ignoring unexpected runtime exception", e);
    } catch (Exception e) {
        LOG.warn("Ignoring unexpected exception", e);
    }
}

// org.apache.zookeeper.server.NIOServerCnxnFactory.SelectorThread#processAcceptedConnections
private void processAcceptedConnections() {
    SocketChannel accepted;
    while (!stopped && (accepted = acceptedQueue.poll()) != null) {
        SelectionKey key = null;
        try {
            // 向該socket註冊讀事件
            key = accepted.register(selector, SelectionKey.OP_READ);
            // 建立一個NIOServerCnxn維護session
            NIOServerCnxn cnxn = createConnection(accepted, key, this);
            key.attach(cnxn);
            addCnxn(cnxn);
        // 省略中間程式碼...
    }
}

說了這麼久,我們說的session究竟是什麼還沒有解釋,session中文翻譯是會話,在這裡就是zk的server和client維護的一個具有一些特別屬性的網路連線,網路連線這裡就是socket連線,一些特別的屬性包括

  • sessionId:唯一標示一個會話
  • sessionTimeout:這個連線的超時時間,超過這個時間server就會把連線斷開

所以session建立的兩步就是

  • 建立socket連線
  • client發起建立session請求,server建立一個例項來維護這個連線

server收到ConnectRequest之後,按照正常處理io的方式處理這個request,server端的主要操作是

  • 反序列化為ConnectRequest
  • 根據request中的sessionId來判斷是新的session連線還是session重連
    • 如果是新連線
      • 生成sessionId
      • 建立新的SessionImpl並放入org.apache.zookeeper.server.SessionTrackerImpl#sessionExpiryQueue
      • 封裝該請求為新的request在processorChain中傳遞,最後交給FinalRequestProcessor處理
    • 如果是重連
      • 關閉sessionId對應的原來的session
        • 關閉原來的socket連線
        • sessionImp會在sessionExpiryQueue中由於過期被清理
      • 重新開啟一個session
        • 將原來的sessionId設定到當前的NIOServerCnxn例項中,作為新的連線的sessionId
        • 校驗密碼是否正確密碼錯誤的時候直接返回給客戶端,不可用的session
        • 密碼正確的話,新建SessionImpl
        • 返回給客戶端sessionId

總體流程是

其中有一個session生成演算法我們來看下

public static long initializeNextSession(long id) {
    // sessionId是long型別,共8個位元組,64位
    long nextSid;
    // 取時間戳的的低40位作為初始化sessionId的第16-55位,這裡使用的是無符號右移,不會出現負數
    nextSid = (Time.currentElapsedTime() << 24) >>> 8;
    // 使用serverId(配置檔案中指定的myid)作為高8位
    nextSid =  nextSid | (id <<56);
    // nextSid為long的最小值,這中情況不可能出現,這裡只是作為一個case列在這裡
    if (nextSid == EphemeralType.CONTAINER_EPHEMERAL_OWNER) {
        ++nextSid;  // this is an unlikely edge case, but check it just in case
    }
    return nextSid;
}

初始化sessionId的組成

myid(1位元組)+擷取的時間戳低40位(5個位元組)+2個位元組(初始化都是0)

每個server再基於這個id不斷自增,這樣的演算法就保證了每個server的sessionId是全域性唯一的。

總結

session在zk框架中是一個重要概念,很多功能都依賴於session,比如臨時節點,session關閉後就自動刪除了。本文主要介紹了session的建立過程中client和server各自的處理方式