Spark takes too long to read ORC files (partition computation is slow) and writes too many stripes per ORC file: a solution

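1. Background:

    The upstream job is controlled to produce 7,000 files per day, each smaller than 256 MB, totaling more than 5 billion records in ORC format. Each file contains about 500 stripes. Command to inspect a file's blocks: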

hdfs fsck viewfs://hadoop/nn01/warehouse/…….db/……/partition_date=2017-11-11/part-06999 -files -blocks;

Command to view the stripe count:

hive --orcfiledump viewfs://hadoop/nn01/warehouse/…….db/table/partition_date=2017-11-11/part-06999 | less
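
To put these figures in perspective, here is a rough back-of-the-envelope calculation. It uses only the numbers quoted above (7,000 files, about 500 stripes per file, files under 256 MB); the class and method names are made up for illustration, and the stripe size is an upper bound, not a measurement.

```java
public class StripeEstimate {
    // Figures taken from the background description above.
    static final long FILES_PER_DAY = 7_000L;
    static final long STRIPES_PER_FILE = 500L;               // approximate
    static final long MAX_FILE_BYTES = 256L * 1024 * 1024;   // files are under 256 MB

    /** Total number of stripes in one day's partition. */
    static long totalStripes() {
        return FILES_PER_DAY * STRIPES_PER_FILE;
    }

    /** Upper bound on the average on-disk stripe size, in bytes. */
    static long maxAvgStripeBytes() {
        return MAX_FILE_BYTES / STRIPES_PER_FILE;
    }

    public static void main(String[] args) {
        System.out.println("stripes per day: " + totalStripes());
        System.out.println("avg stripe size upper bound (KB): "
                + maxAvgStripeBytes() / 1024);
    }
}
```

In other words, one daily partition holds roughly 3.5 million stripes, each averaging at most about half a megabyte on disk, so any component that reads every file footer has a lot of stripe metadata to process.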

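2. The problem:

    When reading ORC files through Spark SQL, there is a very long gap between submitting the Spark job and the point where the partitions have been computed and Tasks start running.

    The following log lines are printed repeatedly: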

17/11/11 03:52:01 INFO BlockManagerMasterEndpoint: Registering block manager gh-data-hdp-dn0640.---:11942 with 6.1 GB RAM, BlockManagerId(554, ----, 11942)
17/11/11 03:52:29 INFO DFSClient: Firstly choose dn: DatanodeInfoWithStorage[10.20.--.--:50010,DS-32f8aaa5-c6ce-48a9-a2b1-3b169df193b9,DISK], --
17/11/11 03:52:29 INFO DFSClient: Firstly choose dn: 

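    The problem in its simplest form: even the trivial SQL below shows the same behavior. After submission the ApplicationMaster (Driver) starts, but the Tasks do not run for a long time and the partitions cannot be computed. The Spark UI does not render the DAG graph, and no Stage information is visible.

SELECT * from table where partition_date='2017-11-11' limit 1;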

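3. Problem analysis

    Initial analysis: the GC logs confirmed that the Driver was reading data from the DataNodes (the ORC file footer metadata), which drove the Driver into full GC.

Source code tracing: the behavior turned out to be related to the strategy Spark uses when reading ORC files.

HiveConf.java shows that Spark reads ORC files with the HYBRID strategy by default.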

HIVE_ORC_SPLIT_STRATEGY("hive.exec.orc.split.strategy", "HYBRID", new StringSet(new String[]{"HYBRID", "BI", "ETL"}),
    "This is not a user level config. BI strategy is used when the requirement is to spend less time in split generation as opposed " +
    "to query execution (split generation does not read or cache file footers). ETL strategy is used when spending little more time in " +
    "split generation is acceptable (split generation reads and caches file footers). HYBRID chooses between the above strategies based " +
    "on heuristics."),

In OrcInputFormat.java, the code that selects the split strategy (including the HYBRID case) is as follows:

 public SplitStrategy call() throws IOException {
    final SplitStrategy splitStrategy;
    AcidUtils.Directory dirInfo = AcidUtils.getAcidState(dir,
        context.conf, context.transactionList);
    List<Long> deltas = AcidUtils.serializeDeltas(dirInfo.getCurrentDirectories());
    Path base = dirInfo.getBaseDirectory();
    List<FileStatus> original = dirInfo.getOriginalFiles();
    boolean[] covered = new boolean[context.numBuckets];
    boolean isOriginal = base == null;
    // if we have a base to work from
    if (base != null || !original.isEmpty()) {
      // find the base files (original or new style)
      List<FileStatus> children = original;
      if (base != null) {
        children = SHIMS.listLocatedStatus(fs, base,
            AcidUtils.hiddenFileFilter);
      }
      long totalFileSize = 0;
      for (FileStatus child : children) {
        totalFileSize += child.getLen();
        AcidOutputFormat.Options opts = AcidUtils.parseBaseBucketFilename
            (child.getPath(), context.conf);
        int b = opts.getBucket();
        // If the bucket is in the valid range, mark it as covered.
        // I wish Hive actually enforced bucketing all of the time.
        if (b >= 0 && b < covered.length) {
          covered[b] = true;
        }
      }
      int numFiles = children.size();
      long avgFileSize = totalFileSize / numFiles;
      switch(context.splitStrategyKind) {
        case BI:
          // BI strategy requested through config
          splitStrategy = new BISplitStrategy(context, fs, dir, children, isOriginal,
              deltas, covered);
          break;
        case ETL:
          // ETL strategy requested through config
          splitStrategy = new ETLSplitStrategy(context, fs, dir, children, isOriginal,
              deltas, covered);
          break;
        default:
          // HYBRID strategy
          if (avgFileSize > context.maxSize) {
            splitStrategy = new ETLSplitStrategy(context, fs, dir, children, isOriginal, deltas,
                covered);
          } else {
            splitStrategy = new BISplitStrategy(context, fs, dir, children, isOriginal, deltas,
                covered);
          }
          break;
      }
    } else {
      // no base, only deltas
      splitStrategy = new ACIDSplitStrategy(dir, context.numBuckets, deltas, covered);
    }
    return splitStrategy;
  }
}

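HYBRID strategy: when the Spark Driver starts, it reads file metadata from the NameNode and computes the average file size from the total size and the number of files. If this average exceeds the default of 256 MB, the ETL strategy is triggered. The ETL strategy reads the ORC file footers and related metadata from the DataNodes. If the files contain many stripes or the metadata is large, the Driver goes into full GC, which shows up as a long gap between Driver startup and the first Task execution.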
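
The heuristic can be isolated in a few lines. The following is a minimal sketch that mirrors only the default branch of the excerpt above; it is not Hive's code. The class, enum, and method names are hypothetical, and the 256 MB threshold is simply the default quoted in the text.

```java
import java.util.Arrays;
import java.util.List;

public class HybridSplitSketch {
    enum Strategy { BI, ETL }

    /**
     * Mirrors only the default (HYBRID) branch of the excerpt:
     * ETL when the average file size exceeds maxSize, otherwise BI.
     * Assumes fileSizes is non-empty.
     */
    static Strategy choose(List<Long> fileSizes, long maxSize) {
        long total = 0;
        for (long size : fileSizes) {
            total += size;
        }
        long avg = total / fileSizes.size();
        return avg > maxSize ? Strategy.ETL : Strategy.BI;
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        long maxSize = 256 * mb; // threshold quoted in the text (assumed default)

        // Average 225 MB: below the threshold.
        System.out.println(choose(Arrays.asList(200 * mb, 250 * mb), maxSize)); // BI
        // Average 290 MB: above the threshold.
        System.out.println(choose(Arrays.asList(300 * mb, 280 * mb), maxSize)); // ETL
    }
}
```

The comparison is strict, so an average of exactly the threshold still selects BI. Because the decision depends on the average, a few large files can tip a whole directory into ETL.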

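4. Solution:

Spark 1.6.2:

import org.apache.spark.sql.hive.HiveContext

// sc is an existing SparkContext.
val hiveContext = new HiveContext(sc)
// The default is 64 MB: a stripe is written once 64 MB of data (measured before compression) has accumulated.
// The related setting hive.exec.orc.default.row.index.stride=10000 controls how many rows each row-index entry within a stripe covers.
// Tuning the stripe size controls the number of stripes per file. Left unset, a single file ends up with too many stripes, which hurts downstream readers:
// if the ETL split strategy is configured, or the heuristic selects it, the Driver reads too much metadata from the DataNodes,
// causing frequent GC and an unacceptably long partition computation.
hiveContext.setConf("hive.exec.orc.default.stripe.size", "268435456")
// There are three strategies: "HYBRID", "BI" and "ETL". The default is "HYBRID". From HiveConf:
// "This is not a user level config. BI strategy is used when the requirement is to spend less time in split generation as opposed to query execution (split generation does not read or cache file footers). ETL strategy is used when spending little more time in split generation is acceptable (split generation reads and caches file footers). HYBRID chooses between the above strategies based on heuristics."
// If left unset, the ETL strategy is triggered when the average ORC file size estimated by the framework exceeds 256 MB, and the Driver spends a long time reading DataNode data to generate splits.
hiveContext.setConf("hive.exec.orc.split.strategy", "BI")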

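Spark 2.2.0:

import org.apache.spark.sql.SparkSession

// Create a SparkSession with Hive support
val sparkSession = SparkSession
  .builder()
  .appName("PvMvToBase")
  // The default is 64 MB: a stripe is written once 64 MB of data (measured before compression) has accumulated.
  // The related setting hive.exec.orc.default.row.index.stride=10000 controls how many rows each row-index entry within a stripe covers.
  // Tuning the stripe size controls the number of stripes per file. Left unset, a single file ends up with too many stripes, which hurts downstream readers:
  // if the ETL split strategy is configured, or the heuristic selects it, the Driver reads too much metadata from the DataNodes,
  // causing frequent GC and an unacceptably long partition computation.
  .config("hive.exec.orc.default.stripe.size", 268435456L)
  // There are three strategies: "HYBRID", "BI" and "ETL". The default is "HYBRID". From HiveConf:
  // "This is not a user level config. BI strategy is used when the requirement is to spend less time in split generation as opposed to query execution (split generation does not read or cache file footers). ETL strategy is used when spending little more time in split generation is acceptable (split generation reads and caches file footers). HYBRID chooses between the above strategies based on heuristics."
  // If left unset, the ETL strategy is triggered when the average ORC file size estimated by the framework exceeds 256 MB, and the Driver spends a long time reading DataNode data to generate splits.
  .config("hive.exec.orc.split.strategy", "BI")
  .enableHiveSupport()
  .getOrCreate()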