1. 程式人生 > >logstash 將資料庫中的經緯度 轉化成 geo_point 型別

logstash 將資料庫中的經緯度 轉化成 geo_point 型別

最近在使用ELK工具,主要是使用kibana進行資料視覺化,該工具非常方便,牆裂推薦!!!

要使用coordinate map時,需要將資料轉換成它需要的geo_point型別,網上有很多使用geoip外掛將ip轉換成經緯度的教程,這裡沒有ip,只有經緯度資料。

下圖為geo_point的幾種形式,這裡我採用的是最後一種,陣列的格式

 

由於我不太懂這裡面的語法,只是簡單的使用,所以這裡只能給出我的解決方案,以及踩過的坑,沒有原理解釋。

主要在filter中,“rdc_lon”和“rdc_lat”為我資料庫中的欄位名稱,首先將經緯度的值賦給新的變數 geoip.location,然後將其轉化為float型別,要注意

的是【經度,緯度】,陣列格式經度在前

將其轉換為float後,第二步是將location變成 geo_point的型別,我沒有自己寫模板,logstash自帶的用來處理日誌的logstash-*模板就能完成此種轉換,因此在輸出到elasticsearch中,將index的命名設為 logstash-XXXX 就能使用預設的模板。

這裡踩的坑是,獲取資料庫裡的欄位rdc_lon,rdc_lat時,由於資料庫裡設定了大小寫,在資料庫中實際欄位名稱為“RDC_lon”,“RDC_lat”,沒想到在logstash不考慮大小寫,因此一直沒有獲取到值。

  1 input {
  2     jdbc {
  3         jdbc_driver_library => "/opt/logstash/lib/sqlserverdriver/mssql-jdbc-6.4.0.jre8.jar"
  4         jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
  5         jdbc_connection_string => "jdbc:sqlserver://0.0.0.0:1433;databaseName=shipment;"
  6         jdbc_user => "user"
  7         jdbc_password => "password"
  8         #schedule => "* * * * *"
  9         jdbc_default_timezone => "Asia/Shanghai"
 10         statement => "SELECT * FROM [table_name]"
 11     }
 12 }
 13 filter {
 14     mutate {
 15         add_field => ["[geoip][location]", "%{rdc_lon}"]
 16         add_field => ["[geoip][location]", "%{rdc_lat}"]
 17     }
 18     mutate {
 19         convert => ["[geoip][location]", "float"]
 20     }
 21 }
 22 
 23 output {
 24         elasticsearch {
 25             index => "logstash-geo_demo"
 26             hosts => ["0.0.0.0:9200"]
 27         }

最後的效果圖如下