CanalSharp-mysql資料庫binlog的增量訂閱&消費元件Canal的.NET客戶端
一.前言
CanalSharp是阿里巴巴開源專案mysql資料庫binlog的增量訂閱&消費元件 Canal 的.NET客戶端,關於什麼是 Canal?又能做什麼?我會在後文為大家一一介紹。CanalSharp 這個專案,是由我和 WithLin(主要貢獻) 完成,並將一直進行維護的Canal的.NET客戶端專案。目前開源在github:https://github.com/CanalSharp/CanalSharp/ 希望大家多多支援,旨在為.NET開發者提供一個友好的對接Canal的選擇,為.NET社群生態做貢獻。
二.Canal介紹
1.背景
早期,阿里巴巴B2B公司因為存在杭州和美國雙機房部署,存在跨機房同步的業務需求。不過早期的資料庫同步業務,主要是基於trigger的方式獲取增量變更,不過從2010年開始,阿里系公司開始逐步的嘗試基於資料庫的日誌解析,獲取增量變更進行同步,由此衍生出了增量訂閱&消費的業務,從此開啟了一段新紀元。
ps. 目前內部版本已經支援mysql和oracle部分版本的日誌解析,當前的canal開源版本支援5.7及以下的版本(阿里內部mysql 5.7.13, 5.6.10, mysql 5.5.18和5.1.40/48)
基於日誌增量訂閱&消費支援的業務:
- 資料庫映象
- 資料庫實時備份
- 多級索引 (賣家和買家各自分庫索引)
- search build
- 業務cache重新整理
- 價格變化等重要業務訊息
2.工作原理
2.1 mysql主備複製實現
從上層來看,複製分成三步:
- master將改變記錄到二進位制日誌(binary log)中(這些記錄叫做二進位制日誌事件,binary log events,可以通過show binlog events進行檢視);
- slave將master的binary log events拷貝到它的中繼日誌(relay log);
- slave重做中繼日誌中的事件,將改變反映它自己的資料。
2.2 Canal的工作原理
原理相對比較簡單:
- canal模擬mysql slave的互動協議,偽裝自己為mysql slave,向mysql master傳送dump協議
- mysql master收到dump請求,開始推送binary log給slave(也就是canal)
- canal解析binary log物件(原始為byte流)
3.Canal的安裝以及使用
三.CanalSharp介紹
1.工作原理
CanalSharp 是 Canal 的 .NET 客戶端,它與 Canal 是採用的Socket來進行通訊的,傳輸協議是TCP,互動協議採用的是 Google Protocol Buffer 3.0。
2.工作流程
1.Canal連線到mysql資料庫,模擬slave
2.CanalSharp與Canal建立連線
2.資料庫發生變更寫入到binlog
5.Canal向資料庫傳送dump請求,獲取binlog並解析
4.CanalSharp向Canal請求資料庫變更
4.Canal傳送解析後的資料給CanalSharp
5.CanalSharp收到資料,消費成功,傳送回執。(可選)
6.Canal記錄消費位置。
以一張圖來表示:
3.應用場景
CanalSharp作為Canal的客戶端,其應用場景就是Canal的應用場景。關於應用場景在Canal介紹一節已有概述。這裡我舉一些實際的使用例子:
1.代替使用輪詢資料庫方式來監控資料庫變更,有效改善輪詢耗費資料庫資源。
2.根據資料庫的變更實時更新搜尋引擎,比如電商場景下商品資訊發生變更,實時同步到商品搜尋引擎 Elasticsearch、solr等
3.根據資料庫的變更實時更新快取,比如電商場景下商品價格、庫存發生變更實時同步到redis
4.資料庫異地備份、資料同步
5.根據資料庫變更觸發某種業務,比如電商場景下,建立訂單超過xx時間未支付被自動取消,我們獲取到這條訂單資料的狀態變更即可向使用者推送訊息。
6.將資料庫變更整理成自己的資料格式傳送到kafka等訊息佇列,供訊息佇列的消費者進行消費。
四.CanalSharp的使用
1.使用前的準備
使用 CanalSharp 之前,必然要先準備好mysql資料庫以及Canal才行,這個步驟請直接查閱Canal官方文件 https://github.com/alibaba/canal/wiki 。但是為了讓大家能快速跑通CanalSharp,CanalSharp 專案為大家提供了一個通過 docker-compose 同時執行 mysql和canal。
2.通過docker-compose執行mysql和canal:
git clone https://github.com/CanalSharp/CanalSharp.git
cd docker
docker-compose up -d
出現下圖表示執行成功:
3.使用navicat等資料庫管理工具連線mysql
ip:執行docker的伺服器ip
mysql使用者:root
mysql密碼:000000
mysql埠:4406
預設提供了一個test資料庫,然後有一張名為test的表。
4.建立一個 .NET Core 控制檯專案
5.新增 Nuget 程式包
Install-Package CanalSharp.Client
6.編碼
(1)建立連線
//canal 配置的 destination,預設為 example
var destination = "example";
//建立一個簡單CanalClient連線物件(此物件不支援叢集)傳入引數分別為 canal地址、埠、destination、使用者名稱、密碼
var connector = CanalConnectors.NewSingleConnector("119.27.178.76", 11111, destination, "", "");
//連線 Canal
connector.Connect();
//訂閱,同時傳入Filter,如果不傳則以Canal的Filter為準。Filter是一種過濾規則,通過該規則的表資料變更才會傳遞過來
connector.Subscribe(".*\\\\..*");
(2)獲取資料
while (true)
{
//獲取資料 1024表示資料大小 單位為位元組
var message = connector.Get(1024);
//批次id 可用於回滾
var batchId = message.Id;
if (batchId == -1 || message.Entries.Count <= 0)
{
Thread.Sleep(300);
continue;
}
PrintEntry(message.Entries);
}
(3)輸出資料
/// <summary>
/// 輸出資料
/// </summary>
/// <param name="entrys">一個entry表示一個數據庫變更</param>
private static void PrintEntry(List<Entry> entrys)
{
foreach (var entry in entrys)
{
if (entry.EntryType == EntryType.Transactionbegin || entry.EntryType == EntryType.Transactionend)
{
continue;
}
RowChange rowChange = null;
try
{
//獲取行變更
rowChange = RowChange.Parser.ParseFrom(entry.StoreValue);
}
catch (Exception e)
{
}
if (rowChange != null)
{
//變更型別 insert/update/delete 等等
EventType eventType = rowChange.EventType;
//輸出binlog資訊 表名 資料庫名 變更型別
Console.WriteLine(
$"================> binlog[{entry.Header.LogfileName}:{entry.Header.LogfileOffset}] , name[{entry.Header.SchemaName},{entry.Header.TableName}] , eventType :{eventType}");
//輸出 insert/update/delete 變更型別列資料
foreach (var rowData in rowChange.RowDatas)
{
if (eventType == EventType.Delete)
{
PrintColumn(rowData.BeforeColumns.ToList());
}
else if (eventType == EventType.Insert)
{
PrintColumn(rowData.AfterColumns.ToList());
}
else
{
Console.WriteLine("-------> before");
PrintColumn(rowData.BeforeColumns.ToList());
Console.WriteLine("-------> after");
PrintColumn(rowData.AfterColumns.ToList());
}
}
}
}
}
/// <summary>
/// 輸出每個列的詳細資料
/// </summary>
/// <param name="columns"></param>
private static void PrintColumn(List<Column> columns)
{
foreach (var column in columns)
{
//輸出列明 列值 是否變更
Console.WriteLine($"{column.Name} : {column.Value} update= {column.Updated}");
}
}
7.測試執行
首次執行會輸出一堆資料,那些都是初始化執行建立表的資料,忽略即可
執行專案,然後一次執行sql觀察輸出:
insert into test values(1000,'111');
update test set name='222' where id=1000;
delete from test where id=1000;
通過新標籤頁開啟圖片檢視大圖
可以看見我們分別執行 insert、update、delete 語句,我們的CanalSharp都獲取到了資料庫變更。
五.使用Canal的經驗
1.mysql資料庫版本有要求:5.7.13, 5.6.10,、5.5.18和5.1.40/48,不一定非要滿足小版本號的要求,比如 5.7.x、5.6.x、5.5.x都應該可以,但是實際需要自己做測試。前面的具體版本號是Canal官方提供的資料,但是博主公司用的mysql 的版本是5.5.60,是可以正常使用Canal的。
2.mysql資料binlog的格式強烈建議設定為row
3.Canal並非必須連線到master資料庫,它同樣可以連線到slave資料庫,只是從庫出了需要開啟寫入binlog以外還需要設定 log-slave-updates
開啟。
4.如果生產環境已經存在mysql叢集,且叢集主庫的binlog格式為mixed,mysql資料庫叢集的主庫binlog格式可以不用改依然為 mixed,設定某一個從庫binlog格式配置為 row,讓Canal連線從庫,這樣可以避免對生產環境的mysql叢集產生影響。
5.mysql支援Statement,MiXED,以及ROW三種格式的binlog為什麼推薦使用row格式binlog,經過博主實際測試,使用row格式相容性是最好的,實際可以自己測試。
六.結束語
CanalSharp的介紹到這裡就結束了,如果覺得這個專案有用的歡迎大家來個 star 。後續將會寫幾篇文章介紹更詳細的使用方法以及實戰。