Implement buffer tree pool support.
author Andre Noll <maan@systemlinux.org>
Sat, 9 Jan 2010 01:28:15 +0000 (02:28 +0100)
committer Andre Noll <maan@systemlinux.org>
Sat, 9 Jan 2010 01:28:15 +0000 (02:28 +0100)
Using a buffer pool avoids quite a few calls to memcpy() and memmove().
This patch also switches the http receiver over to buffer pools.

buffer_tree.c
buffer_tree.h
http_recv.c

index 1be2037..dbdcb58 100644 (file)
@@ -8,12 +8,24 @@
 #include "error.h"
 #include "sched.h"
 
+struct btr_pool { /* A fixed-size memory area managed through a read head and a write head. */
+       char *area_start; /* First byte of the area allocated by btr_pool_new(). */
+       char *area_end; /* One past the last byte of the area. */
+       char *rhead; /* Read head: btr_pool_deallocate() releases bytes starting here. */
+       char *whead; /* Write head: btr_pool_allocate() marks bytes as used starting here; NULL when the area is full. */
+};
+
+enum btr_buffer_flags { /* NOTE(review): no code visible in this patch references these flags -- confirm they are still needed. */
+       /* changes the way the buffer is deallocated */
+       BTR_BF_BTR_POOL = 1,
+};
 
 struct btr_buffer {
        char *buf; /* Start of the data area. */
        size_t size; /* Number of bytes in buf. */
        /** The number of references to this buffer. */
        int refcount;
+       struct btr_pool *pool; /* Pool that buf belongs to; NULL if buf is released with free(). */
 };
 
 struct btr_buffer_reference {
@@ -21,6 +33,7 @@ struct btr_buffer_reference {
        size_t consumed;
        /* Each buffer reference belongs to the buffer queue list of some buffer tree node. */
        struct list_head node;
+       size_t wrap_count;
 };
 
 struct btr_node {
@@ -42,6 +55,104 @@ struct btr_node {
        void *context;
 };
 
+/*
+ * Create a new buffer pool.
+ *
+ * Allocates the pool structure and an area of area_size bytes. Both the read
+ * head and the write head start at the beginning of the area, so the pool is
+ * initially empty. The returned pool is released with btr_pool_free().
+ */
+struct btr_pool *btr_pool_new(size_t area_size)
+{
+       struct btr_pool *pool = para_malloc(sizeof(*pool));
+       char *area = para_malloc(area_size);
+
+       pool->area_start = area;
+       pool->area_end = area + area_size;
+       pool->rhead = area;
+       pool->whead = area;
+       return pool;
+}
+
+/* whead = NULL means area full */
+
+/* Release the pool area and the pool structure. Passing NULL is a no-op. */
+void btr_pool_free(struct btr_pool *btrp)
+{
+       if (btrp) {
+               free(btrp->area_start);
+               free(btrp);
+       }
+}
+
+/* Return the total size of the pool area in bytes. */
+size_t btr_pool_size(struct btr_pool *btrp)
+{
+       size_t area_size = btrp->area_end - btrp->area_start;
+
+       return area_size;
+}
+
+/* Return the number of bytes of the pool area which are currently in use. */
+size_t btr_pool_filled(struct btr_pool *btrp)
+{
+       size_t total = btr_pool_size(btrp);
+
+       if (btrp->whead == NULL) /* area completely full */
+               return total;
+       if (btrp->whead >= btrp->rhead) /* used region is contiguous */
+               return btrp->whead - btrp->rhead;
+       /* used region wraps around the end of the area */
+       return total - (btrp->rhead - btrp->whead);
+}
+
+/* Return the number of bytes of the pool area which are not in use. */
+size_t btr_pool_unused(struct btr_pool *btrp)
+{
+       size_t total = btr_pool_size(btrp);
+       size_t used = btr_pool_filled(btrp);
+
+       return total - used;
+}
+
+/*
+ * Return the number of bytes that can be allocated contiguously at the write
+ * head. This is bounded by the read head if it lies behind the write head,
+ * and by the end of the area otherwise. A full pool yields zero.
+ */
+size_t btr_pool_available(struct btr_pool *btrp)
+{
+       char *limit;
+
+       if (btrp->whead == NULL)
+               return 0;
+       limit = btrp->rhead > btrp->whead? btrp->rhead : btrp->area_end;
+       return limit - btrp->whead;
+}
+
+/*
+ * Tell the caller where it may write and how much.
+ *
+ * If result is not NULL, the current write head is stored there (NULL for a
+ * full pool). The return value is the number of contiguous bytes available
+ * at that position. Nothing is reserved, see btr_pool_allocate().
+ */
+size_t btr_pool_get_buffer(struct btr_pool *btrp, char **result)
+{
+       size_t avail = btr_pool_available(btrp);
+
+       if (result != NULL)
+               *result = btrp->whead;
+       return avail;
+}
+
+/*
+ * Mark size bytes at the write head as used and advance the write head.
+ *
+ * The size must not exceed btr_pool_available(). The write head wraps to the
+ * start of the area when it reaches the end, and becomes NULL when it catches
+ * up with the read head, i.e. when the pool is full.
+ */
+void btr_pool_allocate(struct btr_pool *btrp, size_t size)
+{
+       char *new_whead;
+
+       if (!size)
+               return;
+       assert(btr_pool_available(btrp) >= size);
+       new_whead = btrp->whead + size;
+       assert(new_whead <= btrp->area_end);
+
+       if (new_whead == btrp->area_end) {
+               PARA_DEBUG_LOG("end of pool area reached: %p\n", new_whead);
+               new_whead = btrp->area_start;
+       }
+       if (new_whead == btrp->rhead) {
+               PARA_DEBUG_LOG("btrp buffer full\n");
+               new_whead = NULL; /* buffer full */
+       }
+       btrp->whead = new_whead;
+}
+
+/*
+ * Release size bytes at the read head and advance the read head.
+ *
+ * The released range must not extend beyond the end of the area and must not
+ * exceed the number of bytes in use. If the pool becomes empty, both heads
+ * are moved back to the start of the area.
+ */
+static void btr_pool_deallocate(struct btr_pool *btrp, size_t size)
+{
+       char *new_rhead;
+
+       if (!size)
+               return;
+       new_rhead = btrp->rhead + size;
+       assert(new_rhead <= btrp->area_end);
+       assert(btr_pool_filled(btrp) >= size);
+       if (new_rhead == btrp->area_end)
+               new_rhead = btrp->area_start; /* wrap around */
+       if (btrp->whead == NULL) /* pool was full: writing resumes at the old read head */
+               btrp->whead = btrp->rhead;
+       btrp->rhead = new_rhead;
+       if (btrp->rhead == btrp->whead) { /* pool drained: rewind both heads */
+               btrp->rhead = btrp->area_start;
+               btrp->whead = btrp->area_start;
+       }
+}
+
 #define FOR_EACH_CHILD(_tn, _btrn) list_for_each_entry((_tn), \
        &((_btrn)->children), node)
 #define FOR_EACH_CHILD_SAFE(_tn, _tmp, _btrn) \
@@ -81,14 +192,29 @@ struct btr_node *btr_new_node(const char *name, struct btr_node *parent,
  */
 static struct btr_buffer *new_btrb(char *buf, size_t size)
 {
-       struct btr_buffer *btrb = para_malloc(sizeof(*btrb));
+       struct btr_buffer *btrb = para_calloc(sizeof(*btrb)); /* presumably zero-filled, so refcount starts at 0 and pool at NULL -- confirm para_calloc() semantics */
 
        btrb->buf = buf;
        btrb->size = size;
-       btrb->refcount = 0;
        return btrb;
 }
 
+/* Give the buffer memory back to its pool, or free() it if it has no pool. */
+static void dealloc_buffer(struct btr_buffer *btrb)
+{
+       if (!btrb->pool) {
+               free(btrb->buf);
+               return;
+       }
+       btr_pool_deallocate(btrb->pool, btrb->size);
+}
+
+/* Return the first buffer reference of the input queue, NULL if it is empty. */
+static struct btr_buffer_reference *get_first_input_br(struct btr_node *btrn)
+{
+       struct btr_buffer_reference *first = NULL;
+
+       if (!list_empty(&btrn->input_queue))
+               first = list_first_entry(&btrn->input_queue,
+                       struct btr_buffer_reference, node);
+       return first;
+}
+
 /*
  * Deallocate the reference, release the resources if refcount drops to zero.
  */
@@ -101,7 +227,7 @@ static void btr_drop_buffer_reference(struct btr_buffer_reference *br)
        free(br);
        btrb->refcount--;
        if (btrb->refcount == 0) {
-               free(btrb->buf);
+               dealloc_buffer(btrb);
                free(btrb);
        }
 }
@@ -114,7 +240,7 @@ static void add_btrb_to_children(struct btr_buffer *btrb,
        if (btrn->start.tv_sec == 0)
                btrn->start = *now;
        FOR_EACH_CHILD(ch, btrn) {
-               struct btr_buffer_reference *br = para_malloc(sizeof(*br));
+               struct btr_buffer_reference *br = para_calloc(sizeof(*br));
                br->btrb = btrb;
                br->consumed = consumed;
                list_add_tail(&br->node, &ch->input_queue);
@@ -137,6 +263,21 @@ void btr_add_output(char *buf, size_t size, struct btr_node *btrn)
        add_btrb_to_children(btrb, btrn, 0);
 }
 
+/*
+ * Feed a buffer that lives in a buffer pool to the children of a node.
+ *
+ * The size must be non-zero. If btrn has no children, the size bytes are
+ * deallocated from the pool immediately. Otherwise a new btr buffer which
+ * refers to the pool is created and added to the children of btrn.
+ */
+void btr_add_output_pool(struct btr_pool *btrp, char *buf, size_t size,
+       struct btr_node *btrn)
+{
+       struct btr_buffer *btrb;
+
+       assert(size != 0);
+       if (!list_empty(&btrn->children)) {
+               btrb = new_btrb(buf, size);
+               btrb->pool = btrp;
+               add_btrb_to_children(btrb, btrn, 0);
+               return;
+       }
+       /* nobody to feed: give the space back to the pool right away */
+       btr_pool_deallocate(btrp, size);
+}
+
 static void btr_pushdown_br(struct btr_buffer_reference *br, struct btr_node *btrn)
 {
        add_btrb_to_children(br->btrb, btrn, br->consumed);
@@ -187,7 +328,8 @@ static inline size_t br_available_bytes(struct btr_buffer_reference *br)
 
 size_t btr_get_buffer_by_reference(struct btr_buffer_reference *br, char **buf)
 {
-       *buf = br->btrb->buf + br->consumed;
+       if (buf) /* a NULL buf means the caller only wants the byte count */
+               *buf = br->btrb->buf + br->consumed; /* first byte not yet consumed */
        return br_available_bytes(br);
 }
 
@@ -197,25 +339,87 @@ size_t btr_get_buffer_by_reference(struct btr_buffer_reference *br, char **buf)
 size_t btr_next_buffer(struct btr_node *btrn, char **bufp)
 {
        struct btr_buffer_reference *br;
+       char *buf, *result = NULL; /* result: start of the area that is returned */
+       size_t sz, rv = 0; /* rv: size of the area that is returned */
 
-       if (list_empty(&btrn->input_queue)) {
-               *bufp = NULL;
-               return 0;
+       FOR_EACH_BUFFER_REF(br, btrn) { /* extend the area as long as the next pool buffer starts where the area ends */
+               sz = btr_get_buffer_by_reference(br, &buf);
+               if (!result) { /* first reference of the queue */
+                       result = buf;
+                       rv = sz;
+                       if (!br->btrb->pool) /* only pool buffers are merged in place */
+                               break;
+                       continue;
+               }
+               if (!br->btrb->pool)
+                       break;
+               if (result + rv != buf) { /* not adjacent to the area collected so far */
+                       PARA_DEBUG_LOG("%s: pool merge impossible: %p != %p\n",
+                               btrn->name, result + rv, buf);
+                       break;
+               }
+//             PARA_CRIT_LOG("%s: inplace merge (%zu, %zu)->%zu\n", btrn->name,
+//                     rv, sz, rv + sz);
+//             PARA_CRIT_LOG("%s: inplace merge %p (%zu)\n", btrn->name,
+//                     result, sz);
+               rv += sz;
        }
-       br = list_first_entry(&btrn->input_queue, struct btr_buffer_reference, node);
-       return btr_get_buffer_by_reference(br, bufp);
+       if (bufp) /* callers may pass NULL to query the size only */
+               *bufp = result; /* stays NULL if the input queue is empty */
+       return rv;
 }
 
 void btr_consume(struct btr_node *btrn, size_t numbytes)
 {
-       struct btr_buffer_reference *br;
+       struct btr_buffer_reference *br, *tmp;
+       size_t sz;
 
-       assert(!list_empty(&btrn->input_queue));
-       br = list_first_entry(&btrn->input_queue, struct btr_buffer_reference, node);
-       assert(br->consumed + numbytes <= br->btrb->size);
-       br->consumed += numbytes;
-       if (br->consumed == br->btrb->size)
-               btr_drop_buffer_reference(br);
+       if (numbytes == 0)
+               return;
+       br = get_first_input_br(btrn);
+       assert(br);
+
+       //PARA_CRIT_LOG("wrap count: %zu\n", br->wrap_count);
+       if (br->wrap_count == 0) {
+               /*
+                * No wrap buffer. Drop buffer references whose buffer
+                * has been fully used. */
+               FOR_EACH_BUFFER_REF_SAFE(br, tmp, btrn) {
+                       if (br->consumed + numbytes <= br->btrb->size) {
+                               br->consumed += numbytes;
+                               if (br->consumed == br->btrb->size)
+                                       btr_drop_buffer_reference(br);
+                               return;
+                       }
+                       numbytes -= br->btrb->size - br->consumed;
+                       btr_drop_buffer_reference(br);
+               }
+               assert(true);
+       }
+       /*
+
+       We have a wrap buffer, consume from it. If in total,
+       i.e. including previous calls to brt_consume(), less than
+       wrap_count has been consumed, there's nothing more we can do.
+
+       Otherwise we drop the wrap buffer and consume from subsequent
+       buffers of the input queue the correct amount of bytes. This
+       is the total number of bytes that have been consumed from the
+       wrap buffer.
+*/
+       PARA_DEBUG_LOG("consuming %zu/%zu bytes from wrap buffer\n", numbytes,
+               br_available_bytes(br));
+
+       assert(numbytes <= br_available_bytes(br));
+       if (br->consumed + numbytes < br->wrap_count) {
+               br->consumed += numbytes;
+               return;
+       }
+       PARA_DEBUG_LOG("dropping wrap buffer (%zu bytes)\n", br->btrb->size);
+       /* get rid of the wrap buffer */
+       sz = br->consumed + numbytes;
+       btr_drop_buffer_reference(br);
+       return btr_consume(btrn, sz);
 }
 
 static void flush_input_queue(struct btr_node *btrn)
@@ -333,6 +537,98 @@ void *btr_context(struct btr_node *btrn)
        return btrn->context;
 }
 
+/*
+ * Decide whether btr_merge() has to take the buffer pool code path. This is
+ * the case if the first buffer reference of the input queue is a wrap buffer
+ * or refers to a buffer which belongs to a pool.
+ */
+static bool need_buffer_pool_merge(struct btr_node *btrn)
+{
+       struct btr_buffer_reference *first = get_first_input_br(btrn);
+
+       if (first == NULL)
+               return false;
+       return first->wrap_count != 0 || first->btrb->pool != NULL;
+}
+
+/*
+ * Combine pool buffers of the input queue which are not adjacent in memory.
+ *
+ * The input queue is scanned until at least dest_size bytes have been seen.
+ * Buffers which are adjacent in memory are accumulated into at most two
+ * areas (buf1, sz1) and (buf2, sz2). The second area is only non-empty if the
+ * data is split, e.g. because the pool wrapped around.
+ *
+ * If the data is split and the queue has no wrap buffer yet, a malloc'ed
+ * wrap buffer containing a copy of both areas is inserted at the head of the
+ * queue. Its wrap_count is the size of the first area. If a wrap buffer
+ * already exists but is too small, it is enlarged and the new data is
+ * appended to it.
+ *
+ * Nothing is done if the queue is empty, if the existing wrap buffer already
+ * offers dest_size bytes, or if the data is not split.
+ */
+static void merge_input_pool(struct btr_node *btrn, size_t dest_size)
+{
+       /*
+        * wbr, buf1 and sz1 are tested before the loop below necessarily
+        * assigns them, so they must start out as NULL/zero.
+        */
+       struct btr_buffer_reference *br, *wbr = NULL;
+       int num_refs; /* including wrap buffer */
+       char *buf, *buf1 = NULL, *buf2 = NULL;
+       size_t sz, sz1 = 0, sz2 = 0;
+
+       if (list_empty(&btrn->input_queue))
+               return;
+
+       num_refs = 0;
+       FOR_EACH_BUFFER_REF(br, btrn) {
+               num_refs++;
+               sz = btr_get_buffer_by_reference(br, &buf);
+               if (br->wrap_count != 0) {
+                       /* a wrap buffer can only be the first reference */
+                       assert(!wbr);
+                       assert(num_refs == 1);
+                       wbr = br;
+                       if (sz >= dest_size)
+                               return;
+                       continue;
+               }
+               if (!buf1) {
+                       buf1 = buf;
+                       sz1 = sz;
+                       goto next;
+               }
+               if (buf1 + sz1 == buf) {
+                       sz1 += sz;
+                       goto next;
+               }
+               if (!buf2) {
+                       buf2 = buf;
+                       sz2 = sz;
+                       goto next;
+               }
+               /* more than two non-adjacent areas are not expected */
+               assert(buf2 + sz2 == buf);
+               sz2 += sz;
+next:
+               if (sz1 + sz2 >= dest_size)
+                       break;
+       }
+       if (!wbr) {
+               assert(buf1);
+               if (!buf2) /* nothing to do */
+                       return;
+               /* make a new wrap buffer combining buf1 and buf2. */
+               sz = sz1 + sz2;
+               buf = para_malloc(sz);
+               PARA_DEBUG_LOG("merging input buffers: (%p:%zu, %p:%zu) -> %p:%zu\n",
+                       buf1, sz1, buf2, sz2, buf, sz);
+               memcpy(buf, buf1, sz1);
+               memcpy(buf + sz1, buf2, sz2);
+               br = para_calloc(sizeof(*br));
+               br->btrb = new_btrb(buf, sz);
+               br->btrb->refcount = 1;
+               br->consumed = 0;
+               /* This is a wrap buffer */
+               br->wrap_count = sz1;
+               para_list_add(&br->node, &btrn->input_queue);
+               return;
+       }
+       /*
+        * We already have a wrap buffer, but it is too small. It might be
+        * partially used.
+        */
+       if (wbr->wrap_count == sz1 && wbr->btrb->size >= sz1 + sz2) /* nothing we can do about it */
+               return;
+       assert(buf1 && buf2);
+       sz = sz1 + sz2 - wbr->btrb->size; /* amount of new data */
+       wbr->btrb->size += sz;
+       PARA_DEBUG_LOG("increasing wrap buffer to %zu\n", wbr->btrb->size);
+       wbr->btrb->buf = para_realloc(wbr->btrb->buf, wbr->btrb->size);
+       /* copy the new data to the end of the reallocated buffer */
+       assert(sz2 >= sz);
+       memcpy(wbr->btrb->buf + wbr->btrb->size - sz, buf2 + sz2 - sz, sz);
+}
+
 /**
  * Merge the first two input buffers into one.
  *
@@ -362,17 +658,15 @@ static int merge_input(struct btr_node *btrn)
        }
        /* make a new btrb that combines the two buffers and a br to it. */
        sz = szs[0] + szs[1];
-       PARA_DEBUG_LOG("merging input buffers: (%zu, %zu) -> %zu\n",
-               szs[0], szs[1], sz);
        buf = para_malloc(sz);
-       /* TODO: Avoid this memcopy by introducing btr buffer pool. */
+       PARA_DEBUG_LOG("memory merging input buffers: (%zu, %zu) -> %zu\n",
+               szs[0], szs[1], sz);
        memcpy(buf, bufs[0], szs[0]);
        memcpy(buf + szs[0], bufs[1], szs[1]);
 
-       br = para_malloc(sizeof(*br));
+       br = para_calloc(sizeof(*br));
        br->btrb = new_btrb(buf, sz);
        br->btrb->refcount = 1;
-       br->consumed = 0;
 
        /* replace the first two refs by the new one */
        btr_drop_buffer_reference(brs[0]);
@@ -383,6 +677,8 @@ static int merge_input(struct btr_node *btrn)
 
 void btr_merge(struct btr_node *btrn, size_t dest_size)
 {
+       if (need_buffer_pool_merge(btrn))
+               return merge_input_pool(btrn, dest_size);
        for (;;) {
                char *buf;
                size_t len = btr_next_buffer(btrn, &buf);
index 9de9926..53b24fe 100644 (file)
@@ -1,5 +1,6 @@
 
 struct btr_node;
+struct btr_pool;
 
 typedef int (*btr_command_handler)(struct btr_node *btrn,
                const char *command, char **result);
@@ -10,6 +11,13 @@ enum btr_node_type {
        BTR_NT_LEAF,
 };
 
+struct btr_pool *btr_pool_new(size_t area_size); /* Create a pool whose area is area_size bytes large. */
+void btr_pool_free(struct btr_pool *btrp); /* Free the pool and its area; NULL is ignored. */
+size_t btr_pool_get_buffer(struct btr_pool *btrp, char **result); /* Store the write position in *result (if non-NULL) and return the number of contiguous bytes available there. */
+void btr_pool_allocate(struct btr_pool *btrp, size_t size); /* Mark size bytes at the write position as used. */
+void btr_add_output_pool(struct btr_pool *btrp, char *buf, size_t size,
+       struct btr_node *btrn); /* Add the pool buffer buf to the children of btrn; without children the bytes are deallocated from the pool at once. */
+
 struct btr_node *btr_new_node(const char *name, struct btr_node *parent,
                btr_command_handler handler, void *context);
 void btr_remove_node(struct btr_node *btrn);
index ed362bf..b81ba80 100644 (file)
 enum http_recv_status {HTTP_CONNECTED, HTTP_SENT_GET_REQUEST, HTTP_STREAMING};
 
 /**
- * data specific to the http receiver
+ * Data specific to the http receiver.
  *
  * Each running instance of the http receiver reserves space for one such struct.
  */
 struct private_http_recv_data {
-/**
- *
- *
- * the current status of the http receiver node
- *
- * It gets initialized to \p HTTP_CONNECTED by the open function of the
- * http receiver.
- *
- * \sa receiver::open, receiver_node.
- */
+       /**
+        * The current status of the http receiver node.
+        *
+        * It gets initialized to \p HTTP_CONNECTED by the open function of the
+        * http receiver.
+        *
+        * \sa receiver::open, receiver_node.
+        */
        enum http_recv_status status;
-/**
- *
- *
- * the file descriptor used for receiving the http stream
- *
- * The pre_select function of the http receiver adds this file descriptor to
- * the set of file decriptors which are checked for reading/writing (depending
- * on the current status) by the select loop of the application (para_audiod or
- * para_recv).
- *
- * The post_select function of the http receiver uses \a fd, if ready, to
- * establish the http connection, and updates \a status according to the new
- * state of the connection.  As soon as \a status is \p HTTP_STREAMING, \a fd is
- * going to be only checked for reading. If data is available, it is read into
- * the output buffer of the receiver node by post_select.
- *
- * \sa receiver::pre_select receiver::post_select receiver_node, http_recv_status
- */
+       /**
+        * The file descriptor used for receiving the http stream.
+        *
+        * The pre_select function of the http receiver adds this file descriptor to
+        * the set of file descriptors which are checked for reading/writing (depending
+        * on the current status) by the select loop of the application (para_audiod or
+        * para_recv).
+        *
+        * The post_select function of the http receiver uses \a fd, if ready, to
+        * establish the http connection, and updates \a status according to the new
+        * state of the connection.  As soon as \a status is \p HTTP_STREAMING, \a fd is
+        * going to be only checked for reading. If data is available, it is read into
+        * the output buffer of the receiver node by post_select.
+        *
+        * \sa receiver::pre_select receiver::post_select receiver_node, http_recv_status
+        */
        int fd;
+       struct btr_pool *btrp; /* Buffer pool that received data is read into; created in http_recv_open(), freed in http_recv_close(). */
 };
 
 static void http_shutdown(void)
@@ -103,8 +100,6 @@ static void http_recv_pre_select(struct sched *s, struct task *t)
                para_fd_set(phd->fd, &s->rfds, &s->max_fileno);
 }
 
-#define HTTP_RECV_READ_BUF_SIZE 16384
-
 static void http_recv_post_select(struct sched *s, struct task *t)
 {
        struct receiver_node *rn = container_of(t, struct receiver_node, task);
@@ -150,16 +145,22 @@ static void http_recv_post_select(struct sched *s, struct task *t)
        }
        if (btrn) {
                char *buf;
+               size_t sz;
 
-               buf = para_malloc(HTTP_RECV_READ_BUF_SIZE);
-               ret = recv_bin_buffer(phd->fd, buf, HTTP_RECV_READ_BUF_SIZE);
+               sz = btr_pool_get_buffer(phd->btrp, &buf);
+               //PARA_CRIT_LOG("max buffer %p: %zu\n", buf, sz);
+               ret = -E_HTTP_RECV_OVERRUN;
+               if (sz == 0)
+                       goto err;
+               //buf = para_malloc(HTTP_RECV_READ_BUF_SIZE);
+               //sz = HTTP_RECV_READ_BUF_SIZE;
+               ret = recv_bin_buffer(phd->fd, buf, sz);
                if (ret == 0)
                        ret = -E_RECV_EOF;
-               if (ret < 0) {
-                       free(buf);
+               if (ret < 0)
                        goto err;
-               }
-               btr_add_output(buf, ret, btrn);
+               btr_pool_allocate(phd->btrp, ret);
+               btr_add_output_pool(phd->btrp, buf, ret, btrn);
                return;
        }
        ret = -E_HTTP_RECV_OVERRUN;
@@ -184,6 +185,7 @@ static void http_recv_close(struct receiver_node *rn)
        struct private_http_recv_data *phd = rn->private_data;
 
        close(phd->fd);
+       btr_pool_free(phd->btrp);
        free(rn->buf);
        free(rn->private_data);
 }
@@ -217,6 +219,7 @@ static int http_recv_open(struct receiver_node *rn)
        rn->private_data = phd = para_calloc(sizeof(struct private_http_recv_data));
        phd->fd = fd;
        phd->status = HTTP_CONNECTED;
+       phd->btrp = btr_pool_new(320 * 1024);
        return 1;
 }