使用 Binlog 和 Canal 從 MySQL 抽取資料
資料抽取是 ETL 流程的第一步。我們會將資料從 RDBMS 或日誌伺服器等外部系統抽取至資料倉庫,進行清洗、轉換、聚合等操作。在現代網站技術棧中,MySQL 是最常見的資料庫管理系統,我們會從多個不同的 MySQL 例項中抽取資料,存入一箇中心節點,或直接進入 Hive。市面上已有多種成熟的、基於 SQL 查詢的抽取軟體,如著名的開源專案 Apache Sqoop,然而這些工具並不支援實時的資料抽取。MySQL Binlog 則是一種實時的資料流,用於主從節點之間的資料複製,我們可以利用它來進行資料抽取。藉助阿里巴巴開源的 Canal 專案,我們能夠非常便捷地將 MySQL 中的資料抽取到任意目標儲存中。
Canal 的組成部分
簡單來說,Canal 會將自己偽裝成 MySQL 從節點(Slave),並從主節點(Master)獲取 Binlog,解析和貯存後供下游消費端使用。Canal 包含兩個組成部分:服務端和客戶端。服務端負責連線至不同的 MySQL 例項,併為每個例項維護一個事件訊息佇列;客戶端則可以訂閱這些佇列中的資料變更事件,處理並存儲到資料倉庫中。下面我們來看如何快速搭建起一個 Canal 服務。
配置 MySQL 主節點
MySQL 預設沒有開啟 Binlog,因此我們需要對 my.cnf
檔案做以下修改:
server-id = 1 log_bin = /path/to/mysql-bin.log binlog_format = ROW
- 1
- 2
- 3
注意 binlog_format
必須設定為 ROW
, 因為在 STATEMENT
或 MIXED
模式下, Binlog 只會記錄和傳輸 SQL 語句(以減少日誌大小),而不包含具體資料,我們也就無法儲存了。
從節點通過一個專門的賬號連線主節點,這個賬號需要擁有全域性的 REPLICATION
許可權。我們可以使用 GRANT
命令建立這樣的賬號:
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT
ON *.* TO 'canal'@'%' IDENTIFIED BY 'canal';
- 1
- 2
啟動 Canal 服務端
從 GitHub 專案釋出頁中下載 Canal 服務端程式碼(連結),配置檔案在 conf
資料夾下,有以下目錄結構:
canal.deployer/conf/canal.properties
canal.deployer/conf/instanceA/instance.properties
canal.deployer/conf/instanceB/instance.properties
- 1
- 2
- 3
conf/canal.properties
是主配置檔案,如其中的 canal.port
用以指定服務端監聽的埠。instanceA/instance.properties
則是各個例項的配置檔案,主要的配置項有:
# slaveId 不能與 my.cnf 中的 server-id 項重複
canal.instance.mysql.slaveId = 1234
canal.instance.master.address = 127.0.0.1:3306
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
canal.instance.connectionCharset = UTF-8
# 訂閱例項中所有的資料庫和表
canal.instance.filter.regex = .*\\..*
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
執行 sh bin/startup.sh
命令開啟服務端,在日誌檔案 logs/example/example.log
中可以看到以下輸出:
Loading properties file from class path resource [canal.properties]
Loading properties file from class path resource [example/instance.properties]
start CannalInstance for 1-example
[destination = example , address = /127.0.0.1:3306 , EventParser] prepare to find start position just show master status
- 1
- 2
- 3
- 4
編寫 Canal 客戶端
從服務端消費變更訊息時,我們需要建立一個 Canal 客戶端,指定需要訂閱的資料庫和表,並開啟輪詢。
首先,在專案中新增 com.alibaba.otter:canal.client
依賴項,構建 CanalConnector
例項:
CanalConnector connector = CanalConnectors.newSingleConnector(
new InetSocketAddress("127.0.0.1", 11111), "example", "", "");
connector.connect();
connector.subscribe(".*\\..*");
while (true) {
Message message = connector.getWithoutAck(100);
long batchId = message.getId();
if (batchId == -1 || message.getEntries().isEmpty()) {
Thread.sleep(3000);
} else {
printEntries(message.getEntries());
connector.ack(batchId);
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
這段程式碼和連線訊息系統很相似。變更事件會批量傳送過來,待處理完畢後我們可以 ACK 這一批次,從而避免訊息丟失。
// printEntries
RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
for (RowData rowData : rowChange.getRowDatasList()) {
if (rowChange.getEventType() == EventType.INSERT) {
printColumns(rowData.getAfterCollumnList());
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
每一個 Entry
代表一組具有相同變更型別的資料列表,如 INSERT 型別、UPDATE、DELETE 等。每一行資料我們都可以獲取到各個欄位的資訊:
// printColumns
String line = columns.stream()
.map(column -> column.getName() + "=" + column.getValue())
.collect(Collectors.joining(","));
System.out.println(line);
- 1
- 2
- 3
- 4
- 5
完整程式碼可以在 GitHub 中找到(連結)。
載入至資料倉庫
關係型資料庫與批量更新
若資料倉庫是基於關係型資料庫的,我們可以直接使用 REPLACE
語句將資料變更寫入目標表。其中需要注意的是寫入效能,在更新較頻繁的場景下,我們通常會快取一段時間的資料,並批量更新至資料庫,如:
REPLACE INTO `user` (`id`, `name`, `age`, `updated`) VALUES
(1, 'Jerry', 30, '2017-08-12 16:00:00'),
(2, 'Mary', 28, '2017-08-12 17:00:00'),
(3, 'Tom', 36, '2017-08-12 18:00:00');
- 1
- 2
- 3
- 4
另一種方式是將資料變更寫入按分隔符分割的文字檔案,並用 LOAD DATA
語句載入資料庫。這些檔案也可以用在需要寫入 Hive 的場景中。不管使用哪一種方法,請一定注意要對字串型別的欄位進行轉義,避免匯入時出錯。
基於 Hive 的資料倉庫
Hive 表儲存在 HDFS 上,該檔案系統不支援修改,因此我們需要一些額外工作來寫入資料變更。常用的方式包括:JOIN、Hive 事務、或改用 HBase。
資料可以歸類成基礎資料和增量資料。如昨日的 user
表是基礎資料,今日變更的行是增量資料。通過 FULL OUTER JOIN
,我們可以將基礎和增量資料合併成一張最新的資料表,並作為明天的基礎資料:
SELECT
COALESCE(b.`id`, a.`id`) AS `id`
,COALESCE(b.`name`, a.`name`) AS `name`
,COALESCE(b.`age`, a.`age`) AS `age`
,COALESCE(b.`updated`, a.`updated`) AS `updated`
FROM dw_stage.`user` a
FULL OUTER JOIN (
-- 增量資料會包含重複資料,因此需要選擇最新的那一條
SELECT `id`, `name`, `age`, `updated`
FROM (
SELECT *, ROW_NUMBER() OVER (PARTITION BY `id` ORDER BY `updated` DESC) AS `n`
FROM dw_stage.`user_delta`
) b
WHERE `n` = 1
) b
ON a.`id` = b.`id`;
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
Hive 0.13 引入了事務和 ACID 表,0.14 開始支援 INSERT
、UPDATE
、DELETE
語句,Hive 2.0.0 則又新增了 Streaming Mutation API,用以通過程式設計的方式批量更新 Hive 表中的記錄。目前,ACID 表必須使用 ORC 檔案格式進行儲存,且須按主鍵進行分桶(Bucket)。Hive 會將變更記錄儲存在增量檔案中,當 OrcInputFormat
讀取資料時會自動定位到最新的那條記錄。官方案例可以在這個連結中檢視。
最後,我們可以使用 HBase 來實現表資料的更新,它是一種 KV 儲存系統,同樣基於 HDFS。HBase 的資料可以直接為 MapReduce 指令碼使用,且 Hive 中可以建立外部對映表指向 HBase。更多資訊請檢視官方網站。
初始化資料
資料抽取通常是按需進行的,在新增一張表時,資料來源中可能已經有大量原始記錄了。常見的做法是手工將這批資料全量匯入至目標表中,但我們也可以複用 Canal 這套機制來實現歷史資料的抽取。
首先,我們在資料來源庫中建立一張輔助表:
CREATE TABLE `retl_buffer` (
id BIGINT AUTO_INCREMENT PRIMARY KEY
,table_name VARCHAR(255)
,pk_value VARCHAR(255)
);
- 1
- 2
- 3
- 4
- 5
當需要全量抽取 user
表時,我們執行以下語句,將所有 user.id
寫入輔助表中:
INSERT INTO `retl_buffer` (`table_name`, `pk_value`)
SELECT 'user', `id` FROM `user`;
- 1
- 2
Canal 客戶端在處理到 retl_buffer
表的資料變更時,可以從中解析出表名和主鍵的值,直接反查資料來源,將資料寫入目標表:
if ("retl_buffer".equals(entry.getHeader().getTableName())) {
String tableName = rowData.getAfterColumns(1).getValue();
String pkValue = rowData.getAfterColumns(2).getValue();
System.out.println("SELECT * FROM " + tableName + " WHERE id = " + pkValue);
}
- 1
- 2
- 3
- 4
- 5
這一方法在阿里巴巴的另一個開源軟體 Otter 中使用。
Canal 高可用
- Canal 服務端中的例項可以配置一個備用 MySQL,從而能夠在雙 Master 場景下自動選擇正在工作的資料來源。注意兩臺主庫都需要開啟
log_slave_updates
選項。Canal 會使用自己的心跳機制(定期更新輔助表的記錄)來檢測主庫的存活。 - Canal 自身也有 HA 配置,配合 Zookeeper,我們可以開啟多個 Canal 服務端,當某臺伺服器宕機時,客戶端可以從 ZK 中獲取新的服務端地址,繼續進行消費。更多資訊可以參考 Canal AdminGuide。