8aa66b6c7efce8a51e37ab2b52f4102f074d936d
[paraslash.git] / buffer_tree.c
1 #include <regex.h>
2 #include <stdbool.h>
3
4 #include "para.h"
5 #include "list.h"
6 #include "string.h"
7 #include "buffer_tree.h"
8 #include "error.h"
9 #include "sched.h"
10
11 struct btr_pool {
12         char *name;
13         char *area_start;
14         char *area_end;
15         char *rhead;
16         char *whead;
17 };
18
19 enum btr_buffer_flags {
20         /* changes the way the buffer is deallocated */
21         BTR_BF_BTR_POOL = 1,
22 };
23
24 struct btr_buffer {
25         char *buf;
26         size_t size;
27         /** The number of references to this buffer. */
28         int refcount;
29         struct btr_pool *pool;
30 };
31
32 struct btr_buffer_reference {
33         struct btr_buffer *btrb;
34         size_t consumed;
35         /* Each buffer reference belongs to the buffer queue list of some buffer tree node. */
36         struct list_head node;
37         size_t wrap_count;
38 };
39
40 struct btr_node {
41         char *name;
42         struct btr_node *parent;
43         /* The position of this btr node in the buffer tree. */
44         struct list_head node;
45         /* The children nodes of this btr node are linked together in a list. */
46         struct list_head children;
47         /* Time of first data transfer. */
48         struct timeval start;
49         /**
50          * The input queue is a list of references to btr buffers. Each item on
51          * the list represents an input buffer which has not been completely
52          * used by this btr node.
53          */
54         struct list_head input_queue;
55         btr_command_handler execute;
56         void *context;
57 };
58
59 struct btr_pool *btr_pool_new(const char *name, size_t area_size)
60 {
61         struct btr_pool *btrp;
62
63         PARA_INFO_LOG("%s, %zu bytes\n", name, area_size);
64         btrp = para_malloc(sizeof(*btrp));
65         btrp->area_start = para_malloc(area_size);
66         btrp->area_end = btrp->area_start + area_size;
67         btrp->rhead = btrp->area_start;
68         btrp->whead = btrp->area_start;
69         btrp->name = para_strdup(name);
70         return btrp;
71 }
72
73 /* whead = NULL means area full */
74
75 void btr_pool_free(struct btr_pool *btrp)
76 {
77         if (!btrp)
78                 return;
79         free(btrp->area_start);
80         free(btrp->name);
81         free(btrp);
82 }
83
84 size_t btr_pool_size(struct btr_pool *btrp)
85 {
86         return btrp->area_end - btrp->area_start;
87 }
88
89 size_t btr_pool_filled(struct btr_pool *btrp)
90 {
91         if (!btrp->whead)
92                 return btr_pool_size(btrp);
93         if (btrp->rhead <= btrp->whead)
94                 return  btrp->whead - btrp->rhead;
95         return btr_pool_size(btrp) - (btrp->rhead - btrp->whead);
96 }
97
98 size_t btr_pool_unused(struct btr_pool *btrp)
99 {
100         return btr_pool_size(btrp) - btr_pool_filled(btrp);
101 }
102
103 size_t btr_pool_available(struct btr_pool *btrp)
104 {
105         if (!btrp->whead)
106                 return 0;
107         if (btrp->rhead <= btrp->whead)
108                 return btrp->area_end - btrp->whead;
109         return btrp->rhead - btrp->whead;
110 }
111
112 size_t btr_pool_get_buffer(struct btr_pool *btrp, char **result)
113 {
114         if (result)
115                 *result = btrp->whead;
116         return btr_pool_available(btrp);
117 }
118
119 void btr_pool_allocate(struct btr_pool *btrp, size_t size)
120 {
121         char *end;
122
123         if (size == 0)
124                 return;
125         //PARA_CRIT_LOG("filled: %zu, alloc %zu\n", btr_pool_filled(btrp), size);
126         assert(size <= btr_pool_available(btrp));
127         end = btrp->whead + size;
128         assert(end <= btrp->area_end);
129
130         if (end == btrp->area_end) {
131                 PARA_DEBUG_LOG("%s: end of pool area reached\n", btrp->name);
132                 end = btrp->area_start;
133         }
134         if (end == btrp->rhead) {
135                 PARA_DEBUG_LOG("btrp buffer full\n");
136                 end = NULL; /* buffer full */
137         }
138         btrp->whead = end;
139         //PARA_CRIT_LOG("filled: %zu\n", btr_pool_filled(btrp));
140 }
141
142 static void btr_pool_deallocate(struct btr_pool *btrp, size_t size)
143 {
144         char *end = btrp->rhead + size;
145
146         //PARA_CRIT_LOG("filled: %zu, dealloc %zu\n", btr_pool_filled(btrp), size);
147         if (size == 0)
148                 return;
149         assert(end <= btrp->area_end);
150         assert(size <= btr_pool_filled(btrp));
151         if (end == btrp->area_end)
152                 end = btrp->area_start;
153         if (!btrp->whead)
154                 btrp->whead = btrp->rhead;
155         btrp->rhead = end;
156         if (btrp->rhead == btrp->whead)
157                 btrp->rhead = btrp->whead = btrp->area_start;
158         //PARA_CRIT_LOG("filled: %zu\n", btr_pool_filled(btrp));
159 }
160
161 #define FOR_EACH_CHILD(_tn, _btrn) list_for_each_entry((_tn), \
162         &((_btrn)->children), node)
163 #define FOR_EACH_CHILD_SAFE(_tn, _tmp, _btrn) \
164         list_for_each_entry_safe((_tn), (_tmp), &((_btrn)->children), node)
165
166 #define FOR_EACH_BUFFER_REF(_br, _btrn) \
167         list_for_each_entry((_br), &(_btrn)->input_queue, node)
168 #define FOR_EACH_BUFFER_REF_SAFE(_br, _tmp, _btrn) \
169         list_for_each_entry_safe((_br), (_tmp), &(_btrn)->input_queue, node)
170
171 struct btr_node *btr_new_node(const char *name, struct btr_node *parent,
172                 btr_command_handler handler, void *context)
173 {
174         struct btr_node *btrn = para_malloc(sizeof(*btrn));
175
176         btrn->name = para_strdup(name);
177         btrn->parent = parent;
178         btrn->execute = handler;
179         btrn->context = context;
180         btrn->start.tv_sec = 0;
181         btrn->start.tv_usec = 0;
182         if (parent)
183                 list_add_tail(&btrn->node, &parent->children);
184         INIT_LIST_HEAD(&btrn->children);
185         INIT_LIST_HEAD(&btrn->input_queue);
186         if (parent)
187                 PARA_INFO_LOG("added %s as child of %s\n", name, parent->name);
188         else
189                 PARA_INFO_LOG("added %s as btr root\n", name);
190         return btrn;
191 }
192
193 /*
194  * Allocate a new btr buffer.
195  *
196  * The freshly allocated buffer will have a zero refcount and will
197  * not be associated with a btr pool.
198  */
199 static struct btr_buffer *new_btrb(char *buf, size_t size)
200 {
201         struct btr_buffer *btrb = para_calloc(sizeof(*btrb));
202
203         btrb->buf = buf;
204         btrb->size = size;
205         return btrb;
206 }
207
208 static void dealloc_buffer(struct btr_buffer *btrb)
209 {
210         if (btrb->pool)
211                 btr_pool_deallocate(btrb->pool, btrb->size);
212         else
213                 free(btrb->buf);
214 }
215
216 static struct btr_buffer_reference *get_first_input_br(struct btr_node *btrn)
217 {
218         if (list_empty(&btrn->input_queue))
219                 return NULL;
220         return list_first_entry(&btrn->input_queue,
221                 struct btr_buffer_reference, node);
222 }
223
224 /*
225  * Deallocate the reference, release the resources if refcount drops to zero.
226  */
227 static void btr_drop_buffer_reference(struct btr_buffer_reference *br)
228 {
229         struct btr_buffer *btrb = br->btrb;
230
231         //PARA_CRIT_LOG("dropping buffer reference %p\n", br);
232         list_del(&br->node);
233         free(br);
234         btrb->refcount--;
235         if (btrb->refcount == 0) {
236                 dealloc_buffer(btrb);
237                 free(btrb);
238         }
239 }
240
241 static void add_btrb_to_children(struct btr_buffer *btrb,
242                 struct btr_node *btrn, size_t consumed)
243 {
244         struct btr_node *ch;
245
246         if (btrn->start.tv_sec == 0)
247                 btrn->start = *now;
248         FOR_EACH_CHILD(ch, btrn) {
249                 struct btr_buffer_reference *br = para_calloc(sizeof(*br));
250                 br->btrb = btrb;
251                 br->consumed = consumed;
252                 list_add_tail(&br->node, &ch->input_queue);
253                 btrb->refcount++;
254                 if (ch->start.tv_sec == 0)
255                         ch->start = *now;
256         }
257 }
258
259 void btr_add_output(char *buf, size_t size, struct btr_node *btrn)
260 {
261         struct btr_buffer *btrb;
262
263         assert(size != 0);
264         if (list_empty(&btrn->children)) {
265                 free(buf);
266                 return;
267         }
268         btrb = new_btrb(buf, size);
269         add_btrb_to_children(btrb, btrn, 0);
270 }
271
272 void btr_add_output_pool(struct btr_pool *btrp, size_t size,
273                 struct btr_node *btrn)
274 {
275         struct btr_buffer *btrb;
276         char *buf;
277         size_t avail;
278
279         assert(size != 0);
280         if (list_empty(&btrn->children))
281                 return;
282         avail = btr_pool_get_buffer(btrp, &buf);
283         assert(avail >= size);
284         btr_pool_allocate(btrp, size);
285         btrb = new_btrb(buf, size);
286         btrb->pool = btrp;
287         add_btrb_to_children(btrb, btrn, 0);
288 }
289
290 void btr_copy(const void *src, size_t n, struct btr_pool *btrp,
291         struct btr_node *btrn)
292 {
293         char *buf;
294         size_t sz, copy;
295
296         if (n == 0)
297                 return;
298         assert(n <= btr_pool_unused(btrp));
299         sz = btr_pool_get_buffer(btrp, &buf);
300         copy = PARA_MIN(sz, n);
301         memcpy(buf, src, copy);
302         btr_add_output_pool(btrp, copy, btrn);
303         if (copy == n)
304                 return;
305         sz = btr_pool_get_buffer(btrp, &buf);
306         assert(sz >= n - copy);
307         memcpy(buf, src + copy, n - copy);
308         btr_add_output_pool(btrp, n - copy, btrn);
309 }
310
311 static void btr_pushdown_br(struct btr_buffer_reference *br, struct btr_node *btrn)
312 {
313         add_btrb_to_children(br->btrb, btrn, br->consumed);
314         btr_drop_buffer_reference(br);
315 }
316
317 void btr_pushdown(struct btr_node *btrn)
318 {
319         struct btr_buffer_reference *br, *tmp;
320
321         FOR_EACH_BUFFER_REF_SAFE(br, tmp, btrn)
322                 btr_pushdown_br(br, btrn);
323 }
324
325 int btr_pushdown_one(struct btr_node *btrn)
326 {
327         struct btr_buffer_reference *br;
328
329         if (list_empty(&btrn->input_queue))
330                 return 0;
331         br = list_first_entry(&btrn->input_queue, struct btr_buffer_reference, node);
332         btr_pushdown_br(br, btrn);
333         return 1;
334 }
335
336 /* Return true if this node has no children. */
337 bool btr_no_children(struct btr_node *btrn)
338 {
339         return list_empty(&btrn->children);
340 }
341
342 bool btr_no_parent(struct btr_node *btrn)
343 {
344         return !btrn->parent;
345 }
346
347 bool btr_inplace_ok(struct btr_node *btrn)
348 {
349         if (!btrn->parent)
350                 return true;
351         return list_is_singular(&btrn->parent->children);
352 }
353
354 static inline size_t br_available_bytes(struct btr_buffer_reference *br)
355 {
356         return br->btrb->size - br->consumed;
357 }
358
359 size_t btr_get_buffer_by_reference(struct btr_buffer_reference *br, char **buf)
360 {
361         if (buf)
362                 *buf = br->btrb->buf + br->consumed;
363         return br_available_bytes(br);
364 }
365
366 /**
367  * \return zero if the input buffer queue is empty.
368  */
369 size_t btr_next_buffer(struct btr_node *btrn, char **bufp)
370 {
371         struct btr_buffer_reference *br;
372         char *buf, *result = NULL;
373         size_t sz, rv = 0;
374
375         FOR_EACH_BUFFER_REF(br, btrn) {
376                 sz = btr_get_buffer_by_reference(br, &buf);
377                 if (!result) {
378                         result = buf;
379                         rv = sz;
380                         if (!br->btrb->pool)
381                                 break;
382                         continue;
383                 }
384                 if (!br->btrb->pool)
385                         break;
386                 if (result + rv != buf) {
387                         PARA_DEBUG_LOG("%s: pool merge impossible: %p != %p\n",
388                                 btrn->name, result + rv, buf);
389                         break;
390                 }
391 //              PARA_CRIT_LOG("%s: inplace merge (%zu, %zu)->%zu\n", btrn->name,
392 //                      rv, sz, rv + sz);
393 //              PARA_CRIT_LOG("%s: inplace merge %p (%zu)\n", btrn->name,
394 //                      result, sz);
395                 rv += sz;
396         }
397         if (bufp)
398                 *bufp = result;
399         return rv;
400 }
401
402 void btr_consume(struct btr_node *btrn, size_t numbytes)
403 {
404         struct btr_buffer_reference *br, *tmp;
405         size_t sz;
406
407         if (numbytes == 0)
408                 return;
409         br = get_first_input_br(btrn);
410         assert(br);
411
412         //PARA_CRIT_LOG("wrap count: %zu\n", br->wrap_count);
413         if (br->wrap_count == 0) {
414                 /*
415                  * No wrap buffer. Drop buffer references whose buffer
416                  * has been fully used. */
417                 FOR_EACH_BUFFER_REF_SAFE(br, tmp, btrn) {
418                         if (br->consumed + numbytes <= br->btrb->size) {
419                                 br->consumed += numbytes;
420                                 if (br->consumed == br->btrb->size)
421                                         btr_drop_buffer_reference(br);
422                                 return;
423                         }
424                         numbytes -= br->btrb->size - br->consumed;
425                         btr_drop_buffer_reference(br);
426                 }
427                 assert(true);
428         }
429         /*
430
431         We have a wrap buffer, consume from it. If in total,
432         i.e. including previous calls to brt_consume(), less than
433         wrap_count has been consumed, there's nothing more we can do.
434
435         Otherwise we drop the wrap buffer and consume from subsequent
436         buffers of the input queue the correct amount of bytes. This
437         is the total number of bytes that have been consumed from the
438         wrap buffer.
439 */
440         PARA_DEBUG_LOG("consuming %zu/%zu bytes from wrap buffer\n", numbytes,
441                 br_available_bytes(br));
442
443         assert(numbytes <= br_available_bytes(br));
444         if (br->consumed + numbytes < br->wrap_count) {
445                 br->consumed += numbytes;
446                 return;
447         }
448         PARA_DEBUG_LOG("dropping wrap buffer (%zu bytes)\n", br->btrb->size);
449         /* get rid of the wrap buffer */
450         sz = br->consumed + numbytes;
451         btr_drop_buffer_reference(br);
452         return btr_consume(btrn, sz);
453 }
454
455 static void flush_input_queue(struct btr_node *btrn)
456 {
457         struct btr_buffer_reference *br, *tmp;
458         FOR_EACH_BUFFER_REF_SAFE(br, tmp, btrn)
459                 btr_drop_buffer_reference(br);
460 }
461
462 void btr_free_node(struct btr_node *btrn)
463 {
464         if (!btrn)
465                 return;
466         free(btrn->name);
467         free(btrn);
468 }
469
470 void btr_remove_node(struct btr_node *btrn)
471 {
472         struct btr_node *ch;
473
474         if (!btrn)
475                 return;
476         PARA_NOTICE_LOG("removing btr node %s from buffer tree\n", btrn->name);
477         FOR_EACH_CHILD(ch, btrn)
478                 ch->parent = NULL;
479         flush_input_queue(btrn);
480         if (btrn->parent)
481                 list_del(&btrn->node);
482 }
483
484 size_t btr_get_input_queue_size(struct btr_node *btrn)
485 {
486         struct btr_buffer_reference *br;
487         size_t size = 0;
488
489         FOR_EACH_BUFFER_REF(br, btrn) {
490                 //PARA_CRIT_LOG("size: %zu\n", size);
491                 size += br_available_bytes(br);
492         }
493         return size;
494 }
495
496 void btr_splice_out_node(struct btr_node *btrn)
497 {
498         struct btr_node *ch, *tmp;
499
500         assert(btrn);
501         PARA_NOTICE_LOG("splicing out %s\n", btrn->name);
502         btr_pushdown(btrn);
503         if (btrn->parent)
504                 list_del(&btrn->node);
505         FOR_EACH_CHILD_SAFE(ch, tmp, btrn) {
506                 PARA_INFO_LOG("parent(%s): %s\n", ch->name,
507                         btrn->parent? btrn->parent->name : "NULL");
508                 ch->parent = btrn->parent;
509                 if (btrn->parent)
510                         list_move(&ch->node, &btrn->parent->children);
511         }
512         assert(list_empty(&btrn->children));
513 }
514
515 /**
516  * Return the size of the largest input queue.
517  *
518  * Iterates over all children of the given node.
519  */
520 size_t btr_bytes_pending(struct btr_node *btrn)
521 {
522         size_t max_size = 0;
523         struct btr_node *ch;
524
525         FOR_EACH_CHILD(ch, btrn) {
526                 size_t size = btr_get_input_queue_size(ch);
527                 max_size = PARA_MAX(max_size, size);
528         }
529         return max_size;
530 }
531
532 int btr_exec(struct btr_node *btrn, const char *command, char **value_result)
533 {
534         if (!btrn)
535                 return -ERRNO_TO_PARA_ERROR(EINVAL);
536         if (!btrn->execute)
537                 return -ERRNO_TO_PARA_ERROR(ENOTSUP);
538         return btrn->execute(btrn, command, value_result);
539 }
540
541 int btr_exec_up(struct btr_node *btrn, const char *command, char **value_result)
542 {
543         int ret;
544
545         for (; btrn; btrn = btrn->parent) {
546                 struct btr_node *parent = btrn->parent;
547                 if (!parent)
548                         return -ERRNO_TO_PARA_ERROR(ENOTSUP);
549                 if (!parent->execute)
550                         continue;
551                 PARA_INFO_LOG("parent: %s, cmd: %s\n", parent->name, command);
552                 ret = parent->execute(parent, command, value_result);
553                 if (ret == -ERRNO_TO_PARA_ERROR(ENOTSUP))
554                         continue;
555                 if (ret < 0)
556                         return ret;
557                 if (value_result && *value_result)
558                         PARA_NOTICE_LOG("%s(%s): %s\n", command, parent->name,
559                                 *value_result);
560                 return 1;
561         }
562         return -ERRNO_TO_PARA_ERROR(ENOTSUP);
563 }
564
565 void *btr_context(struct btr_node *btrn)
566 {
567         return btrn->context;
568 }
569
570 static bool need_buffer_pool_merge(struct btr_node *btrn)
571 {
572         struct btr_buffer_reference *br = get_first_input_br(btrn);
573
574         if (!br)
575                 return false;
576         if (br->wrap_count != 0)
577                 return true;
578         if (br->btrb->pool)
579                 return true;
580         return false;
581 }
582
583 static void merge_input_pool(struct btr_node *btrn, size_t dest_size)
584 {
585         struct btr_buffer_reference *br, *wbr;
586         int num_refs; /* including wrap buffer */
587         char *buf, *buf1, *buf2 = NULL;
588         size_t sz, sz1, sz2 = 0, wsz;
589
590         if (list_empty(&btrn->input_queue))
591                 return;
592
593         num_refs = 0;
594         FOR_EACH_BUFFER_REF(br, btrn) {
595                 num_refs++;
596                 sz = btr_get_buffer_by_reference(br, &buf);
597                 if (br->wrap_count != 0) {
598                         assert(!wbr);
599                         assert(num_refs == 1);
600                         wbr = br;
601                         if (sz >= dest_size)
602                                 return;
603                         continue;
604                 }
605                 if (!buf1) {
606                         buf1 = buf;
607                         sz1 = sz;
608                         goto next;
609                 }
610                 if (buf1 + sz1 == buf) {
611                         sz1 += sz;
612                         goto next;
613                 }
614                 if (!buf2) {
615                         buf2 = buf;
616                         sz2 = sz;
617                         goto next;
618                 }
619                 assert(buf2 + sz2 == buf);
620                 sz2 += sz;
621 next:
622                 if (sz1 + sz2 >= dest_size)
623                         break;
624         }
625         if (!wbr) {
626                 assert(buf1);
627                 if (!buf2) /* nothing to do */
628                         return;
629                 /* make a new wrap buffer combining buf1 and buf 2. */
630                 sz = sz1 + sz2;
631                 buf = para_malloc(sz);
632                 PARA_DEBUG_LOG("merging input buffers: (%p:%zu, %p:%zu) -> %p:%zu\n",
633                         buf1, sz1, buf2, sz2, buf, sz);
634                 memcpy(buf, buf1, sz1);
635                 memcpy(buf + sz1, buf2, sz2);
636                 br = para_calloc(sizeof(*br));
637                 br->btrb = new_btrb(buf, sz);
638                 br->btrb->refcount = 1;
639                 br->consumed = 0;
640                 /* This is a wrap buffer */
641                 br->wrap_count = sz1;
642                 para_list_add(&br->node, &btrn->input_queue);
643                 return;
644         }
645         /*
646          * We already have a wrap buffer, but it is too small. It might be
647          * partially used.
648          */
649         wsz = br_available_bytes(wbr);
650         if (wbr->wrap_count == sz1 && wbr->btrb->size >= sz1 + sz2) /* nothing we can do about it */
651                 return;
652         assert(buf1 && buf2);
653         sz = sz1 + sz2 - wbr->btrb->size; /* amount of new data */
654         wbr->btrb->size += sz;
655         PARA_DEBUG_LOG("increasing wrap buffer to %zu\n", wbr->btrb->size);
656         wbr->btrb->buf = para_realloc(wbr->btrb->buf, wbr->btrb->size);
657         /* copy the new data to the end of the reallocated buffer */
658         assert(sz2 >= sz);
659         memcpy(wbr->btrb->buf + wbr->btrb->size - sz, buf2 + sz2 - sz, sz);
660 }
661
662 /**
663  * Merge the first two input buffers into one.
664  *
665  * This is a quite expensive operation.
666  *
667  * \return The number of buffers that have been available (zero, one or two).
668  */
669 static int merge_input(struct btr_node *btrn)
670 {
671         struct btr_buffer_reference *brs[2], *br;
672         char *bufs[2], *buf;
673         size_t szs[2], sz;
674         int i;
675
676         if (list_empty(&btrn->input_queue))
677                 return 0;
678         if (list_is_singular(&btrn->input_queue))
679                 return 1;
680         i = 0;
681         /* get references to the first two buffers */
682         FOR_EACH_BUFFER_REF(br, btrn) {
683                 brs[i] = br;
684                 szs[i] = btr_get_buffer_by_reference(brs[i], bufs + i);
685                 i++;
686                 if (i == 2)
687                         break;
688         }
689         /* make a new btrb that combines the two buffers and a br to it. */
690         sz = szs[0] + szs[1];
691         buf = para_malloc(sz);
692         PARA_DEBUG_LOG("%s: memory merging input buffers: (%zu, %zu) -> %zu\n",
693                 btrn->name, szs[0], szs[1], sz);
694         memcpy(buf, bufs[0], szs[0]);
695         memcpy(buf + szs[0], bufs[1], szs[1]);
696
697         br = para_calloc(sizeof(*br));
698         br->btrb = new_btrb(buf, sz);
699         br->btrb->refcount = 1;
700
701         /* replace the first two refs by the new one */
702         btr_drop_buffer_reference(brs[0]);
703         btr_drop_buffer_reference(brs[1]);
704         para_list_add(&br->node, &btrn->input_queue);
705         return 2;
706 }
707
708 void btr_merge(struct btr_node *btrn, size_t dest_size)
709 {
710         if (need_buffer_pool_merge(btrn))
711                 return merge_input_pool(btrn, dest_size);
712         for (;;) {
713                 char *buf;
714                 size_t len = btr_next_buffer(btrn, &buf);
715                 if (len >= dest_size)
716                         return;
717                 PARA_DEBUG_LOG("input size = %zu < %zu = dest\n", len, dest_size);
718                 if (merge_input(btrn) < 2)
719                         return;
720         }
721 }
722
723 bool btr_eof(struct btr_node *btrn)
724 {
725         char *buf;
726         size_t len = btr_next_buffer(btrn, &buf);
727
728         return (len == 0 && btr_no_parent(btrn));
729 }
730
731 void log_tree_recursively(struct btr_node *btrn, int loglevel, int depth)
732 {
733         struct btr_node *ch;
734         const char spaces[] = "                 ", *space = spaces + 16 - depth;
735
736         if (depth > 16)
737                 return;
738         para_log(loglevel, "%s%s\n", space, btrn->name);
739         FOR_EACH_CHILD(ch, btrn)
740                 log_tree_recursively(ch, loglevel, depth + 1);
741 }
742
743 void btr_log_tree(struct btr_node *btrn, int loglevel)
744 {
745         return log_tree_recursively(btrn, loglevel, 0);
746 }
747
748 /*
749  * \return \a root if \a name is \p NULL.
750  */
751 struct btr_node *btr_search_node(const char *name, struct btr_node *root)
752 {
753         struct btr_node *ch;
754
755         if (!name)
756                 return root;
757         if (!strcmp(root->name, name))
758                 return root;
759         FOR_EACH_CHILD(ch, root) {
760                 struct btr_node *result = btr_search_node(name, ch);
761                 if (result)
762                         return result;
763         }
764         return NULL;
765 }
766
767 /** 640K ought to be enough for everybody ;) */
768 #define BTRN_MAX_PENDING (640 * 1024)
769
770 int btr_node_status(struct btr_node *btrn, size_t min_iqs,
771         enum btr_node_type type)
772 {
773         size_t iqs;
774
775         if (!btrn)
776                 return 0;
777         if (type != BTR_NT_LEAF) {
778                 if (btr_no_children(btrn))
779                         return -E_BTR_NO_CHILD;
780                 if (btr_bytes_pending(btrn) > BTRN_MAX_PENDING)
781                         return 0;
782         }
783         if (type != BTR_NT_ROOT) {
784                 if (btr_eof(btrn))
785                         return -E_BTR_EOF;
786                 iqs = btr_get_input_queue_size(btrn);
787                 if (iqs == 0) /* we have a parent, because not eof */
788                         return 0;
789                 if (iqs < min_iqs && !btr_no_parent(btrn))
790                         return 0;
791         }
792         return 1;
793 }
794
795 void btr_get_node_start(struct btr_node *btrn, struct timeval *tv)
796 {
797         *tv = btrn->start;
798 }