Add btr pool support to the fecdec filter.
[paraslash.git] / buffer_tree.c
index 1be2037f5a7f3b9a2eb231debebdcf0be0ecedba..80c0425b9931d8fcf29d28cfaf14e5647cd43674 100644 (file)
@@ -8,12 +8,25 @@
 #include "error.h"
 #include "sched.h"
 
+struct btr_pool {
+       char *name;
+       char *area_start;
+       char *area_end;
+       char *rhead;
+       char *whead;
+};
+
+enum btr_buffer_flags {
+       /* changes the way the buffer is deallocated */
+       BTR_BF_BTR_POOL = 1,
+};
 
 struct btr_buffer {
        char *buf;
        size_t size;
        /** The number of references to this buffer. */
        int refcount;
+       struct btr_pool *pool;
 };
 
 struct btr_buffer_reference {
@@ -21,6 +34,7 @@ struct btr_buffer_reference {
        size_t consumed;
        /* Each buffer reference belongs to the buffer queue list of some buffer tree node. */
        struct list_head node;
+       size_t wrap_count;
 };
 
 struct btr_node {
@@ -42,6 +56,108 @@ struct btr_node {
        void *context;
 };
 
+struct btr_pool *btr_pool_new(const char *name, size_t area_size)
+{
+       struct btr_pool *btrp;
+
+       PARA_INFO_LOG("%s, %zu bytes\n", name, area_size);
+       btrp = para_malloc(sizeof(*btrp));
+       btrp->area_start = para_malloc(area_size);
+       btrp->area_end = btrp->area_start + area_size;
+       btrp->rhead = btrp->area_start;
+       btrp->whead = btrp->area_start;
+       btrp->name = para_strdup(name);
+       return btrp;
+}
+
+/* whead = NULL means area full */
+
+void btr_pool_free(struct btr_pool *btrp)
+{
+       if (!btrp)
+               return;
+       free(btrp->area_start);
+       free(btrp->name);
+       free(btrp);
+}
+
+size_t btr_pool_size(struct btr_pool *btrp)
+{
+       return btrp->area_end - btrp->area_start;
+}
+
+size_t btr_pool_filled(struct btr_pool *btrp)
+{
+       if (!btrp->whead)
+               return btr_pool_size(btrp);
+       if (btrp->rhead <= btrp->whead)
+               return  btrp->whead - btrp->rhead;
+       return btr_pool_size(btrp) - (btrp->rhead - btrp->whead);
+}
+
+size_t btr_pool_unused(struct btr_pool *btrp)
+{
+       return btr_pool_size(btrp) - btr_pool_filled(btrp);
+}
+
+size_t btr_pool_available(struct btr_pool *btrp)
+{
+       if (!btrp->whead)
+               return 0;
+       if (btrp->rhead <= btrp->whead)
+               return btrp->area_end - btrp->whead;
+       return btrp->rhead - btrp->whead;
+}
+
+size_t btr_pool_get_buffer(struct btr_pool *btrp, char **result)
+{
+       if (result)
+               *result = btrp->whead;
+       return btr_pool_available(btrp);
+}
+
+void btr_pool_allocate(struct btr_pool *btrp, size_t size)
+{
+       char *end;
+
+       if (size == 0)
+               return;
+       //PARA_CRIT_LOG("filled: %zu, alloc %zu\n", btr_pool_filled(btrp), size);
+       assert(size <= btr_pool_available(btrp));
+       end = btrp->whead + size;
+       assert(end <= btrp->area_end);
+
+       if (end == btrp->area_end) {
+               PARA_DEBUG_LOG("end of pool area reached: %p\n", end);
+               end = btrp->area_start;
+       }
+       if (end == btrp->rhead) {
+               PARA_DEBUG_LOG("btrp buffer full\n");
+               end = NULL; /* buffer full */
+       }
+       btrp->whead = end;
+       //PARA_CRIT_LOG("filled: %zu\n", btr_pool_filled(btrp));
+}
+
+static void btr_pool_deallocate(struct btr_pool *btrp, size_t size)
+{
+       char *end = btrp->rhead + size;
+
+       //PARA_CRIT_LOG("filled: %zu, dealloc %zu\n", btr_pool_filled(btrp), size);
+       if (size == 0)
+               return;
+       assert(end <= btrp->area_end);
+       assert(size <= btr_pool_filled(btrp));
+       if (end == btrp->area_end)
+               end = btrp->area_start;
+       if (!btrp->whead)
+               btrp->whead = btrp->rhead;
+       btrp->rhead = end;
+       if (btrp->rhead == btrp->whead)
+               btrp->rhead = btrp->whead = btrp->area_start;
+       //PARA_CRIT_LOG("filled: %zu\n", btr_pool_filled(btrp));
+}
+
 #define FOR_EACH_CHILD(_tn, _btrn) list_for_each_entry((_tn), \
        &((_btrn)->children), node)
 #define FOR_EACH_CHILD_SAFE(_tn, _tmp, _btrn) \
@@ -81,14 +197,29 @@ struct btr_node *btr_new_node(const char *name, struct btr_node *parent,
  */
 static struct btr_buffer *new_btrb(char *buf, size_t size)
 {
-       struct btr_buffer *btrb = para_malloc(sizeof(*btrb));
+       struct btr_buffer *btrb = para_calloc(sizeof(*btrb));
 
        btrb->buf = buf;
        btrb->size = size;
-       btrb->refcount = 0;
        return btrb;
 }
 
+static void dealloc_buffer(struct btr_buffer *btrb)
+{
+       if (btrb->pool)
+               btr_pool_deallocate(btrb->pool, btrb->size);
+       else
+               free(btrb->buf);
+}
+
+static struct btr_buffer_reference *get_first_input_br(struct btr_node *btrn)
+{
+       if (list_empty(&btrn->input_queue))
+               return NULL;
+       return list_first_entry(&btrn->input_queue,
+               struct btr_buffer_reference, node);
+}
+
 /*
  * Deallocate the reference, release the resources if refcount drops to zero.
  */
@@ -101,7 +232,7 @@ static void btr_drop_buffer_reference(struct btr_buffer_reference *br)
        free(br);
        btrb->refcount--;
        if (btrb->refcount == 0) {
-               free(btrb->buf);
+               dealloc_buffer(btrb);
                free(btrb);
        }
 }
@@ -114,7 +245,7 @@ static void add_btrb_to_children(struct btr_buffer *btrb,
        if (btrn->start.tv_sec == 0)
                btrn->start = *now;
        FOR_EACH_CHILD(ch, btrn) {
-               struct btr_buffer_reference *br = para_malloc(sizeof(*br));
+               struct btr_buffer_reference *br = para_calloc(sizeof(*br));
                br->btrb = btrb;
                br->consumed = consumed;
                list_add_tail(&br->node, &ch->input_queue);
@@ -137,6 +268,45 @@ void btr_add_output(char *buf, size_t size, struct btr_node *btrn)
        add_btrb_to_children(btrb, btrn, 0);
 }
 
+void btr_add_output_pool(struct btr_pool *btrp, size_t size,
+               struct btr_node *btrn)
+{
+       struct btr_buffer *btrb;
+       char *buf;
+       size_t avail;
+
+       assert(size != 0);
+       if (list_empty(&btrn->children))
+               return;
+       avail = btr_pool_get_buffer(btrp, &buf);
+       assert(avail >= size);
+       btr_pool_allocate(btrp, size);
+       btrb = new_btrb(buf, size);
+       btrb->pool = btrp;
+       add_btrb_to_children(btrb, btrn, 0);
+}
+
+void btr_copy(const void *src, size_t n, struct btr_pool *btrp,
+       struct btr_node *btrn)
+{
+       char *buf;
+       size_t sz, copy;
+
+       if (n == 0)
+               return;
+       assert(n <= btr_pool_unused(btrp));
+       sz = btr_pool_get_buffer(btrp, &buf);
+       copy = PARA_MIN(sz, n);
+       memcpy(buf, src, copy);
+       btr_add_output_pool(btrp, copy, btrn);
+       if (copy == n)
+               return;
+       sz = btr_pool_get_buffer(btrp, &buf);
+       assert(sz >= n - copy);
+       memcpy(buf, src + copy, n - copy);
+       btr_add_output_pool(btrp, n - copy, btrn);
+}
+
 static void btr_pushdown_br(struct btr_buffer_reference *br, struct btr_node *btrn)
 {
        add_btrb_to_children(br->btrb, btrn, br->consumed);
@@ -187,7 +357,8 @@ static inline size_t br_available_bytes(struct btr_buffer_reference *br)
 
 size_t btr_get_buffer_by_reference(struct btr_buffer_reference *br, char **buf)
 {
-       *buf = br->btrb->buf + br->consumed;
+       if (buf)
+               *buf = br->btrb->buf + br->consumed;
        return br_available_bytes(br);
 }
 
@@ -197,25 +368,87 @@ size_t btr_get_buffer_by_reference(struct btr_buffer_reference *br, char **buf)
 size_t btr_next_buffer(struct btr_node *btrn, char **bufp)
 {
        struct btr_buffer_reference *br;
+       char *buf, *result = NULL;
+       size_t sz, rv = 0;
 
-       if (list_empty(&btrn->input_queue)) {
-               *bufp = NULL;
-               return 0;
+       FOR_EACH_BUFFER_REF(br, btrn) {
+               sz = btr_get_buffer_by_reference(br, &buf);
+               if (!result) {
+                       result = buf;
+                       rv = sz;
+                       if (!br->btrb->pool)
+                               break;
+                       continue;
+               }
+               if (!br->btrb->pool)
+                       break;
+               if (result + rv != buf) {
+                       PARA_DEBUG_LOG("%s: pool merge impossible: %p != %p\n",
+                               btrn->name, result + rv, buf);
+                       break;
+               }
+//             PARA_CRIT_LOG("%s: inplace merge (%zu, %zu)->%zu\n", btrn->name,
+//                     rv, sz, rv + sz);
+//             PARA_CRIT_LOG("%s: inplace merge %p (%zu)\n", btrn->name,
+//                     result, sz);
+               rv += sz;
        }
-       br = list_first_entry(&btrn->input_queue, struct btr_buffer_reference, node);
-       return btr_get_buffer_by_reference(br, bufp);
+       if (bufp)
+               *bufp = result;
+       return rv;
 }
 
 void btr_consume(struct btr_node *btrn, size_t numbytes)
 {
-       struct btr_buffer_reference *br;
+       struct btr_buffer_reference *br, *tmp;
+       size_t sz;
 
-       assert(!list_empty(&btrn->input_queue));
-       br = list_first_entry(&btrn->input_queue, struct btr_buffer_reference, node);
-       assert(br->consumed + numbytes <= br->btrb->size);
-       br->consumed += numbytes;
-       if (br->consumed == br->btrb->size)
-               btr_drop_buffer_reference(br);
+       if (numbytes == 0)
+               return;
+       br = get_first_input_br(btrn);
+       assert(br);
+
+       //PARA_CRIT_LOG("wrap count: %zu\n", br->wrap_count);
+       if (br->wrap_count == 0) {
+               /*
+                * No wrap buffer. Drop buffer references whose buffer
+                * has been fully used. */
+               FOR_EACH_BUFFER_REF_SAFE(br, tmp, btrn) {
+                       if (br->consumed + numbytes <= br->btrb->size) {
+                               br->consumed += numbytes;
+                               if (br->consumed == br->btrb->size)
+                                       btr_drop_buffer_reference(br);
+                               return;
+                       }
+                       numbytes -= br->btrb->size - br->consumed;
+                       btr_drop_buffer_reference(br);
+               }
+               assert(true);
+       }
+       /*
+
+       We have a wrap buffer, consume from it. If in total,
+       i.e. including previous calls to brt_consume(), less than
+       wrap_count has been consumed, there's nothing more we can do.
+
+       Otherwise we drop the wrap buffer and consume from subsequent
+       buffers of the input queue the correct amount of bytes. This
+       is the total number of bytes that have been consumed from the
+       wrap buffer.
+*/
+       PARA_DEBUG_LOG("consuming %zu/%zu bytes from wrap buffer\n", numbytes,
+               br_available_bytes(br));
+
+       assert(numbytes <= br_available_bytes(br));
+       if (br->consumed + numbytes < br->wrap_count) {
+               br->consumed += numbytes;
+               return;
+       }
+       PARA_DEBUG_LOG("dropping wrap buffer (%zu bytes)\n", br->btrb->size);
+       /* get rid of the wrap buffer */
+       sz = br->consumed + numbytes;
+       btr_drop_buffer_reference(br);
+       return btr_consume(btrn, sz);
 }
 
 static void flush_input_queue(struct btr_node *btrn)
@@ -333,6 +566,98 @@ void *btr_context(struct btr_node *btrn)
        return btrn->context;
 }
 
+static bool need_buffer_pool_merge(struct btr_node *btrn)
+{
+       struct btr_buffer_reference *br = get_first_input_br(btrn);
+
+       if (!br)
+               return false;
+       if (br->wrap_count != 0)
+               return true;
+       if (br->btrb->pool)
+               return true;
+       return false;
+}
+
+static void merge_input_pool(struct btr_node *btrn, size_t dest_size)
+{
+       struct btr_buffer_reference *br, *wbr;
+       int num_refs; /* including wrap buffer */
+       char *buf, *buf1, *buf2 = NULL;
+       size_t sz, sz1, sz2 = 0, wsz;
+
+       if (list_empty(&btrn->input_queue))
+               return;
+
+       num_refs = 0;
+       FOR_EACH_BUFFER_REF(br, btrn) {
+               num_refs++;
+               sz = btr_get_buffer_by_reference(br, &buf);
+               if (br->wrap_count != 0) {
+                       assert(!wbr);
+                       assert(num_refs == 1);
+                       wbr = br;
+                       if (sz >= dest_size)
+                               return;
+                       continue;
+               }
+               if (!buf1) {
+                       buf1 = buf;
+                       sz1 = sz;
+                       goto next;
+               }
+               if (buf1 + sz1 == buf) {
+                       sz1 += sz;
+                       goto next;
+               }
+               if (!buf2) {
+                       buf2 = buf;
+                       sz2 = sz;
+                       goto next;
+               }
+               assert(buf2 + sz2 == buf);
+               sz2 += sz;
+next:
+               if (sz1 + sz2 >= dest_size)
+                       break;
+       }
+       if (!wbr) {
+               assert(buf1);
+               if (!buf2) /* nothing to do */
+                       return;
+               /* make a new wrap buffer combining buf1 and buf 2. */
+               sz = sz1 + sz2;
+               buf = para_malloc(sz);
+               PARA_DEBUG_LOG("merging input buffers: (%p:%zu, %p:%zu) -> %p:%zu\n",
+                       buf1, sz1, buf2, sz2, buf, sz);
+               memcpy(buf, buf1, sz1);
+               memcpy(buf + sz1, buf2, sz2);
+               br = para_calloc(sizeof(*br));
+               br->btrb = new_btrb(buf, sz);
+               br->btrb->refcount = 1;
+               br->consumed = 0;
+               /* This is a wrap buffer */
+               br->wrap_count = sz1;
+               para_list_add(&br->node, &btrn->input_queue);
+               return;
+       }
+       /*
+        * We already have a wrap buffer, but it is too small. It might be
+        * partially used.
+        */
+       wsz = br_available_bytes(wbr);
+       if (wbr->wrap_count == sz1 && wbr->btrb->size >= sz1 + sz2) /* nothing we can do about it */
+               return;
+       assert(buf1 && buf2);
+       sz = sz1 + sz2 - wbr->btrb->size; /* amount of new data */
+       wbr->btrb->size += sz;
+       PARA_DEBUG_LOG("increasing wrap buffer to %zu\n", wbr->btrb->size);
+       wbr->btrb->buf = para_realloc(wbr->btrb->buf, wbr->btrb->size);
+       /* copy the new data to the end of the reallocated buffer */
+       assert(sz2 >= sz);
+       memcpy(wbr->btrb->buf + wbr->btrb->size - sz, buf2 + sz2 - sz, sz);
+}
+
 /**
  * Merge the first two input buffers into one.
  *
@@ -362,17 +687,15 @@ static int merge_input(struct btr_node *btrn)
        }
        /* make a new btrb that combines the two buffers and a br to it. */
        sz = szs[0] + szs[1];
-       PARA_DEBUG_LOG("merging input buffers: (%zu, %zu) -> %zu\n",
-               szs[0], szs[1], sz);
        buf = para_malloc(sz);
-       /* TODO: Avoid this memcopy by introducing btr buffer pool. */
+       PARA_DEBUG_LOG("memory merging input buffers: (%zu, %zu) -> %zu\n",
+               szs[0], szs[1], sz);
        memcpy(buf, bufs[0], szs[0]);
        memcpy(buf + szs[0], bufs[1], szs[1]);
 
-       br = para_malloc(sizeof(*br));
+       br = para_calloc(sizeof(*br));
        br->btrb = new_btrb(buf, sz);
        br->btrb->refcount = 1;
-       br->consumed = 0;
 
        /* replace the first two refs by the new one */
        btr_drop_buffer_reference(brs[0]);
@@ -383,6 +706,8 @@ static int merge_input(struct btr_node *btrn)
 
 void btr_merge(struct btr_node *btrn, size_t dest_size)
 {
+       if (need_buffer_pool_merge(btrn))
+               return merge_input_pool(btrn, dest_size);
        for (;;) {
                char *buf;
                size_t len = btr_next_buffer(btrn, &buf);