7 #include "buffer_tree.h"
/* NOTE(review): elided excerpt — the enum members and most fields of the
 * buffer struct are not visible in this view. */
19 enum btr_buffer_flags {
20 /* changes the way the buffer is deallocated */
27 /** The number of references to this buffer. */
/* Pool this buffer was carved from; dealloc_buffer() below passes it to
 * btr_pool_deallocate(). Presumably NULL for heap buffers — TODO confirm. */
29 struct btr_pool *pool;
/* A single node's view of one btr_buffer: which buffer it points to and
 * where the reference sits in that node's input queue. (Fields such as the
 * consumed-byte count are elided from this view.) */
32 struct btr_buffer_reference {
/* The shared buffer this reference points to. */
33 struct btr_buffer *btrb;
35 /* Each buffer reference belongs to the buffer queue list of some buffer tree node. */
36 struct list_head node;
/* NULL for the root node (see btr_no_parent() below). */
42 struct btr_node *parent;
43 /* The position of this btr node in the buffer tree. */
44 struct list_head node;
45 /* The children nodes of this btr node are linked together in a list. */
46 struct list_head children;
47 /* Time of first data transfer. */
50 * The input queue is a list of references to btr buffers. Each item on
51 * the list represents an input buffer which has not been completely
52 * used by this btr node.
54 struct list_head input_queue;
/* Command handler invoked by btr_exec()/btr_exec_up(); may be NULL. */
55 btr_command_handler execute;
/*
 * Create a buffer pool: one contiguous area of area_size bytes managed as a
 * ring via a read head (rhead) and a write head (whead), both starting at
 * the beginning of the area. The pool keeps its own copy of name.
 */
59 struct btr_pool *btr_pool_new(const char *name, size_t area_size)
61 struct btr_pool *btrp;
63 PARA_INFO_LOG("%s, %zu bytes\n", name, area_size);
64 btrp = para_malloc(sizeof(*btrp));
65 btrp->area_start = para_malloc(area_size);
66 btrp->area_end = btrp->area_start + area_size;
67 btrp->rhead = btrp->area_start;
68 btrp->whead = btrp->area_start;
69 btrp->name = para_strdup(name);
73 /* whead = NULL means area full */
/* Release a pool's backing area. NOTE(review): the elided lines presumably
 * also free btrp->name and btrp itself, and handle btrp == NULL — confirm
 * against the full source. */
75 void btr_pool_free(struct btr_pool *btrp)
79 free(btrp->area_start);
/* Total capacity of the pool in bytes (area length, used or not). */
84 size_t btr_pool_size(struct btr_pool *btrp)
86 return btrp->area_end - btrp->area_start;
/* Number of bytes currently in use, derived from the rhead/whead ring
 * positions. */
89 size_t btr_pool_filled(struct btr_pool *btrp)
/* NOTE(review): the guard before this early return is elided; per the
 * "whead = NULL means area full" invariant above it is presumably
 * if (!btrp->whead) — confirm against the full source. */
92 return btr_pool_size(btrp);
93 if (btrp->rhead <= btrp->whead)
94 return btrp->whead - btrp->rhead;
/* Write head has wrapped around behind the read head. */
95 return btr_pool_size(btrp) - (btrp->rhead - btrp->whead);
/* Total free space in the pool (may be split by the ring wrap point). */
98 size_t btr_pool_unused(struct btr_pool *btrp)
100 return btr_pool_size(btrp) - btr_pool_filled(btrp);
/* Largest CONTIGUOUS chunk obtainable at the write head — at most up to the
 * end of the area (when rhead <= whead) or up to rhead (after a wrap). This
 * can be smaller than btr_pool_unused(). A full-pool guard appears to be
 * elided from this view. */
103 size_t btr_pool_available(struct btr_pool *btrp)
107 if (btrp->rhead <= btrp->whead)
108 return btrp->area_end - btrp->whead;
109 return btrp->rhead - btrp->whead;
/* Hand out the current write position and report how many contiguous bytes
 * may be written there. Does not advance the write head — callers commit
 * with btr_pool_allocate(). */
112 size_t btr_pool_get_buffer(struct btr_pool *btrp, char **result)
115 *result = btrp->whead;
116 return btr_pool_available(btrp);
/*
 * Commit size bytes at the write head (previously obtained via
 * btr_pool_get_buffer()). Advances whead, wrapping to area_start when the
 * end of the area is reached, and marks the pool full (whead = NULL, per
 * the invariant above) when the advanced head meets the read head.
 */
119 void btr_pool_allocate(struct btr_pool *btrp, size_t size)
125 //PARA_CRIT_LOG("filled: %zu, alloc %zu\n", btr_pool_filled(btrp), size);
/* Caller must have checked available space first. */
126 assert(size <= btr_pool_available(btrp));
127 end = btrp->whead + size;
128 assert(end <= btrp->area_end);
130 if (end == btrp->area_end) {
131 PARA_DEBUG_LOG("end of pool area reached: %p\n", end);
132 end = btrp->area_start;
134 if (end == btrp->rhead) {
135 PARA_DEBUG_LOG("btrp buffer full\n");
136 end = NULL; /* buffer full */
139 //PARA_CRIT_LOG("filled: %zu\n", btr_pool_filled(btrp));
/*
 * Return size bytes at the read head to the pool. Advances rhead with the
 * same wrap-around rule as allocation; when the pool drains completely both
 * heads are reset to area_start so future allocations get the maximal
 * contiguous region. (A size == 0 early return and the rhead assignment
 * appear to be elided from this view.)
 */
142 static void btr_pool_deallocate(struct btr_pool *btrp, size_t size)
144 char *end = btrp->rhead + size;
146 //PARA_CRIT_LOG("filled: %zu, dealloc %zu\n", btr_pool_filled(btrp), size);
149 assert(end <= btrp->area_end);
150 assert(size <= btr_pool_filled(btrp));
151 if (end == btrp->area_end)
152 end = btrp->area_start;
/* Pool was full (whead == NULL): the freed space re-opens writing here.
 * NOTE(review): the guard for this assignment is elided — confirm. */
154 btrp->whead = btrp->rhead;
156 if (btrp->rhead == btrp->whead)
157 btrp->rhead = btrp->whead = btrp->area_start;
158 //PARA_CRIT_LOG("filled: %zu\n", btr_pool_filled(btrp));
/* Iterate over a node's children resp. its input queue. The _SAFE variants
 * take an extra scratch cursor so the current entry may be removed while
 * iterating (list_for_each_entry_safe semantics). */
161 #define FOR_EACH_CHILD(_tn, _btrn) list_for_each_entry((_tn), \
162 &((_btrn)->children), node)
163 #define FOR_EACH_CHILD_SAFE(_tn, _tmp, _btrn) \
164 list_for_each_entry_safe((_tn), (_tmp), &((_btrn)->children), node)
166 #define FOR_EACH_BUFFER_REF(_br, _btrn) \
167 list_for_each_entry((_br), &(_btrn)->input_queue, node)
168 #define FOR_EACH_BUFFER_REF_SAFE(_br, _tmp, _btrn) \
169 list_for_each_entry_safe((_br), (_tmp), &(_btrn)->input_queue, node)
/*
 * Create a buffer tree node and link it into the tree.
 *
 * With a non-NULL parent the node is appended to the parent's child list;
 * otherwise it becomes a tree root (the parent-vs-root branching is elided
 * in this view but evident from the two log messages). The handler and the
 * opaque context are stored for later use by btr_exec()/btr_context().
 */
171 struct btr_node *btr_new_node(const char *name, struct btr_node *parent,
172 btr_command_handler handler, void *context)
174 struct btr_node *btrn = para_malloc(sizeof(*btrn));
176 btrn->name = para_strdup(name);
177 btrn->parent = parent;
178 btrn->execute = handler;
179 btrn->context = context;
/* tv_sec == 0 marks "no data transferred yet" (see add_btrb_to_children). */
180 btrn->start.tv_sec = 0;
181 btrn->start.tv_usec = 0;
183 list_add_tail(&btrn->node, &parent->children);
184 INIT_LIST_HEAD(&btrn->children);
185 INIT_LIST_HEAD(&btrn->input_queue);
187 PARA_INFO_LOG("added %s as child of %s\n", name, parent->name);
189 PARA_INFO_LOG("added %s as btr root\n", name);
194 * Allocate a new btr buffer.
196 * The freshly allocated buffer will have a zero refcount.
/* para_calloc() zeroes the struct, so refcount, pool and flags all start
 * at 0/NULL; buf/size assignment lines are elided from this view. */
198 static struct btr_buffer *new_btrb(char *buf, size_t size)
200 struct btr_buffer *btrb = para_calloc(sizeof(*btrb));
/* Give back the buffer's storage: pool-backed buffers return their bytes to
 * the pool. NOTE(review): the branch for non-pool (heap) buffers is elided
 * here — presumably free(btrb->buf), subject to the dealloc flags declared
 * in btr_buffer_flags above. */
207 static void dealloc_buffer(struct btr_buffer *btrb)
210 btr_pool_deallocate(btrb->pool, btrb->size);
/* Head of the node's input queue, or NULL-equivalent when empty (the early
 * return value for the empty case is elided from this view). */
215 static struct btr_buffer_reference *get_first_input_br(struct btr_node *btrn)
217 if (list_empty(&btrn->input_queue))
219 return list_first_entry(&btrn->input_queue,
220 struct btr_buffer_reference, node);
224 * Deallocate the reference, release the resources if refcount drops to zero.
/* NOTE(review): the list_del()/free(br) and the refcount decrement are
 * elided from this view; only the zero-refcount cleanup path is visible. */
226 static void btr_drop_buffer_reference(struct btr_buffer_reference *br)
228 struct btr_buffer *btrb = br->btrb;
230 //PARA_CRIT_LOG("dropping buffer reference %p\n", br);
234 if (btrb->refcount == 0) {
/* Last reference gone: return the buffer's storage and free the struct. */
235 dealloc_buffer(btrb);
/*
 * Push one buffer down the tree: append a fresh reference to btrb (with the
 * given consumed offset) onto the input queue of every child of btrn.
 * Records the time of first data transfer on nodes whose start time is
 * still unset (tv_sec == 0); the refcount increments and the timestamp
 * assignments are elided from this view.
 */
240 static void add_btrb_to_children(struct btr_buffer *btrb,
241 struct btr_node *btrn, size_t consumed)
245 if (btrn->start.tv_sec == 0)
247 FOR_EACH_CHILD(ch, btrn) {
248 struct btr_buffer_reference *br = para_calloc(sizeof(*br));
250 br->consumed = consumed;
251 list_add_tail(&br->node, &ch->input_queue);
253 if (ch->start.tv_sec == 0)
/* Feed a heap buffer of size bytes into the tree below btrn: wrap it in a
 * btr_buffer and hand references to all children. NOTE(review): the
 * childless early-exit body is elided — presumably it frees buf so the
 * bytes are not leaked when nobody consumes them; confirm. */
258 void btr_add_output(char *buf, size_t size, struct btr_node *btrn)
260 struct btr_buffer *btrb;
263 if (list_empty(&btrn->children)) {
267 btrb = new_btrb(buf, size);
268 add_btrb_to_children(btrb, btrn, 0);
/*
 * Like btr_add_output(), but the data already sits in the pool at the
 * current write head: commit size bytes there and distribute references to
 * the children. Caller guarantees the data was written into the buffer
 * returned by btr_pool_get_buffer(), hence the avail >= size assertion.
 */
271 void btr_add_output_pool(struct btr_pool *btrp, size_t size,
272 struct btr_node *btrn)
274 struct btr_buffer *btrb;
279 if (list_empty(&btrn->children))
281 avail = btr_pool_get_buffer(btrp, &buf);
282 assert(avail >= size);
283 btr_pool_allocate(btrp, size);
284 btrb = new_btrb(buf, size);
286 add_btrb_to_children(btrb, btrn, 0);
/*
 * Copy n bytes from src into the pool and push them down from btrn. Because
 * the pool's contiguous space may be smaller than n near the wrap point,
 * the copy is performed in at most two chunks, each committed separately
 * via btr_add_output_pool(). Caller must ensure n fits in the pool's total
 * free space (asserted). An early return for the single-chunk case appears
 * to be elided between the two halves.
 */
289 void btr_copy(const void *src, size_t n, struct btr_pool *btrp,
290 struct btr_node *btrn)
297 assert(n <= btr_pool_unused(btrp));
298 sz = btr_pool_get_buffer(btrp, &buf);
299 copy = PARA_MIN(sz, n);
300 memcpy(buf, src, copy);
301 btr_add_output_pool(btrp, copy, btrn);
/* Second chunk after the ring wrapped to the start of the area. */
304 sz = btr_pool_get_buffer(btrp, &buf);
305 assert(sz >= n - copy);
306 memcpy(buf, src + copy, n - copy);
307 btr_add_output_pool(btrp, n - copy, btrn);
/* Move one buffer reference from btrn to all of its children, preserving
 * the consumed offset, then drop btrn's own reference. */
310 static void btr_pushdown_br(struct btr_buffer_reference *br, struct btr_node *btrn)
312 add_btrb_to_children(br->btrb, btrn, br->consumed);
313 btr_drop_buffer_reference(br);
/* Forward the node's entire input queue to its children. Uses the _SAFE
 * iterator because each pushdown unlinks the current reference. */
316 void btr_pushdown(struct btr_node *btrn)
318 struct btr_buffer_reference *br, *tmp;
320 FOR_EACH_BUFFER_REF_SAFE(br, tmp, btrn)
321 btr_pushdown_br(br, btrn);
/* Forward only the first queued buffer to the children. Return values for
 * the empty-queue and success cases are elided from this view. */
324 int btr_pushdown_one(struct btr_node *btrn)
326 struct btr_buffer_reference *br;
328 if (list_empty(&btrn->input_queue))
330 br = list_first_entry(&btrn->input_queue, struct btr_buffer_reference, node);
331 btr_pushdown_br(br, btrn);
335 /* Return true if this node has no children. */
336 bool btr_no_children(struct btr_node *btrn)
338 return list_empty(&btrn->children);
/* Return true if this node is a tree root (parent pointer is NULL). */
341 bool btr_no_parent(struct btr_node *btrn)
343 return !btrn->parent;
/* True if btrn is its parent's only child, so input buffers may be modified
 * in place without affecting sibling consumers. NOTE(review): the
 * no-parent guard preceding this return is elided from this view. */
346 bool btr_inplace_ok(struct btr_node *btrn)
350 return list_is_singular(&btrn->parent->children);
/* Bytes of the referenced buffer not yet consumed through this reference. */
353 static inline size_t br_available_bytes(struct btr_buffer_reference *br)
355 return br->btrb->size - br->consumed;
/* Point *buf at the first unconsumed byte of the referenced buffer and
 * return how many bytes are readable there. */
358 size_t btr_get_buffer_by_reference(struct btr_buffer_reference *br, char **buf)
361 *buf = br->btrb->buf + br->consumed;
362 return br_available_bytes(br);
366 * \return zero if the input buffer queue is empty.
/* NOTE(review): most of this function is elided. The visible lines show it
 * walking the input queue and attempting to treat physically adjacent pool
 * buffers as one contiguous region (the "pool merge" debug message fires
 * when result + rv != buf, i.e. the next buffer is not adjacent). The
 * accumulation of rv/result and the return are not visible — do not infer
 * more than this. */
368 size_t btr_next_buffer(struct btr_node *btrn, char **bufp)
370 struct btr_buffer_reference *br;
371 char *buf, *result = NULL;
374 FOR_EACH_BUFFER_REF(br, btrn) {
375 sz = btr_get_buffer_by_reference(br, &buf);
385 if (result + rv != buf) {
386 PARA_DEBUG_LOG("%s: pool merge impossible: %p != %p\n",
387 btrn->name, result + rv, buf);
390 // PARA_CRIT_LOG("%s: inplace merge (%zu, %zu)->%zu\n", btrn->name,
392 // PARA_CRIT_LOG("%s: inplace merge %p (%zu)\n", btrn->name,
/*
 * Mark numbytes of this node's input as used.
 *
 * Two regimes, selected by whether the first queued reference is a wrap
 * buffer (wrap_count != 0):
 *  - No wrap buffer: walk the queue, dropping references whose buffers are
 *    fully consumed and charging the remainder to the first partially
 *    consumed one.
 *  - Wrap buffer: consume from it until wrap_count is exceeded, then drop
 *    it and recurse to charge the equivalent bytes to the underlying
 *    queue entries.
 * Several loop-exit lines are elided from this view.
 */
401 void btr_consume(struct btr_node *btrn, size_t numbytes)
403 struct btr_buffer_reference *br, *tmp;
408 br = get_first_input_br(btrn);
411 //PARA_CRIT_LOG("wrap count: %zu\n", br->wrap_count);
412 if (br->wrap_count == 0) {
414 * No wrap buffer. Drop buffer references whose buffer
415 * has been fully used. */
416 FOR_EACH_BUFFER_REF_SAFE(br, tmp, btrn) {
417 if (br->consumed + numbytes <= br->btrb->size) {
/* Remainder fits in this buffer; drop it only if exactly exhausted. */
418 br->consumed += numbytes;
419 if (br->consumed == br->btrb->size)
420 btr_drop_buffer_reference(br);
/* Buffer fully used up: account for it and keep walking. */
423 numbytes -= br->btrb->size - br->consumed;
424 btr_drop_buffer_reference(br);
430 We have a wrap buffer, consume from it. If in total,
431 i.e. including previous calls to brt_consume(), less than
432 wrap_count has been consumed, there's nothing more we can do.
434 Otherwise we drop the wrap buffer and consume from subsequent
435 buffers of the input queue the correct amount of bytes. This
436 is the total number of bytes that have been consumed from the
439 PARA_DEBUG_LOG("consuming %zu/%zu bytes from wrap buffer\n", numbytes,
440 br_available_bytes(br));
442 assert(numbytes <= br_available_bytes(br));
443 if (br->consumed + numbytes < br->wrap_count) {
444 br->consumed += numbytes;
447 PARA_DEBUG_LOG("dropping wrap buffer (%zu bytes)\n", br->btrb->size);
448 /* get rid of the wrap buffer */
449 sz = br->consumed + numbytes;
450 btr_drop_buffer_reference(br);
/* Recurse: charge the same total to the real queue entries. */
451 return btr_consume(btrn, sz);
/* Drop every reference on the node's input queue without forwarding the
 * data; buffers whose refcount drops to zero are deallocated. */
454 static void flush_input_queue(struct btr_node *btrn)
456 struct btr_buffer_reference *br, *tmp;
457 FOR_EACH_BUFFER_REF_SAFE(br, tmp, btrn)
458 btr_drop_buffer_reference(br);
/* NOTE(review): body entirely elided from this view — presumably frees the
 * node's name and the node struct itself; confirm against the full source. */
461 void btr_free_node(struct btr_node *btrn)
/*
 * Detach btrn from the tree: orphan its children (the per-child statement
 * inside the loop — presumably clearing ch->parent — is elided), drop all
 * queued input, and unlink the node from its parent's child list.
 */
469 void btr_remove_node(struct btr_node *btrn)
475 PARA_NOTICE_LOG("removing btr node %s from buffer tree\n", btrn->name);
476 FOR_EACH_CHILD(ch, btrn)
478 flush_input_queue(btrn);
480 list_del(&btrn->node);
/* Sum of the unconsumed bytes over all references on the node's input
 * queue (the size accumulator's declaration and the return are elided). */
483 size_t btr_get_input_queue_size(struct btr_node *btrn)
485 struct btr_buffer_reference *br;
488 FOR_EACH_BUFFER_REF(br, btrn) {
489 //PARA_CRIT_LOG("size: %zu\n", size);
490 size += br_available_bytes(br);
/*
 * Remove btrn from the middle of the tree, reattaching every child to
 * btrn's parent so the data path stays intact. Afterwards the child list
 * must be empty (asserted). The pushdown of pending input before unlinking
 * appears to be elided from this view.
 */
495 void btr_splice_out_node(struct btr_node *btrn)
497 struct btr_node *ch, *tmp;
500 PARA_NOTICE_LOG("splicing out %s\n", btrn->name);
503 list_del(&btrn->node);
504 FOR_EACH_CHILD_SAFE(ch, tmp, btrn) {
505 PARA_INFO_LOG("parent(%s): %s\n", ch->name,
506 btrn->parent? btrn->parent->name : "NULL");
507 ch->parent = btrn->parent;
/* Only move list membership when a grandparent exists; a NULL parent
 * would leave the child as a new root (the guard is elided here). */
509 list_move(&ch->node, &btrn->parent->children);
511 assert(list_empty(&btrn->children));
515 * Return the size of the largest input queue.
517 * Iterates over all children of the given node.
/* Used by btr_node_status() below to throttle producers once a slow child
 * accumulates more than BTRN_MAX_PENDING bytes. */
519 size_t btr_bytes_pending(struct btr_node *btrn)
524 FOR_EACH_CHILD(ch, btrn) {
525 size_t size = btr_get_input_queue_size(ch);
526 max_size = PARA_MAX(max_size, size);
/* Run a command on this node via its handler. Returns -EINVAL resp.
 * -ENOTSUP for the (elided) guard conditions — presumably a NULL node and a
 * NULL execute handler — otherwise whatever the handler returns. */
531 int btr_exec(struct btr_node *btrn, const char *command, char **value_result)
534 return -ERRNO_TO_PARA_ERROR(EINVAL);
536 return -ERRNO_TO_PARA_ERROR(ENOTSUP);
537 return btrn->execute(btrn, command, value_result);
/*
 * Walk up the tree from btrn, offering the command to each ancestor's
 * handler. Handlers without an execute callback are skipped; a handler
 * answering -ENOTSUP passes the command further up (the continue/return
 * statements between the visible lines are elided). Returns -ENOTSUP if no
 * ancestor handles the command.
 */
540 int btr_exec_up(struct btr_node *btrn, const char *command, char **value_result)
544 for (; btrn; btrn = btrn->parent) {
545 struct btr_node *parent = btrn->parent;
/* NOTE(review): elided guard — presumably if (!parent). */
547 return -ERRNO_TO_PARA_ERROR(ENOTSUP);
548 if (!parent->execute)
550 PARA_INFO_LOG("parent: %s, cmd: %s\n", parent->name, command);
551 ret = parent->execute(parent, command, value_result);
552 if (ret == -ERRNO_TO_PARA_ERROR(ENOTSUP))
556 if (value_result && *value_result)
557 PARA_NOTICE_LOG("%s(%s): %s\n", command, parent->name,
/* Fell off the top of the tree without finding a handler. */
561 return -ERRNO_TO_PARA_ERROR(ENOTSUP);
/* Opaque per-node context pointer stored by btr_new_node(). */
564 void *btr_context(struct btr_node *btrn)
566 return btrn->context;
/* Decide whether merge_input_pool() must run: true at least when the first
 * queued reference is a wrap buffer. NOTE(review): additional conditions
 * (empty queue, pool-backed buffer) are elided from this view. */
569 static bool need_buffer_pool_merge(struct btr_node *btrn)
571 struct btr_buffer_reference *br = get_first_input_br(btrn);
575 if (br->wrap_count != 0)
/*
 * Provide at least dest_size contiguous input bytes for a node fed from a
 * buffer pool, by building (or growing) a "wrap buffer": a heap copy that
 * joins the segment at the end of the pool area (buf1) with the segment at
 * its start (buf2). Several control-flow lines are elided from this view;
 * comments below cover only what the visible lines establish.
 */
582 static void merge_input_pool(struct btr_node *btrn, size_t dest_size)
584 struct btr_buffer_reference *br, *wbr;
585 int num_refs; /* including wrap buffer */
586 char *buf, *buf1, *buf2 = NULL;
587 size_t sz, sz1, sz2 = 0, wsz;
/* Nothing to merge on an empty queue. */
589 if (list_empty(&btrn->input_queue))
/* Classify the queued references: an existing wrap buffer (first entry
 * only) and the two candidate pool segments buf1/buf2. */
593 FOR_EACH_BUFFER_REF(br, btrn) {
595 sz = btr_get_buffer_by_reference(br, &buf);
596 if (br->wrap_count != 0) {
/* A wrap buffer may only ever be the first reference. */
598 assert(num_refs == 1);
/* Physically adjacent pool buffers extend buf1 in place. */
609 if (buf1 + sz1 == buf) {
618 assert(buf2 + sz2 == buf);
/* Enough contiguous data collected — stop scanning. */
621 if (sz1 + sz2 >= dest_size)
626 if (!buf2) /* nothing to do */
628 /* make a new wrap buffer combining buf1 and buf 2. */
630 buf = para_malloc(sz);
631 PARA_DEBUG_LOG("merging input buffers: (%p:%zu, %p:%zu) -> %p:%zu\n",
632 buf1, sz1, buf2, sz2, buf, sz);
633 memcpy(buf, buf1, sz1);
634 memcpy(buf + sz1, buf2, sz2);
635 br = para_calloc(sizeof(*br));
636 br->btrb = new_btrb(buf, sz);
/* The merged copy is owned solely by this node's queue. */
637 br->btrb->refcount = 1;
639 /* This is a wrap buffer */
640 br->wrap_count = sz1;
641 para_list_add(&br->node, &btrn->input_queue);
645 * We already have a wrap buffer, but it is too small. It might be
648 wsz = br_available_bytes(wbr);
649 if (wbr->wrap_count == sz1 && wbr->btrb->size >= sz1 + sz2) /* nothing we can do about it */
651 assert(buf1 && buf2);
/* Grow the existing wrap buffer and append only the newly arrived tail. */
652 sz = sz1 + sz2 - wbr->btrb->size; /* amount of new data */
653 wbr->btrb->size += sz;
654 PARA_DEBUG_LOG("increasing wrap buffer to %zu\n", wbr->btrb->size);
655 wbr->btrb->buf = para_realloc(wbr->btrb->buf, wbr->btrb->size);
656 /* copy the new data to the end of the reallocated buffer */
658 memcpy(wbr->btrb->buf + wbr->btrb->size - sz, buf2 + sz2 - sz, sz);
662 * Merge the first two input buffers into one.
664 * This is a quite expensive operation.
666 * \return The number of buffers that have been available (zero, one or two).
/* Heap-buffer counterpart of merge_input_pool(): copies the first two
 * queued buffers into one freshly malloc'd buffer, replaces both
 * references with a single new one, and leaves the rest of the queue
 * untouched. Loop-index bookkeeping lines are elided from this view. */
668 static int merge_input(struct btr_node *btrn)
670 struct btr_buffer_reference *brs[2], *br;
675 if (list_empty(&btrn->input_queue))
677 if (list_is_singular(&btrn->input_queue))
680 /* get references to the first two buffers */
681 FOR_EACH_BUFFER_REF(br, btrn) {
683 szs[i] = btr_get_buffer_by_reference(brs[i], bufs + i);
688 /* make a new btrb that combines the two buffers and a br to it. */
689 sz = szs[0] + szs[1];
690 buf = para_malloc(sz);
691 PARA_DEBUG_LOG("memory merging input buffers: (%zu, %zu) -> %zu\n",
693 memcpy(buf, bufs[0], szs[0]);
694 memcpy(buf + szs[0], bufs[1], szs[1]);
696 br = para_calloc(sizeof(*br));
697 br->btrb = new_btrb(buf, sz);
/* The merged copy belongs only to this node. */
698 br->btrb->refcount = 1;
700 /* replace the first two refs by the new one */
701 btr_drop_buffer_reference(brs[0]);
702 btr_drop_buffer_reference(brs[1]);
703 para_list_add(&br->node, &btrn->input_queue);
/*
 * Merge queued input until at least dest_size contiguous bytes are
 * available (or no further merging is possible). Pool-backed input is
 * delegated to merge_input_pool(); otherwise merge_input() is applied
 * repeatedly — merge_input() returning < 2 means fewer than two buffers
 * were available, so no progress can be made. The enclosing loop construct
 * is elided from this view.
 */
707 void btr_merge(struct btr_node *btrn, size_t dest_size)
709 if (need_buffer_pool_merge(btrn))
710 return merge_input_pool(btrn, dest_size);
713 size_t len = btr_next_buffer(btrn, &buf);
714 if (len >= dest_size)
716 PARA_DEBUG_LOG("input size = %zu < %zu = dest\n", len, dest_size);
717 if (merge_input(btrn) < 2)
/* End of file for this node: no input bytes available AND no parent left to
 * produce more. */
722 bool btr_eof(struct btr_node *btrn)
725 size_t len = btr_next_buffer(btrn, &buf);
727 return (len == 0 && btr_no_parent(btrn));
/* Depth-first dump of the subtree rooted at btrn, one node per line,
 * indented by depth. The spaces[] trick caps the indent: space points
 * 16 - depth characters into a 16-blank string, so depth > 16 would
 * underflow the array — a guard for that appears to be elided here. */
730 void log_tree_recursively(struct btr_node *btrn, int loglevel, int depth)
733 const char spaces[] = " ", *space = spaces + 16 - depth;
737 para_log(loglevel, "%s%s\n", space, btrn->name);
738 FOR_EACH_CHILD(ch, btrn)
739 log_tree_recursively(ch, loglevel, depth + 1);
/* Public entry point: log the whole tree starting at depth zero. */
742 void btr_log_tree(struct btr_node *btrn, int loglevel)
744 return log_tree_recursively(btrn, loglevel, 0);
747 /** 640K ought to be enough for everybody ;) */
748 #define BTRN_MAX_PENDING (640 * 1024)
/*
 * Scheduling readiness check for a node.
 *
 * Non-leaf nodes need at least one child and must throttle when the
 * slowest child's input queue exceeds BTRN_MAX_PENDING. Non-root nodes are
 * judged by their own input queue: empty means EOF-or-wait, and fewer than
 * min_iqs bytes with a parent still attached means "wait for more input".
 * The positive return values and several return statements are elided from
 * this view.
 */
750 int btr_node_status(struct btr_node *btrn, size_t min_iqs,
751 enum btr_node_type type)
755 if (type != BTR_NT_LEAF) {
756 if (btr_no_children(btrn))
757 return -E_BTR_NO_CHILD;
758 if (btr_bytes_pending(btrn) > BTRN_MAX_PENDING)
761 if (type != BTR_NT_ROOT) {
764 iqs = btr_get_input_queue_size(btrn);
765 if (iqs == 0) /* we have a parent, because not eof */
767 if (iqs < min_iqs && !btr_no_parent(btrn))
773 void btr_get_node_start(struct btr_node *btrn, struct timeval *tv)