]> git.tuebingen.mpg.de Git - paraslash.git/blob - buffer_tree.c
[btr] Add more documentation.
[paraslash.git] / buffer_tree.c
1 #include <regex.h>
2 #include <stdbool.h>
3
4 #include "para.h"
5 #include "list.h"
6 #include "string.h"
7 #include "buffer_tree.h"
8 #include "error.h"
9 #include "sched.h"
10
11 /* whead = NULL means area full */
12 struct btr_pool {
13         char *name;
14         char *area_start;
15         char *area_end;
16         char *rhead;
17         char *whead;
18 };
19
20 enum btr_buffer_flags {
21         /* changes the way the buffer is deallocated */
22         BTR_BF_BTR_POOL = 1,
23 };
24
25 struct btr_buffer {
26         char *buf;
27         size_t size;
28         /** The number of references to this buffer. */
29         int refcount;
30         struct btr_pool *pool;
31 };
32
33 struct btr_buffer_reference {
34         struct btr_buffer *btrb;
35         size_t consumed;
36         /* Each buffer reference belongs to the buffer queue list of some buffer tree node. */
37         struct list_head node;
38         size_t wrap_count;
39 };
40
41 struct btr_node {
42         char *name;
43         struct btr_node *parent;
44         /* The position of this btr node in the buffer tree. */
45         struct list_head node;
46         /* The children nodes of this btr node are linked together in a list. */
47         struct list_head children;
48         /* Time of first data transfer. */
49         struct timeval start;
50         /**
51          * The input queue is a list of references to btr buffers. Each item on
52          * the list represents an input buffer which has not been completely
53          * used by this btr node.
54          */
55         struct list_head input_queue;
56         btr_command_handler execute;
57         void *context;
58 };
59
60 /**
61  * Create a new buffer pool.
62  *
63  * \param name The name of the new buffer pool.
64  *
65  * \param area The size in bytes of the pool area.
66  *
67  * \return An opaque pointer to the newly created buffer pool. It must be
68  * passed to btr_pool_free() after it is no longer used to deallocate all
69  * resources.
70  */
71 struct btr_pool *btr_pool_new(const char *name, size_t area_size)
72 {
73         struct btr_pool *btrp;
74
75         PARA_INFO_LOG("%s, %zu bytes\n", name, area_size);
76         btrp = para_malloc(sizeof(*btrp));
77         btrp->area_start = para_malloc(area_size);
78         btrp->area_end = btrp->area_start + area_size;
79         btrp->rhead = btrp->area_start;
80         btrp->whead = btrp->area_start;
81         btrp->name = para_strdup(name);
82         return btrp;
83 }
84
85 /**
86  * Dellocate resources used by a buffer pool.
87  *
88  * \param btrp A pointer obtained via btr_pool_new().
89  */
90 void btr_pool_free(struct btr_pool *btrp)
91 {
92         if (!btrp)
93                 return;
94         free(btrp->area_start);
95         free(btrp->name);
96         free(btrp);
97 }
98
99 /**
100  * Return the size of the buffer pool area.
101  *
102  * \param btrp The buffer pool.
103  *
104  * \return The same value which was passed during creation time to
105  * btr_pool_new().
106  */
107 size_t btr_pool_size(struct btr_pool *btrp)
108 {
109         return btrp->area_end - btrp->area_start;
110 }
111
112 size_t btr_pool_filled(struct btr_pool *btrp)
113 {
114         if (!btrp->whead)
115                 return btr_pool_size(btrp);
116         if (btrp->rhead <= btrp->whead)
117                 return  btrp->whead - btrp->rhead;
118         return btr_pool_size(btrp) - (btrp->rhead - btrp->whead);
119 }
120
121 size_t btr_pool_unused(struct btr_pool *btrp)
122 {
123         return btr_pool_size(btrp) - btr_pool_filled(btrp);
124 }
125
126 /*
127  * Return maximal size available for one read. This is
128  * smaller than the value returned by btr_pool_unused().
129  */
130 size_t btr_pool_available(struct btr_pool *btrp)
131 {
132         if (!btrp->whead)
133                 return 0;
134         if (btrp->rhead <= btrp->whead)
135                 return btrp->area_end - btrp->whead;
136         return btrp->rhead - btrp->whead;
137 }
138
139 /**
140  * Obtain the current write head.
141  *
142  * \param btrp The buffer pool.
143  * \param result The write head is returned here.
144  *
145  * \return The maximal amount of bytes that may be written to the returned
146  * buffer.
147  */
148 size_t btr_pool_get_buffer(struct btr_pool *btrp, char **result)
149 {
150         if (result)
151                 *result = btrp->whead;
152         return btr_pool_available(btrp);
153 }
154
155 /**
156  * Mark a part of the buffer pool area as allocated.
157  *
158  * \param btrp The buffer pool.
159  * \param size The amount of bytes to be allocated.
160  *
161  * This is usually called after the caller wrote to the buffer obtained by
162  * btr_pool_get_buffer().
163  */
164 static void btr_pool_allocate(struct btr_pool *btrp, size_t size)
165 {
166         char *end;
167
168         if (size == 0)
169                 return;
170         assert(size <= btr_pool_available(btrp));
171         end = btrp->whead + size;
172         assert(end <= btrp->area_end);
173
174         if (end == btrp->area_end) {
175                 PARA_DEBUG_LOG("%s: end of pool area reached\n", btrp->name);
176                 end = btrp->area_start;
177         }
178         if (end == btrp->rhead) {
179                 PARA_DEBUG_LOG("%s btrp buffer full\n", btrp->name);
180                 end = NULL; /* buffer full */
181         }
182         btrp->whead = end;
183 }
184
185 static void btr_pool_deallocate(struct btr_pool *btrp, size_t size)
186 {
187         char *end = btrp->rhead + size;
188
189         if (size == 0)
190                 return;
191         assert(end <= btrp->area_end);
192         assert(size <= btr_pool_filled(btrp));
193         if (end == btrp->area_end)
194                 end = btrp->area_start;
195         if (!btrp->whead)
196                 btrp->whead = btrp->rhead;
197         btrp->rhead = end;
198         if (btrp->rhead == btrp->whead)
199                 btrp->rhead = btrp->whead = btrp->area_start;
200 }
201
202 #define FOR_EACH_CHILD(_tn, _btrn) list_for_each_entry((_tn), \
203         &((_btrn)->children), node)
204 #define FOR_EACH_CHILD_SAFE(_tn, _tmp, _btrn) \
205         list_for_each_entry_safe((_tn), (_tmp), &((_btrn)->children), node)
206
207 #define FOR_EACH_BUFFER_REF(_br, _btrn) \
208         list_for_each_entry((_br), &(_btrn)->input_queue, node)
209 #define FOR_EACH_BUFFER_REF_SAFE(_br, _tmp, _btrn) \
210         list_for_each_entry_safe((_br), (_tmp), &(_btrn)->input_queue, node)
211
212 struct btr_node *btr_new_node(struct btr_node_description *bnd)
213 {
214         struct btr_node *btrn = para_malloc(sizeof(*btrn));
215
216         btrn->name = para_strdup(bnd->name);
217         btrn->parent = bnd->parent;
218         btrn->execute = bnd->handler;
219         btrn->context = bnd->context;
220         btrn->start.tv_sec = 0;
221         btrn->start.tv_usec = 0;
222         INIT_LIST_HEAD(&btrn->children);
223         INIT_LIST_HEAD(&btrn->input_queue);
224         if (!bnd->child) {
225                 if (bnd->parent) {
226                         list_add_tail(&btrn->node, &bnd->parent->children);
227                         PARA_INFO_LOG("new leaf node: %s (child of %s)\n",
228                                 bnd->name, bnd->parent->name);
229                 } else
230                         PARA_INFO_LOG("added %s as btr root\n", bnd->name);
231                 goto out;
232         }
233         if (!bnd->parent) {
234                 assert(!bnd->child->parent);
235                 PARA_INFO_LOG("new root: %s (was %s)\n",
236                         bnd->name, bnd->child->name);
237                 btrn->parent = NULL;
238                 list_add_tail(&bnd->child->node, &btrn->children);
239                 /* link it in */
240                 bnd->child->parent = btrn;
241                 goto out;
242         }
243         PARA_EMERG_LOG("inserting internal nodes not yet supported.\n");
244         exit(EXIT_FAILURE);
245         assert(bnd->child->parent == bnd->parent);
246 out:
247         return btrn;
248 }
249
250 /*
251  * Allocate a new btr buffer.
252  *
253  * The freshly allocated buffer will have a zero refcount and will
254  * not be associated with a btr pool.
255  */
256 static struct btr_buffer *new_btrb(char *buf, size_t size)
257 {
258         struct btr_buffer *btrb = para_calloc(sizeof(*btrb));
259
260         btrb->buf = buf;
261         btrb->size = size;
262         return btrb;
263 }
264
265 static void dealloc_buffer(struct btr_buffer *btrb)
266 {
267         if (btrb->pool)
268                 btr_pool_deallocate(btrb->pool, btrb->size);
269         else
270                 free(btrb->buf);
271 }
272
273 static struct btr_buffer_reference *get_first_input_br(struct btr_node *btrn)
274 {
275         if (list_empty(&btrn->input_queue))
276                 return NULL;
277         return list_first_entry(&btrn->input_queue,
278                 struct btr_buffer_reference, node);
279 }
280
281 /*
282  * Deallocate the reference, release the resources if refcount drops to zero.
283  */
284 static void btr_drop_buffer_reference(struct btr_buffer_reference *br)
285 {
286         struct btr_buffer *btrb = br->btrb;
287
288         list_del(&br->node);
289         free(br);
290         btrb->refcount--;
291         if (btrb->refcount == 0) {
292                 dealloc_buffer(btrb);
293                 free(btrb);
294         }
295 }
296
297 static void add_btrb_to_children(struct btr_buffer *btrb,
298                 struct btr_node *btrn, size_t consumed)
299 {
300         struct btr_node *ch;
301
302         if (btrn->start.tv_sec == 0)
303                 btrn->start = *now;
304         FOR_EACH_CHILD(ch, btrn) {
305                 struct btr_buffer_reference *br = para_calloc(sizeof(*br));
306                 br->btrb = btrb;
307                 br->consumed = consumed;
308                 list_add_tail(&br->node, &ch->input_queue);
309                 btrb->refcount++;
310                 if (ch->start.tv_sec == 0)
311                         ch->start = *now;
312         }
313 }
314
315 void btr_add_output(char *buf, size_t size, struct btr_node *btrn)
316 {
317         struct btr_buffer *btrb;
318
319         assert(size != 0);
320         if (list_empty(&btrn->children)) {
321                 free(buf);
322                 return;
323         }
324         btrb = new_btrb(buf, size);
325         add_btrb_to_children(btrb, btrn, 0);
326 }
327
328 /**
329  * Feed data to child nodes of the buffer tree.
330  *
331  * \param btrp The buffer pool.
332  * 
333  */
334 void btr_add_output_pool(struct btr_pool *btrp, size_t size,
335                 struct btr_node *btrn)
336 {
337         struct btr_buffer *btrb;
338         char *buf;
339         size_t avail;
340
341         assert(size != 0);
342         if (list_empty(&btrn->children))
343                 return;
344         avail = btr_pool_get_buffer(btrp, &buf);
345         assert(avail >= size);
346         btr_pool_allocate(btrp, size);
347         btrb = new_btrb(buf, size);
348         btrb->pool = btrp;
349         add_btrb_to_children(btrb, btrn, 0);
350 }
351
352 /**
353  * Copy data to write head of a buffer pool and feed it to all children nodes.
354  *
355  * \param src The source buffer.
356  * \param n The size of the source buffer in bytes.
357  * \param btrp The destination buffer pool.
358  * \param btrn Add the data as output of this node.
359  *
360  * This is expensive. The caller must make sure the data fits into the buffer
361  * pool area.
362  */
363 void btr_copy(const void *src, size_t n, struct btr_pool *btrp,
364         struct btr_node *btrn)
365 {
366         char *buf;
367         size_t sz, copy;
368
369         if (n == 0)
370                 return;
371         assert(n <= btr_pool_unused(btrp));
372         sz = btr_pool_get_buffer(btrp, &buf);
373         copy = PARA_MIN(sz, n);
374         memcpy(buf, src, copy);
375         btr_add_output_pool(btrp, copy, btrn);
376         if (copy == n)
377                 return;
378         sz = btr_pool_get_buffer(btrp, &buf);
379         assert(sz >= n - copy);
380         memcpy(buf, src + copy, n - copy);
381         btr_add_output_pool(btrp, n - copy, btrn);
382 }
383
384 static void btr_pushdown_br(struct btr_buffer_reference *br, struct btr_node *btrn)
385 {
386         add_btrb_to_children(br->btrb, btrn, br->consumed);
387         btr_drop_buffer_reference(br);
388 }
389
390 void btr_pushdown(struct btr_node *btrn)
391 {
392         struct btr_buffer_reference *br, *tmp;
393
394         FOR_EACH_BUFFER_REF_SAFE(br, tmp, btrn)
395                 btr_pushdown_br(br, btrn);
396 }
397
398 int btr_pushdown_one(struct btr_node *btrn)
399 {
400         struct btr_buffer_reference *br;
401
402         if (list_empty(&btrn->input_queue))
403                 return 0;
404         br = list_first_entry(&btrn->input_queue, struct btr_buffer_reference, node);
405         btr_pushdown_br(br, btrn);
406         return 1;
407 }
408
409 /* Return true if this node has no children. */
410 bool btr_no_children(struct btr_node *btrn)
411 {
412         return list_empty(&btrn->children);
413 }
414
415 bool btr_no_parent(struct btr_node *btrn)
416 {
417         return !btrn->parent;
418 }
419
420 bool btr_inplace_ok(struct btr_node *btrn)
421 {
422         if (!btrn->parent)
423                 return true;
424         return list_is_singular(&btrn->parent->children);
425 }
426
427 static inline size_t br_available_bytes(struct btr_buffer_reference *br)
428 {
429         return br->btrb->size - br->consumed;
430 }
431
432 size_t btr_get_buffer_by_reference(struct btr_buffer_reference *br, char **buf)
433 {
434         if (buf)
435                 *buf = br->btrb->buf + br->consumed;
436         return br_available_bytes(br);
437 }
438
439 /**
440  * \return zero if the input buffer queue is empty.
441  */
442 size_t btr_next_buffer(struct btr_node *btrn, char **bufp)
443 {
444         struct btr_buffer_reference *br;
445         char *buf, *result = NULL;
446         size_t sz, rv = 0;
447
448         FOR_EACH_BUFFER_REF(br, btrn) {
449                 sz = btr_get_buffer_by_reference(br, &buf);
450                 if (!result) {
451                         result = buf;
452                         rv = sz;
453                         if (!br->btrb->pool)
454                                 break;
455                         continue;
456                 }
457                 if (!br->btrb->pool)
458                         break;
459                 if (result + rv != buf)
460                         break;
461                 rv += sz;
462         }
463         if (bufp)
464                 *bufp = result;
465         return rv;
466 }
467
468 void btr_consume(struct btr_node *btrn, size_t numbytes)
469 {
470         struct btr_buffer_reference *br, *tmp;
471         size_t sz;
472
473         if (numbytes == 0)
474                 return;
475         br = get_first_input_br(btrn);
476         assert(br);
477
478         if (br->wrap_count == 0) {
479                 /*
480                  * No wrap buffer. Drop buffer references whose buffer
481                  * has been fully used. */
482                 FOR_EACH_BUFFER_REF_SAFE(br, tmp, btrn) {
483                         if (br->consumed + numbytes <= br->btrb->size) {
484                                 br->consumed += numbytes;
485                                 if (br->consumed == br->btrb->size)
486                                         btr_drop_buffer_reference(br);
487                                 return;
488                         }
489                         numbytes -= br->btrb->size - br->consumed;
490                         btr_drop_buffer_reference(br);
491                 }
492                 assert(true);
493         }
494         /*
495
496         We have a wrap buffer, consume from it. If in total,
497         i.e. including previous calls to brt_consume(), less than
498         wrap_count has been consumed, there's nothing more we can do.
499
500         Otherwise we drop the wrap buffer and consume from subsequent
501         buffers of the input queue the correct amount of bytes. This
502         is the total number of bytes that have been consumed from the
503         wrap buffer.
504 */
505         PARA_DEBUG_LOG("consuming %zu/%zu bytes from wrap buffer\n", numbytes,
506                 br_available_bytes(br));
507
508         assert(numbytes <= br_available_bytes(br));
509         if (br->consumed + numbytes < br->wrap_count) {
510                 br->consumed += numbytes;
511                 return;
512         }
513         PARA_DEBUG_LOG("dropping wrap buffer (%zu bytes)\n", br->btrb->size);
514         /* get rid of the wrap buffer */
515         sz = br->consumed + numbytes;
516         btr_drop_buffer_reference(br);
517         return btr_consume(btrn, sz);
518 }
519
520 static void flush_input_queue(struct btr_node *btrn)
521 {
522         struct btr_buffer_reference *br, *tmp;
523         FOR_EACH_BUFFER_REF_SAFE(br, tmp, btrn)
524                 btr_drop_buffer_reference(br);
525 }
526
527 void btr_free_node(struct btr_node *btrn)
528 {
529         if (!btrn)
530                 return;
531         free(btrn->name);
532         free(btrn);
533 }
534
535 void btr_remove_node(struct btr_node *btrn)
536 {
537         struct btr_node *ch;
538
539         if (!btrn)
540                 return;
541         PARA_NOTICE_LOG("removing btr node %s from buffer tree\n", btrn->name);
542         FOR_EACH_CHILD(ch, btrn)
543                 ch->parent = NULL;
544         flush_input_queue(btrn);
545         if (btrn->parent)
546                 list_del(&btrn->node);
547 }
548
549 size_t btr_get_input_queue_size(struct btr_node *btrn)
550 {
551         struct btr_buffer_reference *br;
552         size_t size = 0, wrap_consumed = 0;
553
554         FOR_EACH_BUFFER_REF(br, btrn) {
555                 if (br->wrap_count != 0) {
556                         wrap_consumed = br->consumed;
557                         continue;
558                 }
559                 size += br_available_bytes(br);
560         }
561         assert(wrap_consumed <= size);
562         size -= wrap_consumed;
563         return size;
564 }
565
566 void btr_splice_out_node(struct btr_node *btrn)
567 {
568         struct btr_node *ch, *tmp;
569
570         assert(btrn);
571         PARA_NOTICE_LOG("splicing out %s\n", btrn->name);
572         btr_pushdown(btrn);
573         if (btrn->parent)
574                 list_del(&btrn->node);
575         FOR_EACH_CHILD_SAFE(ch, tmp, btrn) {
576                 PARA_INFO_LOG("parent(%s): %s\n", ch->name,
577                         btrn->parent? btrn->parent->name : "NULL");
578                 ch->parent = btrn->parent;
579                 if (btrn->parent)
580                         list_move(&ch->node, &btrn->parent->children);
581         }
582         assert(list_empty(&btrn->children));
583 }
584
585 /**
586  * Return the size of the largest input queue.
587  *
588  * Iterates over all children of the given node.
589  */
590 size_t btr_bytes_pending(struct btr_node *btrn)
591 {
592         size_t max_size = 0;
593         struct btr_node *ch;
594
595         FOR_EACH_CHILD(ch, btrn) {
596                 size_t size = btr_get_input_queue_size(ch);
597                 max_size = PARA_MAX(max_size, size);
598         }
599         return max_size;
600 }
601
602 int btr_exec(struct btr_node *btrn, const char *command, char **value_result)
603 {
604         if (!btrn)
605                 return -ERRNO_TO_PARA_ERROR(EINVAL);
606         if (!btrn->execute)
607                 return -ERRNO_TO_PARA_ERROR(ENOTSUP);
608         return btrn->execute(btrn, command, value_result);
609 }
610
611 int btr_exec_up(struct btr_node *btrn, const char *command, char **value_result)
612 {
613         int ret;
614
615         for (; btrn; btrn = btrn->parent) {
616                 struct btr_node *parent = btrn->parent;
617                 if (!parent)
618                         return -ERRNO_TO_PARA_ERROR(ENOTSUP);
619                 if (!parent->execute)
620                         continue;
621                 PARA_INFO_LOG("parent: %s, cmd: %s\n", parent->name, command);
622                 ret = parent->execute(parent, command, value_result);
623                 if (ret == -ERRNO_TO_PARA_ERROR(ENOTSUP))
624                         continue;
625                 if (ret < 0)
626                         return ret;
627                 if (value_result && *value_result)
628                         PARA_NOTICE_LOG("%s(%s): %s\n", command, parent->name,
629                                 *value_result);
630                 return 1;
631         }
632         return -ERRNO_TO_PARA_ERROR(ENOTSUP);
633 }
634
635 void *btr_context(struct btr_node *btrn)
636 {
637         return btrn->context;
638 }
639
640 static bool need_buffer_pool_merge(struct btr_node *btrn)
641 {
642         struct btr_buffer_reference *br = get_first_input_br(btrn);
643
644         if (!br)
645                 return false;
646         if (br->wrap_count != 0)
647                 return true;
648         if (br->btrb->pool)
649                 return true;
650         return false;
651 }
652
653 static void merge_input_pool(struct btr_node *btrn, size_t dest_size)
654 {
655         struct btr_buffer_reference *br, *wbr = NULL;
656         int num_refs; /* including wrap buffer */
657         char *buf, *buf1 = NULL, *buf2 = NULL;
658         size_t sz, sz1 = 0, sz2 = 0, wsz;
659
660         br = get_first_input_br(btrn);
661         if (!br || br_available_bytes(br) >= dest_size)
662                 return;
663         num_refs = 0;
664         FOR_EACH_BUFFER_REF(br, btrn) {
665                 num_refs++;
666                 sz = btr_get_buffer_by_reference(br, &buf);
667                 if (sz == 0)
668                         break;
669                 if (br->wrap_count != 0) {
670                         assert(!wbr);
671                         assert(num_refs == 1);
672                         wbr = br;
673                         if (sz >= dest_size)
674                                 return;
675                         continue;
676                 }
677                 if (!buf1) {
678                         buf1 = buf;
679                         sz1 = sz;
680                         goto next;
681                 }
682                 if (buf1 + sz1 == buf) {
683                         sz1 += sz;
684                         goto next;
685                 }
686                 if (!buf2) {
687                         buf2 = buf;
688                         sz2 = sz;
689                         goto next;
690                 }
691                 assert(buf2 + sz2 == buf);
692                 sz2 += sz;
693 next:
694                 if (sz1 + sz2 >= dest_size)
695                         break;
696         }
697         if (!buf2) /* nothing to do */
698                 return;
699         assert(buf1 && sz2 > 0);
700         /*
701          * If the second buffer is large, we only take the first part of it to
702          * avoid having to memcpy() huge buffers.
703          */
704         sz2 = PARA_MIN(sz2, (size_t)(64 * 1024));
705         if (!wbr) {
706                 /* Make a new wrap buffer combining buf1 and buf2. */
707                 sz = sz1 + sz2;
708                 buf = para_malloc(sz);
709                 PARA_DEBUG_LOG("merging input buffers: (%p:%zu, %p:%zu) -> %p:%zu\n",
710                         buf1, sz1, buf2, sz2, buf, sz);
711                 memcpy(buf, buf1, sz1);
712                 memcpy(buf + sz1, buf2, sz2);
713                 br = para_calloc(sizeof(*br));
714                 br->btrb = new_btrb(buf, sz);
715                 br->btrb->refcount = 1;
716                 br->consumed = 0;
717                 /* This is a wrap buffer */
718                 br->wrap_count = sz1;
719                 para_list_add(&br->node, &btrn->input_queue);
720                 return;
721         }
722         PARA_DEBUG_LOG("increasing wrap buffer, sz1: %zu, sz2: %zu\n", sz1, sz2);
723         /*
724          * We already have a wrap buffer, but it is too small. It might be
725          * partially used.
726          */
727         wsz = br_available_bytes(wbr);
728         if (wbr->wrap_count == sz1 && wbr->btrb->size >= sz1 + sz2) /* nothing we can do about it */
729                 return;
730         sz = sz1 + sz2 - wbr->btrb->size; /* amount of new data */
731         wbr->btrb->size += sz;
732         wbr->btrb->buf = para_realloc(wbr->btrb->buf, wbr->btrb->size);
733         /* copy the new data to the end of the reallocated buffer */
734         assert(sz2 >= sz);
735         memcpy(wbr->btrb->buf + wbr->btrb->size - sz, buf2 + sz2 - sz, sz);
736 }
737
738 /**
739  * Merge the first two input buffers into one.
740  *
741  * This is a quite expensive operation.
742  *
743  * \return The number of buffers that have been available (zero, one or two).
744  */
745 static int merge_input(struct btr_node *btrn)
746 {
747         struct btr_buffer_reference *brs[2], *br;
748         char *bufs[2], *buf;
749         size_t szs[2], sz;
750         int i;
751
752         if (list_empty(&btrn->input_queue))
753                 return 0;
754         if (list_is_singular(&btrn->input_queue))
755                 return 1;
756         i = 0;
757         /* get references to the first two buffers */
758         FOR_EACH_BUFFER_REF(br, btrn) {
759                 brs[i] = br;
760                 szs[i] = btr_get_buffer_by_reference(brs[i], bufs + i);
761                 i++;
762                 if (i == 2)
763                         break;
764         }
765         /* make a new btrb that combines the two buffers and a br to it. */
766         sz = szs[0] + szs[1];
767         buf = para_malloc(sz);
768         PARA_DEBUG_LOG("%s: memory merging input buffers: (%zu, %zu) -> %zu\n",
769                 btrn->name, szs[0], szs[1], sz);
770         memcpy(buf, bufs[0], szs[0]);
771         memcpy(buf + szs[0], bufs[1], szs[1]);
772
773         br = para_calloc(sizeof(*br));
774         br->btrb = new_btrb(buf, sz);
775         br->btrb->refcount = 1;
776
777         /* replace the first two refs by the new one */
778         btr_drop_buffer_reference(brs[0]);
779         btr_drop_buffer_reference(brs[1]);
780         para_list_add(&br->node, &btrn->input_queue);
781         return 2;
782 }
783
784 void btr_merge(struct btr_node *btrn, size_t dest_size)
785 {
786         if (need_buffer_pool_merge(btrn))
787                 return merge_input_pool(btrn, dest_size);
788         for (;;) {
789                 char *buf;
790                 size_t len = btr_next_buffer(btrn, &buf);
791                 if (len >= dest_size)
792                         return;
793                 PARA_DEBUG_LOG("input size = %zu < %zu = dest\n", len, dest_size);
794                 if (merge_input(btrn) < 2)
795                         return;
796         }
797 }
798
799 bool btr_eof(struct btr_node *btrn)
800 {
801         char *buf;
802         size_t len = btr_next_buffer(btrn, &buf);
803
804         return (len == 0 && btr_no_parent(btrn));
805 }
806
807 void log_tree_recursively(struct btr_node *btrn, int loglevel, int depth)
808 {
809         struct btr_node *ch;
810         const char spaces[] = "                 ", *space = spaces + 16 - depth;
811
812         if (depth > 16)
813                 return;
814         para_log(loglevel, "%s%s\n", space, btrn->name);
815         FOR_EACH_CHILD(ch, btrn)
816                 log_tree_recursively(ch, loglevel, depth + 1);
817 }
818
819 void btr_log_tree(struct btr_node *btrn, int loglevel)
820 {
821         return log_tree_recursively(btrn, loglevel, 0);
822 }
823
824 /*
825  * \return \a root if \a name is \p NULL.
826  */
827 struct btr_node *btr_search_node(const char *name, struct btr_node *root)
828 {
829         struct btr_node *ch;
830
831         if (!name)
832                 return root;
833         if (!strcmp(root->name, name))
834                 return root;
835         FOR_EACH_CHILD(ch, root) {
836                 struct btr_node *result = btr_search_node(name, ch);
837                 if (result)
838                         return result;
839         }
840         return NULL;
841 }
842
843 /** 640K ought to be enough for everybody ;) */
844 #define BTRN_MAX_PENDING (640 * 1024)
845
846 int btr_node_status(struct btr_node *btrn, size_t min_iqs,
847         enum btr_node_type type)
848 {
849         size_t iqs;
850
851         assert(btrn);
852         if (type != BTR_NT_LEAF) {
853                 if (btr_no_children(btrn))
854                         return -E_BTR_NO_CHILD;
855                 if (btr_bytes_pending(btrn) > BTRN_MAX_PENDING)
856                         return 0;
857         }
858         if (type != BTR_NT_ROOT) {
859                 if (btr_eof(btrn))
860                         return -E_BTR_EOF;
861                 iqs = btr_get_input_queue_size(btrn);
862                 if (iqs == 0) /* we have a parent, because not eof */
863                         return 0;
864                 if (iqs < min_iqs && !btr_no_parent(btrn))
865                         return 0;
866         }
867         return 1;
868 }
869
870 void btr_get_node_start(struct btr_node *btrn, struct timeval *tv)
871 {
872         *tv = btrn->start;
873 }