Flume自定義Source
阿新 • • 發佈:2018-10-23
本文模擬編寫了一個類似Flume 1.7中TAILDIR功能的自定義Source:通過手動記錄文件的讀取位置(偏移量)來實現對文件的斷點續讀,防止Flume掛掉並重啟之後出現重複消費的情況。
以下是代碼實現,僅供參考。生產環境直接使用TAILDIR讀取文件內容即可;若要讀取某個目錄下的子目錄,可使用GitHub上已實現的這個項目:https://github.com/qwurey/flume-source-taildir-recursive
package com.fwmagic.flume.source;

import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.flume.ChannelException;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.channel.ChannelProcessor;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.source.AbstractSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * Custom Flume source that tails a single file (for example an nginx
 * {@code access.log}) and remembers how far it has read.
 *
 * <p>How it works:
 * <ol>
 *   <li>{@link #configure(Context)} reads the {@code filePath}, {@code posiFile},
 *       {@code interval} and {@code charset} properties.</li>
 *   <li>{@link #start()} creates a single-thread executor and submits a
 *       {@link FileRunner} to it.</li>
 *   <li>The runner loads the stored offset from the position file (creating the
 *       file if it is missing), seeks the tailed file to that offset, then
 *       repeatedly reads one line, wraps it in an {@link Event}, hands it to the
 *       {@link ChannelProcessor} and writes the new offset back to the position
 *       file, so that a restart does not re-consume lines already delivered.</li>
 *   <li>{@link #stop()} asks the runner to finish, shuts the executor down and
 *       then calls {@code super.stop()}.</li>
 * </ol>
 *
 * <p>The overall structure follows Flume's {@code ExecSource}.
 */
public class TailFileSource extends AbstractSource implements EventDrivenSource, Configurable {

    private static final Logger logger = LoggerFactory.getLogger(TailFileSource.class);

    /** Default pause, in milliseconds, when the tailed file has no new data. */
    private static final long DEFAULT_INTERVAL_MS = 2000L;

    /** Default charset name for the tailed file and the position file. */
    private static final String DEFAULT_CHARSET = "UTF-8";

    /** Path of the file being tailed. */
    private String filePath;

    /** Path of the file in which the read offset is persisted. */
    private String posiFile;

    /** Pause, in milliseconds, when the tailed file currently has no new content. */
    private Long interval;

    /** Charset name used to decode the tailed file and to read/write the position file. */
    private String charset;

    /** Task that reads the tailed file. */
    private FileRunner fileRunner;

    /** Single-thread executor that runs {@link #fileRunner}. */
    private ExecutorService executor;

    /**
     * Reads this source's settings from the agent configuration.
     *
     * @param context Flume configuration context; {@code interval} defaults to
     *                2000 ms and {@code charset} to UTF-8 when absent
     */
    @Override
    public void configure(Context context) {
        filePath = context.getString("filePath");
        posiFile = context.getString("posiFile");
        interval = context.getLong("interval", DEFAULT_INTERVAL_MS);
        charset = context.getString("charset", DEFAULT_CHARSET);
    }

    /**
     * Starts the background thread that tails the configured file and forwards
     * each line to the channel processor.
     */
    @Override
    public synchronized void start() {
        executor = Executors.newSingleThreadExecutor();
        ChannelProcessor channelProcessor = getChannelProcessor();
        fileRunner = new FileRunner(filePath, posiFile, interval, charset, channelProcessor);
        executor.submit(fileRunner);
        super.start();
    }

    /**
     * Signals the runner to stop, waits for the executor to terminate and then
     * stops the source. Safe to call even if {@link #start()} never ran.
     */
    @Override
    public synchronized void stop() {
        if (fileRunner != null) {
            fileRunner.setFlag(Boolean.FALSE);
        }
        if (executor != null) {
            // Without shutdown() the executor never reaches the terminated state,
            // so waiting on it would block forever.
            executor.shutdown();
            try {
                while (!executor.awaitTermination(500, TimeUnit.MILLISECONDS)) {
                    logger.debug("waiting for exec executor service to stop");
                }
            } catch (InterruptedException e) {
                logger.debug("Interrupted while waiting for executor service to stop,Just exiting.");
                // Stop waiting: with the interrupt flag restored, another
                // awaitTermination call would throw again immediately.
                executor.shutdownNow();
                Thread.currentThread().interrupt();
            }
        }
        super.stop();
    }

    /**
     * Reads the tailed file line by line, emits one event per non-blank line and
     * persists the file offset after every line.
     *
     * <p>Event bodies are always encoded as UTF-8, independent of the platform
     * default charset.
     *
     * <p>NOTE: {@link RandomAccessFile#readLine()} returns whatever is available
     * at end of file even without a line terminator, so a line that is read
     * while it is still being written may be delivered in two pieces.
     */
    public static class FileRunner implements Runnable {

        private final long interval;
        private final String charset;
        private final File pFile;
        private final ChannelProcessor channelProcessor;

        /** Offset of the first byte that has not yet been delivered. */
        private long offset = 0L;

        /** Handle on the tailed file; {@code null} if it could not be opened. */
        private RandomAccessFile raf;

        /** Written by the stopping thread, read by the runner thread. */
        private volatile boolean flag = true;

        /**
         * Requests the runner to keep going or to stop.
         *
         * @param flag {@code TRUE} to keep running; {@code FALSE} or {@code null} to stop
         */
        public void setFlag(Boolean flag) {
            this.flag = Boolean.TRUE.equals(flag);
        }

        /**
         * Prepares the runner: ensures the position file exists, loads the stored
         * offset and opens the tailed file at that offset. I/O failures are logged
         * and leave the runner without an open file, in which case {@link #run()}
         * returns immediately instead of re-reading the file from the beginning.
         *
         * @param filePath         path of the file to tail
         * @param posiFile         path of the position file
         * @param interval         pause in milliseconds when no new data is available;
         *                         {@code null} means 2000
         * @param charset          charset name of the tailed file and the position file;
         *                         {@code null} means UTF-8
         * @param channelProcessor destination for the events
         * @throws NumberFormatException if the position file holds something other
         *                               than a number
         */
        public FileRunner(String filePath, String posiFile, Long interval, String charset,
                          ChannelProcessor channelProcessor) {
            this.interval = interval != null ? interval : DEFAULT_INTERVAL_MS;
            this.charset = charset != null ? charset : DEFAULT_CHARSET;
            this.channelProcessor = channelProcessor;

            // 1. Create the position file if it does not exist yet.
            pFile = new File(posiFile);
            if (!pFile.exists()) {
                try {
                    pFile.createNewFile();
                } catch (IOException e) {
                    logger.error("create position file error!", e);
                }
            }

            try {
                // 2. Load the stored offset; tolerate surrounding whitespace such as
                //    a trailing newline left by a manual edit.
                String offsetStr = FileUtils.readFileToString(pFile, this.charset);
                if (StringUtils.isNotBlank(offsetStr)) {
                    offset = Long.parseLong(offsetStr.trim());
                }
                // 3. Open the tailed file and jump to the stored offset.
                raf = new RandomAccessFile(filePath, "r");
                raf.seek(offset);
            } catch (IOException e) {
                logger.error("read position file error! tailed file: " + filePath, e);
                closeFile();
            }
        }

        /**
         * Tails the file until {@link #setFlag(Boolean)} is called with
         * {@code FALSE} or the thread is interrupted, then closes the file.
         */
        @Override
        public void run() {
            if (raf == null) {
                logger.error("tailed file is not open, nothing to read");
                return;
            }
            try {
                while (flag) {
                    boolean idle;
                    try {
                        idle = !deliverNextLine();
                    } catch (ChannelException e) {
                        // The channel refused the event (e.g. it is full): go back to
                        // the start of the line and try again after a pause.
                        logger.warn("channel rejected event, retrying from offset " + offset, e);
                        rewind();
                        idle = true;
                    } catch (IOException e) {
                        logger.error("error while tailing file, retrying from offset " + offset, e);
                        rewind();
                        idle = true;
                    }
                    if (idle && !pause()) {
                        break;
                    }
                }
            } finally {
                closeFile();
            }
        }

        /**
         * Reads one line, delivers it (unless blank) and persists the new offset.
         *
         * @return {@code false} when the end of the file has been reached
         * @throws IOException if reading the file or writing the position file fails
         */
        private boolean deliverNextLine() throws IOException {
            String line = raf.readLine();
            if (line == null) {
                return false;
            }
            if (StringUtils.isNotBlank(line)) {
                // readLine() maps every byte to one char, so recover the raw bytes
                // via ISO-8859-1 and decode them with the configured charset.
                String decoded = new String(line.getBytes(StandardCharsets.ISO_8859_1), charset);
                Event event = EventBuilder.withBody(decoded.getBytes(StandardCharsets.UTF_8));
                channelProcessor.processEvent(event);
            }
            // Blank lines are skipped but still advance the offset.
            offset = raf.getFilePointer();
            FileUtils.writeStringToFile(pFile, Long.toString(offset), charset);
            return true;
        }

        /** Moves the file pointer back to the last persisted offset; stops the runner on failure. */
        private void rewind() {
            try {
                raf.seek(offset);
            } catch (IOException e) {
                logger.error("cannot seek back to offset " + offset + ", stopping", e);
                flag = false;
            }
        }

        /**
         * Sleeps for the configured interval.
         *
         * @return {@code false} if the thread was interrupted and the runner should exit
         */
        private boolean pause() {
            try {
                Thread.sleep(interval);
                return true;
            } catch (InterruptedException e) {
                logger.debug("interrupted while waiting for new data, stopping");
                Thread.currentThread().interrupt();
                return false;
            }
        }

        /** Closes the tailed file if it is open. */
        private void closeFile() {
            if (raf != null) {
                try {
                    raf.close();
                } catch (IOException e) {
                    logger.warn("error closing tailed file", e);
                }
                raf = null;
            }
        }
    }
}
Flume自定義Source