/*
- * Copyright (C) 1997-2011 Andre Noll <maan@systemlinux.org>
+ * Copyright (C) 1997 Andre Noll <maan@tuebingen.mpg.de>
*
* Licensed under the GPL v2. For licencing details see COPYING.
*/
* senders.
*/
+#include <sys/socket.h>
+#include <netinet/in.h>
#include <regex.h>
-#include <dirent.h>
#include <osl.h>
+#include <sys/types.h>
+#include <arpa/inet.h>
+#include <sys/un.h>
+#include <netdb.h>
#include "para.h"
#include "error.h"
#include "server.cmdline.h"
#include "list.h"
#include "send.h"
+#include "sched.h"
#include "vss.h"
#include "ipc.h"
#include "fd.h"
-#include "sched.h"
extern struct misc_meta_data *mmd;
/** The memory mapped audio file. */
char *map;
/** Used by the scheduler. */
- struct task task;
+ struct task *task;
/** Pointer to the header of the mapped audio file. */
char *header_buf;
/** Length of the audio file header. */
uint16_t slice_bytes;
};
+/** A FEC client is always in one of these states. */
enum fec_client_state {
FEC_STATE_NONE = 0, /**< not initialized and not enabled */
FEC_STATE_DISABLED, /**< temporarily disabled */
struct timeval next_header_time;
/** Used for the last source pointer of an audio file. */
unsigned char *extra_src_buf;
+ /** Needed for the last slice of the audio file header. */
+ unsigned char *extra_header_buf;
/** Extra slices needed to store largest chunk + header. */
int num_extra_slices;
/** Contains the FEC-encoded data. */
fc->src_data = para_realloc(fc->src_data, k * sizeof(char *));
fc->enc_buf = para_realloc(fc->enc_buf, fc->mps);
fc->extra_src_buf = para_realloc(fc->extra_src_buf, fc->mps);
+ fc->extra_header_buf = para_realloc(fc->extra_header_buf, fc->mps);
fc->state = FEC_STATE_READY_TO_RUN;
fc->next_header_time.tv_sec = 0;
return 1;
}
+static void vss_get_chunk(int chunk_num, struct vss_task *vsst,
+ char **buf, size_t *sz)
+{
+ /*
+ * Chunk zero is special for header streams: It is the first portion of
+ * the audio file which consists of the audio file header. It may be
+ * arbitrary large due to embedded meta data. Audio format handlers may
+ * replace the header by a stripped one with meta data omitted which is
+ * of bounded size. We always use the stripped header for streaming
+ * rather than the unmodified header (chunk zero).
+ */
+ if (chunk_num == 0 && vsst->header_len > 0) {
+ *buf = vsst->header_buf; /* stripped header */
+ *sz = vsst->header_len;
+ return;
+ }
+ afh_get_chunk(chunk_num, &mmd->afd.afhi, vsst->map, (const char **)buf,
+ sz);
+}
+
static void compute_group_size(struct vss_task *vsst, struct fec_group *g,
int max_bytes)
{
+ char *buf;
+ size_t len;
int i, max_chunks = PARA_MAX(1LU, 150 / tv2ms(vss_chunk_time()));
+ if (g->first_chunk == 0) {
+ g->num_chunks = 1;
+ vss_get_chunk(0, vsst, &buf, &len);
+ g->bytes = len;
+ return;
+ }
+
g->num_chunks = 0;
g->bytes = 0;
/*
* of exactly one chunk for these audio formats.
*/
for (i = 0;; i++) {
- const char *buf;
- size_t len;
int chunk_num = g->first_chunk + i;
if (g->bytes > 0 && i >= max_chunks) /* duration limit */
break;
if (chunk_num >= mmd->afd.afhi.chunks_total) /* eof */
break;
- afh_get_chunk(chunk_num, &mmd->afd.afhi, vsst->map, &buf, &len);
+ vss_get_chunk(chunk_num, vsst, &buf, &len);
if (g->bytes + len > max_bytes)
break;
/* Include this chunk */
{
int ret, i, k, n, data_slices;
size_t len;
- const char *buf;
+ char *buf, *p;
struct fec_group *g = &fc->group;
if (fc->state == FEC_STATE_NONE) {
fc->current_slice_num = 0;
if (g->num == 0)
set_group_timing(fc, vsst);
-
/* setup header slices */
buf = vsst->header_buf;
for (i = 0; i < g->num_header_slices; i++) {
- fc->src_data[i] = (const unsigned char *)buf;
- buf += g->slice_bytes;
+ uint32_t payload_size;
+ if (buf + g->slice_bytes <= vsst->header_buf + vsst->header_len) {
+ fc->src_data[i] = (const unsigned char *)buf;
+ buf += g->slice_bytes;
+ continue;
+ }
+ /*
+ * Can not use vss->header_buf for this slice as it
+ * goes beyond the buffer. This slice will not be fully
+ * used.
+ */
+ payload_size = vsst->header_buf + vsst->header_len - buf;
+ memcpy(fc->extra_header_buf, buf, payload_size);
+ if (payload_size < g->slice_bytes)
+ memset(fc->extra_header_buf + payload_size, 0,
+ g->slice_bytes - payload_size);
+ /*
+ * There might be more than one header slice to fill although
+ * only the first one will be used. Set all header slices to
+ * our extra buffer.
+ */
+ while (i < g->num_header_slices)
+ fc->src_data[i++] = fc->extra_header_buf;
+ break; /* we don't want i to be increased. */
}
- /* setup data slices */
- afh_get_chunk(g->first_chunk, &mmd->afd.afhi, vsst->map, &buf, &len);
- for (; i < g->num_header_slices + data_slices; i++) {
- if (buf + g->slice_bytes > vsst->map + mmd->size) {
+ /*
+ * Setup data slices. Note that for ogg streams chunk 0 points to a
+ * buffer on the heap rather than to the mapped audio file.
+ */
+ vss_get_chunk(g->first_chunk, vsst, &buf, &len);
+ for (p = buf; i < g->num_header_slices + data_slices; i++) {
+ if (p + g->slice_bytes > buf + g->bytes) {
/*
- * Can not use the memory mapped audio file for this
- * slice as it goes beyond the map. This slice will not
- * be fully used.
+ * We must make a copy for this slice since using p
+ * directly would exceed the buffer.
*/
- uint32_t payload_size = vsst->map + mmd->size - buf;
- memcpy(fc->extra_src_buf, buf, payload_size);
+ uint32_t payload_size = buf + g->bytes - p;
+ assert(payload_size + FEC_HEADER_SIZE <= fc->mps);
+ memcpy(fc->extra_src_buf, p, payload_size);
if (payload_size < g->slice_bytes)
memset(fc->extra_src_buf + payload_size, 0,
g->slice_bytes - payload_size);
i++;
break;
}
- fc->src_data[i] = (const unsigned char *)buf;
- buf += g->slice_bytes;
+ fc->src_data[i] = (const unsigned char *)p;
+ p += g->slice_bytes;
}
if (i < k) {
/* use arbitrary data for all remaining slices */
free(fc->src_data);
free(fc->enc_buf);
free(fc->extra_src_buf);
+ free(fc->extra_header_buf);
fec_free(fc->parms);
free(fc);
}
return ret < 0? 1 : 0;
}
-static void compute_slice_timeout(struct timeval *timeout)
-{
- struct fec_client *fc;
-
- list_for_each_entry(fc, &fec_client_list, node) {
- struct timeval diff;
-
- if (fc->state != FEC_STATE_READY_TO_RUN)
- continue;
- if (next_slice_is_due(fc, &diff)) {
- timeout->tv_sec = 0;
- timeout->tv_usec = 0;
- return;
- }
- /* timeout = min(timeout, diff) */
- if (tv_diff(&diff, timeout, NULL) < 0)
- *timeout = diff;
- }
-}
-
static void set_eof_barrier(struct vss_task *vsst)
{
struct fec_client *fc;
return -1;
}
-/*
- * != NULL: timeout for next chunk
- * NULL: nothing to do
- */
-static struct timeval *vss_compute_timeout(struct vss_task *vsst)
+static void vss_compute_timeout(struct sched *s, struct vss_task *vsst)
{
- static struct timeval the_timeout;
- struct timeval next_chunk;
-
- if (vss_next() && vsst->map) {
- /* only sleep a bit, nec*/
- the_timeout.tv_sec = 0;
- the_timeout.tv_usec = 100;
- return &the_timeout;
- }
- if (chk_barrier("autoplay_delay", &vsst->autoplay_barrier,
- &the_timeout, 1) < 0)
- return &the_timeout;
- if (chk_barrier("eof", &vsst->eof_barrier, &the_timeout, 1) < 0)
- return &the_timeout;
- if (chk_barrier("data send", &vsst->data_send_barrier,
- &the_timeout, 1) < 0)
- return &the_timeout;
+ struct timeval tv;
+ struct fec_client *fc;
+
if (!vss_playing() || !vsst->map)
- return NULL;
+ return;
+ if (vss_next() && vsst->map) /* only sleep a bit, nec*/
+ return sched_request_timeout_ms(100, s);
+
+ /* Each of these barriers must have passed until we may proceed */
+ if (sched_request_barrier(&vsst->autoplay_barrier, s) == 1)
+ return;
+ if (sched_request_barrier(&vsst->eof_barrier, s) == 1)
+ return;
+ if (sched_request_barrier(&vsst->data_send_barrier, s) == 1)
+ return;
+ /*
+ * Compute the select timeout as the minimal time until the next
+ * chunk/slice is due for any client.
+ */
compute_chunk_time(mmd->chunks_sent, &mmd->afd.afhi.chunk_tv,
- &mmd->stream_start, &next_chunk);
- if (chk_barrier("chunk", &next_chunk, &the_timeout, 0) >= 0) {
- /* chunk is due or bof */
- the_timeout.tv_sec = 0;
- the_timeout.tv_usec = 0;
- return &the_timeout;
+ &mmd->stream_start, &tv);
+ if (sched_request_barrier_or_min_delay(&tv, s) == 0)
+ return;
+ list_for_each_entry(fc, &fec_client_list, node) {
+ if (fc->state != FEC_STATE_READY_TO_RUN)
+ continue;
+ if (next_slice_is_due(fc, &tv))
+ return sched_min_delay(s);
+ sched_request_timeout(&tv, s);
}
- /* compute min of current timeout and next slice time */
- compute_slice_timeout(&the_timeout);
- return &the_timeout;
}
static void vss_eof(struct vss_task *vsst)
mmd->afd.afhi.chunk_tv.tv_usec = 0;
free(mmd->afd.afhi.chunk_table);
mmd->afd.afhi.chunk_table = NULL;
- mmd->mtime = 0;
mmd->size = 0;
mmd->events++;
}
mmd->offset = tv2ms(&offset);
}
-/**
- * Compute the timeout for the main select-loop of the scheduler.
- *
- * \param s Pointer to the server scheduler.
- * \param t Pointer to the vss task structure.
- *
- * Before the timeout is computed, the current vss status flags are evaluated
- * and acted upon by calling appropriate functions from the lower layers.
- * Possible actions include
- *
- * - request a new audio file from afs,
- * - shutdown of all senders (stop/pause command),
- * - reposition the stream (ff/jmp command).
- */
-static void vss_pre_select(struct sched *s, struct task *t)
+static void vss_pre_select(struct sched *s, void *context)
{
int i;
- struct timeval *tv;
- struct vss_task *vsst = container_of(t, struct vss_task, task);
+ struct vss_task *vsst = context;
- if (!vsst->map || vss_next() || vss_paused() || vss_repos()) {
- struct fec_client *fc, *tmp;
- for (i = 0; senders[i].name; i++)
- if (senders[i].shutdown_clients)
- senders[i].shutdown_clients();
- list_for_each_entry_safe(fc, tmp, &fec_client_list, node)
- fc->state = FEC_STATE_NONE;
- mmd->stream_start.tv_sec = 0;
- mmd->stream_start.tv_usec = 0;
- }
- if (vss_next())
- vss_eof(vsst);
- else if (vss_paused()) {
- if (mmd->chunks_sent)
- set_eof_barrier(vsst);
- mmd->chunks_sent = 0;
- } else if (vss_repos()) {
- tv_add(now, &vsst->announce_tv, &vsst->data_send_barrier);
- set_eof_barrier(vsst);
- mmd->chunks_sent = 0;
- mmd->current_chunk = mmd->repos_request;
- mmd->new_vss_status_flags &= ~VSS_REPOS;
- set_mmd_offset();
- }
if (need_to_request_new_audio_file(vsst)) {
PARA_DEBUG_LOG("ready and playing, but no audio file\n");
para_fd_set(vsst->afs_socket, &s->wfds, &s->max_fileno);
continue;
senders[i].pre_select(&s->max_fileno, &s->rfds, &s->wfds);
}
- tv = vss_compute_timeout(vsst);
- if (tv)
- sched_request_timeout(tv, s);
+ vss_compute_timeout(s, vsst);
}
static int recv_afs_msg(int afs_socket, int *fd, uint32_t *code, uint32_t *data)
{
- char control[255], buf[8];
+ char control[255] __a_aligned(8), buf[8];
struct msghdr msg = {.msg_iov = NULL};
struct cmsghdr *cmsg;
struct iovec iov;
return 1;
}
+#ifndef MAP_POPULATE
+#define MAP_POPULATE 0
+#endif
+
static void recv_afs_result(struct vss_task *vsst, fd_set *rfds)
{
int ret, passed_fd, shmid;
goto err;
}
mmd->size = statbuf.st_size;
- mmd->mtime = statbuf.st_mtime;
- ret = para_mmap(mmd->size, PROT_READ, MAP_PRIVATE, passed_fd,
- 0, &vsst->map);
+ ret = para_mmap(mmd->size, PROT_READ, MAP_PRIVATE | MAP_POPULATE,
+ passed_fd, 0, &vsst->map);
if (ret < 0)
goto err;
close(passed_fd);
compute_chunk_time(mmd->chunks_sent, &mmd->afd.afhi.chunk_tv,
&mmd->stream_start, &due);
if (tv_diff(&due, now, NULL) <= 0) {
- const char *buf;
+ char *buf;
size_t len;
if (!mmd->chunks_sent) {
* they might have still some data queued which can be sent in
* this case.
*/
- afh_get_chunk(mmd->current_chunk, &mmd->afd.afhi, vsst->map,
- &buf, &len);
+ vss_get_chunk(mmd->current_chunk, vsst, &buf, &len);
for (i = 0; senders[i].name; i++) {
if (!senders[i].send)
continue;
senders[i].send(mmd->current_chunk, mmd->chunks_sent,
buf, len, vsst->header_buf, vsst->header_len);
}
+ /*
+ * Prefault next chunk(s)
+ *
+ * If the backing device of the memory-mapped audio file is
+ * slow and read-ahead is turned off or prevented for some
+ * reason, e.g. due to memory pressure, it may take much longer
+ * than the chunk interval to get the next chunk on the wire,
+ * causing buffer underruns on the client side. Mapping the
+ * file with MAP_POPULATE seems to help a bit, but it does not
+ * eliminate the delays completely. Moreover, it is supported
+ * only on Linux. So we do our own read-ahead here.
+ */
+ if (mmd->current_chunk > 0) { /* chunk 0 might be on the heap */
+ buf += len;
+ for (i = 0; i < 5 && buf < vsst->map + mmd->size; i++) {
+ __a_unused volatile char x = *buf;
+ buf += 4096;
+ }
+ }
mmd->chunks_sent++;
mmd->current_chunk++;
}
}
-static void vss_post_select(struct sched *s, struct task *t)
+static int vss_post_select(struct sched *s, void *context)
{
int ret, i;
- struct vss_task *vsst = container_of(t, struct vss_task, task);
-
+ struct vss_task *vsst = context;
+ if (!vsst->map || vss_next() || vss_paused() || vss_repos()) {
+ /* shut down senders and fec clients */
+ struct fec_client *fc, *tmp;
+ for (i = 0; senders[i].name; i++)
+ if (senders[i].shutdown_clients)
+ senders[i].shutdown_clients();
+ list_for_each_entry_safe(fc, tmp, &fec_client_list, node)
+ fc->state = FEC_STATE_NONE;
+ mmd->stream_start.tv_sec = 0;
+ mmd->stream_start.tv_usec = 0;
+ }
+ if (vss_next())
+ vss_eof(vsst);
+ else if (vss_paused()) {
+ if (mmd->chunks_sent)
+ set_eof_barrier(vsst);
+ mmd->chunks_sent = 0;
+ } else if (vss_repos()) { /* repositioning due to ff/jmp command */
+ tv_add(now, &vsst->announce_tv, &vsst->data_send_barrier);
+ set_eof_barrier(vsst);
+ mmd->chunks_sent = 0;
+ mmd->current_chunk = afh_get_start_chunk(mmd->repos_request,
+ &mmd->afd.afhi);
+ mmd->new_vss_status_flags &= ~VSS_REPOS;
+ set_mmd_offset();
+ }
+ /* If a sender command is pending, run it. */
if (mmd->sender_cmd_data.cmd_num >= 0) {
int num = mmd->sender_cmd_data.cmd_num,
sender_num = mmd->sender_cmd_data.sender_num;
recv_afs_result(vsst, &s->rfds);
else if (FD_ISSET(vsst->afs_socket, &s->wfds)) {
PARA_NOTICE_LOG("requesting new fd from afs\n");
- ret = send_buffer(vsst->afs_socket, "new");
+ ret = write_buffer(vsst->afs_socket, "new");
if (ret < 0)
PARA_CRIT_LOG("%s\n", para_strerror(-ret));
else
(vss_next() && vss_playing()))
tv_add(now, &vsst->announce_tv, &vsst->data_send_barrier);
vss_send(vsst);
+ return 0;
}
/**
* Initialize the virtual streaming system task.
*
* \param afs_socket The fd for communication with afs.
+ * \param s The scheduler to register the vss task to.
*
* This also initializes all supported senders and starts streaming
* if the --autoplay command line flag was given.
*/
-void init_vss_task(int afs_socket)
+void init_vss_task(int afs_socket, struct sched *s)
{
static struct vss_task vss_task_struct, *vsst = &vss_task_struct;
int i;
conf.autoplay_delay_arg : 0;
vsst->header_interval.tv_sec = 5; /* should this be configurable? */
vsst->afs_socket = afs_socket;
- vsst->task.pre_select = vss_pre_select;
- vsst->task.post_select = vss_post_select;
ms2tv(announce_time, &vsst->announce_tv);
PARA_INFO_LOG("announce timeval: %lums\n", tv2ms(&vsst->announce_tv));
INIT_LIST_HEAD(&fec_client_list);
tv_add(&vsst->autoplay_barrier, &vsst->announce_tv,
&vsst->data_send_barrier);
}
- register_task(&vsst->task);
+ vsst->task = task_register(&(struct task_info) {
+ .name = "vss task",
+ .pre_select = vss_pre_select,
+ .post_select = vss_post_select,
+ .context = vsst,
+ }, s);
}