1. 程式人生 > >flume ng進擊之路 (三) —— 自定義source API開發

flume ng進擊之路 (三) —— 自定義source API開發

概述

關於flume ng的簡單介紹,可以參考flume ng進擊之路 (一)—— 入門,同時flume ng也提供了各種各樣的source和sink介面供我們在生成環境中使用,但是在生產環境中,我們常常需要定製的source或者sink來滿足我們的要求。

好在flume ng提供了開放介面,我們可以根據這些介面,實現自己定製的source或者sink。下面我們來看一下如何實現自定義source框架。

實現

maven依賴

首先,要根據flume ng提供的介面來實現自定義source,需要我們依賴flume ng的配置,我們引入兩個配置flume-ng-core和flume-ng-configuration,具體的maven配置如下:

        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
            <version>1.6.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flume</groupId
>
<artifactId>flume-ng-configuration</artifactId> <version>1.6.0</version> </dependency>

source都包含哪些東西

在正式開始之前,我們先看看官網上一個source是如何使用的。我們在使用source的時候,只需要簡單的配置一個檔案,比如我們看看官網的Spooling Directory Source是如何監控一個資料夾的檔案變化並且抓取的。
簡單的配置如下:

a1.channels
= ch-1 a1.sources = src-1 a1.sources.src-1.type = spooldir a1.sources.src-1.channels = ch-1 a1.sources.src-1.spoolDir = /var/log/apache/flumeSpool a1.sources.src-1.fileHeader = true

這是配置一個Spooling Directory Source的最簡單的配置,沒有填寫的配置會使用預設值,參考官網。
從上面可以看到,如果要實現自定義的source,我們要處理相應的自定義配置。
另外,我們知道flume是將訊息(採集以及傳輸)封裝成Event來實現通用source,channel和sink的組合,因此,要實現自定義source,必須要處理Event的來源。

自定義source

有了上面的簡單瞭解,我們可以開始我們的自定義開發工作了。從上面可以知道,我們需要處理兩個關鍵問題:配置和Event處理。當然是通過實現flume提供的介面來處理。

我們來實現一個自動間隔傳送test文字資訊的source
程式碼組織結構如下:
這裡寫圖片描述
具體程式碼如下:
MySourceEventReader.java程式碼如下:

package flume.plugin;

import com.google.common.collect.Lists;
import org.apache.flume.Event;
import org.apache.flume.client.avro.ReliableEventReader;
import org.apache.flume.event.EventBuilder;

import java.io.IOException;
import java.nio.charset.Charset;
import java.util.List;

public class MySourceEventReader  implements ReliableEventReader {
    private final Charset outputCharset = Charset.forName("UTF-8");
    // 標識是否source的event資訊是否已提交到channel
    private boolean committed = true; // true,已提交,false,有資訊待提交

    @Override
    public void close() throws IOException {
        /**
         * 執行關閉相關資源操作
         */
    }

    @Override
    public void commit() throws IOException {
        /**
         * 被呼叫,標識是否提交成功
         */
        if (!committed) {
            committed = true;
        }
    }

    @Override
    public Event readEvent() {
        /**
         * 傳送單獨的一個event,內容為test
         */
        return EventBuilder.withBody("test", outputCharset);
    }

    @Override
    public List<Event> readEvents(int numEvents) {
        /**
         * 傳送多個Event列表
         */
        List<Event> retList = Lists.newLinkedList();
        for (int i = 0; i < numEvents; ++i) {
            retList.add(EventBuilder.withBody("test", outputCharset));
        }
        if (retList.size() != 0) {
            // only non-empty events need to commit
            committed = false;
        }
        return retList;
    }
}

MyFlumeSource.java程式碼如下:

package flume.plugin;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.instrumentation.SourceCounter;
import org.apache.flume.source.AbstractSource;

import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class MyFlumeSource  extends AbstractSource implements Configurable, EventDrivenSource {
    // process info
    private SourceCounter sourceCounter;
    private MySourceEventReader reader;
    private ScheduledExecutorService executor;
    private int intervalMillis;
;
    @Override
    public synchronized void configure(Context context) {
        //根據Context讀取配置,Context會自動載入flume啟動時指定的配置

        // 讀配置檔案間隔時間,預設值100ms
        intervalMillis = context.getInteger("intervalMillis", 100);
    }

    @Override
    public synchronized void start() {
        // 初始化
        if (sourceCounter == null) {
            sourceCounter = new SourceCounter(getName());
        }
        executor = Executors.newSingleThreadScheduledExecutor();
        reader = new MySourceEventReader();

        // 每個2s執行一次
        Runnable runner = new MyReaderRunnable(reader, sourceCounter);
        executor.scheduleWithFixedDelay(runner, 0, 2, TimeUnit.MILLISECONDS);

        super.start();
        sourceCounter.start();
    }

    @Override
    public synchronized void stop() {
        executor.shutdown();
        try {
            executor.awaitTermination(10L, TimeUnit.SECONDS);
        } catch (InterruptedException ex) {
            ex.printStackTrace();
        }
        executor.shutdownNow();
        super.stop();
        sourceCounter.stop();
    }

    private class MyReaderRunnable implements Runnable {

        private MySourceEventReader reader;
        private SourceCounter sourceCounter;

        public MyReaderRunnable(MySourceEventReader reader, SourceCounter sourceCounter) {
            this.reader = reader;
            this.sourceCounter = sourceCounter;
        }

        @Override
        public void run() {
            while (!Thread.interrupted()) {
                // 讀事件
                List<Event> events = reader.readEvents(5);
                // 提交
                sourceCounter.addToEventReceivedCount(events.size());
                sourceCounter.incrementAppendBatchReceivedCount();
                // sleep intervalMillis
                try {
                    Thread.sleep(intervalMillis);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

上面就是一個最簡單的source的實現了,實現每個一段時間自動傳送“test”資訊,對應的flume配置和啟動命令如下:
flume-conf.properties配置如下:

producer.sources = s

#test
producer.sources.s.type = flume.plugin.MyFlumeSource
// 注意這個配置,這個配置是上述程式碼中唯一指定的配置,我們還可以通過這種方式提供更多的配置,都會通過Context來自動讀取
producer.sources.s.intervalMillis=50

producer.sources.s.channels = c

啟動flume的命令如下:
flume-ng agent -n producer –conf conf -f conf/flume-conf.properties

至此,我們完成了flume自定義source的開發,可以根據想要的開發更多的自定義source。