
FlinkCDC Captured Data Format

Each change event captured by FlinkCDC arrives as a Debezium SourceRecord; with the default string deserializer, the raw output for delete, insert, and update events looks as follows.

delete

SourceRecord
{
    sourcePartition={server=mysql_binlog_source}, 
    sourceOffset={ts_sec=1634898017, file=master.000007, pos=982176634, row=1, server_id=1, event=2}
}
ConnectRecord
{
    topic='mysql_binlog_source.test.tbl_rule_config', 
    kafkaPartition=null, 
    key=Struct{id=5}, 
    keySchema=Schema{mysql_binlog_source.test.tbl_rule_config.Key:STRUCT}, 
    value=Struct{before=Struct{id=5,source_db=test,source_table=demo5,source_columns_truncate=aa,sink_db=db_test,sink_table=demo5,sink_columns=pk,name,aa,is_create_sink_table=0,is_del=0}, source=Struct{version=1.2.1.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1634898017000,db=test,table=tbl_rule_config,server_id=1,file=master.000007,pos=982176791,row=0,thread=1014574}, op=d, ts_ms=1634898015827}, 
    valueSchema=Schema{mysql_binlog_source.test.tbl_rule_config.Envelope:STRUCT}, 
    timestamp=null, 
    headers=ConnectHeaders(headers=)
}

insert

SourceRecord
{
    sourcePartition={server=mysql_binlog_source}, 
    sourceOffset={ts_sec=1634898005, file=master.000007, pos=982165605, row=1, server_id=1, event=2}
}
ConnectRecord
{
    topic='mysql_binlog_source.test.tbl_rule_config', 
    kafkaPartition=null, 
    key=Struct{id=5}, 
    keySchema=Schema{mysql_binlog_source.test.tbl_rule_config.Key:STRUCT}, 
    value=Struct{after=Struct{id=5,source_db=test,source_table=demo5,source_columns_truncate=aa,sink_db=db_test,sink_table=demo5,sink_columns=pk,name,aa,is_create_sink_table=0,is_del=0}, source=Struct{version=1.2.1.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1634898005000,db=test,table=tbl_rule_config,server_id=1,file=master.000007,pos=982165762,row=0,thread=1014574}, op=c, ts_ms=1634898004110}, 
    valueSchema=Schema{mysql_binlog_source.test.tbl_rule_config.Envelope:STRUCT}, 
    timestamp=null, 
    headers=ConnectHeaders(headers=)
}

update

SourceRecord
{
    sourcePartition={server=mysql_binlog_source}, 
    sourceOffset={ts_sec=1634898199, file=master.000007, pos=982196336, row=1, server_id=1, event=2}
} 
ConnectRecord
{
    topic='mysql_binlog_source.test.tbl_rule_config', 
    kafkaPartition=null, 
    key=Struct{id=4}, 
    keySchema=Schema{mysql_binlog_source.test.tbl_rule_config.Key:STRUCT}, 
    value=Struct{before=Struct{id=4,source_db=test,source_table=demo4,source_columns_truncate=log,content,source_columns_fillin=create_time@2099-01-01 00:00:00,sink_db=db_test,sink_table=demo4,sink_columns=pk,name,age,log,content,create_time,update_time,is_create_sink_table=0,is_del=0},after=Struct{id=4,source_db=test,source_table=demo4,source_columns_truncate=log,content,source_columns_fillin=create_time@2099-01-01 00:00:00,sink_db=db_test,sink_table=demo4,sink_columns=pk,name,age,log,content,create_time,update_time,is_create_sink_table=0,is_del=1},source=Struct{version=1.2.1.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1634898199000,db=test,table=tbl_rule_config,server_id=1,file=master.000007,pos=982196493,row=0,thread=1014574},op=u,ts_ms=1634898198166}, 
    valueSchema=Schema{mysql_binlog_source.test.tbl_rule_config.Envelope:STRUCT}, 
    timestamp=null, 
    headers=ConnectHeaders(headers=)
}
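
The deserializer below flattens each envelope into one JSON object holding the database, table, operation type, and the before/after row images. For the update record above, its output would look roughly like this (values taken from the before/after Structs; most columns omitted for brevity):

{
    "database": "test",
    "table": "tbl_rule_config",
    "type": "update",
    "beforeData": {"id": 4, "source_table": "demo4", ..., "is_del": 0},
    "afterData": {"id": 4, "source_table": "demo4", ..., "is_del": 1}
}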
Custom deserializer that converts the change data into JSON format:
import com.alibaba.fastjson.JSONObject;
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

/**
 * Author LYM
 * Description Custom deserializer that converts the change data into JSON format
 * Original format: id=1,name=liuxintong,age=17
 * Target format: {"id":74603,"order_id":28641,"order_status":"1005","operate_time":"2021-07-30 11:35:49"}
 * Date 2021/10/18 17:25
 * Version 1.0
 */
public class FlinkCdcDataDeserializationSchema implements DebeziumDeserializationSchema<String> {
    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {

        // The record value is a Debezium envelope Struct (before/after/source/op/ts_ms)
        Struct valueStruct = (Struct) sourceRecord.value();
        Struct sourceStruct = valueStruct.getStruct("source");

        // Get the database name, table name, and operation type
        String database = sourceStruct.getString("db");
        String table = sourceStruct.getString("table");
        String type = Envelope.operationFor(sourceRecord).toString().toLowerCase();

        // Debezium reports inserts as "create"; normalize to "insert"
        if (type.equals("create")) {
            type = "insert";
        }
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("database",database);
        jsonObject.put("table",table);
        jsonObject.put("type",type);

        // Convert the before/after row images into JSON objects
        Struct beforeStruct = valueStruct.getStruct("before");
        JSONObject beforeDataJson = new JSONObject();
        if (beforeStruct != null) {
            for (Field field : beforeStruct.schema().fields()) {
                beforeDataJson.put(field.name(),beforeStruct.get(field));
            }
        }

        Struct afterStruct = valueStruct.getStruct("after");
        JSONObject afterDataJson = new JSONObject();
        if (afterStruct != null) {
            for (Field field : afterStruct.schema().fields()) {
                afterDataJson.put(field.name(),afterStruct.get(field));
            }
        }

        jsonObject.put("beforeData",beforeDataJson);
        jsonObject.put("afterData",afterDataJson);

        // Emit the assembled JSON string downstream
        collector.collect(jsonObject.toJSONString());

    }

    @Override
    public TypeInformation<String> getProducedType() {
        return TypeInformation.of(String.class);
    }
}
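
A minimal sketch of wiring the deserializer into a MySQL CDC source. The connection settings (hostname, port, credentials, table list) are placeholders, and the builder API shown assumes the flink-cdc-connectors 1.x release matching the com.alibaba.ververica.cdc package used above:

import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinkCdcJsonJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Placeholder connection settings; replace with your own MySQL instance
        DebeziumSourceFunction<String> source = MySQLSource.<String>builder()
                .hostname("localhost")
                .port(3306)
                .username("root")
                .password("123456")
                .databaseList("test")
                .tableList("test.tbl_rule_config")
                .deserializer(new FlinkCdcDataDeserializationSchema())
                .build();

        // Each change event now arrives as a flat JSON string, e.g.
        // {"database":"test","table":"tbl_rule_config","type":"insert","beforeData":{},"afterData":{...}}
        env.addSource(source).print();

        env.execute("flink-cdc-json");
    }
}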