
RocksDB Persistent Read Cache: A Partial Code Analysis

[For a detailed description, search for "Persistent Read Cache" in the RocksDB GitHub wiki]

A second advantage: the cache can simply be removed without affecting normal operation (it only holds copies of blocks that already exist in the LSM tree).

 

The design does not target any particular device.

Three main components:

Block Lookup Index: maps a given LSM block address to a cache record locator. The cache record locator helps locate the block data in the cache. The cache record can be described as { file-id, offset, size }.

It maps an LSM block address to a cache record locator; the locator is what lets us find the block data in the cache (a hash table resolves the containing file, offset and so on; this index is consulted on lookups). A rough sketch of the mapping follows after the three components.

 

File Lookup Index / LRU: This index maps a given file identifier to its reference object abstraction. The object abstraction can be used for reading data from the cache [the data itself is also stored in the cache]. When we run out of space on the persistent cache, we evict the least recently used file from this index.

It maps a file ID to an abstraction of the file object, which is used to read data from the cache. When the persistent cache runs out of space, the least recently used file in this index is evicted (this index is used when reclaiming space: an LRU shard is chosen at random as the starting point, its least recently used file is popped, and that file is deleted).

 

File Layout: The cache is stored in the file system as a sequence of files. Each file contains a sequence of records which contain data corresponding to a block on RocksDB LSM.

The whole cache is just a sequence of files in the file system [the file path is specified in the configuration]; each file contains a sequence of records, and each record holds a block of the RocksDB LSM [given an LSM block address, the Block Lookup Index above resolves it to file-id, offset and size].
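
To make the design concrete, here is a minimal illustrative sketch of the mapping maintained by the Block Lookup Index. The struct and map below are stand-ins for illustration only; the real implementation (see utilities/persistent_cache/ in the RocksDB tree) uses its own record types and a lock-striped hash table rather than std::unordered_map:

#include <cstdint>
#include <string>
#include <unordered_map>

// Illustrative only: where a cached block lives on disk.
struct CacheRecordLocator {
  uint32_t file_id;  // which cache file holds the record
  uint64_t offset;   // byte offset of the record inside that file
  uint32_t size;     // size of the record in bytes
};

// Illustrative only: block address (the LSM block key) -> locator.
// A lookup first consults this index, then reads `size` bytes at
// `offset` from the file identified by `file_id`.
std::unordered_map<std::string, CacheRecordLocator> block_lookup_index;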

 API: https://github.com/facebook/rocksdb/blob/master/include/rocksdb/persistent_cache.h
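
The header above also declares the factory for creating a persistent cache and plugging it into a DB. The following is a minimal usage sketch; it assumes the NewPersistentCache(env, path, size, log, optimized_for_nvm, cache) factory and the BlockBasedTableOptions::persistent_cache field found in recent RocksDB releases, so verify the exact signatures against the headers of your version:

#include <rocksdb/db.h>
#include <rocksdb/env.h>
#include <rocksdb/persistent_cache.h>
#include <rocksdb/table.h>

// Open a DB whose block-based tables read through a persistent cache on disk/SSD.
rocksdb::Status OpenDbWithPersistentCache(const std::string& db_path,
                                          const std::string& cache_path,
                                          rocksdb::DB** db) {
  std::shared_ptr<rocksdb::Logger> log;
  rocksdb::Env::Default()->NewLogger(cache_path + "/pcache_LOG", &log);

  std::shared_ptr<rocksdb::PersistentCache> pcache;
  rocksdb::Status s = rocksdb::NewPersistentCache(
      rocksdb::Env::Default(), cache_path,
      /*size=*/4ull * 1024 * 1024 * 1024,  // 4 GB of cache files under cache_path
      log, /*optimized_for_nvm=*/true, &pcache);
  if (!s.ok()) return s;

  rocksdb::BlockBasedTableOptions table_options;
  table_options.persistent_cache = pcache;  // blocks missed in the RAM block cache land here

  rocksdb::Options options;
  options.create_if_missing = true;
  options.table_factory.reset(rocksdb::NewBlockBasedTableFactory(table_options));
  return rocksdb::DB::Open(options, db_path, db);
}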

 

// Persistent Cache
//
// Persistent cache is tiered key-value cache that can use persistent medium. It
// is a generic design and can leverage any storage medium -- disk/SSD/NVM/RAM.
// The code has been kept generic but significant benchmark/design/development
// time has been spent to make sure the cache performs appropriately for
// respective storage medium.
// The file defines
// PersistentCacheTier    : Implementation that handles individual cache tier
// PersistentTieredCache  : Implementation that handles all tiers as a logical
//                          unit
//
// PersistentTieredCache architecture:
// +--------------------------+ PersistentCacheTier that handles multiple tiers
// | +----------------+       |
// | | RAM            | PersistentCacheTier that handles RAM (VolatileCacheImpl)
// | +----------------+       |
// |   | next                 |
// |   v                      |
// | +----------------+       |
// | | NVM            | PersistentCacheTier implementation that handles NVM
// | +----------------+ (BlockCacheImpl)
// |   | next                 |
// |   V                      |
// | +----------------+       |
// | | LE-SSD         | PersistentCacheTier implementation that handles LE-SSD
// | +----------------+ (BlockCacheImpl)
// |   |                      |
// |   V                      |
// |  null                    |
// +--------------------------+
//               |
//               V
//              null
namespace rocksdb {

// Persistent Cache Config
//
// This struct captures all the options that are used to configure persistent
// cache. Some of the terminologies used in naming the options are
//
// dispatch size :
// This is the size in which IO is dispatched to the device
//
// write buffer size :
// This is the size of an individual write buffer size. Write buffers are
// grouped to form buffered file.
//
// cache size :
// This is the logical maximum for the cache size
//
// qdepth :
// This is the max number of IOs that can be issued to the device in parallel
//
// pipelining :
// The writer code path follows pipelined architecture, which means the
// operations are handed off from one stage to another
//
// pipelining backlog size :
// With the pipelined architecture, there can always be backlogging of ops in
// pipeline queues. This is the maximum backlog size after which ops are dropped
// from queue
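
As a reading aid, the knobs above can be pictured as a plain struct. The one below is a hypothetical mirror of the terminology in the comment, not the actual PersistentCacheConfig declaration; the real struct (with defaults and additional options) lives next to the tier implementations in utilities/persistent_cache/persistent_cache_tier.h:

#include <cstdint>

// Hypothetical mirror of the documented knobs; NOT the real PersistentCacheConfig.
struct CacheKnobsSketch {
  uint64_t cache_size;            // logical maximum size of the cache
  uint32_t write_buffer_size;     // size of one write buffer; buffers are grouped
                                  // into a buffered cache file
  uint32_t dispatch_size;         // granularity in which IO is dispatched to the device
  uint32_t qdepth;                // max number of IOs issued to the device in parallel
  bool     pipeline_writes;       // writer path hands ops from one stage to the next
  uint64_t pipeline_backlog_size; // max backlog in the pipeline queues before ops are dropped
};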

From the wiki's Benchmarking-tools page: persistent_cache_bench

The path parameter is taken from the -path flag on the command line; it is a file-system path:

-path(Path for cachefile) type: string default: "/tmp/microbench/blkcache"

 

path_("") {
 verbose_ = IsFlagPresent(flags, ARG_VERBOSE);
 json_ = IsFlagPresent(flags, ARG_JSON);

 std::map<std::string, std::string>::const_iterator itr = options.find(ARG_PATH);
 if (itr != options.end()) {
   path_ = itr->second;
   if (path_.empty()) {
     exec_state_ = LDBCommandExecuteResult::Failed("--path: missing pathname");
   }
 }

cache_ =NewBlockCache(Env::Default(), path_,
                        /*size=*/std::numeric_limits<uint64_t>::max(),
                         /*direct_writes=*/true);
cache_ =NewTieredCache(Env::Default(), path_,
                         /*memory_size=*/static_cast<size_t>(1 * 1024 * 1024));

The insert path:

In persistent_cache_test.h, inside PersistentCacheTierTest:

void Insert(const size_t nthreads,const size_t max_keys) {
   key_ = 0;
   max_keys_ = max_keys;
   // spawn threads
   auto fn = std::bind(&PersistentCacheTierTest::InsertImpl, this);  // InsertImpl is run on multiple threads
   auto threads = SpawnThreads(nthreads, fn);
   // join with threads
   Join(std::move(threads));
   // Flush cache
   Flush();
 }

In InsertImpl: Status status = cache_->Insert(key, data, sizeof(data));

cache_ is declared as std::shared_ptr<PersistentCacheTier> cache_;

PersistentCacheTier: BlockCacheTier and VolatileCacheTier both inherit from it. PersistentCacheTier is essentially an interface, and multiple tiers can be stacked.

// This is a logical abstraction that defines a tier of the persistent cache. Tiers
// can be stacked over one another. PersistentCache provides the basic definition
// for accessing/storing in the cache. PersistentCacheTier extends the interface
// to enable management and stacking of tiers.
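
Because every tier implements the same PersistentCacheTier interface, a stacked cache is driven exactly like a single tier. A minimal sketch, reusing the NewTieredCache helper from the tool snippet above (a test/benchmark helper, not public API) and assuming Lookup takes (key, &data, &size) as declared in persistent_cache_tier.h:

#include <cassert>
#include <memory>
#include <string>

// Exercise a stacked cache purely through the PersistentCacheTier interface.
void TieredCacheSmokeTest(const std::string& path) {
  std::shared_ptr<PersistentCacheTier> cache =
      NewTieredCache(Env::Default(), path,
                     /*memory_size=*/static_cast<size_t>(1 * 1024 * 1024));

  // Insert goes into the RAM tier; on eviction the entry is pushed to the next tier.
  const char payload[] = "block-bytes";
  Status s = cache->Insert("block-key-1", payload, sizeof(payload));
  assert(s.ok());

  // Lookup checks the RAM tier first and falls through to the block tier on a miss.
  std::unique_ptr<char[]> data;
  size_t size = 0;
  s = cache->Lookup("block-key-1", &data, &size);
  assert(s.ok() && size == sizeof(payload));
}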

volatile_tier_impl.h

// VolatileCacheTier
//
// This file provides persistent cache tier implementation for caching
// key/values in RAM.
//
//        key/values
//           |
//           V
// +-------------------+
// | VolatileCacheTier | Store in an evictable hash table
// +-------------------+
//           |
//           V
//       on eviction
//   pushed to next tier
//
// The implementation is designed to be concurrent. The evictable hash table
// implementation is not concurrent at this point though.
//
// The eviction algorithm is LRU
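
The "on eviction pushed to next tier" arrow is the essential idea. Below is a small self-contained illustration of that chaining pattern using plain STL containers; it is not the real VolatileCacheTier, which uses the concurrent evictable hash table described next and proper per-shard LRU bookkeeping:

#include <cstddef>
#include <list>
#include <string>
#include <unordered_map>

// Illustrative tier chaining, not the real RocksDB classes: when the RAM tier is
// full, the LRU victim is handed down to the next tier (e.g. the block cache on disk).
struct Tier {
  virtual ~Tier() = default;
  virtual void Insert(const std::string& key, const std::string& value) = 0;
  Tier* next = nullptr;  // RAM -> NVM/disk -> nullptr, as in the diagram above
};

class RamTier : public Tier {
 public:
  explicit RamTier(size_t max_entries) : max_entries_(max_entries) {}

  void Insert(const std::string& key, const std::string& value) override {
    // Assumes distinct keys, to keep the LRU bookkeeping trivial.
    lru_.push_front(key);
    map_[key] = value;
    if (map_.size() > max_entries_) {
      const std::string victim = lru_.back();  // least recently used entry
      lru_.pop_back();
      if (next != nullptr) next->Insert(victim, map_[victim]);  // push down one tier
      map_.erase(victim);
    }
  }

 private:
  size_t max_entries_;
  std::list<std::string> lru_;                        // front = most recent
  std::unordered_map<std::string, std::string> map_;  // key -> cached bytes
};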

class BlockCacheTier : public PersistentCacheTier

The Insert function that runs in BlockCacheTier:

 

When the previous cache file is full, Status BlockCacheTier::NewCacheFile() is called to create a new cache file.

The newly created cache_file_ is then inserted into the metadata: status = metadata_.Insert(cache_file_); [what gets inserted into this index is the file]

 

bool BlockCacheTierMetadata::Insert(BlockCacheFile* file) {
  return cache_file_index_.Insert(file);  // insert the file into the file index
}

hash_table_evictable.h:

Inserting the file into the index: GetBucket picks the target bucket from the hash, then the corresponding LRU list and its mutex are fetched; under the write lock the file is inserted into the bucket and also pushed onto the LRU list.

  bool Insert(T* t) {
    const uint64_t h = Hash()(t);  // compute the hash of the element (here, the file)
    typename hash_table::Bucket& bucket = GetBucket(h);
    LRUListType& lru = GetLRUList(h);
    port::RWMutex& lock = GetMutex(h);

    WriteLock _(&lock);
    if (hash_table::Insert(&bucket, t)) {
      lru.Push(t);
      return true;
    }
    return false;
  }

Where:

  typename hash_table::Bucket& GetBucket(const uint64_t h) {
    const uint32_t bucket_idx = h % hash_table::nbuckets_;
    return hash_table::buckets_[bucket_idx];
  }

  LRUListType& GetLRUList(const uint64_t h) {
    const uint32_t bucket_idx = h % hash_table::nbuckets_;
    const uint32_t lock_idx = bucket_idx % hash_table::nlocks_;
    return lru_lists_[lock_idx];
  }

  port::RWMutex& GetMutex(const uint64_t h) {
    const uint32_t bucket_idx = h % hash_table::nbuckets_;
    const uint32_t lock_idx = bucket_idx % hash_table::nlocks_;
    return hash_table::locks_[lock_idx];
  }

The table is divided into nbuckets_ buckets; hash(t) modulo nbuckets_ selects the bucket, and the ratio of nbuckets_ to nlocks_ determines how many buckets share one lock and one lru_list (if nlocks_ >= nbuckets_, each bucket gets its own LRU list and mutex). A small worked example follows.
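
A quick worked example of the index arithmetic above, using made-up sizes (nbuckets_ = 256 and nlocks_ = 16 are illustrative values, not the library defaults):

#include <cassert>
#include <cstdint>

int main() {
  const uint32_t nbuckets = 256;  // illustrative, not the RocksDB default
  const uint32_t nlocks = 16;     // illustrative, not the RocksDB default

  const uint64_t h = 1000;                       // some hash value
  const uint32_t bucket_idx = h % nbuckets;      // 1000 % 256 = 232
  const uint32_t lock_idx = bucket_idx % nlocks; // 232 % 16  = 8

  assert(bucket_idx == 232);
  assert(lock_idx == 8);
  // 256 buckets over 16 locks: every 16 buckets share one mutex and one LRU list,
  // so bucket 232 shares lock/LRU list 8 with buckets 8, 24, 40, ..., 248.
  return 0;
}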

 

Evicting the least recently used object [an lru_list is chosen at random first; if it contains an evictable object, that object is evicted, otherwise the next lru_list is checked, up to nlocks_ lists in total]:

hash_table_evictable.h

  T* Evict(const std::function<void(T*)>& fn = nullptr) {
    uint32_t random = Random::GetTLSInstance()->Next();
    const size_t start_idx = random % hash_table::nlocks_;  // pick a random shard to start scanning for an evictable entry
    T* t = nullptr;

    // iterate from start_idx .. 0 .. start_idx
    for (size_t i = 0; !t && i < hash_table::nlocks_; ++i) {  // scan up to nlocks_ shards, wrapping around from start_idx
      const size_t idx = (start_idx + i) % hash_table::nlocks_;

      WriteLock _(&hash_table::locks_[idx]);
      LRUListType& lru = lru_lists_[idx];
      if (!lru.IsEmpty() && (t = lru.Pop()) != nullptr) {  // does this shard have an evictable entry?
        assert(!t->refs_);
        // We got an item to evict, erase from the bucket
        const uint64_t h = Hash()(t);
        typename hash_table::Bucket& bucket = GetBucket(h);
        T* tmp = nullptr;
        bool status = hash_table::Erase(&bucket, t, &tmp);
        assert(t == tmp);
        (void)status;
        assert(status);
        if (fn) {
          fn(t);
        }
        break;
      }
      assert(!t);
    }
    return t;
  }

Reserving space: block_cache_tier.cc

bool BlockCacheTier::Reserve(const size_t size) {
  WriteLock _(&lock_);
  assert(size_ <= opt_.cache_size);


  if (size + size_ <= opt_.cache_size) {
    // there is enough space to write, so no eviction is needed
    size_ += size;
    return true;
  }


  assert(size + size_ >= opt_.cache_size);
  // there is not enough space to fit the requested data
  // we can clear some space by evicting cold data


  const double retain_fac = (100 - kEvictPct) / static_cast<double>(100);  // kEvictPct: percentage of the cache to evict when it is full
  while (size + size_ > opt_.cache_size * retain_fac) {
    unique_ptr<BlockCacheFile> f(metadata_.Evict());  // the while loop keeps evicting files until enough space is freed
    if (!f) {
      // nothing is evictable
      return false;
    }
    assert(!f->refs_);
    uint64_t file_size;
    if (!f->Delete(&file_size).ok()) {
      // unable to delete file
      return false;
    }


    assert(file_size <= size_);
    size_ -= file_size;
  }


  size_ += size;
  assert(size_ <= opt_.cache_size * 0.9);
  return true;
}
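
A quick worked example of the retention math (assuming kEvictPct is 10, which the final assert size_ <= opt_.cache_size * 0.9 suggests): retain_fac = (100 - 10) / 100 = 0.9. With cache_size = 100 GB, size_ = 99 GB and a 2 GB reservation, the loop keeps evicting files until 2 + size_ <= 90, i.e. until at least 11 GB worth of cache files have been deleted; only then is size_ increased by the reserved 2 GB.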