
canal + kafka + logstash + es architecture: the Logstash configuration

I searched around a lot online and couldn't find a working configuration. Having just picked up Logstash, I wrote one and am sharing it here to save beginners some trouble. It only filters the data produced by canal; if your data comes from somewhere else, you can still use it as a reference.
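For context, here is roughly what one canal row-change message looks like as it arrives on the Kafka topic. This is a sketch in canal's "flat message" JSON style; the exact fields and values depend on your canal version and setup, and the database/table names below are made up:

```python
import json

# An assumed canal row-change message (canal "flat message" JSON format;
# exact fields vary by canal version). The Logstash filter below splits
# on "data", promotes its columns, and maps "type" to an ES action.
canal_message = {
    "data": [                      # changed rows; one event per entry
        {"id": "1", "name": "apple", "price": "3.5"}
    ],
    "database": "shop",
    "table": "goods",
    "type": "UPDATE",              # INSERT / UPDATE / DELETE
    "pkNames": ["id"],
    "isDdl": False,
    "ts": 1600000000000,
}
print(json.dumps(canal_message, indent=2))
```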

input {
    kafka {
        bootstrap_servers => ["127.0.0.1:9092"]
        #group_id => "goodsoptiones"
        # start reading from the latest offset
        auto_offset_reset => "latest"
        # boolean (optional), default: false
        # Where Logstash starts reading after startup. The default is the end
        # position, i.e. Logstash resumes from the offset where the previous
        # run stopped; if nothing has been consumed yet, it reads from the
        # beginning. If you want to import existing data, set this to "true".
        #reset_beginning => true
        # number of consumer threads
        #consumer_threads => 5
        topics => ["test"]
        codec => json { charset => "UTF-8" }
    }
}

filter {
    # one event per row in canal's "data" array
    split {
        field => "data"
    }
    # promote each column of the row to a top-level field
    ruby {
        code => "
            event.get('data').each {|k, v|
                event.set(k, v)
            }
            event.remove('data')
        "
    }
    # map canal's change type to an Elasticsearch action
    translate {
        field => "type"
        destination => "op_type"
        dictionary => [
            "INSERT", "index",
            "UPDATE", "update",
            "DELETE", "delete"
        ]
    }
    # drop canal's metadata fields
    mutate {
        remove_field => ["database","pkNames","mysqlType","table","sql","sqlType","old","es","isDdl","ts","type"]
    }
}

output {
    #file {
    #    path => "D:/zip/logstash-7.8.0/logs/kafkalog.log"
    #    flush_interval => 0
    #}
    stdout {
        codec => rubydebug
    }
    elasticsearch {
        hosts => ["http://localhost:9200"]
        index => "goodsoption"
        # document_type is deprecated for Elasticsearch 7+ and can be omitted
        document_type => "_doc"
        document_id => "%{id}"
        #document_id => "142800"
        action => "%{op_type}"
        #action => "update"
        #user => "elastic"
        #password => "changeme"
    }
}
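To make the filter section easier to follow, here is a small Python sketch of what it does to one canal message: split on the "data" array, promote each row's columns to top-level fields, translate the change type into an op_type, and drop canal's metadata. The message and field names are assumptions based on canal's flat-message format; this is an illustration, not Logstash code:

```python
def filter_canal_event(message):
    """Simulate the split -> ruby -> translate -> mutate chain above."""
    # fields dropped by the mutate/remove_field step
    removed = {"database", "pkNames", "mysqlType", "table", "sql",
               "sqlType", "old", "es", "isDdl", "ts", "type"}
    # translate: canal change type -> Elasticsearch action
    op_types = {"INSERT": "index", "UPDATE": "update", "DELETE": "delete"}

    events = []
    # split: one event per row in the "data" array
    for row in message["data"]:
        event = {k: v for k, v in message.items() if k != "data"}
        # ruby: promote each column of the row to a top-level field
        event.update(row)
        event["op_type"] = op_types[event["type"]]
        # mutate: drop the canal metadata fields
        events.append({k: v for k, v in event.items() if k not in removed})
    return events

# hypothetical two-row UPDATE message for illustration
message = {
    "data": [{"id": "1", "name": "apple"}, {"id": "2", "name": "pear"}],
    "database": "shop", "table": "goods", "type": "UPDATE",
    "isDdl": False, "ts": 1600000000000,
}
print(filter_canal_event(message))
# → [{'id': '1', 'name': 'apple', 'op_type': 'update'},
#    {'id': '2', 'name': 'pear', 'op_type': 'update'}]
```

Each resulting event keeps only the row's own columns plus op_type, which is exactly what the elasticsearch output needs: document_id comes from %{id} and action from %{op_type}.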