MySQL二進位制日誌分析-程式碼實現(FORMAT_DESCRIPTION_EVENT)
如前文概述,MySQL Binlog v3以前版本, 二進位制日誌檔案的第一個事件是START_EVENT_V3, 從v4版本開始第一個事件為FORMAT_DESCRIPTION_EVENT(以下簡稱FD事件),替代掉START_EVENT_V3。具體到MySQL伺服器版本來說,MySQL 5.0以前版本二進位制日誌的第一個事件是START_EVENT_V3,而後續版本的第一個事件都是FD事件,由於目前大部分MySQL都跑在5.0+,所以這裡不討論START_EVENT_V3事件,FD事件可以看成是START_EVENT_V3的繼承和擴充套件,是相通的,有興趣可以蟲github 克隆(clone)BinlogMiner的原始碼下來參考。
FD事件比較簡單,順帶介紹一下BinlogMiner的程式實現,本文可能很拖沓。
檔案I/O是程式語言的基本功能,也是程式設計的基本功。具體到Java來說,傳統方式是通過I/O流來進行檔案的讀寫,如"BufferedInputStream"。Java 1.4版本的開始引入新的IO訪問方式Java NIO (New IO) ,其中的MappedByteBuffer允許將一個檔案通過FileChannel.map,對映成一個記憶體快取,跟簡單的來說,對映成一個位元組陣列,可以通過直接訪問陣列的方式訪問檔案。這大大簡化了檔案的隨機讀寫,而且相較於流方式,更接近人的思考方式,還有極高的讀寫效率。總的來說Java還是很全面的,對於底層的開發也支援很好,NO.1 霸榜多年還是有道理的。BinlogMiner基於MappedByteBuffer實現的。下面介紹的並非BinlogMiner的程式碼(因為實際程式碼需要處理很多細節,不適合來說原理),下面只是舉個簡單實現能跑的栗子。
只需要簡單的3行就可以建立一個MappedByteBuffer:
String binlogFileName = "D:\\build\\binlogs\\5.7.18\\blog.000016"; RandomAccessFile binlogFile = new RandomAccessFile(binlogFileName, "r"); FileChannel binlogFileChannel = binlogFile.getChannel(); MappedByteBuffer blogFileBuffer = binlogFileChannel.map(MapMode.READ_ONLY, 0, binlogFile.length());
最後一行,建立一個將檔案(D:\\build\\binlogs\\5.7.18\\blog.000016)對映到MappedByteBuffer,只讀模式,範圍從第一個位元組開始(0),到檔案結束(binlogFile.length())。通過MappedByteBuffer.position()可以獲取當前的檔案訪問的位置,通過MappedByteBuffer.position(int position)可以設定當前檔案訪問的位置,在概述的時候提到,前4個位元組是二進位制檔案的幻數(magic number),將訪問指標直接跳過,就到了FD事件開始了。
blogFileBuffer.position(4); int starPos = blogFileBuffer.position(); //記錄事件開始位置
這裡需要注意的是MappedByteBuffer的position是一個整型,最大的值是Integer.MAX_VALUE。也就是2GB,如果超過2GB,那麼就需要重新對映,如:
blogFileBuffer = binlogFileChannel.map(MapMode.READ_ONLY, new_start_position , new_end_position);
這時候MappedByteBuffer的position就是一個相對偏移,需要根據new_start_position_position來獲得絕對偏移量,當然對於Binlog不需要擔心這個問題,因為截至當前的MySQL 8版本,二進位制檔案的最大限制還是1GB。
1. Common header的解析
根據前文,事件的第一部分是通用頭,根據官方文件:
https://dev.mysql.com/doc/internals/en/binlog-event-header.html
Binlog header Payload: 4 timestamp 1 event type 4 server-id 4 event-size if binlog-version > 1: 4 log pos 2 flags
接下來程式碼獲取檔案頭的內容:
1.1.timestamp
前4個位元組是該事件的時間戳:
byte[] rawTimestamp = new byte[4]; blogFileBuffer.get(rawTimestamp);
根據文件,timestamp是一個無符號的4位元組長度的整型,這裡需要注意的2個細節是:
- Java沒有unsigned關鍵字,沒有無符號數,這裡需要用第三放的庫或者直接實現無符號數。
- 不同平臺,使用不同位元組序的概念,如,一個數值,1234,litte-endian儲存的是4321,而big-endian儲存的是1234。
- 在Java中,通過4個位元組記錄一個整型,8個位元組記錄一個長整型,但在binlog中,為節省空間,提高效能,可能是1個位元組,2個位元組,3個位元組,。。。
在BinlogMiner中,是自己寫函式實現的無符號數讀取,具體可以看PaserHelper這個工具類,這裡就不列出來:
ByteOrder order = ByteOrder.LITTLE_ENDIAN; //little-endian,小位元組序,x86平臺都是。 long timestampValue = ParserHelper.getUnsignedLong(rawTimestamp, order); //獲得了儲存的值; //這裡獲得的timestamp是unix timestamp,是基於秒的,而java的Date是基於毫秒(milliseconds)的,所以這裡轉換要乘以1000. Date timestamp = new Date(timestampValue * 1000L); System.out.println(timestamp.toString()); // 這裡就獲得了可讀的事件日期;
1.2. event type
1個位元組的無符號數,說明該事件的型別:
byte[] rawEventType = new byte[1]; blogFileBuffer.get(rawEventType); int eventTypeValue = ParserHelper.getUnsignedInteger(rawEventType, order); System.out.print(eventTypeValue);
這裡輸出的事件型別為15,轉換成十六進位制為0x0F,根據文件:(https://dev.mysql.com/doc/internals/en/binlog-event-type.html)正好是FD事件。
1.3. server-id
MySQL的伺服器id,4位元組無符號:
byte[] rawServerId = new byte[4]; blogFileBuffer.get(rawServerId); long serverId = ParserHelper.getUnsignedInteger(rawServerId, order);
1.4. event-size
該事件的長度(位元組數),4位元組無符號數;
byte[] rawEventSize = new byte[4]; blogFileBuffer.get(rawEventSize); long eventSize = ParserHelper.getUnsignedInteger(rawEventSize, order);
1.5. log pos
該事件的結束位置,4位元組無符號數,v4版本才包含,v4以前的START_EVENT_V3其實也是可以根據event-size計算得log pos的。
byte[] rawLogPos = new byte[4]; blogFileBuffer.get(rawLogPos); long logPos = ParserHelper.getUnsignedInteger(rawLogPos, order);
1.6. flags
該事件的標誌(https://dev.mysql.com/doc/internals/en/binlog-event-flag.html)
byte[] rawFlags = new byte[2]; blogFileBuffer.get(rawFlags); long flags = ParserHelper.getUnsignedInteger(rawFlags, order);
至此,事件的通用頭分析結束。
2. Event Body
Common Header後就是事件內容(Event Body),先看官方定義:
FORMAT_DESCRIPTION_EVENT:A format description event is the first event of a binlog for binlog-version 4. It describes how the other events are layed out.
Payload: 2 binlog-version string[50] mysql-server version 4 create timestamp 1 event header length string[p] event type header lengths
注意:這裡的文件寫的是Payload,但實際分析的時候會發現,應該理解為前文概述中的post header。實際的playload根據版本不同為1個位元組,或者0個位元組,用於標識CRC演算法,先做個引子,後文會詳細說明。
2.1.binlog-version
這裡自描述了該二進位制日誌檔案的版本,2個位元組無符號數:
byte[] rawBinlogVersion = new byte[2]; blogFileBuffer.get(rawBinlogVersion); int binlogVersion =ParserHelper.getUnsignedInteger(rawBinlogVersion, order); System.out.print(binlogVersion); //--> 輸出4,v4版本
2.2. mysql-server version
記錄了伺服器的版本,50個位元組,字串型別。
byte[] rawserverVersion = new byte[50]; blogFileBuffer.get(rawserverVersion); String serverVersion = new String(rawserverVersion).trim(); System.out.print(serverVersion); //--> 輸出 5.7.18-15-log
2.3. create timestamp
二進位制檔案建立的事件戳。4位元組無符號數:
byte[] rawCreateTimestamp = new byte[4]; blogFileBuffer.get(rawCreateTimestamp); long createTimestampValue = ParserHelper.getUnsignedInteger(rawCreateTimestamp, order); Date createTimestamp = new Date(createTimestampValue * 1000L);
2.4. event header length
通用頭(Common header)的長度, 1個位元組,無符號數。
byte[] rawCommonHeaderSize = new byte[1]; blogFileBuffer.get(rawCommonHeaderSize); int commonHeaderSize = ParserHelper.getUnsignedInteger(rawCommonHeaderSize, order); System.out.print(commonHeaderSize); //--> 輸出是19,除了v1版本,其他版本都是固定的19個位元組。
2.5. event type header lengths
這裡的是每個事件的post header的長度的位元組陣列,每一個事件一個位元組。
首先這個陣列長度不是固定的(每個版本包含的事件數很可能是不同的),還可能包含checksum部分,前文說過,在MySQL 5.6.2版本開始引入Checksum, 這裡可能還有Checksum的資訊。這部分解析就需要些技巧,首先假設“event type header lengths”一直到事件結束。
int pos = blogFileBuffer.position(); //get current position; byte[] remainBytes = new byte[logPos - pos]; //根據當前的位置和結束位置,計算出該事件剩餘的位元組數 blogFileBuffer.get(remainBytes);
那麼根據這個陣列,FD事件的Post Header的長度是多少?根據官方文件,可以根據(事件的編號-1)作為索引獲取:
byte rawFdPostHeaderLength = remainBytes[0x0f - 1]; //FD event = 0x0f int fdPostHeaderLength = rawFdPostHeaderLength & 0xFF; //這裡是byte轉無符號int System.out.println(fdPostHeaderLength); //--> FD的post header長度為95個位元組。
可以通過log pos和post header來計算出checksum部分的長度,其中根據官方文件,用1個位元組記錄checksum型別,剩下的為checksum的值:
long crcSize = logPos - (starPos + 19 + fdPostHeaderLength); // 輸出結構--> 5 (1 byte crc type + 4 byte crc value)
所以實際的Post header陣列為:
byte[] postHeaderSizeArray = Arrays.copyOfRange(remainBytes, 0, remainBytes.length-5);
2.6. checksum演算法
所以checksum的型別,和checksum的值為:
int crcType =remainBytes[postHeaderSizeArray.length] & 0xff; System.out.println(ParserHelper.getHexString(crcValue)); // --> = 1, 根據文件也就是CRC32 byte[] crcValue = Arrays.copyOfRange(remainBytes, remainBytes.length-(5-1), remainBytes.length); System.out.println(ParserHelper.getHexString(crcValue)); //--> 本案例的輸出為F2FABAC5,可以通過mysqlbinlog比對(0xc5bafaf2), 注意,因為是little-endian平臺,這裡需要倒過來。
至此,第一個事件分析完畢。最後附上完整程式碼(注意:ParserHelper來自於BinlogMiner):
import java.io.IOException; import java.io.RandomAccessFile; import java.nio.ByteOrder; import java.nio.MappedByteBuffer; import java.nio.channels.FileChannel; import java.nio.channels.FileChannel.MapMode; import java.util.Arrays; import java.util.Date; import org.littlestar.mysql.binlog.parser.ParserHelper; public class T2 { public static void main(String[] args) throws IOException { String binlogFileName = "D:\\build\\binlogs\\5.7.18\\blog.000020"; RandomAccessFile binlogFile = new RandomAccessFile(binlogFileName, "r"); FileChannel binlogFileChannel = binlogFile.getChannel(); MappedByteBuffer blogFileBuffer = binlogFileChannel.map(MapMode.READ_ONLY, 0, binlogFile.length()); //skip 4 bytes image number; blogFileBuffer.position(4); int starPos = blogFileBuffer.position(); //Common Header.. byte[] rawTimestamp = new byte[4]; blogFileBuffer.get(rawTimestamp); ByteOrder order = ByteOrder.LITTLE_ENDIAN; long timestampValue = ParserHelper.getUnsignedLong(rawTimestamp, order); Date timestamp = new Date(timestampValue * 1000L); byte[] rawEventType = new byte[1]; blogFileBuffer.get(rawEventType); int eventTypeValue = ParserHelper.getUnsignedInteger(rawEventType, order); byte[] rawServerId = new byte[4]; blogFileBuffer.get(rawServerId); long serverId = ParserHelper.getUnsignedInteger(rawServerId, order); byte[] rawEventSize = new byte[4]; blogFileBuffer.get(rawEventSize); long eventSize = ParserHelper.getUnsignedInteger(rawEventSize, order); byte[] rawLogPos = new byte[4]; blogFileBuffer.get(rawLogPos); int logPos = ParserHelper.getUnsignedInteger(rawLogPos, order); byte[] rawFlags = new byte[2]; blogFileBuffer.get(rawFlags); int flags = ParserHelper.getUnsignedInteger(rawFlags, order); //Event Body byte[] rawBinlogVersion = new byte[2]; blogFileBuffer.get(rawBinlogVersion); int binlogVersion = ParserHelper.getUnsignedInteger(rawBinlogVersion, order); byte[] rawserverVersion = new byte[50]; blogFileBuffer.get(rawserverVersion); String serverVersion = new String(rawserverVersion).trim(); byte[] rawCreateTimestamp = new byte[4]; blogFileBuffer.get(rawCreateTimestamp); long createTimestampValue = ParserHelper.getUnsignedInteger(rawCreateTimestamp, order); Date createTimestamp = new Date(createTimestampValue * 1000L); byte[] rawCommonHeaderSize = new byte[1]; blogFileBuffer.get(rawCommonHeaderSize); int commonHeaderSize = ParserHelper.getUnsignedInteger(rawCommonHeaderSize, order); int pos = blogFileBuffer.position(); //get current position; byte[] remainBytes = new byte[logPos - pos]; blogFileBuffer.get(remainBytes); byte rawFdPostHeaderLength = remainBytes[0x0f - 1]; //FD event = 0x0f int fdPostHeaderLength = rawFdPostHeaderLength & 0xFF; // System.out.println(fdPostHeaderLength); long crcSize = logPos - (starPos + 19 + fdPostHeaderLength); // --> 5 (1 byte crc type + crc value) byte[] postHeaderSizeArray = Arrays.copyOfRange(remainBytes, 0, remainBytes.length-5); int crcType =remainBytes[postHeaderSizeArray.length] & 0xff; byte[] crcValue = Arrays.copyOfRange(remainBytes, remainBytes.length-4, remainBytes.length); System.out.println(ParserHelper.getHexString(crcValue)); binlogFileChannel.close(); binlogFile.close(); } }
&n