X-Git-Url: http://git.tuebingen.mpg.de/?p=paraslash.git;a=blobdiff_plain;f=vss.c;h=73c7231128b94152268b1bc8ea02d1f524b25b6a;hp=00c632c70674a034044f6c73c7f2c6b87bf63bd1;hb=ac1f19d550a81c8408c8fce2e237996c950253ab;hpb=1d2aa7dcc09cd7ed06d18372eb85407f556cb529 diff --git a/vss.c b/vss.c index 00c632c7..73c72311 100644 --- a/vss.c +++ b/vss.c @@ -25,8 +25,8 @@ #include "string.h" #include "afh.h" #include "afs.h" -#include "server.h" #include "net.h" +#include "server.h" #include "list.h" #include "send.h" #include "sched.h" @@ -35,29 +35,9 @@ #include "fd.h" extern struct misc_meta_data *mmd; - -extern void dccp_send_init(struct sender *); -extern void http_send_init(struct sender *); -extern void udp_send_init(struct sender *); - -/** The list of supported senders. */ -struct sender senders[] = { - { - .name = "http", - .init = http_send_init, - }, - { - .name = "dccp", - .init = dccp_send_init, - }, - { - .name = "udp", - .init = udp_send_init, - }, - { - .name = NULL, - } -}; +extern const struct sender udp_sender, dccp_sender, http_sender; +const struct sender * const senders[] = { + &http_sender, &dccp_sender, &udp_sender, NULL}; /** The possible states of the afs socket. */ enum afs_socket_status { @@ -298,7 +278,7 @@ static int initialize_fec_client(struct fec_client *fc, struct vss_task *vsst) if (fcp->init_fec) { /* * Set the maximum slice size to the Maximum Packet Size if the - * transport protocol allows to determine this value. The user + * transport protocol allows determination of this value. The user * can specify a slice size up to this value. */ ret = fcp->init_fec(fc->sc); @@ -918,10 +898,10 @@ static void vss_pre_select(struct sched *s, void *context) vsst->afsss = AFS_SOCKET_CHECK_FOR_WRITE; } else para_fd_set(vsst->afs_socket, &s->rfds, &s->max_fileno); - for (i = 0; senders[i].name; i++) { - if (!senders[i].pre_select) + FOR_EACH_SENDER(i) { + if (!senders[i]->pre_select) continue; - senders[i].pre_select(&s->max_fileno, &s->rfds, &s->wfds); + senders[i]->pre_select(&s->max_fileno, &s->rfds, &s->wfds); } vss_compute_timeout(s, vsst); } @@ -961,6 +941,7 @@ static int recv_afs_msg(int afs_socket, int *fd, uint32_t *code, uint32_t *data) } #ifndef MAP_POPULATE +/** As of 2018, neither FreeBSD-11.2 nor NetBSD-8.0 have MAP_POPULATE. */ #define MAP_POPULATE 0 #endif @@ -997,7 +978,7 @@ static void recv_afs_result(struct vss_task *vsst, fd_set *rfds) goto err; } ret = para_mmap(statbuf.st_size, PROT_READ, MAP_PRIVATE | MAP_POPULATE, - passed_fd, 0, &vsst->map); + passed_fd, &vsst->map); if (ret < 0) goto err; vsst->mapsize = statbuf.st_size; @@ -1082,10 +1063,10 @@ static void vss_send(struct vss_task *vsst) * We call ->send() even if len is zero because senders might * have data queued which can be sent now. */ - for (i = 0; senders[i].name; i++) { - if (!senders[i].send) + FOR_EACH_SENDER(i) { + if (!senders[i]->send) continue; - senders[i].send(mmd->current_chunk, mmd->chunks_sent, + senders[i]->send(mmd->current_chunk, mmd->chunks_sent, buf, len, vsst->header_buf, vsst->header_len); } } @@ -1098,12 +1079,17 @@ static int vss_post_select(struct sched *s, void *context) int ret, i; struct vss_task *vsst = context; + ret = task_get_notification(vsst->task); + if (ret < 0) { + afh_free_header(vsst->header_buf, mmd->afd.audio_format_id); + return ret; + } if (!vsst->map || vss_next() || vss_paused() || vss_repos()) { /* shut down senders and fec clients */ struct fec_client *fc, *tmp; - for (i = 0; senders[i].name; i++) - if (senders[i].shutdown_clients) - senders[i].shutdown_clients(); + FOR_EACH_SENDER(i) + if (senders[i]->shutdown_clients) + senders[i]->shutdown_clients(); list_for_each_entry_safe(fc, tmp, &fec_client_list, node) fc->state = FEC_STATE_NONE; mmd->stream_start.tv_sec = 0; @@ -1129,8 +1115,8 @@ static int vss_post_select(struct sched *s, void *context) int num = mmd->sender_cmd_data.cmd_num, sender_num = mmd->sender_cmd_data.sender_num; - if (senders[sender_num].client_cmds[num]) { - ret = senders[sender_num].client_cmds[num] + if (senders[sender_num]->client_cmds[num]) { + ret = senders[sender_num]->client_cmds[num] (&mmd->sender_cmd_data); if (ret < 0) PARA_ERROR_LOG("%s\n", para_strerror(-ret)); @@ -1147,10 +1133,10 @@ static int vss_post_select(struct sched *s, void *context) else vsst->afsss = AFS_SOCKET_AFD_PENDING; } - for (i = 0; senders[i].name; i++) { - if (!senders[i].post_select) + FOR_EACH_SENDER(i) { + if (!senders[i]->post_select) continue; - senders[i].post_select(&s->rfds, &s->wfds); + senders[i]->post_select(&s->rfds, &s->wfds); } if ((vss_playing() && !(mmd->vss_status_flags & VSS_PLAYING)) || (vss_next() && vss_playing())) @@ -1179,9 +1165,9 @@ void vss_init(int afs_socket, struct sched *s) ms2tv(announce_time, &vsst->announce_tv); PARA_INFO_LOG("announce timeval: %lums\n", tv2ms(&vsst->announce_tv)); INIT_LIST_HEAD(&fec_client_list); - for (i = 0; senders[i].name; i++) { - PARA_NOTICE_LOG("initializing %s sender\n", senders[i].name); - senders[i].init(&senders[i]); + FOR_EACH_SENDER(i) { + PARA_NOTICE_LOG("initializing %s sender\n", senders[i]->name); + senders[i]->init(); } mmd->sender_cmd_data.cmd_num = -1; if (OPT_GIVEN(AUTOPLAY)) { @@ -1200,3 +1186,20 @@ void vss_init(int afs_socket, struct sched *s) .context = vsst, }, s); } + +/** + * Turn off the virtual streaming system. + * + * This is only executed on exit. It calls the ->shutdowwn method of all senders. + */ +void vss_shutdown(void) +{ + int i; + + FOR_EACH_SENDER(i) { + if (!senders[i]->shutdown) + continue; + PARA_NOTICE_LOG("shutting down %s sender\n", senders[i]->name); + senders[i]->shutdown(); + } +}