Flume custom interceptor and HBase sink
In real-world Flume deployments you will often need to do simple filtering and processing of log data. On the source side, Flume ships with built-in interceptor classes that can handle the host name, IP address, and static markers, but what do you do when you want your own processing logic? And how do you clean up irregular log data before it goes into HBase?
1. Custom interceptor
In Eclipse (or MyEclipse), add the Flume jars to your project (they are in the lib directory of the unpacked Flume distribution) and write a custom class that implements the Interceptor interface, overriding public Event intercept(Event event) and public List<Event> intercept(List<Event> events). Your processing logic goes in the first method; the log data lives in the event body as a byte array, so it has to be converted to a String before it can be processed. The second method handles events in batches and simply calls the first method for each event. The last piece of code, the Builder, is what Flume uses to instantiate your class. The complete source code is as follows:
```java
import java.util.List;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import com.google.common.base.Charsets;
import com.google.common.collect.Lists;

public class AccessLogInterceptor implements Interceptor {

    @Override
    public void initialize() {
        // nothing to initialize
    }

    @Override
    public void close() {
        // no resources to release
    }

    @Override
    public Event intercept(Event event) {
        // The log line is stored in the event body as a byte array;
        // decode it to a String before processing.
        StringBuffer sb = new StringBuffer();
        String body = new String(event.getBody(), Charsets.UTF_8);
        // "|" is a regex metacharacter, so it must be escaped to split on the literal pipe.
        String[] fields = body.split("\\|");
        int i = 1;
        for (String field : fields) {
            // Prefix each field with its position index.
            sb.append(i + field);
            i++;
        }
        event.setBody(sb.toString().getBytes());
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        // Batch processing simply delegates to the single-event method.
        List<Event> intercepted = Lists.newArrayListWithCapacity(events.size());
        for (Event event : events) {
            Event interceptedEvent = intercept(event);
            if (interceptedEvent != null) {
                intercepted.add(interceptedEvent);
            }
        }
        return intercepted;
    }

    // Flume instantiates the interceptor through this Builder.
    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new AccessLogInterceptor();
        }

        @Override
        public void configure(Context context) {
        }
    }
}
```
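With this logic, an event whose body is, say, "a|b|c" comes out as "1a2b3c": each pipe-separated field is prefixed with its position index. Note that the pipe must be escaped as "\\|" in split(), because a bare "|" is a regex alternation operator and would split the string into individual characters.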
Once the class is written, package it into a jar and upload it to Flume's lib directory.
Then modify the configuration file client.properties.properties (that is my Flume configuration file name; change it to match your own).
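A minimal sketch of the lines to add, assuming an agent named a1 with a source named r1, an interceptor alias i1, and the class above left in the default package (all of these names are placeholders; adapt them to your own configuration):

```properties
# Register the custom interceptor on the source; the type is the fully
# qualified name of the Interceptor.Builder implementation.
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = AccessLogInterceptor$Builder
```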
Start your Flume agent and you are done.
2. Custom HBase sink
The logs going into HBase need to be preprocessed so that they are easier to use and view. Create a new package in the project above and write the processing class in it. The source code is as follows:
```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.conf.ComponentConfiguration;
import org.apache.flume.sink.hbase.AsyncHbaseEventSerializer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.log4j.Logger;
import org.hbase.async.AtomicIncrementRequest;
import org.hbase.async.PutRequest;

public class AsyncHbaseLogEventSerializer implements AsyncHbaseEventSerializer {

    private byte[] table;
    private byte[] colFam;
    private byte[][] columnNames;
    private Event currentEvent;
    private final List<PutRequest> puts = new ArrayList<PutRequest>();
    private final List<AtomicIncrementRequest> incs = new ArrayList<AtomicIncrementRequest>();
    private byte[] currentRowKey;
    private final byte[] eventCountCol = "eventCount".getBytes();
    private static Logger log = Logger.getLogger(AsyncHbaseLogEventSerializer.class);

    // Initialization: remember the target table and column family.
    @Override
    public void initialize(byte[] table, byte[] cf) {
        this.table = table;
        this.colFam = cf;
    }

    // Read settings from the Flume configuration file, including the column names.
    @Override
    public void configure(Context context) {
        String cols = new String(context.getString("columns"));
        String[] names = cols.split(",");
        columnNames = new byte[names.length][];
        int i = 0;
        for (String name : names) {
            log.info("column name: " + name);
            columnNames[i++] = name.getBytes();
        }
    }

    @Override
    public List<PutRequest> getActions() {
        // Split the event body and get the values for the columns.
        String eventStr = new String(currentEvent.getBody());
        String[] cols = logTokenize(eventStr);
        puts.clear();

        // Extract the request path from the request field, e.g. "GET /index.html HTTP/1.1".
        String req = cols[4];
        String reqPath = req.split(" ")[1];
        int pos = reqPath.indexOf("?");
        if (pos > 0) {
            reqPath = reqPath.substring(0, pos);
        }
        if (reqPath.length() > 1 && reqPath.trim().endsWith("/")) {
            reqPath = reqPath.substring(0, reqPath.length() - 1);
        }

        // Parse the request timestamp; fall back to the current system time on failure.
        String req_ts_str = cols[3];
        Long currTime = System.currentTimeMillis();
        String currTimeStr = null;
        if (req_ts_str != null && !req_ts_str.equals("")) {
            SimpleDateFormat df = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss", Locale.US);
            SimpleDateFormat df2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            try {
                currTimeStr = df2.format(df.parse(req_ts_str));
                currTime = df.parse(req_ts_str).getTime();
            } catch (ParseException e) {
                System.out.println("parse req time error, using system current time.");
            }
        }

        // Reverse the timestamp so the newest rows sort first in HBase.
        long revTs = Long.MAX_VALUE - currTime;
        currentRowKey = (Long.toString(revTs) + reqPath).getBytes();
        System.out.println("currentRowKey: " + new String(currentRowKey));

        // One put per parsed column.
        for (int i = 0; i < cols.length; i++) {
            PutRequest putReq = new PutRequest(table, currentRowKey, colFam,
                    columnNames[i], cols[i].getBytes());
            puts.add(putReq);
        }

        // Additional derived columns.
        PutRequest reqPathPutReq = new PutRequest(table, currentRowKey, colFam,
                "req_path".getBytes(), reqPath.getBytes());
        puts.add(reqPathPutReq);

        PutRequest reqTsPutReq = new PutRequest(table, currentRowKey, colFam,
                "req_ts".getBytes(), Bytes.toBytes(currTimeStr));
        puts.add(reqTsPutReq);

        // String channelType = ChannelUtil.getType(cols[8]);
        String channelType = "abc";
        PutRequest channelPutReq = new PutRequest(table, currentRowKey, colFam,
                "req_chan".getBytes(), Bytes.toBytes(channelType));
        puts.add(channelPutReq);

        return puts;
    }

    @Override
    public List<AtomicIncrementRequest> getIncrements() {
        // Increment a global event counter in the "totalEvents" row.
        incs.clear();
        incs.add(new AtomicIncrementRequest(table, "totalEvents".getBytes(), colFam, eventCountCol));
        return incs;
    }

    @Override
    public void setEvent(Event event) {
        this.currentEvent = event;
    }

    @Override
    public void configure(ComponentConfiguration arg0) {
    }

    @Override
    public void cleanUp() {
        table = null;
        colFam = null;
        currentEvent = null;
        columnNames = null;
        currentRowKey = null;
    }

    // Tokenize a combined-format access log line with a regular expression.
    public String[] logTokenize(String eventStr) {
        String logEntryPattern = "^([\\d.]+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] \"(.+?)\" (\\d{3}) (\\d+|-) \"([^\"]+)\" \"([^\"]+)\"";
        Pattern p = Pattern.compile(logEntryPattern);
        Matcher matcher = p.matcher(eventStr);
        if (!matcher.matches()) {
            System.err.println("Bad log entry (or problem with RE?):");
            System.err.println(eventStr);
            return null;
        }
        String[] columns = new String[matcher.groupCount()];
        for (int i = 0; i < matcher.groupCount(); i++) {
            columns[i] = matcher.group(i + 1);
        }
        return columns;
    }
}
```

Package the jar and place it in Flume's lib directory (same as above), then update the configuration file:
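A minimal sketch of such a sink configuration, assuming an agent named a1, a channel c1, and placeholder table and column-family names (adjust everything to your own setup). The serializer.columns list must contain one name per capture group produced by logTokenize():

```properties
# Async HBase sink wired to the custom serializer
a1.sinks.k1.type = asynchbase
a1.sinks.k1.table = access_log
a1.sinks.k1.columnFamily = cf
a1.sinks.k1.serializer = AsyncHbaseLogEventSerializer
# One column name per regex capture group in logTokenize()
a1.sinks.k1.serializer.columns = ip,identd,user,time,request,status,size,referer,agent
a1.sinks.k1.channel = c1
```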
Then just restart Flume.