X-Git-Url: http://git.tuebingen.mpg.de/?p=paraslash.git;a=blobdiff_plain;f=vss.c;h=98965efd386a14b5fd95361c9f6878fb959af128;hp=a7542677249e14ad71182d8d276c55177e1c0ca7;hb=2b471378b49c04db7bb84d1e75db981f91ad93db;hpb=e67fbcaf61d3d0f3ce62332f9615e5ed8e5a1c81 diff --git a/vss.c b/vss.c index a7542677..98965efd 100644 --- a/vss.c +++ b/vss.c @@ -30,16 +30,55 @@ #include "ipc.h" #include "fd.h" -static struct timeval announce_tv; -static struct timeval data_send_barrier; -static struct timeval eof_barrier; -static struct timeval autoplay_barrier; - extern struct misc_meta_data *mmd; -extern struct sender senders[]; + +extern void dccp_send_init(struct sender *); +extern void http_send_init(struct sender *); +extern void ortp_send_init(struct sender *); + +/** the list of supported senders */ +struct sender senders[] = { + { + .name = "http", + .init = http_send_init, + }, + { + .name = "dccp", + .init = dccp_send_init, + }, +#ifdef HAVE_ORTP + { + .name = "ortp", + .init = ortp_send_init, + }, +#endif + { + .name = NULL, + } +}; + +/** The possible states of the afs socket. See \ref afs_socket. */ +enum afs_socket_status { + /** Socket is inactive. */ + AFS_SOCKET_READY, + /** Socket fd was included in the write fd set for select(). */ + AFS_SOCKET_CHECK_FOR_WRITE, + /** vss wrote a request to the socket and waits for afs to reply. */ + AFS_SOCKET_AFD_PENDING +}; static char *map; +struct vss_task { + struct timeval announce_tv; + struct timeval data_send_barrier; + struct timeval eof_barrier; + struct timeval autoplay_barrier; + enum afs_socket_status afsss; +}; + +static struct vss_task vss_task_struct, *vsst = &vss_task_struct; + /** * Check if vss status flag \a P (playing) is set. * @@ -97,6 +136,25 @@ unsigned int vss_stopped(void) && !(mmd->new_vss_status_flags & VSS_PLAYING); } +/** + * Get the data of the given chunk. + * + * \param chunk_num The number of the desired chunk. + * \param buf Chunk data. + * \param len Chunk length in bytes. + * + * \return Standard. + */ +int vss_get_chunk(long unsigned chunk_num, char **buf, size_t *len) +{ + if (!map || !vss_playing()) + return -E_CHUNK; + if (chunk_num >= mmd->afd.afhi.chunks_total) + return -E_CHUNK; + afh_get_chunk(chunk_num, &mmd->afd.afhi, map, buf, len); + return 1; +} + /** * Initialize the virtual streaming system. * @@ -111,21 +169,22 @@ void vss_init(void) conf.announce_time_arg : 300, autoplay_delay = conf.autoplay_delay_arg > 0? conf.autoplay_delay_arg : 0; - ms2tv(announce_time, &announce_tv); - PARA_INFO_LOG("announce timeval: %lums\n", tv2ms(&announce_tv)); + ms2tv(announce_time, &vsst->announce_tv); + PARA_INFO_LOG("announce timeval: %lums\n", tv2ms(&vsst->announce_tv)); for (i = 0; senders[i].name; i++) { PARA_NOTICE_LOG("initializing %s sender\n", senders[i].name); senders[i].init(&senders[i]); } free(hn); free(home); + mmd->sender_cmd_data.cmd_num = -1; if (conf.autoplay_given) { struct timeval now, tmp; mmd->vss_status_flags |= VSS_PLAYING; mmd->new_vss_status_flags |= VSS_PLAYING; gettimeofday(&now, NULL); ms2tv(autoplay_delay, &tmp); - tv_add(&now, &tmp, &autoplay_barrier); + tv_add(&now, &tmp, &vsst->autoplay_barrier); } } @@ -159,12 +218,12 @@ static struct timeval *vss_compute_timeout(void) return &the_timeout; } gettimeofday(&now, NULL); - if (chk_barrier("autoplay_delay", &now, &autoplay_barrier, + if (chk_barrier("autoplay_delay", &now, &vsst->autoplay_barrier, &the_timeout, 1) < 0) return &the_timeout; - if (chk_barrier("eof", &now, &eof_barrier, &the_timeout, 1) < 0) + if (chk_barrier("eof", &now, &vsst->eof_barrier, &the_timeout, 1) < 0) return &the_timeout; - if (chk_barrier("data send", &now, &data_send_barrier, + if (chk_barrier("data send", &now, &vsst->data_send_barrier, &the_timeout, 1) < 0) return &the_timeout; if (!vss_playing() || !map) @@ -182,19 +241,15 @@ static struct timeval *vss_compute_timeout(void) static void vss_eof(void) { struct timeval now; - int i; char *tmp; - if (!map) { - for (i = 0; senders[i].name; i++) - senders[i].shutdown_clients(); + if (!map) return; - } if (mmd->new_vss_status_flags & VSS_NOMORE) mmd->new_vss_status_flags = VSS_NEXT; gettimeofday(&now, NULL); - tv_add(&mmd->afd.afhi.eof_tv, &now, &eof_barrier); - munmap(map, mmd->size); + tv_add(&mmd->afd.afhi.eof_tv, &now, &vsst->eof_barrier); + para_munmap(map, mmd->size); map = NULL; mmd->chunks_sent = 0; mmd->offset = 0; @@ -250,17 +305,18 @@ struct timeval *vss_chunk_time(void) return &mmd->afd.afhi.chunk_tv; } -/** The possible states of the afs socket. See \ref afs_socket. */ -enum afs_socket_status { - /** Socket is inactive. */ - AFS_SOCKET_READY, - /** Socket fd was included in the write fd set for select(). */ - AFS_SOCKET_CHECK_FOR_WRITE, - /** vss wrote a request to the socket and waits for afs to reply. */ - AFS_SOCKET_AFD_PENDING -}; - -static enum afs_socket_status afsss; +static int need_to_request_new_audio_file(void) +{ + if (map) /* have audio file */ + return 0; + if (!vss_playing()) /* don't need one */ + return 0; + if (mmd->new_vss_status_flags & VSS_NOMORE) + return 0; + if (vsst->afsss == AFS_SOCKET_AFD_PENDING) /* already requested one */ + return 0; + return 1; +} /** * Compute the timeout for para_server's main select-loop. @@ -286,49 +342,40 @@ static enum afs_socket_status afsss; struct timeval *vss_preselect(fd_set *rfds, fd_set *wfds, int *max_fileno) { int i; - struct timeval *tv; + struct timeval now; - para_fd_set(afs_socket, rfds, max_fileno); - if (!map) + if (!map || vss_next() || vss_paused() || vss_repos()) for (i = 0; senders[i].name; i++) senders[i].shutdown_clients(); - else { - if (vss_next()) { - vss_eof(); - tv = vss_compute_timeout(); - goto out; - } - } - if (vss_paused() || vss_repos()) { - for (i = 0; senders[i].name; i++) - senders[i].shutdown_clients(); - if (map) { - struct timeval now; + if (vss_next()) + vss_eof(); + else if (vss_paused()) { + if (mmd->chunks_sent) { gettimeofday(&now, NULL); - if (!vss_paused() || mmd->chunks_sent) - tv_add(&mmd->afd.afhi.eof_tv, &now, &eof_barrier); - if (vss_repos()) - tv_add(&now, &announce_tv, &data_send_barrier); + tv_add(&mmd->afd.afhi.eof_tv, &now, &vsst->eof_barrier); } mmd->chunks_sent = 0; - } - if (vss_repos()) { - mmd->new_vss_status_flags &= ~(VSS_REPOS); + } else if (vss_repos()) { + gettimeofday(&now, NULL); + tv_add(&now, &vsst->announce_tv, &vsst->data_send_barrier); + tv_add(&mmd->afd.afhi.eof_tv, &now, &vsst->eof_barrier); + mmd->chunks_sent = 0; mmd->current_chunk = mmd->repos_request; + mmd->new_vss_status_flags &= ~VSS_REPOS; } - tv = vss_compute_timeout(); - if (tv) - goto out; - if (!map && vss_playing() && - !(mmd->new_vss_status_flags & VSS_NOMORE)) { - if (afsss != AFS_SOCKET_AFD_PENDING) { - PARA_DEBUG_LOG("ready and playing, but no audio file\n"); - para_fd_set(afs_socket, wfds, max_fileno); - afsss = AFS_SOCKET_CHECK_FOR_WRITE; - } + + if (need_to_request_new_audio_file()) { + PARA_DEBUG_LOG("ready and playing, but no audio file\n"); + para_fd_set(afs_socket, wfds, max_fileno); + vsst->afsss = AFS_SOCKET_CHECK_FOR_WRITE; + } else + para_fd_set(afs_socket, rfds, max_fileno); + for (i = 0; senders[i].name; i++) { + if (!senders[i].pre_select) + continue; + senders[i].pre_select(max_fileno, rfds, wfds); } -out: - return tv; + return vss_compute_timeout(); } static int recv_afs_msg(int *fd, uint32_t *code, uint32_t *data) @@ -350,7 +397,7 @@ static int recv_afs_msg(int *fd, uint32_t *code, uint32_t *data) ret = recvmsg(afs_socket, &msg, 0); if (ret < 0) return -ERRNO_TO_PARA_ERROR(errno); - afsss = AFS_SOCKET_READY; + vsst->afsss = AFS_SOCKET_READY; if (iov.iov_len != sizeof(buf)) return -E_AFS_SHORT_READ; *code = *(uint32_t*)buf; @@ -407,7 +454,7 @@ static void recv_afs_result(void) mmd->num_played++; mmd->new_vss_status_flags &= (~VSS_NEXT); gettimeofday(&now, NULL); - tv_add(&now, &announce_tv, &data_send_barrier); + tv_add(&now, &vsst->announce_tv, &vsst->data_send_barrier); return; err: free(mmd->afd.afhi.chunk_table); @@ -417,37 +464,6 @@ err: mmd->new_vss_status_flags = VSS_NEXT; } -void vss_post_select(fd_set *rfds, fd_set *wfds) -{ - int ret; - - if (FD_ISSET(afs_socket, rfds)) - recv_afs_result(); - if (afsss != AFS_SOCKET_CHECK_FOR_WRITE || !FD_ISSET(afs_socket, wfds)) - return; - PARA_NOTICE_LOG("requesting new fd from afs\n"); - ret = send_buffer(afs_socket, "new"); - afsss = AFS_SOCKET_AFD_PENDING; -} -/** - * Get the data of the given chunk. - * - * \param chunk_num The number of the desired chunk. - * \param buf Chunk data. - * \param len Chunk length in bytes. - * - * \return Standard. - */ -int vss_get_chunk(long unsigned chunk_num, char **buf, size_t *len) -{ - if (!map || !vss_playing()) - return -E_CHUNK; - if (chunk_num >= mmd->afd.afhi.chunks_total) - return -E_CHUNK; - afh_get_chunk(chunk_num, &mmd->afd.afhi, map, buf, len); - return 1; -} - /** * Main sending function. * @@ -457,7 +473,7 @@ int vss_get_chunk(long unsigned chunk_num, char **buf, size_t *len) * each supported sender's send() function which is supposed to send out the data * to all connected clients. */ -void vss_send_chunk(void) +static void vss_send_chunk(void) { int i; struct timeval now, due; @@ -471,15 +487,15 @@ void vss_send_chunk(void) &mmd->stream_start, &due); if (tv_diff(&due, &now, NULL) > 0) return; - if (chk_barrier("eof", &now, &eof_barrier, &due, 1) < 0) + if (chk_barrier("eof", &now, &vsst->eof_barrier, &due, 1) < 0) return; - if (chk_barrier("data send", &now, &data_send_barrier, + if (chk_barrier("data send", &now, &vsst->data_send_barrier, &due, 1) < 0) return; mmd->new_vss_status_flags &= ~VSS_REPOS; if (mmd->current_chunk >= mmd->afd.afhi.chunks_total) { /* eof */ mmd->new_vss_status_flags |= VSS_NEXT; - return vss_eof(); + return; } /* * We call the send function also in case of empty chunks as they @@ -499,3 +515,31 @@ void vss_send_chunk(void) mmd->chunks_sent++; mmd->current_chunk++; } + +void vss_post_select(fd_set *rfds, fd_set *wfds) +{ + int ret, i; + + if (mmd->sender_cmd_data.cmd_num >= 0) { + int num = mmd->sender_cmd_data.cmd_num, + s = mmd->sender_cmd_data.sender_num; + + if (senders[s].client_cmds[num]) + senders[s].client_cmds[num](&mmd->sender_cmd_data); + mmd->sender_cmd_data.cmd_num = -1; + } + if (vsst->afsss != AFS_SOCKET_CHECK_FOR_WRITE) { + if (FD_ISSET(afs_socket, rfds)) + recv_afs_result(); + } else if (FD_ISSET(afs_socket, wfds)) { + PARA_NOTICE_LOG("requesting new fd from afs\n"); + ret = send_buffer(afs_socket, "new"); + vsst->afsss = AFS_SOCKET_AFD_PENDING; + } + for (i = 0; senders[i].name; i++) { + if (!senders[i].post_select) + continue; + senders[i].post_select(rfds, wfds); + } + vss_send_chunk(); +}