從零編寫日誌分析系統之logstash
概述
logstash是負責採集和解析日誌的,將日誌解析成需要的格式儲存在elasticsearch或者其他地方。logstash提供了很多非常強大的外掛,這些外掛可以有效的把日誌資訊轉換成需要的格式。
一:安裝
二:配置
logstash提供了非常多的外掛,使用這些外掛需要配置啟動檔案。啟動配置檔案的編寫對於logstash至關重要,可以說logstash就是依靠強大的配置檔案來工作的。
logstash的配置檔案主要分為三個部分input,filter和output。
intput負責從各個地方獲取日誌資訊,filter負責解析和轉換日誌資訊為需要的格式,譬如json,output負責將解析後的日誌資訊傳送到指定的地方。
輸入外掛(input)配置
輸入外掛根據現在的使用場景主要從filebeat或者kafka接受日誌資訊。
從filebeat接受訊息
input {
beats {
port => 5044
}
}
從kafka接受訊息
input {
kafka {
zk_connect => "localhost:2181" #zookeeper地址
topic_id => "nginx-access-log" #kafka中topic名稱,記得建立該topic
group_id => "nginx-access-log" #預設為“logstash”
codec => "plain" #與Shipper端output配置項一致
consumer_threads => 1 #消費的執行緒數
decorate_events => true #在輸出訊息的時候回輸出自身的資訊,包括:消費訊息的大小、topic來源以及consumer的group資訊。
type => "nginx-access-log"
}
}
過濾器外掛(Filter)配置
豐富的過濾器外掛的存在是 logstash 威力如此強大的重要因素。名為過濾器,其實提供的不單單是過濾的功能。它擴充套件了進入過濾器的原始資料,進行復雜的邏輯處理,甚至可以無中生有的新增新的 logstash
事件到後續的流程中去!
Grok 正則捕獲
Grok 正則捕獲是logstash最核心的功能,它的作用是將輸入的字串解析成需要的資料格式。logstash提供了一些預設的正則表示式,也可以自己定義正則表示式。
正則表示式檢測網站: http://grokdebug.herokuapp.com/
這個網站可以檢測編寫的正則表示式是否正確,也提供了預設的正則表示式檢視。
gork外掛用法
filter {
grok {
match => {
"message" => "\s+(?<request_time>\d+(?:\.\d+)?)\s+"
}
}
}
使用預設正則表示式解析nginx日誌
首先要定義nginx日誌格式
log_format custom '[$time_local] $server_addr $remote_addr $body_bytes_sent $request_time $upstream_response_time ''$upstream_addr $upstream_status "$request_uri" "$http_x_forwarded_for" "$http_referer" "$http_user_agent" $status';
然後編寫相應的gork日誌解析檔案
NGINXACCESS \[%{HTTPDATE:logtime}\] %{IPORHOST:host} %{IPORHOST:remoteaddr} (?:%{NUMBER:size}|-) %{NUMBER:responsetime} (?:%{NUMBER:upstreamtime}|-) %{URIHOST:upstreamhost} %{BASE10NUM:upstreamstatus} %{QS:url} %{QS:clientip} %{QS:referrer} %{QS:agent} %{INT:status}
在logstash中引用日誌檔案所在的資料夾就可以使用了
grok {
patterns_dir => "/data1/app/logstash-5.5.2/pattern"
match => {
"message" => "%{NGINXACCESS}"
}
}
使用自定義增則表示式解析Java日誌
很多時候收集各個業務系統日誌的時候會定義一套根據業務產生的日誌。這時候就需要自定義的增則表示式了
配置Java中logback或者log4j輸出日誌格式:
logging.pattern.file=%d{yyyy-MM-dd HH\:mm\:ss} %p [%c]\: %m%n
編寫自定義的日誌解析檔案
DATE ([1-9]\d{3}-(0[1-9]|1[0-2])-(0[1-9]|[1-2][0-9]|3[0-1])\s+(20|21|22|23|[0-1]\d):[0-5]\d:[0-5]\d)
JAVALOG %{DATE:time} %{WORD:level} \[%{JAVACLASS:class}\]: %{GREEDYDATA:message}
在logstash中配置
grok {
patterns_dir => "/data1/app/logstash-5.5.2/pattern"
match => {
"message" => "%{JAVALOG}"
}
}
時間處理(Date)外掛
很多時候日誌的日誌logstash並不能識別,這時候filters/date 外掛可以用來轉換你的日誌記錄中的時間字串,變成 LogStash::Timestamp 物件,然後轉存到 @timestamp 欄位裡。
date {
match => ["logtime", "dd/MMM/yyyy:HH:mm:ss Z"]
target => "@timestamp"
}
資料修改(Mutate)
filters/mutate 外掛是 Logstash 另一個重要外掛。它提供了豐富的基礎型別資料處理能力。包括型別轉換,字串處理和欄位處理等。可以設定的轉換型別包括:”integer”,”float” 和 “string”
轉換nginx日誌格式
mutate {
convert => {
"responsetime" => "float"
"upstreamtime" => "float"
"size" => "integer"
}
}
輸出(output)外掛
輸出外掛會將解析後的日誌輸出到elasticsearch或者其他什麼地方。這時候就需要定義一些配置來定義輸出條件。
document_id=>” ” 為索引提供document id ,對重寫elasticsearch中相同id詞目很有用
document_type=>” ”事件要被寫入的document type,一般要將相似事件寫入同一type,可用%{}引用事件type,預設type=log
index=>”logstash-%{+YYYY,MM.dd}” 事件要被寫進的索引,可是動態的用%{foo}語句
hosts=>[“127.0.0.0”] [“127.0.0.1:9200”,”127.0.0.2:9200”] [“https://127.0.0.1:9200“][“https://127.0.0.1:9200/mypath“]
manage_template=>true 一個預設的es mapping 模板將啟用(除非設定為false 用自己的template)
template=>”” 有效的filepath 設定自己的template檔案路徑,不設定就用已有的
template_name=>”logstash” 在es內部模板的名字。
配置格式
output {
elasticsearch {
hosts => "xxx:9200"
manage_template => true
index => "%{[fields][logIndex]}-%{+YYYY.MM.dd}"
document_type => "%{[fields][docType]}"
template_overwrite=> true
}
if [fields][logIndex] == "tomcat" {
kafka {
codec => json
bootstrap_servers => "xxx:9092"
topic_id => "tomcat-log"
codec => plain {
format => "%{message}"
}
}
}
}
其中模板的配置十分重要,日誌解析成固定的格式後還需要elasticsearch的模板配合,將該格式儲存為指定的資料型別。譬如將需要分詞的資料儲存為text型別,需要固定查詢的資料儲存為key型別。
編碼外掛(Codec)
在此之前,logstash 只支援純文字形式輸入,然後以過濾器處理它。但現在,我們可以在輸入輸出期處理不同型別的資料,這全是因為有了 codec 設定。
codec 的引入,使得 logstash 可以更好更方便的與其他有自定義資料格式的運維產品共存,比如graphite、fluent、netflow、collectd,以及使用 msgpack、json、edn 等通用資料格式的其他產品等。
上面傳送給kafka的日誌資料,我舊使用了codec外掛將日誌資訊轉換為了json資料格式,不然kafka接收到的訊息就只是一個訊息名稱。
codec => json