1. 程式人生 > 其它 >hbase原始碼系列(十三)快取機制MemStore與Block Cache

hbase原始碼系列(十三)快取機制MemStore與Block Cache

這一章講hbase的快取機制,這裡面涉及的內容也是比較多,呵呵,我理解中的快取是儲存在記憶體中的特定的便於檢索的資料結構就是快取。

之前在講put的時候,put是被新增到Store裡面,這個Store是個介面,實現是在HStore裡面,MemStore其實是它底下的小子。

那它和Region Server、Region是什麼關係?

Region Server下面有若干個Region,每個Region下面有若干的列族,每個列族對應著一個HStore。

HStore裡面有三個很重要的類,在這章的內容都會提到。

protected final MemStore memstore;
private final CacheConfig cacheConf;
final StoreEngine<?, ?, ?, ?> storeEngine;

MemStore是儲存著兩個有序的kv集合,kv進來先寫到裡面,超過閥值之後就會寫入硬碟。

CacheConf是針對HFileBlock的快取,專門用來快取快,預設是在讀的時候快取塊,也可以修改列族的引數,讓它在寫的時候也快取,這個在資料模型定義的時候提到過。

StoreEngine是StoreFile的管理器,它管理著這個列族對應的所有StoreFiles。

1. MemStore

memstore比較有意思,我們先看它的add方法,這個是入口。

long add(final KeyValue kv) {
    this.lock.readLock().lock();
    try {
      KeyValue toAdd = maybeCloneWithAllocator(kv);
      return internalAdd(toAdd);
    } finally {
      this.lock.readLock().unlock();
    }
}

先把kv放到maybeCloneWithAllocator裡面複製出來一個新的kv,然後再走internalAdd的方法,為啥要這麼搞呢?

1.1 MemStoreLAB

先看maybeCloneWithAllocator,我們慢慢看,沒關係。

private KeyValue maybeCloneWithAllocator(KeyValue kv) {
    if (allocator == null) {
      return kv;
    }
    int len = kv.getLength();
    //從allocator當中分配出來len長度的非堆空間
    Allocation alloc = allocator.allocateBytes(len);
    if (alloc == null) {
      // 太大了,allocator決定不給它分配
      return kv;
    }
    //用allocator生成的空間,new一個kv出來
    assert alloc != null && alloc.getData() != null;
    System.arraycopy(kv.getBuffer(), kv.getOffset(), alloc.getData(), alloc.getOffset(), len);
    KeyValue newKv = new KeyValue(alloc.getData(), alloc.getOffset(), len);
    newKv.setMvccVersion(kv.getMvccVersion());
    return newKv;
 }

allocator是何許人也,它是一個MemStoreLAB,它是幹啥的呀,這個讓人很糾結呀?

public Allocation allocateBytes(int size) {
    // 如果申請的size比maxAlloc大,就不分了
    if (size > maxAlloc) {
      return null;
    }

    while (true) {
      Chunk c = getOrMakeChunk();

      // 給它分配個位置,返回陣列的起始位置
      int allocOffset = c.alloc(size);
      if (allocOffset != -1) {
        // 用一個數據結構Allocation來描述這個,它主要包括兩個資訊,1:陣列的引用,2:資料在陣列當中的起始位置
        return new Allocation(c.data, allocOffset);
      }

      // 空間不足了,釋放掉它
      tryRetireChunk(c);
    }
}

下面看看getOrMakeChunk看看是啥情況,挺疑惑的東西。

private Chunk getOrMakeChunk() {
    while (true) {
      // 當前的Chunk不為空,就取當前的
      Chunk c = curChunk.get();
      if (c != null) {
        return c;
      }
      // 這裡還有個Chunk的Pool,預設是沒有的,走的是new Chunk這條路徑
      c = (chunkPool != null) ? chunkPool.getChunk() : new Chunk(chunkSize);
      if (curChunk.compareAndSet(null, c)) {
        // curChunk是為空的話,就設定為c,然後加到chunkQueue裡面
        c.init();
        this.chunkQueue.add(c);
        return c;
      } else if (chunkPool != null) {
        // 先放回去,待會兒再拿出來
        chunkPool.putbackChunk(c);
      }      
    }
}

Chunk是一個持有一個byte[]陣列的資料結構,屬性如下。

static class Chunk {
    /* 實際資料儲存的地方,被不停地分配 */
    private byte[] data;
    private static final int UNINITIALIZED = -1;
    private static final int OOM = -2;
    /* 下一個chunk的起始位置,也是上一個chunk的結束位置 */
    private AtomicInteger nextFreeOffset = new AtomicInteger(UNINITIALIZED);

    /** 分配給了多少個kv */
    private AtomicInteger allocCount = new AtomicInteger();

    /** Chunk的大小 */
    private final int size;

好吧,我們現在清楚了,它是給每個kv的資料又重新找了個地方混,從註釋上面講這個Chunk未初始化,沒有被分配記憶體,所以開銷小。不太理解這個東西,人家之前也是在byte數組裡面混,只不顧挪了個窩了,莫非是為了減少記憶體碎片?尼瑪,還真被我說中了,在我以前的資料裡面有《調優》

不管怎麼樣吧,把多個小的kv寫到一個連續的數組裡面可能是好點好處吧,下面講一下它的相關引數吧。

/** 可分配的最大值,超過這個值就不給它分配了,預設值是256K */
hbase.hregion.memstore.mslab.max.allocation 預設值是256  * 1024
/** 每個Chunk的大小,預設是2M */
hbase.hregion.memstore.mslab.chunksize 預設值是2048 * 1024

那我們繼續講講這個MemStoreChunkPool吧,它預設是不被開啟的,因為它的引數hbase.hregion.memstore.chunkpool.maxsize預設是0 (只允許輸入0->1的數值),它是通過堆記憶體的最大值*比例來計算得出來的結果。

它可以承受的最大的Chunk的數量是這麼計算的 MaxCount = MemStore記憶體限制 * Chunkpool.Maxsize / Chunksize。

MemStore的記憶體最大最小值分別是0.35 --> 0.4,這個在我之前的部落格裡面也有。

hbase.regionserver.global.memstore.upperLimit 
hbase.regionserver.global.memstore.lowerLimit

還有這個引數hbase.hregion.memstore.chunkpool.initialsize需要設定,預設又是0,輸入0->1的數值,MaxCount乘以它就設定初始的Chunk大小。

沒試過開啟這個Pool效果是否會好,它是依附在MemStore裡面的,它設定過大了,最直接的影響就是,另外兩個集合的空間就小了。

1.2 有序集合

分配完Chunk之後,乾的是這個函式,就是新增到一個有序集合當中kvset。

private long internalAdd(final KeyValue toAdd) {
    long s = heapSizeChange(toAdd, addToKVSet(toAdd));
    //把時間戳範圍加到內部去
    timeRangeTracker.includeTimestamp(toAdd);
    this.size.addAndGet(s);
    return s;
}

MemStore裡面有兩個有序的集合,kvset和snapshot,KeyValueSkipListSet的內部實現是ConcurrentNavigableMap。

volatile KeyValueSkipListSet kvset;
volatile KeyValueSkipListSet snapshot;

它們的排序規則上一章已經說過了,排過序的在搜尋的時候方便查詢,這裡為什麼還有一個snapshot呢?snapshot是一個和它一樣的東西,我們都知道MemStore是要flush到檔案生成StoreFile的,那我不能寫檔案的時候讓別人都沒法讀了吧,那怎麼辦,先把它拷貝到snapshot當中,這個時間很短,複製完了就可以訪問kvset,實際flush的之後,我們flush掉snapshot當中的kv就可以啦。

2. CacheConfig

在看這個之前,先推薦看一下我的另外一篇文章《快取機制以及可以利用SSD作為儲存的BucketCache》,否則後面有很多概念,你看不懂的。

這裡我們主要關注的是LruBlockCache和BucketCache,至於他們的使用,請參照上面的部落格設定,這裡不再介紹哦。

CacheConfig是一個HStore一個,屬性是根據列族定製的,比如是否常駐記憶體,但是它記憶體用來快取塊的BlockCache是Region Server全域性共享的的globalBlockCache,在new一個CacheConfig的時候,它會呼叫instantiateBlockCache方法返回一個BlockCache快取Block的,如果已經存在globalBlockCache,就直接返回,沒有才會重新例項化一個globalBlockCache。

這裡還分堆上記憶體和直接分配的記憶體,堆上的記憶體的引數hfile.block.cache.size預設是0.25。

2.1 DoubleCache

直接分配的記憶體,要通過設定JVM引數-XX:MaxDirectMemorySize來設定,設定了這個之後我們還需要設定hbase.offheapcache.percentage(預設是0)來設定佔直接分配記憶體的比例。

offHeapCacheSize =offheapcache.percentage * DirectMemorySize

這裡我們還真不能設定它,因為如果設定了它的話,它會把new一個DoubleCache出來,它是LruBlockCache和SlabCache的合體,之前我提到的那篇文章裡面說到SlabCache是一個只能存固定大小的Block大小的Cache,比較垃圾。

2.2 LruBlockCache

如果offHeapCacheSize <= 0,就走下面的邏輯,這裡我就簡單陳述一下了,程式碼沒啥可貼的。

LruBlockCache和BucketCache的合作方式有兩種,一種是BucketCache作為二級快取使用,比如SSD,一種是在記憶體當中,它倆各佔比列0.1和0.9,還是建議上SSD做二級快取,其實也不貴。

不管如何,BlockCache這塊的總大小是固定的,是由這個引數決定hfile.block.cache.size,預設它是0.25,所以LruBlockCache最大也就是0.25的最大堆記憶體。

在LruBlockCache當中還分了三種優先順序的快取塊,分別是SINGLE、MULTI、MEMORY,比列分別是0.25、0.5、0.25,當快要滿的時候,要把塊剔除出記憶體的時候,就要遍歷所有的塊了,然後計算他們的分別佔的比例,剔除的程式碼還挺有意思。

     PriorityQueue<BlockBucket> bucketQueue =
        new PriorityQueue<BlockBucket>(3);

      bucketQueue.add(bucketSingle);
      bucketQueue.add(bucketMulti);
      bucketQueue.add(bucketMemory);

      int remainingBuckets = 3;
      long bytesFreed = 0;

      BlockBucket bucket;
      while((bucket = bucketQueue.poll()) != null) {
        long overflow = bucket.overflow();
        if(overflow > 0) {
          //把要釋放的空間bytesToFree分給3個bucket,3個分完
          long bucketBytesToFree = Math.min(overflow,
            (bytesToFree - bytesFreed) / remainingBuckets);
          bytesFreed += bucket.free(bucketBytesToFree);
        }
        remainingBuckets--;
      }

搞了一個優先順序佇列,先從SINGLE的開刀、SINGLE不行了,再拿MULTI開刀,最後是MEMORY。bytesToFree是之前計算好的,要釋放的大小=當前值-最小值。

在我們設定列族引數的時候,有一個InMemory的引數,如果設定了它就是MEMORY,如果沒設定,就是SINGLE,SINGLE型別的一旦被訪問過之後,立馬變成高富帥的MULTI,但是沒有希望變成MEMORY。

這裡之前百度的一個哥麼問我,Meta表的塊會不會一直被儲存在MEMORY當中呢,這塊的程式碼寫得讓人有點兒鬱悶的,它是按照列族的引數設定的,但是我怎麼去找Meta表的列族設定啊,啊被我找到了,在程式碼裡面寫著的。

public static final HTableDescriptor META_TABLEDESC = new HTableDescriptor(
      TableName.META_TABLE_NAME,
      new HColumnDescriptor[] {
          new HColumnDescriptor(HConstants.CATALOG_FAMILY)
              // 保持10個版本是為了幫助除錯
              .setMaxVersions(10)
              .setInMemory(true)
              .setBlocksize(8 * 1024)
              .setScope(HConstants.REPLICATION_SCOPE_LOCAL)
              // 不使用BloomFilter
              .setBloomFilterType(BloomType.NONE)
});

可以看出來Meta表的塊只有8K,常駐記憶體,不使用BloomFilter,允許叢集間複製。

再吐槽一下hbase這個Lru演算法吧,做得挺粗糙的,它記錄了每個Block塊的訪問次數,但是它並沒有按照這個來排序,就是簡單的依賴雜湊值來排序。

Tips:江湖傳言一個Regionserver上有一個BlockCache和N個Memstore,它們的大小之和不能大於等於heapsize * 0.8,否則HBase不能正常啟動,想想也是,hbase是記憶體大戶,記憶體稍有不夠就掛掉,大家要小心設定這個快取的引數。

 2.3 BucketCache

原來這塊的圖在上面的那篇文章已經提到了,我就不再重複了,之前沒看的請一定要看,那邊有很詳細的圖解,我這裡只是講點我瞭解的實現。

我們可以從兩個方法裡面看LruBlockCache和BucketCache的關係,一個是getBlock,一個是evictBlock,先看evictBlock。

protected long evictBlock(CachedBlock block, boolean evictedByEvictionProcess) {
  //從map裡面刪除
    map.remove(block.getCacheKey());if (evictedByEvictionProcess && victimHandler != null) {
      boolean wait = getCurrentSize() < acceptableSize();
      boolean inMemory = block.getPriority() == BlockPriority.MEMORY;
   //儲存到victimHandler裡面
      victimHandler.cacheBlockWithWait(block.getCacheKey(), block.getBuffer(),
          inMemory, wait);
    }
    return block.heapSize()
}

在把block剔除出記憶體之後,就把塊加到victimHandler裡面,這個victimHandler就是BucketCache,在CacheConfig例項化LruBlockCache之後就用setVictimCache方法傳進去的。

看完這個我們再看getBlock。

public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, boolean repeat) {
    CachedBlock cb = map.get(cacheKey);
    if(cb == null) {if (victimHandler != null)
        return victimHandler.getBlock(cacheKey, caching, repeat);
      return null;
    }
    return cb.getBuffer();
}

 先從map中取,如果找不到就從victimHandler中取得。

從上面兩個方法,我們可以看出來BucketCache是LruBlockCache的二級快取,它不要了才會存到BucketCache當中,取得時候也是,找不到了才想起人家來。

好,我們現在進入到BucketCache裡面看看,它裡面有幾個重要的屬性。

// Store/read block data
IOEngine ioEngine;
// 記憶體map
private ConcurrentHashMap<BlockCacheKey, RAMQueueEntry> ramCache;
// 後備佇列,質儲存塊的索引資訊,比如offset, length
private ConcurrentHashMap<BlockCacheKey, BucketEntry> backingMap;

這裡怎麼又來了兩個,一個記憶體的,一個後備隊裡的,這個是有區別的RAMQueueEntry當中直接儲存了塊的buffer資料,BucketEntry只是儲存了起始位置和長度。

下面我們看看這個流程吧,還是老規矩,先看入口,再看出口,入口在哪裡,前面的程式碼中提到了,入口在cacheBlockWithWait方法。

    //已經有就不加啦
    if (backingMap.containsKey(cacheKey) || ramCache.containsKey(cacheKey))
      return;
    //寫入一級快取
    RAMQueueEntry re = new RAMQueueEntry(cacheKey, cachedItem, accessCount.incrementAndGet(), inMemory);
    ramCache.put(cacheKey, re);
    //用雜湊值給計算出一個隨機的佇列來
    int queueNum = (cacheKey.hashCode() & 0x7FFFFFFF) % writerQueues.size();
    //把實體也插入到寫入佇列
    BlockingQueue<RAMQueueEntry> bq = writerQueues.get(queueNum);

可以看得出來在這個方法當中,先把塊寫入到ramCache當中,然後再插入到一個隨機的寫入佇列,寫入執行緒有3個,每個寫入執行緒持有一個寫入佇列,執行緒的數量由引數hbase.bucketcache.writer.threads控制。

我們看看這個WriterThread的run方法吧。

     List<RAMQueueEntry> entries = new ArrayList<RAMQueueEntry>();
      try {
        while (cacheEnabled && writerEnabled) {
          try {
            //從inputQueue拿出來放到entries,然後再對entries操作
            entries.add(inputQueue.take());
            inputQueue.drainTo(entries);
          } catch (InterruptedException ie) {
            if (!cacheEnabled) break;
          }
          doDrain(entries);
        }

那我們要關注的就是doDrain的方法了,在這個方法裡面,它主要乾了4件事情。

1、把ramCache當中的實體給剔除出來轉換成BucketEntry,並切入到ioEngine。

2、ioEngine同步,ioEngine包括3種(file,offheap,heap),第一種就是寫入SSD,用的是FileChannel,後兩種是寫入到一個ByteBufferArray

3、把BucketEntry新增到backingMap

4、如果空間不足的話,呼叫freeSpace清理空間,清理空間的方法和LruBlockCache的方法類似。

這裡面的Bucket它也不是一個具體的東西,它裡面記住的也是起始位置,使用了多少次的這些引數,所以說它是一個邏輯上的,而不是物理上的分配的一塊隨機的地址。

final private static class Bucket {
    //基準起始位置
    private long baseOffset;
    //每個item分配的大小
    private int itemAllocationSize; 
    //對應的在bucketSizeInfos中的位置
    private int sizeIndex;
    //總容量
    private int itemCount;
    private int freeList[];
    //空閒的數量
    private int freeCount;
    //已經使用的數量
    private int usedCount;
}

我們是不是可以這麼理解:就是當我們不需要某個塊的時候我們不用去物理的刪除它,只需要不斷的重用它裡面的空間就可以了,而不需要管怎麼刪除、釋放等相關內容。

BucketSizeInfo是負責管理這些Bucket的,它管理著3個佇列,同時它可以動態根據需求,new一些新的不同大小的Bucket出來,也可以把現有的Bucket變更它的大小,Bucket的大小最小是5K,最大是513K。

final class BucketSizeInfo {
    // Free bucket means it has space to allocate a block;
    // Completely free bucket means it has no block.
    private List<Bucket> bucketList, freeBuckets, completelyFreeBuckets;
    private int sizeIndex;
}

sizeIndex是啥意思?是在BucketSizeInfo的數組裡面的位置,它的大小都是有固定的值的,不能多也不能少,這裡就不詳細介紹了。我們直接看WriteToCache這個方法吧,好驗證一下之前的想法。

    //序列化長度 = 資料長度 + 額外的序列化的長度16個位元組
      int len = data.getSerializedLength();
      // This cacheable thing can't be serialized...
      if (len == 0) return null;
      //bucketAllocator給分配點空間
      long offset = bucketAllocator.allocateBlock(len);
      //生成一個實體
      BucketEntry bucketEntry = new BucketEntry(offset, len, accessTime, inMemory);
      //設定Deserializer,具體的實現在HFileBlock當中
      bucketEntry.setDeserialiserReference(data.getDeserializer(), deserialiserMap);
      try {
        if (data instanceof HFileBlock) {
          ByteBuffer sliceBuf = ((HFileBlock) data).getBufferReadOnlyWithHeader();
          sliceBuf.rewind();
          assert len == sliceBuf.limit() + HFileBlock.EXTRA_SERIALIZATION_SPACE;
          ByteBuffer extraInfoBuffer = ByteBuffer.allocate(HFileBlock.EXTRA_SERIALIZATION_SPACE);
          ((HFileBlock) data).serializeExtraInfo(extraInfoBuffer);
          //先寫入資料資訊,再寫入頭資訊
          ioEngine.write(sliceBuf, offset);
          ioEngine.write(extraInfoBuffer, offset + len - HFileBlock.EXTRA_SERIALIZATION_SPACE);
        } else {
          //如果不是HFileBlock的話,把資料序列化到bb當中,然後寫入到IOEngine
          ByteBuffer bb = ByteBuffer.allocate(len);
          data.serialize(bb);
          ioEngine.write(bb, offset);
        }
      } catch (IOException ioe) {
        // 出錯了就釋放掉這個這個塊
        bucketAllocator.freeBlock(offset);
        throw ioe;
    }

這裡我們看這一句就可以了ioEngine.write(sliceBuf, offset);  在寫入ioEngine的時候是要傳這個offset的,也正好驗證了我之前的想法,所以BucketAllocator.allocateBlock的分配管理這塊就很關鍵了。

關於怎麼分配這塊,還是留個能人講吧,我是講不好了。