elk+kafka 分散式日誌採集系統設計
阿新 • • 發佈:2019-01-11
Filebeat (每個微服務啟動一個)--->Kafka叢集--->Logstash(one)-->Elasticsearch叢集
一、資料流從檔案端到Kafka 叢集端,通過Filebeat
1.下載 Filebeat
#cd /opt/filebeat-6.3.2-linux-x86_64
filebeat.inputs: - type: log enabled: true paths: # - /var/log/*.log - /home/kexin/out.log #監聽的日誌檔案 filebeat.config.modules: path: ${path.config}/modules.d/*.yml reload.enabled: false setup.template.settings: index.number_of_shards: 3 output.kafka: enabled: true hosts: ["10.10.100.215:9092","10.10.100.216:9092","10.10.100.217:9092"] # kafka叢集配置 topic: "caolihua01" #kafka 訂閱的主題
2.啟動Filebeat
# ./filebeat -c filebeat.yml 啟動Filebeat
向監聽的檔案out.log 寫入資料
#echo "別瞎搞3" >> out.log
3. 通過kafka-eagle檢視寫入kafka的訊息
檢視訊息:
二、資料流從Kafka 叢集到Elasticsearch端 ,通過logstah
logstash配置和啟動
#cd /opt/logstash-6.3.2/bin
# ./logstash -f ../config/logstash.conf
input { kafka { bootstrap_servers => "10.10.100.215:9092,10.10.100.216:9092,10.10.100.217:9092" topics => "caolihua01" } } filter { ruby { message => "event.timestamp.time.localtime" } } output { elasticsearch { hosts => ["10.10.100.215:9200","10.10.100.216:9200","10.10.100.217:9200"] index => "kx_es_index" } }
三、寫入資料測試
# echo "別瞎搞啊001" >> out.log
該資料流轉過程:
out.log-->filebeat--->kafka-->logstash--->es ,通過kibana查詢
POST /kx_es_index/_search
{
"sort": [
{
"@timestamp": {
"order": "desc"
}
}
]
}
檢視結果:可以看到寫入ES
成功寫入ES後,
檢視kafka 佇列:
SELECT "partition","offset","msg" FROM "caolihua01" WHERE "partition" IN (0,1,2,3) limit 10
該訊息被消費掉了,所以kafka是查詢不到的。