1. 程式人生 > 其它 >【RocketMQ】主從同步實現原理

【RocketMQ】主從同步實現原理

主從同步的實現邏輯主要在HAService中,在DefaultMessageStore的建構函式中,對HAService進行了例項化,並在start方法中,啟動了HAService

public class DefaultMessageStore implements MessageStore {
    public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager,
        final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig) throws IOException {
        // ...
        if (!messageStoreConfig.isEnableDLegerCommitLog()) {
            // 初始化HAService
            this.haService = new HAService(this);
        } else {
            this.haService = null;
        }
        // ...
    }

    public void start() throws Exception {
        // ...
        if (!messageStoreConfig.isEnableDLegerCommitLog()) {
            // 啟動HAService
            this.haService.start();
            this.handleScheduleMessageService(messageStoreConfig.getBrokerRole());
        }
        // ...
    }
}

HAService的建構函式中,建立了AcceptSocketServiceGroupTransferServiceHAClient,在start方法中主要做了如下幾件事:

  1. 呼叫AcceptSocketService的beginAccept方法,這一步主要是進行埠繫結,在埠上監聽從節點的連線請求(可以看做是執行在master節點的);
  2. 呼叫AcceptSocketService的start方法啟動服務,這一步主要為了處理從節點的連線請求,與從節點建立連線(可以看做是執行在master節點的);
  3. 呼叫GroupTransferService的start方法,主要用於在主從同步的時候,等待資料傳輸完畢
    (可以看做是執行在master節點的);
  4. 呼叫HAClient的start方法啟動,裡面與master節點建立連線,向master彙報主從同步進度並存儲master傳送過來的同步資料(可以看做是執行在從節點的);
public class HAService {
     public HAService(final DefaultMessageStore defaultMessageStore) throws IOException {
        this.defaultMessageStore = defaultMessageStore;
        // 建立AcceptSocketService
        this.acceptSocketService =
            new AcceptSocketService(defaultMessageStore.getMessageStoreConfig().getHaListenPort());
        this.groupTransferService = new GroupTransferService();
        // 建立HAClient
        this.haClient = new HAClient();
    }

    public void start() throws Exception {
        // 開始監聽從伺服器的連線
        this.acceptSocketService.beginAccept();
        // 啟動服務
        this.acceptSocketService.start();
        // 啟動GroupTransferService
        this.groupTransferService.start();
        // 啟動
        this.haClient.start();
    }
}

監聽從節點連線請求

AcceptSocketServicebeginAccept方法裡面首先獲取了ServerSocketChannel,然後進行埠繫結,並在selector上面註冊了OP_ACCEPT事件的監聽,監聽從節點的連線請求:

public class HAService {
    class AcceptSocketService extends ServiceThread {
        /**
         * 監聽從節點的連線
         *
         * @throws Exception If fails.
         */
        public void beginAccept() throws Exception {
            // 建立ServerSocketChannel
            this.serverSocketChannel = ServerSocketChannel.open();
            // 獲取selector
            this.selector = RemotingUtil.openSelector();
            this.serverSocketChannel.socket().setReuseAddress(true);
            // 繫結埠
            this.serverSocketChannel.socket().bind(this.socketAddressListen);
            // 設定非阻塞
            this.serverSocketChannel.configureBlocking(false);
            // 註冊OP_ACCEPT連線事件的監聽
            this.serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT);
        }
    }
}

處理從節點連線請求

AcceptSocketService的run方法中,對監聽到的連線請求進行了處理,處理邏輯大致如下:

  1. 從selector中獲取到監聽到的事件;
  2. 如果是OP_ACCEPT連線事件,建立與從節點的連線物件HAConnection,與從節點建立連線,然後呼叫HAConnection的start方法進行啟動,並建立的HAConnection物件加入到連線集合中,HAConnection中封裝了Master節點和從節點的資料同步邏輯
public class HAService {
    class AcceptSocketService extends ServiceThread {
        @Override
        public void run() {
            log.info(this.getServiceName() + " service started");
            // 如果服務未停止
            while (!this.isStopped()) {
                try {
                    this.selector.select(1000);
                    // 獲取監聽到的事件
                    Set<SelectionKey> selected = this.selector.selectedKeys();
                    // 處理事件
                    if (selected != null) {
                        for (SelectionKey k : selected) {
                            // 如果是連線事件
                            if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
                                SocketChannel sc = ((ServerSocketChannel) k.channel()).accept();
                                if (sc != null) {
                                    HAService.log.info("HAService receive new connection, "
                                        + sc.socket().getRemoteSocketAddress());
                                    try {
                                        // 建立HAConnection,建立連線
                                        HAConnection conn = new HAConnection(HAService.this, sc);
                                        // 啟動
                                        conn.start();
                                        // 新增連線
                                        HAService.this.addConnection(conn);
                                    } catch (Exception e) {
                                        log.error("new HAConnection exception", e);
                                        sc.close();
                                    }
                                }
                            } else {
                                log.warn("Unexpected ops in select " + k.readyOps());
                            }
                        }
                        selected.clear();
                    }
                } catch (Exception e) {
                    log.error(this.getServiceName() + " service has exception.", e);
                }
            }
            log.info(this.getServiceName() + " service end");
        }
    }
}

等待主從複製傳輸結束

GroupTransferService的run方法主要是為了在進行主從資料同步的時候,等待從節點資料同步完畢。

在執行時首先進會呼叫waitForRunning進行等待,因為此時可能還有沒有開始主從同步,所以先進行等待,之後如果有同步請求,會喚醒該執行緒,然後呼叫doWaitTransfer方法等待資料同步完成:

public class HAService {
    class GroupTransferService extends ServiceThread {

         public void run() {
            log.info(this.getServiceName() + " service started");
            // 如果服務未停止
            while (!this.isStopped()) {
                try {
                    // 等待執行
                    this.waitForRunning(10);
                    // 如果被喚醒,呼叫doWaitTransfer等待主從同步完成
                    this.doWaitTransfer();
                } catch (Exception e) {
                    log.warn(this.getServiceName() + " service has exception. ", e);
                }
            }

            log.info(this.getServiceName() + " service end");
        }
    }
}

在看doWaitTransfer方法之前,首先看下是如何判斷有資料需要同步的。

Master節點中,當訊息被寫入到CommitLog以後,會呼叫submitReplicaRequest方法處主從同步,首先判斷當前Broker的角色是否是SYNC_MASTER,如果是則會構建訊息提交請求GroupCommitRequest,然後呼叫HAServiceputRequest新增到請求集合中,並喚醒GroupTransferService中在等待的執行緒:

public class CommitLog {
    public CompletableFuture<PutMessageStatus> submitReplicaRequest(AppendMessageResult result, MessageExt messageExt) {
        if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
            HAService service = this.defaultMessageStore.getHaService();
            if (messageExt.isWaitStoreMsgOK()) {
                if (service.isSlaveOK(result.getWroteBytes() + result.getWroteOffset())) {
                    // 構建GroupCommitRequest
                    GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes(),
                            this.defaultMessageStore.getMessageStoreConfig().getSlaveTimeout());
                    // 新增請求
                    service.putRequest(request);
                    // 喚醒GroupTransferService中在等待的執行緒
                    service.getWaitNotifyObject().wakeupAll();
                    return request.future();
                }
                else {
                    return CompletableFuture.completedFuture(PutMessageStatus.SLAVE_NOT_AVAILABLE);
                }
            }
        }
        return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
    }
}

doWaitTransfer方法中,會判斷CommitLog提交請求集合requestsRead是否為空,如果不為空,表示有訊息寫入了CommitLog,Master節點需要等待將資料傳輸給從節點:

  1. push2SlaveMaxOffset記錄了從節點已經同步的訊息偏移量,判斷push2SlaveMaxOffset是否大於本次CommitLog提交的偏移量,也就是請求中設定的偏移量;
  2. 獲取請求中設定的等待截止時間;
  3. 開啟迴圈,判斷資料是否還未傳輸完畢,並且未超過截止時間,如果是則等待1s,然後繼續判斷傳輸是否完畢,不斷進行,直到超過截止時間或者資料已經傳輸完畢;
    (向從節點發送的訊息最大偏移量push2SlaveMaxOffset超過了請求中設定的偏移量表示本次同步資料傳輸完畢);
  4. 喚醒在等待資料同步完畢的執行緒;
public class HAService {
    // CommitLog提交請求集合
    private volatile LinkedList<CommitLog.GroupCommitRequest> requestsRead = new LinkedList<>();

    class GroupTransferService extends ServiceThread {

        private void doWaitTransfer() {
            // 如果CommitLog提交請求集合不為空
            if (!this.requestsRead.isEmpty()) {
                // 處理訊息提交請求
                for (CommitLog.GroupCommitRequest req : this.requestsRead) {
                    // 判斷傳輸到從節點最大偏移量是否超過了請求中設定的偏移量
                    boolean transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
                    // 獲取截止時間
                    long deadLine = req.getDeadLine();
                    // 如果從節點還未同步完畢並且未超過截止時間
                    while (!transferOK && deadLine - System.nanoTime() > 0) {
                        // 等待
                        this.notifyTransferObject.waitForRunning(1000);
                        // 判斷從節點同步的最大偏移量是否超過了請求中設定的偏移量
                        transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
                    }
                    // 喚醒
                    req.wakeupCustomer(transferOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
                }

                this.requestsRead = new LinkedList<>();
            }
        }
    }
}

啟動HAClient

HAClient可以看做是在從節點上執行的,主要進行的處理如下:

  1. 呼叫connectMaster方法連線Master節點,Master節點上也會執行,但是它本身就是Master沒有可連的Master節點,所以可以忽略;
  2. 呼叫isTimeToReportOffset方法判斷是否需要向Master節點彙報同步偏移量,如果需要則呼叫reportSlaveMaxOffset方法將當前的訊息同步偏移量傳送給Master節點;
  3. 呼叫processReadEvent處理網路請求中的可讀事件,也就是處理Master傳送過來的訊息,將訊息存入CommitLog
public class HAService {
    class HAClient extends ServiceThread {
   
        @Override
        public void run() {
            log.info(this.getServiceName() + " service started");
            while (!this.isStopped()) {
                try {
                    // 連線Master節點
                    if (this.connectMaster()) {
                        // 是否需要報告訊息同步偏移量
                        if (this.isTimeToReportOffset()) {
                            // 向Master節點發送同步偏移量
                            boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset);
                            if (!result) {
                                this.closeMaster();
                            }
                        }
                        this.selector.select(1000);
                        // 處理讀事件,也就是Master節點發送的資料
                        boolean ok = this.processReadEvent();
                        if (!ok) {
                            this.closeMaster();
                        }
                        // ...
                    } else {
                        this.waitForRunning(1000 * 5);
                    }
                } catch (Exception e) {
                    log.warn(this.getServiceName() + " service has exception. ", e);
                    this.waitForRunning(1000 * 5);
                }
            }

            log.info(this.getServiceName() + " service end");
        }
    }
}

連線主節點

connectMaster方法中會獲取Master節點的地址,並轉換為SocketAddress物件,然後向Master節點請求建立連線,並在selector註冊OP_READ可讀事件監聽:

public class HAService {
    class HAClient extends ServiceThread {
    // 當前的主從複製進度
    private long currentReportedOffset = 0;

    private boolean connectMaster() throws ClosedChannelException {
        if (null == socketChannel) {
            String addr = this.masterAddress.get();
            if (addr != null) {
                // 將地址轉為SocketAddress
                SocketAddress socketAddress = RemotingUtil.string2SocketAddress(addr);
                if (socketAddress != null) {
                    // 連線master
                    this.socketChannel = RemotingUtil.connect(socketAddress);
                    if (this.socketChannel != null) {
                        // 註冊OP_READ可讀事件監聽
                        this.socketChannel.register(this.selector, SelectionKey.OP_READ);
                    }
                }
            }
            // 獲取CommitLog中當前最大的偏移量
            this.currentReportedOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
            // 更新上次寫入時間
            this.lastWriteTimestamp = System.currentTimeMillis();
        }
        return this.socketChannel != null;
    }
}

傳送主從同步訊息拉取偏移量

isTimeToReportOffset方法中,首先獲取當前時間與上一次進行主從同步的時間間隔interval,如果時間間隔interval大於配置的傳送心跳時間間隔,表示需要向Master節點發送從節點訊息同步的偏移量,接下來會呼叫reportSlaveMaxOffset方法傳送同步偏移量,也就是說從節點會定時向Master節點發送請求,反饋CommitLog中同步訊息的偏移量

public class HAService {
    class HAClient extends ServiceThread {
       // 當前從節點已經同步訊息的偏移量大小
       private long currentReportedOffset = 0;

       private boolean isTimeToReportOffset() {
            // 獲取距離上一次主從同步的間隔時間
            long interval =
                HAService.this.defaultMessageStore.getSystemClock().now() - this.lastWriteTimestamp;
            // 判斷是否超過了配置的傳送心跳包時間間隔
            boolean needHeart = interval > HAService.this.defaultMessageStore.getMessageStoreConfig()
                .getHaSendHeartbeatInterval();

            return needHeart;
        }

        // 傳送同步偏移量,傳入的引數是當前的主從複製偏移量currentReportedOffset
        private boolean reportSlaveMaxOffset(final long maxOffset) {
            this.reportOffset.position(0);
            this.reportOffset.limit(8); // 設定資料傳輸大小為8個位元組
            this.reportOffset.putLong(maxOffset);// 設定同步偏移量
            this.reportOffset.position(0);
            this.reportOffset.limit(8);

            for (int i = 0; i < 3 && this.reportOffset.hasRemaining(); i++) {
                try {
                    // 向Master節點發送拉取偏移量
                    this.socketChannel.write(this.reportOffset);
                } catch (IOException e) {
                    log.error(this.getServiceName()
                        + "reportSlaveMaxOffset this.socketChannel.write exception", e);
                    return false;
                }
            }
            // 更新發送時間
            lastWriteTimestamp = HAService.this.defaultMessageStore.getSystemClock().now();
            return !this.reportOffset.hasRemaining();
        }
    }
}

處理網路可讀事件

processReadEvent方法中處理了可讀事件,也就是處理Master節點發送的同步資料, 首先從socketChannel中讀取資料到byteBufferRead中,byteBufferRead是讀緩衝區,讀取資料的方法會返回讀取到的位元組數,對位元組數大小進行判斷:

  • 如果可讀位元組數大於0表示有資料需要處理,呼叫dispatchReadRequest方法進行處理;
  • 如果可讀位元組數為0表示沒有可讀資料,此時記錄讀取到空資料的次數,如果連續讀到空資料的次數大於3次,將終止本次處理;
  class HAClient extends ServiceThread {
        // 讀緩衝區,會將從socketChannel讀入緩衝區
        private ByteBuffer byteBufferRead = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);

        private boolean processReadEvent() {
            int readSizeZeroTimes = 0;
            while (this.byteBufferRead.hasRemaining()) {
                try {
                    // 從socketChannel中讀取資料到byteBufferRead中,返回讀取到的位元組數
                    int readSize = this.socketChannel.read(this.byteBufferRead);
                    if (readSize > 0) {
                        // 重置readSizeZeroTimes
                        readSizeZeroTimes = 0;
                        // 處理資料
                        boolean result = this.dispatchReadRequest();
                        if (!result) {
                            log.error("HAClient, dispatchReadRequest error");
                            return false;
                        }
                    } else if (readSize == 0) {
                        // 記錄讀取到空資料的次數
                        if (++readSizeZeroTimes >= 3) {
                            break;
                        }
                    } else {
                        log.info("HAClient, processReadEvent read socket < 0");
                        return false;
                    }
                } catch (IOException e) {
                    log.info("HAClient, processReadEvent read socket exception", e);
                    return false;
                }
            }

            return true;
        }
  }
訊息寫入ComitLog

dispatchReadRequest方法中會將從節點讀取到的資料寫入CommitLog,dispatchPosition記錄了已經處理的資料在讀緩衝區中的位置,從讀緩衝區byteBufferRead獲取剩餘可讀取的位元組數,如果可讀資料的位元組數大於一個訊息頭的位元組數(12個位元組),表示有資料還未處理完畢,反之表示訊息已經處理完畢結束處理。
對資料的處理邏輯如下:

  1. 從緩衝區中讀取資料,首先獲取到的是訊息在master節點的物理偏移量masterPhyOffset;
  2. 向後讀取8個位元組,得到訊息體內容的位元組數bodySize;
  3. 獲取從節點當前CommitLog的最大物理偏移量slavePhyOffset,如果不為0並且不等於masterPhyOffset,表示與Master節點的傳輸偏移量不一致,也就是資料不一致,此時終止處理
  4. 如果可讀取的位元組數大於一個訊息頭的位元組數 + 訊息體大小,表示有訊息可處理,繼續進行下一步;
  5. 計算訊息體在讀緩衝區中的起始位置,從讀緩衝區中根據起始位置,讀取訊息內容,將訊息追加到從節點的CommitLog中
  6. 更新dispatchPosition的值為訊息頭大小 + 訊息體大小,dispatchPosition之前的資料表示已經處理完畢;
    class HAClient extends ServiceThread {
        // 已經處理的資料在讀緩衝區中的位置,初始化為0
        private int dispatchPosition = 0;
        // 讀緩衝區
        private ByteBuffer byteBufferRead = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);

        private boolean dispatchReadRequest() {
            // 訊息頭大小
            final int msgHeaderSize = 8 + 4; // phyoffset + size
            // 開啟迴圈不斷讀取資料
            while (true) {
                // 獲可讀取的位元組數
                int diff = this.byteBufferRead.position() - this.dispatchPosition;
                // 如果位元組數大於一個訊息頭的位元組數
                if (diff >= msgHeaderSize) {
                    // 獲取訊息在master節點的物理偏移量
                    long masterPhyOffset = this.byteBufferRead.getLong(this.dispatchPosition);
                    // 獲取訊息體大小
                    int bodySize = this.byteBufferRead.getInt(this.dispatchPosition + 8);
                    // 獲取從節點當前CommitLog的最大物理偏移量
                    long slavePhyOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
                    if (slavePhyOffset != 0) {
                        // 如果不一致結束處理
                        if (slavePhyOffset != masterPhyOffset) {
                            log.error("master pushed offset not equal the max phy offset in slave, SLAVE: "
                                + slavePhyOffset + " MASTER: " + masterPhyOffset);
                            return false;
                        }
                    }
                    // 如果可讀取的位元組數大於一個訊息頭的位元組數 + 訊息體大小
                    if (diff >= (msgHeaderSize + bodySize)) {
                        // 將度緩衝區的資料轉為位元組陣列
                        byte[] bodyData = byteBufferRead.array();
                        // 計算訊息體在讀緩衝區中的起始位置
                        int dataStart = this.dispatchPosition + msgHeaderSize;
                        // 從讀緩衝區中根據訊息的位置,讀取訊息內容,將訊息追加到從節點的CommitLog中
                        HAService.this.defaultMessageStore.appendToCommitLog(
                                masterPhyOffset, bodyData, dataStart, bodySize);
                        // 更新dispatchPosition的值為訊息頭大小+訊息體大小
                        this.dispatchPosition += msgHeaderSize + bodySize;
                        if (!reportSlaveMaxOffsetPlus()) {
                            return false;
                        }
                        continue;
                    }
                }
                if (!this.byteBufferRead.hasRemaining()) {
                    this.reallocateByteBuffer();
                }

                break;
            }

            return true;
        }
    }

HAConnection

HAConnection中封裝了Master節點與從節點的網路通訊處理,分別在ReadSocketServiceWriteSocketService中。

ReadSocketService

ReadSocketService啟動後處理監聽到的可讀事件,前面知道HAClient中從節點會定時向Master節點彙報從節點的訊息同步偏移量,Master節點對彙報請求的處理就在這裡,如果從網路中監聽到了可讀事件,會呼叫processReadEvent處理讀事件:

public class HAConnection {
     class ReadSocketService extends ServiceThread {
        @Override
        public void run() {
            HAConnection.log.info(this.getServiceName() + " service started");
            while (!this.isStopped()) {
                try {
                    this.selector.select(1000);
                    // 處理可讀事件
                    boolean ok = this.processReadEvent();
                    if (!ok) {
                        HAConnection.log.error("processReadEvent error");
                        break;
                    }
                    // ...
                } catch (Exception e) {
                    HAConnection.log.error(this.getServiceName() + " service has exception.", e);
                    break;
                }
            }
            // ...
            HAConnection.log.info(this.getServiceName() + " service end");
        }
     }
}

處理可讀事件

processReadEvent中從網路中處理讀事件的方式與上面HAClientdispatchReadRequest類似,都是將網路中的資料讀取到讀緩衝區中,並用一個變數記錄已讀取資料的位置,processReadEvent方法的處理邏輯如下:

  1. 從socketChannel讀取資料到讀緩衝區byteBufferRead中,返回讀取到的位元組數;
  2. 如果讀取到的位元組數大於0,進入下一步,如果讀取到的位元組數為0,記錄連續讀取到空位元組數的次數是否超過三次,如果超過終止處理;
  3. 判斷剩餘可讀取的位元組數是否大於等於8,前面知道,從節點發送同步訊息拉取偏移量的時候設定的位元組大小為8,所以位元組數大於等於8的時候表示需要讀取從節點發送的偏移量;
  4. 計算資料在緩衝區中的位置,從緩衝區讀取從節點發送的同步偏移量readOffset;
  5. 更新processPosition的值,processPosition表示讀緩衝區中已經處理資料的位置;
  6. 更新slaveAckOffset為從節點發送的同步偏移量readOffset的值;
  7. 如果當前Master節點記錄的從節點的同步偏移量slaveRequestOffset小於0,表示還未進行同步,此時將slaveRequestOffset更新為從節點發送的同步偏移量;
  8. 如果從節點發送的同步偏移量比當前Master節點的最大物理偏移量還要大,終止本次處理;
  9. 呼叫notifyTransferSome,更新Master節點記錄的向從節點同步訊息的偏移量;
public class HAConnection {

     class ReadSocketService extends ServiceThread {
     // 讀緩衝區    
     private final ByteBuffer byteBufferRead = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);
     // 讀緩衝區中已經處理的資料位置
     private int processPosition = 0;

     private boolean processReadEvent() {
            int readSizeZeroTimes = 0;
            // 如果沒有可讀資料
            if (!this.byteBufferRead.hasRemaining()) {
                this.byteBufferRead.flip();
                // 處理位置置為0
                this.processPosition = 0;
            }
            // 如果資料未讀取完畢
            while (this.byteBufferRead.hasRemaining()) {
                try {
                    // 從socketChannel讀取資料到byteBufferRead中,返回讀取到的位元組數
                    int readSize = this.socketChannel.read(this.byteBufferRead);
                    // 如果讀取資料位元組數大於0
                    if (readSize > 0) {
                        // 重置readSizeZeroTimes
                        readSizeZeroTimes = 0;
                        // 獲取上次處理讀事件的時間戳
                        this.lastReadTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
                        // 判斷剩餘可讀取的位元組數是否大於等於8
                        if ((this.byteBufferRead.position() - this.processPosition) >= 8) {
                            // 獲取偏移量內容的結束位置
                            int pos = this.byteBufferRead.position() - (this.byteBufferRead.position() % 8);
                            // 從結束位置向前讀取8個位元組得到從點發送的同步偏移量
                            long readOffset = this.byteBufferRead.getLong(pos - 8);
                            // 更新處理位置
                            this.processPosition = pos;
                            // 更新slaveAckOffset為從節點發送的同步進度
                            HAConnection.this.slaveAckOffset = readOffset;
                            // 如果記錄的從節點的同步進度小於0,表示還未進行同步
                            if (HAConnection.this.slaveRequestOffset < 0) {
                                // 更新為從節點發送的同步進度
                                HAConnection.this.slaveRequestOffset = readOffset;
                                log.info("slave[" + HAConnection.this.clientAddr + "] request offset " + readOffset);
                            } else if (HAConnection.this.slaveAckOffset > HAConnection.this.haService.getDefaultMessageStore().getMaxPhyOffset()) {
                                // 如果從節點發送的拉取偏移量比當前Master節點的最大物理偏移量還要大
                                log.warn("slave[{}] request offset={} greater than local commitLog offset={}. ",
                                        HAConnection.this.clientAddr,
                                        HAConnection.this.slaveAckOffset,
                                        HAConnection.this.haService.getDefaultMessageStore().getMaxPhyOffset());
                                return false;
                            }
                            // 更新Master節點記錄的向從節點同步訊息的偏移量
                            HAConnection.this.haService.notifyTransferSome(HAConnection.this.slaveAckOffset);
                        }
                    } else if (readSize == 0) 
                        // 判斷連續讀取到空資料的次數是否超過三次
                        if (++readSizeZeroTimes >= 3) {
                            break;
                        }
                    } else {
                        log.error("read socket[" + HAConnection.this.clientAddr + "] < 0");
                        return false;
                    }
                } catch (IOException e) {
                    log.error("processReadEvent exception", e);
                    return false;
                }
            }

            return true;
        }
    }
}

前面在GroupTransferService中可以看到是通過push2SlaveMaxOffset的值判斷本次同步是否完成的,在notifyTransferSome方法中可以看到當Master節點收到從節點反饋的訊息拉取偏移量時,對push2SlaveMaxOffset的值進行了更新:

public class HAService {
    // 向從節點推送的訊息最大偏移量
    private final GroupTransferService groupTransferService;

    public void notifyTransferSome(final long offset) {
        // 如果傳入的偏移大於push2SlaveMaxOffset記錄的值,進行更新
        for (long value = this.push2SlaveMaxOffset.get(); offset > value; ) {
            // 更新向從節點推送的訊息最大偏移量
            boolean ok = this.push2SlaveMaxOffset.compareAndSet(value, offset);
            if (ok) {
                this.groupTransferService.notifyTransferSome();
                break;
            } else {
                value = this.push2SlaveMaxOffset.get();
            }
        }
    }
}

WriteSocketService

WriteSocketService用於Master節點向從節點發送同步訊息,處理邏輯如下:

  1. 根據從節點發送的主從同步訊息拉取偏移量slaveRequestOffset進行判斷:

    • 如果slaveRequestOffset值為-1,表示還未收到從節點報告的同步偏移量,此時睡眠一段時間等待從節點發送訊息拉取偏移量;
    • 如果slaveRequestOffset值不為-1,表示已經開始進行主從同步進行下一步;
  2. 判斷nextTransferFromWhere值是否為-1,nextTransferFromWhere記錄了下次需要傳輸的訊息在CommitLog中的偏移量,如果值為-1表示初次進行資料同步,此時有兩種情況:

    • 如果從節點發送的拉取偏移量slaveRequestOffset為0,就從當前CommitLog檔案最大偏移量開始同步;
    • 如果slaveRequestOffset不為0,則從slaveRequestOffset位置處進行資料同步;
  3. 判斷上次寫事件是否已經將資料都寫入到從節點

    • 如果已經寫入完畢,判斷距離上次寫入資料的時間間隔是否超過了設定的心跳時間,如果超過,為了避免連線空閒被關閉,需要傳送一個心跳包,此時構建心跳包的請求資料,呼叫transferData方法傳輸資料;
    • 如果上次的資料還未傳輸完畢,呼叫transferData方法繼續傳輸,如果還是未完成,則結束此處處理;
  4. 根據nextTransferFromWhere從CommitLog中獲取訊息,如果未獲取到訊息,等待100ms,如果獲取到訊息,從CommitLog中獲取訊息進行傳輸:
    (1)如果獲取到訊息的位元組數大於最大傳輸的大小,設定最最大傳輸數量,分批進行傳輸;
    (2)更新下次傳輸的偏移量地址也就是nextTransferFromWhere的值;
    (3)從CommitLog中獲取的訊息內容設定到將讀取到的訊息資料設定到selectMappedBufferResult中;
    (4)設定訊息頭資訊,包括訊息頭位元組數、拉取訊息的偏移量等;
    (5)呼叫transferData傳送資料;

public class HAConnection {
    class WriteSocketService extends ServiceThread {
        private final int headerSize = 8 + 4;// 訊息頭大小
        @Override
        public void run() {
            HAConnection.log.info(this.getServiceName() + " service started");
            while (!this.isStopped()) {
                try {
                    this.selector.select(1000);
                    // 如果slaveRequestOffset為-1,表示還未收到從節點報告的拉取進度
                    if (-1 == HAConnection.this.slaveRequestOffset) {
                        // 等待一段時間
                        Thread.sleep(10);
                        continue;
                    }
                    // 初次進行資料同步
                    if (-1 == this.nextTransferFromWhere) {
                        // 如果拉取進度為0
                        if (0 == HAConnection.this.slaveRequestOffset) {
                            // 從master節點最大偏移量從開始傳輸
                            long masterOffset = HAConnection.this.haService.getDefaultMessageStore().getCommitLog().getMaxOffset();
                            masterOffset =
                                masterOffset
                                    - (masterOffset % HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()
                                    .getMappedFileSizeCommitLog());

                            if (masterOffset < 0) {
                                masterOffset = 0;
                            }
                            // 更新nextTransferFromWhere
                            this.nextTransferFromWhere = masterOffset;
                        } else {
                            // 根據從節點發送的偏移量開始資料同步
                            this.nextTransferFromWhere = HAConnection.this.slaveRequestOffset;
                        }

                        log.info("master transfer data from " + this.nextTransferFromWhere + " to slave[" + HAConnection.this.clientAddr
                            + "], and slave request " + HAConnection.this.slaveRequestOffset);
                    }
                    // 判斷上次傳輸是否完畢
                    if (this.lastWriteOver) {
                        // 獲取當前時間距離上次寫入資料的時間間隔
                        long interval =
                            HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastWriteTimestamp;
                        // 如果距離上次寫入資料的時間間隔超過了設定的心跳時間
                        if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()
                            .getHaSendHeartbeatInterval()) {
                            // 構建header
                            this.byteBufferHeader.position(0);
                            this.byteBufferHeader.limit(headerSize);
                            this.byteBufferHeader.putLong(this.nextTransferFromWhere);
                            this.byteBufferHeader.putInt(0);
                            this.byteBufferHeader.flip();
                            // 傳送心跳包
                            this.lastWriteOver = this.transferData();
                            if (!this.lastWriteOver)
                                continue;
                        }
                    } else {
                        // 未傳輸完畢,繼續上次的傳輸
                        this.lastWriteOver = this.transferData();
                        // 如果依舊未完成,結束本次處理
                        if (!this.lastWriteOver)
                            continue;
                    }
                    // 根據偏移量獲取訊息資料
                    SelectMappedBufferResult selectResult =
                        HAConnection.this.haService.getDefaultMessageStore().getCommitLogData(this.nextTransferFromWhere);
                    if (selectResult != null) {// 獲取訊息不為空
                        // 獲取訊息內容大小
                        int size = selectResult.getSize();
                        // 如果訊息的位元組數大於最大傳輸的大小
                        if (size > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize()) {
                            // 設定為最大傳輸大小
                            size = HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize();
                        }

                        long thisOffset = this.nextTransferFromWhere;
                        // 更新下次傳輸的偏移量地址
                        this.nextTransferFromWhere += size;

                        selectResult.getByteBuffer().limit(size);
                        // 將讀取到的訊息資料設定到selectMappedBufferResult
                        this.selectMappedBufferResult = selectResult;

                        // 設定訊息頭
                        this.byteBufferHeader.position(0);
                        // 設定訊息頭大小
                        this.byteBufferHeader.limit(headerSize);
                        // 設定偏移量地址
                        this.byteBufferHeader.putLong(thisOffset);
                        // 設定訊息內容大小
                        this.byteBufferHeader.putInt(size);
                        this.byteBufferHeader.flip();
                        // 傳送資料
                        this.lastWriteOver = this.transferData();
                    } else {
                        // 等待100ms
                        HAConnection.this.haService.getWaitNotifyObject().allWaitForRunning(100);
                    }
                } catch (Exception e) {

                    HAConnection.log.error(this.getServiceName() + " service has exception.", e);
                    break;
                }
            }

            HAConnection.this.haService.getWaitNotifyObject().removeFromWaitingThreadTable();

            // ...
            HAConnection.log.info(this.getServiceName() + " service end");
        }
    }
}

傳送資料

transferData方法的處理邏輯如下:

  1. 傳送訊息頭資料;
  2. 訊息頭資料傳送完畢之後,傳送訊息內容,前面知道從CommitLog中讀取的訊息內容放入到了selectMappedBufferResult,將selectMappedBufferResult的內容傳送給從節點;
public class HAConnection {
    class WriteSocketService extends ServiceThread {
        private boolean transferData() throws Exception {
            int writeSizeZeroTimes = 0;
            // 寫入訊息頭
            while (this.byteBufferHeader.hasRemaining()) {
                // 傳送訊息頭資料
                int writeSize = this.socketChannel.write(this.byteBufferHeader);
                if (writeSize > 0) {
                    writeSizeZeroTimes = 0;
                    // 記錄傳送時間
                    this.lastWriteTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
                } else if (writeSize == 0) {
                    if (++writeSizeZeroTimes >= 3) {
                        break;
                    }
                } else {
                    throw new Exception("ha master write header error < 0");
                }
            }

            if (null == this.selectMappedBufferResult) {
                return !this.byteBufferHeader.hasRemaining();
            }

            writeSizeZeroTimes = 0;

            // 訊息頭資料傳送完畢之後,傳送訊息內容
            if (!this.byteBufferHeader.hasRemaining()) {
                while (this.selectMappedBufferResult.getByteBuffer().hasRemaining()) {
                    // 傳送訊息內容
                    int writeSize = this.socketChannel.write(this.selectMappedBufferResult.getByteBuffer());
                    if (writeSize > 0) {
                        writeSizeZeroTimes = 0;
                        this.lastWriteTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
                    } else if (writeSize == 0) {
                        if (++writeSizeZeroTimes >= 3) {
                            break;
                        }
                    } else {
                        throw new Exception("ha master write body error < 0");
                    }
                }
            }
            // ...
            return result;
        }
    }
}

總結

主從同步流程

有新訊息寫入之後的同步流程

參考
丁威、周繼鋒《RocketMQ技術內幕》

RocketMQ版本:4.9.3