Flume NG Source Code Analysis (7): ChannelSelector
The previous posts covered the basics of Flume NG's Source component. Next we turn to the Channel-related components, which are:
1. Channel
2. ChannelSelector
3. Interceptor / InterceptorChain
4. ChannelProcessor
5. Transaction
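This post covers ChannelSelector. A ChannelSelector chooses the downstream Channels for a Source. There are two selection strategies: replicating and multiplexing. Replicating copies every Event coming from the Source to all of the downstream Channels attached to it. Multiplexing routes each Event from the Source to different downstream Channels according to its attributes.
The figure below shows how ReplicatingChannelSelector works: an Event coming from the Source is copied to Channel1, Channel2, and Channel3, and each Channel then passes it on to its own downstream Sink. ChannelSelector thus gives the Source a flexible event-distribution mechanism. The default ChannelSelector is ReplicatingChannelSelector.
A typical configuration looks like this; Events from <Source1> are replicated to the downstream <Channel1> and <Channel2>: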
# List the sources, sinks and channels for the agent
<Agent>.sources = <Source1>
<Agent>.sinks = <Sink1> <Sink2>
<Agent>.channels = <Channel1> <Channel2>
# set list of channels for source (separated by space)
<Agent>.sources.<Source1>.channels = <Channel1> <Channel2>
# set channel for sinks
<Agent>.sinks.<Sink1>.channel = <Channel1>
<Agent>.sinks.<Sink2>.channel = <Channel2>
<Agent>.sources.<Source1>.selector.type = replicating
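Here is the ChannelSelector interface definition:
1. setChannels registers all of the Channels associated with the Source
2. getAllChannels returns all of the Channels associated with the Source
3. getRequiredChannels returns the Channels that the Event must be delivered to; if delivery to one of them fails, the Source has to be notified
4. getOptionalChannels returns the Channels where delivery is not mandatory; a delivery failure there can be ignored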
public interface ChannelSelector extends NamedComponent, Configurable {
  public void setChannels(List<Channel> channels);
  public List<Channel> getRequiredChannels(Event event);
  public List<Channel> getOptionalChannels(Event event);
  public List<Channel> getAllChannels();
}
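To make the required/optional distinction concrete, here is a minimal sketch of how a caller might treat the two lists. The SimpleChannel interface, the deliver method, and the string events are hypothetical stand-ins for illustration, not Flume classes; it only models the rule stated above, that a failure on a required Channel reaches the Source while a failure on an optional Channel can be ignored.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a channel: put() may throw on failure.
interface SimpleChannel {
    String name();
    void put(String event);
}

class DeliverySketch {
    /**
     * Delivers an event to the required channels first, then the optional ones.
     * A failure on a required channel propagates to the caller (the Source);
     * a failure on an optional channel is recorded and otherwise ignored.
     * Returns the names of the optional channels whose failures were ignored.
     */
    static List<String> deliver(String event,
                                List<SimpleChannel> required,
                                List<SimpleChannel> optional) {
        List<String> ignoredFailures = new ArrayList<>();
        for (SimpleChannel c : required) {
            c.put(event); // any exception escapes to the caller
        }
        for (SimpleChannel c : optional) {
            try {
                c.put(event);
            } catch (RuntimeException e) {
                ignoredFailures.add(c.name());
            }
        }
        return ignoredFailures;
    }

    // Builds a channel that either stores events in 'sink' or rejects them.
    static SimpleChannel channel(String name, boolean healthy, List<String> sink) {
        return new SimpleChannel() {
            public String name() { return name; }
            public void put(String event) {
                if (!healthy) {
                    throw new IllegalStateException(name + " is unavailable");
                }
                sink.add(name + ":" + event);
            }
        };
    }

    public static void main(String[] args) {
        List<String> stored = new ArrayList<>();
        List<String> ignored = deliver("e1",
                List.of(channel("c1", true, stored)),
                List.of(channel("c2", false, stored)));
        System.out.println(stored + " ignored=" + ignored);
    }
}
```

In Flume itself this dispatching is not done by the selector; the selector only answers which Channels are required and which are optional for a given Event.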
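The class hierarchy of ChannelSelector is as follows:
AbstractChannelSelector is an abstract class that implements the ChannelSelector interface. Its job is to supply the fields shared by the whole ChannelSelector hierarchy, namely the channel list and the component name.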
public abstract class AbstractChannelSelector implements ChannelSelector {
  private List<Channel> channels;
  private String name;
  @Override
  public List<Channel> getAllChannels() {
    return channels;
  }
  @Override
  public void setChannels(List<Channel> channels) {
    this.channels = channels;
  }
  @Override
  public synchronized void setName(String name) {
    this.name = name;
  }
  @Override
  public synchronized String getName() {
    return name;
  }
  // ...
}
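ReplicatingChannelSelector copies the event stream from the Source to every downstream Channel. It maintains two lists, requiredChannels and optionalChannels. Channels named in the optional configuration property go into optionalChannels; all others go into requiredChannels.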
public class ReplicatingChannelSelector extends AbstractChannelSelector {
  /**
   * Configuration to set a subset of the channels as optional.
   */
  public static final String CONFIG_OPTIONAL = "optional";
  List<Channel> requiredChannels = null;
  List<Channel> optionalChannels = new ArrayList<Channel>();
  @Override
  public List<Channel> getRequiredChannels(Event event) {
    /*
     * Seems like there are lot of components within flume that do not call
     * configure method. It is conceiveable that custom component tests too
     * do that. So in that case, revert to old behavior.
     */
    if(requiredChannels == null) {
      return getAllChannels();
    }
    return requiredChannels;
  }
  @Override
  public List<Channel> getOptionalChannels(Event event) {
    return optionalChannels;
  }
  @Override
  public void configure(Context context) {
    String optionalList = context.getString(CONFIG_OPTIONAL);
    requiredChannels = new ArrayList<Channel>(getAllChannels());
    Map<String, Channel> channelNameMap = getChannelNameMap();
    if(optionalList != null && !optionalList.isEmpty()) {
      for(String optional : optionalList.split("\\s+")) {
        Channel optionalChannel = channelNameMap.get(optional);
        requiredChannels.remove(optionalChannel);
        if (!optionalChannels.contains(optionalChannel)) {
          optionalChannels.add(optionalChannel);
        }
      }
    }
  }
}
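The required/optional split done in configure() can be sketched in isolation. The version below is a simplified illustration that uses plain channel names instead of Channel objects; the class ReplicatingSplitSketch is hypothetical, and unlike the excerpt above it skips names that do not match any attached channel, which is an assumption of this sketch. Since CONFIG_OPTIONAL is "optional", the corresponding setting would presumably look like <Agent>.sources.<Source1>.selector.optional = <Channel2>.

```java
import java.util.ArrayList;
import java.util.List;

class ReplicatingSplitSketch {
    final List<String> required;
    final List<String> optional = new ArrayList<>();

    /**
     * @param allChannels  every channel name attached to the source
     * @param optionalList whitespace-separated names marked optional, or null
     */
    ReplicatingSplitSketch(List<String> allChannels, String optionalList) {
        // Start with every channel required, then move the optional ones out.
        required = new ArrayList<>(allChannels);
        if (optionalList != null && !optionalList.trim().isEmpty()) {
            for (String name : optionalList.trim().split("\\s+")) {
                // Assumption of this sketch: unknown names are skipped.
                if (!allChannels.contains(name)) {
                    continue;
                }
                required.remove(name);
                if (!optional.contains(name)) {
                    optional.add(name);
                }
            }
        }
    }

    public static void main(String[] args) {
        ReplicatingSplitSketch s =
                new ReplicatingSplitSketch(List.of("c1", "c2", "c3"), "c3");
        System.out.println("required=" + s.required + " optional=" + s.optional);
    }
}
```

Every Event still goes to all channels; the split only changes how a delivery failure on each channel is treated.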
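MultiplexingChannelSelector chooses the downstream Channels according to attributes of the Events coming from the Source.
Take the following MultiplexingChannelSelector configuration as an example:
1. Routing is based on the <State, Value> attribute carried in the Event's header
2. If the State value is CA, the Event goes to mem-channel-1; if it is AZ, it goes to file-channel-2; if it is NY, it goes to both mem-channel-1 and file-channel-2. Any other value (including null) goes to mem-channel-1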
agent_foo.sources.avro-AppSrv-source1.selector.type = multiplexing
agent_foo.sources.avro-AppSrv-source1.selector.header = State
agent_foo.sources.avro-AppSrv-source1.selector.mapping.CA = mem-channel-1
agent_foo.sources.avro-AppSrv-source1.selector.mapping.AZ = file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.mapping.NY = mem-channel-1 file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.default = mem-channel-1
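As a rough illustration of what this configuration means at runtime, the sketch below loads the same selector settings (with the agent_foo.sources.avro-AppSrv-source1.selector. prefix stripped) into a routing table and resolves a few header values. MultiplexRoutingSketch and its methods are hypothetical names, and channels are represented by their names only; this is not how Flume parses its configuration, just a self-contained model of the routing rules described above.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

class MultiplexRoutingSketch {
    final String headerName;
    final List<String> defaultChannels;
    final Map<String, List<String>> mapping = new HashMap<>();

    // Expects keys relative to the selector prefix: header, default, mapping.<value>.
    MultiplexRoutingSketch(Properties selectorProps) {
        headerName = selectorProps.getProperty("header");
        defaultChannels = split(selectorProps.getProperty("default"));
        for (String key : selectorProps.stringPropertyNames()) {
            if (key.startsWith("mapping.")) {
                mapping.put(key.substring("mapping.".length()),
                        split(selectorProps.getProperty(key)));
            }
        }
    }

    // Splits a whitespace-separated list of channel names.
    static List<String> split(String names) {
        if (names == null || names.trim().isEmpty()) {
            return List.of();
        }
        return Arrays.asList(names.trim().split("\\s+"));
    }

    // Missing, blank, or unmapped header values fall back to the default channels.
    List<String> route(Map<String, String> headers) {
        String value = headers.get(headerName);
        if (value == null || value.trim().isEmpty()) {
            return defaultChannels;
        }
        return mapping.getOrDefault(value, defaultChannels);
    }

    static MultiplexRoutingSketch fromExample() {
        String text = String.join("\n",
                "header = State",
                "mapping.CA = mem-channel-1",
                "mapping.AZ = file-channel-2",
                "mapping.NY = mem-channel-1 file-channel-2",
                "default = mem-channel-1");
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new MultiplexRoutingSketch(props);
    }

    public static void main(String[] args) {
        MultiplexRoutingSketch s = fromExample();
        System.out.println("NY -> " + s.route(Map.of("State", "NY")));
        System.out.println("TX -> " + s.route(Map.of("State", "TX")));
        System.out.println("no header -> " + s.route(Map.of()));
    }
}
```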
MultiplexingChannelSelector maintains several fields that correspond to the configuration above:
private String headerName;
private Map<String, List<Channel>> channelMapping;
private Map<String, List<Channel>> optionalChannels;
private List<Channel> defaultChannels;
The configure() method populates the channelMapping, optionalChannels, and defaultChannels data structures from the configuration:
public void configure(Context context) {
  this.headerName = context.getString(CONFIG_MULTIPLEX_HEADER_NAME,
      DEFAULT_MULTIPLEX_HEADER);
  Map<String, Channel> channelNameMap = getChannelNameMap();
  defaultChannels = getChannelListFromNames(
      context.getString(CONFIG_DEFAULT_CHANNEL), channelNameMap);
  Map<String, String> mapConfig =
      context.getSubProperties(CONFIG_PREFIX_MAPPING);
  channelMapping = new HashMap<String, List<Channel>>();
  for (String headerValue : mapConfig.keySet()) {
    List<Channel> configuredChannels = getChannelListFromNames(
        mapConfig.get(headerValue),
        channelNameMap);
    //This should not go to default channel(s)
    //because this seems to be a bad way to configure.
    if (configuredChannels.size() == 0) {
      throw new FlumeException("No channel configured for when "
          + "header value is: " + headerValue);
    }
    if (channelMapping.put(headerValue, configuredChannels) != null) {
      throw new FlumeException("Selector channel configured twice");
    }
  }
  //If no mapping is configured, it is ok.
  //All events will go to the default channel(s).
  Map<String, String> optionalChannelsMapping =
      context.getSubProperties(CONFIG_PREFIX_OPTIONAL + ".");
  optionalChannels = new HashMap<String, List<Channel>>();
  for (String hdr : optionalChannelsMapping.keySet()) {
    List<Channel> confChannels = getChannelListFromNames(
        optionalChannelsMapping.get(hdr), channelNameMap);
    if (confChannels.isEmpty()) {
      confChannels = EMPTY_LIST;
    }
    //Remove channels from optional channels, which are already
    //configured to be required channels.
    List<Channel> reqdChannels = channelMapping.get(hdr);
    //Check if there are required channels, else defaults to default channels
    if(reqdChannels == null || reqdChannels.isEmpty()) {
      reqdChannels = defaultChannels;
    }
    for (Channel c : reqdChannels) {
      if (confChannels.contains(c)) {
        confChannels.remove(c);
      }
    }
    if (optionalChannels.put(hdr, confChannels) != null) {
      throw new FlumeException("Selector channel configured twice");
    }
  }
}
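The second half of configure() is the less obvious part: for each header value, any channel that is already required (either through the mapping or, when there is no mapping for that value, through the defaults) is removed from the optional list. A minimal sketch of that rule, using channel names instead of Channel objects, might look like this. OptionalMappingSketch, resolveOptional, the TX header value, and file-channel-3 are hypothetical and exist only for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class OptionalMappingSketch {
    /**
     * For each header value in optionalConfig, drops the channels that are
     * already required for that value. If the value has no required mapping,
     * the default channels count as the required ones.
     */
    static Map<String, List<String>> resolveOptional(
            Map<String, List<String>> requiredMapping,
            List<String> defaultChannels,
            Map<String, List<String>> optionalConfig) {
        Map<String, List<String>> result = new HashMap<>();
        for (Map.Entry<String, List<String>> e : optionalConfig.entrySet()) {
            List<String> required = requiredMapping.get(e.getKey());
            if (required == null || required.isEmpty()) {
                required = defaultChannels;
            }
            // Copy before modifying so the caller's lists stay untouched.
            List<String> optional = new ArrayList<>(e.getValue());
            optional.removeAll(required);
            result.put(e.getKey(), optional);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, List<String>> required =
                Map.of("CA", List.of("mem-channel-1"));
        Map<String, List<String>> optional = Map.of(
                "CA", List.of("mem-channel-1", "file-channel-2"),
                "TX", List.of("mem-channel-1", "file-channel-3"));
        System.out.println(
                resolveOptional(required, List.of("mem-channel-1"), optional));
    }
}
```

The effect is that a channel never appears as both required and optional for the same header value, so an Event is not written to it twice.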
With these data structures populated, the implementations of getRequiredChannels() and getOptionalChannels() are straightforward:
public List<Channel> getRequiredChannels(Event event) {
  String headerValue = event.getHeaders().get(headerName);
  if (headerValue == null || headerValue.trim().length() == 0) {
    return defaultChannels;
  }
  List<Channel> channels = channelMapping.get(headerValue);
  //This header value does not point to anything
  //Return default channel(s) here.
  if (channels == null) {
    channels = defaultChannels;
  }
  return channels;
}
@Override
public List<Channel> getOptionalChannels(Event event) {
  String hdr = event.getHeaders().get(headerName);
  List<Channel> channels = optionalChannels.get(hdr);
  if(channels == null) {
    channels = EMPTY_LIST;
  }
  return channels;
}