Merge branch 'gerrit'
authorAndre Noll <maan@systemlinux.org>
Sat, 28 Feb 2009 13:56:11 +0000 (14:56 +0100)
committerAndre Noll <maan@systemlinux.org>
Sat, 28 Feb 2009 13:56:11 +0000 (14:56 +0100)
15 files changed:
command.c
configure.ac
error.h
fec.c [new file with mode: 0644]
fec.h [new file with mode: 0644]
fecdec_filter.c [new file with mode: 0644]
ggo/server.m4
net.c
net.h
server.h
udp_header.h [deleted file]
udp_recv.c
udp_send.c
vss.c
vss.h

index 988eae6..deb21a6 100644 (file)
--- a/command.c
+++ b/command.c
@@ -204,16 +204,9 @@ static int check_sender_args(int argc, char * const * argv, struct sender_comman
                break;
        case SENDER_ADD:
        case SENDER_DELETE:
-               if (argc != 4 && argc != 5)
-                       return -E_COMMAND_SYNTAX;
-               if (!inet_pton(AF_INET, argv[3], &scd->addr))
+               if (argc != 4 || parse_url(argv[3], scd->host,
+                               sizeof(scd->host), &scd->port) == NULL)
                        return -E_COMMAND_SYNTAX;
-               scd->port = -1;
-               if (argc == 5) {
-                       scd->port = atoi(argv[4]);
-                       if (scd->port < 0 || scd->port > 65535)
-                               return -E_COMMAND_SYNTAX;
-               }
                break;
        default:
                return -E_COMMAND_SYNTAX;
index 13ec59b..dca4920 100644 (file)
@@ -84,22 +84,23 @@ dccp_send fd user_list chunk_queue afs osl aft mood score attribute blob ringbuf
 playlist sha1 rbtree sched audiod grab_client filter_common wav_filter compress_filter
 http_recv dccp_recv recv_common write_common file_write audiod_command
 client_common recv stdout filter stdin audioc write client fsck exec send_common ggo
-udp_recv udp_send color"
+udp_recv udp_send color fec fecdec_filter"
 
 all_executables="server recv filter audioc write client fsck afh"
 
 recv_cmdline_objs="recv.cmdline http_recv.cmdline dccp_recv.cmdline udp_recv.cmdline"
 recv_errlist_objs="http_recv recv_common recv time string net dccp_recv
-       fd sched stdout ggo udp_recv"
+       fd sched stdout ggo udp_recv fec"
 recv_ldflags=""
 
 receivers=" http dccp udp"
 senders=" http dccp udp"
 
 filter_cmdline_objs="filter.cmdline compress_filter.cmdline amp_filter.cmdline"
-filter_errlist_objs="filter_common wav_filter compress_filter filter string stdin stdout sched fd amp_filter ggo"
+filter_errlist_objs="filter_common wav_filter compress_filter filter string
+       stdin stdout sched fd amp_filter ggo fecdec_filter fec"
 filter_ldflags=""
-filters=" compress wav amp"
+filters=" compress wav amp fecdec"
 
 audioc_cmdline_objs="audioc.cmdline"
 audioc_errlist_objs="audioc string net fd"
@@ -110,8 +111,8 @@ audiod_cmdline_objs="audiod.cmdline grab_client.cmdline compress_filter.cmdline
        audiod_command_list amp_filter.cmdline udp_recv.cmdline"
 audiod_errlist_objs="audiod signal string daemon stat net
        time grab_client filter_common wav_filter compress_filter amp_filter http_recv dccp_recv
-       recv_common fd sched write_common file_write audiod_command crypt
-       client_common ggo udp_recv color"
+       recv_common fd sched write_common file_write audiod_command crypt fecdec_filter
+       client_common ggo udp_recv color fec"
 audiod_ldflags=""
 audiod_audio_formats=""
 
@@ -123,7 +124,7 @@ server_cmdline_objs="server.cmdline server_command_list afs_command_list"
 server_errlist_objs="server afh_common mp3_afh vss command net string signal
        time daemon stat crypt http_send close_on_fork
        ipc dccp_send fd user_list chunk_queue afs osl aft mood score attribute
-       blob playlist sha1 rbtree sched acl send_common udp_send color"
+       blob playlist sha1 rbtree sched acl send_common udp_send color fec"
 server_ldflags=""
 server_audio_formats=" mp3"
 
diff --git a/error.h b/error.h
index 0a5fb21..c3ac3dc 100644 (file)
--- a/error.h
+++ b/error.h
@@ -31,13 +31,27 @@ DEFINE_ERRLIST_OBJECT_ENUM;
 #define GGO_ERRORS
 #define COLOR_ERRORS
 
-
 extern const char **para_errlist[];
 
 #define COMPRESS_FILTER_ERRORS \
        PARA_ERROR(COMPRESS_SYNTAX, "syntax error in compress filter config"), \
 
 
+#define FEC_ERRORS \
+       PARA_ERROR(FEC_BAD_IDX, "invalid index vector"), \
+       PARA_ERROR(FEC_SINGULAR, "unexpected singular matrix"), \
+       PARA_ERROR(FEC_PIVOT, "pivot column not found"), \
+       PARA_ERROR(FEC_PARMS, "invalid fec parameters"), \
+
+
+#define FECDEC_FILTER_ERRORS \
+       PARA_ERROR(BAD_FEC_HEADER, "invalid fec header"), \
+       PARA_ERROR(BAD_SLICE_SIZE, "slice size too large"), \
+       PARA_ERROR(BAD_SLICE_NUM, "invalid slice number"), \
+       PARA_ERROR(FECDEC_OVERRUN, "fecdec output buffer overrun"), \
+       PARA_ERROR(FECDEC_EOF, "received eof packet"), \
+
+
 #define AMP_FILTER_ERRORS \
        PARA_ERROR(AMP_SYNTAX, "syntax error in amp filter config"), \
 
diff --git a/fec.c b/fec.c
new file mode 100644 (file)
index 0000000..e543aed
--- /dev/null
+++ b/fec.c
@@ -0,0 +1,598 @@
+/*
+ * fec.c -- forward error correction based on Vandermonde matrices
+ * 980624
+ * (C) 1997-98 Luigi Rizzo (luigi@iet.unipi.it)
+ *
+ * Portions derived from code by Phil Karn (karn@ka9q.ampr.org),
+ * Robert Morelos-Zaragoza (robert@spectra.eng.hawaii.edu) and Hari
+ * Thirumoorthy (harit@spectra.eng.hawaii.edu), Aug 1995
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright
+ *    notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above
+ *    copyright notice, this list of conditions and the following
+ *    disclaimer in the documentation and/or other materials
+ *    provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE AUTHORS ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
+ * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
+ * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS
+ * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY,
+ * OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
+ * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR
+ * TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
+ * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY
+ * OF SUCH DAMAGE.
+ */
+
+#include "para.h"
+#include "error.h"
+#include "portable_io.h"
+#include "string.h"
+#include "fec.h"
+
+#define GF_BITS  8 /* code over GF(256) */
+#define        GF_SIZE ((1 << GF_BITS) - 1)
+
+/*
+ * To speed up computations, we have tables for logarithm, exponent and inverse
+ * of a number. We use a table for multiplication as well (it takes 64K, no big
+ * deal even on a PDA, especially because it can be pre-initialized an put into
+ * a ROM!). The macro gf_mul(x,y) takes care of multiplications.
+ */
+static unsigned char gf_exp[2 * GF_SIZE]; /* index->poly form conversion table */
+static int gf_log[GF_SIZE + 1];        /* Poly->index form conversion table    */
+static unsigned char inverse[GF_SIZE + 1]; /* inverse of field elem. */
+static unsigned char gf_mul_table[GF_SIZE + 1][GF_SIZE + 1];
+/* Multiply two numbers. */
+#define gf_mul(x,y) gf_mul_table[x][y]
+
+/* Compute x % GF_SIZE without a slow divide. */
+static inline unsigned char modnn(int x)
+{
+       while (x >= GF_SIZE) {
+               x -= GF_SIZE;
+               x = (x >> GF_BITS) + (x & GF_SIZE);
+       }
+       return x;
+}
+
+static void init_mul_table(void)
+{
+       int i, j;
+       for (i = 0; i < GF_SIZE + 1; i++)
+               for (j = 0; j < GF_SIZE + 1; j++)
+                       gf_mul_table[i][j] =
+                               gf_exp[modnn(gf_log[i] + gf_log[j])];
+
+       for (j = 0; j < GF_SIZE + 1; j++)
+               gf_mul_table[0][j] = gf_mul_table[j][0] = 0;
+}
+
+static unsigned char *alloc_matrix(int rows, int cols)
+{
+       return para_malloc(rows * cols);
+}
+
+/*
+ * Initialize the data structures used for computations in GF.
+ *
+ * This generates GF(2**GF_BITS) from the irreducible polynomial p(X) in
+ * p[0]..p[m].
+ *
+ * Lookup tables:
+ *     index->polynomial form          gf_exp[] contains j= \alpha^i;
+ *     polynomial form -> index form   gf_log[ j = \alpha^i ] = i
+ * \alpha=x is the primitive element of GF(2^m)
+ *
+ * For efficiency, gf_exp[] has size 2*GF_SIZE, so that a simple
+ * multiplication of two numbers can be resolved without calling modnn
+ */
+static void generate_gf(void)
+{
+       int i;
+       unsigned char mask = 1;
+       char *pp = "101110001"; /* The primitive polynomial 1+x^2+x^3+x^4+x^8 */
+       gf_exp[GF_BITS] = 0; /* will be updated at the end of the 1st loop */
+
+       /*
+        * first, generate the (polynomial representation of) powers of \alpha,
+        * which are stored in gf_exp[i] = \alpha ** i .
+        * At the same time build gf_log[gf_exp[i]] = i .
+        * The first GF_BITS powers are simply bits shifted to the left.
+        */
+       for (i = 0; i < GF_BITS; i++, mask <<= 1) {
+               gf_exp[i] = mask;
+               gf_log[gf_exp[i]] = i;
+               /*
+                * If pp[i] == 1 then \alpha ** i occurs in poly-repr
+                * gf_exp[GF_BITS] = \alpha ** GF_BITS
+                */
+               if (pp[i] == '1')
+                       gf_exp[GF_BITS] ^= mask;
+       }
+       /*
+        * now gf_exp[GF_BITS] = \alpha ** GF_BITS is complete, so can also
+        * compute its inverse.
+        */
+       gf_log[gf_exp[GF_BITS]] = GF_BITS;
+       /*
+        * Poly-repr of \alpha ** (i+1) is given by poly-repr of \alpha ** i
+        * shifted left one-bit and accounting for any \alpha ** GF_BITS term
+        * that may occur when poly-repr of \alpha ** i is shifted.
+        */
+       mask = 1 << (GF_BITS - 1);
+       for (i = GF_BITS + 1; i < GF_SIZE; i++) {
+               if (gf_exp[i - 1] >= mask)
+                       gf_exp[i] =
+                               gf_exp[GF_BITS] ^ ((gf_exp[i - 1] ^ mask) << 1);
+               else
+                       gf_exp[i] = gf_exp[i - 1] << 1;
+               gf_log[gf_exp[i]] = i;
+       }
+       /*
+        * log(0) is not defined, so use a special value
+        */
+       gf_log[0] = GF_SIZE;
+       /* set the extended gf_exp values for fast multiply */
+       for (i = 0; i < GF_SIZE; i++)
+               gf_exp[i + GF_SIZE] = gf_exp[i];
+
+       inverse[0] = 0; /* 0 has no inverse. */
+       inverse[1] = 1;
+       for (i = 2; i <= GF_SIZE; i++)
+               inverse[i] = gf_exp[GF_SIZE - gf_log[i]];
+}
+
+/*
+ * Compute dst[] = dst[] + c * src[]
+ *
+ * This is used often, so better optimize it! Currently the loop is unrolled 16
+ * times. The case c=0 is also optimized, whereas c=1 is not.
+ */
+#define UNROLL 16
+static void addmul(unsigned char *dst1, const unsigned char const *src1,
+       unsigned char c, int sz)
+{
+       if (c == 0)
+               return;
+       unsigned char *dst = dst1, *lim = &dst[sz - UNROLL + 1],
+               *col = gf_mul_table[c];
+       const unsigned char const *src = src1;
+
+       for (; dst < lim; dst += UNROLL, src += UNROLL) {
+               dst[0] ^= col[src[0]];
+               dst[1] ^= col[src[1]];
+               dst[2] ^= col[src[2]];
+               dst[3] ^= col[src[3]];
+               dst[4] ^= col[src[4]];
+               dst[5] ^= col[src[5]];
+               dst[6] ^= col[src[6]];
+               dst[7] ^= col[src[7]];
+               dst[8] ^= col[src[8]];
+               dst[9] ^= col[src[9]];
+               dst[10] ^= col[src[10]];
+               dst[11] ^= col[src[11]];
+               dst[12] ^= col[src[12]];
+               dst[13] ^= col[src[13]];
+               dst[14] ^= col[src[14]];
+               dst[15] ^= col[src[15]];
+       }
+       lim += UNROLL - 1;
+       for (; dst < lim; dst++, src++) /* final components */
+               *dst ^= col[*src];
+}
+
+/*
+ * Compute C = AB where A is n*k, B is k*m, C is n*m
+ */
+static void matmul(unsigned char *a, unsigned char *b, unsigned char *c,
+               int n, int k, int m)
+{
+       int row, col, i;
+
+       for (row = 0; row < n; row++) {
+               for (col = 0; col < m; col++) {
+                       unsigned char *pa = &a[row * k], *pb = &b[col], acc = 0;
+                       for (i = 0; i < k; i++, pa++, pb += m)
+                               acc ^= gf_mul(*pa, *pb);
+                       c[row * m + col] = acc;
+               }
+       }
+}
+
+#define FEC_SWAP(a,b) {typeof(a) tmp = a; a = b; b = tmp;}
+
+/*
+ * Compute the inverse of a matrix.
+ *
+ * k is the size of the matrix 'src' (Gauss-Jordan, adapted from Numerical
+ * Recipes in C). Returns -1 if 'src' is singular.
+ */
+static int invert_mat(unsigned char *src, int k)
+{
+       int irow, icol, row, col, ix, error;
+       int *indxc = para_malloc(k * sizeof(int));
+       int *indxr = para_malloc(k * sizeof(int));
+       int *ipiv = para_malloc(k * sizeof(int)); /* elements used as pivots */
+       unsigned char c, *p, *id_row = alloc_matrix(1, k),
+               *temp_row = alloc_matrix(1, k);
+
+       memset(id_row, 0, k);
+       memset(ipiv, 0, k * sizeof(int));
+
+       for (col = 0; col < k; col++) {
+               unsigned char *pivot_row;
+               /*
+                * Zeroing column 'col', look for a non-zero element.
+                * First try on the diagonal, if it fails, look elsewhere.
+                */
+               irow = icol = -1;
+               if (ipiv[col] != 1 && src[col * k + col] != 0) {
+                       irow = col;
+                       icol = col;
+                       goto found_piv;
+               }
+               for (row = 0; row < k; row++) {
+                       if (ipiv[row] != 1) {
+                               for (ix = 0; ix < k; ix++) {
+                                       if (ipiv[ix] == 0) {
+                                               if (src[row * k + ix] != 0) {
+                                                       irow = row;
+                                                       icol = ix;
+                                                       goto found_piv;
+                                               }
+                                       } else if (ipiv[ix] > 1) {
+                                               error = -E_FEC_PIVOT;
+                                               goto fail;
+                                       }
+                               }
+                       }
+               }
+               error = -E_FEC_PIVOT;
+               if (icol == -1)
+                       goto fail;
+found_piv:
+               ++(ipiv[icol]);
+               /*
+                * swap rows irow and icol, so afterwards the diagonal element
+                * will be correct. Rarely done, not worth optimizing.
+                */
+               if (irow != icol)
+                       for (ix = 0; ix < k; ix++)
+                               FEC_SWAP(src[irow * k + ix], src[icol * k + ix]);
+               indxr[col] = irow;
+               indxc[col] = icol;
+               pivot_row = &src[icol * k];
+               error = -E_FEC_SINGULAR;
+               c = pivot_row[icol];
+               if (c == 0)
+                       goto fail;
+               if (c != 1) { /* otherwise this is a NOP */
+                       /*
+                        * this is done often , but optimizing is not so
+                        * fruitful, at least in the obvious ways (unrolling)
+                        */
+                       c = inverse[c];
+                       pivot_row[icol] = 1;
+                       for (ix = 0; ix < k; ix++)
+                               pivot_row[ix] = gf_mul(c, pivot_row[ix]);
+               }
+               /*
+                * from all rows, remove multiples of the selected row to zero
+                * the relevant entry (in fact, the entry is not zero because
+                * we know it must be zero).  (Here, if we know that the
+                * pivot_row is the identity, we can optimize the addmul).
+                */
+               id_row[icol] = 1;
+               if (memcmp(pivot_row, id_row, k) != 0) {
+                       for (p = src, ix = 0; ix < k; ix++, p += k) {
+                               if (ix != icol) {
+                                       c = p[icol];
+                                       p[icol] = 0;
+                                       addmul(p, pivot_row, c, k);
+                               }
+                       }
+               }
+               id_row[icol] = 0;
+       }
+       for (col = k - 1; col >= 0; col--) {
+               if (indxr[col] < 0 || indxr[col] >= k)
+                       PARA_CRIT_LOG("AARGH, indxr[col] %d\n", indxr[col]);
+               else if (indxc[col] < 0 || indxc[col] >= k)
+                       PARA_CRIT_LOG("AARGH, indxc[col] %d\n", indxc[col]);
+               else if (indxr[col] != indxc[col]) {
+                       for (row = 0; row < k; row++) {
+                               FEC_SWAP(src[row * k + indxr[col]],
+                                       src[row * k + indxc[col]]);
+                       }
+               }
+       }
+       error = 0;
+fail:
+       free(indxc);
+       free(indxr);
+       free(ipiv);
+       free(id_row);
+       free(temp_row);
+       return error;
+}
+
+/*
+ * Invert a Vandermonde matrix.
+ *
+ * It assumes that the matrix is not singular and _IS_ a Vandermonde matrix.
+ * Only uses the second column of the matrix, containing the p_i's.
+ *
+ * Algorithm borrowed from "Numerical recipes in C" -- sec.2.8, but largely
+ * revised for GF purposes.
+ */
+static void invert_vdm(unsigned char *src, int k)
+{
+       int i, j, row, col;
+       unsigned char *b, *c, *p, t, xx;
+
+       if (k == 1) /* degenerate */
+               return;
+       /*
+        * c holds the coefficient of P(x) = Prod (x - p_i), i=0..k-1
+        * b holds the coefficient for the matrix inversion
+        */
+       c = para_malloc(k);
+       b = para_malloc(k);
+       p = para_malloc(k);
+
+       for (j = 1, i = 0; i < k; i++, j += k) {
+               c[i] = 0;
+               p[i] = src[j];
+       }
+       /*
+        * construct coeffs recursively. We know c[k] = 1 (implicit) and start
+        * P_0 = x - p_0, then at each stage multiply by x - p_i generating P_i
+        * = x P_{i-1} - p_i P_{i-1} After k steps we are done.
+        */
+       c[k - 1] = p[0]; /* really -p(0), but x = -x in GF(2^m) */
+       for (i = 1; i < k; i++) {
+               unsigned char p_i = p[i];
+               for (j = k - 1 - (i - 1); j < k - 1; j++)
+                       c[j] ^= gf_mul(p_i, c[j + 1]);
+               c[k - 1] ^= p_i;
+       }
+
+       for (row = 0; row < k; row++) {
+               /*
+                * synthetic division etc.
+                */
+               xx = p[row];
+               t = 1;
+               b[k - 1] = 1; /* this is in fact c[k] */
+               for (i = k - 2; i >= 0; i--) {
+                       b[i] = c[i + 1] ^ gf_mul(xx, b[i + 1]);
+                       t = gf_mul(xx, t) ^ b[i];
+               }
+               for (col = 0; col < k; col++)
+                       src[col * k + row] = gf_mul(inverse[t], b[col]);
+       }
+       free(c);
+       free(b);
+       free(p);
+}
+
+static int fec_initialized;
+
+static void init_fec(void)
+{
+       generate_gf();
+       init_mul_table();
+       fec_initialized = 1;
+}
+
+struct fec_parms {
+       int k, n; /* parameters of the code */
+       unsigned char *enc_matrix;
+};
+
+/**
+ * Deallocate a fec params structure.
+ *
+ * \param p The structure to free.
+ */
+void fec_free(struct fec_parms *p)
+{
+       if (!p)
+               return;
+       free(p->enc_matrix);
+       free(p);
+}
+
+/**
+ * Create a new encoder and return an opaque descriptor to it.
+ *
+ * \param k Number of input slices.
+ * \param n Number of output slices.
+ * \param result On success the Fec descriptor is returned here.
+ *
+ * \return Standard.
+ *
+ * This creates the k*n encoding matrix.  It is computed starting with a
+ * Vandermonde matrix, and then transformed into a systematic matrix.
+ */
+int fec_new(int k, int n, struct fec_parms **result)
+{
+       int row, col;
+       unsigned char *p, *tmp_m;
+       struct fec_parms *parms;
+
+       if (!fec_initialized)
+               init_fec();
+
+       if (k < 1 || k > GF_SIZE + 1 || n > GF_SIZE + 1 || k > n)
+               return -E_FEC_PARMS;
+       parms = para_malloc(sizeof(struct fec_parms));
+       parms->k = k;
+       parms->n = n;
+       parms->enc_matrix = alloc_matrix(n, k);
+       tmp_m = alloc_matrix(n, k);
+       /*
+        * fill the matrix with powers of field elements, starting from 0.
+        * The first row is special, cannot be computed with exp. table.
+        */
+       tmp_m[0] = 1;
+       for (col = 1; col < k; col++)
+               tmp_m[col] = 0;
+       for (p = tmp_m + k, row = 0; row < n - 1; row++, p += k) {
+               for (col = 0; col < k; col++)
+                       p[col] = gf_exp[modnn(row * col)];
+       }
+
+       /*
+        * quick code to build systematic matrix: invert the top
+        * k*k vandermonde matrix, multiply right the bottom n-k rows
+        * by the inverse, and construct the identity matrix at the top.
+        */
+       invert_vdm(tmp_m, k); /* much faster than invert_mat */
+       matmul(tmp_m + k * k, tmp_m, parms->enc_matrix + k * k, n - k, k, k);
+       /*
+        * the upper matrix is I so do not bother with a slow multiply
+        */
+       memset(parms->enc_matrix, 0, k * k);
+       for (p = parms->enc_matrix, col = 0; col < k; col++, p += k + 1)
+               *p = 1;
+       free(tmp_m);
+       *result = parms;
+       return 0;
+}
+
+/**
+ * Compute one encoded slice of the given input.
+ *
+ * \param parms The fec parameters returned earlier by fec_new().
+ * \param src The \a k data slices to encode.
+ * \param dst Result pointer.
+ * \param idx The index of the slice to compute.
+ * \param sz The size of the input data packets.
+ *
+ * Encode the \a k slices of size \a sz given by \a src and store the output
+ * slice number \a idx in \dst.
+ */
+void fec_encode(struct fec_parms *parms, const unsigned char * const *src,
+               unsigned char *dst, int idx, int sz)
+{
+       int i, k = parms->k;
+       unsigned char *p;
+
+       assert(idx <= parms->n);
+
+       if (idx < k) {
+               memcpy(dst, src[idx], sz);
+               return;
+       }
+       p = &(parms->enc_matrix[idx * k]);
+       memset(dst, 0, sz);
+       for (i = 0; i < k; i++)
+               addmul(dst, src[i], p[i], sz);
+}
+
+/* Move src packets in their position. */
+static int shuffle(unsigned char **data, int *idx, int k)
+{
+       int i;
+
+       for (i = 0; i < k;) {
+               if (idx[i] >= k || idx[i] == i)
+                       i++;
+               else { /* put index and data at the right position */
+                       int c = idx[i];
+
+                       if (idx[c] == c) /* conflict */
+                               return -E_FEC_BAD_IDX;
+                       FEC_SWAP(idx[i], idx[c]);
+                       FEC_SWAP(data[i], data[c]);
+               }
+       }
+       return 0;
+}
+
+/*
+ * Construct the decoding matrix given the indices. The encoding matrix must
+ * already be allocated.
+ */
+static int build_decode_matrix(struct fec_parms *parms, int *idx,
+               unsigned char **result)
+{
+       int ret = -E_FEC_BAD_IDX, i, k = parms->k;
+       unsigned char *p, *matrix = alloc_matrix(k, k);
+
+       for (i = 0, p = matrix; i < k; i++, p += k) {
+               if (idx[i] >= parms->n) /* invalid index */
+                       goto err;
+               if (idx[i] < k) {
+                       memset(p, 0, k);
+                       p[i] = 1;
+               } else
+                       memcpy(p, &(parms->enc_matrix[idx[i] * k]), k);
+       }
+       ret = invert_mat(matrix, k);
+       if (ret < 0)
+               goto err;
+       *result = matrix;
+       return 0;
+err:
+       free(matrix);
+       *result = NULL;
+       return ret;
+}
+
+/**
+ * Decode one slice from the group of received slices.
+ *
+ * \param code Pointer to fec params structure.
+ * \param data Pointers to received packets.
+ * \param idx Pointer to packet indices (gets modified).
+ * \param sz Size of each packet.
+ *
+ * \return Zero on success, -1 on errors.
+ *
+ * The \a data vector of received slices and the indices of slices are used to
+ * produce the correct output slice. The data slices are modified in-place.
+ */
+int fec_decode(struct fec_parms *parms, unsigned char **data, int *idx,
+               int sz)
+{
+       unsigned char *m_dec, **slice;
+       int ret, row, col, k = parms->k;
+
+       ret = shuffle(data, idx, k);
+       if (ret < 0)
+               return ret;
+       ret = build_decode_matrix(parms, idx, &m_dec);
+       if (ret < 0)
+               return ret;
+       /* do the actual decoding */
+       slice = para_malloc(k * sizeof(unsigned char *));
+       for (row = 0; row < k; row++) {
+               if (idx[row] >= k) {
+                       slice[row] = para_calloc(sz);
+                       for (col = 0; col < k; col++)
+                               addmul(slice[row], data[col],
+                                       m_dec[row * k + col], sz);
+               }
+       }
+       /* move slices to their final destination */
+       for (row = 0; row < k; row++) {
+               if (idx[row] >= k) {
+                       memcpy(data[row], slice[row], sz);
+                       free(slice[row]);
+               }
+       }
+       free(slice);
+       free(m_dec);
+       return 0;
+}
diff --git a/fec.h b/fec.h
new file mode 100644 (file)
index 0000000..241cf9a
--- /dev/null
+++ b/fec.h
@@ -0,0 +1,47 @@
+/*
+ * fec.c -- forward error correction based on Vandermonde matrices
+ * 980614
+ * (C) 1997-98 Luigi Rizzo (luigi@iet.unipi.it)
+ *
+ * Portions derived from code by Phil Karn (karn@ka9q.ampr.org),
+ * Robert Morelos-Zaragoza (robert@spectra.eng.hawaii.edu) and Hari
+ * Thirumoorthy (harit@spectra.eng.hawaii.edu), Aug 1995
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+
+ * 1. Redistributions of source code must retain the above copyright
+ *    notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above
+ *    copyright notice, this list of conditions and the following
+ *    disclaimer in the documentation and/or other materials
+ *    provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE AUTHORS ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
+ * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
+ * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS
+ * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY,
+ * OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
+ * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR
+ * TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
+ * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY
+ * OF SUCH DAMAGE.
+ */
+
+#define FEC_MAGIC 0xFECC0DEC
+#define FEC_HEADER_SIZE 32
+
+struct fec_parms;
+
+int fec_new(int k, int n, struct fec_parms **parms);
+void fec_free(struct fec_parms *p);
+void fec_encode(struct fec_parms *parms, const unsigned char * const *src,
+               unsigned char *dst, int idx, int sz);
+int fec_decode(struct fec_parms *parms, unsigned char **data, int *idx,
+               int sz);
+
+
diff --git a/fecdec_filter.c b/fecdec_filter.c
new file mode 100644 (file)
index 0000000..721fc09
--- /dev/null
@@ -0,0 +1,330 @@
+/*
+ * Copyright (C) 2009 Andre Noll <maan@systemlinux.org>
+ *
+ * Licensed under the GPL v2. For licencing details see COPYING.
+ */
+
+/** \file fecdev_filter.c A filter fec-decodes an audio stream. */
+
+#include <dirent.h>
+#include "para.h"
+#include "error.h"
+#include "list.h"
+#include "sched.h"
+#include "ggo.h"
+#include "filter.h"
+#include "string.h"
+#include "portable_io.h"
+#include "fec.h"
+#include "fd.h"
+
+#define NUM_FEC_GROUPS 3
+#define INPUT_BUFFER_SIZE 16384
+
+/** size of the output buffer */
+#define FECDEC_OUTBUF_SIZE 81920
+
+struct fec_header {
+       uint8_t slices_per_group;
+       uint8_t data_slices_per_group;
+       uint32_t audio_header_size;
+
+       uint32_t group_num;
+       uint32_t group_bytes;
+
+       uint8_t slice_num;
+       uint16_t slice_bytes;
+};
+
+struct fec_group {
+       struct fec_header h;
+       int num_received_slices;
+       int num_slices;
+       int *idx;
+       unsigned char **data;
+};
+
+struct private_fecdec_data {
+       struct fec_parms *fec;
+       struct fec_group groups[NUM_FEC_GROUPS];
+};
+
+#define FOR_EACH_FEC_GROUP(g, d) for (g = (d)->groups; \
+       (g) - (d)->groups < NUM_FEC_GROUPS; (g)++)
+
+#define UNUSED_GROUP_NUM 0xffffffff
+
+static int group_complete(struct fec_group *fg)
+{
+       if (fg->h.group_num == UNUSED_GROUP_NUM)
+               return 0;
+       //PARA_INFO_LOG("received slices: %u, slices per group: %u\n", fg->num_received_slices, fg->h.data_slices_per_group);
+       return fg->num_received_slices >= fg->h.data_slices_per_group;
+}
+
+static int group_empty(struct fec_group *fg)
+{
+       return fg->num_received_slices == 0;
+}
+
+static void clear_group(struct fec_group *fg)
+{
+       int i;
+
+       if (!group_complete(fg) && !group_empty(fg))
+               PARA_WARNING_LOG("Clearing incomplete group %d "
+                       "(contains %d slices)\n", fg->h.group_num,
+                       fg->num_received_slices);
+       for (i = 0; i < fg->num_slices; i++) {
+               free(fg->data[i]);
+               fg->data[i] = NULL;
+               fg->idx[i] = -1;
+       }
+       free(fg->data);
+       free(fg->idx);
+       fg->num_slices = 0;
+       memset(&fg->h, 0, sizeof(struct fec_header));
+       fg->num_received_slices = 0;
+       fg->h.group_num = UNUSED_GROUP_NUM;
+}
+
+static int find_group(struct fec_header *h,
+               struct private_fecdec_data *pfd, struct fec_group **result)
+{
+       struct fec_group *fg;
+
+       FOR_EACH_FEC_GROUP(fg, pfd) {
+               if (fg->h.group_num != h->group_num)
+                       continue;
+               *result = fg;
+               return 1;
+       }
+       return 0;
+}
+
+static struct fec_group *find_unused_group(struct private_fecdec_data *pfd)
+{
+       struct fec_group *fg;
+
+       FOR_EACH_FEC_GROUP(fg, pfd) {
+               if (fg->num_received_slices == 0)
+                       return fg;
+       }
+       return NULL;
+}
+
+static struct fec_group *try_to_free_group(struct private_fecdec_data *pfd)
+{
+       struct fec_group *fg;
+
+       FOR_EACH_FEC_GROUP(fg, pfd) {
+               if (!group_complete(fg))
+                       continue;
+               clear_group(fg);
+               return fg;
+       }
+       return NULL;
+}
+
+static struct fec_group *free_oldest_group(struct private_fecdec_data *pfd)
+{
+       struct fec_group *fg, *oldest = NULL;
+
+       FOR_EACH_FEC_GROUP(fg, pfd) {
+               if (!oldest || oldest->h.group_num > fg->h.group_num)
+                       oldest = fg;
+       }
+       clear_group(oldest);
+       return oldest;
+}
+
+static int get_group(struct fec_header *h, struct private_fecdec_data *pfd,
+               struct fec_group **result)
+{
+       struct fec_group *fg;
+       int ret = find_group(h, pfd, &fg);
+
+       if (ret < 0)
+               return ret;
+       if (ret > 0) /* found group */
+               goto success;
+       /* group not found */
+       fg = find_unused_group(pfd);
+       if (fg)
+               goto update_header;
+       fg = try_to_free_group(pfd);
+       if (fg)
+               goto update_header;
+       fg = free_oldest_group(pfd);
+update_header:
+       fg->h = *h;
+success:
+       *result = fg;
+       return 1;
+}
+
+static int add_slice(char *buf, struct fec_group *fg)
+{
+       int r, slice_num;
+
+       if (group_complete(fg))
+               return 0;
+       slice_num = fg->h.slice_num;
+       if (fg->num_slices == 0) {
+               fg->num_slices = fg->h.slices_per_group;
+               fg->idx = malloc(fg->num_slices * sizeof(int));
+               fg->data = malloc(fg->num_slices * sizeof(unsigned char *));
+               memset(fg->data, 0, fg->num_slices * sizeof(unsigned char *));
+       }
+       r = fg->num_received_slices;
+       fg->idx[r] = slice_num;
+       fg->data[r] = malloc(fg->h.slice_bytes);
+       memcpy(fg->data[r], buf, fg->h.slice_bytes);
+       fg->num_received_slices++;
+       return 1;
+}
+
+static int decode_group(struct fec_group *fg, struct filter_node *fn)
+{
+       int i, ret, sb = fg->h.slice_bytes;
+       size_t written = 0;
+       struct private_fecdec_data *pfd = fn->private_data;
+
+       ret = fec_decode(pfd->fec, fg->data, fg->idx, sb);
+       if (ret < 0)
+               return ret;
+       PARA_DEBUG_LOG("writing group %d (%d/%d decoded data bytes)\n",
+               fg->h.group_num, fg->h.group_bytes,
+               fg->h.data_slices_per_group * sb);
+       for (i = 0; i < fg->h.data_slices_per_group; i++) {
+               size_t n = sb;
+               if (n + written > fg->h.group_bytes)
+                       n = fg->h.group_bytes - written;
+               if (fn->loaded + n > fn->bufsize)
+                       return -E_FECDEC_OVERRUN;
+               memcpy(fn->buf + fn->loaded, fg->data[i], n);
+               fn->loaded += n;
+               written += n;
+       }
+       return 0;
+}
+
+/**
+ * Read a fec header from a buffer.
+ *
+ * \param buf The buffer to write to.
+ * \param h The fec header to write.
+ */
+static int read_fec_header(char *buf, size_t len, struct fec_header *h)
+{
+       uint32_t magic;
+
+       if (len < FEC_HEADER_SIZE)
+               return 0;
+       magic = read_u32(buf);
+       if (magic != FEC_MAGIC)
+               return -E_BAD_FEC_HEADER;
+       h->slices_per_group = read_u8(buf + 4);
+       h->data_slices_per_group = read_u8(buf + 5);
+       h->audio_header_size = read_u32(buf + 6);
+
+       h->group_num = read_u32(buf + 10);
+       h->group_bytes = read_u32(buf + 14);
+
+       h->slice_num = read_u8(buf + 18);
+       h->slice_bytes = read_u16(buf + 20);
+       if (!h->group_bytes && & h->slice_bytes)
+               return -E_FECDEC_EOF;
+//     PARA_DEBUG_LOG("group %u, slize %u, slices per group: %u\n",
+//             h->group_num, h->slice_num, h->slices_per_group);
+       return 1;
+}
+
+static int dispatch_slice(char *buf, size_t len, struct fec_header *h,
+               struct filter_node *fn)
+{
+       struct fec_group *fg;
+       int ret;
+       struct private_fecdec_data *pfd = fn->private_data;
+
+       if (h->slice_bytes > len) /* can not use the thing, try to read more */
+               return 0;
+       ret = get_group(h, pfd, &fg);
+       if (ret < 0)
+               return ret;
+       if (group_complete(fg)) {
+               PARA_DEBUG_LOG("group complete, ignoring slice %d\n",
+                       h->slice_num);
+               return 1;
+       }
+       fg->h = *h;
+       ret = add_slice(buf, fg);
+       if (ret < 0)
+               return ret;
+       if (group_complete(fg)) {
+               if (!pfd->fec) {
+                       int k = h->data_slices_per_group, n = h->slices_per_group;
+                       PARA_NOTICE_LOG("init fec (%d, %d)\n", k, n);
+                       ret = fec_new(k, n, &pfd->fec);
+                       if (ret < 0)
+                               return ret;
+               }
+               ret = decode_group(fg, fn);
+               if (ret < 0)
+                       return ret;
+       }
+       return 1;
+}
+
+static int fecdec(char *buf, size_t len, struct filter_node *fn)
+{
+       int ret;
+       struct fec_header h;
+
+       ret = read_fec_header(buf, len, &h);
+       if (ret <= 0)
+               return ret;
+       if (h.slice_bytes > INPUT_BUFFER_SIZE)
+               return -E_BAD_SLICE_SIZE;
+       if (h.slice_num > h.slices_per_group)
+               return -E_BAD_SLICE_NUM;
+       ret = dispatch_slice(buf + FEC_HEADER_SIZE, len - FEC_HEADER_SIZE,
+               &h, fn);
+       //PARA_INFO_LOG("ret: %d, len: %d, slice_bytes: %d\n", ret, len, h.slice_bytes);
+       if (ret <= 0)
+               return ret;
+       return FEC_HEADER_SIZE + h.slice_bytes;
+}
+
+static void fecdec_close(struct filter_node *fn)
+{
+       struct private_fecdec_data *pfd = fn->private_data;
+       struct fec_group *fg;
+
+       FOR_EACH_FEC_GROUP(fg, pfd)
+               clear_group(fg);
+       free(fn->buf);
+       fn->buf = NULL;
+       free(fn->private_data);
+       fn->private_data = NULL;
+}
+
+static void fecdec_open(struct filter_node *fn)
+{
+       fn->bufsize = FECDEC_OUTBUF_SIZE;
+       fn->buf = para_malloc(fn->bufsize);
+       fn->private_data = para_calloc(sizeof(struct private_fecdec_data));
+       fn->loaded = 0;
+}
+
+/**
+ * The init function of the fecdec filter.
+ *
+ * \param f struct to initialize.
+ */
+void fecdec_filter_init(struct filter *f)
+{
+       f->convert = fecdec;
+       f->close = fecdec_close;
+       f->open = fecdec_open;
+}
index 8aa753b..f2738d3 100644 (file)
@@ -243,16 +243,23 @@ section "udp sender"
 
 option "udp_target" -
 #~~~~~~~~~~~~~~~~~~~~
-"add udp target"
-string typestr="a.b.c.d:p"
+"add udp target with optional port"
+string typestr="host[:port]"
 optional
 multiple
 details="
-       Add given host/port to the list of targets.  This option
-       can be given multiple times. Example: '224.0.1.38:1500'
-       instructs the udp sender to send to udp port 1500 on host
-       224.0.1.38 (unassigned ip in the Local Network Control Block
-       224.0.0/24). This is useful for multicast streaming.
+       Add given host/port to the list of targets. The 'host' argument
+       can be either an IPv4/v6 address or hostname (RFC 3986 syntax).
+       The 'port' argument is an optional port number. If the 'port'
+       part is absent, the 'udp_default_port' value is used.
+
+       The following examples are possible targets:
+       '10.10.1.2:8000' (host:port); '10.10.1.2' (with default port);
+       '224.0.1.38:1500' (IPv4 multicast); 'localhost:8001' (hostname
+       with port); '[::1]:8001' (IPv6 localhost); '[badc0de::1]' (IPv6
+       host with default port); '[FF00::beef]:1500' (IPv6 multicast).
+
+       This option can be given multiple times, for multiple targets.
 "
 
 option "udp_no_autostart" -
diff --git a/net.c b/net.c
index b510dc1..2a3fc72 100644 (file)
--- a/net.c
+++ b/net.c
@@ -20,6 +20,7 @@
 #endif
 
 #include <dirent.h>
+#include <regex.h>
 
 #include "para.h"
 #include "error.h"
@@ -85,6 +86,114 @@ void disable_crypt(int fd)
        crypt_data_array[fd].private_data = NULL;
 }
 
+/**
+ * Match string as a candidate IPv4 address.
+ *
+ * \param address The string to match.
+ * \return True if \a address has "dot-quad" format.
+ */
+static bool is_v4_dot_quad(const char *address)
+{
+       bool result;
+       regex_t r;
+
+       assert(!regcomp(&r, "^([0-9]+\\.){3}[0-9]+$", REG_EXTENDED|REG_NOSUB));
+       result = regexec(&r, address, 0, NULL, 0) == 0;
+       regfree(&r);
+       return result;
+}
+
+/**
+ * Perform basic syntax checking on the host-part of an URL:
+ *
+ * - Since ':' is invalid in IPv4 addresses and DNS names, the
+ *   presence of ':' causes interpretation as IPv6 address;
+ * - next the first-match-wins algorithm from RFC 3986 is applied;
+ * - else the string is considered as DNS name, to be resolved later.
+ *
+ * \param host The host string to check.
+ * \return True if \a host passes the syntax checks.
+ *
+ * \sa RFC 3986, 3.2.2; RFC 1123, 2.1; RFC 1034, 3.5
+ */
+static bool host_string_ok(const char *host)
+{
+       if (host == NULL || *host == '\0')
+               return false;
+       if (strchr(host, ':') != NULL)
+               return is_valid_ipv6_address(host);
+       if (is_v4_dot_quad(host))
+               return is_valid_ipv4_address(host);
+       return true;
+}
+
+/**
+ * Parse and validate URL string.
+ *
+ * The URL syntax is loosely based on RFC 3986, supporting one of
+ * - "["host"]"[:port] for native IPv6 addresses and
+ * - host[:port] for IPv4 hostnames and DNS names.
+ *
+ * Native IPv6 addresses must be enclosed in square brackets, since
+ * otherwise there is an ambiguity with the port separator `:'.
+ * The 'port' part is always considered to be a number; if absent,
+ * it is set to -1, to indicate that a default port is to be used.
+ *
+ * The following are valid examples:
+ * - 10.10.1.1
+ * - 10.10.1.2:8000
+ * - localhost
+ * - localhost:8001
+ * - [::1]:8000
+ * - [badc0de::1]
+ *
+ * \param url    The URL string to take apart.
+ * \param host   To return the copied host part of \a url.
+ * \param hostlen The maximum length of \a host.
+ * \param port   To return the port number (if any) of \a url.
+ *
+ * \return Pointer to \a host, or NULL if failed.
+ * If NULL is returned, \a host and \a portnum are undefined. If no
+ * port number was present in \a url, \a portnum is set to -1.
+ *
+ * \sa RFC 3986, 3.2.2/3.2.3
+ */
+char *parse_url(const char *url,
+               char    *host, ssize_t hostlen,
+               int32_t *port)
+{
+       const char *o = url;
+       char *c = host, *end = c + (hostlen - 1);
+
+       *port = -1;
+
+       if (o == NULL || hostlen < 1)
+               goto failed;
+
+       if (*o == '[') {
+               for (++o; (*c = *o == ']' ? '\0' : *o); c++, o++)
+                       if (c == end)
+                               goto failed;
+
+               if (*o++ != ']' || (*o != '\0' && *o != ':'))
+                       goto failed;
+       } else {
+               for (; (*c = *o == ':'? '\0' : *o); c++, o++)
+                       if (c == end)
+                               goto failed;
+       }
+
+       if (*o == ':')
+               if (para_atoi32(++o, port) < 0 ||
+                   *port < 0 || *port > 0xffff)
+                       goto failed;
+
+       if (host_string_ok(host))
+               return host;
+failed:
+       *host = '\0';
+       return NULL;
+}
 
 /**
  * Determine the socket type for a given layer-4 protocol.
diff --git a/net.h b/net.h
index 8ec9fa5..11b1708 100644 (file)
--- a/net.h
+++ b/net.h
 #endif
 /** \endcond */
 
+
+/**
+ * Functions to parse and validate (parts of) URLs.
+ */
+extern char *parse_url(const char *url,
+                      char *host, ssize_t hostlen, int32_t *port);
 /**
  * Ensure that string conforms to the IPv4 address format.
  *
@@ -39,6 +45,21 @@ _static_inline_ bool is_valid_ipv4_address(const char *address)
        return inet_pton(AF_INET, address, &test_it) != 0;
 }
 
+/**
+ * Ensure that string conforms to IPv6 address format.
+ *
+ * \param address The address string to check.
+ *
+ * \return 1 if string has a valid IPv6 address syntax, 0 if not.
+ * \sa RFC 4291
+ */
+_static_inline_ bool is_valid_ipv6_address(const char *address)
+{
+       struct in6_addr test_it;
+
+       return inet_pton(AF_INET6, address, &test_it) != 0;
+}
+
 /**
  * Generic socket creation (passive and active sockets).
  */
index 8d1d675..a0dfd2d 100644 (file)
--- a/server.h
+++ b/server.h
@@ -6,10 +6,12 @@
 
 /** \file server.h Common server data structures. */
 
-
 /** Size of the selector_info and audio_file info strings of struct misc_meta_data. */
 #define MMD_INFO_SIZE 16384
 
+/** The maximum length of the host component in an URL */
+#define MAX_HOSTLEN    256
+
 /**
  * Defines one command of para_server.
  */
@@ -35,9 +37,7 @@ struct sender_command_data{
        /** The number of the sender in question. */
        int sender_num;
        /** Used for the allow/deny/add/remove subcommands. */
-       struct in_addr addr;
-       /** Used for the allow/deny/add/remove subcommands. */
-       char host[256];
+       char host[MAX_HOSTLEN];
        /** Used for allow/deny. */
        int netmask;
        /** The port number for add/remove. */
diff --git a/udp_header.h b/udp_header.h
deleted file mode 100644 (file)
index 7e94b58..0000000
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Copyright (C) 2006-2009 Andre Noll <maan@systemlinux.org>
- *
- * Licensed under the GPL v2. For licencing details see COPYING.
- */
-
-/** \file udp_header.h some macros used by udp_send.c and udp_recv.c. */
-#include <net/if.h>
-
-/**
- * Number of bytes of the paraslash udp header.
- *
- * The udp sender prepends a header at the beginning of each data chunk. Within
- * this header, the type of the current audio stream and the * type of this
- * data chunk is coded.
- */
-#define UDP_AUDIO_HEADER_LEN 16
-
-/** The possible stream types. */
-enum udp_stream_type {
-       /** Used for mp3 and aac streams. */
-       UDP_PLAIN_STREAM,
-       /** Ogg vorbis streams. */
-       UDP_HEADER_STREAM,
-       /** stream type not yet known. */
-       UDP_UNKNOWN_STREAM
-};
-
-/** The possible packet types. */
-enum udp_audio_packet_type {
-       /** Beginning of file. */
-       UDP_BOF_PACKET,
-       /** End of file. */
-       UDP_EOF_PACKET,
-       /** Combined header/data packet (ogg only). */
-       UDP_HEADER_PACKET,
-       /** Packet contains only audio file data. */
-       UDP_DATA_PACKET,
-       /** Invalid packet type. */
-       UDP_UNKNOWN_PACKET
-};
-
-/** The contents of an udp audio header. */
-struct udp_audio_header {
-       /** see \ref udp_stream_type. */
-       uint8_t stream_type;
-       /** see \ref udp_audio_packet_type. */
-       uint8_t packet_type;
-       /** Non-zero only for header packets. */
-       uint16_t header_len;
-       /** Length of header plus audio file data. */
-       uint16_t payload_len;
-};
-
-/**
- * Write a struct udp_audio_header to a buffer.
- *
- * \param buf The buffer to write to.
- * \param h The audio header to write.
- *
- * Used by the udp sender.
- *
- */
-_static_inline_ void write_udp_audio_header(char *buf, struct udp_audio_header *h)
-{
-       memcpy(buf, "UDPM", 4);
-       write_u8(buf + 4, h->stream_type);
-       write_u8(buf + 5, h->packet_type);
-       write_u16(buf + 6, h->header_len);
-       write_u16(buf + 8, h->payload_len);
-       memset(buf + 10, 0, 6);
-}
-
-/**
- * Used by the udp receiver to read a struct udp_audio_header from a buffer.
- *
- * \param buf The buffer to read from.
- * \param len The length of \a buf.
- * \param h Result pointer.
- *
- * \return 1 if \a buf contains a valid udp audio header, -1 else.
- */
-_static_inline_ int read_udp_audio_header(char *buf, size_t len,
-               struct udp_audio_header *h)
-{
-       if (len < 4)
-               goto err;
-       if (memcmp(buf, "UDPM", 4))
-               goto err;
-       h->stream_type = read_u8(buf + 4);
-       h->packet_type = read_u8(buf + 5);
-       h->header_len = read_u16(buf + 6);
-       h->payload_len = read_u16(buf + 8);
-       return 1;
-err:
-       h->stream_type = UDP_UNKNOWN_STREAM;
-       h->packet_type = UDP_UNKNOWN_PACKET;
-       h->header_len = h->payload_len = 0;
-       return -1;
-}
index 9ea35d8..759caa3 100644 (file)
@@ -6,11 +6,11 @@
 /** \file udp_recv.c Paraslash's udp receiver */
 
 #include <dirent.h>
+#include <net/if.h>
 
 #include "para.h"
 #include "error.h"
 #include "portable_io.h"
-#include "udp_header.h"
 #include "list.h"
 #include "sched.h"
 #include "ggo.h"
 
 /** The size of the receiver node buffer. */
 #define UDP_RECV_CHUNK_SIZE (128 * 1024)
-
 /**
  * Data specific to the udp receiver.
  *
  * \sa \ref receiver, \ref receiver_node.
  */
 struct private_udp_recv_data {
-       /**
-        * Whether a header was received.
-        *
-        * A flag indicating whether this receiver already received a packet
-        * which contains the audio file header.
-        *
-        * This flag has no effect if the audio stream indicates that no extra
-        * headers will be sent (mp3, aac).  Otherwise, all data packets are
-        * dropped until the header is received.
-        */
-       int have_header;
        /** The socket file descriptor. */
        int fd;
-       /** Non-zero on short reads. */
-       uint16_t need_more;
-       /** Copied from the first audio header received. */
-       uint16_t stream_type;
 };
 
 static void udp_recv_pre_select(struct sched *s, struct task *t)
@@ -62,53 +46,6 @@ static int enough_space(size_t nbytes, size_t loaded)
        return nbytes + loaded < UDP_RECV_CHUNK_SIZE;
 }
 
-/*
- * Perform some sanity checks on an udp audio file header.
- *
- * return: negative on error, 0: discard data, 1: use data
- */
-static int examine_audio_header(struct private_udp_recv_data *purd,
-               struct udp_audio_header *uah, size_t packet_size)
-{
-       /* payload_len includes header */
-       if (uah->payload_len < uah->header_len)
-               return -E_UDP_BAD_HEADER;
-       switch (uah->packet_type) {
-       case UDP_EOF_PACKET:
-               return -E_RECV_EOF;
-       case UDP_BOF_PACKET:
-               purd->have_header = 1;
-               /* fall through */
-       case UDP_DATA_PACKET:
-               if (uah->header_len) /* header in no-header packet */
-                       return -E_UDP_BAD_HEADER;
-               break;
-       case UDP_HEADER_PACKET:
-               if (!uah->header_len) /** no header in header packet */
-                       return -E_UDP_BAD_HEADER;
-               break;
-       default: /* bad packet type */
-               return -E_UDP_BAD_HEADER;
-       }
-       /* check stream type */
-       if (uah->stream_type != UDP_PLAIN_STREAM &&
-                       uah->stream_type != UDP_HEADER_STREAM)
-               return -E_UDP_BAD_STREAM_TYPE;
-       if (purd->stream_type == UDP_UNKNOWN_STREAM)
-               purd->stream_type = uah->stream_type;
-       /* stream type must not change */
-       if (uah->stream_type != purd->stream_type)
-               return -E_UDP_BAD_STREAM_TYPE;
-       if (!purd->have_header && uah->stream_type == UDP_HEADER_STREAM)
-               /* can't use the data, wait for header packet */
-               return 0;
-       if (packet_size < uah->payload_len + UDP_AUDIO_HEADER_LEN)
-               /* we read only a part of the package */
-               purd->need_more = uah->payload_len
-                       + UDP_AUDIO_HEADER_LEN - packet_size;
-       return 1;
-}
-
 static int add_rn_output(struct receiver_node *rn, char *buf, size_t len)
 {
        if (!len)
@@ -126,10 +63,7 @@ static void udp_recv_post_select(__a_unused struct sched *s, struct task *t)
        struct private_udp_recv_data *purd = rn->private_data;
        int ret;
        char tmpbuf[UDP_RECV_CHUNK_SIZE];
-       uint16_t data_len;
-       char *data_buf;
        size_t packet_size;
-       struct udp_audio_header uah;
 
        if (rn->output_error && *rn->output_error < 0) {
                t->error = *rn->output_error;
@@ -148,43 +82,9 @@ static void udp_recv_post_select(__a_unused struct sched *s, struct task *t)
        if (!ret)
                return;
        packet_size = ret;
-       for (;;) {
-               uint16_t num;
-
-               if (!purd->need_more) {
-                       ret = read_udp_audio_header(tmpbuf, packet_size, &uah);
-                       if (ret >= 0)
-                               break;
-                       goto success; /* drop data */
-               }
-               num = PARA_MIN(purd->need_more, (uint16_t)packet_size);
-               assert(num > 0);
-               t->error = add_rn_output(rn, tmpbuf, num);
-               if (t->error < 0)
-                       return;
-               purd->need_more -= num;
-               if (packet_size <= num)
-                       goto success;
-               packet_size -= num;
-               memmove(tmpbuf, tmpbuf + num, packet_size);
-       }
-       assert(!purd->need_more);
-       t->error = examine_audio_header(purd, &uah, packet_size);
-       if (t->error <= 0)
+       t->error = add_rn_output(rn, tmpbuf, packet_size);
+       if (t->error < 0)
                return;
-       data_len = uah.payload_len;
-       data_buf = tmpbuf + UDP_AUDIO_HEADER_LEN;
-       if (uah.packet_type == UDP_HEADER_PACKET) {
-               if (purd->have_header) { /* skip header */
-                       data_buf += uah.header_len;
-                       data_len -= uah.header_len;
-               } else { /* only use the header */
-                       purd->have_header = 1;
-                       data_len = uah.header_len;
-               }
-       }
-       t->error = add_rn_output(rn, data_buf, data_len);
-       return;
 success:
        t->error = 1;
 }
@@ -303,7 +203,6 @@ static int udp_recv_open(struct receiver_node *rn)
        ret = mark_fd_nonblocking(purd->fd);
        if (ret < 0)
                goto err;
-       purd->stream_type = UDP_UNKNOWN_STREAM;
        PARA_NOTICE_LOG("receiving from %s:%d, fd=%d\n", c->host_arg,
                c->port_arg, purd->fd);
        return purd->fd;
index 140458e..c17fc31 100644 (file)
@@ -9,6 +9,7 @@
 
 #include <sys/time.h>
 #include <dirent.h>
+#include <net/if.h>
 
 #include "server.cmdline.h"
 #include "para.h"
 #include "list.h"
 #include "send.h"
 #include "portable_io.h"
-#include "udp_header.h"
 #include "net.h"
 #include "fd.h"
 #include "sched.h"
 #include "close_on_fork.h"
 #include "chunk_queue.h"
 
-/** Convert in_addr to ascii. */
-#define TARGET_ADDR(oc) inet_ntoa((oc)->addr)
-
 /** Describes one entry in the list of targets for the udp sender. */
 struct udp_target {
-       /** Address info. */
-       struct in_addr addr;
        /** The position of this target in the list of targets. */
        struct list_head node;
+       /** The hostname (DNS name or IPv4/v6 address string). */
+       char host[MAX_HOSTLEN];
        /** The UDP port. */
        int port;
        /** The socket fd. */
        int fd;
        /** The list of queued chunks for this fd. */
        struct chunk_queue *cq;
+       struct fec_client *fc;
+       struct fec_client_parms fcp;
 };
 
 static struct list_head targets;
@@ -61,9 +60,10 @@ static void udp_close_target(struct udp_target *ut)
 
 static void udp_delete_target(struct udp_target *ut, const char *msg)
 {
-       PARA_NOTICE_LOG("deleting %s:%d (%s) from list\n", TARGET_ADDR(ut),
+       PARA_NOTICE_LOG("deleting %s#%d (%s) from list\n", ut->host,
                ut->port, msg);
        udp_close_target(ut);
+       vss_del_fec_client(ut->fc);
        list_del(&ut->node);
        free(ut);
 }
@@ -155,7 +155,7 @@ static int udp_init_session(struct udp_target *ut)
        if (ut->fd >= 0) /* nothing to do */
                return 0;
 
-       ret = makesock(AF_UNSPEC, IPPROTO_UDP, 0, TARGET_ADDR(ut), ut->port);
+       ret = makesock(AF_UNSPEC, IPPROTO_UDP, 0, ut->host, ut->port);
        if (ret < 0)
                return ret;
        ut->fd = ret;
@@ -176,121 +176,26 @@ static int udp_init_session(struct udp_target *ut)
        }
        add_close_on_fork_list(ut->fd);
        ut->cq = cq_new(UDP_CQ_BYTES);
-       PARA_NOTICE_LOG("sending to udp %s:%d\n", TARGET_ADDR(ut), ut->port);
+       PARA_NOTICE_LOG("sending to udp %s#%d\n", ut->host, ut->port);
        return 1;
 }
 
-static void udp_send_buf(char *buf, size_t len)
-{
-       struct udp_target *ut, *tmp;
-       int ret;
-
-       list_for_each_entry_safe(ut, tmp, &targets, node) {
-               ret = udp_init_session(ut);
-               if (ret < 0) {
-                       udp_delete_target(ut, para_strerror(-ret));
-                       continue;
-               }
-               ret = send_queued_chunks(ut->fd, ut->cq, 0);
-               if (ret < 0) {
-                       udp_delete_target(ut, para_strerror(-ret));
-                       continue;
-               }
-               if (!len)
-                       continue;
-               if (!ret) { /* still data left in the queue */
-                       ret = cq_enqueue(ut->cq, buf, len);
-                       if (ret < 0) {
-                               udp_delete_target(ut, para_strerror(-ret));
-                               continue;
-                       }
-               }
-               ret = write_nonblock(ut->fd, buf, len, 0);
-               if (ret < 0) {
-                       udp_delete_target(ut, para_strerror(-ret));
-                       continue;
-               }
-               if (ret != len) {
-                       ret = cq_enqueue(ut->cq, buf + ret, len - ret);
-                       if (ret < 0) {
-                               udp_delete_target(ut, para_strerror(-ret));
-                               continue;
-                       }
-               }
-       }
-}
-
 static void udp_shutdown_targets(void)
 {
-       char buf[UDP_AUDIO_HEADER_LEN];
        struct udp_target *ut, *tmp;
-       struct udp_audio_header uah = {
-               .stream_type = UDP_UNKNOWN_STREAM,
-               .packet_type = UDP_EOF_PACKET,
-       };
+       const char *buf = NULL;
+       size_t len = 0; /* STFU, gcc */
 
-       write_udp_audio_header(buf, &uah);
        list_for_each_entry_safe(ut, tmp, &targets, node) {
                if (ut->fd < 0)
                        continue;
-               write(ut->fd, buf, UDP_AUDIO_HEADER_LEN);
+               if (!buf)
+                       len = vss_get_fec_eof_packet(&buf);
+               write(ut->fd, buf, len);
                udp_close_target(ut);
        }
 }
 
-static int need_extra_header(long unsigned current_chunk)
-{
-       static struct timeval last_header;
-       struct timeval diff;
-
-       if (!current_chunk)
-               return 0;
-       tv_diff(now, &last_header, &diff);
-       if (tv2ms(&diff) < conf.udp_header_interval_arg)
-               return 0;
-       last_header = *now;
-       return 1;
-}
-
-static void udp_send(long unsigned current_chunk, __a_unused long unsigned chunks_sent,
-               const char *buf, size_t len, const char *header_buf,
-               size_t header_len)
-{
-       char *sendbuf;
-       size_t sendbuf_len;
-       struct timeval *chunk_tv;
-       struct udp_audio_header uah;
-
-//     PARA_NOTICE_LOG("len: %zd, header_len: %zd\n", len, header_len);
-       if (sender_status != SENDER_ON)
-               return;
-
-       /* we might not yet know the chunk time */
-       chunk_tv = vss_chunk_time();
-       if (!chunk_tv)
-               return;
-       if (list_empty(&targets))
-               return;
-       uah.stream_type = header_len? UDP_HEADER_STREAM : UDP_PLAIN_STREAM;
-       uah.header_len = need_extra_header(current_chunk)? header_len : 0;
-       if (!current_chunk)
-               uah.packet_type = UDP_BOF_PACKET;
-       else if (uah.header_len)
-               uah.packet_type = UDP_HEADER_PACKET;
-       else
-               uah.packet_type = UDP_DATA_PACKET;
-       uah.payload_len = uah.header_len + len;
-       sendbuf_len = UDP_AUDIO_HEADER_LEN + uah.payload_len;
-       sendbuf = para_malloc(sendbuf_len);
-       write_udp_audio_header(sendbuf, &uah);
-       if (uah.header_len)
-               memcpy(sendbuf + UDP_AUDIO_HEADER_LEN, header_buf,
-                       uah.header_len);
-       memcpy(sendbuf + UDP_AUDIO_HEADER_LEN + uah.header_len, buf, len);
-       udp_send_buf(sendbuf, sendbuf_len);
-       free(sendbuf);
-}
-
 static int udp_com_on(__a_unused struct sender_command_data *scd)
 {
        sender_status = SENDER_ON;
@@ -306,33 +211,71 @@ static int udp_com_off(__a_unused struct sender_command_data *scd)
 
 static int udp_com_delete(struct sender_command_data *scd)
 {
-       char *a = para_strdup(inet_ntoa(scd->addr));
        struct udp_target *ut, *tmp;
+
        list_for_each_entry_safe(ut, tmp, &targets, node) {
-               if (scd->port != ut->port)
+               /* Unspecified port means wildcard port match */
+               if (scd->port > 0 && scd->port != ut->port)
                        continue;
-               if (strcmp(TARGET_ADDR(ut), a))
+               if (strcmp(ut->host, scd->host))
                        continue;
                udp_delete_target(ut, "com_delete");
        }
        return 1;
 }
 
-static void udp_add_target(int port, struct in_addr *addr)
+static int udp_send_fec(char *buf, size_t len, void *private_data)
+{
+       struct udp_target *ut = private_data;
+       int ret = udp_init_session(ut);
+
+       if (ret < 0)
+               goto fail;
+       ret = send_queued_chunks(ut->fd, ut->cq, 0);
+       if (ret < 0)
+               goto fail;
+       if (!len)
+               return 0;
+       if (!ret) { /* still data left in the queue */
+               ret = cq_enqueue(ut->cq, buf, len);
+               if (ret < 0)
+                       goto fail;
+       }
+       ret = write_nonblock(ut->fd, buf, len, 0);
+       if (ret < 0)
+               goto fail;
+       if (ret != len) {
+               ret = cq_enqueue(ut->cq, buf + ret, len - ret);
+               if (ret < 0)
+                       goto fail;
+       }
+       return 1;
+fail:
+       udp_delete_target(ut, para_strerror(-ret));
+       return ret;
+}
+
+static void udp_add_target(const char *host, int port)
 {
        struct udp_target *ut = para_calloc(sizeof(struct udp_target));
-       ut->port = port;
-       ut->addr = *addr;
+
+       strncpy(ut->host, host, sizeof(ut->host));
+       ut->port = port > 0 ? port : conf.udp_default_port_arg;
        ut->fd = -1; /* not yet connected */
-       PARA_INFO_LOG("adding to target list (%s:%d)\n",
-               TARGET_ADDR(ut), ut->port);
+       PARA_INFO_LOG("adding to target list (%s#%d)\n",
+                     ut->host, ut->port);
        para_list_add(&ut->node, &targets);
+       ut->fcp.slices_per_group = 16;
+       ut->fcp.data_slices_per_group = 14;
+       ut->fcp.max_slice_bytes = 1400;
+       ut->fcp.send = udp_send_fec;
+       ut->fcp.private_data = ut;
+       vss_add_fec_client(&ut->fcp, &ut->fc);
 }
 
 static int udp_com_add(struct sender_command_data *scd)
 {
-       int port = (scd->port > 0)? scd->port : conf.udp_default_port_arg;
-       udp_add_target(port, &scd->addr);
+       udp_add_target(scd->host, scd->port);
        return 1;
 }
 
@@ -342,8 +285,10 @@ static char *udp_info(void)
        char *ret, *tgts = NULL;
 
        list_for_each_entry(ut, &targets, node) {
-               char *tmp = make_message("%s%s:%d ", tgts? tgts : "",
-                       TARGET_ADDR(ut), ut->port);
+               bool is_v6 = strchr(ut->host, ':') != NULL;
+               char *tmp = make_message("%s%s%s%s:%d ", tgts ? : "",
+                                        is_v6 ? "[" : "", ut->host,
+                                        is_v6 ? "]" : "", ut->port);
                free(tgts);
                tgts = tmp;
        }
@@ -362,40 +307,30 @@ static char *udp_info(void)
 
 static void udp_init_target_list(void)
 {
-       int i;
+       char    host[MAX_HOSTLEN];
+       int     port, i;
 
        INIT_LIST_HEAD(&targets);
-       for (i = 0; i < conf.udp_target_given; i++) {
-               char *arg = para_strdup(conf.udp_target_arg[i]);
-               char *p = strchr(arg, ':');
-               int port;
-               struct in_addr addr;
-
-               if (!p)
-                       goto err;
-               *p = '\0';
-               if (!inet_pton(AF_INET, arg, &addr))
-                       goto err;
-               port = atoi(++p);
-               if (port < 0 || port > 65535)
-                       port = conf.udp_default_port_arg;
-               udp_add_target(port, &addr);
-               goto success;
-err:
-               PARA_CRIT_LOG("syntax error for udp target option "
-                       "#%d, ignoring\n", i);
-success:
-               free(arg);
-               continue;
-       }
+       for (i = 0; i < conf.udp_target_given; i++)
+               if (parse_url(conf.udp_target_arg[i], host,
+                                       sizeof(host), &port) == NULL)
+                       PARA_CRIT_LOG("syntax error for udp target option "
+                                     "#%d, ignoring\n", i);
+               else
+                       udp_add_target(host, port);
 }
 
 static char *udp_help(void)
 {
        return make_message(
                "usage: {on|off}\n"
-               "usage: {add|delete} IP port\n"
-               "example: add 224.0.1.38 8000 (multicast streaming)\n"
+               "usage: {add|delete} host[:port]\n"
+               "examples: add 224.0.1.38:1500  (IPv4 multicast)\n"
+               "          add 10.10.1.42       (using default port)\n"
+               "          add [FF05::42]:1500  (IPv6 multicast)\n"
+               "          add [::1]            (IPv6 localhost/default port)\n"
+               "          delete myhost.net    (host with port wildcard)\n"
+               "          delete [badc0de::1]  (IPv6 with port wildcard)\n"
        );
 }
 
@@ -411,7 +346,7 @@ void udp_send_init(struct sender *s)
        INIT_LIST_HEAD(&targets);
        s->info = udp_info;
        s->help = udp_help;
-       s->send = udp_send;
+       s->send = NULL;
        s->pre_select = NULL;
        s->post_select = NULL;
        s->shutdown_clients = udp_shutdown_targets;
diff --git a/vss.c b/vss.c
index d205349..0801eeb 100644 (file)
--- a/vss.c
+++ b/vss.c
 
 #include "para.h"
 #include "error.h"
+#include "portable_io.h"
+#include "fec.h"
 #include "string.h"
 #include "afh.h"
 #include "afs.h"
 #include "server.h"
 #include "net.h"
 #include "server.cmdline.h"
-#include "vss.h"
 #include "list.h"
+#include "vss.h"
 #include "send.h"
 #include "ipc.h"
 #include "fd.h"
@@ -87,6 +89,268 @@ struct vss_task {
        size_t header_len;
 };
 
+static struct list_head fec_client_list;
+
+struct fec_slice {
+       uint8_t num;
+       uint16_t bytes;
+};
+
+struct fec_group {
+       uint32_t num;
+       uint32_t bytes;
+       uint32_t first_chunk;
+       uint32_t num_chunks;
+       struct timeval duration;
+       struct timeval start;
+       struct timeval slice_duration;
+};
+
+struct fec_client {
+       struct fec_client_parms *fcp;
+       struct fec_parms *parms;
+       struct list_head node;
+       struct timeval stream_start;
+       int first_stream_chunk;
+       struct fec_group group;
+       struct fec_slice slice;
+       const unsigned char **src_data;
+       unsigned char *extra_src_buf;
+       size_t extra_src_buf_size;
+       unsigned char *enc_buf;
+       size_t enc_buf_size;
+};
+
+/**
+ * Get the chunk time of the current audio file.
+ *
+ * \return A pointer to a struct containing the chunk time, or NULL,
+ * if currently no audio file is selected.
+ */
+struct timeval *vss_chunk_time(void)
+{
+       if (mmd->afd.afhi.chunk_tv.tv_sec == 0 &&
+                       mmd->afd.afhi.chunk_tv.tv_usec == 0)
+               return NULL;
+       return &mmd->afd.afhi.chunk_tv;
+}
+
+static void setup_fec_group(struct fec_client *fc, struct vss_task *vsst)
+{
+       uint32_t num_bytes = 0, chunk_num, max_group_size, last_payload_size;
+       int i, k = fc->fcp->data_slices_per_group;
+       const unsigned char *start_buf = NULL;
+       struct timeval tmp, *chunk_tv = vss_chunk_time();
+
+       assert(chunk_tv);
+       max_group_size = (fc->fcp->max_slice_bytes - FEC_HEADER_SIZE) * k;
+       chunk_num = fc->group.first_chunk;
+       for (;;) {
+               const unsigned char *buf;
+               size_t len;
+
+               if (chunk_num >= mmd->afd.afhi.chunks_total)
+                       break;
+               afh_get_chunk(chunk_num, &mmd->afd.afhi, vsst->map, (const char **)&buf, &len);
+               if (!start_buf)
+                       start_buf = buf;
+               if (num_bytes + len > max_group_size)
+                       break;
+               chunk_num++;
+               num_bytes += len;
+       }
+       assert(start_buf);
+       fc->group.num_chunks = chunk_num - fc->group.first_chunk;
+       fc->group.num++;
+       fc->group.bytes = num_bytes;
+       fc->slice.num = 0;
+       fc->slice.bytes = ROUND_UP(num_bytes, k) / k;
+
+       /* The last slice will not be fully used */
+       last_payload_size = num_bytes % fc->slice.bytes;
+       if (!last_payload_size)
+               last_payload_size = fc->slice.bytes;
+
+       tv_scale(fc->group.num_chunks, chunk_tv, &fc->group.duration);
+       tv_scale(fc->group.first_chunk - fc->first_stream_chunk, chunk_tv,
+               &tmp);
+       tv_add(&fc->stream_start, &tmp, &fc->group.start);
+       tv_divide(fc->fcp->slices_per_group, &fc->group.duration,
+               &fc->group.slice_duration);
+
+       for (i = 0; i < k; i++)
+               fc->src_data[i] = start_buf + i * fc->slice.bytes;
+
+       if ((char *)start_buf + k * fc->slice.bytes > vsst->map + mmd->size) {
+               /* can not use last slice as it goes beyond the map */
+               if (fc->extra_src_buf_size < fc->slice.bytes)
+                       fc->extra_src_buf = para_realloc(fc->extra_src_buf, fc->slice.bytes);
+               memcpy(fc->extra_src_buf, start_buf + (k - 1) * fc->slice.bytes,
+                       last_payload_size);
+               memset(fc->extra_src_buf + last_payload_size, 0,
+                       fc->slice.bytes - last_payload_size);
+               fc->src_data[k - 1] = fc->extra_src_buf;
+       }
+
+}
+
+/**
+ * Write a fec header to a buffer.
+ *
+ * \param buf The buffer to write to.
+ * \param h The fec header to write.
+ */
+static void write_fec_header(struct fec_client *fc)
+{
+       char *buf = (char *)fc->enc_buf;
+
+       write_u32(buf, FEC_MAGIC);
+
+       write_u8(buf + 4, fc->fcp->slices_per_group);
+       write_u8(buf + 5, fc->fcp->data_slices_per_group);
+       write_u32(buf + 6, (uint32_t)0); /* audio header len */
+
+       write_u32(buf + 10, fc->group.num);
+       write_u32(buf + 14, fc->group.bytes);
+
+       write_u8(buf + 18, fc->slice.num);
+       write_u16(buf + 20, fc->slice.bytes);
+       memset(buf + 22, 0, 11);
+}
+
+/**
+ * Return a buffer that marks the end of the stream.
+ *
+ * \return The length of the eof buffer.
+ *
+ * This is used for (multicast) udp streaming where closing the socket on the
+ * sender might not give rise to an eof condition at the peer.
+ */
+size_t vss_get_fec_eof_packet(const char **buf)
+{
+       static const char fec_eof_packet[FEC_HEADER_SIZE] =
+       "\xec\x0d\xcc\xfe\0\0\0\0"
+       "\0\0\0\0\0\0\0\0"
+       "\0\0\0\0\0\0\0\0"
+       "\0\0\0\0\0\0\0\0";
+       *buf = fec_eof_packet;
+       return FEC_HEADER_SIZE;
+}
+
+static void compute_next_fec_slice(struct fec_client *fc, struct vss_task *vsst)
+{
+       if (fc->first_stream_chunk < 0) {
+               fc->stream_start = *now;
+               fc->first_stream_chunk = mmd->current_chunk;
+               fc->group.first_chunk = mmd->current_chunk;
+               fc->group.num = 0;
+               setup_fec_group(fc, vsst);
+       } else if (fc->slice.num == fc->fcp->slices_per_group) {
+               fc->group.first_chunk += fc->group.num_chunks;
+               setup_fec_group(fc, vsst);
+
+       }
+       if (fc->enc_buf_size < fc->slice.bytes + FEC_HEADER_SIZE) {
+               fc->enc_buf_size = fc->slice.bytes + FEC_HEADER_SIZE;
+               fc->enc_buf = para_realloc(fc->enc_buf, fc->enc_buf_size);
+       }
+       write_fec_header(fc);
+       fec_encode(fc->parms, fc->src_data, fc->enc_buf + FEC_HEADER_SIZE,
+               fc->slice.num, fc->slice.bytes);
+}
+
+/**
+ * Add one entry to the list of active fec clients.
+ *
+ * \param fcp Describes the fec parameters to be used for this client.
+ * \param result An opaque pointer that must be used by remove the client later.
+ *
+ * \return Standard.
+ */
+int vss_add_fec_client(struct fec_client_parms *fcp, struct fec_client **result)
+{
+       int ret;
+       struct fec_client *fc;
+
+       if (fcp->max_slice_bytes < FEC_HEADER_SIZE + fcp->data_slices_per_group)
+               return -ERRNO_TO_PARA_ERROR(EINVAL);
+       fc = para_calloc(sizeof(*fc));
+       fc->fcp = fcp;
+       ret = fec_new(fcp->data_slices_per_group, fcp->slices_per_group,
+               &fc->parms);
+       if (ret < 0)
+               goto err;
+       fc->first_stream_chunk = -1; /* stream not yet started */
+       fc->src_data = para_malloc(fc->fcp->slices_per_group * sizeof(char *));
+       para_list_add(&fc->node, &fec_client_list);
+       *result = fc;
+       return 1;
+err:
+       fec_free(fc->parms);
+       free(fc);
+       *result = NULL;
+       return ret;
+}
+
+/**
+ * Remove one entry from the list of active fec clients.
+ *
+ * \param result The client to be removed.
+ */
+void vss_del_fec_client(struct fec_client *fc)
+{
+       list_del(&fc->node);
+       free(fc->src_data);
+       free(fc->enc_buf);
+       free(fc->extra_src_buf);
+       fec_free(fc->parms);
+       free(fc);
+}
+
+/*
+ * Compute if/when next slice is due. If it isn't due yet and \a diff is
+ * not \p Null, compute the time difference next - now, where
+ *
+ *     next = stream_start + (first_group_chunk - first_stream_chunk)
+ *             * chunk_time + slice_num * slice_time
+ */
+static int next_slice_is_due(struct fec_client *fc, struct timeval *diff)
+{
+       struct timeval tmp, next;
+       int ret;
+
+       if (fc->first_stream_chunk < 0)
+               return 1;
+       tv_scale(fc->slice.num, &fc->group.slice_duration, &tmp);
+       tv_add(&tmp, &fc->group.start, &next);
+       ret = tv_diff(&next, now, diff);
+       return ret < 0? 1 : 0;
+}
+
+static void compute_slice_timeout(struct timeval *timeout)
+{
+       struct fec_client *fc;
+
+       assert(vss_playing());
+       list_for_each_entry(fc, &fec_client_list, node) {
+               struct timeval diff;
+               int ret = next_slice_is_due(fc, &diff);
+
+               // PARA_NOTICE_LOG("diff: %lu, ret: %d\n", tv2ms(&diff), ret);
+               if (ret) {
+                       timeout->tv_sec = 0;
+                       timeout->tv_usec = 0;
+                       goto out;
+               }
+               /* timeout = min(timeout, diff) */
+               if (tv_diff(&diff, timeout, NULL) < 0)
+                       *timeout = diff;
+       }
+out:
+       return;
+       PARA_NOTICE_LOG("slice timeout: %lu:%lu\n", (long unsigned)timeout->tv_sec, (long unsigned)timeout->tv_usec);
+}
+
 /**
  * Check if vss status flag \a P (playing) is set.
  *
@@ -184,16 +448,20 @@ static struct timeval *vss_compute_timeout(struct vss_task *vsst)
                return NULL;
        compute_chunk_time(mmd->chunks_sent, &mmd->afd.afhi.chunk_tv,
                &mmd->stream_start, &next_chunk);
-       if (chk_barrier("chunk", &next_chunk, &the_timeout, 0) < 0)
+       if (chk_barrier("chunk", &next_chunk, &the_timeout, 0) >= 0) {
+               /* chunk is due or bof */
+               the_timeout.tv_sec = 0;
+               the_timeout.tv_usec = 0;
                return &the_timeout;
-       /* chunk is due or bof */
-       the_timeout.tv_sec = 0;
-       the_timeout.tv_usec = 0;
+       }
+       /* compute min of current timeout and next slice time */
+       compute_slice_timeout(&the_timeout);
        return &the_timeout;
 }
 
 static void vss_eof(struct vss_task *vsst)
 {
+
        mmd->stream_start = *now;
        if (!vsst->map)
                return;
@@ -230,20 +498,6 @@ const char *supported_audio_formats(void)
        return SUPPORTED_AUDIO_FORMATS;
 }
 
-/**
- * Get the chunk time of the current audio file.
- *
- * \return A pointer to a struct containing the chunk time, or NULL,
- * if currently no audio file is selected.
- */
-struct timeval *vss_chunk_time(void)
-{
-       if (mmd->afd.afhi.chunk_tv.tv_sec == 0 &&
-                       mmd->afd.afhi.chunk_tv.tv_usec == 0)
-               return NULL;
-       return &mmd->afd.afhi.chunk_tv;
-}
-
 static int need_to_request_new_audio_file(struct vss_task *vsst)
 {
        struct timeval diff;
@@ -262,6 +516,8 @@ static int need_to_request_new_audio_file(struct vss_task *vsst)
        return 1;
 }
 
+
+
 /**
  * Compute the timeout for para_server's main select-loop.
  *
@@ -285,9 +541,14 @@ static void vss_pre_select(struct sched *s, struct task *t)
        struct timeval *tv, diff;
        struct vss_task *vsst = container_of(t, struct vss_task, task);
 
-       if (!vsst->map || vss_next() || vss_paused() || vss_repos())
+       if (!vsst->map || vss_next() || vss_paused() || vss_repos()) {
+               struct fec_client *fc, *tmp;
                for (i = 0; senders[i].name; i++)
-                       senders[i].shutdown_clients();
+                       if (senders[i].shutdown_clients)
+                               senders[i].shutdown_clients();
+               list_for_each_entry_safe(fc, tmp, &fec_client_list, node)
+                       fc->first_stream_chunk = -1;
+       }
        if (vss_next())
                vss_eof(vsst);
        else if (vss_paused()) {
@@ -407,25 +668,20 @@ err:
 /**
  * Main sending function.
  *
- * This function gets called from para_server as soon as the next chunk of data
- * should be pushed out. It obtains a pointer to the data to be sent out as
- * well as its length from mmd->afd.afhi. This information is then passed to
- * each supported sender's send() function which is supposed to send out the data
- * to all connected clients.
+ * This function gets called from vss_post_select(). It checks whether the next
+ * chunk of data should be pushed out. It obtains a pointer to the data to be
+ * sent out as well as its length from mmd->afd.afhi. This information is then
+ * passed to each supported sender's send() function as well as to the send()
+ * functions of each registered fec client.
  */
-static void vss_send_chunk(struct vss_task *vsst)
+static void vss_send(struct vss_task *vsst)
 {
        int i;
        struct timeval due;
-       const char *buf;
-       size_t len;
+       struct fec_client *fc, *tmp_fc;
 
        if (!vsst->map || !vss_playing())
                return;
-       compute_chunk_time(mmd->chunks_sent, &mmd->afd.afhi.chunk_tv,
-               &mmd->stream_start, &due);
-       if (tv_diff(&due, now, NULL) > 0)
-               return;
        if (chk_barrier("eof", &vsst->eof_barrier, &due, 1) < 0)
                return;
        if (chk_barrier("data send", &vsst->data_send_barrier,
@@ -435,10 +691,6 @@ static void vss_send_chunk(struct vss_task *vsst)
                mmd->new_vss_status_flags |= VSS_NEXT;
                return;
        }
-       /*
-        * We call the send function also in case of empty chunks as they
-        * might have still some data queued which can be sent in this case.
-        */
        if (!mmd->chunks_sent) {
                struct timeval tmp;
                mmd->stream_start = *now;
@@ -446,10 +698,36 @@ static void vss_send_chunk(struct vss_task *vsst)
                mmd->offset = tv2ms(&tmp);
                mmd->events++;
        }
-       afh_get_chunk(mmd->current_chunk, &mmd->afd.afhi, vsst->map, &buf, &len);
-       for (i = 0; senders[i].name; i++)
-               senders[i].send(mmd->current_chunk, mmd->chunks_sent, buf, len,
-                       vsst->header_buf, vsst->header_len);
+       compute_chunk_time(mmd->chunks_sent, &mmd->afd.afhi.chunk_tv,
+               &mmd->stream_start, &due);
+       if (tv_diff(&due, now, NULL) <= 0) {
+               const char *buf;
+               size_t len;
+               /*
+                * We call the send function also in case of empty chunks as
+                * they might have still some data queued which can be sent in
+                * this case.
+                */
+               afh_get_chunk(mmd->current_chunk, &mmd->afd.afhi, vsst->map,
+                       &buf, &len);
+               for (i = 0; senders[i].name; i++) {
+                       if (!senders[i].send)
+                               continue;
+                       senders[i].send(mmd->current_chunk, mmd->chunks_sent,
+                               buf, len, vsst->header_buf, vsst->header_len);
+               }
+       }
+       list_for_each_entry_safe(fc, tmp_fc, &fec_client_list, node) {
+               if (!next_slice_is_due(fc, NULL))
+                       continue;
+               compute_next_fec_slice(fc, vsst);
+               PARA_DEBUG_LOG("sending %d:%d (%zu bytes)\n", fc->group.num,
+                       fc->slice.num, fc->slice.bytes);
+               fc->fcp->send((char *)fc->enc_buf,
+                       fc->slice.bytes + FEC_HEADER_SIZE,
+                       fc->fcp->private_data);
+               fc->slice.num++;
+       }
        mmd->new_vss_status_flags |= VSS_PLAYING;
        mmd->chunks_sent++;
        mmd->current_chunk++;
@@ -484,7 +762,7 @@ static void vss_post_select(struct sched *s, struct task *t)
        if ((vss_playing() && !(mmd->vss_status_flags & VSS_PLAYING)) ||
                        (vss_next() && vss_playing()))
                tv_add(now, &vsst->announce_tv, &vsst->data_send_barrier);
-       vss_send_chunk(vsst);
+       vss_send(vsst);
 }
 
 /**
@@ -527,5 +805,6 @@ void init_vss_task(int afs_socket)
                tv_add(&vsst->autoplay_barrier, &vsst->announce_tv,
                        &vsst->data_send_barrier);
        }
+       INIT_LIST_HEAD(&fec_client_list);
        register_task(&vsst->task);
 }
diff --git a/vss.h b/vss.h
index 5c917e6..efabdf3 100644 (file)
--- a/vss.h
+++ b/vss.h
@@ -25,3 +25,27 @@ const char *supported_audio_formats(void);
 #define VSS_PLAYING 8
 /** A client requested to change the audio file selector. */
 #define VSS_CHANGE 16
+
+/**
+ * Each paraslash sender may register arbitrary many clients to the virtual
+ * streaming system, possibly with varying fec parameters. In order to do so,
+ * it must allocate a \a fec_client_parms structure and pass it to \ref
+ * add_fec_client.
+ *
+ * Clients are automatically removed from that list by the vss if an error
+ * occurs, or if the sender requests deletion of a client by calling \ref
+ * vss_del_fec_client().
+ */
+struct fec_client;
+
+struct fec_client_parms {
+       uint8_t slices_per_group;
+       uint8_t data_slices_per_group;
+       uint16_t max_slice_bytes;
+       int (*send)(char *buf, size_t num_bytes, void *private_data);
+       void *private_data;
+};
+
+int vss_add_fec_client(struct fec_client_parms *fcp, struct fec_client **result);
+void vss_del_fec_client(struct fec_client *fc);
+size_t vss_get_fec_eof_packet(const char **buf);