1. 程式人生 > 其它 >企業級實戰模組三:ELK+Filebeat+Kafka+ZooKeeper構建大資料日誌分析平臺案例(下)

企業級實戰模組三:ELK+Filebeat+Kafka+ZooKeeper構建大資料日誌分析平臺案例(下)

企業級實戰模組三:ELK+Filebeat+Kafka+ZooKeeper構建大資料日誌分析平臺案例(下)

1 安裝並配置Kafka Broker叢集

1.1 下載與安裝Kafka

可以從kafka官網https://kafka.apache.org/downloads獲取kafka安裝包,將下載下來的安裝包直接解壓到一個路徑下即可完成kafka的安裝,這裡統一將kafka安裝到/usr/local目錄下,基本操作過程如下:

tar -zxvf kafka_2.11-2.2.2.tgz 

mv kafka_2.11-2.2.2 /usr/local/kafka

注意:file 是對Kafka有嚴格的版本限制,需要在file 官網檢視支援的版本

https://www.elastic.co/guide/en/beats/filebeat/7.15/kafka-output.html

1.2 配置kafka叢集

這裡將kafka安裝到/usr/local目錄下,因此,kafka的主配置檔案為/usr/local/kafka/config/server.properties,這裡以節點kafkazk1為例,重點介紹一些常用配置項的含義:

broker.id=1
listeners=PLAINTEXT://192.168.5.4:9092
log.dirs=/usr/local/kafka/logs
num.partitions=6
log.retention.hours=60
log.segment.bytes=1073741824
zookeeper.connect=192.168.5.4:2181,192.168.5.5:2181,192.168.5.6:2181
auto.create.topics.enable=true
delete.topic.enable=true

每個配置項含義如下:

  • broker.id:每一個broker在叢集中的唯一表示,要求是正數。當該伺服器的IP地址發生改變時,broker.id沒有變化,則不會影響consumers的訊息情況。

  • listeners:設定kafka的監聽地址與埠,可以將監聽地址設定為主機名或IP地址,這裡將監聽地址設定為IP地址。

  • log.dirs:這個引數用於配置kafka儲存資料的位置,kafka中所有的訊息都會存在這個目錄下。可以通過逗號來指定多個路徑, kafka會根據最少被使用的原則選擇目錄分配新的parition。需要注意的是,kafka在分配parition的時候選擇的規則不是按照磁碟的空間大小來定的,而是根據分配的 parition的個數多小而定。

  • num.partitions:這個引數用於設定新建立的topic有多少個分割槽,可以根據消費者實際情況配置,配置過小會影響消費效能。這裡配置6個。

  • log.retention.hours:這個引數用於配置kafka中訊息儲存的時間,還支援log.retention.minutes和

  • log.retention.ms配置項。這三個引數都會控制刪除過期資料的時間,推薦使用log.retention.ms。如果多個同時設定,那麼會選擇最小的那個。

  • log.segment.bytes:配置partition中每個segment資料檔案的大小,預設是1GB,超過這個大小會自動建立一個新的segment file。

  • zookeeper.connect:這個引數用於指定zookeeper所在的地址,它儲存了broker的元資訊。 這個值可以通過逗號設定多個值,每個值的格式均為:hostname:port/path,每個部分的含義如下:

    • hostname:表示zookeeper伺服器的主機名或者IP地址,這裡設定為IP地址。

    • port: 表示是zookeeper伺服器監聽連線的埠號。

    • /path:表示kafka在zookeeper上的根目錄。如果不設定,會使用根目錄。

  • auto.create.topics.enable:這個引數用於設定是否自動建立topic,如果請求一個topic時發現還沒有建立, kafka會在broker上自動建立一個topic,如果需要嚴格的控制topic的建立,那麼可以設定

  • auto.create.topics.enable為false,禁止自動建立topic。

  • delete.topic.enable:在0.8.2版本之後,Kafka提供了刪除topic的功能,但是預設並不會直接將topic資料物理刪除。如果要從物理上刪除(即刪除topic後,資料檔案也會一同刪除),就需要設定此配置項為true。

1.3 啟動kafka叢集

在啟動kafka叢集前,需要確保ZooKeeper叢集已經正常啟動。接著,依次在kafka各個節點上執行如下命令即可:

cd /usr/local/kafka && nohup bin/kafka-server-start.sh config/server.properties &

這裡將kafka放到後臺執行,啟動後,會在啟動kafka的當前目錄下生成一個nohup.out檔案,可通過此檔案檢視kafka的啟動和執行狀態。通過jps指令,可以看到有個Kafka標識,這是kafka程序成功啟動的標誌。

1.4 kafka叢集基本命令操作

kefka提供了多個命令用於檢視、建立、修改、刪除topic資訊,也可以通過命令測試如何生產訊息、消費訊息等,這些命令位於kafka安裝目錄的bin目錄下,這裡是/usr/local/kafka/bin。登入任意一臺kafka叢集節點,切換到此目錄下,即可進行命令操作。下面列舉kafka的一些常用命令的使用方法。

1)建立一個topic,並指定topic屬性(副本數、分割槽數等)

bin/kafka-topics.sh --create --zookeeper 192.168.5.4:2181,192.168.5.5:2181,192.168.5.6:2181 --replication-factor 1 --partitions 3 --topic mytopic

2)顯示topic列表

bin/kafka-topics.sh --zookeeper 192.168.5.4:2181,192.168.5.5:2181,192.168.5.6:2181 --list

3)檢視某個topic的狀態

bin/kafka-topics.sh --describe --zookeeper 192.168.5.4:2181,192.168.5.5:2181,192.168.5.6:2181 --topic mytopic

4)生產訊息

bin/kafka-console-producer.sh --broker-list 192.168.5.4:9092,192.168.5.5:9092,192.168.5.6:9092 --topice mytopic

5)消費訊息

bin/kafka-console-consumer.sh --bootstrap-server 192.168.5.4:9092,192.168.5.5:9092,192.168.5.6:9092 --topic mytopic --from-beginning

6)刪除topic

bin/kafka-topics.sh --zookeeper 192.168.5.4:2181,192.168.5.5:2181,192.168.5.6:2181 --delete --topic mytopic

2 安裝並配置Filebeat

2.1 為什麼要使用filebeat

Logstash功能雖然強大,但是它依賴java、在資料量大的時候,Logstash程序會消耗過多的系統資源,這將嚴重影響業務系統的效能,而filebeat就是一個完美的替代者,filebeat是Beat成員之一,基於Go語言,沒有任何依賴,配置檔案簡單,格式明瞭,同時,filebeat比logstash更加輕量級,所以佔用系統資源極少,非常適合安裝在生產機器上。

2.2 下載與安裝filebeat

由於filebeat基於go語言開發,無其他任何依賴,因而安裝非常簡單,可以從elastic官網https://www.elastic.co/downloads/beats/filebeat 獲取filebeat安裝包,將下載下來的安裝包直接解壓到一個路徑下即可完成filebeat的安裝。根據前面的規劃,將filebeat安裝到filebeatserver主機上,這裡設定將filebeat安裝到/usr/local目錄下,基本操作過程如下:

tar -zxvf filebeat-7.15.0-linux-x86_64.tar.gz

mv filebeat-7.15.0-linux-x86_64 /usr/local/filebeat

2.3 配置filebeat

filebeat的配置檔案目錄為/usr/local/filebeat/filebeat.yml,這裡僅列出常用的配置項,內容如下:

filebeat.inputs:
- type: log
enabled: true
paths:
- /var/log/messages
- /var/log/secure
fields:
log_topic: osmessages
name: "192.168.5.3"
output.kafka:
enabled: true
hosts: ["192.168.5.4:9092","192.168.5.5:9092","192.168.5.6:9092"]
# version: "2.2.2"
topic: '%{[fields][log_topic]}'
partition.round_robin:
reachable_only: true
worker: 2
required_acks: 1
compression: gzip
max_message_bytes: 10000000
logging.level: debug

配置項的含義介紹如下:

  • filebeat.inputs:用於定義資料原型。

  • type:指定資料的輸入型別,這裡是log,即日誌,是預設值,還可以指定為stdin,即標準輸入。

  • enabled: true:啟用手工配置filebeat,而不是採用模組方式配置filebeat。

  • paths:用於指定要監控的日誌檔案,可以指定一個完整路徑的檔案,也可以是一個模糊匹配格式,例如:

- /data/nginx/logs/nginx_*.log,該配置表示將獲取/data/nginx/logs目錄下的所有以.log結尾的檔案,注意這裡有個破折號“-”,要在paths配置項基礎上進行縮排,不然啟動filebeat會報錯,另外破折號前面不能有tab縮排,建議通過空格方式縮排。

- /var/log/*.log,該配置表示將獲取/var/log目錄的所有子目錄中以”.log”結尾的檔案,而不會去查詢/var/log目錄下以”.log”結尾的檔案。

  • name: 設定filebeat收集的日誌中對應主機的名字,如果配置為空,則使用該伺服器的主機名。這裡設定為IP,便於區分多臺主機的日誌資訊。

  • output.kafka:filebeat支援多種輸出,支援向kafka,logstash,elasticsearch輸出資料,這裡的設定是將資料輸出到kafka。

  • enabled:表明這個模組是啟動的。

  • host: 指定輸出資料到kafka叢集上,地址為kafka叢集IP加埠號。

  • topic:指定要傳送資料給kafka叢集的哪個topic,若指定的topic不存在,則會自動建立此topic。注意topic的寫法,在filebeat6.x之前版本是通過“%{[type]}”來自動獲取document_type配置項的值。而在filebeat6.x之後版本是通過'%{[fields][log_topic]}'來獲取日誌分類的。

  • logging.level:定義filebeat的日誌輸出級別,有critical、error、warning、info、debug五種級別可選,在除錯的時候可選擇debug模式。

2.4 啟動filebeat收集日誌

所有配置完成之後,就可以啟動filebeat,開啟收集日誌程序了,啟動方式如下:

cd /usr/local/filebeat

nohup ./filebeat -e -c filebeat.yml &

這樣,就把filebeat程序放到後臺執行起來了。啟動後,在當前目錄下會生成一個nohup.out檔案,可以檢視filebeat啟動日誌和執行狀態。

2.5 檢視訊息

登入到Kafka叢集節點,執行

bin/kafka-topics.sh --zookeeper 192.168.5.4:2181,192.168.5.5:2181,192.168.5.6:2181 --list

bin/kafka-console-consumer.sh --bootstrap-server 192.168.5.4:9092,192.168.5.5:9092,192.168.5.6:9092 --topic osmessages

2.5 filebeat輸出資訊格式解讀

這裡以作業系統中/var/log/secure檔案的日誌格式為例,選取一個SSH登入系統失敗的日誌,內容如下:

Oct  8 21:52:17 node-01 sshd[33361]: Failed password for root from 192.168.5.11 port 49424 ssh2

filebeat接收到/var/log/secure日誌後,會將上面日誌傳送到kafka叢集,在kafka任意一個節點上,消費輸出日誌內容如下:

{"@timestamp":"2021-10-08T13:55:06.457Z",
"@metadata":{"beat":"filebeat","type":"_doc","version":"7.15.0"},
"input":{"type":"log"},
"fields":{"log_topic":"osmessages"},
"host":{"name":"192.168.5.3"},
"agent":{"type":"filebeat","version":"7.15.0","hostname":"filebeat-server","ephemeral_id":"4255551e-797c-4700-94c8-a3e884f8d396","id":"caea7d53-4c44-46f9-a809-ce2d99baf85b","name":"192.168.5.3"},
"ecs":{"version":"1.11.0"},
"log":{"offset":8730,"file":{"path":"/var/log/secure"}},
"message":"Oct 8 21:55:05 node-01 sshd[35904]: Failed password for root from 192.168.5.12 port 57756 ssh2"}

3 安裝並配置Logstash服務

3.1 下載與安裝Logstash

可以從elastic官網https://www.elastic.co/downloads/logstash 獲取logstash安裝包,將下載下來的安裝包直接解壓到一個路徑下即可完成logstash的安裝。根據前面的規劃,將logstash安裝到logstashserver主機(172.16.213.120)上,這裡統一將logstash安裝到/usr/local目錄下,基本操作過程如下:

tar -zxvf logstash-7.15.0-linux-x86_64.tar.gz 

mv logstash-7.15.0 /usr/local/logstash

3.2 logstash 工作原理

Logstash是一個開源的、服務端的資料處理pipeline(管道),它可以接收多個源的資料、然後對它們進行轉換、最終將它們傳送到指定型別的目的地。Logstash是通過外掛機制實現各種功能的,可以在https://github.com/logstash-plugins 下載各種功能的外掛,也可以自行編寫外掛。

Logstash實現的功能主要分為接收資料、解析過濾並轉換資料、輸出資料三個部分,對應的外掛依次是input外掛、filter外掛、output外掛,其中,filter外掛是可選的,其它兩個是必須外掛。也就是說在一個完整的Logstash配置檔案中,必須有input外掛和output外掛。

3.3 配置logstash作為轉發節點

logstash是作為一個二級轉發節點使用的,也就是它將kafka作為資料接收源,然後將資料傳送到elasticsearch叢集中,按照這個需求,新建logstash事件配置檔案kafka_os_into_es.conf,內容如下:

input {
kafka {
bootstrap_servers => "192.168.5.4:9092,192.168.5.5:9092,192.168.5.6:9092"
topics => ["osmessages"]
}
}
output {
elasticsearch {
hosts => ["192.168.5.8:9200","192.168.5.9:9200","192.168.5.10:9200"]
index => " osmessageslog-%{+YYYY-MM-dd}"
}
}

3.4 啟動logstash轉發日誌

cd /usr/local/logstash/

nohup bin/logstash -f config/kafka_os_into_es.conf &

4 安裝並配置Kibana展示日誌資料

4.1 下載與安裝Kibana

kibana使用JavaScript語言編寫,安裝部署十分簡單,即下即用,可以從elastic官網https://www.elastic.co/cn/downloads/kibana 下載所需的版本,這裡需要注意的是Kibana與Elasticsearch的版本必須一致,另外,在安裝Kibana時,要確保Elasticsearch、Logstash和kafka已經安裝完畢。

將下載下來的安裝包直接解壓到一個路徑下即可完成kibana的安裝,根據前面的規劃,將kibana安裝到主機上,然後統一將kibana安裝到/usr/local目錄下,基本操作過程如下:

tar zxvf kibana-7.15.0-linux-x86_64.tar.gz

mv kibana-7.15.0-linux-x86_64 /usr/local/kibana

4.2 配置Kibana

由於將Kibana安裝到了/usr/local目錄下,因此,Kibana的配置檔案為/usr/local/kibana/kibana.yml,Kibana配置非常簡單,這裡僅列出常用的配置項,內容如下:

cat <<EOF>>/usr/local/kibana/config/kibana.yml
server.port: 5601
server.host: "0.0.0.0"
elasticsearch.hosts: ["http://192.168.5.8:9200"]
kibana.index: ".kibana"
i18n.locale: "zh-CN"
EOF

其中,每個配置項的含義介紹如下:

  • server.port:kibana繫結的監聽埠,預設是5601。

  • server.host:kibana繫結的IP地址,如果內網訪問,設定為內網地址即可。

  • elasticsearch.url:kibana訪問ElasticSearch的地址,如果是ElasticSearch叢集,新增任一叢集節點IP即可,官方推薦是設定為ElasticSearch叢集中client node角色的節點IP。

  • kibana.index:用於儲存kibana資料資訊的索引,這個可以在kibanaweb介面中看到。

4.3 啟動Kibana服務與web配置

所有配置完成後,就可以啟動kibana了,啟動kibana服務的命令在/usr/local/kibana/bin目錄下,執行如下命令啟動kibana服務:

cd /usr/local/kibana

nohup bin/kibana --allow-root &

5 除錯並驗證日誌資料流向

經過上面的配置過程,大資料日誌分析平臺已經基本構建完成,由於整個配置架構比較複雜,這裡來梳理下各個功能模組的資料和業務流向。

本文摘抄或總結其他筆記,筆記不涉及任何商業用途,如果侵權請及時聯絡處理