1. 程式人生 > 其它 >RocketMQ-broker儲存機制-HA資料同步

RocketMQ-broker儲存機制-HA資料同步

RocketMQ-broker儲存機制-HA資料同步

HA機制解決讀寫分離模式下slave與master的資料同步問題,在master broker高負載的情況下,實現slave broker的資料訂閱。HA的主要實現邏輯在HaServer類中,入口在putMessage的handleHA()方法初。

HA分為同步複製和非同步複製,同步複製邏輯和同步刷盤機制差不多,都是同步等待通知的機制。put request之後,呼叫request的waitForFlush方法。

    public void handleHA(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
        
if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) { HAService service = this.defaultMessageStore.getHaService(); if (messageExt.isWaitStoreMsgOK()) { // Determine whether to wait if (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) { GroupCommitRequest request
= new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes()); service.putRequest(request); service.getWaitNotifyObject().wakeupAll(); boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
if (!flushOK) { log.error("do sync transfer other node, wait return, but failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags() + " client address: " + messageExt.getBornHostNameString()); putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT); } } // Slave problem else { // Tell the producer, slave not available putMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE); } } } }

具體的處理邏輯在GroupTransferService中,其跟同步刷盤一樣,採用讀寫分離,通過比較slave傳過來的已經完成的offset是否 >= 與當前資料在本地commitlog的offset,來確定是否完成資料同步

        private void doWaitTransfer() {
            synchronized (this.requestsRead) {
                if (!this.requestsRead.isEmpty()) {
                    for (CommitLog.GroupCommitRequest req : this.requestsRead) {
                        boolean transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
                        /**
                         * 最多迴圈比較4次,如果發現已經同步到slave上的offset 超過了當前的需要同步資料在本地commitlog的offset的時候
                         * 表示已經成功同步
                         */
                        for (int i = 0; !transferOK && i < 5; i++) {
                            this.notifyTransferObject.waitForRunning(1000);
                            transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
                        }

                        if (!transferOK) {
                            log.warn("transfer messsage to slave timeout, " + req.getNextOffset());
                        }

                        // 通知給客戶端
                        req.wakeupCustomer(transferOK);
                    }

                    this.requestsRead.clear();
                }
            }
        }

而真正的資料同步是採用非同步方式具體實現在HaConnection中,服務端傳送commitlog中的資料,並接收到slave同步進度的ACKoffset. 而slave在HaClient中,每5s中上報同步進度或者收到master的資料並寫到commitlog中後傳送同步進度。

    public void start() throws Exception {
        // 註冊服務端監聽,繫結埠,註冊selecter選擇器和感興趣的事件選擇鍵
        this.acceptSocketService.beginAccept();
        // 啟動服務端就緒選擇,把接收到的客戶端連線通道包裝成HaConnection來處理
        /**
         * HaConnection內部實現了ReadSocketService和WriteSocketService
         * ReadSocketService 用於master接收來自slave的同步進度
         * WriteSocketService 用於傳送commitlog的同步資料
         */
        this.acceptSocketService.start();

        // 接受broker HA請求,並通知客戶端是否完成資料同步的服務
        this.groupTransferService.start();

        // haClient是slave處理master的同步資料  以及 slave上報同步進度
        this.haClient.start();
    }

先來看一下master 的acceptSocketService.start()執行的邏輯

/** {@inheritDoc} */
        @Override
        public void run() {
            log.info(this.getServiceName() + " service started");

            while (!this.isStopped()) {
                try {
                    this.selector.select(1000);
                    Set<SelectionKey> selected = this.selector.selectedKeys();

                    if (selected != null) {
                        for (SelectionKey k : selected) {
                            if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
                                //  接收到來自slave的連線
                                SocketChannel sc = ((ServerSocketChannel) k.channel()).accept();

                                if (sc != null) {
                                    HAService.log.info("HAService receive new connection, "
                                        + sc.socket().getRemoteSocketAddress());

                                    try {
                                        // HAConnection管理著各個slave的讀寫
                                        HAConnection conn = new HAConnection(HAService.this, sc);
                                        conn.start();
                                        HAService.this.addConnection(conn);
                                    } catch (Exception e) {
                                        log.error("new HAConnection exception", e);
                                        sc.close();
                                    }
                                }
                            } else {
                                log.warn("Unexpected ops in select " + k.readyOps());
                            }
                        }

                        selected.clear();
                    }
                } catch (Exception e) {
                    log.error(this.getServiceName() + " service has exception.", e);
                }
            }

            log.info(this.getServiceName() + " service end");
        }
    public void start() {
        // 處理slave的同步進度
        this.readSocketService.start();
        // 向slave傳送同步資料
        this.writeSocketService.start();
    }

接下來看一下slave 是如何處理的

        @Override
        public void run() {
            log.info(this.getServiceName() + " service started");

            while (!this.isStopped()) {
                try {
                    // 建立master連線 初始化socketChannel
                    // 同時也註冊了讀事件
                    if (this.connectMaster()) {
                        // 距離上一次slave請求master的時間是否超過5s
                        if (this.isTimeToReportOffset()) {
                            // 同步當前的offset到master
                            boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset);
                            if (!result) {
                                this.closeMaster();
                            }
                        }
                        // 就緒選擇  堵塞
                        this.selector.select(1000);
                        // 收到master的資料
                        // 處理master的同步資料  並寫到本地commitlog
                        boolean ok = this.processReadEvent();
                        if (!ok) {
                            this.closeMaster();
                        }

                        // salve本地寫完資料  則立即向master同步進度
                        if (!reportSlaveMaxOffsetPlus()) {
                            continue;
                        }

                        long interval =
                            HAService.this.getDefaultMessageStore().getSystemClock().now()
                                - this.lastWriteTimestamp;
                        // 距離上一次收到master同步的資料超過了20s   則斷開連線
                        if (interval > HAService.this.getDefaultMessageStore().getMessageStoreConfig()
                            .getHaHousekeepingInterval()) {
                            log.warn("HAClient, housekeeping, found this connection[" + this.masterAddress
                                + "] expired, " + interval);
                            this.closeMaster();
                            log.warn("HAClient, master not response some time, so close connection");
                        }
                    } else {
                        this.waitForRunning(1000 * 5);
                    }
                } catch (Exception e) {
                    log.warn(this.getServiceName() + " service has exception. ", e);
                    this.waitForRunning(1000 * 5);
                }
            }

            log.info(this.getServiceName() + " service end");
        }