1. 程式人生 > 其它 >hbase原始碼系列(九)StoreFile儲存格式

hbase原始碼系列(九)StoreFile儲存格式

從這一章開始要講Region Server這塊的了,但是在講Region Server這塊之前得講一下StoreFile,否則後面的不好講下去,這塊是基礎,Region Sever上面的操作,大部分都是基於它來進行的。

HFile概述

HFile是HBase中實際存資料的檔案,為HBase提供高效快速的資料訪問。它是基於Hadoop的TFile,模仿Google Bigtable 架構中的SSTable格式。檔案格式如下:

檔案是變長的,唯一固定的塊是File Info和Trailer,如圖所示,Trailer有指向其它塊的指標,這些指標也寫在了檔案裡,Index塊記錄了data和meta塊的偏移量,meta塊是可選的。

下面我們從原來上來一個一個的看它們到底是啥樣的,先從入口看起,那就是StoreFile.Writer的append方法,先看怎麼寫入的,然後它就怎麼讀了,不知道怎麼使用這個類的,可以看看我寫的這篇文章《非mapreduce生成Hfile,然後匯入hbase當中》。

往HFile追加KeyValue 

不扯這些了,看一下StoreFile裡面的append方法。

    public void append(final KeyValue kv) throws IOException {
      //如果是新的rowkey的value,就追加到Bloomfilter裡面去
      appendGeneralBloomfilter(kv);
      //如果是DeleteFamily、DeleteFamilyVersion型別的kv
      appendDeleteFamilyBloomFilter(kv);
      writer.append(kv);
      //記錄最新的put的時間戳,更新時間戳範圍
      trackTimestamps(kv);
    }

在用writer進行append之前先把kv寫到generalBloomFilterWriter裡面,但是我們發現generalBloomFilterWriter是HFile.Writer裡面的InlineBlockWriter。

generalBloomFilterWriter = BloomFilterFactory.createGeneralBloomAtWrite(
          conf, cacheConf, bloomType,
          (int) Math.min(maxKeys, Integer.MAX_VALUE), writer);
//在createGeneralBloomAtWriter方法發現了以下程式碼
......
CompoundBloomFilterWriter bloomWriter = new CompoundBloomFilterWriter(getBloomBlockSize(conf),
        err, Hash.getHashType(conf), maxFold, cacheConf.shouldCacheBloomsOnWrite(),
        bloomType == BloomType.ROWCOL ? KeyValue.COMPARATOR : KeyValue.RAW_COMPARATOR);
    writer.addInlineBlockWriter(bloomWriter);

我們接下來看HFileWriterV2的append方法吧。

public void append(final KeyValue kv) throws IOException {
    append(kv.getMvccVersion(), kv.getBuffer(), kv.getKeyOffset(), kv.getKeyLength(),
        kv.getBuffer(), kv.getValueOffset(), kv.getValueLength());
    this.maxMemstoreTS = Math.max(this.maxMemstoreTS, kv.getMvccVersion());
}

為什麼貼這段程式碼,注意這個引數maxMemstoreTS,它取kv的mvcc來比較,mvcc是用來實現MemStore的原子性操作的,在MemStore flush的時候同一批次的mvcc都是一樣的,失敗的時候,把mvcc相同的全部幹掉,這裡提一下,以後應該還會說到,繼續追殺append方法。方法比較長,大家展開看看。

private void append(final long memstoreTS, final byte[] key, final int koffset, final int klength,
      final byte[] value, final int voffset, final int vlength)
      throws IOException {
    boolean dupKey = checkKey(key, koffset, klength);
    checkValue(value, voffset, vlength);
    if (!dupKey) {
      //在寫每一個新的KeyValue之間,都要檢查,到了BlockSize就重新寫一個HFileBlock
      checkBlockBoundary();
    }
    //如果當前的fsBlockWriter的狀態不對,就重新寫一個新塊
    if (!fsBlockWriter.isWriting())
      newBlock();

    // 把值寫入到ouputStream當中,怎麼寫入的自己看啊
    {
      DataOutputStream out = fsBlockWriter.getUserDataStream();
      out.writeInt(klength);
      totalKeyLength += klength;
      out.writeInt(vlength);
      totalValueLength += vlength;
      out.write(key, koffset, klength);
      out.write(value, voffset, vlength);
      if (this.includeMemstoreTS) {
        WritableUtils.writeVLong(out, memstoreTS);
      }
    }

    // 記錄每個塊的第一個key 和 上次寫的key
    if (firstKeyInBlock == null) {
      firstKeyInBlock = new byte[klength];
      System.arraycopy(key, koffset, firstKeyInBlock, 0, klength);
    }

    lastKeyBuffer = key;
    lastKeyOffset = koffset;
    lastKeyLength = klength;
    entryCount++;
  }

從上面我們可以看到來,HFile寫入的時候,是分一個塊一個塊的寫入的,每個Block塊64KB左右,這樣有利於資料的隨機訪問,不利於連續訪問,連續訪問需求大的,可以把Block塊的大小設定得大一點。好,我們繼續看checkBlockBoundary方法。

  private void checkBlockBoundary() throws IOException {
    if (fsBlockWriter.blockSizeWritten() < blockSize)
      return;

    finishBlock();
    writeInlineBlocks(false);
    newBlock();
  }

簡單交代一下

1、結束一個block的時候,把block的所有資料寫入到hdfs的流當中,記錄一些資訊到DataBlockIndex(塊的第一個key和上一個塊的key的中間值,塊的大小,塊的起始位置)。

2、writeInlineBlocks(false)給了一個false,是否要關閉,所以現在什麼都沒幹,它要等到最後才會輸出的。

3、newBlock方法就是重置輸出流,做好準備,讀寫下一個塊。

Close的時候

 close的時候就有得忙咯,從之前的圖上面來看,它在最後的時候是最忙的,因為它要寫入一大堆索引資訊、附屬資訊啥的。

public void close() throws IOException {
      boolean hasGeneralBloom = this.closeGeneralBloomFilter();
      boolean hasDeleteFamilyBloom = this.closeDeleteFamilyBloomFilter();
      writer.close();
}

在呼叫writer的close方法之前,close了兩個BloomFilter,把BloomFilter的型別寫進FileInfo裡面去,把BloomWriter新增到Writer裡面。下面進入正題吧,放大招了,我摺疊吧。。。

public void close() throws IOException {
    if (outputStream == null) {
      return;
    }
    // 經過編碼壓縮的,把編碼壓縮方式寫進FileInfo裡面
    blockEncoder.saveMetadata(this);
    //結束塊
    finishBlock();
    //輸出DataBlockIndex索引的非root層資訊
    writeInlineBlocks(true);

    FixedFileTrailer trailer = new FixedFileTrailer(2,HFileReaderV2.MAX_MINOR_VERSION);

    // 如果有meta塊的存在的話
    if (!metaNames.isEmpty()) {
      for (int i = 0; i < metaNames.size(); ++i) {
        long offset = outputStream.getPos();
        // 輸出meta的內容,它是meta的名字的集合,按照名字排序
        DataOutputStream dos = fsBlockWriter.startWriting(BlockType.META);
        metaData.get(i).write(dos);

        fsBlockWriter.writeHeaderAndData(outputStream);
        totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();

        // 把meta塊的資訊加到meta塊的索引裡
        metaBlockIndexWriter.addEntry(metaNames.get(i), offset,
            fsBlockWriter.getOnDiskSizeWithHeader());
      }
    }

    //下面這部分是開啟檔案的時候就載入的部分,是前面部分的索引
    //HFileBlockIndex的根層次的索引
    long rootIndexOffset = dataBlockIndexWriter.writeIndexBlocks(outputStream);
    trailer.setLoadOnOpenOffset(rootIndexOffset);

    //Meta塊的索引
    metaBlockIndexWriter.writeSingleLevelIndex(fsBlockWriter.startWriting(
        BlockType.ROOT_INDEX), "meta");
    fsBlockWriter.writeHeaderAndData(outputStream);
    totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();

    //如果需要寫入Memstore的最大時間戳到FileInfo裡面
    if (this.includeMemstoreTS) {
      appendFileInfo(MAX_MEMSTORE_TS_KEY, Bytes.toBytes(maxMemstoreTS));
      appendFileInfo(KEY_VALUE_VERSION, Bytes.toBytes(KEY_VALUE_VER_WITH_MEMSTORE));
    }

    //把FileInfo的起始位置寫入trailer,然後輸出
    writeFileInfo(trailer, fsBlockWriter.startWriting(BlockType.FILE_INFO));
    fsBlockWriter.writeHeaderAndData(outputStream);
    totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();

    // 輸出GENERAL_BLOOM_META、DELETE_FAMILY_BLOOM_META型別的BloomFilter的資訊
    for (BlockWritable w : additionalLoadOnOpenData){
      fsBlockWriter.writeBlock(w, outputStream);
      totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();
    }

    //HFileBlockIndex的二級實體的層次
    trailer.setNumDataIndexLevels(dataBlockIndexWriter.getNumLevels());
    //壓縮前的HFileBlockIndex的大小
    trailer.setUncompressedDataIndexSize(
        dataBlockIndexWriter.getTotalUncompressedSize());
    //第一個HFileBlock的起始位置
    trailer.setFirstDataBlockOffset(firstDataBlockOffset);
    //最後一個HFileBlock的起始位置
    trailer.setLastDataBlockOffset(lastDataBlockOffset);
    //比較器的型別
    trailer.setComparatorClass(comparator.getClass());
    //HFileBlockIndex的根實體的數量,應該是和HFileBlock的數量是一樣的
    //它每次都把HFileBlock的第一個key加進去
    trailer.setDataIndexCount(dataBlockIndexWriter.getNumRootEntries());

    //把Trailer的資訊寫入硬碟,關閉輸出流
    finishClose(trailer);

    fsBlockWriter.release();
  }

和圖片上寫的有些出入。

1、輸出HFileBlocks

2、輸出HFileBlockIndex的二級索引(我叫它二級索引,我也不知道對不對,HFileBlockIndex那塊我有點兒忘了,等我再重新除錯的時候再看看吧)

3、如果有的話,輸出MetaBlock

下面的部分是開啟檔案的時候就載入的

4、輸出HFileBlockIndex的根索引

5、如果有的話,輸出MetaBlockIndex的根索引(它比較小,所以只有一層)

6、輸出檔案資訊(FileInfo)

7、輸出檔案尾巴(Trailer)

 Open的時候

這部分打算講一下例項化Reader的時候,根據不同型別的檔案是怎麼例項化Reader的,在StoreFile裡面搜尋open方法。

this.reader = fileInfo.open(this.fs, this.cacheConf, dataBlockEncoder.getEncodingInCache());

 // 載入檔案資訊到map裡面去,後面部分就不展開講了
metadataMap = Collections.unmodifiableMap(this.reader.loadFileInfo());

我們進入F3進入fileInfo.open這個方法裡面去。

    FSDataInputStreamWrapper in;
    FileStatus status;

    if (this.link != null) {
      // HFileLink
      in = new FSDataInputStreamWrapper(fs, this.link);
      status = this.link.getFileStatus(fs);
    } else if (this.reference != null) {
      // HFile Reference 反向計算出來引用所指向的位置的HFile位置
      Path referencePath = getReferredToFile(this.getPath());
      in = new FSDataInputStreamWrapper(fs, referencePath);
      status = fs.getFileStatus(referencePath);
    } else {
      in = new FSDataInputStreamWrapper(fs, this.getPath());
      status = fileStatus;
    }
    long length = status.getLen();
    if (this.reference != null) {
      hdfsBlocksDistribution = computeRefFileHDFSBlockDistribution(fs, reference, status);
      //如果是引用的話,建立一個一半的reader
      return new HalfStoreFileReader(
          fs, this.getPath(), in, length, cacheConf, reference, dataBlockEncoding);
    } else {
      hdfsBlocksDistribution = FSUtils.computeHDFSBlocksDistribution(fs, status, 0, length);
      return new StoreFile.Reader(fs, this.getPath(), in, length, cacheConf, dataBlockEncoding);
    }

它一上來就判斷它是不是HFileLink是否為空了,這是啥情況?找了一下,原來在StoreFile的建構函式的時候,就開始判斷了。

this.fileStatus = fileStatus;
    Path p = fileStatus.getPath();
    if (HFileLink.isHFileLink(p)) {
      // HFileLink 被判斷出來它是HFile
      this.reference = null;
      this.link = new HFileLink(conf, p);
    } else if (isReference(p)) {
      this.reference = Reference.read(fs, p);
      //關聯的地址也可能是一個HFileLink,snapshot的時候介紹了
      Path referencePath = getReferredToFile(p);
      if (HFileLink.isHFileLink(referencePath)) {
        // HFileLink Reference 如果它是一個HFileLink型的
        this.link = new HFileLink(conf, referencePath);
      } else {
        // 只是引用
        this.link = null;
      }
    } else if (isHFile(p)) {
      // HFile
      this.reference = null;
      this.link = null;
    } else {
      throw new IOException("path=" + p + " doesn't look like a valid StoreFile");
    }

它有4種情況:

1、HFileLink

2、既是HFileLink又是Reference檔案

3、只是Reference檔案

4、HFile

 說HFileLink吧,我們看看它的建構函式

public HFileLink(final Path rootDir, final Path archiveDir, final Path path) {
    Path hfilePath = getRelativeTablePath(path);
    this.tempPath = new Path(new Path(rootDir, HConstants.HBASE_TEMP_DIRECTORY), hfilePath);
    this.originPath = new Path(rootDir, hfilePath);
    this.archivePath = new Path(archiveDir, hfilePath);
    setLocations(originPath, tempPath, archivePath);
}

尼瑪,它計算了三個地址,原始位置,archive中的位置,臨時目錄的位置,按照順序新增到一個locations數組裡面。。接著看FSDataInputStreamWrapper吧,下面是三段程式碼

this.stream = (link != null) ? link.open(hfs) : hfs.open(path);
//走的link.open(hfs)
new FSDataInputStream(new FileLinkInputStream(fs, this));
//注意tryOpen方法
public FileLinkInputStream(final FileSystem fs, final FileLink fileLink, int bufferSize)
        throws IOException {
      this.bufferSize = bufferSize;
      this.fileLink = fileLink;
      this.fs = fs;
      this.in = tryOpen();
}

tryOpen的方法,會按順序開啟多個locations列表。。

for (Path path: fileLink.getLocations()) {
        if (path.equals(currentPath)) continue;
        try {
          in = fs.open(path, bufferSize);
          in.seek(pos);
          assert(in.getPos() == pos) : "Link unable to seek to the right position=" + pos;
          if (LOG.isTraceEnabled()) {
            if (currentPath != null) {
              LOG.debug("link open path=" + path);
            } else {
              LOG.trace("link switch from path=" + currentPath + " to path=" + path);
            }
          }
          currentPath = path;
          return(in);
        } catch (FileNotFoundException e) {
          // Try another file location
        }
}

恩,這回終於知道它是怎麼出來的了,原來是嘗試打開了三次,直到找到正確的位置。

StoreFile的檔案格式到這裡就結束了,有點兒遺憾的是HFileBlockIndex沒給大家講清楚。

補充:經網友"東岸往事"的提醒,有一個地方寫錯了,在結束一個塊之後,會把它所有的BloomFilter全部輸出,HFileBlockIndex的話,如果滿了預設的128*1024個就輸出二級索引。

具體的的內容在後面說查詢的時候會說,下面先交代一下:

通過看繼承InlineBlockWriter的類,發現了以下資訊

1、BlockIndexWriter 不是關閉的情況下,沒有超過預設值128*1024是不會輸出的,每128*1024個HFileBlock 1個二級索引。

HFileBlockIndex包括2層,如果是MetaBlock的HFileBlock是1層。

二級索引 curInlineChunk 在結束了一個塊之後新增一個索引的key(上一個塊的firstKey和這個塊的firstKey的中間值)。

byte[] indexKey = comparator.calcIndexKey(lastKeyOfPreviousBlock, firstKeyInBlock);
curInlineChunk.add(firstKey, blockOffset, blockDataSize);

一級索引 rootChunk 輸出一次二級索引之後新增每個HFileBlock的第一個key,這樣子其實二級索引裡面是包括是一級索引的所有key的。

firstKey = curInlineChunk.getBlockKey(0); 
rootChunk.add(firstKey, offset, onDiskSize, totalNumEntries);

2、CompoundBloomFilterWriter也就是Bloom Filter,在資料不為空的時候,就會輸出。

對於HFileV2的正確的圖,應該是下面這個,但是上面的那個圖看起來好看一點,就保留了。