1. 程式人生 > >第三節 ElasticSearch資料匯入之Logstash

第三節 ElasticSearch資料匯入之Logstash

一、簡介 Logstash 是一款強大的資料處理工具,它可以實現資料傳輸,格式處理,格式化輸出,還有強大的外掛功能,常用於日誌處理、或一些具有一定格式的資料匯入到ES的處理。 工作流程
Logstash 工作的三個階段:
input 資料輸入端,可以接收來自任何地方的源資料。
  • file:從檔案中讀取
  • syslog:監聽在514埠的系統日誌資訊,並解析成RFC3164格式。
  • redis:從redis-server list 中獲取
  • beat:接收來自Filebeat的事件

Filter 資料中轉層,主要進行格式處理,資料型別轉換、資料過濾、欄位新增,修改等,常用的過濾器如下。
  • grok: 通過正則解析和結構化任何文字。Grok 目前是logstash最好的方式對非結構化日誌資料解析成結構化和可查詢化。logstash內建了120個匹配模式,滿足大部分需求。
  • mutate: 在事件欄位執行一般的轉換。可以重新命名、刪除、替換和修改事件欄位。
  • drop: 完全丟棄事件,如debug事件。
  • clone: 複製事件,可能新增或者刪除欄位。
  • geoip: 新增有關IP地址地理位置資訊。

output 是logstash工作的最後一個階段,負責將資料輸出到指定位置,相容大多數應用,常用的有:
  • elasticsearch: 傳送事件資料到 Elasticsearch,便於查詢,分析,繪圖。
  • file: 將事件資料寫入到磁碟檔案上。
  • mongodb:將事件資料傳送至高效能NoSQL mongodb,便於永久儲存,查詢,分析,大資料分片。
  • redis:將資料傳送至redis-server,常用於中間層暫時快取。
  • graphite: 傳送事件資料到graphite。http://graphite.wikidot.com/
  • statsd: 傳送事件資料到 statsd。

二、下載安裝 1.下載地址: https://www.elastic.co/products/logstash
2.window下解壓到目標資料夾即可 三、使用 1. 編寫logstash啟動的配置檔案 2.編寫對應的mapping template檔案 第一種:普通有固定分隔符的檔案 例如:2017-05-09 10:31:41,378 [INFO ] com.es.common.SystemLog.execute(SystemLog.java:44) - login|IP: 192.168.3.105|MAC: A1245C15-26C1-4263-8845-01CFCA6EC4FD|USERID: 89293|USERNM: sslvoe| log.conf input { file { type => "tradelog" #需要讀取的檔案 path => "E:/home/elk/his/trade.log*" discover_interval => 5 #從開始位置讀取 start_position => "beginning" #記錄讀取的位置 sincedb_path => "E:/home/elk/conf/sincedb_trade.txt" sincedb_write_interval => 15 #文字型別 codec => plain { charset => "GB2312" } } } filter { grok { match => { "message" => "%{DATESTAMP_CN:[@metadata][logdate]} .* - %{WORD:operMethod}\|IP: %{IP:ip}\|MAC: %{GREEDYDATA:mac}\|USERID: %{WORD:userid}\|USERNM: %{WORD:usernm}\|" } match => { "message" => "%{DATESTAMP_CN:[@metadata][logdate]} .* - %{WORD:operMethod}\|IP: %{IP:ip}\|MAC: %{GREEDYDATA:mac}\|USERID: %{WORD:userid}\|" } match => { "message" => "%{DATESTAMP_CN:[@metadata][logdate]} .* - %{WORD:operMethod}\|IP: %{IP:ip}\|MAC: %{GREEDYDATA:mac}\|" } remove_field => "message" } date { match => ["[@metadata][logdate]", "YYYY-MM-dd HH:mm:ss,SSS"] } } output { if "_grokparsefailure" not in [tags] and "_dateparsefailure" not in [tags] { stdout {codec => rubydebug} elasticsearch { index => "tradelog" document_type => "t_log_type" hosts => ["127.0.0.1:9200"] manage_template => true template_overwrite => true template_name => "log4j-tradelog" template => "E:/home/elk/conf/tradelog_template.json" } } } tradelog_template.json: { "template": "log4j-tradelog*", "settings": { "index.number_of_shards": 3, "number_of_replicas": 0 }, "mappings": { "tradelog": { "_all": { "enabled": false }, "properties": { "@timestamp": { "type": "date", "format": "strict_date_optional_time||epoch_millis", "doc_values": true }, "@version": { "type": "string", "index": "not_analyzed" }, "exception": { "type": "string", "index": "analyzed" }, "path": { "type": "string", "index": "not_analyzed" }, "host": { "type": "string", "index": "not_analyzed" }, "ip": { "type": "ip", "index": "not_analyzed" }, "userid": { "type": "integer", "index": "not_analyzed" }, "mac": { "type": "string", "index": "not_analyzed" }, "usernm": { "type": "string", "index": "not_analyzed" }, "operMethod": { "type": "string", "index": "not_analyzed" }, "type": { "type": "string", "index": "not_analyzed" } } } } }
執行命令: logstash -f log.conf
第二種 從資料庫中讀取資料並插入到es中 input { jdbc { jdbc_driver_library => "E:/tools/mvn_repository/mysql/mysql-connector-java/5.1.36/mysql-connector-java-5.1.36.jar" jdbc_driver_class => "com.mysql.jdbc.Driver" jdbc_connection_string => "jdbc:mysql://172.16.10.3:3306/b2bdev?characterEncoding=UTF-8&useSSL=false" jdbc_user => "devmanager" jdbc_password => "yyh123" statement => "SELECT MERCHANDISE_ID AS goodsid,MERCHANDISE_CD AS goodscd,ASSISTANT_CODE AS asscode,BUYER AS buyer,MERCHANDISE_NM AS goodsnm,MANUFACTURER_ID AS manuid,created_date as createddate FROM t_merchandise_info WHERE DELETE_FLAG = 0 LIMIT 0, 200" jdbc_paging_enabled => "true" jdbc_page_size => "50000" } } filter { json { source => "message" remove_field => ["message"] } } output { stdout {codec => rubydebug} elasticsearch { index => "goods300" document_id => "%{goodscd}" document_type => "goods_info" doc_as_upsert => true hosts => ["127.0.0.1:9200"] manage_template => true template_overwrite => true template_name => "goodsInfo" template => "E:/home/elk/conf/goods_template.json" } }
第三種 JSON格式的文字 input { file { path => "E:/home/elk/his/jsontest.json" start_position => "beginning" codec => json { charset => "UTF-8" } } } filter { json { source => "message" remove_field => "message" }
}
output { stdout {codec => rubydebug} #如果不是JSON則不記錄到elasticsearch if "_jsonparsefailure" not in [tags] { elasticsearch { index => "jsontest1" document_type => "jst" hosts => ["127.0.0.1:9200"] manage_template => true template_overwrite => true template_name => "jsonInfo" template => "E:/home/elk/conf/jsontest_template.json" } } }
參考網址: http://tchuairen.blog.51cto.com/3848118/1840596/ http://www.2cto.com/kf/201610/560348.html https://www.elastic.co/guide/en/logstash/current/plugins-filters-json.html https://www.elastic.co/guide/en/elasticsearch/reference/current/array.html