1. 程式人生 > 實用技巧 >Input模組外掛的詳細介紹

Input模組外掛的詳細介紹

jdbc外掛

  1. 增量同步和全量同步:
    1. 全量同步:是指將全部資料同步到es(通常是剛建立es,第一次同步時使用)
    2. 增量同步:是指將後續的更新、插入的記錄同步到es
  2. 增量同步更新的注意點:
    1. tracking_column的欄位必須是遞增的欄位(如id或者時間欄位)
    2. tracking_column欄位是id,則只能增加資料,而無法修改資料
    3. tracking_column欄位是時間欄位(如event_date),才可以實現同步修改操作
  3. 引數說明:
    1. record_last_run:是否記錄上次執行結果;如果為真,將會把上次執行到的tracking_column欄位的最後一行的值記錄下來,儲存到 last_run_metadata_path
    2. use_column_value:是否需要記錄某個column 的值,record_last_run 為真,可以自定義 track _column 名稱, 否則預設 track_column 的是 timestamp 的值
    3. tracking_column:如果 use_column_value 為真,需配置此引數;這個引數就是資料庫給出的一個欄位名稱;該 column 必須是遞增的。比如:記錄時間的欄位或者是自增的id
    4. last_run_metadata_path:指定檔案,來記錄上次執行到的tracking_column 欄位的最後一行的值;可以在SQL語句中運用該值(WHERE MY_ID > :last_sql_value);last_sql_value 取得就是該檔案中的值
    5. clean_run:是否清除 last_run_metadata_path 的記錄;如果為真,那麼每次都相當於從頭開始查詢所有的資料庫記錄
    6. lowercase_column_names:是否將 column 名稱轉小寫
    7. statement:執行SQL語句
    8. statement_filepath:可以將SQL語句寫入某個檔案中,statement_filepath就是執行SQL檔案的路徑
    9. schedule:設定監聽間隔,多久執行一次
      1. 從左到右的含義:分、時、日、月、年
      2. schedule => "* * * * *":全部為*預設含義為每分鐘都更新
      3. schedule => "40 12 30 7 2020":年份不能寫具體的年份,不然會報錯: Error: invalid weekday expression (2020)
      4. schedule => "40 12 30 7 *"
  4. 全量同步例項:
    input {
                jdbc {
                    # mysql 資料庫連結,test為資料庫名:"jdbc:mysql://192.168.29.1:3306/test?useUnicode=true&characterEncoding=utf8"
                    jdbc_connection_string => "jdbc:mysql://192.168.29.1:3306/test"
                    # 使用者名稱和密碼
                    jdbc_user => "root"
                    jdbc_password => "root"
                    # 驅動
                    jdbc_driver_library => "/opt/jar/mysql-connector-java-5.1.6.jar"
                    # 驅動類名
                    jdbc_driver_class => "com.mysql.jdbc.Driver"
                    #是否分頁
                    jdbc_paging_enabled => "true"
                    jdbc_page_size => "50000"
                    #直接執行sql語句
                    statement =>"select * from MdL_001"
                    # 執行的sql 檔案路徑+名稱
                    #statement_filepath => "/opt/data/jdbc.sql"
                    # 設定監聽間隔  各欄位含義(由左至右)分、時、天、月、年,全部為*預設含義為每分鐘都更新
                    schedule => "* * * * *"
                    # 索引型別
                    type => "jdbc"
                }
    }
  5. 增量同步例項:
    input {
        jdbc {
            # mysql 資料庫連結,test為資料庫名
            jdbc_connection_string => "jdbc:mysql://192.168.29.1:3306/test"
            # 使用者名稱和密碼
            jdbc_user => "root"
            jdbc_password => "root"
            # 驅動
            jdbc_driver_library => "/opt/jar/mysql-connector-java-5.1.6.jar"
            # 驅動類名
            jdbc_driver_class => "com.mysql.jdbc.Driver"
            #處理中文亂碼問題
            codec => plain { charset => "UTF-8"}
            #使用其它欄位追蹤,而不是用時間
            use_column_value => true
            #追蹤的欄位(使用MySQL中的時間欄位而不使用自增的id欄位;因為自增的id欄位無法實現更新問題,因為後面需要更新的id值會小於last_run_metadata_path記錄的值而無法實現更新操作)
            #tracking_column => id
            tracking_column => event_date
            record_last_run => true
            #上一個sql_last_value值的存放檔案路徑, 必須要在檔案中指定欄位的初始值
            last_run_metadata_path => "/opt/data/station_parameter.txt"
            #開啟分頁查詢
            jdbc_paging_enabled => true
            jdbc_page_size => 50000
            #直接執行sql語句
            statement =>"select * from mdl_001 id > :sql_last_value"
            # 執行的sql 檔案路徑+名稱
            #statement_filepath => "/opt/data/jdbc.sql"
            # 設定監聽間隔  各欄位含義(由左至右)分、時、天、月、年,全部為*預設含義為每分鐘都更新
            schedule => "* * * * *"
            # 索引型別
            type => "jdbc"
        }
    }

file外掛

Redis外掛