zookeeper原理之投票的網路通訊流程
阿新 • • 發佈:2020-08-22
通訊流程圖:
接收資料 Notification 和傳送 ToSend
通訊過程原始碼分析
每個 zk 服務啟動後建立 socket 監聽
FastLeaderElection.lookForLeader
這個方法在前面分析過,裡面會呼叫 sendNotifications 來發送投票請求
FastLeaderElection.sendqueue
sendQueue 這個佇列的資料,是通過 WorkerSender 來進行獲取併發送的。而這個 WorkerSender 執行緒,在構建 fastLeaderElection 時,會啟動
ToSender | Notification |
leader; 被推薦的伺服器 sid zxid; 被推薦的伺服器當前最新的事務 id peerEpoch; 被推薦的伺服器當前所處的 epoch electionepoch; 當前伺服器所處的 epoch stat 當前伺服器狀態 sid 接收訊息的伺服器 sid(myid) | leader; //被推薦的伺服器 sid zxid; 被推薦的伺服器最新事務 id peerEpoch; 被推薦的伺服器當前所處的 epoch electionEpoch 選舉伺服器所處的 epoch stat; 選舉伺服器當前的狀態 sid; 選舉伺服器的 sid |
protected Election createElectionAlgorithm(int electionAlgorithm){ //…. case 3: qcm = createCnxnManager(); QuorumCnxManager.Listener listener = qcm.listener; if(listener != null){ listener.start(); } // 啟動監聽listener 實現了執行緒,所以在 run 方法中可以看到構建ServerSocket 的請求,這裡專門用來接收其他zkServer // 的投票請求 // 這塊後續再分析 @Override public void run() { int numRetries = 0; InetSocketAddress addr; while((!shutdown) && (numRetries < 3)){ try { ss = new ServerSocket(); } } } }
public Vote lookForLeader() throws InterruptedException { //省略部分程式碼 sendNotifications(); //這個方法,會把當前zk 伺服器的資訊新增到 sendqueue /* * Loop in which we exchange notifications until we find a leader */ while ((self.getPeerState() == ServerState.LOOKING) && //省略部分程式碼 }
class WorkerSender extends ZooKeeperThread { public void run() { while (!stop) { try {//從佇列中獲取 ToSend 物件 ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS); if(m == null) continue; process(m); //省略部分程式碼 void process(ToSend m) { ByteBuffer requestBuffer = buildMsg(m.state.ordinal(), m.leader, m.zxid, m.electionEpoch, m.peerEpoch); manager.toSend(m.sid, requestBuffer); // 這裡就是呼叫 QuorumCnxManager // 進行訊息傳送 } } } } }QuorumCnxManager.toSend
public void toSend(Long sid, ByteBuffer b) { if (this.mySid == sid) { // 如果接受者是自己,直接放置到接收佇列 b.position(0); addToRecvQueue(new Message(b.duplicate(), sid)); } else { // 否則傳送到對應的傳送佇列上 ArrayBlockingQueue<ByteBuffer> bq = new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY); // 判斷當前的 sid 是否已經存在於傳送佇列,如果是,則直接把已經存在的資料傳送出去 ArrayBlockingQueue<ByteBuffer> bqExisting = queueSendMap.putIfAbsent(sid, bq); if (bqExisting != null) { addToSendQueue(bqExisting, b); } else { addToSendQueue(bq, b); } connectOne(sid); // 連線申請呼叫鏈 connectOne-->initiateConnection- // ->startConnection , startConnection 就是傳送方啟動入口 } }startConnection
private boolean startConnection(Socket sock, Long sid) { // 省略部分程式碼 if (sid > this.mySid) { // 為了防止重複建立連線,只允許 sid 大的主動連線 sid 小的 closeSocket(sock); } else { // 構建一個傳送執行緒和接收執行緒,負責針對當前連線的資料傳遞,後續的邏輯比較簡單,就不做分析 SendWorker sw = new SendWorker(sock, sid); RecvWorker rw = new RecvWorker(sock, din, sid, sw); sw.setRecv(rw); } }SendWorker 會監聽對應 sid 的阻塞佇列,啟動的時候回如果佇列為空時會重新發送一次最前最後的訊息,以防上一次處理是伺服器異常退出,造成上一條訊息未處理成功;然後就是不停監聽隊裡,發現有訊息時呼叫send 方法RecvWorker:RecvWorker 不停監聽 socket 的 inputstream,讀取訊息放到訊息接收佇列中,訊息放入佇列中,qcm 的流程就完畢了。 QuorumCnxManager.Listener listener 監聽到客戶端請求之後,開始處理訊息
public void run() { // 省略部分程式碼 while (!shutdown) { Socket client = ss.accept(); setSockOpts(client); LOG.info("Received connection request" + client.getRemoteSocketAddress()); if (quorumSaslAuthEnabled) { receiveConnectionAsync(client); } else { receiveConnection(client); // 接收客戶端請求 } } }QuorumCnxManager.receiveConnection
public void receiveConnection(final Socket sock) { DataInputStream din = null; try { // 獲取客戶端的資料包 din = new DataInputStream(new BufferedInputStream(sock.getInputStream())); handleConnection(sock, din);// 呼叫 handle 進行處理 } catch (IOException e) { LOG.error("Exception handling connection, addr: {}, closing server connection", sock.getRemoteSocketAddress()); closeSocket(sock); } }handleConnection
private void handleConnection(Socket sock, DataInputStream din)throws IOException { Long sid = null; try { //獲取客戶端的 sid,也就是 myid sid = din.readLong(); if (sid < 0) { sid = din.readLong(); if (sid < this.mySid) { //為了防止重複建立連線,只允許 sid 大的主動連線 sid 小的 SendWorker sw = senderWorkerMap.get(sid); if (sw != null) { sw.finish();//關閉連線 } LOG.debug("Create new connection to server: " + sid); closeSocket(sock);//關閉連線 connectOne(sid);//向 sid 發起連線 } else {//同樣,構建一個 SendWorker 和RecvWorker 進行傳送和接收資料 SendWorker sw = new SendWorker(sock, sid); RecvWorker rw = new RecvWorker(sock, din, sid, sw); sw.setRecv(rw); } } } }