flume ng進擊之路 (三) —— 自定義source API開發
概述
關於flume ng的簡單介紹,可以參考flume ng進擊之路 (一)—— 入門,同時flume ng也提供了各種各樣的source和sink介面供我們在生成環境中使用,但是在生產環境中,我們常常需要定製的source或者sink來滿足我們的要求。
好在flume ng提供了開放介面,我們可以根據這些介面,實現自己定製的source或者sink。下面我們來看一下如何實現自定義source框架。
實現
maven依賴
首先,要根據flume ng提供的介面來實現自定義source,需要我們依賴flume ng的配置,我們引入兩個配置flume-ng-core和flume-ng-configuration,具體的maven配置如下:
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.6.0</version>
</dependency>
<dependency>
<groupId>org.apache.flume</groupId >
<artifactId>flume-ng-configuration</artifactId>
<version>1.6.0</version>
</dependency>
source都包含哪些東西
在正式開始之前,我們先看看官網上一個source是如何使用的。我們在使用source的時候,只需要簡單的配置一個檔案,比如我們看看官網的Spooling Directory Source是如何監控一個資料夾的檔案變化並且抓取的。
簡單的配置如下:
a1.channels = ch-1
a1.sources = src-1
a1.sources.src-1.type = spooldir
a1.sources.src-1.channels = ch-1
a1.sources.src-1.spoolDir = /var/log/apache/flumeSpool
a1.sources.src-1.fileHeader = true
這是配置一個Spooling Directory Source的最簡單的配置,沒有填寫的配置會使用預設值,參考官網。
從上面可以看到,如果要實現自定義的source,我們要處理相應的自定義配置。
另外,我們知道flume是將訊息(採集以及傳輸)封裝成Event來實現通用source,channel和sink的組合,因此,要實現自定義source,必須要處理Event的來源。
自定義source
有了上面的簡單瞭解,我們可以開始我們的自定義開發工作了。從上面可以知道,我們需要處理兩個關鍵問題:配置和Event處理。當然是通過實現flume提供的介面來處理。
我們來實現一個自動間隔傳送test文字資訊的source。
程式碼組織結構如下:
具體程式碼如下:
MySourceEventReader.java程式碼如下:
package flume.plugin;
import com.google.common.collect.Lists;
import org.apache.flume.Event;
import org.apache.flume.client.avro.ReliableEventReader;
import org.apache.flume.event.EventBuilder;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.List;
public class MySourceEventReader implements ReliableEventReader {
private final Charset outputCharset = Charset.forName("UTF-8");
// 標識是否source的event資訊是否已提交到channel
private boolean committed = true; // true,已提交,false,有資訊待提交
@Override
public void close() throws IOException {
/**
* 執行關閉相關資源操作
*/
}
@Override
public void commit() throws IOException {
/**
* 被呼叫,標識是否提交成功
*/
if (!committed) {
committed = true;
}
}
@Override
public Event readEvent() {
/**
* 傳送單獨的一個event,內容為test
*/
return EventBuilder.withBody("test", outputCharset);
}
@Override
public List<Event> readEvents(int numEvents) {
/**
* 傳送多個Event列表
*/
List<Event> retList = Lists.newLinkedList();
for (int i = 0; i < numEvents; ++i) {
retList.add(EventBuilder.withBody("test", outputCharset));
}
if (retList.size() != 0) {
// only non-empty events need to commit
committed = false;
}
return retList;
}
}
MyFlumeSource.java程式碼如下:
package flume.plugin;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.instrumentation.SourceCounter;
import org.apache.flume.source.AbstractSource;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class MyFlumeSource extends AbstractSource implements Configurable, EventDrivenSource {
// process info
private SourceCounter sourceCounter;
private MySourceEventReader reader;
private ScheduledExecutorService executor;
private int intervalMillis;
;
@Override
public synchronized void configure(Context context) {
//根據Context讀取配置,Context會自動載入flume啟動時指定的配置
// 讀配置檔案間隔時間,預設值100ms
intervalMillis = context.getInteger("intervalMillis", 100);
}
@Override
public synchronized void start() {
// 初始化
if (sourceCounter == null) {
sourceCounter = new SourceCounter(getName());
}
executor = Executors.newSingleThreadScheduledExecutor();
reader = new MySourceEventReader();
// 每個2s執行一次
Runnable runner = new MyReaderRunnable(reader, sourceCounter);
executor.scheduleWithFixedDelay(runner, 0, 2, TimeUnit.MILLISECONDS);
super.start();
sourceCounter.start();
}
@Override
public synchronized void stop() {
executor.shutdown();
try {
executor.awaitTermination(10L, TimeUnit.SECONDS);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
executor.shutdownNow();
super.stop();
sourceCounter.stop();
}
private class MyReaderRunnable implements Runnable {
private MySourceEventReader reader;
private SourceCounter sourceCounter;
public MyReaderRunnable(MySourceEventReader reader, SourceCounter sourceCounter) {
this.reader = reader;
this.sourceCounter = sourceCounter;
}
@Override
public void run() {
while (!Thread.interrupted()) {
// 讀事件
List<Event> events = reader.readEvents(5);
// 提交
sourceCounter.addToEventReceivedCount(events.size());
sourceCounter.incrementAppendBatchReceivedCount();
// sleep intervalMillis
try {
Thread.sleep(intervalMillis);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
上面就是一個最簡單的source的實現了,實現每個一段時間自動傳送“test”資訊,對應的flume配置和啟動命令如下:
flume-conf.properties配置如下:
producer.sources = s
#test
producer.sources.s.type = flume.plugin.MyFlumeSource
// 注意這個配置,這個配置是上述程式碼中唯一指定的配置,我們還可以通過這種方式提供更多的配置,都會通過Context來自動讀取
producer.sources.s.intervalMillis=50
producer.sources.s.channels = c
啟動flume的命令如下:
flume-ng agent -n producer –conf conf -f conf/flume-conf.properties
至此,我們完成了flume自定義source的開發,可以根據想要的開發更多的自定義source。