B+Tree原理及實現
阿新 • • 發佈:2022-05-11
一, B+Tree的定義:
- 分為葉子節點, 分支節點 (非葉子節點) 和根節點
- 分支節點只儲存索引值和節點指標, 指標是指向具體節點的記憶體地址; 葉子節點存有索引值和資料; 根節點和分支節點類似, 不同的是每次檢索資料都是從根節點開始
- 葉子節點之間會構成一個單向連結串列
二, B+Tree的結構圖:
三, B+Tree的應用場景:
一般應用在mysql的儲存引擎innodb, 其優點是檢索資料效率快; 相比二叉樹, 普通的二叉搜尋樹在順序插入時會退化成一個連結串列, 檢索效能降低; 即使是二叉平衡樹, 由於每個節點只儲存一個索引值, 在資料量多的情況下, 層級較深, 檢索速度慢; 使用B+Tree則可以避免上述問題, 因為其每個節點可以儲存多個索引值並且有上限, 比如設定最大度數為3階, 意味著每個節點最多兩個索引值, 和三個指標
四, B+Tree的程式碼實現:
程式碼分析:
索引類:
- data : 儲存的具體資料( 只有葉子節點才會有值)
- indexVal : 索引值
- left : 當前索引左邊的節點指標
- right : 當前索引右邊的節點指標
節點類:
- id : 節點指標(節點的記憶體地址)
- parent : 父節點的指標
- isLeaf : 是否是葉子節點
- indexNum : 當前節點已儲存的索引數量, 索引數大於等於樹的階時則分裂
- indexMap : 索引物件陣列
使用PHP程式碼實現B+Tree結構:
程式碼參考網上的文章, 但是跟實際結構有偏差, 所以自己做了優化調整, 支援多階指標, 符合B+Tree官方結構圖( https://www.cs.usfca.edu/~galles/visualization/BPlusTree.html)
<?php

/**
 * Class Data
 *
 * A record to be stored in the tree: an associative array plus the name of
 * the key whose value is used as the index.
 */
class Data
{
    /**
     * @var array The raw record.
     */
    public $data = [];

    /**
     * @var string Key inside $data that holds the index value.
     */
    private $indexKey;

    /**
     * @param array  $data     The record.
     * @param string $indexKey Key of $data that holds the index value.
     */
    public function __construct(array $data, $indexKey = 'index')
    {
        $this->data = $data;
        $this->indexKey = $indexKey;
    }

    /**
     * @return mixed The index value of the record, or 0 when the key is absent.
     */
    public function getIndexVal()
    {
        return isset($this->data[$this->indexKey]) ? $this->data[$this->indexKey] : 0;
    }

    /**
     * @return string JSON representation of the record (empty string if encoding fails).
     */
    public function __toString()
    {
        // json_encode() returns false on failure, which __toString() must not return.
        return (string) json_encode($this->data);
    }
}

/**
 * Class Index
 *
 * One index entry inside a tree node.
 */
class Index
{
    /**
     * @var mixed The index (key) value.
     */
    private $indexVal;

    /**
     * @var mixed Payload attached to the entry; BPlusTree::insert() stores the
     *            record's data array here for leaf entries, and null is used
     *            for entries of internal nodes.
     */
    private $data;

    /**
     * @var int ID of the BTNode on the left of this entry (0 = none).
     */
    private $left;

    /**
     * @var int ID of the BTNode on the right of this entry (0 = none).
     */
    private $right;

    /**
     * @param mixed $indexVal The index value.
     * @param int   $left     ID of the left child node (0 = none).
     * @param int   $right    ID of the right child node (0 = none).
     * @param mixed $data     Payload, only meaningful for leaf entries.
     */
    public function __construct($indexVal = 0, $left = 0, $right = 0, $data = null)
    {
        $this->indexVal = $indexVal;
        $this->left = $left;
        $this->right = $right;
        $this->data = $data;
    }

    public function getIndexVal()
    {
        return $this->indexVal;
    }

    public function getLeft()
    {
        return $this->left;
    }

    public function getRight()
    {
        return $this->right;
    }

    /**
     * @return mixed The payload given to the constructor.
     */
    public function getData()
    {
        return $this->data;
    }

    public function updateLeft($pointer)
    {
        $this->left = $pointer;
    }

    public function updateRight($pointer)
    {
        $this->right = $pointer;
    }
}

/**
 * Class BTNode
 *
 * A node of the B+ tree (leaf or internal).
 */
class BTNode
{
    /**
     * @var int Last ID handed out; IDs start at 1 so that 0 can mean "no node".
     */
    private static $lastId = 0;

    /**
     * @var int Unique identifier of this node.
     */
    private $id = 0;

    /**
     * @var int ID of the parent node (0 = no parent, i.e. root).
     */
    public $parent = 0;

    /**
     * @var bool Whether this node is a leaf.
     */
    public $isLeaf = false;

    /**
     * @var int Number of index entries currently held by this node; the node
     *          is considered full once this reaches the order of the tree.
     */
    public $indexNum = 0;

    /**
     * @var Index[] Index entries, kept sorted ascending by index value.
     */
    private $indexMap = [];

    /**
     * @var int ID of the next sibling leaf (only used by leaf nodes, 0 = none).
     */
    public $next = 0;

    /**
     * @param bool $isLeaf Whether the node is a leaf.
     * @param int  $parent ID of the parent node (0 = none).
     */
    public function __construct($isLeaf = false, $parent = 0)
    {
        $this->init($isLeaf, $parent);
    }

    /**
     * Assigns a fresh ID and resets the node's state.
     *
     * A monotonically increasing counter is used instead of uniqid(), which
     * does not guarantee uniqueness; a collision would overwrite a node in the
     * tree's node pool.
     */
    protected function init($isLeaf, $parent)
    {
        $this->id = ++self::$lastId;
        $this->isLeaf = $isLeaf;
        $this->parent = $parent;
        $this->indexMap = [];
        $this->indexNum = 0;
    }

    public function getID()
    {
        return $this->id;
    }

    /**
     * Adds an index entry and re-sorts the entries ascending by index value.
     *
     * @param Index $index
     */
    public function addIndex(Index $index)
    {
        $this->indexMap[] = $index;
        usort($this->indexMap, function (Index $a, Index $b) {
            if ($a->getIndexVal() == $b->getIndexVal()) {
                return 0;
            }

            return $a->getIndexVal() > $b->getIndexVal() ? 1 : -1;
        });
        $this->indexNum = count($this->indexMap);
    }

    /**
     * @return IndexMapIterator Iterator over a snapshot of the entries; the
     *                          Index objects themselves are shared.
     */
    public function generateIndexMapIterator()
    {
        return new IndexMapIterator($this->indexMap);
    }

    /**
     * Returns the entry whose index value equals $indexVal (loose comparison).
     *
     * @param mixed $indexVal
     * @return Index|null
     */
    public function findIndex($indexVal)
    {
        foreach ($this->indexMap as $index) {
            if ($index->getIndexVal() == $indexVal) {
                return $index;
            }
        }

        return null;
    }

    /**
     * Returns the ID of the child to follow when searching for $indexVal:
     * the left pointer of the first entry greater than $indexVal, otherwise
     * the right pointer of the last entry. Equal values go to the right.
     *
     * @param mixed $indexVal
     * @return int Child node ID, or 0 when there is none (leaf or empty node).
     */
    public function findChildId($indexVal)
    {
        $childId = 0;
        foreach ($this->indexMap as $index) {
            if ($indexVal >= $index->getIndexVal()) {
                $childId = $index->getRight();
                continue;
            }

            return $index->getLeft();
        }

        return $childId;
    }

    /**
     * @param int $order Order of the tree.
     * @return bool True once the number of entries has reached the order.
     */
    public function isFull($order)
    {
        return $this->indexNum >= $order;
    }

    /**
     * Removes every entry from position $start (inclusive) to the end.
     *
     * @param int $start
     */
    public function deleteMap($start)
    {
        $this->indexMap = array_slice($this->indexMap, 0, max(0, (int) $start));
        $this->indexNum = count($this->indexMap);
    }

    public function updateParent($id)
    {
        $this->parent = $id;
    }

    public function setNext($id)
    {
        $this->next = $id;
    }
}

/**
 * Class BPlusTree
 *
 * B+ tree whose nodes live in an ID-keyed pool and reference each other by ID.
 */
class BPlusTree
{
    /**
     * @var int ID of the root node (0 = empty tree).
     */
    public $root = 0;

    /**
     * @var BTNode[] Node pool: node ID => node object.
     */
    private $nodeMap = [];

    /**
     * @var int Order of the tree; a node is split once it holds this many entries.
     */
    private $order;

    /**
     * @param int $order Order of the tree, at least 3.
     * @throws InvalidArgumentException When $order is below 3: splitting an
     *         internal node would then leave the new sibling without entries.
     */
    public function __construct($order = 3)
    {
        if ((int) $order < 3) {
            throw new InvalidArgumentException('The order of a B+ tree must be at least 3.');
        }
        $this->order = (int) $order;
    }

    /**
     * Inserts a record.
     *
     * @param Data $record
     * @return bool True when stored, false when the index value already exists.
     */
    public function insert(Data $record)
    {
        $indexVal = $record->getIndexVal();

        if ($this->isEmpty()) {
            // Empty tree: the first node is both root and leaf.
            $rootLeaf = new BTNode(true, 0);
            $rootLeaf->addIndex(new Index($indexVal, 0, 0, $record->data));
            $this->storeNode($rootLeaf);
            $this->root = $rootLeaf->getID();

            return true;
        }

        $leaf = $this->findLeaf($indexVal);
        if ($leaf === null || $leaf->findIndex($indexVal) !== null) {
            // Every key of an internal node also lives in a leaf (nothing is
            // ever deleted), so checking the target leaf detects all duplicates.
            return false;
        }

        // Only leaf entries carry the actual data.
        $leaf->addIndex(new Index($indexVal, 0, 0, $record->data));
        if ($leaf->isFull($this->order)) {
            $this->split($leaf);
        }

        return true;
    }

    /**
     * Splits a full node into two and pushes the middle index value up to the
     * parent, creating a new root when the split node was the root.
     *
     * @param BTNode $node
     */
    protected function split(BTNode $node)
    {
        $middle = intval($node->indexNum / 2);
        $middleIndexValue = 0;

        $pid = $node->parent;
        if ($pid === 0) {
            // Splitting the root: the tree grows by one level.
            $newRoot = new BTNode(false, 0);
            $this->storeNode($newRoot);
            $pid = $newRoot->getID();
            $this->root = $pid;
        }
        $parent = $this->getNodeByID($pid);

        $newNode = new BTNode($node->isLeaf, $pid);
        $this->storeNode($newNode);

        // A leaf keeps a copy of the middle key in the new sibling; an
        // internal node moves the middle key up, so only later keys move.
        $moveFrom = $node->isLeaf ? $middle : $middle + 1;

        $iterator = $node->generateIndexMapIterator();
        while ($iterator->valid()) {
            $position = $iterator->key();
            $indexObj = $iterator->current();

            if ($position === $middle) {
                $middleIndexValue = $indexObj->getIndexVal();
            }
            if ($position >= $moveFrom) {
                $newNode->addIndex($indexObj);
                if (!$newNode->isLeaf) {
                    // Children reachable through a moved entry now belong to the new node.
                    $this->reparent($indexObj->getLeft(), $newNode->getID());
                    $this->reparent($indexObj->getRight(), $newNode->getID());
                }
            }
            $iterator->next();
        }

        // When the root was split, the original node's parent is the new root.
        $node->updateParent($pid);
        // The middle entry and everything after it no longer belong to the original node.
        $node->deleteMap($middle);

        if ($node->isLeaf) {
            // Keep the leaf linked list intact: node -> newNode -> old next.
            $newNode->setNext($node->next);
            $node->setNext($newNode->getID());
        }

        $parent->addIndex(new Index($middleIndexValue, $node->getID(), $newNode->getID()));

        // Right pointers are authoritative; make each entry's left pointer
        // agree with the right pointer of the entry before it.
        $previous = null;
        $parentIterator = $parent->generateIndexMapIterator();
        while ($parentIterator->valid()) {
            $current = $parentIterator->current();
            if ($previous !== null) {
                $current->updateLeft($previous->getRight());
            }
            $previous = $current;
            $parentIterator->next();
        }

        if ($parent->isFull($this->order)) {
            $this->split($parent);
        }
    }

    /**
     * Looks up a single record by index value.
     *
     * @param mixed $indexVal
     * @return mixed The stored data, or a "not exists" message string when absent.
     */
    public function find($indexVal)
    {
        $leaf = $this->findLeaf($indexVal);
        $indexObj = $leaf === null ? null : $leaf->findIndex($indexVal);
        if ($indexObj !== null) {
            return $indexObj->getData();
        }

        return 'record ['.$indexVal. '] is not exists!';
    }

    /**
     * Returns the data of every record whose index value lies in [$start, $end].
     *
     * @param mixed $start Lower bound (inclusive).
     * @param mixed $end   Upper bound (inclusive).
     * @return array
     */
    public function rangeFind($start, $end)
    {
        $resultData = [];

        // Locate the leaf where $start would live, then walk the leaf list.
        $leaf = $this->findLeaf($start);
        while ($leaf !== null) {
            $iterator = $leaf->generateIndexMapIterator();
            while ($iterator->valid()) {
                $indexObj = $iterator->current();
                if ($indexObj->getIndexVal() > $end) {
                    break 2;
                }
                if ($indexObj->getIndexVal() >= $start) {
                    $resultData[] = $indexObj->getData();
                }
                $iterator->next();
            }
            $leaf = $this->getNodeByID($leaf->next);
        }

        return $resultData;
    }

    public function isEmpty()
    {
        return $this->root === 0;
    }

    /**
     * Descends from the root to the leaf responsible for $indexVal.
     *
     * @param mixed $indexVal
     * @return BTNode|null The leaf, or null when the tree is empty.
     */
    private function findLeaf($indexVal)
    {
        $leaf = null;
        $node = $this->getNodeByID($this->root);
        while ($node !== null) {
            $leaf = $node;
            $node = $this->getNodeByID($node->findChildId($indexVal));
        }

        return $leaf;
    }

    /**
     * Sets the parent of the node with the given ID, if such a node exists.
     */
    private function reparent($childId, $parentId)
    {
        $child = $this->getNodeByID($childId);
        if ($child !== null) {
            $child->updateParent($parentId);
        }
    }

    /**
     * Stores a node in the pool under its ID.
     *
     * @param BTNode $node
     */
    private function storeNode(BTNode $node)
    {
        $this->nodeMap[$node->getID()] = $node;
    }

    /**
     * @param int $id
     * @return BTNode|null The node, or null when the ID is unknown.
     */
    public function getNodeByID($id)
    {
        return isset($this->nodeMap[$id]) ? $this->nodeMap[$id] : null;
    }

    /**
     * Prints the nodes of the pool with print_r().
     *
     * @param bool $onlyLeafNode When true only leaf nodes are printed,
     *                           otherwise every node is printed.
     */
    public function dumpNodeMap($onlyLeafNode = false)
    {
        foreach ($this->nodeMap as $eachNode) {
            if (!$onlyLeafNode || $eachNode->isLeaf) {
                print_r($eachNode);
            }
        }
    }
}

/**
 * Class IndexMapIterator
 *
 * Bidirectional cursor over the index entries of a BTNode.
 */
class IndexMapIterator
{
    private $indexMap = [];
    private $position = 0;

    public function __construct($indexMap = [])
    {
        $this->indexMap = $indexMap;
        $this->position = 0;
    }

    /**
     * @return Index The entry at the current position; call valid() first.
     */
    public function current()
    {
        return $this->indexMap[$this->position];
    }

    public function next()
    {
        $this->position++;
    }

    public function prev()
    {
        $this->position--;
    }

    public function key()
    {
        return $this->position;
    }

    public function valid()
    {
        return isset($this->indexMap[$this->position]);
    }

    public function rewind()
    {
        $this->position = 0;
    }
}

// Demo: build a tree of order 5 and run a range query.
$dataList = [
    ['id' => 10, 'name' => 'name_10', 'age' => 28],
    ['id' => 20, 'name' => 'name_20', 'age' => 23],
    ['id' => 30, 'name' => 'name_30', 'age' => 25],
    ['id' => 40, 'name' => 'name_40', 'age' => 24],
    ['id' => 5, 'name' => 'name_5', 'age' => 18],
    ['id' => 15, 'name' => 'name_15', 'age' => 22],
    ['id' => 50, 'name' => 'name_50', 'age' => 21],
    ['id' => 35, 'name' => 'name_35', 'age' => 24],
    ['id' => 60, 'name' => 'name_60', 'age' => 23],
    ['id' => 45, 'name' => 'name_45', 'age' => 23],
    ['id' => 22, 'name' => 'name_22', 'age' => 19],
];

$btree = new BPlusTree(5);
foreach ($dataList as $value) {
    $data = new Data($value, 'id');
    $btree->insert($data);
}

print_r($btree);
var_dump($btree->rangeFind(40, 70));