1. 程式人生 > >ZooKeeper原始碼學習筆記(2)--Standalone模式下的ZooKeeper

ZooKeeper原始碼學習筆記(2)--Standalone模式下的ZooKeeper

Server入口

Server的啟動程式碼位於 zkServer.sh 檔案中。

zkServer 指令和Java入口類的對應關係

zkServer.sh指令碼同 /etc/init.d/ 中的啟動指令碼比較類似,都是通過shell的case命令解析指令執行。具體指令如下:
1. start: 通過nohup後臺啟動org.apache.zookeeper.server.quorum.QuorumPeerMain
2. start-foreground: 前臺執行org.apache.zookeeper.server.quorum.QuorumPeerMain
3. stop: 殺死通過start啟動的程序
4. restart: 先後呼叫stop

start,起到重啟的作用
5. status: 通過org.apache.zookeeper.client.FourLetterWordMain檢視執行情況
6. upgrade: 通過org.apache.zookeeper.server.upgrade.UpgradeMain 進行線上更新
7. print-cmd: 輸出啟動start的命令

啟動邏輯

zkServer.sh中看到,ZooKeeper Server的入口類是QuorumPeerMain

判斷啟動模式

在入口函式中,根據在 zoo.cfg 檔案中配置的server個數,決定啟動Standalone(單機)模式或是Cluster(叢集)模式

如果在 zoo.cfg 檔案中沒有配置 server 則預設作為 Standalone 模式啟動,並將啟動引數傳遞給 ZooKeeperServerMain::main ,否則作為 Cluster 模式進行啟動。

在本節中,暫時不考慮Cluster模式,只關心 Standalone 模式下的 Server 執行邏輯。

Standalone 模式的啟動流程

如上所言,在Standalone模式下,QuorumPeerMain會將啟動引數傳遞給ZooKeeperServerMain::main

ZooKeeperServerMain::main裡,ZooKeeper在解析完成config檔案後,呼叫runFromConfig

初始化Server。

public void runFromConfig(ServerConfig config) throws IOException {
    final ZooKeeperServer zkServer = new ZooKeeperServer();
    // Registers shutdown handler which will be used to know the
    // server error or shutdown state changes.
    final CountDownLatch shutdownLatch = new CountDownLatch(1);
    zkServer.registerServerShutdownHandler( new ZooKeeperServerShutdownHandler(shutdownLatch));
    cnxnFactory = ServerCnxnFactory.createFactory();
    cnxnFactory.configure(config.getClientPortAddress(),
                    config.getMaxClientCnxns());
    cnxnFactory.startup(zkServer);
    shutdownLatch.await();
    shutdown();

    cnxnFactory.join();
    if (zkServer.canShutdown()) {
        zkServer.shutdown();
    }
}

cnxnFactory.startup

走讀原始碼發現,cnxnFactory.startup方法中啟動了三個執行緒,分別是NIOServerCnxnFactory(Runnable啟動), PrepRequestProcessor , SyncRequestProcessor

執行緒啟動完畢後,進入shutdownLatch.await()的等待狀態, 阻塞主執行緒,避免程式退出。

退出邏輯在ZooKeeperServerShutdownHandler::handle中可以看到:

 if (state == State.ERROR || state == State.SHUTDOWN) {
  shutdownLatch.countDown();
}

當ZooKeeperServer處於異常或關閉狀態,shutdownLatch.countDown();之後,shutdownLatch.await()指令完成,主執行緒進入關閉流程。

Server Sokect

學習筆記(1)中,我們看到Client端在SendThread中同伺服器保持一個socket長連結,與之對應的,在Server端也會有一個ServerSocket負責接收Client傳送過來的請求。

String serverCnxnFactoryName = System.getProperty(ZOOKEEPER_SERVER_CNXN_FACTORY);
if (serverCnxnFactoryName == null) {
  serverCnxnFactoryName = NIOServerCnxnFactory.class.getName();
}

runFromConfig中構造了一個ServerCnxnFactory物件,這個物件預設是一個NIOServerCnxnFactory,對應Client端中的ClientCnxnSocketNIO類。

@Override
public void configure(InetSocketAddress addr, int maxcc) throws IOException {
  thread = new ZooKeeperThread(this, "NIOServerCxn.Factory:" + addr);
}

@Override
public void startup(ZooKeeperServer zks) throws IOException,
            InterruptedException {
  start();
  setZooKeeperServer(zks);
  zks.startdata();
  zks.startup();
}

NIOServerCnxnFactory類本身繼承了Runnable介面,在 NIOServerCnxnFactory::startup 中啟動一個Daemon執行緒響應來自Client的請求資訊

響應socket請求

Client 端會中有一個SendThread 執行緒專門負責同 Server 的socket 連結。同樣,在Server端的NIOServerCnxnFactory類中也有一個獨立執行緒,專門負責讀取Client傳送來的資料。

Socket通訊

如圖,在Client端成功和Server端建立連結之後,Client端的使用者請求會被 ClientCnxnSocketNIO 寫入socket中,當NIOServerCnxnFactory 讀取並處理完畢後,再通過socket進行寫回,得到response。

對於大部分資料請求,會在doIO中逐步解析成一個Packet物件,再獲取Request請求,傳送給ZooKeeperServer::submitRequest進行消費,具體的消費路徑會在後面進行講解,這裡只簡單介紹 socket 的通訊邏輯。

Watcher的實現

ServerCnxn實現了Watcher介面,如果判斷request中包含了watcher,則會將ServerCnxn加入監聽列表中,當指定節點發生變化時,回撥ServerCnxn的對應方法,通過sendResponse通知Client節點資訊發生改變

ZooKeeper的資料結構

ZooKeeper是一個基於節點模型的分散式協調框架,使用類似檔案路徑的節點進行資料儲存。

在執行過程中,節點資訊都會被全部載入到記憶體中,每個節點都會被構造成一個DataNode 物件,被稱為znode。

三層資料快取層

znode節點會由於使用者的讀寫操作頻繁發生變化,為了提升資料的訪問效率,ZooKeeper中有一個三層的資料緩衝層用於存放節點資料。

三層資料快取層

outstandingChanges

outstandingChanges 位於ZooKeeperServer 中,用於存放剛進行更改還沒有同步到ZKDatabase中的節點資訊

ZKDatabase

ZKDatabase 用於管理ZooKeeper的中的節點資料。

ZKDatabase中有一個DataTree物件,在DataTree中維護一個叫做nodes的ConcurrentHashMap,用於在記憶體中持有完整的節點資訊。

cnxnFactory.startup的時候,系統會通過zkDb.loadDatabase()將序列化存放的節點資訊還原到記憶體中

Disk files

Disk file 由兩部分組成,一個是 FileSnap, 一個是FileTxnLog。顧名思義,FileSnap 用於存放基於某個時間點狀態的 ZooKeeper 節點資訊快照,FileTxnLog 用於存放資料對節點資訊的具體更改操作。

ZKDatabase 的資料持久化

ZooKeeper 通過維護節點資訊的一致性來完成分散式應用的協調工作。

關於 Snapshot 和 Transaction

同 Hadoop 類似,在ZooKeeper中同樣存在Snapshot和Transaction的概念。

Snapshot 和 Transaction

Snapshot 對應某個時間點資料的完整狀態,Transaction 代表某條對資料的修正指令。

當Snapshot A 執行完指令後,他的資料狀態得到更新,成為 Snapshot B。

當服務端異常退出或重啟時,還原資料節點到指定狀態有兩種方案,一種是再次執行每一條Transaction,另一種是先將資料節點還原到一個正確的Snapshot,再執行從這個Snapshot之後的每一條Transaction。第一種方案需要儲存從首次啟動開始的每一條指令,同時執行時間隨指令條數線性增長,影響還原效率。因此我們通常都採用第二種方案snapshot+transaction進行資料還原。

資料載入流程

如果當前並非首次啟動ZooKeeper,則我們需要將關閉前的ZooKeeper資料進行還原。

根據前一小節,我們知道了Snapshot和Transaction的關係,在返回原始碼,我們看到在ZooKeeperServer::loadData()會呼叫以下程式碼

public long loadDataBase() throws IOException {
  PlayBackListener listener=new PlayBackListener(){
    public void onTxnLoaded(TxnHeader hdr,Record txn){
      Request r = new Request(null, 0, hdr.getCxid(),hdr.getType(), null, null);
      addCommittedProposal(r);
     }
  };
  long zxid = snapLog.restore(dataTree,sessionsWithTimeouts,listener);
  return zxid;
}

public long restore(DataTree dt, Map<Long, Integer> sessions, 
            PlayBackListener listener) throws IOException {
  snapLog.deserialize(dt, sessions);
  FileTxnLog txnLog = new FileTxnLog(dataDir);
  TxnIterator itr = txnLog.read(dt.lastProcessedZxid+1);
  long highestZxid = dt.lastProcessedZxid;
  TxnHeader hdr;
  while (true) {
    hdr = itr.getHeader();
      if (hdr == null) {
         return dt.lastProcessedZxid;
      }
      processTransaction(hdr,dt,sessions, itr.getTxn());
      listener.onTxnLoaded(hdr, itr.getTxn());
      if (!itr.next()) 
        break;
    }
  } finally {
    if (itr != null) {
      itr.close();
    }
  }
  return highestZxid;
}

snapLog是一個FileTxnSnapLog類,他由一個FileSnap和一個FileTxnLog組成。

FileSnap 是快照檔案的工具類,擁有serializedeserialize方法,可以將DataTree物件進行序列化和反序列化。

FileTxnLog是Transaction 日誌的工具類,通過txnLog.read,我們拿到Snapshot檔案發生後的Transaction 日誌,通過processTransaction將事務應用到DataTree上,還原初態。

通過loadDatabase()我們成功的將磁碟檔案儲存的節點資訊重新載入到了記憶體中,從這個時候開始我們可以對到來的socket進行消費。

處理 session 請求

在響應socket請求的小節中,我們看到在NIOServerCnxnFactory中啟動了一個Daemon執行緒,並在while迴圈中獲取socket請求資訊,然後分發到doIO中執行。

doIO邏輯

ZooKeeper將每一個 socket 的連結,都認為是一個session, 並擁有一個超時時間。請求會被包裝成一個NIOServerCnxn物件,當判斷session是首次connect到ZooKeeperServer的時候,先讀取connect資訊,在SessionTrackerImpl中維護當前存活的session佇列。

SessionTrackerImpl 是一個獨立執行緒,專門用於檢測 session 的存活狀態。

其他非首次連線的socket資訊會通過readRequest進行消費。

RequestProcessor 任務鏈

RequestProcessor任務鏈

ZooKeeperServer::setupRequestProcessors中建立了三個RequestProcessor物件,分別是 FinalRequestProcessor , SyncRequestProcessor 和 PrepRequestProcessor ,其中PrepRequestProcessor 和 SyncRequestProcessor 類分別繼承自 Thread 類,作為獨立執行緒執行。

readRequest通過反序列化Packet類,提取出Request資訊,然後呼叫ZooKeeperServer::submitRequest進行資料處理。

public void submitRequest(Request si) {
  firstProcessor.processRequest(si);
}

對於Standalone 模式的ZooKeeperServer,他的firstPrcessor就是PrepRequestProcessor

PrepRequestProcessor

PrepRequestProcessor是整個任務鏈的起點。

PrepRequestProcessor::submitRequest不會立即處理request請求,而是將request加入執行佇列submittedRequests中,等待執行。

消費submittedRequests

PrepRequestProcessor自身的獨立執行緒不斷從佇列中拉去Request物件,呼叫pRequest(request)。在pRequest中,根據Request的不同種類,將Request轉變為不同的Record物件,通過addChangeRecord將ChangeRecord加入ZooKeeperServer.outstandingChanges 中,此時節點資料並沒有同步到DataTree中。

根據節點的三層快取模型,在獲取節點資訊時,會首先從outstandingChangesForPath中獲取資訊,當沒有找到對應的節點資訊時,再通過zkDb::getNode獲取。

SyncRequestProcessor

SyncRequestProcessor 作為 PrepRequestProcessor 的下游消費者,負責將Transaction寫入TxnLog中,並定時構建快照檔案。

SyncRequestProcessor

zkDatabase.append中會將Request寫入Transaction Log File,如果發現當前的Txn條數超過閾值,則啟動一個快照執行緒,將DataTree作為快照例項到磁碟中。

takeSnapshot中,通過序列化當前的DataTree結構,將snapShot儲存到磁碟上

SyncRequestProcessor的處理條數超過閾值1000條時,呼叫flush()命令,將任務逐個傳遞給下游的RequestProcessor進行處理。

FinalRequestProcessor

FinalRequestProcessor作為Standalone模式下的任務鏈終點,主要完成以下工作。

while (!zks.outstandingChanges.isEmpty() && zks.outstandingChanges.get(0).zxid <= request.zxid) {
    ChangeRecord cr = zks.outstandingChanges.remove(0);
    if (cr.zxid < request.zxid) {
        LOG.warn("Zxid outstanding " + cr.zxid + " is less than current " + request.zxid);
    }
    if (zks.outstandingChangesForPath.get(cr.path) == cr) {
        zks.outstandingChangesForPath.remove(cr.path);
    }
}
if (request.hdr != null) {
    TxnHeader hdr = request.hdr;
    Record txn = request.txn;
    rc = zks.processTxn(hdr, txn);
}
  1. 呼叫zks.processTxn(),將請求資訊合併到DataTree
  2. 清理掉zks.outstandingChanges中的冗餘資料,防止outstandingChanges無限增長。

任務鏈總結

ZooKeeper Server中通過三層的任務鏈實現對請求的處理過程。

第一層負責在outstandingChanges中構建一個臨時的節點物件,便於後續請求能夠快速獲取對應節點最新狀態

第二層負責將請求資料轉為Transaction日誌,並記錄到磁碟中,便於重啟後的節點資料還原。同時還會根據日誌操作定時儲存快照。

第三層負責批量將請求資料合併到DataTree中,同時清除第一層臨時構建的節點物件。

總結

ZooKeeper Server使用DataTree在記憶體中持有所有節點資訊, 在磁碟中通過Snapshot 和 TxnFile 儲存歷史節點資料。

響應請求時,Server 將請求資料分發給一個RequestProcessor任務鏈進行消費。

在任務鏈中,通過一個單執行緒保證資料的執行緒安全和一致性。

同樣由於在 ZooKeeper 中是通過單執行緒保證資料執行緒安全,在大訪問量級下的執行效率值得思考,之後可以看看Cluster下的ZooKeeper有沒有對這一塊做出優化。