Logstash: using the JDBC input plugin to periodically read new database records
阿新 • Published: 2018-12-20
MySQL table structure:
mysql> desc employee;
+-----------+-------------+------+-----+---------+----------------+
| Field     | Type        | Null | Key | Default | Extra          |
+-----------+-------------+------+-----+---------+----------------+
| id        | int(11)     | NO   | PRI | NULL    | auto_increment |
| last_name | varchar(50) | NO   |     | NULL    |                |
| email     | varchar(50) | NO   |     | NULL    |                |
| gender    | varchar(20) | NO   |     | NULL    |                |
+-----------+-------------+------+-----+---------+----------------+
4 rows in set (0.00 sec)
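For reference, a CREATE TABLE statement consistent with this structure would look roughly like the sketch below. It is reconstructed from the desc output above; the original post does not include the DDL.
-- Hypothetical DDL reconstructed from the desc output
CREATE TABLE employee (
  id        INT(11)     NOT NULL AUTO_INCREMENT,
  last_name VARCHAR(50) NOT NULL,
  email     VARCHAR(50) NOT NULL,
  gender    VARCHAR(20) NOT NULL,
  PRIMARY KEY (id)
);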
Initial data in MySQL:
mysql> select * from employee;
+----+-----------+-------------------+--------+
| id | last_name | email             | gender |
+----+-----------+-------------------+--------+
|  1 | A1        | [email protected] | Male   |
|  2 | A2        | [email protected] | Male   |
|  3 | A3        | [email protected] | Female |
+----+-----------+-------------------+--------+
3 rows in set (0.00 sec)
Logstash configuration:
input {
  jdbc {
    jdbc_driver_library => "drivers/mysql-connector-java-5.1.45.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/mybatis"
    jdbc_user => "root"
    jdbc_password => "root123.."
    # Named parameter referenced in the statement as :gender
    parameters => { "gender" => "Male" }
    # Cron-style schedule: run the query once every minute
    schedule => "* * * * *"
    statement => "SELECT id,last_name,email,gender from employee where id > :sql_last_value and gender = :gender"
    # Track the id column; its latest value is exposed to the statement as :sql_last_value
    use_column_value => true
    tracking_column => "id"
  }
}

output {
  stdout { codec => rubydebug }
}
Start Logstash:
[[email protected] logstash-5.6.3]$ ./bin/logstash -f file/jdbc_input.conf
Sending Logstash's logs to /usr/local/logstash/logstash-5.6.3/logs which is now configured via log4j2.properties
[2018-04-25T15:37:19,037][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"fb_apache", :directory=>"/usr/local/logstash/logstash-5.6.3/modules/fb_apache/configuration"}
[2018-04-25T15:37:19,040][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"netflow", :directory=>"/usr/local/logstash/logstash-5.6.3/modules/netflow/configuration"}
[2018-04-25T15:37:19,862][INFO ][logstash.pipeline ] Starting pipeline {"id"=>"main", "pipeline.workers"=>2, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>5, "pipeline.max_inflight"=>250}
[2018-04-25T15:37:19,961][INFO ][logstash.pipeline ] Pipeline main started
[2018-04-25T15:37:20,048][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600}
Output one minute later:
[2018-04-25T15:38:00,779][INFO ][logstash.inputs.jdbc ] (0.007000s) SELECT id,last_name,email,gender from employee where id > 0 and gender = 'Male'
{
    "@timestamp" => 2018-04-25T07:38:00.794Z,
        "gender" => "Male",
      "@version" => "1",
     "last_name" => "A1",
            "id" => 1,
         "email" => "[email protected]"
}
{
    "@timestamp" => 2018-04-25T07:38:00.863Z,
        "gender" => "Male",
      "@version" => "1",
     "last_name" => "A2",
            "id" => 2,
         "email" => "[email protected]"
}
Insert one row into MySQL:
mysql> insert into employee(last_name,email,gender) values('A4','[email protected]','Male');
Query OK, 1 row affected (0.02 sec)
mysql> select * from employee;
+----+-----------+-------------------+--------+
| id | last_name | email             | gender |
+----+-----------+-------------------+--------+
|  1 | A1        | [email protected] | Male   |
|  2 | A2        | [email protected] | Male   |
|  3 | A3        | [email protected] | Female |
|  4 | A4        | [email protected] | Male   |
+----+-----------+-------------------+--------+
4 rows in set (0.00 sec)
Latest Logstash output:
[2018-04-25T15:41:00,103][INFO ][logstash.inputs.jdbc ] (0.004000s) SELECT id,last_name,email,gender from employee where id > 2 and gender = 'Male'
{
    "@timestamp" => 2018-04-25T07:41:00.107Z,
        "gender" => "Male",
      "@version" => "1",
     "last_name" => "A4",
            "id" => 4,
         "email" => "[email protected]"
}
Key points:
- The key is the use_column_value and tracking_column parameters. When use_column_value is true, the :sql_last_value variable holds the latest value of the column named by tracking_column; on the very first run it defaults to 0. In this example tracking_column is id, so Logstash records the largest id returned by each query and uses it in the next one.
- Logstash persists the latest tracking_column value to the .logstash_jdbc_last_run file under last_run_metadata_path, which defaults to /home/${user}/.logstash_jdbc_last_run. After a restart, Logstash therefore does not load everything from the beginning; it continues from the last recorded value. Alternatively, set the clean_run parameter to true to discard the previous run state on restart and read the data from the beginning again (see the config sketch after this list).
- Documentation: https://www.elastic.co/guide/en/logstash/6.2/plugins-inputs-jdbc.html
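To make the second point concrete, here is a minimal sketch of the jdbc block with the state-related options written out explicitly. It is an illustration only: the file path shown is an assumed example, and the connection, schedule, and statement settings are elided because they are unchanged from the configuration above.
input {
  jdbc {
    # ... same connection, schedule and statement settings as above ...
    use_column_value => true
    tracking_column => "id"
    # File in which the latest :sql_last_value is persisted between runs
    # (example path for illustration; defaults to /home/${user}/.logstash_jdbc_last_run)
    last_run_metadata_path => "/home/logstash/.logstash_jdbc_last_run"
    # Set to true to discard the saved state on startup and start again from id > 0
    clean_run => false
  }
}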