logstash sqlserver 多表關聯同步至ElasticSearch
阿新 • • 發佈:2021-11-23
最近負責了一個任務,將MS SQLServer的資料表關聯後同步至ES,主要參考官方文件中關於中使用logstash保持關係型資料庫與ES同步的文章,使用logstash的jdbc input,編寫sql進行資料同步。
但是實際處理時,由於ES提供的解決方案,僅提供了單表同步的思路,對於我這種多表聯合查詢的情況,沒有涉及。而且需要處理的表記錄都超過千萬行,逐行比較根本無法查詢出結果。
這個問題的處理方式,需要使用到SQLServer時間戳的特性:SQLServer中任意表中的新增或修改,資料行上的時間戳在當前資料庫範圍內的所有表中,都是最大的。藉助這個特性,可以利用SQL的Select Max Union
解決方案如下:
SQL:
SELECT CONVERT(BIGINT,TimeCurrentSet.current_max_time) AS max_timestamp, * FROM table1 A INNER JOIN table2 B ON A.ID = B.ID LEFT JOIN (SELECT MAX(Time) AS current_max_time FROM (SELECT MAX(time1) AS Time FROM table1 UNION SELECT MAX(time2) AS Time FROM table2) AS _time_union) AS TimeCurrentSet ON 1=1 LEFT JOIN (SELECT CONVERT(TIMESTAMP,CAST(:sql_last_value AS BIGINT)) AS last_max_time) AS TimeLastSet ON 1=1 WHERE A.time1 > TimeLastSet.last_max_time OR B.time2 > TimeLastSet.last_max_time
logstash pipline conf:
input { jdbc { jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver" jdbc_connection_string => "..." jdbc_user => "..." jdbc_password => "..." jdbc_paging_enabled => false jdbc_fetch_size => 2000 tracking_column => "max_timestamp" use_column_value => true tracking_column_type => "numeric" last_run_metadata_path => "..." schedule => "0,30 * * * * *" statement => "SELECT ..." ...
我在StackOverflow上的詳細回答:https://stackoverflow.com/a/70082172/7726468
轉載請註明出處: cnblogs.com/wswind