logstash+elasticsearch+kafka統一日誌收集
阿新 • • 發佈:2018-11-26
文章目錄
logstash+elasticsearch+kafka統一日誌收集
環境準備
以下相關軟體可從(http://pan.baidu.com/s/1c1Dj41U)下載
- 安裝jdk
- 安裝elasticsearch
從(https://www.elastic.co/downloads)下載最新版,解壓後直接執行bin/elasticsearch.bat即可,開啟elasticsearch.yml配置檔案,設定network.host為外部可訪問的ip,建議0.0.0.0 - 安裝kibana
從(https://www.elastic.co/downloads)下載最新版,解壓後執行bin/kibana.bat啟動程式 - 安裝logstash
從(https://www.elastic.co/downloads)下載最新版,解壓到指定目錄 - 安裝kafka
從(http://kafka.apache.org/downloads.html
引數配置
- 在logstash/config目錄下新建檔案log4j.conf,然後啟動logstash服務(logstash.bat -f …/config/log4j.conf ),內容如下
input {
log4j{
mode => "server"
type=>"log4j-json"
port=>4712
}
}
filter {}
output {
stdout { codec => rubydebug }
elasticsearch { hosts => ["127.0.0.1"] }
}
- 在系統內log4j要配置SocketAppender輸出,log4j的properties配置
log4j.rootLogger=info,logstash
# Socket,logstash
log4j.appender.logstash=org.apache.log4j.net.SocketAppender
log4j.appender.logstash.Port=4712
log4j.appender.logstash.RemoteHost=localhost
log4j.appender.logstash.ReconnectionDelay=60000
log4j.appender.logstash.LocationInfo=true
# 工程名
log4j.appender.logstash.application=stage
其他說明
- logstash的elasticsearch外掛預設將資訊儲存在logstash-%{+YYYY.MM.dd}格式的索引下面,在第一次開啟kibana時,會要求建立查詢索引,直接建立預設的logstash-*即可
- elasticsearch最好部署成叢集環境,以實現不間斷收集日誌
- 本示例直接使用log4j的SocketAppender輸出,如果logstash的socket服務斷掉,日誌會消失;如果log4j輸出到kafka,再從kafka輸出到logstash會避免這個問題
通過kafka蒐集日誌
引入第三方包kafka-log4j-appender
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-log4j-appender</artifactId>
<version>0.10.0.0</version>
</dependency>
配置kafka appender
## appender kafka
log4j.appender.kafka=org.apache.kafka.log4jappender.KafkaLog4jAppender
log4j.appender.kafka.topic=logstash
log4j.appender.kafka.brokerList=brokerNode1:9091,brokerNode2:9092
log4j.appender.kafka.compressionType=none
log4j.appender.kafka.syncSend=true
log4j.appender.kafka.layout=org.apache.log4j.PatternLayout
log4j.appender.kafka.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L %% - %m%n
logstash配置kafka輸入
input {
kafka{
bootstrap_servers => "192.168.1.202:9092,192.168.1.202:9093"
topics=>["logstash"]
type=>"log4j-json"
}
}
filter {
grok {
match => { "message" => "%{TIMESTAMP_ISO8601:logtime} \[%{DATA:javaThread}\] %{WORD:project} %{LOGLEVEL:loglevel} %{DATA:msg}" }
}
}
output {
stdout { codec => rubydebug }
elasticsearch { hosts => ["127.0.0.1"] }
}
注意:
- kafka的topic必須配置一致;
- logstash的elasticsearch外掛預設埠是9092,如果配置多個地址必須寫成[“127.0.0.1:9200”,“127.0.0.2:9200”]
- logstash可以通過filter外掛,利用正則表示式提取關鍵字串作為單獨欄位儲存
- 當前版本的KafkaLog4jAppender程式碼有bug,無法輸出異常棧,需重寫原始碼,修改subAppend方法
private String subAppend(LoggingEvent event) {
StringBuffer msg = new StringBuffer();
msg.append((this.layout == null) ? event.getRenderedMessage() : this.layout.format(event));
if (layout.ignoresThrowable()) {
String[] s = event.getThrowableStrRep();
if (s != null) {
int len = s.length;
for (int i = 0; i < len; i++) {
msg.append(s[i]);
msg.append(Layout.LINE_SEP);
}
}
}
return msg.toString();
}
參考資料
- http://www.cnblogs.com/zero/p/5904297.html 日誌收集系統 ELK
- https://www.elastic.co/guide/en/elastic-stack/current/elastic-stack.html 官網手冊
- http://www.cnblogs.com/dennisit/p/4133131.html Elastic Search 分散式部署示例