X-Git-Url: http://git.tuebingen.mpg.de/?p=paraslash.git;a=blobdiff_plain;f=vss.c;h=adc0cb63cea1204d3bfef811c492b807b74001c2;hp=4be6e18a6833c313b40c0453a10fe021b75b3ee5;hb=645175864a613fc539f46ffa4028742aad3b1b30;hpb=f099900540c4e4c54b10181254b895ccfe6ef410 diff --git a/vss.c b/vss.c index 4be6e18a..adc0cb63 100644 --- a/vss.c +++ b/vss.c @@ -131,14 +131,18 @@ struct fec_group { uint8_t num_header_slices; }; +enum fec_client_state { + FEC_STATE_NONE = 0, /**< not initialized and not enabled */ + FEC_STATE_DISABLED, /**< temporarily disabled */ + FEC_STATE_READY_TO_RUN /**< initialized and enabled */ +}; + /** * Describes one connected FEC client. */ struct fec_client { - /** If negative, this client is temporarily disabled. */ - int error; - /** Whether the sender client is ready to push out data. */ - bool ready; + /** Current state of the client */ + enum fec_client_state state; /** The connected sender client (transport layer). */ struct sender_client *sc; /** Parameters requested by the client. */ @@ -165,8 +169,6 @@ struct fec_client { int num_extra_slices; /** Contains the FEC-encoded data. */ unsigned char *enc_buf; - /** Pointer obtained from sender when the client is added. */ - void *private_data; }; /** @@ -375,7 +377,6 @@ static int setup_next_fec_group(struct fec_client *fc, struct vss_task *vsst) static int compute_next_fec_slice(struct fec_client *fc, struct vss_task *vsst) { - assert(fc->error >= 0); if (fc->first_stream_chunk < 0 || fc->current_slice_num == fc->fcp->slices_per_group + fc->num_extra_slices) { int ret = setup_next_fec_group(fc, vsst); @@ -384,8 +385,8 @@ static int compute_next_fec_slice(struct fec_client *fc, struct vss_task *vsst) if (ret < 0) { PARA_ERROR_LOG("%s\n", para_strerror(-ret)); PARA_ERROR_LOG("FEC client temporarily disabled\n"); - fc->error = ret; - return fc->error; + fc->state = FEC_STATE_DISABLED; + return ret; } } write_fec_header(fc, vsst); @@ -469,11 +470,10 @@ static void compute_slice_timeout(struct timeval *timeout) { struct fec_client *fc; - assert(vss_playing()); list_for_each_entry(fc, &fec_client_list, node) { struct timeval diff; - if (fc->error < 0) + if (fc->state != FEC_STATE_READY_TO_RUN) continue; if (next_slice_is_due(fc, &diff)) { timeout->tv_sec = 0; @@ -496,7 +496,7 @@ static void set_eof_barrier(struct vss_task *vsst) list_for_each_entry(fc, &fec_client_list, node) { struct timeval group_duration; - if (fc->error < 0) + if (fc->state != FEC_STATE_READY_TO_RUN) continue; tv_scale(fc->group.num_chunks, chunk_tv, &group_duration); if (tv_diff(&timeout, &group_duration, NULL) < 0) @@ -700,7 +700,7 @@ static void vss_pre_select(struct sched *s, struct task *t) senders[i].shutdown_clients(); list_for_each_entry_safe(fc, tmp, &fec_client_list, node) { fc->first_stream_chunk = -1; - fc->error = 0; + fc->state = FEC_STATE_NONE; } mmd->stream_start.tv_sec = 0; mmd->stream_start.tv_usec = 0; @@ -854,7 +854,7 @@ static int initialize_fec_client(struct fec_client *fc) fc->num_extra_slices = 0; fc->extra_src_buf = para_calloc(fc->fcp->max_slice_bytes); fc->next_header_time.tv_sec = 0; - fc->ready = true; + fc->state = FEC_STATE_READY_TO_RUN; return 1; err: fec_free(fc->parms); @@ -884,14 +884,18 @@ static void vss_send(struct vss_task *vsst) &due, 1) < 0) return; list_for_each_entry_safe(fc, tmp_fc, &fec_client_list, node) { - if (fc->error < 0) + switch (fc->state) { + case FEC_STATE_DISABLED: continue; - if (!fc->ready) { + case FEC_STATE_NONE: ret = initialize_fec_client(fc); if (ret < 0) { PARA_ERROR_LOG("%s\n", para_strerror(-ret)); continue; } + /* fall through */ + case FEC_STATE_READY_TO_RUN: + break; } if (!next_slice_is_due(fc, NULL)) { fec_active = 1; @@ -950,8 +954,12 @@ static void vss_post_select(struct sched *s, struct task *t) int num = mmd->sender_cmd_data.cmd_num, sender_num = mmd->sender_cmd_data.sender_num; - if (senders[sender_num].client_cmds[num]) - senders[sender_num].client_cmds[num](&mmd->sender_cmd_data); + if (senders[sender_num].client_cmds[num]) { + ret = senders[sender_num].client_cmds[num] + (&mmd->sender_cmd_data); + if (ret < 0) + PARA_ERROR_LOG("%s\n", para_strerror(-ret)); + } mmd->sender_cmd_data.cmd_num = -1; } if (vsst->afsss != AFS_SOCKET_CHECK_FOR_WRITE)