fluentd 安裝、配置、使用介紹
一、fluentd簡介
fluentd是一個針對日誌的收集、處理、轉發系統。通過豐富的外掛系統, 可以收集來自於各種系統或應用的日誌,轉化為使用者指定的格式後,轉發到使用者所指定的日誌儲存系統之中。
通過 fluentd,你可以非常輕易的實現像追蹤日誌檔案並將其過濾後轉存到 MongoDB 這樣的操作。fluentd 可以徹底的將你從繁瑣的日誌處理中解放出來。
用圖來說明的話,沒有使用fluentd以前,系統是這樣的:
使用fluentd之後,系統是這樣的:
本篇博文將對fluentd的安裝、配置、使用等各方面做一個簡單的介紹。
fluentd 既可以作為日誌收集器安裝到每一個結點上, 也可以作為一個服務端收集各個結點上報的日誌流。 你甚至也可以在各個結點上都部署 fluentd 收集日誌,然後上報到一個 fluentd 叢集做統一處理, 然後再轉發到最終的日誌儲存伺服器。
所以在一個完整的日誌收集、處理系統裡,你可以構建一個這樣的日誌處理流:
Apps (with fluentd/fluent-bit) -> broker (kafka) -> fluentd cluster -> elasticsearch -> kibana
其中提到的 fluent-bit 是一個極簡版的 fluentd,專門用作日誌的收集和轉發, 可以在應用結點上取代 fluentd 收集日誌,滿足極端的資源要求。
1.1 與 logstash 的對比
通過上述描述,你也許會覺得和 ELK 中的 Logstash 高度相似。事實上也確實如此,你完全可以用 fluentd 來替換掉 ELK 中的 Logstash。
有兩篇文章對這兩個工具做了很好的對比:
概括一下的話,有以下區別:
- fluentd 比 logstash 更省資源;
- 更輕量級的 fluent-bid 對應 filebeat,作為部署在結點上的日誌收集器;
- fluentd 有更多強大、開放的外掛數量和社群;
二、fluentd安裝
2017 年 12 月的時候,fluentd 釋出了 v1.0 版本,也就是 td-agent v3 版。
從 gem 安裝和從 rpm、yum 安裝的名字不一樣,連配置檔案的路徑都不一樣,需要記住的是:
- 從 gem 安裝的,配置檔案和執行程式都叫做 fluent;
- 從 rpm 安裝的,配置檔案和執行程式都叫做 td-agent3;
td-agent 和 fluentd 是同一個軟體,區別在於 td-agent 更注重於穩定性,在更新上會稍晚於 fluentd,而且依賴的一些庫也會有不同(如 jemalloc),更適用於用於生產環境。
2.1 安裝fluentd
詳細可參考官方文件!
以 CentOS 為例:
# 安裝
$ curl -L https://toolbelt.treasuredata.com/sh/install-redhat-td-agent3.sh | sh
# 通過 systemd 啟動
$ systemctl start td-agent.service
$ systemctl status td-agent.service
# 或者也可以手動啟動
$ /etc/init.d/td-agent start
$ /etc/init.d/td-agent stop
$ /etc/init.d/td-agent restart
$ /etc/init.d/td-agent status
2.2 安裝外掛
# 從 rpm 安裝的話,
# 比如要使用下例的 mongo,需要安裝
$ td-agent-gem install fluent-plugin-mongo
$ td-agent-gem <PLUGIN_NAME>
# 從 gem 安裝的話
$ gem install <PLUGIN_NAME>
三、配置檔案
3.1 路徑
- 如果是通過 gem 安裝的,那麼可以通過下列命令生成和編輯配置檔案:
$ fluentd --setup /etc/fluent
$ vi /etc/fluent/fluent.conf
- 如果是通過 RPM, Deb 或 DMG 安裝的,那麼配置檔案在:
$ vi /etc/td-agent/td-agent.conf
3.2 常用
你可以在配置檔案裡使用 @include
來切分你的配置檔案,include 支援多種寫法:
# 絕對路徑
include /path/to/config.conf
# 相對路徑
@include conf.d/*.conf
# 甚至 URL
@include http://example.com/fluent.conf
3.3 資料格式
在配置檔案裡你需要為很多引數賦值,這些值必須使用 fluentd 支援的資料格式,有下列這些:
string
:字串,最常見的格式,詳細支援語法見文件;integer
:整數;float
:浮點數;size
:大小,僅支援整數:<INTEGER>k
或<INTERGER>K
;<INTEGER>m
或<INTERGER>M
;<INTEGER>g
或<INTERGER>G
;<INTEGER>t
或<INTERGER>T
;
time
:時間,也支援整數:<INTEGER>s
或<INTERGER>S
;<INTEGER>m
或<INTERGER>M
;<INTEGER>h
或<INTERGER>H
;<INTEGER>d
或<INTERGER>D
;
array
:按照 JSON array 解析;hash
:按照 JSON object 解析;
四、命令
配置檔案的核心是各種命令塊(directives),每一種命令都是為了完成某種處理,命令與命令之間還可以組成串聯關係,以 pipline 的形式流式的處理和分發日誌。
命令的主要組成部分有:
- source
- filter
- match
- label
- error
最常見的方式就是 source 收集日誌,然後由串聯的 filter 做流式的處理,最後交給 match 進行分發。match 是日誌流程的終點,一旦匹配了某一個 match,就不會再繼續往下匹配了。
同時你還可以用 label 將任務分組,用 error 處理異常,用 system 修改執行引數。
不同的命令中,都可以通過 @type
指定想要使用的外掛名字,而且還可以傳入各式各樣的外掛引數, 由豐富的外掛提供強大的功能,下面是詳細一些的說明。
4.1 source
source 是 fluentd 的一切資料的來源,每一個 source 內都包含一個輸入模組,比如原生整合的包含 http
和 forward
兩個模組,分別用來接收 HTTP 請求和 TCP 請求:
# Receive events from 24224/tcp
# This is used by log forwarding and the fluent-cat command
<source>
@type forward
port 24224
</source>
# http://this.host:9880/myapp.access?json={"event":"data"}
<source>
@type http
port 9880
</source>
當然,除了這兩個外,fluentd 還有大量的支援各種協議或方式的 source 外掛,比如最常用的 tail
就可以幫你追蹤檔案。
每一個具體的外掛都包含其特有的引數,比如上例中 port
就是一個引數,當你要使用一個 source
外掛的時候,注意看看有哪些引數是需要配置的,然後將其寫到 source directive
內。
source dirctive
在獲取到輸入後,會向 fluent 的路由丟擲一個事件,這個事件包含三個要素:
- tag
- time
- record
那上例程式碼中的第二個 source 舉例,當我們發起一個http://this.host:9880/myapp.access?json={"event":"data"}
的請求時,這個 source 會丟擲:
# generated by http://this.host:9880/myapp.access?json={"event":"data"}
tag: myapp.access
time: (current time)
record: {"event":"data"}
4.2 match
match 用來指定動作,通過 tag 匹配 source,然後執行指定的命令來分發日誌,最常見的用法就是將 source 收集的日誌轉存到資料庫。
# http://this.host:9880/myapp.access?json={"event":"data"}
<source>
@type http
port 9880
</source>
# 將標記為 myapp.access 的日誌轉存到檔案
<match myapp.access>
@type file
path /var/log/fluent/access
</match>
上例中的 myapp.access 就是 tag,tag 有好幾種匹配模式:
*
:匹配任意一個 tag;**
:匹配任意數量個 tag;a b
:匹配 a 或 b;{X,Y,Z}
:匹配 X, Y, Z 中的一個;
比如可以寫成這樣:
<match a.*>
<match **>
<match a.{b,c}>
<match a.* b.*>
match 是從上往下依次匹配的,一旦一個日誌流被匹配上,就不會再繼續匹配剩下的 match 了。 所以如果有 <match **>
這樣的全匹配,一定要放到配置檔案的最後。
用法和 source 幾乎一模一樣,不過 source 是丟擲事件,match 是接收並處理事件。
而且 match 不僅僅用來處理輸出,還可以對日誌事件進行一些處理後重新丟擲,當成一個新的事件從新走一遍流程,比如可以用rewrite_tag_filter
外掛為日誌流重新打上 tag,實現通過正則來對日誌進行分流的需求:
<match app>
# 捕獲被打上了 app tag 的日誌
...
</match>
<match cp>
# 捕獲被打上了 cp tag 的日誌
...
</match>
<match **>
# https://docs.fluentd.org/v0.12/articles/out_rewrite_tag_filter
# 被打上 tag 的日誌會被從頭處理,從而被上面的 match 捕獲,實現了日誌的分流
@type rewrite_tag_filter
<rule>
key log # 指定要處理的 field
pattern ^.*\ c\.p\.\ .* # 匹配條件
tag cp # 打上 tag `cp`
</rule>
<rule>
key log
pattern ^.*
tag app # 其餘日誌打上 tag `app`
</rule>
</match>
4.3 filter
filter 和 match 的語法幾乎完全一樣,但是 filter 可以串聯成 pipeline,對資料進行序列處理,最終再交給 match 輸出。
# http://this.host:9880/myapp.access?json={"event":"data"}
<source>
@type http
port 9880
</source>
<filter myapp.access>
@type record_transformer
<record>
host_param "#{Socket.gethostname}"
</record>
</filter>
<match myapp.access>
@type file
path /var/log/fluent/access
</match>
這個例子裡,filter 獲取資料後,呼叫原生的 @type record_transformer
外掛,在事件的 record 裡插入了新的欄位 host_param,然後再交給 match 輸出。
4.4 system
fluentd 的相關設定,可以在啟動時設定,也可以在配置檔案裡設定,包含:
- log_level
- suppress_repeated_stacktrace
- emit_error_log_interval
- suppress_config_dump
- without_source
五、外掛介紹Plugins
Fluentd 有一個非常活躍社群,提供了大量的外掛,你可以在這裡看到大多數常見外掛的列表!
Fluentd 支援 7 種類型的外掛:
- Input:事件流入口;
- Parser:修改 Input 外掛中事件格式,用於 Source;
- Filter: 修改事件流,用於 Filter;
- Output:輸出外掛,用於 Match;
- Formatter:修改 Output 外掛中事件流的格式,用於 Match;
- Buffer:在 Output 外掛中指定 buffer,用於 Match;
- Storage:將外掛狀態存入記憶體或資料庫,可用於 Source、Filter 和 Match,需要外掛支援 storage 命令;
六、外掛引數Parameters
不同的外掛都可以設定不同的引數,拿最簡單的 forward 舉個例子:
<source>
@type http
port 9880
</source>
其中 @type
、port
都是引數,一個指明瞭外掛的名字,另一個指明瞭監聽的埠。
fluentd 裡有兩種型別的引數:
- 預設引數:以
@
開頭的都是預設引數; - 外掛引數:其餘的引數都是外掛引數,為外掛做配置,可以在外掛文件裡查閱。
6.1 預設引數 Common plugin parameter
fluentd 裡只有四個預設引數:
@type
:用於指定外掛型別;@id
:指定外掛 id,在輸出監控資訊的時候有用;@label
:指定分組標籤,可以對日誌流做批處理;@log_level
:為每一組命令設定日誌級別。
6.1.1 label
label 用於將任務進行分組,方便複雜任務的管理。
你可以在 source 裡指定 @label @<LABEL_NAME>
, 這個 source 所觸發的事件就會被髮送給指定的 label 所包含的任務, 而不會被後續的其他任務獲取到。
需要注意的是,label 一旦被聲明瞭,就必須在後面被用到,否則會報錯。
看個例子:
<source>
@type forward
</source>
<source>
# 這個任務指定了 label 為 @SYSTEM
# 會被髮送給 <label @SYSTEM>
# 而不會被髮送給下面緊跟的 filter 和 match
@type tail
@label @SYSTEM
</source>
<filter access.**>
@type record_transformer
<record>
# ...
</record>
</filter>
<match **>
@type elasticsearch
# ...
</match>
<label @SYSTEM>
# 將會接收到上面 @type tail 的 source event
<filter var.log.middleware.**>
@type grep
# ...
</filter>
<match **>
@type s3
# ...
</match>
</label>
6.1.2 error
用來接收外掛通過呼叫 emit_error_event
API 丟擲的異常,使用方法和 label 一樣,通過設定 <label @ERROR>
就可以接收到相關的異常。
6.1.3 log_level
目前支援的日誌級別引數值有:
fatal
error
warn
info
debug
trace
從上往下依次遞減,當你指定了一個級別後,會捕獲大於等於該級別的所有日誌。
比如如果你指定 @log_level info
,就會獲取到 info, warn, error, fatal
級別的日誌。
6.2 其他外掛引數
除了預設引數外,各個外掛還可以定製自己的引數,這個就需要查閱你所用外掛的文件頁面了。
拿 tail
舉個例子,我們可以查閱 文件, 可以看到它有 tag, path, exclude_path, ...
等一系列的引數,比如其中 tag
就可以為日誌流打上供 match
使用的 tag
。
七、高可用
內容來源於官方文件:Fluentd High Availability Configuration。
7.1 Message Delivery Semantics
任何訊息傳遞系統,都需要考慮訊息遞交語義(delivery semantics):
- At most once:最多傳遞一次,有可能會丟訊息,但是不會重複;
- At least once:最少傳遞一次,不會丟訊息,但是可能重複;
- Exactly once:確切的只傳遞一次,需要多次確認訊息狀態,會極大的犧牲效能。
一般來說,我們會根據業務場景,在前兩種中選擇一種,第三種因為效能較差,只適合在小型內部系統上玩玩。
7.2 網路拓撲
一個日誌收集系統由兩個角色組成:
- log forwarders:負責日誌採集和轉發;
- log aggregators:負責日誌收集和彙總處理。
fluentd 可以扮演上述兩個角色(或者由 fluent-bit 扮演 forwarders 角色),為了保證高可用, 對 aggregators 做多點備份:
我們需要在 log forwarders 裡配置多個 aggregators:
# Log Forwarding
<match mytag.**>
@type forward
# 主 aggregator
<server>
host 192.168.0.1
port 24224
</server>
# 備用 aggregators
<server>
host 192.168.0.2
port 24224
standby # 宣告為備用
</server>
# 所有的日誌流都會存入磁碟,定期 flush 到 aggregators
# 較長的 flush 可以減少 CPU
<buffer>
flush_interval 60s
</buffer>
</match>
7.3 資料丟失的場景
Forwarder 會把所有資料存放在 buffer 中,假如你在 match 中配置了 buffer_type file
,則會將資料都存放在磁碟中,然後按照 flush_interval
定期將資料傳送到 aggregator。
但是,如果 forwarder 程序在將資料寫入 buffer 前死掉了,或者存放 buffer 的磁碟壞掉了,就會導致資料丟失。
7.4 監控
7.4.1 外掛監控
fluentd 內建了一個 HTTP 介面,可以用來獲取外掛資訊,只需要在配置檔案里加上:
<source>
@type monitor_agent
bind 0.0.0.0
port 24220
</source>
然後訪問:http://localhost:24220/api/plugins.json
就可以拿到外掛的資訊。
八、效能調優
一般來說,fluentd 單節點的吞吐量大概是 10w/sec 左右。
要想提高效能的話,可以在輸出端(match)指定 num_threads
來提高併發,在輸入端安裝 fluent-plugin-multiprocess
外掛來提高 CPU 的利用率(Ruby 也有 GIL 問題)。
8.1 負載均衡
fluentd 的 multiprocess 外掛非常的雞肋,只是幫你多啟動幾個 fluentd 程序,然後每個程序執行自己的配置檔案。這個你使用程序管理器(如 supervisor 或 systemd)都能做到。
後來又引入了 multi worker 的引數,但是簡單看了下後發現需要外掛做適配,而我並沒有精力去一個個的排查外掛的相容性,所以也就不考慮了。
為了提高 fluentd 的吞吐量,你有幾個辦法:
- 拆分 fluentd 的配置檔案,然後各自啟動新的程序,缺點是各自監聽不同的埠;
- 啟動 multi worker,利用多核提高效能;
- 增加一個負載均衡,將流量分配到後端不同的 fluentd 程序上。
我採用了最後一種方法,使用 haproxy 分發 tcp 到後端的 fluentd,寫了一個 docker-compose 檔案,開箱即用:
https://github.com/Laisky/HelloWorld/tree/master/docker/docker_log/multi-process
不過在做拆分的時候,要考慮到當前的處理流程是否是無狀態的,比如兩個典型的場景:
- 日誌多行合併;
- 日誌解析;
其中多行合併就是有狀態的,不能很好的進行並行。而日誌解析是無狀態的,可以根據需求開任意多的程序來處理。為了分擔壓力,建議將 fluentd 的處理拆為幾個不同的步驟,其中第一個步驟僅進行多行合併等有狀態的請求,然後第二層再並行的進行較重的解析等操作,最大程度的提高 fluentd 叢集的吞吐量。
九、Demo
9.1 Nginx Log
一個監聽 Nginx 日誌的例子:
<source>
@type tail
@id nginx-access
@label @nginx
path /var/log/nginx/access.log
pos_file /var/lib/fluentd/nginx-access.log.posg
tag nginx.access
format /^(?<remote>[^ ]*) (?<host>[^ ]*) \[(?<time>[^\]]*)\] (?<code>[^ ]*) "(?<method>\S+)(?: +(?<path>[^\"]*) +\S*)?" (?<size>[^ ]*)(?: "(?<referer>[^\"]*)" "(?<agent>[^\"]*)")?$/
time_format %d/%b/%Y:%H:%M:%S %z
</source>
<source>
@type tail
@id nginx-error
@label @nginx
path /var/log/nginx/error.log
pos_file /var/lib/fluentd/nginx-error.log.posg
tag nginx.error
format /^(?<time>\d{4}/\d{2}/\d{2} \d{2}:\d{2}:\d{2}) \[(?<log_level>\w+)\] (?<pid>\d+).(?<tid>\d+): (?<message>.*)$/
</source>
<label @nginx>
<match nginx.access>
@type mongo
database nginx
collection access
host 10.47.12.119
port 27016
time_key time
flush_interval 10s
</match>
<match nginx.error>
@type mongo
database nginx
collection error
host 10.47.12.119
port 27016
time_key time
flush_interval 10s
</match>
</label>
為了匹配,你也需要修改 Nginx 的 log_format
為:
log_format main '$remote_addr $host [$time_local] $status "$request" $body_bytes_sent "$http_referer" "$http_user_agent"';
9.2 Docker Log
如果你在啟動 docker 時配置了 --log_driver=fluentd
的話,就可以用 fluentd 來接受 docker 的日誌。
但是 docker 預設會按照換行符將日誌拆成一條條的 json,所以你需要合併多行日誌,並提取日誌資訊。 下面是一個例子,拆分成兩層,先做合併,再做解析:
# 這一層只做合併,做完後就轉發給下一層
<filter geely.sit>
@type concat
timeout_label @NORMAL # concat 需要處理好 timeout flush,否則會丟資料
flush_interval 5s
key log
stream_identity_key container_id
multiline_start_regexp /^\d{4}-\d{2}-\d{2} +\d{2}:\d{2}:\d{2}.\d{3} +\|/
</filter>
<match **>
@type relabel
@label @NORMAL
</match>
<label @NORMAL>
<match **.sit>
@type copy
<store>
@type forward
send_timeout 30s
recover_wait 10s
hard_timeout 30s
<server>
host lb
port 24225
</server>
</store>
</match>
</label>
第二層做解析,因為上一層拼合的日誌包含 \n
,所以要用 multiline 來做解析:
<filter geely.sit>
@type parser
key_name log
reserve_data true
<parse>
@type multiline
format_firstline /^\d{4}-\d{2}-\d{2} +\d{2}:\d{2}:\d{2}.\d{3} +\|/
format1 /^(?<time>.{23}) {0,}\| {0,}(?<project>[^ ]+) {0,}\| {0,}(?<level>[^ ]+) {0,}\| {0,}(?<thread>[^\|]+) {0,}\| {0,}(?<class>[^\:]+)\:(?<line>\d+) {0,}- {0,}(?<message>.+)/
keep_time_key true
</parse>
</filter>
9.3 Docker 化
一個例子,執行的時候需要把 fluent.conf 掛載到 /fluentd/etc/fluent.conf
,才能執行:
FROM fluent/fluentd:v1.1.3
RUN apk add --update --virtual .build-deps \
sudo build-base ruby-dev
RUN sudo gem install fluent-plugin-elasticsearch -v 2.8.6 \
&& sudo gem install fluent-plugin-concat -v 2.1.0 \
&& sudo gem install fluent-plugin-rewrite-tag-filter -v 2.0.2 \
&& sudo gem install fluent-plugin-kafka -v 0.6.3 \
&& sudo gem install fluent-plugin-cadvisor -v 0.3.1 \
&& sudo gem install fluent-plugin-flowcounter -v 1.3 \
&& sudo gem install fluent-plugin-ignore-filter -v 2.0.0 \
&& sudo gem sources --clear-all \
&& apk del .build-deps \
&& rm -rf /var/cache/apk/* \
/home/fluent/.gem/ruby/2.3.0/cache/*.gem
RUN mkdir -p /data/log/td-agent/buffer/
ENV FLUENTD_CONF="fluent.conf"
ENTRYPOINT exec fluentd -c /fluentd/etc/${FLUENTD_CONF} -p /fluentd/plugins $FLUENTD_OPT
參考博文: