1. 程式人生 > 其它 >RocketMQ原始碼詳解 | Broker篇 · 其五:高可用之主從架構

RocketMQ原始碼詳解 | Broker篇 · 其五:高可用之主從架構

概述

對於一個訊息中介軟體來講,高可用功能是極其重要的,RocketMQ 當然也具有其對應的高可用方案。

在 RocketMQ 中,有主從架構和 Dledger 兩種高可用方案:

第一種通過主 Broker 將訊息傳送到從 Broker 實現高可用,在主 Broker IO 壓力大或宕機的時候,從 Broker 可以接管讀請求,但這種方案不支援在主 Broker 宕機後自動進行故障轉移,且從 Broker 不支援寫請求,也就是說在主 Broker 宕機後我們只能手動處理。

第二種是在 RocketMQ 4.5.X 的時候才加入的新的方案,其為基於 Raft 演算法實現的一個高可用方案,支援叢集自動選主與故障轉移,但 TPS 低於第一種方案。

本文主要介紹前者的實現



HAService

RocketMQ 的主從高可用的實現的程式碼量比較少,大概就一兩千行,其主要在 HAService 類和 HAConnection 類。


HAService 有三個內部類:

  • AcceptSocketService

    用來監聽 HAClient 的連線請求,接收請求後將建立好的 channel 包裝成 HAConnection 儲存起來

  • GroupTransferService

    用以監聽與處理分發請求,當外部發送了非同步的分發請求後,該類中的執行緒將同步的處理該請求,並將其執行結果交給 Future 以執行回撥函式

  • HAClient

    高可用客戶端,在從 Broker 上啟動,用以從主 Broker 拉取訊息


HAService 主要是對上面三個類的包裝,通過控制它們來對外提供服務。


AcceptSocketService

首先建立了一個註冊了 OP_ACCEPT 事件的 selector ,用以監聽繫結在 HA 服務埠上的 ServerSocketChannel(也就是一個標準的 NIO 伺服器)

public void beginAccept() throws Exception {
  this.serverSocketChannel = ServerSocketChannel.open();
  this.selector = RemotingUtil.openSelector();
  this.serverSocketChannel.socket().setReuseAddress(true);
  this.serverSocketChannel.socket().bind(this.socketAddressListen);
  this.serverSocketChannel.configureBlocking(false);
  // 監聽 accept 事件
  this.serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT);
}

不過需要注意 RemotingUtil.openSelector() 方法,這裡如果在 Linux 平臺上,會使用 Epoll 來做多路複用的 selector

public static Selector openSelector() throws IOException {
  Selector result = null;

  // 如果在 linux 平臺, 則使用 Epoll 作為多路複用的 selector
  if (isLinuxPlatform()) {
    try {
      final Class<?> providerClazz = Class.forName("sun.nio.ch.EPollSelectorProvider");
      if (providerClazz != null) {
        // pass: 這裡通過反射呼叫 provider 方法獲取 SelectorProvider 以建立 epoll 的 selector
      }
    } catch (final Exception e) {
      // ignore
    }
  }

  // 否則如果在其他平臺上使用 nio 預設的實現
  if (result == null) {
    result = Selector.open();
  }

  return result;
}

在建立完成後,就開始通過使用 selector 監聽 accpet 事件發生,然後進行以下處理

SocketChannel sc = ((ServerSocketChannel) k.channel()).accept();

if (sc != null) {
  try {
    // 在建立 socket 後, 將該建立好的 channel 包裝成 HAConnection 類
    // 放入主類進行管理
    HAConnection conn = new HAConnection(HAService.this, sc);
    conn.start();
    HAService.this.addConnection(conn);
  } catch (Exception e) {
    log.error("new HAConnection exception", e);
    sc.close();
  }
}

在建立了 HAConnection 並啟動後,這個服務就能自動的從儲存服務中拉取已經持久化的訊息 (準確來講,是否已經持久化取決於使用的刷盤方案) ,併發送給該 Channel 對應的從 Broker ,且響應從 Broker 傳送過來的請求。


GroupTransferService

如同簡介介紹的,這個服務主要用來處理上層發過來的分發請求

public void putRequest(final CommitLog.GroupCommitRequest request) {
  lock.lock();
  try {
    // 新增到等待寫佇列
    this.requestsWrite.add(request);
  } finally {
    lock.unlock();
  }
  this.wakeup();
}

其中對於請求的存放具有兩個佇列,分別為 requestsWriterequestsRead ,這種設計方式也是一種比較常見的"無鎖程式設計"的方式,即在遍歷一個"讀佇列"裡的請求時,使用另外一個佇列來接受新到來的請求,而只需要在兩個變數交換佇列時與"寫佇列"寫入時加鎖即可。避免了在每次寫入與讀取都需要加鎖,或直接使用一個阻塞佇列所帶來的消耗。


然後線上程中,會不斷的對"讀佇列"中的請求進行處理

for (CommitLog.GroupCommitRequest req : this.requestsRead) {
  // 當 HAService 的已同步的進度超過了該請求要求的進度時
  // 視為已提交
  boolean transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();

  // 設定等待的超時時間
  long waitUntilWhen = HAService.this.defaultMessageStore.getSystemClock().now()
    + HAService.this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout();

  // 等待直到主從進度到達要求的位置或超過指定時間未到達
  while (!transferOK && HAService.this.defaultMessageStore.getSystemClock().now() < waitUntilWhen) {
    this.notifyTransferObject.waitForRunning(1000);
    transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
  }

  // 由於在 broker 中持久化是序列的, 所以下一個請求要求到達的偏移量
  // 一定大於當前請求, 因此我們的處理也能序列化
  if (!transferOK) {
    log.warn("transfer messsage to slave timeout, " + req.getNextOffset());
  }

  // complete 執行結果
  req.wakeupCustomer(transferOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
}

可以看出,這個類其實也不會做實質上的備份到從 Broker 的操作,而是進行對已經同步到 salve 進度進行監控,等待分發請求需要到達的偏移量到達,或者超時時,將執行結果交給非同步請求。


HAClient

最後是高可用客戶端的設計,這個客戶端會被從 Broker 用來從主 Broker 拉取訊息。

先來看它的成員屬性

private static final int READ_MAX_BUFFER_SIZE = 1024 * 1024 * 4;
// 主 Broker 的地址
private final AtomicReference<String> masterAddress = new AtomicReference<>();
// 向主 Broker 發起拉取的起始 offset
private final ByteBuffer reportOffset = ByteBuffer.allocate(8);

private SocketChannel socketChannel;
private Selector selector;

// 上一次寫入訊息的時間戳
private long lastWriteTimestamp = System.currentTimeMillis();
// 本從 Broker 的當前複製進度
private long currentReportedOffset = 0;
// 讀緩衝區中已經處理到的位置的指標
private int dispatchPosition = 0;
// 讀緩衝區大小 (4MB)
private ByteBuffer byteBufferRead = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);
private ByteBuffer byteBufferBackup = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);

byteBufferReadbyteBufferBackup 你想到了什麼?嗯沒錯,上文提到的無鎖程式設計中的讀寫分離佇列


這個客戶端類大致的執行流程如下:

while (!this.isStopped()) {
  try {
    // 如果還沒有建立連線,則嘗試連線到主 Broker
    if (this.connectMaster()) {

      // 檢查當前時間是否需要上報偏移量
      // 同時, 上報偏移量又充當了主從 broker 之間的心跳的角色
      if (this.isTimeToReportOffset()) {
        // 向已經建立好的連線寫入當前偏移量
        boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset);
        if (!result) {
          this.closeMaster();
        }
      }

      // 等待事件的發生
      // 而這裡註冊的事件為 OP_READ, 即為等待主 Broker 傳送訊息
      this.selector.select(1000);

      // 處理到來的讀事件
      boolean ok = this.processReadEvent();
      // 出現異常時關閉連線
      if (!ok) {
        this.closeMaster();
      }

      // 從本地 store 服務中讀取已經提交的偏移量, 報告給主 Broker
      if (!reportSlaveMaxOffsetPlus()) {
        continue;
      }

			// pass: 超時後的關閉 channel
    } else {
      this.waitForRunning(1000 * 5);
    }
  } catch (Exception e) {
    log.warn(this.getServiceName() + " service has exception. ", e);
    this.waitForRunning(1000 * 5);
  }
}

在主從 Broker 的通訊中,上報偏移量充當了心跳的角色,在主 Broker 那邊,超過指定時間沒有接收到心跳,則會判斷為斷開連線

private boolean isTimeToReportOffset() {
  long interval =
    HAService.this.defaultMessageStore.getSystemClock().now() - this.lastWriteTimestamp;
  boolean needHeart = interval > HAService.this.defaultMessageStore.getMessageStoreConfig()
    .getHaSendHeartbeatInterval();

  return needHeart;
}

且這個心跳的超時時間預設配置為 5s


如果超過了指定的需要上報的間隔,就會通過已經建立好的 channel 寫入當前已經持久化偏移量

private boolean reportSlaveMaxOffset(final long maxOffset) {
  this.reportOffset.position(0);
  this.reportOffset.limit(8);
  this.reportOffset.putLong(maxOffset);
  this.reportOffset.position(0);
  this.reportOffset.limit(8);

  for (int i = 0; i < 3 && this.reportOffset.hasRemaining(); i++) {
    try {
      this.socketChannel.write(this.reportOffset);
    } catch (IOException e) {
      log.error(this.getServiceName()
                + "reportSlaveMaxOffset this.socketChannel.write exception", e);
      return false;
    }
  }

  lastWriteTimestamp = HAService.this.defaultMessageStore.getSystemClock().now();
  return !this.reportOffset.hasRemaining();
}

這段程式碼很好理解,即直接將偏移量寫入到 buffer,然後將 buffer 寫入到 channel

吐槽下這種控制 buffer 位置的方式,直接呼叫 clear 後 put 然後 filp 不比直接操作位置直觀嗎...


寫入這裡可能會有疑惑,為什麼需要多次寫入(write),難道一次寫不完嗎?

SocketChannel 的註釋表示可能還真寫不完

Unless otherwise specified, a write operation will return only after writing all of the r requested bytes. Some types of channels, depending upon their state, may write only some of the bytes or possibly none at all. A socket channel in non-blocking mode, for example, cannot write any more bytes than are free in the socket's output buffer.

由於我們使用的是 NIO,也叫非阻塞 IO,所以在寫入的時候是不會阻塞的,我們也不知道具體寫入成功了沒有。而在 TCP 中是具有流量控制的,如果對面 socket 的接受緩衝區已經滿了,就會觸發流量控制,在本端的 socket 中的傳送緩衝區由於不會發送任何資料,所以會很快的堆滿,直到對方恢復接收為止。

而這時如果是傳統的 BIO,會阻塞在 write(..) 這直到完全的寫入為止,而 NIO 就可能需要多次 write 保證寫入完成。


然後在寫入完成後,會更新已經通訊的時間,並讓 selector 等待可讀事件發生,也就是等待對面傳送需要拉取的資料。

在事件發生或到達 1s 後,就會執行以下程式碼

private boolean processReadEvent() {
  int readSizeZeroTimes = 0;
  while (this.byteBufferRead.hasRemaining()) {
    try {
      int readSize = this.socketChannel.read(this.byteBufferRead);
      if (readSize > 0) {
        // 讀取到的需要拉取的訊息, 進行寫入
        readSizeZeroTimes = 0;
        boolean result = this.dispatchReadRequest();
        if (!result) {
          log.error("HAClient, dispatchReadRequest error");
          return false;
        }
      } else if (readSize == 0) {
        // 如果連續三次讀到的 size 都為 0 則結束
        if (++readSizeZeroTimes >= 3) {
          break;
        }
      } else {
        log.info("HAClient, processReadEvent read socket < 0");
        return false;
      }
    } catch (IOException e) {
      log.info("HAClient, processReadEvent read socket exception", e);
      return false;
    }
  }

  return true;
}

首先會先讀一個 long 的長度(8B)來判斷是否有資料,如有則通過 dispatchReadRequest 方法接著讀取讀取具體的訊息

private boolean dispatchReadRequest() {
    final int msgHeaderSize = 8 + 4; // phyoffset + size

    while (true) {
        int diff = this.byteBufferRead.position() - this.dispatchPosition;
        if (diff >= msgHeaderSize) {
         // pass: 讀訊息,然後同步 append 到 commitlog   
        }

        if (!this.byteBufferRead.hasRemaining()) {
            // 將當前正在讀的 byteBuffer 和 backupByteBuffer 進行交換
            this.reallocateByteBuffer();
        }
        break;
    }
    return true;
}

在讀取完成並持久化成功後,會提交一次已同步的偏移量,並接著迴圈以上的過程



HAConnection

該類為 SocketChannel 的包裝類,直接負責對 socket 的讀寫。

並且這個類也和 HAService 類似,將功能實現交給了兩個內部服務類去做:

  • ReadSocketService

    負責 socket 的讀,讀的是 HAClient 發過來的已經提交的偏移量(也是心跳)

  • WriteSocketService

    負責 socket 的寫,寫的是需要同步到從 Broker 的訊息


ReadSocketService

和 HAClient 類似,也是簡單的通過在註冊了 OP_READ 事件的 selector 來獲取傳送過來的偏移量

while (!this.isStopped()) {
  try {
    this.selector.select(1000);
    boolean ok = this.processReadEvent();
    if (!ok) {
      HAConnection.log.error("processReadEvent error");
      break;
    }

    // 檢查心跳時間, 超時則停止服務且移除這個連線
    long interval = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastReadTimestamp;
    if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaHousekeepingInterval()) {
      log.warn("ha housekeeping, found this connection[" + HAConnection.this.clientAddr + "] expired, " + interval);
      break;
    }
  } catch (Exception e) {
    HAConnection.log.error(this.getServiceName() + " service has exception.", e);
    break;
  }
}

在有可讀事件後的核心原始碼如下

if ((this.byteBufferRead.position() - this.processPosition) >= 8) {
  // 在這裡讀取到到的訊息為從 Broker 當前已經持久化的偏移量
  int pos = this.byteBufferRead.position() - (this.byteBufferRead.position() % 8);
  long readOffset = this.byteBufferRead.getLong(pos - 8);
  this.processPosition = pos;

  // 更新自身對於從 Broker 的持久化位置的資訊
  HAConnection.this.slaveAckOffset = readOffset;
  if (HAConnection.this.slaveRequestOffset < 0) {
    HAConnection.this.slaveRequestOffset = readOffset;
    log.info("slave[" + HAConnection.this.clientAddr + "] request offset " + readOffset);
  }

  // 通知主 Broker 從 Broker 的提交偏移量已經更新
  HAConnection.this.haService.notifyTransferSome(HAConnection.this.slaveAckOffset);
}

在這裡通知了已提交偏移量更新了以後,之前在 GroupTransferService 類中看見的阻塞執行緒就會被喚醒,然後進行請求的響應。


WriteSocketService

該類用於定期的將主 Broker 的訊息寫入到該連線對應的從 Broker。

首先先初始化需要拉取的開始位置

if (-1 == this.nextTransferFromWhere) {
  // 初始化從主 Broker 當前已寫入位置推送或為從 Broker 請求的位置
  if (0 == HAConnection.this.slaveRequestOffset) {
    // 計算需要推送的偏移量, 這個偏移量是 MappedFile 中的物理偏移量
    long masterOffset = HAConnection.this.haService.getDefaultMessageStore().getCommitLog().getMaxOffset();
    masterOffset =
      masterOffset
      - (masterOffset % HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()
         .getMappedFileSizeCommitLog());

    if (masterOffset < 0) {
      masterOffset = 0;
    }

    this.nextTransferFromWhere = masterOffset;
  } else {
    this.nextTransferFromWhere = HAConnection.this.slaveRequestOffset;
  }

  log.info("master transfer data from " + this.nextTransferFromWhere + " to slave[" + HAConnection.this.clientAddr
           + "], and slave request " + HAConnection.this.slaveRequestOffset);
}

預設的起點為從 Broker 的請求位置,如果沒有則從自身當前已經寫入的物理位置為起點寫入


確定好即將進行 append 的位置後,則先將還沒寫完的資料進行寫入,直到所有剩餘的資料寫完後,才會進行下一批資料的拉取並寫入

if (this.lastWriteOver) {
  // 計算上一次寫入的時間, 超過指定間隔才寫入
  long interval =
    HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastWriteTimestamp;

  if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()
      .getHaSendHeartbeatInterval()) {
  	// pass: 寫入頭部欄位
  }
} else {
  // 如果上一次寫入還有沒有寫完的資料, 需要進行寫入
  this.lastWriteOver = this.transferData();
  if (!this.lastWriteOver)
    continue;
}

然後進行新資料的拉取

// 所有之前拉取的資料都已經寫完了, 現在繼續獲取新的資料以寫入從 Broker
SelectMappedBufferResult selectResult =
  HAConnection.this.haService.getDefaultMessageStore().getCommitLogData(this.nextTransferFromWhere);
if (selectResult != null) {
  // 分批推送過去, 預設為 32k
  int size = selectResult.getSize();
  if (size > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize()) {
    size = HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize();
  }

  long thisOffset = this.nextTransferFromWhere;
  this.nextTransferFromWhere += size;
  
  // pass: 寫入頭部

  this.lastWriteOver = this.transferData();
} else {

  HAConnection.this.haService.getWaitNotifyObject().allWaitForRunning(100);
}

而我們可以看見,這裡的寫入並沒有將所有的資料進行直接的寫入,而是以預設 32K 的資料量進行分批的推送,一次沒有寫完的資料將會在下一次迴圈中寫入。



總結

最後我們再梳理一下主備同步的完整過程:

  1. HAService 啟動,監聽指定埠;HAClient 連線到指定埠。
  2. HAClient 如果有指定的偏移量,則從這個偏移量開始傳送;否則從主 Broker 的 CommitLog 的尾部開始傳送訊息。
  3. HAClient 收到來自 HAService 的訊息後,將其持久化到儲存層,然後更新已持久化索引
  4. HAClient 的已提交的 Log 的位置會定時上報
  5. HAService 發現位置已經更新了後,會處理上層應用之前提交的請求

以上過程不斷的重複,就完成了主備同步。


可以看的出來,這裡的實現十分的簡單,設計也非常高效,但是對於宕機卻沒有做多少處理,即使從 Broker 可以在主 Broker 宕機以後接管讀請求,但卻不能做到自動的主備轉移。

從 Broker 接管讀的具體實現已經在前幾篇文章提過


所以 RocketMQ 有了另一種高可用模式:DLedger 叢集,這種新的方案基於 Raft 演算法實現了 Broker 組內的自動故障轉移功能,實現了高可用。