ELK+Filebeat+Nginx Centralized Logging Solution (Part 3): Adding a Kafka + ZooKeeper Cluster

I. Overview


Kafka:


Kafka is a messaging system originally developed at LinkedIn as the foundation of LinkedIn's activity stream and operational data processing pipeline. It is now used by many companies as a data pipeline and messaging system for a wide range of purposes. Activity stream data is among the most common data that almost every site uses when reporting on its usage: page views, information about what content was viewed, search activity, and so on. This kind of data is usually handled by writing the various activities to log files and then periodically aggregating and analyzing those files. Operational data refers to server performance metrics (CPU and I/O utilization, request latency, service logs, and so on); the ways such data is aggregated vary widely.


If Kafka is new to you, the article 《Kafka 入門 and kafka+logstash 實戰應用》 is a useful reference.



ZooKeeper:


ZooKeeper is an open-source coordination service for distributed applications that is highly available, high performance, and consistent. It is an open-source implementation of Google's Chubby and a key component of Hadoop and HBase. It provides one fundamental service, distributed locking, and thanks to its open-source nature developers have since built other uses on top of that primitive: configuration maintenance, group services, distributed message queues, distributed notification/coordination, and more.

ZooKeeper synchronizes data in memory, so every node in the ensemble holds an identical in-memory data structure, which makes it very fast.

It is mainly used to solve the data-management problems that distributed applications commonly face, such as unified naming, state synchronization, cluster management, and management of distributed application configuration.


If ZooKeeper is new to you, the article 《ZooKeeper基本講解 & 叢集構建 & 常用操作指令》 is a useful reference.


II. Environment


Architecture diagram:

(architecture diagram image)


Architecture walkthrough (from left to right, the stack is divided into five layers):


Layer 1: data collection

On the far left is the application server cluster. Filebeat runs on these hosts to collect logs and ship them to the two Logstash services.


Layer 2: data processing and buffering

The Logstash services format the incoming logs and hand them off to the local Kafka broker + ZooKeeper cluster for buffering.


Layer 3: data forwarding

A dedicated Logstash node continuously pulls data from the Kafka broker cluster and forwards it to the ES DataNodes.


Layer 4: persistent storage

The ES DataNodes write the incoming data to disk and build the indices.


Layer 5: search and visualization

The ES master node and Kibana coordinate the ES cluster, serve search requests, and present the data.


To conserve server resources, I have consolidated services that could be split out onto shared hosts. Feel free to split them apart and extend the architecture to match your own environment.



Eight servers (CentOS 6.5 Final):

192.168.1.194 (Filebeat for log collection, Nginx as the web server)
192.168.1.195 (Filebeat for log collection, Nginx as the web server)
192.168.1.196 (Logstash, Kafka + ZooKeeper)
192.168.1.197 (Kafka + ZooKeeper)
192.168.1.199 (Logstash, Kafka + ZooKeeper)
192.168.1.198 (Elasticsearch master, Kibana, Nginx as a reverse proxy)
192.168.1.200 (Elasticsearch DataNode)
192.168.1.201 (Elasticsearch DataNode)


Software versions:

java-1.8.0-openjdk
filebeat-5.2.2
logstash-5.2.2
elasticsearch-5.2.2
kibana-5.2.2
nginx-1.6.1
zookeeper-3.4.9
kafka_2.11-0.10.2.0


III. Installation and Configuration


This walkthrough builds on the first two articles in the series. If you are not familiar with them, start with the first article, "ELK+Filebeat+Nginx Centralized Logging Solution (Part 1)", and the second, "ELK+Filebeat+Nginx Centralized Logging Solution (Part 2): Adding an Elasticsearch Cluster".

1. Install and configure the ZooKeeper cluster

ZooKeeper website: http://zookeeper.apache.org/

Install java-1.8.0-openjdk on 192.168.1.196, 192.168.1.197, and 192.168.1.199, then unpack ZooKeeper on each node:

tar xf zookeeper-3.4.9.tar.gz

Edit the configuration file (from the unpacked zookeeper-3.4.9 directory):


vim conf/zoo.cfg

Contents:
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/u01/zookeeper/zookeeper-3.4.9/data
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
server.6=192.168.1.196:2888:3888
server.7=192.168.1.197:2888:3888
server.9=192.168.1.199:2888:3888
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1
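
The dataDir referenced above has to exist before the myid file can be written into it in the next step; on a fresh host it may still need to be created (same path on all three nodes, per the config above):

# create the ZooKeeper data directory if it is not there yet
mkdir -p /u01/zookeeper/zookeeper-3.4.9/data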


Create the myid files:

# 192.168.1.196
echo 6 > /u01/zookeeper/zookeeper-3.4.9/data/myid

# 192.168.1.197
echo 7 > /u01/zookeeper/zookeeper-3.4.9/data/myid

# 192.168.1.199
echo 9 > /u01/zookeeper/zookeeper-3.4.9/data/myid


Start the service and check each node's status:

# 192.168.1.196
bin/zkServer.sh start
bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /home/workspace/zookeeper-3.4.9/bin/../conf/zoo.cfg
Mode: leader

# 192.168.1.197
bin/zkServer.sh start
bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /home/workspace/zookeeper-3.4.9/bin/../conf/zoo.cfg
Mode: follower

# 192.168.1.199
bin/zkServer.sh start
bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /home/workspace/zookeeper-3.4.9/bin/../conf/zoo.cfg
Mode: follower


The ZooKeeper cluster is now configured and running.
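
As an extra sanity check (optional, and assuming nc/netcat is installed), ZooKeeper 3.4 answers the four-letter-word commands on its client port, so each node can be probed directly:

# "imok" in the reply means the server is alive
echo ruok | nc 192.168.1.196 2181
# "stat" also reports the node's mode (leader/follower) and client connections
echo stat | nc 192.168.1.197 2181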



2. Install and configure the Kafka cluster


Kafka website: http://kafka.apache.org/

On 192.168.1.196, 192.168.1.197, and 192.168.1.199, run:

tar xf kafka_2.11-0.10.2.0.tar.gz


On 192.168.1.196, edit the configuration file (from the workspace directory where the tarball was unpacked):

vim kafka_2.11-0.10.2.0/config/server.properties

Contents:
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# see kafka.server.KafkaConfig for additional details and defaults
############################# Server Basics #############################
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=6
# Switch to enable topic deletion or not, default value is false
#delete.topic.enable=true
############################# Socket Server Settings #############################
# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://192.168.1.196:9092
# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092
# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
# The number of threads handling network requests
num.network.threads=3
# The number of threads doing disk I/O
num.io.threads=8
# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400
# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400
# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600
############################# Log Basics #############################
# A comma separated list of directories under which to store log files
log.dirs=/home/workspace/kafka_2.11-0.10.2.0/data
# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1
# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1
############################# Log Flush Policy #############################
# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.
# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000
# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000
############################# Log Retention Policy #############################
# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.
# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168
# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824
# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000
############################# Zookeeper #############################
# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=192.168.1.196:2181,192.168.1.197:2181,192.168.1.199:2181
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000


On the other two nodes, 192.168.1.197 and 192.168.1.199, edit the same file:

# Change broker.id and listeners
# 192.168.1.197
broker.id=7
listeners=PLAINTEXT://192.168.1.197:9092

# 192.168.1.199
broker.id=9
listeners=PLAINTEXT://192.168.1.199:9092

Everything else stays the same.


Start the service (same command on all three nodes):

bin/kafka-server-start.sh config/server.properties
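
Run like this, the broker stays attached to the terminal. If you prefer to background it, the same startup script also accepts a -daemon flag, which detaches the process and sends its output to the logs/ directory under the Kafka installation:

# start the broker in the background instead of the foreground
bin/kafka-server-start.sh -daemon config/server.properties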


At this point the Kafka + ZooKeeper cluster is complete.
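
Before pointing Logstash at it, the cluster can be smoke-tested with the console tools that ship with Kafka. The commands below are only a sketch (the topic name matches the testnginx topic used later; the partition and replication counts are just an example), run from any broker's Kafka directory:

# create the topic across all three brokers (skip this if you let Logstash auto-create it)
bin/kafka-topics.sh --create --zookeeper 192.168.1.196:2181 --replication-factor 3 --partitions 3 --topic testnginx
# confirm the topic and its partition/replica assignment
bin/kafka-topics.sh --describe --zookeeper 192.168.1.196:2181 --topic testnginx
# type a test line here, then read it back from another shell with the consumer
bin/kafka-console-producer.sh --broker-list 192.168.1.196:9092 --topic testnginx
bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.196:9092 --topic testnginx --from-beginning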



3. Install and configure Logstash on 192.168.1.199


For the installation steps, refer to the first article, "ELK+Filebeat+Nginx Centralized Logging Solution (Part 1)". The Logstash instance on this host acts as the message consumer for the Kafka cluster: it pulls messages from Kafka and forwards them to the Elasticsearch cluster.


Create the configuration:

vim /etc/logstash/conf.d/kafka2elasticsearch.conf

Contents:
input {
  kafka {
    bootstrap_servers => "192.168.1.196:9092,192.168.1.197:9092,192.168.1.199:9092"
    group_id => "logstashnginx"
    topics => ["testnginx"]
    consumer_threads => 10
  }
}

filter {
  #if "nginx-accesslog" in [tags] {
    grok {
      match => { "message" => "%{HTTPDATE:timestamp}\|%{IP:remote_addr}\|%{IPORHOST:http_host}\|(?:%{DATA:http_x_forwarded_for}|-)\|%{DATA:request_method}\|%{DATA:request_uri}\|%{DATA:server_protocol}\|%{NUMBER:status}\|(?:%{NUMBER:body_bytes_sent}|-)\|(?:%{DATA:http_referer}|-)\|%{DATA:http_user_agent}\|(?:%{DATA:request_time}|-)\|" }
    }
    mutate {
      convert => ["status", "integer"]
      convert => ["body_bytes_sent", "integer"]
      convert => ["request_time", "float"]
    }
    geoip {
      source => "remote_addr"
    }
    date {
      match => ["timestamp", "dd/MMM/YYYY:HH:mm:ss Z"]
    }
    useragent {
      source => "http_user_agent"
    }
  #}
  #if "sys-messages" in [tags] {
  #  grok {
  #    match => { "message" => "%{SYSLOGLINE}" }
  #    add_field => ["received_at", "%{@timestamp}"]
  #    add_field => ["received_from", "%{host}"]
  #  }
  #  date {
  #    match => ["timestamp", "MMM d HH:mm:ss"]
  #  }
  #  ruby {
  #    code => "event['@timestamp'] = event['@timestamp'].getlocal"
  #  }
  #}
}

output {
  elasticsearch {
    hosts => ["192.168.1.200:9200", "192.168.1.201:9200"]
    index => "logstash-nginx-%{+YYYY.MM.dd}"
    #document_type => "%{type}"
    flush_size => 50000
    idle_flush_time => 10
  }
  #stdout { codec => rubydebug }
}


Then start Logstash:

nohup logstash -r -f kafka2elasticsearch.conf --path.settings /etc/logstash/ &
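
To confirm this consumer is really pulling from Kafka and writing to Elasticsearch, two quick checks can help. This is only a sketch, assuming curl is available and the first command is run from a broker's Kafka directory:

# the logstashnginx group should appear, with its offsets advancing and little lag
bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server 192.168.1.196:9092 --describe --group logstashnginx
# a logstash-nginx-* index should show up on the data nodes once events flow
curl -s '192.168.1.200:9200/_cat/indices?v' | grep logstash-nginx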



4. Configure Logstash on 192.168.1.196

Create the configuration:

vim /etc/logstash/conf.d/2kafka.conf

Contents:
input {
  beats {
    port => 5044
  }
}

output {
  #elasticsearch {
  #  hosts => ["192.168.1.198:9200"]
  #  index => "logstash-%{type}-%{+YYYY.MM.dd}"
  #  document_type => "%{type}"
  #}
  #stdout { codec => rubydebug }
  kafka {
    #workers => 3
    bootstrap_servers => "192.168.1.196:9092,192.168.1.197:9092,192.168.1.199:9092"
    topic_id => "testnginx"
  }
}


Then start Logstash:

nohup logstash -r -f 2kafka.conf --path.settings /etc/logstash/ &
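
Once Filebeat on 192.168.1.194/195 starts shipping, the events produced by this Logstash instance should appear in the testnginx topic. A quick way to confirm (again just a sketch, run from any broker's Kafka directory) is to tail the topic with the console consumer:

# prints a few of the raw messages Logstash wrote into the topic, then exits
bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.196:9092 --topic testnginx --max-messages 5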



That completes the whole cluster setup. The collected logs should now be visible in Kibana.


Follow-up articles are on the way; stay tuned.


IV. Unresolved Issues

  • During the experiment I found that the second-layer Logstash instances can see fields such as tags and document_type coming from Filebeat, but after the events pass through the Kafka + ZooKeeper cluster, the third-layer Logstash can no longer pull those fields out of Kafka. As a result, the filter stage cannot use different tags or document_type values to tell the log sources apart, so it cannot apply a different grok pattern per source. If anyone has a solution, please share it in the comments.


Reposted from: https://blog.51cto.com/zhengmingjing/1909835