/*
- * Copyright (C) 1997-2008 Andre Noll <maan@systemlinux.org>
+ * Copyright (C) 1997-2009 Andre Noll <maan@systemlinux.org>
*
* Licensed under the GPL v2. For licencing details see COPYING.
*/
* senders.
*/
-#include <sys/mman.h> /* mmap */
-#include <sys/time.h> /* gettimeofday */
-#include <sys/types.h>
#include <dirent.h>
#include "para.h"
#include "send.h"
#include "ipc.h"
#include "fd.h"
+#include "sched.h"
extern struct misc_meta_data *mmd;
extern void dccp_send_init(struct sender *);
extern void http_send_init(struct sender *);
-extern void ortp_send_init(struct sender *);
+extern void udp_send_init(struct sender *);
-/** the list of supported senders */
+/** The list of supported senders. */
struct sender senders[] = {
{
.name = "http",
.name = "dccp",
.init = dccp_send_init,
},
-#ifdef HAVE_ORTP
{
- .name = "ortp",
- .init = ortp_send_init,
+ .name = "udp",
+ .init = udp_send_init,
},
-#endif
{
.name = NULL,
}
};
-/** The possible states of the afs socket. See \ref afs_socket. */
+/** The possible states of the afs socket. */
enum afs_socket_status {
/** Socket is inactive. */
AFS_SOCKET_READY,
/** Socket fd was included in the write fd set for select(). */
AFS_SOCKET_CHECK_FOR_WRITE,
- /** vss wrote a request to the socket and waits for afs to reply. */
+ /** vss wrote a request to the socket and waits for reply from afs. */
AFS_SOCKET_AFD_PENDING
};
-
+/** The task structure for the virtual streaming system. */
struct vss_task {
+ /** Copied from the -announce_time command line option. */
struct timeval announce_tv;
+ /** End of the announcing interval. */
struct timeval data_send_barrier;
+ /** End of the EOF interval. */
struct timeval eof_barrier;
+ /** Only used if --autoplay_delay was given. */
struct timeval autoplay_barrier;
+ /** Used for afs-server communication. */
+ int afs_socket;
+ /** The current state of \a afs_socket. */
enum afs_socket_status afsss;
+ /** The memory mapped audio file. */
char *map;
+ /** Used by the scheduler. */
+ struct task task;
+ /** Pointer to the header of the mapped audio file. */
+ const char *header_buf;
+ /** Length of the audio file header. */
+ size_t header_len;
};
-static struct vss_task vss_task_struct, *vsst = &vss_task_struct;
-
/**
* Check if vss status flag \a P (playing) is set.
*
&& !(mmd->new_vss_status_flags & VSS_PLAYING);
}
-/**
- * Get the data of the given chunk.
- *
- * \param chunk_num The number of the desired chunk.
- * \param buf Chunk data.
- * \param len Chunk length in bytes.
- *
- * \return Standard.
- */
-int vss_get_chunk(long unsigned chunk_num, char **buf, size_t *len)
-{
- if (!vsst->map || !vss_playing())
- return -E_CHUNK;
- if (chunk_num >= mmd->afd.afhi.chunks_total)
- return -E_CHUNK;
- afh_get_chunk(chunk_num, &mmd->afd.afhi, vsst->map, buf, len);
- return 1;
-}
-
-/**
- * Initialize the virtual streaming system.
- *
- * This also initializes all supported senders and starts streaming
- * if the --autoplay command line flag was given.
- */
-void vss_init(void)
-{
- int i;
- char *hn = para_hostname(), *home = para_homedir();
- long unsigned announce_time = conf.announce_time_arg > 0?
- conf.announce_time_arg : 300,
- autoplay_delay = conf.autoplay_delay_arg > 0?
- conf.autoplay_delay_arg : 0;
- ms2tv(announce_time, &vsst->announce_tv);
- PARA_INFO_LOG("announce timeval: %lums\n", tv2ms(&vsst->announce_tv));
- for (i = 0; senders[i].name; i++) {
- PARA_NOTICE_LOG("initializing %s sender\n", senders[i].name);
- senders[i].init(&senders[i]);
- }
- free(hn);
- free(home);
- mmd->sender_cmd_data.cmd_num = -1;
- if (conf.autoplay_given) {
- struct timeval now, tmp;
- mmd->vss_status_flags |= VSS_PLAYING;
- mmd->new_vss_status_flags |= VSS_PLAYING;
- gettimeofday(&now, NULL);
- ms2tv(autoplay_delay, &tmp);
- tv_add(&now, &tmp, &vsst->autoplay_barrier);
- }
-}
-
-static int chk_barrier(const char *bname, const struct timeval *now,
- const struct timeval *barrier, struct timeval *diff,
- int print_log)
+static int chk_barrier(const char *bname, const struct timeval *barrier,
+ struct timeval *diff, int print_log)
{
long ms;
* != NULL: timeout for next chunk
* NULL: nothing to do
*/
-static struct timeval *vss_compute_timeout(void)
+static struct timeval *vss_compute_timeout(struct vss_task *vsst)
{
static struct timeval the_timeout;
- struct timeval now, next_chunk;
+ struct timeval next_chunk;
if (vss_next() && vsst->map) {
/* only sleep a bit, nec*/
the_timeout.tv_usec = 100;
return &the_timeout;
}
- gettimeofday(&now, NULL);
- if (chk_barrier("autoplay_delay", &now, &vsst->autoplay_barrier,
+ if (chk_barrier("autoplay_delay", &vsst->autoplay_barrier,
&the_timeout, 1) < 0)
return &the_timeout;
- if (chk_barrier("eof", &now, &vsst->eof_barrier, &the_timeout, 1) < 0)
+ if (chk_barrier("eof", &vsst->eof_barrier, &the_timeout, 1) < 0)
return &the_timeout;
- if (chk_barrier("data send", &now, &vsst->data_send_barrier,
+ if (chk_barrier("data send", &vsst->data_send_barrier,
&the_timeout, 1) < 0)
return &the_timeout;
if (!vss_playing() || !vsst->map)
return NULL;
compute_chunk_time(mmd->chunks_sent, &mmd->afd.afhi.chunk_tv,
&mmd->stream_start, &next_chunk);
- if (chk_barrier("chunk", &now, &next_chunk, &the_timeout, 0) < 0)
+ if (chk_barrier("chunk", &next_chunk, &the_timeout, 0) < 0)
return &the_timeout;
/* chunk is due or bof */
the_timeout.tv_sec = 0;
return &the_timeout;
}
-static void vss_eof(void)
+static void vss_eof(struct vss_task *vsst)
{
- struct timeval now;
- char *tmp;
-
+ mmd->stream_start = *now;
if (!vsst->map)
return;
if (mmd->new_vss_status_flags & VSS_NOMORE)
mmd->new_vss_status_flags = VSS_NEXT;
- gettimeofday(&now, NULL);
- tv_add(&mmd->afd.afhi.eof_tv, &now, &vsst->eof_barrier);
+ tv_add(&mmd->afd.afhi.eof_tv, now, &vsst->eof_barrier);
para_munmap(vsst->map, mmd->size);
vsst->map = NULL;
mmd->chunks_sent = 0;
mmd->offset = 0;
mmd->afd.afhi.seconds_total = 0;
+ mmd->afd.afhi.chunk_tv.tv_sec = 0;
+ mmd->afd.afhi.chunk_tv.tv_usec = 0;
free(mmd->afd.afhi.chunk_table);
mmd->afd.afhi.chunk_table = NULL;
- tmp = make_message("%s:\n%s:\n%s:\n", status_item_list[SI_AUDIO_FILE_INFO],
+ free(mmd->afd.afhi.info_string);
+ mmd->afd.afhi.info_string = make_message("%s:\n%s:\n%s:\n", status_item_list[SI_AUDIO_FILE_INFO],
status_item_list[SI_TAGINFO1], status_item_list[SI_TAGINFO2]);
- strncpy(mmd->afd.afhi.info_string, tmp, sizeof(mmd->afd.afhi.info_string));
- mmd->afd.afhi.info_string[sizeof(mmd->afd.afhi.info_string) - 1] = '\0';
make_empty_status_items(mmd->afd.verbose_ls_output);
- free(tmp);
mmd->mtime = 0;
mmd->size = 0;
mmd->events++;
}
-/**
- * Get the header of the current audio file.
- *
- * \param buf The length of the header is stored here.
- * \param len Points to a buffer containing the header on return.
- *
- * \sa afh_get_header.
- */
-void vss_get_header(char **buf, size_t *len)
-{
- afh_get_header(&mmd->afd.afhi, vsst->map, buf, len);
-}
-
/**
* Get the list of all supported audio formats.
*
*/
struct timeval *vss_chunk_time(void)
{
- if (!vsst->map)
+ if (mmd->afd.afhi.chunk_tv.tv_sec == 0 &&
+ mmd->afd.afhi.chunk_tv.tv_usec == 0)
return NULL;
return &mmd->afd.afhi.chunk_tv;
}
-static int need_to_request_new_audio_file(void)
+static int need_to_request_new_audio_file(struct vss_task *vsst)
{
+ struct timeval diff;
+
if (vsst->map) /* have audio file */
return 0;
if (!vss_playing()) /* don't need one */
return 0;
if (vsst->afsss == AFS_SOCKET_AFD_PENDING) /* already requested one */
return 0;
+ if (chk_barrier("autoplay_delay", &vsst->autoplay_barrier,
+ &diff, 1) < 0)
+ return 0;
return 1;
}
* This function gets called from para_server to determine the timeout value
* for its main select loop.
*
- * \param rfds The set of file descriptors to be checked for reading.
- * \param wfds The set of file descriptors to be checked for writing.
- * \param max_fileno The highest-numbered file descriptor.
+ * \param s Pointer to the server scheduler.
+ * \param t Pointer to the vss task structure.
*
* Before the timeout is computed, the current vss status flags are evaluated
* and acted upon by calling appropriate functions from the lower layers.
* - request a new audio file from afs,
* - shutdown of all senders (stop/pause command),
* - reposition the stream (ff/jmp command).
- *
- * \return A pointer to a struct timeval containing the timeout for the next
- * chunk of data to be sent, or NULL if we're not sending right now.
*/
-struct timeval *vss_preselect(fd_set *rfds, fd_set *wfds, int *max_fileno)
+static void vss_pre_select(struct sched *s, struct task *t)
{
int i;
- struct timeval now;
+ struct timeval *tv, diff;
+ struct vss_task *vsst = container_of(t, struct vss_task, task);
if (!vsst->map || vss_next() || vss_paused() || vss_repos())
for (i = 0; senders[i].name; i++)
senders[i].shutdown_clients();
if (vss_next())
- vss_eof();
+ vss_eof(vsst);
else if (vss_paused()) {
- if (mmd->chunks_sent) {
- gettimeofday(&now, NULL);
- tv_add(&mmd->afd.afhi.eof_tv, &now, &vsst->eof_barrier);
- }
+ if (mmd->chunks_sent)
+ tv_add(&mmd->afd.afhi.eof_tv, now, &vsst->eof_barrier);
mmd->chunks_sent = 0;
} else if (vss_repos()) {
- gettimeofday(&now, NULL);
- tv_add(&now, &vsst->announce_tv, &vsst->data_send_barrier);
- tv_add(&mmd->afd.afhi.eof_tv, &now, &vsst->eof_barrier);
+ tv_add(now, &vsst->announce_tv, &vsst->data_send_barrier);
+ tv_add(&mmd->afd.afhi.eof_tv, now, &vsst->eof_barrier);
mmd->chunks_sent = 0;
mmd->current_chunk = mmd->repos_request;
mmd->new_vss_status_flags &= ~VSS_REPOS;
}
-
- if (need_to_request_new_audio_file()) {
+ if (need_to_request_new_audio_file(vsst)) {
PARA_DEBUG_LOG("ready and playing, but no audio file\n");
- para_fd_set(afs_socket, wfds, max_fileno);
+ para_fd_set(vsst->afs_socket, &s->wfds, &s->max_fileno);
vsst->afsss = AFS_SOCKET_CHECK_FOR_WRITE;
} else
- para_fd_set(afs_socket, rfds, max_fileno);
+ para_fd_set(vsst->afs_socket, &s->rfds, &s->max_fileno);
for (i = 0; senders[i].name; i++) {
if (!senders[i].pre_select)
continue;
- senders[i].pre_select(max_fileno, rfds, wfds);
+ senders[i].pre_select(&s->max_fileno, &s->rfds, &s->wfds);
}
- return vss_compute_timeout();
+ tv = vss_compute_timeout(vsst);
+ if (tv && tv_diff(tv, &s->timeout, &diff) < 0)
+ s->timeout = *tv;
}
-static int recv_afs_msg(int *fd, uint32_t *code, uint32_t *data)
+static int recv_afs_msg(int afs_socket, int *fd, uint32_t *code, uint32_t *data)
{
char control[255], buf[8];
struct msghdr msg = {.msg_iov = NULL};
ret = recvmsg(afs_socket, &msg, 0);
if (ret < 0)
return -ERRNO_TO_PARA_ERROR(errno);
- vsst->afsss = AFS_SOCKET_READY;
if (iov.iov_len != sizeof(buf))
return -E_AFS_SHORT_READ;
*code = *(uint32_t*)buf;
return 1;
}
-static void recv_afs_result(void)
+static void recv_afs_result(struct vss_task *vsst)
{
int ret, passed_fd, shmid;
uint32_t afs_code = 0, afs_data = 0;
struct stat statbuf;
- struct timeval now;
+ vsst->afsss = AFS_SOCKET_READY;
mmd->afd.afhi.chunk_table = NULL;
- ret = recv_afs_msg(&passed_fd, &afs_code, &afs_data);
+ ret = recv_afs_msg(vsst->afs_socket, &passed_fd, &afs_code, &afs_data);
if (ret < 0)
goto err;
PARA_DEBUG_LOG("fd: %d, code: %u, shmid: %u\n", passed_fd, afs_code,
}
mmd->size = statbuf.st_size;
mmd->mtime = statbuf.st_mtime;
- vsst->map = para_mmap(mmd->size, PROT_READ, MAP_PRIVATE,
- passed_fd, 0);
+ ret = para_mmap(mmd->size, PROT_READ, MAP_PRIVATE, passed_fd,
+ 0, &vsst->map);
+ if (ret < 0)
+ goto err;
close(passed_fd);
mmd->chunks_sent = 0;
mmd->current_chunk = 0;
mmd->events++;
mmd->num_played++;
mmd->new_vss_status_flags &= (~VSS_NEXT);
- gettimeofday(&now, NULL);
- tv_add(&now, &vsst->announce_tv, &vsst->data_send_barrier);
+ afh_get_header(&mmd->afd.afhi, vsst->map, &vsst->header_buf,
+ &vsst->header_len);
return;
err:
free(mmd->afd.afhi.chunk_table);
* each supported sender's send() function which is supposed to send out the data
* to all connected clients.
*/
-static void vss_send_chunk(void)
+static void vss_send_chunk(struct vss_task *vsst)
{
int i;
- struct timeval now, due;
- char *buf;
+ struct timeval due;
+ const char *buf;
size_t len;
if (!vsst->map || !vss_playing())
return;
- gettimeofday(&now, NULL);
compute_chunk_time(mmd->chunks_sent, &mmd->afd.afhi.chunk_tv,
&mmd->stream_start, &due);
- if (tv_diff(&due, &now, NULL) > 0)
+ if (tv_diff(&due, now, NULL) > 0)
return;
- if (chk_barrier("eof", &now, &vsst->eof_barrier, &due, 1) < 0)
+ if (chk_barrier("eof", &vsst->eof_barrier, &due, 1) < 0)
return;
- if (chk_barrier("data send", &now, &vsst->data_send_barrier,
+ if (chk_barrier("data send", &vsst->data_send_barrier,
&due, 1) < 0)
return;
- mmd->new_vss_status_flags &= ~VSS_REPOS;
if (mmd->current_chunk >= mmd->afd.afhi.chunks_total) { /* eof */
mmd->new_vss_status_flags |= VSS_NEXT;
return;
*/
if (!mmd->chunks_sent) {
struct timeval tmp;
- gettimeofday(&mmd->stream_start, NULL);
+ mmd->stream_start = *now;
tv_scale(mmd->current_chunk, &mmd->afd.afhi.chunk_tv, &tmp);
mmd->offset = tv2ms(&tmp);
mmd->events++;
}
afh_get_chunk(mmd->current_chunk, &mmd->afd.afhi, vsst->map, &buf, &len);
for (i = 0; senders[i].name; i++)
- senders[i].send(mmd->current_chunk, mmd->chunks_sent, buf, len);
+ senders[i].send(mmd->current_chunk, mmd->chunks_sent, buf, len,
+ vsst->header_buf, vsst->header_len);
mmd->new_vss_status_flags |= VSS_PLAYING;
mmd->chunks_sent++;
mmd->current_chunk++;
}
-void vss_post_select(fd_set *rfds, fd_set *wfds)
+static void vss_post_select(struct sched *s, struct task *t)
{
int ret, i;
+ struct vss_task *vsst = container_of(t, struct vss_task, task);
if (mmd->sender_cmd_data.cmd_num >= 0) {
int num = mmd->sender_cmd_data.cmd_num,
- s = mmd->sender_cmd_data.sender_num;
+ sender_num = mmd->sender_cmd_data.sender_num;
- if (senders[s].client_cmds[num])
- senders[s].client_cmds[num](&mmd->sender_cmd_data);
+ if (senders[sender_num].client_cmds[num])
+ senders[sender_num].client_cmds[num](&mmd->sender_cmd_data);
mmd->sender_cmd_data.cmd_num = -1;
}
if (vsst->afsss != AFS_SOCKET_CHECK_FOR_WRITE) {
- if (FD_ISSET(afs_socket, rfds))
- recv_afs_result();
- } else if (FD_ISSET(afs_socket, wfds)) {
+ if (FD_ISSET(vsst->afs_socket, &s->rfds))
+ recv_afs_result(vsst);
+ } else if (FD_ISSET(vsst->afs_socket, &s->wfds)) {
PARA_NOTICE_LOG("requesting new fd from afs\n");
- ret = send_buffer(afs_socket, "new");
+ ret = send_buffer(vsst->afs_socket, "new");
vsst->afsss = AFS_SOCKET_AFD_PENDING;
}
for (i = 0; senders[i].name; i++) {
if (!senders[i].post_select)
continue;
- senders[i].post_select(rfds, wfds);
+ senders[i].post_select(&s->rfds, &s->wfds);
+ }
+ if ((vss_playing() && !(mmd->vss_status_flags & VSS_PLAYING)) ||
+ (vss_next() && vss_playing()))
+ tv_add(now, &vsst->announce_tv, &vsst->data_send_barrier);
+ vss_send_chunk(vsst);
+}
+
+/**
+ * Initialize the virtual streaming system task.
+ *
+ * \param afs_socket The fd for communication with afs.
+ *
+ * This also initializes all supported senders and starts streaming
+ * if the --autoplay command line flag was given.
+ */
+void init_vss_task(int afs_socket)
+{
+ static struct vss_task vss_task_struct, *vsst = &vss_task_struct;
+ int i;
+ char *hn = para_hostname(), *home = para_homedir();
+ long unsigned announce_time = conf.announce_time_arg > 0?
+ conf.announce_time_arg : 300,
+ autoplay_delay = conf.autoplay_delay_arg > 0?
+ conf.autoplay_delay_arg : 0;
+
+ vsst->afs_socket = afs_socket;
+ vsst->task.pre_select = vss_pre_select;
+ vsst->task.post_select = vss_post_select;
+ ms2tv(announce_time, &vsst->announce_tv);
+ PARA_INFO_LOG("announce timeval: %lums\n", tv2ms(&vsst->announce_tv));
+ for (i = 0; senders[i].name; i++) {
+ PARA_NOTICE_LOG("initializing %s sender\n", senders[i].name);
+ senders[i].init(&senders[i]);
+ }
+ free(hn);
+ free(home);
+ mmd->sender_cmd_data.cmd_num = -1;
+ make_empty_status_items(mmd->afd.verbose_ls_output);
+ if (conf.autoplay_given) {
+ struct timeval tmp;
+ mmd->vss_status_flags |= VSS_PLAYING;
+ mmd->new_vss_status_flags |= VSS_PLAYING;
+ ms2tv(autoplay_delay, &tmp);
+ tv_add(now, &tmp, &vsst->autoplay_barrier);
+ tv_add(&vsst->autoplay_barrier, &vsst->announce_tv,
+ &vsst->data_send_barrier);
}
- vss_send_chunk();
+ register_task(&vsst->task);
}