1. 程式人生 > 實用技巧 >Logstash Oracle同步設定

Logstash Oracle同步設定

input {

  jdbc {
    #jdbc驅動包位置
    jdbc_driver_library => "D:\tools\elk\logstash-7.6.1\ojdbc8-12.2.0.1.jar"
    #jdbc驅動類
    jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
    # 資料庫相關配置
    jdbc_connection_string => "jdbc:oracle:thin:callcard/huawei123@//callcardsitoracle01.beta.hic.cloud:1521/callcardsit_srv"
    jdbc_user => "callcard"
    jdbc_password => "huawei123"
    # 是否清除sql_last_value的記錄,需要增量同步時此欄位必須為false;
    clean_run => true
    # 同步頻率(分 時 天 月 年),預設每分鐘同步一次;
    schedule => "*/10 * * * * *"

    use_column_value => true
    tracking_column_type => timestamp
    tracking_column => updated_at

    # 同步SQL
    statement =>"select *
from user n
       
where updated_at>:sql_last_value and updated_at<sysdate order by updated_at desc"
    # 索引型別,不需要指定type,否則會在同步ES後生成type欄位
    #type => "user"
    # 設定時區
    #jdbc_default_timezone =>"Asia/Shanghai"
  }

}
 
filter {
 # 刪除無用欄位
  mutate {   
          remove_field => "@timestamp"
          remove_field => "@version"
  }     

  # 時間+8個時區,思想是找臨時變數,最後+8後替換
  ruby {
        code => "event.set('@created_at', event.get('created_at').time.localtime + 8*60*60)"
  }       
  ruby {   
        code => "event.set('created_at',event.get('@created_at'))"
  }       
  mutate { 
          remove_field => ["@created_at"]
  }
}
output {
  stdout {
    codec => rubydebug
  }
  #if [type]=="user" {
    elasticsearch {
      # ES host:port
      hosts => ["127.0.0.1:9200"]
      #將mysql資料加入blog索引下,會自動建立
      index => "user"
      # 自增ID 需要關聯的資料庫中有有一個id欄位,對應索引的id號_id
      document_id => "%{id}"
    }
 # }
 
}