1. 程式人生 > >ELK+zookeeper+kafka收集springcould日誌配置文檔

ELK+zookeeper+kafka收集springcould日誌配置文檔

最大 num 形式 initial 1.5 state eset pat receive

技術分享圖片
####
####
####

技術分享圖片

本文介紹使用ELK(elasticsearch、logstash、kibana) + kafka來搭建一個日誌系統。主要演示使用spring aop進行日誌收集,然後通過kafka將日誌發送給logstash,logstash再將日誌寫入elasticsearch,這樣elasticsearch就有了日誌數據了,最後,則使用kibana將存放在elasticsearch中的日誌數據顯示出來,並且可以做實時的數據圖表分析等等。

最開始我些項目的時候,都習慣用log4j來把日誌寫到log文件中,後來項目有了高可用的要求,我們就進行了分布式部署web,這樣我們還是用log4j這樣的方式來記錄log的話,那麽就有N臺機子的N個log目錄,ELK正常使用時需要在每臺機器上都安裝logstash等收集日誌的客戶端,這個時候查找log起來非常麻煩,不知道問題用戶出錯log是寫在哪一臺服務器上的,後來,想到一個辦法,幹脆把log直接寫到數據庫中去,這樣做,雖然解決了查找異常信息便利性的問題了,但存在兩個缺陷:

1,log記錄好多,表不夠用啊,又得分庫分表了,

2,連接db,如果是數據庫異常,那邊log就丟失了,那麽為了解決log丟失的問題,那麽還得先將log寫在本地,然後等db連通了後,再將log同步到db,這樣的處理辦法,感覺是越搞越復雜。

現在ELK的做法
好在現在有了ELK這樣的方案,可以解決以上存在的煩惱,首先是,使用elasticsearch來存儲日誌信息,對一般系統來說可以理解為可以存儲無限條數據,因為elasticsearch有良好的擴展性,然後是有一個logstash,可以把理解為數據接口,為elasticsearch對接外面過來的log數據,它對接的渠道,有kafka,有log文件,有redis等等,足夠兼容N多log形式,最後還有一個部分就是kibana,它主要用來做數據展現,log那麽多數據都存放在elasticsearch中,我們得看看log是什麽樣子的吧,這個kibana就是為了讓我們看log數據的,但還有一個更重要的功能是,可以編輯N種圖表形式,什麽柱狀圖,折線圖等等,來對log數據進行直觀的展現。

技術分享圖片

ELK職能分工

logstash做日誌對接,接受應用系統的log,然後將其寫入到elasticsearch中,logstash可以支持N種log渠道,kafka渠道寫進來的、和log目錄對接的方式、也可以對reids中的log數據進行監控讀取,等等。

elasticsearch存儲日誌數據,方便的擴展特效,可以存儲足夠多的日誌數據。

kibana則是對存放在elasticsearch中的log數據進行:數據展現、報表展現,並且是實時的。
技術分享圖片

此次安裝的軟件:es,kibana,logstash,kafka,zookeeper
ELK版本最好選用5以上的版本,es需要用普通用戶起服務還需要註意系統的最大打開文件數等參數,索引的應用基本都是解壓即可。

修改系統參數

vi /etc/security/limits.conf

*              soft    nproc          65536

*              hard    nproc          65536

*              soft    nofile          65536

*              hard    nofile          65536

vi /etc/sysctl.conf

vm.max_map_count= 262144

sysctl -p 

####
####

es:
技術分享圖片

vim elasticsearch.yml

node.name : elk1

path.data : /data/elk_data

path.logs : /data/elk_data

network.host : 192.168.0.181

http.port : 9200

http.cors.enabled: true

http.cors.allow-origin: "*"
##
由於head插件本質上還是一個nodejs的工程,因此需要安裝node,使用npm來安裝依賴的包。(npm可以理解為maven)

下載node-v6.10.3-linux-x64.tar.gz ,nodejs下載地址

把“node-v6.10.3-linux-x64.tar.gz”拷貝到centos上,本示例目錄為:/opt/es/
技術分享圖片

cd /opt/es/

tar -xzvf node-v6.10.3-linux-x64.tar.gz

ln -s /opt/es/node-v6.10.3-linux-x64/bin/node /usr/local/bin/node

ln -s /opt/es/node-v6.10.3-linux-x64/bin/npm /usr/local/bin/npm

cd /opt/es/node-v6.10.3-linux-x64/bin/

測試是否安裝成功

npm -v

#####
#####

kibana:

技術分享圖片

vim kibana.yml

server.port: 5601
server.host: "192.168.0.181"
elasticsearch.url: "http://192.168.0.181:9200"
kibana.index: ".kibana"

#####
#####
技術分享圖片

kafka:

技術分享圖片

vim server.properties

broker.id=0
#port=9092
#host.name=192.168.0.181
listeners=PLAINTEXT://192.168.0.181:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/logs/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.0.181:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
#advertised.host.name=192.168.0.181
#advertised.port=9092
advertised.listeners=PLAINTEXT://192.168.0.181:9092
replica.fetch.max.bytes=4194304
message.max.bytes=4000000
compression.codec=snappy
max.partition.fetch.bytes=4194304

#
#
添加topic

./kafka-topics.sh --create --topic app-log --replication-factor 1 --partitions 1 --zookeeper localhost:2181
./kafka-topics.sh --list --zookeeper localhost:2181

#
./kafka-console-producer.sh --broker-list 192.168.0.181:9092 --topic app-log
./kafka-console-consumer.sh --zookeeper 192.168.0.181:2181 --topic app-log --from-beginning //查看輸出

技術分享圖片

####
####

logstash:

添加配置文件,啟動logstash時-f指定配置文件

[root@elk-181 config]# cat kafka.conf 
input {
    kafka {
        bootstrap_servers => "192.168.0.181:9092"
        group_id => "app-log"
        topics => ["app-log"]
        consumer_threads => 5  
        decorate_events => false 
        auto_offset_reset => "earliest" #latest最新的;earliest最開始的
    }
}

filter {
        json {
            source => "message"
        }
                if [severity] == "DEBUG" {
               drop {}
               }
}

output {
    elasticsearch {
    action => "index"          #The operation on ES
        hosts  => "192.168.0.181:9200"     #ElasticSearch host, can be array.
#       index  => "applog"         #The index to write data to.  
        index => "applog-%{+YYYY.MM.dd}"
    }
  }

####
####

springcould日誌配置

###
技術分享圖片

###

技術分享圖片

log:
  brokerList:
    url: 192.168.0.181:9092
    topic: app-log
  console :
    level : DEBUG

#
#

#
#
<appender name="KAFKA" class="com.skong.core.logs.KafkaAppender">
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">

${logLevel} # # ![](https://s1.51cto.com/images/blog/201901/10/f03c34d45cc43564d98bc2dcf7a6019a.png?x-oss-process=image/watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=) # # # ![](https://s1.51cto.com/images/blog/201901/10/c0e5bcced0a454b2900a9f508ff9e77e.jpg?x-oss-process=image/watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=) 此腳本為定時刪除索引和重啟 ``` [root@elk-181 ~]# cat cleanesdata.sh #!/bin/bash #author:silence_ltx #date:20180809 PATH=/data/xxxx-node/bin:/xxxx-java/bin:/data/elasticsearch/bin:/data/kibana/bin:/usr/local/python/bin:/data/xxxx-node/bin:/data/xxxx-node/bin:/data/xxxx-java/bin:/data/elasticsearch/bin:/data/kibana/bin:/data/xxxx-java/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/root/bin:/root/bin:/root/bin date=`date +%Y.%m.%d -d "2 days ago"` echo $date /usr/bin/curl -XDELETE "192.168.0.181:9200/applog-${date}" ps -ef| grep logstash | grep -v grep | awk ‘{print $2}‘ | xargs kill -9 netstat -nltp | grep 9092 | awk ‘{print $NF}‘ | awk -F "/" ‘{print $1 }‘| xargs kill -9 sleep 30 find /data/logs/kafka-logs/app-log-0/ -mtime +2 -exec rm -rf {} \; cd /data/kafka/bin sh ./kafka-server-start.sh ../config/server.properties > /data/kafka/bin/kafka.log & sleep 30 cd /data/logstash/bin sh ./logstash -f ../config/kafka.conf > /data/logstash/bin/logstash.log & ``` ![](https://s1.51cto.com/images/blog/201901/10/32c6a3db29380f7498ca9f11a411db03.jpg?x-oss-process=image/watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=) # # # ![](https://s1.51cto.com/images/blog/201901/10/8d40ae54dd476149390d0fef36c8322c.jpg?x-oss-process=image/watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=)

ELK+zookeeper+kafka收集springcould日誌配置文檔