7 #include "buffer_tree.h"
19 enum btr_buffer_flags {
20 /* changes the way the buffer is deallocated */
/*
 * NOTE(review): this extract omits several original lines here (the enum
 * values and the start of struct btr_buffer) -- verify against full source.
 * The two members below belong to struct btr_buffer.
 */
27 /** The number of references to this buffer. */
29 struct btr_pool *pool;
/*
 * A reference to a btr buffer, held by one buffer tree node.
 * NOTE(review): further members (e.g. the consumed count and wrap_count used
 * elsewhere in this file) appear to be omitted from this extract.
 */
32 struct btr_buffer_reference {
33 struct btr_buffer *btrb;
35 /* Each buffer reference belongs to the buffer queue list of some buffer tree node. */
36 struct list_head node;
/* Members of struct btr_node (struct header not visible in this extract). */
42 struct btr_node *parent;
43 /* The position of this btr node in the buffer tree. */
44 struct list_head node;
45 /* The children nodes of this btr node are linked together in a list. */
46 struct list_head children;
47 /* Time of first data transfer. */
50 * The input queue is a list of references to btr buffers. Each item on
51 * the list represents an input buffer which has not been completely
52 * used by this btr node.
54 struct list_head input_queue;
/* Optional command handler, see btr_exec() and btr_exec_up() below. */
55 btr_command_handler execute;
/*
 * Allocate a new buffer pool of \a area_size bytes.
 *
 * Both the read head and the write head start at the beginning of the
 * (empty) pool area; \a name is duplicated for logging purposes.
 */
59 struct btr_pool *btr_pool_new(const char *name, size_t area_size)
61 struct btr_pool *btrp;
63 PARA_INFO_LOG("%s, %zu bytes\n", name, area_size);
64 btrp = para_malloc(sizeof(*btrp));
65 btrp->area_start = para_malloc(area_size);
66 btrp->area_end = btrp->area_start + area_size;
67 btrp->rhead = btrp->area_start;
68 btrp->whead = btrp->area_start;
69 btrp->name = para_strdup(name);
73 /* whead = NULL means area full */
/*
 * Deallocate a buffer pool.
 * NOTE(review): only the free of area_start is visible here; presumably the
 * omitted lines also free btrp->name and btrp itself -- verify.
 */
75 void btr_pool_free(struct btr_pool *btrp)
79 free(btrp->area_start);
/* Return the total size of the pool area in bytes. */
84 size_t btr_pool_size(struct btr_pool *btrp)
86 return btrp->area_end - btrp->area_start;
/*
 * Return the number of bytes currently in use in the pool.
 * NOTE(review): the unconditional return of the full pool size below is
 * presumably guarded by an omitted check for whead == NULL (whead = NULL
 * means area full, per the comment at original line 73) -- confirm.
 */
89 size_t btr_pool_filled(struct btr_pool *btrp)
92 return btr_pool_size(btrp);
93 if (btrp->rhead <= btrp->whead)
94 return btrp->whead - btrp->rhead;
95 return btr_pool_size(btrp) - (btrp->rhead - btrp->whead);
/* Return the number of free bytes in the pool (total size minus filled). */
98 size_t btr_pool_unused(struct btr_pool *btrp)
100 return btr_pool_size(btrp) - btr_pool_filled(btrp);
104 * Return maximal size available for one read. This is
105 * smaller than the value returned by btr_pool_unused().
/* Only the contiguous region up to area_end (or up to rhead) can be handed
 * out in a single piece, hence the two cases below. */
107 size_t btr_pool_available(struct btr_pool *btrp)
111 if (btrp->rhead <= btrp->whead)
112 return btrp->area_end - btrp->whead;
113 return btrp->rhead - btrp->whead;
/*
 * Point *result at the current write head and return how many contiguous
 * bytes may be written there (see btr_pool_available()).
 */
116 size_t btr_pool_get_buffer(struct btr_pool *btrp, char **result)
119 *result = btrp->whead;
120 return btr_pool_available(btrp);
/*
 * Mark \a size bytes at the write head as allocated, advancing the write
 * head and wrapping it to the start of the area when the end is reached.
 * A resulting whead == rhead means the pool is full, encoded as whead = NULL.
 */
123 void btr_pool_allocate(struct btr_pool *btrp, size_t size)
129 //PARA_CRIT_LOG("filled: %zu, alloc %zu\n", btr_pool_filled(btrp), size);
130 assert(size <= btr_pool_available(btrp));
131 end = btrp->whead + size;
132 assert(end <= btrp->area_end);
134 if (end == btrp->area_end) {
135 PARA_DEBUG_LOG("%s: end of pool area reached\n", btrp->name);
136 end = btrp->area_start;
138 if (end == btrp->rhead) {
139 PARA_DEBUG_LOG("btrp buffer full\n");
140 end = NULL; /* buffer full */
143 //PARA_CRIT_LOG("filled: %zu\n", btr_pool_filled(btrp));
/*
 * Release \a size bytes at the read head, advancing it (with wrap-around).
 * When the pool becomes empty, both heads are reset to the area start.
 * NOTE(review): the assignment at original line 158 appears to depend on an
 * omitted condition (likely the whead == NULL "pool full" case) -- confirm.
 */
146 static void btr_pool_deallocate(struct btr_pool *btrp, size_t size)
148 char *end = btrp->rhead + size;
150 //PARA_CRIT_LOG("filled: %zu, dealloc %zu\n", btr_pool_filled(btrp), size);
153 assert(end <= btrp->area_end);
154 assert(size <= btr_pool_filled(btrp));
155 if (end == btrp->area_end)
156 end = btrp->area_start;
158 btrp->whead = btrp->rhead;
160 if (btrp->rhead == btrp->whead)
161 btrp->rhead = btrp->whead = btrp->area_start;
162 //PARA_CRIT_LOG("filled: %zu\n", btr_pool_filled(btrp));
/* Iterate over the children list of a btr node. */
165 #define FOR_EACH_CHILD(_tn, _btrn) list_for_each_entry((_tn), \
166 &((_btrn)->children), node)
/* Same, but safe against removal of the current entry during iteration. */
167 #define FOR_EACH_CHILD_SAFE(_tn, _tmp, _btrn) \
168 list_for_each_entry_safe((_tn), (_tmp), &((_btrn)->children), node)
/* Iterate over the buffer references of a node's input queue. */
170 #define FOR_EACH_BUFFER_REF(_br, _btrn) \
171 list_for_each_entry((_br), &(_btrn)->input_queue, node)
/* Same, but safe against removal of the current entry during iteration. */
172 #define FOR_EACH_BUFFER_REF_SAFE(_br, _tmp, _btrn) \
173 list_for_each_entry_safe((_br), (_tmp), &(_btrn)->input_queue, node)
177 (NULL, NULL): new, isolated node.
178 (NULL, c): new node becomes root, c must be old root
179 (p, NULL): new leaf node
180 (p, c): new internal node, c must be child of p
/*
 * Create a new btr node from the given description.
 *
 * Copies name/handler/context from \a bnd, zeroes the start time, links the
 * node into its parent's children list (when a parent is given), and
 * initializes its own children and input queue lists.
 */
183 struct btr_node *btr_new_node(struct btr_node_description *bnd)
185 struct btr_node *btrn = para_malloc(sizeof(*btrn));
187 btrn->name = para_strdup(bnd->name);
188 btrn->parent = bnd->parent;
189 btrn->execute = bnd->handler;
190 btrn->context = bnd->context;
191 btrn->start.tv_sec = 0;
192 btrn->start.tv_usec = 0;
/* NOTE(review): the two list_add/INIT lines below presumably sit inside an
 * omitted if (bnd->parent) branch, with the root case after it -- confirm. */
194 list_add_tail(&btrn->node, &bnd->parent->children);
195 INIT_LIST_HEAD(&btrn->children);
196 INIT_LIST_HEAD(&btrn->input_queue);
198 PARA_INFO_LOG("added %s as child of %s\n", bnd->name, bnd->parent->name);
200 PARA_INFO_LOG("added %s as btr root\n", bnd->name);
205 * Allocate a new btr buffer.
207 * The freshly allocated buffer will have a zero refcount and will
208 * not be associated with a btr pool.
/* para_calloc() zero-initializes, which yields the zero refcount and the
 * NULL pool pointer promised above. */
210 static struct btr_buffer *new_btrb(char *buf, size_t size)
212 struct btr_buffer *btrb = para_calloc(sizeof(*btrb));
/*
 * Release the memory of a btr buffer; pool buffers are returned to their
 * pool via btr_pool_deallocate().
 */
219 static void dealloc_buffer(struct btr_buffer *btrb)
222 btr_pool_deallocate(btrb->pool, btrb->size);
/* Return the first buffer reference of the node's input queue, or (for an
 * empty queue) whatever the omitted early-exit path returns -- likely NULL. */
227 static struct btr_buffer_reference *get_first_input_br(struct btr_node *btrn)
229 if (list_empty(&btrn->input_queue))
231 return list_first_entry(&btrn->input_queue,
232 struct btr_buffer_reference, node);
236 * Deallocate the reference, release the resources if refcount drops to zero.
238 static void btr_drop_buffer_reference(struct btr_buffer_reference *br)
240 struct btr_buffer *btrb = br->btrb;
242 //PARA_CRIT_LOG("dropping buffer reference %p\n", br);
/* Last reference gone: release the underlying buffer. */
246 if (btrb->refcount == 0) {
247 dealloc_buffer(btrb);
/*
 * Push a buffer down: create one new reference to \a btrb per child of
 * \a btrn and append it to each child's input queue, carrying over the
 * number of bytes already consumed. Children whose start time is unset
 * get it initialized (first data transfer).
 */
252 static void add_btrb_to_children(struct btr_buffer *btrb,
253 struct btr_node *btrn, size_t consumed)
257 if (btrn->start.tv_sec == 0)
259 FOR_EACH_CHILD(ch, btrn) {
260 struct btr_buffer_reference *br = para_calloc(sizeof(*br));
262 br->consumed = consumed;
263 list_add_tail(&br->node, &ch->input_queue);
265 if (ch->start.tv_sec == 0)
/*
 * Feed a (malloc'ed, non-pool) buffer of \a size bytes into the subtree
 * below \a btrn. With no children, the early branch is taken (its body is
 * omitted from this extract -- presumably the buffer is freed).
 */
270 void btr_add_output(char *buf, size_t size, struct btr_node *btrn)
272 struct btr_buffer *btrb;
275 if (list_empty(&btrn->children)) {
279 btrb = new_btrb(buf, size);
280 add_btrb_to_children(btrb, btrn, 0);
/*
 * Like btr_add_output(), but the data lives in the given buffer pool:
 * grab the current write-head buffer, mark \a size bytes allocated, and
 * hand the resulting btr buffer to all children.
 */
283 void btr_add_output_pool(struct btr_pool *btrp, size_t size,
284 struct btr_node *btrn)
286 struct btr_buffer *btrb;
291 if (list_empty(&btrn->children))
293 avail = btr_pool_get_buffer(btrp, &buf);
294 assert(avail >= size);
295 btr_pool_allocate(btrp, size);
296 btrb = new_btrb(buf, size);
298 add_btrb_to_children(btrb, btrn, 0);
/*
 * Copy \a n bytes from \a src into the pool and feed them to the children
 * of \a btrn. Because the pool area is circular, the copy may have to be
 * split in two: the first part fills the contiguous region at the write
 * head, the second part (if any) continues at the wrapped-around head.
 */
301 void btr_copy(const void *src, size_t n, struct btr_pool *btrp,
302 struct btr_node *btrn)
309 assert(n <= btr_pool_unused(btrp));
310 sz = btr_pool_get_buffer(btrp, &buf);
311 copy = PARA_MIN(sz, n);
312 memcpy(buf, src, copy);
313 btr_add_output_pool(btrp, copy, btrn);
/* Second part: the remaining n - copy bytes after the wrap-around. */
316 sz = btr_pool_get_buffer(btrp, &buf);
317 assert(sz >= n - copy);
318 memcpy(buf, src + copy, n - copy);
319 btr_add_output_pool(btrp, n - copy, btrn);
/* Hand one input buffer reference down to all children, then drop it. */
322 static void btr_pushdown_br(struct btr_buffer_reference *br, struct btr_node *btrn)
324 add_btrb_to_children(br->btrb, btrn, br->consumed);
325 btr_drop_buffer_reference(br);
/* Push the node's entire input queue down to its children. */
328 void btr_pushdown(struct btr_node *btrn)
330 struct btr_buffer_reference *br, *tmp;
332 FOR_EACH_BUFFER_REF_SAFE(br, tmp, btrn)
333 btr_pushdown_br(br, btrn);
/*
 * Push only the first input buffer down to the children.
 * Return values are not fully visible in this extract.
 */
336 int btr_pushdown_one(struct btr_node *btrn)
338 struct btr_buffer_reference *br;
340 if (list_empty(&btrn->input_queue))
342 br = list_first_entry(&btrn->input_queue, struct btr_buffer_reference, node);
343 btr_pushdown_br(br, btrn);
347 /* Return true if this node has no children. */
348 bool btr_no_children(struct btr_node *btrn)
350 return list_empty(&btrn->children);
/* Return true if this node has no parent, i.e. it is a root node. */
353 bool btr_no_parent(struct btr_node *btrn)
355 return !btrn->parent;
/*
 * In-place modification of a parent's output is safe only when this node
 * is the parent's sole child (no sibling shares the buffers).
 */
358 bool btr_inplace_ok(struct btr_node *btrn)
362 return list_is_singular(&btrn->parent->children);
/* Bytes of this buffer reference not yet consumed by the owning node. */
365 static inline size_t br_available_bytes(struct btr_buffer_reference *br)
367 return br->btrb->size - br->consumed;
/*
 * Point *buf at the unconsumed part of the referenced buffer and return
 * the number of unconsumed bytes.
 */
370 size_t btr_get_buffer_by_reference(struct btr_buffer_reference *br, char **buf)
373 *buf = br->btrb->buf + br->consumed;
374 return br_available_bytes(br);
378 * \return zero if the input buffer queue is empty.
380 size_t btr_next_buffer(struct btr_node *btrn, char **bufp)
382 struct btr_buffer_reference *br;
383 char *buf, *result = NULL;
/* Walk the input queue, attempting to merge physically adjacent pool
 * buffers into one contiguous result (several lines of this loop are
 * omitted from this extract). */
386 FOR_EACH_BUFFER_REF(br, btrn) {
387 sz = btr_get_buffer_by_reference(br, &buf);
/* Adjacent-in-pool check: merging is only possible when the next
 * buffer starts exactly where the accumulated result ends. */
397 if (result + rv != buf) {
398 PARA_DEBUG_LOG("%s: pool merge impossible: %p != %p\n",
399 btrn->name, result + rv, buf);
402 // PARA_CRIT_LOG("%s: inplace merge (%zu, %zu)->%zu\n", btrn->name,
404 // PARA_CRIT_LOG("%s: inplace merge %p (%zu)\n", btrn->name,
/*
 * Mark \a numbytes bytes of the node's input queue as used, dropping
 * buffer references that become fully consumed. Handles both the plain
 * case and the wrap-buffer case (see below).
 */
413 void btr_consume(struct btr_node *btrn, size_t numbytes)
415 struct btr_buffer_reference *br, *tmp;
420 br = get_first_input_br(btrn);
423 //PARA_CRIT_LOG("wrap count: %zu\n", br->wrap_count);
424 if (br->wrap_count == 0) {
426 * No wrap buffer. Drop buffer references whose buffer
427 * has been fully used. */
428 FOR_EACH_BUFFER_REF_SAFE(br, tmp, btrn) {
429 if (br->consumed + numbytes <= br->btrb->size) {
430 br->consumed += numbytes;
431 if (br->consumed == br->btrb->size)
432 btr_drop_buffer_reference(br);
/* This reference is exhausted; account for its remainder and
 * continue consuming from the next one. */
435 numbytes -= br->btrb->size - br->consumed;
436 btr_drop_buffer_reference(br);
442 We have a wrap buffer, consume from it. If in total,
443 i.e. including previous calls to btr_consume(), less than
444 wrap_count has been consumed, there's nothing more we can do.
446 Otherwise we drop the wrap buffer and consume from subsequent
447 buffers of the input queue the correct amount of bytes. This
448 is the total number of bytes that have been consumed from the
451 PARA_DEBUG_LOG("consuming %zu/%zu bytes from wrap buffer\n", numbytes,
452 br_available_bytes(br));
454 assert(numbytes <= br_available_bytes(br));
455 if (br->consumed + numbytes < br->wrap_count) {
456 br->consumed += numbytes;
459 PARA_DEBUG_LOG("dropping wrap buffer (%zu bytes)\n", br->btrb->size);
460 /* get rid of the wrap buffer */
461 sz = br->consumed + numbytes;
462 btr_drop_buffer_reference(br);
/* Tail call: re-consume the total from the underlying queue buffers. */
463 return btr_consume(btrn, sz);
/* Drop every buffer reference of the node's input queue. */
466 static void flush_input_queue(struct btr_node *btrn)
468 struct btr_buffer_reference *br, *tmp;
469 FOR_EACH_BUFFER_REF_SAFE(br, tmp, btrn)
470 btr_drop_buffer_reference(br);
/* Free a btr node (body omitted from this extract -- verify). */
473 void btr_free_node(struct btr_node *btrn)
/*
 * Detach a node from the buffer tree: flush its input queue, unlink it
 * from its parent's children list, and (per the partially visible loop)
 * visit each child -- presumably clearing their parent pointers.
 */
481 void btr_remove_node(struct btr_node *btrn)
487 PARA_NOTICE_LOG("removing btr node %s from buffer tree\n", btrn->name);
488 FOR_EACH_CHILD(ch, btrn)
490 flush_input_queue(btrn);
492 list_del(&btrn->node);
/*
 * Return the number of unconsumed input bytes of the node.
 *
 * Bytes already consumed from a wrap buffer are also present in the
 * underlying queue buffers, so they would be counted twice; the
 * wrap_consumed subtraction at the end corrects for this.
 */
495 size_t btr_get_input_queue_size(struct btr_node *btrn)
497 struct btr_buffer_reference *br;
498 size_t size = 0, wrap_consumed = 0;
500 FOR_EACH_BUFFER_REF(br, btrn) {
501 if (br->wrap_count != 0) {
502 wrap_consumed = br->consumed;
505 size += br_available_bytes(br);
507 assert(wrap_consumed <= size);
508 size -= wrap_consumed;
/*
 * Remove a node from the tree while keeping its subtree connected:
 * every child is re-parented to the removed node's parent and moved
 * onto that parent's children list.
 */
513 void btr_splice_out_node(struct btr_node *btrn)
514 struct btr_node *ch, *tmp;
517 PARA_NOTICE_LOG("splicing out %s\n", btrn->name);
520 list_del(&btrn->node);
521 FOR_EACH_CHILD_SAFE(ch, tmp, btrn) {
522 PARA_INFO_LOG("parent(%s): %s\n", ch->name,
523 btrn->parent? btrn->parent->name : "NULL");
524 ch->parent = btrn->parent;
526 list_move(&ch->node, &btrn->parent->children);
528 assert(list_empty(&btrn->children));
532 * Return the size of the largest input queue.
534 * Iterates over all children of the given node.
536 size_t btr_bytes_pending(struct btr_node *btrn)
541 FOR_EACH_CHILD(ch, btrn) {
542 size_t size = btr_get_input_queue_size(ch);
543 max_size = PARA_MAX(max_size, size);
/*
 * Execute a command on this node via its command handler.
 *
 * Returns -EINVAL / -ENOTSUP (as para errors) on the partially visible
 * guard paths, otherwise the handler's return value.
 */
548 int btr_exec(struct btr_node *btrn, const char *command, char **value_result)
551 return -ERRNO_TO_PARA_ERROR(EINVAL);
553 return -ERRNO_TO_PARA_ERROR(ENOTSUP);
554 return btrn->execute(btrn, command, value_result);
/*
 * Execute a command, trying each ancestor in turn.
 *
 * Walks up the tree from \a btrn; ancestors without a handler, or whose
 * handler returns -ENOTSUP, are skipped. Returns -ENOTSUP when no
 * ancestor handles the command.
 */
557 int btr_exec_up(struct btr_node *btrn, const char *command, char **value_result)
561 for (; btrn; btrn = btrn->parent) {
562 struct btr_node *parent = btrn->parent;
564 return -ERRNO_TO_PARA_ERROR(ENOTSUP);
565 if (!parent->execute)
567 PARA_INFO_LOG("parent: %s, cmd: %s\n", parent->name, command);
568 ret = parent->execute(parent, command, value_result);
569 if (ret == -ERRNO_TO_PARA_ERROR(ENOTSUP))
573 if (value_result && *value_result)
574 PARA_NOTICE_LOG("%s(%s): %s\n", command, parent->name,
578 return -ERRNO_TO_PARA_ERROR(ENOTSUP);
/* Return the opaque context pointer stored in the node at creation time. */
581 void *btr_context(struct btr_node *btrn)
583 return btrn->context;
/*
 * Decide whether the node's input needs the buffer-pool merge path.
 * An existing wrap buffer (wrap_count != 0) is one trigger; other
 * conditions are omitted from this extract.
 */
586 static bool need_buffer_pool_merge(struct btr_node *btrn)
588 struct btr_buffer_reference *br = get_first_input_br(btrn);
592 if (br->wrap_count != 0)
/*
 * Ensure at least \a dest_size contiguous input bytes for a pool-backed
 * input queue. If the data wraps around the pool boundary (buf1 at the
 * end of the area, buf2 at the start), a separate "wrap buffer" is
 * malloc'ed that copies both parts back to back. An existing wrap buffer
 * that is too small is grown with para_realloc().
 */
599 static void merge_input_pool(struct btr_node *btrn, size_t dest_size)
601 struct btr_buffer_reference *br, *wbr = NULL;
602 int num_refs; /* including wrap buffer */
603 char *buf, *buf1 = NULL, *buf2 = NULL;
604 size_t sz, sz1 = 0, sz2 = 0, wsz;
606 br = get_first_input_br(btrn);
/* Fast path: first buffer already has enough contiguous bytes. */
607 if (!br || br_available_bytes(br) >= dest_size)
610 FOR_EACH_BUFFER_REF(br, btrn) {
612 sz = btr_get_buffer_by_reference(br, &buf);
615 if (br->wrap_count != 0) {
/* A wrap buffer, if present, is always the first reference. */
617 assert(num_refs == 1);
/* Physically adjacent to buf1: extend it instead of starting buf2. */
628 if (buf1 + sz1 == buf) {
637 assert(buf2 + sz2 == buf);
640 if (sz1 + sz2 >= dest_size)
643 if (!buf2) /* nothing to do */
645 assert(buf1 && sz2 > 0);
647 * If the second buffer is large, we only take the first part of it to
648 * avoid having to memcpy() huge buffers.
650 sz2 = PARA_MIN(sz2, (size_t)(64 * 1024));
652 /* Make a new wrap buffer combining buf1 and buf2. */
654 buf = para_malloc(sz);
655 PARA_DEBUG_LOG("merging input buffers: (%p:%zu, %p:%zu) -> %p:%zu\n",
656 buf1, sz1, buf2, sz2, buf, sz);
657 memcpy(buf, buf1, sz1);
658 memcpy(buf + sz1, buf2, sz2);
659 br = para_calloc(sizeof(*br));
660 br->btrb = new_btrb(buf, sz);
661 br->btrb->refcount = 1;
663 /* This is a wrap buffer */
664 br->wrap_count = sz1;
665 para_list_add(&br->node, &btrn->input_queue);
668 PARA_DEBUG_LOG("increasing wrap buffer, sz1: %zu, sz2: %zu\n", sz1, sz2);
670 * We already have a wrap buffer, but it is too small. It might be
673 wsz = br_available_bytes(wbr);
674 if (wbr->wrap_count == sz1 && wbr->btrb->size >= sz1 + sz2) /* nothing we can do about it */
676 sz = sz1 + sz2 - wbr->btrb->size; /* amount of new data */
677 wbr->btrb->size += sz;
678 wbr->btrb->buf = para_realloc(wbr->btrb->buf, wbr->btrb->size);
679 /* copy the new data to the end of the reallocated buffer */
681 memcpy(wbr->btrb->buf + wbr->btrb->size - sz, buf2 + sz2 - sz, sz);
685 * Merge the first two input buffers into one.
687 * This is a quite expensive operation.
689 * \return The number of buffers that have been available (zero, one or two).
691 static int merge_input(struct btr_node *btrn)
693 struct btr_buffer_reference *brs[2], *br;
698 if (list_empty(&btrn->input_queue))
700 if (list_is_singular(&btrn->input_queue))
703 /* get references to the first two buffers */
704 FOR_EACH_BUFFER_REF(br, btrn) {
706 szs[i] = btr_get_buffer_by_reference(brs[i], bufs + i);
711 /* make a new btrb that combines the two buffers and a br to it. */
712 sz = szs[0] + szs[1];
713 buf = para_malloc(sz);
714 PARA_DEBUG_LOG("%s: memory merging input buffers: (%zu, %zu) -> %zu\n",
715 btrn->name, szs[0], szs[1], sz);
716 memcpy(buf, bufs[0], szs[0]);
717 memcpy(buf + szs[0], bufs[1], szs[1]);
719 br = para_calloc(sizeof(*br));
720 br->btrb = new_btrb(buf, sz);
/* The new combined buffer is referenced once (by the new br). */
721 br->btrb->refcount = 1;
723 /* replace the first two refs by the new one */
724 btr_drop_buffer_reference(brs[0]);
725 btr_drop_buffer_reference(brs[1]);
726 para_list_add(&br->node, &btrn->input_queue);
/*
 * Merge input buffers until at least \a dest_size contiguous bytes are
 * available (or no further merging is possible). Pool-backed input uses
 * merge_input_pool(); otherwise pairs of buffers are combined via
 * merge_input() until fewer than two buffers remain.
 */
730 void btr_merge(struct btr_node *btrn, size_t dest_size)
732 if (need_buffer_pool_merge(btrn))
733 return merge_input_pool(btrn, dest_size);
736 size_t len = btr_next_buffer(btrn, &buf);
737 if (len >= dest_size)
739 PARA_DEBUG_LOG("input size = %zu < %zu = dest\n", len, dest_size);
740 if (merge_input(btrn) < 2)
/* End of file: no buffered input and no parent to produce more. */
745 bool btr_eof(struct btr_node *btrn)
748 size_t len = btr_next_buffer(btrn, &buf);
750 return (len == 0 && btr_no_parent(btrn));
/*
 * Log this node's name indented by \a depth (via a sliding window into a
 * 16-space string), then recurse into each child one level deeper.
 */
753 void log_tree_recursively(struct btr_node *btrn, int loglevel, int depth)
756 const char spaces[] = " ", *space = spaces + 16 - depth;
760 para_log(loglevel, "%s%s\n", space, btrn->name);
761 FOR_EACH_CHILD(ch, btrn)
762 log_tree_recursively(ch, loglevel, depth + 1);
/* Log the whole subtree rooted at \a btrn, starting at depth zero. */
765 void btr_log_tree(struct btr_node *btrn, int loglevel)
767 return log_tree_recursively(btrn, loglevel, 0);
771 * \return \a root if \a name is \p NULL.
/* Depth-first search by node name, starting at (and including) \a root. */
773 struct btr_node *btr_search_node(const char *name, struct btr_node *root)
779 if (!strcmp(root->name, name))
781 FOR_EACH_CHILD(ch, root) {
782 struct btr_node *result = btr_search_node(name, ch);
789 /** 640K ought to be enough for everybody ;) */
/* Upper bound on a child's pending input before btr_node_status() throttles. */
790 #define BTRN_MAX_PENDING (640 * 1024)
/*
 * Determine whether a node may produce or consume data.
 *
 * Non-leaf nodes need at least one child and must not exceed
 * BTRN_MAX_PENDING bytes queued in any child; non-root nodes are checked
 * against their input queue size and the minimum \a min_iqs. Several
 * return paths are omitted from this extract.
 */
792 int btr_node_status(struct btr_node *btrn, size_t min_iqs,
793 enum btr_node_type type)
798 if (type != BTR_NT_LEAF) {
799 if (btr_no_children(btrn))
800 return -E_BTR_NO_CHILD;
801 if (btr_bytes_pending(btrn) > BTRN_MAX_PENDING)
804 if (type != BTR_NT_ROOT) {
807 iqs = btr_get_input_queue_size(btrn);
808 if (iqs == 0) /* we have a parent, because not eof */
810 if (iqs < min_iqs && !btr_no_parent(btrn))
816 void btr_get_node_start(struct btr_node *btrn, struct timeval *tv)