1. 程式人生 > >MySQL二進位制日誌分析-程式碼實現(FORMAT_DESCRIPTION_EVENT)

MySQL二進位制日誌分析-程式碼實現(FORMAT_DESCRIPTION_EVENT)

如前文概述,MySQL Binlog v3以前版本, 二進位制日誌檔案的第一個事件是START_EVENT_V3, 從v4版本開始第一個事件為FORMAT_DESCRIPTION_EVENT(以下簡稱FD事件),替代掉START_EVENT_V3。具體到MySQL伺服器版本來說,MySQL 5.0以前版本二進位制日誌的第一個事件是START_EVENT_V3,而後續版本的第一個事件都是FD事件,由於目前大部分MySQL都跑在5.0+,所以這裡不討論START_EVENT_V3事件,FD事件可以看成是START_EVENT_V3的繼承和擴充套件,是相通的,有興趣可以蟲github 克隆(clone)BinlogMiner的原始碼下來參考。

FD事件比較簡單,順帶介紹一下BinlogMiner的程式實現,本文可能很拖沓。

檔案I/O是程式語言的基本功能,也是程式設計的基本功。具體到Java來說,傳統方式是通過I/O流來進行檔案的讀寫,如"BufferedInputStream"。Java 1.4版本的開始引入新的IO訪問方式Java NIO (New IO) ,其中的MappedByteBuffer允許將一個檔案通過FileChannel.map,對映成一個記憶體快取,跟簡單的來說,對映成一個位元組陣列,可以通過直接訪問陣列的方式訪問檔案。這大大簡化了檔案的隨機讀寫,而且相較於流方式,更接近人的思考方式,還有極高的讀寫效率。總的來說Java還是很全面的,對於底層的開發也支援很好,NO.1 霸榜多年還是有道理的。BinlogMiner基於MappedByteBuffer實現的。下面介紹的並非BinlogMiner的程式碼(因為實際程式碼需要處理很多細節,不適合來說原理),下面只是舉個簡單實現能跑的栗子。

只需要簡單的3行就可以建立一個MappedByteBuffer:

String binlogFileName = "D:\\build\\binlogs\\5.7.18\\blog.000016";
RandomAccessFile binlogFile = new RandomAccessFile(binlogFileName, "r");
FileChannel binlogFileChannel = binlogFile.getChannel();
MappedByteBuffer blogFileBuffer = binlogFileChannel.map(MapMode.READ_ONLY, 0, binlogFile.length());

最後一行,建立一個將檔案(D:\\build\\binlogs\\5.7.18\\blog.000016)對映到MappedByteBuffer,只讀模式,範圍從第一個位元組開始(0),到檔案結束(binlogFile.length())。通過MappedByteBuffer.position()可以獲取當前的檔案訪問的位置,通過MappedByteBuffer.position(int position)可以設定當前檔案訪問的位置,在概述的時候提到,前4個位元組是二進位制檔案的幻數(magic number),將訪問指標直接跳過,就到了FD事件開始了。

blogFileBuffer.position(4);
int starPos = blogFileBuffer.position(); //記錄事件開始位置

這裡需要注意的是MappedByteBuffer的position是一個整型,最大的值是Integer.MAX_VALUE。也就是2GB,如果超過2GB,那麼就需要重新對映,如:

blogFileBuffer = binlogFileChannel.map(MapMode.READ_ONLY, new_start_position , new_end_position);

這時候MappedByteBuffer的position就是一個相對偏移,需要根據new_start_position_position來獲得絕對偏移量,當然對於Binlog不需要擔心這個問題,因為截至當前的MySQL 8版本,二進位制檔案的最大限制還是1GB。

1. Common header的解析

根據前文,事件的第一部分是通用頭,根據官方文件:
https://dev.mysql.com/doc/internals/en/binlog-event-header.html

Binlog header Payload:
  4  timestamp
  1  event type
  4  server-id
  4  event-size
  if binlog-version > 1:
      4  log pos
      2  flags

接下來程式碼獲取檔案頭的內容:

1.1.timestamp

前4個位元組是該事件的時間戳:

byte[] rawTimestamp = new byte[4];
blogFileBuffer.get(rawTimestamp); 

根據文件,timestamp是一個無符號的4位元組長度的整型,這裡需要注意的2個細節是:
  - Java沒有unsigned關鍵字,沒有無符號數,這裡需要用第三放的庫或者直接實現無符號數。
  - 不同平臺,使用不同位元組序的概念,如,一個數值,1234,litte-endian儲存的是4321,而big-endian儲存的是1234。
  - 在Java中,通過4個位元組記錄一個整型,8個位元組記錄一個長整型,但在binlog中,為節省空間,提高效能,可能是1個位元組,2個位元組,3個位元組,。。。

在BinlogMiner中,是自己寫函式實現的無符號數讀取,具體可以看PaserHelper這個工具類,這裡就不列出來:

ByteOrder order = ByteOrder.LITTLE_ENDIAN;  //little-endian,小位元組序,x86平臺都是。
long timestampValue = ParserHelper.getUnsignedLong(rawTimestamp, order); //獲得了儲存的值;

//這裡獲得的timestamp是unix timestamp,是基於秒的,而java的Date是基於毫秒(milliseconds)的,所以這裡轉換要乘以1000.
Date timestamp = new Date(timestampValue * 1000L);
System.out.println(timestamp.toString()); // 這裡就獲得了可讀的事件日期;

1.2. event type

1個位元組的無符號數,說明該事件的型別:

byte[] rawEventType = new byte[1];
blogFileBuffer.get(rawEventType);
int eventTypeValue = ParserHelper.getUnsignedInteger(rawEventType, order);
System.out.print(eventTypeValue); 

這裡輸出的事件型別為15,轉換成十六進位制為0x0F,根據文件:(https://dev.mysql.com/doc/internals/en/binlog-event-type.html)正好是FD事件。

1.3. server-id

MySQL的伺服器id,4位元組無符號:

byte[] rawServerId = new byte[4];
blogFileBuffer.get(rawServerId);
long serverId = ParserHelper.getUnsignedInteger(rawServerId, order);

1.4. event-size

該事件的長度(位元組數),4位元組無符號數;

byte[] rawEventSize = new byte[4];
blogFileBuffer.get(rawEventSize);
long eventSize = ParserHelper.getUnsignedInteger(rawEventSize, order);

1.5. log pos

該事件的結束位置,4位元組無符號數,v4版本才包含,v4以前的START_EVENT_V3其實也是可以根據event-size計算得log pos的。

byte[] rawLogPos = new byte[4];
blogFileBuffer.get(rawLogPos);
long logPos = ParserHelper.getUnsignedInteger(rawLogPos, order);

1.6. flags

該事件的標誌(https://dev.mysql.com/doc/internals/en/binlog-event-flag.html)

byte[] rawFlags = new byte[2];
blogFileBuffer.get(rawFlags);
long flags = ParserHelper.getUnsignedInteger(rawFlags, order);

至此,事件的通用頭分析結束。

2. Event Body

Common Header後就是事件內容(Event Body),先看官方定義:


FORMAT_DESCRIPTION_EVENT:A format description event is the first event of a binlog for binlog-version 4. It describes how the other events are layed out.

Payload:
  2             binlog-version
  string[50]    mysql-server version
  4             create timestamp
  1             event header length
  string[p]     event type header lengths

注意:這裡的文件寫的是Payload,但實際分析的時候會發現,應該理解為前文概述中的post header。實際的playload根據版本不同為1個位元組,或者0個位元組,用於標識CRC演算法,先做個引子,後文會詳細說明。

2.1.binlog-version

這裡自描述了該二進位制日誌檔案的版本,2個位元組無符號數:

byte[] rawBinlogVersion = new byte[2];
blogFileBuffer.get(rawBinlogVersion);
int binlogVersion =ParserHelper.getUnsignedInteger(rawBinlogVersion, order);
System.out.print(binlogVersion); //--> 輸出4,v4版本

2.2. mysql-server version

記錄了伺服器的版本,50個位元組,字串型別。

byte[] rawserverVersion = new byte[50];
blogFileBuffer.get(rawserverVersion);
String serverVersion = new String(rawserverVersion).trim();
System.out.print(serverVersion); //--> 輸出 5.7.18-15-log

2.3. create timestamp

二進位制檔案建立的事件戳。4位元組無符號數:

byte[] rawCreateTimestamp = new byte[4];
blogFileBuffer.get(rawCreateTimestamp);
long createTimestampValue = ParserHelper.getUnsignedInteger(rawCreateTimestamp, order);
Date createTimestamp = new Date(createTimestampValue * 1000L);

2.4. event header length

通用頭(Common header)的長度, 1個位元組,無符號數。

byte[] rawCommonHeaderSize = new byte[1];
blogFileBuffer.get(rawCommonHeaderSize);
int commonHeaderSize = ParserHelper.getUnsignedInteger(rawCommonHeaderSize, order);
System.out.print(commonHeaderSize); //--> 輸出是19,除了v1版本,其他版本都是固定的19個位元組。

2.5. event type header lengths

這裡的是每個事件的post header的長度的位元組陣列,每一個事件一個位元組。
首先這個陣列長度不是固定的(每個版本包含的事件數很可能是不同的),還可能包含checksum部分,前文說過,在MySQL 5.6.2版本開始引入Checksum, 這裡可能還有Checksum的資訊。這部分解析就需要些技巧,首先假設“event type header lengths”一直到事件結束。

int pos = blogFileBuffer.position(); //get current position;
byte[] remainBytes = new byte[logPos - pos]; //根據當前的位置和結束位置,計算出該事件剩餘的位元組數
blogFileBuffer.get(remainBytes); 

那麼根據這個陣列,FD事件的Post Header的長度是多少?根據官方文件,可以根據(事件的編號-1)作為索引獲取:

byte rawFdPostHeaderLength = remainBytes[0x0f - 1]; //FD event = 0x0f
int fdPostHeaderLength = rawFdPostHeaderLength & 0xFF; //這裡是byte轉無符號int
System.out.println(fdPostHeaderLength); //--> FD的post header長度為95個位元組。

可以通過log pos和post header來計算出checksum部分的長度,其中根據官方文件,用1個位元組記錄checksum型別,剩下的為checksum的值:

long crcSize = logPos - (starPos + 19 + fdPostHeaderLength); // 輸出結構--> 5 (1 byte crc type + 4 byte crc value)

所以實際的Post header陣列為:

byte[] postHeaderSizeArray = Arrays.copyOfRange(remainBytes, 0, remainBytes.length-5);

2.6. checksum演算法

所以checksum的型別,和checksum的值為:

int crcType =remainBytes[postHeaderSizeArray.length] & 0xff;
System.out.println(ParserHelper.getHexString(crcValue)); // --> = 1, 根據文件也就是CRC32
byte[] crcValue = Arrays.copyOfRange(remainBytes, remainBytes.length-(5-1), remainBytes.length);
System.out.println(ParserHelper.getHexString(crcValue)); 
//--> 本案例的輸出為F2FABAC5,可以通過mysqlbinlog比對(0xc5bafaf2), 注意,因為是little-endian平臺,這裡需要倒過來。

至此,第一個事件分析完畢。最後附上完整程式碼(注意:ParserHelper來自於BinlogMiner):

import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteOrder;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileChannel.MapMode;
import java.util.Arrays;
import java.util.Date;

import org.littlestar.mysql.binlog.parser.ParserHelper;

public class T2 {
    public static void main(String[] args) throws IOException {
        String binlogFileName = "D:\\build\\binlogs\\5.7.18\\blog.000020";
        RandomAccessFile binlogFile = new RandomAccessFile(binlogFileName, "r");
        FileChannel binlogFileChannel = binlogFile.getChannel();
        MappedByteBuffer blogFileBuffer = binlogFileChannel.map(MapMode.READ_ONLY, 0, binlogFile.length());
        
        //skip 4 bytes image number;
        blogFileBuffer.position(4);
        int starPos = blogFileBuffer.position();
        //Common Header..
        byte[] rawTimestamp = new byte[4];
        blogFileBuffer.get(rawTimestamp);
        
        ByteOrder order = ByteOrder.LITTLE_ENDIAN; 
        long timestampValue = ParserHelper.getUnsignedLong(rawTimestamp, order);
        Date timestamp = new Date(timestampValue * 1000L);

        byte[] rawEventType = new byte[1];
        blogFileBuffer.get(rawEventType);
        int eventTypeValue = ParserHelper.getUnsignedInteger(rawEventType, order);
        
        byte[] rawServerId = new byte[4];
        blogFileBuffer.get(rawServerId);
        long serverId = ParserHelper.getUnsignedInteger(rawServerId, order);
        
        byte[] rawEventSize = new byte[4];
        blogFileBuffer.get(rawEventSize);
        long eventSize = ParserHelper.getUnsignedInteger(rawEventSize, order);
        
        byte[] rawLogPos = new byte[4];
        blogFileBuffer.get(rawLogPos);
        int logPos = ParserHelper.getUnsignedInteger(rawLogPos, order);
        
        byte[] rawFlags = new byte[2];
        blogFileBuffer.get(rawFlags);
        int flags = ParserHelper.getUnsignedInteger(rawFlags, order);
        
        //Event Body
        
        byte[] rawBinlogVersion = new byte[2];
        blogFileBuffer.get(rawBinlogVersion);
        int binlogVersion = ParserHelper.getUnsignedInteger(rawBinlogVersion, order);
        
        byte[] rawserverVersion = new byte[50];
        blogFileBuffer.get(rawserverVersion);
        String serverVersion = new String(rawserverVersion).trim();
        
        byte[] rawCreateTimestamp = new byte[4];
        blogFileBuffer.get(rawCreateTimestamp);
        long createTimestampValue = ParserHelper.getUnsignedInteger(rawCreateTimestamp, order);
        Date createTimestamp = new Date(createTimestampValue * 1000L);
        
        byte[] rawCommonHeaderSize = new byte[1];
        blogFileBuffer.get(rawCommonHeaderSize);
        int commonHeaderSize = ParserHelper.getUnsignedInteger(rawCommonHeaderSize, order);
        
        int pos = blogFileBuffer.position(); //get current position;
        byte[] remainBytes = new byte[logPos - pos]; 
        blogFileBuffer.get(remainBytes);
        
        byte rawFdPostHeaderLength = remainBytes[0x0f - 1]; //FD event = 0x0f
        int fdPostHeaderLength = rawFdPostHeaderLength & 0xFF; //
        System.out.println(fdPostHeaderLength);
        long crcSize = logPos - (starPos + 19 + fdPostHeaderLength); // --> 5 (1 byte crc type + crc value)
        byte[] postHeaderSizeArray = Arrays.copyOfRange(remainBytes, 0, remainBytes.length-5);
        int crcType =remainBytes[postHeaderSizeArray.length] & 0xff;
        byte[] crcValue = Arrays.copyOfRange(remainBytes, remainBytes.length-4, remainBytes.length);
        System.out.println(ParserHelper.getHexString(crcValue));
        
        binlogFileChannel.close();
        binlogFile.close();
        
    }
}

 


&n