X-Git-Url: http://git.tuebingen.mpg.de/?p=paraslash.git;a=blobdiff_plain;f=vss.c;h=ad5f7ec0db6886894a90f32b4c5adf444871ddf0;hp=f1bb57983609ad60c20cb03ba73b3a1e518e9cd3;hb=442a3320ff155d09b990c0ee2abace399cbcd6dd;hpb=178fcf6a776ac60eaa55174c883f1c6735204670 diff --git a/vss.c b/vss.c index f1bb5798..ad5f7ec0 100644 --- a/vss.c +++ b/vss.c @@ -25,8 +25,8 @@ #include "string.h" #include "afh.h" #include "afs.h" -#include "server.h" #include "net.h" +#include "server.h" #include "list.h" #include "send.h" #include "sched.h" @@ -35,29 +35,9 @@ #include "fd.h" extern struct misc_meta_data *mmd; - -extern void dccp_send_init(struct sender *); -extern void http_send_init(struct sender *); -extern void udp_send_init(struct sender *); - -/** The list of supported senders. */ -struct sender senders[] = { - { - .name = "http", - .init = http_send_init, - }, - { - .name = "dccp", - .init = dccp_send_init, - }, - { - .name = "udp", - .init = udp_send_init, - }, - { - .name = NULL, - } -}; +extern const struct sender udp_sender, dccp_sender, http_sender; +const struct sender * const senders[] = { + &http_sender, &dccp_sender, &udp_sender, NULL}; /** The possible states of the afs socket. */ enum afs_socket_status { @@ -168,14 +148,10 @@ struct fec_client { struct fec_group group; /** The current slice. */ uint8_t current_slice_num; - /** The data to be FEC-encoded (point to a region within the mapped audio file). */ - const unsigned char **src_data; + /** The data to be FEC-encoded. */ + unsigned char **src_data; /** Last time an audio header was sent. */ struct timeval next_header_time; - /** Used for the last source pointer of an audio file. */ - unsigned char *extra_src_buf; - /** Needed for the last slice of the audio file header. */ - unsigned char *extra_header_buf; /** Extra slices needed to store largest chunk + header. */ int num_extra_slices; /** Contains the FEC-encoded data. */ @@ -260,6 +236,21 @@ static bool need_data_slices(struct fec_client *fc, struct vss_task *vsst) return false; } +static int fc_num_data_slices(const struct fec_client *fc) +{ + return fc->fcp->data_slices_per_group + fc->num_extra_slices; +} + +static int fc_num_slices(const struct fec_client *fc) +{ + return fc->fcp->slices_per_group + fc->num_extra_slices; +} + +static int fc_num_redundant_slices(const struct fec_client *fc) +{ + return fc->fcp->slices_per_group - fc->fcp->data_slices_per_group; +} + static int num_slices(long unsigned bytes, int max_payload, int rs) { int ret; @@ -290,7 +281,7 @@ static void set_group_timing(struct fec_client *fc, struct vss_task *vsst) static int initialize_fec_client(struct fec_client *fc, struct vss_task *vsst) { - int k, n, ret; + int i, k, n, ret; int hs, ds, rs; /* header/data/redundant slices */ struct fec_client_parms *fcp = fc->fcp; @@ -298,7 +289,7 @@ static int initialize_fec_client(struct fec_client *fc, struct vss_task *vsst) if (fcp->init_fec) { /* * Set the maximum slice size to the Maximum Packet Size if the - * transport protocol allows to determine this value. The user + * transport protocol allows determination of this value. The user * can specify a slice size up to this value. */ ret = fcp->init_fec(fc->sc); @@ -310,7 +301,17 @@ static int initialize_fec_client(struct fec_client *fc, struct vss_task *vsst) if (fc->mps <= FEC_HEADER_SIZE) return -ERRNO_TO_PARA_ERROR(EINVAL); - rs = fc->fcp->slices_per_group - fc->fcp->data_slices_per_group; + /* free previous buffers, if any */ + if (fc->src_data) { + k = fc_num_data_slices(fc); + for (i = 0; i < k; i++) + free(fc->src_data[i]); + free(fc->src_data); + fc->src_data = NULL; + } + free(fc->enc_buf); + + rs = fc_num_redundant_slices(fc); ret = num_slices(vsst->header_len, fc->mps - FEC_HEADER_SIZE, rs); if (ret < 0) return ret; @@ -326,17 +327,18 @@ static int initialize_fec_client(struct fec_client *fc, struct vss_task *vsst) if (k < fc->fcp->data_slices_per_group) k = fc->fcp->data_slices_per_group; fc->num_extra_slices = k - fc->fcp->data_slices_per_group; - n = k + rs; + n = fc_num_slices(fc); + PARA_INFO_LOG("mps: %d, k: %d, n: %d, extra slices: %d\n", + fc->mps, k, n, fc->num_extra_slices); + fec_free(fc->parms); ret = fec_new(k, n, &fc->parms); if (ret < 0) return ret; - PARA_INFO_LOG("mps: %d, k: %d, n: %d, extra slices: %d\n", - fc->mps, k, n, fc->num_extra_slices); - fc->src_data = para_realloc(fc->src_data, k * sizeof(char *)); - fc->enc_buf = para_realloc(fc->enc_buf, fc->mps); - fc->extra_src_buf = para_realloc(fc->extra_src_buf, fc->mps); - fc->extra_header_buf = para_realloc(fc->extra_header_buf, fc->mps); + fc->src_data = para_malloc(k * sizeof(char *)); + for (i = 0; i < k; i++) + fc->src_data[i] = para_malloc(fc->mps); + fc->enc_buf = para_malloc(fc->mps); fc->state = FEC_STATE_READY_TO_RUN; fc->next_header_time.tv_sec = 0; @@ -415,6 +417,8 @@ static int compute_group_size(struct vss_task *vsst, struct fec_group *g, g->num_chunks++; } assert(g->num_chunks); + PARA_DEBUG_LOG("group #%u: %u chunks, %u bytes total\n", g->num, + g->num_chunks, g->bytes); return 1; } @@ -468,8 +472,8 @@ static int compute_group_size(struct vss_task *vsst, struct fec_group *g, static int compute_slice_size(struct fec_client *fc, struct vss_task *vsst) { struct fec_group *g = &fc->group; - int k = fc->fcp->data_slices_per_group + fc->num_extra_slices; - int n = fc->fcp->slices_per_group + fc->num_extra_slices; + int k = fc_num_data_slices(fc); + int n = fc_num_slices(fc); int ret, k1, k2, h, d, min, max, sum; int max_slice_bytes = fc->mps - FEC_HEADER_SIZE; int max_group_bytes; @@ -532,9 +536,8 @@ static int compute_slice_size(struct fec_client *fc, struct vss_task *vsst) static int setup_next_fec_group(struct fec_client *fc, struct vss_task *vsst) { - int ret, i, k, n, data_slices; - size_t len; - char *buf, *p; + int ret, i, c; + size_t copy, src_copied, slice_copied; struct fec_group *g = &fc->group; if (fc->state == FEC_STATE_NONE) { @@ -558,86 +561,63 @@ static int setup_next_fec_group(struct fec_client *fc, struct vss_task *vsst) g->first_chunk += g->num_chunks; g->num++; } - k = fc->fcp->data_slices_per_group + fc->num_extra_slices; - n = fc->fcp->slices_per_group + fc->num_extra_slices; - compute_slice_size(fc, vsst); assert(g->slice_bytes > 0); - ret = num_slices(g->bytes, g->slice_bytes, n - k); - if (ret < 0) - return ret; - data_slices = ret; - assert(g->num_header_slices + data_slices <= k); fc->current_slice_num = 0; if (g->num == 0) set_group_timing(fc, vsst); /* setup header slices */ - buf = vsst->header_buf; - for (i = 0; i < g->num_header_slices; i++) { - uint32_t payload_size; - if (buf + g->slice_bytes <= vsst->header_buf + vsst->header_len) { - fc->src_data[i] = (const unsigned char *)buf; - buf += g->slice_bytes; - continue; - } - /* - * Can not use vss->header_buf for this slice as it - * goes beyond the buffer. This slice will not be fully - * used. - */ - payload_size = vsst->header_buf + vsst->header_len - buf; - memcpy(fc->extra_header_buf, buf, payload_size); - if (payload_size < g->slice_bytes) - memset(fc->extra_header_buf + payload_size, 0, - g->slice_bytes - payload_size); - /* - * There might be more than one header slice to fill although - * only the first one will be used. Set all header slices to - * our extra buffer. - */ - while (i < g->num_header_slices) - fc->src_data[i++] = fc->extra_header_buf; - break; /* we don't want i to be increased. */ + for (i = 0, src_copied = 0; i < g->num_header_slices; i++) { + copy = PARA_MIN((size_t)g->slice_bytes, vsst->header_len - src_copied); + if (copy == 0) + break; + memcpy(fc->src_data[i], vsst->header_buf + src_copied, copy); + if (copy < g->slice_bytes) + memset(fc->src_data[i] + copy, 0, g->slice_bytes - copy); + src_copied += copy; } - /* - * Setup data slices. Note that for ogg streams chunk 0 points to a - * buffer on the heap rather than to the mapped audio file. + * There might be more than one header slice to fill although only the + * first one will be used. Zero out any remaining header slices. */ - ret = vss_get_chunk(g->first_chunk, vsst, &buf, &len); - if (ret < 0) - return ret; - for (p = buf; i < g->num_header_slices + data_slices; i++) { - if (p + g->slice_bytes > buf + g->bytes) { - /* - * We must make a copy for this slice since using p - * directly would exceed the buffer. - */ - uint32_t payload_size = buf + g->bytes - p; - assert(payload_size + FEC_HEADER_SIZE <= fc->mps); - memcpy(fc->extra_src_buf, p, payload_size); - if (payload_size < g->slice_bytes) - memset(fc->extra_src_buf + payload_size, 0, - g->slice_bytes - payload_size); - fc->src_data[i] = fc->extra_src_buf; - i++; - break; + while (i < g->num_header_slices) + memset(fc->src_data[i++], 0, g->slice_bytes); + + slice_copied = 0; + for (c = g->first_chunk; c < g->first_chunk + g->num_chunks; c++) { + char *buf; + size_t src_len; + ret = vss_get_chunk(c, vsst, &buf, &src_len); + if (ret < 0) + return ret; + if (src_len == 0) + continue; + src_copied = 0; + while (src_copied < src_len) { + copy = PARA_MIN((size_t)g->slice_bytes - slice_copied, + src_len - src_copied); + memcpy(fc->src_data[i] + slice_copied, + buf + src_copied, copy); + src_copied += copy; + slice_copied += copy; + if (slice_copied == g->slice_bytes) { + i++; + slice_copied = 0; + } } - fc->src_data[i] = (const unsigned char *)p; - p += g->slice_bytes; - } - if (i < k) { - /* use arbitrary data for all remaining slices */ - buf = vsst->map; - for (; i < k; i++) - fc->src_data[i] = (const unsigned char *)buf; } + if (i < fc_num_data_slices(fc) && slice_copied < g->slice_bytes) + memset(fc->src_data[i] + slice_copied, 0, + g->slice_bytes - slice_copied); + /* zero out remaining slices, if any */ + while (++i < fc_num_data_slices(fc)) + memset(fc->src_data[i], 0, g->slice_bytes); PARA_DEBUG_LOG("FEC group %u: %u chunks (%u - %u), %u bytes\n", g->num, g->num_chunks, g->first_chunk, g->first_chunk + g->num_chunks - 1, g->bytes ); PARA_DEBUG_LOG("slice_bytes: %d, %d header slices, %d data slices\n", - g->slice_bytes, g->num_header_slices, data_slices + g->slice_bytes, g->num_header_slices, fc_num_data_slices(fc) ); return 1; } @@ -657,8 +637,9 @@ static int compute_next_fec_slice(struct fec_client *fc, struct vss_task *vsst) } } write_fec_header(fc, vsst); - fec_encode(fc->parms, fc->src_data, fc->enc_buf + FEC_HEADER_SIZE, - fc->current_slice_num, fc->group.slice_bytes); + fec_encode(fc->parms, (const unsigned char * const*)fc->src_data, + fc->enc_buf + FEC_HEADER_SIZE, fc->current_slice_num, + fc->group.slice_bytes); return 1; } @@ -704,11 +685,15 @@ struct fec_client *vss_add_fec_client(struct sender_client *sc, */ void vss_del_fec_client(struct fec_client *fc) { + int i; + list_del(&fc->node); - free(fc->src_data); free(fc->enc_buf); - free(fc->extra_src_buf); - free(fc->extra_header_buf); + if (fc->src_data) { + for (i = 0; i < fc_num_data_slices(fc); i++) + free(fc->src_data[i]); + free(fc->src_data); + } fec_free(fc->parms); free(fc); } @@ -918,10 +903,10 @@ static void vss_pre_select(struct sched *s, void *context) vsst->afsss = AFS_SOCKET_CHECK_FOR_WRITE; } else para_fd_set(vsst->afs_socket, &s->rfds, &s->max_fileno); - for (i = 0; senders[i].name; i++) { - if (!senders[i].pre_select) + FOR_EACH_SENDER(i) { + if (!senders[i]->pre_select) continue; - senders[i].pre_select(&s->max_fileno, &s->rfds, &s->wfds); + senders[i]->pre_select(&s->max_fileno, &s->rfds, &s->wfds); } vss_compute_timeout(s, vsst); } @@ -961,6 +946,7 @@ static int recv_afs_msg(int afs_socket, int *fd, uint32_t *code, uint32_t *data) } #ifndef MAP_POPULATE +/** As of 2018, neither FreeBSD-11.2 nor NetBSD-8.0 have MAP_POPULATE. */ #define MAP_POPULATE 0 #endif @@ -978,11 +964,12 @@ static void recv_afs_result(struct vss_task *vsst, fd_set *rfds) if (ret < 0) goto err; vsst->afsss = AFS_SOCKET_READY; - PARA_DEBUG_LOG("fd: %d, code: %u, shmid: %u\n", passed_fd, afs_code, - afs_data); ret = -E_NOFD; - if (afs_code != NEXT_AUDIO_FILE) + if (afs_code != NEXT_AUDIO_FILE) { + PARA_ERROR_LOG("afs code: %u, expected: %d\n", afs_code, + NEXT_AUDIO_FILE); goto err; + } if (passed_fd < 0) goto err; shmid = afs_data; @@ -997,7 +984,7 @@ static void recv_afs_result(struct vss_task *vsst, fd_set *rfds) goto err; } ret = para_mmap(statbuf.st_size, PROT_READ, MAP_PRIVATE | MAP_POPULATE, - passed_fd, 0, &vsst->map); + passed_fd, &vsst->map); if (ret < 0) goto err; vsst->mapsize = statbuf.st_size; @@ -1082,10 +1069,10 @@ static void vss_send(struct vss_task *vsst) * We call ->send() even if len is zero because senders might * have data queued which can be sent now. */ - for (i = 0; senders[i].name; i++) { - if (!senders[i].send) + FOR_EACH_SENDER(i) { + if (!senders[i]->send) continue; - senders[i].send(mmd->current_chunk, mmd->chunks_sent, + senders[i]->send(mmd->current_chunk, mmd->chunks_sent, buf, len, vsst->header_buf, vsst->header_len); } } @@ -1098,12 +1085,17 @@ static int vss_post_select(struct sched *s, void *context) int ret, i; struct vss_task *vsst = context; + ret = task_get_notification(vsst->task); + if (ret < 0) { + afh_free_header(vsst->header_buf, mmd->afd.audio_format_id); + return ret; + } if (!vsst->map || vss_next() || vss_paused() || vss_repos()) { /* shut down senders and fec clients */ struct fec_client *fc, *tmp; - for (i = 0; senders[i].name; i++) - if (senders[i].shutdown_clients) - senders[i].shutdown_clients(); + FOR_EACH_SENDER(i) + if (senders[i]->shutdown_clients) + senders[i]->shutdown_clients(); list_for_each_entry_safe(fc, tmp, &fec_client_list, node) fc->state = FEC_STATE_NONE; mmd->stream_start.tv_sec = 0; @@ -1129,8 +1121,8 @@ static int vss_post_select(struct sched *s, void *context) int num = mmd->sender_cmd_data.cmd_num, sender_num = mmd->sender_cmd_data.sender_num; - if (senders[sender_num].client_cmds[num]) { - ret = senders[sender_num].client_cmds[num] + if (senders[sender_num]->client_cmds[num]) { + ret = senders[sender_num]->client_cmds[num] (&mmd->sender_cmd_data); if (ret < 0) PARA_ERROR_LOG("%s\n", para_strerror(-ret)); @@ -1147,10 +1139,10 @@ static int vss_post_select(struct sched *s, void *context) else vsst->afsss = AFS_SOCKET_AFD_PENDING; } - for (i = 0; senders[i].name; i++) { - if (!senders[i].post_select) + FOR_EACH_SENDER(i) { + if (!senders[i]->post_select) continue; - senders[i].post_select(&s->rfds, &s->wfds); + senders[i]->post_select(&s->rfds, &s->wfds); } if ((vss_playing() && !(mmd->vss_status_flags & VSS_PLAYING)) || (vss_next() && vss_playing())) @@ -1178,10 +1170,10 @@ void vss_init(int afs_socket, struct sched *s) vsst->afs_socket = afs_socket; ms2tv(announce_time, &vsst->announce_tv); PARA_INFO_LOG("announce timeval: %lums\n", tv2ms(&vsst->announce_tv)); - INIT_LIST_HEAD(&fec_client_list); - for (i = 0; senders[i].name; i++) { - PARA_NOTICE_LOG("initializing %s sender\n", senders[i].name); - senders[i].init(&senders[i]); + init_list_head(&fec_client_list); + FOR_EACH_SENDER(i) { + PARA_NOTICE_LOG("initializing %s sender\n", senders[i]->name); + senders[i]->init(); } mmd->sender_cmd_data.cmd_num = -1; if (OPT_GIVEN(AUTOPLAY)) { @@ -1194,9 +1186,26 @@ void vss_init(int afs_socket, struct sched *s) &vsst->data_send_barrier); } vsst->task = task_register(&(struct task_info) { - .name = "vss task", + .name = "vss", .pre_select = vss_pre_select, .post_select = vss_post_select, .context = vsst, }, s); } + +/** + * Turn off the virtual streaming system. + * + * This is only executed on exit. It calls the ->shutdown method of all senders. + */ +void vss_shutdown(void) +{ + int i; + + FOR_EACH_SENDER(i) { + if (!senders[i]->shutdown) + continue; + PARA_NOTICE_LOG("shutting down %s sender\n", senders[i]->name); + senders[i]->shutdown(); + } +}