40079c2ff9984f5207ad5f1cb0aeaaf78bf9dedc
[paraslash.git] / buffer_tree.c
1 #include <regex.h>
2 #include <stdbool.h>
3
4 #include "para.h"
5 #include "list.h"
6 #include "string.h"
7 #include "buffer_tree.h"
8 #include "error.h"
9 #include "sched.h"
10
/*
 * A circular buffer area used for zero-copy buffer management.
 *
 * Allocations advance the write head, deallocations advance the read
 * head; both wrap around at area_end. whead == NULL marks the
 * "area full" state (see the comment below btr_pool_new()).
 */
struct btr_pool {
	/* The name of the pool, used in log messages. */
	char *name;
	/* Start of the contiguous pool memory area. */
	char *area_start;
	/* One past the last byte of the pool area. */
	char *area_end;
	/* Read head: start of the allocated (filled) region. */
	char *rhead;
	/* Write head: next allocation position, or NULL if the pool is full. */
	char *whead;
};
18
/* Flags that affect how a btr buffer is deallocated. */
enum btr_buffer_flags {
	/* changes the way the buffer is deallocated */
	BTR_BF_BTR_POOL = 1,
};
23
/*
 * A reference-counted buffer, either malloc()ed or carved out of a
 * btr pool (see dealloc_buffer()).
 */
struct btr_buffer {
	/* Pointer to the buffer contents. */
	char *buf;
	/* Number of bytes of \a buf. */
	size_t size;
	/** The number of references to this buffer. */
	int refcount;
	/* NULL if the buffer was malloc()ed, else the owning pool. */
	struct btr_pool *pool;
};
31
/* One node's reference to a (possibly shared) btr buffer. */
struct btr_buffer_reference {
	/* The buffer this reference points to. */
	struct btr_buffer *btrb;
	/* Number of bytes of the buffer already consumed by the owning node. */
	size_t consumed;
	/* Each buffer reference belongs to the buffer queue list of some buffer tree node. */
	struct list_head node;
	/*
	 * Nonzero iff this is a wrap buffer created by merge_input_pool();
	 * in that case it holds the size of the first merged chunk.
	 */
	size_t wrap_count;
};
39
/* A node of the buffer tree. */
struct btr_node {
	/* The name of the node, used for searching and logging. */
	char *name;
	/* The parent node, or NULL if this is a root node. */
	struct btr_node *parent;
	/* The position of this btr node in the buffer tree. */
	struct list_head node;
	/* The children nodes of this btr node are linked together in a list. */
	struct list_head children;
	/* Time of first data transfer. */
	struct timeval start;
	/**
	 * The input queue is a list of references to btr buffers. Each item on
	 * the list represents an input buffer which has not been completely
	 * used by this btr node.
	 */
	struct list_head input_queue;
	/* Handler invoked by btr_exec()/btr_exec_up(), may be NULL. */
	btr_command_handler execute;
	/* Opaque pointer for the node owner, returned by btr_context(). */
	void *context;
};
58
59 struct btr_pool *btr_pool_new(const char *name, size_t area_size)
60 {
61 struct btr_pool *btrp;
62
63 PARA_INFO_LOG("%s, %zu bytes\n", name, area_size);
64 btrp = para_malloc(sizeof(*btrp));
65 btrp->area_start = para_malloc(area_size);
66 btrp->area_end = btrp->area_start + area_size;
67 btrp->rhead = btrp->area_start;
68 btrp->whead = btrp->area_start;
69 btrp->name = para_strdup(name);
70 return btrp;
71 }
72
73 /* whead = NULL means area full */
74
75 void btr_pool_free(struct btr_pool *btrp)
76 {
77 if (!btrp)
78 return;
79 free(btrp->area_start);
80 free(btrp->name);
81 free(btrp);
82 }
83
84 size_t btr_pool_size(struct btr_pool *btrp)
85 {
86 return btrp->area_end - btrp->area_start;
87 }
88
89 size_t btr_pool_filled(struct btr_pool *btrp)
90 {
91 if (!btrp->whead)
92 return btr_pool_size(btrp);
93 if (btrp->rhead <= btrp->whead)
94 return btrp->whead - btrp->rhead;
95 return btr_pool_size(btrp) - (btrp->rhead - btrp->whead);
96 }
97
/* Return the number of unallocated bytes of the pool. */
size_t btr_pool_unused(struct btr_pool *btrp)
{
	size_t filled = btr_pool_filled(btrp);

	return btr_pool_size(btrp) - filled;
}
102
103 size_t btr_pool_available(struct btr_pool *btrp)
104 {
105 if (!btrp->whead)
106 return 0;
107 if (btrp->rhead <= btrp->whead)
108 return btrp->area_end - btrp->whead;
109 return btrp->rhead - btrp->whead;
110 }
111
112 size_t btr_pool_get_buffer(struct btr_pool *btrp, char **result)
113 {
114 if (result)
115 *result = btrp->whead;
116 return btr_pool_available(btrp);
117 }
118
119 void btr_pool_allocate(struct btr_pool *btrp, size_t size)
120 {
121 char *end;
122
123 if (size == 0)
124 return;
125 //PARA_CRIT_LOG("filled: %zu, alloc %zu\n", btr_pool_filled(btrp), size);
126 assert(size <= btr_pool_available(btrp));
127 end = btrp->whead + size;
128 assert(end <= btrp->area_end);
129
130 if (end == btrp->area_end) {
131 PARA_DEBUG_LOG("%s: end of pool area reached\n", btrp->name);
132 end = btrp->area_start;
133 }
134 if (end == btrp->rhead) {
135 PARA_DEBUG_LOG("btrp buffer full\n");
136 end = NULL; /* buffer full */
137 }
138 btrp->whead = end;
139 //PARA_CRIT_LOG("filled: %zu\n", btr_pool_filled(btrp));
140 }
141
142 static void btr_pool_deallocate(struct btr_pool *btrp, size_t size)
143 {
144 char *end = btrp->rhead + size;
145
146 //PARA_CRIT_LOG("filled: %zu, dealloc %zu\n", btr_pool_filled(btrp), size);
147 if (size == 0)
148 return;
149 assert(end <= btrp->area_end);
150 assert(size <= btr_pool_filled(btrp));
151 if (end == btrp->area_end)
152 end = btrp->area_start;
153 if (!btrp->whead)
154 btrp->whead = btrp->rhead;
155 btrp->rhead = end;
156 if (btrp->rhead == btrp->whead)
157 btrp->rhead = btrp->whead = btrp->area_start;
158 //PARA_CRIT_LOG("filled: %zu\n", btr_pool_filled(btrp));
159 }
160
/* Iterate over the children of the given buffer tree node. */
#define FOR_EACH_CHILD(_tn, _btrn) list_for_each_entry((_tn), \
	&((_btrn)->children), node)
/* Like FOR_EACH_CHILD(), but safe against removal of the current child. */
#define FOR_EACH_CHILD_SAFE(_tn, _tmp, _btrn) \
	list_for_each_entry_safe((_tn), (_tmp), &((_btrn)->children), node)

/* Iterate over the buffer references of the input queue of a node. */
#define FOR_EACH_BUFFER_REF(_br, _btrn) \
	list_for_each_entry((_br), &(_btrn)->input_queue, node)
/* Like FOR_EACH_BUFFER_REF(), but safe against removal of the current entry. */
#define FOR_EACH_BUFFER_REF_SAFE(_br, _tmp, _btrn) \
	list_for_each_entry_safe((_br), (_tmp), &(_btrn)->input_queue, node)
170
171 struct btr_node *btr_new_node(const char *name, struct btr_node *parent,
172 btr_command_handler handler, void *context)
173 {
174 struct btr_node *btrn = para_malloc(sizeof(*btrn));
175
176 btrn->name = para_strdup(name);
177 btrn->parent = parent;
178 btrn->execute = handler;
179 btrn->context = context;
180 btrn->start.tv_sec = 0;
181 btrn->start.tv_usec = 0;
182 if (parent)
183 list_add_tail(&btrn->node, &parent->children);
184 INIT_LIST_HEAD(&btrn->children);
185 INIT_LIST_HEAD(&btrn->input_queue);
186 if (parent)
187 PARA_INFO_LOG("added %s as child of %s\n", name, parent->name);
188 else
189 PARA_INFO_LOG("added %s as btr root\n", name);
190 return btrn;
191 }
192
193 /*
194 * Allocate a new btr buffer.
195 *
196 * The freshly allocated buffer will have a zero refcount and will
197 * not be associated with a btr pool.
198 */
199 static struct btr_buffer *new_btrb(char *buf, size_t size)
200 {
201 struct btr_buffer *btrb = para_calloc(sizeof(*btrb));
202
203 btrb->buf = buf;
204 btrb->size = size;
205 return btrb;
206 }
207
208 static void dealloc_buffer(struct btr_buffer *btrb)
209 {
210 if (btrb->pool)
211 btr_pool_deallocate(btrb->pool, btrb->size);
212 else
213 free(btrb->buf);
214 }
215
216 static struct btr_buffer_reference *get_first_input_br(struct btr_node *btrn)
217 {
218 if (list_empty(&btrn->input_queue))
219 return NULL;
220 return list_first_entry(&btrn->input_queue,
221 struct btr_buffer_reference, node);
222 }
223
224 /*
225 * Deallocate the reference, release the resources if refcount drops to zero.
226 */
227 static void btr_drop_buffer_reference(struct btr_buffer_reference *br)
228 {
229 struct btr_buffer *btrb = br->btrb;
230
231 //PARA_CRIT_LOG("dropping buffer reference %p\n", br);
232 list_del(&br->node);
233 free(br);
234 btrb->refcount--;
235 if (btrb->refcount == 0) {
236 dealloc_buffer(btrb);
237 free(btrb);
238 }
239 }
240
241 static void add_btrb_to_children(struct btr_buffer *btrb,
242 struct btr_node *btrn, size_t consumed)
243 {
244 struct btr_node *ch;
245
246 if (btrn->start.tv_sec == 0)
247 btrn->start = *now;
248 FOR_EACH_CHILD(ch, btrn) {
249 struct btr_buffer_reference *br = para_calloc(sizeof(*br));
250 br->btrb = btrb;
251 br->consumed = consumed;
252 list_add_tail(&br->node, &ch->input_queue);
253 btrb->refcount++;
254 if (ch->start.tv_sec == 0)
255 ch->start = *now;
256 }
257 }
258
259 void btr_add_output(char *buf, size_t size, struct btr_node *btrn)
260 {
261 struct btr_buffer *btrb;
262
263 assert(size != 0);
264 if (list_empty(&btrn->children)) {
265 free(buf);
266 return;
267 }
268 btrb = new_btrb(buf, size);
269 add_btrb_to_children(btrb, btrn, 0);
270 }
271
272 void btr_add_output_pool(struct btr_pool *btrp, size_t size,
273 struct btr_node *btrn)
274 {
275 struct btr_buffer *btrb;
276 char *buf;
277 size_t avail;
278
279 assert(size != 0);
280 if (list_empty(&btrn->children))
281 return;
282 avail = btr_pool_get_buffer(btrp, &buf);
283 assert(avail >= size);
284 btr_pool_allocate(btrp, size);
285 btrb = new_btrb(buf, size);
286 btrb->pool = btrp;
287 add_btrb_to_children(btrb, btrn, 0);
288 }
289
/*
 * Copy \a n bytes from \a src into buffers of the given pool and feed
 * them to the children of \a btrn.
 *
 * If the contiguous space at the write head is smaller than \a n, the
 * data is split into two chunks, the second one starting at the
 * beginning of the pool area. The caller must ensure that at least
 * \a n unused bytes exist in the pool.
 */
void btr_copy(const void *src, size_t n, struct btr_pool *btrp,
	struct btr_node *btrn)
{
	char *buf;
	size_t sz, copy;

	if (n == 0)
		return;
	assert(n <= btr_pool_unused(btrp));
	sz = btr_pool_get_buffer(btrp, &buf);
	copy = PARA_MIN(sz, n);
	memcpy(buf, src, copy);
	btr_add_output_pool(btrp, copy, btrn);
	if (copy == n)
		return;
	/* Wrap around: copy the remainder to the start of the pool area. */
	sz = btr_pool_get_buffer(btrp, &buf);
	assert(sz >= n - copy);
	/*
	 * Fixed: cast to const char * before the arithmetic. Pointer
	 * arithmetic on void * is a GNU extension, not ISO C.
	 */
	memcpy(buf, (const char *)src + copy, n - copy);
	btr_add_output_pool(btrp, n - copy, btrn);
}
310
/* Hand the buffer referenced by \a br down to the children of \a btrn. */
static void btr_pushdown_br(struct btr_buffer_reference *br, struct btr_node *btrn)
{
	add_btrb_to_children(br->btrb, btrn, br->consumed);
	btr_drop_buffer_reference(br);
}
316
317 void btr_pushdown(struct btr_node *btrn)
318 {
319 struct btr_buffer_reference *br, *tmp;
320
321 FOR_EACH_BUFFER_REF_SAFE(br, tmp, btrn)
322 btr_pushdown_br(br, btrn);
323 }
324
/*
 * Hand at most one buffer of the input queue down to the children.
 *
 * \return One if a buffer was pushed down, zero if the queue was empty.
 */
int btr_pushdown_one(struct btr_node *btrn)
{
	/* Reuse get_first_input_br() instead of open-coding the queue check. */
	struct btr_buffer_reference *br = get_first_input_br(btrn);

	if (!br)
		return 0;
	btr_pushdown_br(br, btrn);
	return 1;
}
335
/* Return true if this node has no children, i.e. it is a leaf node. */
bool btr_no_children(struct btr_node *btrn)
{
	return list_empty(&btrn->children);
}
341
/* Return true if this node has no parent, i.e. it is a root node. */
bool btr_no_parent(struct btr_node *btrn)
{
	return !btrn->parent;
}
346
347 bool btr_inplace_ok(struct btr_node *btrn)
348 {
349 if (!btrn->parent)
350 return true;
351 return list_is_singular(&btrn->parent->children);
352 }
353
/* Return the number of bytes of the referenced buffer not yet consumed. */
static inline size_t br_available_bytes(struct btr_buffer_reference *br)
{
	return br->btrb->size - br->consumed;
}
358
359 size_t btr_get_buffer_by_reference(struct btr_buffer_reference *br, char **buf)
360 {
361 if (buf)
362 *buf = br->btrb->buf + br->consumed;
363 return br_available_bytes(br);
364 }
365
/**
 * Get a pointer to the next available input data of a node.
 *
 * Adjacent buffers which live in the same buffer pool are merged
 * transparently, so the returned size may span several buffer
 * references.
 *
 * \return zero if the input buffer queue is empty.
 */
size_t btr_next_buffer(struct btr_node *btrn, char **bufp)
{
	struct btr_buffer_reference *br;
	char *buf, *result = NULL;
	size_t sz, rv = 0;

	FOR_EACH_BUFFER_REF(br, btrn) {
		sz = btr_get_buffer_by_reference(br, &buf);
		if (!result) {
			/* First buffer: this is what we will return. */
			result = buf;
			rv = sz;
			if (!br->btrb->pool) /* only pool buffers can merge */
				break;
			continue;
		}
		if (!br->btrb->pool)
			break;
		/* Merge only if this buffer directly follows the previous one. */
		if (result + rv != buf) {
			PARA_DEBUG_LOG("%s: pool merge impossible: %p != %p\n",
				btrn->name, result + rv, buf);
			break;
		}
		rv += sz;
	}
	if (bufp)
		*bufp = result;
	return rv;
}
401
402 void btr_consume(struct btr_node *btrn, size_t numbytes)
403 {
404 struct btr_buffer_reference *br, *tmp;
405 size_t sz;
406
407 if (numbytes == 0)
408 return;
409 br = get_first_input_br(btrn);
410 assert(br);
411
412 //PARA_CRIT_LOG("wrap count: %zu\n", br->wrap_count);
413 if (br->wrap_count == 0) {
414 /*
415 * No wrap buffer. Drop buffer references whose buffer
416 * has been fully used. */
417 FOR_EACH_BUFFER_REF_SAFE(br, tmp, btrn) {
418 if (br->consumed + numbytes <= br->btrb->size) {
419 br->consumed += numbytes;
420 if (br->consumed == br->btrb->size)
421 btr_drop_buffer_reference(br);
422 return;
423 }
424 numbytes -= br->btrb->size - br->consumed;
425 btr_drop_buffer_reference(br);
426 }
427 assert(true);
428 }
429 /*
430
431 We have a wrap buffer, consume from it. If in total,
432 i.e. including previous calls to brt_consume(), less than
433 wrap_count has been consumed, there's nothing more we can do.
434
435 Otherwise we drop the wrap buffer and consume from subsequent
436 buffers of the input queue the correct amount of bytes. This
437 is the total number of bytes that have been consumed from the
438 wrap buffer.
439 */
440 PARA_DEBUG_LOG("consuming %zu/%zu bytes from wrap buffer\n", numbytes,
441 br_available_bytes(br));
442
443 assert(numbytes <= br_available_bytes(br));
444 if (br->consumed + numbytes < br->wrap_count) {
445 br->consumed += numbytes;
446 return;
447 }
448 PARA_DEBUG_LOG("dropping wrap buffer (%zu bytes)\n", br->btrb->size);
449 /* get rid of the wrap buffer */
450 sz = br->consumed + numbytes;
451 btr_drop_buffer_reference(br);
452 return btr_consume(btrn, sz);
453 }
454
455 static void flush_input_queue(struct btr_node *btrn)
456 {
457 struct btr_buffer_reference *br, *tmp;
458 FOR_EACH_BUFFER_REF_SAFE(br, tmp, btrn)
459 btr_drop_buffer_reference(br);
460 }
461
462 void btr_free_node(struct btr_node *btrn)
463 {
464 if (!btrn)
465 return;
466 free(btrn->name);
467 free(btrn);
468 }
469
470 void btr_remove_node(struct btr_node *btrn)
471 {
472 struct btr_node *ch;
473
474 if (!btrn)
475 return;
476 PARA_NOTICE_LOG("removing btr node %s from buffer tree\n", btrn->name);
477 FOR_EACH_CHILD(ch, btrn)
478 ch->parent = NULL;
479 flush_input_queue(btrn);
480 if (btrn->parent)
481 list_del(&btrn->node);
482 }
483
484 size_t btr_get_input_queue_size(struct btr_node *btrn)
485 {
486 struct btr_buffer_reference *br;
487 size_t size = 0;
488
489 FOR_EACH_BUFFER_REF(br, btrn) {
490 //PARA_CRIT_LOG("size: %zu\n", size);
491 size += br_available_bytes(br);
492 }
493 return size;
494 }
495
496 void btr_splice_out_node(struct btr_node *btrn)
497 {
498 struct btr_node *ch, *tmp;
499
500 assert(btrn);
501 PARA_NOTICE_LOG("splicing out %s\n", btrn->name);
502 btr_pushdown(btrn);
503 if (btrn->parent)
504 list_del(&btrn->node);
505 FOR_EACH_CHILD_SAFE(ch, tmp, btrn) {
506 PARA_INFO_LOG("parent(%s): %s\n", ch->name,
507 btrn->parent? btrn->parent->name : "NULL");
508 ch->parent = btrn->parent;
509 if (btrn->parent)
510 list_move(&ch->node, &btrn->parent->children);
511 }
512 assert(list_empty(&btrn->children));
513 }
514
515 /**
516 * Return the size of the largest input queue.
517 *
518 * Iterates over all children of the given node.
519 */
520 size_t btr_bytes_pending(struct btr_node *btrn)
521 {
522 size_t max_size = 0;
523 struct btr_node *ch;
524
525 FOR_EACH_CHILD(ch, btrn) {
526 size_t size = btr_get_input_queue_size(ch);
527 max_size = PARA_MAX(max_size, size);
528 }
529 return max_size;
530 }
531
532 int btr_exec(struct btr_node *btrn, const char *command, char **value_result)
533 {
534 if (!btrn)
535 return -ERRNO_TO_PARA_ERROR(EINVAL);
536 if (!btrn->execute)
537 return -ERRNO_TO_PARA_ERROR(ENOTSUP);
538 return btrn->execute(btrn, command, value_result);
539 }
540
/*
 * Ask ancestor nodes to execute \a command.
 *
 * Walks up the tree towards the root, invoking the command handler of
 * each ancestor that has one. Ancestors without a handler, and
 * handlers that return -ENOTSUP, are skipped.
 *
 * \return One if some ancestor handled the command (its value, if any,
 * is stored in \a value_result), -ENOTSUP if none did, or the
 * (negative) error code of the first handler that failed.
 */
int btr_exec_up(struct btr_node *btrn, const char *command, char **value_result)
{
	int ret;

	for (; btrn; btrn = btrn->parent) {
		struct btr_node *parent = btrn->parent;
		if (!parent)
			return -ERRNO_TO_PARA_ERROR(ENOTSUP);
		if (!parent->execute)
			continue;
		PARA_INFO_LOG("parent: %s, cmd: %s\n", parent->name, command);
		ret = parent->execute(parent, command, value_result);
		if (ret == -ERRNO_TO_PARA_ERROR(ENOTSUP))
			continue; /* try the next ancestor */
		if (ret < 0)
			return ret;
		if (value_result && *value_result)
			PARA_NOTICE_LOG("%s(%s): %s\n", command, parent->name,
				*value_result);
		return 1;
	}
	return -ERRNO_TO_PARA_ERROR(ENOTSUP);
}
564
/* Return the context pointer that was passed to btr_new_node(). */
void *btr_context(struct btr_node *btrn)
{
	return btrn->context;
}
569
570 static bool need_buffer_pool_merge(struct btr_node *btrn)
571 {
572 struct btr_buffer_reference *br = get_first_input_br(btrn);
573
574 if (!br)
575 return false;
576 if (br->wrap_count != 0)
577 return true;
578 if (br->btrb->pool)
579 return true;
580 return false;
581 }
582
/*
 * Merge pool buffers of the input queue until at least \a dest_size
 * contiguous bytes are available.
 *
 * Since pool buffers cannot be merged across the wrap-around point of
 * the pool area, a "wrap buffer" is malloc()ed which holds copies of
 * the two non-adjacent chunks. An existing wrap buffer is enlarged if
 * it is too small.
 */
static void merge_input_pool(struct btr_node *btrn, size_t dest_size)
{
	struct btr_buffer_reference *br, *wbr = NULL;
	int num_refs; /* including wrap buffer */
	char *buf, *buf1 = NULL, *buf2 = NULL;
	size_t sz, sz1 = 0, sz2 = 0, wsz;

	br = get_first_input_br(btrn);
	if (!br || br_available_bytes(br) >= dest_size)
		return; /* already enough contiguous data */
	num_refs = 0;
	/*
	 * Scan the queue, accumulating two contiguous regions: buf1/sz1
	 * (possibly preceded by a wrap buffer) and buf2/sz2.
	 */
	FOR_EACH_BUFFER_REF(br, btrn) {
		num_refs++;
		sz = btr_get_buffer_by_reference(br, &buf);
		if (sz == 0)
			break;
		if (br->wrap_count != 0) {
			/* Only the first reference may be a wrap buffer. */
			assert(!wbr);
			assert(num_refs == 1);
			wbr = br;
			if (sz >= dest_size)
				return;
			continue;
		}
		if (!buf1) {
			buf1 = buf;
			sz1 = sz;
			goto next;
		}
		if (buf1 + sz1 == buf) {
			/* adjacent to the first region, extend it */
			sz1 += sz;
			goto next;
		}
		if (!buf2) {
			buf2 = buf;
			sz2 = sz;
			goto next;
		}
		assert(buf2 + sz2 == buf);
		sz2 += sz;
next:
		if (sz1 + sz2 >= dest_size)
			break;
	}
	if (!buf2) /* nothing to do */
		return;
	assert(buf1 && sz2 > 0);
	/*
	 * If the second buffer is large, we only take the first part of it to
	 * avoid having to memcpy() huge buffers.
	 */
	sz2 = PARA_MIN(sz2, (size_t)(64 * 1024));
	if (!wbr) {
		/* Make a new wrap buffer combining buf1 and buf2. */
		sz = sz1 + sz2;
		buf = para_malloc(sz);
		PARA_DEBUG_LOG("merging input buffers: (%p:%zu, %p:%zu) -> %p:%zu\n",
			buf1, sz1, buf2, sz2, buf, sz);
		memcpy(buf, buf1, sz1);
		memcpy(buf + sz1, buf2, sz2);
		br = para_calloc(sizeof(*br));
		br->btrb = new_btrb(buf, sz);
		br->btrb->refcount = 1;
		br->consumed = 0;
		/* This is a wrap buffer */
		br->wrap_count = sz1;
		para_list_add(&br->node, &btrn->input_queue);
		return;
	}
	PARA_DEBUG_LOG("increasing wrap buffer, sz1: %zu, sz2: %zu\n", sz1, sz2);
	/*
	 * We already have a wrap buffer, but it is too small. It might be
	 * partially used.
	 */
	/* NOTE(review): wsz is computed but never read below — confirm intent. */
	wsz = br_available_bytes(wbr);
	if (wbr->wrap_count == sz1 && wbr->btrb->size >= sz1 + sz2) /* nothing we can do about it */
		return;
	sz = sz1 + sz2 - wbr->btrb->size; /* amount of new data */
	wbr->btrb->size += sz;
	wbr->btrb->buf = para_realloc(wbr->btrb->buf, wbr->btrb->size);
	/* copy the new data to the end of the reallocated buffer */
	assert(sz2 >= sz);
	memcpy(wbr->btrb->buf + wbr->btrb->size - sz, buf2 + sz2 - sz, sz);
}
667
/**
 * Merge the first two input buffers into one.
 *
 * This is a quite expensive operation: it malloc()s a buffer large
 * enough for both and copies the unconsumed contents over.
 *
 * \return The number of buffers that have been available (zero, one or two).
 */
static int merge_input(struct btr_node *btrn)
{
	struct btr_buffer_reference *brs[2], *br;
	char *bufs[2], *buf;
	size_t szs[2], sz;
	int i;

	if (list_empty(&btrn->input_queue))
		return 0;
	if (list_is_singular(&btrn->input_queue))
		return 1;
	i = 0;
	/* get references to the first two buffers */
	FOR_EACH_BUFFER_REF(br, btrn) {
		brs[i] = br;
		szs[i] = btr_get_buffer_by_reference(brs[i], bufs + i);
		i++;
		if (i == 2)
			break;
	}
	/* make a new btrb that combines the two buffers and a br to it. */
	sz = szs[0] + szs[1];
	buf = para_malloc(sz);
	PARA_DEBUG_LOG("%s: memory merging input buffers: (%zu, %zu) -> %zu\n",
		btrn->name, szs[0], szs[1], sz);
	memcpy(buf, bufs[0], szs[0]);
	memcpy(buf + szs[0], bufs[1], szs[1]);

	br = para_calloc(sizeof(*br));
	br->btrb = new_btrb(buf, sz);
	br->btrb->refcount = 1;

	/* replace the first two refs by the new one */
	btr_drop_buffer_reference(brs[0]);
	btr_drop_buffer_reference(brs[1]);
	para_list_add(&br->node, &btrn->input_queue);
	return 2;
}
713
/*
 * Merge the input queue of \a btrn until at least \a dest_size
 * contiguous bytes are available, or no further merging is possible.
 */
void btr_merge(struct btr_node *btrn, size_t dest_size)
{
	if (need_buffer_pool_merge(btrn)) {
		/*
		 * Fixed: ISO C forbids "return expr;" in a void function,
		 * even if expr has type void. Call, then return.
		 */
		merge_input_pool(btrn, dest_size);
		return;
	}
	for (;;) {
		char *buf;
		size_t len = btr_next_buffer(btrn, &buf);

		if (len >= dest_size)
			return;
		PARA_DEBUG_LOG("input size = %zu < %zu = dest\n", len, dest_size);
		if (merge_input(btrn) < 2)
			return; /* fewer than two buffers, cannot merge more */
	}
}
728
/*
 * Return true if end of file was reached: no input data is buffered
 * and there is no parent node to supply more.
 */
bool btr_eof(struct btr_node *btrn)
{
	char *buf;

	return btr_next_buffer(btrn, &buf) == 0 && btr_no_parent(btrn);
}
736
737 void log_tree_recursively(struct btr_node *btrn, int loglevel, int depth)
738 {
739 struct btr_node *ch;
740 const char spaces[] = " ", *space = spaces + 16 - depth;
741
742 if (depth > 16)
743 return;
744 para_log(loglevel, "%s%s\n", space, btrn->name);
745 FOR_EACH_CHILD(ch, btrn)
746 log_tree_recursively(ch, loglevel, depth + 1);
747 }
748
/* Log the whole buffer tree rooted at \a btrn at the given loglevel. */
void btr_log_tree(struct btr_node *btrn, int loglevel)
{
	/*
	 * Fixed: ISO C forbids "return expr;" in a void function, even if
	 * expr has type void.
	 */
	log_tree_recursively(btrn, loglevel, 0);
}
753
754 /*
755 * \return \a root if \a name is \p NULL.
756 */
757 struct btr_node *btr_search_node(const char *name, struct btr_node *root)
758 {
759 struct btr_node *ch;
760
761 if (!name)
762 return root;
763 if (!strcmp(root->name, name))
764 return root;
765 FOR_EACH_CHILD(ch, root) {
766 struct btr_node *result = btr_search_node(name, ch);
767 if (result)
768 return result;
769 }
770 return NULL;
771 }
772
773 /** 640K ought to be enough for everybody ;) */
774 #define BTRN_MAX_PENDING (640 * 1024)
775
/*
 * Decide whether a node can make progress.
 *
 * \param min_iqs Minimal number of queued input bytes required (only
 * checked while a parent can still deliver more data).
 * \param type Whether the node acts as root, leaf or internal node.
 *
 * \return Positive if the node may proceed, zero if it should idle,
 * -E_BTR_NO_CHILD if a non-leaf node has no children, -E_BTR_EOF if a
 * non-root node hit end of file.
 */
int btr_node_status(struct btr_node *btrn, size_t min_iqs,
		enum btr_node_type type)
{
	size_t iqs;

	if (!btrn)
		return 0;
	if (type != BTR_NT_LEAF) {
		/* A non-leaf node needs someone to feed... */
		if (btr_no_children(btrn))
			return -E_BTR_NO_CHILD;
		/* ...and must not flood its children with data. */
		if (btr_bytes_pending(btrn) > BTRN_MAX_PENDING)
			return 0;
	}
	if (type != BTR_NT_ROOT) {
		if (btr_eof(btrn))
			return -E_BTR_EOF;
		iqs = btr_get_input_queue_size(btrn);
		if (iqs == 0) /* we have a parent, because not eof */
			return 0;
		if (iqs < min_iqs && !btr_no_parent(btrn))
			return 0;
	}
	return 1;
}
800
/* Store the time of the node's first data transfer in \a tv. */
void btr_get_node_start(struct btr_node *btrn, struct timeval *tv)
{
	*tv = btrn->start;
}