關於MySQL匯入資料到elasticsearch的小工具logstash
阿新 • • 發佈:2021-08-30
-
logstash核心配置檔案pipelines.yml
#注:此處的 - 必須頂格寫必須!!! - pipeline.id: invitation #下面路徑配置的是你同步資料是的欄位對映關係 path.config: /opt/apps/logstash/config/invitation/invitation.conf
-
同步資料時的欄位對映關係配置檔案invitation.conf。注:路徑一定跟你pipelines.yml配置檔案中的一樣
input { jdbc { #驅動jar包的位置 jdbc_driver_library => "/opt/apps/logstash/lib/mysql-connector-java-8.0.13.jar" #驅動類名 jdbc_driver_class => "com.mysql.cj.jdbc.Driver" #MySQL的連結 jdbc_connection_string => "jdbc:mysql://192.168.0.234:3306/community?characterEncoding=utf8&useSSL=false&serverTimezone=UTC&rewriteBatchedStatements=true" #資料庫使用者名稱 jdbc_user => "mostchh" #資料庫密碼 jdbc_password => "1qaz2wsx3edc" #資料庫重連嘗試次數 connection_retry_attempts => "3" #超時時間 jdbc_validation_timeout => "3600" #開啟分頁查詢(預設false不開啟) jdbc_paging_enabled => "true" #單次分頁查詢條數(預設100000,若欄位較多且更新頻率較高,建議調低此值) jdbc_page_size => "500" #時區 #jdbc_default_timezone =>"Asia/Shanghai" #如果sql較複雜,建議配通過statement_filepath配置sql檔案的存放路徑; statement_filepath => "/opt/apps/logstash/sql/invitation/invitation.sql" #需要記錄查詢結果某欄位的值時,此欄位為true use_column_value => true #是否設定欄位為小寫,預設是true lowercase_column_names => false #需要記錄的遞增欄位,用於增量同步,下次只同步比該值大的資料 tracking_column => "modifiedTime" #遞增的欄位型別 tracking_column_type => "timestamp" #記錄上一次執行記錄 record_last_run => true #上一次同步的遞增欄位存放檔案路徑 last_run_metadata_path => "/opt/apps/logstash/station/invitation.txt" #是否清除last_run_metadata_path的記錄,需要增量同步時此欄位必須為false clean_run => false #自動同步資料的cron表示式,下面是一秒執行一次 schedule => "*/1 * * * * *" #對應你pipelines配置檔案的ID type => "invitation" } } #資料處理的過濾器 filter { aggregate { task_id => "%{cardId}" code => " map['cardId'] = event.get('cardId') map['title'] = event.get('title') map['content'] = event.get('content') map['issueUserId'] = event.get('issueUserId') map['issueUserName'] = event.get('issueUserName') map['issueUserIcon'] = event.get('issueUserIcon') map['issueTime'] = event.get('issueTime') map['revealStatus'] = event.get('revealStatus') map['commentNum'] = event.get('commentNum') map['isMeLike'] = event.get('isMeLike') map['giveLikeNum'] = event.get('giveLikeNum') map['isDelete'] = event.get('isDelete') map['issueStatus'] = event.get('issueStatus') map['cardStatus'] = event.get('cardStatus') map['giveLikeUsers'] ||=[] #資料一對多的處理 if (event.get('userId') != nil) if !(map['giveLikeUsers'].include? event.get('userId')) map['giveLikeUsers'] << event.get('userId') end end map['file_list'] ||=[] map['fileList'] ||=[] #資料一對多的處理 if (event.get('fileId') != nil) if !(map['file_list'].include? event.get('fileId')) map['file_list'] << event.get('fileId') map['fileList'] << { 'fileId' => event.get('fileId'), 'fileName' => event.get('fileName'), 'fileUrl' => event.get('fileUrl') } end end event.cancel() " push_previous_map_as_event => true timeout => 5 } mutate { } mutate { #過濾不需要的欄位 remove_field => ["@timestamp","@version"] } } output { elasticsearch { document_id => "%{cardId}" document_type => "_doc" index => "bbs_card_management" hosts => ["http://192.168.0.178:9200"] } stdout{ codec => rubydebug } }
-
同步資料的SQL配置檔案invitation.sql,具體的SQL就根據你的業務來定了,我這裡用的檢視所以SQL比較簡單。注:路徑一定跟你invitation.conf配置檔案中的一樣
SELECT * FROM invitation WHERE modifiedTime >= :sql_last_value AND modifiedTime < NOW()
此處的:sql_last_value 取得就是你遞增欄位存放地址中的值
-
遞增欄位存放檔案invitation.txt。注:路徑一定跟你invitation.conf配置檔案中的一樣
--- 2021-08-30 15:22:08.000000000 +00:00
配置的存放型別是時間型別,儲存格式就是這樣的。
以上就是logstash的所有配置了,只需要執行即可實現一秒同步一次資料,當然具體多久同步根據你具體的需求來定。
一切都是最好的安排。