1. 程式人生 > >Fluentd-ElasticSearch配置兩三(糗)事

Fluentd-ElasticSearch配置兩三(糗)事

上週末在家閒來無事,於是乎動手幫專案組搭建日誌收集的EFK環境,最終目標的部署是這個樣子的:

這裡寫圖片描述

在每個應用機器上部一個Fluentd做為代理端,以tail方式讀取指定的應用日誌檔案,然後Forward到做為匯聚端的Fluentd,匯聚端對日誌內容加工、分解成結構化內容,再儲存到ElasticSearch。日誌內容的展現由Kinana實現。

Fluentd是什麼? Fluentd是一個完全免費的、完全開源的日誌收集器,可以與125多種系統相對接,實現“日誌一切”架構。

圖片來自於Fluentd官方網站

由於前面已經在Kubernetes容器平臺上部署好了ElasticSearch與Kibana,週末只是忙乎Fluentd的安裝配置,以前沒有使用過,現學再賣,結果折騰過程中出現了幾個問題,在這裡唸叨記錄一下。

Output外掛的請求超時時間

在我們的部署中,只使用了Fluentd兩種Output外掛,一個是代理端用於向後傳遞日誌內容的Ouput-Forward,一個是用於向ES中儲存資料的Output-ElasticSearch。

  • 代理端Output-Forward外掛配置(最初)
  <match **>
    @type             forward
    require_ack_response ture
    <server>
      host             10.xxx.xx.xx
      port             24224
</server> <buffer tag> @type file path /xxx/xxx/buffer flush_interval 10s total_limit_size 1G </buffer> </match>

  • 匯聚端Output-ElasticSearch外掛配置(最初)
  <match xxxxx.xxxxx.**>
    @type              elasticsearch
host 10.xxx.xx.xxx port 30172 logstash_format true logstash_prefix log.xxxxx.xxxxx.xxxxx- <buffer tag> @type file path "/xxx/xxx/fluentd/aggregator/xxxx/buffer" total_limit_size 20G flush_interval 10s </buffer> </match>

配置完以後,簡單測試跑通後,滿心歡喜地拿應用某臺機器上1-3月產生的日誌灌數。從Kibana介面上代表日誌數量的綠柱向上增長,幾千…幾萬..百萬…兩百萬,當時還發圖片給專案組的顯擺了一番。後面想著不可能有這麼多日誌記錄吧!? 登入到應用伺服器上去日誌目錄裡wc了一把,所有檔案行數總和才150萬! 暈死!

跑進Kibana介面細觀瞧,才發現入庫的日誌記錄有重複內容,比如:某條記錄就重複了233次。 當時也不知道具體原因,猜測著可能是Output-ElasticSearch外掛因接收響應超時,叕重發了內容到ElasticSearch,所以試著查到相應引數:request_timeout加入到匯聚端配置中,引數預設值是:5s

<match xxxxx.xxxxx.**>
  @type              elasticsearch
  ...
  request_timeout    30s
  <buffer tag>
    ...
  </buffer>
</match>

發訊號HUP給匯聚端Fluentd程序,讓它重新載入修改後的配置,再試,沒再出現重複日誌記錄。但,超時時間只是表象,真正的原因是buffer配置操成,請見下一條內容

Request Entity Too Large

在說明這個問題前,先看看Fluentd的buffer機制,buffer做為Output外掛的緩衝區,當輸出端(比如:ElasticSearch)服務不可用時,Fluentd暫時快取需要輸出的內容到檔案或者記憶體,然後再重試向輸出端傳送。

圖片來自於Fluentd官方網站

What? 那輸出端服務總是不恢復,光進不出,企不是要將Fluend緩衝區撐爆後丟失日誌資料?

當緩衝區Full後,預設產生BufferOverflowError異常,輸入外掛自行如何處理此異常。我們使用的in_tail外掛會停止讀取日誌檔案內容,in_forward外掛會向上一級ouput_forward外掛返回錯誤。除產生異常外,還可通過Output外掛的配置引數overflow_action(>=1.0)或者buffer_queue_full_action(<1.0)控制緩衝區滿後的行為:

  • throw_exception
  • block
  • drop_oldest

為什麼扯到buffer?因為遇到的這個問題與它有關。在修改完request_timeout引數後,又經常看到錯誤資訊:

2018-03-04 15:10:12 +0800 [warn]: #0 fluent/log.rb:336:warn: Could not push logs to Elasticsearch, resetting connection and trying again. Connection reset by peer (Errno::ECONNRESET)

顯示被ElasticSearch端強制斷掉連線,什麼情況? 在命令列啟動fluentd時強制trace日誌級別,滿屏的輸出中找不見錯誤原因,除了ECCONNRESET。 ElasticSearch端日誌也未見異常資訊。 只好求助於另一殺器:tcpdump,抓取與ES之間的通訊包:

sudo tcpdump -A -nn -s 0 'tcp  port 9200 ' -i eth1  #9200是ES接受請求的埠

得到,請求包頭:

User-Agent: Faraday v0.13.1
Content-Type: application/json
Host: 10.210.39.136:30172
Content-Length: 129371575

響應包頭:

HTTP/1.1 413 Request Entity Too Large
content-length: 0

Request Entity Too Large! 查了下ElasticSearch文件,ES預設能接受的資料包大小是100MB,由`http.max_content_length引數控制。而我們上送的資料包長度明顯大於這一限制Content-Length: 129371575

是什麼造成了這種結果? 從上面關於Fluentd Buffer的示意圖中,可以看出Output外掛是以Chunk為單位向輸出端傳送資料包,Chunk的大小由引數chunk_limit_size設定(說明文件here),預設值:記憶體緩衝8MB/檔案緩衝256MB。 在我們初始配置中並沒有設定,採用的是預設值,所以出現Request Entity Too Large! 錯誤也就不稀奇。 分別在代理端與匯聚端的buffer相關配置裡增加chunk_limit_size引數設定:

<buffer tag>
  ...
  chunk_limit_size        10M
  ...
</buffer>
<buffer tag>
  ...
  chunk_limit_size        15M
  ...
</buffer>

想想前面一節說明到與ES間請求超時,與採用預設chunk大小設定是有關的,檔案型緩衝區預設256M的chunk大小,送給ES端就是大小不超過其限制的100MB,也會讓ES處理較長的時間,所以會有請求超時出現。

在配置過程中,看錯了老版本的文件,使用引數buffer_chunk_limit設定,沒起作用,迷惑一小會兒!

另外,是否可以將匯聚端的chunk大小設定得比代理端小? 是可以的,但匯聚端會有警告日誌產生。大致看了下程式碼,說說我自己的理解(不見得對喲):Flunetd後一級收到前一級傳送來的chunk資料時,先不做大小上的處理,在向再後一級傳送時,根據後一級型別進行chunk的編碼,完成後再與設定的chunk大小做比對,如果大於設定再重新分拆成多個後再進行編碼傳送,期間會產生警告日誌 。

增加ElasticSearch日誌資料防重配置

為防止再出現日誌記錄重複的現象,除上述配置上的處理外,最好在發給ElasticSearch前,給每條日誌生成一個主鍵值,這樣ES在收到重複記錄後,如果發現主鍵對應的記錄已存在則Update,否則才Insert。

<filter aplus.batch.**>
  @type            elasticsearch_genid
  hash_id_key      _hash    # storing generated hash id key (default is _hash)
</filter>

<match aplus.batch.**>
  @type              elasticsearch
  ...
  id_key             _hash # specify same key name which is specified in hash_id_key
  remove_keys        _hash # Elasticsearch doesn't like keys that start with _
  ...
</match>

Timestamp是Unix Time格式!

因為上送ElasticSearc的日誌記錄中,標明日誌時間的域值是拼接出來的再轉換成日期型別的:

 ${ DateTime.parse(record["systemDate"] + " " + record["systemTime"] + " +0800").to_time.to_i }

注意,一定要在最後使用to_i方法,將其轉換成Unix Time格式。否則,會在編碼送往ES資料時報錯,大到錯誤資訊是“未發現Time型別上的msgPack方法”。

另:Output-ElasticSearch外掛的說明文件,官網內容不全面,完全說明在這裡

附上全部配置檔案內容:

<system>
  worker            2
  root_dir          /xxxx/xxxxx/fluentd/agent
  log_level         info
</system>

<source>
  @type             tail
  tag               xxxxx.xxxxx.agent.log
  path_key          path
  path              /xxxx/xxxxx/agent/logs/xxxxx/*.batch*.log
  pos_file          /xxxx/xxxxx/fluentd/agent/pos/batch_agent.db
  read_from_head    true

  <parse>
    @type             multiline
    format_firstline  /^(\d+-\d+-\d+\s+)?\d+:\d+:\d+\|/
    format1           /^(?<log>(\d+-\d+-\d+\s+)?\d+:\d+:\d+\|.*)$/
  </parse>
  #multiline_flush_interval 5s
</source>

<filter foo.bar>
  @type             record_transformer
  <record>
    hostname        "#{Socket.gethostname}"
  </record>
</filter>

<match **>
  @type             forward
  require_ack_response ture
  <server>
    host             xx.xxx.xx.xx
    port             24224
  </server>
  <buffer tag>
    @type            file
    path             /xxxx/xxxxx/fluentd/agent/buffer
    flush_interval   10s
    total_limit_size 1G
    chunk_limit_size 10M
  </buffer>
</match>
<source>
  @type               forward
  bind                0.0.0.0
  source_address_key  hostIP
</source>

<filter  xxxxx.xxxxx.agent.log>
   @type           parser
   reserve_data    true
   key_name        log
   <parse>
      @type        regexp
      expression   /(?mx)^(?<systemDate>\d+-\d+-\d+)?\s?(?<systemTime>\d+:\d+:\d+)\|(?<bizDate>.*)\|(?<artiPersonCode>.*)\|(?<invokeSeqNo>.*)\|(?<orgCode>.*)\|(?<tranCode>.*)\|(?<bizNo>.*)\|(?<dueNum>.*)\|(?<jobId>.*)\|(?<jobRunId>.*)\|(?<message>.*)\|(?<file>.*)\|(?<line>.*)\|(?<thread>.*)\|(?<logLevel>.*)\|(?<misc>.*)$/
   </parse>
</filter>

<filter xxxxx.xxxxx.agent.log>
   @type           record_transformer
   enable_ruby     true
   <record>
     systemDate    ${ if !(o=record["systemDate"]).nil? and o.length!=0 then o else if (d=record["path"].scan(/\d{4}-\d{2}-\d{2}/).last).nil? then Time.new.strftime("%Y-%m-%d") else d end  end }
   </record>
</filter>

<filter xxxxx.xxxxx.agent.log>
   @type          record_transformer
   enable_ruby    true
   <record>
     logtime      ${ DateTime.parse(record["systemDate"] + " " + record["systemTime"] + " +0800").to_time.to_i }
   </record>
   renew_time_key logtime
   remove_keys    logtime,systemDate,systemTime
</filter>

<filter xxxxx.xxxxx.**>
  @type            elasticsearch_genid
  hash_id_key      _hash    # storing generated hash id key (default is _hash)
</filter>

<match aplus.batch.**>
  @type              elasticsearch
  host               xx.xxx.xx.xxxx
  port               9200
  id_key             _hash # specify same key name which is specified in hash_id_key
  remove_keys        _hash # Elasticsearch doesn't like keys that start with _
  logstash_format    true
  logstash_prefix    log.xxxx.xxxxx.agent-
  request_timeout    30s
# reload_connections false
  slow_flush_log_threshold 30s
  <buffer tag>
    @type            file
    path             "/xxxx/xxxxx/fluentd/aggregator/#{ENV['HOSTNAME']}/buffer"
    total_limit_size 20G
    chunk_limit_size 15M
    flush_interval   10s
    retry_wait       10.0
  </buffer>
</match>