1. 程式人生 > >logstash mysql 準實時同步到 elasticsearch

logstash mysql 準實時同步到 elasticsearch

當前 下載 設置 hmm 結果 如果 6.2 metadata 使用

mysql 作為成熟穩定的數據持久化解決方案,廣泛地應用在各種領域,但是在數據分析方面稍有不足,而 elasticsearch 作為數據分析領域的佼佼者,剛好可以彌補這項不足,而我們要做的只需要將 mysql 中的數據同步到 elasticsearch 中即可,而 logstash 剛好就可以支持,所有你需要做的只是寫一個配置文件而已

logstash 獲取

獲取 logstash

wget https://artifacts.elastic.co/downloads/logstash/logstash-6.2.3.zip
unzip logstash-6.2.3.zip && cd logstash-6.2.3

安裝 jdbc 和 elasticsearch 插件

bin/logstash-plugin install logstash-input-jdbc && bin/logstash-plugin install logstash-output-elasticsearch; \

獲取 jdbc mysql 驅動

wget https://cdn.mysql.com//Downloads/Connector-J/mysql-connector-java-5.1.46.zip && unzip mysql-connector-java-5.1.46.zip && \

編寫配置文件

logstash-input-jdbc

使用 logstash-input-jdbc 插件讀取 mysql 的數據,這個插件的工作原理比較簡單,就是定時執行一個 sql,然後將 sql 執行的結果寫入到流中,增量獲取的方式沒有通過 binlog 方式同步,而是用一個遞增字段作為條件去查詢,每次都記錄當前查詢的位置,由於遞增的特性,只需要查詢比當前大的記錄即可獲取這段時間內的全部增量,一般的遞增字段有兩種,AUTO_INCREMENT 的主鍵 idON UPDATE CURRENT_TIMESTAMPupdate_time 字段,id 字段只適用於那種只有插入沒有更新的表,update_time

更加通用一些,建議在 mysql 表設計的時候都增加一個 update_time 字段

input {
  jdbc {
    jdbc_driver_library => "../mysql-connector-java-5.1.46/mysql-connector-java-5.1.46-bin.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://<mysql_host>:3306/rta"
    jdbc_user => "<username>"
    jdbc_password => "<password>"
    schedule => "* * * * *"
    statement => "SELECT * FROM table WHERE update_time > :sql_last_value"
    use_column_value => true
    tracking_column_type => "timestamp"
    tracking_column => "update_time"
    last_run_metadata_path => "syncpoint_table"
  }
}
  • jdbc_driver_library: jdbc mysql 驅動的路徑,在上一步中已經下載
  • jdbc_driver_class: 驅動類的名字,mysql 填 com.mysql.jdbc.Driver 就好了
  • jdbc_connection_string: mysql 地址
  • jdbc_user: mysql 用戶
  • jdbc_password: mysql 密碼
  • schedule: 執行 sql 時機,類似 crontab 的調度
  • statement: 要執行的 sql,以 ":" 開頭是定義的變量,可以通過 parameters 來設置變量,這裏的 sql_last_value 是內置的變量,表示上一次 sql 執行中 update_time 的值
  • use_column_value: 使用遞增列的值
  • tracking_column_type: 遞增字段的類型,numeric 表示數值類型, timestamp 表示時間戳類型
  • tracking_column: 遞增字段的名稱,這裏使用 update_time 這一列,這列的類型是 timestamp
  • last_run_metadata_path: 同步點文件,這個文件記錄了上次的同步點,重啟時會讀取這個文件,這個文件可以手動修改

logstash-output-elasticsearch

output {
  elasticsearch {
    hosts => ["172.31.22.165", "172.31.17.241", "172.31.30.84", "172.31.18.178"]
    user => "<user>"
    password => "<password>"
    index => "table"
    document_id => "%{id}"
  }
}
  • hosts: es 集群地址
  • user: es 用戶名
  • password: es 密碼
  • index: 導入到 es 中的 index 名,這裏我直接設置成了 mysql 表的名字
  • document_id: 導入到 es 中的文檔 id,這個需要設置成主鍵,否則同一條記錄更新後在 es 中會出現兩條記錄,%{id} 表示引用 mysql 表中 id 字段的值

運行

把上面的代碼保存到一個配置文件裏面 sync_table.cfg,執行下面命令即可

cd logstash-6.2.3 && bin/logstash -f config/sync_table.cfg

如果成功了會在標準輸出輸出執行的 sql 語句

[2018-04-14T18:12:00,278][INFO ][logstash.inputs.jdbc     ] (0.001011s) SELECT version()
[2018-04-14T18:12:00,284][INFO ][logstash.inputs.jdbc     ] (0.000723s) SELECT * FROM table WHERE update_time > ‘2018-04-14 17:55:00‘

其他問題

多表同步

一個 logstash 實例可以借助 pipelines 機制同步多個表,只需要寫多個配置文件就可以了,假設我們有兩個表 table1 和 table2,對應兩個配置文件 sync_table1.cfgsync_table2.cfg

config/pipelines.yml 中配置

- pipeline.id: table1
  path.config: "config/sync_table1.cfg"
- pipeline.id: table2
  path.config: "config/sync_table2.cfg"

直接 bin/logstash 啟動即可

@timestamp 字段

默認情況下 @timestamp 字段是 logstash-input-jdbc 添加的字段,默認是當前時間,這個字段在數據分析的時候非常有用,但是有時候我們希望使用數據中的某些字段來指定這個字段,這個時候可以使用 filter.date, 這個插件是專門用來設置 @timestamp 這個字段的

比如我有我希望用字段 timeslice 來表示 @timestamptimeslice 是一個字符串,格式為 %Y%m%d%H%M

filter {
  date {
    match => [ "timeslice", "yyyyMMddHHmm" ]
    timezone => "Asia/Shanghai"
  }
}

把這一段配置加到 sync_table.cfg 中,現在 @timestamptimeslice 一致了

參考鏈接

  • logstash-input-jdbc 插件: https://www.elastic.co/guide/en/logstash/current/plugins-inputs-jdbc.html
  • logstash-output-elasticsearch 插件: https://www.elastic.co/guide/en/logstash/current/plugins-outputs-elasticsearch.html
  • logstash-multiple-piplines: https://www.elastic.co/blog/logstash-multiple-pipelines
  • logstash-filter-date 插件: https://www.elastic.co/guide/en/logstash/current/plugins-filters-date.html

轉載請註明出處
本文鏈接:http://www.hatlonely.com/2018/04/14/logstash-mysql-%E5%87%86%E5%AE%9E%E6%97%B6%E5%90%8C%E6%AD%A5%E5%88%B0-elasticsearch/

logstash mysql 準實時同步到 elasticsearch