1. 程式人生 > 其它 >hbase原始碼系列(十一)Put、Delete在服務端是如何處理?

hbase原始碼系列(十一)Put、Delete在服務端是如何處理?

在講完之後HFile和HLog之後,今天我想分享是Put在Region Server經歷些了什麼?相信前面看了《HTable探祕》的朋友都會有印象,沒看過的建議回去先看看,Put是通過MultiServerCallable來提交的多個Put,好,我們就先去這個類吧,在call方法裡面,我們找到了這句。

responseProto = getStub().multi(controller, requestProto);

它呼叫了Region Server的multi方法。好,我們立即殺到HRegionServer去,搜尋找到multi這個方法。

public MultiResponse multi(final RpcController rpcc, final MultiRequest request)
  throws ServiceException {
    // RpcController是屬於後門的,這樣返回的資料就不用序列化了
    PayloadCarryingRpcController controller = (PayloadCarryingRpcController)rpcc;
    CellScanner cellScanner = controller != null? controller.cellScanner(): null;
    if (controller != null) controller.setCellScanner(null);
    List<CellScannable> cellsToReturn = null;
     MultiResponse.Builder responseBuilder = MultiResponse.newBuilder();
     //取出來所有的Action
     for (RegionAction regionAction : request.getRegionActionList()) {
       this.requestCount.add(regionAction.getActionCount());
       RegionActionResult.Builder regionActionResultBuilder = RegionActionResult.newBuilder();
       HRegion region;
       try {
         //獲取對應的HRegion
         region = getRegion(regionAction.getRegion());
       } catch (IOException e) {
         responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
         continue;  // 報告這個action有錯
       }

       if (regionAction.hasAtomic() && regionAction.getAtomic()) {
         try {
          //如果是原子操作,就走原子操作的通道
           mutateRows(region, regionAction.getActionList(), cellScanner);
         } catch (IOException e) {
           regionActionResultBuilder.setException(ResponseConverter.buildException(e));
         }
       } else {
         // 非原子性提交,把錯誤內部處理了
         cellsToReturn = doNonAtomicRegionMutation(region, regionAction, cellScanner,
             regionActionResultBuilder, cellsToReturn);
       }
       responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
     }
     // 如果需要返回資料的話,就new一個createCellScanner扔回去
     if (cellsToReturn != null && !cellsToReturn.isEmpty() && controller != null) {
       controller.setCellScanner(CellUtil.createCellScanner(cellsToReturn));
     }
     return responseBuilder.build();
   }

 這個方法裡面還包括了PayloadCarryingRpcController和CellScanner可以看得出來它不只是被Put來用的,但是這些我們不管我們只看Put如何處理就行了。

1、取出來所有的action(Put),這裡主要是put,因為我們呼叫客戶端就是這麼呼叫的,其實別的型別也可以支援,獲取他們對應的region。

2、根據action的原子性來判斷走哪個方法,原子性操作走mutateRows,非原子性操作走doNonAtomicRegionMutation方法,我查了一下這個Atomic到底是怎麼回事,我搜索了一下程式碼,發現在呼叫HTable的mutateRow方法的時候,它設定了Atomic為true,這個是應該是支援一行資料的原子性的,有這個需求的童鞋可以嘗試用這個方法,也是可以提交多個,包括Put、Delete操作。

regionMutationBuilder.setAtomic(true);
getStub().multi(null, request);

 我們先看doNonAtomicRegionMutation,這是我們常用的方式。

   List<ClientProtos.Action> mutations = null;
     for (ClientProtos.Action action: actions.getActionList()) {
       ClientProtos.ResultOrException.Builder resultOrExceptionBuilder = null;
       try {
         Result r = null;
         if (action.hasGet()) {
           Get get = ProtobufUtil.toGet(action.getGet());
           r = region.get(get);
         } else if (action.hasMutation()) {
           MutationType type = action.getMutation().getMutateType();
           if (type != MutationType.PUT && type != MutationType.DELETE && mutations != null &&
               !mutations.isEmpty()) {
             // 如果這個操作不是Put或者Delete的話,就一下子把前面的活都先幹了?
             doBatchOp(builder, region, mutations, cellScanner);
             mutations.clear();
           }
           switch (type) {
           case APPEND:
             r = append(region, action.getMutation(), cellScanner);
             break;
           case INCREMENT:
             r = increment(region, action.getMutation(), cellScanner);
             break;
           case PUT:
           case DELETE:
             // 前面的那些,我們都用得少,或者是不用,不用管它們,看這裡就行
             if (mutations == null) {
               mutations = new ArrayList<ClientProtos.Action>(actions.getActionCount());
            }
             mutations.add(action);
             break;
           default:
             throw new DoNotRetryIOException("Unsupported mutate type: " + type.name());
          }
        } else {
           throw new HBaseIOException("Unexpected Action type");
        }
        if (r != null) {
          ClientProtos.Result pbResult = null;
          if (isClientCellBlockSupport()) {
             pbResult = ProtobufUtil.toResultNoData(r);
             //  
             if (cellsToReturn == null) cellsToReturn = new ArrayList<CellScannable>();
             cellsToReturn.add(r);
          } else {
            pbResult = ProtobufUtil.toResult(r);
          }
          //把result編譯成Protobuf碼,返回
          resultOrExceptionBuilder =
            ClientProtos.ResultOrException.newBuilder().setResult(pbResult);
        }
      } catch (IOException ie) {
        resultOrExceptionBuilder = ResultOrException.newBuilder().
          setException(ResponseConverter.buildException(ie));
      }
      if (resultOrExceptionBuilder != null) {
        // Propagate index.
        resultOrExceptionBuilder.setIndex(action.getIndex());
        builder.addResultOrException(resultOrExceptionBuilder.build());
      }
    }
    //進行批量操作 
    if (mutations != null && !mutations.isEmpty()) {
      doBatchOp(builder, region, mutations, cellScanner);
    }
    return cellsToReturn;

這裡面程式碼很多,也適配了很多種型別,是個大而全的方法,但是我們這裡用到的只是把Put、Delete等的型別轉換新增到mutations的列表裡,然後走下面這個批量操作。

此外get的批量操作也是走的這個方法,裡面它走的是HRegion.get的方法返回一個Result。

doBatchOp(builder, region, mutations, cellScanner);

 doBatchOp裡面的程式碼我就補貼了,老帖程式碼就沒意思了。

1、還是得把Put、Delete給轉換型別,這裡的批量操作只支援全是Delete或者全是Put。

2、用HRegion.batchMutate方法來執行操作,返回OperationStatus陣列,記錄每個action的狀態,是成功,還是失敗,或者是別的狀態。

在batchMutate裡面首先就是檢查是否是隻讀狀態,然後檢查是否是Meta Region的,是不執行MemStore檢查了,因為MemStore的堆記憶體超過了阻塞佇列的MemStore大小,就會報錯誤,太惡劣了。。。沒catch的哦。

long addedSize = doMiniBatchMutation(batchOp, isReplay);
//MemStore的大小到了閥值,就要flush到檔案了
if (isFlushSize(newSize)) {
   requestFlush();
}

doMiniBatchMutation就是我們的終極boss了,是個很長很臭的類,貼程式碼都不能一下子全貼。

1、例項化幾個重要的類,後面具體會用到

//日誌,isInReplay是否支援重做,這裡是false
WALEdit walEdit = new WALEdit(isInReplay);
//控制多版本的MemStore flush的結果,每次flush的w都是一樣的,就好像同一批號的食品
MultiVersionConsistencyControl.WriteEntry w = null;
long txid = 0;
//日誌同步是否成功
boolean walSyncSuccessful = false;
boolean locked = false;

2、檢查Put和Delete裡面的列族是否和Region持有的列族的定義相同,有時候我們在Delete的時候是不填列族的,這裡它給這個缺的列族來一個KeyValue.Type.DeleteFamily,刪除列族的型別。

3、給Row加鎖,先計算hash值做key,如果該key沒上過鎖,就上一把鎖,然後計算出來要寫的action有多少個,記錄到numReadyToWrite。

4、更新時間戳,把該action裡面的所有的kv的時間戳更新為最新的時間戳,它這裡也會把之前的沒執行的也一起更新。

5、給該region加鎖,這個時間點之後,就不允許讀了,等待時間需要根據numReadyToWrite的數量來計算。

//加鎖,現在要上鎖了,這段時間內不允許讀
lock(this.updatesLock.readLock(), numReadyToWrite);
locked = true;

//等待時間
final long waitTime = Math.min(maxBusyWaitDuration,
        busyWaitDuration * Math.min(numReadyToWrite, maxBusyWaitMultiplier));
      if (!lock.tryLock(waitTime, TimeUnit.MILLISECONDS)) {
        throw new RegionTooBusyException(
          "failed to get a lock in " + waitTime + "ms");
}

6、上鎖之後,下面就是重頭戲了,也就是Put、Delete等的重點。給這些寫入memstore的資料建立一個批次號。

//為這次新增進MemStore的資料新增一個批次號
w = mvcc.beginMemstoreInsert();

//這是批次號的計算方式,nextWriteNumber就等於memstore的寫的次數+1
public WriteEntry beginMemstoreInsert() {
    synchronized (writeQueue) {
      long nextWriteNumber = ++memstoreWrite;
      WriteEntry e = new WriteEntry(nextWriteNumber);
      writeQueue.add(e);
      return e;
    }
}

7、把kv們寫入到memstore當中,然後計算出來一個新增資料之後的新的MemStore的大小addedSize。

//把kv們寫入memstore
long addedSize = 0;
for (int i = firstIndex; i < lastIndexExclusive; i++) {
    if (batchOp.retCodeDetails[i].getOperationStatusCode() != OperationStatusCode.NOT_RUN) {
        continue;
    }
    addedSize += applyFamilyMapToMemstore(familyMaps[i], w);
} 

這個新增到MemStore裡面也沒啥神祕的,因為MemStore裡面有兩個kv的集合,它只是把kv新增到集合裡面去,看下面的程式碼就知道了。

private long applyFamilyMapToMemstore(Map<byte[], List<Cell>> familyMap,
    MultiVersionConsistencyControl.WriteEntry localizedWriteEntry) {
    long size = 0;try {for (Map.Entry<byte[], List<Cell>> e : familyMap.entrySet()) {
        byte[] family = e.getKey();
        List<Cell> cells = e.getValue();
        //把kv新增到memstore當中
        Store store = getStore(family);
        for (Cell cell: cells) {
          KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
          kv.setMvccVersion(localizedWriteEntry.getWriteNumber());
          size += store.add(kv);
        }
      }
    } 
return size;
   }

注意這一句話kv.setMvccVersion(localizedWriteEntry.getWriteNumber());  後面會用到的。

8、把kv新增到日誌當中,標誌狀態為成功,如果是使用者設定了不寫入日誌的,它就不寫入日誌了。

      Durability durability = Durability.USE_DEFAULT;
      for (int i = firstIndex; i < lastIndexExclusive; i++) {
        // 跳過狀態不對的
        if (batchOp.retCodeDetails[i].getOperationStatusCode()
            != OperationStatusCode.NOT_RUN) {
          continue;
        }
        //標誌狀態為成功
        batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;

        Mutation m = batchOp.operations[i];
        //獲取自定義的日誌同步方式
        Durability tmpDur = getEffectiveDurability(m.getDurability());
        if (tmpDur.ordinal() > durability.ordinal()) {
          durability = tmpDur;
        }
        if (tmpDur == Durability.SKIP_WAL) {
          //記錄日誌的kv的大小,但是不寫入到日誌當中
          recordMutationWithoutWal(m.getFamilyCellMap());
          continue;
        }
        //把列族裡面的kv全部新增到walEdit當中
        addFamilyMapToWALEdit(familyMaps[i], walEdit);
      }

9、先非同步新增日誌,這裡為什麼是非同步的,因為之前給上鎖了,暫時不能讀了,如果這裡呼叫的是同步的方法,後果自己想象下。

Mutation mutation = batchOp.operations[firstIndex];
if (walEdit.size() > 0) {
   //非同步新增日誌
   txid = this.log.appendNoSync(this.getRegionInfo(), this.htableDescriptor.getTableName(),
           walEdit, mutation.getClusterIds(), now, this.htableDescriptor);
}

10、釋放之前建立的鎖。

//釋放相關的鎖
if (locked) {
    this.updatesLock.readLock().unlock();
    locked = false;
}
releaseRowLocks(acquiredRowLocks);

11、同步日誌。

if (walEdit.size() > 0) {
    syncOrDefer(txid, durability);
}
walSyncSuccessful = true;

12、結束該批次的操作。

if (w != null) {
   mvcc.completeMemstoreInsert(w);
   w = null;
}

到這裡其實就是結束了。但是如果新增到了MemStore裡面了,但是日誌沒有同步成功呢?

finally {
  if (!walSyncSuccessful) {
     //如果日誌沒有成功,
     rollbackMemstore(batchOp, familyMaps, firstIndex, lastIndexExclusive);
  }
  ......  
}

一路跟蹤程式碼下去,跟蹤到程式碼在MemStore的rollback方法裡面。

KeyValue found = this.snapshot.get(kv);
if (found != null && found.getMvccVersion() == kv.getMvccVersion()) {
  this.snapshot.remove(kv);
}
// 比較一下mvcc,相同就刪除.
found = this.kvset.get(kv);
if (found != null && found.getMvccVersion() == kv.getMvccVersion()) {
   removeFromKVSet(kv);
   long s = heapSizeChange(kv, true);
   this.size.addAndGet(-s);
}

比較了MvccVersion,發現是同一批次的,就幹掉了。

過程寫得比較凌亂了,把之前的總結一下吧:

1、做準備工作,例項化變數

2、檢查Put和Delete裡面的列族是否和Region持有的列族的定義相同。

3、給Row加鎖,先計算hash值做key,如果該key沒上過鎖,就上一把鎖,然後計算出來要寫的action有多少個,記錄到numReadyToWrite。

4、更新時間戳,把該action裡面的所有的kv的時間戳更新為最新的時間戳,它這裡也會把之前的沒執行的也一起更新。

5、給該region加鎖,這個時間點之後,就不允許讀了,等待時間需要根據numReadyToWrite的數量來計算。

6、上鎖之後,下面就是重頭戲了,也就是Put、Delete等的重點。給這些寫入memstore的資料建立一個批次號。

7、把kv們寫入到memstore當中,然後計算出來一個新增資料之後的新的MemStore的大小addedSize。

8、把kv新增到日誌當中,標誌狀態為成功,如果是使用者設定了不寫入日誌的,它就不寫入日誌了。

9、先非同步新增日誌。

10、釋放之前建立的鎖。

11、同步日誌。

12、結束該批次的操作。

Final、同步日誌沒成功的,最後根據批次回滾MemStore中的操作。

上面的過程適用於Put和Delete的批量操作,但是這裡總感覺很好奇,就這樣結束了,Put和Delete操作就沒區別嗎,那它怎麼刪除資料的?

返回在第4步更新時間戳的時候,發現了一些貓膩,Delete的情況執行了prepareDeleteTimestamps方法,看看吧。

void prepareDeleteTimestamps(Map<byte[], List<Cell>> familyMap, byte[] byteNow)
      throws IOException {
    for (Map.Entry<byte[], List<Cell>> e : familyMap.entrySet()) {
      byte[] family = e.getKey();
      List<Cell> cells = e.getValue();
      //列和count的對映
      Map<byte[], Integer> kvCount = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);

      for (Cell cell: cells) {
        KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
        // 如果是時間戳是最新的話就執行下面這些操作
        if (kv.isLatestTimestamp() && kv.isDeleteType()) {
          //new一個Get從Store裡面去搜索
         } else {
          kv.updateLatestStamp(byteNow);
        }
      }
    }
  }

看來一下程式碼,這裡是上來先判斷是否是最新的時間戳,我就回去看來一下Delete的建構函式,尼瑪。。。

public Delete(byte [] row) {
    this(row, HConstants.LATEST_TIMESTAMP);
}

public Delete(byte [] row, long timestamp) {
    this(row, 0, row.length, timestamp);
}

只傳了rowkey進去的,它就是最新的。。然後看了一下注釋,凡是在這個時間點之前的所有版本的所有列,我們都要刪除。

好吧,我們很無奈的宣佈,我們只能走kv.isLatestTimestamp() && kv.isDeleteType(),下面是沒放出來的程式碼。

byte[] qual = kv.getQualifier();
          if (qual == null) qual = HConstants.EMPTY_BYTE_ARRAY;
          //想到相同列的每次+1
          Integer count = kvCount.get(qual);
          if (count == null) {
            kvCount.put(qual, 1);
          } else {
            kvCount.put(qual, count + 1);
          }
          //更新之後把最新的count數量
          count = kvCount.get(qual);

          Get get = new Get(kv.getRow());
          get.setMaxVersions(count);
          get.addColumn(family, qual);
          //從store當中取出相應的result來
          List<Cell> result = get(get, false);

          if (result.size() < count) {
            // Nothing to delete 數量不夠。。 更新最新的時間戳為現在的時間
            kv.updateLatestStamp(byteNow);
            continue;
          }
          //數量超過了也不行
          if (result.size() > count) {
            throw new RuntimeException("Unexpected size: " + result.size());
          }
          //取最後一個的時間戳
          KeyValue getkv = KeyValueUtil.ensureKeyValue(result.get(count - 1));
          //更新kv的時間戳為getkv的時間戳
          Bytes.putBytes(kv.getBuffer(), kv.getTimestampOffset(),
              getkv.getBuffer(), getkv.getTimestampOffset(), Bytes.SIZEOF_LONG);

這裡又幹了一個Get操作,把列族的多個版本的內容取出來,如果數量不符合預期也會有問題,但是這後面操作的中心思想就是:

(a)按照預期來說,取出來的少了,就設定刪除的時間戳為現在;

(b)取出來的多了,就報錯;

(c)剛好的,就把Delete的時間戳設定為最大的那個的時間戳,但即便是這樣也沒有刪除資料。

回到這裡我又想起來,只有在Compaction之後,hbase的檔案才會變小,難道是在那個時候刪除的?那在刪除之前,我們進行Get或者Scan操作的時候,會不會讀到這些沒有被刪除的資料呢?

好,讓我們拭目以待。