logstash-input-jdbc增量、全量資料同步
阿新 • • 發佈:2018-12-16
一、場景
筆者在mysql資料同步到ES中,發現第一次同步時需要全量的資料,之後則需要定時去同步增量資料,所以筆者提供增量和全量同步的conf供讀者參考
二、解決方案
1、全量資料同步
具體如何執行可參考https://blog.csdn.net/w_linux/article/details/84555506,這裡提供conf的配置
input { jdbc { jdbc_driver_library => "./config/mysql-connector-java-5.1.39.jar" jdbc_driver_class => "com.mysql.jdbc.Driver" # 資料庫相關配置 jdbc_connection_string => "jdbc:ip:port/jdcomm?characterEncoding=UTF-8&useSSL=false" jdbc_user => "root" jdbc_password => "password" statement => "SELECT * FROM blwjxb_z" jdbc_paging_enabled => "true" jdbc_page_size => "50000" schedule => "*/10 * * * *" } } filter { json { source => "message" remove_field => ["message"] } } output { stdout { codec => rubydebug } elasticsearch { hosts => "localhost" #將mysql資料加入myindex索引下,會自動建立 index => "myindex" # 自增ID 需要關聯的資料庫中有有一個id欄位,對應索引的id號 document_id => "%{id}" } }
2、增量資料同步
input { jdbc { jdbc_driver_library => "./config/mysql-connector-java-5.1.39.jar" jdbc_driver_class => "com.mysql.jdbc.Driver" # 資料庫相關配置 jdbc_connection_string => "jdbc:ip:port/jdcomm?characterEncoding=UTF-8&useSSL=false" jdbc_user => "root" jdbc_password => "password" statement => "SELECT * FROM blwjxb_z where id > :sql_last_value" #使用其它欄位追蹤,而不是用時間 use_column_value => true #追蹤的欄位 tracking_column => id record_last_run => true #上一個sql_last_value值的存放檔案路徑, 必須要在檔案中指定欄位的初始值 last_run_metadata_path => "./config/station_parameter.txt" jdbc_paging_enabled => "true" jdbc_page_size => "50000" schedule => "* * * * *" } } filter { json { source => "message" remove_field => ["message"] } } output { stdout { codec => rubydebug } elasticsearch { hosts => "localhost" #將mysql資料加入myindex索引下,會自動建立 index => "myindex" # 自增ID 需要關聯的資料庫中有有一個id欄位,對應索引的id號 document_id => "%{id}" } }
思路就是每次指令碼定時執行的時候會去找id>station_parameter.txt中設定的數值,每次增量資料同步後,station_parameter.txt中的數值會自動更新。
station_parameter.txt中的資料初始如下圖