
Importing data from a CSV file into Elasticsearch with Logstash

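Installing and deploying Logstash is not covered here; search online for instructions.
Note: the Logstash version must match the Elasticsearch version. Both are 5.5.1 here.
I. Create a logstash.conf file in Logstash's bin directory: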

input {
  file {
    # Forward slashes are the safer choice for Windows paths in the file input.
    path => ["C:/Users/Desktop/test.csv"]
    start_position => "beginning"
  }
}
filter {
  csv {
    separator => ","
    columns => ["name","age"]
  }
  mutate {
    convert => {
      "name" => "string"
      "age" => "integer"
    }
  }
}

output {
  elasticsearch {
    hosts => ["127.0.0.1:9200"]
    index => "test2"
    document_type => "test2"
  }
}

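Where:

input

The input section is responsible for reading data. Different plugins read different kinds of data:
       the file plugin reads local text files,
       the stdin plugin reads standard input,
       the tcp plugin reads data from the network,
       the log4j plugin reads data sent by log4j, and so on.
path: path to the CSV file
start_position: either beginning or end. beginning reads the file from the start; end reads only newly appended data. The setting only affects files Logstash has not read before, and it should also be used together with ignore_older.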

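filter

The filter plugins filter and parse the data read by input.
Settings for reading a CSV file:
separator: the field delimiter
columns: the field names of the CSV file. Note: they must be listed in the same order as the columns in the CSV file.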
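
To make the ordering requirement concrete, here is a small Python sketch of what the csv and mutate settings above amount to. It is only an illustration and not how Logstash is implemented; the function name parse_line and the sample line are made up for this example.

```python
import csv
import io

# Same order as the `columns` setting in logstash.conf.
COLUMNS = ["name", "age"]


def parse_line(line, columns=COLUMNS, separator=","):
    """Split one CSV line and pair each value with a column name by position."""
    reader = csv.reader(io.StringIO(line), delimiter=separator)
    values = next(reader)
    event = dict(zip(columns, values))
    # Mirrors the mutate/convert step: age becomes an integer, name stays a string.
    event["age"] = int(event["age"])
    return event
```

Calling parse_line("Tom,25") gives a name of "Tom" and an age of 25. If the columns were listed as ["age","name"] instead, the values would be attached to the wrong field names, which is why the order has to match the file.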

output

hosts: address of the Elasticsearch host
index: name of the Elasticsearch index to write to
document_type: name of the type under that index

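Keep the following points in mind for the CSV file:
1. The first line must not contain the field names; it starts directly with field values. With this configuration a header row would be treated as data.
2. The last line must end with a newline.
Sample CSV file:
[Image: sample CSV file]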
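
If the CSV file is generated by a script, both points can be handled when writing it. The following Python sketch is one way to do that; the function name write_sample_csv and the rows used in the usage note are hypothetical and only stand in for real data.

```python
import csv


def write_sample_csv(path, rows):
    """Write rows without a header line; every row ends with a newline."""
    # newline="" lets the csv module control line endings itself.
    with open(path, "w", newline="", encoding="utf-8") as f:
        writer = csv.writer(f, lineterminator="\n")
        # No header row is written; the last row is also newline-terminated.
        writer.writerows(rows)
```

For example, write_sample_csv("test.csv", [["Tom", 25], ["Jerry", 30]]) produces a two-line file whose columns match the name and age fields configured above.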

II. Run logstash -f logstash.conf from Logstash's bin directory. Output like the following means Logstash started successfully:

E:\softwareInstallDirecory\logstash\logstash-5.5.1\bin>logstash -f logstash.conf
ERROR StatusLogger No log4j2 configuration file found. Using default configuration: logging only errors to the console.
Sending Logstash's logs to E:/softwareInstallDirecory/logstash/logstash-5.5.1/logs which is now configured via log4j2.properties
[2018-10-11T10:12:20,773][INFO ][logstash.outputs.elasticsearch] Elasticsearch pool URLs updated {:changes=>{:removed=>[], :added=>[http://127.0.0.1:9200/]}}
[2018-10-11T10:12:20,773][INFO ][logstash.outputs.elasticsearch] Running health check to see if an Elasticsearch connection is working {:healthcheck_url=>http://127.0.0.1:9200/, :path=>"/"}
[2018-10-11T10:12:20,914][WARN ][logstash.outputs.elasticsearch] Restored connection to ES instance {:url=>#<Java::JavaNet::URI:0x2329ed6d>}
[2018-10-11T10:12:20,930][INFO ][logstash.outputs.elasticsearch] Using mapping template from {:path=>nil}
[2018-10-11T10:12:20,991][INFO ][logstash.outputs.elasticsearch] Attempting to install template {:manage_template=>{"template"=>"logstash-*", "version"=>50001, "settings"=>{"index.refresh_interval"=>"5s"}, "mappings"=>{"_default_"=>{"_all"=>{"enabled"=>true, "norms"=>false}, "dynamic_templates"=>[{"message_field"=>{"path_match"=>"message", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false}}}, {"string_fields"=>{"match"=>"*", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false, "fields"=>{"keyword"=>{"type"=>"keyword", "ignore_above"=>256}}}}}], "properties"=>{"@timestamp"=>{"type"=>"date", "include_in_all"=>false}, "@version"=>{"type"=>"keyword", "include_in_all"=>false}, "geoip"=>{"dynamic"=>true, "properties"=>{"ip"=>{"type"=>"ip"}, "location"=>{"type"=>"geo_point"}, "latitude"=>{"type"=>"half_float"}, "longitude"=>{"type"=>"half_float"}}}}}}}}
[2018-10-11T10:12:21,007][INFO ][logstash.outputs.elasticsearch] New Elasticsearch output {:class=>"LogStash::Outputs::ElasticSearch", :hosts=>[#<Java::JavaNet::URI:0x609ab5a>]}
[2018-10-11T10:12:21,007][INFO ][logstash.pipeline        ] Starting pipeline {"id"=>"main", "pipeline.workers"=>4, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>5, "pipeline.max_inflight"=>500}
[2018-10-11T10:12:21,304][INFO ][logstash.pipeline        ] Pipeline main started
[2018-10-11T10:12:21,413][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}

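Note: the process keeps running. If the CSV file is modified, Logstash imports the new data automatically.
To re-import the data from scratch, delete the files under \data\plugins\inputs\file in the Logstash installation directory (the file input records its read positions there), then run logstash -f logstash.conf again.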
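
The cleanup step can also be scripted. The Python sketch below assumes the directory layout described above and that Logstash has been stopped first; the function name clear_sincedb is hypothetical.

```python
from pathlib import Path


def clear_sincedb(logstash_home):
    """Delete the file input's position files and return the removed paths."""
    sincedb_dir = Path(logstash_home) / "data" / "plugins" / "inputs" / "file"
    removed = []
    if sincedb_dir.is_dir():
        # iterdir() also lists hidden files such as .sincedb_*.
        for entry in sincedb_dir.iterdir():
            if entry.is_file():
                entry.unlink()
                removed.append(entry)
    return removed
```

With the installation path from the log output above, this would be called as clear_sincedb(r"E:\softwareInstallDirecory\logstash\logstash-5.5.1") before starting Logstash again.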

III. View the data with es-head
[Image: es-head screenshot]
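
Without es-head, the import can also be checked by querying Elasticsearch over HTTP. The Python sketch below assumes the host and index from logstash.conf (127.0.0.1:9200 and test2) and uses the standard _search endpoint; the function names are hypothetical.

```python
import json
from urllib.parse import quote
from urllib.request import urlopen


def build_search_url(host="127.0.0.1:9200", index="test2", size=10):
    """Build the _search URL for the index written by logstash.conf."""
    return "http://{}/{}/_search?size={}".format(host, quote(index), int(size))


def extract_sources(response):
    """Pull the stored documents out of a parsed _search response."""
    return [hit["_source"] for hit in response.get("hits", {}).get("hits", [])]


def fetch_documents(url):
    """Query Elasticsearch; this needs a running instance at the given URL."""
    with urlopen(url, timeout=10) as resp:
        return extract_sources(json.load(resp))
```

Calling fetch_documents(build_search_url()) while Elasticsearch is running should return the imported records, each with the name and age fields alongside the metadata fields Logstash adds.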