7 #include "buffer_tree.h"
/*
 * NOTE(review): fragmentary listing — several lines of the enum and of
 * struct btr_buffer are elided here (flag values, buf/size/refcount
 * fields). Do not assume the full layout from this view.
 */
19 enum btr_buffer_flags {
20 /* changes the way the buffer is deallocated */
27 /** The number of references to this buffer. */
/* Non-NULL iff the buffer's memory lives inside a btr_pool area. */
29 struct btr_pool *pool;
/*
 * A reference to a btr buffer, owned by exactly one buffer tree node's
 * input queue. NOTE(review): consumed/wrap_count fields are elided in
 * this listing — they are used by btr_consume()/merge code below.
 */
32 struct btr_buffer_reference {
33 struct btr_buffer *btrb;
35 /* Each buffer reference belongs to the buffer queue list of some buffer tree node. */
36 struct list_head node;
/*
 * NOTE(review): struct btr_node fragment — name/context/start fields are
 * elided from this listing but referenced by the functions below.
 */
42 struct btr_node *parent;
43 /* The position of this btr node in the buffer tree. */
44 struct list_head node;
45 /* The children nodes of this btr node are linked together in a list. */
46 struct list_head children;
47 /* Time of first data transfer. */
50 * The input queue is a list of references to btr buffers. Each item on
51 * the list represents an input buffer which has not been completely
52 * used by this btr node.
54 struct list_head input_queue;
/* Optional command handler; NULL means commands are not supported. */
55 btr_command_handler execute;
/*
 * Allocate a new buffer pool of the given size and name.
 * NOTE(review): the closing lines (presumably "return btrp;" and "}")
 * are elided from this listing — confirm against the full source.
 */
59 struct btr_pool *btr_pool_new(const char *name, size_t area_size)
61 struct btr_pool *btrp;
63 PARA_INFO_LOG("%s, %zu bytes\n", name, area_size);
64 btrp = para_malloc(sizeof(*btrp));
65 btrp->area_start = para_malloc(area_size);
66 btrp->area_end = btrp->area_start + area_size;
/* read head == write head == area start: the pool starts out empty */
67 btrp->rhead = btrp->area_start;
68 btrp->whead = btrp->area_start;
69 btrp->name = para_strdup(name);
73 /* whead = NULL means area full */
/*
 * Deallocate a buffer pool and its area.
 * NOTE(review): elided lines likely free btrp->name and btrp itself,
 * and may guard against a NULL btrp — confirm against the full source.
 */
75 void btr_pool_free(struct btr_pool *btrp)
79 free(btrp->area_start);
84 size_t btr_pool_size(struct btr_pool *btrp)
86 return btrp->area_end - btrp->area_start;
89 size_t btr_pool_filled(struct btr_pool *btrp)
92 return btr_pool_size(btrp);
93 if (btrp->rhead <= btrp->whead)
94 return btrp->whead - btrp->rhead;
95 return btr_pool_size(btrp) - (btrp->rhead - btrp->whead);
98 size_t btr_pool_unused(struct btr_pool *btrp)
100 return btr_pool_size(btrp) - btr_pool_filled(btrp);
103 size_t btr_pool_available(struct btr_pool *btrp)
107 if (btrp->rhead <= btrp->whead)
108 return btrp->area_end - btrp->whead;
109 return btrp->rhead - btrp->whead;
112 size_t btr_pool_get_buffer(struct btr_pool *btrp, char **result)
115 *result = btrp->whead;
116 return btr_pool_available(btrp);
/*
 * Mark `size` bytes starting at the write head as allocated, advancing
 * (and possibly wrapping) the write head.
 * NOTE(review): elided lines include the declaration of `end`, a likely
 * early return for size == 0, and the final `btrp->whead = end;`
 * assignment — confirm against the full source.
 */
119 void btr_pool_allocate(struct btr_pool *btrp, size_t size)
125 //PARA_CRIT_LOG("filled: %zu, alloc %zu\n", btr_pool_filled(btrp), size);
126 assert(size <= btr_pool_available(btrp));
127 end = btrp->whead + size;
128 assert(end <= btrp->area_end);
/* exactly hitting the area end wraps the write head to the start */
130 if (end == btrp->area_end) {
131 PARA_DEBUG_LOG("%s: end of pool area reached\n", btrp->name);
132 end = btrp->area_start;
/* write head catching up with the read head means the pool is full */
134 if (end == btrp->rhead) {
135 PARA_DEBUG_LOG("btrp buffer full\n");
136 end = NULL; /* buffer full */
139 //PARA_CRIT_LOG("filled: %zu\n", btr_pool_filled(btrp));
/*
 * Return `size` bytes at the read head to the pool, advancing (and
 * possibly wrapping) the read head.
 * NOTE(review): elided lines include a probable size == 0 early return,
 * the `btrp->rhead = end;` assignment, and the guard that restores
 * whead from NULL once space is freed — confirm against the full source.
 */
142 static void btr_pool_deallocate(struct btr_pool *btrp, size_t size)
144 char *end = btrp->rhead + size;
146 //PARA_CRIT_LOG("filled: %zu, dealloc %zu\n", btr_pool_filled(btrp), size);
149 assert(end <= btrp->area_end);
150 assert(size <= btr_pool_filled(btrp));
151 if (end == btrp->area_end)
152 end = btrp->area_start;
/* pool was full (whead == NULL); freed space becomes the new whead */
154 btrp->whead = btrp->rhead;
/* empty pool: reset both heads to the area start */
156 if (btrp->rhead == btrp->whead)
157 btrp->rhead = btrp->whead = btrp->area_start;
158 //PARA_CRIT_LOG("filled: %zu\n", btr_pool_filled(btrp));
/* Iterate over all children of a btr node (non-removal variant). */
161 #define FOR_EACH_CHILD(_tn, _btrn) list_for_each_entry((_tn), \
162 &((_btrn)->children), node)
/* Child iteration that is safe against removing the current entry. */
163 #define FOR_EACH_CHILD_SAFE(_tn, _tmp, _btrn) \
164 list_for_each_entry_safe((_tn), (_tmp), &((_btrn)->children), node)
/* Iterate over all buffer references in a node's input queue. */
166 #define FOR_EACH_BUFFER_REF(_br, _btrn) \
167 list_for_each_entry((_br), &(_btrn)->input_queue, node)
/* Input-queue iteration that is safe against dropping the current ref. */
168 #define FOR_EACH_BUFFER_REF_SAFE(_br, _tmp, _btrn) \
169 list_for_each_entry_safe((_br), (_tmp), &(_btrn)->input_queue, node)
/*
 * Create a new buffer tree node and link it below `parent` (or make it
 * a root node when parent is NULL).
 * NOTE(review): the `if (parent)` / `else` branching around the
 * list_add_tail() and the two log statements, plus the final
 * `return btrn;`, are elided from this listing — confirm against the
 * full source before editing.
 */
171 struct btr_node *btr_new_node(const char *name, struct btr_node *parent,
172 btr_command_handler handler, void *context)
174 struct btr_node *btrn = para_malloc(sizeof(*btrn));
176 btrn->name = para_strdup(name);
177 btrn->parent = parent;
178 btrn->execute = handler;
179 btrn->context = context;
/* start == 0 marks "no data transferred yet" (see add_btrb_to_children) */
180 btrn->start.tv_sec = 0;
181 btrn->start.tv_usec = 0;
183 list_add_tail(&btrn->node, &parent->children);
184 INIT_LIST_HEAD(&btrn->children);
185 INIT_LIST_HEAD(&btrn->input_queue);
187 PARA_INFO_LOG("added %s as child of %s\n", name, parent->name);
189 PARA_INFO_LOG("added %s as btr root\n", name);
194 * Allocate a new btr buffer.
196 * The freshly allocated buffer will have a zero refcount and will
197 * not be associated with a btr pool.
/*
 * NOTE(review): the assignments of buf/size to the new btrb and the
 * return statement are elided from this listing.
 */
199 static struct btr_buffer *new_btrb(char *buf, size_t size)
201 struct btr_buffer *btrb = para_calloc(sizeof(*btrb));
/*
 * Release the memory of a buffer: give it back to its pool if it has
 * one. NOTE(review): the `if (btrb->pool)` test and the else-branch
 * (presumably free(btrb->buf)) are elided from this listing.
 */
208 static void dealloc_buffer(struct btr_buffer *btrb)
211 btr_pool_deallocate(btrb->pool, btrb->size);
216 static struct btr_buffer_reference *get_first_input_br(struct btr_node *btrn)
218 if (list_empty(&btrn->input_queue))
220 return list_first_entry(&btrn->input_queue,
221 struct btr_buffer_reference, node);
225 * Deallocate the reference, release the resources if refcount drops to zero.
/*
 * NOTE(review): the refcount decrement, the list_del() of the reference
 * from its queue, free(br), and the free of the buffer itself inside
 * the refcount == 0 branch are elided from this listing.
 */
227 static void btr_drop_buffer_reference(struct btr_buffer_reference *br)
229 struct btr_buffer *btrb = br->btrb;
231 //PARA_CRIT_LOG("dropping buffer reference %p\n", br);
235 if (btrb->refcount == 0) {
236 dealloc_buffer(btrb);
/*
 * Push one buffer down to every child of `btrn`: each child gets a new
 * reference to `btrb` with the given initial `consumed` count, and the
 * buffer's refcount grows by one per child.
 * NOTE(review): the declaration of `ch`, the refcount increment,
 * `br->btrb = btrb;`, and the statements setting the start time
 * (presumably via the parent's/current time) are elided here.
 */
241 static void add_btrb_to_children(struct btr_buffer *btrb,
242 struct btr_node *btrn, size_t consumed)
/* start.tv_sec == 0 means no data transferred through this node yet */
246 if (btrn->start.tv_sec == 0)
248 FOR_EACH_CHILD(ch, btrn) {
249 struct btr_buffer_reference *br = para_calloc(sizeof(*br));
251 br->consumed = consumed;
252 list_add_tail(&br->node, &ch->input_queue);
254 if (ch->start.tv_sec == 0)
/*
 * Hand a malloc'ed buffer to the children of `btrn`. Ownership of
 * `buf` passes to the buffer tree.
 * NOTE(review): the body of the empty-children branch (presumably
 * free(buf) and return, so the buffer is not leaked) is elided from
 * this listing — confirm against the full source.
 */
259 void btr_add_output(char *buf, size_t size, struct btr_node *btrn)
261 struct btr_buffer *btrb;
264 if (list_empty(&btrn->children)) {
268 btrb = new_btrb(buf, size);
269 add_btrb_to_children(btrb, btrn, 0);
/*
 * Feed `size` bytes at the pool's write head to the children of `btrn`.
 * The pool region is allocated here and associated with the new buffer.
 * NOTE(review): local declarations (`buf`, `avail`), a probable
 * size == 0 early return, and the `btrb->pool = btrp;` association are
 * elided from this listing.
 */
272 void btr_add_output_pool(struct btr_pool *btrp, size_t size,
273 struct btr_node *btrn)
275 struct btr_buffer *btrb;
/* no children: nothing consumes the data, so do not allocate */
280 if (list_empty(&btrn->children))
282 avail = btr_pool_get_buffer(btrp, &buf);
283 assert(avail >= size);
284 btr_pool_allocate(btrp, size);
285 btrb = new_btrb(buf, size);
287 add_btrb_to_children(btrb, btrn, 0);
/*
 * Copy `n` bytes from `src` into the pool and feed them to the
 * children of `btrn`. Because the pool is a ring buffer, the copy may
 * have to be split into two chunks when it straddles the area end.
 * NOTE(review): local declarations and the early returns (n == 0, and
 * copy == n after the first chunk) are elided from this listing.
 */
290 void btr_copy(const void *src, size_t n, struct btr_pool *btrp,
291 struct btr_node *btrn)
298 assert(n <= btr_pool_unused(btrp));
/* first chunk: as much as fits contiguously at the write head */
299 sz = btr_pool_get_buffer(btrp, &buf);
300 copy = PARA_MIN(sz, n);
301 memcpy(buf, src, copy);
302 btr_add_output_pool(btrp, copy, btrn);
/* second chunk: remainder after the write head wrapped to the start */
305 sz = btr_pool_get_buffer(btrp, &buf);
306 assert(sz >= n - copy);
307 memcpy(buf, src + copy, n - copy);
308 btr_add_output_pool(btrp, n - copy, btrn);
311 static void btr_pushdown_br(struct btr_buffer_reference *br, struct btr_node *btrn)
313 add_btrb_to_children(br->btrb, btrn, br->consumed);
314 btr_drop_buffer_reference(br);
317 void btr_pushdown(struct btr_node *btrn)
319 struct btr_buffer_reference *br, *tmp;
321 FOR_EACH_BUFFER_REF_SAFE(br, tmp, btrn)
322 btr_pushdown_br(br, btrn);
325 int btr_pushdown_one(struct btr_node *btrn)
327 struct btr_buffer_reference *br;
329 if (list_empty(&btrn->input_queue))
331 br = list_first_entry(&btrn->input_queue, struct btr_buffer_reference, node);
332 btr_pushdown_br(br, btrn);
336 /* Return true if this node has no children. */
337 bool btr_no_children(struct btr_node *btrn)
339 return list_empty(&btrn->children);
342 bool btr_no_parent(struct btr_node *btrn)
344 return !btrn->parent;
347 bool btr_inplace_ok(struct btr_node *btrn)
351 return list_is_singular(&btrn->parent->children);
354 static inline size_t br_available_bytes(struct btr_buffer_reference *br)
356 return br->btrb->size - br->consumed;
359 size_t btr_get_buffer_by_reference(struct btr_buffer_reference *br, char **buf)
362 *buf = br->btrb->buf + br->consumed;
363 return br_available_bytes(br);
367 * \return zero if the input buffer queue is empty.
/*
 * Return (a pointer to) the next chunk of input data for this node.
 * NOTE(review): most of this function's body is elided from this
 * listing — in particular the handling of the first reference, the
 * accumulation variable `rv`, the merge of adjacent pool buffers, and
 * the final return. Only the non-adjacent "pool merge impossible"
 * bailout is visible. Confirm against the full source before editing.
 */
369 size_t btr_next_buffer(struct btr_node *btrn, char **bufp)
371 struct btr_buffer_reference *br;
372 char *buf, *result = NULL;
375 FOR_EACH_BUFFER_REF(br, btrn) {
376 sz = btr_get_buffer_by_reference(br, &buf);
/* consecutive pool buffers can be merged only if physically adjacent */
386 if (result + rv != buf) {
387 PARA_DEBUG_LOG("%s: pool merge impossible: %p != %p\n",
388 btrn->name, result + rv, buf);
391 // PARA_CRIT_LOG("%s: inplace merge (%zu, %zu)->%zu\n", btrn->name,
393 // PARA_CRIT_LOG("%s: inplace merge %p (%zu)\n", btrn->name,
/*
 * Mark `numbytes` of this node's input as consumed, dropping any
 * references whose buffers are thereby fully used.
 * NOTE(review): several lines are elided from this listing (local
 * declarations, a numbytes == 0 early return, loop exits). The
 * visible logic splits into two cases on br->wrap_count.
 */
402 void btr_consume(struct btr_node *btrn, size_t numbytes)
404 struct btr_buffer_reference *br, *tmp;
409 br = get_first_input_br(btrn);
412 //PARA_CRIT_LOG("wrap count: %zu\n", br->wrap_count);
413 if (br->wrap_count == 0) {
415 * No wrap buffer. Drop buffer references whose buffer
416 * has been fully used. */
417 FOR_EACH_BUFFER_REF_SAFE(br, tmp, btrn) {
/* this reference covers the rest of numbytes: consume and stop */
418 if (br->consumed + numbytes <= br->btrb->size) {
419 br->consumed += numbytes;
420 if (br->consumed == br->btrb->size)
421 btr_drop_buffer_reference(br);
/* reference fully used: account for its remainder and drop it */
424 numbytes -= br->btrb->size - br->consumed;
425 btr_drop_buffer_reference(br);
431 We have a wrap buffer, consume from it. If in total,
432 i.e. including previous calls to brt_consume(), less than
433 wrap_count has been consumed, there's nothing more we can do.
435 Otherwise we drop the wrap buffer and consume from subsequent
436 buffers of the input queue the correct amount of bytes. This
437 is the total number of bytes that have been consumed from the
440 PARA_DEBUG_LOG("consuming %zu/%zu bytes from wrap buffer\n", numbytes,
441 br_available_bytes(br));
443 assert(numbytes <= br_available_bytes(br));
444 if (br->consumed + numbytes < br->wrap_count) {
445 br->consumed += numbytes;
448 PARA_DEBUG_LOG("dropping wrap buffer (%zu bytes)\n", br->btrb->size);
449 /* get rid of the wrap buffer */
450 sz = br->consumed + numbytes;
451 btr_drop_buffer_reference(br);
/* re-run consumption against the underlying queue buffers */
452 return btr_consume(btrn, sz);
455 static void flush_input_queue(struct btr_node *btrn)
457 struct btr_buffer_reference *br, *tmp;
458 FOR_EACH_BUFFER_REF_SAFE(br, tmp, btrn)
459 btr_drop_buffer_reference(br);
/*
 * Free the resources of a buffer tree node.
 * NOTE(review): the entire body (lines 463-468 of the original) is
 * elided from this listing — likely frees name and the node itself.
 */
462 void btr_free_node(struct btr_node *btrn)
/*
 * Detach a node from the buffer tree: orphan its children, flush its
 * input queue and unlink it from its parent's child list.
 * NOTE(review): the declaration of `ch`, the statement inside
 * FOR_EACH_CHILD (presumably ch->parent = NULL;), and a probable
 * `if (btrn->parent)` guard before list_del are elided here.
 */
470 void btr_remove_node(struct btr_node *btrn)
476 PARA_NOTICE_LOG("removing btr node %s from buffer tree\n", btrn->name);
477 FOR_EACH_CHILD(ch, btrn)
479 flush_input_queue(btrn);
481 list_del(&btrn->node);
484 size_t btr_get_input_queue_size(struct btr_node *btrn)
486 struct btr_buffer_reference *br;
489 FOR_EACH_BUFFER_REF(br, btrn) {
490 //PARA_CRIT_LOG("size: %zu\n", size);
491 size += br_available_bytes(br);
/*
 * Remove a node from the tree while reconnecting its children to its
 * parent, preserving the data path.
 * NOTE(review): elided lines include asserts on btrn, a probable
 * `if (btrn->parent)` guard around list_del/list_move, and the final
 * btr_free_node call — confirm against the full source.
 */
496 void btr_splice_out_node(struct btr_node *btrn)
498 struct btr_node *ch, *tmp;
501 PARA_NOTICE_LOG("splicing out %s\n", btrn->name);
504 list_del(&btrn->node);
505 FOR_EACH_CHILD_SAFE(ch, tmp, btrn) {
506 PARA_INFO_LOG("parent(%s): %s\n", ch->name,
507 btrn->parent? btrn->parent->name : "NULL");
/* reattach each child one level up */
508 ch->parent = btrn->parent;
510 list_move(&ch->node, &btrn->parent->children);
512 assert(list_empty(&btrn->children));
516 * Return the size of the largest input queue.
518 * Iterates over all children of the given node.
520 size_t btr_bytes_pending(struct btr_node *btrn)
525 FOR_EACH_CHILD(ch, btrn) {
526 size_t size = btr_get_input_queue_size(ch);
527 max_size = PARA_MAX(max_size, size);
532 int btr_exec(struct btr_node *btrn, const char *command, char **value_result)
535 return -ERRNO_TO_PARA_ERROR(EINVAL);
537 return -ERRNO_TO_PARA_ERROR(ENOTSUP);
538 return btrn->execute(btrn, command, value_result);
/*
 * Walk up the tree from `btrn` and execute `command` on the first
 * ancestor whose handler supports it.
 * NOTE(review): elided lines include the declaration of `ret`, a
 * `if (!parent)` test paired with the ENOTSUP return at line 548, the
 * handling of non-ENOTSUP return values (presumably return ret), and
 * the argument printing *value_result in the NOTICE log.
 */
541 int btr_exec_up(struct btr_node *btrn, const char *command, char **value_result)
545 for (; btrn; btrn = btrn->parent) {
546 struct btr_node *parent = btrn->parent;
548 return -ERRNO_TO_PARA_ERROR(ENOTSUP);
/* ancestor without a handler: keep walking up */
549 if (!parent->execute)
551 PARA_INFO_LOG("parent: %s, cmd: %s\n", parent->name, command);
552 ret = parent->execute(parent, command, value_result);
/* this ancestor does not understand the command: try the next one */
553 if (ret == -ERRNO_TO_PARA_ERROR(ENOTSUP))
557 if (value_result && *value_result)
558 PARA_NOTICE_LOG("%s(%s): %s\n", command, parent->name,
/* no ancestor handled the command */
562 return -ERRNO_TO_PARA_ERROR(ENOTSUP);
565 void *btr_context(struct btr_node *btrn)
567 return btrn->context;
570 static bool need_buffer_pool_merge(struct btr_node *btrn)
572 struct btr_buffer_reference *br = get_first_input_br(btrn);
576 if (br->wrap_count != 0)
/*
 * Merge pool-backed input buffers of `btrn` until at least `dest_size`
 * contiguous bytes are available, creating or growing a "wrap buffer"
 * (a malloc'ed copy spanning the ring-buffer wrap point) as needed.
 * NOTE(review): this function is heavily elided in the listing —
 * loop-control, the assignments of buf1/sz1/buf2/sz2 and wbr, several
 * early returns, and the final list bookkeeping are missing. The
 * comments below annotate only the visible logic; confirm everything
 * against the full source before changing this function.
 */
583 static void merge_input_pool(struct btr_node *btrn, size_t dest_size)
585 struct btr_buffer_reference *br, *wbr;
586 int num_refs; /* including wrap buffer */
587 char *buf, *buf1, *buf2 = NULL;
588 size_t sz, sz1, sz2 = 0, wsz;
590 if (list_empty(&btrn->input_queue))
594 FOR_EACH_BUFFER_REF(br, btrn) {
596 sz = btr_get_buffer_by_reference(br, &buf);
/* an existing wrap buffer must be the first (and only special) ref */
597 if (br->wrap_count != 0) {
599 assert(num_refs == 1);
/* physically adjacent pool chunks extend the first segment */
610 if (buf1 + sz1 == buf) {
619 assert(buf2 + sz2 == buf);
622 if (sz1 + sz2 >= dest_size)
627 if (!buf2) /* nothing to do */
629 /* make a new wrap buffer combining buf1 and buf 2. */
631 buf = para_malloc(sz);
632 PARA_DEBUG_LOG("merging input buffers: (%p:%zu, %p:%zu) -> %p:%zu\n",
633 buf1, sz1, buf2, sz2, buf, sz);
634 memcpy(buf, buf1, sz1);
635 memcpy(buf + sz1, buf2, sz2);
636 br = para_calloc(sizeof(*br));
637 br->btrb = new_btrb(buf, sz);
638 br->btrb->refcount = 1;
640 /* This is a wrap buffer */
641 br->wrap_count = sz1;
642 para_list_add(&br->node, &btrn->input_queue);
646 * We already have a wrap buffer, but it is too small. It might be
649 wsz = br_available_bytes(wbr);
650 if (wbr->wrap_count == sz1 && wbr->btrb->size >= sz1 + sz2) /* nothing we can do about it */
652 assert(buf1 && buf2);
653 sz = sz1 + sz2 - wbr->btrb->size; /* amount of new data */
654 wbr->btrb->size += sz;
655 PARA_DEBUG_LOG("increasing wrap buffer to %zu\n", wbr->btrb->size);
656 wbr->btrb->buf = para_realloc(wbr->btrb->buf, wbr->btrb->size);
657 /* copy the new data to the end of the reallocated buffer */
659 memcpy(wbr->btrb->buf + wbr->btrb->size - sz, buf2 + sz2 - sz, sz);
663 * Merge the first two input buffers into one.
665 * This is a quite expensive operation.
667 * \return The number of buffers that have been available (zero, one or two).
/*
 * NOTE(review): elided lines include the declarations of bufs/szs/
 * buf/sz/i, the returns for the empty (0) and singular (1) queue
 * cases, the loop index handling with its two-buffer break, and the
 * final return 2. Confirm against the full source.
 */
669 static int merge_input(struct btr_node *btrn)
671 struct btr_buffer_reference *brs[2], *br;
676 if (list_empty(&btrn->input_queue))
678 if (list_is_singular(&btrn->input_queue))
681 /* get references to the first two buffers */
682 FOR_EACH_BUFFER_REF(br, btrn) {
684 szs[i] = btr_get_buffer_by_reference(brs[i], bufs + i);
689 /* make a new btrb that combines the two buffers and a br to it. */
690 sz = szs[0] + szs[1];
691 buf = para_malloc(sz);
692 PARA_DEBUG_LOG("%s: memory merging input buffers: (%zu, %zu) -> %zu\n",
693 btrn->name, szs[0], szs[1], sz);
694 memcpy(buf, bufs[0], szs[0]);
695 memcpy(buf + szs[0], bufs[1], szs[1]);
697 br = para_calloc(sizeof(*br));
698 br->btrb = new_btrb(buf, sz);
699 br->btrb->refcount = 1;
701 /* replace the first two refs by the new one */
702 btr_drop_buffer_reference(brs[0]);
703 btr_drop_buffer_reference(brs[1]);
704 para_list_add(&br->node, &btrn->input_queue);
/*
 * Merge the node's input until at least `dest_size` contiguous bytes
 * are available (or no further merging is possible).
 * NOTE(review): the surrounding loop construct, the declaration of
 * `buf`, and the loop exit/returns are elided from this listing.
 */
708 void btr_merge(struct btr_node *btrn, size_t dest_size)
/* pool-backed or wrap buffers need the pool-aware merge path */
710 if (need_buffer_pool_merge(btrn))
711 return merge_input_pool(btrn, dest_size);
714 size_t len = btr_next_buffer(btrn, &buf);
715 if (len >= dest_size)
717 PARA_DEBUG_LOG("input size = %zu < %zu = dest\n", len, dest_size);
/* fewer than two buffers were available: cannot merge any further */
718 if (merge_input(btrn) < 2)
723 bool btr_eof(struct btr_node *btrn)
726 size_t len = btr_next_buffer(btrn, &buf);
728 return (len == 0 && btr_no_parent(btrn));
/*
 * Print this node and, recursively, all its descendants, indented by
 * tree depth.
 * NOTE(review): the declaration of `ch` and a likely guard limiting
 * depth (the `spaces` array only covers 16 levels — `space` would
 * point before the array for depth > 16) are elided from this listing.
 */
731 void log_tree_recursively(struct btr_node *btrn, int loglevel, int depth)
734 const char spaces[] = " ", *space = spaces + 16 - depth;
738 para_log(loglevel, "%s%s\n", space, btrn->name);
739 FOR_EACH_CHILD(ch, btrn)
740 log_tree_recursively(ch, loglevel, depth + 1);
/*
 * Log the buffer tree rooted at `btrn` at the given log level.
 *
 * Note: the original used `return log_tree_recursively(...)`; a
 * `return` with a void expression in a void function is a constraint
 * violation in ISO C (C99/C11 6.8.6.4) — call and return separately.
 *
 * \param btrn The root of the (sub)tree to log.
 * \param loglevel Severity passed through to para_log().
 */
void btr_log_tree(struct btr_node *btrn, int loglevel)
{
	log_tree_recursively(btrn, loglevel, 0);
}
748 /** 640K ought to be enough for everybody ;) */
/* Upper bound on bytes pending downstream before a node stalls. */
749 #define BTRN_MAX_PENDING (640 * 1024)
/*
 * Determine whether a node should run: returns a positive value if the
 * node has work to do, zero if not, negative on error conditions.
 * NOTE(review): elided lines include the declaration of `iqs`, an
 * assert on btrn, the error return for exceeding BTRN_MAX_PENDING, an
 * EOF check in the non-root branch, and the final positive return.
 */
751 int btr_node_status(struct btr_node *btrn, size_t min_iqs,
752 enum btr_node_type type)
/* non-leaf nodes need someone to consume their output */
758 if (type != BTR_NT_LEAF) {
759 if (btr_no_children(btrn))
760 return -E_BTR_NO_CHILD;
/* throttle if the children have not caught up yet */
761 if (btr_bytes_pending(btrn) > BTRN_MAX_PENDING)
/* non-root nodes need input to work on */
764 if (type != BTR_NT_ROOT) {
767 iqs = btr_get_input_queue_size(btrn);
768 if (iqs == 0) /* we have a parent, because not eof */
770 if (iqs < min_iqs && !btr_no_parent(btrn))
776 void btr_get_node_start(struct btr_node *btrn, struct timeval *tv)