使用Filebeat採集日誌結合logstash過濾出特定格式的日誌至Elasticsearch
使用Filebeat採集日誌結合logstash過濾出特定格式的日誌
文章目錄
ELK搭建
使用外國大佬的開源專案,基本不要改什麼就可快速搭建一套單機版ELK用於練手。
注意:logstash已被我改造,如果以該專案構建ELK記得更改logstash.conf。
ELK專案github連結:
這裡對es不做過多描述,主要針對filebeat和logstash講解。
什麼是Filebeat
Filebeat是一個輕量級的託運人,用於轉發和集中日誌資料。Filebeat作為代理安裝在伺服器上,監視您指定的日誌檔案或位置,收集日誌事件,並將它們轉發到Elasticsearch或 Logstash進行索引。
Filebeat的工作原理:啟動Filebeat時,它會啟動一個或多個輸入,這些輸入將查詢您為日誌資料指定的位置。對於Filebeat找到的每個日誌,Filebeat啟動一個收集器。每個收集器為新內容讀取單個日誌,並將新日誌資料傳送到libbeat,libbeat聚合事件並將聚合資料傳送到您為Filebeat配置的輸出。
官方流程圖如下:
什麼是Logstash
Logstash是一個具有實時流水線功能的開源資料收集引擎。Logstash可以動態統一來自不同來源的資料,並將資料標準化為您選擇的目的地。為各種高階下游分析和視覺化用例清理和民主化所有資料。
Logstash的優勢:
- Elasticsearch的攝取主力
水平可擴充套件的資料處理管道,具有強大的Elasticsearch和Kibana協同作用 - 可插拔管道架構
混合,匹配和編排不同的輸入,過濾器和輸出,以便在管道協調中發揮作用 - 社群可擴充套件和開發人員友好的外掛生態系統
提供超過200個外掛,以及建立和貢獻自己的靈活性
採用Springboot構建一個Demo測試
接著進入正文,先講下我的需求。
1. 專案日誌生成在某個路徑,如/var/log/project,裡面有warn,info,error目錄,分別對應不同級別的日誌,我需要採集這些日誌。
2. 我需要採集特定格式的日誌,如"[2018-11-24 08:33:43,253][ERROR][http-nio-8080-exec-4][com.hh.test.logs.LogsApplication][code:200,msg:測試錄入錯誤日誌,param:{}]"
3. 篩選採集到的日誌,生成自定義欄位,如datatime,level,thread,class,msg
ELK以搭好,由elasticsearch+logstash+kibana 組成,
本文的filebeat作用是採集特定目錄下的日誌,並將其傳送出去,但是它只能採集並無法對資料進行篩選,這時候就用到logstash了,logstash擁有眾多外掛可提供過濾篩選功能,由於logstash本身是基於jdk的,所以佔用記憶體較大,而filebeat相較下,佔用的記憶體就不是很多了。有圖有真相:
所以可採用如下的方案採集篩選日誌:
- 每個專案都伴隨著一個filebeat採集日誌,你只需給他配置日誌所在目錄(可用正則匹配)即可,它會進行監視,如有新增日誌會自動將日誌採集傳送到你配置的輸出裡,一般配 置的輸出有kafka和redis、logstash、elasticsearch,這裡為了篩選格式,採用logstash進行處理。
- 配置filebeat多模組,為眾多專案配置日誌目錄路徑進行日誌採集併發送到logstash篩選過濾在轉發給elasticsearch。
filebeat.yml配置
參考配置,只列出我用到的,詳情見官方文件:
filebeat:
prospectors:
- type: log
//開啟監視,不開不採集
enable: true
paths: # 採集日誌的路徑這裡是容器內的path
- /var/log/elkTest/error/*.log
# 日誌多行合併採集
multiline.pattern: '^\['
multiline.negate: true
multiline.match: after
# 為每個專案標識,或者分組,可區分不同格式的日誌
tags: ["java-logs"]
# 這個檔案記錄日誌讀取的位置,如果容器重啟,可以從記錄的位置開始取日誌
registry_file: /usr/share/filebeat/data/registry
output:
# 輸出到logstash中,logstash更換為自己的ip
logstash:
hosts: ["logstash:5044"]
注:6.0以上該filebeat.yml需要掛載到/usr/share/filebeat/filebeat.yml,另外還需要掛載/usr/share/filebeat/data/registry 檔案,避免filebeat容器掛了後,新起的重複收集日誌。
logstash.conf配置
我用到的logstash並不是用來採集日誌的,而是對日誌進行匹配篩選,所以不要跟隨專案啟動,只需單獨啟動,暴露5044埠,能接收到filebeat傳送日誌即可,也就是說,它只是起到一個加工並轉發給elasticsearch的作用而已。
配置參考:
input {
beats {
port => 5044
}
}
filter {
if "java-logs" in [tags]{
grok {
# 篩選過濾
match => {
"message" => "(?<date>\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2},\d{3})\]\[(?<level>[A-Z]{4,5})\]\[(?<thread>[A-Za-z0-9/-]{4,40})\]\[(?<class>[A-Za-z0-9/.]{4,40})\]\[(?<msg>.*)"
}
remove_field => ["message"]
}
# 不匹配正則則刪除,匹配正則用=~
if [level] !~ "(ERROR|WARN|INFO)" {
# 刪除日誌
drop {}
}
}
}
output {
elasticsearch {
hosts => "elasticsearch:9200"
}
}
備註:
#grok 裡邊有定義好的現場的模板你可以用,但是更多的是自定義模板,規則是這樣的,小括號裡邊包含所有一個key和value,例子:(?<key>value),比如以下的資訊,第一個我定義的key是data,表示方法為:?<key> 前邊一個問號,然後用<>把key包含在裡邊去。value就是純正則了,這個我就不舉例子了。這個有個線上的除錯庫,可以供大家參考,
http://grokdebug.herokuapp.com/
上面我自定義的格式是:
{
"date": [
[
"2018-11-24 02:38:13,995"
]
],
"level": [
[
"INFO"
]
],
"thread": [
[
"http-nio-8080-exec-1"
]
],
"class": [
[
"com.hh.test.logs.LogsApplication"
]
],
"msg": [
[
"code:200,msg:測試錄入錯誤日誌,param:java.lang.ArithmeticException: / by zero]"
]
]
}
效果展示
測試專案以上傳到github
地址: https://github.com/liaozihong/ELK-CollectionLogs
參考連結:
ELK 之Filebeat 結合Logstash
Grok Debugger
ELK logstash 配置語法(24th)
Filebeat官方文件
Logstash官方文件