1. 程式人生 > >kafka原始碼系列之mysql資料增量同步到kafka

kafka原始碼系列之mysql資料增量同步到kafka

一,架構介紹

生產中由於歷史原因web後端,mysql叢集,kafka叢集(或者其它訊息佇列)會存在一下三種結構。

1,資料先入mysql叢集,再入kafka

資料入mysql叢集是不可更改的,如何再高效的將資料寫入kafka呢?

A),在表中存在自增ID的欄位,然後根據ID,定期掃描表,然後將資料入kafka

B),有時間欄位的,可以按照時間欄位定期掃描入kafka叢集。

C),直接解析binlog日誌,然後解析後的資料寫入kafka

640?wx_fmt=png

2web後端同時將資料寫入kafkamysql叢集

640?wx_fmt=png

3web後端將資料先入kafka,再入mysql叢集

這個方式,有很多優點,比如可以用kafka解耦,然後將資料按照離線儲存和計算,實時計算兩個模組構建很好的大資料架構。抗高峰,便於擴充套件等等。

640?wx_fmt=png

二,實現步驟

1mysql安裝準備

安裝mysql估計看這篇文章的人都沒什麼問題,所以本文不具體講解了。

A),假如你單機測試請配置好server_id

B),開啟binlog,只需配置log-bin

[[email protected] ~]# cat /etc/my.cnf

[mysqld]

server_id=1

datadir=/var/lib/mysql

socket=/var/lib/mysql/mysql.sock

user=mysql

# Disabling symbolic-links is recommended to prevent assorted security risks

symbolic-links=0

log-bin=/var/lib/mysql/mysql-binlog

[mysqld_safe]

log-error=/var/log/mysqld.log

pid-file=/var/run/mysqld/mysqld.pid

建立測試庫和表

create database school character set utf8 collate utf8_general_ci;

create table student(

name varchar(20) not null comment '姓名',

sid int(10) not null primary key comment '學員',

majora varchar(50) not null default '' comment '專業

',

tel varchar(11) not null unique key comment '手機號',

birthday date not null comment '出生日期'

);

2binlog日誌解析

兩種方式:

一是掃面binlog檔案(有需要的話請聯絡浪尖)

二是通過複製同步的方式

暫實現了第二種方式,樣例程式碼如下:

MysqlBinlogParse mysqlBinlogParse = new MysqlBinlogParse(args[0],Integer.valueOf(args[1]),args[2],args[3]){  @Override  public void processDelete(String queryType, String database, String sql) {try {      String jsonString = SqlParse.parseDeleteSql(sql);JSONObject jsonObject = JSONObject.fromObject(jsonString);jsonObject.accumulate("database", database);jsonObject.accumulate("queryType", queryType);System.out.println(sql);System.out.println(" ");System.out.println(" ");System.out.println(jsonObject.toString());} catch (Exception e) {// TODO Auto-generated catch blocke.printStackTrace();}  }  @Override  public void processInsert(String queryType, String database, String sql) {try {      String jsonString = SqlParse.parseInsertSql(sql);JSONObject jsonObject = JSONObject.fromObject(jsonString);jsonObject.accumulate("database", database);jsonObject.accumulate("queryType", queryType);System.out.println(sql);System.out.println(" ");System.out.println(" ");System.out.println(jsonObject.toString());} catch (Exception e) {// TODO Auto-generated catch blocke.printStackTrace();}  }  @Override  public void processUpdate(String queryType, String database, String sql) {    String jsonString;try {      jsonString = SqlParse.parseUpdateSql(sql);JSONObject jsonObject = JSONObject.fromObject(jsonString);jsonObject.accumulate("database", database);jsonObject.accumulate("queryType", queryType);System.out.println(sql);System.out.println(" ");System.out.println(" ");System.out.println(jsonObject.toString());} catch (Exception e) {// TODO Auto-generated catch blocke.printStackTrace();}  }};mysqlBinlogParse.setServerId(3);mysqlBinlogParse.start();

3sql語法解析

從原始的mysql binlog event中,我們能解析到的資訊,主要的也就是mysqldatabasequery型別(INSERT,DELETE,UPDATE),具體執行的sql。我這裡封裝了三個重要的方法。只暴露了這三個介面,那麼我們要明白的事情是,我們入kafka,然後流式處理的時候希望的到的是跟插入mysql後一樣格式的資料。這個時候我們就要自己做sql的解析,將querysql解析成欄位形式的資料,供流式處理。解析的格式如下:

A),INSERT

640?wx_fmt=png

B),DELETE

640?wx_fmt=png

C),UPDATE

640?wx_fmt=png

最終浪尖是將解析後的資料封裝成了json,然後我們自己寫kafka producer將訊息傳送到kafka,後端就可以處理了。

三,總結

最後,浪尖還是建議web後端資料最好先入訊息佇列,如kafka,然後分離線和實時將資料進行解耦分流,用於實時處理和離線處理。

訊息佇列的訂閱者可以根據需要隨時擴充套件,可以很好的擴充套件資料的使用者。

訊息佇列的橫向擴充套件,增加吞吐量,做起來還是很簡單的。這個用傳統資料庫,分庫分表還是很麻煩的。

由於訊息佇列的存在,也可以幫助我們抗高峰,避免高峰時期後端處理壓力過大導致整個業務處理宕機。

具體原始碼球友可以在知識星球獲取。

歡迎大家進入知識星球,學習更多更深入的大資料知識,面試經驗,獲取更多更詳細的資料。

640?wx_fmt=jpeg