#include "buffer_tree.h"

enum btr_buffer_flags {
	/* changes the way the buffer is deallocated */

	/** The number of references to this buffer. */
	struct btr_pool *pool;

struct btr_buffer_reference {
	struct btr_buffer *btrb;
	/* Each buffer reference belongs to the buffer queue list of some buffer tree node. */
	struct list_head node;

	struct btr_node *parent;
	/* The position of this btr node in the buffer tree. */
	struct list_head node;
	/* The child nodes of this btr node are linked together in a list. */
	struct list_head children;
	/* Time of first data transfer. */

	/*
	 * The input queue is a list of references to btr buffers. Each item on
	 * the list represents an input buffer which has not been completely
	 * used by this btr node.
	 */
	struct list_head input_queue;
	btr_command_handler execute;

struct btr_pool *btr_pool_new(const char *name, size_t area_size)
	struct btr_pool *btrp;

	PARA_INFO_LOG("%s, %zu bytes\n", name, area_size);
	btrp = para_malloc(sizeof(*btrp));
	btrp->area_start = para_malloc(area_size);
	btrp->area_end = btrp->area_start + area_size;
	btrp->rhead = btrp->area_start;
	btrp->whead = btrp->area_start;
	btrp->name = para_strdup(name);

/* whead = NULL means area full */
void btr_pool_free(struct btr_pool *btrp)
	free(btrp->area_start);

size_t btr_pool_size(struct btr_pool *btrp)
	return btrp->area_end - btrp->area_start;

size_t btr_pool_filled(struct btr_pool *btrp)
		return btr_pool_size(btrp);
	if (btrp->rhead <= btrp->whead)
		return btrp->whead - btrp->rhead;
	return btr_pool_size(btrp) - (btrp->rhead - btrp->whead);

size_t btr_pool_unused(struct btr_pool *btrp)
	return btr_pool_size(btrp) - btr_pool_filled(btrp);

/*
 * Return the maximal size available for one read. This is
 * at most the value returned by btr_pool_unused().
 */
size_t btr_pool_available(struct btr_pool *btrp)
	if (btrp->rhead <= btrp->whead)
		return btrp->area_end - btrp->whead;
	return btrp->rhead - btrp->whead;

size_t btr_pool_get_buffer(struct btr_pool *btrp, char **result)
	*result = btrp->whead;
	return btr_pool_available(btrp);

void btr_pool_allocate(struct btr_pool *btrp, size_t size)
	assert(size <= btr_pool_available(btrp));
	end = btrp->whead + size;
	assert(end <= btrp->area_end);
	if (end == btrp->area_end) {
		PARA_DEBUG_LOG("%s: end of pool area reached\n", btrp->name);
		end = btrp->area_start;
	if (end == btrp->rhead) {
		PARA_DEBUG_LOG("%s btrp buffer full\n", btrp->name);
		end = NULL; /* buffer full */

static void btr_pool_deallocate(struct btr_pool *btrp, size_t size)
	char *end = btrp->rhead + size;

	assert(end <= btrp->area_end);
	assert(size <= btr_pool_filled(btrp));
	if (end == btrp->area_end)
		end = btrp->area_start;
	btrp->whead = btrp->rhead;
	if (btrp->rhead == btrp->whead)
		btrp->rhead = btrp->whead = btrp->area_start;
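
/*
 * Illustrative sketch, not part of the original source: how a producer might
 * feed data into a buffer pool. It asks btr_pool_get_buffer() for the largest
 * contiguous write window, fills part of it (here via read(2), so <unistd.h>
 * is assumed to be among the includes not shown above), and then hands the
 * written bytes to all child nodes with btr_add_output_pool(), which in turn
 * calls btr_pool_allocate() to mark the region as used. The function name,
 * the file descriptor argument and the error handling are made up.
 */
static ssize_t example_fill_pool_from_fd(int fd, struct btr_pool *btrp,
		struct btr_node *btrn)
{
	char *buf;
	size_t sz = btr_pool_get_buffer(btrp, &buf);
	ssize_t ret;

	if (sz == 0) /* pool currently full, caller should retry later */
		return 0;
	ret = read(fd, buf, sz);
	if (ret > 0) /* push the new bytes to all children of btrn */
		btr_add_output_pool(btrp, ret, btrn);
	return ret;
}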

#define FOR_EACH_CHILD(_tn, _btrn) list_for_each_entry((_tn), \
	&((_btrn)->children), node)
#define FOR_EACH_CHILD_SAFE(_tn, _tmp, _btrn) \
	list_for_each_entry_safe((_tn), (_tmp), &((_btrn)->children), node)

#define FOR_EACH_BUFFER_REF(_br, _btrn) \
	list_for_each_entry((_br), &(_btrn)->input_queue, node)
#define FOR_EACH_BUFFER_REF_SAFE(_br, _tmp, _btrn) \
	list_for_each_entry_safe((_br), (_tmp), &(_btrn)->input_queue, node)

struct btr_node *btr_new_node(struct btr_node_description *bnd)
	struct btr_node *btrn = para_malloc(sizeof(*btrn));

	btrn->name = para_strdup(bnd->name);
	btrn->parent = bnd->parent;
	btrn->execute = bnd->handler;
	btrn->context = bnd->context;
	btrn->start.tv_sec = 0;
	btrn->start.tv_usec = 0;
	INIT_LIST_HEAD(&btrn->children);
	INIT_LIST_HEAD(&btrn->input_queue);
	list_add_tail(&btrn->node, &bnd->parent->children);
	PARA_INFO_LOG("new leaf node: %s (child of %s)\n",
		bnd->name, bnd->parent->name);
	PARA_INFO_LOG("added %s as btr root\n", bnd->name);
	assert(!bnd->child->parent);
	PARA_INFO_LOG("new root: %s (was %s)\n",
		bnd->name, bnd->child->name);
	list_add_tail(&bnd->child->node, &btrn->children);
	bnd->child->parent = btrn;
	PARA_EMERG_LOG("inserting internal nodes not yet supported.\n");
	assert(bnd->child->parent == bnd->parent);
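
/*
 * Illustrative sketch, not part of the original source: building a minimal
 * source -> filter -> sink chain with btr_new_node(). Only the description
 * fields referenced above (name, parent, child, handler, context) are known
 * to exist; the node names, the LL_DEBUG log level constant and the teardown
 * order are assumptions made for the example.
 */
static void example_build_chain(void)
{
	struct btr_node_description src_d = {.name = "source"};
	struct btr_node *src = btr_new_node(&src_d); /* root of a new tree */

	struct btr_node_description flt_d = {.name = "filter", .parent = src};
	struct btr_node *flt = btr_new_node(&flt_d); /* leaf child of "source" */

	struct btr_node_description snk_d = {.name = "sink", .parent = flt};
	struct btr_node *snk = btr_new_node(&snk_d); /* leaf child of "filter" */

	btr_log_tree(src, LL_DEBUG); /* logs the tree, one line per node */

	/* Tear down leaf-first: detach each node from the tree, then free it. */
	btr_remove_node(snk);
	btr_free_node(snk);
	btr_remove_node(flt);
	btr_free_node(flt);
	btr_remove_node(src);
	btr_free_node(src);
}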

/*
 * Allocate a new btr buffer.
 *
 * The freshly allocated buffer will have a zero refcount and will
 * not be associated with a btr pool.
 */
static struct btr_buffer *new_btrb(char *buf, size_t size)
	struct btr_buffer *btrb = para_calloc(sizeof(*btrb));

static void dealloc_buffer(struct btr_buffer *btrb)
	btr_pool_deallocate(btrb->pool, btrb->size);

static struct btr_buffer_reference *get_first_input_br(struct btr_node *btrn)
	if (list_empty(&btrn->input_queue))
	return list_first_entry(&btrn->input_queue,
		struct btr_buffer_reference, node);

/*
 * Deallocate the reference; release the resources if the refcount drops to zero.
 */
static void btr_drop_buffer_reference(struct btr_buffer_reference *br)
	struct btr_buffer *btrb = br->btrb;

	if (btrb->refcount == 0) {
		dealloc_buffer(btrb);

static void add_btrb_to_children(struct btr_buffer *btrb,
		struct btr_node *btrn, size_t consumed)
	if (btrn->start.tv_sec == 0)
	FOR_EACH_CHILD(ch, btrn) {
		struct btr_buffer_reference *br = para_calloc(sizeof(*br));

		br->consumed = consumed;
		list_add_tail(&br->node, &ch->input_queue);
		if (ch->start.tv_sec == 0)

void btr_add_output(char *buf, size_t size, struct btr_node *btrn)
	struct btr_buffer *btrb;

	if (list_empty(&btrn->children)) {
	btrb = new_btrb(buf, size);
	add_btrb_to_children(btrb, btrn, 0);

void btr_add_output_pool(struct btr_pool *btrp, size_t size,
		struct btr_node *btrn)
	struct btr_buffer *btrb;

	if (list_empty(&btrn->children))
	avail = btr_pool_get_buffer(btrp, &buf);
	assert(avail >= size);
	btr_pool_allocate(btrp, size);
	btrb = new_btrb(buf, size);
	add_btrb_to_children(btrb, btrn, 0);

void btr_copy(const void *src, size_t n, struct btr_pool *btrp,
		struct btr_node *btrn)
	assert(n <= btr_pool_unused(btrp));
	sz = btr_pool_get_buffer(btrp, &buf);
	copy = PARA_MIN(sz, n);
	memcpy(buf, src, copy);
	btr_add_output_pool(btrp, copy, btrn);
	sz = btr_pool_get_buffer(btrp, &buf);
	assert(sz >= n - copy);
	memcpy(buf, src + copy, n - copy);
	btr_add_output_pool(btrp, n - copy, btrn);
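
/*
 * Illustrative sketch, not part of the original source: btr_copy() is handy
 * when a small amount of data that does not live in the pool (here a made-up
 * header constant) must be fed to the children. The caller checks
 * btr_pool_unused() first, since btr_copy() asserts that the data fits. The
 * function name and the payload are hypothetical.
 */
static void example_push_header(struct btr_pool *btrp, struct btr_node *btrn)
{
	static const char header[] = "hdr!"; /* hypothetical payload */

	if (btr_pool_unused(btrp) < sizeof(header))
		return; /* not enough room in the pool, try again later */
	btr_copy(header, sizeof(header), btrp, btrn);
}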

static void btr_pushdown_br(struct btr_buffer_reference *br, struct btr_node *btrn)
	add_btrb_to_children(br->btrb, btrn, br->consumed);
	btr_drop_buffer_reference(br);

void btr_pushdown(struct btr_node *btrn)
	struct btr_buffer_reference *br, *tmp;

	FOR_EACH_BUFFER_REF_SAFE(br, tmp, btrn)
		btr_pushdown_br(br, btrn);

int btr_pushdown_one(struct btr_node *btrn)
	struct btr_buffer_reference *br;

	if (list_empty(&btrn->input_queue))
	br = list_first_entry(&btrn->input_queue, struct btr_buffer_reference, node);
	btr_pushdown_br(br, btrn);

/* Return true if this node has no children. */
bool btr_no_children(struct btr_node *btrn)
	return list_empty(&btrn->children);

bool btr_no_parent(struct btr_node *btrn)
	return !btrn->parent;

bool btr_inplace_ok(struct btr_node *btrn)
	return list_is_singular(&btrn->parent->children);

static inline size_t br_available_bytes(struct btr_buffer_reference *br)
	return br->btrb->size - br->consumed;

size_t btr_get_buffer_by_reference(struct btr_buffer_reference *br, char **buf)
	*buf = br->btrb->buf + br->consumed;
	return br_available_bytes(br);

/*
 * \return zero if the input buffer queue is empty.
 */
size_t btr_next_buffer(struct btr_node *btrn, char **bufp)
	struct btr_buffer_reference *br;
	char *buf, *result = NULL;

	FOR_EACH_BUFFER_REF(br, btrn) {
		sz = btr_get_buffer_by_reference(br, &buf);
		if (result + rv != buf)

void btr_consume(struct btr_node *btrn, size_t numbytes)
	struct btr_buffer_reference *br, *tmp;

	br = get_first_input_br(btrn);
	if (br->wrap_count == 0) {
		/*
		 * No wrap buffer. Drop buffer references whose buffer
		 * has been fully used. */
		FOR_EACH_BUFFER_REF_SAFE(br, tmp, btrn) {
			if (br->consumed + numbytes <= br->btrb->size) {
				br->consumed += numbytes;
				if (br->consumed == br->btrb->size)
					btr_drop_buffer_reference(br);
			numbytes -= br->btrb->size - br->consumed;
			btr_drop_buffer_reference(br);
	/*
	 * We have a wrap buffer, consume from it. If in total, i.e.
	 * including previous calls to btr_consume(), less than
	 * wrap_count has been consumed, there's nothing more we can do.
	 *
	 * Otherwise we drop the wrap buffer and consume from subsequent
	 * buffers of the input queue the correct amount of bytes. This
	 * is the total number of bytes that have been consumed from the
	 * wrap buffer.
	 */
	PARA_DEBUG_LOG("consuming %zu/%zu bytes from wrap buffer\n", numbytes,
		br_available_bytes(br));
	assert(numbytes <= br_available_bytes(br));
	if (br->consumed + numbytes < br->wrap_count) {
		br->consumed += numbytes;
	PARA_DEBUG_LOG("dropping wrap buffer (%zu bytes)\n", br->btrb->size);
	/* get rid of the wrap buffer */
	sz = br->consumed + numbytes;
	btr_drop_buffer_reference(br);
	return btr_consume(btrn, sz);
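
/*
 * Illustrative sketch, not part of the original source: the typical consumer
 * pattern. btr_next_buffer() yields a pointer to the contiguous chunk at the
 * head of the input queue, the node processes part (or all) of it and reports
 * the number of used bytes via btr_consume(). The processing callback and its
 * signature are hypothetical.
 */
static void example_consume_loop(struct btr_node *btrn,
		size_t (*process)(const char *data, size_t len))
{
	for (;;) {
		char *buf;
		size_t used, len = btr_next_buffer(btrn, &buf);

		if (len == 0) /* input queue empty */
			break;
		used = process(buf, len); /* must return a value <= len */
		if (used == 0) /* need more data to make progress */
			break;
		btr_consume(btrn, used);
	}
}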

static void flush_input_queue(struct btr_node *btrn)
	struct btr_buffer_reference *br, *tmp;
	FOR_EACH_BUFFER_REF_SAFE(br, tmp, btrn)
		btr_drop_buffer_reference(br);

void btr_free_node(struct btr_node *btrn)

void btr_remove_node(struct btr_node *btrn)
	PARA_NOTICE_LOG("removing btr node %s from buffer tree\n", btrn->name);
	FOR_EACH_CHILD(ch, btrn)
	flush_input_queue(btrn);
	list_del(&btrn->node);

size_t btr_get_input_queue_size(struct btr_node *btrn)
	struct btr_buffer_reference *br;
	size_t size = 0, wrap_consumed = 0;

	FOR_EACH_BUFFER_REF(br, btrn) {
		if (br->wrap_count != 0) {
			wrap_consumed = br->consumed;
		size += br_available_bytes(br);
	assert(wrap_consumed <= size);
	size -= wrap_consumed;

void btr_splice_out_node(struct btr_node *btrn)
	struct btr_node *ch, *tmp;

	PARA_NOTICE_LOG("splicing out %s\n", btrn->name);
	list_del(&btrn->node);
	FOR_EACH_CHILD_SAFE(ch, tmp, btrn) {
		PARA_INFO_LOG("parent(%s): %s\n", ch->name,
			btrn->parent? btrn->parent->name : "NULL");
		ch->parent = btrn->parent;
		list_move(&ch->node, &btrn->parent->children);
	assert(list_empty(&btrn->children));
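
/*
 * Illustrative sketch, not part of the original source: a transformation node
 * that turns out to be a no-op (think of a volume filter with gain 1) can take
 * itself out of the tree with btr_splice_out_node(); its children are
 * reattached to its parent and keep receiving data directly. Freeing the node
 * right away assumes its input queue has already been pushed down and is
 * empty; the function name and the is_noop flag are made up.
 */
static void example_drop_noop_filter(struct btr_node *btrn, bool is_noop)
{
	if (!is_noop)
		return;
	btr_splice_out_node(btrn);
	btr_free_node(btrn);
}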

/*
 * Return the size of the largest input queue.
 *
 * Iterates over all children of the given node.
 */
size_t btr_bytes_pending(struct btr_node *btrn)
	FOR_EACH_CHILD(ch, btrn) {
		size_t size = btr_get_input_queue_size(ch);
		max_size = PARA_MAX(max_size, size);

int btr_exec(struct btr_node *btrn, const char *command, char **value_result)
		return -ERRNO_TO_PARA_ERROR(EINVAL);
		return -ERRNO_TO_PARA_ERROR(ENOTSUP);
	return btrn->execute(btrn, command, value_result);

int btr_exec_up(struct btr_node *btrn, const char *command, char **value_result)
	for (; btrn; btrn = btrn->parent) {
		struct btr_node *parent = btrn->parent;
			return -ERRNO_TO_PARA_ERROR(ENOTSUP);
		if (!parent->execute)
		PARA_INFO_LOG("parent: %s, cmd: %s\n", parent->name, command);
		ret = parent->execute(parent, command, value_result);
		if (ret == -ERRNO_TO_PARA_ERROR(ENOTSUP))
		if (value_result && *value_result)
			PARA_NOTICE_LOG("%s(%s): %s\n", command, parent->name,
	return -ERRNO_TO_PARA_ERROR(ENOTSUP);
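
/*
 * Illustrative sketch, not part of the original source: querying an ancestor
 * node through btr_exec_up(). The command string "sample_rate", the use of
 * atoi()/free() (so <stdlib.h> is assumed to be included) and the assumption
 * that a successful call leaves a heap-allocated string in *result which the
 * caller must free are all made up for the example.
 */
static int example_query_sample_rate(struct btr_node *btrn, int *sample_rate)
{
	char *result = NULL;
	int ret = btr_exec_up(btrn, "sample_rate", &result);

	if (ret < 0) /* no ancestor handled the command, or it failed */
		return ret;
	*sample_rate = atoi(result);
	free(result);
	return 1;
}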

void *btr_context(struct btr_node *btrn)
	return btrn->context;

static bool need_buffer_pool_merge(struct btr_node *btrn)
	struct btr_buffer_reference *br = get_first_input_br(btrn);

	if (br->wrap_count != 0)

static void merge_input_pool(struct btr_node *btrn, size_t dest_size)
	struct btr_buffer_reference *br, *wbr = NULL;
	int num_refs; /* including wrap buffer */
	char *buf, *buf1 = NULL, *buf2 = NULL;
	size_t sz, sz1 = 0, sz2 = 0, wsz;

	br = get_first_input_br(btrn);
	if (!br || br_available_bytes(br) >= dest_size)
	FOR_EACH_BUFFER_REF(br, btrn) {
		sz = btr_get_buffer_by_reference(br, &buf);
		if (br->wrap_count != 0) {
			assert(num_refs == 1);
		if (buf1 + sz1 == buf) {
			assert(buf2 + sz2 == buf);
		if (sz1 + sz2 >= dest_size)
	if (!buf2) /* nothing to do */
	assert(buf1 && sz2 > 0);
	/*
	 * If the second buffer is large, we only take the first part of it to
	 * avoid having to memcpy() huge buffers.
	 */
	sz2 = PARA_MIN(sz2, (size_t)(64 * 1024));
	/* Make a new wrap buffer combining buf1 and buf2. */
	buf = para_malloc(sz);
	PARA_DEBUG_LOG("merging input buffers: (%p:%zu, %p:%zu) -> %p:%zu\n",
		buf1, sz1, buf2, sz2, buf, sz);
	memcpy(buf, buf1, sz1);
	memcpy(buf + sz1, buf2, sz2);
	br = para_calloc(sizeof(*br));
	br->btrb = new_btrb(buf, sz);
	br->btrb->refcount = 1;
	/* This is a wrap buffer */
	br->wrap_count = sz1;
	para_list_add(&br->node, &btrn->input_queue);

	PARA_DEBUG_LOG("increasing wrap buffer, sz1: %zu, sz2: %zu\n", sz1, sz2);
	/*
	 * We already have a wrap buffer, but it is too small. It might be
	 */
	wsz = br_available_bytes(wbr);
	if (wbr->wrap_count == sz1 && wbr->btrb->size >= sz1 + sz2) /* nothing we can do about it */
	sz = sz1 + sz2 - wbr->btrb->size; /* amount of new data */
	wbr->btrb->size += sz;
	wbr->btrb->buf = para_realloc(wbr->btrb->buf, wbr->btrb->size);
	/* copy the new data to the end of the reallocated buffer */
	memcpy(wbr->btrb->buf + wbr->btrb->size - sz, buf2 + sz2 - sz, sz);

/*
 * Merge the first two input buffers into one.
 *
 * This is a rather expensive operation.
 *
 * \return The number of buffers that were available (zero, one or two).
 */
static int merge_input(struct btr_node *btrn)
	struct btr_buffer_reference *brs[2], *br;

	if (list_empty(&btrn->input_queue))
	if (list_is_singular(&btrn->input_queue))
	/* get references to the first two buffers */
	FOR_EACH_BUFFER_REF(br, btrn) {
		szs[i] = btr_get_buffer_by_reference(brs[i], bufs + i);

	/* Make a new btrb combining the two buffers, and a reference to it. */
	sz = szs[0] + szs[1];
	buf = para_malloc(sz);
	PARA_DEBUG_LOG("%s: memory merging input buffers: (%zu, %zu) -> %zu\n",
		btrn->name, szs[0], szs[1], sz);
	memcpy(buf, bufs[0], szs[0]);
	memcpy(buf + szs[0], bufs[1], szs[1]);
	br = para_calloc(sizeof(*br));
	br->btrb = new_btrb(buf, sz);
	br->btrb->refcount = 1;

	/* replace the first two refs with the new one */
	btr_drop_buffer_reference(brs[0]);
	btr_drop_buffer_reference(brs[1]);
	para_list_add(&br->node, &btrn->input_queue);

void btr_merge(struct btr_node *btrn, size_t dest_size)
	if (need_buffer_pool_merge(btrn))
		return merge_input_pool(btrn, dest_size);
		size_t len = btr_next_buffer(btrn, &buf);
		if (len >= dest_size)
		PARA_DEBUG_LOG("input size = %zu < %zu = dest\n", len, dest_size);
		if (merge_input(btrn) < 2)
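
/*
 * Illustrative sketch, not part of the original source: a node that needs a
 * certain number of contiguous bytes (say, a fixed-size header) before it can
 * start decoding asks btr_merge() to combine input buffers, then re-checks
 * how much contiguous data the first buffer now holds. EXAMPLE_HEADER_SIZE
 * and the function name are made up.
 */
#define EXAMPLE_HEADER_SIZE 32
static char *example_peek_header(struct btr_node *btrn)
{
	char *buf;
	size_t len;

	btr_merge(btrn, EXAMPLE_HEADER_SIZE);
	len = btr_next_buffer(btrn, &buf);
	if (len < EXAMPLE_HEADER_SIZE)
		return NULL; /* not enough data buffered yet */
	return buf; /* at least EXAMPLE_HEADER_SIZE contiguous bytes */
}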

bool btr_eof(struct btr_node *btrn)
	size_t len = btr_next_buffer(btrn, &buf);

	return (len == 0 && btr_no_parent(btrn));

void log_tree_recursively(struct btr_node *btrn, int loglevel, int depth)
	const char spaces[] = "                ", *space = spaces + 16 - depth;

	para_log(loglevel, "%s%s\n", space, btrn->name);
	FOR_EACH_CHILD(ch, btrn)
		log_tree_recursively(ch, loglevel, depth + 1);

void btr_log_tree(struct btr_node *btrn, int loglevel)
	return log_tree_recursively(btrn, loglevel, 0);

/*
 * \return \a root if \a name is \p NULL.
 */
struct btr_node *btr_search_node(const char *name, struct btr_node *root)
	if (!strcmp(root->name, name))
	FOR_EACH_CHILD(ch, root) {
		struct btr_node *result = btr_search_node(name, ch);

/** 640K ought to be enough for everybody ;) */
#define BTRN_MAX_PENDING (640 * 1024)

int btr_node_status(struct btr_node *btrn, size_t min_iqs,
		enum btr_node_type type)
	if (type != BTR_NT_LEAF) {
		if (btr_no_children(btrn))
			return -E_BTR_NO_CHILD;
		if (btr_bytes_pending(btrn) > BTRN_MAX_PENDING)
	if (type != BTR_NT_ROOT) {
		iqs = btr_get_input_queue_size(btrn);
		if (iqs == 0) /* we have a parent, because not eof */
		if (iqs < min_iqs && !btr_no_parent(btrn))

void btr_get_node_start(struct btr_node *btrn, struct timeval *tv)
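
/*
 * Illustrative sketch, not part of the original source: how a leaf (sink)
 * node might use btr_node_status() in its main loop. The convention assumed
 * here is that a negative return value indicates an error or an end-of-file
 * condition, zero means there is currently nothing to do, and a positive
 * value means input is available. The minimal input queue size of 1, the
 * function name and the "hand to the output device" step are made up.
 */
static int example_sink_post_select(struct btr_node *btrn)
{
	char *buf;
	size_t len;
	int ret = btr_node_status(btrn, 1 /* min_iqs */, BTR_NT_LEAF);

	if (ret < 0) { /* error or end of file: detach from the tree */
		btr_remove_node(btrn);
		return ret;
	}
	if (ret == 0) /* nothing to do yet */
		return 0;
	len = btr_next_buffer(btrn, &buf);
	/* ... hand (buf, len) to the output device here ... */
	btr_consume(btrn, len);
	return 1;
}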