一起做个简单的数据库(十三):拆分后更新父节点


用C语言从零开始实现SQLite clone系列:
  1. REPL的介绍和设置
  2. 世上最简单的SQL编译器和虚拟机
  3. 一个在内存中仅能做追加操作的单表数据库
  4. 第一次测试 (含bug处理)
  5. 持久化存储
  6. The Cursor Abstraction
  7. B树介绍
  8. B树叶子节点的格式
  9. 二进制查询和重复键
  10. 叶子节点的拆分
  11. B树的递归搜索
  12. 扫描多级B树


我们史诗般的B树创建之旅的下一步是更新拆分后的父节点。我会用下面的示例来做参考:
1.jpg

示例是这样的,我们把键“3”加入树中,这导致了左侧节点的分裂。拆分之后,我们用下列的方法来修复树:
  1. 将父节点的第一个键作为左侧子节点的最大键“3”
  2. 更新完以后,新增子节点的指针和键对
    • 新的指针指向新的子节点
    • - 新的键是子结点中的最大键“5”


首先我们引入两个新函数:

第一步更新update_internal_node_key(),第二步更新internal_node_insert():
@@ -670,9 +725,11 @@ void leaf_node_split_and_insert(Cursor* cursor, uint32_t key, Row* value) {
*/

void* old_node = get_page(cursor->table->pager, cursor->page_num);
+  uint32_t old_max = get_node_max_key(old_node);
uint32_t new_page_num = get_unused_page_num(cursor->table->pager);
void* new_node = get_page(cursor->table->pager, new_page_num);
initialize_leaf_node(new_node);
+  *node_parent(new_node) = *node_parent(old_node);
*leaf_node_next_leaf(new_node) = *leaf_node_next_leaf(old_node);
*leaf_node_next_leaf(old_node) = new_page_num;

@@ -709,8 +766,12 @@ void leaf_node_split_and_insert(Cursor* cursor, uint32_t key, Row* value) {
if (is_node_root(old_node)) {
 return create_new_root(cursor->table, new_page_num);
} else {
-    printf("Need to implement updating parent after split\n");
-    exit(EXIT_FAILURE);
+    uint32_t parent_page_num = *node_parent(old_node);
+    uint32_t new_max = get_node_max_key(old_node);
+    void* parent = get_page(cursor->table->pager, parent_page_num);
+
+    update_internal_node_key(parent, old_max, new_max);
+    internal_node_insert(cursor->table, parent_page_num, new_page_num);
+    return;
}


为了获得对父节点的引用,我们需要每个节点中指向其父节点的指针。
+uint32_t* node_parent(void* node) { return node + PARENT_POINTER_OFFSET; } 

@@ -660,6 +675,48 @@ void create_new_root(Table* table, uint32_t right_child_page_num) {
uint32_t left_child_max_key = get_node_max_key(left_child);
*internal_node_key(root, 0) = left_child_max_key;
*internal_node_right_child(root) = right_child_page_num;
+  *node_parent(left_child) = table->root_page_num;
+  *node_parent(right_child) = table->root_page_num;


现在我们需要在父节点中找到受影响的单元。子节点不知道自己的页码,所以我们找不到。但是它知道自己的最大键,所以我们可以在父结点中找这个键。
+void update_internal_node_key(void* node, uint32_t old_key, uint32_t new_key) {
+  uint32_t old_child_index = internal_node_find_child(node, old_key);
+  *internal_node_key(node, old_child_index) = new_key;


在internal_node_find_child()中我们会复用一些我们在内部节点查找键的代码。重构internal_node_find()以使用新的帮助器。
-Cursor* internal_node_find(Table* table, uint32_t page_num, uint32_t key) {
-  void* node = get_page(table->pager, page_num);
+uint32_t internal_node_find_child(void* node, uint32_t key) {
+  /*
+  Return the index of the child which should contain
+  the given key.
+  */
+
uint32_t num_keys = *internal_node_num_keys(node);

-/* Binary search to find index of child to search */
+  /* Binary search */
uint32_t min_index = 0;
uint32_t max_index = num_keys; /* there is one more child than key */

@@ -386,7 +394,14 @@ Cursor* internal_node_find(Table* table, uint32_t page_num, uint32_t key) {
 }
}

-uint32_t child_num = *internal_node_child(node, min_index);
+  return min_index;
+}
+
+Cursor* internal_node_find(Table* table, uint32_t page_num, uint32_t key) {
+  void* node = get_page(table->pager, page_num);
+
+  uint32_t child_index = internal_node_find_child(node, key);
+  uint32_t child_num = *internal_node_child(node, child_index);
void* child = get_page(table->pager, child_num);
switch (get_node_type(child)) {
 case NODE_LEAF: 

现在我们进入到本文的核心,实施internal_node_insert()。我分段来解释下:
+void internal_node_insert(Table* table, uint32_t parent_page_num,
+                          uint32_t child_page_num) {
+  /*
+  Add a new child/key pair to parent that corresponds to child
+  */
+
+  void* parent = get_page(table->pager, parent_page_num);
+  void* child = get_page(table->pager, child_page_num);
+  uint32_t child_max_key = get_node_max_key(child);
+  uint32_t index = internal_node_find_child(parent, child_max_key);
+
+  uint32_t original_num_keys = *internal_node_num_keys(parent);
+  *internal_node_num_keys(parent) = original_num_keys + 1;
+
+  if (original_num_keys >= INTERNAL_NODE_MAX_CELLS) {
+    printf("Need to implement splitting internal node\n");
+    exit(EXIT_FAILURE);
+  } 

在索引中需要插入新单元格(子项/键值对)的地方,插入取决于子项中最大的键。在示例中我们能看到,child_max_key应该是5而索引应该是1。

如果内部节点中没有其他单元的空间,则抛出错误。我们稍后会实施。

现在让我们看一下其余的功能:
+
+  uint32_t right_child_page_num = *internal_node_right_child(parent);
+  void* right_child = get_page(table->pager, right_child_page_num);
+
+  if (child_max_key > get_node_max_key(right_child)) {
+    /* Replace right child */
+    *internal_node_child(parent, original_num_keys) = right_child_page_num;
+    *internal_node_key(parent, original_num_keys) =
+        get_node_max_key(right_child);
+    *internal_node_right_child(parent) = child_page_num;
+  } else {
+    /* Make room for the new cell */
+    for (uint32_t i = original_num_keys; i > index; i--) {
+      void* destination = internal_node_cell(parent, i);
+      void* source = internal_node_cell(parent, i - 1);
+      memcpy(destination, source, INTERNAL_NODE_CELL_SIZE);
+    }
+    *internal_node_child(parent, index) = child_page_num;
+    *internal_node_key(parent, index) = child_max_key;
+  }
+} 

因为我们将最右边的孩子指针与其余的子项/密钥对分开存储,所以如果新的子项要成为最右边的子项,我们必须以不同的方式处理事情。

在我们的示例中,我们要进入else块(we would get into the else block)。首先,我们通过将其他单元格向右移动一个空间为新单元格腾出空间。 (尽管在我们的示例中,要移动的单元格为0)

接下来,我们依据索引将新的子指针和键写入单元格。

为了减少测试案例所占用的空间,我现在需要硬编码INTERNAL_NODE_MAX_CELLS。
@@ -126,6 +126,8 @@ const uint32_t INTERNAL_NODE_KEY_SIZE = sizeof(uint32_t);
const uint32_t INTERNAL_NODE_CHILD_SIZE = sizeof(uint32_t);
const uint32_t INTERNAL_NODE_CELL_SIZE =
 INTERNAL_NODE_CHILD_SIZE + INTERNAL_NODE_KEY_SIZE;
+/* Keep this small for testing */
+const uint32_t INTERNAL_NODE_MAX_CELLS = 3;

说到测试,我们的大数据集测试通过了旧的存根,并进入了新的存根:
@@ -65,7 +65,7 @@ describe 'database' do
 result = run_script(script)
 expect(result.last(2)).to match_array([
   "db > Executed.",
-      "db > Need to implement updating parent after split",
+      "db > Need to implement splitting internal node",
 ])

一切都很正常。

我现在引入一个新的测试来打印4节点树。为了让我们比顺序的ID测试更多的案例,此测试将以伪随机顺序添加记录。
+  it 'allows printing out the structure of a 4-leaf-node btree' do
+    script = [
+      "insert 18 user18 person18@example.com",
+      "insert 7 user7 person7@example.com",
+      "insert 10 user10 person10@example.com",
+      "insert 29 user29 person29@example.com",
+      "insert 23 user23 person23@example.com",
+      "insert 4 user4 person4@example.com",
+      "insert 14 user14 person14@example.com",
+      "insert 30 user30 person30@example.com",
+      "insert 15 user15 person15@example.com",
+      "insert 26 user26 person26@example.com",
+      "insert 22 user22 person22@example.com",
+      "insert 19 user19 person19@example.com",
+      "insert 2 user2 person2@example.com",
+      "insert 1 user1 person1@example.com",
+      "insert 21 user21 person21@example.com",
+      "insert 11 user11 person11@example.com",
+      "insert 6 user6 person6@example.com",
+      "insert 20 user20 person20@example.com",
+      "insert 5 user5 person5@example.com",
+      "insert 8 user8 person8@example.com",
+      "insert 9 user9 person9@example.com",
+      "insert 3 user3 person3@example.com",
+      "insert 12 user12 person12@example.com",
+      "insert 27 user27 person27@example.com",
+      "insert 17 user17 person17@example.com",
+      "insert 16 user16 person16@example.com",
+      "insert 13 user13 person13@example.com",
+      "insert 24 user24 person24@example.com",
+      "insert 25 user25 person25@example.com",
+      "insert 28 user28 person28@example.com",
+      ".btree",
+      ".exit",
+    ]
+    result = run_script(script)

输出是这样的:
- internal (size 3)
- leaf (size 7)
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- key 1
- leaf (size 8)
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- key 15
- leaf (size 7)
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- key 22
- leaf (size 8)
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
db >

仔细看,你会看到个错误:
- 5
- 6
- 7
- key 1

键应该是7而不是1!

经过一些列的排错,我发现这个错误来自于一些错误的指针算法。
uint32_t* internal_node_key(void* node, uint32_t key_num) {
-  return internal_node_cell(node, key_num) + INTERNAL_NODE_CHILD_SIZE;
+  return (void*)internal_node_cell(node, key_num) + INTERNAL_NODE_CHILD_SIZE;


INTERNAL_NODE_CHILD_SIZE是4,我的希望是为internal_node_cell()的结果添加4个字节,但是internal_node_cell()返回了uint32_t*,这实际上是添加了4 * sizeof(uint32_t)个字节。在进行算术运算之前,我通过将其强制转换为void *来对其进行了修复。

请注意,void指针的算法不是C语言的标准,你的编译器也许没办法编译。之后我也许会写另外的文章详细解释。但是现在我们就结束在void指针算法。

好的。向可操作的B树的实现又迈出了一步。下一步应该是分割内部节点。

原文链接:Part 13 - Updating Parent Node After a Split(翻译:吴世曦)

0 个评论

要回复文章请先登录注册