1
0
Fork 0

bcache: Refactor btree io

The most significant change is that btree reads are now done
synchronously, instead of asynchronously and doing the post read stuff
from a workqueue.

This was originally done because we can't block on IO under
generic_make_request(). But - we already have a mechanism to punt cache
lookups to workqueue if needed, so if we just use that we don't have to
deal with the complexity of doing things asynchronously.

The main benefit is this makes the locking situation saner; we can hold
our write lock on the btree node until we're finished reading it, and we
don't need that btree_node_read_done() flag anymore.

Also, for writes, btree_write() was broken out into btree_node_write()
and btree_leaf_dirty() - the old code with the boolean argument was dumb
and confusing.

The prio_blocked mechanism was improved a bit too, now the only counter
is in struct btree_write, we don't mess with transfering a count from
struct btree anymore.

This required changing garbage collection to block prios at the start
and unblock when it finishes, which is cleaner than what it was doing
anyways (the old code had mostly the same effect, but was doing it in a
convoluted way)

And the btree iter btree_node_read_done() uses was converted to a real
mempool.

Signed-off-by: Kent Overstreet <koverstreet@google.com>
hifive-unleashed-5.1
Kent Overstreet 2013-04-25 13:58:35 -07:00
parent 119ba0f828
commit 5794351146
6 changed files with 142 additions and 178 deletions

View File

@ -819,10 +819,9 @@ struct cache_set {
/*
* A btree node on disk could have too many bsets for an iterator to fit
* on the stack - this is a single element mempool for btree_read_work()
* on the stack - have to dynamically allocate them
*/
struct mutex fill_lock;
struct btree_iter *fill_iter;
mempool_t *fill_iter;
/*
* btree_sort() is a merge sort and requires temporary space - single

View File

@ -134,44 +134,17 @@ static uint64_t btree_csum_set(struct btree *b, struct bset *i)
return crc ^ 0xffffffffffffffffULL;
}
static void btree_bio_endio(struct bio *bio, int error)
void bch_btree_node_read_done(struct btree *b)
{
struct closure *cl = bio->bi_private;
struct btree *b = container_of(cl, struct btree, io.cl);
if (error)
set_btree_node_io_error(b);
bch_bbio_count_io_errors(b->c, bio, error, (bio->bi_rw & WRITE)
? "writing btree" : "reading btree");
closure_put(cl);
}
static void btree_bio_init(struct btree *b)
{
BUG_ON(b->bio);
b->bio = bch_bbio_alloc(b->c);
b->bio->bi_end_io = btree_bio_endio;
b->bio->bi_private = &b->io.cl;
}
void bch_btree_read_done(struct closure *cl)
{
struct btree *b = container_of(cl, struct btree, io.cl);
struct bset *i = b->sets[0].data;
struct btree_iter *iter = b->c->fill_iter;
const char *err = "bad btree header";
BUG_ON(b->nsets || b->written);
struct bset *i = b->sets[0].data;
struct btree_iter *iter;
bch_bbio_free(b->bio, b->c);
b->bio = NULL;
mutex_lock(&b->c->fill_lock);
iter = mempool_alloc(b->c->fill_iter, GFP_NOWAIT);
iter->size = b->c->sb.bucket_size / b->c->sb.block_size;
iter->used = 0;
if (btree_node_io_error(b) ||
!i->seq)
if (!i->seq)
goto err;
for (;
@ -228,17 +201,8 @@ void bch_btree_read_done(struct closure *cl)
if (b->written < btree_blocks(b))
bch_bset_init_next(b);
out:
mutex_unlock(&b->c->fill_lock);
spin_lock(&b->c->btree_read_time_lock);
bch_time_stats_update(&b->c->btree_read_time, b->io_start_time);
spin_unlock(&b->c->btree_read_time_lock);
smp_wmb(); /* read_done is our write lock */
set_btree_node_read_done(b);
closure_return(cl);
mempool_free(iter, b->c->fill_iter);
return;
err:
set_btree_node_io_error(b);
bch_cache_set_error(b->c, "%s at bucket %zu, block %zu, %u keys",
@ -247,26 +211,51 @@ err:
goto out;
}
void bch_btree_read(struct btree *b)
static void btree_node_read_endio(struct bio *bio, int error)
{
BUG_ON(b->nsets || b->written);
struct closure *cl = bio->bi_private;
closure_put(cl);
}
if (!closure_trylock(&b->io.cl, &b->c->cl))
BUG();
b->io_start_time = local_clock();
btree_bio_init(b);
b->bio->bi_rw = REQ_META|READ_SYNC;
b->bio->bi_size = KEY_SIZE(&b->key) << 9;
bch_bio_map(b->bio, b->sets[0].data);
void bch_btree_node_read(struct btree *b)
{
uint64_t start_time = local_clock();
struct closure cl;
struct bio *bio;
closure_init_stack(&cl);
pr_debug("%s", pbtree(b));
trace_bcache_btree_read(b->bio);
bch_submit_bbio(b->bio, b->c, &b->key, 0);
continue_at(&b->io.cl, bch_btree_read_done, system_wq);
bio = bch_bbio_alloc(b->c);
bio->bi_rw = REQ_META|READ_SYNC;
bio->bi_size = KEY_SIZE(&b->key) << 9;
bio->bi_end_io = btree_node_read_endio;
bio->bi_private = &cl;
bch_bio_map(bio, b->sets[0].data);
trace_bcache_btree_read(bio);
bch_submit_bbio(bio, b->c, &b->key, 0);
closure_sync(&cl);
if (!test_bit(BIO_UPTODATE, &bio->bi_flags))
set_btree_node_io_error(b);
bch_bbio_free(bio, b->c);
if (btree_node_io_error(b))
goto err;
bch_btree_node_read_done(b);
spin_lock(&b->c->btree_read_time_lock);
bch_time_stats_update(&b->c->btree_read_time, start_time);
spin_unlock(&b->c->btree_read_time_lock);
return;
err:
bch_cache_set_error(b->c, "io error reading bucket %lu",
PTR_BUCKET_NR(b->c, &b->key, 0));
}
static void btree_complete_write(struct btree *b, struct btree_write *w)
@ -280,15 +269,11 @@ static void btree_complete_write(struct btree *b, struct btree_write *w)
__closure_wake_up(&b->c->journal.wait);
}
if (w->owner)
closure_put(w->owner);
w->prio_blocked = 0;
w->journal = NULL;
w->owner = NULL;
}
static void __btree_write_done(struct closure *cl)
static void __btree_node_write_done(struct closure *cl)
{
struct btree *b = container_of(cl, struct btree, io.cl);
struct btree_write *w = btree_prev_write(b);
@ -304,7 +289,7 @@ static void __btree_write_done(struct closure *cl)
closure_return(cl);
}
static void btree_write_done(struct closure *cl)
static void btree_node_write_done(struct closure *cl)
{
struct btree *b = container_of(cl, struct btree, io.cl);
struct bio_vec *bv;
@ -313,10 +298,22 @@ static void btree_write_done(struct closure *cl)
__bio_for_each_segment(bv, b->bio, n, 0)
__free_page(bv->bv_page);
__btree_write_done(cl);
__btree_node_write_done(cl);
}
static void do_btree_write(struct btree *b)
static void btree_node_write_endio(struct bio *bio, int error)
{
struct closure *cl = bio->bi_private;
struct btree *b = container_of(cl, struct btree, io.cl);
if (error)
set_btree_node_io_error(b);
bch_bbio_count_io_errors(b->c, bio, error, "writing btree");
closure_put(cl);
}
static void do_btree_node_write(struct btree *b)
{
struct closure *cl = &b->io.cl;
struct bset *i = b->sets[b->nsets].data;
@ -325,7 +322,11 @@ static void do_btree_write(struct btree *b)
i->version = BCACHE_BSET_VERSION;
i->csum = btree_csum_set(b, i);
btree_bio_init(b);
BUG_ON(b->bio);
b->bio = bch_bbio_alloc(b->c);
b->bio->bi_end_io = btree_node_write_endio;
b->bio->bi_private = &b->io.cl;
b->bio->bi_rw = REQ_META|WRITE_SYNC;
b->bio->bi_size = set_blocks(i, b->c) * block_bytes(b->c);
bch_bio_map(b->bio, i);
@ -345,7 +346,7 @@ static void do_btree_write(struct btree *b)
trace_bcache_btree_write(b->bio);
bch_submit_bbio(b->bio, b->c, &k.key, 0);
continue_at(cl, btree_write_done, NULL);
continue_at(cl, btree_node_write_done, NULL);
} else {
b->bio->bi_vcnt = 0;
bch_bio_map(b->bio, i);
@ -354,26 +355,30 @@ static void do_btree_write(struct btree *b)
bch_submit_bbio(b->bio, b->c, &k.key, 0);
closure_sync(cl);
__btree_write_done(cl);
__btree_node_write_done(cl);
}
}
static void __btree_write(struct btree *b)
void bch_btree_node_write(struct btree *b, struct closure *parent)
{
struct bset *i = b->sets[b->nsets].data;
BUG_ON(current->bio_list);
BUG_ON(b->written >= btree_blocks(b));
BUG_ON(b->written && !i->keys);
BUG_ON(b->sets->data->seq != i->seq);
closure_lock(&b->io, &b->c->cl);
cancel_delayed_work(&b->work);
/* If caller isn't waiting for write, parent refcount is cache set */
closure_lock(&b->io, parent ?: &b->c->cl);
clear_bit(BTREE_NODE_dirty, &b->flags);
change_bit(BTREE_NODE_write_idx, &b->flags);
bch_check_key_order(b, i);
BUG_ON(b->written && !i->keys);
do_btree_write(b);
do_btree_node_write(b);
pr_debug("%s block %i keys %i", pbtree(b), b->written, i->keys);
@ -387,37 +392,31 @@ static void __btree_write(struct btree *b)
bch_bset_init_next(b);
}
static void btree_write_work(struct work_struct *w)
static void btree_node_write_work(struct work_struct *w)
{
struct btree *b = container_of(to_delayed_work(w), struct btree, work);
down_write(&b->lock);
rw_lock(true, b, b->level);
if (btree_node_dirty(b))
__btree_write(b);
up_write(&b->lock);
bch_btree_node_write(b, NULL);
rw_unlock(true, b);
}
void bch_btree_write(struct btree *b, bool now, struct btree_op *op)
static void bch_btree_leaf_dirty(struct btree *b, struct btree_op *op)
{
struct bset *i = b->sets[b->nsets].data;
struct btree_write *w = btree_current_write(b);
BUG_ON(b->written &&
(b->written >= btree_blocks(b) ||
i->seq != b->sets[0].data->seq ||
!i->keys));
BUG_ON(!b->written);
BUG_ON(!i->keys);
if (!btree_node_dirty(b)) {
set_btree_node_dirty(b);
queue_delayed_work(btree_io_wq, &b->work,
msecs_to_jiffies(30000));
}
if (!btree_node_dirty(b))
queue_delayed_work(btree_io_wq, &b->work, 30 * HZ);
w->prio_blocked += b->prio_blocked;
b->prio_blocked = 0;
set_btree_node_dirty(b);
if (op && op->journal && !b->level) {
if (op && op->journal) {
if (w->journal &&
journal_pin_cmp(b->c, w, op)) {
atomic_dec_bug(w->journal);
@ -430,23 +429,10 @@ void bch_btree_write(struct btree *b, bool now, struct btree_op *op)
}
}
if (current->bio_list)
return;
/* Force write if set is too big */
if (now ||
b->level ||
set_bytes(i) > PAGE_SIZE - 48) {
if (op && now) {
/* Must wait on multiple writes */
BUG_ON(w->owner);
w->owner = &op->cl;
closure_get(&op->cl);
}
__btree_write(b);
}
BUG_ON(!b->written);
if (set_bytes(i) > PAGE_SIZE - 48 &&
!current->bio_list)
bch_btree_node_write(b, NULL);
}
/*
@ -559,7 +545,7 @@ static struct btree *mca_bucket_alloc(struct cache_set *c,
init_rwsem(&b->lock);
lockdep_set_novalidate_class(&b->lock);
INIT_LIST_HEAD(&b->list);
INIT_DELAYED_WORK(&b->work, btree_write_work);
INIT_DELAYED_WORK(&b->work, btree_node_write_work);
b->c = c;
closure_init_unlocked(&b->io);
@ -582,7 +568,7 @@ static int mca_reap(struct btree *b, struct closure *cl, unsigned min_order)
BUG_ON(btree_node_dirty(b) && !b->sets[0].data);
if (cl && btree_node_dirty(b))
bch_btree_write(b, true, NULL);
bch_btree_node_write(b, NULL);
if (cl)
closure_wait_event_async(&b->io.wait, cl,
@ -905,6 +891,9 @@ retry:
b = mca_find(c, k);
if (!b) {
if (current->bio_list)
return ERR_PTR(-EAGAIN);
mutex_lock(&c->bucket_lock);
b = mca_alloc(c, k, level, &op->cl);
mutex_unlock(&c->bucket_lock);
@ -914,7 +903,7 @@ retry:
if (IS_ERR(b))
return b;
bch_btree_read(b);
bch_btree_node_read(b);
if (!write)
downgrade_write(&b->lock);
@ -937,15 +926,12 @@ retry:
for (; i <= b->nsets; i++)
prefetch(b->sets[i].data);
if (!closure_wait_event(&b->io.wait, &op->cl,
btree_node_read_done(b))) {
if (btree_node_io_error(b)) {
rw_unlock(write, b);
b = ERR_PTR(-EAGAIN);
} else if (btree_node_io_error(b)) {
rw_unlock(write, b);
b = ERR_PTR(-EIO);
} else
BUG_ON(!b->written);
return ERR_PTR(-EIO);
}
BUG_ON(!b->written);
return b;
}
@ -959,7 +945,7 @@ static void btree_node_prefetch(struct cache_set *c, struct bkey *k, int level)
mutex_unlock(&c->bucket_lock);
if (!IS_ERR_OR_NULL(b)) {
bch_btree_read(b);
bch_btree_node_read(b);
rw_unlock(true, b);
}
}
@ -982,12 +968,6 @@ static void btree_node_free(struct btree *b, struct btree_op *op)
btree_complete_write(b, btree_current_write(b));
clear_bit(BTREE_NODE_dirty, &b->flags);
if (b->prio_blocked &&
!atomic_sub_return(b->prio_blocked, &b->c->prio_blocked))
wake_up_allocators(b->c);
b->prio_blocked = 0;
cancel_delayed_work(&b->work);
mutex_lock(&b->c->bucket_lock);
@ -1028,7 +1008,6 @@ retry:
goto retry;
}
set_btree_node_read_done(b);
b->accessed = 1;
bch_bset_init_next(b);
@ -1166,14 +1145,11 @@ static struct btree *btree_gc_alloc(struct btree *b, struct bkey *k,
if (!IS_ERR_OR_NULL(n)) {
swap(b, n);
__bkey_put(b->c, &b->key);
memcpy(k->ptr, b->key.ptr,
sizeof(uint64_t) * KEY_PTRS(&b->key));
__bkey_put(b->c, &b->key);
atomic_inc(&b->c->prio_blocked);
b->prio_blocked++;
btree_node_free(n, op);
up_write(&n->lock);
}
@ -1293,14 +1269,9 @@ static int btree_gc_recurse(struct btree *b, struct btree_op *op,
void write(struct btree *r)
{
if (!r->written)
bch_btree_write(r, true, op);
else if (btree_node_dirty(r)) {
BUG_ON(btree_current_write(r)->owner);
btree_current_write(r)->owner = writes;
closure_get(writes);
bch_btree_write(r, true, NULL);
}
bch_btree_node_write(r, &op->cl);
else if (btree_node_dirty(r))
bch_btree_node_write(r, writes);
up_write(&r->lock);
}
@ -1386,9 +1357,7 @@ static int bch_btree_gc_root(struct btree *b, struct btree_op *op,
ret = btree_gc_recurse(b, op, writes, gc);
if (!b->written || btree_node_dirty(b)) {
atomic_inc(&b->c->prio_blocked);
b->prio_blocked++;
bch_btree_write(b, true, n ? op : NULL);
bch_btree_node_write(b, n ? &op->cl : NULL);
}
if (!IS_ERR_OR_NULL(n)) {
@ -1508,8 +1477,8 @@ static void bch_btree_gc(struct closure *cl)
struct gc_stat stats;
struct closure writes;
struct btree_op op;
uint64_t start_time = local_clock();
trace_bcache_gc_start(c->sb.set_uuid);
blktrace_msg_all(c, "Starting gc");
@ -1520,6 +1489,8 @@ static void bch_btree_gc(struct closure *cl)
btree_gc_start(c);
atomic_inc(&c->prio_blocked);
ret = btree_root(gc_root, c, &op, &writes, &stats);
closure_sync(&op.cl);
closure_sync(&writes);
@ -1537,6 +1508,9 @@ static void bch_btree_gc(struct closure *cl)
available = bch_btree_gc_finish(c);
atomic_dec(&c->prio_blocked);
wake_up_allocators(c);
bch_time_stats_update(&c->btree_gc_time, start_time);
stats.key_bytes *= sizeof(uint64_t);
@ -1544,10 +1518,9 @@ static void bch_btree_gc(struct closure *cl)
stats.data <<= 9;
stats.in_use = (c->nbuckets - available) * 100 / c->nbuckets;
memcpy(&c->gc_stats, &stats, sizeof(struct gc_stat));
blktrace_msg_all(c, "Finished gc");
blktrace_msg_all(c, "Finished gc");
trace_bcache_gc_end(c->sb.set_uuid);
wake_up_allocators(c);
continue_at(cl, bch_moving_gc, bch_gc_wq);
}
@ -1857,7 +1830,7 @@ merged:
op_type(op), pbtree(b), pkey(k));
if (b->level && !KEY_OFFSET(k))
b->prio_blocked++;
btree_current_write(b)->prio_blocked++;
pr_debug("%s for %s at %s: %s", status,
op_type(op), pbtree(b), pkey(k));
@ -1907,7 +1880,6 @@ bool bch_btree_insert_check_key(struct btree *b, struct btree_op *op,
BUG_ON(op->type != BTREE_INSERT);
BUG_ON(!btree_insert_key(b, op, &tmp.k));
bch_btree_write(b, false, NULL);
ret = true;
out:
downgrade_write(&b->lock);
@ -1967,18 +1939,18 @@ static int btree_split(struct btree *b, struct btree_op *op)
bkey_copy_key(&n2->key, &b->key);
bch_keylist_add(&op->keys, &n2->key);
bch_btree_write(n2, true, op);
bch_btree_node_write(n2, &op->cl);
rw_unlock(true, n2);
} else
bch_btree_insert_keys(n1, op);
bch_keylist_add(&op->keys, &n1->key);
bch_btree_write(n1, true, op);
bch_btree_node_write(n1, &op->cl);
if (n3) {
bkey_copy_key(&n3->key, &MAX_KEY);
bch_btree_insert_keys(n3, op);
bch_btree_write(n3, true, op);
bch_btree_node_write(n3, &op->cl);
closure_sync(&op->cl);
bch_btree_set_root(n3);
@ -2082,8 +2054,12 @@ static int bch_btree_insert_recurse(struct btree *b, struct btree_op *op,
BUG_ON(write_block(b) != b->sets[b->nsets].data);
if (bch_btree_insert_keys(b, op))
bch_btree_write(b, false, op);
if (bch_btree_insert_keys(b, op)) {
if (!b->level)
bch_btree_leaf_dirty(b, op);
else
bch_btree_node_write(b, &op->cl);
}
}
return 0;

View File

@ -102,7 +102,6 @@
#include "debug.h"
struct btree_write {
struct closure *owner;
atomic_t *journal;
/* If btree_split() frees a btree node, it writes a new pointer to that
@ -142,16 +141,12 @@ struct btree {
*/
struct bset_tree sets[MAX_BSETS];
/* Used to refcount bio splits, also protects b->bio */
/* For outstanding btree writes, used as a lock - protects write_idx */
struct closure_with_waitlist io;
/* Gets transferred to w->prio_blocked - see the comment there */
int prio_blocked;
struct list_head list;
struct delayed_work work;
uint64_t io_start_time;
struct btree_write writes[2];
struct bio *bio;
};
@ -164,13 +159,11 @@ static inline void set_btree_node_ ## flag(struct btree *b) \
{ set_bit(BTREE_NODE_ ## flag, &b->flags); } \
enum btree_flags {
BTREE_NODE_read_done,
BTREE_NODE_io_error,
BTREE_NODE_dirty,
BTREE_NODE_write_idx,
};
BTREE_FLAG(read_done);
BTREE_FLAG(io_error);
BTREE_FLAG(dirty);
BTREE_FLAG(write_idx);
@ -293,9 +286,7 @@ static inline void rw_unlock(bool w, struct btree *b)
#ifdef CONFIG_BCACHE_EDEBUG
unsigned i;
if (w &&
b->key.ptr[0] &&
btree_node_read_done(b))
if (w && b->key.ptr[0])
for (i = 0; i <= b->nsets; i++)
bch_check_key_order(b, b->sets[i].data);
#endif
@ -370,9 +361,9 @@ static inline bool should_split(struct btree *b)
> btree_blocks(b));
}
void bch_btree_read_done(struct closure *);
void bch_btree_read(struct btree *);
void bch_btree_write(struct btree *b, bool now, struct btree_op *op);
void bch_btree_node_read(struct btree *);
void bch_btree_node_read_done(struct btree *);
void bch_btree_node_write(struct btree *, struct closure *);
void bch_cannibalize_unlock(struct cache_set *, struct closure *);
void bch_btree_set_root(struct btree *);

View File

@ -144,7 +144,7 @@ void bch_btree_verify(struct btree *b, struct bset *new)
v->written = 0;
v->level = b->level;
bch_btree_read(v);
bch_btree_node_read(v);
closure_wait_event(&v->io.wait, &cl,
atomic_read(&b->io.cl.remaining) == -1);
@ -512,7 +512,7 @@ static ssize_t btree_fuzz(struct kobject *k, struct kobj_attribute *a,
bch_btree_sort(b);
fill->written = 0;
bch_btree_read_done(&fill->io.cl);
bch_btree_node_read_done(fill);
if (b->sets[0].data->keys != fill->sets[0].data->keys ||
memcmp(b->sets[0].data->start,

View File

@ -384,7 +384,7 @@ out:
return;
found:
if (btree_node_dirty(best))
bch_btree_write(best, true, NULL);
bch_btree_node_write(best, NULL);
rw_unlock(true, best);
}

View File

@ -1255,9 +1255,10 @@ static void cache_set_free(struct closure *cl)
free_pages((unsigned long) c->uuids, ilog2(bucket_pages(c)));
free_pages((unsigned long) c->sort, ilog2(bucket_pages(c)));
kfree(c->fill_iter);
if (c->bio_split)
bioset_free(c->bio_split);
if (c->fill_iter)
mempool_destroy(c->fill_iter);
if (c->bio_meta)
mempool_destroy(c->bio_meta);
if (c->search)
@ -1295,7 +1296,7 @@ static void cache_set_flush(struct closure *cl)
/* Should skip this if we're unregistering because of an error */
list_for_each_entry(b, &c->btree_cache, list)
if (btree_node_dirty(b))
bch_btree_write(b, true, NULL);
bch_btree_node_write(b, NULL);
closure_return(cl);
}
@ -1374,7 +1375,6 @@ struct cache_set *bch_cache_set_alloc(struct cache_sb *sb)
BTREE_MAX_PAGES);
mutex_init(&c->bucket_lock);
mutex_init(&c->fill_lock);
mutex_init(&c->sort_lock);
spin_lock_init(&c->sort_time_lock);
closure_init_unlocked(&c->sb_write);
@ -1400,8 +1400,8 @@ struct cache_set *bch_cache_set_alloc(struct cache_sb *sb)
!(c->bio_meta = mempool_create_kmalloc_pool(2,
sizeof(struct bbio) + sizeof(struct bio_vec) *
bucket_pages(c))) ||
!(c->fill_iter = mempool_create_kmalloc_pool(1, iter_size)) ||
!(c->bio_split = bioset_create(4, offsetof(struct bbio, bio))) ||
!(c->fill_iter = kmalloc(iter_size, GFP_KERNEL)) ||
!(c->sort = alloc_bucket_pages(GFP_KERNEL, c)) ||
!(c->uuids = alloc_bucket_pages(GFP_KERNEL, c)) ||
bch_journal_alloc(c) ||
@ -1409,8 +1409,6 @@ struct cache_set *bch_cache_set_alloc(struct cache_sb *sb)
bch_open_buckets_alloc(c))
goto err;
c->fill_iter->size = sb->bucket_size / sb->block_size;
c->congested_read_threshold_us = 2000;
c->congested_write_threshold_us = 20000;
c->error_limit = 8 << IO_ERROR_SHIFT;
@ -1551,7 +1549,7 @@ static void run_cache_set(struct cache_set *c)
goto err_unlock_gc;
bkey_copy_key(&c->root->key, &MAX_KEY);
bch_btree_write(c->root, true, &op);
bch_btree_node_write(c->root, &op.cl);
bch_btree_set_root(c->root);
rw_unlock(true, c->root);