]> git.tuebingen.mpg.de Git - paraslash.git/blob - buffer_tree.c
Add btr pool support to the fecdec filter.
[paraslash.git] / buffer_tree.c
1 #include <regex.h>
2 #include <stdbool.h>
3
4 #include "para.h"
5 #include "list.h"
6 #include "string.h"
7 #include "buffer_tree.h"
8 #include "error.h"
9 #include "sched.h"
10
11 struct btr_pool {
12         char *name;
13         char *area_start;
14         char *area_end;
15         char *rhead;
16         char *whead;
17 };
18
19 enum btr_buffer_flags {
20         /* changes the way the buffer is deallocated */
21         BTR_BF_BTR_POOL = 1,
22 };
23
24 struct btr_buffer {
25         char *buf;
26         size_t size;
27         /** The number of references to this buffer. */
28         int refcount;
29         struct btr_pool *pool;
30 };
31
32 struct btr_buffer_reference {
33         struct btr_buffer *btrb;
34         size_t consumed;
35         /* Each buffer reference belongs to the buffer queue list of some buffer tree node. */
36         struct list_head node;
37         size_t wrap_count;
38 };
39
40 struct btr_node {
41         char *name;
42         struct btr_node *parent;
43         /* The position of this btr node in the buffer tree. */
44         struct list_head node;
45         /* The children nodes of this btr node are linked together in a list. */
46         struct list_head children;
47         /* Time of first data transfer. */
48         struct timeval start;
49         /**
50          * The input queue is a list of references to btr buffers. Each item on
51          * the list represents an input buffer which has not been completely
52          * used by this btr node.
53          */
54         struct list_head input_queue;
55         btr_command_handler execute;
56         void *context;
57 };
58
59 struct btr_pool *btr_pool_new(const char *name, size_t area_size)
60 {
61         struct btr_pool *btrp;
62
63         PARA_INFO_LOG("%s, %zu bytes\n", name, area_size);
64         btrp = para_malloc(sizeof(*btrp));
65         btrp->area_start = para_malloc(area_size);
66         btrp->area_end = btrp->area_start + area_size;
67         btrp->rhead = btrp->area_start;
68         btrp->whead = btrp->area_start;
69         btrp->name = para_strdup(name);
70         return btrp;
71 }
72
73 /* whead = NULL means area full */
74
75 void btr_pool_free(struct btr_pool *btrp)
76 {
77         if (!btrp)
78                 return;
79         free(btrp->area_start);
80         free(btrp->name);
81         free(btrp);
82 }
83
84 size_t btr_pool_size(struct btr_pool *btrp)
85 {
86         return btrp->area_end - btrp->area_start;
87 }
88
89 size_t btr_pool_filled(struct btr_pool *btrp)
90 {
91         if (!btrp->whead)
92                 return btr_pool_size(btrp);
93         if (btrp->rhead <= btrp->whead)
94                 return  btrp->whead - btrp->rhead;
95         return btr_pool_size(btrp) - (btrp->rhead - btrp->whead);
96 }
97
98 size_t btr_pool_unused(struct btr_pool *btrp)
99 {
100         return btr_pool_size(btrp) - btr_pool_filled(btrp);
101 }
102
103 size_t btr_pool_available(struct btr_pool *btrp)
104 {
105         if (!btrp->whead)
106                 return 0;
107         if (btrp->rhead <= btrp->whead)
108                 return btrp->area_end - btrp->whead;
109         return btrp->rhead - btrp->whead;
110 }
111
112 size_t btr_pool_get_buffer(struct btr_pool *btrp, char **result)
113 {
114         if (result)
115                 *result = btrp->whead;
116         return btr_pool_available(btrp);
117 }
118
119 void btr_pool_allocate(struct btr_pool *btrp, size_t size)
120 {
121         char *end;
122
123         if (size == 0)
124                 return;
125         //PARA_CRIT_LOG("filled: %zu, alloc %zu\n", btr_pool_filled(btrp), size);
126         assert(size <= btr_pool_available(btrp));
127         end = btrp->whead + size;
128         assert(end <= btrp->area_end);
129
130         if (end == btrp->area_end) {
131                 PARA_DEBUG_LOG("end of pool area reached: %p\n", end);
132                 end = btrp->area_start;
133         }
134         if (end == btrp->rhead) {
135                 PARA_DEBUG_LOG("btrp buffer full\n");
136                 end = NULL; /* buffer full */
137         }
138         btrp->whead = end;
139         //PARA_CRIT_LOG("filled: %zu\n", btr_pool_filled(btrp));
140 }
141
142 static void btr_pool_deallocate(struct btr_pool *btrp, size_t size)
143 {
144         char *end = btrp->rhead + size;
145
146         //PARA_CRIT_LOG("filled: %zu, dealloc %zu\n", btr_pool_filled(btrp), size);
147         if (size == 0)
148                 return;
149         assert(end <= btrp->area_end);
150         assert(size <= btr_pool_filled(btrp));
151         if (end == btrp->area_end)
152                 end = btrp->area_start;
153         if (!btrp->whead)
154                 btrp->whead = btrp->rhead;
155         btrp->rhead = end;
156         if (btrp->rhead == btrp->whead)
157                 btrp->rhead = btrp->whead = btrp->area_start;
158         //PARA_CRIT_LOG("filled: %zu\n", btr_pool_filled(btrp));
159 }
160
161 #define FOR_EACH_CHILD(_tn, _btrn) list_for_each_entry((_tn), \
162         &((_btrn)->children), node)
163 #define FOR_EACH_CHILD_SAFE(_tn, _tmp, _btrn) \
164         list_for_each_entry_safe((_tn), (_tmp), &((_btrn)->children), node)
165
166 #define FOR_EACH_BUFFER_REF(_br, _btrn) \
167         list_for_each_entry((_br), &(_btrn)->input_queue, node)
168 #define FOR_EACH_BUFFER_REF_SAFE(_br, _tmp, _btrn) \
169         list_for_each_entry_safe((_br), (_tmp), &(_btrn)->input_queue, node)
170
171 struct btr_node *btr_new_node(const char *name, struct btr_node *parent,
172                 btr_command_handler handler, void *context)
173 {
174         struct btr_node *btrn = para_malloc(sizeof(*btrn));
175
176         btrn->name = para_strdup(name);
177         btrn->parent = parent;
178         btrn->execute = handler;
179         btrn->context = context;
180         btrn->start.tv_sec = 0;
181         btrn->start.tv_usec = 0;
182         if (parent)
183                 list_add_tail(&btrn->node, &parent->children);
184         INIT_LIST_HEAD(&btrn->children);
185         INIT_LIST_HEAD(&btrn->input_queue);
186         if (parent)
187                 PARA_INFO_LOG("added %s as child of %s\n", name, parent->name);
188         else
189                 PARA_INFO_LOG("added %s as btr root\n", name);
190         return btrn;
191 }
192
193 /*
194  * Allocate a new btr buffer.
195  *
196  * The freshly allocated buffer will have a zero refcount.
197  */
198 static struct btr_buffer *new_btrb(char *buf, size_t size)
199 {
200         struct btr_buffer *btrb = para_calloc(sizeof(*btrb));
201
202         btrb->buf = buf;
203         btrb->size = size;
204         return btrb;
205 }
206
207 static void dealloc_buffer(struct btr_buffer *btrb)
208 {
209         if (btrb->pool)
210                 btr_pool_deallocate(btrb->pool, btrb->size);
211         else
212                 free(btrb->buf);
213 }
214
215 static struct btr_buffer_reference *get_first_input_br(struct btr_node *btrn)
216 {
217         if (list_empty(&btrn->input_queue))
218                 return NULL;
219         return list_first_entry(&btrn->input_queue,
220                 struct btr_buffer_reference, node);
221 }
222
223 /*
224  * Deallocate the reference, release the resources if refcount drops to zero.
225  */
226 static void btr_drop_buffer_reference(struct btr_buffer_reference *br)
227 {
228         struct btr_buffer *btrb = br->btrb;
229
230         //PARA_CRIT_LOG("dropping buffer reference %p\n", br);
231         list_del(&br->node);
232         free(br);
233         btrb->refcount--;
234         if (btrb->refcount == 0) {
235                 dealloc_buffer(btrb);
236                 free(btrb);
237         }
238 }
239
240 static void add_btrb_to_children(struct btr_buffer *btrb,
241                 struct btr_node *btrn, size_t consumed)
242 {
243         struct btr_node *ch;
244
245         if (btrn->start.tv_sec == 0)
246                 btrn->start = *now;
247         FOR_EACH_CHILD(ch, btrn) {
248                 struct btr_buffer_reference *br = para_calloc(sizeof(*br));
249                 br->btrb = btrb;
250                 br->consumed = consumed;
251                 list_add_tail(&br->node, &ch->input_queue);
252                 btrb->refcount++;
253                 if (ch->start.tv_sec == 0)
254                         ch->start = *now;
255         }
256 }
257
258 void btr_add_output(char *buf, size_t size, struct btr_node *btrn)
259 {
260         struct btr_buffer *btrb;
261
262         assert(size != 0);
263         if (list_empty(&btrn->children)) {
264                 free(buf);
265                 return;
266         }
267         btrb = new_btrb(buf, size);
268         add_btrb_to_children(btrb, btrn, 0);
269 }
270
271 void btr_add_output_pool(struct btr_pool *btrp, size_t size,
272                 struct btr_node *btrn)
273 {
274         struct btr_buffer *btrb;
275         char *buf;
276         size_t avail;
277
278         assert(size != 0);
279         if (list_empty(&btrn->children))
280                 return;
281         avail = btr_pool_get_buffer(btrp, &buf);
282         assert(avail >= size);
283         btr_pool_allocate(btrp, size);
284         btrb = new_btrb(buf, size);
285         btrb->pool = btrp;
286         add_btrb_to_children(btrb, btrn, 0);
287 }
288
289 void btr_copy(const void *src, size_t n, struct btr_pool *btrp,
290         struct btr_node *btrn)
291 {
292         char *buf;
293         size_t sz, copy;
294
295         if (n == 0)
296                 return;
297         assert(n <= btr_pool_unused(btrp));
298         sz = btr_pool_get_buffer(btrp, &buf);
299         copy = PARA_MIN(sz, n);
300         memcpy(buf, src, copy);
301         btr_add_output_pool(btrp, copy, btrn);
302         if (copy == n)
303                 return;
304         sz = btr_pool_get_buffer(btrp, &buf);
305         assert(sz >= n - copy);
306         memcpy(buf, src + copy, n - copy);
307         btr_add_output_pool(btrp, n - copy, btrn);
308 }
309
310 static void btr_pushdown_br(struct btr_buffer_reference *br, struct btr_node *btrn)
311 {
312         add_btrb_to_children(br->btrb, btrn, br->consumed);
313         btr_drop_buffer_reference(br);
314 }
315
316 void btr_pushdown(struct btr_node *btrn)
317 {
318         struct btr_buffer_reference *br, *tmp;
319
320         FOR_EACH_BUFFER_REF_SAFE(br, tmp, btrn)
321                 btr_pushdown_br(br, btrn);
322 }
323
324 int btr_pushdown_one(struct btr_node *btrn)
325 {
326         struct btr_buffer_reference *br;
327
328         if (list_empty(&btrn->input_queue))
329                 return 0;
330         br = list_first_entry(&btrn->input_queue, struct btr_buffer_reference, node);
331         btr_pushdown_br(br, btrn);
332         return 1;
333 }
334
335 /* Return true if this node has no children. */
336 bool btr_no_children(struct btr_node *btrn)
337 {
338         return list_empty(&btrn->children);
339 }
340
341 bool btr_no_parent(struct btr_node *btrn)
342 {
343         return !btrn->parent;
344 }
345
346 bool btr_inplace_ok(struct btr_node *btrn)
347 {
348         if (!btrn->parent)
349                 return true;
350         return list_is_singular(&btrn->parent->children);
351 }
352
353 static inline size_t br_available_bytes(struct btr_buffer_reference *br)
354 {
355         return br->btrb->size - br->consumed;
356 }
357
358 size_t btr_get_buffer_by_reference(struct btr_buffer_reference *br, char **buf)
359 {
360         if (buf)
361                 *buf = br->btrb->buf + br->consumed;
362         return br_available_bytes(br);
363 }
364
365 /**
366  * \return zero if the input buffer queue is empty.
367  */
368 size_t btr_next_buffer(struct btr_node *btrn, char **bufp)
369 {
370         struct btr_buffer_reference *br;
371         char *buf, *result = NULL;
372         size_t sz, rv = 0;
373
374         FOR_EACH_BUFFER_REF(br, btrn) {
375                 sz = btr_get_buffer_by_reference(br, &buf);
376                 if (!result) {
377                         result = buf;
378                         rv = sz;
379                         if (!br->btrb->pool)
380                                 break;
381                         continue;
382                 }
383                 if (!br->btrb->pool)
384                         break;
385                 if (result + rv != buf) {
386                         PARA_DEBUG_LOG("%s: pool merge impossible: %p != %p\n",
387                                 btrn->name, result + rv, buf);
388                         break;
389                 }
390 //              PARA_CRIT_LOG("%s: inplace merge (%zu, %zu)->%zu\n", btrn->name,
391 //                      rv, sz, rv + sz);
392 //              PARA_CRIT_LOG("%s: inplace merge %p (%zu)\n", btrn->name,
393 //                      result, sz);
394                 rv += sz;
395         }
396         if (bufp)
397                 *bufp = result;
398         return rv;
399 }
400
401 void btr_consume(struct btr_node *btrn, size_t numbytes)
402 {
403         struct btr_buffer_reference *br, *tmp;
404         size_t sz;
405
406         if (numbytes == 0)
407                 return;
408         br = get_first_input_br(btrn);
409         assert(br);
410
411         //PARA_CRIT_LOG("wrap count: %zu\n", br->wrap_count);
412         if (br->wrap_count == 0) {
413                 /*
414                  * No wrap buffer. Drop buffer references whose buffer
415                  * has been fully used. */
416                 FOR_EACH_BUFFER_REF_SAFE(br, tmp, btrn) {
417                         if (br->consumed + numbytes <= br->btrb->size) {
418                                 br->consumed += numbytes;
419                                 if (br->consumed == br->btrb->size)
420                                         btr_drop_buffer_reference(br);
421                                 return;
422                         }
423                         numbytes -= br->btrb->size - br->consumed;
424                         btr_drop_buffer_reference(br);
425                 }
426                 assert(true);
427         }
428         /*
429
430         We have a wrap buffer, consume from it. If in total,
431         i.e. including previous calls to brt_consume(), less than
432         wrap_count has been consumed, there's nothing more we can do.
433
434         Otherwise we drop the wrap buffer and consume from subsequent
435         buffers of the input queue the correct amount of bytes. This
436         is the total number of bytes that have been consumed from the
437         wrap buffer.
438 */
439         PARA_DEBUG_LOG("consuming %zu/%zu bytes from wrap buffer\n", numbytes,
440                 br_available_bytes(br));
441
442         assert(numbytes <= br_available_bytes(br));
443         if (br->consumed + numbytes < br->wrap_count) {
444                 br->consumed += numbytes;
445                 return;
446         }
447         PARA_DEBUG_LOG("dropping wrap buffer (%zu bytes)\n", br->btrb->size);
448         /* get rid of the wrap buffer */
449         sz = br->consumed + numbytes;
450         btr_drop_buffer_reference(br);
451         return btr_consume(btrn, sz);
452 }
453
454 static void flush_input_queue(struct btr_node *btrn)
455 {
456         struct btr_buffer_reference *br, *tmp;
457         FOR_EACH_BUFFER_REF_SAFE(br, tmp, btrn)
458                 btr_drop_buffer_reference(br);
459 }
460
461 void btr_free_node(struct btr_node *btrn)
462 {
463         if (!btrn)
464                 return;
465         free(btrn->name);
466         free(btrn);
467 }
468
469 void btr_remove_node(struct btr_node *btrn)
470 {
471         struct btr_node *ch;
472
473         if (!btrn)
474                 return;
475         PARA_NOTICE_LOG("removing btr node %s from buffer tree\n", btrn->name);
476         FOR_EACH_CHILD(ch, btrn)
477                 ch->parent = NULL;
478         flush_input_queue(btrn);
479         if (btrn->parent)
480                 list_del(&btrn->node);
481 }
482
483 size_t btr_get_input_queue_size(struct btr_node *btrn)
484 {
485         struct btr_buffer_reference *br;
486         size_t size = 0;
487
488         FOR_EACH_BUFFER_REF(br, btrn) {
489                 //PARA_CRIT_LOG("size: %zu\n", size);
490                 size += br_available_bytes(br);
491         }
492         return size;
493 }
494
495 void btr_splice_out_node(struct btr_node *btrn)
496 {
497         struct btr_node *ch, *tmp;
498
499         assert(btrn);
500         PARA_NOTICE_LOG("splicing out %s\n", btrn->name);
501         btr_pushdown(btrn);
502         if (btrn->parent)
503                 list_del(&btrn->node);
504         FOR_EACH_CHILD_SAFE(ch, tmp, btrn) {
505                 PARA_INFO_LOG("parent(%s): %s\n", ch->name,
506                         btrn->parent? btrn->parent->name : "NULL");
507                 ch->parent = btrn->parent;
508                 if (btrn->parent)
509                         list_move(&ch->node, &btrn->parent->children);
510         }
511         assert(list_empty(&btrn->children));
512 }
513
514 /**
515  * Return the size of the largest input queue.
516  *
517  * Iterates over all children of the given node.
518  */
519 size_t btr_bytes_pending(struct btr_node *btrn)
520 {
521         size_t max_size = 0;
522         struct btr_node *ch;
523
524         FOR_EACH_CHILD(ch, btrn) {
525                 size_t size = btr_get_input_queue_size(ch);
526                 max_size = PARA_MAX(max_size, size);
527         }
528         return max_size;
529 }
530
531 int btr_exec(struct btr_node *btrn, const char *command, char **value_result)
532 {
533         if (!btrn)
534                 return -ERRNO_TO_PARA_ERROR(EINVAL);
535         if (!btrn->execute)
536                 return -ERRNO_TO_PARA_ERROR(ENOTSUP);
537         return btrn->execute(btrn, command, value_result);
538 }
539
540 int btr_exec_up(struct btr_node *btrn, const char *command, char **value_result)
541 {
542         int ret;
543
544         for (; btrn; btrn = btrn->parent) {
545                 struct btr_node *parent = btrn->parent;
546                 if (!parent)
547                         return -ERRNO_TO_PARA_ERROR(ENOTSUP);
548                 if (!parent->execute)
549                         continue;
550                 PARA_INFO_LOG("parent: %s, cmd: %s\n", parent->name, command);
551                 ret = parent->execute(parent, command, value_result);
552                 if (ret == -ERRNO_TO_PARA_ERROR(ENOTSUP))
553                         continue;
554                 if (ret < 0)
555                         return ret;
556                 if (value_result && *value_result)
557                         PARA_NOTICE_LOG("%s(%s): %s\n", command, parent->name,
558                                 *value_result);
559                 return 1;
560         }
561         return -ERRNO_TO_PARA_ERROR(ENOTSUP);
562 }
563
564 void *btr_context(struct btr_node *btrn)
565 {
566         return btrn->context;
567 }
568
569 static bool need_buffer_pool_merge(struct btr_node *btrn)
570 {
571         struct btr_buffer_reference *br = get_first_input_br(btrn);
572
573         if (!br)
574                 return false;
575         if (br->wrap_count != 0)
576                 return true;
577         if (br->btrb->pool)
578                 return true;
579         return false;
580 }
581
582 static void merge_input_pool(struct btr_node *btrn, size_t dest_size)
583 {
584         struct btr_buffer_reference *br, *wbr;
585         int num_refs; /* including wrap buffer */
586         char *buf, *buf1, *buf2 = NULL;
587         size_t sz, sz1, sz2 = 0, wsz;
588
589         if (list_empty(&btrn->input_queue))
590                 return;
591
592         num_refs = 0;
593         FOR_EACH_BUFFER_REF(br, btrn) {
594                 num_refs++;
595                 sz = btr_get_buffer_by_reference(br, &buf);
596                 if (br->wrap_count != 0) {
597                         assert(!wbr);
598                         assert(num_refs == 1);
599                         wbr = br;
600                         if (sz >= dest_size)
601                                 return;
602                         continue;
603                 }
604                 if (!buf1) {
605                         buf1 = buf;
606                         sz1 = sz;
607                         goto next;
608                 }
609                 if (buf1 + sz1 == buf) {
610                         sz1 += sz;
611                         goto next;
612                 }
613                 if (!buf2) {
614                         buf2 = buf;
615                         sz2 = sz;
616                         goto next;
617                 }
618                 assert(buf2 + sz2 == buf);
619                 sz2 += sz;
620 next:
621                 if (sz1 + sz2 >= dest_size)
622                         break;
623         }
624         if (!wbr) {
625                 assert(buf1);
626                 if (!buf2) /* nothing to do */
627                         return;
628                 /* make a new wrap buffer combining buf1 and buf 2. */
629                 sz = sz1 + sz2;
630                 buf = para_malloc(sz);
631                 PARA_DEBUG_LOG("merging input buffers: (%p:%zu, %p:%zu) -> %p:%zu\n",
632                         buf1, sz1, buf2, sz2, buf, sz);
633                 memcpy(buf, buf1, sz1);
634                 memcpy(buf + sz1, buf2, sz2);
635                 br = para_calloc(sizeof(*br));
636                 br->btrb = new_btrb(buf, sz);
637                 br->btrb->refcount = 1;
638                 br->consumed = 0;
639                 /* This is a wrap buffer */
640                 br->wrap_count = sz1;
641                 para_list_add(&br->node, &btrn->input_queue);
642                 return;
643         }
644         /*
645          * We already have a wrap buffer, but it is too small. It might be
646          * partially used.
647          */
648         wsz = br_available_bytes(wbr);
649         if (wbr->wrap_count == sz1 && wbr->btrb->size >= sz1 + sz2) /* nothing we can do about it */
650                 return;
651         assert(buf1 && buf2);
652         sz = sz1 + sz2 - wbr->btrb->size; /* amount of new data */
653         wbr->btrb->size += sz;
654         PARA_DEBUG_LOG("increasing wrap buffer to %zu\n", wbr->btrb->size);
655         wbr->btrb->buf = para_realloc(wbr->btrb->buf, wbr->btrb->size);
656         /* copy the new data to the end of the reallocated buffer */
657         assert(sz2 >= sz);
658         memcpy(wbr->btrb->buf + wbr->btrb->size - sz, buf2 + sz2 - sz, sz);
659 }
660
661 /**
662  * Merge the first two input buffers into one.
663  *
664  * This is a quite expensive operation.
665  *
666  * \return The number of buffers that have been available (zero, one or two).
667  */
668 static int merge_input(struct btr_node *btrn)
669 {
670         struct btr_buffer_reference *brs[2], *br;
671         char *bufs[2], *buf;
672         size_t szs[2], sz;
673         int i;
674
675         if (list_empty(&btrn->input_queue))
676                 return 0;
677         if (list_is_singular(&btrn->input_queue))
678                 return 1;
679         i = 0;
680         /* get references to the first two buffers */
681         FOR_EACH_BUFFER_REF(br, btrn) {
682                 brs[i] = br;
683                 szs[i] = btr_get_buffer_by_reference(brs[i], bufs + i);
684                 i++;
685                 if (i == 2)
686                         break;
687         }
688         /* make a new btrb that combines the two buffers and a br to it. */
689         sz = szs[0] + szs[1];
690         buf = para_malloc(sz);
691         PARA_DEBUG_LOG("memory merging input buffers: (%zu, %zu) -> %zu\n",
692                 szs[0], szs[1], sz);
693         memcpy(buf, bufs[0], szs[0]);
694         memcpy(buf + szs[0], bufs[1], szs[1]);
695
696         br = para_calloc(sizeof(*br));
697         br->btrb = new_btrb(buf, sz);
698         br->btrb->refcount = 1;
699
700         /* replace the first two refs by the new one */
701         btr_drop_buffer_reference(brs[0]);
702         btr_drop_buffer_reference(brs[1]);
703         para_list_add(&br->node, &btrn->input_queue);
704         return 2;
705 }
706
707 void btr_merge(struct btr_node *btrn, size_t dest_size)
708 {
709         if (need_buffer_pool_merge(btrn))
710                 return merge_input_pool(btrn, dest_size);
711         for (;;) {
712                 char *buf;
713                 size_t len = btr_next_buffer(btrn, &buf);
714                 if (len >= dest_size)
715                         return;
716                 PARA_DEBUG_LOG("input size = %zu < %zu = dest\n", len, dest_size);
717                 if (merge_input(btrn) < 2)
718                         return;
719         }
720 }
721
722 bool btr_eof(struct btr_node *btrn)
723 {
724         char *buf;
725         size_t len = btr_next_buffer(btrn, &buf);
726
727         return (len == 0 && btr_no_parent(btrn));
728 }
729
730 void log_tree_recursively(struct btr_node *btrn, int loglevel, int depth)
731 {
732         struct btr_node *ch;
733         const char spaces[] = "                 ", *space = spaces + 16 - depth;
734
735         if (depth > 16)
736                 return;
737         para_log(loglevel, "%s%s\n", space, btrn->name);
738         FOR_EACH_CHILD(ch, btrn)
739                 log_tree_recursively(ch, loglevel, depth + 1);
740 }
741
742 void btr_log_tree(struct btr_node *btrn, int loglevel)
743 {
744         return log_tree_recursively(btrn, loglevel, 0);
745 }
746
747 /** 640K ought to be enough for everybody ;) */
748 #define BTRN_MAX_PENDING (640 * 1024)
749
750 int btr_node_status(struct btr_node *btrn, size_t min_iqs,
751         enum btr_node_type type)
752 {
753         size_t iqs;
754
755         if (type != BTR_NT_LEAF) {
756                 if (btr_no_children(btrn))
757                         return -E_BTR_NO_CHILD;
758                 if (btr_bytes_pending(btrn) > BTRN_MAX_PENDING)
759                         return 0;
760         }
761         if (type != BTR_NT_ROOT) {
762                 if (btr_eof(btrn))
763                         return -E_BTR_EOF;
764                 iqs = btr_get_input_queue_size(btrn);
765                 if (iqs == 0) /* we have a parent, because not eof */
766                         return 0;
767                 if (iqs < min_iqs && !btr_no_parent(btrn))
768                         return 0;
769         }
770         return 1;
771 }
772
773 void btr_get_node_start(struct btr_node *btrn, struct timeval *tv)
774 {
775         *tv = btrn->start;
776 }