logstash常用配置檔案總結
目錄
logstash配置檔案示例
input { beats { port => "5011" } } filter { if "include" in [tags] { mutate{ split => ["message","123"] add_field => { "beforeinclude" => "%{[message][0]}" } } mutate{ gsub => ["beforeinclude", ",", ""] } mutate{ add_field => { "StandardizedMessage" => "%{beforeinclude}123%{[message][1]}" } } } if "include" in [tags] or "exclude" in [tags]{ #分割“ mutate{ add_field => { "StandardizedMessage2" => "%{StandardizedMessage}" } } mutate{ #貌似不能用雙引號分割 gsub => ["StandardizedMessage2", "\"", "||"] } mutate{ split => ["StandardizedMessage2","||"] add_field => { "PAGE" => "%{[StandardizedMessage2][3]}" } } mutate{ split => ["StandardizedMessage"," "] add_field => { "URL" => "%{[StandardizedMessage][7]}" } add_field => { "INFO" => "%{[StandardizedMessage][0]}" } } mutate{ gsub => ["INFO", "\"", ""] } truncate { fields => [ "URL" ] length_bytes => 1000 } grok { match => { "message" => "%{HTTPDATE:requesttimestamp}" } } mutate{ add_field => { "tem1" => "%{requesttimestamp}" } remove_field => [ "requesttimestamp" ] } date{ match=>["tem1","dd/MMM/yyyy:HH:mm:ss Z"] target=>"logdatetime" } mutate{ remove_field => [ "tem1" ] } ruby{ code => "event.set('tem2', (event.get('logdatetime').time.localtime + 8*60*60))" } mutate{ add_field => { "tmp3" => "%{tem2}" } remove_field => [ "tem2" ] } mutate{ split => ["tmp3", "."] add_field => { "ACCESS_TIME" => "%{[tmp3][0]}" } remove_field => [ "tmp3" ] } mutate{ gsub => ["ACCESS_TIME", "T", " "] } } } output{ if "exclude" in [tags] or "include" in [tags] { jdbc { driver_jar_path => "/home/elk/jar/jdbc/mysql-connector-java-5.1.46-bin.jar" driver_class => "com.mysql.jdbc.Driver" connection_string => "jdbc:mysql://192.168.2.123:3306/cm?user=123&password=123" statement => [ "INSERT INTO buff (URL,ACCESS_TIME) VALUES(?, ?)", "URL","ACCESS_TIME"] } } }
可以參考Logstash最佳實踐
http://doc.yonyoucloud.com/doc/logstash-best-practice-cn/index.html
logstash的配置檔案總共分為三個部分 輸入input 處理filter 輸出output
輸入input
filebeat
從filebeat中傳輸過來資料(埠為filebeat中配置的埠)
input {
beats {
port => "5011"
}
}
標準輸入(螢幕)
input{stdin{}}
讀取檔案(File)
input
file {
path => ["/var/log/*.log", "/var/log/message"]
type => "system"
start_position => "beginning"
}
}
有一些比較有用的配置項,可以用來指定 FileWatch 庫的行為:
discover_interval
logstash 每隔多久去檢查一次被監聽的 path 下是否有新檔案。預設值是 15 秒。
exclude
不想被監聽的檔案可以排除出去,這裡跟 path 一樣支援 glob 展開。
sincedb_path
如果你不想用預設的 $HOME/.sincedb(Windows 平臺上在 C:\Windows\System32\config\systemprofile\.sincedb),可以通過這個配置定義 sincedb 檔案到其他位置。
sincedb_write_interval
logstash 每隔多久寫一次 sincedb 檔案,預設是 15 秒。
stat_interval
logstash 每隔多久檢查一次被監聽檔案狀態(是否有更新),預設是 1 秒。
start_position
logstash 從什麼位置開始讀取檔案資料,預設是結束位置,也就是說 logstash 程序會以類似 tail -F 的形式執行。如果你是要匯入原有資料,把這個設定改成 "beginning",logstash 程序就從頭開始讀取,有點類似 cat,但是讀到最後一行不會終止,而是繼續變成 tail -F。
讀取網路資料(TCP)
input {
tcp {
port => 8888
mode => "server"
ssl_enable => false
}
}
讀取mysql
input {
stdin { }
jdbc {
jdbc_connection_string => "jdbc:mysql://localhost:3306/database"
jdbc_user => "root"
jdbc_password => "root"
jdbc_driver_library => "C:/Program Files (x86)/MySQL/Connector.J 5.1/mysql-connector-java-5.1.40-bin.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"<br>
statement => "SELECT * FROM session"<br>
schedule => "* * * * *"
}
}
處理filter
基礎知識
變數message為logstash 從input得到的一整條字串。
可以用"%{AAA}"代表變數名AAA這個變數的值
"%{[AAA][3]}"代表變數名AAA這個陣列變數(通常用split分割後)的第四個變數的值(陣列從0開始)
filter 區段之內,是順序執行的。
if使用
第一種用法是if 標籤
如果這一次進入的流是job-exclude標籤或job-include標籤,則執行大括號內的操作
if "job-exclude" in [tags] or "job-include" in [tags]{
mutate{
split => ["message","123"]
add_field => {
"beforeexclude" => "%{[message][0]}"
}
}
}
第二種用法是變數的表示式
if "123" in [Name] {
mutate { remove_field => [ "Name" ] }
}
如果Name變數中有123這個字串,則。。。
Grok 正則捕獲
在grok中,支援以正則表示式的方式提取所需要的資訊,其中,正則表示式又分兩種,一種是內建的正則表示式(可以滿足我們大部分的要求),一種是自定義的正則表示式,形式分別如下:
# 內建的正則表示式方式,下面的寫法表示從輸入的訊息中提取IP欄位,並命名為sip
%{IP:sip}
# 自定義的正則表示式,開始與終止符分別為(?與?),下面的寫法表示獲取除,以外的字元,並命名為log_type
(?<log_type>[^,]+?)
以一個具體的例子來說明grok用法,假定需要解析的訊息內容樣本如下:
日誌型別:僵屍網路日誌, 源IP:192.168.101.251, 源埠:63726, 目的IP:124.127.48.41, 目的埠:1390, 攻擊型別:異常流量, 嚴重級別:低, 系統動作:被記錄, URL:-\n
為了快速判斷我們的正則表示式是否正確,不妨進行線上測試,地址為(線上測試地址)[http://grokdebug.herokuapp.com/],最後的結果為:
^日誌型別:(?<log_type>[^,]+?), 源IP:%{IP:sip}, 源埠:%{NUMBER:sport:int}, 目的IP:%{IP:dip}, 目的埠:%{NUMBER:dport:int}, 攻擊型別:(?<att_type>[^,]+?), 嚴重級別:(?<slevel>[^,]{1,}?), 系統動作:(?<sys_act>[^,]{1,}?), URL:(?<url>.+)$
在上面的表示式中,為了提高正則表示式的解析效率,我們需要進行整行匹配,於是添加了正則表示式的開始與結尾字元“^$”,此外為了便於統計與分析,我們還需要對相關型別進行轉換(如將字串轉換為整數),例如我們所需要的埠為整數,表示式為%{NUMBER:dport:int}。需要注意的是,grok也就支援兩種資料型別轉換,分別為float與int。
例子
grok {
match => { "message" => "%{HTTPDATE:requesttimestamp}" }
}
將message中匹配httpdate格式的東西放入變數requesttimestamp
時間處理(Date)
filter {
grok {
match => ["message", "%{HTTPDATE:logdate}"]
}
date {
match => ["logdate", "dd/MMM/yyyy:HH:mm:ss Z"]
}
}
注意:時區偏移量只需要用一個字母 Z 即可。
將變數匹配為相應格式
具體格式:
http://joda-time.sourceforge.net/apidocs/org/joda/time/format/DateTimeFormat.html
資料修改(Mutate)(重要)
filters/mutate 外掛是 Logstash 另一個重要外掛。它提供了豐富的基礎型別資料處理能力。
變數增減
mutate{
add_field => {
"StandardizedMessage2" => "%{StandardizedMessage}"
}
}
新建變數名為StandardizedMessage2,它的值為名字為StandardizedMessage這個變數的值
mutate{
remove_field => [ "tem1" ]
}
刪除tem1這個變數
型別轉換
型別轉換是 filters/mutate 外掛最初誕生時的唯一功能。
可以設定的轉換型別包括:"integer","float" 和 "string"。示例如下:
filter {
mutate {
convert => ["request_time", "float"]
}
}
注意:mutate 除了轉換簡單的字元值,還支援對陣列型別的欄位進行轉換,即將 ["1","2"] 轉換成 [1,2]。但不支援對雜湊型別的欄位做類似處理。有這方面需求的可以採用稍後講述的 filters/ruby 外掛完成。
字串處理
gsub(替換)
僅對字串型別欄位有效
例如
gsub => ["beforeinclude", ",", "|"]
將beforeinclude這個變數的逗號換為|
gsub => ["urlparams", "[\\?#]", "_"]
split(分割)
filter {
mutate {
split => ["message", "|"]
}
}
隨意輸入一串以|分割的字元,比如 "123|321|adfd|dfjld*=123",可以看到如下輸出:
{
"message" => [
[0] "123",
[1] "321",
[2] "adfd",
[3] "dfjld*=123"
],
"@version" => "1",
"@timestamp" => "2014-08-20T15:58:23.120Z",
"host" => "raochenlindeMacBook-Air.local"
}
注意split分割是永久性的,如果要用兩種東西分割,請先用變數A=這個變數B,再分割變數a和b
具體使用如
mutate{
split => ["message","123"]
add_field => {
"before" => "%{[message][0]}"
}
}
join(合併)
僅對陣列型別欄位有效
我們在之前已經用 split 割切的基礎再 join 回去。配置改成:
filter {
mutate {
split => ["message", "|"]
}
mutate {
join => ["message", ","]
}
}
12|23 改為12,34
merge(合併陣列)
合併兩個陣列或者雜湊欄位。把後面的變數合併到前面的那個
依然在之前 split 的基礎上繼續:
filter {
mutate {
split => ["message", "|"]
}
mutate {
merge => ["message", "message"]
}
}
把後面的變數合併到前面的那個
我們會看到輸出:
{
"message" => [
[0] "123",
[1] "321",
[2] "adfd",
[3] "dfjld*=123",
[4] "123",
[5] "321",
[6] "adfd",
[7] "dfjld*=123"
],
"@version" => "1",
"@timestamp" => "2014-08-20T16:05:53.711Z",
"host" => "raochenlindeMacBook-Air.local"
}
如果 src 欄位是字串,會自動先轉換成一個單元素的陣列再合併。把上一示例中的來源欄位改成 "host":
filter {
mutate {
split => ["message", "|"]
}
mutate {
merge => ["message", "host"]
}
}
結果變成:
{
"message" => [
[0] "123",
[1] "321",
[2] "adfd",
[3] "dfjld*=123",
[4] "raochenlindeMacBook-Air.local"
],
"@version" => "1",
"@timestamp" => "2014-08-20T16:07:53.533Z",
"host" => [
[0] "raochenlindeMacBook-Air.local"
]
}
看,目的欄位 "message" 確實多了一個元素,但是來源欄位 "host" 本身也由字串型別變成陣列型別了!
rename
重新命名某個欄位,如果目的欄位已經存在,會被覆蓋掉:
filter {
mutate {
rename => ["syslog_host", "host"]
}
}
update
更新某個欄位的內容。如果欄位不存在,不會新建。
replace
作用和 update 類似,但是當欄位不存在的時候,它會起到 add_field 引數一樣的效果,自動新增新的欄位。
執行次序
需要注意的是,filter/mutate 內部是有執行次序的。其次序如下:
rename(event) if @rename
update(event) if @update
replace(event) if @replace
convert(event) if @convert
gsub(event) if @gsub
uppercase(event) if @uppercase
lowercase(event) if @lowercase
strip(event) if @strip
remove(event) if @remove
split(event) if @split
join(event) if @join
merge(event) if @merge
filter_matched(event)
而 filter_matched 這個 filters/base.rb 裡繼承的方法也是有次序的。
@add_field.each do |field, value|
end
@remove_field.each do |field|
end
@add_tag.each do |tag|
end
@remove_tag.each do |tag|
end
輸出output
mysql寫入
output{
if "exclude" in [tags] or "include" in [tags] {
jdbc {
driver_jar_path => "/home/elk/jar/jdbc/mysql-connector-java-5.1.46-bin.jar"
driver_class => "com.mysql.jdbc.Driver"
connection_string => "jdbc:mysql://192.168.2.123:3306/cm?user=123&password=123"
statement => [ "INSERT INTO buff (URL,ACCESS_TIME) VALUES(?, ?)", "URL","ACCESS_TIME"]
}
}
}
注意 jar包位置,資料庫位置,使用者名稱密碼,以及變數的數量與values的?,後面的那些為變數的名稱,
意味第一個?的值為url變數的值
標準輸出(Stdout)
output {
stdout {
codec => rubydebug
workers => 2
}
}
解釋
輸出外掛統一具有一個引數是 workers。Logstash 為輸出做了多執行緒的準備。
其次是 codec 設定。codec 的作用在之前已經講過。可能除了 codecs/multiline ,其他 codec 外掛本身並沒有太多的設定項。所以一般省略掉後面的配置區段。換句話說。上面配置示例的完全寫法應該是:
output {
stdout {
codec => rubydebug {
}
workers => 2
}
}
單就 outputs/stdout 外掛來說,其最重要和常見的用途就是除錯。所以在不太有效的時候,加上命令列引數 -vv 執行,檢視更多詳細除錯資訊。
儲存成檔案(File)
配置示例
output {
file {
path => "/home/elk/logstash-6.3.1/job.log"
codec => line { format => "custom format: %{message}"}
}
}