
Learning Flume's HDFS Sink

Preface: the HDFS sink's stock timestamp-parsing code does not perform well; performance can be improved by modifying the source. For the specifics, see: http://www.cnblogs.com/lxf20061900/p/4014281.html

Common HDFS sink configuration options:

| Option | Default | Description |
|---|---|---|
| type | – | The component type name; must be hdfs |
| hdfs.path | – | HDFS directory path (e.g. hdfs://namenode/flume/webdata/) |
| hdfs.filePrefix | FlumeData | Name prefixed to files created by Flume in the HDFS directory |
| hdfs.fileSuffix | – | Suffix to append to file (e.g. .avro; note: the period is not automatically added) |
| hdfs.inUsePrefix | – | Prefix used for temporary files that Flume actively writes into |
| hdfs.inUseSuffix | .tmp | Suffix used for temporary files that Flume actively writes into |
| hdfs.rollInterval | 30 | Number of seconds to wait before rolling the current file (0 = never roll based on time interval) |
| hdfs.rollSize | 1024 | File size to trigger a roll, in bytes (0 = never roll based on file size) |
| hdfs.rollCount | 10 | Number of events written to a file before it is rolled (0 = never roll based on number of events) |
| hdfs.idleTimeout | 0 | Timeout after which inactive files get closed (0 = disable automatic closing of idle files) |
| hdfs.batchSize | 100 | Number of events written to a file before it is flushed to HDFS |
| hdfs.fileType | SequenceFile | File format: currently SequenceFile, DataStream or CompressedStream. (1) DataStream will not compress the output file; do not set hdfs.codeC. (2) CompressedStream requires hdfs.codeC to be set to an available codec |
| hdfs.maxOpenFiles | 5000 | Allow only this number of open files; if the number is exceeded, the oldest file is closed |
| hdfs.callTimeout | 10000 | Number of milliseconds allowed for HDFS operations such as open, write, flush, close; increase this if many HDFS timeouts occur |
| hdfs.threadsPoolSize | 10 | Number of threads per HDFS sink for HDFS I/O operations (open, write, etc.) |
| hdfs.round | false | Whether the timestamp should be rounded down (if true, affects all time-based escape sequences except %t) |
| hdfs.roundValue | 1 | Rounded down to the highest multiple of this (in the unit configured by hdfs.roundUnit) less than the current time |
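Putting the common options together, a minimal sink definition might look like the sketch below. The agent, sink, and channel names (a1, k1, c1) and the path are assumptions for illustration, not values from this article:

```properties
# Hypothetical agent/sink/channel names (a1, k1, c1) for illustration only
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = hdfs://namenode/flume/webdata/%Y%m%d
a1.sinks.k1.hdfs.filePrefix = events
a1.sinks.k1.hdfs.fileType = DataStream
# %Y%m%d needs a timestamp; use the local time if events carry none
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# Roll every 10 minutes or at 128 MB, whichever comes first;
# disable event-count-based rolling
a1.sinks.k1.hdfs.rollInterval = 600
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0
```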

The HDFS sink is usually used in rolling-file mode. Its policies for rolling to a new file are:

  • time based
  • file-size based
  • HDFS block replica count based (generally to be avoided)
  • event-count based
  • file idle-time based

1. Time-based policy

Option: hdfs.rollInterval

Default: 30 seconds

Note: setting this to 0 disables the policy.

How it works: whenever org.apache.flume.sink.hdfs.BucketWriter.append needs to open a file, the open method is called; if hdfs.rollInterval is set, a timed task is scheduled there so that, unless some other policy has already closed the file, the file is closed hdfs.rollInterval seconds later.

if (rollInterval > 0) {
  Callable<Void> action = new Callable<Void>() {
    public Void call() throws Exception {
      LOG.debug("Rolling file ({}): Roll scheduled after {} sec elapsed.",
          bucketPath, rollInterval);
      try {
        // Roll the file and remove reference from sfWriters map.
        close(true);
      } catch (Throwable t) {
        LOG.error("Unexpected error", t);
      }
      return null;
    }
  };
  timedRollFuture = timedRollerPool.schedule(action, rollInterval,
      TimeUnit.SECONDS);
}
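The scheduling pattern above can be reproduced as a standalone sketch: a "roll" action handed to a ScheduledExecutorService when the file is opened. The class and method names below are hypothetical; in Flume the action would call close(true) on the BucketWriter:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Standalone sketch of BucketWriter's timed-roll mechanism (not Flume code).
public class TimedRollSketch {
    // Schedule a roll action to fire after rollIntervalSeconds, the way
    // BucketWriter schedules one on its timedRollerPool in open().
    public static ScheduledFuture<String> scheduleRoll(
            ScheduledExecutorService pool, long rollIntervalSeconds) {
        Callable<String> action = () -> {
            // In Flume this would be close(true), which also removes the
            // writer from the sfWriters map.
            return "rolled";
        };
        return pool.schedule(action, rollIntervalSeconds, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws Exception {
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
        ScheduledFuture<String> future = scheduleRoll(pool, 1);
        System.out.println(future.get()); // blocks about 1 second, then prints "rolled"
        pool.shutdown();
    }
}
```

Note that Flume cancels this future (timedRollFuture) when the file is closed by some other policy first, so only one of the policies actually rolls the file.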

2. File-size and event-count policies

Options:

file-size policy: hdfs.rollSize

event-count policy: hdfs.rollCount

Defaults:

file-size policy: 1024 bytes

event-count policy: 10

Note: setting a value to 0 disables that policy.

How it works: both policies are evaluated in org.apache.flume.sink.hdfs.BucketWriter.shouldRotate; as soon as doRotate is true, the current file is closed, i.e. writing rolls over to the next file.

private boolean shouldRotate() {
  boolean doRotate = false;

  if (writer.isUnderReplicated()) {
    this.isUnderReplicated = true;
    doRotate = true;
  } else {
    this.isUnderReplicated = false;
  }

  if ((rollCount > 0) && (rollCount <= eventCounter)) {
    LOG.debug("rolling: rollCount: {}, events: {}", rollCount, eventCounter);
    doRotate = true;
  }

  if ((rollSize > 0) && (rollSize <= processSize)) {
    LOG.debug("rolling: rollSize: {}, bytes: {}", rollSize, processSize);
    doRotate = true;
  }

  return doRotate;
}

Note: if both the time-based policy and the file-size policy are configured, the time condition is checked first; only if the time has not yet elapsed are the other conditions evaluated.
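The decision logic of shouldRotate can be distilled into a pure function, which makes the OR-combination of the conditions easy to verify. This is a simplified standalone re-implementation for illustration, not Flume's actual class; the real BucketWriter tracks eventCounter and processSize internally:

```java
// Simplified sketch of BucketWriter's roll decision (not Flume source).
public class RollPolicy {
    // Returns true when any configured roll condition is met; a value of 0
    // disables the corresponding policy, mirroring shouldRotate() above.
    public static boolean shouldRotate(long rollCount, long eventCounter,
                                       long rollSize, long processSize,
                                       boolean underReplicated) {
        if (underReplicated) {
            return true; // replica count below hdfs.minBlockReplicas forces a roll
        }
        if (rollCount > 0 && rollCount <= eventCounter) {
            return true; // event-count policy (hdfs.rollCount)
        }
        if (rollSize > 0 && rollSize <= processSize) {
            return true; // file-size policy (hdfs.rollSize)
        }
        return false;
    }

    public static void main(String[] args) {
        // With the defaults (rollCount=10, rollSize=1024 bytes):
        System.out.println(shouldRotate(10, 10, 1024, 100, false)); // true: event count reached
        System.out.println(shouldRotate(10, 5, 1024, 100, false));  // false: no condition met
        System.out.println(shouldRotate(0, 999, 0, 999999, false)); // false: both policies disabled
    }
}
```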

3. HDFS block replica count policy

Option: hdfs.minBlockReplicas

Default: the same as the HDFS replication factor

How it works: as the code above shows, the key replica check is writer.isUnderReplicated(), namely:

public boolean isUnderReplicated() {
  try {
    int numBlocks = getNumCurrentReplicas();
    if (numBlocks == -1) {
      return false;
    }
    int desiredBlocks;
    if (configuredMinReplicas != null) {
      desiredBlocks = configuredMinReplicas;
    } else {
      desiredBlocks = getFsDesiredReplication();
    }
    return numBlocks < desiredBlocks;
  } catch (IllegalAccessException e) {
    logger.error("Unexpected error while checking replication factor", e);
  } catch (InvocationTargetException e) {
    logger.error("Unexpected error while checking replication factor", e);
  } catch (IllegalArgumentException e) {
    logger.error("Unexpected error while checking replication factor", e);
  }
  return false;
}

In other words, if the replica count of the file currently being written is below hdfs.minBlockReplicas, this method returns true; in all other cases it returns false. Suppose the method returns true, and consider what happens next.

First, the shouldRotate method shown above will certainly return true. Following the call chain further, the key code is:

if (shouldRotate()) {
  boolean doRotate = true;

  if (isUnderReplicated) {
    if (maxConsecUnderReplRotations > 0 &&
        consecutiveUnderReplRotateCount >= maxConsecUnderReplRotations) {
      doRotate = false;
      if (consecutiveUnderReplRotateCount == maxConsecUnderReplRotations) {
        LOG.error("Hit max consecutive under-replication rotations ({}); " +
            "will not continue rolling files under this path due to " +
            "under-replication", maxConsecUnderReplRotations);
      }
    } else {
      LOG.warn("Block Under-replication detected. Rotating file.");
    }
    consecutiveUnderReplRotateCount++;
  } else {
    consecutiveUnderReplRotateCount = 0;
  }

  if (doRotate) {
    close();
    open();
  }
}

Here maxConsecUnderReplRotations is hard-coded to 30: once 30 consecutive rotations have been triggered by under-replication, doRotate is set to false and under-replication stops rolling files under this path. So if isUnderReplicated returns true, file rolling may not behave as expected. The way to avoid this is to set hdfs.minBlockReplicas to 1; since an HDFS replication factor is always at least 1, isUnderReplicated will then always return false. In general you want to rule this policy out so it cannot interfere with normal file rolling.
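The workaround described above is a single sink setting. The agent and sink names (a1, k1) below are assumptions for illustration:

```properties
# Force isUnderReplicated() to always return false
# (1 <= actual replica count), so only the other roll policies apply
a1.sinks.k1.hdfs.minBlockReplicas = 1
```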

4. File idle-time policy

Option: hdfs.idleTimeout

Default: 0

Note: with the default value of 0, this feature is disabled.

The policy itself is simple: if the file stays idle for hdfs.idleTimeout seconds, i.e. no data is written to it, the current file is closed and writing rolls over to the next file.

public synchronized void flush() throws IOException, InterruptedException {
  checkAndThrowInterruptedException();
  if (!isBatchComplete()) {
    doFlush();

    if (idleTimeout > 0) {
      // if the future exists and couldn't be cancelled, that would mean it has already run
      // or been cancelled
      if (idleFuture == null || idleFuture.cancel(false)) {
        Callable<Void> idleAction = new Callable<Void>() {
          public Void call() throws Exception {
            LOG.info("Closing idle bucketWriter {} at {}", bucketPath,
                     System.currentTimeMillis());
            if (isOpen) {
              close(true);
            }
            return null;
          }
        };
        idleFuture = timedRollerPool.schedule(idleAction, idleTimeout,
            TimeUnit.SECONDS);
      }
    }
  }
}
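Enabling the idle-based close is again a single setting; as a sketch (agent/sink names a1 and k1 assumed, and 60 seconds is an arbitrary example value):

```properties
# Close a file after 60 seconds with no writes; each flush reschedules
# the idle timer, as the flush() code above shows
a1.sinks.k1.hdfs.idleTimeout = 60
```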

The material above on rolling file generation references: https://www.jianshu.com/p/4f43780c82e9