1. 程式人生 > >Flume 原始碼解析:HDFS Sink

Flume 原始碼解析:HDFS Sink

Apache Flume 資料流程的最後一部分是 Sink,它會將上游抽取並轉換好的資料輸送到外部儲存中去,如本地檔案、HDFS、ElasticSearch 等。本文將通過分析原始碼來展現 HDFS Sink 的工作流程。

Sink 元件的生命週期

在上一篇文章中, 我們瞭解到 Flume 元件都會實現 LifecycleAware 介面,並由 LifecycleSupervisor 例項管理和監控。不過,Sink 元件並不直接由它管理,而且被包裝在了 SinkRunnerSinkProcessor 這兩個類中。Flume 支援三種 Sink 處理器,該處理器會將 Channel 和 Sink 以不同的方式連線起來。這裡我們只討論 DefaultSinkProcessor

的情況,即一個 Channel 只會連線一個 Sink。同時,我們也將略過對 Sink 分組的討論。

Sink Component LifeCycle

HDFS Sink 模組中的類

HDFS Sink 模組的原始碼在 flume-hdfs-sink 子目錄中,主要由以下幾個類組成:

HDFS Sink Classes

HDFSEventSink 類實現了生命週期的各個方法,包括 configurestartprocessstop 等。它啟動後會維護一組 BucketWriter 例項,每個例項對應一個 HDFS 輸出檔案路徑,上游的訊息會傳遞給它,並寫入 HDFS。通過不同的 HDFSWriter 實現,它可以將資料寫入文字檔案、壓縮檔案、或是 SequenceFile

配置與啟動

Flume 配置檔案載入時,會例項化各個元件,並呼叫它們的 configure 方法,其中就包括 Sink 元件。在 HDFSEventSink#configure 方法中,程式會讀取配置檔案中以 hdfs. 為開頭的專案,為其提供預設值,並做基本的引數校驗。如,batchSize 必須大於零,fileType 指定為 CompressedStreamcodeC 引數也必須指定等等。同時,程式還會初始化一個 SinkCounter,用於統計執行過程中的各項指標。

public void configure(Context context) {
  filePath = Preconditions.
checkNotNull( context.getString("hdfs.path"), "hdfs.path is required"); rollInterval = context.getLong("hdfs.rollInterval", defaultRollInterval); if (sinkCounter == null) { sinkCounter = new SinkCounter(getName()); } }

HDFSEventSink#start 方法中會建立兩個執行緒池:callTimeoutPool 執行緒池會在 BucketWriter#callWithTimeout 方法中使用,用來限定 HDFS 遠端呼叫的請求時間,如 FileSystem#createFSDataOutputStream#hflush 都有可能超時;timedRollerPool 則用於對檔案進行滾動,前提是使用者配置了 rollInterval 選項,我們將在下一節詳細說明。

public void start() {
  callTimeoutPool = Executors.newFixedThreadPool(threadsPoolSize,
      new ThreadFactoryBuilder().setNameFormat(timeoutName).build());
  timedRollerPool = Executors.newScheduledThreadPool(rollTimerPoolSize,
      new ThreadFactoryBuilder().setNameFormat(rollerName).build());
}

處理資料

process 方法包含了 HDFS Sink 的主要邏輯,也就是從上游的 Channel 中獲取資料,並寫入指定的 HDFS 檔案,流程圖如下:

Process Method Flow Chart

Channel 事務

處理邏輯的外層是一個 Channel 事務,並提供了異常處理。以 Kafka Channel 為例:事務開始時,程式會從 Kafka 中讀取資料,但不會立刻提交變動後的偏移量。只有當這些訊息被成功寫入 HDFS 檔案之後,偏移量才會提交給 Kafka,下次迴圈將從新的偏移量開始消費。

Channel channel = getChannel();
Transaction transaction = channel.getTransaction();
transaction.begin()
try {
  event = channel.take();
  bucketWriter.append(event);
  transaction.commit()
} catch (Throwable th) {
  transaction.rollback();
  throw new EventDeliveryException(th);
} finally {
  transaction.close();
}

查詢或建立 BucketWriter

BucketWriter 例項和 HDFS 檔案一一對應,檔案路徑是通過配置生成的,例如:

a1.sinks.access_log.hdfs.path = /user/flume/access_log/dt=%Y%m%d
a1.sinks.access_log.hdfs.filePrefix = events.%[localhost]
a1.sinks.access_log.hdfs.inUsePrefix = .
a1.sinks.access_log.hdfs.inUseSuffix = .tmp
a1.sinks.access_log.hdfs.rollInterval = 300
a1.sinks.access_log.hdfs.fileType = CompressedStream
a1.sinks.access_log.hdfs.codeC = lzop

以上配置生成的臨時檔案和目標檔案路徑為:

/user/flume/access_log/dt=20180925/.events.hostname1.1537848761307.lzo.tmp
/user/flume/access_log/dt=20180925/events.hostname1.1537848761307.lzo
  • %{...}:使用訊息中的頭資訊進行替換;
  • %[...]:目前僅支援 %[localhost]%[ip]、以及 %[fqdn]
  • %x:日期佔位符,通過頭資訊中的 timestamp 來生成,或者使用 useLocalTimeStamp 配置項。

檔案的前後綴則是在 BucketWriter#open 方法中追加的。程式碼中的 counter 是當前檔案的建立時間戳,lzo 則是當前壓縮格式的預設檔案字尾。

String fullFileName = fileName + "." + counter;
fullFileName += fileSuffix;
fullFileName += codeC.getDefaultExtension();
bucketPath = filePath + "/" + inUsePrefix + fullFileName + inUseSuffix;
targetPath = filePath + "/" + fullFileName;

如果指定路徑沒有對應的 BucketWriter 例項,程式會建立一個,並根據 fileType 配置項來生成對應的 HDFSWriter 例項。Flume 支援的三種類型是:HDFSSequenceFileHDFSDataStream、以及 HDFSCompressedDataStream,寫入 HDFS 的動作是由這些類中的程式碼完成的。

bucketWriter = sfWriters.get(lookupPath);
if (bucketWriter == null) {
  hdfsWriter = writerFactory.getWriter(fileType);
  bucketWriter = new BucketWriter(hdfsWriter);
  sfWriters.put(lookupPath, bucketWriter);
}

寫入資料並重新整理

在寫入資料之前,BucketWriter 首先會檢查檔案是否已經開啟,如未開啟則會命關聯的 HDFSWriter 類開啟新的檔案,以 HDFSCompressedDataStream 為例:

public void open(String filePath, CompressionCodec codec) {
  FileSystem hdfs = dstPath.getFileSystem(conf);
  fsOut = hdfs.append(dstPath)
  compressor = CodedPool.getCompressor(codec, conf);
  cmpOut = codec.createOutputStream(fsOut, compressor);
  serializer = EventSerializerFactory.getInstance(serializerType, cmpOut);
}

public void append(Event e) throws IO Exception {
  serializer.write(event);
}

Flume 預設的 serializerType 配置是 TEXT,即使用 BodyTextEventSerializer 來序列化資料,不做加工,直接寫進輸出流:

public void write(Event e) throws IOException {
  out.write(e.getBody());
  if (appendNewline) {
    out.write('\n');
  }
}

BucketWriter 需要關閉或重開時會呼叫 HDFSWriter#sync 方法,進而執行序列化例項和輸出流例項上的 flush 方法:

public void sync() throws IOException {
  serializer.flush();
  compOut.finish();
  fsOut.flush();
  hflushOrSync(fsOut);
}

從 Hadoop 0.21.0 開始,Syncable#sync 拆分成了 hflushhsync 兩個方法,前者只是將資料從客戶端的快取中刷新出去,後者則會保證資料已被寫入 HDFS 本地磁碟。為了相容新老 API,Flume 會通過 Java 反射機制來確定 hflush 是否存在,不存在則呼叫 sync 方法。上述程式碼中的 flushOrSync 正是做了這樣的判斷。

檔案滾動

HDFS Sink 支援三種滾動方式:按檔案大小、按訊息數量、以及按時間間隔。按大小和按數量的滾動是在 BucketWriter#shouldRotate 方法中判斷的,每次 append 時都會呼叫:

private boolean shouldRotate() {
  boolean doRotate = false;
  if ((rollCount > 0) && (rollCount <= eventCounter)) {
    doRotate = true;
  }
  if ((rollSize > 0) && (rollSize <= processSize)) {
    doRotate = true;
  }
  return doRotate;
}

按時間滾動則是使用了上文提到的 timedRollerPool 執行緒池,通過啟動一個定時執行緒來實現:

private void open() throws IOException, InterruptedException {
  if (rollInterval > 0) {
    Callable<Void> action = new Callable<Void>() {
      public Void call() throws Exception {
        close(true);
      }
    };
    timedRollFuture = timedRollerPool.schedule(action, rollInterval);
  }
}

關閉與停止

HDFSEventSink#close 被觸發時,它會遍歷所有的 BucketWriter 例項,呼叫它們的 close 方法,進而關閉下屬的 HDFSWriter。這個過程和 flush 類似,只是還會做一些額外操作,如關閉後的 BucketWriter 會將自身從 sfWriters 雜湊表中移除:

public synchronized void close(boolean callCloseCallback) {
  writer.close();
  timedRollFuture.cancel(false);
  onCloseCallback.run(onCloseCallbackPath);
}

onCloseCallback 回撥函式是在 HDFSEventSink 初始化 BucketWriter 時傳入的:

WriterCallback closeCallback = new WriterCallback() {
  public void run(String bucketPath) {
      synchronized (sfWritersLock) {
        sfWriters.remove(bucketPath);
      }
  }
}
bucketWriter = new BucketWriter(lookPath, closeCallback);

最後,HDFSEventSink 會關閉 callTimeoutPooltimedRollerPool 執行緒池,整個元件隨即停止。

ExecutorService[] toShutdown = { callTimeoutPool, timedRollerPool };
for (ExecutorService execService : toShutdown) {
  execService.shutdown();
}

參考資料