ZooKeeper原始碼學習筆記(2)--Standalone模式下的ZooKeeper
Server入口
Server的啟動程式碼位於 zkServer.sh
檔案中。
zkServer.sh
指令碼同 /etc/init.d/
中的啟動指令碼比較類似,都是通過shell的case
命令解析指令執行。具體指令如下:
1. start
: 通過nohup後臺啟動org.apache.zookeeper.server.quorum.QuorumPeerMain
2. start-foreground
: 前臺執行org.apache.zookeeper.server.quorum.QuorumPeerMain
3. stop
: 殺死通過start
啟動的程序
4. restart
: 先後呼叫stop
start
,起到重啟的作用 5.
status
: 通過org.apache.zookeeper.client.FourLetterWordMain
檢視執行情況 6.
upgrade
: 通過org.apache.zookeeper.server.upgrade.UpgradeMain
進行線上更新 7.
print-cmd
: 輸出啟動start
的命令
啟動邏輯
從zkServer.sh
中看到,ZooKeeper Server的入口類是QuorumPeerMain
。
在入口函式中,根據在 zoo.cfg
檔案中配置的server個數,決定啟動Standalone(單機)模式或是Cluster(叢集)模式
如果在 zoo.cfg
檔案中沒有配置 server 則預設作為 Standalone 模式啟動,並將啟動引數傳遞給 ZooKeeperServerMain::main
,否則作為 Cluster 模式進行啟動。
在本節中,暫時不考慮Cluster模式,只關心 Standalone 模式下的 Server 執行邏輯。
Standalone 模式的啟動流程
如上所言,在Standalone模式下,QuorumPeerMain
會將啟動引數傳遞給ZooKeeperServerMain::main
。
ZooKeeperServerMain::main
裡,ZooKeeper在解析完成config檔案後,呼叫runFromConfig
public void runFromConfig(ServerConfig config) throws IOException {
final ZooKeeperServer zkServer = new ZooKeeperServer();
// Registers shutdown handler which will be used to know the
// server error or shutdown state changes.
final CountDownLatch shutdownLatch = new CountDownLatch(1);
zkServer.registerServerShutdownHandler( new ZooKeeperServerShutdownHandler(shutdownLatch));
cnxnFactory = ServerCnxnFactory.createFactory();
cnxnFactory.configure(config.getClientPortAddress(),
config.getMaxClientCnxns());
cnxnFactory.startup(zkServer);
shutdownLatch.await();
shutdown();
cnxnFactory.join();
if (zkServer.canShutdown()) {
zkServer.shutdown();
}
}
走讀原始碼發現,cnxnFactory.startup
方法中啟動了三個執行緒,分別是NIOServerCnxnFactory
(Runnable啟動), PrepRequestProcessor
, SyncRequestProcessor
。
執行緒啟動完畢後,進入shutdownLatch.await()
的等待狀態, 阻塞主執行緒,避免程式退出。
退出邏輯在ZooKeeperServerShutdownHandler::handle
中可以看到:
if (state == State.ERROR || state == State.SHUTDOWN) {
shutdownLatch.countDown();
}
當ZooKeeperServer處於異常或關閉狀態,shutdownLatch.countDown();
之後,shutdownLatch.await()
指令完成,主執行緒進入關閉流程。
Server Sokect
在學習筆記(1)中,我們看到Client端在SendThread
中同伺服器保持一個socket長連結,與之對應的,在Server端也會有一個ServerSocket負責接收Client傳送過來的請求。
String serverCnxnFactoryName = System.getProperty(ZOOKEEPER_SERVER_CNXN_FACTORY);
if (serverCnxnFactoryName == null) {
serverCnxnFactoryName = NIOServerCnxnFactory.class.getName();
}
在runFromConfig
中構造了一個ServerCnxnFactory物件,這個物件預設是一個NIOServerCnxnFactory
,對應Client端中的ClientCnxnSocketNIO
類。
@Override
public void configure(InetSocketAddress addr, int maxcc) throws IOException {
thread = new ZooKeeperThread(this, "NIOServerCxn.Factory:" + addr);
}
@Override
public void startup(ZooKeeperServer zks) throws IOException,
InterruptedException {
start();
setZooKeeperServer(zks);
zks.startdata();
zks.startup();
}
NIOServerCnxnFactory
類本身繼承了Runnable介面,在 NIOServerCnxnFactory::startup
中啟動一個Daemon執行緒響應來自Client的請求資訊
響應socket請求
Client 端會中有一個SendThread
執行緒專門負責同 Server 的socket 連結。同樣,在Server端的NIOServerCnxnFactory類中也有一個獨立執行緒,專門負責讀取Client傳送來的資料。
如圖,在Client端成功和Server端建立連結之後,Client端的使用者請求會被 ClientCnxnSocketNIO 寫入socket中,當NIOServerCnxnFactory 讀取並處理完畢後,再通過socket進行寫回,得到response。
對於大部分資料請求,會在doIO
中逐步解析成一個Packet
物件,再獲取Request
請求,傳送給ZooKeeperServer::submitRequest
進行消費,具體的消費路徑會在後面進行講解,這裡只簡單介紹 socket 的通訊邏輯。
Watcher的實現
ServerCnxn
實現了Watcher
介面,如果判斷request中包含了watcher,則會將ServerCnxn
加入監聽列表中,當指定節點發生變化時,回撥ServerCnxn
的對應方法,通過sendResponse通知Client節點資訊發生改變
ZooKeeper的資料結構
ZooKeeper是一個基於節點模型的分散式協調框架,使用類似檔案路徑的節點進行資料儲存。
在執行過程中,節點資訊都會被全部載入到記憶體中,每個節點都會被構造成一個DataNode 物件,被稱為znode。
三層資料快取層
znode節點會由於使用者的讀寫操作頻繁發生變化,為了提升資料的訪問效率,ZooKeeper中有一個三層的資料緩衝層用於存放節點資料。
outstandingChanges
outstandingChanges 位於ZooKeeperServer 中,用於存放剛進行更改還沒有同步到ZKDatabase中的節點資訊
ZKDatabase
ZKDatabase 用於管理ZooKeeper的中的節點資料。
ZKDatabase中有一個DataTree物件,在DataTree中維護一個叫做nodes的ConcurrentHashMap
,用於在記憶體中持有完整的節點資訊。
在cnxnFactory.startup
的時候,系統會通過zkDb.loadDatabase()
將序列化存放的節點資訊還原到記憶體中
Disk files
Disk file 由兩部分組成,一個是 FileSnap
, 一個是FileTxnLog
。顧名思義,FileSnap
用於存放基於某個時間點狀態的 ZooKeeper 節點資訊快照,FileTxnLog
用於存放資料對節點資訊的具體更改操作。
ZKDatabase 的資料持久化
ZooKeeper 通過維護節點資訊的一致性來完成分散式應用的協調工作。
關於 Snapshot 和 Transaction
同 Hadoop 類似,在ZooKeeper中同樣存在Snapshot和Transaction的概念。
Snapshot 對應某個時間點資料的完整狀態,Transaction 代表某條對資料的修正指令。
當Snapshot A 執行完指令後,他的資料狀態得到更新,成為 Snapshot B。
當服務端異常退出或重啟時,還原資料節點到指定狀態有兩種方案,一種是再次執行每一條Transaction,另一種是先將資料節點還原到一個正確的Snapshot,再執行從這個Snapshot之後的每一條Transaction。第一種方案需要儲存從首次啟動開始的每一條指令,同時執行時間隨指令條數線性增長,影響還原效率。因此我們通常都採用第二種方案snapshot+transaction進行資料還原。
資料載入流程
如果當前並非首次啟動ZooKeeper,則我們需要將關閉前的ZooKeeper資料進行還原。
根據前一小節,我們知道了Snapshot和Transaction的關係,在返回原始碼,我們看到在ZooKeeperServer::loadData()
會呼叫以下程式碼
public long loadDataBase() throws IOException {
PlayBackListener listener=new PlayBackListener(){
public void onTxnLoaded(TxnHeader hdr,Record txn){
Request r = new Request(null, 0, hdr.getCxid(),hdr.getType(), null, null);
addCommittedProposal(r);
}
};
long zxid = snapLog.restore(dataTree,sessionsWithTimeouts,listener);
return zxid;
}
public long restore(DataTree dt, Map<Long, Integer> sessions,
PlayBackListener listener) throws IOException {
snapLog.deserialize(dt, sessions);
FileTxnLog txnLog = new FileTxnLog(dataDir);
TxnIterator itr = txnLog.read(dt.lastProcessedZxid+1);
long highestZxid = dt.lastProcessedZxid;
TxnHeader hdr;
while (true) {
hdr = itr.getHeader();
if (hdr == null) {
return dt.lastProcessedZxid;
}
processTransaction(hdr,dt,sessions, itr.getTxn());
listener.onTxnLoaded(hdr, itr.getTxn());
if (!itr.next())
break;
}
} finally {
if (itr != null) {
itr.close();
}
}
return highestZxid;
}
snapLog
是一個FileTxnSnapLog
類,他由一個FileSnap
和一個FileTxnLog
組成。
FileSnap
是快照檔案的工具類,擁有serialize
,deserialize
方法,可以將DataTree
物件進行序列化和反序列化。
FileTxnLog
是Transaction 日誌的工具類,通過txnLog.read
,我們拿到Snapshot檔案發生後的Transaction 日誌,通過processTransaction
將事務應用到DataTree
上,還原初態。
通過loadDatabase()
我們成功的將磁碟檔案儲存的節點資訊重新載入到了記憶體中,從這個時候開始我們可以對到來的socket進行消費。
處理 session 請求
在響應socket請求的小節中,我們看到在NIOServerCnxnFactory中啟動了一個Daemon執行緒,並在while迴圈中獲取socket請求資訊,然後分發到doIO
中執行。
ZooKeeper將每一個 socket 的連結,都認為是一個session, 並擁有一個超時時間。請求會被包裝成一個NIOServerCnxn
物件,當判斷session是首次connect到ZooKeeperServer的時候,先讀取connect資訊,在SessionTrackerImpl
中維護當前存活的session佇列。
SessionTrackerImpl
是一個獨立執行緒,專門用於檢測 session 的存活狀態。
其他非首次連線的socket資訊會通過readRequest進行消費。
RequestProcessor 任務鏈
在ZooKeeperServer::setupRequestProcessors
中建立了三個RequestProcessor物件,分別是 FinalRequestProcessor , SyncRequestProcessor 和 PrepRequestProcessor ,其中PrepRequestProcessor 和 SyncRequestProcessor 類分別繼承自 Thread 類,作為獨立執行緒執行。
readRequest
通過反序列化Packet
類,提取出Request
資訊,然後呼叫ZooKeeperServer::submitRequest
進行資料處理。
public void submitRequest(Request si) {
firstProcessor.processRequest(si);
}
對於Standalone 模式的ZooKeeperServer
,他的firstPrcessor就是PrepRequestProcessor
類
PrepRequestProcessor
PrepRequestProcessor
是整個任務鏈的起點。
PrepRequestProcessor::submitRequest
不會立即處理request請求,而是將request加入執行佇列submittedRequests
中,等待執行。
PrepRequestProcessor自身的獨立執行緒不斷從佇列中拉去Request物件,呼叫pRequest(request)
。在pRequest
中,根據Request的不同種類,將Request轉變為不同的Record物件,通過addChangeRecord將ChangeRecord加入ZooKeeperServer.outstandingChanges 中,此時節點資料並沒有同步到DataTree
中。
根據節點的三層快取模型,在獲取節點資訊時,會首先從outstandingChangesForPath中獲取資訊,當沒有找到對應的節點資訊時,再通過zkDb::getNode
獲取。
SyncRequestProcessor
SyncRequestProcessor
作為 PrepRequestProcessor
的下游消費者,負責將Transaction寫入TxnLog中,並定時構建快照檔案。
zkDatabase.append
中會將Request
寫入Transaction Log File,如果發現當前的Txn條數超過閾值,則啟動一個快照執行緒,將DataTree作為快照例項到磁碟中。
在takeSnapshot
中,通過序列化當前的DataTree結構,將snapShot儲存到磁碟上
當SyncRequestProcessor
的處理條數超過閾值1000條時,呼叫flush()命令,將任務逐個傳遞給下游的RequestProcessor
進行處理。
FinalRequestProcessor
FinalRequestProcessor
作為Standalone模式下的任務鏈終點,主要完成以下工作。
while (!zks.outstandingChanges.isEmpty() && zks.outstandingChanges.get(0).zxid <= request.zxid) {
ChangeRecord cr = zks.outstandingChanges.remove(0);
if (cr.zxid < request.zxid) {
LOG.warn("Zxid outstanding " + cr.zxid + " is less than current " + request.zxid);
}
if (zks.outstandingChangesForPath.get(cr.path) == cr) {
zks.outstandingChangesForPath.remove(cr.path);
}
}
if (request.hdr != null) {
TxnHeader hdr = request.hdr;
Record txn = request.txn;
rc = zks.processTxn(hdr, txn);
}
- 呼叫
zks.processTxn()
,將請求資訊合併到DataTree
中 - 清理掉
zks.outstandingChanges
中的冗餘資料,防止outstandingChanges
無限增長。
任務鏈總結
ZooKeeper Server中通過三層的任務鏈實現對請求的處理過程。
第一層負責在outstandingChanges中構建一個臨時的節點物件,便於後續請求能夠快速獲取對應節點最新狀態
第二層負責將請求資料轉為Transaction日誌,並記錄到磁碟中,便於重啟後的節點資料還原。同時還會根據日誌操作定時儲存快照。
第三層負責批量將請求資料合併到DataTree
中,同時清除第一層臨時構建的節點物件。
總結
ZooKeeper Server使用DataTree
在記憶體中持有所有節點資訊, 在磁碟中通過Snapshot 和 TxnFile 儲存歷史節點資料。
響應請求時,Server 將請求資料分發給一個RequestProcessor
任務鏈進行消費。
在任務鏈中,通過一個單執行緒保證資料的執行緒安全和一致性。
同樣由於在 ZooKeeper 中是通過單執行緒保證資料執行緒安全,在大訪問量級下的執行效率值得思考,之後可以看看Cluster下的ZooKeeper有沒有對這一塊做出優化。