1. 程式人生 > 實用技巧 >ES & Filebeat 使用 Pipeline 處理日誌中的 @timestamp

ES & Filebeat 使用 Pipeline 處理日誌中的 @timestamp

使用 Pipeline 處理日誌中的 @timestamp

Filebeat 收集的日誌傳送到 ElasticSearch 後,會預設新增一個 @timestamp 欄位作為時間戳用於檢索,而日誌中的資訊會全部新增到 message 欄位中,但是這個時間是 Filebeat 採集日誌的時間,不是日誌生成的實際時間,所以為了便於檢索日誌,需要將 @timestamp 替換為 message 欄位中的時間。

這裡使用的是 elasticseatch 提供的 pipeline 來進行替換。首先日誌格式如下:

20-09-22 07:01:25.109 INFO - {"traceId":"65e97e88a61d7cd4558b8f3a203458fd"}
20-09-22 06:51:12.117 INFO - {"traceId":"4e0542c994919065f71536872ccb9677"}

在 Kibana 中的 Devtools 介面中編寫如下 pipeline 並執行:

PUT _ingest/pipeline/test-news-server-online      # test-news-server-online 為流水線的名稱
{
  "description": "test-news-server-online",            # 對 pipeline 進行描述
  "processors": [
    {
      "grok": {                                                              # 使用 grok 對日誌內容進行提取
        "field": "message",					       # 選擇要提取資訊的欄位
        "patterns": [
          "%{TIMESTAMP_ISO8601:logatime}"          # 使用 TIMESTAMP_ISO8601 的標準匹配時間,將匹配的值賦值給新增的欄位 logatime
        ],
        "ignore_failure": true		                       # 如果日誌中有不存在時間戳的行,可以新增這個配置來忽略匹配錯誤產生的 error 資訊
      },
      "date": {					                       # 使用 data 時間戳外掛來格式化時間輸出,替代預設的 @timestamp
        "field": "logatime",				               # 指定使用新增的 logatime 欄位
        "timezone": "Asia/Shanghai", 	                       # 指定輸出時間的時區,不指定的話可能會比正確的時間晚 8 個小時
        "formats": [
          "yy-MM-dd HH:mm:ss.SSS"	              # 指定時間輸出的格式
        ],
        "ignore_failure": true		                      # 如果遇到錯誤則忽略
      }
    }
  ]
}

pipeline 編寫完成後,在 Devtools 中可以使用如下命令進行查詢:

GET _ingest/pipeline/test-news-server-online

在 filebeat 中引用這個 pipeline:

filebeat.idle_timeout: 2s
filebeat.inputs:
- backoff: 1s
  backoff_factor: 2
  close_inactive: 1h
  enabled: true
  encoding: plain
  harvester_buffer_size: 262144
  max_backoff: 10s
  max_bytes: 10485760
  paths:
  - /opt/trace.log
  scan_frequency: 10s
  tail_lines: true
  type: log
  fields:
    type: test-news-server
filebeat.name: filebeat-shiper
filebeat.spool_zie: 50000
output.elasticsearch:
  bulk_max_size: 8192
  hosts:
  - 10.11.16.211:30187
  - 10.11.16.212:30187
  - 10.11.16.213:30187
  - 10.11.16.214:30187
  - 10.11.16.215:30187
  index: test-news-timestamp
  workers: 4
  pipeline: "test-news-server-online"				# 在此處指定 pipeline 的名稱
processors:
- drop_fields:
    fields:
    - agent.ephemeral_id
    - agent.hostname
    - agent.id
    - agent.type
    - agent.version
    - ecs.version
    - input.type
    - log.offset
    - version
- decode_json_fields:
    fields:
    - message
    max_depth: 1
    overwrite_keys: true
setup.ilm.enabled: false
setup.template.name: test-news-timestamp-reverse
setup.template.pattern: test-news-timestamp-reverse-*

執行 filebeat,在 kibana 中檢視日誌資訊,可以看到收集的日誌資訊中新增了 logatime 欄位,@timestamp 欄位的時間也與 logatime 欄位保持了一致。

如果在 filebeat 執行的日誌中發現瞭如下報錯資訊,有可能是日誌中存在不含有時間戳的行(一般是由於日誌被截斷導致的,可以參考處理多行日誌的文件):

ERROR   pipeline/output.go:121  Failed to publish events: temporary bulk send failure

如果不希望將 logatime 欄位在日誌中展示的話,可以將 pipeline 修改為如下內容:

PUT _ingest/pipeline/test-news-server-online
{
  "description": "test-news-server-online",
  "processors": [
    {
      "grok": {
        "field": "message",
        "patterns": [
          "%{TIMESTAMP_ISO8601:logatime}"
        ],
        "ignore_failure": true
      },
      "date": {
        "field": "logatime",
        "timezone": "Asia/Shanghai", 
        "formats": [
          "yy-MM-dd HH:mm:ss"
        ],
        "ignore_failure": true
      },
      "remove": {
        "field": "logatime"
      }
    }
  ]
}

如果希望將 logatime 的值同時賦值給其他的新增欄位,例如 realtime ,pipeline 修改如下:

PUT _ingest/pipeline/test-news-server-online
{
  "description": "test-news-server-online",
  "processors": [
    {
      "grok": {
        "field": "message",
        "patterns": [
          "%{TIMESTAMP_ISO8601:logatime}"
        ],
        "ignore_failure": true
      },
      "date": {
        "field": "logatime",
        "timezone": "Asia/Shanghai", 
        "target_field": "realtime"
        "formats": [
          "yy-MM-dd HH:mm:ss"
        ],
        "ignore_failure": true
      },
      "remove": {
        "field": "logatime"
      }
    }
  ]
}

target_field 欄位用於將一個值賦值給指定的欄位,預設是給 @timestamp ,如果未提供該選項,則會預設更新 @timestamp 欄位