根據需求對LEK進行改進
阿新 • • 發佈:2019-01-04
-
LEK解釋
ELK是三個開源軟體的縮寫,分別表示:Elasticsearch , Logstash, Kibana。
利用Logstash對日誌檔案中日誌進行收集,收集的資料存入Elasticsearch中,然後通過前端Kibana進入資料彙總、分析和展示。 -
問題
1、由於公司系統日誌量太大,擔心收集所有日誌直接插入ES會承受不了。
2、由於公司系統需要把收集到的日誌根據一個唯一編號進行關聯,才能知道某一個流程是否出現問題,問題出現在哪一個步驟。所以Kibana不能滿足展示需求。
3、需要對系統單量進行監控,及時報警異常情況。 -
解決方案
1、對問題1解決方案:在系統中進行重要日誌埋點,只收集埋點日誌。並且每臺業務系統收集到的日誌不直接插入Elasticsearch中,而是中間加了一個redis緩衝,在用Logstash的server進行推送到Elasticsearch中。
2、對問題2解決方案:定製開發前端頁面展示,從Elasticsearch中根據各種條件查詢資料並展示到頁面中。
3、對問題3解決方案:收集的埋點日誌存入redis緩衝同時在傳送一份到kafka中,利用流計算平臺進行資料統計,滿足異常情況就傳送報警訊息。 -
流程圖
-
流程中相關程式碼
1、利用Logstash過濾器只收集埋點日誌
#利用過濾器只收集埋點日誌 filter { if [message] =~ /.*monitorLogsMsg.*/{ mutate { split => { "message" => "monitorLogsMsg" } add_field => { "newmsg" => "%{[message][1]}" "classmsg" => "%{[message[0]]}" } } json { source => "newmsg" remove_field => [ "message" ] remove_field => [ "newmsg" ] } } }
2、加redis緩衝併發送一份日誌資料到kafka中
output { #中間加了一個redis緩衝 redis { host => "10.xx.xx.xx" port => 6379 password => "password" data_type => "list" key => "keys_word" codec => "json" } #傳送一份到kafka中 kafka{ topic_id => "realTimeCount" bootstrap_servers => "10.xx.xx.xx:9092" batch_size => 5 } }