[btr] Add more documentation.
[paraslash.git] / buffer_tree.c
#include <regex.h>
#include <stdbool.h>

#include "para.h"
#include "list.h"
#include "string.h"
#include "buffer_tree.h"
#include "error.h"
#include "sched.h"

/* whead = NULL means area full */
struct btr_pool {
	char *name;
	char *area_start;
	char *area_end;
	char *rhead;
	char *whead;
};

enum btr_buffer_flags {
	/* changes the way the buffer is deallocated */
	BTR_BF_BTR_POOL = 1,
};

struct btr_buffer {
	char *buf;
	size_t size;
	/** The number of references to this buffer. */
	int refcount;
	struct btr_pool *pool;
};

struct btr_buffer_reference {
	struct btr_buffer *btrb;
	size_t consumed;
	/*
	 * Each buffer reference belongs to the buffer queue list of some
	 * buffer tree node.
	 */
	struct list_head node;
	size_t wrap_count;
};

struct btr_node {
	char *name;
	struct btr_node *parent;
	/* The position of this btr node in the buffer tree. */
	struct list_head node;
	/* The child nodes of this btr node are linked together in a list. */
	struct list_head children;
	/* Time of first data transfer. */
	struct timeval start;
	/**
	 * The input queue is a list of references to btr buffers. Each item on
	 * the list represents an input buffer which has not been completely
	 * used by this btr node.
	 */
	struct list_head input_queue;
	btr_command_handler execute;
	void *context;
};

/**
 * Create a new buffer pool.
 *
 * \param name The name of the new buffer pool.
 * \param area_size The size in bytes of the pool area.
 *
 * \return An opaque pointer to the newly created buffer pool. Once the pool
 * is no longer needed, this pointer must be passed to btr_pool_free() to
 * deallocate all resources.
 */
struct btr_pool *btr_pool_new(const char *name, size_t area_size)
{
	struct btr_pool *btrp;

	PARA_INFO_LOG("%s, %zu bytes\n", name, area_size);
	btrp = para_malloc(sizeof(*btrp));
	btrp->area_start = para_malloc(area_size);
	btrp->area_end = btrp->area_start + area_size;
	btrp->rhead = btrp->area_start;
	btrp->whead = btrp->area_start;
	btrp->name = para_strdup(name);
	return btrp;
}
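
/*
 * Example (an illustrative sketch, not part of the original file): create a
 * pool, produce and consume data through it, and free it again. The name
 * "example" and the 128K area size are arbitrary.
 *
 *	struct btr_pool *btrp = btr_pool_new("example", 128 * 1024);
 *
 *	(produce into the pool, feed buffers to the tree, consume them)
 *
 *	btr_pool_free(btrp);
 */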

/**
 * Deallocate resources used by a buffer pool.
 *
 * \param btrp A pointer obtained via btr_pool_new().
 */
void btr_pool_free(struct btr_pool *btrp)
{
	if (!btrp)
		return;
	free(btrp->area_start);
	free(btrp->name);
	free(btrp);
}

/**
 * Return the size of the buffer pool area.
 *
 * \param btrp The buffer pool.
 *
 * \return The value that was passed to btr_pool_new() at creation time.
 */
size_t btr_pool_size(struct btr_pool *btrp)
{
	return btrp->area_end - btrp->area_start;
}

/* Return the number of bytes of the pool area which are currently in use. */
size_t btr_pool_filled(struct btr_pool *btrp)
{
	if (!btrp->whead)
		return btr_pool_size(btrp);
	if (btrp->rhead <= btrp->whead)
		return btrp->whead - btrp->rhead;
	return btr_pool_size(btrp) - (btrp->rhead - btrp->whead);
}

/* Return the number of unused bytes of the pool area. */
size_t btr_pool_unused(struct btr_pool *btrp)
{
	return btr_pool_size(btrp) - btr_pool_filled(btrp);
}

/*
 * Return the maximal size available for one read. This is less than or equal
 * to the value returned by btr_pool_unused() because a single read may not
 * wrap around the end of the pool area.
 */
size_t btr_pool_available(struct btr_pool *btrp)
{
	if (!btrp->whead)
		return 0;
	if (btrp->rhead <= btrp->whead)
		return btrp->area_end - btrp->whead;
	return btrp->rhead - btrp->whead;
}
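
/*
 * Worked example of the ring buffer arithmetic above (the numbers are
 * illustrative): for a pool area of 10 bytes with rhead = area_start + 3 and
 * whead = area_start + 7, btr_pool_filled() yields 7 - 3 = 4 bytes,
 * btr_pool_unused() yields 10 - 4 = 6 bytes, but btr_pool_available() yields
 * only 10 - 7 = 3 bytes, because the three bytes at the end of the area are
 * the largest contiguous chunk that a single read(2) into the pool can fill.
 */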

/**
 * Obtain the current write head.
 *
 * \param btrp The buffer pool.
 * \param result The write head is returned here.
 *
 * \return The maximal number of bytes that may be written to the returned
 * buffer.
 */
size_t btr_pool_get_buffer(struct btr_pool *btrp, char **result)
{
	if (result)
		*result = btrp->whead;
	return btr_pool_available(btrp);
}

/**
 * Mark a part of the buffer pool area as allocated.
 *
 * \param btrp The buffer pool.
 * \param size The number of bytes to be allocated.
 *
 * This is usually called after the caller has written to the buffer obtained
 * by btr_pool_get_buffer().
 */
static void btr_pool_allocate(struct btr_pool *btrp, size_t size)
{
	char *end;

	if (size == 0)
		return;
	assert(size <= btr_pool_available(btrp));
	end = btrp->whead + size;
	assert(end <= btrp->area_end);

	if (end == btrp->area_end) {
		PARA_DEBUG_LOG("%s: end of pool area reached\n", btrp->name);
		end = btrp->area_start;
	}
	if (end == btrp->rhead) {
		PARA_DEBUG_LOG("%s btrp buffer full\n", btrp->name);
		end = NULL; /* buffer full */
	}
	btrp->whead = end;
}

/* Mark the oldest \a size bytes of the filled pool area as unused again. */
static void btr_pool_deallocate(struct btr_pool *btrp, size_t size)
{
	char *end = btrp->rhead + size;

	if (size == 0)
		return;
	assert(end <= btrp->area_end);
	assert(size <= btr_pool_filled(btrp));
	if (end == btrp->area_end)
		end = btrp->area_start;
	if (!btrp->whead)
		btrp->whead = btrp->rhead;
	btrp->rhead = end;
	if (btrp->rhead == btrp->whead)
		btrp->rhead = btrp->whead = btrp->area_start;
}

#define FOR_EACH_CHILD(_tn, _btrn) list_for_each_entry((_tn), \
	&((_btrn)->children), node)
#define FOR_EACH_CHILD_SAFE(_tn, _tmp, _btrn) \
	list_for_each_entry_safe((_tn), (_tmp), &((_btrn)->children), node)

#define FOR_EACH_BUFFER_REF(_br, _btrn) \
	list_for_each_entry((_br), &(_btrn)->input_queue, node)
#define FOR_EACH_BUFFER_REF_SAFE(_br, _tmp, _btrn) \
	list_for_each_entry_safe((_br), (_tmp), &(_btrn)->input_queue, node)

/*
 * Create a new buffer tree node as described by \a bnd. If a child is given
 * but no parent, the new node becomes the new root of the tree. Inserting a
 * node between two existing nodes is not yet supported.
 */
struct btr_node *btr_new_node(struct btr_node_description *bnd)
{
	struct btr_node *btrn = para_malloc(sizeof(*btrn));

	btrn->name = para_strdup(bnd->name);
	btrn->parent = bnd->parent;
	btrn->execute = bnd->handler;
	btrn->context = bnd->context;
	btrn->start.tv_sec = 0;
	btrn->start.tv_usec = 0;
	INIT_LIST_HEAD(&btrn->children);
	INIT_LIST_HEAD(&btrn->input_queue);
	if (!bnd->child) {
		if (bnd->parent) {
			list_add_tail(&btrn->node, &bnd->parent->children);
			PARA_INFO_LOG("new leaf node: %s (child of %s)\n",
				bnd->name, bnd->parent->name);
		} else
			PARA_INFO_LOG("added %s as btr root\n", bnd->name);
		goto out;
	}
	if (!bnd->parent) {
		assert(!bnd->child->parent);
		PARA_INFO_LOG("new root: %s (was %s)\n",
			bnd->name, bnd->child->name);
		btrn->parent = NULL;
		list_add_tail(&bnd->child->node, &btrn->children);
		/* link it in */
		bnd->child->parent = btrn;
		goto out;
	}
	PARA_EMERG_LOG("inserting internal nodes not yet supported.\n");
	exit(EXIT_FAILURE);
	assert(bnd->child->parent == bnd->parent);
out:
	return btrn;
}
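
/*
 * Example (an illustrative sketch): build the minimal chain
 * reader -> filter -> writer with btr_new_node(). Only the fields of struct
 * btr_node_description that are referenced above (name, parent, child,
 * handler, context) are used; the node names are made up.
 *
 *	struct btr_node_description bnd = {.name = "reader"};
 *	struct btr_node *reader = btr_new_node(&bnd);
 *
 *	bnd.name = "filter";
 *	bnd.parent = reader;
 *	struct btr_node *filter = btr_new_node(&bnd);
 *
 *	bnd.name = "writer";
 *	bnd.parent = filter;
 *	struct btr_node *writer = btr_new_node(&bnd);
 */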

/*
 * Allocate a new btr buffer.
 *
 * The freshly allocated buffer will have a zero refcount and will
 * not be associated with a btr pool.
 */
static struct btr_buffer *new_btrb(char *buf, size_t size)
{
	struct btr_buffer *btrb = para_calloc(sizeof(*btrb));

	btrb->buf = buf;
	btrb->size = size;
	return btrb;
}

static void dealloc_buffer(struct btr_buffer *btrb)
{
	if (btrb->pool)
		btr_pool_deallocate(btrb->pool, btrb->size);
	else
		free(btrb->buf);
}

static struct btr_buffer_reference *get_first_input_br(struct btr_node *btrn)
{
	if (list_empty(&btrn->input_queue))
		return NULL;
	return list_first_entry(&btrn->input_queue,
		struct btr_buffer_reference, node);
}

/*
 * Deallocate the reference, release the resources if refcount drops to zero.
 */
static void btr_drop_buffer_reference(struct btr_buffer_reference *br)
{
	struct btr_buffer *btrb = br->btrb;

	list_del(&br->node);
	free(br);
	btrb->refcount--;
	if (btrb->refcount == 0) {
		dealloc_buffer(btrb);
		free(btrb);
	}
}

static void add_btrb_to_children(struct btr_buffer *btrb,
		struct btr_node *btrn, size_t consumed)
{
	struct btr_node *ch;

	if (btrn->start.tv_sec == 0)
		btrn->start = *now;
	FOR_EACH_CHILD(ch, btrn) {
		struct btr_buffer_reference *br = para_calloc(sizeof(*br));

		br->btrb = btrb;
		br->consumed = consumed;
		list_add_tail(&br->node, &ch->input_queue);
		btrb->refcount++;
		if (ch->start.tv_sec == 0)
			ch->start = *now;
	}
}

/*
 * Insert \a buf as an output buffer of \a btrn and make it available to all
 * children of the node. Ownership of \a buf is transferred; if the node has
 * no children, the buffer is freed immediately.
 */
void btr_add_output(char *buf, size_t size, struct btr_node *btrn)
{
	struct btr_buffer *btrb;

	assert(size != 0);
	if (list_empty(&btrn->children)) {
		free(buf);
		return;
	}
	btrb = new_btrb(buf, size);
	add_btrb_to_children(btrb, btrn, 0);
}

/**
 * Feed data from a buffer pool to the child nodes of a buffer tree node.
 *
 * \param btrp The buffer pool which contains the data.
 * \param size The number of bytes to mark as allocated and feed to the children.
 * \param btrn The node whose children are to receive the data.
 *
 * If the node has no children, nothing is allocated from the pool, i.e. the
 * function is a no-op in this case.
 */
void btr_add_output_pool(struct btr_pool *btrp, size_t size,
		struct btr_node *btrn)
{
	struct btr_buffer *btrb;
	char *buf;
	size_t avail;

	assert(size != 0);
	if (list_empty(&btrn->children))
		return;
	avail = btr_pool_get_buffer(btrp, &buf);
	assert(avail >= size);
	btr_pool_allocate(btrp, size);
	btrb = new_btrb(buf, size);
	btrb->pool = btrp;
	add_btrb_to_children(btrb, btrn, 0);
}
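
/*
 * Example (an illustrative sketch): the typical producer pattern for a pool.
 * A root node obtains the write head, fills it (the read(2) call below is an
 * assumption about where the data comes from), and feeds the number of bytes
 * actually written to its children. btr_add_output_pool() then marks these
 * bytes as allocated.
 *
 *	char *buf;
 *	size_t sz = btr_pool_get_buffer(btrp, &buf);
 *	ssize_t ret = read(fd, buf, sz);
 *
 *	if (ret > 0)
 *		btr_add_output_pool(btrp, (size_t)ret, btrn);
 */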

/**
 * Copy data to the write head of a buffer pool and feed it to all children
 * nodes.
 *
 * \param src The source buffer.
 * \param n The size of the source buffer in bytes.
 * \param btrp The destination buffer pool.
 * \param btrn Add the data as output of this node.
 *
 * This is expensive. The caller must make sure the data fits into the buffer
 * pool area.
 */
void btr_copy(const void *src, size_t n, struct btr_pool *btrp,
		struct btr_node *btrn)
{
	char *buf;
	size_t sz, copy;

	if (n == 0)
		return;
	assert(n <= btr_pool_unused(btrp));
	sz = btr_pool_get_buffer(btrp, &buf);
	copy = PARA_MIN(sz, n);
	memcpy(buf, src, copy);
	btr_add_output_pool(btrp, copy, btrn);
	if (copy == n)
		return;
	/* the remainder wraps around to the start of the pool area */
	sz = btr_pool_get_buffer(btrp, &buf);
	assert(sz >= n - copy);
	memcpy(buf, src + copy, n - copy);
	btr_add_output_pool(btrp, n - copy, btrn);
}

static void btr_pushdown_br(struct btr_buffer_reference *br, struct btr_node *btrn)
{
	add_btrb_to_children(br->btrb, btrn, br->consumed);
	btr_drop_buffer_reference(br);
}

/* Hand all input buffer references of \a btrn down to its children. */
void btr_pushdown(struct btr_node *btrn)
{
	struct btr_buffer_reference *br, *tmp;

	FOR_EACH_BUFFER_REF_SAFE(br, tmp, btrn)
		btr_pushdown_br(br, btrn);
}

/*
 * Hand the first input buffer reference of \a btrn down to its children.
 *
 * \return One if a buffer reference was pushed down, zero if the input
 * queue was empty.
 */
int btr_pushdown_one(struct btr_node *btrn)
{
	struct btr_buffer_reference *br;

	if (list_empty(&btrn->input_queue))
		return 0;
	br = list_first_entry(&btrn->input_queue, struct btr_buffer_reference, node);
	btr_pushdown_br(br, btrn);
	return 1;
}

/* Return true if this node has no children. */
bool btr_no_children(struct btr_node *btrn)
{
	return list_empty(&btrn->children);
}

/* Return true if this node has no parent, i.e. if it is a root node. */
bool btr_no_parent(struct btr_node *btrn)
{
	return !btrn->parent;
}

/*
 * Return true if this node may modify its input buffers in place, i.e. if it
 * is a root node or the only child of its parent.
 */
bool btr_inplace_ok(struct btr_node *btrn)
{
	if (!btrn->parent)
		return true;
	return list_is_singular(&btrn->parent->children);
}

static inline size_t br_available_bytes(struct btr_buffer_reference *br)
{
	return br->btrb->size - br->consumed;
}

size_t btr_get_buffer_by_reference(struct btr_buffer_reference *br, char **buf)
{
	if (buf)
		*buf = br->btrb->buf + br->consumed;
	return br_available_bytes(br);
}

/**
 * Return the next buffer of the input queue.
 *
 * \param btrn The node whose input queue is to be queried.
 * \param bufp The address of the buffer is returned here.
 *
 * Consecutive pool buffers which happen to be adjacent in the pool area are
 * coalesced, so the returned region may span more than one buffer reference.
 *
 * \return The number of bytes that may be read from the returned buffer, or
 * zero if the input buffer queue is empty.
 */
size_t btr_next_buffer(struct btr_node *btrn, char **bufp)
{
	struct btr_buffer_reference *br;
	char *buf, *result = NULL;
	size_t sz, rv = 0;

	FOR_EACH_BUFFER_REF(br, btrn) {
		sz = btr_get_buffer_by_reference(br, &buf);
		if (!result) {
			result = buf;
			rv = sz;
			if (!br->btrb->pool)
				break;
			continue;
		}
		if (!br->btrb->pool)
			break;
		if (result + rv != buf)
			break;
		rv += sz;
	}
	if (bufp)
		*bufp = result;
	return rv;
}

/*
 * Mark \a numbytes bytes of the input queue of \a btrn as consumed.
 * References to fully used buffers are dropped; a partially used buffer
 * merely has its consumed counter increased.
 */
void btr_consume(struct btr_node *btrn, size_t numbytes)
{
	struct btr_buffer_reference *br, *tmp;
	size_t sz;

	if (numbytes == 0)
		return;
	br = get_first_input_br(btrn);
	assert(br);

	if (br->wrap_count == 0) {
		/*
		 * No wrap buffer. Drop buffer references whose buffer
		 * has been fully used.
		 */
		FOR_EACH_BUFFER_REF_SAFE(br, tmp, btrn) {
			if (br->consumed + numbytes <= br->btrb->size) {
				br->consumed += numbytes;
				if (br->consumed == br->btrb->size)
					btr_drop_buffer_reference(br);
				return;
			}
			numbytes -= br->btrb->size - br->consumed;
			btr_drop_buffer_reference(br);
		}
		assert(false); /* not reached: numbytes exceeded the queued data */
	}
	/*
	 * We have a wrap buffer, consume from it. If in total, i.e. including
	 * previous calls to btr_consume(), less than wrap_count has been
	 * consumed, there's nothing more we can do.
	 *
	 * Otherwise we drop the wrap buffer and consume the correct amount of
	 * bytes from the subsequent buffers of the input queue. This is the
	 * total number of bytes that have been consumed from the wrap buffer.
	 */
	PARA_DEBUG_LOG("consuming %zu/%zu bytes from wrap buffer\n", numbytes,
		br_available_bytes(br));

	assert(numbytes <= br_available_bytes(br));
	if (br->consumed + numbytes < br->wrap_count) {
		br->consumed += numbytes;
		return;
	}
	PARA_DEBUG_LOG("dropping wrap buffer (%zu bytes)\n", br->btrb->size);
	/* get rid of the wrap buffer */
	sz = br->consumed + numbytes;
	btr_drop_buffer_reference(br);
	return btr_consume(btrn, sz);
}
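
/*
 * Example (an illustrative sketch): the usual consumer pattern. A node asks
 * for the next contiguous buffer, processes as much of it as it can, and
 * reports the number of consumed bytes back to the buffer tree. process() is
 * a made-up placeholder for the node's real work.
 *
 *	char *buf;
 *	size_t len, used;
 *
 *	while ((len = btr_next_buffer(btrn, &buf)) > 0) {
 *		used = process(buf, len);
 *		if (used == 0)
 *			break;
 *		btr_consume(btrn, used);
 *	}
 */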

static void flush_input_queue(struct btr_node *btrn)
{
	struct btr_buffer_reference *br, *tmp;

	FOR_EACH_BUFFER_REF_SAFE(br, tmp, btrn)
		btr_drop_buffer_reference(br);
}

/*
 * Deallocate the node. Note that this does not remove the node from the
 * tree; see btr_remove_node().
 */
void btr_free_node(struct btr_node *btrn)
{
	if (!btrn)
		return;
	free(btrn->name);
	free(btrn);
}

/*
 * Remove the node from the buffer tree: orphan its children, flush its input
 * queue and unlink it from its parent's list of children.
 */
void btr_remove_node(struct btr_node *btrn)
{
	struct btr_node *ch;

	if (!btrn)
		return;
	PARA_NOTICE_LOG("removing btr node %s from buffer tree\n", btrn->name);
	FOR_EACH_CHILD(ch, btrn)
		ch->parent = NULL;
	flush_input_queue(btrn);
	if (btrn->parent)
		list_del(&btrn->node);
}

/*
 * Return the number of bytes queued in the input queue of \a btrn. Bytes
 * that have already been consumed via a wrap buffer are not counted.
 */
size_t btr_get_input_queue_size(struct btr_node *btrn)
{
	struct btr_buffer_reference *br;
	size_t size = 0, wrap_consumed = 0;

	FOR_EACH_BUFFER_REF(br, btrn) {
		if (br->wrap_count != 0) {
			wrap_consumed = br->consumed;
			continue;
		}
		size += br_available_bytes(br);
	}
	assert(wrap_consumed <= size);
	size -= wrap_consumed;
	return size;
}

/*
 * Remove \a btrn from the tree by connecting its children directly to its
 * parent, pushing down any pending input first.
 */
void btr_splice_out_node(struct btr_node *btrn)
{
	struct btr_node *ch, *tmp;

	assert(btrn);
	PARA_NOTICE_LOG("splicing out %s\n", btrn->name);
	btr_pushdown(btrn);
	if (btrn->parent)
		list_del(&btrn->node);
	FOR_EACH_CHILD_SAFE(ch, tmp, btrn) {
		PARA_INFO_LOG("parent(%s): %s\n", ch->name,
			btrn->parent? btrn->parent->name : "NULL");
		ch->parent = btrn->parent;
		if (btrn->parent)
			list_move(&ch->node, &btrn->parent->children);
	}
	assert(list_empty(&btrn->children));
}

/**
 * Return the size of the largest input queue.
 *
 * \param btrn Iterate over the children of this node.
 *
 * \return The maximum of the input queue sizes of all children of \a btrn.
 */
size_t btr_bytes_pending(struct btr_node *btrn)
{
	size_t max_size = 0;
	struct btr_node *ch;

	FOR_EACH_CHILD(ch, btrn) {
		size_t size = btr_get_input_queue_size(ch);

		max_size = PARA_MAX(max_size, size);
	}
	return max_size;
}

/* Execute the command handler of \a btrn, if any. */
int btr_exec(struct btr_node *btrn, const char *command, char **value_result)
{
	if (!btrn)
		return -ERRNO_TO_PARA_ERROR(EINVAL);
	if (!btrn->execute)
		return -ERRNO_TO_PARA_ERROR(ENOTSUP);
	return btrn->execute(btrn, command, value_result);
}

/*
 * Ask the ancestors of \a btrn, in parent-to-root order, to execute
 * \a command. The first handler which does not return ENOTSUP wins.
 *
 * \return One on success, negative error code on failure.
 */
int btr_exec_up(struct btr_node *btrn, const char *command, char **value_result)
{
	int ret;

	for (; btrn; btrn = btrn->parent) {
		struct btr_node *parent = btrn->parent;

		if (!parent)
			return -ERRNO_TO_PARA_ERROR(ENOTSUP);
		if (!parent->execute)
			continue;
		PARA_INFO_LOG("parent: %s, cmd: %s\n", parent->name, command);
		ret = parent->execute(parent, command, value_result);
		if (ret == -ERRNO_TO_PARA_ERROR(ENOTSUP))
			continue;
		if (ret < 0)
			return ret;
		if (value_result && *value_result)
			PARA_NOTICE_LOG("%s(%s): %s\n", command, parent->name,
				*value_result);
		return 1;
	}
	return -ERRNO_TO_PARA_ERROR(ENOTSUP);
}
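
/*
 * Example (an illustrative sketch): a command handler and a query that walks
 * up the tree. The "sample_rate" command, the handler below and its 44100
 * answer are all made up; only the handler signature follows the
 * btr_command_handler usage above, and make_message() is assumed to be the
 * usual paraslash helper for allocating formatted strings.
 *
 *	static int example_execute(struct btr_node *btrn, const char *cmd,
 *			char **result)
 *	{
 *		if (strcmp(cmd, "sample_rate"))
 *			return -ERRNO_TO_PARA_ERROR(ENOTSUP);
 *		*result = make_message("%u", 44100);
 *		return 1;
 *	}
 *
 * A child node may then ask its ancestors:
 *
 *	char *answer = NULL;
 *	int ret = btr_exec_up(btrn, "sample_rate", &answer);
 *
 *	if (ret >= 0) {
 *		(use answer)
 *		free(answer);
 *	}
 */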

/* Return the context pointer that was specified at node creation time. */
void *btr_context(struct btr_node *btrn)
{
	return btrn->context;
}

static bool need_buffer_pool_merge(struct btr_node *btrn)
{
	struct btr_buffer_reference *br = get_first_input_br(btrn);

	if (!br)
		return false;
	if (br->wrap_count != 0)
		return true;
	if (br->btrb->pool)
		return true;
	return false;
}

static void merge_input_pool(struct btr_node *btrn, size_t dest_size)
{
	struct btr_buffer_reference *br, *wbr = NULL;
	int num_refs; /* including wrap buffer */
	char *buf, *buf1 = NULL, *buf2 = NULL;
	size_t sz, sz1 = 0, sz2 = 0, wsz;

	br = get_first_input_br(btrn);
	if (!br || br_available_bytes(br) >= dest_size)
		return;
	num_refs = 0;
	FOR_EACH_BUFFER_REF(br, btrn) {
		num_refs++;
		sz = btr_get_buffer_by_reference(br, &buf);
		if (sz == 0)
			break;
		if (br->wrap_count != 0) {
			assert(!wbr);
			assert(num_refs == 1);
			wbr = br;
			if (sz >= dest_size)
				return;
			continue;
		}
		if (!buf1) {
			buf1 = buf;
			sz1 = sz;
			goto next;
		}
		if (buf1 + sz1 == buf) {
			sz1 += sz;
			goto next;
		}
		if (!buf2) {
			buf2 = buf;
			sz2 = sz;
			goto next;
		}
		assert(buf2 + sz2 == buf);
		sz2 += sz;
next:
		if (sz1 + sz2 >= dest_size)
			break;
	}
	if (!buf2) /* nothing to do */
		return;
	assert(buf1 && sz2 > 0);
	/*
	 * If the second buffer is large, we only take the first part of it to
	 * avoid having to memcpy() huge buffers.
	 */
	sz2 = PARA_MIN(sz2, (size_t)(64 * 1024));
	if (!wbr) {
		/* Make a new wrap buffer combining buf1 and buf2. */
		sz = sz1 + sz2;
		buf = para_malloc(sz);
		PARA_DEBUG_LOG("merging input buffers: (%p:%zu, %p:%zu) -> %p:%zu\n",
			buf1, sz1, buf2, sz2, buf, sz);
		memcpy(buf, buf1, sz1);
		memcpy(buf + sz1, buf2, sz2);
		br = para_calloc(sizeof(*br));
		br->btrb = new_btrb(buf, sz);
		br->btrb->refcount = 1;
		br->consumed = 0;
		/* This is a wrap buffer */
		br->wrap_count = sz1;
		para_list_add(&br->node, &btrn->input_queue);
		return;
	}
	PARA_DEBUG_LOG("increasing wrap buffer, sz1: %zu, sz2: %zu\n", sz1, sz2);
	/*
	 * We already have a wrap buffer, but it is too small. It might be
	 * partially used.
	 */
	wsz = br_available_bytes(wbr);
	if (wbr->wrap_count == sz1 && wbr->btrb->size >= sz1 + sz2)
		/* nothing we can do about it */
		return;
	sz = sz1 + sz2 - wbr->btrb->size; /* amount of new data */
	wbr->btrb->size += sz;
	wbr->btrb->buf = para_realloc(wbr->btrb->buf, wbr->btrb->size);
	/* copy the new data to the end of the reallocated buffer */
	assert(sz2 >= sz);
	memcpy(wbr->btrb->buf + wbr->btrb->size - sz, buf2 + sz2 - sz, sz);
}

/**
 * Merge the first two input buffers into one.
 *
 * This is an expensive operation.
 *
 * \return The number of buffers that were available (zero, one or two).
 */
static int merge_input(struct btr_node *btrn)
{
	struct btr_buffer_reference *brs[2], *br;
	char *bufs[2], *buf;
	size_t szs[2], sz;
	int i;

	if (list_empty(&btrn->input_queue))
		return 0;
	if (list_is_singular(&btrn->input_queue))
		return 1;
	i = 0;
	/* get references to the first two buffers */
	FOR_EACH_BUFFER_REF(br, btrn) {
		brs[i] = br;
		szs[i] = btr_get_buffer_by_reference(brs[i], bufs + i);
		i++;
		if (i == 2)
			break;
	}
	/* make a new btrb that combines the two buffers and a br to it. */
	sz = szs[0] + szs[1];
	buf = para_malloc(sz);
	PARA_DEBUG_LOG("%s: memory merging input buffers: (%zu, %zu) -> %zu\n",
		btrn->name, szs[0], szs[1], sz);
	memcpy(buf, bufs[0], szs[0]);
	memcpy(buf + szs[0], bufs[1], szs[1]);

	br = para_calloc(sizeof(*br));
	br->btrb = new_btrb(buf, sz);
	br->btrb->refcount = 1;

	/* replace the first two refs by the new one */
	btr_drop_buffer_reference(brs[0]);
	btr_drop_buffer_reference(brs[1]);
	para_list_add(&br->node, &btrn->input_queue);
	return 2;
}

/*
 * Merge the input queue of \a btrn until either a single buffer of at least
 * \a dest_size bytes is available or no further merging is possible.
 */
void btr_merge(struct btr_node *btrn, size_t dest_size)
{
	if (need_buffer_pool_merge(btrn))
		return merge_input_pool(btrn, dest_size);
	for (;;) {
		char *buf;
		size_t len = btr_next_buffer(btrn, &buf);

		if (len >= dest_size)
			return;
		PARA_DEBUG_LOG("input size = %zu < %zu = dest\n", len, dest_size);
		if (merge_input(btrn) < 2)
			return;
	}
}
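
/*
 * Example (an illustrative sketch): a decoder that needs at least 4096
 * contiguous input bytes before it can proceed might start its work with
 *
 *	char *buf;
 *	size_t len;
 *
 *	btr_merge(btrn, 4096);
 *	len = btr_next_buffer(btrn, &buf);
 *	if (len < 4096 && !btr_no_parent(btrn))
 *		return;
 *
 * i.e. wait until more input arrives unless the parent is gone (EOF). The
 * 4096 byte threshold is arbitrary.
 */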

/*
 * Return true if end of file was reached, i.e. if the input queue is empty
 * and no parent is left which could produce more data.
 */
bool btr_eof(struct btr_node *btrn)
{
	char *buf;
	size_t len = btr_next_buffer(btrn, &buf);

	return (len == 0 && btr_no_parent(btrn));
}

void log_tree_recursively(struct btr_node *btrn, int loglevel, int depth)
{
	struct btr_node *ch;
	const char spaces[] = "                ", *space = spaces + 16 - depth;

	if (depth > 16)
		return;
	para_log(loglevel, "%s%s\n", space, btrn->name);
	FOR_EACH_CHILD(ch, btrn)
		log_tree_recursively(ch, loglevel, depth + 1);
}

/* Log the subtree rooted at \a btrn, one indented line per node. */
void btr_log_tree(struct btr_node *btrn, int loglevel)
{
	return log_tree_recursively(btrn, loglevel, 0);
}

/*
 * Find a node with the given name in the buffer tree.
 *
 * \return \a root if \a name is \p NULL, the node with name \a name if it
 * exists in the subtree rooted at \a root, \p NULL otherwise.
 */
struct btr_node *btr_search_node(const char *name, struct btr_node *root)
{
	struct btr_node *ch;

	if (!name)
		return root;
	if (!strcmp(root->name, name))
		return root;
	FOR_EACH_CHILD(ch, root) {
		struct btr_node *result = btr_search_node(name, ch);

		if (result)
			return result;
	}
	return NULL;
}

/** 640K ought to be enough for everybody ;) */
#define BTRN_MAX_PENDING (640 * 1024)

/*
 * Return the status of a buffer tree node: a negative error code if the node
 * can not continue (no children, or end of file), zero if it should not act
 * at the moment (output queues too full, or not enough input), and one if the
 * node should proceed.
 */
int btr_node_status(struct btr_node *btrn, size_t min_iqs,
		enum btr_node_type type)
{
	size_t iqs;

	assert(btrn);
	if (type != BTR_NT_LEAF) {
		if (btr_no_children(btrn))
			return -E_BTR_NO_CHILD;
		if (btr_bytes_pending(btrn) > BTRN_MAX_PENDING)
			return 0;
	}
	if (type != BTR_NT_ROOT) {
		if (btr_eof(btrn))
			return -E_BTR_EOF;
		iqs = btr_get_input_queue_size(btrn);
		if (iqs == 0) /* we have a parent, because not eof */
			return 0;
		if (iqs < min_iqs && !btr_no_parent(btrn))
			return 0;
	}
	return 1;
}
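
/*
 * Example (an illustrative sketch): how an internal (filter type) node might
 * use btr_node_status() at the beginning of its scheduler callback. The
 * BTR_NT_INTERNAL constant is an assumption inferred from the BTR_NT_LEAF and
 * BTR_NT_ROOT checks above, and the minimal input queue size of one byte is
 * arbitrary.
 *
 *	int ret = btr_node_status(btrn, 1, BTR_NT_INTERNAL);
 *
 *	if (ret < 0)
 *		goto err;	(no children or EOF: terminate the node)
 *	if (ret == 0)
 *		return;		(nothing to do at the moment)
 *	(ret > 0: enough input is queued and the output side has room)
 */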

/* Return the time of the first data transfer through this node. */
void btr_get_node_start(struct btr_node *btrn, struct timeval *tv)
{
	*tv = btrn->start;
}