1. 程式人生 > >Flume - FileChannel原始碼詳解

Flume - FileChannel原始碼詳解

FileChannel在Flume是一個非常重要的Channel,FileChannel可以很好的保證資料的完整性和一致性,提供了類似mysql binlog的機制,保證機器down機,JVM異常退出時資料不丟失,在採集資料量很大的情況下,建議FileChannel設定的目錄和程式日誌檔案儲存的目錄設成不同的磁碟,以便提高效率。

FileChannel的簡易類結構:



FileChannel的內部事務類,FileBackedTransaction:


檔案操作類:LogFile(LogFileV2在1.7已經被捨棄):


還有其他幾個比較重要的類:

FlumeEventQueue,LogFile,Log,LogUtils。


一,初始化過程:public void configure(Context context)

1,useDualCheckpoints(是否需要備份檢查點)

2,compressBackupCheckpoint(是否壓縮備份節點)

3,checkpointDir(檢查點目錄,預設在${user.home}目錄下)

4,dataDirs(資料節點目錄)

5,capacity(獲取配置的容量)

6,keepAlive(超時時間,就是如果channel中沒有資料最長等待時間)

7,transactionCapacity(事務的最大容量)

注意:capacity的值一定要大於transactionCapacity,不然會報錯,看原始碼:

[java] view plain copy
  1. Preconditions.checkState(transactionCapacity <= capacity,  
  2.       "File Channel transaction capacity cannot be greater than the " +  
  3.         "capacity of the channel.");  
8, checkpointInterval(log的檢查間隔

9,maxFileSize(最大檔案的大小,預設是1.5G)

10,minimumRequiredSpace(最少需要多少空間,預設是500M)

11,useLogReplayV1(使用舊重放邏輯)

12,useFastReplay(不使用佇列重放)

13,keyProvider(KEY供應商的型別,支援的型別:JCEKSFILE)

14,activeKey(用於加密新資料的金鑰名稱)

15,cipherProvider(加密提供程式型別,支援的型別:AESCTRNOPADDING)


二,start()方法:

[java] view plain copy
  1. @Override  
  2.   public synchronized void start() {  
  3.     LOG.info("Starting {}..."this);  
  4.     try {  
  5.       Builder builder = new Log.Builder();  
  6.       builder.setCheckpointInterval(checkpointInterval);  
  7.       builder.setMaxFileSize(maxFileSize);  
  8.       builder.setMinimumRequiredSpace(minimumRequiredSpace);  
  9.       builder.setQueueSize(capacity);  
  10.       builder.setCheckpointDir(checkpointDir);  
  11.       builder.setLogDirs(dataDirs);  
  12.       builder.setChannelName(getName());  
  13.       builder.setUseLogReplayV1(useLogReplayV1);  
  14.       builder.setUseFastReplay(useFastReplay);  
  15.       builder.setEncryptionKeyProvider(encryptionKeyProvider);  
  16.       builder.setEncryptionKeyAlias(encryptionActiveKey);  
  17.       builder.setEncryptionCipherProvider(encryptionCipherProvider);  
  18.       builder.setUseDualCheckpoints(useDualCheckpoints);  
  19.       builder.setCompressBackupCheckpoint(compressBackupCheckpoint);  
  20.       builder.setBackupCheckpointDir(backupCheckpointDir);  
  21.       builder.setFsyncPerTransaction(fsyncPerTransaction);  
  22.       builder.setFsyncInterval(fsyncInterval);  
  23.       builder.setCheckpointOnClose(checkpointOnClose);//以上是將configure方法獲取到的引數,set到Builder物件  
  24.       log = builder.build();  
  25.       //builder.build();方法通過Builder建立Log物件  
  26.       //並且嘗試獲取checkpointDir和dataDir檔案鎖,Log類中的private void lock(File dir) throws IOException方法就是用來嘗試過去鎖的  
  27.       log.replay();  
  28.       //1,首先獲取到checkpointDir的寫鎖  
  29.       //2,獲取最大的fileID  
  30.       //3,讀取log檔案根據record的型別進行相應的操作,進行恢復;遍歷所有的data目錄  
  31.       //4,將queue重新整理到相關檔案  
  32.       open = true;//表示開啟channel   
  33.       int depth = getDepth();  
  34.         
  35.       Preconditions.checkState(queueRemaining.tryAcquire(depth),  
  36.           "Unable to acquire " + depth + " permits " + channelNameDescriptor);  
  37.       LOG.info("Queue Size after replay: " + depth + " "  
  38.            + channelNameDescriptor);  
  39.     } catch (Throwable t) {  
  40.       open = false;  
  41.       startupError = t;  
  42.       LOG.error("Failed to start the file channel " + channelNameDescriptor, t);  
  43.       if (t instanceof Error) {  
  44.         throw (Error) t;  
  45.       }  
  46.     }  
  47.     if (open) {  
  48.     //計數器開始統計  
  49.       channelCounter.start();  
  50.       channelCounter.setChannelSize(getDepth());  
  51.       channelCounter.setChannelCapacity(capacity);  
  52.     }  
  53.     super.start();  
  54.   }  
org.apache.flume.channel.file.Log類用來將Event寫入磁碟並將指向這些event的指標存入一個記憶體佇列FlumeEventQueue中。並且啟動一個執行緒,每過checkpointInterval毫秒寫一次檢查點log.writeCheckpoint()。 [java] view plain copy
  1. workerExecutor.scheduleWithFixedDelay(new BackgroundWorker(this),  
  2.         this.checkpointInterval, this.checkpointInterval,  
  3.         TimeUnit.MILLISECONDS);  

[java] view plain copy
  1. static class BackgroundWorker implements Runnable {  
  2.   private static final Logger LOG = LoggerFactory  
  3.       .getLogger(BackgroundWorker.class);  
  4.   private final Log log;  
  5.   
  6.   public BackgroundWorker(Log log) {  
  7.     this.log = log;  
  8.   }  
  9.   
  10.   @Override  
  11.   public void run() {  
  12.     try {  
  13.       if (log.open) {  
  14.         log.writeCheckpoint();  
  15.         //將checpoint、inflightTakes、inflightPuts都重新整理至磁碟,先後將inflightPuts、inflightTakes、checkpoint.meta重建,  
  16.         //更新checkpoint檔案並重新整理至磁碟,這些檔案都在checkpointDir目錄下;更新log-ID.meta檔案;同時肩負起刪除log檔案及其對應的meta檔案的責任。  
  17.       }  
  18.     } catch (IOException e) {  
  19.       LOG.error("Error doing checkpoint", e);  
  20.     } catch (Throwable e) {  
  21.       LOG.error("General error in checkpoint worker", e);  
  22.     }  
  23.   }  
  24. }  

三,事務

很多方法和Memory的事務類相似。如:doTake(),doCommit(),doRollback(),doPut()

下面詳細的介紹這幾個方法。

1,doPut():source會呼叫put方法

[java] view plain copy
  1. @Override  
  2.     protected void doPut(Event event) throws InterruptedException {  
  3.       channelCounter.incrementEventPutAttemptCount();  
  4.       if(putList.remainingCapacity() == 0) {//是否有剩餘空間  
  5.         throw new ChannelException("Put queue for FileBackedTransaction " +  
  6.             "of capacity " + putList.size() + " full, consider " +  
  7.             "committing more frequently, increasing capacity or " +  
  8.             "increasing thread count. " + channelNameDescriptor);  
  9.       }  
  10.       // this does not need to be in the critical section as it does not  
  11.       // modify the structure of the log or queue.  
  12.       if(!queueRemaining.tryAcquire(keepAlive, TimeUnit.SECONDS)) {//嘗試等待  
  13.         throw new ChannelFullException("The channel has reached it's capacity. "  
  14.             + "This might be the result of a sink on the channel having too "  
  15.             + "low of batch size, a downstream system running slower than "  
  16.             + "normal, or that the channel capacity is just too low. "  
  17.             + channelNameDescriptor);  
  18.       }  
  19.       boolean success = false;  
  20.       log.lockShared();//獲取checkpoint的讀鎖,doTake()方法也會獲取讀鎖,所以doTake和doPut只能操作一個,無法同時操作。  
  21.       try {  
  22.        //transactionID是在TransactionIDOracle類中遞增的  
  23.         FlumeEventPointer ptr = log.put(transactionID, event);//將Event寫入資料檔案,使用RandomAccessFile。資料會快取到inflightputs檔案中  
  24.         Preconditions.checkState(putList.offer(ptr), "putList offer failed "  
  25.           + channelNameDescriptor);  
  26.         queue.addWithoutCommit(ptr, transactionID);//指標和事務ID加入到queue佇列中。  
  27.         success = true;  
  28.       } catch (IOException e) {  
  29.         throw new ChannelException("Put failed due to IO error "  
  30.                 + channelNameDescriptor, e);  
  31.       } finally {  
  32.         log.unlockShared();//釋放讀鎖  
  33.         if(!success) {  
  34.           // release slot obtained in the case  
  35.           // the put fails for any reason  
  36.           queueRemaining.release();//釋放訊號量  
  37.         }  
  38.       }  
  39.     }  

2,doTake():sink會呼叫put方法 [java] view plain copy
  1. <pre name="code" class="java">    protected Event doTake() throws InterruptedException {  
  2.       channelCounter.incrementEventTakeAttemptCount();  
  3.       if(takeList.remainingCapacity() == 0) {  
  4.         throw new ChannelException("Take list for FileBackedTransaction, capacity " +  
  5.             takeList.size() + " full, consider committing more frequently, " +  
  6.             "increasing capacity, or increasing thread count. "  
  7.                + channelNameDescriptor);  
  8.       }  
  9.       log.lockShared();//獲取鎖  
  10.       /* 
  11.        * 1. Take an event which is in the queue. 
  12.        * 2. If getting that event does not throw NoopRecordException, 
  13.        *    then return it. 
  14.        * 3. Else try to retrieve the next event from the queue 
  15.        * 4. Repeat 2 and 3 until queue is empty or an event is returned. 
  16.        */  
  17.   
  18.       try {  
  19.         while (true) {  
  20.           FlumeEventPointer ptr = queue.removeHead(transactionID);//獲取檔案指標,ptr的資料結構是fileID和offset  
  21.           if (ptr == null) {  
  22.             return null;  
  23.           } else {  
  24.             try {  
  25.               // first add to takeList so that if write to disk  
  26.               // fails rollback actually does it's work  
  27.               Preconditions.checkState(takeList.offer(ptr),  
  28.                 "takeList offer failed "  
  29.                   + channelNameDescriptor);  
  30.               log.take(transactionID, ptr); // write take to disk  
  31.               Event event = log.get(ptr);//根據檔案指標,使用log物件在磁碟中獲取到Event。資料會快取到inflighttakes檔案中  
  32.               return event;  
  33.             } catch (IOException e) {  
  34.               throw new ChannelException("Take failed due to IO error "  
  35.                 + channelNameDescriptor, e);  
  36.             } catch (NoopRecordException e) {  
  37.               LOG.warn("Corrupt record replaced by File Channel Integrity " +  
  38.                 "tool found. Will retrieve next event", e);  
  39.               takeList.remove(ptr);  
  40.             } catch (CorruptEventException ex) {  
  41.               if (fsyncPerTransaction) {  
  42.                 throw new ChannelException(ex);  
  43.               }  
  44.               LOG.warn("Corrupt record found. Event will be " +  
  45.                 "skipped, and next event will be read.", ex);  
  46.               takeList.remove(ptr);  
  47.             }  
  48.           }  
  49.         }  
  50.       } finally {  
  51.         log.unlockShared();//釋放鎖  
  52.       }  
  53.     }  
3,doCommit(): source和sink都會呼叫該方法提交事務 [java] view plain copy
  1. @Override  
  2. protected void doCommit() throws InterruptedException {  
  3.   int puts = putList.size();  
  4.   int takes = takeList.size();  
  5.   if(puts > 0) {//puts和takes不能同時都>0,其中有一個得是等於零  
  6.     Preconditions.checkState(takes == 0"nonzero puts and takes "  
  7.             + channelNameDescriptor);  
  8.     log.lockShared();//獲取鎖  
  9.     try {  
  10.       log.commitPut(transactionID);//該操作會封裝成一個ByteBuffer型別寫入到檔案,  
  11.       channelCounter.addToEventPutSuccessCount(puts);  
  12.       synchronized (queue) {  
  13.         while(!putList.isEmpty()) {  
  14.           if(!queue.addTail(putList.removeFirst())) {  
  15.             StringBuilder msg = new StringBuilder();  
  16.             msg.append("Queue add failed, this shouldn't be able to ");  
  17.             msg.append("happen. A portion of the transaction has been ");  
  18.             msg.append("added to the queue but the remaining portion ");  
  19.             msg.append("cannot be added. Those messages will be consumed ");  
  20.             msg.append("despite this transaction failing. Please report.");  
  21.             msg.append(channelNameDescriptor);  
  22.             LOG.error(msg.toString());  
  23.             Preconditions.checkState(false, msg.toString());  
  24.           }  
  25.         }  
  26.         queue.completeTransaction(transactionID);//清空checkpoint資料夾中inflightputs和inflighttakes檔案的內容  
  27.       }  
  28.     } catch (IOException e) {  
  29.       throw new ChannelException("Commit failed due to IO error "  
  30.               + channelNameDescriptor, e);  
  31.     } finally {  
  32.       log.unlockShared();//釋放鎖  
  33.     }  
  34.   
  35.   } else if (takes > 0) {  
  36.     log.lockShared();//釋放鎖  
  37.     try {  
  38.       log.commitTake(transactionID);//寫入data檔案  
  39.       queue.completeTransaction(transactionID);//和上面操作一樣  
  40.       channelCounter.addToEventTakeSuccessCount(takes);  
  41.     } catch (IOException e) {  
  42.       throw new ChannelException("Commit failed due to IO error "  
  43.           + channelNameDescriptor, e);  
  44.     } finally {  
  45.       log.unlockShared();  
  46.     }  
  47.     queueRemaining.release(takes);  
  48.   }  
  49.   putList.clear();  
  50.   takeList.clear();//清空兩個佇列  
  51.   channelCounter.setChannelSize(queue.getSize());  
  52. }  

4,doRollback():source和sink都會呼叫該方法回滾資料

[java] view plain copy
  1. @Override  
  2. protected void doRollback() throws InterruptedException {  
  3.   int puts = putList.size();  
  4.   int takes = takeList.size();  
  5.   log.lockShared();  
  6.   try {  
  7.     if(takes > 0) {  
  8.       Preconditions.checkState(puts == 0"nonzero puts and takes "  
  9.           + channelNameDescriptor);  
  10.       synchronized (queue) {  
  11.         while (!takeList.isEmpty()) {  
  12.           Preconditions.checkState(queue.addHead(takeList.removeLast()),  
  13.               "Queue add failed, this shouldn't be able to happen "  
  14.                   + channelNameDescriptor);  
  15.         }  
  16.       }  
  17.     }  
  18.     putList.clear();  
  19.     takeList.clear();  
  20.     queue.completeTransaction(transactionID);  
  21.     channelCounter.setChannelSize(queue.getSize());  
  22.     log.rollback(transactionID);//也是封裝成ByteBuffer,寫入到快取檔案中。  
  23.   } catch (IOException e) {  
  24.     throw new ChannelException("Commit failed due to IO error "  
  25.         + channelNameDescriptor, e);  
  26.   } finally {  
  27.     log.unlockShared();  
  28.     // since rollback is being called, puts will never make it on  
  29.     // to the queue and we need to be sure to release the resources  
  30.     queueRemaining.release(puts);  
  31.   }  
  32. }  

Flame的FileChannel在系統崩潰的時候保證資料的完整性和一致性,其實是通過JDK的位元組通道實現的(java.nio.channels.FileChannel),位元組通道為了保證資料在系統崩潰之後不丟失資料,檔案的修改模式會被強制到底層儲存裝置。


最後看下Flume FileChannel的檔案結構:

checkpoint目錄:

checkpoint:存放Event在那個data檔案logFileID的什麼位置offset等資訊。

inflighttakes:存放的是事務take的快取資料,每隔段時間就重建檔案。

內容:

1、16位元組是校驗碼;

2、transactionID1+eventsCount1+eventPointer11+eventPointer12+...;

3、transactionID2+eventsCount2+eventPointer21+eventPointer22+...

inflightputs:存放的是事務對應的put快取資料,每隔段時間就重建檔案。

內容:

1、16位元組是校驗碼;

2、transactionID1+eventsCount1+eventPointer11+eventPointer12+...;

3、transactionID2+eventsCount2+eventPointer21+eventPointer22+...

checkpoint.meta:主要儲存的是logfileID及對應event的數量等資訊。

data目錄:

log-ID.meta:主要記錄log-ID下一個寫入位置以及logWriteOrderID等資訊。

log-ID:資料檔案,目錄裡資料檔案保持不超過2個。


FileChannel實現比較複雜,先寫這麼多,以後有需要細細瞭解。


http://blog.csdn.net/qianshangding0708/article/details/48133033