vss: Improve comment about sending empty chunks.
[paraslash.git] / vss.c
diff --git a/vss.c b/vss.c
index bfea37a191f8b90db60db720916eb37ebf900223..625fabe766bb7707ac2a35125f7af0ce250f1cae 100644 (file)
--- a/vss.c
+++ b/vss.c
@@ -1,5 +1,5 @@
 /*
- * Copyright (C) 1997-2012 Andre Noll <maan@systemlinux.org>
+ * Copyright (C) 1997 Andre Noll <maan@tuebingen.mpg.de>
  *
  * Licensed under the GPL v2. For licencing details see COPYING.
  */
  * senders.
  */
 
+#include <sys/socket.h>
+#include <netinet/in.h>
 #include <regex.h>
 #include <osl.h>
+#include <sys/types.h>
+#include <arpa/inet.h>
+#include <sys/un.h>
+#include <netdb.h>
+#include <lopsub.h>
 
+#include "server.lsg.h"
 #include "para.h"
 #include "error.h"
 #include "portable_io.h"
@@ -23,7 +31,6 @@
 #include "afs.h"
 #include "server.h"
 #include "net.h"
-#include "server.cmdline.h"
 #include "list.h"
 #include "send.h"
 #include "sched.h"
@@ -82,14 +89,18 @@ struct vss_task {
        enum afs_socket_status afsss;
        /** The memory mapped audio file. */
        char *map;
+       /** The size of the memory mapping. */
+       size_t mapsize;
        /** Used by the scheduler. */
-       struct task task;
+       struct task *task;
        /** Pointer to the header of the mapped audio file. */
        char *header_buf;
        /** Length of the audio file header. */
        size_t header_len;
        /** Time between audio file headers are sent. */
        struct timeval header_interval;
+       /* Only used if afh supports dynamic chunks. */
+       void *afh_context;
 };
 
 /**
@@ -341,6 +352,8 @@ static int initialize_fec_client(struct fec_client *fc, struct vss_task *vsst)
 static void vss_get_chunk(int chunk_num, struct vss_task *vsst,
                char **buf, size_t *sz)
 {
+       int ret;
+
        /*
         * Chunk zero is special for header streams: It is the first portion of
         * the audio file which consists of the audio file header. It may be
@@ -350,12 +363,20 @@ static void vss_get_chunk(int chunk_num, struct vss_task *vsst,
         * rather than the unmodified header (chunk zero).
         */
        if (chunk_num == 0 && vsst->header_len > 0) {
+               assert(vsst->header_buf);
                *buf = vsst->header_buf; /* stripped header */
                *sz = vsst->header_len;
                return;
        }
-       afh_get_chunk(chunk_num, &mmd->afd.afhi, vsst->map, (const char **)buf,
-               sz);
+       ret = afh_get_chunk(chunk_num, &mmd->afd.afhi,
+               mmd->afd.audio_format_id, vsst->map, vsst->mapsize,
+               (const char **)buf, sz, &vsst->afh_context);
+       if (ret < 0) {
+               PARA_WARNING_LOG("could not get chunk %d: %s\n",
+                       chunk_num, para_strerror(-ret));
+               *buf = NULL;
+               *sz = 0;
+       }
 }
 
 static void compute_group_size(struct vss_task *vsst, struct fec_group *g,
@@ -565,8 +586,14 @@ static int setup_next_fec_group(struct fec_client *fc, struct vss_task *vsst)
                if (payload_size < g->slice_bytes)
                        memset(fc->extra_header_buf + payload_size, 0,
                                g->slice_bytes - payload_size);
-               fc->src_data[i] = fc->extra_header_buf;
-               assert(i == g->num_header_slices - 1);
+               /*
+                * There might be more than one header slice to fill although
+                * only the first one will be used. Set all header slices to
+                * our extra buffer.
+                */
+               while (i < g->num_header_slices)
+                       fc->src_data[i++] = fc->extra_header_buf;
+               break; /* we don't want i to be increased. */
        }
 
        /*
@@ -599,7 +626,7 @@ static int setup_next_fec_group(struct fec_client *fc, struct vss_task *vsst)
                for (; i < k; i++)
                        fc->src_data[i] = (const unsigned char *)buf;
        }
-       PARA_DEBUG_LOG("FEC group %d: %d chunks (%d - %d), %d bytes\n",
+       PARA_DEBUG_LOG("FEC group %u: %u chunks (%u - %u), %u bytes\n",
                g->num, g->num_chunks, g->first_chunk,
                g->first_chunk + g->num_chunks - 1, g->bytes
        );
@@ -834,7 +861,7 @@ static void vss_eof(struct vss_task *vsst)
        set_eof_barrier(vsst);
        afh_free_header(vsst->header_buf, mmd->afd.audio_format_id);
        vsst->header_buf = NULL;
-       para_munmap(vsst->map, mmd->size);
+       para_munmap(vsst->map, vsst->mapsize);
        vsst->map = NULL;
        mmd->chunks_sent = 0;
        //mmd->offset = 0;
@@ -843,8 +870,9 @@ static void vss_eof(struct vss_task *vsst)
        mmd->afd.afhi.chunk_tv.tv_usec = 0;
        free(mmd->afd.afhi.chunk_table);
        mmd->afd.afhi.chunk_table = NULL;
-       mmd->mtime = 0;
-       mmd->size = 0;
+       vsst->mapsize = 0;
+       afh_close(vsst->afh_context, mmd->afd.audio_format_id);
+       vsst->afh_context = NULL;
        mmd->events++;
 }
 
@@ -873,49 +901,11 @@ static void set_mmd_offset(void)
        mmd->offset = tv2ms(&offset);
 }
 
-/**
- * Compute the timeout for the main select-loop of the scheduler.
- *
- * \param s Pointer to the server scheduler.
- * \param t Pointer to the vss task structure.
- *
- * Before the timeout is computed, the current vss status flags are evaluated
- * and acted upon by calling appropriate functions from the lower layers.
- * Possible actions include
- *
- *     - request a new audio file from afs,
- *     - shutdown of all senders (stop/pause command),
- *     - reposition the stream (ff/jmp command).
- */
-static void vss_pre_select(struct sched *s, struct task *t)
+static void vss_pre_select(struct sched *s, void *context)
 {
        int i;
-       struct vss_task *vsst = container_of(t, struct vss_task, task);
+       struct vss_task *vsst = context;
 
-       if (!vsst->map || vss_next() || vss_paused() || vss_repos()) {
-               struct fec_client *fc, *tmp;
-               for (i = 0; senders[i].name; i++)
-                       if (senders[i].shutdown_clients)
-                               senders[i].shutdown_clients();
-               list_for_each_entry_safe(fc, tmp, &fec_client_list, node)
-                       fc->state = FEC_STATE_NONE;
-               mmd->stream_start.tv_sec = 0;
-               mmd->stream_start.tv_usec = 0;
-       }
-       if (vss_next())
-               vss_eof(vsst);
-       else if (vss_paused()) {
-               if (mmd->chunks_sent)
-                       set_eof_barrier(vsst);
-               mmd->chunks_sent = 0;
-       } else if (vss_repos()) {
-               tv_add(now, &vsst->announce_tv, &vsst->data_send_barrier);
-               set_eof_barrier(vsst);
-               mmd->chunks_sent = 0;
-               mmd->current_chunk = mmd->repos_request;
-               mmd->new_vss_status_flags &= ~VSS_REPOS;
-               set_mmd_offset();
-       }
        if (need_to_request_new_audio_file(vsst)) {
                PARA_DEBUG_LOG("ready and playing, but no audio file\n");
                para_fd_set(vsst->afs_socket, &s->wfds, &s->max_fileno);
@@ -932,7 +922,7 @@ static void vss_pre_select(struct sched *s, struct task *t)
 
 static int recv_afs_msg(int afs_socket, int *fd, uint32_t *code, uint32_t *data)
 {
-       char control[255], buf[8];
+       char control[255] __a_aligned(8), buf[8];
        struct msghdr msg = {.msg_iov = NULL};
        struct cmsghdr *cmsg;
        struct iovec iov;
@@ -1000,12 +990,11 @@ static void recv_afs_result(struct vss_task *vsst, fd_set *rfds)
                ret = -ERRNO_TO_PARA_ERROR(errno);
                goto err;
        }
-       mmd->size = statbuf.st_size;
-       mmd->mtime = statbuf.st_mtime;
-       ret = para_mmap(mmd->size, PROT_READ, MAP_PRIVATE | MAP_POPULATE,
+       ret = para_mmap(statbuf.st_size, PROT_READ, MAP_PRIVATE | MAP_POPULATE,
                passed_fd, 0, &vsst->map);
        if (ret < 0)
                goto err;
+       vsst->mapsize = statbuf.st_size;
        close(passed_fd);
        mmd->chunks_sent = 0;
        mmd->current_chunk = 0;
@@ -1014,7 +1003,7 @@ static void recv_afs_result(struct vss_task *vsst, fd_set *rfds)
        mmd->num_played++;
        mmd->new_vss_status_flags &= (~VSS_NEXT);
        afh_get_header(&mmd->afd.afhi, mmd->afd.audio_format_id,
-               vsst->map, mmd->size, &vsst->header_buf, &vsst->header_len);
+               vsst->map, vsst->mapsize, &vsst->header_buf, &vsst->header_len);
        return;
 err:
        free(mmd->afd.afhi.chunk_table);
@@ -1043,8 +1032,7 @@ static void vss_send(struct vss_task *vsst)
                return;
        if (chk_barrier("eof", &vsst->eof_barrier, &due, 1) < 0)
                return;
-       if (chk_barrier("data send", &vsst->data_send_barrier,
-                       &due, 1) < 0)
+       if (chk_barrier("data send", &vsst->data_send_barrier, &due, 1) < 0)
                return;
        list_for_each_entry_safe(fc, tmp_fc, &fec_client_list, node) {
                if (fc->state == FEC_STATE_DISABLED)
@@ -1055,11 +1043,11 @@ static void vss_send(struct vss_task *vsst)
                }
                if (compute_next_fec_slice(fc, vsst) <= 0)
                        continue;
-               PARA_DEBUG_LOG("sending %d:%d (%u bytes)\n", fc->group.num,
+               PARA_DEBUG_LOG("sending %u:%u (%u bytes)\n", fc->group.num,
                        fc->current_slice_num, fc->group.slice_bytes);
+               fc->current_slice_num++;
                fc->fcp->send_fec(fc->sc, (char *)fc->enc_buf,
                        fc->group.slice_bytes + FEC_HEADER_SIZE);
-               fc->current_slice_num++;
                fec_active = 1;
        }
        if (mmd->current_chunk >= mmd->afd.afhi.chunks_total) { /* eof */
@@ -1078,13 +1066,12 @@ static void vss_send(struct vss_task *vsst)
                        mmd->events++;
                        set_mmd_offset();
                }
-               /*
-                * We call the send function also in case of empty chunks as
-                * they might have still some data queued which can be sent in
-                * this case.
-                */
                vss_get_chunk(mmd->current_chunk, vsst, &buf, &len);
                for (i = 0; senders[i].name; i++) {
+                       /*
+                        * We call ->send() even if len is zero because senders
+                        * might have data queued which can be sent now.
+                        */
                        if (!senders[i].send)
                                continue;
                        senders[i].send(mmd->current_chunk, mmd->chunks_sent,
@@ -1104,7 +1091,7 @@ static void vss_send(struct vss_task *vsst)
                 */
                if (mmd->current_chunk > 0) { /* chunk 0 might be on the heap */
                        buf += len;
-                       for (i = 0; i < 5 && buf < vsst->map + mmd->size; i++) {
+                       for (i = 0; i < 5 && buf < vsst->map + vsst->mapsize; i++) {
                                __a_unused volatile char x = *buf;
                                buf += 4096;
                        }
@@ -1114,12 +1101,38 @@ static void vss_send(struct vss_task *vsst)
        }
 }
 
-static void vss_post_select(struct sched *s, struct task *t)
+static int vss_post_select(struct sched *s, void *context)
 {
        int ret, i;
-       struct vss_task *vsst = container_of(t, struct vss_task, task);
-
+       struct vss_task *vsst = context;
 
+       if (!vsst->map || vss_next() || vss_paused() || vss_repos()) {
+               /* shut down senders and fec clients */
+               struct fec_client *fc, *tmp;
+               for (i = 0; senders[i].name; i++)
+                       if (senders[i].shutdown_clients)
+                               senders[i].shutdown_clients();
+               list_for_each_entry_safe(fc, tmp, &fec_client_list, node)
+                       fc->state = FEC_STATE_NONE;
+               mmd->stream_start.tv_sec = 0;
+               mmd->stream_start.tv_usec = 0;
+       }
+       if (vss_next())
+               vss_eof(vsst);
+       else if (vss_paused()) {
+               if (mmd->chunks_sent)
+                       set_eof_barrier(vsst);
+               mmd->chunks_sent = 0;
+       } else if (vss_repos()) { /* repositioning due to ff/jmp command */
+               tv_add(now, &vsst->announce_tv, &vsst->data_send_barrier);
+               set_eof_barrier(vsst);
+               mmd->chunks_sent = 0;
+               mmd->current_chunk = afh_get_start_chunk(mmd->repos_request,
+                       &mmd->afd.afhi, mmd->afd.audio_format_id);
+               mmd->new_vss_status_flags &= ~VSS_REPOS;
+               set_mmd_offset();
+       }
+       /* If a sender command is pending, run it. */
        if (mmd->sender_cmd_data.cmd_num >= 0) {
                int num = mmd->sender_cmd_data.cmd_num,
                        sender_num = mmd->sender_cmd_data.sender_num;
@@ -1136,7 +1149,7 @@ static void vss_post_select(struct sched *s, struct task *t)
                recv_afs_result(vsst, &s->rfds);
        else if (FD_ISSET(vsst->afs_socket, &s->wfds)) {
                PARA_NOTICE_LOG("requesting new fd from afs\n");
-               ret = send_buffer(vsst->afs_socket, "new");
+               ret = write_buffer(vsst->afs_socket, "new");
                if (ret < 0)
                        PARA_CRIT_LOG("%s\n", para_strerror(-ret));
                else
@@ -1151,6 +1164,7 @@ static void vss_post_select(struct sched *s, struct task *t)
                        (vss_next() && vss_playing()))
                tv_add(now, &vsst->announce_tv, &vsst->data_send_barrier);
        vss_send(vsst);
+       return 0;
 }
 
 /**
@@ -1167,14 +1181,10 @@ void init_vss_task(int afs_socket, struct sched *s)
        static struct vss_task vss_task_struct, *vsst = &vss_task_struct;
        int i;
        char *hn = para_hostname(), *home = para_homedir();
-       long unsigned announce_time = conf.announce_time_arg > 0?
-                       conf.announce_time_arg : 300,
-               autoplay_delay = conf.autoplay_delay_arg > 0?
-                       conf.autoplay_delay_arg : 0;
+       long unsigned announce_time = OPT_UINT32_VAL(ANNOUNCE_TIME),
+               autoplay_delay = OPT_UINT32_VAL(AUTOPLAY_DELAY);
        vsst->header_interval.tv_sec = 5; /* should this be configurable? */
        vsst->afs_socket = afs_socket;
-       vsst->task.pre_select = vss_pre_select;
-       vsst->task.post_select = vss_post_select;
        ms2tv(announce_time, &vsst->announce_tv);
        PARA_INFO_LOG("announce timeval: %lums\n", tv2ms(&vsst->announce_tv));
        INIT_LIST_HEAD(&fec_client_list);
@@ -1185,15 +1195,19 @@ void init_vss_task(int afs_socket, struct sched *s)
        free(hn);
        free(home);
        mmd->sender_cmd_data.cmd_num = -1;
-       if (conf.autoplay_given) {
+       if (OPT_GIVEN(AUTOPLAY)) {
                struct timeval tmp;
                mmd->vss_status_flags |= VSS_PLAYING;
                mmd->new_vss_status_flags |= VSS_PLAYING;
                ms2tv(autoplay_delay, &tmp);
-               tv_add(now, &tmp, &vsst->autoplay_barrier);
+               tv_add(clock_get_realtime(NULL), &tmp, &vsst->autoplay_barrier);
                tv_add(&vsst->autoplay_barrier, &vsst->announce_tv,
                        &vsst->data_send_barrier);
        }
-       sprintf(vsst->task.status, "vss task");
-       register_task(s, &vsst->task);
+       vsst->task = task_register(&(struct task_info) {
+               .name = "vss task",
+               .pre_select = vss_pre_select,
+               .post_select = vss_post_select,
+               .context = vsst,
+       }, s);
 }