HBase 0.94.8 Split Source Code Analysis
1. Initiating an HBase split
1.1 HBaseAdmin.split (public API)
1.2 HBaseAdmin.split (private helper)
1.3 HRegionServer.splitRegion performs the split on the region
2. Determining the split point
2.1 HRegion.checkSplit
2.2 RegionSplitPolicy.getSplitPoint computes the actual split point
3. Executing the split
3.1 CompactSplitThread.requestSplit
3.2 SplitRequest.run
3.3 SplitTransaction.execute
3.3.1 SplitTransaction.createDaughters creates the two daughter regions, taking the parent region's write lock
3.3.2 SplitTransaction.openDaughters opens the two daughter regions
a) DaughterOpener opens each daughter region (via openDaughterRegion, which bottoms out in HRegion.openHRegion); see the detailed notes under 3.3.2 below
b) services.addToOnlineRegions adds each daughter to the RS's online-region list and to the meta table
3.3.3 HRegion.openHRegion
3.4 SplitTransaction.transitionZKNode updates the znode state and waits for the split to finish
1.1 HBaseAdmin.split
/**
 * Split a table or an individual region.
 * Asynchronous operation.
 *
 * @param tableNameOrRegionName table or region to split
 * @param splitPoint the explicit position to split on
 * @throws IOException if a remote or network exception occurs
 * @throws InterruptedException interrupt exception occurred
 */
public void split(final byte [] tableNameOrRegionName,
    final byte [] splitPoint) throws IOException, InterruptedException {
  CatalogTracker ct = getCatalogTracker();
  try {
    Pair<HRegionInfo, ServerName> regionServerPair = getRegion(tableNameOrRegionName, ct);
    // If tableNameOrRegionName is a region name this yields a Pair<HRegionInfo, ServerName>; otherwise it is null
    if (regionServerPair != null) {
      if (regionServerPair.getSecond() == null) {
        throw new NoServerForRegionException(Bytes.toStringBinary(tableNameOrRegionName));
      } else {
        // Split this region -- the key method, analyzed below
        split(regionServerPair.getSecond(), regionServerPair.getFirst(), splitPoint);
      }
    } else {
      // If tableNameOrRegionName is a table name we take this branch
      final String tableName = tableNameString(tableNameOrRegionName, ct);
      List<Pair<HRegionInfo, ServerName>> pairs =
        MetaReader.getTableRegionsAndLocations(ct, tableName);
      // Fetch the HRegionInfo and hosting ServerName of every region of the table.
      // If splitPoint is null every region is split; otherwise only the region
      // containing splitPoint is split.
      for (Pair<HRegionInfo, ServerName> pair: pairs) {
        // May not be a server for a particular row
        if (pair.getSecond() == null) continue;
        HRegionInfo r = pair.getFirst();
        // check for parents
        if (r.isSplitParent()) continue;
        // if a split point given, only split that particular region
        if (splitPoint != null && !r.containsRow(splitPoint)) continue;
        // call out to region server to do split now
        split(pair.getSecond(), pair.getFirst(), splitPoint);
      }
    }
  } finally {
    cleanupCatalogTracker(ct);
  }
}
1.2 HBaseAdmin.split (private helper)
// This is the private split called by the method above
private void split(final ServerName sn, final HRegionInfo hri,
    byte[] splitPoint) throws IOException {
  HRegionInterface rs = this.connection.getHRegionConnection(
      sn.getHostname(), sn.getPort()); // get a proxy to the hosting HRegionServer
  rs.splitRegion(hri, splitPoint);     // HRegionServer.splitRegion performs the split
}
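Since the javadoc marks split as an asynchronous operation, the client call only enqueues the request and returns. A minimal client-side sketch of driving the code above (the table name and row key are illustrative assumptions):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.util.Bytes;

public class SplitExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    HBaseAdmin admin = new HBaseAdmin(conf);
    try {
      // Split every region of the table at its computed split point ...
      admin.split(Bytes.toBytes("test_table"));
      // ... or split only the region containing the given row key.
      admin.split(Bytes.toBytes("test_table"), Bytes.toBytes("row-5000"));
    } finally {
      admin.close();
    }
  }
}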
1.3 HRegionServer.splitRegion performs the split on the region
public void splitRegion(HRegionInfo regionInfo, byte[] splitPoint)
    throws NotServingRegionException, IOException {
  checkOpen(); // check that the server and hdfs are available
  HRegion region = getRegion(regionInfo.getRegionName()); // look up the region by name
  region.flushcache(); // flush the cache; this is a no-op when the cache is empty,
                       // the region is closed, a flush is already in progress,
                       // or writes are disabled
  region.forceSplit(splitPoint); // record the explicit split point, if any
  compactSplitThread.requestSplit(region, region.checkSplit());
  // region.checkSplit() computes the split point; requestSplit queues the split
}
2. Determining the split point
2.1 HRegion.checkSplit
public byte[] checkSplit() {
  // Can't split ROOT/META
  if (this.regionInfo.isMetaTable()) {
    if (shouldForceSplit()) {
      LOG.warn("Cannot split root/meta regions in HBase 0.20 and above");
    }
    return null;
  }
  if (!splitPolicy.shouldSplit()) {
    return null;
  }
  byte[] ret = splitPolicy.getSplitPoint();
  if (ret != null) {
    try {
      checkRow(ret, "calculated split");
    } catch (IOException e) {
      LOG.error("Ignoring invalid split", e);
      return null;
    }
  }
  return ret;
}
2.2 RegionSplitPolicy.getSplitPoint computes the actual split point
// If an explicit split point was set on the region, return it; otherwise use
// the split point (midkey) of the largest store.
protected byte[] getSplitPoint() {
  byte[] explicitSplitPoint = this.region.getExplicitSplitPoint();
  if (explicitSplitPoint != null) {
    return explicitSplitPoint;
  }
  Map<byte[], Store> stores = region.getStores();
  byte[] splitPointFromLargestStore = null;
  long largestStoreSize = 0;
  for (Store s : stores.values()) {
    byte[] splitPoint = s.getSplitPoint();
    long storeSize = s.getSize();
    if (splitPoint != null && largestStoreSize < storeSize) {
      splitPointFromLargestStore = splitPoint;
      largestStoreSize = storeSize;
    }
  }
  return splitPointFromLargestStore;
}
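Because getSplitPoint() is an overridable hook on RegionSplitPolicy, a table can reshape the chosen point. Below is a hedged sketch in the spirit of the stock KeyPrefixRegionSplitPolicy (the prefix length is an assumed constant; a real deployment would read it from configuration and register the class through the table's SPLIT_POLICY attribute):

import java.util.Arrays;

import org.apache.hadoop.hbase.regionserver.IncreasingToUpperBoundRegionSplitPolicy;

public class PrefixSplitPolicy extends IncreasingToUpperBoundRegionSplitPolicy {
  // Assumed constant for illustration; a real policy would read this from conf.
  private static final int PREFIX_LENGTH = 8;

  @Override
  protected byte[] getSplitPoint() {
    // Start from the default choice: the explicit point or the largest store's midkey.
    byte[] splitPoint = super.getSplitPoint();
    if (splitPoint != null && splitPoint.length > PREFIX_LENGTH) {
      // Truncate to the prefix so all rows sharing a prefix land in one daughter.
      return Arrays.copyOf(splitPoint, PREFIX_LENGTH);
    }
    return splitPoint;
  }
}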
3. Executing the split
3.1 CompactSplitThread.requestSplit
public synchronized void requestSplit(final HRegion r, byte[] midKey) {
  if (midKey == null) {
    LOG.debug("Region " + r.getRegionNameAsString() +
      " not splittable because midkey=null");
    return;
  }
  try {
    this.splits.execute(new SplitRequest(r, midKey, this.server));
    if (LOG.isDebugEnabled()) {
      LOG.debug("Split requested for " + r + ". " + this);
    }
  } catch (RejectedExecutionException ree) {
    LOG.info("Could not execute split for " + r, ree);
  }
}
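The RejectedExecutionException branch exists because this.splits is a ThreadPoolExecutor that can refuse a task, for instance while shutting down or when its capacity is exhausted. A standalone sketch of that pattern (the pool sizing here is an illustrative assumption, not the actual CompactSplitThread configuration):

import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BoundedPoolDemo {
  public static void main(String[] args) {
    // One worker, no queue: a second concurrent submission is rejected.
    ThreadPoolExecutor splits = new ThreadPoolExecutor(
        1, 1, 30, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
    for (int i = 0; i < 3; i++) {
      try {
        splits.execute(new Runnable() {
          public void run() {
            try { Thread.sleep(1000); } catch (InterruptedException ignored) {}
          }
        });
      } catch (RejectedExecutionException ree) {
        // Mirrors the LOG.info path above: the request is dropped, not queued.
        System.out.println("split request rejected");
      }
    }
    splits.shutdown();
  }
}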
3.2 SplitRequest.run
public void run() {
  if (this.server.isStopping() || this.server.isStopped()) {
    LOG.debug("Skipping split because server is stopping=" +
      this.server.isStopping() + " or stopped=" + this.server.isStopped());
    return;
  }
  try {
    final long startTime = System.currentTimeMillis();
    SplitTransaction st = new SplitTransaction(parent, midKey);
    // If prepare does not return true, for some reason -- logged inside in
    // the prepare call -- we are not ready to split just now. Just return.
    // SplitTransaction.prepare() initializes the two daughter regions inside the
    // SplitTransaction object and runs sanity checks, e.g. that the split row
    // is actually contained in the region.
    if (!st.prepare()) return;
    try {
      st.execute(this.server, this.server);
      this.server.getMetrics().incrementSplitSuccessCount();
    } catch (Exception e) {
      if (this.server.isStopping() || this.server.isStopped()) {
        LOG.info(
          "Skip rollback/cleanup of failed split of "
            + parent.getRegionNameAsString() + " because server is"
            + (this.server.isStopping() ? " stopping" : " stopped"), e);
        return;
      }
      try {
        LOG.info("Running rollback/cleanup of failed split of " +
          parent.getRegionNameAsString() + "; " + e.getMessage(), e);
        if (st.rollback(this.server, this.server)) {
          LOG.info("Successful rollback of failed split of " +
            parent.getRegionNameAsString());
          this.server.getMetrics().incrementSplitFailureCount();
        } else {
          this.server.abort("Abort; we got an error after point-of-no-return");
        }
      } catch (RuntimeException ee) {
        ........
}
3.3 SplitTransaction.execute
/**
 * Run the transaction.
 * @param server Hosting server instance. Can be null when testing (won't try
 * and update in zk if a null server)
 * @param services Used to online/offline regions.
 * @throws IOException If thrown, transaction failed. Call {@link #rollback(Server, RegionServerServices)}
 * @return Regions created
 * @throws IOException
 * @see #rollback(Server, RegionServerServices)
 */
public PairOfSameType<HRegion> execute(final Server server,
    final RegionServerServices services)
throws IOException {
  PairOfSameType<HRegion> regions = createDaughters(server, services);
  // createDaughters: creates the split temp dir, moves the region's znode to
  // SPLITTING, closes the region and stops all its stores, creates the daughter
  // dirs and places the split store-file references into them, creates daughter
  // regions A and B, registers them in zk, and marks the parent HRI offline
  openDaughters(server, services, regions.getFirst(), regions.getSecond());
  transitionZKNode(server, services, regions.getFirst(), regions.getSecond());
  return regions;
}
3.3.1 SplitTransaction.createDaughters creates the two daughter regions, taking the parent region's write lock
/**
 * Prepare the regions and region files.
 * @param server Hosting server instance. Can be null when testing (won't try
 * and update in zk if a null server)
 * @param services Used to online/offline regions.
 * @throws IOException If thrown, transaction failed. Call {@link #rollback(Server, RegionServerServices)}
 * @return Regions created
 */
/* package */PairOfSameType<HRegion> createDaughters(final Server server,
    final RegionServerServices services) throws IOException {
  LOG.info("Starting split of region " + this.parent);
  if ((server != null && server.isStopped()) ||
      (services != null && services.isStopping())) {
    throw new IOException("Server is stopped or stopping");
  }
  assert !this.parent.lock.writeLock().isHeldByCurrentThread(): "Unsafe to hold write lock while performing RPCs";
  // Coprocessor callback
  // This is what ultimately triggers BaseRegionObserver.preSplit
  if (this.parent.getCoprocessorHost() != null) {
    this.parent.getCoprocessorHost().preSplit();
  }
  // If true, no cluster to write meta edits to or to update znodes in.
  boolean testing = server == null? true:
    server.getConfiguration().getBoolean("hbase.testing.nocluster", false);
  this.fileSplitTimeout = testing ? this.fileSplitTimeout :
    server.getConfiguration().getLong("hbase.regionserver.fileSplitTimeout",
      this.fileSplitTimeout);
  // Set ephemeral SPLITTING znode up in zk. Mocked servers sometimes don't
  // have zookeeper so don't do zk stuff if server or zookeeper is null
  if (server != null && server.getZooKeeper() != null) {
    try {
      // 1. Create an ephemeral SPLITTING znode in zk
      createNodeSplitting(server.getZooKeeper(),
        this.parent.getRegionInfo(), server.getServerName());
    } catch (KeeperException e) {
      throw new IOException("Failed creating SPLITTING znode on " +
        this.parent.getRegionNameAsString(), e);
    }
  }
  // Progress is journaled in: private final List<JournalEntry> journal = new ArrayList<JournalEntry>();
  this.journal.add(JournalEntry.SET_SPLITTING_IN_ZK);
  if (server != null && server.getZooKeeper() != null) {
    try {
      // Transition node from SPLITTING to SPLITTING after creating the split node.
      // Master will get the callback for node change only if the transition is successful.
      // Note that if the transition fails then the rollback will delete the created znode
      // TODO : May be we can add some new state to znode and handle the new state incase of success/failure
      // 2. Wait for the master to pick up the region's transition to the SPLITTING state
      this.znodeVersion = transitionNodeSplitting(server.getZooKeeper(),
        this.parent.getRegionInfo(), server.getServerName(), -1);
    } catch (KeeperException e) {
      throw new IOException("Failed setting SPLITTING znode on "
        + this.parent.getRegionNameAsString(), e);
    }
  }
  // 3. Create the splits directory
  createSplitDir(this.parent.getFilesystem(), this.splitdir);
  this.journal.add(JournalEntry.CREATE_SPLIT_DIR);
  List<StoreFile> hstoreFilesToSplit = null;
  Exception exceptionToThrow = null;
  try{
    // 4. Wait for in-flight flushes and compactions on the region to finish, then close it
    hstoreFilesToSplit = this.parent.close(false);
  } catch (Exception e) {
    exceptionToThrow = e;
  }
  if (exceptionToThrow == null && hstoreFilesToSplit == null) {
    // The region was closed by a concurrent thread. We can't continue
    // with the split, instead we must just abandon the split. If we
    // reopen or split this could cause problems because the region has
    // probably already been moved to a different server, or is in the
    // process of moving to a different server.
    exceptionToThrow = closedByOtherException;
  }
  if (exceptionToThrow != closedByOtherException) {
    this.journal.add(JournalEntry.CLOSED_PARENT_REGION);
  }
  if (exceptionToThrow != null) {
    if (exceptionToThrow instanceof IOException) throw (IOException)exceptionToThrow;
    throw new IOException(exceptionToThrow);
  }
  if (!testing) {
    // 5. Remove the parent from the HRegionServer's online-region list; it is now offline
    services.removeFromOnlineRegions(this.parent.getRegionInfo().getEncodedName());
  }
  this.journal.add(JournalEntry.OFFLINED_PARENT);
  // TODO: If splitStoreFiles were multithreaded would we complete steps in
  // less elapsed time? St.Ack 20100920
  //
  // splitStoreFiles creates daughter region dirs under the parent splits dir
  // Nothing to unroll here if failure -- clean up of CREATE_SPLIT_DIR will
  // clean this up.
  // 6. Do the actual split: a thread pool of StoreFileSplitter tasks splits every
  //    HFile (StoreFile) of the region; no data is rewritten -- for each file a
  //    pair of reference files (top/bottom relative to the split row) is written
  //    into the daughter region directories
  splitStoreFiles(this.splitdir, hstoreFilesToSplit);
  // Log to the journal that we are creating region A, the first daughter
  // region. We could fail halfway through. If we do, we could have left
  // stuff in fs that needs cleanup -- a storefile or two. Thats why we
  // add entry to journal BEFORE rather than AFTER the change.
  // 7. Create daughter regions A and B: their regioninfo is derived from the
  //    reference files and written to hdfs; the parent is offlined in meta below
  this.journal.add(JournalEntry.STARTED_REGION_A_CREATION);
  HRegion a = createDaughterRegion(this.hri_a, this.parent.rsServices);
  // Ditto
  this.journal.add(JournalEntry.STARTED_REGION_B_CREATION);
  HRegion b = createDaughterRegion(this.hri_b, this.parent.rsServices);
  // This is the point of no return. Adding subsequent edits to .META. as we
  // do below when we do the daughter opens adding each to .META. can fail in
  // various interesting ways the most interesting of which is a timeout
  // BUT the edits all go through (See HBASE-3872). IF we reach the PONR
  // then subsequent failures need to crash out this regionserver; the
  // server shutdown processing should be able to fix-up the incomplete split.
  // The offlined parent will have the daughters as extra columns. If
  // we leave the daughter regions in place and do not remove them when we
  // crash out, then they will have their references to the parent in place
  // still and the server shutdown fixup of .META. will point to these
  // regions.
  // We should add PONR JournalEntry before offlineParentInMeta,so even if
  // OfflineParentInMeta timeout,this will cause regionserver exit,and then
  // master ServerShutdownHandler will fix daughter & avoid data loss. (See
  // HBASE-4562).
  this.journal.add(JournalEntry.PONR);
  // Edit parent in meta. Offlines parent region and adds splita and splitb.
  if (!testing) {
    MetaEditor.offlineParentInMeta(server.getCatalogTracker(),
      this.parent.getRegionInfo(), a.getRegionInfo(), b.getRegionInfo());
  }
  return new PairOfSameType<HRegion>(a, b);
}
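Every step above is appended to the journal before the next step runs, and SplitTransaction.rollback walks that journal backwards to undo whatever completed. A self-contained sketch of the pattern (the enum values are taken from the code above, but the undo bodies are placeholders, not the actual rollback switch):

import java.util.ArrayList;
import java.util.List;

public class JournaledTransaction {
  enum JournalEntry { SET_SPLITTING_IN_ZK, CREATE_SPLIT_DIR, CLOSED_PARENT_REGION, PONR }

  private final List<JournalEntry> journal = new ArrayList<JournalEntry>();

  void step(JournalEntry e) {
    journal.add(e); // record progress BEFORE moving on, as createDaughters does
  }

  /** @return true if fully rolled back; false once past the point of no return */
  boolean rollback() {
    // Walk the journal backwards, undoing each completed step.
    for (int i = journal.size() - 1; i >= 0; i--) {
      switch (journal.get(i)) {
        case PONR:
          return false; // past PONR the regionserver must abort instead of rolling back
        case CLOSED_PARENT_REGION:
          System.out.println("reopening parent region"); break;
        case CREATE_SPLIT_DIR:
          System.out.println("deleting split dir"); break;
        case SET_SPLITTING_IN_ZK:
          System.out.println("deleting SPLITTING znode"); break;
      }
    }
    return true;
  }
}

A false return is exactly what SplitRequest.run reacts to with "Abort; we got an error after point-of-no-return".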
3.3.2 SplitTransaction.openDaughters opens the two daughter regions
/**
 * Perform time consuming opening of the daughter regions.
 * @param server Hosting server instance. Can be null when testing (won't try
 * and update in zk if a null server)
 * @param services Used to online/offline regions.
 * @param a first daughter region
 * @param b second daughter region
 * @throws IOException If thrown, transaction failed. Call {@link #rollback(Server, RegionServerServices)}
 */
/* package */void openDaughters(final Server server,
    final RegionServerServices services, HRegion a, HRegion b)
throws IOException {
  boolean stopped = server != null && server.isStopped();
  boolean stopping = services != null && services.isStopping();
  // TODO: Is this check needed here?
  if (stopped || stopping) {
    LOG.info("Not opening daughters " +
      b.getRegionInfo().getRegionNameAsString() +
      " and " +
      a.getRegionInfo().getRegionNameAsString() +
      " because stopping=" + stopping + ", stopped=" + stopped);
  } else {
    // Open daughters in parallel.
    // Each DaughterOpener ends up in HRegion.openHRegion; the actual
    // initialization happens in HRegion.initializeRegionInternals
    DaughterOpener aOpener = new DaughterOpener(server, a);
    DaughterOpener bOpener = new DaughterOpener(server, b);
    aOpener.start();
    bOpener.start();
    try {
      aOpener.join();
      bOpener.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IOException("Interrupted " + e.getMessage());
    }
    if (aOpener.getException() != null) {
      throw new IOException("Failed " +
        aOpener.getName(), aOpener.getException());
    }
    if (bOpener.getException() != null) {
      throw new IOException("Failed " +
        bOpener.getName(), bOpener.getException());
    }
    if (services != null) {
      try {
        // add 2nd daughter first (see HBASE-4335)
        services.postOpenDeployTasks(b, server.getCatalogTracker(), true);
        // Should add it to OnlineRegions
        services.addToOnlineRegions(b);
        services.postOpenDeployTasks(a, server.getCatalogTracker(), true);
        services.addToOnlineRegions(a);
      } catch (KeeperException ke) {
        throw new IOException(ke);
      }
    }
  }
}
a) DaughterOpener opens a daughter region (it calls openDaughterRegion, which bottoms out in HRegion.openHRegion):
1) writes the .regioninfo file to hdfs so the region can be recovered even if meta is lost;
2) initializes the HStores underneath it, chiefly through the loadStoreFiles path: that code builds one StoreFile object per file found under the store's hdfs path, and for each of them creates a HalfStoreFileReader that operates on the corresponding file of this region's parent. That is, what the daughter region holds at this point are reference files pointing at the parent's files, so reads against the daughter initially resolve to the parent's data (until a later compaction rewrites it locally).
b) services.addToOnlineRegions adds the daughter region to the RS's online-region list, and postOpenDeployTasks adds it to the meta table.
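To make the reference-file idea concrete: the daughter copies no data, it reads the parent's file through a view restricted to one side of the split key, which is what HalfStoreFileReader provides at the HFile level. A toy illustration of that top/bottom-half view using plain Java collections (not the HBase API):

import java.util.SortedMap;
import java.util.TreeMap;

public class HalfViewDemo {
  public static void main(String[] args) {
    // Stand-in for the parent region's store file: a sorted key space.
    TreeMap<String, String> parentFile = new TreeMap<String, String>();
    parentFile.put("a", "1"); parentFile.put("m", "2"); parentFile.put("z", "3");

    String splitKey = "m";
    // Daughter A sees only keys below the split point ("bottom half") ...
    SortedMap<String, String> daughterA = parentFile.headMap(splitKey);
    // ... daughter B only keys at or above it ("top half"); no bytes are copied.
    SortedMap<String, String> daughterB = parentFile.tailMap(splitKey);

    System.out.println(daughterA); // {a=1}
    System.out.println(daughterB); // {m=2, z=3}
  }
}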
3.3.3 HRegion.openHRegion
/**
 * Open HRegion.
 * Calls initialize and sets sequenceid.
 * @param reporter
 * @return Returns <code>this</code>
 * @throws IOException
 */
protected HRegion openHRegion(final CancelableProgressable reporter)
throws IOException {
  checkCompressionCodecs();
  // Initialize the region:
  // 1. checkRegionInfoOnFilesystem writes HRegionInfo to the .regioninfo file
  // 2. cleanupTempDir clears the old region temp dir
  // 3. initialize the HRegion's stores and load their hfiles
  // 4. replay the recovered.edits files: route each KeyValue read from them to
  //    its store, restoring the HRegion's state
  long seqid = initialize(reporter);
  if (this.log != null) {
    this.log.setSequenceNumber(seqid);
  }
  return this;
}
3.4 SplitTransaction.transitionZKNode updates the znode state and waits for the split to finish
/**
 * Finish off split transaction, transition the zknode
 * @param server Hosting server instance. Can be null when testing (won't try
 * and update in zk if a null server)
 * @param services Used to online/offline regions.
 * @param a first daughter region
 * @param b second daughter region
 * @throws IOException If thrown, transaction failed. Call {@link #rollback(Server, RegionServerServices)}
 */
/* package */void transitionZKNode(final Server server,
    final RegionServerServices services, HRegion a, HRegion b)
throws IOException {
  // Tell master about split by updating zk. If we fail, abort.
  if (server != null && server.getZooKeeper() != null) {
    try {
      this.znodeVersion = transitionNodeSplit(server.getZooKeeper(),
        parent.getRegionInfo(), a.getRegionInfo(), b.getRegionInfo(),
        server.getServerName(), this.znodeVersion);
      int spins = 0;
      // Now wait for the master to process the split. We know it's done
      // when the znode is deleted. The reason we keep tickling the znode is
      // that it's possible for the master to miss an event.
      do {
        if (spins % 10 == 0) {
          LOG.debug("Still waiting on the master to process the split for " +
            this.parent.getRegionInfo().getEncodedName());
        }
        Thread.sleep(100);
        // When this returns -1 it means the znode doesn't exist
        this.znodeVersion = tickleNodeSplit(server.getZooKeeper(),
          parent.getRegionInfo(), a.getRegionInfo(), b.getRegionInfo(),
          server.getServerName(), this.znodeVersion);
        spins++;
      } while (this.znodeVersion != -1 && !server.isStopped()
          && !services.isStopping());
    } catch (Exception e) {
      if (e instanceof InterruptedException) {
        Thread.currentThread().interrupt();
      }
      throw new IOException("Failed telling master about split", e);
    }
  }
  // Coprocessor callback
  if (this.parent.getCoprocessorHost() != null) {
    this.parent.getCoprocessorHost().postSplit(a, b);
  }
  // Leaving here, the splitdir with its dross will be in place but since the
  // split was successful, just leave it; it'll be cleaned when parent is
  // deleted and cleaned up.
}
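The loop above does two things at once: it re-touches ("tickles") the znode so the master cannot permanently miss the event, and it polls for the znode's deletion as the completion signal. A bare-bones sketch of the poll-until-deleted half using the raw ZooKeeper client (the znode path handling is an illustrative assumption):

import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

public class WaitForDeletion {
  static void waitUntilDeleted(ZooKeeper zk, String path) throws Exception {
    int spins = 0;
    while (true) {
      Stat stat = zk.exists(path, false); // no watch set; a plain poll
      if (stat == null) return;           // znode gone: master has processed the split
      if (spins % 10 == 0) {
        System.out.println("still waiting on " + path);
      }
      Thread.sleep(100);
      spins++;
    }
  }
}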