1. 程式人生 > >基於Flume+Log4j+Kafka的日誌採集架構方案

基於Flume+Log4j+Kafka的日誌採集架構方案

Flume 基本概念

Flume是一個完善、強大的日誌採集工具,關於它的配置,在網上有很多現成的例子和資料,這裡僅做簡單說明不再詳細贅述。
Flume包含Source、Channel、Sink三個最基本的概念:

Source——日誌來源,其中包括:Avro Source、Thrift Source、Exec Source、JMS Source、Spooling Directory Source、Kafka Source、NetCat Source、Sequence Generator Source、Syslog Source、HTTP Source、Stress Source、Legacy Source、Custom Source、Scribe Source以及Twitter 1% firehose Source。

Channel——日誌管道,所有從Source過來的日誌資料都會以佇列的形式存放在裡面,它包括:Memory Channel、JDBC Channel、Kafka Channel、File Channel、Spillable Memory Channel、Pseudo Transaction Channel、Custom Channel。

Sink——日誌出口,日誌將通過Sink向外發射,它包括:HDFS Sink、Hive Sink、Logger Sink、Avro Sink、Thrift Sink、IRC Sink、File Roll Sink、Null Sink、HBase Sink、Async HBase Sink、Morphline Solr Sink、Elastic Search Sink、Kite Dataset Sink、Kafka Sink、Custom Sink。

基於Flume的日誌採集是靈活的,我們可以看到既有Avro Sink也有Avro Source,既有Thrift Sink也有Thrift Source,這意味著我們可以將多個管道處理串聯起來,如下圖所示:

串聯的意義在於,我們可以將多個管道合併到一個管道中最終輸出到同一個Sink中去,如下圖:

上面講述了Source和Sink的作用,而Channel的作用在於處理不同的Sink,假設我們一個Source要對應多個Sink,則只需要為一個Source建立多個Channel即可,如下所示:

一個Source如果想要輸出到多個Sink中去,就需要建立多個Channel進行介入並最終輸出,通過上面這幾張圖,我們可以很好的理解Flume的執行機制,我們在這裡也就點到為止,詳細的配置可以在官網或者在網上搜索到、檢視到。

一般情況下,我們使用 Exec Source對log檔案進行監控,這樣做確實是比較簡單,但是並不方便,我們需要在每一臺要監控的伺服器上部署Flume,對運維來講萬一目標日誌檔案發生IO異常(例如格式改變、檔名改變、檔案被鎖),也是很痛苦的,因此我們最好能讓日誌直接通過Socket傳送出去,而不是存放在本地,這樣一來,不僅降低了目標伺服器的磁碟佔用,還能夠有效的防止檔案IO異常,而Kafka就是一個比較好的解決方案,具體的架構如下圖所示:

由上圖可以看到,日誌最終流向了兩個地方:HBase Persistence和Realtime Processor,而至於為什麼不用Kafka直接與Storm進行通訊的原因是為了將Sotrm邏輯和日誌源通過Flume進行隔離,在Storm中對日誌進行簡單的分析後,將結果扔進 Rabbit MQ 中供 WEB APP消費。

HBase Persistence就是將原始的日誌記錄在HBase中以便回檔查詢,而Realtime Processor則包含了實時的日誌統計以及錯誤異常郵件提醒等功能。

為了能夠準確的捕獲到異常資料,我們還需要對程式進行一些規範化的改造,例如提供統一的異常處理控制代碼等等。

日誌輸出格式

既然打算要對日誌進行統一處理,一個統一、規範的日誌格式就是非常重要的,而我們以往使用的 PatternLayout 對於最終欄位的切分非常的不方便,如下所示:

2016-05-08 19:32:55,572 [INFO ] [main] - [com.banksteel.log.demo.log4j.Demo.main(Demo.java:13)] 輸出資訊……
2016-05-08 19:32:55,766 [DEBUG] [main] - [com.banksteel.log.demo.log4j.Demo.main(Demo.java:15)] 除錯資訊……
2016-05-08 19:32:55,775 [WARN ] [main] - [com.banksteel.log.demo.log4j.Demo.main(Demo.java:16)] 警告資訊……
2016-05-08 19:32:55,783 [ERROR] [main] - [com.banksteel.log.demo.log4j.Demo.main(Demo.java:20)] 處理業務邏輯的時候發生一個錯誤……
java.lang.Exception: 錯誤訊息啊
at com.banksteel.log.demo.log4j.Demo.main(Demo.java:18)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)

如何去解析這個日誌,是個非常頭疼的地方,萬一某個系統的開發人員輸出的日誌不符合既定規範的 PatternLayout 就會引發異常。

為了能夠一勞永逸的解決格式問題,我們採用 JsonLayout 就能很好的規範日誌輸出,例如LOG4J 2.X 版本中提供的 JsonLayout 輸出的格式如下所示:

{
  "timeMillis" : 1462712870612,
  "thread" : "main",
  "level" : "FATAL",
  "loggerName" : "com.banksteel.log.demo.log4j2.Demo",
  "message" : "發生了一個可能會影響程式繼續執行下去的異常!",
  "thrown" : {
    "commonElementCount" : 0,
    "localizedMessage" : "錯誤訊息啊",
    "message" : "錯誤訊息啊",
    "name" : "java.lang.Exception",
    "extendedStackTrace" : [ {
      "class" : "com.banksteel.log.demo.log4j2.Demo",
      "method" : "main",
      "file" : "Demo.java",
      "line" : 20,
      "exact" : true,
      "location" : "classes/",
      "version" : "?"
    }, {
      "class" : "sun.reflect.NativeMethodAccessorImpl",
      "method" : "invoke0",
      "file" : "NativeMethodAccessorImpl.java",
      "line" : -2,
      "exact" : false,
      "location" : "?",
      "version" : "1.7.0_80"
    }, {
      "class" : "sun.reflect.NativeMethodAccessorImpl",
      "method" : "invoke",
      "file" : "NativeMethodAccessorImpl.java",
      "line" : 57,
      "exact" : false,
      "location" : "?",
      "version" : "1.7.0_80"
    }, {
      "class" : "sun.reflect.DelegatingMethodAccessorImpl",
      "method" : "invoke",
      "file" : "DelegatingMethodAccessorImpl.java",
      "line" : 43,
      "exact" : false,
      "location" : "?",
      "version" : "1.7.0_80"
    }, {
      "class" : "java.lang.reflect.Method",
      "method" : "invoke",
      "file" : "Method.java",
      "line" : 606,
      "exact" : false,
      "location" : "?",
      "version" : "1.7.0_80"
    }, {
      "class" : "com.intellij.rt.execution.application.AppMain",
      "method" : "main",
      "file" : "AppMain.java",
      "line" : 144,
      "exact" : true,
      "location" : "idea_rt.jar",
      "version" : "?"
    } ]
  },
  "endOfBatch" : false,
  "loggerFqcn" : "org.apache.logging.log4j.spi.AbstractLogger",
  "source" : {
    "class" : "com.banksteel.log.demo.log4j2.Demo",
    "method" : "main",
    "file" : "Demo.java",
    "line" : 23
  }
} 

我們看到,這種格式,無論用什麼語言都能輕鬆解析了。

日誌框架的Kafka整合

我們這裡只用log4j 1.x 和 log4j 2.x 進行示例。

log4j 1.x 與 Kafka 整合

首先POM.xml的內容如下:

<dependencies>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-core</artifactId>
        <version>2.7.4</version>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.7.4</version>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-annotations</artifactId>
        <version>2.7.4</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.8.2.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.11</artifactId>
        <version>0.8.2.1</version>
    </dependency>
</dependencies> 

注意,我們這裡使用的Kafka版本號是0.8.2.1,但是對應0.9.0.1是可以使用的並且0.9.0.1也只能用0.8.2.1才不會發生異常(具體異常可以自己嘗試一下)。

而log4j 1.x 本身是沒有 JsonLayout 可用的,因此我們需要自己實現一個類,如下所示:

package com.banksteel.log.demo.log4j;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.log4j.Layout;
import org.apache.log4j.spi.LoggingEvent;

import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

/**
 * 擴充套件Log4j 1.x,使其支援 JsonLayout,與 log4j2.x 一樣是基於Jackson進行解析,其格式也是完全參考 Log4J 2.x實現的。
 *
 * @author 熱血BUG男
 * @version 1.0.0
 * @since Created by gebug on 2016/5/8.
 */
public class JsonLayout extends Layout {

    private final ObjectMapper mapper = new ObjectMapper();

    public String format(LoggingEvent loggingEvent) {
        String json;
        Map<String, Object> map = new LinkedHashMap<String, Object>(0);
        Map<String, Object> source = new LinkedHashMap<String, Object>(0);
        source.put("method", loggingEvent.getLocationInformation().getMethodName());
        source.put("class", loggingEvent.getLocationInformation().getClassName());
        source.put("file", loggingEvent.getLocationInformation().getFileName());
        source.put("line", safeParse(loggingEvent.getLocationInformation().getLineNumber()));

        map.put("timeMillis", loggingEvent.getTimeStamp());
        map.put("thread", loggingEvent.getThreadName());
        map.put("level", loggingEvent.getLevel().toString());
        map.put("loggerName", loggingEvent.getLocationInformation().getClassName());
        map.put("source", source);
        map.put("endOfBatch", false);
        map.put("loggerFqcn", loggingEvent.getFQNOfLoggerClass());


        map.put("message", safeToString(loggingEvent.getMessage()));
        map.put("thrown", formatThrowable(loggingEvent));
        try {
            json = mapper.writeValueAsString(map);
        } catch (JsonProcessingException e) {
            return e.getMessage();
        }
        return json;
    }

    private List<Map<String, Object>> formatThrowable(LoggingEvent le) {
        if (le.getThrowableInformation() == null ||
                le.getThrowableInformation().getThrowable() == null)
            return null;
        List<Map<String, Object>> traces = new LinkedList<Map<String, Object>>();
        Map<String, Object> throwableMap = new LinkedHashMap<String, Object>(0);
        StackTraceElement[] stackTraceElements = le.getThrowableInformation().getThrowable().getStackTrace();
        for (StackTraceElement stackTraceElement : stackTraceElements) {
            throwableMap.put("class", stackTraceElement.getClassName());
            throwableMap.put("file", stackTraceElement.getFileName());
            throwableMap.put("line", stackTraceElement.getLineNumber());
            throwableMap.put("method", stackTraceElement.getMethodName());
            throwableMap.put("location", "?");
            throwableMap.put("version", "?");
            traces.add(throwableMap);
        }
        return traces;
    }

    private static String safeToString(Object obj) {
        if (obj == null) return null;
        try {
            return obj.toString();
        } catch (Throwable t) {
            return "Error getting message: " + t.getMessage();
        }
    }

    private static Integer safeParse(String obj) {
        try {
            return Integer.parseInt(obj.toString());
        } catch (NumberFormatException t) {
            return null;
        }
    }

    public boolean ignoresThrowable() {
        return false;
    }

    public void activateOptions() {

    }
}

其實並不複雜,注意其中有一些獲取不到的資訊,用?代替了,保留欄位的目的在於與log4j 2.x 的日誌格式完全一致,配置log4j.properties如下對接 Kafka:

log4j.rootLogger=INFO,console
log4j.logger.com.banksteel.log.demo.log4j=DEBUG,kafka
log4j.appender.kafka=kafka.producer.KafkaLog4jAppender
log4j.appender.kafka.topic=server_log
log4j.appender.kafka.brokerList=Kafka-01:9092,Kafka-02:9092,Kafka-03:9092
log4j.appender.kafka.compressionType=none
log4j.appender.kafka.syncSend=true
log4j.appender.kafka.layout=com.banksteel.log.demo.log4j.JsonLayout

# appender console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.out
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d [%-5p] [%t] - [%l] %m%n 

通過列印日誌我們可以看到其輸出的最終格式如下:

{
  "timeMillis": 1462713132695,
  "thread": "main",
  "level": "ERROR",
  "loggerName": "com.banksteel.log.demo.log4j.Demo",
  "source": {
    "method": "main",
    "class": "com.banksteel.log.demo.log4j.Demo",
    "file": "Demo.java",
    "line": 20
  },
  "endOfBatch": false,
  "loggerFqcn": "org.slf4j.impl.Log4jLoggerAdapter",
  "message": "處理業務邏輯的時候發生一個錯誤……",
  "thrown": [
    {
      "class": "com.intellij.rt.execution.application.AppMain",
      "file": "AppMain.java",
      "line": 144,
      "method": "main",
      "location": "?",
      "version": "?"
    },
    {
      "class": "com.intellij.rt.execution.application.AppMain",
      "file": "AppMain.java",
      "line": 144,
      "method": "main",
      "location": "?",
      "version": "?"
    },
    {
      "class": "com.intellij.rt.execution.application.AppMain",
      "file": "AppMain.java",
      "line": 144,
      "method": "main",
      "location": "?",
      "version": "?"
    },
    {
      "class": "com.intellij.rt.execution.application.AppMain",
      "file": "AppMain.java",
      "line": 144,
      "method": "main",
      "location": "?",
      "version": "?"
    },
    {
      "class": "com.intellij.rt.execution.application.AppMain",
      "file": "AppMain.java",
      "line": 144,
      "method": "main",
      "location": "?",
      "version": "?"
    },
    {
      "class": "com.intellij.rt.execution.application.AppMain",
      "file": "AppMain.java",
      "line": 144,
      "method": "main",
      "location": "?",
      "version": "?"
    }
  ]
}
複製程式碼

測試類:

複製程式碼
package com.banksteel.log.demo.log4j;


import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * @author 熱血BUG男
 * @version 1.0.0
 * @since Created by gebug on 2016/5/8.
 */
public class Demo {
    private static final Logger logger = LoggerFactory.getLogger(Demo.class);

    public static void main(String[] args) {
        logger.info("輸出資訊……");
        logger.trace("隨意列印……");
        logger.debug("除錯資訊……");
        logger.warn("警告資訊……");
        try {
            throw new Exception("錯誤訊息啊");
        } catch (Exception e) {
            logger.error("處理業務邏輯的時候發生一個錯誤……", e);
        }
    }
} 

log4j 2.x 與 Kafka 整合

log4j 2.x 天生支援 JsonLayout,並且與 Kafka 整合方便,我們只需要按部就班的配置一下就好了,POM.xml如下:

<dependencies>
  <dependency>
      <groupId>org.apache.logging.log4j</groupId>
      <artifactId>log4j-api</artifactId>
      <version>2.5</version>
  </dependency>
  <dependency>
      <groupId>org.apache.logging.log4j</groupId>
      <artifactId>log4j-core</artifactId>
      <version>2.5</version>
  </dependency>
  <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-core</artifactId>
      <version>2.7.4</version>
  </dependency>
  <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-databind</artifactId>
      <version>2.7.4</version>
  </dependency>
  <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-annotations</artifactId>
      <version>2.7.4</version>
  </dependency>
  <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.11</artifactId>
      <version>0.9.0.1</version>
  </dependency>
</dependencies>

log4j2.xml配置檔案如下所示:

<?xml version="1.0" encoding="UTF-8"?>
<!-- Log4j2 的配置檔案 -->
<Configuration status="DEBUG" strict="true" name="LOG4J2_DEMO" packages="com.banksteel.log.demo.log4j2">
    <properties>
        <property name="logPath">log</property>
    </properties>

    <Appenders>
        <!--配置控制檯輸出樣式-->
        <Console name="Console" target="SYSTEM_OUT">
            <PatternLayout pattern="%highlight{%d{yyyy-MM-dd HH:mm:ss} %d{UNIX_MILLIS} [%t] %-5p %C{1.}:%L - %msg%n}"/>
        </Console>
        <!-- 配置Kafka日誌主動採集,Storm會將日誌解析成欄位存放在HBase中。 -->
        <Kafka name="Kafka" topic="server_log">
            <!--使用JSON傳輸日誌檔案-->
            <JsonLayout complete="true" locationInfo="true"/>
            <!--Kafka叢集配置,需要在本機配置Hosts檔案,或者通過Nginx配置-->
            <Property name="bootstrap.servers">Kafka-01:9092,Kafka-02:9092,Kafka-03:9092</Property>
        </Kafka>
    </Appenders>
    <Loggers>
        <Root level="DEBUG">
            <!--啟用控制檯輸出日誌-->
            <AppenderRef ref="Console"/>
            <!--啟用Kafka採集日誌-->
            <AppenderRef ref="Kafka"/>
        </Root>
    </Loggers>
</Configuration>

這樣就Okay了,我們可以在Kafka中看到完整的輸出:

{
  "timeMillis" : 1462712870591,
  "thread" : "main",
  "level" : "ERROR",
  "loggerName" : "com.banksteel.log.demo.log4j2.Demo",
  "message" : "處理業務邏輯的時候發生一個錯誤……",
  "thrown" : {
    "commonElementCount" : 0,
    "localizedMessage" : "錯誤訊息啊",
    "message" : "錯誤訊息啊",
    "name" : "java.lang.Exception",
    "extendedStackTrace" : [ {
      "class" : "com.banksteel.log.demo.log4j2.Demo",
      "method" : "main",
      "file" : "Demo.java",
      "line" : 20,
      "exact" : true,
      "location" : "classes/",
      "version" : "?"
    }, {
      "class" : "sun.reflect.NativeMethodAccessorImpl",
      "method" : "invoke0",
      "file" : "NativeMethodAccessorImpl.java",
      "line" : -2,
      "exact" : false,
      "location" : "?",
      "version" : "1.7.0_80"
    }, {
      "class" : "sun.reflect.NativeMethodAccessorImpl",
      "method" : "invoke",
      "file" : "NativeMethodAccessorImpl.java",
      "line" : 57,
      "exact" : false,
      "location" : "?",
      "version" : "1.7.0_80"
    }, {
      "class" : "sun.reflect.DelegatingMethodAccessorImpl",
      "method" : "invoke",
      "file" : "DelegatingMethodAccessorImpl.java",
      "line" : 43,
      "exact" : false,
      "location" : "?",
      "version" : "1.7.0_80"
    }, {
      "class" : "java.lang.reflect.Method",
      "method" : "invoke",
      "file" : "Method.java",
      "line" : 606,
      "exact" : false,
      "location" : "?",
      "version" : "1.7.0_80"
    }, {
      "class" : "com.intellij.rt.execution.application.AppMain",
      "method" : "main",
      "file" : "AppMain.java",
      "line" : 144,
      "exact" : true,
      "location" : "idea_rt.jar",
      "version" : "?"
    } ]
  },
  "endOfBatch" : false,
  "loggerFqcn" : "org.apache.logging.log4j.spi.AbstractLogger",
  "source" : {
    "class" : "com.banksteel.log.demo.log4j2.Demo",
    "method" : "main",
    "file" : "Demo.java",
    "line" : 22
  }
}

為了減少日誌對空間的佔用,我們通常還會設定JSONLayout的compact屬性為true,這樣在kafka中獲得的日誌將會排除掉空格和換行符。 

最後

由於在實際開發中,我們會引入多個第三方依賴,這些依賴往往也會依賴無數的log日誌框架,為了保證測試通過,請認清本文例子中的包名以及版本號,log4j 1.x 的 Json 輸出是為了完全模擬 2.x 的欄位,因此部分欄位用?代替,如果想要完美,請自行解決。

隨便解釋一下日誌級別,以便建立規範:

log.error 錯誤資訊,通常寫在 catch 中,可以使用 log.error("發生了一個錯誤",e) 來記錄詳細的異常堆疊

log.fatal 嚴重錯誤,該級別的錯誤用來記錄會導致程式異常退出的錯誤日誌。

log.warn 警告

log.info 資訊

log.trace 簡單輸出文字

log.debug 除錯資訊

Log4j 的詳細介紹請點這裡
Log4j 的下載地址請點這裡