1. 程式人生 > 實用技巧 >最常見的日誌收集架構(ELK Stack)

最常見的日誌收集架構(ELK Stack)

日誌存在於不同的機器不同的目錄,所以首先機器上面要有收集日誌的 Agent,這些 Agent 被生動的叫做:Shippers(直譯:發貨商),好理解,將日誌像貨物一樣傳送出去。

我第一次聽到 ELK 的時候,以為它就是 Elasticsearch 的縮寫,誰知道它代表了三個元件:

  • E - Elasticsearch(簡稱:ES)
  • L - Logstash
  • K - Kibana

我理解也不是很深刻,Logstash 主要用來收集各個 Shipper 發過來的日誌,並做一個過濾,然後發給 Elasticsearch 儲存,Elasticsearch 儲存日誌,建索引,然後提供介面,Kibana 作為前端調介面提供可配置的視覺化。

這就是從機器上的日誌,到視覺化一個過程,蠻清晰的。

日誌資料可以這樣:

Filebeat(Shipper) -> Elasticsearch

索引(動詞)之前如果要做過濾拆分欄位就加個 Logstash:

Filebeat(Shipper) -> Logstash -> Elasticsearch

如果 Agent 量比較大,中間再加一個訊息佇列:

Filebeat(Shipper) -> 訊息佇列 -> Logstash -> Elasticsearch

這一套搭建下來很容易,遇到的問題我列一下,希望對你也有幫助:

問:如何保證 ES 中的資料有序?

答:Logstash 的 filter 流程中的 date 外掛可以解析業務時間並替換@timestamp

欄位,ES 預設是按照@timestamp欄位排序的。

filter {
  if [beat][name] == "xxx-backend" {
    grok {
      match => { "message" => "%{GREEDYDATA:time} \[%{DATA:log_level}\] \[%{DATA:golang}\] %{GREEDYDATA:message}" }
      overwrite => [ "message" ]
    }
    date {
      match => ["time", "yyyy/MM/dd HH:mm:ss.SSS"]
    }
  }
}

判斷是不是某個業務,如果是,那我們知道格式,然後使用 grok 外掛,解析並拆分欄位,其中的 time 欄位給 date 外掛解析並替換@timestamp


問:Kafka 中的資料格式是什麼樣的,如何消費?

答:Filebeat 打到 kafka 的日誌資訊是 json 格式的,Logstash 消費的時候可以使用 filter 流程中的 json 外掛來解析。

filter {
  if [type] == 'from-kafka' {
    json {
      source => "message"
    }
  }
}

問:如何區分不同的資料來源?

答:input 塊中,type 欄位標明即可。

input {
  beats {
    port => 5044
    type => "from-beat"
  }
  kafka {
    bootstrap_servers => "kafkahost1:9092,kafkahost2:9092,kafkahost3:9092"
    topics => ["pdd"]
    type => "from-kafka"
  }
}

問:如何標記不同的業務日誌?

答:使用 Shipper 的 name 或者 tag 來標記。

name: xxx-backend
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /data/xxx-logs/*.log
  tags: ["正義之劍"]

問:如何在 ES 中為不同的業務建立不同的 index?

答:上面我們為 Shipper 起了名字,我們可以用名字作為索引的一部分。

output {
  elasticsearch {
    hosts => "http://EShost:9200"
    index => "xxx-%{[beat][name]}-%{+YYYY.MM.dd}"
    user => "xxx"
    password => "yyy"
  }
}

問:如何在 Logstash 層配置多個業務的 filter?

答:和 nginx 的配置類似,有個 conf.d 的資料夾,配置都放在下面,然後開啟自動 reload 即可。

  • 在 logstash.yml 配置檔案中啟用:
config.reload.automatic: true
config.reload.interval: 10s
  • 觀察 pipelines.yml 中配置:
- pipeline.id: main
  path.config: "/etc/logstash/conf.d/*.conf"

只啟用一個管道,配置 watchpath.config目錄 。