Fix stream grabbing.
[paraslash.git] / buffer_tree.c
1 #include <regex.h>
2 #include <stdbool.h>
3
4 #include "para.h"
5 #include "list.h"
6 #include "string.h"
7 #include "buffer_tree.h"
8 #include "error.h"
9 #include "sched.h"
10
11 struct btr_pool { /* A fixed-size memory area managed through a read head and a write head. */
12 char *name; /* Private copy of the name given to btr_pool_new(); used in log messages. */
13 char *area_start; /* First byte of the allocated area. */
14 char *area_end; /* One past the last byte of the area (area_start + area size). */
15 char *rhead; /* Read head: start of the region that btr_pool_deallocate() releases next. */
16 char *whead; /* Write head: where the next allocation starts; NULL means the area is full. */
17 };
18
19 enum btr_buffer_flags {
20 /* changes the way the buffer is deallocated */
21 BTR_BF_BTR_POOL = 1, /* NOTE(review): not referenced in the visible code; dealloc_buffer() tests the pool pointer of the buffer instead -- confirm whether this flag is still needed. */
22 };
23
24 struct btr_buffer { /* A chunk of data that is shared by all buffer references pointing to it. */
25 char *buf; /* Start of the data. */
26 size_t size; /* Number of bytes at buf. */
27 /** The number of references to this buffer. */
28 int refcount;
29 struct btr_pool *pool; /* If non-NULL, the memory is handed back to this pool; otherwise buf is released with free(). */
30 };
31
32 struct btr_buffer_reference { /* One node's view of a btr buffer. */
33 struct btr_buffer *btrb; /* The referenced buffer. */
34 size_t consumed; /* Number of bytes at the start of the buffer which this reference has already used. */
35 /* Each buffer reference belongs to the buffer queue list of some buffer tree node. */
36 struct list_head node;
37 size_t wrap_count; /* Non-zero marks a wrap buffer; merge_input_pool() sets it to the size of the first of the two merged regions. */
38 };
39
40 struct btr_node { /* One node of the buffer tree. */
41 char *name; /* Private copy of the node name; used for logging and by btr_search_node(). */
42 struct btr_node *parent; /* NULL if this node has no parent (root, or parent was removed). */
43 /* The position of this btr node in the buffer tree. */
44 struct list_head node;
45 /* The children nodes of this btr node are linked together in a list. */
46 struct list_head children;
47 /* Time of first data transfer. */
48 struct timeval start;
49 /**
50 * The input queue is a list of references to btr buffers. Each item on
51 * the list represents an input buffer which has not been completely
52 * used by this btr node.
53 */
54 struct list_head input_queue;
55 btr_command_handler execute; /* Optional command handler invoked by btr_exec() and btr_exec_up(); may be NULL. */
56 void *context; /* Opaque pointer supplied at node creation and returned by btr_context(). */
57 };
58
59 struct btr_pool *btr_pool_new(const char *name, size_t area_size)
60 {
61 struct btr_pool *btrp;
62
63 PARA_INFO_LOG("%s, %zu bytes\n", name, area_size);
64 btrp = para_malloc(sizeof(*btrp));
65 btrp->area_start = para_malloc(area_size);
66 btrp->area_end = btrp->area_start + area_size;
67 btrp->rhead = btrp->area_start;
68 btrp->whead = btrp->area_start;
69 btrp->name = para_strdup(name);
70 return btrp;
71 }
72
73 /* whead = NULL means area full */
74
75 void btr_pool_free(struct btr_pool *btrp)
76 {
77 if (!btrp)
78 return;
79 free(btrp->area_start);
80 free(btrp->name);
81 free(btrp);
82 }
83
84 size_t btr_pool_size(struct btr_pool *btrp)
85 {
86 return btrp->area_end - btrp->area_start;
87 }
88
89 size_t btr_pool_filled(struct btr_pool *btrp)
90 {
91 if (!btrp->whead)
92 return btr_pool_size(btrp);
93 if (btrp->rhead <= btrp->whead)
94 return btrp->whead - btrp->rhead;
95 return btr_pool_size(btrp) - (btrp->rhead - btrp->whead);
96 }
97
/**
 * Get the number of bytes of the pool area which are not allocated.
 *
 * \param btrp The pool to query.
 *
 * \return Area size minus the filled size. The unused bytes need not be
 * contiguous, see btr_pool_available().
 */
size_t btr_pool_unused(struct btr_pool *btrp)
{
	size_t used = btr_pool_filled(btrp);

	return btr_pool_size(btrp) - used;
}
102
103 size_t btr_pool_available(struct btr_pool *btrp)
104 {
105 if (!btrp->whead)
106 return 0;
107 if (btrp->rhead <= btrp->whead)
108 return btrp->area_end - btrp->whead;
109 return btrp->rhead - btrp->whead;
110 }
111
112 size_t btr_pool_get_buffer(struct btr_pool *btrp, char **result)
113 {
114 if (result)
115 *result = btrp->whead;
116 return btr_pool_available(btrp);
117 }
118
119 void btr_pool_allocate(struct btr_pool *btrp, size_t size)
120 {
121 char *end;
122
123 if (size == 0)
124 return;
125 //PARA_CRIT_LOG("filled: %zu, alloc %zu\n", btr_pool_filled(btrp), size);
126 assert(size <= btr_pool_available(btrp));
127 end = btrp->whead + size;
128 assert(end <= btrp->area_end);
129
130 if (end == btrp->area_end) {
131 PARA_DEBUG_LOG("%s: end of pool area reached\n", btrp->name);
132 end = btrp->area_start;
133 }
134 if (end == btrp->rhead) {
135 PARA_DEBUG_LOG("btrp buffer full\n");
136 end = NULL; /* buffer full */
137 }
138 btrp->whead = end;
139 //PARA_CRIT_LOG("filled: %zu\n", btr_pool_filled(btrp));
140 }
141
142 static void btr_pool_deallocate(struct btr_pool *btrp, size_t size)
143 {
144 char *end = btrp->rhead + size;
145
146 //PARA_CRIT_LOG("filled: %zu, dealloc %zu\n", btr_pool_filled(btrp), size);
147 if (size == 0)
148 return;
149 assert(end <= btrp->area_end);
150 assert(size <= btr_pool_filled(btrp));
151 if (end == btrp->area_end)
152 end = btrp->area_start;
153 if (!btrp->whead)
154 btrp->whead = btrp->rhead;
155 btrp->rhead = end;
156 if (btrp->rhead == btrp->whead)
157 btrp->rhead = btrp->whead = btrp->area_start;
158 //PARA_CRIT_LOG("filled: %zu\n", btr_pool_filled(btrp));
159 }
160
/* Iterate over the children list of _btrn; _tn is the struct btr_node cursor. */
161 #define FOR_EACH_CHILD(_tn, _btrn) list_for_each_entry((_tn), \
162 &((_btrn)->children), node)
/* Like FOR_EACH_CHILD, but built on list_for_each_entry_safe with _tmp as the extra cursor. */
163 #define FOR_EACH_CHILD_SAFE(_tn, _tmp, _btrn) \
164 list_for_each_entry_safe((_tn), (_tmp), &((_btrn)->children), node)
165
/* Iterate over the buffer references in the input queue of _btrn. */
166 #define FOR_EACH_BUFFER_REF(_br, _btrn) \
167 list_for_each_entry((_br), &(_btrn)->input_queue, node)
/* Like FOR_EACH_BUFFER_REF, but built on list_for_each_entry_safe; used where references are dropped while iterating. */
168 #define FOR_EACH_BUFFER_REF_SAFE(_br, _tmp, _btrn) \
169 list_for_each_entry_safe((_br), (_tmp), &(_btrn)->input_queue, node)
170
171 struct btr_node *btr_new_node(const char *name, struct btr_node *parent,
172 btr_command_handler handler, void *context)
173 {
174 struct btr_node *btrn = para_malloc(sizeof(*btrn));
175
176 btrn->name = para_strdup(name);
177 btrn->parent = parent;
178 btrn->execute = handler;
179 btrn->context = context;
180 btrn->start.tv_sec = 0;
181 btrn->start.tv_usec = 0;
182 if (parent)
183 list_add_tail(&btrn->node, &parent->children);
184 INIT_LIST_HEAD(&btrn->children);
185 INIT_LIST_HEAD(&btrn->input_queue);
186 if (parent)
187 PARA_INFO_LOG("added %s as child of %s\n", name, parent->name);
188 else
189 PARA_INFO_LOG("added %s as btr root\n", name);
190 return btrn;
191 }
192
193 /*
194 * Allocate a new btr buffer.
195 *
196 * The freshly allocated buffer will have a zero refcount and will
197 * not be associated with a btr pool.
198 */
199 static struct btr_buffer *new_btrb(char *buf, size_t size)
200 {
201 struct btr_buffer *btrb = para_calloc(sizeof(*btrb));
202
203 btrb->buf = buf;
204 btrb->size = size;
205 return btrb;
206 }
207
208 static void dealloc_buffer(struct btr_buffer *btrb)
209 {
210 if (btrb->pool)
211 btr_pool_deallocate(btrb->pool, btrb->size);
212 else
213 free(btrb->buf);
214 }
215
216 static struct btr_buffer_reference *get_first_input_br(struct btr_node *btrn)
217 {
218 if (list_empty(&btrn->input_queue))
219 return NULL;
220 return list_first_entry(&btrn->input_queue,
221 struct btr_buffer_reference, node);
222 }
223
224 /*
225 * Deallocate the reference, release the resources if refcount drops to zero.
226 */
227 static void btr_drop_buffer_reference(struct btr_buffer_reference *br)
228 {
229 struct btr_buffer *btrb = br->btrb;
230
231 //PARA_CRIT_LOG("dropping buffer reference %p\n", br);
232 list_del(&br->node);
233 free(br);
234 btrb->refcount--;
235 if (btrb->refcount == 0) {
236 dealloc_buffer(btrb);
237 free(btrb);
238 }
239 }
240
241 static void add_btrb_to_children(struct btr_buffer *btrb,
242 struct btr_node *btrn, size_t consumed)
243 {
244 struct btr_node *ch;
245
246 if (btrn->start.tv_sec == 0)
247 btrn->start = *now;
248 FOR_EACH_CHILD(ch, btrn) {
249 struct btr_buffer_reference *br = para_calloc(sizeof(*br));
250 br->btrb = btrb;
251 br->consumed = consumed;
252 list_add_tail(&br->node, &ch->input_queue);
253 btrb->refcount++;
254 if (ch->start.tv_sec == 0)
255 ch->start = *now;
256 }
257 }
258
259 void btr_add_output(char *buf, size_t size, struct btr_node *btrn)
260 {
261 struct btr_buffer *btrb;
262
263 assert(size != 0);
264 if (list_empty(&btrn->children)) {
265 free(buf);
266 return;
267 }
268 btrb = new_btrb(buf, size);
269 add_btrb_to_children(btrb, btrn, 0);
270 }
271
272 void btr_add_output_pool(struct btr_pool *btrp, size_t size,
273 struct btr_node *btrn)
274 {
275 struct btr_buffer *btrb;
276 char *buf;
277 size_t avail;
278
279 assert(size != 0);
280 if (list_empty(&btrn->children))
281 return;
282 avail = btr_pool_get_buffer(btrp, &buf);
283 assert(avail >= size);
284 btr_pool_allocate(btrp, size);
285 btrb = new_btrb(buf, size);
286 btrb->pool = btrp;
287 add_btrb_to_children(btrb, btrn, 0);
288 }
289
/**
 * Copy data into a buffer pool and feed it to the children of a node.
 *
 * \param src The data to copy.
 * \param n Number of bytes to copy. Must not exceed btr_pool_unused().
 * \param btrp The destination pool.
 * \param btrn The node whose children receive the copied data.
 *
 * The free space of the pool may be split into two regions, so the data is
 * copied in at most two chunks, each of which is added via
 * btr_add_output_pool().
 */
void btr_copy(const void *src, size_t n, struct btr_pool *btrp,
		struct btr_node *btrn)
{
	/* Byte-wise alias: pointer arithmetic on void * is not ISO C. */
	const char *from = src;
	char *buf;
	size_t sz, copy;

	if (n == 0)
		return;
	assert(n <= btr_pool_unused(btrp));
	sz = btr_pool_get_buffer(btrp, &buf);
	copy = PARA_MIN(sz, n);
	memcpy(buf, from, copy);
	btr_add_output_pool(btrp, copy, btrn);
	if (copy == n)
		return;
	/* The rest goes to the second free region. */
	sz = btr_pool_get_buffer(btrp, &buf);
	assert(sz >= n - copy);
	memcpy(buf, from + copy, n - copy);
	btr_add_output_pool(btrp, n - copy, btrn);
}
310
311 static void btr_pushdown_br(struct btr_buffer_reference *br, struct btr_node *btrn)
312 {
313 add_btrb_to_children(br->btrb, btrn, br->consumed);
314 btr_drop_buffer_reference(br);
315 }
316
317 void btr_pushdown(struct btr_node *btrn)
318 {
319 struct btr_buffer_reference *br, *tmp;
320
321 FOR_EACH_BUFFER_REF_SAFE(br, tmp, btrn)
322 btr_pushdown_br(br, btrn);
323 }
324
325 int btr_pushdown_one(struct btr_node *btrn)
326 {
327 struct btr_buffer_reference *br;
328
329 if (list_empty(&btrn->input_queue))
330 return 0;
331 br = list_first_entry(&btrn->input_queue, struct btr_buffer_reference, node);
332 btr_pushdown_br(br, btrn);
333 return 1;
334 }
335
336 /* Return true if this node has no children. */
337 bool btr_no_children(struct btr_node *btrn)
338 {
339 return list_empty(&btrn->children);
340 }
341
342 bool btr_no_parent(struct btr_node *btrn)
343 {
344 return !btrn->parent;
345 }
346
347 bool btr_inplace_ok(struct btr_node *btrn)
348 {
349 if (!btrn->parent)
350 return true;
351 return list_is_singular(&btrn->parent->children);
352 }
353
354 static inline size_t br_available_bytes(struct btr_buffer_reference *br)
355 {
356 return br->btrb->size - br->consumed;
357 }
358
359 size_t btr_get_buffer_by_reference(struct btr_buffer_reference *br, char **buf)
360 {
361 if (buf)
362 *buf = br->btrb->buf + br->consumed;
363 return br_available_bytes(br);
364 }
365
366 /**
367 * \return zero if the input buffer queue is empty.
368 */
369 size_t btr_next_buffer(struct btr_node *btrn, char **bufp)
370 {
371 struct btr_buffer_reference *br;
372 char *buf, *result = NULL;
373 size_t sz, rv = 0;
374
375 FOR_EACH_BUFFER_REF(br, btrn) {
376 sz = btr_get_buffer_by_reference(br, &buf);
377 if (!result) {
378 result = buf;
379 rv = sz;
380 if (!br->btrb->pool)
381 break;
382 continue;
383 }
384 if (!br->btrb->pool)
385 break;
386 if (result + rv != buf) {
387 PARA_DEBUG_LOG("%s: pool merge impossible: %p != %p\n",
388 btrn->name, result + rv, buf);
389 break;
390 }
391 // PARA_CRIT_LOG("%s: inplace merge (%zu, %zu)->%zu\n", btrn->name,
392 // rv, sz, rv + sz);
393 // PARA_CRIT_LOG("%s: inplace merge %p (%zu)\n", btrn->name,
394 // result, sz);
395 rv += sz;
396 }
397 if (bufp)
398 *bufp = result;
399 return rv;
400 }
401
402 void btr_consume(struct btr_node *btrn, size_t numbytes)
403 {
404 struct btr_buffer_reference *br, *tmp;
405 size_t sz;
406
407 if (numbytes == 0)
408 return;
409 br = get_first_input_br(btrn);
410 assert(br);
411
412 //PARA_CRIT_LOG("wrap count: %zu\n", br->wrap_count);
413 if (br->wrap_count == 0) {
414 /*
415 * No wrap buffer. Drop buffer references whose buffer
416 * has been fully used. */
417 FOR_EACH_BUFFER_REF_SAFE(br, tmp, btrn) {
418 if (br->consumed + numbytes <= br->btrb->size) {
419 br->consumed += numbytes;
420 if (br->consumed == br->btrb->size)
421 btr_drop_buffer_reference(br);
422 return;
423 }
424 numbytes -= br->btrb->size - br->consumed;
425 btr_drop_buffer_reference(br);
426 }
427 assert(true);
428 }
429 /*
430
431 We have a wrap buffer, consume from it. If in total,
432 i.e. including previous calls to brt_consume(), less than
433 wrap_count has been consumed, there's nothing more we can do.
434
435 Otherwise we drop the wrap buffer and consume from subsequent
436 buffers of the input queue the correct amount of bytes. This
437 is the total number of bytes that have been consumed from the
438 wrap buffer.
439 */
440 PARA_DEBUG_LOG("consuming %zu/%zu bytes from wrap buffer\n", numbytes,
441 br_available_bytes(br));
442
443 assert(numbytes <= br_available_bytes(br));
444 if (br->consumed + numbytes < br->wrap_count) {
445 br->consumed += numbytes;
446 return;
447 }
448 PARA_DEBUG_LOG("dropping wrap buffer (%zu bytes)\n", br->btrb->size);
449 /* get rid of the wrap buffer */
450 sz = br->consumed + numbytes;
451 btr_drop_buffer_reference(br);
452 return btr_consume(btrn, sz);
453 }
454
455 static void flush_input_queue(struct btr_node *btrn)
456 {
457 struct btr_buffer_reference *br, *tmp;
458 FOR_EACH_BUFFER_REF_SAFE(br, tmp, btrn)
459 btr_drop_buffer_reference(br);
460 }
461
462 void btr_free_node(struct btr_node *btrn)
463 {
464 if (!btrn)
465 return;
466 free(btrn->name);
467 free(btrn);
468 }
469
470 void btr_remove_node(struct btr_node *btrn)
471 {
472 struct btr_node *ch;
473
474 if (!btrn)
475 return;
476 PARA_NOTICE_LOG("removing btr node %s from buffer tree\n", btrn->name);
477 FOR_EACH_CHILD(ch, btrn)
478 ch->parent = NULL;
479 flush_input_queue(btrn);
480 if (btrn->parent)
481 list_del(&btrn->node);
482 }
483
484 size_t btr_get_input_queue_size(struct btr_node *btrn)
485 {
486 struct btr_buffer_reference *br;
487 size_t size = 0;
488
489 FOR_EACH_BUFFER_REF(br, btrn) {
490 //PARA_CRIT_LOG("size: %zu\n", size);
491 size += br_available_bytes(br);
492 }
493 return size;
494 }
495
496 void btr_splice_out_node(struct btr_node *btrn)
497 {
498 struct btr_node *ch, *tmp;
499
500 assert(btrn);
501 PARA_NOTICE_LOG("splicing out %s\n", btrn->name);
502 btr_pushdown(btrn);
503 if (btrn->parent)
504 list_del(&btrn->node);
505 FOR_EACH_CHILD_SAFE(ch, tmp, btrn) {
506 PARA_INFO_LOG("parent(%s): %s\n", ch->name,
507 btrn->parent? btrn->parent->name : "NULL");
508 ch->parent = btrn->parent;
509 if (btrn->parent)
510 list_move(&ch->node, &btrn->parent->children);
511 }
512 assert(list_empty(&btrn->children));
513 }
514
515 /**
516 * Return the size of the largest input queue.
517 *
518 * Iterates over all children of the given node.
519 */
520 size_t btr_bytes_pending(struct btr_node *btrn)
521 {
522 size_t max_size = 0;
523 struct btr_node *ch;
524
525 FOR_EACH_CHILD(ch, btrn) {
526 size_t size = btr_get_input_queue_size(ch);
527 max_size = PARA_MAX(max_size, size);
528 }
529 return max_size;
530 }
531
532 int btr_exec(struct btr_node *btrn, const char *command, char **value_result)
533 {
534 if (!btrn)
535 return -ERRNO_TO_PARA_ERROR(EINVAL);
536 if (!btrn->execute)
537 return -ERRNO_TO_PARA_ERROR(ENOTSUP);
538 return btrn->execute(btrn, command, value_result);
539 }
540
541 int btr_exec_up(struct btr_node *btrn, const char *command, char **value_result)
542 {
543 int ret;
544
545 for (; btrn; btrn = btrn->parent) {
546 struct btr_node *parent = btrn->parent;
547 if (!parent)
548 return -ERRNO_TO_PARA_ERROR(ENOTSUP);
549 if (!parent->execute)
550 continue;
551 PARA_INFO_LOG("parent: %s, cmd: %s\n", parent->name, command);
552 ret = parent->execute(parent, command, value_result);
553 if (ret == -ERRNO_TO_PARA_ERROR(ENOTSUP))
554 continue;
555 if (ret < 0)
556 return ret;
557 if (value_result && *value_result)
558 PARA_NOTICE_LOG("%s(%s): %s\n", command, parent->name,
559 *value_result);
560 return 1;
561 }
562 return -ERRNO_TO_PARA_ERROR(ENOTSUP);
563 }
564
565 void *btr_context(struct btr_node *btrn)
566 {
567 return btrn->context;
568 }
569
570 static bool need_buffer_pool_merge(struct btr_node *btrn)
571 {
572 struct btr_buffer_reference *br = get_first_input_br(btrn);
573
574 if (!br)
575 return false;
576 if (br->wrap_count != 0)
577 return true;
578 if (br->btrb->pool)
579 return true;
580 return false;
581 }
582
583 static void merge_input_pool(struct btr_node *btrn, size_t dest_size)
584 {
585 struct btr_buffer_reference *br, *wbr;
586 int num_refs; /* including wrap buffer */
587 char *buf, *buf1, *buf2 = NULL;
588 size_t sz, sz1, sz2 = 0, wsz;
589
590 if (list_empty(&btrn->input_queue))
591 return;
592
593 num_refs = 0;
594 FOR_EACH_BUFFER_REF(br, btrn) {
595 num_refs++;
596 sz = btr_get_buffer_by_reference(br, &buf);
597 if (br->wrap_count != 0) {
598 assert(!wbr);
599 assert(num_refs == 1);
600 wbr = br;
601 if (sz >= dest_size)
602 return;
603 continue;
604 }
605 if (!buf1) {
606 buf1 = buf;
607 sz1 = sz;
608 goto next;
609 }
610 if (buf1 + sz1 == buf) {
611 sz1 += sz;
612 goto next;
613 }
614 if (!buf2) {
615 buf2 = buf;
616 sz2 = sz;
617 goto next;
618 }
619 assert(buf2 + sz2 == buf);
620 sz2 += sz;
621 next:
622 if (sz1 + sz2 >= dest_size)
623 break;
624 }
625 if (!wbr) {
626 assert(buf1);
627 if (!buf2) /* nothing to do */
628 return;
629 /* make a new wrap buffer combining buf1 and buf 2. */
630 sz = sz1 + sz2;
631 buf = para_malloc(sz);
632 PARA_DEBUG_LOG("merging input buffers: (%p:%zu, %p:%zu) -> %p:%zu\n",
633 buf1, sz1, buf2, sz2, buf, sz);
634 memcpy(buf, buf1, sz1);
635 memcpy(buf + sz1, buf2, sz2);
636 br = para_calloc(sizeof(*br));
637 br->btrb = new_btrb(buf, sz);
638 br->btrb->refcount = 1;
639 br->consumed = 0;
640 /* This is a wrap buffer */
641 br->wrap_count = sz1;
642 para_list_add(&br->node, &btrn->input_queue);
643 return;
644 }
645 /*
646 * We already have a wrap buffer, but it is too small. It might be
647 * partially used.
648 */
649 wsz = br_available_bytes(wbr);
650 if (wbr->wrap_count == sz1 && wbr->btrb->size >= sz1 + sz2) /* nothing we can do about it */
651 return;
652 assert(buf1 && buf2);
653 sz = sz1 + sz2 - wbr->btrb->size; /* amount of new data */
654 wbr->btrb->size += sz;
655 PARA_DEBUG_LOG("increasing wrap buffer to %zu\n", wbr->btrb->size);
656 wbr->btrb->buf = para_realloc(wbr->btrb->buf, wbr->btrb->size);
657 /* copy the new data to the end of the reallocated buffer */
658 assert(sz2 >= sz);
659 memcpy(wbr->btrb->buf + wbr->btrb->size - sz, buf2 + sz2 - sz, sz);
660 }
661
662 /**
663 * Merge the first two input buffers into one.
664 *
665 * This is a quite expensive operation.
666 *
667 * \return The number of buffers that have been available (zero, one or two).
668 */
669 static int merge_input(struct btr_node *btrn)
670 {
671 struct btr_buffer_reference *brs[2], *br;
672 char *bufs[2], *buf;
673 size_t szs[2], sz;
674 int i;
675
676 if (list_empty(&btrn->input_queue))
677 return 0;
678 if (list_is_singular(&btrn->input_queue))
679 return 1;
680 i = 0;
681 /* get references to the first two buffers */
682 FOR_EACH_BUFFER_REF(br, btrn) {
683 brs[i] = br;
684 szs[i] = btr_get_buffer_by_reference(brs[i], bufs + i);
685 i++;
686 if (i == 2)
687 break;
688 }
689 /* make a new btrb that combines the two buffers and a br to it. */
690 sz = szs[0] + szs[1];
691 buf = para_malloc(sz);
692 PARA_DEBUG_LOG("%s: memory merging input buffers: (%zu, %zu) -> %zu\n",
693 btrn->name, szs[0], szs[1], sz);
694 memcpy(buf, bufs[0], szs[0]);
695 memcpy(buf + szs[0], bufs[1], szs[1]);
696
697 br = para_calloc(sizeof(*br));
698 br->btrb = new_btrb(buf, sz);
699 br->btrb->refcount = 1;
700
701 /* replace the first two refs by the new one */
702 btr_drop_buffer_reference(brs[0]);
703 btr_drop_buffer_reference(brs[1]);
704 para_list_add(&br->node, &btrn->input_queue);
705 return 2;
706 }
707
/**
 * Try to make a given number of contiguous input bytes available.
 *
 * \param btrn The node whose input queue is merged.
 * \param dest_size The desired size of the first input chunk.
 *
 * If the input queue starts with a wrap buffer or a pool buffer, the pool
 * merge strategy is used. Otherwise the first two buffers of the queue are
 * merged repeatedly until the first chunk is large enough or fewer than two
 * buffers are left.
 */
void btr_merge(struct btr_node *btrn, size_t dest_size)
{
	if (need_buffer_pool_merge(btrn)) {
		/* A void function must not return an expression in ISO C. */
		merge_input_pool(btrn, dest_size);
		return;
	}
	for (;;) {
		char *buf;
		size_t len = btr_next_buffer(btrn, &buf);

		if (len >= dest_size)
			return;
		PARA_DEBUG_LOG("input size = %zu < %zu = dest\n", len, dest_size);
		if (merge_input(btrn) < 2)
			return;
	}
}
722
/**
 * Check whether a node has reached the end of its input.
 *
 * \param btrn The node to check.
 *
 * \return True if no input is pending and the node has no parent which
 * could provide more.
 */
bool btr_eof(struct btr_node *btrn)
{
	size_t pending = btr_next_buffer(btrn, NULL);

	if (pending != 0)
		return false;
	return btr_no_parent(btrn);
}
730
731 void log_tree_recursively(struct btr_node *btrn, int loglevel, int depth)
732 {
733 struct btr_node *ch;
734 const char spaces[] = " ", *space = spaces + 16 - depth;
735
736 if (depth > 16)
737 return;
738 para_log(loglevel, "%s%s\n", space, btrn->name);
739 FOR_EACH_CHILD(ch, btrn)
740 log_tree_recursively(ch, loglevel, depth + 1);
741 }
742
/**
 * Log the buffer tree rooted at the given node.
 *
 * \param btrn The root of the (sub-)tree to log.
 * \param loglevel The log level to use for the output.
 */
void btr_log_tree(struct btr_node *btrn, int loglevel)
{
	/* Plain call: a void function must not return an expression in ISO C. */
	log_tree_recursively(btrn, loglevel, 0);
}
747
748 /*
749 * \return \a root if \a name is \p NULL.
750 */
751 struct btr_node *btr_search_node(const char *name, struct btr_node *root)
752 {
753 struct btr_node *ch;
754
755 if (!name)
756 return root;
757 if (!strcmp(root->name, name))
758 return root;
759 FOR_EACH_CHILD(ch, root) {
760 struct btr_node *result = btr_search_node(name, ch);
761 if (result)
762 return result;
763 }
764 return NULL;
765 }
766
767 /** 640K ought to be enough for everybody ;) */
/*
 * Limit, in bytes, for the largest input queue among the children of a node:
 * btr_node_status() returns zero for a non-leaf node when btr_bytes_pending()
 * exceeds this value.
 */
768 #define BTRN_MAX_PENDING (640 * 1024)
769
770 int btr_node_status(struct btr_node *btrn, size_t min_iqs,
771 enum btr_node_type type)
772 {
773 size_t iqs;
774
775 if (!btrn)
776 return 0;
777 if (type != BTR_NT_LEAF) {
778 if (btr_no_children(btrn))
779 return -E_BTR_NO_CHILD;
780 if (btr_bytes_pending(btrn) > BTRN_MAX_PENDING)
781 return 0;
782 }
783 if (type != BTR_NT_ROOT) {
784 if (btr_eof(btrn))
785 return -E_BTR_EOF;
786 iqs = btr_get_input_queue_size(btrn);
787 if (iqs == 0) /* we have a parent, because not eof */
788 return 0;
789 if (iqs < min_iqs && !btr_no_parent(btrn))
790 return 0;
791 }
792 return 1;
793 }
794
795 void btr_get_node_start(struct btr_node *btrn, struct timeval *tv)
796 {
797 *tv = btrn->start;
798 }