kafka connector 中的輕量級ETL-transfomation功能介紹
阿新 • • 發佈:2018-12-21
在kafka connector的使用中,可能因為各種原因(業務原因、connector需要key或者schema等)需要用到transfomation,處理訊息的內容。下面列舉了kafka connector 自帶的transfomation的功能,幫助大家瞭解一下,當然有能力也可以自己開發transfomation元件。
功能 | 轉換器名稱 | 轉換器型別 | 獨有配置 | 對應功能(修改kafka Message) |
插入欄位 | InsertField | org.apache.kafka.connect.transforms.${name}.(key/value) | offset.field |
新增offset記錄 |
partition.field | 新增對應kafka分割槽記錄 | |||
timestamp.field | 新增對應時間戳欄位 | |||
topic.field | 新增對應kafka分割槽欄位記錄 | |||
static.field | 新增寫死的記錄名 | |||
static.field | 新增寫死的記錄值 | |||
替換欄位 | ReplaceField | blacklist | 要丟棄的欄位名 | |
whitelist | 要改名的欄位列表 | |||
renames | 改名對應對映:old_fied1:new_filed1,old_fied2:new_filed2 |
|||
遮擋欄位 | MaskField | fields | 要加密(丟棄)的欄位列表 | |
包裝欄位(整個訊息包裝成一個欄位) | HoistField | field | 被包裝後的欄位名 | |
拆包欄位(只能取一個欄位) | ExtractField | field | 要取出來的欄位 | |
設定訊息Schema | SetSchemaMetadata | schema.name | 設定schema名稱 | |
schema.version | 設定schema版本 | |||
轉換timestamp欄位型別 | TimestampConverter | target.type |
目標timestamp型別(unix/Date/Time/Timestamp) | |
field | timestamp欄位名 | |||
format | yyyyMMdd | |||
給kafka訊息新增key | ValueToKey | fields | 將值的哪個欄位作為kafka訊息的key | |
用時間戳來改變目的表或檔名 | TimestampRouter | timestamp.format | 改變訊息的timestamp格式:yyyyMMdd | |
topic.format | ${topic}${timestamp}呼叫topic名和時間戳,生成欄位裡的新topic名 | |||
用正則表示式改變目的表或檔名 | RegexRouter | regex | 該topic名要匹配什麼正則:DC1-TEST-(.*) | |
replacement | 要代替的心topic名:$1 | |||
展開巢狀的資料介面 | Flatten | delimiter | 將巢狀的結構展開,指定的分隔符 | |
改變欄位的資料型別 | Cast | spec | 轉換欄位型別foo:int8,bar:float32 |