Switch to the new afs.
authorAndre Noll <maan@systemlinux.org>
Mon, 22 Oct 2007 20:23:31 +0000 (22:23 +0200)
committerAndre Noll <maan@systemlinux.org>
Mon, 22 Oct 2007 20:23:31 +0000 (22:23 +0200)
This obsoletes get_audio_file() of vss.c. It is being replaced
by a call to the afs layer from vss_post_select() which requests
a struct audio_format_data for the next audio file.

The afd struct, the chunk table and the path of the new audio file are
stored in a shared memory area. The id of that area is sent through
the afs-server socket. An open fd for the underlying audio file is
send to the server process as well using usual socket magic.

The vss task of the server process attaches the shared memory area
and mmaps the open fd to start audio streaming.

The code is still quite buggy, but let's do bug fixes and removal of
the old audio file selectors in susequent patches.

29 files changed:
aac_afh.c
afs.c
afs.cmd
afs.h
afs_common.c
aft.c
command.c
dccp_send.c
error.h
fd.c
fd.h
http_send.c
mp3_afh.c
mysql_selector.c
mysql_selector.cmd
ogg_afh.c
ortp_send.c
osl.h
playlist_selector.c
playlist_selector.cmd
random_selector.c
random_selector.cmd
send.h
server.c
server.cmd
server.h
string.c
vss.c
vss.h

index d0d437e..76b2849 100644 (file)
--- a/aac_afh.c
+++ b/aac_afh.c
 /** \file aac_afh.c para_server's aac audio format handler */
 
 #include "para.h"
-#include "afh.h"
-#include "server.h"
 #include "error.h"
 #include "string.h"
+#include "afh.h"
+#include "afs.h"
+#include "server.h"
 #include "aac.h"
 
 static int aac_find_stsz(unsigned char *buf, size_t buflen, off_t *skip)
diff --git a/afs.c b/afs.c
index c4e8d16..2e11539 100644 (file)
--- a/afs.c
+++ b/afs.c
 #include <fnmatch.h>
 #include "server.cmdline.h"
 #include "para.h"
+#include "error.h"
 #include "string.h"
 #include "afh.h"
+#include "afs.h"
 #include "server.h"
-#include "error.h"
 #include <dirent.h> /* readdir() */
 #include <sys/mman.h>
 #include <sys/time.h>
 #include "net.h"
-#include "afs.h"
 #include "ipc.h"
 #include "list.h"
 #include "sched.h"
@@ -75,6 +75,11 @@ struct command_task {
        struct task task;
 };
 
+static int server_socket;
+static struct command_task command_task_struct;
+static struct signal_task signal_task_struct;
+
+
 /**
  * A random number used to "authenticate" the connection.
  *
@@ -425,6 +430,40 @@ int stdin_command(int fd, struct osl_object *arg_obj, callback_function *f,
        return ret;
 }
 
+int pass_afd(int fd, char *buf, size_t size)
+{
+       struct msghdr msg = {.msg_iov = NULL};
+       struct cmsghdr *cmsg;
+       char control[255];
+       int ret;
+       struct iovec iov;
+
+       iov.iov_base = buf;
+       iov.iov_len  = size;
+
+       msg.msg_iov = &iov;
+       msg.msg_iovlen = 1;
+
+       msg.msg_control = control;
+       msg.msg_controllen = sizeof(control);
+
+       cmsg = CMSG_FIRSTHDR(&msg);
+       cmsg->cmsg_level = SOL_SOCKET;
+       cmsg->cmsg_type = SCM_RIGHTS;
+       cmsg->cmsg_len = CMSG_LEN(sizeof(int));
+       *(int *)CMSG_DATA(cmsg) = fd;
+
+       /* Sum of the length of all control messages in the buffer */
+       msg.msg_controllen = cmsg->cmsg_len;
+       PARA_NOTICE_LOG("passing %zu bytes and fd %d\n", size, fd);
+       ret = sendmsg(server_socket, &msg, 0);
+       if (ret < 0) {
+               ret = -ERRNO_TO_PARA_ERROR(errno);
+               return ret;
+       }
+       return 1;
+}
+
 /**
  * Open the audio file with highest score.
  *
@@ -437,33 +476,35 @@ int stdin_command(int fd, struct osl_object *arg_obj, callback_function *f,
  *
  * \sa close_audio_file(), open_and_update_audio_file().
  */
-int open_next_audio_file(struct audio_file_data *afd)
+int open_next_audio_file(void)
 {
        struct osl_row *aft_row;
-       int ret;
-       for (;;) {
-               ret = score_get_best(&aft_row, &afd->score);
-               if (ret < 0)
-                       return ret;
-               ret = open_and_update_audio_file(aft_row, afd);
-               if (ret >= 0)
-                       return ret;
-       }
-}
+       struct audio_file_data afd;
+       int ret, shmid;
+       char buf[8];
 
-/**
- * Free all resources which were allocated by open_next_audio_file().
- *
- * \param afd The structure previously filled in by open_next_audio_file().
- *
- * \return The return value of the underlying call to para_munmap().
- *
- * \sa open_next_audio_file().
- */
-int close_audio_file(struct audio_file_data *afd)
-{
-       free(afd->afhi.chunk_table);
-       return para_munmap(afd->map.data, afd->map.size);
+       PARA_NOTICE_LOG("getting next af\n");
+       ret = score_get_best(&aft_row, &afd.score);
+       if (ret < 0)
+               return ret;
+       ret = open_and_update_audio_file(aft_row, &afd);
+       if (ret < 0)
+               return ret;
+       shmid = ret;
+       PARA_NOTICE_LOG("shmid: %u\n", shmid);
+       if (!write_ok(server_socket)) {
+               PARA_EMERG_LOG("afs_socket not writable\n");
+               goto destroy;
+       }
+       *(uint32_t *)buf = NEXT_AUDIO_FILE;
+       *(uint32_t *)(buf + 4) = (uint32_t)shmid;
+       ret = pass_afd(afd.fd, buf, 8);
+       if (ret >= 0)
+               return ret;
+       PARA_ERROR_LOG("%s\n", PARA_STRERROR(-ret));
+destroy:
+       shm_destroy(shmid);
+       return ret;
 }
 
 static enum play_mode init_admissible_files(void)
@@ -572,10 +613,6 @@ static int open_afs_tables(void)
        return ret;
 }
 
-static int server_socket;
-static struct command_task command_task_struct;
-static struct signal_task signal_task_struct;
-
 static void unregister_tasks(void)
 {
        unregister_task(&command_task_struct.task);
@@ -639,6 +676,7 @@ static void command_pre_select(struct sched *s, struct task *t)
        struct command_task *ct = t->private_data;
        struct afs_client *client;
 
+       para_fd_set(server_socket, &s->rfds, &s->max_fileno);
        para_fd_set(ct->fd, &s->rfds, &s->max_fileno);
        list_for_each_entry(client, &afs_client_list, node)
                para_fd_set(client->fd, &s->rfds, &s->max_fileno);
@@ -700,6 +738,27 @@ out:
        return ret;
 }
 
+static void execute_server_command(void)
+{
+       char buf[8];
+       int ret = recv_bin_buffer(server_socket, buf, sizeof(buf) - 1);
+
+       if (ret <= 0) {
+               if (ret < 0)
+                       PARA_ERROR_LOG("%s\n", PARA_STRERROR(-ret));
+               return;
+       }
+       buf[ret] = '\0';
+       PARA_NOTICE_LOG("received: %s\n", buf);
+       if (!strcmp(buf, "new")) {
+               ret = open_next_audio_file();
+               PARA_NOTICE_LOG("ret: %d\n", ret);
+               return;
+       }
+       PARA_ERROR_LOG("unknown command\n");
+
+}
+
 static void execute_afs_command(int fd, uint32_t expected_cookie)
 {
        uint32_t cookie;
@@ -741,7 +800,10 @@ static void command_post_select(struct sched *s, struct task *t)
        struct sockaddr_un unix_addr;
        struct afs_client *client, *tmp;
 
-       /* First, check the list of connected clients. */
+       if (FD_ISSET(server_socket, &s->rfds))
+               execute_server_command();
+
+       /* Check the list of connected clients. */
        list_for_each_entry_safe(client, tmp, &afs_client_list, node) {
                if (FD_ISSET(client->fd, &s->rfds))
                        execute_afs_command(client->fd, ct->cookie);
@@ -756,7 +818,7 @@ static void command_post_select(struct sched *s, struct task *t)
                list_del(&client->node);
                free(client);
        }
-       /* Next, accept connections on the local socket. */
+       /* Accept connections on the local socket. */
        if (!FD_ISSET(ct->fd, &s->rfds))
                goto out;
        t->ret = para_accept(ct->fd, &unix_addr, sizeof(unix_addr));
diff --git a/afs.cmd b/afs.cmd
index 15a30d0..92f2e56 100644 (file)
--- a/afs.cmd
+++ b/afs.cmd
@@ -3,7 +3,7 @@ SF: afs.c aft.c attribute.c
 HC: Prototypes for the commands of the audio file selector.
 CC: Array of commands for the audio file selector.
 AT: server_command
-IN: para afh server list user_list
+IN: para error string afh afs server list user_list
 SN: list of afs commands
 TM: mood lyr img pl
 ---
diff --git a/afs.h b/afs.h
index db931d9..6163072 100644 (file)
--- a/afs.h
+++ b/afs.h
@@ -72,11 +72,16 @@ enum play_mode {PLAY_MODE_MOOD, PLAY_MODE_PLAYLIST};
 
 struct audio_file_data {
        enum play_mode current_play_mode;
+       int fd;
        long score;
        struct afs_info afsi;
        struct audio_format_info afhi;
        char *path;
-       struct osl_object map;
+};
+
+enum afs_server_code {
+       NEXT_AUDIO_FILE,
+       AFD_CHANGE
 };
 
 /** Flags passed to for_each_matching_row(). */
@@ -124,7 +129,6 @@ int send_option_arg_callback_request(struct osl_object *options,
 int stdin_command(int fd, struct osl_object *arg_obj, callback_function *f,
                unsigned max_len, struct osl_object *result);
 int string_compare(const struct osl_object *obj1, const struct osl_object *obj2);
-int open_next_audio_file(struct audio_file_data *afd);
 int close_audio_file(struct audio_file_data *afd);
 int for_each_matching_row(struct pattern_match_data *pmd);
 
@@ -151,6 +155,7 @@ int get_attribute_text(uint64_t *atts, const char *delim, char **text);
 void aft_init(struct afs_table *t);
 int aft_get_row_of_path(const char *path, struct osl_row **row);
 int open_and_update_audio_file(struct osl_row *aft_row, struct audio_file_data *afd);
+int load_afd(int shmid, struct audio_file_data *afd);
 int load_afsi(struct afs_info *afsi, struct osl_object *obj);
 void save_afsi(struct afs_info *afsi, struct osl_object *obj);
 int get_afsi_of_row(const struct osl_row *row, struct afs_info *afsi);
index 17b4e16..c45b3d9 100644 (file)
 
 
 #include "para.h"
+#include "error.h"
+#include "string.h"
 #include "fd.h"
 #include "server.cmdline.h"
 #include "afh.h"
+#include "afs.h"
 #include "server.h"
 #include "vss.h"
 #include <dirent.h> /* readdir() */
 #include <sys/stat.h> /* stat */
 #include <sys/types.h> /* mode_t */
-#include "error.h"
-#include "string.h"
 
 /**
  * Traverse the given directory recursively.
diff --git a/aft.c b/aft.c
index aae6d66..ff2691f 100644 (file)
--- a/aft.c
+++ b/aft.c
@@ -17,6 +17,7 @@
 #include "net.h"
 #include "vss.h"
 #include "fd.h"
+#include "ipc.h"
 
 static struct osl_table *audio_file_table;
 
@@ -372,11 +373,23 @@ enum chunk_info_offsets{
        CHUNK_TABLE_OFFSET = 20,
 };
 
-/* TODO: audio format handlers could just produce this */
-static void save_chunk_info(struct audio_format_info *afhi, char *buf)
+static void save_chunk_table(struct audio_format_info *afhi, char *buf)
 {
        int i;
+       for (i = 0; i < afhi->chunks_total; i++)
+               write_u32(buf + 4 * i, afhi->chunk_table[i]);
+}
 
+static void load_chunk_table(struct audio_format_info *afhi, char *buf)
+{
+       int i;
+       for (i = 0; i < afhi->chunks_total; i++)
+               afhi->chunk_table[i] = read_u32(buf + 4 * i);
+}
+
+/* TODO: audio format handlers could just produce this */
+static void save_chunk_info(struct audio_format_info *afhi, char *buf)
+{
        if (!afhi)
                return;
        write_u32(buf + CHUNKS_TOTAL_OFFSET, afhi->chunks_total);
@@ -384,14 +397,12 @@ static void save_chunk_info(struct audio_format_info *afhi, char *buf)
        write_u32(buf + HEADER_OFFSET_OFFSET, afhi->header_offset);
        write_u32(buf + CHUNK_TV_TV_SEC_OFFSET, afhi->chunk_tv.tv_sec);
        write_u32(buf + CHUNK_TV_TV_USEC, afhi->chunk_tv.tv_usec);
-       for (i = 0; i < afhi->chunks_total; i++)
-               write_u32(buf + CHUNK_TABLE_OFFSET + 4 * i, afhi->chunk_table[i]);
+       save_chunk_table(afhi, buf + CHUNK_TABLE_OFFSET);
 }
 
 static int load_chunk_info(struct osl_object *obj, struct audio_format_info *afhi)
 {
        char *buf = obj->data;
-       int i;
 
        if (obj->size < CHUNK_TABLE_OFFSET)
                return -E_BAD_DATA_SIZE;
@@ -405,8 +416,7 @@ static int load_chunk_info(struct osl_object *obj, struct audio_format_info *afh
        if (afhi->chunks_total * 4 + CHUNK_TABLE_OFFSET > obj->size)
                return -E_BAD_DATA_SIZE;
        afhi->chunk_table = para_malloc(afhi->chunks_total * sizeof(size_t));
-       for (i = 0; i < afhi->chunks_total; i++)
-               afhi->chunk_table[i] = read_u32(buf + CHUNK_TABLE_OFFSET + 4 * i);
+       load_chunk_table(afhi, buf + CHUNK_TABLE_OFFSET);
        return 1;
 }
 
@@ -581,6 +591,7 @@ int get_afhi_of_row(const struct osl_row *row, struct audio_format_info *afhi)
        return 1;
 }
 
+#if 0
 /**
  * Get the chunk table of an audio file, given a row of the audio file table.
  *
@@ -602,6 +613,58 @@ static int get_chunk_table_of_row(const struct osl_row *row, struct audio_format
        osl_close_disk_object(&obj);
        return ret;
 }
+#endif
+
+/* returns shmid on success */
+static int save_afd(struct audio_file_data *afd)
+{
+       size_t path_size = strlen(afd->path) + 1;
+       size_t size = sizeof(*afd) + path_size
+               + 4 * afd->afhi.chunks_total;
+
+       PARA_NOTICE_LOG("size: %zu\n", size);
+       int shmid, ret = shm_new(size);
+       void *shm_afd;
+       char *buf;
+
+       if (ret < 0)
+               return ret;
+       shmid = ret;
+       ret = shm_attach(shmid, ATTACH_RW, &shm_afd);
+       if (ret < 0)
+               goto err;
+       *(struct audio_file_data *)shm_afd = *afd;
+       buf = shm_afd;
+       buf += sizeof(*afd);
+       strcpy(buf, afd->path);
+       buf += path_size;
+       save_chunk_table(&afd->afhi, buf);
+       shm_detach(shm_afd);
+       return shmid;
+err:
+       shm_destroy(shmid);
+       return ret;
+}
+
+int load_afd(int shmid, struct audio_file_data *afd)
+{
+       void *shm_afd;
+       char *buf;
+       int ret;
+
+       ret = shm_attach(shmid, ATTACH_RO, &shm_afd);
+       if (ret < 0)
+               return ret;
+       *afd = *(struct audio_file_data *)shm_afd;
+       buf = shm_afd;
+       buf += sizeof(*afd);
+       afd->path = para_strdup(buf);
+       buf += strlen(buf) + 1;
+       afd->afhi.chunk_table = para_malloc(afd->afhi.chunks_total * sizeof(size_t));
+       load_chunk_table(&afd->afhi, buf);
+       shm_detach(shm_afd);
+       return 1;
+}
 
 /**
  * Mmap the given audio file and update statistics.
@@ -622,6 +685,7 @@ int open_and_update_audio_file(struct osl_row *aft_row, struct audio_file_data *
        struct afs_info new_afsi;
        int ret = get_hash_of_row(aft_row, &aft_hash);
        struct afsi_change_event_data aced;
+       struct osl_object map, chunk_table_obj;
 
        if (ret < 0)
                return ret;
@@ -637,29 +701,39 @@ int open_and_update_audio_file(struct osl_row *aft_row, struct audio_file_data *
        ret = get_afhi_of_row(aft_row, &afd->afhi);
        if (ret < 0)
                return ret;
-       ret = get_chunk_table_of_row(aft_row, &afd->afhi);
+       ret = osl_open_disk_object(audio_file_table, aft_row,
+               AFTCOL_CHUNKS, &chunk_table_obj);
        if (ret < 0)
                return ret;
-       ret = mmap_full_file(afd->path, O_RDONLY, &afd->map.data,
-               &afd->map.size, NULL);
+       ret = mmap_full_file(afd->path, O_RDONLY, &map.data,
+               &map.size, &afd->fd);
        if (ret < 0)
                goto err;
-       hash_function(afd->map.data, afd->map.size, file_hash);
-       ret = -E_HASH_MISMATCH;
-       if (hash_compare(file_hash, aft_hash))
+       hash_function(map.data, map.size, file_hash);
+       ret = hash_compare(file_hash, aft_hash);
+       para_munmap(map.data, map.size);
+       if (ret) {
+               ret = -E_HASH_MISMATCH;
                goto err;
+       }
        new_afsi = afd->afsi;
        new_afsi.num_played++;
        new_afsi.last_played = time(NULL);
        save_afsi(&new_afsi, &afsi_obj); /* in-place update */
 
+       ret = load_chunk_info(&chunk_table_obj, &afd->afhi);
+       if (ret < 0)
+               goto err;
+
        aced.aft_row = aft_row;
        aced.old_afsi = &afd->afsi;
        afs_event(AFSI_CHANGE, NULL, &aced);
-
-       return ret;
-err:
+       ret = save_afd(afd);
+       if (ret < 0)
+               goto err;
        free(afd->afhi.chunk_table);
+err:
+       osl_close_disk_object(&chunk_table_obj);
        return ret;
 }
 
@@ -1550,7 +1624,7 @@ static int add_one_audio_file(const char *path, const void *private_data)
        const struct private_add_data *pad = private_data;
        struct audio_format_info afhi, *afhi_ptr = NULL;
        struct osl_row *pb = NULL, *hs = NULL; /* path brother/hash sister */
-       struct osl_object map, obj = {.data = NULL}, query, result;
+       struct osl_object map, obj = {.data = NULL}, query, result = {.data = NULL};
        HASH_TYPE hash[HASH_SIZE];
 
        afhi.header_offset = 0;
@@ -1617,7 +1691,7 @@ static int add_one_audio_file(const char *path, const void *private_data)
        save_audio_file_info(hash, path, afhi_ptr, pad->flags, format_num, &obj);
        /* Ask afs to consider this entry for adding. */
        ret = send_callback_request(com_add_callback, &obj, &result);
-       if (result.data && result.size) {
+       if (ret >= 0 && result.data && result.size) {
                ret2 = send_va_buffer(pad->fd, "%s", (char *)result.data);
                free(result.data);
                if (ret >= 0 && ret2 < 0)
index 97924f2..a84d3b8 100644 (file)
--- a/command.c
+++ b/command.c
 #include <openssl/rc4.h>
 
 #include "para.h"
+#include "error.h"
 #include "server.cmdline.h"
 #include "afs_common.h"
+#include "string.h"
 #include "afh.h"
+#include "afs.h"
 #include "server.h"
 #include "vss.h"
 #include "send.h"
 #include "rc4.h"
-#include "error.h"
 #include "net.h"
 #include "daemon.h"
-#include "string.h"
 #include "fd.h"
 #include "list.h"
 #include "user_list.h"
index c506fd9..1ea8de2 100644 (file)
 
 #include "para.h"
 #include "error.h"
+#include "string.h"
 #include "afh.h"
+#include "afs.h"
 #include "server.h"
 #include "net.h"
 #include "list.h"
 #include "vss.h"
 #include "send.h"
 #include "dccp.h"
-#include "string.h"
 #include "fd.h"
 #include "close_on_fork.h"
 #include "chunk_queue.h"
diff --git a/error.h b/error.h
index 1c3aaa3..b6670df 100644 (file)
--- a/error.h
+++ b/error.h
@@ -64,7 +64,6 @@ extern const char **para_errlist[];
        PARA_ERROR(STAT, "can not stat file"), \
        PARA_ERROR(FSTAT, "fstat error"), \
        PARA_ERROR(RENAME, "rename failed"), \
-       PARA_ERROR(MUNMAP, "munmap failed"), \
        PARA_ERROR(WRITE, "write error"), \
        PARA_ERROR(LSEEK, "lseek error"), \
        PARA_ERROR(BUSY, "table is busy"), \
@@ -312,6 +311,8 @@ extern const char **para_errlist[];
        PARA_ERROR(BAD_AUDIO_FILE_SUFFIX, "unknown suffix"), \
        PARA_ERROR(AUDIO_FORMAT, "audio format not recognized"), \
        PARA_ERROR(CHUNK, "unable to get chunk"), \
+       PARA_ERROR(SHORT_AFS_READ, "short read from afs socket"), \
+       PARA_ERROR(BAD_AFS_CODE, "received junk from afs"), \
 
 
 #define AFS_COMMON_ERRORS \
@@ -409,6 +410,7 @@ extern const char **para_errlist[];
        PARA_ERROR(CHDIR_PERM, "insufficient permissions to chdir"), \
        PARA_ERROR(EMPTY, "file empty"), \
        PARA_ERROR(MMAP, "mmap error"), \
+       PARA_ERROR(MUNMAP, "munmap failed"), \
 
 
 #define WRITE_ERRORS \
diff --git a/fd.c b/fd.c
index 83c6cd5..f9b3d9d 100644 (file)
--- a/fd.c
+++ b/fd.c
@@ -332,3 +332,47 @@ out:
                *fd_ptr = fd;
        return ret;
 }
+
+/**
+ * A wrapper for munmap(2).
+ *
+ * \param start The start address of the memory mapping.
+ * \param length The size of the mapping.
+ *
+ * \return Positive on success, \p -E_MUNMAP on errors.
+ *
+ * \sa munmap(2), mmap_full_file().
+ */
+int para_munmap(void *start, size_t length)
+{
+       if (munmap(start, length) >= 0)
+               return 1;
+       PARA_ERROR_LOG("munmap (%p/%zu) failed: %s\n", start, length,
+               strerror(errno));
+       return -E_MUNMAP;
+}
+
+/**
+ * check a file descriptor for writability
+ *
+ * \param fd the file descriptor
+ *
+ * \return positive if fd is ready for writing, zero if it isn't, negative if
+ * an error occurred.
+ */
+
+int write_ok(int fd)
+{
+       struct timeval tv = {0, 0};
+       fd_set wfds;
+       int ret;
+again:
+       FD_ZERO(&wfds);
+       FD_SET(fd, &wfds);
+       tv.tv_sec = 0;
+       tv.tv_usec = 0;
+       ret = select(fd + 1, NULL, &wfds, NULL, &tv);
+       if (ret < 0 && errno == EINTR)
+               goto again;
+       return ret;
+}
diff --git a/fd.h b/fd.h
index 0b25959..224f832 100644 (file)
--- a/fd.h
+++ b/fd.h
@@ -20,3 +20,5 @@ int para_fchdir(int fd);
 int para_chdir(const char *path);
 int mmap_full_file(const char *filename, int open_mode, void **map,
        size_t *size, int *fd_ptr);
+int para_munmap(void *start, size_t length);
+int write_ok(int fd);
index 7ee3e55..7035f27 100644 (file)
 #include <dirent.h>
 
 #include "para.h"
+#include "error.h"
+#include "string.h"
 #include "server.cmdline.h"
 #include "afh.h"
+#include "afs.h"
 #include "server.h"
 #include "http.h"
 #include "vss.h"
 #include "send.h"
 #include "list.h"
 #include "close_on_fork.h"
-#include "error.h"
 #include "net.h"
-#include "string.h"
 #include "fd.h"
 #include "chunk_queue.h"
 
index ae7f117..e36cdef 100644 (file)
--- a/mp3_afh.c
+++ b/mp3_afh.c
  */
 
 #include "para.h"
-#include "afh.h"
-#include "server.h"
 #include "error.h"
+#include "afh.h"
 #include "string.h"
+#include "afs.h"
+#include "server.h"
 
 /** \cond some defines and structs which are only used in this file */
 
index eaee16a..5b145dc 100644 (file)
 /** \endcond */
 
 #include "para.h"
+#include "error.h"
+#include "string.h"
 #include "server.cmdline.h"
 #include "afh.h"
+#include "afs.h"
 #include "server.h"
 #include "vss.h"
 #include "afs_common.h"
 #include <mysql/mysql.h>
 #include <mysql/mysql_version.h>
 #include <regex.h>
-#include "error.h"
 #include "net.h"
-#include "string.h"
 #include "list.h"
 #include "user_list.h"
 #include "mysql_selector_command_list.h"
index 5bfcc95..a49d6d1 100644 (file)
@@ -3,7 +3,7 @@ SF: mysql_selector.c
 HC: prototypes for the commands of the mysql audio file selector
 CC: array of commands for the mysql audio file selector
 AT: server_command
-IN: para afh server list user_list
+IN: para error string afh afs server list user_list
 SN: list of mysql selector commands
 ---
 N: cam
index 71ee7e3..39859c0 100644 (file)
--- a/ogg_afh.c
+++ b/ogg_afh.c
 
 #include "para.h"
 #include "afh.h"
-#include "server.h"
 #include "error.h"
 #include "string.h"
+#include "afs.h"
+#include "server.h"
 
 /** must be big enough to hold header */
 #define CHUNK_SIZE 32768
index 9095a8f..084fdf5 100644 (file)
 
 #include "server.cmdline.h"
 #include "para.h"
+#include "error.h"
+#include "string.h"
 #include "afh.h"
+#include "afs.h"
 #include "server.h"
 #include "vss.h"
 #include "send.h"
 #include "list.h"
 #include "ortp.h"
-#include "string.h"
 
 /** \cond convert in_addr to ascii */
 #define TARGET_ADDR(oc) inet_ntoa((oc)->addr)
@@ -168,6 +170,8 @@ static void ortp_send(long unsigned current_chunk, long unsigned chunks_sent,
 
        if (self->status != SENDER_ON)
                return;
+
+//     PARA_NOTICE_LOG("sending %lu\n", current_chunk);
        chunk_tv = vss_chunk_time();
        if (!chunk_tv)
                return;
diff --git a/osl.h b/osl.h
index 1fec986..cb3194b 100644 (file)
--- a/osl.h
+++ b/osl.h
@@ -183,21 +183,3 @@ ssize_t para_write_all(int fd, const void *buf, size_t size);
 int para_lseek(int fd, off_t *offset, int whence);
 int para_write_file(const char *filename, const void *buf, size_t size);
 
-/**
- * A wrapper for munmap(2).
- *
- * \param start The start address of the memory mapping.
- * \param length The size of the mapping.
- *
- * \return Positive on success, \p -E_MUNMAP on errors.
- *
- * \sa munmap(2), mmap_full_file().
- */
-_static_inline_ int para_munmap(void *start, size_t length)
-{
-       if (munmap(start, length) >= 0)
-               return 1;
-       PARA_ERROR_LOG("munmap (%p/%zu) failed: %s\n", start, length,
-               strerror(errno));
-       return -E_MUNMAP;
-}
index 6a74a92..2d076ee 100644 (file)
@@ -8,12 +8,13 @@
 
 #include <signal.h>
 #include "para.h"
+#include "error.h"
+#include "string.h"
 #include "afh.h"
+#include "afs.h"
 #include "server.h"
 #include "afs_common.h"
-#include "error.h"
 #include "net.h"
-#include "string.h"
 #include "ipc.h"
 #include "list.h"
 #include "user_list.h"
index 188b16e..b819025 100644 (file)
@@ -3,7 +3,7 @@ SF: playlist_selector.c
 HC: prototypes for the commands of the playlist audio file selector
 CC: array of commands for the playlist audio file selector
 AT: server_command
-IN: para afh server list user_list
+IN: para error string afh afs server list user_list
 SN: list of playlist selector commands
 ---
 N: ppl
index bcbdae6..ac82058 100644 (file)
@@ -8,13 +8,14 @@
 
 #include <sys/time.h> /* gettimeofday */
 #include "para.h"
+#include "error.h"
+#include "string.h"
 #include "server.cmdline.h"
 #include "afh.h"
+#include "afs.h"
 #include "server.h"
 #include "afs_common.h"
-#include "error.h"
 #include "net.h"
-#include "string.h"
 #include "random_selector_command_list.h"
 
 extern struct misc_meta_data *mmd;
index ef68cff..67f07db 100644 (file)
@@ -3,7 +3,7 @@ SF: random_selector.c
 HC: prototypes for the commands of the random audio file selector
 CC: array of commands for the random audio file selector
 AT: server_command
-IN: para afh server list user_list
+IN: para error string afh afs server list user_list
 SN: list of random selector commands
 ---
 N: random_info
diff --git a/send.h b/send.h
index 8b4317e..94d14cf 100644 (file)
--- a/send.h
+++ b/send.h
@@ -84,27 +84,3 @@ struct sender {
        int (*client_cmds[NUM_SENDER_CMDS])(struct sender_command_data*);
 };
 
-/**
- * check a file descriptor for writability
- *
- * \param fd the file descriptor
- *
- * \return positive if fd is ready for writing, zero if it isn't, negative if
- * an error occurred.
- */
-
-static inline int write_ok(int fd)
-{
-       struct timeval tv = {0, 0};
-       fd_set wfds;
-       int ret;
-again:
-       FD_ZERO(&wfds);
-       FD_SET(fd, &wfds);
-       tv.tv_sec = 0;
-       tv.tv_usec = 0;
-       ret = select(fd + 1, NULL, &wfds, NULL, &tv);
-       if (ret < 0 && errno == EINTR)
-               goto again;
-       return ret;
-}
index 7171c6e..5777ec4 100644 (file)
--- a/server.c
+++ b/server.c
 #include <dirent.h>
 
 #include "para.h"
+#include "error.h"
 #include "server.cmdline.h"
 #include "afs_common.h"
 #include "afh.h"
+#include "string.h"
+#include "afs.h"
 #include "server.h"
 #include "vss.h"
 #include "config.h"
 #include "close_on_fork.h"
 #include "send.h"
-#include "error.h"
 #include "net.h"
 #include "daemon.h"
-#include "string.h"
 #include "ipc.h"
 #include "fd.h"
 #include "list.h"
 #include "sched.h"
 #include "signal.h"
 #include "user_list.h"
-#include "afs.h"
 
 /** define the array of error lists needed by para_server */
 INIT_SERVER_ERRLISTS;
@@ -352,7 +352,7 @@ out:
 }
 
 uint32_t afs_socket_cookie;
-static int afs_socket;
+int afs_socket;
 static pid_t afs_pid;
 
 static void init_afs(void)
@@ -499,14 +499,14 @@ repeat:
        /* check socket and signal pipe in any case */
        para_fd_set(sockfd, &rfds, &max_fileno);
        para_fd_set(signal_pipe, &rfds, &max_fileno);
-       timeout = vss_preselect();
+       timeout = vss_preselect(&rfds, &wfds, &max_fileno);
        status_refresh();
        for (i = 0; senders[i].name; i++) {
                if (senders[i].status != SENDER_ON)
                        continue;
                if (!senders[i].pre_select)
                        continue;
-               senders[i].pre_select( &max_fileno, &rfds, &wfds);
+               senders[i].pre_select(&max_fileno, &rfds, &wfds);
        }
        if (selectors[mmd->selector_num].pre_select) {
                ret = selectors[mmd->selector_num].pre_select(&rfds, &wfds);
@@ -515,6 +515,7 @@ repeat:
        mmd_unlock();
        ret = para_select(max_fileno + 1, &rfds, &wfds, timeout);
        mmd_lock();
+       vss_post_select(&rfds, &wfds);
        if (mmd->selector_change >= 0)
                change_selector();
        if (selectors[mmd->selector_num].post_select)
index da89548..62b90fd 100644 (file)
@@ -3,7 +3,7 @@ SF: command.c
 HC: prototypes for the server command handlers
 CC: array of server commands
 AT: server_command
-IN: para afh server list user_list
+IN: para error string afh afs server list user_list
 SN: list of server commands
 ---
 N: chs
index 9c98cf6..86d4f17 100644 (file)
--- a/server.h
+++ b/server.h
@@ -59,57 +59,60 @@ struct sender_command_data{
  *     date.
  */
 struct misc_meta_data {
-/** information on the current audio file  */
+       /** information on the current audio file  */
        struct audio_format_info afi;
-/** the size of the current audio file in bytes */
+       /** the size of the current audio file in bytes */
        size_t size;
-/** the full path of the current audio file */
+       /** the full path of the current audio file */
        char filename[_POSIX_PATH_MAX];
-/** the last modification file of the current audio file */
+       /** the last modification file of the current audio file */
        time_t mtime;
-/** the number of the current audio format */
+       /** the number of the current audio format */
        int audio_format;
-/** the "old" status flags -- commands may only read them */
+       /** the "old" status flags -- commands may only read them */
        unsigned int vss_status_flags;
-/** the new status flags -- commands may set them **/
+       /** The new status flags -- commands may set them. */
        unsigned int new_vss_status_flags;
-/** the number of data chunks sent for the current audio file */
+       /** the number of data chunks sent for the current audio file */
        long unsigned chunks_sent;
-/** set by the jmp/ff commands to the new position in chunks */
+       /** set by the jmp/ff commands to the new position in chunks */
        long unsigned repos_request;
-/** the number of the chunk currently sent out*/
+       /** The number of the chunk currently sent out. */
        long unsigned current_chunk;
-/** the milliseconds that have been skipped of the current audio file */
+       /** the milliseconds that have been skipped of the current audio file */
        long offset;
-/** the time para_server started to stream */
+       /** the time para_server started to stream */
        struct timeval stream_start;
-/** the event counter
- *
- * commands may increase this to force a status update to be sent to all
- * connected clients
-*/
+       /**
+        * The event counter.
+        *
+        * Commands may increase this to force a status update to be sent to all
+        * connected clients.
+        */
        unsigned int events;
-/** the number of audio files already sent */
+       /** the number of audio files already sent */
        unsigned int num_played;
-/** the number of executed commands */
+       /** the number of executed commands */
        unsigned int num_commands;
-/** the number of connections para_server received so far */
+       /** the number of connections para_server received so far */
        unsigned int num_connects;
-/** the number of connections currently active */
+       /** the number of connections currently active */
        unsigned int active_connections;
-/** the process id of para_server */
+       /** the process id of para_server */
        pid_t server_pid;
-/** a string that gets filled in by the current audio file selector */
+       /** a string that gets filled in by the current audio file selector */
        char selector_info[MMD_INFO_SIZE];
-/** the number if the current audio file selector */
+       /** The number of the current audio file selector. */
        int selector_num;
-/** commands set this to non-zero to change the current selector */
+       /** commands set this to non-zero to change the current selector */
        int selector_change;
-/** used by the sender command */
+       /** used by the sender command */
        struct sender_command_data sender_cmd_data;
+       struct audio_file_data afd;
 };
 
 extern struct server_args_info conf;
+extern int afs_socket;
 
 int handle_connect(int fd, struct sockaddr_in *addr);
 void mmd_unlock(void);
index bf10783..8c97ccf 100644 (file)
--- a/string.c
+++ b/string.c
@@ -63,7 +63,8 @@ __must_check __malloc void *para_malloc(size_t size)
        void *p = malloc(size);
 
        if (!p) {
-               PARA_EMERG_LOG("%s", "malloc failed, aborting\n");
+               PARA_EMERG_LOG("malloc failed (size = %zu),  aborting\n",
+                       size);
                exit(EXIT_FAILURE);
        }
        return p;
diff --git a/vss.c b/vss.c
index 5b7ed32..dbf8547 100644 (file)
--- a/vss.c
+++ b/vss.c
 #include <dirent.h>
 
 #include "para.h"
+#include "error.h"
+#include "string.h"
 #include "afh.h"
+#include "afs.h"
 #include "server.h"
+#include "net.h"
 #include "server.cmdline.h"
 #include "afs_common.h"
 #include "vss.h"
 #include "send.h"
-#include "error.h"
-#include "string.h"
+#include "ipc.h"
 #include "fd.h"
 
 extern const char *status_item_list[];
@@ -497,6 +500,14 @@ struct timeval *vss_chunk_time(void)
        return &mmd->afi.chunk_tv;
 }
 
+enum afs_socket_status {
+       AFS_SOCKET_READY,
+       AFS_SOCKET_CHECK_FOR_WRITE,
+       AFS_SOCKET_AFD_PENDING
+};
+
+static enum afs_socket_status afsss;
+
 /**
  * compute the timeout for para_server's main select-loop
  *
@@ -514,12 +525,15 @@ struct timeval *vss_chunk_time(void)
  * \return A pointer to a struct timeval containing the timeout for the next
  * chunk of data to be sent, or NULL if we're not sending right now.
  */
-struct timeval *vss_preselect(void)
+struct timeval *vss_preselect(fd_set *rfds, fd_set *wfds, int *max_fileno)
 {
        struct audio_format_handler *af = NULL;
        int i, format;
        struct timeval *ret;
-again:
+
+       para_fd_set(afs_socket, rfds, max_fileno);
+
+//again:
        format = mmd->audio_format;
        if (format >= 0)
                af = afl + format;
@@ -553,12 +567,113 @@ again:
        if (!ret && !map && vss_playing() &&
                        !(mmd->new_vss_status_flags & VSS_NOMORE)) {
                PARA_DEBUG_LOG("%s", "ready and playing, but no audio file\n");
-               vss_get_audio_file();
-               goto again;
+               //vss_get_audio_file();
+               if (afsss == AFS_SOCKET_READY) {
+                       para_fd_set(afs_socket, wfds, max_fileno);
+                       afsss = AFS_SOCKET_CHECK_FOR_WRITE;
+               }
+//             goto again;
        }
        return ret;
 }
 
+static int recv_afs_msg(int *fd, uint32_t *code, uint32_t *data)
+{
+       char control[255], buf[8];
+       struct msghdr msg = {.msg_iov = NULL};
+       struct cmsghdr *cmsg;
+       struct iovec iov;
+       int ret = 0;
+
+       iov.iov_base = buf;
+       iov.iov_len = sizeof(buf);
+       msg.msg_iov = &iov;
+       msg.msg_iovlen = 1;
+       msg.msg_control = control;
+       msg.msg_controllen = sizeof(control);
+       memset(buf, 0, sizeof(buf));
+       ret = recvmsg(afs_socket, &msg, 0);
+       if (ret < 0)
+               return -ERRNO_TO_PARA_ERROR(errno);
+       if (iov.iov_len != sizeof(buf))
+               return -E_SHORT_AFS_READ;
+       *code = *(uint32_t*)buf;
+       *data =  *(uint32_t*)(buf + 4);
+       cmsg = CMSG_FIRSTHDR(&msg);
+       for (cmsg = CMSG_FIRSTHDR(&msg); cmsg; cmsg = CMSG_NXTHDR(&msg, cmsg)) {
+               if (cmsg->cmsg_level != SOL_SOCKET
+                       || cmsg->cmsg_type != SCM_RIGHTS)
+               continue;
+               if ((cmsg->cmsg_len - CMSG_LEN(0)) / sizeof(int) != 1)
+               continue;
+               *fd = *(int *)CMSG_DATA(cmsg);
+       }
+       return 1;
+}
+
+static void recv_afs_result(void)
+{
+       int ret, passed_fd = -1, shmid;
+       uint32_t afs_code = 0, afs_data = 0;
+       struct stat statbuf;
+       struct timeval now;
+
+       ret = recv_afs_msg(&passed_fd, &afs_code, &afs_data);
+       if (ret < 0)
+               goto err;
+       PARA_NOTICE_LOG("got the fd: %d, code: %u, shmid: %u\n",
+               passed_fd, afs_code, afs_data);
+       ret = -E_BAD_AFS_CODE;
+       if (afs_code != NEXT_AUDIO_FILE)
+               goto err;
+       afsss = AFS_SOCKET_READY;
+       shmid = afs_data;
+       ret = load_afd(shmid, &mmd->afd);
+       if (ret < 0)
+               goto err;
+       shm_destroy(shmid);
+       PARA_NOTICE_LOG("next audio file: %s (%lu chunks)\n", mmd->afd.path,
+               mmd->afd.afhi.chunks_total);
+       ret = fstat(passed_fd, &statbuf);
+       if (ret < 0) {
+               ret = -ERRNO_TO_PARA_ERROR(errno);
+               goto err;
+       }
+       mmd->size = statbuf.st_size;
+       mmd->mtime = statbuf.st_mtime;
+       map = para_mmap(mmd->size, PROT_READ, MAP_PRIVATE,
+               passed_fd, 0);
+       strcpy(mmd->filename, mmd->afd.path); /* FIXME: check length */
+       mmd->afi.header_len = 0; /* default: no header */
+       mmd->audio_format = mmd->afd.afsi.audio_format_id;
+       mmd->chunks_sent = 0;
+       mmd->current_chunk = 0;
+       mmd->offset = 0;
+       mmd->events++;
+       mmd->num_played++;
+       mmd->afi = mmd->afd.afhi;
+       mmd->new_vss_status_flags &= (~VSS_NEXT);
+       gettimeofday(&now, NULL);
+       tv_add(&now, &announce_tv, &data_send_barrier);
+       return;
+err:
+       if (passed_fd >= 0)
+               close(passed_fd);
+       PARA_ERROR_LOG("%s\n", PARA_STRERROR(-ret));
+}
+
+void vss_post_select(fd_set *rfds, fd_set *wfds)
+{
+       int ret;
+
+       if (FD_ISSET(afs_socket, rfds))
+               recv_afs_result();
+       if (afsss != AFS_SOCKET_CHECK_FOR_WRITE || !FD_ISSET(afs_socket, wfds))
+               return;
+       ret = send_buffer(afs_socket, "new");
+       afsss = AFS_SOCKET_AFD_PENDING;
+}
+
 static void get_chunk(long unsigned chunk_num, char **buf, size_t *len)
 {
        size_t pos = mmd->afi.chunk_table[chunk_num];
diff --git a/vss.h b/vss.h
index 9b10a5e..0ef9892 100644 (file)
--- a/vss.h
+++ b/vss.h
@@ -7,7 +7,8 @@
 /** \file vss.h exported functions from vss.c (para_server) */
 void vss_init(void);
 void vss_send_chunk(void);
-struct timeval *vss_preselect(void);
+struct timeval *vss_preselect(fd_set *rfds, fd_set *wfds, int *max_fileno);
+void vss_post_select(fd_set *rfds, fd_set *wfds);
 const char *audio_format_name(int);
 unsigned int vss_playing(void);
 unsigned int vss_next(void);