man_dir := man/man1
m4_ggos := afh audioc audiod client filter gui recv server write
-all_ggos := $(m4_ggos) dccp_recv oggdec_filter alsa_write oss_write fade http_recv \
+all_ggos := $(m4_ggos) dccp_recv alsa_write oss_write fade http_recv \
osx_write udp_recv amp_filter compress_filter file_write \
mp3dec_filter prebuffer_filter
ggo_generated := $(addsuffix .ggo, $(addprefix $(ggo_dir)/,$(m4_ggos)))
-NEWS
-====
-
---------------------------------------------
0.4.3 (to be announced) "imaginary radiation"
---------------------------------------------
- Fix an end-of-file detection bug in the oggdec filter.
- New user manual
- The new nonblock API
+ - FEC support for the DCCP transport (Gerrit Renker).
+ - Both options of the oggdec filter have been removed.
------------------------------------------
0.4.2 (2010-04-23) "associative expansion"
#include "afh.h"
#include "afs.h"
#include "net.h"
-#include "vss.h"
#include "fd.h"
#include "ipc.h"
#include "portable_io.h"
* If udp is used to receive this audiod format, add fecdec as
* the first filter.
*/
- if (strcmp(afi[i].receiver->name, "udp") == 0) {
+ if (strcmp(afi[i].receiver->name, "udp") == 0 ||
+ strcmp(afi[i].receiver->name, "dccp") == 0) {
tmp = para_strdup("fecdec");
add_filter(i, tmp);
free(tmp);
#include "para.h"
#include "list.h"
#include "afh.h"
-#include "vss.h"
#include "string.h"
#include "error.h"
#include "afh.h"
#include "afs.h"
#include "server.h"
-#include "vss.h"
#include "list.h"
#include "send.h"
+#include "vss.h"
#include "rc4.h"
#include "net.h"
#include "daemon.h"
audiod_ldflags="$audiod_ldflags $oggvorbis_libs -lvorbis -lvorbisfile"
afh_ldflags="$afh_ldflags $oggvorbis_libs -logg -lvorbis -lvorbisfile"
- filter_cmdline_objs="$filter_cmdline_objs add_cmdline(oggdec_filter)"
- audiod_cmdline_objs="$audiod_cmdline_objs add_cmdline(oggdec_filter)"
-
server_errlist_objs="$server_errlist_objs ogg_afh"
filter_errlist_objs="$filter_errlist_objs oggdec_filter"
audiod_errlist_objs="$audiod_errlist_objs oggdec_filter"
#include "server.h"
#include "net.h"
#include "list.h"
-#include "vss.h"
#include "send.h"
+#include "vss.h"
#include "fd.h"
#include "close_on_fork.h"
#include "chunk_queue.h"
#include "server.cmdline.h"
#include "acl.h"
-/** Do not write more than that many bytes at once. */
-#define DCCP_MAX_BYTES_PER_WRITE 1024
-
static struct sender_status dccp_sender_status, *dss = &dccp_sender_status;
+struct dccp_fec_client {
+ struct fec_client_parms fcp;
+ struct fec_client *fc;
+};
+
static void dccp_pre_select(int *max_fileno, fd_set *rfds,
__a_unused fd_set *wfds)
{
return tx_ccid;
}
+static void dccp_shutdown_client(struct sender_client *sc)
+{
+ struct dccp_fec_client *dfc = sc->private_data;
+
+ vss_del_fec_client(dfc->fc);
+ shutdown_client(sc, dss);
+}
+
+static void dccp_shutdown_clients(void)
+{
+ struct sender_client *sc, *tmp;
+
+ list_for_each_entry_safe(sc, tmp, &dss->client_list, node)
+ dccp_shutdown_client(sc);
+}
+
+/** * Obtain current MPS according to RFC 4340, sec. 14. */
+static int dccp_init_fec(struct sender_client *sc)
+{
+ int mps, ret;
+ socklen_t ml = sizeof(mps);
+
+ /* If call fails, return some sensible minimum value */
+ ret = getsockopt(sc->fd, SOL_DCCP, DCCP_SOCKOPT_GET_CUR_MPS, &mps, &ml);
+ if (ret < 0) {
+ PARA_NOTICE_LOG("can not determine MPS: %s\n", strerror(errno));
+ mps = generic_max_transport_msg_size(sc->fd) - DCCP_MAX_HEADER;
+ }
+ PARA_INFO_LOG("current MPS = %d bytes\n", mps);
+ assert(mps > 0);
+ if (conf.dccp_max_slice_size_arg > 0)
+ mps = PARA_MIN(mps, conf.dccp_max_slice_size_arg);
+ return mps;
+}
+
+static int dccp_send_fec(struct sender_client *sc, char *buf, size_t len)
+{
+ int ret = write_nonblock(sc->fd, buf, len);
+
+ if (ret < 0)
+ dccp_shutdown_client(sc);
+ return ret;
+}
+
static void dccp_post_select(fd_set *rfds, __a_unused fd_set *wfds)
{
struct sender_client *sc;
+ struct dccp_fec_client *dfc;
int tx_ccid;
sc = accept_sender_client(dss, rfds);
* from the client; by shutting down this unused communication path we can
* reduce processing costs a bit. See analogous comment in dccp_recv.c.
*/
- if (shutdown(sc->fd, SHUT_RD) >= 0)
+ if (shutdown(sc->fd, SHUT_RD) < 0) {
+ PARA_WARNING_LOG("%s\n", strerror(errno));
+ shutdown_client(sc, dss);
return;
- PARA_WARNING_LOG("%s\n", strerror(errno));
- shutdown_client(sc, dss);
-}
-
-static void dccp_send(long unsigned current_chunk,
- __a_unused long unsigned chunks_sent, const char *buf,
- size_t len, const char *header_buf, size_t header_len)
-{
- struct sender_client *sc, *tmp;
-
- list_for_each_entry_safe(sc, tmp, &dss->client_list, node)
- send_chunk(sc, dss, DCCP_MAX_BYTES_PER_WRITE, current_chunk, buf,
- len, header_buf, header_len);
-}
-
-static void dccp_shutdown_clients(void)
-{
- shutdown_clients(dss);
+ }
+ dfc = para_calloc(sizeof(*dfc));
+ sc->private_data = dfc;
+ dfc->fcp.data_slices_per_group = conf.dccp_data_slices_per_group_arg;
+ dfc->fcp.slices_per_group = conf.dccp_slices_per_group_arg;
+ dfc->fcp.init_fec = dccp_init_fec;
+ dfc->fcp.send_fec = dccp_send_fec;
+ dfc->fc = vss_add_fec_client(sc, &dfc->fcp);
}
static int dccp_com_on(__a_unused struct sender_command_data *scd)
*/
void dccp_send_init(struct sender *s)
{
- int ret;
+ int ret, k, n;
s->info = dccp_info;
- s->send = dccp_send;
+ s->send = NULL;
s->pre_select = dccp_pre_select;
s->post_select = dccp_post_select;
s->shutdown_clients = dccp_shutdown_clients;
s->client_cmds[SENDER_ADD] = NULL;
s->client_cmds[SENDER_DELETE] = NULL;
+ k = conf.dccp_data_slices_per_group_arg;
+ n = conf.dccp_slices_per_group_arg;
+
+ if (k <= 0 || n <= 0 || k >= n) {
+ PARA_WARNING_LOG("invalid FEC parameters, using defaults\n");
+ conf.dccp_data_slices_per_group_arg = 3;
+ conf.dccp_slices_per_group_arg = 4;
+ }
+
init_sender_status(dss, conf.dccp_access_arg, conf.dccp_access_given,
conf.dccp_port_arg, conf.dccp_max_clients_arg,
conf.dccp_default_deny_given);
* \param fd The file descriptor.
* \param buf the buffer to write.
* \param len the number of bytes of \a buf.
- * \param max_bytes_per_write Do not write more than that many bytes at once.
- *
- * If \a max_bytes_per_write is non-zero, do not send more than that many bytes
- * per write().
*
* EAGAIN is not considered an error condition. For example CCID3 has a
* sending wait queue which fills up and is emptied asynchronously. The EAGAIN
*
* \return Negative on errors, number of bytes written else.
*/
-int write_nonblock(int fd, const char *buf, size_t len,
- size_t max_bytes_per_write)
+int write_nonblock(int fd, const char *buf, size_t len)
{
size_t written = 0;
int ret = 0;
while (written < len) {
size_t num = len - written;
- if (max_bytes_per_write && max_bytes_per_write < num)
- num = max_bytes_per_write;
ret = write(fd, buf + written, num);
if (ret < 0 && errno == EAGAIN)
return written;
size_t *num_bytes);
int read_nonblock(int fd, void *buf, size_t sz, fd_set *rfds, size_t *num_bytes);
int read_pattern(int fd, const char *pattern, size_t bufsize, fd_set *rfds);
-int write_nonblock(int fd, const char *buf, size_t len,
- size_t max_bytes_per_write);
+int write_nonblock(int fd, const char *buf, size_t len);
int for_each_file_in_dir(const char *dirname,
int (*func)(const char *, void *), void *private_data);
+++ /dev/null
-option "bufsize" b
-#~~~~~~~~~~~~~~~~~
-"size of output buffer"
-int typestr="kilobyte"
-default="128"
-optional
-details="
- Increase this if you encounter output buffer overrun errors. Smaller
- values make the oggdec filter use less memory.
-"
-
-option "initial_buffer" i
-#~~~~~~~~~~~~~~~~~~~~~~~~
-"size of initial input buffer"
-int typestr="kilobyte"
-default="16"
-optional
-details="
- On startup, defer decoding until that many kilobytes are
- available in the input buffer.
-"
See http_max_clients for details.
"
+option "dccp_max_slice_size" -
+#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+"Upper bound for the FEC slice size"
+int typestr = "size"
+optional
+default = "0"
+details = "
+ If this value is non-positive (the default) the dccp sender
+ uses the maximum packet size (MPS) of the connection as the
+ slice size. The MPS is a network parameter and depends on
+ the path maximum transmission unit (path MTU) of an incoming
+ connection, i.e. on the largest packet size that can be
+ transmitted without causing fragmentation.
+
+ This option allows to use a value less than the MPS in order
+ to fine-tune application performance. Values greater than
+ the MPS of an incoming connection can not be set.
+"
+
+option "dccp_data_slices_per_group" -
+#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+"The number of non-redundant slices per FEC group"
+int typestr = "num"
+optional
+default = "3"
+details = "
+ This determines the number of slices in each FEC group that are
+ necessary to decode the group. The given number must be smaller
+ than the value of the dccp_slices_per_group option below.
+
+ Note that the duration of a FEC group is proportional to the
+ product dccp_max_slice_size * dccp_data_slices_per_group.
+"
+
+option "dccp_slices_per_group" -
+#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+"The total number of slices per FEC group"
+int typestr = "num"
+optional
+default = "4"
+details = "
+ This value must be larger than the value given for above
+ dccp_data_slices_per_group above. The difference being the
+ number of redundant slices per group, i.e. the number of
+ data packets that may be lost without causing interruptions
+ of the resulting audio stream.
+
+ Increase this value if for lossy networks.
+"
+
####################
section "udp sender"
####################
if (gc->mode == GM_SLOPPY)
return len;
}
- ret = write_nonblock(gc->fd, buf, len, 0);
+ ret = write_nonblock(gc->fd, buf, len);
if (ret < 0)
goto err;
if (ret > 0)
#include "afs.h"
#include "server.h"
#include "http.h"
-#include "vss.h"
#include "list.h"
#include "send.h"
+#include "vss.h"
#include "close_on_fork.h"
#include "net.h"
#include "fd.h"
shutdown_clients(hss);
}
+static int queue_chunk_or_shutdown(struct sender_client *sc,
+ struct sender_status *ss, const char *buf, size_t num_bytes)
+{
+ int ret = cq_enqueue(sc->cq, buf, num_bytes);
+ if (ret < 0)
+ shutdown_client(sc, ss);
+ return ret;
+}
+
+/**
+ * Send one chunk of audio data to a connected client.
+ *
+ * \param sc The client.
+ * \param ss The sender.
+ * \param current_chunk The number of the chunk to write.
+ * \param buf The data to write.
+ * \param len The number of bytes of \a buf.
+ * \param header_buf The audio file header.
+ * \param header_len The number of bytes of \a header_buf.
+ *
+ * On errors, the client is shut down. If only a part of the buffer could be
+ * written, the remainder is put into the chunk queue for that client.
+ */
+static void http_send_chunk(struct sender_client *sc, struct sender_status *ss,
+ long unsigned current_chunk, const char *buf, size_t len,
+ const char *header_buf, size_t header_len)
+{
+ int ret;
+
+ if (!sc->header_sent && current_chunk) {
+ if (header_buf && header_len > 0) {
+ ret = queue_chunk_or_shutdown(sc, ss, header_buf, header_len);
+ if (ret < 0)
+ goto out;
+ }
+ }
+ sc->header_sent = 1;
+ ret = send_queued_chunks(sc->fd, sc->cq);
+ if (ret < 0) {
+ shutdown_client(sc, ss);
+ goto out;
+ }
+ if (!len)
+ goto out;
+ if (!ret) { /* still data left in the queue */
+ ret = queue_chunk_or_shutdown(sc, ss, buf, len);
+ goto out;
+ }
+ ret = write_nonblock(sc->fd, buf, len);
+ if (ret < 0) {
+ shutdown_client(sc, ss);
+ goto out;
+ }
+ if (ret != len)
+ ret = queue_chunk_or_shutdown(sc, ss, buf + ret, len - ret);
+out:
+ if (ret < 0)
+ PARA_NOTICE_LOG("%s\n", para_strerror(-ret));
+}
+
static void http_send(long unsigned current_chunk,
__a_unused long unsigned chunks_sent, const char *buf, size_t len,
const char *header_buf, size_t header_len)
list_for_each_entry_safe(sc, tmp, &hss->client_list, node) {
struct private_http_sender_data *phsd = sc->private_data;
- if (phsd->status != HTTP_STREAMING)
- continue;
- send_chunk(sc, hss, 0, current_chunk, buf, len, header_buf,
- header_len);
+
+ if (phsd->status == HTTP_STREAMING)
+ http_send_chunk(sc, hss, current_chunk, buf, len,
+ header_buf, header_len);
}
}
: sizeof(struct sockaddr_in);
}
+/** True if @ss holds a v6-mapped-v4 address (RFC 4291, 2.5.5.2) */
+static bool SS_IS_ADDR_V4MAPPED(const struct sockaddr_storage *ss)
+{
+ const struct sockaddr_in6 *ia6 = (const struct sockaddr_in6 *)ss;
+
+ return ss->ss_family == AF_INET6 && IN6_IS_ADDR_V4MAPPED(&ia6->sin6_addr);
+}
+
/**
* Process IPv4/v6 address, turn v6-mapped-v4 address into normal IPv4 address.
* \param ss Container of IPv4/6 address.
static const struct sockaddr *
normalize_ip_address(const struct sockaddr_storage *ss)
{
- const struct sockaddr_in6 *ia6 = (const struct sockaddr_in6 *)ss;
-
assert(ss->ss_family == AF_INET || ss->ss_family == AF_INET6);
- if (ss->ss_family == AF_INET6 && IN6_IS_ADDR_V4MAPPED(&ia6->sin6_addr)) {
+ if (SS_IS_ADDR_V4MAPPED(ss)) {
+ const struct sockaddr_in6 *ia6 = (const struct sockaddr_in6 *)ss;
static struct sockaddr_in ia;
ia.sin_family = AF_INET;
return (const struct sockaddr *)ss;
}
+/**
+ * Generic/fallback MTU values
+ *
+ * These are taken from RFC 1122, RFC 2460, and RFC 5405.
+ * - RFC 1122, 3.3.3 defines EMTU_S ("Effective MTU for sending") and recommends
+ * to use an EMTU_S size of of 576 bytes if the IPv4 path MTU is unknown;
+ * - RFC 2460, 5. requires a minimum IPv6 MTU of 1280 bytes;
+ * - RFC 5405, 3.2 recommends that if path MTU discovery is not done,
+ * UDP senders should use the respective minimum values of EMTU_S.
+ */
+static inline int generic_mtu(const int af_type)
+{
+ return af_type == AF_INET6 ? 1280 : 576;
+}
+
+/** Crude approximation of IP header overhead - neglecting options. */
+static inline int estimated_header_overhead(const int af_type)
+{
+ return af_type == AF_INET6 ? 40 : 20;
+}
+
+/**
+ * Maximum transport-layer message size (MMS_S) as per RFC 1122, 3.3.3
+ * Socket must be connected.
+ */
+int generic_max_transport_msg_size(int sockfd)
+{
+ struct sockaddr_storage ss;
+ socklen_t sslen = sizeof(ss);
+ int af_type = AF_INET;
+
+ if (getpeername(sockfd, (struct sockaddr *)&ss, &sslen) < 0) {
+ PARA_ERROR_LOG("can not determine remote address type: %s\n",
+ strerror(errno));
+ } else if (!SS_IS_ADDR_V4MAPPED(&ss)) {
+ af_type = ss.ss_family;
+ }
+ return generic_mtu(af_type) - estimated_header_overhead(af_type);
+}
+
/**
* Print numeric host and port number (beware - uses static char).
*
#define SOL_DCCP 269 /**< Linux socket level. */
#endif
+#ifndef DCCP_SOCKOPT_GET_CUR_MPS
+#define DCCP_SOCKOPT_GET_CUR_MPS 5 /**< Max packet size, RFC 4340, 14. */
+#endif
+
#ifndef DCCP_SOCKOPT_AVAILABLE_CCIDS
#define DCCP_SOCKOPT_AVAILABLE_CCIDS 12 /**< List of supported CCIDs. */
#endif
extern char *local_name(int sockfd);
extern char *remote_name(int sockfd);
+/**
+ * Determining maximum payload (packet) size
+ */
+extern int generic_max_transport_msg_size(int sockfd);
+
int send_bin_buffer(int, const char *, size_t);
int send_buffer(int, const char *);
__printf_2_3 int send_va_buffer(int fd, const char *fmt, ...);
/**
* Functions and definitions to support \p IPPROTO_DCCP
*/
-/** Hardcoded maximum number of separate CCID modules compiled into a host */
+/** Estimated worst-case length of a DCCP header including options. */
+#define DCCP_MAX_HEADER 128
+/** Hardcoded maximum number of separate CCID modules compiled into a host. */
#define DCCP_MAX_HOST_CCIDS 20
extern int dccp_available_ccids(uint8_t **ccid_array);
#include <stdbool.h>
#include "para.h"
-#include "oggdec_filter.cmdline.h"
#include "list.h"
#include "sched.h"
#include "ggo.h"
}
}
-static int oggdec_parse_config(int argc, char **argv, void **config)
-{
- int ret;
- struct oggdec_filter_args_info *ogg_conf;
-
- ogg_conf = para_calloc(sizeof(*ogg_conf));
- ret = -E_OGGDEC_SYNTAX;
- if (oggdec_cmdline_parser(argc, argv, ogg_conf))
- goto err;
- ret = -ERRNO_TO_PARA_ERROR(EINVAL);
- if (ogg_conf->bufsize_arg < 0)
- goto err;
- if (ogg_conf->bufsize_arg >= INT_MAX / 1024)
- goto err;
- if (ogg_conf->initial_buffer_arg < 0)
- goto err;
- if (ogg_conf->initial_buffer_arg >= INT_MAX / 1024)
- goto err;
- *config = ogg_conf;
- return 1;
-err:
- free(ogg_conf);
- return ret;
-}
-
-static void oggdec_free_config(void *conf)
-{
- oggdec_cmdline_parser_free(conf);
-}
-
/**
* The init function of the ogg vorbis decoder.
*
*/
void oggdec_filter_init(struct filter *f)
{
- struct oggdec_filter_args_info dummy;
-
- oggdec_cmdline_parser_init(&dummy);
f->open = ogg_open;
f->close = ogg_close;
f->pre_select = generic_filter_pre_select;
f->post_select = ogg_post_select;
- f->parse_config = oggdec_parse_config;
- f->free_config = oggdec_free_config;
f->execute = oggdec_execute;
- f->help = (struct ggo_help) {
- .short_help = oggdec_filter_args_info_help,
- .detailed_help = oggdec_filter_args_info_detailed_help
- };
}
goto out;
return;
}
+ btr_merge(btrn, wn->min_iqs);
bytes = btr_next_buffer(btrn, &data);
frames = bytes / powd->bytes_per_frame;
if (!frames) { /* eof and less than a single frame available */
ret = 0;
if (!FD_ISSET(powd->fd, &s->wfds))
goto out;
- ret = write_nonblock(powd->fd, data, frames * powd->bytes_per_frame, 0);
+ ret = write_nonblock(powd->fd, data, frames * powd->bytes_per_frame);
if (ret < 0)
goto out;
btr_consume(btrn, ret);
* only differ if the stream was repositioned by the \a ff or \a jmp
* command. Of course, \a buf is a pointer to the chunk of data which
* should be sent, and \a len is the length of this buffer.
- */
+ */
void (*send)(long unsigned current_chunk, long unsigned chunks_sent,
const char *buf, size_t len, const char *header_buf,
size_t header_len);
void *private_data;
};
+/**
+ * Each paraslash sender may register arbitrary many clients to the virtual
+ * streaming system, possibly with varying fec parameters. In order to do so,
+ * it must allocate a \a fec_client_parms structure and pass it to \ref
+ * vss_add_fec_client.
+ *
+ * Clients are automatically removed from that list by the vss if an error
+ * occurs, or if the sender requests deletion of a client by calling \ref
+ * vss_del_fec_client().
+ */
+
+/** FEC parameters requested by FEC clients. */
+struct fec_client_parms {
+ /** Number of data slices plus redundant slices. */
+ uint8_t slices_per_group;
+ /** Number of slices minus number of redundant slices. */
+ uint8_t data_slices_per_group;
+ /** Maximal number of bytes per slice, initially zero. */
+ uint16_t max_slice_bytes;
+ /**
+ * Transport-layer initialisation for FEC support.
+ *
+ * This optional function serves (a) to make the transport layer
+ * ready to send FEC slices and (b) to determine the Maximum
+ * Packet Size (MPS) supported by the connection. The MPS value
+ * determines the largest payload size. This value is used to
+ * send FEC slices that are not larger than the path MTU, to avoid
+ * fragmentation and to maximize packet utilization. The user can
+ * alternatively specify a slice size of up to this value.
+ */
+ int (*init_fec)(struct sender_client *sc);
+ /** Push out FEC-encoded packets */
+ int (*send_fec)(struct sender_client *sc, char *buf, size_t len);
+};
+
/** Describes the current status of one paraslash sender. */
struct sender_status {
/** The file descriptor of the socket this sender is listening on. */
void shutdown_client(struct sender_client *sc, struct sender_status *ss);
void shutdown_clients(struct sender_status *ss);
-void send_chunk(struct sender_client *sc, struct sender_status *ss,
- size_t max_bytes_per_write, long unsigned current_chunk,
- const char *buf, size_t len, const char *header_buf,
- size_t header_len);
void init_sender_status(struct sender_status *ss, char **access_arg, int num_access_args,
int port, int max_clients, int default_deny);
char *get_sender_info(struct sender_status *ss, const char *name);
void generic_com_off(struct sender_status *ss);
char *generic_sender_help(void);
struct sender_client *accept_sender_client(struct sender_status *ss, fd_set *rfds);
-int send_queued_chunks(int fd, struct chunk_queue *cq,
- size_t max_bytes_per_write);
+int send_queued_chunks(int fd, struct chunk_queue *cq);
int parse_fec_url(const char *arg, struct sender_command_data *scd);
shutdown_client(sc, ss);
}
-static int queue_chunk_or_shutdown(struct sender_client *sc,
- struct sender_status *ss, const char *buf, size_t num_bytes)
-{
- int ret = cq_enqueue(sc->cq, buf, num_bytes);
- if (ret < 0)
- shutdown_client(sc, ss);
- return ret;
-}
-
/**
* Try to empty the chunk queue for this fd.
*
* \param fd The file descriptor.
* \param cq The list of queued chunks.
- * \param max_bytes_per_write Do not send more than this in one go.
*
* \return Negative on errors, zero if not everything was sent, one otherwise.
*/
-int send_queued_chunks(int fd, struct chunk_queue *cq,
- size_t max_bytes_per_write)
+int send_queued_chunks(int fd, struct chunk_queue *cq)
{
struct queued_chunk *qc;
while ((qc = cq_peek(cq))) {
const char *buf;
size_t len;
int ret;
+
cq_get(qc, &buf, &len);
- ret = write_nonblock(fd, buf, len, max_bytes_per_write);
+ ret = write_nonblock(fd, buf, len);
if (ret < 0)
return ret;
cq_update(cq, ret);
return 1;
}
-/**
- * Send one chunk of audio data to a connected client.
- *
- * \param sc The client.
- * \param ss The sender.
- * \param max_bytes_per_write Split writes to chunks of at most that many bytes.
- * \param current_chunk The number of the chunk to write.
- * \param buf The data to write.
- * \param len The number of bytes of \a buf.
- * \param header_buf The audio file header.
- * \param header_len The number of bytes of \a header_buf.
- *
- * On errors, the client is shut down. If only a part of the buffer could be
- * written, the remainder is put into the chunk queue for that client.
- */
-void send_chunk(struct sender_client *sc, struct sender_status *ss,
- size_t max_bytes_per_write, long unsigned current_chunk,
- const char *buf, size_t len, const char *header_buf,
- size_t header_len)
-{
- int ret;
-
- if (!sc->header_sent && current_chunk) {
- if (header_buf && header_len > 0) {
- ret = queue_chunk_or_shutdown(sc, ss, header_buf, header_len);
- if (ret < 0)
- goto out;
- }
- }
- sc->header_sent = 1;
- ret = send_queued_chunks(sc->fd, sc->cq, max_bytes_per_write);
- if (ret < 0) {
- shutdown_client(sc, ss);
- goto out;
- }
- if (!len)
- goto out;
- if (!ret) { /* still data left in the queue */
- ret = queue_chunk_or_shutdown(sc, ss, buf, len);
- goto out;
- }
- ret = write_nonblock(sc->fd, buf, len, max_bytes_per_write);
- if (ret < 0) {
- shutdown_client(sc, ss);
- goto out;
- }
- if (ret != len)
- ret = queue_chunk_or_shutdown(sc, ss, buf + ret, len - ret);
-out:
- if (ret < 0)
- PARA_NOTICE_LOG("%s\n", para_strerror(-ret));
-}
-
/**
* Initialize a struct sender status.
*
goto out;
}
/* use default fec parameters. */
- scd->max_slice_bytes = 1472;
+ scd->max_slice_bytes = 0;
scd->slices_per_group = 16;
scd->data_slices_per_group = 14;
ret = 0;
#include "string.h"
#include "afs.h"
#include "server.h"
+#include "list.h"
+#include "send.h"
#include "vss.h"
#include "config.h"
#include "close_on_fork.h"
-#include "list.h"
-#include "send.h"
#include "net.h"
#include "daemon.h"
#include "ipc.h"
sz = btr_next_buffer(btrn, &buf);
if (sz == 0)
break;
- ret = write_nonblock(STDOUT_FILENO, buf, sz, 0);
+ ret = write_nonblock(STDOUT_FILENO, buf, sz);
if (ret <= 0)
break;
btr_consume(btrn, ret);
#include <sys/time.h>
#include <dirent.h>
#include <sys/socket.h>
+#include <netinet/udp.h>
#include <net/if.h>
#include <osl.h>
#include "afh.h"
#include "afs.h"
#include "server.h"
-#include "vss.h"
#include "list.h"
#include "send.h"
+#include "vss.h"
#include "portable_io.h"
#include "net.h"
#include "fd.h"
#include "close_on_fork.h"
#include "chunk_queue.h"
+/**
+ * Time window during which ICMP Destination/Port Unreachable messages are
+ * ignored, covering transient receiver problems such as restarting the
+ * client, rebooting, reconfiguration, or handover.
+ */
+#define UDP_MAX_UNREACHABLE_TIME 30
+
/** Describes one entry in the list of targets for the udp sender. */
struct udp_target {
- /** The position of this target in the list of targets. */
- struct list_head node;
/** The hostname (DNS name or IPv4/v6 address string). */
char host[MAX_HOSTLEN];
/** The UDP port. */
int port;
- /** The socket fd. */
- int fd;
- /** The list of queued chunks for this fd. */
- struct chunk_queue *cq;
+ /** Track time (seconds) of last ICMP Port Unreachable error */
+ time_t last_unreachable;
+ /** Common sender client data */
+ struct sender_client *sc;
/** The opaque structure returned by vss_add_fec_client(). */
struct fec_client *fc;
/** The FEC parameters for this target. */
static struct list_head targets;
static int sender_status;
-static void udp_close_target(struct udp_target *ut)
+static void udp_close_target(struct sender_client *sc)
{
- if (ut->fd < 0)
- return;
- close(ut->fd);
- del_close_on_fork_list(ut->fd);
- cq_destroy(ut->cq);
- ut->cq = NULL;
- ut->fd = -1;
+ if (sc->cq != NULL) {
+ del_close_on_fork_list(sc->fd);
+ cq_destroy(sc->cq);
+ free(sc->name);
+ sc->name = NULL;
+ sc->cq = NULL;
+
+ }
}
static void udp_delete_target(struct udp_target *ut, const char *msg)
{
+
PARA_NOTICE_LOG("deleting %s#%d (%s) from list\n", ut->host,
ut->port, msg);
- udp_close_target(ut);
+ udp_close_target(ut->sc);
vss_del_fec_client(ut->fc);
- list_del(&ut->node);
+ list_del(&ut->sc->node);
+ free(ut->sc);
free(ut);
}
/**
* Perform AF-independent multicast sender setup.
*
- * \param fd The connected socket descriptor.
- * \param ttl UDPv4 multicast TTL or UDPv6 multicast number of hops.
- * Use -1 to mean default, 0..255 otherwise.
- * \param iface The outgoing multicast interface, or NULL for the default.
+ * \param sc The connected sender client.
*
* \return Zero if okay, negative on error.
*/
-static int mcast_sender_setup(struct udp_target *ut, int ttl, char *iface)
+static int mcast_sender_setup(struct sender_client *sc)
{
struct sockaddr_storage ss;
socklen_t sslen = sizeof(ss);
-
+ int ttl = conf.udp_ttl_arg, id = 0;
const int on = 1;
- int id = iface == NULL ? 0 : if_nametoindex(iface);
- if (getpeername(ut->fd, (struct sockaddr *)&ss, &sslen) < 0)
- goto err;
+ if (conf.udp_mcast_iface_given) {
+ char *iface = conf.udp_mcast_iface_arg;
+
+ id = if_nametoindex(iface);
+ if (id == 0)
+ PARA_WARNING_LOG("could not resolve interface '%s', "
+ "using default", iface);
+ }
- if (iface != NULL && id == 0)
- PARA_WARNING_LOG("could not resolve interface %s, using default", iface);
+ if (getpeername(sc->fd, (struct sockaddr *)&ss, &sslen) < 0)
+ goto err;
/* RFC 3493, 5.2: -1 means 'use kernel default' */
if (ttl < 0 || ttl > 255)
memset(&mn, 0, sizeof(mn));
mn.imr_ifindex = id;
- if (setsockopt(ut->fd, IPPROTO_IP, IP_MULTICAST_IF, &mn, sizeof(mn)) < 0)
+ if (setsockopt(sc->fd, IPPROTO_IP, IP_MULTICAST_IF, &mn, sizeof(mn)) < 0)
goto err;
#else
PARA_ERROR_LOG("No support for setting outgoing IPv4 mcast interface.");
* Enable receiving multicast messages generated on the local host
* At least on Linux, this is enabled by default.
*/
- if (setsockopt(ut->fd, IPPROTO_IP, IP_MULTICAST_LOOP, &on, sizeof(on)) < 0)
+ if (setsockopt(sc->fd, IPPROTO_IP, IP_MULTICAST_LOOP, &on, sizeof(on)) < 0)
break;
/* Default: use local subnet (do not flood out into the WAN) */
if (ttl == -1)
ttl = 1;
- if (setsockopt(ut->fd, IPPROTO_IP, IP_MULTICAST_TTL, &ttl, sizeof(ttl)) < 0)
+ if (setsockopt(sc->fd, IPPROTO_IP, IP_MULTICAST_TTL, &ttl, sizeof(ttl)) < 0)
break;
return 0;
case AF_INET6:
if (!IN6_IS_ADDR_MULTICAST(&((struct sockaddr_in6 *)&ss)->sin6_addr))
return 0;
if (id != 0 &&
- setsockopt(ut->fd, IPPROTO_IPV6, IPV6_MULTICAST_IF, &id, sizeof(id)) < 0)
+ setsockopt(sc->fd, IPPROTO_IPV6, IPV6_MULTICAST_IF, &id, sizeof(id)) < 0)
break;
- if (setsockopt(ut->fd, IPPROTO_IPV6, IPV6_MULTICAST_LOOP, &on, sizeof(on)) < 0)
+ if (setsockopt(sc->fd, IPPROTO_IPV6, IPV6_MULTICAST_LOOP, &on, sizeof(on)) < 0)
break;
- if (setsockopt(ut->fd, IPPROTO_IPV6, IPV6_MULTICAST_HOPS, &ttl, sizeof(ttl)) < 0)
+ if (setsockopt(sc->fd, IPPROTO_IPV6, IPV6_MULTICAST_HOPS, &ttl, sizeof(ttl)) < 0)
break;
return 0;
default:
/** The maximal size of the per-target chunk queue. */
#define UDP_CQ_BYTES 40000
-static int udp_init_session(struct udp_target *ut)
+static void udp_init_session(struct sender_client *sc)
{
- int ret;
- char *iface = NULL;
-
- if (ut->fd >= 0) /* nothing to do */
- return 0;
-
- ret = para_connect_simple(IPPROTO_UDP, ut->host, ut->port);
- if (ret < 0)
- return ret;
- ut->fd = ret;
-
- if (conf.udp_mcast_iface_given)
- iface = conf.udp_mcast_iface_arg;
-
- ret = mcast_sender_setup(ut, conf.udp_ttl_arg, iface);
- if (ret < 0) {
- close(ut->fd);
- return ret;
+ if (sc->cq == NULL) {
+ add_close_on_fork_list(sc->fd);
+ sc->cq = cq_new(UDP_CQ_BYTES);
+ sc->name = para_strdup(remote_name(sc->fd));
+ PARA_NOTICE_LOG("sending to udp %s\n", sc->name);
}
-
- ret = mark_fd_nonblocking(ut->fd);
- if (ret < 0) {
- close(ut->fd);
- return ret;
- }
- add_close_on_fork_list(ut->fd);
- ut->cq = cq_new(UDP_CQ_BYTES);
- PARA_NOTICE_LOG("sending to udp %s#%d\n", ut->host, ut->port);
- return 1;
}
static void udp_shutdown_targets(void)
{
- struct udp_target *ut, *tmp;
- const char *buf = NULL;
- size_t len = 0; /* STFU, gcc */
-
- list_for_each_entry_safe(ut, tmp, &targets, node) {
- int ubuntu_glibc_headers_suck;
- if (ut->fd < 0)
- continue;
- if (!buf)
- len = vss_get_fec_eof_packet(&buf);
- /* ignore return value, we're closing the target anyway. */
- ubuntu_glibc_headers_suck = write(ut->fd, buf, len); /* STFU */
- udp_close_target(ut);
- }
+ struct sender_client *sc, *tmp;
+ const char *buf;
+ size_t len = vss_get_fec_eof_packet(&buf);
+
+ list_for_each_entry_safe(sc, tmp, &targets, node)
+ if (sc->cq != NULL) {
+ /* ignore return value, closing the target anyway. */
+ (void)write(sc->fd, buf, len);
+ udp_close_target(sc);
+ }
}
static int udp_com_on(__a_unused struct sender_command_data *scd)
static int udp_com_delete(struct sender_command_data *scd)
{
- struct udp_target *ut, *tmp;
+ struct sender_client *sc, *tmp;
+
+ list_for_each_entry_safe(sc, tmp, &targets, node) {
+ struct udp_target *ut = sc->private_data;
- list_for_each_entry_safe(ut, tmp, &targets, node) {
/* Unspecified port means wildcard port match */
if (scd->port > 0 && scd->port != ut->port)
continue;
return 1;
}
-static int udp_send_fec(char *buf, size_t len, void *private_data)
+/** Initialize UDP session and set maximum payload size. */
+static int udp_init_fec(struct sender_client *sc)
+{
+ int mps;
+
+ udp_init_session(sc);
+ mps = generic_max_transport_msg_size(sc->fd) - sizeof(struct udphdr);
+ PARA_INFO_LOG("current MPS = %d bytes\n", mps);
+ return mps;
+}
+
+/** Check and clear socket error if any. */
+static int udp_check_socket_state(struct udp_target *ut)
{
- struct udp_target *ut = private_data;
+ int ret;
+ socklen_t errlen = sizeof(ret);
+
+ if (getsockopt(ut->sc->fd, SOL_SOCKET, SO_ERROR, &ret, &errlen) < 0) {
+ PARA_ERROR_LOG("SO_ERROR failed: %s\n", strerror(ret));
+ return 0;
+ } else if (ret == 0) {
+ return 0;
+ } else if (ret == ECONNREFUSED) {
+ time_t dist = now->tv_sec - ut->last_unreachable;
+
+ if (dist <= UDP_MAX_UNREACHABLE_TIME) {
+ return 0;
+ } else if (dist > 2 * UDP_MAX_UNREACHABLE_TIME) {
+ ut->last_unreachable = now->tv_sec;
+ return 0;
+ } else {
+ /*
+ * unreachable_time < dist <= 2 * unreachable_time
+ * No errors are allowed during this time window.
+ */
+ PARA_NOTICE_LOG("Evicting %s#%d after %d seconds "
+ "of connection errors.\n",
+ ut->host, ut->port, (int)dist);
+ }
+ }
+ return -ERRNO_TO_PARA_ERROR(ret);
+}
+
+static int udp_send_fec(struct sender_client *sc, char *buf, size_t len)
+{
+ struct udp_target *ut = sc->private_data;
int ret;
if (sender_status == SENDER_OFF)
return 0;
- ret = udp_init_session(ut);
+ if (len == 0 && !cq_peek(ut->sc->cq))
+ return 0;
+ ret = udp_check_socket_state(ut);
if (ret < 0)
goto fail;
- ret = send_queued_chunks(ut->fd, ut->cq, 0);
- if (ret == -ERRNO_TO_PARA_ERROR(ECONNREFUSED))
- ret = 0;
+ ret = send_queued_chunks(sc->fd, sc->cq);
if (ret < 0)
goto fail;
- if (!len)
- return 0;
if (!ret) { /* still data left in the queue */
- ret = cq_force_enqueue(ut->cq, buf, len);
+ ret = cq_force_enqueue(sc->cq, buf, len);
assert(ret >= 0);
+ return 0;
}
- ret = write_nonblock(ut->fd, buf, len, 0);
- if (ret == -ERRNO_TO_PARA_ERROR(ECONNREFUSED))
+ ret = write_nonblock(sc->fd, buf, len);
+ if (ret == -ERRNO_TO_PARA_ERROR(ECONNREFUSED)) {
+ /*
+ * Happens if meanwhile an ICMP Destination / Port Unreachable
+ * has arrived. Ignore, persistent errors will be caught above.
+ */
ret = 0;
+ }
if (ret < 0)
goto fail;
if (ret != len) {
- ret = cq_force_enqueue(ut->cq, buf + ret, len - ret);
+ ret = cq_force_enqueue(sc->cq, buf + ret, len - ret);
assert(ret >= 0);
}
return 1;
static void udp_add_target(struct sender_command_data *scd)
{
- struct udp_target *ut = para_calloc(sizeof(struct udp_target));
+ int ret, port = scd->port > 0 ? scd->port : conf.udp_default_port_arg;
+ struct udp_target *ut = para_calloc(sizeof(*ut));
strncpy(ut->host, scd->host, sizeof(ut->host));
ut->port = scd->port > 0 ? scd->port : conf.udp_default_port_arg;
- ut->fd = -1; /* not yet connected */
- PARA_INFO_LOG("adding to target list (%s#%d)\n", ut->host, ut->port);
- para_list_add(&ut->node, &targets);
- ut->fcp.slices_per_group = scd->slices_per_group;
+
+ ut->fcp.slices_per_group = scd->slices_per_group;
ut->fcp.data_slices_per_group = scd->data_slices_per_group;
- ut->fcp.max_slice_bytes = scd->max_slice_bytes;
- ut->fcp.send = udp_send_fec;
- ut->fcp.private_data = ut;
- vss_add_fec_client(&ut->fcp, &ut->fc);
+ ut->fcp.max_slice_bytes = scd->max_slice_bytes;
+ ut->fcp.init_fec = udp_init_fec;
+ ut->fcp.send_fec = udp_send_fec;
+
+ ut->sc = para_calloc(sizeof(*ut->sc));
+ ut->sc->private_data = ut;
+ ut->sc->fd = -1;
+ ret = para_connect_simple(IPPROTO_UDP, scd->host, port);
+ if (ret < 0)
+ goto err;
+ ut->sc->fd = ret;
+
+ ret = mcast_sender_setup(ut->sc);
+ if (ret < 0)
+ goto err;
+ ret = mark_fd_nonblocking(ut->sc->fd);
+ if (ret < 0)
+ goto err;
+ PARA_INFO_LOG("adding to target list (%s#%d)\n", ut->host, ut->port);
+ ut->fc = vss_add_fec_client(ut->sc, &ut->fcp);
+ para_list_add(&ut->sc->node, &targets);
+ return;
+err:
+ if (ut->sc->fd >= 0)
+ close(ut->sc->fd);
+ PARA_NOTICE_LOG("failed to set up %s#%d (%s)- not adding it\n",
+ scd->host, port, para_strerror(-ret));
+ free(ut->sc);
+ free(ut);
}
static int udp_com_add(struct sender_command_data *scd)
static char *udp_info(void)
{
- struct udp_target *ut;
+ struct sender_client *sc;
char *ret, *tgts = NULL;
- list_for_each_entry(ut, &targets, node) {
+ list_for_each_entry(sc, &targets, node) {
+ struct udp_target *ut = sc->private_data;
bool is_v6 = strchr(ut->host, ':') != NULL;
char *tmp = make_message("%s%s%s%s:%d/%u:%u:%u ", tgts ? : "",
is_v6 ? "[" : "", ut->host,
#include "net.h"
#include "server.cmdline.h"
#include "list.h"
-#include "vss.h"
#include "send.h"
+#include "vss.h"
#include "ipc.h"
#include "fd.h"
#include "sched.h"
uint8_t num_header_slices;
};
+enum fec_client_state {
+ FEC_STATE_NONE = 0, /**< not initialized and not enabled */
+ FEC_STATE_DISABLED, /**< temporarily disabled */
+ FEC_STATE_READY_TO_RUN /**< initialized and enabled */
+};
+
/**
* Describes one connected FEC client.
*/
struct fec_client {
- /** If negative, this client is temporarily disabled. */
- int error;
+ /** Current state of the client */
+ enum fec_client_state state;
+ /** The connected sender client (transport layer). */
+ struct sender_client *sc;
/** Parameters requested by the client. */
struct fec_client_parms *fcp;
/** Used by the core FEC code. */
static int compute_next_fec_slice(struct fec_client *fc, struct vss_task *vsst)
{
- assert(fc->error >= 0);
if (fc->first_stream_chunk < 0 || fc->current_slice_num
== fc->fcp->slices_per_group + fc->num_extra_slices) {
int ret = setup_next_fec_group(fc, vsst);
if (ret < 0) {
PARA_ERROR_LOG("%s\n", para_strerror(-ret));
PARA_ERROR_LOG("FEC client temporarily disabled\n");
- fc->error = ret;
- return fc->error;
+ fc->state = FEC_STATE_DISABLED;
+ return ret;
}
}
write_fec_header(fc, vsst);
/**
* Add one entry to the list of active fec clients.
*
- * \param fcp Describes the fec parameters to be used for this client.
- * \param result An opaque pointer that must be used by remove the client later.
+ * \param sc Generic sender_client data of the transport layer.
+ * \param fcp FEC parameters as supplied by the transport layer.
*
- * \return Standard.
+ * \return Newly allocated fec_client struct.
*/
-int vss_add_fec_client(struct fec_client_parms *fcp, struct fec_client **result)
+struct fec_client *vss_add_fec_client(struct sender_client *sc,
+ struct fec_client_parms *fcp)
{
- int ret;
- struct fec_client *fc;
+ struct fec_client *fc = para_calloc(sizeof(*fc));
- if (fcp->max_slice_bytes < FEC_HEADER_SIZE + fcp->data_slices_per_group)
- return -ERRNO_TO_PARA_ERROR(EINVAL);
- fc = para_calloc(sizeof(*fc));
+ fc->sc = sc;
fc->fcp = fcp;
- ret = fec_new(fcp->data_slices_per_group, fcp->slices_per_group,
- &fc->parms);
- if (ret < 0)
- goto err;
- fc->first_stream_chunk = -1; /* stream not yet started */
- fc->src_data = para_malloc(fc->fcp->slices_per_group * sizeof(char *));
- fc->enc_buf = para_calloc(fc->fcp->max_slice_bytes);
- fc->num_extra_slices = 0;
- fc->extra_src_buf = para_calloc(fc->fcp->max_slice_bytes);
- fc->next_header_time.tv_sec = 0;
para_list_add(&fc->node, &fec_client_list);
- *result = fc;
- return 1;
-err:
- fec_free(fc->parms);
- free(fc);
- *result = NULL;
- return ret;
+ return fc;
}
/**
{
struct fec_client *fc;
- assert(vss_playing());
list_for_each_entry(fc, &fec_client_list, node) {
struct timeval diff;
- if (fc->error < 0)
+ if (fc->state != FEC_STATE_READY_TO_RUN)
continue;
if (next_slice_is_due(fc, &diff)) {
timeout->tv_sec = 0;
list_for_each_entry(fc, &fec_client_list, node) {
struct timeval group_duration;
- if (fc->error < 0)
+ if (fc->state != FEC_STATE_READY_TO_RUN)
continue;
tv_scale(fc->group.num_chunks, chunk_tv, &group_duration);
if (tv_diff(&timeout, &group_duration, NULL) < 0)
senders[i].shutdown_clients();
list_for_each_entry_safe(fc, tmp, &fec_client_list, node) {
fc->first_stream_chunk = -1;
- fc->error = 0;
+ fc->state = FEC_STATE_NONE;
}
mmd->stream_start.tv_sec = 0;
mmd->stream_start.tv_usec = 0;
mmd->new_vss_status_flags = VSS_NEXT;
}
+static int initialize_fec_client(struct fec_client *fc)
+{
+ int ret;
+ struct fec_client_parms *fcp = fc->fcp;
+
+ if (fcp->init_fec) {
+ /*
+ * Set the maximum slice size to the Maximum Packet Size if the
+ * transport protocol allows to determine this value. The user
+ * can specify a slice size up to this value.
+ */
+ ret = fcp->init_fec(fc->sc);
+ if (ret < 0)
+ return ret;
+ if (!fcp->max_slice_bytes || fcp->max_slice_bytes > ret)
+ fcp->max_slice_bytes = ret;
+ }
+ if (fcp->max_slice_bytes < FEC_HEADER_SIZE + fcp->data_slices_per_group)
+ return -ERRNO_TO_PARA_ERROR(EINVAL);
+ ret = fec_new(fcp->data_slices_per_group, fcp->slices_per_group,
+ &fc->parms);
+ if (ret < 0)
+ goto err;
+ fc->first_stream_chunk = -1; /* stream not yet started */
+ fc->src_data = para_malloc(fc->fcp->slices_per_group * sizeof(char *));
+ fc->enc_buf = para_calloc(fc->fcp->max_slice_bytes);
+ fc->num_extra_slices = 0;
+ fc->extra_src_buf = para_calloc(fc->fcp->max_slice_bytes);
+ fc->next_header_time.tv_sec = 0;
+ fc->state = FEC_STATE_READY_TO_RUN;
+ return 1;
+err:
+ fec_free(fc->parms);
+ return ret;
+}
+
/**
* Main sending function.
*
*/
static void vss_send(struct vss_task *vsst)
{
- int i, fec_active = 0;
+ int ret, i, fec_active = 0;
struct timeval due;
struct fec_client *fc, *tmp_fc;
&due, 1) < 0)
return;
list_for_each_entry_safe(fc, tmp_fc, &fec_client_list, node) {
- if (fc->error < 0)
+ switch (fc->state) {
+ case FEC_STATE_DISABLED:
continue;
+ case FEC_STATE_NONE:
+ ret = initialize_fec_client(fc);
+ if (ret < 0) {
+ PARA_ERROR_LOG("%s\n", para_strerror(-ret));
+ continue;
+ }
+ /* fall through */
+ case FEC_STATE_READY_TO_RUN:
+ break;
+ }
if (!next_slice_is_due(fc, NULL)) {
fec_active = 1;
continue;
continue;
PARA_DEBUG_LOG("sending %d:%d (%u bytes)\n", fc->group.num,
fc->current_slice_num, fc->fcp->max_slice_bytes);
- fc->fcp->send((char *)fc->enc_buf,
- fc->fcp->max_slice_bytes,
- fc->fcp->private_data);
+ fc->fcp->send_fec(fc->sc, (char *)fc->enc_buf,
+ fc->fcp->max_slice_bytes);
fc->current_slice_num++;
fec_active = 1;
}
/** Currently playing. */
#define VSS_PLAYING 8
-/**
- * Each paraslash sender may register arbitrary many clients to the virtual
- * streaming system, possibly with varying fec parameters. In order to do so,
- * it must allocate a \a fec_client_parms structure and pass it to \ref
- * add_fec_client.
- *
- * Clients are automatically removed from that list by the vss if an error
- * occurs, or if the sender requests deletion of a client by calling \ref
- * vss_del_fec_client().
- */
-struct fec_client;
-
-/** FEC parameters requested by FEC clients. */
-struct fec_client_parms {
- /** Number of data slices plus redundant slices. */
- uint8_t slices_per_group;
- /** Number of slices minus number of redundant slices. */
- uint8_t data_slices_per_group;
- /** Maximal number of bytes per slice. */
- uint16_t max_slice_bytes;
- /** Called by vss.c when the next slice should be sent. */
- int (*send)(char *buf, size_t num_bytes, void *private_data);
- /** Passed verbatim to \a send(). */
- void *private_data;
-};
-
-int vss_add_fec_client(struct fec_client_parms *fcp, struct fec_client **result);
+struct fec_client *vss_add_fec_client(struct sender_client *sc,
+ struct fec_client_parms *fcp);
void vss_del_fec_client(struct fec_client *fc);
size_t vss_get_fec_eof_packet(const char **buf);
<h1>Events</h1>
<hr>
<ul>
+ <li>2010-06-23: <a href="cooking.html">What's cooking page</a> online</li>
<li>2010-04-23: <a href="versions/paraslash-0.4.2.tar.bz2">paraslash-0.4.2</a>
<a href="versions/paraslash-0.4.2.tar.bz2.asc">(sig)</a>
"associative expansion"