Flume效能優化實踐
Flume效能優化實踐
最近公司落地Flume日誌採集著實反覆了好久,簡單記錄一下效能優化的核心思路。
初始配置所有batch size、transaction size都是1000,channel的capactiy是10000。
版本一
最初我是按Memory Channel做壓測,Taildir的source採集增量日誌,Memory Channel緩衝資料,Kafka Sink傳送資料。
這裡面的瓶頸是Kafka Sink,因為Kafka Sink是單執行緒同步傳送,網路延遲就會導致吞吐上不去,大概10MB+的一個吞吐就封頂了。
版本二
翻看了官方文件,打算試驗一下sink group來實現多個kafka sink同時傳送,結果效能仍舊10MB+。
分析原理,原來sink group仍舊是個單執行緒sink,相當於多個kafka sink的代理而已,僅僅實現了輪轉負載均衡功能。
一個kafka sink的傳送延遲高,輪轉壓根沒有意義。
版本三
於是琢磨如何實現多執行緒跑多個Kafka Sink,於是仍舊使用1個Memory Channel,配置對應3個Kafka Sink,結果頻寬可以升高到30MB的樣子,但是極不穩定,來回跳躍。
此時發現Memory Channel的填充率接近90%+,應該是因為容量經常塞滿導致的流水線阻塞,通過增加memory channel的capacity到10萬,batch size和transaction size增加到1萬,吞吐提升到60MB~80MB+,填充率小於10%,已經滿足需求。
在transaction size=1000的情況下memory channel被填滿,而transaction size=1萬的情況下memory channel就不會被填滿,其實是通過增加channel批處理的包大小,降低了channel訪問的頻次,解決的是memory channel的鎖瓶頸。
同時,這個優化思路也帶來了問題,更大的memory channel capacity帶來了更大的資料丟失風險,因為宕機時memory channel裡緩衝的資料都會丟失。
版本四
實現多個memory channel輪轉,每個memory channel由一個kafka sink消費。
這樣做目的有2個:
- 由多個sink競爭消費1個channel改為各自消費1個channel,鎖瓶頸解決。
- 因為鎖瓶頸變小,所以可以仍舊保持較小的channel capacity來保障資料可靠性,比如每個channel容量10000,那麼3個channel丟失3萬,仍舊優於”版本三”。
實現該功能需要自己開發channel selector外掛,實現source流量的輪轉分發,可以翻看我之前寫的部落格。
版本五
同事要求使用file channel,保障佇列中資料的可靠性,但是經過測試發現吞吐只能跑到10MB左右,上述所說優化手段均無效。
更換SSD盤也沒有帶來任何提升,File channel自身填充率極低。
個人懷疑瓶頸在File Channel自身,其事務的提交效率太低,阻塞了source的投遞動作,無論如何增加channel數量也無濟於事,因為source是單執行緒的,輪轉發往多個File Channel的速度仍舊等於單個File Channel速度,導致後續Sink沒有足夠資料消費,吞吐無法提升。
從FileChannel程式碼來看,磁碟讀寫的相關程式碼全部被加鎖處理:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
synchronized FlumeEventPointer put(ByteBuffer buffer) throws IOException { if (encryptor != null) { buffer = ByteBuffer.wrap(encryptor.encrypt(buffer.array())); } Pair<Integer, Integer> pair = write(buffer); return new FlumeEventPointer(pair.getLeft(), pair.getRight()); }
synchronized void take(ByteBuffer buffer) throws IOException { if (encryptor != null) { buffer = ByteBuffer.wrap(encryptor.encrypt(buffer.array())); } write(buffer); }
synchronized void rollback(ByteBuffer buffer) throws IOException { if (encryptor != null) { buffer = ByteBuffer.wrap(encryptor.encrypt(buffer.array())); } write(buffer); }
synchronized void commit(ByteBuffer buffer) throws IOException { if (encryptor != null) { buffer = ByteBuffer.wrap(encryptor.encrypt(buffer.array())); } write(buffer); dirty = true; lastCommitPosition = position(); } |
另外,日誌檔案的sync刷盤策略分為兩種選項,一種是每次提交事務都重新整理,另外一個是定時執行緒重新整理(下面是定時執行緒):
1 2 3 4 5 6 7 8 9 10 11 |
syncExecutor.scheduleWithFixedDelay(new Runnable() { @Override public void run() { try { sync(); } catch (Throwable ex) { LOG.error("Data file, " + getFile().toString() + " could not " + "be synced to disk due to an error.", ex); } } }, fsyncInterval, fsyncInterval, TimeUnit.SECONDS); |
而這個sync()刷盤操作同樣被鎖保護的,會佔用大量的鎖時間:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
/** * Sync the underlying log file to disk. Expensive call, * should be used only on commits. If a sync has already happened after * the last commit, this method is a no-op * * @throws IOException * @throws LogFileRetryableIOException - if this log file is closed. */ synchronized void sync() throws IOException { if (!fsyncPerTransaction && !dirty) { if (LOG.isDebugEnabled()) { LOG.debug( "No events written to file, " + getFile().toString() + " in last " + fsyncInterval + " or since last commit."); } return; } if (!isOpen()) { throw new LogFileRetryableIOException("File closed " + file); } if (lastSyncPosition < lastCommitPosition) { getFileChannel().force(false); lastSyncPosition = position(); syncCount++; dirty = false; } } |
降低sync()的呼叫頻率,理論上可以降低鎖佔用時間,讓出更多的鎖時間給put與take操作。
flume可以配置這些引數,只是官方文件裡並沒有說明:
1 2 3 4 5 |
public static final String FSYNC_PER_TXN = "fsyncPerTransaction"; public static final boolean DEFAULT_FSYNC_PRE_TXN = true;
public static final String FSYNC_INTERVAL = "fsyncInterval"; public static final int DEFAULT_FSYNC_INTERVAL = 5; // seconds. |
預設是每個事務都sync,這樣當然是為了保障資料可靠性,否則也就沒必要用FileChannel了。
我嘗試改成了定時sync(),發現吞吐仍舊無法提升,那麼我繼續猜測問題在於事務的commit部分,也就是Sink做的事情:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 |
/** * Synchronization not required as this method is atomic * * @param transactionID * @param type * @throws IOException */ private void commit(long transactionID, short type) throws IOException { Preconditions.checkState(open, "Log is closed"); Commit commit = new Commit(transactionID, WriteOrderOracle.next(), type); ByteBuffer buffer = TransactionEventRecord.toByteBuffer(commit); int logFileIndex = nextLogWriter(transactionID); long usableSpace = logFiles.get(logFileIndex).getUsableSpace(); long requiredSpace = minimumRequiredSpace + buffer.limit(); if (usableSpace <= requiredSpace) { throw new IOException("Usable space exhausted, only " + usableSpace + " bytes remaining, required " + requiredSpace + " bytes"); } boolean error = true; try { try { LogFile.Writer logFileWriter = logFiles.get(logFileIndex); // If multiple transactions are committing at the same time, // this ensures that the number of actual fsyncs is small and a // number of them are grouped together into one. logFileWriter.commit(buffer); logFileWriter.sync(); error = false; } catch (LogFileRetryableIOException e) { if (!open) { throw e; } roll(logFileIndex, buffer); LogFile.Writer logFileWriter = logFiles.get(logFileIndex); logFileWriter.commit(buffer); logFileWriter.sync(); error = false; } |
提交事務也只是寫入一條日誌標記對應的事務完結了,這樣宕機重放日誌時就會跳過該事務。
我們發現這個操作總是sync(),雖然這個操作不需要鎖保護的樣子,但是它佔用了sink執行緒的時間,估計吞吐無法提升也離不開它的關係。
關於File Channel瓶頸,有同學有JAVA調優經驗的可以具體給FileChannel加一些除錯日誌,看看到底慢在哪個環節。
我個人會優先選擇使用capacity較小(1萬-10萬)的的memory channel配合多個sink來實現高吞吐,至於對宕機的那點擔心實在沒有必要,因為大多數時候memory channel的填充率不足1%,也就是丟失10萬*0.01=100條而已。