Flume - FileChannel Source Code Explained
Simplified class structure of FileChannel:
FileChannel's inner transaction class, FileBackedTransaction:
File operation class: LogFile (LogFileV2 was dropped as of 1.7):
A few other important classes:
FlumeEventQueue, LogFile, Log, LogUtils.
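I. Initialization: public void configure(Context context)
1. useDualCheckpoints (whether to keep a backup copy of the checkpoint)
2. compressBackupCheckpoint (whether to compress the backup checkpoint)
3. checkpointDir (checkpoint directory; defaults to a location under ${user.home})
4. dataDirs (data directories)
5. capacity (the configured channel capacity)
6. keepAlive (timeout in seconds: how long a put waits for free space when the channel is full)
7. transactionCapacity (the maximum number of events in one transaction)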
Note: capacity must be greater than or equal to transactionCapacity, otherwise configuration fails with an exception. From the source:

```java
Preconditions.checkState(transactionCapacity <= capacity,
    "File Channel transaction capacity cannot be greater than the " +
    "capacity of the channel.");
```
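8. checkpointInterval (interval between checkpoints, in milliseconds)
9. maxFileSize (maximum size of a single data file; default is about 1.5 GB)
10. minimumRequiredSpace (minimum free disk space required; default is 500 MB)
11. useLogReplayV1 (use the old replay logic)
12. useFastReplay (replay without using the queue)
13. keyProvider (type of key provider; supported type: JCEKSFILE)
14. activeKey (name of the key used to encrypt new data)
15. cipherProvider (type of cipher provider; supported type: AESCTRNOPADDING)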
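The parameter handling above can be sketched as follows. This is a minimal, hypothetical stand-in for configure(), not Flume's code: it reads values from a plain Map<String, String> instead of Flume's Context, the key names simply follow the list above (not necessarily Flume's property names), and the default values are illustrative assumptions. Only the transactionCapacity <= capacity rule mirrors the source shown above.

```java
import java.util.Map;

// Hypothetical sketch: read settings with defaults, then validate them
// the same way configure() does for transactionCapacity vs capacity.
class FileChannelConfigSketch {
    // Illustrative defaults (assumptions, not taken from the article).
    static final int DEFAULT_CAPACITY = 1_000_000;
    static final int DEFAULT_TRANSACTION_CAPACITY = 10_000;
    static final int DEFAULT_KEEP_ALIVE_SECONDS = 3;

    final int capacity;
    final int transactionCapacity;
    final int keepAliveSeconds;

    FileChannelConfigSketch(Map<String, String> props) {
        this.capacity = readInt(props, "capacity", DEFAULT_CAPACITY);
        this.transactionCapacity =
            readInt(props, "transactionCapacity", DEFAULT_TRANSACTION_CAPACITY);
        this.keepAliveSeconds =
            readInt(props, "keepAlive", DEFAULT_KEEP_ALIVE_SECONDS);
        // Same rule as the Preconditions.checkState call in the source:
        // one transaction may never hold more events than the whole channel.
        if (transactionCapacity > capacity) {
            throw new IllegalStateException(
                "File Channel transaction capacity cannot be greater than the "
                + "capacity of the channel.");
        }
    }

    // Return the parsed value for key, or def when the key is absent.
    private static int readInt(Map<String, String> props, String key, int def) {
        String v = props.get(key);
        return v == null ? def : Integer.parseInt(v.trim());
    }
}
```

For example, `new FileChannelConfigSketch(Map.of("capacity", "100", "transactionCapacity", "1000"))` is rejected, while equal values are accepted. Failing fast at configuration time is the point: a transaction larger than the channel could never commit.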
II. The start() method:

```java
@Override
public synchronized void start() {
  LOG.info("Starting {}...", this);
  try {
    // Copy the parameters read in configure() into the Builder
    Builder builder = new Log.Builder();
    builder.setCheckpointInterval(checkpointInterval);
    builder.setMaxFileSize(maxFileSize);
    builder.setMinimumRequiredSpace(minimumRequiredSpace);
    builder.setQueueSize(capacity);
    builder.setCheckpointDir(checkpointDir);
    builder.setLogDirs(dataDirs);
    builder.setChannelName(getName());
    builder.setUseLogReplayV1(useLogReplayV1);
    builder.setUseFastReplay(useFastReplay);
    builder.setEncryptionKeyProvider(encryptionKeyProvider);
    builder.setEncryptionKeyAlias(encryptionActiveKey);
    builder.setEncryptionCipherProvider(encryptionCipherProvider);
    builder.setUseDualCheckpoints(useDualCheckpoints);
    builder.setCompressBackupCheckpoint(compressBackupCheckpoint);
    builder.setBackupCheckpointDir(backupCheckpointDir);
    builder.setFsyncPerTransaction(fsyncPerTransaction);
    builder.setFsyncInterval(fsyncInterval);
    builder.setCheckpointOnClose(checkpointOnClose);
    // build() creates the Log object and tries to acquire the file locks on
    // checkpointDir and the data dirs; Log's
    // "private void lock(File dir) throws IOException" takes each lock.
    log = builder.build();
    // replay():
    // 1. first takes the checkpoint write lock
    // 2. finds the largest fileID
    // 3. reads the log files in every data directory and recovers state
    //    according to each record's type
    // 4. flushes the queue to the related checkpoint files
    log.replay();
    open = true; // the channel is now open
    int depth = getDepth();
    // Reserve one capacity permit for every event recovered by replay
    Preconditions.checkState(queueRemaining.tryAcquire(depth),
        "Unable to acquire " + depth + " permits " + channelNameDescriptor);
    LOG.info("Queue Size after replay: " + depth + " "
        + channelNameDescriptor);
  } catch (Throwable t) {
    open = false;
    startupError = t;
    LOG.error("Failed to start the file channel " + channelNameDescriptor, t);
    if (t instanceof Error) {
      throw (Error) t;
    }
  }
  if (open) {
    // Start the channel metrics counter
    channelCounter.start();
    channelCounter.setChannelSize(getDepth());
    channelCounter.setChannelCapacity(capacity);
  }
  super.start();
}
```
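Inside the Log class, a BackgroundWorker is scheduled to write a checkpoint every checkpointInterval milliseconds:

```java
workerExecutor.scheduleWithFixedDelay(new BackgroundWorker(this),
    this.checkpointInterval, this.checkpointInterval,
    TimeUnit.MILLISECONDS);
```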
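A minimal sketch of this scheduling pattern with the JDK's ScheduledExecutorService. The class CheckpointSchedulerSketch is hypothetical and its task only counts runs in place of log.writeCheckpoint(); the interval and latch are illustrative. It also shows why BackgroundWorker catches Throwable: per the JDK documentation, if one execution of a periodic task throws, all later executions are suppressed.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the BackgroundWorker pattern: one scheduler thread
// runs a "checkpoint" task, waiting checkpointIntervalMs between runs.
class CheckpointSchedulerSketch {
    private final ScheduledExecutorService workerExecutor =
        Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "checkpoint-worker");
            t.setDaemon(true); // never keep the JVM alive on its own
            return t;
        });
    final AtomicInteger checkpoints = new AtomicInteger();
    volatile boolean open = true;

    void start(long checkpointIntervalMs, CountDownLatch done) {
        workerExecutor.scheduleWithFixedDelay(() -> {
            try {
                if (open) {
                    // Stand-in for log.writeCheckpoint(): just count the run
                    checkpoints.incrementAndGet();
                    done.countDown();
                }
            } catch (Throwable t) {
                // Swallow errors like BackgroundWorker does; an exception
                // escaping run() would cancel all future executions.
                System.err.println("checkpoint failed: " + t);
            }
        }, checkpointIntervalMs, checkpointIntervalMs, TimeUnit.MILLISECONDS);
    }

    void stop() throws InterruptedException {
        open = false;
        workerExecutor.shutdown();
        workerExecutor.awaitTermination(1, TimeUnit.SECONDS);
    }
}
```

scheduleWithFixedDelay measures the delay from the end of one run to the start of the next, so a slow checkpoint never overlaps the following one (scheduleAtFixedRate would not give that spacing).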
```java
static class BackgroundWorker implements Runnable {
  private static final Logger LOG = LoggerFactory
      .getLogger(BackgroundWorker.class);
  private final Log log;

  public BackgroundWorker(Log log) {
    this.log = log;
  }

  @Override
  public void run() {
    try {
      if (log.open) {
        // Flushes the checkpoint, inflightTakes and inflightPuts to disk:
        // rebuilds inflightPuts, inflightTakes and checkpoint.meta in turn,
        // updates the checkpoint file and flushes it to disk (all of these
        // live in checkpointDir), and updates the log-ID.meta files. It is
        // also responsible for deleting old log files and their meta files.
        log.writeCheckpoint();
      }
    } catch (IOException e) {
      LOG.error("Error doing checkpoint", e);
    } catch (Throwable e) {
      LOG.error("General error in checkpoint worker", e);
    }
  }
}
```
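The transaction methods in the next section wrap their work in log.lockShared()/log.unlockShared(), while writing a checkpoint needs the log to be quiescent. A minimal sketch of that shared/exclusive pattern with java.util.concurrent.locks.ReentrantReadWriteLock follows. The mapping is an assumption to verify in Log.java: lockShared() as the read side, checkpoint writing as the write side. The class LogLockSketch and its methods are hypothetical.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical sketch: puts/takes share the read lock; a checkpoint needs
// the write lock, so it waits until no transaction holds the log.
class LogLockSketch {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);

    void lockShared()   { lock.readLock().lock(); }
    void unlockShared() { lock.readLock().unlock(); }

    // Try to take the exclusive lock for a checkpoint, giving up after timeoutMs.
    boolean tryCheckpoint(long timeoutMs) {
        try {
            if (!lock.writeLock().tryLock(timeoutMs, TimeUnit.MILLISECONDS)) {
                return false; // some put/take still holds the shared lock
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        try {
            return true; // a real checkpoint would flush the queue here
        } finally {
            lock.writeLock().unlock();
        }
    }
}
```

Two threads can both hold the shared side at once; only the checkpoint has to wait for them. A fair lock (`true` in the constructor) keeps a stream of new readers from starving a waiting checkpoint.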
III. Transactions
Many of the methods mirror those of MemoryChannel's transaction class: doTake(), doCommit(), doRollback() and doPut().
Each of them is covered in detail below.
1. doPut(): invoked when a source calls put()

```java
@Override
protected void doPut(Event event) throws InterruptedException {
  channelCounter.incrementEventPutAttemptCount();
  if (putList.remainingCapacity() == 0) { // no room left in this transaction
    throw new ChannelException("Put queue for FileBackedTransaction " +
        "of capacity " + putList.size() + " full, consider " +
        "committing more frequently, increasing capacity or " +
        "increasing thread count. " + channelNameDescriptor);
  }
  // this does not need to be in the critical section as it does not
  // modify the structure of the log or queue.
  // Wait up to keepAlive seconds for a free slot in the channel.
  if (!queueRemaining.tryAcquire(keepAlive, TimeUnit.SECONDS)) {
    throw new ChannelFullException("The channel has reached it's capacity. "
        + "This might be the result of a sink on the channel having too "
        + "low of batch size, a downstream system running slower than "
        + "normal, or that the channel capacity is just too low. "
        + channelNameDescriptor);
  }
  boolean success = false;
  // Acquire the shared (read) side of the checkpoint lock. doTake() takes
  // the same shared lock; shared holders do not block each other, so puts
  // and takes can run concurrently and only wait while a checkpoint holds
  // the exclusive (write) side.
  log.lockShared();
  try {
    // transactionID is incremented in the TransactionIDOracle class.
    // Append the Event to a data file (written through RandomAccessFile).
    FlumeEventPointer ptr = log.put(transactionID, event);
    Preconditions.checkState(putList.offer(ptr), "putList offer failed "
        + channelNameDescriptor);
    // Register the pointer as an uncommitted put of this transaction
    // (tracked in the inflightputs file); doCommit() later moves it
    // to the tail of the queue.
    queue.addWithoutCommit(ptr, transactionID);
    success = true;
  } catch (IOException e) {
    throw new ChannelException("Put failed due to IO error "
        + channelNameDescriptor, e);
  } finally {
    log.unlockShared(); // release the shared lock
    if (!success) {
      // release slot obtained in the case
      // the put fails for any reason
      queueRemaining.release(); // give the semaphore permit back
    }
  }
}
```
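doPut() bounds the channel size with the queueRemaining semaphore: a put must obtain one permit (waiting at most keepAlive), a failed put gives its permit back, and doCommit() returns permits only when takes are committed. A minimal sketch of that accounting with java.util.concurrent.Semaphore; the class CapacitySketch is hypothetical, and the log write is replaced by a boolean flag.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of queueRemaining: one permit per free slot.
class CapacitySketch {
    private final Semaphore queueRemaining;

    CapacitySketch(int capacity) {
        queueRemaining = new Semaphore(capacity);
    }

    // Mirrors doPut(): wait up to keepAlive for a slot, return it on failure.
    boolean put(long keepAliveMs, boolean writeSucceeds) throws InterruptedException {
        if (!queueRemaining.tryAcquire(keepAliveMs, TimeUnit.MILLISECONDS)) {
            return false; // the real code throws ChannelFullException here
        }
        boolean success = false;
        try {
            success = writeSucceeds; // stand-in for log.put(...) + queue update
            return success;
        } finally {
            if (!success) {
                queueRemaining.release(); // failed put must not leak a slot
            }
        }
    }

    // Mirrors doCommit() for takes: committed takes free their slots.
    void commitTakes(int takes) {
        queueRemaining.release(takes);
    }

    int freeSlots() {
        return queueRemaining.availablePermits();
    }
}
```

Note that permits are not returned when a put commits; they are held until a sink commits the take that removes the event, which is exactly what makes the semaphore count track the channel's free space.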
2. doTake(): invoked when a sink calls take()

```java
protected Event doTake() throws InterruptedException {
  channelCounter.incrementEventTakeAttemptCount();
  if (takeList.remainingCapacity() == 0) {
    throw new ChannelException("Take list for FileBackedTransaction, capacity " +
        takeList.size() + " full, consider committing more frequently, " +
        "increasing capacity, or increasing thread count. "
        + channelNameDescriptor);
  }
  log.lockShared(); // acquire the shared lock
  /*
   * 1. Take an event which is in the queue.
   * 2. If getting that event does not throw NoopRecordException,
   *    then return it.
   * 3. Else try to retrieve the next event from the queue
   * 4. Repeat 2 and 3 until queue is empty or an event is returned.
   */
  try {
    while (true) {
      // Get the file pointer; a pointer consists of fileID and offset
      FlumeEventPointer ptr = queue.removeHead(transactionID);
      if (ptr == null) {
        return null;
      } else {
        try {
          // first add to takeList so that if write to disk
          // fails rollback actually does it's work
          Preconditions.checkState(takeList.offer(ptr),
              "takeList offer failed "
              + channelNameDescriptor);
          log.take(transactionID, ptr); // write take to disk
          // Read the Event from disk at the pointer's position;
          // the take is tracked in the inflighttakes file
          Event event = log.get(ptr);
          return event;
        } catch (IOException e) {
          throw new ChannelException("Take failed due to IO error "
              + channelNameDescriptor, e);
        } catch (NoopRecordException e) {
          LOG.warn("Corrupt record replaced by File Channel Integrity " +
              "tool found. Will retrieve next event", e);
          takeList.remove(ptr);
        } catch (CorruptEventException ex) {
          if (fsyncPerTransaction) {
            throw new ChannelException(ex);
          }
          LOG.warn("Corrupt record found. Event will be " +
              "skipped, and next event will be read.", ex);
          takeList.remove(ptr);
        }
      }
    }
  } finally {
    log.unlockShared(); // release the shared lock
  }
}
```
3. doCommit(): invoked when a source or sink commits the transaction

```java
@Override
protected void doCommit() throws InterruptedException {
  int puts = putList.size();
  int takes = takeList.size();
  if (puts > 0) {
    // puts and takes cannot both be > 0: one of them must be zero
    Preconditions.checkState(takes == 0, "nonzero puts and takes "
        + channelNameDescriptor);
    log.lockShared(); // acquire the shared lock
    try {
      // Serialize a commit record into a ByteBuffer and write it to the file
      log.commitPut(transactionID);
      channelCounter.addToEventPutSuccessCount(puts);
      synchronized (queue) {
        while (!putList.isEmpty()) {
          if (!queue.addTail(putList.removeFirst())) {
            StringBuilder msg = new StringBuilder();
            msg.append("Queue add failed, this shouldn't be able to ");
            msg.append("happen. A portion of the transaction has been ");
            msg.append("added to the queue but the remaining portion ");
            msg.append("cannot be added. Those messages will be consumed ");
            msg.append("despite this transaction failing. Please report.");
            msg.append(channelNameDescriptor);
            LOG.error(msg.toString());
            Preconditions.checkState(false, msg.toString());
          }
        }
        // Drop this transaction's entries from the in-flight puts/takes
        // (the inflightputs/inflighttakes files in the checkpoint directory)
        queue.completeTransaction(transactionID);
      }
    } catch (IOException e) {
      throw new ChannelException("Commit failed due to IO error "
          + channelNameDescriptor, e);
    } finally {
      log.unlockShared(); // release the shared lock
    }
  } else if (takes > 0) {
    log.lockShared(); // acquire the shared lock
    try {
      log.commitTake(transactionID); // write the commit to the data file
      queue.completeTransaction(transactionID); // same as above
      channelCounter.addToEventTakeSuccessCount(takes);
    } catch (IOException e) {
      throw new ChannelException("Commit failed due to IO error "
          + channelNameDescriptor, e);
    } finally {
      log.unlockShared();
    }
    // Committed takes free their slots in the channel
    queueRemaining.release(takes);
  }
  putList.clear();
  takeList.clear(); // clear both lists
  channelCounter.setChannelSize(queue.getSize());
}
```
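The pointer that doTake() removes from the queue and doCommit() appends to its tail identifies an event by data-file ID and byte offset. Two 32-bit values fit in one long, which lets a queue store pointers as primitives. The sketch below shows such packing under the assumption that fileID sits in the high 32 bits and offset in the low 32 bits; the class EventPointerSketch is hypothetical, so check FlumeEventPointer for the exact encoding.

```java
// Hypothetical sketch: pack (fileID, offset) into one long and back.
// Assumes fileID in the high 32 bits and offset in the low 32 bits.
final class EventPointerSketch {
    final int fileID;
    final int offset;

    EventPointerSketch(int fileID, int offset) {
        this.fileID = fileID;
        this.offset = offset;
    }

    long toLong() {
        // Mask the offset so a negative int cannot sign-extend into fileID bits
        return ((long) fileID << 32) | (offset & 0xFFFFFFFFL);
    }

    static EventPointerSketch fromLong(long value) {
        int fileID = (int) (value >>> 32); // high 32 bits
        int offset = (int) value;          // low 32 bits
        return new EventPointerSketch(fileID, offset);
    }
}
```

Round-tripping `new EventPointerSketch(3, 1024)` through toLong() and fromLong() yields fileID 3 and offset 1024 again.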
4. doRollback(): both sources and sinks call this method to roll back data

```java
@Override
protected void doRollback() throws InterruptedException {
  int puts = putList.size();
  int takes = takeList.size();
  log.lockShared();
  try {
    if (takes > 0) {
      Preconditions.checkState(puts == 0, "nonzero puts and takes "
          + channelNameDescriptor);
      synchronized (queue) {
        while (!takeList.isEmpty()) {
          Preconditions.checkState(queue.addHead(takeList.removeLast()),
              "Queue add failed, this shouldn't be able to happen "
              + channelNameDescriptor);
        }
      }
    }
    putList.clear();
    takeList.clear();
    queue.completeTransaction(transactionID);
    channelCounter.setChannelSize(queue.getSize());
    // Also serialized into a ByteBuffer and written to the log file
    log.rollback(transactionID);
  } catch (IOException e) {
    throw new ChannelException("Commit failed due to IO error "
        + channelNameDescriptor, e);
  } finally {
    log.unlockShared();
    // since rollback is being called, puts will never make it on
    // to the queue and we need to be sure to release the resources
    queueRemaining.release(puts);
  }
}
```
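The list handling in doCommit() and doRollback() decides where events end up: committed puts are appended to the queue tail, while rolled-back takes are pushed back onto the queue head in reverse order (takeList.removeLast() followed by queue.addHead()), which restores the original order. A minimal in-memory sketch with ArrayDeque; the class TxnSketch is hypothetical and leaves out disk I/O, locking and capacity accounting.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical in-memory sketch of FileBackedTransaction's list handling.
class TxnSketch {
    final Deque<String> queue;                  // committed events
    private final Deque<String> putList = new ArrayDeque<>();
    private final Deque<String> takeList = new ArrayDeque<>();

    TxnSketch(Deque<String> queue) { this.queue = queue; }

    // Uncommitted puts stay private to the transaction.
    void put(String event) { putList.addLast(event); }

    String take() {
        String e = queue.pollFirst();           // like queue.removeHead()
        if (e != null) takeList.addLast(e);     // remember it for rollback
        return e;
    }

    void commit() {
        if (!putList.isEmpty() && !takeList.isEmpty()) {
            throw new IllegalStateException("nonzero puts and takes");
        }
        while (!putList.isEmpty()) queue.addLast(putList.removeFirst());
        takeList.clear();                       // takes are now final
    }

    void rollback() {
        // Push taken events back in reverse so the head order is restored.
        while (!takeList.isEmpty()) queue.addFirst(takeList.removeLast());
        putList.clear();                        // uncommitted puts are dropped
    }
}
```

Taking "a" and "b" from [a, b, c] and rolling back leaves the queue as [a, b, c] again; a put of "d" only becomes visible in the queue after commit().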
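Flume's FileChannel keeps data complete and consistent across system crashes by relying on the JDK's byte channel, java.nio.channels.FileChannel: to ensure no data is lost after a crash, file modifications are forced out to the underlying storage device (FileChannel.force()).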
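A minimal sketch of the durability primitive mentioned above: write a buffer through java.nio.channels.FileChannel, then call force() so the bytes reach the storage device before the method returns. This only illustrates the JDK API; the class DurableAppendSketch and its record format are hypothetical and do not reproduce Flume's LogFile.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical sketch: append a record and force it to disk before returning.
class DurableAppendSketch {
    static long appendAndSync(Path file, String record) throws IOException {
        try (FileChannel ch = FileChannel.open(file,
                StandardOpenOption.CREATE, StandardOpenOption.WRITE,
                StandardOpenOption.APPEND)) {
            ByteBuffer buf = ByteBuffer.wrap(record.getBytes(StandardCharsets.UTF_8));
            while (buf.hasRemaining()) {
                ch.write(buf); // a single write() may not drain the buffer
            }
            // force(false) flushes file content; force(true) also flushes metadata
            ch.force(false);
            return ch.size();
        }
    }
}
```

force(false) is the cheaper choice when only the file's content must survive a crash; file metadata such as the modification time is not guaranteed to be flushed.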
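Finally, the on-disk layout of Flume's FileChannel:
checkpoint directory:
checkpoint: records where each Event lives, i.e. which data file (logFileID) holds it and at what offset.
inflighttakes: the cached data of in-flight takes, per transaction; the file is rebuilt periodically.
Contents:
1. the first 16 bytes are a checksum;
2. transactionID1 + eventsCount1 + eventPointer11 + eventPointer12 + ...;
3. transactionID2 + eventsCount2 + eventPointer21 + eventPointer22 + ...
inflightputs: the cached data of in-flight puts, per transaction; the file is rebuilt periodically.
Contents:
1. the first 16 bytes are a checksum;
2. transactionID1 + eventsCount1 + eventPointer11 + eventPointer12 + ...;
3. transactionID2 + eventsCount2 + eventPointer21 + eventPointer22 + ...
checkpoint.meta: mainly stores the logFileIDs and the number of events in each.
data directory:
log-ID.meta: mainly records the next write position in log-ID, the logWriteOrderID and similar information.
log-ID: the data files; the directory keeps no more than 2 of them.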
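The inflighttakes/inflightputs layout described above (a 16-byte checksum, then per transaction an ID, an event count and the event pointers) can be sketched as below. Assumptions to check against Flume's FlumeEventQueue source: every field is an 8-byte long, and the checksum is an MD5 digest of the payload (MD5 digests are 16 bytes). The class InflightFileSketch and its methods are hypothetical.

```java
import java.nio.ByteBuffer;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: [16-byte MD5][txnID][count][ptr...][txnID][count][ptr...]...
class InflightFileSketch {
    static byte[] serialize(Map<Long, List<Long>> inflight)
            throws NoSuchAlgorithmException {
        int longs = 0;
        for (List<Long> ptrs : inflight.values()) longs += 2 + ptrs.size();
        ByteBuffer payload = ByteBuffer.allocate(longs * Long.BYTES);
        for (Map.Entry<Long, List<Long>> e : inflight.entrySet()) {
            payload.putLong(e.getKey());                        // transactionID
            payload.putLong(e.getValue().size());               // eventsCount
            for (long ptr : e.getValue()) payload.putLong(ptr); // eventPointers
        }
        byte[] body = payload.array();
        byte[] checksum = MessageDigest.getInstance("MD5").digest(body); // 16 bytes
        ByteBuffer out = ByteBuffer.allocate(checksum.length + body.length);
        out.put(checksum).put(body);
        return out.array();
    }

    static Map<Long, List<Long>> deserialize(byte[] file)
            throws NoSuchAlgorithmException {
        byte[] checksum = Arrays.copyOfRange(file, 0, 16);
        byte[] body = Arrays.copyOfRange(file, 16, file.length);
        byte[] actual = MessageDigest.getInstance("MD5").digest(body);
        if (!MessageDigest.isEqual(checksum, actual)) {
            throw new IllegalStateException("inflight file checksum mismatch");
        }
        ByteBuffer in = ByteBuffer.wrap(body);
        Map<Long, List<Long>> result = new LinkedHashMap<>();
        while (in.hasRemaining()) {
            long txnID = in.getLong();
            int count = (int) in.getLong();
            Long[] ptrs = new Long[count];
            for (int i = 0; i < count; i++) ptrs[i] = in.getLong();
            result.put(txnID, List.of(ptrs));
        }
        return result;
    }
}
```

The checksum is what lets replay reject a half-written inflight file instead of resurrecting garbage transactions; with two transactions holding 2 and 1 pointers, the file is 16 + 7 × 8 = 72 bytes.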
The FileChannel implementation is fairly involved; that is all for now, and I will dig into the details when the need arises.
http://blog.csdn.net/qianshangding0708/article/details/48133033