1. 程式人生 > 實用技巧 > Logstash使用-mysql資料同步至elasticsearch

Logstash使用-mysql資料同步至elasticsearch

1.下載

wget https://artifacts.elastic.co/downloads/logstash/logstash-6.6.0.tar.gz

測試:
cd /qqc_data/logstash-6.6.0/bin
[root@localhost bin]# ./logstash -e 'input { stdin { } } output { stdout {} }'
Sending Logstash logs to /qqc_data/logstash-6.6.0/logs which is now configured via log4j2.properties
[2020-08-28T14:39:27,286][INFO ][logstash.setting.writabledirectory] Creating directory {:setting=>"path.queue", :path=>"/qqc_data/logstash-6.6.0/data/queue"}
[2020-08-28T14:39:27,297][INFO ][logstash.setting.writabledirectory] Creating directory {:setting=>"path.dead_letter_queue", :path=>"/qqc_data/logstash-6.6.0/data/dead_letter_queue"}
[2020-08-28T14:39:27,715][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
[2020-08-28T14:39:27,730][INFO ][logstash.runner          ] Starting Logstash {"logstash.version"=>"6.6.0"}
[2020-08-28T14:39:27,761][INFO ][logstash.agent           ] No persistent UUID file found. Generating new UUID {:uuid=>"7da33a24-8145-499d-97ce-e7abac609b77", :path=>"/qqc_data/logstash-6.6.0/data/uuid"}
[2020-08-28T14:39:33,755][INFO ][logstash.pipeline        ] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>3, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50}
[2020-08-28T14:39:33,863][INFO ][logstash.pipeline        ] Pipeline started successfully {:pipeline_id=>"main", :thread=>"#<Thread:0xb345437 run>"}
The stdin plugin is now waiting for input:
[2020-08-28T14:39:33,921][INFO ][logstash.agent           ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[2020-08-28T14:39:34,175][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}
jjjj
{
    "@timestamp" => 2020-08-28T06:41:27.621Z,
          "host" => "localhost.localdomain",
      "@version" => "1",
       "message" => "jjjj"
}

2.mysql 資料同步到es

2.1 依賴安裝
參考:https://www.jianshu.com/p/62433b9c5c96?utm_campaign=maleskine&utm_content=note&utm_medium=seo_notes&utm_source=recommendation

業務:mysql 資料同步到es

相關依賴安裝:
cd /qqc_data/logstash-6.6.0/bin

安裝 jdbc 和 elasticsearch 外掛
./logstash-plugin install logstash-input-jdbc
./logstash-plugin install logstash-output-elasticsearch

mysql 驅動(下載後需解壓到 /qqc_data 目錄,後面的配置檔案會引用解壓出來的 jar 包):
wget https://cdn.mysql.com//Downloads/Connector-J/mysql-connector-java-5.1.49.zip

2.2 配置檔案
1、vim test.conf
input {
 jdbc {
   jdbc_driver_library => "/qqc_data/mysql-connector-java-5.1.49/mysql-connector-java-5.1.49-bin.jar"
   jdbc_driver_class => "com.mysql.jdbc.Driver"
   jdbc_connection_string => "jdbc:mysql://{IP地址}:3306/test"
   jdbc_user => "****"
   jdbc_password => "****"
   schedule => "*/1 * * * *"
   statement => "SELECT * FROM ip_info WHERE modify_time >= :sql_last_value"
   use_column_value => true
   tracking_column_type => "timestamp"
   tracking_column => "modify_time" # 遞增欄位的名稱,這裡使用 modify_time 這一列,這列的型別是 timestamp
   last_run_metadata_path => "syncpoint_table"  # 同步點檔案,這個檔案記錄了上次的同步點,重啟時會讀取這個檔案,這個檔案可以手動修改
 }
}

output {
 elasticsearch {
   hosts => ["172.30.4.129:9200"]
   # user => ""
   # password => ""
   # document_type => ""
   index => "index_one"
   document_id => "%{id}"  # 匯入到 es 中的文件 id,這個需要設定成主鍵,否則同一條記錄更新後在 es 中會出現兩條記錄,%{id} 表示引用 mysql 表中 id 欄位的值
 }
 stdout {
        # JSON格式輸出
        codec => json_lines
    }
}

2、啟動(在 /qqc_data/logstash-6.6.0 目錄下執行,test.conf 也放在該目錄):
bin/logstash -f test.conf
日誌輸出:
[2020-08-28T16:10:29,001][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}
Fri Aug 28 16:11:01 CST 2020 WARN: Establishing SSL connection without server's identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established by default if explicit option isn't set. For compliance with existing applications not using SSL the verifyServerCertificate property is set to 'false'. You need either to explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide truststore for server certificate verification.
[2020-08-28T16:11:01,551][INFO ][logstash.inputs.jdbc     ] (0.014257s) SELECT version()
[2020-08-28T16:11:01,611][INFO ][logstash.inputs.jdbc     ] (0.008063s) SELECT * FROM ip_info WHERE modify_time >= '2020-08-28 16:55:15'
{"id":6,"create_time":"2020-08-28T08:55:06.000Z","modify_time":"2020-08-28T08:55:15.000Z","user_ip":"44.55.678.22","@version":"1","@timestamp":"2020-08-28T08:11:01.677Z","count":3}
{"id":1,"create_time":"2020-08-25T02:10:03.805Z","modify_time":"2020-08-28T08:58:57.000Z","user_ip":"127.0.0.1.44","@version":"1","@timestamp":"2020-08-28T08:11:01.663Z","count":4}

3、配置檔案流程
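Logstash 依照 schedule 的設定(*/1 * * * *,即每分鐘一次)定時執行 statement 中的 sql,只查詢 modify_time >= :sql_last_value(上次同步點)的記錄。
查到的記錄以 mysql 表中的 id 作為文件 id 寫入 es,所以同一條記錄更新後會覆蓋原文件,而不是新增一條。
同步點記錄在 last_run_metadata_path 指定的檔案(syncpoint_table)中,重啟時會讀取這個檔案。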
2.3 測試
1、mysql 中資料:
mysql> SELECT * FROM ip_info;
+----+--------------+-------+----------------------------+----------------------------+
| id | user_ip      | count | create_time                | modify_time                |
+----+--------------+-------+----------------------------+----------------------------+
|  1 | 127.0.0.1.44 |     4 | 2020-08-25 10:10:03.805568 | 2020-08-28 16:58:57.000000 |
|  2 | 58.33.77.66  |     2 | 2020-08-25 10:18:47.703727 | 2020-08-25 10:18:58.812388 |
|  3 | 127.0.0.1    |     1 | 2020-08-26 01:25:53.632885 | 2020-08-26 01:25:53.632885 |
|  4 | 58.33.77.66  |     3 | 2020-08-26 08:54:52.454664 | 2020-08-26 09:46:20.155353 |
|  5 | 58.33.77.66  |     6 | 2020-08-27 07:54:49.592302 | 2020-08-27 07:56:21.768304 |
|  6 | 44.55.678.22 |     3 | 2020-08-28 16:55:06.000000 | 2020-08-28 16:55:15.000000 |
+----+--------------+-------+----------------------------+----------------------------+
6 rows in set

2、es 同步到的資料
GET http://172.30.4.129:9200/index_one/_search
{
    "took": 5,
    "timed_out": false,
    "_shards": {
        "total": 5,
        "successful": 5,
        "skipped": 0,
        "failed": 0
    },
    "hits": {
        "total": 6,
        "max_score": 1.0,
        "hits": [
            {
                "_index": "index_one",
                "_type": "doc",
                "_id": "5",
                "_score": 1.0,
                "_source": {
                    "@version": "1",
                    "user_ip": "58.33.77.66",
                    "@timestamp": "2020-08-28T07:54:00.393Z",
                    "id": 5,
                    "count": 6,
                    "create_time": "2020-08-26T23:54:49.592Z",
                    "modify_time": "2020-08-26T23:56:21.768Z"
                }
            },
            {
                "_index": "index_one",
                "_type": "doc",
                "_id": "2",
                "_score": 1.0,
                "_source": {
                    "@version": "1",
                    "user_ip": "58.33.77.66",
                    "@timestamp": "2020-08-28T07:49:01.654Z",
                    "id": 2,
                    "count": 2,
                    "create_time": "2020-08-25T02:18:47.703Z",
                    "modify_time": "2020-08-25T02:18:58.812Z"
                }
            },
            {
                "_index": "index_one",
                "_type": "doc",
                "_id": "4",
                "_score": 1.0,
                "_source": {
                    "@version": "1",
                    "user_ip": "58.33.77.66",
                    "@timestamp": "2020-08-28T07:49:01.656Z",
                    "id": 4,
                    "count": 3,
                    "create_time": "2020-08-26T00:54:52.454Z",
                    "modify_time": "2020-08-26T01:46:20.155Z"
                }
            },
            {
                "_index": "index_one",
                "_type": "doc",
                "_id": "6",
                "_score": 1.0,
                "_source": {
                    "@version": "1",
                    "user_ip": "44.55.678.22",
                    "@timestamp": "2020-08-28T08:02:00.176Z",
                    "id": 6,
                    "count": 3,
                    "create_time": "2020-08-28T08:55:06.000Z",
                    "modify_time": "2020-08-28T08:55:15.000Z"
                }
            },
            {
                "_index": "index_one",
                "_type": "doc",
                "_id": "1",
                "_score": 1.0,
                "_source": {
                    "@version": "1",
                    "user_ip": "127.0.0.1.44",
                    "@timestamp": "2020-08-28T08:02:00.175Z",
                    "id": 1,
                    "count": 4,
                    "create_time": "2020-08-25T02:10:03.805Z",
                    "modify_time": "2020-08-28T08:58:57.000Z"
                }
            },
            {
                "_index": "index_one",
                "_type": "doc",
                "_id": "3",
                "_score": 1.0,
                "_source": {
                    "@version": "1",
                    "user_ip": "127.0.0.1",
                    "@timestamp": "2020-08-28T07:49:01.655Z",
                    "id": 3,
                    "count": 1,
                    "create_time": "2020-08-25T17:25:53.632Z",
                    "modify_time": "2020-08-25T17:25:53.632Z"
                }
            }
        ]
    }
}

2.4 多張表同步
[root@localhost config]# vim /qqc_data/logstash-6.6.0/config/pipelines.yml

新增多個配置檔案
- pipeline.id: ip_info
  path.config: "/qqc_data/logstash-6.6.0/test.conf"

- pipeline.id: user_info
  path.config: "/qqc_data/logstash-6.6.0/test_two.conf"
  
 啟動(不要帶 -f 或 -e 等命令列引數,否則 pipelines.yml 會被忽略):
 [root@localhost config]# /qqc_data/logstash-6.6.0/bin/logstash