[btr] Add a comment to btr_pool_available().
[paraslash.git] / buffer_tree.c
#include <regex.h>
#include <stdbool.h>

#include "para.h"
#include "list.h"
#include "string.h"
#include "buffer_tree.h"
#include "error.h"
#include "sched.h"

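/*
 * A buffer pool is a simple ring buffer: data is added at the write head
 * and consumed at the read head, and both heads wrap around at the end of
 * the pool area.
 */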
struct btr_pool {
	char *name;
	char *area_start;
	char *area_end;
	char *rhead;
	char *whead; /* whead == NULL means the area is full */
};

enum btr_buffer_flags {
	/* changes the way the buffer is deallocated */
	BTR_BF_BTR_POOL = 1,
};

struct btr_buffer {
	char *buf;
	size_t size;
	/** The number of references to this buffer. */
	int refcount;
	/* NULL means the buffer was malloc()'d, see dealloc_buffer(). */
	struct btr_pool *pool;
};

struct btr_buffer_reference {
	struct btr_buffer *btrb;
	size_t consumed;
	/*
	 * Each buffer reference belongs to the buffer queue list of some
	 * buffer tree node.
	 */
	struct list_head node;
	/* Non-zero means this is a wrap buffer, see merge_input_pool(). */
	size_t wrap_count;
};

struct btr_node {
	char *name;
	struct btr_node *parent;
	/* The position of this btr node in the buffer tree. */
	struct list_head node;
	/* The children of this btr node are linked together in a list. */
	struct list_head children;
	/* Time of first data transfer. */
	struct timeval start;
	/**
	 * The input queue is a list of references to btr buffers. Each item on
	 * the list represents an input buffer which has not been completely
	 * used by this btr node.
	 */
	struct list_head input_queue;
	btr_command_handler execute;
	void *context;
};

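/*
 * Create a new buffer pool of the given size. Both heads initially point
 * at the start of the area, i.e. the pool starts out empty.
 */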
struct btr_pool *btr_pool_new(const char *name, size_t area_size)
{
	struct btr_pool *btrp;

	PARA_INFO_LOG("%s, %zu bytes\n", name, area_size);
	btrp = para_malloc(sizeof(*btrp));
	btrp->area_start = para_malloc(area_size);
	btrp->area_end = btrp->area_start + area_size;
	btrp->rhead = btrp->area_start;
	btrp->whead = btrp->area_start;
	btrp->name = para_strdup(name);
	return btrp;
}

void btr_pool_free(struct btr_pool *btrp)
{
	if (!btrp)
		return;
	free(btrp->area_start);
	free(btrp->name);
	free(btrp);
}

size_t btr_pool_size(struct btr_pool *btrp)
{
	return btrp->area_end - btrp->area_start;
}

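/*
 * Return the number of bytes currently allocated from the pool. The two
 * branches distinguish whether the filled region wraps around the end of
 * the pool area.
 */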
size_t btr_pool_filled(struct btr_pool *btrp)
{
	if (!btrp->whead)
		return btr_pool_size(btrp);
	if (btrp->rhead <= btrp->whead)
		return btrp->whead - btrp->rhead;
	return btr_pool_size(btrp) - (btrp->rhead - btrp->whead);
}

size_t btr_pool_unused(struct btr_pool *btrp)
{
	return btr_pool_size(btrp) - btr_pool_filled(btrp);
}

/*
 * Return the maximal size available for one read. This is at most the
 * value returned by btr_pool_unused() because only the contiguous part of
 * the free area, starting at the write head, can be handed out.
 */
size_t btr_pool_available(struct btr_pool *btrp)
{
	if (!btrp->whead)
		return 0;
	if (btrp->rhead <= btrp->whead)
		return btrp->area_end - btrp->whead;
	return btrp->rhead - btrp->whead;
}

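/*
 * Expose the current write head of the pool. The caller may fill up to the
 * returned number of bytes at *result and must then commit them via
 * btr_pool_allocate(). A minimal usage sketch, assuming a hypothetical
 * file descriptor fd and omitting error handling:
 *
 *	char *buf;
 *	size_t sz = btr_pool_get_buffer(btrp, &buf);
 *	ssize_t ret = read(fd, buf, sz);
 *	if (ret > 0)
 *		btr_pool_allocate(btrp, ret);
 */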
size_t btr_pool_get_buffer(struct btr_pool *btrp, char **result)
{
	if (result)
		*result = btrp->whead;
	return btr_pool_available(btrp);
}

void btr_pool_allocate(struct btr_pool *btrp, size_t size)
{
	char *end;

	if (size == 0)
		return;
	//PARA_CRIT_LOG("filled: %zu, alloc %zu\n", btr_pool_filled(btrp), size);
	assert(size <= btr_pool_available(btrp));
	end = btrp->whead + size;
	assert(end <= btrp->area_end);

	if (end == btrp->area_end) {
		PARA_DEBUG_LOG("%s: end of pool area reached\n", btrp->name);
		end = btrp->area_start;
	}
	if (end == btrp->rhead) {
		PARA_DEBUG_LOG("btrp buffer full\n");
		end = NULL; /* buffer full */
	}
	btrp->whead = end;
	//PARA_CRIT_LOG("filled: %zu\n", btr_pool_filled(btrp));
}

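/*
 * Return size bytes at the read head to the pool. If this empties the
 * pool, both heads are reset to the start of the area.
 */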
static void btr_pool_deallocate(struct btr_pool *btrp, size_t size)
{
	char *end = btrp->rhead + size;

	//PARA_CRIT_LOG("filled: %zu, dealloc %zu\n", btr_pool_filled(btrp), size);
	if (size == 0)
		return;
	assert(end <= btrp->area_end);
	assert(size <= btr_pool_filled(btrp));
	if (end == btrp->area_end)
		end = btrp->area_start;
	if (!btrp->whead)
		btrp->whead = btrp->rhead;
	btrp->rhead = end;
	if (btrp->rhead == btrp->whead)
		btrp->rhead = btrp->whead = btrp->area_start;
	//PARA_CRIT_LOG("filled: %zu\n", btr_pool_filled(btrp));
}

#define FOR_EACH_CHILD(_tn, _btrn) list_for_each_entry((_tn), \
	&((_btrn)->children), node)
#define FOR_EACH_CHILD_SAFE(_tn, _tmp, _btrn) \
	list_for_each_entry_safe((_tn), (_tmp), &((_btrn)->children), node)

#define FOR_EACH_BUFFER_REF(_br, _btrn) \
	list_for_each_entry((_br), &(_btrn)->input_queue, node)
#define FOR_EACH_BUFFER_REF_SAFE(_br, _tmp, _btrn) \
	list_for_each_entry_safe((_br), (_tmp), &(_btrn)->input_queue, node)

/*
 * The valid combinations of the (parent, child) members of the node
 * description:
 *
 * (NULL, NULL): new, isolated node
 * (NULL, c): new node becomes root, c must be the old root
 * (p, NULL): new leaf node
 * (p, c): new internal node, c must be a child of p
 */
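
/*
 * Example (sketch): create a root node and attach a child to it. Only the
 * fields that btr_new_node() reads are set here; the names and the NULL
 * handlers are made up for illustration.
 *
 *	struct btr_node_description bnd1 = {.name = "source"};
 *	struct btr_node *root = btr_new_node(&bnd1);
 *	struct btr_node_description bnd2 = {.name = "filter", .parent = root};
 *	struct btr_node *child = btr_new_node(&bnd2);
 */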
struct btr_node *btr_new_node(struct btr_node_description *bnd)
{
	struct btr_node *btrn = para_malloc(sizeof(*btrn));

	btrn->name = para_strdup(bnd->name);
	btrn->parent = bnd->parent;
	btrn->execute = bnd->handler;
	btrn->context = bnd->context;
	btrn->start.tv_sec = 0;
	btrn->start.tv_usec = 0;
	if (bnd->parent)
		list_add_tail(&btrn->node, &bnd->parent->children);
	INIT_LIST_HEAD(&btrn->children);
	INIT_LIST_HEAD(&btrn->input_queue);
	if (bnd->parent)
		PARA_INFO_LOG("added %s as child of %s\n", bnd->name,
			bnd->parent->name);
	else
		PARA_INFO_LOG("added %s as btr root\n", bnd->name);
	return btrn;
}

/*
 * Allocate a new btr buffer.
 *
 * The freshly allocated buffer will have a zero refcount and will
 * not be associated with a btr pool.
 */
static struct btr_buffer *new_btrb(char *buf, size_t size)
{
	struct btr_buffer *btrb = para_calloc(sizeof(*btrb));

	btrb->buf = buf;
	btrb->size = size;
	return btrb;
}

static void dealloc_buffer(struct btr_buffer *btrb)
{
	if (btrb->pool)
		btr_pool_deallocate(btrb->pool, btrb->size);
	else
		free(btrb->buf);
}

static struct btr_buffer_reference *get_first_input_br(struct btr_node *btrn)
{
	if (list_empty(&btrn->input_queue))
		return NULL;
	return list_first_entry(&btrn->input_queue,
		struct btr_buffer_reference, node);
}

/*
 * Deallocate the reference and release the resources if the refcount drops
 * to zero.
 */
static void btr_drop_buffer_reference(struct btr_buffer_reference *br)
{
	struct btr_buffer *btrb = br->btrb;

	//PARA_CRIT_LOG("dropping buffer reference %p\n", br);
	list_del(&br->node);
	free(br);
	btrb->refcount--;
	if (btrb->refcount == 0) {
		dealloc_buffer(btrb);
		free(btrb);
	}
}

static void add_btrb_to_children(struct btr_buffer *btrb,
		struct btr_node *btrn, size_t consumed)
{
	struct btr_node *ch;

	if (btrn->start.tv_sec == 0)
		btrn->start = *now;
	FOR_EACH_CHILD(ch, btrn) {
		struct btr_buffer_reference *br = para_calloc(sizeof(*br));
		br->btrb = btrb;
		br->consumed = consumed;
		list_add_tail(&br->node, &ch->input_queue);
		btrb->refcount++;
		if (ch->start.tv_sec == 0)
			ch->start = *now;
	}
}

void btr_add_output(char *buf, size_t size, struct btr_node *btrn)
{
	struct btr_buffer *btrb;

	assert(size != 0);
	if (list_empty(&btrn->children)) {
		free(buf);
		return;
	}
	btrb = new_btrb(buf, size);
	add_btrb_to_children(btrb, btrn, 0);
}

void btr_add_output_pool(struct btr_pool *btrp, size_t size,
		struct btr_node *btrn)
{
	struct btr_buffer *btrb;
	char *buf;
	size_t avail;

	assert(size != 0);
	if (list_empty(&btrn->children))
		return;
	avail = btr_pool_get_buffer(btrp, &buf);
	assert(avail >= size);
	btr_pool_allocate(btrp, size);
	btrb = new_btrb(buf, size);
	btrb->pool = btrp;
	add_btrb_to_children(btrb, btrn, 0);
}

void btr_copy(const void *src, size_t n, struct btr_pool *btrp,
		struct btr_node *btrn)
{
	char *buf;
	size_t sz, copy;

	if (n == 0)
		return;
	assert(n <= btr_pool_unused(btrp));
	sz = btr_pool_get_buffer(btrp, &buf);
	copy = PARA_MIN(sz, n);
	memcpy(buf, src, copy);
	btr_add_output_pool(btrp, copy, btrn);
	if (copy == n)
		return;
	/* The pool wrapped around, copy the rest to the start of the area. */
	sz = btr_pool_get_buffer(btrp, &buf);
	assert(sz >= n - copy);
	memcpy(buf, (const char *)src + copy, n - copy);
	btr_add_output_pool(btrp, n - copy, btrn);
}

static void btr_pushdown_br(struct btr_buffer_reference *br,
		struct btr_node *btrn)
{
	add_btrb_to_children(br->btrb, btrn, br->consumed);
	btr_drop_buffer_reference(br);
}

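/*
 * Feed all buffer references of the input queue to the children of the
 * node and drop them from the queue.
 */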
void btr_pushdown(struct btr_node *btrn)
{
	struct btr_buffer_reference *br, *tmp;

	FOR_EACH_BUFFER_REF_SAFE(br, tmp, btrn)
		btr_pushdown_br(br, btrn);
}

int btr_pushdown_one(struct btr_node *btrn)
{
	struct btr_buffer_reference *br;

	if (list_empty(&btrn->input_queue))
		return 0;
	br = list_first_entry(&btrn->input_queue,
		struct btr_buffer_reference, node);
	btr_pushdown_br(br, btrn);
	return 1;
}

/* Return true if this node has no children. */
bool btr_no_children(struct btr_node *btrn)
{
	return list_empty(&btrn->children);
}

bool btr_no_parent(struct btr_node *btrn)
{
	return !btrn->parent;
}

/*
 * Return true if this node is the only child of its parent, i.e. if it may
 * modify its input buffers in place without affecting sibling nodes.
 */
bool btr_inplace_ok(struct btr_node *btrn)
{
	if (!btrn->parent)
		return true;
	return list_is_singular(&btrn->parent->children);
}

static inline size_t br_available_bytes(struct btr_buffer_reference *br)
{
	return br->btrb->size - br->consumed;
}

size_t btr_get_buffer_by_reference(struct btr_buffer_reference *br, char **buf)
{
	if (buf)
		*buf = br->btrb->buf + br->consumed;
	return br_available_bytes(br);
}

/**
 * \return zero if the input buffer queue is empty.
 */
size_t btr_next_buffer(struct btr_node *btrn, char **bufp)
{
	struct btr_buffer_reference *br;
	char *buf, *result = NULL;
	size_t sz, rv = 0;

	FOR_EACH_BUFFER_REF(br, btrn) {
		sz = btr_get_buffer_by_reference(br, &buf);
		if (!result) {
			result = buf;
			rv = sz;
			if (!br->btrb->pool)
				break;
			continue;
		}
		if (!br->btrb->pool)
			break;
		if (result + rv != buf) {
			PARA_DEBUG_LOG("%s: pool merge impossible: %p != %p\n",
				btrn->name, result + rv, buf);
			break;
		}
		//PARA_CRIT_LOG("%s: inplace merge (%zu, %zu)->%zu\n", btrn->name,
		//	rv, sz, rv + sz);
		//PARA_CRIT_LOG("%s: inplace merge %p (%zu)\n", btrn->name,
		//	result, sz);
		rv += sz;
	}
	if (bufp)
		*bufp = result;
	return rv;
}

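/*
 * Mark numbytes of the input queue as consumed, dropping buffer references
 * that become fully used. If the first reference is a wrap buffer and more
 * than wrap_count bytes have been consumed in total, the wrap buffer is
 * dropped and the bytes are consumed again from the buffers it duplicates,
 * via the recursive call at the end of the function.
 */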
void btr_consume(struct btr_node *btrn, size_t numbytes)
{
	struct btr_buffer_reference *br, *tmp;
	size_t sz;

	if (numbytes == 0)
		return;
	br = get_first_input_br(btrn);
	assert(br);

	//PARA_CRIT_LOG("wrap count: %zu\n", br->wrap_count);
	if (br->wrap_count == 0) {
		/*
		 * No wrap buffer. Drop buffer references whose buffer
		 * has been fully used.
		 */
		FOR_EACH_BUFFER_REF_SAFE(br, tmp, btrn) {
			if (br->consumed + numbytes <= br->btrb->size) {
				br->consumed += numbytes;
				if (br->consumed == br->btrb->size)
					btr_drop_buffer_reference(br);
				return;
			}
			numbytes -= br->btrb->size - br->consumed;
			btr_drop_buffer_reference(br);
		}
		/* We should never consume more than is queued. */
		assert(false);
	}
	/*
	 * We have a wrap buffer, consume from it. If in total, i.e. including
	 * previous calls to btr_consume(), less than wrap_count has been
	 * consumed, there's nothing more we can do.
	 *
	 * Otherwise we drop the wrap buffer and consume the correct amount of
	 * bytes from the subsequent buffers of the input queue. This is the
	 * total number of bytes that have been consumed from the wrap buffer.
	 */
	PARA_DEBUG_LOG("consuming %zu/%zu bytes from wrap buffer\n", numbytes,
		br_available_bytes(br));

	assert(numbytes <= br_available_bytes(br));
	if (br->consumed + numbytes < br->wrap_count) {
		br->consumed += numbytes;
		return;
	}
	PARA_DEBUG_LOG("dropping wrap buffer (%zu bytes)\n", br->btrb->size);
	/* Get rid of the wrap buffer and account for its bytes again. */
	sz = br->consumed + numbytes;
	btr_drop_buffer_reference(br);
	btr_consume(btrn, sz);
}

static void flush_input_queue(struct btr_node *btrn)
{
	struct btr_buffer_reference *br, *tmp;

	FOR_EACH_BUFFER_REF_SAFE(br, tmp, btrn)
		btr_drop_buffer_reference(br);
}

void btr_free_node(struct btr_node *btrn)
{
	if (!btrn)
		return;
	free(btrn->name);
	free(btrn);
}

void btr_remove_node(struct btr_node *btrn)
{
	struct btr_node *ch;

	if (!btrn)
		return;
	PARA_NOTICE_LOG("removing btr node %s from buffer tree\n", btrn->name);
	FOR_EACH_CHILD(ch, btrn)
		ch->parent = NULL;
	flush_input_queue(btrn);
	if (btrn->parent)
		list_del(&btrn->node);
}

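/*
 * Return the number of bytes queued at the input of this node. Bytes that
 * were consumed from a wrap buffer are subtracted because the wrap buffer
 * duplicates data of the subsequent references on the queue.
 */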
size_t btr_get_input_queue_size(struct btr_node *btrn)
{
	struct btr_buffer_reference *br;
	size_t size = 0, wrap_consumed = 0;

	FOR_EACH_BUFFER_REF(br, btrn) {
		if (br->wrap_count != 0) {
			wrap_consumed = br->consumed;
			continue;
		}
		size += br_available_bytes(br);
	}
	assert(wrap_consumed <= size);
	size -= wrap_consumed;
	return size;
}

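/*
 * Remove the node from the tree without losing data: pending input is
 * pushed down to the children, which are then reparented to the parent of
 * the removed node.
 */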
void btr_splice_out_node(struct btr_node *btrn)
{
	struct btr_node *ch, *tmp;

	assert(btrn);
	PARA_NOTICE_LOG("splicing out %s\n", btrn->name);
	btr_pushdown(btrn);
	if (btrn->parent)
		list_del(&btrn->node);
	FOR_EACH_CHILD_SAFE(ch, tmp, btrn) {
		PARA_INFO_LOG("parent(%s): %s\n", ch->name,
			btrn->parent? btrn->parent->name : "NULL");
		ch->parent = btrn->parent;
		if (btrn->parent)
			list_move(&ch->node, &btrn->parent->children);
	}
	assert(list_empty(&btrn->children));
}

/**
 * Return the size of the largest input queue.
 *
 * Iterates over all children of the given node.
 */
size_t btr_bytes_pending(struct btr_node *btrn)
{
	size_t max_size = 0;
	struct btr_node *ch;

	FOR_EACH_CHILD(ch, btrn) {
		size_t size = btr_get_input_queue_size(ch);
		max_size = PARA_MAX(max_size, size);
	}
	return max_size;
}

int btr_exec(struct btr_node *btrn, const char *command, char **value_result)
{
	if (!btrn)
		return -ERRNO_TO_PARA_ERROR(EINVAL);
	if (!btrn->execute)
		return -ERRNO_TO_PARA_ERROR(ENOTSUP);
	return btrn->execute(btrn, command, value_result);
}

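/*
 * Ask the ancestors of a node to execute a command. The request is passed
 * up the tree until some ancestor handles it. Returns 1 on success, a
 * negative error code if a handler failed, and
 * -ERRNO_TO_PARA_ERROR(ENOTSUP) if no ancestor could execute the command.
 */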
int btr_exec_up(struct btr_node *btrn, const char *command, char **value_result)
{
	int ret;

	for (; btrn; btrn = btrn->parent) {
		struct btr_node *parent = btrn->parent;
		if (!parent)
			return -ERRNO_TO_PARA_ERROR(ENOTSUP);
		if (!parent->execute)
			continue;
		PARA_INFO_LOG("parent: %s, cmd: %s\n", parent->name, command);
		ret = parent->execute(parent, command, value_result);
		if (ret == -ERRNO_TO_PARA_ERROR(ENOTSUP))
			continue;
		if (ret < 0)
			return ret;
		if (value_result && *value_result)
			PARA_NOTICE_LOG("%s(%s): %s\n", command, parent->name,
				*value_result);
		return 1;
	}
	return -ERRNO_TO_PARA_ERROR(ENOTSUP);
}

void *btr_context(struct btr_node *btrn)
{
	return btrn->context;
}

static bool need_buffer_pool_merge(struct btr_node *btrn)
{
	struct btr_buffer_reference *br = get_first_input_br(btrn);

	if (!br)
		return false;
	if (br->wrap_count != 0)
		return true;
	if (br->btrb->pool)
		return true;
	return false;
}

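/*
 * Merge the pool-backed input queue of a node until at least dest_size
 * contiguous bytes are available. Since the pool area may wrap around,
 * adjacent buffers are not always contiguous in memory. In this case a
 * "wrap buffer" is created: a malloc()'d copy of the last sz1 bytes before
 * the wrap followed by sz2 bytes from the start of the area. The
 * wrap_count field of the new reference records sz1, the size of the
 * first chunk.
 */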
static void merge_input_pool(struct btr_node *btrn, size_t dest_size)
{
	struct btr_buffer_reference *br, *wbr = NULL;
	int num_refs; /* including wrap buffer */
	char *buf, *buf1 = NULL, *buf2 = NULL;
	size_t sz, sz1 = 0, sz2 = 0, wsz;

	br = get_first_input_br(btrn);
	if (!br || br_available_bytes(br) >= dest_size)
		return;
	num_refs = 0;
	FOR_EACH_BUFFER_REF(br, btrn) {
		num_refs++;
		sz = btr_get_buffer_by_reference(br, &buf);
		if (sz == 0)
			break;
		if (br->wrap_count != 0) {
			assert(!wbr);
			assert(num_refs == 1);
			wbr = br;
			if (sz >= dest_size)
				return;
			continue;
		}
		if (!buf1) {
			buf1 = buf;
			sz1 = sz;
			goto next;
		}
		if (buf1 + sz1 == buf) {
			sz1 += sz;
			goto next;
		}
		if (!buf2) {
			buf2 = buf;
			sz2 = sz;
			goto next;
		}
		assert(buf2 + sz2 == buf);
		sz2 += sz;
next:
		if (sz1 + sz2 >= dest_size)
			break;
	}
	if (!buf2) /* nothing to do */
		return;
	assert(buf1 && sz2 > 0);
	/*
	 * If the second buffer is large, we only take the first part of it to
	 * avoid having to memcpy() huge buffers.
	 */
	sz2 = PARA_MIN(sz2, (size_t)(64 * 1024));
	if (!wbr) {
		/* Make a new wrap buffer combining buf1 and buf2. */
		sz = sz1 + sz2;
		buf = para_malloc(sz);
		PARA_DEBUG_LOG("merging input buffers: (%p:%zu, %p:%zu) -> %p:%zu\n",
			buf1, sz1, buf2, sz2, buf, sz);
		memcpy(buf, buf1, sz1);
		memcpy(buf + sz1, buf2, sz2);
		br = para_calloc(sizeof(*br));
		br->btrb = new_btrb(buf, sz);
		br->btrb->refcount = 1;
		br->consumed = 0;
		/* This is a wrap buffer. */
		br->wrap_count = sz1;
		para_list_add(&br->node, &btrn->input_queue);
		return;
	}
	PARA_DEBUG_LOG("increasing wrap buffer, sz1: %zu, sz2: %zu\n", sz1, sz2);
	/*
	 * We already have a wrap buffer, but it is too small. It might be
	 * partially used.
	 */
	wsz = br_available_bytes(wbr);
	if (wbr->wrap_count == sz1 && wbr->btrb->size >= sz1 + sz2)
		return; /* nothing we can do about it */
	sz = sz1 + sz2 - wbr->btrb->size; /* amount of new data */
	wbr->btrb->size += sz;
	wbr->btrb->buf = para_realloc(wbr->btrb->buf, wbr->btrb->size);
	/* Copy the new data to the end of the reallocated buffer. */
	assert(sz2 >= sz);
	memcpy(wbr->btrb->buf + wbr->btrb->size - sz, buf2 + sz2 - sz, sz);
}

/**
 * Merge the first two input buffers into one.
 *
 * This is a rather expensive operation.
 *
 * \return The number of buffers that have been available (zero, one or two).
 */
static int merge_input(struct btr_node *btrn)
{
	struct btr_buffer_reference *brs[2], *br;
	char *bufs[2], *buf;
	size_t szs[2], sz;
	int i;

	if (list_empty(&btrn->input_queue))
		return 0;
	if (list_is_singular(&btrn->input_queue))
		return 1;
	i = 0;
	/* Get references to the first two buffers. */
	FOR_EACH_BUFFER_REF(br, btrn) {
		brs[i] = br;
		szs[i] = btr_get_buffer_by_reference(brs[i], bufs + i);
		i++;
		if (i == 2)
			break;
	}
	/* Make a new btrb that combines the two buffers and a br to it. */
	sz = szs[0] + szs[1];
	buf = para_malloc(sz);
	PARA_DEBUG_LOG("%s: memory merging input buffers: (%zu, %zu) -> %zu\n",
		btrn->name, szs[0], szs[1], sz);
	memcpy(buf, bufs[0], szs[0]);
	memcpy(buf + szs[0], bufs[1], szs[1]);

	br = para_calloc(sizeof(*br));
	br->btrb = new_btrb(buf, sz);
	br->btrb->refcount = 1;

	/* Replace the first two references by the new one. */
	btr_drop_buffer_reference(brs[0]);
	btr_drop_buffer_reference(brs[1]);
	para_list_add(&br->node, &btrn->input_queue);
	return 2;
}

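/*
 * Merge the input queue of the node until at least dest_size contiguous
 * bytes are available, or until this becomes impossible because fewer
 * than two buffers remain.
 */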
void btr_merge(struct btr_node *btrn, size_t dest_size)
{
	if (need_buffer_pool_merge(btrn)) {
		merge_input_pool(btrn, dest_size);
		return;
	}
	for (;;) {
		char *buf;
		size_t len = btr_next_buffer(btrn, &buf);
		if (len >= dest_size)
			return;
		PARA_DEBUG_LOG("input size = %zu < %zu = dest\n", len, dest_size);
		if (merge_input(btrn) < 2)
			return;
	}
}

bool btr_eof(struct btr_node *btrn)
{
	char *buf;
	size_t len = btr_next_buffer(btrn, &buf);

	return (len == 0 && btr_no_parent(btrn));
}

void log_tree_recursively(struct btr_node *btrn, int loglevel, int depth)
{
	struct btr_node *ch;
	const char spaces[] = "                ", *space = spaces + 16 - depth;

	if (depth > 16)
		return;
	para_log(loglevel, "%s%s\n", space, btrn->name);
	FOR_EACH_CHILD(ch, btrn)
		log_tree_recursively(ch, loglevel, depth + 1);
}

void btr_log_tree(struct btr_node *btrn, int loglevel)
{
	log_tree_recursively(btrn, loglevel, 0);
}

/*
 * \return \a root if \a name is \p NULL.
 */
struct btr_node *btr_search_node(const char *name, struct btr_node *root)
{
	struct btr_node *ch;

	if (!name)
		return root;
	if (!strcmp(root->name, name))
		return root;
	FOR_EACH_CHILD(ch, root) {
		struct btr_node *result = btr_search_node(name, ch);
		if (result)
			return result;
	}
	return NULL;
}

/** 640K ought to be enough for everybody ;) */
#define BTRN_MAX_PENDING (640 * 1024)

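/*
 * Determine whether a node can proceed. Returns a negative error code if
 * the node should terminate (no children left, or end of file), zero if
 * it should idle (not enough input, or the children have too much pending
 * output), and one if it may continue processing.
 */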
int btr_node_status(struct btr_node *btrn, size_t min_iqs,
		enum btr_node_type type)
{
	size_t iqs;

	assert(btrn);
	if (type != BTR_NT_LEAF) {
		if (btr_no_children(btrn))
			return -E_BTR_NO_CHILD;
		if (btr_bytes_pending(btrn) > BTRN_MAX_PENDING)
			return 0;
	}
	if (type != BTR_NT_ROOT) {
		if (btr_eof(btrn))
			return -E_BTR_EOF;
		iqs = btr_get_input_queue_size(btrn);
		if (iqs == 0) /* we have a parent, because not eof */
			return 0;
		if (iqs < min_iqs && !btr_no_parent(btrn))
			return 0;
	}
	return 1;
}

void btr_get_node_start(struct btr_node *btrn, struct timeval *tv)
{
	*tv = btrn->start;
}