HBase的put流程原始碼分析
阿新 • • 發佈:2019-01-24
hbase是一個nosql型資料庫,本文我們會分析一下客戶的資料是通過什麼樣的路徑寫入到hbase的。
HBase作為一種列族資料庫,其將相關性較高的列聚合成一個列族單元,不同的列族單元物理上儲存在不同的檔案(HFile)內。一個表的資料會水平切割成不同的region分佈在叢集中不同的regionserver上。客戶端訪問叢集時會首先得到該表的region在叢集中的分佈,之後的資料交換由客戶端和regionserver間通過rpc通訊實現,下面我們從hbase原始碼裡探究客戶端put資料的流程。本文參考的原始碼是1.1.2版本的hbase
1)客戶端
put在客戶端的操作主要分為三個步驟,下面分別從三個步驟展開解釋:
(一)、客戶端快取使用者提交的put請求
get/delete/put/append/increment等等等等客戶可用的函式都在客戶端的HTable.java檔案中。 在HTable.java檔案中有如下的兩個變數: private RpcRetryingCallerFactory rpcCallerFactory; private RpcControllerFactory rpcControllerFactory; protected AsyncProcess multiAp; 如上的幾個變數分別定義了rpc呼叫的工廠和一個非同步處理的程序 客戶端的put請求呼叫getBufferedMutator().mutate(put),進入mutate這個函式可以看到它會把使用者提交的此次put操作放入到列表writeAsyncBuffer中,當buffer中的資料超過規定值時,由後臺程序進行提交。(二)、將writeBuffer中的put操作根據region的不同進行分組,分別放入不同的Map集合程序提交由函式backgroudFlushCommits完成,提交動作包含同步提交和非同步提交兩種情況,由傳入的引數boolean控制。進入上述函式分析。
可見當傳入backgroudFlushCommits的引數為false時執行的是非同步提交,引數為true時執行的是同步提交。與此同時,可以發現無論非同步提交還是同步提交,實際的提交動作是由AsyncProcess ap執行的,呼叫的語句如下: ap.submit(tableName,writeAsyncBuffer,true,null,false)
從sendMultiAction函式中一步步向裡檢視程式碼,其將使用者的action請求通過getNewMultiActionRunnable、SingleServerRequestRunnable層層呼叫最終落到了hbase的RPC框架中,每個使用者請求包裝成包裝MultiServerCallable物件,其是一個Runnable物件,在該物件中使用者請求與服務端建立起RPC聯絡。所有的runnable物件最終交到AsyncProcess物件的內部執行緒池中處理執行。2)服務端
客戶端MultiServerCallable的call方法中呼叫了服務端的multi函式執行提交動作,進入服務端。multi方法內部會根據請求是否是原子請求,執行不同的操作語句,這裡我們以非原子性提交為例,其執行了doNonAtomicRegionMutation()函式,這個函式中先進行一些rpc請求的編碼,將編碼後的action相關資訊組織到一個List<ClientProtos.Action>型別的變數mutations中,這裡的編碼採用的proto buffer的編碼方案,然後呼叫doBatchOp()語句,其接受了mutations作為引數。 在doBatchOp函式中,可以看到其最終呼叫的batchMutate執行的批量操作,這裡操作的結果會返回到OperationStatus型別的變數codes[]中,包括了以下幾種狀態:BAD_FAMILY;SANITY_CHECK_FAILURE;SUCCESS等狀態。 這些狀態記錄了每個action的執行結果,包括成功啦、失敗啦等等。就一步地這些請求被包裝成一個MutationBatch型別的物件傳入batchMutate,batchMutatue首先判斷一下資源的狀態,然後呼叫doMiniBatchMutation()執行最終的put操作,該操作返回的是寫入資料的大小addedSize,根據addedSize計算此時memstore的size以決定是否flush,如果達到了flush的要求,執行requestFlush()。doMiniBatchMutation接受了MutationBatch型別的物件繼續作為其引數。關鍵程式碼如下所示:
- while (!batchOp.isDone()) { //操作未完成前一直迴圈
- if (!batchOp.isInReplay()) {
- checkReadOnly(); //判斷是否是隻讀狀態
- }
- checkResources(); //檢查相關資源
- if (!initialized) {
- this.writeRequestsCount.add(batchOp.operations.length); //更新寫請求計數器
- if (!batchOp.isInReplay()) {
- doPreMutationHook(batchOp);
- }
- initialized = true;
- }
- long addedSize = doMiniBatchMutation(batchOp); //最終的put操作是落在這裡的
- long newSize = this.addAndGetGlobalMemstoreSize(addedSize); //以原子操作的方式增加Region上的MemStore記憶體的大小
- if (isFlushSize(newSize)) { //判斷memstore的大小是否達到閾值,決定是否flush
- requestFlush();
- }
- }
然後,就是將batch中的資料寫入到各個store的memstore中,並根據batch中的資料構建WAL edit。 構造WAL edit之後,將該條資料對應的table name、region info、cluster id等等包裝成一個HLogKey結構的物件,該物件即為walkey,將walKey和WAL edit共同組裝成一個entry之後將之append到記憶體中的ringbuffer資料結構中。 注意的是這次的append操作產生一個HLog範圍內的id,記作txid。txid用於標識這次寫事務寫入的HLog日誌。 寫入buffer後,即釋放所有的行鎖,兩階段鎖過程結束。然後在3153行 syncOrDefer(txid,durability) 將這次事務的日誌持久化到hfs中,一旦持久化完成便提交此次事務,程式碼在3170行,其呼叫了completeMemstoreInsertWithSeqNum(),走進這個函式會發現其在寫入mvccnum之後,呼叫了waitForPreviousTransactoinsComplete()函式,這個函式實際是推進了mvcc的memstoreRead,推進的思路如下: 先鎖上writeQueue佇列,然後一個一個看,找連續的已完成的WriteEntry,最後一個WriteEntry的writeNumber即是最新的點,此時可以賦值給mvcc.memstoreRead,後續讀事務一開始就去拿mvcc.memstoreRead,從而能夠拿到本次寫入的資料。 這裡要補充一句,此時寫入的資料儲存在memstore中,並沒有持久化到hdfs中,記憶體中的key-value是以skip list的資料結構儲存的。
總結上面hbase的寫路徑可以發現在hbase的寫入過程中應用到了如下的一些技術:
首先,客戶端的rpc請求傳遞到服務端時,函式AsyncRequestFutureImpl()是一個Lazy優化,或者說是一個非同步的優化,雖然函式聲明瞭一個對服務端的rpc呼叫,但是它並沒有馬上呼叫服務端,而是在需要時才真正呼叫服務端。
第二,資料提交時採用了group commit技術,理解group commit可以用挖煤做比喻,是一鏟子一鏟子挖比較快,還是一次挖出一車比較省力。
第三,MVCC即多版本併發控制
限於篇幅和本人的知識有限,以上所說的只是簡單描述了hbase的寫事務的主幹路徑,並簡要指出了其中的關鍵技術點,此外還有冪等控制、回滾操作、錯誤處理以及寫入執行緒模型等等等等,即便是提到的mvcc、group commit也只是蜻蜓點水,如果展開還有很多很精彩的內容值得大家研究,如果你也對hbase感興趣,歡迎與我一起討論,共同提高。
參考資料:http://www.cnblogs.com/foxmailed/p/3897884.html