hbase原始碼系列(十一)Put、Delete在服務端是如何處理?
在講完之後HFile和HLog之後,今天我想分享是Put在Region Server經歷些了什麼?相信前面看了《HTable探祕》的朋友都會有印象,沒看過的建議回去先看看,Put是通過MultiServerCallable來提交的多個Put,好,我們就先去這個類吧,在call方法裡面,我們找到了這句。
responseProto = getStub().multi(controller, requestProto);
它呼叫了Region Server的multi方法。好,我們立即殺到HRegionServer去,搜尋找到multi這個方法。
public MultiResponse multi(final RpcController rpcc, final MultiRequest request) throws ServiceException { // RpcController是屬於後門的,這樣返回的資料就不用序列化了 PayloadCarryingRpcController controller = (PayloadCarryingRpcController)rpcc; CellScanner cellScanner = controller != null? controller.cellScanner(): null; if (controller != null) controller.setCellScanner(null); List<CellScannable> cellsToReturn = null; MultiResponse.Builder responseBuilder = MultiResponse.newBuilder(); //取出來所有的Action for (RegionAction regionAction : request.getRegionActionList()) { this.requestCount.add(regionAction.getActionCount()); RegionActionResult.Builder regionActionResultBuilder = RegionActionResult.newBuilder(); HRegion region; try { //獲取對應的HRegion region = getRegion(regionAction.getRegion()); } catch (IOException e) { responseBuilder.addRegionActionResult(regionActionResultBuilder.build()); continue; // 報告這個action有錯 } if (regionAction.hasAtomic() && regionAction.getAtomic()) { try { //如果是原子操作,就走原子操作的通道 mutateRows(region, regionAction.getActionList(), cellScanner); } catch (IOException e) { regionActionResultBuilder.setException(ResponseConverter.buildException(e)); } } else { // 非原子性提交,把錯誤內部處理了 cellsToReturn = doNonAtomicRegionMutation(region, regionAction, cellScanner, regionActionResultBuilder, cellsToReturn); } responseBuilder.addRegionActionResult(regionActionResultBuilder.build()); } // 如果需要返回資料的話,就new一個createCellScanner扔回去 if (cellsToReturn != null && !cellsToReturn.isEmpty() && controller != null) { controller.setCellScanner(CellUtil.createCellScanner(cellsToReturn)); } return responseBuilder.build(); }
這個方法裡面還包括了PayloadCarryingRpcController和CellScanner可以看得出來它不只是被Put來用的,但是這些我們不管我們只看Put如何處理就行了。
1、取出來所有的action(Put),這裡主要是put,因為我們呼叫客戶端就是這麼呼叫的,其實別的型別也可以支援,獲取他們對應的region。
2、根據action的原子性來判斷走哪個方法,原子性操作走mutateRows,非原子性操作走doNonAtomicRegionMutation方法,我查了一下這個Atomic到底是怎麼回事,我搜索了一下程式碼,發現在呼叫HTable的mutateRow方法的時候,它設定了Atomic為true,這個是應該是支援一行資料的原子性的,有這個需求的童鞋可以嘗試用這個方法,也是可以提交多個,包括Put、Delete操作。
regionMutationBuilder.setAtomic(true);
getStub().multi(null, request);
我們先看doNonAtomicRegionMutation,這是我們常用的方式。
List<ClientProtos.Action> mutations = null; for (ClientProtos.Action action: actions.getActionList()) { ClientProtos.ResultOrException.Builder resultOrExceptionBuilder = null; try { Result r = null; if (action.hasGet()) { Get get = ProtobufUtil.toGet(action.getGet()); r = region.get(get); } else if (action.hasMutation()) { MutationType type = action.getMutation().getMutateType(); if (type != MutationType.PUT && type != MutationType.DELETE && mutations != null && !mutations.isEmpty()) { // 如果這個操作不是Put或者Delete的話,就一下子把前面的活都先幹了? doBatchOp(builder, region, mutations, cellScanner); mutations.clear(); } switch (type) { case APPEND: r = append(region, action.getMutation(), cellScanner); break; case INCREMENT: r = increment(region, action.getMutation(), cellScanner); break; case PUT: case DELETE: // 前面的那些,我們都用得少,或者是不用,不用管它們,看這裡就行 if (mutations == null) { mutations = new ArrayList<ClientProtos.Action>(actions.getActionCount()); } mutations.add(action); break; default: throw new DoNotRetryIOException("Unsupported mutate type: " + type.name()); } } else { throw new HBaseIOException("Unexpected Action type"); } if (r != null) { ClientProtos.Result pbResult = null; if (isClientCellBlockSupport()) { pbResult = ProtobufUtil.toResultNoData(r); // if (cellsToReturn == null) cellsToReturn = new ArrayList<CellScannable>(); cellsToReturn.add(r); } else { pbResult = ProtobufUtil.toResult(r); } //把result編譯成Protobuf碼,返回 resultOrExceptionBuilder = ClientProtos.ResultOrException.newBuilder().setResult(pbResult); } } catch (IOException ie) { resultOrExceptionBuilder = ResultOrException.newBuilder(). setException(ResponseConverter.buildException(ie)); } if (resultOrExceptionBuilder != null) { // Propagate index. resultOrExceptionBuilder.setIndex(action.getIndex()); builder.addResultOrException(resultOrExceptionBuilder.build()); } } //進行批量操作 if (mutations != null && !mutations.isEmpty()) { doBatchOp(builder, region, mutations, cellScanner); } return cellsToReturn;
這裡面程式碼很多,也適配了很多種型別,是個大而全的方法,但是我們這裡用到的只是把Put、Delete等的型別轉換新增到mutations的列表裡,然後走下面這個批量操作。
此外get的批量操作也是走的這個方法,裡面它走的是HRegion.get的方法返回一個Result。
doBatchOp(builder, region, mutations, cellScanner);
doBatchOp裡面的程式碼我就補貼了,老帖程式碼就沒意思了。
1、還是得把Put、Delete給轉換型別,這裡的批量操作只支援全是Delete或者全是Put。
2、用HRegion.batchMutate方法來執行操作,返回OperationStatus陣列,記錄每個action的狀態,是成功,還是失敗,或者是別的狀態。
在batchMutate裡面首先就是檢查是否是隻讀狀態,然後檢查是否是Meta Region的,是不執行MemStore檢查了,因為MemStore的堆記憶體超過了阻塞佇列的MemStore大小,就會報錯誤,太惡劣了。。。沒catch的哦。
long addedSize = doMiniBatchMutation(batchOp, isReplay);
//MemStore的大小到了閥值,就要flush到檔案了
if (isFlushSize(newSize)) {
requestFlush();
}
doMiniBatchMutation就是我們的終極boss了,是個很長很臭的類,貼程式碼都不能一下子全貼。
1、例項化幾個重要的類,後面具體會用到
//日誌,isInReplay是否支援重做,這裡是false
WALEdit walEdit = new WALEdit(isInReplay);
//控制多版本的MemStore flush的結果,每次flush的w都是一樣的,就好像同一批號的食品
MultiVersionConsistencyControl.WriteEntry w = null;
long txid = 0;
//日誌同步是否成功
boolean walSyncSuccessful = false;
boolean locked = false;
2、檢查Put和Delete裡面的列族是否和Region持有的列族的定義相同,有時候我們在Delete的時候是不填列族的,這裡它給這個缺的列族來一個KeyValue.Type.DeleteFamily,刪除列族的型別。
3、給Row加鎖,先計算hash值做key,如果該key沒上過鎖,就上一把鎖,然後計算出來要寫的action有多少個,記錄到numReadyToWrite。
4、更新時間戳,把該action裡面的所有的kv的時間戳更新為最新的時間戳,它這裡也會把之前的沒執行的也一起更新。
5、給該region加鎖,這個時間點之後,就不允許讀了,等待時間需要根據numReadyToWrite的數量來計算。
//加鎖,現在要上鎖了,這段時間內不允許讀
lock(this.updatesLock.readLock(), numReadyToWrite);
locked = true;
//等待時間
final long waitTime = Math.min(maxBusyWaitDuration,
busyWaitDuration * Math.min(numReadyToWrite, maxBusyWaitMultiplier));
if (!lock.tryLock(waitTime, TimeUnit.MILLISECONDS)) {
throw new RegionTooBusyException(
"failed to get a lock in " + waitTime + "ms");
}
6、上鎖之後,下面就是重頭戲了,也就是Put、Delete等的重點。給這些寫入memstore的資料建立一個批次號。
//為這次新增進MemStore的資料新增一個批次號
w = mvcc.beginMemstoreInsert();
//這是批次號的計算方式,nextWriteNumber就等於memstore的寫的次數+1
public WriteEntry beginMemstoreInsert() {
synchronized (writeQueue) {
long nextWriteNumber = ++memstoreWrite;
WriteEntry e = new WriteEntry(nextWriteNumber);
writeQueue.add(e);
return e;
}
}
7、把kv們寫入到memstore當中,然後計算出來一個新增資料之後的新的MemStore的大小addedSize。
//把kv們寫入memstore
long addedSize = 0;
for (int i = firstIndex; i < lastIndexExclusive; i++) {
if (batchOp.retCodeDetails[i].getOperationStatusCode() != OperationStatusCode.NOT_RUN) {
continue;
}
addedSize += applyFamilyMapToMemstore(familyMaps[i], w);
}
這個新增到MemStore裡面也沒啥神祕的,因為MemStore裡面有兩個kv的集合,它只是把kv新增到集合裡面去,看下面的程式碼就知道了。
private long applyFamilyMapToMemstore(Map<byte[], List<Cell>> familyMap,
MultiVersionConsistencyControl.WriteEntry localizedWriteEntry) {
long size = 0;try {for (Map.Entry<byte[], List<Cell>> e : familyMap.entrySet()) {
byte[] family = e.getKey();
List<Cell> cells = e.getValue();
//把kv新增到memstore當中
Store store = getStore(family);
for (Cell cell: cells) {
KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
kv.setMvccVersion(localizedWriteEntry.getWriteNumber());
size += store.add(kv);
}
}
}
return size;
}
注意這一句話kv.setMvccVersion(localizedWriteEntry.getWriteNumber()); 後面會用到的。
8、把kv新增到日誌當中,標誌狀態為成功,如果是使用者設定了不寫入日誌的,它就不寫入日誌了。
Durability durability = Durability.USE_DEFAULT;
for (int i = firstIndex; i < lastIndexExclusive; i++) {
// 跳過狀態不對的
if (batchOp.retCodeDetails[i].getOperationStatusCode()
!= OperationStatusCode.NOT_RUN) {
continue;
}
//標誌狀態為成功
batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
Mutation m = batchOp.operations[i];
//獲取自定義的日誌同步方式
Durability tmpDur = getEffectiveDurability(m.getDurability());
if (tmpDur.ordinal() > durability.ordinal()) {
durability = tmpDur;
}
if (tmpDur == Durability.SKIP_WAL) {
//記錄日誌的kv的大小,但是不寫入到日誌當中
recordMutationWithoutWal(m.getFamilyCellMap());
continue;
}
//把列族裡面的kv全部新增到walEdit當中
addFamilyMapToWALEdit(familyMaps[i], walEdit);
}
9、先非同步新增日誌,這裡為什麼是非同步的,因為之前給上鎖了,暫時不能讀了,如果這裡呼叫的是同步的方法,後果自己想象下。
Mutation mutation = batchOp.operations[firstIndex];
if (walEdit.size() > 0) {
//非同步新增日誌
txid = this.log.appendNoSync(this.getRegionInfo(), this.htableDescriptor.getTableName(),
walEdit, mutation.getClusterIds(), now, this.htableDescriptor);
}
10、釋放之前建立的鎖。
//釋放相關的鎖
if (locked) {
this.updatesLock.readLock().unlock();
locked = false;
}
releaseRowLocks(acquiredRowLocks);
11、同步日誌。
if (walEdit.size() > 0) {
syncOrDefer(txid, durability);
}
walSyncSuccessful = true;
12、結束該批次的操作。
if (w != null) {
mvcc.completeMemstoreInsert(w);
w = null;
}
到這裡其實就是結束了。但是如果新增到了MemStore裡面了,但是日誌沒有同步成功呢?
finally {
if (!walSyncSuccessful) {
//如果日誌沒有成功,
rollbackMemstore(batchOp, familyMaps, firstIndex, lastIndexExclusive);
}
......
}
一路跟蹤程式碼下去,跟蹤到程式碼在MemStore的rollback方法裡面。
KeyValue found = this.snapshot.get(kv);
if (found != null && found.getMvccVersion() == kv.getMvccVersion()) {
this.snapshot.remove(kv);
}
// 比較一下mvcc,相同就刪除.
found = this.kvset.get(kv);
if (found != null && found.getMvccVersion() == kv.getMvccVersion()) {
removeFromKVSet(kv);
long s = heapSizeChange(kv, true);
this.size.addAndGet(-s);
}
比較了MvccVersion,發現是同一批次的,就幹掉了。
過程寫得比較凌亂了,把之前的總結一下吧:
1、做準備工作,例項化變數
2、檢查Put和Delete裡面的列族是否和Region持有的列族的定義相同。
3、給Row加鎖,先計算hash值做key,如果該key沒上過鎖,就上一把鎖,然後計算出來要寫的action有多少個,記錄到numReadyToWrite。
4、更新時間戳,把該action裡面的所有的kv的時間戳更新為最新的時間戳,它這裡也會把之前的沒執行的也一起更新。
5、給該region加鎖,這個時間點之後,就不允許讀了,等待時間需要根據numReadyToWrite的數量來計算。
6、上鎖之後,下面就是重頭戲了,也就是Put、Delete等的重點。給這些寫入memstore的資料建立一個批次號。
7、把kv們寫入到memstore當中,然後計算出來一個新增資料之後的新的MemStore的大小addedSize。
8、把kv新增到日誌當中,標誌狀態為成功,如果是使用者設定了不寫入日誌的,它就不寫入日誌了。
9、先非同步新增日誌。
10、釋放之前建立的鎖。
11、同步日誌。
12、結束該批次的操作。
Final、同步日誌沒成功的,最後根據批次回滾MemStore中的操作。
上面的過程適用於Put和Delete的批量操作,但是這裡總感覺很好奇,就這樣結束了,Put和Delete操作就沒區別嗎,那它怎麼刪除資料的?
返回在第4步更新時間戳的時候,發現了一些貓膩,Delete的情況執行了prepareDeleteTimestamps方法,看看吧。
void prepareDeleteTimestamps(Map<byte[], List<Cell>> familyMap, byte[] byteNow)
throws IOException {
for (Map.Entry<byte[], List<Cell>> e : familyMap.entrySet()) {
byte[] family = e.getKey();
List<Cell> cells = e.getValue();
//列和count的對映
Map<byte[], Integer> kvCount = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
for (Cell cell: cells) {
KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
// 如果是時間戳是最新的話就執行下面這些操作
if (kv.isLatestTimestamp() && kv.isDeleteType()) {
//new一個Get從Store裡面去搜索
} else {
kv.updateLatestStamp(byteNow);
}
}
}
}
看來一下程式碼,這裡是上來先判斷是否是最新的時間戳,我就回去看來一下Delete的建構函式,尼瑪。。。
public Delete(byte [] row) {
this(row, HConstants.LATEST_TIMESTAMP);
}
public Delete(byte [] row, long timestamp) {
this(row, 0, row.length, timestamp);
}
只傳了rowkey進去的,它就是最新的。。然後看了一下注釋,凡是在這個時間點之前的所有版本的所有列,我們都要刪除。
好吧,我們很無奈的宣佈,我們只能走kv.isLatestTimestamp() && kv.isDeleteType(),下面是沒放出來的程式碼。
byte[] qual = kv.getQualifier();
if (qual == null) qual = HConstants.EMPTY_BYTE_ARRAY;
//想到相同列的每次+1
Integer count = kvCount.get(qual);
if (count == null) {
kvCount.put(qual, 1);
} else {
kvCount.put(qual, count + 1);
}
//更新之後把最新的count數量
count = kvCount.get(qual);
Get get = new Get(kv.getRow());
get.setMaxVersions(count);
get.addColumn(family, qual);
//從store當中取出相應的result來
List<Cell> result = get(get, false);
if (result.size() < count) {
// Nothing to delete 數量不夠。。 更新最新的時間戳為現在的時間
kv.updateLatestStamp(byteNow);
continue;
}
//數量超過了也不行
if (result.size() > count) {
throw new RuntimeException("Unexpected size: " + result.size());
}
//取最後一個的時間戳
KeyValue getkv = KeyValueUtil.ensureKeyValue(result.get(count - 1));
//更新kv的時間戳為getkv的時間戳
Bytes.putBytes(kv.getBuffer(), kv.getTimestampOffset(),
getkv.getBuffer(), getkv.getTimestampOffset(), Bytes.SIZEOF_LONG);
這裡又幹了一個Get操作,把列族的多個版本的內容取出來,如果數量不符合預期也會有問題,但是這後面操作的中心思想就是:
(a)按照預期來說,取出來的少了,就設定刪除的時間戳為現在;
(b)取出來的多了,就報錯;
(c)剛好的,就把Delete的時間戳設定為最大的那個的時間戳,但即便是這樣也沒有刪除資料。
回到這裡我又想起來,只有在Compaction之後,hbase的檔案才會變小,難道是在那個時候刪除的?那在刪除之前,我們進行Get或者Scan操作的時候,會不會讀到這些沒有被刪除的資料呢?
好,讓我們拭目以待。