1. 程式人生 > >logback通過kafka接入ELK詳解

logback通過kafka接入ELK詳解

原文連結:http://blog.csdn.net/zhangruhong168/article/details/76973212

資料流向圖示參考(圖片來自百度以圖搜圖,向原作者致敬)

日誌流向

參考圖示 圖片來自網路

這裡寫圖片描述

開發人員如何使用elk

開發人員需要做的就是將日誌資訊通過各種渠道發到kafka,本例項以logback為例進行說明,其他接入方式請自行搜尋。請先行準備好可以正常使用的已經接入logback的系統,==如果你沒有準備好,本文將假裝你已經準備好。==

logback接入elk

  1. pom檔案引入如下依賴
<dependency>
    <groupId>net.logstash.logback</groupId
>
<artifactId>logstash-logback-encoder</artifactId> <version>4.11</version> </dependency> <dependency> <groupId>com.github.danielwegener</groupId> <artifactId>logback-kafka-appender</artifactId> <version>0.1.0</version
>
<scope>runtime</scope> </dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

2.在logback配置檔案中增加相應的appender

<appender name="KafkaAppender" class="com.github.danielwegener.logback.kafka.KafkaAppender">
    <encoder class="com.github.danielwegener.logback.kafka.encoding.LayoutKafkaMessageEncoder">
<layout class="net.logstash.logback.layout.LogstashLayout" > <includeContext>true</includeContext> <includeCallerData>true</includeCallerData> <customFields>{"system":"test"}</customFields> <fieldNames class="net.logstash.logback.fieldnames.ShortenedFieldNames"/> </layout> <charset>UTF-8</charset> </encoder> <!--kafka topic 需要與配置檔案裡面的topic一致 否則kafka會沉默並鄙視你--> <topic>applog</topic> <keyingStrategy class="com.github.danielwegener.logback.kafka.keying.HostNameKeyingStrategy" /> <deliveryStrategy class="com.github.danielwegener.logback.kafka.delivery.AsynchronousDeliveryStrategy" /> <producerConfig>bootstrap.servers=10.39.232.218:9092</producerConfig> </appender> <!--你可能還需要加點這個玩意兒--> <logger name="Application_ERROR"> <appender-ref ref="KafkaAppender"/> </logger> <!--還有這個玩意兒--> <root> <level value="INFO" /> <appender-ref ref="CONSOLE" /> <appender-ref ref="KafkaAppender" /> </root>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  1. 程式碼中使用 
    • 如果你將KafkaAppender新增到了root標籤下 那麼你的INFO極其更嚴格的基本的日誌都將發給kafka,例如你可以這樣使用
Logger logger = LoggerFactory.getLogger(this.getClass());
logger.debug("This is a debug message"); 此條日誌收日誌級別限制 不會同步到kafka
logger.info("This is an info message");
logger.warn("This is a warn message");
logger.error("This is an error message");
  • 1
  • 2
  • 3
  • 4
  • 5
  • 如果你將KafkaAppender新增到了Application_ERROR下 那麼你可以這樣使用
Logger logger= LoggerFactory.getLogger("Application_ERROR");
logger.debug("This is a debug message");
logger.info("This is an info message");
logger.warn("This is a warn message");
logger.error("This is an error message");
  • 1
  • 2
  • 3
  • 4
  • 5
  1. 資料查詢 
    在一切順利的情況下,你可以通過kibana查到類似的記錄 
    kibana 查詢示例

配置解釋

前面我們按照配置成功的將專案的日誌資訊寫入到了elk中,並在kibana上查到了相應的日誌資訊。接下來我們相信解析一下前面的配置。

是否包含上下文

<includeContext>true</includeContext>

開啟的話會包含hostname等logback的context資訊

這裡寫圖片描述

是否包含日誌來源

<includeCallerData>false</includeCallerData>
  • 1

這裡寫圖片描述

差異如下

"caller": {
    "class": "com.example.elkdemo.com.example.elktest.utils.LogHelper",
    "method": "helpMethod",
    "file": "LogHelper.java",
    "line": 11
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

自定義附加欄位

<customFields>{"system":"test"}</customFields>
  • 1

這裡寫圖片描述

自定義欄位的簡稱

<fieldNames class="net.logstash.logback.fieldnames.ShortenedFieldNames"/>
  • 1

內容如下

public class ShortenedFieldNames extends LogstashFieldNames {
    public static final String FIELD_LOGGER = "logger";
    public static final String FIELD_THREAD = "thread";
    public static final String FIELD_LEVEL_VAL = "levelVal";
    public static final String FIELD_CALLER = "caller";
    public static final String FIELD_CLASS = "class";
    public static final String FIELD_METHOD = "method";
    public static final String FIELD_FILE = "file";
    public static final String FIELD_LINE = "line";
    public static final String FIELD_STACKTRACE = "stacktrace";

    public ShortenedFieldNames() {
        this.setLogger("logger");
        this.setThread("thread");
        this.setLevelValue("levelVal");
        this.setCaller("caller");
        this.setCallerClass("class");
        this.setCallerMethod("method");
        this.setCallerFile("file");
        this.setCallerLine("line");
        this.setStackTrace("stacktrace");
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

前面我們講解了,生成的JSON資料日誌的一些layout屬性配置,下面我們講解一下appender將日誌資訊傳送給kafka的時候的一些配置

概念準備

本文使用的logback-kafka-appender相對於kafka叢集來說就是kafka叢集訊息的生產者。 
- producer: 
  訊息生產者,釋出訊息到 kafka 叢集的終端或服務。 
- broker: 
  kafka 叢集中包含的伺服器。 
- topic: 
  每條釋出到 kafka 叢集的訊息屬於的類別,即 kafka 是面向 topic 的。 
- partition: 
  partition 是物理上的概念,每個 topic 包含一個或多個 partition。kafka 分配的單位是 partition。 
- consumer: 
  從 kafka 叢集中消費訊息的終端或服務。 
- Consumer group: 
  high-level consumer API 中,每個 consumer 都屬於一個 consumer group,每條訊息只能被 consumer group 中的一個 Consumer 消費,但可以被多個 consumer group 消費。 
- replica: 
  partition 的副本,保障 partition 的高可用。 
- leader: 
  replica 中的一個角色, producer 和 consumer 只跟 leader 互動。 
- follower: 
  replica 中的一個角色,從 leader 中複製資料。 
- controller: 
  kafka 叢集中的其中一個伺服器,用來進行 leader election 以及 各種 failover。 
- zookeeper: 
  kafka 通過 zookeeper 來儲存叢集的 meta 資訊。

kafka訊息提交策略(deliveryStrategy標籤用)

由於logback的日誌需要通過網路提交到kafka叢集,難免會存在因為網路不穩定、kafka叢集不穩定等原因導致日誌資訊不能抵達的情況,此時配置好相應的策略告訴logback-kafka-appender怎麼解決這個問題就顯得至關重要。

logback-kafka-appender為我們提供了兩種策略,非同步提交策略(AsynchronousDeliveryStrategy)和阻塞提交策略(BlockingDeliveryStrategy)

非同步提交策略(AsynchronousDeliveryStrategy)

任何訊息提交給kafka生產者,如果因為某些原因導致交付,該訊息會被分發給備胎appenders,可是在網路(比如與kafka伺服器的連線斷了)有問題時這種交付策略會在生產者的傳送緩衝區塞滿了以後堵塞。為了避免這個阻塞的產生,我們可以開啟producerConfig block.on.buffer.full = false。開啟後所有不能快速通過網路抵達kafka叢集的訊息都會被分發到備胎appender。

阻塞提交策略(BlockingDeliveryStrategy)

這個策略會一直阻塞呼叫執行緒直到每一個日誌訊息實際抵達kafka。這種策略會導致比較消極和不開心以及讓人難過的影響,因為它對吞吐量有巨大的負面影響。

==警告:這個策略不應使用一起producerConfig linger.ms==

==溫馨提示==

非同步提交策略(AsynchronousDeliveryStrategy)並不能阻止在向kafka提交資料時候的阻塞。這意味著:假如在logging上下文啟動的時候所有的kafka伺服器都不可達,或全部kafka伺服器在配置的時間段(>metadata.max.age.ms)依然不可達,你的appender最終也將阻塞。這種行為是我們不希望看到的,kafka-clients migitated 0.9版本已改善這種情況(見#16)。在此之前的版本,可使用如下方案。

<configuration>

    <!-- This is the kafkaAppender -->
    <appender name="kafkaAppender" class="com.github.danielwegener.logback.kafka.KafkaAppender">
    <!-- Kafka Appender configuration -->
    </appender>

    <appender name="ASYNC" class="ch.qos.logback.classic.AsyncAppender">
        <appender-ref ref="kafkaAppender" />
    </appender>

    <root level="info">
        <appender-ref ref="ASYNC" />
    </root>
</configuration>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

自定義策略

當然 如果你認為以上策略不是你的菜或者你實在看不慣這些策略,你也可以自定義你自己的策略,你只需要繼承om.github.danielwegener.logback.kafka.delivery.DeliveryStrategy就可以了

備胎(fallback)appender

假如某些原因導致我們的生產者(logback-kafka-appender)不能釋出日誌某條訊息,這條訊息還可以通過配置的備胎appender(比如基於STDOUT和STDERR的ConsoleAppender)記錄下來。

作為開發者,我們只需要在相應的可能出現問題的appenders裡面加入一個標籤appender-ref,所有不能抵達的訊息日誌都會分發到appender-ref申明的appenders去。

比如 STDOUT就是後備方案

值得注意的是,採用非同步提交策略會重用原來的kafka生產者的io執行緒寫訊息到後備appenders,因此所有的後備方案的appenders的io速度必須滿足要求,否則可能拖慢速度或者導致宕機。

生產者(logback-kafka-appender)配置調優(基於kafka0.8.2,logback-kafka-appender預設配置)

對logback-kafka-appender的所有配置 我們都可以通過Name=Value進行同名覆蓋,這給予我們更好的微調能力(比如 batch.size, compression.type 和 linger.ms)。

序列化

這個模組提供了一個與logback的LayoutWrappingEncoder類似的LayoutKafkaMessageEncoder(他們的區別就是以建立位元組陣列取代同步輸出流)

LayoutKafkaMessageEncoder使用了常用的ch.qos.logback.core.Layout作為layout-parameter(佈局引數)。

這就允許我們使用任何實現了 ILoggingEvent 或者 IAccessEvent 的元件,比如眾所周知的PatternLayout和logstash-logback-encoder的LogstashLayout。

自定義序列化(略)

主鍵分割槽策略

kafka的可伸縮性和順序保證嚴重依賴分割槽的概念。對於應用日誌來說這就意味著我們需要思考我們想要如何分配日誌訊息到多個kafka的topic分割槽。這個決定的一個含義是這些訊息在被多個不同消費者消費時有序,因為kafka只提供了在單一分割槽讀取順序的保證。另一個含義是我們的日誌訊息如何均勻分佈在所有可用分割槽並且保持不同伺服器間的負載均衡。

RoundRobinKeying策略(預設)

這個策略均勻地分配所有寫日誌訊息到所有可用的kafka分割槽。這一策略可能會導致消費者客戶端讀取順序異常。

HostNameKeying策略

這一策略使用HOSTNAME來劃分傳送到kafka的日誌訊息。這是有用的因為它確保所有通過這個host釋出的日誌訊息將以正確的順序交付給任何消費者。但是這種策略會導致日誌不均勻的分佈少量的主機(相對於分割槽的數量)。

ContextNameKeying(策略)

這一策略使用logbacks CONTEXT_NAME分割槽傳送到kafka的日誌訊息。這是確保所有相同日誌context的日誌訊息將可以被任何消費者以正確的順序消費。但是這種策略會導致日誌不均勻分佈少量的主機(相對於分割槽的數量)。這個策略只適用於ILoggingEvents。

ThreadNameKeying策略

這一策略使用呼叫執行緒的名字作為分割槽鍵。這將確保所有相同執行緒的訊息將可以被任何消費者以正確的順序消費。但是這種策略會導致日誌不均勻分佈少量執行緒(名稱)(相對於分割槽的數量)。這個策略只適用於ILoggingEvents。

LoggerNameKeying策略

*這個策略使用日誌記錄器的名字(logger name)作為分割槽鍵。這將確保所有使用相同logger name的訊息將可以被任何消費者以正確的順序消費。但是這種策略會導致日誌不均勻分佈與一些少量的不同的loggers上(相對於分割槽的數量)。這個策略只適用於ILoggingEvents。

自定義主鍵分割槽策略

如果你對以上分割槽策略都不滿足你的需求,你也可以很容易的通過實現keyingStrategy類來實現自己的分割槽策略,比如:

package foo;
import com.github.danielwegener.logback.kafka.keying.KeyingStrategy;

public class LevelKeyingStrategy implements KeyingStrategy<ILoggingEvent> {
    @Override
    public byte[] createKey(ILoggingEvent e) {
        return ByteBuffer.allocate(4).putInt(e.getLevel()).array();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

參考大多數的logback元件,自定義分割槽策略可能需要實現如下介面

ch.qos.logback.core.spi.ContextAware and ch.qos.logback.core.spi.LifeCycle interfaces.
  • 1

Q&A

如果想使用不同的kafka topic怎麼辦?

你只需要為這個topic增加一個appenders即可

如何讓log的格式為logstash裡面的json格式

<encoder class="com.github.danielwegener.logback.kafka.encoding.PatternLayoutKafkaMessageEncoder">
  <layout class="net.logstash.logback.layout.LogstashLayout" />
</encoder>
  • 1
  • 2
  • 3