]> git.tuebingen.mpg.de Git - paraslash.git/blob - buffer_tree.c
[btr] log message cleanups.
[paraslash.git] / buffer_tree.c
1 #include <regex.h>
2 #include <stdbool.h>
3
4 #include "para.h"
5 #include "list.h"
6 #include "string.h"
7 #include "buffer_tree.h"
8 #include "error.h"
9 #include "sched.h"
10
11 struct btr_pool {
12         char *name;
13         char *area_start;
14         char *area_end;
15         char *rhead;
16         char *whead;
17 };
18
19 enum btr_buffer_flags {
20         /* changes the way the buffer is deallocated */
21         BTR_BF_BTR_POOL = 1,
22 };
23
24 struct btr_buffer {
25         char *buf;
26         size_t size;
27         /** The number of references to this buffer. */
28         int refcount;
29         struct btr_pool *pool;
30 };
31
32 struct btr_buffer_reference {
33         struct btr_buffer *btrb;
34         size_t consumed;
35         /* Each buffer reference belongs to the buffer queue list of some buffer tree node. */
36         struct list_head node;
37         size_t wrap_count;
38 };
39
40 struct btr_node {
41         char *name;
42         struct btr_node *parent;
43         /* The position of this btr node in the buffer tree. */
44         struct list_head node;
45         /* The children nodes of this btr node are linked together in a list. */
46         struct list_head children;
47         /* Time of first data transfer. */
48         struct timeval start;
49         /**
50          * The input queue is a list of references to btr buffers. Each item on
51          * the list represents an input buffer which has not been completely
52          * used by this btr node.
53          */
54         struct list_head input_queue;
55         btr_command_handler execute;
56         void *context;
57 };
58
59 struct btr_pool *btr_pool_new(const char *name, size_t area_size)
60 {
61         struct btr_pool *btrp;
62
63         PARA_INFO_LOG("%s, %zu bytes\n", name, area_size);
64         btrp = para_malloc(sizeof(*btrp));
65         btrp->area_start = para_malloc(area_size);
66         btrp->area_end = btrp->area_start + area_size;
67         btrp->rhead = btrp->area_start;
68         btrp->whead = btrp->area_start;
69         btrp->name = para_strdup(name);
70         return btrp;
71 }
72
73 /* whead = NULL means area full */
74
75 void btr_pool_free(struct btr_pool *btrp)
76 {
77         if (!btrp)
78                 return;
79         free(btrp->area_start);
80         free(btrp->name);
81         free(btrp);
82 }
83
84 size_t btr_pool_size(struct btr_pool *btrp)
85 {
86         return btrp->area_end - btrp->area_start;
87 }
88
89 size_t btr_pool_filled(struct btr_pool *btrp)
90 {
91         if (!btrp->whead)
92                 return btr_pool_size(btrp);
93         if (btrp->rhead <= btrp->whead)
94                 return  btrp->whead - btrp->rhead;
95         return btr_pool_size(btrp) - (btrp->rhead - btrp->whead);
96 }
97
98 size_t btr_pool_unused(struct btr_pool *btrp)
99 {
100         return btr_pool_size(btrp) - btr_pool_filled(btrp);
101 }
102
103 /*
104  * Return maximal size available for one read. This is
105  * smaller than the value returned by btr_pool_unused().
106  */
107 size_t btr_pool_available(struct btr_pool *btrp)
108 {
109         if (!btrp->whead)
110                 return 0;
111         if (btrp->rhead <= btrp->whead)
112                 return btrp->area_end - btrp->whead;
113         return btrp->rhead - btrp->whead;
114 }
115
116 size_t btr_pool_get_buffer(struct btr_pool *btrp, char **result)
117 {
118         if (result)
119                 *result = btrp->whead;
120         return btr_pool_available(btrp);
121 }
122
123 void btr_pool_allocate(struct btr_pool *btrp, size_t size)
124 {
125         char *end;
126
127         if (size == 0)
128                 return;
129         assert(size <= btr_pool_available(btrp));
130         end = btrp->whead + size;
131         assert(end <= btrp->area_end);
132
133         if (end == btrp->area_end) {
134                 PARA_DEBUG_LOG("%s: end of pool area reached\n", btrp->name);
135                 end = btrp->area_start;
136         }
137         if (end == btrp->rhead) {
138                 PARA_DEBUG_LOG("%s btrp buffer full\n", btrp->name);
139                 end = NULL; /* buffer full */
140         }
141         btrp->whead = end;
142 }
143
144 static void btr_pool_deallocate(struct btr_pool *btrp, size_t size)
145 {
146         char *end = btrp->rhead + size;
147
148         if (size == 0)
149                 return;
150         assert(end <= btrp->area_end);
151         assert(size <= btr_pool_filled(btrp));
152         if (end == btrp->area_end)
153                 end = btrp->area_start;
154         if (!btrp->whead)
155                 btrp->whead = btrp->rhead;
156         btrp->rhead = end;
157         if (btrp->rhead == btrp->whead)
158                 btrp->rhead = btrp->whead = btrp->area_start;
159 }
160
161 #define FOR_EACH_CHILD(_tn, _btrn) list_for_each_entry((_tn), \
162         &((_btrn)->children), node)
163 #define FOR_EACH_CHILD_SAFE(_tn, _tmp, _btrn) \
164         list_for_each_entry_safe((_tn), (_tmp), &((_btrn)->children), node)
165
166 #define FOR_EACH_BUFFER_REF(_br, _btrn) \
167         list_for_each_entry((_br), &(_btrn)->input_queue, node)
168 #define FOR_EACH_BUFFER_REF_SAFE(_br, _tmp, _btrn) \
169         list_for_each_entry_safe((_br), (_tmp), &(_btrn)->input_queue, node)
170
171 /*
172         (parent, child):
173         (NULL, NULL): new, isolated node.
174         (NULL, c): new node becomes root, c must be old root
175         (p, NULL): new leaf node
176         (p, c): new internal node, ch must be child of p
177
178 */
179 struct btr_node *btr_new_node(struct btr_node_description *bnd)
180 {
181         struct btr_node *btrn = para_malloc(sizeof(*btrn));
182
183         btrn->name = para_strdup(bnd->name);
184         btrn->parent = bnd->parent;
185         btrn->execute = bnd->handler;
186         btrn->context = bnd->context;
187         btrn->start.tv_sec = 0;
188         btrn->start.tv_usec = 0;
189         if (bnd->parent)
190                 list_add_tail(&btrn->node, &bnd->parent->children);
191         INIT_LIST_HEAD(&btrn->children);
192         INIT_LIST_HEAD(&btrn->input_queue);
193         if (bnd->parent)
194                 PARA_INFO_LOG("added %s as child of %s\n", bnd->name, bnd->parent->name);
195         else
196                 PARA_INFO_LOG("added %s as btr root\n", bnd->name);
197         return btrn;
198 }
199
200 /*
201  * Allocate a new btr buffer.
202  *
203  * The freshly allocated buffer will have a zero refcount and will
204  * not be associated with a btr pool.
205  */
206 static struct btr_buffer *new_btrb(char *buf, size_t size)
207 {
208         struct btr_buffer *btrb = para_calloc(sizeof(*btrb));
209
210         btrb->buf = buf;
211         btrb->size = size;
212         return btrb;
213 }
214
215 static void dealloc_buffer(struct btr_buffer *btrb)
216 {
217         if (btrb->pool)
218                 btr_pool_deallocate(btrb->pool, btrb->size);
219         else
220                 free(btrb->buf);
221 }
222
223 static struct btr_buffer_reference *get_first_input_br(struct btr_node *btrn)
224 {
225         if (list_empty(&btrn->input_queue))
226                 return NULL;
227         return list_first_entry(&btrn->input_queue,
228                 struct btr_buffer_reference, node);
229 }
230
231 /*
232  * Deallocate the reference, release the resources if refcount drops to zero.
233  */
234 static void btr_drop_buffer_reference(struct btr_buffer_reference *br)
235 {
236         struct btr_buffer *btrb = br->btrb;
237
238         list_del(&br->node);
239         free(br);
240         btrb->refcount--;
241         if (btrb->refcount == 0) {
242                 dealloc_buffer(btrb);
243                 free(btrb);
244         }
245 }
246
247 static void add_btrb_to_children(struct btr_buffer *btrb,
248                 struct btr_node *btrn, size_t consumed)
249 {
250         struct btr_node *ch;
251
252         if (btrn->start.tv_sec == 0)
253                 btrn->start = *now;
254         FOR_EACH_CHILD(ch, btrn) {
255                 struct btr_buffer_reference *br = para_calloc(sizeof(*br));
256                 br->btrb = btrb;
257                 br->consumed = consumed;
258                 list_add_tail(&br->node, &ch->input_queue);
259                 btrb->refcount++;
260                 if (ch->start.tv_sec == 0)
261                         ch->start = *now;
262         }
263 }
264
265 void btr_add_output(char *buf, size_t size, struct btr_node *btrn)
266 {
267         struct btr_buffer *btrb;
268
269         assert(size != 0);
270         if (list_empty(&btrn->children)) {
271                 free(buf);
272                 return;
273         }
274         btrb = new_btrb(buf, size);
275         add_btrb_to_children(btrb, btrn, 0);
276 }
277
278 void btr_add_output_pool(struct btr_pool *btrp, size_t size,
279                 struct btr_node *btrn)
280 {
281         struct btr_buffer *btrb;
282         char *buf;
283         size_t avail;
284
285         assert(size != 0);
286         if (list_empty(&btrn->children))
287                 return;
288         avail = btr_pool_get_buffer(btrp, &buf);
289         assert(avail >= size);
290         btr_pool_allocate(btrp, size);
291         btrb = new_btrb(buf, size);
292         btrb->pool = btrp;
293         add_btrb_to_children(btrb, btrn, 0);
294 }
295
296 void btr_copy(const void *src, size_t n, struct btr_pool *btrp,
297         struct btr_node *btrn)
298 {
299         char *buf;
300         size_t sz, copy;
301
302         if (n == 0)
303                 return;
304         assert(n <= btr_pool_unused(btrp));
305         sz = btr_pool_get_buffer(btrp, &buf);
306         copy = PARA_MIN(sz, n);
307         memcpy(buf, src, copy);
308         btr_add_output_pool(btrp, copy, btrn);
309         if (copy == n)
310                 return;
311         sz = btr_pool_get_buffer(btrp, &buf);
312         assert(sz >= n - copy);
313         memcpy(buf, src + copy, n - copy);
314         btr_add_output_pool(btrp, n - copy, btrn);
315 }
316
317 static void btr_pushdown_br(struct btr_buffer_reference *br, struct btr_node *btrn)
318 {
319         add_btrb_to_children(br->btrb, btrn, br->consumed);
320         btr_drop_buffer_reference(br);
321 }
322
323 void btr_pushdown(struct btr_node *btrn)
324 {
325         struct btr_buffer_reference *br, *tmp;
326
327         FOR_EACH_BUFFER_REF_SAFE(br, tmp, btrn)
328                 btr_pushdown_br(br, btrn);
329 }
330
331 int btr_pushdown_one(struct btr_node *btrn)
332 {
333         struct btr_buffer_reference *br;
334
335         if (list_empty(&btrn->input_queue))
336                 return 0;
337         br = list_first_entry(&btrn->input_queue, struct btr_buffer_reference, node);
338         btr_pushdown_br(br, btrn);
339         return 1;
340 }
341
342 /* Return true if this node has no children. */
343 bool btr_no_children(struct btr_node *btrn)
344 {
345         return list_empty(&btrn->children);
346 }
347
348 bool btr_no_parent(struct btr_node *btrn)
349 {
350         return !btrn->parent;
351 }
352
353 bool btr_inplace_ok(struct btr_node *btrn)
354 {
355         if (!btrn->parent)
356                 return true;
357         return list_is_singular(&btrn->parent->children);
358 }
359
360 static inline size_t br_available_bytes(struct btr_buffer_reference *br)
361 {
362         return br->btrb->size - br->consumed;
363 }
364
365 size_t btr_get_buffer_by_reference(struct btr_buffer_reference *br, char **buf)
366 {
367         if (buf)
368                 *buf = br->btrb->buf + br->consumed;
369         return br_available_bytes(br);
370 }
371
372 /**
373  * \return zero if the input buffer queue is empty.
374  */
375 size_t btr_next_buffer(struct btr_node *btrn, char **bufp)
376 {
377         struct btr_buffer_reference *br;
378         char *buf, *result = NULL;
379         size_t sz, rv = 0;
380
381         FOR_EACH_BUFFER_REF(br, btrn) {
382                 sz = btr_get_buffer_by_reference(br, &buf);
383                 if (!result) {
384                         result = buf;
385                         rv = sz;
386                         if (!br->btrb->pool)
387                                 break;
388                         continue;
389                 }
390                 if (!br->btrb->pool)
391                         break;
392                 if (result + rv != buf)
393                         break;
394                 rv += sz;
395         }
396         if (bufp)
397                 *bufp = result;
398         return rv;
399 }
400
401 void btr_consume(struct btr_node *btrn, size_t numbytes)
402 {
403         struct btr_buffer_reference *br, *tmp;
404         size_t sz;
405
406         if (numbytes == 0)
407                 return;
408         br = get_first_input_br(btrn);
409         assert(br);
410
411         if (br->wrap_count == 0) {
412                 /*
413                  * No wrap buffer. Drop buffer references whose buffer
414                  * has been fully used. */
415                 FOR_EACH_BUFFER_REF_SAFE(br, tmp, btrn) {
416                         if (br->consumed + numbytes <= br->btrb->size) {
417                                 br->consumed += numbytes;
418                                 if (br->consumed == br->btrb->size)
419                                         btr_drop_buffer_reference(br);
420                                 return;
421                         }
422                         numbytes -= br->btrb->size - br->consumed;
423                         btr_drop_buffer_reference(br);
424                 }
425                 assert(true);
426         }
427         /*
428
429         We have a wrap buffer, consume from it. If in total,
430         i.e. including previous calls to brt_consume(), less than
431         wrap_count has been consumed, there's nothing more we can do.
432
433         Otherwise we drop the wrap buffer and consume from subsequent
434         buffers of the input queue the correct amount of bytes. This
435         is the total number of bytes that have been consumed from the
436         wrap buffer.
437 */
438         PARA_DEBUG_LOG("consuming %zu/%zu bytes from wrap buffer\n", numbytes,
439                 br_available_bytes(br));
440
441         assert(numbytes <= br_available_bytes(br));
442         if (br->consumed + numbytes < br->wrap_count) {
443                 br->consumed += numbytes;
444                 return;
445         }
446         PARA_DEBUG_LOG("dropping wrap buffer (%zu bytes)\n", br->btrb->size);
447         /* get rid of the wrap buffer */
448         sz = br->consumed + numbytes;
449         btr_drop_buffer_reference(br);
450         return btr_consume(btrn, sz);
451 }
452
453 static void flush_input_queue(struct btr_node *btrn)
454 {
455         struct btr_buffer_reference *br, *tmp;
456         FOR_EACH_BUFFER_REF_SAFE(br, tmp, btrn)
457                 btr_drop_buffer_reference(br);
458 }
459
460 void btr_free_node(struct btr_node *btrn)
461 {
462         if (!btrn)
463                 return;
464         free(btrn->name);
465         free(btrn);
466 }
467
468 void btr_remove_node(struct btr_node *btrn)
469 {
470         struct btr_node *ch;
471
472         if (!btrn)
473                 return;
474         PARA_NOTICE_LOG("removing btr node %s from buffer tree\n", btrn->name);
475         FOR_EACH_CHILD(ch, btrn)
476                 ch->parent = NULL;
477         flush_input_queue(btrn);
478         if (btrn->parent)
479                 list_del(&btrn->node);
480 }
481
482 size_t btr_get_input_queue_size(struct btr_node *btrn)
483 {
484         struct btr_buffer_reference *br;
485         size_t size = 0, wrap_consumed = 0;
486
487         FOR_EACH_BUFFER_REF(br, btrn) {
488                 if (br->wrap_count != 0) {
489                         wrap_consumed = br->consumed;
490                         continue;
491                 }
492                 size += br_available_bytes(br);
493         }
494         assert(wrap_consumed <= size);
495         size -= wrap_consumed;
496         return size;
497 }
498
499 void btr_splice_out_node(struct btr_node *btrn)
500 {
501         struct btr_node *ch, *tmp;
502
503         assert(btrn);
504         PARA_NOTICE_LOG("splicing out %s\n", btrn->name);
505         btr_pushdown(btrn);
506         if (btrn->parent)
507                 list_del(&btrn->node);
508         FOR_EACH_CHILD_SAFE(ch, tmp, btrn) {
509                 PARA_INFO_LOG("parent(%s): %s\n", ch->name,
510                         btrn->parent? btrn->parent->name : "NULL");
511                 ch->parent = btrn->parent;
512                 if (btrn->parent)
513                         list_move(&ch->node, &btrn->parent->children);
514         }
515         assert(list_empty(&btrn->children));
516 }
517
518 /**
519  * Return the size of the largest input queue.
520  *
521  * Iterates over all children of the given node.
522  */
523 size_t btr_bytes_pending(struct btr_node *btrn)
524 {
525         size_t max_size = 0;
526         struct btr_node *ch;
527
528         FOR_EACH_CHILD(ch, btrn) {
529                 size_t size = btr_get_input_queue_size(ch);
530                 max_size = PARA_MAX(max_size, size);
531         }
532         return max_size;
533 }
534
535 int btr_exec(struct btr_node *btrn, const char *command, char **value_result)
536 {
537         if (!btrn)
538                 return -ERRNO_TO_PARA_ERROR(EINVAL);
539         if (!btrn->execute)
540                 return -ERRNO_TO_PARA_ERROR(ENOTSUP);
541         return btrn->execute(btrn, command, value_result);
542 }
543
544 int btr_exec_up(struct btr_node *btrn, const char *command, char **value_result)
545 {
546         int ret;
547
548         for (; btrn; btrn = btrn->parent) {
549                 struct btr_node *parent = btrn->parent;
550                 if (!parent)
551                         return -ERRNO_TO_PARA_ERROR(ENOTSUP);
552                 if (!parent->execute)
553                         continue;
554                 PARA_INFO_LOG("parent: %s, cmd: %s\n", parent->name, command);
555                 ret = parent->execute(parent, command, value_result);
556                 if (ret == -ERRNO_TO_PARA_ERROR(ENOTSUP))
557                         continue;
558                 if (ret < 0)
559                         return ret;
560                 if (value_result && *value_result)
561                         PARA_NOTICE_LOG("%s(%s): %s\n", command, parent->name,
562                                 *value_result);
563                 return 1;
564         }
565         return -ERRNO_TO_PARA_ERROR(ENOTSUP);
566 }
567
568 void *btr_context(struct btr_node *btrn)
569 {
570         return btrn->context;
571 }
572
573 static bool need_buffer_pool_merge(struct btr_node *btrn)
574 {
575         struct btr_buffer_reference *br = get_first_input_br(btrn);
576
577         if (!br)
578                 return false;
579         if (br->wrap_count != 0)
580                 return true;
581         if (br->btrb->pool)
582                 return true;
583         return false;
584 }
585
586 static void merge_input_pool(struct btr_node *btrn, size_t dest_size)
587 {
588         struct btr_buffer_reference *br, *wbr = NULL;
589         int num_refs; /* including wrap buffer */
590         char *buf, *buf1 = NULL, *buf2 = NULL;
591         size_t sz, sz1 = 0, sz2 = 0, wsz;
592
593         br = get_first_input_br(btrn);
594         if (!br || br_available_bytes(br) >= dest_size)
595                 return;
596         num_refs = 0;
597         FOR_EACH_BUFFER_REF(br, btrn) {
598                 num_refs++;
599                 sz = btr_get_buffer_by_reference(br, &buf);
600                 if (sz == 0)
601                         break;
602                 if (br->wrap_count != 0) {
603                         assert(!wbr);
604                         assert(num_refs == 1);
605                         wbr = br;
606                         if (sz >= dest_size)
607                                 return;
608                         continue;
609                 }
610                 if (!buf1) {
611                         buf1 = buf;
612                         sz1 = sz;
613                         goto next;
614                 }
615                 if (buf1 + sz1 == buf) {
616                         sz1 += sz;
617                         goto next;
618                 }
619                 if (!buf2) {
620                         buf2 = buf;
621                         sz2 = sz;
622                         goto next;
623                 }
624                 assert(buf2 + sz2 == buf);
625                 sz2 += sz;
626 next:
627                 if (sz1 + sz2 >= dest_size)
628                         break;
629         }
630         if (!buf2) /* nothing to do */
631                 return;
632         assert(buf1 && sz2 > 0);
633         /*
634          * If the second buffer is large, we only take the first part of it to
635          * avoid having to memcpy() huge buffers.
636          */
637         sz2 = PARA_MIN(sz2, (size_t)(64 * 1024));
638         if (!wbr) {
639                 /* Make a new wrap buffer combining buf1 and buf2. */
640                 sz = sz1 + sz2;
641                 buf = para_malloc(sz);
642                 PARA_DEBUG_LOG("merging input buffers: (%p:%zu, %p:%zu) -> %p:%zu\n",
643                         buf1, sz1, buf2, sz2, buf, sz);
644                 memcpy(buf, buf1, sz1);
645                 memcpy(buf + sz1, buf2, sz2);
646                 br = para_calloc(sizeof(*br));
647                 br->btrb = new_btrb(buf, sz);
648                 br->btrb->refcount = 1;
649                 br->consumed = 0;
650                 /* This is a wrap buffer */
651                 br->wrap_count = sz1;
652                 para_list_add(&br->node, &btrn->input_queue);
653                 return;
654         }
655         PARA_DEBUG_LOG("increasing wrap buffer, sz1: %zu, sz2: %zu\n", sz1, sz2);
656         /*
657          * We already have a wrap buffer, but it is too small. It might be
658          * partially used.
659          */
660         wsz = br_available_bytes(wbr);
661         if (wbr->wrap_count == sz1 && wbr->btrb->size >= sz1 + sz2) /* nothing we can do about it */
662                 return;
663         sz = sz1 + sz2 - wbr->btrb->size; /* amount of new data */
664         wbr->btrb->size += sz;
665         wbr->btrb->buf = para_realloc(wbr->btrb->buf, wbr->btrb->size);
666         /* copy the new data to the end of the reallocated buffer */
667         assert(sz2 >= sz);
668         memcpy(wbr->btrb->buf + wbr->btrb->size - sz, buf2 + sz2 - sz, sz);
669 }
670
671 /**
672  * Merge the first two input buffers into one.
673  *
674  * This is a quite expensive operation.
675  *
676  * \return The number of buffers that have been available (zero, one or two).
677  */
678 static int merge_input(struct btr_node *btrn)
679 {
680         struct btr_buffer_reference *brs[2], *br;
681         char *bufs[2], *buf;
682         size_t szs[2], sz;
683         int i;
684
685         if (list_empty(&btrn->input_queue))
686                 return 0;
687         if (list_is_singular(&btrn->input_queue))
688                 return 1;
689         i = 0;
690         /* get references to the first two buffers */
691         FOR_EACH_BUFFER_REF(br, btrn) {
692                 brs[i] = br;
693                 szs[i] = btr_get_buffer_by_reference(brs[i], bufs + i);
694                 i++;
695                 if (i == 2)
696                         break;
697         }
698         /* make a new btrb that combines the two buffers and a br to it. */
699         sz = szs[0] + szs[1];
700         buf = para_malloc(sz);
701         PARA_DEBUG_LOG("%s: memory merging input buffers: (%zu, %zu) -> %zu\n",
702                 btrn->name, szs[0], szs[1], sz);
703         memcpy(buf, bufs[0], szs[0]);
704         memcpy(buf + szs[0], bufs[1], szs[1]);
705
706         br = para_calloc(sizeof(*br));
707         br->btrb = new_btrb(buf, sz);
708         br->btrb->refcount = 1;
709
710         /* replace the first two refs by the new one */
711         btr_drop_buffer_reference(brs[0]);
712         btr_drop_buffer_reference(brs[1]);
713         para_list_add(&br->node, &btrn->input_queue);
714         return 2;
715 }
716
717 void btr_merge(struct btr_node *btrn, size_t dest_size)
718 {
719         if (need_buffer_pool_merge(btrn))
720                 return merge_input_pool(btrn, dest_size);
721         for (;;) {
722                 char *buf;
723                 size_t len = btr_next_buffer(btrn, &buf);
724                 if (len >= dest_size)
725                         return;
726                 PARA_DEBUG_LOG("input size = %zu < %zu = dest\n", len, dest_size);
727                 if (merge_input(btrn) < 2)
728                         return;
729         }
730 }
731
732 bool btr_eof(struct btr_node *btrn)
733 {
734         char *buf;
735         size_t len = btr_next_buffer(btrn, &buf);
736
737         return (len == 0 && btr_no_parent(btrn));
738 }
739
740 void log_tree_recursively(struct btr_node *btrn, int loglevel, int depth)
741 {
742         struct btr_node *ch;
743         const char spaces[] = "                 ", *space = spaces + 16 - depth;
744
745         if (depth > 16)
746                 return;
747         para_log(loglevel, "%s%s\n", space, btrn->name);
748         FOR_EACH_CHILD(ch, btrn)
749                 log_tree_recursively(ch, loglevel, depth + 1);
750 }
751
752 void btr_log_tree(struct btr_node *btrn, int loglevel)
753 {
754         return log_tree_recursively(btrn, loglevel, 0);
755 }
756
757 /*
758  * \return \a root if \a name is \p NULL.
759  */
760 struct btr_node *btr_search_node(const char *name, struct btr_node *root)
761 {
762         struct btr_node *ch;
763
764         if (!name)
765                 return root;
766         if (!strcmp(root->name, name))
767                 return root;
768         FOR_EACH_CHILD(ch, root) {
769                 struct btr_node *result = btr_search_node(name, ch);
770                 if (result)
771                         return result;
772         }
773         return NULL;
774 }
775
776 /** 640K ought to be enough for everybody ;) */
777 #define BTRN_MAX_PENDING (640 * 1024)
778
779 int btr_node_status(struct btr_node *btrn, size_t min_iqs,
780         enum btr_node_type type)
781 {
782         size_t iqs;
783
784         assert(btrn);
785         if (type != BTR_NT_LEAF) {
786                 if (btr_no_children(btrn))
787                         return -E_BTR_NO_CHILD;
788                 if (btr_bytes_pending(btrn) > BTRN_MAX_PENDING)
789                         return 0;
790         }
791         if (type != BTR_NT_ROOT) {
792                 if (btr_eof(btrn))
793                         return -E_BTR_EOF;
794                 iqs = btr_get_input_queue_size(btrn);
795                 if (iqs == 0) /* we have a parent, because not eof */
796                         return 0;
797                 if (iqs < min_iqs && !btr_no_parent(btrn))
798                         return 0;
799         }
800         return 1;
801 }
802
803 void btr_get_node_start(struct btr_node *btrn, struct timeval *tv)
804 {
805         *tv = btrn->start;
806 }