]> git.tuebingen.mpg.de Git - paraslash.git/blob - buffer_tree.c
6dc3c41b2b9deb6787c1e6414e0f43ce394355af
[paraslash.git] / buffer_tree.c
1 #include <regex.h>
2 #include <stdbool.h>
3
4 #include "para.h"
5 #include "list.h"
6 #include "string.h"
7 #include "buffer_tree.h"
8 #include "error.h"
9 #include "sched.h"
10
11 struct btr_pool {
12         char *name;
13         char *area_start;
14         char *area_end;
15         char *rhead;
16         char *whead;
17 };
18
19 enum btr_buffer_flags {
20         /* changes the way the buffer is deallocated */
21         BTR_BF_BTR_POOL = 1,
22 };
23
24 struct btr_buffer {
25         char *buf;
26         size_t size;
27         /** The number of references to this buffer. */
28         int refcount;
29         struct btr_pool *pool;
30 };
31
32 struct btr_buffer_reference {
33         struct btr_buffer *btrb;
34         size_t consumed;
35         /* Each buffer reference belongs to the buffer queue list of some buffer tree node. */
36         struct list_head node;
37         size_t wrap_count;
38 };
39
40 struct btr_node {
41         char *name;
42         struct btr_node *parent;
43         /* The position of this btr node in the buffer tree. */
44         struct list_head node;
45         /* The children nodes of this btr node are linked together in a list. */
46         struct list_head children;
47         /* Time of first data transfer. */
48         struct timeval start;
49         /**
50          * The input queue is a list of references to btr buffers. Each item on
51          * the list represents an input buffer which has not been completely
52          * used by this btr node.
53          */
54         struct list_head input_queue;
55         btr_command_handler execute;
56         void *context;
57 };
58
59 struct btr_pool *btr_pool_new(const char *name, size_t area_size)
60 {
61         struct btr_pool *btrp;
62
63         PARA_INFO_LOG("%s, %zu bytes\n", name, area_size);
64         btrp = para_malloc(sizeof(*btrp));
65         btrp->area_start = para_malloc(area_size);
66         btrp->area_end = btrp->area_start + area_size;
67         btrp->rhead = btrp->area_start;
68         btrp->whead = btrp->area_start;
69         btrp->name = para_strdup(name);
70         return btrp;
71 }
72
73 /* whead = NULL means area full */
74
75 void btr_pool_free(struct btr_pool *btrp)
76 {
77         if (!btrp)
78                 return;
79         free(btrp->area_start);
80         free(btrp->name);
81         free(btrp);
82 }
83
84 size_t btr_pool_size(struct btr_pool *btrp)
85 {
86         return btrp->area_end - btrp->area_start;
87 }
88
89 size_t btr_pool_filled(struct btr_pool *btrp)
90 {
91         if (!btrp->whead)
92                 return btr_pool_size(btrp);
93         if (btrp->rhead <= btrp->whead)
94                 return  btrp->whead - btrp->rhead;
95         return btr_pool_size(btrp) - (btrp->rhead - btrp->whead);
96 }
97
98 size_t btr_pool_unused(struct btr_pool *btrp)
99 {
100         return btr_pool_size(btrp) - btr_pool_filled(btrp);
101 }
102
103 /*
104  * Return maximal size available for one read. This is
105  * smaller than the value returned by btr_pool_unused().
106  */
107 size_t btr_pool_available(struct btr_pool *btrp)
108 {
109         if (!btrp->whead)
110                 return 0;
111         if (btrp->rhead <= btrp->whead)
112                 return btrp->area_end - btrp->whead;
113         return btrp->rhead - btrp->whead;
114 }
115
116 size_t btr_pool_get_buffer(struct btr_pool *btrp, char **result)
117 {
118         if (result)
119                 *result = btrp->whead;
120         return btr_pool_available(btrp);
121 }
122
123 void btr_pool_allocate(struct btr_pool *btrp, size_t size)
124 {
125         char *end;
126
127         if (size == 0)
128                 return;
129         assert(size <= btr_pool_available(btrp));
130         end = btrp->whead + size;
131         assert(end <= btrp->area_end);
132
133         if (end == btrp->area_end) {
134                 PARA_DEBUG_LOG("%s: end of pool area reached\n", btrp->name);
135                 end = btrp->area_start;
136         }
137         if (end == btrp->rhead) {
138                 PARA_DEBUG_LOG("%s btrp buffer full\n", btrp->name);
139                 end = NULL; /* buffer full */
140         }
141         btrp->whead = end;
142 }
143
144 static void btr_pool_deallocate(struct btr_pool *btrp, size_t size)
145 {
146         char *end = btrp->rhead + size;
147
148         if (size == 0)
149                 return;
150         assert(end <= btrp->area_end);
151         assert(size <= btr_pool_filled(btrp));
152         if (end == btrp->area_end)
153                 end = btrp->area_start;
154         if (!btrp->whead)
155                 btrp->whead = btrp->rhead;
156         btrp->rhead = end;
157         if (btrp->rhead == btrp->whead)
158                 btrp->rhead = btrp->whead = btrp->area_start;
159 }
160
161 #define FOR_EACH_CHILD(_tn, _btrn) list_for_each_entry((_tn), \
162         &((_btrn)->children), node)
163 #define FOR_EACH_CHILD_SAFE(_tn, _tmp, _btrn) \
164         list_for_each_entry_safe((_tn), (_tmp), &((_btrn)->children), node)
165
166 #define FOR_EACH_BUFFER_REF(_br, _btrn) \
167         list_for_each_entry((_br), &(_btrn)->input_queue, node)
168 #define FOR_EACH_BUFFER_REF_SAFE(_br, _tmp, _btrn) \
169         list_for_each_entry_safe((_br), (_tmp), &(_btrn)->input_queue, node)
170
171 struct btr_node *btr_new_node(struct btr_node_description *bnd)
172 {
173         struct btr_node *btrn = para_malloc(sizeof(*btrn));
174
175         btrn->name = para_strdup(bnd->name);
176         btrn->parent = bnd->parent;
177         btrn->execute = bnd->handler;
178         btrn->context = bnd->context;
179         btrn->start.tv_sec = 0;
180         btrn->start.tv_usec = 0;
181         INIT_LIST_HEAD(&btrn->children);
182         INIT_LIST_HEAD(&btrn->input_queue);
183         if (!bnd->child) {
184                 if (bnd->parent) {
185                         list_add_tail(&btrn->node, &bnd->parent->children);
186                         PARA_INFO_LOG("new leaf node: %s (child of %s)\n",
187                                 bnd->name, bnd->parent->name);
188                 } else
189                         PARA_INFO_LOG("added %s as btr root\n", bnd->name);
190                 goto out;
191         }
192         if (!bnd->parent) {
193                 assert(!bnd->child->parent);
194                 PARA_INFO_LOG("new root: %s (was %s)\n",
195                         bnd->name, bnd->child->name);
196                 btrn->parent = NULL;
197                 list_add_tail(&bnd->child->node, &btrn->children);
198                 /* link it in */
199                 bnd->child->parent = btrn;
200                 goto out;
201         }
202         PARA_EMERG_LOG("inserting internal nodes not yet supported.\n");
203         exit(EXIT_FAILURE);
204         assert(bnd->child->parent == bnd->parent);
205 out:
206         return btrn;
207 }
208
209 /*
210  * Allocate a new btr buffer.
211  *
212  * The freshly allocated buffer will have a zero refcount and will
213  * not be associated with a btr pool.
214  */
215 static struct btr_buffer *new_btrb(char *buf, size_t size)
216 {
217         struct btr_buffer *btrb = para_calloc(sizeof(*btrb));
218
219         btrb->buf = buf;
220         btrb->size = size;
221         return btrb;
222 }
223
224 static void dealloc_buffer(struct btr_buffer *btrb)
225 {
226         if (btrb->pool)
227                 btr_pool_deallocate(btrb->pool, btrb->size);
228         else
229                 free(btrb->buf);
230 }
231
232 static struct btr_buffer_reference *get_first_input_br(struct btr_node *btrn)
233 {
234         if (list_empty(&btrn->input_queue))
235                 return NULL;
236         return list_first_entry(&btrn->input_queue,
237                 struct btr_buffer_reference, node);
238 }
239
240 /*
241  * Deallocate the reference, release the resources if refcount drops to zero.
242  */
243 static void btr_drop_buffer_reference(struct btr_buffer_reference *br)
244 {
245         struct btr_buffer *btrb = br->btrb;
246
247         list_del(&br->node);
248         free(br);
249         btrb->refcount--;
250         if (btrb->refcount == 0) {
251                 dealloc_buffer(btrb);
252                 free(btrb);
253         }
254 }
255
256 static void add_btrb_to_children(struct btr_buffer *btrb,
257                 struct btr_node *btrn, size_t consumed)
258 {
259         struct btr_node *ch;
260
261         if (btrn->start.tv_sec == 0)
262                 btrn->start = *now;
263         FOR_EACH_CHILD(ch, btrn) {
264                 struct btr_buffer_reference *br = para_calloc(sizeof(*br));
265                 br->btrb = btrb;
266                 br->consumed = consumed;
267                 list_add_tail(&br->node, &ch->input_queue);
268                 btrb->refcount++;
269                 if (ch->start.tv_sec == 0)
270                         ch->start = *now;
271         }
272 }
273
274 void btr_add_output(char *buf, size_t size, struct btr_node *btrn)
275 {
276         struct btr_buffer *btrb;
277
278         assert(size != 0);
279         if (list_empty(&btrn->children)) {
280                 free(buf);
281                 return;
282         }
283         btrb = new_btrb(buf, size);
284         add_btrb_to_children(btrb, btrn, 0);
285 }
286
287 void btr_add_output_pool(struct btr_pool *btrp, size_t size,
288                 struct btr_node *btrn)
289 {
290         struct btr_buffer *btrb;
291         char *buf;
292         size_t avail;
293
294         assert(size != 0);
295         if (list_empty(&btrn->children))
296                 return;
297         avail = btr_pool_get_buffer(btrp, &buf);
298         assert(avail >= size);
299         btr_pool_allocate(btrp, size);
300         btrb = new_btrb(buf, size);
301         btrb->pool = btrp;
302         add_btrb_to_children(btrb, btrn, 0);
303 }
304
305 void btr_copy(const void *src, size_t n, struct btr_pool *btrp,
306         struct btr_node *btrn)
307 {
308         char *buf;
309         size_t sz, copy;
310
311         if (n == 0)
312                 return;
313         assert(n <= btr_pool_unused(btrp));
314         sz = btr_pool_get_buffer(btrp, &buf);
315         copy = PARA_MIN(sz, n);
316         memcpy(buf, src, copy);
317         btr_add_output_pool(btrp, copy, btrn);
318         if (copy == n)
319                 return;
320         sz = btr_pool_get_buffer(btrp, &buf);
321         assert(sz >= n - copy);
322         memcpy(buf, src + copy, n - copy);
323         btr_add_output_pool(btrp, n - copy, btrn);
324 }
325
326 static void btr_pushdown_br(struct btr_buffer_reference *br, struct btr_node *btrn)
327 {
328         add_btrb_to_children(br->btrb, btrn, br->consumed);
329         btr_drop_buffer_reference(br);
330 }
331
332 void btr_pushdown(struct btr_node *btrn)
333 {
334         struct btr_buffer_reference *br, *tmp;
335
336         FOR_EACH_BUFFER_REF_SAFE(br, tmp, btrn)
337                 btr_pushdown_br(br, btrn);
338 }
339
340 int btr_pushdown_one(struct btr_node *btrn)
341 {
342         struct btr_buffer_reference *br;
343
344         if (list_empty(&btrn->input_queue))
345                 return 0;
346         br = list_first_entry(&btrn->input_queue, struct btr_buffer_reference, node);
347         btr_pushdown_br(br, btrn);
348         return 1;
349 }
350
351 /* Return true if this node has no children. */
352 bool btr_no_children(struct btr_node *btrn)
353 {
354         return list_empty(&btrn->children);
355 }
356
357 bool btr_no_parent(struct btr_node *btrn)
358 {
359         return !btrn->parent;
360 }
361
362 bool btr_inplace_ok(struct btr_node *btrn)
363 {
364         if (!btrn->parent)
365                 return true;
366         return list_is_singular(&btrn->parent->children);
367 }
368
369 static inline size_t br_available_bytes(struct btr_buffer_reference *br)
370 {
371         return br->btrb->size - br->consumed;
372 }
373
374 size_t btr_get_buffer_by_reference(struct btr_buffer_reference *br, char **buf)
375 {
376         if (buf)
377                 *buf = br->btrb->buf + br->consumed;
378         return br_available_bytes(br);
379 }
380
381 /**
382  * \return zero if the input buffer queue is empty.
383  */
384 size_t btr_next_buffer(struct btr_node *btrn, char **bufp)
385 {
386         struct btr_buffer_reference *br;
387         char *buf, *result = NULL;
388         size_t sz, rv = 0;
389
390         FOR_EACH_BUFFER_REF(br, btrn) {
391                 sz = btr_get_buffer_by_reference(br, &buf);
392                 if (!result) {
393                         result = buf;
394                         rv = sz;
395                         if (!br->btrb->pool)
396                                 break;
397                         continue;
398                 }
399                 if (!br->btrb->pool)
400                         break;
401                 if (result + rv != buf)
402                         break;
403                 rv += sz;
404         }
405         if (bufp)
406                 *bufp = result;
407         return rv;
408 }
409
410 void btr_consume(struct btr_node *btrn, size_t numbytes)
411 {
412         struct btr_buffer_reference *br, *tmp;
413         size_t sz;
414
415         if (numbytes == 0)
416                 return;
417         br = get_first_input_br(btrn);
418         assert(br);
419
420         if (br->wrap_count == 0) {
421                 /*
422                  * No wrap buffer. Drop buffer references whose buffer
423                  * has been fully used. */
424                 FOR_EACH_BUFFER_REF_SAFE(br, tmp, btrn) {
425                         if (br->consumed + numbytes <= br->btrb->size) {
426                                 br->consumed += numbytes;
427                                 if (br->consumed == br->btrb->size)
428                                         btr_drop_buffer_reference(br);
429                                 return;
430                         }
431                         numbytes -= br->btrb->size - br->consumed;
432                         btr_drop_buffer_reference(br);
433                 }
434                 assert(true);
435         }
436         /*
437
438         We have a wrap buffer, consume from it. If in total,
439         i.e. including previous calls to brt_consume(), less than
440         wrap_count has been consumed, there's nothing more we can do.
441
442         Otherwise we drop the wrap buffer and consume from subsequent
443         buffers of the input queue the correct amount of bytes. This
444         is the total number of bytes that have been consumed from the
445         wrap buffer.
446 */
447         PARA_DEBUG_LOG("consuming %zu/%zu bytes from wrap buffer\n", numbytes,
448                 br_available_bytes(br));
449
450         assert(numbytes <= br_available_bytes(br));
451         if (br->consumed + numbytes < br->wrap_count) {
452                 br->consumed += numbytes;
453                 return;
454         }
455         PARA_DEBUG_LOG("dropping wrap buffer (%zu bytes)\n", br->btrb->size);
456         /* get rid of the wrap buffer */
457         sz = br->consumed + numbytes;
458         btr_drop_buffer_reference(br);
459         return btr_consume(btrn, sz);
460 }
461
462 static void flush_input_queue(struct btr_node *btrn)
463 {
464         struct btr_buffer_reference *br, *tmp;
465         FOR_EACH_BUFFER_REF_SAFE(br, tmp, btrn)
466                 btr_drop_buffer_reference(br);
467 }
468
469 void btr_free_node(struct btr_node *btrn)
470 {
471         if (!btrn)
472                 return;
473         free(btrn->name);
474         free(btrn);
475 }
476
477 void btr_remove_node(struct btr_node *btrn)
478 {
479         struct btr_node *ch;
480
481         if (!btrn)
482                 return;
483         PARA_NOTICE_LOG("removing btr node %s from buffer tree\n", btrn->name);
484         FOR_EACH_CHILD(ch, btrn)
485                 ch->parent = NULL;
486         flush_input_queue(btrn);
487         if (btrn->parent)
488                 list_del(&btrn->node);
489 }
490
491 size_t btr_get_input_queue_size(struct btr_node *btrn)
492 {
493         struct btr_buffer_reference *br;
494         size_t size = 0, wrap_consumed = 0;
495
496         FOR_EACH_BUFFER_REF(br, btrn) {
497                 if (br->wrap_count != 0) {
498                         wrap_consumed = br->consumed;
499                         continue;
500                 }
501                 size += br_available_bytes(br);
502         }
503         assert(wrap_consumed <= size);
504         size -= wrap_consumed;
505         return size;
506 }
507
508 void btr_splice_out_node(struct btr_node *btrn)
509 {
510         struct btr_node *ch, *tmp;
511
512         assert(btrn);
513         PARA_NOTICE_LOG("splicing out %s\n", btrn->name);
514         btr_pushdown(btrn);
515         if (btrn->parent)
516                 list_del(&btrn->node);
517         FOR_EACH_CHILD_SAFE(ch, tmp, btrn) {
518                 PARA_INFO_LOG("parent(%s): %s\n", ch->name,
519                         btrn->parent? btrn->parent->name : "NULL");
520                 ch->parent = btrn->parent;
521                 if (btrn->parent)
522                         list_move(&ch->node, &btrn->parent->children);
523         }
524         assert(list_empty(&btrn->children));
525 }
526
527 /**
528  * Return the size of the largest input queue.
529  *
530  * Iterates over all children of the given node.
531  */
532 size_t btr_bytes_pending(struct btr_node *btrn)
533 {
534         size_t max_size = 0;
535         struct btr_node *ch;
536
537         FOR_EACH_CHILD(ch, btrn) {
538                 size_t size = btr_get_input_queue_size(ch);
539                 max_size = PARA_MAX(max_size, size);
540         }
541         return max_size;
542 }
543
544 int btr_exec(struct btr_node *btrn, const char *command, char **value_result)
545 {
546         if (!btrn)
547                 return -ERRNO_TO_PARA_ERROR(EINVAL);
548         if (!btrn->execute)
549                 return -ERRNO_TO_PARA_ERROR(ENOTSUP);
550         return btrn->execute(btrn, command, value_result);
551 }
552
553 int btr_exec_up(struct btr_node *btrn, const char *command, char **value_result)
554 {
555         int ret;
556
557         for (; btrn; btrn = btrn->parent) {
558                 struct btr_node *parent = btrn->parent;
559                 if (!parent)
560                         return -ERRNO_TO_PARA_ERROR(ENOTSUP);
561                 if (!parent->execute)
562                         continue;
563                 PARA_INFO_LOG("parent: %s, cmd: %s\n", parent->name, command);
564                 ret = parent->execute(parent, command, value_result);
565                 if (ret == -ERRNO_TO_PARA_ERROR(ENOTSUP))
566                         continue;
567                 if (ret < 0)
568                         return ret;
569                 if (value_result && *value_result)
570                         PARA_NOTICE_LOG("%s(%s): %s\n", command, parent->name,
571                                 *value_result);
572                 return 1;
573         }
574         return -ERRNO_TO_PARA_ERROR(ENOTSUP);
575 }
576
577 void *btr_context(struct btr_node *btrn)
578 {
579         return btrn->context;
580 }
581
582 static bool need_buffer_pool_merge(struct btr_node *btrn)
583 {
584         struct btr_buffer_reference *br = get_first_input_br(btrn);
585
586         if (!br)
587                 return false;
588         if (br->wrap_count != 0)
589                 return true;
590         if (br->btrb->pool)
591                 return true;
592         return false;
593 }
594
595 static void merge_input_pool(struct btr_node *btrn, size_t dest_size)
596 {
597         struct btr_buffer_reference *br, *wbr = NULL;
598         int num_refs; /* including wrap buffer */
599         char *buf, *buf1 = NULL, *buf2 = NULL;
600         size_t sz, sz1 = 0, sz2 = 0, wsz;
601
602         br = get_first_input_br(btrn);
603         if (!br || br_available_bytes(br) >= dest_size)
604                 return;
605         num_refs = 0;
606         FOR_EACH_BUFFER_REF(br, btrn) {
607                 num_refs++;
608                 sz = btr_get_buffer_by_reference(br, &buf);
609                 if (sz == 0)
610                         break;
611                 if (br->wrap_count != 0) {
612                         assert(!wbr);
613                         assert(num_refs == 1);
614                         wbr = br;
615                         if (sz >= dest_size)
616                                 return;
617                         continue;
618                 }
619                 if (!buf1) {
620                         buf1 = buf;
621                         sz1 = sz;
622                         goto next;
623                 }
624                 if (buf1 + sz1 == buf) {
625                         sz1 += sz;
626                         goto next;
627                 }
628                 if (!buf2) {
629                         buf2 = buf;
630                         sz2 = sz;
631                         goto next;
632                 }
633                 assert(buf2 + sz2 == buf);
634                 sz2 += sz;
635 next:
636                 if (sz1 + sz2 >= dest_size)
637                         break;
638         }
639         if (!buf2) /* nothing to do */
640                 return;
641         assert(buf1 && sz2 > 0);
642         /*
643          * If the second buffer is large, we only take the first part of it to
644          * avoid having to memcpy() huge buffers.
645          */
646         sz2 = PARA_MIN(sz2, (size_t)(64 * 1024));
647         if (!wbr) {
648                 /* Make a new wrap buffer combining buf1 and buf2. */
649                 sz = sz1 + sz2;
650                 buf = para_malloc(sz);
651                 PARA_DEBUG_LOG("merging input buffers: (%p:%zu, %p:%zu) -> %p:%zu\n",
652                         buf1, sz1, buf2, sz2, buf, sz);
653                 memcpy(buf, buf1, sz1);
654                 memcpy(buf + sz1, buf2, sz2);
655                 br = para_calloc(sizeof(*br));
656                 br->btrb = new_btrb(buf, sz);
657                 br->btrb->refcount = 1;
658                 br->consumed = 0;
659                 /* This is a wrap buffer */
660                 br->wrap_count = sz1;
661                 para_list_add(&br->node, &btrn->input_queue);
662                 return;
663         }
664         PARA_DEBUG_LOG("increasing wrap buffer, sz1: %zu, sz2: %zu\n", sz1, sz2);
665         /*
666          * We already have a wrap buffer, but it is too small. It might be
667          * partially used.
668          */
669         wsz = br_available_bytes(wbr);
670         if (wbr->wrap_count == sz1 && wbr->btrb->size >= sz1 + sz2) /* nothing we can do about it */
671                 return;
672         sz = sz1 + sz2 - wbr->btrb->size; /* amount of new data */
673         wbr->btrb->size += sz;
674         wbr->btrb->buf = para_realloc(wbr->btrb->buf, wbr->btrb->size);
675         /* copy the new data to the end of the reallocated buffer */
676         assert(sz2 >= sz);
677         memcpy(wbr->btrb->buf + wbr->btrb->size - sz, buf2 + sz2 - sz, sz);
678 }
679
680 /**
681  * Merge the first two input buffers into one.
682  *
683  * This is a quite expensive operation.
684  *
685  * \return The number of buffers that have been available (zero, one or two).
686  */
687 static int merge_input(struct btr_node *btrn)
688 {
689         struct btr_buffer_reference *brs[2], *br;
690         char *bufs[2], *buf;
691         size_t szs[2], sz;
692         int i;
693
694         if (list_empty(&btrn->input_queue))
695                 return 0;
696         if (list_is_singular(&btrn->input_queue))
697                 return 1;
698         i = 0;
699         /* get references to the first two buffers */
700         FOR_EACH_BUFFER_REF(br, btrn) {
701                 brs[i] = br;
702                 szs[i] = btr_get_buffer_by_reference(brs[i], bufs + i);
703                 i++;
704                 if (i == 2)
705                         break;
706         }
707         /* make a new btrb that combines the two buffers and a br to it. */
708         sz = szs[0] + szs[1];
709         buf = para_malloc(sz);
710         PARA_DEBUG_LOG("%s: memory merging input buffers: (%zu, %zu) -> %zu\n",
711                 btrn->name, szs[0], szs[1], sz);
712         memcpy(buf, bufs[0], szs[0]);
713         memcpy(buf + szs[0], bufs[1], szs[1]);
714
715         br = para_calloc(sizeof(*br));
716         br->btrb = new_btrb(buf, sz);
717         br->btrb->refcount = 1;
718
719         /* replace the first two refs by the new one */
720         btr_drop_buffer_reference(brs[0]);
721         btr_drop_buffer_reference(brs[1]);
722         para_list_add(&br->node, &btrn->input_queue);
723         return 2;
724 }
725
726 void btr_merge(struct btr_node *btrn, size_t dest_size)
727 {
728         if (need_buffer_pool_merge(btrn))
729                 return merge_input_pool(btrn, dest_size);
730         for (;;) {
731                 char *buf;
732                 size_t len = btr_next_buffer(btrn, &buf);
733                 if (len >= dest_size)
734                         return;
735                 PARA_DEBUG_LOG("input size = %zu < %zu = dest\n", len, dest_size);
736                 if (merge_input(btrn) < 2)
737                         return;
738         }
739 }
740
741 bool btr_eof(struct btr_node *btrn)
742 {
743         char *buf;
744         size_t len = btr_next_buffer(btrn, &buf);
745
746         return (len == 0 && btr_no_parent(btrn));
747 }
748
749 void log_tree_recursively(struct btr_node *btrn, int loglevel, int depth)
750 {
751         struct btr_node *ch;
752         const char spaces[] = "                 ", *space = spaces + 16 - depth;
753
754         if (depth > 16)
755                 return;
756         para_log(loglevel, "%s%s\n", space, btrn->name);
757         FOR_EACH_CHILD(ch, btrn)
758                 log_tree_recursively(ch, loglevel, depth + 1);
759 }
760
761 void btr_log_tree(struct btr_node *btrn, int loglevel)
762 {
763         return log_tree_recursively(btrn, loglevel, 0);
764 }
765
766 /*
767  * \return \a root if \a name is \p NULL.
768  */
769 struct btr_node *btr_search_node(const char *name, struct btr_node *root)
770 {
771         struct btr_node *ch;
772
773         if (!name)
774                 return root;
775         if (!strcmp(root->name, name))
776                 return root;
777         FOR_EACH_CHILD(ch, root) {
778                 struct btr_node *result = btr_search_node(name, ch);
779                 if (result)
780                         return result;
781         }
782         return NULL;
783 }
784
785 /** 640K ought to be enough for everybody ;) */
786 #define BTRN_MAX_PENDING (640 * 1024)
787
788 int btr_node_status(struct btr_node *btrn, size_t min_iqs,
789         enum btr_node_type type)
790 {
791         size_t iqs;
792
793         assert(btrn);
794         if (type != BTR_NT_LEAF) {
795                 if (btr_no_children(btrn))
796                         return -E_BTR_NO_CHILD;
797                 if (btr_bytes_pending(btrn) > BTRN_MAX_PENDING)
798                         return 0;
799         }
800         if (type != BTR_NT_ROOT) {
801                 if (btr_eof(btrn))
802                         return -E_BTR_EOF;
803                 iqs = btr_get_input_queue_size(btrn);
804                 if (iqs == 0) /* we have a parent, because not eof */
805                         return 0;
806                 if (iqs < min_iqs && !btr_no_parent(btrn))
807                         return 0;
808         }
809         return 1;
810 }
811
812 void btr_get_node_start(struct btr_node *btrn, struct timeval *tv)
813 {
814         *tv = btrn->start;
815 }