logstash_output_kafka:Mysql同步Kafka深入詳解
0、題記
實際業務場景中,會遇到基礎資料存在Mysql中,實時寫入資料量比較大的情景。遷移至kafka是一種比較好的業務選型方案。
而mysql寫入kafka的選型方案有:
方案一:logstash_output_kafka 外掛。
方案二:kafka_connector。
方案三:debezium 外掛。
方案四:flume。
方案五:其他類似方案。
其中:debezium和flume是基於mysql binlog實現的。
如果需要同步歷史全量資料+實時更新資料,建議使用logstash。
1、logstash同步原理
常用的logstash的外掛是:logstash_input_jdbc實現關係型資料庫到Elasticsearch等的同步。
實際上,核心logstash的同步原理的掌握,有助於大家理解類似的各種庫之間的同步。
logstash核心原理:輸入生成事件,過濾器修改它們,輸出將它們傳送到其他地方。
logstash核心三部分組成:input、filter、output。
input { }
filter { }
output { }
1.1 input輸入
包含但遠不限於:
- jdbc:關係型資料庫:mysql、oracle等。
- file:從檔案系統上的檔案讀取。
- syslog:在已知埠514上偵聽syslog訊息。
- redis:redis訊息。beats:處理 Beats傳送的事件。
- kafka:kafka實時資料流。
1.2 filter過濾器
過濾器是Logstash管道中的中間處理裝置。您可以將過濾器與條件組合,以便在事件滿足特定條件時對其執行操作。
可以把它比作資料處理的ETL環節。
一些有用的過濾包括:
- grok:解析並構造任意文字。Grok是目前Logstash中將非結構化日誌資料解析為結構化和可查詢內容的最佳方式。有了內置於Logstash的120種模式,您很可能會找到滿足您需求的模式!
- mutate:對事件欄位執行常規轉換。您可以重新命名,刪除,替換和修改事件中的欄位。
- drop:完全刪除事件,例如除錯事件。
- clone:製作事件的副本,可能新增或刪除欄位。
- geoip:新增有關IP地址的地理位置的資訊。
1.3 output輸出
輸出是Logstash管道的最後階段。一些常用的輸出包括:
elasticsearch:將事件資料傳送到Elasticsearch。
file:將事件資料寫入磁碟上的檔案。
kafka:將事件寫入Kafka。
詳細的filter demo參考:
2、同步Mysql到kafka配置參考
input {
jdbc {
jdbc_connection_string => "jdbc:mysql://192.168.1.12:3306/news_base"
jdbc_user => "root"
jdbc_password => "xxxxxxx"
jdbc_driver_library => "/home/logstash-6.4.0/lib/mysql-connector-java-5.1.47.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
#schedule => "* * * * *"
statement => "SELECT * from news_info WHERE id > :sql_last_value order by id"
use_column_value => true
tracking_column => "id"
tracking_column_type => "numeric"
record_last_run => true
last_run_metadata_path => "/home/logstash-6.4.0/sync_data/news_last_run"
}
}
filter {
ruby{
code => "event.set('gather_time_unix',event.get('gather_time').to_i*1000)"
}
ruby{
code => "event.set('publish_time_unix',event.get('publish_time').to_i*1000)"
}
mutate {
remove_field => [ "@version" ]
remove_field => [ "@timestamp" ]
remove_field => [ "gather_time" ]
remove_field => [ "publish_time" ]
}
}
output {
kafka {
bootstrap_servers => "192.168.1.13:9092"
codec => json_lines
topic_id => "mytopic"
}
file {
codec => json_lines
path => "/tmp/output_a.log"
}
}
以上內容不復雜,不做細講。
注意:
Mysql藉助logstash同步後,日期型別格式:“2019-04-20 13:55:53”已經被識別為日期格式。
code =>
"event.set('gather_time_unix',event.get('gather_time').to_i*1000)",
是將Mysql中的時間格式轉化為時間戳格式。
3、坑總結
3.1 坑1欄位大小寫問題
from星友:使用logstash同步mysql資料的,因為在jdbc.conf裡面沒有新增 lowercase_column_names
=> "false" 這個屬性,所以logstash預設把查詢結果的列明改為了小寫,同步進了es,所以就導致es裡面看到的欄位名稱全是小寫。
最後總結:es是支援大寫欄位名稱的,問題出在logstash沒用好,需要在同步配置中加上 lowercase_column_names => "false" 。記錄下來希望可以幫到更多人。
3.2 同步到ES中的資料會不會重複?
想將關係資料庫的資料同步至ES中,如果在叢集的多臺伺服器上同時啟動logstash。
解讀:實際專案中就是沒用隨機id 使用指定id作為es的_id ,指定id可以是url的md5.這樣相同資料就會走更新覆蓋以前資料
3.3 相同配置logstash,升級6.3之後不能同步資料。
解讀:高版本基於時間增量有優化。
tracking_column_type => "timestamp"應該是需要指定標識為時間型別,預設為數字型別numeric
3.4 ETL欄位統一在哪處理?
解讀:可以logstash同步mysql的時候sql查詢階段處理,如:select a_value as avalue***。
或者filter階段處理,mutate rename處理。
mutate {
rename => ["shortHostname", "hostname" ]
}
或者kafka階段藉助kafka stream處理。
4、小結
- 相關配置和同步都不復雜,複雜點往往在於filter階段的解析還有logstash效能問題。
- 需要結合實際業務場景做深入的研究和效能分析。
- 有問題,歡迎留言討論。
推薦閱讀:
1、實戰 | canal 實現Mysql到Elasticsearch實時增量同步
2、乾貨 | Debezium實現Mysql到Elasticsearch高效實時同步
3、一張圖理清楚關係型資料庫與Elasticsearch同步
4、新的實現:
5、mysql2mysql:
6、推薦開源實現:
加入星球,更短時間更快習得更多幹貨!