X-Git-Url: http://git.tuebingen.mpg.de/?p=paraslash.git;a=blobdiff_plain;f=vss.c;h=101fdb8818b4fe8ca794bda2cd71a385b093df02;hp=023b2c1d3cc8fcc0dcf23f0b5d2fc5c43487012f;hb=6428bef6e3c172851dc97c3e126f438c7aaf0695;hpb=e2e1adca824585eeecd091f54955e51212927533 diff --git a/vss.c b/vss.c index 023b2c1d..101fdb88 100644 --- a/vss.c +++ b/vss.c @@ -148,14 +148,10 @@ struct fec_client { struct fec_group group; /** The current slice. */ uint8_t current_slice_num; - /** The data to be FEC-encoded (point to a region within the mapped audio file). */ - const unsigned char **src_data; + /** The data to be FEC-encoded. */ + unsigned char **src_data; /** Last time an audio header was sent. */ struct timeval next_header_time; - /** Used for the last source pointer of an audio file. */ - unsigned char *extra_src_buf; - /** Needed for the last slice of the audio file header. */ - unsigned char *extra_header_buf; /** Extra slices needed to store largest chunk + header. */ int num_extra_slices; /** Contains the FEC-encoded data. */ @@ -240,6 +236,21 @@ static bool need_data_slices(struct fec_client *fc, struct vss_task *vsst) return false; } +static int fc_num_data_slices(const struct fec_client *fc) +{ + return fc->fcp->data_slices_per_group + fc->num_extra_slices; +} + +static int fc_num_slices(const struct fec_client *fc) +{ + return fc->fcp->slices_per_group + fc->num_extra_slices; +} + +static int fc_num_redundant_slices(const struct fec_client *fc) +{ + return fc->fcp->slices_per_group - fc->fcp->data_slices_per_group; +} + static int num_slices(long unsigned bytes, int max_payload, int rs) { int ret; @@ -270,7 +281,7 @@ static void set_group_timing(struct fec_client *fc, struct vss_task *vsst) static int initialize_fec_client(struct fec_client *fc, struct vss_task *vsst) { - int k, n, ret; + int i, k, n, ret; int hs, ds, rs; /* header/data/redundant slices */ struct fec_client_parms *fcp = fc->fcp; @@ -290,7 +301,17 @@ static int initialize_fec_client(struct fec_client *fc, struct vss_task *vsst) if (fc->mps <= FEC_HEADER_SIZE) return -ERRNO_TO_PARA_ERROR(EINVAL); - rs = fc->fcp->slices_per_group - fc->fcp->data_slices_per_group; + /* free previous buffers, if any */ + if (fc->src_data) { + k = fc_num_data_slices(fc); + for (i = 0; i < k; i++) + free(fc->src_data[i]); + free(fc->src_data); + fc->src_data = NULL; + } + free(fc->enc_buf); + + rs = fc_num_redundant_slices(fc); ret = num_slices(vsst->header_len, fc->mps - FEC_HEADER_SIZE, rs); if (ret < 0) return ret; @@ -306,17 +327,18 @@ static int initialize_fec_client(struct fec_client *fc, struct vss_task *vsst) if (k < fc->fcp->data_slices_per_group) k = fc->fcp->data_slices_per_group; fc->num_extra_slices = k - fc->fcp->data_slices_per_group; - n = k + rs; + n = fc_num_slices(fc); + PARA_INFO_LOG("mps: %d, k: %d, n: %d, extra slices: %d\n", + fc->mps, k, n, fc->num_extra_slices); + fec_free(fc->parms); ret = fec_new(k, n, &fc->parms); if (ret < 0) return ret; - PARA_INFO_LOG("mps: %d, k: %d, n: %d, extra slices: %d\n", - fc->mps, k, n, fc->num_extra_slices); - fc->src_data = para_realloc(fc->src_data, k * sizeof(char *)); - fc->enc_buf = para_realloc(fc->enc_buf, fc->mps); - fc->extra_src_buf = para_realloc(fc->extra_src_buf, fc->mps); - fc->extra_header_buf = para_realloc(fc->extra_header_buf, fc->mps); + fc->src_data = para_malloc(k * sizeof(char *)); + for (i = 0; i < k; i++) + fc->src_data[i] = para_malloc(fc->mps); + fc->enc_buf = para_malloc(fc->mps); fc->state = FEC_STATE_READY_TO_RUN; fc->next_header_time.tv_sec = 0; @@ -394,7 +416,10 @@ static int compute_group_size(struct vss_task *vsst, struct fec_group *g, g->bytes += len; g->num_chunks++; } - assert(g->num_chunks); + if (g->num_chunks == 0) + return -E_EOF; + PARA_DEBUG_LOG("group #%u: %u chunks, %u bytes total\n", g->num, + g->num_chunks, g->bytes); return 1; } @@ -448,8 +473,8 @@ static int compute_group_size(struct vss_task *vsst, struct fec_group *g, static int compute_slice_size(struct fec_client *fc, struct vss_task *vsst) { struct fec_group *g = &fc->group; - int k = fc->fcp->data_slices_per_group + fc->num_extra_slices; - int n = fc->fcp->slices_per_group + fc->num_extra_slices; + int k = fc_num_data_slices(fc); + int n = fc_num_slices(fc); int ret, k1, k2, h, d, min, max, sum; int max_slice_bytes = fc->mps - FEC_HEADER_SIZE; int max_group_bytes; @@ -512,9 +537,8 @@ static int compute_slice_size(struct fec_client *fc, struct vss_task *vsst) static int setup_next_fec_group(struct fec_client *fc, struct vss_task *vsst) { - int ret, i, k, n, data_slices; - size_t len; - char *buf, *p; + int ret, i, c; + size_t copy, src_copied, slice_copied; struct fec_group *g = &fc->group; if (fc->state == FEC_STATE_NONE) { @@ -538,86 +562,63 @@ static int setup_next_fec_group(struct fec_client *fc, struct vss_task *vsst) g->first_chunk += g->num_chunks; g->num++; } - k = fc->fcp->data_slices_per_group + fc->num_extra_slices; - n = fc->fcp->slices_per_group + fc->num_extra_slices; - compute_slice_size(fc, vsst); assert(g->slice_bytes > 0); - ret = num_slices(g->bytes, g->slice_bytes, n - k); - if (ret < 0) - return ret; - data_slices = ret; - assert(g->num_header_slices + data_slices <= k); fc->current_slice_num = 0; if (g->num == 0) set_group_timing(fc, vsst); /* setup header slices */ - buf = vsst->header_buf; - for (i = 0; i < g->num_header_slices; i++) { - uint32_t payload_size; - if (buf + g->slice_bytes <= vsst->header_buf + vsst->header_len) { - fc->src_data[i] = (const unsigned char *)buf; - buf += g->slice_bytes; - continue; - } - /* - * Can not use vss->header_buf for this slice as it - * goes beyond the buffer. This slice will not be fully - * used. - */ - payload_size = vsst->header_buf + vsst->header_len - buf; - memcpy(fc->extra_header_buf, buf, payload_size); - if (payload_size < g->slice_bytes) - memset(fc->extra_header_buf + payload_size, 0, - g->slice_bytes - payload_size); - /* - * There might be more than one header slice to fill although - * only the first one will be used. Set all header slices to - * our extra buffer. - */ - while (i < g->num_header_slices) - fc->src_data[i++] = fc->extra_header_buf; - break; /* we don't want i to be increased. */ + for (i = 0, src_copied = 0; i < g->num_header_slices; i++) { + copy = PARA_MIN((size_t)g->slice_bytes, vsst->header_len - src_copied); + if (copy == 0) + break; + memcpy(fc->src_data[i], vsst->header_buf + src_copied, copy); + if (copy < g->slice_bytes) + memset(fc->src_data[i] + copy, 0, g->slice_bytes - copy); + src_copied += copy; } - /* - * Setup data slices. Note that for ogg streams chunk 0 points to a - * buffer on the heap rather than to the mapped audio file. + * There might be more than one header slice to fill although only the + * first one will be used. Zero out any remaining header slices. */ - ret = vss_get_chunk(g->first_chunk, vsst, &buf, &len); - if (ret < 0) - return ret; - for (p = buf; i < g->num_header_slices + data_slices; i++) { - if (p + g->slice_bytes > buf + g->bytes) { - /* - * We must make a copy for this slice since using p - * directly would exceed the buffer. - */ - uint32_t payload_size = buf + g->bytes - p; - assert(payload_size + FEC_HEADER_SIZE <= fc->mps); - memcpy(fc->extra_src_buf, p, payload_size); - if (payload_size < g->slice_bytes) - memset(fc->extra_src_buf + payload_size, 0, - g->slice_bytes - payload_size); - fc->src_data[i] = fc->extra_src_buf; - i++; - break; + while (i < g->num_header_slices) + memset(fc->src_data[i++], 0, g->slice_bytes); + + slice_copied = 0; + for (c = g->first_chunk; c < g->first_chunk + g->num_chunks; c++) { + char *buf; + size_t src_len; + ret = vss_get_chunk(c, vsst, &buf, &src_len); + if (ret < 0) + return ret; + if (src_len == 0) + continue; + src_copied = 0; + while (src_copied < src_len) { + copy = PARA_MIN((size_t)g->slice_bytes - slice_copied, + src_len - src_copied); + memcpy(fc->src_data[i] + slice_copied, + buf + src_copied, copy); + src_copied += copy; + slice_copied += copy; + if (slice_copied == g->slice_bytes) { + i++; + slice_copied = 0; + } } - fc->src_data[i] = (const unsigned char *)p; - p += g->slice_bytes; - } - if (i < k) { - /* use arbitrary data for all remaining slices */ - buf = vsst->map; - for (; i < k; i++) - fc->src_data[i] = (const unsigned char *)buf; } + if (i < fc_num_data_slices(fc) && slice_copied < g->slice_bytes) + memset(fc->src_data[i] + slice_copied, 0, + g->slice_bytes - slice_copied); + /* zero out remaining slices, if any */ + while (++i < fc_num_data_slices(fc)) + memset(fc->src_data[i], 0, g->slice_bytes); PARA_DEBUG_LOG("FEC group %u: %u chunks (%u - %u), %u bytes\n", g->num, g->num_chunks, g->first_chunk, g->first_chunk + g->num_chunks - 1, g->bytes ); PARA_DEBUG_LOG("slice_bytes: %d, %d header slices, %d data slices\n", - g->slice_bytes, g->num_header_slices, data_slices + g->slice_bytes, g->num_header_slices, fc_num_data_slices(fc) ); return 1; } @@ -637,8 +638,9 @@ static int compute_next_fec_slice(struct fec_client *fc, struct vss_task *vsst) } } write_fec_header(fc, vsst); - fec_encode(fc->parms, fc->src_data, fc->enc_buf + FEC_HEADER_SIZE, - fc->current_slice_num, fc->group.slice_bytes); + fec_encode(fc->parms, (const unsigned char * const*)fc->src_data, + fc->enc_buf + FEC_HEADER_SIZE, fc->current_slice_num, + fc->group.slice_bytes); return 1; } @@ -684,11 +686,15 @@ struct fec_client *vss_add_fec_client(struct sender_client *sc, */ void vss_del_fec_client(struct fec_client *fc) { + int i; + list_del(&fc->node); - free(fc->src_data); free(fc->enc_buf); - free(fc->extra_src_buf); - free(fc->extra_header_buf); + if (fc->src_data) { + for (i = 0; i < fc_num_data_slices(fc); i++) + free(fc->src_data[i]); + free(fc->src_data); + } fec_free(fc->parms); free(fc); } @@ -1191,7 +1197,7 @@ void vss_init(int afs_socket, struct sched *s) /** * Turn off the virtual streaming system. * - * This is only executed on exit. It calls the ->shutdowwn method of all senders. + * This is only executed on exit. It calls the ->shutdown method of all senders. */ void vss_shutdown(void) {