X-Git-Url: http://git.tuebingen.mpg.de/?p=paraslash.git;a=blobdiff_plain;f=vss.c;h=4be6e18a6833c313b40c0453a10fe021b75b3ee5;hp=40a18e5cdf475ac312e8581508d10de20881b82d;hb=f099900540c4e4c54b10181254b895ccfe6ef410;hpb=395aeb9da9c9cc2febe91b5a906c72e59e217594 diff --git a/vss.c b/vss.c index 40a18e5c..4be6e18a 100644 --- a/vss.c +++ b/vss.c @@ -1,5 +1,5 @@ /* - * Copyright (C) 1997-2009 Andre Noll + * Copyright (C) 1997-2010 Andre Noll * * Licensed under the GPL v2. For licencing details see COPYING. */ @@ -26,8 +26,8 @@ #include "net.h" #include "server.cmdline.h" #include "list.h" -#include "vss.h" #include "send.h" +#include "vss.h" #include "ipc.h" #include "fd.h" #include "sched.h" @@ -123,6 +123,8 @@ struct fec_group { uint32_t num_chunks; /** When the first chunk was sent. */ struct timeval start; + /** The duration of the full group. */ + struct timeval duration; /** The group duration divided by the number of slices. */ struct timeval slice_duration; /** Group contains the audio file header that occupies that many slices. */ @@ -135,6 +137,10 @@ struct fec_group { struct fec_client { /** If negative, this client is temporarily disabled. */ int error; + /** Whether the sender client is ready to push out data. */ + bool ready; + /** The connected sender client (transport layer). */ + struct sender_client *sc; /** Parameters requested by the client. */ struct fec_client_parms *fcp; /** Used by the core FEC code. */ @@ -159,6 +165,8 @@ struct fec_client { int num_extra_slices; /** Contains the FEC-encoded data. */ unsigned char *enc_buf; + /** Pointer obtained from sender when the client is added. */ + void *private_data; }; /** @@ -234,15 +242,16 @@ static int num_slices(long unsigned bytes, struct fec_client *fc, uint8_t *resul return 1; } -static void set_slice_duration(struct fec_client *fc, struct fec_group *g) +/* set group start and group duration */ +static void set_group_timing(struct fec_client *fc, struct fec_group *g) { - struct timeval group_duration, *chunk_tv = vss_chunk_time(); + struct timeval *chunk_tv = vss_chunk_time(); - tv_scale(g->num_chunks, chunk_tv, &group_duration); + tv_scale(g->num_chunks, chunk_tv, &g->duration); tv_divide(fc->fcp->slices_per_group + fc->num_extra_slices, - &group_duration, &g->slice_duration); + &g->duration, &g->slice_duration); PARA_DEBUG_LOG("durations (group/chunk/slice): %lu/%lu/%lu\n", - tv2ms(&group_duration), tv2ms(chunk_tv), tv2ms(&g->slice_duration)); + tv2ms(&g->duration), tv2ms(chunk_tv), tv2ms(&g->slice_duration)); } static int setup_next_fec_group(struct fec_client *fc, struct vss_task *vsst) @@ -250,51 +259,57 @@ static int setup_next_fec_group(struct fec_client *fc, struct vss_task *vsst) int ret, i, k, data_slices; size_t len; const char *buf, *start_buf; - struct timeval tmp, *chunk_tv = vss_chunk_time(); struct fec_group *g = &fc->group; unsigned slice_bytes = fc->fcp->max_slice_bytes - FEC_HEADER_SIZE; uint32_t max_data_size; - assert(chunk_tv); - k = fc->fcp->data_slices_per_group + fc->num_extra_slices; if (fc->first_stream_chunk < 0) { - uint32_t largest = afh_get_largest_chunk_size(&mmd->afd.afhi) - + vsst->header_len; - uint8_t needed, want; + uint8_t hs, ds; /* needed header/data slices */ + uint8_t rs = fc->fcp->slices_per_group + - fc->fcp->data_slices_per_group; /* redundant slices */ + int n; - ret = num_slices(largest, fc, &needed); + ret = num_slices(vsst->header_len, fc, &hs); + if (ret < 0) + return ret; + ret = num_slices(afh_get_largest_chunk_size(&mmd->afd.afhi), + fc, &ds); + if (ret < 0) + return ret; + k = (int)hs + ds; + if (k > 255) + return -E_BAD_CT; + if (k < fc->fcp->data_slices_per_group) + k = fc->fcp->data_slices_per_group; + n = k + rs; + fc->num_extra_slices = k - fc->fcp->data_slices_per_group; + PARA_NOTICE_LOG("fec parms %d:%d:%d (%d extra slices)\n", + slice_bytes, k, n, fc->num_extra_slices); + fec_free(fc->parms); + fc->src_data = para_realloc(fc->src_data, k * sizeof(char *)); + ret = fec_new(k, n, &fc->parms); if (ret < 0) return ret; - if (needed > fc->fcp->data_slices_per_group) - PARA_WARNING_LOG("fec parms insufficient for this audio file\n"); - want = PARA_MAX(needed, fc->fcp->data_slices_per_group); - if (want != k) { - fec_free(fc->parms); - fc->src_data = para_realloc(fc->src_data, want * sizeof(char *)); - ret = fec_new(want, want + fc->fcp->slices_per_group - - fc->fcp->data_slices_per_group, &fc->parms); - if (ret < 0) - return ret; - k = want; - fc->num_extra_slices = 0; - if (k > fc->fcp->data_slices_per_group) { - fc->num_extra_slices = k - fc->fcp->data_slices_per_group; - PARA_NOTICE_LOG("using %d extra slices\n", - fc->num_extra_slices); - } - } fc->stream_start = *now; fc->first_stream_chunk = mmd->current_chunk; g->first_chunk = mmd->current_chunk; g->num = 0; + g->start = *now; } else { - /* use duration of the previous group for the timing of this group */ - set_slice_duration(fc, g); + struct timeval tmp; + if (g->first_chunk + g->num_chunks >= mmd->afd.afhi.chunks_total) + return 0; + /* + * Start and duration of this group depend only on the previous + * group. Compute the new group start as g->start += g->duration. + */ + tmp = g->start; + tv_add(&tmp, &g->duration, &g->start); + k = fc->fcp->data_slices_per_group + fc->num_extra_slices; + set_group_timing(fc, g); g->first_chunk += g->num_chunks; g->num++; } - if (g->first_chunk >= mmd->afd.afhi.chunks_total) - return 0; if (need_audio_header(fc, vsst)) { ret = num_slices(vsst->header_len, fc, &g->num_header_slices); if (ret < 0) @@ -317,7 +332,7 @@ static int setup_next_fec_group(struct fec_client *fc, struct vss_task *vsst) assert(g->num_chunks); fc->current_slice_num = 0; if (g->num == 0) - set_slice_duration(fc, g); + set_group_timing(fc, g); /* setup header slices */ buf = vsst->header_buf; @@ -355,14 +370,6 @@ static int setup_next_fec_group(struct fec_client *fc, struct vss_task *vsst) g->first_chunk + g->num_chunks - 1, g->num_header_slices, data_slices ); - /* set group start */ - if (g->num != 0 && vsst->header_len != 0 && fc->first_stream_chunk == 0) - /* chunk #0 is the audio file header */ - tv_scale(g->first_chunk - 1, chunk_tv, &tmp); - else - tv_scale(g->first_chunk - fc->first_stream_chunk, - chunk_tv, &tmp); - tv_add(&fc->stream_start, &tmp, &g->start); return 1; } @@ -407,38 +414,20 @@ size_t vss_get_fec_eof_packet(const char **buf) /** * Add one entry to the list of active fec clients. * - * \param fcp Describes the fec parameters to be used for this client. - * \param result An opaque pointer that must be used by remove the client later. + * \param sc Generic sender_client data of the transport layer. + * \param fcp FEC parameters as supplied by the transport layer. * - * \return Standard. + * \return Newly allocated fec_client struct. */ -int vss_add_fec_client(struct fec_client_parms *fcp, struct fec_client **result) +struct fec_client *vss_add_fec_client(struct sender_client *sc, + struct fec_client_parms *fcp) { - int ret; - struct fec_client *fc; + struct fec_client *fc = para_calloc(sizeof(*fc)); - if (fcp->max_slice_bytes < FEC_HEADER_SIZE + fcp->data_slices_per_group) - return -ERRNO_TO_PARA_ERROR(EINVAL); - fc = para_calloc(sizeof(*fc)); + fc->sc = sc; fc->fcp = fcp; - ret = fec_new(fcp->data_slices_per_group, fcp->slices_per_group, - &fc->parms); - if (ret < 0) - goto err; - fc->first_stream_chunk = -1; /* stream not yet started */ - fc->src_data = para_malloc(fc->fcp->slices_per_group * sizeof(char *)); - fc->enc_buf = para_calloc(fc->fcp->max_slice_bytes); - fc->num_extra_slices = 0; - fc->extra_src_buf = para_calloc(fc->fcp->max_slice_bytes); - fc->next_header_time.tv_sec = 0; para_list_add(&fc->node, &fec_client_list); - *result = fc; - return 1; -err: - fec_free(fc->parms); - free(fc); - *result = NULL; - return ret; + return fc; } /** @@ -636,7 +625,7 @@ static void vss_eof(struct vss_task *vsst) para_munmap(vsst->map, mmd->size); vsst->map = NULL; mmd->chunks_sent = 0; - mmd->offset = 0; + //mmd->offset = 0; mmd->afd.afhi.seconds_total = 0; mmd->afd.afhi.chunk_tv.tv_sec = 0; mmd->afd.afhi.chunk_tv.tv_usec = 0; @@ -677,6 +666,13 @@ static int need_to_request_new_audio_file(struct vss_task *vsst) return 1; } +static void set_mmd_offset(void) +{ + struct timeval offset; + tv_scale(mmd->current_chunk, &mmd->afd.afhi.chunk_tv, &offset); + mmd->offset = tv2ms(&offset); +} + /** * Compute the timeout for the main select-loop of the scheduler. * @@ -721,6 +717,7 @@ static void vss_pre_select(struct sched *s, struct task *t) mmd->chunks_sent = 0; mmd->current_chunk = mmd->repos_request; mmd->new_vss_status_flags &= ~VSS_REPOS; + set_mmd_offset(); } if (need_to_request_new_audio_file(vsst)) { PARA_DEBUG_LOG("ready and playing, but no audio file\n"); @@ -772,16 +769,20 @@ static int recv_afs_msg(int afs_socket, int *fd, uint32_t *code, uint32_t *data) return 1; } -static void recv_afs_result(struct vss_task *vsst) +static void recv_afs_result(struct vss_task *vsst, fd_set *rfds) { int ret, passed_fd, shmid; uint32_t afs_code = 0, afs_data = 0; struct stat statbuf; - vsst->afsss = AFS_SOCKET_READY; + if (!FD_ISSET(vsst->afs_socket, rfds)) + return; ret = recv_afs_msg(vsst->afs_socket, &passed_fd, &afs_code, &afs_data); + if (ret == -ERRNO_TO_PARA_ERROR(EAGAIN)) + return; if (ret < 0) goto err; + vsst->afsss = AFS_SOCKET_READY; PARA_DEBUG_LOG("fd: %d, code: %u, shmid: %u\n", passed_fd, afs_code, afs_data); ret = -E_NOFD; @@ -824,6 +825,42 @@ err: mmd->new_vss_status_flags = VSS_NEXT; } +static int initialize_fec_client(struct fec_client *fc) +{ + int ret; + struct fec_client_parms *fcp = fc->fcp; + + if (fcp->init_fec) { + /* + * Set the maximum slice size to the Maximum Packet Size if the + * transport protocol allows to determine this value. The user + * can specify a slice size up to this value. + */ + ret = fcp->init_fec(fc->sc); + if (ret < 0) + return ret; + if (!fcp->max_slice_bytes || fcp->max_slice_bytes > ret) + fcp->max_slice_bytes = ret; + } + if (fcp->max_slice_bytes < FEC_HEADER_SIZE + fcp->data_slices_per_group) + return -ERRNO_TO_PARA_ERROR(EINVAL); + ret = fec_new(fcp->data_slices_per_group, fcp->slices_per_group, + &fc->parms); + if (ret < 0) + goto err; + fc->first_stream_chunk = -1; /* stream not yet started */ + fc->src_data = para_malloc(fc->fcp->slices_per_group * sizeof(char *)); + fc->enc_buf = para_calloc(fc->fcp->max_slice_bytes); + fc->num_extra_slices = 0; + fc->extra_src_buf = para_calloc(fc->fcp->max_slice_bytes); + fc->next_header_time.tv_sec = 0; + fc->ready = true; + return 1; +err: + fec_free(fc->parms); + return ret; +} + /** * Main sending function. * @@ -835,7 +872,7 @@ err: */ static void vss_send(struct vss_task *vsst) { - int i, fec_active = 0; + int ret, i, fec_active = 0; struct timeval due; struct fec_client *fc, *tmp_fc; @@ -849,6 +886,13 @@ static void vss_send(struct vss_task *vsst) list_for_each_entry_safe(fc, tmp_fc, &fec_client_list, node) { if (fc->error < 0) continue; + if (!fc->ready) { + ret = initialize_fec_client(fc); + if (ret < 0) { + PARA_ERROR_LOG("%s\n", para_strerror(-ret)); + continue; + } + } if (!next_slice_is_due(fc, NULL)) { fec_active = 1; continue; @@ -857,9 +901,8 @@ static void vss_send(struct vss_task *vsst) continue; PARA_DEBUG_LOG("sending %d:%d (%u bytes)\n", fc->group.num, fc->current_slice_num, fc->fcp->max_slice_bytes); - fc->fcp->send((char *)fc->enc_buf, - fc->fcp->max_slice_bytes, - fc->fcp->private_data); + fc->fcp->send_fec(fc->sc, (char *)fc->enc_buf, + fc->fcp->max_slice_bytes); fc->current_slice_num++; fec_active = 1; } @@ -875,11 +918,9 @@ static void vss_send(struct vss_task *vsst) size_t len; if (!mmd->chunks_sent) { - struct timeval tmp; mmd->stream_start = *now; - tv_scale(mmd->current_chunk, &mmd->afd.afhi.chunk_tv, &tmp); - mmd->offset = tv2ms(&tmp); mmd->events++; + set_mmd_offset(); } /* * We call the send function also in case of empty chunks as @@ -913,10 +954,9 @@ static void vss_post_select(struct sched *s, struct task *t) senders[sender_num].client_cmds[num](&mmd->sender_cmd_data); mmd->sender_cmd_data.cmd_num = -1; } - if (vsst->afsss != AFS_SOCKET_CHECK_FOR_WRITE) { - if (FD_ISSET(vsst->afs_socket, &s->rfds)) - recv_afs_result(vsst); - } else if (FD_ISSET(vsst->afs_socket, &s->wfds)) { + if (vsst->afsss != AFS_SOCKET_CHECK_FOR_WRITE) + recv_afs_result(vsst, &s->rfds); + else if (FD_ISSET(vsst->afs_socket, &s->wfds)) { PARA_NOTICE_LOG("requesting new fd from afs\n"); ret = send_buffer(vsst->afs_socket, "new"); if (ret < 0)