ELK 經典用法—企業自定義日誌收集切割和mysql模塊
本文收錄在Linux運維企業架構實戰系列
一、收集切割公司自定義的日誌
很多公司的日誌並不是和服務默認的日誌格式一致,因此,就需要我們來進行切割了。
1、需切割的日誌示例
2018-02-24 11:19:23,532 [143] DEBUG performanceTrace 1145 http://api.114995.com:8082/api/Carpool/QueryMatchRoutes 183.205.134.240 null 972533 310000 TITTL00 HUAWEI 860485038452951 3.1.146 HUAWEI 5.1 113.552344 33.332737 發送響應完成 Exception:(null)
2、切割的配置
在logstash 上,使用fifter 的grok 插件進行切割
input { beats { port => "5044" } } filter { grok { match => { "message" => "%{TIMESTAMP_ISO8601:timestamp} \[%{NUMBER:thread:int}\] %{DATA:level} (?<logger>[a-zA-Z]+) %{NUMBER:executeTime:int} %{URI:url} %{IP:clientip} %{USERNAME:UserName} %{NUMBER:userid:int} %{NUMBER:AreaCode:int} (?<Board>[0-9a-zA-Z]+[-]?[0-9a-zA-Z]+) (?<Brand>[0-9a-zA-Z]+[-]?[0-9a-zA-Z]+) %{NUMBER:DeviceId:int} (?<TerminalSourceVersion>[0-9a-z\.]+) %{NUMBER:Sdk:float} %{NUMBER:Lng:float} %{NUMBER:Lat:float} (?<Exception>.*)" } remove_field => "message" } date { match => ["timestamp","dd/MMM/YYYY:H:m:s Z"] remove_field => "timestamp" } geoip { source => "clientip" target => "geoip" database => "/etc/logstash/maxmind/GeoLite2-City.mmdb" } } output { elasticsearch { hosts => ["http://192.168.10.101:9200/"] index => "logstash-%{+YYYY.MM.dd}" document_type => "apache_logs" } }
3、切割解析後效果
4、最終kibana 展示效果
① top10 clientip
② top5 url
③ 根據ip 顯示地理位置
⑤ top10 executeTime
⑥ 其他字段都可進行設置,多種圖案,也可將多個圖形放在一起展示
二、grok 用法詳解
1、簡介
Grok是迄今為止使蹩腳的、無結構的日誌結構化和可查詢的最好方式。Grok在解析 syslog logs、apache and other webserver logs、mysql logs等任意格式的文件上表現完美。
Grok內置了120多種的正則表達式庫,地址:https://github.com/logstash-plugins/logstash-patterns-core/tree/master/patterns。
2、入門例子
① 示例
55.3.244.1 GET /index.html 15824 0.043
② 分析
這條日誌可切分為5個部分,IP(55.3.244.1)、方法(GET)、請求文件路徑(/index.html)、字節數(15824)、訪問時長(0.043),對這條日誌的解析模式(正則表達式匹配)如下:
%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}
③ 寫到filter中
filter { grok { match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}"} } }
④ 解析後效果
client: 55.3.244.1 method: GET request: /index.html bytes: 15824 duration: 0.043
3、解析任意格式日誌
(1)解析任意格式日誌的步驟:
① 先確定日誌的切分原則,也就是一條日誌切分成幾個部分。
② 對每一塊進行分析,如果Grok中正則滿足需求,直接拿來用。如果Grok中沒用現成的,采用自定義模式。
③ 學會在Grok Debugger中調試。
(2)grok 的分類
- 滿足自帶的grok 正則 grok_pattern
① 可以查詢
# less /usr/share/logstash/vendor/bundle/jruby/1.9/gems/logstash-patterns-core-4.1.1/patterns/grok-patterns
② 使用格式
grok_pattern 由零個或多個 %{SYNTAX:SEMANTIC}組成
例: %{IP:clientip}
其中SYNTAX 是表達式的名字,是由grok提供的:例如數字表達式的名字是NUMBER,IP地址表達式的名字是IP
SEMANTIC 表示解析出來的這個字符的名字,由自己定義,例如IP字段的名字可以是 client
- 自定義SYNTAX
使用格式:(?<field_name>the pattern here)
例:(?<Board>[0-9a-zA-Z]+[-]?[0-9a-zA-Z]+)
(3)正則解析容易出錯,強烈建議使用Grok Debugger調試,姿勢如下(我打開這個網頁不能用)
三、使用mysql 模塊,收集mysql 日誌
1、官方文檔使用介紹
https://www.elastic.co/guide/en/beats/filebeat/current/filebeat-module-mysql.html
2、配置filebeat ,使用mysql 模塊收集mysql 的慢查詢
# vim filebeat.yml
#=========================== Filebeat prospectors =============================
filebeat.modules:
- module: mysql
error:
enabled: true
var.paths: ["/var/log/mariadb/mariadb.log"]
slowlog:
enabled: true
var.paths: ["/var/log/mariadb/mysql-slow.log"]
#----------------------------- Redis output --------------------------------
output.redis:
hosts: ["192.168.10.102"]
password: "ilinux.io"
key: "httpdlogs"
datatype: "list"
db: 0
timeout: 5
3、elk—logstash 切割mysql 的慢查詢日誌
① 切割配置
# vim mysqllogs.conf
input { redis { host => "192.168.10.102" port => "6379" password => "ilinux.io" data_type => "list" key => "httpdlogs" threads => 2 } } filter { grok { match => { "message" => "(?m)^#\s+User@Host:\s+%{USER:user}\[[^\]]+\]\s+@\s+(?:(?<clienthost>\S*) )?\[(?:%{IPV4:clientip})?\]\s+Id:\s+%{NUMBER:row_id:int}\n#\s+Query_time:\s+%{NUMBER:query_time:float}\s+Lock_time:\s+%{NUMBER:lock_time:float}\s+Rows_sent:\s+%{NUMBER:rows_sent:int}\s+Rows_examined:\s+%{NUMBER:rows_examined:int}\n\s*(?:use %{DATA:database};\s*\n)?SET\s+timestamp=%{NUMBER:timestamp};\n\s*(?<sql>(?<action>\w+)\b.*;)\s*(?:\n#\s+Time)?.*$" } } date { match => ["timestamp","dd/MMM/YYYY:H:m:s Z"] remove_field => "timestamp" } } output { elasticsearch { hosts => ["http://192.168.10.101:9200/"] index => "logstash-%{+YYYY.MM.dd}" document_type => "mysql_logs" } }
② 切割後顯示結果
4、kibana 最終顯示效果
① 哪幾個的數據庫最多,例:top2 庫
表無法顯示,因為有些語句不涉及表,切割不出來
② 哪幾個sql語句出現的最多,例:top5 sql語句
③ 哪幾個sql語句出現的最多,例:top5 sql語句
④ 哪幾臺服務器慢查詢日誌生成的最多,例:top5 服務器
⑤ 哪幾個用戶慢查詢日誌生成的最多,例:top2 用戶
可以合並顯示
5、使用mysql 模塊收集mysql 的慢查詢
(1)filebeat 配置和上邊一樣
(2)elk—logstash 切割mysql 的錯誤日誌
# vim mysqllogs.conf
filter { grok { match => { "message" => "(?<timestamp>\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2}) %{NUMBER:pid:int} \[%{DATA:level}\] (?<content>.*)" } } date { match => ["timestamp","dd/MMM/YYYY:H:m:s Z"] remove_field => "timestamp" } }
(3)就不在展示結果了
四、ELK 收集多實例日誌
很多情況下,公司資金不足,不會一對一收集日誌;因此,一臺logstash 使用多實例收集處理多臺agent 的日誌很有必要。
1、filebeat 的配置
主要是output 的配置,只需不同agent 指向不同的端口即可
① agent 1 配置指向5044 端口
#----------------------------- Logstash output --------------------------------
output.logstash:
# The Logstash hosts
hosts: ["192.168.10.107:5044"]
② agent 2 配置指向5045 端口
#----------------------------- Logstash output --------------------------------
output.logstash:
# The Logstash hosts
hosts: ["192.168.10.107:5045"]
2、logstash 的配置
針對不同的agent ,input 指定對應的端口
① agent 1
input { beats { port => "5044" } } output { #可以在output 加以區分 elasticsearch { hosts => ["http://192.168.10.107:9200/"] index => "logstash-apache1-%{+YYYY.MM.dd}" document_type => "apache1_logs" } }
② agent 1
input { beats { port => "5045" } } output { #可以在output 加以區分 elasticsearch { hosts => ["http://192.168.10.107:9200/"] index => "logstash-apache2-%{+YYYY.MM.dd}" document_type => "apache2_logs" } }
開啟對應的服務就ok 了。
ELK 經典用法—企業自定義日誌收集切割和mysql模塊