CarbonData Source Code Series (1): The File Generation Process

During my two years at Didi I was constantly working overtime and got lazy, so I rarely wrote blog posts. Recently I have been working on integrating CarbonData with Hive, which requires digging deep into CarbonData.

So I am starting a new series to record what I learn about CarbonData along the way.

1. Environment Setup

The version used here is 1.2.0-SNAPSHOT.

git clone https://github.com/apache/carbondata.git

First open the carbondata code in IDEA, click View -> Tool Windows -> Maven Projects at the top, then check the profiles you need and build the format module.

2. Finding the Code Entry Point

Let's open the entry class CarbonDataFrameWriter and find the writeToCarbonFile method:

  private def writeToCarbonFile(parameters: Map[String, String] = Map()): Unit = {
    val options = new CarbonOption(parameters)
    val cc = CarbonContext.getInstance(dataFrame.sqlContext.sparkContext)
    if (options.tempCSV) {
      loadTempCSV(options, cc)
    } else {
      loadDataFrame(options, cc)
    }
  }

It has two paths: loadTempCSV and loadDataFrame.

loadTempCSV first writes the data out as CSV files and then imports them with a LOAD DATA INPATH ... command.

Here we only study loadDataFrame, the path that writes the data directly.
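For context, a DataFrame write along the following lines ends up in this entry point. This is only a sketch under assumptions: a Spark 1.x setup with the CarbonContext API seen in the code above, a made-up store path and table name; the tableName and tempCSV option names are the ones read by CarbonOption.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{CarbonContext, SaveMode}

object CarbonWriteSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("carbon-write").setMaster("local[2]"))
    val cc = new CarbonContext(sc, "/tmp/carbon/store")   // store path is an assumption
    import cc.implicits._

    val df = sc.parallelize(Seq(("a", 1), ("b", 2))).toDF("name", "id")
    df.write
      .format("carbondata")
      .option("tableName", "sample_table")   // hypothetical table name
      .option("tempCSV", "false")            // skip loadTempCSV, take the loadDataFrame path
      .mode(SaveMode.Overwrite)
      .save()
  }
}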

Drilling down, we land in the run method of LoadTable in carbonTableSchema, followed by a sprawling two hundred or so lines of setter calls. At its core it constructs a CarbonLoadModel:

      val carbonLoadModel = new CarbonLoadModel()
      carbonLoadModel.setTableName(relation.tableMeta.carbonTableIdentifier.getTableName)
      carbonLoadModel.setDatabaseName(relation.tableMeta.carbonTableIdentifier.getDatabaseName)
      carbonLoadModel.setStorePath(relation.tableMeta.storePath)

      val table = relation.tableMeta.carbonTable
      carbonLoadModel.setAggTables(table.getAggregateTablesName.asScala.toArray)
      carbonLoadModel.setTableName(table.getFactTableName)
      val dataLoadSchema = new CarbonDataLoadSchema(table)
      // Need to fill dimension relation
      carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema)

Most of that code prepares for loading a text file; when loading from a DataFrame you can skip it and jump straight to the line if (carbonLoadModel.getUseOnePass).

This flag controls how the dictionary is generated (the single-pass load option). It defaults to false, so let's skip the true branch and follow the main flow; the two calls below are what we are looking for.

// generate the dictionary files
GlobalDictionaryUtil
            .generateGlobalDictionary(
              sparkSession.sqlContext,
              carbonLoadModel,
              relation.tableMeta.storePath,
              dictionaryDataFrame)
// generate the data files
CarbonDataRDDFactory.loadCarbonData(sparkSession.sqlContext,
            carbonLoadModel,
            relation.tableMeta.storePath,
            columnar,
            partitionStatus,
            None,
            loadDataFrame,
            updateModel) 

3. Dictionary Generation Process

Let's start with the GlobalDictionaryUtil.generateGlobalDictionary method:

      if (StringUtils.isEmpty(allDictionaryPath)) {
        LOGGER.info("Generate global dictionary from source data files!")
        // load data by using dataSource com.databricks.spark.csv
        var df = dataFrame.getOrElse(loadDataFrame(sqlContext, carbonLoadModel))
        var headers = carbonLoadModel.getCsvHeaderColumns
        headers = headers.map(headerName => headerName.trim)
        val colDictFilePath = carbonLoadModel.getColDictFilePath
        if (colDictFilePath != null) {
          // generate predefined dictionary
          generatePredefinedColDictionary(colDictFilePath, carbonTableIdentifier,
            dimensions, carbonLoadModel, sqlContext, storePath, dictfolderPath)
        }
        if (headers.length > df.columns.length) {
          val msg = "The number of columns in the file header do not match the " +
                    "number of columns in the data file; Either delimiter " +
                    "or fileheader provided is not correct"
          LOGGER.error(msg)
          throw new DataLoadingException(msg)
        }
        // use fact file to generate global dict
        val (requireDimension, requireColumnNames) = pruneDimensions(dimensions,
          headers, df.columns)
        if (requireDimension.nonEmpty) {
          // select column to push down pruning
          df = df.select(requireColumnNames.head, requireColumnNames.tail: _*)
          val model = createDictionaryLoadModel(carbonLoadModel, carbonTableIdentifier,
            requireDimension, storePath, dictfolderPath, false)
          // combine distinct value in a block and partition by column
          val inputRDD = new CarbonBlockDistinctValuesCombineRDD(df.rdd, model)
            .partitionBy(new ColumnPartitioner(model.primDimensions.length))
          // generate global dictionary files
          val statusList = new CarbonGlobalDictionaryGenerateRDD(inputRDD, model).collect()
          // check result status
          checkStatus(carbonLoadModel, sqlContext, model, statusList)
        } else {
          LOGGER.info("No column found for generating global dictionary in source data files")
        }
      } else {
        generateDictionaryFromDictionaryFiles(sqlContext,
          carbonLoadModel,
          storePath,
          carbonTableIdentifier,
          dictfolderPath,
          dimensions,
          allDictionaryPath)
      }

It covers two cases: the dictionary files do not exist yet, or they already exist.

Let's look at the case where they do not exist yet:

        // use fact file to generate global dict
        val (requireDimension, requireColumnNames) = pruneDimensions(dimensions,
          headers, df.columns)
        if (requireDimension.nonEmpty) {
          // select only the dimension columns flagged for dictionary encoding
          df = df.select(requireColumnNames.head, requireColumnNames.tail: _*)
          val model = createDictionaryLoadModel(carbonLoadModel, carbonTableIdentifier,
            requireDimension, storePath, dictfolderPath, false)
          // deduplicate the values and partition them by column
          val inputRDD = new CarbonBlockDistinctValuesCombineRDD(df.rdd, model)
            .partitionBy(new ColumnPartitioner(model.primDimensions.length))
          // generate the global dictionary files
          val statusList = new CarbonGlobalDictionaryGenerateRDD(inputRDD, model).collect()
          // check result status
          checkStatus(carbonLoadModel, sqlContext, model, statusList)
        } else {
          LOGGER.info("No column found for generating global dictionary in source data files")
        }
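To make the shape of that job concrete, here is a toy sketch with plain Spark RDDs (object and variable names are made up; the real CarbonBlockDistinctValuesCombineRDD also batches values and carries dictionary metadata): explode each row into (columnIndex, value) pairs, take the distinct values, and partition by column index so that every column's values land in a single partition, which is what ColumnPartitioner achieves.

import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

object ToyGlobalDictSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("toy-dict").setMaster("local[2]"))
    // pretend each Seq holds the dictionary-encoded dimension values of one row
    val rows = sc.parallelize(Seq(Seq("CN", "phone"), Seq("US", "phone"), Seq("CN", "pc")))
    val numDictColumns = 2

    val distinctPerColumn = rows
      .flatMap(row => row.zipWithIndex.map { case (value, colIdx) => (colIdx, value) })
      .distinct()                                        // combine distinct values
      .partitionBy(new HashPartitioner(numDictColumns))  // Int keys 0..n-1: one partition per column

    // each partition can now write its own <column id>.dict file independently
    distinctPerColumn.foreachPartition { it =>
      it.foreach { case (colIdx, value) => println(s"column $colIdx -> $value") }
    }
    sc.stop()
  }
}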

It first reads all dictionary dimension columns from the source data, deduplicates the values, partitions them by column, and then writes the dictionaries out; for the concrete output logic see the internalCompute method of CarbonGlobalDictionaryGenerateRDD:

          val dictWriteTask = new DictionaryWriterTask(valuesBuffer,
            dictionaryForDistinctValueLookUp,
            model.table,
            model.columnIdentifier(split.index),
            model.hdfsLocation,
            model.primDimensions(split.index).getColumnSchema,
            model.dictFileExists(split.index)
          )
          // execute dictionary writer task to get distinct values
          val distinctValues = dictWriteTask.execute()
          val dictWriteTime = System.currentTimeMillis() - t3
          val t4 = System.currentTimeMillis()
          // if new data came than rewrite sort index file
          if (distinctValues.size() > 0) {
            val sortIndexWriteTask = new SortIndexWriterTask(model.table,
              model.columnIdentifier(split.index),
              model.primDimensions(split.index).getDataType,
              model.hdfsLocation,
              dictionaryForDistinctValueLookUp,
              distinctValues)
            sortIndexWriteTask.execute()
          }
          val sortIndexWriteTime = System.currentTimeMillis() - t4
          CarbonTimeStatisticsFactory.getLoadStatisticsInstance.recordDicShuffleAndWriteTime()
          // After sortIndex writing, update dictionaryMeta
          dictWriteTask.updateMetaData()

The dictionary files live in the Metadata directory under the table directory. Three kinds of files are generated:

1. The dictionary file, named <column id>.dict

2. The sort index file, named <column id>.sortindex

3. The dictionary column's meta information, named <column id>.dictmeta

4. Data File Generation Process

Open CarbonDataRDDFactory and find the loadCarbonData method. It handles both the LOAD command path and the DataFrame path, so the code feels rather long and unwieldy. We only need to follow the loadDataFrame path:

      def loadDataFrame(): Unit = {
        try {
          val rdd = dataFrame.get.rdd
      // find the nodes where the data resides
          val nodeNumOfData = rdd.partitions.flatMap[String, Array[String]]{ p =>
            DataLoadPartitionCoalescer.getPreferredLocs(rdd, p).map(_.host)
          }.distinct.size
      // make sure there are as many executors as data nodes
          val nodes = DistributionUtil.ensureExecutorsByNumberAndGetNodeList(nodeNumOfData,
            sqlContext.sparkContext)
          val newRdd = new DataLoadCoalescedRDD[Row](rdd, nodes.toArray.distinct)
      // generate the data files
          status = new NewDataFrameLoaderRDD(sqlContext.sparkContext,
            new DataLoadResultImpl(),
            carbonLoadModel,
            currentLoadCount,
            tableCreationTime,
            schemaLastUpdatedTime,
            newRdd).collect()

        } catch {
          case ex: Exception =>
            LOGGER.error(ex, "load data frame failed")
            throw ex
        }
      }

Open the NewDataFrameLoaderRDD class and look at its internalCompute method; the core of that method is this line:

new DataLoadExecutor().execute(model, loader.storeLocation, recordReaders.toArray)

Open DataLoadExecutor: the core of its execute method is DataLoadProcessBuilder.build, and the build process varies depending on the table's settings:

  public AbstractDataLoadProcessorStep build(CarbonLoadModel loadModel, String storeLocation,
      CarbonIterator[] inputIterators) throws Exception {
    CarbonDataLoadConfiguration configuration = createConfiguration(loadModel, storeLocation);
    SortScopeOptions.SortScope sortScope = CarbonDataProcessorUtil.getSortScope(configuration);
    if (!configuration.isSortTable() || sortScope.equals(SortScopeOptions.SortScope.NO_SORT)) {
      // no sort columns, or carbon.load.sort.scope is set to NO_SORT
      return buildInternalForNoSort(inputIterators, configuration);
    } else if (configuration.getBucketingInfo() != null) {
      // the table has bucketing configured
      return buildInternalForBucketing(inputIterators, configuration);
    } else if (sortScope.equals(SortScopeOptions.SortScope.BATCH_SORT)) {
      // carbon.load.sort.scope is set to BATCH_SORT
      return buildInternalForBatchSort(inputIterators, configuration);
    } else {
      return buildInternal(inputIterators, configuration);
    }
  }
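For reference, one way to influence this choice from the driver is to set the property that build checks above. This is only a sketch under the assumption that setting it programmatically through CarbonProperties fits your setup; it can also live in carbon.properties.

import org.apache.carbondata.core.util.CarbonProperties

object SortScopeConfigSketch {
  def main(args: Array[String]): Unit = {
    // values the build() above branches on include NO_SORT and BATCH_SORT
    CarbonProperties.getInstance().addProperty("carbon.load.sort.scope", "BATCH_SORT")
  }
}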

Below we only walk through the standard load path, buildInternal:

  private AbstractDataLoadProcessorStep buildInternal(CarbonIterator[] inputIterators,
      CarbonDataLoadConfiguration configuration) {
    // 1. Reads the data input iterators and parses the data.
    AbstractDataLoadProcessorStep inputProcessorStep =
        new InputProcessorStepImpl(configuration, inputIterators);
    // 2. Converts the data like dictionary or non dictionary or complex objects depends on
    // data types and configurations.
    AbstractDataLoadProcessorStep converterProcessorStep =
        new DataConverterProcessorStepImpl(configuration, inputProcessorStep);
    // 3. Sorts the data by SortColumn
    AbstractDataLoadProcessorStep sortProcessorStep =
        new SortProcessorStepImpl(configuration, converterProcessorStep);
    // 4. Writes the sorted data in carbondata format.
    return new DataWriterProcessorStepImpl(configuration, sortProcessorStep);
  }

The build is split into four main steps (a toy sketch of how the steps chain together follows this list):

1. Read the input data and parse it; this step mainly serves CSV input, since a DataFrame's data has already been parsed.

2. Convert each field according to its data type and configuration: dictionary columns are replaced with dictionary keys, while non-dictionary columns are converted to byte arrays.

3. Sort the data by the sort columns.

4. Write the sorted data out in the CarbonData format.
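The sketch below illustrates the chaining idea in buildInternal with toy types (these are not CarbonData's AbstractDataLoadProcessorStep classes): each step wraps its child and transforms an iterator of rows, so data flows read -> convert -> sort -> write.

object ToyLoadPipeline {
  type Row = Array[String]

  trait Step { def execute(): Iterator[Row] }

  class InputStep(rows: Seq[Row]) extends Step {
    override def execute(): Iterator[Row] = rows.iterator            // 1. read/parse the input
  }
  class ConvertStep(child: Step) extends Step {
    override def execute(): Iterator[Row] =
      child.execute().map(_.map(_.trim))                             // 2. convert each field
  }
  class SortStep(child: Step) extends Step {
    override def execute(): Iterator[Row] =
      child.execute().toSeq.sortBy(_.head).iterator                  // 3. sort by the sort column
  }
  class WriteStep(child: Step) extends Step {
    override def execute(): Iterator[Row] = {
      child.execute().foreach(r => println(r.mkString("|")))         // 4. write the rows out
      Iterator.empty
    }
  }

  def main(args: Array[String]): Unit = {
    val pipeline = new WriteStep(new SortStep(new ConvertStep(
      new InputStep(Seq(Array("b ", "2"), Array("a ", "1"))))))
    pipeline.execute()
  }
}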

Let's start with step 2, DataConverterProcessorStepImpl. In its getIterator method you can see that every CarbonRowBatch is transformed by the convert method of localConverter, and localConverter contains a single converter, RowConverterImpl.

RowConverterImpl is composed of many FieldConverters; in its initialize method you can see that they are created by FieldEncoderFactory.createFieldEncoder.

  public FieldConverter createFieldEncoder(DataField dataField,
      Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache,
      CarbonTableIdentifier carbonTableIdentifier, int index, String nullFormat,
      DictionaryClient client, Boolean useOnePass, String storePath, boolean tableInitialize,
      Map<Object, Integer> localCache, boolean isEmptyBadRecord)
      throws IOException {
    // Converters are only needed for dimensions and measures it return null.
    if (dataField.getColumn().isDimension()) {
      if (dataField.getColumn().hasEncoding(Encoding.DIRECT_DICTIONARY) &&
          !dataField.getColumn().isComplex()) {
        return new DirectDictionaryFieldConverterImpl(dataField, nullFormat, index,
            isEmptyBadRecord);
      } else if (dataField.getColumn().hasEncoding(Encoding.DICTIONARY) &&
          !dataField.getColumn().isComplex()) {
        return new DictionaryFieldConverterImpl(dataField, cache, carbonTableIdentifier, nullFormat,
            index, client, useOnePass, storePath, tableInitialize, localCache, isEmptyBadRecord);
      } else if (dataField.getColumn().isComplex()) {
        return new ComplexFieldConverterImpl(
            createComplexType(dataField, cache, carbonTableIdentifier,
                    client, useOnePass, storePath, tableInitialize, localCache), index);
      } else {
        return new NonDictionaryFieldConverterImpl(dataField, nullFormat, index, isEmptyBadRecord);
      }
    } else {
      return new MeasureFieldConverterImpl(dataField, nullFormat, index, isEmptyBadRecord);
    }
  }

From this code we can see that the converters fall into several categories:

1. Dimension, non-complex column with Encoding.DIRECT_DICTIONARY: DirectDictionaryFieldConverterImpl (mainly TIMESTAMP and DATE types); the value is converted into the difference between it and a base time (a toy sketch of this conversion follows this list).

2. Dimension, non-complex column with Encoding.DICTIONARY: DictionaryFieldConverterImpl (non-high-cardinality columns); the value is replaced with its key in the dictionary (an int).

3. Dimension, complex column: ComplexFieldConverterImpl (complex types, i.e. Struct and Array); the field is converted to binary.

4. Dimension, high-cardinality column: NonDictionaryFieldConverterImpl; the value is kept exactly as it is.

5. Measure column: MeasureFieldConverterImpl (value types such as float, double, int, bigint, decimal); the value is kept exactly as it is.
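Here is a toy sketch of the direct-dictionary idea for DATE-like values. It is only an illustration under assumptions: CarbonData's real DirectDictionaryGenerator works on a configurable granularity and reserves some low key values, while below we simply encode days since a fixed cutoff date.

import java.time.LocalDate
import java.time.temporal.ChronoUnit

object ToyDirectDictionary {
  val cutoff: LocalDate = LocalDate.of(1970, 1, 1)   // base time (assumption)

  def toSurrogate(date: LocalDate): Int = ChronoUnit.DAYS.between(cutoff, date).toInt
  def fromSurrogate(key: Int): LocalDate = cutoff.plusDays(key.toLong)

  def main(args: Array[String]): Unit = {
    val key = toSurrogate(LocalDate.of(2017, 6, 1))
    println(s"surrogate key = $key, decodes back to ${fromSurrogate(key)}")
  }
}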

Step 3 is SortProcessorStepImpl; the key question is how SorterFactory.createSorter is implemented:

  public static Sorter createSorter(CarbonDataLoadConfiguration configuration, AtomicLong counter) {
    boolean offheapsort = Boolean.parseBoolean(CarbonProperties.getInstance()
        .getProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT,
            CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT));
    SortScopeOptions.SortScope sortScope = CarbonDataProcessorUtil.getSortScope(configuration);
    Sorter sorter;
    if (offheapsort) {
      if (configuration.getBucketingInfo() != null) {
        sorter = new UnsafeParallelReadMergeSorterWithBucketingImpl(configuration.getDataFields(),
            configuration.getBucketingInfo());
      } else {
        sorter = new UnsafeParallelReadMergeSorterImpl(counter);
      }
    } else {
      if (configuration.getBucketingInfo() != null) {
        sorter =
            new ParallelReadMergeSorterWithBucketingImpl(counter, configuration.getBucketingInfo());
      } else {
        sorter = new ParallelReadMergeSorterImpl(counter);
      }
    }
    if (sortScope.equals(SortScopeOptions.SortScope.BATCH_SORT)) {
      if (configuration.getBucketingInfo() == null) {
        sorter = new UnsafeBatchParallelReadMergeSorterImpl(counter);
      } else {
        LOGGER.warn(
            "Batch sort is not enabled in case of bucketing. Falling back to " + sorter.getClass()
                .getName());
      }
    }
    return sorter;
  }

Interestingly, off-heap sorting is also supported: set enable.unsafe.sort to true to turn it on. Let's look at the default ParallelReadMergeSorterImpl.

Once 100,000 records have accumulated, they are sorted and written out as an intermediate file; once there are more than 20 intermediate files, a file merge is performed (a toy sketch of this external sort-merge follows the parameter list below).

The comparison rules live in NewRowComparator and NewRowComparatorForNormalDims.

Related parameters:

carbon.sort.size 100000

carbon.sort.intermediate.files.limit 20
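Below is a toy sketch of that external sort-merge (it is not CarbonData's SortDataRows/merger code; file handling and names are made up): buffer sortSize rows, sort the batch and spill it to a temp file, and merge the spill files once there are more than fileLimit of them, mirroring carbon.sort.size and carbon.sort.intermediate.files.limit conceptually.

import java.io.{File, PrintWriter}
import scala.collection.mutable
import scala.io.Source

object ToyExternalSortSketch {
  val sortSize = 4      // stands in for carbon.sort.size (100000)
  val fileLimit = 2     // stands in for carbon.sort.intermediate.files.limit (20)

  // sort one in-memory batch and spill it to a temp file
  def spill(rows: Seq[String]): File = {
    val f = File.createTempFile("toy-sort-", ".tmp")
    val w = new PrintWriter(f)
    rows.sorted.foreach(r => w.println(r))
    w.close()
    f
  }

  // k-way merge of already-sorted spill files into one sorted file
  def merge(files: Seq[File]): File = {
    val iters = files.map(f => Source.fromFile(f).getLines())
    val out = File.createTempFile("toy-merged-", ".tmp")
    val w = new PrintWriter(out)
    // min-heap over (value, index of the file it came from)
    val heap = mutable.PriorityQueue.empty[(String, Int)](Ordering.by[(String, Int), String](_._1).reverse)
    iters.zipWithIndex.foreach { case (it, i) => if (it.hasNext) heap.enqueue((it.next(), i)) }
    while (heap.nonEmpty) {
      val (value, i) = heap.dequeue()
      w.println(value)
      if (iters(i).hasNext) heap.enqueue((iters(i).next(), i))
    }
    w.close()
    out
  }

  def main(args: Array[String]): Unit = {
    val data = Seq("d", "a", "f", "c", "b", "e", "h", "g", "j", "i")
    var spills = Vector.empty[File]
    data.grouped(sortSize).foreach { batch =>
      spills = spills :+ spill(batch)
      if (spills.size > fileLimit) spills = Vector(merge(spills))  // intermediate file merge
    }
    println(Source.fromFile(merge(spills)).getLines().mkString(" "))  // a b c d e f g h i j
  }
}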

Now for the last step. Open the DataWriterProcessorStepImpl class: it creates a CarbonFactHandler through CarbonFactHandlerFactory.createCarbonFactHandler and feeds each CarbonRow to the handler's addDataToStore method.

The implementation of addDataToStore is straightforward: once the number of buffered rows reaches the blocklet size, it submits an asynchronous Producer task to the thread pool:

  public void addDataToStore(CarbonRow row) throws CarbonDataWriterException {
    dataRows.add(row);
    this.entryCount++;
    // if entry count reaches to leaf node size then we are ready to write
    // this to leaf node file and update the intermediate files
    if (this.entryCount == this.blockletSize) {
      try {
        semaphore.acquire();

        producerExecutorServiceTaskList.add(
            producerExecutorService.submit(
                new Producer(blockletDataHolder, dataRows, ++writerTaskSequenceCounter, false)
            )
        );
        blockletProcessingCount.incrementAndGet();
        // set the entry count to zero
        processedDataCount += entryCount;
        LOGGER.info("Total Number Of records added to store: " + processedDataCount);
        dataRows = new ArrayList<>(this.blockletSize);
        this.entryCount = 0;
      } catch (InterruptedException e) {
        LOGGER.error(e, e.getMessage());
        throw new CarbonDataWriterException(e.getMessage(), e);
      }
    }
  }

A producer-consumer pattern is used here: Producers run on multiple threads while the Consumer is single-threaded. The Producer is mainly responsible for compressing the data, the Consumer for writing it out, and they exchange data through blockletDataHolder (a toy sketch of this hand-off follows the parameter list below).

Related parameters:

carbon.number.of.cores.while.loading 2 (number of Producer threads)

number.of.rows.per.blocklet.column.page 32000
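Here is a toy sketch of that producer/consumer hand-off. Assumptions: an ArrayBlockingQueue stands in for blockletDataHolder, a poison-pill record replaces the real completion handling, and java.util.zip.Deflater stands in for CarbonData's compression. Multiple producer threads compress blocklets while a single consumer drains the queue and writes.

import java.io.ByteArrayOutputStream
import java.util.concurrent.{ArrayBlockingQueue, Executors, TimeUnit}
import java.util.zip.Deflater

object ToyBlockletPipeline {
  case class Blocklet(id: Int, bytes: Array[Byte])

  def main(args: Array[String]): Unit = {
    val queue = new ArrayBlockingQueue[Blocklet](4)
    val producers = Executors.newFixedThreadPool(2)   // carbon.number.of.cores.while.loading
    val consumer = Executors.newSingleThreadExecutor()

    consumer.submit(new Runnable {
      override def run(): Unit = {
        var b = queue.take()
        while (b.id >= 0) {                           // id == -1 is the poison pill
          println(s"writing blocklet ${b.id}, ${b.bytes.length} compressed bytes")
          b = queue.take()
        }
      }
    })

    (0 until 6).foreach { i =>
      producers.submit(new Runnable {
        override def run(): Unit = {
          val raw = Array.fill[Byte](32000)((i % 127).toByte)   // pretend this is a page of rows
          val deflater = new Deflater()
          deflater.setInput(raw)
          deflater.finish()
          val baos = new ByteArrayOutputStream()
          val buf = new Array[Byte](1024)
          while (!deflater.finished()) {
            val n = deflater.deflate(buf)                        // "compress" the blocklet
            baos.write(buf, 0, n)
          }
          queue.put(Blocklet(i, baos.toByteArray))
        }
      })
    }
    producers.shutdown()
    producers.awaitTermination(1, TimeUnit.MINUTES)
    queue.put(Blocklet(-1, Array.emptyByteArray))                // stop the consumer
    consumer.shutdown()
    consumer.awaitTermination(1, TimeUnit.MINUTES)
  }
}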

That covers the main steps of file generation. To keep this post at a reasonable length, the next article in the series will dig into the details of the CarbonData data file format.