1. 程式人生 > 其它 >ELK—使用filebeat收集日誌寫入kafka

ELK—使用filebeat收集日誌寫入kafka

filebeat 作為輕量級日誌收集軟體，不依賴 java 環境，記憶體佔用極低，可以用於無法安裝 java 環境的伺服器或容器。

一、使用filebeat收集日誌寫入kafka

[root@linux-host2 src]# grep -v "#" /etc/filebeat/filebeat.yml | grep -v "^$"
filebeat.prospectors:
- input_type: log
  paths:
    - /var/log/*.log   # 對哪些日誌進行收集
    - /var/log/messages
  exclude_lines: ["^DBG"]
  exclude_files: [".gz$"]
  document_type: "system-log-1512-filebeat"
output.file: #測試寫入本地檔案
  path: "/tmp"
  filename: "filebeat.txt"
output.kafka: #寫入 kafka
  hosts: ["192.168.15.11:9092","192.168.15.12:9092","192.168.15.13:9092"] # kafka叢集
  topic: "systemlog-1512-filebeat" # 主題名稱
  partition.round_robin:
    reachable_only: true
  required_acks: 1 #本地寫入完成
  compression: gzip #開啟壓縮
  max_message_bytes: 1000000 #訊息最大值

啟動 filebeat 並驗證本地檔案 是否有資料

驗證是否寫入kafka

/usr/local/kafka/bin/kafka-topics.sh  --list --zookeeper   192.168.15.11:2181,192.168.15.12:2181,192.168.15.13:2181

配置 logstash從kafka讀取日誌到elasticsearch

input {
  kafka {
    bootstrap_servers => "192.168.15.11:9092"
    topics => "systemlog-1512-filebeat"
    consumer_threads => 1
    decorate_events => true
    codec => "json"
    auto_offset_reset => "latest"
  }
}

output {
  if [type] == "system-log-1512-filebeat" {
    elasticsearch {
      hosts => ["192.168.15.11:9200"]
      index => "system-log-1512-filebeat-%{+YYYY.MM.dd}"
    }
  }
}

使用 filebeat  收集多個日誌檔案

grep -v "#" /etc/filebeat/filebeat.yml | grep -v "^$"
filebeat.prospectors:
- input_type: log
  paths:
    - /var/log/syslog.log
    - /var/log/messages
  exclude_lines: ["^DBG"]
  exclude_files: [".gz$"]
#document_type: "system-log-1512-filebeat"
  fields:
    type: "system-log-7.106"

- input_type: log
  paths:
    - /var/log/nginx/access.log
  exclude_lines: ["^DBG"]
  exclude_files: [".gz$"]
  fields:
    type: "nginx-accesslog-1512-filebeat"
output.file:
  path: "/tmp"
  filename: "filebeat.txt"
output.kafka:
  hosts: ["192.168.15.11:9092","192.168.15.12:9092","192.168.15.13:9092"]
  topic: "systemlog-1512-filebeat"
  partition.round_robin:
    reachable_only: true #如果 reachable_only 設定為 true,則事件將僅釋出到可用分割槽,false 將傳送到所有分割槽
  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000

配置 logstash從 kafka讀取nginx日誌,寫入 elasticsearch

input {
  kafka {
    bootstrap_servers => "192.168.15.11:9092"
    topics => "nginx-accesslog-1512"
    codec => "json"
    consumer_threads => 1
    decorate_events => true
  }

  kafka {
    bootstrap_servers => "192.168.15.11:9092"
    topics => "system-log-1512"
    consumer_threads => 1
    decorate_events => true
    codec => "json"
  }

  kafka {
    bootstrap_servers => "192.168.15.11:9092"
    topics => "systemlog-1512-filebeat"
    consumer_threads => 1
    decorate_events => true
    codec => "json"
    auto_offset_reset => "latest"
  }
}

output {
  if [type] == "nginx-accesslog-1512" {
    elasticsearch {
      hosts => ["192.168.15.11:9200"]
      index => "nginx-accesslog-1512-%{+YYYY.MM.dd}"
    }
  }

  if [type] == "system-log-1512" {
    elasticsearch {
      hosts => ["192.168.15.12:9200"]
      index => "system-log-1512-%{+YYYY.MM.dd}"
    }
  }

  if [type] == "system-log-1512-filebeat" {
    elasticsearch {
      hosts => ["192.168.15.11:9200"]
      index => "system-log-1512-filebeat-%{+YYYY.MM.dd}"
    }
  }

  if [type] == "nginx-accesslog-1512-filebeat" {
    elasticsearch {
      hosts => ["192.168.15.11:9200"]
      index => "nginx-accesslog-1512-filebeat-%{+YYYY.MM.dd}"
    }
  }
}