1. 程式人生 > 實用技巧 >zookeeper原理之投票的網路通訊流程

zookeeper原理之投票的網路通訊流程

通訊流程圖:

接收資料 Notification 和傳送 ToSend
ToSender Notification
leader; 被推薦的伺服器 sid zxid; 被推薦的伺服器當前最新的事務 id peerEpoch; 被推薦的伺服器當前所處的 epoch electionepoch; 當前伺服器所處的 epoch stat 當前伺服器狀態 sid 接收訊息的伺服器 sid(myid) leader; //被推薦的伺服器 sid zxid; 被推薦的伺服器最新事務 id peerEpoch; 被推薦的伺服器當前所處的 epoch electionEpoch 選舉伺服器所處的 epoch stat; 選舉伺服器當前的狀態 sid; 選舉伺服器的 sid
通訊過程原始碼分析 每個 zk 服務啟動後建立 socket 監聽
protected Election createElectionAlgorithm(int electionAlgorithm){
	//….
	case 3:
		qcm = createCnxnManager();
		QuorumCnxManager.Listener listener = 
		qcm.listener;
		if(listener != null){
			listener.start();
		}
	// 啟動監聽listener 實現了執行緒,所以在 run 方法中可以看到構建ServerSocket 的請求,這裡專門用來接收其他zkServer
	// 的投票請求
	// 這塊後續再分析
	@Override
	public void run() {
			int numRetries = 0;
			InetSocketAddress addr;
			while((!shutdown) && (numRetries < 3)){
				try {
					ss = new ServerSocket();
				}
			}
		}
	}
FastLeaderElection.lookForLeader 這個方法在前面分析過,裡面會呼叫 sendNotifications 來發送投票請求
public Vote lookForLeader() throws InterruptedException {
	 //省略部分程式碼
	 sendNotifications(); //這個方法,會把當前zk 伺服器的資訊新增到 sendqueue
	 /*
	 * Loop in which we exchange 
	notifications until we find a leader
	 */
	 while ((self.getPeerState() == ServerState.LOOKING) &&
	 //省略部分程式碼
}
FastLeaderElection.sendqueue sendQueue 這個佇列的資料,是通過 WorkerSender 來進行獲取併發送的。而這個 WorkerSender 執行緒,在構建 fastLeaderElection 時,會啟動
class WorkerSender extends ZooKeeperThread {
	public void run() {
		while (!stop) {
			try {//從佇列中獲取 ToSend 物件
				ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS);
				if(m == null) continue;
				process(m);
				//省略部分程式碼
				void process(ToSend m) {
					ByteBuffer requestBuffer = buildMsg(m.state.ordinal(),
					m.leader, m.zxid,
					m.electionEpoch,
					m.peerEpoch);
					manager.toSend(m.sid, requestBuffer); // 這裡就是呼叫 QuorumCnxManager
														// 進行訊息傳送
				}
			}
		}
	}
}
QuorumCnxManager.toSend
public void toSend(Long sid, ByteBuffer b) {

		if (this.mySid == sid) { // 如果接受者是自己,直接放置到接收佇列
			b.position(0);
			addToRecvQueue(new Message(b.duplicate(), sid));
		} else { // 否則傳送到對應的傳送佇列上
			ArrayBlockingQueue<ByteBuffer> bq = new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY);
			// 判斷當前的 sid 是否已經存在於傳送佇列,如果是,則直接把已經存在的資料傳送出去
			ArrayBlockingQueue<ByteBuffer> bqExisting = queueSendMap.putIfAbsent(sid, bq);
			if (bqExisting != null) {
				addToSendQueue(bqExisting, b);
			} else {
				addToSendQueue(bq, b);
			}
			connectOne(sid); // 連線申請呼叫鏈 connectOne-->initiateConnection-
								// ->startConnection , startConnection 就是傳送方啟動入口
		}
	}
startConnection
private boolean startConnection(Socket sock, Long sid) {
		// 省略部分程式碼
		if (sid > this.mySid) {
			// 為了防止重複建立連線,只允許 sid 大的主動連線 sid 小的
			closeSocket(sock);
		} else {
			// 構建一個傳送執行緒和接收執行緒,負責針對當前連線的資料傳遞,後續的邏輯比較簡單,就不做分析
			SendWorker sw = new SendWorker(sock, sid);
			RecvWorker rw = new RecvWorker(sock, din, sid, sw);
			sw.setRecv(rw);
		}
	}
SendWorker 會監聽對應 sid 的阻塞佇列,啟動的時候回如果佇列為空時會重新發送一次最前最後的訊息,以防上一次處理是伺服器異常退出,造成上一條訊息未處理成功;然後就是不停監聽隊裡,發現有訊息時呼叫send 方法RecvWorker:RecvWorker 不停監聽 socket 的 inputstream,讀取訊息放到訊息接收佇列中,訊息放入佇列中,qcm 的流程就完畢了。 QuorumCnxManager.Listener listener 監聽到客戶端請求之後,開始處理訊息
public void run() {
		// 省略部分程式碼
		while (!shutdown) {
			Socket client = ss.accept();
			setSockOpts(client);
			LOG.info("Received connection request" + client.getRemoteSocketAddress());
			if (quorumSaslAuthEnabled) {
				receiveConnectionAsync(client);
			} else {
				receiveConnection(client); // 接收客戶端請求
			}
		}
	}
QuorumCnxManager.receiveConnection
public void receiveConnection(final Socket sock) {
		DataInputStream din = null;
		try {
			// 獲取客戶端的資料包
			din = new DataInputStream(new BufferedInputStream(sock.getInputStream()));
			handleConnection(sock, din);// 呼叫 handle 進行處理
		} catch (IOException e) {
			LOG.error("Exception handling connection, addr: {}, closing server connection",
					sock.getRemoteSocketAddress());
			closeSocket(sock);
		}
	}
handleConnection
private void handleConnection(Socket sock, DataInputStream din)throws IOException {
			 Long sid = null;
			 try {
				 //獲取客戶端的 sid,也就是 myid
				 sid = din.readLong();
			 	if (sid < 0) { 
			 		sid = din.readLong();
			 		if (sid < this.mySid) {
			 			//為了防止重複建立連線,只允許 sid 大的主動連線 sid 小的
			 			SendWorker sw = senderWorkerMap.get(sid);
			 			if (sw != null) {
			 				sw.finish();//關閉連線
			 			}
			 			LOG.debug("Create new connection to server: " + sid);
			 			closeSocket(sock);//關閉連線
			 			connectOne(sid);//向 sid 發起連線
			 		} else {//同樣,構建一個 SendWorker 和RecvWorker 進行傳送和接收資料
			 			SendWorker sw = new 
			 			SendWorker(sock, sid);
			 			RecvWorker rw = new 
			 			RecvWorker(sock, din, sid, sw);
			 			sw.setRecv(rw);
			 		}
			 	}
			 }
		}