
ELK + Kafka: Designing a Distributed Log Collection System

Filebeat (one instance per microservice) --> Kafka cluster --> Logstash (single instance) --> Elasticsearch cluster

Part 1. Getting data from log files into the Kafka cluster, via Filebeat

1. Download and configure Filebeat
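
The package comes from Elastic's artifact repository; the URL below follows Elastic's standard download layout for 6.3.2 (adjust the version as needed):

# wget https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-6.3.2-linux-x86_64.tar.gz
# tar -xzf filebeat-6.3.2-linux-x86_64.tar.gz -C /opt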

# cd /opt/filebeat-6.3.2-linux-x86_64

Edit filebeat.yml as follows:

filebeat.inputs:
- type: log
  enabled: true
  paths:
#    - /var/log/*.log
    - /home/kexin/out.log            # the log file to tail
filebeat.config.modules:
  path: ${path.config}/modules.d/*.yml
  reload.enabled: false
setup.template.settings:
  index.number_of_shards: 3
output.kafka:
  enabled: true
  hosts: ["10.10.100.215:9092","10.10.100.216:9092","10.10.100.217:9092"]     # Kafka cluster brokers
  topic: "caolihua01"   # Kafka topic to publish to
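
The brokers will auto-create this topic on first write only if auto.create.topics.enable is on; otherwise create it up front. A sketch using the Kafka CLI (the ZooKeeper address and partition/replica counts here are assumptions, tune them for your cluster):

# kafka-topics.sh --create --zookeeper 10.10.100.215:2181 --partitions 3 --replication-factor 2 --topic caolihua01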

2. Start Filebeat

# ./filebeat -c filebeat.yml    # start Filebeat in the foreground
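
The command above runs in the foreground; to keep Filebeat alive after the shell exits, one common option is nohup (systemd or a process supervisor works just as well):

# nohup ./filebeat -c filebeat.yml >/dev/null 2>&1 &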

Append some data to the watched file out.log:

# echo "別瞎搞3" >> out.log


3. Use kafka-eagle to inspect the messages written to Kafka

Viewing the messages in the kafka-eagle console (screenshot omitted):
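
Without kafka-eagle, the console consumer that ships with Kafka can confirm the topic is receiving data:

# kafka-console-consumer.sh --bootstrap-server 10.10.100.215:9092 --topic caolihua01 --from-beginning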

Part 2. Getting data from the Kafka cluster into Elasticsearch, via Logstash

Configure and start Logstash. The pipeline definition shown below goes in ../config/logstash.conf:

# cd /opt/logstash-6.3.2/bin

# ./logstash -f ../config/logstash.conf

input {
        kafka {
                bootstrap_servers => "10.10.100.215:9092,10.10.100.216:9092,10.10.100.217:9092"
                topics => ["caolihua01"]
        }
}
filter {
        ruby {
                # store the event time, converted to local time, in its own field
                code => "event.set('local_time', event.get('@timestamp').time.localtime.strftime('%Y-%m-%d %H:%M:%S'))"
        }
}
output {
        elasticsearch {
                hosts => ["10.10.100.215:9200","10.10.100.216:9200","10.10.100.217:9200"]
                index => "kx_es_index"
        }
}
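
The kafka input accepts a few options worth setting explicitly; a sketch with commonly used values (the values here are assumptions to tune for your cluster):

input {
        kafka {
                bootstrap_servers => "10.10.100.215:9092,10.10.100.216:9092,10.10.100.217:9092"
                topics => ["caolihua01"]
                group_id => "logstash"          # consumer group; "logstash" is the plugin default
                auto_offset_reset => "latest"   # where to start when no committed offset exists
                consumer_threads => 3           # ideally one per topic partition
                codec => "json"                 # Filebeat publishes JSON records to Kafka
        }
}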

Part 3. Write test data

# echo "別瞎搞啊001" >> out.log

The full data path is:

   out.log --> Filebeat --> Kafka --> Logstash --> ES, queried through Kibana:

POST /kx_es_index/_search
{
  "sort": [
    {
      "@timestamp": {
        "order": "desc"
      }
    }
  ]
}

The result confirms the record was written to ES (Kibana screenshot omitted).
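
The same check works from the shell with curl against any ES node (the host picked here is arbitrary):

# curl -s 'http://10.10.100.215:9200/kx_es_index/_search?size=1&sort=@timestamp:desc&pretty'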

After the data has landed in ES, check the Kafka queue via kafka-eagle's KSQL console:

SELECT "partition","offset","msg" FROM "caolihua01" WHERE "partition" IN (0,1,2,3) LIMIT 10

The message has already been consumed by Logstash, so it no longer shows up as pending here. (Consuming does not delete data from Kafka itself; records remain until the retention period expires, and consumption only advances the consumer group's committed offset.)
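
Consumption can also be confirmed by inspecting the consumer group's committed offsets ("logstash" is the default group_id of the Logstash kafka input):

# kafka-consumer-groups.sh --bootstrap-server 10.10.100.215:9092 --describe --group logstash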