From 5f303cc5a96bfeaa66d2d6e899bf56d1d03ed085 Mon Sep 17 00:00:00 2001 From: Andre Noll Date: Sat, 9 Jan 2010 22:25:17 +0100 Subject: [PATCH] Add btr pool support to the fecdec filter. The consumers of the output of fecdec are typically the decoders which have requirements on the minimal amount of data available in one chunk. Using a buffer pool minimizes the number of memcpy() operations needed during btr_merge when the current buffer is too small. To fit the needs of fecdec, a new function btr_copy() is introduced which copies a data buffer into the btr pool area and generates btr buffers and references. btr_add_output_pool() had to be adjusted a bit and all callers have been changed to match. The patch also adds a new field "name" to the buffer_pool structure. This makes the log output more readable. --- buffer_tree.c | 43 ++++++++++++++++++++++++++++++++++++------- buffer_tree.h | 7 +++++-- fecdec_filter.c | 21 +++++++++++++++------ http_recv.c | 5 ++--- udp_recv.c | 7 +++---- 5 files changed, 61 insertions(+), 22 deletions(-) diff --git a/buffer_tree.c b/buffer_tree.c index dbdcb586..80c0425b 100644 --- a/buffer_tree.c +++ b/buffer_tree.c @@ -9,6 +9,7 @@ #include "sched.h" struct btr_pool { + char *name; char *area_start; char *area_end; char *rhead; @@ -55,14 +56,17 @@ struct btr_node { void *context; }; -struct btr_pool *btr_pool_new(size_t area_size) +struct btr_pool *btr_pool_new(const char *name, size_t area_size) { - struct btr_pool *btrp = para_malloc(sizeof(*btrp)); + struct btr_pool *btrp; + PARA_INFO_LOG("%s, %zu bytes\n", name, area_size); + btrp = para_malloc(sizeof(*btrp)); btrp->area_start = para_malloc(area_size); btrp->area_end = btrp->area_start + area_size; btrp->rhead = btrp->area_start; btrp->whead = btrp->area_start; + btrp->name = para_strdup(name); return btrp; } @@ -73,6 +77,7 @@ void btr_pool_free(struct btr_pool *btrp) if (!btrp) return; free(btrp->area_start); + free(btrp->name); free(btrp); } @@ -263,21 +268,45 @@ void btr_add_output(char *buf, size_t size, struct btr_node *btrn) add_btrb_to_children(btrb, btrn, 0); } -void btr_add_output_pool(struct btr_pool *btrp, char *buf, size_t size, - struct btr_node *btrn) +void btr_add_output_pool(struct btr_pool *btrp, size_t size, + struct btr_node *btrn) { struct btr_buffer *btrb; + char *buf; + size_t avail; assert(size != 0); - if (list_empty(&btrn->children)) { - btr_pool_deallocate(btrp, size); + if (list_empty(&btrn->children)) return; - } + avail = btr_pool_get_buffer(btrp, &buf); + assert(avail >= size); + btr_pool_allocate(btrp, size); btrb = new_btrb(buf, size); btrb->pool = btrp; add_btrb_to_children(btrb, btrn, 0); } +void btr_copy(const void *src, size_t n, struct btr_pool *btrp, + struct btr_node *btrn) +{ + char *buf; + size_t sz, copy; + + if (n == 0) + return; + assert(n <= btr_pool_unused(btrp)); + sz = btr_pool_get_buffer(btrp, &buf); + copy = PARA_MIN(sz, n); + memcpy(buf, src, copy); + btr_add_output_pool(btrp, copy, btrn); + if (copy == n) + return; + sz = btr_pool_get_buffer(btrp, &buf); + assert(sz >= n - copy); + memcpy(buf, src + copy, n - copy); + btr_add_output_pool(btrp, n - copy, btrn); +} + static void btr_pushdown_br(struct btr_buffer_reference *br, struct btr_node *btrn) { add_btrb_to_children(br->btrb, btrn, br->consumed); diff --git a/buffer_tree.h b/buffer_tree.h index 53b24fe3..b9cfc1e4 100644 --- a/buffer_tree.h +++ b/buffer_tree.h @@ -11,11 +11,14 @@ enum btr_node_type { BTR_NT_LEAF, }; -struct btr_pool *btr_pool_new(size_t area_size); +struct btr_pool *btr_pool_new(const char *name, size_t area_size); void btr_pool_free(struct btr_pool *btrp); size_t btr_pool_get_buffer(struct btr_pool *btrp, char **result); void btr_pool_allocate(struct btr_pool *btrp, size_t size); -void btr_add_output_pool(struct btr_pool *btrp, char *buf, size_t size, +void btr_add_output_pool(struct btr_pool *btrp, size_t size, + struct btr_node *btrn); +size_t btr_pool_unused(struct btr_pool *btrp); +void btr_copy(const void *src, size_t n, struct btr_pool *btrp, struct btr_node *btrn); struct btr_node *btr_new_node(const char *name, struct btr_node *parent, diff --git a/fecdec_filter.c b/fecdec_filter.c index 39e78163..25533b24 100644 --- a/fecdec_filter.c +++ b/fecdec_filter.c @@ -86,6 +86,7 @@ struct private_fecdec_data { int have_header; /** Points to the first received group. */ struct fecdec_group *first_complete_group; + struct btr_pool *btrp; }; /** Iterate over all fecdec groups. */ @@ -311,11 +312,14 @@ static int decode_group(struct fecdec_group *fg, struct filter_node *fn) PARA_DEBUG_LOG("writing group %d (%d/%d decoded data bytes)\n", fg->h.group_num, fg->h.group_bytes, fg->h.data_slices_per_group * sb); - need = fn->loaded + (fg->h.data_slices_per_group - i) * sb; if (fn->btrn) { - buf = para_malloc(need); + need = (fg->h.data_slices_per_group - i) * sb; + if (need > btr_pool_unused(pfd->btrp)) + return -E_FECDEC_OVERRUN; + btr_pool_get_buffer(pfd->btrp, &buf); p = buf; } else { + need = fn->loaded + (fg->h.data_slices_per_group - i) * sb; if (need > fn->bufsize) { fn->bufsize = PARA_MAX(fn->bufsize * 2, need); if (fn->bufsize > FECDEC_MAX_OUTBUF_SIZE) @@ -334,7 +338,10 @@ static int decode_group(struct fecdec_group *fg, struct filter_node *fn) break; if (sb + written > fg->h.audio_header_size) n = fg->h.audio_header_size - written; - memcpy(p + written, fg->data[i], n); + if (fn->btrn) + btr_copy(fg->data[i], n, pfd->btrp, fn->btrn); + else + memcpy(p + written, fg->data[i], n); fn->loaded += n; written += n; } @@ -345,13 +352,14 @@ static int decode_group(struct fecdec_group *fg, struct filter_node *fn) size_t n = sb; if (n + written > fg->h.group_bytes) n = fg->h.group_bytes - written; - memcpy(p + written, fg->data[i], n); + if (fn->btrn) + btr_copy(fg->data[i], n, pfd->btrp, fn->btrn); + else + memcpy(p + written, fg->data[i], n); fn->loaded += n; written += n; } p += written; - if (fn->btrn) - btr_add_output(buf, p - buf, fn->btrn); return 0; } @@ -427,6 +435,7 @@ static int dispatch_slice(char *buf, size_t len, struct fec_header *h, ret = fec_new(k, n, &pfd->fec); if (ret < 0) return ret; + pfd->btrp = btr_pool_new("fecdec", 20 * k * h->slice_bytes); /* decode and clear the first group */ ret = decode_group(pfd->first_complete_group, fn); if (ret < 0) diff --git a/http_recv.c b/http_recv.c index b81ba802..def5e620 100644 --- a/http_recv.c +++ b/http_recv.c @@ -159,8 +159,7 @@ static void http_recv_post_select(struct sched *s, struct task *t) ret = -E_RECV_EOF; if (ret < 0) goto err; - btr_pool_allocate(phd->btrp, ret); - btr_add_output_pool(phd->btrp, buf, ret, btrn); + btr_add_output_pool(phd->btrp, ret, btrn); return; } ret = -E_HTTP_RECV_OVERRUN; @@ -219,7 +218,7 @@ static int http_recv_open(struct receiver_node *rn) rn->private_data = phd = para_calloc(sizeof(struct private_http_recv_data)); phd->fd = fd; phd->status = HTTP_CONNECTED; - phd->btrp = btr_pool_new(320 * 1024); + phd->btrp = btr_pool_new("http_recv", 320 * 1024); return 1; } diff --git a/udp_recv.c b/udp_recv.c index 0c5a3abf..3d680814 100644 --- a/udp_recv.c +++ b/udp_recv.c @@ -130,13 +130,12 @@ static void udp_recv_post_select_btr(__a_unused struct sched *s, struct task *t) packet_size = ret; if (packet_size >= FEC_EOF_PACKET_LEN) { if (!memcmp(buf, FEC_EOF_PACKET, FEC_EOF_PACKET_LEN)) { - PARA_CRIT_LOG("%p: eof\n", rn); + PARA_INFO_LOG("received eof packet\n"); ret = -E_RECV_EOF; goto err; } } - btr_pool_allocate(purd->btrp, packet_size); - btr_add_output_pool(purd->btrp, buf, packet_size, btrn); + btr_add_output_pool(purd->btrp, packet_size, btrn); return; err: btr_remove_node(btrn); @@ -273,7 +272,7 @@ static int udp_recv_open(struct receiver_node *rn) } PARA_CRIT_LOG("rn %p: receiving from %s:%d, fd=%d\n", rn, c->host_arg, c->port_arg, purd->fd); - purd->btrp = btr_pool_new(320 * 1024); + purd->btrp = btr_pool_new("udp_recv", 320 * 1024); return purd->fd; err: free(rn->private_data); -- 2.39.2