[btr] Fix merge_input_pool().
[paraslash.git] / buffer_tree.c
1 #include <regex.h>
2 #include <stdbool.h>
3
4 #include "para.h"
5 #include "list.h"
6 #include "string.h"
7 #include "buffer_tree.h"
8 #include "error.h"
9 #include "sched.h"
10
11 struct btr_pool {
12         char *name;
13         char *area_start;
14         char *area_end;
15         char *rhead;
16         char *whead;
17 };
18
19 enum btr_buffer_flags {
20         /* changes the way the buffer is deallocated */
21         BTR_BF_BTR_POOL = 1,
22 };
23
24 struct btr_buffer {
25         char *buf;
26         size_t size;
27         /** The number of references to this buffer. */
28         int refcount;
29         struct btr_pool *pool;
30 };
31
32 struct btr_buffer_reference {
33         struct btr_buffer *btrb;
34         size_t consumed;
35         /* Each buffer reference belongs to the buffer queue list of some buffer tree node. */
36         struct list_head node;
37         size_t wrap_count;
38 };
39
40 struct btr_node {
41         char *name;
42         struct btr_node *parent;
43         /* The position of this btr node in the buffer tree. */
44         struct list_head node;
45         /* The children nodes of this btr node are linked together in a list. */
46         struct list_head children;
47         /* Time of first data transfer. */
48         struct timeval start;
49         /**
50          * The input queue is a list of references to btr buffers. Each item on
51          * the list represents an input buffer which has not been completely
52          * used by this btr node.
53          */
54         struct list_head input_queue;
55         btr_command_handler execute;
56         void *context;
57 };
58
59 struct btr_pool *btr_pool_new(const char *name, size_t area_size)
60 {
61         struct btr_pool *btrp;
62
63         PARA_INFO_LOG("%s, %zu bytes\n", name, area_size);
64         btrp = para_malloc(sizeof(*btrp));
65         btrp->area_start = para_malloc(area_size);
66         btrp->area_end = btrp->area_start + area_size;
67         btrp->rhead = btrp->area_start;
68         btrp->whead = btrp->area_start;
69         btrp->name = para_strdup(name);
70         return btrp;
71 }
72
73 /* whead = NULL means area full */
74
75 void btr_pool_free(struct btr_pool *btrp)
76 {
77         if (!btrp)
78                 return;
79         free(btrp->area_start);
80         free(btrp->name);
81         free(btrp);
82 }
83
84 size_t btr_pool_size(struct btr_pool *btrp)
85 {
86         return btrp->area_end - btrp->area_start;
87 }
88
89 size_t btr_pool_filled(struct btr_pool *btrp)
90 {
91         if (!btrp->whead)
92                 return btr_pool_size(btrp);
93         if (btrp->rhead <= btrp->whead)
94                 return  btrp->whead - btrp->rhead;
95         return btr_pool_size(btrp) - (btrp->rhead - btrp->whead);
96 }
97
98 size_t btr_pool_unused(struct btr_pool *btrp)
99 {
100         return btr_pool_size(btrp) - btr_pool_filled(btrp);
101 }
102
103 size_t btr_pool_available(struct btr_pool *btrp)
104 {
105         if (!btrp->whead)
106                 return 0;
107         if (btrp->rhead <= btrp->whead)
108                 return btrp->area_end - btrp->whead;
109         return btrp->rhead - btrp->whead;
110 }
111
112 size_t btr_pool_get_buffer(struct btr_pool *btrp, char **result)
113 {
114         if (result)
115                 *result = btrp->whead;
116         return btr_pool_available(btrp);
117 }
118
119 void btr_pool_allocate(struct btr_pool *btrp, size_t size)
120 {
121         char *end;
122
123         if (size == 0)
124                 return;
125         //PARA_CRIT_LOG("filled: %zu, alloc %zu\n", btr_pool_filled(btrp), size);
126         assert(size <= btr_pool_available(btrp));
127         end = btrp->whead + size;
128         assert(end <= btrp->area_end);
129
130         if (end == btrp->area_end) {
131                 PARA_DEBUG_LOG("%s: end of pool area reached\n", btrp->name);
132                 end = btrp->area_start;
133         }
134         if (end == btrp->rhead) {
135                 PARA_DEBUG_LOG("btrp buffer full\n");
136                 end = NULL; /* buffer full */
137         }
138         btrp->whead = end;
139         //PARA_CRIT_LOG("filled: %zu\n", btr_pool_filled(btrp));
140 }
141
142 static void btr_pool_deallocate(struct btr_pool *btrp, size_t size)
143 {
144         char *end = btrp->rhead + size;
145
146         //PARA_CRIT_LOG("filled: %zu, dealloc %zu\n", btr_pool_filled(btrp), size);
147         if (size == 0)
148                 return;
149         assert(end <= btrp->area_end);
150         assert(size <= btr_pool_filled(btrp));
151         if (end == btrp->area_end)
152                 end = btrp->area_start;
153         if (!btrp->whead)
154                 btrp->whead = btrp->rhead;
155         btrp->rhead = end;
156         if (btrp->rhead == btrp->whead)
157                 btrp->rhead = btrp->whead = btrp->area_start;
158         //PARA_CRIT_LOG("filled: %zu\n", btr_pool_filled(btrp));
159 }
160
161 #define FOR_EACH_CHILD(_tn, _btrn) list_for_each_entry((_tn), \
162         &((_btrn)->children), node)
163 #define FOR_EACH_CHILD_SAFE(_tn, _tmp, _btrn) \
164         list_for_each_entry_safe((_tn), (_tmp), &((_btrn)->children), node)
165
166 #define FOR_EACH_BUFFER_REF(_br, _btrn) \
167         list_for_each_entry((_br), &(_btrn)->input_queue, node)
168 #define FOR_EACH_BUFFER_REF_SAFE(_br, _tmp, _btrn) \
169         list_for_each_entry_safe((_br), (_tmp), &(_btrn)->input_queue, node)
170
171 struct btr_node *btr_new_node(const char *name, struct btr_node *parent,
172                 btr_command_handler handler, void *context)
173 {
174         struct btr_node *btrn = para_malloc(sizeof(*btrn));
175
176         btrn->name = para_strdup(name);
177         btrn->parent = parent;
178         btrn->execute = handler;
179         btrn->context = context;
180         btrn->start.tv_sec = 0;
181         btrn->start.tv_usec = 0;
182         if (parent)
183                 list_add_tail(&btrn->node, &parent->children);
184         INIT_LIST_HEAD(&btrn->children);
185         INIT_LIST_HEAD(&btrn->input_queue);
186         if (parent)
187                 PARA_INFO_LOG("added %s as child of %s\n", name, parent->name);
188         else
189                 PARA_INFO_LOG("added %s as btr root\n", name);
190         return btrn;
191 }
192
193 /*
194  * Allocate a new btr buffer.
195  *
196  * The freshly allocated buffer will have a zero refcount and will
197  * not be associated with a btr pool.
198  */
199 static struct btr_buffer *new_btrb(char *buf, size_t size)
200 {
201         struct btr_buffer *btrb = para_calloc(sizeof(*btrb));
202
203         btrb->buf = buf;
204         btrb->size = size;
205         return btrb;
206 }
207
208 static void dealloc_buffer(struct btr_buffer *btrb)
209 {
210         if (btrb->pool)
211                 btr_pool_deallocate(btrb->pool, btrb->size);
212         else
213                 free(btrb->buf);
214 }
215
216 static struct btr_buffer_reference *get_first_input_br(struct btr_node *btrn)
217 {
218         if (list_empty(&btrn->input_queue))
219                 return NULL;
220         return list_first_entry(&btrn->input_queue,
221                 struct btr_buffer_reference, node);
222 }
223
224 /*
225  * Deallocate the reference, release the resources if refcount drops to zero.
226  */
227 static void btr_drop_buffer_reference(struct btr_buffer_reference *br)
228 {
229         struct btr_buffer *btrb = br->btrb;
230
231         //PARA_CRIT_LOG("dropping buffer reference %p\n", br);
232         list_del(&br->node);
233         free(br);
234         btrb->refcount--;
235         if (btrb->refcount == 0) {
236                 dealloc_buffer(btrb);
237                 free(btrb);
238         }
239 }
240
241 static void add_btrb_to_children(struct btr_buffer *btrb,
242                 struct btr_node *btrn, size_t consumed)
243 {
244         struct btr_node *ch;
245
246         if (btrn->start.tv_sec == 0)
247                 btrn->start = *now;
248         FOR_EACH_CHILD(ch, btrn) {
249                 struct btr_buffer_reference *br = para_calloc(sizeof(*br));
250                 br->btrb = btrb;
251                 br->consumed = consumed;
252                 list_add_tail(&br->node, &ch->input_queue);
253                 btrb->refcount++;
254                 if (ch->start.tv_sec == 0)
255                         ch->start = *now;
256         }
257 }
258
259 void btr_add_output(char *buf, size_t size, struct btr_node *btrn)
260 {
261         struct btr_buffer *btrb;
262
263         assert(size != 0);
264         if (list_empty(&btrn->children)) {
265                 free(buf);
266                 return;
267         }
268         btrb = new_btrb(buf, size);
269         add_btrb_to_children(btrb, btrn, 0);
270 }
271
272 void btr_add_output_pool(struct btr_pool *btrp, size_t size,
273                 struct btr_node *btrn)
274 {
275         struct btr_buffer *btrb;
276         char *buf;
277         size_t avail;
278
279         assert(size != 0);
280         if (list_empty(&btrn->children))
281                 return;
282         avail = btr_pool_get_buffer(btrp, &buf);
283         assert(avail >= size);
284         btr_pool_allocate(btrp, size);
285         btrb = new_btrb(buf, size);
286         btrb->pool = btrp;
287         add_btrb_to_children(btrb, btrn, 0);
288 }
289
290 void btr_copy(const void *src, size_t n, struct btr_pool *btrp,
291         struct btr_node *btrn)
292 {
293         char *buf;
294         size_t sz, copy;
295
296         if (n == 0)
297                 return;
298         assert(n <= btr_pool_unused(btrp));
299         sz = btr_pool_get_buffer(btrp, &buf);
300         copy = PARA_MIN(sz, n);
301         memcpy(buf, src, copy);
302         btr_add_output_pool(btrp, copy, btrn);
303         if (copy == n)
304                 return;
305         sz = btr_pool_get_buffer(btrp, &buf);
306         assert(sz >= n - copy);
307         memcpy(buf, src + copy, n - copy);
308         btr_add_output_pool(btrp, n - copy, btrn);
309 }
310
311 static void btr_pushdown_br(struct btr_buffer_reference *br, struct btr_node *btrn)
312 {
313         add_btrb_to_children(br->btrb, btrn, br->consumed);
314         btr_drop_buffer_reference(br);
315 }
316
317 void btr_pushdown(struct btr_node *btrn)
318 {
319         struct btr_buffer_reference *br, *tmp;
320
321         FOR_EACH_BUFFER_REF_SAFE(br, tmp, btrn)
322                 btr_pushdown_br(br, btrn);
323 }
324
325 int btr_pushdown_one(struct btr_node *btrn)
326 {
327         struct btr_buffer_reference *br;
328
329         if (list_empty(&btrn->input_queue))
330                 return 0;
331         br = list_first_entry(&btrn->input_queue, struct btr_buffer_reference, node);
332         btr_pushdown_br(br, btrn);
333         return 1;
334 }
335
336 /* Return true if this node has no children. */
337 bool btr_no_children(struct btr_node *btrn)
338 {
339         return list_empty(&btrn->children);
340 }
341
342 bool btr_no_parent(struct btr_node *btrn)
343 {
344         return !btrn->parent;
345 }
346
347 bool btr_inplace_ok(struct btr_node *btrn)
348 {
349         if (!btrn->parent)
350                 return true;
351         return list_is_singular(&btrn->parent->children);
352 }
353
354 static inline size_t br_available_bytes(struct btr_buffer_reference *br)
355 {
356         return br->btrb->size - br->consumed;
357 }
358
359 size_t btr_get_buffer_by_reference(struct btr_buffer_reference *br, char **buf)
360 {
361         if (buf)
362                 *buf = br->btrb->buf + br->consumed;
363         return br_available_bytes(br);
364 }
365
366 /**
367  * \return zero if the input buffer queue is empty.
368  */
369 size_t btr_next_buffer(struct btr_node *btrn, char **bufp)
370 {
371         struct btr_buffer_reference *br;
372         char *buf, *result = NULL;
373         size_t sz, rv = 0;
374
375         FOR_EACH_BUFFER_REF(br, btrn) {
376                 sz = btr_get_buffer_by_reference(br, &buf);
377                 if (!result) {
378                         result = buf;
379                         rv = sz;
380                         if (!br->btrb->pool)
381                                 break;
382                         continue;
383                 }
384                 if (!br->btrb->pool)
385                         break;
386                 if (result + rv != buf) {
387                         PARA_DEBUG_LOG("%s: pool merge impossible: %p != %p\n",
388                                 btrn->name, result + rv, buf);
389                         break;
390                 }
391 //              PARA_CRIT_LOG("%s: inplace merge (%zu, %zu)->%zu\n", btrn->name,
392 //                      rv, sz, rv + sz);
393 //              PARA_CRIT_LOG("%s: inplace merge %p (%zu)\n", btrn->name,
394 //                      result, sz);
395                 rv += sz;
396         }
397         if (bufp)
398                 *bufp = result;
399         return rv;
400 }
401
402 void btr_consume(struct btr_node *btrn, size_t numbytes)
403 {
404         struct btr_buffer_reference *br, *tmp;
405         size_t sz;
406
407         if (numbytes == 0)
408                 return;
409         br = get_first_input_br(btrn);
410         assert(br);
411
412         //PARA_CRIT_LOG("wrap count: %zu\n", br->wrap_count);
413         if (br->wrap_count == 0) {
414                 /*
415                  * No wrap buffer. Drop buffer references whose buffer
416                  * has been fully used. */
417                 FOR_EACH_BUFFER_REF_SAFE(br, tmp, btrn) {
418                         if (br->consumed + numbytes <= br->btrb->size) {
419                                 br->consumed += numbytes;
420                                 if (br->consumed == br->btrb->size)
421                                         btr_drop_buffer_reference(br);
422                                 return;
423                         }
424                         numbytes -= br->btrb->size - br->consumed;
425                         btr_drop_buffer_reference(br);
426                 }
427                 assert(true);
428         }
429         /*
430
431         We have a wrap buffer, consume from it. If in total,
432         i.e. including previous calls to brt_consume(), less than
433         wrap_count has been consumed, there's nothing more we can do.
434
435         Otherwise we drop the wrap buffer and consume from subsequent
436         buffers of the input queue the correct amount of bytes. This
437         is the total number of bytes that have been consumed from the
438         wrap buffer.
439 */
440         PARA_DEBUG_LOG("consuming %zu/%zu bytes from wrap buffer\n", numbytes,
441                 br_available_bytes(br));
442
443         assert(numbytes <= br_available_bytes(br));
444         if (br->consumed + numbytes < br->wrap_count) {
445                 br->consumed += numbytes;
446                 return;
447         }
448         PARA_DEBUG_LOG("dropping wrap buffer (%zu bytes)\n", br->btrb->size);
449         /* get rid of the wrap buffer */
450         sz = br->consumed + numbytes;
451         btr_drop_buffer_reference(br);
452         return btr_consume(btrn, sz);
453 }
454
455 static void flush_input_queue(struct btr_node *btrn)
456 {
457         struct btr_buffer_reference *br, *tmp;
458         FOR_EACH_BUFFER_REF_SAFE(br, tmp, btrn)
459                 btr_drop_buffer_reference(br);
460 }
461
462 void btr_free_node(struct btr_node *btrn)
463 {
464         if (!btrn)
465                 return;
466         free(btrn->name);
467         free(btrn);
468 }
469
470 void btr_remove_node(struct btr_node *btrn)
471 {
472         struct btr_node *ch;
473
474         if (!btrn)
475                 return;
476         PARA_NOTICE_LOG("removing btr node %s from buffer tree\n", btrn->name);
477         FOR_EACH_CHILD(ch, btrn)
478                 ch->parent = NULL;
479         flush_input_queue(btrn);
480         if (btrn->parent)
481                 list_del(&btrn->node);
482 }
483
484 size_t btr_get_input_queue_size(struct btr_node *btrn)
485 {
486         struct btr_buffer_reference *br;
487         size_t size = 0;
488
489         FOR_EACH_BUFFER_REF(br, btrn) {
490                 //PARA_CRIT_LOG("size: %zu\n", size);
491                 size += br_available_bytes(br);
492         }
493         return size;
494 }
495
496 void btr_splice_out_node(struct btr_node *btrn)
497 {
498         struct btr_node *ch, *tmp;
499
500         assert(btrn);
501         PARA_NOTICE_LOG("splicing out %s\n", btrn->name);
502         btr_pushdown(btrn);
503         if (btrn->parent)
504                 list_del(&btrn->node);
505         FOR_EACH_CHILD_SAFE(ch, tmp, btrn) {
506                 PARA_INFO_LOG("parent(%s): %s\n", ch->name,
507                         btrn->parent? btrn->parent->name : "NULL");
508                 ch->parent = btrn->parent;
509                 if (btrn->parent)
510                         list_move(&ch->node, &btrn->parent->children);
511         }
512         assert(list_empty(&btrn->children));
513 }
514
515 /**
516  * Return the size of the largest input queue.
517  *
518  * Iterates over all children of the given node.
519  */
520 size_t btr_bytes_pending(struct btr_node *btrn)
521 {
522         size_t max_size = 0;
523         struct btr_node *ch;
524
525         FOR_EACH_CHILD(ch, btrn) {
526                 size_t size = btr_get_input_queue_size(ch);
527                 max_size = PARA_MAX(max_size, size);
528         }
529         return max_size;
530 }
531
532 int btr_exec(struct btr_node *btrn, const char *command, char **value_result)
533 {
534         if (!btrn)
535                 return -ERRNO_TO_PARA_ERROR(EINVAL);
536         if (!btrn->execute)
537                 return -ERRNO_TO_PARA_ERROR(ENOTSUP);
538         return btrn->execute(btrn, command, value_result);
539 }
540
541 int btr_exec_up(struct btr_node *btrn, const char *command, char **value_result)
542 {
543         int ret;
544
545         for (; btrn; btrn = btrn->parent) {
546                 struct btr_node *parent = btrn->parent;
547                 if (!parent)
548                         return -ERRNO_TO_PARA_ERROR(ENOTSUP);
549                 if (!parent->execute)
550                         continue;
551                 PARA_INFO_LOG("parent: %s, cmd: %s\n", parent->name, command);
552                 ret = parent->execute(parent, command, value_result);
553                 if (ret == -ERRNO_TO_PARA_ERROR(ENOTSUP))
554                         continue;
555                 if (ret < 0)
556                         return ret;
557                 if (value_result && *value_result)
558                         PARA_NOTICE_LOG("%s(%s): %s\n", command, parent->name,
559                                 *value_result);
560                 return 1;
561         }
562         return -ERRNO_TO_PARA_ERROR(ENOTSUP);
563 }
564
565 void *btr_context(struct btr_node *btrn)
566 {
567         return btrn->context;
568 }
569
570 static bool need_buffer_pool_merge(struct btr_node *btrn)
571 {
572         struct btr_buffer_reference *br = get_first_input_br(btrn);
573
574         if (!br)
575                 return false;
576         if (br->wrap_count != 0)
577                 return true;
578         if (br->btrb->pool)
579                 return true;
580         return false;
581 }
582
583 static void merge_input_pool(struct btr_node *btrn, size_t dest_size)
584 {
585         struct btr_buffer_reference *br, *wbr = NULL;
586         int num_refs; /* including wrap buffer */
587         char *buf, *buf1 = NULL, *buf2 = NULL;
588         size_t sz, sz1 = 0, sz2 = 0, wsz;
589
590         br = get_first_input_br(btrn);
591         if (!br || br_available_bytes(br) >= dest_size)
592                 return;
593         num_refs = 0;
594         FOR_EACH_BUFFER_REF(br, btrn) {
595                 num_refs++;
596                 sz = btr_get_buffer_by_reference(br, &buf);
597                 if (sz == 0)
598                         break;
599                 if (br->wrap_count != 0) {
600                         assert(!wbr);
601                         assert(num_refs == 1);
602                         wbr = br;
603                         if (sz >= dest_size)
604                                 return;
605                         continue;
606                 }
607                 if (!buf1) {
608                         buf1 = buf;
609                         sz1 = sz;
610                         goto next;
611                 }
612                 if (buf1 + sz1 == buf) {
613                         sz1 += sz;
614                         goto next;
615                 }
616                 if (!buf2) {
617                         buf2 = buf;
618                         sz2 = sz;
619                         goto next;
620                 }
621                 assert(buf2 + sz2 == buf);
622                 sz2 += sz;
623 next:
624                 if (sz1 + sz2 >= dest_size)
625                         break;
626         }
627         if (!buf2) /* nothing to do */
628                 return;
629         assert(buf1 && sz2 > 0);
630         /*
631          * If the second buffer is large, we only take the first part of it to
632          * avoid having to memcpy() huge buffers.
633          */
634         sz2 = PARA_MIN(sz2, (size_t)(64 * 1024));
635         if (!wbr) {
636                 /* Make a new wrap buffer combining buf1 and buf2. */
637                 sz = sz1 + sz2;
638                 buf = para_malloc(sz);
639                 PARA_DEBUG_LOG("merging input buffers: (%p:%zu, %p:%zu) -> %p:%zu\n",
640                         buf1, sz1, buf2, sz2, buf, sz);
641                 memcpy(buf, buf1, sz1);
642                 memcpy(buf + sz1, buf2, sz2);
643                 br = para_calloc(sizeof(*br));
644                 br->btrb = new_btrb(buf, sz);
645                 br->btrb->refcount = 1;
646                 br->consumed = 0;
647                 /* This is a wrap buffer */
648                 br->wrap_count = sz1;
649                 para_list_add(&br->node, &btrn->input_queue);
650                 return;
651         }
652         PARA_DEBUG_LOG("increasing wrap buffer, sz1: %zu, sz2: %zu\n", sz1, sz2);
653         /*
654          * We already have a wrap buffer, but it is too small. It might be
655          * partially used.
656          */
657         wsz = br_available_bytes(wbr);
658         if (wbr->wrap_count == sz1 && wbr->btrb->size >= sz1 + sz2) /* nothing we can do about it */
659                 return;
660         sz = sz1 + sz2 - wbr->btrb->size; /* amount of new data */
661         wbr->btrb->size += sz;
662         wbr->btrb->buf = para_realloc(wbr->btrb->buf, wbr->btrb->size);
663         /* copy the new data to the end of the reallocated buffer */
664         assert(sz2 >= sz);
665         memcpy(wbr->btrb->buf + wbr->btrb->size - sz, buf2 + sz2 - sz, sz);
666 }
667
668 /**
669  * Merge the first two input buffers into one.
670  *
671  * This is a quite expensive operation.
672  *
673  * \return The number of buffers that have been available (zero, one or two).
674  */
675 static int merge_input(struct btr_node *btrn)
676 {
677         struct btr_buffer_reference *brs[2], *br;
678         char *bufs[2], *buf;
679         size_t szs[2], sz;
680         int i;
681
682         if (list_empty(&btrn->input_queue))
683                 return 0;
684         if (list_is_singular(&btrn->input_queue))
685                 return 1;
686         i = 0;
687         /* get references to the first two buffers */
688         FOR_EACH_BUFFER_REF(br, btrn) {
689                 brs[i] = br;
690                 szs[i] = btr_get_buffer_by_reference(brs[i], bufs + i);
691                 i++;
692                 if (i == 2)
693                         break;
694         }
695         /* make a new btrb that combines the two buffers and a br to it. */
696         sz = szs[0] + szs[1];
697         buf = para_malloc(sz);
698         PARA_DEBUG_LOG("%s: memory merging input buffers: (%zu, %zu) -> %zu\n",
699                 btrn->name, szs[0], szs[1], sz);
700         memcpy(buf, bufs[0], szs[0]);
701         memcpy(buf + szs[0], bufs[1], szs[1]);
702
703         br = para_calloc(sizeof(*br));
704         br->btrb = new_btrb(buf, sz);
705         br->btrb->refcount = 1;
706
707         /* replace the first two refs by the new one */
708         btr_drop_buffer_reference(brs[0]);
709         btr_drop_buffer_reference(brs[1]);
710         para_list_add(&br->node, &btrn->input_queue);
711         return 2;
712 }
713
714 void btr_merge(struct btr_node *btrn, size_t dest_size)
715 {
716         if (need_buffer_pool_merge(btrn))
717                 return merge_input_pool(btrn, dest_size);
718         for (;;) {
719                 char *buf;
720                 size_t len = btr_next_buffer(btrn, &buf);
721                 if (len >= dest_size)
722                         return;
723                 PARA_DEBUG_LOG("input size = %zu < %zu = dest\n", len, dest_size);
724                 if (merge_input(btrn) < 2)
725                         return;
726         }
727 }
728
729 bool btr_eof(struct btr_node *btrn)
730 {
731         char *buf;
732         size_t len = btr_next_buffer(btrn, &buf);
733
734         return (len == 0 && btr_no_parent(btrn));
735 }
736
737 void log_tree_recursively(struct btr_node *btrn, int loglevel, int depth)
738 {
739         struct btr_node *ch;
740         const char spaces[] = "                 ", *space = spaces + 16 - depth;
741
742         if (depth > 16)
743                 return;
744         para_log(loglevel, "%s%s\n", space, btrn->name);
745         FOR_EACH_CHILD(ch, btrn)
746                 log_tree_recursively(ch, loglevel, depth + 1);
747 }
748
749 void btr_log_tree(struct btr_node *btrn, int loglevel)
750 {
751         return log_tree_recursively(btrn, loglevel, 0);
752 }
753
754 /*
755  * \return \a root if \a name is \p NULL.
756  */
757 struct btr_node *btr_search_node(const char *name, struct btr_node *root)
758 {
759         struct btr_node *ch;
760
761         if (!name)
762                 return root;
763         if (!strcmp(root->name, name))
764                 return root;
765         FOR_EACH_CHILD(ch, root) {
766                 struct btr_node *result = btr_search_node(name, ch);
767                 if (result)
768                         return result;
769         }
770         return NULL;
771 }
772
773 /** 640K ought to be enough for everybody ;) */
774 #define BTRN_MAX_PENDING (640 * 1024)
775
776 int btr_node_status(struct btr_node *btrn, size_t min_iqs,
777         enum btr_node_type type)
778 {
779         size_t iqs;
780
781         if (!btrn)
782                 return 0;
783         if (type != BTR_NT_LEAF) {
784                 if (btr_no_children(btrn))
785                         return -E_BTR_NO_CHILD;
786                 if (btr_bytes_pending(btrn) > BTRN_MAX_PENDING)
787                         return 0;
788         }
789         if (type != BTR_NT_ROOT) {
790                 if (btr_eof(btrn))
791                         return -E_BTR_EOF;
792                 iqs = btr_get_input_queue_size(btrn);
793                 if (iqs == 0) /* we have a parent, because not eof */
794                         return 0;
795                 if (iqs < min_iqs && !btr_no_parent(btrn))
796                         return 0;
797         }
798         return 1;
799 }
800
801 void btr_get_node_start(struct btr_node *btrn, struct timeval *tv)
802 {
803         *tv = btrn->start;
804 }