1. 程式人生 > >HBase之Table.put客戶端流程

HBase之Table.put客戶端流程

you 如果 rim asto 過大 pcre first scanner 都沒有

  首先,讓我們從HTable.put方法開始。由於這一節有很多方法只是簡單的參數傳遞,我就簡單略過,但是,關鍵的方法我還是會截圖講解,所以希望大家盡可能對照源碼進行流程分析。另外,在這一節,我單單介紹put操作在客戶端的流程,畢竟,這個內容已經很多了。至於具體服務端的流程,我會在後面的章節中介紹到,歡迎大家到時候閱讀。   由於這一節的方法還是比較復雜的,我特地畫了一張思維導圖,大家可以先通過思維導圖來對本節的內容有一個大概的了解,置於具體的流程,我在下面將對照源碼的貼圖一一為大家講解(在這裏聲明一點,我在這一節只介紹單個put操作的流程,至於put批處理,大家有興趣可以自己研究一下)。技術分享圖片   首先,讓我們來到HTable.put方法,如下圖所示:技術分享圖片
  這裏我先講一下這一節的最後調用流程,也同時讓大家明確一下在本節我著重要講解的流程是哪塊。在上圖中我已經表示出來了,後面方法的調用最後調用到了上面新創建的ClientServiceCallable中覆寫的rpcCall方法,也就是調用到了ClientServiceCallable.doMutate。關於這個方法中具體與服務端的交互流程在本節我就略過,但是,在後面的內容中,我會談到類似的情況,如果大家感興趣的話,可以繼續後面的內容。技術分享圖片  接下來讓我們回到本節的重點。首先是RpcRetryingCallerFactory.newCaller方法的調用,該方法使用RpcRetryingCallerFactory的成員參數創建了RpcRetryingCaller,用於後面對於RetryingCallable的調用(該方法在後面也會多次調用,在後面我就不貼圖了)。技術分享圖片
  接下來讓我們來到RpcRetryingCallerImpl.callWithRetries。這個方法是本節中最為重要的方法,在後面也會多次用到。方法雖然比較長,但大多是異常的情況的解決,在本節中我們就單單介紹callable.prepare與callable.call兩個方法。至於interceptor.intercept,由於在構造RpcRetryingCallerFactory時默認的interceptor類型為RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR,在本節並不會有其它影響,所以我們暫時不需要關註。技術分享圖片  上面的方法調用的callable具體類型為覆寫了rpcCall方法的ClientServiceCallable,下面讓我們來到ClientServiceCallable類的內部。ClientServiceCallable繼承自RegionServerCallable,因此,上面方法實際調用的是RegionServerCallable.prepare與RegionServerCallable.call。技術分享圖片
  首先讓我們來到RegionServerCallable.prepare方法。這裏比較重要的方法我已經框選出來了。需要大家特別留意的是最後的setStubByServiceName,一是因為他比較重要,二是我在後面的內容才會介紹,大家到時候可能忘記了,所以在這裏特別提醒一下大家。   容易看到,首先調用了connection.getRegionLocator獲得一個新構建的HRegionLocator(這裏就不截圖了,因為實在是沒有什麽內容需要講),不過大家需要註意的是,這裏的tableName是我們實際要查詢到tableName,而後面會用到META_TABLE_NAME,容易混淆,我在這裏簡單提一下。接下來調用了HRegionLocator.getRegionLocation。技術分享圖片  在調用HRegionLocator.getRegionLocation時,這裏會有一系列簡單方法的調用,由於在上面的導圖中我並沒有畫出,在這裏我就一一貼圖描述。技術分享圖片技術分享圖片技術分享圖片技術分享圖片 技術分享圖片技術分享圖片技術分享圖片  一系列方法走下來,到這裏就到了比較重要的方法。由於這個是長圖,沒有辦法框選除重點,我就在文字中一一介紹該方法中調用的比較好重要的方法。   1.getCachedLocation,該方法簡介調用到了metaCache.getCachedLocation,但此時,由於我們是第一次調用該表的信息,並沒有放到緩存中,因此,這裏返回的locations = null。   2.然後我們來到RegionInfo.createRegionName,需要註意的是,其入參row就是我們put操作創建的rowKey,也就是我們常說的行鍵。另外,在metaStartKey中傳入的id為HConstants.NINES(NINES = "99999999999999"),而在metaStopKey中傳入的id為空字符串。   3.接著構造了Scan。其中withStartRow與withStopRow中的inclusive入參都為true。將reversed設置為true,並且將catalog family設置為"info"(CATALOG_FAMILY_STR = "info")。大家可能註意到了,這裏的info列族在我們的表中並不一定存在。到了這裏,大家可能就猜到我在前面埋的伏筆了。沒錯,這裏構建的Scan是為了後面的查詢後面的META_TABLE_NAME做準備。   4.緊接著來到fro循環中,這裏連著調用了兩次getCachedLocation,後面的那次調用加了鎖,類似我們在單例設計模式中流程,加鎖以確保對象不會重復。   5.然後構建了ReversedClientScanner對象。(鑒於之前經驗,貼太多圖容易擾亂大家的思維,我在這裏盡量用文字來介紹)。ReversedClientScanner是ClientScanner的子類,另外,大家需要註意的是,在構造ReversedClientScanner時傳入的tableName為TableName.META_TABLE_NAME。在ReversedClientScanner的構造過程中,雖然有一些需要註意的地方,不過,我還是放在後面來描述,以便大家能夠更好的理解整個流程。   6.接下來調用了ReversedClientScanner.next,大家千萬不要小看這個方法,這個方法裏面的一系列調用時非常復雜的,也是本節的另外一個重點,我將在後面詳細介紹。   7.然後調用了MetaTableAccessor.getRegionLocations,其入參為ReversedClientScanner.next的返回值。這個方法的詳細流程也比較重要,同樣,我放到後面為大家講解。   8.最後調用了cacheLocation,也就是將當前tableName放到緩存中。技術分享圖片  上面,我將ConnectionImplementation.locateRegionInMeta方法中調用的各個流程都簡單介紹了一下,下面,我就選擇其中比較重要的方法來詳細描述。   首先讓我們來到ReversedClientScanner.next。這個方法調用了ClientScanner.nextWithSyncCache,如下圖所示:技術分享圖片  上圖框選的兩個方法都比較重要,讓我們首先介紹比較復雜的loadCache,如下圖所示。   看到這個方法大家可能比較慌,沒有關系,我會在這裏為大家一一介紹。   1.首先調用了moveToNextRegion。該方法首先調用closeScanner(其間首先調用了成員變量callable.setClose方法,然後調用了ClientScanner.call方法,這個方法我在後面也會提到,最後將當前成員變量callable中的值置為null,簡而言之,將成員變量callable.setClose置為null)。   然後構造了ScannerCallableWithReplicas並賦給成員變量callable。在構造ScannerCallableWithReplicas時需要註意的是其中創建了ReversedScannerCallable。也就是說ScannerCallableWithReplicas的成員變量currentScannerCallable為ReversedScannerCallable。順便提一下,ScannerCallableWithReplicas的成員變量scan為我們在上面構造的scan。   2.接著調用了ClientScanner.call方法。這裏的調用流程比較繁瑣。為了更清楚的解釋清楚loadCache方法,我們先跳過這裏,假設其中已經有了返回值。   3.然後調用了scanResultCache.addAndGet。簡單提示一下我們這裏的scanResultCache類型為CompleteScanResultCache。   4.然後將結果集中的內容遍歷放到成員變量cache中。這裏我們可以回過頭來看看上面的圖。上面圖中我框選了cache.poll方法。也就是說cache.poll將在loadCache方法中放入的結果集取出來。技術分享圖片 技術分享圖片  上面我提到過很多次ClientScanner.call方法,但是都沒有詳細描述,下面我就特意來講解該方法。其實這個方法很簡單,只是調用了方法RpcRetryingCaller.callWithoutRetries。這裏的caller是在ReversedClientScanner方法構造時創建的(上面只是提到說構造ReversedClientScanner有需要註意的地方,也就是這裏,其截圖我在上面也已經貼出來了)。技術分享圖片 技術分享圖片  接下來讓我們來到RpcRetryingCallerImpl.callWithoutRetries。這裏的入參callable我在上面的方法loadCache已經介紹過了。其類型為ScannerCallableWithReplicas。由於ScannerCallableWithReplicas.prepare方法為空實現,我在這裏就不貼圖了,接下來將重點放在ScannerCallableWithReplicas.call。技術分享圖片  讓我們來到ScannerCallableWithReplicas.call,如下圖所示。   1.在ClientScanner.closeScanner方法調用時,會走上面的if判斷。由於currentScannerCallable.closed的值為true。   2.由於默認的成員變量regionReplication,因此會調用RpcRetryingCallerWithReadReplicas.getRegionLocations。這個方法的調用與我們今天的主要流程並沒有什麽太多的聯系,因此,在這裏簡單略過。該方法我可能會放在後面的章節中講到。   3.構造了ResultBoundedCompletionService。這個方法比較重要,在後面的流程中我會反復講到。   4.調用了addCallsForCurrentReplica,將成員變量currentScannerCallable封裝到ScannerCallableWithReplicas.RetryingRPC中,並交由ResultBoundedCompletionService提交。   5.接著調用cs.poll,獲取其提交的任務的返回值。   後面我將詳細講解。技術分享圖片  首先來到ScannerCallableWithReplicas.addCallsForCurrentReplica方法。容易看到,將成員變量currentScannerCallable封裝到RetryingRPC中。然後調用了ResultBoundedCompletionService.submit。這裏著重提醒一下大家,這裏的currentScannerCallable類型為ReversedScannerCallable。技術分享圖片  接著讓我們來到ResultBoundedCompletionService.submit,如下圖所示。   這裏將傳入的RetryingRPC封裝到QueueingFuture,然後調用了executor.execute。由於QueueingFuture繼承自java.util.concurrent.RunnableFuture,也就是在調用executor.execute時,QueueingFuture.run方法會執行。技術分享圖片  接下來讓我們來到QueueingFuture。在下圖中,我框選出了其中比較重要的方法。   首先這裏調用了RpcRetryingCallerImpl.callWithRetries方法(由於這個方法我在上面已經提到過了,因此在這裏就不貼圖了)。重要的是其中的入參future類型為ScannerCallableWithReplicas.RetryingRPC。另外後面將當前QueueingFuture添加到ResultBoundedCompletionService成員變量completedTasks中。技術分享圖片  讓我們來到ScannerCallableWithReplicas.RetryingRPC.prepare方法。如下圖所示。大家可能對這裏的成員變量callable比較模糊了,大家可以往上翻到方法addCallsForCurrentReplica的描述,沒錯這裏的callable就是ScannerCallableWithReplicas的成員變量currentScannerCallable。而ScannerCallableWithReplicas.currentScannerCallable正是在構造ScannerCallableWithReplicas時傳入的ReversedScannerCallable。技術分享圖片  接下來讓我們來到ReversedScannerCallable.prepare。由於這是第一次調用prepare方法,因此其成員變量instantiated為false。這裏簡單提一下,這裏的getRow方法獲取的是我們調用put時的行鍵,也就是我們對於目標表的rowKey。由於這裏的tableName為TableName.META_TABLE_NAME,其rowKey在後面並沒有用到。   然後調用了ReversedScannerCallable.setStub方法。為成員變量stub的賦值。其值為getConnection().getClient(getLocation().getServerName())調用的返回值。技術分享圖片  讓我們來到ConnectionImplementation.getClient方法。看過我博文《HBase之HRegionServer啟動(含與HMaster交互)》的同學看到這裏可能就比較熟悉。 沒錯,這裏正是通過ClientProtos.ClientService.newBlockingStub構造了協議ClientProtos.ClientService的客戶端stub。關於與服務端交互的流程,我在《HBase之HRegionServer啟動(含與HMaster交互)》中已經具體介紹了,大家感興趣的可以去看一下,我們這裏來描述比較重要一個點。   就是computeIfAbsentEx的最後一個入參IOExceptionSupplier。他類似於java中的Supplier(類似的方法調用我在後面講解方法MetaTableAccessor.getRegionLocations)。技術分享圖片  在第一次調用時,我們的stubs中並沒有到該serverName的客戶端stub,因此調用了入參supplier的get方法。也就是我們上面看到的lambda表達式方法內容被調用。技術分享圖片  到這裏,ReversedScannerCallable.prepare方法就調用完成了。這個還有一個需要註意的點就是ReversedScannerCallable.prepare方法的最後將其成員變量instantiated置為true。   接下來讓我們來到ScannerCallableWithReplicas.RetryingRPC.call方法(這裏的callable類型為ReversedScannerCallable)。   這裏再次調用了RpcRetryingCallerImpl.callWithRetries,由於ReversedScannerCallable.prepare方法已經調用,並且其成員變量instantiated被置為true,所以上面描述的內容並不會再次調用(這裏框選的內容作為後面的伏筆)。技術分享圖片  也就是說,接下來應該調用的是ReversedScannerCallable.call。由於其並沒有call方法,因此,會一直調用到其父類RegionServerCallable.call。如下圖所示。這裏的rpcController類型為HBaseRpcControllerImpl。接下來調用了rpcCall方法。由於ReversedScannerCallable中並沒有rpcCall方法的實現,而在其父類ScannerCallable有實現rpcCall。技術分享圖片  接下來,讓我們來到ScannerCallable.rpcCall。由於默認的成員變量scannerId為-1,因此,會調用openScanner。由於openScanner方法僅僅是通過Client協議發送到服務端。關於rpc流程我在博客《hbase之RPC調用流程簡介》中已經介紹過了,感興趣的同學可以去看一下,那篇博文講的比較淺顯,我會在春節期間將那篇內容更新,大家可以關註我,到時候有更新大家也就收到通知了。   然後調用了ResponseConverter.getResults,將服務端的返回的ScanResponse轉換為Result。技術分享圖片  讓我們來到ResponseConverter.getResults。這個方法的主要作用是將CellScanner中Cell的或ScanResponse中的PB類型的results轉換為java類型的Result。至於該方法的詳細描述我要放到後面開設的第二章節,也就是HBase中客戶端協議各個操作中來講解,因為這裏流程是比較復雜的,要結合上服務端的流程才能講述清楚。所以這裏暫時略過。技術分享圖片  到這裏,一個完整的RpcRetryingCallerImpl.callWithRetries方法調用流程可以說是完結了。然後在ResultBoundedCompletionService.QueueingFuture.run方法的後面,將當前QueueingFuture添加到ResultBoundedCompletionService成員變量completedTasks中(雖然我在上面提到過,但這裏還是重述一下,以便我們後面的理解)。   而在我們本節描述的整體流程中,ScannerCallableWithReplicas.addCallsForCurrentReplica方法調用完結。   接下來讓我們來到ResultBoundedCompletionService.poll,由於其間接調用了ResultBoundedCompletionService.pollForSpecificCompletedTask,如下圖所由於在QueueingFuture.run方法的最後,將自身添加到了completedTasks。因此,上面的方法獲取的正是剛剛添加的QueueingFuture。接著調用了ResultBoundedCompletionService.QueueingFuture.get方法。如下圖所示。也就是說,這裏將result返回。這裏result的類型我們需要註意一下,以便後面在類型上面的理解。由於這裏QueueingFuture成員變量future的實際類型為ScannerCallableWithReplicas.RetryingRPC。大家可以往上翻到ScannerCallableWithReplicas.RetryingRPC.call,就可以發現,這裏的result是從ResponseConverter.getResults獲得的Result數組與成員變量callable封裝後的Pair對象。接著,將r.getFirst(),也就是實際獲得的結果返回。技術分享圖片   到這裏,大家可能以為要結束了,很遺憾,這裏只是到了ClientScanner.call方法的返回。   由於接下來的是兩個單獨的流程了。一個是MetaTableAccessor.getRegionLocations,另外一個是ConnectionImplementation.cacheLocation。至於這兩個流程之外的後續流程比較簡單,我就不一一敘述了,相信大家跟著源碼與我在前面的提示很容易就可以弄清楚了。而前面提到的那兩個單獨的流程我將放在後面的一節《HBase之Table.put客戶端流程(續)》中介紹。到時候歡迎大家閱讀。   大家可以關註我的博客,或者發送郵件到我的郵箱[email protected]來溝通交流大數據相關的知識。感謝大家的閱讀,如果覺得不錯,希望您可以點擊下面的推薦。

HBase之Table.put客戶端流程