SQL資料同步到ElasticSearch(三)- 使用Logstash+LastModifyTime同步資料
在系列開篇,我提到了四種將SQL SERVER資料同步到ES中的方案,本文將採用最簡單的一種方案,即使用LastModifyTime來追蹤DB中在最近一段時間發生了變更的資料。
安裝Java
安裝部分的官方文件在這裡:https://www.elastic.co/guide/en/logstash/current/installing-logstash.html
可以直接檢視官方文件。
我這裡使用的還是之前文章中所述的CentOS來進行安裝。
首先需要安裝Java(萬物源於Java)
輸入命令找到的OpenJDK 1.8.X版本(截止我嘗試時,在Java11上會有問題):
yum search java | grep -i --color JDK
使用Yum進行安裝:
yum install java-1.8.0-openjdk
配置環境變數JAVA_HOME、CLASSPATH、PATH。
開啟/etc/profile檔案:
vi /etc/profile
將下面幾行程式碼貼上到該檔案的最後:
--這句要自己到/usr/lib/jvm下面找對應的目錄,不能直接copyexport JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.222.b10-0.el7_6.x86_64/ export CLASSPATH=.:$JAVA_HOME/jre/lib/rt.jar:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar export PATH=$PATH:$JAVA_HOME/bin
儲存並關閉,然後執行下列命令讓設定立即生效。
source /etc/profile
可以輸入下面的命令檢視是否已生效:
java –-version
echo $JAVA_HOME echo $CLASSPATH echo $PATH
安裝LogStash
首先註冊ELK官方的GPG-KEY:
rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch
然後cd /etc/yum.repos.d/資料夾下,建立一個logstash.repo檔案,並將下面一段內容貼上到該檔案中儲存:
[logstash-7.x] name=Elastic repository for 7.x packages baseurl=https://artifacts.elastic.co/packages/7.x/yum gpgcheck=1 gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch enabled=1 autorefresh=1 type=rpm-md
然後執行安裝命令:
sudo yum install logstash
以上步驟可能比較慢,還有另外一種辦法,就是通過下載來安裝LogStash:
官方文件在這裡:https://www.elastic.co/cn/downloads/logstash
首先在上面的連結中下載LogStash的tar.gz包,這個過程有可能也很慢,我的解決方案是在自己機器上使用迅雷進行下載,完事兒Copy到Linux伺服器中。
下載完成後,執行解壓操作:
sudo tar -xvf logstash-7.2.0.tar.gz
解壓完成後,進入解壓後的logstash-7.2.0資料夾。
接著我們安裝Logstash-input-jdbc外掛:
bin/logstash-plugin install logstash-input-jdbc
下載SQL SERVER jbdc元件,這裡我們從微軟官網下載:https://docs.microsoft.com/en-us/sql/connect/jdbc/download-microsoft-jdbc-driver-for-sql-server?view=sql-server-2017 ,當然這個連結只是目前的,如果你在嘗試時這個連結失效了,那就自行百度搜索吧~
下載完成後,解壓到logstash下面的lib目錄下,這裡我自己為了方便,把微軟預設給jdbc外面包的一層語言名稱的資料夾給去掉了。
接著,我們到/config資料夾,新建一個logstash.conf檔案,內容大概如下:
下面的每一個引數含義都可以在官方文件中找到:
- input->jdbc大括號中的引數文件:https://www.elastic.co/guide/en/logstash/current/plugins-inputs-jdbc.html#plugins-inputs-jdbc-jdbc_fetch_size
- output->elasticsearch 大括號中的文件:https://www.elastic.co/guide/en/logstash/current/plugins-outputs-elasticsearch.html
input { jdbc { jdbc_driver_library => "/usr/local/logstash-7.2.0/lib/mssql-jdbc-7.2.2/mssql-jdbc-7.2.2.jre8.jar" // 這裡請靈活應變,能找到我們上一步下載的jdbc jar包即可 jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver" // 這個名字是固定的 jdbc_connection_string => "jdbc:sqlserver: //資料庫ServerIP:1433;databaseName=資料庫名;" jdbc_user => "資料庫賬號" jdbc_password => "資料庫密碼" schedule => "* * * * *" // Corn 表示式,請自行百度寫法 jdbc_default_timezone => "Asia/Shanghai" jdbc_page_size => "500" // 每一批傳輸的數量 record_last_run => "true" //是否儲存狀態 use_column_value => "true" //設定為時true,使用定義的 tracking_column值作為:sql_last_value。設定為時false,:sql_last_value反映上次執行查詢的時間。 tracking_column => "LastModificationTime" //配合use_column_value使用 last_run_metadata_path => "/usr/opt/logstash/config/last_id" //記錄:sql_last_value的檔案 lowercase_column_names => "false" //將DB中的列名自動轉換為小寫 tracking_column_type => "timestamp" //tracking_column的資料型別,只能是numberic和timestamp clean_run => "false" //是否應保留先前的執行狀態,其實我也不知道這個欄位幹啥用的~~ statement => "SELECT * FROM 表 WITH(NOLOCK) WHERE LastModificationTime > :sql_last_value" //從DB中抓資料的SQL指令碼 } } output { elasticsearch { index => "test" //ES叢集的索引名稱 document_id => "%{Id}" //Id是表裡面的主鍵,為了拿這個主鍵在ES中生成document ID hosts => ["http://192.168.154.135:9200"]// ES叢集的地址 } }
上面的被註釋搞的亂糟糟的,給你們一個可以複製的版本吧:
input { jdbc { jdbc_driver_library => "/usr/local/logstash-7.2.0/lib/mssql-jdbc-7.2.2/mssql-jdbc-7.2.2.jre8.jar" jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver" jdbc_connection_string => "jdbc:sqlserver://SERVER_IP:1433;databaseName=DBName;" jdbc_user => "xxx" jdbc_password => "password" schedule => "* * * * *" jdbc_default_timezone => "Asia/Shanghai" jdbc_page_size => "50000" record_last_run => "true" use_column_value => "true" tracking_column => "LastModificationTime" last_run_metadata_path => "/usr/local/logstash-7.2.0/config/last_id" lowercase_column_names => "false" tracking_column_type => "timestamp" clean_run => "false" statement => "SELECT * FROM xxx WITH(NOLOCK) WHERE LastModificationTime > :sql_last_value" } } output { elasticsearch { index => "item" document_id => "%{Id}" hosts => ["http://ES叢集IP:9200"] } }
Logstash 整體思路
回頭來說一下這個LogStash的整體思路吧,其實我的理解,LogStash就是一個數據搬運工,他的搬運資料,分為三個大的階段:
- 讀取資料(input)
- 過濾資料(filter)
- 輸出資料(output)
對應的官方文件:https://www.elastic.co/guide/en/logstash/current/pipeline.html
而這每一個階段,都是通過一些外掛來實現的,比如在上述的配置檔案中,我們有:
- 讀取資料即input部分,這部分由於我們是需要從資料庫讀取資料,所以使用了一個可以執行SQL語句的jdbc-input外掛,這裡如果我們的資料來源是其他的部分,就需要使用其他的一些外掛來實現。
- 也有輸出資料部分,這部分我們是將資料寫入到ElasticSearch,所以我們使用了一個elasticsearch-output外掛。這裡也可以將資料寫入到kafka等其他的一些產品中,也是需要一些外掛即可搞定。
- 可以發現我們上面的部分沒有涉及到filter外掛,其實如果我們想對資料做一些過濾、規範化處理等,都可以使用filter外掛來進行處理,具體的還需要進一步去探索啦~
執行資料同步
剩下的部分就簡單了,切換目錄到logstash的目錄下,執行命令:
bin/logstash -f config/logstash.conf
最後執行的效果圖大概如下:
可以使用Elasticsearch-Head等外掛來檢視是否同步正常:
大概就是這樣啦,後續我這邊會繼續嘗試使用其他方式來進行資料同步,歡迎大家關