Elasticsearch tuning, filebeat configuration tuning, logstash format configuration, and grok in practice

Encoding conversion (mainly to fix garbled Chinese text)

(1) Transcode with codec => plain in the input

codec => plain {
         charset => "GB2312"
}

This converts GB2312-encoded text to UTF-8.

(2) The conversion can also be done in filebeat (recommended)

filebeat.prospectors:
- input_type: log
  paths:
    - c:\Users\Administrator\Desktop\performanceTrace.txt
  encoding: GB2312
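
As a rough illustration of what the encoding setting does, the sketch below decodes GB2312 bytes and re-encodes them as UTF-8. The function name gb2312_to_utf8 and the sample text are made up for this example; neither filebeat nor logstash exposes such a function.

```python
def gb2312_to_utf8(raw: bytes) -> bytes:
    """Decode GB2312 bytes and re-encode them as UTF-8."""
    return raw.decode("gb2312").encode("utf-8")


# Bytes as a GB2312-encoded log file would store them (sample text is illustrative).
sample = "中文日志".encode("gb2312")

# Read as UTF-8 without conversion, these bytes are invalid or garbled;
# after conversion they decode cleanly.
print(gb2312_to_utf8(sample).decode("utf-8"))
```

Doing this at the shipper means everything downstream only ever sees UTF-8.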

Dropping unwanted lines from the logs

(1) Drop them in the logstash filter

    if ([message] =~ "^20.*-\ task\ request,.*,start\ time.*") {   # regex matching the unwanted line
            drop {}
    } 

(2) Sample log

2020-03-20 10:44:01,523 [33]DEBUG Debug - task request,task Id:1cbb72f1-a5ea-4e73-957c-6d20e9e12a7a,start time:2018-03-20 10:43:59   # the line to drop
-- Request String : {"UserName":"15046699923","Pwd":"ZYjyh727","DeviceType":2,"DeviceId":"PC-20170525SADY","EquipmentNo":null,"SSID":"pc","RegisterPhones":null,"AppKey":"ab09d78e3b2c40b789ddfc81674bc24deac","Version":"2.0.5.3"} -- End
-- Response String : {"ErrorCode":0,"Success":true,"ErrorMsg":null,"Result":null,"WaitInterval":30} -- End
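
To check which lines the drop condition removes, the same expression can be tried outside logstash. This is a minimal sketch using Python's re module, with shortened copies of the sample lines; should_drop is a hypothetical helper, and the escaped spaces of the logstash version are written as plain spaces.

```python
import re

# Same expression as the drop condition above, with the escaped spaces written plainly.
DROP_PATTERN = re.compile(r"^20.*- task request,.*,start time.*")


def should_drop(message: str) -> bool:
    """Return True for lines the logstash condition would drop."""
    return DROP_PATTERN.search(message) is not None


lines = [
    "2020-03-20 10:44:01,523 [33]DEBUG Debug - task request,task Id:1cbb72f1,start time:2018-03-20 10:43:59",
    '-- Request String : {"UserName":"15046699923","DeviceType":2} -- End',
    '-- Response String : {"ErrorCode":0,"Success":true} -- End',
]

kept = [line for line in lines if not should_drop(line)]
for line in kept:
    print(line)
```

Only the first line starts with 20 and contains both "- task request," and ",start time", so only that line is removed.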

Using grok to handle the different line types in a log (key point)

(1) Sample log:

2020-03-20 10:44:01,523 [33]DEBUG Debug - task request,task Id:1cbb72f1-a5ea-4e73-957c-6d20e9e12a7a,start time:2018-03-20 10:43:59
-- Request String : {"UserName":"15046699923","Pwd":"ZYjyh727","DeviceType":2,"DeviceId":"PC-20170525SADY","EquipmentNo":null,"SSID":"pc","RegisterPhones":null,"AppKey":"ab09d78e3b2c40b789ddfc81674bc24deac","Version":"2.0.5.3"} -- End
-- Response String : {"ErrorCode":0,"Success":true,"ErrorMsg":null,"Result":null,"WaitInterval":30} -- End

Handle each of the three lines with its own grok match in the logstash filter

match => {
    "message" => "^20.*-\ task\ request,.*,start\ time\:%{TIMESTAMP_ISO8601:RequestTime}"
}

match => {
    "message" => "^--\ Request\ String\ :\ \{\"UserName\":\"%{NUMBER:UserName:int}\",\"Pwd\":\"(?<Pwd>.*)\",\"DeviceType\":%{NUMBER:DeviceType:int},\"DeviceId\":\"(?<DeviceId>.*)\",\"EquipmentNo\":(?<EquipmentNo>.*),\"SSID\":(?<SSID>.*),\"RegisterPhones\":(?<RegisterPhones>.*),\"AppKey\":\"(?<AppKey>.*)\",\"Version\":\"(?<Version>.*)\"\}\ --\ \End.*"    
}

match => {
    "message" => "^--\ Response\ String\ :\ \{\"ErrorCode\":%{NUMBER:ErrorCode:int},\"Success\":(?<Success>[a-z]*),\"ErrorMsg\":(?<ErrorMsg>.*),\"Result\":(?<Result>.*),\"WaitInterval\":%{NUMBER:WaitInterval:int}\}\ --\ \End.*"
}

... and so on for any further line types
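
The idea behind the per-line grok matches can be sketched with ordinary named capture groups. The example below is an approximation in Python, not grok itself: it covers only the Response line, and the int conversions stand in for grok's :int suffix. parse_response is a hypothetical name.

```python
import re

# Named groups play the role of grok's %{PATTERN:field} and (?<field>...) captures.
RESPONSE = re.compile(
    r'^-- Response String : \{"ErrorCode":(?P<ErrorCode>-?\d+),'
    r'"Success":(?P<Success>[a-z]*),'
    r'"ErrorMsg":(?P<ErrorMsg>.*),'
    r'"Result":(?P<Result>.*),'
    r'"WaitInterval":(?P<WaitInterval>\d+)\} -- End'
)


def parse_response(message):
    """Return the captured fields as a dict, or None if the line is not a Response line."""
    m = RESPONSE.search(message)
    if m is None:
        return None
    fields = m.groupdict()
    # Equivalent of the :int conversion in the grok pattern.
    fields["ErrorCode"] = int(fields["ErrorCode"])
    fields["WaitInterval"] = int(fields["WaitInterval"])
    return fields


line = '-- Response String : {"ErrorCode":0,"Success":true,"ErrorMsg":null,"Result":null,"WaitInterval":30} -- End'
fields = parse_response(line)
print(fields)
```

A line of another type simply fails to match and returns None, which is why each line type needs its own pattern.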

(2) Sample log:

# An INFO log entry
2018-09-06 21:21:40.536 [490343b4207b39e5,490343b4207b39e5] [reactor-http-epoll-4] INFO  c.w.w.p.i.config.SecurityFilter - [filter,75] - skipFlag:false  uri:/report-server/daily/queryDailyReportChannel authorization:GbUzq6IElKkvRswreIHd8Xv/YMDd885jyINObc543vx2H+0lhdu0p5bOu0Vd9PT+jgxJpXHYyZiPgQmyio5Sfg==

# An ERROR log entry
2018-09-06 21:21:15.863 [548809be071dd887,548809be071dd887] [reactor-http-epoll-4] ERROR c.w.w.c.e.WebExceptionHandler - [handle,34] - 系統異常:/report-server/game/queryPartnerGameReport\ncom.wbgg.wbcommon.core.base.exception.BusinessException: 您的賬號未登入,請登入後再操作!\n\tat com.wbgg.wbcommon.core.base.wrapper.Wrapper.check(Wrapper.java:155)\n\tat com.wbgg.wbgateway.pc.infrastructure.config.SecurityFilter.filter(SecurityFilter.java:86)\n\tat org.springframework.cloud.gateway.handler.FilteringWebHandler$GatewayFilterAdapter.filter(FilteringWebHandler.java:135)\n\tat org.springframework.cloud.gateway.filter.OrderedGatewayFilter.filter(OrderedGatewayFilter.java:44)\n\tat org.springframework.cloud.gateway.handler.FilteringWebHandler$DefaultGatewayFilterChain.lambda$filter$0(FilteringWebHandler.java:117)\n\tat reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:44)\n\tat reactor.core.publisher.MonoLift.subscribe(MonoLift.java:45)\n\tat reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)\n\tat reactor.core.publisher.MonoLift.subscribe(MonoLift.java:45)\n\tat reactor.core.publisher.Mono.subscribe(Mono.java:3695)\n\tat reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.drain(MonoIgnoreThen.java:172)\n\tat reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:56)\n\tat reactor.core.publisher.MonoLiftFuseable.subscribe(MonoLiftFuseable.java:55)\n\tat reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:150)\n\tat org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:96)\n\tat reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:67)\n\tat org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:96)\n\tat reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:76)\n\tat 
org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:96)\n\tat reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.innerNext(FluxConcatMap.java:275)\n\tat reactor.core.publisher.FluxConcatMap$ConcatMapInner.onNext(FluxConcatMap.java:849)\n\tat org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:96)\n\tat reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114)\n\tat org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:96)\n\tat reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:67)\n\tat org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:96)\n\tat reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1505)\n\tat reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:241)\n\tat reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2070)\n\tat reactor.core.publisher.MonoFlatMap$FlatMapInner.onSubscribe(MonoFlatMap.java:230)\n\tat reactor.core.publisher.MonoJust.subscribe(MonoJust.java:54)\n\tat reactor.core.publisher.MonoLiftFuseable.subscribe(MonoLiftFuseable.java:55)\n\tat reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:150)\n\tat org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:96)\n\tat reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114)\n\tat org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:96)\n\tat reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:76)\n\tat org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:96)\n\tat 
reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.innerNext(FluxConcatMap.java:275)\n\tat reactor.core.publisher.FluxConcatMap$ConcatMapInner.onNext(FluxConcatMap.java:849)\n\tat org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:96)\n\tat reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:73)\n\tat org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:96)\n\tat reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:192)\n\tat org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:96)\n\tat reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1505)\n\tat reactor.core.publisher.MonoFilterWhen$MonoFilterWhenMain.innerResult(MonoFilterWhen.java:193)\n\tat reactor.core.publisher.MonoFilterWhen$FilterWhenInner.onNext(MonoFilterWhen.java:260)\n\tat reactor.core.publisher.MonoFilterWhen$FilterWhenInner.onNext(MonoFilterWhen.java:228)\n\tat reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2070)\n\tat reactor.core.publisher.MonoFilterWhen$FilterWhenInner.onSubscribe(MonoFilterWhen.java:249)\n\tat reactor.core.publisher.MonoJust.subscribe(MonoJust.java:54)\n\tat reactor.core.publisher.MonoLiftFuseable.subscribe(MonoLiftFuseable.java:55)\n\tat reactor.core.publisher.Mono.subscribe(Mono.java:3695)\n\tat reactor.core.publisher.MonoFilterWhen$MonoFilterWhenMain.onNext(MonoFilterWhen.java:150)\n\tat reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2070)\n\tat reactor.core.publisher.MonoFilterWhen$MonoFilterWhenMain.onSubscribe(MonoFilterWhen.java:103)\n\tat reactor.core.publisher.MonoJust.subscribe(MonoJust.java:54)\n\tat reactor.core.publisher.MonoLiftFuseable.subscribe(MonoLiftFuseable.java:55)\n\tat 
reactor.core.publisher.MonoFilterWhen.subscribe(MonoFilterWhen.java:56)\n\tat reactor.core.publisher.MonoLift.subscribe(MonoLift.java:45)\n\tat reactor.core.publisher.MonoPeek.subscribe(MonoPeek.java:71)\n\tat reactor.core.publisher.MonoLift.subscribe(MonoLift.java:45)\n\tat reactor.core.publisher.MonoOnErrorResume.subscribe(MonoOnErrorResume.java:44)\n\tat reactor.core.publisher.MonoLift.subscribe(MonoLift.java:45)\n\tat reactor.core.publisher.Mono.subscribe(Mono.java:3695)\n\tat reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:442)\n\tat reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onNext(FluxConcatMap.java:244)\n\tat org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:96)\n\tat org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:96)\n\tat reactor.core.publisher.FluxDematerialize$DematerializeSubscriber.onNext(FluxDematerialize.java:114)\n\tat reactor.core.publisher.FluxDematerialize$DematerializeSubscriber.onNext(FluxDematerialize.java:42)\n\tat org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:96)\n\tat reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drainAsync(FluxFlattenIterable.java:395)\n\tat reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drain(FluxFlattenIterable.java:638)\n\tat reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.onNext(FluxFlattenIterable.java:242)\n\tat org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:96)\n\tat reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:204)\n\tat org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:96)\n\tat 
reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1505)\n\tat reactor.core.publisher.MonoCollectList$MonoBufferAllSubscriber.onComplete(MonoCollectList.java:118)\n\tat org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onComplete(ScopePassingSpanSubscriber.java:112)\n\tat reactor.core.publisher.DrainUtils.postCompleteDrain(DrainUtils.java:131)\n\tat reactor.core.publisher.DrainUtils.postComplete(DrainUtils.java:186)\n\tat reactor.core.publisher.FluxMaterialize$MaterializeSubscriber.onComplete(FluxMaterialize.java:134)\n\tat org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onComplete(ScopePassingSpanSubscriber.java:112)\n\tat reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drainAsync(FluxFlattenIterable.java:325)\n\tat reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drain(FluxFlattenIterable.java:638)\n\tat reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.onComplete(FluxFlattenIterable.java:259)\n\tat org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onComplete(ScopePassingSpanSubscriber.java:112)\n\tat reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onComplete(FluxMapFuseable.java:144)\n\tat org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onComplete(ScopePassingSpanSubscriber.java:112)\n\tat reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1508)\n\tat reactor.core.publisher.MonoCollectList$MonoBufferAllSubscriber.onComplete(MonoCollectList.java:118)\n\tat org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onComplete(ScopePassingSpanSubscriber.java:112)\n\tat reactor.core.publisher.FluxFlatMap$FlatMapMain.checkTerminated(FluxFlatMap.java:794)\n\tat reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop(FluxFlatMap.java:560)\n\tat 
reactor.core.publisher.FluxFlatMap$FlatMapMain.drain(FluxFlatMap.java:540)\n\tat reactor.core.publisher.FluxFlatMap$FlatMapMain.onComplete(FluxFlatMap.java:426)\n\tat org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onComplete(ScopePassingSpanSubscriber.java:112)\n\tat reactor.core.publisher.FluxIterable$IterableSubscription.slowPath(FluxIterable.java:265)\n\tat reactor.core.publisher.FluxIterable$IterableSubscription.request(FluxIterable.java:201)\n\tat org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.request(ScopePassingSpanSubscriber.java:79)\n\tat reactor.core.publisher.FluxFlatMap$FlatMapMain.onSubscribe(FluxFlatMap.java:335)\n\tat org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onSubscribe(ScopePassingSpanSubscriber.java:71)\n\tat reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:139)\n\tat reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:63)\n\tat reactor.core.publisher.FluxLiftFuseable.subscribe(FluxLiftFuseable.java:70)\n\tat reactor.core.publisher.FluxFlatMap.subscribe(FluxFlatMap.java:97)\n\tat reactor.core.publisher.FluxLift.subscribe(FluxLift.java:46)\n\tat reactor.core.publisher.MonoCollectList.subscribe(MonoCollectList.java:59)\n\tat reactor.core.publisher.MonoLiftFuseable.subscribe(MonoLiftFuseable.java:55)\n\tat reactor.core.publisher.MonoMapFuseable.subscribe(MonoMapFuseable.java:59)\n\tat reactor.core.publisher.MonoLiftFuseable.subscribe(MonoLiftFuseable.java:55)\n\tat reactor.core.publisher.MonoFlattenIterable.subscribe(MonoFlattenIterable.java:101)\n\tat reactor.core.publisher.FluxLiftFuseable.subscribe(FluxLiftFuseable.java:70)\n\tat reactor.core.publisher.FluxMaterialize.subscribe(FluxMaterialize.java:40)\n\tat reactor.core.publisher.FluxLift.subscribe(FluxLift.java:46)\n\tat reactor.core.publisher.MonoCollectList.subscribe(MonoCollectList.java:59)\n\tat 
reactor.core.publisher.MonoLiftFuseable.subscribe(MonoLiftFuseable.java:55)\n\tat reactor.core.publisher.MonoPeekFuseable.subscribe(MonoPeekFuseable.java:74)\n\tat reactor.core.publisher.MonoLiftFuseable.subscribe(MonoLiftFuseable.java:55)\n\tat reactor.core.publisher.MonoFlattenIterable.subscribe(MonoFlattenIterable.java:101)\n\tat reactor.core.publisher.FluxLiftFuseable.subscribe(FluxLiftFuseable.java:70)\n\tat reactor.core.publisher.FluxDematerialize.subscribe(FluxDematerialize.java:39)\n\tat reactor.core.publisher.FluxLift.subscribe(FluxLift.java:46)\n\tat reactor.core.publisher.FluxDefer.subscribe(FluxDefer.java:54)\n\tat reactor.core.publisher.FluxLift.subscribe(FluxLift.java:46)\n\tat reactor.core.publisher.FluxConcatMap.subscribe(FluxConcatMap.java:121)\n\tat reactor.core.publisher.FluxLift.subscribe(FluxLift.java:46)\n\tat reactor.core.publisher.MonoNext.subscribe(MonoNext.java:40)\n\tat reactor.core.publisher.MonoLift.subscribe(MonoLift.java:45)\n\tat reactor.core.publisher.MonoMap.subscribe(MonoMap.java:55)\n\tat reactor.core.publisher.MonoLift.subscribe(MonoLift.java:45)\n\tat reactor.core.publisher.MonoFlatMap.subscribe(MonoFlatMap.java:60)\n\tat reactor.core.publisher.MonoLiftFuseable.subscribe(MonoLiftFuseable.java:55)\n\tat reactor.core.publisher.MonoSwitchIfEmpty.subscribe(MonoSwitchIfEmpty.java:44)\n\tat reactor.core.publisher.MonoLift.subscribe(MonoLift.java:45)\n\tat reactor.core.publisher.MonoMap.subscribe(MonoMap.java:55)\n\tat reactor.core.publisher.MonoLift.subscribe(MonoLift.java:45)\n\tat reactor.core.publisher.Mono.subscribe(Mono.java:3695)\n\tat reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:442)\n\tat reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onNext(FluxConcatMap.java:244)\n\tat org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:96)\n\tat 
reactor.core.publisher.FluxIterable$IterableSubscription.slowPath(FluxIterable.java:243)\n\tat reactor.core.publisher.FluxIterable$IterableSubscription.request(FluxIterable.java:201)\n\tat org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.request(ScopePassingSpanSubscriber.java:79)\n\tat reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onSubscribe(FluxConcatMap.java:229)\n\tat org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onSubscribe(ScopePassingSpanSubscriber.java:71)\n\tat reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:139)\n\tat reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:63)\n\tat reactor.core.publisher.FluxLiftFuseable.subscribe(FluxLiftFuseable.java:70)\n\tat reactor.core.publisher.FluxConcatMap.subscribe(FluxConcatMap.java:121)\n\tat reactor.core.publisher.FluxLift.subscribe(FluxLift.java:46)\n\tat reactor.core.publisher.MonoNext.subscribe(MonoNext.java:40)\n\tat reactor.core.publisher.MonoLift.subscribe(MonoLift.java:45)\n\tat reactor.core.publisher.MonoSwitchIfEmpty.subscribe(MonoSwitchIfEmpty.java:44)\n\tat reactor.core.publisher.MonoLift.subscribe(MonoLift.java:45)\n\tat reactor.core.publisher.MonoFlatMap.subscribe(MonoFlatMap.java:60)\n\tat reactor.core.publisher.MonoLiftFuseable.subscribe(MonoLiftFuseable.java:55)\n\tat reactor.core.publisher.MonoFlatMap.subscribe(MonoFlatMap.java:60)\n\tat reactor.core.publisher.MonoLiftFuseable.subscribe(MonoLiftFuseable.java:55)\n\tat reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)\n\tat reactor.core.publisher.MonoLift.subscribe(MonoLift.java:45)\n\tat reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)\n\tat reactor.core.publisher.MonoLift.subscribe(MonoLift.java:45)\n\tat reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)\n\tat reactor.core.publisher.MonoLift.subscribe(MonoLift.java:45)\n\tat 
org.springframework.cloud.sleuth.instrument.web.TraceWebFilter$MonoWebFilterTrace.subscribe(TraceWebFilter.java:180)\n\tat reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)\n\tat reactor.core.publisher.MonoLift.subscribe(MonoLift.java:45)\n\tat reactor.core.publisher.MonoOnErrorResume.subscribe(MonoOnErrorResume.java:44)\n\tat reactor.core.publisher.MonoLift.subscribe(MonoLift.java:45)\n\tat reactor.core.publisher.MonoOnErrorResume.subscribe(MonoOnErrorResume.java:44)\n\tat reactor.core.publisher.MonoLift.subscribe(MonoLift.java:45)\n\tat reactor.core.publisher.MonoPeekTerminal.subscribe(MonoPeekTerminal.java:61)\n\tat reactor.core.publisher.MonoLiftFuseable.subscribe(MonoLiftFuseable.java:55)\n\tat reactor.core.publisher.MonoOnErrorResume.subscribe(MonoOnErrorResume.java:44)\n\tat reactor.core.publisher.MonoLift.subscribe(MonoLift.java:45)\n\tat reactor.core.publisher.Mono.subscribe(Mono.java:3695)\n\tat reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.drain(MonoIgnoreThen.java:172)\n\tat reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:56)\n\tat reactor.core.publisher.MonoLiftFuseable.subscribe(MonoLiftFuseable.java:55)\n\tat reactor.core.publisher.MonoPeekFuseable.subscribe(MonoPeekFuseable.java:70)\n\tat reactor.core.publisher.MonoLiftFuseable.subscribe(MonoLiftFuseable.java:55)\n\tat reactor.core.publisher.MonoPeekTerminal.subscribe(MonoPeekTerminal.java:61)\n\tat reactor.core.publisher.MonoLiftFuseable.subscribe(MonoLiftFuseable.java:55)\n\tat reactor.netty.http.server.HttpServerHandle.onStateChange(HttpServerHandle.java:64)\n\tat reactor.netty.tcp.TcpServerBind$ChildObserver.onStateChange(TcpServerBind.java:226)\n\tat reactor.netty.http.server.HttpServerOperations.onInboundNext(HttpServerOperations.java:434)\n\tat reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:141)\n\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)\n\tat 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)\n\tat io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)\n\tat reactor.netty.http.server.HttpTrafficHandler.channelRead(HttpTrafficHandler.java:160)\n\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)\n\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)\n\tat io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)\n\tat io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:438)\n\tat io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:328)\n\tat io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:302)\n\tat io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:253)\n\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)\n\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)\n\tat io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)\n\tat io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1422)\n\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)\n\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)\n\tat io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:931)\n\tat io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:799)\n\tat 
io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:433)\n\tat io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:330)\n\tat io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1044)\n\tat io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)\n\tat java.lang.Thread.run(Thread.java:748)

Match these entries with a grok rule in the logstash filter

input {
  kafka {
    id => "test-kafka-input"
    bootstrap_servers => ["192.168.0.250:9092"] # kafka address
    group_id => "logstash"                      # kafka consumer group
    topics => ["test", "filebeat"]              # kafka topics
    codec => json                               # decode the input as JSON
  }
}

filter {

#    mutate {
#        gsub => [ "message", "\r", "" ]		# strip carriage returns
#    }


    grok {
        match => ["message","%{TIMESTAMP_ISO8601:timestamp}\s+%{SYSLOG5424SD:uid}\s+%{SYSLOG5424SD:threadid}\s+%{LOGLEVEL:loglevel}\s+%{JAVACLASS:javaclass}\s+.?\s+%{SYSLOG5424SD}\s+.?\s+%{GREEDYDATA:message}"]	 # regular expression and field names used to parse the log line
        overwrite => ["message"]				# overwrite message with the text captured by %{GREEDYDATA:message}
    }

    date {
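        match => [ "timestamp", "yyyy-MM-dd HH:mm:ss.SSS", "yyyy-MM-dd HH:mm:ss,SSS" ] # timestamp formats; the sample logs above use a dot before the milliseconds
        target => "@timestamp"								# write the parsed timestamp field into the default "@timestamp", so Kibana shows the time the entry was logged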
    }
    
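    # Elasticsearch stores times in UTC rather than UTC+8, so the stored value reads 8 hours behind local time. The two ruby blocks below add 8 hours to work around that. Kibana already converts @timestamp to the browser's time zone, so this shift is only needed when the stored value itself must read as UTC+8: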
    ruby { 
        code => "event.set('timestamp', event.get('@timestamp').time.localtime + 8*60*60)" 
    }
 
    ruby {
        code => "event.set('@timestamp',event.get('timestamp'))"
    }

	# Remove fields that are not needed, using the mutate filter
    mutate {
        remove_field => ["timestamp","hostname","tags","stream","agent","ecs","input","[kubernetes][container][name]","[kubernetes][labels][pod-template-hash]","[kubernetes][pod][uid]","[kubernetes][replicaset]","@version","[log][offset]"]
    }

    json {
        source => "@fields"
        # remove filebeat metadata that is not needed
        remove_field => [ "beat","@fields","fields","index_name","offset","source","message","time","tags"]
      }



#    json {
#        source => "message" 
#        remove_field => [ "message" ]
#  }

#  multiline {
#    pattern => "^\d{4}-\d{1,2}-\d{1,2}\s\d{1,2}:\d{1,2}:\d{1,2}" 
#    negate => true   
#    what => "previous" 
#    }

}


output {
  elasticsearch {
    hosts => ["http://192.168.0.250:9200"]
    user => logstash_admin
    password => "YHkdypsPKqw5gaWKE"
    index => "game-filebeat-%{+YYYY.MM.dd}"
  }
  
  #file {
  #  path => "/test/bak/test.txt"
  #}
  
}
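
To see what the grok rule in this configuration extracts, the sketch below approximates it with a plain regular expression in Python. It is only an approximation under stated assumptions: the grok patterns are replaced with hand-written equivalents, the brackets around uid and threadid are dropped (SYSLOG5424SD keeps them), and parse_line is a hypothetical helper. The sample line is a shortened copy of the INFO entry above.

```python
import re
from datetime import datetime

# Hand-written stand-in for the grok pattern used in the filter above.
LINE = re.compile(
    r"^(?P<timestamp>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}[.,]\d{3})\s+"
    r"\[(?P<uid>[^\]]*)\]\s+"
    r"\[(?P<threadid>[^\]]*)\]\s+"
    r"(?P<loglevel>[A-Z]+)\s+"
    r"(?P<javaclass>[\w.$]+)\s+-\s+"
    r"\[[^\]]*\]\s+-\s+"
    r"(?P<message>.*)$",
    re.DOTALL,  # let the message span the embedded stack trace
)


def parse_line(line):
    """Split one log entry into fields, or return None if it does not match."""
    m = LINE.match(line)
    if m is None:
        return None
    fields = m.groupdict()
    # Accept either "." or "," before the milliseconds, then parse the time.
    fields["timestamp"] = datetime.strptime(
        fields["timestamp"].replace(",", "."), "%Y-%m-%d %H:%M:%S.%f"
    )
    return fields


sample = (
    "2018-09-06 21:21:40.536 [490343b4207b39e5,490343b4207b39e5] "
    "[reactor-http-epoll-4] INFO  c.w.w.p.i.config.SecurityFilter - [filter,75] - "
    "skipFlag:false  uri:/report-server/daily/queryDailyReportChannel"
)
parsed = parse_line(sample)
print(parsed["loglevel"], parsed["javaclass"], parsed["timestamp"].isoformat())
```

A line that does not follow this layout returns None, which corresponds to grok tagging the event as a parse failure.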

Merging multi-line log entries with the multiline plugin (key point)

(1) Example:

① Log

2018-03-20 10:44:01,523 [33]DEBUG Debug - task request,task Id:1cbb72f1-a5ea-4e73-957c-6d20e9e12a7a,start time:2018-03-20 10:43:59
-- Request String : {"UserName":"15046699923","Pwd":"ZYjyh727","DeviceType":2,"DeviceId":"PC-20170525SADY","EquipmentNo":null,"SSID":"pc","RegisterPhones":null,"AppKey":"ab09d78e3b2c40b789ddfc81674bc24deac","Version":"2.0.5.3"} -- End
-- Response String : {"ErrorCode":0,"Success":true,"ErrorMsg":null,"Result":null,"WaitInterval":30} -- End

② Processing the merged lines with logstash grok (this step is the same whichever method merged them)

filter {
  grok {
    match => {
      "message" => "^%{TIMESTAMP_ISO8601:InsertTime}\ .*-\ task\ request,.*,start\ time:%{TIMESTAMP_ISO8601:RequestTime}\n--\ Request\ String\ :\ \{\"UserName\":\"%{NUMBER:UserName:int}\",\"Pwd\":\"(?<Pwd>.*)\",\"DeviceType\":%{NUMBER:DeviceType:int},\"DeviceId\":\"(?<DeviceId>.*)\",\"EquipmentNo\":(?<EquipmentNo>.*),\"SSID\":(?<SSID>.*),\"RegisterPhones\":(?<RegisterPhones>.*),\"AppKey\":\"(?<AppKey>.*)\",\"Version\":\"(?<Version>.*)\"\}\ --\ \End\n--\ Response\ String\ :\ \{\"ErrorCode\":%{NUMBER:ErrorCode:int},\"Success\":(?<Success>[a-z]*),\"ErrorMsg\":(?<ErrorMsg>.*),\"Result\":(?<Result>.*),\"WaitInterval\":%{NUMBER:WaitInterval:int}\}\ --\ \End"
    }
  }
}

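(2) Using multiline in filebeat (recommended)

① About multiline

pattern: the regular expression that marks where lines are merged

negate: true/false; whether the lines that match pattern are merged, or the lines that do not match it

match: after/before (takes some working out)

  after: the merged lines are appended after the line selected by pattern. Note: in this mode the last entry in the log is not matched and processed

  before: the merged lines are placed before the line selected by pattern (recommended)

② Version 5.5 and later (using before)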

filebeat.prospectors:
- input_type: log
  paths:
    - /root/performanceTrace*
  fields:
    type: zidonghualog
  multiline.pattern: '.*\"WaitInterval\":.*--\ End'
  multiline.negate: true
  multiline.match: before

③ Before version 5.5 (using after)

filebeat.prospectors:
- input_type: log
  paths:
    - /root/performanceTrace*
  multiline:
    pattern: '^20.*'
    negate: true
    match: after
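
How pattern, negate and match interact is easiest to see by simulating them. The sketch below is a simplified model in Python of the merging rules described above, not filebeat's implementation; merge_multiline and the shortened log lines are assumptions made for illustration, and timeouts and line limits are ignored.

```python
import re


def merge_multiline(lines, pattern, negate, match):
    """Group lines into events following the pattern/negate/match rules."""
    regex = re.compile(pattern)
    events, buffer = [], []
    for line in lines:
        # A line is a continuation when its match result differs from `negate`.
        continuation = (regex.search(line) is not None) != negate
        if match == "after":
            # Continuation lines are appended to the event already in progress.
            if continuation and buffer:
                buffer.append(line)
            else:
                if buffer:
                    events.append("\n".join(buffer))
                buffer = [line]
        elif match == "before":
            # Continuation lines wait for the next non-continuation line, which ends the event.
            buffer.append(line)
            if not continuation:
                events.append("\n".join(buffer))
                buffer = []
        else:
            raise ValueError("match must be 'after' or 'before'")
    if buffer:  # flush whatever is left at end of input
        events.append("\n".join(buffer))
    return events


log = [
    "2018-03-20 10:44:01,523 [33]DEBUG Debug - task request,start time:2018-03-20 10:43:59",
    '-- Request String : {"UserName":"u1"} -- End',
    '-- Response String : {"ErrorCode":0,"WaitInterval":30} -- End',
    "2018-03-20 10:44:05,100 [33]DEBUG Debug - task request,start time:2018-03-20 10:44:03",
    '-- Request String : {"UserName":"u2"} -- End',
    '-- Response String : {"ErrorCode":0,"WaitInterval":30} -- End',
]

after_events = merge_multiline(log, r"^20", negate=True, match="after")
before_events = merge_multiline(log, r'"WaitInterval":.*-- End', negate=True, match="before")
print(len(after_events), len(before_events))
```

Both settings produce the same two three-line events here. The difference is when an event is known to be complete: with before, the closing line ends it immediately, while with after the event only ends when the next entry begins or the input ends.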

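(3) Using multiline in the logstash input (recommended when there is no filebeat)

① About multiline

pattern: the regular expression that marks where lines are merged

negate: true/false; whether the lines that match pattern are merged, or the lines that do not match it

what: previous/next (takes some working out)

  previous: equivalent to after in filebeat

  next: equivalent to before in filebeat

② Usage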

input {
        file {
                path => ["/root/logs/log2"]
                start_position => "beginning"
                codec => multiline {
                        pattern => "^20.*"
                        negate => true
                        what => "previous"
                }
        }
}

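(4) Using multiline in the logstash filter (not recommended)

(a) Why it is not recommended:

  ① Once multiline is set in a filter, the number of pipeline workers automatically drops to 1

  ② As of version 5.5 the multiline filter is no longer bundled; to use it, install it with:

  /usr/share/logstash/bin/logstash-plugin install logstash-filter-multiline

(b) Example: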

filter {
  multiline {
    pattern => "^20.*"
    negate => true
    what => "previous"
  }
} 

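Using date in the logstash filter

(1) Sample log

2018-03-20 10:44:01 [33]DEBUG Debug - task request,task Id:1cbb72f1-a5ea-4e73-957c-6d20e9e12a7a,start time:2018-03-20 10:43:59

(2) Using date

        date {
                match => ["InsertTime","YYYY-MM-dd HH:mm:ss"]
                remove_field => "InsertTime"
        }

Note:

match => ["timestamp" ,"dd/MMM/YYYY H:m:s Z"]

This matches the timestamp field, whose format is: day/abbreviated month/four-digit year hour:minute:second time zone

When the field holds an ISO8601 time, this can also be written as: match => ["timestamp","ISO8601"] (recommended)

(3) What date does

  It parses the time from the named log field and writes it to @timestamp. By default @timestamp is the time the entry reached logstash, not the time recorded in the log itself.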
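
The effect of the date filter can be sketched in a few lines of Python: parse the time carried in the log, treat it as local time, and store it as the event time in UTC. The UTC+8 offset, the dict standing in for an event, and the helper name to_event_timestamp are assumptions for this example only.

```python
from datetime import datetime, timedelta, timezone


def to_event_timestamp(text, fmt="%Y-%m-%d %H:%M:%S", utc_offset_hours=8):
    """Parse a local log time and return it as an aware UTC datetime."""
    local_zone = timezone(timedelta(hours=utc_offset_hours))  # assumed log time zone
    local = datetime.strptime(text, fmt).replace(tzinfo=local_zone)
    return local.astimezone(timezone.utc)


# Before the date filter: @timestamp is the arrival time, InsertTime is the logged time.
event = {"InsertTime": "2018-03-20 10:44:01", "@timestamp": datetime.now(timezone.utc)}

# After: @timestamp holds the logged time, and InsertTime is removed.
event["@timestamp"] = to_event_timestamp(event.pop("InsertTime"))
print(event["@timestamp"].isoformat())
```

Storing the value in UTC and converting at display time avoids the need to shift timestamps by hand.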

Handling several kinds of logs by type (key point)

① Add a type field in the filebeat configuration

filebeat:
  prospectors:
    - paths:
        - /mnt/data_total/WebApiDebugLog.txt*
      fields:
        type: WebApiDebugLog_total
    - paths:
        - /mnt/data_request/WebApiDebugLog.txt*
      fields:
        type: WebApiDebugLog_request
    - paths:
        - /mnt/data_report/WebApiDebugLog.txt*
      fields:
        type: WebApiDebugLog_report

② Use if in the logstash filter to process each type differently

filter {
   if [fields][type] == "WebApiDebugLog_request" {   # for request-type logs
        if ([message] =~ "^20.*-\ task\ report,.*,start\ time.*") {   # drop the report line
                drop {}
        }
    grok {
        match => {"... ..."}
        }
   }
}

③ Use if in the logstash output

if [fields][type] == "WebApiDebugLog_total" {
    elasticsearch {
        hosts => ["6.6.6.6:9200"]
        index => "logstashl-webapidebuglog_total-%{+YYYY.MM.dd}"   # Elasticsearch index names must be lowercase
        document_type => "WebApiDebugLog_total_logs"
    }
}
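
The routing above amounts to a lookup from the fields.type value to an index name. A minimal sketch in Python, assuming a hypothetical "logstash-" prefix and a daily index suffix; the type is lowercased because Elasticsearch index names must be lowercase.

```python
from datetime import date


def index_for(event, day):
    """Return the daily index name for an event, or None if it carries no type."""
    log_type = event.get("fields", {}).get("type")
    if log_type is None:
        return None
    # Hypothetical naming scheme: prefix, lowercased type, date suffix.
    return f"logstash-{log_type.lower()}-{day:%Y.%m.%d}"


event = {"fields": {"type": "WebApiDebugLog_total"}, "message": "..."}
print(index_for(event, date(2018, 3, 20)))
```

Events without a type fall through with None, the same way an event matching no if branch is not written by that output.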

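Tuning overall ELK performance

Performance analysis

(1) Server hardware, Linux: 1 CPU, 4 GB RAM

Assume each log entry is 250 bytes

(2) Analysis

logstash hardware, Linux: 1 CPU, 4 GB RAM

500 log entries per second

660 log entries per second with ruby removed

1000 log entries per second with grok removed

filebeat hardware, Linux: 1 CPU, 4 GB RAM

2500-3500 log entries per second

Each machine can process per day: 24 h * 60 min * 60 s * 3000 * 250 bytes = 64,800,000,000 bytes, about 64.8 GB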
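
The daily volume figure can be reproduced for each of the measured rates. The sketch below only restates the arithmetic above; the 250-byte entry size is the assumption stated earlier, and 3000 is taken as the midpoint of the filebeat range.

```python
SECONDS_PER_DAY = 24 * 60 * 60


def daily_bytes(events_per_second, bytes_per_event=250):
    """Bytes processed in one day at a steady event rate."""
    return SECONDS_PER_DAY * events_per_second * bytes_per_event


rates = [
    ("logstash with grok and ruby", 500),
    ("logstash without ruby", 660),
    ("logstash without grok", 1000),
    ("filebeat", 3000),
]
for label, rate in rates:
    print(f"{label}: {daily_bytes(rate) / 1e9:.1f} GB/day")
```

At 500 entries per second, logstash handles roughly a sixth of what filebeat can ship, which is why the filters are the first place to look when tuning.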

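③ The bottleneck is logstash reading from redis and writing to ES: one logstash instance handles about 6000 entries per second, and two instances about 10000 per second (with the CPU essentially saturated);

④ Starting logstash uses a lot of system resources, because the startup script checks java, ruby and other environment variables; resource usage returns to normal once it is running.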

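Choosing a log collector: logstash or filebeat

(1) Nothing dictates the use of filebeat or logstash; as shippers they do the same job. The differences are:

① logstash bundles many plugins, such as grok and ruby, so it is heavyweight compared with beats;

② logstash uses more resources once running; if hardware is plentiful the difference can be ignored;

③ logstash runs on the JVM and is cross-platform; beats are written in Go and do not support AIX;

④ on 64-bit AIX a 32-bit JDK (JRE) 1.7 has to be installed; the 64-bit one is not supported;

⑤ filebeat can write directly to ES, but if logstash also writes directly to ES in the same system, the differing index types complicate searching; it is best to send everything to ES from a single source.

(2) Summary

  Both logstash and filebeat have their strengths, but my recommendation is: run filebeat on every server whose logs need collecting, because it is lightweight; have it send everything to logstash, which processes the logs; and let logstash alone write to ES. A kafka message queue can also be added in between as a buffer.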

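logstash tuning settings

(1) Parameters that can be tuned to match your hardware

① Number of pipeline threads; the official advice is to match the number of CPU cores

Default ---> pipeline.workers: 2 (the sample value in logstash.yml; when unset, logstash uses the number of CPU cores)

Tuned ---> pipeline.workers: the number of CPU cores (or a small multiple of it)

② Number of threads used for output

Default ---> pipeline.output.workers: 1

Tuned ---> pipeline.output.workers: no more than the number of pipeline threads

③ Number of events sent per batch

Default ---> pipeline.batch.size: 125

Tuned ---> pipeline.batch.size: 1000

④ Batch delay

Default ---> pipeline.batch.delay: 5

Tuned ---> pipeline.batch.delay: 10

(2) Summary

  The number of pipeline workers can be set with the -w flag or directly in logstash.yml. Raising it increases the number of filter and output threads. If needed, setting it to a few times the number of CPU cores is safe, because the threads sit idle while waiting on I/O.

  By default each output runs on a single pipeline worker thread. The workers setting in an output raises that; do not set it higher than the number of pipeline workers.

  The batch_size of an output can also be set, for example so that the ES output matches the batch size.

  Once multiline is set in a filter, the number of pipeline workers automatically drops to 1. With filebeat, do the multiline merging in the beat; with logstash as the shipper, set multiline in the input rather than in the filter.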
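
Raising pipeline.workers and pipeline.batch.size together multiplies the number of events held in memory at once, since each worker holds one batch. The sketch below works that out for the default and tuned values above; the 250-byte entry size reuses the earlier assumption and gives only a lower bound, because an event in memory is larger than its raw text.

```python
def inflight_events(workers, batch_size):
    """Events held in memory at once: one batch per pipeline worker."""
    return workers * batch_size


def inflight_megabytes(workers, batch_size, bytes_per_event=250):
    """Lower-bound estimate of raw event data in flight, in MB."""
    return inflight_events(workers, batch_size) * bytes_per_event / 1_000_000


for workers, batch_size in [(2, 125), (4, 1000), (8, 1000)]:
    print(
        f"workers={workers} batch={batch_size}: "
        f"{inflight_events(workers, batch_size)} events, "
        f">= {inflight_megabytes(workers, batch_size):.2f} MB"
    )
```

If the tuned values push this figure up sharply, the JVM heap may need to grow with it.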

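(3) The logstash JVM configuration file

  Logstash runs on the JVM, which is configured through jvm.options: minimum and maximum memory, garbage collection, and so on. The JVM memory allocation should be neither too large nor too small: too large slows down the operating system, too small prevents logstash from starting. The defaults are:

-Xms256m  # minimum heap size
-Xmx1g 	  # maximum heap size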

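Issues around introducing Redis

(1) filebeat can send directly to logstash (the indexer), but logstash has no storage of its own: restarting it means first stopping every connected beat and then logstash, which is an operational burden, and data is lost if logstash fails. With Redis as a buffer, when logstash stops unexpectedly the data can be seen from a Redis client, still cached in Redis;

(2) Redis can hold the data in a list (up to 4,294,967,295 entries) or use publish/subscribe;

(3) Tuning redis as an ELK buffer queue:

 ① bind 0.0.0.0 # do not listen only on the local address

 ② requirepass ilinux.io # set a password, for safety

 ③ It only serves as a queue, so there is no need for persistent storage; turning off all persistence, both snapshots (RDB files) and the append-only file (AOF), gives better performance

  save ""          # disable snapshots (RDB)
  appendonly no    # disable AOF

 ④ Turn off the memory eviction policy and allow the maximum memory space

  maxmemory 0 # maxmemory 0 means no limit on the memory Redis may use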

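elasticsearch node tuning

(1) Server hardware and OS parameters

(a) /etc/sysctl.conf settings

vim /etc/sysctl.conf
vm.swappiness = 1                     # ES recommends 1, which sharply reduces swapping and keeps as much as possible in memory; do not set it to 0, which is likely to cause OOM
net.core.somaxconn = 65535            # maximum length of the listen queue for each port
vm.max_map_count= 262144              # limit on the number of VMAs (virtual memory areas) a process may own; a VMA is a contiguous region of virtual address space, and exceeding the limit causes OOM
fs.file-max = 518144                  # maximum number of file handles the Linux kernel will allocate
[root@elasticsearch]# sysctl -p       # apply the settings

(b) limits.conf settings

vim /etc/security/limits.conf
elasticsearch    soft    nofile          65535
elasticsearch    hard    nofile          65535
elasticsearch    soft    memlock         unlimited
elasticsearch    hard    memlock         unlimited

(c) For the limits above to take effect permanently, two more files need editing

vim /etc/pam.d/common-session-noninteractive
vim /etc/pam.d/common-session

Add the following line:
session required pam_limits.so
A reboot may be needed for this to take effect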

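(2) The elasticsearch JVM configuration file

-Xms2g
-Xmx2g

① Set the minimum heap size (Xms) and maximum heap size (Xmx) equal to each other.

② The more heap Elasticsearch has, the more memory it can use for caching. Too much heap, however, can lead to long garbage collection pauses.

③ Set Xmx to no more than 50% of physical RAM, so that enough physical memory is left for the kernel file system cache.

④ Do not set Xmx above the cutoff below which the JVM uses compressed object pointers; the exact cutoff varies but is close to 32 GB. Stay under 32 GB; on a machine with more memory, run several instances rather than giving one instance a very large heap.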
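
The heap rules above reduce to one small calculation: half of physical RAM, capped below the compressed-pointer cutoff. A minimal sketch in Python; the 31 GB cap is an assumed safe value under the roughly 32 GB cutoff, and the helper names are made up for this example.

```python
def recommended_heap_gb(physical_ram_gb, oops_cutoff_gb=31.0):
    """Half of physical RAM, capped below the compressed-pointer cutoff (assumed 31 GB)."""
    return min(physical_ram_gb / 2, oops_cutoff_gb)


def jvm_heap_options(physical_ram_gb):
    """Return matching -Xms/-Xmx lines, in megabytes, with both values equal."""
    heap_mb = int(recommended_heap_gb(physical_ram_gb) * 1024)
    return [f"-Xms{heap_mb}m", f"-Xmx{heap_mb}m"]


for ram in (4, 16, 128):
    print(ram, "GB RAM ->", " ".join(jvm_heap_options(ram)))
```

For the 4 GB server used earlier this gives a 2 GB heap, which matches the -Xms2g / -Xmx2g values shown.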

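(3) elasticsearch configuration file tuning parameters

① vim elasticsearch.yml

bootstrap.memory_lock: true  # lock memory so swap is not used
# cache and thread pool tuning follows
bootstrap.mlockall: true     # name used for memory_lock before 5.0; keep only the one your version accepts
transport.tcp.compress: true
indices.fielddata.cache.size: 40%
indices.cache.filter.size: 30%
indices.cache.filter.terms.size: 1024mb
threadpool:
    search:
        type: cached
        size: 100
        queue_size: 2000

② Set the environment variable

vim /etc/profile.d/elasticsearch.sh
export ES_HEAP_SIZE=2g   # heap size: no more than half of physical memory, and below 32 GB

(4) Cluster tuning (I did not use a cluster)

① ES is distributed storage; nodes configured with the same cluster.name discover each other and join the cluster automatically;

② the cluster elects a master automatically, and elects a new one when the master goes down;

③ to prevent "split brain", the cluster should preferably have an odd number of nodes;

④ to manage nodes effectively, multicast can be turned off with discovery.zen.ping.multicast.enabled: false, and a list of unicast hosts set with discovery.zen.ping.unicast.hosts: ["ip1", "ip2", "ip3"]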

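Checking performance

(1) Check input and output performance

Logstash runs only as fast as the services it is connected to: it can be as fast as its inputs and outputs, but no faster.

(2) Check system parameters

① CPU

Check whether the CPU is overloaded. On Linux/Unix, top -H shows per-thread statistics along with the totals.

If CPU usage is high, skip to the section on checking the JVM heap, and check the Logstash worker settings.

② Memory

Logstash runs in a Java virtual machine, so it only uses up to the maximum memory allocated to it.

Look for other applications using large amounts of memory; when their usage exceeds physical memory, Logstash can be pushed into swap on disk.

③ I/O: monitor disk I/O and check for disk saturation

Logstash plugins that write to disk (the file output, for example) can saturate the disk.

The disk can also saturate when many errors occur and Logstash writes large volumes of error logs.

On Linux, iostat, dstat or similar commands can be used to monitor disk I/O

④ Monitor network I/O

Inputs and outputs that perform many network operations can saturate the network.

On Linux, dstat or iftop can be used to monitor the network.

(3) Check the JVM heap

A heap that is too small drives CPU usage up, because the JVM is garbage collecting constantly.

A quick way to check is to double the heap size and see whether performance improves. Do not set the heap larger than physical memory; leave at least 1 GB for the operating system and other processes.

Tools such as the jmap command line or VisualVM give a more precise measurement of the JVM heap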