962359fbfa253790880f1380551335eaf9861c6d
[paraslash.git] / buffer_tree.c
1 #include <regex.h>
2 #include <stdbool.h>
3
4 #include "para.h"
5 #include "list.h"
6 #include "string.h"
7 #include "buffer_tree.h"
8 #include "error.h"
9 #include "sched.h"
10
11 /* whead = NULL means area full */
12 struct btr_pool {
13         char *name;
14         char *area_start;
15         char *area_end;
16         char *rhead;
17         char *whead;
18 };
19
20 enum btr_buffer_flags {
21         /* changes the way the buffer is deallocated */
22         BTR_BF_BTR_POOL = 1,
23 };
24
25 struct btr_buffer {
26         char *buf;
27         size_t size;
28         /** The number of references to this buffer. */
29         int refcount;
30         struct btr_pool *pool;
31 };
32
33 struct btr_buffer_reference {
34         struct btr_buffer *btrb;
35         size_t consumed;
36         /* Each buffer reference belongs to the buffer queue list of some buffer tree node. */
37         struct list_head node;
38         size_t wrap_count;
39 };
40
41 struct btr_node {
42         char *name;
43         struct btr_node *parent;
44         /* The position of this btr node in the buffer tree. */
45         struct list_head node;
46         /* The children nodes of this btr node are linked together in a list. */
47         struct list_head children;
48         /* Time of first data transfer. */
49         struct timeval start;
50         /**
51          * The input queue is a list of references to btr buffers. Each item on
52          * the list represents an input buffer which has not been completely
53          * used by this btr node.
54          */
55         struct list_head input_queue;
56         btr_command_handler execute;
57         void *context;
58 };
59
60 /**
61  * Create a new buffer pool.
62  *
63  * \param name The name of the new buffer pool.
64  *
65  * \param area The size in bytes of the pool area.
66  *
67  * \return An opaque pointer to the newly created buffer pool. It must be
68  * passed to btr_pool_free() after it is no longer used to deallocate all
69  * resources.
70  */
71 struct btr_pool *btr_pool_new(const char *name, size_t area_size)
72 {
73         struct btr_pool *btrp;
74
75         PARA_INFO_LOG("%s, %zu bytes\n", name, area_size);
76         btrp = para_malloc(sizeof(*btrp));
77         btrp->area_start = para_malloc(area_size);
78         btrp->area_end = btrp->area_start + area_size;
79         btrp->rhead = btrp->area_start;
80         btrp->whead = btrp->area_start;
81         btrp->name = para_strdup(name);
82         return btrp;
83 }
84
85 /**
86  * Dellocate resources used by a buffer pool.
87  *
88  * \param btrp A pointer obtained via btr_pool_new().
89  */
90 void btr_pool_free(struct btr_pool *btrp)
91 {
92         if (!btrp)
93                 return;
94         free(btrp->area_start);
95         free(btrp->name);
96         free(btrp);
97 }
98
99 /**
100  * Return the size of the buffer pool area.
101  *
102  * \param btrp The buffer pool.
103  *
104  * \return The same value which was passed during creation time to
105  * btr_pool_new().
106  */
107 size_t btr_pool_size(struct btr_pool *btrp)
108 {
109         return btrp->area_end - btrp->area_start;
110 }
111
112 size_t btr_pool_filled(struct btr_pool *btrp)
113 {
114         if (!btrp->whead)
115                 return btr_pool_size(btrp);
116         if (btrp->rhead <= btrp->whead)
117                 return  btrp->whead - btrp->rhead;
118         return btr_pool_size(btrp) - (btrp->rhead - btrp->whead);
119 }
120
121 size_t btr_pool_unused(struct btr_pool *btrp)
122 {
123         return btr_pool_size(btrp) - btr_pool_filled(btrp);
124 }
125
126 /*
127  * Return maximal size available for one read. This is
128  * smaller than the value returned by btr_pool_unused().
129  */
130 size_t btr_pool_available(struct btr_pool *btrp)
131 {
132         if (!btrp->whead)
133                 return 0;
134         if (btrp->rhead <= btrp->whead)
135                 return btrp->area_end - btrp->whead;
136         return btrp->rhead - btrp->whead;
137 }
138
139 size_t btr_pool_get_buffer(struct btr_pool *btrp, char **result)
140 {
141         if (result)
142                 *result = btrp->whead;
143         return btr_pool_available(btrp);
144 }
145
146 void btr_pool_allocate(struct btr_pool *btrp, size_t size)
147 {
148         char *end;
149
150         if (size == 0)
151                 return;
152         assert(size <= btr_pool_available(btrp));
153         end = btrp->whead + size;
154         assert(end <= btrp->area_end);
155
156         if (end == btrp->area_end) {
157                 PARA_DEBUG_LOG("%s: end of pool area reached\n", btrp->name);
158                 end = btrp->area_start;
159         }
160         if (end == btrp->rhead) {
161                 PARA_DEBUG_LOG("%s btrp buffer full\n", btrp->name);
162                 end = NULL; /* buffer full */
163         }
164         btrp->whead = end;
165 }
166
167 static void btr_pool_deallocate(struct btr_pool *btrp, size_t size)
168 {
169         char *end = btrp->rhead + size;
170
171         if (size == 0)
172                 return;
173         assert(end <= btrp->area_end);
174         assert(size <= btr_pool_filled(btrp));
175         if (end == btrp->area_end)
176                 end = btrp->area_start;
177         if (!btrp->whead)
178                 btrp->whead = btrp->rhead;
179         btrp->rhead = end;
180         if (btrp->rhead == btrp->whead)
181                 btrp->rhead = btrp->whead = btrp->area_start;
182 }
183
184 #define FOR_EACH_CHILD(_tn, _btrn) list_for_each_entry((_tn), \
185         &((_btrn)->children), node)
186 #define FOR_EACH_CHILD_SAFE(_tn, _tmp, _btrn) \
187         list_for_each_entry_safe((_tn), (_tmp), &((_btrn)->children), node)
188
189 #define FOR_EACH_BUFFER_REF(_br, _btrn) \
190         list_for_each_entry((_br), &(_btrn)->input_queue, node)
191 #define FOR_EACH_BUFFER_REF_SAFE(_br, _tmp, _btrn) \
192         list_for_each_entry_safe((_br), (_tmp), &(_btrn)->input_queue, node)
193
194 struct btr_node *btr_new_node(struct btr_node_description *bnd)
195 {
196         struct btr_node *btrn = para_malloc(sizeof(*btrn));
197
198         btrn->name = para_strdup(bnd->name);
199         btrn->parent = bnd->parent;
200         btrn->execute = bnd->handler;
201         btrn->context = bnd->context;
202         btrn->start.tv_sec = 0;
203         btrn->start.tv_usec = 0;
204         INIT_LIST_HEAD(&btrn->children);
205         INIT_LIST_HEAD(&btrn->input_queue);
206         if (!bnd->child) {
207                 if (bnd->parent) {
208                         list_add_tail(&btrn->node, &bnd->parent->children);
209                         PARA_INFO_LOG("new leaf node: %s (child of %s)\n",
210                                 bnd->name, bnd->parent->name);
211                 } else
212                         PARA_INFO_LOG("added %s as btr root\n", bnd->name);
213                 goto out;
214         }
215         if (!bnd->parent) {
216                 assert(!bnd->child->parent);
217                 PARA_INFO_LOG("new root: %s (was %s)\n",
218                         bnd->name, bnd->child->name);
219                 btrn->parent = NULL;
220                 list_add_tail(&bnd->child->node, &btrn->children);
221                 /* link it in */
222                 bnd->child->parent = btrn;
223                 goto out;
224         }
225         PARA_EMERG_LOG("inserting internal nodes not yet supported.\n");
226         exit(EXIT_FAILURE);
227         assert(bnd->child->parent == bnd->parent);
228 out:
229         return btrn;
230 }
231
232 /*
233  * Allocate a new btr buffer.
234  *
235  * The freshly allocated buffer will have a zero refcount and will
236  * not be associated with a btr pool.
237  */
238 static struct btr_buffer *new_btrb(char *buf, size_t size)
239 {
240         struct btr_buffer *btrb = para_calloc(sizeof(*btrb));
241
242         btrb->buf = buf;
243         btrb->size = size;
244         return btrb;
245 }
246
247 static void dealloc_buffer(struct btr_buffer *btrb)
248 {
249         if (btrb->pool)
250                 btr_pool_deallocate(btrb->pool, btrb->size);
251         else
252                 free(btrb->buf);
253 }
254
255 static struct btr_buffer_reference *get_first_input_br(struct btr_node *btrn)
256 {
257         if (list_empty(&btrn->input_queue))
258                 return NULL;
259         return list_first_entry(&btrn->input_queue,
260                 struct btr_buffer_reference, node);
261 }
262
263 /*
264  * Deallocate the reference, release the resources if refcount drops to zero.
265  */
266 static void btr_drop_buffer_reference(struct btr_buffer_reference *br)
267 {
268         struct btr_buffer *btrb = br->btrb;
269
270         list_del(&br->node);
271         free(br);
272         btrb->refcount--;
273         if (btrb->refcount == 0) {
274                 dealloc_buffer(btrb);
275                 free(btrb);
276         }
277 }
278
279 static void add_btrb_to_children(struct btr_buffer *btrb,
280                 struct btr_node *btrn, size_t consumed)
281 {
282         struct btr_node *ch;
283
284         if (btrn->start.tv_sec == 0)
285                 btrn->start = *now;
286         FOR_EACH_CHILD(ch, btrn) {
287                 struct btr_buffer_reference *br = para_calloc(sizeof(*br));
288                 br->btrb = btrb;
289                 br->consumed = consumed;
290                 list_add_tail(&br->node, &ch->input_queue);
291                 btrb->refcount++;
292                 if (ch->start.tv_sec == 0)
293                         ch->start = *now;
294         }
295 }
296
297 void btr_add_output(char *buf, size_t size, struct btr_node *btrn)
298 {
299         struct btr_buffer *btrb;
300
301         assert(size != 0);
302         if (list_empty(&btrn->children)) {
303                 free(buf);
304                 return;
305         }
306         btrb = new_btrb(buf, size);
307         add_btrb_to_children(btrb, btrn, 0);
308 }
309
310 void btr_add_output_pool(struct btr_pool *btrp, size_t size,
311                 struct btr_node *btrn)
312 {
313         struct btr_buffer *btrb;
314         char *buf;
315         size_t avail;
316
317         assert(size != 0);
318         if (list_empty(&btrn->children))
319                 return;
320         avail = btr_pool_get_buffer(btrp, &buf);
321         assert(avail >= size);
322         btr_pool_allocate(btrp, size);
323         btrb = new_btrb(buf, size);
324         btrb->pool = btrp;
325         add_btrb_to_children(btrb, btrn, 0);
326 }
327
328 void btr_copy(const void *src, size_t n, struct btr_pool *btrp,
329         struct btr_node *btrn)
330 {
331         char *buf;
332         size_t sz, copy;
333
334         if (n == 0)
335                 return;
336         assert(n <= btr_pool_unused(btrp));
337         sz = btr_pool_get_buffer(btrp, &buf);
338         copy = PARA_MIN(sz, n);
339         memcpy(buf, src, copy);
340         btr_add_output_pool(btrp, copy, btrn);
341         if (copy == n)
342                 return;
343         sz = btr_pool_get_buffer(btrp, &buf);
344         assert(sz >= n - copy);
345         memcpy(buf, src + copy, n - copy);
346         btr_add_output_pool(btrp, n - copy, btrn);
347 }
348
349 static void btr_pushdown_br(struct btr_buffer_reference *br, struct btr_node *btrn)
350 {
351         add_btrb_to_children(br->btrb, btrn, br->consumed);
352         btr_drop_buffer_reference(br);
353 }
354
355 void btr_pushdown(struct btr_node *btrn)
356 {
357         struct btr_buffer_reference *br, *tmp;
358
359         FOR_EACH_BUFFER_REF_SAFE(br, tmp, btrn)
360                 btr_pushdown_br(br, btrn);
361 }
362
363 int btr_pushdown_one(struct btr_node *btrn)
364 {
365         struct btr_buffer_reference *br;
366
367         if (list_empty(&btrn->input_queue))
368                 return 0;
369         br = list_first_entry(&btrn->input_queue, struct btr_buffer_reference, node);
370         btr_pushdown_br(br, btrn);
371         return 1;
372 }
373
374 /* Return true if this node has no children. */
375 bool btr_no_children(struct btr_node *btrn)
376 {
377         return list_empty(&btrn->children);
378 }
379
380 bool btr_no_parent(struct btr_node *btrn)
381 {
382         return !btrn->parent;
383 }
384
385 bool btr_inplace_ok(struct btr_node *btrn)
386 {
387         if (!btrn->parent)
388                 return true;
389         return list_is_singular(&btrn->parent->children);
390 }
391
392 static inline size_t br_available_bytes(struct btr_buffer_reference *br)
393 {
394         return br->btrb->size - br->consumed;
395 }
396
397 size_t btr_get_buffer_by_reference(struct btr_buffer_reference *br, char **buf)
398 {
399         if (buf)
400                 *buf = br->btrb->buf + br->consumed;
401         return br_available_bytes(br);
402 }
403
404 /**
405  * \return zero if the input buffer queue is empty.
406  */
407 size_t btr_next_buffer(struct btr_node *btrn, char **bufp)
408 {
409         struct btr_buffer_reference *br;
410         char *buf, *result = NULL;
411         size_t sz, rv = 0;
412
413         FOR_EACH_BUFFER_REF(br, btrn) {
414                 sz = btr_get_buffer_by_reference(br, &buf);
415                 if (!result) {
416                         result = buf;
417                         rv = sz;
418                         if (!br->btrb->pool)
419                                 break;
420                         continue;
421                 }
422                 if (!br->btrb->pool)
423                         break;
424                 if (result + rv != buf)
425                         break;
426                 rv += sz;
427         }
428         if (bufp)
429                 *bufp = result;
430         return rv;
431 }
432
433 void btr_consume(struct btr_node *btrn, size_t numbytes)
434 {
435         struct btr_buffer_reference *br, *tmp;
436         size_t sz;
437
438         if (numbytes == 0)
439                 return;
440         br = get_first_input_br(btrn);
441         assert(br);
442
443         if (br->wrap_count == 0) {
444                 /*
445                  * No wrap buffer. Drop buffer references whose buffer
446                  * has been fully used. */
447                 FOR_EACH_BUFFER_REF_SAFE(br, tmp, btrn) {
448                         if (br->consumed + numbytes <= br->btrb->size) {
449                                 br->consumed += numbytes;
450                                 if (br->consumed == br->btrb->size)
451                                         btr_drop_buffer_reference(br);
452                                 return;
453                         }
454                         numbytes -= br->btrb->size - br->consumed;
455                         btr_drop_buffer_reference(br);
456                 }
457                 assert(true);
458         }
459         /*
460
461         We have a wrap buffer, consume from it. If in total,
462         i.e. including previous calls to brt_consume(), less than
463         wrap_count has been consumed, there's nothing more we can do.
464
465         Otherwise we drop the wrap buffer and consume from subsequent
466         buffers of the input queue the correct amount of bytes. This
467         is the total number of bytes that have been consumed from the
468         wrap buffer.
469 */
470         PARA_DEBUG_LOG("consuming %zu/%zu bytes from wrap buffer\n", numbytes,
471                 br_available_bytes(br));
472
473         assert(numbytes <= br_available_bytes(br));
474         if (br->consumed + numbytes < br->wrap_count) {
475                 br->consumed += numbytes;
476                 return;
477         }
478         PARA_DEBUG_LOG("dropping wrap buffer (%zu bytes)\n", br->btrb->size);
479         /* get rid of the wrap buffer */
480         sz = br->consumed + numbytes;
481         btr_drop_buffer_reference(br);
482         return btr_consume(btrn, sz);
483 }
484
485 static void flush_input_queue(struct btr_node *btrn)
486 {
487         struct btr_buffer_reference *br, *tmp;
488         FOR_EACH_BUFFER_REF_SAFE(br, tmp, btrn)
489                 btr_drop_buffer_reference(br);
490 }
491
492 void btr_free_node(struct btr_node *btrn)
493 {
494         if (!btrn)
495                 return;
496         free(btrn->name);
497         free(btrn);
498 }
499
500 void btr_remove_node(struct btr_node *btrn)
501 {
502         struct btr_node *ch;
503
504         if (!btrn)
505                 return;
506         PARA_NOTICE_LOG("removing btr node %s from buffer tree\n", btrn->name);
507         FOR_EACH_CHILD(ch, btrn)
508                 ch->parent = NULL;
509         flush_input_queue(btrn);
510         if (btrn->parent)
511                 list_del(&btrn->node);
512 }
513
514 size_t btr_get_input_queue_size(struct btr_node *btrn)
515 {
516         struct btr_buffer_reference *br;
517         size_t size = 0, wrap_consumed = 0;
518
519         FOR_EACH_BUFFER_REF(br, btrn) {
520                 if (br->wrap_count != 0) {
521                         wrap_consumed = br->consumed;
522                         continue;
523                 }
524                 size += br_available_bytes(br);
525         }
526         assert(wrap_consumed <= size);
527         size -= wrap_consumed;
528         return size;
529 }
530
531 void btr_splice_out_node(struct btr_node *btrn)
532 {
533         struct btr_node *ch, *tmp;
534
535         assert(btrn);
536         PARA_NOTICE_LOG("splicing out %s\n", btrn->name);
537         btr_pushdown(btrn);
538         if (btrn->parent)
539                 list_del(&btrn->node);
540         FOR_EACH_CHILD_SAFE(ch, tmp, btrn) {
541                 PARA_INFO_LOG("parent(%s): %s\n", ch->name,
542                         btrn->parent? btrn->parent->name : "NULL");
543                 ch->parent = btrn->parent;
544                 if (btrn->parent)
545                         list_move(&ch->node, &btrn->parent->children);
546         }
547         assert(list_empty(&btrn->children));
548 }
549
550 /**
551  * Return the size of the largest input queue.
552  *
553  * Iterates over all children of the given node.
554  */
555 size_t btr_bytes_pending(struct btr_node *btrn)
556 {
557         size_t max_size = 0;
558         struct btr_node *ch;
559
560         FOR_EACH_CHILD(ch, btrn) {
561                 size_t size = btr_get_input_queue_size(ch);
562                 max_size = PARA_MAX(max_size, size);
563         }
564         return max_size;
565 }
566
567 int btr_exec(struct btr_node *btrn, const char *command, char **value_result)
568 {
569         if (!btrn)
570                 return -ERRNO_TO_PARA_ERROR(EINVAL);
571         if (!btrn->execute)
572                 return -ERRNO_TO_PARA_ERROR(ENOTSUP);
573         return btrn->execute(btrn, command, value_result);
574 }
575
576 int btr_exec_up(struct btr_node *btrn, const char *command, char **value_result)
577 {
578         int ret;
579
580         for (; btrn; btrn = btrn->parent) {
581                 struct btr_node *parent = btrn->parent;
582                 if (!parent)
583                         return -ERRNO_TO_PARA_ERROR(ENOTSUP);
584                 if (!parent->execute)
585                         continue;
586                 PARA_INFO_LOG("parent: %s, cmd: %s\n", parent->name, command);
587                 ret = parent->execute(parent, command, value_result);
588                 if (ret == -ERRNO_TO_PARA_ERROR(ENOTSUP))
589                         continue;
590                 if (ret < 0)
591                         return ret;
592                 if (value_result && *value_result)
593                         PARA_NOTICE_LOG("%s(%s): %s\n", command, parent->name,
594                                 *value_result);
595                 return 1;
596         }
597         return -ERRNO_TO_PARA_ERROR(ENOTSUP);
598 }
599
600 void *btr_context(struct btr_node *btrn)
601 {
602         return btrn->context;
603 }
604
605 static bool need_buffer_pool_merge(struct btr_node *btrn)
606 {
607         struct btr_buffer_reference *br = get_first_input_br(btrn);
608
609         if (!br)
610                 return false;
611         if (br->wrap_count != 0)
612                 return true;
613         if (br->btrb->pool)
614                 return true;
615         return false;
616 }
617
618 static void merge_input_pool(struct btr_node *btrn, size_t dest_size)
619 {
620         struct btr_buffer_reference *br, *wbr = NULL;
621         int num_refs; /* including wrap buffer */
622         char *buf, *buf1 = NULL, *buf2 = NULL;
623         size_t sz, sz1 = 0, sz2 = 0, wsz;
624
625         br = get_first_input_br(btrn);
626         if (!br || br_available_bytes(br) >= dest_size)
627                 return;
628         num_refs = 0;
629         FOR_EACH_BUFFER_REF(br, btrn) {
630                 num_refs++;
631                 sz = btr_get_buffer_by_reference(br, &buf);
632                 if (sz == 0)
633                         break;
634                 if (br->wrap_count != 0) {
635                         assert(!wbr);
636                         assert(num_refs == 1);
637                         wbr = br;
638                         if (sz >= dest_size)
639                                 return;
640                         continue;
641                 }
642                 if (!buf1) {
643                         buf1 = buf;
644                         sz1 = sz;
645                         goto next;
646                 }
647                 if (buf1 + sz1 == buf) {
648                         sz1 += sz;
649                         goto next;
650                 }
651                 if (!buf2) {
652                         buf2 = buf;
653                         sz2 = sz;
654                         goto next;
655                 }
656                 assert(buf2 + sz2 == buf);
657                 sz2 += sz;
658 next:
659                 if (sz1 + sz2 >= dest_size)
660                         break;
661         }
662         if (!buf2) /* nothing to do */
663                 return;
664         assert(buf1 && sz2 > 0);
665         /*
666          * If the second buffer is large, we only take the first part of it to
667          * avoid having to memcpy() huge buffers.
668          */
669         sz2 = PARA_MIN(sz2, (size_t)(64 * 1024));
670         if (!wbr) {
671                 /* Make a new wrap buffer combining buf1 and buf2. */
672                 sz = sz1 + sz2;
673                 buf = para_malloc(sz);
674                 PARA_DEBUG_LOG("merging input buffers: (%p:%zu, %p:%zu) -> %p:%zu\n",
675                         buf1, sz1, buf2, sz2, buf, sz);
676                 memcpy(buf, buf1, sz1);
677                 memcpy(buf + sz1, buf2, sz2);
678                 br = para_calloc(sizeof(*br));
679                 br->btrb = new_btrb(buf, sz);
680                 br->btrb->refcount = 1;
681                 br->consumed = 0;
682                 /* This is a wrap buffer */
683                 br->wrap_count = sz1;
684                 para_list_add(&br->node, &btrn->input_queue);
685                 return;
686         }
687         PARA_DEBUG_LOG("increasing wrap buffer, sz1: %zu, sz2: %zu\n", sz1, sz2);
688         /*
689          * We already have a wrap buffer, but it is too small. It might be
690          * partially used.
691          */
692         wsz = br_available_bytes(wbr);
693         if (wbr->wrap_count == sz1 && wbr->btrb->size >= sz1 + sz2) /* nothing we can do about it */
694                 return;
695         sz = sz1 + sz2 - wbr->btrb->size; /* amount of new data */
696         wbr->btrb->size += sz;
697         wbr->btrb->buf = para_realloc(wbr->btrb->buf, wbr->btrb->size);
698         /* copy the new data to the end of the reallocated buffer */
699         assert(sz2 >= sz);
700         memcpy(wbr->btrb->buf + wbr->btrb->size - sz, buf2 + sz2 - sz, sz);
701 }
702
703 /**
704  * Merge the first two input buffers into one.
705  *
706  * This is a quite expensive operation.
707  *
708  * \return The number of buffers that have been available (zero, one or two).
709  */
710 static int merge_input(struct btr_node *btrn)
711 {
712         struct btr_buffer_reference *brs[2], *br;
713         char *bufs[2], *buf;
714         size_t szs[2], sz;
715         int i;
716
717         if (list_empty(&btrn->input_queue))
718                 return 0;
719         if (list_is_singular(&btrn->input_queue))
720                 return 1;
721         i = 0;
722         /* get references to the first two buffers */
723         FOR_EACH_BUFFER_REF(br, btrn) {
724                 brs[i] = br;
725                 szs[i] = btr_get_buffer_by_reference(brs[i], bufs + i);
726                 i++;
727                 if (i == 2)
728                         break;
729         }
730         /* make a new btrb that combines the two buffers and a br to it. */
731         sz = szs[0] + szs[1];
732         buf = para_malloc(sz);
733         PARA_DEBUG_LOG("%s: memory merging input buffers: (%zu, %zu) -> %zu\n",
734                 btrn->name, szs[0], szs[1], sz);
735         memcpy(buf, bufs[0], szs[0]);
736         memcpy(buf + szs[0], bufs[1], szs[1]);
737
738         br = para_calloc(sizeof(*br));
739         br->btrb = new_btrb(buf, sz);
740         br->btrb->refcount = 1;
741
742         /* replace the first two refs by the new one */
743         btr_drop_buffer_reference(brs[0]);
744         btr_drop_buffer_reference(brs[1]);
745         para_list_add(&br->node, &btrn->input_queue);
746         return 2;
747 }
748
749 void btr_merge(struct btr_node *btrn, size_t dest_size)
750 {
751         if (need_buffer_pool_merge(btrn))
752                 return merge_input_pool(btrn, dest_size);
753         for (;;) {
754                 char *buf;
755                 size_t len = btr_next_buffer(btrn, &buf);
756                 if (len >= dest_size)
757                         return;
758                 PARA_DEBUG_LOG("input size = %zu < %zu = dest\n", len, dest_size);
759                 if (merge_input(btrn) < 2)
760                         return;
761         }
762 }
763
764 bool btr_eof(struct btr_node *btrn)
765 {
766         char *buf;
767         size_t len = btr_next_buffer(btrn, &buf);
768
769         return (len == 0 && btr_no_parent(btrn));
770 }
771
772 void log_tree_recursively(struct btr_node *btrn, int loglevel, int depth)
773 {
774         struct btr_node *ch;
775         const char spaces[] = "                 ", *space = spaces + 16 - depth;
776
777         if (depth > 16)
778                 return;
779         para_log(loglevel, "%s%s\n", space, btrn->name);
780         FOR_EACH_CHILD(ch, btrn)
781                 log_tree_recursively(ch, loglevel, depth + 1);
782 }
783
784 void btr_log_tree(struct btr_node *btrn, int loglevel)
785 {
786         return log_tree_recursively(btrn, loglevel, 0);
787 }
788
789 /*
790  * \return \a root if \a name is \p NULL.
791  */
792 struct btr_node *btr_search_node(const char *name, struct btr_node *root)
793 {
794         struct btr_node *ch;
795
796         if (!name)
797                 return root;
798         if (!strcmp(root->name, name))
799                 return root;
800         FOR_EACH_CHILD(ch, root) {
801                 struct btr_node *result = btr_search_node(name, ch);
802                 if (result)
803                         return result;
804         }
805         return NULL;
806 }
807
808 /** 640K ought to be enough for everybody ;) */
809 #define BTRN_MAX_PENDING (640 * 1024)
810
811 int btr_node_status(struct btr_node *btrn, size_t min_iqs,
812         enum btr_node_type type)
813 {
814         size_t iqs;
815
816         assert(btrn);
817         if (type != BTR_NT_LEAF) {
818                 if (btr_no_children(btrn))
819                         return -E_BTR_NO_CHILD;
820                 if (btr_bytes_pending(btrn) > BTRN_MAX_PENDING)
821                         return 0;
822         }
823         if (type != BTR_NT_ROOT) {
824                 if (btr_eof(btrn))
825                         return -E_BTR_EOF;
826                 iqs = btr_get_input_queue_size(btrn);
827                 if (iqs == 0) /* we have a parent, because not eof */
828                         return 0;
829                 if (iqs < min_iqs && !btr_no_parent(btrn))
830                         return 0;
831         }
832         return 1;
833 }
834
835 void btr_get_node_start(struct btr_node *btrn, struct timeval *tv)
836 {
837         *tv = btrn->start;
838 }