]> git.tuebingen.mpg.de Git - paraslash.git/blob - buffer_tree.c
1ea68917cd5438cffc4d64a5d3292d43fcd7da3a
[paraslash.git] / buffer_tree.c
1 #include <regex.h>
2 #include <stdbool.h>
3
4 #include "para.h"
5 #include "list.h"
6 #include "string.h"
7 #include "buffer_tree.h"
8 #include "error.h"
9 #include "sched.h"
10
11 struct btr_pool {
12         char *name;
13         char *area_start;
14         char *area_end;
15         char *rhead;
16         char *whead;
17 };
18
19 enum btr_buffer_flags {
20         /* changes the way the buffer is deallocated */
21         BTR_BF_BTR_POOL = 1,
22 };
23
24 struct btr_buffer {
25         char *buf;
26         size_t size;
27         /** The number of references to this buffer. */
28         int refcount;
29         struct btr_pool *pool;
30 };
31
32 struct btr_buffer_reference {
33         struct btr_buffer *btrb;
34         size_t consumed;
35         /* Each buffer reference belongs to the buffer queue list of some buffer tree node. */
36         struct list_head node;
37         size_t wrap_count;
38 };
39
40 struct btr_node {
41         char *name;
42         struct btr_node *parent;
43         /* The position of this btr node in the buffer tree. */
44         struct list_head node;
45         /* The children nodes of this btr node are linked together in a list. */
46         struct list_head children;
47         /* Time of first data transfer. */
48         struct timeval start;
49         /**
50          * The input queue is a list of references to btr buffers. Each item on
51          * the list represents an input buffer which has not been completely
52          * used by this btr node.
53          */
54         struct list_head input_queue;
55         btr_command_handler execute;
56         void *context;
57 };
58
59 struct btr_pool *btr_pool_new(const char *name, size_t area_size)
60 {
61         struct btr_pool *btrp;
62
63         PARA_INFO_LOG("%s, %zu bytes\n", name, area_size);
64         btrp = para_malloc(sizeof(*btrp));
65         btrp->area_start = para_malloc(area_size);
66         btrp->area_end = btrp->area_start + area_size;
67         btrp->rhead = btrp->area_start;
68         btrp->whead = btrp->area_start;
69         btrp->name = para_strdup(name);
70         return btrp;
71 }
72
73 /* whead = NULL means area full */
74
75 void btr_pool_free(struct btr_pool *btrp)
76 {
77         if (!btrp)
78                 return;
79         free(btrp->area_start);
80         free(btrp->name);
81         free(btrp);
82 }
83
84 size_t btr_pool_size(struct btr_pool *btrp)
85 {
86         return btrp->area_end - btrp->area_start;
87 }
88
89 size_t btr_pool_filled(struct btr_pool *btrp)
90 {
91         if (!btrp->whead)
92                 return btr_pool_size(btrp);
93         if (btrp->rhead <= btrp->whead)
94                 return  btrp->whead - btrp->rhead;
95         return btr_pool_size(btrp) - (btrp->rhead - btrp->whead);
96 }
97
98 size_t btr_pool_unused(struct btr_pool *btrp)
99 {
100         return btr_pool_size(btrp) - btr_pool_filled(btrp);
101 }
102
103 /*
104  * Return maximal size available for one read. This is
105  * smaller than the value returned by btr_pool_unused().
106  */
107 size_t btr_pool_available(struct btr_pool *btrp)
108 {
109         if (!btrp->whead)
110                 return 0;
111         if (btrp->rhead <= btrp->whead)
112                 return btrp->area_end - btrp->whead;
113         return btrp->rhead - btrp->whead;
114 }
115
116 size_t btr_pool_get_buffer(struct btr_pool *btrp, char **result)
117 {
118         if (result)
119                 *result = btrp->whead;
120         return btr_pool_available(btrp);
121 }
122
123 void btr_pool_allocate(struct btr_pool *btrp, size_t size)
124 {
125         char *end;
126
127         if (size == 0)
128                 return;
129         assert(size <= btr_pool_available(btrp));
130         end = btrp->whead + size;
131         assert(end <= btrp->area_end);
132
133         if (end == btrp->area_end) {
134                 PARA_DEBUG_LOG("%s: end of pool area reached\n", btrp->name);
135                 end = btrp->area_start;
136         }
137         if (end == btrp->rhead) {
138                 PARA_DEBUG_LOG("%s btrp buffer full\n", btrp->name);
139                 end = NULL; /* buffer full */
140         }
141         btrp->whead = end;
142 }
143
144 static void btr_pool_deallocate(struct btr_pool *btrp, size_t size)
145 {
146         char *end = btrp->rhead + size;
147
148         if (size == 0)
149                 return;
150         assert(end <= btrp->area_end);
151         assert(size <= btr_pool_filled(btrp));
152         if (end == btrp->area_end)
153                 end = btrp->area_start;
154         if (!btrp->whead)
155                 btrp->whead = btrp->rhead;
156         btrp->rhead = end;
157         if (btrp->rhead == btrp->whead)
158                 btrp->rhead = btrp->whead = btrp->area_start;
159 }
160
161 #define FOR_EACH_CHILD(_tn, _btrn) list_for_each_entry((_tn), \
162         &((_btrn)->children), node)
163 #define FOR_EACH_CHILD_SAFE(_tn, _tmp, _btrn) \
164         list_for_each_entry_safe((_tn), (_tmp), &((_btrn)->children), node)
165
166 #define FOR_EACH_BUFFER_REF(_br, _btrn) \
167         list_for_each_entry((_br), &(_btrn)->input_queue, node)
168 #define FOR_EACH_BUFFER_REF_SAFE(_br, _tmp, _btrn) \
169         list_for_each_entry_safe((_br), (_tmp), &(_btrn)->input_queue, node)
170
171 /*
172         (parent, child):
173         (NULL, NULL): new, isolated node.
174         (p, NULL): new leaf node
175         (NULL, c): new node becomes root, c must be old root
176         (p, c): new internal node, ch must be child of p, not yet implemented
177
178 */
179 struct btr_node *btr_new_node(struct btr_node_description *bnd)
180 {
181         struct btr_node *btrn = para_malloc(sizeof(*btrn));
182
183         btrn->name = para_strdup(bnd->name);
184         btrn->parent = bnd->parent;
185         btrn->execute = bnd->handler;
186         btrn->context = bnd->context;
187         btrn->start.tv_sec = 0;
188         btrn->start.tv_usec = 0;
189         INIT_LIST_HEAD(&btrn->children);
190         INIT_LIST_HEAD(&btrn->input_queue);
191         if (!bnd->child) {
192                 if (bnd->parent) {
193                         list_add_tail(&btrn->node, &bnd->parent->children);
194                         PARA_INFO_LOG("new leaf node: %s (child of %s)\n",
195                                 bnd->name, bnd->parent->name);
196                 } else
197                         PARA_INFO_LOG("added %s as btr root\n", bnd->name);
198                 goto out;
199         }
200         if (!bnd->parent) {
201                 assert(!bnd->child->parent);
202                 PARA_INFO_LOG("new root: %s (was %s)\n",
203                         bnd->name, bnd->child->name);
204                 btrn->parent = NULL;
205                 list_add_tail(&bnd->child->node, &btrn->children);
206                 /* link it in */
207                 bnd->child->parent = btrn;
208                 goto out;
209         }
210         PARA_EMERG_LOG("inserting internal nodes not yet supported.\n");
211         exit(EXIT_FAILURE);
212         assert(bnd->child->parent == bnd->parent);
213 out:
214         return btrn;
215 }
216
217 /*
218  * Allocate a new btr buffer.
219  *
220  * The freshly allocated buffer will have a zero refcount and will
221  * not be associated with a btr pool.
222  */
223 static struct btr_buffer *new_btrb(char *buf, size_t size)
224 {
225         struct btr_buffer *btrb = para_calloc(sizeof(*btrb));
226
227         btrb->buf = buf;
228         btrb->size = size;
229         return btrb;
230 }
231
232 static void dealloc_buffer(struct btr_buffer *btrb)
233 {
234         if (btrb->pool)
235                 btr_pool_deallocate(btrb->pool, btrb->size);
236         else
237                 free(btrb->buf);
238 }
239
240 static struct btr_buffer_reference *get_first_input_br(struct btr_node *btrn)
241 {
242         if (list_empty(&btrn->input_queue))
243                 return NULL;
244         return list_first_entry(&btrn->input_queue,
245                 struct btr_buffer_reference, node);
246 }
247
248 /*
249  * Deallocate the reference, release the resources if refcount drops to zero.
250  */
251 static void btr_drop_buffer_reference(struct btr_buffer_reference *br)
252 {
253         struct btr_buffer *btrb = br->btrb;
254
255         list_del(&br->node);
256         free(br);
257         btrb->refcount--;
258         if (btrb->refcount == 0) {
259                 dealloc_buffer(btrb);
260                 free(btrb);
261         }
262 }
263
264 static void add_btrb_to_children(struct btr_buffer *btrb,
265                 struct btr_node *btrn, size_t consumed)
266 {
267         struct btr_node *ch;
268
269         if (btrn->start.tv_sec == 0)
270                 btrn->start = *now;
271         FOR_EACH_CHILD(ch, btrn) {
272                 struct btr_buffer_reference *br = para_calloc(sizeof(*br));
273                 br->btrb = btrb;
274                 br->consumed = consumed;
275                 list_add_tail(&br->node, &ch->input_queue);
276                 btrb->refcount++;
277                 if (ch->start.tv_sec == 0)
278                         ch->start = *now;
279         }
280 }
281
282 void btr_add_output(char *buf, size_t size, struct btr_node *btrn)
283 {
284         struct btr_buffer *btrb;
285
286         assert(size != 0);
287         if (list_empty(&btrn->children)) {
288                 free(buf);
289                 return;
290         }
291         btrb = new_btrb(buf, size);
292         add_btrb_to_children(btrb, btrn, 0);
293 }
294
295 void btr_add_output_pool(struct btr_pool *btrp, size_t size,
296                 struct btr_node *btrn)
297 {
298         struct btr_buffer *btrb;
299         char *buf;
300         size_t avail;
301
302         assert(size != 0);
303         if (list_empty(&btrn->children))
304                 return;
305         avail = btr_pool_get_buffer(btrp, &buf);
306         assert(avail >= size);
307         btr_pool_allocate(btrp, size);
308         btrb = new_btrb(buf, size);
309         btrb->pool = btrp;
310         add_btrb_to_children(btrb, btrn, 0);
311 }
312
313 void btr_copy(const void *src, size_t n, struct btr_pool *btrp,
314         struct btr_node *btrn)
315 {
316         char *buf;
317         size_t sz, copy;
318
319         if (n == 0)
320                 return;
321         assert(n <= btr_pool_unused(btrp));
322         sz = btr_pool_get_buffer(btrp, &buf);
323         copy = PARA_MIN(sz, n);
324         memcpy(buf, src, copy);
325         btr_add_output_pool(btrp, copy, btrn);
326         if (copy == n)
327                 return;
328         sz = btr_pool_get_buffer(btrp, &buf);
329         assert(sz >= n - copy);
330         memcpy(buf, src + copy, n - copy);
331         btr_add_output_pool(btrp, n - copy, btrn);
332 }
333
334 static void btr_pushdown_br(struct btr_buffer_reference *br, struct btr_node *btrn)
335 {
336         add_btrb_to_children(br->btrb, btrn, br->consumed);
337         btr_drop_buffer_reference(br);
338 }
339
340 void btr_pushdown(struct btr_node *btrn)
341 {
342         struct btr_buffer_reference *br, *tmp;
343
344         FOR_EACH_BUFFER_REF_SAFE(br, tmp, btrn)
345                 btr_pushdown_br(br, btrn);
346 }
347
348 int btr_pushdown_one(struct btr_node *btrn)
349 {
350         struct btr_buffer_reference *br;
351
352         if (list_empty(&btrn->input_queue))
353                 return 0;
354         br = list_first_entry(&btrn->input_queue, struct btr_buffer_reference, node);
355         btr_pushdown_br(br, btrn);
356         return 1;
357 }
358
359 /* Return true if this node has no children. */
360 bool btr_no_children(struct btr_node *btrn)
361 {
362         return list_empty(&btrn->children);
363 }
364
365 bool btr_no_parent(struct btr_node *btrn)
366 {
367         return !btrn->parent;
368 }
369
370 bool btr_inplace_ok(struct btr_node *btrn)
371 {
372         if (!btrn->parent)
373                 return true;
374         return list_is_singular(&btrn->parent->children);
375 }
376
377 static inline size_t br_available_bytes(struct btr_buffer_reference *br)
378 {
379         return br->btrb->size - br->consumed;
380 }
381
382 size_t btr_get_buffer_by_reference(struct btr_buffer_reference *br, char **buf)
383 {
384         if (buf)
385                 *buf = br->btrb->buf + br->consumed;
386         return br_available_bytes(br);
387 }
388
389 /**
390  * \return zero if the input buffer queue is empty.
391  */
392 size_t btr_next_buffer(struct btr_node *btrn, char **bufp)
393 {
394         struct btr_buffer_reference *br;
395         char *buf, *result = NULL;
396         size_t sz, rv = 0;
397
398         FOR_EACH_BUFFER_REF(br, btrn) {
399                 sz = btr_get_buffer_by_reference(br, &buf);
400                 if (!result) {
401                         result = buf;
402                         rv = sz;
403                         if (!br->btrb->pool)
404                                 break;
405                         continue;
406                 }
407                 if (!br->btrb->pool)
408                         break;
409                 if (result + rv != buf)
410                         break;
411                 rv += sz;
412         }
413         if (bufp)
414                 *bufp = result;
415         return rv;
416 }
417
418 void btr_consume(struct btr_node *btrn, size_t numbytes)
419 {
420         struct btr_buffer_reference *br, *tmp;
421         size_t sz;
422
423         if (numbytes == 0)
424                 return;
425         br = get_first_input_br(btrn);
426         assert(br);
427
428         if (br->wrap_count == 0) {
429                 /*
430                  * No wrap buffer. Drop buffer references whose buffer
431                  * has been fully used. */
432                 FOR_EACH_BUFFER_REF_SAFE(br, tmp, btrn) {
433                         if (br->consumed + numbytes <= br->btrb->size) {
434                                 br->consumed += numbytes;
435                                 if (br->consumed == br->btrb->size)
436                                         btr_drop_buffer_reference(br);
437                                 return;
438                         }
439                         numbytes -= br->btrb->size - br->consumed;
440                         btr_drop_buffer_reference(br);
441                 }
442                 assert(true);
443         }
444         /*
445
446         We have a wrap buffer, consume from it. If in total,
447         i.e. including previous calls to brt_consume(), less than
448         wrap_count has been consumed, there's nothing more we can do.
449
450         Otherwise we drop the wrap buffer and consume from subsequent
451         buffers of the input queue the correct amount of bytes. This
452         is the total number of bytes that have been consumed from the
453         wrap buffer.
454 */
455         PARA_DEBUG_LOG("consuming %zu/%zu bytes from wrap buffer\n", numbytes,
456                 br_available_bytes(br));
457
458         assert(numbytes <= br_available_bytes(br));
459         if (br->consumed + numbytes < br->wrap_count) {
460                 br->consumed += numbytes;
461                 return;
462         }
463         PARA_DEBUG_LOG("dropping wrap buffer (%zu bytes)\n", br->btrb->size);
464         /* get rid of the wrap buffer */
465         sz = br->consumed + numbytes;
466         btr_drop_buffer_reference(br);
467         return btr_consume(btrn, sz);
468 }
469
470 static void flush_input_queue(struct btr_node *btrn)
471 {
472         struct btr_buffer_reference *br, *tmp;
473         FOR_EACH_BUFFER_REF_SAFE(br, tmp, btrn)
474                 btr_drop_buffer_reference(br);
475 }
476
477 void btr_free_node(struct btr_node *btrn)
478 {
479         if (!btrn)
480                 return;
481         free(btrn->name);
482         free(btrn);
483 }
484
485 void btr_remove_node(struct btr_node *btrn)
486 {
487         struct btr_node *ch;
488
489         if (!btrn)
490                 return;
491         PARA_NOTICE_LOG("removing btr node %s from buffer tree\n", btrn->name);
492         FOR_EACH_CHILD(ch, btrn)
493                 ch->parent = NULL;
494         flush_input_queue(btrn);
495         if (btrn->parent)
496                 list_del(&btrn->node);
497 }
498
499 size_t btr_get_input_queue_size(struct btr_node *btrn)
500 {
501         struct btr_buffer_reference *br;
502         size_t size = 0, wrap_consumed = 0;
503
504         FOR_EACH_BUFFER_REF(br, btrn) {
505                 if (br->wrap_count != 0) {
506                         wrap_consumed = br->consumed;
507                         continue;
508                 }
509                 size += br_available_bytes(br);
510         }
511         assert(wrap_consumed <= size);
512         size -= wrap_consumed;
513         return size;
514 }
515
516 void btr_splice_out_node(struct btr_node *btrn)
517 {
518         struct btr_node *ch, *tmp;
519
520         assert(btrn);
521         PARA_NOTICE_LOG("splicing out %s\n", btrn->name);
522         btr_pushdown(btrn);
523         if (btrn->parent)
524                 list_del(&btrn->node);
525         FOR_EACH_CHILD_SAFE(ch, tmp, btrn) {
526                 PARA_INFO_LOG("parent(%s): %s\n", ch->name,
527                         btrn->parent? btrn->parent->name : "NULL");
528                 ch->parent = btrn->parent;
529                 if (btrn->parent)
530                         list_move(&ch->node, &btrn->parent->children);
531         }
532         assert(list_empty(&btrn->children));
533 }
534
535 /**
536  * Return the size of the largest input queue.
537  *
538  * Iterates over all children of the given node.
539  */
540 size_t btr_bytes_pending(struct btr_node *btrn)
541 {
542         size_t max_size = 0;
543         struct btr_node *ch;
544
545         FOR_EACH_CHILD(ch, btrn) {
546                 size_t size = btr_get_input_queue_size(ch);
547                 max_size = PARA_MAX(max_size, size);
548         }
549         return max_size;
550 }
551
552 int btr_exec(struct btr_node *btrn, const char *command, char **value_result)
553 {
554         if (!btrn)
555                 return -ERRNO_TO_PARA_ERROR(EINVAL);
556         if (!btrn->execute)
557                 return -ERRNO_TO_PARA_ERROR(ENOTSUP);
558         return btrn->execute(btrn, command, value_result);
559 }
560
561 int btr_exec_up(struct btr_node *btrn, const char *command, char **value_result)
562 {
563         int ret;
564
565         for (; btrn; btrn = btrn->parent) {
566                 struct btr_node *parent = btrn->parent;
567                 if (!parent)
568                         return -ERRNO_TO_PARA_ERROR(ENOTSUP);
569                 if (!parent->execute)
570                         continue;
571                 PARA_INFO_LOG("parent: %s, cmd: %s\n", parent->name, command);
572                 ret = parent->execute(parent, command, value_result);
573                 if (ret == -ERRNO_TO_PARA_ERROR(ENOTSUP))
574                         continue;
575                 if (ret < 0)
576                         return ret;
577                 if (value_result && *value_result)
578                         PARA_NOTICE_LOG("%s(%s): %s\n", command, parent->name,
579                                 *value_result);
580                 return 1;
581         }
582         return -ERRNO_TO_PARA_ERROR(ENOTSUP);
583 }
584
585 void *btr_context(struct btr_node *btrn)
586 {
587         return btrn->context;
588 }
589
590 static bool need_buffer_pool_merge(struct btr_node *btrn)
591 {
592         struct btr_buffer_reference *br = get_first_input_br(btrn);
593
594         if (!br)
595                 return false;
596         if (br->wrap_count != 0)
597                 return true;
598         if (br->btrb->pool)
599                 return true;
600         return false;
601 }
602
603 static void merge_input_pool(struct btr_node *btrn, size_t dest_size)
604 {
605         struct btr_buffer_reference *br, *wbr = NULL;
606         int num_refs; /* including wrap buffer */
607         char *buf, *buf1 = NULL, *buf2 = NULL;
608         size_t sz, sz1 = 0, sz2 = 0, wsz;
609
610         br = get_first_input_br(btrn);
611         if (!br || br_available_bytes(br) >= dest_size)
612                 return;
613         num_refs = 0;
614         FOR_EACH_BUFFER_REF(br, btrn) {
615                 num_refs++;
616                 sz = btr_get_buffer_by_reference(br, &buf);
617                 if (sz == 0)
618                         break;
619                 if (br->wrap_count != 0) {
620                         assert(!wbr);
621                         assert(num_refs == 1);
622                         wbr = br;
623                         if (sz >= dest_size)
624                                 return;
625                         continue;
626                 }
627                 if (!buf1) {
628                         buf1 = buf;
629                         sz1 = sz;
630                         goto next;
631                 }
632                 if (buf1 + sz1 == buf) {
633                         sz1 += sz;
634                         goto next;
635                 }
636                 if (!buf2) {
637                         buf2 = buf;
638                         sz2 = sz;
639                         goto next;
640                 }
641                 assert(buf2 + sz2 == buf);
642                 sz2 += sz;
643 next:
644                 if (sz1 + sz2 >= dest_size)
645                         break;
646         }
647         if (!buf2) /* nothing to do */
648                 return;
649         assert(buf1 && sz2 > 0);
650         /*
651          * If the second buffer is large, we only take the first part of it to
652          * avoid having to memcpy() huge buffers.
653          */
654         sz2 = PARA_MIN(sz2, (size_t)(64 * 1024));
655         if (!wbr) {
656                 /* Make a new wrap buffer combining buf1 and buf2. */
657                 sz = sz1 + sz2;
658                 buf = para_malloc(sz);
659                 PARA_DEBUG_LOG("merging input buffers: (%p:%zu, %p:%zu) -> %p:%zu\n",
660                         buf1, sz1, buf2, sz2, buf, sz);
661                 memcpy(buf, buf1, sz1);
662                 memcpy(buf + sz1, buf2, sz2);
663                 br = para_calloc(sizeof(*br));
664                 br->btrb = new_btrb(buf, sz);
665                 br->btrb->refcount = 1;
666                 br->consumed = 0;
667                 /* This is a wrap buffer */
668                 br->wrap_count = sz1;
669                 para_list_add(&br->node, &btrn->input_queue);
670                 return;
671         }
672         PARA_DEBUG_LOG("increasing wrap buffer, sz1: %zu, sz2: %zu\n", sz1, sz2);
673         /*
674          * We already have a wrap buffer, but it is too small. It might be
675          * partially used.
676          */
677         wsz = br_available_bytes(wbr);
678         if (wbr->wrap_count == sz1 && wbr->btrb->size >= sz1 + sz2) /* nothing we can do about it */
679                 return;
680         sz = sz1 + sz2 - wbr->btrb->size; /* amount of new data */
681         wbr->btrb->size += sz;
682         wbr->btrb->buf = para_realloc(wbr->btrb->buf, wbr->btrb->size);
683         /* copy the new data to the end of the reallocated buffer */
684         assert(sz2 >= sz);
685         memcpy(wbr->btrb->buf + wbr->btrb->size - sz, buf2 + sz2 - sz, sz);
686 }
687
688 /**
689  * Merge the first two input buffers into one.
690  *
691  * This is a quite expensive operation.
692  *
693  * \return The number of buffers that have been available (zero, one or two).
694  */
695 static int merge_input(struct btr_node *btrn)
696 {
697         struct btr_buffer_reference *brs[2], *br;
698         char *bufs[2], *buf;
699         size_t szs[2], sz;
700         int i;
701
702         if (list_empty(&btrn->input_queue))
703                 return 0;
704         if (list_is_singular(&btrn->input_queue))
705                 return 1;
706         i = 0;
707         /* get references to the first two buffers */
708         FOR_EACH_BUFFER_REF(br, btrn) {
709                 brs[i] = br;
710                 szs[i] = btr_get_buffer_by_reference(brs[i], bufs + i);
711                 i++;
712                 if (i == 2)
713                         break;
714         }
715         /* make a new btrb that combines the two buffers and a br to it. */
716         sz = szs[0] + szs[1];
717         buf = para_malloc(sz);
718         PARA_DEBUG_LOG("%s: memory merging input buffers: (%zu, %zu) -> %zu\n",
719                 btrn->name, szs[0], szs[1], sz);
720         memcpy(buf, bufs[0], szs[0]);
721         memcpy(buf + szs[0], bufs[1], szs[1]);
722
723         br = para_calloc(sizeof(*br));
724         br->btrb = new_btrb(buf, sz);
725         br->btrb->refcount = 1;
726
727         /* replace the first two refs by the new one */
728         btr_drop_buffer_reference(brs[0]);
729         btr_drop_buffer_reference(brs[1]);
730         para_list_add(&br->node, &btrn->input_queue);
731         return 2;
732 }
733
734 void btr_merge(struct btr_node *btrn, size_t dest_size)
735 {
736         if (need_buffer_pool_merge(btrn))
737                 return merge_input_pool(btrn, dest_size);
738         for (;;) {
739                 char *buf;
740                 size_t len = btr_next_buffer(btrn, &buf);
741                 if (len >= dest_size)
742                         return;
743                 PARA_DEBUG_LOG("input size = %zu < %zu = dest\n", len, dest_size);
744                 if (merge_input(btrn) < 2)
745                         return;
746         }
747 }
748
749 bool btr_eof(struct btr_node *btrn)
750 {
751         char *buf;
752         size_t len = btr_next_buffer(btrn, &buf);
753
754         return (len == 0 && btr_no_parent(btrn));
755 }
756
757 void log_tree_recursively(struct btr_node *btrn, int loglevel, int depth)
758 {
759         struct btr_node *ch;
760         const char spaces[] = "                 ", *space = spaces + 16 - depth;
761
762         if (depth > 16)
763                 return;
764         para_log(loglevel, "%s%s\n", space, btrn->name);
765         FOR_EACH_CHILD(ch, btrn)
766                 log_tree_recursively(ch, loglevel, depth + 1);
767 }
768
769 void btr_log_tree(struct btr_node *btrn, int loglevel)
770 {
771         return log_tree_recursively(btrn, loglevel, 0);
772 }
773
774 /*
775  * \return \a root if \a name is \p NULL.
776  */
777 struct btr_node *btr_search_node(const char *name, struct btr_node *root)
778 {
779         struct btr_node *ch;
780
781         if (!name)
782                 return root;
783         if (!strcmp(root->name, name))
784                 return root;
785         FOR_EACH_CHILD(ch, root) {
786                 struct btr_node *result = btr_search_node(name, ch);
787                 if (result)
788                         return result;
789         }
790         return NULL;
791 }
792
793 /** 640K ought to be enough for everybody ;) */
794 #define BTRN_MAX_PENDING (640 * 1024)
795
796 int btr_node_status(struct btr_node *btrn, size_t min_iqs,
797         enum btr_node_type type)
798 {
799         size_t iqs;
800
801         assert(btrn);
802         if (type != BTR_NT_LEAF) {
803                 if (btr_no_children(btrn))
804                         return -E_BTR_NO_CHILD;
805                 if (btr_bytes_pending(btrn) > BTRN_MAX_PENDING)
806                         return 0;
807         }
808         if (type != BTR_NT_ROOT) {
809                 if (btr_eof(btrn))
810                         return -E_BTR_EOF;
811                 iqs = btr_get_input_queue_size(btrn);
812                 if (iqs == 0) /* we have a parent, because not eof */
813                         return 0;
814                 if (iqs < min_iqs && !btr_no_parent(btrn))
815                         return 0;
816         }
817         return 1;
818 }
819
820 void btr_get_node_start(struct btr_node *btrn, struct timeval *tv)
821 {
822         *tv = btrn->start;
823 }