5,Logstash正則提取Nginx日誌
阿新 • • 發佈:2020-11-01
Logstash正則提取Nginx日誌 為什麼需要提取?使用一整行日誌無法分析,需要提取單獨的欄位 分析哪個IP訪問量大 分析Nginx的響應狀態碼 Nginx日誌格式 192.168.238.90 - - [01/Aug/2020:14:53:35 +0800] "GET /sjg666 HTTP/1.1" 404 3650 "-" "Chrome xxx" "-" Nginx日誌格式配置 log_format main '$remote_addr - $remote_user [$time_local] "$request" ' '$status $body_bytes_sent "$http_referer" ' '"$http_user_agent" "$http_x_forwarded_for"'; Grok提取利器,需要掌握正則表示式。藉助Kibana的Grok工具驗證提取 自寫正則提取(建議) 內建規則提取(簡化):/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-patterns-core-4.1.2/patterns/grok-patterns Grok自寫正則提取語法:(?<欄位名>自寫正則表示式) (?<remote_addr>\d+\.\d+\.\d+\.\d+) 內建正則提取語法:%{內建正則表示式:欄位名} %{IP:remote_addr} - (%{WORD:remote_user}|-) \[%{HTTPDATE:time_local}\] "%{WORD:method} %{NOTSPACE:request} HTTP/%{NUMBER}" %{NUMBER:status} %{NUMBER:body_bytes_sent} %{QS} %{QS:http_user_agent} 混合語法提取 (?<remote_addr>\d+\.\d+\.\d+\.\d+) - (%{WORD:remote_user}|-) \[%{HTTPDATE:time_local}\] 普通正則表示式符號 . 表示任意一個字元,* 表示前面一個字元出現0次或者多次 [abc]表示中括號內任意一個字元,[^abc]表示非中括號內的字元 [0-9]表示數字,[a-z]表示小寫字母,[A-Z]表示大寫字母,[a-zA-Z]表示所有字母,[a-zA-Z0-9]表示所有字母+數字 [^0-9]表示非數字 ^xx表示以xx開頭,xx$表示以xx結尾 \s表示空白字元,\S表示非空白字元,\d表示數字 擴充套件正則表示式,在普通正則基礎上再進行擴充套件 ?表示前面字元出現0或者1次,+前面字元出現1或者多次 {a}表示前面字元匹配a次,{a,b}表示前面字元匹配a到b次 {,b}表示前面字元匹配0次到b次,{a,}前面字元匹配a或a+次 string1|string2表示匹配string1或者string2 Logstash正則提取Nginx寫入ES Logstash提取欄位配置 input { file { path => "/var/log/nginx/access.log" } } filter { grok { match => { "message" => '%{IP:remote_addr} - (%{WORD:remote_user}|-) \[%{HTTPDATE:time_local}\] "%{WORD:method} %{NOTSPACE:request} HTTP/%{NUMBER}" %{NUMBER:status} %{NUMBER:body_bytes_sent} %{QS} %{QS:http_user_agent}' } remove_field => ["message"] } } output { elasticsearch { hosts => ["http://192.168.238.90:9200", "http://192.168.238.92:9200"] user => "elastic" password => "sjgpwd" index => "sjgnginx-%{+YYYY.MM.dd}" } } Kibana顯示感嘆號問題處理 Kibana索引重新整理 Kibana索引的操作並不會影響到資料,刪除重建也沒問題 Logstash欄位特殊處理-替換或轉型別 http_user_agent包含雙引號,需要去除 filter { grok { match => { "message" => '%{IP:remote_addr} - (%{WORD:remote_user}|-) \[%{HTTPDATE:time_local}\] "%{WORD:method} %{NOTSPACE:request} HTTP/%{NUMBER}" %{NUMBER:status} %{NUMBER:body_bytes_sent} %{QS} %{QS:http_user_agent}' } remove_field => ["message"] } mutate { gsub => [ "http_user_agent",'"',"" ] } } Logstash字串轉整形 mutate{ gsub => [ "http_user_agent",'"',"" ] convert => { "status" => "integer" } convert => { "body_bytes_sent" => "integer" } } Logstash替換時間戳@timestamp Nginx模擬使用者訪問 while true;do curl 192.168.238.90/sjg666 curl 127.0.0.1 sleep 2 done 場景假設 假設我們要分析使用者昨天的訪問日誌 Logstash分析所有Nginx日誌,發現問題 input { file { path => "/var/log/nginx/access.log" start_position => "beginning" sincedb_path => "/dev/null" } } 兩種時間 傳送日誌時間,無法分析日誌 使用者的訪問時間在日誌裡,需要以日誌裡的為準,分析的結果才準確 以使用者訪問時間為準,格式為01/Aug/2020:10:34:20 +0800 filter { grok { match => { "message" => '%{IP:remote_addr} - (%{WORD:remote_user}|-) \[%{HTTPDATE:time_local}\] "%{WORD:method} %{NOTSPACE:request} HTTP/%{NUMBER}" %{NUMBER:status} %{NUMBER:body_bytes_sent} %{QS} %{QS:http_user_agent}' } remove_field => ["message"] } date { match => ["time_local", "dd/MMM/yyyy:HH:mm:ss Z"] target => "@timestamp" } } 日誌裡如果有不同的時間格式,覆蓋的時候格式要對應 20/Feb/2019:14:50:06 -> dd/MMM/yyyy:HH:mm:ss 2016-08-24 18:05:39,830 -> yyyy-MM-dd HH:mm:ss,SSS 手動統計Nginx的請求和網頁顯示進行對比 cat /var/log/nginx/access.log |awk '{print $4}'|sed 's/:[0-9][0-9]$//g'|sort |uniq -c 時間戳覆蓋後刪除 mutate { gsub => [ "http_user_agent",'"',"" ] convert => { "status" => "integer" } convert => { "body_bytes_sent" => "integer" } remove_field => ["time_local"] } Logstash正則提取異常處理 Logstash改成分析最新日誌 input { file { path => "/var/log/nginx/access.log" } } 正則提取有異常的情況 echo "sjgmethods xxx xxx" >> /var/log/nginx/access.log tags: _grokparsefailure 設定正則出錯提取到另外的索引裡 output { if "_grokparsefailure" not in [tags] and "_dateparsefailure" not in [tags] { elasticsearch { hosts => ["http://192.168.238.90:9200", "http://192.168.238.92:9200"] user => "elastic" password => "sjgpwd" index => "sjgnginx-%{+YYYY.MM.dd}" } } else{ elasticsearch { hosts => ["http://192.168.238.90:9200", "http://192.168.238.92:9200"] user => "elastic" password => "sjgpwd" index => "sjgfail-%{+YYYY.MM.dd}" } } } Kibana圖形簡單使用 模擬資料 while true;do curl 192.168.238.90/sjg666; curl 127.0.0.1; sleep 2; done 首頁區域 可以根據時間檢視訪問量:每分鐘訪問量 可以根據某個欄位查詢 可以單獨看某個欄位的統計 Kibana圖形有建立,選擇terms去檢視對應的資料 餅圖的建立 pie_remote_addr 表的建立 table_remote_addr Kibana面板的建立sjg_dash 建立面板 在面板上新增圖形 建議採用Grafana展示 Logstash分析Linux系統日誌 預設的日誌格式 Aug 3 18:37:57 sjg1 sshd[1318]: Accepted password for root from xxx port 49205 ssh2 無年份的欄位 系統日誌配置/etc/rsyslog.conf,重啟rsyslog $template sjgformat,"%$NOW% %TIMESTAMP:8:15% %hostname% %syslogtag% %msg%\n" $ActionFileDefaultTemplate sjgformat 日誌格式 2020-08-03 18:47:34 sjg1 sshd[1522]: Accepted password for root from 58.101.14.103 port 49774 ssh2 %{TIMESTAMP_ISO8601:timestamp} %{NOTSPACE} %{NOTSPACE:procinfo}: (?<secinfo>.*) 只讀許可權新增 chmod +r secure 提取secure日誌,messages等其它日誌提取原理類似 input { file { path => "/var/log/secure" } } filter { grok { match => { "message" => '%{TIMESTAMP_ISO8601:timestamp} %{NOTSPACE} %{NOTSPACE:procinfo}: (?<secinfo>.*)' } remove_field => ["message"] } date { match => ["timestamp", "yyyy-MM-dd HH:mm:ss"] target => "@timestamp" } mutate { remove_field => ["timestamp"] } } output { elasticsearch { hosts => ["http://192.168.238.90:9200", "http://192.168.238.92:9200"] user => "elastic" password => "sjgpwd" index => "sjgsecure-%{+YYYY.MM.dd}" } }