Flume NG Source Code Analysis (7): ChannelSelector

The previous posts covered the basics of the Flume NG Source components. Next we turn to the Channel-related components, which are:

1. Channel

2. ChannelSelector

3. Interceptor / InterceptorChain

4. ChannelProcessor

5. Transaction

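This post covers ChannelSelector. A ChannelSelector chooses the downstream Channels for a Source, using one of two strategies: replicating or multiplexing. Replicating copies every Event passed in by the Source to all of that Source's downstream Channels. Multiplexing routes each Event to different downstream Channels according to its attributes (in practice, the value of a configured header).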

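The figure below shows how ReplicatingChannelSelector works: an Event passed in by the Source is copied to Channel1, Channel2, and Channel3, and each Channel then delivers it to its own downstream Sink. ChannelSelector thus gives the Source a flexible event-distribution mechanism. The default ChannelSelector is ReplicatingChannelSelector.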



A typical configuration is shown below: Events from <Source1> are replicated to the downstream <Channel1> and <Channel2>.

# List the sources, sinks and channels for the agent
<Agent>.sources = <Source1>
<Agent>.sinks = <Sink1> <Sink2>
<Agent>.channels = <Channel1> <Channel2>

# set list of channels for source (separated by space)
<Agent>.sources.<Source1>.channels = <Channel1> <Channel2>

# set channel for sinks
<Agent>.sinks.<Sink1>.channel = <Channel1>
<Agent>.sinks.<Sink2>.channel = <Channel2>

<Agent>.sources.<Source1>.selector.type = replicating
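As a rough illustration of how the keys above fit together, the following Java sketch reads a configuration in this layout with java.util.Properties and reports the channels bound to a source and its selector type, falling back to replicating when selector.type is absent. The class SelectorConfigSketch and its methods are hypothetical; Flume parses its configuration with its own classes, and the names agent1, src1, ch1 and ch2 are placeholders.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

// Hypothetical helper (not part of Flume): reads Flume-style properties text and
// reports how one source's selector is configured. It only mirrors the key
// layout of the example configuration, not Flume's real config parser.
final class SelectorConfigSketch {

  static Properties load(String text) {
    Properties props = new Properties();
    try {
      props.load(new StringReader(text));
    } catch (IOException e) {
      // StringReader does no real I/O, so this is not expected to happen.
      throw new UncheckedIOException(e);
    }
    return props;
  }

  // selector.type falls back to "replicating", matching Flume's default selector.
  static String selectorType(Properties props, String agent, String source) {
    return props.getProperty(agent + ".sources." + source + ".selector.type", "replicating").trim();
  }

  // Splits the space-separated channel list bound to a source.
  static List<String> sourceChannels(Properties props, String agent, String source) {
    String value = props.getProperty(agent + ".sources." + source + ".channels", "").trim();
    List<String> result = new ArrayList<>();
    if (!value.isEmpty()) {
      result.addAll(Arrays.asList(value.split("\\s+")));
    }
    return result;
  }

  public static void main(String[] args) {
    Properties props = load(String.join("\n",
        "agent1.sources = src1",
        "agent1.channels = ch1 ch2",
        "agent1.sources.src1.channels = ch1 ch2"));
    // No selector.type line here, so the default applies.
    System.out.println(selectorType(props, "agent1", "src1"));   // replicating
    System.out.println(sourceChannels(props, "agent1", "src1")); // [ch1, ch2]
  }
}
```

Omitting the selector.type line, as in main above, therefore behaves the same as writing selector.type = replicating.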

Now let's look at the ChannelSelector interface definition:

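1. setChannels sets all the Channels associated with the Source.

2. getAllChannels returns all the Channels associated with the Source.

3. getRequiredChannels returns the Channels the Event must be delivered to; if delivery to one of them fails, the Source must be notified.

4. getOptionalChannels returns the Channels for which delivery is optional; if delivery to one of them fails, the failure can be ignored.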

package org.apache.flume;

import java.util.List;

import org.apache.flume.conf.Configurable;

public interface ChannelSelector extends NamedComponent, Configurable {

  public void setChannels(List<Channel> channels);

  public List<Channel> getRequiredChannels(Event event);

  public List<Channel> getOptionalChannels(Event event);

  public List<Channel> getAllChannels();

}
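To make the required/optional contract concrete, here is a minimal sketch of a caller that honors it, loosely modeled on what Flume's ChannelProcessor does with a selector. It assumes hypothetical, simplified types (SimpleChannel, SimpleSelector, DispatchSketch) standing in for Flume's Channel and ChannelSelector, and it leaves out the channel transactions the real code uses.

```java
import java.util.List;

// Hypothetical, simplified stand-ins for Flume's Channel and ChannelSelector.
// Real Flume channels are transactional; transactions are omitted here.
interface SimpleChannel {
  void put(String event); // may throw a RuntimeException on failure
}

interface SimpleSelector {
  List<SimpleChannel> getRequiredChannels(String event);
  List<SimpleChannel> getOptionalChannels(String event);
}

final class DispatchSketch {

  // Writes an event following the selector contract. A failure on a required
  // channel propagates to the caller (the Source); a failure on an optional
  // channel is swallowed. Returns how many optional failures were ignored.
  static int dispatch(SimpleSelector selector, String event) {
    for (SimpleChannel ch : selector.getRequiredChannels(event)) {
      ch.put(event); // any exception here reaches the Source
    }
    int ignored = 0;
    for (SimpleChannel ch : selector.getOptionalChannels(event)) {
      try {
        ch.put(event);
      } catch (RuntimeException e) {
        ignored++; // a real implementation would log the failure
      }
    }
    return ignored;
  }

  // Builds a selector that returns fixed lists regardless of the event.
  static SimpleSelector fixed(List<SimpleChannel> required, List<SimpleChannel> optional) {
    return new SimpleSelector() {
      @Override
      public List<SimpleChannel> getRequiredChannels(String event) {
        return required;
      }

      @Override
      public List<SimpleChannel> getOptionalChannels(String event) {
        return optional;
      }
    };
  }
}
```

This split keeps the policy in one place: the selector only answers which Channels are required or optional, and the caller decides what a failure means for each group.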

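In the ChannelSelector class hierarchy, AbstractChannelSelector is an abstract class that implements the ChannelSelector interface, and both ReplicatingChannelSelector and MultiplexingChannelSelector extend it. Its role is to supply the state shared by the whole hierarchy: the list of Channels and the component name.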

public abstract class AbstractChannelSelector implements ChannelSelector {

  private List<Channel> channels;
  private String name;

  @Override
  public List<Channel> getAllChannels() {
    return channels;
  }

  @Override
  public void setChannels(List<Channel> channels) {
    this.channels = channels;
  }

  @Override
  public synchronized void setName(String name) {
    this.name = name;
  }

  @Override
  public synchronized String getName() {
    return name;
  }
  // ...
}

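ReplicatingChannelSelector copies the Source's event stream to every downstream Channel. It maintains two lists, requiredChannels and optionalChannels: Channels named in the optional configuration property go into optionalChannels, and all the others go into requiredChannels.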


public class ReplicatingChannelSelector extends AbstractChannelSelector {

  /**
   * Configuration to set a subset of the channels as optional.
   */
  public static final String CONFIG_OPTIONAL = "optional";
  List<Channel> requiredChannels = null;
  List<Channel> optionalChannels = new ArrayList<Channel>();

  @Override
  public List<Channel> getRequiredChannels(Event event) {
    /*
     * Seems like there are lot of components within flume that do not call
     * configure method. It is conceiveable that custom component tests too
     * do that. So in that case, revert to old behavior.
     */
    if(requiredChannels == null) {
      return getAllChannels();
    }
    return requiredChannels;
  }

  @Override
  public List<Channel> getOptionalChannels(Event event) {
    return optionalChannels;
  }

  @Override
  public void configure(Context context) {
    String optionalList = context.getString(CONFIG_OPTIONAL);
    requiredChannels = new ArrayList<Channel>(getAllChannels());
    Map<String, Channel> channelNameMap = getChannelNameMap();
    if(optionalList != null && !optionalList.isEmpty()) {
      for(String optional : optionalList.split("\\s+")) {
        Channel optionalChannel = channelNameMap.get(optional);
        requiredChannels.remove(optionalChannel);
        if (!optionalChannels.contains(optionalChannel)) {
          optionalChannels.add(optionalChannel);
        }
      }
    }
  }
}
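The required/optional split in configure() can be reproduced on plain channel names. In this sketch (ReplicatingSplitSketch is a hypothetical name), names stand in for Channel objects. Unlike the original, which looks each name up in a name-to-Channel map, it does not check that an optional name is a known channel, and it trims the optional list before splitting it.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of ReplicatingChannelSelector.configure()'s split,
// using channel names (strings) instead of Channel objects.
final class ReplicatingSplitSketch {
  final List<String> required;
  final List<String> optional = new ArrayList<>();

  ReplicatingSplitSketch(List<String> allChannels, String optionalList) {
    // Every channel starts out required.
    required = new ArrayList<>(allChannels);
    if (optionalList != null && !optionalList.trim().isEmpty()) {
      for (String name : optionalList.trim().split("\\s+")) {
        required.remove(name);          // no longer required
        if (!optional.contains(name)) { // skip duplicates in the list
          optional.add(name);
        }
      }
    }
  }
}
```

For example, adding <Agent>.sources.<Source1>.selector.optional = <Channel2> to the earlier replicating configuration would leave <Channel1> required and make <Channel2> optional.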

MultiplexingChannelSelector chooses the downstream Channels according to attributes of the events passed in by the Source.

A MultiplexingChannelSelector configuration looks like the following example:

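1. Routing is based on the State attribute, a <State, Value> entry in the Event's headers.

2. If State is CA, the Event goes to mem-channel-1; if it is AZ, to file-channel-2; if it is NY, to both mem-channel-1 and file-channel-2. Any other value (including a missing, i.e. null, header) goes to mem-channel-1.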

agent_foo.sources.avro-AppSrv-source1.selector.type = multiplexing
agent_foo.sources.avro-AppSrv-source1.selector.header = State
agent_foo.sources.avro-AppSrv-source1.selector.mapping.CA = mem-channel-1
agent_foo.sources.avro-AppSrv-source1.selector.mapping.AZ = file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.mapping.NY = mem-channel-1 file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.default = mem-channel-1
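The routing rules above can be modeled as a small lookup table. This sketch (StateRoutingSketch is a hypothetical name) uses channel names instead of Channel objects and hard-codes the example's mapping. Its fallback follows the getRequiredChannels() implementation discussed in this post: a missing or blank header, or a value with no mapping, gets the default channels.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the State-header example, using channel names.
final class StateRoutingSketch {
  static final String HEADER = "State";
  static final List<String> DEFAULT = Collections.singletonList("mem-channel-1");
  static final Map<String, List<String>> MAPPING = new HashMap<>();

  static {
    MAPPING.put("CA", Collections.singletonList("mem-channel-1"));
    MAPPING.put("AZ", Collections.singletonList("file-channel-2"));
    MAPPING.put("NY", Arrays.asList("mem-channel-1", "file-channel-2"));
  }

  // Returns the required channels for an event with the given headers.
  static List<String> route(Map<String, String> headers) {
    String value = headers.get(HEADER);
    if (value == null || value.trim().isEmpty()) {
      return DEFAULT; // header missing or blank
    }
    List<String> channels = MAPPING.get(value);
    return channels != null ? channels : DEFAULT; // unmapped value
  }
}
```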

MultiplexingChannelSelector keeps several fields that correspond to this configuration:

  private String headerName;

  private Map<String, List<Channel>> channelMapping;
  private Map<String, List<Channel>> optionalChannels;
  private List<Channel> defaultChannels;

configure() uses the configuration to populate the channelMapping, optionalChannels, and defaultChannels data structures:

  @Override
  public void configure(Context context) {
    this.headerName = context.getString(CONFIG_MULTIPLEX_HEADER_NAME,
        DEFAULT_MULTIPLEX_HEADER);

    Map<String, Channel> channelNameMap = getChannelNameMap();

    defaultChannels = getChannelListFromNames(
        context.getString(CONFIG_DEFAULT_CHANNEL), channelNameMap);

    Map<String, String> mapConfig =
        context.getSubProperties(CONFIG_PREFIX_MAPPING);

    channelMapping = new HashMap<String, List<Channel>>();

    for (String headerValue : mapConfig.keySet()) {
      List<Channel> configuredChannels = getChannelListFromNames(
          mapConfig.get(headerValue),
          channelNameMap);

      //This should not go to default channel(s)
      //because this seems to be a bad way to configure.
      if (configuredChannels.size() == 0) {
        throw new FlumeException("No channel configured for when "
            + "header value is: " + headerValue);
      }

      if (channelMapping.put(headerValue, configuredChannels) != null) {
        throw new FlumeException("Selector channel configured twice");
      }
    }
    //If no mapping is configured, it is ok.
    //All events will go to the default channel(s).
    Map<String, String> optionalChannelsMapping =
        context.getSubProperties(CONFIG_PREFIX_OPTIONAL + ".");

    optionalChannels = new HashMap<String, List<Channel>>();
    for (String hdr : optionalChannelsMapping.keySet()) {
      List<Channel> confChannels = getChannelListFromNames(
              optionalChannelsMapping.get(hdr), channelNameMap);
      if (confChannels.isEmpty()) {
        confChannels = EMPTY_LIST;
      }
      //Remove channels from optional channels, which are already
      //configured to be required channels.

      List<Channel> reqdChannels = channelMapping.get(hdr);
      //Check if there are required channels, else defaults to default channels
      if(reqdChannels == null || reqdChannels.isEmpty()) {
        reqdChannels = defaultChannels;
      }
      for (Channel c : reqdChannels) {
        if (confChannels.contains(c)) {
          confChannels.remove(c);
        }
      }

      if (optionalChannels.put(hdr, confChannels) != null) {
        throw new FlumeException("Selector channel configured twice");
      }
    }

  }
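The mapping part of configure() can be sketched on a flat map of properties. The sketch assumes the selector's keys arrive with everything up to and including "selector." stripped (for example mapping.CA, as in the configuration above). MappingConfigSketch and its subProperties helper are hypothetical; subProperties only imitates how Context.getSubProperties() is used here, and channel names are not validated against the Source's channels.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of how configure() builds channelMapping, using names.
final class MappingConfigSketch {

  // Stand-in for Context.getSubProperties(prefix): keeps keys that start
  // with the prefix and strips the prefix from them.
  static Map<String, String> subProperties(Map<String, String> props, String prefix) {
    Map<String, String> result = new HashMap<>();
    for (Map.Entry<String, String> e : props.entrySet()) {
      if (e.getKey().startsWith(prefix)) {
        result.put(e.getKey().substring(prefix.length()), e.getValue());
      }
    }
    return result;
  }

  // Splits a space-separated channel list into names.
  static List<String> names(String value) {
    List<String> out = new ArrayList<>();
    if (value != null && !value.trim().isEmpty()) {
      out.addAll(Arrays.asList(value.trim().split("\\s+")));
    }
    return out;
  }

  // Builds header value -> required channel names.
  static Map<String, List<String>> channelMapping(Map<String, String> selectorProps) {
    Map<String, List<String>> mapping = new HashMap<>();
    for (Map.Entry<String, String> e : subProperties(selectorProps, "mapping.").entrySet()) {
      List<String> channels = names(e.getValue());
      if (channels.isEmpty()) {
        // Same policy as the original: an empty mapping is a configuration
        // error, not a silent fallback to the default channels.
        throw new IllegalArgumentException("No channel configured for header value: " + e.getKey());
      }
      mapping.put(e.getKey(), channels);
    }
    return mapping;
  }
}
```

The original's "configured twice" check is left out: the keys come from a Map and are therefore unique, so in this flow that check can only fire defensively.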

Once these data structures are populated, getRequiredChannels() and getOptionalChannels() are straightforward:

  @Override
  public List<Channel> getRequiredChannels(Event event) {
    String headerValue = event.getHeaders().get(headerName);
    if (headerValue == null || headerValue.trim().length() == 0) {
      return defaultChannels;
    }

    List<Channel> channels = channelMapping.get(headerValue);

    //This header value does not point to anything
    //Return default channel(s) here.
    if (channels == null) {
      channels = defaultChannels;
    }

    return channels;
  }

  @Override
  public List<Channel> getOptionalChannels(Event event) {
    String hdr = event.getHeaders().get(headerName);
    List<Channel> channels = optionalChannels.get(hdr);

    if(channels == null) {
      channels = EMPTY_LIST;
    }
    return channels;
  }
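The optional-channel handling combines two steps from the code above: configure() removes from each optional list any Channel that is already required for that header value (or, for an unmapped value, any default Channel), and getOptionalChannels() falls back to an empty list. This sketch reproduces both on channel names; OptionalChannelsSketch is a hypothetical name, and its input corresponds to the optional.<value> keys read via CONFIG_PREFIX_OPTIONAL + ".".

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of MultiplexingChannelSelector's optional-channel
// resolution, using channel names instead of Channel objects.
final class OptionalChannelsSketch {
  private final Map<String, List<String>> optional = new HashMap<>();

  OptionalChannelsSketch(Map<String, List<String>> requiredByValue,
                         List<String> defaults,
                         Map<String, List<String>> configuredOptional) {
    for (Map.Entry<String, List<String>> e : configuredOptional.entrySet()) {
      List<String> opt = new ArrayList<>(e.getValue());
      List<String> req = requiredByValue.get(e.getKey());
      if (req == null || req.isEmpty()) {
        req = defaults; // unmapped value: the default channels are required
      }
      for (String c : req) {
        opt.remove(c); // a channel already required is not also optional
      }
      optional.put(e.getKey(), opt);
    }
  }

  // Mirrors getOptionalChannels(): unknown or missing values get an empty list.
  List<String> optionalFor(String headerValue) {
    List<String> channels = optional.get(headerValue);
    return channels != null ? channels : Collections.<String>emptyList();
  }
}
```

For example, with selector.optional.CA = mem-channel-1 file-channel-2 added to the earlier multiplexing configuration, CA events would get file-channel-2 as their only optional Channel, because mem-channel-1 is already required for CA.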