X-Git-Url: http://git.tuebingen.mpg.de/?p=paraslash.git;a=blobdiff_plain;f=vss.c;h=023b2c1d3cc8fcc0dcf23f0b5d2fc5c43487012f;hp=6aac9d59b5107e1a1c4118a961d54f4d500686e7;hb=9d15fd7e3a77282cec660eed54f46c3282bf8f29;hpb=21859dc53e11f56e76a5db0f105ee65a20d09ab9 diff --git a/vss.c b/vss.c index 6aac9d59..023b2c1d 100644 --- a/vss.c +++ b/vss.c @@ -25,8 +25,8 @@ #include "string.h" #include "afh.h" #include "afs.h" -#include "server.h" #include "net.h" +#include "server.h" #include "list.h" #include "send.h" #include "sched.h" @@ -35,29 +35,9 @@ #include "fd.h" extern struct misc_meta_data *mmd; - -extern void dccp_send_init(struct sender *); -extern void http_send_init(struct sender *); -extern void udp_send_init(struct sender *); - -/** The list of supported senders. */ -struct sender senders[] = { - { - .name = "http", - .init = http_send_init, - }, - { - .name = "dccp", - .init = dccp_send_init, - }, - { - .name = "udp", - .init = udp_send_init, - }, - { - .name = NULL, - } -}; +extern const struct sender udp_sender, dccp_sender, http_sender; +const struct sender * const senders[] = { + &http_sender, &dccp_sender, &udp_sender, NULL}; /** The possible states of the afs socket. */ enum afs_socket_status { @@ -298,7 +278,7 @@ static int initialize_fec_client(struct fec_client *fc, struct vss_task *vsst) if (fcp->init_fec) { /* * Set the maximum slice size to the Maximum Packet Size if the - * transport protocol allows to determine this value. The user + * transport protocol allows determination of this value. The user * can specify a slice size up to this value. */ ret = fcp->init_fec(fc->sc); @@ -918,10 +898,10 @@ static void vss_pre_select(struct sched *s, void *context) vsst->afsss = AFS_SOCKET_CHECK_FOR_WRITE; } else para_fd_set(vsst->afs_socket, &s->rfds, &s->max_fileno); - for (i = 0; senders[i].name; i++) { - if (!senders[i].pre_select) + FOR_EACH_SENDER(i) { + if (!senders[i]->pre_select) continue; - senders[i].pre_select(&s->max_fileno, &s->rfds, &s->wfds); + senders[i]->pre_select(&s->max_fileno, &s->rfds, &s->wfds); } vss_compute_timeout(s, vsst); } @@ -961,6 +941,7 @@ static int recv_afs_msg(int afs_socket, int *fd, uint32_t *code, uint32_t *data) } #ifndef MAP_POPULATE +/** As of 2018, neither FreeBSD-11.2 nor NetBSD-8.0 have MAP_POPULATE. */ #define MAP_POPULATE 0 #endif @@ -978,11 +959,12 @@ static void recv_afs_result(struct vss_task *vsst, fd_set *rfds) if (ret < 0) goto err; vsst->afsss = AFS_SOCKET_READY; - PARA_DEBUG_LOG("fd: %d, code: %u, shmid: %u\n", passed_fd, afs_code, - afs_data); ret = -E_NOFD; - if (afs_code != NEXT_AUDIO_FILE) + if (afs_code != NEXT_AUDIO_FILE) { + PARA_ERROR_LOG("afs code: %u, expected: %d\n", afs_code, + NEXT_AUDIO_FILE); goto err; + } if (passed_fd < 0) goto err; shmid = afs_data; @@ -997,7 +979,7 @@ static void recv_afs_result(struct vss_task *vsst, fd_set *rfds) goto err; } ret = para_mmap(statbuf.st_size, PROT_READ, MAP_PRIVATE | MAP_POPULATE, - passed_fd, 0, &vsst->map); + passed_fd, &vsst->map); if (ret < 0) goto err; vsst->mapsize = statbuf.st_size; @@ -1034,6 +1016,8 @@ static void vss_send(struct vss_task *vsst) bool fec_active = false; struct timeval due; struct fec_client *fc, *tmp_fc; + char *buf; + size_t len; if (!vsst->map || !vss_playing()) return; @@ -1064,35 +1048,31 @@ static void vss_send(struct vss_task *vsst) } compute_chunk_time(mmd->chunks_sent, &mmd->afd.afhi.chunk_tv, &mmd->stream_start, &due); - if (tv_diff(&due, now, NULL) <= 0) { - char *buf; - size_t len; - - if (!mmd->chunks_sent) { - mmd->stream_start = *now; - mmd->events++; - set_mmd_offset(); - } - ret = vss_get_chunk(mmd->current_chunk, vsst, &buf, &len); - if (ret < 0) { - PARA_ERROR_LOG("could not get chunk %lu: %s\n", - mmd->current_chunk, para_strerror(-ret)); - } else { - /* - * We call ->send() even if len is zero because senders - * might have data queued which can be sent now. - */ - for (i = 0; senders[i].name; i++) { - if (!senders[i].send) - continue; - senders[i].send(mmd->current_chunk, - mmd->chunks_sent, buf, len, - vsst->header_buf, vsst->header_len); - } + if (tv_diff(&due, now, NULL) > 0) + return; + if (!mmd->chunks_sent) { + mmd->stream_start = *now; + mmd->events++; + set_mmd_offset(); + } + ret = vss_get_chunk(mmd->current_chunk, vsst, &buf, &len); + if (ret < 0) { + PARA_ERROR_LOG("could not get chunk %lu: %s\n", + mmd->current_chunk, para_strerror(-ret)); + } else { + /* + * We call ->send() even if len is zero because senders might + * have data queued which can be sent now. + */ + FOR_EACH_SENDER(i) { + if (!senders[i]->send) + continue; + senders[i]->send(mmd->current_chunk, mmd->chunks_sent, + buf, len, vsst->header_buf, vsst->header_len); } - mmd->chunks_sent++; - mmd->current_chunk++; } + mmd->chunks_sent++; + mmd->current_chunk++; } static int vss_post_select(struct sched *s, void *context) @@ -1100,12 +1080,17 @@ static int vss_post_select(struct sched *s, void *context) int ret, i; struct vss_task *vsst = context; + ret = task_get_notification(vsst->task); + if (ret < 0) { + afh_free_header(vsst->header_buf, mmd->afd.audio_format_id); + return ret; + } if (!vsst->map || vss_next() || vss_paused() || vss_repos()) { /* shut down senders and fec clients */ struct fec_client *fc, *tmp; - for (i = 0; senders[i].name; i++) - if (senders[i].shutdown_clients) - senders[i].shutdown_clients(); + FOR_EACH_SENDER(i) + if (senders[i]->shutdown_clients) + senders[i]->shutdown_clients(); list_for_each_entry_safe(fc, tmp, &fec_client_list, node) fc->state = FEC_STATE_NONE; mmd->stream_start.tv_sec = 0; @@ -1131,8 +1116,8 @@ static int vss_post_select(struct sched *s, void *context) int num = mmd->sender_cmd_data.cmd_num, sender_num = mmd->sender_cmd_data.sender_num; - if (senders[sender_num].client_cmds[num]) { - ret = senders[sender_num].client_cmds[num] + if (senders[sender_num]->client_cmds[num]) { + ret = senders[sender_num]->client_cmds[num] (&mmd->sender_cmd_data); if (ret < 0) PARA_ERROR_LOG("%s\n", para_strerror(-ret)); @@ -1149,10 +1134,10 @@ static int vss_post_select(struct sched *s, void *context) else vsst->afsss = AFS_SOCKET_AFD_PENDING; } - for (i = 0; senders[i].name; i++) { - if (!senders[i].post_select) + FOR_EACH_SENDER(i) { + if (!senders[i]->post_select) continue; - senders[i].post_select(&s->rfds, &s->wfds); + senders[i]->post_select(&s->rfds, &s->wfds); } if ((vss_playing() && !(mmd->vss_status_flags & VSS_PLAYING)) || (vss_next() && vss_playing())) @@ -1181,9 +1166,9 @@ void vss_init(int afs_socket, struct sched *s) ms2tv(announce_time, &vsst->announce_tv); PARA_INFO_LOG("announce timeval: %lums\n", tv2ms(&vsst->announce_tv)); INIT_LIST_HEAD(&fec_client_list); - for (i = 0; senders[i].name; i++) { - PARA_NOTICE_LOG("initializing %s sender\n", senders[i].name); - senders[i].init(&senders[i]); + FOR_EACH_SENDER(i) { + PARA_NOTICE_LOG("initializing %s sender\n", senders[i]->name); + senders[i]->init(); } mmd->sender_cmd_data.cmd_num = -1; if (OPT_GIVEN(AUTOPLAY)) { @@ -1196,9 +1181,26 @@ void vss_init(int afs_socket, struct sched *s) &vsst->data_send_barrier); } vsst->task = task_register(&(struct task_info) { - .name = "vss task", + .name = "vss", .pre_select = vss_pre_select, .post_select = vss_post_select, .context = vsst, }, s); } + +/** + * Turn off the virtual streaming system. + * + * This is only executed on exit. It calls the ->shutdowwn method of all senders. + */ +void vss_shutdown(void) +{ + int i; + + FOR_EACH_SENDER(i) { + if (!senders[i]->shutdown) + continue; + PARA_NOTICE_LOG("shutting down %s sender\n", senders[i]->name); + senders[i]->shutdown(); + } +}