Importing data from a CSV file into Elasticsearch with Logstash
By 阿新 • Published: 2018-12-14
Installing and deploying Logstash is not covered here; look it up yourself.
Note: the Logstash version must match the Elasticsearch version. Version 5.5.1 is used for both here.
1. Create a logstash.conf file in Logstash's bin directory:
input {
    file {
        path => ["C:\Users\Desktop\test.csv"]
        start_position => "beginning"
    }
}
filter {
    csv {
        separator => ","
        columns => ["name","age"]
    }
    mutate {
        convert => {
            "name" => "string"
            "age" => "integer"
        }
    }
}
output {
    elasticsearch {
        hosts => ["127.0.0.1:9200"]
        index => "test2"
        document_type => "test2"
    }
}
Where:
input
The input section reads the data; different plugins handle different data sources:
the file plugin reads local text files,
the stdin plugin reads from standard input,
the tcp plugin reads data from the network,
the log4j plugin reads data sent by log4j, and so on.
path: path to the CSV file
start_position: can be set to beginning or end. beginning reads the file from the start; end reads only newly appended data, and is typically used together with ignore_older.
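For example, to tail only new lines and skip files that have not been modified recently, the file input could be configured like this (a sketch; the path and the ignore_older threshold are assumptions to adjust for your setup):

```
input {
    file {
        path => ["C:\Users\Desktop\test.csv"]
        start_position => "end"
        ignore_older => 86400   # in seconds: skip files untouched for more than a day
    }
}
```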
filter
The filter section parses and transforms the data read by input.
For reading a CSV file:
separator: the field delimiter
columns: the field names of the CSV file. Note: they must be listed in the same order as the fields appear in the CSV file.
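Conceptually, the csv filter splits each line on the separator and pairs the values with columns, and mutate/convert then casts the field types. A rough Python sketch of that mapping (illustrative only, not Logstash's actual implementation; the sample row is made up):

```python
# Mimic what the csv filter + mutate/convert do to one input line.
columns = ["name", "age"]     # must match the field order in the CSV file
separator = ","

def parse_line(line):
    # csv filter: split on the separator and zip with the column names
    values = line.rstrip("\n").split(separator)
    event = dict(zip(columns, values))
    # mutate/convert: "age" => "integer" ("name" stays a string)
    event["age"] = int(event["age"])
    return event

print(parse_line("tom,25"))   # {'name': 'tom', 'age': 25}
```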
output
hosts: the Elasticsearch host address (ip:port)
index: the name of the index to write to in Elasticsearch
document_type: the type name under that index
Note the following about the CSV file:
1. The first line should not contain the field names; it should start directly with field values.
2. The last line must end with a newline.
Sample CSV file:
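A minimal test.csv matching the name and age columns might look like this (the values are made up; note there is no header row and the file ends with a newline):

```
tom,25
jack,30
lucy,28
```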
2. Run logstash -f logstash.conf from Logstash's bin directory. Output like the following indicates it started successfully:
E:\softwareInstallDirecory\logstash\logstash-5.5.1\bin>logstash -f logstash.conf
ERROR StatusLogger No log4j2 configuration file found. Using default configuration: logging only errors to the console.
Sending Logstash's logs to E:/softwareInstallDirecory/logstash/logstash-5.5.1/logs which is now configured via log4j2.properties
[2018-10-11T10:12:20,773][INFO ][logstash.outputs.elasticsearch] Elasticsearch pool URLs updated {:changes=>{:removed=>[], :added=>[http://127.0.0.1:9200/]}}
[2018-10-11T10:12:20,773][INFO ][logstash.outputs.elasticsearch] Running health check to see if an Elasticsearch connection is working {:healthcheck_url=>http://127.0.0.1:9200/, :path=>"/"}
[2018-10-11T10:12:20,914][WARN ][logstash.outputs.elasticsearch] Restored connection to ES instance {:url=>#<Java::JavaNet::URI:0x2329ed6d>}
[2018-10-11T10:12:20,930][INFO ][logstash.outputs.elasticsearch] Using mapping template from {:path=>nil}
[2018-10-11T10:12:20,991][INFO ][logstash.outputs.elasticsearch] Attempting to install template {:manage_template=>{"template"=>"logstash-*", "version"=>50001, "settings"=>{"index.refresh_interval"=>"5s"}, "mappings"=>{"_default_"=>{"_all"=>{"enabled"=>true, "norms"=>false}, "dynamic_templates"=>[{"message_field"=>{"path_match"=>"message", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false}}}, {"string_fields"=>{"match"=>"*", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false, "fields"=>{"keyword"=>{"type"=>"keyword", "ignore_above"=>256}}}}}], "properties"=>{"@timestamp"=>{"type"=>"date", "include_in_all"=>false}, "@version"=>{"type"=>"keyword", "include_in_all"=>false}, "geoip"=>{"dynamic"=>true, "properties"=>{"ip"=>{"type"=>"ip"}, "location"=>{"type"=>"geo_point"}, "latitude"=>{"type"=>"half_float"}, "longitude"=>{"type"=>"half_float"}}}}}}}}
[2018-10-11T10:12:21,007][INFO ][logstash.outputs.elasticsearch] New Elasticsearch output {:class=>"LogStash::Outputs::ElasticSearch", :hosts=>[#<Java::JavaNet::URI:0x609ab5a>]}
[2018-10-11T10:12:21,007][INFO ][logstash.pipeline ] Starting pipeline {"id"=>"main", "pipeline.workers"=>4, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>5, "pipeline.max_inflight"=>500}
[2018-10-11T10:12:21,304][INFO ][logstash.pipeline ] Pipeline main started
[2018-10-11T10:12:21,413][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600}
Note: this process keeps running; if the CSV file is modified, Logstash automatically imports the new data.
To re-import all of the data from scratch, delete the files under \data\plugins\inputs\file in the Logstash installation directory (these record how far each file has already been read), then run logstash -f logstash.conf again.
3. View the data with es-head
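Besides es-head, you can also check the imported documents over Elasticsearch's REST API. A small Python sketch (assumes the cluster is reachable at 127.0.0.1:9200 and uses the test2 index name from the config above):

```python
import json
import urllib.request

def search_url(host, index):
    # Build the _search endpoint URL for an index
    return "http://%s/%s/_search" % (host, index)

def fetch_docs(host="127.0.0.1:9200", index="test2"):
    # Fetch up to the default 10 hits from the index (requires a running cluster)
    with urllib.request.urlopen(search_url(host, index)) as resp:
        return json.load(resp)["hits"]["hits"]

if __name__ == "__main__":
    for hit in fetch_docs():
        print(hit["_source"])   # e.g. {'name': 'tom', 'age': 25, ...}
```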