【Maxwell】03 Targeted Monitoring & Full Output
阿新 • Published: 2022-01-26
I. Targeted Monitoring
Targeted monitoring means capturing changes from only one specific table or database.
1. Create the sample case
-- Create the database to monitor (demo sample)
CREATE DATABASE `test-db-2` CHARACTER SET 'utf8mb4' COLLATE 'utf8mb4_general_ci';

-- And the table to monitor (demo sample)
CREATE TABLE `test-db-2`.`sample` (
  `ID` bigint(20) NOT NULL AUTO_INCREMENT,
  `NAME` varchar(64) DEFAULT NULL,
  `TYPE` int(12) DEFAULT NULL,
  PRIMARY KEY (`ID`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4;
2. Parameters for targeted monitoring:
cd /usr/local/maxwell-1.29.2
./bin/maxwell \
  --user='maxwell' \
  --password='123456' \
  --host='192.168.2.225' \
  --port='3308' \
  --producer='stdout' \
  --filter="exclude:*.*, include:test-db-2.sample" \
  --jdbc_options='useSSL=false&serverTimezone=Asia/Shanghai'

# Targeted monitoring is implemented through the filter parameter:
#   --filter="exclude:*.*, include:test-db-2.sample"
# exclude: what to leave out (here, every table in every database)
# include: what to keep (here, only test-db-2.sample)
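To make the rule order concrete, here is a minimal Python sketch of how a filter such as `exclude:*.*, include:test-db-2.sample` could be evaluated. It assumes that rules are checked in order and that the last matching rule decides the outcome, which is how the Maxwell filter documentation describes it. `parse_filter` and `is_included` are hypothetical helper names, not part of Maxwell, and the sketch handles only literal names and the `*` wildcard (no regular expressions or column filters).

```python
def parse_filter(spec):
    """Parse 'exclude:*.*, include:db.tbl' into (action, db, table) tuples."""
    rules = []
    for part in spec.split(","):
        action, _, pattern = part.strip().partition(":")
        db, _, table = pattern.strip().partition(".")
        rules.append((action.strip(), db, table))
    return rules


def is_included(rules, database, table):
    """Return True if changes to database.table would be emitted."""
    included = True  # assumption: with no matching rule, everything passes
    for action, db_pat, tbl_pat in rules:
        if db_pat in ("*", database) and tbl_pat in ("*", table):
            # A later matching rule overrides an earlier one.
            included = (action == "include")
    return included


rules = parse_filter("exclude:*.*, include:test-db-2.sample")
print(is_included(rules, "test-db-2", "sample"))   # True
print(is_included(rules, "test-db", "day_sale"))   # False
```

Under this model, `exclude:*.*` first rejects everything, and the later `include:test-db-2.sample` rule re-admits only the one table.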
Operations on the sample table in that database are captured:
[root@localhost maxwell-1.29.2]# ./bin/maxwell \
> --user='maxwell' \
> --password='123456' \
> --host='192.168.2.225' \
> --port='3308' \
> --producer='stdout' \
> --filter="exclude:*.*, include:test-db-2.sample" \
> --jdbc_options='useSSL=false&serverTimezone=Asia/Shanghai'
Using kafka version: 1.0.0
17:01:57,280 INFO Maxwell - Starting Maxwell. maxMemory: 247332864 bufferMemoryUsage: 0.25
17:01:57,493 INFO Maxwell - Maxwell v1.29.2 is booting (StdoutProducer), starting at Position[BinlogPosition[mysql-bin.000005:656973], lastHeartbeat=1642495761716]
17:01:57,651 INFO MysqlSavedSchema - Restoring schema id 1 (last modified at Position[BinlogPosition[mysql-bin.000005:16191], lastHeartbeat=0])
17:01:57,819 INFO BinlogConnectorReplicator - Setting initial binlog pos to: mysql-bin.000005:656973
17:01:57,856 INFO BinaryLogClient - Connected to 192.168.2.225:3308 at mysql-bin.000005/656973 (sid:6379, cid:254)
17:01:57,856 INFO BinlogConnectorReplicator - Binlog connected.
17:01:58,149 INFO AbstractSchemaStore - storing schema @Position[BinlogPosition[mysql-bin.000005:657429], lastHeartbeat=1642495761716] after applying "CREATE DATABASE `test-db-2` CHARACTER SET 'utf8mb4' COLLATE 'utf8mb4_general_ci'" to test-db-2, new schema id is 2
17:01:58,223 INFO AbstractSchemaStore - storing schema @Position[BinlogPosition[mysql-bin.000005:657658], lastHeartbeat=1642495761716] after applying "CREATE TABLE `sample` ( `ID` bigint(20) NOT NULL AUTO_INCREMENT, `NAME` varchar(64) DEFAULT NULL, `TYPE` int(12) DEFAULT NULL, PRIMARY KEY (`ID`) ) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4" to test-db-2, new schema id is 3
{"database":"test-db-2","table":"sample","type":"insert","ts":1642496453,"xid":87865,"commit":true,"data":{"ID":1,"NAME":"1","TYPE":11}}
Operations in any other database are not captured by Maxwell. For example, the following insert produces no output:
INSERT INTO `test-db`.`day_sale` (`ID`, `PRODUCT`, `CHANNEL`, `AMOUNT`, `SALE_DATE`)
VALUES
  (NULL, '產品A', '拼多多', 2497.0000, NOW()),
  (NULL, '產品B', '京東', 2497.0000, NOW()),
  (NULL, '產品C', '淘寶', 2497.0000, NOW());
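II. Full Output
Full output initializes (bootstraps) existing data by modifying the metadata in the maxwell database, which is stored on the primary MySQL server.
(There are three other ways to do this; they are not listed here.)
Requirement: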
Export all existing rows of the sample table in the test-db-2 database (five rows in this run) and print them to the Maxwell console.
Steps:
INSERT INTO `maxwell`.`bootstrap` (
  `id`, `database_name`, `table_name`, `where_clause`, `is_complete`,
  `inserted_rows`, `total_rows`, `created_at`, `started_at`, `completed_at`,
  `binlog_file`, `binlog_position`, `client_id`, `comment`
) VALUES (
  NULL, 'test-db-2', 'sample', NULL, 0,
  0, 0, NULL, NULL, NULL,
  NULL, 0, 'maxwell', NULL
);
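When the same request has to be issued for several tables, the statement can be generated instead of typed by hand. The following is a minimal sketch: `build_bootstrap_insert` is a hypothetical helper, and it assumes that the columns it omits (`is_complete`, `inserted_rows`, and so on) fall back to the same 0/NULL defaults that are written out explicitly in the statement above.

```python
def build_bootstrap_insert(database, table, where_clause=None, client_id="maxwell"):
    """Return (sql, params) for a row that asks Maxwell to bootstrap one table."""
    sql = (
        "INSERT INTO `maxwell`.`bootstrap` "
        "(`database_name`, `table_name`, `where_clause`, `client_id`) "
        "VALUES (%s, %s, %s, %s)"
    )
    # Values are passed separately so the driver can escape them.
    return sql, (database, table, where_clause, client_id)


sql, params = build_bootstrap_insert("test-db-2", "sample")
print(sql)
print(params)
```

The returned pair is meant to be handed to `cursor.execute(sql, params)` of a DB-API driver that uses the `%s` parameter style, such as PyMySQL.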
Output log:
# This works without stopping Maxwell: Maxwell first captures the insert into the bootstrap table of the maxwell database, detects the bootstrap request, and then runs the synchronization
{"database":"maxwell","table":"bootstrap","type":"insert","ts":1642497749,"xid":90142,"commit":true,"data":{"id":2,"database_name":"test-db-2","table_name":"sample","where_clause":null,"is_complete":0,"inserted_rows":0,"total_rows":0,"created_at":null,"started_at":null,"completed_at":null,"binlog_file":null,"binlog_position":0,"client_id":"maxwell","comment":null}}
{"database":"test-db-2","table":"sample","type":"bootstrap-start","ts":1642497347,"data":{}}
17:15:47,541 INFO SynchronousBootstrapper - bootstrapping started for test-db-2.sample
{"database":"test-db-2","table":"sample","type":"bootstrap-insert","ts":1642497347,"data":{"ID":1,"NAME":"1","TYPE":11}}
{"database":"test-db-2","table":"sample","type":"bootstrap-insert","ts":1642497347,"data":{"ID":2,"NAME":"2","TYPE":22}}
{"database":"test-db-2","table":"sample","type":"bootstrap-insert","ts":1642497347,"data":{"ID":3,"NAME":"3","TYPE":33}}
{"database":"test-db-2","table":"sample","type":"bootstrap-insert","ts":1642497347,"data":{"ID":4,"NAME":"4","TYPE":44}}
{"database":"test-db-2","table":"sample","type":"bootstrap-insert","ts":1642497347,"data":{"ID":5,"NAME":"5","TYPE":55}}
{"database":"test-db-2","table":"sample","type":"bootstrap-complete","ts":1642497347,"data":{}}
17:15:47,592 INFO SynchronousBootstrapper - bootstrapping ended for #2 test-db-2.sample
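A consumer of this output has to separate Maxwell's own log lines from the JSON events and recognize the `bootstrap-start` / `bootstrap-insert` / `bootstrap-complete` sequence. The sketch below shows one possible way to do that; `summarize_bootstrap` is a hypothetical helper, and the input lines are copied from the log above.

```python
import json


def summarize_bootstrap(lines):
    """Count bootstrap-insert rows per table between start and complete events."""
    counts = {}    # (database, table) -> rows seen so far
    finished = {}  # (database, table) -> final row count
    for line in lines:
        line = line.strip()
        if not line.startswith("{"):
            continue  # skip Maxwell's own log lines
        event = json.loads(line)
        key = (event["database"], event["table"])
        kind = event["type"]
        if kind == "bootstrap-start":
            counts[key] = 0
        elif kind == "bootstrap-insert":
            counts[key] = counts.get(key, 0) + 1
        elif kind == "bootstrap-complete":
            finished[key] = counts.pop(key, 0)
    return finished


sample_output = [
    '{"database":"test-db-2","table":"sample","type":"bootstrap-start","ts":1642497347,"data":{}}',
    '17:15:47,541 INFO SynchronousBootstrapper - bootstrapping started for test-db-2.sample',
    '{"database":"test-db-2","table":"sample","type":"bootstrap-insert","ts":1642497347,"data":{"ID":1,"NAME":"1","TYPE":11}}',
    '{"database":"test-db-2","table":"sample","type":"bootstrap-insert","ts":1642497347,"data":{"ID":2,"NAME":"2","TYPE":22}}',
    '{"database":"test-db-2","table":"sample","type":"bootstrap-insert","ts":1642497347,"data":{"ID":3,"NAME":"3","TYPE":33}}',
    '{"database":"test-db-2","table":"sample","type":"bootstrap-insert","ts":1642497347,"data":{"ID":4,"NAME":"4","TYPE":44}}',
    '{"database":"test-db-2","table":"sample","type":"bootstrap-insert","ts":1642497347,"data":{"ID":5,"NAME":"5","TYPE":55}}',
    '{"database":"test-db-2","table":"sample","type":"bootstrap-complete","ts":1642497347,"data":{}}',
]
print(summarize_bootstrap(sample_output))  # {('test-db-2', 'sample'): 5}
```

The count of 5 matches the `inserted_rows` value that Maxwell records in `maxwell`.`bootstrap` for this run.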
Result after the synchronization completes:
SELECT * FROM `maxwell`.`bootstrap`;
+----+---------------+------------+--------------+-------------+---------------+------------+------------+---------------------+---------------------+-------------+-----------------+-----------+---------+
| id | database_name | table_name | where_clause | is_complete | inserted_rows | total_rows | created_at | started_at          | completed_at        | binlog_file | binlog_position | client_id | comment |
+----+---------------+------------+--------------+-------------+---------------+------------+------------+---------------------+---------------------+-------------+-----------------+-----------+---------+
|  2 | test-db-2     | sample     | NULL         |           1 |             5 |          0 | NULL       | 2022-01-18 17:22:30 | 2022-01-18 17:22:30 | NULL        |               0 | maxwell   | NULL    |
+----+---------------+------------+--------------+-------------+---------------+------------+------------+---------------------+---------------------+-------------+-----------------+-----------+---------+
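Once all the data has been bootstrapped, Maxwell's metadata row changes:
The is_complete field changes from 0 to 1
The started_at field changes from null to a timestamp (when the synchronization started)
The completed_at field changes from null to a timestamp (when the synchronization finished)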
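These field changes can be turned into a simple status check. The sketch below classifies one `maxwell`.`bootstrap` row that has already been fetched as a dict; `bootstrap_status` is a hypothetical helper, and the "running" state rests on the assumption that started_at is filled in before is_complete flips to 1.

```python
def bootstrap_status(row):
    """Classify one maxwell.bootstrap row (a dict) by its progress fields."""
    if row["is_complete"] == 1:
        return "complete"
    if row["started_at"] is not None:
        return "running"  # assumption: started_at is set before is_complete flips
    return "pending"


before = {"is_complete": 0, "started_at": None, "completed_at": None}
after = {
    "is_complete": 1,
    "started_at": "2022-01-18 17:22:30",
    "completed_at": "2022-01-18 17:22:30",
}
print(bootstrap_status(before))  # pending
print(bootstrap_status(after))   # complete
```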