Mysql資料庫監聽binlog的開啟步驟
前言
我們經常需要根據使用者對自己資料的一些操作來做一些事情.
比如如果使用者刪除了自己的賬號,我們就給他發簡訊罵他,去發簡訊求他回來.
類似於這種功能,當然可以在業務邏輯層實現,在收到使用者的刪除請求之後執行這一操作,但是資料庫的binlog為我們提供了另外一種操作方法.
要監聽binlog,需要兩步,第一步當然是你的mysql需要開啟這一個功能,第二個是要寫程式來對日誌進行讀取.
mysql開啟binlog.
首先mysql的binlog日常是不開啟的,因此我們需要:
找到mysql的配置檔案my.cnf,這個因作業系統不一樣,位置也不一定一樣,可以自己找一下,
在其中加入以下內容:
[mysqld] server_id = 1 log-bin = mysql-bin binlog-format = ROW
之後重啟mysql.
/ ubuntu service mysql restart // mac mysql.server restart
監測是否開啟成功
進入mysql命令列,執行:
show variables like '%log_bin%' ;
如果結果如下圖,則說明成功了:
檢視正在寫入的binlog狀態:
程式碼讀取binlog
引入依賴
我們使用開源的一些實現,這裡因為一些奇怪的原因,我選用了mysql-binlog-connector-java這個包,(官方github倉庫)[github.com/shyiko/mysq…]具體依賴如下:
<!-- https://mvnrepository.com/artifact/com.github.shyiko/mysql-binlog-connector-java --> <dependency> <groupId>com.github.shyiko</groupId> <artifactId>mysql-binlog-connector-java</artifactId> <version>0.17.0</version> </dependency>
當然,對binlog的處理有很多開源實現,阿里的cancl就是一個,也可以使用它.
寫個demo
根據官方倉庫中readme裡面,來簡單的寫個demo.
public static void main(String[] args) { BinaryLogClient client = new BinaryLogClient("hostname",3306,"username","passwd"); EventDeserializer eventDeserializer = new EventDeserializer(); eventDeserializer.setCompatibilityMode( EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG,EventDeserializer.CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY ); client.setEventDeserializer(eventDeserializer); client.registerEventListener(new BinaryLogClient.EventListener() { @Override public void onEvent(Event event) { // TODO dosomething(); logger.info(event.toString()); } }); client.connect(); }
這個完全是根據官方教程裡面寫的,在onEvent裡面可以寫自己的業務邏輯,由於我只是測試,所以我在裡面將每一個event都列印了出來.
之後我手動登入到mysql,分別進行了增加,修改,刪除操作,監聽到的log如下:
00:23:13.331 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=0,eventType=ROTATE,serverId=1,headerLength=19,dataLength=28,nextPosition=0,flags=32},data=RotateEventData{binlogFilename='mysql-bin.000001',binlogPosition=886}}
00:23:13.334 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468403000,eventType=FORMAT_DESCRIPTION,dataLength=100,flags=0},data=FormatDescriptionEventData{binlogVersion=4,serverVersion='5.7.23-0ubuntu0.16.04.1-log',dataLength=95}}
00:23:23.715 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468603000,eventType=ANONYMOUS_GTID,dataLength=46,nextPosition=951,data=null}
00:23:23.716 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468603000,eventType=QUERY,dataLength=51,nextPosition=1021,flags=8},data=QueryEventData{threadId=4,executionTime=0,errorCode=0,database='pf',sql='BEGIN'}}
00:23:23.721 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468603000,eventType=TABLE_MAP,dataLength=32,nextPosition=1072,data=TableMapEventData{tableId=108,table='student',columnTypes=15,3,columnMetadata=135,columnNullability={}}}
00:23:23.724 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468603000,eventType=EXT_WRITE_ROWS,dataLength=23,nextPosition=1114,data=WriteRowsEventData{tableId=108,includedColumns={0,1},rows=[
[[B@546a03af,2]
]}}
00:23:23.725 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468603000,eventType=XID,dataLength=12,nextPosition=1145,data=XidEventData{xid=28}}
00:23:55.872 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468635000,nextPosition=1210,data=null}
00:23:55.872 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468635000,nextPosition=1280,sql='BEGIN'}}
00:23:55.873 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468635000,nextPosition=1331,columnNullability={}}}
00:23:55.875 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468635000,eventType=EXT_UPDATE_ROWS,dataLength=31,nextPosition=1381,data=UpdateRowsEventData{tableId=108,includedColumnsBeforeUpdate={0,rows=[
{before=[[B@6833ce2c,1],after=[[B@725bef66,3]}
]}}
00:23:55.875 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468635000,nextPosition=1412,data=XidEventData{xid=41}}
00:24:22.333 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468662000,nextPosition=1477,data=null}
00:24:22.334 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468662000,nextPosition=1547,sql='BEGIN'}}
00:24:22.334 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468662000,nextPosition=1598,columnNullability={}}}
00:24:22.335 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468662000,eventType=EXT_DELETE_ROWS,nextPosition=1640,data=DeleteRowsEventData{tableId=108,rows=[
[[B@1888ff2c,3]
]}}
00:24:22.335 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468662000,nextPosition=1671,data=XidEventData{xid=42}}
根據自己的業務,封裝一個更好使,更定製的工具類
開始的時候打算貼程式碼的,但是程式碼越寫越多,索性傳在github上了,這裡只貼部分的實現.程式碼傳送門
實現思路
- 支援對單個表的監聽,因為我們不想真的對所有資料庫中的所有資料表進行監聽.
- 可以多執行緒消費.
- 把監聽到的內容轉換成我們喜聞樂見的形式(文中的資料結構不一定很好,我沒想到更加合適的了).
所以實現思路大致如下:
- 封裝個客戶端,對外只提供獲取方法,遮蔽掉初始化的細節程式碼.
- 提供註冊監聽器(偽)的方法,可以註冊對某個表的監聽(重新定義一個監聽介面,所有註冊的監聽器實現這個就好).
- 真正的監聽器只有客戶端,他將此資料庫例項上的所有操作,全部監聽到並轉換成我們想要的格式LogItem放進阻塞佇列裡面.
- 啟動多個執行緒,消費阻塞佇列,對某一個LogItem呼叫對應的資料表的監聽器,做一些業務邏輯.
初始化程式碼:
public MysqlBinLogListener(Conf conf) { BinaryLogClient client = new BinaryLogClient(conf.host,conf.port,conf.username,conf.passwd); EventDeserializer eventDeserializer = new EventDeserializer(); eventDeserializer.setCompatibilityMode( EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG,EventDeserializer.CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY ); client.setEventDeserializer(eventDeserializer); this.parseClient = client; this.queue = new ArrayBlockingQueue<>(1024); this.conf = conf; listeners = new ConcurrentHashMap<>(); dbTableCols = new ConcurrentHashMap<>(); this.consumer = Executors.newFixedThreadPool(consumerThreads); }
註冊程式碼:
public void regListener(String db,String table,BinLogListener listener) throws Exception { String dbTable = getdbTable(db,table); Class.forName("com.mysql.jdbc.Driver"); // 儲存當前註冊的表的colum資訊 Connection connection = DriverManager.getConnection("jdbc:mysql://" + conf.host + ":" + conf.port,conf.passwd); Map<String,Colum> cols = getColMap(connection,db,table); dbTableCols.put(dbTable,cols); // 儲存當前註冊的listener List<BinLogListener> list = listeners.getOrDefault(dbTable,new ArrayList<>()); list.add(listener); listeners.put(dbTable,list); }
在這個步驟中,我們在註冊監聽者的同時,獲得了該表的schema資訊,並儲存到map裡面去,方便後續對資料進行處理.
監聽程式碼:
@Override public void onEvent(Event event) { EventType eventType = event.getHeader().getEventType(); if (eventType == EventType.TABLE_MAP) { TableMapEventData tableData = event.getData(); String db = tableData.getDatabase(); String table = tableData.getTable(); dbTable = getdbTable(db,table); } // 只處理新增刪除更新三種操作 if (isWrite(eventType) || isUpdate(eventType) || isDelete(eventType)) { if (isWrite(eventType)) { WriteRowsEventData data = event.getData(); for (Serializable[] row : data.getRows()) { if (dbTableCols.containsKey(dbTable)) { LogItem e = LogItem.itemFromInsert(row,dbTableCols.get(dbTable)); e.setDbTable(dbTable); queue.add(e); } } } } }
我偷懶了,這裡面只實現了對新增操作的處理,其他操作沒有寫.
消費程式碼:
public void parse() throws IOException { parseClient.registerEventListener(this); for (int i = 0; i < consumerThreads; i++) { consumer.submit(() -> { while (true) { if (queue.size() > 0) { try { LogItem item = queue.take(); String dbtable = item.getDbTable(); listeners.get(dbtable).forEach(l -> { l.onEvent(item); }); } catch (InterruptedException e) { e.printStackTrace(); } } Thread.sleep(1000); } }); } parseClient.connect(); }
消費時,從佇列中獲取item,之後獲取對應的一個或者多個監聽者,分別消費這個item.
測試程式碼:
public static void main(String[] args) throws Exception { Conf conf = new Conf(); conf.host = "hostname"; conf.port = 3306; conf.username = conf.passwd = "hhsgsb"; MysqlBinLogListener mysqlBinLogListener = new MysqlBinLogListener(conf); mysqlBinLogListener.parseArgsAndRun(args); mysqlBinLogListener.regListener("pf","student",item -> { System.out.println(new String((byte[])item.getAfter().get("name"))); logger.info("insert into {},value = {}",item.getDbTable(),item.getAfter()); }); mysqlBinLogListener.regListener("pf","teacher",item -> System.out.println("teacher ====")); mysqlBinLogListener.parse(); }
在這段很少的程式碼裡,註冊了兩個監聽者,分別監聽student和teacher表,並分別進行列印處理,經測試,在teacher表插入資料時,可以獨立的執行定義的業務邏輯.
注意:這裡的工具類並不能直接投入使用,因為裡面有許多的異常處理沒有做,且功能僅監聽了插入語句,可以用來做實現的參考.
參考文章
- github.com/shyiko/mysq…
- https://www.jb51.net/article/166761.htm
總結
以上就是這篇文章的全部內容了,希望本文的內容對大家的學習或者工作具有一定的參考學習價值,謝謝大家對我們的支援。