1. 程式人生 > 實用技巧 >fluentd 安裝、配置、使用介紹

fluentd 安裝、配置、使用介紹

一、fluentd簡介

fluentd是一個針對日誌的收集、處理、轉發系統。通過豐富的外掛系統, 可以收集來自於各種系統或應用的日誌,轉化為使用者指定的格式後,轉發到使用者所指定的日誌儲存系統之中。

通過 fluentd,你可以非常輕易的實現像追蹤日誌檔案並將其過濾後轉存到 MongoDB 這樣的操作。fluentd 可以徹底的將你從繁瑣的日誌處理中解放出來。

用圖來說明的話,沒有使用fluentd以前,系統是這樣的:

使用fluentd之後,系統是這樣的:

本篇博文將對fluentd的安裝、配置、使用等各方面做一個簡單的介紹。

fluentd 既可以作為日誌收集器安裝到每一個結點上, 也可以作為一個服務端收集各個結點上報的日誌流。 你甚至也可以在各個結點上都部署 fluentd 收集日誌,然後上報到一個 fluentd 叢集做統一處理, 然後再轉發到最終的日誌儲存伺服器。

所以在一個完整的日誌收集、處理系統裡,你可以構建一個這樣的日誌處理流:

Apps (with fluentd/fluent-bit) -> broker (kafka) -> fluentd cluster -> elasticsearch -> kibana

其中提到的 fluent-bit 是一個極簡版的 fluentd,專門用作日誌的收集和轉發, 可以在應用結點上取代 fluentd 收集日誌,滿足極端的資源要求。

1.1 與 logstash 的對比

通過上述描述,你也許會覺得和 ELK 中的 Logstash 高度相似。事實上也確實如此,你完全可以用 fluentd 來替換掉 ELK 中的 Logstash。

有兩篇文章對這兩個工具做了很好的對比:

概括一下的話,有以下區別:

  • fluentd 比 logstash 更省資源;
  • 更輕量級的 fluent-bid 對應 filebeat,作為部署在結點上的日誌收集器;
  • fluentd 有更多強大、開放的外掛數量和社群;

二、fluentd安裝

2017 年 12 月的時候,fluentd 釋出了 v1.0 版本,也就是 td-agent v3 版。

從 gem 安裝和從 rpm、yum 安裝的名字不一樣,連配置檔案的路徑都不一樣,需要記住的是:

  • 從 gem 安裝的,配置檔案和執行程式都叫做 fluent;
  • 從 rpm 安裝的,配置檔案和執行程式都叫做 td-agent3;

td-agent 和 fluentd 是同一個軟體,區別在於 td-agent 更注重於穩定性,在更新上會稍晚於 fluentd,而且依賴的一些庫也會有不同(如 jemalloc),更適用於用於生產環境。

2.1 安裝fluentd

詳細可參考官方文件

以 CentOS 為例:

# 安裝
$ curl -L https://toolbelt.treasuredata.com/sh/install-redhat-td-agent3.sh | sh

# 通過 systemd 啟動
$ systemctl start td-agent.service
$ systemctl status td-agent.service

# 或者也可以手動啟動
$ /etc/init.d/td-agent start
$ /etc/init.d/td-agent stop
$ /etc/init.d/td-agent restart
$ /etc/init.d/td-agent status

2.2 安裝外掛

# 從 rpm 安裝的話,
# 比如要使用下例的 mongo,需要安裝
$ td-agent-gem install fluent-plugin-mongo
$ td-agent-gem <PLUGIN_NAME>

# 從 gem 安裝的話
$ gem install <PLUGIN_NAME>

三、配置檔案

3.1 路徑

  • 如果是通過 gem 安裝的,那麼可以通過下列命令生成和編輯配置檔案:
$ fluentd --setup /etc/fluent
$ vi /etc/fluent/fluent.conf
  • 如果是通過 RPM, Deb 或 DMG 安裝的,那麼配置檔案在:
$ vi /etc/td-agent/td-agent.conf

3.2 常用

你可以在配置檔案裡使用 @include 來切分你的配置檔案,include 支援多種寫法:

# 絕對路徑
include /path/to/config.conf
# 相對路徑
@include conf.d/*.conf
# 甚至 URL
@include http://example.com/fluent.conf

3.3 資料格式

在配置檔案裡你需要為很多引數賦值,這些值必須使用 fluentd 支援的資料格式,有下列這些:

  • string:字串,最常見的格式,詳細支援語法見文件;
  • integer:整數;
  • float:浮點數;
  • size:大小,僅支援整數:
    • <INTEGER>k<INTERGER>K
    • <INTEGER>m<INTERGER>M
    • <INTEGER>g<INTERGER>G
    • <INTEGER>t<INTERGER>T
  • time:時間,也支援整數:
    • <INTEGER>s<INTERGER>S
    • <INTEGER>m<INTERGER>M
    • <INTEGER>h<INTERGER>H
    • <INTEGER>d<INTERGER>D
  • array:按照 JSON array 解析;
  • hash:按照 JSON object 解析;

四、命令

配置檔案的核心是各種命令塊(directives),每一種命令都是為了完成某種處理,命令與命令之間還可以組成串聯關係,以 pipline 的形式流式的處理和分發日誌。

命令的主要組成部分有:

  • source
  • filter
  • match
  • label
  • error

最常見的方式就是 source 收集日誌,然後由串聯的 filter 做流式的處理,最後交給 match 進行分發。match 是日誌流程的終點,一旦匹配了某一個 match,就不會再繼續往下匹配了。

同時你還可以用 label 將任務分組,用 error 處理異常,用 system 修改執行引數。

不同的命令中,都可以通過 @type 指定想要使用的外掛名字,而且還可以傳入各式各樣的外掛引數, 由豐富的外掛提供強大的功能,下面是詳細一些的說明。

4.1 source

source 是 fluentd 的一切資料的來源,每一個 source 內都包含一個輸入模組,比如原生整合的包含 httpforward 兩個模組,分別用來接收 HTTP 請求和 TCP 請求:

# Receive events from 24224/tcp
# This is used by log forwarding and the fluent-cat command
<source>
  @type forward
  port 24224
</source>

# http://this.host:9880/myapp.access?json={"event":"data"}
<source>
  @type http
  port 9880
</source>

當然,除了這兩個外,fluentd 還有大量的支援各種協議或方式的 source 外掛,比如最常用的 tail 就可以幫你追蹤檔案。

每一個具體的外掛都包含其特有的引數,比如上例中 port 就是一個引數,當你要使用一個 source 外掛的時候,注意看看有哪些引數是需要配置的,然後將其寫到 source directive 內。

source dirctive 在獲取到輸入後,會向 fluent 的路由丟擲一個事件,這個事件包含三個要素:

  • tag
  • time
  • record
    那上例程式碼中的第二個 source 舉例,當我們發起一個 http://this.host:9880/myapp.access?json={"event":"data"}的請求時,這個 source 會丟擲:
# generated by http://this.host:9880/myapp.access?json={"event":"data"}
tag: myapp.access
time: (current time)
record: {"event":"data"}

4.2 match

match 用來指定動作,通過 tag 匹配 source,然後執行指定的命令來分發日誌,最常見的用法就是將 source 收集的日誌轉存到資料庫。

# http://this.host:9880/myapp.access?json={"event":"data"}
<source>
  @type http
  port 9880
</source>

# 將標記為 myapp.access 的日誌轉存到檔案
<match myapp.access>
  @type file
  path /var/log/fluent/access
</match>

上例中的 myapp.access 就是 tag,tag 有好幾種匹配模式:

  • *:匹配任意一個 tag;
  • **:匹配任意數量個 tag;
  • a b:匹配 a 或 b;
  • {X,Y,Z}:匹配 X, Y, Z 中的一個;

比如可以寫成這樣:

<match a.*>
<match **>
<match a.{b,c}>
<match a.* b.*>

match 是從上往下依次匹配的,一旦一個日誌流被匹配上,就不會再繼續匹配剩下的 match 了。 所以如果有 <match **> 這樣的全匹配,一定要放到配置檔案的最後。

用法和 source 幾乎一模一樣,不過 source 是丟擲事件,match 是接收並處理事件。

而且 match 不僅僅用來處理輸出,還可以對日誌事件進行一些處理後重新丟擲,當成一個新的事件從新走一遍流程,比如可以用rewrite_tag_filter 外掛為日誌流重新打上 tag,實現通過正則來對日誌進行分流的需求:

<match app>
  # 捕獲被打上了 app tag 的日誌
  ...
</match>

<match cp>
  # 捕獲被打上了 cp tag 的日誌
  ...
</match>

<match **>
  # https://docs.fluentd.org/v0.12/articles/out_rewrite_tag_filter
  # 被打上 tag 的日誌會被從頭處理,從而被上面的 match 捕獲,實現了日誌的分流
  @type rewrite_tag_filter
  <rule>
    key log  # 指定要處理的 field
    pattern ^.*\ c\.p\.\ .*  # 匹配條件
    tag cp  # 打上 tag `cp`
  </rule>
  <rule>
    key log
    pattern ^.*
    tag app  # 其餘日誌打上 tag `app`
  </rule>
</match>

4.3 filter

filter 和 match 的語法幾乎完全一樣,但是 filter 可以串聯成 pipeline,對資料進行序列處理,最終再交給 match 輸出。

# http://this.host:9880/myapp.access?json={"event":"data"}
<source>
  @type http
  port 9880
</source>

<filter myapp.access>
  @type record_transformer
  <record>
    host_param "#{Socket.gethostname}"
  </record>
</filter>

<match myapp.access>
  @type file
  path /var/log/fluent/access
</match>

這個例子裡,filter 獲取資料後,呼叫原生的 @type record_transformer 外掛,在事件的 record 裡插入了新的欄位 host_param,然後再交給 match 輸出。

4.4 system

fluentd 的相關設定,可以在啟動時設定,也可以在配置檔案裡設定,包含:

  • log_level
  • suppress_repeated_stacktrace
  • emit_error_log_interval
  • suppress_config_dump
  • without_source

五、外掛介紹Plugins

Fluentd 有一個非常活躍社群,提供了大量的外掛,你可以在這裡看到大多數常見外掛的列表!

Fluentd 支援 7 種類型的外掛:

  • Input:事件流入口;
  • Parser:修改 Input 外掛中事件格式,用於 Source;
  • Filter: 修改事件流,用於 Filter;
  • Output:輸出外掛,用於 Match;
  • Formatter:修改 Output 外掛中事件流的格式,用於 Match;
  • Buffer:在 Output 外掛中指定 buffer,用於 Match;
  • Storage:將外掛狀態存入記憶體或資料庫,可用於 Source、Filter 和 Match,需要外掛支援 storage 命令;

六、外掛引數Parameters

不同的外掛都可以設定不同的引數,拿最簡單的 forward 舉個例子:

<source>
  @type http
  port 9880
</source>

其中 @typeport 都是引數,一個指明瞭外掛的名字,另一個指明瞭監聽的埠。

fluentd 裡有兩種型別的引數:

  • 預設引數:以 @ 開頭的都是預設引數;
  • 外掛引數:其餘的引數都是外掛引數,為外掛做配置,可以在外掛文件裡查閱。

6.1 預設引數 Common plugin parameter

fluentd 裡只有四個預設引數:

  • @type:用於指定外掛型別;
  • @id:指定外掛 id,在輸出監控資訊的時候有用;
  • @label:指定分組標籤,可以對日誌流做批處理;
  • @log_level:為每一組命令設定日誌級別。
6.1.1 label

label 用於將任務進行分組,方便複雜任務的管理。

你可以在 source 裡指定 @label @<LABEL_NAME>, 這個 source 所觸發的事件就會被髮送給指定的 label 所包含的任務, 而不會被後續的其他任務獲取到。

需要注意的是,label 一旦被聲明瞭,就必須在後面被用到,否則會報錯。

看個例子:

<source>
  @type forward
</source>

<source>
  # 這個任務指定了 label 為 @SYSTEM
  # 會被髮送給 <label @SYSTEM>
  # 而不會被髮送給下面緊跟的 filter 和 match
  @type tail
  @label @SYSTEM
</source>

<filter access.**>
  @type record_transformer
  <record>
    # ...
  </record>
</filter>
<match **>
  @type elasticsearch
  # ...
</match>

<label @SYSTEM>
  # 將會接收到上面 @type tail 的 source event
  <filter var.log.middleware.**>
    @type grep
    # ...
  </filter>
  <match **>
    @type s3
    # ...
  </match>
</label>
6.1.2 error

用來接收外掛通過呼叫 emit_error_event API 丟擲的異常,使用方法和 label 一樣,通過設定 <label @ERROR> 就可以接收到相關的異常。

6.1.3 log_level

官方文件

目前支援的日誌級別引數值有:

  • fatal
  • error
  • warn
  • info
  • debug
  • trace

從上往下依次遞減,當你指定了一個級別後,會捕獲大於等於該級別的所有日誌。

比如如果你指定 @log_level info,就會獲取到 info, warn, error, fatal 級別的日誌。

6.2 其他外掛引數

除了預設引數外,各個外掛還可以定製自己的引數,這個就需要查閱你所用外掛的文件頁面了。

tail 舉個例子,我們可以查閱 文件, 可以看到它有 tag, path, exclude_path, ... 等一系列的引數,比如其中 tag 就可以為日誌流打上供 match 使用的 tag

七、高可用

內容來源於官方文件:Fluentd High Availability Configuration

7.1 Message Delivery Semantics

任何訊息傳遞系統,都需要考慮訊息遞交語義(delivery semantics):

  • At most once:最多傳遞一次,有可能會丟訊息,但是不會重複;
  • At least once:最少傳遞一次,不會丟訊息,但是可能重複;
  • Exactly once:確切的只傳遞一次,需要多次確認訊息狀態,會極大的犧牲效能。

一般來說,我們會根據業務場景,在前兩種中選擇一種,第三種因為效能較差,只適合在小型內部系統上玩玩。

7.2 網路拓撲

一個日誌收集系統由兩個角色組成:

  • log forwarders:負責日誌採集和轉發;
  • log aggregators:負責日誌收集和彙總處理。

fluentd 可以扮演上述兩個角色(或者由 fluent-bit 扮演 forwarders 角色),為了保證高可用, 對 aggregators 做多點備份:

我們需要在 log forwarders 裡配置多個 aggregators:

# Log Forwarding
<match mytag.**>
  @type forward

  # 主 aggregator
  <server>
    host 192.168.0.1
    port 24224
  </server>
  # 備用 aggregators
  <server>
    host 192.168.0.2
    port 24224
    standby  # 宣告為備用
  </server>

  # 所有的日誌流都會存入磁碟,定期 flush 到 aggregators
  # 較長的 flush 可以減少 CPU
  <buffer>
    flush_interval 60s
  </buffer>
</match>

7.3 資料丟失的場景

Forwarder 會把所有資料存放在 buffer 中,假如你在 match 中配置了 buffer_type file,則會將資料都存放在磁碟中,然後按照 flush_interval 定期將資料傳送到 aggregator。

但是,如果 forwarder 程序在將資料寫入 buffer 前死掉了,或者存放 buffer 的磁碟壞掉了,就會導致資料丟失。

7.4 監控

7.4.1 外掛監控

fluentd 內建了一個 HTTP 介面,可以用來獲取外掛資訊,只需要在配置檔案里加上:

<source>
  @type monitor_agent
  bind 0.0.0.0
  port 24220
</source>

然後訪問:http://localhost:24220/api/plugins.json 就可以拿到外掛的資訊。

八、效能調優

一般來說,fluentd 單節點的吞吐量大概是 10w/sec 左右。

要想提高效能的話,可以在輸出端(match)指定 num_threads 來提高併發,在輸入端安裝 fluent-plugin-multiprocess 外掛來提高 CPU 的利用率(Ruby 也有 GIL 問題)。

8.1 負載均衡

fluentd 的 multiprocess 外掛非常的雞肋,只是幫你多啟動幾個 fluentd 程序,然後每個程序執行自己的配置檔案。這個你使用程序管理器(如 supervisor 或 systemd)都能做到。

後來又引入了 multi worker 的引數,但是簡單看了下後發現需要外掛做適配,而我並沒有精力去一個個的排查外掛的相容性,所以也就不考慮了。

為了提高 fluentd 的吞吐量,你有幾個辦法:

  • 拆分 fluentd 的配置檔案,然後各自啟動新的程序,缺點是各自監聽不同的埠;
  • 啟動 multi worker,利用多核提高效能;
  • 增加一個負載均衡,將流量分配到後端不同的 fluentd 程序上。

我採用了最後一種方法,使用 haproxy 分發 tcp 到後端的 fluentd,寫了一個 docker-compose 檔案,開箱即用:

https://github.com/Laisky/HelloWorld/tree/master/docker/docker_log/multi-process

不過在做拆分的時候,要考慮到當前的處理流程是否是無狀態的,比如兩個典型的場景:

  • 日誌多行合併;
  • 日誌解析;

其中多行合併就是有狀態的,不能很好的進行並行。而日誌解析是無狀態的,可以根據需求開任意多的程序來處理。為了分擔壓力,建議將 fluentd 的處理拆為幾個不同的步驟,其中第一個步驟僅進行多行合併等有狀態的請求,然後第二層再並行的進行較重的解析等操作,最大程度的提高 fluentd 叢集的吞吐量。

九、Demo

9.1 Nginx Log

一個監聽 Nginx 日誌的例子:

<source>
  @type tail
  @id nginx-access
  @label @nginx
  path /var/log/nginx/access.log
  pos_file /var/lib/fluentd/nginx-access.log.posg
  tag nginx.access
  format /^(?<remote>[^ ]*) (?<host>[^ ]*) \[(?<time>[^\]]*)\] (?<code>[^ ]*) "(?<method>\S+)(?: +(?<path>[^\"]*) +\S*)?" (?<size>[^ ]*)(?: "(?<referer>[^\"]*)" "(?<agent>[^\"]*)")?$/
  time_format %d/%b/%Y:%H:%M:%S %z
</source>

<source>
  @type tail
  @id nginx-error
  @label @nginx
  path /var/log/nginx/error.log
  pos_file /var/lib/fluentd/nginx-error.log.posg
  tag nginx.error

  format /^(?<time>\d{4}/\d{2}/\d{2} \d{2}:\d{2}:\d{2}) \[(?<log_level>\w+)\] (?<pid>\d+).(?<tid>\d+): (?<message>.*)$/
</source>

<label @nginx>
  <match nginx.access>
    @type mongo
    database nginx
    collection access
    host 10.47.12.119
    port 27016

    time_key time
    flush_interval 10s
  </match>
  <match nginx.error>
    @type mongo
    database nginx
    collection error
    host 10.47.12.119
    port 27016

    time_key time
    flush_interval 10s
  </match>
</label>

為了匹配,你也需要修改 Nginx 的 log_format 為:

log_format main '$remote_addr $host [$time_local] $status "$request" $body_bytes_sent "$http_referer" "$http_user_agent"';

9.2 Docker Log

如果你在啟動 docker 時配置了 --log_driver=fluentd 的話,就可以用 fluentd 來接受 docker 的日誌。

但是 docker 預設會按照換行符將日誌拆成一條條的 json,所以你需要合併多行日誌,並提取日誌資訊。 下面是一個例子,拆分成兩層,先做合併,再做解析:

# 這一層只做合併,做完後就轉發給下一層
<filter geely.sit>
  @type concat
  timeout_label @NORMAL  # concat 需要處理好 timeout flush,否則會丟資料
  flush_interval 5s
  key log
  stream_identity_key container_id
  multiline_start_regexp /^\d{4}-\d{2}-\d{2} +\d{2}:\d{2}:\d{2}.\d{3} +\|/
</filter>

<match **>
  @type relabel
  @label @NORMAL
</match>

<label @NORMAL>
  <match **.sit>
      @type copy
      <store>
          @type forward
          send_timeout 30s
          recover_wait 10s
          hard_timeout 30s
          <server>
              host lb
              port 24225
          </server>
      </store>
  </match>
</label>

第二層做解析,因為上一層拼合的日誌包含 \n,所以要用 multiline 來做解析:

<filter geely.sit>
  @type parser
  key_name log
  reserve_data true
  <parse>
    @type multiline
    format_firstline /^\d{4}-\d{2}-\d{2} +\d{2}:\d{2}:\d{2}.\d{3} +\|/
    format1 /^(?<time>.{23}) {0,}\| {0,}(?<project>[^ ]+) {0,}\| {0,}(?<level>[^ ]+) {0,}\| {0,}(?<thread>[^\|]+) {0,}\| {0,}(?<class>[^\:]+)\:(?<line>\d+) {0,}- {0,}(?<message>.+)/
    keep_time_key true
  </parse>
</filter>

9.3 Docker 化

一個例子,執行的時候需要把 fluent.conf 掛載到 /fluentd/etc/fluent.conf,才能執行:

FROM fluent/fluentd:v1.1.3

RUN apk add --update --virtual .build-deps \
    sudo build-base ruby-dev

RUN sudo gem install fluent-plugin-elasticsearch -v 2.8.6 \
    && sudo gem install fluent-plugin-concat -v 2.1.0 \
    && sudo gem install fluent-plugin-rewrite-tag-filter -v 2.0.2 \
    && sudo gem install fluent-plugin-kafka -v 0.6.3 \
    && sudo gem install fluent-plugin-cadvisor -v 0.3.1 \
    && sudo gem install fluent-plugin-flowcounter -v 1.3 \
    && sudo gem install fluent-plugin-ignore-filter -v 2.0.0 \
    && sudo gem sources --clear-all \
    && apk del .build-deps \
    && rm -rf /var/cache/apk/* \
        /home/fluent/.gem/ruby/2.3.0/cache/*.gem

RUN mkdir -p /data/log/td-agent/buffer/

ENV FLUENTD_CONF="fluent.conf"

ENTRYPOINT exec fluentd -c /fluentd/etc/${FLUENTD_CONF} -p /fluentd/plugins $FLUENTD_OPT

參考博文: