[btr] Fix btr_get_input_queue_size().
[paraslash.git] / buffer_tree.c
1 #include <regex.h>
2 #include <stdbool.h>
3
4 #include "para.h"
5 #include "list.h"
6 #include "string.h"
7 #include "buffer_tree.h"
8 #include "error.h"
9 #include "sched.h"
10
/*
 * A buffer pool: a fixed-size circular memory area from which output
 * buffers are handed out. The region between rhead and whead is in use.
 */
struct btr_pool {
	/* Name of the pool, used in log messages. */
	char *name;
	/* Start of the allocated memory area. */
	char *area_start;
	/* End of the memory area (one past the last byte). */
	char *area_end;
	/* Read head: start of the filled part of the area. */
	char *rhead;
	/* Write head: next byte to hand out; NULL means the area is full. */
	char *whead;
};
18
/* Flags that affect how a btr buffer is handled. */
enum btr_buffer_flags {
	/* changes the way the buffer is deallocated */
	BTR_BF_BTR_POOL = 1,
};
23
/*
 * A reference-counted chunk of data. When the refcount drops to zero the
 * buffer is returned to its pool (if any) or freed with free().
 */
struct btr_buffer {
	/* The data. */
	char *buf;
	/* Number of bytes stored in buf. */
	size_t size;
	/** The number of references to this buffer. */
	int refcount;
	/* Pool the buffer was allocated from, or NULL for malloced buffers. */
	struct btr_pool *pool;
};
31
/* A reference to (part of) a btr buffer, owned by one buffer tree node. */
struct btr_buffer_reference {
	/* The referenced buffer. */
	struct btr_buffer *btrb;
	/* Number of bytes of the buffer already used up by the owning node. */
	size_t consumed;
	/* Each buffer reference belongs to the buffer queue list of some buffer tree node. */
	struct list_head node;
	/* Non-zero only for wrap buffers (see merge_input_pool()): the size
	 * of the first merged chunk. */
	size_t wrap_count;
};
39
/* One node of the buffer tree. */
struct btr_node {
	/* Name of the node, used for log messages and btr_search_node(). */
	char *name;
	/* The parent node, or NULL for a root node. */
	struct btr_node *parent;
	/* The position of this btr node in the buffer tree. */
	struct list_head node;
	/* The children nodes of this btr node are linked together in a list. */
	struct list_head children;
	/* Time of first data transfer. */
	struct timeval start;
	/**
	 * The input queue is a list of references to btr buffers. Each item on
	 * the list represents an input buffer which has not been completely
	 * used by this btr node.
	 */
	struct list_head input_queue;
	/* Optional command handler, invoked via btr_exec()/btr_exec_up(). */
	btr_command_handler execute;
	/* Opaque pointer supplied at node creation, see btr_context(). */
	void *context;
};
58
59 struct btr_pool *btr_pool_new(const char *name, size_t area_size)
60 {
61 struct btr_pool *btrp;
62
63 PARA_INFO_LOG("%s, %zu bytes\n", name, area_size);
64 btrp = para_malloc(sizeof(*btrp));
65 btrp->area_start = para_malloc(area_size);
66 btrp->area_end = btrp->area_start + area_size;
67 btrp->rhead = btrp->area_start;
68 btrp->whead = btrp->area_start;
69 btrp->name = para_strdup(name);
70 return btrp;
71 }
72
73 /* whead = NULL means area full */
74
75 void btr_pool_free(struct btr_pool *btrp)
76 {
77 if (!btrp)
78 return;
79 free(btrp->area_start);
80 free(btrp->name);
81 free(btrp);
82 }
83
84 size_t btr_pool_size(struct btr_pool *btrp)
85 {
86 return btrp->area_end - btrp->area_start;
87 }
88
89 size_t btr_pool_filled(struct btr_pool *btrp)
90 {
91 if (!btrp->whead)
92 return btr_pool_size(btrp);
93 if (btrp->rhead <= btrp->whead)
94 return btrp->whead - btrp->rhead;
95 return btr_pool_size(btrp) - (btrp->rhead - btrp->whead);
96 }
97
/* Return the number of unused bytes of the pool area. */
size_t btr_pool_unused(struct btr_pool *btrp)
{
	size_t total = btr_pool_size(btrp);

	return total - btr_pool_filled(btrp);
}
102
103 size_t btr_pool_available(struct btr_pool *btrp)
104 {
105 if (!btrp->whead)
106 return 0;
107 if (btrp->rhead <= btrp->whead)
108 return btrp->area_end - btrp->whead;
109 return btrp->rhead - btrp->whead;
110 }
111
112 size_t btr_pool_get_buffer(struct btr_pool *btrp, char **result)
113 {
114 if (result)
115 *result = btrp->whead;
116 return btr_pool_available(btrp);
117 }
118
/*
 * Mark the next size bytes at the write head as allocated.
 *
 * The caller must not request more than btr_pool_available() bytes.
 * The write head is advanced, wrapping to the start of the area when it
 * reaches the end, and set to NULL when the pool becomes full.
 */
void btr_pool_allocate(struct btr_pool *btrp, size_t size)
{
	char *end;

	if (size == 0)
		return;
	//PARA_CRIT_LOG("filled: %zu, alloc %zu\n", btr_pool_filled(btrp), size);
	assert(size <= btr_pool_available(btrp));
	end = btrp->whead + size;
	assert(end <= btrp->area_end);

	if (end == btrp->area_end) {
		/* wrap the write head back to the start of the area */
		PARA_DEBUG_LOG("%s: end of pool area reached\n", btrp->name);
		end = btrp->area_start;
	}
	if (end == btrp->rhead) {
		PARA_DEBUG_LOG("btrp buffer full\n");
		end = NULL; /* buffer full */
	}
	btrp->whead = end;
	//PARA_CRIT_LOG("filled: %zu\n", btr_pool_filled(btrp));
}
141
/*
 * Return the first size bytes of the filled region to the pool.
 *
 * Advances the read head (wrapping at the end of the area). If the pool
 * was full, the write head is revived; if the pool becomes empty, both
 * heads are reset to the start of the area to maximize the contiguous
 * space available for the next allocation.
 */
static void btr_pool_deallocate(struct btr_pool *btrp, size_t size)
{
	char *end = btrp->rhead + size;

	//PARA_CRIT_LOG("filled: %zu, dealloc %zu\n", btr_pool_filled(btrp), size);
	if (size == 0)
		return;
	assert(end <= btrp->area_end);
	assert(size <= btr_pool_filled(btrp));
	if (end == btrp->area_end)
		end = btrp->area_start;
	if (!btrp->whead)
		/* pool was full: writing may resume at the old read head */
		btrp->whead = btrp->rhead;
	btrp->rhead = end;
	if (btrp->rhead == btrp->whead)
		/* pool is now empty: reset both heads */
		btrp->rhead = btrp->whead = btrp->area_start;
	//PARA_CRIT_LOG("filled: %zu\n", btr_pool_filled(btrp));
}
160
/* Iterate over all children of the given buffer tree node. */
#define FOR_EACH_CHILD(_tn, _btrn) list_for_each_entry((_tn), \
	&((_btrn)->children), node)
/* Like FOR_EACH_CHILD(), but safe against removal of the current child. */
#define FOR_EACH_CHILD_SAFE(_tn, _tmp, _btrn) \
	list_for_each_entry_safe((_tn), (_tmp), &((_btrn)->children), node)

/* Iterate over all buffer references in the input queue of a node. */
#define FOR_EACH_BUFFER_REF(_br, _btrn) \
	list_for_each_entry((_br), &(_btrn)->input_queue, node)
/* Like FOR_EACH_BUFFER_REF(), but safe against removal of the current entry. */
#define FOR_EACH_BUFFER_REF_SAFE(_br, _tmp, _btrn) \
	list_for_each_entry_safe((_br), (_tmp), &(_btrn)->input_queue, node)
170
171 /*
172 (parent, child):
173 (NULL, NULL): new, isolated node.
174 (NULL, c): new node becomes root, c must be old root
175 (p, NULL): new leaf node
176 (p, c): new internal node, ch must be child of p
177
178 */
179 struct btr_node *btr_new_node(struct btr_node_description *bnd)
180 {
181 struct btr_node *btrn = para_malloc(sizeof(*btrn));
182
183 btrn->name = para_strdup(bnd->name);
184 btrn->parent = bnd->parent;
185 btrn->execute = bnd->handler;
186 btrn->context = bnd->context;
187 btrn->start.tv_sec = 0;
188 btrn->start.tv_usec = 0;
189 if (bnd->parent)
190 list_add_tail(&btrn->node, &bnd->parent->children);
191 INIT_LIST_HEAD(&btrn->children);
192 INIT_LIST_HEAD(&btrn->input_queue);
193 if (bnd->parent)
194 PARA_INFO_LOG("added %s as child of %s\n", bnd->name, bnd->parent->name);
195 else
196 PARA_INFO_LOG("added %s as btr root\n", bnd->name);
197 return btrn;
198 }
199
200 /*
201 * Allocate a new btr buffer.
202 *
203 * The freshly allocated buffer will have a zero refcount and will
204 * not be associated with a btr pool.
205 */
206 static struct btr_buffer *new_btrb(char *buf, size_t size)
207 {
208 struct btr_buffer *btrb = para_calloc(sizeof(*btrb));
209
210 btrb->buf = buf;
211 btrb->size = size;
212 return btrb;
213 }
214
215 static void dealloc_buffer(struct btr_buffer *btrb)
216 {
217 if (btrb->pool)
218 btr_pool_deallocate(btrb->pool, btrb->size);
219 else
220 free(btrb->buf);
221 }
222
223 static struct btr_buffer_reference *get_first_input_br(struct btr_node *btrn)
224 {
225 if (list_empty(&btrn->input_queue))
226 return NULL;
227 return list_first_entry(&btrn->input_queue,
228 struct btr_buffer_reference, node);
229 }
230
231 /*
232 * Deallocate the reference, release the resources if refcount drops to zero.
233 */
234 static void btr_drop_buffer_reference(struct btr_buffer_reference *br)
235 {
236 struct btr_buffer *btrb = br->btrb;
237
238 //PARA_CRIT_LOG("dropping buffer reference %p\n", br);
239 list_del(&br->node);
240 free(br);
241 btrb->refcount--;
242 if (btrb->refcount == 0) {
243 dealloc_buffer(btrb);
244 free(btrb);
245 }
246 }
247
248 static void add_btrb_to_children(struct btr_buffer *btrb,
249 struct btr_node *btrn, size_t consumed)
250 {
251 struct btr_node *ch;
252
253 if (btrn->start.tv_sec == 0)
254 btrn->start = *now;
255 FOR_EACH_CHILD(ch, btrn) {
256 struct btr_buffer_reference *br = para_calloc(sizeof(*br));
257 br->btrb = btrb;
258 br->consumed = consumed;
259 list_add_tail(&br->node, &ch->input_queue);
260 btrb->refcount++;
261 if (ch->start.tv_sec == 0)
262 ch->start = *now;
263 }
264 }
265
266 void btr_add_output(char *buf, size_t size, struct btr_node *btrn)
267 {
268 struct btr_buffer *btrb;
269
270 assert(size != 0);
271 if (list_empty(&btrn->children)) {
272 free(buf);
273 return;
274 }
275 btrb = new_btrb(buf, size);
276 add_btrb_to_children(btrb, btrn, 0);
277 }
278
279 void btr_add_output_pool(struct btr_pool *btrp, size_t size,
280 struct btr_node *btrn)
281 {
282 struct btr_buffer *btrb;
283 char *buf;
284 size_t avail;
285
286 assert(size != 0);
287 if (list_empty(&btrn->children))
288 return;
289 avail = btr_pool_get_buffer(btrp, &buf);
290 assert(avail >= size);
291 btr_pool_allocate(btrp, size);
292 btrb = new_btrb(buf, size);
293 btrb->pool = btrp;
294 add_btrb_to_children(btrb, btrn, 0);
295 }
296
/*
 * Copy n bytes from src into the given buffer pool and feed the data to
 * the children of btrn.
 *
 * The caller must ensure the pool has at least n unused bytes. If the
 * free space wraps around the end of the pool area, the copy is split
 * into two parts.
 */
void btr_copy(const void *src, size_t n, struct btr_pool *btrp,
		struct btr_node *btrn)
{
	char *buf;
	size_t sz, copy;

	if (n == 0)
		return;
	assert(n <= btr_pool_unused(btrp));
	sz = btr_pool_get_buffer(btrp, &buf);
	copy = PARA_MIN(sz, n);
	memcpy(buf, src, copy);
	btr_add_output_pool(btrp, copy, btrn);
	if (copy == n)
		return;
	/* wrapped: copy the remainder to the start of the pool area */
	sz = btr_pool_get_buffer(btrp, &buf);
	assert(sz >= n - copy);
	/*
	 * Cast to const char *: pointer arithmetic on void * is a GNU
	 * extension, not ISO C.
	 */
	memcpy(buf, (const char *)src + copy, n - copy);
	btr_add_output_pool(btrp, n - copy, btrn);
}
317
/*
 * Hand the buffer referenced by br down to the children of btrn and drop
 * the node's own reference to it.
 */
static void btr_pushdown_br(struct btr_buffer_reference *br, struct btr_node *btrn)
{
	add_btrb_to_children(br->btrb, btrn, br->consumed);
	btr_drop_buffer_reference(br);
}
323
324 void btr_pushdown(struct btr_node *btrn)
325 {
326 struct btr_buffer_reference *br, *tmp;
327
328 FOR_EACH_BUFFER_REF_SAFE(br, tmp, btrn)
329 btr_pushdown_br(br, btrn);
330 }
331
/*
 * Hand the first buffer of the input queue down to the children.
 *
 * \return Zero if the input queue was empty, one otherwise.
 */
int btr_pushdown_one(struct btr_node *btrn)
{
	struct btr_buffer_reference *br = get_first_input_br(btrn);

	if (br == NULL)
		return 0;
	btr_pushdown_br(br, btrn);
	return 1;
}
342
343 /* Return true if this node has no children. */
344 bool btr_no_children(struct btr_node *btrn)
345 {
346 return list_empty(&btrn->children);
347 }
348
349 bool btr_no_parent(struct btr_node *btrn)
350 {
351 return !btrn->parent;
352 }
353
354 bool btr_inplace_ok(struct btr_node *btrn)
355 {
356 if (!btrn->parent)
357 return true;
358 return list_is_singular(&btrn->parent->children);
359 }
360
361 static inline size_t br_available_bytes(struct btr_buffer_reference *br)
362 {
363 return br->btrb->size - br->consumed;
364 }
365
366 size_t btr_get_buffer_by_reference(struct btr_buffer_reference *br, char **buf)
367 {
368 if (buf)
369 *buf = br->btrb->buf + br->consumed;
370 return br_available_bytes(br);
371 }
372
/**
 * Return (a pointer to) the next chunk of input data of btrn.
 *
 * Consecutive references whose buffers stem from a buffer pool are
 * merged "virtually": since such buffers are adjacent in the pool's
 * memory area, one pointer/size pair can cover several of them.
 *
 * \return zero if the input buffer queue is empty.
 */
size_t btr_next_buffer(struct btr_node *btrn, char **bufp)
{
	struct btr_buffer_reference *br;
	char *buf, *result = NULL;
	size_t sz, rv = 0;

	FOR_EACH_BUFFER_REF(br, btrn) {
		sz = btr_get_buffer_by_reference(br, &buf);
		if (!result) {
			result = buf;
			rv = sz;
			if (!br->btrb->pool) /* malloced buffers can't be merged */
				break;
			continue;
		}
		if (!br->btrb->pool)
			break;
		if (result + rv != buf) {
			/* not adjacent in the pool area; stop merging */
			PARA_DEBUG_LOG("%s: pool merge impossible: %p != %p\n",
				btrn->name, result + rv, buf);
			break;
		}
		rv += sz;
	}
	if (bufp)
		*bufp = result;
	return rv;
}
408
409 void btr_consume(struct btr_node *btrn, size_t numbytes)
410 {
411 struct btr_buffer_reference *br, *tmp;
412 size_t sz;
413
414 if (numbytes == 0)
415 return;
416 br = get_first_input_br(btrn);
417 assert(br);
418
419 //PARA_CRIT_LOG("wrap count: %zu\n", br->wrap_count);
420 if (br->wrap_count == 0) {
421 /*
422 * No wrap buffer. Drop buffer references whose buffer
423 * has been fully used. */
424 FOR_EACH_BUFFER_REF_SAFE(br, tmp, btrn) {
425 if (br->consumed + numbytes <= br->btrb->size) {
426 br->consumed += numbytes;
427 if (br->consumed == br->btrb->size)
428 btr_drop_buffer_reference(br);
429 return;
430 }
431 numbytes -= br->btrb->size - br->consumed;
432 btr_drop_buffer_reference(br);
433 }
434 assert(true);
435 }
436 /*
437
438 We have a wrap buffer, consume from it. If in total,
439 i.e. including previous calls to brt_consume(), less than
440 wrap_count has been consumed, there's nothing more we can do.
441
442 Otherwise we drop the wrap buffer and consume from subsequent
443 buffers of the input queue the correct amount of bytes. This
444 is the total number of bytes that have been consumed from the
445 wrap buffer.
446 */
447 PARA_DEBUG_LOG("consuming %zu/%zu bytes from wrap buffer\n", numbytes,
448 br_available_bytes(br));
449
450 assert(numbytes <= br_available_bytes(br));
451 if (br->consumed + numbytes < br->wrap_count) {
452 br->consumed += numbytes;
453 return;
454 }
455 PARA_DEBUG_LOG("dropping wrap buffer (%zu bytes)\n", br->btrb->size);
456 /* get rid of the wrap buffer */
457 sz = br->consumed + numbytes;
458 btr_drop_buffer_reference(br);
459 return btr_consume(btrn, sz);
460 }
461
462 static void flush_input_queue(struct btr_node *btrn)
463 {
464 struct btr_buffer_reference *br, *tmp;
465 FOR_EACH_BUFFER_REF_SAFE(br, tmp, btrn)
466 btr_drop_buffer_reference(br);
467 }
468
469 void btr_free_node(struct btr_node *btrn)
470 {
471 if (!btrn)
472 return;
473 free(btrn->name);
474 free(btrn);
475 }
476
477 void btr_remove_node(struct btr_node *btrn)
478 {
479 struct btr_node *ch;
480
481 if (!btrn)
482 return;
483 PARA_NOTICE_LOG("removing btr node %s from buffer tree\n", btrn->name);
484 FOR_EACH_CHILD(ch, btrn)
485 ch->parent = NULL;
486 flush_input_queue(btrn);
487 if (btrn->parent)
488 list_del(&btrn->node);
489 }
490
/*
 * Return the number of bytes pending in the input queue of btrn.
 *
 * A wrap buffer (see merge_input_pool()) duplicates data of subsequent
 * queue entries, so its own size is not added. Bytes already consumed
 * from the wrap buffer, however, are gone from the queue and must be
 * subtracted from the sum.
 */
size_t btr_get_input_queue_size(struct btr_node *btrn)
{
	struct btr_buffer_reference *br;
	size_t size = 0, wrap_consumed = 0;

	FOR_EACH_BUFFER_REF(br, btrn) {
		if (br->wrap_count != 0) {
			/* don't count duplicated data, only what was consumed */
			wrap_consumed = br->consumed;
			continue;
		}
		size += br_available_bytes(br);
	}
	assert(wrap_consumed <= size);
	size -= wrap_consumed;
	return size;
}
507
/*
 * Remove a node from the buffer tree, reconnecting its children.
 *
 * Pending input is first pushed down to the children; then each child is
 * reparented to the parent of btrn. The node itself is not freed.
 */
void btr_splice_out_node(struct btr_node *btrn)
{
	struct btr_node *ch, *tmp;

	assert(btrn);
	PARA_NOTICE_LOG("splicing out %s\n", btrn->name);
	btr_pushdown(btrn);
	if (btrn->parent)
		list_del(&btrn->node);
	FOR_EACH_CHILD_SAFE(ch, tmp, btrn) {
		PARA_INFO_LOG("parent(%s): %s\n", ch->name,
			btrn->parent? btrn->parent->name : "NULL");
		ch->parent = btrn->parent;
		if (btrn->parent)
			list_move(&ch->node, &btrn->parent->children);
	}
	/*
	 * NOTE(review): children are only unlinked from btrn->children when
	 * a parent exists; splicing out a parentless node that still has
	 * children would trip this assertion. Presumably callers only
	 * splice out internal nodes — confirm against call sites.
	 */
	assert(list_empty(&btrn->children));
}
526
527 /**
528 * Return the size of the largest input queue.
529 *
530 * Iterates over all children of the given node.
531 */
532 size_t btr_bytes_pending(struct btr_node *btrn)
533 {
534 size_t max_size = 0;
535 struct btr_node *ch;
536
537 FOR_EACH_CHILD(ch, btrn) {
538 size_t size = btr_get_input_queue_size(ch);
539 max_size = PARA_MAX(max_size, size);
540 }
541 return max_size;
542 }
543
544 int btr_exec(struct btr_node *btrn, const char *command, char **value_result)
545 {
546 if (!btrn)
547 return -ERRNO_TO_PARA_ERROR(EINVAL);
548 if (!btrn->execute)
549 return -ERRNO_TO_PARA_ERROR(ENOTSUP);
550 return btrn->execute(btrn, command, value_result);
551 }
552
/*
 * Offer a command to the ancestors of btrn, closest first.
 *
 * Each ancestor with a command handler is tried in turn until one
 * accepts the command, i.e. does not return
 * -ERRNO_TO_PARA_ERROR(ENOTSUP).
 *
 * \return 1 if some ancestor handled the command, the handler's error
 * code if it failed, -ERRNO_TO_PARA_ERROR(ENOTSUP) if no ancestor
 * handled it.
 */
int btr_exec_up(struct btr_node *btrn, const char *command, char **value_result)
{
	int ret;

	for (; btrn; btrn = btrn->parent) {
		struct btr_node *parent = btrn->parent;
		if (!parent) /* reached the root without finding a handler */
			return -ERRNO_TO_PARA_ERROR(ENOTSUP);
		if (!parent->execute)
			continue;
		PARA_INFO_LOG("parent: %s, cmd: %s\n", parent->name, command);
		ret = parent->execute(parent, command, value_result);
		if (ret == -ERRNO_TO_PARA_ERROR(ENOTSUP))
			continue; /* handler declined; try the next ancestor */
		if (ret < 0)
			return ret;
		if (value_result && *value_result)
			PARA_NOTICE_LOG("%s(%s): %s\n", command, parent->name,
				*value_result);
		return 1;
	}
	return -ERRNO_TO_PARA_ERROR(ENOTSUP);
}
576
/* Return the context pointer that was supplied at node creation time. */
void *btr_context(struct btr_node *btrn)
{
	return btrn->context;
}
581
582 static bool need_buffer_pool_merge(struct btr_node *btrn)
583 {
584 struct btr_buffer_reference *br = get_first_input_br(btrn);
585
586 if (!br)
587 return false;
588 if (br->wrap_count != 0)
589 return true;
590 if (br->btrb->pool)
591 return true;
592 return false;
593 }
594
595 static void merge_input_pool(struct btr_node *btrn, size_t dest_size)
596 {
597 struct btr_buffer_reference *br, *wbr = NULL;
598 int num_refs; /* including wrap buffer */
599 char *buf, *buf1 = NULL, *buf2 = NULL;
600 size_t sz, sz1 = 0, sz2 = 0, wsz;
601
602 br = get_first_input_br(btrn);
603 if (!br || br_available_bytes(br) >= dest_size)
604 return;
605 num_refs = 0;
606 FOR_EACH_BUFFER_REF(br, btrn) {
607 num_refs++;
608 sz = btr_get_buffer_by_reference(br, &buf);
609 if (sz == 0)
610 break;
611 if (br->wrap_count != 0) {
612 assert(!wbr);
613 assert(num_refs == 1);
614 wbr = br;
615 if (sz >= dest_size)
616 return;
617 continue;
618 }
619 if (!buf1) {
620 buf1 = buf;
621 sz1 = sz;
622 goto next;
623 }
624 if (buf1 + sz1 == buf) {
625 sz1 += sz;
626 goto next;
627 }
628 if (!buf2) {
629 buf2 = buf;
630 sz2 = sz;
631 goto next;
632 }
633 assert(buf2 + sz2 == buf);
634 sz2 += sz;
635 next:
636 if (sz1 + sz2 >= dest_size)
637 break;
638 }
639 if (!buf2) /* nothing to do */
640 return;
641 assert(buf1 && sz2 > 0);
642 /*
643 * If the second buffer is large, we only take the first part of it to
644 * avoid having to memcpy() huge buffers.
645 */
646 sz2 = PARA_MIN(sz2, (size_t)(64 * 1024));
647 if (!wbr) {
648 /* Make a new wrap buffer combining buf1 and buf2. */
649 sz = sz1 + sz2;
650 buf = para_malloc(sz);
651 PARA_DEBUG_LOG("merging input buffers: (%p:%zu, %p:%zu) -> %p:%zu\n",
652 buf1, sz1, buf2, sz2, buf, sz);
653 memcpy(buf, buf1, sz1);
654 memcpy(buf + sz1, buf2, sz2);
655 br = para_calloc(sizeof(*br));
656 br->btrb = new_btrb(buf, sz);
657 br->btrb->refcount = 1;
658 br->consumed = 0;
659 /* This is a wrap buffer */
660 br->wrap_count = sz1;
661 para_list_add(&br->node, &btrn->input_queue);
662 return;
663 }
664 PARA_DEBUG_LOG("increasing wrap buffer, sz1: %zu, sz2: %zu\n", sz1, sz2);
665 /*
666 * We already have a wrap buffer, but it is too small. It might be
667 * partially used.
668 */
669 wsz = br_available_bytes(wbr);
670 if (wbr->wrap_count == sz1 && wbr->btrb->size >= sz1 + sz2) /* nothing we can do about it */
671 return;
672 sz = sz1 + sz2 - wbr->btrb->size; /* amount of new data */
673 wbr->btrb->size += sz;
674 wbr->btrb->buf = para_realloc(wbr->btrb->buf, wbr->btrb->size);
675 /* copy the new data to the end of the reallocated buffer */
676 assert(sz2 >= sz);
677 memcpy(wbr->btrb->buf + wbr->btrb->size - sz, buf2 + sz2 - sz, sz);
678 }
679
/**
 * Merge the first two input buffers into one.
 *
 * This is a quite expensive operation.
 *
 * \return The number of buffers that have been available (zero, one or two).
 */
static int merge_input(struct btr_node *btrn)
{
	struct btr_buffer_reference *brs[2], *br;
	char *bufs[2], *buf;
	size_t szs[2], sz;
	int i;

	if (list_empty(&btrn->input_queue))
		return 0;
	if (list_is_singular(&btrn->input_queue))
		return 1; /* only one buffer, nothing to merge */
	i = 0;
	/* get references to the first two buffers */
	FOR_EACH_BUFFER_REF(br, btrn) {
		brs[i] = br;
		szs[i] = btr_get_buffer_by_reference(brs[i], bufs + i);
		i++;
		if (i == 2)
			break;
	}
	/* make a new btrb that combines the two buffers and a br to it. */
	sz = szs[0] + szs[1];
	buf = para_malloc(sz);
	PARA_DEBUG_LOG("%s: memory merging input buffers: (%zu, %zu) -> %zu\n",
		btrn->name, szs[0], szs[1], sz);
	memcpy(buf, bufs[0], szs[0]);
	memcpy(buf + szs[0], bufs[1], szs[1]);

	br = para_calloc(sizeof(*br));
	br->btrb = new_btrb(buf, sz);
	br->btrb->refcount = 1;

	/* replace the first two refs by the new one */
	btr_drop_buffer_reference(brs[0]);
	btr_drop_buffer_reference(brs[1]);
	/* para_list_add() inserts at the head, so the merged buffer becomes
	 * the first entry of the input queue */
	para_list_add(&br->node, &btrn->input_queue);
	return 2;
}
725
/*
 * Try to make the first dest_size bytes of the input queue available in
 * one contiguous buffer, merging input buffers as necessary.
 */
void btr_merge(struct btr_node *btrn, size_t dest_size)
{
	if (need_buffer_pool_merge(btrn)) {
		/*
		 * Call and return separately: "return <void expression>;"
		 * in a void function is a GNU extension, not ISO C.
		 */
		merge_input_pool(btrn, dest_size);
		return;
	}
	for (;;) {
		char *buf;
		size_t len = btr_next_buffer(btrn, &buf);
		if (len >= dest_size)
			return;
		PARA_DEBUG_LOG("input size = %zu < %zu = dest\n", len, dest_size);
		if (merge_input(btrn) < 2)
			return; /* fewer than two buffers left to merge */
	}
}
740
/*
 * Return true if end-of-file was reached: the node has no parent and its
 * input queue contains no more data.
 */
bool btr_eof(struct btr_node *btrn)
{
	char *buf;

	return btr_next_buffer(btrn, &buf) == 0 && btr_no_parent(btrn);
}
748
/*
 * Log the subtree rooted at btrn, one node per line, indenting each
 * level by one space. Recursion stops at depth 16, the size of the
 * indent buffer.
 */
void log_tree_recursively(struct btr_node *btrn, int loglevel, int depth)
{
	struct btr_node *ch;
	/* 16 spaces; spaces + 16 - depth yields a depth-character indent
	 * (the literal width was garbled in this copy of the source —
	 * 16 is the only length consistent with the pointer arithmetic
	 * and the depth > 16 guard below; verify against upstream) */
	const char spaces[] = "                ", *space = spaces + 16 - depth;

	if (depth > 16)
		return;
	para_log(loglevel, "%s%s\n", space, btrn->name);
	FOR_EACH_CHILD(ch, btrn)
		log_tree_recursively(ch, loglevel, depth + 1);
}
760
/* Log the whole buffer tree rooted at btrn at the given loglevel. */
void btr_log_tree(struct btr_node *btrn, int loglevel)
{
	/*
	 * Call and return separately: "return <void expression>;" in a
	 * void function is a GNU extension, not ISO C.
	 */
	log_tree_recursively(btrn, loglevel, 0);
}
765
766 /*
767 * \return \a root if \a name is \p NULL.
768 */
769 struct btr_node *btr_search_node(const char *name, struct btr_node *root)
770 {
771 struct btr_node *ch;
772
773 if (!name)
774 return root;
775 if (!strcmp(root->name, name))
776 return root;
777 FOR_EACH_CHILD(ch, root) {
778 struct btr_node *result = btr_search_node(name, ch);
779 if (result)
780 return result;
781 }
782 return NULL;
783 }
784
/** 640K ought to be enough for everybody ;) */
#define BTRN_MAX_PENDING (640 * 1024)

/*
 * Decide whether a node should go on processing data.
 *
 * \param btrn The node in question (may be NULL).
 * \param min_iqs Minimal input queue size the node needs before it can
 * do anything useful.
 * \param type Whether the node acts as root, internal node or leaf.
 *
 * \return Positive if the node should continue, zero if it should idle,
 * negative error code if it should terminate.
 */
int btr_node_status(struct btr_node *btrn, size_t min_iqs,
		enum btr_node_type type)
{
	size_t iqs;

	if (!btrn)
		return 0;
	if (type != BTR_NT_LEAF) {
		if (btr_no_children(btrn))
			return -E_BTR_NO_CHILD;
		/* throttle while the children have too much unconsumed data */
		if (btr_bytes_pending(btrn) > BTRN_MAX_PENDING)
			return 0;
	}
	if (type != BTR_NT_ROOT) {
		if (btr_eof(btrn))
			return -E_BTR_EOF;
		iqs = btr_get_input_queue_size(btrn);
		if (iqs == 0) /* we have a parent, because not eof */
			return 0;
		if (iqs < min_iqs && !btr_no_parent(btrn))
			return 0; /* wait for more input from the parent */
	}
	return 1;
}
812
/* Store the time of the node's first data transfer in *tv. */
void btr_get_node_start(struct btr_node *btrn, struct timeval *tv)
{
	*tv = btrn->start;
}