
Flume HDFS Sink: Usage and Source Code Analysis

Introduction to the HDFS Sink

The Flume HDFS sink writes events into HDFS. It currently supports creating sequence files and text files, and both can be compressed. Files can be rolled periodically (the current file is closed and a new one is started) based on elapsed time, data size, or number of events. Data can also be partitioned by event attributes such as timestamp or originating host. The HDFS directory path may contain formatting escape sequences; the HDFS sink expands these escape sequences to generate the directory and file in which an event is stored. To use the HDFS sink, the HDFS-related jars must be added so that Flume can use the Hadoop jars to interact with the HDFS cluster. Note: the Hadoop version used must support sync().

The escape sequences supported by the HDFS sink are:

Name Description
%{host} Substitutes the value of the event header named "host" (any header name is supported)
%t Unix time in milliseconds
%a Locale's short weekday name, e.g. Mon, Tue, ...
%A Locale's full weekday name, e.g. Monday, Tuesday, ...
%b Locale's short month name, e.g. Jan, Feb, ...
%B Locale's full month name, e.g. January, February, ...
%c Date and time, e.g. Thu Mar 3 23:05:25 2005
%d Day of month, e.g. 01..31
%e Day of month without padding, e.g. 1..31
%D Date, same as %m/%d/%y
%H Hour (00..23)
%I Hour (01..12)
%j Day of year, e.g. 001..366
%k Hour, e.g. 0..23
%m Month, e.g. 01..12
%n Month without padding, e.g. 1..12
%M Minute, e.g. 00..59
%p am or pm
%s Seconds since 1970-01-01 00:00:00 UTC
%S Second, e.g. 00..59
%y Last two digits of the year, e.g. 00..99
%Y Year, e.g. 2010
%z +hhmm numeric time zone, e.g. -0400
While a file is in use it carries a ".tmp" suffix; once the file is closed, the suffix is removed.
Note: for the time-related escape sequences, a header with the key "timestamp" must exist in the event's headers (unless hdfs.useLocalTimeStamp is set to true).
Name Default Description
channel –
type – The component type name; must be hdfs
hdfs.path – HDFS directory path, e.g. hdfs://namenode/flume/webdata/
hdfs.filePrefix FlumeData Prefix of files created by Flume in the HDFS directory
hdfs.fileSuffix – Suffix to append to the file, e.g. .txt
hdfs.inUsePrefix – Prefix for files while they are being written
hdfs.inUseSuffix .tmp Suffix for files while they are being written
hdfs.rollInterval 30 Roll interval of the file currently being written; by default a new file is started every 30 seconds (0 = never roll based on time)
hdfs.rollSize 1024 File size, in bytes, that triggers a roll (0 = never roll based on size)
hdfs.rollCount 10 Number of events written to the file before it is rolled (0 = never roll based on event count)
hdfs.idleTimeout 0 Timeout after which inactive files are closed (0 = disable automatic closing of idle files). Even so, a file being written may never get closed for other reasons, such as network problems, leaving a .tmp file behind
hdfs.batchSize 100 Number of events written to the file before it is flushed to HDFS
hdfs.codeC – Compression codec; one of gzip, bzip2, lzo, lzop, snappy
hdfs.fileType SequenceFile File format: SequenceFile (default), DataStream or CompressedStream
(1) DataStream does not compress the output file; do not set hdfs.codeC with it.
(2) CompressedStream requires hdfs.codeC to be set to an available codec.
hdfs.maxOpenFiles 5000 Maximum number of files allowed open at once; if exceeded, the oldest open file is closed
hdfs.callTimeout 10000 Number of milliseconds allowed for HDFS operations such as open, write, flush, close. Increase this if many HDFS operations time out
hdfs.threadsPoolSize 10 Number of threads per HDFS sink for HDFS IO operations (open, write, etc.)
hdfs.rollTimerPoolSize 1 Number of threads per HDFS sink for scheduling timed file rolls
hdfs.kerberosPrincipal – Kerberos principal for accessing secure HDFS
hdfs.kerberosKeytab – Kerberos keytab for accessing secure HDFS
hdfs.proxyUser –
hdfs.round false Whether the timestamp should be rounded down (if true, affects all time-based escape sequences except %t)
hdfs.roundValue 1 Rounded down to the highest multiple of this (in the unit configured by hdfs.roundUnit) that is less than the current time
hdfs.roundUnit second Unit of the rounding: second, minute or hour
hdfs.timeZone Local Time Name of the time zone used when resolving the directory path, e.g. America/Los_Angeles
hdfs.useLocalTimeStamp false Use the local time when expanding escape sequences, instead of the timestamp from the event header
hdfs.closeTries 0 Number of times the sink must try to rename a file after initiating a close. If set to 1, the sink will not retry a failed rename, leaving the file open with a .tmp extension. If set to 0, the sink keeps trying until the rename succeeds. If close attempts fail, the file may stay open, but the data will be intact and the file will be closed on the next Flume restart
hdfs.retryInterval 180 Time in seconds between consecutive attempts to close a file. Each close call costs multiple RPC round trips to the Namenode, so setting this too low can overload the Namenode. If set to 0 or lower, the sink will not attempt to close the file again after the first failure and may leave it open with a ".tmp" extension
serializer TEXT Other options include avro_event or the fully-qualified class name of an implementation of the EventSerializer.Builder interface
serializer.* –
About rounding (hdfs.round):
a1.sinks.k1.hdfs.round=true
a1.sinks.k1.hdfs.roundValue=10
a1.sinks.k1.hdfs.roundUnit=minute
With the above configuration, timestamps are rounded down to the nearest multiple of 10 minutes. For example, an event with a timestamp of 11:54:34 AM, June 12, 2012 would make the HDFS path resolve to /flume/events/2012-06-12/1150/00.
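
To tie the parameters above together, here is a minimal sketch of a complete HDFS sink configuration (the agent name a1, sink name k1, channel name c1, path, and roll values are illustrative placeholders). It writes uncompressed text into date/time-partitioned directories and rolls a file after 5 minutes or 128 MB, whichever comes first:

a1.channels=c1
a1.sinks=k1
a1.sinks.k1.type=hdfs
a1.sinks.k1.channel=c1
a1.sinks.k1.hdfs.path=/flume/events/%Y-%m-%d/%H%M
a1.sinks.k1.hdfs.filePrefix=events
a1.sinks.k1.hdfs.fileType=DataStream
a1.sinks.k1.hdfs.rollInterval=300
a1.sinks.k1.hdfs.rollSize=134217728
a1.sinks.k1.hdfs.rollCount=0
a1.sinks.k1.hdfs.batchSize=100
a1.sinks.k1.hdfs.useLocalTimeStamp=true

Because fileType is DataStream, hdfs.codeC is deliberately left unset, matching the rule above that DataStream must not be combined with a codec; useLocalTimeStamp=true means no "timestamp" header is required on the events.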

Source Code Analysis

configure(Context context): loads and validates the sink's configuration.

public void configure(Context context) {
    this.context = context;
    // HDFS directory path, e.g. hdfs://namenode/flume/webdata/; a path like /flume/webdata/ also works if the Hadoop configuration files are on the classpath
    filePath = Preconditions.checkNotNull(
        context.getString("hdfs.path"), "hdfs.path is required");
    // Prefix of files created by Flume in the HDFS directory
    fileName = context.getString("hdfs.filePrefix", defaultFileName);
    // File suffix
    this.suffix = context.getString("hdfs.fileSuffix", defaultSuffix);
    // Prefix for files while they are being written
    inUsePrefix = context.getString("hdfs.inUsePrefix", defaultInUsePrefix);
    // Suffix for files while they are being written
    inUseSuffix = context.getString("hdfs.inUseSuffix", defaultInUseSuffix);
    // Name of the time zone used when resolving the directory path, e.g. America/Los_Angeles
    String tzName = context.getString("hdfs.timeZone");
    timeZone = tzName == null ? null : TimeZone.getTimeZone(tzName);
    rollInterval = context.getLong("hdfs.rollInterval", defaultRollInterval); // roll interval of the file being written; by default a new file is started every 30 seconds (0 = never roll based on time)
    rollSize = context.getLong("hdfs.rollSize", defaultRollSize); // roll the file once it reaches this size in bytes (0 = never roll based on size)
    rollCount = context.getLong("hdfs.rollCount", defaultRollCount);
    // Number of events written to the file before it is flushed to HDFS
    batchSize = context.getLong("hdfs.batchSize", defaultBatchSize);
    // Timeout after which inactive files are closed (0 = disable automatic closing of idle files). A file being written may still never get closed for other reasons, e.g. network problems, leaving a .tmp file behind
    idleTimeout = context.getInteger("hdfs.idleTimeout", 0);
    // Compression codec; one of gzip, bzip2, lzo, lzop, snappy
    String codecName = context.getString("hdfs.codeC");
    // File format: SequenceFile (default), DataStream or CompressedStream
    // (1) DataStream does not compress the output file; do not set hdfs.codeC with it.
    // (2) CompressedStream requires hdfs.codeC to be set to an available codec.
    fileType = context.getString("hdfs.fileType", defaultFileType);
    // Maximum number of files allowed open at once; if exceeded, the oldest open file is closed
    maxOpenFiles = context.getInteger("hdfs.maxOpenFiles", defaultMaxOpenFiles);
    // Milliseconds allowed for HDFS operations such as open, write, flush, close. Increase this if many HDFS operations time out
    callTimeout = context.getLong("hdfs.callTimeout", defaultCallTimeout);
    // Number of threads per HDFS sink for HDFS IO operations (open, write, etc.)
    threadsPoolSize = context.getInteger("hdfs.threadsPoolSize", defaultThreadPoolSize);
    // Number of threads per HDFS sink for scheduling timed file rolls
    rollTimerPoolSize = context.getInteger("hdfs.rollTimerPoolSize", defaultRollTimerPoolSize);
    // Kerberos principal for accessing secure HDFS
    String kerbConfPrincipal = context.getString("hdfs.kerberosPrincipal");
    // Kerberos keytab for accessing secure HDFS
    String kerbKeytab = context.getString("hdfs.kerberosKeytab");
    String proxyUser = context.getString("hdfs.proxyUser");
    tryCount = context.getInteger("hdfs.closeTries", defaultTryCount);
    if(tryCount <= 0) {
      LOG.warn("Retry count value : " + tryCount + " is not " +
        "valid. The sink will try to close the file until the file " +
        "is eventually closed.");
      tryCount = defaultTryCount;
    }
    retryInterval = context.getLong("hdfs.retryInterval",
      defaultRetryInterval);
    if(retryInterval <= 0) {
      LOG.warn("Retry Interval value: " + retryInterval + " is not " +
        "valid. If the first close of a file fails, " +
        "it may remain open and will not be renamed.");
      tryCount = 1;
    }

    Preconditions.checkArgument(batchSize > 0,
        "batchSize must be greater than 0");
    if (codecName == null) {
      codeC = null;
      compType = CompressionType.NONE;
    } else {
      codeC = getCodec(codecName);
      // TODO : set proper compression type
      compType = CompressionType.BLOCK;
    }

    // Do not allow user to set fileType DataStream with codeC together
    // To prevent output file with compress extension (like .snappy)
    if(fileType.equalsIgnoreCase(HDFSWriterFactory.DataStreamType)
        && codecName != null) {
      throw new IllegalArgumentException("fileType: " + fileType +
          " which does NOT support compressed output. Please don't set codeC" +
          " or change the fileType if compressed output is desired.");
    }

    if(fileType.equalsIgnoreCase(HDFSWriterFactory.CompStreamType)) {
      Preconditions.checkNotNull(codeC, "It's essential to set compress codec"
          + " when fileType is: " + fileType);
    }

    // get the appropriate executor
    this.privExecutor = FlumeAuthenticationUtil.getAuthenticator(
            kerbConfPrincipal, kerbKeytab).proxyAs(proxyUser);



    // Whether the timestamp should be rounded down (if true, affects all time-based escape sequences except %t)
    needRounding = context.getBoolean("hdfs.round", false);

    if(needRounding) {
      // Unit of the rounding
      String unit = context.getString("hdfs.roundUnit", "second");
      if (unit.equalsIgnoreCase("hour")) {
        this.roundUnit = Calendar.HOUR_OF_DAY;
      } else if (unit.equalsIgnoreCase("minute")) {
        this.roundUnit = Calendar.MINUTE;
      } else if (unit.equalsIgnoreCase("second")){
        this.roundUnit = Calendar.SECOND;
      } else {
        LOG.warn("Rounding unit is not valid, please set one of" +
            "minute, hour, or second. Rounding will be disabled");
        needRounding = false;
      }
      // Round down to the highest multiple of this value (in the unit configured by hdfs.roundUnit)
      this.roundValue = context.getInteger("hdfs.roundValue", 1);
      if(roundUnit == Calendar.SECOND || roundUnit == Calendar.MINUTE){
        Preconditions.checkArgument(roundValue > 0 && roundValue <= 60,
            "Round value" +
            "must be > 0 and <= 60");
      } else if (roundUnit == Calendar.HOUR_OF_DAY){
        Preconditions.checkArgument(roundValue > 0 && roundValue <= 24,
            "Round value" +
            "must be > 0 and <= 24");
      }
    }

    this.useLocalTime = context.getBoolean("hdfs.useLocalTimeStamp", false);
    if(useLocalTime) {
      clock = new SystemClock();
    }

    if (sinkCounter == null) {
      // Sink counter for metrics
      sinkCounter = new SinkCounter(getName());
    }
  }
    
Following Flume's component lifecycle, the start() method runs first:
  @Override
  public void start() {
    String timeoutName = "hdfs-" + getName() + "-call-runner-%d";
    // Thread pool used for writing events to HDFS files
    callTimeoutPool = Executors.newFixedThreadPool(threadsPoolSize,
            new ThreadFactoryBuilder().setNameFormat(timeoutName).build());

    String rollerName = "hdfs-" + getName() + "-roll-timer-%d";
    // Thread pool used for scheduled file rolls
    timedRollerPool = Executors.newScheduledThreadPool(rollTimerPoolSize,
            new ThreadFactoryBuilder().setNameFormat(rollerName).build());
    // LinkedHashMap that maps a file's absolute path to its BucketWriter
    this.sfWriters = new WriterLinkedHashMap(maxOpenFiles);
    sinkCounter.start();
    super.start();
  }
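
The sfWriters map created in start() is what enforces hdfs.maxOpenFiles: WriterLinkedHashMap is a LinkedHashMap kept in access order, whose eviction hook closes the least recently used BucketWriter once the limit is exceeded. Below is a simplified sketch of that idea (not the actual Flume class; Writer and BoundedWriterMap are illustrative names):

import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;

interface Writer { void close() throws IOException; }

class BoundedWriterMap extends LinkedHashMap<String, Writer> {
  private final int maxOpenFiles;

  BoundedWriterMap(int maxOpenFiles) {
    super(16, 0.75f, true); // true = access order, so the eldest entry is the least recently used
    this.maxOpenFiles = maxOpenFiles;
  }

  @Override
  protected boolean removeEldestEntry(Map.Entry<String, Writer> eldest) {
    if (size() > maxOpenFiles) {
      try {
        eldest.getValue().close(); // close the oldest open file before dropping it
      } catch (IOException e) {
        // the real sink logs the failure and continues
      }
      return true; // evict the entry from the map
    }
    return false;
  }
}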
Each event travels from the Source into a Channel, and from the Channel into the Sink. The work happens in the Sink's process() method, which wraps everything in a transaction:
public Status process() throws EventDeliveryException {
    Channel channel = getChannel(); // get the channel
    Transaction transaction = channel.getTransaction(); // get the transaction
    List<BucketWriter> writers = Lists.newArrayList(); // BucketWriters touched in this transaction; BucketWriter is the main class that operates on HDFS
    transaction.begin();
    try {
      int txnEventCount = 0;
      for (txnEventCount = 0; txnEventCount < batchSize; txnEventCount++) { // process events in batches
        Event event = channel.take(); // take an event from the channel
        if (event == null) {
          break;
        }

        // reconstruct the path name by substituting place holders
        String realPath = BucketPath.escapeString(filePath, event.getHeaders(),
            timeZone, needRounding, roundUnit, roundValue, useLocalTime); // build the HDFS path by expanding escape sequences
        String realName = BucketPath.escapeString(fileName, event.getHeaders(),
          timeZone, needRounding, roundUnit, roundValue, useLocalTime); // build the file name by expanding escape sequences

        // Absolute path of the file to be written in HDFS
        String lookupPath = realPath + DIRECTORY_DELIMITER + realName;
        BucketWriter bucketWriter;
        HDFSWriter hdfsWriter = null;
        // Callback to remove the reference to the bucket writer from the
        // sfWriters map so that all buffers used by the HDFS file
        // handles are garbage collected.
        WriterCallback closeCallback = new WriterCallback() {
          @Override
          public void run(String bucketPath) {
            LOG.info("Writer callback called.");
            synchronized (sfWritersLock) {
              sfWriters.remove(bucketPath);
            }
          }
        };
        synchronized (sfWritersLock) {
          // Look up the BucketWriter for this absolute HDFS path
          bucketWriter = sfWriters.get(lookupPath);
          // we haven't seen this file yet, so open it and cache the handle
          if (bucketWriter == null) {
            // Initialize a new BucketWriter
            hdfsWriter = writerFactory.getWriter(fileType);
            bucketWriter = initializeBucketWriter(realPath, realName,
              lookupPath, hdfsWriter, closeCallback);
            // Cache it in the map
            sfWriters.put(lookupPath, bucketWriter);
          }
        }

        // track the buckets getting written in this transaction
        if (!writers.contains(bucketWriter)) {
          // Add bucketWriter to the list if it is not tracked in this transaction yet
          writers.add(bucketWriter);
        }

        // Write the data to HDFS
        try {
          // Append the event to the file managed by this bucketWriter
          bucketWriter.append(event);
        } catch (BucketClosedException ex) {
          LOG.info("Bucket was closed while trying to append, " +
            "reinitializing bucket and writing event.");
          hdfsWriter = writerFactory.getWriter(fileType);
          bucketWriter = initializeBucketWriter(realPath, realName,
            lookupPath, hdfsWriter, closeCallback);
          synchronized (sfWritersLock) {
            sfWriters.put(lookupPath, bucketWriter);
          }
          bucketWriter.append(event);
        }
      }

      if (txnEventCount == 0) {
        // This transaction processed no events
        sinkCounter.incrementBatchEmptyCount();
      } else if (txnEventCount == batchSize) {
        // A full batch of batchSize events was processed
        sinkCounter.incrementBatchCompleteCount();
      } else {
        // The channel had fewer than batchSize events left
        sinkCounter.incrementBatchUnderflowCount();
      }

      // flush all pending buckets before committing the transaction
      // Flush every BucketWriter in the list so its data reaches HDFS
      for (BucketWriter bucketWriter : writers) {
        // When escape sequences produce new file names or paths, a new file can be created before any other roll condition is met.
        // In that case, e.g. with hdfs.idleTimeout=0, many .tmp files may pile up in HDFS, because flush() does not close the file.
        bucketWriter.flush();
      }
      // Commit the transaction
      transaction.commit();

      if (txnEventCount < 1) {
        return Status.BACKOFF;
      } else {
        sinkCounter.addToEventDrainSuccessCount(txnEventCount);
        return Status.READY;
      }
    } catch (IOException eIO) {
      transaction.rollback(); // roll back the transaction
      LOG.warn("HDFS IO error", eIO);
      return Status.BACKOFF;
    } catch (Throwable th) {
      transaction.rollback();
      LOG.error("process failed", th);
      if (th instanceof Error) {
        throw (Error) th;
      } else {
        throw new EventDeliveryException(th);
      }
    } finally {
      transaction.close(); // close the transaction
    }
  }

HDFS Sink workflow:

1. The sink is initialized via configure(Context context) and start().

2. The SinkRunner thread calls process(), which loops over a batch of events; if channel.take() returns a null event, the loop exits.

3. When there is event data, the HDFS directory path and file name are first formatted into realPath and realName. realPath + realName is the complete HDFS path, lookupPath, which is then used to look up the corresponding BucketWriter.

4. If no BucketWriter exists yet, an HDFSWriter is first built according to fileType, the BucketWriter is initialized with it, and the object is then put into sfWriters, which tracks the files currently being written.

  public HDFSWriter getWriter(String fileType) throws IOException {
    if (fileType.equalsIgnoreCase(SequenceFileType)) {
      // Writes to the file via SequenceFile.Writer
      return new HDFSSequenceFile();
    } else if (fileType.equalsIgnoreCase(DataStreamType)) {
      // Writes via FSDataOutputStream
      return new HDFSDataStream();
    } else if (fileType.equalsIgnoreCase(CompStreamType)) {
      return new HDFSCompressedDataStream();
    } else {
      throw new IOException("File type " + fileType + " not supported");
    }
  }

HDFSSequenceFile: its configure(context) method first reads the write format writeFormat from the "hdfs.writeFormat" parameter. org.apache.flume.sink.hdfs.SequenceFileSerializerType defines the following three values:

  Writable(HDFSWritableSerializer.Builder.class), // the default
  Text(HDFSTextSerializer.Builder.class),
  Other(null);

It then reads "hdfs.useRawLocalFileSystem", which controls whether the raw local file system is used (default false); next it collects all of writeFormat's configuration into serializerContext; finally it constructs the SequenceFileSerializer instance serializer from writeFormat and serializerContext.

HDFSDataStream: its configure(context) method first reads the serializerType, which defaults to TEXT (BodyTextEventSerializer.Builder.class); the full set of options is:

public enum EventSerializerType {
  TEXT(BodyTextEventSerializer.Builder.class),
  HEADER_AND_TEXT(HeaderAndBodyTextEventSerializer.Builder.class),
  AVRO_EVENT(FlumeEventAvroEventSerializer.Builder.class),
  OTHER(null);

It then reads "hdfs.useRawLocalFileSystem" (default false, i.e. not used) and collects the serializer's configuration into serializerContext. The serializer itself is instantiated in HDFSDataStream.doOpen(Configuration conf, Path dstPath, FileSystem hdfs).

HDFSCompressedDataStream: its configure() is similar to HDFSDataStream.configure(context), with the same serializerType options. Its serializer is instantiated in HDFSCompressedDataStream.open(String filePath, CompressionCodec codec, CompressionType cType).

5. After the bucketWriter is instantiated it is stored in sfWriters. It is also checked against the local writers list and added if absent, so that all bucketWriters touched by this transaction can be flushed together later.

6. bucketWriter.append(event):

  public synchronized void append(final Event event)
          throws IOException, InterruptedException {
    checkAndThrowInterruptedException(); // check whether the current thread has been interrupted
    // If idleFuture is not null, cancel it before we move forward to avoid a
    // close call in the middle of the append.
    if(idleFuture != null) {
      idleFuture.cancel(false);
      // There is still a small race condition - if the idleFuture is already
      // running, interrupting it can cause HDFS close operation to throw -
      // so we cannot interrupt it while running. If the future could not be
      // cancelled, it is already running - wait for it to finish before
      // attempting to write.
      if(!idleFuture.isDone()) {
        try {
          idleFuture.get(callTimeout, TimeUnit.MILLISECONDS);
        } catch (TimeoutException ex) {
          LOG.warn("Timeout while trying to cancel closing of idle file. Idle" +
            " file close may have failed", ex);
        } catch (Exception ex) {
          LOG.warn("Error while trying to cancel closing of idle file. ", ex);
        }
      }
      idleFuture = null;
    }

    // If the bucket writer was closed due to roll timeout or idle timeout,
    // force a new bucket writer to be created. Roll count and roll size will
    // just reuse this one
    if (!isOpen) {
      if (closed) {
        throw new BucketClosedException("This bucket writer was closed and " +
          "this handle is thus no longer valid");
      }
      open(); // a previous file was finished and isOpen set to false, so open a new file
    }

    // check if it's time to rotate the file
    if (shouldRotate()) { // check the event count and size of the file to decide whether it should be closed and a new one started
      boolean doRotate = true;

      if (isUnderReplicated) {
        if (maxConsecUnderReplRotations > 0 &&
            consecutiveUnderReplRotateCount >= maxConsecUnderReplRotations) {
          doRotate = false;
          if (consecutiveUnderReplRotateCount == maxConsecUnderReplRotations) {
            LOG.error("Hit max consecutive under-replication rotations ({}); " +
                "will not continue rolling files under this path due to " +
                "under-replication", maxConsecUnderReplRotations);
          }
        } else {
          LOG.warn("Block Under-replication detected. Rotating file.");
        }
        consecutiveUnderReplRotateCount++;
      } else {
        consecutiveUnderReplRotateCount = 0;
      }

      if (doRotate) {
        close();
        open(); // open a new file
      }
    }

    // write the event
    try {
      sinkCounter.incrementEventDrainAttemptCount();
      callWithTimeout(new CallRunner<Void>() {
        @Override
        public Void call() throws Exception {
          writer.append(event); // could block; this is where data is written to HDFS
          return null;
        }
      });
    } catch (IOException e) {
      LOG.warn("Caught IOException writing to HDFSWriter ({}). Closing file (" +
          bucketPath + ") and rethrowing exception.",
          e.getMessage());
      try {
        close(true);
      } catch (IOException e2) {
        LOG.warn("Caught IOException while closing file (" +
             bucketPath + "). Exception follows.", e2);
      }
      throw e;
    }

    // update statistics
    processSize += event.getBody().length;
    eventCounter++;
    batchCounter++;

    if (batchCounter == batchSize) {
      flush();
    }
  }

Opening a new file falls into two cases.

The first, without compression:

  public void open(String filePath) throws IOException {
    open(filePath, null, CompressionType.NONE);
  }

The second, with compression:
  public void open(String filePath, CompressionCodec codeC,
      CompressionType compType) throws IOException {
    Configuration conf = new Configuration();
    Path dstPath = new Path(filePath);
    FileSystem hdfs = dstPath.getFileSystem(conf);
    open(dstPath, codeC, compType, conf, hdfs);
  }
Note: HDFSDataStream does not support compression, so it calls the first open method directly.

In the open method, if the time-based rollInterval is not 0, a Callable is created and scheduled so that the file is closed after rollInterval seconds; its handle is kept in timedRollFuture. By default a new file is started every 30 seconds.
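
A minimal sketch of that scheduling pattern is shown below (names are simplified and illustrative; in BucketWriter the scheduled Callable calls close(), and timedRollFuture is cancelled if the file is rolled or closed earlier for another reason):

import java.util.concurrent.Callable;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

final class TimedRollSketch {
  // Schedule a one-shot roll: run closeAction once rollIntervalSeconds have elapsed.
  static ScheduledFuture<Void> scheduleRoll(ScheduledExecutorService timedRollerPool,
                                            long rollIntervalSeconds,
                                            final Runnable closeAction) {
    Callable<Void> roll = new Callable<Void>() {
      @Override
      public Void call() throws Exception {
        closeAction.run(); // close the current file after the interval elapses
        return null;
      }
    };
    // The returned future plays the role of timedRollFuture: keep it so the
    // roll can be cancelled if the file is closed for some other reason first.
    return timedRollerPool.schedule(roll, rollIntervalSeconds, TimeUnit.SECONDS);
  }
}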

Finally, writer.append(event) is what actually writes the data to HDFS. There are three kinds of writer:

HDFSSequenceFile: its append(event) method first turns the event into a key and a value via serializer.serialize(e).

When serializer is HDFSWritableSerializer:

Key:

private Object getKey(Event e) {
    String timestamp = e.getHeaders().get("timestamp"); // read the "timestamp" header
    long eventStamp;

    if (timestamp == null) { // if there is no timestamp header, use the current system time
      eventStamp = System.currentTimeMillis();
    } else {
      eventStamp = Long.valueOf(timestamp);
    }
    return new LongWritable(eventStamp); // wrap the time in a LongWritable
  }
Value:
  private BytesWritable makeByteWritable(Event e) {
    BytesWritable bytesObject = new BytesWritable();
    bytesObject.set(e.getBody(), 0, e.getBody().length);
    return bytesObject;
  }
When serializer is HDFSTextSerializer:

The key is the same as above; the value:

  private Text makeText(Event e) {
    Text textObject = new Text();
    textObject.set(e.getBody(), 0, e.getBody().length);
    return textObject;
  }
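
For reference, here is a hedged sketch of what records of that shape look like when written with the Hadoop SequenceFile API (the file path and body are made up; this is a plain Hadoop example, not code from the Flume sink itself):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;

public class SequenceFileSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path path = new Path("/tmp/flume-sketch.seq"); // hypothetical output path

    // Key/value classes mirror what HDFSWritableSerializer produces:
    // a LongWritable timestamp key and a BytesWritable body value.
    SequenceFile.Writer writer = SequenceFile.createWriter(conf,
        SequenceFile.Writer.file(path),
        SequenceFile.Writer.keyClass(LongWritable.class),
        SequenceFile.Writer.valueClass(BytesWritable.class));
    try {
      byte[] body = "hello flume".getBytes("UTF-8");
      BytesWritable value = new BytesWritable();
      value.set(body, 0, body.length);
      writer.append(new LongWritable(System.currentTimeMillis()), value);
    } finally {
      writer.close();
    }
  }
}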

When writer is HDFSDataStream:

It calls serializer.write(e) directly; serializer comes in three kinds:

org.apache.flume.serialization.BodyTextEventSerializer writes the event body directly to the OutputStream and appends "\n" at the end.

org.apache.flume.serialization.HeaderAndBodyTextEventSerializer writes e.getHeaders() + " " + e.getBody() to the stream, and appends "\n" or not depending on configuration.

org.apache.flume.serialization.AvroEventSerializer writes the whole event to the dataFileWriter.
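
Since the serializer parameter also accepts the fully-qualified class name of an EventSerializer.Builder implementation, a custom serializer can be plugged in. A hedged sketch of such a serializer follows (the class and package names are hypothetical; it simply reproduces the body-plus-newline behavior described above):

import java.io.IOException;
import java.io.OutputStream;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.serialization.EventSerializer;

// Configure it with something like:
//   a1.sinks.k1.serializer = com.example.BodyWithNewlineSerializer$Builder
// (agent/sink names and the class name are illustrative only)
public class BodyWithNewlineSerializer implements EventSerializer {
  private final OutputStream out;

  private BodyWithNewlineSerializer(OutputStream out) {
    this.out = out;
  }

  @Override public void afterCreate() throws IOException { /* no file header needed */ }
  @Override public void afterReopen() throws IOException { /* nothing to do on reopen */ }

  @Override
  public void write(Event event) throws IOException {
    out.write(event.getBody()); // raw event body
    out.write('\n');            // newline terminator
  }

  @Override public void flush() throws IOException { /* the stream is flushed by the sink */ }
  @Override public void beforeClose() throws IOException { /* no file footer needed */ }
  @Override public boolean supportsReopen() { return true; }

  public static class Builder implements EventSerializer.Builder {
    @Override
    public EventSerializer build(Context context, OutputStream out) {
      return new BodyWithNewlineSerializer(out);
    }
  }
}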

The append method then updates its statistics: processSize tracks the file size, eventCounter the number of events in the file, and batchCounter the number of events processed since the last flush.

If the number of processed events reaches batchSize, the data is flushed to HDFS: flush() first calls writer.sync(), which pushes the data to HDFS, and then resets batchCounter to 0. What sync() does depends on the writer, i.e. on fileType:

HDFSSequenceFile: sync() calls SequenceFile.Writer.syncFs() to push the data to HDFS;
HDFSDataStream: sync() flushes the serializer and then calls outStream.flush() and outStream.sync() on the underlying stream;
HDFSCompressedDataStream: sync() first calls serializer.flush(); only FlumeEventAvroEventSerializer actually implements flush() (as dataFileWriter.flush()), while BodyTextEventSerializer and HeaderAndBodyTextEventSerializer do not. It then calls outStream.flush() and outStream.sync() to push the data to HDFS.

7. Back in HDFSEventSink.process(), the counters are updated according to the number of events handled in this transaction;

8. The writers list is traversed and each BucketWriter is flushed to HDFS;

9. Finally the transaction is committed (rolled back on error) and then closed.

Finally, stopping the sink:

  @Override
  public void stop() {
    // do not constrain close() calls with a timeout
    synchronized (sfWritersLock) { // acquire the writers lock
      // Iterate over all open BucketWriters
      for (Entry<String, BucketWriter> entry : sfWriters.entrySet()) {
        LOG.info("Closing {}", entry.getKey());
        // Close the BucketWriter, flushing its data to HDFS
        try {
          entry.getValue().close();
        } catch (Exception ex) {
          LOG.warn("Exception while closing " + entry.getKey() + ". " +
                  "Exception follows.", ex);
          if (ex instanceof InterruptedException) {
            Thread.currentThread().interrupt();
          }
        }
      }
    }

    // shut down all our thread pools
    ExecutorService toShutdown[] = {callTimeoutPool, timedRollerPool};
    for (ExecutorService execService : toShutdown) {
      execService.shutdown();
      try {
        while (execService.isTerminated() == false) {
          execService.awaitTermination(
                  Math.max(defaultCallTimeout, callTimeout), TimeUnit.MILLISECONDS);
        }
      } catch (InterruptedException ex) {
        LOG.warn("shutdown interrupted on " + execService, ex);
      }
    }

    callTimeoutPool = null;
    timedRollerPool = null;

    synchronized (sfWritersLock) {
      sfWriters.clear();
      sfWriters = null;
    }
    sinkCounter.stop();
    super.stop();
  }