ELK6.0日誌從收集到處理完整版教程(三)配置檔案講解
阿新 • • 發佈:2018-11-25
FIlebeat 配置檔案內容詳解:
vi filebeat.yml
filebeat.prospectors: #日誌型別 - type: log enabled: True # 日誌路徑可以寫多個,支援萬用字元 paths: - /tmp/test.log #設定字符集編碼 encoding: utf-8 #文件型別 document_type: my-nginx-log #每十秒掃描一次 scan_frequency: 10s # 實際讀取檔案時,每次讀取 16384 位元組 harverster_buffer_size: 16384 #The maximum number of bytes that a single log message can have. All bytes after max_bytes are discarded and not sent. This setting is especially useful for multiline log messages, which can get large. The default is 10MB (10485760). 一次發生的log大小值; max_bytes: 10485760 # 是否從檔案末尾開始讀取 tail_files: true #給日誌加上tags方便在logstash過濾日誌的時候做判斷 tags: ["nginx-access"] #剔除以.gz 結尾的檔案 exclude_files: [".gz$"] #output 部分的配置文件 #配置輸出為kafka,無論你是tar包還是rpm安裝,目錄裡邊會看到官方提供的filebeat.full.yml OR filebeat.reference.yml 這裡邊有filebeat 所有的input 方法和output 方法供你參考; output.kafka: enabled: true #kafka 的server,可以配置叢集,例子如下: hosts:["ip:9092","ip2:9092","ip3:9092"] #這個非常重要,filebeat作為provider,把資料輸入到kafka裡邊,logstash 作為消費者去消費這些資訊,logstash的input 中需要這個topic,不然logstash沒有辦法取到資料。 topic: elk-%{[type]} # The number of concurrent load-balanced Kafka output workers. kafka 的併發執行程序 worker: 2 #當傳輸給kafka 有問題的時候,重試的次數; max_retries: 3 #單個kafka請求裡面的最大事件數,預設2048 bulk_max_size: 2048 #等待kafka broker響應的時間,預設30s timeout: 30s #kafka broker等待請求的最大時長,預設10s broker_timeout: 10s #每個kafka broker在輸出管道中的訊息快取數,預設256 channel_buffer_size: 256 #網路連線的保活時間,預設為0,不開啟保活機制 keep_alive: 60 #輸出壓縮碼,可選項有none, snappy, lz4 and gzip,預設為gzip (kafka支援的壓縮,資料會先被壓縮,然後被生產者傳送,並且在服務端也是保持壓縮狀態,只有在最終的消費者端才會被解壓縮) compression: gzip #允許的最大json訊息大小,預設為1000000,超出的會被丟棄,應該小於broker的 message.max.bytes(broker能接收訊息的最大位元組數) max_message_bytes: 1000000 #kafka的響應返回值,0位無等待響應返回,繼續傳送下一條訊息;1表示等待本地提交(leader broker已經成功寫入,但follower未寫入),-1表示等待所有副本的提交,預設為1 required_acks: 0 #The configurable ClientID used for logging, debugging, and auditing purposes. The default is "beats"。客戶端ID 用於日誌怕錯,審計等,預設是beats。 client_id: beats #測試配置檔案:/opt/elk/filebeat/filebeat -c /opt/elk/filebeat/filebeat.yml test config #如果配置檔案沒有問題的話,會出現config ok ,如果有問題會提示具體問題在哪裡。 #啟動filebeat 可以先通過 /opt/elk/filebeat-6.2.2-linux-x86_64/filebeat -c -e /opt/elk/filebeat-6.2.2-linux-x86_64/filebeat.yml 檢視一下輸入filebeat是否工作正常,會有很多資訊列印到螢幕上; nohup /opt/elk/filebeat-6.2.2-linux-x86_64/filebeat -c /opt/elk/filebeat-6.2.2-linux-x86_64/filebeat.yml >>/dev/null 2>&1&
logstash 配置詳解:
input { #資料來源 kafka { #這個對應filebeat的output 的index topics_pattern => "elk-.*" #kafka 的配置 bootstrap_servers => "IP1:9092,IP2:9092,IP3:9092" kafka 中的group ID group_id => "logstash-g1" } } #過濾器,如果你想要日誌按照你的需求輸出的話,需要在logstash中過濾並且給與相應的key,比如以下的是nginx 日誌,用的是logstash內建好的過濾器,%{IP:client} 這個裡邊包含了兩個資訊:ip地址,client 是在kibana 中顯示的自定義鍵值,最終顯示成這樣,http://grokdebug.herokuapp.com/ 是線上除錯logstash filter的工具,你可以線上除錯你的過濾規則。 filter { grok { match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}"} #因為我們取出了欄位,所以不需要原來這個message欄位,這個欄位裡邊包含之前beat 輸入的所有欄位。 remove_field => ["message"] } } output { elasticsearch { hosts => ["IP:9200"] #這個是在kibana上的index Patterns 的索引,建議什麼服務就用幹什麼名字,因為beat 提供了些kibana的模板可以匯入,匯入的模板匹配的時候用的索引是對應服務的名稱開頭 。 index => "nginx-%{+YYYY.MM.dd}" document_type => "nginx" #每次20000 傳送一次資料到elasticsearch flush_size => 20000 如果不夠20000,沒10秒會發送一次資料;、 idle_flush_time =>10 } }