canal原始碼分析簡介-4
7.0 driver模組
2018-11-10 22:30:196,0534driver,顧名思義為驅動。熟悉jdbc程式設計的同學都知道,當專案中需要操作資料庫(oracle、sqlserver、mysql等)時,都需要在專案中引入對應的資料庫的驅動。以mysql為例,我們需要引入的是mysql-connector-java
這個jar包,通過這個驅動包來與資料庫進行通訊。
那麼為什麼canal不使用mysql官方提供的驅動包,而要自己編寫一個driver模組?原因在於mysql-connector-java驅動包只是實現了JDBC規範,方便我們在程式中對資料庫中的資料進行增刪改查。
對於獲取並解析binlog日誌這樣的場景,mysql-connector-java並沒有提供這樣的功能。因此,canal編寫了自己的driver模組,提供了基本的增刪改查功能,並提供了直接獲取原始binlog位元組流的功能,其他模組在這個模組的基礎上對binlog位元組進行解析,parser模組底層實際上就是通過driver模組來與資料庫建立連線的。
driver模組目錄結構如下所示:
最核心的3個類分別是:
-
MysqlConnector
:表示一個數據庫連線,作用類似於java.sql.Connection -
MysqlQueryExecutor
:查詢執行器,作用類似於PrepareStatement.executeQuery() -
MysqlUpdateExecutor
:更新執行器,作用類似於PrepareStatement.executeUpdate()
在本小節中,我們將首先介紹driver模組的基本使用;接著介紹parser模組是如何使用driver模組的;最後講解driver模組的實現原理。
1 driver模組的基本使用
本小節將會介紹MysqlConnector和MysqlQueryExecutor、MysqlUpdateExecutor如何使用。
假設test庫下有一張mysql表:user
- CREATETABLE`user`(
- `id`int(11)NOTNULLAUTO_INCREMENT,
- `name`varchar(18)NOTNULL,
- `password`varchar(15)NOTNULL,
- PRIMARYKEY(`id`)
- )ENGINE=InnoDBDEFAULTCHARSET=utf8mb4;
該表中有2條記錄:
- mysql>select*fromt_user;
- +----+---------------+----------+
- |id|name|password|
- +----+---------------+----------+
- |1|tianshozhi|xx|
- |2|wangxiaoxiiao|yy|
- +----+---------------+----------+
1.1 MysqlConnector
MysqlConnector相當於一個數據連結,其使用方式如下所示:
- @Test
- publicvoidtestMysqlConnection(){
- MysqlConnectorconnector=newMysqlConnector();
- try{
- //1建立資料庫連線
- connector=newMysqlConnector();
- //設定資料庫ip、port
- connector.setAddress(newInetSocketAddress("127.0.0.1",3306));
- //設定使用者名稱
- connector.setUsername("root");
- //設定密碼
- connector.setPassword(“yourpassword");
- //設定預設連線到的資料庫
- connector.setDefaultSchema("test");
- //設定連結字串,33表示UTF-8
- connector.setCharsetNumber((byte)33);
- //======設定網路相關引數===========
- //設定socket超時時間,預設30s,也就是傳送一個請求給mysql時,如果30s沒響應,則會丟擲SocketTimeoutException
- connector.setSoTimeout(30*1000);
- //設定傳送緩衝區發小,預設16K
- connector.setSendBufferSize(16*1024);//16K
- //設定接受緩衝區大小,預設16K
- connector.setReceiveBufferSize(16*1024);//16k
- //呼叫connect方法建立連線
- connector.connect();
- //2...dosomething....
- }catch(IOExceptione){
- e.printStackTrace();
- }finally{
- try{
- //關閉連結
- connector.disconnect();
- }catch(IOExceptione){
- e.printStackTrace();
- }
- }
- }
一個MysqlConnector例項底層只能維護一個數據庫連結。除了上面提到的方法,MysqlConnector還提供了reconnect()方法和fork()方法。
reconnect()方法:
reconnect()內部先呼叫disconnect方法關閉原有連線,然後使用connect方法建立一個新的連線
- mysqlConnector.reconnect();
fork()方法:
如果希望建立多個連線,可以fork出一個新的MysqlConnector例項,再呼叫這個新MysqlConnector例項的connect方法建立連線。
- MysqlConnectorfork=mysqlConnector.fork();
- fork.connect();
1.2MysqlQueryExecutor
這裡我們使用MysqlQueryExecutor查詢資料庫中的user表中的兩條記錄,注意canal的driver模組並沒有實現jdbcref規範,因此使用起來,與我們熟悉的JDBC程式設計有一些區別。
案例程式碼:
- @Test
- publicvoidtestQuery()throwsIOException{
- MysqlConnectorconnector=newMysqlConnector(newInetSocketAddress("127.0.0.1",3306),"root”,”yourpassword");
- try{
- //1建立資料庫連線
- connector.connect();
- //2構建查詢執行器,並執行查詢
- MysqlQueryExecutorexecutor=newMysqlQueryExecutor(connector);
- //ResultSetPacket作用類似於ResultSet
- ResultSetPacketresult=executor.query("select*fromtest.user");
- //3對查詢結果進行解析
- //FieldPacket中封裝的欄位的一些源資訊,如欄位的名稱,型別等
- List<FieldPacket>fieldDescriptors=result.getFieldDescriptors();
- //欄位的值使用String表示,jdbc程式設計中使用的getInt,getBoolean,getDate等方法,實際上都是都是字串轉換得到的
- List<String>fieldValues=result.getFieldValues();
- //列印欄位名稱
- for(FieldPacketfieldDescriptor:fieldDescriptors){
- StringfieldName=fieldDescriptor.getName();
- System.out.print(fieldName+"");
- }
- //列印欄位的值
- System.out.println("\n"+fieldValues);
- }finally{
- connector.disconnect();
- }
- }
控制檯輸出如下:
- idnamepassword
- [1,tianshozhi,xx,2,wangxiaoxiiao,yy]
可以看出來:
對user表中的欄位資訊,canal中使用FieldPacket
來表示,放於一個List表示。
對於user表中的一行記錄,使用另一個List表示,這個List的大小是欄位的List大小的整數倍,前者size除以後者就是查詢出來的行數。
1.3 MysqlUpdateExecutor
使用案例
- @Test
- publicvoidtestUpdate(){
- MysqlConnectorconnector=newMysqlConnector(newInetSocketAddress("127.0.0.1",3306),"root","xx");
- try{
- connector.connect();
- MysqlUpdateExecutorexecutor=newMysqlUpdateExecutor(connector);
- OKPacketokPacket=executor.update("insertintotest.user(name,password)values('tianbowen','zzz')");
- System.out.println(JSON.toJSONString(okPacket,true));
- }catch(IOExceptione){
- e.printStackTrace();
- }finally{
- try{
- connector.disconnect();
- }catch(IOExceptione){
- e.printStackTrace();
- }
- }
- }
如果執行更新操作成功,返回的是一個OkPacket
,上面把OkPacket轉成JSON,控制檯輸出如下:
- {
- "affectedRows":"AQ==",
- "fieldCount":0,
- "insertId":"AQ==",
- "message":"",
- "serverStatus":2,
- "warningCount":0
- }
可以看到這裡OkPacke包含的資訊比較多。其中比較重要的是:sql操作影響的記錄行數affectedRows
,以及insert操作返回自動生成的主鍵insertId
。
這裡返回的insertId和affectedRows都是位元組陣列,我們需要將其轉換為數字,以insertId為例,其轉換方式如下;
- bytes[]insertId=okPacket.getInsertId();
- longautoGeneratedKey=ByteHelper.readLengthCodedBinary(insertId,0);
- System.out.println(autoGeneratedKey);
2 parser模組是如何使用driver模組的?
分析canal是如何使用driver模組的,主要就是看其他模組使用driver模組執行了哪些查詢和更新sql。由於canal的作用主要是解析binlog,因此執行的大多都是binlog解析過程中所需要使用的sql語句。
顯然parser模組需要依靠driver模組來獲取原始的binlog二進位制位元組流,因此相關sql都在driver模組中。
2.1 parser模組執行的更新sql
parser模組提供了一個MysqlConnection
對driver模組的MysqlConnector進行了封裝,在開始dump binlog前,會對當前連結進行一些引數設定,如下圖:
com.alibaba.otter.canal.parse.inbound.mysql.MysqlConnection#updateSettings
其中:
1 set wait_timeout=9999999
2 set net_write_timeout=1800
3 set net_read_timeout=1800
4 set names 'binary'
設定服務端返回結果時不做編碼轉化,直接按照資料庫的二進位制編碼進行傳送,由客戶端自己根據需求進行編碼轉化
set @master_binlog_checksum= @@global.binlog_checksum
mysql5.6針對checksum支援需要設定session變數如果不設定會出現錯誤:
- Slavecannothandlereplicationeventswiththechecksumthatmasterisconfiguredtolog
但也不能亂設定,需要和mysql server的checksum配置一致,不然RotateLogEvent會出現亂碼。'@@global.binlog_checksum'需要去掉單引號,在mysql 5.6.29下導致master退出
5 set @slave_uuid=uuid()
mysql5.6需要設定slave_uuid避免被server kill連結,參考:https://github.com/alibaba/canal/issues/284
6 SET @mariadb_slave_capability='" + LogEvent.MARIA_SLAVE_CAPABILITY_MINE + "'
mariadb針對特殊的型別,需要設定session變數
2.2 parser模組執行的查詢sql
7 show variables like 'binlog_format'
用於檢視binlog格式,值為STATEMENT,MIXED,ROW的一種,如:
- mysql>showvariableslike'binlog_format';
- +---------------+-------+
- |Variable_name|Value|
- +---------------+-------+
- |binlog_format|ROW|
- +---------------+-------+
8 show variables like 'binlog_row_image'
ROW模式下,即使我們只更新了一條記錄的其中某個欄位,也會記錄每個欄位變更前後的值,binlog日誌就會變大,帶來磁碟IO上的開銷,以及網路開銷。mysql提供了引數binlog_row_image,來控制是否需要記錄每一行的變更,其有3個值:
-
FULL: 記錄列的所有修改
-
MINIMAL:只記錄修改的列。
-
NOBLOB:如果是text型別或clob欄位,不記錄 這些日誌
如:
- mysql>showvariableslike'binlog_row_image';
- +------------------+-------+
- |Variable_name|Value|
- +------------------+-------+
- |binlog_row_image|FULL|
- +------------------+-------+
9 select @master_binlog_checksum
mysql 主從複製(replication) 同步可能會出現資料不一致,mysql 5.6 版本中加入了 replication event checksum(主從複製事件校驗)功能。預設開啟。如果開啟,每個binlog後面會多出4個位元組,為CRC32校驗值。目前cancal支援解析CRC32的值,但不會進行校驗。如:
- mysql>showvariableslike'binlog_checksum';
- +-----------------+-------+
- |Variable_name|Value|
- +-----------------+-------+
- |binlog_checksum|CRC32|
- mysql>select@master_binlog_checksum;
- +-------------------------+
- |@master_binlog_checksum|
- +-------------------------+
- |NULL|
- +-------------------------+
- 1rowinset(0.01sec)
10 show variables like 'server_id'
mysql主從同步時,每個機器都要設定一個唯一的server_id,canal連線到某個mysql例項之後,會查詢這個serverId。
11 show master status
mysql binlog是多檔案儲存,唯一確定一個binlog位置需要通過:binlog file + binlog position。show master status可以獲得當前的binlog位置,如:
- mysql>showmasterstatus;
- +--------------+----------+--------------+------------------+-------------------+
- |File|Position|Binlog_Do_DB|Binlog_Ignore_DB|Executed_Gtid_Set|
- +--------------+----------+--------------+------------------+-------------------+
- |mysql.000012|23479||||
- +--------------+----------+--------------+------------------+-------------------+
12 show binlog events limit 1
查詢最早的binlog位置。
- mysql>showbinlogeventslimit1;
- +--------------+-----+-------------+-----------+-------------+---------------------------------------+
- |Log_name|Pos|Event_type|Server_id|End_log_pos|Info|
- +--------------+-----+-------------+-----------+-------------+---------------------------------------+
- |mysql.000001|4|Format_desc|1|123|Serverver:5.7.18-log,Binlogver:4|
- +--------------+-----+-------------+-----------+-------------+---------------------------------------+
mysql binlog檔案預設從mysql.000001開始,前四個位元組是魔法位元組,是固定的。因此真正的binlog事件總是從第4個位元組之後才開始的。
binlog檔案可能會清空,官方的mysql版支援設定引數expire_logs_days
來控制binlog儲存時間,一些分支如percona,支援指定報文binlog檔案個數。主要是避免binlog過多導致磁碟空間不足。
13 show slave status
主要用於判斷MySQL複製同步狀態,這個命令的內容比較多,這裡不演示。主要是關注兩個執行緒的狀態:
-
Slave_IO_Running執行緒:負責把主庫的bin日誌(Master_Log)內容,投遞到從庫的中繼日誌上(Relay_Log)
-
Slave_SQL_Running執行緒:負責把中繼日誌上的語句在從庫上執行一遍
以及Seconds_Behind_Master
的值,其表示從庫落後主庫的時間,如果為0則表示沒有延遲。
show global variableslike 'rds\_%'
這個命令沒懂,猜測應該是判斷是否資料庫是否是是否是阿里雲上提供的rds。
"desc " + fullname
檢視庫表的欄位定義,如:
- mysql>desctest.user;
- +----------+-------------+------+-----+---------+----------------+
- |Field|Type|Null|Key|Default|Extra|
- +----------+-------------+------+-----+---------+----------------+
- |id|int(11)|NO|PRI|NULL|auto_increment|
- |name|varchar(18)|NO||NULL||
- |password|varchar(15)|NO||NULL||
- +----------+-------------+------+-----+---------+----------------+
原始的binlog二進位制流中,並不包含欄位的名稱,而canal提供個client訂閱的event中包含了欄位名稱,實際上就是通過這個命令來獲得的。parser模組的TableMetaCache
類就是用於快取表字段資訊。當表結構變更後,也會跟著自動跟新。
3 Driver模組實現原理
canal的driver模組實際上就是一個手功編寫的一個mysql客戶端。要編寫這樣的一個客戶端並不容易,需要參考Mysql client/server通訊協議,以下是地址:
https://dev.mysql.com/doc/internals/en/client-server-protocol.html
筆者也嘗試自己寫了一些功能,最終的體會是,要實現一個完整的客戶端,太多細節要考慮,沒有足夠的時間。另外一點,也建議讀者可以閱讀一下這個通訊協議即可,以便對driver模組有更深的理解。建議不要花太多時間。
事實上canal的driver客戶端也沒有實現完整的通訊協議,只是滿足了簡單的查詢和更新功能。不過從binlog解析的角度,這已經足夠了。
8.0 異地多活場景下的資料同步之道
2019-03-29 03:16:219,91511在當今網際網路行業,大多數人網際網路從業者對"單元化"、"異地多活"這些詞彙已經耳熟能詳。而資料同步是異地多活的基礎,所有具備資料儲存能力的元件如:資料庫、快取、MQ等,資料都可以進行同步,形成一個龐大而複雜的資料同步拓撲。
本文將先從概念上介紹單元化、異地多活、就近訪問等基本概念。之後,將以資料庫為例,講解在資料同步的情況下,如何解決資料迴環、資料衝突、資料重複等典型問題。
1 什麼是單元化
如果僅僅從"單元化”這個詞彙的角度來說,我們可以理解為將資料劃分到多個單元進行儲存。"單元"是一個抽象的概念,通常與資料中心(IDC)概念相關,一個單元可以包含多個IDC,也可以只包含一個IDC。本文假設一個單元只對應一個IDC。
考慮一開始只有一個IDC的情況,所有使用者的資料都會寫入同一份底層儲存中,如下圖所示:
這種架構是大多資料中小型網際網路公司採用的方案,存在以下幾個問題:
1 不同地區的使用者體驗不同。一個IDC必然只能部署在一個地區,例如部署在北京,那麼北京的使用者訪問將會得到快速響應;但是對於上海的使用者,訪問延遲一般就會大一點,上海到北京的一個RTT可能有20ms左右。
2 容災問題。這裡容災不是單臺機器故障,而是指機房斷電,自然災害,或者光纖被挖斷等重大災害。一旦出現這種問題,將無法正常為使用者提供訪問,甚至出現數據丟失的情況。這並不是不可能,例如:2015年,支付寶杭州某資料中心的光纜就被挖斷過;2018年9月,雲棲大會上,螞蟻金服當場把杭州兩個資料中心的網線剪斷。
為了解決這些問題,我們可以將服務部署到多個不同的IDC中,不同IDC之間的資料互相進行同步。如下圖:
通過這種方式,我們可以解決單機房遇到的問題:
1 使用者體驗。不同的使用者可以選擇離自己最近的機房進行訪問
2 容災問題。當一個機房掛了之後,我們可以將這個機房使用者的流量排程到另外一個正常的機房,由於不同機房之間的資料是實時同步的,使用者流量排程過去後,也可以正常訪問資料 (故障發生那一刻的少部分資料可能會丟失)。
需要注意的是,關於容災,存在一個容災級別的劃分,例如:單機故障,機架(rack)故障,機房故障,城市級故障等。我們這裡只討論機房故障和城市故障。
-
機房容災 :上面的案例中,我們使用了2個IDC,但是2個IDC並不能具備機房容災能力。至少需要3個IDC,例如,一些基於多數派協議的一致性元件,如zookeeper,redis、etcd、consul等,需要得到大部分節點的同意。例如我們部署了3個節點,在只有2個機房的情況下, 必然是一個機房部署2個節點,一個機房部署一個節點。當部署了2個節點的機房掛了之後,只剩下一個節點,無法形成多數派。在3機房的情況下,每個機房部署一個節點,任意一個機房掛了,還剩2個節點,還是可以形成多數派。這也就是我們常說的"兩地三中心”。
-
城市級容災:在發生重大自然災害的情況下,可能整個城市的機房都無法訪問。一些元件,例如螞蟻的ocean base,為了達到城市級容災的能力,使用的是"三地五中心"的方案。這種情況下,3個城市分別擁有2、2、1個機房。當整個城市發生災難時,其他兩個城市依然至少可以保證有3個機房依然是存活的,同樣可以形成多數派。
小結:如果僅僅是考慮不同地區的使用者資料就近寫入距離最近的IDC,這是純粹意義上的”單元化”。不同單元的之間資料實時進行同步,相互備份對方的資料,才能做到真正意義上"異地多活”。
實現單元化,技術層面我們要解決的事情很多,例如:流量排程,即如何讓使用者就近訪問附近的IDC;資料互通,如何實現不同機房之間資料的相互同步。流量排程不在本文的討論範疇內,資料同步是本文講解的重點。
2 如何進行資料同步
需要同步的元件有很多,例如資料庫,快取等,這裡以多個Mysql叢集之間的資料同步為例進行講解,實際上快取的同步思路也是類似。
2.1 基礎知識
為了瞭解如何對不同mysql的資料相互進行同步,我們先了解一下mysql主從複製的基本架構,如下圖所示:
通常一個mysql叢集有一主多從構成。使用者的資料都是寫入主庫Master,Master將資料寫入到本地二進位制日誌binary log中。從庫Slave啟動一個IO執行緒(I/O Thread)從主從同步binlog,寫入到本地的relay log中,同時slave還會啟動一個SQL Thread,讀取本地的relay log,寫入到本地,從而實現資料同步。
基於這個背景知識,我們就可以考慮自己編寫一個元件,其作用類似與mysql slave,也是去主庫上拉取binlog,只不過binlog不是儲存到本地,而是將binlog轉換成sql插入到目標mysql叢集中,實現資料的同步。
這並非是一件不可能完成的事,MySQL官網上已經提供好所有你自己編寫一個mysql slave 同步binlog所需的相關背景知識,訪問這個連結:https://dev.mysql.com/doc/internals/en/client-server-protocol.html,你將可以看到mysql 客戶端與服務端的通訊協議。下圖紅色框中展示了Mysql主從複製的相關協議:
當然,筆者的目的並不是希望讀者真正的按照這裡的介紹嘗試編寫一個mysql 的slave,只是想告訴讀者,模擬mysql slave拉取binlog並非是一件很神奇的事,只要你的網路基礎知識夠紮實,完全可以做到。然而,這是一個龐大而複雜的工作。以一人之力,要完成這個工作,需要佔用你大量的時間。好在,現在已經有很多開源的元件,已經實現了按照這個協議可以模擬成一個mysql的slave,拉取binlog。例如:
-
阿里巴巴開源的canal
-
美團開源的puma
-
linkedin開源的databus
...
你可以利用這些元件來完成資料同步,而不必重複造輪子。 假設你採用了上面某個開源元件進行同步,需要明白的是這個元件都要完成最基本的2件事:從源庫拉取binlog並進行解析,我們把這部分功能稱之為binlog syncer;將獲取到的binlog轉換成SQL插入目標庫,這個功能稱之為sql writer。
為什麼劃分成兩塊獨立的功能?因為binlog訂閱解析的實際應用場景並不僅僅是資料同步,如下圖:
如圖所示,我們可以通過binlog來:
-
實時更新搜尋引擎,如es中的索引資訊
-
實時更新redis中的快取
-
傳送到kafka供下游消費,由業務方自定義業務邏輯處理等
-
...
因此,通常我們把binlog syncer單獨作為一個模組,其只負責解析從資料庫中拉取並解析binlog,並在記憶體中快取(或持久化儲存)。另外,binlog syncer另外提一個sdk,業務方通過這個sdk從binlog syncer中獲取解析後的binlog資訊,然後完成自己的特定業務邏輯處理。
顯然,在資料同步的場景下,我們可以基於這個sdk,編寫一個元件專門用於將binlog轉換為sql,插入目標庫,實現資料同步,如下圖所示:
北京使用者的資料不斷寫入離自己最近的機房的DB,通過binlog syncer訂閱這個庫binlog,然後下游的binlog writer將binlog轉換成SQL,插入到目標庫。上海使用者類似,只不過方向相反,不再贅述。通過這種方式,我們可以實時的將兩個庫的資料同步到對端。當然事情並非這麼簡單,我們有一些重要的事情需要考慮。
2.1 如何獲取全量+增量的歷史資料?
通常,mysql不會儲存所有的歷史binlog。原因在於,對於一條記錄,可能我們會更新多次,這依然是一條記錄,但是針對每一次更新操作,都會產生一條binlog記錄,這樣就會存在大量的binlog,很快會將磁碟佔滿。因此DBA通常會通過一些配置項,來定時清理binlog,只保留最近一段時間內的binlog。
例如,官方版的mysql提供了expire_logs_days配置項,可以設定儲存binlog的天數,筆者這裡設定為0,表示預設不清空,如果將這個值設定大於0,則只會儲存指定的天數。
另外一些mysql 的分支,如percona server,還可以指定保留binlog檔案的個數。我們可以通過show binary logs來檢視當前mysql存在多少個binlog檔案,如下圖:
通常,如果binlog如果從來沒被清理過,那麼binlog檔名字字尾通常是000001,如果不是這個值,則說明可能已經被清理過。當然,這也不是絕對,例如執行"reset master”命令,可以將所有的binlog清空,然後從000001重新開始計數。
Whatever! 我們知道了,binlog可能不會一直保留,所以直接同步binlog,可能只能獲取到部分資料。因此,通常的策略是,由DBA先dump一份源庫的完整資料快照,增量部分,再通過binlog訂閱解析進行同步。
2.2 如何解決重複插入?
考慮以下情況下,源庫中的一條記錄沒有唯一索引。對於這個記錄的binlog,通過sql writer將binlog轉換成sql插入目標庫時,丟擲了異常,此時我們並不知道知道是否插入成功了,則需要進行重試。如果之前已經是插入目標庫成功,只是目標庫響應時網路超時(socket timeout)了,導致的異常,這個時候重試插入,就會存在多條記錄,造成資料不一致。
因此,通常,在資料同步時,通常會限制記錄必須有要有主鍵或者唯一索引。
2.3 如何解決唯一索引衝突?
由於兩邊的庫都存在資料插入,如果都使用了同一個唯一索引,那麼在同步到對端時,將會產生唯一索引衝突。對於這種情況,通常建議是使用一個全域性唯一的分散式ID生成器來生成唯一索引,保證不會產生衝突。
另外,如果真的產生衝突了,同步元件應該將衝突的記錄儲存下來,以便之後的問題排查。
2.4 對於DDL語句如何處理?
如果資料庫表中已經有大量資料,例如千萬級別、或者上億,這個時候對於這個表的DDL變更,將會變得非常慢,可能會需要幾分鐘甚至更長時間,而DDL操作是會鎖表的,這必然會對業務造成極大的影響。
因此,同步元件通常會對DDL語句進行過濾,不進行同步。DBA在不同的資料庫叢集上,通過一些線上DDL工具(如gh-ost),進行表結構變更。
2.5 如何解決資料迴環問題?
資料迴環問題,是資料同步過程中,最重要的問題。我們針對INSERT、UPDATE、DELETE三個操作來分別進行說明:
INSERT操作
假設在A庫插入資料,A庫產生binlog,之後同步到B庫,B庫同樣也會產生binlog。由於是雙向同步,這條記錄,又會被重新同步回A庫。由於A庫應存在這條記錄了,產生衝突。
UPDATE操作
先考慮針對A庫某條記錄R只有一次更新的情況,將R更新成R1,之後R1這個binlog會被同步到B庫,B庫又將R1同步會A庫。對於這種情況下,A庫將不會產生binlog。因為A庫記錄當前是R1,B庫同步回來的還是R1,意味著值沒有變。
在一個更新操作並沒有改變某條記錄值的情況下,mysql是不會產生binlog,相當於同步終止。下圖演示了當更新的值沒有變時,mysql實際上不會做任何操作:
上圖演示了,資料中原本有一條記錄(1,"tianshouzhi”),之後執行一個update語句,將id=1的記錄的name值再次更新為”tianshouzhi”,意味著值並沒有變更。這個時候,我們看到mysql 返回的影響的記錄函式為0,也就是說,並不會產生真是的更新操作。
然而,這並不意味UPDATE 操作沒有問題,事實上,其比INSERT更加危險。
考慮A庫的記錄R被連續更新了2次,第一次更新成R1,第二次被更新成R2;這兩條記錄變更資訊都被同步到B庫,B也產生了R1和R2。由於B的資料也在往A同步,B的R1會被先同步到A,而A現在的值是R2,由於值不一樣,將會被更新成R1,併產生新的binlog;此時B的R2再同步會A,發現A的值是R1,又更新成R2,也產生binlog。由於B同步回A的操作,讓A又產生了新的binlog,A又要同步到B,如此反覆,陷入無限迴圈中。
DELETE操作
同樣存在先後順序問題。例如先插入一條記錄,再刪除。B在A刪除後,又將插入的資料同步回A,接著再將A的刪除操作也同步回A,每次都會產生binlog,陷入無限迴環。
關於資料迴環問題,筆者有著血的教訓,曾經因為筆者的誤操作,將一個庫的資料同步到了自身,最終也導致無限迴圈,原因分析與上述提到的UPDATE、DELETE操作類似,讀者可自行思考。
針對上述資料同步到過程中可能會存在的資料迴環問題,最終會導致資料無限迴圈,因此我們必須要解決這個問題。由於存在多種解決方案,我們將在稍後統一進行講解。
2.6 資料同步架構設計
現在,讓我們先把思路先從解決資料同步的具體細節問題轉回來,從更高的層面講解資料同步的架構應該如何設計。稍後的內容中,我們將講解各種避免資料迴環的各種解決方案。
前面的架構中,只涉及到2個DB的資料同步,如果有多個DB資料需要相互同步的情況下,架構將會變得非常複雜。例如:
這個圖演示的是四個DB之間資料需要相互同步,這種拓撲結構非常複雜。為了解決這種問題,我們可以將資料寫入到一個數據中轉站,例如MQ中進行儲存,如下:
我們在不同的機房各部署一套MQ叢集,這個機房的binlog syncer將需要同步的DB binlog資料寫入MQ對應的Topic中。對端機房如果需要同步這個資料,只需要通過binlog writer訂閱這個topic,消費topic中的binlog資料,插入到目標庫中即可。一些MQ支援consumer group的概念,不同的consumer group的消費位置offset相互隔離,從而達到一份資料,同時供多個消費者進行訂閱的能力。
當然,一些binlog訂閱解析元件,可能實現了類似於MQ的功能,此時,則不需要獨立部署MQ。
3 資料同步迴環問題解決方案
資料迴環問題有多種解決方案,通過排除法,一一進行講解。
3.1 往目標庫插入不生成binlog
在mysql中,我們可以設定session變數,來控制當前會話上的更新操作,不產生binlog。這樣當往目標庫插入資料時,由於不產生binlog,也就不會被同步會源庫了。為了演示這個效果,筆者清空了本機上的所有binlog(執行reset master),現在如下圖所示:
忽略這兩個binlog event,binlog檔案格式最開始就是這兩個event。
接著,筆者執行set sql_log_bin=0,然後插入一條語句,最後可以看到的確沒有產生新的binlog事件:
通過這種方式,貌似可以解決資料迴環問題。目標庫不產生binlog,就不會被同步會源庫。
但是,答案是否定的。我們是往目標庫的master插入資料,如果不產生binlog,目標庫的slave也無法同步資料,主從資料不一致。所以,需要排除這種方案。
提示:如果恢復set sql_log_bin=1,插入語句是會產生binlog,讀者可以自行模擬。
3.2 控制binlog同步方向
既然不產生binlog不能解決問題。那麼換一種思路,可以產生binlog。當把一個binlog轉換成sql時,插入某個庫之前,我們先判斷這條記錄是不是原本就是這個庫產生的,如果是,那麼就拋棄,也可以避免迴環問題。
現在問題就變為,如何給binlog加個標記,表示其實那個mysql叢集產生的。這也有幾種方案,下面一一講述。
3.2.1 ROW模式下記錄sql
mysql主從同步,binlog複製一般有3種模式。STATEMENT,ROW,MIXED。預設情況下,STATEMENT模式只記錄SQL語句,ROW模式只記錄欄位變更前後的值,MIXED模式是二者混合。binlog同步一般使用的都是ROW模式,高版本Mysql主從同步預設也是ROW模式。
我們想採取的方案是,在執行的SQL之前加上一段特殊標記,表示這個SQL的來源。例如
- /*IDC1:DB1*/insertintousers(name)values("tianbowen")
其中/*IDC1:DB1*/是一個註釋,表示這個SQL原始在是IDC1的DB1中產生的。之後,在同步的時候,解析出SQL中的IDC資訊,就能判斷出是不是自己產生的資料。
然而,ROW模式下,預設只記錄變更前後的值,不記錄SQL。所以,我們要通過一個開關,讓Mysql在ROW模式下也記錄INSERT、UPDATE、DELETE的SQL語句。具體做法是,在mysql的配置檔案中,新增以下配置:
- binlog_rows_query_log_events=1
這個配置可以讓mysql在binlog中產生ROWS_QUERY_LOG_EVENT型別的binlog事件,其記錄的就是執行的SQL。
通過這種方式,我們就記錄下的一個binlog最初是由哪一個叢集產生的,之後在同步的時候,sql writer判斷目標機房和當前binlog中包含的機房相同,則拋棄這條資料,從而避免迴環。
這種思路,功能上沒問題,但是在實踐中,確非常麻煩。首先,讓業務對執行的每條sql都加上一個這樣的標識,幾乎不可能。另外,如果忘記加了,就不知道資料的來源了。如果採用這種方案,可以考慮在資料庫訪問層中介軟體層面新增支援在sql之前增加/*..*/的功能,統一對業務遮蔽。即使這樣,也不完美,不能保證所有的sql都通過中介軟體來來寫入,例如DBA的一些日常運維操作,或者手工通過mysql命令列來操作資料庫時,肯定會存在沒有新增機房資訊的情況。
總的來說,這個方案不是那麼完美。
3.2.2 通過附加表記錄binlog產生源叢集資訊
這種方案目前很多公司使用。大致思路是,在db中都加一張額外的表,例如叫direction,記錄一個binlog產生的源叢集的資訊。例如
- CREATETABLE`direction`(
- `idc`varchar(255)notnull,
- `db_cluster`varchar(255)notnull,
- )ENGINE=InnoDBDEFAULTCHARSET=utf8mb4
idc欄位用於記錄某條記錄原始產生的IDC,db_cluster用於記錄原始產生的資料庫叢集(注意這裡要使用叢集的名稱,不能是server_id,因為可能會發生主從切換)。
假設使用者在IDC1的庫A插入的一條記錄(也可以在事務中插入多條記錄,單條記錄,即使不開啟事務,mysql預設也會開啟事務):
- BEGIN;
- insertintousers(name)values("tianshouzhi”);
- COMMIT;
那麼A庫資料binlog通過sql writer同步到目標庫B時,sql writer可以提前對事務中的資訊可以進行一些修改,,如下所示:
- BEGIN;
- #往目標庫同步時,首先額外插入一條記錄,表示這個事務中的資料都是A產生的。
- insertintodirection(idc,db_cluster)values("IDC1”,"DB_A”)
- #插入原來的記錄資訊
- insertintousers(name)values("tianshouzhi”);
- COMMIT;
之後B庫的資料往A同步時,就可以根據binlog中的第一條記錄的資訊,判斷這個記錄原本就是A產生的,進行拋棄,通過這種方式來避免迴環。這種方案已經已經過很多的公司的實際驗證。
3.2.3 通過GTID
Mysql 5.6引入了GTID(全域性事務id)的概念,極大的簡化的DBA的運維。在資料同步的場景下,GTID依然也可以發揮極大的威力。
GTID 由2個部分組成:
- server_uuid:transaction_id
其中server_uuid是mysql隨機生成的,全域性唯一。transaction_id事務id,預設情況下每次插入一個事務,transaction_id自增1。注意,這裡並不會對GTID進行全面的介紹,僅說明其在資料同步的場景下,如何避免迴環、資料重複插入的問題。
GTID提供了一個會話級變數gtid_next,指示如何產生下一個GTID。可能的取值如下:
-
AUTOMATIC: 自動生成下一個GTID,實現上是分配一個當前例項上尚未執行過的序號最小的GTID。
-
ANONYMOUS: 設定後執行事務不會產生GTID,顯式指定的GTID。
預設情況下,是AUTOMATIC,也就是自動生成的,例如我們執行sql:
- insertintousers(name)values("tianbowen”);
產生的binlog資訊如下:
可以看到,GTID會在每個事務(Query->...->Xid)之前,設定這個事務下一次要使用到的GTID。
從源庫訂閱binlog的時候,由於這個GTID也可以被解析到,之後在往目標庫同步資料的時候,我們可以顯示的的指定這個GTID,不讓目標自動生成。也就是說,往目標庫,同步資料時,變成了2條SQL:
- SETGTID_NEXT='09530823-4f7d-11e9-b569-00163e121964:1’
- insertintousers(name)values("tianbowen")
由於我們顯示指定了GTID,目標庫就會使用這個GTID當做當前事務ID,不會自動生成。同樣,這個操作也會在目標庫產生binlog資訊,需要同步回源庫。再往源庫同步時,我們按照相同的方式,先設定GTID,在執行解析binlog後得到的SQL,還是上面的內容
SET GTID_NEXT= '09530823-4f7d-11e9-b569-00163e121964:1'
insert into users(name) values("tianbowen")
由於這個GTID在源庫中已經存在了,插入記錄將會被忽略,演示如下:
- mysql>SETGTID_NEXT='09530823-4f7d-11e9-b569-00163e121964:1';
- QueryOK,0rowsaffected(0.00sec)
- mysql>insertintousers(name)values("tianbowen");
- QueryOK,0rowsaffected(0.01sec)#注意這裡,影響的記錄行數為0
注意這裡,對於一條insert語句,其影響的記錄函式居然為0,也就會插入並沒有產生記錄,也就不會產生binlog,避免了迴圈問題。
如何做到的呢?mysql會記錄自己執行過的所有GTID,當判斷一個GTID已經執行過,就會忽略。通過如下sql檢視:
- mysql>showglobalvariableslike"gtid_executed";
- +---------------+------------------------------------------+
- |Variable_name|Value|
- +---------------+------------------------------------------+
- |gtid_executed|09530823-4f7d-11e9-b569-00163e121964:1-5|
- +---------------+------------------------------------------+
上述value部分,冒號":"前面的是server_uuid,冒號後面的1-5,是一個範圍,表示已經執行過1,2,3,4,5這個幾個transaction_id。這裡就能解釋了,在GTID模式的情況下,為什麼前面的插入語句影響的記錄函式為0了。
顯然,GTID除了可以幫助我們避免資料迴環問題,還可以幫助我們解決資料重複插入的問題,對於一條沒有主鍵或者唯一索引的記錄,即使重複插入也沒有,只要GTID已經執行過,之後的重複插入都會忽略。
當然,我們還可以做得更加細緻,不需要每次都往目標庫設定GTID_NEXT,這畢竟是一次網路通訊。sql writer在往目標庫插入資料之前,先判斷目標庫的server_uuid是不是和當前binlog事務資訊攜帶的server_uuid相同,如果相同,則可以直接丟棄。檢視目標庫的gtid,可以通過以下sql執行:
- mysql>showvariableslike"server_uuid";
- +---------------+--------------------------------------+
- |Variable_name|Value|
- +---------------+--------------------------------------+
- |server_uuid|09530823-4f7d-11e9-b569-00163e121964|
- +---------------+--------------------------------------+
GTID應該算是一個終極的資料迴環解決方案,mysql原生自帶,比新增一個輔助表的方式更輕量,開銷也更低。需要注意的是,這倒並不是一定說GTID的方案就比輔助表好,因為輔助表可以新增機房等額外資訊。在一些場景下,如果下游需要知道這條記錄原始產生的機房,還是需要使用輔助表。
4 開源元件介紹canal/otter
前面深入講解了單元化場景下資料同步的基礎知識。讀者可能比較感興趣的是,哪些開源元件在這些方面做的比較好。筆者建議的首選,是canal/otter組合。
canal的作用就是類似於前面所述的binlog syncer,拉取解析binlog。otter是canal的客戶端,專門用於進行資料同步,類似於前文所講解的sql writer。並且,canal的最新版本已經實現了GTID。