FlinkCDC Captured Data Format
阿新 · Published 2021-10-27
Flink CDC hands each change event to the deserializer as a Debezium SourceRecord; the raw records for the three DML operations look like this.

delete (op=d, only a before image):
SourceRecord { sourcePartition={server=mysql_binlog_source}, sourceOffset={ts_sec=1634898017, file=master.000007, pos=982176634, row=1, server_id=1, event=2} } ConnectRecord { topic='mysql_binlog_source.test.tbl_rule_config', kafkaPartition=null, key=Struct{id=5}, keySchema=Schema{mysql_binlog_source.test.tbl_rule_config.Key:STRUCT}, value=Struct{before=Struct{id=5,source_db=test,source_table=demo5,source_columns_truncate=aa,sink_db=db_test,sink_table=demo5,sink_columns=pk,name,aa,is_create_sink_table=0,is_del=0}, source=Struct{version=1.2.1.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1634898017000,db=test,table=tbl_rule_config,server_id=1,file=master.000007,pos=982176791,row=0,thread=1014574}, op=d, ts_ms=1634898015827}, valueSchema=Schema{mysql_binlog_source.test.tbl_rule_config.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=) }
insert (op=c, only an after image):
SourceRecord { sourcePartition={server=mysql_binlog_source}, sourceOffset={ts_sec=1634898005, file=master.000007, pos=982165605, row=1, server_id=1, event=2} } ConnectRecord { topic='mysql_binlog_source.test.tbl_rule_config', kafkaPartition=null, key=Struct{id=5}, keySchema=Schema{mysql_binlog_source.test.tbl_rule_config.Key:STRUCT}, value=Struct{after=Struct{id=5,source_db=test,source_table=demo5,source_columns_truncate=aa,sink_db=db_test,sink_table=demo5,sink_columns=pk,name,aa,is_create_sink_table=0,is_del=0}, source=Struct{version=1.2.1.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1634898005000,db=test,table=tbl_rule_config,server_id=1,file=master.000007,pos=982165762,row=0,thread=1014574}, op=c, ts_ms=1634898004110}, valueSchema=Schema{mysql_binlog_source.test.tbl_rule_config.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=) }
update (op=u, with both before and after images; here is_del flips from 0 to 1):
SourceRecord { sourcePartition={server=mysql_binlog_source}, sourceOffset={ts_sec=1634898199, file=master.000007, pos=982196336, row=1, server_id=1, event=2} } ConnectRecord { topic='mysql_binlog_source.test.tbl_rule_config', kafkaPartition=null, key=Struct{id=4}, keySchema=Schema{mysql_binlog_source.test.tbl_rule_config.Key:STRUCT}, value=Struct{before=Struct{id=4,source_db=test,source_table=demo4,source_columns_truncate=log,content,source_columns_fillin=create_time@2099-01-01 00:00:00,sink_db=db_test,sink_table=demo4,sink_columns=pk,name,age,log,content,create_time,update_time,is_create_sink_table=0,is_del=0},after=Struct{id=4,source_db=test,source_table=demo4,source_columns_truncate=log,content,source_columns_fillin=create_time@2099-01-01 00:00:00,sink_db=db_test,sink_table=demo4,sink_columns=pk,name,age,log,content,create_time,update_time,is_create_sink_table=0,is_del=1},source=Struct{version=1.2.1.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1634898199000,db=test,table=tbl_rule_config,server_id=1,file=master.000007,pos=982196493,row=0,thread=1014574},op=u,ts_ms=1634898198166}, valueSchema=Schema{mysql_binlog_source.test.tbl_rule_config.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=) }
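These dumps are simply each record's toString(), which is what the connector's stock StringDebeziumDeserializationSchema emits. A minimal sketch of a job that produces them, assuming the com.alibaba.ververica flink-connector-mysql-cdc 1.x API (matching the Debezium 1.2.1.Final version visible in the source struct above); hostname, port, and credentials are placeholders:

import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class RawCdcDumpJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Each change event is emitted as SourceRecord.toString(), i.e. the dumps above.
        SourceFunction<String> source = MySQLSource.<String>builder()
                .hostname("localhost")                      // placeholder
                .port(3306)
                .databaseList("test")                       // database(s) to capture
                .tableList("test.tbl_rule_config")          // fully qualified table name(s)
                .username("root")                           // placeholder
                .password("password")                       // placeholder
                .deserializer(new StringDebeziumDeserializationSchema())
                .build();

        env.addSource(source).print();
        env.execute("raw-cdc-dump");
    }
}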
The raw strings are unwieldy downstream, so a custom deserializer flattens the Debezium envelope into plain JSON. The operation type is carried in the envelope's op field (c = create/insert, u = update, d = delete), and the row images sit in the before and after structs; the class below extracts these together with the database and table names from the source struct:
import com.alibaba.fastjson.JSONObject;
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

/**
 * Author LYM
 * Description Custom deserializer that converts the change data into JSON.
 *             Source format: id=1,name=liuxintong,age=17
 *             Target format: {"id":74603,"order_id":28641,"order_status":"1005","operate_time":"2021-07-30 11:35:49"}
 * Date 2021/10/18 17:25
 * Version 1.0
 */
public class FlinkCdcDataDeserializationSchema implements DebeziumDeserializationSchema<String> {

    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
        Struct valueStruct = (Struct) sourceRecord.value();
        Struct sourceStruct = valueStruct.getStruct("source");

        // Extract the database name, table name, and operation type
        String database = sourceStruct.getString("db");
        String table = sourceStruct.getString("table");
        String type = Envelope.operationFor(sourceRecord).toString().toLowerCase();
        // Debezium calls inserts "create"; normalize the name
        if (type.equals("create")) {
            type = "insert";
        }

        JSONObject jsonObject = new JSONObject();
        jsonObject.put("database", database);
        jsonObject.put("table", table);
        jsonObject.put("type", type);

        // Convert the before image (null for inserts) to JSON
        Struct beforeStruct = valueStruct.getStruct("before");
        JSONObject beforeDataJson = new JSONObject();
        if (beforeStruct != null) {
            for (Field field : beforeStruct.schema().fields()) {
                beforeDataJson.put(field.name(), beforeStruct.get(field));
            }
        }

        // Convert the after image (null for deletes) to JSON
        Struct afterStruct = valueStruct.getStruct("after");
        JSONObject afterDataJson = new JSONObject();
        if (afterStruct != null) {
            for (Field field : afterStruct.schema().fields()) {
                afterDataJson.put(field.name(), afterStruct.get(field));
            }
        }

        jsonObject.put("beforeData", beforeDataJson);
        jsonObject.put("afterData", afterDataJson);

        // Emit the JSON string downstream
        collector.collect(jsonObject.toJSONString());
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return TypeInformation.of(String.class);
    }
}
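Wiring the custom schema in is the same builder call as in the sketch above, with the deserializer swapped; connection details remain placeholders:

import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class JsonCdcSourceFactory {
    // Same builder as the raw-string job, now emitting compact JSON per change event.
    public static SourceFunction<String> build() {
        return MySQLSource.<String>builder()
                .hostname("localhost")                      // placeholder
                .port(3306)
                .databaseList("test")
                .tableList("test.tbl_rule_config")
                .username("root")                           // placeholder
                .password("password")                       // placeholder
                .deserializer(new FlinkCdcDataDeserializationSchema())
                .build();
    }
}

For the update record shown earlier, each element now looks like {"database":"test","table":"tbl_rule_config","type":"update","beforeData":{...,"is_del":0},"afterData":{...,"is_del":1}} (field values abbreviated), which downstream operators can parse directly instead of scraping the SourceRecord string.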