1. 程式人生 > >ELK之Logstash

ELK之Logstash

Logstash 是一個接收、處理、轉發日誌的工具,支援系統日誌、webserver 日誌、錯誤日誌、應用日誌、總之包括所有可以丟擲來的日誌型別。在一個典型的使用場景下(ELK):用 Elasticsearch 作為後臺資料的儲存,kibana用來前端的報表展示。Logstash 在其過程中擔任搬運工的角色,它為資料儲存、報表查詢和日誌解析建立了一個功能強大的管道鏈。Logstash 提供了多種多樣的 input,filters,codecs 和 output 元件,讓使用者輕鬆實現強大的功能。Logstash 收集資料物件就是日誌檔案,使用 Logstash 對日誌檔案進行收集和統一過濾,變成可讀性高的內容,方便開發者或運維人員觀察,從而有效的分析系統/專案執行的效能,做好監控和預警的準備工作等。

1 組成結構

Logstash 通過管道進行運作,管道有兩個必需的元素,輸入和輸出,還有一個可選的元素,過濾器。輸入外掛從資料來源獲取資料,過濾器外掛根據使用者指定的資料格式修改資料,輸出外掛則將資料寫入到目的地。

在生產環境中,Logstash 的管道要複雜很多,可能需要配置多個輸入、過濾器和輸出外掛。因此,需要一個配置檔案管理輸入、過濾器和輸出相關的配置。配置檔案內容格式如下:

# 日誌匯入
input {
    ...
}

# 日誌篩選匹配處理
filter {
    ...
}

# 日誌匹配輸出
output {
    ...
}

日誌解析配置檔案的框架共分為三個模組,input,output,filter。

2 外掛用法

在使用外掛之前,我們先了解一個概念:事件。Logstash 每讀取一次資料的行為叫做事件。

Logstash 的處理流程: input => filter => output 

如何執行按指定配置檔案執行?

/opt/logstash/bin/logstash –w 2 -f /etc/logstash/conf.d/test.conf

引數說明:

-w # 指定執行緒,預設是 cpu 核數

-f # 指定配置檔案

-t # 測試配置檔案是否正常

-b # 執行 filter 模組之前最大能積累的日誌,數值越大效能越好,同時越佔記憶體

2.1 input 模組

示例1:

input {
    # file為常用檔案外掛,外掛內選項很多,可根據需求自行判斷
    file {
        path => "/var/lib/mysql/slow.log"
        # 要匯入的檔案的位置,可以使用*,例如/var/log/nginx/*.log
        Excude =>”*.gz”
        # 要排除的檔案
        start_position => "beginning"
        # 從檔案開始的位置開始讀,end表示從結尾開始讀
        ignore_older => 0  
        # 多久之內沒修改過的檔案不讀取,0為無限制,單位為秒
        sincedb_path => "/dev/null"
        # 記錄檔案上次讀取位置,輸出到null表示每次都從檔案首行開始解析
        type => "mysql-slow"
        # type欄位,可表明匯入的日誌型別
    }   
}

示例2:

input {
    # redis外掛為常用外掛,外掛內選項很多,可根據需求自行判斷, 還包括kafka、tcp
    redis {
        batch_count => 1 
        # EVAL命令返回的事件數目,設定為5表示一次請求返回5條日誌資訊
        data_type => "list" 
        # logstash redis外掛工作方式
        key => "logstash-test-list" 
        # 監聽的鍵值
        host => "127.0.0.1" 
        # redis地址
        port => 6379 
        # redis埠號
        password => "123qwe" 
        # 如果有安全認證,此項為認證密碼
        db => 0 
        # 如果應用使用了不同的資料庫,此為redis資料庫的編號,預設為0。
        threads => 1 
        # 啟用執行緒數量
      }
}

2.2 output模組

示例1:

output {
    # tdout { codec => "rubydebug" }
    # 篩選過濾後的內容輸出到終端顯示

    elasticsearch {  # 匯出到es,最常用的外掛
        codec => "json"
        # 匯出格式為json
        hosts => ["127.0.0.1:9200"]
        # ES地址+埠
        index => "logstash-slow-%{+YYYY.MM.dd}"
        # 匯出到index內,可以使用時間變數
        user => "admin"
        password => "xxxxxx"
        # ES如果有安全認證就使用賬號密碼驗證,無安全認證就不需要
        flush_size => 500
        # 預設500,logstash一次性攢夠500條的資料在向es傳送
        idle_flush_time => 1
        # 預設1s,如果1s內沒攢夠500,還是會一次性把資料發給ES
    }   
}

示例2:

output {
     redis{  # 輸出到redis的外掛,下面選項根據需求使用
         batch => true
         # 設為false,一次rpush,發一條資料,true為傳送一批
         batch_events => 50
         # 一次rpush傳送多少資料
         batch_timeout => 5
         # 一次rpush消耗多少時間
         codec => plain
         # 對輸出資料進行codec,避免使用logstash的separate filter
         congestion_interval => 1
         # 多長時間進項一次擁塞檢查
         congestion_threshold => 5
         # 限制一個list中可以存在多少個item,當數量足夠時,就會阻塞直到有其他消費者消費list中的資料
         data_type => list
         # 使用list還是publish
         db => 0
         # 使用redis的那個資料庫,預設為0號
         host => ["127.0.0.1:6379"]
         # redis 的地址和埠,會覆蓋全域性埠
         key => xxx
         # list或channel的名字
         password => xxx
         # redis的密碼,預設不使用
         port => 6379
         # 全域性埠,預設6379,如果host已指定,本條失效
         reconnect_interval => 1
         # 失敗重連的間隔,預設為1s
         timeout => 5
         # 連線超時的時間
         workers => 1
         # 工作程序
     }
}

2.3完整的示例

示例1:Elasticsearch slow-log

input {
    file {
        path => ["/var/log/elasticsearch/private_test_index_search_slowlog.log"]
        start_position => "beginning"
        ignore_older => 0
        # sincedb_path => "/dev/null"
        type => "elasticsearch_slow"
        }   
}

filter {
    grok {
        match =>  { "message" => "^\[(\d\d){1,2}-(?:0[1-9]|1[0-2])-(?:(?:0[1-9])|(?:[12][0-9])|(?:3[01])|[1-9])\s+(?:2[0123]|[01]?[0-9]):(?:[0-5][0-9]):(?:(?:[0-5]?[0-9]|60)(?:[:.,][0-9]+)?)\]\[(TRACE|DEBUG|WARN\s|INFO\s)\]\[(?<io_type>[a-z\.]+)\]\s\[(?<node>[a-z0-9\-\.]+)\]\s\[(?<index>[A-Za-z0-9\.\_\-]+)\]\[\d+\]\s+took\[(?<took_time>[\.\d]+(ms|s|m))\]\,\s+took_millis\[(\d)+\]\,\s+types\[(?<types>([A-Za-z\_]+|[A-Za-z\_]*))\]\,\s+stats\[\]\,\s+search_type\[(?<search_type>[A-Z\_]+)\]\,\s+total_shards\[\d+\]\,\s+source\[(?<source>[\s\S]+)\]\,\s+extra_source\[[\s\S]*\]\,\s*$" }
        remove_field => ["message"]
        }   

    date {
        match => ["timestamp","dd/MMM/yyyy:HH:mm:ss Z"] 
        }   
    ruby {
        code => "event.timestamp.time.localtime"
        }   
    }

output {
     elasticsearch {
         codec => "json"
         hosts => ["127.0.0.1:9200"]
         index => "logstash-elasticsearch-slow-%{+YYYY.MM.dd}"
         user => "admin"
         password => "xxxx"
    }   

}

示例2:Mysql-slow log

input {
    file {
        path => "/var/lib/mysql/slow.log"
        start_position => "beginning"
        ignore_older => 0
        # sincedb_path => "/dev/null"
        type => "mysql-slow"
    }   
}
filter {
    if ([message] =~ "^(\/usr\/local|Tcp|Time)[\s\S]*") { drop {} }
    multiline {
        pattern => "^\#\s+Time\:\s+\d+\s+(0[1-9]|[12][0-9]|3[01]|[1-9])"
        negate => true
        what => "previous"
    }   
    grok {
        match => { "message" => "^\#\sTime\:\s+\d+\s+(?<datetime>%{TIME})\n+\#\[email protected]\:\s+[A-Za-z0-9\_]+\[(?<mysql_user>[A-Za-z0-9\_]+)\]\[email protected]\s+(?<mysql_host>[A-Za-z0-9\_]+)\s+\[\]\n+\#\s+Query\_time\:\s+(?<query_time>[0-9\.]+)\s+Lock\_time\:\s+(?<lock_time>[0-9\.]+)\s+Rows\_sent\:\s+(?<rows_sent>\d+)\s+Rows\_examined\:\s+(?<rows_examined>\d+)(\n+|\n+use\s+(?<dbname>[A-Za-z0-9\_]+)\;\n+)SET\s+timestamp\=\d+\;\n+(?<slow_message>[\s\S]+)$"
   }   
        remove_field => ["message"]
   }   
    date {
        match => ["timestamp","dd/MMM/yyyy:HH:mm:ss Z"] 
    }   
    ruby {
        code => "event.timestamp.time.localtime"
    }   
}
output { 
    elasticsearch {
        codec => "json"
        hosts => ["127.0.0.1:9200"]
        index => "logstash-mysql-slow-%{+YYYY.MM.dd}"
        user => "admin"
        password => "xxxxx"
    }   
}