實時抽取mysql資料工具之maxwell
利用Maxwell元件實時監聽mysql的binlog日誌,並且把解析的json格式資料傳送到kafka視窗供實時消費
文件主題:
如何使用Maxwell實時監聽Mysql的binlog日誌,並且把解析的json格式資料傳送到kafka視窗
具體步驟
一:在linux環境下安裝部署好mysql
1 開啟binlog
sudo vi /etc/my.cnf
2 mysql的binlog格式有3種,為了把binlog解析成json資料格式,要設定binlog的格式為row(binlog有三種格式:Statement、Row以及Mixed)
server-id=1
log-bin=master(這一步開啟binlog)
binlog_format=row
3重啟msyql服務
sudo service mysqld restart
4檢視是否已經開啟binlog
Mysql>show variables like '%log_bin%';
此時,可以在目錄/var/lib/mysql下看到生成了相應的binlog監聽日誌檔案,如圖,master.000001檔案,每次重啟msyql服務,就會生成一個新的監聽檔案
第二大步驟:配置Maxwell相關的部署工作
1下載Maxwell
官網
http://maxwells-daemon.io/
元件下載連結
https://github.com/zendesk/maxwell/releases/download/v1.10.7/maxwell-1.10.7.tar.gz
2 安裝Maxwell
tar-zxf maxwell-1.10.6.tar.gz -C ../modules/
3給mysql授權
mysql> GRANT ALL on maxwell.* to'maxwell'@'%' identified by 'XXXXXX';
mysql> GRANT SELECT, REPLICATION CLIENT,REPLICATION SLAVE on *.* to 'maxwell'@'%';
以上圖片為官網參考
以我自己的為例:
GRANTALL on *.* to 'user01'@'%' identified by '123456';
把所有資料庫的所有表授權給user01使用者以密碼123456登入
GRANTSELECT, REPLICATION CLIENT, REPLICATION SLAVE on *.* to 'user01'@'%';
flushprivileges;
4開啟maxwell命令列(注意,如果沒有設定,maxwell預設是把監聽的mysql的binlog日誌傳送到kafka的主題叫maxwell的topic上的)
bin/maxwell --user='maxwell' --password='XXXXXX' --host='127.0.0.1' \
--producer=kafka --kafka.bootstrap.servers=localhost:9092
以上為官網參考,我自己的具體的demo
bin/maxwell --user='user01' --password='123456'--host='192.168.136.129' --producer=kafka --kafka.bootstrap.servers=mw:9092解釋:host引數是安裝mysql的那臺主機,最後的kafka.bootstrap.servers是安裝kafka叢集的節點主機名和埠號
第三大步驟,kafka相關配置
說明(以下我的kafka是安裝在主機名叫mw,注意kafka裡的配置檔案埠號要和命令列裡給的埠號一致)
1首先啟動zookeeper
$sbin/zkServer.sh start
2開啟kafka命令列
bin/kafka-server-start.shconfig/server.properties
3建立一個主題叫maxwell以便於接受資料
bin/kafka-topics.sh--create --zookeeper mw:2181 --replication-factor 1 --partitions 1 --topic maxwell
4啟動生產者視窗
bin/kafka-console-producer.sh--broker-list mw:9092 --topic maxwell
5啟動消費者視窗
bin/kafka-console-consumer.sh--zookeeper mw:2181 --topic maxwell --from-beginning
第四大步:最終測試
此時,mysql的binlog已經開啟,maxwell命令列也開啟,kafka的生產者消費者視窗也開啟在監聽
開始往mysql裡插入修改資料
通過寫的jdbc寫的多執行緒程式批量插入修改資料的程式造資料
此時,檢視kafka的消費者視窗。可以看到,mysql插入修改的資料,以json形式被maxwell傳送到了kafka以maxwell主題命名的窗口裡
第五 額外補充
1在第三大步驟裡的第四小步開啟maxwell命令列時,可以多跟幾個引數對mysql的binlog進行過濾,只篩選某些資料庫裡的某些表
include_dbs,exclude_dbs,include_tables,exclude_tables
2注意點:kafka的安裝版本要和maxwell相容適應,不然會報錯
Kafka clients 0.8 and 0.9 are compatible with brokers running kafka 0.8.
0.10.0.x clients only support 0.10.0.x or later brokers.
Mixing Kafka 0.10 with other versions can lead to serious performance impacts.
最好安裝kafka0.10版本的吧