if (fn->loaded > fn->bufsize * 3 / 5)
return 0;
- if (len < 2048 && !*fc->input_error)
+ ret = *fc->input_error;
+ if (ret < 0)
+ return ret;
+ if (len < 2048)
return 0;
if (!padd->initialized) {
* \param argv Usual argument vector.
* \param ct_ptr Points to dynamically allocated and initialized client task
* struct upon successful return.
+ * \param loglevel If not \p NULL, the number of the loglevel is stored here.
*
* Check the command line options given by \a argc and argv, set default values
* for user name and rsa key file, read further option from the config file.
int *loglevel)
{
char *home = para_homedir();
- struct stat statbuf;
int ret;
struct client_task *ct = para_calloc(sizeof(struct client_task));
ct->config_file = ct->conf.config_file_given?
para_strdup(ct->conf.config_file_arg) :
make_message("%s/.paraslash/client.conf", home);
- ret = stat(ct->config_file, &statbuf);
- if (ret && ct->conf.config_file_given) {
+ ret = file_exists(ct->config_file);
+ if (!ret && ct->conf.config_file_given) {
ret = -E_NO_CONFIG;
goto out;
}
- if (!ret) {
+ if (ret) {
struct client_cmdline_parser_params params = {
.override = 0,
.initialize = 0,
break;
case SENDER_ADD:
case SENDER_DELETE:
- if (argc != 4 && argc != 5)
- return -E_COMMAND_SYNTAX;
- if (!inet_pton(AF_INET, argv[3], &scd->addr))
+ if (argc != 4)
return -E_COMMAND_SYNTAX;
- scd->port = -1;
- if (argc == 5) {
- scd->port = atoi(argv[4]);
- if (scd->port < 0 || scd->port > 65535)
- return -E_COMMAND_SYNTAX;
- }
- break;
+ return parse_fec_url(argv[3], scd);
default:
return -E_COMMAND_SYNTAX;
}
playlist sha1 rbtree sched audiod grab_client filter_common wav_filter compress_filter
http_recv dccp_recv recv_common write_common file_write audiod_command
client_common recv stdout filter stdin audioc write client fsck exec send_common ggo
-udp_recv udp_send color"
+udp_recv udp_send color fec fecdec_filter"
all_executables="server recv filter audioc write client fsck afh"
recv_cmdline_objs="recv.cmdline http_recv.cmdline dccp_recv.cmdline udp_recv.cmdline"
recv_errlist_objs="http_recv recv_common recv time string net dccp_recv
- fd sched stdout ggo udp_recv"
+ fd sched stdout ggo udp_recv fec"
recv_ldflags=""
receivers=" http dccp udp"
senders=" http dccp udp"
filter_cmdline_objs="filter.cmdline compress_filter.cmdline amp_filter.cmdline"
-filter_errlist_objs="filter_common wav_filter compress_filter filter string stdin stdout sched fd amp_filter ggo"
+filter_errlist_objs="filter_common wav_filter compress_filter filter string
+ stdin stdout sched fd amp_filter ggo fecdec_filter fec"
filter_ldflags=""
-filters=" compress wav amp"
+filters=" compress wav amp fecdec"
audioc_cmdline_objs="audioc.cmdline"
audioc_errlist_objs="audioc string net fd"
audiod_command_list amp_filter.cmdline udp_recv.cmdline"
audiod_errlist_objs="audiod signal string daemon stat net
time grab_client filter_common wav_filter compress_filter amp_filter http_recv dccp_recv
- recv_common fd sched write_common file_write audiod_command crypt
- client_common ggo udp_recv color"
+ recv_common fd sched write_common file_write audiod_command crypt fecdec_filter
+ client_common ggo udp_recv color fec"
audiod_ldflags=""
audiod_audio_formats=""
server_errlist_objs="server afh_common mp3_afh vss command net string signal
time daemon stat crypt http_send close_on_fork
ipc dccp_send fd user_list chunk_queue afs osl aft mood score attribute
- blob playlist sha1 rbtree sched acl send_common udp_send color"
+ blob playlist sha1 rbtree sched acl send_common udp_send color fec"
server_ldflags=""
server_audio_formats=" mp3"
########################################################################### ucred
AC_MSG_CHECKING(for struct ucred)
AC_TRY_LINK([
+ #define _GNU_SOURCE
#include <sys/types.h>
#include <sys/socket.h>
],[
#define GGO_ERRORS
#define COLOR_ERRORS
-
extern const char **para_errlist[];
#define COMPRESS_FILTER_ERRORS \
PARA_ERROR(COMPRESS_SYNTAX, "syntax error in compress filter config"), \
+#define FEC_ERRORS \
+ PARA_ERROR(FEC_BAD_IDX, "invalid index vector"), \
+ PARA_ERROR(FEC_SINGULAR, "unexpected singular matrix"), \
+ PARA_ERROR(FEC_PIVOT, "pivot column not found"), \
+ PARA_ERROR(FEC_PARMS, "invalid fec parameters"), \
+
+
+#define FECDEC_FILTER_ERRORS \
+ PARA_ERROR(BAD_FEC_HEADER, "invalid fec header"), \
+ PARA_ERROR(BAD_SLICE_SIZE, "slice size too large"), \
+ PARA_ERROR(BAD_SLICE_NUM, "invalid slice number"), \
+ PARA_ERROR(FECDEC_OVERRUN, "fecdec output buffer overrun"), \
+ PARA_ERROR(FECDEC_EOF, "received eof packet"), \
+
+
#define AMP_FILTER_ERRORS \
PARA_ERROR(AMP_SYNTAX, "syntax error in amp filter config"), \
--- /dev/null
+/*
+ * fec.c -- forward error correction based on Vandermonde matrices
+ * 980624
+ * (C) 1997-98 Luigi Rizzo (luigi@iet.unipi.it)
+ *
+ * Portions derived from code by Phil Karn (karn@ka9q.ampr.org),
+ * Robert Morelos-Zaragoza (robert@spectra.eng.hawaii.edu) and Hari
+ * Thirumoorthy (harit@spectra.eng.hawaii.edu), Aug 1995
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE AUTHORS ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
+ * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
+ * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS
+ * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY,
+ * OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
+ * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR
+ * TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
+ * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY
+ * OF SUCH DAMAGE.
+ */
+
+#include "para.h"
+#include "error.h"
+#include "portable_io.h"
+#include "string.h"
+#include "fec.h"
+
+#define GF_BITS 8 /* code over GF(256) */
+#define GF_SIZE ((1 << GF_BITS) - 1)
+
+/*
+ * To speed up computations, we have tables for logarithm, exponent and inverse
+ * of a number. We use a table for multiplication as well (it takes 64K, no big
+ * deal even on a PDA, especially because it can be pre-initialized an put into
+ * a ROM!). The macro gf_mul(x,y) takes care of multiplications.
+ */
+static unsigned char gf_exp[2 * GF_SIZE]; /* index->poly form conversion table */
+static int gf_log[GF_SIZE + 1]; /* Poly->index form conversion table */
+static unsigned char inverse[GF_SIZE + 1]; /* inverse of field elem. */
+static unsigned char gf_mul_table[GF_SIZE + 1][GF_SIZE + 1];
+/* Multiply two numbers. */
+#define gf_mul(x,y) gf_mul_table[x][y]
+
+/* Compute x % GF_SIZE without a slow divide. */
+static inline unsigned char modnn(int x)
+{
+ while (x >= GF_SIZE) {
+ x -= GF_SIZE;
+ x = (x >> GF_BITS) + (x & GF_SIZE);
+ }
+ return x;
+}
+
+static void init_mul_table(void)
+{
+ int i, j;
+ for (i = 0; i < GF_SIZE + 1; i++)
+ for (j = 0; j < GF_SIZE + 1; j++)
+ gf_mul_table[i][j] =
+ gf_exp[modnn(gf_log[i] + gf_log[j])];
+
+ for (j = 0; j < GF_SIZE + 1; j++)
+ gf_mul_table[0][j] = gf_mul_table[j][0] = 0;
+}
+
+static unsigned char *alloc_matrix(int rows, int cols)
+{
+ return para_malloc(rows * cols);
+}
+
+/*
+ * Initialize the data structures used for computations in GF.
+ *
+ * This generates GF(2**GF_BITS) from the irreducible polynomial p(X) in
+ * p[0]..p[m].
+ *
+ * Lookup tables:
+ * index->polynomial form gf_exp[] contains j= \alpha^i;
+ * polynomial form -> index form gf_log[ j = \alpha^i ] = i
+ * \alpha=x is the primitive element of GF(2^m)
+ *
+ * For efficiency, gf_exp[] has size 2*GF_SIZE, so that a simple
+ * multiplication of two numbers can be resolved without calling modnn
+ */
+static void generate_gf(void)
+{
+ int i;
+ unsigned char mask = 1;
+ char *pp = "101110001"; /* The primitive polynomial 1+x^2+x^3+x^4+x^8 */
+ gf_exp[GF_BITS] = 0; /* will be updated at the end of the 1st loop */
+
+ /*
+ * first, generate the (polynomial representation of) powers of \alpha,
+ * which are stored in gf_exp[i] = \alpha ** i .
+ * At the same time build gf_log[gf_exp[i]] = i .
+ * The first GF_BITS powers are simply bits shifted to the left.
+ */
+ for (i = 0; i < GF_BITS; i++, mask <<= 1) {
+ gf_exp[i] = mask;
+ gf_log[gf_exp[i]] = i;
+ /*
+ * If pp[i] == 1 then \alpha ** i occurs in poly-repr
+ * gf_exp[GF_BITS] = \alpha ** GF_BITS
+ */
+ if (pp[i] == '1')
+ gf_exp[GF_BITS] ^= mask;
+ }
+ /*
+ * now gf_exp[GF_BITS] = \alpha ** GF_BITS is complete, so can also
+ * compute its inverse.
+ */
+ gf_log[gf_exp[GF_BITS]] = GF_BITS;
+ /*
+ * Poly-repr of \alpha ** (i+1) is given by poly-repr of \alpha ** i
+ * shifted left one-bit and accounting for any \alpha ** GF_BITS term
+ * that may occur when poly-repr of \alpha ** i is shifted.
+ */
+ mask = 1 << (GF_BITS - 1);
+ for (i = GF_BITS + 1; i < GF_SIZE; i++) {
+ if (gf_exp[i - 1] >= mask)
+ gf_exp[i] =
+ gf_exp[GF_BITS] ^ ((gf_exp[i - 1] ^ mask) << 1);
+ else
+ gf_exp[i] = gf_exp[i - 1] << 1;
+ gf_log[gf_exp[i]] = i;
+ }
+ /*
+ * log(0) is not defined, so use a special value
+ */
+ gf_log[0] = GF_SIZE;
+ /* set the extended gf_exp values for fast multiply */
+ for (i = 0; i < GF_SIZE; i++)
+ gf_exp[i + GF_SIZE] = gf_exp[i];
+
+ inverse[0] = 0; /* 0 has no inverse. */
+ inverse[1] = 1;
+ for (i = 2; i <= GF_SIZE; i++)
+ inverse[i] = gf_exp[GF_SIZE - gf_log[i]];
+}
+
+/*
+ * Compute dst[] = dst[] + c * src[]
+ *
+ * This is used often, so better optimize it! Currently the loop is unrolled 16
+ * times. The case c=0 is also optimized, whereas c=1 is not.
+ */
+#define UNROLL 16
+static void addmul(unsigned char *dst1, const unsigned char const *src1,
+ unsigned char c, int sz)
+{
+ if (c == 0)
+ return;
+ unsigned char *dst = dst1, *lim = &dst[sz - UNROLL + 1],
+ *col = gf_mul_table[c];
+ const unsigned char const *src = src1;
+
+ for (; dst < lim; dst += UNROLL, src += UNROLL) {
+ dst[0] ^= col[src[0]];
+ dst[1] ^= col[src[1]];
+ dst[2] ^= col[src[2]];
+ dst[3] ^= col[src[3]];
+ dst[4] ^= col[src[4]];
+ dst[5] ^= col[src[5]];
+ dst[6] ^= col[src[6]];
+ dst[7] ^= col[src[7]];
+ dst[8] ^= col[src[8]];
+ dst[9] ^= col[src[9]];
+ dst[10] ^= col[src[10]];
+ dst[11] ^= col[src[11]];
+ dst[12] ^= col[src[12]];
+ dst[13] ^= col[src[13]];
+ dst[14] ^= col[src[14]];
+ dst[15] ^= col[src[15]];
+ }
+ lim += UNROLL - 1;
+ for (; dst < lim; dst++, src++) /* final components */
+ *dst ^= col[*src];
+}
+
+/*
+ * Compute C = AB where A is n*k, B is k*m, C is n*m
+ */
+static void matmul(unsigned char *a, unsigned char *b, unsigned char *c,
+ int n, int k, int m)
+{
+ int row, col, i;
+
+ for (row = 0; row < n; row++) {
+ for (col = 0; col < m; col++) {
+ unsigned char *pa = &a[row * k], *pb = &b[col], acc = 0;
+ for (i = 0; i < k; i++, pa++, pb += m)
+ acc ^= gf_mul(*pa, *pb);
+ c[row * m + col] = acc;
+ }
+ }
+}
+
+#define FEC_SWAP(a,b) {typeof(a) tmp = a; a = b; b = tmp;}
+
+/*
+ * Compute the inverse of a matrix.
+ *
+ * k is the size of the matrix 'src' (Gauss-Jordan, adapted from Numerical
+ * Recipes in C). Returns -1 if 'src' is singular.
+ */
+static int invert_mat(unsigned char *src, int k)
+{
+ int irow, icol, row, col, ix, error;
+ int *indxc = para_malloc(k * sizeof(int));
+ int *indxr = para_malloc(k * sizeof(int));
+ int *ipiv = para_malloc(k * sizeof(int)); /* elements used as pivots */
+ unsigned char c, *p, *id_row = alloc_matrix(1, k),
+ *temp_row = alloc_matrix(1, k);
+
+ memset(id_row, 0, k);
+ memset(ipiv, 0, k * sizeof(int));
+
+ for (col = 0; col < k; col++) {
+ unsigned char *pivot_row;
+ /*
+ * Zeroing column 'col', look for a non-zero element.
+ * First try on the diagonal, if it fails, look elsewhere.
+ */
+ irow = icol = -1;
+ if (ipiv[col] != 1 && src[col * k + col] != 0) {
+ irow = col;
+ icol = col;
+ goto found_piv;
+ }
+ for (row = 0; row < k; row++) {
+ if (ipiv[row] != 1) {
+ for (ix = 0; ix < k; ix++) {
+ if (ipiv[ix] == 0) {
+ if (src[row * k + ix] != 0) {
+ irow = row;
+ icol = ix;
+ goto found_piv;
+ }
+ } else if (ipiv[ix] > 1) {
+ error = -E_FEC_PIVOT;
+ goto fail;
+ }
+ }
+ }
+ }
+ error = -E_FEC_PIVOT;
+ if (icol == -1)
+ goto fail;
+found_piv:
+ ++(ipiv[icol]);
+ /*
+ * swap rows irow and icol, so afterwards the diagonal element
+ * will be correct. Rarely done, not worth optimizing.
+ */
+ if (irow != icol)
+ for (ix = 0; ix < k; ix++)
+ FEC_SWAP(src[irow * k + ix], src[icol * k + ix]);
+ indxr[col] = irow;
+ indxc[col] = icol;
+ pivot_row = &src[icol * k];
+ error = -E_FEC_SINGULAR;
+ c = pivot_row[icol];
+ if (c == 0)
+ goto fail;
+ if (c != 1) { /* otherwise this is a NOP */
+ /*
+ * this is done often , but optimizing is not so
+ * fruitful, at least in the obvious ways (unrolling)
+ */
+ c = inverse[c];
+ pivot_row[icol] = 1;
+ for (ix = 0; ix < k; ix++)
+ pivot_row[ix] = gf_mul(c, pivot_row[ix]);
+ }
+ /*
+ * from all rows, remove multiples of the selected row to zero
+ * the relevant entry (in fact, the entry is not zero because
+ * we know it must be zero). (Here, if we know that the
+ * pivot_row is the identity, we can optimize the addmul).
+ */
+ id_row[icol] = 1;
+ if (memcmp(pivot_row, id_row, k) != 0) {
+ for (p = src, ix = 0; ix < k; ix++, p += k) {
+ if (ix != icol) {
+ c = p[icol];
+ p[icol] = 0;
+ addmul(p, pivot_row, c, k);
+ }
+ }
+ }
+ id_row[icol] = 0;
+ }
+ for (col = k - 1; col >= 0; col--) {
+ if (indxr[col] < 0 || indxr[col] >= k)
+ PARA_CRIT_LOG("AARGH, indxr[col] %d\n", indxr[col]);
+ else if (indxc[col] < 0 || indxc[col] >= k)
+ PARA_CRIT_LOG("AARGH, indxc[col] %d\n", indxc[col]);
+ else if (indxr[col] != indxc[col]) {
+ for (row = 0; row < k; row++) {
+ FEC_SWAP(src[row * k + indxr[col]],
+ src[row * k + indxc[col]]);
+ }
+ }
+ }
+ error = 0;
+fail:
+ free(indxc);
+ free(indxr);
+ free(ipiv);
+ free(id_row);
+ free(temp_row);
+ return error;
+}
+
+/*
+ * Invert a Vandermonde matrix.
+ *
+ * It assumes that the matrix is not singular and _IS_ a Vandermonde matrix.
+ * Only uses the second column of the matrix, containing the p_i's.
+ *
+ * Algorithm borrowed from "Numerical recipes in C" -- sec.2.8, but largely
+ * revised for GF purposes.
+ */
+static void invert_vdm(unsigned char *src, int k)
+{
+ int i, j, row, col;
+ unsigned char *b, *c, *p, t, xx;
+
+ if (k == 1) /* degenerate */
+ return;
+ /*
+ * c holds the coefficient of P(x) = Prod (x - p_i), i=0..k-1
+ * b holds the coefficient for the matrix inversion
+ */
+ c = para_malloc(k);
+ b = para_malloc(k);
+ p = para_malloc(k);
+
+ for (j = 1, i = 0; i < k; i++, j += k) {
+ c[i] = 0;
+ p[i] = src[j];
+ }
+ /*
+ * construct coeffs recursively. We know c[k] = 1 (implicit) and start
+ * P_0 = x - p_0, then at each stage multiply by x - p_i generating P_i
+ * = x P_{i-1} - p_i P_{i-1} After k steps we are done.
+ */
+ c[k - 1] = p[0]; /* really -p(0), but x = -x in GF(2^m) */
+ for (i = 1; i < k; i++) {
+ unsigned char p_i = p[i];
+ for (j = k - 1 - (i - 1); j < k - 1; j++)
+ c[j] ^= gf_mul(p_i, c[j + 1]);
+ c[k - 1] ^= p_i;
+ }
+
+ for (row = 0; row < k; row++) {
+ /*
+ * synthetic division etc.
+ */
+ xx = p[row];
+ t = 1;
+ b[k - 1] = 1; /* this is in fact c[k] */
+ for (i = k - 2; i >= 0; i--) {
+ b[i] = c[i + 1] ^ gf_mul(xx, b[i + 1]);
+ t = gf_mul(xx, t) ^ b[i];
+ }
+ for (col = 0; col < k; col++)
+ src[col * k + row] = gf_mul(inverse[t], b[col]);
+ }
+ free(c);
+ free(b);
+ free(p);
+}
+
+static int fec_initialized;
+
+static void init_fec(void)
+{
+ generate_gf();
+ init_mul_table();
+ fec_initialized = 1;
+}
+
+/** Internal FEC parameters. */
+struct fec_parms {
+ /** Number of data slices. */
+ int k;
+ /** Number of slices (including redundant slices). */
+ int n;
+ /** The encoding matrix, computed by init_fec(). */
+ unsigned char *enc_matrix;
+};
+
+/**
+ * Deallocate a fec params structure.
+ *
+ * \param p The structure to free.
+ */
+void fec_free(struct fec_parms *p)
+{
+ if (!p)
+ return;
+ free(p->enc_matrix);
+ free(p);
+}
+
+/**
+ * Create a new encoder and return an opaque descriptor to it.
+ *
+ * \param k Number of input slices.
+ * \param n Number of output slices.
+ * \param result On success the Fec descriptor is returned here.
+ *
+ * \return Standard.
+ *
+ * This creates the k*n encoding matrix. It is computed starting with a
+ * Vandermonde matrix, and then transformed into a systematic matrix.
+ */
+int fec_new(int k, int n, struct fec_parms **result)
+{
+ int row, col;
+ unsigned char *p, *tmp_m;
+ struct fec_parms *parms;
+
+ if (!fec_initialized)
+ init_fec();
+
+ if (k < 1 || k > GF_SIZE + 1 || n > GF_SIZE + 1 || k > n)
+ return -E_FEC_PARMS;
+ parms = para_malloc(sizeof(struct fec_parms));
+ parms->k = k;
+ parms->n = n;
+ parms->enc_matrix = alloc_matrix(n, k);
+ tmp_m = alloc_matrix(n, k);
+ /*
+ * fill the matrix with powers of field elements, starting from 0.
+ * The first row is special, cannot be computed with exp. table.
+ */
+ tmp_m[0] = 1;
+ for (col = 1; col < k; col++)
+ tmp_m[col] = 0;
+ for (p = tmp_m + k, row = 0; row < n - 1; row++, p += k) {
+ for (col = 0; col < k; col++)
+ p[col] = gf_exp[modnn(row * col)];
+ }
+
+ /*
+ * quick code to build systematic matrix: invert the top
+ * k*k vandermonde matrix, multiply right the bottom n-k rows
+ * by the inverse, and construct the identity matrix at the top.
+ */
+ invert_vdm(tmp_m, k); /* much faster than invert_mat */
+ matmul(tmp_m + k * k, tmp_m, parms->enc_matrix + k * k, n - k, k, k);
+ /*
+ * the upper matrix is I so do not bother with a slow multiply
+ */
+ memset(parms->enc_matrix, 0, k * k);
+ for (p = parms->enc_matrix, col = 0; col < k; col++, p += k + 1)
+ *p = 1;
+ free(tmp_m);
+ *result = parms;
+ return 0;
+}
+
+/**
+ * Compute one encoded slice of the given input.
+ *
+ * \param parms The fec parameters returned earlier by fec_new().
+ * \param src The \a k data slices to encode.
+ * \param dst Result pointer.
+ * \param idx The index of the slice to compute.
+ * \param sz The size of the input data packets.
+ *
+ * Encode the \a k slices of size \a sz given by \a src and store the output
+ * slice number \a idx in \a dst.
+ */
+void fec_encode(struct fec_parms *parms, const unsigned char * const *src,
+ unsigned char *dst, int idx, int sz)
+{
+ int i, k = parms->k;
+ unsigned char *p;
+
+ assert(idx <= parms->n);
+
+ if (idx < k) {
+ memcpy(dst, src[idx], sz);
+ return;
+ }
+ p = &(parms->enc_matrix[idx * k]);
+ memset(dst, 0, sz);
+ for (i = 0; i < k; i++)
+ addmul(dst, src[i], p[i], sz);
+}
+
+/* Move src packets in their position. */
+static int shuffle(unsigned char **data, int *idx, int k)
+{
+ int i;
+
+ for (i = 0; i < k;) {
+ if (idx[i] >= k || idx[i] == i)
+ i++;
+ else { /* put index and data at the right position */
+ int c = idx[i];
+
+ if (idx[c] == c) /* conflict */
+ return -E_FEC_BAD_IDX;
+ FEC_SWAP(idx[i], idx[c]);
+ FEC_SWAP(data[i], data[c]);
+ }
+ }
+ return 0;
+}
+
+/*
+ * Construct the decoding matrix given the indices. The encoding matrix must
+ * already be allocated.
+ */
+static int build_decode_matrix(struct fec_parms *parms, int *idx,
+ unsigned char **result)
+{
+ int ret = -E_FEC_BAD_IDX, i, k = parms->k;
+ unsigned char *p, *matrix = alloc_matrix(k, k);
+
+ for (i = 0, p = matrix; i < k; i++, p += k) {
+ if (idx[i] >= parms->n) /* invalid index */
+ goto err;
+ if (idx[i] < k) {
+ memset(p, 0, k);
+ p[i] = 1;
+ } else
+ memcpy(p, &(parms->enc_matrix[idx[i] * k]), k);
+ }
+ ret = invert_mat(matrix, k);
+ if (ret < 0)
+ goto err;
+ *result = matrix;
+ return 0;
+err:
+ free(matrix);
+ *result = NULL;
+ return ret;
+}
+
+/**
+ * Decode one slice from the group of received slices.
+ *
+ * \param parms Pointer to fec params structure.
+ * \param data Pointers to received packets.
+ * \param idx Pointer to packet indices (gets modified).
+ * \param sz Size of each packet.
+ *
+ * \return Zero on success, -1 on errors.
+ *
+ * The \a data vector of received slices and the indices of slices are used to
+ * produce the correct output slice. The data slices are modified in-place.
+ */
+int fec_decode(struct fec_parms *parms, unsigned char **data, int *idx,
+ int sz)
+{
+ unsigned char *m_dec, **slice;
+ int ret, row, col, k = parms->k;
+
+ ret = shuffle(data, idx, k);
+ if (ret < 0)
+ return ret;
+ ret = build_decode_matrix(parms, idx, &m_dec);
+ if (ret < 0)
+ return ret;
+ /* do the actual decoding */
+ slice = para_malloc(k * sizeof(unsigned char *));
+ for (row = 0; row < k; row++) {
+ if (idx[row] >= k) {
+ slice[row] = para_calloc(sz);
+ for (col = 0; col < k; col++)
+ addmul(slice[row], data[col],
+ m_dec[row * k + col], sz);
+ }
+ }
+ /* move slices to their final destination */
+ for (row = 0; row < k; row++) {
+ if (idx[row] >= k) {
+ memcpy(data[row], slice[row], sz);
+ free(slice[row]);
+ }
+ }
+ free(slice);
+ free(m_dec);
+ return 0;
+}
--- /dev/null
+/*
+ * fec.c -- forward error correction based on Vandermonde matrices
+ * 980614
+ * (C) 1997-98 Luigi Rizzo (luigi@iet.unipi.it)
+ *
+ * Portions derived from code by Phil Karn (karn@ka9q.ampr.org),
+ * Robert Morelos-Zaragoza (robert@spectra.eng.hawaii.edu) and Hari
+ * Thirumoorthy (harit@spectra.eng.hawaii.edu), Aug 1995
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+
+ * 1. Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE AUTHORS ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
+ * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
+ * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS
+ * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY,
+ * OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
+ * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR
+ * TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
+ * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY
+ * OF SUCH DAMAGE.
+ */
+
+#define FEC_MAGIC 0xFECC0DEC
+#define FEC_HEADER_SIZE 32
+
+struct fec_parms;
+
+int fec_new(int k, int n, struct fec_parms **parms);
+void fec_free(struct fec_parms *p);
+void fec_encode(struct fec_parms *parms, const unsigned char * const *src,
+ unsigned char *dst, int idx, int sz);
+int fec_decode(struct fec_parms *parms, unsigned char **data, int *idx,
+ int sz);
+
+
--- /dev/null
+/*
+ * Copyright (C) 2009 Andre Noll <maan@systemlinux.org>
+ *
+ * Licensed under the GPL v2. For licencing details see COPYING.
+ */
+
+/** \file fecdec_filter.c A filter fec-decodes an audio stream. */
+
+#include <dirent.h>
+#include "para.h"
+#include "error.h"
+#include "list.h"
+#include "sched.h"
+#include "ggo.h"
+#include "filter.h"
+#include "string.h"
+#include "portable_io.h"
+#include "fec.h"
+#include "fd.h"
+
+/**
+ * How many FEC groups to store in memory.
+ *
+ * Packet reordering requires to keep more than one FEC group in memory because
+ * slices belonging to the next FEC group may arrive before the current FEC group
+ * is complete.
+ */
+#define NUM_FEC_GROUPS 3
+
+/** Size of the output buffer of the fecdec filter. */
+#define FECDEC_OUTBUF_SIZE (128 * 1024)
+
+/** Data read from the header of a slice. */
+struct fec_header {
+ /** Total number of slices in this group. */
+ uint8_t slices_per_group;
+ /** Number of slices needed to start decoding. */
+ uint8_t data_slices_per_group;
+ /** Size of the ogg vorbis header (zero for mp3, aac). */
+ uint32_t audio_header_size;
+ /** Number of the FEC group this slice belongs to. */
+ uint32_t group_num;
+ /** Size of the data in this FEC group. */
+ uint32_t group_bytes;
+ /** Number of this slice in the group. */
+ uint8_t slice_num;
+ /** Used data bytes of this slice. */
+ uint16_t slice_bytes;
+};
+
+/**
+ * The status of one partially received FEC group.
+ */
+struct fecdec_group {
+ /** The header read from the last slice. */
+ struct fec_header h;
+ /** How many slices received so far. */
+ int num_received_slices;
+ /** The size of the \a idx and the \a data arrays below. */
+ int num_slices;
+ /** Array of indices of the received slices. */
+ int *idx;
+ /** Content of the received slices. */
+ unsigned char **data;
+};
+
+/**
+ * Data private to the fecdec filter.
+ */
+struct private_fecdec_data {
+ /** Used by the fec core code. */
+ struct fec_parms *fec;
+ /** Keeps track of what was received so far. */
+ struct fecdec_group groups[NUM_FEC_GROUPS];
+};
+
+/** Iterate over all fecdec groups. */
+#define FOR_EACH_FECDEC_GROUP(g, d) for (g = (d)->groups; \
+ (g) - (d)->groups < NUM_FEC_GROUPS; (g)++)
+
+static int group_complete(struct fecdec_group *fg)
+{
+ return fg->num_received_slices >= fg->h.data_slices_per_group;
+}
+
+static int group_empty(struct fecdec_group *fg)
+{
+ return fg->num_received_slices == 0;
+}
+
+static void clear_group(struct fecdec_group *fg)
+{
+ int i;
+
+ for (i = 0; i < fg->num_slices; i++) {
+ free(fg->data[i]);
+ fg->data[i] = NULL;
+ fg->idx[i] = -1;
+ }
+ free(fg->data);
+ free(fg->idx);
+ fg->num_slices = 0;
+ memset(&fg->h, 0, sizeof(struct fec_header));
+ fg->num_received_slices = 0;
+}
+
+static int find_group(struct fec_header *h,
+ struct private_fecdec_data *pfd, struct fecdec_group **result)
+{
+ struct fecdec_group *fg;
+
+ FOR_EACH_FECDEC_GROUP(fg, pfd) {
+ if (fg->h.group_num != h->group_num)
+ continue;
+ if (fg->h.slices_per_group != h->slices_per_group)
+ continue;
+ if (fg->h.data_slices_per_group != h->data_slices_per_group)
+ continue;
+ *result = fg;
+ return 1;
+ }
+ return 0;
+}
+
+static struct fecdec_group *find_unused_group(struct private_fecdec_data *pfd)
+{
+ struct fecdec_group *fg;
+
+ FOR_EACH_FECDEC_GROUP(fg, pfd) {
+ if (fg->num_received_slices == 0)
+ return fg;
+ }
+ return NULL;
+}
+
+static struct fecdec_group *try_to_free_group(struct private_fecdec_data *pfd)
+{
+ struct fecdec_group *fg;
+
+ FOR_EACH_FECDEC_GROUP(fg, pfd) {
+ if (!group_complete(fg))
+ continue;
+ clear_group(fg);
+ return fg;
+ }
+ return NULL;
+}
+
+static struct fecdec_group *free_oldest_group(struct private_fecdec_data *pfd)
+{
+ struct fecdec_group *fg, *oldest = NULL;
+
+ FOR_EACH_FECDEC_GROUP(fg, pfd) {
+ if (!oldest || oldest->h.group_num > fg->h.group_num)
+ oldest = fg;
+ }
+ if (!group_complete(oldest) && !group_empty(oldest))
+ PARA_WARNING_LOG("Clearing incomplete group %d "
+ "(contains %d slices)\n", fg->h.group_num,
+ fg->num_received_slices);
+ clear_group(oldest);
+ return oldest;
+}
+
+/* returns 1 if the group was found, 0 if not, negative on errors */
+static int get_group(struct fec_header *h, struct private_fecdec_data *pfd,
+ struct fecdec_group **result)
+{
+ struct fecdec_group *fg;
+ int ret = find_group(h, pfd, &fg);
+
+ if (ret < 0)
+ return ret;
+ if (ret > 0) /* found group */
+ goto success;
+ /* group not found */
+ fg = find_unused_group(pfd);
+ if (fg)
+ goto success;
+ fg = try_to_free_group(pfd);
+ if (fg)
+ goto success;
+ fg = free_oldest_group(pfd);
+ ret = 0;
+success:
+ fg->h = *h;
+ *result = fg;
+ return ret;
+}
+
+/*
+ * returns 1 if slice was added, zero otherwise (because the group was already
+ * complete). In any case the number of received slices is being increased by
+ * one.
+ */
+static int add_slice(char *buf, struct fecdec_group *fg)
+{
+ int r, slice_num;
+
+ if (group_complete(fg)) {
+ PARA_DEBUG_LOG("group complete, ignoring slice %d\n",
+ fg->h.slice_num);
+ fg->num_received_slices++;
+ return 0;
+ }
+ slice_num = fg->h.slice_num;
+ if (fg->num_slices == 0) {
+ fg->num_slices = fg->h.slices_per_group;
+ fg->idx = malloc(fg->num_slices * sizeof(int));
+ fg->data = malloc(fg->num_slices * sizeof(unsigned char *));
+ memset(fg->data, 0, fg->num_slices * sizeof(unsigned char *));
+ }
+ r = fg->num_received_slices;
+ fg->idx[r] = slice_num;
+ fg->data[r] = malloc(fg->h.slice_bytes);
+ memcpy(fg->data[r], buf, fg->h.slice_bytes);
+ fg->num_received_slices++;
+ return 1;
+}
+
+static int decode_group(struct fecdec_group *fg, struct filter_node *fn)
+{
+ int i, ret, sb = fg->h.slice_bytes;
+ size_t written = 0;
+ struct private_fecdec_data *pfd = fn->private_data;
+
+ ret = fec_decode(pfd->fec, fg->data, fg->idx, sb);
+ if (ret < 0)
+ return ret;
+ PARA_DEBUG_LOG("writing group %d (%d/%d decoded data bytes)\n",
+ fg->h.group_num, fg->h.group_bytes,
+ fg->h.data_slices_per_group * sb);
+ for (i = 0; i < fg->h.data_slices_per_group; i++) {
+ size_t n = sb;
+ if (n + written > fg->h.group_bytes)
+ n = fg->h.group_bytes - written;
+ if (fn->loaded + n > fn->bufsize)
+ return -E_FECDEC_OVERRUN;
+ memcpy(fn->buf + fn->loaded, fg->data[i], n);
+ fn->loaded += n;
+ written += n;
+ }
+ return 0;
+}
+
+/**
+ * Read a fec header from a buffer.
+ *
+ * \param buf The buffer to write to.
+ * \param h The fec header to write.
+ */
+static int read_fec_header(char *buf, size_t len, struct fec_header *h)
+{
+ uint32_t magic;
+
+ if (len < FEC_HEADER_SIZE)
+ return 0;
+ magic = read_u32(buf);
+ if (magic != FEC_MAGIC)
+ return -E_BAD_FEC_HEADER;
+ h->slices_per_group = read_u8(buf + 4);
+ h->data_slices_per_group = read_u8(buf + 5);
+ h->audio_header_size = read_u32(buf + 6);
+
+ h->group_num = read_u32(buf + 10);
+ h->group_bytes = read_u32(buf + 14);
+
+ h->slice_num = read_u8(buf + 18);
+ h->slice_bytes = read_u16(buf + 20);
+ if (!h->group_bytes && & h->slice_bytes)
+ return -E_FECDEC_EOF;
+// PARA_DEBUG_LOG("group %u, slize %u, slices per group: %u\n",
+// h->group_num, h->slice_num, h->slices_per_group);
+ return 1;
+}
+
+/* returns 1 if we used the buffer, 0 if we didn't, negative on errors */
+static int dispatch_slice(char *buf, size_t len, struct fec_header *h,
+ struct filter_node *fn)
+{
+ struct fecdec_group *fg;
+ int ret;
+ struct private_fecdec_data *pfd = fn->private_data;
+
+ if (h->slice_bytes > len) /* can not use the thing, try to read more */
+ return 0;
+ ret = get_group(h, pfd, &fg);
+ if (ret < 0)
+ return ret;
+ if (!add_slice(buf, fg))
+ return 1;
+ if (group_complete(fg)) {
+ if (!pfd->fec) {
+ int k = h->data_slices_per_group, n = h->slices_per_group;
+ PARA_NOTICE_LOG("init fec (%d, %d)\n", k, n);
+ ret = fec_new(k, n, &pfd->fec);
+ if (ret < 0)
+ return ret;
+ }
+ ret = decode_group(fg, fn);
+ if (ret < 0)
+ return ret;
+ }
+ return 1;
+}
+
+static int fecdec(char *buf, size_t len, struct filter_node *fn)
+{
+ int ret;
+ struct fec_header h;
+
+ ret = read_fec_header(buf, len, &h);
+ if (ret <= 0)
+ return ret;
+ if (h.slice_bytes > fn->bufsize)
+ return -E_BAD_SLICE_SIZE;
+ if (h.slice_num > h.slices_per_group)
+ return -E_BAD_SLICE_NUM;
+ ret = dispatch_slice(buf + FEC_HEADER_SIZE, len - FEC_HEADER_SIZE,
+ &h, fn);
+ //PARA_INFO_LOG("ret: %d, len: %d, slice_bytes: %d\n", ret, len, h.slice_bytes);
+ if (ret <= 0)
+ return ret;
+ return FEC_HEADER_SIZE + h.slice_bytes;
+}
+
+static void fecdec_close(struct filter_node *fn)
+{
+ struct private_fecdec_data *pfd = fn->private_data;
+ struct fecdec_group *fg;
+
+ FOR_EACH_FECDEC_GROUP(fg, pfd)
+ clear_group(fg);
+ free(fn->buf);
+ fn->buf = NULL;
+ free(fn->private_data);
+ fn->private_data = NULL;
+}
+
+static void fecdec_open(struct filter_node *fn)
+{
+ fn->bufsize = FECDEC_OUTBUF_SIZE;
+ fn->buf = para_malloc(fn->bufsize);
+ fn->private_data = para_calloc(sizeof(struct private_fecdec_data));
+ fn->loaded = 0;
+}
+
+/**
+ * The init function of the fecdec filter.
+ *
+ * \param f struct to initialize.
+ */
+void fecdec_filter_init(struct filter *f)
+{
+ f->convert = fecdec;
+ f->close = fecdec_close;
+ f->open = fecdec_open;
+}
/**
* Call the init function of each supported filter.
- *
- * \param all_filters the array of all supported filters.
- *
* \sa filter::init
*/
void filter_init(void)
option "udp_target" -
#~~~~~~~~~~~~~~~~~~~~
-"add udp target"
-string typestr="a.b.c.d:p"
+"add udp target with optional port"
+string typestr="host[:port]"
optional
multiple
details="
- Add given host/port to the list of targets. This option
- can be given multiple times. Example: '224.0.1.38:1500'
- instructs the udp sender to send to udp port 1500 on host
- 224.0.1.38 (unassigned ip in the Local Network Control Block
- 224.0.0/24). This is useful for multicast streaming.
+ Add given host/port to the list of targets. The 'host' argument
+ can be either an IPv4/v6 address or hostname (RFC 3986 syntax).
+ The 'port' argument is an optional port number. If the 'port'
+ part is absent, the 'udp_default_port' value is used.
+
+ The following examples are possible targets:
+ '10.10.1.2:8000' (host:port); '10.10.1.2' (with default port);
+ '224.0.1.38:1500' (IPv4 multicast); 'localhost:8001' (hostname
+ with port); '[::1]:8001' (IPv6 localhost); '[badc0de::1]' (IPv6
+ host with default port); '[FF00::beef]:1500' (IPv6 multicast).
+
+ This option can be given multiple times, for multiple targets.
"
option "udp_no_autostart" -
* when any key is pressed.
*
* EXTERNAL_MODE: Check only signal pipe. Used when an external command
- * is running. During that thime curses is disabled. Returns when
+ * is running. During that time curses is disabled. Returns when
* cmd_pid == 0.
*/
static int do_select(int mode)
/** \file net.c Networking-related helper functions. */
+/*
+ * Since glibc 2.8, the _GNU_SOURCE feature test macro must be defined in order
+ * to obtain the definition of the ucred structure.
+ */
+#define _GNU_SOURCE
+
#include <netdb.h>
/* At least NetBSD needs these. */
#endif
#include <dirent.h>
+#include <regex.h>
#include "para.h"
#include "error.h"
crypt_data_array[fd].private_data = NULL;
}
+/**
+ * Match string as a candidate IPv4 address.
+ *
+ * \param address The string to match.
+ * \return True if \a address has "dot-quad" format.
+ */
+static bool is_v4_dot_quad(const char *address)
+{
+ bool result;
+ regex_t r;
+
+ assert(!regcomp(&r, "^([0-9]+\\.){3}[0-9]+$", REG_EXTENDED|REG_NOSUB));
+ result = regexec(&r, address, 0, NULL, 0) == 0;
+ regfree(&r);
+ return result;
+}
+
+/**
+ * Perform basic syntax checking on the host-part of an URL:
+ *
+ * - Since ':' is invalid in IPv4 addresses and DNS names, the
+ * presence of ':' causes interpretation as IPv6 address;
+ * - next the first-match-wins algorithm from RFC 3986 is applied;
+ * - else the string is considered as DNS name, to be resolved later.
+ *
+ * \param host The host string to check.
+ * \return True if \a host passes the syntax checks.
+ *
+ * \sa RFC 3986, 3.2.2; RFC 1123, 2.1; RFC 1034, 3.5
+ */
+static bool host_string_ok(const char *host)
+{
+ if (host == NULL || *host == '\0')
+ return false;
+ if (strchr(host, ':') != NULL)
+ return is_valid_ipv6_address(host);
+ if (is_v4_dot_quad(host))
+ return is_valid_ipv4_address(host);
+ return true;
+}
+
+/**
+ * Parse and validate URL string.
+ *
+ * The URL syntax is loosely based on RFC 3986, supporting one of
+ * - "["host"]"[:port] for native IPv6 addresses and
+ * - host[:port] for IPv4 hostnames and DNS names.
+ *
+ * Native IPv6 addresses must be enclosed in square brackets, since
+ * otherwise there is an ambiguity with the port separator `:'.
+ * The 'port' part is always considered to be a number; if absent,
+ * it is set to -1, to indicate that a default port is to be used.
+ *
+ * The following are valid examples:
+ * - 10.10.1.1
+ * - 10.10.1.2:8000
+ * - localhost
+ * - localhost:8001
+ * - [::1]:8000
+ * - [badc0de::1]
+ *
+ * \param url The URL string to take apart.
+ * \param host To return the copied host part of \a url.
+ * \param hostlen The maximum length of \a host.
+ * \param port To return the port number (if any) of \a url.
+ *
+ * \return Pointer to \a host, or NULL if failed.
+ * If NULL is returned, \a host and \a portnum are undefined. If no
+ * port number was present in \a url, \a portnum is set to -1.
+ *
+ * \sa RFC 3986, 3.2.2/3.2.3
+ */
+char *parse_url(const char *url,
+ char *host, ssize_t hostlen,
+ int32_t *port)
+{
+ const char *o = url;
+ char *c = host, *end = c + (hostlen - 1);
+
+ *port = -1;
+
+ if (o == NULL || hostlen < 1)
+ goto failed;
+
+ if (*o == '[') {
+ for (++o; (*c = *o == ']' ? '\0' : *o); c++, o++)
+ if (c == end)
+ goto failed;
+
+ if (*o++ != ']' || (*o != '\0' && *o != ':'))
+ goto failed;
+ } else {
+ for (; (*c = *o == ':'? '\0' : *o); c++, o++)
+ if (c == end)
+ goto failed;
+ }
+
+ if (*o == ':')
+ if (para_atoi32(++o, port) < 0 ||
+ *port < 0 || *port > 0xffff)
+ goto failed;
+
+ if (host_string_ok(host))
+ return host;
+failed:
+ *host = '\0';
+ return NULL;
+}
/**
* Determine the socket type for a given layer-4 protocol.
#endif
/** \endcond */
+
+/**
+ * Functions to parse and validate (parts of) URLs.
+ */
+extern char *parse_url(const char *url,
+ char *host, ssize_t hostlen, int32_t *port);
/**
* Ensure that string conforms to the IPv4 address format.
*
return inet_pton(AF_INET, address, &test_it) != 0;
}
+/**
+ * Ensure that string conforms to IPv6 address format.
+ *
+ * \param address The address string to check.
+ *
+ * \return 1 if string has a valid IPv6 address syntax, 0 if not.
+ * \sa RFC 4291
+ */
+_static_inline_ bool is_valid_ipv6_address(const char *address)
+{
+ struct in6_addr test_it;
+
+ return inet_pton(AF_INET6, address, &test_it) != 0;
+}
+
/**
* Generic socket creation (passive and active sockets).
*/
struct sender_client *accept_sender_client(struct sender_status *ss);
int send_queued_chunks(int fd, struct chunk_queue *cq,
size_t max_bytes_per_write);
+int parse_fec_url(const char *arg, struct sender_command_data *scd);
"example: allow 127.0.0.1 32\n"
);
}
+
+static int parse_fec_parms(const char *arg, struct sender_command_data *scd)
+{
+ int32_t val;
+ char *a = para_strdup(arg), *b = a, *e = strchr(b, ':');
+ int ret = -E_COMMAND_SYNTAX;
+
+ /* parse max slice bytes */
+ if (!e)
+ goto out;
+ *e = '\0';
+ ret = para_atoi32(b, &val);
+ if (ret < 0)
+ goto out;
+ ret = -ERRNO_TO_PARA_ERROR(EINVAL);
+ if (val < 0 || val > 65535)
+ goto out;
+ scd->max_slice_bytes = val;
+ /* parse data_slices_per_group */
+ b = e + 1;
+ e = strchr(b, ':');
+ ret = -E_COMMAND_SYNTAX;
+ if (!e)
+ goto out;
+ *e = '\0';
+ ret = para_atoi32(b, &val);
+ if (ret < 0)
+ goto out;
+ ret = -ERRNO_TO_PARA_ERROR(EINVAL);
+ if (val < 0 || val > 255)
+ goto out;
+ scd->data_slices_per_group = val;
+ /* parse slices_per_group */
+ b = e + 1;
+ ret = para_atoi32(b, &val);
+ if (ret < 0)
+ goto out;
+ ret = -ERRNO_TO_PARA_ERROR(EINVAL);
+ if (val < 0 || val < scd->data_slices_per_group)
+ goto out;
+ scd->slices_per_group = val;
+ ret = 0;
+out:
+ free(a);
+ return ret;
+}
+
+int parse_fec_url(const char *arg, struct sender_command_data *scd)
+{
+ int ret;
+ ssize_t len = sizeof(scd->host);
+ char *a = para_strdup(arg), *p = strchr(a, '/');
+
+ if (p) {
+ *p = '\0';
+ len = strlen(a);
+ }
+ ret = -ERRNO_TO_PARA_ERROR(EINVAL);
+ if (!parse_url(a, scd->host, len, &scd->port))
+ goto out;
+ if (p) {
+ ret = parse_fec_parms(p + 1, scd);
+ goto out;
+ }
+ /* use default fec parameters. */
+ scd->max_slice_bytes = 1490;
+ scd->slices_per_group = 16;
+ scd->data_slices_per_group = 14;
+ ret = 0;
+out:
+ free(a);
+ return ret;
+}
/** \file server.h Common server data structures. */
-
/** Size of the selector_info and audio_file info strings of struct misc_meta_data. */
#define MMD_INFO_SIZE 16384
+/** The maximum length of the host component in an URL */
+#define MAX_HOSTLEN 256
+
/**
* Defines one command of para_server.
*/
/** The number of the sender in question. */
int sender_num;
/** Used for the allow/deny/add/remove subcommands. */
- struct in_addr addr;
- /** Used for the allow/deny/add/remove subcommands. */
- char host[256];
+ char host[MAX_HOSTLEN];
/** Used for allow/deny. */
int netmask;
/** The port number for add/remove. */
int port;
+ /** Maximal slice size. */
+ uint16_t max_slice_bytes;
+ /** Number of data slices plus redundant slices. */
+ uint8_t slices_per_group;
+ /** Number of slices minus number of redundant slices. */
+ uint8_t data_slices_per_group;
};
/**
+++ /dev/null
-/*
- * Copyright (C) 2006-2009 Andre Noll <maan@systemlinux.org>
- *
- * Licensed under the GPL v2. For licencing details see COPYING.
- */
-
-/** \file udp_header.h some macros used by udp_send.c and udp_recv.c. */
-#include <net/if.h>
-
-/**
- * Number of bytes of the paraslash udp header.
- *
- * The udp sender prepends a header at the beginning of each data chunk. Within
- * this header, the type of the current audio stream and the * type of this
- * data chunk is coded.
- */
-#define UDP_AUDIO_HEADER_LEN 16
-
-/** The possible stream types. */
-enum udp_stream_type {
- /** Used for mp3 and aac streams. */
- UDP_PLAIN_STREAM,
- /** Ogg vorbis streams. */
- UDP_HEADER_STREAM,
- /** stream type not yet known. */
- UDP_UNKNOWN_STREAM
-};
-
-/** The possible packet types. */
-enum udp_audio_packet_type {
- /** Beginning of file. */
- UDP_BOF_PACKET,
- /** End of file. */
- UDP_EOF_PACKET,
- /** Combined header/data packet (ogg only). */
- UDP_HEADER_PACKET,
- /** Packet contains only audio file data. */
- UDP_DATA_PACKET,
- /** Invalid packet type. */
- UDP_UNKNOWN_PACKET
-};
-
-/** The contents of an udp audio header. */
-struct udp_audio_header {
- /** see \ref udp_stream_type. */
- uint8_t stream_type;
- /** see \ref udp_audio_packet_type. */
- uint8_t packet_type;
- /** Non-zero only for header packets. */
- uint16_t header_len;
- /** Length of header plus audio file data. */
- uint16_t payload_len;
-};
-
-/**
- * Write a struct udp_audio_header to a buffer.
- *
- * \param buf The buffer to write to.
- * \param h The audio header to write.
- *
- * Used by the udp sender.
- *
- */
-_static_inline_ void write_udp_audio_header(char *buf, struct udp_audio_header *h)
-{
- memcpy(buf, "UDPM", 4);
- write_u8(buf + 4, h->stream_type);
- write_u8(buf + 5, h->packet_type);
- write_u16(buf + 6, h->header_len);
- write_u16(buf + 8, h->payload_len);
- memset(buf + 10, 0, 6);
-}
-
-/**
- * Used by the udp receiver to read a struct udp_audio_header from a buffer.
- *
- * \param buf The buffer to read from.
- * \param len The length of \a buf.
- * \param h Result pointer.
- *
- * \return 1 if \a buf contains a valid udp audio header, -1 else.
- */
-_static_inline_ int read_udp_audio_header(char *buf, size_t len,
- struct udp_audio_header *h)
-{
- if (len < 4)
- goto err;
- if (memcmp(buf, "UDPM", 4))
- goto err;
- h->stream_type = read_u8(buf + 4);
- h->packet_type = read_u8(buf + 5);
- h->header_len = read_u16(buf + 6);
- h->payload_len = read_u16(buf + 8);
- return 1;
-err:
- h->stream_type = UDP_UNKNOWN_STREAM;
- h->packet_type = UDP_UNKNOWN_PACKET;
- h->header_len = h->payload_len = 0;
- return -1;
-}
/** \file udp_recv.c Paraslash's udp receiver */
#include <dirent.h>
+#include <net/if.h>
#include "para.h"
#include "error.h"
#include "portable_io.h"
-#include "udp_header.h"
#include "list.h"
#include "sched.h"
#include "ggo.h"
/** The size of the receiver node buffer. */
#define UDP_RECV_CHUNK_SIZE (128 * 1024)
-
/**
* Data specific to the udp receiver.
*
* \sa \ref receiver, \ref receiver_node.
*/
struct private_udp_recv_data {
- /**
- * Whether a header was received.
- *
- * A flag indicating whether this receiver already received a packet
- * which contains the audio file header.
- *
- * This flag has no effect if the audio stream indicates that no extra
- * headers will be sent (mp3, aac). Otherwise, all data packets are
- * dropped until the header is received.
- */
- int have_header;
/** The socket file descriptor. */
int fd;
- /** Non-zero on short reads. */
- uint16_t need_more;
- /** Copied from the first audio header received. */
- uint16_t stream_type;
};
static void udp_recv_pre_select(struct sched *s, struct task *t)
return nbytes + loaded < UDP_RECV_CHUNK_SIZE;
}
-/*
- * Perform some sanity checks on an udp audio file header.
- *
- * return: negative on error, 0: discard data, 1: use data
- */
-static int examine_audio_header(struct private_udp_recv_data *purd,
- struct udp_audio_header *uah, size_t packet_size)
-{
- /* payload_len includes header */
- if (uah->payload_len < uah->header_len)
- return -E_UDP_BAD_HEADER;
- switch (uah->packet_type) {
- case UDP_EOF_PACKET:
- return -E_RECV_EOF;
- case UDP_BOF_PACKET:
- purd->have_header = 1;
- /* fall through */
- case UDP_DATA_PACKET:
- if (uah->header_len) /* header in no-header packet */
- return -E_UDP_BAD_HEADER;
- break;
- case UDP_HEADER_PACKET:
- if (!uah->header_len) /** no header in header packet */
- return -E_UDP_BAD_HEADER;
- break;
- default: /* bad packet type */
- return -E_UDP_BAD_HEADER;
- }
- /* check stream type */
- if (uah->stream_type != UDP_PLAIN_STREAM &&
- uah->stream_type != UDP_HEADER_STREAM)
- return -E_UDP_BAD_STREAM_TYPE;
- if (purd->stream_type == UDP_UNKNOWN_STREAM)
- purd->stream_type = uah->stream_type;
- /* stream type must not change */
- if (uah->stream_type != purd->stream_type)
- return -E_UDP_BAD_STREAM_TYPE;
- if (!purd->have_header && uah->stream_type == UDP_HEADER_STREAM)
- /* can't use the data, wait for header packet */
- return 0;
- if (packet_size < uah->payload_len + UDP_AUDIO_HEADER_LEN)
- /* we read only a part of the package */
- purd->need_more = uah->payload_len
- + UDP_AUDIO_HEADER_LEN - packet_size;
- return 1;
-}
-
static int add_rn_output(struct receiver_node *rn, char *buf, size_t len)
{
if (!len)
struct private_udp_recv_data *purd = rn->private_data;
int ret;
char tmpbuf[UDP_RECV_CHUNK_SIZE];
- uint16_t data_len;
- char *data_buf;
size_t packet_size;
- struct udp_audio_header uah;
if (rn->output_error && *rn->output_error < 0) {
t->error = *rn->output_error;
if (!ret)
return;
packet_size = ret;
- for (;;) {
- uint16_t num;
-
- if (!purd->need_more) {
- ret = read_udp_audio_header(tmpbuf, packet_size, &uah);
- if (ret >= 0)
- break;
- goto success; /* drop data */
- }
- num = PARA_MIN(purd->need_more, (uint16_t)packet_size);
- assert(num > 0);
- t->error = add_rn_output(rn, tmpbuf, num);
- if (t->error < 0)
- return;
- purd->need_more -= num;
- if (packet_size <= num)
- goto success;
- packet_size -= num;
- memmove(tmpbuf, tmpbuf + num, packet_size);
- }
- assert(!purd->need_more);
- t->error = examine_audio_header(purd, &uah, packet_size);
- if (t->error <= 0)
+ t->error = add_rn_output(rn, tmpbuf, packet_size);
+ if (t->error < 0)
return;
- data_len = uah.payload_len;
- data_buf = tmpbuf + UDP_AUDIO_HEADER_LEN;
- if (uah.packet_type == UDP_HEADER_PACKET) {
- if (purd->have_header) { /* skip header */
- data_buf += uah.header_len;
- data_len -= uah.header_len;
- } else { /* only use the header */
- purd->have_header = 1;
- data_len = uah.header_len;
- }
- }
- t->error = add_rn_output(rn, data_buf, data_len);
- return;
success:
t->error = 1;
}
ret = mark_fd_nonblocking(purd->fd);
if (ret < 0)
goto err;
- purd->stream_type = UDP_UNKNOWN_STREAM;
PARA_NOTICE_LOG("receiving from %s:%d, fd=%d\n", c->host_arg,
c->port_arg, purd->fd);
return purd->fd;
#include <sys/time.h>
#include <dirent.h>
+#include <net/if.h>
#include "server.cmdline.h"
#include "para.h"
#include "list.h"
#include "send.h"
#include "portable_io.h"
-#include "udp_header.h"
#include "net.h"
#include "fd.h"
#include "sched.h"
#include "close_on_fork.h"
#include "chunk_queue.h"
-/** Convert in_addr to ascii. */
-#define TARGET_ADDR(oc) inet_ntoa((oc)->addr)
-
/** Describes one entry in the list of targets for the udp sender. */
struct udp_target {
- /** Address info. */
- struct in_addr addr;
/** The position of this target in the list of targets. */
struct list_head node;
+ /** The hostname (DNS name or IPv4/v6 address string). */
+ char host[MAX_HOSTLEN];
/** The UDP port. */
int port;
/** The socket fd. */
int fd;
/** The list of queued chunks for this fd. */
struct chunk_queue *cq;
+ /** The opaque structure returned by vss_add_fec_client(). */
+ struct fec_client *fc;
+ /** The FEC parameters for this target. */
+ struct fec_client_parms fcp;
};
static struct list_head targets;
static void udp_delete_target(struct udp_target *ut, const char *msg)
{
- PARA_NOTICE_LOG("deleting %s:%d (%s) from list\n", TARGET_ADDR(ut),
+ PARA_NOTICE_LOG("deleting %s#%d (%s) from list\n", ut->host,
ut->port, msg);
udp_close_target(ut);
+ vss_del_fec_client(ut->fc);
list_del(&ut->node);
free(ut);
}
if (ut->fd >= 0) /* nothing to do */
return 0;
- ret = makesock(AF_UNSPEC, IPPROTO_UDP, 0, TARGET_ADDR(ut), ut->port);
+ ret = makesock(AF_UNSPEC, IPPROTO_UDP, 0, ut->host, ut->port);
if (ret < 0)
return ret;
ut->fd = ret;
}
add_close_on_fork_list(ut->fd);
ut->cq = cq_new(UDP_CQ_BYTES);
- PARA_NOTICE_LOG("sending to udp %s:%d\n", TARGET_ADDR(ut), ut->port);
+ PARA_NOTICE_LOG("sending to udp %s#%d using fec parms %d:%d:%d\n",
+ ut->host, ut->port , ut->fcp.max_slice_bytes,
+ ut->fcp.data_slices_per_group, ut->fcp.slices_per_group);
return 1;
}
-static void udp_send_buf(char *buf, size_t len)
-{
- struct udp_target *ut, *tmp;
- int ret;
-
- list_for_each_entry_safe(ut, tmp, &targets, node) {
- ret = udp_init_session(ut);
- if (ret < 0) {
- udp_delete_target(ut, para_strerror(-ret));
- continue;
- }
- ret = send_queued_chunks(ut->fd, ut->cq, 0);
- if (ret < 0) {
- udp_delete_target(ut, para_strerror(-ret));
- continue;
- }
- if (!len)
- continue;
- if (!ret) { /* still data left in the queue */
- ret = cq_enqueue(ut->cq, buf, len);
- if (ret < 0) {
- udp_delete_target(ut, para_strerror(-ret));
- continue;
- }
- }
- ret = write_nonblock(ut->fd, buf, len, 0);
- if (ret < 0) {
- udp_delete_target(ut, para_strerror(-ret));
- continue;
- }
- if (ret != len) {
- ret = cq_enqueue(ut->cq, buf + ret, len - ret);
- if (ret < 0) {
- udp_delete_target(ut, para_strerror(-ret));
- continue;
- }
- }
- }
-}
-
static void udp_shutdown_targets(void)
{
- char buf[UDP_AUDIO_HEADER_LEN];
struct udp_target *ut, *tmp;
- struct udp_audio_header uah = {
- .stream_type = UDP_UNKNOWN_STREAM,
- .packet_type = UDP_EOF_PACKET,
- };
+ const char *buf = NULL;
+ size_t len = 0; /* STFU, gcc */
- write_udp_audio_header(buf, &uah);
list_for_each_entry_safe(ut, tmp, &targets, node) {
if (ut->fd < 0)
continue;
- write(ut->fd, buf, UDP_AUDIO_HEADER_LEN);
+ if (!buf)
+ len = vss_get_fec_eof_packet(&buf);
+ write(ut->fd, buf, len);
udp_close_target(ut);
}
}
-static int need_extra_header(long unsigned current_chunk)
-{
- static struct timeval last_header;
- struct timeval diff;
-
- if (!current_chunk)
- return 0;
- tv_diff(now, &last_header, &diff);
- if (tv2ms(&diff) < conf.udp_header_interval_arg)
- return 0;
- last_header = *now;
- return 1;
-}
-
-static void udp_send(long unsigned current_chunk, __a_unused long unsigned chunks_sent,
- const char *buf, size_t len, const char *header_buf,
- size_t header_len)
-{
- char *sendbuf;
- size_t sendbuf_len;
- struct timeval *chunk_tv;
- struct udp_audio_header uah;
-
-// PARA_NOTICE_LOG("len: %zd, header_len: %zd\n", len, header_len);
- if (sender_status != SENDER_ON)
- return;
-
- /* we might not yet know the chunk time */
- chunk_tv = vss_chunk_time();
- if (!chunk_tv)
- return;
- if (list_empty(&targets))
- return;
- uah.stream_type = header_len? UDP_HEADER_STREAM : UDP_PLAIN_STREAM;
- uah.header_len = need_extra_header(current_chunk)? header_len : 0;
- if (!current_chunk)
- uah.packet_type = UDP_BOF_PACKET;
- else if (uah.header_len)
- uah.packet_type = UDP_HEADER_PACKET;
- else
- uah.packet_type = UDP_DATA_PACKET;
- uah.payload_len = uah.header_len + len;
- sendbuf_len = UDP_AUDIO_HEADER_LEN + uah.payload_len;
- sendbuf = para_malloc(sendbuf_len);
- write_udp_audio_header(sendbuf, &uah);
- if (uah.header_len)
- memcpy(sendbuf + UDP_AUDIO_HEADER_LEN, header_buf,
- uah.header_len);
- memcpy(sendbuf + UDP_AUDIO_HEADER_LEN + uah.header_len, buf, len);
- udp_send_buf(sendbuf, sendbuf_len);
- free(sendbuf);
-}
-
static int udp_com_on(__a_unused struct sender_command_data *scd)
{
sender_status = SENDER_ON;
static int udp_com_delete(struct sender_command_data *scd)
{
- char *a = para_strdup(inet_ntoa(scd->addr));
struct udp_target *ut, *tmp;
+
list_for_each_entry_safe(ut, tmp, &targets, node) {
- if (scd->port != ut->port)
+ /* Unspecified port means wildcard port match */
+ if (scd->port > 0 && scd->port != ut->port)
continue;
- if (strcmp(TARGET_ADDR(ut), a))
+ if (strcmp(ut->host, scd->host))
continue;
udp_delete_target(ut, "com_delete");
}
return 1;
}
-static void udp_add_target(int port, struct in_addr *addr)
+static int udp_send_fec(char *buf, size_t len, void *private_data)
+{
+ struct udp_target *ut = private_data;
+ int ret = udp_init_session(ut);
+
+ if (ret < 0)
+ goto fail;
+ ret = send_queued_chunks(ut->fd, ut->cq, 0);
+ if (ret < 0)
+ goto fail;
+ if (!len)
+ return 0;
+ if (!ret) { /* still data left in the queue */
+ ret = cq_enqueue(ut->cq, buf, len);
+ if (ret < 0)
+ goto fail;
+ }
+ ret = write_nonblock(ut->fd, buf, len, 0);
+ if (ret < 0)
+ goto fail;
+ if (ret != len) {
+ ret = cq_enqueue(ut->cq, buf + ret, len - ret);
+ if (ret < 0)
+ goto fail;
+ }
+ return 1;
+fail:
+ udp_delete_target(ut, para_strerror(-ret));
+ return ret;
+}
+
+static void udp_add_target(struct sender_command_data *scd)
{
struct udp_target *ut = para_calloc(sizeof(struct udp_target));
- ut->port = port;
- ut->addr = *addr;
+
+ strncpy(ut->host, scd->host, sizeof(ut->host));
+ ut->port = scd->port > 0 ? scd->port : conf.udp_default_port_arg;
ut->fd = -1; /* not yet connected */
- PARA_INFO_LOG("adding to target list (%s:%d)\n",
- TARGET_ADDR(ut), ut->port);
+ PARA_INFO_LOG("adding to target list (%s#%d)\n", ut->host, ut->port);
para_list_add(&ut->node, &targets);
+ ut->fcp.slices_per_group = scd->slices_per_group;
+ ut->fcp.data_slices_per_group = scd->data_slices_per_group;
+ ut->fcp.max_slice_bytes = scd->max_slice_bytes;
+ ut->fcp.send = udp_send_fec;
+ ut->fcp.private_data = ut;
+ vss_add_fec_client(&ut->fcp, &ut->fc);
}
static int udp_com_add(struct sender_command_data *scd)
{
- int port = (scd->port > 0)? scd->port : conf.udp_default_port_arg;
- udp_add_target(port, &scd->addr);
+ udp_add_target(scd);
return 1;
}
char *ret, *tgts = NULL;
list_for_each_entry(ut, &targets, node) {
- char *tmp = make_message("%s%s:%d ", tgts? tgts : "",
- TARGET_ADDR(ut), ut->port);
+ bool is_v6 = strchr(ut->host, ':') != NULL;
+ char *tmp = make_message("%s%s%s%s:%d ", tgts ? : "",
+ is_v6 ? "[" : "", ut->host,
+ is_v6 ? "]" : "", ut->port);
free(tgts);
tgts = tmp;
}
static void udp_init_target_list(void)
{
+ struct sender_command_data scd;
int i;
INIT_LIST_HEAD(&targets);
for (i = 0; i < conf.udp_target_given; i++) {
- char *arg = para_strdup(conf.udp_target_arg[i]);
- char *p = strchr(arg, ':');
- int port;
- struct in_addr addr;
-
- if (!p)
- goto err;
- *p = '\0';
- if (!inet_pton(AF_INET, arg, &addr))
- goto err;
- port = atoi(++p);
- if (port < 0 || port > 65535)
- port = conf.udp_default_port_arg;
- udp_add_target(port, &addr);
- goto success;
-err:
- PARA_CRIT_LOG("syntax error for udp target option "
- "#%d, ignoring\n", i);
-success:
- free(arg);
- continue;
+ if (parse_fec_url(conf.udp_target_arg[i], &scd) < 0) {
+ PARA_CRIT_LOG("syntax error for udp target option "
+ "#%d, ignoring\n", i);
+ continue;
+ }
+ udp_add_target(&scd);
}
}
{
return make_message(
"usage: {on|off}\n"
- "usage: {add|delete} IP port\n"
- "example: add 224.0.1.38 8000 (multicast streaming)\n"
+ "usage: {add|delete} host[:port][/packet_size:k:n]\n"
+ "examples: add 224.0.1.38:1500 (IPv4 multicast)\n"
+ " add 224.0.1.38:1500/1400:14:16\n"
+ " (likewise, using 1400 byte packets, with 14\n"
+ " data slices per 16 slice FEC group)\n"
+ " add 10.10.1.42 (using default port)\n"
+ " add [FF05::42]:1500 (IPv6 multicast)\n"
+ " add [::1] (IPv6 localhost/default port)\n"
+ " delete myhost.net (host with port wildcard)\n"
+ " delete [badc0de::1] (IPv6 with port wildcard)\n"
);
}
INIT_LIST_HEAD(&targets);
s->info = udp_info;
s->help = udp_help;
- s->send = udp_send;
+ s->send = NULL;
s->pre_select = NULL;
s->post_select = NULL;
s->shutdown_clients = udp_shutdown_targets;
#include "para.h"
#include "error.h"
+#include "portable_io.h"
+#include "fec.h"
#include "string.h"
#include "afh.h"
#include "afs.h"
#include "server.h"
#include "net.h"
#include "server.cmdline.h"
-#include "vss.h"
#include "list.h"
+#include "vss.h"
#include "send.h"
#include "ipc.h"
#include "fd.h"
size_t header_len;
};
+/**
+ * The list of currently connected fec clients.
+ *
+ * Senders may use \ref vss_add_fec_client() to add entries to the list.
+ */
+static struct list_head fec_client_list;
+
+/**
+ * Describes one slice of a FEC group.
+ *
+ * FEC slices directly correspond to the data packages sent by the paraslash
+ * senders that use FEC. Each slice is identified by its group number and its
+ * number within the group. All slices have the same size, but the last slice
+ * of the group may not be filled entirely.
+ */
+struct fec_slice {
+ /** The slice number within the FEC group. */
+ uint8_t num;
+ /** The number of used bytes in this slice. */
+ uint16_t bytes;
+};
+
+/**
+ * Data associated with one FEC group.
+ *
+ * A FEC group consists of a fixed number of slices and this number is given by
+ * the \a slices_per_group parameter of struct \ref fec_client_parms. Each FEC
+ * group contains a number of chunks of the current audio file.
+ */
+struct fec_group {
+ /** The number of the FEC group. */
+ uint32_t num;
+ /** Number of bytes in this group. */
+ uint32_t bytes;
+ /** The first chunk of the current audio file belonging to the group. */
+ uint32_t first_chunk;
+ /** The number of chunks contained in this group. */
+ uint32_t num_chunks;
+ /** The time needed to play all chunks of the group. */
+ struct timeval duration;
+ /** When the first chunk was sent. */
+ struct timeval start;
+ /** \a The group duration divided by \a slices_per_group. */
+ struct timeval slice_duration;
+};
+
+/**
+ * Describes one connected FEC client.
+ */
+struct fec_client {
+ /** Parameters requested by the client. */
+ struct fec_client_parms *fcp;
+ /** Used by the core FEC code. */
+ struct fec_parms *parms;
+ /** The position of this client in the fec client list. */
+ struct list_head node;
+ /** When the first slice for this client was sent. */
+ struct timeval stream_start;
+ /** The first chunk sent to this FEC client. */
+ int first_stream_chunk;
+ /** Describes the current group. */
+ struct fec_group group;
+ /** Describes the current slice. */
+ struct fec_slice slice;
+ /** The data to be FEC-encoded (point to a region within the mapped audio file). */
+ const unsigned char **src_data;
+ /** Used for the last source pointer of the last group. */
+ unsigned char *extra_src_buf;
+ /** The size of the buffer for the extra source pointer. */
+ size_t extra_src_buf_size;
+ /** Contains FEC-encoded data. */
+ unsigned char *enc_buf;
+ /** Size of \a enc_buf. */
+ size_t enc_buf_size;
+};
+
+/**
+ * Get the chunk time of the current audio file.
+ *
+ * \return A pointer to a struct containing the chunk time, or NULL,
+ * if currently no audio file is selected.
+ */
+struct timeval *vss_chunk_time(void)
+{
+ if (mmd->afd.afhi.chunk_tv.tv_sec == 0 &&
+ mmd->afd.afhi.chunk_tv.tv_usec == 0)
+ return NULL;
+ return &mmd->afd.afhi.chunk_tv;
+}
+
+/**
+ * Write a fec header to a buffer.
+ *
+ * \param buf The buffer to write to.
+ * \param h The fec header to write.
+ */
+static void write_fec_header(struct fec_client *fc)
+{
+ char *buf = (char *)fc->enc_buf;
+
+ write_u32(buf, FEC_MAGIC);
+
+ write_u8(buf + 4, fc->fcp->slices_per_group);
+ write_u8(buf + 5, fc->fcp->data_slices_per_group);
+ write_u32(buf + 6, (uint32_t)0); /* audio header len */
+
+ write_u32(buf + 10, fc->group.num);
+ write_u32(buf + 14, fc->group.bytes);
+
+ write_u8(buf + 18, fc->slice.num);
+ write_u16(buf + 20, fc->slice.bytes);
+ memset(buf + 22, 0, 11);
+}
+
+static int setup_next_fec_group(struct fec_client *fc, struct vss_task *vsst)
+{
+ uint32_t max_group_size, last_payload_size;
+ int i, k = fc->fcp->data_slices_per_group;
+ size_t len;
+ const char *start_buf;
+ struct timeval tmp, *chunk_tv = vss_chunk_time();
+
+ assert(chunk_tv);
+ if (fc->first_stream_chunk < 0) {
+ fc->stream_start = *now;
+ fc->first_stream_chunk = mmd->current_chunk;
+ fc->group.first_chunk = mmd->current_chunk;
+ fc->group.num = 0;
+ } else {
+ fc->group.first_chunk += fc->group.num_chunks;
+ fc->group.num++;
+ }
+ if (fc->group.first_chunk >= mmd->afd.afhi.chunks_total)
+ return 0;
+ max_group_size = (fc->fcp->max_slice_bytes - FEC_HEADER_SIZE) * k;
+ afh_get_chunk(fc->group.first_chunk, &mmd->afd.afhi, vsst->map,
+ &start_buf, &len);
+ for (i = fc->group.first_chunk, fc->group.bytes = 0;
+ i < mmd->afd.afhi.chunks_total; i++) {
+ const char *buf;
+
+ afh_get_chunk(i, &mmd->afd.afhi, vsst->map, &buf, &len);
+ if (fc->group.bytes + len > max_group_size)
+ break;
+ fc->group.bytes += len;
+ }
+ fc->group.num_chunks = i - fc->group.first_chunk;
+ fc->slice.num = 0;
+ fc->slice.bytes = ROUND_UP(fc->group.bytes, k) / k;
+
+ /* The last slice will not be fully used */
+ last_payload_size = fc->group.bytes % fc->slice.bytes;
+ if (!last_payload_size)
+ last_payload_size = fc->slice.bytes;
+
+ tv_scale(fc->group.first_chunk - fc->first_stream_chunk, chunk_tv,
+ &tmp);
+ tv_add(&fc->stream_start, &tmp, &fc->group.start);
+ if (fc->group.num) /* quick hack to avoid buffer underruns */
+ fc->group.start.tv_sec--;
+ tv_scale(fc->group.num_chunks, chunk_tv, &fc->group.duration);
+ tv_divide(fc->fcp->slices_per_group, &fc->group.duration,
+ &fc->group.slice_duration);
+
+ for (i = 0; i < k; i++)
+ fc->src_data[i] = (const unsigned char *)start_buf
+ + i * fc->slice.bytes;
+
+ if (start_buf + k * fc->slice.bytes > vsst->map + mmd->size) {
+ /* can not use last slice as it goes beyond the map */
+ if (fc->extra_src_buf_size < fc->slice.bytes) {
+ fc->extra_src_buf = para_realloc(fc->extra_src_buf, fc->slice.bytes);
+ fc->extra_src_buf_size = fc->slice.bytes;
+ }
+ memcpy(fc->extra_src_buf, start_buf + (k - 1) * fc->slice.bytes,
+ last_payload_size);
+ memset(fc->extra_src_buf + last_payload_size, 0,
+ fc->slice.bytes - last_payload_size);
+ fc->src_data[k - 1] = fc->extra_src_buf;
+ }
+ if (fc->enc_buf_size < fc->slice.bytes + FEC_HEADER_SIZE) {
+ fc->enc_buf_size = fc->slice.bytes + FEC_HEADER_SIZE;
+ fc->enc_buf = para_realloc(fc->enc_buf, fc->enc_buf_size);
+ }
+ PARA_INFO_LOG("FEC group %d: %d chunks (%d - %d), duration: %lums\n",
+ fc->group.num, fc->group.num_chunks, fc->group.first_chunk,
+ fc->group.first_chunk + fc->group.num_chunks - 1,
+ tv2ms(&fc->group.duration));
+ return 1;
+}
+
+static int compute_next_fec_slice(struct fec_client *fc, struct vss_task *vsst)
+{
+ if (fc->first_stream_chunk < 0 || fc->slice.num
+ == fc->fcp->slices_per_group) {
+ if (!setup_next_fec_group(fc, vsst))
+ return 0;
+ }
+ write_fec_header(fc);
+ fec_encode(fc->parms, fc->src_data, fc->enc_buf + FEC_HEADER_SIZE,
+ fc->slice.num, fc->slice.bytes);
+ return 1;
+}
+
+/**
+ * Return a buffer that marks the end of the stream.
+ *
+ * \param buf Result pointer.
+ * \return The length of the eof buffer.
+ *
+ * This is used for (multicast) udp streaming where closing the socket on the
+ * sender might not give rise to an eof condition at the peer.
+ */
+size_t vss_get_fec_eof_packet(const char **buf)
+{
+ static const char fec_eof_packet[FEC_HEADER_SIZE] =
+ "\xec\x0d\xcc\xfe\0\0\0\0"
+ "\0\0\0\0\0\0\0\0"
+ "\0\0\0\0\0\0\0\0"
+ "\0\0\0\0\0\0\0\0";
+ *buf = fec_eof_packet;
+ return FEC_HEADER_SIZE;
+}
+
+/**
+ * Add one entry to the list of active fec clients.
+ *
+ * \param fcp Describes the fec parameters to be used for this client.
+ * \param result An opaque pointer that must be used by remove the client later.
+ *
+ * \return Standard.
+ */
+int vss_add_fec_client(struct fec_client_parms *fcp, struct fec_client **result)
+{
+ int ret;
+ struct fec_client *fc;
+
+ if (fcp->max_slice_bytes < FEC_HEADER_SIZE + fcp->data_slices_per_group)
+ return -ERRNO_TO_PARA_ERROR(EINVAL);
+ fc = para_calloc(sizeof(*fc));
+ fc->fcp = fcp;
+ ret = fec_new(fcp->data_slices_per_group, fcp->slices_per_group,
+ &fc->parms);
+ if (ret < 0)
+ goto err;
+ fc->first_stream_chunk = -1; /* stream not yet started */
+ fc->src_data = para_malloc(fc->fcp->slices_per_group * sizeof(char *));
+ para_list_add(&fc->node, &fec_client_list);
+ *result = fc;
+ return 1;
+err:
+ fec_free(fc->parms);
+ free(fc);
+ *result = NULL;
+ return ret;
+}
+
+/**
+ * Remove one entry from the list of active fec clients.
+ *
+ * \param fc The client to be removed.
+ */
+void vss_del_fec_client(struct fec_client *fc)
+{
+ list_del(&fc->node);
+ free(fc->src_data);
+ free(fc->enc_buf);
+ free(fc->extra_src_buf);
+ fec_free(fc->parms);
+ free(fc);
+}
+
+/*
+ * Compute if/when next slice is due. If it isn't due yet and \a diff is
+ * not \p Null, compute the time difference next - now, where
+ *
+ * next = stream_start + (first_group_chunk - first_stream_chunk)
+ * * chunk_time + slice_num * slice_time
+ */
+static int next_slice_is_due(struct fec_client *fc, struct timeval *diff)
+{
+ struct timeval tmp, next;
+ int ret;
+
+ if (fc->first_stream_chunk < 0)
+ return 1;
+ tv_scale(fc->slice.num, &fc->group.slice_duration, &tmp);
+ tv_add(&tmp, &fc->group.start, &next);
+ ret = tv_diff(&next, now, diff);
+ return ret < 0? 1 : 0;
+}
+
+static void compute_slice_timeout(struct timeval *timeout)
+{
+ struct fec_client *fc;
+
+ assert(vss_playing());
+ list_for_each_entry(fc, &fec_client_list, node) {
+ struct timeval diff;
+
+ if (next_slice_is_due(fc, &diff)) {
+ timeout->tv_sec = 0;
+ timeout->tv_usec = 0;
+ return;
+ }
+ /* timeout = min(timeout, diff) */
+ if (tv_diff(&diff, timeout, NULL) < 0)
+ *timeout = diff;
+ }
+}
+
/**
* Check if vss status flag \a P (playing) is set.
*
return NULL;
compute_chunk_time(mmd->chunks_sent, &mmd->afd.afhi.chunk_tv,
&mmd->stream_start, &next_chunk);
- if (chk_barrier("chunk", &next_chunk, &the_timeout, 0) < 0)
+ if (chk_barrier("chunk", &next_chunk, &the_timeout, 0) >= 0) {
+ /* chunk is due or bof */
+ the_timeout.tv_sec = 0;
+ the_timeout.tv_usec = 0;
return &the_timeout;
- /* chunk is due or bof */
- the_timeout.tv_sec = 0;
- the_timeout.tv_usec = 0;
+ }
+ /* compute min of current timeout and next slice time */
+ compute_slice_timeout(&the_timeout);
return &the_timeout;
}
static void vss_eof(struct vss_task *vsst)
{
+
mmd->stream_start = *now;
if (!vsst->map)
return;
return SUPPORTED_AUDIO_FORMATS;
}
-/**
- * Get the chunk time of the current audio file.
- *
- * \return A pointer to a struct containing the chunk time, or NULL,
- * if currently no audio file is selected.
- */
-struct timeval *vss_chunk_time(void)
-{
- if (mmd->afd.afhi.chunk_tv.tv_sec == 0 &&
- mmd->afd.afhi.chunk_tv.tv_usec == 0)
- return NULL;
- return &mmd->afd.afhi.chunk_tv;
-}
-
static int need_to_request_new_audio_file(struct vss_task *vsst)
{
struct timeval diff;
return 1;
}
+
+
/**
* Compute the timeout for para_server's main select-loop.
*
struct timeval *tv, diff;
struct vss_task *vsst = container_of(t, struct vss_task, task);
- if (!vsst->map || vss_next() || vss_paused() || vss_repos())
+ if (!vsst->map || vss_next() || vss_paused() || vss_repos()) {
+ struct fec_client *fc, *tmp;
for (i = 0; senders[i].name; i++)
- senders[i].shutdown_clients();
+ if (senders[i].shutdown_clients)
+ senders[i].shutdown_clients();
+ list_for_each_entry_safe(fc, tmp, &fec_client_list, node)
+ fc->first_stream_chunk = -1;
+ }
if (vss_next())
vss_eof(vsst);
else if (vss_paused()) {
/**
* Main sending function.
*
- * This function gets called from para_server as soon as the next chunk of data
- * should be pushed out. It obtains a pointer to the data to be sent out as
- * well as its length from mmd->afd.afhi. This information is then passed to
- * each supported sender's send() function which is supposed to send out the data
- * to all connected clients.
+ * This function gets called from vss_post_select(). It checks whether the next
+ * chunk of data should be pushed out. It obtains a pointer to the data to be
+ * sent out as well as its length from mmd->afd.afhi. This information is then
+ * passed to each supported sender's send() function as well as to the send()
+ * functions of each registered fec client.
*/
-static void vss_send_chunk(struct vss_task *vsst)
+static void vss_send(struct vss_task *vsst)
{
int i;
struct timeval due;
- const char *buf;
- size_t len;
+ struct fec_client *fc, *tmp_fc;
if (!vsst->map || !vss_playing())
return;
- compute_chunk_time(mmd->chunks_sent, &mmd->afd.afhi.chunk_tv,
- &mmd->stream_start, &due);
- if (tv_diff(&due, now, NULL) > 0)
- return;
if (chk_barrier("eof", &vsst->eof_barrier, &due, 1) < 0)
return;
if (chk_barrier("data send", &vsst->data_send_barrier,
mmd->new_vss_status_flags |= VSS_NEXT;
return;
}
- /*
- * We call the send function also in case of empty chunks as they
- * might have still some data queued which can be sent in this case.
- */
if (!mmd->chunks_sent) {
struct timeval tmp;
mmd->stream_start = *now;
mmd->offset = tv2ms(&tmp);
mmd->events++;
}
- afh_get_chunk(mmd->current_chunk, &mmd->afd.afhi, vsst->map, &buf, &len);
- for (i = 0; senders[i].name; i++)
- senders[i].send(mmd->current_chunk, mmd->chunks_sent, buf, len,
- vsst->header_buf, vsst->header_len);
+ compute_chunk_time(mmd->chunks_sent, &mmd->afd.afhi.chunk_tv,
+ &mmd->stream_start, &due);
+ if (tv_diff(&due, now, NULL) <= 0) {
+ const char *buf;
+ size_t len;
+ /*
+ * We call the send function also in case of empty chunks as
+ * they might have still some data queued which can be sent in
+ * this case.
+ */
+ afh_get_chunk(mmd->current_chunk, &mmd->afd.afhi, vsst->map,
+ &buf, &len);
+ for (i = 0; senders[i].name; i++) {
+ if (!senders[i].send)
+ continue;
+ senders[i].send(mmd->current_chunk, mmd->chunks_sent,
+ buf, len, vsst->header_buf, vsst->header_len);
+ }
+ }
+ list_for_each_entry_safe(fc, tmp_fc, &fec_client_list, node) {
+ if (!next_slice_is_due(fc, NULL))
+ continue;
+ if (!compute_next_fec_slice(fc, vsst))
+ continue;
+ PARA_DEBUG_LOG("sending %d:%d (%zu bytes)\n", fc->group.num,
+ fc->slice.num, fc->slice.bytes);
+ fc->fcp->send((char *)fc->enc_buf,
+ fc->slice.bytes + FEC_HEADER_SIZE,
+ fc->fcp->private_data);
+ fc->slice.num++;
+ }
mmd->new_vss_status_flags |= VSS_PLAYING;
mmd->chunks_sent++;
mmd->current_chunk++;
if ((vss_playing() && !(mmd->vss_status_flags & VSS_PLAYING)) ||
(vss_next() && vss_playing()))
tv_add(now, &vsst->announce_tv, &vsst->data_send_barrier);
- vss_send_chunk(vsst);
+ vss_send(vsst);
}
/**
tv_add(&vsst->autoplay_barrier, &vsst->announce_tv,
&vsst->data_send_barrier);
}
+ INIT_LIST_HEAD(&fec_client_list);
register_task(&vsst->task);
}
#define VSS_REPOS 4
/** Currently playing. */
#define VSS_PLAYING 8
-/** A client requested to change the audio file selector. */
-#define VSS_CHANGE 16
+
+/**
+ * Each paraslash sender may register arbitrary many clients to the virtual
+ * streaming system, possibly with varying fec parameters. In order to do so,
+ * it must allocate a \a fec_client_parms structure and pass it to \ref
+ * add_fec_client.
+ *
+ * Clients are automatically removed from that list by the vss if an error
+ * occurs, or if the sender requests deletion of a client by calling \ref
+ * vss_del_fec_client().
+ */
+struct fec_client;
+
+/** FEC parameters requested by FEC clients. */
+struct fec_client_parms {
+ /** Number of data slices plus redundant slices. */
+ uint8_t slices_per_group;
+ /** Number of slices minus number of redundant slices. */
+ uint8_t data_slices_per_group;
+ /** Maximal number of bytes per slice. */
+ uint16_t max_slice_bytes;
+ /** Called by vss.c when the next slice should be sent. */
+ int (*send)(char *buf, size_t num_bytes, void *private_data);
+ /** Passed verbatim to \a send(). */
+ void *private_data;
+};
+
+int vss_add_fec_client(struct fec_client_parms *fcp, struct fec_client **result);
+void vss_del_fec_client(struct fec_client *fc);
+size_t vss_get_fec_eof_packet(const char **buf);