X-Git-Url: http://git.tuebingen.mpg.de/?p=paraslash.git;a=blobdiff_plain;f=buffer_tree.c;h=962359fbfa253790880f1380551335eaf9861c6d;hp=a7a3220b4d267955ae5fe467d9e08b1c48b6a5f3;hb=11ef83c4abb2ccbdf3f99a8adf98749b2b0656c2;hpb=9facffb38faa01da0d1ba9574f9cc1c47b1deb2e diff --git a/buffer_tree.c b/buffer_tree.c index a7a3220b..962359fb 100644 --- a/buffer_tree.c +++ b/buffer_tree.c @@ -8,6 +8,7 @@ #include "error.h" #include "sched.h" +/* whead = NULL means area full */ struct btr_pool { char *name; char *area_start; @@ -56,6 +57,17 @@ struct btr_node { void *context; }; +/** + * Create a new buffer pool. + * + * \param name The name of the new buffer pool. + * + * \param area The size in bytes of the pool area. + * + * \return An opaque pointer to the newly created buffer pool. It must be + * passed to btr_pool_free() after it is no longer used to deallocate all + * resources. + */ struct btr_pool *btr_pool_new(const char *name, size_t area_size) { struct btr_pool *btrp; @@ -70,8 +82,11 @@ struct btr_pool *btr_pool_new(const char *name, size_t area_size) return btrp; } -/* whead = NULL means area full */ - +/** + * Dellocate resources used by a buffer pool. + * + * \param btrp A pointer obtained via btr_pool_new(). + */ void btr_pool_free(struct btr_pool *btrp) { if (!btrp) @@ -81,6 +96,14 @@ void btr_pool_free(struct btr_pool *btrp) free(btrp); } +/** + * Return the size of the buffer pool area. + * + * \param btrp The buffer pool. + * + * \return The same value which was passed during creation time to + * btr_pool_new(). + */ size_t btr_pool_size(struct btr_pool *btrp) { return btrp->area_end - btrp->area_start; @@ -100,6 +123,10 @@ size_t btr_pool_unused(struct btr_pool *btrp) return btr_pool_size(btrp) - btr_pool_filled(btrp); } +/* + * Return maximal size available for one read. This is + * smaller than the value returned by btr_pool_unused(). + */ size_t btr_pool_available(struct btr_pool *btrp) { if (!btrp->whead) @@ -122,7 +149,6 @@ void btr_pool_allocate(struct btr_pool *btrp, size_t size) if (size == 0) return; - //PARA_CRIT_LOG("filled: %zu, alloc %zu\n", btr_pool_filled(btrp), size); assert(size <= btr_pool_available(btrp)); end = btrp->whead + size; assert(end <= btrp->area_end); @@ -132,18 +158,16 @@ void btr_pool_allocate(struct btr_pool *btrp, size_t size) end = btrp->area_start; } if (end == btrp->rhead) { - PARA_DEBUG_LOG("btrp buffer full\n"); + PARA_DEBUG_LOG("%s btrp buffer full\n", btrp->name); end = NULL; /* buffer full */ } btrp->whead = end; - //PARA_CRIT_LOG("filled: %zu\n", btr_pool_filled(btrp)); } static void btr_pool_deallocate(struct btr_pool *btrp, size_t size) { char *end = btrp->rhead + size; - //PARA_CRIT_LOG("filled: %zu, dealloc %zu\n", btr_pool_filled(btrp), size); if (size == 0) return; assert(end <= btrp->area_end); @@ -155,7 +179,6 @@ static void btr_pool_deallocate(struct btr_pool *btrp, size_t size) btrp->rhead = end; if (btrp->rhead == btrp->whead) btrp->rhead = btrp->whead = btrp->area_start; - //PARA_CRIT_LOG("filled: %zu\n", btr_pool_filled(btrp)); } #define FOR_EACH_CHILD(_tn, _btrn) list_for_each_entry((_tn), \ @@ -168,25 +191,41 @@ static void btr_pool_deallocate(struct btr_pool *btrp, size_t size) #define FOR_EACH_BUFFER_REF_SAFE(_br, _tmp, _btrn) \ list_for_each_entry_safe((_br), (_tmp), &(_btrn)->input_queue, node) -struct btr_node *btr_new_node(const char *name, struct btr_node *parent, - btr_command_handler handler, void *context) +struct btr_node *btr_new_node(struct btr_node_description *bnd) { struct btr_node *btrn = para_malloc(sizeof(*btrn)); - btrn->name = para_strdup(name); - btrn->parent = parent; - btrn->execute = handler; - btrn->context = context; + btrn->name = para_strdup(bnd->name); + btrn->parent = bnd->parent; + btrn->execute = bnd->handler; + btrn->context = bnd->context; btrn->start.tv_sec = 0; btrn->start.tv_usec = 0; - if (parent) - list_add_tail(&btrn->node, &parent->children); INIT_LIST_HEAD(&btrn->children); INIT_LIST_HEAD(&btrn->input_queue); - if (parent) - PARA_INFO_LOG("added %s as child of %s\n", name, parent->name); - else - PARA_INFO_LOG("added %s as btr root\n", name); + if (!bnd->child) { + if (bnd->parent) { + list_add_tail(&btrn->node, &bnd->parent->children); + PARA_INFO_LOG("new leaf node: %s (child of %s)\n", + bnd->name, bnd->parent->name); + } else + PARA_INFO_LOG("added %s as btr root\n", bnd->name); + goto out; + } + if (!bnd->parent) { + assert(!bnd->child->parent); + PARA_INFO_LOG("new root: %s (was %s)\n", + bnd->name, bnd->child->name); + btrn->parent = NULL; + list_add_tail(&bnd->child->node, &btrn->children); + /* link it in */ + bnd->child->parent = btrn; + goto out; + } + PARA_EMERG_LOG("inserting internal nodes not yet supported.\n"); + exit(EXIT_FAILURE); + assert(bnd->child->parent == bnd->parent); +out: return btrn; } @@ -228,7 +267,6 @@ static void btr_drop_buffer_reference(struct btr_buffer_reference *br) { struct btr_buffer *btrb = br->btrb; - //PARA_CRIT_LOG("dropping buffer reference %p\n", br); list_del(&br->node); free(br); btrb->refcount--; @@ -383,15 +421,8 @@ size_t btr_next_buffer(struct btr_node *btrn, char **bufp) } if (!br->btrb->pool) break; - if (result + rv != buf) { - PARA_DEBUG_LOG("%s: pool merge impossible: %p != %p\n", - btrn->name, result + rv, buf); + if (result + rv != buf) break; - } -// PARA_CRIT_LOG("%s: inplace merge (%zu, %zu)->%zu\n", btrn->name, -// rv, sz, rv + sz); -// PARA_CRIT_LOG("%s: inplace merge %p (%zu)\n", btrn->name, -// result, sz); rv += sz; } if (bufp) @@ -409,7 +440,6 @@ void btr_consume(struct btr_node *btrn, size_t numbytes) br = get_first_input_br(btrn); assert(br); - //PARA_CRIT_LOG("wrap count: %zu\n", br->wrap_count); if (br->wrap_count == 0) { /* * No wrap buffer. Drop buffer references whose buffer @@ -484,12 +514,17 @@ void btr_remove_node(struct btr_node *btrn) size_t btr_get_input_queue_size(struct btr_node *btrn) { struct btr_buffer_reference *br; - size_t size = 0; + size_t size = 0, wrap_consumed = 0; FOR_EACH_BUFFER_REF(br, btrn) { - //PARA_CRIT_LOG("size: %zu\n", size); + if (br->wrap_count != 0) { + wrap_consumed = br->consumed; + continue; + } size += br_available_bytes(br); } + assert(wrap_consumed <= size); + size -= wrap_consumed; return size; } @@ -582,18 +617,20 @@ static bool need_buffer_pool_merge(struct btr_node *btrn) static void merge_input_pool(struct btr_node *btrn, size_t dest_size) { - struct btr_buffer_reference *br, *wbr; + struct btr_buffer_reference *br, *wbr = NULL; int num_refs; /* including wrap buffer */ - char *buf, *buf1, *buf2 = NULL; - size_t sz, sz1, sz2 = 0, wsz; + char *buf, *buf1 = NULL, *buf2 = NULL; + size_t sz, sz1 = 0, sz2 = 0, wsz; - if (list_empty(&btrn->input_queue)) + br = get_first_input_br(btrn); + if (!br || br_available_bytes(br) >= dest_size) return; - num_refs = 0; FOR_EACH_BUFFER_REF(br, btrn) { num_refs++; sz = btr_get_buffer_by_reference(br, &buf); + if (sz == 0) + break; if (br->wrap_count != 0) { assert(!wbr); assert(num_refs == 1); @@ -622,11 +659,16 @@ next: if (sz1 + sz2 >= dest_size) break; } + if (!buf2) /* nothing to do */ + return; + assert(buf1 && sz2 > 0); + /* + * If the second buffer is large, we only take the first part of it to + * avoid having to memcpy() huge buffers. + */ + sz2 = PARA_MIN(sz2, (size_t)(64 * 1024)); if (!wbr) { - assert(buf1); - if (!buf2) /* nothing to do */ - return; - /* make a new wrap buffer combining buf1 and buf 2. */ + /* Make a new wrap buffer combining buf1 and buf2. */ sz = sz1 + sz2; buf = para_malloc(sz); PARA_DEBUG_LOG("merging input buffers: (%p:%zu, %p:%zu) -> %p:%zu\n", @@ -642,6 +684,7 @@ next: para_list_add(&br->node, &btrn->input_queue); return; } + PARA_DEBUG_LOG("increasing wrap buffer, sz1: %zu, sz2: %zu\n", sz1, sz2); /* * We already have a wrap buffer, but it is too small. It might be * partially used. @@ -649,10 +692,8 @@ next: wsz = br_available_bytes(wbr); if (wbr->wrap_count == sz1 && wbr->btrb->size >= sz1 + sz2) /* nothing we can do about it */ return; - assert(buf1 && buf2); sz = sz1 + sz2 - wbr->btrb->size; /* amount of new data */ wbr->btrb->size += sz; - PARA_DEBUG_LOG("increasing wrap buffer to %zu\n", wbr->btrb->size); wbr->btrb->buf = para_realloc(wbr->btrb->buf, wbr->btrb->size); /* copy the new data to the end of the reallocated buffer */ assert(sz2 >= sz); @@ -745,6 +786,25 @@ void btr_log_tree(struct btr_node *btrn, int loglevel) return log_tree_recursively(btrn, loglevel, 0); } +/* + * \return \a root if \a name is \p NULL. + */ +struct btr_node *btr_search_node(const char *name, struct btr_node *root) +{ + struct btr_node *ch; + + if (!name) + return root; + if (!strcmp(root->name, name)) + return root; + FOR_EACH_CHILD(ch, root) { + struct btr_node *result = btr_search_node(name, ch); + if (result) + return result; + } + return NULL; +} + /** 640K ought to be enough for everybody ;) */ #define BTRN_MAX_PENDING (640 * 1024) @@ -753,8 +813,7 @@ int btr_node_status(struct btr_node *btrn, size_t min_iqs, { size_t iqs; - if (!btrn) - return 0; + assert(btrn); if (type != BTR_NT_LEAF) { if (btr_no_children(btrn)) return -E_BTR_NO_CHILD;