logstash安裝和logstash-input-jdbc外掛
安裝logstash
logstash的安裝和elasticsearch的安裝是一樣簡單的,就是那種拆箱即用的。目前最新的版本是5.5.0,注意這個版本需要和elasticsearch的版本一致。
下載安裝
wget https://artifacts.elastic.co/downloads/logstash/logstash-5.5.0.zip
- 1
下載完之後
unzip logstash-5.5.0.zip
- 1
之後將檔案移動到自己存放的目錄即可。
啟動服務測試一下是否安裝成功:
cd bin
./logstash -e 'input { stdin { } } output { stdout {} }'
- 1
- 2
如果出現下面的東西就表示成功:
輸入隨便什麼內容:
HelloWorld
- 1
就會變成下面這樣:
基本上這樣就算是安裝成功了(其實好像並沒有安轉,只是下載而已)
可以在https://www.elastic.co/downloads/past-releases先下載,後安裝。
安裝logstash-input-jdbc外掛
主要參考https://blog.csdn.net/yeyuma/article/details/50240595
logstash-input-jdbc外掛是logstash 的一個個外掛。
使用ruby語言開發。
下載外掛過程中最大的坑是下載外掛相關的依賴的時候下不動,因為國內網路的原因,訪問不到亞馬遜的伺服器。
解決辦法,改成國內的ruby倉庫映象。此映象託管於淘寶的阿里雲伺服器上
1, 如果沒有安裝 gem 的話 安裝gem
sudo yum install gem
替換淘寶
1,gem sources --add https://ruby.taobao.org/ --remove https://rubygems.org/ 2,gem sources -l *** CURRENT SOURCES *** https://ruby.taobao.org # 請確保只有 ruby.taobao.org 如果 還是顯示 https://rubygems.org/ 進入 home的 .gemrc 檔案 sudo vim ~/.gemrc 手動刪除 https://rubygems.org/
2, 修改Gemfile的資料來源地址。步驟:
1, whereis logstash # 檢視logstash安裝的位置, 我的在 /opt/logstash/ 目錄 2, sudo vi Gemfile # 修改 source 的值 為: "https://ruby.taobao.org" 3, sudo vi Gemfile.jruby-1.9.lock # 找到 remote 修改它的值為: https://ruby.taobao.org
或者直接替換源這樣你不用改你的 Gemfile 的 source。
sudo gem install bundler $ bundle config mirror.https://rubygems.org https://ruby.taobao.org
安裝logstash-input-jdbc
我一共試了三種方法,一開始都沒有成功,原因如上,映象的問題。一旦映象配置成淘寶的了,理論上隨便選擇一種安裝都可以成功,我用的是第三種。
第一種:
cd /opt/logstash/ sudo bin/plugin install logstash-input-jdbc 如果成功就成功了。
第二種:
選擇對應的版本。我的logstash版本是1.4.0,對應的外掛版本是1.0.0
2,複製 1.0.0的下載地址
cd /opt/logstash sudo wget https://github.com/logstash-plugins/logstash-input-jdbc/archive/v1.0.0.zip sudo bin/plugin install v1.0.0.zip 如果成功了就成功了
3, 遺憾的是上兩個步驟都沒成功,我是手動裝的。
參考 安裝jdbc外掛的問題, 這篇文章的最後 ,這為好心的姑娘,也建議改變gemgile裡的映象,我估計這姑娘是chinese。
$ cd /opt/logstash $ wget https://github.com/logstash-plugins/logstash-input-jdbc/archive/v1.0.0.zip $ unzip master.zip $ cd logstash-input-jdbc-master 注意,有坑: 1, 修該 裡面的 Gemfile sudo vi Gemfile 修改 source 的值 為: "https://ruby.taobao.org" 2,修改 logstash-input-jdbc.gemspec sudo vi logstash-input-jdbc.gemspec 找到 s.files = `git ls-files`.split($\) 改為: s.files = [".gitignore", "CHANGELOG.md", "Gemfile", "LICENSE", "NOTICE.TXT", "README.md", "Rakefile", "lib/logstash/inputs/jdbc.rb", "lib/logstash/plugin_mixins/jdbc.rb", "logstash-input-jdbc.gemspec", "spec/inputs/jdbc_spec.rb"] OK! 繼續..... $ gem build logstash-input-jdbc.gemspec (output: fatal: Not a git repository (or any of the parent directories): .git Successfully built RubyGem Name: logstash-input-jdbc Version: 1.0.0 File: logstash-input-jdbc-1.0.0.gem) $ mv logstash-input-jdbc-1.0.0.gem /opt/logstash/ $ cd .. $ bin/plugin install logstash-input-jdbc-1.0.0.gem 如果再不成功 我也沒招了。
三、實現樣例。
假如上面步驟都搞定了...重點來了 繼續看...沒搞定也可以接著看啦..hahahaha....實戰......
目的 : 監聽資料表的資料,當我有新增時增加到elasticsearch,當我修改時,update到elasticsearch。
第一 前提:
1, 我有mysql資料庫,我有一張hotel 表, hotel_account表(此表裡有hotel_id), 裡面無資料。
2,已經啟動 elasticsearch . 地址是 http://192.168.0.45 埠:9200.
3,已經安裝 logstash, 地址在 /opt/logstash
第二 準備
兩個檔案: jdbc.conf
jdbc.sql 。名字隨便起啦。
一個 mysql 的java 驅動包 : mysql-connector-java-5.1.36-bin.jar
jdbc.conf 內容:
注意 statement_filepath => "jdbc.sql" 這個名字要跟下面的sql檔案的名字對應上。
重點欄位說明:
schedule:設定監聽間隔。可以設定每隔多久監聽一次什麼的。具體參考官方文件。
statement_filepath: 執行的sql 檔案路徑+名稱
input { stdin { } jdbc { # mysql jdbc connection string to our backup databse jdbc_connection_string => "jdbc:mysql://192.168.0.49:3306/dfb" # the user we wish to excute our statement as jdbc_user => "test" jdbc_password => "test" # the path to our downloaded jdbc driver jdbc_driver_library => "/opt/logstash/mysql-connector-java-5.1.36-bin.jar" # the name of the driver class for mysql jdbc_driver_class => "com.mysql.jdbc.Driver" jdbc_paging_enabled => "true" jdbc_page_size => "50000" statement_filepath => "jdbc.sql" schedule => "* * * * *" type => "jdbc" } } filter { json { source => "message" remove_field => ["message"] } } output { elasticsearch { host => "192.168.0.199" port => "9200" protocol => "http" index => "mysql01" document_id => "%{id}" cluster => "logstash-elasticsearch" } stdout { codec => json_lines } }
jdbc.sql
select h.id as id, h.hotel_name as name, h.photo_url as img, ha.id as haId, ha.finance_person from hotel h LEFT JOIN hotel_account ha on h.id = ha.hotel_id where h.last_modify_time >= :sql_last_start
第三: 啟動 logstash
sudo bin/logstash -f jdbc.conf
如果一切順利 應該如圖:
現在 logstash 已經開始監聽mysql 的表了。查詢哪些表 就在jdbc.sql 寫sql語句就行了。
現在 手動的 hotel, hotel_account 分別增加一條資料:
INSERT INTO hotel(hotel_name, photo_url) VALUES("馬二帥酒店","images/madashuai.img");
找到 hotel 的id
INSERT INTO hotel_account(hotel_id, finance_person) VALUES(15627, "馬二帥");
大約30秒的時候檢視es
很好馬二帥酒店已經增加進來了。
現在修改下 hotel_account
UPDATE hotel_account SET finance_person = "馬二帥修改為馬小帥" where id = 1601;
修改也能監聽到哦。
OK到此為止,使用logstash-input-jdbc外掛增量監聽es就介紹完咯
可用nohup ./logstash -f ./config-mysql/mysql.conf > log/log.file 2>&1 &語句,改為後臺執行使用後有點
[root@zy0001 bin]# cat config-mysql/mysql.conf
input {
stdin {
}
jdbc {
# 資料庫
jdbc_connection_string => "jdbc:mysql://192.168.2.124:3306/monitor_platform"
# 使用者名稱密碼
jdbc_user => "root"
jdbc_password => "123456"
# jar包的位置
jdbc_driver_library => "/usr/es/logstash-5.6.3/mysql-connector-java-5.1.45.jar"
# mysql的Driver
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
#清除第一次同步的時間,日誌時間
#clean_run => true
statement_filepath => "/usr/es/logstash-5.6.3/bin/config-mysql/monitor.sql"
# statement => "SELECT rowkey,title,url,task_rowkey,task_name,create_time,result,site_url,handle_time from monitor_task_video_manual_history"
schedule => "* * * * *"
#索引的型別
type => "monitor_task_video_manual_history"
}
}
filter {
json {
source => "message"
remove_field => ["message"]
}
}
output {
elasticsearch {
hosts => "192.168.2.201:9200"
# index名
index => "monitor_platform"
# 需要關聯的資料庫中有有一個id欄位,對應索引的id號
document_id => "%{rowkey}"
}
stdout {
codec => json_lines
}
}
[root@zy0001 bin]# cat config-mysql/monitor.sql
SELECT
rowkey,
title,
url,
task_rowkey,
task_name,
create_time,
result,
site_url,
handle_time
FROM
monitor_task_video_manual_history
WHERE
create_time >= DATE_ADD(:sql_last_value,INTERVAL 7 HOUR)
UNION
SELECT
rowkey,
title,
url,
task_rowkey,
task_name,
create_time,
result,
site_url,
handle_time
FROM
monitor_task_video_manual
WHERE
create_time >= DATE_ADD(:sql_last_value,INTERVAL 7 HOUR)
create_time >= DATE_ADD(:sql_last_value,INTERVAL 7 HOUR),:sql_last_value是獲取linux中上一次執行sql的時間,可在配置中使用 clean_run => true清除此時間。並使用函式DATE_ADD新增7小時,因為此日誌時間會比當前時間少8小時,增加7小時,代表同步一小時內的資料。這樣形成增量更新!