1. 程式人生 > 其它 >【Maxwell】03 定向監聽&全量輸出

【Maxwell】03 定向監聽&全量輸出

一、定向監聽

定向監聽,即只監聽某一個特定的表,或者庫

1、建立樣本案例

-- 建立監聽的庫(演示樣本)
CREATE DATABASE `test-db-2` CHARACTER SET 'utf8mb4' COLLATE 'utf8mb4_general_ci';

-- 和需要監聽的表 (演示樣本)
CREATE TABLE `test-db-2`.`sample` (
`ID` bigint(20) NOT NULL AUTO_INCREMENT,
`NAME` varchar(64) DEFAULT NULL,
`TYPE` int(12) DEFAULT NULL,
PRIMARY
KEY (`ID`) ) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4;

2、定向監聽的引數項:

cd /usr/local/maxwell-1.29.2
./bin/maxwell \
--user='maxwell' \
--password='123456' \
--host='192.168.2.225' \
--port='3308' \
--producer='stdout' \
--filter="exclude:*.*, include:test-db-2.sample" \
--jdbc_options='useSSL=false&serverTimezone=Asia/Shanghai
' # 定向監聽,通過過濾器引數實現 --filter="exclude:*.*, include:test-db-2.sample" # exclude,表示排除 # include,表示包含

針對該庫的sample表進行操作,是能夠監聽的

[root@localhost maxwell-1.29.2]# ./bin/maxwell \
> --user='maxwell' \
> --password='123456' \
> --host='192.168.2.225' \
> --port='3308' \
> --producer='stdout
' \ > --filter="exclude:*.*, include:test-db-2.sample" \ > --jdbc_options='useSSL=false&serverTimezone=Asia/Shanghai' Using kafka version: 1.0.0 17:01:57,280 INFO Maxwell - Starting Maxwell. maxMemory: 247332864 bufferMemoryUsage: 0.25 17:01:57,493 INFO Maxwell - Maxwell v1.29.2 is booting (StdoutProducer), starting at Position[BinlogPosition[mysql-bin.000005:656973], lastHeartbeat=1642495761716] 17:01:57,651 INFO MysqlSavedSchema - Restoring schema id 1 (last modified at Position[BinlogPosition[mysql-bin.000005:16191], lastHeartbeat=0]) 17:01:57,819 INFO BinlogConnectorReplicator - Setting initial binlog pos to: mysql-bin.000005:656973 17:01:57,856 INFO BinaryLogClient - Connected to 192.168.2.225:3308 at mysql-bin.000005/656973 (sid:6379, cid:254) 17:01:57,856 INFO BinlogConnectorReplicator - Binlog connected. 17:01:58,149 INFO AbstractSchemaStore - storing schema @Position[BinlogPosition[mysql-bin.000005:657429], lastHeartbeat=1642495761716] after applying "CREATE DATABASE `test-db-2` CHARACTER SET 'utf8mb4' COLLATE 'utf8mb4_general_ci'" to test-db-2, new schema id is 2 17:01:58,223 INFO AbstractSchemaStore - storing schema @Position[BinlogPosition[mysql-bin.000005:657658], lastHeartbeat=1642495761716] after applying "CREATE TABLE `sample` ( `ID` bigint(20) NOT NULL AUTO_INCREMENT, `NAME` varchar(64) DEFAULT NULL, `TYPE` int(12) DEFAULT NULL, PRIMARY KEY (`ID`) ) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4" to test-db-2, new schema id is 3 {"database":"test-db-2","table":"sample","type":"insert","ts":1642496453,"xid":87865,"commit":true,"data":{"ID":1,"NAME":"1","TYPE":11}}

如果在其他庫操作,Maxwell則是不會監聽的

INSERT INTO `test-db`.`day_sale` (`ID`, `PRODUCT`, `CHANNEL`, `AMOUNT`, `SALE_DATE`) VALUES 
(NULL, '產品A', '拼多多', 2497.0000, NOW()),
(NULL, '產品B', '京東', 2497.0000, NOW()),
(NULL, '產品C', '淘寶', 2497.0000, NOW())

二、全量輸出

全量輸出,通過主庫存放的maxwell庫的元資料的更改,實現資料初始化

(除了這種方法還有其他3種,未列舉出來)

需求:

將 test-db-2 庫下的 sample 表的四條資料,全量匯入到 maxwell 控制檯進行列印。

操作:

INSERT INTO `maxwell`.`bootstrap` (`id`, `database_name`, `table_name`, `where_clause`, `is_complete`, `inserted_rows`, `total_rows`, `created_at`, `started_at`, `completed_at`, `binlog_file`, `binlog_position`, `client_id`, `comment`) VALUES 
(NULL, 'test-db-2', 'sample', NULL, 0, 0, 0, NULL, NULL, NULL, NULL, 0, 'maxwell', NULL);

輸出日誌

# 不關閉maxwell也能執行,maxwell先監聽了maxwell庫的bootstrap表的操作,發現了初始化操作,然後執行同步
{"database":"maxwell","table":"bootstrap","type":"insert","ts":1642497749,"xid":90142,"commit":true,"data":{"id":2,"database_name":"test-db-2","table_name":"sample","where_clause":null,"is_complete":0,"inserted_rows":0,"total_rows":0,"created_at":null,"started_at":null,"completed_at":null,"binlog_file":null,"binlog_position":0,"client_id":"maxwell","comment":null}}
{"database":"test-db-2","table":"sample","type":"bootstrap-start","ts":1642497347,"data":{}}
17:15:47,541 INFO  SynchronousBootstrapper - bootstrapping started for test-db-2.sample
{"database":"test-db-2","table":"sample","type":"bootstrap-insert","ts":1642497347,"data":{"ID":1,"NAME":"1","TYPE":11}}
{"database":"test-db-2","table":"sample","type":"bootstrap-insert","ts":1642497347,"data":{"ID":2,"NAME":"2","TYPE":22}}
{"database":"test-db-2","table":"sample","type":"bootstrap-insert","ts":1642497347,"data":{"ID":3,"NAME":"3","TYPE":33}}
{"database":"test-db-2","table":"sample","type":"bootstrap-insert","ts":1642497347,"data":{"ID":4,"NAME":"4","TYPE":44}}
{"database":"test-db-2","table":"sample","type":"bootstrap-insert","ts":1642497347,"data":{"ID":5,"NAME":"5","TYPE":55}}
{"database":"test-db-2","table":"sample","type":"bootstrap-complete","ts":1642497347,"data":{}}
17:15:47,592 INFO  SynchronousBootstrapper - bootstrapping ended for #2 test-db-2.sample

同步完成的結果:

SELECT * FROM `maxwell`.`bootstrap`;
+----+---------------+------------+--------------+-------------+---------------+------------+------------+---------------------+---------------------+-------------+-----------------+-----------+---------+
| id | database_name | table_name | where_clause | is_complete | inserted_rows | total_rows | created_at | started_at          | completed_at        | binlog_file | binlog_position | client_id | comment |
+----+---------------+------------+--------------+-------------+---------------+------------+------------+---------------------+---------------------+-------------+-----------------+-----------+---------+
|  2 | test-db-2     | sample     | NULL         |           1 |             5 |          0 | NULL       | 2022-01-18 17:22:30 | 2022-01-18 17:22:30 | NULL        |               0 | maxwell   | NULL    |
+----+---------------+------------+--------------+-------------+---------------+------------+------------+---------------------+---------------------+-------------+-----------------+-----------+---------+

當資料全部初始化完成以後,Maxwell 的元資料會變化
is_complete 欄位從 0 變為 1
started_at 欄位從 null 變為具體時間(資料同步開始時間)
completed_at 欄位從 null 變為具體時間(資料同步結束時間)