ZooKeeper Source Code Analysis: Server Startup Flow
I. The Startup Class
From bin/zkServer.cmd we can see that the startup class is org.apache.zookeeper.server.quorum.QuorumPeerMain. Its class structure is as follows:
It is an ordinary class consisting mainly of a QuorumPeer field (when ZooKeeper starts in cluster mode, each node is represented by a QuorumPeer), the main entry point, and two initialization methods.
II. Startup Flow
QuorumPeerMain.initializeAndRun(args) parses the configuration, starts the scheduled task that automatically purges old data, and, in cluster mode, runs the cluster startup code.
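For orientation, here is a condensed sketch of initializeAndRun, paraphrased from the 3.5.x source (error handling and logging trimmed); details may differ slightly between versions:
protected void initializeAndRun(String[] args)
        throws ConfigException, IOException, AdminServerException
{
    QuorumPeerConfig config = new QuorumPeerConfig();
    if (args.length == 1) {
        config.parse(args[0]);                 // 1. parse zoo.cfg
    }

    // 2. schedule the periodic purge of old snapshots and transaction logs
    DatadirCleanupManager purgeMgr = new DatadirCleanupManager(
            config.getDataDir(),
            config.getDataLogDir(),
            config.getSnapRetainCount(),
            config.getPurgeInterval());
    purgeMgr.start();

    if (args.length == 1 && config.isDistributed()) {
        runFromConfig(config);                 // 3. cluster mode
    } else {
        // standalone mode is delegated to ZooKeeperServerMain
        ZooKeeperServerMain.main(args);
    }
}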
1 Parsing the Configuration
QuorumPeerConfig.parse
public void parse(String path) throws ConfigException {
    LOG.info("Reading configuration from: " + path);
    try {
        // builder pattern
        File configFile = (new VerifyingFileFactory.Builder(LOG)
            .warnForRelativePath()
            .failForNonExistingPath()
            .build()).create(path);

        Properties cfg = new Properties();
        FileInputStream in = new FileInputStream(configFile);
        try {
            cfg.load(in);
            configFileStr = path;
        } finally {
            in.close();
        }
        parseProperties(cfg);
    } catch (IOException e) {
        throw new ConfigException("Error processing " + path, e);
    } catch (IllegalArgumentException e) {
        throw new ConfigException("Error processing " + path, e);
    }

    // dynamic reconfiguration support
    if (dynamicConfigFileStr != null) {
        try {
            Properties dynamicCfg = new Properties();
            FileInputStream inConfig = new FileInputStream(dynamicConfigFileStr);
            ...
            setupQuorumPeerConfig(dynamicCfg, false);
            ...
}
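For reference, the file being parsed is a standard zoo.cfg. A minimal cluster configuration might look like this (all values are illustrative only):
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/var/lib/zookeeper
clientPort=2181
# one line per node: server.<myid>=<host>:<port for talking to the leader>:<port for election votes>
server.1=zk1:2888:3888
server.2=zk2:2888:3888
server.3=zk3:2888:3888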
2 The Automatic Data Purge Task
DatadirCleanupManager consists of a Timer and a PurgeTask.
First, note that ZooKeeper stores two main kinds of files: snapshots and transaction logs. A snapshot is a point-in-time dump of the data, and the transaction log records the transactions that go with that snapshot; the purge task keeps only the most recent snapshots and the logs they depend on.
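A minimal sketch of the Timer + TimerTask pattern this amounts to; the class and variable names below (other than PurgeTxnLog.purge) are illustrative, not the actual field names:
import java.io.File;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.TimeUnit;
import org.apache.zookeeper.server.PurgeTxnLog;

public class PurgeSketch {
    public static void schedule(final File dataLogDir, final File snapDir,
                                final int snapRetainCount, int purgeIntervalHours) {
        Timer timer = new Timer("PurgeTask", true); // daemon timer thread
        TimerTask purgeTask = new TimerTask() {
            @Override
            public void run() {
                try {
                    // keep only the newest snapRetainCount snapshots plus the txn logs they need
                    PurgeTxnLog.purge(dataLogDir, snapDir, snapRetainCount);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        };
        // autopurge.purgeInterval is configured in hours
        timer.scheduleAtFixedRate(purgeTask, 0, TimeUnit.HOURS.toMillis(purgeIntervalHours));
    }
}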
3 Cluster Startup Code
Here a QuorumPeer instance is constructed, configured from the parsed config via setters, and then started.
3.1 Important configuration fields of QuorumPeer:
- ServerCnxnFactory cnxnFactory: the default implementation is NIOServerCnxnFactory; it is responsible for establishing connections with clients and handling the communication
- FileTxnSnapLog logFactory: used for transaction logging and snapshot storage; it can load data from disk into the ZKDatabase, and it can persist the ZKDatabase contents together with the sessions into snapshot/log files
- int electionType: the type of election algorithm. The default is 3, which selects the FastLeaderElection algorithm.
- long myid: the number written in the myid file; it identifies this node.
- int tickTime: the heartbeat interval used for session checking
- minSessionTimeout, maxSessionTimeout: bound the sessionTimeout a client may request
- initLimit: during the initialization phase, the timeout for talking to the leader and for receiving responses from a quorum of nodes; the timeout is initLimit*tickTime
- syncLimit: after the initialization phase, it replaces initLimit as the timeout for subsequent communication; the timeout is syncLimit*tickTime
- ZKDatabase zkDb: holds the ZooKeeper tree data; this is the server's in-memory database
- quorumConfig: used to verify whether a quorum of nodes has acknowledged something. The default is QuorumMaj, i.e. a simple majority count, with no weighting
- learnerType: either PARTICIPANT or OBSERVER. A PARTICIPANT takes part in voting and may become a Follower or the Leader; an OBSERVER does not vote and its role never changes.
- quorumPeers: each QuorumServer entry holds the node's IP address, the port used to communicate with the Leader, and the port used for election voting
public void runFromConfig(QuorumPeerConfig config)
throws IOException, AdminServerException
{
try {
ManagedUtil.registerLog4jMBeans();
} catch (JMException e) {
LOG.warn("Unable to register log4j JMX control", e);
}
LOG.info("Starting quorum peer");
try {
ServerCnxnFactory cnxnFactory = null;
ServerCnxnFactory secureCnxnFactory = null;
if (config.getClientPortAddress() != null) {
cnxnFactory = ServerCnxnFactory.createFactory();
cnxnFactory.configure(config.getClientPortAddress(),
config.getMaxClientCnxns(),
false);
}
if (config.getSecureClientPortAddress() != null) {
secureCnxnFactory = ServerCnxnFactory.createFactory();
secureCnxnFactory.configure(config.getSecureClientPortAddress(),
config.getMaxClientCnxns(),
true);
}
quorumPeer = getQuorumPeer();
quorumPeer.setTxnFactory(new FileTxnSnapLog(
config.getDataLogDir(),
config.getDataDir()));
quorumPeer.enableLocalSessions(config.areLocalSessionsEnabled());
quorumPeer.enableLocalSessionsUpgrading(
config.isLocalSessionsUpgradingEnabled());
//quorumPeer.setQuorumPeers(config.getAllMembers());
quorumPeer.setElectionType(config.getElectionAlg());
quorumPeer.setMyid(config.getServerId());
quorumPeer.setTickTime(config.getTickTime());
quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
quorumPeer.setInitLimit(config.getInitLimit());
quorumPeer.setSyncLimit(config.getSyncLimit());
quorumPeer.setConfigFileName(config.getConfigFilename());
quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false);
if (config.getLastSeenQuorumVerifier()!=null) {
quorumPeer.setLastSeenQuorumVerifier(config.getLastSeenQuorumVerifier(), false);
}
quorumPeer.initConfigInZKDatabase();
quorumPeer.setCnxnFactory(cnxnFactory);
quorumPeer.setSecureCnxnFactory(secureCnxnFactory);
quorumPeer.setLearnerType(config.getPeerType());
quorumPeer.setSyncEnabled(config.getSyncEnabled());
quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());
quorumPeer.start();
quorumPeer.join();
} catch (InterruptedException e) {
// warn, but generally this is ok
LOG.warn("Quorum Peer interrupted", e);
}
}
3.2 quorumPeer.start()
QuorumPeer is a subclass of Thread; its start() flow is as follows:
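Roughly, start() looks like this (paraphrased from the 3.5.x source; the numbers in the comments refer to the subsections below, and details may vary by version):
public synchronized void start() {
    if (!getView().containsKey(myid)) {
        throw new RuntimeException("My id " + myid + " not in the peer list");
    }
    loadDataBase();              // 3.2.1 restore the ZKDatabase from snapshot + txn log
    startServerCnxnFactory();    // 3.2.2 start the client connection threads
    try {
        adminServer.start();     // 3.2.3 Jetty-based AdminServer
    } catch (AdminServerException e) {
        LOG.warn("Problem starting AdminServer", e);
    }
    startLeaderElection();       // 3.2.4 prepare the election (FastLeaderElection)
    super.start();               // enter the run() loop (3.2.5)
}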
3.2.1 Loading the Database
ZKDatabase: its class comment describes it as ZooKeeper's in-memory database, holding the sessions, the DataTree, and the committed log.
protected DataTree dataTree;
protected ConcurrentHashMap<Long, Integer> sessionsWithTimeouts; // session id -> session timeout; the session id can be correlated with the log
protected LinkedList<Proposal> committedLog = new LinkedList<Proposal>();
DataTree:
private final ConcurrentHashMap<String, DataNode> nodes =
    new ConcurrentHashMap<String, DataNode>(); // maps each path to its DataNode
private final WatchManager dataWatches = new WatchManager(); // triggers watches on data updates
private final WatchManager childWatches = new WatchManager(); // triggers watches on child-node updates
private final PathTrie pTrie = new PathTrie(); // tracks the quota nodes
private final Map<Long, HashSet<String>> ephemerals =
    new ConcurrentHashMap<Long, HashSet<String>>(); // the ephemeral nodes owned by each session
A DataNode holds its children, a data[] array with this node's payload, and a stat with the node's persisted state information.
TODO: WatchManager analysis
QuorumPeer.loadDataBase calls zkDb.loadDataBase, which restores zkDb from the most recent snapshot and then replays, from the latest transaction logs, everything that has not yet been captured in a snapshot.
After zkDb.loadDataBase, the latest zxid is known, which in turn determines which epoch the election starts in.
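A zxid packs the epoch into its high 32 bits and a per-epoch counter into its low 32 bits, which is why the epoch can be recovered from the last processed zxid. A small illustration using ZooKeeper's real helper class ZxidUtils (the demo class itself is just for show):
import org.apache.zookeeper.server.util.ZxidUtils;

public class ZxidDemo {
    public static void main(String[] args) {
        long zxid = 0x0000000500000007L;                   // epoch 5, counter 7
        long epoch = ZxidUtils.getEpochFromZxid(zxid);     // high 32 bits -> 5
        long counter = ZxidUtils.getCounterFromZxid(zxid); // low 32 bits  -> 7
        System.out.println("epoch=" + epoch + ", counter=" + counter);
    }
}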
3.2.2 Starting the Client Connection Service
The default implementation is NIOServerCnxnFactory; its start():
public void start() {
stopped = false;
if (workerPool == null) {
workerPool = new WorkerService(
"NIOWorker", numWorkerThreads, false);
}
for(SelectorThread thread : selectorThreads) {
if (thread.getState() == Thread.State.NEW) {
thread.start();
}
}
// ensure thread is started once and only once
if (acceptThread.getState() == Thread.State.NEW) {
acceptThread.start();
}
if (expirerThread.getState() == Thread.State.NEW) {
expirerThread.start();
}
}
The class comment explains the threading model:
/**
* NIOServerCnxnFactory implements a multi-threaded ServerCnxnFactory using
* NIO non-blocking socket calls. Communication between threads is handled via
* queues.
*
* - 1 accept thread, which accepts new connections and assigns to a
* selector thread
* - 1-N selector threads, each of which selects on 1/N of the connections.
* The reason the factory supports more than one selector thread is that
* with large numbers of connections, select() itself can become a
* performance bottleneck.
* - 0-M socket I/O worker threads, which perform basic socket reads and
* writes. If configured with 0 worker threads, the selector threads
* do the socket I/O directly.
* - 1 connection expiration thread, which closes idle connections; this is
* necessary to expire connections on which no session is established.
*
* Typical (default) thread counts are: on a 32 core machine, 1 accept thread,
* 1 connection expiration thread, 4 selector threads, and 64 worker threads.
*/
In short, the factory consists of:
- a worker thread pool workerPool, holding an ArrayList<ExecutorService> workers; each worker is a fixed thread pool of size 1, created via workers.add(Executors.newFixedThreadPool(1, new DaemonThreadFactory(threadNamePrefix, i)))
- several selector threads, each of which selects only on its own share of the connections, so that select() does not become a bottleneck when the number of connections is large
- one accept thread, which only handles incoming connections and hands each one to a selector thread
- one expirer thread, which checks for and closes idle connections
start() starts all of the threads above.
First, the accept thread:
It is initialized in NIOServerCnxnFactory.configure(): a ServerSocketChannel is opened and bound to the port, then passed to the AcceptThread constructor together with selectorThreads. In its constructor, acceptThread registers interest in OP_ACCEPT on the server socket; its run loop then waits for incoming connection events, and doAccept() finally hands the accepted socket to one of the selector threads by calling selectorThread.addAcceptedConnection().
public void configure(InetSocketAddress addr, int maxcc, boolean secure) throws IOException {
...
this.ss = ServerSocketChannel.open();
ss.socket().setReuseAddress(true);
LOG.info("binding to port " + addr);
ss.socket().bind(addr);
ss.configureBlocking(false);
acceptThread = new AcceptThread(ss, addr, selectorThreads);
}
public AcceptThread(ServerSocketChannel ss, InetSocketAddress addr,
Set<SelectorThread> selectorThreads) throws IOException {
super("NIOServerCxnFactory.AcceptThread:" + addr);
this.acceptSocket = ss;
this.acceptKey =
acceptSocket.register(selector, SelectionKey.OP_ACCEPT);
this.selectorThreads = Collections.unmodifiableList(
new ArrayList<SelectorThread>(selectorThreads));
selectorIterator = this.selectorThreads.iterator();
}
private boolean doAccept() {
boolean accepted = false;
SocketChannel sc = null;
try {
sc = acceptSocket.accept();
...
SelectorThread selectorThread = selectorIterator.next();
if (!selectorThread.addAcceptedConnection(sc)) {
throw new IOException(
"Unable to add connection to selector queue"
+ (stopped ? " (shutdown in progress)" : ""));
}
} catch (IOException e) {
...
}
return accepted;
}
Next, the selector thread.
Its run loop:
public void run() {
    try {
        while (!stopped) {
            try {
                // pick up read/write events and hand the actual I/O over to worker threads
                select();
                // register newly accepted connections for reads and create their NIOServerCnxn
                processAcceptedConnections();
                // after handleIO() a socket's read/write interest is temporarily cleared;
                // this is where the interest ops get restored
                processInterestOpsUpdateRequests();
            } catch (RuntimeException e) {
                LOG.warn("Ignoring unexpected runtime exception", e);
            } catch (Exception e) {
                LOG.warn("Ignoring unexpected exception", e);
            }
        }
        ...
        // after leaving the loop, close and clean up the connections
}
private void select() {
...
selector.select();
Set<SelectionKey> selected = selector.selectedKeys();
ArrayList<SelectionKey> selectedList =
new ArrayList<SelectionKey>(selected);
Collections.shuffle(selectedList);
Iterator<SelectionKey> selectedKeys = selectedList.iterator();
while(!stopped && selectedKeys.hasNext()) {
...
if (key.isReadable() || key.isWritable()) {
handleIO(key);
}
}
...
}
private void handleIO(SelectionKey key) {
IOWorkRequest workRequest = new IOWorkRequest(this, key);
NIOServerCnxn cnxn = (NIOServerCnxn) key.attachment();
// Stop selecting this key while processing on its
// connection
cnxn.disableSelectable();
key.interestOps(0);
touchCnxn(cnxn);
workerPool.schedule(workRequest);
}
selectorThread.addAcceptedConnection() enqueues the newly accepted socket and calls wakeupSelector() to break out of the blocking select() inside run(), so that processAcceptedConnections() can run right away.
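The hand-off itself is tiny; roughly (paraphrased from the source, details may differ by version):
public boolean addAcceptedConnection(SocketChannel accepted) {
    // queue the socket for this selector thread and wake up its blocked select()
    if (stopped || !acceptedQueue.offer(accepted)) {
        return false;
    }
    wakeupSelector();
    return true;
}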
The threads in workerPool then process the data on the socket; each packet is ultimately handled by ZooKeeperServer.processPacket. How that data is processed is covered in the section on data serialization.
3.2.3 Starting the Admin Server
By default Jetty provides this service and occupies port 8080. When deploying a pseudo-cluster on a single machine you need to disable Jetty, or the instances will clash over the port (in 3.5+ this is controlled by the admin.enableServer and admin.serverPort properties).
3.2.4 Leader Election
createElectionAlgorithm(3) chooses the election algorithm. The default is 3, FastLeaderElection; the other algorithms are slated for removal. It boils down to: FastLeaderElection fle = new FastLeaderElection(this, qcm); fle.start();
FastLeaderElection needs the QuorumPeer node object, a QuorumCnxManager connection manager (which manages connections to the other nodes), the two queue fields sendqueue and recvqueue, and a Messenger that takes care of sending and receiving election messages.
public FastLeaderElection(QuorumPeer self, QuorumCnxManager manager){
this.stop = false;
this.manager = manager;
starter(self, manager);
}
private void starter(QuorumPeer self, QuorumCnxManager manager) {
this.self = self;
proposedLeader = -1;
proposedZxid = -1;
sendqueue = new LinkedBlockingQueue<ToSend>();
recvqueue = new LinkedBlockingQueue<Notification>();
this.messenger = new Messenger(manager);
}
Messenger(QuorumCnxManager manager) {
this.ws = new WorkerSender(manager);
this.wsThread = new Thread(this.ws,
"WorkerSender[myid=" + self.getId() + "]");
this.wsThread.setDaemon(true);
this.wr = new WorkerReceiver(manager);
this.wrThread = new Thread(this.wr,
"WorkerReceiver[myid=" + self.getId() + "]");
this.wrThread.setDaemon(true);
}
FastLeaderElection.start() simply calls messenger.start(), which starts message sending and receiving.
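What messenger.start() amounts to is just starting the two threads (a sketch based on the 3.5.x source):
void start() {
    this.wsThread.start();   // WorkerSender: drains sendqueue and hands messages to QuorumCnxManager
    this.wrThread.start();   // WorkerReceiver: drains QuorumCnxManager's receive queue into recvqueue
}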
Message sending is implemented by WorkerSender:
class WorkerSender extends ZooKeeperThread {
volatile boolean stop;
QuorumCnxManager manager;
WorkerSender(QuorumCnxManager manager){
super("WorkerSender");
this.stop = false;
this.manager = manager;
}
public void run() {
while (!stop) {
try {
ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS);
if(m == null) continue;
process(m);
} catch (InterruptedException e) {
break;
}
}
LOG.info("WorkerSender is down");
}
/**
* Called by run() once there is a new message to send.
*
* @param m message to send
*/
void process(ToSend m) {
ByteBuffer requestBuffer = buildMsg(m.state.ordinal(),
m.leader,
m.zxid,
m.electionEpoch,
m.peerEpoch,
m.configData);
manager.toSend(m.sid, requestBuffer);
}
}
Message receiving is implemented by WorkerReceiver:
Message response;
while (!stop) {
// Sleeps on receive
response = manager.pollRecvQueue(3000, TimeUnit.MILLISECONDS);
// unpack the message
...
Notification n = new Notification();
// continue unpacking; check the protocol version
// if the message comes from a server that does not take part in the election,
// reply immediately with this server's current vote
if (!self.getCurrentAndNextConfigVoters().contains(response.sid)) {
Vote current = self.getCurrentVote();
QuorumVerifier qv = self.getQuorumVerifier();
ToSend notmsg = new ToSend(ToSend.mType.notification,
current.getId(),
current.getZxid(),
logicalclock.get(),
self.getPeerState(),
response.sid,
current.getPeerEpoch(),
qv.toString().getBytes());
sendqueue.offer(notmsg);
} else {
/*
* If this server is looking, then send proposed leader
*/
if(self.getPeerState() == QuorumPeer.ServerState.LOOKING){
recvqueue.offer(n);
/*
* Send a notification back if the peer that sent this
* message is also looking and its logical clock is
* lagging behind.
*/
if((ackstate == QuorumPeer.ServerState.LOOKING)
&& (n.electionEpoch < logicalclock.get())){
Vote v = getVote();
QuorumVerifier qv = self.getQuorumVerifier();
ToSend notmsg = new ToSend(ToSend.mType.notification,
v.getId(),
v.getZxid(),
logicalclock.get(),
self.getPeerState(),
response.sid,
v.getPeerEpoch(),
qv.toString().getBytes());
sendqueue.offer(notmsg);
}
} else {
/*
* If this server is not looking, but the one that sent the ack
* is looking, then send back what it believes to be the leader.
*/
Vote current = self.getCurrentVote();
if(ackstate == QuorumPeer.ServerState.LOOKING){
if(LOG.isDebugEnabled()){
LOG.debug("Sending new notification. My id ={} recipient={} zxid=0x{} leader={} config version = {}",
self.getId(),
response.sid,
Long.toHexString(current.getZxid()),
current.getId(),
Long.toHexString(self.getQuorumVerifier().getVersion()));
}
QuorumVerifier qv = self.getQuorumVerifier();
ToSend notmsg = new ToSend(
ToSend.mType.notification,
current.getId(),
current.getZxid(),
current.getElectionEpoch(),
self.getPeerState(),
response.sid,
current.getPeerEpoch(),
qv.toString().getBytes());
sendqueue.offer(notmsg);
}
}
To summarize the receive flow:
- if the sender is not a voter in the current or next configuration, reply immediately with this server's current vote;
- if this server is LOOKING, put the notification into recvqueue for lookForLeader(); in addition, if the sender is also LOOKING but its logical clock lags behind ours, send our vote back to it;
- if this server is not LOOKING but the sender is, reply with whatever this server currently believes the leader to be.
3.2.5 The run() Loop
Inside while(running), the peer acts according to its current state, as sketched after this list:
- LOOKING: the election phase; keep taking part in the election (lookForLeader()) until it completes, at which point the state is switched
- OBSERVING: the observing phase; try to communicate with the Leader and obtain state from it
- LEADING: build the Leader service and block in lead(); only when leadership is lost does the peer go back to the LOOKING state
- FOLLOWING: build the Follower service and block in followLeader(); when the leader is lost it goes back to the LOOKING state
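A condensed sketch of the loop body, paraphrased from the 3.5.x QuorumPeer.run() (JMX registration, read-only mode, and error handling removed):
while (running) {
    switch (getPeerState()) {
    case LOOKING:
        // blocks in the election until a leader is chosen, then records the vote
        setCurrentVote(makeLEStrategy().lookForLeader());
        break;
    case OBSERVING:
        setObserver(makeObserver(logFactory));
        observer.observeLeader();     // blocks while synced with the leader as an observer
        break;
    case FOLLOWING:
        setFollower(makeFollower(logFactory));
        follower.followLeader();      // blocks; returns when the leader is lost
        break;
    case LEADING:
        setLeader(makeLeader(logFactory));
        leader.lead();                // blocks; returns when leadership is lost
        setLeader(null);
        break;
    }
    // when one of the blocking calls above returns, the state is updated
    // (typically back to LOOKING) and the loop goes around again
}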