1. 程式人生 > 實用技巧 >ELK stack 7.6.x + kafka構建日誌平臺

ELK stack 7.6.x + kafka構建日誌平臺

背景

使用 ELK Stack + kafka 搭建一個日誌系統。

日誌是不可預測的。在生產環境發生故障需要看日誌時,日誌可能突然激增並淹沒您的日誌記錄基礎結構。為了保護 Logstash 和Elasticsearch 免受此類資料突發攻擊,需要部署緩衝機制以充當訊息代理。

Apache Kafka是與ELK Stack一起部署的最常見的代理解決方案。通常,Kafka部署在託運人和索引器之間,用作收集資料的入口點:

graph LR filebeat1 --log --> kafka filebeat2 --log --> kafka filebeat3 --log --> kafka kafka --> logstash logstash --> elasticsearch elasticsearch --> kibana

本文將展示如何使用 ELK Stack 和 Kafka 部署建立彈性資料管道所需的所有元件:

  • Filebeat –收集日誌並將其轉發到 Kafka 主題。
  • Kafka –代理資料流並將其排隊。
  • Logstash –彙總來自 Kafka 主題的資料,對其進行處理並將其傳送到 Elasticsearch。
  • Elasticsearch –索引資料。
  • Kibana –用於分析資料

基礎環境

基於 CentOS 7.6 ,一個 Spring boot 的日誌作為例子。 /data/spring/gateway/nohup.out

ELK Stack 7.6.x

JDK 1.8 請自行安裝

Elasticsearch

目錄統一在 /data/elasticsearch

安裝

wget https://mirrors.huaweicloud.com/elasticsearch/7.6.2/elasticsearch-7.6.2-x86_64.rpm
rpm -ivh  elasticsearch-7.6.2-x86_64.rpm

配置

/etc/elasticsearch/jvm.options

[root@es1 elasticsearch]# egrep -v "^#|^$" /etc/elasticsearch/jvm.options 
-Xms16g
-Xmx16g
8-13:-XX:+UseConcMarkSweepGC
8-13:-XX:CMSInitiatingOccupancyFraction=75
8-13:-XX:+UseCMSInitiatingOccupancyOnly
14-:-XX:+UseG1GC
14-:-XX:G1ReservePercent=25
14-:-XX:InitiatingHeapOccupancyPercent=30
-Djava.io.tmpdir=${ES_TMPDIR}
-XX:+HeapDumpOnOutOfMemoryError
-XX:HeapDumpPath=/data/elasticsearch
-XX:ErrorFile=/data/elasticsearch/log/hs_err_pid%p.log
8:-XX:+PrintGCDetails
8:-XX:+PrintGCDateStamps
8:-XX:+PrintTenuringDistribution
8:-XX:+PrintGCApplicationStoppedTime
8:-Xloggc:/data/elasticsearch/log/gc.log
8:-XX:+UseGCLogFileRotation
8:-XX:NumberOfGCLogFiles=32
8:-XX:GCLogFileSize=64m
9-:-Xlog:gc*,gc+age=trace,safepoint:file=/data/elasticsearch/log/gc.log:utctime,pid,tags:filecount=32,filesize=64m

/etc/elasticsearch/elasticsearch.yml

[root@es1 elasticsearch]# egrep -v "^#|^$" /etc/elasticsearch/elasticsearch.yml
cluster.name: smy
node.name: node-1
path.data: /data/elasticsearch
path.logs: /data/elasticsearch/log
network.host: 192.168.252.174
http.port: 9200
discovery.seed_hosts: ["127.0.0.1", "[::1]"]
cluster.initial_master_nodes: ["node-1"]
http.cors.enabled: true
http.cors.allow-origin: /.*/ 

啟動

systemctl start elasticsearch.service 
systemctl status elasticsearch.service

檢測

瀏覽器訪問http://ip:9200, 可以看到類似如下輸出的內容

{
  "name" : "node-1",
  "cluster_name" : "smy",
  "cluster_uuid" : "wsjadseSQkqQQDkVz1gRgw",
  "version" : {
    "number" : "7.6.2",
    "build_flavor" : "default",
    "build_type" : "rpm",
    "build_hash" : "ef48eb35cf30adf4db14086e8aabd07ef6fb113f",
    "build_date" : "2020-03-26T06:34:37.794943Z",
    "build_snapshot" : false,
    "lucene_version" : "8.4.0",
    "minimum_wire_compatibility_version" : "6.8.0",
    "minimum_index_compatibility_version" : "6.0.0-beta1"
  },
  "tagline" : "You Know, for Search"
}

Logstash

接下來, 是 Elk + Logstash 中的 "L" 。

安裝

wget https://mirrors.cloud.tencent.com/elasticstack/7.x/yum/7.6.2/logstash-7.6.2.rpm
rpm -ivh logstash-7.6.2.rpm

配置

接下來,我們將配置一個 Logstash 管道,該管道從 Kafka 主題提取日誌,處理這些日誌並將其運送到 Elasticsearch 進行索引。
讓我們建立一個新的配置檔案:

vim /etc/logstash/conf.d/gateway.conf

內容如下:

input {
  kafka {
    bootstrap_servers => "localhost:9092"
    topics => "apache"
    }
}
filter {
  grok {
    match => [ "message", 
               "(?<timestamp>%{YEAR}-%{MONTHNUM}-%{MONTHDAY} %{TIME})  %{LOGLEVEL:level} %{NUMBER:pid} --- \[(?<thread>[A-Za-z0-9-]+)\] [A-Za-z0-9.]*\.(?<class>[A-Za-z0-9#_]+)\s*:\s+(?<logmessage>.*)",
               "message",
               "(?<timestamp>%{YEAR}-%{MONTHNUM}-%{MONTHDAY} %{TIME})  %{LOGLEVEL:level} %{NUMBER:pid} --- .+? :\s+(?<logmessage>.*)"
             ]
  }

  #Parsing out timestamps which are in timestamp field thanks to previous grok section
  date {
    match => [ "timestamp" , "yyyy-MM-dd HH:mm:ss.SSS" ]
  }
}
output {
  elasticsearch {
    hosts => ["192.168.252.174:9200"]
  }
}

我們使用 Logstash Kafka 輸入外掛來定義 Kafka 主機和我們希望 Logstash 提取的主題。我們正在對日誌應用一些篩選,並且將資料運送到本地 Elasticsearch 例項。
儲存檔案。

Kibana

接下來, 是 Elk + Logstash 中的 "K" 。

安裝

wget https://mirrors.huaweicloud.com/kibana/7.6.2/kibana-7.6.2-x86_64.rpm
rpm -ivh ibana-7.6.2-x86_64.rpm

配置

[root@wlj174 ~]# egrep -v "^#|^$" /etc/kibana/kibana.yml 
server.port: 5601
server.host: "192.168.252.174"
elasticsearch.hosts: ["http://192.168.252.174:9200"]
kibana.index: ".kibana"

啟動

systemctl start kibana

瀏覽器開啟 http://ip:5601,即可開啟 Kibana 首頁

**Filebeat **

安裝

wget https://mirrors.cloud.tencent.com/elasticstack/7.x/yum/7.6.2/filebeat-7.6.2-x86_64.rpm
rpm -ivh filebeat-7.6.2-x86_64.rpm 

配置

[root@iZ1767b3udZ filebeat]# cat /etc/filebeat/filebeat.yml
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /data/spring/gateway/nohup.out
output.kafka:
  codec.format:
    string: '%{[@timestamp]} %{[message]}'
  hosts: ["192.168.252.174:9092"]
  topic: apache
  partition.round_robin:
    reachable_only: false
  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000

在輸入部分,告訴 Filebeat 要收集哪些日誌 gateway 的訪問日誌。在輸出部分,告訴 Filebeat 將資料轉發到我們的本地Kafka 伺服器和相關主題(將在下一步中安裝)。
請注意 codec.format 指令的使用-這是為了確保正確提取 message和 timestamp 欄位。否則,這些行將以 JSON 傳送到 Kafka。
儲存檔案。

Kafka

最後來安裝我們的 Kafka ,Kafka 使用 zookeeper 來維護配置資訊及同步。我們使用現有的 zookeeper

wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.6.0/kafka_2.12-2.6.0.tgz
tar zxvf kafka_2.12-2.6.0.tgz

只需要修改配置檔案 server.propertieszookeeper.connect=192.168.252.174:2181,修改為你自己的 zookeeper 地址即可。

啟動

nohup bin/kafka-server-start.sh config/server.properties &

你會看到一些輸出:

[2020-09-07 21:38:06,559] INFO Registered signal handlers for TERM, INT, HUP (org.apache.kafka.common.utils.LoggingSignalHandler)
[2020-09-07 21:38:06,564] INFO starting (kafka.server.KafkaServer)
[2020-09-07 21:38:06,565] INFO Connecting to zookeeper on 192.168.252.174:2181 (kafka.server.KafkaServer)
[2020-09-07 21:38:06,583] INFO [ZooKeeperClient Kafka server] Initializing a new session to 192.168.252.174:2181. (kafka.zookeeper.ZooKeeperClient)
[2020-09-07 21:38:06,590] INFO Client environment:zookeeper.version=3.5.8-f439ca583e70862c3068a1f2a7d4d068eec33315, built on 05/04/2020 15:53 GMT (org.apache.zookeeper.ZooKeeper)
...
...

接下來為 gateway 日誌建立一個主題(topic)

bin/kafka-topics.sh --create --zookeeper localhost:2181--replication-factor 1 --partitions 1 --topic apacheCreated topic apache

啟動資料管道

啟動 Filebeat

systemctl start filebeat

啟動 Logstash

systemctl start logstash

需要幾分鐘以後才能在 Elasticsearch 或 Kibana 看到,我也不知道為什麼。。。著急的話,可以先在 Kafka 看

[root@wlj174 bin]# ./kafka-console-consumer.sh  --topic apache --bootstrap-server localhost:9092
2020-09-08T14:46:27.308Z 2020-09-08 14:46:17.310 ERROR 59181 --- [or-http-epoll-4] c.w.s.f.e.s.f.StaticRequestDecorator     : 解密失敗:Decryption error
2020-09-08T14:46:27.308Z java.lang.NullPointerException
2020-09-08T14:46:52.310Z 2020-09-08 14:46:50.415 ERROR 59181 --- [r-http-epoll-13] c.w.s.f.e.s.f.StaticRequestDecorator     : 解密失敗:Decryption error
2020-09-08T14:46:52.310Z java.lang.NullPointerException
...
...

為了確保 Logstash 正在聚合資料並將其運送到Elasticsearch中,請使用:

curl -X GET "lip:9200/_cat/indices?v"
[root@wlj174 bin]# curl -X GET "192.168.252.174:9200/_cat/indices?v"
health status index                      uuid                   pri rep docs.count docs.deleted store.size pri.store.size
green  open   .kibana_task_manager_1     ueE-zBPiR6CixvUDfX_9sQ   1   0          2            2       30kb           30kb
green  open   ilm-history-1-000001       WoJuE9kmQs2aYUAK8GBIHQ   1   0         18            0     25.4kb         25.4kb
green  open   .apm-agent-configuration   k8trtCzLSI2DrLnkqsuSTQ   1   0          0            0       283b           283b
green  open   .kibana_1                  HvyvaQK_RRmZHzQyrjmIDQ   1   0         12            5     47.1kb         47.1kb
yellow open   logstash-2020.09.08-000001 WPl0br1CSO2XztOCFcjjCQ   1   1       3265            0    708.3kb        708.3kb

Kibana 檢視日誌

這裡比較簡單,就是在 kibana 建立索引,關聯到資料就能看日誌了。

由於截圖太多。。。不寫了。。。