1. 程式人生 > >格式化日誌提取模擬寫入Elasticsearch

格式化日誌提取模擬寫入Elasticsearch

1.目標

任務場景和目標:

  • 已有服務的格式化日誌.
  • 利用Ingest Pipeline提取
  • 通過Simulate Pipeline API模擬寫入Elasticesearch

目的是對Pipeline檔案進行驗證。

日誌格式如下:

行號|時間戳|程序ID|執行緒ID|日誌級別|訊息內容

示例:

2|2018-11-28,10:50:06.792978|6719|140737353873600|WARN|***DKDD

2.步驟

操作步驟如下:

  • 建立Ingest Pipeline檔案
  • 提交(put)到Elasticsearch
  • 建立日誌文件
  • 驗證結果

2.1建立pipeline檔案

儲存以下內容為檔案,如/home/liujg/dev/crush-backend-cpp/crush/gateway/bin/Debug/pipeline.json

{
    "description": "test-pipeline",
    "processors": [{
        "grok": {
            "field": "message",
            "patterns": ["%{NUMBER:lineno}\\|%{MY_TIMESTAMP:my_timestamp}\\|%{PID:pid}\\|%{TID:tid}\\|%{LOGLEVEL:log_level}\\|%{GREEDYDATA:message}"],
            "pattern_definitions": {
                "DATE_ZH": "%{YEAR}-%{MONTHNUM2}-%{MONTHDAY}",
                "TIME_MS": "%{TIME}.\\d{6}",
                "MY_TIMESTAMP": "%{DATE_ZH},%{TIME_MS}",
                "PID": "%{NUMBER}",
                "TID":"%{NUMBER}"
            }
        }
    }]
}

patterns:日誌匹配模式

pattern_definitions: 自定義模式. DATE_ZH為"yyyy-MM-dd"格式的日期,TIMES_MS為時間格式.

 

2.2提交pipeline

curl -H'Content-Type: application/json' -XPUT 'http://localhost:9200/_ingest/pipeline/test-pipeline' [email protected]/home/liujg/dev/crush-backend-cpp/crush/gateway/bin/Debug/pipeline.json

http://localhost:9200

為Elasticsearch主機埠.

test-pipeline為建立的Pipeline名稱.

[email protected]後面為pipeline檔名稱.

 

2.3建立文件

建立上述日誌內容的文件.

curl -H'Content-Type: application/json' -XPOST 'http://localhost:9200/_ingest/pipeline/test-pipeline/_simulate' -d'
{
	"docs": [{
		"_index": "my-test-log",
		"_type": "log",
		"_id": "AVpsUYR_du9kwoEnKsSA",
		"_score": 1,
		"_source": {
			"@timestamp": "2017-03-31T18:22:25.981Z",
			"beat": {
				"hostname": "my think",
				"name": "RestReviews",
				"version": "5.1.1"
			},
			"input_type": "log",
			"message": "2|2018-11-28,10:50:06.792978|6719|140737353873600|WARN|***DKDD",
			"offset": 3,
			"source": "/home/liujg/dev/crush-backend-cpp/crush/gateway/bin/Debug/1.log",
			"tags": [
				"debug",
				"reviews"
			],
			"type": "log"
		}
	}]
}'

寫入的索引名稱為my-test-log.

message與pipeline的field對應,內容為日誌資訊.

 

2.4驗證結果

返回內容如下:

{
    "docs": [{
        "doc": {
            "_index": "my-test-log",
            "_type": "log",
            "_id": "AVpsUYR_du9kwoEnKsSA",
            "_source": {
                "offset": 3,
                "my_timestamp": "2018-11-28,10:50:06.792978",
                "input_type": "log",
                "log_level": "WARN",
                "pid": "6719",
                "source": "/home/liujg/dev/crush-backend-cpp/crush/gateway/bin/Debug/1.log",
                "message": "***DKDD",
                "type": "log",
                "tid": "140737353873600",
                "tags": ["debug", "reviews"],
                "@timestamp": "2017-03-31T18:22:25.981Z",
                "lineno": "2",
                "beat": {
                    "name": "RestReviews",
                    "version": "5.1.1",
                    "hostname": "my think"
                }
            },
            "_ingest": {
                "timestamp": "2018-12-04T09:24:27.236Z"
            }
        }
    }]
}

提取出的結構化資料有:

  • lineno:行號
  • my_timestamp:時間戳
  • pid:程序id
  • tid:執行緒id
  • log_level:日誌級別
  • message:日誌正文


3.資料

Parsing csv files with Filebeat and Elasticsearch Ingest Pipelines
https://www.objectrocket.com/blog/how-to/elasticsearch-ingest-csv/

grok模式

https://github.com/logstash-plugins/logstash-patterns-core/blob/master/patterns/grok-patterns