ELK stack 7.6.x + kafka構建日誌平臺
背景
使用 ELK Stack + kafka 搭建一個日誌系統。
日誌是不可預測的。在生產環境發生故障需要看日誌時,日誌可能突然激增並淹沒您的日誌記錄基礎結構。為了保護 Logstash 和Elasticsearch 免受此類資料突發攻擊,需要部署緩衝機制以充當訊息代理。
Apache Kafka是與ELK Stack一起部署的最常見的代理解決方案。通常,Kafka部署在託運人和索引器之間,用作收集資料的入口點:
graph LR filebeat1 --log --> kafka filebeat2 --log --> kafka filebeat3 --log --> kafka kafka --> logstash logstash --> elasticsearch elasticsearch --> kibana本文將展示如何使用 ELK Stack 和 Kafka 部署建立彈性資料管道所需的所有元件:
- Filebeat –收集日誌並將其轉發到 Kafka 主題。
- Kafka –代理資料流並將其排隊。
- Logstash –彙總來自 Kafka 主題的資料,對其進行處理並將其傳送到 Elasticsearch。
- Elasticsearch –索引資料。
- Kibana –用於分析資料
基礎環境
基於 CentOS 7.6 ,一個 Spring boot 的日誌作為例子。 /data/spring/gateway/nohup.out
ELK Stack 7.6.x
JDK 1.8 請自行安裝
Elasticsearch
目錄統一在 /data/elasticsearch
安裝
wget https://mirrors.huaweicloud.com/elasticsearch/7.6.2/elasticsearch-7.6.2-x86_64.rpm rpm -ivh elasticsearch-7.6.2-x86_64.rpm
配置
/etc/elasticsearch/jvm.options
[root@es1 elasticsearch]# egrep -v "^#|^$" /etc/elasticsearch/jvm.options -Xms16g -Xmx16g 8-13:-XX:+UseConcMarkSweepGC 8-13:-XX:CMSInitiatingOccupancyFraction=75 8-13:-XX:+UseCMSInitiatingOccupancyOnly 14-:-XX:+UseG1GC 14-:-XX:G1ReservePercent=25 14-:-XX:InitiatingHeapOccupancyPercent=30 -Djava.io.tmpdir=${ES_TMPDIR} -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/data/elasticsearch -XX:ErrorFile=/data/elasticsearch/log/hs_err_pid%p.log 8:-XX:+PrintGCDetails 8:-XX:+PrintGCDateStamps 8:-XX:+PrintTenuringDistribution 8:-XX:+PrintGCApplicationStoppedTime 8:-Xloggc:/data/elasticsearch/log/gc.log 8:-XX:+UseGCLogFileRotation 8:-XX:NumberOfGCLogFiles=32 8:-XX:GCLogFileSize=64m 9-:-Xlog:gc*,gc+age=trace,safepoint:file=/data/elasticsearch/log/gc.log:utctime,pid,tags:filecount=32,filesize=64m
/etc/elasticsearch/elasticsearch.yml
[root@es1 elasticsearch]# egrep -v "^#|^$" /etc/elasticsearch/elasticsearch.yml
cluster.name: smy
node.name: node-1
path.data: /data/elasticsearch
path.logs: /data/elasticsearch/log
network.host: 192.168.252.174
http.port: 9200
discovery.seed_hosts: ["127.0.0.1", "[::1]"]
cluster.initial_master_nodes: ["node-1"]
http.cors.enabled: true
http.cors.allow-origin: /.*/
啟動
systemctl start elasticsearch.service
systemctl status elasticsearch.service
檢測
瀏覽器訪問http://ip:9200
, 可以看到類似如下輸出的內容
{
"name" : "node-1",
"cluster_name" : "smy",
"cluster_uuid" : "wsjadseSQkqQQDkVz1gRgw",
"version" : {
"number" : "7.6.2",
"build_flavor" : "default",
"build_type" : "rpm",
"build_hash" : "ef48eb35cf30adf4db14086e8aabd07ef6fb113f",
"build_date" : "2020-03-26T06:34:37.794943Z",
"build_snapshot" : false,
"lucene_version" : "8.4.0",
"minimum_wire_compatibility_version" : "6.8.0",
"minimum_index_compatibility_version" : "6.0.0-beta1"
},
"tagline" : "You Know, for Search"
}
Logstash
接下來, 是 Elk + Logstash 中的 "L" 。
安裝
wget https://mirrors.cloud.tencent.com/elasticstack/7.x/yum/7.6.2/logstash-7.6.2.rpm
rpm -ivh logstash-7.6.2.rpm
配置
接下來,我們將配置一個 Logstash 管道,該管道從 Kafka 主題提取日誌,處理這些日誌並將其運送到 Elasticsearch 進行索引。
讓我們建立一個新的配置檔案:
vim /etc/logstash/conf.d/gateway.conf
內容如下:
input {
kafka {
bootstrap_servers => "localhost:9092"
topics => "apache"
}
}
filter {
grok {
match => [ "message",
"(?<timestamp>%{YEAR}-%{MONTHNUM}-%{MONTHDAY} %{TIME}) %{LOGLEVEL:level} %{NUMBER:pid} --- \[(?<thread>[A-Za-z0-9-]+)\] [A-Za-z0-9.]*\.(?<class>[A-Za-z0-9#_]+)\s*:\s+(?<logmessage>.*)",
"message",
"(?<timestamp>%{YEAR}-%{MONTHNUM}-%{MONTHDAY} %{TIME}) %{LOGLEVEL:level} %{NUMBER:pid} --- .+? :\s+(?<logmessage>.*)"
]
}
#Parsing out timestamps which are in timestamp field thanks to previous grok section
date {
match => [ "timestamp" , "yyyy-MM-dd HH:mm:ss.SSS" ]
}
}
output {
elasticsearch {
hosts => ["192.168.252.174:9200"]
}
}
我們使用 Logstash Kafka 輸入外掛來定義 Kafka 主機和我們希望 Logstash 提取的主題。我們正在對日誌應用一些篩選,並且將資料運送到本地 Elasticsearch 例項。
儲存檔案。
Kibana
接下來, 是 Elk + Logstash 中的 "K" 。
安裝
wget https://mirrors.huaweicloud.com/kibana/7.6.2/kibana-7.6.2-x86_64.rpm
rpm -ivh ibana-7.6.2-x86_64.rpm
配置
[root@wlj174 ~]# egrep -v "^#|^$" /etc/kibana/kibana.yml
server.port: 5601
server.host: "192.168.252.174"
elasticsearch.hosts: ["http://192.168.252.174:9200"]
kibana.index: ".kibana"
啟動
systemctl start kibana
瀏覽器開啟 http://ip:5601
,即可開啟 Kibana 首頁
**Filebeat **
安裝
wget https://mirrors.cloud.tencent.com/elasticstack/7.x/yum/7.6.2/filebeat-7.6.2-x86_64.rpm
rpm -ivh filebeat-7.6.2-x86_64.rpm
配置
[root@iZ1767b3udZ filebeat]# cat /etc/filebeat/filebeat.yml
filebeat.inputs:
- type: log
enabled: true
paths:
- /data/spring/gateway/nohup.out
output.kafka:
codec.format:
string: '%{[@timestamp]} %{[message]}'
hosts: ["192.168.252.174:9092"]
topic: apache
partition.round_robin:
reachable_only: false
required_acks: 1
compression: gzip
max_message_bytes: 1000000
在輸入部分,告訴 Filebeat 要收集哪些日誌 gateway 的訪問日誌。在輸出部分,告訴 Filebeat 將資料轉發到我們的本地Kafka 伺服器和相關主題(將在下一步中安裝)。
請注意 codec.format 指令的使用-這是為了確保正確提取 message和 timestamp 欄位。否則,這些行將以 JSON 傳送到 Kafka。
儲存檔案。
Kafka
最後來安裝我們的 Kafka ,Kafka 使用 zookeeper 來維護配置資訊及同步。我們使用現有的 zookeeper
wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.6.0/kafka_2.12-2.6.0.tgz
tar zxvf kafka_2.12-2.6.0.tgz
只需要修改配置檔案 server.properties
的 zookeeper.connect=192.168.252.174:2181
,修改為你自己的 zookeeper 地址即可。
啟動
nohup bin/kafka-server-start.sh config/server.properties &
你會看到一些輸出:
[2020-09-07 21:38:06,559] INFO Registered signal handlers for TERM, INT, HUP (org.apache.kafka.common.utils.LoggingSignalHandler)
[2020-09-07 21:38:06,564] INFO starting (kafka.server.KafkaServer)
[2020-09-07 21:38:06,565] INFO Connecting to zookeeper on 192.168.252.174:2181 (kafka.server.KafkaServer)
[2020-09-07 21:38:06,583] INFO [ZooKeeperClient Kafka server] Initializing a new session to 192.168.252.174:2181. (kafka.zookeeper.ZooKeeperClient)
[2020-09-07 21:38:06,590] INFO Client environment:zookeeper.version=3.5.8-f439ca583e70862c3068a1f2a7d4d068eec33315, built on 05/04/2020 15:53 GMT (org.apache.zookeeper.ZooKeeper)
...
...
接下來為 gateway 日誌建立一個主題(topic)
bin/kafka-topics.sh --create --zookeeper localhost:2181--replication-factor 1 --partitions 1 --topic apacheCreated topic apache
啟動資料管道
啟動 Filebeat
systemctl start filebeat
啟動 Logstash
systemctl start logstash
需要幾分鐘以後才能在 Elasticsearch 或 Kibana 看到,我也不知道為什麼。。。著急的話,可以先在 Kafka 看
[root@wlj174 bin]# ./kafka-console-consumer.sh --topic apache --bootstrap-server localhost:9092
2020-09-08T14:46:27.308Z 2020-09-08 14:46:17.310 ERROR 59181 --- [or-http-epoll-4] c.w.s.f.e.s.f.StaticRequestDecorator : 解密失敗:Decryption error
2020-09-08T14:46:27.308Z java.lang.NullPointerException
2020-09-08T14:46:52.310Z 2020-09-08 14:46:50.415 ERROR 59181 --- [r-http-epoll-13] c.w.s.f.e.s.f.StaticRequestDecorator : 解密失敗:Decryption error
2020-09-08T14:46:52.310Z java.lang.NullPointerException
...
...
為了確保 Logstash 正在聚合資料並將其運送到Elasticsearch中,請使用:
curl -X GET "lip:9200/_cat/indices?v"
[root@wlj174 bin]# curl -X GET "192.168.252.174:9200/_cat/indices?v"
health status index uuid pri rep docs.count docs.deleted store.size pri.store.size
green open .kibana_task_manager_1 ueE-zBPiR6CixvUDfX_9sQ 1 0 2 2 30kb 30kb
green open ilm-history-1-000001 WoJuE9kmQs2aYUAK8GBIHQ 1 0 18 0 25.4kb 25.4kb
green open .apm-agent-configuration k8trtCzLSI2DrLnkqsuSTQ 1 0 0 0 283b 283b
green open .kibana_1 HvyvaQK_RRmZHzQyrjmIDQ 1 0 12 5 47.1kb 47.1kb
yellow open logstash-2020.09.08-000001 WPl0br1CSO2XztOCFcjjCQ 1 1 3265 0 708.3kb 708.3kb
Kibana 檢視日誌
這裡比較簡單,就是在 kibana 建立索引,關聯到資料就能看日誌了。
由於截圖太多。。。不寫了。。。