mp3dec: Don't check for bad data delay on errors.
[paraslash.git] / buffer_tree.c
1 #include <regex.h>
2 #include <stdbool.h>
3
4 #include "para.h"
5 #include "list.h"
6 #include "string.h"
7 #include "buffer_tree.h"
8 #include "error.h"
9 #include "sched.h"
10
11 struct btr_pool {
12         char *name;
13         char *area_start;
14         char *area_end;
15         char *rhead;
16         char *whead;
17 };
18
19 enum btr_buffer_flags {
20         /* changes the way the buffer is deallocated */
21         BTR_BF_BTR_POOL = 1,
22 };
23
24 struct btr_buffer {
25         char *buf;
26         size_t size;
27         /** The number of references to this buffer. */
28         int refcount;
29         struct btr_pool *pool;
30 };
31
32 struct btr_buffer_reference {
33         struct btr_buffer *btrb;
34         size_t consumed;
35         /* Each buffer reference belongs to the buffer queue list of some buffer tree node. */
36         struct list_head node;
37         size_t wrap_count;
38 };
39
40 struct btr_node {
41         char *name;
42         struct btr_node *parent;
43         /* The position of this btr node in the buffer tree. */
44         struct list_head node;
45         /* The children nodes of this btr node are linked together in a list. */
46         struct list_head children;
47         /* Time of first data transfer. */
48         struct timeval start;
49         /**
50          * The input queue is a list of references to btr buffers. Each item on
51          * the list represents an input buffer which has not been completely
52          * used by this btr node.
53          */
54         struct list_head input_queue;
55         btr_command_handler execute;
56         void *context;
57 };
58
59 struct btr_pool *btr_pool_new(const char *name, size_t area_size)
60 {
61         struct btr_pool *btrp;
62
63         PARA_INFO_LOG("%s, %zu bytes\n", name, area_size);
64         btrp = para_malloc(sizeof(*btrp));
65         btrp->area_start = para_malloc(area_size);
66         btrp->area_end = btrp->area_start + area_size;
67         btrp->rhead = btrp->area_start;
68         btrp->whead = btrp->area_start;
69         btrp->name = para_strdup(name);
70         return btrp;
71 }
72
73 /* whead = NULL means area full */
74
75 void btr_pool_free(struct btr_pool *btrp)
76 {
77         if (!btrp)
78                 return;
79         free(btrp->area_start);
80         free(btrp->name);
81         free(btrp);
82 }
83
84 size_t btr_pool_size(struct btr_pool *btrp)
85 {
86         return btrp->area_end - btrp->area_start;
87 }
88
89 size_t btr_pool_filled(struct btr_pool *btrp)
90 {
91         if (!btrp->whead)
92                 return btr_pool_size(btrp);
93         if (btrp->rhead <= btrp->whead)
94                 return  btrp->whead - btrp->rhead;
95         return btr_pool_size(btrp) - (btrp->rhead - btrp->whead);
96 }
97
98 size_t btr_pool_unused(struct btr_pool *btrp)
99 {
100         return btr_pool_size(btrp) - btr_pool_filled(btrp);
101 }
102
103 size_t btr_pool_available(struct btr_pool *btrp)
104 {
105         if (!btrp->whead)
106                 return 0;
107         if (btrp->rhead <= btrp->whead)
108                 return btrp->area_end - btrp->whead;
109         return btrp->rhead - btrp->whead;
110 }
111
112 size_t btr_pool_get_buffer(struct btr_pool *btrp, char **result)
113 {
114         if (result)
115                 *result = btrp->whead;
116         return btr_pool_available(btrp);
117 }
118
119 void btr_pool_allocate(struct btr_pool *btrp, size_t size)
120 {
121         char *end;
122
123         if (size == 0)
124                 return;
125         //PARA_CRIT_LOG("filled: %zu, alloc %zu\n", btr_pool_filled(btrp), size);
126         assert(size <= btr_pool_available(btrp));
127         end = btrp->whead + size;
128         assert(end <= btrp->area_end);
129
130         if (end == btrp->area_end) {
131                 PARA_DEBUG_LOG("%s: end of pool area reached\n", btrp->name);
132                 end = btrp->area_start;
133         }
134         if (end == btrp->rhead) {
135                 PARA_DEBUG_LOG("btrp buffer full\n");
136                 end = NULL; /* buffer full */
137         }
138         btrp->whead = end;
139         //PARA_CRIT_LOG("filled: %zu\n", btr_pool_filled(btrp));
140 }
141
142 static void btr_pool_deallocate(struct btr_pool *btrp, size_t size)
143 {
144         char *end = btrp->rhead + size;
145
146         //PARA_CRIT_LOG("filled: %zu, dealloc %zu\n", btr_pool_filled(btrp), size);
147         if (size == 0)
148                 return;
149         assert(end <= btrp->area_end);
150         assert(size <= btr_pool_filled(btrp));
151         if (end == btrp->area_end)
152                 end = btrp->area_start;
153         if (!btrp->whead)
154                 btrp->whead = btrp->rhead;
155         btrp->rhead = end;
156         if (btrp->rhead == btrp->whead)
157                 btrp->rhead = btrp->whead = btrp->area_start;
158         //PARA_CRIT_LOG("filled: %zu\n", btr_pool_filled(btrp));
159 }
160
161 #define FOR_EACH_CHILD(_tn, _btrn) list_for_each_entry((_tn), \
162         &((_btrn)->children), node)
163 #define FOR_EACH_CHILD_SAFE(_tn, _tmp, _btrn) \
164         list_for_each_entry_safe((_tn), (_tmp), &((_btrn)->children), node)
165
166 #define FOR_EACH_BUFFER_REF(_br, _btrn) \
167         list_for_each_entry((_br), &(_btrn)->input_queue, node)
168 #define FOR_EACH_BUFFER_REF_SAFE(_br, _tmp, _btrn) \
169         list_for_each_entry_safe((_br), (_tmp), &(_btrn)->input_queue, node)
170
171 struct btr_node *btr_new_node(const char *name, struct btr_node *parent,
172                 btr_command_handler handler, void *context)
173 {
174         struct btr_node *btrn = para_malloc(sizeof(*btrn));
175
176         btrn->name = para_strdup(name);
177         btrn->parent = parent;
178         btrn->execute = handler;
179         btrn->context = context;
180         btrn->start.tv_sec = 0;
181         btrn->start.tv_usec = 0;
182         if (parent)
183                 list_add_tail(&btrn->node, &parent->children);
184         INIT_LIST_HEAD(&btrn->children);
185         INIT_LIST_HEAD(&btrn->input_queue);
186         if (parent)
187                 PARA_INFO_LOG("added %s as child of %s\n", name, parent->name);
188         else
189                 PARA_INFO_LOG("added %s as btr root\n", name);
190         return btrn;
191 }
192
193 /*
194  * Allocate a new btr buffer.
195  *
196  * The freshly allocated buffer will have a zero refcount and will
197  * not be associated with a btr pool.
198  */
199 static struct btr_buffer *new_btrb(char *buf, size_t size)
200 {
201         struct btr_buffer *btrb = para_calloc(sizeof(*btrb));
202
203         btrb->buf = buf;
204         btrb->size = size;
205         return btrb;
206 }
207
208 static void dealloc_buffer(struct btr_buffer *btrb)
209 {
210         if (btrb->pool)
211                 btr_pool_deallocate(btrb->pool, btrb->size);
212         else
213                 free(btrb->buf);
214 }
215
216 static struct btr_buffer_reference *get_first_input_br(struct btr_node *btrn)
217 {
218         if (list_empty(&btrn->input_queue))
219                 return NULL;
220         return list_first_entry(&btrn->input_queue,
221                 struct btr_buffer_reference, node);
222 }
223
224 /*
225  * Deallocate the reference, release the resources if refcount drops to zero.
226  */
227 static void btr_drop_buffer_reference(struct btr_buffer_reference *br)
228 {
229         struct btr_buffer *btrb = br->btrb;
230
231         //PARA_CRIT_LOG("dropping buffer reference %p\n", br);
232         list_del(&br->node);
233         free(br);
234         btrb->refcount--;
235         if (btrb->refcount == 0) {
236                 dealloc_buffer(btrb);
237                 free(btrb);
238         }
239 }
240
241 static void add_btrb_to_children(struct btr_buffer *btrb,
242                 struct btr_node *btrn, size_t consumed)
243 {
244         struct btr_node *ch;
245
246         if (btrn->start.tv_sec == 0)
247                 btrn->start = *now;
248         FOR_EACH_CHILD(ch, btrn) {
249                 struct btr_buffer_reference *br = para_calloc(sizeof(*br));
250                 br->btrb = btrb;
251                 br->consumed = consumed;
252                 list_add_tail(&br->node, &ch->input_queue);
253                 btrb->refcount++;
254                 if (ch->start.tv_sec == 0)
255                         ch->start = *now;
256         }
257 }
258
259 void btr_add_output(char *buf, size_t size, struct btr_node *btrn)
260 {
261         struct btr_buffer *btrb;
262
263         assert(size != 0);
264         if (list_empty(&btrn->children)) {
265                 free(buf);
266                 return;
267         }
268         btrb = new_btrb(buf, size);
269         add_btrb_to_children(btrb, btrn, 0);
270 }
271
272 void btr_add_output_pool(struct btr_pool *btrp, size_t size,
273                 struct btr_node *btrn)
274 {
275         struct btr_buffer *btrb;
276         char *buf;
277         size_t avail;
278
279         assert(size != 0);
280         if (list_empty(&btrn->children))
281                 return;
282         avail = btr_pool_get_buffer(btrp, &buf);
283         assert(avail >= size);
284         btr_pool_allocate(btrp, size);
285         btrb = new_btrb(buf, size);
286         btrb->pool = btrp;
287         add_btrb_to_children(btrb, btrn, 0);
288 }
289
290 void btr_copy(const void *src, size_t n, struct btr_pool *btrp,
291         struct btr_node *btrn)
292 {
293         char *buf;
294         size_t sz, copy;
295
296         if (n == 0)
297                 return;
298         assert(n <= btr_pool_unused(btrp));
299         sz = btr_pool_get_buffer(btrp, &buf);
300         copy = PARA_MIN(sz, n);
301         memcpy(buf, src, copy);
302         btr_add_output_pool(btrp, copy, btrn);
303         if (copy == n)
304                 return;
305         sz = btr_pool_get_buffer(btrp, &buf);
306         assert(sz >= n - copy);
307         memcpy(buf, src + copy, n - copy);
308         btr_add_output_pool(btrp, n - copy, btrn);
309 }
310
311 static void btr_pushdown_br(struct btr_buffer_reference *br, struct btr_node *btrn)
312 {
313         add_btrb_to_children(br->btrb, btrn, br->consumed);
314         btr_drop_buffer_reference(br);
315 }
316
317 void btr_pushdown(struct btr_node *btrn)
318 {
319         struct btr_buffer_reference *br, *tmp;
320
321         FOR_EACH_BUFFER_REF_SAFE(br, tmp, btrn)
322                 btr_pushdown_br(br, btrn);
323 }
324
325 int btr_pushdown_one(struct btr_node *btrn)
326 {
327         struct btr_buffer_reference *br;
328
329         if (list_empty(&btrn->input_queue))
330                 return 0;
331         br = list_first_entry(&btrn->input_queue, struct btr_buffer_reference, node);
332         btr_pushdown_br(br, btrn);
333         return 1;
334 }
335
336 /* Return true if this node has no children. */
337 bool btr_no_children(struct btr_node *btrn)
338 {
339         return list_empty(&btrn->children);
340 }
341
342 bool btr_no_parent(struct btr_node *btrn)
343 {
344         return !btrn->parent;
345 }
346
347 bool btr_inplace_ok(struct btr_node *btrn)
348 {
349         if (!btrn->parent)
350                 return true;
351         return list_is_singular(&btrn->parent->children);
352 }
353
354 static inline size_t br_available_bytes(struct btr_buffer_reference *br)
355 {
356         return br->btrb->size - br->consumed;
357 }
358
359 size_t btr_get_buffer_by_reference(struct btr_buffer_reference *br, char **buf)
360 {
361         if (buf)
362                 *buf = br->btrb->buf + br->consumed;
363         return br_available_bytes(br);
364 }
365
366 /**
367  * \return zero if the input buffer queue is empty.
368  */
369 size_t btr_next_buffer(struct btr_node *btrn, char **bufp)
370 {
371         struct btr_buffer_reference *br;
372         char *buf, *result = NULL;
373         size_t sz, rv = 0;
374
375         FOR_EACH_BUFFER_REF(br, btrn) {
376                 sz = btr_get_buffer_by_reference(br, &buf);
377                 if (!result) {
378                         result = buf;
379                         rv = sz;
380                         if (!br->btrb->pool)
381                                 break;
382                         continue;
383                 }
384                 if (!br->btrb->pool)
385                         break;
386                 if (result + rv != buf) {
387                         PARA_DEBUG_LOG("%s: pool merge impossible: %p != %p\n",
388                                 btrn->name, result + rv, buf);
389                         break;
390                 }
391 //              PARA_CRIT_LOG("%s: inplace merge (%zu, %zu)->%zu\n", btrn->name,
392 //                      rv, sz, rv + sz);
393 //              PARA_CRIT_LOG("%s: inplace merge %p (%zu)\n", btrn->name,
394 //                      result, sz);
395                 rv += sz;
396         }
397         if (bufp)
398                 *bufp = result;
399         return rv;
400 }
401
402 void btr_consume(struct btr_node *btrn, size_t numbytes)
403 {
404         struct btr_buffer_reference *br, *tmp;
405         size_t sz;
406
407         if (numbytes == 0)
408                 return;
409         br = get_first_input_br(btrn);
410         assert(br);
411
412         //PARA_CRIT_LOG("wrap count: %zu\n", br->wrap_count);
413         if (br->wrap_count == 0) {
414                 /*
415                  * No wrap buffer. Drop buffer references whose buffer
416                  * has been fully used. */
417                 FOR_EACH_BUFFER_REF_SAFE(br, tmp, btrn) {
418                         if (br->consumed + numbytes <= br->btrb->size) {
419                                 br->consumed += numbytes;
420                                 if (br->consumed == br->btrb->size)
421                                         btr_drop_buffer_reference(br);
422                                 return;
423                         }
424                         numbytes -= br->btrb->size - br->consumed;
425                         btr_drop_buffer_reference(br);
426                 }
427                 assert(true);
428         }
429         /*
430
431         We have a wrap buffer, consume from it. If in total,
432         i.e. including previous calls to brt_consume(), less than
433         wrap_count has been consumed, there's nothing more we can do.
434
435         Otherwise we drop the wrap buffer and consume from subsequent
436         buffers of the input queue the correct amount of bytes. This
437         is the total number of bytes that have been consumed from the
438         wrap buffer.
439 */
440         PARA_DEBUG_LOG("consuming %zu/%zu bytes from wrap buffer\n", numbytes,
441                 br_available_bytes(br));
442
443         assert(numbytes <= br_available_bytes(br));
444         if (br->consumed + numbytes < br->wrap_count) {
445                 br->consumed += numbytes;
446                 return;
447         }
448         PARA_DEBUG_LOG("dropping wrap buffer (%zu bytes)\n", br->btrb->size);
449         /* get rid of the wrap buffer */
450         sz = br->consumed + numbytes;
451         btr_drop_buffer_reference(br);
452         return btr_consume(btrn, sz);
453 }
454
455 static void flush_input_queue(struct btr_node *btrn)
456 {
457         struct btr_buffer_reference *br, *tmp;
458         FOR_EACH_BUFFER_REF_SAFE(br, tmp, btrn)
459                 btr_drop_buffer_reference(br);
460 }
461
462 void btr_free_node(struct btr_node *btrn)
463 {
464         if (!btrn)
465                 return;
466         free(btrn->name);
467         free(btrn);
468 }
469
470 void btr_remove_node(struct btr_node *btrn)
471 {
472         struct btr_node *ch;
473
474         if (!btrn)
475                 return;
476         PARA_NOTICE_LOG("removing btr node %s from buffer tree\n", btrn->name);
477         FOR_EACH_CHILD(ch, btrn)
478                 ch->parent = NULL;
479         flush_input_queue(btrn);
480         if (btrn->parent)
481                 list_del(&btrn->node);
482 }
483
484 size_t btr_get_input_queue_size(struct btr_node *btrn)
485 {
486         struct btr_buffer_reference *br;
487         size_t size = 0;
488
489         FOR_EACH_BUFFER_REF(br, btrn) {
490                 //PARA_CRIT_LOG("size: %zu\n", size);
491                 size += br_available_bytes(br);
492         }
493         return size;
494 }
495
496 void btr_splice_out_node(struct btr_node *btrn)
497 {
498         struct btr_node *ch, *tmp;
499
500         assert(btrn);
501         PARA_NOTICE_LOG("splicing out %s\n", btrn->name);
502         btr_pushdown(btrn);
503         if (btrn->parent)
504                 list_del(&btrn->node);
505         FOR_EACH_CHILD_SAFE(ch, tmp, btrn) {
506                 PARA_INFO_LOG("parent(%s): %s\n", ch->name,
507                         btrn->parent? btrn->parent->name : "NULL");
508                 ch->parent = btrn->parent;
509                 if (btrn->parent)
510                         list_move(&ch->node, &btrn->parent->children);
511         }
512         assert(list_empty(&btrn->children));
513 }
514
515 /**
516  * Return the size of the largest input queue.
517  *
518  * Iterates over all children of the given node.
519  */
520 size_t btr_bytes_pending(struct btr_node *btrn)
521 {
522         size_t max_size = 0;
523         struct btr_node *ch;
524
525         FOR_EACH_CHILD(ch, btrn) {
526                 size_t size = btr_get_input_queue_size(ch);
527                 max_size = PARA_MAX(max_size, size);
528         }
529         return max_size;
530 }
531
532 int btr_exec(struct btr_node *btrn, const char *command, char **value_result)
533 {
534         if (!btrn)
535                 return -ERRNO_TO_PARA_ERROR(EINVAL);
536         if (!btrn->execute)
537                 return -ERRNO_TO_PARA_ERROR(ENOTSUP);
538         return btrn->execute(btrn, command, value_result);
539 }
540
541 int btr_exec_up(struct btr_node *btrn, const char *command, char **value_result)
542 {
543         int ret;
544
545         for (; btrn; btrn = btrn->parent) {
546                 struct btr_node *parent = btrn->parent;
547                 if (!parent)
548                         return -ERRNO_TO_PARA_ERROR(ENOTSUP);
549                 if (!parent->execute)
550                         continue;
551                 PARA_INFO_LOG("parent: %s, cmd: %s\n", parent->name, command);
552                 ret = parent->execute(parent, command, value_result);
553                 if (ret == -ERRNO_TO_PARA_ERROR(ENOTSUP))
554                         continue;
555                 if (ret < 0)
556                         return ret;
557                 if (value_result && *value_result)
558                         PARA_NOTICE_LOG("%s(%s): %s\n", command, parent->name,
559                                 *value_result);
560                 return 1;
561         }
562         return -ERRNO_TO_PARA_ERROR(ENOTSUP);
563 }
564
565 void *btr_context(struct btr_node *btrn)
566 {
567         return btrn->context;
568 }
569
570 static bool need_buffer_pool_merge(struct btr_node *btrn)
571 {
572         struct btr_buffer_reference *br = get_first_input_br(btrn);
573
574         if (!br)
575                 return false;
576         if (br->wrap_count != 0)
577                 return true;
578         if (br->btrb->pool)
579                 return true;
580         return false;
581 }
582
583 static void merge_input_pool(struct btr_node *btrn, size_t dest_size)
584 {
585         struct btr_buffer_reference *br, *wbr;
586         int num_refs; /* including wrap buffer */
587         char *buf, *buf1, *buf2 = NULL;
588         size_t sz, sz1, sz2 = 0, wsz;
589
590         if (list_empty(&btrn->input_queue))
591                 return;
592
593         num_refs = 0;
594         FOR_EACH_BUFFER_REF(br, btrn) {
595                 num_refs++;
596                 sz = btr_get_buffer_by_reference(br, &buf);
597                 if (br->wrap_count != 0) {
598                         assert(!wbr);
599                         assert(num_refs == 1);
600                         wbr = br;
601                         if (sz >= dest_size)
602                                 return;
603                         continue;
604                 }
605                 if (!buf1) {
606                         buf1 = buf;
607                         sz1 = sz;
608                         goto next;
609                 }
610                 if (buf1 + sz1 == buf) {
611                         sz1 += sz;
612                         goto next;
613                 }
614                 if (!buf2) {
615                         buf2 = buf;
616                         sz2 = sz;
617                         goto next;
618                 }
619                 assert(buf2 + sz2 == buf);
620                 sz2 += sz;
621 next:
622                 if (sz1 + sz2 >= dest_size)
623                         break;
624         }
625         /*
626          * If the second buffer is large, we only take the first part of it to
627          * avoid having to memcpy() huge buffers.
628          */
629         sz2 = PARA_MIN(sz2, (size_t)(64 * 1024));
630         if (!wbr) {
631                 assert(buf1);
632                 if (!buf2) /* nothing to do */
633                         return;
634                 /* Make a new wrap buffer combining buf1 and buf2. */
635                 sz = sz1 + sz2;
636                 buf = para_malloc(sz);
637                 PARA_DEBUG_LOG("merging input buffers: (%p:%zu, %p:%zu) -> %p:%zu\n",
638                         buf1, sz1, buf2, sz2, buf, sz);
639                 memcpy(buf, buf1, sz1);
640                 memcpy(buf + sz1, buf2, sz2);
641                 br = para_calloc(sizeof(*br));
642                 br->btrb = new_btrb(buf, sz);
643                 br->btrb->refcount = 1;
644                 br->consumed = 0;
645                 /* This is a wrap buffer */
646                 br->wrap_count = sz1;
647                 para_list_add(&br->node, &btrn->input_queue);
648                 return;
649         }
650         /*
651          * We already have a wrap buffer, but it is too small. It might be
652          * partially used.
653          */
654         wsz = br_available_bytes(wbr);
655         if (wbr->wrap_count == sz1 && wbr->btrb->size >= sz1 + sz2) /* nothing we can do about it */
656                 return;
657         assert(buf1 && buf2);
658         sz = sz1 + sz2 - wbr->btrb->size; /* amount of new data */
659         wbr->btrb->size += sz;
660         PARA_DEBUG_LOG("increasing wrap buffer to %zu\n", wbr->btrb->size);
661         wbr->btrb->buf = para_realloc(wbr->btrb->buf, wbr->btrb->size);
662         /* copy the new data to the end of the reallocated buffer */
663         assert(sz2 >= sz);
664         memcpy(wbr->btrb->buf + wbr->btrb->size - sz, buf2 + sz2 - sz, sz);
665 }
666
667 /**
668  * Merge the first two input buffers into one.
669  *
670  * This is a quite expensive operation.
671  *
672  * \return The number of buffers that have been available (zero, one or two).
673  */
674 static int merge_input(struct btr_node *btrn)
675 {
676         struct btr_buffer_reference *brs[2], *br;
677         char *bufs[2], *buf;
678         size_t szs[2], sz;
679         int i;
680
681         if (list_empty(&btrn->input_queue))
682                 return 0;
683         if (list_is_singular(&btrn->input_queue))
684                 return 1;
685         i = 0;
686         /* get references to the first two buffers */
687         FOR_EACH_BUFFER_REF(br, btrn) {
688                 brs[i] = br;
689                 szs[i] = btr_get_buffer_by_reference(brs[i], bufs + i);
690                 i++;
691                 if (i == 2)
692                         break;
693         }
694         /* make a new btrb that combines the two buffers and a br to it. */
695         sz = szs[0] + szs[1];
696         buf = para_malloc(sz);
697         PARA_DEBUG_LOG("%s: memory merging input buffers: (%zu, %zu) -> %zu\n",
698                 btrn->name, szs[0], szs[1], sz);
699         memcpy(buf, bufs[0], szs[0]);
700         memcpy(buf + szs[0], bufs[1], szs[1]);
701
702         br = para_calloc(sizeof(*br));
703         br->btrb = new_btrb(buf, sz);
704         br->btrb->refcount = 1;
705
706         /* replace the first two refs by the new one */
707         btr_drop_buffer_reference(brs[0]);
708         btr_drop_buffer_reference(brs[1]);
709         para_list_add(&br->node, &btrn->input_queue);
710         return 2;
711 }
712
713 void btr_merge(struct btr_node *btrn, size_t dest_size)
714 {
715         if (need_buffer_pool_merge(btrn))
716                 return merge_input_pool(btrn, dest_size);
717         for (;;) {
718                 char *buf;
719                 size_t len = btr_next_buffer(btrn, &buf);
720                 if (len >= dest_size)
721                         return;
722                 PARA_DEBUG_LOG("input size = %zu < %zu = dest\n", len, dest_size);
723                 if (merge_input(btrn) < 2)
724                         return;
725         }
726 }
727
728 bool btr_eof(struct btr_node *btrn)
729 {
730         char *buf;
731         size_t len = btr_next_buffer(btrn, &buf);
732
733         return (len == 0 && btr_no_parent(btrn));
734 }
735
736 void log_tree_recursively(struct btr_node *btrn, int loglevel, int depth)
737 {
738         struct btr_node *ch;
739         const char spaces[] = "                 ", *space = spaces + 16 - depth;
740
741         if (depth > 16)
742                 return;
743         para_log(loglevel, "%s%s\n", space, btrn->name);
744         FOR_EACH_CHILD(ch, btrn)
745                 log_tree_recursively(ch, loglevel, depth + 1);
746 }
747
748 void btr_log_tree(struct btr_node *btrn, int loglevel)
749 {
750         return log_tree_recursively(btrn, loglevel, 0);
751 }
752
753 /*
754  * \return \a root if \a name is \p NULL.
755  */
756 struct btr_node *btr_search_node(const char *name, struct btr_node *root)
757 {
758         struct btr_node *ch;
759
760         if (!name)
761                 return root;
762         if (!strcmp(root->name, name))
763                 return root;
764         FOR_EACH_CHILD(ch, root) {
765                 struct btr_node *result = btr_search_node(name, ch);
766                 if (result)
767                         return result;
768         }
769         return NULL;
770 }
771
772 /** 640K ought to be enough for everybody ;) */
773 #define BTRN_MAX_PENDING (640 * 1024)
774
775 int btr_node_status(struct btr_node *btrn, size_t min_iqs,
776         enum btr_node_type type)
777 {
778         size_t iqs;
779
780         if (!btrn)
781                 return 0;
782         if (type != BTR_NT_LEAF) {
783                 if (btr_no_children(btrn))
784                         return -E_BTR_NO_CHILD;
785                 if (btr_bytes_pending(btrn) > BTRN_MAX_PENDING)
786                         return 0;
787         }
788         if (type != BTR_NT_ROOT) {
789                 if (btr_eof(btrn))
790                         return -E_BTR_EOF;
791                 iqs = btr_get_input_queue_size(btrn);
792                 if (iqs == 0) /* we have a parent, because not eof */
793                         return 0;
794                 if (iqs < min_iqs && !btr_no_parent(btrn))
795                         return 0;
796         }
797         return 1;
798 }
799
800 void btr_get_node_start(struct btr_node *btrn, struct timeval *tv)
801 {
802         *tv = btrn->start;
803 }