1. 程式人生 > >ElasticSearch實戰系列七: Logstash實戰使用-圖文講解

ElasticSearch實戰系列七: Logstash實戰使用-圖文講解

## 前言 在上一篇中我們介紹了[Logstash快速入門](https://www.cnblogs.com/xuwujing/p/13412108.html),本文主要介紹的是ELK日誌系統中的Logstash的實戰使用。實戰使用我打算從以下的幾個場景來進行講解。 ### 時區問題解決方案 在我們使用logstash將採集的資料傳輸到ES中的時候,會發現採集的時間`@timestamp`的時間和我們本地的不一致,這個主要是因為時區的問題導致的,我們在計算時間的時候需要將這個時間增加8小時,但是這樣會很不方便。為了永久解決這個問題,我們可以在logstash中的filter中對該欄位進行轉換,增加8小時。 新增的配置如下: ``` ruby { code => "event.set('timestamp', event.get('@timestamp').time.localtime + 8*60*60)" } ruby { code => "event.set('@timestamp',event.get('timestamp'))" } mutate { remove_field => ["timestamp"] } ``` 原本示例圖: ![在這裡插入圖片描述](https://img-blog.csdnimg.cn/20200813173634140.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FhendzeHBjbQ==,size_16,color_FFFFFF,t_70#pic_center) 新增配置之後的示例圖: ![在這裡插入圖片描述](https://img-blog.csdnimg.cn/20200813174158330.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FhendzeHBjbQ==,size_16,color_FFFFFF,t_70#pic_center) 可以看到新增配置之後`@timestamp`時間已經和本地時間基本一致了。 ### 日誌內容切分 我們在進行採集日誌到ES中的時候,有時需要對日誌內容進行切割。比如得到日誌內容的時間以及日誌級別等等。這時我們就可以通過grok來對日誌內容進行切分,比如將制定好的日誌內容切割為日誌時間、執行緒名稱、日誌級別、類名以及詳細內容等等。我們只需要在logstash的filter中使用grok語法即可完成日誌內容切割。 這裡我們使用JAVA的Logback來制定日誌輸出格式,然後通過日誌的格式編寫grok語法,最後將grok配置新增到logstash的filter中。 **Logback輸出配置:** > |%d{yyyy-MM-dd HH:mm:ss.SSS}|[%thread]|%-5level|%logger{50}|-%msg%n **日誌樣例資料:** > |2020-07-24 17:08:33.159|[Thread-5]|INFO|com.pancm.Application|-測試示例三: All things in their being are good for something. 天生我才必有用3 **grok模式:** > \|%{DATA:log_time}\|%{DATA:thread}\|%{DATA:log_level}\|%{DATA:class_name}\|-%{GREEDYDATA:content} 使用grok分析 ![在這裡插入圖片描述](https://img-blog.csdnimg.cn/20200813174422136.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FhendzeHBjbQ==,size_16,color_FFFFFF,t_70#pic_center) 可以看到以及分析匹配成功了。 然後我們在filter中新增如下配置: > grok { > match => { "message" =>"\|%{DATA:log_time}\|%{DATA:thread}\|%{DATA:log_level}\|%{DATA:class_name}\|-%{GREEDYDATA:content}" > } > } 最終輸出的日誌到ES的示例圖: ![在這裡插入圖片描述](https://img-blog.csdnimg.cn/20200813174942382.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FhendzeHBjbQ==,size_16,color_FFFFFF,t_70#pic_center) ### 自定義模板 我們在使用Logstash採集日誌的時候,如果沒有指定索引庫或模板,則會使用ElasticSearch預設自帶的名字為”logstash”的模板,預設應用於Logstash寫入資料到ElasticSearch使用。但是我們希望使用自定義的索引模板,將採集的日誌按照我們自身的想法來寫入,此時我們就需要用到自定義模板了。 主要有兩種方式,一種是在logstash的output外掛中使用template指定本機器上的一個模板json路徑, 例如 `template => "/home/logstash.json"`,json裡面的內容為我們自定的索引mapping,雖然這種方式簡單,但是分散在Logstash機器上,維護起來比較麻煩。還有一種是在elasticsearc服務端自定義配置模板,事先將模板設定好,然後在logstash的output輸出中指定該模板即可,這種方式比較靈活方便,可動態更改,全域性生效。 這裡我們還是通過一個示例來進行說明,我們首先建立一個template_mylog的模板,配置這幾個欄位: log_time、thread、log_level、class_name、content。 語句如下: ``` PUT _template/template_mylog { "index_patterns" : [ "mylog-*" ], "order" : 10, "settings": { "index.number_of_shards": 3, "number_of_replicas": 1 }, "mappings" : { "properties" : { "log_level" : { "type" : "keyword" }, "thread" : { "type" : "keyword" }, "class_name" : { "type" : "keyword" }, "content" : { "type" : "keyword" }, "log_time" : { "type" : "date","format" : "yyyy-MM-dd HH:mm:ss.SSS"} } } } ``` 示例圖: ![在這裡插入圖片描述](https://img-blog.csdnimg.cn/20200813164346316.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FhendzeHBjbQ==,size_16,color_FFFFFF,t_70#pic_center) 注:上述的配置比其他mapping而言多了兩個新配置,一個是index_patterns,該配置表明自動建立的索引開頭以`mylog-`的索引庫都會採用該模板;而order表示順序級別,在有相同的索引模板中,該值越大,優先順序越高。 建立成功之後,我們只需在output中的新增如下配置即可。 ``` elasticsearch { hosts => ["127.0.0.1:9200"] index => "mylog-%{+YYYY.MM.dd}" } ``` 然後我們啟動logstash進行日誌的採集。 效果圖: ![在這裡插入圖片描述](https://img-blog.csdnimg.cn/20200813174942382.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FhendzeHBjbQ==,size_16,color_FFFFFF,t_70#pic_center) ### 寫入多個索引庫 我們在使用logstash採集日誌的時候,有時有多種不同的日誌並且需要採集到不同的索引庫中,這時我們就可以通過標記來進行寫入。比如採集/home/logs目錄下的日誌我定義一個標記為java,採集/home/logs2目錄下的日誌我定義一個標記為java2,那麼在寫入ElasticSearch的時候只需要根據該標記區分寫入即可。 **logstash input配置示例:** ``` file { path => ["/home/logs/mylog-2020-08-13.0.txt"] type => "java" start_position => "beginning" sincedb_path => "/dev/null" } file { path => ["/home/logs2/*.txt"] type => "java2" start_position => "beginning" sincedb_path => "/dev/null" } ``` **logstash output配置示例:** ``` if [type] == "java"{ elasticsearch { hosts => ["127.0.0.1:9200"] index => "mylog-%{+YYYY.MM.dd}" } } if [type] == "java2"{ elasticsearch { hosts => ["127.0.0.1:9200"] index => "mylog-%{+YYYY.MM}" } } ``` 示例圖在**多行內容合併**場景中。 ### 多行內容合併 我們在採集日誌的時候,經常會遇到異常日誌,並且異常日誌並非為一行內容,如果我們按照原有的方式採集,在ElasticSearch中顯示的是一行一行的內容,這樣的話我們排查問題會很頭疼。幸好Logstash中支援多行日誌合併,使用multiline.pattern、multiline.negate和multiline.what來實現配置實現。 下面的配置中,我們通過制定匹配規則將以空格開頭的所有行合併到上一行,並把以Caused by開頭的也追加到上一行。 在Logstash的input配置中新增如下配置: ``` codec => multiline { pattern => "\s*\[" negate => "true" what => "previous" } ``` **異常日誌:** ![在這裡插入圖片描述](https://img-blog.csdnimg.cn/20200813175244616.png#pic_center) 原異常日誌在ElasticSearch中示例圖: ![在這裡插入圖片描述](https://img-blog.csdnimg.cn/20200813175733878.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FhendzeHBjbQ==,size_16,color_FFFFFF,t_70#pic_center) 多行合併之後的效果圖: ![在這裡插入圖片描述](https://img-blog.csdnimg.cn/20200813180752587.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FhendzeHBjbQ==,size_16,color_FFFFFF,t_70#pic_center) ### 完整配置 **logstash-test.conf 配置** ``` input{ file { path => ["/home/logs/mylog-2020-08-13.0.txt"] type => "java" start_position => "beginning" sincedb_path => "/dev/null" } file { path => ["/home/logs2/*.txt"] type => "java2" codec => multiline { pattern => "\s*\[" negate => "true" what => "previous" } start_position => "beginning" sincedb_path => "/dev/null" } } filter { grok { match => { "message" =>"\|%{DATA:log_time}\|%{DATA:thread}\|%{DATA:log_level}\|%{DATA:class_name}\|-%{GREEDYDATA:content}" } } ruby { code => "event.set('timestamp', event.get('@timestamp').time.localtime + 8*60*60)" } ruby { code => "event.set('@timestamp',event.get('timestamp'))" } mutate { remove_field => ["timestamp"] } } output { stdout { codec => rubydebug } if [type] == "java"{ elasticsearch { hosts => ["127.0.0.1:9200"] index => "mylog-%{+YYYY.MM.dd}" } } if [type] == "java2"{ elasticsearch { hosts => ["127.0.0.1:9200"] index => "mylog-%{+YYYY.MM}" } } } ``` ### 異常問題解決方案 1.logstash: Could not execute action: PipelineAction::Create
, action_result: false 解決辦法: 斜杆採用“/” 2, logstash: object mapping for [host] tried to parse field [host] as object, but found a concrete value 解決辦法: 在filter裡面新增如下配置: ``` mutate { rename => { "host" => "host.name" } } ``` ## 其它 [ElasticSearch實戰系列](https://www.cnblogs.com/xuwujing/tag/elasticsearch/): - [ElasticSearch實戰系列一: ElasticSearch叢集+Kinaba安裝教程](https://www.cnblogs.com/xuwujing/p/11385255.html) - [ElasticSearch實戰系列二: ElasticSearch的DSL語句使用教程---圖文詳解](https://www.cnblogs.com/xuwujing/p/11567053.html) - [ElasticSearch實戰系列三: ElasticSearch的JAVA API使用教程](https://www.cnblogs.com/xuwujing/p/11645630.html) - [ElasticSearch實戰系列四: ElasticSearch理論知識介紹](https://www.cnblogs.com/xuwujing/p/12093933.html) - [ElasticSearch實戰系列五: ElasticSearch的聚合查詢基礎使用教程之度量(Metric)聚合](https://www.cnblogs.com/xuwujing/p/12385903.html) - [ElasticSearch實戰系列六: Logstash快速入門](https://www.cnblogs.com/xuwujing/p/13412108.html) ### 音樂推薦 原創不易,如果感覺不錯,希望給個推薦!您的支援是我寫作的最大動力! 版權宣告: 作者:虛無境 部落格園出處:http://www.cnblogs.com/xuwujing CSDN出處:http://blog.csdn.net/qazwsxpcm     個人部落格出處:http://www.panchengm