1. 程式人生 > 其它 >canal原始碼分析簡介-4

canal原始碼分析簡介-4

7.0 driver模組

2018-11-10 22:30:196,0534

driver,顧名思義為驅動。熟悉jdbc程式設計的同學都知道,當專案中需要操作資料庫(oracle、sqlserver、mysql等)時,都需要在專案中引入對應的資料庫的驅動。以mysql為例,我們需要引入的是mysql-connector-java這個jar包,通過這個驅動包來與資料庫進行通訊。

那麼為什麼canal不使用mysql官方提供的驅動包,而要自己編寫一個driver模組?原因在於mysql-connector-java驅動包只是實現了JDBC規範,方便我們在程式中對資料庫中的資料進行增刪改查。

對於獲取並解析binlog日誌這樣的場景,mysql-connector-java並沒有提供這樣的功能。因此,canal編寫了自己的driver模組,提供了基本的增刪改查功能,並提供了直接獲取原始binlog位元組流的功能,其他模組在這個模組的基礎上對binlog位元組進行解析,parser模組底層實際上就是通過driver模組來與資料庫建立連線的。

driver模組目錄結構如下所示:

最核心的3個類分別是:

  • MysqlConnector:表示一個數據庫連線,作用類似於java.sql.Connection

  • MysqlQueryExecutor:查詢執行器,作用類似於PrepareStatement.executeQuery()

  • MysqlUpdateExecutor:更新執行器,作用類似於PrepareStatement.executeUpdate()

在本小節中,我們將首先介紹driver模組的基本使用;接著介紹parser模組是如何使用driver模組的;最後講解driver模組的實現原理。

1 driver模組的基本使用

本小節將會介紹MysqlConnector和MysqlQueryExecutor、MysqlUpdateExecutor如何使用。

假設test庫下有一張mysql表:user

  1. CREATETABLE`user`(
  2. `id`int(11)NOTNULLAUTO_INCREMENT,
  3. `name`varchar(18)NOTNULL,
  4. `password`varchar(15)NOTNULL,
  5. PRIMARYKEY(`id`)
  6. )ENGINE=InnoDBDEFAULTCHARSET=utf8mb4;

該表中有2條記錄:

  1. mysql>select*fromt_user;
  2. +----+---------------+----------+
  3. |id|name|password|
  4. +----+---------------+----------+
  5. |1|tianshozhi|xx|
  6. |2|wangxiaoxiiao|yy|
  7. +----+---------------+----------+

1.1 MysqlConnector

MysqlConnector相當於一個數據連結,其使用方式如下所示:

  1. @Test
  2. publicvoidtestMysqlConnection(){
  3. MysqlConnectorconnector=newMysqlConnector();
  4. try{
  5. //1建立資料庫連線
  6. connector=newMysqlConnector();
  7. //設定資料庫ip、port
  8. connector.setAddress(newInetSocketAddress("127.0.0.1",3306));
  9. //設定使用者名稱
  10. connector.setUsername("root");
  11. //設定密碼
  12. connector.setPassword(“yourpassword");
  13. //設定預設連線到的資料庫
  14. connector.setDefaultSchema("test");
  15. //設定連結字串,33表示UTF-8
  16. connector.setCharsetNumber((byte)33);
  17. //======設定網路相關引數===========
  18. //設定socket超時時間,預設30s,也就是傳送一個請求給mysql時,如果30s沒響應,則會丟擲SocketTimeoutException
  19. connector.setSoTimeout(30*1000);
  20. //設定傳送緩衝區發小,預設16K
  21. connector.setSendBufferSize(16*1024);//16K
  22. //設定接受緩衝區大小,預設16K
  23. connector.setReceiveBufferSize(16*1024);//16k
  24. //呼叫connect方法建立連線
  25. connector.connect();
  26. //2...dosomething....
  27. }catch(IOExceptione){
  28. e.printStackTrace();
  29. }finally{
  30. try{
  31. //關閉連結
  32. connector.disconnect();
  33. }catch(IOExceptione){
  34. e.printStackTrace();
  35. }
  36. }
  37. }

一個MysqlConnector例項底層只能維護一個數據庫連結。除了上面提到的方法,MysqlConnector還提供了reconnect()方法和fork()方法。

reconnect()方法:

reconnect()內部先呼叫disconnect方法關閉原有連線,然後使用connect方法建立一個新的連線

  1. mysqlConnector.reconnect();

fork()方法:

如果希望建立多個連線,可以fork出一個新的MysqlConnector例項,再呼叫這個新MysqlConnector例項的connect方法建立連線。

  1. MysqlConnectorfork=mysqlConnector.fork();
  2. fork.connect();

1.2MysqlQueryExecutor

這裡我們使用MysqlQueryExecutor查詢資料庫中的user表中的兩條記錄,注意canal的driver模組並沒有實現jdbcref規範,因此使用起來,與我們熟悉的JDBC程式設計有一些區別。

案例程式碼:

  1. @Test
  2. publicvoidtestQuery()throwsIOException{
  3. MysqlConnectorconnector=newMysqlConnector(newInetSocketAddress("127.0.0.1",3306),"root”,”yourpassword");
  4. try{
  5. //1建立資料庫連線
  6. connector.connect();
  7. //2構建查詢執行器,並執行查詢
  8. MysqlQueryExecutorexecutor=newMysqlQueryExecutor(connector);
  9. //ResultSetPacket作用類似於ResultSet
  10. ResultSetPacketresult=executor.query("select*fromtest.user");
  11. //3對查詢結果進行解析
  12. //FieldPacket中封裝的欄位的一些源資訊,如欄位的名稱,型別等
  13. List<FieldPacket>fieldDescriptors=result.getFieldDescriptors();
  14. //欄位的值使用String表示,jdbc程式設計中使用的getInt,getBoolean,getDate等方法,實際上都是都是字串轉換得到的
  15. List<String>fieldValues=result.getFieldValues();
  16. //列印欄位名稱
  17. for(FieldPacketfieldDescriptor:fieldDescriptors){
  18. StringfieldName=fieldDescriptor.getName();
  19. System.out.print(fieldName+"");
  20. }
  21. //列印欄位的值
  22. System.out.println("\n"+fieldValues);
  23. }finally{
  24. connector.disconnect();
  25. }
  26. }

控制檯輸出如下:

  1. idnamepassword
  2. [1,tianshozhi,xx,2,wangxiaoxiiao,yy]

可以看出來:

對user表中的欄位資訊,canal中使用FieldPacket來表示,放於一個List表示。

對於user表中的一行記錄,使用另一個List表示,這個List的大小是欄位的List大小的整數倍,前者size除以後者就是查詢出來的行數。

1.3 MysqlUpdateExecutor

使用案例

  1. @Test
  2. publicvoidtestUpdate(){
  3. MysqlConnectorconnector=newMysqlConnector(newInetSocketAddress("127.0.0.1",3306),"root","xx");
  4. try{
  5. connector.connect();
  6. MysqlUpdateExecutorexecutor=newMysqlUpdateExecutor(connector);
  7. OKPacketokPacket=executor.update("insertintotest.user(name,password)values('tianbowen','zzz')");
  8. System.out.println(JSON.toJSONString(okPacket,true));
  9. }catch(IOExceptione){
  10. e.printStackTrace();
  11. }finally{
  12. try{
  13. connector.disconnect();
  14. }catch(IOExceptione){
  15. e.printStackTrace();
  16. }
  17. }
  18. }

如果執行更新操作成功,返回的是一個OkPacket,上面把OkPacket轉成JSON,控制檯輸出如下:

  1. {
  2. "affectedRows":"AQ==",
  3. "fieldCount":0,
  4. "insertId":"AQ==",
  5. "message":"",
  6. "serverStatus":2,
  7. "warningCount":0
  8. }

可以看到這裡OkPacke包含的資訊比較多。其中比較重要的是:sql操作影響的記錄行數affectedRows,以及insert操作返回自動生成的主鍵insertId

這裡返回的insertId和affectedRows都是位元組陣列,我們需要將其轉換為數字,以insertId為例,其轉換方式如下;

  1. bytes[]insertId=okPacket.getInsertId();
  2. longautoGeneratedKey=ByteHelper.readLengthCodedBinary(insertId,0);
  3. System.out.println(autoGeneratedKey);

2 parser模組是如何使用driver模組的?

分析canal是如何使用driver模組的,主要就是看其他模組使用driver模組執行了哪些查詢和更新sql。由於canal的作用主要是解析binlog,因此執行的大多都是binlog解析過程中所需要使用的sql語句。

顯然parser模組需要依靠driver模組來獲取原始的binlog二進位制位元組流,因此相關sql都在driver模組中。

2.1 parser模組執行的更新sql

parser模組提供了一個MysqlConnection對driver模組的MysqlConnector進行了封裝,在開始dump binlog前,會對當前連結進行一些引數設定,如下圖:

com.alibaba.otter.canal.parse.inbound.mysql.MysqlConnection#updateSettings

其中:

1 set wait_timeout=9999999

2 set net_write_timeout=1800

3 set net_read_timeout=1800

4 set names 'binary'

設定服務端返回結果時不做編碼轉化,直接按照資料庫的二進位制編碼進行傳送,由客戶端自己根據需求進行編碼轉化

set @master_binlog_checksum= @@global.binlog_checksum

mysql5.6針對checksum支援需要設定session變數如果不設定會出現錯誤:

  1. Slavecannothandlereplicationeventswiththechecksumthatmasterisconfiguredtolog

但也不能亂設定,需要和mysql server的checksum配置一致,不然RotateLogEvent會出現亂碼。'@@global.binlog_checksum'需要去掉單引號,在mysql 5.6.29下導致master退出

5 set @slave_uuid=uuid()

mysql5.6需要設定slave_uuid避免被server kill連結,參考:https://github.com/alibaba/canal/issues/284

6 SET @mariadb_slave_capability='" + LogEvent.MARIA_SLAVE_CAPABILITY_MINE + "'

mariadb針對特殊的型別,需要設定session變數

2.2 parser模組執行的查詢sql

7 show variables like 'binlog_format'

用於檢視binlog格式,值為STATEMENT,MIXED,ROW的一種,如:

  1. mysql>showvariableslike'binlog_format';
  2. +---------------+-------+
  3. |Variable_name|Value|
  4. +---------------+-------+
  5. |binlog_format|ROW|
  6. +---------------+-------+

8 show variables like 'binlog_row_image'

ROW模式下,即使我們只更新了一條記錄的其中某個欄位,也會記錄每個欄位變更前後的值,binlog日誌就會變大,帶來磁碟IO上的開銷,以及網路開銷。mysql提供了引數binlog_row_image,來控制是否需要記錄每一行的變更,其有3個值:

  • FULL: 記錄列的所有修改

  • MINIMAL:只記錄修改的列。

  • NOBLOB:如果是text型別或clob欄位,不記錄 這些日誌

如:

  1. mysql>showvariableslike'binlog_row_image';
  2. +------------------+-------+
  3. |Variable_name|Value|
  4. +------------------+-------+
  5. |binlog_row_image|FULL|
  6. +------------------+-------+

9 select @master_binlog_checksum

mysql 主從複製(replication) 同步可能會出現資料不一致,mysql 5.6 版本中加入了 replication event checksum(主從複製事件校驗)功能。預設開啟。如果開啟,每個binlog後面會多出4個位元組,為CRC32校驗值。目前cancal支援解析CRC32的值,但不會進行校驗。如:

  1. mysql>showvariableslike'binlog_checksum';
  2. +-----------------+-------+
  3. |Variable_name|Value|
  4. +-----------------+-------+
  5. |binlog_checksum|CRC32|
  6. mysql>select@master_binlog_checksum;
  7. +-------------------------+
  8. |@master_binlog_checksum|
  9. +-------------------------+
  10. |NULL|
  11. +-------------------------+
  12. 1rowinset(0.01sec)

10 show variables like 'server_id'

mysql主從同步時,每個機器都要設定一個唯一的server_id,canal連線到某個mysql例項之後,會查詢這個serverId。

11 show master status

mysql binlog是多檔案儲存,唯一確定一個binlog位置需要通過:binlog file + binlog position。show master status可以獲得當前的binlog位置,如:

  1. mysql>showmasterstatus;
  2. +--------------+----------+--------------+------------------+-------------------+
  3. |File|Position|Binlog_Do_DB|Binlog_Ignore_DB|Executed_Gtid_Set|
  4. +--------------+----------+--------------+------------------+-------------------+
  5. |mysql.000012|23479||||
  6. +--------------+----------+--------------+------------------+-------------------+

12 show binlog events limit 1

查詢最早的binlog位置。

  1. mysql>showbinlogeventslimit1;
  2. +--------------+-----+-------------+-----------+-------------+---------------------------------------+
  3. |Log_name|Pos|Event_type|Server_id|End_log_pos|Info|
  4. +--------------+-----+-------------+-----------+-------------+---------------------------------------+
  5. |mysql.000001|4|Format_desc|1|123|Serverver:5.7.18-log,Binlogver:4|
  6. +--------------+-----+-------------+-----------+-------------+---------------------------------------+

mysql binlog檔案預設從mysql.000001開始,前四個位元組是魔法位元組,是固定的。因此真正的binlog事件總是從第4個位元組之後才開始的。

binlog檔案可能會清空,官方的mysql版支援設定引數expire_logs_days來控制binlog儲存時間,一些分支如percona,支援指定報文binlog檔案個數。主要是避免binlog過多導致磁碟空間不足。

13 show slave status

主要用於判斷MySQL複製同步狀態,這個命令的內容比較多,這裡不演示。主要是關注兩個執行緒的狀態:

  • Slave_IO_Running執行緒:負責把主庫的bin日誌(Master_Log)內容,投遞到從庫的中繼日誌上(Relay_Log)

  • Slave_SQL_Running執行緒:負責把中繼日誌上的語句在從庫上執行一遍

以及Seconds_Behind_Master的值,其表示從庫落後主庫的時間,如果為0則表示沒有延遲。

show global variableslike 'rds\_%'

這個命令沒懂,猜測應該是判斷是否資料庫是否是是否是阿里雲上提供的rds。

"desc " + fullname

檢視庫表的欄位定義,如:

  1. mysql>desctest.user;
  2. +----------+-------------+------+-----+---------+----------------+
  3. |Field|Type|Null|Key|Default|Extra|
  4. +----------+-------------+------+-----+---------+----------------+
  5. |id|int(11)|NO|PRI|NULL|auto_increment|
  6. |name|varchar(18)|NO||NULL||
  7. |password|varchar(15)|NO||NULL||
  8. +----------+-------------+------+-----+---------+----------------+

原始的binlog二進位制流中,並不包含欄位的名稱,而canal提供個client訂閱的event中包含了欄位名稱,實際上就是通過這個命令來獲得的。parser模組的TableMetaCache類就是用於快取表字段資訊。當表結構變更後,也會跟著自動跟新。

3 Driver模組實現原理

canal的driver模組實際上就是一個手功編寫的一個mysql客戶端。要編寫這樣的一個客戶端並不容易,需要參考Mysql client/server通訊協議,以下是地址:

https://dev.mysql.com/doc/internals/en/client-server-protocol.html

筆者也嘗試自己寫了一些功能,最終的體會是,要實現一個完整的客戶端,太多細節要考慮,沒有足夠的時間。另外一點,也建議讀者可以閱讀一下這個通訊協議即可,以便對driver模組有更深的理解。建議不要花太多時間。

事實上canal的driver客戶端也沒有實現完整的通訊協議,只是滿足了簡單的查詢和更新功能。不過從binlog解析的角度,這已經足夠了。

8.0 異地多活場景下的資料同步之道

2019-03-29 03:16:219,91511

在當今網際網路行業,大多數人網際網路從業者對"單元化"、"異地多活"這些詞彙已經耳熟能詳。而資料同步是異地多活的基礎,所有具備資料儲存能力的元件如:資料庫、快取、MQ等,資料都可以進行同步,形成一個龐大而複雜的資料同步拓撲。

本文將先從概念上介紹單元化、異地多活、就近訪問等基本概念。之後,將以資料庫為例,講解在資料同步的情況下,如何解決資料迴環、資料衝突、資料重複等典型問題。

1 什麼是單元化

如果僅僅從"單元化”這個詞彙的角度來說,我們可以理解為將資料劃分到多個單元進行儲存。"單元"是一個抽象的概念,通常與資料中心(IDC)概念相關,一個單元可以包含多個IDC,也可以只包含一個IDC。本文假設一個單元只對應一個IDC。

考慮一開始只有一個IDC的情況,所有使用者的資料都會寫入同一份底層儲存中,如下圖所示:

這種架構是大多資料中小型網際網路公司採用的方案,存在以下幾個問題:

1 不同地區的使用者體驗不同。一個IDC必然只能部署在一個地區,例如部署在北京,那麼北京的使用者訪問將會得到快速響應;但是對於上海的使用者,訪問延遲一般就會大一點,上海到北京的一個RTT可能有20ms左右。

2 容災問題。這裡容災不是單臺機器故障,而是指機房斷電,自然災害,或者光纖被挖斷等重大災害。一旦出現這種問題,將無法正常為使用者提供訪問,甚至出現數據丟失的情況。這並不是不可能,例如:2015年,支付寶杭州某資料中心的光纜就被挖斷過;2018年9月,雲棲大會上,螞蟻金服當場把杭州兩個資料中心的網線剪斷。

為了解決這些問題,我們可以將服務部署到多個不同的IDC中,不同IDC之間的資料互相進行同步。如下圖:

通過這種方式,我們可以解決單機房遇到的問題:

1 使用者體驗。不同的使用者可以選擇離自己最近的機房進行訪問

2 容災問題。當一個機房掛了之後,我們可以將這個機房使用者的流量排程到另外一個正常的機房,由於不同機房之間的資料是實時同步的,使用者流量排程過去後,也可以正常訪問資料 (故障發生那一刻的少部分資料可能會丟失)。

需要注意的是,關於容災,存在一個容災級別的劃分,例如:單機故障,機架(rack)故障,機房故障,城市級故障等。我們這裡只討論機房故障和城市故障。

  • 機房容災 :上面的案例中,我們使用了2個IDC,但是2個IDC並不能具備機房容災能力。至少需要3個IDC,例如,一些基於多數派協議的一致性元件,如zookeeper,redis、etcd、consul等,需要得到大部分節點的同意。例如我們部署了3個節點,在只有2個機房的情況下, 必然是一個機房部署2個節點,一個機房部署一個節點。當部署了2個節點的機房掛了之後,只剩下一個節點,無法形成多數派。在3機房的情況下,每個機房部署一個節點,任意一個機房掛了,還剩2個節點,還是可以形成多數派。這也就是我們常說的"兩地三中心”。

  • 城市級容災:在發生重大自然災害的情況下,可能整個城市的機房都無法訪問。一些元件,例如螞蟻的ocean base,為了達到城市級容災的能力,使用的是"三地五中心"的方案。這種情況下,3個城市分別擁有2、2、1個機房。當整個城市發生災難時,其他兩個城市依然至少可以保證有3個機房依然是存活的,同樣可以形成多數派。

小結:如果僅僅是考慮不同地區的使用者資料就近寫入距離最近的IDC,這是純粹意義上的”單元化”。不同單元的之間資料實時進行同步,相互備份對方的資料,才能做到真正意義上"異地多活”。

實現單元化,技術層面我們要解決的事情很多,例如:流量排程,即如何讓使用者就近訪問附近的IDC;資料互通,如何實現不同機房之間資料的相互同步。流量排程不在本文的討論範疇內,資料同步是本文講解的重點。

2 如何進行資料同步

需要同步的元件有很多,例如資料庫,快取等,這裡以多個Mysql叢集之間的資料同步為例進行講解,實際上快取的同步思路也是類似。

2.1 基礎知識

為了瞭解如何對不同mysql的資料相互進行同步,我們先了解一下mysql主從複製的基本架構,如下圖所示:

通常一個mysql叢集有一主多從構成。使用者的資料都是寫入主庫Master,Master將資料寫入到本地二進位制日誌binary log中。從庫Slave啟動一個IO執行緒(I/O Thread)從主從同步binlog,寫入到本地的relay log中,同時slave還會啟動一個SQL Thread,讀取本地的relay log,寫入到本地,從而實現資料同步。

基於這個背景知識,我們就可以考慮自己編寫一個元件,其作用類似與mysql slave,也是去主庫上拉取binlog,只不過binlog不是儲存到本地,而是將binlog轉換成sql插入到目標mysql叢集中,實現資料的同步。

這並非是一件不可能完成的事,MySQL官網上已經提供好所有你自己編寫一個mysql slave 同步binlog所需的相關背景知識,訪問這個連結:https://dev.mysql.com/doc/internals/en/client-server-protocol.html,你將可以看到mysql 客戶端與服務端的通訊協議。下圖紅色框中展示了Mysql主從複製的相關協議:

當然,筆者的目的並不是希望讀者真正的按照這裡的介紹嘗試編寫一個mysql 的slave,只是想告訴讀者,模擬mysql slave拉取binlog並非是一件很神奇的事,只要你的網路基礎知識夠紮實,完全可以做到。然而,這是一個龐大而複雜的工作。以一人之力,要完成這個工作,需要佔用你大量的時間。好在,現在已經有很多開源的元件,已經實現了按照這個協議可以模擬成一個mysql的slave,拉取binlog。例如:

  • 阿里巴巴開源的canal

  • 美團開源的puma

  • linkedin開源的databus

...

你可以利用這些元件來完成資料同步,而不必重複造輪子。 假設你採用了上面某個開源元件進行同步,需要明白的是這個元件都要完成最基本的2件事:從源庫拉取binlog並進行解析,我們把這部分功能稱之為binlog syncer;將獲取到的binlog轉換成SQL插入目標庫,這個功能稱之為sql writer。

為什麼劃分成兩塊獨立的功能?因為binlog訂閱解析的實際應用場景並不僅僅是資料同步,如下圖:

如圖所示,我們可以通過binlog來:

  • 實時更新搜尋引擎,如es中的索引資訊

  • 實時更新redis中的快取

  • 傳送到kafka供下游消費,由業務方自定義業務邏輯處理等

  • ...

因此,通常我們把binlog syncer單獨作為一個模組,其只負責解析從資料庫中拉取並解析binlog,並在記憶體中快取(或持久化儲存)。另外,binlog syncer另外提一個sdk,業務方通過這個sdk從binlog syncer中獲取解析後的binlog資訊,然後完成自己的特定業務邏輯處理。

顯然,在資料同步的場景下,我們可以基於這個sdk,編寫一個元件專門用於將binlog轉換為sql,插入目標庫,實現資料同步,如下圖所示:

北京使用者的資料不斷寫入離自己最近的機房的DB,通過binlog syncer訂閱這個庫binlog,然後下游的binlog writer將binlog轉換成SQL,插入到目標庫。上海使用者類似,只不過方向相反,不再贅述。通過這種方式,我們可以實時的將兩個庫的資料同步到對端。當然事情並非這麼簡單,我們有一些重要的事情需要考慮。

2.1 如何獲取全量+增量的歷史資料?

通常,mysql不會儲存所有的歷史binlog。原因在於,對於一條記錄,可能我們會更新多次,這依然是一條記錄,但是針對每一次更新操作,都會產生一條binlog記錄,這樣就會存在大量的binlog,很快會將磁碟佔滿。因此DBA通常會通過一些配置項,來定時清理binlog,只保留最近一段時間內的binlog。

例如,官方版的mysql提供了expire_logs_days配置項,可以設定儲存binlog的天數,筆者這裡設定為0,表示預設不清空,如果將這個值設定大於0,則只會儲存指定的天數。

另外一些mysql 的分支,如percona server,還可以指定保留binlog檔案的個數。我們可以通過show binary logs來檢視當前mysql存在多少個binlog檔案,如下圖:

通常,如果binlog如果從來沒被清理過,那麼binlog檔名字字尾通常是000001,如果不是這個值,則說明可能已經被清理過。當然,這也不是絕對,例如執行"reset master”命令,可以將所有的binlog清空,然後從000001重新開始計數。

Whatever! 我們知道了,binlog可能不會一直保留,所以直接同步binlog,可能只能獲取到部分資料。因此,通常的策略是,由DBA先dump一份源庫的完整資料快照,增量部分,再通過binlog訂閱解析進行同步。

2.2 如何解決重複插入?

考慮以下情況下,源庫中的一條記錄沒有唯一索引。對於這個記錄的binlog,通過sql writer將binlog轉換成sql插入目標庫時,丟擲了異常,此時我們並不知道知道是否插入成功了,則需要進行重試。如果之前已經是插入目標庫成功,只是目標庫響應時網路超時(socket timeout)了,導致的異常,這個時候重試插入,就會存在多條記錄,造成資料不一致。

因此,通常,在資料同步時,通常會限制記錄必須有要有主鍵或者唯一索引。

2.3 如何解決唯一索引衝突?

由於兩邊的庫都存在資料插入,如果都使用了同一個唯一索引,那麼在同步到對端時,將會產生唯一索引衝突。對於這種情況,通常建議是使用一個全域性唯一的分散式ID生成器來生成唯一索引,保證不會產生衝突。

另外,如果真的產生衝突了,同步元件應該將衝突的記錄儲存下來,以便之後的問題排查。

2.4 對於DDL語句如何處理?

如果資料庫表中已經有大量資料,例如千萬級別、或者上億,這個時候對於這個表的DDL變更,將會變得非常慢,可能會需要幾分鐘甚至更長時間,而DDL操作是會鎖表的,這必然會對業務造成極大的影響。

因此,同步元件通常會對DDL語句進行過濾,不進行同步。DBA在不同的資料庫叢集上,通過一些線上DDL工具(如gh-ost),進行表結構變更。

2.5 如何解決資料迴環問題?

資料迴環問題,是資料同步過程中,最重要的問題。我們針對INSERT、UPDATE、DELETE三個操作來分別進行說明:

INSERT操作

假設在A庫插入資料,A庫產生binlog,之後同步到B庫,B庫同樣也會產生binlog。由於是雙向同步,這條記錄,又會被重新同步回A庫。由於A庫應存在這條記錄了,產生衝突。

UPDATE操作

先考慮針對A庫某條記錄R只有一次更新的情況,將R更新成R1,之後R1這個binlog會被同步到B庫,B庫又將R1同步會A庫。對於這種情況下,A庫將不會產生binlog。因為A庫記錄當前是R1,B庫同步回來的還是R1,意味著值沒有變。

在一個更新操作並沒有改變某條記錄值的情況下,mysql是不會產生binlog,相當於同步終止。下圖演示了當更新的值沒有變時,mysql實際上不會做任何操作:

上圖演示了,資料中原本有一條記錄(1,"tianshouzhi”),之後執行一個update語句,將id=1的記錄的name值再次更新為”tianshouzhi”,意味著值並沒有變更。這個時候,我們看到mysql 返回的影響的記錄函式為0,也就是說,並不會產生真是的更新操作。

然而,這並不意味UPDATE 操作沒有問題,事實上,其比INSERT更加危險。

考慮A庫的記錄R被連續更新了2次,第一次更新成R1,第二次被更新成R2;這兩條記錄變更資訊都被同步到B庫,B也產生了R1和R2。由於B的資料也在往A同步,B的R1會被先同步到A,而A現在的值是R2,由於值不一樣,將會被更新成R1,併產生新的binlog;此時B的R2再同步會A,發現A的值是R1,又更新成R2,也產生binlog。由於B同步回A的操作,讓A又產生了新的binlog,A又要同步到B,如此反覆,陷入無限迴圈中。

DELETE操作

同樣存在先後順序問題。例如先插入一條記錄,再刪除。B在A刪除後,又將插入的資料同步回A,接著再將A的刪除操作也同步回A,每次都會產生binlog,陷入無限迴環。

關於資料迴環問題,筆者有著血的教訓,曾經因為筆者的誤操作,將一個庫的資料同步到了自身,最終也導致無限迴圈,原因分析與上述提到的UPDATE、DELETE操作類似,讀者可自行思考。

針對上述資料同步到過程中可能會存在的資料迴環問題,最終會導致資料無限迴圈,因此我們必須要解決這個問題。由於存在多種解決方案,我們將在稍後統一進行講解。

2.6 資料同步架構設計

現在,讓我們先把思路先從解決資料同步的具體細節問題轉回來,從更高的層面講解資料同步的架構應該如何設計。稍後的內容中,我們將講解各種避免資料迴環的各種解決方案。

前面的架構中,只涉及到2個DB的資料同步,如果有多個DB資料需要相互同步的情況下,架構將會變得非常複雜。例如:

這個圖演示的是四個DB之間資料需要相互同步,這種拓撲結構非常複雜。為了解決這種問題,我們可以將資料寫入到一個數據中轉站,例如MQ中進行儲存,如下:

我們在不同的機房各部署一套MQ叢集,這個機房的binlog syncer將需要同步的DB binlog資料寫入MQ對應的Topic中。對端機房如果需要同步這個資料,只需要通過binlog writer訂閱這個topic,消費topic中的binlog資料,插入到目標庫中即可。一些MQ支援consumer group的概念,不同的consumer group的消費位置offset相互隔離,從而達到一份資料,同時供多個消費者進行訂閱的能力。

當然,一些binlog訂閱解析元件,可能實現了類似於MQ的功能,此時,則不需要獨立部署MQ。

3 資料同步迴環問題解決方案

資料迴環問題有多種解決方案,通過排除法,一一進行講解。

3.1 往目標庫插入不生成binlog

在mysql中,我們可以設定session變數,來控制當前會話上的更新操作,不產生binlog。這樣當往目標庫插入資料時,由於不產生binlog,也就不會被同步會源庫了。為了演示這個效果,筆者清空了本機上的所有binlog(執行reset master),現在如下圖所示:

忽略這兩個binlog event,binlog檔案格式最開始就是這兩個event。

接著,筆者執行set sql_log_bin=0,然後插入一條語句,最後可以看到的確沒有產生新的binlog事件:

通過這種方式,貌似可以解決資料迴環問題。目標庫不產生binlog,就不會被同步會源庫。

但是,答案是否定的。我們是往目標庫的master插入資料,如果不產生binlog,目標庫的slave也無法同步資料,主從資料不一致。所以,需要排除這種方案。

提示:如果恢復set sql_log_bin=1,插入語句是會產生binlog,讀者可以自行模擬。

3.2 控制binlog同步方向

既然不產生binlog不能解決問題。那麼換一種思路,可以產生binlog。當把一個binlog轉換成sql時,插入某個庫之前,我們先判斷這條記錄是不是原本就是這個庫產生的,如果是,那麼就拋棄,也可以避免迴環問題。

現在問題就變為,如何給binlog加個標記,表示其實那個mysql叢集產生的。這也有幾種方案,下面一一講述。

3.2.1 ROW模式下記錄sql

mysql主從同步,binlog複製一般有3種模式。STATEMENT,ROW,MIXED。預設情況下,STATEMENT模式只記錄SQL語句,ROW模式只記錄欄位變更前後的值,MIXED模式是二者混合。binlog同步一般使用的都是ROW模式,高版本Mysql主從同步預設也是ROW模式。

我們想採取的方案是,在執行的SQL之前加上一段特殊標記,表示這個SQL的來源。例如

  1. /*IDC1:DB1*/insertintousers(name)values("tianbowen")

其中/*IDC1:DB1*/是一個註釋,表示這個SQL原始在是IDC1的DB1中產生的。之後,在同步的時候,解析出SQL中的IDC資訊,就能判斷出是不是自己產生的資料。

然而,ROW模式下,預設只記錄變更前後的值,不記錄SQL。所以,我們要通過一個開關,讓Mysql在ROW模式下也記錄INSERT、UPDATE、DELETE的SQL語句。具體做法是,在mysql的配置檔案中,新增以下配置:

  1. binlog_rows_query_log_events=1

這個配置可以讓mysql在binlog中產生ROWS_QUERY_LOG_EVENT型別的binlog事件,其記錄的就是執行的SQL。

通過這種方式,我們就記錄下的一個binlog最初是由哪一個叢集產生的,之後在同步的時候,sql writer判斷目標機房和當前binlog中包含的機房相同,則拋棄這條資料,從而避免迴環。

這種思路,功能上沒問題,但是在實踐中,確非常麻煩。首先,讓業務對執行的每條sql都加上一個這樣的標識,幾乎不可能。另外,如果忘記加了,就不知道資料的來源了。如果採用這種方案,可以考慮在資料庫訪問層中介軟體層面新增支援在sql之前增加/*..*/的功能,統一對業務遮蔽。即使這樣,也不完美,不能保證所有的sql都通過中介軟體來來寫入,例如DBA的一些日常運維操作,或者手工通過mysql命令列來操作資料庫時,肯定會存在沒有新增機房資訊的情況。

總的來說,這個方案不是那麼完美。

3.2.2 通過附加表記錄binlog產生源叢集資訊

這種方案目前很多公司使用。大致思路是,在db中都加一張額外的表,例如叫direction,記錄一個binlog產生的源叢集的資訊。例如

  1. CREATETABLE`direction`(
  2. `idc`varchar(255)notnull,
  3. `db_cluster`varchar(255)notnull,
  4. )ENGINE=InnoDBDEFAULTCHARSET=utf8mb4

idc欄位用於記錄某條記錄原始產生的IDC,db_cluster用於記錄原始產生的資料庫叢集(注意這裡要使用叢集的名稱,不能是server_id,因為可能會發生主從切換)。

假設使用者在IDC1的庫A插入的一條記錄(也可以在事務中插入多條記錄,單條記錄,即使不開啟事務,mysql預設也會開啟事務):

  1. BEGIN;
  2. insertintousers(name)values("tianshouzhi”);
  3. COMMIT;

那麼A庫資料binlog通過sql writer同步到目標庫B時,sql writer可以提前對事務中的資訊可以進行一些修改,,如下所示:

  1. BEGIN;
  2. #往目標庫同步時,首先額外插入一條記錄,表示這個事務中的資料都是A產生的。
  3. insertintodirection(idc,db_cluster)values("IDC1”,"DB_A”)
  4. #插入原來的記錄資訊
  5. insertintousers(name)values("tianshouzhi”);
  6. COMMIT;

之後B庫的資料往A同步時,就可以根據binlog中的第一條記錄的資訊,判斷這個記錄原本就是A產生的,進行拋棄,通過這種方式來避免迴環。這種方案已經已經過很多的公司的實際驗證。

3.2.3 通過GTID

Mysql 5.6引入了GTID(全域性事務id)的概念,極大的簡化的DBA的運維。在資料同步的場景下,GTID依然也可以發揮極大的威力。

GTID 由2個部分組成:

  1. server_uuid:transaction_id

其中server_uuid是mysql隨機生成的,全域性唯一。transaction_id事務id,預設情況下每次插入一個事務,transaction_id自增1。注意,這裡並不會對GTID進行全面的介紹,僅說明其在資料同步的場景下,如何避免迴環、資料重複插入的問題。

GTID提供了一個會話級變數gtid_next,指示如何產生下一個GTID。可能的取值如下:

  • AUTOMATIC: 自動生成下一個GTID,實現上是分配一個當前例項上尚未執行過的序號最小的GTID。

  • ANONYMOUS: 設定後執行事務不會產生GTID,顯式指定的GTID。

預設情況下,是AUTOMATIC,也就是自動生成的,例如我們執行sql:

  1. insertintousers(name)values("tianbowen”);

產生的binlog資訊如下:

可以看到,GTID會在每個事務(Query->...->Xid)之前,設定這個事務下一次要使用到的GTID。

從源庫訂閱binlog的時候,由於這個GTID也可以被解析到,之後在往目標庫同步資料的時候,我們可以顯示的的指定這個GTID,不讓目標自動生成。也就是說,往目標庫,同步資料時,變成了2條SQL:

  1. SETGTID_NEXT='09530823-4f7d-11e9-b569-00163e121964:1’
  2. insertintousers(name)values("tianbowen")

由於我們顯示指定了GTID,目標庫就會使用這個GTID當做當前事務ID,不會自動生成。同樣,這個操作也會在目標庫產生binlog資訊,需要同步回源庫。再往源庫同步時,我們按照相同的方式,先設定GTID,在執行解析binlog後得到的SQL,還是上面的內容

SET GTID_NEXT= '09530823-4f7d-11e9-b569-00163e121964:1'

insert into users(name) values("tianbowen")

由於這個GTID在源庫中已經存在了,插入記錄將會被忽略,演示如下:

  1. mysql>SETGTID_NEXT='09530823-4f7d-11e9-b569-00163e121964:1';
  2. QueryOK,0rowsaffected(0.00sec)
  3. mysql>insertintousers(name)values("tianbowen");
  4. QueryOK,0rowsaffected(0.01sec)#注意這裡,影響的記錄行數為0

注意這裡,對於一條insert語句,其影響的記錄函式居然為0,也就會插入並沒有產生記錄,也就不會產生binlog,避免了迴圈問題。

如何做到的呢?mysql會記錄自己執行過的所有GTID,當判斷一個GTID已經執行過,就會忽略。通過如下sql檢視:

  1. mysql>showglobalvariableslike"gtid_executed";
  2. +---------------+------------------------------------------+
  3. |Variable_name|Value|
  4. +---------------+------------------------------------------+
  5. |gtid_executed|09530823-4f7d-11e9-b569-00163e121964:1-5|
  6. +---------------+------------------------------------------+

上述value部分,冒號":"前面的是server_uuid,冒號後面的1-5,是一個範圍,表示已經執行過1,2,3,4,5這個幾個transaction_id。這裡就能解釋了,在GTID模式的情況下,為什麼前面的插入語句影響的記錄函式為0了。

顯然,GTID除了可以幫助我們避免資料迴環問題,還可以幫助我們解決資料重複插入的問題,對於一條沒有主鍵或者唯一索引的記錄,即使重複插入也沒有,只要GTID已經執行過,之後的重複插入都會忽略。

當然,我們還可以做得更加細緻,不需要每次都往目標庫設定GTID_NEXT,這畢竟是一次網路通訊。sql writer在往目標庫插入資料之前,先判斷目標庫的server_uuid是不是和當前binlog事務資訊攜帶的server_uuid相同,如果相同,則可以直接丟棄。檢視目標庫的gtid,可以通過以下sql執行:

  1. mysql>showvariableslike"server_uuid";
  2. +---------------+--------------------------------------+
  3. |Variable_name|Value|
  4. +---------------+--------------------------------------+
  5. |server_uuid|09530823-4f7d-11e9-b569-00163e121964|
  6. +---------------+--------------------------------------+

GTID應該算是一個終極的資料迴環解決方案,mysql原生自帶,比新增一個輔助表的方式更輕量,開銷也更低。需要注意的是,這倒並不是一定說GTID的方案就比輔助表好,因為輔助表可以新增機房等額外資訊。在一些場景下,如果下游需要知道這條記錄原始產生的機房,還是需要使用輔助表。

4 開源元件介紹canal/otter

前面深入講解了單元化場景下資料同步的基礎知識。讀者可能比較感興趣的是,哪些開源元件在這些方面做的比較好。筆者建議的首選,是canal/otter組合。

canal的作用就是類似於前面所述的binlog syncer,拉取解析binlog。otter是canal的客戶端,專門用於進行資料同步,類似於前文所講解的sql writer。並且,canal的最新版本已經實現了GTID。