#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>
#include <string.h>

#include "buffer_tree.h"
/* whead = NULL means area full */
/**
 * Flags that modify how a btr buffer behaves.
 *
 * NOTE(review): only the comment of this enum survived extraction; the
 * enumerator name/value below is a reconstruction — confirm against the
 * original source.
 */
enum btr_buffer_flags {
	/* changes the way the buffer is deallocated */
	BTR_BF_BTR_POOL = 1,
};

/**
 * A chunk of data in the buffer tree.
 *
 * Buffers are reference-counted: they are deallocated when the last
 * reference is dropped (see btr_drop_buffer_reference()).
 */
struct btr_buffer {
	/* The data. Either heap-allocated or points into a btr pool area. */
	char *buf;
	/* Number of bytes stored in \a buf. */
	size_t size;
	/** The number of references to this buffer. */
	int refcount;
	/* NULL if the buffer was malloc'ed, else the pool it came from. */
	struct btr_pool *pool;
};
33 struct btr_buffer_reference {
34 struct btr_buffer *btrb;
36 /* Each buffer reference belongs to the buffer queue list of some buffer tree node. */
37 struct list_head node;
43 struct btr_node *parent;
44 /* The position of this btr node in the buffer tree. */
45 struct list_head node;
46 /* The children nodes of this btr node are linked together in a list. */
47 struct list_head children;
48 /* Time of first data transfer. */
51 * The input queue is a list of references to btr buffers. Each item on
52 * the list represents an input buffer which has not been completely
53 * used by this btr node.
55 struct list_head input_queue;
56 btr_command_handler execute;
61 * Create a new buffer pool.
63 * \param name The name of the new buffer pool.
65 * \param area The size in bytes of the pool area.
67 * \return An opaque pointer to the newly created buffer pool. It must be
68 * passed to btr_pool_free() after it is no longer used to deallocate all
71 struct btr_pool *btr_pool_new(const char *name, size_t area_size)
73 struct btr_pool *btrp;
75 PARA_INFO_LOG("%s, %zu bytes\n", name, area_size);
76 btrp = para_malloc(sizeof(*btrp));
77 btrp->area_start = para_malloc(area_size);
78 btrp->area_end = btrp->area_start + area_size;
79 btrp->rhead = btrp->area_start;
80 btrp->whead = btrp->area_start;
81 btrp->name = para_strdup(name);
86 * Dellocate resources used by a buffer pool.
88 * \param btrp A pointer obtained via btr_pool_new().
90 void btr_pool_free(struct btr_pool *btrp)
94 free(btrp->area_start);
100 * Return the size of the buffer pool area.
102 * \param btrp The buffer pool.
104 * \return The same value which was passed during creation time to
107 size_t btr_pool_size(struct btr_pool *btrp)
109 return btrp->area_end - btrp->area_start;
112 size_t btr_pool_filled(struct btr_pool *btrp)
115 return btr_pool_size(btrp);
116 if (btrp->rhead <= btrp->whead)
117 return btrp->whead - btrp->rhead;
118 return btr_pool_size(btrp) - (btrp->rhead - btrp->whead);
/* Number of bytes of the pool area that are currently unused. */
size_t btr_pool_unused(struct btr_pool *btrp)
{
	return btr_pool_size(btrp) - btr_pool_filled(btrp);
}
127 * Return maximal size available for one read. This is
128 * smaller than the value returned by btr_pool_unused().
130 size_t btr_pool_available(struct btr_pool *btrp)
134 if (btrp->rhead <= btrp->whead)
135 return btrp->area_end - btrp->whead;
136 return btrp->rhead - btrp->whead;
139 size_t btr_pool_get_buffer(struct btr_pool *btrp, char **result)
142 *result = btrp->whead;
143 return btr_pool_available(btrp);
146 void btr_pool_allocate(struct btr_pool *btrp, size_t size)
152 assert(size <= btr_pool_available(btrp));
153 end = btrp->whead + size;
154 assert(end <= btrp->area_end);
156 if (end == btrp->area_end) {
157 PARA_DEBUG_LOG("%s: end of pool area reached\n", btrp->name);
158 end = btrp->area_start;
160 if (end == btrp->rhead) {
161 PARA_DEBUG_LOG("%s btrp buffer full\n", btrp->name);
162 end = NULL; /* buffer full */
167 static void btr_pool_deallocate(struct btr_pool *btrp, size_t size)
169 char *end = btrp->rhead + size;
173 assert(end <= btrp->area_end);
174 assert(size <= btr_pool_filled(btrp));
175 if (end == btrp->area_end)
176 end = btrp->area_start;
178 btrp->whead = btrp->rhead;
180 if (btrp->rhead == btrp->whead)
181 btrp->rhead = btrp->whead = btrp->area_start;
/* Iterate over all children of a node. */
#define FOR_EACH_CHILD(_tn, _btrn) list_for_each_entry((_tn), \
	&((_btrn)->children), node)
/* As above, but safe against removal of the current child. */
#define FOR_EACH_CHILD_SAFE(_tn, _tmp, _btrn) \
	list_for_each_entry_safe((_tn), (_tmp), &((_btrn)->children), node)

/* Iterate over the input queue of a node. */
#define FOR_EACH_BUFFER_REF(_br, _btrn) \
	list_for_each_entry((_br), &(_btrn)->input_queue, node)
/* As above, but safe against dropping the current reference. */
#define FOR_EACH_BUFFER_REF_SAFE(_br, _tmp, _btrn) \
	list_for_each_entry_safe((_br), (_tmp), &(_btrn)->input_queue, node)
194 struct btr_node *btr_new_node(struct btr_node_description *bnd)
196 struct btr_node *btrn = para_malloc(sizeof(*btrn));
198 btrn->name = para_strdup(bnd->name);
199 btrn->parent = bnd->parent;
200 btrn->execute = bnd->handler;
201 btrn->context = bnd->context;
202 btrn->start.tv_sec = 0;
203 btrn->start.tv_usec = 0;
204 INIT_LIST_HEAD(&btrn->children);
205 INIT_LIST_HEAD(&btrn->input_queue);
208 list_add_tail(&btrn->node, &bnd->parent->children);
209 PARA_INFO_LOG("new leaf node: %s (child of %s)\n",
210 bnd->name, bnd->parent->name);
212 PARA_INFO_LOG("added %s as btr root\n", bnd->name);
216 assert(!bnd->child->parent);
217 PARA_INFO_LOG("new root: %s (was %s)\n",
218 bnd->name, bnd->child->name);
220 list_add_tail(&bnd->child->node, &btrn->children);
222 bnd->child->parent = btrn;
225 PARA_EMERG_LOG("inserting internal nodes not yet supported.\n");
227 assert(bnd->child->parent == bnd->parent);
233 * Allocate a new btr buffer.
235 * The freshly allocated buffer will have a zero refcount and will
236 * not be associated with a btr pool.
238 static struct btr_buffer *new_btrb(char *buf, size_t size)
240 struct btr_buffer *btrb = para_calloc(sizeof(*btrb));
247 static void dealloc_buffer(struct btr_buffer *btrb)
250 btr_pool_deallocate(btrb->pool, btrb->size);
255 static struct btr_buffer_reference *get_first_input_br(struct btr_node *btrn)
257 if (list_empty(&btrn->input_queue))
259 return list_first_entry(&btrn->input_queue,
260 struct btr_buffer_reference, node);
264 * Deallocate the reference, release the resources if refcount drops to zero.
266 static void btr_drop_buffer_reference(struct btr_buffer_reference *br)
268 struct btr_buffer *btrb = br->btrb;
273 if (btrb->refcount == 0) {
274 dealloc_buffer(btrb);
279 static void add_btrb_to_children(struct btr_buffer *btrb,
280 struct btr_node *btrn, size_t consumed)
284 if (btrn->start.tv_sec == 0)
286 FOR_EACH_CHILD(ch, btrn) {
287 struct btr_buffer_reference *br = para_calloc(sizeof(*br));
289 br->consumed = consumed;
290 list_add_tail(&br->node, &ch->input_queue);
292 if (ch->start.tv_sec == 0)
297 void btr_add_output(char *buf, size_t size, struct btr_node *btrn)
299 struct btr_buffer *btrb;
302 if (list_empty(&btrn->children)) {
306 btrb = new_btrb(buf, size);
307 add_btrb_to_children(btrb, btrn, 0);
310 void btr_add_output_pool(struct btr_pool *btrp, size_t size,
311 struct btr_node *btrn)
313 struct btr_buffer *btrb;
318 if (list_empty(&btrn->children))
320 avail = btr_pool_get_buffer(btrp, &buf);
321 assert(avail >= size);
322 btr_pool_allocate(btrp, size);
323 btrb = new_btrb(buf, size);
325 add_btrb_to_children(btrb, btrn, 0);
/*
 * Copy \a n bytes of \a src into the pool and feed them to the children of
 * \a btrn. The copy may be split in two because the pool area wraps.
 */
void btr_copy(const void *src, size_t n, struct btr_pool *btrp,
		struct btr_node *btrn)
{
	char *buf;
	size_t sz, copy;

	if (n == 0)
		return;
	assert(n <= btr_pool_unused(btrp));
	sz = btr_pool_get_buffer(btrp, &buf);
	copy = PARA_MIN(sz, n);
	memcpy(buf, src, copy);
	btr_add_output_pool(btrp, copy, btrn);
	if (copy == n)
		return;
	/* Second part after the pool area wrapped around. */
	sz = btr_pool_get_buffer(btrp, &buf);
	assert(sz >= n - copy);
	/* Cast for standard C: arithmetic on void * is a GNU extension. */
	memcpy(buf, (const char *)src + copy, n - copy);
	btr_add_output_pool(btrp, n - copy, btrn);
}
349 static void btr_pushdown_br(struct btr_buffer_reference *br, struct btr_node *btrn)
351 add_btrb_to_children(br->btrb, btrn, br->consumed);
352 btr_drop_buffer_reference(br);
355 void btr_pushdown(struct btr_node *btrn)
357 struct btr_buffer_reference *br, *tmp;
359 FOR_EACH_BUFFER_REF_SAFE(br, tmp, btrn)
360 btr_pushdown_br(br, btrn);
363 int btr_pushdown_one(struct btr_node *btrn)
365 struct btr_buffer_reference *br;
367 if (list_empty(&btrn->input_queue))
369 br = list_first_entry(&btrn->input_queue, struct btr_buffer_reference, node);
370 btr_pushdown_br(br, btrn);
374 /* Return true if this node has no children. */
375 bool btr_no_children(struct btr_node *btrn)
377 return list_empty(&btrn->children);
380 bool btr_no_parent(struct btr_node *btrn)
382 return !btrn->parent;
385 bool btr_inplace_ok(struct btr_node *btrn)
389 return list_is_singular(&btrn->parent->children);
392 static inline size_t br_available_bytes(struct btr_buffer_reference *br)
394 return br->btrb->size - br->consumed;
397 size_t btr_get_buffer_by_reference(struct btr_buffer_reference *br, char **buf)
400 *buf = br->btrb->buf + br->consumed;
401 return br_available_bytes(br);
405 * \return zero if the input buffer queue is empty.
407 size_t btr_next_buffer(struct btr_node *btrn, char **bufp)
409 struct btr_buffer_reference *br;
410 char *buf, *result = NULL;
413 FOR_EACH_BUFFER_REF(br, btrn) {
414 sz = btr_get_buffer_by_reference(br, &buf);
424 if (result + rv != buf)
433 void btr_consume(struct btr_node *btrn, size_t numbytes)
435 struct btr_buffer_reference *br, *tmp;
440 br = get_first_input_br(btrn);
443 if (br->wrap_count == 0) {
445 * No wrap buffer. Drop buffer references whose buffer
446 * has been fully used. */
447 FOR_EACH_BUFFER_REF_SAFE(br, tmp, btrn) {
448 if (br->consumed + numbytes <= br->btrb->size) {
449 br->consumed += numbytes;
450 if (br->consumed == br->btrb->size)
451 btr_drop_buffer_reference(br);
454 numbytes -= br->btrb->size - br->consumed;
455 btr_drop_buffer_reference(br);
461 We have a wrap buffer, consume from it. If in total,
462 i.e. including previous calls to brt_consume(), less than
463 wrap_count has been consumed, there's nothing more we can do.
465 Otherwise we drop the wrap buffer and consume from subsequent
466 buffers of the input queue the correct amount of bytes. This
467 is the total number of bytes that have been consumed from the
470 PARA_DEBUG_LOG("consuming %zu/%zu bytes from wrap buffer\n", numbytes,
471 br_available_bytes(br));
473 assert(numbytes <= br_available_bytes(br));
474 if (br->consumed + numbytes < br->wrap_count) {
475 br->consumed += numbytes;
478 PARA_DEBUG_LOG("dropping wrap buffer (%zu bytes)\n", br->btrb->size);
479 /* get rid of the wrap buffer */
480 sz = br->consumed + numbytes;
481 btr_drop_buffer_reference(br);
482 return btr_consume(btrn, sz);
485 static void flush_input_queue(struct btr_node *btrn)
487 struct btr_buffer_reference *br, *tmp;
488 FOR_EACH_BUFFER_REF_SAFE(br, tmp, btrn)
489 btr_drop_buffer_reference(br);
492 void btr_free_node(struct btr_node *btrn)
500 void btr_remove_node(struct btr_node *btrn)
506 PARA_NOTICE_LOG("removing btr node %s from buffer tree\n", btrn->name);
507 FOR_EACH_CHILD(ch, btrn)
509 flush_input_queue(btrn);
511 list_del(&btrn->node);
514 size_t btr_get_input_queue_size(struct btr_node *btrn)
516 struct btr_buffer_reference *br;
517 size_t size = 0, wrap_consumed = 0;
519 FOR_EACH_BUFFER_REF(br, btrn) {
520 if (br->wrap_count != 0) {
521 wrap_consumed = br->consumed;
524 size += br_available_bytes(br);
526 assert(wrap_consumed <= size);
527 size -= wrap_consumed;
531 void btr_splice_out_node(struct btr_node *btrn)
533 struct btr_node *ch, *tmp;
536 PARA_NOTICE_LOG("splicing out %s\n", btrn->name);
539 list_del(&btrn->node);
540 FOR_EACH_CHILD_SAFE(ch, tmp, btrn) {
541 PARA_INFO_LOG("parent(%s): %s\n", ch->name,
542 btrn->parent? btrn->parent->name : "NULL");
543 ch->parent = btrn->parent;
545 list_move(&ch->node, &btrn->parent->children);
547 assert(list_empty(&btrn->children));
551 * Return the size of the largest input queue.
553 * Iterates over all children of the given node.
555 size_t btr_bytes_pending(struct btr_node *btrn)
560 FOR_EACH_CHILD(ch, btrn) {
561 size_t size = btr_get_input_queue_size(ch);
562 max_size = PARA_MAX(max_size, size);
567 int btr_exec(struct btr_node *btrn, const char *command, char **value_result)
570 return -ERRNO_TO_PARA_ERROR(EINVAL);
572 return -ERRNO_TO_PARA_ERROR(ENOTSUP);
573 return btrn->execute(btrn, command, value_result);
576 int btr_exec_up(struct btr_node *btrn, const char *command, char **value_result)
580 for (; btrn; btrn = btrn->parent) {
581 struct btr_node *parent = btrn->parent;
583 return -ERRNO_TO_PARA_ERROR(ENOTSUP);
584 if (!parent->execute)
586 PARA_INFO_LOG("parent: %s, cmd: %s\n", parent->name, command);
587 ret = parent->execute(parent, command, value_result);
588 if (ret == -ERRNO_TO_PARA_ERROR(ENOTSUP))
592 if (value_result && *value_result)
593 PARA_NOTICE_LOG("%s(%s): %s\n", command, parent->name,
597 return -ERRNO_TO_PARA_ERROR(ENOTSUP);
600 void *btr_context(struct btr_node *btrn)
602 return btrn->context;
605 static bool need_buffer_pool_merge(struct btr_node *btrn)
607 struct btr_buffer_reference *br = get_first_input_br(btrn);
611 if (br->wrap_count != 0)
618 static void merge_input_pool(struct btr_node *btrn, size_t dest_size)
620 struct btr_buffer_reference *br, *wbr = NULL;
621 int num_refs; /* including wrap buffer */
622 char *buf, *buf1 = NULL, *buf2 = NULL;
623 size_t sz, sz1 = 0, sz2 = 0, wsz;
625 br = get_first_input_br(btrn);
626 if (!br || br_available_bytes(br) >= dest_size)
629 FOR_EACH_BUFFER_REF(br, btrn) {
631 sz = btr_get_buffer_by_reference(br, &buf);
634 if (br->wrap_count != 0) {
636 assert(num_refs == 1);
647 if (buf1 + sz1 == buf) {
656 assert(buf2 + sz2 == buf);
659 if (sz1 + sz2 >= dest_size)
662 if (!buf2) /* nothing to do */
664 assert(buf1 && sz2 > 0);
666 * If the second buffer is large, we only take the first part of it to
667 * avoid having to memcpy() huge buffers.
669 sz2 = PARA_MIN(sz2, (size_t)(64 * 1024));
671 /* Make a new wrap buffer combining buf1 and buf2. */
673 buf = para_malloc(sz);
674 PARA_DEBUG_LOG("merging input buffers: (%p:%zu, %p:%zu) -> %p:%zu\n",
675 buf1, sz1, buf2, sz2, buf, sz);
676 memcpy(buf, buf1, sz1);
677 memcpy(buf + sz1, buf2, sz2);
678 br = para_calloc(sizeof(*br));
679 br->btrb = new_btrb(buf, sz);
680 br->btrb->refcount = 1;
682 /* This is a wrap buffer */
683 br->wrap_count = sz1;
684 para_list_add(&br->node, &btrn->input_queue);
687 PARA_DEBUG_LOG("increasing wrap buffer, sz1: %zu, sz2: %zu\n", sz1, sz2);
689 * We already have a wrap buffer, but it is too small. It might be
692 wsz = br_available_bytes(wbr);
693 if (wbr->wrap_count == sz1 && wbr->btrb->size >= sz1 + sz2) /* nothing we can do about it */
695 sz = sz1 + sz2 - wbr->btrb->size; /* amount of new data */
696 wbr->btrb->size += sz;
697 wbr->btrb->buf = para_realloc(wbr->btrb->buf, wbr->btrb->size);
698 /* copy the new data to the end of the reallocated buffer */
700 memcpy(wbr->btrb->buf + wbr->btrb->size - sz, buf2 + sz2 - sz, sz);
704 * Merge the first two input buffers into one.
706 * This is a quite expensive operation.
708 * \return The number of buffers that have been available (zero, one or two).
710 static int merge_input(struct btr_node *btrn)
712 struct btr_buffer_reference *brs[2], *br;
717 if (list_empty(&btrn->input_queue))
719 if (list_is_singular(&btrn->input_queue))
722 /* get references to the first two buffers */
723 FOR_EACH_BUFFER_REF(br, btrn) {
725 szs[i] = btr_get_buffer_by_reference(brs[i], bufs + i);
730 /* make a new btrb that combines the two buffers and a br to it. */
731 sz = szs[0] + szs[1];
732 buf = para_malloc(sz);
733 PARA_DEBUG_LOG("%s: memory merging input buffers: (%zu, %zu) -> %zu\n",
734 btrn->name, szs[0], szs[1], sz);
735 memcpy(buf, bufs[0], szs[0]);
736 memcpy(buf + szs[0], bufs[1], szs[1]);
738 br = para_calloc(sizeof(*br));
739 br->btrb = new_btrb(buf, sz);
740 br->btrb->refcount = 1;
742 /* replace the first two refs by the new one */
743 btr_drop_buffer_reference(brs[0]);
744 btr_drop_buffer_reference(brs[1]);
745 para_list_add(&br->node, &btrn->input_queue);
/*
 * Merge input buffers of \a btrn until at least \a dest_size contiguous
 * bytes are available, or no further merging is possible.
 */
void btr_merge(struct btr_node *btrn, size_t dest_size)
{
	if (need_buffer_pool_merge(btrn))
		return merge_input_pool(btrn, dest_size);
	for (;;) {
		char *buf;
		size_t len = btr_next_buffer(btrn, &buf);
		if (len >= dest_size)
			return;
		PARA_DEBUG_LOG("input size = %zu < %zu = dest\n", len, dest_size);
		/* Fewer than two buffers left: cannot merge any further. */
		if (merge_input(btrn) < 2)
			return;
	}
}
/* Return true if no input is queued and the node has no parent (EOF). */
bool btr_eof(struct btr_node *btrn)
{
	char *buf;
	size_t len = btr_next_buffer(btrn, &buf);

	return (len == 0 && btr_no_parent(btrn));
}
772 void log_tree_recursively(struct btr_node *btrn, int loglevel, int depth)
775 const char spaces[] = " ", *space = spaces + 16 - depth;
779 para_log(loglevel, "%s%s\n", space, btrn->name);
780 FOR_EACH_CHILD(ch, btrn)
781 log_tree_recursively(ch, loglevel, depth + 1);
/* Log the whole buffer tree rooted at \a btrn at the given loglevel. */
void btr_log_tree(struct btr_node *btrn, int loglevel)
{
	return log_tree_recursively(btrn, loglevel, 0);
}
790 * \return \a root if \a name is \p NULL.
792 struct btr_node *btr_search_node(const char *name, struct btr_node *root)
798 if (!strcmp(root->name, name))
800 FOR_EACH_CHILD(ch, root) {
801 struct btr_node *result = btr_search_node(name, ch);
808 /** 640K ought to be enough for everybody ;) */
809 #define BTRN_MAX_PENDING (640 * 1024)
811 int btr_node_status(struct btr_node *btrn, size_t min_iqs,
812 enum btr_node_type type)
817 if (type != BTR_NT_LEAF) {
818 if (btr_no_children(btrn))
819 return -E_BTR_NO_CHILD;
820 if (btr_bytes_pending(btrn) > BTRN_MAX_PENDING)
823 if (type != BTR_NT_ROOT) {
826 iqs = btr_get_input_queue_size(btrn);
827 if (iqs == 0) /* we have a parent, because not eof */
829 if (iqs < min_iqs && !btr_no_parent(btrn))
835 void btr_get_node_start(struct btr_node *btrn, struct timeval *tv)