X-Git-Url: http://git.tuebingen.mpg.de/?p=paraslash.git;a=blobdiff_plain;f=vss.c;h=023b2c1d3cc8fcc0dcf23f0b5d2fc5c43487012f;hp=f6ec83934d971d3f6de50c363e9c15bb7c94866c;hb=HEAD;hpb=a3a97518290b19c08667db167bdddba2f94d4b24 diff --git a/vss.c b/vss.c index f6ec8393..cd55851c 100644 --- a/vss.c +++ b/vss.c @@ -28,8 +28,8 @@ #include "net.h" #include "server.h" #include "list.h" -#include "send.h" #include "sched.h" +#include "send.h" #include "vss.h" #include "ipc.h" #include "fd.h" @@ -43,7 +43,7 @@ const struct sender * const senders[] = { enum afs_socket_status { /** Socket is inactive. */ AFS_SOCKET_READY, - /** Socket fd was included in the write fd set for select(). */ + /** Socket fd was monitored for writing. */ AFS_SOCKET_CHECK_FOR_WRITE, /** vss wrote a request to the socket and waits for reply from afs. */ AFS_SOCKET_AFD_PENDING @@ -148,14 +148,10 @@ struct fec_client { struct fec_group group; /** The current slice. */ uint8_t current_slice_num; - /** The data to be FEC-encoded (point to a region within the mapped audio file). */ - const unsigned char **src_data; + /** The data to be FEC-encoded. */ + unsigned char **src_data; /** Last time an audio header was sent. */ struct timeval next_header_time; - /** Used for the last source pointer of an audio file. */ - unsigned char *extra_src_buf; - /** Needed for the last slice of the audio file header. */ - unsigned char *extra_header_buf; /** Extra slices needed to store largest chunk + header. */ int num_extra_slices; /** Contains the FEC-encoded data. */ @@ -240,6 +236,21 @@ static bool need_data_slices(struct fec_client *fc, struct vss_task *vsst) return false; } +static int fc_num_data_slices(const struct fec_client *fc) +{ + return fc->fcp->data_slices_per_group + fc->num_extra_slices; +} + +static int fc_num_slices(const struct fec_client *fc) +{ + return fc->fcp->slices_per_group + fc->num_extra_slices; +} + +static int fc_num_redundant_slices(const struct fec_client *fc) +{ + return fc->fcp->slices_per_group - fc->fcp->data_slices_per_group; +} + static int num_slices(long unsigned bytes, int max_payload, int rs) { int ret; @@ -270,7 +281,7 @@ static void set_group_timing(struct fec_client *fc, struct vss_task *vsst) static int initialize_fec_client(struct fec_client *fc, struct vss_task *vsst) { - int k, n, ret; + int i, k, n, ret; int hs, ds, rs; /* header/data/redundant slices */ struct fec_client_parms *fcp = fc->fcp; @@ -290,7 +301,17 @@ static int initialize_fec_client(struct fec_client *fc, struct vss_task *vsst) if (fc->mps <= FEC_HEADER_SIZE) return -ERRNO_TO_PARA_ERROR(EINVAL); - rs = fc->fcp->slices_per_group - fc->fcp->data_slices_per_group; + /* free previous buffers, if any */ + if (fc->src_data) { + k = fc_num_data_slices(fc); + for (i = 0; i < k; i++) + free(fc->src_data[i]); + free(fc->src_data); + fc->src_data = NULL; + } + free(fc->enc_buf); + + rs = fc_num_redundant_slices(fc); ret = num_slices(vsst->header_len, fc->mps - FEC_HEADER_SIZE, rs); if (ret < 0) return ret; @@ -306,17 +327,18 @@ static int initialize_fec_client(struct fec_client *fc, struct vss_task *vsst) if (k < fc->fcp->data_slices_per_group) k = fc->fcp->data_slices_per_group; fc->num_extra_slices = k - fc->fcp->data_slices_per_group; - n = k + rs; + n = fc_num_slices(fc); + PARA_INFO_LOG("mps: %d, k: %d, n: %d, extra slices: %d\n", + fc->mps, k, n, fc->num_extra_slices); + fec_free(fc->parms); ret = fec_new(k, n, &fc->parms); if (ret < 0) return ret; - PARA_INFO_LOG("mps: %d, k: %d, n: %d, extra slices: %d\n", - fc->mps, k, n, fc->num_extra_slices); - fc->src_data = para_realloc(fc->src_data, k * sizeof(char *)); - fc->enc_buf = para_realloc(fc->enc_buf, fc->mps); - fc->extra_src_buf = para_realloc(fc->extra_src_buf, fc->mps); - fc->extra_header_buf = para_realloc(fc->extra_header_buf, fc->mps); + fc->src_data = arr_alloc(k, sizeof(char *)); + for (i = 0; i < k; i++) + fc->src_data[i] = alloc(fc->mps); + fc->enc_buf = alloc(fc->mps); fc->state = FEC_STATE_READY_TO_RUN; fc->next_header_time.tv_sec = 0; @@ -326,7 +348,7 @@ static int initialize_fec_client(struct fec_client *fc, struct vss_task *vsst) } static int vss_get_chunk(int chunk_num, struct vss_task *vsst, - char **buf, size_t *sz) + char **buf, uint32_t *len) { int ret; @@ -341,15 +363,15 @@ static int vss_get_chunk(int chunk_num, struct vss_task *vsst, if (chunk_num == 0 && vsst->header_len > 0) { assert(vsst->header_buf); *buf = vsst->header_buf; /* stripped header */ - *sz = vsst->header_len; + *len = vsst->header_len; return 0; } ret = afh_get_chunk(chunk_num, &mmd->afd.afhi, mmd->afd.audio_format_id, vsst->map, vsst->mapsize, - (const char **)buf, sz, &vsst->afh_context); + (const char **)buf, len, &vsst->afh_context); if (ret < 0) { *buf = NULL; - *sz = 0; + *len = 0; } return ret; } @@ -358,7 +380,7 @@ static int compute_group_size(struct vss_task *vsst, struct fec_group *g, int max_bytes) { char *buf; - size_t len; + uint32_t len; int ret, i, max_chunks = PARA_MAX(1LU, 150 / tv2ms(vss_chunk_time())); if (g->first_chunk == 0) { @@ -394,7 +416,10 @@ static int compute_group_size(struct vss_task *vsst, struct fec_group *g, g->bytes += len; g->num_chunks++; } - assert(g->num_chunks); + if (g->num_chunks == 0) + return -E_EOF; + PARA_DEBUG_LOG("group #%u: %u chunks, %u bytes total\n", g->num, + g->num_chunks, g->bytes); return 1; } @@ -448,8 +473,8 @@ static int compute_group_size(struct vss_task *vsst, struct fec_group *g, static int compute_slice_size(struct fec_client *fc, struct vss_task *vsst) { struct fec_group *g = &fc->group; - int k = fc->fcp->data_slices_per_group + fc->num_extra_slices; - int n = fc->fcp->slices_per_group + fc->num_extra_slices; + int k = fc_num_data_slices(fc); + int n = fc_num_slices(fc); int ret, k1, k2, h, d, min, max, sum; int max_slice_bytes = fc->mps - FEC_HEADER_SIZE; int max_group_bytes; @@ -512,9 +537,8 @@ static int compute_slice_size(struct fec_client *fc, struct vss_task *vsst) static int setup_next_fec_group(struct fec_client *fc, struct vss_task *vsst) { - int ret, i, k, n, data_slices; - size_t len; - char *buf, *p; + int ret, i, c; + size_t copy, src_copied, slice_copied; struct fec_group *g = &fc->group; if (fc->state == FEC_STATE_NONE) { @@ -538,86 +562,63 @@ static int setup_next_fec_group(struct fec_client *fc, struct vss_task *vsst) g->first_chunk += g->num_chunks; g->num++; } - k = fc->fcp->data_slices_per_group + fc->num_extra_slices; - n = fc->fcp->slices_per_group + fc->num_extra_slices; - compute_slice_size(fc, vsst); assert(g->slice_bytes > 0); - ret = num_slices(g->bytes, g->slice_bytes, n - k); - if (ret < 0) - return ret; - data_slices = ret; - assert(g->num_header_slices + data_slices <= k); fc->current_slice_num = 0; if (g->num == 0) set_group_timing(fc, vsst); /* setup header slices */ - buf = vsst->header_buf; - for (i = 0; i < g->num_header_slices; i++) { - uint32_t payload_size; - if (buf + g->slice_bytes <= vsst->header_buf + vsst->header_len) { - fc->src_data[i] = (const unsigned char *)buf; - buf += g->slice_bytes; - continue; - } - /* - * Can not use vss->header_buf for this slice as it - * goes beyond the buffer. This slice will not be fully - * used. - */ - payload_size = vsst->header_buf + vsst->header_len - buf; - memcpy(fc->extra_header_buf, buf, payload_size); - if (payload_size < g->slice_bytes) - memset(fc->extra_header_buf + payload_size, 0, - g->slice_bytes - payload_size); - /* - * There might be more than one header slice to fill although - * only the first one will be used. Set all header slices to - * our extra buffer. - */ - while (i < g->num_header_slices) - fc->src_data[i++] = fc->extra_header_buf; - break; /* we don't want i to be increased. */ + for (i = 0, src_copied = 0; i < g->num_header_slices; i++) { + copy = PARA_MIN((size_t)g->slice_bytes, vsst->header_len - src_copied); + if (copy == 0) + break; + memcpy(fc->src_data[i], vsst->header_buf + src_copied, copy); + if (copy < g->slice_bytes) + memset(fc->src_data[i] + copy, 0, g->slice_bytes - copy); + src_copied += copy; } - /* - * Setup data slices. Note that for ogg streams chunk 0 points to a - * buffer on the heap rather than to the mapped audio file. + * There might be more than one header slice to fill although only the + * first one will be used. Zero out any remaining header slices. */ - ret = vss_get_chunk(g->first_chunk, vsst, &buf, &len); - if (ret < 0) - return ret; - for (p = buf; i < g->num_header_slices + data_slices; i++) { - if (p + g->slice_bytes > buf + g->bytes) { - /* - * We must make a copy for this slice since using p - * directly would exceed the buffer. - */ - uint32_t payload_size = buf + g->bytes - p; - assert(payload_size + FEC_HEADER_SIZE <= fc->mps); - memcpy(fc->extra_src_buf, p, payload_size); - if (payload_size < g->slice_bytes) - memset(fc->extra_src_buf + payload_size, 0, - g->slice_bytes - payload_size); - fc->src_data[i] = fc->extra_src_buf; - i++; - break; + while (i < g->num_header_slices) + memset(fc->src_data[i++], 0, g->slice_bytes); + + slice_copied = 0; + for (c = g->first_chunk; c < g->first_chunk + g->num_chunks; c++) { + char *buf; + uint32_t src_len; + ret = vss_get_chunk(c, vsst, &buf, &src_len); + if (ret < 0) + return ret; + if (src_len == 0) + continue; + src_copied = 0; + while (src_copied < src_len) { + copy = PARA_MIN((size_t)g->slice_bytes - slice_copied, + src_len - src_copied); + memcpy(fc->src_data[i] + slice_copied, + buf + src_copied, copy); + src_copied += copy; + slice_copied += copy; + if (slice_copied == g->slice_bytes) { + i++; + slice_copied = 0; + } } - fc->src_data[i] = (const unsigned char *)p; - p += g->slice_bytes; - } - if (i < k) { - /* use arbitrary data for all remaining slices */ - buf = vsst->map; - for (; i < k; i++) - fc->src_data[i] = (const unsigned char *)buf; } + if (i < fc_num_data_slices(fc) && slice_copied < g->slice_bytes) + memset(fc->src_data[i] + slice_copied, 0, + g->slice_bytes - slice_copied); + /* zero out remaining slices, if any */ + while (++i < fc_num_data_slices(fc)) + memset(fc->src_data[i], 0, g->slice_bytes); PARA_DEBUG_LOG("FEC group %u: %u chunks (%u - %u), %u bytes\n", g->num, g->num_chunks, g->first_chunk, g->first_chunk + g->num_chunks - 1, g->bytes ); PARA_DEBUG_LOG("slice_bytes: %d, %d header slices, %d data slices\n", - g->slice_bytes, g->num_header_slices, data_slices + g->slice_bytes, g->num_header_slices, fc_num_data_slices(fc) ); return 1; } @@ -637,8 +638,9 @@ static int compute_next_fec_slice(struct fec_client *fc, struct vss_task *vsst) } } write_fec_header(fc, vsst); - fec_encode(fc->parms, fc->src_data, fc->enc_buf + FEC_HEADER_SIZE, - fc->current_slice_num, fc->group.slice_bytes); + fec_encode(fc->parms, (const unsigned char * const*)fc->src_data, + fc->enc_buf + FEC_HEADER_SIZE, fc->current_slice_num, + fc->group.slice_bytes); return 1; } @@ -669,7 +671,7 @@ size_t vss_get_fec_eof_packet(const char **buf) struct fec_client *vss_add_fec_client(struct sender_client *sc, struct fec_client_parms *fcp) { - struct fec_client *fc = para_calloc(sizeof(*fc)); + struct fec_client *fc = zalloc(sizeof(*fc)); fc->sc = sc; fc->fcp = fcp; @@ -684,11 +686,15 @@ struct fec_client *vss_add_fec_client(struct sender_client *sc, */ void vss_del_fec_client(struct fec_client *fc) { + int i; + list_del(&fc->node); - free(fc->src_data); free(fc->enc_buf); - free(fc->extra_src_buf); - free(fc->extra_header_buf); + if (fc->src_data) { + for (i = 0; i < fc_num_data_slices(fc); i++) + free(fc->src_data[i]); + free(fc->src_data); + } fec_free(fc->parms); free(fc); } @@ -821,7 +827,7 @@ static void vss_compute_timeout(struct sched *s, struct vss_task *vsst) if (sched_request_barrier(&vsst->data_send_barrier, s) == 1) return; /* - * Compute the select timeout as the minimal time until the next + * Compute the I/O timeout as the minimal time until the next * chunk/slice is due for any client. */ compute_chunk_time(mmd->chunks_sent, &mmd->afd.afhi.chunk_tv, @@ -839,7 +845,6 @@ static void vss_compute_timeout(struct sched *s, struct vss_task *vsst) static void vss_eof(struct vss_task *vsst) { - if (!vsst->map) return; if (mmd->new_vss_status_flags & VSS_NOMORE) @@ -850,8 +855,8 @@ static void vss_eof(struct vss_task *vsst) para_munmap(vsst->map, vsst->mapsize); vsst->map = NULL; mmd->chunks_sent = 0; - //mmd->offset = 0; mmd->afd.afhi.seconds_total = 0; + mmd->afd.afhi.chunks_total = 0; mmd->afd.afhi.chunk_tv.tv_sec = 0; mmd->afd.afhi.chunk_tv.tv_usec = 0; free(mmd->afd.afhi.chunk_table); @@ -887,21 +892,21 @@ static void set_mmd_offset(void) mmd->offset = tv2ms(&offset); } -static void vss_pre_select(struct sched *s, void *context) +static void vss_pre_monitor(struct sched *s, void *context) { int i; struct vss_task *vsst = context; if (need_to_request_new_audio_file(vsst)) { PARA_DEBUG_LOG("ready and playing, but no audio file\n"); - para_fd_set(vsst->afs_socket, &s->wfds, &s->max_fileno); + sched_monitor_writefd(vsst->afs_socket, s); vsst->afsss = AFS_SOCKET_CHECK_FOR_WRITE; } else - para_fd_set(vsst->afs_socket, &s->rfds, &s->max_fileno); + sched_monitor_readfd(vsst->afs_socket, s); FOR_EACH_SENDER(i) { - if (!senders[i]->pre_select) + if (!senders[i]->pre_monitor) continue; - senders[i]->pre_select(&s->max_fileno, &s->rfds, &s->wfds); + senders[i]->pre_monitor(s); } vss_compute_timeout(s, vsst); } @@ -941,16 +946,17 @@ static int recv_afs_msg(int afs_socket, int *fd, uint32_t *code, uint32_t *data) } #ifndef MAP_POPULATE +/** As of 2018, neither FreeBSD-11.2 nor NetBSD-8.0 have MAP_POPULATE. */ #define MAP_POPULATE 0 #endif -static void recv_afs_result(struct vss_task *vsst, fd_set *rfds) +static void recv_afs_result(struct vss_task *vsst, const struct sched *s) { int ret, passed_fd, shmid; uint32_t afs_code = 0, afs_data = 0; struct stat statbuf; - if (!FD_ISSET(vsst->afs_socket, rfds)) + if (!sched_read_ok(vsst->afs_socket, s)) return; ret = recv_afs_msg(vsst->afs_socket, &passed_fd, &afs_code, &afs_data); if (ret == -ERRNO_TO_PARA_ERROR(EAGAIN)) @@ -958,11 +964,17 @@ static void recv_afs_result(struct vss_task *vsst, fd_set *rfds) if (ret < 0) goto err; vsst->afsss = AFS_SOCKET_READY; - PARA_DEBUG_LOG("fd: %d, code: %u, shmid: %u\n", passed_fd, afs_code, - afs_data); + if (afs_code == NO_ADMISSIBLE_FILES) { + PARA_NOTICE_LOG("no admissible files\n"); + ret = 0; + goto err; + } ret = -E_NOFD; - if (afs_code != NEXT_AUDIO_FILE) + if (afs_code != NEXT_AUDIO_FILE) { + PARA_ERROR_LOG("afs code: %u, expected: %d\n", afs_code, + NEXT_AUDIO_FILE); goto err; + } if (passed_fd < 0) goto err; shmid = afs_data; @@ -993,20 +1005,17 @@ static void recv_afs_result(struct vss_task *vsst, fd_set *rfds) return; err: free(mmd->afd.afhi.chunk_table); + mmd->afd.afhi.chunk_table = NULL; if (passed_fd >= 0) close(passed_fd); - PARA_ERROR_LOG("%s\n", para_strerror(-ret)); + if (ret < 0) + PARA_ERROR_LOG("%s\n", para_strerror(-ret)); mmd->new_vss_status_flags = VSS_NEXT; } /** - * Main sending function. - * - * This function gets called from vss_post_select(). It checks whether the next - * chunk of data should be pushed out. It obtains a pointer to the data to be - * sent out as well as its length from mmd->afd.afhi. This information is then - * passed to each supported sender's send() function as well as to the send() - * functions of each registered fec client. + * If the next chunk needs to be sent, pass a pointer to the chunk data to all + * registered fec clients and to each sender's ->send() method. */ static void vss_send(struct vss_task *vsst) { @@ -1015,7 +1024,7 @@ static void vss_send(struct vss_task *vsst) struct timeval due; struct fec_client *fc, *tmp_fc; char *buf; - size_t len; + uint32_t len; if (!vsst->map || !vss_playing()) return; @@ -1073,7 +1082,7 @@ static void vss_send(struct vss_task *vsst) mmd->current_chunk++; } -static int vss_post_select(struct sched *s, void *context) +static int vss_post_monitor(struct sched *s, void *context) { int ret, i; struct vss_task *vsst = context; @@ -1123,9 +1132,9 @@ static int vss_post_select(struct sched *s, void *context) mmd->sender_cmd_data.cmd_num = -1; } if (vsst->afsss != AFS_SOCKET_CHECK_FOR_WRITE) - recv_afs_result(vsst, &s->rfds); - else if (FD_ISSET(vsst->afs_socket, &s->wfds)) { - PARA_NOTICE_LOG("requesting new fd from afs\n"); + recv_afs_result(vsst, s); + else if (sched_write_ok(vsst->afs_socket, s)) { + PARA_INFO_LOG("requesting new fd from afs\n"); ret = write_buffer(vsst->afs_socket, "new"); if (ret < 0) PARA_CRIT_LOG("%s\n", para_strerror(-ret)); @@ -1133,9 +1142,9 @@ static int vss_post_select(struct sched *s, void *context) vsst->afsss = AFS_SOCKET_AFD_PENDING; } FOR_EACH_SENDER(i) { - if (!senders[i]->post_select) + if (!senders[i]->post_monitor) continue; - senders[i]->post_select(&s->rfds, &s->wfds); + senders[i]->post_monitor(s); } if ((vss_playing() && !(mmd->vss_status_flags & VSS_PLAYING)) || (vss_next() && vss_playing())) @@ -1163,7 +1172,7 @@ void vss_init(int afs_socket, struct sched *s) vsst->afs_socket = afs_socket; ms2tv(announce_time, &vsst->announce_tv); PARA_INFO_LOG("announce timeval: %lums\n", tv2ms(&vsst->announce_tv)); - INIT_LIST_HEAD(&fec_client_list); + init_list_head(&fec_client_list); FOR_EACH_SENDER(i) { PARA_NOTICE_LOG("initializing %s sender\n", senders[i]->name); senders[i]->init(); @@ -1180,8 +1189,8 @@ void vss_init(int afs_socket, struct sched *s) } vsst->task = task_register(&(struct task_info) { .name = "vss", - .pre_select = vss_pre_select, - .post_select = vss_post_select, + .pre_monitor = vss_pre_monitor, + .post_monitor = vss_post_monitor, .context = vsst, }, s); } @@ -1189,16 +1198,19 @@ void vss_init(int afs_socket, struct sched *s) /** * Turn off the virtual streaming system. * - * This is only executed on exit. It calls the ->shutdowwn method of all senders. + * This is only executed on exit. It calls the ->shutdown method of all senders. */ void vss_shutdown(void) { int i; + bool is_command_handler = process_is_command_handler(); FOR_EACH_SENDER(i) { if (!senders[i]->shutdown) continue; - PARA_NOTICE_LOG("shutting down %s sender\n", senders[i]->name); + if (!is_command_handler) + PARA_NOTICE_LOG("shutting down %s sender\n", + senders[i]->name); senders[i]->shutdown(); } }