ZooKeeper原始碼分析:Quorum請求的整個流程
Quorum請求是轉發給Leader處理,並且需要得一個Follower Quorum確認的請求。這些請求包括:
1)znode的寫操作(OpCode.create,OpCode.delete,OpCode.setData,OpCode.setACL)
2)Session的建立和關閉操作(OpCode.createSession和OpCode.closeSession)
3)OpCode.multi操作。
本博文分析了Client, Follower和Leader協同完成Quorum請求的過程。另外需注意的是OpCode.sync請求也需要轉發給Leader, 但不需要得到一個Follower Quorum確認。本文也會提到OpCode.sync操作。
資料結構
Request型別物件:Server內部傳遞的資料結構。
屬性 | 說明 |
sessionId | 會話ID |
cxid | 客戶端事務ID |
type | 操作型別, 如OpCode.setData |
request | 請求Record物件,如SetDataRequest |
cnxn | Server和Client端的連線物件 |
hdr | 請求事務頭TxnHeader |
txn | 請求事務體Record,如OpCode.setData請求,則是SetDataTxn型別物件 |
zxid | ZooKeeper事務ID |
authInfo | 認證資訊 |
createTime | 建立時間 |
owner | 所有者 |
e | 處理過程中的異常 |
QuorumPacket型別物件:用於ZooKeeper伺服器之間傳遞的資料包。
屬性 | 說明 |
zxid | ZooKeeper事務ID |
data | 資料包的資料: 在Leader.REQUEST中,資料依次如下: Request.sessionId Request.cxid Request.type Request.request 在Leader.PROPOSAL中,資料依次如下: Request.hdr Request.txn 在Leader.ACK中,為null 在Leader.COMMIT中,為null |
authinfo | 認證資訊 |
Quorum請求流程
假設拓撲結構如下圖,Client A和Follower A建立連線。
資料流程圖如下。在圖中,連線線說明前的數字表示事件發的生時序,主時序是直接使用一個數字表示,並且數字越小表示越早發生(如1 Client Request是在2 Request之前發生)。對於和主時序併發的操作使用主時序序號後加上一個括號括起來的數字表示,如7(1)-n Request指和7 Request是併發的。7(1)-n中n表示以7(1)開頭的操作時序。
我們從資料流程圖中Step 1講起:Client A 發起一個Quorum請求給Follower A。
【Client A, Step 1】Client A呼叫Quorum請求對應的方法:
如呼叫Zookeeper的建構函式,會發起OpCode.createSession請求,
如呼叫Zookeeper.setData方法,會發起OpCode.setData操作。
最終會呼叫ClientCnxn.submitRequest方法將請求放入outgoingQueue佇列中,並阻塞等待Follower A反饋。而ClientCnxn.SendThread執行緒會從outgoingQueue中取出請求,併發送給Follower A。
下面程式碼Zookeeper.setData方法: Client A構建物件傳送給Follower A
publicStat setData(finalString path,bytedata[],intversion) throwsKeeperException, InterruptedException { finalString clientPath = path; PathUtils.validatePath(clientPath); //通過傳入的path構造完整serverPath finalString serverPath = prependChroot(clientPath); //構造一個Request頭 RequestHeader h=newRequestHeader(); //設定型別為setData h.setType(ZooDefs.OpCode.setData); //構造一個SetData請求體 SetDataRequest request =newSetDataRequest(); //設定需要修改node的serverPath request.setPath(serverPath); //設定需要修改的node的data request.setData(data); //設定需要修改的node的version request.setVersion(version); //構建SetDataResponse物件 SetDataResponse response =newSetDataResponse(); //提交請求,並等待返回結果 ReplyHeader r =cnxn.submitRequest(h, request, response,null); //如果r.getErr()不能0,則表示有錯誤,丟擲異常 if(r.getErr() != 0) { throwKeeperException.create(KeeperException.Code.get(r.getErr()), clientPath); } returnresponse.getStat(); }【Follower A, Step 2,3】Follower A的NIOServerCnxn類接到了Client A的請求,會呼叫ZookeeperServer.processPacket方法。該方法會構建一個Request物件,並呼叫第一個處理器FollowerRequestProcessor的processRequest方法。該方法將Request物件放入FollowerRequestProcessor.queuedRequests佇列中。FollowerRequestProcessor處理器執行緒會迴圈從FollowerRequestProcessor.queuedRequests佇列中取出Request物件,並繼續下面步驟:
1)呼叫下一個處理器CommitProcessor的processRequest方法。該方法將Request物件放入CommitProcessor.queuedRequests佇列中;
2)通過Request.type判斷Request型別。若發現是一個Quorum請求,會直接呼叫Learner.request(request)方法。該方法將Request物件封裝成一個Leader.Request的Quorum資料包,併發送給Leader。
OpCode.sync操作也將呼叫Learner.request方法將請求轉發給Leader,但在這之前會先將Request物件加入到pendingSyncs佇列中。
FollowerRequestProcessor的run方法如下:
publicvoidrun() { try{ while(!finished) { //從queuedRequests佇列中取出Request物件 Requestrequest =queuedRequests.take(); if(LOG.isTraceEnabled()) { ZooTrace. logRequest(LOG,ZooTrace.CLIENT_REQUEST_TRACE_MASK , 'F', request,""); } //當request是Request.requestOfDeath,一個poison pill, 就退出while迴圈, //並結束FollowerRequestProcessor執行緒 if(request ==Request.requestOfDeath) { break; } //我們在提交這個request到leader之前,把這個request傳遞到下一個處理器。 //這樣我們就準備好從Leader那得到Response nextProcessor.processRequest(request); //只有Quorum操作和sync操作才會呼叫Follower.request方法, 轉發Leader.REQUEST資料包給Leader //sync操作和Quorum操作有一些不同, //我們需要保持跟蹤這個sync操作對於的Follower已經掛起,所有我們將它加入pendingSyncs佇列中。 switch(request.type ) { caseOpCode.sync: //將OpCode.sync放入pendingSyncs佇列中 zks.pendingSyncs .add(request); zks.getFollower().request(request); break; caseOpCode.create: caseOpCode.delete: caseOpCode.setData: caseOpCode.setACL: caseOpCode.createSession: caseOpCode.closeSession: caseOpCode.multi: //Quorum請求,直接呼叫Folloer.request方法 zks.getFollower().request(request); break; } } }catch(Exception e) { LOG.error("Unexpected exception causing exit", e); } LOG.info("FollowerRequestProcessor exited loop!"); }【Leader A, Step 4】Leader A的LearnerHandler執行緒會迴圈讀取從Learner獲得的Quorum資料包。如果資料包是Learner.REQUEST型別,則會解析Quorum資料包的內容,檢查操作型別。
如果操作型別不是OpCode.sync, 則會構造Request物件。並呼叫ZooKeeperServer.submitRequest方法(和上面Follower接收到請求所使用的submitRequest方法是同一個方法),並最終會呼叫第一個處理器PrepRequestProcessor的submitRequest方法,將Request物件放入PrepRequestProcessor.submittedRequests佇列中。
如果操作型別是OpCode.sync, 會構造Request型別的子類LearnerSyncRequest物件,並同樣呼叫PrepRequestProcessor的submitRequest方法。
LearnerHandler.run方法中對Leader.REQUEST資料包的處理程式碼如下:
publicvoidrun () { ...... caseLeader.REQUEST: bb =ByteBuffer. wrap(qp.getData()); //從QuorumPacket中讀取sesssionId sessionId= bb.getLong(); //從QuorumPacket中讀取cxid cxid= bb.getInt(); //從QuorumPacket中讀取操作型別 type= bb.getInt(); bb = bb.slice(); Requestsi; //如果操作Code的型別是OpCode.sync,則構造LearnerSyncRequest物件 if(type==OpCode.sync){ si =newLearnerSyncRequest(this,sessionId,cxid,type, bb,qp.getAuthinfo()); } //如果操作Code的型別不是OpCode.sync, 則構造Request物件 else{ si =newRequest(null,sessionId,cxid,type, bb,qp.getAuthinfo()); } //設定owner si.setOwner(this); //提交請求 leader.zk .submitRequest(si); break; ...... }PrepRequestProcessor處理器執行緒會從PrepRequestProcessor.submittedRequests佇列中取出Request物件,並根據Request型別構建TxnHeader和Record物件,然後分別賦給Request.hdr和Request.txn。之後會呼叫下一個處理器ProposalRequestProcessor的processRequest方法,將Request物件傳遞給處理器ProposalRequestProcessor。(如果發現有異常會則會建立一個錯誤Record型別物件)
PrepRequestProcessor的run方法如下:
publicvoidrun(){ try{ while(true) { //從submittedRequests佇列中取去第一個request物件 Requestrequest =submittedRequests.take(); longtraceMask =ZooTrace.CLIENT_REQUEST_TRACE_MASK; //如果是OpCode.ping操作,則將traceMask設定成ZooTrace. CLIENT_PING_TRACE_MASK if(request.type ==OpCode.ping) { traceMask =ZooTrace. CLIENT_PING_TRACE_MASK; } if(LOG.isTraceEnabled()) { ZooTrace. logRequest(LOG, traceMask,'P', request,""); } //如果request是一個requestOfDeath, 則退出while迴圈。 if(Request.requestOfDeath == request) { break; } //處理請求 pRequest(request); } }catch(InterruptedException e) { LOG.error("Unexpected interruption", e); }catch(RequestProcessorExceptione) { if(e.getCause()instanceofXidRolloverException) { LOG.info(e.getCause().getMessage()); } LOG.error("Unexpected exception", e); }catch(Exception e) { LOG.error("Unexpected exception", e); } LOG.info("PrepRequestProcessor exited loop!"); }PrepRequestProcessor的pRequest2Txn方法,該方法會在pRequest方法中呼叫,構建TxnHeader和Record物件。下面是關於OpCode.setData請求的程式碼:
protectedvoidpRequest2Txn(inttype,longzxid,Requestrequest,Recordrecord,booleandeserialize) throwsKeeperException,IOException,RequestProcessorException { request.hdr =newTxnHeader(request.sessionId , request.cxid, zxid, zks.getTime(), type); switch(type) { ..... caseOpCode.setData: //檢查session zks.sessionTracker .checkSession(request.sessionId, request.getOwner()); //將record轉成SetDataRequest型別 SetDataRequestsetDataRequest = (SetDataRequest)record; if(deserialize) //將Request.reques資料反序列化成setDataRequest物件 ByteBufferInputStream.byteBuffer2Record(request. request, setDataRequest); //獲取需要需要修改的znode的path path= setDataRequest.getPath(); //獲取記憶體資料中獲取path對於的znode資訊 nodeRecord= getRecordForPath(path); //檢查對znode是否有寫許可權 checkACL(zks,nodeRecord.acl ,ZooDefs.Perms.WRITE, request.authInfo); //獲取客戶端設定的版本號 version= setDataRequest.getVersion(); //獲取節點當前版本號 intcurrentVersion =nodeRecord.stat.getVersion(); //如果客戶端設定的版本號不是-1,且不等於當前版本號,則丟擲KeeperException.BadVersionException異常 if(version!= -1 &&version!= currentVersion) { thrownewKeeperException.BadVersionException(path); } //version等於當前版本加1 version= currentVersion + 1; //構建SetDataTxn物件,並賦給request.txn request. txn =newSetDataTxn(path, setDataRequest.getData(),version); //拷貝nodeRecord nodeRecord=nodeRecord.duplicate(request.hdr.getZxid()); //將nodeRecord的當前版本號設定為version nodeRecord.stat.setVersion(version); //將nodeRecord放入outstandingChanges //path和nodeRecord map放入outstandingChangesForPath addChangeRecord(nodeRecord); break; ...... } }【Leader A, Step 5,6】處理器ProposalRequestProcessor會先判斷Request物件是否是LearnerSyncRequest型別。
如果不是LearnerSyncRequest型別(也就是Quorum請求),會按如下步驟執行:
1)呼叫下一個處理器CommitProcessor的processRequest方法,將Request物件放入CommitProcessor.queuedRequests佇列中;
2)將proposal傳送到所有的Follower;
3)呼叫SyncRequestProcessor處理器的processRequest方法。該方法會將請求放入SyncRequestProcessor.queuedRequests佇列中。(【Leader A, Step 7(1)】SyncRequestProcessor執行緒會記錄Log, 然後傳遞給SendAckRequestProcessor。SendAckRequestProcessor會發送一個Leader.ACK的Quorum資料包給自己)
如果是LearnerSyncRequest型別,說明該請求是OpCode.sync操作,則會直接呼叫Leader.processSync方法。
ProposalRequestProcessor的processRequest方法如下:
publicvoidprocessRequest(Requestrequest)throwsRequestPrzocessorException{ //如果是sync操作,則呼叫Leader.processSync方法 if(requestinstanceofLearnerSyncRequest){ zks.getLeader().processSync((LearnerSyncRequest)request); } //如果不是sync操作 else{ //傳遞到下一個處理器 nextProcessor.processRequest(request); if(request.hdr !=null) { // We need to sync and get consensus on any transactions try{ //傳送proposal給所有的follower zks.getLeader().propose(request); }catch(XidRolloverExceptione) { thrownewRequestProcessorException(e.getMessage(), e); } //呼叫SyncRequestProcessor處理器的processRequest方法