1. 程式人生 > >ELK6.0日誌從收集到處理完整版教程(三)配置檔案講解

ELK6.0日誌從收集到處理完整版教程(三)配置檔案講解

FIlebeat 配置檔案內容詳解:

vi filebeat.yml

filebeat.prospectors:
#日誌型別
- type: log
  enabled: True
  # 日誌路徑可以寫多個,支援萬用字元
  paths:
    - /tmp/test.log
#設定字符集編碼
  encoding: utf-8
#文件型別
  document_type: my-nginx-log
#每十秒掃描一次
  scan_frequency: 10s
# 實際讀取檔案時,每次讀取 16384 位元組
  harverster_buffer_size: 16384
#The maximum number of bytes that a single log message can have. All bytes after max_bytes are discarded and not sent. This setting is especially useful for multiline log messages, which can get large. The default is 10MB (10485760). 一次發生的log大小值;
  max_bytes: 10485760
# 是否從檔案末尾開始讀取
  tail_files: true
#給日誌加上tags方便在logstash過濾日誌的時候做判斷
  tags: ["nginx-access"]
#剔除以.gz 結尾的檔案
  exclude_files: [".gz$"]
#output 部分的配置文件
#配置輸出為kafka,無論你是tar包還是rpm安裝,目錄裡邊會看到官方提供的filebeat.full.yml OR filebeat.reference.yml 這裡邊有filebeat 所有的input 方法和output 方法供你參考;
output.kafka:     
  enabled: true
#kafka 的server,可以配置叢集,例子如下:
  hosts:["ip:9092","ip2:9092","ip3:9092"]
#這個非常重要,filebeat作為provider,把資料輸入到kafka裡邊,logstash 作為消費者去消費這些資訊,logstash的input 中需要這個topic,不然logstash沒有辦法取到資料。
  topic: elk-%{[type]}       
# The number of concurrent load-balanced Kafka output workers. kafka 的併發執行程序
  worker: 2
#當傳輸給kafka 有問題的時候,重試的次數;
  max_retries: 3
#單個kafka請求裡面的最大事件數,預設2048 
  bulk_max_size: 2048
#等待kafka broker響應的時間,預設30s 
  timeout: 30s
#kafka broker等待請求的最大時長,預設10s 
  broker_timeout: 10s
#每個kafka broker在輸出管道中的訊息快取數,預設256 
  channel_buffer_size: 256
#網路連線的保活時間,預設為0,不開啟保活機制 
  keep_alive: 60
#輸出壓縮碼,可選項有none, snappy, lz4 and gzip,預設為gzip (kafka支援的壓縮,資料會先被壓縮,然後被生產者傳送,並且在服務端也是保持壓縮狀態,只有在最終的消費者端才會被解壓縮)
  compression: gzip
#允許的最大json訊息大小,預設為1000000,超出的會被丟棄,應該小於broker的  message.max.bytes(broker能接收訊息的最大位元組數)
  max_message_bytes: 1000000
#kafka的響應返回值,0位無等待響應返回,繼續傳送下一條訊息;1表示等待本地提交(leader broker已經成功寫入,但follower未寫入),-1表示等待所有副本的提交,預設為1 
  required_acks: 0
#The configurable ClientID used for logging, debugging, and auditing purposes. The default is "beats"。客戶端ID 用於日誌怕錯,審計等,預設是beats。
  client_id: beats
#測試配置檔案:/opt/elk/filebeat/filebeat -c  /opt/elk/filebeat/filebeat.yml test config
#如果配置檔案沒有問題的話,會出現config ok ,如果有問題會提示具體問題在哪裡。
#啟動filebeat 
可以先通過 /opt/elk/filebeat-6.2.2-linux-x86_64/filebeat -c  -e /opt/elk/filebeat-6.2.2-linux-x86_64/filebeat.yml  檢視一下輸入filebeat是否工作正常,會有很多資訊列印到螢幕上;
nohup  /opt/elk/filebeat-6.2.2-linux-x86_64/filebeat -c  /opt/elk/filebeat-6.2.2-linux-x86_64/filebeat.yml >>/dev/null 2>&1& 

logstash 配置詳解:

input {
#資料來源
  kafka {
   #這個對應filebeat的output 的index
    topics_pattern => "elk-.*"
    #kafka 的配置
    bootstrap_servers => "IP1:9092,IP2:9092,IP3:9092"
    kafka 中的group ID
    group_id => "logstash-g1"
   }
}
#過濾器,如果你想要日誌按照你的需求輸出的話,需要在logstash中過濾並且給與相應的key,比如以下的是nginx 日誌,用的是logstash內建好的過濾器,%{IP:client} 這個裡邊包含了兩個資訊:ip地址,client 是在kibana 中顯示的自定義鍵值,最終顯示成這樣,http://grokdebug.herokuapp.com/ 是線上除錯logstash filter的工具,你可以線上除錯你的過濾規則。

filter {
    grok {
        match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}"}
       #因為我們取出了欄位,所以不需要原來這個message欄位,這個欄位裡邊包含之前beat 輸入的所有欄位。 
       remove_field => ["message"]
    }
}
output {
  elasticsearch {
    hosts => ["IP:9200"]
    #這個是在kibana上的index Patterns 的索引,建議什麼服務就用幹什麼名字,因為beat 提供了些kibana的模板可以匯入,匯入的模板匹配的時候用的索引是對應服務的名稱開頭 。
    index => "nginx-%{+YYYY.MM.dd}"
    document_type => "nginx"
     #每次20000 傳送一次資料到elasticsearch
    flush_size => 20000
    如果不夠20000,沒10秒會發送一次資料;、
    idle_flush_time =>10
   }
 }

日誌從filebeat 收集到Logstash輸出的流程圖:

ELK 之資料收集傳輸過濾  Filebeat+Logstash 部署

Kibana 上邊的日誌通過logstash 過濾之後取出來在kibana上顯示如下:

ELK 之資料收集傳輸過濾  Filebeat+Logstash 部署