Zookeeper源碼閱讀(十四) 單機Server
前言
前面兩篇主要說了下client-server的session相關的內容,到這裏client的內容以及client-server的連接的內容也就基本告一段落了,剩下的部分就是server部分內部的結構,zk的選舉以及server部分的工作機制等了。 這一篇主要說下單機server的啟動過程,裏面會涉及到一些server內部的工作機制和機構。
Server架構
可以看到Zookeeper的server端主要分為幾個大模塊,ZKDatabase是zk server內部的內存數據庫,內部維護了節點數據等關鍵數據,負責快照和日誌的記錄,同時也管理了session的超時和集群的選舉,其他的部分主要有負責管理session的sessiontracker,負責處理請求的請求鏈和Learner模塊(集群溝通?目前還不是特別清楚)。
單機版Server啟動
單機版Server主要流程:
從上圖中可以看到單機server啟動可以分為預啟動和初始化兩個部分。
預啟動
1. 統一由QuorumPeerMain作為啟動類
無論單機或集群,在zkServer.cmd和zkServer.sh中都配置了QuorumPeerMain作為啟動入口類。
2. 解析zoo.cfg
用過ZK的同學都知道zoo.cfg是用戶配置的zookeeper核心配置文件,ticktime,dataDir,dataLogDir,集群ip:port等都配置在其中。在實例化QuorumPeerMain對象後會去解析zoo.cfg文件。
QuorumPeerMain(Main)->QuorumPeerMain(initializeAndRun)->QuorumPeerConfig(parse)->QuorumPeerConfig(parseProperties)
parseProperties函數太長了。。。而且都是很簡單的property文件取值的操作,可以簡單看下。
3. 創建並啟動歷史文件清理器DatadirCleanupManager
DatadirCleanupManager的start方法負責自動清理歷史的快照和事務日誌。
public void start() { if (PurgeTaskStatus.STARTED == purgeTaskStatus) { LOG.warn("Purge task is already running."); return; } // Don't schedule the purge task with zero or negative purge interval. if (purgeInterval <= 0) { LOG.info("Purge task is not scheduled."); return; } timer = new Timer("PurgeTask", true);//利用了java的Timer類來做定時任務 TimerTask task = new PurgeTask(dataLogDir, snapDir, snapRetainCount);//PurgeTask是一個timertask timer.scheduleAtFixedRate(task, 0, TimeUnit.HOURS.toMillis(purgeInterval));//設置頻率 purgeTaskStatus = PurgeTaskStatus.STARTED; }
4. 判斷啟動模式
if (args.length == 1 && config.servers.size() > 0) {//通過解析zoo.cfg server數量來判斷是否是集群
runFromConfig(config);//如果是集群,直接用QuorumPeerMain中集群啟動方法
} else {
LOG.warn("Either no config or no quorum defined in config, running "
+ " in standalone mode");
// there is only server in the quorum -- run as standalone
ZooKeeperServerMain.main(args);//如果是單機就用單機啟動方法
}
5. 再次解析zoo.cfg
ZooKeeperServerMain(Main)->ZooKeeperServerMain(initializeAndRun)->ServerConfig(parse)->QuorumPeerConfig(parse)->QuorumPeerConfig(parseProperties)。
這裏之所以還要進行一次解析是因為這裏是調用的zookeeperserver的main方法,無法把原來解析的參數傳入。而且配置文件比較小,解析並不是特別耗資源,可以接受。
6. 創建ZookeeperServer實例
ZookeeperServer是server端的核心類,在啟動時會創建zookeeperserver的一個實例。
final ZooKeeperServer zkServer = new ZooKeeperServer();
到這裏位置就完成了所謂的預啟動,可以看出,在預啟動階段Zookeeper的server幹了下面幾件事:
- 清理歷史快照和log文件;
- 解析配置文件並進行初步的分析,判斷Server端狀態(Standalone/Cluster);
- 實例化ZookeeperServer。
初始化
在實例化了zookeeperserver之後,zookeeper server端的啟動過程便來到了初始化階段,這個過程也是比較長的。
首先是在runfromconfig方法中:
public void runFromConfig(ServerConfig config) throws IOException {
LOG.info("Starting server");
FileTxnSnapLog txnLog = null;
try {
// Note that this thread isn't going to be doing anything else,
// so rather than spawning another thread, we will just call
// run() in this thread.
// create a file logger url from the command line args
final ZooKeeperServer zkServer = new ZooKeeperServer();//1
// Registers shutdown handler which will be used to know the
// server error or shutdown state changes.
final CountDownLatch shutdownLatch = new CountDownLatch(1);//2
zkServer.registerServerShutdownHandler(
new ZooKeeperServerShutdownHandler(shutdownLatch));
txnLog = new FileTxnSnapLog(new File(config.dataLogDir), new File(//3
config.dataDir));
zkServer.setTxnLogFactory(txnLog);
zkServer.setTickTime(config.tickTime);
zkServer.setMinSessionTimeout(config.minSessionTimeout);
zkServer.setMaxSessionTimeout(config.maxSessionTimeout);
cnxnFactory = ServerCnxnFactory.createFactory();
cnxnFactory.configure(config.getClientPortAddress(),
config.getMaxClientCnxns());
cnxnFactory.startup(zkServer);
// Watch status of ZooKeeper server. It will do a graceful shutdown
// if the server is not running or hits an internal error.
shutdownLatch.await();
shutdown();
cnxnFactory.join();
if (zkServer.canShutdown()) {
zkServer.shutdown(true);
}
} catch (InterruptedException e) {
// warn, but generally this is ok
LOG.warn("Server interrupted", e);
} finally {
if (txnLog != null) {
txnLog.close();
}
}
}
1. 創建服務器統計器ServerStats
在上面代碼的1處,也就是zookeeperserver的構造函數內實例化了Server的統計器ServerStats。
public ZooKeeperServer() {
serverStats = new ServerStats(this); //ServerStats統計了基本的server的數據如收發packet數,延遲信息等。
listener = new ZooKeeperServerListenerImpl(this);
}
簡單介紹下ServerStats:
/**
* Basic Server Statistics
*/
//從註釋也可以知道ServerStats是server數據的基礎類
public class ServerStats {
private long packetsSent;//zkserver啟動後,或是最近一次充值服務端統計信息後,服務端->客戶端發送的響應次數
private long packetsReceived;//zkserver啟動後,或是最近一次充值服務端統計信息後,服務端收到的客戶端發送的響應次數
private long maxLatency;//zkserver啟動後,或是最近一次充值服務端統計信息後,server端請求處理的最大延時
private long minLatency = Long.MAX_VALUE;//zkserver啟動後,或是最近一次充值服務端統計信息後,server端請求處理的最小延時
private long totalLatency = 0;//zkserver啟動後,或是最近一次充值服務端統計信息後,server端請求處理的總延時
private long count = 0;//zkserver啟動後,或是最近一次充值服務端統計信息後,server端處理的客戶端請求總次數
private final Provider provider;//provider對象提供部分統計數據,如下
public interface Provider {
public long getOutstandingRequests();//獲取隊列中還沒有被處理的請求數量,在zookeeperserver和finalrequestprocessor中
public long getLastProcessedZxid();//獲得目前最新的zxid
public String getState();//獲取服務器狀態
public int getNumAliveConnections();//獲取存活的客戶端連接總數
}
public ServerStats(Provider provider) {//構造器
this.provider = provider;
}
2. 創建數據管理器FileTxnSnapLog
FileTxnSnapLog是Zookeeper上層服務器和底層數據存儲之間的對接層,提供了一系列操作數據文件的接口,如事務日誌文件和快照數據文件。Zookeeper根據zoo.cfg文件中解析出的快照數據目錄dataDir和事務日誌目錄dataLogDir來創建FileTxnSnapLog。
其實這裏的FileTxnSnapLog就是包含了第3,4篇中講的快照和日誌類FileTxnLog,FileSnap的功能類,FileTxnLog,FileSnap的生成以來dataDir和snapDir(dataLogDir)來生成。
3. 設置服務器tickTime和會話超時時間限制
zkServer.setTickTime(config.tickTime);
zkServer.setMinSessionTimeout(config.minSessionTimeout);
zkServer.setMaxSessionTimeout(config.maxSessionTimeout);
4. 創建ServerCnxnFactory
通過配置系統屬性zookeper.serverCnxnFactory來指定使用Zookeeper自己實現的NIO還是使用Netty框架作為Zookeeper服務端網絡連接工廠。
cnxnFactory = ServerCnxnFactory.createFactory();//創建ServerCnxnFactory
cnxnFactory.configure(config.getClientPortAddress(),
config.getMaxClientCnxns());//初始化ServerCnxnFactory
cnxnFactory.startup(zkServer);//啟動ServerCnxnFactory
之前提到過這裏利用到了反射。
static public ServerCnxnFactory createFactory() throws IOException {
String serverCnxnFactoryName =
System.getProperty(ZOOKEEPER_SERVER_CNXN_FACTORY);//讀取配置
if (serverCnxnFactoryName == null) {
serverCnxnFactoryName = NIOServerCnxnFactory.class.getName();//默認是NIO實現
}
try {
ServerCnxnFactory serverCnxnFactory = (ServerCnxnFactory) Class.forName(serverCnxnFactoryName)//如果配置了,則按照配置來實例化。反射
.getDeclaredConstructor().newInstance();
LOG.info("Using {} as server connection factory", serverCnxnFactoryName);
return serverCnxnFactory;
} catch (Exception e) {
IOException ioe = new IOException("Couldn't instantiate "
+ serverCnxnFactoryName);
ioe.initCause(e);
throw ioe;
}
}
5. 初始化ServerCnxnFactory
NIOServerCnxnFactory(configure)
@Override
public void configure(InetSocketAddress addr, int maxcc) throws IOException {
configureSaslLogin();
thread = new ZooKeeperThread(this, "NIOServerCxn.Factory:" + addr);//傳入的runnable對象是ServerCnxnFactory的實現類
thread.setDaemon(true);//設置為daemon線程
maxClientCnxns = maxcc;//NIO相關的設置
this.ss = ServerSocketChannel.open();
ss.socket().setReuseAddress(true);
LOG.info("binding to port " + addr);
ss.socket().bind(addr);
ss.configureBlocking(false);
ss.register(selector, SelectionKey.OP_ACCEPT);
}
可以看到,Zookeeper會初始化Thread作為ServerCnxnFactory的主線程,然後再初始化NIO服務器。而zookeeper初始化的thread傳入的runnable對象依然是ServerCnxnFactory的實現類,也就是說run的時候依然是執行ServerCnxnFactory。
6. 啟動ServerCnxnFactory主線程
@Override
public void startup(ZooKeeperServer zks) throws IOException,
InterruptedException {
start();//啟動主線程
setZooKeeperServer(zks);//設置server端對象
zks.startdata();//恢復數據等
zks.startup();
}
其中start方法啟動了線程。
public void start() {
// ensure thread is started once and only once
if (thread.getState() == Thread.State.NEW) {//如果是剛啟動
thread.start();//啟動線程
}
}
NIOServerCnxnFactory的run方法是NIO異步連接的一些基本設置如建立連接等。
7. 恢復本地數據
每次zk啟動時,都需要從本地塊找數據文件和事務日誌文件中進行數據恢復。NIOServerCnxnFactory(startdata)->ZooKeeperServer(startdata)中進行了恢復data的操作。
8. 創建並啟動session管理器
所謂的session管理器就是前面說的sessiontracker。
public synchronized void startup() {
if (sessionTracker == null) {
createSessionTracker();//創建session管理器
}
startSessionTracker();//啟動session管理器
setupRequestProcessors();//初始化zookeeper請求處理鏈
registerJMX();//註冊JMX服務
setState(State.RUNNING);
notifyAll();
}
protected void createSessionTracker() {
sessionTracker = new SessionTrackerImpl(this, zkDb.getSessionWithTimeOuts(),
tickTime, 1, getZooKeeperServerListener());//創建sessiontrack,初始化管理器裏的sessionsWithTimeout,expirationInterval等變量,特別的是,初始化sessionId也在這裏一起做了
}
9. 初始化Zookeeper的請求處理鏈
protected void setupRequestProcessors() {
RequestProcessor finalProcessor = new FinalRequestProcessor(this);
RequestProcessor syncProcessor = new SyncRequestProcessor(this,
finalProcessor);
((SyncRequestProcessor)syncProcessor).start();
firstProcessor = new PrepRequestProcessor(this, syncProcessor);
((PrepRequestProcessor)firstProcessor).start();
}
zookeeper請求處理方式基於責任鏈模式,也就是說在server端有多個請求處理器一次來處理一個客戶端請求。在服務器啟動的時候,會將這些處理器串聯起來形成一個處理鏈。上圖是單機server的處理器。
10. 註冊JMX服務
ZK 服務器的信息會以JXM的方式暴露給外部。這裏還不太了解。
11. 註冊ZK服務器實例
public void submitRequest(Request si) {
if (firstProcessor == null) {
synchronized (this) {
try {
// Since all requests are passed to the request
// processor it should wait for setting up the request
// processor chain. The state will be updated to RUNNING
// after the setup.
while (state == State.INITIAL) {//如果在初始化時會wait住
wait(1000);
}
} catch (InterruptedException e) {
LOG.warn("Une
通過上面代碼的註釋可以知道在初始化時請求提交處理會wait住,而這個函數是在NIOServerCnxnFactory(run)->NIOServerCnxn(doIO)->ZookeeperServer(processConnectRequest)->ZookeeperServer(createSession)->ZookeeperServer(submitRequest)中調用的,也即是前面在啟動ServerCnxnFactory主線程時便會在這裏wait住。
setState(State.RUNNING);
notifyAll();
而在這裏便會notify時線程可以完全工作。
思考
JMX需要再學習
請求鏈和日誌/快照清理過程
參考
http://www.cnblogs.com/leesf456/p/6105276.html
https://www.jianshu.com/p/47cb9e6d309d
https://my.oschina.net/pingpangkuangmo/blog/491673
https://my.oschina.net/xianggao/blog/537902
https://www.jianshu.com/p/76d6d674530b
從paxos到zk
Zookeeper源碼閱讀(十四) 單機Server