e023bb146071748877aa2643dd414f73e1d78bba
[paraslash.git] / buffer_tree.c
1 #include <regex.h>
2 #include <stdbool.h>
3
4 #include "para.h"
5 #include "list.h"
6 #include "string.h"
7 #include "buffer_tree.h"
8 #include "error.h"
9 #include "sched.h"
10
11 struct btr_pool {
12         char *name;
13         char *area_start;
14         char *area_end;
15         char *rhead;
16         char *whead;
17 };
18
19 enum btr_buffer_flags {
20         /* changes the way the buffer is deallocated */
21         BTR_BF_BTR_POOL = 1,
22 };
23
24 struct btr_buffer {
25         char *buf;
26         size_t size;
27         /** The number of references to this buffer. */
28         int refcount;
29         struct btr_pool *pool;
30 };
31
32 struct btr_buffer_reference {
33         struct btr_buffer *btrb;
34         size_t consumed;
35         /* Each buffer reference belongs to the buffer queue list of some buffer tree node. */
36         struct list_head node;
37         size_t wrap_count;
38 };
39
40 struct btr_node {
41         char *name;
42         struct btr_node *parent;
43         /* The position of this btr node in the buffer tree. */
44         struct list_head node;
45         /* The children nodes of this btr node are linked together in a list. */
46         struct list_head children;
47         /* Time of first data transfer. */
48         struct timeval start;
49         /**
50          * The input queue is a list of references to btr buffers. Each item on
51          * the list represents an input buffer which has not been completely
52          * used by this btr node.
53          */
54         struct list_head input_queue;
55         btr_command_handler execute;
56         void *context;
57 };
58
59 struct btr_pool *btr_pool_new(const char *name, size_t area_size)
60 {
61         struct btr_pool *btrp;
62
63         PARA_INFO_LOG("%s, %zu bytes\n", name, area_size);
64         btrp = para_malloc(sizeof(*btrp));
65         btrp->area_start = para_malloc(area_size);
66         btrp->area_end = btrp->area_start + area_size;
67         btrp->rhead = btrp->area_start;
68         btrp->whead = btrp->area_start;
69         btrp->name = para_strdup(name);
70         return btrp;
71 }
72
73 /* whead = NULL means area full */
74
75 void btr_pool_free(struct btr_pool *btrp)
76 {
77         if (!btrp)
78                 return;
79         free(btrp->area_start);
80         free(btrp->name);
81         free(btrp);
82 }
83
84 size_t btr_pool_size(struct btr_pool *btrp)
85 {
86         return btrp->area_end - btrp->area_start;
87 }
88
89 size_t btr_pool_filled(struct btr_pool *btrp)
90 {
91         if (!btrp->whead)
92                 return btr_pool_size(btrp);
93         if (btrp->rhead <= btrp->whead)
94                 return  btrp->whead - btrp->rhead;
95         return btr_pool_size(btrp) - (btrp->rhead - btrp->whead);
96 }
97
98 size_t btr_pool_unused(struct btr_pool *btrp)
99 {
100         return btr_pool_size(btrp) - btr_pool_filled(btrp);
101 }
102
103 size_t btr_pool_available(struct btr_pool *btrp)
104 {
105         if (!btrp->whead)
106                 return 0;
107         if (btrp->rhead <= btrp->whead)
108                 return btrp->area_end - btrp->whead;
109         return btrp->rhead - btrp->whead;
110 }
111
112 size_t btr_pool_get_buffer(struct btr_pool *btrp, char **result)
113 {
114         if (result)
115                 *result = btrp->whead;
116         return btr_pool_available(btrp);
117 }
118
119 void btr_pool_allocate(struct btr_pool *btrp, size_t size)
120 {
121         char *end;
122
123         if (size == 0)
124                 return;
125         //PARA_CRIT_LOG("filled: %zu, alloc %zu\n", btr_pool_filled(btrp), size);
126         assert(size <= btr_pool_available(btrp));
127         end = btrp->whead + size;
128         assert(end <= btrp->area_end);
129
130         if (end == btrp->area_end) {
131                 PARA_DEBUG_LOG("%s: end of pool area reached\n", btrp->name);
132                 end = btrp->area_start;
133         }
134         if (end == btrp->rhead) {
135                 PARA_DEBUG_LOG("btrp buffer full\n");
136                 end = NULL; /* buffer full */
137         }
138         btrp->whead = end;
139         //PARA_CRIT_LOG("filled: %zu\n", btr_pool_filled(btrp));
140 }
141
142 static void btr_pool_deallocate(struct btr_pool *btrp, size_t size)
143 {
144         char *end = btrp->rhead + size;
145
146         //PARA_CRIT_LOG("filled: %zu, dealloc %zu\n", btr_pool_filled(btrp), size);
147         if (size == 0)
148                 return;
149         assert(end <= btrp->area_end);
150         assert(size <= btr_pool_filled(btrp));
151         if (end == btrp->area_end)
152                 end = btrp->area_start;
153         if (!btrp->whead)
154                 btrp->whead = btrp->rhead;
155         btrp->rhead = end;
156         if (btrp->rhead == btrp->whead)
157                 btrp->rhead = btrp->whead = btrp->area_start;
158         //PARA_CRIT_LOG("filled: %zu\n", btr_pool_filled(btrp));
159 }
160
161 #define FOR_EACH_CHILD(_tn, _btrn) list_for_each_entry((_tn), \
162         &((_btrn)->children), node)
163 #define FOR_EACH_CHILD_SAFE(_tn, _tmp, _btrn) \
164         list_for_each_entry_safe((_tn), (_tmp), &((_btrn)->children), node)
165
166 #define FOR_EACH_BUFFER_REF(_br, _btrn) \
167         list_for_each_entry((_br), &(_btrn)->input_queue, node)
168 #define FOR_EACH_BUFFER_REF_SAFE(_br, _tmp, _btrn) \
169         list_for_each_entry_safe((_br), (_tmp), &(_btrn)->input_queue, node)
170
171 /*
172         (parent, child):
173         (NULL, NULL): new, isolated node.
174         (NULL, c): new node becomes root, c must be old root
175         (p, NULL): new leaf node
176         (p, c): new internal node, ch must be child of p
177
178 */
179 struct btr_node *btr_new_node(struct btr_node_description *bnd)
180 {
181         struct btr_node *btrn = para_malloc(sizeof(*btrn));
182
183         btrn->name = para_strdup(bnd->name);
184         btrn->parent = bnd->parent;
185         btrn->execute = bnd->handler;
186         btrn->context = bnd->context;
187         btrn->start.tv_sec = 0;
188         btrn->start.tv_usec = 0;
189         if (bnd->parent)
190                 list_add_tail(&btrn->node, &bnd->parent->children);
191         INIT_LIST_HEAD(&btrn->children);
192         INIT_LIST_HEAD(&btrn->input_queue);
193         if (bnd->parent)
194                 PARA_INFO_LOG("added %s as child of %s\n", bnd->name, bnd->parent->name);
195         else
196                 PARA_INFO_LOG("added %s as btr root\n", bnd->name);
197         return btrn;
198 }
199
200 /*
201  * Allocate a new btr buffer.
202  *
203  * The freshly allocated buffer will have a zero refcount and will
204  * not be associated with a btr pool.
205  */
206 static struct btr_buffer *new_btrb(char *buf, size_t size)
207 {
208         struct btr_buffer *btrb = para_calloc(sizeof(*btrb));
209
210         btrb->buf = buf;
211         btrb->size = size;
212         return btrb;
213 }
214
215 static void dealloc_buffer(struct btr_buffer *btrb)
216 {
217         if (btrb->pool)
218                 btr_pool_deallocate(btrb->pool, btrb->size);
219         else
220                 free(btrb->buf);
221 }
222
223 static struct btr_buffer_reference *get_first_input_br(struct btr_node *btrn)
224 {
225         if (list_empty(&btrn->input_queue))
226                 return NULL;
227         return list_first_entry(&btrn->input_queue,
228                 struct btr_buffer_reference, node);
229 }
230
231 /*
232  * Deallocate the reference, release the resources if refcount drops to zero.
233  */
234 static void btr_drop_buffer_reference(struct btr_buffer_reference *br)
235 {
236         struct btr_buffer *btrb = br->btrb;
237
238         //PARA_CRIT_LOG("dropping buffer reference %p\n", br);
239         list_del(&br->node);
240         free(br);
241         btrb->refcount--;
242         if (btrb->refcount == 0) {
243                 dealloc_buffer(btrb);
244                 free(btrb);
245         }
246 }
247
248 static void add_btrb_to_children(struct btr_buffer *btrb,
249                 struct btr_node *btrn, size_t consumed)
250 {
251         struct btr_node *ch;
252
253         if (btrn->start.tv_sec == 0)
254                 btrn->start = *now;
255         FOR_EACH_CHILD(ch, btrn) {
256                 struct btr_buffer_reference *br = para_calloc(sizeof(*br));
257                 br->btrb = btrb;
258                 br->consumed = consumed;
259                 list_add_tail(&br->node, &ch->input_queue);
260                 btrb->refcount++;
261                 if (ch->start.tv_sec == 0)
262                         ch->start = *now;
263         }
264 }
265
266 void btr_add_output(char *buf, size_t size, struct btr_node *btrn)
267 {
268         struct btr_buffer *btrb;
269
270         assert(size != 0);
271         if (list_empty(&btrn->children)) {
272                 free(buf);
273                 return;
274         }
275         btrb = new_btrb(buf, size);
276         add_btrb_to_children(btrb, btrn, 0);
277 }
278
279 void btr_add_output_pool(struct btr_pool *btrp, size_t size,
280                 struct btr_node *btrn)
281 {
282         struct btr_buffer *btrb;
283         char *buf;
284         size_t avail;
285
286         assert(size != 0);
287         if (list_empty(&btrn->children))
288                 return;
289         avail = btr_pool_get_buffer(btrp, &buf);
290         assert(avail >= size);
291         btr_pool_allocate(btrp, size);
292         btrb = new_btrb(buf, size);
293         btrb->pool = btrp;
294         add_btrb_to_children(btrb, btrn, 0);
295 }
296
297 void btr_copy(const void *src, size_t n, struct btr_pool *btrp,
298         struct btr_node *btrn)
299 {
300         char *buf;
301         size_t sz, copy;
302
303         if (n == 0)
304                 return;
305         assert(n <= btr_pool_unused(btrp));
306         sz = btr_pool_get_buffer(btrp, &buf);
307         copy = PARA_MIN(sz, n);
308         memcpy(buf, src, copy);
309         btr_add_output_pool(btrp, copy, btrn);
310         if (copy == n)
311                 return;
312         sz = btr_pool_get_buffer(btrp, &buf);
313         assert(sz >= n - copy);
314         memcpy(buf, src + copy, n - copy);
315         btr_add_output_pool(btrp, n - copy, btrn);
316 }
317
318 static void btr_pushdown_br(struct btr_buffer_reference *br, struct btr_node *btrn)
319 {
320         add_btrb_to_children(br->btrb, btrn, br->consumed);
321         btr_drop_buffer_reference(br);
322 }
323
324 void btr_pushdown(struct btr_node *btrn)
325 {
326         struct btr_buffer_reference *br, *tmp;
327
328         FOR_EACH_BUFFER_REF_SAFE(br, tmp, btrn)
329                 btr_pushdown_br(br, btrn);
330 }
331
332 int btr_pushdown_one(struct btr_node *btrn)
333 {
334         struct btr_buffer_reference *br;
335
336         if (list_empty(&btrn->input_queue))
337                 return 0;
338         br = list_first_entry(&btrn->input_queue, struct btr_buffer_reference, node);
339         btr_pushdown_br(br, btrn);
340         return 1;
341 }
342
343 /* Return true if this node has no children. */
344 bool btr_no_children(struct btr_node *btrn)
345 {
346         return list_empty(&btrn->children);
347 }
348
349 bool btr_no_parent(struct btr_node *btrn)
350 {
351         return !btrn->parent;
352 }
353
354 bool btr_inplace_ok(struct btr_node *btrn)
355 {
356         if (!btrn->parent)
357                 return true;
358         return list_is_singular(&btrn->parent->children);
359 }
360
361 static inline size_t br_available_bytes(struct btr_buffer_reference *br)
362 {
363         return br->btrb->size - br->consumed;
364 }
365
366 size_t btr_get_buffer_by_reference(struct btr_buffer_reference *br, char **buf)
367 {
368         if (buf)
369                 *buf = br->btrb->buf + br->consumed;
370         return br_available_bytes(br);
371 }
372
373 /**
374  * \return zero if the input buffer queue is empty.
375  */
376 size_t btr_next_buffer(struct btr_node *btrn, char **bufp)
377 {
378         struct btr_buffer_reference *br;
379         char *buf, *result = NULL;
380         size_t sz, rv = 0;
381
382         FOR_EACH_BUFFER_REF(br, btrn) {
383                 sz = btr_get_buffer_by_reference(br, &buf);
384                 if (!result) {
385                         result = buf;
386                         rv = sz;
387                         if (!br->btrb->pool)
388                                 break;
389                         continue;
390                 }
391                 if (!br->btrb->pool)
392                         break;
393                 if (result + rv != buf) {
394                         PARA_DEBUG_LOG("%s: pool merge impossible: %p != %p\n",
395                                 btrn->name, result + rv, buf);
396                         break;
397                 }
398 //              PARA_CRIT_LOG("%s: inplace merge (%zu, %zu)->%zu\n", btrn->name,
399 //                      rv, sz, rv + sz);
400 //              PARA_CRIT_LOG("%s: inplace merge %p (%zu)\n", btrn->name,
401 //                      result, sz);
402                 rv += sz;
403         }
404         if (bufp)
405                 *bufp = result;
406         return rv;
407 }
408
409 void btr_consume(struct btr_node *btrn, size_t numbytes)
410 {
411         struct btr_buffer_reference *br, *tmp;
412         size_t sz;
413
414         if (numbytes == 0)
415                 return;
416         br = get_first_input_br(btrn);
417         assert(br);
418
419         //PARA_CRIT_LOG("wrap count: %zu\n", br->wrap_count);
420         if (br->wrap_count == 0) {
421                 /*
422                  * No wrap buffer. Drop buffer references whose buffer
423                  * has been fully used. */
424                 FOR_EACH_BUFFER_REF_SAFE(br, tmp, btrn) {
425                         if (br->consumed + numbytes <= br->btrb->size) {
426                                 br->consumed += numbytes;
427                                 if (br->consumed == br->btrb->size)
428                                         btr_drop_buffer_reference(br);
429                                 return;
430                         }
431                         numbytes -= br->btrb->size - br->consumed;
432                         btr_drop_buffer_reference(br);
433                 }
434                 assert(true);
435         }
436         /*
437
438         We have a wrap buffer, consume from it. If in total,
439         i.e. including previous calls to brt_consume(), less than
440         wrap_count has been consumed, there's nothing more we can do.
441
442         Otherwise we drop the wrap buffer and consume from subsequent
443         buffers of the input queue the correct amount of bytes. This
444         is the total number of bytes that have been consumed from the
445         wrap buffer.
446 */
447         PARA_DEBUG_LOG("consuming %zu/%zu bytes from wrap buffer\n", numbytes,
448                 br_available_bytes(br));
449
450         assert(numbytes <= br_available_bytes(br));
451         if (br->consumed + numbytes < br->wrap_count) {
452                 br->consumed += numbytes;
453                 return;
454         }
455         PARA_DEBUG_LOG("dropping wrap buffer (%zu bytes)\n", br->btrb->size);
456         /* get rid of the wrap buffer */
457         sz = br->consumed + numbytes;
458         btr_drop_buffer_reference(br);
459         return btr_consume(btrn, sz);
460 }
461
462 static void flush_input_queue(struct btr_node *btrn)
463 {
464         struct btr_buffer_reference *br, *tmp;
465         FOR_EACH_BUFFER_REF_SAFE(br, tmp, btrn)
466                 btr_drop_buffer_reference(br);
467 }
468
469 void btr_free_node(struct btr_node *btrn)
470 {
471         if (!btrn)
472                 return;
473         free(btrn->name);
474         free(btrn);
475 }
476
477 void btr_remove_node(struct btr_node *btrn)
478 {
479         struct btr_node *ch;
480
481         if (!btrn)
482                 return;
483         PARA_NOTICE_LOG("removing btr node %s from buffer tree\n", btrn->name);
484         FOR_EACH_CHILD(ch, btrn)
485                 ch->parent = NULL;
486         flush_input_queue(btrn);
487         if (btrn->parent)
488                 list_del(&btrn->node);
489 }
490
491 size_t btr_get_input_queue_size(struct btr_node *btrn)
492 {
493         struct btr_buffer_reference *br;
494         size_t size = 0;
495
496         FOR_EACH_BUFFER_REF(br, btrn) {
497                 //PARA_CRIT_LOG("size: %zu\n", size);
498                 size += br_available_bytes(br);
499         }
500         return size;
501 }
502
503 void btr_splice_out_node(struct btr_node *btrn)
504 {
505         struct btr_node *ch, *tmp;
506
507         assert(btrn);
508         PARA_NOTICE_LOG("splicing out %s\n", btrn->name);
509         btr_pushdown(btrn);
510         if (btrn->parent)
511                 list_del(&btrn->node);
512         FOR_EACH_CHILD_SAFE(ch, tmp, btrn) {
513                 PARA_INFO_LOG("parent(%s): %s\n", ch->name,
514                         btrn->parent? btrn->parent->name : "NULL");
515                 ch->parent = btrn->parent;
516                 if (btrn->parent)
517                         list_move(&ch->node, &btrn->parent->children);
518         }
519         assert(list_empty(&btrn->children));
520 }
521
522 /**
523  * Return the size of the largest input queue.
524  *
525  * Iterates over all children of the given node.
526  */
527 size_t btr_bytes_pending(struct btr_node *btrn)
528 {
529         size_t max_size = 0;
530         struct btr_node *ch;
531
532         FOR_EACH_CHILD(ch, btrn) {
533                 size_t size = btr_get_input_queue_size(ch);
534                 max_size = PARA_MAX(max_size, size);
535         }
536         return max_size;
537 }
538
539 int btr_exec(struct btr_node *btrn, const char *command, char **value_result)
540 {
541         if (!btrn)
542                 return -ERRNO_TO_PARA_ERROR(EINVAL);
543         if (!btrn->execute)
544                 return -ERRNO_TO_PARA_ERROR(ENOTSUP);
545         return btrn->execute(btrn, command, value_result);
546 }
547
548 int btr_exec_up(struct btr_node *btrn, const char *command, char **value_result)
549 {
550         int ret;
551
552         for (; btrn; btrn = btrn->parent) {
553                 struct btr_node *parent = btrn->parent;
554                 if (!parent)
555                         return -ERRNO_TO_PARA_ERROR(ENOTSUP);
556                 if (!parent->execute)
557                         continue;
558                 PARA_INFO_LOG("parent: %s, cmd: %s\n", parent->name, command);
559                 ret = parent->execute(parent, command, value_result);
560                 if (ret == -ERRNO_TO_PARA_ERROR(ENOTSUP))
561                         continue;
562                 if (ret < 0)
563                         return ret;
564                 if (value_result && *value_result)
565                         PARA_NOTICE_LOG("%s(%s): %s\n", command, parent->name,
566                                 *value_result);
567                 return 1;
568         }
569         return -ERRNO_TO_PARA_ERROR(ENOTSUP);
570 }
571
572 void *btr_context(struct btr_node *btrn)
573 {
574         return btrn->context;
575 }
576
577 static bool need_buffer_pool_merge(struct btr_node *btrn)
578 {
579         struct btr_buffer_reference *br = get_first_input_br(btrn);
580
581         if (!br)
582                 return false;
583         if (br->wrap_count != 0)
584                 return true;
585         if (br->btrb->pool)
586                 return true;
587         return false;
588 }
589
590 static void merge_input_pool(struct btr_node *btrn, size_t dest_size)
591 {
592         struct btr_buffer_reference *br, *wbr = NULL;
593         int num_refs; /* including wrap buffer */
594         char *buf, *buf1 = NULL, *buf2 = NULL;
595         size_t sz, sz1 = 0, sz2 = 0, wsz;
596
597         br = get_first_input_br(btrn);
598         if (!br || br_available_bytes(br) >= dest_size)
599                 return;
600         num_refs = 0;
601         FOR_EACH_BUFFER_REF(br, btrn) {
602                 num_refs++;
603                 sz = btr_get_buffer_by_reference(br, &buf);
604                 if (sz == 0)
605                         break;
606                 if (br->wrap_count != 0) {
607                         assert(!wbr);
608                         assert(num_refs == 1);
609                         wbr = br;
610                         if (sz >= dest_size)
611                                 return;
612                         continue;
613                 }
614                 if (!buf1) {
615                         buf1 = buf;
616                         sz1 = sz;
617                         goto next;
618                 }
619                 if (buf1 + sz1 == buf) {
620                         sz1 += sz;
621                         goto next;
622                 }
623                 if (!buf2) {
624                         buf2 = buf;
625                         sz2 = sz;
626                         goto next;
627                 }
628                 assert(buf2 + sz2 == buf);
629                 sz2 += sz;
630 next:
631                 if (sz1 + sz2 >= dest_size)
632                         break;
633         }
634         if (!buf2) /* nothing to do */
635                 return;
636         assert(buf1 && sz2 > 0);
637         /*
638          * If the second buffer is large, we only take the first part of it to
639          * avoid having to memcpy() huge buffers.
640          */
641         sz2 = PARA_MIN(sz2, (size_t)(64 * 1024));
642         if (!wbr) {
643                 /* Make a new wrap buffer combining buf1 and buf2. */
644                 sz = sz1 + sz2;
645                 buf = para_malloc(sz);
646                 PARA_DEBUG_LOG("merging input buffers: (%p:%zu, %p:%zu) -> %p:%zu\n",
647                         buf1, sz1, buf2, sz2, buf, sz);
648                 memcpy(buf, buf1, sz1);
649                 memcpy(buf + sz1, buf2, sz2);
650                 br = para_calloc(sizeof(*br));
651                 br->btrb = new_btrb(buf, sz);
652                 br->btrb->refcount = 1;
653                 br->consumed = 0;
654                 /* This is a wrap buffer */
655                 br->wrap_count = sz1;
656                 para_list_add(&br->node, &btrn->input_queue);
657                 return;
658         }
659         PARA_DEBUG_LOG("increasing wrap buffer, sz1: %zu, sz2: %zu\n", sz1, sz2);
660         /*
661          * We already have a wrap buffer, but it is too small. It might be
662          * partially used.
663          */
664         wsz = br_available_bytes(wbr);
665         if (wbr->wrap_count == sz1 && wbr->btrb->size >= sz1 + sz2) /* nothing we can do about it */
666                 return;
667         sz = sz1 + sz2 - wbr->btrb->size; /* amount of new data */
668         wbr->btrb->size += sz;
669         wbr->btrb->buf = para_realloc(wbr->btrb->buf, wbr->btrb->size);
670         /* copy the new data to the end of the reallocated buffer */
671         assert(sz2 >= sz);
672         memcpy(wbr->btrb->buf + wbr->btrb->size - sz, buf2 + sz2 - sz, sz);
673 }
674
675 /**
676  * Merge the first two input buffers into one.
677  *
678  * This is a quite expensive operation.
679  *
680  * \return The number of buffers that have been available (zero, one or two).
681  */
682 static int merge_input(struct btr_node *btrn)
683 {
684         struct btr_buffer_reference *brs[2], *br;
685         char *bufs[2], *buf;
686         size_t szs[2], sz;
687         int i;
688
689         if (list_empty(&btrn->input_queue))
690                 return 0;
691         if (list_is_singular(&btrn->input_queue))
692                 return 1;
693         i = 0;
694         /* get references to the first two buffers */
695         FOR_EACH_BUFFER_REF(br, btrn) {
696                 brs[i] = br;
697                 szs[i] = btr_get_buffer_by_reference(brs[i], bufs + i);
698                 i++;
699                 if (i == 2)
700                         break;
701         }
702         /* make a new btrb that combines the two buffers and a br to it. */
703         sz = szs[0] + szs[1];
704         buf = para_malloc(sz);
705         PARA_DEBUG_LOG("%s: memory merging input buffers: (%zu, %zu) -> %zu\n",
706                 btrn->name, szs[0], szs[1], sz);
707         memcpy(buf, bufs[0], szs[0]);
708         memcpy(buf + szs[0], bufs[1], szs[1]);
709
710         br = para_calloc(sizeof(*br));
711         br->btrb = new_btrb(buf, sz);
712         br->btrb->refcount = 1;
713
714         /* replace the first two refs by the new one */
715         btr_drop_buffer_reference(brs[0]);
716         btr_drop_buffer_reference(brs[1]);
717         para_list_add(&br->node, &btrn->input_queue);
718         return 2;
719 }
720
721 void btr_merge(struct btr_node *btrn, size_t dest_size)
722 {
723         if (need_buffer_pool_merge(btrn))
724                 return merge_input_pool(btrn, dest_size);
725         for (;;) {
726                 char *buf;
727                 size_t len = btr_next_buffer(btrn, &buf);
728                 if (len >= dest_size)
729                         return;
730                 PARA_DEBUG_LOG("input size = %zu < %zu = dest\n", len, dest_size);
731                 if (merge_input(btrn) < 2)
732                         return;
733         }
734 }
735
736 bool btr_eof(struct btr_node *btrn)
737 {
738         char *buf;
739         size_t len = btr_next_buffer(btrn, &buf);
740
741         return (len == 0 && btr_no_parent(btrn));
742 }
743
744 void log_tree_recursively(struct btr_node *btrn, int loglevel, int depth)
745 {
746         struct btr_node *ch;
747         const char spaces[] = "                 ", *space = spaces + 16 - depth;
748
749         if (depth > 16)
750                 return;
751         para_log(loglevel, "%s%s\n", space, btrn->name);
752         FOR_EACH_CHILD(ch, btrn)
753                 log_tree_recursively(ch, loglevel, depth + 1);
754 }
755
756 void btr_log_tree(struct btr_node *btrn, int loglevel)
757 {
758         return log_tree_recursively(btrn, loglevel, 0);
759 }
760
761 /*
762  * \return \a root if \a name is \p NULL.
763  */
764 struct btr_node *btr_search_node(const char *name, struct btr_node *root)
765 {
766         struct btr_node *ch;
767
768         if (!name)
769                 return root;
770         if (!strcmp(root->name, name))
771                 return root;
772         FOR_EACH_CHILD(ch, root) {
773                 struct btr_node *result = btr_search_node(name, ch);
774                 if (result)
775                         return result;
776         }
777         return NULL;
778 }
779
780 /** 640K ought to be enough for everybody ;) */
781 #define BTRN_MAX_PENDING (640 * 1024)
782
783 int btr_node_status(struct btr_node *btrn, size_t min_iqs,
784         enum btr_node_type type)
785 {
786         size_t iqs;
787
788         if (!btrn)
789                 return 0;
790         if (type != BTR_NT_LEAF) {
791                 if (btr_no_children(btrn))
792                         return -E_BTR_NO_CHILD;
793                 if (btr_bytes_pending(btrn) > BTRN_MAX_PENDING)
794                         return 0;
795         }
796         if (type != BTR_NT_ROOT) {
797                 if (btr_eof(btrn))
798                         return -E_BTR_EOF;
799                 iqs = btr_get_input_queue_size(btrn);
800                 if (iqs == 0) /* we have a parent, because not eof */
801                         return 0;
802                 if (iqs < min_iqs && !btr_no_parent(btrn))
803                         return 0;
804         }
805         return 1;
806 }
807
808 void btr_get_node_start(struct btr_node *btrn, struct timeval *tv)
809 {
810         *tv = btrn->start;
811 }