X-Git-Url: http://git.tuebingen.mpg.de/?p=paraslash.git;a=blobdiff_plain;f=vss.c;h=2a24a50068b47b96c16fe287e456aeaf51c55473;hp=13a8e25fd0febc7aeeb56e8e37099f7ec4747a0f;hb=e3dfede17fa6c297f89a34cb997b7276c884497a;hpb=2031b9cab9304b02c0372f73eef54d9501277031 diff --git a/vss.c b/vss.c index 13a8e25f..2a24a500 100644 --- a/vss.c +++ b/vss.c @@ -35,29 +35,9 @@ #include "fd.h" extern struct misc_meta_data *mmd; - -extern void dccp_send_init(struct sender *); -extern void http_send_init(struct sender *); -extern void udp_send_init(struct sender *); - -/** The list of supported senders. */ -struct sender senders[] = { - { - .name = "http", - .init = http_send_init, - }, - { - .name = "dccp", - .init = dccp_send_init, - }, - { - .name = "udp", - .init = udp_send_init, - }, - { - .name = NULL, - } -}; +extern const struct sender udp_sender, dccp_sender, http_sender; +const struct sender * const senders[] = { + &http_sender, &dccp_sender, &udp_sender, NULL}; /** The possible states of the afs socket. */ enum afs_socket_status { @@ -298,7 +278,7 @@ static int initialize_fec_client(struct fec_client *fc, struct vss_task *vsst) if (fcp->init_fec) { /* * Set the maximum slice size to the Maximum Packet Size if the - * transport protocol allows to determine this value. The user + * transport protocol allows determination of this value. The user * can specify a slice size up to this value. */ ret = fcp->init_fec(fc->sc); @@ -918,10 +898,10 @@ static void vss_pre_select(struct sched *s, void *context) vsst->afsss = AFS_SOCKET_CHECK_FOR_WRITE; } else para_fd_set(vsst->afs_socket, &s->rfds, &s->max_fileno); - for (i = 0; senders[i].name; i++) { - if (!senders[i].pre_select) + FOR_EACH_SENDER(i) { + if (!senders[i]->pre_select) continue; - senders[i].pre_select(&s->max_fileno, &s->rfds, &s->wfds); + senders[i]->pre_select(&s->max_fileno, &s->rfds, &s->wfds); } vss_compute_timeout(s, vsst); } @@ -961,6 +941,7 @@ static int recv_afs_msg(int afs_socket, int *fd, uint32_t *code, uint32_t *data) } #ifndef MAP_POPULATE +/** As of 2018, neither FreeBSD-11.2 nor NetBSD-8.0 have MAP_POPULATE. */ #define MAP_POPULATE 0 #endif @@ -978,11 +959,12 @@ static void recv_afs_result(struct vss_task *vsst, fd_set *rfds) if (ret < 0) goto err; vsst->afsss = AFS_SOCKET_READY; - PARA_DEBUG_LOG("fd: %d, code: %u, shmid: %u\n", passed_fd, afs_code, - afs_data); ret = -E_NOFD; - if (afs_code != NEXT_AUDIO_FILE) + if (afs_code != NEXT_AUDIO_FILE) { + PARA_ERROR_LOG("afs code: %u, expected: %d\n", afs_code, + NEXT_AUDIO_FILE); goto err; + } if (passed_fd < 0) goto err; shmid = afs_data; @@ -1082,10 +1064,10 @@ static void vss_send(struct vss_task *vsst) * We call ->send() even if len is zero because senders might * have data queued which can be sent now. */ - for (i = 0; senders[i].name; i++) { - if (!senders[i].send) + FOR_EACH_SENDER(i) { + if (!senders[i]->send) continue; - senders[i].send(mmd->current_chunk, mmd->chunks_sent, + senders[i]->send(mmd->current_chunk, mmd->chunks_sent, buf, len, vsst->header_buf, vsst->header_len); } } @@ -1098,12 +1080,17 @@ static int vss_post_select(struct sched *s, void *context) int ret, i; struct vss_task *vsst = context; + ret = task_get_notification(vsst->task); + if (ret < 0) { + afh_free_header(vsst->header_buf, mmd->afd.audio_format_id); + return ret; + } if (!vsst->map || vss_next() || vss_paused() || vss_repos()) { /* shut down senders and fec clients */ struct fec_client *fc, *tmp; - for (i = 0; senders[i].name; i++) - if (senders[i].shutdown_clients) - senders[i].shutdown_clients(); + FOR_EACH_SENDER(i) + if (senders[i]->shutdown_clients) + senders[i]->shutdown_clients(); list_for_each_entry_safe(fc, tmp, &fec_client_list, node) fc->state = FEC_STATE_NONE; mmd->stream_start.tv_sec = 0; @@ -1129,8 +1116,8 @@ static int vss_post_select(struct sched *s, void *context) int num = mmd->sender_cmd_data.cmd_num, sender_num = mmd->sender_cmd_data.sender_num; - if (senders[sender_num].client_cmds[num]) { - ret = senders[sender_num].client_cmds[num] + if (senders[sender_num]->client_cmds[num]) { + ret = senders[sender_num]->client_cmds[num] (&mmd->sender_cmd_data); if (ret < 0) PARA_ERROR_LOG("%s\n", para_strerror(-ret)); @@ -1147,10 +1134,10 @@ static int vss_post_select(struct sched *s, void *context) else vsst->afsss = AFS_SOCKET_AFD_PENDING; } - for (i = 0; senders[i].name; i++) { - if (!senders[i].post_select) + FOR_EACH_SENDER(i) { + if (!senders[i]->post_select) continue; - senders[i].post_select(&s->rfds, &s->wfds); + senders[i]->post_select(&s->rfds, &s->wfds); } if ((vss_playing() && !(mmd->vss_status_flags & VSS_PLAYING)) || (vss_next() && vss_playing())) @@ -1179,9 +1166,9 @@ void vss_init(int afs_socket, struct sched *s) ms2tv(announce_time, &vsst->announce_tv); PARA_INFO_LOG("announce timeval: %lums\n", tv2ms(&vsst->announce_tv)); INIT_LIST_HEAD(&fec_client_list); - for (i = 0; senders[i].name; i++) { - PARA_NOTICE_LOG("initializing %s sender\n", senders[i].name); - senders[i].init(&senders[i]); + FOR_EACH_SENDER(i) { + PARA_NOTICE_LOG("initializing %s sender\n", senders[i]->name); + senders[i]->init(); } mmd->sender_cmd_data.cmd_num = -1; if (OPT_GIVEN(AUTOPLAY)) { @@ -1200,3 +1187,20 @@ void vss_init(int afs_socket, struct sched *s) .context = vsst, }, s); } + +/** + * Turn off the virtual streaming system. + * + * This is only executed on exit. It calls the ->shutdown method of all senders. + */ +void vss_shutdown(void) +{ + int i; + + FOR_EACH_SENDER(i) { + if (!senders[i]->shutdown) + continue; + PARA_NOTICE_LOG("shutting down %s sender\n", senders[i]->name); + senders[i]->shutdown(); + } +}