X-Git-Url: http://git.tuebingen.mpg.de/?p=paraslash.git;a=blobdiff_plain;f=vss.c;h=2e952eb4c6a1b0202a69ae79e0bab9a153fd0b04;hp=4be6e18a6833c313b40c0453a10fe021b75b3ee5;hb=edeb499676e6d042ef1a913914a9fcb45a8cadde;hpb=f099900540c4e4c54b10181254b895ccfe6ef410 diff --git a/vss.c b/vss.c index 4be6e18a..2e952eb4 100644 --- a/vss.c +++ b/vss.c @@ -131,14 +131,18 @@ struct fec_group { uint8_t num_header_slices; }; +enum fec_client_state { + FEC_STATE_NONE = 0, /**< not initialized and not enabled */ + FEC_STATE_DISABLED, /**< temporarily disabled */ + FEC_STATE_READY_TO_RUN /**< initialized and enabled */ +}; + /** * Describes one connected FEC client. */ struct fec_client { - /** If negative, this client is temporarily disabled. */ - int error; - /** Whether the sender client is ready to push out data. */ - bool ready; + /** Current state of the client */ + enum fec_client_state state; /** The connected sender client (transport layer). */ struct sender_client *sc; /** Parameters requested by the client. */ @@ -165,8 +169,8 @@ struct fec_client { int num_extra_slices; /** Contains the FEC-encoded data. */ unsigned char *enc_buf; - /** Pointer obtained from sender when the client is added. */ - void *private_data; + /** Maximal packet size. */ + int mps; }; /** @@ -205,41 +209,39 @@ static void write_fec_header(struct fec_client *fc, struct vss_task *vsst) write_u32(buf + 14, g->bytes); write_u8(buf + 18, fc->current_slice_num); - write_u16(buf + 20, p->max_slice_bytes - FEC_HEADER_SIZE); + write_u16(buf + 20, fc->mps - FEC_HEADER_SIZE); write_u8(buf + 22, g->first_chunk? 0 : 1); write_u8(buf + 23, vsst->header_len? 1 : 0); memset(buf + 24, 0, 7); } -static int need_audio_header(struct fec_client *fc, struct vss_task *vsst) +static bool need_audio_header(struct fec_client *fc, struct vss_task *vsst) { if (!mmd->current_chunk) { tv_add(now, &vsst->header_interval, &fc->next_header_time); - return 0; + return false; } if (!vsst->header_buf) - return 0; - if (!vsst->header_len) - return 0; + return false; + if (vsst->header_len == 0) + return false; if (fc->group.num && tv_diff(&fc->next_header_time, now, NULL) > 0) - return 0; + return false; tv_add(now, &vsst->header_interval, &fc->next_header_time); - return 1; + return true; } -static int num_slices(long unsigned bytes, struct fec_client *fc, uint8_t *result) +static int num_slices(long unsigned bytes, int mps, int rs) { - unsigned long m = fc->fcp->max_slice_bytes - FEC_HEADER_SIZE; - unsigned rv, redundant_slices = fc->fcp->slices_per_group - - fc->fcp->data_slices_per_group; + int m = mps - FEC_HEADER_SIZE; + int ret; - if (!m) - return -E_BAD_CT; - rv = (bytes + m - 1) / m; - if (rv + redundant_slices > 255) + assert(m > 0); + assert(rs > 0); + ret = DIV_ROUND_UP(bytes, m); + if (ret + rs > 255) return -E_BAD_CT; - *result = rv; - return 1; + return ret; } /* set group start and group duration */ @@ -254,47 +256,83 @@ static void set_group_timing(struct fec_client *fc, struct fec_group *g) tv2ms(&g->duration), tv2ms(chunk_tv), tv2ms(&g->slice_duration)); } +static int initialize_fec_client(struct fec_client *fc, struct vss_task *vsst) +{ + int k, n, ret, mps; + int hs, ds, rs; /* header/data/redundant slices */ + struct fec_client_parms *fcp = fc->fcp; + + /* set mps */ + if (fcp->init_fec) { + /* + * Set the maximum slice size to the Maximum Packet Size if the + * transport protocol allows to determine this value. The user + * can specify a slice size up to this value. + */ + ret = fcp->init_fec(fc->sc); + if (ret < 0) + return ret; + mps = ret; + } else + mps = generic_max_transport_msg_size(fc->sc->fd); + if (mps <= FEC_HEADER_SIZE) + return -ERRNO_TO_PARA_ERROR(EINVAL); + + rs = fc->fcp->slices_per_group - fc->fcp->data_slices_per_group; + ret = num_slices(vsst->header_len, mps, rs); + if (ret < 0) + goto err; + hs = ret; + ret = num_slices(mmd->afd.max_chunk_size, mps, rs); + if (ret < 0) + goto err; + ds = ret; + k = ret + ds; + if (k < fc->fcp->data_slices_per_group) + k = fc->fcp->data_slices_per_group; + n = k + rs; + PARA_CRIT_LOG("hs: %d, ds: %d, rs: %d, k: %d, n: %d\n", hs, ds, rs, k, n); + fec_free(fc->parms); + ret = fec_new(k, n, &fc->parms); + if (ret < 0) + return ret; + fc->num_extra_slices = k - fc->fcp->data_slices_per_group; + PARA_NOTICE_LOG("fec parms %d:%d:%d (%d extra slices)\n", + mps, k, n, fc->num_extra_slices); + fc->src_data = para_realloc(fc->src_data, k * sizeof(char *)); + fc->enc_buf = para_realloc(fc->enc_buf, mps); + memset(fc->enc_buf, 0, mps); + fc->extra_src_buf = para_realloc(fc->extra_src_buf, mps); + memset(fc->extra_src_buf, 0, mps); + + fc->mps = mps; + fc->state = FEC_STATE_READY_TO_RUN; + fc->next_header_time.tv_sec = 0; + fc->stream_start = *now; + fc->first_stream_chunk = mmd->current_chunk; + return 1; +err: + fec_free(fc->parms); + return ret; +} + static int setup_next_fec_group(struct fec_client *fc, struct vss_task *vsst) { - int ret, i, k, data_slices; + int ret, i, k, n, data_slices; size_t len; const char *buf, *start_buf; struct fec_group *g = &fc->group; - unsigned slice_bytes = fc->fcp->max_slice_bytes - FEC_HEADER_SIZE; + unsigned slice_bytes; uint32_t max_data_size; - if (fc->first_stream_chunk < 0) { - uint8_t hs, ds; /* needed header/data slices */ - uint8_t rs = fc->fcp->slices_per_group - - fc->fcp->data_slices_per_group; /* redundant slices */ - int n; - - ret = num_slices(vsst->header_len, fc, &hs); + if (fc->state == FEC_STATE_NONE) { + ret = initialize_fec_client(fc, vsst); if (ret < 0) return ret; - ret = num_slices(afh_get_largest_chunk_size(&mmd->afd.afhi), - fc, &ds); - if (ret < 0) - return ret; - k = (int)hs + ds; - if (k > 255) - return -E_BAD_CT; - if (k < fc->fcp->data_slices_per_group) - k = fc->fcp->data_slices_per_group; - n = k + rs; - fc->num_extra_slices = k - fc->fcp->data_slices_per_group; - PARA_NOTICE_LOG("fec parms %d:%d:%d (%d extra slices)\n", - slice_bytes, k, n, fc->num_extra_slices); - fec_free(fc->parms); - fc->src_data = para_realloc(fc->src_data, k * sizeof(char *)); - ret = fec_new(k, n, &fc->parms); - if (ret < 0) - return ret; - fc->stream_start = *now; - fc->first_stream_chunk = mmd->current_chunk; g->first_chunk = mmd->current_chunk; g->num = 0; g->start = *now; + } else { struct timeval tmp; if (g->first_chunk + g->num_chunks >= mmd->afd.afhi.chunks_total) @@ -305,15 +343,20 @@ static int setup_next_fec_group(struct fec_client *fc, struct vss_task *vsst) */ tmp = g->start; tv_add(&tmp, &g->duration, &g->start); - k = fc->fcp->data_slices_per_group + fc->num_extra_slices; set_group_timing(fc, g); g->first_chunk += g->num_chunks; g->num++; } + slice_bytes = fc->mps - FEC_HEADER_SIZE; + PARA_CRIT_LOG("slice_bytes: %d\n", slice_bytes); + k = fc->fcp->data_slices_per_group + fc->num_extra_slices; + n = fc->fcp->slices_per_group + fc->num_extra_slices; + PARA_CRIT_LOG("k: %d, n: %d\n", k, n); if (need_audio_header(fc, vsst)) { - ret = num_slices(vsst->header_len, fc, &g->num_header_slices); + ret = num_slices(vsst->header_len, slice_bytes, n - k); if (ret < 0) return ret; + g->num_header_slices = ret; } else g->num_header_slices = 0; afh_get_chunk(g->first_chunk, &mmd->afd.afhi, vsst->map, &start_buf, @@ -375,8 +418,7 @@ static int setup_next_fec_group(struct fec_client *fc, struct vss_task *vsst) static int compute_next_fec_slice(struct fec_client *fc, struct vss_task *vsst) { - assert(fc->error >= 0); - if (fc->first_stream_chunk < 0 || fc->current_slice_num + if (fc->state == FEC_STATE_NONE || fc->current_slice_num == fc->fcp->slices_per_group + fc->num_extra_slices) { int ret = setup_next_fec_group(fc, vsst); if (ret == 0) @@ -384,14 +426,13 @@ static int compute_next_fec_slice(struct fec_client *fc, struct vss_task *vsst) if (ret < 0) { PARA_ERROR_LOG("%s\n", para_strerror(-ret)); PARA_ERROR_LOG("FEC client temporarily disabled\n"); - fc->error = ret; - return fc->error; + fc->state = FEC_STATE_DISABLED; + return ret; } } write_fec_header(fc, vsst); fec_encode(fc->parms, fc->src_data, fc->enc_buf + FEC_HEADER_SIZE, - fc->current_slice_num, - fc->fcp->max_slice_bytes - FEC_HEADER_SIZE); + fc->current_slice_num, fc->mps - FEC_HEADER_SIZE); return 1; } @@ -457,7 +498,7 @@ static int next_slice_is_due(struct fec_client *fc, struct timeval *diff) struct timeval tmp, next; int ret; - if (fc->first_stream_chunk < 0) + if (fc->state == FEC_STATE_NONE) return 1; tv_scale(fc->current_slice_num, &fc->group.slice_duration, &tmp); tv_add(&tmp, &fc->group.start, &next); @@ -469,11 +510,10 @@ static void compute_slice_timeout(struct timeval *timeout) { struct fec_client *fc; - assert(vss_playing()); list_for_each_entry(fc, &fec_client_list, node) { struct timeval diff; - if (fc->error < 0) + if (fc->state != FEC_STATE_READY_TO_RUN) continue; if (next_slice_is_due(fc, &diff)) { timeout->tv_sec = 0; @@ -496,7 +536,7 @@ static void set_eof_barrier(struct vss_task *vsst) list_for_each_entry(fc, &fec_client_list, node) { struct timeval group_duration; - if (fc->error < 0) + if (fc->state != FEC_STATE_READY_TO_RUN) continue; tv_scale(fc->group.num_chunks, chunk_tv, &group_duration); if (tv_diff(&timeout, &group_duration, NULL) < 0) @@ -636,18 +676,6 @@ static void vss_eof(struct vss_task *vsst) mmd->events++; } -/** - * Get the list of all supported audio formats. - * - * \return Aa space separated list of all supported audio formats - * It is not allocated at runtime, i.e. there is no need to free - * the returned string in the caller. - */ -const char *supported_audio_formats(void) -{ - return SUPPORTED_AUDIO_FORMATS; -} - static int need_to_request_new_audio_file(struct vss_task *vsst) { struct timeval diff; @@ -690,7 +718,7 @@ static void set_mmd_offset(void) static void vss_pre_select(struct sched *s, struct task *t) { int i; - struct timeval *tv, diff; + struct timeval *tv; struct vss_task *vsst = container_of(t, struct vss_task, task); if (!vsst->map || vss_next() || vss_paused() || vss_repos()) { @@ -698,10 +726,8 @@ static void vss_pre_select(struct sched *s, struct task *t) for (i = 0; senders[i].name; i++) if (senders[i].shutdown_clients) senders[i].shutdown_clients(); - list_for_each_entry_safe(fc, tmp, &fec_client_list, node) { - fc->first_stream_chunk = -1; - fc->error = 0; - } + list_for_each_entry_safe(fc, tmp, &fec_client_list, node) + fc->state = FEC_STATE_NONE; mmd->stream_start.tv_sec = 0; mmd->stream_start.tv_usec = 0; } @@ -731,8 +757,8 @@ static void vss_pre_select(struct sched *s, struct task *t) senders[i].pre_select(&s->max_fileno, &s->rfds, &s->wfds); } tv = vss_compute_timeout(vsst); - if (tv && tv_diff(tv, &s->timeout, &diff) < 0) - s->timeout = *tv; + if (tv) + sched_request_timeout(tv, s); } static int recv_afs_msg(int afs_socket, int *fd, uint32_t *code, uint32_t *data) @@ -825,42 +851,6 @@ err: mmd->new_vss_status_flags = VSS_NEXT; } -static int initialize_fec_client(struct fec_client *fc) -{ - int ret; - struct fec_client_parms *fcp = fc->fcp; - - if (fcp->init_fec) { - /* - * Set the maximum slice size to the Maximum Packet Size if the - * transport protocol allows to determine this value. The user - * can specify a slice size up to this value. - */ - ret = fcp->init_fec(fc->sc); - if (ret < 0) - return ret; - if (!fcp->max_slice_bytes || fcp->max_slice_bytes > ret) - fcp->max_slice_bytes = ret; - } - if (fcp->max_slice_bytes < FEC_HEADER_SIZE + fcp->data_slices_per_group) - return -ERRNO_TO_PARA_ERROR(EINVAL); - ret = fec_new(fcp->data_slices_per_group, fcp->slices_per_group, - &fc->parms); - if (ret < 0) - goto err; - fc->first_stream_chunk = -1; /* stream not yet started */ - fc->src_data = para_malloc(fc->fcp->slices_per_group * sizeof(char *)); - fc->enc_buf = para_calloc(fc->fcp->max_slice_bytes); - fc->num_extra_slices = 0; - fc->extra_src_buf = para_calloc(fc->fcp->max_slice_bytes); - fc->next_header_time.tv_sec = 0; - fc->ready = true; - return 1; -err: - fec_free(fc->parms); - return ret; -} - /** * Main sending function. * @@ -872,7 +862,7 @@ err: */ static void vss_send(struct vss_task *vsst) { - int ret, i, fec_active = 0; + int i, fec_active = 0; struct timeval due; struct fec_client *fc, *tmp_fc; @@ -884,15 +874,8 @@ static void vss_send(struct vss_task *vsst) &due, 1) < 0) return; list_for_each_entry_safe(fc, tmp_fc, &fec_client_list, node) { - if (fc->error < 0) + if (fc->state == FEC_STATE_DISABLED) continue; - if (!fc->ready) { - ret = initialize_fec_client(fc); - if (ret < 0) { - PARA_ERROR_LOG("%s\n", para_strerror(-ret)); - continue; - } - } if (!next_slice_is_due(fc, NULL)) { fec_active = 1; continue; @@ -900,9 +883,8 @@ static void vss_send(struct vss_task *vsst) if (compute_next_fec_slice(fc, vsst) <= 0) continue; PARA_DEBUG_LOG("sending %d:%d (%u bytes)\n", fc->group.num, - fc->current_slice_num, fc->fcp->max_slice_bytes); - fc->fcp->send_fec(fc->sc, (char *)fc->enc_buf, - fc->fcp->max_slice_bytes); + fc->current_slice_num, fc->mps); + fc->fcp->send_fec(fc->sc, (char *)fc->enc_buf, fc->mps); fc->current_slice_num++; fec_active = 1; } @@ -950,8 +932,12 @@ static void vss_post_select(struct sched *s, struct task *t) int num = mmd->sender_cmd_data.cmd_num, sender_num = mmd->sender_cmd_data.sender_num; - if (senders[sender_num].client_cmds[num]) - senders[sender_num].client_cmds[num](&mmd->sender_cmd_data); + if (senders[sender_num].client_cmds[num]) { + ret = senders[sender_num].client_cmds[num] + (&mmd->sender_cmd_data); + if (ret < 0) + PARA_ERROR_LOG("%s\n", para_strerror(-ret)); + } mmd->sender_cmd_data.cmd_num = -1; } if (vsst->afsss != AFS_SOCKET_CHECK_FOR_WRITE)