RocketMQ Broker Storage Mechanism: HA Data Synchronization
阿新 • Published: 2021-08-10
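The HA mechanism solves data synchronization between slave and master in a read/write-splitting deployment: when the master broker is under heavy load, data can be subscribed from a slave broker that holds a replicated copy. The main logic lives in the HAService class, and the entry point is the handleHA() call in the putMessage path.
HA supports synchronous and asynchronous replication. Synchronous replication works much like synchronous flush: both block the writing thread until a notification arrives. After the request is put, the caller invokes the request's waitForFlush method, which returns once the slave has caught up or the timeout expires.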
    public void handleHA(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
        if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
            HAService service = this.defaultMessageStore.getHaService();
            if (messageExt.isWaitStoreMsgOK()) {
                // Determine whether to wait
                if (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) {
                    GroupCommitRequest request =
                        new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
                    service.putRequest(request);
                    service.getWaitNotifyObject().wakeupAll();
                    boolean flushOK = request.waitForFlush(
                        this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
                    if (!flushOK) {
                        log.error("do sync transfer other node, wait return, but failed, topic: "
                            + messageExt.getTopic() + " tags: " + messageExt.getTags()
                            + " client address: " + messageExt.getBornHostNameString());
                        putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
                    }
                }
                // Slave problem
                else {
                    // Tell the producer, slave not available
                    putMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE);
                }
            }
        }
    }
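The "put a request, then block until notified" pattern used above can be sketched with a CountDownLatch. This is a minimal illustration, not RocketMQ's code: the names SyncWaitSketch, TransferRequest, wakeup and waitForResult are hypothetical stand-ins for GroupCommitRequest and its wait/wakeup methods.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class SyncWaitSketch {

    /** Hypothetical stand-in for a group commit request. */
    static final class TransferRequest {
        private final long nextOffset;
        private final CountDownLatch latch = new CountDownLatch(1);
        private volatile boolean ok;

        TransferRequest(long nextOffset) {
            this.nextOffset = nextOffset;
        }

        long getNextOffset() {
            return nextOffset;
        }

        /** Called by the transfer thread once the outcome is known. */
        void wakeup(boolean ok) {
            this.ok = ok;
            latch.countDown();
        }

        /** Called by the writing thread; returns false on timeout or failure. */
        boolean waitForResult(long timeoutMillis) throws InterruptedException {
            boolean signalled = latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
            return signalled && ok;
        }
    }

    /** Runs one request; the helper thread acknowledges it only when ack is true. */
    static boolean demo(boolean ack, long timeoutMillis) {
        TransferRequest request = new TransferRequest(1024L);
        Thread transfer = new Thread(() -> {
            if (ack) {
                request.wakeup(true);
            }
        });
        transfer.start();
        try {
            boolean result = request.waitForResult(timeoutMillis);
            transfer.join();
            return result;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("acknowledged in time: " + demo(true, 1000));
        System.out.println("timed out without ack: " + !demo(false, 50));
    }
}
```

The timeout branch corresponds to the FLUSH_SLAVE_TIMEOUT status in handleHA: the writer gives up waiting, but the message is already in the local commitlog.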
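The actual handling is in GroupTransferService. Like synchronous flush, it keeps separate read and write request lists and swaps them. It decides whether replication is complete by checking whether the offset the slave has acknowledged is >= the offset that follows the message in the local commitlog (req.getNextOffset()).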
    private void doWaitTransfer() {
        synchronized (this.requestsRead) {
            if (!this.requestsRead.isEmpty()) {
                for (CommitLog.GroupCommitRequest req : this.requestsRead) {
                    boolean transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
                    /**
                     * Re-check at most 5 times, waiting up to 1 s before each check.
                     * Once the offset already replicated to the slave reaches the
                     * request's next offset in the local commitlog, the transfer succeeded.
                     */
                    for (int i = 0; !transferOK && i < 5; i++) {
                        this.notifyTransferObject.waitForRunning(1000);
                        transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
                    }
                    if (!transferOK) {
                        log.warn("transfer messsage to slave timeout, " + req.getNextOffset());
                    }
                    // Wake up the thread waiting on this request
                    req.wakeupCustomer(transferOK);
                }
                this.requestsRead.clear();
            }
        }
    }
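The read/write list separation mentioned above can be sketched as follows. It is a simplified illustration under stated assumptions: SwapListSketch and drain are hypothetical names, a single monitor replaces the per-list locks, and plain offsets stand in for request objects.

```java
import java.util.ArrayList;
import java.util.List;

public class SwapListSketch {
    // Producers append here; the consumer never iterates this list directly.
    private List<Long> requestsWrite = new ArrayList<>();
    // The consumer iterates this list after a swap.
    private List<Long> requestsRead = new ArrayList<>();

    /** Producer side: enqueue the offset a request is waiting for. */
    public synchronized void putRequest(long nextOffset) {
        requestsWrite.add(nextOffset);
    }

    /** Exchange the two lists so producers continue on an empty write list. */
    private synchronized void swapRequests() {
        List<Long> tmp = requestsWrite;
        requestsWrite = requestsRead;
        requestsRead = tmp;
    }

    /**
     * Consumer side (single thread assumed): swap, then count how many
     * pending requests the slave's acknowledged offset already covers.
     */
    public int drain(long slaveAckOffset) {
        swapRequests();
        int completed = 0;
        for (long nextOffset : requestsRead) {
            if (slaveAckOffset >= nextOffset) {
                completed++;
            }
        }
        requestsRead.clear();
        return completed;
    }

    public static void main(String[] args) {
        SwapListSketch sketch = new SwapListSketch();
        sketch.putRequest(100);
        sketch.putRequest(200);
        sketch.putRequest(300);
        System.out.println("completed: " + sketch.drain(200));
    }
}
```

The point of the swap is that producers only contend for the short putRequest call, while the potentially slow comparison loop runs on a list no producer touches.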
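The data transfer itself is asynchronous and implemented in HAConnection: the master sends commitlog data and receives the offset the slave has acknowledged. On the slave side, HAClient reports its replication progress every 5 s, and also right after it receives data from the master and writes it to the commitlog.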
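    public void start() throws Exception {
        // Register the server-side listener: bind the port and register
        // the selector with the accept event it is interested in
        this.acceptSocketService.beginAccept();
        // Start the server-side select loop, which wraps each accepted
        // client channel in an HAConnection
        /**
         * HAConnection contains a ReadSocketService and a WriteSocketService:
         * ReadSocketService lets the master receive replication progress from the slave
         * WriteSocketService sends commitlog data to the slave
         */
        this.acceptSocketService.start();
        // Accepts the broker's HA requests and notifies the waiting thread
        // whether replication has completed
        this.groupTransferService.start();
        // On a slave, haClient handles data from the master and reports progress
        this.haClient.start();
    }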
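The exchange between the two socket services can be pictured as two tiny binary messages. The sketch below assumes the layout used in RocketMQ 4.x as far as I recall it (the slave reports an 8-byte offset; the master prefixes each chunk with an 8-byte start offset and a 4-byte body size); the article itself does not state this layout, and HaFrameSketch and its methods are hypothetical names.

```java
import java.nio.ByteBuffer;

public class HaFrameSketch {
    // Assumed header: 8-byte commitlog start offset + 4-byte body size.
    static final int HEADER_SIZE = 8 + 4;

    /** Slave -> master: the highest offset the slave has written. */
    static ByteBuffer encodeAck(long slaveMaxOffset) {
        ByteBuffer buf = ByteBuffer.allocate(8);
        buf.putLong(slaveMaxOffset);
        buf.flip();
        return buf;
    }

    /** Master -> slave: header followed by raw commitlog bytes. */
    static ByteBuffer encodeFrame(long startOffset, byte[] body) {
        ByteBuffer buf = ByteBuffer.allocate(HEADER_SIZE + body.length);
        buf.putLong(startOffset);
        buf.putInt(body.length);
        buf.put(body);
        buf.flip();
        return buf;
    }

    /**
     * Slave side: consume one complete frame and return the offset that
     * follows it (the next value to acknowledge), or -1 if the buffer
     * does not yet hold a complete frame.
     */
    static long decodeFrame(ByteBuffer buf) {
        if (buf.remaining() < HEADER_SIZE) {
            return -1;
        }
        long startOffset = buf.getLong(buf.position());
        int bodySize = buf.getInt(buf.position() + 8);
        if (buf.remaining() < HEADER_SIZE + bodySize) {
            return -1;
        }
        buf.position(buf.position() + HEADER_SIZE + bodySize);
        return startOffset + bodySize;
    }

    public static void main(String[] args) {
        ByteBuffer frame = encodeFrame(1000L, new byte[] {1, 2, 3, 4, 5});
        long next = decodeFrame(frame);
        System.out.println("next offset to acknowledge: " + next);
        System.out.println("ack payload bytes: " + encodeAck(next).remaining());
    }
}
```

Returning -1 for an incomplete frame mirrors how a non-blocking reader has to wait for more bytes before it can append anything to the local commitlog.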
First, the logic that the master's acceptSocketService.start() runs:
    /** {@inheritDoc} */
    @Override
    public void run() {
        log.info(this.getServiceName() + " service started");
        while (!this.isStopped()) {
            try {
                this.selector.select(1000);
                Set<SelectionKey> selected = this.selector.selectedKeys();
                if (selected != null) {
                    for (SelectionKey k : selected) {
                        if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
                            // A slave is connecting
                            SocketChannel sc = ((ServerSocketChannel) k.channel()).accept();
                            if (sc != null) {
                                HAService.log.info("HAService receive new connection, "
                                    + sc.socket().getRemoteSocketAddress());
                                try {
                                    // Each HAConnection manages reads and writes for one slave
                                    HAConnection conn = new HAConnection(HAService.this, sc);
                                    conn.start();
                                    HAService.this.addConnection(conn);
                                } catch (Exception e) {
                                    log.error("new HAConnection exception", e);
                                    sc.close();
                                }
                            }
                        } else {
                            log.warn("Unexpected ops in select " + k.readyOps());
                        }
                    }
                    selected.clear();
                }
            } catch (Exception e) {
                log.error(this.getServiceName() + " service has exception.", e);
            }
        }
        log.info(this.getServiceName() + " service end");
    }

    // HAConnection.start(), called as conn.start() above
    public void start() {
        // Handle the replication progress reported by the slave
        this.readSocketService.start();
        // Send replication data to the slave
        this.writeSocketService.start();
    }
Next, how the slave handles its side:
    @Override
    public void run() {
        log.info(this.getServiceName() + " service started");
        while (!this.isStopped()) {
            try {
                // Connect to the master and initialize socketChannel;
                // this also registers the read event
                if (this.connectMaster()) {
                    // Has it been more than 5 s since the slave last wrote to the master?
                    if (this.isTimeToReportOffset()) {
                        // Report the current offset to the master
                        boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset);
                        if (!result) {
                            this.closeMaster();
                        }
                    }
                    // Blocking select, up to 1 s
                    this.selector.select(1000);
                    // Process data received from the master and write it to the local commitlog
                    boolean ok = this.processReadEvent();
                    if (!ok) {
                        this.closeMaster();
                    }
                    // Once the slave has written data locally, report progress to the master immediately
                    if (!reportSlaveMaxOffsetPlus()) {
                        continue;
                    }
                    long interval = HAService.this.getDefaultMessageStore().getSystemClock().now()
                        - this.lastWriteTimestamp;
                    // Close the connection if nothing has arrived from the master for more than 20 s
                    if (interval > HAService.this.getDefaultMessageStore().getMessageStoreConfig()
                        .getHaHousekeepingInterval()) {
                        log.warn("HAClient, housekeeping, found this connection[" + this.masterAddress
                            + "] expired, " + interval);
                        this.closeMaster();
                        log.warn("HAClient, master not response some time, so close connection");
                    }
                } else {
                    this.waitForRunning(1000 * 5);
                }
            } catch (Exception e) {
                log.warn(this.getServiceName() + " service has exception. ", e);
                this.waitForRunning(1000 * 5);
            }
        }
        log.info(this.getServiceName() + " service end");
    }
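The two timers in this loop reduce to simple interval comparisons. The sketch below is illustrative only: HaTimerSketch and its method and constant names are hypothetical, and the 5 s and 20 s values are taken from the comments above rather than read from a live configuration.

```java
public class HaTimerSketch {
    // Interval values as described in the article's comments (assumed defaults).
    static final long HEARTBEAT_INTERVAL_MS = 5_000L;
    static final long HOUSEKEEPING_INTERVAL_MS = 20_000L;

    /** True when the slave should send its offset again as a heartbeat. */
    static boolean isTimeToReportOffset(long nowMillis, long lastActivityMillis) {
        return nowMillis - lastActivityMillis > HEARTBEAT_INTERVAL_MS;
    }

    /** True when the connection has been silent long enough to be closed. */
    static boolean isConnectionExpired(long nowMillis, long lastActivityMillis) {
        return nowMillis - lastActivityMillis > HOUSEKEEPING_INTERVAL_MS;
    }

    public static void main(String[] args) {
        long last = 0L;
        System.out.println("report at 6 s: " + isTimeToReportOffset(6_000L, last));
        System.out.println("expired at 6 s: " + isConnectionExpired(6_000L, last));
        System.out.println("expired at 21 s: " + isConnectionExpired(21_000L, last));
    }
}
```

Because the heartbeat interval is much shorter than the housekeeping interval, a healthy connection sees several heartbeats before it could ever be considered expired.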