
How to send logs to different Kafka topics in filebeat 6.0.0

In November 2017, Elastic released Elastic Stack 6.0.0. The release contains quite a few changes; see the official release notes and breaking changes for details. We upgraded our log analysis system to 6.0.0 right away, and of course ran into some pitfalls. Here I describe how the upgrade affects filebeat.

In filebeat 5.x, if you want a single filebeat agent to collect different logs and publish them to different Kafka topics, you can do the following:

  • Define multiple prospectors and set a different document_type for each prospector.
  • In the kafka output, use %{[type]} to read the document_type value and use it as the topic. (Note: do not use the topics setting that some posts online suggest; it is unnecessary here.)

For example:

filebeat.prospectors:
    # App logs - prospector
    - input_type: log
      paths:
        - /myapp/logs/myapp.log
      exclude_lines: [".+? INFO[^*].+", ".+? DEBUG[^*].+"]
      exclude_files: [".gz$", ".tmp"]
      fields:
        api: myappapi
        environment: STG
      ignore_older: 24h
      document_type: applog_myappapi
      scan_frequency: 1s
      # Multiline on Timestamp, YYYY-MM-DD
      # https://www.elastic.co/guide/en/beats/filebeat/master/multiline-examples.html
      multiline:
        pattern: '^[0-9]{4}-[0-9]{2}-[0-9]{2}'
        negate: true
        match: after
        max_lines: 500
        timeout: 5s

    # Server Stats - prospector
    - input_type: log
      paths:
        - /myapp/logs/serverstats.log
      # Exclude messages with log level
      exclude_lines: [".+? ERROR[^*].+", ".+? DEBUG[^*].+"]
      exclude_files: [".gz$", ".tmp"]
      fields:
        api: myappapi
        environment: STG
      ignore_older: 24h
      document_type: applog_myappapi_stats
      scan_frequency: 1s

    # ELB prospector
    - input_type: log
      paths:
        - /var/log/httpd/elasticbeanstalk-access_log
      document_type: elblog_myappapi
      fields:
        api: myappapi
        environment: STG
      exclude_lines: [".+? INFO[^*].+", ".+? DEBUG[^*].+"]
      exclude_files: [".gz$", ".tmp"]
      ignore_older: 24h
      # 0s, it is done as often as possible. Default: 10s
      scan_frequency: 1s

filebeat.registry_file: /var/lib/filebeat/registry

############################# Output ##########################################

# Configure what outputs to use when sending the data collected by the beat.
# Multiple outputs may be used.

#----------------------------- Kafka output --------------------------------
output.kafka:
  # initial brokers for reading cluster metadata
  hosts: ["broker.1.ip.address:9092", "broker.2.ip.address:9092", "broker.3.ip.address:9092"]

  # message topic selection + partitioning
  topic: '%{[type]}'
  partition.round_robin:
    reachable_only: false

  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000
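With this configuration, each prospector's document_type value (applog_myappapi, applog_myappapi_stats, elblog_myappapi) becomes the Kafka topic for its events. To confirm that messages actually arrive on the expected topic, you can tail it with Kafka's console consumer (script name and flags vary slightly across Kafka versions):

# Read one of the per-type topics from the beginning to verify delivery
kafka-console-consumer.sh --bootstrap-server broker.1.ip.address:9092 \
    --topic applog_myappapi --from-beginning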

However, after upgrading to 6.0.0, the document_type option is deprecated, so its value can no longer be read via %{[type]}. With the same configuration, filebeat fails to resolve the topic and cannot publish any messages to Kafka. Worse, there is no error log, and CPU usage stays pinned at 100%…
Even more annoying, the 6.0.0 documentation was not updated for this change and still gives the following example:

output.kafka:
  # initial brokers for reading cluster metadata
  hosts: ["kafka1:9092", "kafka2:9092", "kafka3:9092"]

  # message topic selection + partitioning
  topic: '%{[type]}'
  partition.round_robin:
    reachable_only: false

  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000
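Because filebeat fails silently here, one way to see what is actually going on is to run it in the foreground with debug selectors enabled (shown with the default Linux package config path; adjust to your setup):

# -e logs to stderr, -d "*" enables debug output for all selectors
filebeat -e -c /etc/filebeat/filebeat.yml -d "*"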

The correct approach is to define the topic name under fields, and then read it back with %{[fields][log_topics]}. Example:

filebeat.prospectors:

# Each - is a prospector. Most options can be set at the prospector level, so
# you can use different prospectors for various configurations.
# Below are the prospector specific configurations.

- type: log

  # Change to true to enable this prospector configuration.
  enabled: true

  # Paths that should be crawled and fetched. Glob based paths.
  paths:
    - /var/log/test1.log
    #- c:\programdata\elasticsearch\logs\*
  fields:
    log_topics: test1
- type: log
  enabled: true
  paths:
    - /var/log/test2.log
  fields:
    log_topics: test2
#----------------------------- kafka output --------------------------------
output.kafka:
# Boolean flag to enable or disable the output module.
  enabled: true

# The list of Kafka broker addresses from where to fetch the cluster metadata.
# The cluster metadata contain the actual Kafka brokers events are published
# to.
  hosts: [{{kafka_url}}]

# The Kafka topic used for produced events. The setting can be a format string
# using any event field. To set the topic from document type use `%{[type]}`.
  topic: '%{[fields][log_topics]}'
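If you would rather not carry the fields prefix around, filebeat also supports fields_under_root, which stores custom fields at the top level of the event; the topic format string then drops the [fields] segment. A minimal sketch of the same setup with that option:

- type: log
  enabled: true
  paths:
    - /var/log/test1.log
  fields:
    log_topics: test1
  # Store custom fields at the top level of the event instead of under "fields"
  fields_under_root: true

output.kafka:
  enabled: true
  hosts: [{{kafka_url}}]
  # log_topics is now a top-level field
  topic: '%{[log_topics]}'

Note that with fields_under_root, the Logstash conditionals below would test [log_topics] instead of [fields][log_topics].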

Correspondingly, on the Logstash side, if you want to parse each topic differently:

input {
  kafka{
        bootstrap_servers => "{{kafka_url}}"
        topics => ["test1","test2"]
        codec => "json"
        consumer_threads => 2
        enable_auto_commit => true
        auto_commit_interval_ms => "1000"
        group_id => "test"
  }
}
filter {
  if [fields][log_topics] == "test1" {
    grok {
      patterns_dir => ["./patterns"]
      match => {
        "message" => "%{PLATFORM_SYSLOG}"
      }
    }
  }
  if [fields][log_topics] == "test2" {
    grok {
      patterns_dir => ["./patterns"]
      match => {
        "message" => "%{IAM_SYSLOG}"
      }
    }
  }
}
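PLATFORM_SYSLOG and IAM_SYSLOG above are custom grok patterns loaded from the ./patterns directory; they are not part of grok's built-in set and are not shown in the original. A pattern file is plain text with one NAME regex pair per line; a purely hypothetical entry might look like:

# ./patterns/custom (hypothetical illustration)
PLATFORM_SYSLOG %{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{GREEDYDATA:msg}

The same [fields][log_topics] conditional works in the output section as well, if each topic should end up in its own Elasticsearch index. A minimal sketch, assuming an {{es_url}} placeholder in the same style as {{kafka_url}} above:

output {
  if [fields][log_topics] == "test1" {
    elasticsearch {
      hosts => ["{{es_url}}"]
      index => "test1-%{+YYYY.MM.dd}"
    }
  } else if [fields][log_topics] == "test2" {
    elasticsearch {
      hosts => ["{{es_url}}"]
      index => "test2-%{+YYYY.MM.dd}"
    }
  }
}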