
Flume Custom Interceptor: Partitioning by a Configurable Field

1. Functionality

A custom interceptor uses a time field inside the data itself as the partition key.
Take the following record as an example:

{
   "type":"token_transfer",
   "token_address":"0x4774fed3f2838f504006be53155ca9cbddee9f0c",
   "from_address":"0xf6d6d1a0a0ab371bcda382e8069cbcaece713b3c",
   "to_address":"0x25e2e5d8ecc4fe46a9505079ed29266779dc7d6f",
   "value":1000000000000000,
   "transaction_hash":"0x6fd37836e1b70b4dbe8cfdaf5bace39df5281a813ea3ffe74d2f521d4222e01b",
   "log_index":5,
   "block_number":6851636,
   "block_timestamp":1604991499,
   "block_hash":"0xce8dcb6e82b6026b81fd9e720d7524d0e46cfb788751fded363dc5798ae29d86",
   "item_id":"token_transfer_0x6fd37836e1b70b4dbe8cfdaf5bace39df5281a813ea3ffe74d2f521d4222e01b_5",
   "item_timestamp":"2020-11-10T06:58:19Z"
}

2. Configuration

Description: the year, month, day, and hour are extracted from the time field and used as partition fields.
Notes:
(1) The message body must be valid JSON
(2) The time field must be at the top level of the JSON object

If the time field is a Unix timestamp, e.g. the JSON message contains a timestamp field named currentTimestamp:

test2.sources.s1.interceptors = i1
test2.sources.s1.interceptors.i1.type = com.apache.flume.interceptor.AutoPartitionInterceptor$Builder
test2.sources.s1.interceptors.i1.timestampField = currentTimestamp:GMT+8

timestampField: marks the named field as a Unix-timestamp field
currentTimestamp: the field name inside the JSON string
:GMT+8: optional; the default is GMT+0, and this suffix sets the time zone used to interpret the timestamp
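The `fieldName:zone` spec above can be sketched in a few lines of stdlib Java. This is a minimal illustration of the documented parsing rule, not the interceptor's actual internals; the class and method names here are invented for the example.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

public class TimestampSpec {
    final String fieldName;
    final String zoneId;

    // Split "fieldName:zone" into its two parts; the zone suffix is
    // optional and GMT+0 is the documented default.
    TimestampSpec(String spec) {
        String[] parts = spec.split(":");
        fieldName = parts[0];
        zoneId = parts.length == 2 ? parts[1] : "GMT+0";
    }

    // Format an epoch-second timestamp in the configured time zone.
    String format(long epochSeconds) {
        return DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
                .format(LocalDateTime.ofInstant(
                        Instant.ofEpochSecond(epochSeconds), ZoneId.of(zoneId)));
    }

    public static void main(String[] args) {
        TimestampSpec spec = new TimestampSpec("currentTimestamp:GMT+8");
        System.out.println(spec.fieldName + " / " + spec.zoneId);
        System.out.println(spec.format(1604991499L)); // prints 2020-11-10 14:58:19
    }
}
```

Note that `ZoneId.of` accepts GMT-offset ids such as `GMT+8` directly, which is why the suffix can be passed through unchanged.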

If the time field is a date string, e.g. the JSON message contains a date field named datetime:

test2.sources.s1.interceptors = i1
test2.sources.s1.interceptors.i1.type = com.apache.flume.interceptor.AutoPartitionInterceptor$Builder
test2.sources.s1.interceptors.i1.datetimeField = datetime

datetimeField: marks the named field as a datetime field; note that only the yyyy-MM-dd HH:mm:ss format is supported
datetime: the field name inside the JSON string
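Either way, the four partition headers are derived from a `yyyy-MM-dd HH:mm:ss` string by simple substring slicing. A stdlib-only sketch of that rule (the class name `PartitionKeys` is illustrative, not part of the interceptor):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class PartitionKeys {
    // Derive pk_year / pk_month / pk_day / pk_hour from a
    // "yyyy-MM-dd HH:mm:ss" string by fixed-offset slicing.
    static Map<String, String> fromDatetime(String dateTime) {
        Map<String, String> keys = new LinkedHashMap<>();
        String pkDay = dateTime.substring(0, 10);    // yyyy-MM-dd
        keys.put("pk_year", pkDay.substring(0, 4));  // yyyy
        keys.put("pk_month", pkDay.substring(0, 7)); // yyyy-MM
        keys.put("pk_day", pkDay);
        if (dateTime.length() >= 13) {               // hour part is present
            keys.put("pk_hour", pkDay + "_" + dateTime.substring(11, 13));
        }
        return keys;
    }

    public static void main(String[] args) {
        System.out.println(fromDatetime("2020-11-10 14:58:19"));
    }
}
```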

3. Testing

3.1 Prepare the Data

{
   "type":"token_transfer",
   "token_address":"0x4774fed3f2838f5043155ca9cbddee9f0c",
   "from_address":"0xf6d6d1a0a0ab371be8069cbcaece713b3c",
   "to_address":"0x25e2e5d8ecc4fe49ed29266779dc7d6f",
   "value":1000000000000000,
   "transaction_hash":"0x6fd37835281a813ea3ffe74d2f521d4222e01b",
   "log_index":5,
   "block_number":6851636,
   "block_timestamp":1604991499,
   "block_hash":"0xce8dcb6e82b6026b81fd751fded363dc5798ae29d86",
   "item_id":"token_transfer_0x6fd37836e1b70b4dbe881a813ea3ffe74d2f521d4222e01b_5",
   "item_timestamp":"2020-11-10T06:58:19Z"
}

Here the block_timestamp timestamp is used as the partition field, interpreted in the GMT+8 time zone.

3.2 Flume .conf File

test2.sources = s1
test2.sinks = k1
test2.channels = c1
test2.sources.s1.type = org.apache.flume.source.kafka.KafkaSource
test2.sources.s1.batchSize = 100
test2.sources.s1.batchDurationMillis = 3000
test2.sources.s1.kafka.bootstrap.servers = hadoop200:9092,hadoop201:9092,hadoop202:9092
test2.sources.s1.kafka.topics = test
test2.sources.s1.kafka.consumer.group.id = bigdata
test2.sources.s1.kafka.consumer.auto.offset.reset = latest
test2.sources.s1.kafka.consumer.auto.commit.enable = false
test2.sources.s1.kafka.consumer.timeout.ms = 15000
test2.sources.s1.kafka.consumer.fetch.max.wait.ms = 5000
test2.sources.s1.kafka.consumer.max.poll.records = 100
test2.sources.s1.kafka.consumer.max.poll.interval.ms = 3000000


# Extract pk_year, pk_month, pk_day, pk_hour
test2.sources.s1.interceptors = i1
test2.sources.s1.interceptors.i1.type = com.apache.flume.interceptor.AutoPartitionInterceptor$Builder
test2.sources.s1.interceptors.i1.timestampField = block_timestamp:GMT+8

test2.sinks.k1.type = hdfs
test2.sinks.k1.hdfs.path = hdfs://hadoop200:8020/user/hive/warehouse/dt_ods.db/ods_test2/pk_year=%{pk_year}/pk_month=%{pk_month}/pk_day=%{pk_day}/pk_hour=%{pk_hour}
test2.sinks.k1.hdfs.filePrefix = test2
test2.sinks.k1.hdfs.fileSuffix = .log
test2.sinks.k1.hdfs.useLocalTimeStamp = true
test2.sinks.k1.hdfs.batchSize = 500
test2.sinks.k1.hdfs.fileType = DataStream
test2.sinks.k1.hdfs.writeFormat = Text
test2.sinks.k1.hdfs.rollSize = 2147483648
test2.sinks.k1.hdfs.rollInterval = 0
test2.sinks.k1.hdfs.rollCount = 0
test2.sinks.k1.hdfs.idleTimeout = 120
test2.sinks.k1.hdfs.minBlockReplicas = 1
test2.channels.c1.type = file
test2.channels.c1.checkpointDir = /home/hadoop/test/flume_job/chkDir/test2
test2.channels.c1.dataDirs = /home/hadoop/test/flume_job/dataDir/test2
test2.sources.s1.channels = c1
test2.sinks.k1.channel = c1

Note the following settings:
test2.sources.s1.interceptors.i1.type: the fully qualified name of the custom interceptor's Builder class
test2.sources.s1.interceptors.i1.timestampField: the timestamp field in the message, with an optional time zone suffix
test2.sinks.k1.hdfs.path: the partition values are referenced in the path via header escapes such as %{pk_year}
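The `%{header}` escapes in hdfs.path are expanded by Flume itself from the event headers the interceptor populated. A toy sketch of that expansion, just to show the resulting directory layout (the real substitution is done internally by Flume, not by this code):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class PathSubstitution {
    // Replace each %{key} in the template with the matching header value.
    static String expand(String template, Map<String, String> headers) {
        String path = template;
        for (Map.Entry<String, String> e : headers.entrySet()) {
            path = path.replace("%{" + e.getKey() + "}", e.getValue());
        }
        return path;
    }

    public static void main(String[] args) {
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put("pk_year", "2020");
        headers.put("pk_month", "2020-11");
        headers.put("pk_day", "2020-11-10");
        headers.put("pk_hour", "2020-11-10_14");
        String template = "/user/hive/warehouse/dt_ods.db/ods_test2"
                + "/pk_year=%{pk_year}/pk_month=%{pk_month}"
                + "/pk_day=%{pk_day}/pk_hour=%{pk_hour}";
        System.out.println(expand(template, headers));
    }
}
```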

3.3 Run the Test

Send the sample message above to Kafka.

The partition directories then appear in HDFS. The sample timestamp 1604991499 corresponds to 2020-11-10 14:58:19 in the GMT+8 time zone, so the partitioning is correct.
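The expected conversion can be double-checked with plain java.time:

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

public class VerifyPartition {
    public static void main(String[] args) {
        // block_timestamp from the sample message, in epoch seconds
        String formatted = Instant.ofEpochSecond(1604991499L)
                .atZone(ZoneId.of("GMT+8"))
                .format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
        System.out.println(formatted); // prints 2020-11-10 14:58:19
    }
}
```

which corresponds to pk_day=2020-11-10 and pk_hour=2020-11-10_14.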

4. Source Code

import com.google.common.base.Charsets;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.Map;

public class AutoPartitionInterceptor implements Interceptor {

    Logger log = LoggerFactory.getLogger(this.getClass());
    JsonParser parser = null;
    DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    private String datetime_field = null;
    private String timestamp_field = null;

    public AutoPartitionInterceptor(String datetimeField, String timestampField){
        this.datetime_field = datetimeField;
        this.timestamp_field = timestampField;
    }

    @Override
    public void initialize() {
        parser = new JsonParser();
    }

    @Override
    public Event intercept(Event event) {
        // get the event body and headers
        byte[] body = event.getBody();
        Map<String, String> headerMap = event.getHeaders();

        if (null != timestamp_field && !"".equals(timestamp_field)) {
            String[] fieldAndZoneId = timestamp_field.split(":");
            String fieldName = fieldAndZoneId[0];
            String zoneId = fieldAndZoneId.length == 2 ? fieldAndZoneId[1] : "GMT+0";
            parseTimestamp2Header(body, fieldName, zoneId, headerMap);
        } else if (null != datetime_field && !"".equals(datetime_field)) {
            parseDatetime2Header(body, datetime_field, headerMap);
        }

        return event;
    }

    /**
     * Convert a timestamp to yyyy-MM-dd HH:mm:ss in the given time zone.
     * @param time   timestamp string, in seconds (10 digits) or milliseconds (13 digits)
     * @param zoneId time zone id, e.g. GMT+8
     * @return the formatted datetime string
     */
    private String getDateTimeByZoneId(String time, String zoneId) {
        int len = time.length();
        long ts;
        if (len == 10) {          // epoch seconds
            ts = Long.valueOf(time) * 1000;
        } else if (len == 13) {   // epoch milliseconds
            ts = Long.valueOf(time);
        } else {                  // higher precision: keep the first 10 digits as seconds
            ts = Long.valueOf(time.substring(0, 10)) * 1000;
        }
        return dateTimeFormatter.format(LocalDateTime.ofInstant(Instant.ofEpochMilli(ts), ZoneId.of(zoneId)));
    }


    /**
     * Add the partition headers pk_year, pk_month, pk_day, pk_hour.
     * @param body              event body (JSON bytes)
     * @param datetimeFieldName name of the datetime field in the JSON
     * @param headerMap         event headers to populate
     */
    private void parseDatetime2Header(byte[] body, String datetimeFieldName, Map<String, String> headerMap) {
        String str = new String(body, Charsets.UTF_8);
        JsonElement element = parser.parse(str);
        try{
            JsonObject root = element.getAsJsonObject();
            log.debug(str);

            String dateTime = root.get(datetimeFieldName).getAsString().trim();
            String pk_day = dateTime.substring(0,10);
            String pk_year = pk_day.substring(0,4);
            String pk_month = pk_day.substring(0,7);

            // set the headers
            headerMap.put("pk_year",pk_year);
            headerMap.put("pk_month",pk_month);
            headerMap.put("pk_day",pk_day);

            if (dateTime.length() >= 13) {
                String pk_hour = pk_day + "_" + dateTime.substring(11, 13);
                headerMap.put("pk_hour", pk_hour);
            }

        }catch (Exception e){
            log.error(str);
            log.error(e.getMessage());
            e.printStackTrace();
        }
    }


    /**
     * Add the partition headers pk_year, pk_month, pk_day, pk_hour.
     * @param body               event body (JSON bytes)
     * @param timestampFieldName name of the timestamp field in the JSON
     * @param zoneId             time zone used to interpret the timestamp
     * @param headerMap          event headers to populate
     */
    private void parseTimestamp2Header(byte[] body, String timestampFieldName, String zoneId, Map<String, String> headerMap) {
        String str = new String(body, Charsets.UTF_8);
        JsonElement element = parser.parse(str);
        try{
            JsonObject root = element.getAsJsonObject();
            String timestamp = root.get(timestampFieldName).getAsString().trim();
            String dateTime = getDateTimeByZoneId(timestamp, zoneId);
            String pk_day = dateTime.substring(0,10);
            String pk_year = pk_day.substring(0,4);
            String pk_month = pk_day.substring(0,7);
            String pk_hour = pk_day + "_" + dateTime.substring(11,13);
            // set the headers
            headerMap.put("pk_year",pk_year);
            headerMap.put("pk_month",pk_month);
            headerMap.put("pk_day",pk_day);
            headerMap.put("pk_hour",pk_hour);

        }catch (Exception e){
            log.error(str);
            log.error(e.getMessage());
            e.printStackTrace();
        }
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        for (Event event : events) {
            intercept(event);
        }
        return events;
    }

    @Override
    public void close() {

    }
    public static class Builder implements Interceptor.Builder {

        private String datetimeField = null;
        private String timestampField = null;

        @Override
        public Interceptor build() {

            return new AutoPartitionInterceptor(datetimeField,timestampField);
        }

        @Override
        public void configure(Context context) {
            datetimeField = context.getString("datetimeField");
            timestampField = context.getString("timestampField");
        }
    }

}