Real-Time Synchronization of MySQL Data to a Data Warehouse with Canal
阿新 • Published: 2021-01-14
Using Canal for incremental subscription and consumption of the MySQL binlog, then storing the data into MongoDB via an ETL process.
1: Business databases and data warehouses, in brief:
- The data structures in a business database are designed to complete transactions, not for convenient querying and analysis.
- Business databases are mostly read-write optimized: they must both read (view product details) and write (create orders, complete payments). They therefore support poorly the large, complex, read-only queries typical of analytics.
- Analysis eventually evolves into fine-grained cohort analysis of specific users in specific scenarios, for example "the relationship between the Q1 apparel-supplier purchasing behavior of overseas versus domestic users over the past five years and the company's promotional campaigns" — queries a transactional schema is not built to answer.
2: What a data warehouse provides:
- A data structure designed for convenient analysis and querying;
- A read-optimized store: write speed matters little, as long as complex queries over large volumes of data are fast enough.
The former kind (optimized for both reads and writes) is the business database; the latter is the analytical database, i.e. the data warehouse.
This post records using Canal to listen for MySQL binlog events, implementing incremental subscription and consumption, and storing the results into MongoDB (chosen here as the data warehouse).
Extracting data from the business database, processing it, and loading it into the analytical database is the classic ETL job:
- ETL stands for Extract-Transform-Load: extracting data from a source, transforming it, and loading it into a destination.
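The three steps can be sketched end to end. Everything below is an illustrative, in-memory stand-in — the `EtlSketch` class, its row shapes, and the sample data are hypothetical, not taken from the project in this post:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Minimal ETL sketch: extract raw rows, transform them, load them into a target. */
public class EtlSketch {

    // Extract: pull raw rows from the source (an in-memory stand-in for MySQL).
    static List<Map<String, String>> extract() {
        Map<String, String> row = new LinkedHashMap<>();
        row.put("order_id", "1001");
        row.put("amount", "25.50");
        List<Map<String, String>> rows = new ArrayList<>();
        rows.add(row);
        return rows;
    }

    // Transform: reshape each row for analytical querying (here, type conversion).
    static Map<String, Object> transform(Map<String, String> row) {
        Map<String, Object> doc = new LinkedHashMap<>();
        doc.put("order_id", Long.parseLong(row.get("order_id")));
        doc.put("amount", Double.parseDouble(row.get("amount")));
        return doc;
    }

    // Load: write the transformed documents into the target (a stand-in for MongoDB).
    static List<Map<String, Object>> load(List<Map<String, Object>> docs) {
        return new ArrayList<>(docs);
    }

    public static void main(String[] args) {
        List<Map<String, Object>> docs = new ArrayList<>();
        for (Map<String, String> row : extract()) {
            docs.add(transform(row));
        }
        List<Map<String, Object>> warehouse = load(docs);
        System.out.println(warehouse.size() + " " + warehouse.get(0).get("order_id"));
    }
}
```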
3: Technologies involved: Docker, Canal, MySQL, MongoDB
A brief look at how Canal works
In ROW-level mode, the binlog does not need to record the context of the executed SQL statements; it only records which rows were modified, so a row-level log captures the details of every row modification very clearly. This avoids the cases where, under certain conditions, stored procedures, functions, or trigger invocations cannot be replicated correctly.
4: Check and set MySQL's binlog format to ROW
mysql> SHOW VARIABLES LIKE 'binlog_format';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| binlog_format | ROW   |
+---------------+-------+
1 row in set (1.58 sec)
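If `binlog_format` is not ROW, it can be enabled in the MySQL configuration file and the server restarted. A typical snippet (the file location, log name, and `server-id` vary by installation):

```ini
[mysqld]
log-bin=binlog
binlog-format=ROW
server-id=1
```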
- After confirming ROW binlog mode, create a MySQL account for Canal:
CREATE USER canaletl IDENTIFIED BY 'canal';
ALTER USER 'canaletl'@'%' IDENTIFIED WITH mysql_native_password BY 'canalpass';
GRANT ALL PRIVILEGES ON *.* TO 'canaletl'@'%';
FLUSH PRIVILEGES;
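`GRANT ALL` works but is broader than necessary. Canal's QuickStart documentation recommends granting only replication and read privileges:

```sql
-- Minimal privileges Canal needs to read the binlog and table metadata:
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canaletl'@'%';
FLUSH PRIVILEGES;
```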
5: Launch a reasonably recent and stable Canal (v1.1.4) with Docker
docker run -d --name=canal --network=cluster-net -p 11111:11111 \
  -e canal.destinations=qcdbv2 \
  -e canal.instance.master.address=mysql8:3306 \
  -e canal.instance.dbUsername=canaletl \
  -e canal.instance.dbPassword=canal1qaz2wsx \
  -e canal.instance.connectionCharset=UTF-8 \
  -e canal.user=canal \
  -e canal.passwd=canal \
  canal/canal-server:v1.1.4
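The same container can be captured in a Compose file. This is a sketch mirroring the flags above; the `cluster-net` network and the `mysql8` host are assumed to already exist:

```yaml
services:
  canal:
    image: canal/canal-server:v1.1.4
    container_name: canal
    ports:
      - "11111:11111"
    environment:
      - canal.destinations=qcdbv2
      - canal.instance.master.address=mysql8:3306
      - canal.instance.dbUsername=canaletl
      - canal.instance.dbPassword=canal1qaz2wsx
      - canal.instance.connectionCharset=UTF-8
      - canal.user=canal
      - canal.passwd=canal
networks:
  default:
    name: cluster-net
    external: true
```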
6: After starting, check the running containers:
root@server:~# docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
cdd169d7b076 canal/canal-server:v1.1.4 "/alidata/bin/main.s…" 1 hours ago Up 1 hours 9100/tcp, 11110/tcp, 11112/tcp, 0.0.0.0:11111->11111/tcp canal
7: Check Canal's logs to confirm it is successfully listening to MySQL
[root@62a5f11f7e15 admin]# tail -f -n100 canal-server/logs/test/test.log
2020-04-22 15:16:03.997 [Thread-6] INFO c.a.otter.canal.instance.core.AbstractCanalInstance - stop CannalInstance for null-mysqldb
2020-04-22 15:16:04.000 [Thread-6] INFO c.a.otter.canal.instance.core.AbstractCanalInstance - stop successful....
2020-04-22 15:41:47.637 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
2020-04-22 15:41:47.640 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [mysqldb/instance.properties]
2020-04-22 15:41:47.757 [main] WARN o.s.beans.GenericTypeAwarePropertyDescriptor - Invalid JavaBean property 'connectionCharset' being accessed! Ambiguous write methods found next to actually used [public void com.alibaba.otter.canal.parse.inbound.mysql.AbstractMysqlEventParser.setConnectionCharset(java.lang.String)]: [public void com.alibaba.otter.canal.parse.inbound.mysql.AbstractMysqlEventParser.setConnectionCharset(java.nio.charset.Charset)]
2020-04-22 15:41:47.784 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
2020-04-22 15:41:47.785 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [mysqldb/instance.properties]
2020-04-22 15:41:48.083 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-mysqldb
2020-04-22 15:41:48.092 [main] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^.*\..*$
2020-04-22 15:41:48.093 [main] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table black filter :
2020-04-22 15:41:48.183 [destination = mysqldb , address = mysql8/10.0.1.2:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> begin to find start position, it will be long time for reset or first position
2020-04-22 15:41:48.185 [main] INFO c.a.otter.canal.instance.core.AbstractCanalInstance - subscribe filter change to mysqldb\..*
2020-04-22 15:41:48.185 [main] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^mysqldb\..*$
2020-04-22 15:41:48.185 [main] INFO c.a.otter.canal.instance.core.AbstractCanalInstance - start successful....
2020-04-22 15:41:48.204 [destination = mysqldb , address = mysql8/10.0.1.2:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - prepare to find start position just last position
{"identity":{"slaveId":-1,"sourceAddress":{"address":"mysql8","port":3306}},"postion":{"gtid":"","included":false,"journalName":"binlog.000006","position":30019,"serverId":1,"timestamp":1587534667000}}
2020-04-22 15:41:48.593 [destination = mysqldb , address = mysql8/10.0.1.2:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=binlog.000006,position=30019,serverId=1,gtid=,timestamp=1587534667000] cost : 401ms , the next step is binlog dump
8: Write the ETL code. Its main job is to receive Canal's binlog subscription events as a byte stream, convert them into collections MongoDB can store, and connect to MongoDB to perform inserts, updates, deletes, and queries.
- For example, the client class CanalClientStarter:
/**
 * Filter rule syntax (applies to instance.properties and the consumer's subscribe()):
 * 1) all tables: .* or .*\\..*
 * 2) all tables under the canal schema: canal\\..*
 * 3) tables under canal whose names start with "canal": canal\\.canal.*
 * 4) a single table under the canal schema: canal.test1
 * 5) multiple rules combined, comma-separated: canal\\..*,mysql.test1,mysql.test2
 */
private String heartBeatCollection4Mongo;

public void init() {
    doHeartBeat();
    Thread thread = new Thread(new Runnable() {
        @Override
        public void run() {
            CanalConnector connector = getCanalConnector();
            int batchSize = 100;
            int retryCnt = 0;
            while (running) {
                try {
                    connector.connect();
                    connector.subscribe(canalFilterReg); // e.g. all tables under a schema: canal\\..*
                    connector.rollback();
                    while (running) {
                        // Fetch up to batchSize entries without auto-acknowledging them
                        Message message = connector.getWithoutAck(batchSize);
                        long batchId = message.getId();
                        int size = message.getEntries().size();
                        logger.debug("canal subscribed batchId:" + batchId);
                        if (batchId == -1 || size == 0) {
                            try {
                                Thread.sleep(10000);
                            } catch (InterruptedException e) {
                                logger.error(e.getMessage(), e);
                            }
                        } else {
                            boolean res = consumeCanalMessage(message.getEntries());
                            if (res) {
                                connector.ack(batchId);      // commit the batch
                            } else {
                                connector.rollback(batchId); // redeliver on failure
                            }
                        }
                        doHeartBeat();
                    }
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                    try {
                        Thread.sleep(10000);
                    } catch (InterruptedException e1) {
                        // ignore and retry
                    }
                } finally {
                    retryCnt++;
                    logger.debug("retry " + retryCnt);
                    if (retryCnt > 20) {
                        connector.disconnect();
                        connector = getCanalConnector();
                        retryCnt = 0;
                    }
                }
            }
            logger.info("ETL service exiting");
            mongoTemplate.save(new HeartBeatDoc(new Date(), destination, zkServers),
                    heartBeatCollection4Mongo + "_kill");
        }
    });
    thread.start();
}

protected CanalConnector getCanalConnector() {
    CanalConnector connector = CanalConnectors.newSingleConnector(
            new InetSocketAddress(canalServerHost, canalServerPort), destination, canalUser, canalPwd);
    // CanalConnector connector = CanalConnectors.newClusterConnector(zkServers, destination, canalUser, canalPwd);
    return connector;
}

protected boolean consumeCanalMessage(List<Entry> entries) {
    for (Entry entry : entries) {
        if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN
                || entry.getEntryType() == EntryType.TRANSACTIONEND) {
            continue;
        }
        RowChange rowChange = null;
        try {
            rowChange = RowChange.parseFrom(entry.getStoreValue());
        } catch (Exception e) {
            throw new RuntimeException(
                    "ERROR ## parser of rowChange-event has an error , data:" + entry.toString(), e);
        }
        EventType eventType = rowChange.getEventType();
        String tableName = entry.getHeader().getTableName();
        long logfileOffset = entry.getHeader().getLogfileOffset();
        logger.debug(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                entry.getHeader().getLogfileName(), logfileOffset, entry.getHeader().getSchemaName(),
                tableName, eventType));
        if (!etlServiceScannerAndDispatcher.matchEtlService(tableName, eventType)) {
            logger.info("no etl service bean matching db operation:" + eventType + " on " + tableName);
            continue;
        }
        for (RowData rowData : rowChange.getRowDatasList()) {
            if (eventType == EventType.DELETE) {
                printColumn(rowData.getBeforeColumnsList());
            } else if (eventType == EventType.INSERT) {
                printColumn(rowData.getAfterColumnsList());
            } else {
                printColumn(rowData.getBeforeColumnsList());
                printColumn(rowData.getAfterColumnsList());
            }
            etlServiceScannerAndDispatcher.doEtl(tableName, eventType, rowData);
        }
    }
    return true;
}
......
}
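The dispatcher (etlServiceScannerAndDispatcher) and the per-table ETL services are not shown above. The heart of such a service is turning one row's columns into a document MongoDB can store. Below is a minimal, self-contained sketch of that transform; the `Col` class is a hypothetical stand-in for Canal's `CanalEntry.Column` (name, value, isKey), and a plain map stands in for MongoDB's `Document`. In the real code you would iterate `rowData.getAfterColumnsList()` and pass the result to `mongoTemplate.save(...)`:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Sketch of an ETL service's transform step: flatten a row's columns into a
 * document-shaped map keyed by column name, with the primary key stored
 * under "_id" (MongoDB's id field).
 */
public class RowToDocument {

    /** Hypothetical stand-in for Canal's CanalEntry.Column. */
    public static final class Col {
        final String name;
        final String value;
        final boolean isKey;
        public Col(String name, String value, boolean isKey) {
            this.name = name;
            this.value = value;
            this.isKey = isKey;
        }
    }

    /** Build a MongoDB-ready document map from the row's columns. */
    public static Map<String, Object> toDocument(List<Col> columns) {
        Map<String, Object> doc = new LinkedHashMap<>();
        for (Col c : columns) {
            if (c.isKey) {
                doc.put("_id", c.value); // primary key becomes the document id
            } else {
                doc.put(c.name, c.value);
            }
        }
        return doc;
    }

    public static void main(String[] args) {
        List<Col> row = List.of(
                new Col("id", "42", true),
                new Col("status", "PAID", false));
        System.out.println(toDocument(row));
    }
}
```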
9: Start the ETL project
Once heartbeats appear in the log, the client has successfully connected to Canal:
11:36:20DEBUG[by:][CanalClientStarter.run:L73]canal subscribed batchId:-1
11:36:30DEBUG[by:][CanalClientStarter.doHeartBeat:L176]Canal client HeartBeat...
11:36:30DEBUG[by:][CanalClientStarter.run:L73]canal subscribed batchId:-1
11:36:40DEBUG[by:][CanalClientStarter.doHeartBeat:L176]Canal client HeartBeat...
11:36:40DEBUG[by:][CanalClientStarter.run:L73]canal subscribed batchId:-1
11:36:50DEBUG[by:][CanalClientStarter.doHeartBeat:L176]Canal client HeartBeat...
11:36:50DEBUG[by:][CanalClientStarter.run:L73]canal subscribed batchId:-1
11:37:00DEBUG[by:][CanalClientStarter.doHeartBeat:L176]Canal client HeartBeat...
11:37:00DEBUG[by:][CanalClientStarter.run:L73]canal subscribed batchId:-1
11:37:10DEBUG[by:][CanalClientStarter.doHeartBeat:L176]Canal client HeartBeat...
11:37:10DEBUG[by:][CanalClientStarter.run:L73]canal subscribed batchId:-1
11:37:20DEBUG[by:][CanalClientStarter.doHeartBeat:L176]Canal client HeartBeat...
11:37:20DEBUG[by:][CanalClientStarter.run:L73]canal subscribed batchId:-1
11:37:30DEBUG[by:][CanalClientStarter.doHeartBeat:L176]Canal client HeartBeat...
11:37:30DEBUG[by:][CanalClientStarter.run:L73]canal subscribed batchId:-1
11:37:40DEBUG[by:][CanalClientStarter.doHeartBeat:L176]Canal client HeartBeat...
11:37:40DEBUG[by:][CanalClientStarter.run:L73]canal subscribed batchId:-1
11:37:50DEBUG[by:][CanalClientStarter.doHeartBeat:L176]Canal client HeartBeat...
11:37:50DEBUG[by:][CanalClientStarter.run:L73]canal subscribed batchId:-1
- Perform table changes in MySQL. Once the initial shadow-table load has run, operations such as initializing tables and updating rows generate the binlog events the ETL listens for.
10: Putting the flow together
Canal picks up binlog events for incremental subscription:
2020-04-22 15:41:48.204 [destination = mysqldb , address = mysql8/10.0.1.2:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - prepare to find start position just last position
{"identity":{"slaveId":-1,"sourceAddress":{"address":"mysql8","port":3306}},"postion":{"gtid":"","included":false,"journalName":"binlog.000006","position":30019,"serverId":1,"timestamp":1587534667000}}
2020-04-22 15:41:48.593 [destination = mysqldb , address = mysql8/10.0.1.2:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=binlog.000006,position=30019,serverId=1,gtid=,timestamp=1587534667000] cost : 401ms , the next step is binlog dump
2020-04-23 11:59:37.220 [New I/O server worker #1-3] INFO c.a.otter.canal.instance.core.AbstractCanalInstance - subscribe filter change to mysqldb\..*
2020-04-23 11:59:37.220 [New I/O server worker #1-3] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^mysqldb\..*$
The ETL receives the data Canal subscribed to, transforms and stores it into MongoDB, and logs the updates:
14:01:44DEBUG[by:][CanalClientStarter.consumeCanalMessage:L140]================> binlog[binlog.000009:721728] , name[mysqldb,fi_task_etl_init] , eventType : INSERT
14:01:44DEBUG[by:][CanalClientStarter.consumeCanalMessage:L140]================> binlog[binlog.000009:729938] , name[mysqldb,fi_task_etl_init] , eventType : INSERT
14:01:44DEBUG[by:][CanalClientStarter.doHeartBeat:L177]Canal client HeartBeat...
14:01:44DEBUG[by:][CanalClientStarter.run:L73]canal subscribed batchId:49
14:01:44DEBUG[by:][CanalClientStarter.consumeCanalMessage:L140]================> binlog[binlog.000009:737877] , name[mysqldb,fi_task_etl_init] , eventType : INSERT
14:01:44DEBUG[by:][CanalClientStarter.consumeCanalMessage:L140]================> binlog[binlog.000009:745600] , name[mysqldb,fi_task_etl_init] , eventType : INSERT
14:01:44DEBUG[by:][CanalClientStarter.doHeartBeat:L177]Canal client HeartBeat...
14:01:44DEBUG[by:][CanalClientStarter.run:L73]canal subscribed batchId:50
14:01:44DEBUG[by:][CanalClientStarter.consumeCanalMessage:L140]================> binlog[binlog.000009:753293] , name[mysqldb,fi_task_etl_init] , eventType : INSERT
14:01:44DEBUG[by:][CanalClientStarter.consumeCanalMessage:L140]================> binlog[binlog.000009:761419] , name[mysqldb,fi_task_etl_init] , eventType : INSERT
14:01:44DEBUG[by:][CanalClientStarter.doHeartBeat:L177]Canal client HeartBeat...
14:01:44DEBUG[by:][CanalClientStarter.run:L73]canal subscribed batchId:51
Checking MongoDB confirms that the newly added collections are indeed there.
Overall pipeline:
MySQL --> Canal --> ETL --> MongoDB