From 28f8405e062fcff1f0ce90eb01ffeaca299cffa7 Mon Sep 17 00:00:00 2001 From: Andre Noll Date: Tue, 27 Apr 2010 21:43:19 +0200 Subject: [PATCH] Basic infrastructure changes for FEC/DCCP support. This patch allows to add a FEC client without specifying the FEC parameters. Instead, these parameters are set at stream start time via the new ->open() method of the senders. It is desirable to defer ssetting the FEC parameters to allow dynamic determination of the slice size. We need an established connection to find out the best value for the slice size, but the UDP sender currently has to fix the slice size at target add time, which is too early. The slice size for the UDP and the DCCP sender are currently hardcoded values; this will be fixed by subsequent patches. --- aft.c | 1 - audiod.c | 3 +- chunk_queue.c | 1 - command.c | 2 +- dccp_send.c | 80 +++++++++++++++++++++++++++++++++++++-------------- http_send.c | 2 +- send.h | 41 +++++++++++++++++++++++++- send_common.c | 2 -- server.c | 4 +-- server.h | 2 -- udp_send.c | 19 ++++++------ vss.c | 76 +++++++++++++++++++++++++++++++----------------- vss.h | 29 ++----------------- 13 files changed, 167 insertions(+), 95 deletions(-) diff --git a/aft.c b/aft.c index 25058e7c..eb0f7b57 100644 --- a/aft.c +++ b/aft.c @@ -21,7 +21,6 @@ #include "afh.h" #include "afs.h" #include "net.h" -#include "vss.h" #include "fd.h" #include "ipc.h" #include "portable_io.h" diff --git a/audiod.c b/audiod.c index 778318ce..5c17e6ea 100644 --- a/audiod.c +++ b/audiod.c @@ -863,7 +863,8 @@ static int init_default_filters(void) * If udp is used to receive this audiod format, add fecdec as * the first filter. */ - if (strcmp(afi[i].receiver->name, "udp") == 0) { + if (strcmp(afi[i].receiver->name, "udp") == 0 || + strcmp(afi[i].receiver->name, "dccp") == 0) { tmp = para_strdup("fecdec"); add_filter(i, tmp); free(tmp); diff --git a/chunk_queue.c b/chunk_queue.c index 3f5ac1d9..468b1dc2 100644 --- a/chunk_queue.c +++ b/chunk_queue.c @@ -11,7 +11,6 @@ #include "para.h" #include "list.h" #include "afh.h" -#include "vss.h" #include "string.h" #include "error.h" diff --git a/command.c b/command.c index b7795a00..42fa3e75 100644 --- a/command.c +++ b/command.c @@ -23,9 +23,9 @@ #include "afh.h" #include "afs.h" #include "server.h" -#include "vss.h" #include "list.h" #include "send.h" +#include "vss.h" #include "rc4.h" #include "net.h" #include "daemon.h" diff --git a/dccp_send.c b/dccp_send.c index 6248ae80..0e69a969 100644 --- a/dccp_send.c +++ b/dccp_send.c @@ -24,8 +24,8 @@ #include "server.h" #include "net.h" #include "list.h" -#include "vss.h" #include "send.h" +#include "vss.h" #include "fd.h" #include "close_on_fork.h" #include "chunk_queue.h" @@ -36,6 +36,14 @@ #define DCCP_MAX_BYTES_PER_WRITE 1024 static struct sender_status dccp_sender_status, *dss = &dccp_sender_status; +static struct sender *self; + + +struct dccp_fec_client { + struct fec_client_parms fcp; + struct fec_client *fc; + struct sender_client *sc; +}; static void dccp_pre_select(int *max_fileno, fd_set *rfds, __a_unused fd_set *wfds) @@ -65,9 +73,47 @@ static int dccp_get_tx_ccid(int sockfd) return tx_ccid; } +static void dccp_shutdown_client(struct sender_client *sc) +{ + struct dccp_fec_client *dfc = sc->private_data; + + vss_del_fec_client(dfc->fc); + shutdown_client(sc, dss); +} + +static void dccp_shutdown_clients(void) +{ + struct sender_client *sc, *tmp; + + list_for_each_entry_safe(sc, tmp, &dss->client_list, node) + dccp_shutdown_client(sc); +} + +static int dccp_open(void *client, struct fec_client_parms **fcp) +{ + struct dccp_fec_client *dfc = client; + + dfc->fcp.slices_per_group = 4; + dfc->fcp.data_slices_per_group = 3; + dfc->fcp.max_slice_bytes = 1472; + *fcp = &dfc->fcp; + return 1; +} + +static int dccp_send_fec(char *buf, size_t len, void *private_data) +{ + struct dccp_fec_client *dfc = private_data; + int ret = write_nonblock(dfc->sc->fd, buf, len, DCCP_MAX_BYTES_PER_WRITE); + + if (ret < 0) + dccp_shutdown_client(dfc->sc); + return ret; +} + static void dccp_post_select(fd_set *rfds, __a_unused fd_set *wfds) { struct sender_client *sc; + struct dccp_fec_client *dfc; int tx_ccid; sc = accept_sender_client(dss, rfds); @@ -87,26 +133,15 @@ static void dccp_post_select(fd_set *rfds, __a_unused fd_set *wfds) * from the client; by shutting down this unused communication path we can * reduce processing costs a bit. See analogous comment in dccp_recv.c. */ - if (shutdown(sc->fd, SHUT_RD) >= 0) + if (shutdown(sc->fd, SHUT_RD) < 0) { + PARA_WARNING_LOG("%s\n", strerror(errno)); + shutdown_client(sc, dss); return; - PARA_WARNING_LOG("%s\n", strerror(errno)); - shutdown_client(sc, dss); -} - -static void dccp_send(long unsigned current_chunk, - __a_unused long unsigned chunks_sent, const char *buf, - size_t len, const char *header_buf, size_t header_len) -{ - struct sender_client *sc, *tmp; - - list_for_each_entry_safe(sc, tmp, &dss->client_list, node) - send_chunk(sc, dss, DCCP_MAX_BYTES_PER_WRITE, current_chunk, buf, - len, header_buf, header_len); -} - -static void dccp_shutdown_clients(void) -{ - shutdown_clients(dss); + } + dfc = para_calloc(sizeof(*dfc)); + sc->private_data = dfc; + dfc->sc = sc; + vss_add_fec_client(self, dfc, &dfc->fc); } static int dccp_com_on(__a_unused struct sender_command_data *scd) @@ -176,7 +211,9 @@ void dccp_send_init(struct sender *s) int ret; s->info = dccp_info; - s->send = dccp_send; + s->send = NULL; + s->open = dccp_open; + s->send_fec = dccp_send_fec; s->pre_select = dccp_pre_select; s->post_select = dccp_post_select; s->shutdown_clients = dccp_shutdown_clients; @@ -194,4 +231,5 @@ void dccp_send_init(struct sender *s) ret = generic_com_on(dss, IPPROTO_DCCP); if (ret < 0) PARA_ERROR_LOG("%s\n", para_strerror(-ret)); + self = s; } diff --git a/http_send.c b/http_send.c index 424d63b2..2c918fc8 100644 --- a/http_send.c +++ b/http_send.c @@ -19,9 +19,9 @@ #include "afs.h" #include "server.h" #include "http.h" -#include "vss.h" #include "list.h" #include "send.h" +#include "vss.h" #include "close_on_fork.h" #include "net.h" #include "fd.h" diff --git a/send.h b/send.h index acf62db4..72c7a6c6 100644 --- a/send.h +++ b/send.h @@ -9,6 +9,28 @@ /** The sender subcommands. */ enum {SENDER_ADD, SENDER_DELETE, SENDER_ALLOW, SENDER_DENY, SENDER_ON, SENDER_OFF, NUM_SENDER_CMDS}; +/** + * Each paraslash sender may register arbitrary many clients to the virtual + * streaming system, possibly with varying fec parameters. In order to do so, + * it must allocate a \a fec_client_parms structure and pass it to \ref + * add_fec_client. + * + * Clients are automatically removed from that list by the vss if an error + * occurs, or if the sender requests deletion of a client by calling \ref + * vss_del_fec_client(). + */ +struct fec_client; + +/** FEC parameters requested by FEC clients. */ +struct fec_client_parms { + /** Number of data slices plus redundant slices. */ + uint8_t slices_per_group; + /** Number of slices minus number of redundant slices. */ + uint8_t data_slices_per_group; + /** Maximal number of bytes per slice, initially zero. */ + uint16_t max_slice_bytes; +}; + /** * Describes one supported sender of para_server. * @@ -47,10 +69,27 @@ struct sender { * only differ if the stream was repositioned by the \a ff or \a jmp * command. Of course, \a buf is a pointer to the chunk of data which * should be sent, and \a len is the length of this buffer. - */ + */ void (*send)(long unsigned current_chunk, long unsigned chunks_sent, const char *buf, size_t len, const char *header_buf, size_t header_len); + + /** + * Obtain the FEC parameters of a FEC client. + * + * This is called once by vss.c at the beginning of a stream. Senders + * are supposed to set \a fcp to a struct which is suitable for the FEC + * client identified by \a private_data. + */ + int (*open)(void *client, struct fec_client_parms **fcp); + /** + * Send the next slice to a FEC client. + * + * Called by vss.c when the next slice should be sent to the FEC client + * identified by \a private_data, the pointer which was previously + * passed to vss_add_fec_target(). + */ + int (*send_fec)(char *buf, size_t num_bytes, void *private_data); /** * Add file descriptors to fd_sets. * diff --git a/send_common.c b/send_common.c index b44c8133..ed140c84 100644 --- a/send_common.c +++ b/send_common.c @@ -417,7 +417,6 @@ static int parse_fec_parms(const char *arg, struct sender_command_data *scd) ret = -ERRNO_TO_PARA_ERROR(EINVAL); if (val < 0 || val > 65535) goto out; - scd->max_slice_bytes = val; /* parse data_slices_per_group */ b = e + 1; e = strchr(b, ':'); @@ -480,7 +479,6 @@ int parse_fec_url(const char *arg, struct sender_command_data *scd) goto out; } /* use default fec parameters. */ - scd->max_slice_bytes = 1472; scd->slices_per_group = 16; scd->data_slices_per_group = 14; ret = 0; diff --git a/server.c b/server.c index bc143039..0e256053 100644 --- a/server.c +++ b/server.c @@ -77,11 +77,11 @@ #include "string.h" #include "afs.h" #include "server.h" +#include "list.h" +#include "send.h" #include "vss.h" #include "config.h" #include "close_on_fork.h" -#include "list.h" -#include "send.h" #include "net.h" #include "daemon.h" #include "ipc.h" diff --git a/server.h b/server.h index 43ba841d..822e80e1 100644 --- a/server.h +++ b/server.h @@ -25,8 +25,6 @@ struct sender_command_data{ int netmask; /** The port number for add/remove. */ int port; - /** Maximal slice size. */ - uint16_t max_slice_bytes; /** Number of data slices plus redundant slices. */ uint8_t slices_per_group; /** Number of slices minus number of redundant slices. */ diff --git a/udp_send.c b/udp_send.c index e80670d0..29deafb0 100644 --- a/udp_send.c +++ b/udp_send.c @@ -21,9 +21,9 @@ #include "afh.h" #include "afs.h" #include "server.h" -#include "vss.h" #include "list.h" #include "send.h" +#include "vss.h" #include "portable_io.h" #include "net.h" #include "fd.h" @@ -51,6 +51,7 @@ struct udp_target { static struct list_head targets; static int sender_status; +static struct sender *self; static void udp_close_target(struct udp_target *ut) { @@ -152,8 +153,9 @@ err: /** The maximal size of the per-target chunk queue. */ #define UDP_CQ_BYTES 40000 -static int udp_init_session(struct udp_target *ut) +static int udp_open(void *client, struct fec_client_parms **fcp) { + struct udp_target *ut = client; int ret; char *iface = NULL; @@ -181,6 +183,8 @@ static int udp_init_session(struct udp_target *ut) } add_close_on_fork_list(ut->fd); ut->cq = cq_new(UDP_CQ_BYTES); + ut->fcp.max_slice_bytes = 1472; /* FIXME */ + *fcp = &ut->fcp; PARA_NOTICE_LOG("sending to udp %s#%d\n", ut->host, ut->port); return 1; } @@ -238,9 +242,6 @@ static int udp_send_fec(char *buf, size_t len, void *private_data) if (sender_status == SENDER_OFF) return 0; - ret = udp_init_session(ut); - if (ret < 0) - goto fail; ret = send_queued_chunks(ut->fd, ut->cq, 0); if (ret == -ERRNO_TO_PARA_ERROR(ECONNREFUSED)) ret = 0; @@ -278,10 +279,7 @@ static void udp_add_target(struct sender_command_data *scd) para_list_add(&ut->node, &targets); ut->fcp.slices_per_group = scd->slices_per_group; ut->fcp.data_slices_per_group = scd->data_slices_per_group; - ut->fcp.max_slice_bytes = scd->max_slice_bytes; - ut->fcp.send = udp_send_fec; - ut->fcp.private_data = ut; - vss_add_fec_client(&ut->fcp, &ut->fc); + vss_add_fec_client(self, ut, &ut->fc); } static int udp_com_add(struct sender_command_data *scd) @@ -366,6 +364,8 @@ void udp_send_init(struct sender *s) s->info = udp_info; s->help = udp_help; s->send = NULL; + s->send_fec = udp_send_fec; + s->open = udp_open; s->pre_select = NULL; s->post_select = NULL; s->shutdown_clients = udp_shutdown_targets; @@ -379,5 +379,6 @@ void udp_send_init(struct sender *s) udp_init_target_list(); if (!conf.udp_no_autostart_given) sender_status = SENDER_ON; + self = s; PARA_DEBUG_LOG("udp sender init complete\n"); } diff --git a/vss.c b/vss.c index 898180c0..afde0dcb 100644 --- a/vss.c +++ b/vss.c @@ -26,8 +26,8 @@ #include "net.h" #include "server.cmdline.h" #include "list.h" -#include "vss.h" #include "send.h" +#include "vss.h" #include "ipc.h" #include "fd.h" #include "sched.h" @@ -137,6 +137,8 @@ struct fec_group { struct fec_client { /** If negative, this client is temporarily disabled. */ int error; + /** UDP or DCCP. */ + struct sender *sender; /** Parameters requested by the client. */ struct fec_client_parms *fcp; /** Used by the core FEC code. */ @@ -161,6 +163,8 @@ struct fec_client { int num_extra_slices; /** Contains the FEC-encoded data. */ unsigned char *enc_buf; + /** Pointer obtained from sender when the client is added. */ + void *private_data; }; /** @@ -413,33 +417,16 @@ size_t vss_get_fec_eof_packet(const char **buf) * * \return Standard. */ -int vss_add_fec_client(struct fec_client_parms *fcp, struct fec_client **result) +int vss_add_fec_client(struct sender *sender, void *private_data, + struct fec_client **result) { - int ret; - struct fec_client *fc; + struct fec_client *fc = para_calloc(sizeof(*fc)); - if (fcp->max_slice_bytes < FEC_HEADER_SIZE + fcp->data_slices_per_group) - return -ERRNO_TO_PARA_ERROR(EINVAL); - fc = para_calloc(sizeof(*fc)); - fc->fcp = fcp; - ret = fec_new(fcp->data_slices_per_group, fcp->slices_per_group, - &fc->parms); - if (ret < 0) - goto err; - fc->first_stream_chunk = -1; /* stream not yet started */ - fc->src_data = para_malloc(fc->fcp->slices_per_group * sizeof(char *)); - fc->enc_buf = para_calloc(fc->fcp->max_slice_bytes); - fc->num_extra_slices = 0; - fc->extra_src_buf = para_calloc(fc->fcp->max_slice_bytes); - fc->next_header_time.tv_sec = 0; + fc->private_data = private_data; + fc->sender = sender; para_list_add(&fc->node, &fec_client_list); *result = fc; return 1; -err: - fec_free(fc->parms); - free(fc); - *result = NULL; - return ret; } /** @@ -713,6 +700,7 @@ static void vss_pre_select(struct sched *s, struct task *t) list_for_each_entry_safe(fc, tmp, &fec_client_list, node) { fc->first_stream_chunk = -1; fc->error = 0; + fc->fcp = NULL; } mmd->stream_start.tv_sec = 0; mmd->stream_start.tv_usec = 0; @@ -837,6 +825,35 @@ err: mmd->new_vss_status_flags = VSS_NEXT; } +static int open_fec_client(struct fec_client *fc) +{ + int ret; + struct fec_client_parms *fcp; + + ret = fc->sender->open(fc->private_data, &fc->fcp); + if (ret < 0) { + fc->fcp = NULL; + return ret; + } + fcp = fc->fcp; + if (fcp->max_slice_bytes < FEC_HEADER_SIZE + fcp->data_slices_per_group) + return -ERRNO_TO_PARA_ERROR(EINVAL); + ret = fec_new(fcp->data_slices_per_group, fcp->slices_per_group, + &fc->parms); + if (ret < 0) + goto err; + fc->first_stream_chunk = -1; /* stream not yet started */ + fc->src_data = para_malloc(fc->fcp->slices_per_group * sizeof(char *)); + fc->enc_buf = para_calloc(fc->fcp->max_slice_bytes); + fc->num_extra_slices = 0; + fc->extra_src_buf = para_calloc(fc->fcp->max_slice_bytes); + fc->next_header_time.tv_sec = 0; + return 1; +err: + fec_free(fc->parms); + return ret; +} + /** * Main sending function. * @@ -848,7 +865,7 @@ err: */ static void vss_send(struct vss_task *vsst) { - int i, fec_active = 0; + int ret, i, fec_active = 0; struct timeval due; struct fec_client *fc, *tmp_fc; @@ -862,6 +879,13 @@ static void vss_send(struct vss_task *vsst) list_for_each_entry_safe(fc, tmp_fc, &fec_client_list, node) { if (fc->error < 0) continue; + if (!fc->fcp) { + ret = open_fec_client(fc); + if (ret < 0) { + PARA_ERROR_LOG("%s\n", para_strerror(-ret)); + continue; + } + } if (!next_slice_is_due(fc, NULL)) { fec_active = 1; continue; @@ -870,9 +894,9 @@ static void vss_send(struct vss_task *vsst) continue; PARA_DEBUG_LOG("sending %d:%d (%u bytes)\n", fc->group.num, fc->current_slice_num, fc->fcp->max_slice_bytes); - fc->fcp->send((char *)fc->enc_buf, + fc->sender->send_fec((char *)fc->enc_buf, fc->fcp->max_slice_bytes, - fc->fcp->private_data); + fc->private_data); fc->current_slice_num++; fec_active = 1; } diff --git a/vss.h b/vss.h index ed2cd785..3d80274b 100644 --- a/vss.h +++ b/vss.h @@ -24,32 +24,7 @@ const char *supported_audio_formats(void); /** Currently playing. */ #define VSS_PLAYING 8 -/** - * Each paraslash sender may register arbitrary many clients to the virtual - * streaming system, possibly with varying fec parameters. In order to do so, - * it must allocate a \a fec_client_parms structure and pass it to \ref - * add_fec_client. - * - * Clients are automatically removed from that list by the vss if an error - * occurs, or if the sender requests deletion of a client by calling \ref - * vss_del_fec_client(). - */ -struct fec_client; - -/** FEC parameters requested by FEC clients. */ -struct fec_client_parms { - /** Number of data slices plus redundant slices. */ - uint8_t slices_per_group; - /** Number of slices minus number of redundant slices. */ - uint8_t data_slices_per_group; - /** Maximal number of bytes per slice. */ - uint16_t max_slice_bytes; - /** Called by vss.c when the next slice should be sent. */ - int (*send)(char *buf, size_t num_bytes, void *private_data); - /** Passed verbatim to \a send(). */ - void *private_data; -}; - -int vss_add_fec_client(struct fec_client_parms *fcp, struct fec_client **result); +int vss_add_fec_client(struct sender *sender, void *private_data, + struct fec_client **result); void vss_del_fec_client(struct fec_client *fc); size_t vss_get_fec_eof_packet(const char **buf); -- 2.39.2