
HBase 0.94.8 split source code analysis

1. Initiating an HBase split
1.1 HBaseAdmin.split
  /**
   * Split a table or an individual region.
   * Asynchronous operation.
   *
   * @param tableNameOrRegionName table or region to split
   * @param splitPoint the explicit position to split on
   * @throws IOException if a remote or network exception occurs
   * @throws InterruptedException interrupt exception occurred
   */
  public void split(final byte [] tableNameOrRegionName,
      final byte [] splitPoint) throws IOException, InterruptedException {
    CatalogTracker ct = getCatalogTracker();
    try {
      Pair<HRegionInfo, ServerName> regionServerPair
        = getRegion(tableNameOrRegionName, ct); // if tableNameOrRegionName is a region name this yields a Pair<HRegionInfo, ServerName>; otherwise it is null
      if (regionServerPair != null) {
        if (regionServerPair.getSecond() == null) {
            throw new NoServerForRegionException(Bytes.toStringBinary(tableNameOrRegionName));
        } else {
		  // split the region; this is the overload analyzed in 1.2 below
          split(regionServerPair.getSecond(), regionServerPair.getFirst(), splitPoint);
        }
      } else {
		// if tableNameOrRegionName is a table name we take this branch
        final String tableName = tableNameString(tableNameOrRegionName, ct);
        List<Pair<HRegionInfo, ServerName>> pairs =
          MetaReader.getTableRegionsAndLocations(ct,
              tableName); // all regions of tableName, as HRegionInfo plus the ServerName hosting each
		// if splitPoint is null every region is split; otherwise only the region containing splitPoint is split
        for (Pair<HRegionInfo, ServerName> pair: pairs) {
          // May not be a server for a particular row
          if (pair.getSecond() == null) continue;
          HRegionInfo r = pair.getFirst();
          // check for parents
          if (r.isSplitParent()) continue;
          // if a split point given, only split that particular region
          if (splitPoint != null && !r.containsRow(splitPoint)) continue;
          // call out to region server to do split now
          split(pair.getSecond(), pair.getFirst(), splitPoint);
        }
      }
    } finally {
      cleanupCatalogTracker(ct);
    }
  }
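
For orientation, here is a minimal client-side sketch (not part of the source being analyzed) of how this entry point is typically driven; the table name and split key are made up for illustration:

public class SplitExample {
  public static void main(String[] args) throws Exception {
    org.apache.hadoop.conf.Configuration conf =
        org.apache.hadoop.hbase.HBaseConfiguration.create();
    org.apache.hadoop.hbase.client.HBaseAdmin admin =
        new org.apache.hadoop.hbase.client.HBaseAdmin(conf);
    try {
      // "mytable" and "row-5000" are hypothetical. Passing a table name splits the
      // region(s) containing the split point; passing a region name splits only
      // that region, matching the two branches shown above.
      admin.split(org.apache.hadoop.hbase.util.Bytes.toBytes("mytable"),
                  org.apache.hadoop.hbase.util.Bytes.toBytes("row-5000"));
    } finally {
      admin.close();
    }
  }
}

As the javadoc above notes, the call is asynchronous: it returns before the daughter regions come online.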
1.2 HBaseAdmin.split
  // this private split is the one called from the method above
  private void split(final ServerName sn, final HRegionInfo hri,
      byte[] splitPoint) throws IOException {
    HRegionInterface rs =
      this.connection.getHRegionConnection(sn.getHostname(), sn.getPort()); // get an HRegionInterface RPC proxy to the hosting region server
    rs.splitRegion(hri, splitPoint); // call HRegionServer.splitRegion to split the region
  }

1.3 HRegionServer.splitRegion: split the region on the region server
  public void splitRegion(HRegionInfo regionInfo, byte[] splitPoint)
      throws NotServingRegionException, IOException {
    checkOpen(); // check that the server and HDFS are available
    HRegion region = getRegion(regionInfo.getRegionName()); // look up the HRegion by region name
    region.flushcache(); // flush the memstore; skipped if the cache is empty, the region is closed, a flush is already in progress, or writes are disabled
    region.forceSplit(splitPoint); // record the explicit split point
    compactSplitThread.requestSplit(region, region.checkSplit()); // region.checkSplit() computes the split point and the split request is queued with it
  }
  
2. Determining the split point

2.2 HRegion.checkSplit 
  public byte[] checkSplit() {
    // Can't split ROOT/META
    if (this.regionInfo.isMetaTable()) {
      if (shouldForceSplit()) {
        LOG.warn("Cannot split root/meta regions in HBase 0.20 and above");
      }
      return null;
    }


    if (!splitPolicy.shouldSplit()) {
      return null;
    }


    byte[] ret = splitPolicy.getSplitPoint();


    if (ret != null) {
      try {
        checkRow(ret, "calculated split");
      } catch (IOException e) {
        LOG.error("Ignoring invalid split", e);
        return null;
      }
    }
    return ret;
  }
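
The shouldSplit() check above is delegated to the configured RegionSplitPolicy. The following is a minimal sketch in the spirit of the size-based policy, paraphrased rather than copied verbatim from 0.94's ConstantSizeRegionSplitPolicy: a region becomes splittable once any store outgrows a size limit, or when a split was explicitly forced via region.forceSplit() as in section 1.3. The sketch assumes it lives in the org.apache.hadoop.hbase.regionserver package, like the built-in policies.

package org.apache.hadoop.hbase.regionserver;

// Sketch only: the size limit is hard-coded here, whereas the real policy reads
// the configured maximum store file size (hbase.hregion.max.filesize).
public class SizeBasedSplitPolicySketch extends RegionSplitPolicy {
  private final long desiredMaxFileSize = 10L * 1024 * 1024 * 1024; // hypothetical 10 GB limit

  @Override
  protected boolean shouldSplit() {
    boolean force = region.shouldForceSplit();   // set by region.forceSplit(splitPoint)
    boolean foundABigStore = false;
    for (Store store : region.getStores().values()) {
      // a store still holding reference files from an earlier split cannot be split again
      if (store.hasReferences()) {
        return false;
      }
      if (store.getSize() > desiredMaxFileSize) {
        foundABigStore = true;
      }
    }
    return foundABigStore || force;
  }
}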
  
2.3 RegionSplitPolicy.getSplitPoint: how the split point is obtained
  // if an explicit split point was set on the region, return it; otherwise use the midkey of the largest store as the split point
  protected byte[] getSplitPoint() {
    byte[] explicitSplitPoint = this.region.getExplicitSplitPoint();
    if (explicitSplitPoint != null) {
      return explicitSplitPoint;
    }
    Map<byte[], Store> stores = region.getStores();


    byte[] splitPointFromLargestStore = null;
    long largestStoreSize = 0;
    for (Store s : stores.values()) {
      byte[] splitPoint = s.getSplitPoint();
      long storeSize = s.getSize();
      if (splitPoint != null && largestStoreSize < storeSize) {
        splitPointFromLargestStore = splitPoint;
        largestStoreSize = storeSize;
      }
    }
    
    return splitPointFromLargestStore;
  }
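
Because getSplitPoint() is overridable, a table can also plug in its own policy. As a hypothetical sketch (class and constant names are made up, not part of the walkthrough above), a policy could trim the midkey-based split point down to a fixed-length key prefix so that rows sharing a prefix always stay in the same region:

// Assumes row keys carry a meaningful 8-byte prefix; the parent class's
// getSplitPoint() returns the explicit point or the largest store's midkey,
// exactly as shown in 2.3.
public class PrefixBoundarySplitPolicy
    extends org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy {
  private static final int PREFIX_LENGTH = 8; // assumed prefix length

  @Override
  protected byte[] getSplitPoint() {
    byte[] splitPoint = super.getSplitPoint();
    if (splitPoint != null && splitPoint.length > PREFIX_LENGTH) {
      byte[] prefix = new byte[PREFIX_LENGTH];
      System.arraycopy(splitPoint, 0, prefix, 0, PREFIX_LENGTH);
      return prefix;  // split exactly on a prefix boundary
    }
    return splitPoint;
  }
}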
  
3. Executing the split

3.1 CompactSplitThread.requestSplit

  public synchronized void requestSplit(final HRegion r, byte[] midKey) {
    if (midKey == null) {
      LOG.debug("Region " + r.getRegionNameAsString() +
        " not splittable because midkey=null");
      return;
    }
    try {
      this.splits.execute(new SplitRequest(r, midKey, this.server));
      if (LOG.isDebugEnabled()) {
        LOG.debug("Split requested for " + r + ".  " + this);
      }
    } catch (RejectedExecutionException ree) {
      LOG.info("Could not execute split for " + r, ree);
    }
  }
  
3.2 SplitRequest.run 
  public void run() {
    if (this.server.isStopping() || this.server.isStopped()) {
      LOG.debug("Skipping split because server is stopping=" +
        this.server.isStopping() + " or stopped=" + this.server.isStopped());
      return;
    }
    try {
      final long startTime = System.currentTimeMillis();
      SplitTransaction st = new SplitTransaction(parent, midKey);
      // If prepare does not return true, for some reason -- logged inside in
      // the prepare call -- we are not ready to split just now. Just return.
	  // SplitTransaction.prepare() builds the two daughter HRegionInfos inside the SplitTransaction and runs sanity checks, e.g. the split row must be contained in the parent region
      if (!st.prepare()) return;
      try {
        st.execute(this.server, this.server);
        this.server.getMetrics().incrementSplitSuccessCount();
      } catch (Exception e) {
        if (this.server.isStopping() || this.server.isStopped()) {
          LOG.info(
              "Skip rollback/cleanup of failed split of "
                  + parent.getRegionNameAsString() + " because server is"
                  + (this.server.isStopping() ? " stopping" : " stopped"), e);
          return;
        }
        try {
          LOG.info("Running rollback/cleanup of failed split of " +
            parent.getRegionNameAsString() + "; " + e.getMessage(), e);
          if (st.rollback(this.server, this.server)) {
            LOG.info("Successful rollback of failed split of " +
              parent.getRegionNameAsString());
            this.server.getMetrics().incrementSplitFailureCount();
          } else {
            this.server.abort("Abort; we got an error after point-of-no-return");
          }
        } catch (RuntimeException ee) {
        ........
  }

3.3 SplitTransaction.execute
  /**
   * Run the transaction.
   * @param server Hosting server instance.  Can be null when testing (won't try
   * and update in zk if a null server)
   * @param services Used to online/offline regions.
   * @throws IOException If thrown, transaction failed. Call {@link #rollback(Server, RegionServerServices)}
   * @return Regions created
   * @throws IOException
   * @see #rollback(Server, RegionServerServices)
   */
  public PairOfSameType<HRegion> execute(final Server server,
      final RegionServerServices services)
  throws IOException {
    PairOfSameType<HRegion> regions = createDaughters(server, services);
	// createDaughters: create the split temp dir, change the region's zk state, close the region and stop all its stores,
	// create the daughter directories and place references to the region's store files into them,
	// then create daughter regions A and B, register them in zk, and mark the original HRI offline
    openDaughters(server, services, regions.getFirst(), regions.getSecond());
    transitionZKNode(server, services, regions.getFirst(), regions.getSecond());
    return regions;
  }

3.3.1 SplitTransaction.createDaughters: create the two daughter regions, acquiring the parent region's write lock along the way
  /**
   * Prepare the regions and region files.
   * @param server Hosting server instance.  Can be null when testing (won't try
   * and update in zk if a null server)
   * @param services Used to online/offline regions.
   * @throws IOException If thrown, transaction failed. Call {@link #rollback(Server, RegionServerServices)}
   * @return Regions created
   */
  /* package */PairOfSameType<HRegion> createDaughters(final Server server,
      final RegionServerServices services) throws IOException {
    LOG.info("Starting split of region " + this.parent);
    if ((server != null && server.isStopped()) ||
        (services != null && services.isStopping())) {
      throw new IOException("Server is stopped or stopping");
    }
    assert !this.parent.lock.writeLock().isHeldByCurrentThread(): "Unsafe to hold write lock while performing RPCs";


    // Coprocessor callback
	// this is what fires BaseRegionObserver.preSplit
    if (this.parent.getCoprocessorHost() != null) {
      this.parent.getCoprocessorHost().preSplit();
    }


    // If true, no cluster to write meta edits to or to update znodes in.
    boolean testing = server == null? true:
      server.getConfiguration().getBoolean("hbase.testing.nocluster", false);
    this.fileSplitTimeout = testing ? this.fileSplitTimeout :
      server.getConfiguration().getLong("hbase.regionserver.fileSplitTimeout",
          this.fileSplitTimeout);


    // Set ephemeral SPLITTING znode up in zk.  Mocked servers sometimes don't
    // have zookeeper so don't do zk stuff if server or zookeeper is null
    if (server != null && server.getZooKeeper() != null) {
      try {
		// 1. create an ephemeral SPLITTING znode in zk
        createNodeSplitting(server.getZooKeeper(),
          this.parent.getRegionInfo(), server.getServerName());
      } catch (KeeperException e) {
        throw new IOException("Failed creating SPLITTING znode on " +
          this.parent.getRegionNameAsString(), e);
      }
    }
	// progress is journaled in: private final List<JournalEntry> journal = new ArrayList<JournalEntry>();
    this.journal.add(JournalEntry.SET_SPLITTING_IN_ZK);
    if (server != null && server.getZooKeeper() != null) {
      try {
        // Transition node from SPLITTING to SPLITTING after creating the split node.
        // Master will get the callback for node change only if the transition is successful.
        // Note that if the transition fails then the rollback will delete the created znode
        // TODO : May be we can add some new state to znode and handle the new state incase of success/failure
        // 2. transition the znode so the master sees this region move into SPLITTING state
        this.znodeVersion = transitionNodeSplitting(server.getZooKeeper(),
            this.parent.getRegionInfo(), server.getServerName(), -1);
      } catch (KeeperException e) {
        throw new IOException("Failed setting SPLITTING znode on "
            + this.parent.getRegionNameAsString(), e);
      }
    }
	// 3. create the split directory
    createSplitDir(this.parent.getFilesystem(), this.splitdir);
    this.journal.add(JournalEntry.CREATE_SPLIT_DIR);
 
    List<StoreFile> hstoreFilesToSplit = null;
    Exception exceptionToThrow = null;
    try{
	  // 4. wait for any in-flight flush and compaction on the region to finish, then close the region
      hstoreFilesToSplit = this.parent.close(false);
    } catch (Exception e) {
      exceptionToThrow = e;
    }
    if (exceptionToThrow == null && hstoreFilesToSplit == null) {
      // The region was closed by a concurrent thread.  We can't continue
      // with the split, instead we must just abandon the split.  If we
      // reopen or split this could cause problems because the region has
      // probably already been moved to a different server, or is in the
      // process of moving to a different server.
      exceptionToThrow = closedByOtherException;
    }
    if (exceptionToThrow != closedByOtherException) {
      this.journal.add(JournalEntry.CLOSED_PARENT_REGION);
    }
    if (exceptionToThrow != null) {
      if (exceptionToThrow instanceof IOException) throw (IOException)exceptionToThrow;
      throw new IOException(exceptionToThrow);
    }


    if (!testing) {
	  // 5. remove the parent from the region server's online regions, putting it offline
      services.removeFromOnlineRegions(this.parent.getRegionInfo().getEncodedName());
    }
    this.journal.add(JournalEntry.OFFLINED_PARENT);


    // TODO: If splitStoreFiles were multithreaded would we complete steps in
    // less elapsed time?  St.Ack 20100920
    //
    // splitStoreFiles creates daughter region dirs under the parent splits dir
    // Nothing to unroll here if failure -- clean up of CREATE_SPLIT_DIR will
    // clean this up.
	// 6. split the store files: a thread pool runs a StoreFileSplitter over every store file (HFile) of the region;
    //    no data is rewritten, regardless of where the split row falls inside an HFile: reference files pointing back at the parent's files are written under each daughter's directory
    splitStoreFiles(this.splitdir, hstoreFilesToSplit);


    // Log to the journal that we are creating region A, the first daughter
    // region.  We could fail halfway through.  If we do, we could have left
    // stuff in fs that needs cleanup -- a storefile or two.  Thats why we
    // add entry to journal BEFORE rather than AFTER the change.
	// 7. create daughter regions A and B from the reference files, writing each daughter's regioninfo to HDFS; the parent is offlined in .META. further below
    this.journal.add(JournalEntry.STARTED_REGION_A_CREATION);
    HRegion a = createDaughterRegion(this.hri_a, this.parent.rsServices);


    // Ditto
    this.journal.add(JournalEntry.STARTED_REGION_B_CREATION);
    HRegion b = createDaughterRegion(this.hri_b, this.parent.rsServices);


    // This is the point of no return.  Adding subsequent edits to .META. as we
    // do below when we do the daughter opens adding each to .META. can fail in
    // various interesting ways the most interesting of which is a timeout
    // BUT the edits all go through (See HBASE-3872).  IF we reach the PONR
    // then subsequent failures need to crash out this regionserver; the
    // server shutdown processing should be able to fix-up the incomplete split.
    // The offlined parent will have the daughters as extra columns.  If
    // we leave the daughter regions in place and do not remove them when we
    // crash out, then they will have their references to the parent in place
    // still and the server shutdown fixup of .META. will point to these
    // regions.
    // We should add PONR JournalEntry before offlineParentInMeta,so even if
    // OfflineParentInMeta timeout,this will cause regionserver exit,and then
    // master ServerShutdownHandler will fix daughter & avoid data loss. (See 
    // HBase-4562).
    this.journal.add(JournalEntry.PONR);


    // Edit parent in meta.  Offlines parent region and adds splita and splitb.
    if (!testing) {
      MetaEditor.offlineParentInMeta(server.getCatalogTracker(),
        this.parent.getRegionInfo(), a.getRegionInfo(), b.getRegionInfo());
    }
    return new PairOfSameType<HRegion>(a, b);
  }
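
The journal built up above is what rollback() (called from SplitRequest.run in 3.2) consumes when the transaction fails before the PONR entry is written. A conceptual sketch of that replay, not the verbatim SplitTransaction.rollback body: the entries are walked in reverse and each completed step is undone; once PONR has been journaled, rollback reports failure and the caller aborts the region server.

// Conceptual sketch only; the enum mirrors the JournalEntry values added above and
// the undo actions are summarized in comments instead of real filesystem/zk calls.
class RollbackSketch {
  enum JournalEntry {
    SET_SPLITTING_IN_ZK, CREATE_SPLIT_DIR, CLOSED_PARENT_REGION,
    OFFLINED_PARENT, STARTED_REGION_A_CREATION, STARTED_REGION_B_CREATION, PONR
  }

  /** Returns false when rollback is impossible; the caller then aborts the server. */
  boolean rollback(java.util.List<JournalEntry> journal) {
    java.util.ListIterator<JournalEntry> it = journal.listIterator(journal.size());
    while (it.hasPrevious()) {
      switch (it.previous()) {
        case SET_SPLITTING_IN_ZK:       /* delete the ephemeral SPLITTING znode */        break;
        case CREATE_SPLIT_DIR:          /* remove the split temp directory */             break;
        case CLOSED_PARENT_REGION:      /* re-initialize the closed parent region */      break;
        case OFFLINED_PARENT:           /* add the parent back to the online regions */   break;
        case STARTED_REGION_A_CREATION: /* delete daughter A's region directory */        break;
        case STARTED_REGION_B_CREATION: /* delete daughter B's region directory */        break;
        case PONR:                      /* past the point of no return */                 return false;
      }
    }
    return true;
  }
}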
  
3.3.2 SplitTransaction.openDaughters: open the two daughter regions
 
  /**
   * Perform time consuming opening of the daughter regions.
   * @param server Hosting server instance.  Can be null when testing (won't try
   * and update in zk if a null server)
   * @param services Used to online/offline regions.
   * @param a first daughter region
   * @param b second daughter region
   * @throws IOException If thrown, transaction failed. Call {@link #rollback(Server, RegionServerServices)}
   */
  /* package */void openDaughters(final Server server,
      final RegionServerServices services, HRegion a, HRegion b)
      throws IOException {
    boolean stopped = server != null && server.isStopped();
    boolean stopping = services != null && services.isStopping();
    // TODO: Is this check needed here?
    if (stopped || stopping) {
      LOG.info("Not opening daughters " +
          b.getRegionInfo().getRegionNameAsString() +
          " and " +
          a.getRegionInfo().getRegionNameAsString() +
          " because stopping=" + stopping + ", stopped=" + stopped);
    } else {
      // Open daughters in parallel.
	  // open the two daughter regions in parallel
	  // each opener eventually calls HRegion.openHRegion; the actual initialization happens in HRegion.initializeRegionInternals
      DaughterOpener aOpener = new DaughterOpener(server, a);
      DaughterOpener bOpener = new DaughterOpener(server, b);
      aOpener.start();
      bOpener.start();
      try {
        aOpener.join();
        bOpener.join();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IOException("Interrupted " + e.getMessage());
      }
      if (aOpener.getException() != null) {
        throw new IOException("Failed " +
          aOpener.getName(), aOpener.getException());
      }
      if (bOpener.getException() != null) {
        throw new IOException("Failed " +
          bOpener.getName(), bOpener.getException());
      }
      if (services != null) {
        try {
          // add 2nd daughter first (see HBASE-4335)
          services.postOpenDeployTasks(b, server.getCatalogTracker(), true);
          // Should add it to OnlineRegions
          services.addToOnlineRegions(b);
          services.postOpenDeployTasks(a, server.getCatalogTracker(), true);
          services.addToOnlineRegions(a);
        } catch (KeeperException ke) {
          throw new IOException(ke);
        }
      }
    }
  }




a) DaughterOpener opens the daughter region (it calls openDaughterRegion, which at the lowest level calls HRegion.openHRegion):
    1) write the .regioninfo file to HDFS so the region can be recovered if .META. is lost;
    2) initialize the HStores underneath it, mainly through loadStoreFiles: that method builds one StoreFile object per file found under the store's directory on HDFS, and because the daughter currently holds only reference files pointing at the parent's files, each such StoreFile is opened with a HalfStoreFileReader that reads the corresponding file of the parent region. Until a compaction rewrites the data locally, reads against the daughter are therefore served from the parent's files. (A conceptual sketch of what a reference expresses follows after this list.)
b) services.addToOnlineRegions adds the daughter region to the region server's online-region list, and postOpenDeployTasks registers it in the .META. table.
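
A reference file copies no data; conceptually it only records the split row and which half of the parent's key space the daughter owns. A small illustration of that idea (class and enum names are made up; this is not the actual Reference/HalfStoreFileReader code):

// Illustration only: what a reference file expresses about the parent's key space.
final class HalfRangeSketch {
  enum Range { TOP, BOTTOM }   // TOP = keys >= split row, BOTTOM = keys < split row

  private final byte[] splitRow;
  private final Range range;

  HalfRangeSketch(byte[] splitRow, Range range) {
    this.splitRow = splitRow;
    this.range = range;
  }

  /** True if a row stored in the parent's HFile belongs to this daughter's half. */
  boolean contains(byte[] row) {
    int cmp = org.apache.hadoop.hbase.util.Bytes.compareTo(row, splitRow);
    return range == Range.TOP ? cmp >= 0 : cmp < 0;
  }
}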

3.3.3 HRegion.openHRegion
  /**
   * Open HRegion.
   * Calls initialize and sets sequenceid.
   * @param reporter
   * @return Returns <code>this</code>
   * @throws IOException
   */
  protected HRegion openHRegion(final CancelableProgressable reporter)
  throws IOException {
    checkCompressionCodecs();
	// initialize the region:
	// 1. checkRegionInfoOnFilesystem writes the HRegionInfo to the .regioninfo file
	// 2. cleanupTempDir clears the old region temp directory
	// 3. initialize the region's stores and load their HFiles
	// 4. replay the recovered edits files: read the KeyValues, route them to the matching store, and recover the region
    long seqid = initialize(reporter);
    if (this.log != null) {
      this.log.setSequenceNumber(seqid);
    }
    return this;
  }

  
3.4 SplitTransaction.transitionZKNode: update the znode state and wait for the master to finish the split

  /**
   * Finish off split transaction, transition the zknode
   * @param server Hosting server instance.  Can be null when testing (won't try
   * and update in zk if a null server)
   * @param services Used to online/offline regions.
   * @param a first daughter region
   * @param b second daughter region
   * @throws IOException If thrown, transaction failed. Call {@link #rollback(Server, RegionServerServices)}
   */
  /* package */void transitionZKNode(final Server server,
      final RegionServerServices services, HRegion a, HRegion b)
      throws IOException {
    // Tell master about split by updating zk.  If we fail, abort.
    if (server != null && server.getZooKeeper() != null) {
      try {
        this.znodeVersion = transitionNodeSplit(server.getZooKeeper(),
          parent.getRegionInfo(), a.getRegionInfo(), b.getRegionInfo(),
          server.getServerName(), this.znodeVersion);


        int spins = 0;
        // Now wait for the master to process the split. We know it's done
        // when the znode is deleted. The reason we keep tickling the znode is
        // that it's possible for the master to miss an event.
        do {
          if (spins % 10 == 0) {
            LOG.debug("Still waiting on the master to process the split for " +
                this.parent.getRegionInfo().getEncodedName());
          }
          Thread.sleep(100);
          // When this returns -1 it means the znode doesn't exist
          this.znodeVersion = tickleNodeSplit(server.getZooKeeper(),
            parent.getRegionInfo(), a.getRegionInfo(), b.getRegionInfo(),
            server.getServerName(), this.znodeVersion);
          spins++;
        } while (this.znodeVersion != -1 && !server.isStopped()
            && !services.isStopping());
      } catch (Exception e) {
        if (e instanceof InterruptedException) {
          Thread.currentThread().interrupt();
        }
        throw new IOException("Failed telling master about split", e);
      }
    }


    // Coprocessor callback
    if (this.parent.getCoprocessorHost() != null) {
      this.parent.getCoprocessorHost().postSplit(a,b);
    }


    // Leaving here, the splitdir with its dross will be in place but since the
    // split was successful, just leave it; it'll be cleaned when parent is
    // deleted and cleaned up.
  }