【RocketMQ】主從同步實現原理
主從同步的實現邏輯主要在HAService
中,在DefaultMessageStore
的建構函式中,對HAService
進行了例項化,並在start方法中,啟動了HAService
:
public class DefaultMessageStore implements MessageStore { public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager, final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig) throws IOException { // ... if (!messageStoreConfig.isEnableDLegerCommitLog()) { // 初始化HAService this.haService = new HAService(this); } else { this.haService = null; } // ... } public void start() throws Exception { // ... if (!messageStoreConfig.isEnableDLegerCommitLog()) { // 啟動HAService this.haService.start(); this.handleScheduleMessageService(messageStoreConfig.getBrokerRole()); } // ... } }
在HAService
的建構函式中,建立了AcceptSocketService
、GroupTransferService
和HAClient
,在start方法中主要做了如下幾件事:
- 呼叫
AcceptSocketService
的beginAccept方法,這一步主要是進行埠繫結,在埠上監聽從節點的連線請求(可以看做是執行在master節點的); - 呼叫
AcceptSocketService
的start方法啟動服務,這一步主要為了處理從節點的連線請求,與從節點建立連線(可以看做是執行在master節點的); - 呼叫
GroupTransferService
的start方法,主要用於在主從同步的時候,等待資料傳輸完畢 - 呼叫
HAClient
的start方法啟動,裡面與master節點建立連線,向master彙報主從同步進度並存儲master傳送過來的同步資料(可以看做是執行在從節點的);
public class HAService { public HAService(final DefaultMessageStore defaultMessageStore) throws IOException { this.defaultMessageStore = defaultMessageStore; // 建立AcceptSocketService this.acceptSocketService = new AcceptSocketService(defaultMessageStore.getMessageStoreConfig().getHaListenPort()); this.groupTransferService = new GroupTransferService(); // 建立HAClient this.haClient = new HAClient(); } public void start() throws Exception { // 開始監聽從伺服器的連線 this.acceptSocketService.beginAccept(); // 啟動服務 this.acceptSocketService.start(); // 啟動GroupTransferService this.groupTransferService.start(); // 啟動 this.haClient.start(); } }
監聽從節點連線請求
AcceptSocketService
的beginAccept
方法裡面首先獲取了ServerSocketChannel
,然後進行埠繫結,並在selector上面註冊了OP_ACCEPT事件的監聽,監聽從節點的連線請求:
public class HAService {
class AcceptSocketService extends ServiceThread {
/**
* 監聽從節點的連線
*
* @throws Exception If fails.
*/
public void beginAccept() throws Exception {
// 建立ServerSocketChannel
this.serverSocketChannel = ServerSocketChannel.open();
// 獲取selector
this.selector = RemotingUtil.openSelector();
this.serverSocketChannel.socket().setReuseAddress(true);
// 繫結埠
this.serverSocketChannel.socket().bind(this.socketAddressListen);
// 設定非阻塞
this.serverSocketChannel.configureBlocking(false);
// 註冊OP_ACCEPT連線事件的監聽
this.serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT);
}
}
}
處理從節點連線請求
AcceptSocketService
的run方法中,對監聽到的連線請求進行了處理,處理邏輯大致如下:
- 從selector中獲取到監聽到的事件;
- 如果是
OP_ACCEPT
連線事件,建立與從節點的連線物件HAConnection
,與從節點建立連線,然後呼叫HAConnection
的start方法進行啟動,並建立的HAConnection
物件加入到連線集合中,HAConnection中封裝了Master節點和從節點的資料同步邏輯;
public class HAService {
class AcceptSocketService extends ServiceThread {
@Override
public void run() {
log.info(this.getServiceName() + " service started");
// 如果服務未停止
while (!this.isStopped()) {
try {
this.selector.select(1000);
// 獲取監聽到的事件
Set<SelectionKey> selected = this.selector.selectedKeys();
// 處理事件
if (selected != null) {
for (SelectionKey k : selected) {
// 如果是連線事件
if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
SocketChannel sc = ((ServerSocketChannel) k.channel()).accept();
if (sc != null) {
HAService.log.info("HAService receive new connection, "
+ sc.socket().getRemoteSocketAddress());
try {
// 建立HAConnection,建立連線
HAConnection conn = new HAConnection(HAService.this, sc);
// 啟動
conn.start();
// 新增連線
HAService.this.addConnection(conn);
} catch (Exception e) {
log.error("new HAConnection exception", e);
sc.close();
}
}
} else {
log.warn("Unexpected ops in select " + k.readyOps());
}
}
selected.clear();
}
} catch (Exception e) {
log.error(this.getServiceName() + " service has exception.", e);
}
}
log.info(this.getServiceName() + " service end");
}
}
}
等待主從複製傳輸結束
GroupTransferService
的run方法主要是為了在進行主從資料同步的時候,等待從節點資料同步完畢。
在執行時首先進會呼叫waitForRunning
進行等待,因為此時可能還有沒有開始主從同步,所以先進行等待,之後如果有同步請求,會喚醒該執行緒,然後呼叫doWaitTransfer
方法等待資料同步完成:
public class HAService {
class GroupTransferService extends ServiceThread {
public void run() {
log.info(this.getServiceName() + " service started");
// 如果服務未停止
while (!this.isStopped()) {
try {
// 等待執行
this.waitForRunning(10);
// 如果被喚醒,呼叫doWaitTransfer等待主從同步完成
this.doWaitTransfer();
} catch (Exception e) {
log.warn(this.getServiceName() + " service has exception. ", e);
}
}
log.info(this.getServiceName() + " service end");
}
}
}
在看doWaitTransfer
方法之前,首先看下是如何判斷有資料需要同步的。
Master節點中,當訊息被寫入到CommitLog以後,會呼叫submitReplicaRequest
方法處主從同步,首先判斷當前Broker的角色是否是SYNC_MASTER,如果是則會構建訊息提交請求GroupCommitRequest
,然後呼叫HAService
的putRequest
新增到請求集合中,並喚醒GroupTransferService
中在等待的執行緒:
public class CommitLog {
public CompletableFuture<PutMessageStatus> submitReplicaRequest(AppendMessageResult result, MessageExt messageExt) {
if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
HAService service = this.defaultMessageStore.getHaService();
if (messageExt.isWaitStoreMsgOK()) {
if (service.isSlaveOK(result.getWroteBytes() + result.getWroteOffset())) {
// 構建GroupCommitRequest
GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes(),
this.defaultMessageStore.getMessageStoreConfig().getSlaveTimeout());
// 新增請求
service.putRequest(request);
// 喚醒GroupTransferService中在等待的執行緒
service.getWaitNotifyObject().wakeupAll();
return request.future();
}
else {
return CompletableFuture.completedFuture(PutMessageStatus.SLAVE_NOT_AVAILABLE);
}
}
}
return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
}
}
在doWaitTransfer
方法中,會判斷CommitLog提交請求集合requestsRead
是否為空,如果不為空,表示有訊息寫入了CommitLog,Master節點需要等待將資料傳輸給從節點:
- push2SlaveMaxOffset記錄了從節點已經同步的訊息偏移量,判斷push2SlaveMaxOffset是否大於本次CommitLog提交的偏移量,也就是請求中設定的偏移量;
- 獲取請求中設定的等待截止時間;
- 開啟迴圈,判斷資料是否還未傳輸完畢,並且未超過截止時間,如果是則等待1s,然後繼續判斷傳輸是否完畢,不斷進行,直到超過截止時間或者資料已經傳輸完畢;
(向從節點發送的訊息最大偏移量push2SlaveMaxOffset超過了請求中設定的偏移量表示本次同步資料傳輸完畢); - 喚醒在等待資料同步完畢的執行緒;
public class HAService {
// CommitLog提交請求集合
private volatile LinkedList<CommitLog.GroupCommitRequest> requestsRead = new LinkedList<>();
class GroupTransferService extends ServiceThread {
private void doWaitTransfer() {
// 如果CommitLog提交請求集合不為空
if (!this.requestsRead.isEmpty()) {
// 處理訊息提交請求
for (CommitLog.GroupCommitRequest req : this.requestsRead) {
// 判斷傳輸到從節點最大偏移量是否超過了請求中設定的偏移量
boolean transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
// 獲取截止時間
long deadLine = req.getDeadLine();
// 如果從節點還未同步完畢並且未超過截止時間
while (!transferOK && deadLine - System.nanoTime() > 0) {
// 等待
this.notifyTransferObject.waitForRunning(1000);
// 判斷從節點同步的最大偏移量是否超過了請求中設定的偏移量
transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
}
// 喚醒
req.wakeupCustomer(transferOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
}
this.requestsRead = new LinkedList<>();
}
}
}
}
啟動HAClient
HAClient可以看做是在從節點上執行的,主要進行的處理如下:
- 呼叫
connectMaster
方法連線Master節點,Master節點上也會執行,但是它本身就是Master沒有可連的Master節點,所以可以忽略; - 呼叫
isTimeToReportOffset
方法判斷是否需要向Master節點彙報同步偏移量,如果需要則呼叫reportSlaveMaxOffset
方法將當前的訊息同步偏移量傳送給Master節點; - 呼叫
processReadEvent
處理網路請求中的可讀事件,也就是處理Master傳送過來的訊息,將訊息存入CommitLog;
public class HAService {
class HAClient extends ServiceThread {
@Override
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
// 連線Master節點
if (this.connectMaster()) {
// 是否需要報告訊息同步偏移量
if (this.isTimeToReportOffset()) {
// 向Master節點發送同步偏移量
boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset);
if (!result) {
this.closeMaster();
}
}
this.selector.select(1000);
// 處理讀事件,也就是Master節點發送的資料
boolean ok = this.processReadEvent();
if (!ok) {
this.closeMaster();
}
// ...
} else {
this.waitForRunning(1000 * 5);
}
} catch (Exception e) {
log.warn(this.getServiceName() + " service has exception. ", e);
this.waitForRunning(1000 * 5);
}
}
log.info(this.getServiceName() + " service end");
}
}
}
連線主節點
connectMaster方法中會獲取Master節點的地址,並轉換為SocketAddress物件,然後向Master節點請求建立連線,並在selector註冊OP_READ可讀事件監聽:
public class HAService {
class HAClient extends ServiceThread {
// 當前的主從複製進度
private long currentReportedOffset = 0;
private boolean connectMaster() throws ClosedChannelException {
if (null == socketChannel) {
String addr = this.masterAddress.get();
if (addr != null) {
// 將地址轉為SocketAddress
SocketAddress socketAddress = RemotingUtil.string2SocketAddress(addr);
if (socketAddress != null) {
// 連線master
this.socketChannel = RemotingUtil.connect(socketAddress);
if (this.socketChannel != null) {
// 註冊OP_READ可讀事件監聽
this.socketChannel.register(this.selector, SelectionKey.OP_READ);
}
}
}
// 獲取CommitLog中當前最大的偏移量
this.currentReportedOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
// 更新上次寫入時間
this.lastWriteTimestamp = System.currentTimeMillis();
}
return this.socketChannel != null;
}
}
傳送主從同步訊息拉取偏移量
在isTimeToReportOffset
方法中,首先獲取當前時間與上一次進行主從同步的時間間隔interval,如果時間間隔interval大於配置的傳送心跳時間間隔,表示需要向Master節點發送從節點訊息同步的偏移量,接下來會呼叫reportSlaveMaxOffset
方法傳送同步偏移量,也就是說從節點會定時向Master節點發送請求,反饋CommitLog中同步訊息的偏移量:
public class HAService {
class HAClient extends ServiceThread {
// 當前從節點已經同步訊息的偏移量大小
private long currentReportedOffset = 0;
private boolean isTimeToReportOffset() {
// 獲取距離上一次主從同步的間隔時間
long interval =
HAService.this.defaultMessageStore.getSystemClock().now() - this.lastWriteTimestamp;
// 判斷是否超過了配置的傳送心跳包時間間隔
boolean needHeart = interval > HAService.this.defaultMessageStore.getMessageStoreConfig()
.getHaSendHeartbeatInterval();
return needHeart;
}
// 傳送同步偏移量,傳入的引數是當前的主從複製偏移量currentReportedOffset
private boolean reportSlaveMaxOffset(final long maxOffset) {
this.reportOffset.position(0);
this.reportOffset.limit(8); // 設定資料傳輸大小為8個位元組
this.reportOffset.putLong(maxOffset);// 設定同步偏移量
this.reportOffset.position(0);
this.reportOffset.limit(8);
for (int i = 0; i < 3 && this.reportOffset.hasRemaining(); i++) {
try {
// 向Master節點發送拉取偏移量
this.socketChannel.write(this.reportOffset);
} catch (IOException e) {
log.error(this.getServiceName()
+ "reportSlaveMaxOffset this.socketChannel.write exception", e);
return false;
}
}
// 更新發送時間
lastWriteTimestamp = HAService.this.defaultMessageStore.getSystemClock().now();
return !this.reportOffset.hasRemaining();
}
}
}
處理網路可讀事件
processReadEvent
方法中處理了可讀事件,也就是處理Master節點發送的同步資料, 首先從socketChannel中讀取資料到byteBufferRead中,byteBufferRead是讀緩衝區,讀取資料的方法會返回讀取到的位元組數,對位元組數大小進行判斷:
- 如果可讀位元組數大於0表示有資料需要處理,呼叫
dispatchReadRequest
方法進行處理; - 如果可讀位元組數為0表示沒有可讀資料,此時記錄讀取到空資料的次數,如果連續讀到空資料的次數大於3次,將終止本次處理;
class HAClient extends ServiceThread {
// 讀緩衝區,會將從socketChannel讀入緩衝區
private ByteBuffer byteBufferRead = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);
private boolean processReadEvent() {
int readSizeZeroTimes = 0;
while (this.byteBufferRead.hasRemaining()) {
try {
// 從socketChannel中讀取資料到byteBufferRead中,返回讀取到的位元組數
int readSize = this.socketChannel.read(this.byteBufferRead);
if (readSize > 0) {
// 重置readSizeZeroTimes
readSizeZeroTimes = 0;
// 處理資料
boolean result = this.dispatchReadRequest();
if (!result) {
log.error("HAClient, dispatchReadRequest error");
return false;
}
} else if (readSize == 0) {
// 記錄讀取到空資料的次數
if (++readSizeZeroTimes >= 3) {
break;
}
} else {
log.info("HAClient, processReadEvent read socket < 0");
return false;
}
} catch (IOException e) {
log.info("HAClient, processReadEvent read socket exception", e);
return false;
}
}
return true;
}
}
訊息寫入ComitLog
dispatchReadRequest
方法中會將從節點讀取到的資料寫入CommitLog,dispatchPosition
記錄了已經處理的資料在讀緩衝區中的位置,從讀緩衝區byteBufferRead
獲取剩餘可讀取的位元組數,如果可讀資料的位元組數大於一個訊息頭的位元組數(12個位元組),表示有資料還未處理完畢,反之表示訊息已經處理完畢結束處理。
對資料的處理邏輯如下:
- 從緩衝區中讀取資料,首先獲取到的是訊息在master節點的物理偏移量masterPhyOffset;
- 向後讀取8個位元組,得到訊息體內容的位元組數bodySize;
- 獲取從節點當前CommitLog的最大物理偏移量slavePhyOffset,如果不為0並且不等於masterPhyOffset,表示與Master節點的傳輸偏移量不一致,也就是資料不一致,此時終止處理;
- 如果可讀取的位元組數大於一個訊息頭的位元組數 + 訊息體大小,表示有訊息可處理,繼續進行下一步;
- 計算訊息體在讀緩衝區中的起始位置,從讀緩衝區中根據起始位置,讀取訊息內容,將訊息追加到從節點的CommitLog中;
- 更新dispatchPosition的值為訊息頭大小 + 訊息體大小,dispatchPosition之前的資料表示已經處理完畢;
class HAClient extends ServiceThread {
// 已經處理的資料在讀緩衝區中的位置,初始化為0
private int dispatchPosition = 0;
// 讀緩衝區
private ByteBuffer byteBufferRead = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);
private boolean dispatchReadRequest() {
// 訊息頭大小
final int msgHeaderSize = 8 + 4; // phyoffset + size
// 開啟迴圈不斷讀取資料
while (true) {
// 獲可讀取的位元組數
int diff = this.byteBufferRead.position() - this.dispatchPosition;
// 如果位元組數大於一個訊息頭的位元組數
if (diff >= msgHeaderSize) {
// 獲取訊息在master節點的物理偏移量
long masterPhyOffset = this.byteBufferRead.getLong(this.dispatchPosition);
// 獲取訊息體大小
int bodySize = this.byteBufferRead.getInt(this.dispatchPosition + 8);
// 獲取從節點當前CommitLog的最大物理偏移量
long slavePhyOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
if (slavePhyOffset != 0) {
// 如果不一致結束處理
if (slavePhyOffset != masterPhyOffset) {
log.error("master pushed offset not equal the max phy offset in slave, SLAVE: "
+ slavePhyOffset + " MASTER: " + masterPhyOffset);
return false;
}
}
// 如果可讀取的位元組數大於一個訊息頭的位元組數 + 訊息體大小
if (diff >= (msgHeaderSize + bodySize)) {
// 將度緩衝區的資料轉為位元組陣列
byte[] bodyData = byteBufferRead.array();
// 計算訊息體在讀緩衝區中的起始位置
int dataStart = this.dispatchPosition + msgHeaderSize;
// 從讀緩衝區中根據訊息的位置,讀取訊息內容,將訊息追加到從節點的CommitLog中
HAService.this.defaultMessageStore.appendToCommitLog(
masterPhyOffset, bodyData, dataStart, bodySize);
// 更新dispatchPosition的值為訊息頭大小+訊息體大小
this.dispatchPosition += msgHeaderSize + bodySize;
if (!reportSlaveMaxOffsetPlus()) {
return false;
}
continue;
}
}
if (!this.byteBufferRead.hasRemaining()) {
this.reallocateByteBuffer();
}
break;
}
return true;
}
}
HAConnection
HAConnection中封裝了Master節點與從節點的網路通訊處理,分別在ReadSocketService
和WriteSocketService
中。
ReadSocketService
ReadSocketService
啟動後處理監聽到的可讀事件,前面知道HAClient中從節點會定時向Master節點彙報從節點的訊息同步偏移量,Master節點對彙報請求的處理就在這裡,如果從網路中監聽到了可讀事件,會呼叫processReadEvent
處理讀事件:
public class HAConnection {
class ReadSocketService extends ServiceThread {
@Override
public void run() {
HAConnection.log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
this.selector.select(1000);
// 處理可讀事件
boolean ok = this.processReadEvent();
if (!ok) {
HAConnection.log.error("processReadEvent error");
break;
}
// ...
} catch (Exception e) {
HAConnection.log.error(this.getServiceName() + " service has exception.", e);
break;
}
}
// ...
HAConnection.log.info(this.getServiceName() + " service end");
}
}
}
處理可讀事件
processReadEvent
中從網路中處理讀事件的方式與上面HAClient
的dispatchReadRequest
類似,都是將網路中的資料讀取到讀緩衝區中,並用一個變數記錄已讀取資料的位置,processReadEvent
方法的處理邏輯如下:
- 從socketChannel讀取資料到讀緩衝區byteBufferRead中,返回讀取到的位元組數;
- 如果讀取到的位元組數大於0,進入下一步,如果讀取到的位元組數為0,記錄連續讀取到空位元組數的次數是否超過三次,如果超過終止處理;
- 判斷剩餘可讀取的位元組數是否大於等於8,前面知道,從節點發送同步訊息拉取偏移量的時候設定的位元組大小為8,所以位元組數大於等於8的時候表示需要讀取從節點發送的偏移量;
- 計算資料在緩衝區中的位置,從緩衝區讀取從節點發送的同步偏移量readOffset;
- 更新processPosition的值,processPosition表示讀緩衝區中已經處理資料的位置;
- 更新slaveAckOffset為從節點發送的同步偏移量readOffset的值;
- 如果當前Master節點記錄的從節點的同步偏移量slaveRequestOffset小於0,表示還未進行同步,此時將slaveRequestOffset更新為從節點發送的同步偏移量;
- 如果從節點發送的同步偏移量比當前Master節點的最大物理偏移量還要大,終止本次處理;
- 呼叫notifyTransferSome,更新Master節點記錄的向從節點同步訊息的偏移量;
public class HAConnection {
class ReadSocketService extends ServiceThread {
// 讀緩衝區
private final ByteBuffer byteBufferRead = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);
// 讀緩衝區中已經處理的資料位置
private int processPosition = 0;
private boolean processReadEvent() {
int readSizeZeroTimes = 0;
// 如果沒有可讀資料
if (!this.byteBufferRead.hasRemaining()) {
this.byteBufferRead.flip();
// 處理位置置為0
this.processPosition = 0;
}
// 如果資料未讀取完畢
while (this.byteBufferRead.hasRemaining()) {
try {
// 從socketChannel讀取資料到byteBufferRead中,返回讀取到的位元組數
int readSize = this.socketChannel.read(this.byteBufferRead);
// 如果讀取資料位元組數大於0
if (readSize > 0) {
// 重置readSizeZeroTimes
readSizeZeroTimes = 0;
// 獲取上次處理讀事件的時間戳
this.lastReadTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
// 判斷剩餘可讀取的位元組數是否大於等於8
if ((this.byteBufferRead.position() - this.processPosition) >= 8) {
// 獲取偏移量內容的結束位置
int pos = this.byteBufferRead.position() - (this.byteBufferRead.position() % 8);
// 從結束位置向前讀取8個位元組得到從點發送的同步偏移量
long readOffset = this.byteBufferRead.getLong(pos - 8);
// 更新處理位置
this.processPosition = pos;
// 更新slaveAckOffset為從節點發送的同步進度
HAConnection.this.slaveAckOffset = readOffset;
// 如果記錄的從節點的同步進度小於0,表示還未進行同步
if (HAConnection.this.slaveRequestOffset < 0) {
// 更新為從節點發送的同步進度
HAConnection.this.slaveRequestOffset = readOffset;
log.info("slave[" + HAConnection.this.clientAddr + "] request offset " + readOffset);
} else if (HAConnection.this.slaveAckOffset > HAConnection.this.haService.getDefaultMessageStore().getMaxPhyOffset()) {
// 如果從節點發送的拉取偏移量比當前Master節點的最大物理偏移量還要大
log.warn("slave[{}] request offset={} greater than local commitLog offset={}. ",
HAConnection.this.clientAddr,
HAConnection.this.slaveAckOffset,
HAConnection.this.haService.getDefaultMessageStore().getMaxPhyOffset());
return false;
}
// 更新Master節點記錄的向從節點同步訊息的偏移量
HAConnection.this.haService.notifyTransferSome(HAConnection.this.slaveAckOffset);
}
} else if (readSize == 0)
// 判斷連續讀取到空資料的次數是否超過三次
if (++readSizeZeroTimes >= 3) {
break;
}
} else {
log.error("read socket[" + HAConnection.this.clientAddr + "] < 0");
return false;
}
} catch (IOException e) {
log.error("processReadEvent exception", e);
return false;
}
}
return true;
}
}
}
前面在GroupTransferService中可以看到是通過push2SlaveMaxOffset的值判斷本次同步是否完成的,在notifyTransferSome方法中可以看到當Master節點收到從節點反饋的訊息拉取偏移量時,對push2SlaveMaxOffset的值進行了更新:
public class HAService {
// 向從節點推送的訊息最大偏移量
private final GroupTransferService groupTransferService;
public void notifyTransferSome(final long offset) {
// 如果傳入的偏移大於push2SlaveMaxOffset記錄的值,進行更新
for (long value = this.push2SlaveMaxOffset.get(); offset > value; ) {
// 更新向從節點推送的訊息最大偏移量
boolean ok = this.push2SlaveMaxOffset.compareAndSet(value, offset);
if (ok) {
this.groupTransferService.notifyTransferSome();
break;
} else {
value = this.push2SlaveMaxOffset.get();
}
}
}
}
WriteSocketService
WriteSocketService
用於Master節點向從節點發送同步訊息,處理邏輯如下:
-
根據從節點發送的主從同步訊息拉取偏移量
slaveRequestOffset
進行判斷:- 如果
slaveRequestOffset
值為-1,表示還未收到從節點報告的同步偏移量,此時睡眠一段時間等待從節點發送訊息拉取偏移量; - 如果
slaveRequestOffset
值不為-1,表示已經開始進行主從同步進行下一步;
- 如果
-
判斷
nextTransferFromWhere
值是否為-1,nextTransferFromWhere記錄了下次需要傳輸的訊息在CommitLog中的偏移量,如果值為-1表示初次進行資料同步,此時有兩種情況:- 如果從節點發送的拉取偏移量slaveRequestOffset為0,就從當前CommitLog檔案最大偏移量開始同步;
- 如果slaveRequestOffset不為0,則從slaveRequestOffset位置處進行資料同步;
-
判斷上次寫事件是否已經將資料都寫入到從節點
- 如果已經寫入完畢,判斷距離上次寫入資料的時間間隔是否超過了設定的心跳時間,如果超過,為了避免連線空閒被關閉,需要傳送一個心跳包,此時構建心跳包的請求資料,呼叫transferData方法傳輸資料;
- 如果上次的資料還未傳輸完畢,呼叫transferData方法繼續傳輸,如果還是未完成,則結束此處處理;
-
根據nextTransferFromWhere從CommitLog中獲取訊息,如果未獲取到訊息,等待100ms,如果獲取到訊息,從CommitLog中獲取訊息進行傳輸:
(1)如果獲取到訊息的位元組數大於最大傳輸的大小,設定最最大傳輸數量,分批進行傳輸;
(2)更新下次傳輸的偏移量地址也就是nextTransferFromWhere的值;
(3)從CommitLog中獲取的訊息內容設定到將讀取到的訊息資料設定到selectMappedBufferResult中;
(4)設定訊息頭資訊,包括訊息頭位元組數、拉取訊息的偏移量等;
(5)呼叫transferData傳送資料;
public class HAConnection {
class WriteSocketService extends ServiceThread {
private final int headerSize = 8 + 4;// 訊息頭大小
@Override
public void run() {
HAConnection.log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
this.selector.select(1000);
// 如果slaveRequestOffset為-1,表示還未收到從節點報告的拉取進度
if (-1 == HAConnection.this.slaveRequestOffset) {
// 等待一段時間
Thread.sleep(10);
continue;
}
// 初次進行資料同步
if (-1 == this.nextTransferFromWhere) {
// 如果拉取進度為0
if (0 == HAConnection.this.slaveRequestOffset) {
// 從master節點最大偏移量從開始傳輸
long masterOffset = HAConnection.this.haService.getDefaultMessageStore().getCommitLog().getMaxOffset();
masterOffset =
masterOffset
- (masterOffset % HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()
.getMappedFileSizeCommitLog());
if (masterOffset < 0) {
masterOffset = 0;
}
// 更新nextTransferFromWhere
this.nextTransferFromWhere = masterOffset;
} else {
// 根據從節點發送的偏移量開始資料同步
this.nextTransferFromWhere = HAConnection.this.slaveRequestOffset;
}
log.info("master transfer data from " + this.nextTransferFromWhere + " to slave[" + HAConnection.this.clientAddr
+ "], and slave request " + HAConnection.this.slaveRequestOffset);
}
// 判斷上次傳輸是否完畢
if (this.lastWriteOver) {
// 獲取當前時間距離上次寫入資料的時間間隔
long interval =
HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastWriteTimestamp;
// 如果距離上次寫入資料的時間間隔超過了設定的心跳時間
if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()
.getHaSendHeartbeatInterval()) {
// 構建header
this.byteBufferHeader.position(0);
this.byteBufferHeader.limit(headerSize);
this.byteBufferHeader.putLong(this.nextTransferFromWhere);
this.byteBufferHeader.putInt(0);
this.byteBufferHeader.flip();
// 傳送心跳包
this.lastWriteOver = this.transferData();
if (!this.lastWriteOver)
continue;
}
} else {
// 未傳輸完畢,繼續上次的傳輸
this.lastWriteOver = this.transferData();
// 如果依舊未完成,結束本次處理
if (!this.lastWriteOver)
continue;
}
// 根據偏移量獲取訊息資料
SelectMappedBufferResult selectResult =
HAConnection.this.haService.getDefaultMessageStore().getCommitLogData(this.nextTransferFromWhere);
if (selectResult != null) {// 獲取訊息不為空
// 獲取訊息內容大小
int size = selectResult.getSize();
// 如果訊息的位元組數大於最大傳輸的大小
if (size > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize()) {
// 設定為最大傳輸大小
size = HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize();
}
long thisOffset = this.nextTransferFromWhere;
// 更新下次傳輸的偏移量地址
this.nextTransferFromWhere += size;
selectResult.getByteBuffer().limit(size);
// 將讀取到的訊息資料設定到selectMappedBufferResult
this.selectMappedBufferResult = selectResult;
// 設定訊息頭
this.byteBufferHeader.position(0);
// 設定訊息頭大小
this.byteBufferHeader.limit(headerSize);
// 設定偏移量地址
this.byteBufferHeader.putLong(thisOffset);
// 設定訊息內容大小
this.byteBufferHeader.putInt(size);
this.byteBufferHeader.flip();
// 傳送資料
this.lastWriteOver = this.transferData();
} else {
// 等待100ms
HAConnection.this.haService.getWaitNotifyObject().allWaitForRunning(100);
}
} catch (Exception e) {
HAConnection.log.error(this.getServiceName() + " service has exception.", e);
break;
}
}
HAConnection.this.haService.getWaitNotifyObject().removeFromWaitingThreadTable();
// ...
HAConnection.log.info(this.getServiceName() + " service end");
}
}
}
傳送資料
transferData
方法的處理邏輯如下:
- 傳送訊息頭資料;
- 訊息頭資料傳送完畢之後,傳送訊息內容,前面知道從CommitLog中讀取的訊息內容放入到了selectMappedBufferResult,將selectMappedBufferResult的內容傳送給從節點;
public class HAConnection {
class WriteSocketService extends ServiceThread {
private boolean transferData() throws Exception {
int writeSizeZeroTimes = 0;
// 寫入訊息頭
while (this.byteBufferHeader.hasRemaining()) {
// 傳送訊息頭資料
int writeSize = this.socketChannel.write(this.byteBufferHeader);
if (writeSize > 0) {
writeSizeZeroTimes = 0;
// 記錄傳送時間
this.lastWriteTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
} else if (writeSize == 0) {
if (++writeSizeZeroTimes >= 3) {
break;
}
} else {
throw new Exception("ha master write header error < 0");
}
}
if (null == this.selectMappedBufferResult) {
return !this.byteBufferHeader.hasRemaining();
}
writeSizeZeroTimes = 0;
// 訊息頭資料傳送完畢之後,傳送訊息內容
if (!this.byteBufferHeader.hasRemaining()) {
while (this.selectMappedBufferResult.getByteBuffer().hasRemaining()) {
// 傳送訊息內容
int writeSize = this.socketChannel.write(this.selectMappedBufferResult.getByteBuffer());
if (writeSize > 0) {
writeSizeZeroTimes = 0;
this.lastWriteTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
} else if (writeSize == 0) {
if (++writeSizeZeroTimes >= 3) {
break;
}
} else {
throw new Exception("ha master write body error < 0");
}
}
}
// ...
return result;
}
}
}
總結
主從同步流程
有新訊息寫入之後的同步流程
參考
丁威、周繼鋒《RocketMQ技術內幕》
RocketMQ版本:4.9.3