Upgrading Logstash 5.2.x to 6.5.x
Chapter 1: Environment
The existing architecture is ELK + Kafka + Filebeat, with all ELK components at version 5.2.x.
# rpm -qa | grep logstash
logstash-5.2.2-1.noarch
# java -version
java version "1.8.0_181"
Java(TM) SE Runtime Environment (build 1.8.0_181-b13)
Java HotSpot(TM) 64-Bit Server VM (build 25.181-b13, mixed mode)
Logstash 5.x does not support independent pipelines: everything has to run in a single pipeline, which requires a lot of if-else conditionals and makes the configuration files cumbersome to manage. The multiple-pipelines feature added in 6.x keeps each pipeline's configuration independent, so the logs of each business/log type can be managed separately.
Only the Logstash component is upgraded here; it was verified to work with Elasticsearch 5.x without any problems.
Chapter 2: Debugging the configuration files (using nginx logs as an example)
2.1 Main configuration file:
cat /etc/logstash/pipelines.yml
- pipeline.id: nginx_access
path.config: "/etc/logstash/conf.d/nginx_access.yml"
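Each additional log type then gets its own entry in pipelines.yml. As a sketch, a hypothetical nginx_error pipeline (not part of this deployment) would be added alongside the existing one like this:
# hypothetical second pipeline for another log type
- pipeline.id: nginx_error
  path.config: "/etc/logstash/conf.d/nginx_error.yml"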
2.2 Pipeline file
cat /etc/logstash/conf.d/nginx_access.yml
input {
  kafka {
    bootstrap_servers => "127.0.0.1:9020"
    group_id          => "logstash"
    consumer_threads  => 5
    topics            => ["nginx_access"]
    codec             => "json"
  }
}
filter {
  # WPT_NGX_COMM is a custom pattern loaded from /etc/logstash/patterns.d/
  grok {
    patterns_dir => [ "/etc/logstash/patterns.d/" ]
    match => {
      "message" => ["%{WPT_NGX_COMM}"]
    }
  }
  mutate {
    # split "request" on "?" into path and query string
    split => ["request", "?"]
    add_field => {
      "uri_path"  => "%{[request][0]}"
      "uri_query" => "%{[request][1]}"
    }
    remove_field => ["request"]
    convert => {
      "response"               => "integer"
      "body_bytes_sent"        => "integer"
      "request_time"           => "float"
      "upstream_response_time" => "float"
    }
  }
  useragent {
    source => "user_agent"
    lru_cache_size => 5000
  }
  date {
    timezone => "Asia/Shanghai"
    # note: yyyy (year), not YYYY (week-year), to avoid wrong dates around New Year
    match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z", "UNIX", "yyyy-MM-dd HH:mm:ss", "dd-MMM-yyyy HH:mm:ss" ]
    target => "@timestamp"
    remove_field => "timestamp"
  }
}
output {
  elasticsearch {
    hosts => ["http://127.0.0.1:9200"]
    index => "%{type}-%{+YYYY.MM.dd}"
  }
}
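The %{WPT_NGX_COMM} pattern above is a custom grok pattern whose real definition is specific to this deployment and is not shown here. For illustration only: a patterns.d file holds one NAME REGEX definition per line, so a simplified, hypothetical stand-in covering some of the fields used above might look like:
# /etc/logstash/patterns.d/nginx -- hypothetical, simplified stand-in for the real WPT_NGX_COMM
WPT_NGX_COMM %{IPORHOST:remote_addr} - %{USERNAME:remote_user} \[%{HTTPDATE:timestamp}\] "%{WORD:method} %{NOTSPACE:request} HTTP/%{NUMBER:http_version}" %{NUMBER:response} %{NUMBER:body_bytes_sent} %{QS:user_agent}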
While debugging, you can run ./bin/logstash -r (--config.reload.automatic) so that Logstash automatically reloads the pipeline whenever the configuration files change.
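To only validate the configuration syntax and exit, -t / --config.test_and_exit also works; with an RPM install the settings directory has to be passed explicitly. A minimal sketch:
/usr/share/logstash/bin/logstash -t --path.settings /etc/logstash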
Chapter 3: Upgrading Logstash to 6.5.4
3.1 Stop the service:
systemctl stop logstash.service
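It is worth confirming the process has actually exited before removing the package, for example:
systemctl is-active logstash.service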
3.2 Uninstall the old Logstash
rpm -e logstash-5.2.2-1.noarch
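rpm -e normally preserves modified files under /etc/logstash as *.rpmsave, but taking an explicit backup first is safer (the backup path below is arbitrary):
cp -a /etc/logstash /root/logstash-5.2.2-conf.bak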
3.3 Install the new Logstash
yum localinstall logstash-6.5.4.rpm
I had already packaged the configuration files locally and uploaded them to the server.
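For example (the hostname and archive name are placeholders in this sketch):
# on the local machine
tar czf logstash-conf.tar.gz pipelines.yml conf.d/ patterns.d/
scp logstash-conf.tar.gz root@<server>:/etc/logstash/
# on the server
cd /etc/logstash && tar xzf logstash-conf.tar.gz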
In pipelines.yml I enabled only one pipeline to verify that the service runs correctly, and in the output section I also wrote a copy of the data to a file to verify that parsing works; after that, just start the logstash service:
output {
  elasticsearch {
    hosts => ["http://127.0.0.1:9200"]
    index => "%{type}-%{+YYYY.MM.dd}"
  }
  file {
    path => "/tmp/test.log"
  }
}
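Start the service and tail the verification file to confirm that events are arriving and parsed as expected:
systemctl start logstash.service
tail -f /tmp/test.log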
There are no errors in the log and Logstash works normally, but there is a warning; a quick online search suggests it may be caused by a Kafka version mismatch:
[2018-12-26T14:39:05,424][WARN ][org.apache.kafka.common.utils.AppInfoParser] Error registering AppInfo mbean
javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=logstash-7
at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437) ~[?:1.8.0_121]
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898) ~[?:1.8.0_121]
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966) ~[?:1.8.0_121]
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900) ~[?:1.8.0_121]
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324) ~[?:1.8.0_121]
at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522) ~[?:1.8.0_121]
at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:62) [kafka-clients-2.0.1.jar:?]
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:791) [kafka-clients-2.0.1.jar:?]
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:650) [kafka-clients-2.0.1.jar:?]
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:630) [kafka-clients-2.0.1.jar:?]
at sun.reflect.GeneratedConstructorAccessor47.newInstance(Unknown Source) [?:1.8.0_121]
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) [?:1.8.0_121]
at java.lang.reflect.Constructor.newInstance(Constructor.java:423) [?:1.8.0_121]
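This InstanceAlreadyExistsException is also frequently reported when several Kafka consumers (multiple pipelines, or consumer_threads > 1) register their JMX MBeans under the same default client id. One way to silence it, assuming that is the cause here, is to give each pipeline's kafka input a distinct client_id (the value below is just an example):
input {
  kafka {
    bootstrap_servers => "127.0.0.1:9020"
    group_id          => "logstash"
    # distinct per pipeline; "logstash_nginx_access" is an example value
    client_id         => "logstash_nginx_access"
    consumer_threads  => 5
    topics            => ["nginx_access"]
    codec             => "json"
  }
}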