1. 程式人生 > >資料收集之binlog同步----Maxwell

資料收集之binlog同步----Maxwell

簡介

Maxwell是由Java語言編寫,Zendesk開源的binlog解析同步工具。可通過簡單配置,將binlog解析並以json的格式同步到如file,kafka,redis,RabbitMQ等系統中。也可自定義輸出。相比Canal,Maxwell相當於Canal Server+Canal Client。

安裝

配置MySQL

MySQL 開啟Binlog

#開啟binlog
#修改my.cnf配置檔案 增加如下內容
[[email protected] /root]# vim /etc/my.cnf

[mysqld]
#binlog檔案儲存目錄及binlog檔名字首
#binlog檔案儲存目錄: /var/lib/mysql/ #binlog檔名字首: mysql-binlog #mysql向檔名字首新增數字字尾來按順序建立二進位制日誌檔案 如mysql-binlog.000006 mysql-binlog.000007 log-bin=/var/lib/mysql/mysql-binlog #選擇基於行的日誌記錄方式 binlog-format=ROW #伺服器 id #binlog資料中包含server_id,標識該資料是由那個server同步過來的 server_id=1

MySQL 配置許可權

CREATE USER 'maxwell_sync'@'%' IDENTIFIED BY
'maxwell_sync_1';
-- Maxwell需要在待同步的庫上建立schema_database庫,將狀態儲存在`schema_database`選項指定的資料庫中(預設為`maxwell`) GRANT ALL on maxwell.* to 'maxwell_sync'@'%'; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'maxwell_sync'@'%'; FLUSH PRIVILEGES;

MySQL 建庫建表

create database test_maxwell;
use test_maxwell;
create
table if not exists `user_info`( `userid` int, `name` varchar(100), `age` int )engine=innodb default charset=utf8;

配置Maxwell

下載解壓

[root@node2 /data/software]# wget https://github.com/zendesk/maxwell/releases/download/v1.17.1/maxwell-1.17.1.tar.gz

[root@node2 /data/software]# tar -zxvf maxwell-1.17.1.tar.gz

解析binlog並同步至kafka並配置監控

啟動maxwell
#輸入來源於mysql binlog 
#輸出到kafka
#配置說明
#1)kafka_topic 
#可配置成如 namespace_%{database}_%{table} %{database} 和 %{table}會被替換成真正的值
#2)kafka_version 
#注意和kafka版本匹配。
#3)額外配置 
#kafka.acks、kafka.compression.type、kafka.retries
#4)filter
#可排除庫、表、過濾掉某些行。也可用一段js靈活處理資料 
#如 exclude: test_maxwell.user_info.userid = 1 排除test_maxwell庫user_info表userid值為1的行
#5)monitor
#可配置的監控方式jmx、http等
#http_bind_address 監控繫結的IP
#http_port 監控繫結的Port
#http_path_prefix http請求字首

[[email protected] /data/software/maxwell-1.17.1]# bin/maxwell \
--host='localhost' \
--port=3306 \
--user='maxwell_sync' \
--password='maxwell_sync_1' \
--filter='exclude: *.*,include:test_maxwell.user_info,exclude: test_maxwell.user_info.userid = 1' \
--producer=kafka \
--kafka_version='0.11.0.1' \
--kafka.bootstrap.servers='node1:6667,node2:6667,node3:6667' \
--kafka_topic=qaTopic \
--metrics_type=http \
--metrics_jvm=true \
--http_bind_address=node2 \
--http_port=8090 \
--http_path_prefix=db_test_maxwell

#輸出到控制檯用如下配置

[[email protected] /data/software/maxwell-1.17.1]# bin/maxwell \
--host='localhost' \
--port=3306 \
--user='maxwell_sync' \
--password='maxwell_sync_1' \
--producer=stdout
kafka消費者客戶端
[[email protected] /usr/hdp/2.6.4.0-91/kafka]# bin/kafka-console-consumer.sh --bootstrap-server 192.168.113.101:6667 --topic qaTopic
解析Insert
#sql insert 3條資料
mysql> insert into user_info(userid,name,age) values (1,'name1',10),(2,'name2',20),(3,'name3',30);

#kafka-console-consumer結果
#userid=1的資料被過濾掉了
{"database":"test_maxwell","table":"user_info","type":"insert","ts":1533857131,"xid":10571,"xoffset":0,"data":{"userid":2,"name":"name2","age":20}}
{"database":"test_maxwell","table":"user_info","type":"insert","ts":1533857131,"xid":10571,"commit":true,"data":{"userid":3,"name":"name3","age":30}}
解析Delete
#sql delete
mysql> delete from user_info where userid=2;

#kafka-console-consumer結果
{"database":"test_maxwell","table":"user_info","type":"delete","ts":1533857183,"xid":10585,"commit":true,"data":{"userid":2,"name":"name2","age":20}}
解析Update
#sql update
mysql> update user_info set name='name3',age=23 where userid=3;

#maxwell解析結果
{"database":"test_maxwell","table":"user_info","type":"update","ts":1533857219,"xid":10595,"commit":true,"data":{"userid":3,"name":"name3","age":23},"old":{"age":30}}
檢視監控
訊息處理速度與JVM
成功傳送到Kafka的訊息數、傳送失敗的訊息數
已從binlog處理的行數、消費binlog速度、jvm狀態
http://node2:8090/db_test_maxwell/metrics

返回:
counters: {
MaxwellMetrics.messages.failed: {
count: 0
},
MaxwellMetrics.messages.succeeded: {
count: 0
},
MaxwellMetrics.row.count: {
count: 84
}
}
......
maxwell健康狀態
http://node2:8090/db_test_maxwell/healthcheck

返回:
{
MaxwellHealth: {
healthy: true
}
}
ping
http://node2:8090/db_test_maxwell/pingping通返回字串pong

Maxwell優缺點

優點

(1) 相比較canal,配置簡單,開箱即用。
(2) 可自定義傳送目的地(java 繼承類,實現方法),資料處理靈活(js)。
(3) 自帶多種監控。

缺點

(1) 需要在待同步的業務庫上建schema_database庫(預設maxwell),用於存放元資料,如binlog消費偏移量。但按maxwell的意思,這並不是缺點。
(2) 不支援HA。而canal可通過zookeeper實現canal server和canal client的HA,實現failover。

相關推薦

資料收集binlog同步----Maxwell

簡介 Maxwell是由Java語言編寫,Zendesk開源的binlog解析同步工具。可通過簡單配置,將binlog解析並以json的格式同步到如file,kafka,redis,RabbitMQ等

資料收集Filebeat

  Filebeat採用Go語言開發,也可用於日誌收集,相較於的Logstash,更輕量,資源佔用更少。一般部署在日誌收集的最前端。   本文基於Filebeat 6.3.2總結。 設計要點 主要元件   Filebeat主要由兩大元件組成:

資料收集Flume

Flume最初由Cloudera開發,於2011年6月貢獻給Apache,於2012成為頂級專案。在孵化這一年,基於老版本的Flume(Flume OG:Flume Original Generation 即Flume 0.9.x版本)進行重構,摒棄了Zooke

資料收集DataX

DataX DataX是阿里開源的離線資料同步工具,可以實現包括 MySQL、Oracle、MongoDB、Hive、HDFS、HBase、Elasticsearch等各種異構資料來源之間的高效同步。 DataX原理 設計理念 為了解決異

資料收集Logstash

Logstash 之前用的Logstash快忘了,好記性不如爛筆頭,好好總結一下。 Logstash由Java(Core)+Ruby(Plugin)語言編寫,是一個開源的日誌收集、處理、轉發工具。 Input產生事件,Filter修改事件,Output將事件傳

實時抽取mysql資料工具maxwell

利用Maxwell元件實時監聽mysql的binlog日誌,並且把解析的json格式資料傳送到kafka視窗供實時消費 文件主題: 如何使用Maxwell實時監聽Mysql的binlog日誌,並且把解析的json格式資料傳送到kafka視窗 具體步驟 一:在linux環境下安裝部署好mysq

Asp.netCore安裝centos7 資料收集

虛擬機器的安裝和centos的安裝看博友的文章:https://www.cnblogs.com/zhaopei/p/netcore.html   https://www.centos.org/ centos安裝netcore 步驟 https://dotnet.microsoft.com/l

資料採集解析Mysql的binlog日誌傳送至Kafka實時消費

本文采用Maxwell來實現實時解析mysql的binlog日誌傳送至kafka 1、開啟mysql binlog 環境中mysql是docker容器,所以需要進入容器修改mysql配置.

Hadoop-No.15Flume基於事件的資料收集和處理

Flume是一種分散式的可靠開源系統,用於流資料的高效收集,聚集和移動.Flume通常用於移動日誌資料.但是也能移動大量事件資料.如社交媒體訂閱,訊息佇列事件或者網路流量資料. Flume架構

Mysqlbinlog日誌說明及利用binlog日誌恢復資料操作記錄

在網上找到的一篇文章,覺得寫得很好,怕下次找不到了,顧轉載之,原文連結:眾所周知,binlog日誌對於mysql資料庫來說是十分重要的。在資料丟失的緊急情況下,我們往往會想到用binlog日誌功能進行資料恢復(定時全備份+binlog日誌恢復增量資料部分),化險為夷!廢話不多

JVM高級特性-三、垃圾收集判斷對象存活算法

地方法 size none ava 裏的 結束 靜態屬性 概述 span 一、概述   運行時數據區中,程序計數器、虛擬機棧、本地方法棧都是隨線程而生隨線程而滅的   因此,他們的內存分配和回收是確定的,在方法或線程結束時就回收。而Java堆和方   法區則是不確定的

WordPress資料收集,以後整理

tiny log word res utm nbsp dao 指定 html WordPress主題開發:實現分頁功能 http://www.cnblogs.com/tinyphp/p/6361901.html WordPress如何調取顯示指定文章 https://w

python---scrapyMySQL同步存儲

相關操作 ces comment 操作數 字典 爬取 drop pre var 假設我們已經能獲取到item裏定義的字段的數據,接下來就需要保存item的數據到mysql數據庫. pipeline用來存儲item中的數據,將爬取到的數據進行二次處理 首先,要做的準備的工作,

信息收集DNS信息收集 -- dnsenum

域名信息收集 dnsenum 滲透思路dnsenum 由perl編寫的一款多線程的、可指定DNS服務器、支持域名爆破、支持不同網速情況下的工具調優、結果可導入到其他工具中使用的一款DNS信息收集工具。(網上大佬們都說可以用來查不連續的IP段,這是在說什麽呢?現在還沒有相關的認知,求解答)語法: dnse

Android_連接數據庫_資料收集

解析 註意 target 實用 學習筆記 microsoft family 服務 ims 1、http://blog.csdn.net/conowen/article/details/7435231/  (Android學習筆記(21)————利用JDBC連接服務器數據庫)

QT學習資料收集

++ baidu 實現 收集 hello tornado 學習之路 share 學習指南 幾個專欄 Qt學習之路(3):Hello, world!(續) - 豆子空間 - 51CTO技術博客 http://devbean.blog.51cto.com/448512/

軟件測試_資料收集

info 再看 測試 .com 層次 span 收集 經典 平臺 JMETER jmeter的擴展性實在是太強大了,涉及到各種數據庫,各種服務器,各種類型的接口,甚至是大數據平臺。想要吃透真的不是一兩年時間能做到的。 Selenium selenium的經典文檔不多,但是

資料收集】PCA降維

post hive ron str AD span clas htm logs 重點整理: PCA(Principal Components Analysis)即主成分分析,是圖像處理中經常用到的降維方法 1、原始數據: 假定數據是二維的 x=[2.5, 0.5, 2.2,

信息收集域名、IP互查

域名轉IP目的 Linux下通過shell終端查詢某域名的IP地址、通過IP地址查詢綁定的域名。並 整理返回結果,創建python工具。 環境 linux + 命令行 工具 1. ping 2. host 3. dig 4. nslookup 工具一:PING --- 簡單粗暴 使用ping命令發送一

信息收集DNS信息收集 -- fierce

fierce dns信息收集 目錄 工具描述 參數解釋 爆破子域名 自定義字典爆破子域名 反查指定範圍的IP段 反查指定域名附近的IP段 反查指定域名所在的C段IP 掃描優化:自定義超時時間、多線程 工具描述 Fierce是一款IP、域名互查的DNS工具,可進行域傳送漏洞檢測、字典爆破子域名、反查