
Rsyslog + Kafka + ELK (Cluster) Deployment

Our company is currently going through MLPS (等保) compliance, which requires log auditing, so the initial plan is to use rsyslog to collect the logs of all servers. Since we also need to search logs in day-to-day operations, the logs collected by rsyslog should be queryable through a single unified interface.
Log types collected: system logs, MySQL logs, firewall, WAF and F5 logs, SFTP and SMTP logs, etc.
Open-source products used: Rsyslog, Kafka, ELK
Processing flow: VM Rsyslog --> Rsyslog Server --omkafka--> Kafka --> Logstash --> Elasticsearch --> Kibana
Note: the omkafka module is only available in rsyslog v8.7.0 and later.
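A quick way to confirm the installed rsyslog is new enough (the first line of rsyslogd -v reports the version):

~]# rsyslogd -v | head -1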

Environment:

Server          IPADDR        Application
ELK Node1       10.10.27.125  zookeeper kafka elasticsearch logstash kibana
ELK Node2       10.10.27.126  zookeeper kafka elasticsearch logstash
ELK Node3       10.10.27.127  zookeeper kafka elasticsearch logstash
Rsyslog Server  10.10.27.121  Rsyslog server
Rsyslog Node    10.10.27.122  Rsyslog client

1. Install Docker and docker-compose

This deployment uses RHEL, installing Docker from Red Hat's extras repository.

yum install -y docker
wget https://github.com/docker/compose/releases/download/1.25.5/docker-compose-Linux-x86_64
mv docker-compose-Linux-x86_64 /usr/bin/docker-compose
chmod +x /usr/bin/docker-compose
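Before pulling images, it is worth confirming that the daemon is running and the compose binary responds; a minimal check, assuming the service is named docker:

~]# systemctl enable --now docker
~]# docker-compose --version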

2. Pull the required images

docker pull zookeeper:3.4.13
docker pull wurstmeister/kafka
docker pull elasticsearch:7.7.0
docker pull daocloud.io/library/kibana:7.7.0
docker pull daocloud.io/library/logstash:7.7.0
docker tag wurstmeister/kafka:latest kafka:2.12-2.5.0
docker tag docker.io/zookeeper:3.4.13 zookeeper:3.4.13
docker tag daocloud.io/library/kibana:7.7.0 kibana:7.7.0
docker tag daocloud.io/library/logstash:7.7.0 logstash:7.7.0

3. Prepare the application configuration files

mkdir -p /data/zookeeper
mkdir -p /data/kafka
mkdir -p /data/logstash/conf
mkdir -p /data/es/conf 
mkdir -p /data/es/data
chmod 777 /data/es/data
mkdir -p /data/kibana/conf

~]# cat /data/es/conf/elasticsearch.yml 
cluster.name: es-cluster
network.host: 0.0.0.0
node.name: master1    # change node.name on each node, e.g. master2, master3
http.cors.enabled: true
http.cors.allow-origin: "*"
node.master: true
node.data: true
network.publish_host: 10.10.27.125      # change to this node's own IP address
discovery.zen.minimum_master_nodes: 1
discovery.zen.ping.unicast.hosts: ["10.10.27.125","10.10.27.126","10.10.27.127"]
cluster.initial_master_nodes: ["10.10.27.125","10.10.27.126","10.10.27.127"]

# elasticsearch will report errors during startup, so apply the following settings in advance
~]# vim /etc/sysctl.conf
vm.max_map_count=655350
~]# sysctl -p
~]# cat /etc/security/limits.conf
*	-	nofile	100000
*	-	fsize	unlimited
*	-	nproc	100000
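A quick check that both settings took effect (the nofile/nproc limits only apply to new login sessions):

~]# sysctl vm.max_map_count
~]# ulimit -n    # run in a fresh session; should report 100000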


~]# cat /data/logstash/conf/logstash.conf 
input{
  kafka{
    topics => ["system-log"]   # must match the topic that rsyslog's omkafka action produces to (see step 7)
    bootstrap_servers => ["10.10.27.125:9092,10.10.27.126:9092,10.10.27.127:9092"]
  }
}
filter{
  grok{
    match =>{ 
      "message" => "%{SYSLOGTIMESTAMP:timestamp} %{IP:ip} %{DATA:syslog_program} %{GREEDYDATA:message}"
    }
    overwrite => ["message"]
  }
  date {
    match => [ "syslog_timestamp", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ]
  }
}

output{
  elasticsearch{
    hosts => ["10.10.27.125:9200","10.10.27.126:9200","10.10.27.127:9200"]
    index => "system-log-%{+YYYY.MM.dd}"
  }
  stdout{
    codec => rubydebug
  }
}
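Before wiring this pipeline into docker-compose, its syntax can be verified with Logstash's built-in config test; a sketch using the image pulled earlier (the in-container path /tmp/logstash.conf is arbitrary):

~]# docker run --rm -v /data/logstash/conf/logstash.conf:/tmp/logstash.conf \
     logstash:7.7.0 logstash --config.test_and_exit -f /tmp/logstash.conf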


~]# cat /data/logstash/conf/logstash.yml
http.host: "0.0.0.0"
xpack.monitoring.elasticsearch.hosts: [ "http://10.10.27.125:9200","http://10.10.27.126:9200","http://10.10.27.127:9200" ]

~]# cat /data/kibana/conf/kibana.yml 
#
# ** THIS IS AN AUTO-GENERATED FILE **
#

# Default Kibana configuration for docker target
server.name: kibana
server.host: "0.0.0.0"
elasticsearch.hosts: [ "http://10.10.27.125:9200","http://10.10.27.126:9200","http://10.10.27.127:9200" ]
monitoring.ui.container.elasticsearch.enabled: true

4. Edit the docker-compose.yml configuration

~]# mkdir /data/elk
~]# cat /data/elk/docker-compose.yml
version: '2.1'   # must be 2.1 or later, otherwise compose reports a version format error
services:
  elasticsearch:
    image: elasticsearch:7.7.0
    container_name: elasticsearch
    environment:
      ES_JAVA_OPTS: -Xms1g -Xmx1g
    network_mode: host
    volumes:
      - /data/es/conf/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml
      - /data/es/data:/usr/share/elasticsearch/data
    logging:
      driver: json-file
    ports:
      - 9200:9200
  kibana:
    image: kibana:7.7.0
    container_name: kibana
    links:
      - elasticsearch
    depends_on:
      - elasticsearch   # start kibana only after elasticsearch is up
    volumes:
      - /data/kibana/conf/kibana.yml:/usr/share/kibana/config/kibana.yml
    logging:
      driver: json-file
    ports:
      - 5601:5601
  logstash:
    image: logstash:7.7.0
    container_name: logstash
    volumes:
      - /data/logstash/conf/logstash.conf:/usr/share/logstash/pipeline/logstash.conf
      - /data/logstash/conf/logstash.yml:/usr/share/logstash/config/logstash.yml
    depends_on:
      - elasticsearch
    links:
      - elasticsearch:es
    logging:
      driver: json-file
    ports:
      - 4560:4560

  zookeeper:
    image: zookeeper:3.4.13
    container_name: zookeeper
    environment:
      ZOO_PORT: 2181
      ZOO_DATA_DIR: /data/zookeeper/data
      ZOO_DATA_LOG_DIR: /data/zookeeper/logs
      ZOO_MY_ID: 1    # for a three-node cluster, change this ID on the other two nodes, e.g. 2, 3
      ZOO_SERVERS: "server.1=10.10.27.125:2888:3888 server.2=10.10.27.126:2888:3888 server.3=10.10.27.127:2888:3888"
    volumes:
      - /data/zookeeper:/data/zookeeper
    network_mode: host
    logging:
      driver: json-file
    ports:
      - 2181:2181
  
  kafka:
    image: kafka:2.12-2.5.0
    container_name: kafka
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1  # kafka broker ID; must be unique per node, so change it on the other two nodes
      KAFKA_PORT: 9092
      KAFKA_HEAP_OPTS: "-Xms1g -Xmx1g"
      KAFKA_HOST_NAME: 10.10.27.125        # change to this node's own IP on the other two nodes
      KAFKA_ADVERTISED_HOST_NAME: 10.10.27.125    # change to this node's own IP on the other two nodes
      KAFKA_LOG_DIRS: /data/kafka
      KAFKA_ZOOKEEPER_CONNECT: 10.10.27.125:2181,10.10.27.126:2181,10.10.27.127:2181
    network_mode: host
    volumes:
      - /data:/data
    logging:
      driver: json-file
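docker-compose can validate the file before anything is started; -q suppresses the rendered output and only reports errors:

~]# docker-compose -f /data/elk/docker-compose.yml config -q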

5. Deploy ELK

# start the deployment (edit the config files and docker-compose.yml on each of the three nodes first)
~]# cd /data/elk
~]# docker-compose up -d
# stop the running containers
~]# docker-compose stop
# start a single container on its own
~]# docker-compose up -d kafka

6. Verify the cluster status

(1) Verify ZooKeeper:

]# docker exec -it zookeeper bash
bash-4.4# zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /conf/zoo.cfg
Mode: follower

(2) Verify Kafka:

]# docker exec -it kafka bash
bash-4.4# kafka-topics.sh --list --zookeeper 10.10.27.125:2181
__consumer_offsets
system-log
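Listing the topic only proves it exists; to confirm messages are actually arriving, the console consumer bundled with Kafka can be used (Ctrl-C to stop):

bash-4.4# kafka-console-consumer.sh --bootstrap-server 10.10.27.125:9092 \
    --topic system-log --from-beginning --max-messages 5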

(3) Verify Elasticsearch:

]# curl '10.10.27.125:9200/_cat/nodes?v'
ip            heap.percent ram.percent cpu load_1m load_5m load_15m node.role master name
10.10.27.126           57          81   0    0.37    0.15     0.09 dilmrt    *      master2
10.10.27.125           34          83   0    0.11    0.10     0.06 dilmrt    -      master1
10.10.27.127           24          81   0    0.03    0.06     0.06 dilmrt    -      master3
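Once logs are flowing end to end, the daily indices written by Logstash should also show up:

~]# curl '10.10.27.125:9200/_cat/indices?v' | grep system-log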

(4) Verify Kibana:

Open http://10.10.27.125:5601 in a browser.

7. Deploy rsyslog to ship logs into ELK

(1) Rsyslog server

~]# cat /etc/rsyslog.conf 
# Provides UDP syslog reception
$ModLoad imudp
$UDPServerRun 514

# Provides TCP syslog reception
$ModLoad imtcp
$InputTCPServerRun 514

~]# cat /etc/rsyslog.d/default.conf
#### GLOBAL DIRECTIVES ####
# Use a custom log format (the second $ActionFileDefaultTemplate below overrides the default)
$ActionFileDefaultTemplate RSYSLOG_TraditionalFileFormat
$template myFormat,"%timestamp% %fromhost-ip% %syslogtag% %msg%\n"
$ActionFileDefaultTemplate myFormat

# Store each host's logs in its own directory keyed by client IP; the /data/rsyslog directory needs to be created manually
$template RemoteLogs,"/data/rsyslog/%fromhost-ip%/%fromhost-ip%_%$YEAR%-%$MONTH%-%$DAY%.log"
# Skip the local host's own logs; only record logs from remote hosts
:fromhost-ip, !isequal, "127.0.0.1" ?RemoteLogs
~]# systemctl restart rsyslog
~]# tree /data/rsyslog
/data/rsyslog/
├── 10.10.27.122
│   ├── 10.10.27.122_2020-06-27.log
│   ├── 10.10.27.122_2020-06-28.log
│   ├── 10.10.27.122_2020-06-29.log
│   ├── 10.10.27.122_2020-06-30.log
│   ├── 10.10.27.122_2020-07-01.log
│   └── 10.10.27.122_2020-07-02.log
├── 10.10.27.123
│   ├── 10.10.27.123_2020-06-27.log
│   ├── 10.10.27.123_2020-06-28.log
│   ├── 10.10.27.123_2020-06-29.log
│   ├── 10.10.27.123_2020-06-30.log
│   ├── 10.10.27.123_2020-07-01.log
│   ├── 10.10.27.123_2020-07-02.log
│   └── 10.10.27.123_2020-07-03.log
├── 10.10.27.125
│   ├── 10.10.27.125_2020-07-02.log
│   └── 10.10.27.125_2020-07-03.log
├── 10.10.27.126
│   ├── 10.10.27.126_2020-07-02.log
│   └── 10.10.27.126_2020-07-03.log
└── 10.10.27.127
    └── 10.10.27.127_2020-07-02.log
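A test entry can be generated from any client without waiting for real traffic; a minimal sketch using util-linux logger (the message text is arbitrary, and the -n/-P/-d options require a reasonably recent logger):

client ~]# logger -n 10.10.27.121 -P 514 -d "rsyslog-remote-test"
server ~]# grep -r "rsyslog-remote-test" /data/rsyslog/ | tail -1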

Because the template above puts all of a host's collected logs into a single file, which is inconvenient to browse, the template below can be used instead: it splits the logs into separate message and secure files and stores them in per-month directories.

~]# cat /etc/rsyslog.d/default.conf
#### GLOBAL DIRECTIVES ####
# Use a custom log format (the second $ActionFileDefaultTemplate below overrides the default)
$ActionFileDefaultTemplate RSYSLOG_TraditionalFileFormat
$template myFormat,"%timestamp% %fromhost-ip% %syslogtag% %msg%\n"
$ActionFileDefaultTemplate myFormat

# Store each host's logs in its own directory keyed by client IP; the directories need to be created manually
$template RemoteMessageLogs,"/data/system_rsyslog/%fromhost-ip%_%HOSTNAME%/%$YEAR%-%$MONTH%/message_%$YEAR%-%$MONTH%-%$DAY%.log"
$template RemoteSecureLogs,"/data/system_rsyslog/%fromhost-ip%_%HOSTNAME%/%$YEAR%-%$MONTH%/secure_%$YEAR%-%$MONTH%-%$DAY%.log"
$template RemoteMailLogs,"/data/system_rsyslog/%fromhost-ip%_%HOSTNAME%/%$YEAR%-%$MONTH%/mail_%$YEAR%-%$MONTH%-%$DAY%.log"
# Switch and firewall logs go to their own directories
$template NetworkLogs,"/data/Network_rsyslog/%fromhost-ip%/%$YEAR%-%$MONTH%/%fromhost-ip%_%$YEAR%-%$MONTH%-%$DAY%.log"
$template FirewallLogs,"/data/Network_rsyslog/%fromhost-ip%_%HOSTNAME%/%$YEAR%-%$MONTH%/%fromhost-ip%_%$YEAR%-%$MONTH%-%$DAY%.log"

if prifilt("mail.*") then {
    :fromhost-ip, startswith, "10.10." ?RemoteMailLogs
    :fromhost-ip, isequal, "127.0.0.1" ?RemoteMailLogs
}
if prifilt("authpriv.*") then {
    :fromhost-ip, startswith, "10.10." ?RemoteSecureLogs
    :fromhost-ip, isequal, "127.0.0.1" ?RemoteSecureLogs
}
if prifilt("*.info;mail.none;authpriv.none") then {
    :fromhost-ip, startswith, "10.10." ?RemoteMessageLogs
    :fromhost-ip, isequal, "127.0.0.1" ?RemoteMessageLogs
}
if prifilt("*.*") then {
    :fromhost-ip, startswith, "172.16.10." ?NetworkLogs
    :fromhost-ip, startswith, "10.110." ?FirewallkLogs
}
& ~   # discard everything else; otherwise unmatched logs would also be written to /var/log/messages

~]# systemctl restart rsyslog
~]# tree /data/
/data/
├── Network_rsyslog
│   ├── 10.110.100.10_PA-3050-A
│   │   └── 2020-07
│   │       ├── 10.110.3.130_2020-07-08.log
│   │       ├── 10.110.3.130_2020-07-09.log
│   │       └── 10.110.3.130_2020-07-10.log
│   ├── 172.16.100.13
│   │   └── 2020-07
│   │       ├── 172.16.10.133_2020-07-08.log
│   │       ├── 172.16.10.133_2020-07-09.log
│   │       └── 172.16.10.133_2020-07-10.log
├── system_rsyslog
│   ├── 10.10.24.103_dpsvqaosch01
│   │   └── 2020-07
│   │       ├── message_2020-07-09.log
│   │       ├── message_2020-07-10.log
│   │       ├── secure_2020-07-09.log
│   │       └── secure_2020-07-10.log
│   ├── 10.10.24.104_dpsvqaosch02
│   │   └── 2020-07
│   │       ├── message_2020-07-08.log
│   │       ├── message_2020-07-09.log
│   │       ├── message_2020-07-10.log
│   │       └── secure_2020-07-08.log
# purge logs older than 180 days on a schedule
~]# crontab -e
0 6 1 * * /bin/find /data/*rsyslog/*/* -type d -mtime +180 | xargs rm -rf
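Since the job ends in rm -rf, it is safer to dry-run the find once and inspect the matches before installing the cron entry:

~]# /bin/find /data/*rsyslog/*/* -type d -mtime +180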

To feed the logs collected by the rsyslog server into ELK, the omkafka module has to be used on the rsyslog server.

~]# yum -y install rsyslog-kafka
~]# cat /etc/rsyslog.d/kafka.conf
# load the omkafka and imfile modules
module(load="omkafka")
module(load="imfile")
 
# syslog template
template(name="SystemlogTemplate" type="string" string="%msg%\n")
 
# ruleset
ruleset(name="systemlog-kafka") {
    # forward the logs to kafka
    action(
        type="omkafka"
        template="SystemlogTemplate"
        topic="system-log"
        broker="10.10.27.125:9092,10.10.27.126:9092,10.10.27.127:9092"
    )
}

input(type="imfile" Tag="Systemlog" File="/data/*rsyslog/*/*/*.log" Ruleset="systemlog-kafka"

~]# systemctl restart rsyslog
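rsyslog can also validate the whole configuration, kafka.conf included, without a restart; handy after every edit:

~]# rsyslogd -N1    # level-1 config check; prints errors and exits non-zero on bad syntax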

(2) Rsyslog client

~]# cat /etc/rsyslog.conf    # append this single line
*.*	@10.10.27.121:514
# a single @ forwards all logs to the rsyslog server over UDP (@@ would use TCP)
~]# systemctl restart rsyslog
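A final end-to-end check: emit a marker message on the client, then confirm it reaches the server's files and the Elasticsearch index (the marker string is arbitrary):

client ~]# logger "elk-pipeline-test"
server ~]# grep -r "elk-pipeline-test" /data/*rsyslog/ | tail -1
server ~]# curl -s '10.10.27.125:9200/system-log-*/_search?q=elk-pipeline-test&pretty' | grep -m1 total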

At this point rsyslog is fully set up. Verify that log files are being written under /data/rsyslog, that the corresponding index appears in ELK, and that the log contents are visible in Kibana once the index pattern has been created.