Mysql 流增量寫入 Hdfs(一) --從 mysql 到 kafka
一. 概述
在大資料的靜態資料處理中,目前普遍採用的是用 Spark + Hdfs (Hive / Hbase) 的技術架構來對資料進行處理。
但有時候有其他的需求,需要從其他不同資料來源不間斷得采集資料,然後儲存到 Hdfs 中進行處理。而追加(append)這種操作在 Hdfs 裡面明顯是比較麻煩的一件事。所幸有了 Storm 這麼個流資料處理這樣的東西問世,可以幫我們解決這些問題。
不過光有 Storm 還不夠,我們還需要其他中介軟體來協助我們,讓所有其他資料來源都歸於一個通道。這樣就能實現不同資料來源以及 Hhdfs 之間的解耦。而這個中介軟體 Kafka 無疑是一個很好的選擇。
這樣我們就可以讓 Mysql 的增量資料不停得丟擲到 Kafka ,而後再讓 storm 不停得從 Kafka 對應的 Topic 讀取資料並寫入到 Hdfs 中。
二. 基本知識
2.1 Mysql binlog 介紹
binlog 即 Mysql 的二進位制日誌。它可以說是 Mysql 最重要的日誌了,它記錄了所有的DDL和DML(除了資料查詢語句)語句,以事件形式記錄,還包含語句所執行的消耗的時間,MySQL的二進位制日誌是事務安全型的。
上面所說的提到了 DDL 和 DML ,可能有些同學不瞭解,這裡順便說一下:
- DDL:資料定義語言DDL用來建立資料庫中的各種物件-----表、檢視、索引、同義詞、聚簇等如:CREATE TABLE/VIEW/INDEX/SYN/CLUSTER...
- DML:資料操縱語言DML主要有三種形式:插入(INSERT), 更新(UPDATE),以及刪除(DELETE)。
在 Mysql 中,binlog 預設是不開啟的,因為有大約 1% (官方說法)的效能損耗,如果要手動開啟,流程如下:
- vi編輯開啟mysql配置檔案:
vi /usr/local/mysql/etc/my.cnf
在[mysqld] 區塊設定/新增如下,
log-bin=mysql-bin
注意一定要在 [mysqld] 下。 2. 重啟 Mysql
pkill mysqld
/usr/local/mysql/bin/mysqld_safe --user=mysql &
2.2 kafka
這裡只對 Kafka 做一個基本的介紹,更多的內容可以度娘一波。
<img src="https://img2018.cnblogs.com/blog/1011838/201812/1011838-20181206075114483-251042842.png" width="70%">
上面的圖片是 kafka 官方的一個圖片,我們目前只需要關注 Producers 和 Consumers 就行了。
Kafka 是一個分散式釋出-訂閱訊息系統。分散式方面由 Zookeeper 進行協同處理。訊息訂閱其實說白了吧,就是一個佇列,分為消費者和生產者,就像上圖中的內容,有資料來源充當 Producer 生產資料到 kafka 中,而有資料充當 Consumers ,消費 kafka 中的資料。
<img src="https://img2018.cnblogs.com/blog/1011838/201812/1011838-20181206075127662-202539931.png" width="70%">
上圖中的 offset 指的是資料的寫入以及消費的位置的資訊,這是由 Zookeeper 管理的。也就是說,當 Consumers 重啟或是怎樣,需要重新從 kafka 讀取訊息時,總不能讓它從頭開始消費資料吧,這時候就需要有個記錄能告訴你從哪裡開始重新讀取。這就是 offset 。
kafka 中還有一個至關重要的概念,那就是 topic 。不過這個其實還是很好理解的,比如你要訂閱一些訊息,你肯定是不會訂閱所有訊息的吧,你只需要訂閱你感興趣的主題,比如攝影,程式設計,搞笑這些主題。而這裡主題的概念其實和 topic 是一樣的。總之,可以將 topic 歸結為通道,kafka 中有很多個通道,不同的 Producer 向其中一個通道生產資料,也就是拋資料進去這個通道,Comsumers 不停得消費通道中的資料。
而我們要做的就是將 Mysql binlog 產生的資料拋到 kafka 中充當作生產者,然後由 storm 充當消費者,不停得消費資料並寫入到 Hdfs 中。
至於怎麼將 binlog 的資料拋到 kafka ,別急,下面我們就來介紹。
2.3 maxwell
maxwell 這個工具可以很方便得監聽 Mysql 的 binlog ,**然後每當 binlog 發生變化時,就會以 json 格式丟擲對應的變化資料到 Kafka 中。**比如當向 mysql 一張表中插入一條語句的時候,maxwell 就會立刻監聽到 binlog 中有對應的記錄增加,然後將一些資訊包括插入的資料都轉化成 json 格式,然後拋到 kafka 指定的 topic 中。
除了 Kafka 外,其實 maxwell 還支援寫入到其他各種中介軟體,比如 redis。 同時 maxwell 是比較輕量級的工具,只需要在 mysql 中新建一個數據庫供它記錄一些資訊,然後就可以直接執行。
三. 使用 maxwell 監聽 binlog
接下來我們將的是如果使用 maxwell ,讓它監聽 mysql 的 binlog 並拋到 kafka 中。maxwell 主要有兩種執行方式。一種是使用配置檔案,另一種則是在命令列中新增引數的方式執行。這裡追求方便,只使用命令列的方式進行演示。
這裡介紹一下簡單的將資料拋到 kafka 的命令列指令碼吧。
bin/maxwell --user='maxwell' --password='XXXXXX' --host='127.0.0.1' \
--producer=kafka --kafka.bootstrap.servers=localhost:9092 --kafka_topic=maxwell --port=3306
各項引數說明如下:
- user:mysql 使用者名稱
- password:mysql 密碼
- host:Mysql 地址
- producer:指定寫入的中介軟體型別,比如還有 redies
- kafka.bootstrap.servers:kafka 的地址
- kafka_topic:指明寫入到 kafka 哪個 topic
- port:mysql 埠
啟動之後,maxwell 便開始工作了,當然如果你想要讓這條命令可以在後臺執行的話,可以使用 Linux 的 nohup 命令,這裡就不多贅述,有需要百度即可。
這樣配置的話通常會將整個資料庫的增刪改都給拋到 kafka ,但這樣的需求顯然不常見,更常見的應該是具體監聽對某個庫的操作,或是某個表的操作。
在升級到 1.9.2(最新版本)後,maxwell 為我們提供這樣一個引數,讓我們可以輕鬆實現上述需求:--filter。
這個引數通常包含兩個配置項,exclude 和 include。意思就是讓你指定排除哪些和包含哪些。比如我只想監聽 Adatabase 庫下的 Atable 表的變化。我可以這樣。
--filter='exclude: *.*, include: Adatabase.Atable'
這樣我們就可以輕鬆實現監聽 mysql binlog 的變化,並可以定製自己的需求。
OK,這一章我們介紹了 mysql binlog ,kafka 以及 maxwell 的一些內容,下一篇我們將會看到 storm 如何寫入 hdfs 以及定製一些策略。see you~~
歡迎關注公眾號哈爾的資料城堡,裡面有資料,程式碼,以及深度的思考。