1. 程式人生 > 其它 >關於MySQL匯入資料到elasticsearch的小工具logstash

關於MySQL匯入資料到elasticsearch的小工具logstash

  1. logstash核心配置檔案pipelines.yml

     #注:此處的 - 必須頂格寫必須!!!
     - pipeline.id: invitation
     #下面路徑配置的是你同步資料是的欄位對映關係
       path.config: /opt/apps/logstash/config/invitation/invitation.conf
    
  2. 同步資料時的欄位對映關係配置檔案invitation.conf。注:路徑一定跟你pipelines.yml配置檔案中的一樣

     input {
     
     jdbc {
     #驅動jar包的位置
     jdbc_driver_library => "/opt/apps/logstash/lib/mysql-connector-java-8.0.13.jar"
     #驅動類名
     jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
     #MySQL的連結
     jdbc_connection_string => "jdbc:mysql://192.168.0.234:3306/community?characterEncoding=utf8&useSSL=false&serverTimezone=UTC&rewriteBatchedStatements=true"
     #資料庫使用者名稱
     jdbc_user => "mostchh"
     #資料庫密碼
     jdbc_password => "1qaz2wsx3edc"
     #資料庫重連嘗試次數
     connection_retry_attempts => "3"
     #超時時間
     jdbc_validation_timeout => "3600"
     #開啟分頁查詢(預設false不開啟)
     jdbc_paging_enabled => "true"
     #單次分頁查詢條數(預設100000,若欄位較多且更新頻率較高,建議調低此值)
     jdbc_page_size => "500"
     #時區
     #jdbc_default_timezone =>"Asia/Shanghai"    
     #如果sql較複雜,建議配通過statement_filepath配置sql檔案的存放路徑;
     statement_filepath => "/opt/apps/logstash/sql/invitation/invitation.sql"
     #需要記錄查詢結果某欄位的值時,此欄位為true
     use_column_value => true
     #是否設定欄位為小寫,預設是true
     lowercase_column_names => false
     #需要記錄的遞增欄位,用於增量同步,下次只同步比該值大的資料
     tracking_column => "modifiedTime"
     #遞增的欄位型別
     tracking_column_type => "timestamp"
     #記錄上一次執行記錄
     record_last_run => true
     #上一次同步的遞增欄位存放檔案路徑
     last_run_metadata_path => "/opt/apps/logstash/station/invitation.txt"
     #是否清除last_run_metadata_path的記錄,需要增量同步時此欄位必須為false
     clean_run => false
     #自動同步資料的cron表示式,下面是一秒執行一次
     schedule => "*/1 * * * * *"
     #對應你pipelines配置檔案的ID
     type => "invitation"
     
     }
     
     }
     #資料處理的過濾器
     filter {
     aggregate {
     task_id => "%{cardId}"
     code => "
     	map['cardId'] = event.get('cardId')
     	map['title'] = event.get('title')
                     map['content'] = event.get('content')
                     map['issueUserId'] = event.get('issueUserId')
                     map['issueUserName'] = event.get('issueUserName')
                     map['issueUserIcon'] = event.get('issueUserIcon')
     	map['issueTime'] = event.get('issueTime')
     	map['revealStatus'] = event.get('revealStatus')
                     map['commentNum'] = event.get('commentNum')
                     map['isMeLike'] = event.get('isMeLike')
                     map['giveLikeNum'] = event.get('giveLikeNum')
                     map['isDelete'] = event.get('isDelete')
     				map['issueStatus'] = event.get('issueStatus')
     				map['cardStatus'] = event.get('cardStatus')
     	map['giveLikeUsers'] ||=[]
     	#資料一對多的處理
     	if (event.get('userId') != nil)
     		if !(map['giveLikeUsers'].include? event.get('userId'))  
     			map['giveLikeUsers'] << event.get('userId')
     		end
     	end
     	map['file_list'] ||=[]
     	map['fileList'] ||=[]
     	#資料一對多的處理
     	if (event.get('fileId') != nil)
     		if !(map['file_list'].include? event.get('fileId'))  
     			map['file_list'] << event.get('fileId')        
     			map['fileList'] << {
     				'fileId' => event.get('fileId'),
     				'fileName' => event.get('fileName'),
     				'fileUrl' => event.get('fileUrl')
     			}
     		end
     	end
     	event.cancel()
     "
     
     push_previous_map_as_event => true
     timeout => 5
     }
     mutate {
     }
      mutate {
      #過濾不需要的欄位
    remove_field => ["@timestamp","@version"]
     }
     }
    
     output {
     elasticsearch {
    document_id => "%{cardId}"
    document_type => "_doc"
    index => "bbs_card_management"
    hosts => ["http://192.168.0.178:9200"]
      }
     stdout{
    codec => rubydebug
     }
     }
    
  3. 同步資料的SQL配置檔案invitation.sql,具體的SQL就根據你的業務來定了,我這裡用的檢視所以SQL比較簡單。注:路徑一定跟你invitation.conf配置檔案中的一樣

     SELECT
     * 
     FROM
     invitation 
     WHERE
     modifiedTime >= :sql_last_value 
     AND modifiedTime < NOW()
    

    此處的:sql_last_value 取得就是你遞增欄位存放地址中的值

  4. 遞增欄位存放檔案invitation.txt。注:路徑一定跟你invitation.conf配置檔案中的一樣

     --- 2021-08-30 15:22:08.000000000 +00:00
    

    配置的存放型別是時間型別,儲存格式就是這樣的。

以上就是logstash的所有配置了,只需要執行即可實現一秒同步一次資料,當然具體多久同步根據你具體的需求來定。

一切都是最好的安排。