]> git.tuebingen.mpg.de Git - paraslash.git/blob - buffer_tree.c
201fb0b0e21ac4556b12968b3e22cde23fd5c302
[paraslash.git] / buffer_tree.c
1 #include <regex.h>
2 #include <stdbool.h>
3
4 #include "para.h"
5 #include "list.h"
6 #include "string.h"
7 #include "buffer_tree.h"
8 #include "error.h"
9 #include "sched.h"
10
11 /* whead = NULL means area full */
12 struct btr_pool {
13         char *name;
14         char *area_start;
15         char *area_end;
16         char *rhead;
17         char *whead;
18 };
19
20 struct btr_buffer {
21         char *buf;
22         size_t size;
23         /** The number of references to this buffer. */
24         int refcount;
25         /* NULL means no buffer pool but a malloced buffer. */
26         struct btr_pool *pool;
27 };
28
29 struct btr_buffer_reference {
30         struct btr_buffer *btrb;
31         size_t consumed;
32         /* Each buffer reference belongs to the buffer queue list of some buffer tree node. */
33         struct list_head node;
34         size_t wrap_count;
35 };
36
37 struct btr_node {
38         char *name;
39         struct btr_node *parent;
40         /* The position of this btr node in the buffer tree. */
41         struct list_head node;
42         /* The children nodes of this btr node are linked together in a list. */
43         struct list_head children;
44         /* Time of first data transfer. */
45         struct timeval start;
46         /**
47          * The input queue is a list of references to btr buffers. Each item on
48          * the list represents an input buffer which has not been completely
49          * used by this btr node.
50          */
51         struct list_head input_queue;
52         btr_command_handler execute;
53         void *context;
54 };
55
56 /**
57  * Create a new buffer pool.
58  *
59  * \param name The name of the new buffer pool.
60  * \param area_size The size in bytes of the pool area.
61  *
62  * \return An opaque pointer to the newly created buffer pool. It must be
63  * passed to btr_pool_free() after it is no longer used to deallocate all
64  * resources.
65  */
66 struct btr_pool *btr_pool_new(const char *name, size_t area_size)
67 {
68         struct btr_pool *btrp;
69
70         PARA_INFO_LOG("%s, %zu bytes\n", name, area_size);
71         btrp = para_malloc(sizeof(*btrp));
72         btrp->area_start = para_malloc(area_size);
73         btrp->area_end = btrp->area_start + area_size;
74         btrp->rhead = btrp->area_start;
75         btrp->whead = btrp->area_start;
76         btrp->name = para_strdup(name);
77         return btrp;
78 }
79
80 /**
81  * Deallocate resources used by a buffer pool.
82  *
83  * \param btrp A pointer obtained via btr_pool_new().
84  */
85 void btr_pool_free(struct btr_pool *btrp)
86 {
87         if (!btrp)
88                 return;
89         free(btrp->area_start);
90         free(btrp->name);
91         free(btrp);
92 }
93
94 /**
95  * Return the size of the buffer pool area.
96  *
97  * \param btrp The buffer pool.
98  *
99  * \return The same value which was passed during creation time to
100  * btr_pool_new().
101  */
102 size_t btr_pool_size(struct btr_pool *btrp)
103 {
104         return btrp->area_end - btrp->area_start;
105 }
106
107 size_t btr_pool_filled(struct btr_pool *btrp)
108 {
109         if (!btrp->whead)
110                 return btr_pool_size(btrp);
111         if (btrp->rhead <= btrp->whead)
112                 return  btrp->whead - btrp->rhead;
113         return btr_pool_size(btrp) - (btrp->rhead - btrp->whead);
114 }
115
116 /**
117  * Get the number of unused bytes in the buffer pool.
118  *
119  * \param btrp The pool.
120  *
121  * \return The number of bytes that can currently be allocated.
122  *
123  * Note that in general the returned number of bytes is not available as a
124  * single contiguous buffer. Use btr_pool_available() to obtain the length of
125  * the largest contiguous buffer that can currently be allocated from the
126  * buffer pool.
127  */
128 size_t btr_pool_unused(struct btr_pool *btrp)
129 {
130         return btr_pool_size(btrp) - btr_pool_filled(btrp);
131 }
132
133 /*
134  * Return maximal size available for one read. This is
135  * smaller than the value returned by btr_pool_unused().
136  */
137 size_t btr_pool_available(struct btr_pool *btrp)
138 {
139         if (!btrp->whead)
140                 return 0;
141         if (btrp->rhead <= btrp->whead)
142                 return btrp->area_end - btrp->whead;
143         return btrp->rhead - btrp->whead;
144 }
145
146 /**
147  * Obtain the current write head.
148  *
149  * \param btrp The buffer pool.
150  * \param result The write head is returned here.
151  *
152  * \return The maximal amount of bytes that may be written to the returned
153  * buffer.
154  */
155 size_t btr_pool_get_buffer(struct btr_pool *btrp, char **result)
156 {
157         if (result)
158                 *result = btrp->whead;
159         return btr_pool_available(btrp);
160 }
161
162 /**
163  * Mark a part of the buffer pool area as allocated.
164  *
165  * \param btrp The buffer pool.
166  * \param size The amount of bytes to be allocated.
167  *
168  * This is usually called after the caller wrote to the buffer obtained by
169  * btr_pool_get_buffer().
170  */
171 static void btr_pool_allocate(struct btr_pool *btrp, size_t size)
172 {
173         char *end;
174
175         if (size == 0)
176                 return;
177         assert(size <= btr_pool_available(btrp));
178         end = btrp->whead + size;
179         assert(end <= btrp->area_end);
180
181         if (end == btrp->area_end) {
182                 PARA_DEBUG_LOG("%s: end of pool area reached\n", btrp->name);
183                 end = btrp->area_start;
184         }
185         if (end == btrp->rhead) {
186                 PARA_DEBUG_LOG("%s btrp buffer full\n", btrp->name);
187                 end = NULL; /* buffer full */
188         }
189         btrp->whead = end;
190 }
191
192 static void btr_pool_deallocate(struct btr_pool *btrp, size_t size)
193 {
194         char *end = btrp->rhead + size;
195
196         if (size == 0)
197                 return;
198         assert(end <= btrp->area_end);
199         assert(size <= btr_pool_filled(btrp));
200         if (end == btrp->area_end)
201                 end = btrp->area_start;
202         if (!btrp->whead)
203                 btrp->whead = btrp->rhead;
204         btrp->rhead = end;
205         if (btrp->rhead == btrp->whead)
206                 btrp->rhead = btrp->whead = btrp->area_start;
207 }
208
209 #define FOR_EACH_CHILD(_tn, _btrn) list_for_each_entry((_tn), \
210         &((_btrn)->children), node)
211 #define FOR_EACH_CHILD_SAFE(_tn, _tmp, _btrn) \
212         list_for_each_entry_safe((_tn), (_tmp), &((_btrn)->children), node)
213
214 #define FOR_EACH_BUFFER_REF(_br, _btrn) \
215         list_for_each_entry((_br), &(_btrn)->input_queue, node)
216 #define FOR_EACH_BUFFER_REF_SAFE(_br, _tmp, _btrn) \
217         list_for_each_entry_safe((_br), (_tmp), &(_btrn)->input_queue, node)
218
219 /**
220  * Create a new buffer tree node.
221  *
222  * \param bnd Specifies how to create the new node.
223  *
224  * This function always succeeds (or calls exit()). The returned pointer must
225  * be freed using btr_free_node() after the node has been removed from the
226  * buffer tree via btr_remove_node().
227  */
228 struct btr_node *btr_new_node(struct btr_node_description *bnd)
229 {
230         struct btr_node *btrn = para_malloc(sizeof(*btrn));
231
232         btrn->name = para_strdup(bnd->name);
233         btrn->parent = bnd->parent;
234         btrn->execute = bnd->handler;
235         btrn->context = bnd->context;
236         btrn->start.tv_sec = 0;
237         btrn->start.tv_usec = 0;
238         INIT_LIST_HEAD(&btrn->children);
239         INIT_LIST_HEAD(&btrn->input_queue);
240         if (!bnd->child) {
241                 if (bnd->parent) {
242                         list_add_tail(&btrn->node, &bnd->parent->children);
243                         PARA_INFO_LOG("new leaf node: %s (child of %s)\n",
244                                 bnd->name, bnd->parent->name);
245                 } else
246                         PARA_INFO_LOG("added %s as btr root\n", bnd->name);
247                 goto out;
248         }
249         if (!bnd->parent) {
250                 assert(!bnd->child->parent);
251                 PARA_INFO_LOG("new root: %s (was %s)\n",
252                         bnd->name, bnd->child->name);
253                 btrn->parent = NULL;
254                 list_add_tail(&bnd->child->node, &btrn->children);
255                 /* link it in */
256                 bnd->child->parent = btrn;
257                 goto out;
258         }
259         PARA_EMERG_LOG("inserting internal nodes not yet supported.\n");
260         exit(EXIT_FAILURE);
261         assert(bnd->child->parent == bnd->parent);
262 out:
263         return btrn;
264 }
265
266 /*
267  * Allocate a new btr buffer.
268  *
269  * The freshly allocated buffer will have a zero refcount and will
270  * not be associated with a btr pool.
271  */
272 static struct btr_buffer *new_btrb(char *buf, size_t size)
273 {
274         struct btr_buffer *btrb = para_calloc(sizeof(*btrb));
275
276         btrb->buf = buf;
277         btrb->size = size;
278         return btrb;
279 }
280
281 static void dealloc_buffer(struct btr_buffer *btrb)
282 {
283         if (btrb->pool)
284                 btr_pool_deallocate(btrb->pool, btrb->size);
285         else
286                 free(btrb->buf);
287 }
288
289 static struct btr_buffer_reference *get_first_input_br(struct btr_node *btrn)
290 {
291         if (list_empty(&btrn->input_queue))
292                 return NULL;
293         return list_first_entry(&btrn->input_queue,
294                 struct btr_buffer_reference, node);
295 }
296
297 /*
298  * Deallocate the reference, release the resources if refcount drops to zero.
299  */
300 static void btr_drop_buffer_reference(struct btr_buffer_reference *br)
301 {
302         struct btr_buffer *btrb = br->btrb;
303
304         list_del(&br->node);
305         free(br);
306         btrb->refcount--;
307         if (btrb->refcount == 0) {
308                 dealloc_buffer(btrb);
309                 free(btrb);
310         }
311 }
312
313 static void add_btrb_to_children(struct btr_buffer *btrb,
314                 struct btr_node *btrn, size_t consumed)
315 {
316         struct btr_node *ch;
317
318         if (btrn->start.tv_sec == 0)
319                 btrn->start = *now;
320         FOR_EACH_CHILD(ch, btrn) {
321                 struct btr_buffer_reference *br = para_calloc(sizeof(*br));
322                 br->btrb = btrb;
323                 br->consumed = consumed;
324                 list_add_tail(&br->node, &ch->input_queue);
325                 btrb->refcount++;
326                 if (ch->start.tv_sec == 0)
327                         ch->start = *now;
328         }
329 }
330
331 /**
332  * Insert a malloced buffer into the buffer tree.
333  *
334  * \param buf The buffer to insert.
335  * \param size The size of \a buf in bytes.
336  * \param btrn Position in the buffer tree to create the output.
337  *
338  * This creates references to \a buf and adds these references to each child of
339  * \a btrn. The buffer will be freed using standard free() once no buffer tree
340  * node is referencing it any more.
341  *
342  * Note that this function must not be used if \a buf was obtained from a
343  * buffer pool. Use btr_add_output_pool() in this case.
344  */
345 void btr_add_output(char *buf, size_t size, struct btr_node *btrn)
346 {
347         struct btr_buffer *btrb;
348
349         assert(size != 0);
350         if (list_empty(&btrn->children)) {
351                 free(buf);
352                 return;
353         }
354         btrb = new_btrb(buf, size);
355         add_btrb_to_children(btrb, btrn, 0);
356 }
357
358 /**
359  * Feed data to child nodes of a buffer tree node.
360  *
361  * \param btrp The buffer pool.
362  * \param size The number of bytes to be allocated and fed to each child.
363  * \param btrn The node whose children are to be fed.
364  *
365  * This function allocates the amount of bytes from the buffer pool area,
366  * starting at the current value of the write head, and creates buffer
367  * references to the resulting part of the buffer pool area, one for each child
368  * of \a btrn. The references are then fed into the input queue of each child.
369  */
370 void btr_add_output_pool(struct btr_pool *btrp, size_t size,
371                 struct btr_node *btrn)
372 {
373         struct btr_buffer *btrb;
374         char *buf;
375         size_t avail;
376
377         assert(size != 0);
378         if (list_empty(&btrn->children))
379                 return;
380         avail = btr_pool_get_buffer(btrp, &buf);
381         assert(avail >= size);
382         btr_pool_allocate(btrp, size);
383         btrb = new_btrb(buf, size);
384         btrb->pool = btrp;
385         add_btrb_to_children(btrb, btrn, 0);
386 }
387
388 /**
389  * Copy data to write head of a buffer pool and feed it to all children nodes.
390  *
391  * \param src The source buffer.
392  * \param n The size of the source buffer in bytes.
393  * \param btrp The destination buffer pool.
394  * \param btrn Add the data as output of this node.
395  *
396  * This is expensive. The caller must make sure the data fits into the buffer
397  * pool area.
398  */
399 void btr_copy(const void *src, size_t n, struct btr_pool *btrp,
400         struct btr_node *btrn)
401 {
402         char *buf;
403         size_t sz, copy;
404
405         if (n == 0)
406                 return;
407         assert(n <= btr_pool_unused(btrp));
408         sz = btr_pool_get_buffer(btrp, &buf);
409         copy = PARA_MIN(sz, n);
410         memcpy(buf, src, copy);
411         btr_add_output_pool(btrp, copy, btrn);
412         if (copy == n)
413                 return;
414         sz = btr_pool_get_buffer(btrp, &buf);
415         assert(sz >= n - copy);
416         memcpy(buf, src + copy, n - copy);
417         btr_add_output_pool(btrp, n - copy, btrn);
418 }
419
420 static void btr_pushdown_br(struct btr_buffer_reference *br, struct btr_node *btrn)
421 {
422         add_btrb_to_children(br->btrb, btrn, br->consumed);
423         btr_drop_buffer_reference(br);
424 }
425
426 /**
427  * Feed all buffer references of the input queue through the output channel.
428  *
429  * \param btrn The node whose buffer references should be pushed down.
430  *
431  * This function is useful for filters that do not change the contents of the
432  * buffers at all, like the wav filter or the amp filter if no amplification
433  * was specified. This function is rather cheap.
434  */
435 void btr_pushdown(struct btr_node *btrn)
436 {
437         struct btr_buffer_reference *br, *tmp;
438
439         FOR_EACH_BUFFER_REF_SAFE(br, tmp, btrn)
440                 btr_pushdown_br(br, btrn);
441 }
442
443 int btr_pushdown_one(struct btr_node *btrn)
444 {
445         struct btr_buffer_reference *br;
446
447         if (list_empty(&btrn->input_queue))
448                 return 0;
449         br = list_first_entry(&btrn->input_queue, struct btr_buffer_reference, node);
450         btr_pushdown_br(br, btrn);
451         return 1;
452 }
453
454 /*
455  * Find out whether a node is a leaf node.
456  *
457  * \param btrn The node to check.
458  *
459  * \return True if this node has no children. False otherwise.
460  */
461 static bool btr_no_children(struct btr_node *btrn)
462 {
463         return list_empty(&btrn->children);
464 }
465
466 /**
467  * Find out whether a node is an orphan node.
468  *
469  * \param btrn The buffer tree node.
470  *
471  * \return True if \a btrn has no parent.
472  *
473  * This function will always return true for the root node.  However in case
474  * nodes have been removed from the tree, other nodes may become orphans too.
475  */
476 bool btr_no_parent(struct btr_node *btrn)
477 {
478         return !btrn->parent;
479 }
480
481 bool btr_inplace_ok(struct btr_node *btrn)
482 {
483         if (!btrn->parent)
484                 return true;
485         return list_is_singular(&btrn->parent->children);
486 }
487
488 static inline size_t br_available_bytes(struct btr_buffer_reference *br)
489 {
490         return br->btrb->size - br->consumed;
491 }
492
493 size_t btr_get_buffer_by_reference(struct btr_buffer_reference *br, char **buf)
494 {
495         if (buf)
496                 *buf = br->btrb->buf + br->consumed;
497         return br_available_bytes(br);
498 }
499
500 /**
501  * Obtain the next buffer of the input queue of a buffer tree node.
502  *
503  * \param btrn The node whose input queue is to be queried.
504  * \param bufp Result pointer.
505  *
506  * \return The number of bytes that can be read from buf. Zero if the input
507  * buffer queue is empty. In this case the value of \a bufp is undefined.
508  */
509 size_t btr_next_buffer(struct btr_node *btrn, char **bufp)
510 {
511         struct btr_buffer_reference *br;
512         char *buf, *result = NULL;
513         size_t sz, rv = 0;
514
515         FOR_EACH_BUFFER_REF(br, btrn) {
516                 sz = btr_get_buffer_by_reference(br, &buf);
517                 if (!result) {
518                         result = buf;
519                         rv = sz;
520                         if (!br->btrb->pool)
521                                 break;
522                         continue;
523                 }
524                 if (!br->btrb->pool)
525                         break;
526                 if (result + rv != buf)
527                         break;
528                 rv += sz;
529         }
530         if (bufp)
531                 *bufp = result;
532         return rv;
533 }
534
535 /**
536  * Deallocate the given number of bytes from the input queue.
537  *
538  * \param btrn The buffer tree node.
539  * \param numbytes The number of bytes to be deallocated.
540  *
541  * This function must be used to get rid of existing buffer references in the
542  * node's input queue. If no references to a buffer remain, the underlying
543  * buffers are either freed (in the non-buffer tree case) or the read head of
544  * the buffer pool is being advanced.
545  *
546  * Note that \a numbytes may be smaller than the buffer size. In this case the
547  * buffer is not deallocated and subsequent calls to btr_next_buffer() return
548  * the remaining part of the buffer.
549  */
550 void btr_consume(struct btr_node *btrn, size_t numbytes)
551 {
552         struct btr_buffer_reference *br, *tmp;
553         size_t sz;
554
555         if (numbytes == 0)
556                 return;
557         br = get_first_input_br(btrn);
558         assert(br);
559
560         if (br->wrap_count == 0) {
561                 /*
562                  * No wrap buffer. Drop buffer references whose buffer
563                  * has been fully used. */
564                 FOR_EACH_BUFFER_REF_SAFE(br, tmp, btrn) {
565                         if (br->consumed + numbytes <= br->btrb->size) {
566                                 br->consumed += numbytes;
567                                 if (br->consumed == br->btrb->size)
568                                         btr_drop_buffer_reference(br);
569                                 return;
570                         }
571                         numbytes -= br->btrb->size - br->consumed;
572                         btr_drop_buffer_reference(br);
573                 }
574                 assert(true);
575         }
576         /*
577          * We have a wrap buffer, consume from it. If in total, i.e. including
578          * previous calls to brt_consume(), less than wrap_count has been
579          * consumed, there's nothing more we can do.
580          *
581          * Otherwise we drop the wrap buffer and consume from subsequent
582          * buffers of the input queue the correct amount of bytes. This is the
583          * total number of bytes that have been consumed from the wrap buffer.
584          */
585         PARA_DEBUG_LOG("consuming %zu/%zu bytes from wrap buffer\n", numbytes,
586                 br_available_bytes(br));
587
588         assert(numbytes <= br_available_bytes(br));
589         if (br->consumed + numbytes < br->wrap_count) {
590                 br->consumed += numbytes;
591                 return;
592         }
593         PARA_DEBUG_LOG("dropping wrap buffer (%zu bytes)\n", br->btrb->size);
594         /* get rid of the wrap buffer */
595         sz = br->consumed + numbytes;
596         btr_drop_buffer_reference(br);
597         return btr_consume(btrn, sz);
598 }
599
600 static void flush_input_queue(struct btr_node *btrn)
601 {
602         struct btr_buffer_reference *br, *tmp;
603         FOR_EACH_BUFFER_REF_SAFE(br, tmp, btrn)
604                 btr_drop_buffer_reference(br);
605 }
606
607 /**
608  * Free all resources allocated by btr_new_node().
609  *
610  * Like free(3), it is OK to call this with a \p NULL pointer argument.
611  */
612 void btr_free_node(struct btr_node *btrn)
613 {
614         if (!btrn)
615                 return;
616         free(btrn->name);
617         free(btrn);
618 }
619
620 /**
621  * Remove a node from a buffer tree.
622  *
623  * \param btrn The node to remove.
624  *
625  * This makes all child nodes of \a btrn orphans and removes \a btrn from the
626  * list of children of its parent. Moreover, the input queue of \a btrn is
627  * flushed if it is not empty.
628  *
629  * \sa \ref btr_splice_out_node.
630  */
631 void btr_remove_node(struct btr_node *btrn)
632 {
633         struct btr_node *ch;
634
635         if (!btrn)
636                 return;
637         PARA_NOTICE_LOG("removing btr node %s from buffer tree\n", btrn->name);
638         FOR_EACH_CHILD(ch, btrn)
639                 ch->parent = NULL;
640         flush_input_queue(btrn);
641         if (btrn->parent)
642                 list_del(&btrn->node);
643 }
644
645 /**
646  * Return the amount of available input bytes of a buffer tree node.
647  *
648  * \param btrn The node whose input size should be computed.
649  *
650  * \return The total number of bytes available in the node's input
651  * queue.
652  *
653  * This simply iterates over all buffer references in the input queue and
654  * returns the sum of the sizes of all references.
655  */
656 size_t btr_get_input_queue_size(struct btr_node *btrn)
657 {
658         struct btr_buffer_reference *br;
659         size_t size = 0, wrap_consumed = 0;
660
661         FOR_EACH_BUFFER_REF(br, btrn) {
662                 if (br->wrap_count != 0) {
663                         wrap_consumed = br->consumed;
664                         continue;
665                 }
666                 size += br_available_bytes(br);
667         }
668         assert(wrap_consumed <= size);
669         size -= wrap_consumed;
670         return size;
671 }
672
673 /**
674  * Remove a node from the buffer tree, reconnecting parent and children.
675  *
676  * \param btrn The node to splice out.
677  *
678  * This function is used by buffer tree nodes that do not exist during the
679  * whole lifetime of the buffer tree. Unlike btr_remove_node(), calling
680  * btr_splice_out_node() does not split the tree into disconnected components
681  * but reconnects the buffer tree by making all child nodes of \a btrn children
682  * of the parent of \a btrn.
683  */
684 void btr_splice_out_node(struct btr_node *btrn)
685 {
686         struct btr_node *ch, *tmp;
687
688         assert(btrn);
689         PARA_NOTICE_LOG("splicing out %s\n", btrn->name);
690         btr_pushdown(btrn);
691         if (btrn->parent)
692                 list_del(&btrn->node);
693         FOR_EACH_CHILD_SAFE(ch, tmp, btrn) {
694                 PARA_INFO_LOG("parent(%s): %s\n", ch->name,
695                         btrn->parent? btrn->parent->name : "NULL");
696                 ch->parent = btrn->parent;
697                 if (btrn->parent)
698                         list_move(&ch->node, &btrn->parent->children);
699         }
700         assert(list_empty(&btrn->children));
701 }
702
703 /**
704  * Return the size of the largest input queue.
705  *
706  * Iterates over all children of the given node.
707  */
708 static size_t btr_bytes_pending(struct btr_node *btrn)
709 {
710         size_t max_size = 0;
711         struct btr_node *ch;
712
713         FOR_EACH_CHILD(ch, btrn) {
714                 size_t size = btr_get_input_queue_size(ch);
715                 max_size = PARA_MAX(max_size, size);
716         }
717         return max_size;
718 }
719
720 int btr_exec(struct btr_node *btrn, const char *command, char **value_result)
721 {
722         if (!btrn)
723                 return -ERRNO_TO_PARA_ERROR(EINVAL);
724         if (!btrn->execute)
725                 return -ERRNO_TO_PARA_ERROR(ENOTSUP);
726         return btrn->execute(btrn, command, value_result);
727 }
728
729 /**
730  * Execute a inter-node command on a parent node.
731  *
732  * \param btrn The node to start looking.
733  * \param command The command to execute.
734  * \param value_result Additional arguments and result value.
735  *
736  * This function traverses the buffer tree upwards and looks for parent nodes
737  * of \a btrn that understands \a command. On the first such node the command
738  * is executed, and the result is stored in \a value_result.
739  *
740  * \return \p -ENOTSUP if no parent node of \a btrn understands \a command.
741  * Otherwise the return value of the command handler is returned.
742  */
743 int btr_exec_up(struct btr_node *btrn, const char *command, char **value_result)
744 {
745         int ret;
746
747         for (; btrn; btrn = btrn->parent) {
748                 struct btr_node *parent = btrn->parent;
749                 if (!parent)
750                         return -ERRNO_TO_PARA_ERROR(ENOTSUP);
751                 if (!parent->execute)
752                         continue;
753                 PARA_INFO_LOG("parent: %s, cmd: %s\n", parent->name, command);
754                 ret = parent->execute(parent, command, value_result);
755                 if (ret == -ERRNO_TO_PARA_ERROR(ENOTSUP))
756                         continue;
757                 if (ret < 0)
758                         return ret;
759                 if (value_result && *value_result)
760                         PARA_NOTICE_LOG("%s(%s): %s\n", command, parent->name,
761                                 *value_result);
762                 return 1;
763         }
764         return -ERRNO_TO_PARA_ERROR(ENOTSUP);
765 }
766
767 /**
768  * Obtain the context of a buffer node tree.
769  *
770  * The returned pointer equals the context pointer used at creation time of the
771  * node.
772  *
773  * \sa btr_new_node(), struct \ref btr_node_description.
774  */
775 void *btr_context(struct btr_node *btrn)
776 {
777         return btrn->context;
778 }
779
780 static bool need_buffer_pool_merge(struct btr_node *btrn)
781 {
782         struct btr_buffer_reference *br = get_first_input_br(btrn);
783
784         if (!br)
785                 return false;
786         if (br->wrap_count != 0)
787                 return true;
788         if (br->btrb->pool)
789                 return true;
790         return false;
791 }
792
793 static void merge_input_pool(struct btr_node *btrn, size_t dest_size)
794 {
795         struct btr_buffer_reference *br, *wbr = NULL;
796         int num_refs; /* including wrap buffer */
797         char *buf, *buf1 = NULL, *buf2 = NULL;
798         size_t sz, sz1 = 0, sz2 = 0, wsz;
799
800         br = get_first_input_br(btrn);
801         if (!br || br_available_bytes(br) >= dest_size)
802                 return;
803         num_refs = 0;
804         FOR_EACH_BUFFER_REF(br, btrn) {
805                 num_refs++;
806                 sz = btr_get_buffer_by_reference(br, &buf);
807                 if (sz == 0)
808                         break;
809                 if (br->wrap_count != 0) {
810                         assert(!wbr);
811                         assert(num_refs == 1);
812                         wbr = br;
813                         if (sz >= dest_size)
814                                 return;
815                         continue;
816                 }
817                 if (!buf1) {
818                         buf1 = buf;
819                         sz1 = sz;
820                         goto next;
821                 }
822                 if (buf1 + sz1 == buf) {
823                         sz1 += sz;
824                         goto next;
825                 }
826                 if (!buf2) {
827                         buf2 = buf;
828                         sz2 = sz;
829                         goto next;
830                 }
831                 assert(buf2 + sz2 == buf);
832                 sz2 += sz;
833 next:
834                 if (sz1 + sz2 >= dest_size)
835                         break;
836         }
837         if (!buf2) /* nothing to do */
838                 return;
839         assert(buf1 && sz2 > 0);
840         /*
841          * If the second buffer is large, we only take the first part of it to
842          * avoid having to memcpy() huge buffers.
843          */
844         sz2 = PARA_MIN(sz2, (size_t)(64 * 1024));
845         if (!wbr) {
846                 /* Make a new wrap buffer combining buf1 and buf2. */
847                 sz = sz1 + sz2;
848                 buf = para_malloc(sz);
849                 PARA_DEBUG_LOG("merging input buffers: (%p:%zu, %p:%zu) -> %p:%zu\n",
850                         buf1, sz1, buf2, sz2, buf, sz);
851                 memcpy(buf, buf1, sz1);
852                 memcpy(buf + sz1, buf2, sz2);
853                 br = para_calloc(sizeof(*br));
854                 br->btrb = new_btrb(buf, sz);
855                 br->btrb->refcount = 1;
856                 br->consumed = 0;
857                 /* This is a wrap buffer */
858                 br->wrap_count = sz1;
859                 para_list_add(&br->node, &btrn->input_queue);
860                 return;
861         }
862         /*
863          * We already have a wrap buffer, but it is too small. It might be
864          * partially used.
865          */
866         wsz = br_available_bytes(wbr);
867         if (wbr->wrap_count == sz1 && wbr->btrb->size >= sz1 + sz2) /* nothing we can do about it */
868                 return;
869         sz = sz1 + sz2 - wbr->btrb->size; /* amount of new data */
870         PARA_DEBUG_LOG("increasing wrap buffer %zu -> %zu\n", wbr->btrb->size,
871                 wbr->btrb->size + sz);
872         wbr->btrb->size += sz;
873         wbr->btrb->buf = para_realloc(wbr->btrb->buf, wbr->btrb->size);
874         /* copy the new data to the end of the reallocated buffer */
875         assert(sz2 >= sz);
876         memcpy(wbr->btrb->buf + wbr->btrb->size - sz, buf2 + sz2 - sz, sz);
877 }
878
879 /**
880  * Merge the first two input buffers into one.
881  *
882  * This is a quite expensive operation.
883  *
884  * \return The number of buffers that have been available (zero, one or two).
885  */
886 static int merge_input(struct btr_node *btrn)
887 {
888         struct btr_buffer_reference *brs[2], *br;
889         char *bufs[2], *buf;
890         size_t szs[2], sz;
891         int i;
892
893         if (list_empty(&btrn->input_queue))
894                 return 0;
895         if (list_is_singular(&btrn->input_queue))
896                 return 1;
897         i = 0;
898         /* get references to the first two buffers */
899         FOR_EACH_BUFFER_REF(br, btrn) {
900                 brs[i] = br;
901                 szs[i] = btr_get_buffer_by_reference(brs[i], bufs + i);
902                 i++;
903                 if (i == 2)
904                         break;
905         }
906         assert(i == 2);
907         /* make a new btrb that combines the two buffers and a br to it. */
908         sz = szs[0] + szs[1];
909         buf = para_malloc(sz);
910         PARA_DEBUG_LOG("%s: memory merging input buffers: (%zu, %zu) -> %zu\n",
911                 btrn->name, szs[0], szs[1], sz);
912         memcpy(buf, bufs[0], szs[0]);
913         memcpy(buf + szs[0], bufs[1], szs[1]);
914
915         br = para_calloc(sizeof(*br));
916         br->btrb = new_btrb(buf, sz);
917         br->btrb->refcount = 1;
918
919         /* replace the first two refs by the new one */
920         btr_drop_buffer_reference(brs[0]);
921         btr_drop_buffer_reference(brs[1]);
922         para_list_add(&br->node, &btrn->input_queue);
923         return 2;
924 }
925
926 /**
927  * Combine input queue buffers.
928  *
929  * \param btrn The buffer tree node whose input should be merged.
930  * \param dest_size Stop merging if a buffer of at least this size exists.
931  *
932  * Used to combine as many buffers as needed into a single buffer whose size is
933  * at least \a dest_size. This function is rather cheap in case the parent node
934  * uses buffer pools and rather expensive otherwise.
935  *
936  * Note that if less than \a dest_size bytes are available in total, this
937  * function does nothing and subsequent calls to btr_next_buffer() will still
938  * return a buffer size less than \a dest_size.
939  */
940 void btr_merge(struct btr_node *btrn, size_t dest_size)
941 {
942         if (need_buffer_pool_merge(btrn))
943                 return merge_input_pool(btrn, dest_size);
944         for (;;) {
945                 char *buf;
946                 size_t len = btr_next_buffer(btrn, &buf);
947                 if (len >= dest_size)
948                         return;
949                 PARA_DEBUG_LOG("input size = %zu < %zu = dest\n", len, dest_size);
950                 if (merge_input(btrn) < 2)
951                         return;
952         }
953 }
954
955 bool btr_eof(struct btr_node *btrn)
956 {
957         char *buf;
958         size_t len = btr_next_buffer(btrn, &buf);
959
960         return (len == 0 && btr_no_parent(btrn));
961 }
962
963 void log_tree_recursively(struct btr_node *btrn, int loglevel, int depth)
964 {
965         struct btr_node *ch;
966         const char spaces[] = "                 ", *space = spaces + 16 - depth;
967
968         if (depth > 16)
969                 return;
970         para_log(loglevel, "%s%s\n", space, btrn->name);
971         FOR_EACH_CHILD(ch, btrn)
972                 log_tree_recursively(ch, loglevel, depth + 1);
973 }
974
975 void btr_log_tree(struct btr_node *btrn, int loglevel)
976 {
977         return log_tree_recursively(btrn, loglevel, 0);
978 }
979
980 /*
981  * \return \a root if \a name is \p NULL.
982  */
983 struct btr_node *btr_search_node(const char *name, struct btr_node *root)
984 {
985         struct btr_node *ch;
986
987         if (!name)
988                 return root;
989         if (!strcmp(root->name, name))
990                 return root;
991         FOR_EACH_CHILD(ch, root) {
992                 struct btr_node *result = btr_search_node(name, ch);
993                 if (result)
994                         return result;
995         }
996         return NULL;
997 }
998
999 /** 640K ought to be enough for everybody ;) */
1000 #define BTRN_MAX_PENDING (640 * 1024)
1001
1002 int btr_node_status(struct btr_node *btrn, size_t min_iqs,
1003         enum btr_node_type type)
1004 {
1005         size_t iqs;
1006
1007         assert(btrn);
1008         if (type != BTR_NT_LEAF) {
1009                 if (btr_no_children(btrn))
1010                         return -E_BTR_NO_CHILD;
1011                 if (btr_bytes_pending(btrn) > BTRN_MAX_PENDING)
1012                         return 0;
1013         }
1014         if (type != BTR_NT_ROOT) {
1015                 if (btr_eof(btrn))
1016                         return -E_BTR_EOF;
1017                 iqs = btr_get_input_queue_size(btrn);
1018                 if (iqs == 0) /* we have a parent, because not eof */
1019                         return 0;
1020                 if (iqs < min_iqs && !btr_no_parent(btrn))
1021                         return 0;
1022         }
1023         return 1;
1024 }
1025
1026 void btr_get_node_start(struct btr_node *btrn, struct timeval *tv)
1027 {
1028         *tv = btrn->start;
1029 }