1. 程式人生 > >自定義flume的攔截器,提取body中的時間作為header

自定義flume的攔截器,提取body中的時間作為header

package com.springboot.MongoDB.flume;

import java.nio.charset.Charset;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.commons.codec.Charsets;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import com.alibaba.fastjson.JSON;

/**
 * 自定義flume的攔截器,提取body中的時間作為header
 */
public class kafkaInterceptor implements Interceptor {

    private final boolean preserveExisting;

    private kafkaInterceptor(boolean preserveExisting) {
        this.preserveExisting = preserveExisting;
    }

    public void initialize() {

    }
    private  ThreadLocal<SimpleDateFormat> formatCache = new ThreadLocal<SimpleDateFormat>() {
          @Override
          protected SimpleDateFormat initialValue() {
              return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS");
          }
      };
    public Event intercept(Event event) {
        if(event==null) {
            return null;
        }
        Map<String, String> headers = event.getHeaders();
        byte[] body = event.getBody();
        String jsonIn = new String(body,Charsets.UTF_8);
        //filePath="hdfs://hdp1.hdp:8020/IOT/"+new Integer(orgId)%100+"/"+orgId+"/"+filePathUrl.substring(filePathUrl.indexOf("*")+2,filePathUrl.length());
        //設定頭${}
        CacheData cache = JSON.parseObject(jsonIn, CacheData.class);
        headers.put(FlumeConstants.TIMESTAMP,formatCache.get().format(Long.parseLong(cache.getCreateTime())));
        headers.put(FlumeConstants.orgId,cache.getOrgId());
        headers.put(FlumeConstants.orgId100,Integer.toString(new Integer(cache.getOrgId())%100));
        headers.put(FlumeConstants.productline,cache.getProductline());
        headers.put(FlumeConstants.createTime,formatCache.get().format(Long.parseLong(cache.getCreateTime())));
        headers.put(FlumeConstants.t,formatCache.get().format(Long.parseLong(cache.getT())));
        //對資料加工處理
        cache.setCreateTime(formatCache.get().format(Long.parseLong(cache.getCreateTime())));
        cache.setT(formatCache.get().format(Long.parseLong(cache.getT())));
        String jsonOut = JSON.toJSONString(cache);
        event.setBody(jsonOut.getBytes(Charsets.UTF_8));
        return event;
    }

    private Map parseData(String str){
        HashMap map = JSON.parseObject(str, HashMap.class);
        String createTime = String.valueOf(map.get("createTime"));
        String t = String.valueOf(map.get("t"));
        formatCache.get().format(new Date(Long.parseLong(createTime)));
        map.put("createTime", formatCache.get().format(Long.parseLong(createTime)));
        map.put("t", formatCache.get().format(Long.parseLong(t)));
        return map;
    }

    public static class Builder implements Interceptor.Builder {

        private boolean preserveExisting = FlumeConstants.PRESERVE_DFLT;
        
        public Interceptor build() {
            return new kafkaInterceptor(preserveExisting);
        }

        public void configure(Context context) {
            //載入配置檔案屬性
            preserveExisting = context.getBoolean(FlumeConstants.PRESERVE, FlumeConstants.PRESERVE_DFLT);
        }

    }
    public List<Event> intercept(List<Event> events) {
        if (events == null) return null;
        List<Event> outList =new ArrayList<>();
        for (Event event : events) {
            Event outEvent = intercept(event);
            if(outEvent!=null) {
                outList.add(outEvent);
            }
        }
        return outList;
    }

    public void close() {

    }

}