企業級實戰模組三:ELK+Filebeat+Kafka+ZooKeeper構建大資料日誌分析平臺案例(下)
1 安裝並配置Kafka Broker叢集
可以從kafka官網https://kafka.apache.org/downloads獲取kafka安裝包,將下載下來的安裝包直接解壓到一個路徑下即可完成kafka的安裝,這裡統一將kafka安裝到/usr/local目錄下,基本操作過程如下:
tar -zxvf kafka_2.11-2.2.2.tgz
mv kafka_2.11-2.2.2 /usr/local/kafka
注意:file 是對Kafka有嚴格的版本限制,需要在file 官網檢視支援的版本
1.2 配置kafka叢集
這裡將kafka安裝到/usr/local目錄下,因此,kafka的主配置檔案為/usr/local/kafka/config/server.properties,這裡以節點kafkazk1為例,重點介紹一些常用配置項的含義:
broker.id=1
listeners=PLAINTEXT://192.168.5.4:9092
log.dirs=/usr/local/kafka/logs
num.partitions=6
log.retention.hours=60
log.segment.bytes=1073741824
zookeeper.connect=192.168.5.4:2181,192.168.5.5:2181,192.168.5.6:2181
auto.create.topics.enable=true
delete.topic.enable=true
每個配置項含義如下:
-
broker.id:每一個broker在叢集中的唯一表示,要求是正數。當該伺服器的IP地址發生改變時,broker.id沒有變化,則不會影響consumers的訊息情況。
-
listeners:設定kafka的監聽地址與埠,可以將監聽地址設定為主機名或IP地址,這裡將監聽地址設定為IP地址。
-
log.dirs:這個引數用於配置kafka儲存資料的位置,kafka中所有的訊息都會存在這個目錄下。可以通過逗號來指定多個路徑, kafka會根據最少被使用的原則選擇目錄分配新的parition。需要注意的是,kafka在分配parition的時候選擇的規則不是按照磁碟的空間大小來定的,而是根據分配的 parition的個數多小而定。
-
num.partitions:這個引數用於設定新建立的topic有多少個分割槽,可以根據消費者實際情況配置,配置過小會影響消費效能。這裡配置6個。
-
log.retention.hours:這個引數用於配置kafka中訊息儲存的時間,還支援log.retention.minutes和
-
log.retention.ms配置項。這三個引數都會控制刪除過期資料的時間,推薦使用log.retention.ms。如果多個同時設定,那麼會選擇最小的那個。
-
log.segment.bytes:配置partition中每個segment資料檔案的大小,預設是1GB,超過這個大小會自動建立一個新的segment file。
-
zookeeper.connect:這個引數用於指定zookeeper所在的地址,它儲存了broker的元資訊。 這個值可以通過逗號設定多個值,每個值的格式均為:hostname:port/path,每個部分的含義如下:
-
hostname:表示zookeeper伺服器的主機名或者IP地址,這裡設定為IP地址。
-
port: 表示是zookeeper伺服器監聽連線的埠號。
-
/path:表示kafka在zookeeper上的根目錄。如果不設定,會使用根目錄。
-
-
auto.create.topics.enable:這個引數用於設定是否自動建立topic,如果請求一個topic時發現還沒有建立, kafka會在broker上自動建立一個topic,如果需要嚴格的控制topic的建立,那麼可以設定
-
auto.create.topics.enable為false,禁止自動建立topic。
-
delete.topic.enable:在0.8.2版本之後,Kafka提供了刪除topic的功能,但是預設並不會直接將topic資料物理刪除。如果要從物理上刪除(即刪除topic後,資料檔案也會一同刪除),就需要設定此配置項為true。
1.3 啟動kafka叢集
在啟動kafka叢集前,需要確保ZooKeeper叢集已經正常啟動。接著,依次在kafka各個節點上執行如下命令即可:
cd /usr/local/kafka && nohup bin/kafka-server-start.sh config/server.properties &
這裡將kafka放到後臺執行,啟動後,會在啟動kafka的當前目錄下生成一個nohup.out檔案,可通過此檔案檢視kafka的啟動和執行狀態。通過jps指令,可以看到有個Kafka標識,這是kafka程序成功啟動的標誌。
1.4 kafka叢集基本命令操作
kefka提供了多個命令用於檢視、建立、修改、刪除topic資訊,也可以通過命令測試如何生產訊息、消費訊息等,這些命令位於kafka安裝目錄的bin目錄下,這裡是/usr/local/kafka/bin。登入任意一臺kafka叢集節點,切換到此目錄下,即可進行命令操作。下面列舉kafka的一些常用命令的使用方法。
1)建立一個topic,並指定topic屬性(副本數、分割槽數等)
bin/kafka-topics.sh --create --zookeeper 192.168.5.4:2181,192.168.5.5:2181,192.168.5.6:2181 --replication-factor 1 --partitions 3 --topic mytopic
2)顯示topic列表
bin/kafka-topics.sh --zookeeper 192.168.5.4:2181,192.168.5.5:2181,192.168.5.6:2181 --list
3)檢視某個topic的狀態
bin/kafka-topics.sh --describe --zookeeper 192.168.5.4:2181,192.168.5.5:2181,192.168.5.6:2181 --topic mytopic
4)生產訊息
bin/kafka-console-producer.sh --broker-list 192.168.5.4:9092,192.168.5.5:9092,192.168.5.6:9092 --topice mytopic
5)消費訊息
bin/kafka-console-consumer.sh --bootstrap-server 192.168.5.4:9092,192.168.5.5:9092,192.168.5.6:9092 --topic mytopic --from-beginning
6)刪除topic
bin/kafka-topics.sh --zookeeper 192.168.5.4:2181,192.168.5.5:2181,192.168.5.6:2181 --delete --topic mytopic
2 安裝並配置Filebeat
2.1 為什麼要使用filebeat
Logstash功能雖然強大,但是它依賴java、在資料量大的時候,Logstash程序會消耗過多的系統資源,這將嚴重影響業務系統的效能,而filebeat就是一個完美的替代者,filebeat是Beat成員之一,基於Go語言,沒有任何依賴,配置檔案簡單,格式明瞭,同時,filebeat比logstash更加輕量級,所以佔用系統資源極少,非常適合安裝在生產機器上。
2.2 下載與安裝filebeat
由於filebeat基於go語言開發,無其他任何依賴,因而安裝非常簡單,可以從elastic官網https://www.elastic.co/downloads/beats/filebeat 獲取filebeat安裝包,將下載下來的安裝包直接解壓到一個路徑下即可完成filebeat的安裝。根據前面的規劃,將filebeat安裝到filebeatserver主機上,這裡設定將filebeat安裝到/usr/local目錄下,基本操作過程如下:
tar -zxvf filebeat-7.15.0-linux-x86_64.tar.gz
mv filebeat-7.15.0-linux-x86_64 /usr/local/filebeat
2.3 配置filebeat
filebeat的配置檔案目錄為/usr/local/filebeat/filebeat.yml,這裡僅列出常用的配置項,內容如下:
filebeat.inputs:
- type: log
enabled: true
paths:
- /var/log/messages
- /var/log/secure
fields:
log_topic: osmessages
name: "192.168.5.3"
output.kafka:
enabled: true
hosts: ["192.168.5.4:9092","192.168.5.5:9092","192.168.5.6:9092"]
# version: "2.2.2"
topic: '%{[fields][log_topic]}'
partition.round_robin:
reachable_only: true
worker: 2
required_acks: 1
compression: gzip
max_message_bytes: 10000000
logging.level: debug
配置項的含義介紹如下:
-
filebeat.inputs:用於定義資料原型。
-
type:指定資料的輸入型別,這裡是log,即日誌,是預設值,還可以指定為stdin,即標準輸入。
-
enabled: true:啟用手工配置filebeat,而不是採用模組方式配置filebeat。
-
paths:用於指定要監控的日誌檔案,可以指定一個完整路徑的檔案,也可以是一個模糊匹配格式,例如:
- /data/nginx/logs/nginx_*.log,該配置表示將獲取/data/nginx/logs目錄下的所有以.log結尾的檔案,注意這裡有個破折號“-”,要在paths配置項基礎上進行縮排,不然啟動filebeat會報錯,另外破折號前面不能有tab縮排,建議通過空格方式縮排。
- /var/log/*.log,該配置表示將獲取/var/log目錄的所有子目錄中以”.log”結尾的檔案,而不會去查詢/var/log目錄下以”.log”結尾的檔案。
-
name: 設定filebeat收集的日誌中對應主機的名字,如果配置為空,則使用該伺服器的主機名。這裡設定為IP,便於區分多臺主機的日誌資訊。
-
output.kafka:filebeat支援多種輸出,支援向kafka,logstash,elasticsearch輸出資料,這裡的設定是將資料輸出到kafka。
-
enabled:表明這個模組是啟動的。
-
host: 指定輸出資料到kafka叢集上,地址為kafka叢集IP加埠號。
-
topic:指定要傳送資料給kafka叢集的哪個topic,若指定的topic不存在,則會自動建立此topic。注意topic的寫法,在filebeat6.x之前版本是通過“%{[type]}”來自動獲取document_type配置項的值。而在filebeat6.x之後版本是通過'%{[fields][log_topic]}'來獲取日誌分類的。
-
logging.level:定義filebeat的日誌輸出級別,有critical、error、warning、info、debug五種級別可選,在除錯的時候可選擇debug模式。
2.4 啟動filebeat收集日誌
所有配置完成之後,就可以啟動filebeat,開啟收集日誌程序了,啟動方式如下:
cd /usr/local/filebeat
nohup ./filebeat -e -c filebeat.yml &
這樣,就把filebeat程序放到後臺執行起來了。啟動後,在當前目錄下會生成一個nohup.out檔案,可以檢視filebeat啟動日誌和執行狀態。
2.5 檢視訊息
登入到Kafka叢集節點,執行
bin/kafka-topics.sh --zookeeper 192.168.5.4:2181,192.168.5.5:2181,192.168.5.6:2181 --list
bin/kafka-console-consumer.sh --bootstrap-server 192.168.5.4:9092,192.168.5.5:9092,192.168.5.6:9092 --topic osmessages
2.5 filebeat輸出資訊格式解讀
這裡以作業系統中/var/log/secure檔案的日誌格式為例,選取一個SSH登入系統失敗的日誌,內容如下:
Oct 8 21:52:17 node-01 sshd[33361]: Failed password for root from 192.168.5.11 port 49424 ssh2
filebeat接收到/var/log/secure日誌後,會將上面日誌傳送到kafka叢集,在kafka任意一個節點上,消費輸出日誌內容如下:
{"@timestamp":"2021-10-08T13:55:06.457Z",
"@metadata":{"beat":"filebeat","type":"_doc","version":"7.15.0"},
"input":{"type":"log"},
"fields":{"log_topic":"osmessages"},
"host":{"name":"192.168.5.3"},
"agent":{"type":"filebeat","version":"7.15.0","hostname":"filebeat-server","ephemeral_id":"4255551e-797c-4700-94c8-a3e884f8d396","id":"caea7d53-4c44-46f9-a809-ce2d99baf85b","name":"192.168.5.3"},
"ecs":{"version":"1.11.0"},
"log":{"offset":8730,"file":{"path":"/var/log/secure"}},
"message":"Oct 8 21:55:05 node-01 sshd[35904]: Failed password for root from 192.168.5.12 port 57756 ssh2"}
3 安裝並配置Logstash服務
3.1 下載與安裝Logstash
可以從elastic官網https://www.elastic.co/downloads/logstash 獲取logstash安裝包,將下載下來的安裝包直接解壓到一個路徑下即可完成logstash的安裝。根據前面的規劃,將logstash安裝到logstashserver主機(172.16.213.120)上,這裡統一將logstash安裝到/usr/local目錄下,基本操作過程如下:
tar -zxvf logstash-7.15.0-linux-x86_64.tar.gz
mv logstash-7.15.0 /usr/local/logstash
3.2 logstash 工作原理
Logstash是一個開源的、服務端的資料處理pipeline(管道),它可以接收多個源的資料、然後對它們進行轉換、最終將它們傳送到指定型別的目的地。Logstash是通過外掛機制實現各種功能的,可以在https://github.com/logstash-plugins 下載各種功能的外掛,也可以自行編寫外掛。
Logstash實現的功能主要分為接收資料、解析過濾並轉換資料、輸出資料三個部分,對應的外掛依次是input外掛、filter外掛、output外掛,其中,filter外掛是可選的,其它兩個是必須外掛。也就是說在一個完整的Logstash配置檔案中,必須有input外掛和output外掛。
3.3 配置logstash作為轉發節點
logstash是作為一個二級轉發節點使用的,也就是它將kafka作為資料接收源,然後將資料傳送到elasticsearch叢集中,按照這個需求,新建logstash事件配置檔案kafka_os_into_es.conf,內容如下:
input {
kafka {
bootstrap_servers => "192.168.5.4:9092,192.168.5.5:9092,192.168.5.6:9092"
topics => ["osmessages"]
}
}
output {
elasticsearch {
hosts => ["192.168.5.8:9200","192.168.5.9:9200","192.168.5.10:9200"]
index => " osmessageslog-%{+YYYY-MM-dd}"
}
}
3.4 啟動logstash轉發日誌
cd /usr/local/logstash/
nohup bin/logstash -f config/kafka_os_into_es.conf &
4 安裝並配置Kibana展示日誌資料
4.1 下載與安裝Kibana
kibana使用JavaScript語言編寫,安裝部署十分簡單,即下即用,可以從elastic官網https://www.elastic.co/cn/downloads/kibana 下載所需的版本,這裡需要注意的是Kibana與Elasticsearch的版本必須一致,另外,在安裝Kibana時,要確保Elasticsearch、Logstash和kafka已經安裝完畢。
將下載下來的安裝包直接解壓到一個路徑下即可完成kibana的安裝,根據前面的規劃,將kibana安裝到主機上,然後統一將kibana安裝到/usr/local目錄下,基本操作過程如下:
tar zxvf kibana-7.15.0-linux-x86_64.tar.gz
mv kibana-7.15.0-linux-x86_64 /usr/local/kibana
4.2 配置Kibana
由於將Kibana安裝到了/usr/local目錄下,因此,Kibana的配置檔案為/usr/local/kibana/kibana.yml,Kibana配置非常簡單,這裡僅列出常用的配置項,內容如下:
cat <<EOF>>/usr/local/kibana/config/kibana.yml
server.port: 5601
server.host: "0.0.0.0"
elasticsearch.hosts: ["http://192.168.5.8:9200"]
kibana.index: ".kibana"
i18n.locale: "zh-CN"
EOF
其中,每個配置項的含義介紹如下:
-
server.port:kibana繫結的監聽埠,預設是5601。
-
server.host:kibana繫結的IP地址,如果內網訪問,設定為內網地址即可。
-
elasticsearch.url:kibana訪問ElasticSearch的地址,如果是ElasticSearch叢集,新增任一叢集節點IP即可,官方推薦是設定為ElasticSearch叢集中client node角色的節點IP。
-
kibana.index:用於儲存kibana資料資訊的索引,這個可以在kibanaweb介面中看到。
4.3 啟動Kibana服務與web配置
所有配置完成後,就可以啟動kibana了,啟動kibana服務的命令在/usr/local/kibana/bin目錄下,執行如下命令啟動kibana服務:
cd /usr/local/kibana
nohup bin/kibana --allow-root &
5 除錯並驗證日誌資料流向
經過上面的配置過程,大資料日誌分析平臺已經基本構建完成,由於整個配置架構比較複雜,這裡來梳理下各個功能模組的資料和業務流向。
本文摘抄或總結其他筆記,筆記不涉及任何商業用途,如果侵權請及時聯絡處理