1. 程式人生 > >根據需求對LEK進行改進

根據需求對LEK進行改進

  1. LEK解釋
    ELK是三個開源軟體的縮寫,分別表示:Elasticsearch , Logstash, Kibana。
    利用Logstash對日誌檔案中日誌進行收集,收集的資料存入Elasticsearch中,然後通過前端Kibana進入資料彙總、分析和展示。

  2. 問題
    1、由於公司系統日誌量太大,擔心收集所有日誌直接插入ES會承受不了。
    2、由於公司系統需要把收集到的日誌根據一個唯一編號進行關聯,才能知道某一個流程是否出現問題,問題出現在哪一個步驟。所以Kibana不能滿足展示需求。
    3、需要對系統單量進行監控,及時報警異常情況。

  3. 解決方案
    1、對問題1解決方案:在系統中進行重要日誌埋點,只收集埋點日誌。並且每臺業務系統收集到的日誌不直接插入Elasticsearch中,而是中間加了一個redis緩衝,在用Logstash的server進行推送到Elasticsearch中。
    2、對問題2解決方案:定製開發前端頁面展示,從Elasticsearch中根據各種條件查詢資料並展示到頁面中。
    3、對問題3解決方案:收集的埋點日誌存入redis緩衝同時在傳送一份到kafka中,利用流計算平臺進行資料統計,滿足異常情況就傳送報警訊息。

  4. 流程圖
    在這裡插入圖片描述

  5. 流程中相關程式碼
    1、利用Logstash過濾器只收集埋點日誌

#利用過濾器只收集埋點日誌
filter {
	if [message] =~ /.*monitorLogsMsg.*/{ 
		mutate {
			split => { "message" => "monitorLogsMsg" }
			add_field => {
				"newmsg" => "%{[message][1]}"
				"classmsg" => "%{[message[0]]}"
			}
		}
		json {
			source => "newmsg"
			remove_field => [ "message" ]
			remove_field => [ "newmsg" ]
		  }
	}
}

2、加redis緩衝併發送一份日誌資料到kafka中

output {
	#中間加了一個redis緩衝
	redis {
		host => "10.xx.xx.xx"
		port => 6379
		password => "password"
		data_type => "list"
		key => "keys_word"
		codec => "json"
	}
	#傳送一份到kafka中
	kafka{
                topic_id => "realTimeCount"
                bootstrap_servers => "10.xx.xx.xx:9092"
                batch_size => 5
        }
}