Logstash使用-mysql資料同步至elasticsearch
阿新 • • 發佈:2020-08-28
1.下載
wget https://artifacts.elastic.co/downloads/logstash/logstash-6.6.0.tar.gz 測試: cd /qqc_data/logstash-6.6.0/bin [root@localhost bin]# ./logstash -e 'input { stdin { } } output { stdout {} }' Sending Logstash logs to /qqc_data/logstash-6.6.0/logs which is now configured via log4j2.properties [2020-08-28T14:39:27,286][INFO ][logstash.setting.writabledirectory] Creating directory {:setting=>"path.queue", :path=>"/qqc_data/logstash-6.6.0/data/queue"} [2020-08-28T14:39:27,297][INFO ][logstash.setting.writabledirectory] Creating directory {:setting=>"path.dead_letter_queue", :path=>"/qqc_data/logstash-6.6.0/data/dead_letter_queue"} [2020-08-28T14:39:27,715][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified [2020-08-28T14:39:27,730][INFO ][logstash.runner ] Starting Logstash {"logstash.version"=>"6.6.0"} [2020-08-28T14:39:27,761][INFO ][logstash.agent ] No persistent UUID file found. Generating new UUID {:uuid=>"7da33a24-8145-499d-97ce-e7abac609b77", :path=>"/qqc_data/logstash-6.6.0/data/uuid"} [2020-08-28T14:39:33,755][INFO ][logstash.pipeline ] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>3, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50} [2020-08-28T14:39:33,863][INFO ][logstash.pipeline ] Pipeline started successfully {:pipeline_id=>"main", :thread=>"#<Thread:0xb345437 run>"} The stdin plugin is now waiting for input: [2020-08-28T14:39:33,921][INFO ][logstash.agent ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]} [2020-08-28T14:39:34,175][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600} jjjj { "@timestamp" => 2020-08-28T06:41:27.621Z, "host" => "localhost.localdomain", "@version" => "1", "message" => "jjjj" }
2.mysql 資料同步到es
2.1 依賴安裝
參考:https://www.jianshu.com/p/62433b9c5c96?utm_campaign=maleskine&utm_content=note&utm_medium=seo_notes&utm_source=recommendation
業務:mysql 資料同步到es
相關依賴安裝:
cd /qqc_data/logstash-6.6.0/bin
安裝 jdbc 和 elasticsearch 外掛:
./logstash-plugin install logstash-input-jdbc
./logstash-plugin install logstash-output-elasticsearch
mysql 驅動(下載後需解壓,2.2 配置檔案中的 jdbc_driver_library 指向解壓後的 jar 檔):
wget https://cdn.mysql.com//Downloads/Connector-J/mysql-connector-java-5.1.49.zip
2.2 配置檔案
1、vim test.conf
input {
  jdbc {
    jdbc_driver_library => "/qqc_data/mysql-connector-java-5.1.49/mysql-connector-java-5.1.49-bin.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://{IP地址}:3306/test"
    jdbc_user => "****"
    jdbc_password => "****"
    schedule => "*/1 * * * *"
    statement => "SELECT * FROM ip_info WHERE modify_time >= :sql_last_value"
    use_column_value => true
    tracking_column_type => "timestamp"
    tracking_column => "modify_time" # 遞增欄位的名稱,這裡使用 modify_time 這一列,這列的型別是 timestamp
    last_run_metadata_path => "syncpoint_table" # 同步點檔案,這個檔案記錄了上次的同步點,重啟時會讀取這個檔案,這個檔案可以手動修改
  }
}
output {
  elasticsearch {
    hosts => ["172.30.4.129:9200"]
    # user => ""
    # password => ""
    # document_type => ""
    index => "index_one"
    document_id => "%{id}" # 匯入到 es 中的文件 id,這個需要設定成主鍵,否則同一條記錄更新後在 es 中會出現兩條記錄,%{id} 表示引用 mysql 表中 id 欄位的值
  }
  stdout {
    # JSON格式輸出
    codec => json_lines
  }
}
2、啟動:
bin/logstash -f test.conf
日誌輸出:
[2020-08-28T16:10:29,001][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600}
Fri Aug 28 16:11:01 CST 2020 WARN: Establishing SSL connection without server's identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established by default if explicit option isn't set. For compliance with existing applications not using SSL the verifyServerCertificate property is set to 'false'. You need either to explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide truststore for server certificate verification.
[2020-08-28T16:11:01,551][INFO ][logstash.inputs.jdbc ] (0.014257s) SELECT version() [2020-08-28T16:11:01,611][INFO ][logstash.inputs.jdbc ] (0.008063s) SELECT * FROM ip_info WHERE modify_time >= '2020-08-28 16:55:15' {"id":6,"create_time":"2020-08-28T08:55:06.000Z","modify_time":"2020-08-28T08:55:15.000Z","user_ip":"44.55.678.22","@version":"1","@timestamp":"2020-08-28T08:11:01.677Z","count":3} {"id":1,"create_time":"2020-08-25T02:10:03.805Z","modify_time":"2020-08-28T08:58:57.000Z","user_ip":"127.0.0.1.44","@version":"1","@timestamp":"2020-08-28T08:11:01.663Z","count":4} 3、配置檔案流程 通過modify_time欄位的變化定時執行sql
2.3 測試
1、mysql 中資料:
mysql> SELECT * FROM ip_info;
+----+--------------+-------+----------------------------+----------------------------+
| id | user_ip | count | create_time | modify_time |
+----+--------------+-------+----------------------------+----------------------------+
| 1 | 127.0.0.1.44 | 4 | 2020-08-25 10:10:03.805568 | 2020-08-28 16:58:57.000000 |
| 2 | 58.33.77.66 | 2 | 2020-08-25 10:18:47.703727 | 2020-08-25 10:18:58.812388 |
| 3 | 127.0.0.1 | 1 | 2020-08-26 01:25:53.632885 | 2020-08-26 01:25:53.632885 |
| 4 | 58.33.77.66 | 3 | 2020-08-26 08:54:52.454664 | 2020-08-26 09:46:20.155353 |
| 5 | 58.33.77.66 | 6 | 2020-08-27 07:54:49.592302 | 2020-08-27 07:56:21.768304 |
| 6 | 44.55.678.22 | 3 | 2020-08-28 16:55:06.000000 | 2020-08-28 16:55:15.000000 |
+----+--------------+-------+----------------------------+----------------------------+
6 rows in set
2、es 同步到的資料
GET http://172.30.4.129:9200/index_one/_search
{
"took": 5,
"timed_out": false,
"_shards": {
"total": 5,
"successful": 5,
"skipped": 0,
"failed": 0
},
"hits": {
"total": 6,
"max_score": 1.0,
"hits": [
{
"_index": "index_one",
"_type": "doc",
"_id": "5",
"_score": 1.0,
"_source": {
"@version": "1",
"user_ip": "58.33.77.66",
"@timestamp": "2020-08-28T07:54:00.393Z",
"id": 5,
"count": 6,
"create_time": "2020-08-26T23:54:49.592Z",
"modify_time": "2020-08-26T23:56:21.768Z"
}
},
{
"_index": "index_one",
"_type": "doc",
"_id": "2",
"_score": 1.0,
"_source": {
"@version": "1",
"user_ip": "58.33.77.66",
"@timestamp": "2020-08-28T07:49:01.654Z",
"id": 2,
"count": 2,
"create_time": "2020-08-25T02:18:47.703Z",
"modify_time": "2020-08-25T02:18:58.812Z"
}
},
{
"_index": "index_one",
"_type": "doc",
"_id": "4",
"_score": 1.0,
"_source": {
"@version": "1",
"user_ip": "58.33.77.66",
"@timestamp": "2020-08-28T07:49:01.656Z",
"id": 4,
"count": 3,
"create_time": "2020-08-26T00:54:52.454Z",
"modify_time": "2020-08-26T01:46:20.155Z"
}
},
{
"_index": "index_one",
"_type": "doc",
"_id": "6",
"_score": 1.0,
"_source": {
"@version": "1",
"user_ip": "44.55.678.22",
"@timestamp": "2020-08-28T08:02:00.176Z",
"id": 6,
"count": 3,
"create_time": "2020-08-28T08:55:06.000Z",
"modify_time": "2020-08-28T08:55:15.000Z"
}
},
{
"_index": "index_one",
"_type": "doc",
"_id": "1",
"_score": 1.0,
"_source": {
"@version": "1",
"user_ip": "127.0.0.1.44",
"@timestamp": "2020-08-28T08:02:00.175Z",
"id": 1,
"count": 4,
"create_time": "2020-08-25T02:10:03.805Z",
"modify_time": "2020-08-28T08:58:57.000Z"
}
},
{
"_index": "index_one",
"_type": "doc",
"_id": "3",
"_score": 1.0,
"_source": {
"@version": "1",
"user_ip": "127.0.0.1",
"@timestamp": "2020-08-28T07:49:01.655Z",
"id": 3,
"count": 1,
"create_time": "2020-08-25T17:25:53.632Z",
"modify_time": "2020-08-25T17:25:53.632Z"
}
}
]
}
}
2.4 多張表同步
[root@localhost config]# vim /qqc_data/logstash-6.6.0/config/pipelines.yml
新增多個配置檔案
- pipeline.id: ip_info
  path.config: "/qqc_data/logstash-6.6.0/test.conf"
- pipeline.id: user_info
  path.config: "/qqc_data/logstash-6.6.0/test_two.conf"
啟動(不帶 -f / -e 參數,否則 Logstash 會忽略 pipelines.yml):
[root@localhost config]# /qqc_data/logstash-6.6.0/bin/logstash