1. 程式人生 > >flume自定義sink source

flume自定義sink source

@Override
public Status process() throws EventDeliveryException {
Status status = null;

Channel ch = getChannel();
Transaction txn = ch.getTransaction();  //flume的source+sink是在同一個事物中進行的,為了穩定性考慮
txn.begin();
try {
Event event = ch.take();
System.out.println(pro.get("ip")+" "+pro.getProperty("port")+" "+event);

txn.commit();
status = Status.READY;
} catch (Throwable t) {
txn.rollback();
status = status.BACKOFF;
if(t instanceof Error){
throw t;
}
}finally{
txn.close();
}
return status;
}


@Override
public void configure(Context context) {
//讀取配置資訊
String ip = context.getString("ip", "192.168.184.46");
String port = context.getString("port", "8086");

pro.setProperty("ip", ip);
pro.setProperty("port", port);

}