1. 程式人生 > >logstash自定義模板輸出到elasticsearch和增加輸出欄位

logstash自定義模板輸出到elasticsearch和增加輸出欄位

index

寫入事件所用的索引。可以動態的使用%{foo}語法,它的預設值是:
"logstash-%{+YYYY.MM.dd}",以天為單位分割的索引,使你可以很容易的刪除老的資料或者搜尋指定時間範圍內的資料。

索引不能包含大寫字母。推薦使用以周為索引的ISO 8601格式,例如logstash-%{+xxxx.ww}

示例:

 index => "tomcat_logs_index_%{+YYYY.MM.dd}"

hosts

是一個數組型別的值

意http協議使用的是http地址,埠是9200,示例:

hosts => ["192.168.102.209:9200", "192.168.102.216:9200"]

document_type

定義es索引的type,一般你應該讓同一種類型的日誌存到同一種type中,比如debug日誌和error日誌存到不同的type中

如果不設定預設type為logs

template

如果你願意,你可以設定指向你自己模板的路徑。如果沒有設定,那麼預設的模板會被使用

template_name

這個配置項用來定義在Elasticsearch中模板的命名

注意刪除舊的模板

curl -XDELETE <http://localhost:9200/_template/OldTemplateName?pretty>

template_overwrite

布林型別 預設為false
設定為true表示如果你有一個自定義的模板叫logstash,那麼將會用你自定義模板覆蓋預設模板logstash

manage_template

布林型別 預設為true
設定為false將關閉logstash自動管理模板功能
比如你定義了一個自定義模板,更加欄位名動態生成欄位,那麼應該設定為false

order引數

ELK Stack 在入門學習過程中,必然會碰到自己修改定製索引對映(mapping)乃至模板(template)的問題。
這時候,不少比較認真看 Logstash 文件的新使用者會通過下面這段配置來制定自己的模板策略:

output {
    elasticsearch {
        host => "127.0.0.1"
        manage_template => true
        template => "/path/to/mytemplate"
        template_name => "myname"
    }
}

然而隨後就發現,自己辛辛苦苦修改出來的模板,通過 curl -XGET 'http://127.0.0.1:9200/_template/myname' 看也確實上傳成功了,但實際新資料索引創建出來,就是沒生效!

這個原因是:Logstash 預設會上傳一個名叫 logstash 的模板到 ES 裡。如果你在使用上面這個配置之前,曾經執行過 Logstash(一般來說都會),那麼 ES 裡就已經存在這麼一個模板了。你可以curl -XGET 'http://127.0.0.1:9200/_template/logstash' 驗證。

這個時候,ES 裡就變成有兩個模板,logstash 和 myname,都匹配 logstash-* 索引名,要求設定一定的對映規則了。

ES 會按照一定的規則來嘗試自動 merge 多個都匹配上了的模板規則,最終運用到索引上

其中要點就是:template 是可以設定 order 引數的!而不寫這個引數,預設的 order 值就是 0。order 值越大,在 merge 規則的時候優先順序越高。

所以,解決這個問題的辦法很簡單:在你自定義的 template 裡,加一行,變成這樣:

{
    "template" : "logstash-*",
    "order" : 1,
    "settings" : { ... },
    "mappings" : { ... }
}

當然,其實如果只從 Logstash 配置角度出發,其實更簡單的辦法是:直接修改原來預設的 logstash 模板,然後模板名稱也不要改,就好了:

output {
    elasticsearch {
        host => "127.0.0.1"
        manage_template => true
        template_overwrite => true
    }
}

為elasticsearch配置模板

在使用logstash收集日誌的時候,我們一般會使用logstash自帶的動態索引模板,雖然無須我們做任何定製操作,就能把我們的日誌資料推送到elasticsearch索引叢集中

但是在我們查詢的時候,就會發現,預設的索引模板常常把我們不需要分詞的欄位,給分詞了,這樣以來,我們的比較重要的聚合統計就不準確了:

所以這時候,就需要我們自定義一些索引模板了

在logstash與elasticsearch整合的時候,總共有如下幾種使用模板的方式:

  1. 使用預設自帶的索引模板 ,大部分的欄位都會分詞,適合開發和時候快速驗證使用

  2. 在logstash收集端自定義配置模板,因為分散在收集機器上,維護比較麻煩

  3. 在elasticsearc服務端自定義配置模板,由elasticsearch負責載入模板,可動態更改,全域性生效,維護比較容易

使用預設自帶的索引模板

ElasticSearch預設自帶了一個名字為”logstash”的模板,預設應用於Logstash寫入資料到ElasticSearch使用

優點:最簡單,無須任何配置

缺點:無法自定義一些配置,例如:分詞方式

在logstash收集端自定義配置模板

使用第二種,適合小規模叢集的日誌收集

需要在logstash的output外掛中使用template指定本機器上的一個模板json路徑, 例如 template => "/tmp/logstash.json"

優點:配置簡單

缺點:因為分散在Logstash Indexer機器上,維護起來比較麻煩

在elasticsearc服務端自定義配置模板

manage_template => false//關閉logstash自動管理模板功能  
template_name => "xxx"//對映模板的名字  

第三種需要在elasticsearch的叢集中的config/templates路徑下配置模板json,在elasticsearch中索引模板可分為兩種

靜態模板

適合索引欄位資料固定的場景,一旦配置完成,不能向裡面加入多餘的欄位,否則會報錯

優點:scheam已知,業務場景明確,不容易出現因欄位隨便對映從而造成元資料撐爆es記憶體,從而導致es叢集全部宕機,維護比較容易,可動態更改,全域性生效。

缺點:欄位數多的情況下配置稍繁瑣

一個靜態索引模板配置例子如下:

{  
  "xxx" : {  
      "template": "xxx-*",  
        "settings": {  
            "index.number_of_shards": 3,  
            "number_of_replicas": 0   
        },  
    "mappings" : {  
      "logs" : {  
        "properties" : {  
          "@timestamp" : { //這是專門給kibana用的一個欄位,時間索引
            "type" : "date",  
            "format" : "dateOptionalTime",  
            "doc_values" : true  
          },  
          "@version" : {  
            "type" : "string",  
            "index" : "not_analyzed",  
            "doc_values" : true      
          },  
          "id" : {  
            "type" : "string",  
            "index" : "not_analyzed"  
          },  
          "name" : {  
            "type" : "string",  
            "index" : "not_analyzed"  
          }
        }  
      }  
    }  
  }  
}  

動態模板

適合欄位數不明確,大量欄位的配置型別相同的場景,多加欄位不會報錯

優點:可動態新增任意欄位,無須改動scheaml,

缺點:如果新增的欄位非常多,有可能造成es叢集宕機

一個動態索引模板配置例子如下:

{  
  "template" : "xxx-*",  
  "settings" : {  
   "index.number_of_shards": 5,  
   "number_of_replicas": 0    
  
},  
  "mappings" : {  
    "_default_" : {  
      "_all" : {"enabled" : true, "omit_norms" : true},  
      "dynamic_templates" : [ {  
        "message_field" : {  
          "match" : "message",  
          "match_mapping_type" : "string",  
          "mapping" : {  
            "type" : "string", "index" : "analyzed", "omit_norms" : true,  
            "fielddata" : { "format" : "disabled" }  
          }  
        }  
      }, {  
        "string_fields" : {  
          "match" : "*",  
          "match_mapping_type" : "string",  
          "mapping" : {  
            "type" : "string", "index" : "not_analyzed", "doc_values" : true  
          }  
        }  
      } ],  
      "properties" : {  
        "@timestamp": { "type": "date" },  
        "@version": { "type": "string", "index": "not_analyzed" }, 
        "geoip"  : {  
          "dynamic": true,  
          "properties" : {  
            "ip": { "type": "ip" },  
            "location" : { "type" : "geo_point" },  
            "latitude" : { "type" : "float" },  
            "longitude" : { "type" : "float" }  
          }  
        }  
      }  
    }  
  }  
}  

只設置message欄位分詞,其他的欄位預設都不分詞

模板結構

  • 通用設定 主要是模板匹配索引的過濾規則,影響該模板對哪些索引生效
  • settings:配置索引的公共引數,比如索引的replicas,以及分片數shards等引數
  • mappings:最重要的一部分,在這部分中配置每個type下的每個field的相關屬性,比如field型別(string,long,date等等),是否分詞,是否在記憶體中快取等等屬性都在這部分配置
  • aliases:索引別名,索引別名可用在索引資料遷移等用途上。

例子:

{
  "logstash" : {
    "order" : 0,
    "template" : "logstash-*",
    "settings" : {
      "index" : {
        "refresh_interval" : "5s"
      }
    },
    "mappings" : {
      "_default_" : {
        "dynamic_templates" : [ {
          "message_field" : {
            "mapping" : {
              "fielddata" : {
                "format" : "disabled"
              },
              "index" : "analyzed",
              "omit_norms" : true,
              "type" : "string"
            },
            "match_mapping_type" : "string",
            "match" : "message"
          }
        }, {
          "string_fields" : {
            "mapping" : {
              "fielddata" : {
                "format" : "disabled"
              },
              "index" : "analyzed",
              "omit_norms" : true,
              "type" : "string",
              "fields" : {
                "raw" : {
                  "ignore_above" : 256,
                  "index" : "not_analyzed",
                  "type" : "string"
                }
              }
            },
            "match_mapping_type" : "string",
            "match" : "*"
          }
        } ],
        "_all" : {
          "omit_norms" : true,
          "enabled" : true
        },
        "properties" : {
          "@timestamp" : {
            "type" : "date"
          },
          "geoip" : {
            "dynamic" : true,
            "properties" : {
              "ip" : {
                "type" : "ip"
              },
              "latitude" : {
                "type" : "float"
              },
              "location" : {
                "type" : "geo_point"
              },
              "longitude" : {
                "type" : "float"
              }
            }
          },
          "@version" : {
            "index" : "not_analyzed",
            "type" : "string"
          }
        }
      }
    },
    "aliases" : { }
  }
}

我們建立一個自定義Template動態模板,這個模板指定匹配所有以”go_logsindex“開始的索引,並且指定允許新增新欄位,匹配所有string型別的新欄位會建立一個raw的巢狀欄位,這個raw巢狀欄位型別也是string,但是是not_analyzed不分詞的(主要用於解決一些analyzed的string欄位無法做統計,但可以使用這個raw巢狀欄位做統計)

{
  "template": "go_logs_index_*",
  "order":0,
  "settings": {
      "index.number_of_replicas": "1",
      "index.number_of_shards": "5",
      "index.refresh_interval" : "10s"
  },
  "mappings": {
    "_default_": {
      "_all": {
        "enabled": false
      },
      "dynamic_templates": [
        {
          "my_template": {
            "match_mapping_type": "string",
            "mapping": {
              "type": "string",
              "fields": {
                "raw": {
                  "type": "string",
                  "index": "not_analyzed"
                }
              }
            }
          }
        }
      ]
    },
    "go": {
      "properties": {
        "timestamp": {
          "type": "string",
          "index": "not_analyzed"
        },
        "msg": {
          "type": "string",
          "analyzer": "ik",
          "search_analyzer": "ik_smart"
        },
        "file": {
          "type": "string",
          "index": "not_analyzed"
        },
        "line": {
          "type": "string",
          "index": "not_analyzed"
        },
        "threadid": {
          "type": "string",
          "index": "not_analyzed"
        },
        "info": {
          "type": "string",
          "index": "not_analyzed"
        },
        "type": {
          "type": "string",
          "index": "not_analyzed"
        },
        "@timestamp": {
          "format": "strict_date_optional_time||epoch_millis",
          "type": "date"
        },
        "@version": {
          "type": "string",
          "index": "not_analyzed"
        }
      }
    }
  }
}

總結

第三種方式統一管理Template最好,推薦使用第三種方式,但是具體問題具體分析。例如場景是Logstash 和ElasticSearch都在一臺伺服器,第二種就比較好

定製索引模板,是搜尋業務中一項比較重要的步驟,需要注意的地方有很多,比如:
1.欄位數固定嗎
2.欄位型別是什麼
3.分不分詞
4.索引不索引
5.儲存不儲存
6.排不排序
7.是否加權

增加輸出欄位:

logstash.conf配置檔案中的內容:

input {                                      #日誌資料輸入來源log4j
    log4j {
        host => "10.104.112.175"
        port => 4561
                type => "simple"
    }
    log4j {
        host => "10.104.112.175"
        port => 4560
                type => "detail"
    }
}
 
filter {                                      #logstash過濾器
    if [type] == "simple" {
        mutate{
                 split => ["message","|"]     #按 | 進行split切割message
                        add_field =>   {
                                "requestId" => "%{[message][0]}"
                        }
                        add_field =>   {
                                "timeCost" => "%{[message][1]}"
                        }
                        add_field =>   {
                                "responseStatus" => "%{[message][2]}"
                        }
						add_field =>   {
                                "channelCode" => "%{[message][3]}"
                        }
						add_field =>   {
                                "transCode" => "%{[message][4]}"
                        }
        }
		mutate {
			convert => ["timeCost", "integer"]  #修改timeCost欄位型別為整型
		}
    } else if [type] == "detail" {
		grok{
			match => {             #將message裡面 TJParam後面的內容,分隔並新增為ES欄位和值
				"message" => ".*TJParam %{PROG:requestId} %{PROG:channelCode} %{PROG:transCode}"
			}
		}
		grok{
			match => { 
				"message" => "(?<temMsg>(.*)(?=TJParam)/?)"  #擷取TJParam之前的字元作為temMsg欄位的值
				remove_field => ["message"]		     #刪除欄位message
			}
		}
		mutate {
			rename => {"temMsg" => "message"}		     #重新命名欄位temMsg為message
		}
    }
}
 
output {#日誌輸出目的地ES庫
 
    elasticsearch {
                action => "index"
        hosts => "10.104.112.175:9200"
        index => "supergwlog--%{+YYYY-MM}"
    }
 
}