flume自定義Interceptor的UUID和其他邏輯處理
阿新 • • 發佈:2019-01-04
package com.meme.flume.interceptor; import com.google.common.base.Charsets; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.interceptor.Interceptor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.List; import java.util.UUID; /** * Created by root on 7/3/18. */ public class MyInterceptor implements Interceptor { private static final Logger logger = LoggerFactory .getLogger(MyInterceptor.class); public MyInterceptor(){ } @Override public void initialize() { } @Override 可以在該方法裡寫自己攔截器的邏輯 我這裡就只添加了UUID public Event intercept(Event event) { try{ String body = new String(event.getBody(), Charsets.UTF_8); StringBuffer bodyoutput = new StringBuffer(); bodyoutput.append(body+","+ UUID.randomUUID()); event.setBody(bodyoutput.toString().getBytes()); }catch (Exception e){ logger.error("interceptor failure"); } return event; } @Override public List<Event> intercept(List<Event> list) { for (Event event : list) { intercept(event); } return list; } @Override public void close() { }//Builder類中例項化interceptor public static class Builder implements Interceptor.Builder { //使用Builder初始化Interceptor @Override public Interceptor build() { return new MyInterceptor(); } @Override public void configure(Context context) { } } public static void main(String[] args) { System.out.println("ok"); } }
然後打包放到flume的lib目錄裡就可以
自定義的maven依賴
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.7.0</version>
</dependency>
攔截器的配置:
agent.sources.r1.interceptors = i1
agent.sources.r1.interceptors.i1.type = com.meme.flume.interceptor.MyInterceptor$Builder