1. 程式人生 > >logstash jdbc 增量同步到kafka

logstash jdbc 增量同步到kafka

logstash利用jdcb增量同步資料庫說明

配置說明

做增量同步主要是更具sql_last_value的值做判斷,如果符合條件才開始同步

配置檔案如下:

input {
      stdin {
    }
    jdbc {
      #資料庫地址
      jdbc_connection_string => "jdbc:oracle:thin:@//localhost/zaradb"
      jdbc_user => "game"
      jdbc_password => "PRVXWFxSOa" 
      #資料庫驅動路徑
      jdbc_driver_library => "/logstash-6.2.4/ojdbc7.jar"
jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver" #sql路徑 statement_filepath => "/logstash-6.2.4/pay_test.sql" #是否開啟記錄追蹤 record_last_run => "true" #是否需要追蹤欄位,如果為true,則需要指定tracking_column,預設是timestamp use_column_value => "true" #指定追蹤的欄位
tracking_column => "pay_kafkatime" #追蹤欄位的型別,目前只有數字和時間型別,預設是數字型別 tracking_column_type => "timestamp" #設定時區 jdbc_default_timezone =>"Asia/Shanghai" #是否每次清除last_run_metadata_path的內容 clean_run => "false" #這裡可以手動設定:sql_last_value的值,預設時間是1970-01-01,預設數字是0
last_run_metadata_path => "/logstash-6.2.4/logstash_jdbc_last_run" #多久同步一次 schedule => "*/5 * * * *" #是否分頁 jdbc_paging_enabled => "true" jdbc_page_size => "50000" } } filter { json { source => "message" remove_field => ["message"] } mutate { #指定要刪除的欄位 remove_field => "@version" remove_field => "@timestamp" } } output { # stdout { codec => rubydebug } kafka { #kafka topic topic_id => "test" #kafka地址 bootstrap_servers => "localhost:9092" #json互動 codec => "json" #client.id client_id => "test2" } }

注意事項

外掛將以sql_last_value儲存在配置中的元資料檔案的形式持久儲存引數last_run_metadata_path。在查詢執行後,該檔案將被更新為當前值sql_last_value。下一次管道啟動時,該值將通過從檔案中讀取來更新。如果 clean_run設定為true,則該值將被忽略,sql_last_value並將設定為1970年1月1日,如果use_column_value為true,則為0 ,就像沒有執行任何查詢一樣。

如果資料庫是非實時資料庫,不能以時間作為追蹤屬性,不然同步的資料要麼多要麼少。