X-Git-Url: http://git.tuebingen.mpg.de/?p=paraslash.git;a=blobdiff_plain;f=vss.c;h=73c7231128b94152268b1bc8ea02d1f524b25b6a;hp=cf51f43f32608c72f56d1af0b8ad72f25c21161f;hb=f932f45196a8a4480ad910c455abe8aae9824967;hpb=d31995b3074bb19aa4da66ce5c4774ca9ed101a1 diff --git a/vss.c b/vss.c index cf51f43f..73c72311 100644 --- a/vss.c +++ b/vss.c @@ -25,8 +25,8 @@ #include "string.h" #include "afh.h" #include "afs.h" -#include "server.h" #include "net.h" +#include "server.h" #include "list.h" #include "send.h" #include "sched.h" @@ -35,29 +35,9 @@ #include "fd.h" extern struct misc_meta_data *mmd; - -extern void dccp_send_init(struct sender *); -extern void http_send_init(struct sender *); -extern void udp_send_init(struct sender *); - -/** The list of supported senders. */ -struct sender senders[] = { - { - .name = "http", - .init = http_send_init, - }, - { - .name = "dccp", - .init = dccp_send_init, - }, - { - .name = "udp", - .init = udp_send_init, - }, - { - .name = NULL, - } -}; +extern const struct sender udp_sender, dccp_sender, http_sender; +const struct sender * const senders[] = { + &http_sender, &dccp_sender, &udp_sender, NULL}; /** The possible states of the afs socket. */ enum afs_socket_status { @@ -298,7 +278,7 @@ static int initialize_fec_client(struct fec_client *fc, struct vss_task *vsst) if (fcp->init_fec) { /* * Set the maximum slice size to the Maximum Packet Size if the - * transport protocol allows to determine this value. The user + * transport protocol allows determination of this value. The user * can specify a slice size up to this value. */ ret = fcp->init_fec(fc->sc); @@ -345,7 +325,7 @@ static int initialize_fec_client(struct fec_client *fc, struct vss_task *vsst) return 1; } -static void vss_get_chunk(int chunk_num, struct vss_task *vsst, +static int vss_get_chunk(int chunk_num, struct vss_task *vsst, char **buf, size_t *sz) { int ret; @@ -362,31 +342,32 @@ static void vss_get_chunk(int chunk_num, struct vss_task *vsst, assert(vsst->header_buf); *buf = vsst->header_buf; /* stripped header */ *sz = vsst->header_len; - return; + return 0; } ret = afh_get_chunk(chunk_num, &mmd->afd.afhi, mmd->afd.audio_format_id, vsst->map, vsst->mapsize, (const char **)buf, sz, &vsst->afh_context); if (ret < 0) { - PARA_WARNING_LOG("could not get chunk %d: %s\n", - chunk_num, para_strerror(-ret)); *buf = NULL; *sz = 0; } + return ret; } -static void compute_group_size(struct vss_task *vsst, struct fec_group *g, +static int compute_group_size(struct vss_task *vsst, struct fec_group *g, int max_bytes) { char *buf; size_t len; - int i, max_chunks = PARA_MAX(1LU, 150 / tv2ms(vss_chunk_time())); + int ret, i, max_chunks = PARA_MAX(1LU, 150 / tv2ms(vss_chunk_time())); if (g->first_chunk == 0) { g->num_chunks = 1; - vss_get_chunk(0, vsst, &buf, &len); + ret = vss_get_chunk(0, vsst, &buf, &len); + if (ret < 0) + return ret; g->bytes = len; - return; + return 0; } g->num_chunks = 0; @@ -404,7 +385,9 @@ static void compute_group_size(struct vss_task *vsst, struct fec_group *g, break; if (chunk_num >= mmd->afd.afhi.chunks_total) /* eof */ break; - vss_get_chunk(chunk_num, vsst, &buf, &len); + ret = vss_get_chunk(chunk_num, vsst, &buf, &len); + if (ret < 0) + return ret; if (g->bytes + len > max_bytes) break; /* Include this chunk */ @@ -412,6 +395,7 @@ static void compute_group_size(struct vss_task *vsst, struct fec_group *g, g->num_chunks++; } assert(g->num_chunks); + return 1; } /* @@ -473,7 +457,9 @@ static int compute_slice_size(struct fec_client *fc, struct vss_task *vsst) if (!need_audio_header(fc, vsst)) { max_group_bytes = k * max_slice_bytes; g->num_header_slices = 0; - compute_group_size(vsst, g, max_group_bytes); + ret = compute_group_size(vsst, g, max_group_bytes); + if (ret < 0) + return ret; g->slice_bytes = DIV_ROUND_UP(g->bytes, k); if (g->slice_bytes == 0) g->slice_bytes = 1; @@ -489,7 +475,9 @@ static int compute_slice_size(struct fec_client *fc, struct vss_task *vsst) h = vsst->header_len; max_group_bytes = (k - num_slices(h, max_slice_bytes, n - k)) * max_slice_bytes; - compute_group_size(vsst, g, max_group_bytes); + ret = compute_group_size(vsst, g, max_group_bytes); + if (ret < 0) + return ret; d = g->bytes; if (d == 0) { g->slice_bytes = DIV_ROUND_UP(h, k); @@ -596,7 +584,9 @@ static int setup_next_fec_group(struct fec_client *fc, struct vss_task *vsst) * Setup data slices. Note that for ogg streams chunk 0 points to a * buffer on the heap rather than to the mapped audio file. */ - vss_get_chunk(g->first_chunk, vsst, &buf, &len); + ret = vss_get_chunk(g->first_chunk, vsst, &buf, &len); + if (ret < 0) + return ret; for (p = buf; i < g->num_header_slices + data_slices; i++) { if (p + g->slice_bytes > buf + g->bytes) { /* @@ -908,10 +898,10 @@ static void vss_pre_select(struct sched *s, void *context) vsst->afsss = AFS_SOCKET_CHECK_FOR_WRITE; } else para_fd_set(vsst->afs_socket, &s->rfds, &s->max_fileno); - for (i = 0; senders[i].name; i++) { - if (!senders[i].pre_select) + FOR_EACH_SENDER(i) { + if (!senders[i]->pre_select) continue; - senders[i].pre_select(&s->max_fileno, &s->rfds, &s->wfds); + senders[i]->pre_select(&s->max_fileno, &s->rfds, &s->wfds); } vss_compute_timeout(s, vsst); } @@ -951,6 +941,7 @@ static int recv_afs_msg(int afs_socket, int *fd, uint32_t *code, uint32_t *data) } #ifndef MAP_POPULATE +/** As of 2018, neither FreeBSD-11.2 nor NetBSD-8.0 have MAP_POPULATE. */ #define MAP_POPULATE 0 #endif @@ -987,7 +978,7 @@ static void recv_afs_result(struct vss_task *vsst, fd_set *rfds) goto err; } ret = para_mmap(statbuf.st_size, PROT_READ, MAP_PRIVATE | MAP_POPULATE, - passed_fd, 0, &vsst->map); + passed_fd, &vsst->map); if (ret < 0) goto err; vsst->mapsize = statbuf.st_size; @@ -1020,10 +1011,12 @@ err: */ static void vss_send(struct vss_task *vsst) { - int i; + int i, ret; bool fec_active = false; struct timeval due; struct fec_client *fc, *tmp_fc; + char *buf; + size_t len; if (!vsst->map || !vss_playing()) return; @@ -1054,29 +1047,31 @@ static void vss_send(struct vss_task *vsst) } compute_chunk_time(mmd->chunks_sent, &mmd->afd.afhi.chunk_tv, &mmd->stream_start, &due); - if (tv_diff(&due, now, NULL) <= 0) { - char *buf; - size_t len; - - if (!mmd->chunks_sent) { - mmd->stream_start = *now; - mmd->events++; - set_mmd_offset(); - } - vss_get_chunk(mmd->current_chunk, vsst, &buf, &len); - for (i = 0; senders[i].name; i++) { - /* - * We call ->send() even if len is zero because senders - * might have data queued which can be sent now. - */ - if (!senders[i].send) + if (tv_diff(&due, now, NULL) > 0) + return; + if (!mmd->chunks_sent) { + mmd->stream_start = *now; + mmd->events++; + set_mmd_offset(); + } + ret = vss_get_chunk(mmd->current_chunk, vsst, &buf, &len); + if (ret < 0) { + PARA_ERROR_LOG("could not get chunk %lu: %s\n", + mmd->current_chunk, para_strerror(-ret)); + } else { + /* + * We call ->send() even if len is zero because senders might + * have data queued which can be sent now. + */ + FOR_EACH_SENDER(i) { + if (!senders[i]->send) continue; - senders[i].send(mmd->current_chunk, mmd->chunks_sent, + senders[i]->send(mmd->current_chunk, mmd->chunks_sent, buf, len, vsst->header_buf, vsst->header_len); } - mmd->chunks_sent++; - mmd->current_chunk++; } + mmd->chunks_sent++; + mmd->current_chunk++; } static int vss_post_select(struct sched *s, void *context) @@ -1084,12 +1079,17 @@ static int vss_post_select(struct sched *s, void *context) int ret, i; struct vss_task *vsst = context; + ret = task_get_notification(vsst->task); + if (ret < 0) { + afh_free_header(vsst->header_buf, mmd->afd.audio_format_id); + return ret; + } if (!vsst->map || vss_next() || vss_paused() || vss_repos()) { /* shut down senders and fec clients */ struct fec_client *fc, *tmp; - for (i = 0; senders[i].name; i++) - if (senders[i].shutdown_clients) - senders[i].shutdown_clients(); + FOR_EACH_SENDER(i) + if (senders[i]->shutdown_clients) + senders[i]->shutdown_clients(); list_for_each_entry_safe(fc, tmp, &fec_client_list, node) fc->state = FEC_STATE_NONE; mmd->stream_start.tv_sec = 0; @@ -1115,8 +1115,8 @@ static int vss_post_select(struct sched *s, void *context) int num = mmd->sender_cmd_data.cmd_num, sender_num = mmd->sender_cmd_data.sender_num; - if (senders[sender_num].client_cmds[num]) { - ret = senders[sender_num].client_cmds[num] + if (senders[sender_num]->client_cmds[num]) { + ret = senders[sender_num]->client_cmds[num] (&mmd->sender_cmd_data); if (ret < 0) PARA_ERROR_LOG("%s\n", para_strerror(-ret)); @@ -1133,10 +1133,10 @@ static int vss_post_select(struct sched *s, void *context) else vsst->afsss = AFS_SOCKET_AFD_PENDING; } - for (i = 0; senders[i].name; i++) { - if (!senders[i].post_select) + FOR_EACH_SENDER(i) { + if (!senders[i]->post_select) continue; - senders[i].post_select(&s->rfds, &s->wfds); + senders[i]->post_select(&s->rfds, &s->wfds); } if ((vss_playing() && !(mmd->vss_status_flags & VSS_PLAYING)) || (vss_next() && vss_playing())) @@ -1165,9 +1165,9 @@ void vss_init(int afs_socket, struct sched *s) ms2tv(announce_time, &vsst->announce_tv); PARA_INFO_LOG("announce timeval: %lums\n", tv2ms(&vsst->announce_tv)); INIT_LIST_HEAD(&fec_client_list); - for (i = 0; senders[i].name; i++) { - PARA_NOTICE_LOG("initializing %s sender\n", senders[i].name); - senders[i].init(&senders[i]); + FOR_EACH_SENDER(i) { + PARA_NOTICE_LOG("initializing %s sender\n", senders[i]->name); + senders[i]->init(); } mmd->sender_cmd_data.cmd_num = -1; if (OPT_GIVEN(AUTOPLAY)) { @@ -1180,9 +1180,26 @@ void vss_init(int afs_socket, struct sched *s) &vsst->data_send_barrier); } vsst->task = task_register(&(struct task_info) { - .name = "vss task", + .name = "vss", .pre_select = vss_pre_select, .post_select = vss_post_select, .context = vsst, }, s); } + +/** + * Turn off the virtual streaming system. + * + * This is only executed on exit. It calls the ->shutdowwn method of all senders. + */ +void vss_shutdown(void) +{ + int i; + + FOR_EACH_SENDER(i) { + if (!senders[i]->shutdown) + continue; + PARA_NOTICE_LOG("shutting down %s sender\n", senders[i]->name); + senders[i]->shutdown(); + } +}