Fluentd-ElasticSearch配置兩三(糗)事
上週末在家閒來無事,於是乎動手幫專案組搭建日誌收集的EFK環境,最終目標的部署是這個樣子的:
在每個應用機器上部一個Fluentd做為代理端,以tail方式讀取指定的應用日誌檔案,然後Forward到做為匯聚端的Fluentd,匯聚端對日誌內容加工、分解成結構化內容,再儲存到ElasticSearch。日誌內容的展現由Kinana實現。
Fluentd是什麼? Fluentd是一個完全免費的、完全開源的日誌收集器,可以與125多種系統相對接,實現“日誌一切”架構。
由於前面已經在Kubernetes容器平臺上部署好了ElasticSearch與Kibana,週末只是忙乎Fluentd的安裝配置,以前沒有使用過,現學再賣,結果折騰過程中出現了幾個問題,在這裡唸叨記錄一下。
Output外掛的請求超時時間
在我們的部署中,只使用了Fluentd兩種Output外掛,一個是代理端用於向後傳遞日誌內容的Ouput-Forward,一個是用於向ES中儲存資料的Output-ElasticSearch。
- 代理端Output-Forward外掛配置(最初)
<match **>
@type forward
require_ack_response ture
<server>
host 10.xxx.xx.xx
port 24224
</server>
<buffer tag>
@type file
path /xxx/xxx/buffer
flush_interval 10s
total_limit_size 1G
</buffer>
</match>
- 匯聚端Output-ElasticSearch外掛配置(最初)
<match xxxxx.xxxxx.**>
@type elasticsearch
host 10.xxx.xx.xxx
port 30172
logstash_format true
logstash_prefix log.xxxxx.xxxxx.xxxxx-
<buffer tag>
@type file
path "/xxx/xxx/fluentd/aggregator/xxxx/buffer"
total_limit_size 20G
flush_interval 10s
</buffer>
</match>
配置完以後,簡單測試跑通後,滿心歡喜地拿應用某臺機器上1-3月產生的日誌灌數。從Kibana介面上代表日誌數量的綠柱向上增長,幾千…幾萬..百萬…兩百萬,當時還發圖片給專案組的顯擺了一番。後面想著不可能有這麼多日誌記錄吧!? 登入到應用伺服器上去日誌目錄裡wc了一把,所有檔案行數總和才150萬! 暈死!
跑進Kibana介面細觀瞧,才發現入庫的日誌記錄有重複內容,比如:某條記錄就重複了233次。 當時也不知道具體原因,猜測著可能是Output-ElasticSearch外掛因接收響應超時,叕重發了內容到ElasticSearch,所以試著查到相應引數:request_timeout
加入到匯聚端配置中,引數預設值是:5s
<match xxxxx.xxxxx.**>
@type elasticsearch
...
request_timeout 30s
<buffer tag>
...
</buffer>
</match>
發訊號HUP
給匯聚端Fluentd程序,讓它重新載入修改後的配置,再試,沒再出現重複日誌記錄。但,超時時間只是表象,真正的原因是buffer配置操成,請見下一條內容。
Request Entity Too Large
在說明這個問題前,先看看Fluentd的buffer機制,buffer做為Output外掛的緩衝區,當輸出端(比如:ElasticSearch)服務不可用時,Fluentd暫時快取需要輸出的內容到檔案或者記憶體,然後再重試向輸出端傳送。
What? 那輸出端服務總是不恢復,光進不出,企不是要將Fluend緩衝區撐爆後丟失日誌資料?
當緩衝區Full後,預設產生BufferOverflowError
異常,輸入外掛自行如何處理此異常。我們使用的in_tail外掛會停止讀取日誌檔案內容,in_forward外掛會向上一級ouput_forward外掛返回錯誤。除產生異常外,還可通過Output外掛的配置引數overflow_action
(>=1.0)或者buffer_queue_full_action
(<1.0)控制緩衝區滿後的行為:
- throw_exception
- block
- drop_oldest
為什麼扯到buffer?因為遇到的這個問題與它有關。在修改完request_timeout
引數後,又經常看到錯誤資訊:
2018-03-04 15:10:12 +0800 [warn]: #0 fluent/log.rb:336:warn: Could not push logs to Elasticsearch, resetting connection and trying again. Connection reset by peer (Errno::ECONNRESET)
顯示被ElasticSearch端強制斷掉連線,什麼情況? 在命令列啟動fluentd時強制trace日誌級別,滿屏的輸出中找不見錯誤原因,除了ECCONNRESET。 ElasticSearch端日誌也未見異常資訊。 只好求助於另一殺器:tcpdump,抓取與ES之間的通訊包:
sudo tcpdump -A -nn -s 0 'tcp port 9200 ' -i eth1 #9200是ES接受請求的埠
得到,請求包頭:
User-Agent: Faraday v0.13.1
Content-Type: application/json
Host: 10.210.39.136:30172
Content-Length: 129371575
響應包頭:
HTTP/1.1 413 Request Entity Too Large
content-length: 0
Request Entity Too Large! 查了下ElasticSearch文件,ES預設能接受的資料包大小是100MB,由`http.max_content_length
引數控制。而我們上送的資料包長度明顯大於這一限制Content-Length: 129371575
。
是什麼造成了這種結果? 從上面關於Fluentd Buffer的示意圖中,可以看出Output外掛是以Chunk為單位向輸出端傳送資料包,Chunk的大小由引數chunk_limit_size
設定(說明文件here),預設值:記憶體緩衝8MB/檔案緩衝256MB。 在我們初始配置中並沒有設定,採用的是預設值,所以出現Request Entity Too Large! 錯誤也就不稀奇。 分別在代理端與匯聚端的buffer相關配置裡增加chunk_limit_size引數設定:
<buffer tag>
...
chunk_limit_size 10M
...
</buffer>
<buffer tag>
...
chunk_limit_size 15M
...
</buffer>
想想前面一節說明到與ES間請求超時,與採用預設chunk大小設定是有關的,檔案型緩衝區預設256M的chunk大小,送給ES端就是大小不超過其限制的100MB,也會讓ES處理較長的時間,所以會有請求超時出現。
在配置過程中,看錯了老版本的文件,使用引數buffer_chunk_limit
設定,沒起作用,迷惑一小會兒!
另外,是否可以將匯聚端的chunk大小設定得比代理端小? 是可以的,但匯聚端會有警告日誌產生。大致看了下程式碼,說說我自己的理解(不見得對喲):Flunetd後一級收到前一級傳送來的chunk資料時,先不做大小上的處理,在向再後一級傳送時,根據後一級型別進行chunk的編碼,完成後再與設定的chunk大小做比對,如果大於設定再重新分拆成多個後再進行編碼傳送,期間會產生警告日誌 。
增加ElasticSearch日誌資料防重配置
為防止再出現日誌記錄重複的現象,除上述配置上的處理外,最好在發給ElasticSearch前,給每條日誌生成一個主鍵值,這樣ES在收到重複記錄後,如果發現主鍵對應的記錄已存在則Update,否則才Insert。
<filter aplus.batch.**>
@type elasticsearch_genid
hash_id_key _hash # storing generated hash id key (default is _hash)
</filter>
<match aplus.batch.**>
@type elasticsearch
...
id_key _hash # specify same key name which is specified in hash_id_key
remove_keys _hash # Elasticsearch doesn't like keys that start with _
...
</match>
Timestamp是Unix Time格式!
因為上送ElasticSearc的日誌記錄中,標明日誌時間的域值是拼接出來的再轉換成日期型別的:
${ DateTime.parse(record["systemDate"] + " " + record["systemTime"] + " +0800").to_time.to_i }
注意,一定要在最後使用to_i
方法,將其轉換成Unix Time格式。否則,會在編碼送往ES資料時報錯,大到錯誤資訊是“未發現Time型別上的msgPack方法”。
另:Output-ElasticSearch外掛的說明文件,官網內容不全面,完全說明在這裡。
附上全部配置檔案內容:
<system>
worker 2
root_dir /xxxx/xxxxx/fluentd/agent
log_level info
</system>
<source>
@type tail
tag xxxxx.xxxxx.agent.log
path_key path
path /xxxx/xxxxx/agent/logs/xxxxx/*.batch*.log
pos_file /xxxx/xxxxx/fluentd/agent/pos/batch_agent.db
read_from_head true
<parse>
@type multiline
format_firstline /^(\d+-\d+-\d+\s+)?\d+:\d+:\d+\|/
format1 /^(?<log>(\d+-\d+-\d+\s+)?\d+:\d+:\d+\|.*)$/
</parse>
#multiline_flush_interval 5s
</source>
<filter foo.bar>
@type record_transformer
<record>
hostname "#{Socket.gethostname}"
</record>
</filter>
<match **>
@type forward
require_ack_response ture
<server>
host xx.xxx.xx.xx
port 24224
</server>
<buffer tag>
@type file
path /xxxx/xxxxx/fluentd/agent/buffer
flush_interval 10s
total_limit_size 1G
chunk_limit_size 10M
</buffer>
</match>
<source>
@type forward
bind 0.0.0.0
source_address_key hostIP
</source>
<filter xxxxx.xxxxx.agent.log>
@type parser
reserve_data true
key_name log
<parse>
@type regexp
expression /(?mx)^(?<systemDate>\d+-\d+-\d+)?\s?(?<systemTime>\d+:\d+:\d+)\|(?<bizDate>.*)\|(?<artiPersonCode>.*)\|(?<invokeSeqNo>.*)\|(?<orgCode>.*)\|(?<tranCode>.*)\|(?<bizNo>.*)\|(?<dueNum>.*)\|(?<jobId>.*)\|(?<jobRunId>.*)\|(?<message>.*)\|(?<file>.*)\|(?<line>.*)\|(?<thread>.*)\|(?<logLevel>.*)\|(?<misc>.*)$/
</parse>
</filter>
<filter xxxxx.xxxxx.agent.log>
@type record_transformer
enable_ruby true
<record>
systemDate ${ if !(o=record["systemDate"]).nil? and o.length!=0 then o else if (d=record["path"].scan(/\d{4}-\d{2}-\d{2}/).last).nil? then Time.new.strftime("%Y-%m-%d") else d end end }
</record>
</filter>
<filter xxxxx.xxxxx.agent.log>
@type record_transformer
enable_ruby true
<record>
logtime ${ DateTime.parse(record["systemDate"] + " " + record["systemTime"] + " +0800").to_time.to_i }
</record>
renew_time_key logtime
remove_keys logtime,systemDate,systemTime
</filter>
<filter xxxxx.xxxxx.**>
@type elasticsearch_genid
hash_id_key _hash # storing generated hash id key (default is _hash)
</filter>
<match aplus.batch.**>
@type elasticsearch
host xx.xxx.xx.xxxx
port 9200
id_key _hash # specify same key name which is specified in hash_id_key
remove_keys _hash # Elasticsearch doesn't like keys that start with _
logstash_format true
logstash_prefix log.xxxx.xxxxx.agent-
request_timeout 30s
# reload_connections false
slow_flush_log_threshold 30s
<buffer tag>
@type file
path "/xxxx/xxxxx/fluentd/aggregator/#{ENV['HOSTNAME']}/buffer"
total_limit_size 20G
chunk_limit_size 15M
flush_interval 10s
retry_wait 10.0
</buffer>
</match>