LogStash實現MySQL資料增量同步到ElasticSearch
阿新 • • 發佈:2020-12-27
需求的由來
在做一個專案的時候,需要一個搜尋的功能,剛開始想到的是用資料庫的模糊查詢,但是考慮到效率的問題,於是就用了ElasticSearch(ES),但是MySQL中的資料怎麼樣到ES中呢,我們可能會想到,MySQL可以實現主從複製,通過binary log檔案實現的,蒐集了一波資料發現,LogStash可以實現這個資料同步的功能,有增量和全量,如果資料只同步一次的話,可以使用全量同步,如果資料會有更新的話,可以使用增量同步,真是nice。
安裝LogStash
要用人家的功能,第一步當然是安裝此利器了,直接去官網下載即可
下載完之後解壓,由於是要跟MySQL打交道的,當然必須要有MySQL的相關配置了,第一步在bin的同級目錄下建立一個mysql目錄,然後將MySQL驅動放裡面即可,
下載完畢之後,解壓,將裡面的jar包拷貝到剛剛建立的mysql目錄下即可。
然後是在bin目錄下建立一個
logstash.conf
的檔案,內容如下:
input { # 多張表的同步只需要設定多個jdbc的模組就行了 jdbc { # mysql 資料庫連結 jdbc_connection_string => "jdbc:mysql://localhost:3306/newsblog?userSSL=true&useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai" # 驅動 jdbc_driver_library => "D:/elasticsearch/logstash-7.8.0/mysql/mysql-connector-java-8.0.22.jar" # 驅動類名 jdbc_driver_class => "com.mysql.cj.jdbc.Driver" # 使用者名稱和密碼 jdbc_user => "root" jdbc_password => "root" #設定監聽間隔 各欄位含義(由左至右)分、時、天、月、年,全部為*預設含義為每分鐘都更新 schedule => "*/2 * * * *" #直接執行sql語句 statement => "select * from blog where blogId > :sql_last_value" # 用其他欄位追蹤 use_column_value => true tracking_column => "blogid" # 記錄最新的同步的offset資訊,會自動建立該檔案 last_run_metadata_path => "D:/elasticsearch/syncpoint_table.txt" # 是否清空檔案 clean_run => false } } output { elasticsearch { #es的ip和埠 hosts => ["http://localhost:9200"] #ES索引名稱(自己定義的) index => "blog" #文件型別 document_type => "_doc" #設定資料的id為資料庫中的欄位,這裡都是小寫 document_id => "%{blogid}" } stdout { codec => json_lines } }
啟動
在啟動ES的前提下,啟動LogStash,在bin目錄下執行命令
logstash -f logstash.conf
然後就會根據配置來進行同步資料了,會生成一個記錄offset檔案。
然後通過Kibana可以進行查詢到資料。