1. 程式人生 > >XGBoost原始碼分析之單機多執行緒的實現

XGBoost原始碼分析之單機多執行緒的實現

上篇文章主要通過論文閱讀、數學推導,基本掌握了XGBoost的原理。於是開始閱讀XGBoost原始碼,並總結了幾處自己認為比較重要的方面。如有錯誤,請指正:

1. 總體框架:

cli_main.cc 是程式的入口,main函式所在的檔案。除了有main函式以外,還有訓練引數的結構體。如模型儲存路徑,資料路徑,迭代次數等。這個原始碼註釋的很清楚,不再贅述。

通過呼叫main() -> CLIRunTask() -> CLITrain(),這裡我們主要看函式CLITrain()的流程

CLITrain主要分了幾步:

1.載入資料

2.初始化learner

3.呼叫learner->InitModel()、learner->Configure初始化

Model,確定目標函式和模型,初始化目標函式結構體和模型

4.根據模型引數param.num_round迭代呼叫UpdateOneIter()來建樹

2.learner

上一節,在learner中初始化目標函式和模型,主要賦值給了這兩個變數。

/include/xgboost/learner.h

  /*! \brief objective function */
  std::unique_ptr<ObjFunction> obj_;
  /*! \brief The gradient booster used by the model*/
  std::unique_ptr<GradientBooster> gbm_;

ObjFunction是基類,定義了很多虛擬函式。類RegLossObj等都繼承於此類,主要實現了根據不同的模型和loss,將一階二階導數計算出來。

以線性迴歸模型為例

src\objective\regression_obj.cc

定義平方損失函式:

// linear regression
struct LinearSquareLoss {
  static bst_float PredTransform(bst_float x) { return x; }
  static bool CheckLabel(bst_float x) { return true; }
  static bst_float FirstOrderGradient(bst_float predt, bst_float label) { return predt - label; }
  static bst_float SecondOrderGradient(bst_float predt, bst_float label) { return 1.0f; }
  static bst_float ProbToMargin(bst_float base_score) { return base_score; }
  static const char* LabelErrorMsg() { return ""; }
  static const char* DefaultEvalMetric() { return "rmse"; }
}


class RegLossObj : public ObjFunction{
...
void GetGradient(const std::vector<bst_float> &preds,
                   const MetaInfo &info,
                   int iter,
                   std::vector<bst_gpair> *out_gpair) override {
...
    out_gpair->resize(preds.size());
    // check if label in range
    bool label_correct = true;
    // start calculating gradient
    const omp_ulong ndata = static_cast<omp_ulong>(preds.size());
    #pragma omp parallel for schedule(static)
    for (omp_ulong i = 0; i < ndata; ++i) {
      bst_float p = Loss::PredTransform(preds[i]);
      bst_float w = info.GetWeight(i);
      if (info.labels[i] == 1.0f) w *= param_.scale_pos_weight;
      if (!Loss::CheckLabel(info.labels[i])) label_correct = false;
      out_gpair->at(i) = bst_gpair(Loss::FirstOrderGradient(p, info.labels[i]) * w,
                                   Loss::SecondOrderGradient(p, info.labels[i]) * w);
    
    }
...
}
可以發現在計算一階導和二階導的時候,採用了並行處理。以一條資料作為一個粒度。

GradientBooster是基類,有兩個模型繼承於此類。XGBoost中除了有Tree模型(GBTree),同時也實現了線性模型(GBLinear)。與梯度下降和牛頓法不同的是,在每次迭代的過程中,每個屬性單獨計算,採用類似於一維的牛頓法來更新一個屬性。這裡不是重點,有興趣的同學可以看\src\gbm\gblinear.cc。


3.UpdateOneIter

這裡可以看出每次迭代的操作,主要有:

  void UpdateOneIter(int iter, DMatrix* train) override {
  
    this->LazyInitDMatrix(train);
    this->PredictRaw(train, &preds_);                                   //獲取上一輪預測值
    obj_->GetGradient(preds_, train->info(), iter, &gpair_);            //計算一階導和二階導
    gbm_->DoBoost(train, &gpair_, obj_.get());                          //建樹
  }

在DoBoost 中確定了輸出維度,呼叫BoostNewTrees構造森林。最後終於呼叫了update在輸出維度不為1時,呼叫BoostNewTrees是並行的。這裡構造不止一棵樹是為了一下一步隨機森林做準備。因為每一次迭代,是分給了多個樹完成,而不是一顆。DoBoost這部分程式碼可以在gbtree.cc中找到,這裡不再贅述。

4.ColMaker

4.1 用於統計的結構體

ColMaker繼承於TreeUpdater,是單機多執行緒建樹的實現過程,也是本文重點記錄的地方。

ColMaker提供兩個函式:Init、Update,分別用於初始化和更新建樹由上層呼叫,同時為了實現多執行緒,定義了三個結構體:ThreadEntry、NodeEntry、Builder。

class ColMaker: public TreeUpdater {
 public:
       void Init(const std::vector<std::pair<std::string, std::string> >& args) override
       void Update(const std::vector<bst_gpair> &gpair,DMatrix* dmat,const std::vector<RegTree*> &trees) override
	// training parameter
       TrainParam param;
        // data structure
       /*! \brief per thread x per node entry to store tmp data */
       struct ThreadEntr
       struct NodeEntry
        // actual builder that runs the algorithm
       struct Builder  
}

首先看一下結構體ThreadEntry

struct ThreadEntry {
    /*! \brief statistics of data */
    TStats stats;
    /*! \brief extra statistics of data */
    TStats stats_extra;
    /*! \brief last feature value scanned */
    bst_float last_fvalue;
    /*! \brief first feature value scanned */
    bst_float first_fvalue;
    /*! \brief current best solution */
    SplitEntry best;
    // constructor
    explicit ThreadEntry(const TrainParam &param)
        : stats(param), stats_extra(param) {
    }

ThreadEntry結構體用於多執行緒中單個執行緒內的樣本統計,一個執行緒擁有一個ThreadEntry 變數。
struct NodeEntry {
    /*! \brief statics for node entry */
    TStats stats;
    /*! \brief loss of this node, without split */
    bst_float root_gain;
    /*! \brief weight calculated related to current data */
    bst_float weight;
    /*! \brief current best solution */
    SplitEntry best;
    // constructor
    explicit NodeEntry(const TrainParam& param)
        : stats(param), root_gain(0.0f), weight(0.0f){
    }

NodeEntry 用於一個樹節點,所擁有的樣本的統計資訊。一個節點在分列前,需要將自己的樣本分給不同的執行緒進行處理,最後得到多個ThreadEntry 變數,彙總到每個節點自己的NodeEntry 變數中。詳細過程如下。

4.2 建樹總體流程

Update -> Builder.Update

struct Builder {
...
   public:
    // constructor
    explicit Builder(const TrainParam& param) : param(param), nthread(omp_get_max_threads()) {}
    // update one tree, growing
    virtual void Update(const std::vector<bst_gpair>& gpair,
                        DMatrix* p_fmat,
                        RegTree* p_tree) {
      this->InitData(gpair, *p_fmat, *p_tree);
      this->InitNewNode(qexpand_, gpair, *p_fmat, *p_tree);
      for (int depth = 0; depth < param.max_depth; ++depth) {
        this->FindSplit(depth, qexpand_, gpair, p_fmat, p_tree);
        this->ResetPosition(qexpand_, p_fmat, *p_tree);
        this->UpdateQueueExpand(*p_tree, &qexpand_);
        this->InitNewNode(qexpand_, gpair, *p_fmat, *p_tree);
        // if nothing left to be expand, break
        if (qexpand_.size() == 0) break;
      }
      // set all the rest expanding nodes to leaf
      for (size_t i = 0; i < qexpand_.size(); ++i) {
        const int nid = qexpand_[i];
        (*p_tree)[nid].set_leaf(snode[nid].weight * param.learning_rate);
      }
      // remember auxiliary statistics in the tree node
      for (int nid = 0; nid < p_tree->param.num_nodes; ++nid) {
        p_tree->stat(nid).loss_chg = snode[nid].best.loss_chg;
        p_tree->stat(nid).base_weight = snode[nid].weight;
        p_tree->stat(nid).sum_hess = static_cast<float>(snode[nid].stats.sum_hess);
        snode[nid].stats.SetLeafVec(param, p_tree->leafvec(nid));
      }
    }
...
}

上述程式碼的流程可以描述為:

初始化變數
初始化節點
For (depth 1:max_depth){
       找分割點,更新變數p_tree,維護樹的增長
       重置樣本所屬節點,更新變數position,主要將樣本從父節點分配到剛分裂出來的葉子節點中
       將葉子預分裂的節點挑出來,更新變數qexpand_,這個變數是為了儲存待處理的新節點
       初始化新節點
}

qexpand_中的節點設定為葉子節點
把佇列裡的節點資訊加到樹模型中

4.3 抽樣

在XGBoost中,為了防止過擬合,有很多地方用到了抽樣。其中一種是通過引數對樣本進行抽樣:

Builder -> Update - >InitData

 // mark subsample
        if (param.subsample < 1.0f) {
          std::bernoulli_distribution coin_flip(param.subsample);
          auto& rnd = common::GlobalRandom();
          for (size_t i = 0; i < rowset.size(); ++i) {
            const bst_uint ridx = rowset[i];
            if (gpair[ridx].hess < 0.0f) continue;
            if (!coin_flip(rnd)) position[ridx] = ~position[ridx];
          }

在XGBoost的設計中,position[ridx]如果小於0,則認為該樣本被刪除。由於loss是凸函式,二階導必定大於0,所以如果小於0,可能在計算過程中出現溢位等異常出現,則不考慮該樣本。所以如果不將樣本加入到計算中,只需取反。在上述程式碼中,是通過伯努利分佈生成隨機數來進行抽樣。

XGBoost還對模型的屬性進行抽樣:

Builder -> Update - > FindSplit

std::vector<bst_uint> feat_set = feat_index;
if (param.colsample_bylevel != 1.0f) {
      std::shuffle(feat_set.begin(), feat_set.end(), common::GlobalRandom());
      unsigned n = static_cast<unsigned>(param.colsample_bylevel * feat_index.size());
      feat_set.resize(n);
}

利用std::shuffle隨機排序取前n來實現列抽樣

4.4 尋找分割點

Builder -> Update - > FindSplit 
    // find splits at current level, do split per level
    inline void FindSplit(int depth,
                          const std::vector<int> &qexpand,
                          const std::vector<bst_gpair> &gpair,
                          DMatrix *p_fmat,
                          RegTree *p_tree) {
      ...
      dmlc::DataIter<ColBatch>* iter = p_fmat->ColIterator(feat_set);
      while (iter->Next()) {
        this->UpdateSolution(iter->Value(), gpair, *p_fmat);
      }
      // after this each thread's stemp will get the best candidates, aggregate results
      this->SyncBestSolution(qexpand);
      // get the best result, we can synchronize the solution
      for (size_t i = 0; i < qexpand.size(); ++i) {
        const int nid = qexpand[i];
        NodeEntry &e = snode[nid];
        // now we know the solution in snode[nid], set split
        if (e.best.loss_chg > rt_eps) {
          p_tree->AddChilds(nid);
          (*p_tree)[nid].set_split(e.best.split_index(), e.best.split_value, e.best.default_left());
          // mark right child as 0, to indicate fresh leaf
          (*p_tree)[(*p_tree)[nid].cleft()].set_leaf(0.0f, 0);
          (*p_tree)[(*p_tree)[nid].cright()].set_leaf(0.0f, 0);
        } else {
          (*p_tree)[nid].set_leaf(e.weight * param.learning_rate);
        }
      }
    }

XGBoost把樣本轉換成了以列為基礎的迭代器。這一部分設計等我整理一下再傳上來。程式碼將資料以列為基礎通過呼叫UpdateSolution計算分割點。為什麼要這麼做呢,這裡已經能想到了,每一列至少交給了一個執行緒去處理。而SyncBestSolution則是將執行緒處理後的資料進行彙總的過程。

而進入UpdateSolution後,如果資料量不大則按屬性進行多執行緒處理了,每個執行緒通過呼叫EnumerateSplit方法按實現。但是當資料量比較大,或者通過引數設定parallel_option,可以在此基礎上進行更細的多執行緒拆分。

通過函式ParallelFindSplit實現:

Builder -> Update - > FindSplit -> ParallelFindSplit

    // parallel find the best split of current fid
    // this function does not support nested functions
    inline void ParallelFindSplit(const ColBatch::Inst &col,
                                  bst_uint fid,
                                  const DMatrix &fmat,
                                  const std::vector<bst_gpair> &gpair) {
      // TODO(tqchen): double check stats order.
      const MetaInfo& info = fmat.info();
      const bool ind = col.length != 0 && col.data[0].fvalue == col.data[col.length - 1].fvalue;
      bool need_forward = param.need_forward_search(fmat.GetColDensity(fid), ind);
      bool need_backward = param.need_backward_search(fmat.GetColDensity(fid), ind);
      const std::vector<int> &qexpand = qexpand_;
      #pragma omp parallel
      {
        const int tid = omp_get_thread_num();
        std::vector<ThreadEntry> &temp = stemp[tid];
        // cleanup temp statistics
        for (size_t j = 0; j < qexpand.size(); ++j) {
          temp[qexpand[j]].stats.Clear();
        }
        bst_uint step = (col.length + this->nthread - 1) / this->nthread;
        bst_uint end = std::min(col.length, step * (tid + 1));
        for (bst_uint i = tid * step; i < end; ++i) {
          const bst_uint ridx = col[i].index;
          const int nid = position[ridx];
          if (nid < 0) continue;
          const bst_float fvalue = col[i].fvalue;
          if (temp[nid].stats.Empty()) {
            temp[nid].first_fvalue = fvalue;
          }
          temp[nid].stats.Add(gpair, info, ridx);
          temp[nid].last_fvalue = fvalue;
        }
      }
      // start collecting the partial sum statistics
      bst_omp_uint nnode = static_cast<bst_omp_uint>(qexpand.size());
      #pragma omp parallel for schedule(static)
      for (bst_omp_uint j = 0; j < nnode; ++j) {
        const int nid = qexpand[j];
        TStats sum(param), tmp(param), c(param);
        for (int tid = 0; tid < this->nthread; ++tid) {
          tmp = stemp[tid][nid].stats;
          stemp[tid][nid].stats = sum;
          sum.Add(tmp);
          if (tid != 0) {
            std::swap(stemp[tid - 1][nid].last_fvalue, stemp[tid][nid].first_fvalue);
          }
        }
        for (int tid = 0; tid < this->nthread; ++tid) {
          stemp[tid][nid].stats_extra = sum;
          ThreadEntry &e = stemp[tid][nid];
          bst_float fsplit;
          if (tid != 0) {
            if (stemp[tid - 1][nid].last_fvalue != e.first_fvalue) {
              fsplit = (stemp[tid - 1][nid].last_fvalue + e.first_fvalue) * 0.5f;
            } else {
              continue;
            }
          } else {
            fsplit = e.first_fvalue - rt_eps;
          }
          if (need_forward && tid != 0) {
            c.SetSubstract(snode[nid].stats, e.stats);
            if (c.sum_hess >= param.min_child_weight &&
                e.stats.sum_hess >= param.min_child_weight) {
              bst_float loss_chg = static_cast<bst_float>(
                  constraints_[nid].CalcSplitGain(param, fid, e.stats, c) - snode[nid].root_gain);
              e.best.Update(loss_chg, fid, fsplit, false);
            }
          }
          if (need_backward) {
            tmp.SetSubstract(sum, e.stats);
            c.SetSubstract(snode[nid].stats, tmp);
            if (c.sum_hess >= param.min_child_weight &&
                tmp.sum_hess >= param.min_child_weight) {
              bst_float loss_chg = static_cast<bst_float>(
                  constraints_[nid].CalcSplitGain(param, fid, tmp, c) - snode[nid].root_gain);
              e.best.Update(loss_chg, fid, fsplit, true);
            }
          }
        }
        if (need_backward) {
          tmp = sum;
          ThreadEntry &e = stemp[this->nthread-1][nid];
          c.SetSubstract(snode[nid].stats, tmp);
          if (c.sum_hess >= param.min_child_weight &&
              tmp.sum_hess >= param.min_child_weight) {
            bst_float loss_chg = static_cast<bst_float>(
                constraints_[nid].CalcSplitGain(param, fid, tmp, c) - snode[nid].root_gain);
            e.best.Update(loss_chg, fid, e.last_fvalue + rt_eps, true);
          }
        }
      }
      // rescan, generate candidate split
      #pragma omp parallel
      {
        TStats c(param), cright(param);
        const int tid = omp_get_thread_num();
        std::vector<ThreadEntry> &temp = stemp[tid];
        bst_uint step = (col.length + this->nthread - 1) / this->nthread;
        bst_uint end = std::min(col.length, step * (tid + 1));
        for (bst_uint i = tid * step; i < end; ++i) {
          const bst_uint ridx = col[i].index;
          const int nid = position[ridx];
          if (nid < 0) continue;
          const bst_float fvalue = col[i].fvalue;
          // get the statistics of nid
          ThreadEntry &e = temp[nid];
          if (e.stats.Empty()) {
            e.stats.Add(gpair, info, ridx);
            e.first_fvalue = fvalue;
          } else {
            // forward default right
            if (fvalue != e.first_fvalue) {
              if (need_forward) {
                c.SetSubstract(snode[nid].stats, e.stats);
                if (c.sum_hess >= param.min_child_weight &&
                    e.stats.sum_hess >= param.min_child_weight) {
                  bst_float loss_chg = static_cast<bst_float>(
                      constraints_[nid].CalcSplitGain(param, fid, e.stats, c) -
                      snode[nid].root_gain);
                  e.best.Update(loss_chg, fid, (fvalue + e.first_fvalue) * 0.5f, false);
                }
              }
              if (need_backward) {
                cright.SetSubstract(e.stats_extra, e.stats);
                c.SetSubstract(snode[nid].stats, cright);
                if (c.sum_hess >= param.min_child_weight &&
                    cright.sum_hess >= param.min_child_weight) {
                  bst_float loss_chg = static_cast<bst_float>(
                      constraints_[nid].CalcSplitGain(param, fid, c, cright) -
                      snode[nid].root_gain);
                  e.best.Update(loss_chg, fid, (fvalue + e.first_fvalue) * 0.5f, true);
                }
              }
            }
            e.stats.Add(gpair, info, ridx);
            e.first_fvalue = fvalue;
          }
        }
      }
    }


1. 上述程式碼將某一屬性的樣本空間的資料進行分塊,塊大小為(col.length + this->nthread - 1) / this->nthread),這一步是並行的,每一塊是一個執行緒,每個執行緒統計屬於自己的樣本的每個節點的一階導和二階導數的和,還需要統計邊緣值(first_fvalue、last_fvalue)

2. 按照塊邊緣,初始化執行緒變數stemp[tid][nid].best,這一步是節點並行,因為需要用到兩個相鄰執行緒所用有的資料塊的邊緣值:stemp[tid - 1][nid].last_fvalue + e.first_fvalue) * 0.5f,所以無法按照塊並行

3. 第三步則是按照資料塊並行,每個執行緒處理自己的資料塊來更新stemp[tid][nid].best,這樣每個執行緒都有一個根據自己樣本計算出來的屬於自己的最優分割點。

4. 最後再根據上文中提到的SyncBestSolution來彙總所有執行緒,最後進行樹的增長、qexpand更新、position更新等操作。

5. 由於迭代器是按照屬性排序的,e.stats.Add負責按順序累加樣本,再用總的減去就是剩下的:c.SetSubstract(snode[nid].stats, tmp),所以計算分割點只需要遍歷一遍樣本即可。

6. constraints_的型別是模板類傳遞進來的,具體分為ValueConstraint和NoConstraint。ValueConstraint具有上下界約束,具體定義參看src\tree\param.h

總結:

本文梳理了XGBoost 在訓練過程的原始碼的流程,通過閱讀,瞭解了XGBoost的執行機制、設計模式為以後自己編寫程式碼提供了寶貴經驗。