[Repost] ELK: Logstash
[From] https://blog.csdn.net/iguyue/article/details/77006201
Overview:
Logstash is a tool that receives, processes, and forwards logs. It supports system logs, web-server logs, error logs, application logs; in short, any kind of log that can be produced. In the typical ELK scenario, Elasticsearch serves as the backend data store and Kibana provides the front-end reporting, while Logstash acts as the carrier in between, forming a powerful pipeline chain for data storage, report querying, and log parsing. Logstash offers a wide variety of input, filter, codec, and output plugins that make powerful functionality easy to build.
Installation:
(A JDK is required; the JDK installation itself is not covered here. JDK 1.8 is used throughout.)
Version 2.4.1 is used here to match the company's Elasticsearch 2.x cluster; pick the version that fits your own stack.
Note: the ELK stack performs a version check, so the major version numbers of the components must match.
yum -y install https://download.elastic.co/logstash/logstash/packages/centos/logstash-2.4.1.noarch.rpm
After installation, two main directories and one configuration file are created:
Program directory: /opt/logstash
Log-parsing configuration directory: /etc/logstash/conf.d
Runtime configuration file: /etc/sysconfig/logstash
First, verify that the installation works:
[root@iZbp13lsytivlvvks4ulatZ ~]# /opt/logstash/bin/logstash -e 'input{stdin{}}output{stdout{codec=>rubydebug}}'
Settings: Default pipeline workers: 4
Pipeline main started
hello world! # type a test string
{
"message" => "hello world!", # echoed back on success
"@version" => "1",
"@timestamp" => "2017-08-07T07:47:35.938Z",
"host" => "iZbp13lsytivlvvks4ulatZ"
}
To run Logstash with a specific configuration file:
/opt/logstash/bin/logstash -w 2 -f /etc/logstash/conf.d/test.conf
Options:
-w # number of worker threads; defaults to the number of CPU cores
-f # configuration file to use
-t # check the configuration file for errors without running it
-b # maximum number of events to accumulate before the filter stage; larger values give better throughput at the cost of more memory
Configuration file layout:
# log input
input {
}
# log filtering and processing
filter {
}
# log output
output {
}
The log-parsing configuration file is divided into three sections: input, filter, and output. Each is covered in turn below, and each section hosts its own set of plugins.
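Filled in, a minimal sketch of this skeleton might look like the following (the stdin/stdout plugins are the same ones used in the quick test above; the field added in the filter is purely illustrative):

```
# minimal.conf -- read lines from stdin, stamp them, print them
input {
  stdin { type => "demo" }
}
filter {
  # add an illustrative field to every event
  mutate { add_field => { "pipeline" => "demo" } }
}
output {
  stdout { codec => rubydebug }
}
```

Run it with /opt/logstash/bin/logstash -f minimal.conf and type a line to see the processed event.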
input module
Example 1
input {
# file is the commonly used file input plugin; it has many options, choose according to your needs
file {
path => "/var/lib/mysql/slow.log"
# location of the file(s) to import; globs are allowed, e.g. /var/log/nginx/*.log
exclude => "*.gz"
# files to skip
start_position => "beginning"
# read from the beginning of the file; "end" starts reading from the tail
ignore_older => 0
# skip files not modified within this many seconds; 0 means no limit
sincedb_path => "/dev/null"
# records the last read position; /dev/null means parsing restarts from the first line every run
type => "mysql-slow"
# the type field, used to label the kind of log being imported
}
}
Example 2
input {
# redis is another commonly used input plugin with many options; choose according to your needs
redis {
batch_count => 1
# number of events returned per EVAL command; 5 means each request returns 5 log entries
data_type => "list"
# how the logstash redis plugin operates
key => "logstash-test-list"
# the key to listen on
host => "127.0.0.1"
# redis address
port => 6379
# redis port
password => "123qwe"
# authentication password, if security is enabled
db => 0
# redis database number if the application uses a non-default one; defaults to 0
threads => 1
# number of threads to start
}
}
There are many more commonly used input plugins; only two are shown here. Others include kafka, tcp, and so on.
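As one more sketch, a tcp input (the port, codec, and type below are arbitrary choices for illustration) listens on a socket and parses each incoming line as JSON:

```
input {
  tcp {
    port => 5000          # port to listen on
    codec => json_lines   # one JSON document per line
    type => "tcp-json"    # label events from this source
  }
}
```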
filter module
Example
filter { # there are many plugins; only those I have used are described here
if ([message] =~ "regular expression") { drop {} }
# matching operators: =~ and !~ for regex, in / not in for membership, == and != for string comparison, and so on. Any action may follow a match; here matching lines are dropped. An if block is not limited to filtering: it can perform any operation, even a separate regex split for each matched line.
multiline {
pattern => "regular expression"
negate => true
what => "previous"
# multiline merging: some logs span several lines per entry. This plugin merges them via regex matching, folding the lines above a match into a single log event.
}
grok {
match => { "message" => "regular expression"
# regex-match the log line; extracts and splits out the fields and values worth recording
}
remove_field => ["message"]
# drop fields that do not need to be recorded
}
date {
match => ["timestamp","dd/MMM/yyyy:HH:mm:ss Z"]
# sets @timestamp; a custom time field from the log can be used, or a value generated if the log carries no time field
target => "@timestamp"
# field in which to place the matched timestamp; defaults to @timestamp
}
ruby {
code => "event.timestamp.time.localtime"
# pin the timestamp to the local time zone
}
}
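As a concrete grok illustration (the sample log line and the field names client/method/request/status are invented for this example), a line such as 127.0.0.1 GET /index.html 200 can be split into named fields using patterns that ship with Logstash:

```
filter {
  grok {
    # turns "127.0.0.1 GET /index.html 200" into client/method/request/status fields
    match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:status}" }
  }
}
```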
output module
Example 1
output {
# stdout { codec => "rubydebug" }
# prints the filtered events to the terminal
elasticsearch { # export to ES, the most commonly used output plugin
codec => "json"
# export in JSON format
hosts => ["127.0.0.1:9200"]
# ES address and port
index => "logstash-slow-%{+YYYY.MM.dd}"
# target index; time variables may be used
user => "admin"
password => "xxxxxx"
# use account/password authentication if ES has security enabled; omit otherwise
flush_size => 500
# default 500: logstash accumulates 500 events before sending them to ES in one batch
idle_flush_time => 1
# default 1s: if 500 events have not accumulated within 1s, the batch is sent anyway
}
}
Example 2
output {
redis { # output-to-redis plugin; use the options below as needed
batch => true
# false sends one event per rpush; true sends a batch
batch_events => 50
# number of events per rpush
batch_timeout => 5
# maximum time one rpush may take
codec => plain
# codec applied to the outgoing data, avoiding a separate logstash filter
congestion_interval => 1
# interval between congestion checks
congestion_threshold => 5
# cap on the number of items in the list; once reached, the output blocks until a consumer drains the list
data_type => list
# whether to use list or publish
db => 0
# which redis database to use; defaults to 0
host => ["127.0.0.1:6379"]
# redis address and port; overrides the global port
key => xxx
# name of the list or channel
password => xxx
# redis password; unused by default
port => 6379
# global port, default 6379; ignored if host already specifies one
reconnect_interval => 1
# reconnect interval after a failure; defaults to 1s
timeout => 5
# connection timeout
workers => 1
# number of workers
}
}
There are many more commonly used plugins; see the official documentation for the rest.
From the above we now have a rough picture of the Logstash processing flow:
input => filter => output
Next, a few complete working examples.
Complete applications:
Elasticsearch slow log
input {
file {
path => ["/var/log/elasticsearch/private_test_index_search_slowlog.log"]
start_position => "beginning"
ignore_older => 0
# sincedb_path => "/dev/null"
type => "elasticsearch_slow"
}
}
filter {
grok {
match => { "message" => "^\[(\d\d){1,2}-(?:0[1-9]|1[0-2])-(?:(?:0[1-9])|(?:[12][0-9])|(?:3[01])|[1-9])\s+(?:2[0123]|[01]?[0-9]):(?:[0-5][0-9]):(?:(?:[0-5]?[0-9]|60)(?:[:.,][0-9]+)?)\]\[(TRACE|DEBUG|WARN\s|INFO\s)\]\[(?<io_type>[a-z\.]+)\]\s\[(?<node>[a-z0-9\-\.]+)\]\s\[(?<index>[A-Za-z0-9\.\_\-]+)\]\[\d+\]\s+took\[(?<took_time>[\.\d]+(ms|s|m))\]\,\s+took_millis\[(\d)+\]\,\s+types\[(?<types>([A-Za-z\_]+|[A-Za-z\_]*))\]\,\s+stats\[\]\,\s+search_type\[(?<search_type>[A-Z\_]+)\]\,\s+total_shards\[\d+\]\,\s+source\[(?<source>[\s\S]+)\]\,\s+extra_source\[[\s\S]*\]\,\s*$" }
remove_field => ["message"]
}
date {
match => ["timestamp","dd/MMM/yyyy:HH:mm:ss Z"]
}
ruby {
code => "event.timestamp.time.localtime"
}
}
output {
elasticsearch {
codec => "json"
hosts => ["127.0.0.1:9200"]
index => "logstash-elasticsearch-slow-%{+YYYY.MM.dd}"
user => "admin"
password => "xxxx"
}
}
MySQL slow log
input {
file {
path => "/var/lib/mysql/slow.log"
start_position => "beginning"
ignore_older => 0
# sincedb_path => "/dev/null"
type => "mysql-slow"
}
}
filter {
if ([message] =~ "^(\/usr\/local|Tcp|Time)[\s\S]*") { drop {} }
multiline {
pattern => "^\#\s+Time\:\s+\d+\s+(0[1-9]|[12][0-9]|3[01]|[1-9])"
negate => true
what => "previous"
}
grok {
match => { "message" => "^\#\sTime\:\s+\d+\s+(?<datetime>%{TIME})\n+\#\s+User@Host\:\s+[A-Za-z0-9\_]+\[(?<mysql_user>[A-Za-z0-9\_]+)\]\s+@\s+(?<mysql_host>[A-Za-z0-9\_]+)\s+\[\]\n+\#\s+Query\_time\:\s+(?<query_time>[0-9\.]+)\s+Lock\_time\:\s+(?<lock_time>[0-9\.]+)\s+Rows\_sent\:\s+(?<rows_sent>\d+)\s+Rows\_examined\:\s+(?<rows_examined>\d+)(\n+|\n+use\s+(?<dbname>[A-Za-z0-9\_]+)\;\n+)SET\s+timestamp\=\d+\;\n+(?<slow_message>[\s\S]+)$"
}
remove_field => ["message"]
}
date {
match => ["timestamp","dd/MMM/yyyy:HH:mm:ss Z"]
}
ruby {
code => "event.timestamp.time.localtime"
}
}
output {
elasticsearch {
codec => "json"
hosts => ["127.0.0.1:9200"]
index => "logstash-mysql-slow-%{+YYYY.MM.dd}"
user => "admin"
password => "xxxxx"
}
}
Nginx access.log
Logstash ships with built-in nginx patterns; a small tweak makes them usable.
Append the following to the /opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-patterns-core-2.0.5/patterns/grok-patterns file:
X_FOR (%{IPV4}|-)
NGINXACCESS %{COMBINEDAPACHELOG} \"%{X_FOR:http_x_forwarded_for}\"
ERRORDATE %{YEAR}/%{MONTHNUM}/%{MONTHDAY} %{TIME}
NGINXERROR_ERROR %{ERRORDATE:timestamp}\s{1,}\[%{DATA:err_severity}\]\s{1,}(%{NUMBER:pid:int}#%{NUMBER}:\s{1,}\*%{NUMBER}|\*%{NUMBER}) %{DATA:err_message}(?:,\s{1,}client:\s{1,}(?<client_ip>%{IP}|%{HOSTNAME}))(?:,\s{1,}server:\s{1,}%{IPORHOST:server})(?:, request: %{QS:request})?(?:, host: %{QS:server_ip})?(?:, referrer:\"%{URI:referrer})?
NGINXERROR_OTHER %{ERRORDATE:timestamp}\s{1,}\[%{DATA:err_severity}\]\s{1,}%{GREEDYDATA:err_message}
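As an alternative to editing the shipped grok-patterns file (which a package upgrade would overwrite), grok's patterns_dir option can point at a directory of your own pattern files; the path below is only an example:

```
filter {
  grok {
    # load custom patterns (e.g. NGINXACCESS) from our own directory
    patterns_dir => ["/etc/logstash/patterns"]
    match => { "message" => "%{NGINXACCESS}" }
  }
}
```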
The log configuration file is then as follows:
input {
file {
path => [ "/var/log/nginx/www-access.log" ]
start_position => "beginning"
# sincedb_path => "/dev/null"
type => "nginx_access"
}
}
filter {
grok {
match => { "message" => "%{NGINXACCESS}"}
}
mutate {
convert => [ "response","integer" ]
convert => [ "bytes","integer" ]
}
date {
match => [ "timestamp","dd/MMM/yyyy:HH:mm:ss Z"]
}
ruby {
code => "event.timestamp.time.localtime"
}
}
output {
elasticsearch {
codec => "json"
hosts => ["127.0.0.1:9200"]
index => "logstash-nginx-access-%{+YYYY.MM.dd}"
user => "admin"
password => "xxxx"
}
}
Nginx error.log
input {
file {
path => [ "/var/log/nginx/www-error.log" ]
start_position => "beginning"
# sincedb_path => "/dev/null"
type => "nginx_error"
}
}
filter {
grok {
match => [
"message","%{NGINXERROR_ERROR}",
"message","%{NGINXERROR_OTHER}"
]
}
date {
match => [ "timestamp","dd/MMM/yyyy:HH:mm:ss"]
}
ruby {
code => "event.timestamp.time.localtime"
}
}
output {
elasticsearch {
codec => "json"
hosts => ["127.0.0.1:9200"]
index => "logstash-nginx-error-%{+YYYY.MM.dd}"
user => "admin"
password => "xxxx"
}
}
PHP error.log
input {
file {
path => ["/var/log/php/error.log"]
start_position => "beginning"
# sincedb_path => "/dev/null"
type => "php-fpm_error"
}
}
filter {
multiline {
pattern => "^\[(0[1-9]|[12][0-9]|3[01]|[1-9])\-%{MONTH}-%{YEAR}[\s\S]+"
negate => true
what => "previous"
}
grok {
match => { "message" => "^\[(?<timestamp>(0[1-9]|[12][0-9]|3[01]|[1-9])\-%{MONTH}-%{YEAR}\s+%{TIME}?)\s+[A-Za-z]+\/[A-Za-z]+\]\s+(?<category>(?:[A-Z]{3}\s+[A-Z]{1}[a-z]{5,7}|[A-Z]{3}\s+[A-Z]{1}[a-z\s]{9,11}))\:\s+(?<error_message>[\s\S]+$)" }
remove_field => ["message"]
}
date {
match => ["timestamp","dd/MMM/yyyy:HH:mm:ss Z"]
}
ruby {
code => "event.timestamp.time.localtime"
}
}
output {
elasticsearch {
codec => "json"
hosts => ["127.0.0.1:9200"]
index => "logstash-php-error-%{+YYYY.MM.dd}"
user => "admin"
password => "xxxxx"
}
}
PHP-FPM slow log
input {
file {
path => ["/var/log/php-fpm/www.slow.log"]
start_position => "beginning"
# sincedb_path => "/dev/null"
type => "php-fpm_slow"
}
}
filter {
multiline {
pattern => "^$"
negate => true
what => "previous"
}
grok {
match => { "message" => "^\[(?<timestamp>(0[1-9]|[12][0-9]|3[01]|[1-9])\-%{MONTH}-%{YEAR}\s+%{TIME})\]\s+\[[a-z]{4}\s+(?<pool>[A-Za-z0-9]{1,8})\]\s+[a-z]{3}\s+(?<pid>\d{1,7})\n(?<slow_message>[\s\S]+$)" }
remove_field => ["message"]
}
date {
match => ["timestamp","dd-MMM-yyyy:HH:mm:ss Z"]
}
ruby {
code => "event.timestamp.time.localtime"
}
}
output {
elasticsearch {
codec => "json"
hosts => ["127.0.0.1:9200"]
index => "logstash-php-fpm-slow-%{+YYYY.MM.dd}"
user => "admin"
password => "xxxx"
}
}
The log-parsing configuration files all live under /etc/logstash/conf.d here. They can actually be placed anywhere, but keeping them in one place is best.
With multiple configuration files, do not start logstash like this:
/opt/logstash/bin/logstash -f /etc/logstash/conf.d/ (or with a trailing *)
That command concatenates the configuration files rather than using them one by one, and it fails with an error.
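If you do want several log types in one merged pipeline, the usual workaround is to guard each filter and output with the type field set by the input, so events from one source do not pass through another source's rules (a sketch, reusing the nginx access config above):

```
filter {
  if [type] == "nginx_access" {
    grok { match => { "message" => "%{NGINXACCESS}" } }
  }
}
output {
  if [type] == "nginx_access" {
    elasticsearch {
      hosts => ["127.0.0.1:9200"]
      index => "logstash-nginx-access-%{+YYYY.MM.dd}"
    }
  }
}
```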
With multiple configuration files, start each one separately:
/opt/logstash/bin/logstash -f /etc/logstash/conf.d/nginx_error.conf
That is tedious, though: with many configuration files each must be started in turn, and startup is slow as well. So I wrote a test script to make this more convenient; it is for reference only:
#!/bin/bash
conf_path=/etc/logstash/conf.d
# directory holding the configuration files; change to suit your setup
conf_name=$( ls ${conf_path} )
case $1 in
start)
echo "-----------please wait-----------"
echo "Startup is slow; this may take a while."
for cf in ${conf_name}
do
/opt/logstash/bin/logstash -f $conf_path/$cf > /dev/null 2>&1 &
if [ $? -ne 0 ];then
echo 'The '${cf}' start-up failed.'
fi
sleep 20
done
echo "start-up success."
;;
stop)
ps -ef |grep logstash |grep -v grep > /dev/null 2>&1
if [ $? -eq 0 ];then
ps -ef|grep logstash |grep -v grep |awk '{print $2}'|xargs kill -9 > /dev/null 2>&1
sleep 2
echo "Stop success."
fi
;;
restart)
ps -ef |grep logstash |grep -v grep 2>&1
if [ $? -eq 0 ];then
ps -ef|grep logstash |grep -v grep |awk '{print $2}'|xargs kill -9 > /dev/null 2>&1
sleep 3
echo "Stop success."
fi
echo "-----------please wait-----------"
echo "Startup is slow; this may take a while."
for cf in ${conf_name}
do
/opt/logstash/bin/logstash -f $conf_path/$cf > /dev/null 2>&1 &
if [ $? -ne 0 ];then
echo 'The '${cf}' start-up failed.'
fi
sleep 10
done
echo "start-up success."
;;
*)
echo "Usage: "$0" {start|stop|restart}"
exit 1
esac
Do not include "logstash" in the script's name (the stop logic kills processes by grepping for it); it is saved here as log_stash.sh.
Run the script with ./log_stash.sh (start|stop|restart).