【Flume】【原始碼分析】flumeng的事務控制的原理詳解【記憶體通道memory channel】
一開始我也是以為flume ng的事務控制是在sink端的,因為只看到那裡有事務的使用,但是今天看了一下fluem的整個事務控制,我才後知後覺,特此寫了這篇文章,望各位不吝指教。
先來一張圖吧!!!
從圖中可以看出,flume的事務控制在source端和sink端都有,具體的事務是依賴於通道的。這裡將的事務和檔案通道中的事務控制有個小區別【檔案通道中的事務是記錄在磁碟上】
1、獲取事務
Transaction transaction = channel.getTransaction();
方法定義:
內部呼叫createTransaction();方法,具體定義如下【看記憶體通道】:public Transaction getTransaction() { if (!initialized) { synchronized (this) { if (!initialized) { initialize(); initialized = true; } } } BasicTransactionSemantics transaction = currentTransaction.get(); if (transaction == null || transaction.getState().equals( BasicTransactionSemantics.State.CLOSED)) { transaction = createTransaction(); currentTransaction.set(transaction); } return transaction; }
protected BasicTransactionSemantics createTransaction() {
return new MemoryTransaction(transCapacity, channelCounter);
}
事務初始化了三個變數,分別是:事件放入列表【一次事務中可以放入的event數量】,事件取出列表【一次事務中可以取走的event數量】,通道監控度量資料public MemoryTransaction(int transCapacity, ChannelCounter counter) { putList = new LinkedBlockingDeque<Event>(transCapacity); takeList = new LinkedBlockingDeque<Event>(transCapacity); channelCounter = counter; }
transCapacity就是我們配置的事務容量,也就是一次事務中最多可以容下多少個event
事務,以及事務中的變數都定義好了,下面就是事務中具體的方法定義了:
以下只講述記憶體通道的相關方法定義:【關於檔案通道的講解】
1、doPut
putList放入一個event,代表一個event已經納入到事務中了;這個put的操作肯定是由source端發起的,看個例子:【關於ExexSource的原始碼分析】
這是channelprocessor的方法,迴圈遍歷reqChannelQueue這個map物件,對立面每個通道對應的批量event進行put操作,納入事務的過程中。for (Channel reqChannel : reqChannelQueue.keySet()) { Transaction tx = reqChannel.getTransaction(); Preconditions.checkNotNull(tx, "Transaction object must not be null"); try { tx.begin(); List<Event> batch = reqChannelQueue.get(reqChannel); for (Event event : batch) { reqChannel.put(event); } tx.commit(); } catch (Throwable t) { tx.rollback(); if (t instanceof Error) { LOG.error("Error while writing to required channel: " + reqChannel, t); throw (Error) t; } else { throw new ChannelException("Unable to put batch on required " + "channel: " + reqChannel, t); } } finally { if (tx != null) { tx.close(); } }
event在放入eventQueue是通過list的add方法,所以放在列表尾部
2、doTake
記憶體通道中有個變數
queue——LinkedBlockingDeque
在通道初始化的時候,初始化該變量了
synchronized(queueLock) {
queue = new LinkedBlockingDeque<Event>(capacity);
queueRemaining = new Semaphore(capacity);
queueStored = new Semaphore(0);
}
分別記錄的通道的總容量、剩餘空閒容量、佔用容量
take的時候首先從queue中取出隊頭event,前面第一步說了放入的時候是放在尾部,現在從頭部取,保證先入先出;這裡是從總體的通道容量中取出一個,還需要操作一下takeList,將其納入事務中,takeList中也put一次
3、doCommit
synchronized(queueLock) {
if(puts > 0 ) {
while(!putList.isEmpty()) {
if(!queue.offer(putList.removeFirst())) {
throw new RuntimeException("Queue add failed, this shouldn't be able to happen");
}
}
}
putList.clear();
takeList.clear();
}
offer——放隊尾【從putList中取頭一個放】,一直迴圈到putList取完了
這裡commit都做完了,說明doTake肯定沒問題的,所以takeList要清空了
4、doRollback
synchronized(queueLock) {
Preconditions.checkState(queue.remainingCapacity() >= takeList.size(), "Not enough space in memory channel " +
"queue to rollback takes. This should never happen, please report");
while(!takeList.isEmpty()) {
queue.addFirst(takeList.removeLast());
}
putList.clear();
}
回滾,肯定回滾的是sink寫出失敗的event,在takeList中,走到這個方法的時候,doCommit肯定是沒做的,只有發生異常了,才會到回滾,所以takeList並未clear。
這裡將takeList中的最後一個元素,迴圈取出放回通道佇列的第一個【因為doTake會從頭取,當然要把剛剛失敗的event繼續放到頭部,下次繼續操作這些event】
同理,這裡rollback了,說明putList肯定也沒用了,清空,重新再放
無論是往通道中放event還是從通道中取event,都有一個超時控制
// this does not need to be in the critical section as it does not
// modify the structure of the log or queue.
if(!queueRemaining.tryAcquire(keepAlive, TimeUnit.SECONDS)) {
throw new ChannelFullException("The channel has reached it's capacity. "
+ "This might be the result of a sink on the channel having too "
+ "low of batch size, a downstream system running slower than "
+ "normal, or that the channel capacity is just too low. "
+ channelNameDescriptor);
}
這個keepAlive就是我們配置的超時時間,超時則會丟擲異常!以上所有操作方法中,大家看到有一些變數XXXCounter的操作,這些都是監控度量的資料,詳見【flume中度量監控的分析】