gui: Simplify display command execution.
[paraslash.git] / vss.c
diff --git a/vss.c b/vss.c
index ba6f7c4c655134d11d79e6411ab92d63b4efd160..3ace49e9e90e8fb27144964a0f61f1fae267e843 100644 (file)
--- a/vss.c
+++ b/vss.c
@@ -1,5 +1,5 @@
 /*
- * Copyright (C) 1997-2011 Andre Noll <maan@systemlinux.org>
+ * Copyright (C) 1997-2014 Andre Noll <maan@systemlinux.org>
  *
  * Licensed under the GPL v2. For licencing details see COPYING.
  */
  * senders.
  */
 
+#include <sys/socket.h>
+#include <netinet/in.h>
 #include <regex.h>
-#include <dirent.h>
 #include <osl.h>
+#include <sys/types.h>
+#include <arpa/inet.h>
+#include <sys/un.h>
+#include <netdb.h>
 
 #include "para.h"
 #include "error.h"
 #include "server.cmdline.h"
 #include "list.h"
 #include "send.h"
+#include "sched.h"
 #include "vss.h"
 #include "ipc.h"
 #include "fd.h"
-#include "sched.h"
 
 extern struct misc_meta_data *mmd;
 
@@ -133,6 +138,7 @@ struct fec_group {
        uint16_t slice_bytes;
 };
 
+/** A FEC client is always in one of these states. */
 enum fec_client_state {
        FEC_STATE_NONE = 0,     /**< not initialized and not enabled */
        FEC_STATE_DISABLED,     /**< temporarily disabled */
@@ -361,8 +367,17 @@ static void vss_get_chunk(int chunk_num, struct vss_task *vsst,
 static void compute_group_size(struct vss_task *vsst, struct fec_group *g,
                int max_bytes)
 {
+       char *buf;
+       size_t len;
        int i, max_chunks = PARA_MAX(1LU, 150 / tv2ms(vss_chunk_time()));
 
+       if (g->first_chunk == 0) {
+               g->num_chunks = 1;
+               vss_get_chunk(0, vsst, &buf, &len);
+               g->bytes = len;
+               return;
+       }
+
        g->num_chunks = 0;
        g->bytes = 0;
        /*
@@ -372,8 +387,6 @@ static void compute_group_size(struct vss_task *vsst, struct fec_group *g,
         * of exactly one chunk for these audio formats.
         */
        for (i = 0;; i++) {
-               char *buf;
-               size_t len;
                int chunk_num = g->first_chunk + i;
 
                if (g->bytes > 0 && i >= max_chunks) /* duration limit */
@@ -502,7 +515,7 @@ static int setup_next_fec_group(struct fec_client *fc, struct vss_task *vsst)
 {
        int ret, i, k, n, data_slices;
        size_t len;
-       char *buf;
+       char *buf, *p;
        struct fec_group *g = &fc->group;
 
        if (fc->state == FEC_STATE_NONE) {
@@ -542,6 +555,7 @@ static int setup_next_fec_group(struct fec_client *fc, struct vss_task *vsst)
        /* setup header slices */
        buf = vsst->header_buf;
        for (i = 0; i < g->num_header_slices; i++) {
+               uint32_t payload_size;
                if (buf + g->slice_bytes <= vsst->header_buf + vsst->header_len) {
                        fc->src_data[i] = (const unsigned char *)buf;
                        buf += g->slice_bytes;
@@ -552,26 +566,35 @@ static int setup_next_fec_group(struct fec_client *fc, struct vss_task *vsst)
                 * goes beyond the buffer. This slice will not be fully
                 * used.
                 */
-               uint32_t payload_size = vsst->header_buf
-                       + vsst->header_len - buf;
+               payload_size = vsst->header_buf + vsst->header_len - buf;
                memcpy(fc->extra_header_buf, buf, payload_size);
                if (payload_size < g->slice_bytes)
                        memset(fc->extra_header_buf + payload_size, 0,
                                g->slice_bytes - payload_size);
-               fc->src_data[i] = fc->extra_header_buf;
-               assert(i == g->num_header_slices - 1);
+               /*
+                * There might be more than one header slice to fill although
+                * only the first one will be used. Set all header slices to
+                * our extra buffer.
+                */
+               while (i < g->num_header_slices)
+                       fc->src_data[i++] = fc->extra_header_buf;
+               break; /* we don't want i to be increased. */
        }
 
-       /* setup data slices */
+       /*
+        * Setup data slices. Note that for ogg streams chunk 0 points to a
+        * buffer on the heap rather than to the mapped audio file.
+        */
        vss_get_chunk(g->first_chunk, vsst, &buf, &len);
-       for (; i < g->num_header_slices + data_slices; i++) {
-               if (buf + g->slice_bytes > vsst->map + mmd->size) {
+       for (p = buf; i < g->num_header_slices + data_slices; i++) {
+               if (p + g->slice_bytes > buf + g->bytes) {
                        /*
-                        * Can not use the memory mapped audio file for this
-                        * slice as it goes beyond the map.
+                        * We must make a copy for this slice since using p
+                        * directly would exceed the buffer.
                         */
-                       uint32_t payload_size = vsst->map + mmd->size - buf;
-                       memcpy(fc->extra_src_buf, buf, payload_size);
+                       uint32_t payload_size = buf + g->bytes - p;
+                       assert(payload_size + FEC_HEADER_SIZE <= fc->mps);
+                       memcpy(fc->extra_src_buf, p, payload_size);
                        if (payload_size < g->slice_bytes)
                                memset(fc->extra_src_buf + payload_size, 0,
                                        g->slice_bytes - payload_size);
@@ -579,8 +602,8 @@ static int setup_next_fec_group(struct fec_client *fc, struct vss_task *vsst)
                        i++;
                        break;
                }
-               fc->src_data[i] = (const unsigned char *)buf;
-               buf += g->slice_bytes;
+               fc->src_data[i] = (const unsigned char *)p;
+               p += g->slice_bytes;
        }
        if (i < k) {
                /* use arbitrary data for all remaining slices */
@@ -689,26 +712,6 @@ static int next_slice_is_due(struct fec_client *fc, struct timeval *diff)
        return ret < 0? 1 : 0;
 }
 
-static void compute_slice_timeout(struct timeval *timeout)
-{
-       struct fec_client *fc;
-
-       list_for_each_entry(fc, &fec_client_list, node) {
-               struct timeval diff;
-
-               if (fc->state != FEC_STATE_READY_TO_RUN)
-                       continue;
-               if (next_slice_is_due(fc, &diff)) {
-                       timeout->tv_sec = 0;
-                       timeout->tv_usec = 0;
-                       return;
-               }
-               /* timeout = min(timeout, diff) */
-               if (tv_diff(&diff, timeout, NULL) < 0)
-                       *timeout = diff;
-       }
-}
-
 static void set_eof_barrier(struct vss_task *vsst)
 {
        struct fec_client *fc;
@@ -799,42 +802,38 @@ static int chk_barrier(const char *bname, const struct timeval *barrier,
        return -1;
 }
 
-/*
- * != NULL: timeout for next chunk
- * NULL: nothing to do
- */
-static struct timeval *vss_compute_timeout(struct vss_task *vsst)
+static void vss_compute_timeout(struct sched *s, struct vss_task *vsst)
 {
-       static struct timeval the_timeout;
-       struct timeval next_chunk;
-
-       if (vss_next() && vsst->map) {
-               /* only sleep a bit, nec*/
-               the_timeout.tv_sec = 0;
-               the_timeout.tv_usec = 100;
-               return &the_timeout;
-       }
-       if (chk_barrier("autoplay_delay", &vsst->autoplay_barrier,
-                       &the_timeout, 1) < 0)
-               return &the_timeout;
-       if (chk_barrier("eof", &vsst->eof_barrier, &the_timeout, 1) < 0)
-               return &the_timeout;
-       if (chk_barrier("data send", &vsst->data_send_barrier,
-                       &the_timeout, 1) < 0)
-               return &the_timeout;
+       struct timeval tv;
+       struct fec_client *fc;
+
        if (!vss_playing() || !vsst->map)
-               return NULL;
+               return;
+       if (vss_next() && vsst->map) /* only sleep a bit, nec*/
+               return sched_request_timeout_ms(100, s);
+
+       /* Each of these barriers must have passed until we may proceed */
+       if (sched_request_barrier(&vsst->autoplay_barrier, s) == 1)
+               return;
+       if (sched_request_barrier(&vsst->eof_barrier, s) == 1)
+               return;
+       if (sched_request_barrier(&vsst->data_send_barrier, s) == 1)
+               return;
+       /*
+        * Compute the select timeout as the minimal time until the next
+        * chunk/slice is due for any client.
+        */
        compute_chunk_time(mmd->chunks_sent, &mmd->afd.afhi.chunk_tv,
-               &mmd->stream_start, &next_chunk);
-       if (chk_barrier("chunk", &next_chunk, &the_timeout, 0) >= 0) {
-               /* chunk is due or bof */
-               the_timeout.tv_sec = 0;
-               the_timeout.tv_usec = 0;
-               return &the_timeout;
+               &mmd->stream_start, &tv);
+       if (sched_request_barrier_or_min_delay(&tv, s) == 0)
+               return;
+       list_for_each_entry(fc, &fec_client_list, node) {
+               if (fc->state != FEC_STATE_READY_TO_RUN)
+                       continue;
+               if (next_slice_is_due(fc, &tv))
+                       return sched_min_delay(s);
+               sched_request_timeout(&tv, s);
        }
-       /* compute min of current timeout and next slice time */
-       compute_slice_timeout(&the_timeout);
-       return &the_timeout;
 }
 
 static void vss_eof(struct vss_task *vsst)
@@ -903,7 +902,6 @@ static void set_mmd_offset(void)
 static void vss_pre_select(struct sched *s, struct task *t)
 {
        int i;
-       struct timeval *tv;
        struct vss_task *vsst = container_of(t, struct vss_task, task);
 
        if (!vsst->map || vss_next() || vss_paused() || vss_repos()) {
@@ -941,9 +939,7 @@ static void vss_pre_select(struct sched *s, struct task *t)
                        continue;
                senders[i].pre_select(&s->max_fileno, &s->rfds, &s->wfds);
        }
-       tv = vss_compute_timeout(vsst);
-       if (tv)
-               sched_request_timeout(tv, s);
+       vss_compute_timeout(s, vsst);
 }
 
 static int recv_afs_msg(int afs_socket, int *fd, uint32_t *code, uint32_t *data)
@@ -980,6 +976,10 @@ static int recv_afs_msg(int afs_socket, int *fd, uint32_t *code, uint32_t *data)
        return 1;
 }
 
+#ifndef MAP_POPULATE
+#define MAP_POPULATE 0
+#endif
+
 static void recv_afs_result(struct vss_task *vsst, fd_set *rfds)
 {
        int ret, passed_fd, shmid;
@@ -1014,8 +1014,8 @@ static void recv_afs_result(struct vss_task *vsst, fd_set *rfds)
        }
        mmd->size = statbuf.st_size;
        mmd->mtime = statbuf.st_mtime;
-       ret = para_mmap(mmd->size, PROT_READ, MAP_PRIVATE, passed_fd,
-               0, &vsst->map);
+       ret = para_mmap(mmd->size, PROT_READ, MAP_PRIVATE | MAP_POPULATE,
+               passed_fd, 0, &vsst->map);
        if (ret < 0)
                goto err;
        close(passed_fd);
@@ -1102,17 +1102,35 @@ static void vss_send(struct vss_task *vsst)
                        senders[i].send(mmd->current_chunk, mmd->chunks_sent,
                                buf, len, vsst->header_buf, vsst->header_len);
                }
+               /*
+                * Prefault next chunk(s)
+                *
+                * If the backing device of the memory-mapped audio file is
+                * slow and read-ahead is turned off or prevented for some
+                * reason, e.g. due to memory pressure, it may take much longer
+                * than the chunk interval to get the next chunk on the wire,
+                * causing buffer underruns on the client side. Mapping the
+                * file with MAP_POPULATE seems to help a bit, but it does not
+                * eliminate the delays completely. Moreover, it is supported
+                * only on Linux. So we do our own read-ahead here.
+                */
+               if (mmd->current_chunk > 0) { /* chunk 0 might be on the heap */
+                       buf += len;
+                       for (i = 0; i < 5 && buf < vsst->map + mmd->size; i++) {
+                               __a_unused volatile char x = *buf;
+                               buf += 4096;
+                       }
+               }
                mmd->chunks_sent++;
                mmd->current_chunk++;
        }
 }
 
-static void vss_post_select(struct sched *s, struct task *t)
+static int vss_post_select(struct sched *s, struct task *t)
 {
        int ret, i;
        struct vss_task *vsst = container_of(t, struct vss_task, task);
 
-
        if (mmd->sender_cmd_data.cmd_num >= 0) {
                int num = mmd->sender_cmd_data.cmd_num,
                        sender_num = mmd->sender_cmd_data.sender_num;
@@ -1129,7 +1147,7 @@ static void vss_post_select(struct sched *s, struct task *t)
                recv_afs_result(vsst, &s->rfds);
        else if (FD_ISSET(vsst->afs_socket, &s->wfds)) {
                PARA_NOTICE_LOG("requesting new fd from afs\n");
-               ret = send_buffer(vsst->afs_socket, "new");
+               ret = write_buffer(vsst->afs_socket, "new");
                if (ret < 0)
                        PARA_CRIT_LOG("%s\n", para_strerror(-ret));
                else
@@ -1144,17 +1162,19 @@ static void vss_post_select(struct sched *s, struct task *t)
                        (vss_next() && vss_playing()))
                tv_add(now, &vsst->announce_tv, &vsst->data_send_barrier);
        vss_send(vsst);
+       return 0;
 }
 
 /**
  * Initialize the virtual streaming system task.
  *
  * \param afs_socket The fd for communication with afs.
+ * \param s The scheduler to register the vss task to.
  *
  * This also initializes all supported senders and starts streaming
  * if the --autoplay command line flag was given.
  */
-void init_vss_task(int afs_socket)
+void init_vss_task(int afs_socket, struct sched *s)
 {
        static struct vss_task vss_task_struct, *vsst = &vss_task_struct;
        int i;
@@ -1187,5 +1207,5 @@ void init_vss_task(int afs_socket)
                        &vsst->data_send_barrier);
        }
        sprintf(vsst->task.status, "vss task");
-       register_task(&vsst->task);
+       register_task(s, &vsst->task);
 }