1. 程式人生 > >使用Filebeat採集日誌結合logstash過濾出特定格式的日誌至Elasticsearch

使用Filebeat採集日誌結合logstash過濾出特定格式的日誌至Elasticsearch

使用Filebeat採集日誌結合logstash過濾出特定格式的日誌

文章目錄

ELK搭建

使用外國大佬的開源專案,基本不要改什麼就可快速搭建一套單機版ELK用於練手。
注意:logstash已被我改造,如果以該專案構建ELK記得更改logstash.conf。
ELK專案github連結:

https://github.com/deviantony/docker-elk

這裡對es不做過多描述,主要針對filebeat和logstash講解。

什麼是Filebeat

Filebeat是一個輕量級的託運人,用於轉發和集中日誌資料。Filebeat作為代理安裝在伺服器上,監視您指定的日誌檔案或位置,收集日誌事件,並將它們轉發到Elasticsearch或 Logstash進行索引。
Filebeat的工作原理:啟動Filebeat時,它會啟動一個或多個輸入,這些輸入將查詢您為日誌資料指定的位置。對於Filebeat找到的每個日誌,Filebeat啟動一個收集器。每個收集器為新內容讀取單個日誌,並將新日誌資料傳送到libbeat,libbeat聚合事件並將聚合資料傳送到您為Filebeat配置的輸出。
官方流程圖如下:

什麼是Logstash

Logstash是一個具有實時流水線功能的開源資料收集引擎。Logstash可以動態統一來自不同來源的資料,並將資料標準化為您選擇的目的地。為各種高階下游分析和視覺化用例清理和民主化所有資料。
Logstash的優勢:

  • Elasticsearch的攝取主力
    水平可擴充套件的資料處理管道,具有強大的Elasticsearch和Kibana協同作用
  • 可插拔管道架構
    混合,匹配和編排不同的輸入,過濾器和輸出,以便在管道協調中發揮作用
  • 社群可擴充套件和開發人員友好的外掛生態系統
    提供超過200個外掛,以及建立和貢獻自己的靈活性

採用Springboot構建一個Demo測試

接著進入正文,先講下我的需求。

1. 專案日誌生成在某個路徑,如/var/log/project,裡面有warn,info,error目錄,分別對應不同級別的日誌,我需要採集這些日誌。
2. 我需要採集特定格式的日誌,如"[2018-11-24 08:33:43,253][ERROR][http-nio-8080-exec-4][com.hh.test.logs.LogsApplication][code:200,msg:測試錄入錯誤日誌,param:{}]"
3. 篩選採集到的日誌,生成自定義欄位,如datatime,level,thread,class,msg

ELK以搭好,由elasticsearch+logstash+kibana 組成,

本文的filebeat作用是採集特定目錄下的日誌,並將其傳送出去,但是它只能採集並無法對資料進行篩選,這時候就用到logstash了,logstash擁有眾多外掛可提供過濾篩選功能,由於logstash本身是基於jdk的,所以佔用記憶體較大,而filebeat相較下,佔用的記憶體就不是很多了。有圖有真相:

所以可採用如下的方案採集篩選日誌:

  1. 每個專案都伴隨著一個filebeat採集日誌,你只需給他配置日誌所在目錄(可用正則匹配)即可,它會進行監視,如有新增日誌會自動將日誌採集傳送到你配置的輸出裡,一般配 置的輸出有kafka和redis、logstash、elasticsearch,這裡為了篩選格式,採用logstash進行處理。
  2. 配置filebeat多模組,為眾多專案配置日誌目錄路徑進行日誌採集併發送到logstash篩選過濾在轉發給elasticsearch。

filebeat.yml配置

參考配置,只列出我用到的,詳情見官方文件:

filebeat:
  prospectors:
  - type: log
    //開啟監視,不開不採集
    enable: true
    paths:  # 採集日誌的路徑這裡是容器內的path
    - /var/log/elkTest/error/*.log
    # 日誌多行合併採集
    multiline.pattern: '^\['
    multiline.negate: true
    multiline.match: after
    # 為每個專案標識,或者分組,可區分不同格式的日誌
    tags: ["java-logs"]
    # 這個檔案記錄日誌讀取的位置,如果容器重啟,可以從記錄的位置開始取日誌
    registry_file: /usr/share/filebeat/data/registry

output:
  # 輸出到logstash中,logstash更換為自己的ip
  logstash:
    hosts: ["logstash:5044"]

注:6.0以上該filebeat.yml需要掛載到/usr/share/filebeat/filebeat.yml,另外還需要掛載/usr/share/filebeat/data/registry 檔案,避免filebeat容器掛了後,新起的重複收集日誌。

logstash.conf配置

我用到的logstash並不是用來採集日誌的,而是對日誌進行匹配篩選,所以不要跟隨專案啟動,只需單獨啟動,暴露5044埠,能接收到filebeat傳送日誌即可,也就是說,它只是起到一個加工並轉發給elasticsearch的作用而已。
配置參考:

input {
	beats {
	    port => 5044
	}
}
filter {
   if "java-logs" in [tags]{ 
     grok {
        # 篩選過濾
        match => {
	       "message" => "(?<date>\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2},\d{3})\]\[(?<level>[A-Z]{4,5})\]\[(?<thread>[A-Za-z0-9/-]{4,40})\]\[(?<class>[A-Za-z0-9/.]{4,40})\]\[(?<msg>.*)"
        }
       remove_field => ["message"]
     }
     # 不匹配正則則刪除,匹配正則用=~
 	 if [level] !~ "(ERROR|WARN|INFO)" {
         # 刪除日誌
         drop {}
     }
    }
}

output {
	elasticsearch {
		hosts => "elasticsearch:9200"
	}
}

備註:

#grok 裡邊有定義好的現場的模板你可以用,但是更多的是自定義模板,規則是這樣的,小括號裡邊包含所有一個key和value,例子:(?<key>value),比如以下的資訊,第一個我定義的key是data,表示方法為:?<key> 前邊一個問號,然後用<>把key包含在裡邊去。value就是純正則了,這個我就不舉例子了。這個有個線上的除錯庫,可以供大家參考,
http://grokdebug.herokuapp.com/ 

上面我自定義的格式是:

{
  "date": [
    [
      "2018-11-24 02:38:13,995"
    ]
  ],
  "level": [
    [
      "INFO"
    ]
  ],
  "thread": [
    [
      "http-nio-8080-exec-1"
    ]
  ],
  "class": [
    [
      "com.hh.test.logs.LogsApplication"
    ]
  ],
  "msg": [
    [
      "code:200,msg:測試錄入錯誤日誌,param:java.lang.ArithmeticException: / by zero]"
    ]
  ]
}

效果展示

測試專案以上傳到github
地址: https://github.com/liaozihong/ELK-CollectionLogs

參考連結:
ELK 之Filebeat 結合Logstash
Grok Debugger
ELK logstash 配置語法(24th)
Filebeat官方文件
Logstash官方文件