/** \file buffer_tree.c Buffer tree (btr) support, used e.g. by para_client. */
1 #include <regex.h>
2 #include <stdbool.h>
3
4 #include "para.h"
5 #include "list.h"
6 #include "string.h"
7 #include "buffer_tree.h"
8 #include "error.h"
9 #include "sched.h"
10
/*
 * A circular memory area for zero-copy buffer management.
 *
 * Invariant: whead == NULL means the pool area is completely full
 * (see the comment near btr_pool_free()).
 */
struct btr_pool {
        /* The name of the pool, only used for log messages. */
        char *name;
        /* Start of the allocated pool area. */
        char *area_start;
        /* One byte past the end of the allocated pool area. */
        char *area_end;
        /* Read head: start of the filled part of the area. */
        char *rhead;
        /* Write head: start of the unused part, NULL if the pool is full. */
        char *whead;
};
18
enum btr_buffer_flags {
        /* changes the way the buffer is deallocated */
        /* NOTE(review): this flag is not referenced elsewhere in this file;
         * buffer deallocation is decided via btr_buffer->pool instead. */
        BTR_BF_BTR_POOL = 1,
};
23
/* A chunk of data in the buffer tree, shared between consumers. */
struct btr_buffer {
        /* The data, either malloc'ed or pointing into a btr pool. */
        char *buf;
        /* Number of bytes of \a buf. */
        size_t size;
        /** The number of references to this buffer. */
        int refcount;
        /* The pool the data belongs to, or NULL for malloc'ed buffers. */
        struct btr_pool *pool;
};
31
/* One entry of the input queue of a buffer tree node. */
struct btr_buffer_reference {
        /* The buffer this reference points to. */
        struct btr_buffer *btrb;
        /* Number of bytes of the buffer already used up by this node. */
        size_t consumed;
        /* Each buffer reference belongs to the buffer queue list of some buffer tree node. */
        struct list_head node;
        /* Non-zero for wrap buffers; then it holds the size of the first
         * (wrapped) part of the combined buffer, see merge_input_pool(). */
        size_t wrap_count;
};
39
/* A node of the buffer tree. */
struct btr_node {
        /* The name of the node, used for searching and logging. */
        char *name;
        /* The parent node, NULL for the root. */
        struct btr_node *parent;
        /* The position of this btr node in the buffer tree. */
        struct list_head node;
        /* The children nodes of this btr node are linked together in a list. */
        struct list_head children;
        /* Time of first data transfer. */
        struct timeval start;
        /**
         * The input queue is a list of references to btr buffers. Each item on
         * the list represents an input buffer which has not been completely
         * used by this btr node.
         */
        struct list_head input_queue;
        /* Optional command handler of the node, see btr_exec(). */
        btr_command_handler execute;
        /* Opaque pointer for the node owner, see btr_context(). */
        void *context;
};
58
59 struct btr_pool *btr_pool_new(const char *name, size_t area_size)
60 {
61 struct btr_pool *btrp;
62
63 PARA_INFO_LOG("%s, %zu bytes\n", name, area_size);
64 btrp = para_malloc(sizeof(*btrp));
65 btrp->area_start = para_malloc(area_size);
66 btrp->area_end = btrp->area_start + area_size;
67 btrp->rhead = btrp->area_start;
68 btrp->whead = btrp->area_start;
69 btrp->name = para_strdup(name);
70 return btrp;
71 }
72
/* Invariant: whead == NULL means the pool area is completely full. */
75 void btr_pool_free(struct btr_pool *btrp)
76 {
77 if (!btrp)
78 return;
79 free(btrp->area_start);
80 free(btrp->name);
81 free(btrp);
82 }
83
84 size_t btr_pool_size(struct btr_pool *btrp)
85 {
86 return btrp->area_end - btrp->area_start;
87 }
88
89 size_t btr_pool_filled(struct btr_pool *btrp)
90 {
91 if (!btrp->whead)
92 return btr_pool_size(btrp);
93 if (btrp->rhead <= btrp->whead)
94 return btrp->whead - btrp->rhead;
95 return btr_pool_size(btrp) - (btrp->rhead - btrp->whead);
96 }
97
/* Number of bytes of the pool area not currently in use. */
size_t btr_pool_unused(struct btr_pool *btrp)
{
        size_t total = btr_pool_size(btrp);

        return total - btr_pool_filled(btrp);
}
102
103 /*
104 * Return maximal size available for one read. This is
105 * smaller than the value returned by btr_pool_unused().
106 */
107 size_t btr_pool_available(struct btr_pool *btrp)
108 {
109 if (!btrp->whead)
110 return 0;
111 if (btrp->rhead <= btrp->whead)
112 return btrp->area_end - btrp->whead;
113 return btrp->rhead - btrp->whead;
114 }
115
116 size_t btr_pool_get_buffer(struct btr_pool *btrp, char **result)
117 {
118 if (result)
119 *result = btrp->whead;
120 return btr_pool_available(btrp);
121 }
122
/*
 * Mark the next \a size bytes of the pool area as allocated.
 *
 * The caller must not allocate more than btr_pool_available() bytes.
 * Advances the write head, wrapping around at the end of the area, and
 * sets it to NULL if the pool becomes completely full.
 */
void btr_pool_allocate(struct btr_pool *btrp, size_t size)
{
        char *end;

        if (size == 0)
                return;
        assert(size <= btr_pool_available(btrp));
        end = btrp->whead + size;
        assert(end <= btrp->area_end);

        if (end == btrp->area_end) {
                PARA_DEBUG_LOG("%s: end of pool area reached\n", btrp->name);
                end = btrp->area_start;
        }
        if (end == btrp->rhead) {
                PARA_DEBUG_LOG("%s btrp buffer full\n", btrp->name);
                end = NULL; /* buffer full */
        }
        btrp->whead = end;
}
143
/*
 * Return the first \a size bytes of the filled pool area to the pool.
 *
 * Advances the read head, wrapping around at the end of the area. If the
 * pool was full (whead == NULL), the write head is restored to the old
 * read head first. If the pool becomes empty, both heads are reset to
 * the start of the area.
 */
static void btr_pool_deallocate(struct btr_pool *btrp, size_t size)
{
        char *end = btrp->rhead + size;

        if (size == 0)
                return;
        assert(end <= btrp->area_end);
        assert(size <= btr_pool_filled(btrp));
        if (end == btrp->area_end)
                end = btrp->area_start;
        if (!btrp->whead)
                btrp->whead = btrp->rhead;
        btrp->rhead = end;
        if (btrp->rhead == btrp->whead)
                btrp->rhead = btrp->whead = btrp->area_start;
}
160
/* Iterate over all children of \a _btrn. */
#define FOR_EACH_CHILD(_tn, _btrn) list_for_each_entry((_tn), \
        &((_btrn)->children), node)
/* Like FOR_EACH_CHILD(), but safe against removal of the current child. */
#define FOR_EACH_CHILD_SAFE(_tn, _tmp, _btrn) \
        list_for_each_entry_safe((_tn), (_tmp), &((_btrn)->children), node)

/* Iterate over the input queue of \a _btrn. */
#define FOR_EACH_BUFFER_REF(_br, _btrn) \
        list_for_each_entry((_br), &(_btrn)->input_queue, node)
/* Like FOR_EACH_BUFFER_REF(), but safe against removal of the current entry. */
#define FOR_EACH_BUFFER_REF_SAFE(_br, _tmp, _btrn) \
        list_for_each_entry_safe((_br), (_tmp), &(_btrn)->input_queue, node)
170
171 /*
172 (parent, child):
173 (NULL, NULL): new, isolated node.
174 (p, NULL): new leaf node
175 (NULL, c): new node becomes root, c must be old root
176 (p, c): new internal node, ch must be child of p, not yet implemented
177
178 */
179 struct btr_node *btr_new_node(struct btr_node_description *bnd)
180 {
181 struct btr_node *btrn = para_malloc(sizeof(*btrn));
182
183 btrn->name = para_strdup(bnd->name);
184 btrn->parent = bnd->parent;
185 btrn->execute = bnd->handler;
186 btrn->context = bnd->context;
187 btrn->start.tv_sec = 0;
188 btrn->start.tv_usec = 0;
189 INIT_LIST_HEAD(&btrn->children);
190 INIT_LIST_HEAD(&btrn->input_queue);
191 if (!bnd->child) {
192 if (bnd->parent) {
193 list_add_tail(&btrn->node, &bnd->parent->children);
194 PARA_INFO_LOG("new leaf node: %s (child of %s)\n",
195 bnd->name, bnd->parent->name);
196 } else
197 PARA_INFO_LOG("added %s as btr root\n", bnd->name);
198 goto out;
199 }
200 if (!bnd->parent) {
201 assert(!bnd->child->parent);
202 PARA_INFO_LOG("new root: %s (was %s)\n",
203 bnd->name, bnd->child->name);
204 btrn->parent = NULL;
205 list_add_tail(&bnd->child->node, &btrn->children);
206 /* link it in */
207 bnd->child->parent = btrn;
208 goto out;
209 }
210 PARA_EMERG_LOG("inserting internal nodes not yet supported.\n");
211 exit(EXIT_FAILURE);
212 assert(bnd->child->parent == bnd->parent);
213 out:
214 return btrn;
215 }
216
217 /*
218 * Allocate a new btr buffer.
219 *
220 * The freshly allocated buffer will have a zero refcount and will
221 * not be associated with a btr pool.
222 */
223 static struct btr_buffer *new_btrb(char *buf, size_t size)
224 {
225 struct btr_buffer *btrb = para_calloc(sizeof(*btrb));
226
227 btrb->buf = buf;
228 btrb->size = size;
229 return btrb;
230 }
231
232 static void dealloc_buffer(struct btr_buffer *btrb)
233 {
234 if (btrb->pool)
235 btr_pool_deallocate(btrb->pool, btrb->size);
236 else
237 free(btrb->buf);
238 }
239
240 static struct btr_buffer_reference *get_first_input_br(struct btr_node *btrn)
241 {
242 if (list_empty(&btrn->input_queue))
243 return NULL;
244 return list_first_entry(&btrn->input_queue,
245 struct btr_buffer_reference, node);
246 }
247
248 /*
249 * Deallocate the reference, release the resources if refcount drops to zero.
250 */
251 static void btr_drop_buffer_reference(struct btr_buffer_reference *br)
252 {
253 struct btr_buffer *btrb = br->btrb;
254
255 list_del(&br->node);
256 free(br);
257 btrb->refcount--;
258 if (btrb->refcount == 0) {
259 dealloc_buffer(btrb);
260 free(btrb);
261 }
262 }
263
/*
 * Add a reference to \a btrb to the input queue of each child of \a btrn.
 *
 * The buffer's refcount is incremented once per child. \a consumed is the
 * initial consumed count of each new reference (non-zero when pushing down
 * a partially used buffer). The start time of \a btrn and of every child
 * which has not yet seen data is set from the global "now".
 */
static void add_btrb_to_children(struct btr_buffer *btrb,
                struct btr_node *btrn, size_t consumed)
{
        struct btr_node *ch;

        if (btrn->start.tv_sec == 0)
                btrn->start = *now;
        FOR_EACH_CHILD(ch, btrn) {
                struct btr_buffer_reference *br = para_calloc(sizeof(*br));
                br->btrb = btrb;
                br->consumed = consumed;
                list_add_tail(&br->node, &ch->input_queue);
                btrb->refcount++;
                if (ch->start.tv_sec == 0)
                        ch->start = *now;
        }
}
281
282 void btr_add_output(char *buf, size_t size, struct btr_node *btrn)
283 {
284 struct btr_buffer *btrb;
285
286 assert(size != 0);
287 if (list_empty(&btrn->children)) {
288 free(buf);
289 return;
290 }
291 btrb = new_btrb(buf, size);
292 add_btrb_to_children(btrb, btrn, 0);
293 }
294
295 void btr_add_output_pool(struct btr_pool *btrp, size_t size,
296 struct btr_node *btrn)
297 {
298 struct btr_buffer *btrb;
299 char *buf;
300 size_t avail;
301
302 assert(size != 0);
303 if (list_empty(&btrn->children))
304 return;
305 avail = btr_pool_get_buffer(btrp, &buf);
306 assert(avail >= size);
307 btr_pool_allocate(btrp, size);
308 btrb = new_btrb(buf, size);
309 btrb->pool = btrp;
310 add_btrb_to_children(btrb, btrn, 0);
311 }
312
/*
 * Copy \a n bytes of \a src into the buffer pool \a btrp and feed the
 * copied data to the children of \a btrn.
 *
 * Since the available part of the pool may wrap around the end of the
 * pool area, up to two separate chunks are written and fed downstream.
 * The caller must make sure the pool has at least \a n unused bytes.
 */
void btr_copy(const void *src, size_t n, struct btr_pool *btrp,
                struct btr_node *btrn)
{
        char *buf;
        size_t sz, copy;

        if (n == 0)
                return;
        assert(n <= btr_pool_unused(btrp));
        sz = btr_pool_get_buffer(btrp, &buf);
        copy = PARA_MIN(sz, n);
        memcpy(buf, src, copy);
        btr_add_output_pool(btrp, copy, btrn);
        if (copy == n) /* no wrap, everything fit into one chunk */
                return;
        /* Copy the remainder to the start of the pool area. Note that
         * arithmetic on a void pointer is a GCC extension. */
        sz = btr_pool_get_buffer(btrp, &buf);
        assert(sz >= n - copy);
        memcpy(buf, src + copy, n - copy);
        btr_add_output_pool(btrp, n - copy, btrn);
}
333
/* Hand the buffer referenced by \a br down to the children of \a btrn
 * (preserving the consumed offset) and drop this node's own reference. */
static void btr_pushdown_br(struct btr_buffer_reference *br, struct btr_node *btrn)
{
        add_btrb_to_children(br->btrb, btrn, br->consumed);
        btr_drop_buffer_reference(br);
}
339
340 void btr_pushdown(struct btr_node *btrn)
341 {
342 struct btr_buffer_reference *br, *tmp;
343
344 FOR_EACH_BUFFER_REF_SAFE(br, tmp, btrn)
345 btr_pushdown_br(br, btrn);
346 }
347
/* Push down only the first buffer of the input queue, if any.
 * Returns 1 if a buffer was pushed down, 0 if the queue was empty. */
int btr_pushdown_one(struct btr_node *btrn)
{
        struct btr_buffer_reference *br = get_first_input_br(btrn);

        if (!br)
                return 0;
        btr_pushdown_br(br, btrn);
        return 1;
}
358
359 /* Return true if this node has no children. */
360 bool btr_no_children(struct btr_node *btrn)
361 {
362 return list_empty(&btrn->children);
363 }
364
365 bool btr_no_parent(struct btr_node *btrn)
366 {
367 return !btrn->parent;
368 }
369
370 bool btr_inplace_ok(struct btr_node *btrn)
371 {
372 if (!btrn->parent)
373 return true;
374 return list_is_singular(&btrn->parent->children);
375 }
376
/* Number of bytes of the referenced buffer not yet consumed by the node. */
static inline size_t br_available_bytes(struct btr_buffer_reference *br)
{
        return br->btrb->size - br->consumed;
}
381
382 size_t btr_get_buffer_by_reference(struct btr_buffer_reference *br, char **buf)
383 {
384 if (buf)
385 *buf = br->btrb->buf + br->consumed;
386 return br_available_bytes(br);
387 }
388
/**
 * Return the start and the size of the next contiguous chunk of input data.
 *
 * For pool buffers, references which are physically adjacent in the pool
 * area are coalesced, so the returned size may span several references.
 * For non-pool (malloc'ed) buffers only the first reference counts.
 *
 * \return zero if the input buffer queue is empty.
 */
size_t btr_next_buffer(struct btr_node *btrn, char **bufp)
{
        struct btr_buffer_reference *br;
        char *buf, *result = NULL;
        size_t sz, rv = 0;

        FOR_EACH_BUFFER_REF(br, btrn) {
                sz = btr_get_buffer_by_reference(br, &buf);
                if (!result) {
                        result = buf;
                        rv = sz;
                        if (!br->btrb->pool) /* malloc'ed buffers never merge */
                                break;
                        continue;
                }
                if (!br->btrb->pool)
                        break;
                if (result + rv != buf) /* not adjacent in the pool area */
                        break;
                rv += sz;
        }
        if (bufp)
                *bufp = result;
        return rv;
}
417
418 void btr_consume(struct btr_node *btrn, size_t numbytes)
419 {
420 struct btr_buffer_reference *br, *tmp;
421 size_t sz;
422
423 if (numbytes == 0)
424 return;
425 br = get_first_input_br(btrn);
426 assert(br);
427
428 if (br->wrap_count == 0) {
429 /*
430 * No wrap buffer. Drop buffer references whose buffer
431 * has been fully used. */
432 FOR_EACH_BUFFER_REF_SAFE(br, tmp, btrn) {
433 if (br->consumed + numbytes <= br->btrb->size) {
434 br->consumed += numbytes;
435 if (br->consumed == br->btrb->size)
436 btr_drop_buffer_reference(br);
437 return;
438 }
439 numbytes -= br->btrb->size - br->consumed;
440 btr_drop_buffer_reference(br);
441 }
442 assert(true);
443 }
444 /*
445
446 We have a wrap buffer, consume from it. If in total,
447 i.e. including previous calls to brt_consume(), less than
448 wrap_count has been consumed, there's nothing more we can do.
449
450 Otherwise we drop the wrap buffer and consume from subsequent
451 buffers of the input queue the correct amount of bytes. This
452 is the total number of bytes that have been consumed from the
453 wrap buffer.
454 */
455 PARA_DEBUG_LOG("consuming %zu/%zu bytes from wrap buffer\n", numbytes,
456 br_available_bytes(br));
457
458 assert(numbytes <= br_available_bytes(br));
459 if (br->consumed + numbytes < br->wrap_count) {
460 br->consumed += numbytes;
461 return;
462 }
463 PARA_DEBUG_LOG("dropping wrap buffer (%zu bytes)\n", br->btrb->size);
464 /* get rid of the wrap buffer */
465 sz = br->consumed + numbytes;
466 btr_drop_buffer_reference(br);
467 return btr_consume(btrn, sz);
468 }
469
470 static void flush_input_queue(struct btr_node *btrn)
471 {
472 struct btr_buffer_reference *br, *tmp;
473 FOR_EACH_BUFFER_REF_SAFE(br, tmp, btrn)
474 btr_drop_buffer_reference(br);
475 }
476
477 void btr_free_node(struct btr_node *btrn)
478 {
479 if (!btrn)
480 return;
481 free(btrn->name);
482 free(btrn);
483 }
484
/*
 * Disconnect \a btrn from the buffer tree.
 *
 * All children become parentless (roots of their own subtrees), the input
 * queue is flushed and the node is unlinked from its parent's list of
 * children. The node itself is not freed, see btr_free_node().
 */
void btr_remove_node(struct btr_node *btrn)
{
        struct btr_node *ch;

        if (!btrn)
                return;
        PARA_NOTICE_LOG("removing btr node %s from buffer tree\n", btrn->name);
        FOR_EACH_CHILD(ch, btrn)
                ch->parent = NULL;
        flush_input_queue(btrn);
        if (btrn->parent)
                list_del(&btrn->node);
}
498
/*
 * Return the number of bytes available for reading from the input queue.
 *
 * Bytes already consumed from a wrap buffer are duplicates of bytes
 * contained in the subsequent references (see merge_input_pool()), so
 * that amount is subtracted from the sum.
 */
size_t btr_get_input_queue_size(struct btr_node *btrn)
{
        struct btr_buffer_reference *br;
        size_t size = 0, wrap_consumed = 0;

        FOR_EACH_BUFFER_REF(br, btrn) {
                if (br->wrap_count != 0) {
                        wrap_consumed = br->consumed;
                        continue;
                }
                size += br_available_bytes(br);
        }
        assert(wrap_consumed <= size);
        size -= wrap_consumed;
        return size;
}
515
/*
 * Remove \a btrn from the tree, reconnecting its children to its parent.
 *
 * Pending input is pushed down to the children first. The node itself is
 * not freed.
 *
 * NOTE(review): if \a btrn has no parent, its children are not moved off
 * its children list, so the final assert would fire — presumably callers
 * only splice out non-root nodes or childless roots; verify.
 */
void btr_splice_out_node(struct btr_node *btrn)
{
        struct btr_node *ch, *tmp;

        assert(btrn);
        PARA_NOTICE_LOG("splicing out %s\n", btrn->name);
        btr_pushdown(btrn);
        if (btrn->parent)
                list_del(&btrn->node);
        FOR_EACH_CHILD_SAFE(ch, tmp, btrn) {
                PARA_INFO_LOG("parent(%s): %s\n", ch->name,
                        btrn->parent? btrn->parent->name : "NULL");
                ch->parent = btrn->parent;
                if (btrn->parent)
                        list_move(&ch->node, &btrn->parent->children);
        }
        assert(list_empty(&btrn->children));
}
534
535 /**
536 * Return the size of the largest input queue.
537 *
538 * Iterates over all children of the given node.
539 */
540 size_t btr_bytes_pending(struct btr_node *btrn)
541 {
542 size_t max_size = 0;
543 struct btr_node *ch;
544
545 FOR_EACH_CHILD(ch, btrn) {
546 size_t size = btr_get_input_queue_size(ch);
547 max_size = PARA_MAX(max_size, size);
548 }
549 return max_size;
550 }
551
552 int btr_exec(struct btr_node *btrn, const char *command, char **value_result)
553 {
554 if (!btrn)
555 return -ERRNO_TO_PARA_ERROR(EINVAL);
556 if (!btrn->execute)
557 return -ERRNO_TO_PARA_ERROR(ENOTSUP);
558 return btrn->execute(btrn, command, value_result);
559 }
560
/*
 * Execute a command on the closest ancestor which supports it.
 *
 * Walks up the tree starting at the parent of \a btrn and calls the
 * execute handler of each node which has one, until a handler accepts
 * the command, i.e. does not return -ERRNO_TO_PARA_ERROR(ENOTSUP).
 *
 * \return 1 on success, a negative error code on failure; in particular
 * -ERRNO_TO_PARA_ERROR(ENOTSUP) if no ancestor handled the command.
 */
int btr_exec_up(struct btr_node *btrn, const char *command, char **value_result)
{
        int ret;

        for (; btrn; btrn = btrn->parent) {
                struct btr_node *parent = btrn->parent;
                if (!parent)
                        return -ERRNO_TO_PARA_ERROR(ENOTSUP);
                if (!parent->execute)
                        continue;
                PARA_INFO_LOG("parent: %s, cmd: %s\n", parent->name, command);
                ret = parent->execute(parent, command, value_result);
                if (ret == -ERRNO_TO_PARA_ERROR(ENOTSUP))
                        continue;
                if (ret < 0)
                        return ret;
                if (value_result && *value_result)
                        PARA_NOTICE_LOG("%s(%s): %s\n", command, parent->name,
                                *value_result);
                return 1;
        }
        return -ERRNO_TO_PARA_ERROR(ENOTSUP);
}
584
585 void *btr_context(struct btr_node *btrn)
586 {
587 return btrn->context;
588 }
589
590 static bool need_buffer_pool_merge(struct btr_node *btrn)
591 {
592 struct btr_buffer_reference *br = get_first_input_br(btrn);
593
594 if (!br)
595 return false;
596 if (br->wrap_count != 0)
597 return true;
598 if (br->btrb->pool)
599 return true;
600 return false;
601 }
602
603 static void merge_input_pool(struct btr_node *btrn, size_t dest_size)
604 {
605 struct btr_buffer_reference *br, *wbr = NULL;
606 int num_refs; /* including wrap buffer */
607 char *buf, *buf1 = NULL, *buf2 = NULL;
608 size_t sz, sz1 = 0, sz2 = 0, wsz;
609
610 br = get_first_input_br(btrn);
611 if (!br || br_available_bytes(br) >= dest_size)
612 return;
613 num_refs = 0;
614 FOR_EACH_BUFFER_REF(br, btrn) {
615 num_refs++;
616 sz = btr_get_buffer_by_reference(br, &buf);
617 if (sz == 0)
618 break;
619 if (br->wrap_count != 0) {
620 assert(!wbr);
621 assert(num_refs == 1);
622 wbr = br;
623 if (sz >= dest_size)
624 return;
625 continue;
626 }
627 if (!buf1) {
628 buf1 = buf;
629 sz1 = sz;
630 goto next;
631 }
632 if (buf1 + sz1 == buf) {
633 sz1 += sz;
634 goto next;
635 }
636 if (!buf2) {
637 buf2 = buf;
638 sz2 = sz;
639 goto next;
640 }
641 assert(buf2 + sz2 == buf);
642 sz2 += sz;
643 next:
644 if (sz1 + sz2 >= dest_size)
645 break;
646 }
647 if (!buf2) /* nothing to do */
648 return;
649 assert(buf1 && sz2 > 0);
650 /*
651 * If the second buffer is large, we only take the first part of it to
652 * avoid having to memcpy() huge buffers.
653 */
654 sz2 = PARA_MIN(sz2, (size_t)(64 * 1024));
655 if (!wbr) {
656 /* Make a new wrap buffer combining buf1 and buf2. */
657 sz = sz1 + sz2;
658 buf = para_malloc(sz);
659 PARA_DEBUG_LOG("merging input buffers: (%p:%zu, %p:%zu) -> %p:%zu\n",
660 buf1, sz1, buf2, sz2, buf, sz);
661 memcpy(buf, buf1, sz1);
662 memcpy(buf + sz1, buf2, sz2);
663 br = para_calloc(sizeof(*br));
664 br->btrb = new_btrb(buf, sz);
665 br->btrb->refcount = 1;
666 br->consumed = 0;
667 /* This is a wrap buffer */
668 br->wrap_count = sz1;
669 para_list_add(&br->node, &btrn->input_queue);
670 return;
671 }
672 PARA_DEBUG_LOG("increasing wrap buffer, sz1: %zu, sz2: %zu\n", sz1, sz2);
673 /*
674 * We already have a wrap buffer, but it is too small. It might be
675 * partially used.
676 */
677 wsz = br_available_bytes(wbr);
678 if (wbr->wrap_count == sz1 && wbr->btrb->size >= sz1 + sz2) /* nothing we can do about it */
679 return;
680 sz = sz1 + sz2 - wbr->btrb->size; /* amount of new data */
681 wbr->btrb->size += sz;
682 wbr->btrb->buf = para_realloc(wbr->btrb->buf, wbr->btrb->size);
683 /* copy the new data to the end of the reallocated buffer */
684 assert(sz2 >= sz);
685 memcpy(wbr->btrb->buf + wbr->btrb->size - sz, buf2 + sz2 - sz, sz);
686 }
687
/**
 * Merge the first two input buffers into one.
 *
 * This is a quite expensive operation: the contents of both buffers are
 * copied into a freshly allocated buffer which replaces the two old
 * references at the head of the input queue.
 *
 * \return The number of buffers that have been available (zero, one or two).
 */
static int merge_input(struct btr_node *btrn)
{
        struct btr_buffer_reference *brs[2], *br;
        char *bufs[2], *buf;
        size_t szs[2], sz;
        int i;

        if (list_empty(&btrn->input_queue))
                return 0;
        if (list_is_singular(&btrn->input_queue))
                return 1;
        i = 0;
        /* get references to the first two buffers */
        FOR_EACH_BUFFER_REF(br, btrn) {
                brs[i] = br;
                szs[i] = btr_get_buffer_by_reference(brs[i], bufs + i);
                i++;
                if (i == 2)
                        break;
        }
        /* make a new btrb that combines the two buffers and a br to it. */
        sz = szs[0] + szs[1];
        buf = para_malloc(sz);
        PARA_DEBUG_LOG("%s: memory merging input buffers: (%zu, %zu) -> %zu\n",
                btrn->name, szs[0], szs[1], sz);
        memcpy(buf, bufs[0], szs[0]);
        memcpy(buf + szs[0], bufs[1], szs[1]);

        br = para_calloc(sizeof(*br));
        br->btrb = new_btrb(buf, sz);
        br->btrb->refcount = 1;

        /* replace the first two refs by the new one */
        btr_drop_buffer_reference(brs[0]);
        btr_drop_buffer_reference(brs[1]);
        para_list_add(&br->node, &btrn->input_queue);
        return 2;
}
733
/*
 * Merge the input queue of \a btrn until the next contiguous buffer
 * contains at least \a dest_size bytes, or no further merging is possible.
 */
void btr_merge(struct btr_node *btrn, size_t dest_size)
{
        /*
         * merge_input_pool() returns void; returning a void expression
         * from a void function is an ISO C constraint violation, so call
         * and return separately.
         */
        if (need_buffer_pool_merge(btrn)) {
                merge_input_pool(btrn, dest_size);
                return;
        }
        for (;;) {
                char *buf;
                size_t len = btr_next_buffer(btrn, &buf);
                if (len >= dest_size)
                        return;
                PARA_DEBUG_LOG("input size = %zu < %zu = dest\n", len, dest_size);
                /* Give up when fewer than two buffers could be merged. */
                if (merge_input(btrn) < 2)
                        return;
        }
}
748
/* End of file: no pending input and no parent which could produce more. */
bool btr_eof(struct btr_node *btrn)
{
        char *buf;

        return btr_next_buffer(btrn, &buf) == 0 && btr_no_parent(btrn);
}
756
757 void log_tree_recursively(struct btr_node *btrn, int loglevel, int depth)
758 {
759 struct btr_node *ch;
760 const char spaces[] = " ", *space = spaces + 16 - depth;
761
762 if (depth > 16)
763 return;
764 para_log(loglevel, "%s%s\n", space, btrn->name);
765 FOR_EACH_CHILD(ch, btrn)
766 log_tree_recursively(ch, loglevel, depth + 1);
767 }
768
/* Log the whole tree rooted at \a btrn at the given loglevel. */
void btr_log_tree(struct btr_node *btrn, int loglevel)
{
        /*
         * log_tree_recursively() returns void; "return <void expr>;" in a
         * void function violates an ISO C constraint (C99 6.8.6.4), so
         * simply call it.
         */
        log_tree_recursively(btrn, loglevel, 0);
}
773
774 /*
775 * \return \a root if \a name is \p NULL.
776 */
777 struct btr_node *btr_search_node(const char *name, struct btr_node *root)
778 {
779 struct btr_node *ch;
780
781 if (!name)
782 return root;
783 if (!strcmp(root->name, name))
784 return root;
785 FOR_EACH_CHILD(ch, root) {
786 struct btr_node *result = btr_search_node(name, ch);
787 if (result)
788 return result;
789 }
790 return NULL;
791 }
792
/** 640K ought to be enough for everybody ;) */
#define BTRN_MAX_PENDING (640 * 1024)

/*
 * Decide whether a node is ready to run.
 *
 * \param min_iqs Minimal input queue size required (ignored for roots).
 * \param type Whether \a btrn acts as root, internal node or leaf.
 *
 * \return Positive if the node should run, zero if it should not (no
 * input available yet, or the children have too much data pending),
 * -E_BTR_NO_CHILD if a non-leaf has no children, -E_BTR_EOF if a
 * non-root node hit end of file.
 */
int btr_node_status(struct btr_node *btrn, size_t min_iqs,
                enum btr_node_type type)
{
        size_t iqs;

        assert(btrn);
        if (type != BTR_NT_LEAF) {
                if (btr_no_children(btrn))
                        return -E_BTR_NO_CHILD;
                /* Throttle producers while consumers are backlogged. */
                if (btr_bytes_pending(btrn) > BTRN_MAX_PENDING)
                        return 0;
        }
        if (type != BTR_NT_ROOT) {
                if (btr_eof(btrn))
                        return -E_BTR_EOF;
                iqs = btr_get_input_queue_size(btrn);
                if (iqs == 0) /* we have a parent, because not eof */
                        return 0;
                if (iqs < min_iqs && !btr_no_parent(btrn))
                        return 0;
        }
        return 1;
}
819
820 void btr_get_node_start(struct btr_node *btrn, struct timeval *tv)
821 {
822 *tv = btrn->start;
823 }