logstash同步mysql到elasticsearch
阿新 • • 發佈:2018-12-10
運行 就是 shang rom _id 1.4 page maven2 http 更加通用一些,建議在 mysql 表設計的時候都增加一個
logstash 獲取
wget https://artifacts.elastic.co/downloads/logstash/logstash-6.5.2.zip
unzip logstash-6.5.2.zip && cd logstash-6.5.2
安裝 jdbc 和 elasticsearch 插件
bin/logstash-plugin install logstash-input-jdbc
bin/logstash-plugin install logstash-output-elasticsearch
獲取 jdbc mysql 驅動
wget http://central.maven.org/maven2/mysql/mysql-connector-java/5.1.47/mysql-connector-java-5.1.47.jar unzip mysql-connector-java-5.1.47.zip
編寫配置文件
logstash-input-jdbc
使用 logstash-input-jdbc 插件讀取 mysql 的數據,這個插件的工作原理比較簡單,就是定時執行一個 sql,然後將 sql 執行的結果寫入到流中,增量獲取的方式沒有通過 binlog 方式同步,而是用一個遞增字段作為條件去查詢,每次都記錄當前查詢的位置,由於遞增的特性,只需要查詢比當前大的記錄即可獲取這段時間內的全部增量,一般的遞增字段有兩種,AUTO_INCREMENT
的主鍵 id
和 ON UPDATE CURRENT_TIMESTAMP
的 update_time
字段,id
字段只適用於那種只有插入沒有更新的表,update_time
update_time
字段
jdbc_driver_library
: jdbc mysql 驅動的路徑,在上一步中已經下載jdbc_driver_class
: 驅動類的名字,mysql 填com.mysql.jdbc.Driver
就好了jdbc_connection_string
: mysql 地址jdbc_user
: mysql 用戶jdbc_password
: mysql 密碼schedule
: 執行 sql 時機,類似 crontab 的調度statement
: 要執行的 sql,以 ":" 開頭是定義的變量,可以通過 parameters 來設置變量,這裏的sql_last_value
update_time
條件是>=
因為時間有可能相等,沒有等號可能會漏掉一些增量use_column_value
: 使用遞增列的值tracking_column_type
: 遞增字段的類型,numeric
表示數值類型,timestamp
表示時間戳類型tracking_column
: 遞增字段的名稱,這裏使用 update_time 這一列,這列的類型是timestamp
last_run_metadata_path
: 同步點文件,這個文件記錄了上次的同步點,重啟時會讀取這個文件,這個文件可以手動修改
input { jdbc { jdbc_driver_library => "../mysql-connector-java-5.1.46/mysql-connector-java-5.1.46-bin.jar" jdbc_driver_class => "com.mysql.jdbc.Driver" jdbc_connection_string => "jdbc:mysql://<mysql_host>:3306/rta" jdbc_user => "<username>" jdbc_password => "<password>" schedule => "* * * * *" statement => "SELECT * FROM table WHERE update_time >= :sql_last_value" use_column_value => true tracking_column_type => "timestamp" tracking_column => "update_time" last_run_metadata_path => "syncpoint_table" } }
logstash-output-elasticsearch
hosts
: es 集群地址user
: es 用戶名password
: es 密碼index
: 導入到 es 中的 index 名,這裏我直接設置成了 mysql 表的名字document_id
: 導入到 es 中的文檔 id,這個需要設置成主鍵,否則同一條記錄更新後在 es 中會出現兩條記錄,%{id}
表示引用 mysql 表中id
字段的值
output { elasticsearch { hosts => ["172.31.22.165", "172.31.17.241", "172.31.30.84", "172.31.18.178"] user => "<user>" password => "<password>" index => "table" document_id => "%{id}" } }
運行
把上面的代碼保存到一個配置文件裏面 sync_table.cfg
,執行下面命令即可
bin/logstash -f config/sync_table.cfg
#logstash的配置文件需要自己創建,一下是我自己的配置文件
cat mysql_conf/mysql.conf
input {
stdin {
}
jdbc {
#數據庫地址
jdbc_connection_string => "jdbc:mysql://localhost:3601/user"
jdbc_user => "admin"
jdbc_password => "passwd"
#數據庫驅動路徑
jdbc_driver_library => "/soft/logstash-5.6.9/mysql-connector-java-5.1.46/mysql-connector-java-5.1.46-bin.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
#sql路徑,也可以是直接指定sql語句,字段需要換成 statement =>
statement_filepath => "/soft/logstash-5.6.9/pay_test.sql"
#是否開啟記錄追蹤
record_last_run => "true"
#是否需要追蹤字段,如果為true,則需要指定tracking_column,默認是timestamp
use_column_value => "true"
#指定追蹤的字段,這裏需要註意的是,建議選擇主鍵字段,如果選擇日期需要為實時表
tracking_column => "id"
#追蹤字段的類型,目前只有數字和時間類型,默認是數字類型
#tracking_column_type => "number"
#設置時區
jdbc_default_timezone =>"Asia/Shanghai"
#是否每次清除last_run_metadata_path的內容
clean_run => "false"
#這裏可以手動設置:sql_last_value的值,默認時間是1970-01-01,默認數字是0
last_run_metadata_path => "/soft/logstash-5.6.9/logstash_jdbc_last_run"
#多久同步一次
schedule => "*/5 * * * *"
#是否分頁
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
}
}
filter {
#jdbc默認json,暫時沒找到修改方法
json {
source => "message"
remove_field => ["message"]
}
mutate { #需要移除的字段
remove_field => "@timestamp"
remove_field => "type"
remove_field => "@version"
}
}
output {
elasticsearch {
hosts => "localhost:9200" #elasticsearch地址
index => "user" #elasticsearch索引
document_id => "%{id}" #elasticsearch的id,該值需要唯一,如果不唯一就不要加這個字段,默認生成
document_type => "log" #elasticsearch的type
}
}
logstash同步mysql到elasticsearch