X-Git-Url: http://git.tuebingen.mpg.de/?p=paraslash.git;a=blobdiff_plain;f=vss.c;h=adc0cb63cea1204d3bfef811c492b807b74001c2;hp=898180c013a4845481bbfd48782fa9168f8e9048;hb=70d1779ed65ed4fb71680743aba2e6ca8aebb43c;hpb=840725e10602abd8187428924040f4bf3e04594c diff --git a/vss.c b/vss.c index 898180c0..adc0cb63 100644 --- a/vss.c +++ b/vss.c @@ -26,8 +26,8 @@ #include "net.h" #include "server.cmdline.h" #include "list.h" -#include "vss.h" #include "send.h" +#include "vss.h" #include "ipc.h" #include "fd.h" #include "sched.h" @@ -131,12 +131,20 @@ struct fec_group { uint8_t num_header_slices; }; +enum fec_client_state { + FEC_STATE_NONE = 0, /**< not initialized and not enabled */ + FEC_STATE_DISABLED, /**< temporarily disabled */ + FEC_STATE_READY_TO_RUN /**< initialized and enabled */ +}; + /** * Describes one connected FEC client. */ struct fec_client { - /** If negative, this client is temporarily disabled. */ - int error; + /** Current state of the client */ + enum fec_client_state state; + /** The connected sender client (transport layer). */ + struct sender_client *sc; /** Parameters requested by the client. */ struct fec_client_parms *fcp; /** Used by the core FEC code. */ @@ -369,7 +377,6 @@ static int setup_next_fec_group(struct fec_client *fc, struct vss_task *vsst) static int compute_next_fec_slice(struct fec_client *fc, struct vss_task *vsst) { - assert(fc->error >= 0); if (fc->first_stream_chunk < 0 || fc->current_slice_num == fc->fcp->slices_per_group + fc->num_extra_slices) { int ret = setup_next_fec_group(fc, vsst); @@ -378,8 +385,8 @@ static int compute_next_fec_slice(struct fec_client *fc, struct vss_task *vsst) if (ret < 0) { PARA_ERROR_LOG("%s\n", para_strerror(-ret)); PARA_ERROR_LOG("FEC client temporarily disabled\n"); - fc->error = ret; - return fc->error; + fc->state = FEC_STATE_DISABLED; + return ret; } } write_fec_header(fc, vsst); @@ -408,38 +415,20 @@ size_t vss_get_fec_eof_packet(const char **buf) /** * Add one entry to the list of active fec clients. * - * \param fcp Describes the fec parameters to be used for this client. - * \param result An opaque pointer that must be used by remove the client later. + * \param sc Generic sender_client data of the transport layer. + * \param fcp FEC parameters as supplied by the transport layer. * - * \return Standard. + * \return Newly allocated fec_client struct. */ -int vss_add_fec_client(struct fec_client_parms *fcp, struct fec_client **result) +struct fec_client *vss_add_fec_client(struct sender_client *sc, + struct fec_client_parms *fcp) { - int ret; - struct fec_client *fc; + struct fec_client *fc = para_calloc(sizeof(*fc)); - if (fcp->max_slice_bytes < FEC_HEADER_SIZE + fcp->data_slices_per_group) - return -ERRNO_TO_PARA_ERROR(EINVAL); - fc = para_calloc(sizeof(*fc)); + fc->sc = sc; fc->fcp = fcp; - ret = fec_new(fcp->data_slices_per_group, fcp->slices_per_group, - &fc->parms); - if (ret < 0) - goto err; - fc->first_stream_chunk = -1; /* stream not yet started */ - fc->src_data = para_malloc(fc->fcp->slices_per_group * sizeof(char *)); - fc->enc_buf = para_calloc(fc->fcp->max_slice_bytes); - fc->num_extra_slices = 0; - fc->extra_src_buf = para_calloc(fc->fcp->max_slice_bytes); - fc->next_header_time.tv_sec = 0; para_list_add(&fc->node, &fec_client_list); - *result = fc; - return 1; -err: - fec_free(fc->parms); - free(fc); - *result = NULL; - return ret; + return fc; } /** @@ -481,11 +470,10 @@ static void compute_slice_timeout(struct timeval *timeout) { struct fec_client *fc; - assert(vss_playing()); list_for_each_entry(fc, &fec_client_list, node) { struct timeval diff; - if (fc->error < 0) + if (fc->state != FEC_STATE_READY_TO_RUN) continue; if (next_slice_is_due(fc, &diff)) { timeout->tv_sec = 0; @@ -508,7 +496,7 @@ static void set_eof_barrier(struct vss_task *vsst) list_for_each_entry(fc, &fec_client_list, node) { struct timeval group_duration; - if (fc->error < 0) + if (fc->state != FEC_STATE_READY_TO_RUN) continue; tv_scale(fc->group.num_chunks, chunk_tv, &group_duration); if (tv_diff(&timeout, &group_duration, NULL) < 0) @@ -712,7 +700,7 @@ static void vss_pre_select(struct sched *s, struct task *t) senders[i].shutdown_clients(); list_for_each_entry_safe(fc, tmp, &fec_client_list, node) { fc->first_stream_chunk = -1; - fc->error = 0; + fc->state = FEC_STATE_NONE; } mmd->stream_start.tv_sec = 0; mmd->stream_start.tv_usec = 0; @@ -837,6 +825,42 @@ err: mmd->new_vss_status_flags = VSS_NEXT; } +static int initialize_fec_client(struct fec_client *fc) +{ + int ret; + struct fec_client_parms *fcp = fc->fcp; + + if (fcp->init_fec) { + /* + * Set the maximum slice size to the Maximum Packet Size if the + * transport protocol allows to determine this value. The user + * can specify a slice size up to this value. + */ + ret = fcp->init_fec(fc->sc); + if (ret < 0) + return ret; + if (!fcp->max_slice_bytes || fcp->max_slice_bytes > ret) + fcp->max_slice_bytes = ret; + } + if (fcp->max_slice_bytes < FEC_HEADER_SIZE + fcp->data_slices_per_group) + return -ERRNO_TO_PARA_ERROR(EINVAL); + ret = fec_new(fcp->data_slices_per_group, fcp->slices_per_group, + &fc->parms); + if (ret < 0) + goto err; + fc->first_stream_chunk = -1; /* stream not yet started */ + fc->src_data = para_malloc(fc->fcp->slices_per_group * sizeof(char *)); + fc->enc_buf = para_calloc(fc->fcp->max_slice_bytes); + fc->num_extra_slices = 0; + fc->extra_src_buf = para_calloc(fc->fcp->max_slice_bytes); + fc->next_header_time.tv_sec = 0; + fc->state = FEC_STATE_READY_TO_RUN; + return 1; +err: + fec_free(fc->parms); + return ret; +} + /** * Main sending function. * @@ -848,7 +872,7 @@ err: */ static void vss_send(struct vss_task *vsst) { - int i, fec_active = 0; + int ret, i, fec_active = 0; struct timeval due; struct fec_client *fc, *tmp_fc; @@ -860,8 +884,19 @@ static void vss_send(struct vss_task *vsst) &due, 1) < 0) return; list_for_each_entry_safe(fc, tmp_fc, &fec_client_list, node) { - if (fc->error < 0) + switch (fc->state) { + case FEC_STATE_DISABLED: continue; + case FEC_STATE_NONE: + ret = initialize_fec_client(fc); + if (ret < 0) { + PARA_ERROR_LOG("%s\n", para_strerror(-ret)); + continue; + } + /* fall through */ + case FEC_STATE_READY_TO_RUN: + break; + } if (!next_slice_is_due(fc, NULL)) { fec_active = 1; continue; @@ -870,9 +905,8 @@ static void vss_send(struct vss_task *vsst) continue; PARA_DEBUG_LOG("sending %d:%d (%u bytes)\n", fc->group.num, fc->current_slice_num, fc->fcp->max_slice_bytes); - fc->fcp->send((char *)fc->enc_buf, - fc->fcp->max_slice_bytes, - fc->fcp->private_data); + fc->fcp->send_fec(fc->sc, (char *)fc->enc_buf, + fc->fcp->max_slice_bytes); fc->current_slice_num++; fec_active = 1; } @@ -920,8 +954,12 @@ static void vss_post_select(struct sched *s, struct task *t) int num = mmd->sender_cmd_data.cmd_num, sender_num = mmd->sender_cmd_data.sender_num; - if (senders[sender_num].client_cmds[num]) - senders[sender_num].client_cmds[num](&mmd->sender_cmd_data); + if (senders[sender_num].client_cmds[num]) { + ret = senders[sender_num].client_cmds[num] + (&mmd->sender_cmd_data); + if (ret < 0) + PARA_ERROR_LOG("%s\n", para_strerror(-ret)); + } mmd->sender_cmd_data.cmd_num = -1; } if (vsst->afsss != AFS_SOCKET_CHECK_FOR_WRITE)