Custom Flume interceptor: extract the timestamp from the event body and set it as a header
package com.springboot.MongoDB.flume;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.codec.Charsets;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import com.alibaba.fastjson.JSON;
/**
 * Custom Flume interceptor: parses the JSON body, extracts the timestamp fields,
 * and writes them (plus a few routing fields) into the event headers.
 */
public class kafkaInterceptor implements Interceptor {

    // When true, an existing timestamp header is kept instead of being overwritten.
    private final boolean preserveExisting;

    private kafkaInterceptor(boolean preserveExisting) {
        this.preserveExisting = preserveExisting;
    }

    @Override
    public void initialize() {
        // no setup required
    }
    // SimpleDateFormat is not thread-safe, so keep one instance per thread.
    private final ThreadLocal<SimpleDateFormat> formatCache = new ThreadLocal<SimpleDateFormat>() {
        @Override
        protected SimpleDateFormat initialValue() {
            return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS");
        }
    };
    @Override
    public Event intercept(Event event) {
        if (event == null) {
            return null;
        }
        Map<String, String> headers = event.getHeaders();
        String jsonIn = new String(event.getBody(), Charsets.UTF_8);

        // Parse the JSON body and format the epoch-millisecond fields once.
        CacheData cache = JSON.parseObject(jsonIn, CacheData.class);
        String createTime = formatCache.get().format(new Date(Long.parseLong(cache.getCreateTime())));
        String t = formatCache.get().format(new Date(Long.parseLong(cache.getT())));

        // Set the headers; downstream sinks can use them for routing and path escaping,
        // e.g. hdfs://hdp1.hdp:8020/IOT/<orgId % 100>/<orgId>/...
        if (!preserveExisting || !headers.containsKey(FlumeConstants.TIMESTAMP)) {
            headers.put(FlumeConstants.TIMESTAMP, createTime);
        }
        headers.put(FlumeConstants.orgId, cache.getOrgId());
        headers.put(FlumeConstants.orgId100, Integer.toString(Integer.parseInt(cache.getOrgId()) % 100));
        headers.put(FlumeConstants.productline, cache.getProductline());
        headers.put(FlumeConstants.createTime, createTime);
        headers.put(FlumeConstants.t, t);

        // Rewrite the body with the formatted timestamps.
        cache.setCreateTime(createTime);
        cache.setT(t);
        event.setBody(JSON.toJSONString(cache).getBytes(Charsets.UTF_8));
        return event;
    }
    // Alternative helper that performs the same timestamp formatting on a generic map (not used by intercept()).
    private Map<String, Object> parseData(String str) {
        Map<String, Object> map = JSON.parseObject(str, HashMap.class);
        String createTime = String.valueOf(map.get("createTime"));
        String t = String.valueOf(map.get("t"));
        map.put("createTime", formatCache.get().format(new Date(Long.parseLong(createTime))));
        map.put("t", formatCache.get().format(new Date(Long.parseLong(t))));
        return map;
    }
    public static class Builder implements Interceptor.Builder {
        private boolean preserveExisting = FlumeConstants.PRESERVE_DFLT;

        @Override
        public Interceptor build() {
            return new kafkaInterceptor(preserveExisting);
        }

        @Override
        public void configure(Context context) {
            // Load the preserveExisting flag from the interceptor's configuration.
            preserveExisting = context.getBoolean(FlumeConstants.PRESERVE, FlumeConstants.PRESERVE_DFLT);
        }
    }
    @Override
    public List<Event> intercept(List<Event> events) {
        if (events == null) {
            return null;
        }
        List<Event> outList = new ArrayList<>();
        for (Event event : events) {
            Event outEvent = intercept(event);
            if (outEvent != null) {
                outList.add(outEvent);
            }
        }
        return outList;
    }

    @Override
    public void close() {
    }
}
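
The interceptor references two helper classes that are not shown in this post: FlumeConstants, which holds the header-key names plus the preserveExisting configuration key and default, and CacheData, the POJO that fastjson binds the JSON body to. Their real definitions are not part of the listing; the following is a minimal sketch, assuming only the constants, getters and setters the interceptor actually uses, so the exact names and values should be treated as placeholders.

// Minimal sketch of the constants class, inferred from the interceptor above (names/values are assumptions).
public class FlumeConstants {
    public static final String TIMESTAMP = "timestamp";
    public static final String orgId = "orgId";
    public static final String orgId100 = "orgId100";
    public static final String productline = "productline";
    public static final String createTime = "createTime";
    public static final String t = "t";
    public static final String PRESERVE = "preserveExisting";
    public static final boolean PRESERVE_DFLT = false;
}

// Minimal sketch of the JSON body POJO; only the fields the interceptor touches are shown.
public class CacheData {
    private String orgId;
    private String productline;
    private String createTime; // epoch milliseconds as a string before interception
    private String t;          // epoch milliseconds as a string before interception

    public String getOrgId() { return orgId; }
    public void setOrgId(String orgId) { this.orgId = orgId; }
    public String getProductline() { return productline; }
    public void setProductline(String productline) { this.productline = productline; }
    public String getCreateTime() { return createTime; }
    public void setCreateTime(String createTime) { this.createTime = createTime; }
    public String getT() { return t; }
    public void setT(String t) { this.t = t; }
}

With these sketches, createTime and t arrive as epoch-millisecond strings and leave as "yyyy-MM-dd HH:mm:ss:SSS" strings in the JVM's default time zone, so downstream consumers see the formatted values both in the headers and in the rewritten body.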
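
The Builder inner class is what gets referenced from the Flume agent configuration. A minimal sketch of such a configuration is shown below; the agent, source and sink names (a1, r1, k1), the preserveExisting property key, and the HDFS path are assumptions for illustration, with the path pattern echoing the /IOT/<orgId%100>/<orgId>/ layout hinted at in the original code.

# hypothetical agent wiring; names and paths are placeholders
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.springboot.MongoDB.flume.kafkaInterceptor$Builder
a1.sources.r1.interceptors.i1.preserveExisting = false

# the headers set by the interceptor can then be used as %{header} escapes in a sink path
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://hdp1.hdp:8020/IOT/%{orgId100}/%{orgId}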