1. 程式人生 > 其它 >Logstash日誌解析的Grok模式詳解

Logstash日誌解析的Grok模式詳解

目錄

1.Grok filter入門

有效分析和查詢送入Elastic Stack的資料的能力取決於資訊的可讀性。 這意味著,當將非結構化資料攝取到系統中時,必須將其轉換為結構化資料。

通常,這個至關重要的任務留給Logstash(儘管還有其他日誌傳送器可用,比如 Fluentd)。無論你定義什麼資料來源,都必須提取日誌並執行一些魔力來美化它們,以確保在將它們輸出到Elasticsearch之前正確地對其進行了解析。

Logstash中的資料操作是使用過濾器外掛執行的。 本文重點介紹最流行和有用的過濾器外掛之一 ——Logstash grok過濾器,該過濾器用於將非結構化資料解析為結構化資料。

1.1 什麼是 Grok?

最初的術語實際上是很新的由羅伯特·A·海因萊因(Robert A. Heinlein)在他的1961年的《陌生的土地上的陌生人》一書中創造的,指的是理解某種東西,使人們真正沉浸其中。 這是 grok 語言和 Logstash grok 外掛的合適名稱,它們可以以一種格式修改資訊並將其浸入另一種格式(特別是 JSON)。 已經有數百種用於記錄的 Grok 模式。

1.2 Grok 是如何工作的?

簡而言之,grok是一種將行與正則表示式匹配,將行的特定部分對映到專用欄位中以及根據此對映執行操作的方法。

內建了超過 200 種 Logstash 模式 ,用於過濾 AWS,Bacula,Bro,Linux-Syslog 等中的單詞,數字和日期等專案。 如果找不到所需的模式,則可以編寫自己的自定義模式。 還有多個匹配模式的選項,可簡化表示式的編寫以捕獲日誌資料。

這是 Logstash grok 過濾器的基本語法格式:

%{PATTERN:FieldName}

這將匹配預定義的模式,並將其對映到特定的標識欄位。 由於grok本質上是基於正則表示式的組合,因此你也可以建立自己的基於正則表示式的 grok 過濾器。 例如:

(?\d\d-\d\d-\d\d)

這將使 22-22-22(或任何其他數字)的正則表示式與欄位名稱匹配。

1.3 一個 Logstash Grok例子

這個 grok 除錯工具是開始構建 grok 過濾器的好方法:Grok Debugger。你也可以在 Kibana 中找到 grok 的除錯工具:

使用此工具,你可以貼上日誌訊息並逐步構建 grok 模式,同時連續測試編譯。 通常,我建議從 %{GREEDYDATA:message} 模式開始,然後逐步新增越來越多的模式。

在上面的示例中,我將從以下內容開始:

%{GREEDYDATA:message}

比如針對這條資訊:

83.149.9.216 - - [17/May/2015:10:05:03 +0000] "GET /presentations/logstash-monitorama-2013/images/kibana-search.png HTTP/1.1" 200 203023 "http://semicomplete.com/presentations/logstash-monitorama-2013/" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36"

我們可以看到整個資訊都賦予給 message 了。

我們再接著嘗試如下的例子:

%{IPORHOST:clientip} - - \[%{HTTPDATE:timestamp}\] %{GREEDYDATA:message}

在上面,我們可以看到 clientip, message及timestamp。

1.4 常見的例子

以下是一些示例,可幫助你熟悉如何構造grok過濾器:

1.4.1 Syslog

用 Grok 解析 syslog 訊息是新使用者更普遍的需求之一。 syslog 還有幾種不同的日誌格式,因此請牢記編寫自己的自定義 grok 模式。 這是常見系統日誌解析的一個示例:

May  4 00:10:36 liuxg xpcproxy[69746]: libcoreservices: _dirhelper_userdir: 557: bootstrap_look_up returned (ipc/send) invalid destination port

grok解析為:

%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?:%{GREEDYDATA:syslog_message}

在logstash中,我們應該使用如下的grok filter:

grok {
   match => { 
    "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp}
    %{SYSLOGHOST:syslog_hostname}
    %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?:
    %{GREEDYDATA:syslog_message}" 
  }
}

1.4.2 Apache access logs

80.135.37.131 - - [11/Sep/2019:23:56:45 +0000] "GET /item/giftcards/4852 HTTP/1.1" 200 91 "-" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.6; rv:9.0.1) Gecko/20100101 Firefox/9.0.1"

grok解析為:

%{COMBINEDAPACHELOG}

在logstash中,我們應該使用如下的grok filter:

grok  {
      match => { "message" => "%{COMBINEDAPACHELOG}" }
}

1.4.3 Elasticsearch

[2017-09-10T12:07:26,683][WARN ][index.indexing.slowlog.index] [GOgO9TD] [testindex-slowlogs/yNbyYk1ARSW_hd0YRh6J0A] took[142.3micros], took_millis[0], type[product], id[105], routing[] , source[{"price":9925,"name":"Nariko"}]

grok解析為:

\[%{TIMESTAMP_ISO8601:timestamp}\]\[%{DATA:loglevel}%{SPACE}\]\[%{DATA:source}\]%{SPACE}\[%{DATA:node}\]%{SPACE}\[%{DATA:index}\] %{NOTSPACE} \[%{DATA:updated-type}\]

\[%{TIMESTAMP_ISO8601:timestamp}\]\[%{DATA:loglevel}%{SPACE}\]\[%{DATA:source}\]%{SPACE}\[%{DATA:node}\] (\[%{NOTSPACE:index}\]\[%{NUMBER:shards}\])?%{GREEDYDATA} 

在logstash中,我們應該使用如下的grok filter:

grok {
      match => ["message", "\[%{TIMESTAMP_ISO8601:timestamp}\]\[%{DATA:loglevel}%{SPACE}\]\[%{DATA:source}%{SPACE}\]%{SPACE}\[%{DATA:node}\]%{SPACE}\[%{DATA:index}\] %{NOTSPACE} \[%{DATA:updated-type}\]",
                "message", "\[%{TIMESTAMP_ISO8601:timestamp}\]\[%{DATA:loglevel}%{SPACE}\]\[%{DATA:source}%{SPACE}\]%{SPACE}\[%{DATA:node}\] (\[%{NOTSPACE:Index}\]\[%{NUMBER:shards}\])?%{GREEDYDATA}"
      ]
   }

上面有兩種匹配的方法,只要其中的一種可以匹配就可以了。

1.4.4 Redis

grok {
       match => ["redistimestamp", "\[%{MONTHDAY} %{MONTH} %{TIME}]",
                ["redislog", "\[%{POSINT:pid}\] %{REDISTIMESTAMP:timestamp}"],
                ["redismonlog", "\[%{NUMBER:timestamp} \[%{INT:database} %{IP:client}:%{NUMBER:port}\] "%{WORD:command}"\s?%{GREEDYDATA:params}"]
      ]
    }

1.4.5 MogoDB

MONGO_LOG %{SYSLOGTIMESTAMP:timestamp} \[%{WORD:component}\] %{GREEDYDATA:message}MONGO_QUERY \{ (?<={ ).*(?= } ntoreturn:) \}MONGO_SLOWQUERY %{WORD} %{MONGO_WORDDASH:database}\.%{MONGO_WORDDASH:collection} %{WORD}: %{MONGO_QUERY:query} %{WORD}:%{NONNEGINT:ntoreturn} %{WORD}:%{NONNEGINT:ntoskip} %{WORD}:%{NONNEGINT:nscanned}.*nreturned:%{NONNEGINT:nreturned}..+ (?`<duration>`[0-9]+)msMONGO_WORDDASH \b[\w-]+\bMONGO3_SEVERITY \wMONGO3_COMPONENT %{WORD}|-MONGO3_LOG %{TIMESTAMP_ISO8601:timestamp} %{MONGO3_SEVERITY:severity} %{MONGO3_COMPONENT:component}%{SPACE}(?:\[%{DATA:context}\])? %{GREEDYDATA:message}

1.4.6 AWS

ELB_ACCESS_LOG %{TIMESTAMP_ISO8601:timestamp} %{NOTSPACE:elb} %{IP:clientip}:%{INT:clientport:int} (?:(%{IP:backendip}:?:%{INT:backendport:int})|-) %{NUMBER:request_processing_time:float} %{NUMBER:backend_processing_time:float} %{NUMBER:response_processing_time:float} %{INT:response:int} %{INT:backend_response:int} %{INT:received_bytes:int} %{INT:bytes:int} "%{ELB_REQUEST_LINE}"CLOUDFRONT_ACCESS_LOG (?`<timestamp>`%{YEAR}-%{MONTHNUM}-%{MONTHDAY}\t%{TIME})\t%{WORD:x_edge_location}\t(?:%{NUMBER:sc_bytes:int}|-)\t%{IPORHOST:clientip}\t%{WORD:cs_method}\t%{HOSTNAME:cs_host}\t%{NOTSPACE:cs_uri_stem}\t%{NUMBER:sc_status:int}\t%{GREEDYDATA:referrer}\t%{GREEDYDATA:agent}\t%{GREEDYDATA:cs_uri_query}\t%{GREEDYDATA:cookies}\t%{WORD:x_edge_result_type}\t%{NOTSPACE:x_edge_request_id}\t%{HOSTNAME:x_host_header}\t%{URIPROTO:cs_protocol}\t%{INT:cs_bytes:int}\t%{GREEDYDATA:time_taken:float}\t%{GREEDYDATA:x_forwarded_for}\t%{GREEDYDATA:ssl_protocol}\t%{GREEDYDATA:ssl_cipher}\t%{GREEDYDATA:x_edge_response_result_type}

1.4.7 總結

Logstash grok 只是一種過濾器,可以在將日誌轉發到 Elasticsearch 之前將其應用於你的日誌。 因為它在日誌管道中扮演著如此重要的角色,所以 grok 也是最常用的過濾器之一。

以下是一些有用的資源列表,可以幫助你深入瞭解:

http://grokdebug.herokuapp.com ——這是在日誌上構建和測試 grok 過濾器的有用工具
http://grokconstructor.appspot.com/ —— 另一個 grok builder/tester
https://github.com/logstash-plugins/logstash-patterns-core/tree/master/patterns —— Logstash 支援的模式列表

1.5 總結

Logstash grok 只是在將日誌轉發到 Elasticsearch 之前可以應用於你的日誌的一種過濾器。 由於 grok 在測井管道中起著至關重要的作用,因此它也是最常用的過濾器之一。

2. 日誌解析的Grok模式示例

如果沒有日誌解析,搜尋和視覺化日誌幾乎是不可能的。 解析結構化您的傳入(非結構化)日誌,以便使用者可以在調查期間或設定儀表板時搜尋清晰的欄位和值。

最流行的日誌解析語言是Grok。 可以使用Grok外掛在各種日誌管理和分析工具中解析日誌資料。

但是用 Grok 解析日誌可能會很棘手。 本文將研究一些 Grok 模式示例,這些示例可以幫助你瞭解如何解析日誌資料。

2.1 什麼是 grok?

最初的術語實際上是相當新的——由 Robert A. Heinlein 在他 1961 年的《陌生土地上的陌生人》一書中創造 ——它指的是理解某事到一個人真正沉浸在其中的水平。 它是 grok 語言和 Logstash grok 外掛的合適名稱,它以一種格式修改資訊並將其浸入另一種格式(特別是 JSON)中。 已經有幾百種可被日誌解析所使用的 Grok 模式。

2.2 Grok 是如何工作的?

簡而言之,grok 是一種將行與正則表示式匹配、將行的特定部分對映到專用欄位並基於此對映執行操作的方法。

內建有 200 多種 Logstash 模式,用於過濾 AWS、Bacula、Bro、Linux-Syslog 等中的單詞、數字和日期等專案。 如果找不到你需要的模式,你可以編寫自己的自定義模式。 還有多個匹配模式的選項,這簡化了表示式的編寫以捕獲日誌資料。

以下是 Logstash grok 過濾器的基本語法格式:

%{SYNTAX:SEMANTIC}

SYNTAX 將指定每個日誌文字中的模式。 SEMANTIC 將是你在已解析日誌中實際給出該語法的識別標記。 換一種說法:

%{PATTERN:FieldName}

這將匹配預定義的模式並將其對映到特定的識別字段。

例如,像 127.0.0.1 這樣的模式將匹配 Grok IP 模式,通常是 IPv4 模式。

Grok 具有獨立的 IPv4 和 IPv6 模式,但它們可以與語法 IP 一起過濾。

該標準模式如下:

IPV4 (?<![0-9])(?:(?:25[0-5]|2[0-4][0-9]|[0-1]?[0-9]{1,2})[.](?:25[0-5]|2[0-4][0-9]|[0-1]?[0-9]{1,2})[.](?:25[0-5]|2[0-4][0-9]|[0-1]?[0-9]{1,2})[.](?:25[0-5]|2[0-4][0-9]|[0-1]?[0-9]{1,2}))(?![0-9])

假裝沒有統一的 IP 語法,你可以簡單地用相同的語義欄位名稱來理解兩者:

%{IPv4:Client IP} %{IPv6:Client IP}
同樣,只需使用 IP 語法,除非出於任何原因要將這些各自的地址分隔到單獨的欄位中。

由於 grok 本質上是基於正則表示式的組合,因此你還可以使用以下模式建立自己的自定義基於正則表示式的 grok 過濾器:

(?<custom_field>custom pattern)
例如:

(?\d\d-\d\d-\d\d)
此 grok 模式會將 22-22-22(或任何其他數字)的正則表示式與欄位名稱匹配。

2.3 Logstash Grok 模式例子

為了演示如何開始使用 grok,我將使用以下應用程式日誌:

2016-07-11T23:56:42.000+00:00 INFO [MySecretApp.com.Transaction.Manager]:Starting transaction for session -464410bf-37bf-475a-afc0-498e0199f008

我想用 grok 過濾器實現的目標是將日誌線分解為以下欄位:timestamp、log level、class,然後是 message 的其餘部分。

%{TIMESTAMP_ISO8601:timestamp} %{WORD:log-level} \[%{DATA:class}\]:%{GREEDYDATA:message}

以下 grok 模式將完成這項工作:

grok {
   match => { "message" => "%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:log-level} \[%{DATA:class}\]:%{GREEDYDATA:message}" }
 }

在開發的過程中,很多開發者自己不知道如何測試。在 Kibana 中,它含有一個 grok debugger:

如上所示,我們使用 %{GREEDYDATA:message} 把所有的資訊都對映到 message 欄位中。如果熟悉英文的開發者,greedy 也就是貪婪的意思。它可以把剩餘的所有內容都對映為該欄位。在kibana的grok debugger介面可以逐步除錯grok。

2.4 Grok 資料型別轉換

預設情況下,所有 SEMANTIC 條目都是字串,但你可以使用簡單的公式翻轉資料型別。 以下 Logstash grok 示例將標識為語義 num 的任何語法 NUMBER 轉換為語義浮點數 float:

%{NUMBER:num:float}
這是一個非常有用的工具,儘管它目前僅可用於轉換為 float 或整數 int。

2.5 _grokparsefailure

這將嘗試將傳入日誌與給定的 grok 模式匹配。 如果匹配,日誌將根據過濾器中定義的 grok 模式分解為指定的欄位。 如果不匹配,Logstash 將新增一個名為 _grokparsefailure的標籤。

2.6 操作資料

在匹配的基礎上,你可以定義額外的 Logstash grok 配置來操作資料。 例如,你可以讓 Logstash 1) 新增欄位,2) 覆蓋欄位,或 3) 刪除欄位。

grok {
   match => { "message" => "%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:log-level} \[%{DATA:class}\]:%{GREEDYDATA:message}" }
   overwrite => ["message"]
   add_tag =>  [ "My_Secret_Tag" ]
}

在我們的例子中,我們使用 “overwrite” 操作來覆蓋 “message” 欄位。 這樣,我們的 “message” 欄位將不會與我們定義的其他欄位(timestamp、log-level 和 class)一起出現。 此外,我們正在使用 “add_tag” 操作將自定義標籤欄位新增到日誌中。

此處提供了可用於操作日誌的可用操作的完整列表,以及它們的輸入型別和預設值。

2.7 完整示例

讓我們從一個示例非結構化日誌訊息開始,然後我們將使用 Grok 模式對其進行結構化:

128.39.24.23 - - [25/Dec/2021:12:16:50 +0000] "GET /category/electronics HTTP/1.1" 200 61 "/category/finance" "Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; Trident/5.0)"

想象一下搜尋數百萬條看起來像這樣的日誌行! 看起來很可怕。 這就是為什麼我們有像 Grok 這樣的解析語言——讓資料更易於閱讀和搜尋。

檢視該資料並使用 Kibana 等分析工具進行搜尋的一種更簡單的方法是將其分解為具有值的欄位,如下表所示:

1.ip:128.39.24.23
2.timestamp:25/Dec/2021:12:16:50 +0000
3.verb:GET
4.request:/category/electronics HTTP/1.1
5.status: 200
6.bytes: 61
7.referrer:/category/finance
8.os: Windows

讓我們使用一個示例 Grok 模式來生成這些欄位。 以下部分將展示 Grok 模式語法以生成上述每個欄位。

對於每個部分,我們的 Grok 模式都會擴充套件,因為它包含更多要解析的欄位。 這些模式是 Grok 可以閱讀的正則表示式——我們可以使用它們來表達我們的資訊。

下面是一些有用的連結,可以幫助你開始使用一些 Grok 模式。 但我們將在部落格的其餘部分提供更多示例。

Grok pattern
Grok Debugger
在 Grok pattern 頁面,我們可以找到我們需要的預定義好的 pattern:

從某種意義上講,這些 pattern 在本質上就是基於正則表示式的組合。也可以建立自己定義的 pattern。

讓我們開始構建 Grok 模式來結構化資料。

我們使用以下方式描述模式的開始: ^
語法如下: %{pattern:Name_of_the_field}

請注意:不建議使用空格來描述欄位的名稱。

2.7.1 提取 IP

假設我們要提取 IP,我們可以使用 IP 方法:

^%{IP:ip}

然後我們有這些: – –

為了告訴 Grok 忽略它們,我們只需將它們新增到我們的模式中。

^%{IP:ip} - -

這個模式給了我們這個欄位:

ip:128.39.24.23

2.7.2 提取 timestamps

在我們非結構化日誌訊息的下一部分中,我們將時間戳 “含” 在一個數組中:

[25/Dec/2021:12:16:50 +0000]
要提取它,我們需要使用正則表示式和 HTTPDATE 方法,同時在外面新增括號,以便 Grok 知道忽略它們:

\[%{HTTPDATE:timestamp}\]

基於我們之前的模式,我們現在有:

^%{IP:ip} - - \[%{HTTPDATE:timestamp}\]

這給了我們:

"ip": "128.39.24.23",
"timestamp": "25/Dec/2021:12:16:50 +0000"

回到我們最初的非結構化訊息,看起來我們有一個空間,時間戳結束,“GET 開始”。 我們還需要告訴 Grok 忽略空格。 為此,我們只需按鍵盤上的空格鍵,或者我們可以使用 %{SPACE} -> 直到 4 個空格。

2.7.3 提取 verbs

是時候提取 GET 欄位了。 首先,我們需要告訴 Grok 忽略引號 - 然後使用 WORD 方法,我們將通過編寫來做到這一點:

"%{WORD:verb}

所以,現在我們的模式是:

^%{IP:ip} - - \[%{HTTPDATE:timestamp}\] "%{WORD:verb}

這給了我們這些欄位:

"ip": "128.39.24.23",
"verb": "GET",
"timestamp": "25/Dec/2021:12:16:50 +0000"

2.7.4 提取 request

為了提取請求 -> /category/electronics HTTP/1.1",我們需要使用 DATA 方法,它本質上是正則表示式中的萬用字元。

這意味著我們需要新增一個句號來提取此資訊,以告訴 DATA 方法在哪裡停止——否則,它不會捕獲任何資料。 我們可以使用引號作為停止標記:

%{DATA:request}"

現在,我們有以下 Grok 模式:

^%{IP:ip} - - \[%{HTTPDATE:timestamp}\] "%{WORD:verb} %{DATA:request}"

這給了我們這些欄位:

"request": "/category/electronics HTTP/1.1",
"ip": "128.39.24.23",
"verb": "GET",
"timestamp": "25/Dec/2021:12:16:50 +0000"

2.7.5 提取 status

接下來是狀態,但我們在請求結束和狀態之間再次有一個空格,我們可以新增一個空格或 %{SPACE}。 為了提取數字,我們使用 NUMBER 方法。

%{NUMBER:status}

現在我們的模式擴充套件到:

^%{IP:ip} - - \[%{HTTPDATE:timestamp}\] "%{WORD:verb} %{DATA:request}" %{NUMBER:status}

這給了我們如下的欄位:

"request": "/category/electronics HTTP/1.1",
"ip": "128.39.24.23",
"verb": "GET",
"timestamp": "25/Dec/2021:12:16:50 +0000",
"status": "200"

2.7.6 提取 Bytes

為了提取位元組,我們需要再次使用NUMBER方法,但在此之前,我們需要使用常規空格或 %{SPACE}

^%{IP:ip} - - \[%{HTTPDATE:timestamp}\] "%{WORD:verb} %{DATA:request}" %{NUMBER:status} %{NUMBER:bytes}

這給了我們如下的欄位:

"request": "/category/electronics HTTP/1.1",
"bytes": "61",
"ip": "128.39.24.23",
"verb": "GET",
"timestamp": "25/Dec/2021:12:16:50 +0000",
"status": "200"

2.7.7 提取 referrer

要提取"referrer",我們需要使用常規空格或 %{SPACE} -> " 所以 Grok 會忽略它:

%{DATA:referrer}"

如你所見,我們新增 DATA 直到它遇到 " 字元:

這給了我們如下的欄位:

"request": "/category/electronics HTTP/1.1",
"referrer": "",
"bytes": "61",
"ip": "128.39.24.23",
"verb": "GET",
"timestamp": "25/Dec/2021:12:16:50 +0000",
"status": "200"

2.7.8 忽略資料及提取 os

現在我們想忽略這個資料 ->" "Mozilla/5.0 (compatible; MSIE 9.0; 為此,我們將使用 "Mozilla/5.0"的 DATA 方法,但不寫入該欄位將忽略它。

然後我們將使用不帶冒號或欄位名的 WORD 方法忽略 (compatible;

最後,我們將使用 DATA 忽略 MSIE 9.0;

這給我們留下了以下模式來忽略該資料

%{DATA}\(%{WORD};%{DATA};

為了進一步解釋這種模式......

( -> 停止直到它到達 (
; -> 停止直到到達 ;

請注意上面的模式中的 ( 字元。第一個 DATA 遇到 ( 即停止,也就是第一個資料。第二個 DATA 停止直到遇到 ; 符號。同樣地,第三個 DATA 停止直到遇到 ;

現在我們可以使用 WORD 方法提取 os -> %{WORD:os}。就是這樣! 現在我們剩下以下 Grok 模式來構建我們的資料。

^%{IP:ip} - - \[%{HTTPDATE:timestamp}\] "%{WORD:verb} %{DATA:request}" %{NUMBER:status} %{NUMBER:bytes} %{DATA:referrer}" %{DATA}\(%{WORD};%{DATA}; %{WORD:os}

這為我們提供了這些整潔的欄位,我們可以使用這些欄位更輕鬆地搜尋和視覺化我們的日誌資料:

"request": "/category/electronics HTTP/1.1",
"referrer": "\"/category/finance",
"os": "Windows",
"bytes": "61",
"ip": "128.39.24.23",
"verb": "GET",
"timestamp": "25/Dec/2021:12:16:50 +0000",
"status": "200"

至此,我們已經完成了對整個日誌資訊的結構化分析。