logstash-input-jdbc外掛實現關係型資料庫和ES增量同步
阿新 • • 發佈:2019-01-05
環境安裝
從官網下載你要部署環境的相應安裝包,這裡以linux系統為例。
選擇與你的ES相同版本的logstash的tar.gz包,上傳解壓,進入解壓目錄下測試下。
使用bin/logstash -e 'input { stdin { } } output { stdout {} }'
,啟動後輸入任意內容後,如果有返回則表示安裝成功
使用logstash-input-jdbc外掛
先使用bin/logstash-plugin list
檢視下已安裝好的外掛一般5.X以後的版本都會預設安裝好此外掛。
下面羅列一下,要實現增量同步需要的東西,以orcal為例:
- orcal連線jar包
- 能搜尋新增值的sql
我的sql: :sql_last_value是上次同步的最後值
select *
from drc_policy_publish p
where record_no > :sql_last_value
order by record_no
接下類建立一個啟動的conf檔案,vi conf/orcal.conf
,裡面配置如下
input {
jdbc {
jdbc_connection_string => "jdbc:oracle:thin:@//ip:1521/orcl"
# 使用者名稱和密碼
jdbc_user => "user"
jdbc_password => "password"
# 驅動,準備的連線jar包位置
jdbc_driver_library => "/opt/elk/logstash-5.6.8/orcal/ojdbc-6.jar"
# 驅動類名
jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
#配置一次同步的最大數量
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
# 執行的sql 檔案路徑+名稱
statement_filepath => "/opt/elk/logstash-5.6.8/orcal/sql/policy.sql"
# 設定監聽間隔 各欄位含義(由左至右)分、時、天、月、年,全部為*預設含義為每分鐘都更新
schedule => "* * * * *"
# 是否記錄上次執行結果, 如果為真,將會把上次執行到的 tracking_column 欄位的值記錄下來,儲存到 last_run_metadata_path 指定的檔案中
record_last_run => "true"
# 是否需要記錄某個column 的值,如果record_last_run為真,可以自定義我們需要 track 的 column 名稱,此時該引數就要為 true. 否則預設 track 的是 timestamp 的值.
use_column_value => "true"
# 如果 use_column_value 為真,需配置此引數. track 的資料庫 column 名,該 column 必須是遞增的. 一般是mysql主鍵
tracking_column => "record_no"
#用於儲存上次同步的最後值,先新建,輸入0比較好,不然是使用null去比較
last_run_metadata_path => "/opt/elk/logstash-5.6.8/orcal/last_id"
# 是否清除 last_run_metadata_path 的記錄,如果為真那麼每次都相當於從頭開始查詢所有的資料庫記錄
clean_run => "false"
# 是否將 欄位(column) 名稱轉小寫
lowercase_column_names => "true"
}
}
filter {
json {
source => "message"
remove_field => ["message"]
}
}
output {
elasticsearch {
hosts => "ip:9200"
index => "orcal"
document_type => "drc_polic_publish"
# 將"_id"的值設為資料庫主鍵
document_id => "%{record_no}"
}
}
如果你的表資料中沒有遞增的列,那也可以使用預設的timestamp 值,不過你需要將es中_id的值也設定成timestamp的值。如果讓es自己來建立_id,則會一直插入資料,因為它無法判斷哪些是新值。
在啟動前,你需要先在ES中建立好索引的對映,不然就都要使用預設映射了。
接下來啟動使用bin/logstash -f config/orcal.conf
命令啟動,可以看到兩次的搜尋引數是不同的
可以看到資料已經放進去了
也可以去Discover,把主鍵欄位排下序,檢視最新的同步資料是什麼
上述的啟動方式,會佔用命令列,如果驗證好自己的配置沒有問題後,可以修改schedule引數,比如schedule => “* 8 * * *”,每八小時同步一次,然後使用nohup命令後臺執行logstash nohup bin/logstash -f conf/orcal.conf &