blob: 45386fe7623db551e6725a10b3b705872e1c67d7 [file] [log] [blame] [edit]
<
/*
* Copyright (C) 2015 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "block_tree.h"
#include <assert.h>
#include <inttypes.h>
#include <limits.h>
#include <lk/compiler.h>
#include <lk/macros.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "array.h"
#include "block_allocator.h"
#include "block_cache.h"
#include "crypt.h"
#include "debug.h"
#include "transaction.h"
#if BUILD_STORAGE_TEST
#include <math.h>
#endif
/*
 * Debug output switches. They are file-scope objects without an initializer,
 * so they start out false.
 *
 * print_lookup is the only one with external linkage. In the part of the file
 * reviewed here, print_node_changes enables the step-by-step trace in
 * block_tree_node_shift() and print_check_errors enables dumping the offending
 * node/tree when a consistency check fails. The remaining flags are not
 * referenced in this part of the file.
 */
bool print_lookup;
static bool print_ops;
static bool print_node_changes;
static bool print_internal_changes;
static bool print_changes;
static bool print_changes_full_tree;
static bool print_check_errors;
/**
 * struct block_tree_node - On-disk B+ tree node header and payload start
 * @iv:         initial value used for encrypt/decrypt
 * @is_leaf:    0 for internal nodes, 1 for leaf nodes.
 * @data:       Key, child and data payload, size of entries is tree specific
 *              so accessor functions are needed to get to specific entries.
 *
 * On-disk B+ tree node format.
 *
 * If %is_leaf is 0, the node is an internal B+ tree node and @data contains n
 * keys and n + 1 child block-macs. If %is_leaf is 1, the node is a leaf node
 * and @data contains n keys and n data entries. For either node type
 * n <= block_tree_max_key_count. For root leaf nodes n >= 0, for root internal
 * nodes n >= 1, otherwise n >= block_tree_min_key_count. While updating the
 * tree, n can temporarily exceed these limits by 1 for a single node. There is
 * no room in this struct to store extra keys, so the extra key and child/data
 * is stored in the in-memory tree struct.
 *
 * The value of block_tree_max_key_count and block_tree_min_key_count depends
 * on the block, key, child and data sizes for the tree, and may not be the
 * same for leaf and internal nodes.
 *
 * Keys are stored starting at @data, child and data values are stored starting
 * at data + block_tree_max_key_count * key-size.
 *
 * TODO: store n in this struct? Currently a 0 key value is used to determine
 * how full a node is, which prevents storing 0 keys in the tree.
 */
struct block_tree_node {
struct iv iv;
uint64_t is_leaf;
/* Zero-length trailing array: the payload occupies the rest of the block. */
uint8_t data[0];
};
/**
 * block_tree_node_is_leaf - Tell leaf nodes and internal nodes apart
 * @node_ro:    Node pointer.
 *
 * Return: %true when the header of @node_ro marks it as a leaf node, %false
 * when it marks it as an internal node.
 */
static bool block_tree_node_is_leaf(const struct block_tree_node* node_ro) {
    assert(node_ro);
    /* 0 (internal) and 1 (leaf) are the only valid header values. */
    assert(node_ro->is_leaf <= 1);

    return node_ro->is_leaf != 0;
}
/**
 * block_tree_set_sizes - Record block/entry sizes and derive key counts
 * @tree:           Tree object.
 * @block_size:     Block size.
 * @key_size:       Key size. [1-8].
 * @child_size:     Child size. [@key_size-24].
 * @data_size:      Data size. [1-24].
 *
 * Stores the sizes in @tree and computes how many keys fit in an internal
 * node (index 0) and in a leaf node (index 1). Both node types must have room
 * for at least two keys.
 */
static void block_tree_set_sizes(struct block_tree* tree,
                                 size_t block_size,
                                 size_t key_size,
                                 size_t child_size,
                                 size_t data_size) {
    const size_t header_size = sizeof(struct block_tree_node);
    const size_t payload_size = block_size - header_size;

    /* Fails if the subtraction above wrapped around. */
    assert(payload_size < block_size);

    assert(key_size);
    assert(key_size <= sizeof(data_block_t));

    assert(child_size >= key_size);
    assert(child_size <= sizeof(struct block_mac));

    assert(data_size);
    assert(data_size <= sizeof(struct block_mac));

    tree->key_size = key_size;
    tree->child_data_size[0] = child_size;
    tree->child_data_size[1] = data_size;

    /* Internal nodes hold one more child than keys, so reserve one child. */
    tree->key_count[0] = (payload_size - child_size) / (key_size + child_size);
    /* TODO: allow next pointer when mac is not needed? */
    tree->key_count[1] = payload_size / (key_size + data_size);

    tree->block_size = block_size;

    assert(tree->key_count[0] >= 2);
    assert(tree->key_count[1] >= 2);
}
/**
 * is_zero - Helper function to check that buffer only contain 0 bytes
 * @data:       Data pointer.
 * @size:       Number of bytes to check.
 *
 * Return: %true if @size is 0 or @data[0..@size - 1] is all 0, %false
 * otherwise.
 */
static bool is_zero(const void* data, size_t size) {
    /*
     * Use a byte pointer so the arithmetic below is standard C (arithmetic on
     * void pointers is a compiler extension) and const is preserved.
     */
    const unsigned char* bytes = data;

    if (!size) {
        return true;
    }
    /*
     * If the first byte is 0 and every byte equals its successor, then all
     * bytes are 0. The memcmp compares the buffer against itself shifted by
     * one byte.
     */
    return !bytes[0] && !memcmp(bytes, bytes + 1, size - 1);
}
/**
 * block_tree_max_key_count - Maximum key count for one node type
 * @tree:       Tree object.
 * @leaf:       %true for leaf node, %false for internal node.
 *
 * Return: Number of keys that fit in a leaf node (@leaf is %true) or in an
 * internal node (@leaf is %false) of @tree.
 */
static unsigned int block_tree_max_key_count(const struct block_tree* tree,
                                             bool leaf) {
    const unsigned int max_keys = tree->key_count[leaf];

    /* Sanity check the value set up by block_tree_set_sizes. */
    assert(max_keys);
    assert(max_keys * tree->key_size < tree->block_size);

    return max_keys;
}
/**
 * block_tree_node_max_key_count - Maximum key count for a given node
 * @tree:       Tree object.
 * @node_ro:    Node pointer.
 *
 * Return: Maximum number of keys that fit in @node_ro, based on whether it is
 * a leaf or an internal node.
 */
static unsigned int block_tree_node_max_key_count(
        const struct block_tree* tree,
        const struct block_tree_node* node_ro) {
    const bool node_is_leaf = block_tree_node_is_leaf(node_ro);

    return block_tree_max_key_count(tree, node_is_leaf);
}
/**
 * Child selection for block_tree_node_shift
 * @SHIFT_LEAF_OR_LEFT_CHILD:   Required value for leaf nodes, selects left
 *                              child for internal nodes.
 * @SHIFT_RIGHT_CHILD:          Select right child.
 * @SHIFT_LEFT_CHILD:           Select left child (for src_index).
 * @SHIFT_BOTH:                 Select left child at start and right child at
 *                              end. Only valid when inserting data at the end
 *                              of a node.
 *
 * These are bit flags: %SHIFT_BOTH is the bitwise OR of %SHIFT_RIGHT_CHILD and
 * %SHIFT_LEFT_CHILD, and block_tree_node_shift tests the individual bits.
 */
#define SHIFT_LEAF_OR_LEFT_CHILD 0U
#define SHIFT_RIGHT_CHILD (1U << 0)
#define SHIFT_LEFT_CHILD (1U << 1)
#define SHIFT_BOTH (SHIFT_RIGHT_CHILD | SHIFT_LEFT_CHILD)
/**
 * block_tree_node_shift - Helper function to insert or remove entries in a node
 * @tree:           Tree object.
 * @node_rw:        Node pointer.
 * @dest_index:     Destination index for existing entries to shift.
 * @src_index:      Source index for existing entries to shift.
 * @shift_mode:     Specifies which child to shift for internal nodes.
 * @new_key:        When shifting up (inserting), keys to insert.
 * @new_data:       When shifting up (inserting), data (or child) to insert.
 * @overflow_key:   When shifting up (inserting), location to store keys that
 *                  was shifted out of range. Can be %NULL if all those keys are
 *                  0.
 * @overflow_data:  When shifting up (inserting), location to store data (or
 *                  child) that was shifted out of range. Can be %NULL if that
 *                  data is all 0.
 *
 * The same sequence of steps is run twice: once over the key array and once
 * over the data/child array. @src_index < @dest_index means entries are being
 * inserted (shift up); otherwise entries are being removed (shift down) and
 * the space freed at the end of the array is zeroed.
 */
static void block_tree_node_shift(const struct block_tree* tree,
struct block_tree_node* node_rw,
unsigned int dest_index,
unsigned int src_index,
uint32_t shift_mode,
const void* new_key,
const void* new_data,
void* overflow_key,
void* overflow_data) {
bool is_leaf = block_tree_node_is_leaf(node_rw);
unsigned int max_count = tree->key_count[is_leaf];
int i;
void* base;
size_t entry_size;
const void* src;
void* dest;
size_t size;
unsigned int clear_index;
assert(max_count);
assert(dest_index <= max_count + !is_leaf + 1);
/* Pass 0 handles the key array, pass 1 the data/child array. */
for (i = 0; i < 2; i++) {
if (i == 0) {
/* key */
base = node_rw->data;
entry_size = tree->key_size;
} else {
/* data/child */
base = node_rw->data + tree->key_size * max_count;
entry_size = tree->child_data_size[is_leaf];
if (!is_leaf) {
/* internal nodes have one more child than keys */
max_count++;
}
if (shift_mode & SHIFT_RIGHT_CHILD) {
assert(!is_leaf);
/*
 * The right child of key n is child n + 1, so move the indices up by
 * one. src_index is left alone when the left child was also selected
 * (SHIFT_BOTH) or when it is the UINT_MAX "clear to end" marker.
 */
if (!(shift_mode & SHIFT_LEFT_CHILD) && src_index != UINT_MAX) {
src_index++;
}
dest_index++;
}
}
if (src_index < dest_index) {
/* Inserting, copy entries that will be overwritten to overflow_* */
size = (dest_index - src_index) * entry_size;
if (src_index == max_count) {
/* Inserting past the end: the new entries themselves overflow. */
src = i == 0 ? new_key : new_data;
} else {
/* Otherwise the last @size bytes of the array fall off the end. */
src = base + max_count * entry_size - size;
}
dest = i == 0 ? overflow_key : overflow_data;
if (dest) {
if (print_node_changes) {
#if TLOG_LVL >= TLOG_LVL_DEBUG
printf("%s: copy %p, index %d/%d, to overflow, %p, size %zd, is_zero %d\n",
__func__, src, max_count - (dest_index - src_index),
max_count, dest, size, is_zero(src, size));
#else
printf("%s: copy src index %d/%d, to overflow dest, size %zd, is_zero %d\n",
__func__, max_count - (dest_index - src_index),
max_count, size, is_zero(src, size));
#endif
}
memcpy(dest, src, size);
} else {
/* No overflow buffer: the caller promises nothing non-zero is lost. */
assert(is_zero(src, size));
}
}
if (src_index < max_count) {
/* Inserting or deleting, shift up or down */
src = base + src_index * entry_size;
dest = base + dest_index * entry_size;
size = (max_count - MAX(src_index, dest_index)) * entry_size;
if (print_node_changes) {
#if TLOG_LVL >= TLOG_LVL_DEBUG
printf("%s: move %p, index %d, to %p, index %d, size %zd, is_zero %d\n",
__func__, src, src_index, dest, dest_index, size,
is_zero(src, size));
#else
printf("%s: move src index %d, to dest index %d, size %zd, is_zero %d\n",
__func__, src_index, dest_index, size,
is_zero(src, size));
#endif
}
/* Source and destination ranges can overlap, hence memmove. */
memmove(dest, src, size);
if (src_index >= dest_index) {
/* Deleting: everything after the moved entries must be cleared. */
clear_index = max_count + dest_index - src_index;
}
} else {
/* Nothing to move (src_index is at or past the end), clear from dest. */
clear_index = dest_index;
}
if (src_index < dest_index) {
/* Inserting, copy new entries passed in */
assert(new_key);
assert(new_data);
src = i == 0 ? new_key : new_data;
dest = base + src_index * entry_size;
size = (dest_index - src_index) * entry_size;
if (src_index == max_count) {
/* NOP, data was copied to overflow_* above */
} else {
assert(src);
if (print_node_changes) {
#if TLOG_LVL >= TLOG_LVL_DEBUG
printf("%s: copy new data %p, to %p, index %d, size %zd, is_zero %d\n",
__func__, src, dest, src_index, size,
is_zero(src, size));
#else
printf("%s: copy new data, index %d, size %zd, is_zero %d\n",
__func__, src_index, size,
is_zero(src, size));
#endif
}
memcpy(dest, src, size);
}
} else {
/* Deleting, clear unused space at end */
assert(dest_index <= max_count);
dest = base + clear_index * entry_size;
size = (max_count - clear_index) * entry_size;
if (print_node_changes) {
#if TLOG_LVL >= TLOG_LVL_DEBUG
printf("%s: clear %p, index %d/%d, size %zd\n", __func__, dest,
clear_index, max_count, size);
#else
printf("%s: clear dest, index %d/%d, size %zd\n", __func__,
clear_index, max_count, size);
#endif
}
memset(dest, 0, size);
}
}
}
/**
 * block_tree_node_merge_entries - Copy entries from one node into another
 * @tree:           Tree object.
 * @node_rw:        Destination node.
 * @src_node_ro:    Source node.
 * @dest_index:     Index to insert at in @node_rw.
 * @count:          Number of entries to copy from start of @src_node_ro.
 * @merge_key:      For internal nodes, key to insert between nodes.
 *
 * For internal nodes @merge_key is first written to the (empty) key slot at
 * @dest_index, and the entries from @src_node_ro are placed after it.
 */
static void block_tree_node_merge_entries(
        const struct block_tree* tree,
        struct block_tree_node* node_rw,
        const struct block_tree_node* src_node_ro,
        unsigned int dest_index,
        unsigned int count,
        const void* merge_key) {
    const bool is_leaf = block_tree_node_is_leaf(node_rw);
    const unsigned int max_count = tree->key_count[is_leaf];
    uint32_t shift_mode;

    if (is_leaf) {
        shift_mode = SHIFT_LEAF_OR_LEFT_CHILD;
    } else {
        void* separator_slot = node_rw->data + tree->key_size * dest_index;

        /* The slot receiving the separator key must currently be unused. */
        assert(is_zero(separator_slot, tree->key_size));
        memcpy(separator_slot, merge_key, tree->key_size);

        dest_index++;
        shift_mode = SHIFT_BOTH;
    }

    /* Source keys start at data; source data/children follow the key area. */
    block_tree_node_shift(tree, node_rw, dest_index + count, dest_index,
                          shift_mode, src_node_ro->data,
                          src_node_ro->data + tree->key_size * max_count, NULL,
                          NULL);
}
/**
 * block_tree_node_shift_down - Remove a range of entries from a node
 * @tree:           Tree object.
 * @node_rw:        Node.
 * @dest_index:     Destination index for existing entries to shift (or
 *                  first entry to remove).
 * @src_index:      Source index for existing entries to shift (or first entry
 *                  after @dest_index not to remove).
 * @shift_mode:     Specifies which child to shift for internal nodes.
 *
 * Thin wrapper around block_tree_node_shift for the delete direction, where
 * no new entries and no overflow buffers are involved.
 */
static void block_tree_node_shift_down(const struct block_tree* tree,
                                       struct block_tree_node* node_rw,
                                       unsigned int dest_index,
                                       unsigned int src_index,
                                       uint32_t shift_mode) {
    const void* const no_new_entries = NULL;
    void* const no_overflow = NULL;

    /* Shifting down requires the source to be after the destination. */
    assert(dest_index < src_index);

    block_tree_node_shift(tree, node_rw, dest_index, src_index, shift_mode,
                          no_new_entries, no_new_entries, no_overflow,
                          no_overflow);
}
/**
 * block_tree_node_clear_end - Helper function to delete entries from end of
 *                             node
 * @tree:           Tree object.
 * @node_rw:        Node.
 * @start_index:    Index of first entry to remove. For internal nodes the
 *                  right child is removed, so @start_index refers to the
 *                  key index, not the child index.
 */
static void block_tree_node_clear_end(const struct block_tree* tree,
                                      struct block_tree_node* node_rw,
                                      unsigned int start_index) {
    uint32_t shift_mode = SHIFT_RIGHT_CHILD;

    if (block_tree_node_is_leaf(node_rw)) {
        shift_mode = SHIFT_LEAF_OR_LEFT_CHILD;
    }

    /* UINT_MAX as source index: nothing is moved, the tail is only cleared. */
    block_tree_node_shift_down(tree, node_rw, start_index, UINT_MAX,
                               shift_mode);
}
/**
 * block_tree_node_insert - Insert a single entry in a node
 * @tree:           Tree object.
 * @node_rw:        Node.
 * @index:          Index to insert at.
 * @shift_mode:     Specifies which child to insert for internal nodes.
 * @new_key:        Key to insert.
 * @new_data:       Data or child to insert.
 * @overflow_key:   Location to store key that was shifted out of range. Can be
 *                  %NULL if that key is always 0.
 * @overflow_data:  Location to store data (or child) that was shifted out of
 *                  range. Can be %NULL if that data is all 0.
 */
static void block_tree_node_insert(const struct block_tree* tree,
                                   struct block_tree_node* node_rw,
                                   unsigned int index,
                                   uint32_t shift_mode,
                                   const void* new_key,
                                   const void* new_data,
                                   void* overflow_key,
                                   void* overflow_data) {
    /* Existing entries at @index and above move up by exactly one slot. */
    const unsigned int shifted_index = index + 1;

    block_tree_node_shift(tree, node_rw, shifted_index, index, shift_mode,
                          new_key, new_data, overflow_key, overflow_data);
}
/**
 * block_tree_node_get_key - Read a key from a node or the pending insert
 * @tree:       Tree object.
 * @node_block: Block number where @node_ro data is stored.
 * @node_ro:    Node.
 * @index:      Index of key to get.
 *
 * Return: key at @index, or 0 if there is no key at @index. If @index is
 * equal to max key count, check for a matching entry in @tree->inserting.
 */
static data_block_t block_tree_node_get_key(
        const struct block_tree* tree,
        data_block_t node_block,
        const struct block_tree_node* node_ro,
        unsigned int index) {
    const size_t max_keys = block_tree_node_max_key_count(tree, node_ro);
    const size_t key_size = tree->key_size;
    data_block_t key = 0;

    assert(node_ro);

    if (index < max_keys) {
        const void* stored_key = node_ro->data + index * tree->key_size;

        /* Keys can be narrower than data_block_t; upper bytes stay 0. */
        assert(sizeof(key) >= key_size);
        memcpy(&key, stored_key, key_size);
    }

    if (key || node_block != tree->inserting.block) {
        return key;
    }

    /* No key stored in the node; fall back to the in-memory extra entry. */
    assert(index >= max_keys);
    if (index <= max_keys) {
        key = tree->inserting.key;
    }
    return key;
}
/**
 * block_tree_node_set_key - Write a key into a node
 * @tree:       Tree object.
 * @node_rw:    Node.
 * @index:      Index of key to set.
 * @new_key:    Key value to write.
 *
 * Only the first @tree->key_size bytes of @new_key are stored.
 */
static void block_tree_node_set_key(const struct block_tree* tree,
                                    struct block_tree_node* node_rw,
                                    unsigned int index,
                                    data_block_t new_key) {
    const size_t key_size = tree->key_size;
    const size_t max_keys = block_tree_node_max_key_count(tree, node_rw);
    uint8_t* key_slot;

    assert(node_rw);
    assert(index < max_keys);
    assert(key_size <= sizeof(new_key));

    key_slot = node_rw->data + index * tree->key_size;
    /* TODO: support big-endian host */
    memcpy(key_slot, &new_key, key_size);
}
/**
 * block_tree_node_get_child_data - Locate a child or data entry in a node
 * @tree:       Tree object.
 * @node_ro:    Node.
 * @index:      Index of child or data entry to get.
 *
 * Return: pointer to child or data entry at index @index in @node_ro.
 */
static const void* block_tree_node_get_child_data(
        const struct block_tree* tree,
        const struct block_tree_node* node_ro,
        unsigned int index) {
    const bool is_leaf = block_tree_node_is_leaf(node_ro);
    const size_t max_key_count = tree->key_count[is_leaf];
    const size_t entry_size = tree->child_data_size[is_leaf];
    const uint8_t* entries;
    const void* entry;

    /* Internal nodes have one more child than keys. */
    assert(index < max_key_count + !is_leaf);

    /* Child/data entries are stored directly after the key area. */
    entries = node_ro->data + tree->key_size * max_key_count;
    entry = entries + entry_size * index;

    assert(entry > (void*)node_ro->data);
    assert(entry < (void*)node_ro + tree->block_size);

    return entry;
}
/**
 * block_tree_node_get_child_data_rw - Writable variant of
 *                                     block_tree_node_get_child_data
 * @tree:       Tree object.
 * @node_rw:    Node.
 * @index:      Index of child or data entry to get.
 *
 * Return: pointer to child or data entry at index @index in @node_rw.
 */
static void* block_tree_node_get_child_data_rw(const struct block_tree* tree,
                                               struct block_tree_node* node_rw,
                                               int index) {
    const void* entry = block_tree_node_get_child_data(tree, node_rw, index);

    /* @node_rw is writable, so dropping const from the result is fine. */
    return (void*)entry;
}
/**
 * block_tree_node_get_child - Get child from node or in-memory pending update
 * @tr:         Transaction object.
 * @tree:       Tree object.
 * @node_block: Block number where @node_ro data is stored.
 * @node_ro:    Internal node.
 * @index:      Index of child to get.
 *
 * Return: child at @index, or %NULL if there is no child at @index. If @index
 * is equal to max child count, check for a matching entry in @tree->inserting.
 */
static const struct block_mac* block_tree_node_get_child(
        const struct transaction* tr,
        const struct block_tree* tree,
        data_block_t node_block,
        const struct block_tree_node* node_ro,
        unsigned int index) {
    const size_t max_keys = block_tree_node_max_key_count(tree, node_ro);
    const struct block_mac* child = NULL;

    assert(!block_tree_node_is_leaf(node_ro));

    if (index <= max_keys) {
        const struct block_mac* stored =
                block_tree_node_get_child_data(tree, node_ro, index);

        /* A block number of 0 marks an unused child slot. */
        if (block_mac_to_block(tr, stored)) {
            child = stored;
        }
    }

    if (child || node_block != tree->inserting.block) {
        return child;
    }

    /* No child stored in the node; fall back to the in-memory extra entry. */
    assert(index > max_keys);
    if (index <= max_keys + 1) {
        child = &tree->inserting.child;
    }
    return child;
}
/**
 * block_tree_node_get_data - Get data from node or in-memory pending update
 * @tr:         Transaction object.
 * @tree:       Tree object.
 * @node_block: Block number where @node_ro data is stored.
 * @node_ro:    Leaf node.
 * @index:      Index of data to get.
 *
 * Return: data at @index, or the initial (empty) block-mac value if there is
 * no data at @index. If @index is equal to max key count, check for a matching
 * entry in @tree->inserting.
 */
static struct block_mac block_tree_node_get_data(
        const struct transaction* tr,
        const struct block_tree* tree,
        data_block_t node_block,
        const struct block_tree_node* node_ro,
        unsigned int index) {
    struct block_mac result = BLOCK_MAC_INITIAL_VALUE(result);
    const size_t max_keys = block_tree_node_max_key_count(tree, node_ro);
    const struct block_mac* found = NULL;

    assert(block_tree_node_is_leaf(node_ro));

    if (index < max_keys) {
        const struct block_mac* stored =
                block_tree_node_get_child_data(tree, node_ro, index);

        /* A block number of 0 marks an unused data slot. */
        if (block_mac_to_block(tr, stored)) {
            found = stored;
        }
    }

    if (!found && node_block == tree->inserting.block) {
        /* Fall back to the in-memory extra entry. */
        assert(index >= max_keys);
        if (index <= max_keys) {
            found = &tree->inserting.data;
        }
    }

    if (found) {
        block_mac_copy(tr, &result, found);
    }
    return result;
}
/*
 * Iterate over the children of internal node @node_ro. @i starts at 0 and is
 * incremented each round; @child is assigned the result of
 * block_tree_node_get_child() for index @i and the loop stops at the first
 * index for which that result is NULL. @block is evaluated on every iteration.
 */
#define block_tree_node_for_each_child(tr, tree, block, node_ro, child, i) \
for (i = 0; \
(child = block_tree_node_get_child(tr, tree, block, node_ro, i)); \
i++)
/**
 * block_tree_node_print_internal - Print the children and keys of an
 *                                  internal node
 * @tr:         Transaction object.
 * @tree:       Tree object.
 * @node_block: Block number where @node_ro data is stored.
 * @node_ro:    Node.
 *
 * Prints one line: child block numbers (or "." for unused slots) interleaved
 * with the keys, including a pending in-memory entry marked "inserting".
 */
static void block_tree_node_print_internal(
        const struct transaction* tr,
        const struct block_tree* tree,
        data_block_t node_block,
        const struct block_tree_node* node_ro) {
    const size_t key_count = block_tree_node_max_key_count(tree, node_ro);
    const struct block_mac* child;
    data_block_t key;
    unsigned int i;

    assert(!block_tree_node_is_leaf(node_ro));

    /* One slot past the on-disk child count covers the pending insert. */
    for (i = 0; i <= key_count + 1; i++) {
        child = block_tree_node_get_child(tr, tree, node_block, node_ro, i);
        if (child) {
            printf(" %" PRIu64 "", block_mac_to_block(tr, child));
        } else if (i < key_count + 1) {
            printf(" .");
        }

        key = block_tree_node_get_key(tree, node_block, node_ro, i);
        if (!key) {
            continue;
        }
        if (i == key_count) {
            printf(" inserting");
        }
        printf(" [%" PRIu64 "-]", key);
    }

    /* There must not be any child beyond the range printed above. */
    assert(!block_tree_node_get_child(tr, tree, node_block, node_ro, i));
    printf("\n");
}
/**
 * block_tree_node_print_leaf - Print the entries of a leaf node
 * @tr:         Transaction object.
 * @tree:       Tree object.
 * @node_block: Block number where @node_ro data is stored.
 * @node_ro:    Node.
 *
 * Prints one line with "[key: block]" for each used entry and "." for each
 * unused slot. A pending in-memory entry is prefixed with "inserting".
 */
static void block_tree_node_print_leaf(const struct transaction* tr,
                                       const struct block_tree* tree,
                                       data_block_t node_block,
                                       const struct block_tree_node* node_ro) {
    const size_t key_count = block_tree_node_max_key_count(tree, node_ro);
    struct block_mac data;
    data_block_t key;
    unsigned int i;

    assert(block_tree_node_is_leaf(node_ro));

    /* Index key_count is the slot for the pending in-memory entry. */
    for (i = 0; i <= key_count; i++) {
        key = block_tree_node_get_key(tree, node_block, node_ro, i);
        data = block_tree_node_get_data(tr, tree, node_block, node_ro, i);

        if (!key && !block_mac_valid(tr, &data)) {
            if (i < key_count) {
                printf(" .");
            }
            continue;
        }

        if (i == key_count) {
            printf(" inserting");
        }
        printf(" [%" PRIu64 ": %" PRIu64 "]", key,
               block_mac_to_block(tr, &data));
    }
    printf("\n");
}
/**
 * block_tree_node_print - Print a node of either type
 * @tr:         Transaction object.
 * @tree:       Tree object.
 * @node_block: Block number where @node_ro data is stored.
 * @node_ro:    Node.
 *
 * Prints the block number followed by the node contents. A header value other
 * than 0 or 1 is reported as a bad node header.
 */
static void block_tree_node_print(const struct transaction* tr,
                                  const struct block_tree* tree,
                                  data_block_t node_block,
                                  const struct block_tree_node* node_ro) {
    printf(" %3" PRIu64 ":", node_block);

    switch (node_ro->is_leaf) {
    case 1:
        block_tree_node_print_leaf(tr, tree, node_block, node_ro);
        break;
    case 0:
        block_tree_node_print_internal(tr, tree, node_block, node_ro);
        break;
    default:
        printf(" bad node header %" PRIx64 "\n", node_ro->is_leaf);
        break;
    }
}
/**
* block_tree_print_sub_tree - Print tree or a branch of a tree
* @tr: Transaction object.
* @tree: Tree object.
* @block_mac: Root of tree or branch to print.
*/
static void block_tree_print_sub_tree(struct transaction* tr,
const struct block_tree* tree,
const struct block_mac* block_mac) {
int i;
const struct block_tree_node* node_ro;
struct obj_ref node_ref = OBJ_REF_INITIAL_VALUE(node_ref);
const struct block_mac* child;
if (!block_mac || !block_mac_valid(tr, block_mac)) {
printf("empty\n");
return;
}
node_ro = block_get(tr, block_mac, NULL, &node_ref);
if (!node_ro) {
printf(" %3" PRIu64 ": unreadable\n",
block_mac_to_block(tr, block_mac));
return;
}
block_tree_node_print(tr, tree, block_mac_to_block(tr, block_mac), node_ro);
if (!node_ro->is_leaf) {
block_tree_node_for_each_child(tr, tree,
block_mac_to_block(tr, block_mac),
node_ro, child, i) {
block_tree_print_sub_tree(tr, tree, child);
}
}
block_put(node_ro, &node_ref);
}
/**
 * block_tree_print - Print the whole tree, starting at its root
 * @tr:         Transaction object.
 * @tree:       Tree object.
 */
void block_tree_print(struct transaction* tr, const struct block_tree* tree) {
    const struct block_mac* root = &tree->root;

    block_tree_print_sub_tree(tr, tree, root);
}
/**
 * block_tree_node_check - Check node for errors
 * @tr:         Transaction object.
 * @tree:       Tree object.
 * @node_ro:    Node.
 * @node_block: Block number where @node_ro data is stored.
 * @min_key:    Minimum allowed key value.
 * @max_key:    Maximum allowed key value.
 *
 * Checks that the node header is valid, that used keys are in non-decreasing
 * order and within [@min_key, @max_key], and that once an empty (0) key is
 * seen, all following keys and their data/right-child entries are zero.
 *
 * Return: %false if an error is detected, %true otherwise.
 */
static bool block_tree_node_check(const struct transaction* tr,
                                  const struct block_tree* tree,
                                  const struct block_tree_node* node_ro,
                                  data_block_t node_block,
                                  data_block_t min_key,
                                  data_block_t max_key) {
    unsigned int i;
    data_block_t key;
    data_block_t prev_key = 0;
    int empty_count;
    const void* child_data;
    size_t key_count;
    bool is_leaf;

    /*
     * Validate the header before calling any helper that asserts on it.
     * block_tree_node_max_key_count asserts that is_leaf is 0 or 1, so it must
     * not run until a corrupt header has been reported through the error path.
     */
    if (node_ro->is_leaf && node_ro->is_leaf != true) {
        printf("%s: bad node header %" PRIx64 "\n", __func__, node_ro->is_leaf);
        goto err;
    }
    is_leaf = block_tree_node_is_leaf(node_ro);
    key_count = block_tree_node_max_key_count(tree, node_ro);

    empty_count = 0;
    for (i = 0; i < key_count; i++) {
        key = block_tree_node_get_key(tree, node_block, node_ro, i);
        if (!key) {
            empty_count++;
        }
        if (empty_count) {
            /* Past the first empty slot everything must be unused. */
            if (key) {
                printf("%s: %" PRIu64
                       ": expected zero key at %d, found %" PRIu64 "\n",
                       __func__, node_block, i, key);
                goto err;
            }
            /* For internal nodes, key i pairs with its right child i + 1. */
            child_data =
                    block_tree_node_get_child_data(tree, node_ro, i + !is_leaf);
            if (!is_zero(child_data, tree->child_data_size[is_leaf])) {
                printf("%s: %" PRIu64
                       ": expected zero data/right child value at %d\n",
                       __func__, node_block, i);
                goto err;
            }
            continue;
        }
        if (key < prev_key || key < min_key || key > max_key) {
            printf("%s: %" PRIu64 ": bad key at %d, %" PRIu64
                   " not in [%" PRIu64 "/%" PRIu64 "-%" PRIu64 "]\n",
                   __func__, node_block, i, key, min_key, prev_key, max_key);
            /*
             * After a failed transaction a key outside [min_key, max_key] is
             * tolerated as long as keys within the node are still ordered.
             */
            if (tr->failed && key >= prev_key) {
                printf("%s: transaction failed, ignore\n", __func__);
            } else {
                goto err;
            }
        }
        prev_key = key;
    }
    return true;

err:
    if (print_check_errors) {
        block_tree_node_print(tr, tree, node_block, node_ro);
    }
    return false;
}
/**
 * block_tree_check_sub_tree - Check tree or a branch of a tree for errors
 * @tr:         Transaction object.
 * @tree:       Tree object.
 * @block_mac:  Root of tree or branch to check.
 * @is_root:    %true if @block_mac refers to the root of the tree, %false
 *              otherwise.
 * @min_key:    Minimum allowed key value.
 * @max_key:    Maximum allowed key value.
 * @updating:   %true if @tree is currently updating and nodes below
 *              min-full should be allowed, %false otherwise.
 *
 * Return: Depth of tree/branch (0 if @block_mac is %NULL or refers to block
 * 0), -1 if an error was detected or -2 if @block_mac could not be read.
 *
 * TODO: Reduce overlap with and move more checks to block_tree_node_check.
 */
static int block_tree_check_sub_tree(struct transaction* tr,
const struct block_tree* tree,
const struct block_mac* block_mac,
bool is_root,
data_block_t min_key,
data_block_t max_key,
bool updating) {
const struct block_tree_node* node_ro;
unsigned int i;
int last_child = 0;
int empty_count;
int depth;
int sub_tree_depth;
data_block_t child_min_key;
data_block_t child_max_key;
data_block_t key;
int max_empty_count;
size_t key_count;
const void* child_data;
struct block_mac child_block_mac;
struct obj_ref ref = OBJ_REF_INITIAL_VALUE(ref);
bool is_leaf;
/* An absent subtree has depth 0. */
if (!block_mac || !block_mac_to_block(tr, block_mac))
return 0;
depth = 1;
/* Reject block numbers beyond the end of the device. */
if (block_mac_to_block(tr, block_mac) >= tr->fs->dev->block_count) {
printf("%s: %3" PRIu64 ": invalid\n", __func__,
block_mac_to_block(tr, block_mac));
return -1;
}
node_ro = block_get_no_tr_fail(tr, block_mac, NULL, &ref);
if (!node_ro) {
if (tr->failed) {
/*
* Failed transactions discards dirty cache entries so parts of the
* tree may now be unreadable.
*/
pr_warn("ignore unreadable block %" PRIu64 ", transaction failed\n",
block_mac_to_block(tr, block_mac));
return -2;
}
pr_warn("%3" PRIu64 ": unreadable\n",
block_mac_to_block(tr, block_mac));
return -2;
}
if (!block_tree_node_check(tr, tree, node_ro,
block_mac_to_block(tr, block_mac), min_key,
max_key)) {
goto err;
}
if (node_ro->is_leaf && node_ro->is_leaf != true) {
printf("%s: bad node header %" PRIx64 "\n", __func__, node_ro->is_leaf);
goto err;
}
is_leaf = block_tree_node_is_leaf(node_ro);
key_count = block_tree_node_max_key_count(tree, node_ro);
/* TODO: don't allow empty root */
/*
 * Upper bound on unused key slots: a root may be completely empty, other
 * nodes may have (key_count + 1) / 2 unused slots, plus one more while the
 * tree is being updated.
 */
max_empty_count =
is_root ? (key_count /*- 1*/) : ((key_count + 1) / 2 + updating);
child_min_key = min_key;
empty_count = 0;
for (i = 0; i < key_count; i++) {
key = block_tree_node_get_key(tree, block_mac_to_block(tr, block_mac),
node_ro, i);
if (!key) {
empty_count++;
}
if (empty_count) {
/* Past the first empty slot everything must be unused. */
if (key) {
printf("%s: %" PRIu64
": expected zero key at %d, found %" PRIu64 "\n",
__func__, block_mac_to_block(tr, block_mac), i, key);
goto err;
}
child_data =
block_tree_node_get_child_data(tree, node_ro, i + !is_leaf);
if (!is_zero(child_data, tree->child_data_size[is_leaf])) {
printf("%s: %" PRIu64
": expected zero data/right child value at %d\n",
__func__, block_mac_to_block(tr, block_mac), i);
goto err;
}
continue;
}
/* The first key of a non-leftmost leaf must equal the parent's key. */
if (i == 0 && min_key && is_leaf && key != min_key) {
printf("%s: %" PRIu64 ": bad key at %d, %" PRIu64
" not start of [%" PRIu64 "-%" PRIu64 "]\n",
__func__, block_mac_to_block(tr, block_mac), i, key, min_key,
max_key);
if (tr->failed) {
printf("%s: transaction failed, ignore\n", __func__);
} else if (!key) {
// warn only for now
printf("%s: ignore empty node error\n", __func__);
} else {
goto err;
}
}
min_key = key;
child_max_key = key;
if (!is_leaf) {
/*
 * Copy the child block-mac and release this node before recursing, then
 * re-acquire it afterwards. NOTE(review): presumably this limits how
 * many blocks are held at once during recursion - confirm.
 */
child_data = block_tree_node_get_child_data(tree, node_ro, i);
block_mac_copy(tr, &child_block_mac, child_data);
block_put(node_ro, &ref);
sub_tree_depth = block_tree_check_sub_tree(
tr, tree, &child_block_mac, false, child_min_key,
child_max_key, updating);
node_ro = block_get_no_tr_fail(tr, block_mac, NULL, &ref);
if (!node_ro) {
pr_warn("%3" PRIu64 ": unreadable\n",
block_mac_to_block(tr, block_mac));
return -2;
}
if (sub_tree_depth == -1) {
goto err;
}
/*
 * depth == 1 means no child depth has been recorded yet; depth == -2
 * means only unreadable children have been seen so far. Once a readable
 * child sets the depth, all other readable children must match it.
 */
if (sub_tree_depth == -2) {
pr_warn("%" PRIu64 ": unreadable subtree at %d\n",
block_mac_to_block(tr, block_mac), i);
if (depth == 1) {
depth = -2;
}
} else if (depth == 1 || depth == -2) {
depth = sub_tree_depth + 1;
} else if (sub_tree_depth != depth - 1) {
printf("%s: %" PRIu64 ": bad subtree depth at %d, %d != %d\n",
__func__, block_mac_to_block(tr, block_mac), i,
sub_tree_depth, depth - 1);
goto err;
}
}
child_min_key = key;
min_key = key;
last_child = i + 1;
}
/* Check the child to the right of the last used key, bounded by max_key. */
child_max_key = max_key;
if (!is_leaf) {
child_data = block_tree_node_get_child_data(tree, node_ro, last_child);
block_mac_copy(tr, &child_block_mac, child_data);
block_put(node_ro, &ref);
sub_tree_depth = block_tree_check_sub_tree(tr, tree, &child_block_mac,
false, child_min_key,
child_max_key, updating);
node_ro = block_get_no_tr_fail(tr, block_mac, NULL, &ref);
if (!node_ro) {
pr_warn("%3" PRIu64 ": unreadable\n",
block_mac_to_block(tr, block_mac));
return -2;
}
if (sub_tree_depth == -1) {
goto err;
}
if (sub_tree_depth == -2) {
pr_warn("%" PRIu64 ": unreadable subtree at %d\n",
block_mac_to_block(tr, block_mac), last_child);
if (depth == 1) {
depth = -2;
}
} else if (depth == 1 || depth == -2) {
depth = sub_tree_depth + 1;
} else if (sub_tree_depth != depth - 1) {
printf("%s: %" PRIu64 ": bad subtree depth at %d, %d != %d\n",
__func__, block_mac_to_block(tr, block_mac), last_child,
sub_tree_depth, depth - 1);
goto err;
}
}
if (empty_count > max_empty_count) {
printf("%s: %" PRIu64 ": too many empty nodes, %d > %d\n", __func__,
block_mac_to_block(tr, block_mac), empty_count, max_empty_count);
if (tr->failed) {
printf("%s: ignore error, transaction failed\n", __func__);
} else {
goto err;
}
}
block_put(node_ro, &ref);
return depth;
err:
block_put(node_ro, &ref);
return -1;
}
/**
 * block_tree_check - Check tree for errors
 * @tr:         Transaction object.
 * @tree:       Tree object.
 *
 * A tree that could not be fully read is not reported as invalid; instead the
 * transaction is failed (if it has not failed already) and %true is returned.
 *
 * Return: %false if an error was detected, %true otherwise.
 */
bool block_tree_check(struct transaction* tr, const struct block_tree* tree) {
    int depth;

    assert(tr->fs->dev->block_size >= tree->block_size);

    depth = block_tree_check_sub_tree(tr, tree, &tree->root, true, 0,
                                      DATA_BLOCK_MAX, tree->updating);
    if (depth >= 0) {
        return true;
    }

    if (depth == -2) {
        /* Unreadable blocks: cannot tell whether the tree is valid. */
        pr_warn("block_tree not fully readable\n");
        if (!tr->failed) {
            transaction_fail(tr);
        }
        return true;
    }

    pr_err("%s: invalid block_tree:\n", __func__);
    if (print_check_errors) {
        block_tree_print(tr, tree);
    }
    return false;
}
/**
* block_tree_node_full - Check if node is full
* @tree: Tree object.
* @node_ro: Node.
*
* Return: %true is @node_ro is full, %false otherwise.
*/
static bool block_tree_node_full(const struct block_tree* tree,
const struct block_tree_node* node_ro) {
const size_t key_count = block_tree_node_max_key_count(tree, node_ro);
return !!block_tree_node_get_key(tree, DATA_BLOCK_INVALID, node_ro,
key_count - 1);
}
/**
 * block_tree_min_key_count - Return the minimum number of keys for a node
 * @tree:       Tree object.
 * @leaf:       %true for leaf nodes, %false for internal nodes.
 *
 * B+ tree minimum full rules for non-root nodes.
 *
 * min-children = (max-children + 1) / 2
 * max-keys     = max-children - 1
 * max-children = max-keys + 1
 * min-keys     = min-children - 1
 *              = (max-children + 1) / 2 - 1
 *              = (max-keys + 1 + 1) / 2 - 1
 *              = max-keys / 2
 *
 * Return: minimum number of keys required for non-root leaf or internal nodes.
 */
static unsigned int block_tree_min_key_count(const struct block_tree* tree,
                                             bool leaf) {
    const unsigned int max_keys = block_tree_max_key_count(tree, leaf);

    return max_keys / 2;
}
/**
 * block_tree_node_min_full_index - Return index of last key in "min-full" node
 * @tree:       Tree object.
 * @node_ro:    Node.
 *
 * Return: index of the key slot that must be in use for @node_ro to hold the
 * minimum number of keys, i.e. the minimum key count minus one.
 */
static int block_tree_node_min_full_index(
        const struct block_tree* tree,
        const struct block_tree_node* node_ro) {
    const bool is_leaf = block_tree_node_is_leaf(node_ro);
    const unsigned int min_keys = block_tree_min_key_count(tree, is_leaf);

    return min_keys - 1;
}
/**
 * block_tree_above_min_full - Check if node has more than the minimum number of
 *                             entries
 * @tree:       Tree object.
 * @node_ro:    Node.
 *
 * Tests whether the key slot directly after the "min-full" index is in use.
 *
 * Return: %true if @node_ro has more than the minimum required entry count,
 * %false otherwise.
 */
bool block_tree_above_min_full(const struct block_tree* tree,
                               const struct block_tree_node* node_ro) {
    const int first_extra_slot =
            block_tree_node_min_full_index(tree, node_ro) + 1;

    return block_tree_node_get_key(tree, DATA_BLOCK_INVALID, node_ro,
                                   first_extra_slot) != 0;
}
/**
 * block_tree_below_min_full - Check if node has less than the minimum number of
 *                             entries
 * @tree:       Tree object.
 * @node_ro:    Node.
 *
 * Tests whether the key slot at the "min-full" index is empty.
 *
 * Return: %true if @node_ro has less than the minimum required entry count,
 * %false otherwise.
 */
bool block_tree_below_min_full(const struct block_tree* tree,
                               const struct block_tree_node* node_ro) {
    const int last_required_slot =
            block_tree_node_min_full_index(tree, node_ro);

    return block_tree_node_get_key(tree, DATA_BLOCK_INVALID, node_ro,
                                   last_required_slot) == 0;
}
/**
 * block_tree_node_need_copy - Check if node needs to be copied before write
 * @tr:         Transaction object.
 * @tree:       Tree object.
 * @block_mac:  Block number and mac of node to check.
 *
 * Return: %true if @tree is a copy_on_write tree and @block_mac needs to be
 * copied before the node can be written, %false otherwise. Always %false once
 * @tr has failed.
 */
static bool block_tree_node_need_copy(struct transaction* tr,
                                      const struct block_tree* tree,
                                      const struct block_mac* block_mac) {
    if (!tree->copy_on_write) {
        return false;
    }
    if (!transaction_block_need_copy(tr, block_mac_to_block(tr, block_mac))) {
        return false;
    }
    return !tr->failed;
}
/**
* block_tree_node_find_block - Helper function for block_tree_walk
* @tr: Transaction object.
* @tree: Tree object.
* @node_block: Block number where @node_ro data is stored.
* @node_ro: Node to search.
* @key: Key to search for.
* @key_is_max: If %true, and @key is not found in a leaf node, use largest
* entry with a key less than @key. If %false, and @key is not
* found in a leaf node, use smallest entry with a key greater
* than @key. Ignored for internal nodes or when @key is found.
* @child: Output arg for child at returned index. Set to %NULL when
* @node_ro is a leaf node.
* @prev_key: Output arg. Unmodified if returned index is 0, set to key at
* index - 1 otherwise.
* @next_key: Output arg. Unmodified if returned index does not contain
* an entry, set to key at index otherwise.
*
* Scans the keys of @node_ro in index order and stops at the first empty slot
* (key 0) or at the first key that @key does not sort after. In a leaf node an
* equal key stops the scan; in an internal node an equal key does not, so the
* scan moves on to the next index.
*
* Return: index of closest matching entry.
*/
static int block_tree_node_find_block(const struct transaction* tr,
const struct block_tree* tree,
data_block_t node_block,
const struct block_tree_node* node_ro,
data_block_t key,
bool key_is_max,
const struct block_mac** child,
data_block_t* prev_key,
data_block_t* next_key) {
int i;
data_block_t curr_key;
int keys_count;
bool is_leaf = block_tree_node_is_leaf(node_ro);
keys_count = block_tree_node_max_key_count(tree, node_ro);
/* TODO: better search? */
/*
* The scan covers one index past the node's max key count. The check after
* the loop expects that extra slot to hold a key only while this node is the
* tree's inserting block.
*/
for (i = 0; i < keys_count + 1; i++) {
curr_key = block_tree_node_get_key(tree, node_block, node_ro, i);
/* "- !is_leaf" makes the comparison strict (key < curr_key) for internal nodes */
if (!curr_key || (key <= (curr_key - !is_leaf))) {
break;
}
/* reset so curr_key is 0 if the loop runs off the end without a match */
curr_key = 0;
}
if (i == keys_count && curr_key) {
assert(tree->inserting.block == node_block);
if (print_ops) {
printf("%s: using inserting block %" PRIu64 ", key %" PRIu64
", child %" PRIu64 ", data %" PRIu64 "\n",
__func__, tree->inserting.block, tree->inserting.key,
block_mac_to_block(tr, &tree->inserting.child),
block_mac_to_block(tr, &tree->inserting.data));
}
}
/*
* Leaf lookup with key_is_max and no exact match: step back one entry so the
* result is the entry before the slot where the scan stopped.
*/
if (key_is_max && is_leaf && i > 0 && (!curr_key || curr_key > key)) {
i--;
curr_key = block_tree_node_get_key(tree, node_block, node_ro, i);
}
/* next_key is left untouched when the selected slot is empty */
if (curr_key) {
*next_key = curr_key;
}
if (i > 0) {
/* TODO: move to loop */
*prev_key = block_tree_node_get_key(tree, node_block, node_ro, i - 1);
}
if (is_leaf) {
*child = NULL;
} else {
/* internal nodes must always have a valid child at the selected index */
*child = block_tree_node_get_child(tr, tree, node_block, node_ro, i);
assert(*child);
assert(block_mac_valid(tr, *child));
}
if (print_lookup) {
printf("%s: block %" PRIu64 ", key %" PRIu64
" return %d, child_block %" PRIu64 ", prev_key %" PRIu64
", next_key %" PRIu64 "\n",
__func__, node_block, key, i,
*child ? block_mac_to_block(tr, *child) : 0, *prev_key,
*next_key);
block_tree_node_print(tr, tree, node_block, node_ro);
}
return i;
}
/**
 * block_tree_walk - Walk tree
 * @tr:         Transaction object.
 * @tree:       Tree object.
 * @key:        Key to search for.
 * @key_is_max: If %true, and @key is not found in a leaf node, use largest
 *              entry with a key less than @key. If %false, and @key is not
 *              found in a leaf node, use smallest entry with a key greater
 *              than @key. Ignored for internal nodes or when @key is found.
 * @path:       Tree path object to initialize.
 *
 * Walk tree to find path to @key or the insert point for @key.
 *
 * Note that if @key is not found, @path may point to an empty insert slot. A
 * caller that is looking for the closest match to @key, must therefore call
 * block_tree_path_next to advance @path to a valid entry if this case is hit.
 * TODO: Add a helper function or argument to avoid this.
 *
 * If a node cannot be read (the transaction has failed), @path->count is set
 * to 0.
 */
void block_tree_walk(struct transaction* tr,
                     struct block_tree* tree,
                     data_block_t key,
                     bool key_is_max,
                     struct block_tree_path* path) {
    const struct block_tree_node* node_ro;
    const struct block_tree_node* parent_node_ro;
    struct obj_ref ref[2] = {
            OBJ_REF_INITIAL_VALUE(ref[0]),
            OBJ_REF_INITIAL_VALUE(ref[1]),
    };
    int ref_index = 0;
    const struct block_mac* child;
    data_block_t prev_key = 0;
    data_block_t next_key = 0;
    int child_index;
    const struct block_mac* block_mac;

    /*
     * Validate the arguments before anything dereferences them; the debug
     * print below already reads through both @tr and @tree.
     */
    assert(tr);
    assert(tree);
    assert(path);
    assert(tree->block_size <= tr->fs->dev->block_size);

    if (print_lookup) {
        printf("%s: tree root block %" PRIu64 ", key %" PRIu64
               ", key_is_max %d\n",
               __func__, block_mac_to_block(tr, &tree->root), key, key_is_max);
    }

    path->count = 0;
    memset(&path->data, 0, sizeof(path->data));
    path->tr = tr;
    path->tree = tree;
    path->tree_update_count = tree->update_count;

    block_mac = &tree->root;
    parent_node_ro = NULL;

    /* descend one level per iteration until a leaf (no child) is reached */
    while (block_mac && block_mac_valid(tr, block_mac)) {
        assert(path->count < countof(path->entry));

        node_ro = block_get(tr, block_mac, NULL, &ref[ref_index]);
        if (!node_ro) {
            assert(tr->failed);
            pr_warn("transaction failed, abort\n");
            path->count = 0;
            goto err;
        }
        assert(node_ro);

        path->entry[path->count].block_mac = *block_mac;

        child_index = block_tree_node_find_block(
                tr, path->tree, block_mac_to_block(tr, block_mac), node_ro, key,
                key_is_max, &child, &prev_key, &next_key);
        if (path->count) {
            /* key bounds may only get tighter while descending */
            assert(!path->entry[path->count - 1].next_key || next_key);
            assert(!path->entry[path->count - 1].next_key ||
                   next_key <= path->entry[path->count - 1].next_key);
            assert(!path->entry[path->count - 1].prev_key || prev_key);
            assert(!path->entry[path->count - 1].prev_key ||
                   prev_key >= path->entry[path->count - 1].prev_key);
        }
        path->entry[path->count].index = child_index;
        path->entry[path->count].prev_key = prev_key;
        path->entry[path->count].next_key = next_key;

        if (!child) {
            /* leaf reached: record the data block-mac of the selected entry */
            assert(block_tree_node_is_leaf(node_ro));
            path->data = block_tree_node_get_data(
                    tr, tree, block_mac_to_block(tr, block_mac), node_ro,
                    child_index);
            assert(!key_is_max || block_mac_valid(path->tr, &path->data) ||
                   path->count == 0);
        }

        /*
         * child points into node_ro, so node_ro stays referenced as the
         * parent until the next level has been read. The two ref slots
         * alternate between "current" and "parent".
         */
        block_mac = child;
        if (parent_node_ro) {
            block_put(parent_node_ro, &ref[!ref_index]);
        }
        parent_node_ro = node_ro;
        ref_index = !ref_index;

        if (print_lookup) {
            printf("%s: path[%d] = %" PRIu64 ", index %d, prev_key %" PRIu64
                   ", next_key %" PRIu64 "\n",
                   __func__, path->count,
                   block_mac_to_block(path->tr,
                                      &path->entry[path->count].block_mac),
                   path->entry[path->count].index,
                   path->entry[path->count].prev_key,
                   path->entry[path->count].next_key);
            if (!block_mac) {
                printf("%s: path.data.block = %" PRIu64 "\n", __func__,
                       block_mac_to_block(path->tr, &path->data));
            }
        }
        path->count++;
    }

err:
    /* release the last node still held (the parent of a failed read, or the leaf) */
    if (parent_node_ro) {
        block_put(parent_node_ro, &ref[!ref_index]);
    }
}
/**
* block_tree_path_next - Walk tree to get to next entry
* @path: Tree path object.
*
* Advance @path from the leaf entry it currently points at to the following
* entry. If the current leaf has another entry, or no parent provides a next
* key, only the leaf level of @path is updated. Otherwise the path is walked
* up until an internal node with a child after the current index is found, and
* then down again along index 0 to the first entry of that sub-tree.
*
* On success @path->data holds the data block-mac at the new position. If a
* node cannot be read, @path->count is set to 0.
*/
void block_tree_path_next(struct block_tree_path* path) {
const struct block_tree_node* node_ro;
const struct block_tree_node* parent_node_ro;
struct obj_ref ref[2] = {
OBJ_REF_INITIAL_VALUE(ref[0]),
OBJ_REF_INITIAL_VALUE(ref[1]),
};
bool ref_index = 0;
unsigned int index;
unsigned int depth;
const struct block_mac* block_mac;
data_block_t prev_key;
data_block_t next_key;
struct block_mac next_data;
const struct block_mac* next_child;
data_block_t parent_next_key;
/* the path must not be used after the tree has been modified */
assert(path->tree_update_count == path->tree->update_count);
assert(path->count);
depth = path->count - 1;
assert(path->entry[depth].next_key);
if (print_lookup) {
printf("%s: tree root block %" PRIu64 "\n", __func__,
block_mac_to_block(path->tr, &path->tree->root));
}
/* start at the leaf level of the path */
block_mac = &path->entry[depth].block_mac;
index = path->entry[depth].index;
parent_next_key = depth > 0 ? path->entry[depth - 1].next_key : 0;
node_ro = block_get(path->tr, block_mac, NULL, &ref[ref_index]);
if (!node_ro) {
assert(path->tr->failed);
pr_warn("transaction failed, abort\n");
goto err_no_refs;
}
assert(node_ro);
assert(block_tree_node_is_leaf(node_ro));
/* key at the current index becomes prev_key, index + 1 is the candidate */
prev_key = block_tree_node_get_key(path->tree,
block_mac_to_block(path->tr, block_mac),
node_ro, index);
index++;
next_key = block_tree_node_get_key(path->tree,
block_mac_to_block(path->tr, block_mac),
node_ro, index);
next_data = block_tree_node_get_data(
path->tr, path->tree, block_mac_to_block(path->tr, block_mac),
node_ro, index);
block_put(node_ro, &ref[ref_index]);
assert(path->entry[depth].next_key == prev_key || !prev_key);
/*
* Stay in this leaf if it has another entry, or if no parent supplies a
* next key (in which case there is nowhere else to go).
*/
if (next_key || !parent_next_key) {
assert(!next_key || next_key >= prev_key);
path->entry[depth].index = index;
path->entry[depth].prev_key = prev_key;
path->entry[depth].next_key = next_key;
path->data = next_data;
if (print_lookup) {
printf("%s: path[%d] = %" PRIu64 ", index %d, prev_key %" PRIu64
", next_key %" PRIu64 "\n",
__func__, depth,
block_mac_to_block(path->tr, &path->entry[depth].block_mac),
path->entry[depth].index, path->entry[depth].prev_key,
path->entry[depth].next_key);
printf("%s: path.data.block = %" PRIu64 "\n", __func__,
block_mac_to_block(path->tr, &path->data));
}
return;
}
assert(depth > 0);
/*
* Walk up, skipping levels that have no next key, until an internal node
* with a child after the current index is found.
*/
while (depth > 0) {
depth--;
if (!path->entry[depth].next_key) {
continue;
}
block_mac = &path->entry[depth].block_mac;
index = path->entry[depth].index;
node_ro = block_get(path->tr, block_mac, NULL, &ref[ref_index]);
if (!node_ro) {
assert(path->tr->failed);
pr_warn("transaction failed, abort\n");
goto err_no_refs;
}
assert(node_ro);
assert(!block_tree_node_is_leaf(node_ro));
parent_next_key = depth > 0 ? path->entry[depth - 1].next_key : 0;
prev_key = block_tree_node_get_key(
path->tree, block_mac_to_block(path->tr, block_mac), node_ro,
index);
index++;
next_key = block_tree_node_get_key(
path->tree, block_mac_to_block(path->tr, block_mac), node_ro,
index);
next_child = block_tree_node_get_child(
path->tr, path->tree, block_mac_to_block(path->tr, block_mac),
node_ro, index);
if (next_child) {
/*
* Keep node_ro referenced as the parent for the descent below
* (next_child points into it) and switch to the other ref slot.
*/
parent_node_ro = node_ro;
ref_index = !ref_index;
assert(prev_key && prev_key == path->entry[depth].next_key);
path->entry[depth].index = index;
path->entry[depth].prev_key = prev_key;
/* no further key in this node: inherit the next key from the level above */
path->entry[depth].next_key = next_key ?: parent_next_key;
if (print_lookup) {
printf("%s: path[%d] = %" PRIu64 ", index %d, prev_key %" PRIu64
", next_key %" PRIu64 "\n",
__func__, depth,
block_mac_to_block(path->tr,
&path->entry[depth].block_mac),
path->entry[depth].index, path->entry[depth].prev_key,
path->entry[depth].next_key);
}
break;
}
/* no child after the current index at this level, keep climbing */
block_put(node_ro, &ref[ref_index]);
}
/*
* NOTE(review): next_child and parent_node_ro are only assigned inside the
* loop above; the code below relies on the loop having found a child.
*/
assert(next_child);
/* walk down through the internal levels, always taking index 0 */
while (++depth < path->count - 1) {
node_ro = block_get(path->tr, next_child, NULL, &ref[ref_index]);
if (!node_ro) {
assert(path->tr->failed);
pr_warn("transaction failed, abort\n");
goto err_has_parent_ref;
}
assert(node_ro);
assert(!block_tree_node_is_leaf(node_ro));
/* copy the block-mac before releasing the parent it points into */
path->entry[depth].block_mac = *next_child;
block_put(parent_node_ro, &ref[!ref_index]);
path->entry[depth].index = 0;
path->entry[depth].prev_key = prev_key;
path->entry[depth].next_key = block_tree_node_get_key(
path->tree, DATA_BLOCK_INVALID, node_ro, 0);
if (print_lookup) {
printf("%s: path[%d] = %" PRIu64 ", index %d, prev_key %" PRIu64
", next_key %" PRIu64 "\n",
__func__, depth,
block_mac_to_block(path->tr, &path->entry[depth].block_mac),
path->entry[depth].index, path->entry[depth].prev_key,
path->entry[depth].next_key);
}
next_child = block_tree_node_get_child(path->tr, path->tree,
DATA_BLOCK_INVALID, node_ro, 0);
parent_node_ro = node_ro;
ref_index = !ref_index;
assert(path->entry[depth].next_key);
assert(next_child);
}
/* bottom level: read the leaf and select its first entry */
assert(next_child);
node_ro = block_get(path->tr, next_child, NULL, &ref[ref_index]);
if (!node_ro) {
assert(path->tr->failed);
pr_warn("transaction failed, abort\n");
goto err_has_parent_ref;
}
assert(node_ro);
assert(block_tree_node_is_leaf(node_ro));
path->entry[depth].block_mac = *next_child;
block_put(parent_node_ro, &ref[!ref_index]);
path->entry[depth].index = 0;
path->entry[depth].prev_key = prev_key;
path->entry[depth].next_key =
block_tree_node_get_key(path->tree, DATA_BLOCK_INVALID, node_ro, 0);
path->data = block_tree_node_get_data(path->tr, path->tree,
DATA_BLOCK_INVALID, node_ro, 0);
block_put(node_ro, &ref[ref_index]);
assert(path->entry[depth].next_key);
if (print_lookup) {
printf("%s: path[%d] = %" PRIu64 ", index %d, prev_key %" PRIu64
", next_key %" PRIu64 "\n",
__func__, depth,
block_mac_to_block(path->tr, &path->entry[depth].block_mac),
path->entry[depth].index, path->entry[depth].prev_key,
path->entry[depth].next_key);
printf("%s: path.data.block = %" PRIu64 "\n", __func__,
block_mac_to_block(path->tr, &path->data));
}
return;
/* a read failed while the parent node was still referenced */
err_has_parent_ref:
block_put(parent_node_ro, &ref[!ref_index]);
/* a read failed with no node references held; invalidate the path */
err_no_refs:
assert(path->tr->failed);
path->count = 0;
}
/**
 * block_tree_block_dirty - Get writable node
 * @tr:         Transaction object.
 * @path:       Tree path object.
 * @path_index: Path depth that @node_ro was found at.
 * @node_ro:    Source node
 *
 * Copy @node_ro and update @path if needed. When the node has to be copied, a
 * new block is allocated, the old block is freed and the path entry (plus the
 * tree root when @path_index is 0) is pointed at the new block.
 *
 * Return: writable pointer to @node_ro, or %NULL if the transaction has failed
 * or no block could be allocated.
 */
static struct block_tree_node* block_tree_block_dirty(
        struct transaction* tr,
        struct block_tree_path* path,
        int path_index,
        const struct block_tree_node* node_ro) {
    struct block_mac* entry_mac = &path->entry[path_index].block_mac;
    data_block_t copy_block;

    /* the top path entry must refer to the root block of the tree */
    assert(path_index ||
           block_mac_same_block(tr, entry_mac, &path->tree->root));

    if (!block_tree_node_need_copy(tr, path->tree, entry_mac)) {
        /* no copy needed: write in place unless the transaction failed */
        return tr->failed ? NULL
                          : block_dirty(tr, node_ro,
                                        !path->tree->allow_copy_on_write);
    }

    assert(path->tree->allow_copy_on_write);
    copy_block = block_allocate_etc(tr, !path->tree->allow_copy_on_write);
    if (print_internal_changes) {
        printf("%s: copy block %" PRIu64 " to %" PRIu64 "\n", __func__,
               block_mac_to_block(tr, entry_mac), copy_block);
    }
    if (!copy_block) {
        return NULL;
    }
    assert(copy_block != block_mac_to_block(tr, entry_mac));
    assert(!tr->failed);

    /* release the old block before redirecting the path to the copy */
    block_free(tr, block_mac_to_block(tr, entry_mac));
    if (tr->failed) {
        pr_warn("transaction failed, abort\n");
        return NULL;
    }

    block_mac_set_block(tr, entry_mac, copy_block);
    if (path_index == 0) {
        /* the root itself moved */
        block_mac_set_block(tr, &path->tree->root, copy_block);
        path->tree->root_block_changed = true;
    }
    return block_move(tr, node_ro, copy_block,
                      !path->tree->allow_copy_on_write);
}
/**
 * block_tree_block_get_write - Get writable node from path
 * @tr:         Transaction object.
 * @path:       Tree path object.
 * @path_index: Path depth to get node from.
 * @ref:        Pointer to store reference in.
 *
 * Get node from @path and @path_index, copying it and updating @path if
 * needed.
 *
 * Return: writable pointer to node, or %NULL if the node could not be read or
 * made writable. No reference is held through @ref when %NULL is returned.
 */
static struct block_tree_node* block_tree_block_get_write(
        struct transaction* tr,
        struct block_tree_path* path,
        int path_index,
        struct obj_ref* ref) {
    struct block_tree_node* writable;
    const struct block_tree_node* readable =
            block_get(tr, &path->entry[path_index].block_mac, NULL, ref);

    if (!readable) {
        return NULL;
    }

    writable = block_tree_block_dirty(tr, path, path_index, readable);
    if (writable) {
        return writable;
    }

    /* could not get a writable node, drop the read reference again */
    block_put(readable, ref);
    return NULL;
}
/**
* block_tree_path_put_dirty - Release dirty node and update tree
* @tr: Transaction object.
* @path: Tree path object.
* @path_index: Path depth to get node from.
* @data: Block data pointed to by leaf node when @path_index is
* @path->count. Tree node otherwise.
* @data_ref: Reference to @data that will be released.
*
* Release dirty external block or tree node and update tree. Update mac values
* and copy blocks as needed until root of tree is reached.
*
* If the transaction fails along the way, the dirty blocks still held are
* discarded and the function returns early.
*/
void block_tree_path_put_dirty(struct transaction* tr,
struct block_tree_path* path,
int path_index,
void* data,
struct obj_ref* data_ref) {
unsigned int index;
struct block_mac* block_mac;
struct block_tree_node* parent_node_rw;
bool parent_is_leaf;
struct obj_ref parent_node_ref = OBJ_REF_INITIAL_VALUE(parent_node_ref);
struct obj_ref tmp_ref = OBJ_REF_INITIAL_VALUE(tmp_ref);
/*
* @data is an external block referenced by the leaf: store its block-mac in
* the entry one past the end of the path so the loop below can treat it like
* any other child entry.
*/
if (path_index == (int)path->count) {
assert(path_index < (int)countof(path->entry));
path->entry[path_index].block_mac = path->data; // copy data
}
/* each iteration writes @data and records its new mac in its parent */
while (--path_index >= 0) {
parent_node_rw = block_tree_block_get_write(tr, path, path_index,
&parent_node_ref);
if (tr->failed) {
assert(!parent_node_rw);
block_put_dirty_discard(data, data_ref);
pr_warn("transaction failed, abort\n");
return;
}
assert(parent_node_rw);
parent_is_leaf = block_tree_node_is_leaf(parent_node_rw);
index = path->entry[path_index].index;
/* only the last path entry may be a leaf */
assert(path_index == (int)path->count - 1 || !parent_is_leaf);
assert(path->tree->child_data_size[parent_is_leaf] <=
sizeof(*block_mac));
assert(path->tree->child_data_size[parent_is_leaf] >=
tr->fs->block_num_size + tr->fs->mac_size);
/* slot in the parent that references the block being written */
block_mac = block_tree_node_get_child_data_rw(path->tree,
parent_node_rw, index);
/* check that block was copied if needed */
assert(!block_tree_node_need_copy(tr, path->tree, block_mac) ||
!block_mac_same_block(tr, block_mac,
&path->entry[path_index + 1].block_mac));
/* check that block was not copied when not needed */
assert(block_tree_node_need_copy(tr, path->tree, block_mac) ||
block_mac_same_block(tr, block_mac,
&path->entry[path_index + 1].block_mac) ||
tr->failed);
/* the child was moved to a new block: update the block number in the parent */
if (!block_mac_same_block(tr, block_mac,
&path->entry[path_index + 1].block_mac)) {
if (print_internal_changes) {
printf("%s: update copied block %" PRIu64 " to %" PRIu64 "\n",
__func__, block_mac_to_block(tr, block_mac),
block_mac_to_block(
tr, &path->entry[path_index + 1].block_mac));
}
block_mac_set_block(
tr, block_mac,
block_mac_to_block(tr,
&path->entry[path_index + 1].block_mac));
}
if (tr->failed) {
block_put_dirty_discard(data, data_ref);
block_put_dirty_discard(parent_node_rw, &parent_node_ref);
pr_warn("transaction failed, abort\n");
return;
}
assert(block_mac_eq(tr, block_mac,
&path->entry[path_index + 1].block_mac));
/*
* Release the child. The asserts around this call expect the mac in
* block_mac to change here (unless the mac is shorter than 16 bytes); the
* new mac is then copied back into the path entry.
*/
block_put_dirty(tr, data, data_ref, block_mac, parent_node_rw);
assert(!block_mac_eq(tr, block_mac,
&path->entry[path_index + 1].block_mac) ||
tr->fs->mac_size < 16);
block_mac_copy_mac(tr, &path->entry[path_index + 1].block_mac,
block_mac);
assert(block_mac_eq(tr, block_mac,
&path->entry[path_index + 1].block_mac));
/*
* The parent is the block to write in the next iteration. Its reference is
* moved to tmp_ref so parent_node_ref can be reused.
*/
data = parent_node_rw;
data_ref = &tmp_ref;
obj_ref_transfer(data_ref, &parent_node_ref);
}
/* top of the path reached: @data is the root node, its mac lives in the tree */
block_mac = &path->tree->root;
assert(block_mac_same_block(tr, block_mac, &path->entry[0].block_mac));
assert(block_mac_eq(tr, block_mac, &path->entry[0].block_mac));
block_put_dirty(tr, data, data_ref, block_mac, NULL);
assert(!block_mac_eq(tr, block_mac, &path->entry[0].block_mac) ||
tr->fs->mac_size < 16);
block_mac_copy_mac(tr, &path->entry[0].block_mac, block_mac);
assert(block_mac_eq(tr, block_mac, &path->entry[0].block_mac));
}
/**
 * block_tree_update_key - Update key in internal parent node
 * @tr:         Transaction object.
 * @path:       Tree path object.
 * @path_index: Path depth start search at.
 * @new_key:    New key value.
 *
 * Search path until a left parent key is found (non-zero index) then set it
 * to @new_key. If every level from @path_index up to the root uses index 0,
 * there is no key to update and the tree is left unchanged.
 *
 * TODO: merge with mac update?
 */
void block_tree_update_key(struct transaction* tr,
                           struct block_tree_path* path,
                           int path_index,
                           data_block_t new_key) {
    const struct block_mac* node_mac;
    unsigned int slot = 0;
    struct block_tree_node* node_rw;
    struct obj_ref write_ref = OBJ_REF_INITIAL_VALUE(write_ref);

    assert(new_key);

    /* climb until a level is found where the path does not take child 0 */
    for (; path_index >= 0; path_index--) {
        assert(path_index < (int)countof(path->entry));
        slot = path->entry[path_index].index;
        if (slot != 0) {
            break;
        }
    }

    if (path_index < 0) {
        if (print_internal_changes) {
            printf("%s: root reached, no update needed (new key %" PRIu64 ")\n",
                   __func__, new_key);
        }
        return;
    }

    node_mac = &path->entry[path_index].block_mac;
    node_rw = block_tree_block_get_write(tr, path, path_index, &write_ref);
    if (tr->failed) {
        pr_warn("transaction failed, abort\n");
        return;
    }
    assert(node_rw);

    if (print_internal_changes) {
        printf("%s: block %" PRIu64 ", index %d, [%" PRIu64
               "-] -> [%" PRIu64 "-]\n",
               __func__, block_mac_to_block(tr, node_mac), slot,
               block_tree_node_get_key(path->tree, DATA_BLOCK_INVALID,
                                       node_rw, slot - 1),
               new_key);
        block_tree_node_print(tr, path->tree,
                              block_mac_to_block(tr, node_mac), node_rw);
    }

    /* the new key must keep the keys of the internal node ordered */
    assert(!block_tree_node_is_leaf(node_rw));
    assert(slot == 1 ||
           new_key >= block_tree_node_get_key(path->tree, DATA_BLOCK_INVALID,
                                              node_rw, slot - 2));
    assert(slot == block_tree_node_max_key_count(path->tree, node_rw) ||
           new_key <= block_tree_node_get_key(path->tree, DATA_BLOCK_INVALID,
                                              node_rw, slot) ||
           !block_tree_node_get_key(path->tree, DATA_BLOCK_INVALID, node_rw,
                                    slot));
    assert(path->entry[path_index].prev_key ==
           block_tree_node_get_key(path->tree, DATA_BLOCK_INVALID, node_rw,
                                   slot - 1));

    /* replace the key to the left of the child the path goes through */
    block_tree_node_set_key(path->tree, node_rw, slot - 1, new_key);
    assert(block_tree_node_check(tr, path->tree, node_rw,
                                 block_mac_to_block(tr, node_mac), 0,
                                 DATA_BLOCK_MAX));
    path->entry[path_index].prev_key = new_key;

    if (print_internal_changes) {
        block_tree_node_print(tr, path->tree,
                              block_mac_to_block(tr, node_mac), node_rw);
    }

    block_tree_path_put_dirty(tr, path, path_index, node_rw, &write_ref);
}
/**
 * block_tree_node_get_key_count - Get number of keys currently stored in node
 * @tree:       Tree object.
 * @node_ro:    Node.
 *
 * Counts key slots from index 0 until the first empty (zero) key or until the
 * maximum key count of the node is reached.
 *
 * Return: number of keys in use.
 */
static int block_tree_node_get_key_count(
        const struct block_tree* tree,
        const struct block_tree_node* node_ro) {
    unsigned int used = 0;
    const unsigned int capacity = block_tree_node_max_key_count(tree, node_ro);

    while (used < capacity &&
           block_tree_node_get_key(tree, DATA_BLOCK_INVALID, node_ro, used)) {
        used++;
    }
    return used;
}
/**
 * block_tree_node_leaf_remove_entry - Remove entry from a leaf node
 * @tr:             Transaction object.
 * @tree:           Tree object.
 * @node_block_mac: Block number and mac for @node_rw.
 * @node_rw:        Node.
 * @index:          Index of entry to remove.
 *
 * The entries after @index are shifted down by one slot, overwriting the entry
 * at @index. @tr and @node_block_mac are only used for debug output.
 */
static void block_tree_node_leaf_remove_entry(
        struct transaction* tr,
        const struct block_tree* tree,
        const struct block_mac* node_block_mac,
        struct block_tree_node* node_rw,
        unsigned int index) {
    const unsigned int capacity = block_tree_node_max_key_count(tree, node_rw);

    if (print_internal_changes) {
        printf("%s: index %d:", __func__, index);
        block_tree_node_print(tr, tree, block_mac_to_block(tr, node_block_mac),
                              node_rw);
    }

    assert(index < capacity);
    assert(block_tree_node_is_leaf(node_rw));

    /* move everything from index + 1 onwards down onto index */
    block_tree_node_shift_down(tree, node_rw, index, index + 1,
                               SHIFT_LEAF_OR_LEFT_CHILD);

    if (print_internal_changes) {
        printf("%s: done:   ", __func__);
        block_tree_node_print(tr, tree, block_mac_to_block(tr, node_block_mac),
                              node_rw);
    }
}
/**
* block_tree_node_split - Split a full node and add entry.
* @tr: Transaction object.
* @path: Tree path object.
* @append_key: Key to add.
* @append_child: Child to add if @path points to an internal node.
* @append_data: Data to add if @path points to a leaf node.
*
* Allocate a new node and move half of the entries to it. If the old node was
* the root node, allocate a new root node. Add new node to parent.
*
* @path->count is decremented on entry, so on return @path is one level
* shorter for each node that was split. If adding the new node to the parent
* overflows the parent, the function calls itself to split the parent as well.
*/
static void block_tree_node_split(struct transaction* tr,
struct block_tree_path* path,
data_block_t append_key,
const struct block_mac* append_child,
const struct block_mac* append_data) {
struct block_tree_node* parent_node_rw;
struct obj_ref parent_node_ref = OBJ_REF_INITIAL_VALUE(parent_node_ref);
struct block_tree_node* node_left_rw;
struct obj_ref node_left_ref = OBJ_REF_INITIAL_VALUE(node_left_ref);
struct block_tree_node* node_right_rw;
struct obj_ref node_right_ref = OBJ_REF_INITIAL_VALUE(node_right_ref);
bool is_leaf;
struct block_mac* left_block_mac;
struct block_mac* right_block_mac;
data_block_t left_block_num;
struct block_mac right;
const struct block_mac* block_mac;
const struct block_mac* parent_block_mac;
unsigned int parent_index;
int split_index;
data_block_t parent_key;
data_block_t overflow_key = 0;
struct block_mac overflow_child;
size_t max_key_count;
assert(path->count > 0);
assert(path->tree);
/* block_mac is the node to split; path->count now indexes that node */
block_mac = &path->entry[path->count - 1].block_mac;
path->count--;
if (print_internal_changes) {
printf("%s: tree root %" PRIu64 ", block %" PRIu64 "\n", __func__,
block_mac_to_block(tr, &path->tree->root),
block_mac_to_block(tr, block_mac));
}
assert(append_key);
/* we should only insert one node at a time */
assert(!path->tree->inserting.block);
/*
* Record the pending entry in the tree while blocks are allocated below; the
* record is cleared again once the allocations are done.
*/
path->tree->inserting.block = block_mac_to_block(tr, block_mac);
path->tree->inserting.key = append_key;
if (append_data) {
assert(!append_child);
path->tree->inserting.data = *append_data;
}
if (append_child) {
assert(!append_data);
path->tree->inserting.child = *append_child;
}
assert(!path->tree->copy_on_write || path->tree->allow_copy_on_write);
block_mac_set_block(
tr, &right,
block_allocate_etc(tr, !path->tree->allow_copy_on_write));
if (!path->count) {
/* splitting the root: the old block becomes the new root, so the left node needs a new block */
left_block_num =
block_allocate_etc(tr, !path->tree->allow_copy_on_write);
} else {
left_block_num = block_mac_to_block(tr, block_mac);
}
if (tr->failed) {
/*
* NOTE(review): tree->inserting is still set on this return path; confirm
* that a failed transaction never reuses this tree object.
*/
pr_warn("transaction failed, abort\n");
return;
}
assert(block_mac);
assert(block_mac_valid(tr, block_mac));
/* the pending-insert record must not have been touched by the allocations */
assert(path->tree->inserting.block == block_mac_to_block(tr, block_mac));
assert(path->tree->inserting.key == append_key);
assert(block_mac_to_block(tr, &path->tree->inserting.data) ==
(append_data ? block_mac_to_block(tr, append_data) : 0));
assert(block_mac_to_block(tr, &path->tree->inserting.child) ==
(append_child ? block_mac_to_block(tr, append_child) : 0));
memset(&path->tree->inserting, 0, sizeof(path->tree->inserting));
if (print_internal_changes) {
printf("%s: %" PRIu64 " -> %" PRIu64 " %" PRIu64 "\n", __func__,
block_mac_to_block(tr, block_mac), left_block_num,
block_mac_to_block(tr, &right));
}
node_left_rw =
block_tree_block_get_write(tr, path, path->count, &node_left_ref);
if (!node_left_rw) {
assert(tr->failed);
pr_warn("transaction failed, abort\n");
return;
}
assert(node_left_rw);
is_leaf = block_tree_node_is_leaf(node_left_rw);
if (!path->count) {
/*
* Root split: the block just fetched becomes the new (internal) root. Its
* content is copied to left_block_num first, then the root is cleared and
* given the left node as child 0.
*/
assert(left_block_num != block_mac_to_block(tr, block_mac));
parent_node_rw = node_left_rw;
obj_ref_transfer(&parent_node_ref, &node_left_ref);
/* TODO: use allocated node for root instead since we need to update the
* mac anyway */
node_left_rw = block_get_copy(tr, node_left_rw, left_block_num,
!path->tree->allow_copy_on_write,
&node_left_ref);
assert(node_left_rw);
if (tr->failed) {
pr_warn("transaction failed, abort\n");
goto err_copy_parent;
}
// parent_node_rw = block_get_cleared(path->tree->root.block);
/* TODO: use block_get_cleared */
memset(parent_node_rw, 0, path->tree->block_size);
parent_index = 0;
left_block_mac = block_tree_node_get_child_data_rw(
path->tree, parent_node_rw, parent_index);
block_mac_set_block(tr, left_block_mac, left_block_num);
parent_block_mac = block_mac;
} else {
/* non-root split: the left node stays in place, fetch its parent for writing */
assert(left_block_num == block_mac_to_block(tr, block_mac));
parent_block_mac = &path->entry[path->count - 1].block_mac;
parent_index = path->entry[path->count - 1].index;
parent_node_rw = block_tree_block_get_write(tr, path, path->count - 1,
&parent_node_ref);
if (tr->failed) {
pr_warn("transaction failed, abort\n");
goto err_get_parent;
}
assert(parent_node_rw);
assert(!block_tree_node_is_leaf(parent_node_rw));
left_block_mac = block_tree_node_get_child_data_rw(
path->tree, parent_node_rw, parent_index);
}
assert(block_mac_to_block(tr, left_block_mac) == left_block_num);
assert(!tr->failed);
/* the right node starts as a full copy of the left node */
node_right_rw =
block_get_copy(tr, node_left_rw, block_mac_to_block(tr, &right),
!path->tree->allow_copy_on_write, &node_right_ref);
if (tr->failed) {
pr_warn("transaction failed, abort\n");
goto err_copy_right;
}
assert(block_tree_node_is_leaf(node_left_rw) == is_leaf);
assert(block_tree_node_is_leaf(node_right_rw) == is_leaf);
assert(is_leaf || (append_key && append_child));
assert(!is_leaf || (append_key && append_data));
assert(block_tree_node_full(path->tree, node_left_rw));
assert(!tr->failed);
max_key_count = block_tree_node_max_key_count(path->tree, node_left_rw);
split_index = (max_key_count + 1) / 2;
if (print_internal_changes) {
printf("%s: insert split_index %d", __func__, split_index);
block_tree_node_print(tr, path->tree, left_block_num, node_left_rw);
if (is_leaf) {
printf("%s: append key, data: [%" PRIu64 ": %" PRIu64 "]\n",
__func__, append_key, block_mac_to_block(tr, append_data));
} else {
printf("%s: append key, child: [%" PRIu64 "-] %" PRIu64 "\n",
__func__, append_key, block_mac_to_block(tr, append_child));
}
}
/* Update left node */
block_tree_node_clear_end(path->tree, node_left_rw, split_index);
if (print_internal_changes) {
printf("%s: left %3" PRIu64 "", __func__, left_block_num);
block_tree_node_print(tr, path->tree, left_block_num, node_left_rw);
}
/*
* Update right node. For internal nodes the key at split_index moves to
* the parent, for leaf nodes it gets copied
*/
parent_key = block_tree_node_get_key(path->tree, DATA_BLOCK_INVALID,
node_right_rw, split_index);
block_tree_node_shift_down(path->tree, node_right_rw, 0,
split_index + !is_leaf,
SHIFT_LEAF_OR_LEFT_CHILD);
assert(max_key_count ==
block_tree_node_max_key_count(path->tree, node_right_rw));
/* the appended entry goes directly after the entries kept in the right node */
block_tree_node_insert(
path->tree, node_right_rw, max_key_count - split_index - !is_leaf,
is_leaf ? SHIFT_LEAF_OR_LEFT_CHILD : SHIFT_RIGHT_CHILD, &append_key,
is_leaf ? append_data : append_child, NULL, NULL);
if (print_internal_changes) {
printf("%s: right %3" PRIu64 "", __func__,
block_mac_to_block(tr, &right));
block_tree_node_print(tr, path->tree, block_mac_to_block(tr, &right),
node_right_rw);
}
/* Insert right node in parent node */
assert(!block_tree_node_is_leaf(parent_node_rw));
if (print_internal_changes) {
printf("%s: insert [%" PRIu64 "-] %" PRIu64 " @%d:", __func__,
parent_key, block_mac_to_block(tr, &right), parent_index);
block_tree_node_print(tr, path->tree,
block_mac_to_block(tr, parent_block_mac),
parent_node_rw);
}
/* parent_key must fit between the keys around parent_index */
assert(parent_index ==
block_tree_node_max_key_count(path->tree, parent_node_rw) ||
!block_tree_node_get_key(path->tree, DATA_BLOCK_INVALID,
parent_node_rw, parent_index) ||
parent_key < block_tree_node_get_key(path->tree, DATA_BLOCK_INVALID,
parent_node_rw, parent_index));
assert(parent_index == 0 ||
block_tree_node_get_key(path->tree, DATA_BLOCK_INVALID,
parent_node_rw,
parent_index - 1) <= parent_key);
block_tree_node_insert(path->tree, parent_node_rw, parent_index,
SHIFT_RIGHT_CHILD, &parent_key, &right,
&overflow_key, &overflow_child);
assert(!overflow_key == !block_mac_valid(tr, &overflow_child));
/*
* If overflow_key is set it is not safe to re-enter the tree at this point
* as overflow_child is not reachable. It becomes reachable again when
* saved in tree->inserting.child at the top of this function.
*/
/*
* Locate where the right node's block-mac ended up: in the parent at
* parent_index + 1, or in overflow_child if the parent was already full at
* that position.
*/
if (parent_index <
block_tree_node_max_key_count(path->tree, parent_node_rw)) {
right_block_mac = block_tree_node_get_child_data_rw(
path->tree, parent_node_rw, parent_index + 1);
} else {
assert(block_mac_valid(tr, &overflow_child));
right_block_mac = &overflow_child;
}
if (print_internal_changes) {
printf("%s: insert [%" PRIu64 "-] %" PRIu64 " @%d done:", __func__,
parent_key, block_mac_to_block(tr, &right), parent_index);
block_tree_node_print(tr, path->tree,
block_mac_to_block(tr, parent_block_mac),
parent_node_rw);
}
if (!path->count && print_internal_changes) {
printf("%s root", __func__);
block_tree_node_print(tr, path->tree, block_mac_to_block(tr, block_mac),
parent_node_rw);
}
/* write both halves, then propagate the parent up to the root */
block_put_dirty(tr, node_left_rw, &node_left_ref, left_block_mac,
parent_node_rw);
block_put_dirty(tr, node_right_rw, &node_right_ref, right_block_mac,
parent_node_rw);
/* block_tree_path_put_dirty will handle negative depths */
block_tree_path_put_dirty(tr, path, (int)path->count - 1, parent_node_rw,
&parent_node_ref);
if (overflow_key) {
assert(path->count); /* new root node should not overflow */
if (print_internal_changes) {
printf("%s: overflow [%" PRIu64 "-] %" PRIu64 "\n", __func__,
overflow_key, block_mac_to_block(tr, &overflow_child));
}
/* TODO: use a loop instead */
block_tree_node_split(tr, path, overflow_key, &overflow_child, 0);
}
return;
/* error unwinding: each label discards one more dirty node, newest first */
err_copy_right:
block_put_dirty_discard(node_right_rw, &node_right_ref);
err_copy_parent:
block_put_dirty_discard(parent_node_rw, &parent_node_ref);
err_get_parent:
block_put_dirty_discard(node_left_rw, &node_left_ref);
}
/**
 * block_tree_sibling_index - Get the index of a sibling node
 * @index:      Index of current node.
 *
 * Use left sibling when possible, otherwise use right sibling. Other siblings
 * could be used, but this simple rule avoids selecting an empty sibling if
 * a non-empty sibling exists.
 *
 * Return: 1 when @index is 0 (right sibling), @index - 1 otherwise (left
 * sibling).
 */
static int block_tree_sibling_index(int index) {
    if (index == 0) {
        /* first child has no left sibling */
        return 1;
    }
    return index - 1;
}
/**
* block_tree_get_sibling_block - Get block and mac for sibling block
* @tr: Transaction object.
* @path: Tree path object.
*
* Return: block_mac of sibling block selected by block_tree_sibling_index
*/
static struct block_mac block_tree_get_sibling_block(
struct transaction* tr,
struct block_tree_path* path) {
struct block_mac block_mac = BLOCK_MAC_INITIAL_VALUE(block_mac);
const struct block_mac* parent;
const struct block_mac* block_mac_ptr;
int parent_index;
const struct block_tree_node* node_ro;
struct obj_ref node_ref = OBJ_REF_INITIAL_VALUE(node_ref);
assert(path->tree);
assert(path->count > 1);
parent = &path->entry[path->count - 2].block_mac;
parent_index = path->entry[path->count - 2].index;
node_ro = block_get(tr, parent, NULL, &node_ref);
if (!node_ro) {
assert(tr->failed);
pr_warn("transaction failed, abort\n");
goto err;
}
parent_index = block_tree_sibling_index(parent_index);
block_mac_ptr =
block_tree_node_get_child_data(path->tree, node_ro, parent_index);
block_mac_copy(tr, &block_mac, block_mac_ptr);
assert(block_mac_valid(tr, &block_mac));
block_put(node_ro, &node_ref);
err:
return block_mac;
}
/**
* block_tree_path_set_sibling_block - Set path to point to sibling block_mac
* @path: Tree path object.
* @block_mac: Block-mac of sibling block to swap with path entry
* @left: %true if @block_mac is the left sibling on entry (right on
* return) %false otherwise.
*
* Update @path to point to the left (if @left is %true) or right sibling node
* @block_mac, and return the block_mac of the old node that @path pointed to in
* @block_mac.
*
* This is used to toggle the path between two sibling nodes without re-reading
* the parent node. It does not preserve the prev_key or next_key value that is
* not shared beween the siblings.
*/
static void block_tree_path_set_sibling_block(struct block_tree_path* path,
struct block_mac* block_mac,
bool left) {
int parent_index;
struct block_mac tmp;
assert(path->count > 1);
parent_index = path->entry[path->count - 2].index;
// assert(left || parent_index <= 1); // assert only valid on initial call
assert(!left || parent_index > 0);
if (left) {
parent_index--;
} else {
parent_index++;
}
if (print_internal_changes) {
printf("%s: path[%d].index: %d -> %d\n", __func__, path->count - 2,
path->entry[path->count - 2].index, parent_index);
printf("%s: path[%d].block_mac.block: %" PRIu64 " -> %" PRIu64 "\n",
__func__, path->count - 1,
block_mac_to_block(path->tr,
&path->entry[path->count - 1].block_mac),
block_mac_to_block(path->tr, block_mac));
}
path->entry[path->count - 2].index = parent_index;
if (left) {
path->entry[path->count - 2].next_key =
path->entry[path->count - 2].prev_key;
path->entry[path->count - 2].prev_key = 0; // unknown
} else {
path->entry[path->count - 2].prev_key =
path->entry[path->count - 2].next_key;
path->entry[path->count - 2].next_key = 0; // unknown
}
tmp = *block_mac;
*block_mac = path->entry[path->count - 1].block_mac;
path->entry[path->count - 1].block_mac = tmp;
}
static void block_tree_node_merge(struct transaction* tr,
struct block_tree_path* path);
/**
* block_tree_remove_internal - Remove key and right child from internal node
* @tr: Transaction object.
* @path: Tree path object.
*
* Removes an entry from an internal node, and check the resulting node follows
* B+ tree rules. If a root node would have no remaining entries, delete it. If
* a non-root node has too few remaining entries, call block_tree_node_merge
* which will either borrow an entry from, or merge with a sibling node.
*
* TODO: merge with block_tree_node_merge and avoid recursion?
*/
static void block_tree_remove_internal(struct transaction* tr,
struct block_tree_path* path) {
const struct block_tree_node* node_ro;
struct block_tree_node* node_rw;
struct obj_ref node_ref = OBJ_REF_INITIAL_VALUE(node_ref);
int index;
bool need_merge;
const struct block_mac* block_mac;
assert(path->count);
block_mac = &path->entry[path->count - 1].block_mac;
index = path->entry[path->count - 1].index;
node_ro = block_get(tr, block_mac, NULL, &node_ref);
if (!node_ro) {
assert(tr->failed);
pr_warn("transaction failed, abort\n");
return;
}
assert(!block_tree_node_is_leaf(node_ro));
assert(index > 0);
if (print_internal_changes) {
printf("%s: @%d:", __func__, index);
block_tree_node_print(tr, path->tree, block_mac_to_block(tr, block_mac),
node_ro);
}
if (path->count == 1 &&
!block_tree_node_get_key(path->tree, DATA_BLOCK_INVALID, node_ro, 1)) {
/* block_tree_node_merge always removes right node */
assert(index == 1);
const struct block_mac* new_root =
block_tree_node_get_child_data(path->tree, node_ro, 0);
if (print_internal_changes) {
printf("%s: root emptied, new root %" PRIu64 "\n", __func__,
block_mac_to_block(tr, new_root));
}
assert(block_mac_valid(tr, new_root));
path->tree->root = *new_root;
block_discard_dirty(node_ro);
block_put(node_ro, &node_ref);
assert(!path->tree->copy_on_write || path->tree->allow_copy_on_write);
block_free_etc(tr, block_mac_to_block(tr, block_mac),
!path->tree->allow_copy_on_write);
return;
}
node_rw = block_tree_block_dirty(tr, path, path->count - 1, node_ro);
if (tr->failed) {
block_put(node_ro, &node_ref);
pr_warn("transaction failed, abort\n");
return;
}
block_tree_node_shift_down(path->tree, node_rw, index - 1, index,
SHIFT_RIGHT_CHILD);
if (print_internal_changes) {
printf("%s: @%d done:", __func__, index);
block_tree_node_print(tr, path->tree, block_mac_to_block(tr, block_mac),
node_rw);
}
need_merge =
path->count > 1 && block_tree_below_min_full(path->tree, node_rw);
block_tree_path_put_dirty(tr, path, path->count - 1, node_rw, &node_ref);
if (need_merge) {
block_tree_node_merge(tr, path);
}
}
/**
* block_tree_node_merge - Merge node or borrow entry from sibling block
* @tr: Transaction object.
* @path: Tree path object. Must have at least two entries; the last entry
* refers to the under-full node and the one before it to its parent.
*
* If sibling block has more entries than it needs, migrate an entry from it,
* otherwise merge the two nodes and free the block that stored the right node.
*
* The sibling is selected by block_tree_get_sibling_block. The node at the end
* of @path is asserted to be below the minimum fill level and the sibling is
* asserted not to be.
*
* When the nodes are merged, one level is popped from @path (path->count is
* decremented) and block_tree_remove_internal is called to remove the right
* node's entry from the parent. That call may in turn call this function
* again for the parent level.
*
* If a node cannot be read or made writable, the function releases the node
* references it holds and returns early.
*/
static void block_tree_node_merge(struct transaction* tr,
struct block_tree_path* path) {
const struct block_tree_node* node_ro;
const struct block_tree_node* merge_node_ro;
struct block_tree_node* node_rw;
struct block_tree_node* merge_node_rw;
struct obj_ref node_ref1 = OBJ_REF_INITIAL_VALUE(node_ref1);
struct obj_ref node_ref2 = OBJ_REF_INITIAL_VALUE(node_ref2);
/* Accessed through pointers so the merge path can swap the two roles. */
struct obj_ref* node_ref = &node_ref1;
struct obj_ref* merge_node_ref = &node_ref2;
const struct block_mac* block_mac;
struct block_mac merge_block;
unsigned int src_index;
unsigned int dest_index;
data_block_t key;
data_block_t parent_key;
int index;
bool node_is_left;
bool is_leaf;
assert(path->count > 1);
assert(path->tree);
/* block_mac aliases the path entry, so it follows later path swaps. */
block_mac = &path->entry[path->count - 1].block_mac;
/*
* Index 0 in the parent means this node has no left neighbour, so the
* sibling picked by block_tree_sibling_index is its right neighbour (index
* 1). For every other index the sibling is the left neighbour.
*/
node_is_left = !path->entry[path->count - 2].index;
merge_block = block_tree_get_sibling_block(tr, path);
if (tr->failed) {
pr_warn("transaction failed, abort\n");
return;
}
node_ro = block_get(tr, block_mac, NULL, node_ref);
if (!node_ro) {
assert(tr->failed);
pr_warn("transaction failed, abort\n");
return;
}
assert(node_ro);
is_leaf = block_tree_node_is_leaf(node_ro);
merge_node_ro = block_get(tr, &merge_block, NULL, merge_node_ref);
if (!merge_node_ro) {
assert(tr->failed);
block_put(node_ro, node_ref);
pr_warn("transaction failed, abort\n");
return;
}
assert(merge_node_ro);
/* Siblings are expected to be of the same node type. */
assert(is_leaf == block_tree_node_is_leaf(merge_node_ro));
if (print_internal_changes) {
printf("%s: node_is_left %d\n", __func__, node_is_left);
block_tree_node_print(tr, path->tree, block_mac_to_block(tr, block_mac),
node_ro);
block_tree_node_print(tr, path->tree,
block_mac_to_block(tr, &merge_block),
merge_node_ro);
}
assert(block_tree_below_min_full(path->tree, node_ro));
assert(!block_tree_below_min_full(path->tree, merge_node_ro));
if (block_tree_above_min_full(path->tree, merge_node_ro)) {
/*
* The sibling has an entry to spare: migrate one entry instead of
* merging. The path is pointed at the sibling while the sibling is made
* writable, and pointed back at this node right afterwards.
*/
block_tree_path_set_sibling_block(path, &merge_block, !node_is_left);
assert(!tr->failed);
merge_node_rw = block_tree_block_dirty(tr, path, path->count - 1,
merge_node_ro);
block_tree_path_set_sibling_block(path, &merge_block, node_is_left);
if (!merge_node_rw) {
assert(tr->failed);
block_put(node_ro, node_ref);
block_put(merge_node_ro, merge_node_ref);
pr_warn("transaction failed, abort\n");
return;
}
assert(!block_tree_node_need_copy(tr, path->tree, &merge_block));
/*
* This node is made writable with block_dirty directly; the assert
* below checks that it does not need to be copied first.
*/
node_rw = block_dirty(tr, node_ro, !path->tree->copy_on_write);
assert(!block_tree_node_need_copy(tr, path->tree, block_mac));
if (node_is_left) {
/* Take the first entry of the right sibling, append it here. */
src_index = 0;
dest_index = block_tree_node_min_full_index(path->tree, node_rw);
} else {
/* Take the last key of the left sibling, insert it at the front. */
src_index =
block_tree_node_get_key_count(path->tree, merge_node_rw) -
1;
dest_index = 0;
}
key = block_tree_node_get_key(path->tree, DATA_BLOCK_INVALID,
merge_node_rw, src_index);
/*
* parent_key is passed to block_tree_update_key for the parent level
* below. For a leaf to the left of its sibling this is the sibling's
* key at index 1, presumably because that becomes the sibling's first
* key once entry 0 has been moved out.
*/
if (node_is_left && is_leaf) {
parent_key = block_tree_node_get_key(path->tree, DATA_BLOCK_INVALID,
merge_node_rw, 1);
} else {
parent_key = key;
}
/*
* For internal nodes the key inserted into this node is taken from the
* parent's path entry rather than from the sibling (presumably the key
* that separates the two siblings in the parent; confirm against
* block_tree_walk).
*/
if (!is_leaf) {
if (node_is_left) {
key = path->entry[path->count - 2].next_key;
} else {
key = path->entry[path->count - 2].prev_key;
}
}
/*
* For an internal node taking from its left sibling, the child moved
* is the one after the last key (src_index + 1).
*/
block_tree_node_insert(path->tree, node_rw, dest_index,
(node_is_left && !is_leaf)
? SHIFT_RIGHT_CHILD
: SHIFT_LEAF_OR_LEFT_CHILD,
&key,
block_tree_node_get_child_data(
path->tree, merge_node_rw,
src_index + (!node_is_left && !is_leaf)),
NULL, NULL);
/* Remove the migrated entry from the sibling. */
block_tree_node_shift_down(
path->tree, merge_node_rw, src_index, src_index + 1,
(node_is_left || is_leaf) ? SHIFT_LEAF_OR_LEFT_CHILD
: SHIFT_RIGHT_CHILD);
if (node_is_left) {
if (dest_index == 0 && is_leaf) {
/*
* The leaf was empty before the insert, so its first key changed;
* pass the new first key to block_tree_update_key as well.
*/
data_block_t key0;
key0 = block_tree_node_get_key(path->tree, DATA_BLOCK_INVALID,
node_rw, 0);
assert(key0);
block_tree_update_key(tr, path, path->count - 2, key0);
}
/*
* Point the path at the right sibling while parent_key is updated, so
* the update is applied relative to the sibling's position.
*/
block_tree_path_set_sibling_block(path, &merge_block,
!node_is_left);
}
block_tree_update_key(tr, path, path->count - 2, parent_key);
if (node_is_left) {
block_tree_path_set_sibling_block(path, &merge_block, node_is_left);
}
if (print_internal_changes) {
printf("%s: %" PRIu64 " %" PRIu64 " done\n", __func__,
block_mac_to_block(tr, block_mac),
block_mac_to_block(tr, &merge_block));
block_tree_node_print(tr, path->tree,
block_mac_to_block(tr, block_mac), node_rw);
block_tree_node_print(tr, path->tree,
block_mac_to_block(tr, &merge_block),
merge_node_rw);
}
/*
* Release both dirty nodes through the path: first this node, then the
* sibling with the path temporarily pointed at it.
*/
block_tree_path_put_dirty(tr, path, path->count - 1, node_rw, node_ref);
block_tree_path_set_sibling_block(path, &merge_block, !node_is_left);
block_tree_path_put_dirty(tr, path, path->count - 1, merge_node_rw,
merge_node_ref);
block_tree_path_set_sibling_block(path, &merge_block, node_is_left);
} else {
/* The sibling has nothing to spare: merge right node into left node. */
const struct block_mac* left;
const struct block_mac* right;
data_block_t* merge_key;
if (!node_is_left) {
/*
* Normalize so that node_ro/node_ref describe the left node and
* merge_node_ro/merge_node_ref the right node, and point the path at
* the left node. After the swap merge_block holds the right node.
*/
const struct block_tree_node* tmp_node;
struct obj_ref* tmp_ref;
tmp_node = node_ro;
node_ro = merge_node_ro;
merge_node_ro = tmp_node;
tmp_ref = node_ref;
node_ref = merge_node_ref;
merge_node_ref = tmp_ref;
block_tree_path_set_sibling_block(path, &merge_block, true);
}
left = block_mac;
right = &merge_block;
node_rw = block_tree_block_dirty(tr, path, path->count - 1, node_ro);
if (!node_rw) {
assert(tr->failed);
block_put(node_ro, node_ref);
block_put(merge_node_ro, merge_node_ref);
pr_warn("transaction failed, abort\n");
return;
}
assert(!block_tree_node_need_copy(tr, path->tree, left));
/* Entries from the right node are placed after the left node's keys. */
index = block_tree_node_get_key_count(path->tree, node_rw);
if (is_leaf) {
merge_key = NULL;
} else {
/*
* Internal nodes also get a key from the parent's path entry
* (presumably the separator between the two nodes; confirm against
* block_tree_node_merge_entries).
*/
merge_key = &path->entry[path->count - 2].next_key;
assert(*merge_key);
}
block_tree_node_merge_entries(
path->tree, node_rw, merge_node_ro, index,
block_tree_node_get_key_count(path->tree, merge_node_ro),
merge_key);
assert(block_tree_node_check(tr, path->tree, node_rw,
block_mac_to_block(tr, left), 0,
DATA_BLOCK_MAX));
/*
* Only reached when the left leaf held no keys before the merge (index
* is 0) and the minimum fill index is 0: the left node's first key is
* new, so it is passed to block_tree_update_key for the parent level.
*/
if (is_leaf && !block_tree_node_min_full_index(path->tree, node_rw) &&
!index) {
data_block_t key0;
key0 = block_tree_node_get_key(path->tree, DATA_BLOCK_INVALID,
node_rw, 0);
assert(key0);
if (print_internal_changes) {
printf("hack, special case for order <= 4 tree\n");
}
block_tree_update_key(tr, path, path->count - 2, key0);
}
if (print_internal_changes) {
printf("%s: merge done, free %" PRIu64 "\n", __func__,
block_mac_to_block(tr, right));
block_tree_node_print(tr, path->tree, block_mac_to_block(tr, left),
node_rw);
}
block_tree_path_put_dirty(tr, path, path->count - 1, node_rw, node_ref);
/* The right node is no longer needed; drop it without writing it. */
block_discard_dirty(merge_node_ro);
block_put(merge_node_ro, merge_node_ref);
/*
* Point the path at the right node so the parent index selects the entry
* to remove, then pop one level and remove that entry from the parent.
*/
block_tree_path_set_sibling_block(path, &merge_block, false);
path->count--;
block_tree_remove_internal(tr, path);
/*
* block_mac still refers to the path entry of the level that was popped,
* which after the swap above holds the right node's block_mac.
*/
block_free_etc(tr, block_mac_to_block(tr, block_mac),
!path->tree->allow_copy_on_write);
}
}
/**
 * block_tree_insert_block_mac - Insert an entry into a B+ tree
 * @tr:         Transaction object. Must not have failed already.
 * @tree:       Tree object. Must not be in the middle of another update.
 * @key:        Key of new entry. Must be non-zero.
 * @data:       Data of new entry. Must be a valid block-mac.
 *
 * Creates an empty leaf root if @tree has no valid root yet. Then walks the
 * tree to find a valid insert point for @key and inserts @key and @data there.
 * If the leaf overflows, the overflowing entry is handed to
 * block_tree_node_split.
 *
 * On every exit path, including transaction failure, the tree's update count
 * is incremented and its updating flag is cleared.
 *
 * TODO: use a void * for data and merge with block_tree_insert.
 * TODO: Allow insert by path
 */
void block_tree_insert_block_mac(struct transaction* tr,
                                 struct block_tree* tree,
                                 data_block_t key,
                                 struct block_mac data) {
    struct block_tree_path path;
    struct obj_ref ref = OBJ_REF_INITIAL_VALUE(ref);
    const struct block_tree_node* leaf_ro;
    struct block_tree_node* leaf_rw;
    const struct block_mac* leaf_mac;
    struct block_mac split_data;
    data_block_t split_key = 0;
    int slot;

    assert(!tr->failed);
    assert(!tree->updating);
    assert(key);
    assert(block_mac_valid(tr, &data));

    tree->updating = true;

    if (!block_mac_valid(tr, &tree->root)) {
        /* No root yet: allocate a block and set it up as an empty leaf. */
        struct block_tree_node* root_rw;

        assert(!tree->copy_on_write || tree->allow_copy_on_write);
        block_mac_set_block(tr, &tree->root,
                            block_allocate_etc(tr, !tree->allow_copy_on_write));
        if (tr->failed) {
            pr_warn("transaction failed, abort\n");
            goto err;
        }
        if (print_internal_changes) {
            printf("%s: new root block %" PRIu64 "\n", __func__,
                   block_mac_to_block(tr, &tree->root));
        }
        assert(block_mac_valid(tr, &tree->root));

        root_rw = block_get_cleared(tr, block_mac_to_block(tr, &tree->root),
                                    !tree->allow_copy_on_write, &ref);
        root_rw->is_leaf = true;
        block_put_dirty(tr, root_rw, &ref, &tree->root, NULL);
        tree->root_block_changed = true;
    }

    block_tree_walk(tr, tree, key, false, &path);
    if (tr->failed) {
        pr_warn("transaction failed, abort\n");
        goto err;
    }
    assert(path.count > 0);

    /* The last path entry is the leaf and the position to insert at. */
    leaf_mac = &path.entry[path.count - 1].block_mac;
    slot = path.entry[path.count - 1].index;

    leaf_ro = block_get(tr, leaf_mac, NULL, &ref);
    if (!leaf_ro) {
        assert(tr->failed);
        pr_warn("transaction failed, abort\n");
        goto err;
    }

    if (print_changes) {
        printf("%s: key %" PRIu64 ", data.block %" PRIu64 ", index %d\n",
               __func__, key, block_mac_to_block(tr, &data), slot);
        block_tree_node_print(tr, tree, block_mac_to_block(tr, leaf_mac),
                              leaf_ro);
    }

    assert(leaf_ro);
    assert(block_tree_node_is_leaf(leaf_ro));
    assert(slot || !path.entry[path.count - 1].prev_key ||
           block_tree_node_get_key(tree, DATA_BLOCK_INVALID, leaf_ro, slot) ==
                   key);

    leaf_rw = block_tree_block_dirty(tr, &path, path.count - 1, leaf_ro);
    if (!leaf_rw) {
        assert(tr->failed);
        pr_warn("transaction failed, abort\n");
        block_put(leaf_ro, &ref);
        goto err;
    }

    /* A non-zero split_key afterwards means an entry did not fit. */
    block_tree_node_insert(tree, leaf_rw, slot, SHIFT_LEAF_OR_LEFT_CHILD, &key,
                           &data, &split_key, &split_data);
    block_tree_path_put_dirty(tr, &path, path.count - 1, leaf_rw, &ref);

    if (split_key) {
        assert(block_mac_valid(tr, &split_data));
        block_tree_node_split(tr, &path, split_key, NULL, &split_data);
    }

    if (print_changes_full_tree) {
        printf("%s: key %" PRIu64 ", data.block %" PRIu64 ", done:\n", __func__,
               key, block_mac_to_block(tr, &data));
        block_tree_print(tr, tree);
    }

err:
    tree->update_count++;
    tree->updating = false;
    full_assert(block_tree_check(tr, tree));
}
/**
 * block_tree_insert - Insert a data_block_t type entry into a B+ tree
 * @tr:         Transaction object.
 * @tree:       Tree object.
 * @key:        Key of new entry.
 * @data_block: Data of new entry.
 *
 * Convenience wrapper: stores @data_block in a block_mac with
 * block_mac_set_block and forwards to block_tree_insert_block_mac.
 */
void block_tree_insert(struct transaction* tr,
                       struct block_tree* tree,
                       data_block_t key,
                       data_block_t data_block) {
    struct block_mac entry = BLOCK_MAC_INITIAL_VALUE(entry);

    block_mac_set_block(tr, &entry, data_block);
    block_tree_insert_block_mac(tr, tree, key, entry);
}
/**
 * block_tree_remove - Remove an entry from a B+ tree
 * @tr:         Transaction object. Must not have failed already.
 * @tree:       Tree object. Must have a valid root and must not be in the
 *              middle of another update.
 * @key:        Key of entry to remove. Must be non-zero.
 * @data:       Data of entry to remove. Must be non-zero.
 *
 * Find an entry in the tree where the key matches @key, and the start of data
 * matches @data, and remove it from the tree. Call block_tree_node_merge if
 * the resulting node does not follow B+ tree rules.
 *
 * If the transaction fails while searching for or updating the entry, the
 * removal is abandoned. On every exit path the tree's update count is
 * incremented and its updating flag is cleared.
 *
 * TODO: Remove by path instead
 */
void block_tree_remove(struct transaction* tr,
                       struct block_tree* tree,
                       data_block_t key,
                       data_block_t data) {
    struct block_tree_path path;
    const struct block_tree_node* node_ro;
    struct block_tree_node* node_rw;
    struct obj_ref node_ref = OBJ_REF_INITIAL_VALUE(node_ref);
    const struct block_mac* block_mac;
    int index;
    bool need_merge = false;

    assert(!tr->failed);
    assert(!tree->updating);
    assert(block_mac_valid(tr, &tree->root));
    full_assert(block_tree_check(tr, tree));
    assert(key);
    assert(data);

    tree->updating = true;

    block_tree_walk(tr, tree, key, false, &path); /* TODO: make writeable */
    if (tr->failed) {
        pr_warn("transaction failed, abort\n");
        goto err;
    }
    assert(path.count > 0);

    if (block_mac_to_block(tr, &path.data) != data) {
        /*
         * The first walk found an entry whose data does not match. Restart
         * just before @key and step forward until both key and data match.
         */
        /* TODO: make path writeable after finding matching entry */
        block_tree_walk(tr, tree, key - 1, true, &path);
        /*
         * This walk can fail the transaction just like the first one. Abort
         * instead of searching (and asserting on) a path that may not be
         * valid.
         */
        if (tr->failed) {
            pr_warn("transaction failed, abort\n");
            goto err;
        }
        while (block_mac_to_block(tr, &path.data) != data ||
               block_tree_path_get_key(&path) != key) {
            /* A zero key here means the entry was not found. */
            assert(block_tree_path_get_key(&path));
            block_tree_path_next(&path);
            /*
             * Stepping the path presumably reads tree nodes and so may also
             * fail the transaction; stop searching if it did.
             */
            if (tr->failed) {
                pr_warn("transaction failed, abort\n");
                goto err;
            }
        }
    }

    block_mac = &path.entry[path.count - 1].block_mac;
    index = path.entry[path.count - 1].index;

    node_ro = block_get(tr, block_mac, NULL, &node_ref);
    if (!node_ro) {
        assert(tr->failed);
        pr_warn("transaction failed, abort\n");
        goto err;
    }
    assert(block_tree_node_is_leaf(node_ro));
    assert(block_tree_node_get_key(tree, DATA_BLOCK_INVALID, node_ro, index) ==
           key);
    /* Only the leading block-number bytes of the stored data are compared. */
    assert(!memcmp(block_tree_node_get_child_data(tree, node_ro, index), &data,
                   MIN(sizeof(data), tr->fs->block_num_size)));

    if (print_changes) {
        printf("%s: key %" PRIu64 ", data %" PRIu64 ", index %d\n", __func__,
               key, data, index);
        block_tree_node_print(tr, tree, block_mac_to_block(tr, block_mac),
                              node_ro);
    }

    node_rw = block_tree_block_dirty(tr, &path, path.count - 1, node_ro);
    if (tr->failed) {
        block_put(node_ro, &node_ref);
        pr_warn("transaction failed, abort\n");
        goto err;
    }
    assert(node_rw);

    block_tree_node_leaf_remove_entry(tr, tree, block_mac, node_rw, index);

    /* A root leaf has no parent key to maintain and is never merged. */
    if (path.count > 1) {
        if (!index) {
            /*
             * The first entry of the leaf was removed, so pass the leaf's
             * new first key (if it still has one) to block_tree_update_key
             * for the parent level.
             */
            data_block_t new_parent_key = block_tree_node_get_key(
                    tree, DATA_BLOCK_INVALID, node_rw, 0);
            assert(new_parent_key ||
                   !block_tree_node_min_full_index(tree, node_rw));
            if (new_parent_key) {
                block_tree_update_key(tr, &path, path.count - 2,
                                      new_parent_key);
            }
        }
        /* Sample the fill level before the node reference is released. */
        need_merge = block_tree_below_min_full(tree, node_rw);
    }

    block_tree_path_put_dirty(tr, &path, path.count - 1, node_rw, &node_ref);

    if (need_merge) {
        block_tree_node_merge(tr, &path);
    }

    if (print_changes_full_tree) {
        printf("%s: key %" PRIu64 ", data %" PRIu64 ", done:\n", __func__, key,
               data);
        block_tree_print(tr, tree);
    }

err:
    tree->update_count++;
    tree->updating = false;
    full_assert(block_tree_check(tr, tree));
}
/**
* block_tree_update_block_mac - Update key or data of an existing B+ tree entry
* @tr: Transaction object.