7 #include "buffer_tree.h"
/*
 * NOTE(review): this excerpt is a sampled subset of the original file -- the
 * embedded line numbering jumps (e.g. 20 -> 27, 47 -> 50), so struct members,
 * braces and comment delimiters are missing from view. Comments below only
 * describe what is visible; confirm details against the complete file.
 */
19 enum btr_buffer_flags {
20 /* changes the way the buffer is deallocated */
/* Reference-counted data buffer; may be backed by a btr_pool (pool member). */
27 /** The number of references to this buffer. */
29 struct btr_pool *pool;
/*
 * A node's view into one btr_buffer. Tracks (elsewhere in the struct, not
 * all fields visible here) how much of the buffer this node has consumed.
 */
32 struct btr_buffer_reference {
33 struct btr_buffer *btrb;
35 /* Each buffer reference belongs to the buffer queue list of some buffer tree node. */
36 struct list_head node;
/* One node of the buffer tree (struct opener not visible in this excerpt). */
42 struct btr_node *parent;
43 /* The position of this btr node in the buffer tree. */
44 struct list_head node;
45 /* The children nodes of this btr node are linked together in a list. */
46 struct list_head children;
47 /* Time of first data transfer. */
50 * The input queue is a list of references to btr buffers. Each item on
51 * the list represents an input buffer which has not been completely
52 * used by this btr node.
54 struct list_head input_queue;
55 btr_command_handler execute;
/**
 * Allocate and initialize a new buffer pool of \a area_size bytes.
 *
 * Allocates the pool structure and its data area, duplicates \a name, and
 * places both the read head (rhead) and the write head (whead) at the start
 * of the area. The return statement is outside this excerpt -- presumably
 * returns btrp; confirm in the full file.
 */
59 struct btr_pool *btr_pool_new(const char *name, size_t area_size)
61 struct btr_pool *btrp;
63 PARA_INFO_LOG("%s, %zu bytes\n", name, area_size);
64 btrp = para_malloc(sizeof(*btrp));
65 btrp->area_start = para_malloc(area_size);
66 btrp->area_end = btrp->area_start + area_size;
67 btrp->rhead = btrp->area_start;
68 btrp->whead = btrp->area_start;
69 btrp->name = para_strdup(name);
73 /* whead = NULL means area full */
/**
 * Deallocate a buffer pool.
 *
 * Frees the pool data area; freeing of btrp->name and of btrp itself is not
 * visible in this excerpt -- confirm against the full file (a NULL check on
 * btrp may also exist in the missing lines).
 */
75 void btr_pool_free(struct btr_pool *btrp)
79 free(btrp->area_start);
/** Total capacity of the pool area in bytes. */
84 size_t btr_pool_size(struct btr_pool *btrp)
86 return btrp->area_end - btrp->area_start;
/**
 * Number of bytes currently in use.
 *
 * The pool behaves as a ring buffer. The unconditional-looking return of
 * the full size is presumably guarded by a "whead == NULL" (pool full) test
 * on a line missing from this excerpt (cf. the "whead = NULL means area
 * full" note above). Otherwise the filled amount is the rhead..whead
 * distance, wrapping around the end of the area.
 */
89 size_t btr_pool_filled(struct btr_pool *btrp)
92 return btr_pool_size(btrp);
93 if (btrp->rhead <= btrp->whead)
94 return btrp->whead - btrp->rhead;
95 return btr_pool_size(btrp) - (btrp->rhead - btrp->whead);
/** Bytes not currently in use: capacity minus filled. */
98 size_t btr_pool_unused(struct btr_pool *btrp)
100 return btr_pool_size(btrp) - btr_pool_filled(btrp);
104 * Return maximal size available for one read. This is
105 * smaller than the value returned by btr_pool_unused().
/* Contiguous free space at the write head (no wrap-around). */
107 size_t btr_pool_available(struct btr_pool *btrp)
111 if (btrp->rhead <= btrp->whead)
112 return btrp->area_end - btrp->whead;
113 return btrp->rhead - btrp->whead;
/**
 * Obtain a pointer to the current write head.
 *
 * \return The number of contiguous bytes that may be written at *result.
 */
116 size_t btr_pool_get_buffer(struct btr_pool *btrp, char **result)
119 *result = btrp->whead;
120 return btr_pool_available(btrp);
/**
 * Mark \a size bytes at the write head as allocated.
 *
 * Advances whead by \a size, wrapping to area_start when the end of the
 * area is reached exactly, and setting whead to NULL (the "full" marker)
 * when it catches up with rhead. The final assignment of "end" to
 * btrp->whead is not visible in this excerpt -- confirm in the full file.
 */
123 void btr_pool_allocate(struct btr_pool *btrp, size_t size)
129 assert(size <= btr_pool_available(btrp));
130 end = btrp->whead + size;
131 assert(end <= btrp->area_end);
133 if (end == btrp->area_end) {
134 PARA_DEBUG_LOG("%s: end of pool area reached\n", btrp->name);
135 end = btrp->area_start;
137 if (end == btrp->rhead) {
138 PARA_DEBUG_LOG("%s btrp buffer full\n", btrp->name);
139 end = NULL; /* buffer full */
/**
 * Return \a size bytes at the read head to the pool.
 *
 * Advances rhead, wrapping at the area end. When the pool becomes empty
 * (rhead == whead), both heads are reset to area_start. The conditional
 * around the "whead = rhead" recovery from the full state (whead == NULL)
 * is partly missing from this excerpt -- confirm against the full file.
 */
144 static void btr_pool_deallocate(struct btr_pool *btrp, size_t size)
146 char *end = btrp->rhead + size;
150 assert(end <= btrp->area_end);
151 assert(size <= btr_pool_filled(btrp));
152 if (end == btrp->area_end)
153 end = btrp->area_start;
155 btrp->whead = btrp->rhead;
157 if (btrp->rhead == btrp->whead)
158 btrp->rhead = btrp->whead = btrp->area_start;
/* Iterate over all children of node _btrn (do not remove during traversal). */
161 #define FOR_EACH_CHILD(_tn, _btrn) list_for_each_entry((_tn), \
162 &((_btrn)->children), node)
/* As above, but safe against removal of the current child (_tmp is scratch). */
163 #define FOR_EACH_CHILD_SAFE(_tn, _tmp, _btrn) \
164 list_for_each_entry_safe((_tn), (_tmp), &((_btrn)->children), node)
/* Iterate over all buffer references in _btrn's input queue. */
166 #define FOR_EACH_BUFFER_REF(_br, _btrn) \
167 list_for_each_entry((_br), &(_btrn)->input_queue, node)
/* As above, but safe against removal of the current reference. */
168 #define FOR_EACH_BUFFER_REF_SAFE(_br, _tmp, _btrn) \
169 list_for_each_entry_safe((_br), (_tmp), &(_btrn)->input_queue, node)
/*
 * Create a new buffer tree node. Valid (parent, child) combinations in the
 * node description:
 */
173 (NULL, NULL): new, isolated node.
174 (NULL, c): new node becomes root, c must be old root
175 (p, NULL): new leaf node
176 (p, c): new internal node, ch must be child of p
179 struct btr_node *btr_new_node(struct btr_node_description *bnd)
181 struct btr_node *btrn = para_malloc(sizeof(*btrn));
183 btrn->name = para_strdup(bnd->name);
184 btrn->parent = bnd->parent;
185 btrn->execute = bnd->handler;
186 btrn->context = bnd->context;
/* start == {0, 0} means "no data transferred yet". */
187 btrn->start.tv_sec = 0;
188 btrn->start.tv_usec = 0;
/*
 * NOTE(review): the branch on bnd->parent that selects between the
 * "child of parent" and "root" paths is missing from this excerpt; the
 * list_add_tail below presumably runs only when bnd->parent != NULL.
 */
190 list_add_tail(&btrn->node, &bnd->parent->children);
191 INIT_LIST_HEAD(&btrn->children);
192 INIT_LIST_HEAD(&btrn->input_queue);
194 PARA_INFO_LOG("added %s as child of %s\n", bnd->name, bnd->parent->name);
196 PARA_INFO_LOG("added %s as btr root\n", bnd->name);
201 * Allocate a new btr buffer.
203 * The freshly allocated buffer will have a zero refcount and will
204 * not be associated with a btr pool.
/* The assignments of buf and size to the new btrb are outside this excerpt. */
206 static struct btr_buffer *new_btrb(char *buf, size_t size)
208 struct btr_buffer *btrb = para_calloc(sizeof(*btrb));
/*
 * Release the buffer's memory: pool-backed buffers return their bytes to
 * the pool; the non-pool (heap) branch is missing from this excerpt --
 * presumably it frees btrb->buf depending on the buffer flags.
 */
215 static void dealloc_buffer(struct btr_buffer *btrb)
218 btr_pool_deallocate(btrb->pool, btrb->size);
/** First reference in btrn's input queue, or NULL if the queue is empty. */
223 static struct btr_buffer_reference *get_first_input_br(struct btr_node *btrn)
225 if (list_empty(&btrn->input_queue))
227 return list_first_entry(&btrn->input_queue,
228 struct btr_buffer_reference, node);
232 * Deallocate the reference, release the resources if refcount drops to zero.
/* The refcount decrement, list_del and free(br) are outside this excerpt. */
234 static void btr_drop_buffer_reference(struct btr_buffer_reference *br)
236 struct btr_buffer *btrb = br->btrb;
241 if (btrb->refcount == 0) {
242 dealloc_buffer(btrb);
/*
 * Push a buffer down: create one new reference to btrb per child of btrn
 * and append it to each child's input queue, with \a consumed bytes already
 * marked as used. The refcount increment and the br->btrb assignment are on
 * lines missing from this excerpt -- confirm in the full file. The
 * start.tv_sec == 0 tests record the time of first data transfer
 * (assignment not visible here).
 */
247 static void add_btrb_to_children(struct btr_buffer *btrb,
248 struct btr_node *btrn, size_t consumed)
252 if (btrn->start.tv_sec == 0)
254 FOR_EACH_CHILD(ch, btrn) {
255 struct btr_buffer_reference *br = para_calloc(sizeof(*br));
257 br->consumed = consumed;
258 list_add_tail(&br->node, &ch->input_queue);
260 if (ch->start.tv_sec == 0)
/*
 * Feed a (non-pool) buffer of \a size bytes to all children of btrn.
 *
 * If the node has no children the data has nowhere to go; the body of that
 * branch (presumably freeing buf and returning) is missing from this
 * excerpt.
 */
265 void btr_add_output(char *buf, size_t size, struct btr_node *btrn)
267 struct btr_buffer *btrb;
270 if (list_empty(&btrn->children)) {
274 btrb = new_btrb(buf, size);
275 add_btrb_to_children(btrb, btrn, 0);
/*
 * Feed \a size bytes from the write head of pool \a btrp to all children of
 * btrn. The caller must have written the data to the pool's write head
 * beforehand (see btr_pool_get_buffer()). The association of btrb with the
 * pool is on a line missing from this excerpt.
 */
278 void btr_add_output_pool(struct btr_pool *btrp, size_t size,
279 struct btr_node *btrn)
281 struct btr_buffer *btrb;
286 if (list_empty(&btrn->children))
288 avail = btr_pool_get_buffer(btrp, &buf);
289 assert(avail >= size);
290 btr_pool_allocate(btrp, size);
291 btrb = new_btrb(buf, size);
293 add_btrb_to_children(btrb, btrn, 0);
/*
 * Copy \a n bytes from \a src into pool \a btrp and feed them to the
 * children of btrn. Because the pool is a ring buffer, the contiguous
 * space at the write head may be smaller than n; in that case the copy is
 * split into two parts, each fed separately via btr_add_output_pool().
 * The early return taken when the first part covers everything
 * (copy == n) is on a line missing from this excerpt.
 */
296 void btr_copy(const void *src, size_t n, struct btr_pool *btrp,
297 struct btr_node *btrn)
304 assert(n <= btr_pool_unused(btrp));
305 sz = btr_pool_get_buffer(btrp, &buf);
306 copy = PARA_MIN(sz, n);
307 memcpy(buf, src, copy);
308 btr_add_output_pool(btrp, copy, btrn);
311 sz = btr_pool_get_buffer(btrp, &buf);
312 assert(sz >= n - copy);
313 memcpy(buf, src + copy, n - copy);
314 btr_add_output_pool(btrp, n - copy, btrn);
/*
 * Hand one buffer reference down to all children of btrn, then drop this
 * node's own reference to it.
 */
317 static void btr_pushdown_br(struct btr_buffer_reference *br, struct btr_node *btrn)
319 add_btrb_to_children(br->btrb, btrn, br->consumed);
320 btr_drop_buffer_reference(br);
/* Push the node's entire input queue down to its children. */
323 void btr_pushdown(struct btr_node *btrn)
325 struct btr_buffer_reference *br, *tmp;
327 FOR_EACH_BUFFER_REF_SAFE(br, tmp, btrn)
328 btr_pushdown_br(br, btrn);
/*
 * Push only the first input buffer down to the children. The return values
 * (for the empty-queue case and for success) are on lines missing from this
 * excerpt.
 */
331 int btr_pushdown_one(struct btr_node *btrn)
333 struct btr_buffer_reference *br;
335 if (list_empty(&btrn->input_queue))
337 br = list_first_entry(&btrn->input_queue, struct btr_buffer_reference, node);
338 btr_pushdown_br(br, btrn);
342 /* Return true if this node has no children. */
343 bool btr_no_children(struct btr_node *btrn)
345 return list_empty(&btrn->children);
/* Return true if this node is a root (has no parent). */
348 bool btr_no_parent(struct btr_node *btrn)
350 return !btrn->parent;
/*
 * True if in-place modification of the input buffers is OK, i.e. this node
 * is its parent's only child (so no sibling shares the buffers). The guard
 * for a parentless node is on a line missing from this excerpt.
 */
353 bool btr_inplace_ok(struct btr_node *btrn)
357 return list_is_singular(&btrn->parent->children);
/* Bytes of the referenced buffer not yet consumed through this reference. */
360 static inline size_t br_available_bytes(struct btr_buffer_reference *br)
362 return br->btrb->size - br->consumed;
/*
 * Point *buf at the unconsumed part of the referenced buffer.
 * \return The number of unconsumed bytes.
 */
365 size_t btr_get_buffer_by_reference(struct btr_buffer_reference *br, char **buf)
368 *buf = br->btrb->buf + br->consumed;
369 return br_available_bytes(br);
373 * \return zero if the input buffer queue is empty.
/*
 * Return (via *bufp) the largest contiguous run of unconsumed input data.
 * NOTE(review): most of this function's body is missing from this excerpt
 * (the accumulation of "rv" and the loop-exit conditions are not visible);
 * the visible contiguity test suggests it stops at the first
 * non-adjacent buffer. Confirm against the full file.
 */
375 size_t btr_next_buffer(struct btr_node *btrn, char **bufp)
377 struct btr_buffer_reference *br;
378 char *buf, *result = NULL;
381 FOR_EACH_BUFFER_REF(br, btrn) {
382 sz = btr_get_buffer_by_reference(br, &buf);
/* Stop merging when the next buffer is not physically adjacent. */
392 if (result + rv != buf)
/**
 * Mark \a numbytes bytes of this node's input queue as consumed.
 *
 * In the ordinary (no wrap buffer) case, references whose buffers become
 * fully used are dropped. In the wrap-buffer case, consumption stays inside
 * the wrap buffer until wrap_count is reached; then the wrap buffer is
 * dropped and the function recurses once (tail call) to account for the
 * bytes in the underlying queue buffers.
 */
401 void btr_consume(struct btr_node *btrn, size_t numbytes)
403 struct btr_buffer_reference *br, *tmp;
408 br = get_first_input_br(btrn);
411 if (br->wrap_count == 0) {
413 * No wrap buffer. Drop buffer references whose buffer
414 * has been fully used. */
415 FOR_EACH_BUFFER_REF_SAFE(br, tmp, btrn) {
416 if (br->consumed + numbytes <= br->btrb->size) {
417 br->consumed += numbytes;
418 if (br->consumed == br->btrb->size)
419 btr_drop_buffer_reference(br);
/* Buffer fully used and more to consume: account for it and continue. */
422 numbytes -= br->btrb->size - br->consumed;
423 btr_drop_buffer_reference(br);
429 We have a wrap buffer, consume from it. If in total,
430 i.e. including previous calls to btr_consume(), less than
431 wrap_count has been consumed, there's nothing more we can do.
433 Otherwise we drop the wrap buffer and consume from subsequent
434 buffers of the input queue the correct amount of bytes. This
435 is the total number of bytes that have been consumed from the
438 PARA_DEBUG_LOG("consuming %zu/%zu bytes from wrap buffer\n", numbytes,
439 br_available_bytes(br));
441 assert(numbytes <= br_available_bytes(br));
442 if (br->consumed + numbytes < br->wrap_count) {
443 br->consumed += numbytes;
446 PARA_DEBUG_LOG("dropping wrap buffer (%zu bytes)\n", br->btrb->size);
447 /* get rid of the wrap buffer */
448 sz = br->consumed + numbytes;
449 btr_drop_buffer_reference(br);
450 return btr_consume(btrn, sz);
/* Drop every reference in the node's input queue. */
453 static void flush_input_queue(struct btr_node *btrn)
455 struct btr_buffer_reference *br, *tmp;
456 FOR_EACH_BUFFER_REF_SAFE(br, tmp, btrn)
457 btr_drop_buffer_reference(br);
/* Free the node's resources; the body is entirely outside this excerpt. */
460 void btr_free_node(struct btr_node *btrn)
/*
 * Detach a node from the buffer tree: orphan its children (the per-child
 * statement is on a line missing from this excerpt -- presumably clearing
 * ch->parent), flush its input queue and unlink it from its parent's
 * children list.
 */
468 void btr_remove_node(struct btr_node *btrn)
474 PARA_NOTICE_LOG("removing btr node %s from buffer tree\n", btrn->name);
475 FOR_EACH_CHILD(ch, btrn)
477 flush_input_queue(btrn);
479 list_del(&btrn->node);
/*
 * Total number of unconsumed input bytes of this node.
 *
 * A wrap buffer duplicates the head of the following queue buffers, so the
 * bytes already consumed from it (wrap_consumed) are subtracted to avoid
 * counting them twice. A "continue"/skip statement after recording
 * wrap_consumed appears to be missing from this excerpt.
 */
482 size_t btr_get_input_queue_size(struct btr_node *btrn)
484 struct btr_buffer_reference *br;
485 size_t size = 0, wrap_consumed = 0;
487 FOR_EACH_BUFFER_REF(br, btrn) {
488 if (br->wrap_count != 0) {
489 wrap_consumed = br->consumed;
492 size += br_available_bytes(br);
494 assert(wrap_consumed <= size);
495 size -= wrap_consumed;
/*
 * Remove a node from the tree while keeping its subtree connected: every
 * child is re-parented to btrn's parent. The branch handling a parentless
 * btrn (parent == NULL, children become roots) is partly missing from this
 * excerpt -- list_move below presumably runs only when a parent exists.
 * Freeing of btrn itself is not visible here.
 */
499 void btr_splice_out_node(struct btr_node *btrn)
501 struct btr_node *ch, *tmp;
504 PARA_NOTICE_LOG("splicing out %s\n", btrn->name);
507 list_del(&btrn->node);
508 FOR_EACH_CHILD_SAFE(ch, tmp, btrn) {
509 PARA_INFO_LOG("parent(%s): %s\n", ch->name,
510 btrn->parent? btrn->parent->name : "NULL");
511 ch->parent = btrn->parent;
513 list_move(&ch->node, &btrn->parent->children);
515 assert(list_empty(&btrn->children));
519 * Return the size of the largest input queue.
521 * Iterates over all children of the given node.
/* The declaration/initialization of max_size and the return are not visible here. */
523 size_t btr_bytes_pending(struct btr_node *btrn)
528 FOR_EACH_CHILD(ch, btrn) {
529 size_t size = btr_get_input_queue_size(ch);
530 max_size = PARA_MAX(max_size, size);
/*
 * Execute \a command on this node via its command handler.
 *
 * \return -E(EINVAL) presumably when btrn is NULL, -E(ENOTSUP) presumably
 * when no handler is installed (the guarding conditions are on lines
 * missing from this excerpt); otherwise the handler's return value.
 */
535 int btr_exec(struct btr_node *btrn, const char *command, char **value_result)
538 return -ERRNO_TO_PARA_ERROR(EINVAL);
540 return -ERRNO_TO_PARA_ERROR(ENOTSUP);
541 return btrn->execute(btrn, command, value_result);
/*
 * Walk up the tree from btrn, offering \a command to each ancestor's
 * handler. Ancestors without a handler are skipped; a handler returning
 * -E(ENOTSUP) does not stop the walk (the continue/break statements
 * following those tests are on lines missing from this excerpt). Returns
 * -E(ENOTSUP) when no ancestor handled the command.
 */
544 int btr_exec_up(struct btr_node *btrn, const char *command, char **value_result)
548 for (; btrn; btrn = btrn->parent) {
549 struct btr_node *parent = btrn->parent;
551 return -ERRNO_TO_PARA_ERROR(ENOTSUP);
552 if (!parent->execute)
554 PARA_INFO_LOG("parent: %s, cmd: %s\n", parent->name, command);
555 ret = parent->execute(parent, command, value_result);
556 if (ret == -ERRNO_TO_PARA_ERROR(ENOTSUP))
560 if (value_result && *value_result)
561 PARA_NOTICE_LOG("%s(%s): %s\n", command, parent->name,
565 return -ERRNO_TO_PARA_ERROR(ENOTSUP);
/* Return the opaque context pointer stored at node creation time. */
568 void *btr_context(struct btr_node *btrn)
570 return btrn->context;
/*
 * Whether merge_input_pool() rather than merge_input() applies: checks the
 * first input reference (wrap_count and, on lines missing from this
 * excerpt, presumably whether the buffer is pool-backed).
 */
573 static bool need_buffer_pool_merge(struct btr_node *btrn)
575 struct btr_buffer_reference *br = get_first_input_br(btrn);
579 if (br->wrap_count != 0)
/*
 * Make at least dest_size contiguous input bytes available for a node whose
 * input comes from a buffer pool.
 *
 * Because a pool is a ring buffer, the queued data may be split into a part
 * at the end of the pool area (buf1) and a part at the start (buf2). A
 * "wrap buffer" -- a heap copy combining both parts, with wrap_count
 * recording the size of the first part -- is created or enlarged so the
 * node sees one contiguous region.
 *
 * NOTE(review): several lines are missing from this excerpt (the num_refs
 * bookkeeping, the wbr assignment, early returns, and loop braces);
 * comments below describe only the visible logic.
 */
586 static void merge_input_pool(struct btr_node *btrn, size_t dest_size)
588 struct btr_buffer_reference *br, *wbr = NULL;
589 int num_refs; /* including wrap buffer */
590 char *buf, *buf1 = NULL, *buf2 = NULL;
591 size_t sz, sz1 = 0, sz2 = 0, wsz;
/* Nothing to do if the first buffer already satisfies the request. */
593 br = get_first_input_br(btrn);
594 if (!br || br_available_bytes(br) >= dest_size)
/* Collect the (at most two) physically distinct data regions. */
597 FOR_EACH_BUFFER_REF(br, btrn) {
599 sz = btr_get_buffer_by_reference(br, &buf);
/* An existing wrap buffer must be the first (and only such) reference. */
602 if (br->wrap_count != 0) {
604 assert(num_refs == 1);
/* Adjacent to region 1: extend it; otherwise this starts region 2. */
615 if (buf1 + sz1 == buf) {
624 assert(buf2 + sz2 == buf);
627 if (sz1 + sz2 >= dest_size)
630 if (!buf2) /* nothing to do */
632 assert(buf1 && sz2 > 0);
634 * If the second buffer is large, we only take the first part of it to
635 * avoid having to memcpy() huge buffers.
637 sz2 = PARA_MIN(sz2, (size_t)(64 * 1024));
639 /* Make a new wrap buffer combining buf1 and buf2. */
641 buf = para_malloc(sz)
642 PARA_DEBUG_LOG("merging input buffers: (%p:%zu, %p:%zu) -> %p:%zu\n",
643 buf1, sz1, buf2, sz2, buf, sz);
644 memcpy(buf, buf1, sz1);
645 memcpy(buf + sz1, buf2, sz2);
646 br = para_calloc(sizeof(*br));
647 br->btrb = new_btrb(buf, sz);
648 br->btrb->refcount = 1;
650 /* This is a wrap buffer */
651 br->wrap_count = sz1;
652 para_list_add(&br->node, &btrn->input_queue);
655 PARA_DEBUG_LOG("increasing wrap buffer, sz1: %zu, sz2: %zu\n", sz1, sz2);
657 * We already have a wrap buffer, but it is too small. It might be
660 wsz = br_available_bytes(wbr);
661 if (wbr->wrap_count == sz1 && wbr->btrb->size >= sz1 + sz2) /* nothing we can do about it */
663 sz = sz1 + sz2 - wbr->btrb->size; /* amount of new data */
664 wbr->btrb->size += sz;
665 wbr->btrb->buf = para_realloc(wbr->btrb->buf, wbr->btrb->size);
666 /* copy the new data to the end of the reallocated buffer */
668 memcpy(wbr->btrb->buf + wbr->btrb->size - sz, buf2 + sz2 - sz, sz);
672 * Merge the first two input buffers into one.
674 * This is a quite expensive operation.
676 * \return The number of buffers that have been available (zero, one or two).
/*
 * NOTE(review): the declarations of bufs/szs/sz/buf/i, the early-return
 * values and the loop bookkeeping are on lines missing from this excerpt.
 */
678 static int merge_input(struct btr_node *btrn)
680 struct btr_buffer_reference *brs[2], *br;
685 if (list_empty(&btrn->input_queue))
687 if (list_is_singular(&btrn->input_queue))
690 /* get references to the first two buffers */
691 FOR_EACH_BUFFER_REF(br, btrn) {
693 szs[i] = btr_get_buffer_by_reference(brs[i], bufs + i);
698 /* make a new btrb that combines the two buffers and a br to it. */
699 sz = szs[0] + szs[1];
700 buf = para_malloc(sz);
701 PARA_DEBUG_LOG("%s: memory merging input buffers: (%zu, %zu) -> %zu\n",
702 btrn->name, szs[0], szs[1], sz);
703 memcpy(buf, bufs[0], szs[0]);
704 memcpy(buf + szs[0], bufs[1], szs[1]);
706 br = para_calloc(sizeof(*br));
707 br->btrb = new_btrb(buf, sz);
708 br->btrb->refcount = 1;
710 /* replace the first two refs by the new one */
711 btr_drop_buffer_reference(brs[0]);
712 btr_drop_buffer_reference(brs[1]);
713 para_list_add(&br->node, &btrn->input_queue);
/*
 * Merge input buffers until at least dest_size contiguous bytes are
 * available (or no further merging is possible). Pool-backed input is
 * delegated to merge_input_pool(); otherwise merge_input() is called in a
 * loop (the loop construct itself is on a line missing from this excerpt).
 */
717 void btr_merge(struct btr_node *btrn, size_t dest_size)
719 if (need_buffer_pool_merge(btrn))
720 return merge_input_pool(btrn, dest_size);
723 size_t len = btr_next_buffer(btrn, &buf);
724 if (len >= dest_size)
726 PARA_DEBUG_LOG("input size = %zu < %zu = dest\n", len, dest_size);
727 if (merge_input(btrn) < 2)
/*
 * End-of-file for a node: no more input available AND no parent that could
 * still produce some.
 */
732 bool btr_eof(struct btr_node *btrn)
735 size_t len = btr_next_buffer(btrn, &buf);
737 return (len == 0 && btr_no_parent(btrn));
/*
 * Log the subtree rooted at btrn, one node per line, indented by depth.
 * NOTE(review): "spaces + 16 - depth" assumes depth <= 16; no guard is
 * visible in this excerpt.
 */
740 void log_tree_recursively(struct btr_node *btrn, int loglevel, int depth)
743 const char spaces[] = " ", *space = spaces + 16 - depth;
747 para_log(loglevel, "%s%s\n", space, btrn->name);
748 FOR_EACH_CHILD(ch, btrn)
749 log_tree_recursively(ch, loglevel, depth + 1);
/* Convenience wrapper: log the whole tree starting at depth zero. */
752 void btr_log_tree(struct btr_node *btrn, int loglevel)
754 return log_tree_recursively(btrn, loglevel, 0);
/* Depth-first search for a node by name. */
758 * \return \a root if \a name is \p NULL.
760 struct btr_node *btr_search_node(const char *name, struct btr_node *root)
766 if (!strcmp(root->name, name))
768 FOR_EACH_CHILD(ch, root) {
769 struct btr_node *result = btr_search_node(name, ch);
776 /** 640K ought to be enough for everybody ;) */
777 #define BTRN_MAX_PENDING (640 * 1024)
/*
 * Decide whether a node is ready to act, according to its role (type).
 *
 * Non-leaf nodes need at least one child (-E_BTR_NO_CHILD otherwise) and
 * are throttled when more than BTRN_MAX_PENDING bytes are pending in their
 * children's queues. Non-root nodes compare their input queue size against
 * min_iqs. Several branch bodies and return values are on lines missing
 * from this excerpt -- confirm against the full file.
 */
779 int btr_node_status(struct btr_node *btrn, size_t min_iqs,
780 enum btr_node_type type)
785 if (type != BTR_NT_LEAF) {
786 if (btr_no_children(btrn))
787 return -E_BTR_NO_CHILD;
788 if (btr_bytes_pending(btrn) > BTRN_MAX_PENDING)
791 if (type != BTR_NT_ROOT) {
794 iqs = btr_get_input_queue_size(btrn);
795 if (iqs == 0) /* we have a parent, because not eof */
797 if (iqs < min_iqs && !btr_no_parent(btrn))
803 void btr_get_node_start(struct btr_node *btrn, struct timeval *tv)