logstash jdbc 增量同步到kafka
阿新 • • 發佈:2019-01-05
logstash利用jdcb增量同步資料庫說明
配置說明
做增量同步主要是更具sql_last_value的值做判斷,如果符合條件才開始同步
配置檔案如下:
input {
stdin {
}
jdbc {
#資料庫地址
jdbc_connection_string => "jdbc:oracle:thin:@//localhost/zaradb"
jdbc_user => "game"
jdbc_password => "PRVXWFxSOa"
#資料庫驅動路徑
jdbc_driver_library => "/logstash-6.2.4/ojdbc7.jar"
jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
#sql路徑
statement_filepath => "/logstash-6.2.4/pay_test.sql"
#是否開啟記錄追蹤
record_last_run => "true"
#是否需要追蹤欄位,如果為true,則需要指定tracking_column,預設是timestamp
use_column_value => "true"
#指定追蹤的欄位
tracking_column => "pay_kafkatime"
#追蹤欄位的型別,目前只有數字和時間型別,預設是數字型別
tracking_column_type => "timestamp"
#設定時區
jdbc_default_timezone =>"Asia/Shanghai"
#是否每次清除last_run_metadata_path的內容
clean_run => "false"
#這裡可以手動設定:sql_last_value的值,預設時間是1970-01-01,預設數字是0
last_run_metadata_path => "/logstash-6.2.4/logstash_jdbc_last_run"
#多久同步一次
schedule => "*/5 * * * *"
#是否分頁
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
}
}
filter {
json {
source => "message"
remove_field => ["message"]
}
mutate {
#指定要刪除的欄位
remove_field => "@version"
remove_field => "@timestamp"
}
}
output {
# stdout { codec => rubydebug }
kafka {
#kafka topic
topic_id => "test"
#kafka地址
bootstrap_servers => "localhost:9092"
#json互動
codec => "json"
#client.id
client_id => "test2"
}
}
注意事項
外掛將以sql_last_value儲存在配置中的元資料檔案的形式持久儲存引數last_run_metadata_path。在查詢執行後,該檔案將被更新為當前值sql_last_value。下一次管道啟動時,該值將通過從檔案中讀取來更新。如果 clean_run設定為true,則該值將被忽略,sql_last_value並將設定為1970年1月1日,如果use_column_value為true,則為0 ,就像沒有執行任何查詢一樣。
如果資料庫是非實時資料庫,不能以時間作為追蹤屬性,不然同步的資料要麼多要麼少。