1. 程式人生 > >ElasticSearch資料匯入

ElasticSearch資料匯入

作者:dominating

本文將介紹通過logstash收集.csv檔案,oracle資料庫資料再匯入到ElasticSearch中,以及SuperMap iClient for JavaScript 9D與ElasticSearch的結合使用。

安裝logstash

bin/logstash -e 'input { stdin { } } output { stdout {} }'

輸入hello world,logstash將會輸出內容到控制檯

2018-01-29T16 :36:49,507 +0000 0.0.0.0 hello world

匯入.csv檔案

編寫配置檔案


收集和匯入資料需要藉助logstash的input,filter,output外掛來編寫配置檔案:

input {
  file {
    path => ["/opt/flights2.csv"]
    start_position => "beginning"
  }
}
filter {
  csv {
    separator => ","
    columns => ["ident","lon","lat","temp","origin","destination"]
  } 
}
output {
  elasticsearch {
        hosts => ["192.168.255.143:9200"
] index => "flight" } }

引數說明:
input外掛
file:資料來源為檔案型
path:#必選項,配置檔案路徑.如我使用的以下.csv檔案
這裡寫圖片描述
start_position:logstash從哪個位置讀取檔案資料,預設從尾部,值為:end,如果要匯入歷史資料則設定成:beginning
filter外掛
csv:csv檔案過濾器
separator:定義列分割符值。預設為逗號’,’
columns:定義一個列名稱列表,按照在CSV中出現的順序
output外掛
elasticsearch輸出目標為elasticsearch,配置host和index索引名

執行logstash

/opt/logstash-6.1.2/bin/logstash -f /opt/test.conf

-f:指定配置檔案路徑
這裡寫圖片描述
檢視ElasticSearch
這裡寫圖片描述
這裡寫圖片描述

"took": 9,
    "timed_out": false,
    "_shards": {
        "total": 5,
        "successful": 5,
        "skipped": 0,
        "failed": 0
    },
    "hits": {
        "total": 100,
        "max_score": 1.0,
        "hits": [{
            "_index": "flight",
            "_type": "doc",
            "_id": "9pMPQWEBy48LTNYe0eDu",
            "_score": 1.0,
            "_source": {
                "temp": "1.49E+12",
                "@timestamp": "2018-01-29T08:36:50.287Z",
                "host": "ubuntu-node3",
                "lat": "29.824944",
                "ident": "T0000",
                "origin": "Lishe",
                "destination": "Jiangbei",
                "message": "T0000,121.465069,29.824944,1.49E+12,Lishe,Jiangbei\r",
                "@version": "1",
                "path": "/opt/flights2.csv",
                "lon": "121.465069"
            }
        }]
    }

可以看到匯入了100條資料,並且能夠被查詢到。

匯入oracle資料

需要在logstash所在機器上提前安裝oracle客戶端,並且配置好oracle的環境變數
測試oracle資料庫是否能正常通訊
這裡寫圖片描述
編寫logstash配置檔案
需要匯入的oracle資料
這裡寫圖片描述

input {
  jdbc {
    jdbc_driver_library => "/opt/ojdbc6.jar"
    jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
    jdbc_connection_string => "jdbc:oracle:thin:@//192.168.15.89:1521/supermap"
    jdbc_user => "liu"
    jdbc_password => "supermap"
    schedule => "* * * * *"
    statement => "select * from SMDTV_341"
    type => "jdbc"
    last_run_metadata_path => "/home/elsearch/logstash-oradb.lastrun"
  }
}
filter {

}
output {
  elasticsearch {
        hosts => ["192.168.255.143:9200"]
        index => "test"
  }
}

引數說明:
jdbc_driver_library:在oracle目錄下,如我的在D:\app\wangwu\product\11.2.0\dbhome_1\jdbc\lib下面,複製到指定目錄
這裡寫圖片描述
schedule:查詢間隔,”* * * * *”每分鐘查詢一次,不設定則只執行一次
last_run_metadata_path:最後更新時間檔案位置
statement:SQL查詢語句
index: 索引,可以先建立一個索引再匯入

將檔案儲存為jdbc.conf放在logstash所在機器,執行

/opt/logstash-6.1.2/bin/logstash -f /opt/jdbc.conf

這裡寫圖片描述
檢視結果
這裡寫圖片描述
說明資料已經匯入成功,並且能夠被查詢到。

匯入地理座標點資料

ElasticSearch提供了地理位置功能,並且能夠把地理位置、全文搜尋、結構化搜尋和分析結合到一起。
ElasticSearch中儲存地理座標資料需要使用geo-point型別,並且必須提前顯式宣告
1、建立索引,並且宣告location為geo-point型別

curl -XPUT '192.168.255.143:9200/test3?pretty' -H 'Content-Type: application/json' -d' 
{ 
  "mappings": { 
    "capital" : { 
      "properties" : { 
        "location" : { 
          "type" :    "geo_point" 
        } 
      } 
    } 
  } 
} 
'

建立索引test3,對映欄位location,並且宣告為geo-point型別
2、編寫配置檔案
將帶地理座標的資料匯入到ElasticSearch中
這裡寫圖片描述

input { 
  jdbc { 
    jdbc_driver_library => "/opt/ojdbc6.jar" 
    jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver" 
    jdbc_connection_string => "jdbc:oracle:thin:@//192.168.15.89:1521/supermap" 
    jdbc_user => "liu" 
    jdbc_password => "supermap" 
    statement => "select * from SMDTV_361" 
    type => "jdbc" 
    last_run_metadata_path => "/home/elsearch/logstash-oradb.lastrun" 
  } 
} 
filter { 
    mutate { 
        add_field => {"location" => "%{smy},%{smx}"} 
    } 
} 
output { 
  elasticsearch { 
        hosts => ["192.168.255.143:9200"] 
        index => "test3" 
        document_type => "capital" 
  } 
}

由於原表中並沒有location欄位,所以filter中使用mutate外掛給收集到的資料新增欄位location
3、執行匯入命令

/opt/logstash-6.1.2/bin/logstash -f /opt/jdbc.conf

4、網格聚合
將地理位置資料匯入成功之後,我們就可以使用ElasticSearch提供的地理位置功能了,以下我們將執行Geohash網格聚合:

curl -XGET '192.168.255.143:9200/test3/capital/_search?pretty' -H 'Content-Type: application/json' -d' 
{ 
  "query": { 
    "constant_score": { 
      "filter": { 
        "geo_bounding_box": { 
          "location": { 
            "top_left": { 
              "lat":  90, 
              "lon": -180 
            }, 
            "bottom_right": { 
              "lat":  -90, 
              "lon": 180 
            } 
          } 
        } 
      } 
    } 
  }, 
  "aggs": { 
    "world": { 
      "geohash_grid": { 
        "field":     "location", 
        "precision": 1 
      } 
    } 
  } 
} 
'

SuperMap iClient for JavaScript 9D和ElasticSearch的結合使用

SuperMap iClient for JavaScript 9D封裝了ElasticSearch的JavaScript API,我們以for Leaflet為例查詢之前匯入的資料。
1、 定義服務

liveESService = new SuperMap.ElasticSearch("http://192.168.255.143:9200");

2、 傳入查詢條件,成功回撥函式

function loadLiveData() {
        var liveParameters = [];
        liveParameters.push({index: "flight"});
        liveParameters.push({
            "query": {
                "match_all":{}
            },
            "from": 0,
            "size": 100
        });
        liveESService.msearch({body: liveParameters}, function (error, result) {
            if (error) {
                widgets.alert.showAlert(JSON.stringify(error), false);
                return;
            }
            renderLive(result.responses);
        });
    }

使用new SuperMap.ElasticSearch(url).msearch(params,callback)方法,傳入查詢引數,定義回撥函式
3、 處理資料新增到地圖
這裡寫圖片描述
4、更多關於SuperMap iClient for JavaScript 9D和ElasticSearch的結合使用的例子
http://iclient.supermap.io/examples/leaflet/examples.html#Elasticsearch
熱力/網格圖
這裡寫圖片描述

航班監控
這裡寫圖片描述