7 #include "buffer_tree.h"
18 enum btr_buffer_flags {
19 /* changes the way the buffer is deallocated */
26 /** The number of references to this buffer. */
28 struct btr_pool *pool;
31 struct btr_buffer_reference {
32 struct btr_buffer *btrb;
34 /* Each buffer reference belongs to the buffer queue list of some buffer tree node. */
35 struct list_head node;
41 struct btr_node *parent;
42 /* The position of this btr node in the buffer tree. */
43 struct list_head node;
44 /* The children nodes of this btr node are linked together in a list. */
45 struct list_head children;
46 /* Time of first data transfer. */
49 * The input queue is a list of references to btr buffers. Each item on
50 * the list represents an input buffer which has not been completely
51 * used by this btr node.
53 struct list_head input_queue;
54 btr_command_handler execute;
/**
 * Create a new buffer pool.
 *
 * \param area_size Size in bytes of the contiguous pool area to allocate.
 *
 * \return Pointer to the newly allocated pool structure.
 *
 * Both the read head and the write head start at the beginning of the
 * area, i.e. the pool is initially empty.
 *
 * NOTE(review): the return statement and closing brace are not visible in
 * this view of the file.
 */
struct btr_pool *btr_pool_new(size_t area_size)
	struct btr_pool *btrp = para_malloc(sizeof(*btrp));
	/* The pool memory is one contiguous allocation, [area_start, area_end). */
	btrp->area_start = para_malloc(area_size);
	btrp->area_end = btrp->area_start + area_size;
	/* rhead: next byte to be read; whead: next byte to be written. */
	btrp->rhead = btrp->area_start;
	btrp->whead = btrp->area_start;
/* whead = NULL means area full */
/**
 * Deallocate resources used by a buffer pool.
 *
 * \param btrp A pointer obtained via btr_pool_new().
 *
 * NOTE(review): freeing of the pool structure itself (and any NULL guard)
 * is in lines not visible here.
 */
void btr_pool_free(struct btr_pool *btrp)
	/* Release the contiguous pool area. */
	free(btrp->area_start);
79 size_t btr_pool_size(struct btr_pool *btrp)
81 return btrp->area_end - btrp->area_start;
84 size_t btr_pool_filled(struct btr_pool *btrp)
87 return btr_pool_size(btrp);
88 if (btrp->rhead <= btrp->whead)
89 return btrp->whead - btrp->rhead;
90 return btr_pool_size(btrp) - (btrp->rhead - btrp->whead);
/**
 * Return the number of unused bytes of a buffer pool.
 *
 * \param btrp The pool to query.
 *
 * \return Total capacity minus the number of currently filled bytes.
 */
size_t btr_pool_unused(struct btr_pool *btrp)
{
	size_t capacity = btr_pool_size(btrp);

	return capacity - btr_pool_filled(btrp);
}
98 size_t btr_pool_available(struct btr_pool *btrp)
102 if (btrp->rhead <= btrp->whead)
103 return btrp->area_end - btrp->whead;
104 return btrp->rhead - btrp->whead;
/**
 * Obtain the current write position of the pool.
 *
 * \param btrp The pool to query.
 * \param result Set to the current write head.
 *
 * \return The number of bytes that may be written contiguously at *result
 * (see btr_pool_available()).
 */
size_t btr_pool_get_buffer(struct btr_pool *btrp, char **result)
	*result = btrp->whead;
	return btr_pool_available(btrp);
/**
 * Mark the next \a size bytes of the pool area as allocated, advancing
 * the write head (with wrap-around) past them.
 *
 * NOTE(review): the declaration of 'end' and the final assignment of the
 * write head are in lines not visible in this view.
 */
void btr_pool_allocate(struct btr_pool *btrp, size_t size)
	//PARA_CRIT_LOG("filled: %zu, alloc %zu\n", btr_pool_filled(btrp), size);
	assert(size <= btr_pool_available(btrp));
	end = btrp->whead + size;
	assert(end <= btrp->area_end);
	/* Wrap the write head around when it reaches the end of the area. */
	if (end == btrp->area_end) {
		PARA_DEBUG_LOG("end of pool area reached: %p\n", end);
		end = btrp->area_start;
	/* Write head catching up with the read head means the pool is full. */
	if (end == btrp->rhead) {
		PARA_DEBUG_LOG("btrp buffer full\n");
		end = NULL; /* buffer full */
	//PARA_CRIT_LOG("filled: %zu\n", btr_pool_filled(btrp));
/*
 * Return \a size bytes at the read head to the pool, advancing the read
 * head (with wrap-around).
 *
 * NOTE(review): the branch structure around the whead/rhead assignments
 * below is partly missing from this view — presumably the
 * "btrp->whead = btrp->rhead" line belongs to a pool-was-full
 * (whead == NULL) recovery branch; confirm against the full source.
 */
static void btr_pool_deallocate(struct btr_pool *btrp, size_t size)
	char *end = btrp->rhead + size;
	//PARA_CRIT_LOG("filled: %zu, dealloc %zu\n", btr_pool_filled(btrp), size);
	assert(end <= btrp->area_end);
	assert(size <= btr_pool_filled(btrp));
	/* Wrap the read head around when it reaches the end of the area. */
	if (end == btrp->area_end)
		end = btrp->area_start;
	btrp->whead = btrp->rhead;
	/* An empty pool is normalized: both heads back to the start. */
	if (btrp->rhead == btrp->whead)
		btrp->rhead = btrp->whead = btrp->area_start;
	//PARA_CRIT_LOG("filled: %zu\n", btr_pool_filled(btrp));
/* Iterate over the children of a buffer tree node. */
#define FOR_EACH_CHILD(_tn, _btrn) list_for_each_entry((_tn), \
	&((_btrn)->children), node)
/* Like FOR_EACH_CHILD(), but safe against removal of the current child. */
#define FOR_EACH_CHILD_SAFE(_tn, _tmp, _btrn) \
	list_for_each_entry_safe((_tn), (_tmp), &((_btrn)->children), node)
/* Iterate over the input queue (buffer references) of a node. */
#define FOR_EACH_BUFFER_REF(_br, _btrn) \
	list_for_each_entry((_br), &(_btrn)->input_queue, node)
/* Like FOR_EACH_BUFFER_REF(), but safe against removal of the current ref. */
#define FOR_EACH_BUFFER_REF_SAFE(_br, _tmp, _btrn) \
	list_for_each_entry_safe((_br), (_tmp), &(_btrn)->input_queue, node)
/**
 * Create a new buffer tree node and link it below \a parent.
 *
 * \param name Name of the node (copied).
 * \param parent The parent node, or NULL to create a root node.
 * \param handler Command handler stored in the node, may be NULL.
 * \param context Opaque pointer, retrievable later via btr_context().
 *
 * NOTE(review): the two log calls below presumably sit in the if(parent)
 * and else branches respectively — the branch lines are not visible in
 * this view; the same applies to the return statement.
 */
struct btr_node *btr_new_node(const char *name, struct btr_node *parent,
	btr_command_handler handler, void *context)
	struct btr_node *btrn = para_malloc(sizeof(*btrn));
	btrn->name = para_strdup(name);
	btrn->parent = parent;
	btrn->execute = handler;
	btrn->context = context;
	/* start == {0, 0} means no data has been transferred yet. */
	btrn->start.tv_sec = 0;
	btrn->start.tv_usec = 0;
	/* Hook the new node into the parent's list of children. */
	list_add_tail(&btrn->node, &parent->children);
	INIT_LIST_HEAD(&btrn->children);
	INIT_LIST_HEAD(&btrn->input_queue);
	PARA_INFO_LOG("added %s as child of %s\n", name, parent->name);
	PARA_INFO_LOG("added %s as btr root\n", name);
/*
 * Allocate a new btr buffer.
 *
 * The freshly allocated buffer will have a zero refcount (para_calloc
 * zero-initializes the whole structure).
 *
 * NOTE(review): the assignment of buf/size and the return statement are
 * in lines not visible in this view.
 */
static struct btr_buffer *new_btrb(char *buf, size_t size)
	struct btr_buffer *btrb = para_calloc(sizeof(*btrb));
/*
 * Release the memory of a buffer whose refcount dropped to zero.
 *
 * Pool-allocated case: hand the bytes back to the pool.
 * NOTE(review): the non-pool (heap) case and the pool check are in lines
 * not visible in this view.
 */
static void dealloc_buffer(struct btr_buffer *btrb)
	btr_pool_deallocate(btrb->pool, btrb->size);
210 static struct btr_buffer_reference *get_first_input_br(struct btr_node *btrn)
212 if (list_empty(&btrn->input_queue))
214 return list_first_entry(&btrn->input_queue,
215 struct btr_buffer_reference, node);
/*
 * Deallocate the reference, release the resources if refcount drops to zero.
 *
 * NOTE(review): the list removal, the free of the reference itself, and
 * the refcount decrement are in lines not visible in this view.
 */
static void btr_drop_buffer_reference(struct btr_buffer_reference *br)
	struct btr_buffer *btrb = br->btrb;
	//PARA_CRIT_LOG("dropping buffer reference %p\n", br);
	/* Last reference gone: release the underlying buffer memory. */
	if (btrb->refcount == 0) {
		dealloc_buffer(btrb);
/*
 * Add one reference to \a btrb to the input queue of every child of
 * \a btrn, with \a consumed bytes already accounted as used.
 *
 * NOTE(review): the refcount increment, the br->btrb assignment, the
 * child loop variable declaration, and the bodies of the start-time
 * checks are in lines not visible in this view.
 */
static void add_btrb_to_children(struct btr_buffer *btrb,
	struct btr_node *btrn, size_t consumed)
	/* start.tv_sec == 0 means no data has been transferred yet. */
	if (btrn->start.tv_sec == 0)
	FOR_EACH_CHILD(ch, btrn) {
		/* Each child gets its own reference with its own consumed count. */
		struct btr_buffer_reference *br = para_calloc(sizeof(*br));
		br->consumed = consumed;
		list_add_tail(&br->node, &ch->input_queue);
		if (ch->start.tv_sec == 0)
/**
 * Feed a heap-allocated buffer into the buffer tree below \a btrn.
 *
 * Ownership of \a buf passes to the tree. NOTE(review): the body of the
 * no-children branch (presumably freeing the buffer and returning early)
 * is not visible in this view.
 */
void btr_add_output(char *buf, size_t size, struct btr_node *btrn)
	struct btr_buffer *btrb;
	if (list_empty(&btrn->children)) {
	/* Wrap the bytes in a btr buffer and reference it from each child. */
	btrb = new_btrb(buf, size);
	add_btrb_to_children(btrb, btrn, 0);
/**
 * Feed a pool-allocated buffer into the buffer tree below \a btrn.
 *
 * Like btr_add_output(), but the bytes live in \a btrp; if nobody is
 * listening, they are immediately returned to the pool.
 * NOTE(review): the association of the new buffer with the pool
 * (btrb->pool assignment) is in a line not visible in this view.
 */
void btr_add_output_pool(struct btr_pool *btrp, char *buf, size_t size,
	struct btr_node *btrn)
	struct btr_buffer *btrb;
	/* No children: nobody will ever consume the data, give it back. */
	if (list_empty(&btrn->children)) {
		btr_pool_deallocate(btrp, size);
	btrb = new_btrb(buf, size);
	add_btrb_to_children(btrb, btrn, 0);
/*
 * Push one buffer reference down one level: reference the underlying
 * buffer from every child (preserving the consumed offset), then drop
 * this node's own reference. The order matters: the children's new
 * references keep the buffer alive across the drop.
 */
static void btr_pushdown_br(struct btr_buffer_reference *br, struct btr_node *btrn)
	add_btrb_to_children(br->btrb, btrn, br->consumed);
	btr_drop_buffer_reference(br);
/**
 * Push all queued input of \a btrn down to its children.
 *
 * The _SAFE iteration variant is required because btr_pushdown_br()
 * presumably unlinks the current reference from the queue (via
 * btr_drop_buffer_reference()) — confirm against the full source.
 */
void btr_pushdown(struct btr_node *btrn)
	struct btr_buffer_reference *br, *tmp;
	FOR_EACH_BUFFER_REF_SAFE(br, tmp, btrn)
		btr_pushdown_br(br, btrn);
295 int btr_pushdown_one(struct btr_node *btrn)
297 struct btr_buffer_reference *br;
299 if (list_empty(&btrn->input_queue))
301 br = list_first_entry(&btrn->input_queue, struct btr_buffer_reference, node);
302 btr_pushdown_br(br, btrn);
306 /* Return true if this node has no children. */
307 bool btr_no_children(struct btr_node *btrn)
309 return list_empty(&btrn->children);
312 bool btr_no_parent(struct btr_node *btrn)
314 return !btrn->parent;
317 bool btr_inplace_ok(struct btr_node *btrn)
321 return list_is_singular(&btrn->parent->children);
324 static inline size_t br_available_bytes(struct btr_buffer_reference *br)
326 return br->btrb->size - br->consumed;
329 size_t btr_get_buffer_by_reference(struct btr_buffer_reference *br, char **buf)
332 *buf = br->btrb->buf + br->consumed;
333 return br_available_bytes(br);
/**
 * Obtain the next chunk of input data of a node.
 *
 * \return Zero if the input buffer queue is empty.
 *
 * For pool-backed buffers, adjacent references whose memory happens to be
 * contiguous are merged into a single logical buffer.
 * NOTE(review): the declarations of sz/rv and most of the merge loop body
 * are in lines not visible in this view.
 */
size_t btr_next_buffer(struct btr_node *btrn, char **bufp)
	struct btr_buffer_reference *br;
	char *buf, *result = NULL;
	FOR_EACH_BUFFER_REF(br, btrn) {
		sz = btr_get_buffer_by_reference(br, &buf);
		/* Buffers can only be merged if they are adjacent in memory. */
		if (result + rv != buf) {
			PARA_DEBUG_LOG("%s: pool merge impossible: %p != %p\n",
				btrn->name, result + rv, buf);
	// PARA_CRIT_LOG("%s: inplace merge (%zu, %zu)->%zu\n", btrn->name,
	// PARA_CRIT_LOG("%s: inplace merge %p (%zu)\n", btrn->name,
/**
 * Mark \a numbytes of the input queue of \a btrn as consumed.
 *
 * NOTE(review): several guard lines (early return for numbytes == 0,
 * NULL check on the first reference, loop break/continue statements and
 * the declaration of sz) are not visible in this view.
 */
void btr_consume(struct btr_node *btrn, size_t numbytes)
	struct btr_buffer_reference *br, *tmp;
	br = get_first_input_br(btrn);
	//PARA_CRIT_LOG("wrap count: %zu\n", br->wrap_count);
	if (br->wrap_count == 0) {
		/*
		 * No wrap buffer. Drop buffer references whose buffer
		 * has been fully used. */
		FOR_EACH_BUFFER_REF_SAFE(br, tmp, btrn) {
			/* Request fits in the current buffer: account and stop. */
			if (br->consumed + numbytes <= br->btrb->size) {
				br->consumed += numbytes;
				if (br->consumed == br->btrb->size)
					btr_drop_buffer_reference(br);
			/* Buffer fully used up: drop it and consume the rest. */
			numbytes -= br->btrb->size - br->consumed;
			btr_drop_buffer_reference(br);
	/*
	 * We have a wrap buffer, consume from it. If in total,
	 * i.e. including previous calls to btr_consume(), less than
	 * wrap_count has been consumed, there's nothing more we can do.
	 *
	 * Otherwise we drop the wrap buffer and consume from subsequent
	 * buffers of the input queue the correct amount of bytes. This
	 * is the total number of bytes that have been consumed from the
	 * wrap buffer.
	 */
	PARA_DEBUG_LOG("consuming %zu/%zu bytes from wrap buffer\n", numbytes,
		br_available_bytes(br));
	assert(numbytes <= br_available_bytes(br));
	if (br->consumed + numbytes < br->wrap_count) {
		br->consumed += numbytes;
	PARA_DEBUG_LOG("dropping wrap buffer (%zu bytes)\n", br->btrb->size);
	/* get rid of the wrap buffer */
	sz = br->consumed + numbytes;
	btr_drop_buffer_reference(br);
	/* Re-run consumption on the underlying (non-wrap) buffers. */
	return btr_consume(btrn, sz);
/*
 * Drop every buffer reference of the node's input queue. The _SAFE
 * variant is needed because dropping a reference presumably unlinks it
 * from the queue — confirm against btr_drop_buffer_reference().
 */
static void flush_input_queue(struct btr_node *btrn)
	struct btr_buffer_reference *br, *tmp;
	FOR_EACH_BUFFER_REF_SAFE(br, tmp, btrn)
		btr_drop_buffer_reference(br);
/* NOTE(review): body not visible in this view — presumably frees the
 * node's name and the node structure itself; confirm against the full
 * source. */
void btr_free_node(struct btr_node *btrn)
/**
 * Disconnect a node from the buffer tree.
 *
 * NOTE(review): the declaration of ch, the body of the child loop
 * (presumably clearing each child's parent pointer), and the guard
 * around list_del() are in lines not visible in this view.
 */
void btr_remove_node(struct btr_node *btrn)
	PARA_NOTICE_LOG("removing btr node %s from buffer tree\n", btrn->name);
	FOR_EACH_CHILD(ch, btrn)
	/* Drop all pending input, then unlink from the parent's child list. */
	flush_input_queue(btrn);
	list_del(&btrn->node);
/**
 * Return the total number of unconsumed input bytes queued at \a btrn,
 * summed over all buffer references of the input queue.
 *
 * NOTE(review): the declaration/initialization of the 'size' accumulator
 * and the return statement are in lines not visible in this view.
 */
size_t btr_get_input_queue_size(struct btr_node *btrn)
	struct btr_buffer_reference *br;
	FOR_EACH_BUFFER_REF(br, btrn) {
		//PARA_CRIT_LOG("size: %zu\n", size);
		size += br_available_bytes(br);
/**
 * Remove \a btrn from the tree, reattaching its children to its parent.
 *
 * NOTE(review): guards around list_del() and list_move() (the latter
 * dereferences btrn->parent, which the log line below shows may be NULL)
 * are in lines not visible in this view — confirm against the full source.
 */
void btr_splice_out_node(struct btr_node *btrn)
	struct btr_node *ch, *tmp;
	PARA_NOTICE_LOG("splicing out %s\n", btrn->name);
	/* Unlink this node from its parent's child list. */
	list_del(&btrn->node);
	/* Reparent each child to this node's parent. */
	FOR_EACH_CHILD_SAFE(ch, tmp, btrn) {
		PARA_INFO_LOG("parent(%s): %s\n", ch->name,
			btrn->parent? btrn->parent->name : "NULL");
		ch->parent = btrn->parent;
		list_move(&ch->node, &btrn->parent->children);
	/* All children must have been moved away by now. */
	assert(list_empty(&btrn->children));
/**
 * Return the size of the largest input queue.
 *
 * Iterates over all children of the given node.
 *
 * NOTE(review): the declarations of max_size and ch, and the return
 * statement, are in lines not visible in this view.
 */
size_t btr_bytes_pending(struct btr_node *btrn)
	FOR_EACH_CHILD(ch, btrn) {
		size_t size = btr_get_input_queue_size(ch);
		max_size = PARA_MAX(max_size, size);
502 int btr_exec(struct btr_node *btrn, const char *command, char **value_result)
505 return -ERRNO_TO_PARA_ERROR(EINVAL);
507 return -ERRNO_TO_PARA_ERROR(ENOTSUP);
508 return btrn->execute(btrn, command, value_result);
/**
 * Walk up the tree from \a btrn, offering \a command to each ancestor's
 * handler until one accepts it.
 *
 * \return Negative ENOTSUP error code if no ancestor handles the command.
 * NOTE(review): the declaration of ret, several branch/continue lines and
 * the handling of a NULL parent are not visible in this view.
 */
int btr_exec_up(struct btr_node *btrn, const char *command, char **value_result)
	for (; btrn; btrn = btrn->parent) {
		struct btr_node *parent = btrn->parent;
		return -ERRNO_TO_PARA_ERROR(ENOTSUP);
		/* Ancestors without a handler cannot process the command. */
		if (!parent->execute)
		PARA_INFO_LOG("parent: %s, cmd: %s\n", parent->name, command);
		ret = parent->execute(parent, command, value_result);
		/* ENOTSUP from a handler means: keep looking further up. */
		if (ret == -ERRNO_TO_PARA_ERROR(ENOTSUP))
		if (value_result && *value_result)
			PARA_NOTICE_LOG("%s(%s): %s\n", command, parent->name,
	/* Reached the root without finding a handler for the command. */
	return -ERRNO_TO_PARA_ERROR(ENOTSUP);
535 void *btr_context(struct btr_node *btrn)
537 return btrn->context;
/*
 * Decide whether the pool-specific merge (merge_input_pool()) must be
 * used for this node's input.
 *
 * NOTE(review): the NULL check on br (get_first_input_br() may return
 * NULL) and the remaining conditions/returns are in lines not visible in
 * this view.
 */
static bool need_buffer_pool_merge(struct btr_node *btrn)
	struct btr_buffer_reference *br = get_first_input_br(btrn);
	/* A non-zero wrap count marks a pool wrap buffer. */
	if (br->wrap_count != 0)
/*
 * Merge the head of a pool-backed input queue so that at least
 * \a dest_size contiguous bytes are available, creating or growing a
 * "wrap buffer" (a heap copy that joins the tail and head of the ring)
 * when the pool area wraps around.
 *
 * NOTE(review): many control-flow lines (returns, loop breaks, the
 * assignments of buf1/sz1, num_refs updates and the goto/label structure)
 * are not visible in this view; the comments below describe only what the
 * visible lines show.
 */
static void merge_input_pool(struct btr_node *btrn, size_t dest_size)
	struct btr_buffer_reference *br, *wbr;
	int num_refs; /* including wrap buffer */
	char *buf, *buf1, *buf2 = NULL;
	size_t sz, sz1, sz2 = 0, wsz;
	if (list_empty(&btrn->input_queue))
	/* Scan the queue, classifying references into buf1/buf2/wrap buffer. */
	FOR_EACH_BUFFER_REF(br, btrn) {
		sz = btr_get_buffer_by_reference(br, &buf);
		/* A wrap buffer, if present, must be the first reference. */
		if (br->wrap_count != 0) {
			assert(num_refs == 1);
		/* Physically adjacent to buf1: extend that segment. */
		if (buf1 + sz1 == buf) {
		/* Otherwise it must extend the second segment. */
		assert(buf2 + sz2 == buf);
		/* Enough contiguous data collected for the destination size. */
		if (sz1 + sz2 >= dest_size)
	if (!buf2) /* nothing to do */
	/* make a new wrap buffer combining buf1 and buf2. */
	buf = para_malloc(sz);
	PARA_DEBUG_LOG("merging input buffers: (%p:%zu, %p:%zu) -> %p:%zu\n",
		buf1, sz1, buf2, sz2, buf, sz);
	memcpy(buf, buf1, sz1);
	memcpy(buf + sz1, buf2, sz2);
	br = para_calloc(sizeof(*br));
	br->btrb = new_btrb(buf, sz);
	br->btrb->refcount = 1;
	/* This is a wrap buffer */
	br->wrap_count = sz1;
	para_list_add(&br->node, &btrn->input_queue);
	/*
	 * We already have a wrap buffer, but it is too small. It might be
	 * possible to enlarge it (see below).
	 */
	wsz = br_available_bytes(wbr);
	if (wbr->wrap_count == sz1 && wbr->btrb->size >= sz1 + sz2) /* nothing we can do about it */
	assert(buf1 && buf2);
	sz = sz1 + sz2 - wbr->btrb->size; /* amount of new data */
	wbr->btrb->size += sz;
	PARA_DEBUG_LOG("increasing wrap buffer to %zu\n", wbr->btrb->size);
	wbr->btrb->buf = para_realloc(wbr->btrb->buf, wbr->btrb->size);
	/* copy the new data to the end of the reallocated buffer */
	memcpy(wbr->btrb->buf + wbr->btrb->size - sz, buf2 + sz2 - sz, sz);
/*
 * Merge the first two input buffers into one.
 *
 * This is a quite expensive operation.
 *
 * \return The number of buffers that have been available (zero, one or two).
 *
 * NOTE(review): the declarations of bufs/szs/sz/buf/i, the early returns
 * for the empty and singular cases, and the loop index handling are in
 * lines not visible in this view.
 */
static int merge_input(struct btr_node *btrn)
	struct btr_buffer_reference *brs[2], *br;
	if (list_empty(&btrn->input_queue))
	if (list_is_singular(&btrn->input_queue))
	/* get references to the first two buffers */
	FOR_EACH_BUFFER_REF(br, btrn) {
		szs[i] = btr_get_buffer_by_reference(brs[i], bufs + i);
	/* make a new btrb that combines the two buffers and a br to it. */
	sz = szs[0] + szs[1];
	buf = para_malloc(sz);
	PARA_DEBUG_LOG("memory merging input buffers: (%zu, %zu) -> %zu\n",
	memcpy(buf, bufs[0], szs[0]);
	memcpy(buf + szs[0], bufs[1], szs[1]);
	br = para_calloc(sizeof(*br));
	br->btrb = new_btrb(buf, sz);
	/* The new reference is the only user of the combined buffer. */
	br->btrb->refcount = 1;
	/* replace the first two refs by the new one */
	btr_drop_buffer_reference(brs[0]);
	btr_drop_buffer_reference(brs[1]);
	para_list_add(&br->node, &btrn->input_queue);
/**
 * Merge the input queue of \a btrn until at least \a dest_size contiguous
 * bytes are available (or no further merging is possible).
 *
 * NOTE(review): the loop structure around the repeated merge_input()
 * calls, the declaration of buf, and the return/break lines are not
 * visible in this view.
 */
void btr_merge(struct btr_node *btrn, size_t dest_size)
	/* Pool-backed input needs the pool-aware merge strategy. */
	if (need_buffer_pool_merge(btrn))
		return merge_input_pool(btrn, dest_size);
	size_t len = btr_next_buffer(btrn, &buf);
	/* Already enough contiguous data available. */
	if (len >= dest_size)
	PARA_DEBUG_LOG("input size = %zu < %zu = dest\n", len, dest_size);
	/* Fewer than two buffers were available: merging cannot make progress. */
	if (merge_input(btrn) < 2)
/**
 * Check whether a node will never receive further input.
 *
 * \param btrn The node to check.
 *
 * \return True if the input queue is empty and the node has no parent
 * that could still produce data. (The visible original used \a buf
 * without a declaration; it is declared here.)
 */
bool btr_eof(struct btr_node *btrn)
{
	char *buf;
	size_t len = btr_next_buffer(btrn, &buf);

	return (len == 0 && btr_no_parent(btrn));
}
/*
 * Log the subtree rooted at \a btrn, indenting by \a depth.
 *
 * NOTE(review): the declaration of ch and any bound check on depth are
 * not visible in this view — with depth > 16 the 'space' pointer would
 * point before the spaces array; confirm against the full source.
 */
void log_tree_recursively(struct btr_node *btrn, int loglevel, int depth)
	/* Deeper nodes get fewer leading spaces cut off, i.e. more indent. */
	const char spaces[] = "                ", *space = spaces + 16 - depth;
	para_log(loglevel, "%s%s\n", space, btrn->name);
	FOR_EACH_CHILD(ch, btrn)
		log_tree_recursively(ch, loglevel, depth + 1);
/**
 * Log the buffer tree rooted at \a btrn at the given loglevel.
 *
 * \param btrn Root of the subtree to log.
 * \param loglevel Severity passed through to para_log().
 */
void btr_log_tree(struct btr_node *btrn, int loglevel)
{
	log_tree_recursively(btrn, loglevel, 0);
}
718 /** 640K ought to be enough for everybody ;) */
719 #define BTRN_MAX_PENDING (640 * 1024)
/**
 * Decide whether a node should run, based on its role and queue state.
 *
 * \param btrn The node to check.
 * \param min_iqs Minimal input queue size required by the node.
 * \param type BTR_NT_ROOT, BTR_NT_LEAF or internal.
 *
 * NOTE(review): the declaration of iqs, several return statements and
 * branch bodies are not visible in this view.
 */
int btr_node_status(struct btr_node *btrn, size_t min_iqs,
	enum btr_node_type type)
	/* Non-leaf nodes produce output, so they need someone to consume it. */
	if (type != BTR_NT_LEAF) {
		if (btr_no_children(btrn))
			return -E_BTR_NO_CHILD;
		/* Throttle if the children are lagging too far behind. */
		if (btr_bytes_pending(btrn) > BTRN_MAX_PENDING)
	/* Non-root nodes consume input, so check the input queue. */
	if (type != BTR_NT_ROOT) {
		iqs = btr_get_input_queue_size(btrn);
		if (iqs == 0) /* we have a parent, because not eof */
		if (iqs < min_iqs && !btr_no_parent(btrn))
744 void btr_get_node_start(struct btr_node *btrn, struct timeval *tv)