使用Logstash來實時同步MySQL和log日誌資料到ES
少年,光看是不行的,我的github在這裡,跟著做吧:https://github.com/singgel/NoSql-SkillTree
logstash是一個數據分析軟體,主要目的是分析log日誌。整一套軟體可以當作一個MVC模型,logstash是controller層,Elasticsearch是一個model層,kibana是view層。
首先將資料傳給logstash,它將資料進行過濾和格式化(轉成JSON格式),然後傳給Elasticsearch進行儲存、建搜尋的索引,kibana提供前端的頁面再進行搜尋和圖表視覺化,它是呼叫Elasticsearch的介面返回的資料進行視覺化。logstash和Elasticsearch是用Java寫的,kibana使用node.js框架。
以下以mysql為例,log同理
一、首先下載和你的ES對應的logstash版本,本篇我們使用的都是6.3.1
下載後使用logstash-plugin install logstash-input-jdbc 命令安裝jdbc的資料連線外掛
二、新增mysqltoes.conf檔案,配置Input和output引數如下,連線jdbc按照規則同步指定的資料到es
大家注意這裡的配置有很多種用法,包括同步時間規則和最後更新時間的用法就不詳細展開了
input { stdin { } jdbc { jdbc_connection_string => "jdbc:mysql://localhost:3306/star" jdbc_user => "root" jdbc_password => "123456" jdbc_driver_library => "/Users/yiche/.m2/repository/mysql/mysql-connector-java/8.0.12/mysql-connector-java-8.0.12.jar" jdbc_driver_class => "com.mysql.jdbc.Driver" jdbc_paging_enabled => "true" jdbc_page_size => "50000" statement => "SELECT * FROM channel_data_detail_month" schedule => "* * * * *" } } output { stdout { codec => json_lines } elasticsearch { hosts => "localhost:9200" index => "star" document_type => "channel_data_detail_month" document_id => "%{id}" } }
使用logstash按照conf檔案執行 ./bin/logstash.bat -f ./config/mysqltoes.conf
注意這裡可能有執行不成功的坑,主要是把配置設定好,還有檔案和名稱編碼的問題 output es的配置用hosts
這時候我們可以看到MYSQL中的表資料已成功匯入ES
---
log日誌的
1. 定義資料來源
寫一個配置檔案,可命名為logstash.conf,輸入以下內容:
input { file { path => "/data/web/logstash/logFile/*/*" start_position => "beginning" #從檔案開始處讀寫 } # stdin {} #可以從標準輸入讀資料 }
定義的資料來源,支援從檔案、stdin、kafka、twitter等來源,甚至可以自己寫一個input plugin。如果像上面那樣用萬用字元寫file,如果有新日誌檔案拷進來,它會自動去掃描。
2. 定義資料的格式
根據打日誌的格式,用正則表示式進行匹配
filter {
#定義資料的格式
grok {
match => { "message" => "%{DATA:timestamp}\|%{IP:serverIp}\|%{IP:clientIp}\|%{DATA:logSource}\|%{DATA:userId}\|%{DATA:reqUrl}\|%{DATA:reqUri}\|%{DATA:refer}\|%{DATA:device}\|%{DATA:textDuring}\|%{DATA:duringTime:int}\|\|"}
}
}
由於打日誌的格式是這樣的:
2015-05-07-16:03:04|10.4.29.158|120.131.74.116|WEB|11299073|http://quxue.renren.com/shareApp?isappinstalled=0&userId=11299073&from=groupmessage|/shareApp|null|Mozilla/5.0 (iPhone; CPU iPhone OS 8_2 like Mac OS X) AppleWebKit/600.1.4 (KHTML, like Gecko) Mobile/12D508 MicroMessenger/6.1.5 NetType/WIFI|duringTime|98||
以|符號隔開,第一個是訪問時間,timestamp,作為logstash的時間戳,接下來的依次為:服務端IP,客戶端的IP,機器型別(WEB/APP/ADMIN),使用者的ID(沒有用0表示),請求的完整網址,請求的控制器路徑,reference,裝置的資訊,duringTime,請求所花的時間。
如上面程式碼,依次定義欄位,用一個正則表示式進行匹配,DATA是logstash定義好的正則,其實就是(.*?),並且定義欄位名。
我們將訪問時間作為logstash的時間戳,有了這個,我們就可以以時間為區分,檢視分析某段時間的請求是怎樣的,如果沒有匹配到這個時間的話,logstash將以當前時間作為該條記錄的時間戳。需要再filter裡面定義時間戳的格式,即打日誌用的格式:
filter {
#定義資料的格式
grok {#同上... }
#定義時間戳的格式
date {
match => [ "timestamp", "yyyy-MM-dd-HH:mm:ss" ]
locale => "cn"
}
}
在上面的欄位裡面需要跟logstash指出哪個是客戶端IP,logstash會自動去抓取該IP的相關位置資訊:
filter {
#定義資料的格式
grok {#同上}
#定義時間戳的格式
date {#同上}
#定義客戶端的IP是哪個欄位(上面定義的資料格式)
geoip {
source => "clientIp"
}
}
同樣地還有客戶端的UA,由於UA的格式比較多,logstash也會自動去分析,提取作業系統等相關資訊
#定義客戶端裝置是哪一個欄位 useragent { source => "device" target => "userDevice" }
哪些欄位是整型的,也需要告訴logstash,為了後面分析時可進行排序,使用的資料裡面只有一個時間
#需要進行轉換的欄位,這裡是將訪問的時間轉成int,再傳給Elasticsearch mutate { convert => ["duringTime", "integer"] }
3. 輸出配置
最後就是輸出的配置,將過濾扣的資料輸出到elasticsearch
output {
#將輸出儲存到elasticsearch,如果沒有匹配到時間就不儲存,因為日誌裡的網址引數有些帶有換行
if [timestamp] =~ /^\d{4}-\d{2}-\d{2}/ {
elasticsearch { host => localhost }
}
#輸出到stdout
# stdout { codec => rubydebug }
#定義訪問資料的使用者名稱和密碼
# user => webService
# password => 1q2w3e4r
}
我們將上述配置,儲存到logstash.conf,然後執行logstash
在logstash啟動完成之後,輸入上面的那條訪問記錄,logstash將輸出過濾後的資料:
可以看到logstash,自動去查詢IP的歸屬地,並將請求裡面的device欄位進行分析。