[btr] log message cleanups.
[paraslash.git] / buffer_tree.c
1 #include <regex.h>
2 #include <stdbool.h>
3
4 #include "para.h"
5 #include "list.h"
6 #include "string.h"
7 #include "buffer_tree.h"
8 #include "error.h"
9 #include "sched.h"
10
/*
 * A buffer pool: one contiguous, fixed-size memory area managed as a ring.
 * Data between the read head and the write head is in use.
 */
struct btr_pool {
	/** The name of the pool, used in log messages. */
	char *name;
	/** First byte of the memory area. */
	char *area_start;
	/** One past the last byte of the memory area. */
	char *area_end;
	/** Read head: start of the filled part of the area. */
	char *rhead;
	/** Write head: next allocation starts here; NULL means the area is full. */
	char *whead;
};
18
enum btr_buffer_flags {
	/*
	 * Changes the way the buffer is deallocated: return it to its pool
	 * rather than calling free().
	 * NOTE(review): this flag is not referenced in this file; the code
	 * tests btr_buffer->pool instead — verify whether the enum is still
	 * used elsewhere.
	 */
	BTR_BF_BTR_POOL = 1,
};
23
/* One chunk of data flowing through the buffer tree. */
struct btr_buffer {
	char *buf;
	size_t size;
	/** The number of references to this buffer. */
	int refcount;
	/* NULL means buf was malloc()'ed; otherwise buf lives in this pool. */
	struct btr_pool *pool;
};
31
/* A node's view of (part of) a btr buffer. */
struct btr_buffer_reference {
	struct btr_buffer *btrb;
	/* Number of bytes of the buffer already consumed via this reference. */
	size_t consumed;
	/* Each buffer reference belongs to the buffer queue list of some buffer tree node. */
	struct list_head node;
	/* Nonzero marks a wrap buffer; see merge_input_pool(). */
	size_t wrap_count;
};
39
/* One node of the buffer tree. */
struct btr_node {
	char *name;
	struct btr_node *parent;
	/* The position of this btr node in the buffer tree. */
	struct list_head node;
	/* The children nodes of this btr node are linked together in a list. */
	struct list_head children;
	/* Time of first data transfer; tv_sec == 0 means no data seen yet. */
	struct timeval start;
	/**
	 * The input queue is a list of references to btr buffers. Each item on
	 * the list represents an input buffer which has not been completely
	 * used by this btr node.
	 */
	struct list_head input_queue;
	/* Optional command handler, invoked via btr_exec()/btr_exec_up(). */
	btr_command_handler execute;
	/* Opaque pointer owned by the node's creator; see btr_context(). */
	void *context;
};
58
59 struct btr_pool *btr_pool_new(const char *name, size_t area_size)
60 {
61 struct btr_pool *btrp;
62
63 PARA_INFO_LOG("%s, %zu bytes\n", name, area_size);
64 btrp = para_malloc(sizeof(*btrp));
65 btrp->area_start = para_malloc(area_size);
66 btrp->area_end = btrp->area_start + area_size;
67 btrp->rhead = btrp->area_start;
68 btrp->whead = btrp->area_start;
69 btrp->name = para_strdup(name);
70 return btrp;
71 }
72
/* Invariant: whead == NULL means the pool area is completely full. */
74
75 void btr_pool_free(struct btr_pool *btrp)
76 {
77 if (!btrp)
78 return;
79 free(btrp->area_start);
80 free(btrp->name);
81 free(btrp);
82 }
83
84 size_t btr_pool_size(struct btr_pool *btrp)
85 {
86 return btrp->area_end - btrp->area_start;
87 }
88
89 size_t btr_pool_filled(struct btr_pool *btrp)
90 {
91 if (!btrp->whead)
92 return btr_pool_size(btrp);
93 if (btrp->rhead <= btrp->whead)
94 return btrp->whead - btrp->rhead;
95 return btr_pool_size(btrp) - (btrp->rhead - btrp->whead);
96 }
97
/* Return the number of bytes of the pool area not currently in use. */
size_t btr_pool_unused(struct btr_pool *btrp)
{
	size_t total = btr_pool_size(btrp);

	return total - btr_pool_filled(btrp);
}
102
103 /*
104 * Return maximal size available for one read. This is
105 * smaller than the value returned by btr_pool_unused().
106 */
107 size_t btr_pool_available(struct btr_pool *btrp)
108 {
109 if (!btrp->whead)
110 return 0;
111 if (btrp->rhead <= btrp->whead)
112 return btrp->area_end - btrp->whead;
113 return btrp->rhead - btrp->whead;
114 }
115
116 size_t btr_pool_get_buffer(struct btr_pool *btrp, char **result)
117 {
118 if (result)
119 *result = btrp->whead;
120 return btr_pool_available(btrp);
121 }
122
/*
 * Mark the next size bytes of the pool as used, i.e. advance the write head.
 * The caller must have verified via btr_pool_available()/btr_pool_get_buffer()
 * that at least size contiguous bytes are available.
 */
void btr_pool_allocate(struct btr_pool *btrp, size_t size)
{
	char *end;

	if (size == 0)
		return;
	assert(size <= btr_pool_available(btrp));
	end = btrp->whead + size;
	assert(end <= btrp->area_end);

	if (end == btrp->area_end) {
		/* wrap the write head around to the start of the area */
		PARA_DEBUG_LOG("%s: end of pool area reached\n", btrp->name);
		end = btrp->area_start;
	}
	if (end == btrp->rhead) {
		/* heads met: encode "full" as whead == NULL, see btr_pool_filled() */
		PARA_DEBUG_LOG("%s btrp buffer full\n", btrp->name);
		end = NULL; /* buffer full */
	}
	btrp->whead = end;
}
143
/*
 * Return the first size bytes of the filled part to the pool, i.e. advance
 * the read head. Called when a pool buffer's reference count drops to zero.
 */
static void btr_pool_deallocate(struct btr_pool *btrp, size_t size)
{
	char *end = btrp->rhead + size;

	if (size == 0)
		return;
	assert(end <= btrp->area_end);
	assert(size <= btr_pool_filled(btrp));
	if (end == btrp->area_end)
		end = btrp->area_start; /* wrap around */
	if (!btrp->whead)
		/* pool was full; the freed region becomes writable again */
		btrp->whead = btrp->rhead;
	btrp->rhead = end;
	if (btrp->rhead == btrp->whead)
		/* pool is now empty; reset both heads to the area start */
		btrp->rhead = btrp->whead = btrp->area_start;
}
160
/* Iterate over all children of a buffer tree node. */
#define FOR_EACH_CHILD(_tn, _btrn) list_for_each_entry((_tn), \
	&((_btrn)->children), node)
/* Like FOR_EACH_CHILD(), but safe against removal of the current entry. */
#define FOR_EACH_CHILD_SAFE(_tn, _tmp, _btrn) \
	list_for_each_entry_safe((_tn), (_tmp), &((_btrn)->children), node)

/* Iterate over all buffer references of a node's input queue. */
#define FOR_EACH_BUFFER_REF(_br, _btrn) \
	list_for_each_entry((_br), &(_btrn)->input_queue, node)
/* Like FOR_EACH_BUFFER_REF(), but safe against removal of the current entry. */
#define FOR_EACH_BUFFER_REF_SAFE(_br, _tmp, _btrn) \
	list_for_each_entry_safe((_br), (_tmp), &(_btrn)->input_queue, node)
170
/*
 * Meaning of the (parent, child) combinations in the node description:
 *
 * (NULL, NULL): new, isolated node.
 * (NULL, c): new node becomes the root, c must be the old root.
 * (p, NULL): new leaf node.
 * (p, c): new internal node, c must be a child of p.
 */
struct btr_node *btr_new_node(struct btr_node_description *bnd)
{
	struct btr_node *btrn = para_malloc(sizeof(*btrn));

	/* The name is duplicated, so bnd need not outlive the node. */
	btrn->name = para_strdup(bnd->name);
	btrn->parent = bnd->parent;
	btrn->execute = bnd->handler;
	btrn->context = bnd->context;
	/* tv_sec == 0 means "no data transferred yet", see add_btrb_to_children() */
	btrn->start.tv_sec = 0;
	btrn->start.tv_usec = 0;
	/*
	 * NOTE(review): the comment above describes (NULL, c) and (p, c)
	 * combinations, but bnd's child field is never examined here —
	 * verify against the node description structure and its callers.
	 */
	if (bnd->parent)
		list_add_tail(&btrn->node, &bnd->parent->children);
	INIT_LIST_HEAD(&btrn->children);
	INIT_LIST_HEAD(&btrn->input_queue);
	if (bnd->parent)
		PARA_INFO_LOG("added %s as child of %s\n", bnd->name, bnd->parent->name);
	else
		PARA_INFO_LOG("added %s as btr root\n", bnd->name);
	return btrn;
}
199
200 /*
201 * Allocate a new btr buffer.
202 *
203 * The freshly allocated buffer will have a zero refcount and will
204 * not be associated with a btr pool.
205 */
206 static struct btr_buffer *new_btrb(char *buf, size_t size)
207 {
208 struct btr_buffer *btrb = para_calloc(sizeof(*btrb));
209
210 btrb->buf = buf;
211 btrb->size = size;
212 return btrb;
213 }
214
215 static void dealloc_buffer(struct btr_buffer *btrb)
216 {
217 if (btrb->pool)
218 btr_pool_deallocate(btrb->pool, btrb->size);
219 else
220 free(btrb->buf);
221 }
222
223 static struct btr_buffer_reference *get_first_input_br(struct btr_node *btrn)
224 {
225 if (list_empty(&btrn->input_queue))
226 return NULL;
227 return list_first_entry(&btrn->input_queue,
228 struct btr_buffer_reference, node);
229 }
230
231 /*
232 * Deallocate the reference, release the resources if refcount drops to zero.
233 */
234 static void btr_drop_buffer_reference(struct btr_buffer_reference *br)
235 {
236 struct btr_buffer *btrb = br->btrb;
237
238 list_del(&br->node);
239 free(br);
240 btrb->refcount--;
241 if (btrb->refcount == 0) {
242 dealloc_buffer(btrb);
243 free(btrb);
244 }
245 }
246
/*
 * Append a reference to btrb to the input queue of each child of btrn,
 * bumping the buffer's refcount once per child. Also records the time of
 * the first data transfer in every node involved.
 */
static void add_btrb_to_children(struct btr_buffer *btrb,
		struct btr_node *btrn, size_t consumed)
{
	struct btr_node *ch;

	if (btrn->start.tv_sec == 0) /* first data transfer of this node */
		btrn->start = *now;
	FOR_EACH_CHILD(ch, btrn) {
		struct btr_buffer_reference *br = para_calloc(sizeof(*br));
		br->btrb = btrb;
		br->consumed = consumed;
		list_add_tail(&br->node, &ch->input_queue);
		btrb->refcount++;
		if (ch->start.tv_sec == 0)
			ch->start = *now;
	}
}
264
/*
 * Feed a malloc()'ed buffer to the children of btrn. Ownership of buf is
 * transferred: if btrn has no children the buffer is freed right away,
 * otherwise it is freed when the last child reference is dropped.
 */
void btr_add_output(char *buf, size_t size, struct btr_node *btrn)
{
	struct btr_buffer *btrb;

	assert(size != 0);
	if (list_empty(&btrn->children)) {
		/* no consumers, discard the data */
		free(buf);
		return;
	}
	btrb = new_btrb(buf, size);
	add_btrb_to_children(btrb, btrn, 0);
}
277
/*
 * Feed the next size bytes at the pool's write head to the children of btrn.
 * The caller must have written the data via btr_pool_get_buffer() first. If
 * btrn has no children, nothing is allocated from the pool.
 */
void btr_add_output_pool(struct btr_pool *btrp, size_t size,
		struct btr_node *btrn)
{
	struct btr_buffer *btrb;
	char *buf;
	size_t avail;

	assert(size != 0);
	if (list_empty(&btrn->children))
		return;
	avail = btr_pool_get_buffer(btrp, &buf);
	assert(avail >= size);
	btr_pool_allocate(btrp, size);
	btrb = new_btrb(buf, size);
	btrb->pool = btrp; /* deallocate through the pool, not free() */
	add_btrb_to_children(btrb, btrn, 0);
}
295
/*
 * Copy n bytes from src into the given pool and feed the result to the
 * children of btrn. Since the circular pool area may wrap, the data may be
 * split into two output buffers.
 */
void btr_copy(const void *src, size_t n, struct btr_pool *btrp,
		struct btr_node *btrn)
{
	char *buf;
	size_t sz, copy;

	if (n == 0)
		return;
	assert(n <= btr_pool_unused(btrp));
	sz = btr_pool_get_buffer(btrp, &buf);
	copy = PARA_MIN(sz, n);
	memcpy(buf, src, copy);
	btr_add_output_pool(btrp, copy, btrn);
	if (copy == n)
		return;
	/* The area wrapped; copy the remainder into a second pool buffer. */
	sz = btr_pool_get_buffer(btrp, &buf);
	assert(sz >= n - copy);
	/* Cast needed: ISO C forbids pointer arithmetic on void pointers. */
	memcpy(buf, (const char *)src + copy, n - copy);
	btr_add_output_pool(btrp, n - copy, btrn);
}
316
/*
 * Hand the buffer behind br to each child of btrn, then drop br itself.
 * The order matters: dropping first could free the underlying buffer.
 */
static void btr_pushdown_br(struct btr_buffer_reference *br, struct btr_node *btrn)
{
	add_btrb_to_children(br->btrb, btrn, br->consumed);
	btr_drop_buffer_reference(br);
}
322
/* Pass all buffers of btrn's input queue down to its children. */
void btr_pushdown(struct btr_node *btrn)
{
	struct btr_buffer_reference *br, *tmp;

	/* the _SAFE variant is required as btr_pushdown_br() removes br */
	FOR_EACH_BUFFER_REF_SAFE(br, tmp, btrn)
		btr_pushdown_br(br, btrn);
}
330
/*
 * Pass at most one input buffer down to the children of btrn.
 *
 * \return 1 if a buffer was pushed down, 0 if the input queue was empty.
 */
int btr_pushdown_one(struct btr_node *btrn)
{
	struct btr_buffer_reference *br = get_first_input_br(btrn);

	if (!br)
		return 0;
	btr_pushdown_br(br, btrn);
	return 1;
}
341
342 /* Return true if this node has no children. */
343 bool btr_no_children(struct btr_node *btrn)
344 {
345 return list_empty(&btrn->children);
346 }
347
348 bool btr_no_parent(struct btr_node *btrn)
349 {
350 return !btrn->parent;
351 }
352
353 bool btr_inplace_ok(struct btr_node *btrn)
354 {
355 if (!btrn->parent)
356 return true;
357 return list_is_singular(&btrn->parent->children);
358 }
359
360 static inline size_t br_available_bytes(struct btr_buffer_reference *br)
361 {
362 return br->btrb->size - br->consumed;
363 }
364
365 size_t btr_get_buffer_by_reference(struct btr_buffer_reference *br, char **buf)
366 {
367 if (buf)
368 *buf = br->btrb->buf + br->consumed;
369 return br_available_bytes(br);
370 }
371
/**
 * Obtain a pointer to the next input buffer of btrn.
 *
 * Successive pool buffers that happen to be contiguous in memory are
 * reported as one single buffer; a non-pool buffer is never merged.
 *
 * \return zero if the input buffer queue is empty.
 */
size_t btr_next_buffer(struct btr_node *btrn, char **bufp)
{
	struct btr_buffer_reference *br;
	char *buf, *result = NULL;
	size_t sz, rv = 0;

	FOR_EACH_BUFFER_REF(br, btrn) {
		sz = btr_get_buffer_by_reference(br, &buf);
		if (!result) {
			result = buf;
			rv = sz;
			if (!br->btrb->pool)
				/* only pool buffers may be merged */
				break;
			continue;
		}
		if (!br->btrb->pool)
			break;
		if (result + rv != buf)
			/* not contiguous with the buffers collected so far */
			break;
		rv += sz;
	}
	if (bufp)
		*bufp = result;
	return rv;
}
400
401 void btr_consume(struct btr_node *btrn, size_t numbytes)
402 {
403 struct btr_buffer_reference *br, *tmp;
404 size_t sz;
405
406 if (numbytes == 0)
407 return;
408 br = get_first_input_br(btrn);
409 assert(br);
410
411 if (br->wrap_count == 0) {
412 /*
413 * No wrap buffer. Drop buffer references whose buffer
414 * has been fully used. */
415 FOR_EACH_BUFFER_REF_SAFE(br, tmp, btrn) {
416 if (br->consumed + numbytes <= br->btrb->size) {
417 br->consumed += numbytes;
418 if (br->consumed == br->btrb->size)
419 btr_drop_buffer_reference(br);
420 return;
421 }
422 numbytes -= br->btrb->size - br->consumed;
423 btr_drop_buffer_reference(br);
424 }
425 assert(true);
426 }
427 /*
428
429 We have a wrap buffer, consume from it. If in total,
430 i.e. including previous calls to brt_consume(), less than
431 wrap_count has been consumed, there's nothing more we can do.
432
433 Otherwise we drop the wrap buffer and consume from subsequent
434 buffers of the input queue the correct amount of bytes. This
435 is the total number of bytes that have been consumed from the
436 wrap buffer.
437 */
438 PARA_DEBUG_LOG("consuming %zu/%zu bytes from wrap buffer\n", numbytes,
439 br_available_bytes(br));
440
441 assert(numbytes <= br_available_bytes(br));
442 if (br->consumed + numbytes < br->wrap_count) {
443 br->consumed += numbytes;
444 return;
445 }
446 PARA_DEBUG_LOG("dropping wrap buffer (%zu bytes)\n", br->btrb->size);
447 /* get rid of the wrap buffer */
448 sz = br->consumed + numbytes;
449 btr_drop_buffer_reference(br);
450 return btr_consume(btrn, sz);
451 }
452
/* Drop every reference of btrn's input queue, freeing unreferenced buffers. */
static void flush_input_queue(struct btr_node *btrn)
{
	struct btr_buffer_reference *br;

	while ((br = get_first_input_br(btrn)) != NULL)
		btr_drop_buffer_reference(br);
}
459
460 void btr_free_node(struct btr_node *btrn)
461 {
462 if (!btrn)
463 return;
464 free(btrn->name);
465 free(btrn);
466 }
467
/*
 * Disconnect btrn from the buffer tree: orphan its children, drop all of its
 * input buffer references and unlink it from its parent's child list. The
 * node structure itself is not freed, see btr_free_node().
 */
void btr_remove_node(struct btr_node *btrn)
{
	struct btr_node *ch;

	if (!btrn)
		return;
	PARA_NOTICE_LOG("removing btr node %s from buffer tree\n", btrn->name);
	FOR_EACH_CHILD(ch, btrn)
		ch->parent = NULL;
	flush_input_queue(btrn);
	if (btrn->parent)
		list_del(&btrn->node);
}
481
/*
 * Return the number of unconsumed input bytes of btrn. Bytes already
 * consumed from a wrap buffer are duplicated in the buffers the wrap buffer
 * was built from, so that amount is subtracted once.
 */
size_t btr_get_input_queue_size(struct btr_node *btrn)
{
	struct btr_buffer_reference *br;
	size_t size = 0, wrap_consumed = 0;

	FOR_EACH_BUFFER_REF(br, btrn) {
		if (br->wrap_count != 0) {
			/* remember the duplicated bytes, don't count the wrap buffer */
			wrap_consumed = br->consumed;
			continue;
		}
		size += br_available_bytes(br);
	}
	assert(wrap_consumed <= size);
	size -= wrap_consumed;
	return size;
}
498
499 void btr_splice_out_node(struct btr_node *btrn)
500 {
501 struct btr_node *ch, *tmp;
502
503 assert(btrn);
504 PARA_NOTICE_LOG("splicing out %s\n", btrn->name);
505 btr_pushdown(btrn);
506 if (btrn->parent)
507 list_del(&btrn->node);
508 FOR_EACH_CHILD_SAFE(ch, tmp, btrn) {
509 PARA_INFO_LOG("parent(%s): %s\n", ch->name,
510 btrn->parent? btrn->parent->name : "NULL");
511 ch->parent = btrn->parent;
512 if (btrn->parent)
513 list_move(&ch->node, &btrn->parent->children);
514 }
515 assert(list_empty(&btrn->children));
516 }
517
518 /**
519 * Return the size of the largest input queue.
520 *
521 * Iterates over all children of the given node.
522 */
523 size_t btr_bytes_pending(struct btr_node *btrn)
524 {
525 size_t max_size = 0;
526 struct btr_node *ch;
527
528 FOR_EACH_CHILD(ch, btrn) {
529 size_t size = btr_get_input_queue_size(ch);
530 max_size = PARA_MAX(max_size, size);
531 }
532 return max_size;
533 }
534
535 int btr_exec(struct btr_node *btrn, const char *command, char **value_result)
536 {
537 if (!btrn)
538 return -ERRNO_TO_PARA_ERROR(EINVAL);
539 if (!btrn->execute)
540 return -ERRNO_TO_PARA_ERROR(ENOTSUP);
541 return btrn->execute(btrn, command, value_result);
542 }
543
/*
 * Ask the ancestors of btrn to execute a command, walking towards the root
 * until some ancestor handles it.
 *
 * \return 1 on success, negative error code on failure; ENOTSUP if no
 * ancestor has a handler that understands the command.
 */
int btr_exec_up(struct btr_node *btrn, const char *command, char **value_result)
{
	int ret;

	for (; btrn; btrn = btrn->parent) {
		struct btr_node *parent = btrn->parent;
		if (!parent)
			/* reached the root without finding a handler */
			return -ERRNO_TO_PARA_ERROR(ENOTSUP);
		if (!parent->execute)
			continue;
		PARA_INFO_LOG("parent: %s, cmd: %s\n", parent->name, command);
		ret = parent->execute(parent, command, value_result);
		if (ret == -ERRNO_TO_PARA_ERROR(ENOTSUP))
			/* this ancestor doesn't know the command, go higher up */
			continue;
		if (ret < 0)
			return ret;
		if (value_result && *value_result)
			PARA_NOTICE_LOG("%s(%s): %s\n", command, parent->name,
				*value_result);
		return 1;
	}
	return -ERRNO_TO_PARA_ERROR(ENOTSUP);
}
567
/* Return the context pointer that was passed at node creation time. */
void *btr_context(struct btr_node *btrn)
{
	return btrn->context;
}
572
573 static bool need_buffer_pool_merge(struct btr_node *btrn)
574 {
575 struct btr_buffer_reference *br = get_first_input_br(btrn);
576
577 if (!br)
578 return false;
579 if (br->wrap_count != 0)
580 return true;
581 if (br->btrb->pool)
582 return true;
583 return false;
584 }
585
/*
 * Merging for pool buffers: rather than joining queue entries in place,
 * build or grow a "wrap buffer" — a malloc()'ed copy that concatenates a
 * run at the end of the pool area (buf1) with the run at its beginning
 * (buf2) — so that at least dest_size contiguous bytes become available.
 */
static void merge_input_pool(struct btr_node *btrn, size_t dest_size)
{
	struct btr_buffer_reference *br, *wbr = NULL;
	int num_refs; /* including wrap buffer */
	char *buf, *buf1 = NULL, *buf2 = NULL;
	size_t sz, sz1 = 0, sz2 = 0, wsz;

	br = get_first_input_br(btrn);
	if (!br || br_available_bytes(br) >= dest_size)
		return; /* enough contiguous bytes already available */
	num_refs = 0;
	/* Collect the (at most two) contiguous runs buf1 and buf2. */
	FOR_EACH_BUFFER_REF(br, btrn) {
		num_refs++;
		sz = btr_get_buffer_by_reference(br, &buf);
		if (sz == 0)
			break;
		if (br->wrap_count != 0) {
			/* an existing wrap buffer is always the first reference */
			assert(!wbr);
			assert(num_refs == 1);
			wbr = br;
			if (sz >= dest_size)
				return;
			continue;
		}
		if (!buf1) {
			buf1 = buf;
			sz1 = sz;
			goto next;
		}
		if (buf1 + sz1 == buf) {
			/* buf extends buf1 contiguously */
			sz1 += sz;
			goto next;
		}
		if (!buf2) {
			buf2 = buf;
			sz2 = sz;
			goto next;
		}
		assert(buf2 + sz2 == buf);
		sz2 += sz;
next:
		if (sz1 + sz2 >= dest_size)
			break;
	}
	if (!buf2) /* nothing to do */
		return;
	assert(buf1 && sz2 > 0);
	/*
	 * If the second buffer is large, we only take the first part of it to
	 * avoid having to memcpy() huge buffers.
	 */
	sz2 = PARA_MIN(sz2, (size_t)(64 * 1024));
	if (!wbr) {
		/* Make a new wrap buffer combining buf1 and buf2. */
		sz = sz1 + sz2;
		buf = para_malloc(sz);
		PARA_DEBUG_LOG("merging input buffers: (%p:%zu, %p:%zu) -> %p:%zu\n",
			buf1, sz1, buf2, sz2, buf, sz);
		memcpy(buf, buf1, sz1);
		memcpy(buf + sz1, buf2, sz2);
		br = para_calloc(sizeof(*br));
		br->btrb = new_btrb(buf, sz);
		br->btrb->refcount = 1;
		br->consumed = 0;
		/* This is a wrap buffer */
		br->wrap_count = sz1;
		para_list_add(&br->node, &btrn->input_queue);
		return;
	}
	PARA_DEBUG_LOG("increasing wrap buffer, sz1: %zu, sz2: %zu\n", sz1, sz2);
	/*
	 * We already have a wrap buffer, but it is too small. It might be
	 * partially used.
	 */
	/* NOTE(review): wsz is computed but never used — verify intent. */
	wsz = br_available_bytes(wbr);
	if (wbr->wrap_count == sz1 && wbr->btrb->size >= sz1 + sz2) /* nothing we can do about it */
		return;
	sz = sz1 + sz2 - wbr->btrb->size; /* amount of new data */
	wbr->btrb->size += sz;
	wbr->btrb->buf = para_realloc(wbr->btrb->buf, wbr->btrb->size);
	/* copy the new data to the end of the reallocated buffer */
	assert(sz2 >= sz);
	memcpy(wbr->btrb->buf + wbr->btrb->size - sz, buf2 + sz2 - sz, sz);
}
670
/**
 * Merge the first two input buffers into one.
 *
 * This is a quite expensive operation: both buffers are copied into a
 * freshly allocated buffer that replaces their references in the queue.
 *
 * \return The number of buffers that have been available (zero, one or two).
 */
static int merge_input(struct btr_node *btrn)
{
	struct btr_buffer_reference *brs[2], *br;
	char *bufs[2], *buf;
	size_t szs[2], sz;
	int i;

	if (list_empty(&btrn->input_queue))
		return 0;
	if (list_is_singular(&btrn->input_queue))
		return 1;
	i = 0;
	/* get references to the first two buffers */
	FOR_EACH_BUFFER_REF(br, btrn) {
		brs[i] = br;
		/* returns the unconsumed part, so offsets are accounted for */
		szs[i] = btr_get_buffer_by_reference(brs[i], bufs + i);
		i++;
		if (i == 2)
			break;
	}
	/* make a new btrb that combines the two buffers and a br to it. */
	sz = szs[0] + szs[1];
	buf = para_malloc(sz);
	PARA_DEBUG_LOG("%s: memory merging input buffers: (%zu, %zu) -> %zu\n",
		btrn->name, szs[0], szs[1], sz);
	memcpy(buf, bufs[0], szs[0]);
	memcpy(buf + szs[0], bufs[1], szs[1]);

	br = para_calloc(sizeof(*br));
	br->btrb = new_btrb(buf, sz);
	br->btrb->refcount = 1;

	/* replace the first two refs by the new one */
	btr_drop_buffer_reference(brs[0]);
	btr_drop_buffer_reference(brs[1]);
	para_list_add(&br->node, &btrn->input_queue);
	return 2;
}
716
/*
 * Merge input buffers of btrn until at least dest_size contiguous bytes are
 * available, or until no further merging is possible.
 */
void btr_merge(struct btr_node *btrn, size_t dest_size)
{
	if (need_buffer_pool_merge(btrn)) {
		merge_input_pool(btrn, dest_size);
		return;
	}
	for (;;) {
		char *buf;
		size_t len = btr_next_buffer(btrn, &buf);

		if (len >= dest_size)
			break;
		PARA_DEBUG_LOG("input size = %zu < %zu = dest\n", len, dest_size);
		if (merge_input(btrn) < 2)
			break; /* fewer than two buffers, can't merge further */
	}
}
731
/* True if btrn has no pending input and no parent to supply more data. */
bool btr_eof(struct btr_node *btrn)
{
	char *buf;

	return btr_next_buffer(btrn, &buf) == 0 && btr_no_parent(btrn);
}
739
740 void log_tree_recursively(struct btr_node *btrn, int loglevel, int depth)
741 {
742 struct btr_node *ch;
743 const char spaces[] = " ", *space = spaces + 16 - depth;
744
745 if (depth > 16)
746 return;
747 para_log(loglevel, "%s%s\n", space, btrn->name);
748 FOR_EACH_CHILD(ch, btrn)
749 log_tree_recursively(ch, loglevel, depth + 1);
750 }
751
/* Log the whole tree rooted at btrn at the given log level. */
void btr_log_tree(struct btr_node *btrn, int loglevel)
{
	log_tree_recursively(btrn, loglevel, 0);
}
756
757 /*
758 * \return \a root if \a name is \p NULL.
759 */
760 struct btr_node *btr_search_node(const char *name, struct btr_node *root)
761 {
762 struct btr_node *ch;
763
764 if (!name)
765 return root;
766 if (!strcmp(root->name, name))
767 return root;
768 FOR_EACH_CHILD(ch, root) {
769 struct btr_node *result = btr_search_node(name, ch);
770 if (result)
771 return result;
772 }
773 return NULL;
774 }
775
/** 640K ought to be enough for everybody ;) */
#define BTRN_MAX_PENDING (640 * 1024)

/*
 * Return whether the node has work to do.
 *
 * \param btrn The node to check.
 * \param min_iqs Minimal input queue size required to proceed.
 * \param type The node's position in the tree (root, internal, leaf).
 *
 * \return Positive if the node should proceed, zero if it can make no
 * progress at the moment, negative error code on no-child/EOF conditions.
 */
int btr_node_status(struct btr_node *btrn, size_t min_iqs,
		enum btr_node_type type)
{
	size_t iqs;

	assert(btrn);
	if (type != BTR_NT_LEAF) {
		if (btr_no_children(btrn))
			return -E_BTR_NO_CHILD;
		if (btr_bytes_pending(btrn) > BTRN_MAX_PENDING)
			/* children are backlogged, throttle this node */
			return 0;
	}
	if (type != BTR_NT_ROOT) {
		if (btr_eof(btrn))
			return -E_BTR_EOF;
		iqs = btr_get_input_queue_size(btrn);
		if (iqs == 0) /* we have a parent, because not eof */
			return 0;
		if (iqs < min_iqs && !btr_no_parent(btrn))
			/* wait for more input from the parent */
			return 0;
	}
	return 1;
}
802
803 void btr_get_node_start(struct btr_node *btrn, struct timeval *tv)
804 {
805 *tv = btrn->start;
806 }