Section 3: Importing Data into Elasticsearch with Logstash
1. Introduction
Logstash is a powerful data-processing tool. It handles data transport, format processing, and formatted output, and has a rich plugin ecosystem. It is commonly used for log processing and for importing data with a regular structure into Elasticsearch.
Workflow
Logstash works in three stages:
input: the data entry point; it can receive source data from anywhere. Commonly used inputs:
- file: read from a file.
- syslog: listen on port 514 for syslog messages and parse them into RFC 3164 format.
- redis: fetch from a redis-server list.
- beats: receive events from Filebeat.
filter: the transformation layer, responsible for format handling, data-type conversion, filtering, and adding or modifying fields. Commonly used filters:
- grok: parse and structure arbitrary text via regular expressions. Grok is currently the best way in Logstash to turn unstructured log data into something structured and queryable; it ships with about 120 built-in patterns, which cover most needs.
- mutate: perform general transformations on event fields: rename, remove, replace, and modify.
- drop: discard an event entirely, e.g. debug events.
- clone: duplicate an event, optionally adding or removing fields.
- geoip: add geographic information about an IP address.
output: the final stage of a Logstash pipeline, responsible for shipping data to its destination; it is compatible with most applications. Commonly used outputs (a minimal pipeline skeleton follows this list):
- elasticsearch: send event data to Elasticsearch for search, analysis, and charting.
- file: write event data to a file on disk.
- mongodb: send event data to MongoDB, a high-performance NoSQL store, for durable storage, querying, analysis, and sharding of large datasets.
- redis: send data to a redis-server, often used as an intermediate buffer.
- graphite: send event data to Graphite (http://graphite.wikidot.com/).
- statsd: send event data to statsd.
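Put together, a pipeline configuration file is just these three blocks. A minimal runnable sketch (not from the article; it uses only stdin/stdout, so it can be tried without any files or servers):
input {
  stdin { }                              # read lines typed on the console
}
filter {
  mutate {
    add_field => { "source" => "demo" }  # trivial example transformation
  }
}
output {
  stdout { codec => rubydebug }          # print each structured event
}
Running logstash -f with this file and typing a line shows the resulting event with its message, @timestamp, and the added source field.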
2. Download and Installation
1. Download from: https://www.elastic.co/products/logstash
2. On Windows, simply unzip the archive to the target folder.
3. Usage
1. Write the configuration file that Logstash will be started with.
2. Write the corresponding mapping template file.
Case 1: A plain file with a fixed delimiter
For example: 2017-05-09 10:31:41,378 [INFO ] com.es.common.SystemLog.execute(SystemLog.java:44) - login|IP: 192.168.3.105|MAC: A1245C15-26C1-4263-8845-01CFCA6EC4FD|USERID: 89293|USERNM: sslvoe|
log.conf:
input {
  file {
    type => "tradelog"
    # file(s) to read
    path => "E:/home/elk/his/trade.log*"
    # how often (in seconds) to check the path for new files
    discover_interval => 5
    # read existing files from the beginning
    start_position => "beginning"
    # where to record the current read position
    sincedb_path => "E:/home/elk/conf/sincedb_trade.txt"
    # how often (in seconds) to flush the read position to disk
    sincedb_write_interval => 15
    # plain-text codec; the log file is GB2312-encoded
    codec => plain { charset => "GB2312" }
  }
}
filter {
  grok {
    # patterns are tried in order; the first (most specific) match wins
    match => { "message" => [
      "%{DATESTAMP_CN:[@metadata][logdate]} .* - %{WORD:operMethod}\|IP: %{IP:ip}\|MAC: %{GREEDYDATA:mac}\|USERID: %{WORD:userid}\|USERNM: %{WORD:usernm}\|",
      "%{DATESTAMP_CN:[@metadata][logdate]} .* - %{WORD:operMethod}\|IP: %{IP:ip}\|MAC: %{GREEDYDATA:mac}\|USERID: %{WORD:userid}\|",
      "%{DATESTAMP_CN:[@metadata][logdate]} .* - %{WORD:operMethod}\|IP: %{IP:ip}\|MAC: %{GREEDYDATA:mac}\|"
    ] }
    # drop the raw line once it has been parsed into fields
    remove_field => "message"
  }
  date {
    # parse the captured timestamp into @timestamp
    match => ["[@metadata][logdate]", "YYYY-MM-dd HH:mm:ss,SSS"]
  }
}
output {
  # only index events that parsed cleanly
  if "_grokparsefailure" not in [tags] and "_dateparsefailure" not in [tags] {
    stdout { codec => rubydebug }
    elasticsearch {
      index => "tradelog"
      document_type => "t_log_type"
      hosts => ["127.0.0.1:9200"]
      manage_template => true
      template_overwrite => true
      template_name => "log4j-tradelog"
      template => "E:/home/elk/conf/tradelog_template.json"
    }
  }
}
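Note: DATESTAMP_CN is not one of Logstash's stock grok patterns (the built-in TIMESTAMP_ISO8601 would also match this timestamp format), so the config above assumes a custom pattern definition loaded through the grok filter's patterns_dir option. A minimal sketch, using a hypothetical directory:
# contents of E:/home/elk/conf/patterns/extra (hypothetical path)
DATESTAMP_CN %{YEAR}-%{MONTHNUM}-%{MONTHDAY} %{TIME}
and, inside the grok block:
patterns_dir => ["E:/home/elk/conf/patterns"]
For the sample line above, a successful match yields operMethod=login, ip=192.168.3.105, mac=A1245C15-26C1-4263-8845-01CFCA6EC4FD, userid=89293 and usernm=sslvoe, with the timestamp stashed in [@metadata][logdate] for the date filter to consume.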
tradelog_template.json:
{
  "template": "log4j-tradelog*",
  "settings": {
    "index.number_of_shards": 3,
    "index.number_of_replicas": 0
  },
  "mappings": {
    "tradelog": {
      "_all": {
        "enabled": false
      },
      "properties": {
        "@timestamp": {
          "type": "date",
          "format": "strict_date_optional_time||epoch_millis",
          "doc_values": true
        },
        "@version": {
          "type": "string",
          "index": "not_analyzed"
        },
        "exception": {
          "type": "string",
          "index": "analyzed"
        },
        "path": {
          "type": "string",
          "index": "not_analyzed"
        },
        "host": {
          "type": "string",
          "index": "not_analyzed"
        },
        "ip": {
          "type": "ip",
          "index": "not_analyzed"
        },
        "userid": {
          "type": "integer",
          "index": "not_analyzed"
        },
        "mac": {
          "type": "string",
          "index": "not_analyzed"
        },
        "usernm": {
          "type": "string",
          "index": "not_analyzed"
        },
        "operMethod": {
          "type": "string",
          "index": "not_analyzed"
        },
        "type": {
          "type": "string",
          "index": "not_analyzed"
        }
      }
    }
  }
}
Run the command: logstash -f log.conf
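After a run, you can verify that the template was installed and documents were indexed; a sketch, assuming Elasticsearch is reachable at 127.0.0.1:9200 as configured above:
curl -XGET "http://127.0.0.1:9200/_template/log4j-tradelog?pretty"
curl -XGET "http://127.0.0.1:9200/tradelog/_search?pretty&size=1"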
Case 2: Read data from a database and insert it into ES
input {
  jdbc {
    jdbc_driver_library => "E:/tools/mvn_repository/mysql/mysql-connector-java/5.1.36/mysql-connector-java-5.1.36.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://172.16.10.3:3306/b2bdev?characterEncoding=UTF-8&useSSL=false"
    jdbc_user => "devmanager"
    jdbc_password => "yyh123"
    statement => "SELECT MERCHANDISE_ID AS goodsid, MERCHANDISE_CD AS goodscd, ASSISTANT_CODE AS asscode, BUYER AS buyer, MERCHANDISE_NM AS goodsnm, MANUFACTURER_ID AS manuid, created_date AS createddate FROM t_merchandise_info WHERE DELETE_FLAG = 0 LIMIT 0, 200"
    jdbc_paging_enabled => "true"
    jdbc_page_size => "50000"
  }
}
filter {
  # note: the jdbc input already emits structured events with no message
  # field, so this filter is effectively a no-op here
  json {
    source => "message"
    remove_field => ["message"]
  }
}
output {
  stdout { codec => rubydebug }
  elasticsearch {
    index => "goods300"
    # use the goods code as the document id so re-runs upsert instead of duplicating
    document_id => "%{goodscd}"
    document_type => "goods_info"
    doc_as_upsert => true
    hosts => ["127.0.0.1:9200"]
    manage_template => true
    template_overwrite => true
    template_name => "goodsInfo"
    template => "E:/home/elk/conf/goods_template.json"
  }
}
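As written, this jdbc pipeline executes the statement once and then exits. For periodic syncing, the jdbc input also supports a cron-like schedule option; a minimal sketch (the interval is an arbitrary choice, not from the article):
input {
  jdbc {
    # ... same driver and connection settings as above ...
    schedule => "*/5 * * * *"    # re-run the statement every 5 minutes
  }
}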
Case 3: A file of JSON text
input {
  file {
    path => "E:/home/elk/his/jsontest.json"
    start_position => "beginning"
    # parse each line as JSON at read time
    codec => json { charset => "UTF-8" }
  }
}
filter {
  # with the json codec above the event is already parsed and has no
  # message field, so this filter only applies to raw-text events
  json {
    source => "message"
    remove_field => "message"
  }
}
output {
  stdout { codec => rubydebug }
  # if the line is not valid JSON, do not record it to Elasticsearch
  if "_jsonparsefailure" not in [tags] {
    elasticsearch {
      index => "jsontest1"
      document_type => "jst"
      hosts => ["127.0.0.1:9200"]
      manage_template => true
      template_overwrite => true
      template_name => "jsonInfo"
      template => "E:/home/elk/conf/jsontest_template.json"
    }
  }
}
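The article does not show jsontest.json itself. With the json codec above, each line of the file should be a self-contained JSON object; hypothetical content for illustration (the field names are invented):
{"id": 1, "name": "first record", "price": 9.9}
{"id": 2, "name": "second record", "price": 19.5}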
References:
- http://tchuairen.blog.51cto.com/3848118/1840596/
- http://www.2cto.com/kf/201610/560348.html
- https://www.elastic.co/guide/en/logstash/current/plugins-filters-json.html
- https://www.elastic.co/guide/en/elasticsearch/reference/current/array.html