/** \file buffer_tree.c The buffer tree and buffer pool routines of paraslash. */
#include <regex.h>
#include <stdbool.h>

#include "para.h"
#include "list.h"
#include "string.h"
#include "buffer_tree.h"
#include "error.h"
#include "sched.h"

/* whead = NULL means area full */
struct btr_pool {
	char *name;
	char *area_start;
	char *area_end;
	char *rhead;
	char *whead;
};

enum btr_buffer_flags {
	/* changes the way the buffer is deallocated */
	BTR_BF_BTR_POOL = 1,
};

struct btr_buffer {
	char *buf;
	size_t size;
	/** The number of references to this buffer. */
	int refcount;
	struct btr_pool *pool;
};

struct btr_buffer_reference {
	struct btr_buffer *btrb;
	size_t consumed;
	/* Each buffer reference belongs to the buffer queue list of some buffer tree node. */
	struct list_head node;
	size_t wrap_count;
};

struct btr_node {
	char *name;
	struct btr_node *parent;
	/* The position of this btr node in the buffer tree. */
	struct list_head node;
	/* The child nodes of this btr node are linked together in a list. */
	struct list_head children;
	/* Time of first data transfer. */
	struct timeval start;
	/**
	 * The input queue is a list of references to btr buffers. Each item on
	 * the list represents an input buffer which has not been completely
	 * used by this btr node.
	 */
	struct list_head input_queue;
	btr_command_handler execute;
	void *context;
};

/**
 * Create a new buffer pool.
 *
 * \param name The name of the new buffer pool.
 *
 * \param area_size The size in bytes of the pool area.
 *
 * \return An opaque pointer to the newly created buffer pool. Pass it to
 * btr_pool_free() when it is no longer needed to deallocate all resources.
 */
struct btr_pool *btr_pool_new(const char *name, size_t area_size)
{
	struct btr_pool *btrp;

	PARA_INFO_LOG("%s, %zu bytes\n", name, area_size);
	btrp = para_malloc(sizeof(*btrp));
	btrp->area_start = para_malloc(area_size);
	btrp->area_end = btrp->area_start + area_size;
	btrp->rhead = btrp->area_start;
	btrp->whead = btrp->area_start;
	btrp->name = para_strdup(name);
	return btrp;
}

/**
 * Deallocate resources used by a buffer pool.
 *
 * \param btrp A pointer obtained via btr_pool_new().
 */
void btr_pool_free(struct btr_pool *btrp)
{
	if (!btrp)
		return;
	free(btrp->area_start);
	free(btrp->name);
	free(btrp);
}
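
/*
 * Example usage (illustrative sketch): the intended lifecycle of a buffer
 * pool. The pool name and the 128K area size are arbitrary.
 *
 *	struct btr_pool *btrp = btr_pool_new("example", 128 * 1024);
 *	... produce and consume data through the pool ...
 *	btr_pool_free(btrp);
 */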

/**
 * Return the size of the buffer pool area.
 *
 * \param btrp The buffer pool.
 *
 * \return The same value that was passed to btr_pool_new() when the pool was
 * created.
 */
size_t btr_pool_size(struct btr_pool *btrp)
{
	return btrp->area_end - btrp->area_start;
}

size_t btr_pool_filled(struct btr_pool *btrp)
{
	if (!btrp->whead)
		return btr_pool_size(btrp);
	if (btrp->rhead <= btrp->whead)
		return btrp->whead - btrp->rhead;
	return btr_pool_size(btrp) - (btrp->rhead - btrp->whead);
}

size_t btr_pool_unused(struct btr_pool *btrp)
{
	return btr_pool_size(btrp) - btr_pool_filled(btrp);
}

/*
 * Return the maximal size available for one read. This is at most the value
 * returned by btr_pool_unused().
 */
size_t btr_pool_available(struct btr_pool *btrp)
{
	if (!btrp->whead)
		return 0;
	if (btrp->rhead <= btrp->whead)
		return btrp->area_end - btrp->whead;
	return btrp->rhead - btrp->whead;
}
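
/*
 * Worked example: consider a pool of size 100 whose write head has wrapped
 * around, say rhead = area_start + 80 and whead = area_start + 30. Then
 * btr_pool_filled() returns 100 - (80 - 30) = 50, btr_pool_unused() returns
 * 100 - 50 = 50, and btr_pool_available() returns rhead - whead = 50. In the
 * non-wrapped case (rhead <= whead), however, only area_end - whead bytes are
 * available for one read, which can be less than the unused total.
 */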

size_t btr_pool_get_buffer(struct btr_pool *btrp, char **result)
{
	if (result)
		*result = btrp->whead;
	return btr_pool_available(btrp);
}

void btr_pool_allocate(struct btr_pool *btrp, size_t size)
{
	char *end;

	if (size == 0)
		return;
	assert(size <= btr_pool_available(btrp));
	end = btrp->whead + size;
	assert(end <= btrp->area_end);

	if (end == btrp->area_end) {
		PARA_DEBUG_LOG("%s: end of pool area reached\n", btrp->name);
		end = btrp->area_start;
	}
	if (end == btrp->rhead) {
		PARA_DEBUG_LOG("%s btrp buffer full\n", btrp->name);
		end = NULL; /* buffer full */
	}
	btrp->whead = end;
}
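
/*
 * Example usage (illustrative sketch): the typical produce cycle against a
 * pool. A writer obtains the largest contiguous region, fills part of it, and
 * commits exactly the number of bytes written. fill_somehow() stands for a
 * hypothetical data source.
 *
 *	char *buf;
 *	size_t sz = btr_pool_get_buffer(btrp, &buf);
 *
 *	if (sz > 0) {
 *		size_t n = fill_somehow(buf, sz); // with n <= sz
 *		btr_pool_allocate(btrp, n);
 *	}
 */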

static void btr_pool_deallocate(struct btr_pool *btrp, size_t size)
{
	char *end = btrp->rhead + size;

	if (size == 0)
		return;
	assert(end <= btrp->area_end);
	assert(size <= btr_pool_filled(btrp));
	if (end == btrp->area_end)
		end = btrp->area_start;
	if (!btrp->whead)
		btrp->whead = btrp->rhead;
	btrp->rhead = end;
	if (btrp->rhead == btrp->whead)
		btrp->rhead = btrp->whead = btrp->area_start;
}

#define FOR_EACH_CHILD(_tn, _btrn) list_for_each_entry((_tn), \
	&((_btrn)->children), node)
#define FOR_EACH_CHILD_SAFE(_tn, _tmp, _btrn) \
	list_for_each_entry_safe((_tn), (_tmp), &((_btrn)->children), node)

#define FOR_EACH_BUFFER_REF(_br, _btrn) \
	list_for_each_entry((_br), &(_btrn)->input_queue, node)
#define FOR_EACH_BUFFER_REF_SAFE(_br, _tmp, _btrn) \
	list_for_each_entry_safe((_br), (_tmp), &(_btrn)->input_queue, node)

struct btr_node *btr_new_node(struct btr_node_description *bnd)
{
	struct btr_node *btrn = para_malloc(sizeof(*btrn));

	btrn->name = para_strdup(bnd->name);
	btrn->parent = bnd->parent;
	btrn->execute = bnd->handler;
	btrn->context = bnd->context;
	btrn->start.tv_sec = 0;
	btrn->start.tv_usec = 0;
	INIT_LIST_HEAD(&btrn->children);
	INIT_LIST_HEAD(&btrn->input_queue);
	if (!bnd->child) {
		if (bnd->parent) {
			list_add_tail(&btrn->node, &bnd->parent->children);
			PARA_INFO_LOG("new leaf node: %s (child of %s)\n",
				bnd->name, bnd->parent->name);
		} else
			PARA_INFO_LOG("added %s as btr root\n", bnd->name);
		goto out;
	}
	if (!bnd->parent) {
		assert(!bnd->child->parent);
		PARA_INFO_LOG("new root: %s (was %s)\n",
			bnd->name, bnd->child->name);
		btrn->parent = NULL;
		list_add_tail(&bnd->child->node, &btrn->children);
		/* link it in */
		bnd->child->parent = btrn;
		goto out;
	}
	PARA_EMERG_LOG("inserting internal nodes not yet supported.\n");
	exit(EXIT_FAILURE);
	assert(bnd->child->parent == bnd->parent);
out:
	return btrn;
}
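
/*
 * Example usage (illustrative sketch): creating a two-node chain, root first.
 * The designated initializers rely only on the btr_node_description fields
 * read above; the node names are made up.
 *
 *	struct btr_node_description d1 = {.name = "source"};
 *	struct btr_node *root = btr_new_node(&d1);
 *	struct btr_node_description d2 = {.name = "sink", .parent = root};
 *	struct btr_node *leaf = btr_new_node(&d2);
 */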

/*
 * Allocate a new btr buffer.
 *
 * The freshly allocated buffer will have a zero refcount and will
 * not be associated with a btr pool.
 */
static struct btr_buffer *new_btrb(char *buf, size_t size)
{
	struct btr_buffer *btrb = para_calloc(sizeof(*btrb));

	btrb->buf = buf;
	btrb->size = size;
	return btrb;
}

static void dealloc_buffer(struct btr_buffer *btrb)
{
	if (btrb->pool)
		btr_pool_deallocate(btrb->pool, btrb->size);
	else
		free(btrb->buf);
}

static struct btr_buffer_reference *get_first_input_br(struct btr_node *btrn)
{
	if (list_empty(&btrn->input_queue))
		return NULL;
	return list_first_entry(&btrn->input_queue,
		struct btr_buffer_reference, node);
}

/*
 * Deallocate the reference, release the resources if refcount drops to zero.
 */
static void btr_drop_buffer_reference(struct btr_buffer_reference *br)
{
	struct btr_buffer *btrb = br->btrb;

	list_del(&br->node);
	free(br);
	btrb->refcount--;
	if (btrb->refcount == 0) {
		dealloc_buffer(btrb);
		free(btrb);
	}
}

static void add_btrb_to_children(struct btr_buffer *btrb,
		struct btr_node *btrn, size_t consumed)
{
	struct btr_node *ch;

	if (btrn->start.tv_sec == 0)
		btrn->start = *now;
	FOR_EACH_CHILD(ch, btrn) {
		struct btr_buffer_reference *br = para_calloc(sizeof(*br));

		br->btrb = btrb;
		br->consumed = consumed;
		list_add_tail(&br->node, &ch->input_queue);
		btrb->refcount++;
		if (ch->start.tv_sec == 0)
			ch->start = *now;
	}
}

void btr_add_output(char *buf, size_t size, struct btr_node *btrn)
{
	struct btr_buffer *btrb;

	assert(size != 0);
	if (list_empty(&btrn->children)) {
		free(buf);
		return;
	}
	btrb = new_btrb(buf, size);
	add_btrb_to_children(btrb, btrn, 0);
}
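
/*
 * Example usage (illustrative sketch): a root node handing a malloc()ed
 * buffer to its children. Ownership of buf passes to the buffer tree: it is
 * freed immediately if there are no children, or when the last child has
 * consumed it. produce_data() is a hypothetical producer.
 *
 *	char *buf = para_malloc(4096);
 *	size_t n = produce_data(buf, 4096);
 *
 *	if (n > 0)
 *		btr_add_output(buf, n, btrn);
 *	else
 *		free(buf);
 */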

void btr_add_output_pool(struct btr_pool *btrp, size_t size,
		struct btr_node *btrn)
{
	struct btr_buffer *btrb;
	char *buf;
	size_t avail;

	assert(size != 0);
	if (list_empty(&btrn->children))
		return;
	avail = btr_pool_get_buffer(btrp, &buf);
	assert(avail >= size);
	btr_pool_allocate(btrp, size);
	btrb = new_btrb(buf, size);
	btrb->pool = btrp;
	add_btrb_to_children(btrb, btrn, 0);
}

void btr_copy(const void *src, size_t n, struct btr_pool *btrp,
		struct btr_node *btrn)
{
	char *buf;
	size_t sz, copy;

	if (n == 0)
		return;
	assert(n <= btr_pool_unused(btrp));
	sz = btr_pool_get_buffer(btrp, &buf);
	copy = PARA_MIN(sz, n);
	memcpy(buf, src, copy);
	btr_add_output_pool(btrp, copy, btrn);
	if (copy == n)
		return;
	sz = btr_pool_get_buffer(btrp, &buf);
	assert(sz >= n - copy);
	memcpy(buf, src + copy, n - copy);
	btr_add_output_pool(btrp, n - copy, btrn);
}
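
/*
 * Example usage (illustrative sketch): feeding externally owned data into the
 * tree through a pool. Unlike btr_add_output(), btr_copy() leaves ownership
 * of src with the caller and may split the copy into two chunks when the
 * pool area wraps around.
 *
 *	if (len <= btr_pool_unused(btrp))
 *		btr_copy(src, len, btrp, btrn);
 */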

static void btr_pushdown_br(struct btr_buffer_reference *br, struct btr_node *btrn)
{
	add_btrb_to_children(br->btrb, btrn, br->consumed);
	btr_drop_buffer_reference(br);
}

void btr_pushdown(struct btr_node *btrn)
{
	struct btr_buffer_reference *br, *tmp;

	FOR_EACH_BUFFER_REF_SAFE(br, tmp, btrn)
		btr_pushdown_br(br, btrn);
}

int btr_pushdown_one(struct btr_node *btrn)
{
	struct btr_buffer_reference *br;

	if (list_empty(&btrn->input_queue))
		return 0;
	br = list_first_entry(&btrn->input_queue, struct btr_buffer_reference, node);
	btr_pushdown_br(br, btrn);
	return 1;
}

/* Return true if this node has no children. */
bool btr_no_children(struct btr_node *btrn)
{
	return list_empty(&btrn->children);
}

bool btr_no_parent(struct btr_node *btrn)
{
	return !btrn->parent;
}

bool btr_inplace_ok(struct btr_node *btrn)
{
	if (!btrn->parent)
		return true;
	return list_is_singular(&btrn->parent->children);
}

static inline size_t br_available_bytes(struct btr_buffer_reference *br)
{
	return br->btrb->size - br->consumed;
}

size_t btr_get_buffer_by_reference(struct btr_buffer_reference *br, char **buf)
{
	if (buf)
		*buf = br->btrb->buf + br->consumed;
	return br_available_bytes(br);
}

/**
 * \return Zero if the input buffer queue is empty. Otherwise, the number of
 * contiguous bytes available at the head of the input queue; their address is
 * returned in \a bufp.
 */
size_t btr_next_buffer(struct btr_node *btrn, char **bufp)
{
	struct btr_buffer_reference *br;
	char *buf, *result = NULL;
	size_t sz, rv = 0;

	FOR_EACH_BUFFER_REF(br, btrn) {
		sz = btr_get_buffer_by_reference(br, &buf);
		if (!result) {
			result = buf;
			rv = sz;
			if (!br->btrb->pool)
				break;
			continue;
		}
		if (!br->btrb->pool)
			break;
		if (result + rv != buf)
			break;
		rv += sz;
	}
	if (bufp)
		*bufp = result;
	return rv;
}

void btr_consume(struct btr_node *btrn, size_t numbytes)
{
	struct btr_buffer_reference *br, *tmp;
	size_t sz;

	if (numbytes == 0)
		return;
	br = get_first_input_br(btrn);
	assert(br);

	if (br->wrap_count == 0) {
		/*
		 * No wrap buffer. Drop buffer references whose buffer
		 * has been fully used.
		 */
		FOR_EACH_BUFFER_REF_SAFE(br, tmp, btrn) {
			if (br->consumed + numbytes <= br->btrb->size) {
				br->consumed += numbytes;
				if (br->consumed == br->btrb->size)
					btr_drop_buffer_reference(br);
				return;
			}
			numbytes -= br->btrb->size - br->consumed;
			btr_drop_buffer_reference(br);
		}
		assert(false); /* not reached: numbytes exceeded the queued input */
	}
	/*
	 * We have a wrap buffer, consume from it. If in total, i.e. including
	 * previous calls to btr_consume(), less than wrap_count has been
	 * consumed, there is nothing more we can do.
	 *
	 * Otherwise we drop the wrap buffer and consume the correct amount of
	 * bytes from the subsequent buffers of the input queue, i.e. the total
	 * number of bytes that have been consumed from the wrap buffer.
	 */
	PARA_DEBUG_LOG("consuming %zu/%zu bytes from wrap buffer\n", numbytes,
		br_available_bytes(br));

	assert(numbytes <= br_available_bytes(br));
	if (br->consumed + numbytes < br->wrap_count) {
		br->consumed += numbytes;
		return;
	}
	PARA_DEBUG_LOG("dropping wrap buffer (%zu bytes)\n", br->btrb->size);
	/* get rid of the wrap buffer */
	sz = br->consumed + numbytes;
	btr_drop_buffer_reference(br);
	btr_consume(btrn, sz);
}
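
/*
 * Example usage (illustrative sketch): the canonical consume loop of a node
 * that processes its input in place. process() is a hypothetical function
 * returning the number of bytes it used.
 *
 *	for (;;) {
 *		char *buf;
 *		size_t used, len = btr_next_buffer(btrn, &buf);
 *
 *		if (len == 0)
 *			break;
 *		used = process(buf, len); // with used <= len
 *		if (used == 0)
 *			break;
 *		btr_consume(btrn, used);
 *	}
 */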

static void flush_input_queue(struct btr_node *btrn)
{
	struct btr_buffer_reference *br, *tmp;

	FOR_EACH_BUFFER_REF_SAFE(br, tmp, btrn)
		btr_drop_buffer_reference(br);
}

void btr_free_node(struct btr_node *btrn)
{
	if (!btrn)
		return;
	free(btrn->name);
	free(btrn);
}

void btr_remove_node(struct btr_node *btrn)
{
	struct btr_node *ch;

	if (!btrn)
		return;
	PARA_NOTICE_LOG("removing btr node %s from buffer tree\n", btrn->name);
	FOR_EACH_CHILD(ch, btrn)
		ch->parent = NULL;
	flush_input_queue(btrn);
	if (btrn->parent)
		list_del(&btrn->node);
}
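
/*
 * Example usage (illustrative sketch): tearing down a node.
 * btr_remove_node() detaches the node from the tree and drops its queued
 * input; btr_free_node() then releases the node itself.
 *
 *	btr_remove_node(btrn);
 *	btr_free_node(btrn);
 *	btrn = NULL;
 */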

size_t btr_get_input_queue_size(struct btr_node *btrn)
{
	struct btr_buffer_reference *br;
	size_t size = 0, wrap_consumed = 0;

	FOR_EACH_BUFFER_REF(br, btrn) {
		if (br->wrap_count != 0) {
			wrap_consumed = br->consumed;
			continue;
		}
		size += br_available_bytes(br);
	}
	assert(wrap_consumed <= size);
	size -= wrap_consumed;
	return size;
}

void btr_splice_out_node(struct btr_node *btrn)
{
	struct btr_node *ch, *tmp;

	assert(btrn);
	PARA_NOTICE_LOG("splicing out %s\n", btrn->name);
	btr_pushdown(btrn);
	if (btrn->parent)
		list_del(&btrn->node);
	FOR_EACH_CHILD_SAFE(ch, tmp, btrn) {
		PARA_INFO_LOG("parent(%s): %s\n", ch->name,
			btrn->parent? btrn->parent->name : "NULL");
		ch->parent = btrn->parent;
		if (btrn->parent)
			list_move(&ch->node, &btrn->parent->children);
	}
	assert(list_empty(&btrn->children));
}

/**
 * Return the size of the largest input queue.
 *
 * Iterates over all children of the given node.
 */
size_t btr_bytes_pending(struct btr_node *btrn)
{
	size_t max_size = 0;
	struct btr_node *ch;

	FOR_EACH_CHILD(ch, btrn) {
		size_t size = btr_get_input_queue_size(ch);

		max_size = PARA_MAX(max_size, size);
	}
	return max_size;
}

int btr_exec(struct btr_node *btrn, const char *command, char **value_result)
{
	if (!btrn)
		return -ERRNO_TO_PARA_ERROR(EINVAL);
	if (!btrn->execute)
		return -ERRNO_TO_PARA_ERROR(ENOTSUP);
	return btrn->execute(btrn, command, value_result);
}

int btr_exec_up(struct btr_node *btrn, const char *command, char **value_result)
{
	int ret;

	for (; btrn; btrn = btrn->parent) {
		struct btr_node *parent = btrn->parent;

		if (!parent)
			return -ERRNO_TO_PARA_ERROR(ENOTSUP);
		if (!parent->execute)
			continue;
		PARA_INFO_LOG("parent: %s, cmd: %s\n", parent->name, command);
		ret = parent->execute(parent, command, value_result);
		if (ret == -ERRNO_TO_PARA_ERROR(ENOTSUP))
			continue;
		if (ret < 0)
			return ret;
		if (value_result && *value_result)
			PARA_NOTICE_LOG("%s(%s): %s\n", command, parent->name,
				*value_result);
		return 1;
	}
	return -ERRNO_TO_PARA_ERROR(ENOTSUP);
}
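
/*
 * Example usage (illustrative sketch): asking the ancestors of a node for a
 * property. The command string "sample_rate" is made up, and we assume the
 * answering handler returns a dynamically allocated string which the caller
 * must free.
 *
 *	char *answer = NULL;
 *	int ret = btr_exec_up(btrn, "sample_rate", &answer);
 *
 *	if (ret > 0 && answer) {
 *		... parse answer ...
 *		free(answer);
 *	}
 */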

void *btr_context(struct btr_node *btrn)
{
	return btrn->context;
}

static bool need_buffer_pool_merge(struct btr_node *btrn)
{
	struct btr_buffer_reference *br = get_first_input_br(btrn);

	if (!br)
		return false;
	if (br->wrap_count != 0)
		return true;
	if (br->btrb->pool)
		return true;
	return false;
}

static void merge_input_pool(struct btr_node *btrn, size_t dest_size)
{
	struct btr_buffer_reference *br, *wbr = NULL;
	int num_refs; /* including wrap buffer */
	char *buf, *buf1 = NULL, *buf2 = NULL;
	size_t sz, sz1 = 0, sz2 = 0, wsz;

	br = get_first_input_br(btrn);
	if (!br || br_available_bytes(br) >= dest_size)
		return;
	num_refs = 0;
	FOR_EACH_BUFFER_REF(br, btrn) {
		num_refs++;
		sz = btr_get_buffer_by_reference(br, &buf);
		if (sz == 0)
			break;
		if (br->wrap_count != 0) {
			assert(!wbr);
			assert(num_refs == 1);
			wbr = br;
			if (sz >= dest_size)
				return;
			continue;
		}
		if (!buf1) {
			buf1 = buf;
			sz1 = sz;
			goto next;
		}
		if (buf1 + sz1 == buf) {
			sz1 += sz;
			goto next;
		}
		if (!buf2) {
			buf2 = buf;
			sz2 = sz;
			goto next;
		}
		assert(buf2 + sz2 == buf);
		sz2 += sz;
next:
		if (sz1 + sz2 >= dest_size)
			break;
	}
	if (!buf2) /* nothing to do */
		return;
	assert(buf1 && sz2 > 0);
	/*
	 * If the second buffer is large, we only take the first part of it to
	 * avoid having to memcpy() huge buffers.
	 */
	sz2 = PARA_MIN(sz2, (size_t)(64 * 1024));
	if (!wbr) {
		/* Make a new wrap buffer combining buf1 and buf2. */
		sz = sz1 + sz2;
		buf = para_malloc(sz);
		PARA_DEBUG_LOG("merging input buffers: (%p:%zu, %p:%zu) -> %p:%zu\n",
			buf1, sz1, buf2, sz2, buf, sz);
		memcpy(buf, buf1, sz1);
		memcpy(buf + sz1, buf2, sz2);
		br = para_calloc(sizeof(*br));
		br->btrb = new_btrb(buf, sz);
		br->btrb->refcount = 1;
		br->consumed = 0;
		/* This is a wrap buffer. */
		br->wrap_count = sz1;
		para_list_add(&br->node, &btrn->input_queue);
		return;
	}
	PARA_DEBUG_LOG("increasing wrap buffer, sz1: %zu, sz2: %zu\n", sz1, sz2);
	/*
	 * We already have a wrap buffer, but it is too small. It might be
	 * partially used.
	 */
	wsz = br_available_bytes(wbr);
	if (wbr->wrap_count == sz1 && wbr->btrb->size >= sz1 + sz2)
		return; /* nothing we can do about it */
	sz = sz1 + sz2 - wbr->btrb->size; /* amount of new data */
	wbr->btrb->size += sz;
	wbr->btrb->buf = para_realloc(wbr->btrb->buf, wbr->btrb->size);
	/* copy the new data to the end of the reallocated buffer */
	assert(sz2 >= sz);
	memcpy(wbr->btrb->buf + wbr->btrb->size - sz, buf2 + sz2 - sz, sz);
}

/**
 * Merge the first two input buffers into one.
 *
 * This is a rather expensive operation.
 *
 * \return The number of buffers that were available (zero, one or two).
 */
static int merge_input(struct btr_node *btrn)
{
	struct btr_buffer_reference *brs[2], *br;
	char *bufs[2], *buf;
	size_t szs[2], sz;
	int i;

	if (list_empty(&btrn->input_queue))
		return 0;
	if (list_is_singular(&btrn->input_queue))
		return 1;
	i = 0;
	/* get references to the first two buffers */
	FOR_EACH_BUFFER_REF(br, btrn) {
		brs[i] = br;
		szs[i] = btr_get_buffer_by_reference(brs[i], bufs + i);
		i++;
		if (i == 2)
			break;
	}
	/* make a new btrb that combines the two buffers and a br to it. */
	sz = szs[0] + szs[1];
	buf = para_malloc(sz);
	PARA_DEBUG_LOG("%s: memory merging input buffers: (%zu, %zu) -> %zu\n",
		btrn->name, szs[0], szs[1], sz);
	memcpy(buf, bufs[0], szs[0]);
	memcpy(buf + szs[0], bufs[1], szs[1]);

	br = para_calloc(sizeof(*br));
	br->btrb = new_btrb(buf, sz);
	br->btrb->refcount = 1;

	/* replace the first two refs by the new one */
	btr_drop_buffer_reference(brs[0]);
	btr_drop_buffer_reference(brs[1]);
	para_list_add(&br->node, &btrn->input_queue);
	return 2;
}

void btr_merge(struct btr_node *btrn, size_t dest_size)
{
	if (need_buffer_pool_merge(btrn)) {
		merge_input_pool(btrn, dest_size);
		return;
	}
	for (;;) {
		char *buf;
		size_t len = btr_next_buffer(btrn, &buf);

		if (len >= dest_size)
			return;
		PARA_DEBUG_LOG("input size = %zu < %zu = dest\n", len, dest_size);
		if (merge_input(btrn) < 2)
			return;
	}
}
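
/*
 * Example usage (illustrative sketch): a node that needs at least HEADER_SIZE
 * contiguous input bytes before it can proceed. HEADER_SIZE is a hypothetical
 * constant.
 *
 *	btr_merge(btrn, HEADER_SIZE);
 *	len = btr_next_buffer(btrn, &buf);
 *	if (len < HEADER_SIZE)
 *		return 0; // wait for more input
 */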

bool btr_eof(struct btr_node *btrn)
{
	char *buf;
	size_t len = btr_next_buffer(btrn, &buf);

	return (len == 0 && btr_no_parent(btrn));
}

void log_tree_recursively(struct btr_node *btrn, int loglevel, int depth)
{
	struct btr_node *ch;
	const char spaces[] = "                ", *space = spaces + 16 - depth;

	if (depth > 16)
		return;
	para_log(loglevel, "%s%s\n", space, btrn->name);
	FOR_EACH_CHILD(ch, btrn)
		log_tree_recursively(ch, loglevel, depth + 1);
}

void btr_log_tree(struct btr_node *btrn, int loglevel)
{
	log_tree_recursively(btrn, loglevel, 0);
}

/*
 * \return \a root if \a name is \p NULL, the node called \a name if it exists
 * in the tree rooted at \a root, or \p NULL if no such node exists.
 */
struct btr_node *btr_search_node(const char *name, struct btr_node *root)
{
	struct btr_node *ch;

	if (!name)
		return root;
	if (!strcmp(root->name, name))
		return root;
	FOR_EACH_CHILD(ch, root) {
		struct btr_node *result = btr_search_node(name, ch);

		if (result)
			return result;
	}
	return NULL;
}

/** 640K ought to be enough for everybody ;) */
#define BTRN_MAX_PENDING (640 * 1024)

int btr_node_status(struct btr_node *btrn, size_t min_iqs,
		enum btr_node_type type)
{
	size_t iqs;

	assert(btrn);
	if (type != BTR_NT_LEAF) {
		if (btr_no_children(btrn))
			return -E_BTR_NO_CHILD;
		if (btr_bytes_pending(btrn) > BTRN_MAX_PENDING)
			return 0;
	}
	if (type != BTR_NT_ROOT) {
		if (btr_eof(btrn))
			return -E_BTR_EOF;
		iqs = btr_get_input_queue_size(btrn);
		if (iqs == 0) /* we have a parent, because not eof */
			return 0;
		if (iqs < min_iqs && !btr_no_parent(btrn))
			return 0;
	}
	return 1;
}
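
/*
 * Example usage (illustrative sketch): a typical status check of a leaf node
 * which needs at least min_iqs bytes of input.
 *
 *	ret = btr_node_status(btrn, min_iqs, BTR_NT_LEAF);
 *	if (ret < 0)
 *		... EOF: terminate this node ...
 *	else if (ret == 0)
 *		... nothing to do yet ...
 *	else
 *		... enough input is queued, go ahead ...
 */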

void btr_get_node_start(struct btr_node *btrn, struct timeval *tv)
{
	*tv = btrn->start;
}