1. 程式人生 > 其它 >logstash sqlserver 多表關聯同步至ElasticSearch

logstash sqlserver 多表關聯同步至ElasticSearch

最近負責了一個任務,將MS SQLServer的資料表關聯後同步至ES,主要參考官方文件中關於中使用logstash保持關係型資料庫與ES同步的文章,使用logstash的jdbc input,編寫sql進行資料同步。
但是實際處理時,由於ES提供的解決方案,僅提供了單表同步的思路,對於我這種多表聯合查詢的情況,沒有涉及。而且需要處理的表記錄都超過千萬行,逐行比較根本無法查詢出結果。

這個問題的處理方式,需要使用到SQLServer時間戳的特性:SQLServer中任意表中的新增或修改,資料行上的時間戳在當前資料庫範圍內的所有表中,都是最大的。藉助這個特性,可以利用SQL的Select Max Union

語句來獲取當前所有join的表中的最大時間戳。再將該時間戳轉換為數值型,記錄在logstash的sql_last_value中,用於新資料的篩選。

解決方案如下:

SQL:

SELECT 
CONVERT(BIGINT,TimeCurrentSet.current_max_time) AS max_timestamp,
*
FROM table1 A INNER JOIN table2 B 
ON A.ID = B.ID
LEFT JOIN (SELECT MAX(Time) AS current_max_time FROM (SELECT MAX(time1) AS Time FROM table1 UNION SELECT MAX(time2) AS Time FROM table2) AS _time_union) AS TimeCurrentSet
ON 1=1
LEFT JOIN (SELECT CONVERT(TIMESTAMP,CAST(:sql_last_value AS BIGINT)) AS last_max_time) AS TimeLastSet
ON 1=1
WHERE
A.time1 > TimeLastSet.last_max_time OR B.time2 > TimeLastSet.last_max_time

logstash pipline conf:

input {
    jdbc {
        jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
        jdbc_connection_string => "..."
        jdbc_user => "..."
        jdbc_password => "..."
        jdbc_paging_enabled => false
        jdbc_fetch_size => 2000
        tracking_column => "max_timestamp"
        use_column_value => true
        tracking_column_type => "numeric"
        last_run_metadata_path => "..."
        schedule => "0,30 * * * * *"
        statement => "SELECT ..."
...

我在StackOverflow上的詳細回答:https://stackoverflow.com/a/70082172/7726468

轉載請註明出處: cnblogs.com/wswind