Flume single-node configuration with a custom interceptor
3. Extract Flume 1.7.0 and rename the directory
# cd /opt
# tar -xzvf apache-flume-1.7.0-bin.tar.gz
# mv apache-flume-1.7.0-bin flume1.7.0
# chmod 777 -R /opt/flume1.7.0 # grant permissions on the directory
4. Configure environment variables
# vim /etc/profile
export FLUME_HOME=/opt/flume1.7.0
export FLUME_CONF_DIR=$FLUME_HOME/conf
export PATH=$PATH:$FLUME_HOME/bin
# source /etc/profile
5. Test it
5.1 Create the flume-conf.properties configuration file
# cd /opt/flume1.7.0/conf
# vim flume-conf.properties
# a.conf: A single-node Flume configuration
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /opt/log
a1.sources.r1.fileHeader = true
a1.sources.r1.deserializer.outputCharset=UTF-8
# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://hadoop0:9000/log
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat=Text
a1.sinks.k1.hdfs.maxOpenFiles = 1
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.rollSize = 1000000
a1.sinks.k1.hdfs.batchSize = 100000
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000000
a1.channels.c1.transactionCapacity = 100000
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
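The channel and sink sizes above are coupled: the HDFS sink takes up to hdfs.batchSize events in one channel transaction, so that value should not exceed the memory channel's transactionCapacity, which in turn must not exceed capacity. The following is a minimal sketch, not part of Flume, that checks these two relations for agent a1; the class name ChannelSizeCheck and the hard-coded component names are assumptions for illustration, and the fallback values are Flume's documented defaults (100 for all three).

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper (not part of Flume): sanity-checks the memory channel
// and HDFS sink sizes of agent "a1" from the single-node configuration.
class ChannelSizeCheck {

  /** Returns the problems found; an empty list means the sizes are consistent. */
  static List<String> check(Properties p) {
    List<String> problems = new ArrayList<>();
    // Component names a1/c1/k1 are hard-coded for this example; defaults are 100.
    long capacity = Long.parseLong(p.getProperty("a1.channels.c1.capacity", "100").trim());
    long txCapacity = Long.parseLong(
        p.getProperty("a1.channels.c1.transactionCapacity", "100").trim());
    long batchSize = Long.parseLong(p.getProperty("a1.sinks.k1.hdfs.batchSize", "100").trim());
    if (txCapacity > capacity) {
      problems.add("transactionCapacity (" + txCapacity + ") > capacity (" + capacity + ")");
    }
    if (batchSize > txCapacity) {
      problems.add("hdfs.batchSize (" + batchSize + ") > transactionCapacity (" + txCapacity + ")");
    }
    return problems;
  }

  public static void main(String[] args) throws IOException {
    String conf = String.join("\n",
        "a1.channels.c1.capacity = 1000000",
        "a1.channels.c1.transactionCapacity = 100000",
        "a1.sinks.k1.hdfs.batchSize = 100000");
    Properties p = new Properties();
    p.load(new StringReader(conf)); // same key = value syntax as flume-conf.properties
    List<String> problems = check(p);
    System.out.println(problems.isEmpty() ? "OK" : problems.toString());
  }
}
```

With the values used in this article the check reports no problems, since batchSize equals transactionCapacity rather than exceeding it.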
5.2 Create the spool directory and grant permissions
# mkdir /opt/log
# chmod 777 -R /opt/log
Note: the /log directory on HDFS does not need to be created by hand; it is created automatically.
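To test the agent, a file has to land in /opt/log. The spooling directory source expects each file to be complete and unchanged once it appears there, and a file name must never be reused; otherwise Flume logs an error and stops processing. One common way to satisfy this is to write the file elsewhere on the same filesystem and then move it in atomically. The sketch below does that with java.nio.file; the class name SpoolDrop, the sibling ".staging" directory and the sample file name are assumptions for illustration.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.Arrays;
import java.util.List;

// Hypothetical test helper (not part of Flume): writes a complete file in a
// staging directory, then atomically moves it into the spool directory so the
// spooldir source never sees a partially written file.
class SpoolDrop {

  static Path drop(Path spoolDir, String fileName, List<String> lines) throws IOException {
    // Assumption: a sibling "<spoolDir>.staging" directory on the same filesystem,
    // so that the move below can be atomic.
    Path staging = spoolDir.resolveSibling(spoolDir.getFileName() + ".staging");
    Files.createDirectories(staging);
    Path tmp = staging.resolve(fileName);
    Files.write(tmp, lines, StandardCharsets.UTF_8);
    // ATOMIC_MOVE fails with an exception instead of silently falling back to a copy.
    return Files.move(tmp, spoolDir.resolve(fileName), StandardCopyOption.ATOMIC_MOVE);
  }

  public static void main(String[] args) throws IOException {
    Path spoolDir = Paths.get(args.length > 0 ? args[0] : "/opt/log");
    // A timestamp keeps the name unique, since spooldir must never see a reused name.
    String name = "test-" + System.currentTimeMillis() + ".log";
    Path dropped = drop(spoolDir, name, Arrays.asList("hello flume", "second line"));
    System.out.println("Dropped " + dropped);
  }
}
```

After Flume has consumed a file it renames it with the .COMPLETED suffix by default, so checking for the original name is not enough to detect reuse; generating unique names is the simpler safeguard.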
Returning to the requirement from Part 8 of this series, we will now implement it a different way, using an interceptor.
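First, recall that the spooldir source can write the file name into the event header under the key basename (when basenameHeader=true). Now imagine an interceptor that intercepts this event, takes the value of that header key, splits it into three parts, and puts each part into the header as well; that would meet the requirement.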
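The idea can be sketched in plain Java, outside Flume: take the value of the basename header, match it with a regex that has three capture groups, and store each group under its own header key. The regex and the keys one, two and three are the ones used in the final configuration of this article; the class name BasenameSplitter is hypothetical and this is only an illustration of the logic, not Flume code.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical stand-alone illustration (not Flume code) of splitting the
// spooldir "basename" header into three new headers.
class BasenameSplitter {

  // Three parts separated by '.', as in the interceptor configuration.
  private static final Pattern PARTS = Pattern.compile("(.*)\\.(.*)\\.(.*)");
  private static final String[] KEYS = {"one", "two", "three"};

  static Map<String, String> split(Map<String, String> headers) {
    Map<String, String> result = new HashMap<>(headers);
    String basename = headers.get("basename");
    if (basename == null) {
      return result; // no basename header: nothing to split
    }
    Matcher m = PARTS.matcher(basename);
    if (m.find()) {
      for (int i = 0; i < KEYS.length; i++) {
        result.put(KEYS[i], m.group(i + 1)); // group 1 -> one, 2 -> two, 3 -> three
      }
    }
    return result;
  }

  public static void main(String[] args) {
    Map<String, String> headers = new HashMap<>();
    headers.put("basename", "a.log.2014-07-31");
    Map<String, String> out = split(headers);
    // prints: a log 2014-07-31
    System.out.println(out.get("one") + " " + out.get("two") + " " + out.get("three"));
  }
}
```

Because the first (.*) is greedy but must leave two dots for the rest of the pattern, a name with exactly two dots splits at those dots; a name with more dots would push the extra parts into the first group.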
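Unfortunately, Flume does not provide an interceptor that extracts from headers. It does, however, provide one that extracts content from the body, RegexExtractorInterceptor, which looks quite powerful. Here is an example from the official documentation: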
If the Flume event body contained 1:2:3.4foobar5 and the following configuration was used
a1.sources.r1.interceptors.i1.regex = (\\d):(\\d):(\\d)
a1.sources.r1.interceptors.i1.serializers = s1 s2 s3
a1.sources.r1.interceptors.i1.serializers.s1.name = one
a1.sources.r1.interceptors.i1.serializers.s2.name = two
a1.sources.r1.interceptors.i1.serializers.s3.name = three
The extracted event will contain the same body but the following headers will have been added one=>1, two=>2, three=>3
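In other words, with this configuration, if the event body contains something like 1:2:3.4foobar5, the regex extracts the matching parts and sets them in the header.
So I set my sights on this interceptor, figuring that a small change, extracting from a specific header key instead of the body, would be enough. The source turned out to be neatly written and easy to modify. Here is the interceptor I added, RegexExtractorExtInterceptor: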
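The group-to-header mapping can be reproduced with plain java.util.regex: the first match in the body supplies group 1 to the first serializer name, group 2 to the second, and so on, and groups without a serializer are skipped. The sketch below mirrors that behavior for the default pass-through serializer only; BodyExtractorDemo and its extract method are hypothetical names, not Flume API. Note that the doubled backslash in the properties file is unescaped when the file is loaded, so the regex Flume actually compiles is (\d):(\d):(\d), the same as the Java string literal below.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical illustration of how groups map to header names in
// RegexExtractorInterceptor (pass-through serializer only; not the Flume class).
class BodyExtractorDemo {

  static Map<String, String> extract(byte[] body, Pattern regex, List<String> names) {
    Map<String, String> headers = new LinkedHashMap<>();
    Matcher m = regex.matcher(new String(body, StandardCharsets.UTF_8));
    if (m.find()) { // only the first match is used
      // Stop at whichever runs out first: capture groups or serializer names.
      int n = Math.min(m.groupCount(), names.size());
      for (int g = 1; g <= n; g++) {
        headers.put(names.get(g - 1), m.group(g));
      }
    }
    return headers;
  }

  public static void main(String[] args) {
    byte[] body = "1:2:3.4foobar5".getBytes(StandardCharsets.UTF_8);
    Pattern regex = Pattern.compile("(\\d):(\\d):(\\d)");
    // prints: {one=1, two=2, three=3}
    System.out.println(extract(body, regex, Arrays.asList("one", "two", "three")));
    // prints: {one=1, two=2}  (third group has no serializer and is skipped)
    System.out.println(extract(body, regex, Arrays.asList("one", "two")));
  }
}
```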
package com.besttone.flume;

import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.commons.lang.StringUtils;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer;
import org.apache.flume.interceptor.RegexExtractorInterceptorSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;

/**
 * Interceptor that extracts matches using a specified regular expression and
 * appends the matches to the event headers using the specified serializers.
 * Unlike the built-in RegexExtractorInterceptor, it can optionally apply the
 * regex to the value of one event header instead of the event body.
 * <p>
 * Note that all regular expression matching occurs through Java's built-in
 * java.util.regex package.
 * <p>
 * Properties:
 * <ul>
 * <li>regex: The regex to use</li>
 * <li>serializers: Specifies the group the serializer will be applied to, and
 * the name of the header that will be added. If no serializer is specified for
 * a group the default {@link RegexExtractorInterceptorPassThroughSerializer}
 * will be used</li>
 * <li>extractorHeader: if true, match against the header named by
 * extractorHeaderKey instead of the body (default false)</li>
 * <li>extractorHeaderKey: the header key whose value is matched; required when
 * extractorHeader is true</li>
 * </ul>
 * Sample config:
 * <pre>
 * agent.sources.r1.channels = c1
 * agent.sources.r1.type = SEQ
 * agent.sources.r1.interceptors = i1
 * agent.sources.r1.interceptors.i1.type = com.besttone.flume.RegexExtractorExtInterceptor$Builder
 * agent.sources.r1.interceptors.i1.regex = (WARNING)|(ERROR)|(FATAL)
 * agent.sources.r1.interceptors.i1.serializers = s1 s2
 * agent.sources.r1.interceptors.i1.serializers.s1.type = com.blah.SomeSerializer
 * agent.sources.r1.interceptors.i1.serializers.s1.name = warning
 * agent.sources.r1.interceptors.i1.serializers.s2.type = org.apache.flume.interceptor.RegexExtractorInterceptorTimestampSerializer
 * agent.sources.r1.interceptors.i1.serializers.s2.name = error
 * agent.sources.r1.interceptors.i1.serializers.s2.dateFormat = yyyy-MM-dd
 * </pre>
 * <pre>
 * Example 1:
 * EventBody: 1:2:3.4foobar5
 * Configuration:
 * agent.sources.r1.interceptors.i1.regex = (\\d):(\\d):(\\d)
 * agent.sources.r1.interceptors.i1.serializers = s1 s2 s3
 * agent.sources.r1.interceptors.i1.serializers.s1.name = one
 * agent.sources.r1.interceptors.i1.serializers.s2.name = two
 * agent.sources.r1.interceptors.i1.serializers.s3.name = three
 * results in an event with the following
 * body: 1:2:3.4foobar5 headers: one=>1, two=>2, three=>3
 *
 * Example 2:
 * EventBody: 1:2:3.4foobar5
 * Configuration:
 * agent.sources.r1.interceptors.i1.regex = (\\d):(\\d):(\\d)
 * agent.sources.r1.interceptors.i1.serializers = s1 s2
 * agent.sources.r1.interceptors.i1.serializers.s1.name = one
 * agent.sources.r1.interceptors.i1.serializers.s2.name = two
 * results in an event with the following
 * body: 1:2:3.4foobar5 headers: one=>1, two=>2
 * </pre>
 */
public class RegexExtractorExtInterceptor implements Interceptor {

  static final String REGEX = "regex";
  static final String SERIALIZERS = "serializers";

  // Added code begins
  static final String EXTRACTOR_HEADER = "extractorHeader";
  static final boolean DEFAULT_EXTRACTOR_HEADER = false;
  static final String EXTRACTOR_HEADER_KEY = "extractorHeaderKey";
  // Added code ends

  private static final Logger logger = LoggerFactory
      .getLogger(RegexExtractorExtInterceptor.class);

  private final Pattern regex;
  private final List<NameAndSerializer> serializers;

  // Added code begins
  private final boolean extractorHeader;
  private final String extractorHeaderKey;
  // Added code ends

  private RegexExtractorExtInterceptor(Pattern regex,
      List<NameAndSerializer> serializers, boolean extractorHeader,
      String extractorHeaderKey) {
    this.regex = regex;
    this.serializers = serializers;
    this.extractorHeader = extractorHeader;
    this.extractorHeaderKey = extractorHeaderKey;
  }

  @Override
  public void initialize() {
    // NO-OP...
  }

  @Override
  public void close() {
    // NO-OP...
  }

  @Override
  public Event intercept(Event event) {
    Map<String, String> headers = event.getHeaders();
    String tmpStr;
    if (extractorHeader) {
      // Match against the value of the configured header (e.g. basename)
      tmpStr = headers.get(extractorHeaderKey);
      if (tmpStr == null) {
        // Header absent: pass the event through unchanged instead of throwing an NPE
        return event;
      }
    } else {
      // Original behavior: match against the event body
      tmpStr = new String(event.getBody(), Charsets.UTF_8);
    }
    Matcher matcher = regex.matcher(tmpStr);
    if (matcher.find()) {
      for (int group = 0, count = matcher.groupCount(); group < count; group++) {
        int groupIndex = group + 1;
        if (groupIndex > serializers.size()) {
          if (logger.isDebugEnabled()) {
            logger.debug("Skipping group {} to {} due to missing serializer",
                group, count);
          }
          break;
        }
        NameAndSerializer serializer = serializers.get(group);
        if (logger.isDebugEnabled()) {
          logger.debug("Serializing {} using {}", serializer.headerName,
              serializer.serializer);
        }
        headers.put(serializer.headerName,
            serializer.serializer.serialize(matcher.group(groupIndex)));
      }
    }
    return event;
  }

  @Override
  public List<Event> intercept(List<Event> events) {
    List<Event> intercepted = Lists.newArrayListWithCapacity(events.size());
    for (Event event : events) {
      Event interceptedEvent = intercept(event);
      if (interceptedEvent != null) {
        intercepted.add(interceptedEvent);
      }
    }
    return intercepted;
  }

  public static class Builder implements Interceptor.Builder {

    private Pattern regex;
    private List<NameAndSerializer> serializerList;

    // Added code begins
    private boolean extractorHeader;
    private String extractorHeaderKey;
    // Added code ends

    private final RegexExtractorInterceptorSerializer defaultSerializer =
        new RegexExtractorInterceptorPassThroughSerializer();

    @Override
    public void configure(Context context) {
      String regexString = context.getString(REGEX);
      Preconditions.checkArgument(!StringUtils.isEmpty(regexString),
          "Must supply a valid regex string");
      regex = Pattern.compile(regexString);
      configureSerializers(context);

      // Added code begins
      extractorHeader = context.getBoolean(EXTRACTOR_HEADER,
          DEFAULT_EXTRACTOR_HEADER);
      if (extractorHeader) {
        extractorHeaderKey = context.getString(EXTRACTOR_HEADER_KEY);
        Preconditions.checkArgument(!StringUtils.isEmpty(extractorHeaderKey),
            "Must specify the header key to extract from");
      }
      // Added code ends
    }

    private void configureSerializers(Context context) {
      String serializerListStr = context.getString(SERIALIZERS);
      Preconditions.checkArgument(!StringUtils.isEmpty(serializerListStr),
          "Must supply at least one name and serializer");

      String[] serializerNames = serializerListStr.split("\\s+");
      Context serializerContexts = new Context(
          context.getSubProperties(SERIALIZERS + "."));

      serializerList = Lists.newArrayListWithCapacity(serializerNames.length);
      for (String serializerName : serializerNames) {
        Context serializerContext = new Context(
            serializerContexts.getSubProperties(serializerName + "."));
        String type = serializerContext.getString("type", "DEFAULT");
        String name = serializerContext.getString("name");
        Preconditions.checkArgument(!StringUtils.isEmpty(name),
            "Supplied name cannot be empty.");

        if ("DEFAULT".equals(type)) {
          serializerList.add(new NameAndSerializer(name, defaultSerializer));
        } else {
          serializerList.add(new NameAndSerializer(name,
              getCustomSerializer(type, serializerContext)));
        }
      }
    }

    private RegexExtractorInterceptorSerializer getCustomSerializer(
        String clazzName, Context context) {
      try {
        RegexExtractorInterceptorSerializer serializer =
            (RegexExtractorInterceptorSerializer) Class.forName(clazzName).newInstance();
        serializer.configure(context);
        return serializer;
      } catch (Exception e) {
        logger.error("Could not instantiate event serializer.", e);
        // propagate() always throws; "throw" makes that explicit to the compiler
        throw Throwables.propagate(e);
      }
    }

    @Override
    public Interceptor build() {
      Preconditions.checkArgument(regex != null,
          "Regex pattern was misconfigured");
      Preconditions.checkArgument(serializerList.size() > 0,
          "Must supply a valid group match id list");
      return new RegexExtractorExtInterceptor(regex, serializerList,
          extractorHeader, extractorHeaderKey);
    }
  }

  static class NameAndSerializer {
    private final String headerName;
    private final RegexExtractorInterceptorSerializer serializer;

    public NameAndSerializer(String headerName,
        RegexExtractorInterceptorSerializer serializer) {
      this.headerName = headerName;
      this.serializer = serializer;
    }
  }
}
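A brief summary of the changes:
Two configuration parameters were added:
extractorHeader: whether to extract from a header instead of the body. Defaults to false, in which case the interceptor behaves exactly like the original and extracts from the event body.
extractorHeaderKey: the header key whose value is extracted. Required when extractorHeader is true.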
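Following the approach from Part 8, we packaged this class as a jar and installed it as a Flume plugin in /var/lib/flume-ng/plugins.d/RegexExtractorExtInterceptor/lib, then restarted Flume so that the interceptor is loaded onto the classpath. (For the tarball installation in section 3, Flume's default plugin directory is $FLUME_HOME/plugins.d.)
The final flume.conf is as follows: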
tier1.sources=source1
tier1.channels=channel1
tier1.sinks=sink1
tier1.sources.source1.type=spooldir
tier1.sources.source1.spoolDir=/opt/logs
tier1.sources.source1.fileHeader=true
tier1.sources.source1.basenameHeader=true
tier1.sources.source1.interceptors=i1
tier1.sources.source1.interceptors.i1.type=com.besttone.flume.RegexExtractorExtInterceptor$Builder
tier1.sources.source1.interceptors.i1.regex=(.*)\\.(.*)\\.(.*)
tier1.sources.source1.interceptors.i1.extractorHeader=true
tier1.sources.source1.interceptors.i1.extractorHeaderKey=basename
tier1.sources.source1.interceptors.i1.serializers=s1 s2 s3
tier1.sources.source1.interceptors.i1.serializers.s1.name=one
tier1.sources.source1.interceptors.i1.serializers.s2.name=two
tier1.sources.source1.interceptors.i1.serializers.s3.name=three
tier1.sources.source1.channels=channel1
tier1.sinks.sink1.type=hdfs
tier1.sinks.sink1.channel=channel1
tier1.sinks.sink1.hdfs.path=hdfs://master68:8020/flume/events/%{one}/%{three}
tier1.sinks.sink1.hdfs.round=true
tier1.sinks.sink1.hdfs.roundValue=10
tier1.sinks.sink1.hdfs.roundUnit=minute
tier1.sinks.sink1.hdfs.fileType=DataStream
tier1.sinks.sink1.hdfs.writeFormat=Text
tier1.sinks.sink1.hdfs.rollInterval=0
tier1.sinks.sink1.hdfs.rollSize=10240
tier1.sinks.sink1.hdfs.rollCount=0
tier1.sinks.sink1.hdfs.idleTimeout=60
tier1.channels.channel1.type=memory
tier1.channels.channel1.capacity=10000
tier1.channels.channel1.transactionCapacity=1000
tier1.channels.channel1.keep-alive=30
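I switched the source type back to the built-in spooldir instead of the custom source from the previous part, and added an interceptor i1 whose type is the custom interceptor com.besttone.flume.RegexExtractorExtInterceptor$Builder. The regex splits the file name on "." into three parts and stores them in the header under the keys one, two and three. For example, for a file named a.log.2014-07-31, the interceptor adds three header entries: one=a, two=log, three=2014-07-31. We then set tier1.sinks.sink1.hdfs.path=hdfs://master68:8020/flume/events/%{one}/%{three}, so the events from that file are written under /flume/events/a/2014-07-31.
This meets exactly the same requirement as in Part 8.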
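The HDFS sink replaces each %{name} in hdfs.path with the value of the event header name. The sketch below is a simplified stand-in for that substitution, not the sink's real implementation: it handles only %{header} escapes (not time escapes such as %Y), and substituting an empty string for a missing header is an assumption of this sketch. HeaderPathDemo is a hypothetical name.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical, simplified version of %{header} substitution in hdfs.path.
// Time escapes (%Y, %m, ...) are NOT handled here.
class HeaderPathDemo {

  private static final Pattern HEADER_ESCAPE = Pattern.compile("%\\{([^}]+)\\}");

  static String resolve(String path, Map<String, String> headers) {
    Matcher m = HEADER_ESCAPE.matcher(path);
    StringBuffer sb = new StringBuffer();
    while (m.find()) {
      // Assumption of this sketch: a missing header becomes an empty string.
      String value = headers.getOrDefault(m.group(1), "");
      m.appendReplacement(sb, Matcher.quoteReplacement(value));
    }
    m.appendTail(sb);
    return sb.toString();
  }

  public static void main(String[] args) {
    // Headers as the interceptor would set them for a.log.2014-07-31
    Map<String, String> headers = new HashMap<>();
    headers.put("one", "a");
    headers.put("two", "log");
    headers.put("three", "2014-07-31");
    // prints: hdfs://master68:8020/flume/events/a/2014-07-31
    System.out.println(resolve("hdfs://master68:8020/flume/events/%{one}/%{three}", headers));
  }
}
```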