/*
- * Copyright (C) 1997-2008 Andre Noll <maan@systemlinux.org>
+ * Copyright (C) 1997-2009 Andre Noll <maan@systemlinux.org>
*
* Licensed under the GPL v2. For licencing details see COPYING.
*/
* senders.
*/
-#include <sys/mman.h> /* mmap */
-#include <sys/time.h> /* gettimeofday */
-#include <sys/types.h>
#include <dirent.h>
#include "para.h"
#include "send.h"
#include "ipc.h"
#include "fd.h"
-
-static struct timeval announce_tv;
-static struct timeval data_send_barrier;
-static struct timeval eof_barrier;
-static struct timeval autoplay_barrier;
+#include "sched.h"
extern struct misc_meta_data *mmd;
-extern struct sender senders[];
-static char *map;
+extern void dccp_send_init(struct sender *);
+extern void http_send_init(struct sender *);
+extern void udp_send_init(struct sender *);
+
+/** The list of supported senders. */
+struct sender senders[] = {
+ {
+ .name = "http",
+ .init = http_send_init,
+ },
+ {
+ .name = "dccp",
+ .init = dccp_send_init,
+ },
+ {
+ .name = "udp",
+ .init = udp_send_init,
+ },
+ {
+ .name = NULL,
+ }
+};
+
+/** The possible states of the afs socket. */
+enum afs_socket_status {
+ /** Socket is inactive. */
+ AFS_SOCKET_READY,
+ /** Socket fd was included in the write fd set for select(). */
+ AFS_SOCKET_CHECK_FOR_WRITE,
+ /** vss wrote a request to the socket and waits for reply from afs. */
+ AFS_SOCKET_AFD_PENDING
+};
+
+/** The task structure for the virtual streaming system. */
+struct vss_task {
+ /** Copied from the -announce_time command line option. */
+ struct timeval announce_tv;
+ /** End of the announcing interval. */
+ struct timeval data_send_barrier;
+ /** End of the EOF interval. */
+ struct timeval eof_barrier;
+ /** Only used if --autoplay_delay was given. */
+ struct timeval autoplay_barrier;
+ /** Used for afs-server communication. */
+ int afs_socket;
+ /** The current state of \a afs_socket. */
+ enum afs_socket_status afsss;
+ /** The memory mapped audio file. */
+ char *map;
+ /** Used by the scheduler. */
+ struct task task;
+ /** Pointer to the header of the mapped audio file. */
+ const char *header_buf;
+ /** Length of the audio file header. */
+ size_t header_len;
+};
/**
* Check if vss status flag \a P (playing) is set.
&& !(mmd->new_vss_status_flags & VSS_PLAYING);
}
-/**
- * Initialize the virtual streaming system.
- *
- * This also initializes all supported senders and starts streaming
- * if the --autoplay command line flag was given.
- */
-void vss_init(void)
-{
- int i;
- char *hn = para_hostname(), *home = para_homedir();
- long unsigned announce_time = conf.announce_time_arg > 0?
- conf.announce_time_arg : 300,
- autoplay_delay = conf.autoplay_delay_arg > 0?
- conf.autoplay_delay_arg : 0;
- ms2tv(announce_time, &announce_tv);
- PARA_INFO_LOG("announce timeval: %lums\n", tv2ms(&announce_tv));
- for (i = 0; senders[i].name; i++) {
- PARA_NOTICE_LOG("initializing %s sender\n", senders[i].name);
- senders[i].init(&senders[i]);
- }
- free(hn);
- free(home);
- if (conf.autoplay_given) {
- struct timeval now, tmp;
- mmd->vss_status_flags |= VSS_PLAYING;
- mmd->new_vss_status_flags |= VSS_PLAYING;
- gettimeofday(&now, NULL);
- ms2tv(autoplay_delay, &tmp);
- tv_add(&now, &tmp, &autoplay_barrier);
- }
-}
-
-static int chk_barrier(const char *bname, const struct timeval *now,
- const struct timeval *barrier, struct timeval *diff,
- int print_log)
+static int chk_barrier(const char *bname, const struct timeval *barrier,
+ struct timeval *diff, int print_log)
{
long ms;
* != NULL: timeout for next chunk
* NULL: nothing to do
*/
-static struct timeval *vss_compute_timeout(void)
+static struct timeval *vss_compute_timeout(struct vss_task *vsst)
{
static struct timeval the_timeout;
- struct timeval now, next_chunk;
+ struct timeval next_chunk;
- if (vss_next() && map) {
+ if (vss_next() && vsst->map) {
/* only sleep a bit, nec*/
the_timeout.tv_sec = 0;
the_timeout.tv_usec = 100;
return &the_timeout;
}
- gettimeofday(&now, NULL);
- if (chk_barrier("autoplay_delay", &now, &autoplay_barrier,
+ if (chk_barrier("autoplay_delay", &vsst->autoplay_barrier,
&the_timeout, 1) < 0)
return &the_timeout;
- if (chk_barrier("eof", &now, &eof_barrier, &the_timeout, 1) < 0)
+ if (chk_barrier("eof", &vsst->eof_barrier, &the_timeout, 1) < 0)
return &the_timeout;
- if (chk_barrier("data send", &now, &data_send_barrier,
+ if (chk_barrier("data send", &vsst->data_send_barrier,
&the_timeout, 1) < 0)
return &the_timeout;
- if (!vss_playing() || !map)
+ if (!vss_playing() || !vsst->map)
return NULL;
compute_chunk_time(mmd->chunks_sent, &mmd->afd.afhi.chunk_tv,
&mmd->stream_start, &next_chunk);
- if (chk_barrier("chunk", &now, &next_chunk, &the_timeout, 0) < 0)
+ if (chk_barrier("chunk", &next_chunk, &the_timeout, 0) < 0)
return &the_timeout;
/* chunk is due or bof */
the_timeout.tv_sec = 0;
return &the_timeout;
}
-static void vss_eof(void)
+static void vss_eof(struct vss_task *vsst)
{
- struct timeval now;
- int i;
- char *tmp;
-
- if (!map) {
- for (i = 0; senders[i].name; i++)
- senders[i].shutdown_clients();
+ mmd->stream_start = *now;
+ if (!vsst->map)
return;
- }
if (mmd->new_vss_status_flags & VSS_NOMORE)
mmd->new_vss_status_flags = VSS_NEXT;
- gettimeofday(&now, NULL);
- tv_add(&mmd->afd.afhi.eof_tv, &now, &eof_barrier);
- munmap(map, mmd->size);
- map = NULL;
+ tv_add(&mmd->afd.afhi.eof_tv, now, &vsst->eof_barrier);
+ para_munmap(vsst->map, mmd->size);
+ vsst->map = NULL;
mmd->chunks_sent = 0;
mmd->offset = 0;
mmd->afd.afhi.seconds_total = 0;
+ mmd->afd.afhi.chunk_tv.tv_sec = 0;
+ mmd->afd.afhi.chunk_tv.tv_usec = 0;
free(mmd->afd.afhi.chunk_table);
mmd->afd.afhi.chunk_table = NULL;
- tmp = make_message("%s:\n%s:\n%s:\n", status_item_list[SI_AUDIO_FILE_INFO],
+ free(mmd->afd.afhi.info_string);
+ mmd->afd.afhi.info_string = make_message("%s:\n%s:\n%s:\n", status_item_list[SI_AUDIO_FILE_INFO],
status_item_list[SI_TAGINFO1], status_item_list[SI_TAGINFO2]);
- strncpy(mmd->afd.afhi.info_string, tmp, sizeof(mmd->afd.afhi.info_string));
- mmd->afd.afhi.info_string[sizeof(mmd->afd.afhi.info_string) - 1] = '\0';
make_empty_status_items(mmd->afd.verbose_ls_output);
- free(tmp);
mmd->mtime = 0;
mmd->size = 0;
mmd->events++;
}
-/**
- * Get the header of the current audio file.
- *
- * \param buf The length of the header is stored here.
- * \param len Points to a buffer containing the header on return.
- *
- * \sa afh_get_header.
- */
-void vss_get_header(char **buf, size_t *len)
-{
- afh_get_header(&mmd->afd.afhi, map, buf, len);
-}
-
/**
* Get the list of all supported audio formats.
*
*/
struct timeval *vss_chunk_time(void)
{
- if (!map)
+ if (mmd->afd.afhi.chunk_tv.tv_sec == 0 &&
+ mmd->afd.afhi.chunk_tv.tv_usec == 0)
return NULL;
return &mmd->afd.afhi.chunk_tv;
}
-/** The possible states of the afs socket. See \ref afs_socket. */
-enum afs_socket_status {
- /** Socket is inactive. */
- AFS_SOCKET_READY,
- /** Socket fd was included in the write fd set for select(). */
- AFS_SOCKET_CHECK_FOR_WRITE,
- /** vss wrote a request to the socket and waits for afs to reply. */
- AFS_SOCKET_AFD_PENDING
-};
+static int need_to_request_new_audio_file(struct vss_task *vsst)
+{
+ struct timeval diff;
-static enum afs_socket_status afsss;
+ if (vsst->map) /* have audio file */
+ return 0;
+ if (!vss_playing()) /* don't need one */
+ return 0;
+ if (mmd->new_vss_status_flags & VSS_NOMORE)
+ return 0;
+ if (vsst->afsss == AFS_SOCKET_AFD_PENDING) /* already requested one */
+ return 0;
+ if (chk_barrier("autoplay_delay", &vsst->autoplay_barrier,
+ &diff, 1) < 0)
+ return 0;
+ return 1;
+}
/**
* Compute the timeout for para_server's main select-loop.
* This function gets called from para_server to determine the timeout value
* for its main select loop.
*
- * \param rfds The set of file descriptors to be checked for reading.
- * \param wfds The set of file descriptors to be checked for writing.
- * \param max_fileno The highest-numbered file descriptor.
+ * \param s Pointer to the server scheduler.
+ * \param t Pointer to the vss task structure.
*
* Before the timeout is computed, the current vss status flags are evaluated
* and acted upon by calling appropriate functions from the lower layers.
* - request a new audio file from afs,
* - shutdown of all senders (stop/pause command),
* - reposition the stream (ff/jmp command).
- *
- * \return A pointer to a struct timeval containing the timeout for the next
- * chunk of data to be sent, or NULL if we're not sending right now.
*/
-struct timeval *vss_preselect(fd_set *rfds, fd_set *wfds, int *max_fileno)
+static void vss_pre_select(struct sched *s, struct task *t)
{
int i;
- struct timeval *tv;
+ struct timeval *tv, diff;
+ struct vss_task *vsst = container_of(t, struct vss_task, task);
- para_fd_set(afs_socket, rfds, max_fileno);
- if (!map)
- for (i = 0; senders[i].name; i++)
- senders[i].shutdown_clients();
- else {
- if (vss_next()) {
- vss_eof();
- return vss_compute_timeout();
- }
- }
- if (vss_paused() || vss_repos()) {
+ if (!vsst->map || vss_next() || vss_paused() || vss_repos())
for (i = 0; senders[i].name; i++)
senders[i].shutdown_clients();
- if (map) {
- struct timeval now;
- gettimeofday(&now, NULL);
- if (!vss_paused() || mmd->chunks_sent)
- tv_add(&mmd->afd.afhi.eof_tv, &now, &eof_barrier);
- if (vss_repos())
- tv_add(&now, &announce_tv, &data_send_barrier);
- }
+ if (vss_next())
+ vss_eof(vsst);
+ else if (vss_paused()) {
+ if (mmd->chunks_sent)
+ tv_add(&mmd->afd.afhi.eof_tv, now, &vsst->eof_barrier);
+ mmd->chunks_sent = 0;
+ } else if (vss_repos()) {
+ tv_add(now, &vsst->announce_tv, &vsst->data_send_barrier);
+ tv_add(&mmd->afd.afhi.eof_tv, now, &vsst->eof_barrier);
mmd->chunks_sent = 0;
- }
- if (vss_repos()) {
- mmd->new_vss_status_flags &= ~(VSS_REPOS);
mmd->current_chunk = mmd->repos_request;
+ mmd->new_vss_status_flags &= ~VSS_REPOS;
}
- tv = vss_compute_timeout();
- if (tv)
- return tv;
- if (!map && vss_playing() &&
- !(mmd->new_vss_status_flags & VSS_NOMORE)) {
- if (afsss == AFS_SOCKET_READY ||
- afsss == AFS_SOCKET_CHECK_FOR_WRITE) {
- PARA_DEBUG_LOG("ready and playing, but no audio file\n");
- para_fd_set(afs_socket, wfds, max_fileno);
- afsss = AFS_SOCKET_CHECK_FOR_WRITE;
- }
+ if (need_to_request_new_audio_file(vsst)) {
+ PARA_DEBUG_LOG("ready and playing, but no audio file\n");
+ para_fd_set(vsst->afs_socket, &s->wfds, &s->max_fileno);
+ vsst->afsss = AFS_SOCKET_CHECK_FOR_WRITE;
+ } else
+ para_fd_set(vsst->afs_socket, &s->rfds, &s->max_fileno);
+ for (i = 0; senders[i].name; i++) {
+ if (!senders[i].pre_select)
+ continue;
+ senders[i].pre_select(&s->max_fileno, &s->rfds, &s->wfds);
}
- return tv;
+ tv = vss_compute_timeout(vsst);
+ if (tv && tv_diff(tv, &s->timeout, &diff) < 0)
+ s->timeout = *tv;
}
-static int recv_afs_msg(int *fd, uint32_t *code, uint32_t *data)
+static int recv_afs_msg(int afs_socket, int *fd, uint32_t *code, uint32_t *data)
{
char control[255], buf[8];
struct msghdr msg = {.msg_iov = NULL};
ret = recvmsg(afs_socket, &msg, 0);
if (ret < 0)
return -ERRNO_TO_PARA_ERROR(errno);
- afsss = AFS_SOCKET_READY;
if (iov.iov_len != sizeof(buf))
return -E_AFS_SHORT_READ;
*code = *(uint32_t*)buf;
return 1;
}
-static void recv_afs_result(void)
+static void recv_afs_result(struct vss_task *vsst)
{
int ret, passed_fd, shmid;
uint32_t afs_code = 0, afs_data = 0;
struct stat statbuf;
- struct timeval now;
+ vsst->afsss = AFS_SOCKET_READY;
mmd->afd.afhi.chunk_table = NULL;
- ret = recv_afs_msg(&passed_fd, &afs_code, &afs_data);
+ ret = recv_afs_msg(vsst->afs_socket, &passed_fd, &afs_code, &afs_data);
if (ret < 0)
goto err;
PARA_DEBUG_LOG("fd: %d, code: %u, shmid: %u\n", passed_fd, afs_code,
}
mmd->size = statbuf.st_size;
mmd->mtime = statbuf.st_mtime;
- map = para_mmap(mmd->size, PROT_READ, MAP_PRIVATE,
- passed_fd, 0);
+ ret = para_mmap(mmd->size, PROT_READ, MAP_PRIVATE, passed_fd,
+ 0, &vsst->map);
+ if (ret < 0)
+ goto err;
close(passed_fd);
mmd->chunks_sent = 0;
mmd->current_chunk = 0;
mmd->events++;
mmd->num_played++;
mmd->new_vss_status_flags &= (~VSS_NEXT);
- gettimeofday(&now, NULL);
- tv_add(&now, &announce_tv, &data_send_barrier);
+ afh_get_header(&mmd->afd.afhi, vsst->map, &vsst->header_buf,
+ &vsst->header_len);
return;
err:
free(mmd->afd.afhi.chunk_table);
mmd->new_vss_status_flags = VSS_NEXT;
}
-void vss_post_select(fd_set *rfds, fd_set *wfds)
-{
- int ret;
-
- if (FD_ISSET(afs_socket, rfds))
- recv_afs_result();
- if (afsss != AFS_SOCKET_CHECK_FOR_WRITE || !FD_ISSET(afs_socket, wfds))
- return;
- PARA_NOTICE_LOG("requesting new fd from afs\n");
- ret = send_buffer(afs_socket, "new");
- afsss = AFS_SOCKET_AFD_PENDING;
-}
-/**
- * Get the data of the given chunk.
- *
- * \param chunk_num The number of the desired chunk.
- * \param buf Chunk data.
- * \param len Chunk length in bytes.
- *
- * \return Standard.
- */
-int vss_get_chunk(long unsigned chunk_num, char **buf, size_t *len)
-{
- if (!map || !vss_playing())
- return -E_CHUNK;
- if (chunk_num >= mmd->afd.afhi.chunks_total)
- return -E_CHUNK;
- afh_get_chunk(chunk_num, &mmd->afd.afhi, map, buf, len);
- return 1;
-}
-
/**
* Main sending function.
*
* each supported sender's send() function which is supposed to send out the data
* to all connected clients.
*/
-void vss_send_chunk(void)
+static void vss_send_chunk(struct vss_task *vsst)
{
int i;
- struct timeval now, due;
- char *buf;
+ struct timeval due;
+ const char *buf;
size_t len;
- if (!map || !vss_playing())
+ if (!vsst->map || !vss_playing())
return;
- gettimeofday(&now, NULL);
compute_chunk_time(mmd->chunks_sent, &mmd->afd.afhi.chunk_tv,
&mmd->stream_start, &due);
- if (tv_diff(&due, &now, NULL) > 0)
+ if (tv_diff(&due, now, NULL) > 0)
return;
- if (chk_barrier("eof", &now, &eof_barrier, &due, 1) < 0)
+ if (chk_barrier("eof", &vsst->eof_barrier, &due, 1) < 0)
return;
- if (chk_barrier("data send", &now, &data_send_barrier,
+ if (chk_barrier("data send", &vsst->data_send_barrier,
&due, 1) < 0)
return;
- mmd->new_vss_status_flags &= ~VSS_REPOS;
if (mmd->current_chunk >= mmd->afd.afhi.chunks_total) { /* eof */
mmd->new_vss_status_flags |= VSS_NEXT;
- return vss_eof();
+ return;
}
/*
* We call the send function also in case of empty chunks as they
*/
if (!mmd->chunks_sent) {
struct timeval tmp;
- gettimeofday(&mmd->stream_start, NULL);
+ mmd->stream_start = *now;
tv_scale(mmd->current_chunk, &mmd->afd.afhi.chunk_tv, &tmp);
mmd->offset = tv2ms(&tmp);
mmd->events++;
}
- afh_get_chunk(mmd->current_chunk, &mmd->afd.afhi, map, &buf, &len);
+ afh_get_chunk(mmd->current_chunk, &mmd->afd.afhi, vsst->map, &buf, &len);
for (i = 0; senders[i].name; i++)
- senders[i].send(mmd->current_chunk, mmd->chunks_sent, buf, len);
+ senders[i].send(mmd->current_chunk, mmd->chunks_sent, buf, len,
+ vsst->header_buf, vsst->header_len);
mmd->new_vss_status_flags |= VSS_PLAYING;
mmd->chunks_sent++;
mmd->current_chunk++;
}
+
+static void vss_post_select(struct sched *s, struct task *t)
+{
+ int ret, i;
+ struct vss_task *vsst = container_of(t, struct vss_task, task);
+
+ if (mmd->sender_cmd_data.cmd_num >= 0) {
+ int num = mmd->sender_cmd_data.cmd_num,
+ sender_num = mmd->sender_cmd_data.sender_num;
+
+ if (senders[sender_num].client_cmds[num])
+ senders[sender_num].client_cmds[num](&mmd->sender_cmd_data);
+ mmd->sender_cmd_data.cmd_num = -1;
+ }
+ if (vsst->afsss != AFS_SOCKET_CHECK_FOR_WRITE) {
+ if (FD_ISSET(vsst->afs_socket, &s->rfds))
+ recv_afs_result(vsst);
+ } else if (FD_ISSET(vsst->afs_socket, &s->wfds)) {
+ PARA_NOTICE_LOG("requesting new fd from afs\n");
+ ret = send_buffer(vsst->afs_socket, "new");
+ vsst->afsss = AFS_SOCKET_AFD_PENDING;
+ }
+ for (i = 0; senders[i].name; i++) {
+ if (!senders[i].post_select)
+ continue;
+ senders[i].post_select(&s->rfds, &s->wfds);
+ }
+ if ((vss_playing() && !(mmd->vss_status_flags & VSS_PLAYING)) ||
+ (vss_next() && vss_playing()))
+ tv_add(now, &vsst->announce_tv, &vsst->data_send_barrier);
+ vss_send_chunk(vsst);
+}
+
+/**
+ * Initialize the virtual streaming system task.
+ *
+ * \param afs_socket The fd for communication with afs.
+ *
+ * This also initializes all supported senders and starts streaming
+ * if the --autoplay command line flag was given.
+ */
+void init_vss_task(int afs_socket)
+{
+ static struct vss_task vss_task_struct, *vsst = &vss_task_struct;
+ int i;
+ char *hn = para_hostname(), *home = para_homedir();
+ long unsigned announce_time = conf.announce_time_arg > 0?
+ conf.announce_time_arg : 300,
+ autoplay_delay = conf.autoplay_delay_arg > 0?
+ conf.autoplay_delay_arg : 0;
+
+ vsst->afs_socket = afs_socket;
+ vsst->task.pre_select = vss_pre_select;
+ vsst->task.post_select = vss_post_select;
+ ms2tv(announce_time, &vsst->announce_tv);
+ PARA_INFO_LOG("announce timeval: %lums\n", tv2ms(&vsst->announce_tv));
+ for (i = 0; senders[i].name; i++) {
+ PARA_NOTICE_LOG("initializing %s sender\n", senders[i].name);
+ senders[i].init(&senders[i]);
+ }
+ free(hn);
+ free(home);
+ mmd->sender_cmd_data.cmd_num = -1;
+ make_empty_status_items(mmd->afd.verbose_ls_output);
+ if (conf.autoplay_given) {
+ struct timeval tmp;
+ mmd->vss_status_flags |= VSS_PLAYING;
+ mmd->new_vss_status_flags |= VSS_PLAYING;
+ ms2tv(autoplay_delay, &tmp);
+ tv_add(now, &tmp, &vsst->autoplay_barrier);
+ tv_add(&vsst->autoplay_barrier, &vsst->announce_tv,
+ &vsst->data_send_barrier);
+ }
+ register_task(&vsst->task);
+}