1. 程式人生 > 其它 >hbase原始碼系列(一)Balancer 負載均衡

hbase原始碼系列(一)Balancer 負載均衡

  看原始碼很久了,終於開始動手寫部落格了,為什麼是先寫負載均衡呢,因為一個室友入職新公司了,然後他們遇到這方面的問題,某些機器的硬碟使用明顯比別的機器要多,每次用hadoop做完負載均衡,很快又變回來了。

  首先我們先看HMaster當中怎麼初始化Balancer的,把叢集的狀態穿進去,設定master,然後執行初始化。

//initialize load balancer
this.balancer.setClusterStatus(getClusterStatus());
this.balancer.setMasterServices(this);
this.balancer.initialize();

  然後呼叫是在HMaster的balance()方法當中呼叫

Map<TableName, Map<ServerName, List<HRegionInfo>>> assignmentsByTable =
        this.assignmentManager.getRegionStates().getAssignmentsByTable();

List<RegionPlan> plans = new ArrayList<RegionPlan>();
//Give the balancer the current cluster state.
this.balancer.setClusterStatus(getClusterStatus());
//針對表來做平衡,返回平衡方案,針對全域性,可能不是最優解
for (Map<ServerName, List<HRegionInfo>> assignments : assignmentsByTable.values()) {
    List<RegionPlan> partialPlans = this.balancer.balanceCluster(assignments);
    if (partialPlans != null) plans.addAll(partialPlans);
}

  可以看到它首先獲取了當前的叢集的分配情況,這個分配情況是根據表的 Map<TableName, Map<ServerName, List<HRegionInfo>>,然後遍歷這個map的values,呼叫balancer.balanceCluster(assignments) 來生成一個partialPlans,生成RegionPlan(Region的移動計劃) 。

  我們就可以切換到StochasticLoadBalancer當中了,這個是預設Balancer具體的實現了,也是最好的實現,下面就說說這玩意兒咋實現的。

  看一下注釋,這個玩意兒吹得神乎其神的,它說它考慮到了這麼多因素:

* <ul>
 * <li>Region Load</li> Region的負載
 * <li>Table Load</li>  表的負載
 * <li>Data Locality</li> 資料本地性
 * <li>Memstore Sizes</li> 記憶體Memstore的大小
 * <li>Storefile Sizes</li> 硬碟儲存檔案的大小
 * </ul>

  好,我們從balanceCluster開始看吧,一進來第一件事就是判斷是否需要平衡

//不需要平衡就退出
if (!needsBalance(new ClusterLoadState(clusterState))) {
   return null;
}

  平衡的條件是:負載最大值和最小值要在平均值(region數/server數)的+-slop值之間, 但是這個平均值是基於表的,因為我們傳進去的引數clusterState就是基於表的。

// Check if we even need to do any load balancing
// HBASE-3681 check sloppiness first
float average = cs.getLoadAverage(); // for logging
//叢集的負載最大值和最小值要在平均值的+-slop值之間
int floor = (int) Math.floor(average * (1 - slop));
int ceiling = (int) Math.ceil(average * (1 + slop));
if (!(cs.getMinLoad() > ceiling || cs.getMaxLoad() < floor)) {
    .....return false;
}
return true;

   如果需要平衡的話,就開始計算開銷了

// Keep track of servers to iterate through them.
Cluster cluster = new Cluster(clusterState, loads, regionFinder);
//計算出來當前的開銷    
double currentCost = computeCost(cluster, Double.MAX_VALUE);
double initCost = currentCost;
double newCost = currentCost;

   上面的被我清除了細枝末節之後的程式碼主體,okay,上面邏輯過程如下:

1. 生成一個虛擬的叢集cluster,方便計算計算當前狀態的開銷,其中clusterState是表的狀態,loads是整個叢集的狀態。

// Keep track of servers to iterate through them.
Cluster cluster = new Cluster(clusterState, loads, regionFinder);
//計算出來當前的開銷    
double currentCost = computeCost(cluster, Double.MAX_VALUE);
double initCost = currentCost;
double newCost = currentCost;

 2. 然後迴圈computedMaxSteps次,隨機從選出一個picker來計算平衡方案

int pickerIdx = RANDOM.nextInt(pickers.length);
RegionPicker p = pickers[pickerIdx];
//用選號器從叢集當中隨機跳出一對來,待處理的<server,region>對
Pair<Pair<Integer, Integer>, Pair<Integer, Integer>> picks = p.pick(cluster);

  picker是啥?這裡面有三個,第一個是RandomRegionPicker是隨機挑選region,這裡就不詳細介紹了,主要討論後面兩個;第二個LoadPicker是計算負載的,第三個主要是考慮本地性的。

  給我感覺就很像ZF的搖號器一樣,用哪種演算法還要搖個號

pickers = new RegionPicker[] {
      new RandomRegionPicker(),
      new LoadPicker(),
      localityPicker
};

  下面我們先看localityPicker的pick方法,這個方法是隨機抽選出來一個server、region,找出region的其他本地機器,然後他們返回。

  @Override
    Pair<Pair<Integer, Integer>, Pair<Integer, Integer>> pick(Cluster cluster) {
      if (this.masterServices == null) {
        return new Pair<Pair<Integer, Integer>, Pair<Integer, Integer>>(
            new Pair<Integer, Integer>(-1,-1),
            new Pair<Integer, Integer>(-1,-1)
        );
      }
      // Pick a random region server 隨機選出一個server來
      int thisServer = pickRandomServer(cluster);

      // Pick a random region on this server 隨機選出region
      int thisRegion = pickRandomRegion(cluster, thisServer, 0.0f);

      if (thisRegion == -1) {
        return new Pair<Pair<Integer, Integer>, Pair<Integer, Integer>>(
            new Pair<Integer, Integer>(-1,-1),
            new Pair<Integer, Integer>(-1,-1)
        );
      }

      // Pick the server with the highest locality 找出本地性最高的目標server
      int otherServer = pickHighestLocalityServer(cluster, thisServer, thisRegion);

      // pick an region on the other server to potentially swap
      int otherRegion = this.pickRandomRegion(cluster, otherServer, 0.5f);

      return new Pair<Pair<Integer, Integer>, Pair<Integer, Integer>>(
          new Pair<Integer, Integer>(thisServer,thisRegion),
          new Pair<Integer, Integer>(otherServer,otherRegion)
      );
    }

   okay,這個結束了,下面我們看看LoadPicker吧。

  @Override
    Pair<Pair<Integer, Integer>, Pair<Integer, Integer>> pick(Cluster cluster) {
      cluster.sortServersByRegionCount();
      //先挑選出負載最高的server
      int thisServer = pickMostLoadedServer(cluster, -1);
      //再選出除了負載最高的server之外負載最低的server
      int otherServer = pickLeastLoadedServer(cluster, thisServer);

      Pair<Integer, Integer> regions = pickRandomRegions(cluster, thisServer, otherServer);
      return new Pair<Pair<Integer, Integer>, Pair<Integer, Integer>>(
          new Pair<Integer, Integer>(thisServer, regions.getFirst()),
          new Pair<Integer, Integer>(otherServer, regions.getSecond())

      );
    }

  這裡的負載高和負載低是按照Server上面的region數來算的,而不是儲存檔案啥的,選出負載最高和負載最低的時候,又隨機抽出region來返回了。

  pick挑選的過程介紹完了,那麼很明顯,計算才是重頭戲了,什麼樣的region會導致計算出來的分數高低呢?

3. 重點在計算函式上 computeCost(cluster, Double.MAX_VALUE) 結果這個函式也超級簡單,哈哈

protected double computeCost(Cluster cluster, double previousCost) {
    double total = 0;
    
    for (CostFunction c:costFunctions) {
      if (c.getMultiplier() <= 0) {
        continue;
      }

      total += c.getMultiplier() * c.cost(cluster);

      if (total > previousCost) {
        return total;
      }
    }
    return total;
  }

  遍歷CostFunction,拿cost的加權平均和計算出來。

  那costFunction裡面都有啥呢?localityCost又出現了,看來本地性是一個很大的考慮的情況。

costFunctions = new CostFunction[]{
      new RegionCountSkewCostFunction(conf),
      new MoveCostFunction(conf),
      localityCost,
      new TableSkewCostFunction(conf),
      regionLoadFunctions[0],
      regionLoadFunctions[1],
      regionLoadFunctions[2],
      regionLoadFunctions[3],
};

  可以看出來,裡面真正看中硬碟內容大小的,只有一個StoreFileCostFunction,cost的計算方式有些區別,但都是一個0-1之間的數字,下面給出裡面5個函式都用過的cost的函式。

//cost函式
double max = ((count - 1) * mean) + (total - mean);
for (double n : stats) {
        double diff = Math.abs(mean - n);
        totalCost += diff;
}

double scaled =  scale(0, max, totalCost);
return scaled;

//scale函式
protected double scale(double min, double max, double value) {
      if (max == 0 || value == 0) {
        return 0;
      }

      return Math.max(0d, Math.min(1d, (value - min) / max));
}

  經過分析吧,我覺得影響裡面最後cost最大的是它的權重,下面給一下,這些function的預設權重。

RegionCountSkewCostFunction hbase.master.balancer.stochastic.regionCountCost ,預設值500

MoveCostFunction hbase.master.balancer.stochastic.moveCost,預設值是100

localityCost hbase.master.balancer.stochastic.localityCost,預設值是25

TableSkewCostFunction hbase.master.balancer.stochastic.tableSkewCost,預設值是35

ReadRequestCostFunction hbase.master.balancer.stochastic.readRequestCost,預設值是5

WriteRequestCostFunction hbase.master.balancer.stochastic.writeRequestCost,預設值是5

MemstoreSizeCostFunction hbase.master.balancer.stochastic.memstoreSizeCost,預設值是5

StoreFileCostFunction hbase.master.balancer.stochastic.storefileSizeCost,預設值是5
Storefile的預設值是5,那麼低。。。可以試著提高一下這個引數,使它在計算cost消耗的時候,產生更加正向的意義,效果不好說。

4. 根據虛擬的叢集狀態生成RegionPlan,這裡就不說了

List<RegionPlan> plans = createRegionPlans(cluster);

  原始碼的分析完畢,要想減少儲存內容分佈不均勻,可以試著考慮增加一個picker,這樣又不會缺少對其他條件的考慮,具體可以參考LoadPicker,複製它的實現再寫一個,在pickMostLoadedServer和pickLeastLoadedServer這兩個方法裡面把考慮的條件改一下,以前的條件是Integer[] servers = cluster.serverIndicesSortedByRegionCount; 通過這個來查詢一下負載最高和最低的server,那麼現在我們要在Cluster裡面增加一個Server ---> StoreFile大小的關係對映集合,但是這裡面沒有,只有regionLoads,RegionLoad這個類有一個方法getStorefileSizeMB可以獲得StoreFile的大小,我們通過裡面的region和server的對映regionIndexToServerIndex來最後計算出來這個對映關係即可,這個計算對映關係個過程放在Cluster的建構函式裡面。