client: Fix a memory leak in client_post_select().
[paraslash.git] / buffer_tree.c
index 1bc9529884dc16b779ee232022e6bb37ccee0e2d..962359fbfa253790880f1380551335eaf9861c6d 100644 (file)
@@ -6,13 +6,28 @@
 #include "string.h"
 #include "buffer_tree.h"
 #include "error.h"
+#include "sched.h"
 
+/* whead = NULL means area full */
+struct btr_pool {
+       char *name;
+       char *area_start;
+       char *area_end;
+       char *rhead;
+       char *whead;
+};
+
+enum btr_buffer_flags {
+       /* changes the way the buffer is deallocated */
+       BTR_BF_BTR_POOL = 1,
+};
 
 struct btr_buffer {
        char *buf;
        size_t size;
        /** The number of references to this buffer. */
        int refcount;
+       struct btr_pool *pool;
 };
 
 struct btr_buffer_reference {
@@ -20,6 +35,7 @@ struct btr_buffer_reference {
        size_t consumed;
        /* Each buffer reference belongs to the buffer queue list of some buffer tree node. */
        struct list_head node;
+       size_t wrap_count;
 };
 
 struct btr_node {
@@ -29,6 +45,8 @@ struct btr_node {
        struct list_head node;
        /* The children nodes of this btr node are linked together in a list. */
        struct list_head children;
+       /* Time of first data transfer. */
+       struct timeval start;
        /**
         * The input queue is a list of references to btr buffers. Each item on
         * the list represents an input buffer which has not been completely
@@ -39,6 +57,130 @@ struct btr_node {
        void *context;
 };
 
+/**
+ * Create a new buffer pool.
+ *
+ * \param name The name of the new buffer pool.
+ *
+ * \param area_size The size in bytes of the pool area.
+ *
+ * \return An opaque pointer to the newly created buffer pool. Once the pool
+ * is no longer needed, this pointer must be passed to btr_pool_free() to
+ * deallocate all resources.
+ */
+struct btr_pool *btr_pool_new(const char *name, size_t area_size)
+{
+       struct btr_pool *btrp;
+
+       PARA_INFO_LOG("%s, %zu bytes\n", name, area_size);
+       btrp = para_malloc(sizeof(*btrp));
+       btrp->area_start = para_malloc(area_size);
+       btrp->area_end = btrp->area_start + area_size;
+       btrp->rhead = btrp->area_start;
+       btrp->whead = btrp->area_start;
+       btrp->name = para_strdup(name);
+       return btrp;
+}
+
+/**
+ * Deallocate resources used by a buffer pool.
+ *
+ * \param btrp A pointer obtained via btr_pool_new().
+ */
+void btr_pool_free(struct btr_pool *btrp)
+{
+       if (!btrp)
+               return;
+       free(btrp->area_start);
+       free(btrp->name);
+       free(btrp);
+}
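+
+/*
+ * A rough usage sketch (illustrative only; the name and the pool size below
+ * are made up): a producer typically creates one pool at startup and frees
+ * it on shutdown, once none of its buffers are referenced any more.
+ *
+ *     struct btr_pool *btrp = btr_pool_new("my_receiver", 128 * 1024);
+ *     ...
+ *     btr_pool_free(btrp);
+ */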
+
+/**
+ * Return the size of the buffer pool area.
+ *
+ * \param btrp The buffer pool.
+ *
+ * \return The same value that was passed to btr_pool_new() at creation
+ * time.
+ */
+size_t btr_pool_size(struct btr_pool *btrp)
+{
+       return btrp->area_end - btrp->area_start;
+}
+
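+/* Return the number of bytes of the pool area which currently hold data. */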
+size_t btr_pool_filled(struct btr_pool *btrp)
+{
+       if (!btrp->whead)
+               return btr_pool_size(btrp);
+       if (btrp->rhead <= btrp->whead)
+               return btrp->whead - btrp->rhead;
+       return btr_pool_size(btrp) - (btrp->rhead - btrp->whead);
+}
+
+size_t btr_pool_unused(struct btr_pool *btrp)
+{
+       return btr_pool_size(btrp) - btr_pool_filled(btrp);
+}
+
+/*
+ * Return the number of contiguous bytes which can be written at the current
+ * write head, i.e. the maximal size available for a single read into the
+ * pool area. This may be smaller than the value returned by
+ * btr_pool_unused().
+ */
+size_t btr_pool_available(struct btr_pool *btrp)
+{
+       if (!btrp->whead)
+               return 0;
+       if (btrp->rhead <= btrp->whead)
+               return btrp->area_end - btrp->whead;
+       return btrp->rhead - btrp->whead;
+}
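+
+/*
+ * Worked example (numbers made up): for a pool of 1000 bytes with
+ * area_start == A, rhead == A + 200 and whead == A + 700, we have
+ * btr_pool_filled() == 500 and btr_pool_unused() == 500, but
+ * btr_pool_available() == 300. The 200 unused bytes at the beginning of the
+ * area only become available for writing after the write head has wrapped
+ * around at the end of the area.
+ */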
+
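+/*
+ * Get a pointer to the contiguous writable area which starts at the write
+ * head.
+ *
+ * \return The number of bytes that may be written to *result. This equals
+ * the value returned by btr_pool_available().
+ */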
+size_t btr_pool_get_buffer(struct btr_pool *btrp, char **result)
+{
+       if (result)
+               *result = btrp->whead;
+       return btr_pool_available(btrp);
+}
+
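+/*
+ * Mark the next \a size bytes of the pool area as used and advance the
+ * write head accordingly. Callers must not allocate more than
+ * btr_pool_available() bytes.
+ */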
+void btr_pool_allocate(struct btr_pool *btrp, size_t size)
+{
+       char *end;
+
+       if (size == 0)
+               return;
+       assert(size <= btr_pool_available(btrp));
+       end = btrp->whead + size;
+       assert(end <= btrp->area_end);
+
+       if (end == btrp->area_end) {
+               PARA_DEBUG_LOG("%s: end of pool area reached\n", btrp->name);
+               end = btrp->area_start;
+       }
+       if (end == btrp->rhead) {
+               PARA_DEBUG_LOG("%s: buffer pool full\n", btrp->name);
+               end = NULL; /* buffer full */
+       }
+       btrp->whead = end;
+}
+
+static void btr_pool_deallocate(struct btr_pool *btrp, size_t size)
+{
+       char *end = btrp->rhead + size;
+
+       if (size == 0)
+               return;
+       assert(end <= btrp->area_end);
+       assert(size <= btr_pool_filled(btrp));
+       if (end == btrp->area_end)
+               end = btrp->area_start;
+       if (!btrp->whead)
+               btrp->whead = btrp->rhead;
+       btrp->rhead = end;
+       if (btrp->rhead == btrp->whead)
+               btrp->rhead = btrp->whead = btrp->area_start;
+}
+
 #define FOR_EACH_CHILD(_tn, _btrn) list_for_each_entry((_tn), \
        &((_btrn)->children), node)
 #define FOR_EACH_CHILD_SAFE(_tn, _tmp, _btrn) \
@@ -49,41 +191,75 @@ struct btr_node {
 #define FOR_EACH_BUFFER_REF_SAFE(_br, _tmp, _btrn) \
        list_for_each_entry_safe((_br), (_tmp), &(_btrn)->input_queue, node)
 
-struct btr_node *btr_new_node(const char *name, struct btr_node *parent,
-               btr_command_handler handler, void *context)
+struct btr_node *btr_new_node(struct btr_node_description *bnd)
 {
        struct btr_node *btrn = para_malloc(sizeof(*btrn));
 
-       btrn->name = para_strdup(name);
-       btrn->parent = parent;
-       btrn->execute = handler;
-       btrn->context = context;
-       if (parent)
-               list_add_tail(&btrn->node, &parent->children);
+       btrn->name = para_strdup(bnd->name);
+       btrn->parent = bnd->parent;
+       btrn->execute = bnd->handler;
+       btrn->context = bnd->context;
+       btrn->start.tv_sec = 0;
+       btrn->start.tv_usec = 0;
        INIT_LIST_HEAD(&btrn->children);
        INIT_LIST_HEAD(&btrn->input_queue);
-       if (parent)
-               PARA_INFO_LOG("added %s as child of %s\n", name, parent->name);
-       else
-               PARA_INFO_LOG("added %s as btr root\n", name);
+       if (!bnd->child) {
+               if (bnd->parent) {
+                       list_add_tail(&btrn->node, &bnd->parent->children);
+                       PARA_INFO_LOG("new leaf node: %s (child of %s)\n",
+                               bnd->name, bnd->parent->name);
+               } else
+                       PARA_INFO_LOG("added %s as btr root\n", bnd->name);
+               goto out;
+       }
+       if (!bnd->parent) {
+               assert(!bnd->child->parent);
+               PARA_INFO_LOG("new root: %s (was %s)\n",
+                       bnd->name, bnd->child->name);
+               btrn->parent = NULL;
+               list_add_tail(&bnd->child->node, &btrn->children);
+               /* link it in */
+               bnd->child->parent = btrn;
+               goto out;
+       }
+       PARA_EMERG_LOG("inserting internal nodes not yet supported.\n");
+       exit(EXIT_FAILURE);
+       assert(bnd->child->parent == bnd->parent);
+out:
        return btrn;
 }
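+
+/*
+ * A rough sketch of how a leaf node is usually created (illustrative only;
+ * "decoder_btrn" is a hypothetical parent node, and the node description
+ * struct is assumed to expose exactly the fields used above):
+ *
+ *     struct btr_node_description bnd = {
+ *             .name = "my_writer",
+ *             .parent = decoder_btrn,
+ *             .handler = NULL,
+ *             .context = NULL,
+ *     };
+ *     struct btr_node *btrn = btr_new_node(&bnd);
+ */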
 
 /*
  * Allocate a new btr buffer.
  *
- * The freshly allocated buffer will have a zero refcount.
+ * The freshly allocated buffer will have a zero refcount and will
+ * not be associated with a btr pool.
  */
 static struct btr_buffer *new_btrb(char *buf, size_t size)
 {
-       struct btr_buffer *btrb = para_malloc(sizeof(*btrb));
+       struct btr_buffer *btrb = para_calloc(sizeof(*btrb));
 
        btrb->buf = buf;
        btrb->size = size;
-       btrb->refcount = 0;
        return btrb;
 }
 
+static void dealloc_buffer(struct btr_buffer *btrb)
+{
+       if (btrb->pool)
+               btr_pool_deallocate(btrb->pool, btrb->size);
+       else
+               free(btrb->buf);
+}
+
+static struct btr_buffer_reference *get_first_input_br(struct btr_node *btrn)
+{
+       if (list_empty(&btrn->input_queue))
+               return NULL;
+       return list_first_entry(&btrn->input_queue,
+               struct btr_buffer_reference, node);
+}
+
 /*
  * Deallocate the reference, release the resources if refcount drops to zero.
  */
@@ -91,12 +267,11 @@ static void btr_drop_buffer_reference(struct btr_buffer_reference *br)
 {
        struct btr_buffer *btrb = br->btrb;
 
-       //PARA_CRIT_LOG("dropping buffer reference %p\n", br);
        list_del(&br->node);
        free(br);
        btrb->refcount--;
        if (btrb->refcount == 0) {
-               free(btrb->buf);
+               dealloc_buffer(btrb);
                free(btrb);
        }
 }
@@ -106,12 +281,16 @@ static void add_btrb_to_children(struct btr_buffer *btrb,
 {
        struct btr_node *ch;
 
+       if (btrn->start.tv_sec == 0)
+               btrn->start = *now;
        FOR_EACH_CHILD(ch, btrn) {
-               struct btr_buffer_reference *br = para_malloc(sizeof(*br));
+               struct btr_buffer_reference *br = para_calloc(sizeof(*br));
                br->btrb = btrb;
                br->consumed = consumed;
                list_add_tail(&br->node, &ch->input_queue);
                btrb->refcount++;
+               if (ch->start.tv_sec == 0)
+                       ch->start = *now;
        }
 }
 
@@ -119,10 +298,54 @@ void btr_add_output(char *buf, size_t size, struct btr_node *btrn)
 {
        struct btr_buffer *btrb;
 
+       assert(size != 0);
+       if (list_empty(&btrn->children)) {
+               free(buf);
+               return;
+       }
+       btrb = new_btrb(buf, size);
+       add_btrb_to_children(btrb, btrn, 0);
+}
+
+void btr_add_output_pool(struct btr_pool *btrp, size_t size,
+               struct btr_node *btrn)
+{
+       struct btr_buffer *btrb;
+       char *buf;
+       size_t avail;
+
+       assert(size != 0);
+       if (list_empty(&btrn->children))
+               return;
+       avail = btr_pool_get_buffer(btrp, &buf);
+       assert(avail >= size);
+       btr_pool_allocate(btrp, size);
        btrb = new_btrb(buf, size);
+       btrb->pool = btrp;
        add_btrb_to_children(btrb, btrn, 0);
 }
 
+void btr_copy(const void *src, size_t n, struct btr_pool *btrp,
+       struct btr_node *btrn)
+{
+       char *buf;
+       size_t sz, copy;
+
+       if (n == 0)
+               return;
+       assert(n <= btr_pool_unused(btrp));
+       sz = btr_pool_get_buffer(btrp, &buf);
+       copy = PARA_MIN(sz, n);
+       memcpy(buf, src, copy);
+       btr_add_output_pool(btrp, copy, btrn);
+       if (copy == n)
+               return;
+       sz = btr_pool_get_buffer(btrp, &buf);
+       assert(sz >= n - copy);
+       memcpy(buf, src + copy, n - copy);
+       btr_add_output_pool(btrp, n - copy, btrn);
+}
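+
+/*
+ * Note that the copied data may end up in two separate btr buffers if the
+ * write head wraps around the end of the pool area. A minimal sketch
+ * ("header" and "header_len" are made-up names):
+ *
+ *     btr_copy(header, header_len, btrp, btrn);
+ */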
+
 static void btr_pushdown_br(struct btr_buffer_reference *br, struct btr_node *btrn)
 {
        add_btrb_to_children(br->btrb, btrn, br->consumed);
@@ -137,6 +360,17 @@ void btr_pushdown(struct btr_node *btrn)
                btr_pushdown_br(br, btrn);
 }
 
+int btr_pushdown_one(struct btr_node *btrn)
+{
+       struct btr_buffer_reference *br;
+
+       if (list_empty(&btrn->input_queue))
+               return 0;
+       br = list_first_entry(&btrn->input_queue, struct btr_buffer_reference, node);
+       btr_pushdown_br(br, btrn);
+       return 1;
+}
+
 /* Return true if this node has no children. */
 bool btr_no_children(struct btr_node *btrn)
 {
@@ -162,32 +396,90 @@ static inline size_t br_available_bytes(struct btr_buffer_reference *br)
 
 size_t btr_get_buffer_by_reference(struct btr_buffer_reference *br, char **buf)
 {
-       *buf = br->btrb->buf + br->consumed;
+       if (buf)
+               *buf = br->btrb->buf + br->consumed;
        return br_available_bytes(br);
 }
 
+/**
+ * References to physically adjacent areas of a buffer pool are reported as
+ * one contiguous buffer.
+ *
+ * \return Zero if the input buffer queue is empty.
+ */
 size_t btr_next_buffer(struct btr_node *btrn, char **bufp)
 {
        struct btr_buffer_reference *br;
+       char *buf, *result = NULL;
+       size_t sz, rv = 0;
 
-       if (list_empty(&btrn->input_queue)) {
-               *bufp = NULL;
-               return 0;
+       FOR_EACH_BUFFER_REF(br, btrn) {
+               sz = btr_get_buffer_by_reference(br, &buf);
+               if (!result) {
+                       result = buf;
+                       rv = sz;
+                       if (!br->btrb->pool)
+                               break;
+                       continue;
+               }
+               if (!br->btrb->pool)
+                       break;
+               if (result + rv != buf)
+                       break;
+               rv += sz;
        }
-       br = list_first_entry(&btrn->input_queue, struct btr_buffer_reference, node);
-       return btr_get_buffer_by_reference(br, bufp);
+       if (bufp)
+               *bufp = result;
+       return rv;
 }
 
 void btr_consume(struct btr_node *btrn, size_t numbytes)
 {
-       struct btr_buffer_reference *br;
+       struct btr_buffer_reference *br, *tmp;
+       size_t sz;
 
-       assert(!list_empty(&btrn->input_queue));
-       br = list_first_entry(&btrn->input_queue, struct btr_buffer_reference, node);
-       assert(br->consumed + numbytes <= br->btrb->size);
-       br->consumed += numbytes;
-       if (br->consumed == br->btrb->size)
-               btr_drop_buffer_reference(br);
+       if (numbytes == 0)
+               return;
+       br = get_first_input_br(btrn);
+       assert(br);
+
+       if (br->wrap_count == 0) {
+               /*
+                * No wrap buffer. Drop buffer references whose buffer
+                * has been fully used.
+                */
+               FOR_EACH_BUFFER_REF_SAFE(br, tmp, btrn) {
+                       if (br->consumed + numbytes <= br->btrb->size) {
+                               br->consumed += numbytes;
+                               if (br->consumed == br->btrb->size)
+                                       btr_drop_buffer_reference(br);
+                               return;
+                       }
+                       numbytes -= br->btrb->size - br->consumed;
+                       btr_drop_buffer_reference(br);
+               }
+               assert(false); /* never reached: more was consumed than queued */
+       }
+       /*
+        * We have a wrap buffer, consume from it. If in total, i.e.
+        * including previous calls to btr_consume(), less than wrap_count
+        * has been consumed, there's nothing more we can do.
+        *
+        * Otherwise we drop the wrap buffer and consume the correct amount
+        * of bytes from the subsequent buffers of the input queue, i.e. the
+        * total number of bytes that have been consumed from the wrap
+        * buffer.
+        */
+       PARA_DEBUG_LOG("consuming %zu/%zu bytes from wrap buffer\n", numbytes,
+               br_available_bytes(br));
+
+       assert(numbytes <= br_available_bytes(br));
+       if (br->consumed + numbytes < br->wrap_count) {
+               br->consumed += numbytes;
+               return;
+       }
+       PARA_DEBUG_LOG("dropping wrap buffer (%zu bytes)\n", br->btrb->size);
+       /* get rid of the wrap buffer */
+       sz = br->consumed + numbytes;
+       btr_drop_buffer_reference(br);
+       return btr_consume(btrn, sz);
 }
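+
+/*
+ * Sketch of the usual consumer pattern (illustrative only; process() is a
+ * hypothetical function which returns the number of bytes it used):
+ *
+ *     char *buf;
+ *     size_t len = btr_next_buffer(btrn, &buf);
+ *     if (len > 0) {
+ *             size_t used = process(buf, len);
+ *             btr_consume(btrn, used);
+ *     }
+ */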
 
 static void flush_input_queue(struct btr_node *btrn)
@@ -197,43 +489,52 @@ static void flush_input_queue(struct btr_node *btrn)
                btr_drop_buffer_reference(br);
 }
 
-void btr_del_node(struct btr_node *btrn)
+void btr_free_node(struct btr_node *btrn)
+{
+       if (!btrn)
+               return;
+       free(btrn->name);
+       free(btrn);
+}
+
+void btr_remove_node(struct btr_node *btrn)
 {
        struct btr_node *ch;
 
        if (!btrn)
                return;
-       PARA_NOTICE_LOG("deleting %s\n", btrn->name);
+       PARA_NOTICE_LOG("removing btr node %s from buffer tree\n", btrn->name);
        FOR_EACH_CHILD(ch, btrn)
                ch->parent = NULL;
        flush_input_queue(btrn);
        if (btrn->parent)
                list_del(&btrn->node);
-       free(btrn->name);
-       free(btrn);
 }
 
 size_t btr_get_input_queue_size(struct btr_node *btrn)
 {
        struct btr_buffer_reference *br;
-       size_t size = 0;
+       size_t size = 0, wrap_consumed = 0;
 
        FOR_EACH_BUFFER_REF(br, btrn) {
-               //PARA_CRIT_LOG("size: %zu\n", size);
+               if (br->wrap_count != 0) {
+                       wrap_consumed = br->consumed;
+                       continue;
+               }
                size += br_available_bytes(br);
        }
+       assert(wrap_consumed <= size);
+       size -= wrap_consumed;
        return size;
 }
 
-int btr_splice_out_node(struct btr_node *btrn)
+void btr_splice_out_node(struct btr_node *btrn)
 {
        struct btr_node *ch, *tmp;
 
-       if (!btrn)
-               return -ERRNO_TO_PARA_ERROR(EINVAL);
-       if (btr_get_input_queue_size(btrn) != 0)
-               return -ERRNO_TO_PARA_ERROR(EINVAL);
+       assert(btrn);
        PARA_NOTICE_LOG("splicing out %s\n", btrn->name);
+       btr_pushdown(btrn);
        if (btrn->parent)
                list_del(&btrn->node);
        FOR_EACH_CHILD_SAFE(ch, tmp, btrn) {
@@ -244,9 +545,6 @@ int btr_splice_out_node(struct btr_node *btrn)
                        list_move(&ch->node, &btrn->parent->children);
        }
        assert(list_empty(&btrn->children));
-       free(btrn->name);
-       free(btrn);
-       return 1;
 }
 
 /**
@@ -304,14 +602,112 @@ void *btr_context(struct btr_node *btrn)
        return btrn->context;
 }
 
+static bool need_buffer_pool_merge(struct btr_node *btrn)
+{
+       struct btr_buffer_reference *br = get_first_input_br(btrn);
+
+       if (!br)
+               return false;
+       if (br->wrap_count != 0)
+               return true;
+       if (br->btrb->pool)
+               return true;
+       return false;
+}
+
+static void merge_input_pool(struct btr_node *btrn, size_t dest_size)
+{
+       struct btr_buffer_reference *br, *wbr = NULL;
+       int num_refs; /* including wrap buffer */
+       char *buf, *buf1 = NULL, *buf2 = NULL;
+       size_t sz, sz1 = 0, sz2 = 0;
+
+       br = get_first_input_br(btrn);
+       if (!br || br_available_bytes(br) >= dest_size)
+               return;
+       num_refs = 0;
+       FOR_EACH_BUFFER_REF(br, btrn) {
+               num_refs++;
+               sz = btr_get_buffer_by_reference(br, &buf);
+               if (sz == 0)
+                       break;
+               if (br->wrap_count != 0) {
+                       assert(!wbr);
+                       assert(num_refs == 1);
+                       wbr = br;
+                       if (sz >= dest_size)
+                               return;
+                       continue;
+               }
+               if (!buf1) {
+                       buf1 = buf;
+                       sz1 = sz;
+                       goto next;
+               }
+               if (buf1 + sz1 == buf) {
+                       sz1 += sz;
+                       goto next;
+               }
+               if (!buf2) {
+                       buf2 = buf;
+                       sz2 = sz;
+                       goto next;
+               }
+               assert(buf2 + sz2 == buf);
+               sz2 += sz;
+next:
+               if (sz1 + sz2 >= dest_size)
+                       break;
+       }
+       if (!buf2) /* nothing to do */
+               return;
+       assert(buf1 && sz2 > 0);
+       /*
+        * If the second buffer is large, we only take the first part of it to
+        * avoid having to memcpy() huge buffers.
+        */
+       sz2 = PARA_MIN(sz2, (size_t)(64 * 1024));
+       if (!wbr) {
+               /* Make a new wrap buffer combining buf1 and buf2. */
+               sz = sz1 + sz2;
+               buf = para_malloc(sz);
+               PARA_DEBUG_LOG("merging input buffers: (%p:%zu, %p:%zu) -> %p:%zu\n",
+                       buf1, sz1, buf2, sz2, buf, sz);
+               memcpy(buf, buf1, sz1);
+               memcpy(buf + sz1, buf2, sz2);
+               br = para_calloc(sizeof(*br));
+               br->btrb = new_btrb(buf, sz);
+               br->btrb->refcount = 1;
+               br->consumed = 0;
+               /* This is a wrap buffer */
+               br->wrap_count = sz1;
+               para_list_add(&br->node, &btrn->input_queue);
+               return;
+       }
+       PARA_DEBUG_LOG("increasing wrap buffer, sz1: %zu, sz2: %zu\n", sz1, sz2);
+       /*
+        * We already have a wrap buffer, but it is too small. It might be
+        * partially used.
+        */
+       /* Nothing to do if the wrap buffer already covers buf1 and buf2. */
+       if (wbr->wrap_count == sz1 && wbr->btrb->size >= sz1 + sz2)
+               return;
+       sz = sz1 + sz2 - wbr->btrb->size; /* amount of new data */
+       wbr->btrb->size += sz;
+       wbr->btrb->buf = para_realloc(wbr->btrb->buf, wbr->btrb->size);
+       /* copy the new data to the end of the reallocated buffer */
+       assert(sz2 >= sz);
+       memcpy(wbr->btrb->buf + wbr->btrb->size - sz, buf2 + sz2 - sz, sz);
+}
+
 /**
  * Merge the first two input buffers into one.
  *
  * This is a quite expensive operation.
  *
- * \return The number of buffers that have been merged (zero, one or two).
+ * \return The number of buffers that were available (zero, one or two).
  */
-int btr_merge(struct btr_node *btrn)
+static int merge_input(struct btr_node *btrn)
 {
        struct btr_buffer_reference *brs[2], *br;
        char *bufs[2], *buf;
@@ -333,17 +729,15 @@ int btr_merge(struct btr_node *btrn)
        }
        /* make a new btrb that combines the two buffers and a br to it. */
        sz = szs[0] + szs[1];
-       //PARA_CRIT_LOG("merging input buffers: (%zu, %zu) -> %zu\n",
-       //      szs[0], szs[1], sz);
        buf = para_malloc(sz);
-       /* TODO: Avoid this memcopy by introducing btr buffer pool. */
+       PARA_DEBUG_LOG("%s: memory merging input buffers: (%zu, %zu) -> %zu\n",
+               btrn->name, szs[0], szs[1], sz);
        memcpy(buf, bufs[0], szs[0]);
        memcpy(buf + szs[0], bufs[1], szs[1]);
 
-       br = para_malloc(sizeof(*br));
+       br = para_calloc(sizeof(*br));
        br->btrb = new_btrb(buf, sz);
        br->btrb->refcount = 1;
-       br->consumed = 0;
 
        /* replace the first two refs by the new one */
        btr_drop_buffer_reference(brs[0]);
@@ -351,3 +745,94 @@ int btr_merge(struct btr_node *btrn)
        para_list_add(&br->node, &btrn->input_queue);
        return 2;
 }
+
+void btr_merge(struct btr_node *btrn, size_t dest_size)
+{
+       if (need_buffer_pool_merge(btrn))
+               return merge_input_pool(btrn, dest_size);
+       for (;;) {
+               char *buf;
+               size_t len = btr_next_buffer(btrn, &buf);
+               if (len >= dest_size)
+                       return;
+               PARA_DEBUG_LOG("input size = %zu < %zu = dest\n", len, dest_size);
+               if (merge_input(btrn) < 2)
+                       return;
+       }
+}
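+
+/*
+ * Sketch (illustrative; HEADER_SIZE is a made-up constant): a node which
+ * must parse a fixed-size header can ask for that many contiguous bytes
+ * before looking at its input.
+ *
+ *     btr_merge(btrn, HEADER_SIZE);
+ *     sz = btr_next_buffer(btrn, &buf);
+ *     if (sz < HEADER_SIZE)
+ *             return;
+ *
+ * where returning early simply means to wait until more data arrives.
+ */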
+
+bool btr_eof(struct btr_node *btrn)
+{
+       char *buf;
+       size_t len = btr_next_buffer(btrn, &buf);
+
+       return (len == 0 && btr_no_parent(btrn));
+}
+
+void log_tree_recursively(struct btr_node *btrn, int loglevel, int depth)
+{
+       struct btr_node *ch;
+       const char spaces[] = "                 ", *space = spaces + 16 - depth;
+
+       if (depth > 16)
+               return;
+       para_log(loglevel, "%s%s\n", space, btrn->name);
+       FOR_EACH_CHILD(ch, btrn)
+               log_tree_recursively(ch, loglevel, depth + 1);
+}
+
+void btr_log_tree(struct btr_node *btrn, int loglevel)
+{
+       log_tree_recursively(btrn, loglevel, 0);
+}
+
+/*
+ * Search the tree rooted at \a root for a node with the given name.
+ *
+ * \return \a root if \a name is \p NULL, the matching node if one exists in
+ * the tree, or \p NULL otherwise.
+ */
+struct btr_node *btr_search_node(const char *name, struct btr_node *root)
+{
+       struct btr_node *ch;
+
+       if (!name)
+               return root;
+       if (!strcmp(root->name, name))
+               return root;
+       FOR_EACH_CHILD(ch, root) {
+               struct btr_node *result = btr_search_node(name, ch);
+               if (result)
+                       return result;
+       }
+       return NULL;
+}
+
+/** 640K ought to be enough for everybody ;) */
+#define BTRN_MAX_PENDING (640 * 1024)
+
+int btr_node_status(struct btr_node *btrn, size_t min_iqs,
+       enum btr_node_type type)
+{
+       size_t iqs;
+
+       assert(btrn);
+       if (type != BTR_NT_LEAF) {
+               if (btr_no_children(btrn))
+                       return -E_BTR_NO_CHILD;
+               if (btr_bytes_pending(btrn) > BTRN_MAX_PENDING)
+                       return 0;
+       }
+       if (type != BTR_NT_ROOT) {
+               if (btr_eof(btrn))
+                       return -E_BTR_EOF;
+               iqs = btr_get_input_queue_size(btrn);
+               if (iqs == 0) /* we have a parent, because not eof */
+                       return 0;
+               if (iqs < min_iqs && !btr_no_parent(btrn))
+                       return 0;
+       }
+       return 1;
+}
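+
+/*
+ * Sketch of a typical use in the post_select step of a leaf node
+ * (illustrative only; "min_iqs" stands for the minimal input queue size the
+ * node needs in order to make progress):
+ *
+ *     int ret = btr_node_status(btrn, min_iqs, BTR_NT_LEAF);
+ *     if (ret < 0)
+ *             goto err;
+ *     if (ret == 0)
+ *             return;
+ *     ... consume input ...
+ */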
+
+void btr_get_node_start(struct btr_node *btrn, struct timeval *tv)
+{
+       *tv = btrn->start;
+}