7 #include "buffer_tree.h"
/* Flags that change how a buffer is deallocated (pool vs. heap). */
enum btr_buffer_flags {
	/* changes the way the buffer is deallocated */
	/** The number of references to this buffer. */
	/* NOTE(review): non-NULL means the buffer lives in this pool rather
	 * than on the heap -- see dealloc_buffer(). */
	struct btr_pool *pool;
/* One node's view of (part of) a btr buffer. */
struct btr_buffer_reference {
	struct btr_buffer *btrb;
	/* Each buffer reference belongs to the buffer queue list of some buffer tree node. */
	struct list_head node;
	/* NOTE(review): the fields below belong to struct btr_node. */
	struct btr_node *parent;
	/* The position of this btr node in the buffer tree. */
	struct list_head node;
	/* The children nodes of this btr node are linked together in a list. */
	struct list_head children;
	/* Time of first data transfer. */
	/**
	 * The input queue is a list of references to btr buffers. Each item on
	 * the list represents an input buffer which has not been completely
	 * used by this btr node.
	 */
	struct list_head input_queue;
	/* Optional command handler of this node, see btr_exec(). */
	btr_command_handler execute;
59 struct btr_pool *btr_pool_new(const char *name, size_t area_size)
61 struct btr_pool *btrp;
63 PARA_INFO_LOG("%s, %zu bytes\n", name, area_size);
64 btrp = para_malloc(sizeof(*btrp));
65 btrp->area_start = para_malloc(area_size);
66 btrp->area_end = btrp->area_start + area_size;
67 btrp->rhead = btrp->area_start;
68 btrp->whead = btrp->area_start;
69 btrp->name = para_strdup(name);
/* Invariant: whead == NULL means the pool area is completely full. */
75 void btr_pool_free(struct btr_pool *btrp)
79 free(btrp->area_start);
84 size_t btr_pool_size(struct btr_pool *btrp)
86 return btrp->area_end - btrp->area_start;
89 size_t btr_pool_filled(struct btr_pool *btrp)
92 return btr_pool_size(btrp);
93 if (btrp->rhead <= btrp->whead)
94 return btrp->whead - btrp->rhead;
95 return btr_pool_size(btrp) - (btrp->rhead - btrp->whead);
/** Return how many bytes of the pool area are currently unoccupied. */
size_t btr_pool_unused(struct btr_pool *btrp)
{
	size_t total = btr_pool_size(btrp);
	size_t used = btr_pool_filled(btrp);

	return total - used;
}
103 size_t btr_pool_available(struct btr_pool *btrp)
107 if (btrp->rhead <= btrp->whead)
108 return btrp->area_end - btrp->whead;
109 return btrp->rhead - btrp->whead;
112 size_t btr_pool_get_buffer(struct btr_pool *btrp, char **result)
115 *result = btrp->whead;
116 return btr_pool_available(btrp);
119 void btr_pool_allocate(struct btr_pool *btrp, size_t size)
125 //PARA_CRIT_LOG("filled: %zu, alloc %zu\n", btr_pool_filled(btrp), size);
126 assert(size <= btr_pool_available(btrp));
127 end = btrp->whead + size;
128 assert(end <= btrp->area_end);
130 if (end == btrp->area_end) {
131 PARA_DEBUG_LOG("%s: end of pool area reached\n", btrp->name);
132 end = btrp->area_start;
134 if (end == btrp->rhead) {
135 PARA_DEBUG_LOG("btrp buffer full\n");
136 end = NULL; /* buffer full */
139 //PARA_CRIT_LOG("filled: %zu\n", btr_pool_filled(btrp));
142 static void btr_pool_deallocate(struct btr_pool *btrp, size_t size)
144 char *end = btrp->rhead + size;
146 //PARA_CRIT_LOG("filled: %zu, dealloc %zu\n", btr_pool_filled(btrp), size);
149 assert(end <= btrp->area_end);
150 assert(size <= btr_pool_filled(btrp));
151 if (end == btrp->area_end)
152 end = btrp->area_start;
154 btrp->whead = btrp->rhead;
156 if (btrp->rhead == btrp->whead)
157 btrp->rhead = btrp->whead = btrp->area_start;
158 //PARA_CRIT_LOG("filled: %zu\n", btr_pool_filled(btrp));
/* Iterate over all children of the buffer tree node _btrn. */
#define FOR_EACH_CHILD(_tn, _btrn) list_for_each_entry((_tn), \
	&((_btrn)->children), node)
/* As FOR_EACH_CHILD(), but safe against removal of the current child. */
#define FOR_EACH_CHILD_SAFE(_tn, _tmp, _btrn) \
	list_for_each_entry_safe((_tn), (_tmp), &((_btrn)->children), node)

/* Iterate over the buffer references in _btrn's input queue. */
#define FOR_EACH_BUFFER_REF(_br, _btrn) \
	list_for_each_entry((_br), &(_btrn)->input_queue, node)
/* As FOR_EACH_BUFFER_REF(), but safe against removal of the current entry. */
#define FOR_EACH_BUFFER_REF_SAFE(_br, _tmp, _btrn) \
	list_for_each_entry_safe((_br), (_tmp), &(_btrn)->input_queue, node)
 * Allowed (parent, child) combinations when creating a node:
 *
 *	(NULL, NULL): new, isolated node.
 *	(NULL, c): new node becomes the root; c must be the old root.
 *	(p, NULL): new leaf node.
 *	(p, c): new internal node; c must be a child of p.
179 struct btr_node *btr_new_node(struct btr_node_description *bnd)
181 struct btr_node *btrn = para_malloc(sizeof(*btrn));
183 btrn->name = para_strdup(bnd->name);
184 btrn->parent = bnd->parent;
185 btrn->execute = bnd->handler;
186 btrn->context = bnd->context;
187 btrn->start.tv_sec = 0;
188 btrn->start.tv_usec = 0;
190 list_add_tail(&btrn->node, &bnd->parent->children);
191 INIT_LIST_HEAD(&btrn->children);
192 INIT_LIST_HEAD(&btrn->input_queue);
194 PARA_INFO_LOG("added %s as child of %s\n", bnd->name, bnd->parent->name);
196 PARA_INFO_LOG("added %s as btr root\n", bnd->name);
201 * Allocate a new btr buffer.
203 * The freshly allocated buffer will have a zero refcount and will
204 * not be associated with a btr pool.
206 static struct btr_buffer *new_btrb(char *buf, size_t size)
208 struct btr_buffer *btrb = para_calloc(sizeof(*btrb));
215 static void dealloc_buffer(struct btr_buffer *btrb)
218 btr_pool_deallocate(btrb->pool, btrb->size);
223 static struct btr_buffer_reference *get_first_input_br(struct btr_node *btrn)
225 if (list_empty(&btrn->input_queue))
227 return list_first_entry(&btrn->input_queue,
228 struct btr_buffer_reference, node);
232 * Deallocate the reference, release the resources if refcount drops to zero.
234 static void btr_drop_buffer_reference(struct btr_buffer_reference *br)
236 struct btr_buffer *btrb = br->btrb;
238 //PARA_CRIT_LOG("dropping buffer reference %p\n", br);
242 if (btrb->refcount == 0) {
243 dealloc_buffer(btrb);
248 static void add_btrb_to_children(struct btr_buffer *btrb,
249 struct btr_node *btrn, size_t consumed)
253 if (btrn->start.tv_sec == 0)
255 FOR_EACH_CHILD(ch, btrn) {
256 struct btr_buffer_reference *br = para_calloc(sizeof(*br));
258 br->consumed = consumed;
259 list_add_tail(&br->node, &ch->input_queue);
261 if (ch->start.tv_sec == 0)
266 void btr_add_output(char *buf, size_t size, struct btr_node *btrn)
268 struct btr_buffer *btrb;
271 if (list_empty(&btrn->children)) {
275 btrb = new_btrb(buf, size);
276 add_btrb_to_children(btrb, btrn, 0);
279 void btr_add_output_pool(struct btr_pool *btrp, size_t size,
280 struct btr_node *btrn)
282 struct btr_buffer *btrb;
287 if (list_empty(&btrn->children))
289 avail = btr_pool_get_buffer(btrp, &buf);
290 assert(avail >= size);
291 btr_pool_allocate(btrp, size);
292 btrb = new_btrb(buf, size);
294 add_btrb_to_children(btrb, btrn, 0);
/**
 * Copy n bytes from src into the pool and feed them to btrn's children.
 *
 * Because the contiguous region at the write head may be smaller than n,
 * the copy may be split into two pool outputs (before and after the
 * ring-buffer wrap-around).
 */
void btr_copy(const void *src, size_t n, struct btr_pool *btrp,
	struct btr_node *btrn)
{
	char *buf;
	size_t sz, copy;

	if (n == 0)
		return;
	assert(n <= btr_pool_unused(btrp));
	sz = btr_pool_get_buffer(btrp, &buf);
	copy = PARA_MIN(sz, n);
	memcpy(buf, src, copy);
	btr_add_output_pool(btrp, copy, btrn);
	if (copy == n) /* fit into one contiguous region */
		return;
	/* second part, after the wrap-around */
	sz = btr_pool_get_buffer(btrp, &buf);
	assert(sz >= n - copy);
	memcpy(buf, (const char *)src + copy, n - copy); /* no void* arithmetic */
	btr_add_output_pool(btrp, n - copy, btrn);
}
318 static void btr_pushdown_br(struct btr_buffer_reference *br, struct btr_node *btrn)
320 add_btrb_to_children(br->btrb, btrn, br->consumed);
321 btr_drop_buffer_reference(br);
324 void btr_pushdown(struct btr_node *btrn)
326 struct btr_buffer_reference *br, *tmp;
328 FOR_EACH_BUFFER_REF_SAFE(br, tmp, btrn)
329 btr_pushdown_br(br, btrn);
332 int btr_pushdown_one(struct btr_node *btrn)
334 struct btr_buffer_reference *br;
336 if (list_empty(&btrn->input_queue))
338 br = list_first_entry(&btrn->input_queue, struct btr_buffer_reference, node);
339 btr_pushdown_br(br, btrn);
343 /* Return true if this node has no children. */
344 bool btr_no_children(struct btr_node *btrn)
346 return list_empty(&btrn->children);
349 bool btr_no_parent(struct btr_node *btrn)
351 return !btrn->parent;
354 bool btr_inplace_ok(struct btr_node *btrn)
358 return list_is_singular(&btrn->parent->children);
361 static inline size_t br_available_bytes(struct btr_buffer_reference *br)
363 return br->btrb->size - br->consumed;
366 size_t btr_get_buffer_by_reference(struct btr_buffer_reference *br, char **buf)
369 *buf = br->btrb->buf + br->consumed;
370 return br_available_bytes(br);
/**
 * Obtain the next chunk of input data of a buffer tree node, merging
 * adjacent pool buffers in place where possible.
 *
 * \return zero if the input buffer queue is empty.
 */
size_t btr_next_buffer(struct btr_node *btrn, char **bufp)
{
	struct btr_buffer_reference *br;
	char *buf, *result = NULL;

	/* Walk the queue, extending 'result' while buffers are contiguous. */
	FOR_EACH_BUFFER_REF(br, btrn) {
		sz = btr_get_buffer_by_reference(br, &buf);
		/* Pool buffers can only be merged if physically adjacent. */
		if (result + rv != buf) {
			PARA_DEBUG_LOG("%s: pool merge impossible: %p != %p\n",
				btrn->name, result + rv, buf);
//		PARA_CRIT_LOG("%s: inplace merge (%zu, %zu)->%zu\n", btrn->name,
//		PARA_CRIT_LOG("%s: inplace merge %p (%zu)\n", btrn->name,
/**
 * Mark numbytes of this node's input as consumed, dropping buffer
 * references that become fully used.
 */
void btr_consume(struct btr_node *btrn, size_t numbytes)
{
	struct btr_buffer_reference *br, *tmp;

	br = get_first_input_br(btrn);
	if (br->wrap_count == 0) {
		/*
		 * No wrap buffer. Drop buffer references whose buffer
		 * has been fully used. */
		FOR_EACH_BUFFER_REF_SAFE(br, tmp, btrn) {
			if (br->consumed + numbytes <= br->btrb->size) {
				/* request ends inside this buffer */
				br->consumed += numbytes;
				if (br->consumed == br->btrb->size)
					btr_drop_buffer_reference(br);
			/* buffer fully used: account for it and drop it */
			numbytes -= br->btrb->size - br->consumed;
			btr_drop_buffer_reference(br);
	/*
	We have a wrap buffer, consume from it. If in total,
	i.e. including previous calls to brt_consume(), less than
	wrap_count has been consumed, there's nothing more we can do.

	Otherwise we drop the wrap buffer and consume from subsequent
	buffers of the input queue the correct amount of bytes. This
	is the total number of bytes that have been consumed from the
	wrap buffer.
	*/
	PARA_DEBUG_LOG("consuming %zu/%zu bytes from wrap buffer\n", numbytes,
		br_available_bytes(br));
	assert(numbytes <= br_available_bytes(br));
	if (br->consumed + numbytes < br->wrap_count) {
		/* still inside the copied prefix: just advance */
		br->consumed += numbytes;
	PARA_DEBUG_LOG("dropping wrap buffer (%zu bytes)\n", br->btrb->size);
	/* get rid of the wrap buffer */
	sz = br->consumed + numbytes;
	btr_drop_buffer_reference(br);
	/* re-consume the same total from the underlying real buffers */
	return btr_consume(btrn, sz);
462 static void flush_input_queue(struct btr_node *btrn)
464 struct btr_buffer_reference *br, *tmp;
465 FOR_EACH_BUFFER_REF_SAFE(br, tmp, btrn)
466 btr_drop_buffer_reference(br);
469 void btr_free_node(struct btr_node *btrn)
477 void btr_remove_node(struct btr_node *btrn)
483 PARA_NOTICE_LOG("removing btr node %s from buffer tree\n", btrn->name);
484 FOR_EACH_CHILD(ch, btrn)
486 flush_input_queue(btrn);
488 list_del(&btrn->node);
491 size_t btr_get_input_queue_size(struct btr_node *btrn)
493 struct btr_buffer_reference *br;
496 FOR_EACH_BUFFER_REF(br, btrn) {
497 //PARA_CRIT_LOG("size: %zu\n", size);
498 size += br_available_bytes(br);
503 void btr_splice_out_node(struct btr_node *btrn)
505 struct btr_node *ch, *tmp;
508 PARA_NOTICE_LOG("splicing out %s\n", btrn->name);
511 list_del(&btrn->node);
512 FOR_EACH_CHILD_SAFE(ch, tmp, btrn) {
513 PARA_INFO_LOG("parent(%s): %s\n", ch->name,
514 btrn->parent? btrn->parent->name : "NULL");
515 ch->parent = btrn->parent;
517 list_move(&ch->node, &btrn->parent->children);
519 assert(list_empty(&btrn->children));
523 * Return the size of the largest input queue.
525 * Iterates over all children of the given node.
527 size_t btr_bytes_pending(struct btr_node *btrn)
532 FOR_EACH_CHILD(ch, btrn) {
533 size_t size = btr_get_input_queue_size(ch);
534 max_size = PARA_MAX(max_size, size);
539 int btr_exec(struct btr_node *btrn, const char *command, char **value_result)
542 return -ERRNO_TO_PARA_ERROR(EINVAL);
544 return -ERRNO_TO_PARA_ERROR(ENOTSUP);
545 return btrn->execute(btrn, command, value_result);
548 int btr_exec_up(struct btr_node *btrn, const char *command, char **value_result)
552 for (; btrn; btrn = btrn->parent) {
553 struct btr_node *parent = btrn->parent;
555 return -ERRNO_TO_PARA_ERROR(ENOTSUP);
556 if (!parent->execute)
558 PARA_INFO_LOG("parent: %s, cmd: %s\n", parent->name, command);
559 ret = parent->execute(parent, command, value_result);
560 if (ret == -ERRNO_TO_PARA_ERROR(ENOTSUP))
564 if (value_result && *value_result)
565 PARA_NOTICE_LOG("%s(%s): %s\n", command, parent->name,
569 return -ERRNO_TO_PARA_ERROR(ENOTSUP);
572 void *btr_context(struct btr_node *btrn)
574 return btrn->context;
577 static bool need_buffer_pool_merge(struct btr_node *btrn)
579 struct btr_buffer_reference *br = get_first_input_br(btrn);
583 if (br->wrap_count != 0)
/*
 * Merge pool-backed input buffers of btrn until at least dest_size
 * contiguous bytes are available, using a heap "wrap buffer" that joins
 * the tail and head of the pool's ring area.
 */
static void merge_input_pool(struct btr_node *btrn, size_t dest_size)
{
	struct btr_buffer_reference *br, *wbr = NULL;
	int num_refs; /* including wrap buffer */
	char *buf, *buf1 = NULL, *buf2 = NULL;
	size_t sz, sz1 = 0, sz2 = 0, wsz;

	br = get_first_input_br(btrn);
	/* Nothing to do if there is no input or it is already large enough. */
	if (!br || br_available_bytes(br) >= dest_size)
	/* Collect the (at most two) contiguous regions of the ring. */
	FOR_EACH_BUFFER_REF(br, btrn) {
		sz = btr_get_buffer_by_reference(br, &buf);
		if (br->wrap_count != 0) {
			/* only the first reference may be a wrap buffer */
			assert(num_refs == 1);
		/* chunk extends the first region */
		if (buf1 + sz1 == buf) {
		/* otherwise it must continue the second region */
		assert(buf2 + sz2 == buf);
		if (sz1 + sz2 >= dest_size)
	if (!buf2) /* nothing to do */
	assert(buf1 && sz2 > 0);
	/*
	 * If the second buffer is large, we only take the first part of it to
	 * avoid having to memcpy() huge buffers.
	 */
	sz2 = PARA_MIN(sz2, (size_t)(64 * 1024));
	/* Make a new wrap buffer combining buf1 and buf2. */
	buf = para_malloc(sz);
	PARA_DEBUG_LOG("merging input buffers: (%p:%zu, %p:%zu) -> %p:%zu\n",
		buf1, sz1, buf2, sz2, buf, sz);
	memcpy(buf, buf1, sz1);
	memcpy(buf + sz1, buf2, sz2);
	br = para_calloc(sizeof(*br));
	br->btrb = new_btrb(buf, sz);
	br->btrb->refcount = 1;
	/* This is a wrap buffer */
	br->wrap_count = sz1;
	para_list_add(&br->node, &btrn->input_queue);
	PARA_DEBUG_LOG("increasing wrap buffer, sz1: %zu, sz2: %zu\n", sz1, sz2);
	/*
	 * We already have a wrap buffer, but it is too small. Grow it by
	 * reallocating and appending the newly arrived data.
	 */
	wsz = br_available_bytes(wbr);
	if (wbr->wrap_count == sz1 && wbr->btrb->size >= sz1 + sz2) /* nothing we can do about it */
	sz = sz1 + sz2 - wbr->btrb->size; /* amount of new data */
	wbr->btrb->size += sz;
	wbr->btrb->buf = para_realloc(wbr->btrb->buf, wbr->btrb->size);
	/* copy the new data to the end of the reallocated buffer */
	memcpy(wbr->btrb->buf + wbr->btrb->size - sz, buf2 + sz2 - sz, sz);
676 * Merge the first two input buffers into one.
678 * This is a quite expensive operation.
680 * \return The number of buffers that have been available (zero, one or two).
682 static int merge_input(struct btr_node *btrn)
684 struct btr_buffer_reference *brs[2], *br;
689 if (list_empty(&btrn->input_queue))
691 if (list_is_singular(&btrn->input_queue))
694 /* get references to the first two buffers */
695 FOR_EACH_BUFFER_REF(br, btrn) {
697 szs[i] = btr_get_buffer_by_reference(brs[i], bufs + i);
702 /* make a new btrb that combines the two buffers and a br to it. */
703 sz = szs[0] + szs[1];
704 buf = para_malloc(sz);
705 PARA_DEBUG_LOG("%s: memory merging input buffers: (%zu, %zu) -> %zu\n",
706 btrn->name, szs[0], szs[1], sz);
707 memcpy(buf, bufs[0], szs[0]);
708 memcpy(buf + szs[0], bufs[1], szs[1]);
710 br = para_calloc(sizeof(*br));
711 br->btrb = new_btrb(buf, sz);
712 br->btrb->refcount = 1;
714 /* replace the first two refs by the new one */
715 btr_drop_buffer_reference(brs[0]);
716 btr_drop_buffer_reference(brs[1]);
717 para_list_add(&br->node, &btrn->input_queue);
/**
 * Merge input buffers of btrn until at least dest_size contiguous bytes
 * are available, or no further merging is possible.
 */
void btr_merge(struct btr_node *btrn, size_t dest_size)
{
	if (need_buffer_pool_merge(btrn))
		return merge_input_pool(btrn, dest_size);
	for (;;) { /* loop structure was elided around the visible body */
		char *buf;
		size_t len = btr_next_buffer(btrn, &buf);

		if (len >= dest_size)
			return;
		PARA_DEBUG_LOG("input size = %zu < %zu = dest\n", len, dest_size);
		if (merge_input(btrn) < 2) /* fewer than two buffers: stuck */
			return;
	}
}
/**
 * Return true if no more input will ever arrive at btrn: the input
 * queue is empty and the node has no parent to produce more.
 */
bool btr_eof(struct btr_node *btrn)
{
	char *buf; /* declaration was missing */
	size_t len = btr_next_buffer(btrn, &buf);

	return (len == 0 && btr_no_parent(btrn));
}
744 void log_tree_recursively(struct btr_node *btrn, int loglevel, int depth)
747 const char spaces[] = " ", *space = spaces + 16 - depth;
751 para_log(loglevel, "%s%s\n", space, btrn->name);
752 FOR_EACH_CHILD(ch, btrn)
753 log_tree_recursively(ch, loglevel, depth + 1);
/** Log the buffer (sub-)tree rooted at btrn at the given log level. */
void btr_log_tree(struct btr_node *btrn, int loglevel)
{
	/* no 'return <void expr>': that is a C constraint violation */
	log_tree_recursively(btrn, loglevel, 0);
}
762 * \return \a root if \a name is \p NULL.
764 struct btr_node *btr_search_node(const char *name, struct btr_node *root)
770 if (!strcmp(root->name, name))
772 FOR_EACH_CHILD(ch, root) {
773 struct btr_node *result = btr_search_node(name, ch);
/** 640K ought to be enough for everybody ;) */
/* Upper bound on bytes pending in any child's input queue before a
 * non-leaf node is throttled, see btr_node_status(). */
#define BTRN_MAX_PENDING (640 * 1024)
/*
 * Decide whether a node should run, given its type and the minimum
 * required input queue size. Returns positive (ready), zero (not ready)
 * or a negative error code -- NOTE(review): several checks of this
 * function are not visible in this chunk.
 */
int btr_node_status(struct btr_node *btrn, size_t min_iqs,
		enum btr_node_type type)
{
	if (type != BTR_NT_LEAF) {
		/* non-leaf nodes need a consumer for their output */
		if (btr_no_children(btrn))
			return -E_BTR_NO_CHILD;
		/* throttle if children have too much unconsumed data */
		if (btr_bytes_pending(btrn) > BTRN_MAX_PENDING)
	if (type != BTR_NT_ROOT) {
		iqs = btr_get_input_queue_size(btrn);
		if (iqs == 0) /* we have a parent, because not eof */
		if (iqs < min_iqs && !btr_no_parent(btrn))
808 void btr_get_node_start(struct btr_node *btrn, struct timeval *tv)