1. 程式人生 > 資料庫 >LogStash實現MySQL資料增量同步到ElasticSearch

LogStash實現MySQL資料增量同步到ElasticSearch

需求的由來

在做一個專案的時候,需要一個搜尋的功能,剛開始想到的是用資料庫的模糊查詢,但是考慮到效率的問題,於是就用了ElasticSearch(ES),但是MySQL中的資料怎麼樣到ES中呢,我們可能會想到,MySQL可以實現主從複製,通過binary log檔案實現的,蒐集了一波資料發現,LogStash可以實現這個資料同步的功能,有增量和全量,如果資料只同步一次的話,可以使用全量同步,如果資料會有更新的話,可以使用增量同步,真是nice。

安裝LogStash
要用人家的功能,第一步當然是安裝此利器了,直接去官網下載即可
在這裡插入圖片描述
下載完之後解壓,由於是要跟MySQL打交道的,當然必須要有MySQL的相關配置了,第一步在bin的同級目錄下建立一個mysql目錄,然後將MySQL驅動放裡面即可,

在這裡插入圖片描述
下載完畢之後,解壓,將裡面的jar包拷貝到剛剛建立的mysql目錄下即可。
在這裡插入圖片描述
然後是在bin目錄下建立一個logstash.conf的檔案,內容如下:

input {
  # 多張表的同步只需要設定多個jdbc的模組就行了
  jdbc {
      # mysql 資料庫連結
      jdbc_connection_string => "jdbc:mysql://localhost:3306/newsblog?userSSL=true&useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai"
      # 驅動
      jdbc_driver_library => "D:/elasticsearch/logstash-7.8.0/mysql/mysql-connector-java-8.0.22.jar"
      # 驅動類名
      jdbc_driver_class => "com.mysql.cj.jdbc.Driver"      
      # 使用者名稱和密碼
      jdbc_user => "root"
      jdbc_password => "root"
      #設定監聽間隔  各欄位含義(由左至右)分、時、天、月、年,全部為*預設含義為每分鐘都更新
      schedule => "*/2 * * * *"
      #直接執行sql語句
      statement => "select * from blog where blogId > :sql_last_value"
      # 用其他欄位追蹤
      use_column_value => true
      tracking_column => "blogid"
      # 記錄最新的同步的offset資訊,會自動建立該檔案
      last_run_metadata_path => "D:/elasticsearch/syncpoint_table.txt"
      # 是否清空檔案
      clean_run => false
    }

}


output {
  elasticsearch {
        #es的ip和埠
        hosts => ["http://localhost:9200"]
        #ES索引名稱(自己定義的)
        index => "blog"
        #文件型別
        document_type => "_doc"
        #設定資料的id為資料庫中的欄位,這裡都是小寫
        document_id => "%{blogid}"
    }
    stdout {
        codec => json_lines
    }
}

啟動
在啟動ES的前提下,啟動LogStash,在bin目錄下執行命令
logstash -f logstash.conf
然後就會根據配置來進行同步資料了,會生成一個記錄offset檔案。
在這裡插入圖片描述
然後通過Kibana可以進行查詢到資料。
在這裡插入圖片描述