]> git.tuebingen.mpg.de Git - paraslash.git/commitdiff
vss: Rework fec client setup.
authorAndre Noll <maan@tuebingen.mpg.de>
Fri, 12 Jun 2020 01:34:32 +0000 (03:34 +0200)
committerAndre Noll <maan@tuebingen.mpg.de>
Sat, 11 Jul 2020 10:50:55 +0000 (12:50 +0200)
The current fec code assumes that the chunks of the audio file form a
contigous buffer. At least for aac/m4a this is not true, which is
why streaming m4a files over udp never worked well.

This patch should be a big improvement in this regard. We now copy
the chunks to preallocated buffers, which also makes the code easier
to follow because we can get rid of the two extra buffers in struct
fec_client.

vss.c

diff --git a/vss.c b/vss.c
index 2a24a50068b47b96c16fe287e456aeaf51c55473..737b77ac29e546fcdbf808dbd884e5dcf5b291e5 100644 (file)
--- a/vss.c
+++ b/vss.c
@@ -148,14 +148,10 @@ struct fec_client {
        struct fec_group group;
        /** The current slice. */
        uint8_t current_slice_num;
        struct fec_group group;
        /** The current slice. */
        uint8_t current_slice_num;
-       /** The data to be FEC-encoded (point to a region within the mapped audio file). */
-       const unsigned char **src_data;
+       /** The data to be FEC-encoded */
+       unsigned char **src_data;
        /** Last time an audio  header was sent. */
        struct timeval next_header_time;
        /** Last time an audio  header was sent. */
        struct timeval next_header_time;
-       /** Used for the last source pointer of an audio file. */
-       unsigned char *extra_src_buf;
-       /** Needed for the last slice of the audio file header. */
-       unsigned char *extra_header_buf;
        /** Extra slices needed to store largest chunk + header. */
        int num_extra_slices;
        /** Contains the FEC-encoded data. */
        /** Extra slices needed to store largest chunk + header. */
        int num_extra_slices;
        /** Contains the FEC-encoded data. */
@@ -240,6 +236,21 @@ static bool need_data_slices(struct fec_client *fc, struct vss_task *vsst)
        return false;
 }
 
        return false;
 }
 
+static int fc_num_data_slices(const struct fec_client *fc)
+{
+       return fc->fcp->data_slices_per_group + fc->num_extra_slices;
+}
+
+static int fc_num_slices(const struct fec_client *fc)
+{
+       return fc->fcp->slices_per_group + fc->num_extra_slices;
+}
+
+static int fc_num_redundant_slices(const struct fec_client *fc)
+{
+       return fc->fcp->slices_per_group - fc->fcp->data_slices_per_group;
+}
+
 static int num_slices(long unsigned bytes, int max_payload, int rs)
 {
        int ret;
 static int num_slices(long unsigned bytes, int max_payload, int rs)
 {
        int ret;
@@ -270,7 +281,7 @@ static void set_group_timing(struct fec_client *fc, struct vss_task *vsst)
 
 static int initialize_fec_client(struct fec_client *fc, struct vss_task *vsst)
 {
 
 static int initialize_fec_client(struct fec_client *fc, struct vss_task *vsst)
 {
-       int k, n, ret;
+       int i, k, n, ret;
        int hs, ds, rs; /* header/data/redundant slices */
        struct fec_client_parms *fcp = fc->fcp;
 
        int hs, ds, rs; /* header/data/redundant slices */
        struct fec_client_parms *fcp = fc->fcp;
 
@@ -290,7 +301,17 @@ static int initialize_fec_client(struct fec_client *fc, struct vss_task *vsst)
        if (fc->mps <= FEC_HEADER_SIZE)
                return -ERRNO_TO_PARA_ERROR(EINVAL);
 
        if (fc->mps <= FEC_HEADER_SIZE)
                return -ERRNO_TO_PARA_ERROR(EINVAL);
 
-       rs = fc->fcp->slices_per_group - fc->fcp->data_slices_per_group;
+       /* free previous buffers, if any */
+       if (fc->src_data) {
+               k = fc_num_data_slices(fc);
+               for (i = 0; i < k; i++)
+                       free(fc->src_data[i]);
+               free(fc->src_data);
+               fc->src_data = NULL;
+       }
+       free(fc->enc_buf);
+
+       rs = fc_num_redundant_slices(fc);
        ret = num_slices(vsst->header_len, fc->mps - FEC_HEADER_SIZE, rs);
        if (ret < 0)
                return ret;
        ret = num_slices(vsst->header_len, fc->mps - FEC_HEADER_SIZE, rs);
        if (ret < 0)
                return ret;
@@ -306,17 +327,18 @@ static int initialize_fec_client(struct fec_client *fc, struct vss_task *vsst)
        if (k < fc->fcp->data_slices_per_group)
                k = fc->fcp->data_slices_per_group;
        fc->num_extra_slices = k - fc->fcp->data_slices_per_group;
        if (k < fc->fcp->data_slices_per_group)
                k = fc->fcp->data_slices_per_group;
        fc->num_extra_slices = k - fc->fcp->data_slices_per_group;
-       n = k + rs;
+       n = fc_num_slices(fc);
+       PARA_INFO_LOG("mps: %d, k: %d, n: %d, extra slices: %d\n",
+               fc->mps, k, n, fc->num_extra_slices);
+
        fec_free(fc->parms);
        ret = fec_new(k, n, &fc->parms);
        if (ret < 0)
                return ret;
        fec_free(fc->parms);
        ret = fec_new(k, n, &fc->parms);
        if (ret < 0)
                return ret;
-       PARA_INFO_LOG("mps: %d, k: %d, n: %d, extra slices: %d\n",
-               fc->mps, k, n, fc->num_extra_slices);
-       fc->src_data = para_realloc(fc->src_data, k * sizeof(char *));
-       fc->enc_buf = para_realloc(fc->enc_buf, fc->mps);
-       fc->extra_src_buf = para_realloc(fc->extra_src_buf, fc->mps);
-       fc->extra_header_buf = para_realloc(fc->extra_header_buf, fc->mps);
+       fc->src_data = para_malloc(k * sizeof(char *));
+       for (i = 0; i < k; i++)
+               fc->src_data[i] = para_malloc(fc->mps);
+       fc->enc_buf = para_malloc(fc->mps);
 
        fc->state = FEC_STATE_READY_TO_RUN;
        fc->next_header_time.tv_sec = 0;
 
        fc->state = FEC_STATE_READY_TO_RUN;
        fc->next_header_time.tv_sec = 0;
@@ -395,6 +417,8 @@ static int compute_group_size(struct vss_task *vsst, struct fec_group *g,
                g->num_chunks++;
        }
        assert(g->num_chunks);
                g->num_chunks++;
        }
        assert(g->num_chunks);
+       PARA_DEBUG_LOG("group #%u: %u chunks, %u bytes total\n", g->num,
+               g->num_chunks, g->bytes);
        return 1;
 }
 
        return 1;
 }
 
@@ -448,8 +472,8 @@ static int compute_group_size(struct vss_task *vsst, struct fec_group *g,
 static int compute_slice_size(struct fec_client *fc, struct vss_task *vsst)
 {
        struct fec_group *g = &fc->group;
 static int compute_slice_size(struct fec_client *fc, struct vss_task *vsst)
 {
        struct fec_group *g = &fc->group;
-       int k = fc->fcp->data_slices_per_group + fc->num_extra_slices;
-       int n = fc->fcp->slices_per_group + fc->num_extra_slices;
+       int k = fc_num_data_slices(fc);
+       int n = fc_num_slices(fc);
        int ret, k1, k2, h, d, min, max, sum;
        int max_slice_bytes = fc->mps - FEC_HEADER_SIZE;
        int max_group_bytes;
        int ret, k1, k2, h, d, min, max, sum;
        int max_slice_bytes = fc->mps - FEC_HEADER_SIZE;
        int max_group_bytes;
@@ -512,9 +536,8 @@ static int compute_slice_size(struct fec_client *fc, struct vss_task *vsst)
 
 static int setup_next_fec_group(struct fec_client *fc, struct vss_task *vsst)
 {
 
 static int setup_next_fec_group(struct fec_client *fc, struct vss_task *vsst)
 {
-       int ret, i, k, n, data_slices;
-       size_t len;
-       char *buf, *p;
+       int ret, i, c;
+       size_t copy, src_copied, slice_copied;
        struct fec_group *g = &fc->group;
 
        if (fc->state == FEC_STATE_NONE) {
        struct fec_group *g = &fc->group;
 
        if (fc->state == FEC_STATE_NONE) {
@@ -538,86 +561,63 @@ static int setup_next_fec_group(struct fec_client *fc, struct vss_task *vsst)
                g->first_chunk += g->num_chunks;
                g->num++;
        }
                g->first_chunk += g->num_chunks;
                g->num++;
        }
-       k = fc->fcp->data_slices_per_group + fc->num_extra_slices;
-       n = fc->fcp->slices_per_group + fc->num_extra_slices;
-
        compute_slice_size(fc, vsst);
        assert(g->slice_bytes > 0);
        compute_slice_size(fc, vsst);
        assert(g->slice_bytes > 0);
-       ret = num_slices(g->bytes, g->slice_bytes, n - k);
-       if (ret < 0)
-               return ret;
-       data_slices = ret;
-       assert(g->num_header_slices + data_slices <= k);
        fc->current_slice_num = 0;
        if (g->num == 0)
                set_group_timing(fc, vsst);
        /* setup header slices */
        fc->current_slice_num = 0;
        if (g->num == 0)
                set_group_timing(fc, vsst);
        /* setup header slices */
-       buf = vsst->header_buf;
-       for (i = 0; i < g->num_header_slices; i++) {
-               uint32_t payload_size;
-               if (buf + g->slice_bytes <= vsst->header_buf + vsst->header_len) {
-                       fc->src_data[i] = (const unsigned char *)buf;
-                       buf += g->slice_bytes;
-                       continue;
-               }
-               /*
-                * Can not use vss->header_buf for this slice as it
-                * goes beyond the buffer. This slice will not be fully
-                * used.
-                */
-               payload_size = vsst->header_buf + vsst->header_len - buf;
-               memcpy(fc->extra_header_buf, buf, payload_size);
-               if (payload_size < g->slice_bytes)
-                       memset(fc->extra_header_buf + payload_size, 0,
-                               g->slice_bytes - payload_size);
-               /*
-                * There might be more than one header slice to fill although
-                * only the first one will be used. Set all header slices to
-                * our extra buffer.
-                */
-               while (i < g->num_header_slices)
-                       fc->src_data[i++] = fc->extra_header_buf;
-               break; /* we don't want i to be increased. */
+       for (i = 0, src_copied = 0; i < g->num_header_slices; i++) {
+               copy = PARA_MIN((size_t)g->slice_bytes, vsst->header_len - src_copied);
+               if (copy == 0)
+                       break;
+               memcpy(fc->src_data[i], vsst->header_buf + src_copied, copy);
+               if (copy < g->slice_bytes)
+                       memset(fc->src_data[i] + copy, 0, g->slice_bytes - copy);
+               src_copied += copy;
        }
        }
-
        /*
        /*
-        * Setup data slices. Note that for ogg streams chunk 0 points to a
-        * buffer on the heap rather than to the mapped audio file.
+        * There might be more than one header slice to fill although only the
+        * first one will be used. Zero out any remaining header slices.
         */
         */
-       ret = vss_get_chunk(g->first_chunk, vsst, &buf, &len);
-       if (ret < 0)
-               return ret;
-       for (p = buf; i < g->num_header_slices + data_slices; i++) {
-               if (p + g->slice_bytes > buf + g->bytes) {
-                       /*
-                        * We must make a copy for this slice since using p
-                        * directly would exceed the buffer.
-                        */
-                       uint32_t payload_size = buf + g->bytes - p;
-                       assert(payload_size + FEC_HEADER_SIZE <= fc->mps);
-                       memcpy(fc->extra_src_buf, p, payload_size);
-                       if (payload_size < g->slice_bytes)
-                               memset(fc->extra_src_buf + payload_size, 0,
-                                       g->slice_bytes - payload_size);
-                       fc->src_data[i] = fc->extra_src_buf;
-                       i++;
-                       break;
+       while (i < g->num_header_slices)
+               memset(fc->src_data[i++], 0, g->slice_bytes);
+
+       slice_copied = 0;
+       for (c = g->first_chunk; c < g->first_chunk + g->num_chunks; c++) {
+               char *buf;
+               size_t src_len;
+               ret = vss_get_chunk(c, vsst, &buf, &src_len);
+               if (ret < 0)
+                       return ret;
+               if (src_len == 0)
+                       continue;
+               src_copied = 0;
+               while (src_copied < src_len) {
+                       copy = PARA_MIN((size_t)g->slice_bytes - slice_copied,
+                               src_len - src_copied);
+                       memcpy(fc->src_data[i] + slice_copied,
+                               buf + src_copied, copy);
+                       src_copied += copy;
+                       slice_copied += copy;
+                       if (slice_copied == g->slice_bytes) {
+                               i++;
+                               slice_copied = 0;
+                       }
                }
                }
-               fc->src_data[i] = (const unsigned char *)p;
-               p += g->slice_bytes;
-       }
-       if (i < k) {
-               /* use arbitrary data for all remaining slices */
-               buf = vsst->map;
-               for (; i < k; i++)
-                       fc->src_data[i] = (const unsigned char *)buf;
        }
        }
+       if (i < fc_num_data_slices(fc) && slice_copied < g->slice_bytes)
+               memset(fc->src_data[i] + slice_copied, 0,
+                        g->slice_bytes - slice_copied);
+       /* zero out remaining slices, if any */
+       while (++i < fc_num_data_slices(fc))
+               memset(fc->src_data[i], 0, g->slice_bytes);
        PARA_DEBUG_LOG("FEC group %u: %u chunks (%u - %u), %u bytes\n",
                g->num, g->num_chunks, g->first_chunk,
                g->first_chunk + g->num_chunks - 1, g->bytes
        );
        PARA_DEBUG_LOG("slice_bytes: %d, %d header slices, %d data slices\n",
        PARA_DEBUG_LOG("FEC group %u: %u chunks (%u - %u), %u bytes\n",
                g->num, g->num_chunks, g->first_chunk,
                g->first_chunk + g->num_chunks - 1, g->bytes
        );
        PARA_DEBUG_LOG("slice_bytes: %d, %d header slices, %d data slices\n",
-               g->slice_bytes, g->num_header_slices, data_slices
+               g->slice_bytes, g->num_header_slices, fc_num_data_slices(fc)
        );
        return 1;
 }
        );
        return 1;
 }
@@ -637,8 +637,9 @@ static int compute_next_fec_slice(struct fec_client *fc, struct vss_task *vsst)
                }
        }
        write_fec_header(fc, vsst);
                }
        }
        write_fec_header(fc, vsst);
-       fec_encode(fc->parms, fc->src_data, fc->enc_buf + FEC_HEADER_SIZE,
-               fc->current_slice_num, fc->group.slice_bytes);
+       fec_encode(fc->parms, (const unsigned char * const*)fc->src_data,
+               fc->enc_buf + FEC_HEADER_SIZE, fc->current_slice_num,
+               fc->group.slice_bytes);
        return 1;
 }
 
        return 1;
 }
 
@@ -684,11 +685,13 @@ struct fec_client *vss_add_fec_client(struct sender_client *sc,
  */
 void vss_del_fec_client(struct fec_client *fc)
 {
  */
 void vss_del_fec_client(struct fec_client *fc)
 {
+       int i;
+
        list_del(&fc->node);
        list_del(&fc->node);
-       free(fc->src_data);
        free(fc->enc_buf);
        free(fc->enc_buf);
-       free(fc->extra_src_buf);
-       free(fc->extra_header_buf);
+       for (i = 0; i < fc_num_data_slices(fc); i++)
+               free(fc->src_data[i]);
+       free(fc->src_data);
        fec_free(fc->parms);
        free(fc);
 }
        fec_free(fc->parms);
        free(fc);
 }