實時日誌監控系統-全覽
阿新 • • 發佈:2019-01-01
大資料處理,大致可以分為兩大模組:
- 離線資料處理:比如說電商、運營商出現的大批量的日誌,可以由flume、sqoop或者其他路徑,匯入到HDFS中,然後經過資料清洗,使用Hive進行分析和處理,對於優化伺服器資源等有很好的作用;個人覺得,支付寶的年賬單就是離線資料處理的應用之處了。
- 實時資料處理:對於有些業務需要,可能第二天或者更晚的時候進行分析無關緊要,但對於一些高頻的金融交易來說,實時性就太重要了,還有一些如百度搜索的top10,新浪微博的微博熱點等等,如果等到第二天處理,那這些新聞也沒什麼吸引的價值了。
所以,縱觀來說,離線資料處理和實時資料處理撐起了大資料處理的一片天,本文將介紹本人親自負責並予以實施的日誌監控專案,麻雀雖小,五臟俱全。
主要模組
- 日誌收集模組
- 日誌處理模組
主要工具
- flume:用於日誌的收集,堪稱是業內最好的日誌收集工具,支援多種日誌收集的渠道,同時支援諸多的日誌收集存放地,功能強大;官方連結:flume官網
- kafka:訊息緩衝佇列,大資料處理中常用的緩衝佇列,用於資料爆炸的時候,避免拖垮後續的處理邏輯,將訊息先存放到佇列中,延遲一定的時間進行處理。
- log4j:我們在Tomcat伺服器上部署的業務系統,需要指定flume-appender,因此需要使用到log4j。
- SparkStreaming:在第一版本中,由於實時性不是很強,因此使用該工具予以處理,其處理日誌會有一定的延遲,但吞吐量較大。
- MySql:用於讀取配置資料,已經將配置資料全部遷移到zookeeper上。
- Spring boot:構建資料配置服務,方便使用者配置自己的日誌資料,比如郵件發給何人,簡訊發給何人,都可以自由指定。
- zookeeper:資料配置中心,在本專案用途中,主要是用於配置資料的管理,官方連結:zookeeper官網
1:日誌收集模組
在日誌收集模組中,針對我們自身的業務,可以分為兩大部分:
- Nginx日誌和資料庫執行日誌:首先是Nginx,作為業內比較強大的負責均衡工具,其效能比較優良,我們在日常的服務中,也是使用該工具來進行負載均衡的功能實現;插播一句,業內另一比較強大的負載均衡工具是淘寶的章文嵩博士開發的LVS,對於訪問量不是很大的網站,使用Nginx完全可以實現功能;為了能夠準確處理出錯的日誌,我們對日誌格式進行了一定的定義,類似下圖:
- 對於Tomcat型別的服務,選擇使用log4j內建的flume-appender方式來實現,具體配置可以參考官網:https://logging.apache.org/log4j/2.x/manual/appenders.html#FlumeAppender;其中有很詳細的flume-appender配置,在日誌中配置合理,每一條日誌都會按照相應的格式,作為flume收集日誌的來源。
對於收集到的日誌,統一採用kafkaSink的方式,輸送到後續的kafka中,以備後續的處理。
關於日誌的收集,在處理過程中有幾點收穫:
- 對於flume的收集渠道有了更加深入的理解,flume不愧是強大的工具,支援的收集渠道非常多,而且支援的型別也很多,我們在收集nginx日誌的時候,配置的type為exec,即命令執行方式,其會執行該命令,把需要監控的日誌實時進行讀取,配置如下:
a1.sources = r1 a1.channels = c1 a1.sources.r1.type = exec a1.sources.r1.command = tail -F /var/log/secure a1.sources.r1.channels = c1
- 對於tail命令,支援同時讀取多個日誌檔案,會統一把這些日誌輸送到同一個源,輸送到目的地。
- 攔截器的使用:有時候,收集到的日誌並不是完全如我們的意願,這時候,攔截器就派上了用場,我們在plugins.d目錄下,部署了自己的jar包,用於攔截讀取到的日誌,進行第二步驟的處理;而且攔截器支援鏈式,即多個攔截器會依次處理收集到的日誌。
2:日誌處理模組
對於收集到的日誌的處理,我們採用的是Spark-Streaming工具,將其與kafka對接,對於收集到的每一條資料進行處理:
public void startTask() {
//新建sparkConf
SparkConf conf = new SparkConf().setAppName(ConfigUtils.SPARK_APPNAME);
conf.setMaster("local[4]");// 本地多執行緒呼叫
// conf.setMaster(ConfigUtils.SPARK_MASTER);//叢集呼叫
//製作StreamingContext
JavaStreamingContext jsc = new JavaStreamingContext(conf,
Durations.seconds(Long.valueOf(ConfigUtils.SPARK_DURATIONS)));
Map<String, String> kafaParameters = new HashMap<String, String>();
//部署kafka機器的ip及埠號
kafaParameters.put("metadata.broker.list", ConfigUtils.KAFKA_BROKER);
//消費組的groupId
kafaParameters.put("group.id", ConfigUtils.KAFKA_GROUPID);
kafaParameters.put("fetch.message.max.bytes", ConfigUtils.KAFKA_FETCH_MAX);
kafaParameters.put("num.consumer.fetchers", ConfigUtils.KAFKA_FETCH_NUM);
Set<String> topics = new HashSet<String>();
topics.add(ConfigUtils.KAFKA_TOPIC);
try {
//指定直連,消費kafka某個topic內的資料
JavaPairInputDStream<String, String> lines = KafkaUtils.createDirectStream(jsc, String.class, String.class,
StringDecoder.class, StringDecoder.class, kafaParameters, topics);
JavaDStream<String> words = lines.flatMap(new FlatMapFunction<Tuple2<String, String>, String>() {
public Iterator<String> call(Tuple2<String, String> tuple) throws Exception {
// log.info("接收kafka資料:" + tuple._2);
return Arrays.asList(tuple._2.split(SPACE.pattern())).iterator();
}
});
words.foreachRDD(new VoidFunction2<JavaRDD<String>, Time>() {
public void call(JavaRDD<String> word, Time arg1) throws Exception {
// TODO Auto-generated method stub
process(word);
}
});
}catch(Exception e) {
e.printStackTrace();
}
}
這裡,主要是將SparkStreaming與kafka對接起來的實現,需要指定消費組的group id,需要指定消費的topic,指定消費的機器,最重要的一步就是建立接下來需要進行處理的JavaRDD,其實,spark最核心的概念就是rdd的處理,其SparkStreaming,實際上處理的也就是一段時間內產生的RDD而已。
對於上述的程式碼中一些問題予以優化下:
try {
JavaPairInputDStream<String, String> lines = KafkaUtils
.createDirectStream(jsc, String.class, String.class,
StringDecoder.class, StringDecoder.class,
kafaParameters, topics);
lines.foreachRDD(new VoidFunction<JavaPairRDD<String, String>>() {
@Override
public void call(JavaPairRDD<String, String> t)
throws Exception {
t.foreachPartition(new VoidFunction<Iterator<Tuple2<String, String>>>() {
@Override
public void call(Iterator<Tuple2<String, String>> t)
throws Exception {
while (t.hasNext()) {
String res = t.next()._2;
try {
// 這裡,很重要的一點是,到底要不要輸出日誌
if (flag) {
log.info("read kafka message:" + res);
}
process(res);
} catch (Exception e) {
log.info(res + "------處理異常------"
+ getExeptionMessage(e));
}
}
}
});
}
});
} catch (Exception e) {
e.printStackTrace();
}
更新了其中的運算元,爭取能夠提高效率:接下來的處理,則是對收集到的日誌,進行自己的處理,在此處不予贅述。
專案總結:本專案其實難度並不大,重點在於攔截器的設定,kafka叢集的搭建,後續處理的完善,以及如何形成spark與kafka資料的對接等方面。