X-Git-Url: http://git.tuebingen.mpg.de/?p=paraslash.git;a=blobdiff_plain;f=vss.c;h=dbf8547c28e1bd091fac6f5a62e7f040e4bc8889;hp=5b7ed320aaaa38885f208dda80ad51d690999b2c;hb=61250cf03241bf73662dac3753e44660a572fa2a;hpb=97f53e18953fc2013c0b14f0261ac385e45b0284 diff --git a/vss.c b/vss.c index 5b7ed320..dbf8547c 100644 --- a/vss.c +++ b/vss.c @@ -17,14 +17,17 @@ #include #include "para.h" +#include "error.h" +#include "string.h" #include "afh.h" +#include "afs.h" #include "server.h" +#include "net.h" #include "server.cmdline.h" #include "afs_common.h" #include "vss.h" #include "send.h" -#include "error.h" -#include "string.h" +#include "ipc.h" #include "fd.h" extern const char *status_item_list[]; @@ -497,6 +500,14 @@ struct timeval *vss_chunk_time(void) return &mmd->afi.chunk_tv; } +enum afs_socket_status { + AFS_SOCKET_READY, + AFS_SOCKET_CHECK_FOR_WRITE, + AFS_SOCKET_AFD_PENDING +}; + +static enum afs_socket_status afsss; + /** * compute the timeout for para_server's main select-loop * @@ -514,12 +525,15 @@ struct timeval *vss_chunk_time(void) * \return A pointer to a struct timeval containing the timeout for the next * chunk of data to be sent, or NULL if we're not sending right now. */ -struct timeval *vss_preselect(void) +struct timeval *vss_preselect(fd_set *rfds, fd_set *wfds, int *max_fileno) { struct audio_format_handler *af = NULL; int i, format; struct timeval *ret; -again: + + para_fd_set(afs_socket, rfds, max_fileno); + +//again: format = mmd->audio_format; if (format >= 0) af = afl + format; @@ -553,12 +567,113 @@ again: if (!ret && !map && vss_playing() && !(mmd->new_vss_status_flags & VSS_NOMORE)) { PARA_DEBUG_LOG("%s", "ready and playing, but no audio file\n"); - vss_get_audio_file(); - goto again; + //vss_get_audio_file(); + if (afsss == AFS_SOCKET_READY) { + para_fd_set(afs_socket, wfds, max_fileno); + afsss = AFS_SOCKET_CHECK_FOR_WRITE; + } +// goto again; } return ret; } +static int recv_afs_msg(int *fd, uint32_t *code, uint32_t *data) +{ + char control[255], buf[8]; + struct msghdr msg = {.msg_iov = NULL}; + struct cmsghdr *cmsg; + struct iovec iov; + int ret = 0; + + iov.iov_base = buf; + iov.iov_len = sizeof(buf); + msg.msg_iov = &iov; + msg.msg_iovlen = 1; + msg.msg_control = control; + msg.msg_controllen = sizeof(control); + memset(buf, 0, sizeof(buf)); + ret = recvmsg(afs_socket, &msg, 0); + if (ret < 0) + return -ERRNO_TO_PARA_ERROR(errno); + if (iov.iov_len != sizeof(buf)) + return -E_SHORT_AFS_READ; + *code = *(uint32_t*)buf; + *data = *(uint32_t*)(buf + 4); + cmsg = CMSG_FIRSTHDR(&msg); + for (cmsg = CMSG_FIRSTHDR(&msg); cmsg; cmsg = CMSG_NXTHDR(&msg, cmsg)) { + if (cmsg->cmsg_level != SOL_SOCKET + || cmsg->cmsg_type != SCM_RIGHTS) + continue; + if ((cmsg->cmsg_len - CMSG_LEN(0)) / sizeof(int) != 1) + continue; + *fd = *(int *)CMSG_DATA(cmsg); + } + return 1; +} + +static void recv_afs_result(void) +{ + int ret, passed_fd = -1, shmid; + uint32_t afs_code = 0, afs_data = 0; + struct stat statbuf; + struct timeval now; + + ret = recv_afs_msg(&passed_fd, &afs_code, &afs_data); + if (ret < 0) + goto err; + PARA_NOTICE_LOG("got the fd: %d, code: %u, shmid: %u\n", + passed_fd, afs_code, afs_data); + ret = -E_BAD_AFS_CODE; + if (afs_code != NEXT_AUDIO_FILE) + goto err; + afsss = AFS_SOCKET_READY; + shmid = afs_data; + ret = load_afd(shmid, &mmd->afd); + if (ret < 0) + goto err; + shm_destroy(shmid); + PARA_NOTICE_LOG("next audio file: %s (%lu chunks)\n", mmd->afd.path, + mmd->afd.afhi.chunks_total); + ret = fstat(passed_fd, &statbuf); + if (ret < 0) { + ret = -ERRNO_TO_PARA_ERROR(errno); + goto err; + } + mmd->size = statbuf.st_size; + mmd->mtime = statbuf.st_mtime; + map = para_mmap(mmd->size, PROT_READ, MAP_PRIVATE, + passed_fd, 0); + strcpy(mmd->filename, mmd->afd.path); /* FIXME: check length */ + mmd->afi.header_len = 0; /* default: no header */ + mmd->audio_format = mmd->afd.afsi.audio_format_id; + mmd->chunks_sent = 0; + mmd->current_chunk = 0; + mmd->offset = 0; + mmd->events++; + mmd->num_played++; + mmd->afi = mmd->afd.afhi; + mmd->new_vss_status_flags &= (~VSS_NEXT); + gettimeofday(&now, NULL); + tv_add(&now, &announce_tv, &data_send_barrier); + return; +err: + if (passed_fd >= 0) + close(passed_fd); + PARA_ERROR_LOG("%s\n", PARA_STRERROR(-ret)); +} + +void vss_post_select(fd_set *rfds, fd_set *wfds) +{ + int ret; + + if (FD_ISSET(afs_socket, rfds)) + recv_afs_result(); + if (afsss != AFS_SOCKET_CHECK_FOR_WRITE || !FD_ISSET(afs_socket, wfds)) + return; + ret = send_buffer(afs_socket, "new"); + afsss = AFS_SOCKET_AFD_PENDING; +} + static void get_chunk(long unsigned chunk_num, char **buf, size_t *len) { size_t pos = mmd->afi.chunk_table[chunk_num];