From: Andre Noll Date: Mon, 2 Mar 2009 09:39:59 +0000 (+0100) Subject: Merge commit 'meins/master' X-Git-Tag: v0.3.4~57 X-Git-Url: http://git.tuebingen.mpg.de/?p=paraslash.git;a=commitdiff_plain;h=eea9d1cae30df921ae2fd1771518b8b21dbd0daf;hp=deaebb6eaaa83ec633b851c9c918683508008e47 Merge commit 'meins/master' --- diff --git a/aacdec_filter.c b/aacdec_filter.c index d9226b0d..1ceca1fc 100644 --- a/aacdec_filter.c +++ b/aacdec_filter.c @@ -62,7 +62,10 @@ static ssize_t aacdec(char *input_buffer, size_t len, struct filter_node *fn) if (fn->loaded > fn->bufsize * 3 / 5) return 0; - if (len < 2048 && !*fc->input_error) + ret = *fc->input_error; + if (ret < 0) + return ret; + if (len < 2048) return 0; if (!padd->initialized) { diff --git a/client_common.c b/client_common.c index d5807e75..c1e23f7a 100644 --- a/client_common.c +++ b/client_common.c @@ -312,6 +312,7 @@ err_out: * \param argv Usual argument vector. * \param ct_ptr Points to dynamically allocated and initialized client task * struct upon successful return. + * \param loglevel If not \p NULL, the number of the loglevel is stored here. * * Check the command line options given by \a argc and argv, set default values * for user name and rsa key file, read further option from the config file. @@ -323,7 +324,6 @@ int client_open(int argc, char *argv[], struct client_task **ct_ptr, int *loglevel) { char *home = para_homedir(); - struct stat statbuf; int ret; struct client_task *ct = para_calloc(sizeof(struct client_task)); @@ -346,12 +346,12 @@ int client_open(int argc, char *argv[], struct client_task **ct_ptr, ct->config_file = ct->conf.config_file_given? para_strdup(ct->conf.config_file_arg) : make_message("%s/.paraslash/client.conf", home); - ret = stat(ct->config_file, &statbuf); - if (ret && ct->conf.config_file_given) { + ret = file_exists(ct->config_file); + if (!ret && ct->conf.config_file_given) { ret = -E_NO_CONFIG; goto out; } - if (!ret) { + if (ret) { struct client_cmdline_parser_params params = { .override = 0, .initialize = 0, diff --git a/command.c b/command.c index 988eae64..b59fae7a 100644 --- a/command.c +++ b/command.c @@ -204,17 +204,9 @@ static int check_sender_args(int argc, char * const * argv, struct sender_comman break; case SENDER_ADD: case SENDER_DELETE: - if (argc != 4 && argc != 5) - return -E_COMMAND_SYNTAX; - if (!inet_pton(AF_INET, argv[3], &scd->addr)) + if (argc != 4) return -E_COMMAND_SYNTAX; - scd->port = -1; - if (argc == 5) { - scd->port = atoi(argv[4]); - if (scd->port < 0 || scd->port > 65535) - return -E_COMMAND_SYNTAX; - } - break; + return parse_fec_url(argv[3], scd); default: return -E_COMMAND_SYNTAX; } diff --git a/configure.ac b/configure.ac index 13ec59ba..f783db78 100644 --- a/configure.ac +++ b/configure.ac @@ -84,22 +84,23 @@ dccp_send fd user_list chunk_queue afs osl aft mood score attribute blob ringbuf playlist sha1 rbtree sched audiod grab_client filter_common wav_filter compress_filter http_recv dccp_recv recv_common write_common file_write audiod_command client_common recv stdout filter stdin audioc write client fsck exec send_common ggo -udp_recv udp_send color" +udp_recv udp_send color fec fecdec_filter" all_executables="server recv filter audioc write client fsck afh" recv_cmdline_objs="recv.cmdline http_recv.cmdline dccp_recv.cmdline udp_recv.cmdline" recv_errlist_objs="http_recv recv_common recv time string net dccp_recv - fd sched stdout ggo udp_recv" + fd sched stdout ggo udp_recv fec" recv_ldflags="" receivers=" http dccp udp" senders=" http dccp udp" filter_cmdline_objs="filter.cmdline compress_filter.cmdline amp_filter.cmdline" -filter_errlist_objs="filter_common wav_filter compress_filter filter string stdin stdout sched fd amp_filter ggo" +filter_errlist_objs="filter_common wav_filter compress_filter filter string + stdin stdout sched fd amp_filter ggo fecdec_filter fec" filter_ldflags="" -filters=" compress wav amp" +filters=" compress wav amp fecdec" audioc_cmdline_objs="audioc.cmdline" audioc_errlist_objs="audioc string net fd" @@ -110,8 +111,8 @@ audiod_cmdline_objs="audiod.cmdline grab_client.cmdline compress_filter.cmdline audiod_command_list amp_filter.cmdline udp_recv.cmdline" audiod_errlist_objs="audiod signal string daemon stat net time grab_client filter_common wav_filter compress_filter amp_filter http_recv dccp_recv - recv_common fd sched write_common file_write audiod_command crypt - client_common ggo udp_recv color" + recv_common fd sched write_common file_write audiod_command crypt fecdec_filter + client_common ggo udp_recv color fec" audiod_ldflags="" audiod_audio_formats="" @@ -123,7 +124,7 @@ server_cmdline_objs="server.cmdline server_command_list afs_command_list" server_errlist_objs="server afh_common mp3_afh vss command net string signal time daemon stat crypt http_send close_on_fork ipc dccp_send fd user_list chunk_queue afs osl aft mood score attribute - blob playlist sha1 rbtree sched acl send_common udp_send color" + blob playlist sha1 rbtree sched acl send_common udp_send color fec" server_ldflags="" server_audio_formats=" mp3" @@ -222,6 +223,7 @@ AC_SEARCH_LIBS([inet_ntoa],[nsl],[],[ ########################################################################### ucred AC_MSG_CHECKING(for struct ucred) AC_TRY_LINK([ + #define _GNU_SOURCE #include #include ],[ diff --git a/error.h b/error.h index 78d29792..24f2a113 100644 --- a/error.h +++ b/error.h @@ -31,13 +31,27 @@ DEFINE_ERRLIST_OBJECT_ENUM; #define GGO_ERRORS #define COLOR_ERRORS - extern const char **para_errlist[]; #define COMPRESS_FILTER_ERRORS \ PARA_ERROR(COMPRESS_SYNTAX, "syntax error in compress filter config"), \ +#define FEC_ERRORS \ + PARA_ERROR(FEC_BAD_IDX, "invalid index vector"), \ + PARA_ERROR(FEC_SINGULAR, "unexpected singular matrix"), \ + PARA_ERROR(FEC_PIVOT, "pivot column not found"), \ + PARA_ERROR(FEC_PARMS, "invalid fec parameters"), \ + + +#define FECDEC_FILTER_ERRORS \ + PARA_ERROR(BAD_FEC_HEADER, "invalid fec header"), \ + PARA_ERROR(BAD_SLICE_SIZE, "slice size too large"), \ + PARA_ERROR(BAD_SLICE_NUM, "invalid slice number"), \ + PARA_ERROR(FECDEC_OVERRUN, "fecdec output buffer overrun"), \ + PARA_ERROR(FECDEC_EOF, "received eof packet"), \ + + #define AMP_FILTER_ERRORS \ PARA_ERROR(AMP_SYNTAX, "syntax error in amp filter config"), \ diff --git a/fec.c b/fec.c new file mode 100644 index 00000000..f3e68454 --- /dev/null +++ b/fec.c @@ -0,0 +1,603 @@ +/* + * fec.c -- forward error correction based on Vandermonde matrices + * 980624 + * (C) 1997-98 Luigi Rizzo (luigi@iet.unipi.it) + * + * Portions derived from code by Phil Karn (karn@ka9q.ampr.org), + * Robert Morelos-Zaragoza (robert@spectra.eng.hawaii.edu) and Hari + * Thirumoorthy (harit@spectra.eng.hawaii.edu), Aug 1995 + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials + * provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE AUTHORS ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, + * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A + * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS + * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, + * OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, + * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR + * TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT + * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY + * OF SUCH DAMAGE. + */ + +#include "para.h" +#include "error.h" +#include "portable_io.h" +#include "string.h" +#include "fec.h" + +#define GF_BITS 8 /* code over GF(256) */ +#define GF_SIZE ((1 << GF_BITS) - 1) + +/* + * To speed up computations, we have tables for logarithm, exponent and inverse + * of a number. We use a table for multiplication as well (it takes 64K, no big + * deal even on a PDA, especially because it can be pre-initialized an put into + * a ROM!). The macro gf_mul(x,y) takes care of multiplications. + */ +static unsigned char gf_exp[2 * GF_SIZE]; /* index->poly form conversion table */ +static int gf_log[GF_SIZE + 1]; /* Poly->index form conversion table */ +static unsigned char inverse[GF_SIZE + 1]; /* inverse of field elem. */ +static unsigned char gf_mul_table[GF_SIZE + 1][GF_SIZE + 1]; +/* Multiply two numbers. */ +#define gf_mul(x,y) gf_mul_table[x][y] + +/* Compute x % GF_SIZE without a slow divide. */ +static inline unsigned char modnn(int x) +{ + while (x >= GF_SIZE) { + x -= GF_SIZE; + x = (x >> GF_BITS) + (x & GF_SIZE); + } + return x; +} + +static void init_mul_table(void) +{ + int i, j; + for (i = 0; i < GF_SIZE + 1; i++) + for (j = 0; j < GF_SIZE + 1; j++) + gf_mul_table[i][j] = + gf_exp[modnn(gf_log[i] + gf_log[j])]; + + for (j = 0; j < GF_SIZE + 1; j++) + gf_mul_table[0][j] = gf_mul_table[j][0] = 0; +} + +static unsigned char *alloc_matrix(int rows, int cols) +{ + return para_malloc(rows * cols); +} + +/* + * Initialize the data structures used for computations in GF. + * + * This generates GF(2**GF_BITS) from the irreducible polynomial p(X) in + * p[0]..p[m]. + * + * Lookup tables: + * index->polynomial form gf_exp[] contains j= \alpha^i; + * polynomial form -> index form gf_log[ j = \alpha^i ] = i + * \alpha=x is the primitive element of GF(2^m) + * + * For efficiency, gf_exp[] has size 2*GF_SIZE, so that a simple + * multiplication of two numbers can be resolved without calling modnn + */ +static void generate_gf(void) +{ + int i; + unsigned char mask = 1; + char *pp = "101110001"; /* The primitive polynomial 1+x^2+x^3+x^4+x^8 */ + gf_exp[GF_BITS] = 0; /* will be updated at the end of the 1st loop */ + + /* + * first, generate the (polynomial representation of) powers of \alpha, + * which are stored in gf_exp[i] = \alpha ** i . + * At the same time build gf_log[gf_exp[i]] = i . + * The first GF_BITS powers are simply bits shifted to the left. + */ + for (i = 0; i < GF_BITS; i++, mask <<= 1) { + gf_exp[i] = mask; + gf_log[gf_exp[i]] = i; + /* + * If pp[i] == 1 then \alpha ** i occurs in poly-repr + * gf_exp[GF_BITS] = \alpha ** GF_BITS + */ + if (pp[i] == '1') + gf_exp[GF_BITS] ^= mask; + } + /* + * now gf_exp[GF_BITS] = \alpha ** GF_BITS is complete, so can also + * compute its inverse. + */ + gf_log[gf_exp[GF_BITS]] = GF_BITS; + /* + * Poly-repr of \alpha ** (i+1) is given by poly-repr of \alpha ** i + * shifted left one-bit and accounting for any \alpha ** GF_BITS term + * that may occur when poly-repr of \alpha ** i is shifted. + */ + mask = 1 << (GF_BITS - 1); + for (i = GF_BITS + 1; i < GF_SIZE; i++) { + if (gf_exp[i - 1] >= mask) + gf_exp[i] = + gf_exp[GF_BITS] ^ ((gf_exp[i - 1] ^ mask) << 1); + else + gf_exp[i] = gf_exp[i - 1] << 1; + gf_log[gf_exp[i]] = i; + } + /* + * log(0) is not defined, so use a special value + */ + gf_log[0] = GF_SIZE; + /* set the extended gf_exp values for fast multiply */ + for (i = 0; i < GF_SIZE; i++) + gf_exp[i + GF_SIZE] = gf_exp[i]; + + inverse[0] = 0; /* 0 has no inverse. */ + inverse[1] = 1; + for (i = 2; i <= GF_SIZE; i++) + inverse[i] = gf_exp[GF_SIZE - gf_log[i]]; +} + +/* + * Compute dst[] = dst[] + c * src[] + * + * This is used often, so better optimize it! Currently the loop is unrolled 16 + * times. The case c=0 is also optimized, whereas c=1 is not. + */ +#define UNROLL 16 +static void addmul(unsigned char *dst1, const unsigned char const *src1, + unsigned char c, int sz) +{ + if (c == 0) + return; + unsigned char *dst = dst1, *lim = &dst[sz - UNROLL + 1], + *col = gf_mul_table[c]; + const unsigned char const *src = src1; + + for (; dst < lim; dst += UNROLL, src += UNROLL) { + dst[0] ^= col[src[0]]; + dst[1] ^= col[src[1]]; + dst[2] ^= col[src[2]]; + dst[3] ^= col[src[3]]; + dst[4] ^= col[src[4]]; + dst[5] ^= col[src[5]]; + dst[6] ^= col[src[6]]; + dst[7] ^= col[src[7]]; + dst[8] ^= col[src[8]]; + dst[9] ^= col[src[9]]; + dst[10] ^= col[src[10]]; + dst[11] ^= col[src[11]]; + dst[12] ^= col[src[12]]; + dst[13] ^= col[src[13]]; + dst[14] ^= col[src[14]]; + dst[15] ^= col[src[15]]; + } + lim += UNROLL - 1; + for (; dst < lim; dst++, src++) /* final components */ + *dst ^= col[*src]; +} + +/* + * Compute C = AB where A is n*k, B is k*m, C is n*m + */ +static void matmul(unsigned char *a, unsigned char *b, unsigned char *c, + int n, int k, int m) +{ + int row, col, i; + + for (row = 0; row < n; row++) { + for (col = 0; col < m; col++) { + unsigned char *pa = &a[row * k], *pb = &b[col], acc = 0; + for (i = 0; i < k; i++, pa++, pb += m) + acc ^= gf_mul(*pa, *pb); + c[row * m + col] = acc; + } + } +} + +#define FEC_SWAP(a,b) {typeof(a) tmp = a; a = b; b = tmp;} + +/* + * Compute the inverse of a matrix. + * + * k is the size of the matrix 'src' (Gauss-Jordan, adapted from Numerical + * Recipes in C). Returns -1 if 'src' is singular. + */ +static int invert_mat(unsigned char *src, int k) +{ + int irow, icol, row, col, ix, error; + int *indxc = para_malloc(k * sizeof(int)); + int *indxr = para_malloc(k * sizeof(int)); + int *ipiv = para_malloc(k * sizeof(int)); /* elements used as pivots */ + unsigned char c, *p, *id_row = alloc_matrix(1, k), + *temp_row = alloc_matrix(1, k); + + memset(id_row, 0, k); + memset(ipiv, 0, k * sizeof(int)); + + for (col = 0; col < k; col++) { + unsigned char *pivot_row; + /* + * Zeroing column 'col', look for a non-zero element. + * First try on the diagonal, if it fails, look elsewhere. + */ + irow = icol = -1; + if (ipiv[col] != 1 && src[col * k + col] != 0) { + irow = col; + icol = col; + goto found_piv; + } + for (row = 0; row < k; row++) { + if (ipiv[row] != 1) { + for (ix = 0; ix < k; ix++) { + if (ipiv[ix] == 0) { + if (src[row * k + ix] != 0) { + irow = row; + icol = ix; + goto found_piv; + } + } else if (ipiv[ix] > 1) { + error = -E_FEC_PIVOT; + goto fail; + } + } + } + } + error = -E_FEC_PIVOT; + if (icol == -1) + goto fail; +found_piv: + ++(ipiv[icol]); + /* + * swap rows irow and icol, so afterwards the diagonal element + * will be correct. Rarely done, not worth optimizing. + */ + if (irow != icol) + for (ix = 0; ix < k; ix++) + FEC_SWAP(src[irow * k + ix], src[icol * k + ix]); + indxr[col] = irow; + indxc[col] = icol; + pivot_row = &src[icol * k]; + error = -E_FEC_SINGULAR; + c = pivot_row[icol]; + if (c == 0) + goto fail; + if (c != 1) { /* otherwise this is a NOP */ + /* + * this is done often , but optimizing is not so + * fruitful, at least in the obvious ways (unrolling) + */ + c = inverse[c]; + pivot_row[icol] = 1; + for (ix = 0; ix < k; ix++) + pivot_row[ix] = gf_mul(c, pivot_row[ix]); + } + /* + * from all rows, remove multiples of the selected row to zero + * the relevant entry (in fact, the entry is not zero because + * we know it must be zero). (Here, if we know that the + * pivot_row is the identity, we can optimize the addmul). + */ + id_row[icol] = 1; + if (memcmp(pivot_row, id_row, k) != 0) { + for (p = src, ix = 0; ix < k; ix++, p += k) { + if (ix != icol) { + c = p[icol]; + p[icol] = 0; + addmul(p, pivot_row, c, k); + } + } + } + id_row[icol] = 0; + } + for (col = k - 1; col >= 0; col--) { + if (indxr[col] < 0 || indxr[col] >= k) + PARA_CRIT_LOG("AARGH, indxr[col] %d\n", indxr[col]); + else if (indxc[col] < 0 || indxc[col] >= k) + PARA_CRIT_LOG("AARGH, indxc[col] %d\n", indxc[col]); + else if (indxr[col] != indxc[col]) { + for (row = 0; row < k; row++) { + FEC_SWAP(src[row * k + indxr[col]], + src[row * k + indxc[col]]); + } + } + } + error = 0; +fail: + free(indxc); + free(indxr); + free(ipiv); + free(id_row); + free(temp_row); + return error; +} + +/* + * Invert a Vandermonde matrix. + * + * It assumes that the matrix is not singular and _IS_ a Vandermonde matrix. + * Only uses the second column of the matrix, containing the p_i's. + * + * Algorithm borrowed from "Numerical recipes in C" -- sec.2.8, but largely + * revised for GF purposes. + */ +static void invert_vdm(unsigned char *src, int k) +{ + int i, j, row, col; + unsigned char *b, *c, *p, t, xx; + + if (k == 1) /* degenerate */ + return; + /* + * c holds the coefficient of P(x) = Prod (x - p_i), i=0..k-1 + * b holds the coefficient for the matrix inversion + */ + c = para_malloc(k); + b = para_malloc(k); + p = para_malloc(k); + + for (j = 1, i = 0; i < k; i++, j += k) { + c[i] = 0; + p[i] = src[j]; + } + /* + * construct coeffs recursively. We know c[k] = 1 (implicit) and start + * P_0 = x - p_0, then at each stage multiply by x - p_i generating P_i + * = x P_{i-1} - p_i P_{i-1} After k steps we are done. + */ + c[k - 1] = p[0]; /* really -p(0), but x = -x in GF(2^m) */ + for (i = 1; i < k; i++) { + unsigned char p_i = p[i]; + for (j = k - 1 - (i - 1); j < k - 1; j++) + c[j] ^= gf_mul(p_i, c[j + 1]); + c[k - 1] ^= p_i; + } + + for (row = 0; row < k; row++) { + /* + * synthetic division etc. + */ + xx = p[row]; + t = 1; + b[k - 1] = 1; /* this is in fact c[k] */ + for (i = k - 2; i >= 0; i--) { + b[i] = c[i + 1] ^ gf_mul(xx, b[i + 1]); + t = gf_mul(xx, t) ^ b[i]; + } + for (col = 0; col < k; col++) + src[col * k + row] = gf_mul(inverse[t], b[col]); + } + free(c); + free(b); + free(p); +} + +static int fec_initialized; + +static void init_fec(void) +{ + generate_gf(); + init_mul_table(); + fec_initialized = 1; +} + +/** Internal FEC parameters. */ +struct fec_parms { + /** Number of data slices. */ + int k; + /** Number of slices (including redundant slices). */ + int n; + /** The encoding matrix, computed by init_fec(). */ + unsigned char *enc_matrix; +}; + +/** + * Deallocate a fec params structure. + * + * \param p The structure to free. + */ +void fec_free(struct fec_parms *p) +{ + if (!p) + return; + free(p->enc_matrix); + free(p); +} + +/** + * Create a new encoder and return an opaque descriptor to it. + * + * \param k Number of input slices. + * \param n Number of output slices. + * \param result On success the Fec descriptor is returned here. + * + * \return Standard. + * + * This creates the k*n encoding matrix. It is computed starting with a + * Vandermonde matrix, and then transformed into a systematic matrix. + */ +int fec_new(int k, int n, struct fec_parms **result) +{ + int row, col; + unsigned char *p, *tmp_m; + struct fec_parms *parms; + + if (!fec_initialized) + init_fec(); + + if (k < 1 || k > GF_SIZE + 1 || n > GF_SIZE + 1 || k > n) + return -E_FEC_PARMS; + parms = para_malloc(sizeof(struct fec_parms)); + parms->k = k; + parms->n = n; + parms->enc_matrix = alloc_matrix(n, k); + tmp_m = alloc_matrix(n, k); + /* + * fill the matrix with powers of field elements, starting from 0. + * The first row is special, cannot be computed with exp. table. + */ + tmp_m[0] = 1; + for (col = 1; col < k; col++) + tmp_m[col] = 0; + for (p = tmp_m + k, row = 0; row < n - 1; row++, p += k) { + for (col = 0; col < k; col++) + p[col] = gf_exp[modnn(row * col)]; + } + + /* + * quick code to build systematic matrix: invert the top + * k*k vandermonde matrix, multiply right the bottom n-k rows + * by the inverse, and construct the identity matrix at the top. + */ + invert_vdm(tmp_m, k); /* much faster than invert_mat */ + matmul(tmp_m + k * k, tmp_m, parms->enc_matrix + k * k, n - k, k, k); + /* + * the upper matrix is I so do not bother with a slow multiply + */ + memset(parms->enc_matrix, 0, k * k); + for (p = parms->enc_matrix, col = 0; col < k; col++, p += k + 1) + *p = 1; + free(tmp_m); + *result = parms; + return 0; +} + +/** + * Compute one encoded slice of the given input. + * + * \param parms The fec parameters returned earlier by fec_new(). + * \param src The \a k data slices to encode. + * \param dst Result pointer. + * \param idx The index of the slice to compute. + * \param sz The size of the input data packets. + * + * Encode the \a k slices of size \a sz given by \a src and store the output + * slice number \a idx in \a dst. + */ +void fec_encode(struct fec_parms *parms, const unsigned char * const *src, + unsigned char *dst, int idx, int sz) +{ + int i, k = parms->k; + unsigned char *p; + + assert(idx <= parms->n); + + if (idx < k) { + memcpy(dst, src[idx], sz); + return; + } + p = &(parms->enc_matrix[idx * k]); + memset(dst, 0, sz); + for (i = 0; i < k; i++) + addmul(dst, src[i], p[i], sz); +} + +/* Move src packets in their position. */ +static int shuffle(unsigned char **data, int *idx, int k) +{ + int i; + + for (i = 0; i < k;) { + if (idx[i] >= k || idx[i] == i) + i++; + else { /* put index and data at the right position */ + int c = idx[i]; + + if (idx[c] == c) /* conflict */ + return -E_FEC_BAD_IDX; + FEC_SWAP(idx[i], idx[c]); + FEC_SWAP(data[i], data[c]); + } + } + return 0; +} + +/* + * Construct the decoding matrix given the indices. The encoding matrix must + * already be allocated. + */ +static int build_decode_matrix(struct fec_parms *parms, int *idx, + unsigned char **result) +{ + int ret = -E_FEC_BAD_IDX, i, k = parms->k; + unsigned char *p, *matrix = alloc_matrix(k, k); + + for (i = 0, p = matrix; i < k; i++, p += k) { + if (idx[i] >= parms->n) /* invalid index */ + goto err; + if (idx[i] < k) { + memset(p, 0, k); + p[i] = 1; + } else + memcpy(p, &(parms->enc_matrix[idx[i] * k]), k); + } + ret = invert_mat(matrix, k); + if (ret < 0) + goto err; + *result = matrix; + return 0; +err: + free(matrix); + *result = NULL; + return ret; +} + +/** + * Decode one slice from the group of received slices. + * + * \param parms Pointer to fec params structure. + * \param data Pointers to received packets. + * \param idx Pointer to packet indices (gets modified). + * \param sz Size of each packet. + * + * \return Zero on success, -1 on errors. + * + * The \a data vector of received slices and the indices of slices are used to + * produce the correct output slice. The data slices are modified in-place. + */ +int fec_decode(struct fec_parms *parms, unsigned char **data, int *idx, + int sz) +{ + unsigned char *m_dec, **slice; + int ret, row, col, k = parms->k; + + ret = shuffle(data, idx, k); + if (ret < 0) + return ret; + ret = build_decode_matrix(parms, idx, &m_dec); + if (ret < 0) + return ret; + /* do the actual decoding */ + slice = para_malloc(k * sizeof(unsigned char *)); + for (row = 0; row < k; row++) { + if (idx[row] >= k) { + slice[row] = para_calloc(sz); + for (col = 0; col < k; col++) + addmul(slice[row], data[col], + m_dec[row * k + col], sz); + } + } + /* move slices to their final destination */ + for (row = 0; row < k; row++) { + if (idx[row] >= k) { + memcpy(data[row], slice[row], sz); + free(slice[row]); + } + } + free(slice); + free(m_dec); + return 0; +} diff --git a/fec.h b/fec.h new file mode 100644 index 00000000..241cf9a4 --- /dev/null +++ b/fec.h @@ -0,0 +1,47 @@ +/* + * fec.c -- forward error correction based on Vandermonde matrices + * 980614 + * (C) 1997-98 Luigi Rizzo (luigi@iet.unipi.it) + * + * Portions derived from code by Phil Karn (karn@ka9q.ampr.org), + * Robert Morelos-Zaragoza (robert@spectra.eng.hawaii.edu) and Hari + * Thirumoorthy (harit@spectra.eng.hawaii.edu), Aug 1995 + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials + * provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE AUTHORS ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, + * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A + * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS + * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, + * OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, + * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR + * TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT + * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY + * OF SUCH DAMAGE. + */ + +#define FEC_MAGIC 0xFECC0DEC +#define FEC_HEADER_SIZE 32 + +struct fec_parms; + +int fec_new(int k, int n, struct fec_parms **parms); +void fec_free(struct fec_parms *p); +void fec_encode(struct fec_parms *parms, const unsigned char * const *src, + unsigned char *dst, int idx, int sz); +int fec_decode(struct fec_parms *parms, unsigned char **data, int *idx, + int sz); + + diff --git a/fecdec_filter.c b/fecdec_filter.c new file mode 100644 index 00000000..376fe6b3 --- /dev/null +++ b/fecdec_filter.c @@ -0,0 +1,358 @@ +/* + * Copyright (C) 2009 Andre Noll + * + * Licensed under the GPL v2. For licencing details see COPYING. + */ + +/** \file fecdec_filter.c A filter fec-decodes an audio stream. */ + +#include +#include "para.h" +#include "error.h" +#include "list.h" +#include "sched.h" +#include "ggo.h" +#include "filter.h" +#include "string.h" +#include "portable_io.h" +#include "fec.h" +#include "fd.h" + +/** + * How many FEC groups to store in memory. + * + * Packet reordering requires to keep more than one FEC group in memory because + * slices belonging to the next FEC group may arrive before the current FEC group + * is complete. + */ +#define NUM_FEC_GROUPS 3 + +/** Size of the output buffer of the fecdec filter. */ +#define FECDEC_OUTBUF_SIZE (128 * 1024) + +/** Data read from the header of a slice. */ +struct fec_header { + /** Total number of slices in this group. */ + uint8_t slices_per_group; + /** Number of slices needed to start decoding. */ + uint8_t data_slices_per_group; + /** Size of the ogg vorbis header (zero for mp3, aac). */ + uint32_t audio_header_size; + /** Number of the FEC group this slice belongs to. */ + uint32_t group_num; + /** Size of the data in this FEC group. */ + uint32_t group_bytes; + /** Number of this slice in the group. */ + uint8_t slice_num; + /** Used data bytes of this slice. */ + uint16_t slice_bytes; +}; + +/** + * The status of one partially received FEC group. + */ +struct fecdec_group { + /** The header read from the last slice. */ + struct fec_header h; + /** How many slices received so far. */ + int num_received_slices; + /** The size of the \a idx and the \a data arrays below. */ + int num_slices; + /** Array of indices of the received slices. */ + int *idx; + /** Content of the received slices. */ + unsigned char **data; +}; + +/** + * Data private to the fecdec filter. + */ +struct private_fecdec_data { + /** Used by the fec core code. */ + struct fec_parms *fec; + /** Keeps track of what was received so far. */ + struct fecdec_group groups[NUM_FEC_GROUPS]; +}; + +/** Iterate over all fecdec groups. */ +#define FOR_EACH_FECDEC_GROUP(g, d) for (g = (d)->groups; \ + (g) - (d)->groups < NUM_FEC_GROUPS; (g)++) + +static int group_complete(struct fecdec_group *fg) +{ + return fg->num_received_slices >= fg->h.data_slices_per_group; +} + +static int group_empty(struct fecdec_group *fg) +{ + return fg->num_received_slices == 0; +} + +static void clear_group(struct fecdec_group *fg) +{ + int i; + + for (i = 0; i < fg->num_slices; i++) { + free(fg->data[i]); + fg->data[i] = NULL; + fg->idx[i] = -1; + } + free(fg->data); + free(fg->idx); + fg->num_slices = 0; + memset(&fg->h, 0, sizeof(struct fec_header)); + fg->num_received_slices = 0; +} + +static int find_group(struct fec_header *h, + struct private_fecdec_data *pfd, struct fecdec_group **result) +{ + struct fecdec_group *fg; + + FOR_EACH_FECDEC_GROUP(fg, pfd) { + if (fg->h.group_num != h->group_num) + continue; + if (fg->h.slices_per_group != h->slices_per_group) + continue; + if (fg->h.data_slices_per_group != h->data_slices_per_group) + continue; + *result = fg; + return 1; + } + return 0; +} + +static struct fecdec_group *find_unused_group(struct private_fecdec_data *pfd) +{ + struct fecdec_group *fg; + + FOR_EACH_FECDEC_GROUP(fg, pfd) { + if (fg->num_received_slices == 0) + return fg; + } + return NULL; +} + +static struct fecdec_group *try_to_free_group(struct private_fecdec_data *pfd) +{ + struct fecdec_group *fg; + + FOR_EACH_FECDEC_GROUP(fg, pfd) { + if (!group_complete(fg)) + continue; + clear_group(fg); + return fg; + } + return NULL; +} + +static struct fecdec_group *free_oldest_group(struct private_fecdec_data *pfd) +{ + struct fecdec_group *fg, *oldest = NULL; + + FOR_EACH_FECDEC_GROUP(fg, pfd) { + if (!oldest || oldest->h.group_num > fg->h.group_num) + oldest = fg; + } + if (!group_complete(oldest) && !group_empty(oldest)) + PARA_WARNING_LOG("Clearing incomplete group %d " + "(contains %d slices)\n", fg->h.group_num, + fg->num_received_slices); + clear_group(oldest); + return oldest; +} + +/* returns 1 if the group was found, 0 if not, negative on errors */ +static int get_group(struct fec_header *h, struct private_fecdec_data *pfd, + struct fecdec_group **result) +{ + struct fecdec_group *fg; + int ret = find_group(h, pfd, &fg); + + if (ret < 0) + return ret; + if (ret > 0) /* found group */ + goto success; + /* group not found */ + fg = find_unused_group(pfd); + if (fg) + goto success; + fg = try_to_free_group(pfd); + if (fg) + goto success; + fg = free_oldest_group(pfd); + ret = 0; +success: + fg->h = *h; + *result = fg; + return ret; +} + +/* + * returns 1 if slice was added, zero otherwise (because the group was already + * complete). In any case the number of received slices is being increased by + * one. + */ +static int add_slice(char *buf, struct fecdec_group *fg) +{ + int r, slice_num; + + if (group_complete(fg)) { + PARA_DEBUG_LOG("group complete, ignoring slice %d\n", + fg->h.slice_num); + fg->num_received_slices++; + return 0; + } + slice_num = fg->h.slice_num; + if (fg->num_slices == 0) { + fg->num_slices = fg->h.slices_per_group; + fg->idx = malloc(fg->num_slices * sizeof(int)); + fg->data = malloc(fg->num_slices * sizeof(unsigned char *)); + memset(fg->data, 0, fg->num_slices * sizeof(unsigned char *)); + } + r = fg->num_received_slices; + fg->idx[r] = slice_num; + fg->data[r] = malloc(fg->h.slice_bytes); + memcpy(fg->data[r], buf, fg->h.slice_bytes); + fg->num_received_slices++; + return 1; +} + +static int decode_group(struct fecdec_group *fg, struct filter_node *fn) +{ + int i, ret, sb = fg->h.slice_bytes; + size_t written = 0; + struct private_fecdec_data *pfd = fn->private_data; + + ret = fec_decode(pfd->fec, fg->data, fg->idx, sb); + if (ret < 0) + return ret; + PARA_DEBUG_LOG("writing group %d (%d/%d decoded data bytes)\n", + fg->h.group_num, fg->h.group_bytes, + fg->h.data_slices_per_group * sb); + for (i = 0; i < fg->h.data_slices_per_group; i++) { + size_t n = sb; + if (n + written > fg->h.group_bytes) + n = fg->h.group_bytes - written; + if (fn->loaded + n > fn->bufsize) + return -E_FECDEC_OVERRUN; + memcpy(fn->buf + fn->loaded, fg->data[i], n); + fn->loaded += n; + written += n; + } + return 0; +} + +/** + * Read a fec header from a buffer. + * + * \param buf The buffer to write to. + * \param h The fec header to write. + */ +static int read_fec_header(char *buf, size_t len, struct fec_header *h) +{ + uint32_t magic; + + if (len < FEC_HEADER_SIZE) + return 0; + magic = read_u32(buf); + if (magic != FEC_MAGIC) + return -E_BAD_FEC_HEADER; + h->slices_per_group = read_u8(buf + 4); + h->data_slices_per_group = read_u8(buf + 5); + h->audio_header_size = read_u32(buf + 6); + + h->group_num = read_u32(buf + 10); + h->group_bytes = read_u32(buf + 14); + + h->slice_num = read_u8(buf + 18); + h->slice_bytes = read_u16(buf + 20); + if (!h->group_bytes && & h->slice_bytes) + return -E_FECDEC_EOF; +// PARA_DEBUG_LOG("group %u, slize %u, slices per group: %u\n", +// h->group_num, h->slice_num, h->slices_per_group); + return 1; +} + +/* returns 1 if we used the buffer, 0 if we didn't, negative on errors */ +static int dispatch_slice(char *buf, size_t len, struct fec_header *h, + struct filter_node *fn) +{ + struct fecdec_group *fg; + int ret; + struct private_fecdec_data *pfd = fn->private_data; + + if (h->slice_bytes > len) /* can not use the thing, try to read more */ + return 0; + ret = get_group(h, pfd, &fg); + if (ret < 0) + return ret; + if (!add_slice(buf, fg)) + return 1; + if (group_complete(fg)) { + if (!pfd->fec) { + int k = h->data_slices_per_group, n = h->slices_per_group; + PARA_NOTICE_LOG("init fec (%d, %d)\n", k, n); + ret = fec_new(k, n, &pfd->fec); + if (ret < 0) + return ret; + } + ret = decode_group(fg, fn); + if (ret < 0) + return ret; + } + return 1; +} + +static int fecdec(char *buf, size_t len, struct filter_node *fn) +{ + int ret; + struct fec_header h; + + ret = read_fec_header(buf, len, &h); + if (ret <= 0) + return ret; + if (h.slice_bytes > fn->bufsize) + return -E_BAD_SLICE_SIZE; + if (h.slice_num > h.slices_per_group) + return -E_BAD_SLICE_NUM; + ret = dispatch_slice(buf + FEC_HEADER_SIZE, len - FEC_HEADER_SIZE, + &h, fn); + //PARA_INFO_LOG("ret: %d, len: %d, slice_bytes: %d\n", ret, len, h.slice_bytes); + if (ret <= 0) + return ret; + return FEC_HEADER_SIZE + h.slice_bytes; +} + +static void fecdec_close(struct filter_node *fn) +{ + struct private_fecdec_data *pfd = fn->private_data; + struct fecdec_group *fg; + + FOR_EACH_FECDEC_GROUP(fg, pfd) + clear_group(fg); + free(fn->buf); + fn->buf = NULL; + free(fn->private_data); + fn->private_data = NULL; +} + +static void fecdec_open(struct filter_node *fn) +{ + fn->bufsize = FECDEC_OUTBUF_SIZE; + fn->buf = para_malloc(fn->bufsize); + fn->private_data = para_calloc(sizeof(struct private_fecdec_data)); + fn->loaded = 0; +} + +/** + * The init function of the fecdec filter. + * + * \param f struct to initialize. + */ +void fecdec_filter_init(struct filter *f) +{ + f->convert = fecdec; + f->close = fecdec_close; + f->open = fecdec_open; +} diff --git a/filter_common.c b/filter_common.c index 285efe68..b601c688 100644 --- a/filter_common.c +++ b/filter_common.c @@ -23,9 +23,6 @@ struct filter filters[NUM_SUPPORTED_FILTERS] = {FILTER_ARRAY}; /** * Call the init function of each supported filter. - * - * \param all_filters the array of all supported filters. - * * \sa filter::init */ void filter_init(void) diff --git a/ggo/server.m4 b/ggo/server.m4 index 8aa753b9..f2738d37 100644 --- a/ggo/server.m4 +++ b/ggo/server.m4 @@ -243,16 +243,23 @@ section "udp sender" option "udp_target" - #~~~~~~~~~~~~~~~~~~~~ -"add udp target" -string typestr="a.b.c.d:p" +"add udp target with optional port" +string typestr="host[:port]" optional multiple details=" - Add given host/port to the list of targets. This option - can be given multiple times. Example: '224.0.1.38:1500' - instructs the udp sender to send to udp port 1500 on host - 224.0.1.38 (unassigned ip in the Local Network Control Block - 224.0.0/24). This is useful for multicast streaming. + Add given host/port to the list of targets. The 'host' argument + can be either an IPv4/v6 address or hostname (RFC 3986 syntax). + The 'port' argument is an optional port number. If the 'port' + part is absent, the 'udp_default_port' value is used. + + The following examples are possible targets: + '10.10.1.2:8000' (host:port); '10.10.1.2' (with default port); + '224.0.1.38:1500' (IPv4 multicast); 'localhost:8001' (hostname + with port); '[::1]:8001' (IPv6 localhost); '[badc0de::1]' (IPv6 + host with default port); '[FF00::beef]:1500' (IPv6 multicast). + + This option can be given multiple times, for multiple targets. " option "udp_no_autostart" - diff --git a/gui.c b/gui.c index 0a8f0818..4862df5e 100644 --- a/gui.c +++ b/gui.c @@ -896,7 +896,7 @@ static int open_audiod_pipe(void) * when any key is pressed. * * EXTERNAL_MODE: Check only signal pipe. Used when an external command - * is running. During that thime curses is disabled. Returns when + * is running. During that time curses is disabled. Returns when * cmd_pid == 0. */ static int do_select(int mode) diff --git a/net.c b/net.c index b510dc18..88b3e3cc 100644 --- a/net.c +++ b/net.c @@ -6,6 +6,12 @@ /** \file net.c Networking-related helper functions. */ +/* + * Since glibc 2.8, the _GNU_SOURCE feature test macro must be defined in order + * to obtain the definition of the ucred structure. + */ +#define _GNU_SOURCE + #include /* At least NetBSD needs these. */ @@ -20,6 +26,7 @@ #endif #include +#include #include "para.h" #include "error.h" @@ -85,6 +92,114 @@ void disable_crypt(int fd) crypt_data_array[fd].private_data = NULL; } +/** + * Match string as a candidate IPv4 address. + * + * \param address The string to match. + * \return True if \a address has "dot-quad" format. + */ +static bool is_v4_dot_quad(const char *address) +{ + bool result; + regex_t r; + + assert(!regcomp(&r, "^([0-9]+\\.){3}[0-9]+$", REG_EXTENDED|REG_NOSUB)); + result = regexec(&r, address, 0, NULL, 0) == 0; + regfree(&r); + return result; +} + +/** + * Perform basic syntax checking on the host-part of an URL: + * + * - Since ':' is invalid in IPv4 addresses and DNS names, the + * presence of ':' causes interpretation as IPv6 address; + * - next the first-match-wins algorithm from RFC 3986 is applied; + * - else the string is considered as DNS name, to be resolved later. + * + * \param host The host string to check. + * \return True if \a host passes the syntax checks. + * + * \sa RFC 3986, 3.2.2; RFC 1123, 2.1; RFC 1034, 3.5 + */ +static bool host_string_ok(const char *host) +{ + if (host == NULL || *host == '\0') + return false; + if (strchr(host, ':') != NULL) + return is_valid_ipv6_address(host); + if (is_v4_dot_quad(host)) + return is_valid_ipv4_address(host); + return true; +} + +/** + * Parse and validate URL string. + * + * The URL syntax is loosely based on RFC 3986, supporting one of + * - "["host"]"[:port] for native IPv6 addresses and + * - host[:port] for IPv4 hostnames and DNS names. + * + * Native IPv6 addresses must be enclosed in square brackets, since + * otherwise there is an ambiguity with the port separator `:'. + * The 'port' part is always considered to be a number; if absent, + * it is set to -1, to indicate that a default port is to be used. + * + * The following are valid examples: + * - 10.10.1.1 + * - 10.10.1.2:8000 + * - localhost + * - localhost:8001 + * - [::1]:8000 + * - [badc0de::1] + * + * \param url The URL string to take apart. + * \param host To return the copied host part of \a url. + * \param hostlen The maximum length of \a host. + * \param port To return the port number (if any) of \a url. + * + * \return Pointer to \a host, or NULL if failed. + * If NULL is returned, \a host and \a portnum are undefined. If no + * port number was present in \a url, \a portnum is set to -1. + * + * \sa RFC 3986, 3.2.2/3.2.3 + */ +char *parse_url(const char *url, + char *host, ssize_t hostlen, + int32_t *port) +{ + const char *o = url; + char *c = host, *end = c + (hostlen - 1); + + *port = -1; + + if (o == NULL || hostlen < 1) + goto failed; + + if (*o == '[') { + for (++o; (*c = *o == ']' ? '\0' : *o); c++, o++) + if (c == end) + goto failed; + + if (*o++ != ']' || (*o != '\0' && *o != ':')) + goto failed; + } else { + for (; (*c = *o == ':'? '\0' : *o); c++, o++) + if (c == end) + goto failed; + } + + if (*o == ':') + if (para_atoi32(++o, port) < 0 || + *port < 0 || *port > 0xffff) + goto failed; + + if (host_string_ok(host)) + return host; +failed: + *host = '\0'; + return NULL; +} /** * Determine the socket type for a given layer-4 protocol. diff --git a/net.h b/net.h index 8ec9fa52..11b1708f 100644 --- a/net.h +++ b/net.h @@ -25,6 +25,12 @@ #endif /** \endcond */ + +/** + * Functions to parse and validate (parts of) URLs. + */ +extern char *parse_url(const char *url, + char *host, ssize_t hostlen, int32_t *port); /** * Ensure that string conforms to the IPv4 address format. * @@ -39,6 +45,21 @@ _static_inline_ bool is_valid_ipv4_address(const char *address) return inet_pton(AF_INET, address, &test_it) != 0; } +/** + * Ensure that string conforms to IPv6 address format. + * + * \param address The address string to check. + * + * \return 1 if string has a valid IPv6 address syntax, 0 if not. + * \sa RFC 4291 + */ +_static_inline_ bool is_valid_ipv6_address(const char *address) +{ + struct in6_addr test_it; + + return inet_pton(AF_INET6, address, &test_it) != 0; +} + /** * Generic socket creation (passive and active sockets). */ diff --git a/send.h b/send.h index c1ed00cf..7087c266 100644 --- a/send.h +++ b/send.h @@ -143,3 +143,4 @@ char *generic_sender_help(void); struct sender_client *accept_sender_client(struct sender_status *ss); int send_queued_chunks(int fd, struct chunk_queue *cq, size_t max_bytes_per_write); +int parse_fec_url(const char *arg, struct sender_command_data *scd); diff --git a/send_common.c b/send_common.c index f570273f..448ddfec 100644 --- a/send_common.c +++ b/send_common.c @@ -393,3 +393,76 @@ char *generic_sender_help(void) "example: allow 127.0.0.1 32\n" ); } + +static int parse_fec_parms(const char *arg, struct sender_command_data *scd) +{ + int32_t val; + char *a = para_strdup(arg), *b = a, *e = strchr(b, ':'); + int ret = -E_COMMAND_SYNTAX; + + /* parse max slice bytes */ + if (!e) + goto out; + *e = '\0'; + ret = para_atoi32(b, &val); + if (ret < 0) + goto out; + ret = -ERRNO_TO_PARA_ERROR(EINVAL); + if (val < 0 || val > 65535) + goto out; + scd->max_slice_bytes = val; + /* parse data_slices_per_group */ + b = e + 1; + e = strchr(b, ':'); + ret = -E_COMMAND_SYNTAX; + if (!e) + goto out; + *e = '\0'; + ret = para_atoi32(b, &val); + if (ret < 0) + goto out; + ret = -ERRNO_TO_PARA_ERROR(EINVAL); + if (val < 0 || val > 255) + goto out; + scd->data_slices_per_group = val; + /* parse slices_per_group */ + b = e + 1; + ret = para_atoi32(b, &val); + if (ret < 0) + goto out; + ret = -ERRNO_TO_PARA_ERROR(EINVAL); + if (val < 0 || val < scd->data_slices_per_group) + goto out; + scd->slices_per_group = val; + ret = 0; +out: + free(a); + return ret; +} + +int parse_fec_url(const char *arg, struct sender_command_data *scd) +{ + int ret; + ssize_t len = sizeof(scd->host); + char *a = para_strdup(arg), *p = strchr(a, '/'); + + if (p) { + *p = '\0'; + len = strlen(a); + } + ret = -ERRNO_TO_PARA_ERROR(EINVAL); + if (!parse_url(a, scd->host, len, &scd->port)) + goto out; + if (p) { + ret = parse_fec_parms(p + 1, scd); + goto out; + } + /* use default fec parameters. */ + scd->max_slice_bytes = 1490; + scd->slices_per_group = 16; + scd->data_slices_per_group = 14; + ret = 0; +out: + free(a); + return ret; +} diff --git a/server.h b/server.h index 8d1d6752..cf17c0ef 100644 --- a/server.h +++ b/server.h @@ -6,10 +6,12 @@ /** \file server.h Common server data structures. */ - /** Size of the selector_info and audio_file info strings of struct misc_meta_data. */ #define MMD_INFO_SIZE 16384 +/** The maximum length of the host component in an URL */ +#define MAX_HOSTLEN 256 + /** * Defines one command of para_server. */ @@ -35,13 +37,17 @@ struct sender_command_data{ /** The number of the sender in question. */ int sender_num; /** Used for the allow/deny/add/remove subcommands. */ - struct in_addr addr; - /** Used for the allow/deny/add/remove subcommands. */ - char host[256]; + char host[MAX_HOSTLEN]; /** Used for allow/deny. */ int netmask; /** The port number for add/remove. */ int port; + /** Maximal slice size. */ + uint16_t max_slice_bytes; + /** Number of data slices plus redundant slices. */ + uint8_t slices_per_group; + /** Number of slices minus number of redundant slices. */ + uint8_t data_slices_per_group; }; /** diff --git a/udp_header.h b/udp_header.h deleted file mode 100644 index 7e94b584..00000000 --- a/udp_header.h +++ /dev/null @@ -1,100 +0,0 @@ -/* - * Copyright (C) 2006-2009 Andre Noll - * - * Licensed under the GPL v2. For licencing details see COPYING. - */ - -/** \file udp_header.h some macros used by udp_send.c and udp_recv.c. */ -#include - -/** - * Number of bytes of the paraslash udp header. - * - * The udp sender prepends a header at the beginning of each data chunk. Within - * this header, the type of the current audio stream and the * type of this - * data chunk is coded. - */ -#define UDP_AUDIO_HEADER_LEN 16 - -/** The possible stream types. */ -enum udp_stream_type { - /** Used for mp3 and aac streams. */ - UDP_PLAIN_STREAM, - /** Ogg vorbis streams. */ - UDP_HEADER_STREAM, - /** stream type not yet known. */ - UDP_UNKNOWN_STREAM -}; - -/** The possible packet types. */ -enum udp_audio_packet_type { - /** Beginning of file. */ - UDP_BOF_PACKET, - /** End of file. */ - UDP_EOF_PACKET, - /** Combined header/data packet (ogg only). */ - UDP_HEADER_PACKET, - /** Packet contains only audio file data. */ - UDP_DATA_PACKET, - /** Invalid packet type. */ - UDP_UNKNOWN_PACKET -}; - -/** The contents of an udp audio header. */ -struct udp_audio_header { - /** see \ref udp_stream_type. */ - uint8_t stream_type; - /** see \ref udp_audio_packet_type. */ - uint8_t packet_type; - /** Non-zero only for header packets. */ - uint16_t header_len; - /** Length of header plus audio file data. */ - uint16_t payload_len; -}; - -/** - * Write a struct udp_audio_header to a buffer. - * - * \param buf The buffer to write to. - * \param h The audio header to write. - * - * Used by the udp sender. - * - */ -_static_inline_ void write_udp_audio_header(char *buf, struct udp_audio_header *h) -{ - memcpy(buf, "UDPM", 4); - write_u8(buf + 4, h->stream_type); - write_u8(buf + 5, h->packet_type); - write_u16(buf + 6, h->header_len); - write_u16(buf + 8, h->payload_len); - memset(buf + 10, 0, 6); -} - -/** - * Used by the udp receiver to read a struct udp_audio_header from a buffer. - * - * \param buf The buffer to read from. - * \param len The length of \a buf. - * \param h Result pointer. - * - * \return 1 if \a buf contains a valid udp audio header, -1 else. - */ -_static_inline_ int read_udp_audio_header(char *buf, size_t len, - struct udp_audio_header *h) -{ - if (len < 4) - goto err; - if (memcmp(buf, "UDPM", 4)) - goto err; - h->stream_type = read_u8(buf + 4); - h->packet_type = read_u8(buf + 5); - h->header_len = read_u16(buf + 6); - h->payload_len = read_u16(buf + 8); - return 1; -err: - h->stream_type = UDP_UNKNOWN_STREAM; - h->packet_type = UDP_UNKNOWN_PACKET; - h->header_len = h->payload_len = 0; - return -1; -} diff --git a/udp_recv.c b/udp_recv.c index 9ea35d8d..759caa3d 100644 --- a/udp_recv.c +++ b/udp_recv.c @@ -6,11 +6,11 @@ /** \file udp_recv.c Paraslash's udp receiver */ #include +#include #include "para.h" #include "error.h" #include "portable_io.h" -#include "udp_header.h" #include "list.h" #include "sched.h" #include "ggo.h" @@ -23,30 +23,14 @@ /** The size of the receiver node buffer. */ #define UDP_RECV_CHUNK_SIZE (128 * 1024) - /** * Data specific to the udp receiver. * * \sa \ref receiver, \ref receiver_node. */ struct private_udp_recv_data { - /** - * Whether a header was received. - * - * A flag indicating whether this receiver already received a packet - * which contains the audio file header. - * - * This flag has no effect if the audio stream indicates that no extra - * headers will be sent (mp3, aac). Otherwise, all data packets are - * dropped until the header is received. - */ - int have_header; /** The socket file descriptor. */ int fd; - /** Non-zero on short reads. */ - uint16_t need_more; - /** Copied from the first audio header received. */ - uint16_t stream_type; }; static void udp_recv_pre_select(struct sched *s, struct task *t) @@ -62,53 +46,6 @@ static int enough_space(size_t nbytes, size_t loaded) return nbytes + loaded < UDP_RECV_CHUNK_SIZE; } -/* - * Perform some sanity checks on an udp audio file header. - * - * return: negative on error, 0: discard data, 1: use data - */ -static int examine_audio_header(struct private_udp_recv_data *purd, - struct udp_audio_header *uah, size_t packet_size) -{ - /* payload_len includes header */ - if (uah->payload_len < uah->header_len) - return -E_UDP_BAD_HEADER; - switch (uah->packet_type) { - case UDP_EOF_PACKET: - return -E_RECV_EOF; - case UDP_BOF_PACKET: - purd->have_header = 1; - /* fall through */ - case UDP_DATA_PACKET: - if (uah->header_len) /* header in no-header packet */ - return -E_UDP_BAD_HEADER; - break; - case UDP_HEADER_PACKET: - if (!uah->header_len) /** no header in header packet */ - return -E_UDP_BAD_HEADER; - break; - default: /* bad packet type */ - return -E_UDP_BAD_HEADER; - } - /* check stream type */ - if (uah->stream_type != UDP_PLAIN_STREAM && - uah->stream_type != UDP_HEADER_STREAM) - return -E_UDP_BAD_STREAM_TYPE; - if (purd->stream_type == UDP_UNKNOWN_STREAM) - purd->stream_type = uah->stream_type; - /* stream type must not change */ - if (uah->stream_type != purd->stream_type) - return -E_UDP_BAD_STREAM_TYPE; - if (!purd->have_header && uah->stream_type == UDP_HEADER_STREAM) - /* can't use the data, wait for header packet */ - return 0; - if (packet_size < uah->payload_len + UDP_AUDIO_HEADER_LEN) - /* we read only a part of the package */ - purd->need_more = uah->payload_len - + UDP_AUDIO_HEADER_LEN - packet_size; - return 1; -} - static int add_rn_output(struct receiver_node *rn, char *buf, size_t len) { if (!len) @@ -126,10 +63,7 @@ static void udp_recv_post_select(__a_unused struct sched *s, struct task *t) struct private_udp_recv_data *purd = rn->private_data; int ret; char tmpbuf[UDP_RECV_CHUNK_SIZE]; - uint16_t data_len; - char *data_buf; size_t packet_size; - struct udp_audio_header uah; if (rn->output_error && *rn->output_error < 0) { t->error = *rn->output_error; @@ -148,43 +82,9 @@ static void udp_recv_post_select(__a_unused struct sched *s, struct task *t) if (!ret) return; packet_size = ret; - for (;;) { - uint16_t num; - - if (!purd->need_more) { - ret = read_udp_audio_header(tmpbuf, packet_size, &uah); - if (ret >= 0) - break; - goto success; /* drop data */ - } - num = PARA_MIN(purd->need_more, (uint16_t)packet_size); - assert(num > 0); - t->error = add_rn_output(rn, tmpbuf, num); - if (t->error < 0) - return; - purd->need_more -= num; - if (packet_size <= num) - goto success; - packet_size -= num; - memmove(tmpbuf, tmpbuf + num, packet_size); - } - assert(!purd->need_more); - t->error = examine_audio_header(purd, &uah, packet_size); - if (t->error <= 0) + t->error = add_rn_output(rn, tmpbuf, packet_size); + if (t->error < 0) return; - data_len = uah.payload_len; - data_buf = tmpbuf + UDP_AUDIO_HEADER_LEN; - if (uah.packet_type == UDP_HEADER_PACKET) { - if (purd->have_header) { /* skip header */ - data_buf += uah.header_len; - data_len -= uah.header_len; - } else { /* only use the header */ - purd->have_header = 1; - data_len = uah.header_len; - } - } - t->error = add_rn_output(rn, data_buf, data_len); - return; success: t->error = 1; } @@ -303,7 +203,6 @@ static int udp_recv_open(struct receiver_node *rn) ret = mark_fd_nonblocking(purd->fd); if (ret < 0) goto err; - purd->stream_type = UDP_UNKNOWN_STREAM; PARA_NOTICE_LOG("receiving from %s:%d, fd=%d\n", c->host_arg, c->port_arg, purd->fd); return purd->fd; diff --git a/udp_send.c b/udp_send.c index 140458e7..a21f478d 100644 --- a/udp_send.c +++ b/udp_send.c @@ -9,6 +9,7 @@ #include #include +#include #include "server.cmdline.h" #include "para.h" @@ -21,28 +22,28 @@ #include "list.h" #include "send.h" #include "portable_io.h" -#include "udp_header.h" #include "net.h" #include "fd.h" #include "sched.h" #include "close_on_fork.h" #include "chunk_queue.h" -/** Convert in_addr to ascii. */ -#define TARGET_ADDR(oc) inet_ntoa((oc)->addr) - /** Describes one entry in the list of targets for the udp sender. */ struct udp_target { - /** Address info. */ - struct in_addr addr; /** The position of this target in the list of targets. */ struct list_head node; + /** The hostname (DNS name or IPv4/v6 address string). */ + char host[MAX_HOSTLEN]; /** The UDP port. */ int port; /** The socket fd. */ int fd; /** The list of queued chunks for this fd. */ struct chunk_queue *cq; + /** The opaque structure returned by vss_add_fec_client(). */ + struct fec_client *fc; + /** The FEC parameters for this target. */ + struct fec_client_parms fcp; }; static struct list_head targets; @@ -61,9 +62,10 @@ static void udp_close_target(struct udp_target *ut) static void udp_delete_target(struct udp_target *ut, const char *msg) { - PARA_NOTICE_LOG("deleting %s:%d (%s) from list\n", TARGET_ADDR(ut), + PARA_NOTICE_LOG("deleting %s#%d (%s) from list\n", ut->host, ut->port, msg); udp_close_target(ut); + vss_del_fec_client(ut->fc); list_del(&ut->node); free(ut); } @@ -155,7 +157,7 @@ static int udp_init_session(struct udp_target *ut) if (ut->fd >= 0) /* nothing to do */ return 0; - ret = makesock(AF_UNSPEC, IPPROTO_UDP, 0, TARGET_ADDR(ut), ut->port); + ret = makesock(AF_UNSPEC, IPPROTO_UDP, 0, ut->host, ut->port); if (ret < 0) return ret; ut->fd = ret; @@ -176,121 +178,28 @@ static int udp_init_session(struct udp_target *ut) } add_close_on_fork_list(ut->fd); ut->cq = cq_new(UDP_CQ_BYTES); - PARA_NOTICE_LOG("sending to udp %s:%d\n", TARGET_ADDR(ut), ut->port); + PARA_NOTICE_LOG("sending to udp %s#%d using fec parms %d:%d:%d\n", + ut->host, ut->port , ut->fcp.max_slice_bytes, + ut->fcp.data_slices_per_group, ut->fcp.slices_per_group); return 1; } -static void udp_send_buf(char *buf, size_t len) -{ - struct udp_target *ut, *tmp; - int ret; - - list_for_each_entry_safe(ut, tmp, &targets, node) { - ret = udp_init_session(ut); - if (ret < 0) { - udp_delete_target(ut, para_strerror(-ret)); - continue; - } - ret = send_queued_chunks(ut->fd, ut->cq, 0); - if (ret < 0) { - udp_delete_target(ut, para_strerror(-ret)); - continue; - } - if (!len) - continue; - if (!ret) { /* still data left in the queue */ - ret = cq_enqueue(ut->cq, buf, len); - if (ret < 0) { - udp_delete_target(ut, para_strerror(-ret)); - continue; - } - } - ret = write_nonblock(ut->fd, buf, len, 0); - if (ret < 0) { - udp_delete_target(ut, para_strerror(-ret)); - continue; - } - if (ret != len) { - ret = cq_enqueue(ut->cq, buf + ret, len - ret); - if (ret < 0) { - udp_delete_target(ut, para_strerror(-ret)); - continue; - } - } - } -} - static void udp_shutdown_targets(void) { - char buf[UDP_AUDIO_HEADER_LEN]; struct udp_target *ut, *tmp; - struct udp_audio_header uah = { - .stream_type = UDP_UNKNOWN_STREAM, - .packet_type = UDP_EOF_PACKET, - }; + const char *buf = NULL; + size_t len = 0; /* STFU, gcc */ - write_udp_audio_header(buf, &uah); list_for_each_entry_safe(ut, tmp, &targets, node) { if (ut->fd < 0) continue; - write(ut->fd, buf, UDP_AUDIO_HEADER_LEN); + if (!buf) + len = vss_get_fec_eof_packet(&buf); + write(ut->fd, buf, len); udp_close_target(ut); } } -static int need_extra_header(long unsigned current_chunk) -{ - static struct timeval last_header; - struct timeval diff; - - if (!current_chunk) - return 0; - tv_diff(now, &last_header, &diff); - if (tv2ms(&diff) < conf.udp_header_interval_arg) - return 0; - last_header = *now; - return 1; -} - -static void udp_send(long unsigned current_chunk, __a_unused long unsigned chunks_sent, - const char *buf, size_t len, const char *header_buf, - size_t header_len) -{ - char *sendbuf; - size_t sendbuf_len; - struct timeval *chunk_tv; - struct udp_audio_header uah; - -// PARA_NOTICE_LOG("len: %zd, header_len: %zd\n", len, header_len); - if (sender_status != SENDER_ON) - return; - - /* we might not yet know the chunk time */ - chunk_tv = vss_chunk_time(); - if (!chunk_tv) - return; - if (list_empty(&targets)) - return; - uah.stream_type = header_len? UDP_HEADER_STREAM : UDP_PLAIN_STREAM; - uah.header_len = need_extra_header(current_chunk)? header_len : 0; - if (!current_chunk) - uah.packet_type = UDP_BOF_PACKET; - else if (uah.header_len) - uah.packet_type = UDP_HEADER_PACKET; - else - uah.packet_type = UDP_DATA_PACKET; - uah.payload_len = uah.header_len + len; - sendbuf_len = UDP_AUDIO_HEADER_LEN + uah.payload_len; - sendbuf = para_malloc(sendbuf_len); - write_udp_audio_header(sendbuf, &uah); - if (uah.header_len) - memcpy(sendbuf + UDP_AUDIO_HEADER_LEN, header_buf, - uah.header_len); - memcpy(sendbuf + UDP_AUDIO_HEADER_LEN + uah.header_len, buf, len); - udp_send_buf(sendbuf, sendbuf_len); - free(sendbuf); -} - static int udp_com_on(__a_unused struct sender_command_data *scd) { sender_status = SENDER_ON; @@ -306,33 +215,70 @@ static int udp_com_off(__a_unused struct sender_command_data *scd) static int udp_com_delete(struct sender_command_data *scd) { - char *a = para_strdup(inet_ntoa(scd->addr)); struct udp_target *ut, *tmp; + list_for_each_entry_safe(ut, tmp, &targets, node) { - if (scd->port != ut->port) + /* Unspecified port means wildcard port match */ + if (scd->port > 0 && scd->port != ut->port) continue; - if (strcmp(TARGET_ADDR(ut), a)) + if (strcmp(ut->host, scd->host)) continue; udp_delete_target(ut, "com_delete"); } return 1; } -static void udp_add_target(int port, struct in_addr *addr) +static int udp_send_fec(char *buf, size_t len, void *private_data) +{ + struct udp_target *ut = private_data; + int ret = udp_init_session(ut); + + if (ret < 0) + goto fail; + ret = send_queued_chunks(ut->fd, ut->cq, 0); + if (ret < 0) + goto fail; + if (!len) + return 0; + if (!ret) { /* still data left in the queue */ + ret = cq_enqueue(ut->cq, buf, len); + if (ret < 0) + goto fail; + } + ret = write_nonblock(ut->fd, buf, len, 0); + if (ret < 0) + goto fail; + if (ret != len) { + ret = cq_enqueue(ut->cq, buf + ret, len - ret); + if (ret < 0) + goto fail; + } + return 1; +fail: + udp_delete_target(ut, para_strerror(-ret)); + return ret; +} + +static void udp_add_target(struct sender_command_data *scd) { struct udp_target *ut = para_calloc(sizeof(struct udp_target)); - ut->port = port; - ut->addr = *addr; + + strncpy(ut->host, scd->host, sizeof(ut->host)); + ut->port = scd->port > 0 ? scd->port : conf.udp_default_port_arg; ut->fd = -1; /* not yet connected */ - PARA_INFO_LOG("adding to target list (%s:%d)\n", - TARGET_ADDR(ut), ut->port); + PARA_INFO_LOG("adding to target list (%s#%d)\n", ut->host, ut->port); para_list_add(&ut->node, &targets); + ut->fcp.slices_per_group = scd->slices_per_group; + ut->fcp.data_slices_per_group = scd->data_slices_per_group; + ut->fcp.max_slice_bytes = scd->max_slice_bytes; + ut->fcp.send = udp_send_fec; + ut->fcp.private_data = ut; + vss_add_fec_client(&ut->fcp, &ut->fc); } static int udp_com_add(struct sender_command_data *scd) { - int port = (scd->port > 0)? scd->port : conf.udp_default_port_arg; - udp_add_target(port, &scd->addr); + udp_add_target(scd); return 1; } @@ -342,8 +288,10 @@ static char *udp_info(void) char *ret, *tgts = NULL; list_for_each_entry(ut, &targets, node) { - char *tmp = make_message("%s%s:%d ", tgts? tgts : "", - TARGET_ADDR(ut), ut->port); + bool is_v6 = strchr(ut->host, ':') != NULL; + char *tmp = make_message("%s%s%s%s:%d ", tgts ? : "", + is_v6 ? "[" : "", ut->host, + is_v6 ? "]" : "", ut->port); free(tgts); tgts = tmp; } @@ -362,31 +310,17 @@ static char *udp_info(void) static void udp_init_target_list(void) { + struct sender_command_data scd; int i; INIT_LIST_HEAD(&targets); for (i = 0; i < conf.udp_target_given; i++) { - char *arg = para_strdup(conf.udp_target_arg[i]); - char *p = strchr(arg, ':'); - int port; - struct in_addr addr; - - if (!p) - goto err; - *p = '\0'; - if (!inet_pton(AF_INET, arg, &addr)) - goto err; - port = atoi(++p); - if (port < 0 || port > 65535) - port = conf.udp_default_port_arg; - udp_add_target(port, &addr); - goto success; -err: - PARA_CRIT_LOG("syntax error for udp target option " - "#%d, ignoring\n", i); -success: - free(arg); - continue; + if (parse_fec_url(conf.udp_target_arg[i], &scd) < 0) { + PARA_CRIT_LOG("syntax error for udp target option " + "#%d, ignoring\n", i); + continue; + } + udp_add_target(&scd); } } @@ -394,8 +328,16 @@ static char *udp_help(void) { return make_message( "usage: {on|off}\n" - "usage: {add|delete} IP port\n" - "example: add 224.0.1.38 8000 (multicast streaming)\n" + "usage: {add|delete} host[:port][/packet_size:k:n]\n" + "examples: add 224.0.1.38:1500 (IPv4 multicast)\n" + " add 224.0.1.38:1500/1400:14:16\n" + " (likewise, using 1400 byte packets, with 14\n" + " data slices per 16 slice FEC group)\n" + " add 10.10.1.42 (using default port)\n" + " add [FF05::42]:1500 (IPv6 multicast)\n" + " add [::1] (IPv6 localhost/default port)\n" + " delete myhost.net (host with port wildcard)\n" + " delete [badc0de::1] (IPv6 with port wildcard)\n" ); } @@ -411,7 +353,7 @@ void udp_send_init(struct sender *s) INIT_LIST_HEAD(&targets); s->info = udp_info; s->help = udp_help; - s->send = udp_send; + s->send = NULL; s->pre_select = NULL; s->post_select = NULL; s->shutdown_clients = udp_shutdown_targets; diff --git a/vss.c b/vss.c index d2053493..dc598936 100644 --- a/vss.c +++ b/vss.c @@ -15,14 +15,16 @@ #include "para.h" #include "error.h" +#include "portable_io.h" +#include "fec.h" #include "string.h" #include "afh.h" #include "afs.h" #include "server.h" #include "net.h" #include "server.cmdline.h" -#include "vss.h" #include "list.h" +#include "vss.h" #include "send.h" #include "ipc.h" #include "fd.h" @@ -87,6 +89,317 @@ struct vss_task { size_t header_len; }; +/** + * The list of currently connected fec clients. + * + * Senders may use \ref vss_add_fec_client() to add entries to the list. + */ +static struct list_head fec_client_list; + +/** + * Describes one slice of a FEC group. + * + * FEC slices directly correspond to the data packages sent by the paraslash + * senders that use FEC. Each slice is identified by its group number and its + * number within the group. All slices have the same size, but the last slice + * of the group may not be filled entirely. + */ +struct fec_slice { + /** The slice number within the FEC group. */ + uint8_t num; + /** The number of used bytes in this slice. */ + uint16_t bytes; +}; + +/** + * Data associated with one FEC group. + * + * A FEC group consists of a fixed number of slices and this number is given by + * the \a slices_per_group parameter of struct \ref fec_client_parms. Each FEC + * group contains a number of chunks of the current audio file. + */ +struct fec_group { + /** The number of the FEC group. */ + uint32_t num; + /** Number of bytes in this group. */ + uint32_t bytes; + /** The first chunk of the current audio file belonging to the group. */ + uint32_t first_chunk; + /** The number of chunks contained in this group. */ + uint32_t num_chunks; + /** The time needed to play all chunks of the group. */ + struct timeval duration; + /** When the first chunk was sent. */ + struct timeval start; + /** \a The group duration divided by \a slices_per_group. */ + struct timeval slice_duration; +}; + +/** + * Describes one connected FEC client. + */ +struct fec_client { + /** Parameters requested by the client. */ + struct fec_client_parms *fcp; + /** Used by the core FEC code. */ + struct fec_parms *parms; + /** The position of this client in the fec client list. */ + struct list_head node; + /** When the first slice for this client was sent. */ + struct timeval stream_start; + /** The first chunk sent to this FEC client. */ + int first_stream_chunk; + /** Describes the current group. */ + struct fec_group group; + /** Describes the current slice. */ + struct fec_slice slice; + /** The data to be FEC-encoded (point to a region within the mapped audio file). */ + const unsigned char **src_data; + /** Used for the last source pointer of the last group. */ + unsigned char *extra_src_buf; + /** The size of the buffer for the extra source pointer. */ + size_t extra_src_buf_size; + /** Contains FEC-encoded data. */ + unsigned char *enc_buf; + /** Size of \a enc_buf. */ + size_t enc_buf_size; +}; + +/** + * Get the chunk time of the current audio file. + * + * \return A pointer to a struct containing the chunk time, or NULL, + * if currently no audio file is selected. + */ +struct timeval *vss_chunk_time(void) +{ + if (mmd->afd.afhi.chunk_tv.tv_sec == 0 && + mmd->afd.afhi.chunk_tv.tv_usec == 0) + return NULL; + return &mmd->afd.afhi.chunk_tv; +} + +/** + * Write a fec header to a buffer. + * + * \param buf The buffer to write to. + * \param h The fec header to write. + */ +static void write_fec_header(struct fec_client *fc) +{ + char *buf = (char *)fc->enc_buf; + + write_u32(buf, FEC_MAGIC); + + write_u8(buf + 4, fc->fcp->slices_per_group); + write_u8(buf + 5, fc->fcp->data_slices_per_group); + write_u32(buf + 6, (uint32_t)0); /* audio header len */ + + write_u32(buf + 10, fc->group.num); + write_u32(buf + 14, fc->group.bytes); + + write_u8(buf + 18, fc->slice.num); + write_u16(buf + 20, fc->slice.bytes); + memset(buf + 22, 0, 11); +} + +static int setup_next_fec_group(struct fec_client *fc, struct vss_task *vsst) +{ + uint32_t max_group_size, last_payload_size; + int i, k = fc->fcp->data_slices_per_group; + size_t len; + const char *start_buf; + struct timeval tmp, *chunk_tv = vss_chunk_time(); + + assert(chunk_tv); + if (fc->first_stream_chunk < 0) { + fc->stream_start = *now; + fc->first_stream_chunk = mmd->current_chunk; + fc->group.first_chunk = mmd->current_chunk; + fc->group.num = 0; + } else { + fc->group.first_chunk += fc->group.num_chunks; + fc->group.num++; + } + if (fc->group.first_chunk >= mmd->afd.afhi.chunks_total) + return 0; + max_group_size = (fc->fcp->max_slice_bytes - FEC_HEADER_SIZE) * k; + afh_get_chunk(fc->group.first_chunk, &mmd->afd.afhi, vsst->map, + &start_buf, &len); + for (i = fc->group.first_chunk, fc->group.bytes = 0; + i < mmd->afd.afhi.chunks_total; i++) { + const char *buf; + + afh_get_chunk(i, &mmd->afd.afhi, vsst->map, &buf, &len); + if (fc->group.bytes + len > max_group_size) + break; + fc->group.bytes += len; + } + fc->group.num_chunks = i - fc->group.first_chunk; + fc->slice.num = 0; + fc->slice.bytes = ROUND_UP(fc->group.bytes, k) / k; + + /* The last slice will not be fully used */ + last_payload_size = fc->group.bytes % fc->slice.bytes; + if (!last_payload_size) + last_payload_size = fc->slice.bytes; + + tv_scale(fc->group.first_chunk - fc->first_stream_chunk, chunk_tv, + &tmp); + tv_add(&fc->stream_start, &tmp, &fc->group.start); + if (fc->group.num) /* quick hack to avoid buffer underruns */ + fc->group.start.tv_sec--; + tv_scale(fc->group.num_chunks, chunk_tv, &fc->group.duration); + tv_divide(fc->fcp->slices_per_group, &fc->group.duration, + &fc->group.slice_duration); + + for (i = 0; i < k; i++) + fc->src_data[i] = (const unsigned char *)start_buf + + i * fc->slice.bytes; + + if (start_buf + k * fc->slice.bytes > vsst->map + mmd->size) { + /* can not use last slice as it goes beyond the map */ + if (fc->extra_src_buf_size < fc->slice.bytes) { + fc->extra_src_buf = para_realloc(fc->extra_src_buf, fc->slice.bytes); + fc->extra_src_buf_size = fc->slice.bytes; + } + memcpy(fc->extra_src_buf, start_buf + (k - 1) * fc->slice.bytes, + last_payload_size); + memset(fc->extra_src_buf + last_payload_size, 0, + fc->slice.bytes - last_payload_size); + fc->src_data[k - 1] = fc->extra_src_buf; + } + if (fc->enc_buf_size < fc->slice.bytes + FEC_HEADER_SIZE) { + fc->enc_buf_size = fc->slice.bytes + FEC_HEADER_SIZE; + fc->enc_buf = para_realloc(fc->enc_buf, fc->enc_buf_size); + } + PARA_INFO_LOG("FEC group %d: %d chunks (%d - %d), duration: %lums\n", + fc->group.num, fc->group.num_chunks, fc->group.first_chunk, + fc->group.first_chunk + fc->group.num_chunks - 1, + tv2ms(&fc->group.duration)); + return 1; +} + +static int compute_next_fec_slice(struct fec_client *fc, struct vss_task *vsst) +{ + if (fc->first_stream_chunk < 0 || fc->slice.num + == fc->fcp->slices_per_group) { + if (!setup_next_fec_group(fc, vsst)) + return 0; + } + write_fec_header(fc); + fec_encode(fc->parms, fc->src_data, fc->enc_buf + FEC_HEADER_SIZE, + fc->slice.num, fc->slice.bytes); + return 1; +} + +/** + * Return a buffer that marks the end of the stream. + * + * \param buf Result pointer. + * \return The length of the eof buffer. + * + * This is used for (multicast) udp streaming where closing the socket on the + * sender might not give rise to an eof condition at the peer. + */ +size_t vss_get_fec_eof_packet(const char **buf) +{ + static const char fec_eof_packet[FEC_HEADER_SIZE] = + "\xec\x0d\xcc\xfe\0\0\0\0" + "\0\0\0\0\0\0\0\0" + "\0\0\0\0\0\0\0\0" + "\0\0\0\0\0\0\0\0"; + *buf = fec_eof_packet; + return FEC_HEADER_SIZE; +} + +/** + * Add one entry to the list of active fec clients. + * + * \param fcp Describes the fec parameters to be used for this client. + * \param result An opaque pointer that must be used by remove the client later. + * + * \return Standard. + */ +int vss_add_fec_client(struct fec_client_parms *fcp, struct fec_client **result) +{ + int ret; + struct fec_client *fc; + + if (fcp->max_slice_bytes < FEC_HEADER_SIZE + fcp->data_slices_per_group) + return -ERRNO_TO_PARA_ERROR(EINVAL); + fc = para_calloc(sizeof(*fc)); + fc->fcp = fcp; + ret = fec_new(fcp->data_slices_per_group, fcp->slices_per_group, + &fc->parms); + if (ret < 0) + goto err; + fc->first_stream_chunk = -1; /* stream not yet started */ + fc->src_data = para_malloc(fc->fcp->slices_per_group * sizeof(char *)); + para_list_add(&fc->node, &fec_client_list); + *result = fc; + return 1; +err: + fec_free(fc->parms); + free(fc); + *result = NULL; + return ret; +} + +/** + * Remove one entry from the list of active fec clients. + * + * \param fc The client to be removed. + */ +void vss_del_fec_client(struct fec_client *fc) +{ + list_del(&fc->node); + free(fc->src_data); + free(fc->enc_buf); + free(fc->extra_src_buf); + fec_free(fc->parms); + free(fc); +} + +/* + * Compute if/when next slice is due. If it isn't due yet and \a diff is + * not \p Null, compute the time difference next - now, where + * + * next = stream_start + (first_group_chunk - first_stream_chunk) + * * chunk_time + slice_num * slice_time + */ +static int next_slice_is_due(struct fec_client *fc, struct timeval *diff) +{ + struct timeval tmp, next; + int ret; + + if (fc->first_stream_chunk < 0) + return 1; + tv_scale(fc->slice.num, &fc->group.slice_duration, &tmp); + tv_add(&tmp, &fc->group.start, &next); + ret = tv_diff(&next, now, diff); + return ret < 0? 1 : 0; +} + +static void compute_slice_timeout(struct timeval *timeout) +{ + struct fec_client *fc; + + assert(vss_playing()); + list_for_each_entry(fc, &fec_client_list, node) { + struct timeval diff; + + if (next_slice_is_due(fc, &diff)) { + timeout->tv_sec = 0; + timeout->tv_usec = 0; + return; + } + /* timeout = min(timeout, diff) */ + if (tv_diff(&diff, timeout, NULL) < 0) + *timeout = diff; + } +} + /** * Check if vss status flag \a P (playing) is set. * @@ -184,16 +497,20 @@ static struct timeval *vss_compute_timeout(struct vss_task *vsst) return NULL; compute_chunk_time(mmd->chunks_sent, &mmd->afd.afhi.chunk_tv, &mmd->stream_start, &next_chunk); - if (chk_barrier("chunk", &next_chunk, &the_timeout, 0) < 0) + if (chk_barrier("chunk", &next_chunk, &the_timeout, 0) >= 0) { + /* chunk is due or bof */ + the_timeout.tv_sec = 0; + the_timeout.tv_usec = 0; return &the_timeout; - /* chunk is due or bof */ - the_timeout.tv_sec = 0; - the_timeout.tv_usec = 0; + } + /* compute min of current timeout and next slice time */ + compute_slice_timeout(&the_timeout); return &the_timeout; } static void vss_eof(struct vss_task *vsst) { + mmd->stream_start = *now; if (!vsst->map) return; @@ -230,20 +547,6 @@ const char *supported_audio_formats(void) return SUPPORTED_AUDIO_FORMATS; } -/** - * Get the chunk time of the current audio file. - * - * \return A pointer to a struct containing the chunk time, or NULL, - * if currently no audio file is selected. - */ -struct timeval *vss_chunk_time(void) -{ - if (mmd->afd.afhi.chunk_tv.tv_sec == 0 && - mmd->afd.afhi.chunk_tv.tv_usec == 0) - return NULL; - return &mmd->afd.afhi.chunk_tv; -} - static int need_to_request_new_audio_file(struct vss_task *vsst) { struct timeval diff; @@ -262,6 +565,8 @@ static int need_to_request_new_audio_file(struct vss_task *vsst) return 1; } + + /** * Compute the timeout for para_server's main select-loop. * @@ -285,9 +590,14 @@ static void vss_pre_select(struct sched *s, struct task *t) struct timeval *tv, diff; struct vss_task *vsst = container_of(t, struct vss_task, task); - if (!vsst->map || vss_next() || vss_paused() || vss_repos()) + if (!vsst->map || vss_next() || vss_paused() || vss_repos()) { + struct fec_client *fc, *tmp; for (i = 0; senders[i].name; i++) - senders[i].shutdown_clients(); + if (senders[i].shutdown_clients) + senders[i].shutdown_clients(); + list_for_each_entry_safe(fc, tmp, &fec_client_list, node) + fc->first_stream_chunk = -1; + } if (vss_next()) vss_eof(vsst); else if (vss_paused()) { @@ -407,25 +717,20 @@ err: /** * Main sending function. * - * This function gets called from para_server as soon as the next chunk of data - * should be pushed out. It obtains a pointer to the data to be sent out as - * well as its length from mmd->afd.afhi. This information is then passed to - * each supported sender's send() function which is supposed to send out the data - * to all connected clients. + * This function gets called from vss_post_select(). It checks whether the next + * chunk of data should be pushed out. It obtains a pointer to the data to be + * sent out as well as its length from mmd->afd.afhi. This information is then + * passed to each supported sender's send() function as well as to the send() + * functions of each registered fec client. */ -static void vss_send_chunk(struct vss_task *vsst) +static void vss_send(struct vss_task *vsst) { int i; struct timeval due; - const char *buf; - size_t len; + struct fec_client *fc, *tmp_fc; if (!vsst->map || !vss_playing()) return; - compute_chunk_time(mmd->chunks_sent, &mmd->afd.afhi.chunk_tv, - &mmd->stream_start, &due); - if (tv_diff(&due, now, NULL) > 0) - return; if (chk_barrier("eof", &vsst->eof_barrier, &due, 1) < 0) return; if (chk_barrier("data send", &vsst->data_send_barrier, @@ -435,10 +740,6 @@ static void vss_send_chunk(struct vss_task *vsst) mmd->new_vss_status_flags |= VSS_NEXT; return; } - /* - * We call the send function also in case of empty chunks as they - * might have still some data queued which can be sent in this case. - */ if (!mmd->chunks_sent) { struct timeval tmp; mmd->stream_start = *now; @@ -446,10 +747,37 @@ static void vss_send_chunk(struct vss_task *vsst) mmd->offset = tv2ms(&tmp); mmd->events++; } - afh_get_chunk(mmd->current_chunk, &mmd->afd.afhi, vsst->map, &buf, &len); - for (i = 0; senders[i].name; i++) - senders[i].send(mmd->current_chunk, mmd->chunks_sent, buf, len, - vsst->header_buf, vsst->header_len); + compute_chunk_time(mmd->chunks_sent, &mmd->afd.afhi.chunk_tv, + &mmd->stream_start, &due); + if (tv_diff(&due, now, NULL) <= 0) { + const char *buf; + size_t len; + /* + * We call the send function also in case of empty chunks as + * they might have still some data queued which can be sent in + * this case. + */ + afh_get_chunk(mmd->current_chunk, &mmd->afd.afhi, vsst->map, + &buf, &len); + for (i = 0; senders[i].name; i++) { + if (!senders[i].send) + continue; + senders[i].send(mmd->current_chunk, mmd->chunks_sent, + buf, len, vsst->header_buf, vsst->header_len); + } + } + list_for_each_entry_safe(fc, tmp_fc, &fec_client_list, node) { + if (!next_slice_is_due(fc, NULL)) + continue; + if (!compute_next_fec_slice(fc, vsst)) + continue; + PARA_DEBUG_LOG("sending %d:%d (%zu bytes)\n", fc->group.num, + fc->slice.num, fc->slice.bytes); + fc->fcp->send((char *)fc->enc_buf, + fc->slice.bytes + FEC_HEADER_SIZE, + fc->fcp->private_data); + fc->slice.num++; + } mmd->new_vss_status_flags |= VSS_PLAYING; mmd->chunks_sent++; mmd->current_chunk++; @@ -484,7 +812,7 @@ static void vss_post_select(struct sched *s, struct task *t) if ((vss_playing() && !(mmd->vss_status_flags & VSS_PLAYING)) || (vss_next() && vss_playing())) tv_add(now, &vsst->announce_tv, &vsst->data_send_barrier); - vss_send_chunk(vsst); + vss_send(vsst); } /** @@ -527,5 +855,6 @@ void init_vss_task(int afs_socket) tv_add(&vsst->autoplay_barrier, &vsst->announce_tv, &vsst->data_send_barrier); } + INIT_LIST_HEAD(&fec_client_list); register_task(&vsst->task); } diff --git a/vss.h b/vss.h index 5c917e6d..9bdc449c 100644 --- a/vss.h +++ b/vss.h @@ -23,5 +23,33 @@ const char *supported_audio_formats(void); #define VSS_REPOS 4 /** Currently playing. */ #define VSS_PLAYING 8 -/** A client requested to change the audio file selector. */ -#define VSS_CHANGE 16 + +/** + * Each paraslash sender may register arbitrary many clients to the virtual + * streaming system, possibly with varying fec parameters. In order to do so, + * it must allocate a \a fec_client_parms structure and pass it to \ref + * add_fec_client. + * + * Clients are automatically removed from that list by the vss if an error + * occurs, or if the sender requests deletion of a client by calling \ref + * vss_del_fec_client(). + */ +struct fec_client; + +/** FEC parameters requested by FEC clients. */ +struct fec_client_parms { + /** Number of data slices plus redundant slices. */ + uint8_t slices_per_group; + /** Number of slices minus number of redundant slices. */ + uint8_t data_slices_per_group; + /** Maximal number of bytes per slice. */ + uint16_t max_slice_bytes; + /** Called by vss.c when the next slice should be sent. */ + int (*send)(char *buf, size_t num_bytes, void *private_data); + /** Passed verbatim to \a send(). */ + void *private_data; +}; + +int vss_add_fec_client(struct fec_client_parms *fcp, struct fec_client **result); +void vss_del_fec_client(struct fec_client *fc); +size_t vss_get_fec_eof_packet(const char **buf);