afh: Move audio_format_name() up.
[paraslash.git] / vss.c
diff --git a/vss.c b/vss.c
index 4a1db40d716c9022c7a38e0653ee50a515a53e4f..f6ec83934d971d3f6de50c363e9c15bb7c94866c 100644 (file)
--- a/vss.c
+++ b/vss.c
@@ -1,8 +1,4 @@
-/*
- * Copyright (C) 1997 Andre Noll <maan@tuebingen.mpg.de>
- *
- * Licensed under the GPL v2. For licencing details see COPYING.
- */
+/* Copyright (C) 1997 Andre Noll <maan@tuebingen.mpg.de>, see file COPYING. */
 
 /** \file vss.c The virtual streaming system.
  *
@@ -29,8 +25,8 @@
 #include "string.h"
 #include "afh.h"
 #include "afs.h"
-#include "server.h"
 #include "net.h"
+#include "server.h"
 #include "list.h"
 #include "send.h"
 #include "sched.h"
 #include "fd.h"
 
 extern struct misc_meta_data *mmd;
-
-extern void dccp_send_init(struct sender *);
-extern void http_send_init(struct sender *);
-extern void udp_send_init(struct sender *);
-
-/** The list of supported senders. */
-struct sender senders[] = {
-       {
-               .name = "http",
-               .init = http_send_init,
-       },
-       {
-               .name = "dccp",
-               .init = dccp_send_init,
-       },
-       {
-               .name = "udp",
-               .init = udp_send_init,
-       },
-       {
-               .name = NULL,
-       }
-};
+extern const struct sender udp_sender, dccp_sender, http_sender;
+const struct sender * const senders[] = {
+       &http_sender, &dccp_sender, &udp_sender, NULL};
 
 /** The possible states of the afs socket. */
 enum afs_socket_status {
@@ -302,7 +278,7 @@ static int initialize_fec_client(struct fec_client *fc, struct vss_task *vsst)
        if (fcp->init_fec) {
                /*
                 * Set the maximum slice size to the Maximum Packet Size if the
-                * transport protocol allows to determine this value. The user
+                * transport protocol allows determination of this value. The user
                 * can specify a slice size up to this value.
                 */
                ret = fcp->init_fec(fc->sc);
@@ -349,7 +325,7 @@ static int initialize_fec_client(struct fec_client *fc, struct vss_task *vsst)
        return 1;
 }
 
-static void vss_get_chunk(int chunk_num, struct vss_task *vsst,
+static int vss_get_chunk(int chunk_num, struct vss_task *vsst,
                char **buf, size_t *sz)
 {
        int ret;
@@ -366,31 +342,32 @@ static void vss_get_chunk(int chunk_num, struct vss_task *vsst,
                assert(vsst->header_buf);
                *buf = vsst->header_buf; /* stripped header */
                *sz = vsst->header_len;
-               return;
+               return 0;
        }
        ret = afh_get_chunk(chunk_num, &mmd->afd.afhi,
                mmd->afd.audio_format_id, vsst->map, vsst->mapsize,
                (const char **)buf, sz, &vsst->afh_context);
        if (ret < 0) {
-               PARA_WARNING_LOG("could not get chunk %d: %s\n",
-                       chunk_num, para_strerror(-ret));
                *buf = NULL;
                *sz = 0;
        }
+       return ret;
 }
 
-static void compute_group_size(struct vss_task *vsst, struct fec_group *g,
+static int compute_group_size(struct vss_task *vsst, struct fec_group *g,
                int max_bytes)
 {
        char *buf;
        size_t len;
-       int i, max_chunks = PARA_MAX(1LU, 150 / tv2ms(vss_chunk_time()));
+       int ret, i, max_chunks = PARA_MAX(1LU, 150 / tv2ms(vss_chunk_time()));
 
        if (g->first_chunk == 0) {
                g->num_chunks = 1;
-               vss_get_chunk(0, vsst, &buf, &len);
+               ret = vss_get_chunk(0, vsst, &buf, &len);
+               if (ret < 0)
+                        return ret;
                g->bytes = len;
-               return;
+               return 0;
        }
 
        g->num_chunks = 0;
@@ -408,7 +385,9 @@ static void compute_group_size(struct vss_task *vsst, struct fec_group *g,
                        break;
                if (chunk_num >= mmd->afd.afhi.chunks_total) /* eof */
                        break;
-               vss_get_chunk(chunk_num, vsst, &buf, &len);
+               ret = vss_get_chunk(chunk_num, vsst, &buf, &len);
+               if (ret < 0)
+                        return ret;
                if (g->bytes + len > max_bytes)
                        break;
                /* Include this chunk */
@@ -416,6 +395,7 @@ static void compute_group_size(struct vss_task *vsst, struct fec_group *g,
                g->num_chunks++;
        }
        assert(g->num_chunks);
+       return 1;
 }
 
 /*
@@ -477,7 +457,9 @@ static int compute_slice_size(struct fec_client *fc, struct vss_task *vsst)
        if (!need_audio_header(fc, vsst)) {
                max_group_bytes = k * max_slice_bytes;
                g->num_header_slices = 0;
-               compute_group_size(vsst, g, max_group_bytes);
+               ret = compute_group_size(vsst, g, max_group_bytes);
+               if (ret < 0)
+                       return ret;
                g->slice_bytes = DIV_ROUND_UP(g->bytes, k);
                if (g->slice_bytes == 0)
                        g->slice_bytes = 1;
@@ -493,7 +475,9 @@ static int compute_slice_size(struct fec_client *fc, struct vss_task *vsst)
        h = vsst->header_len;
        max_group_bytes = (k - num_slices(h, max_slice_bytes, n - k))
                * max_slice_bytes;
-       compute_group_size(vsst, g, max_group_bytes);
+       ret = compute_group_size(vsst, g, max_group_bytes);
+       if (ret < 0)
+               return ret;
        d = g->bytes;
        if (d == 0) {
                g->slice_bytes = DIV_ROUND_UP(h, k);
@@ -600,7 +584,9 @@ static int setup_next_fec_group(struct fec_client *fc, struct vss_task *vsst)
         * Setup data slices. Note that for ogg streams chunk 0 points to a
         * buffer on the heap rather than to the mapped audio file.
         */
-       vss_get_chunk(g->first_chunk, vsst, &buf, &len);
+       ret = vss_get_chunk(g->first_chunk, vsst, &buf, &len);
+       if (ret < 0)
+               return ret;
        for (p = buf; i < g->num_header_slices + data_slices; i++) {
                if (p + g->slice_bytes > buf + g->bytes) {
                        /*
@@ -912,10 +898,10 @@ static void vss_pre_select(struct sched *s, void *context)
                vsst->afsss = AFS_SOCKET_CHECK_FOR_WRITE;
        } else
                para_fd_set(vsst->afs_socket, &s->rfds, &s->max_fileno);
-       for (i = 0; senders[i].name; i++) {
-               if (!senders[i].pre_select)
+       FOR_EACH_SENDER(i) {
+               if (!senders[i]->pre_select)
                        continue;
-               senders[i].pre_select(&s->max_fileno, &s->rfds, &s->wfds);
+               senders[i]->pre_select(&s->max_fileno, &s->rfds, &s->wfds);
        }
        vss_compute_timeout(s, vsst);
 }
@@ -991,7 +977,7 @@ static void recv_afs_result(struct vss_task *vsst, fd_set *rfds)
                goto err;
        }
        ret = para_mmap(statbuf.st_size, PROT_READ, MAP_PRIVATE | MAP_POPULATE,
-               passed_fd, 0, &vsst->map);
+               passed_fd, &vsst->map);
        if (ret < 0)
                goto err;
        vsst->mapsize = statbuf.st_size;
@@ -1024,22 +1010,24 @@ err:
  */
 static void vss_send(struct vss_task *vsst)
 {
-       int i, fec_active = 0;
+       int i, ret;
+       bool fec_active = false;
        struct timeval due;
        struct fec_client *fc, *tmp_fc;
+       char *buf;
+       size_t len;
 
        if (!vsst->map || !vss_playing())
                return;
        if (chk_barrier("eof", &vsst->eof_barrier, &due, 1) < 0)
                return;
-       if (chk_barrier("data send", &vsst->data_send_barrier,
-                       &due, 1) < 0)
+       if (chk_barrier("data send", &vsst->data_send_barrier, &due, 1) < 0)
                return;
        list_for_each_entry_safe(fc, tmp_fc, &fec_client_list, node) {
                if (fc->state == FEC_STATE_DISABLED)
                        continue;
                if (!next_slice_is_due(fc, NULL)) {
-                       fec_active = 1;
+                       fec_active = true;
                        continue;
                }
                if (compute_next_fec_slice(fc, vsst) <= 0)
@@ -1049,7 +1037,7 @@ static void vss_send(struct vss_task *vsst)
                fc->current_slice_num++;
                fc->fcp->send_fec(fc->sc, (char *)fc->enc_buf,
                        fc->group.slice_bytes + FEC_HEADER_SIZE);
-               fec_active = 1;
+               fec_active = true;
        }
        if (mmd->current_chunk >= mmd->afd.afhi.chunks_total) { /* eof */
                if (!fec_active)
@@ -1058,49 +1046,31 @@ static void vss_send(struct vss_task *vsst)
        }
        compute_chunk_time(mmd->chunks_sent, &mmd->afd.afhi.chunk_tv,
                &mmd->stream_start, &due);
-       if (tv_diff(&due, now, NULL) <= 0) {
-               char *buf;
-               size_t len;
-
-               if (!mmd->chunks_sent) {
-                       mmd->stream_start = *now;
-                       mmd->events++;
-                       set_mmd_offset();
-               }
+       if (tv_diff(&due, now, NULL) > 0)
+               return;
+       if (!mmd->chunks_sent) {
+               mmd->stream_start = *now;
+               mmd->events++;
+               set_mmd_offset();
+       }
+       ret = vss_get_chunk(mmd->current_chunk, vsst, &buf, &len);
+       if (ret < 0) {
+               PARA_ERROR_LOG("could not get chunk %lu: %s\n",
+                       mmd->current_chunk, para_strerror(-ret));
+       } else {
                /*
-                * We call the send function also in case of empty chunks as
-                * they might have still some data queued which can be sent in
-                * this case.
+                * We call ->send() even if len is zero because senders might
+                * have data queued which can be sent now.
                 */
-               vss_get_chunk(mmd->current_chunk, vsst, &buf, &len);
-               for (i = 0; senders[i].name; i++) {
-                       if (!senders[i].send)
+               FOR_EACH_SENDER(i) {
+                       if (!senders[i]->send)
                                continue;
-                       senders[i].send(mmd->current_chunk, mmd->chunks_sent,
+                       senders[i]->send(mmd->current_chunk, mmd->chunks_sent,
                                buf, len, vsst->header_buf, vsst->header_len);
                }
-               /*
-                * Prefault next chunk(s)
-                *
-                * If the backing device of the memory-mapped audio file is
-                * slow and read-ahead is turned off or prevented for some
-                * reason, e.g. due to memory pressure, it may take much longer
-                * than the chunk interval to get the next chunk on the wire,
-                * causing buffer underruns on the client side. Mapping the
-                * file with MAP_POPULATE seems to help a bit, but it does not
-                * eliminate the delays completely. Moreover, it is supported
-                * only on Linux. So we do our own read-ahead here.
-                */
-               if (mmd->current_chunk > 0) { /* chunk 0 might be on the heap */
-                       buf += len;
-                       for (i = 0; i < 5 && buf < vsst->map + vsst->mapsize; i++) {
-                               __a_unused volatile char x = *buf;
-                               buf += 4096;
-                       }
-               }
-               mmd->chunks_sent++;
-               mmd->current_chunk++;
        }
+       mmd->chunks_sent++;
+       mmd->current_chunk++;
 }
 
 static int vss_post_select(struct sched *s, void *context)
@@ -1108,12 +1078,17 @@ static int vss_post_select(struct sched *s, void *context)
        int ret, i;
        struct vss_task *vsst = context;
 
+       ret = task_get_notification(vsst->task);
+       if (ret < 0) {
+               afh_free_header(vsst->header_buf, mmd->afd.audio_format_id);
+               return ret;
+       }
        if (!vsst->map || vss_next() || vss_paused() || vss_repos()) {
                /* shut down senders and fec clients */
                struct fec_client *fc, *tmp;
-               for (i = 0; senders[i].name; i++)
-                       if (senders[i].shutdown_clients)
-                               senders[i].shutdown_clients();
+               FOR_EACH_SENDER(i)
+                       if (senders[i]->shutdown_clients)
+                               senders[i]->shutdown_clients();
                list_for_each_entry_safe(fc, tmp, &fec_client_list, node)
                        fc->state = FEC_STATE_NONE;
                mmd->stream_start.tv_sec = 0;
@@ -1139,8 +1114,8 @@ static int vss_post_select(struct sched *s, void *context)
                int num = mmd->sender_cmd_data.cmd_num,
                        sender_num = mmd->sender_cmd_data.sender_num;
 
-               if (senders[sender_num].client_cmds[num]) {
-                       ret = senders[sender_num].client_cmds[num]
+               if (senders[sender_num]->client_cmds[num]) {
+                       ret = senders[sender_num]->client_cmds[num]
                                (&mmd->sender_cmd_data);
                        if (ret < 0)
                                PARA_ERROR_LOG("%s\n", para_strerror(-ret));
@@ -1157,10 +1132,10 @@ static int vss_post_select(struct sched *s, void *context)
                else
                        vsst->afsss = AFS_SOCKET_AFD_PENDING;
        }
-       for (i = 0; senders[i].name; i++) {
-               if (!senders[i].post_select)
+       FOR_EACH_SENDER(i) {
+               if (!senders[i]->post_select)
                        continue;
-               senders[i].post_select(&s->rfds, &s->wfds);
+               senders[i]->post_select(&s->rfds, &s->wfds);
        }
        if ((vss_playing() && !(mmd->vss_status_flags & VSS_PLAYING)) ||
                        (vss_next() && vss_playing()))
@@ -1178,11 +1153,10 @@ static int vss_post_select(struct sched *s, void *context)
  * This also initializes all supported senders and starts streaming
  * if the --autoplay command line flag was given.
  */
-void init_vss_task(int afs_socket, struct sched *s)
+void vss_init(int afs_socket, struct sched *s)
 {
        static struct vss_task vss_task_struct, *vsst = &vss_task_struct;
        int i;
-       char *hn = para_hostname(), *home = para_homedir();
        long unsigned announce_time = OPT_UINT32_VAL(ANNOUNCE_TIME),
                autoplay_delay = OPT_UINT32_VAL(AUTOPLAY_DELAY);
        vsst->header_interval.tv_sec = 5; /* should this be configurable? */
@@ -1190,12 +1164,10 @@ void init_vss_task(int afs_socket, struct sched *s)
        ms2tv(announce_time, &vsst->announce_tv);
        PARA_INFO_LOG("announce timeval: %lums\n", tv2ms(&vsst->announce_tv));
        INIT_LIST_HEAD(&fec_client_list);
-       for (i = 0; senders[i].name; i++) {
-               PARA_NOTICE_LOG("initializing %s sender\n", senders[i].name);
-               senders[i].init(&senders[i]);
+       FOR_EACH_SENDER(i) {
+               PARA_NOTICE_LOG("initializing %s sender\n", senders[i]->name);
+               senders[i]->init();
        }
-       free(hn);
-       free(home);
        mmd->sender_cmd_data.cmd_num = -1;
        if (OPT_GIVEN(AUTOPLAY)) {
                struct timeval tmp;
@@ -1207,9 +1179,26 @@ void init_vss_task(int afs_socket, struct sched *s)
                        &vsst->data_send_barrier);
        }
        vsst->task = task_register(&(struct task_info) {
-               .name = "vss task",
+               .name = "vss",
                .pre_select = vss_pre_select,
                .post_select = vss_post_select,
                .context = vsst,
        }, s);
 }
+
+/**
+ * Turn off the virtual streaming system.
+ *
+ * This is only executed on exit. It calls the ->shutdowwn method of all senders.
+ */
+void vss_shutdown(void)
+{
+       int i;
+
+       FOR_EACH_SENDER(i) {
+               if (!senders[i]->shutdown)
+                       continue;
+               PARA_NOTICE_LOG("shutting down %s sender\n", senders[i]->name);
+               senders[i]->shutdown();
+       }
+}