ELK 之Filebeat 結合Logstash 過濾出來你想要的日誌
阿新 • • 發佈:2018-05-01
filebeat logstash ELK 先扯點沒用的
Harvest (收割者);
libeat (匯聚層);
registry(註冊計入者);
Prospector 負責探索日誌所在地,就如礦工一樣要找礦,而Harvest如礦主一樣的收割者礦工們的勞動成果,哎,世界無處不剝削啊!每個Prospector 都有一個對應的Harvest,然後他們有一個共同的老大叫做Libbeat,這個家夥會匯總所有的東西,然後把所有的日誌傳送給指定的客戶,這其中還有個非常重要的角色”registry“,這個家夥相當於一個會計,它會記錄Harvest 都收割了些啥,收割到哪裏了,這樣一但有問題了之後,harvest就會跑到會計哪裏問:“上次老子的活幹到那塊了”?Registry 會告訴Harvest 你Y的上次幹到哪裏了,去哪裏接著幹就行了。這樣就避免了數據重復收集的問題!
收集日誌的目的是有效的利用日誌,有效利用日誌的前提是日誌經過格式化符合我們的要求,這樣才能真正的高效利用收集到elasticsearch平臺的日誌。默認的日誌到達elasticsearch 是原始格式,亂的讓人抓狂,這個時候你會發現Logstash filter的可愛之處,它很像一塊橡皮泥,如果我們手巧的話就會塑造出來讓自己舒舒服服的作品,but 如果你沒搞好的話那就另說了,本文的宗旨就是帶你一起飛,搞定這塊橡皮泥。當你搞定之後你會覺得kibana 的世界瞬間清爽了很多!
FIlebeat 的4大金剛
Filebeat 有4個非常重要的概念需要我們知道,
Prospector(礦工);
libeat (匯聚層);
registry(註冊計入者);
Prospector 負責探索日誌所在地,就如礦工一樣要找礦,而Harvest如礦主一樣的收割者礦工們的勞動成果,哎,世界無處不剝削啊!每個Prospector 都有一個對應的Harvest,然後他們有一個共同的老大叫做Libbeat,這個家夥會匯總所有的東西,然後把所有的日誌傳送給指定的客戶,這其中還有個非常重要的角色”registry“,這個家夥相當於一個會計,它會記錄Harvest 都收割了些啥,收割到哪裏了,這樣一但有問題了之後,harvest就會跑到會計哪裏問:“上次老子的活幹到那塊了”?Registry 會告訴Harvest 你Y的上次幹到哪裏了,去哪裏接著幹就行了。這樣就避免了數據重復收集的問題!
FIlebeat 詳細配置:
filebeat.prospectors: - input_type: log enabled: True paths: - /var/log/mysql-slow-* #這個地方是關鍵,我們給上邊日誌加上了tags,方便在logstash裏邊通過這個tags 過濾並格式化自己想要的內容; tags: ["mysql_slow_logs"] #有的時候日誌不是一行輸出的,如果不用multiline的話,會導致一條日誌被分割成多條收集過來,形成不完整日誌,這樣的日誌對於我們來說是沒有用的!通過正則匹配語句開頭,這樣multiline 會在匹配開頭 之後,一直到下一個這樣開通的語句合並成一條語句。 #pattern:多行日誌開始的那一行匹配的pattern #negate:是否需要對pattern條件轉置使用,不翻轉設為true,反轉設置為false #match:匹配pattern後,與前面(before)還是後面(after)的內容合並為一條日誌 #max_lines:合並的最多行數(包含匹配pattern的那一行 默認值是500行 #timeout:到了timeout之後,即使沒有匹配一個新的pattern(發生一個新的事件),也把已經匹配的日誌事件發送出去 multiline.pattern: ‘^\d{4}/\d{2}/\d{2}‘ (2018\05\01 我的匹配是已這樣的日期開頭的) multiline.negate: true multiline.match: after multiline.Max_lines:20 multiline.timeout: 10s - input_type: log paths: - /var/log/mysql-sql-* tags: ["mysql_sql_logs"] multiline.pattern: ‘^\d{4}/\d{2}/\d{2}‘ multiline.negate: true multiline.match: after multiline.timeout: 10s encoding: utf-8 document_type: mysql-proxy scan_frequency: 20s harverster_buffer_size: 16384 max_bytes: 10485760 tail_files: true #tail_files:如果設置為true,Filebeat從文件尾開始監控文件新增內容,把新增的每一行文件作為一個事件依次發送,而不是從文件開始處重新發送所有內容。默認是false;
Logstash 的詳細配置
input {
kafka {
topics_pattern => "mysql.*"
bootstrap_servers => "x.x.x.x:9092"
#auto_offset_rest => "latest"
codec => json
group_id => "logstash-g1"
}
}
#終於到了關鍵的地方了,logstash的filter ,使用filter 過濾出來我們想要的日誌,
filter {
#if 還可以使用or 或者and 作為條件語句,舉個栗子: if “a” or “b” or “c” in [tags],這樣就可以過濾多個tags 的標簽了,我們這個主要用在同型號的交換設備的日誌正規化,比如說你有5臺交換機,把日誌指定到了同一個syslog-ng 上,收集日誌的時候只能通過同一個filebeat,多個prospector加不同的tags。這個時候過濾就可以通過判斷相應的tags來完成了。
if "mysql_slow_logs" in [tags]{
grok {
#grok 裏邊有定義好的現場的模板你可以用,但是更多的是自定義模板,規則是這樣的,小括號裏邊包含所有一個key和value,例子:(?<key>value),比如以下的信息,第一個我定義的key是data,表示方法為:?<key> 前邊一個問號,然後用<>把key包含在裏邊去。value就是純正則了,這個我就不舉例子了。這個有個在線的調試庫,可以供大家參考,http://grokdebug.herokuapp.com/
match => { "message" => "(?<date>\d{4}/\d{2}/\d{2}\s(?<datetime>%{TIME}))\s-\s(?<status>\w{2})\s-\s(?<respond_time>\d+)\.\d+\w{2}\s-\s%{IP:client}:(?<client-port>\d+)\[\d+\]->%{IP:server}:(?<server-port>\d+).*:(?<databases><\w+>):(?<SQL>.*)"}
#過濾完成之後把之前的message 移出掉,節約磁盤空間。
remove_field => ["message"]
}
}
else if "mysql_sql_logs" in [tags]{
grok {
match => { "message" => "(?<date>\d{4}/\d{2}/\d{2}\s(?<datetime>%{TIME}))\s-\s(?<status>\w{2})\s-\s(?<respond_time>\d+\.\d+)\w{2}\s-\s%{IP:client}:(?<client-port>\d+)\[\d+\]->%{IP:server}:(?<server-port>\d+).*:(?<databases><\w+>):(?<SQL>.*)"}
remove_field => ["message"]}
}
}
我要過濾的日誌樣本:
2018/05/01 16:16:01.892 - OK - 759.2ms - 172.29.1.7:35184[485388]->172.7.1.39:3306[1525162561129639717]:<DB>:select count(*) from test[];
過濾後的結果如下:
{
"date": [
[
"2018/05/01 16:16:01.892"
]
],
"datetime": [
[
"16:16:01.892"
]
],
"TIME": [
[
"16:16:01.892"
]
],
"HOUR": [
[
"16"
]
],
"MINUTE": [
[
"16"
]
],
"SECOND": [
[
"01.892"
]
],
"status": [
[
"OK"
]
],
"respond_time": [
[
"759"
]
],
"client": [
[
"172.29.1.7"
]
],
"IPV6": [
[
null,
null
]
],
"IPV4": [
[
"172.29.1.7",
"172.7.1.39"
]
],
"client-port": [
[
"35184"
]
],
"server": [
[
"172.7.1.39"
]
],
"server-port": [
[
"3306"
]
],
"databases": [
[
"<DB>"
]
],
"SQL": [
[
"select count(*) from test[];"
]
]
}
#有圖有真相:
ELK 之Filebeat 結合Logstash 過濾出來你想要的日誌