--arg-struct-name=$(subst .ggo,,$<)_args_info \
--file-name=$(subst .ggo,,$<).cmdline \
--func-name $(subst _filter.ggo,,$<)_cmdline_parser < $<
+%_write.cmdline.h %_write.cmdline.c: %_write.ggo
+ gengetopt -S $(module_ggo_opts) \
+ --set-package=$(subst .ggo,,$<) \
+ --arg-struct-name=$(subst .ggo,,$<)_args_info \
+ --file-name=$(subst .ggo,,$<).cmdline \
+ --func-name $(subst _write.ggo,,$<)_cmdline_parser < $<
%.cmdline.h %.cmdline.c: %.ggo
case $< in client.ggo) O="--unamed-opts=command";; \
}
/*
- * Simple stream reposition routine
+ * nothing to do as we'll seek to the correct offset in aac read_chunk() anyway
*/
-static int aac_reposition_stream(long unsigned request)
+static int aac_reposition_stream(__a_unused long unsigned request)
{
return 1;
-// return -E_AAC_REPOS;
}
static char *aac_read_chunk(long unsigned current_chunk, ssize_t *len)
#include "para.h"
#include "list.h"
+#include "sched.h"
#include "filter.h"
#include "error.h"
#include "string.h"
static ssize_t aacdec(char *input_buffer, size_t len, struct filter_node *fn)
{
struct private_aacdec_data *padd = fn->private_data;
- struct filter_chain_info *fci = fn->fci;
+ struct filter_chain *fc = fn->fc;
int i, ret;
unsigned char *p, *outbuffer;
unsigned char *inbuf = (unsigned char*)input_buffer;
if (fn->loaded > fn->bufsize * 4 / 5)
return 0;
- if (len < 1000 && !*fci->eof)
+ if (len < 1000 && !*fc->input_eof)
return 0;
if (!padd->initialized) {
&channels) < 0)
goto out;
}
- fci->samplerate = rate;
- fci->channels = channels;
+ fc->samplerate = rate;
+ fc->channels = channels;
PARA_INFO_LOG("rate: %u, channels: %d\n",
- fci->samplerate, fci->channels);
+ fc->samplerate, fc->channels);
padd->initialized = 1;
}
if (padd->decoder_length > 0) {
--- /dev/null
+section "alsa options"
+######################
+
+option "device" d
+#~~~~~~~~~~~~~~~~
+"set PCM device"
+ string typestr="device"
+ default="plughw:0,0"
+ optional
+
+option "channels" c
+#~~~~~~~~~~~~~~~~~~
+"number of channels (only neccessary for raw
+audio)"
+
+ int typestr="num"
+ default="2"
+ optional
+
+option "sample_rate" s
+#~~~~~~~~~~~~~~~~~~~~~
+
+"force given sample rate (only neccessary for
+raw audio)"
+
+ int typestr="num"
+ default="44100"
+ optional
#include "para.h"
#include "fd.h"
#include "string.h"
+#include "list.h"
+#include "sched.h"
#include "write.h"
#include <alsa/asoundlib.h>
-#include "write.cmdline.h"
+#include "alsa_write.cmdline.h"
#include "error.h"
-extern struct gengetopt_args_info conf;
#define FORMAT SND_PCM_FORMAT_S16_LE
/** data specific to the alsa writer */
struct private_alsa_data {
-/** the alsa handle */
-snd_pcm_t *handle;
-/** determined and set by alsa_open() */
-size_t bytes_per_frame;
+ /** the alsa handle */
+ snd_pcm_t *handle;
+ /** determined and set by alsa_open() */
+ size_t bytes_per_frame;
+ /** don't write anything until this time */
+ struct timeval next_chunk;
+ /** the return value of snd_pcm_hw_params_get_buffer_time_max() */
+ unsigned buffer_time;
};
/*
snd_pcm_sw_params_t *swparams;
snd_pcm_uframes_t buffer_size, xfer_align, start_threshold,
stop_threshold;
- unsigned buffer_time = 0;
int err;
snd_pcm_info_t *info;
- snd_output_t *log;
snd_pcm_uframes_t period_size;
- struct private_alsa_data *pad = para_malloc(sizeof(struct private_alsa_data));
- w->private_data = pad;
+ struct private_alsa_data *pad = para_calloc(sizeof(struct private_alsa_data));
+ struct alsa_write_args_info *conf = w->conf;
+ w->private_data = pad;
snd_pcm_info_alloca(&info);
- if (snd_output_stdio_attach(&log, stderr, 0) < 0)
- return -E_ALSA_LOG;
- err = snd_pcm_open(&pad->handle, conf.device_arg,
+ err = snd_pcm_open(&pad->handle, conf->device_arg,
SND_PCM_STREAM_PLAYBACK, 0);
if (err < 0)
return -E_PCM_OPEN;
if (snd_pcm_hw_params_set_format(pad->handle, hwparams, FORMAT) < 0)
return -E_SAMPLE_FORMAT;
if (snd_pcm_hw_params_set_channels(pad->handle, hwparams,
- conf.channels_arg) < 0)
+ conf->channels_arg) < 0)
return -E_CHANNEL_COUNT;
if (snd_pcm_hw_params_set_rate_near(pad->handle, hwparams,
- (unsigned int*) &conf.sample_rate_arg, 0) < 0)
+ (unsigned int*) &conf->sample_rate_arg, 0) < 0)
return -E_SET_RATE;
- err = snd_pcm_hw_params_get_buffer_time_max(hwparams, &buffer_time, 0);
- if (err < 0 || !buffer_time)
+ err = snd_pcm_hw_params_get_buffer_time_max(hwparams, &pad->buffer_time, 0);
+ if (err < 0 || !pad->buffer_time)
return -E_GET_BUFFER_TIME;
- PARA_DEBUG_LOG("buffer time: %d\n", buffer_time);
+ PARA_INFO_LOG("buffer time: %d\n", pad->buffer_time);
if (snd_pcm_hw_params_set_buffer_time_near(pad->handle, hwparams,
- &buffer_time, 0) < 0)
+ &pad->buffer_time, 0) < 0)
return -E_SET_BUFFER_TIME;
if (snd_pcm_hw_params(pad->handle, hwparams) < 0)
return -E_HW_PARAMS;
snd_pcm_hw_params_get_period_size(hwparams, &period_size, 0);
snd_pcm_hw_params_get_buffer_size(hwparams, &buffer_size);
- PARA_DEBUG_LOG("buffer size: %lu, period_size: %lu\n", buffer_size,
+ PARA_INFO_LOG("buffer size: %lu, period_size: %lu\n", buffer_size,
period_size);
if (period_size == buffer_size)
return -E_BAD_PERIOD;
if (snd_pcm_sw_params(pad->handle, swparams) < 0)
return -E_SW_PARAMS;
pad->bytes_per_frame = snd_pcm_format_physical_width(FORMAT)
- * conf.channels_arg / 8;
-// if (snd_pcm_nonblock(pad->handle, 1))
-// PARA_ERROR_LOG("%s\n", "failed to set nonblock mode");
+ * conf->channels_arg / 8;
+ if (snd_pcm_nonblock(pad->handle, 1))
+ PARA_ERROR_LOG("%s\n", "failed to set nonblock mode");
return period_size * pad->bytes_per_frame;
}
+static void alsa_write_pre_select(struct sched *s, struct task *t)
+{
+ struct writer_node *wn = t->private_data;
+ struct private_alsa_data *pad = wn->private_data;
+ struct writer_node_group *wng = wn->wng;
+ struct timeval diff;
-/**
- * push out pcm frames
- * \param data pointer do data to be written
- * \param nbytes number of bytes (not frames)
- *
- * \return Number of bytes written, -E_ALSA_WRITE on errors.
- */
-static int alsa_write(char *data, size_t nbytes, struct writer_node *wn)
+ t->ret = 0;
+ if (*wng->input_eof && *wng->loaded < pad->bytes_per_frame)
+ return;
+ t->ret = 1;
+ if (*wng->loaded < pad->bytes_per_frame)
+ return;
+ if (tv_diff(&s->now, &pad->next_chunk, &diff) < 0) {
+ if (tv_diff(&s->timeout, &diff, NULL) > 0)
+ s->timeout = diff;
+ } else {
+ s->timeout.tv_sec = 0;
+ s->timeout.tv_usec = 0;
+ }
+}
+
+static void alsa_write_post_select(struct sched *s, struct task *t)
{
+ struct writer_node *wn = t->private_data;
struct private_alsa_data *pad = wn->private_data;
- size_t frames = nbytes / pad->bytes_per_frame;
- unsigned char *d = (unsigned char*)data;
- snd_pcm_sframes_t r, result = 0;
-
- while (frames > 0) {
- /* write interleaved frames */
- r = snd_pcm_writei(pad->handle, d, frames);
- if (r < 0)
- PARA_ERROR_LOG("write error: %s\n", snd_strerror(r));
- if (r == -EAGAIN || (r >= 0 && r < frames))
- snd_pcm_wait(pad->handle, 1);
- else if (r == -EPIPE)
+ struct writer_node_group *wng = wn->wng;
+ size_t frames = *wng->loaded / pad->bytes_per_frame;
+ snd_pcm_sframes_t ret, result = 0;
+ unsigned char *data = (unsigned char*)wng->buf;
+
+ t->ret = 0;
+ if (!frames) {
+ if (*wng->input_eof)
+ t->ret = *wng->loaded;
+ return;
+ }
+ if (tv_diff(&s->now, &pad->next_chunk, NULL) < 0)
+ return;
+// PARA_INFO_LOG("%zd frames\n", frames);
+// while (frames > 0) {
+ ret = snd_pcm_writei(pad->handle, data, frames);
+ if (ret == -EAGAIN || (ret >= 0 && ret < frames)) {
+ struct timeval tv;
+ ms2tv(pad->buffer_time / 2000, &tv);
+ PARA_DEBUG_LOG("EAGAIN. frames: %d, ret: %lu\n", frames, ret);
+ tv_add(&s->now, &tv, &pad->next_chunk);
+ } else if (ret == -EPIPE) {
+ PARA_WARNING_LOG("%s", "EPIPE\n");
snd_pcm_prepare(pad->handle);
- else if (r < 0)
- return -E_ALSA_WRITE;
- if (r > 0) {
- result += r;
- frames -= r;
- d += r * pad->bytes_per_frame;
+ } else if (ret < 0) {
+ t->ret = -E_ALSA_WRITE;
+ return;
}
- }
- return result * pad->bytes_per_frame;
+ if (ret >= 0) {
+ result += ret;
+ frames -= ret;
+ data += ret * pad->bytes_per_frame;
+ }
+// if (ret == -EAGAIN)
+// break;
+// }
+ t->ret = result * pad->bytes_per_frame;
+// PARA_INFO_LOG("ret: %d, frames: %zd\n", t->ret, frames);
}
static void alsa_close(struct writer_node *wn)
free(pad);
}
+__malloc void *alsa_parse_config(char *options)
+{
+ struct alsa_write_args_info *conf
+ = para_calloc(sizeof(struct alsa_write_args_info));
+ PARA_INFO_LOG("options: %s, %d\n", options, strcspn(options, " \t"));
+ int ret = alsa_cmdline_parser_string(options, conf, "alsa_write");
+ if (ret)
+ goto err_out;
+ PARA_INFO_LOG("help given: %d\n", conf->help_given);
+ return conf;
+err_out:
+ free(conf);
+ return NULL;
+}
+
/** the init function of the alsa writer */
void alsa_writer_init(struct writer *w)
{
w->open = alsa_open;
- w->write = alsa_write;
w->close = alsa_close;
+ w->pre_select = alsa_write_pre_select;
+ w->post_select = alsa_write_post_select;
+ w->parse_config = alsa_parse_config;
w->shutdown = NULL; /* nothing to do */
}
/** \file audiod.c the paraslash's audio daemon */
-#include <sys/time.h> /* gettimeofday */
#include "para.h"
#include "audiod.cmdline.h"
#include "list.h"
#include "close_on_fork.h"
+#include "sched.h"
#include "recv.h"
#include "filter.h"
#include "grab_client.cmdline.h"
#include "daemon.h"
#include "string.h"
#include "fd.h"
+#include "write.h"
+#include "write_common.h"
/** define the array of error lists needed by para_audiod */
INIT_AUDIOD_ERRLISTS;
/** defines how to handle one supported audio format */
struct audio_format_info {
-/** pointer to the receiver for this audio format */
+ /** pointer to the receiver for this audio format */
struct receiver *receiver;
-/** the receiver configuration */
+ /** the receiver configuration */
void *receiver_conf;
-/** the number of filters that should be activated for this audio format */
+ /** the number of filters that should be activated for this audio format */
unsigned int num_filters;
-/** pointer to the array of filters to be activated */
+ /** pointer to the array of filters to be activated */
struct filter **filters;
-/** pointer to the array of filter configurations */
+ /** pointer to the array of filter configurations */
void **filter_conf;
-/** output of the last filter is written to stdin of this command */
- char *write_cmd;
-/** do not start receiver/filters/writer before this time */
+ /** the number of filters that should be activated for this audio format */
+ unsigned int num_writers;
+ /** pointer to the array of writers to be activated */
+ struct writer **writers;
+ /** pointer to the array of writer configurations */
+ void **writer_conf;
+ /** do not start receiver/filters/writer before this time */
struct timeval restart_barrier;
};
/**
* describes one instance of a receiver-filter-writer chain
*
- * \sa receier_node, receiver, filter, filter_node, filter_chain_info
+ * \sa receier_node, receiver, filter, filter_node, filter_chain, writer,
+ * writer_node, writer_node_group.
*/
struct slot_info {
-/** number of the audio format in this slot */
+ /** number of the audio format in this slot */
int format;
-/** the file descriptor of the writer */
- int write_fd;
-/** the process id of the writer */
- pid_t wpid;
-/** time of the last successful read from the receiver */
- struct timeval rtime;
-/** time the last write to the write fd happend */
- struct timeval wtime;
-/** writer start time */
+ /** writer start time */
struct timeval wstime;
-/** did we include \a write_fd in the fdset */
- int wcheck;
-/** set to one if we have sent the TERM signal to \a wpid */
- int wkilled;
-/** the receiver info associated with this slot */
+ /** the receiver info associated with this slot */
struct receiver_node *receiver_node;
-/** the active filter chain */
- struct filter_chain_info *fci;
+ /** the active filter chain */
+ struct filter_chain *fc;
+ /** the active writer node group */
+ struct writer_node_group *wng;
};
-
static struct slot_info slot[MAX_STREAM_SLOTS];
-/** defines one command of para_audiod */
-struct audiod_command {
-/** the name of the command */
-const char *name;
-/** pointer to the function that handles the command */
-int (*handler)(int, int, char**);
-int (*line_handler)(int, char*);
-/** one-line description of the command */
-const char *description;
-/** summary of the command line options */
-const char *synopsis;
-/** the long help text */
-const char *help;
-};
-
extern const char *status_item_list[NUM_STAT_ITEMS];
-static int com_grab(int, char *);
-static int com_cycle(int, int, char **);
-static int com_help(int, int, char **);
-static int com_off(int, int, char **);
-static int com_on(int, int, char **);
-static int com_sb(int, int, char **);
-static int com_stat(int, int, char **);
-static int com_term(int, int, char **);
-static int stat_pipe = -1, signal_pipe;
-
static struct gengetopt_args_info conf;
static struct timeval server_stream_start, sa_time_diff;
static int playing, current_decoder = -1,
audiod_status = AUDIOD_ON, offset_seconds, length_seconds,
- sa_time_diff_sign = 1, audiod_socket = -1;
+ sa_time_diff_sign = 1;
static char *af_status, /* the audio format announced in server status */
*socket_name, *hostname;
static char *stat_item_values[NUM_STAT_ITEMS];
static FILE *logfile;
static const struct timeval restart_delay = {0, 300 * 1000};
-
static struct audio_format_info afi[NUM_AUDIO_FORMATS];
+static struct timeval *now;
+
+static struct signal_task signal_task_struct, *sig_task = &signal_task_struct;
+
+/**
+ * the task for handling audiod commands
+ *
+ * \sa struct task, struct sched
+ */
+struct command_task {
+ /** the local listening socket */
+ int fd;
+ /** the associated task structure */
+ struct task task;
+};
+
+/**
+ * the task for audiod's child (para_client stat)
+ *
+ * \sa struct task, struct sched
+ */
+struct status_task {
+ /** the output of the stat command is read from this fd */
+ int fd;
+ /** stat data is stored here */
+ char buf[STRINGSIZE];
+ /** number of bytes loaded in \a buf */
+ unsigned loaded;
+ /** the associated task structure */
+ struct task task;
+};
+static struct status_task status_task_struct, *stat_task = &status_task_struct;
+
+struct signal_task {
+ int fd;
+ int signum;
+ struct task task;
+};
+/** defines one command of para_audiod */
+struct audiod_command {
+ /** the name of the command */
+ const char *name;
+ /** pointer to the function that handles the command */
+ int (*handler)(int, int, char**);
+ /**
+ * if the command prefers to handle the full line (rather than the usual
+ * argv[] array), it stores a pointer to the corresponding line handling
+ * function here. In this case, the above \a handler pointer must be NULL.
+ */
+ int (*line_handler)(int, char*);
+ /** one-line description of the command */
+ const char *description;
+ /** summary of the command line options */
+ const char *synopsis;
+ /** the long help text */
+ const char *help;
+};
+static int com_grab(int, char *);
+static int com_cycle(int, int, char **);
+static int com_help(int, int, char **);
+static int com_off(int, int, char **);
+static int com_on(int, int, char **);
+static int com_sb(int, int, char **);
+static int com_stat(int, int, char **);
+static int com_term(int, int, char **);
static struct audiod_command cmds[] = {
{
.name = "cycle",
};
/** iterate over all slots */
-#define FOR_EACH_SLOT(slot) for (slot = 0; slot < MAX_STREAM_SLOTS; slot++)
+#define FOR_EACH_SLOT(_slot) for (_slot = 0; _slot < MAX_STREAM_SLOTS; _slot++)
/** iterate over all supported audio formats */
#define FOR_EACH_AUDIO_FORMAT(af) for (af = 0; af < NUM_AUDIO_FORMATS; af++)
/** iterate over the array of all audiod commands */
static char *get_time_string(struct timeval *newest_stime)
{
- struct timeval now, diff, adj_stream_start, tmp;
+ struct timeval diff, adj_stream_start, tmp;
int total = 0, use_server_time = 1;
if (!playing) {
use_server_time = 0;
}
}
- gettimeofday(&now, NULL);
- tv_diff(&now, &tmp, &diff);
+ tv_diff(now, &tmp, &diff);
total = diff.tv_sec + offset_seconds;
if (total > length_seconds)
total = length_seconds;
struct timeval *max = NULL;
FOR_EACH_SLOT(i) {
struct slot_info *s = &slot[i];
- if (s->wpid <= 0)
+ if (!s->wng)
continue;
if (max && tv_diff(&s->wstime, max, NULL) <= 0)
continue;
char flag = '0';
if (s->receiver_node)
flag += 1;
- if (s->wpid > 0)
+ if (s->wng)
flag += 2;
decoder_flags[i] = flag;
}
static void setup_signal_handling(void)
{
- signal_pipe = para_signal_init();
- PARA_INFO_LOG("signal pipe: fd %d\n", signal_pipe);
+ sig_task->fd = para_signal_init();
+ PARA_INFO_LOG("signal pipe: fd %d\n", sig_task->fd);
para_install_sighandler(SIGINT);
para_install_sighandler(SIGTERM);
para_install_sighandler(SIGCHLD);
s->format = -1;
}
-static void kill_stream_writer(int slot_num)
-{
- struct slot_info *s = &slot[slot_num];
-
- if (s->format < 0 || s->wkilled || s->wpid <= 0)
- return;
- PARA_DEBUG_LOG("kill -TERM %d (%s stream writer in slot %d)\n",
- s->wpid, audio_formats[s->format], slot_num);
- kill(s->wpid, SIGTERM);
- s->wkilled = 1;
- s->fci->error = 1;
-}
-
-static void set_restart_barrier(int format, struct timeval *now)
-{
- struct timeval tmp;
-
- if (now)
- tmp = *now;
- else
- gettimeofday(&tmp, NULL);
- tv_add(&tmp, &restart_delay, &afi[format].restart_barrier);
-}
-
static void close_receiver(int slot_num)
{
struct slot_info *s = &slot[slot_num];
if (s->format < 0 || !s->receiver_node)
return;
a = &afi[s->format];
- PARA_NOTICE_LOG("closing %s recevier in slot %d\n",
- audio_formats[s->format] , slot_num);
+ PARA_NOTICE_LOG("closing %s receiver in slot %d (eof = %d)\n",
+ audio_formats[s->format] , slot_num, s->receiver_node->eof);
+ if (!s->receiver_node->eof)
+ unregister_task(&s->receiver_node->task);
a->receiver->close(s->receiver_node);
free(s->receiver_node);
s->receiver_node = NULL;
- set_restart_barrier(s->format, NULL);
+ /* set restart barrier */
+ tv_add(now, &restart_delay, &afi[s->format].restart_barrier);
}
static void kill_all_decoders(void)
{
int i;
- FOR_EACH_SLOT(i)
- if (slot[i].format >= 0) {
- PARA_INFO_LOG("stopping decoder in slot %d\n", i);
- kill_stream_writer(i);
- }
-}
-
-static void check_sigchld(void)
-{
- pid_t pid;
- int i;
- struct timeval now;
- gettimeofday(&now, NULL);
-
-reap_next_child:
- pid = para_reap_child();
- if (pid <= 0)
- return;
FOR_EACH_SLOT(i) {
struct slot_info *s = &slot[i];
- long lifetime;
- if (s->format < 0)
- continue;
- if (pid == s->wpid) {
- s->wpid = -1;
- lifetime = now.tv_sec - s->wstime.tv_sec;
- PARA_INFO_LOG("%s stream writer in slot %d died "
- "after %li secs\n",
- audio_formats[s->format], i, lifetime);
- set_restart_barrier(s->format, &now);
- goto reap_next_child;
- }
+ if (s->receiver_node)
+ s->receiver_node->eof = 1;
}
- PARA_CRIT_LOG("para_client died (pid %d)\n", pid);
- goto reap_next_child;
}
static int get_empty_slot(void)
clear_slot(i);
return i;
}
- if (s->write_fd > 0 || s->wpid > 0)
- continue;
- if (s->receiver_node)
- continue;
- if (s->fci)
+ if (s->wng || s->receiver_node || s->fc)
continue;
clear_slot(i);
return i;
s = &slot[i];
if (s->format == format && s->receiver_node)
ret |= 1;
- if (s->format == format && s->wpid > 0)
+ if (s->format == format && s->wng)
ret |= 2;
}
return ret;
{
int i;
- if (stat_pipe < 0)
+ if (stat_task->fd < 0)
return;
PARA_NOTICE_LOG("%s", "closing status pipe\n");
- close(stat_pipe);
- del_close_on_fork_list(stat_pipe);
- stat_pipe = -1;
+ close(stat_task->fd);
+ del_close_on_fork_list(stat_task->fd);
+ stat_task->fd = -1;
kill_all_decoders();
for (i = 0; i < NUM_STAT_ITEMS; i++) {
free(stat_item_values[i]);
offset_seconds = 0;
audiod_status_dump();
playing = 0;
- stat_item_values[SI_STATUS_BAR] = make_message("%s:no connection to para_server\n",
+ stat_item_values[SI_STATUS_BAR] = make_message(
+ "%s:no connection to para_server\n",
status_item_list[SI_STATUS_BAR]);
stat_client_write(stat_item_values[SI_STATUS_BAR], SI_STATUS_BAR);
}
static void __noreturn clean_exit(int status, const char *msg)
{
PARA_EMERG_LOG("%s\n", msg);
- kill_all_decoders();
if (socket_name)
unlink(socket_name);
- if (stat_pipe >= 0)
+ if (stat_task->fd >= 0)
close_stat_pipe();
exit(status);
}
-__malloc static char *glob_cmd(char *cmd)
+/**
+ * get the number of filters
+ *
+ * \param audio_format_num the number identifying the audio format
+ *
+ * \return the number of filters for the given audio format
+ *
+ * \sa struct filter;
+ */
+int num_filters(int audio_format_num)
{
- char *ret, *replacement;
- struct timeval tmp, delay, rss; /* real stream start */
-
- delay.tv_sec = conf.stream_delay_arg / 1000;
- delay.tv_usec = (conf.stream_delay_arg % 1000) * 1000;
-// PARA_INFO_LOG("delay: %lu:%lu\n", delay.tv_sec, delay.tv_usec);
- if (sa_time_diff_sign < 0)
- tv_add(&server_stream_start, &sa_time_diff, &rss);
- else
- tv_diff(&server_stream_start, &sa_time_diff, &rss);
- tv_add(&rss, &delay, &tmp);
- replacement = make_message("%lu:%lu",
- (long unsigned)tmp.tv_sec,
- (long unsigned)tmp.tv_usec);
- ret = s_a_r(cmd, "STREAM_START", replacement);
- free(replacement);
- if (!ret)
- goto out;
- PARA_INFO_LOG("cmd: %s, repl: %s\n", cmd, ret);
-out:
- return ret;
+ return afi[audio_format_num].num_filters;
}
-/** get the number of filters for the given audio format */
-int num_filters(int audio_format_num)
+static void filter_event_handler(struct task *t)
{
- return afi[audio_format_num].num_filters;
+ PARA_NOTICE_LOG("%s\n", PARA_STRERROR(-t->ret));
+ unregister_task(t);
}
static void open_filters(int slot_num)
int nf = a->num_filters;
int i;
- s->fci = para_calloc(sizeof(struct filter_chain_info));
- INIT_LIST_HEAD(&s->fci->filters);
+ s->fc = NULL;
if (!nf)
return;
- s->fci->inbuf = s->receiver_node->buf;
- s->fci->in_loaded = &s->receiver_node->loaded;
- s->fci->outbuf = s->receiver_node->buf;
- s->fci->out_loaded = &s->receiver_node->loaded;
- s->fci->eof = &s->receiver_node->eof;
+ PARA_INFO_LOG("opening %s filters\n", audio_formats[s->format]);
+ s->fc = para_calloc(sizeof(struct filter_chain));
+ INIT_LIST_HEAD(&s->fc->filters);
+ s->fc->inbuf = s->receiver_node->buf;
+ s->fc->in_loaded = &s->receiver_node->loaded;
+ s->fc->input_eof = &s->receiver_node->eof;
+
+ s->fc->task.pre_select = filter_pre_select;
+ s->fc->task.event_handler = filter_event_handler;
+ s->fc->task.private_data = s->fc;
+ s->fc->task.flags = 0;
+ s->fc->eof = 0;
+ sprintf(s->fc->task.status, "filter chain");
for (i = 0; i < nf; i++) {
struct filter_node *fn = para_calloc(sizeof(struct filter_node));
fn->conf = a->filter_conf[i];
- fn->fci = s->fci;
+ fn->fc = s->fc;
fn->filter = a->filters[i];
INIT_LIST_HEAD(&fn->callbacks);
- list_add_tail(&fn->node, &s->fci->filters);
+ list_add_tail(&fn->node, &s->fc->filters);
fn->filter->open(fn);
PARA_NOTICE_LOG("%s filter %d/%d (%s) started in slot %d\n",
audio_formats[s->format], i + 1, nf,
fn->filter->name, slot_num);
- s->fci->outbuf = fn->buf;
- s->fci->out_loaded = &fn->loaded;
+ s->fc->outbuf = fn->buf;
+ s->fc->out_loaded = &fn->loaded;
}
- PARA_DEBUG_LOG("output buffer for filter chain %p: %p\n", s->fci,
- s->fci->outbuf);
+ register_task(&s->fc->task);
}
static struct filter_node *find_filter_node(int slot_num, int format, int filternum)
FOR_EACH_SLOT(i) {
struct slot_info *s = &slot[i];
- if (s->format < 0 || !s->fci)
+ if (s->format < 0 || !s->fc)
continue;
if (slot_num >= 0 && slot_num != i)
continue;
continue;
/* success */
j = 1;
- list_for_each_entry(fn, &s->fci->filters, node)
+ list_for_each_entry(fn, &s->fc->filters, node)
if (filternum <= 0 || j++ == filternum)
break;
return fn;
return NULL;
}
-static void start_stream_writer(int slot_num)
+static void wng_event_handler(struct task *t)
{
- int ret, fds[3] = {1, -1, -1};
+ PARA_INFO_LOG("%s\n", PARA_STRERROR(-t->ret));
+ unregister_task(t);
+}
+
+static void open_writers(int slot_num)
+{
+ int ret, i;
struct slot_info *s = &slot[slot_num];
struct audio_format_info *a = &afi[s->format];
- char *glob = NULL;
-
- if (a->write_cmd)
- glob = glob_cmd(a->write_cmd);
- if (!glob)
- glob = para_strdup("para_write -w alsa");
- PARA_INFO_LOG("starting stream writer: %s\n", glob);
- open_filters(slot_num);
- ret = para_exec_cmdline_pid(&s->wpid, glob, fds);
- free(glob);
- if (ret < 0) {
- PARA_ERROR_LOG("exec failed (%d)\n", ret);
- return;
+
+ PARA_INFO_LOG("opening %s writers\n", audio_formats[s->format]);
+ if (!a->num_writers)
+ s->wng = setup_default_wng();
+ else
+ s->wng = wng_new(a->num_writers);
+ if (s->fc) {
+ s->wng->buf = s->fc->outbuf;
+ s->wng->loaded = s->fc->out_loaded;
+ s->wng->input_eof = &s->fc->eof;
+ s->fc->output_eof = &s->wng->eof;
+ } else {
+ s->wng->buf = s->receiver_node->buf;
+ s->wng->loaded = &s->receiver_node->loaded;
+ s->wng->input_eof = &s->receiver_node->eof;
+ }
+ s->wng->task.event_handler = wng_event_handler;
+ for (i = 0; i < a->num_writers; i++) {
+ s->wng->writer_nodes[i].conf = a->writer_conf[i];
+ s->wng->writer_nodes[i].writer = a->writers[i];
+ sprintf(s->wng->writer_nodes[i].task.status, "writer_node");
}
- s->write_fd = fds[0];
- add_close_on_fork_list(s->write_fd);
- /* we write to this fd in do_select, so we need non-blocking */
- mark_fd_nonblock(s->write_fd);
- gettimeofday(&s->wstime, NULL);
+ ret = wng_open(s->wng);
+ s->wstime = *now;
current_decoder = slot_num;
- activate_inactive_grab_clients(slot_num, s->format, &s->fci->filters);
+ activate_inactive_grab_clients(slot_num, s->format, &s->fc->filters);
+}
+
+static void rn_event_handler(struct task *t)
+{
+// struct receiver_node *rn = t->private_data;
+ PARA_NOTICE_LOG("%s\n", PARA_STRERROR(-t->ret));
+ unregister_task(t);
}
static void open_receiver(int format)
struct audio_format_info *a = &afi[format];
struct slot_info *s;
int ret, slot_num;
+ struct receiver_node *rn;
slot_num = get_empty_slot();
if (slot_num < 0)
clean_exit(EXIT_FAILURE, PARA_STRERROR(-slot_num));
s = &slot[slot_num];
s->format = format;
- gettimeofday(&s->rtime, NULL);
- s->wtime = s->rtime;
s->receiver_node = para_calloc(sizeof(struct receiver_node));
- s->receiver_node->conf = a->receiver_conf;
+ rn = s->receiver_node;
+ rn->receiver = a->receiver;
+ rn->conf = a->receiver_conf;
ret = a->receiver->open(s->receiver_node);
if (ret < 0) {
PARA_ERROR_LOG("failed to open receiver (%s)\n",
}
PARA_NOTICE_LOG("started %s: %s receiver in slot %d\n",
audio_formats[s->format], a->receiver->name, slot_num);
+ rn->task.private_data = s->receiver_node;
+ rn->task.pre_select = a->receiver->pre_select;
+ rn->task.post_select = a->receiver->post_select;
+ rn->task.event_handler = rn_event_handler;
+ rn->task.flags = 0;
+ sprintf(rn->task.status, "receiver node");
+ register_task(&rn->task);
}
static int is_frozen(int format)
{
- struct timeval now;
struct audio_format_info *a = &afi[format];
- gettimeofday(&now, NULL);
- return (tv_diff(&now, &a->restart_barrier, NULL) > 0)? 0 : 1;
+ return (tv_diff(now, &a->restart_barrier, NULL) > 0)? 0 : 1;
}
-static void start_current_receiver(void)
+static void open_current_receiver(void)
{
int i;
i = get_audio_format_num(af_status);
if (i < 0)
return;
- if ((decoder_running(i) & 1) || is_frozen(i))
+ if (decoder_running(i) || is_frozen(i))
return;
open_receiver(i);
}
static void compute_time_diff(const struct timeval *status_time)
{
- struct timeval now, tmp, diff;
+ struct timeval tmp, diff;
static int count;
int sign;
const struct timeval max_deviation = {0, 500 * 1000};
const int time_smooth = 5;
- gettimeofday(&now, NULL);
- sign = tv_diff(status_time, &now, &diff);
+ sign = tv_diff(status_time, now, &diff);
// PARA_NOTICE_LOG("%s: sign = %i, sa_time_diff_sign = %i\n", __func__,
// sign, sa_time_diff_sign);
if (!count) {
{
switch (sig) {
case SIGCHLD:
- return check_sigchld();
+ for (;;) {
+ pid_t pid = para_reap_child();
+ if (pid <= 0)
+ return;
+ PARA_CRIT_LOG("para_client died (pid %d)\n", pid);
+ }
case SIGINT:
case SIGTERM:
case SIGHUP:
}
}
-static void check_timeouts(void)
-{
- struct timeval now;
- int slot_num, timeout = conf.stream_timeout_arg;
-
- gettimeofday(&now, NULL);
- FOR_EACH_SLOT(slot_num) {
- struct slot_info *s = &slot[slot_num];
- if (s->format < 0)
- continue;
- /* check read time */
- if (s->receiver_node &&
- now.tv_sec > s->rtime.tv_sec + timeout) {
- PARA_INFO_LOG("%s input buffer (slot %d) not ready\n",
- audio_formats[s->format], slot_num);
- if (s->fci)
- s->fci->error = 42;
- else
- close_receiver(slot_num);
- }
- /* check write time */
- if (s->wpid > 0 && !s->wkilled &&
- now.tv_sec > s->wtime.tv_sec + timeout) {
- PARA_INFO_LOG("%s output buffer (slot %d) not ready\n",
- audio_formats[s->format], slot_num);
- if (s->fci)
- s->fci->error = 42;
- }
- }
-}
-
-static size_t get_loaded_bytes(int slot_num)
+static void try_to_close_slot(int slot_num)
{
- size_t loaded = 0;
struct slot_info *s = &slot[slot_num];
- struct receiver_node *rn = s->receiver_node;
-
- if (s->format < 0)
- goto out;
-
- if (afi[s->format].num_filters) {
- if (s->fci)
- loaded = *s->fci->out_loaded;
- } else {
- if (rn)
- loaded = rn->loaded;
- }
-out:
- return loaded;
-}
-
-
-static void close_decoder_if_idle(int slot_num)
-{
- struct slot_info *s = &slot[slot_num];
- struct receiver_node *rn = s->receiver_node;
if (s->format < 0)
return;
- if (!s->fci)
+ if (s->receiver_node && !s->receiver_node->eof)
return;
- if (!rn->eof && !s->fci->error && s->wpid > 0)
+ if (s->fc && !s->fc->eof)
return;
- if (!s->fci->error && s->wpid > 0) { /* eof */
- if (filter_io(s->fci) > 0)
- return;
- if (get_loaded_bytes(slot_num))
- return;
- }
- if (s->write_fd > 0) {
- PARA_INFO_LOG("err: %d\n", s->fci->error);
- PARA_INFO_LOG("slot %d: closing write fd %d\n", slot_num,
- s->write_fd);
- close(s->write_fd);
- del_close_on_fork_list(s->write_fd);
- s->write_fd = -1;
- }
- if (s->wpid > 0)
- return; /* wait until writer dies before closing filters */
- PARA_INFO_LOG("closing all filters in slot %d (filter_chain %p)\n",
- slot_num, s->fci);
- close_filters(s->fci);
- free(s->fci);
+ if (s->wng && !s->wng->eof)
+ return;
+ PARA_INFO_LOG("closing slot %d \n", slot_num);
+ wng_close(s->wng);
+ wng_destroy(s->wng);
+ close_filters(s->fc);
+ free(s->fc);
close_receiver(slot_num);
clear_slot(slot_num);
}
-static void set_stream_fds(fd_set *wfds, int *max_fileno)
+static void audiod_pre_select(struct sched *s, __a_unused struct task *t)
{
int i;
- check_timeouts();
+ now = &s->now;
+ if (audiod_status != AUDIOD_ON)
+ kill_all_decoders();
+ else if (playing)
+ open_current_receiver();
FOR_EACH_SLOT(i) {
- struct slot_info *s = &slot[i];
- struct audio_format_info *a;
struct receiver_node *rn;
- close_decoder_if_idle(i);
- s->wcheck = 0;
- if (s->format < 0)
+ try_to_close_slot(i);
+ if (slot[i].format < 0)
continue;
- a = &afi[s->format];
- rn = s->receiver_node;
- if (rn && rn->loaded && !s->wpid) {
- PARA_INFO_LOG("no writer in slot %d\n", i);
- start_stream_writer(i);
+ rn = slot[i].receiver_node;
+ if (rn && rn->loaded && !slot[i].wng) {
+ open_filters(i);
+ open_writers(i);
}
- if (s->write_fd <= 0)
- continue;
- if (!get_loaded_bytes(i))
- continue;
- para_fd_set(s->write_fd, wfds, max_fileno);
- s->wcheck = 1;
}
}
-static int write_audio_data(int slot_num)
+static void audiod_post_select(struct sched *s, __a_unused struct task *t)
{
- struct slot_info *s = &slot[slot_num];
- struct audio_format_info *a = &afi[s->format];
- struct receiver_node *rn = s->receiver_node;
- int rv;
- char **buf;
- size_t *len;
-
- if (a->num_filters) {
- buf = &s->fci->outbuf;
- len = s->fci->out_loaded;
- } else {
- buf = &rn->buf;
- len = &rn->loaded;
- }
- PARA_DEBUG_LOG("writing %p (%zd bytes)\n", *buf, *len);
- rv = write(s->write_fd, *buf, *len);
- PARA_DEBUG_LOG("wrote %d/%zd\n", rv, *len);
- if (rv < 0) {
- PARA_WARNING_LOG("write error in slot %d (fd %d): %s\n",
- slot_num, s->write_fd, strerror(errno));
- *len = 0;
- s->fci->error = E_WRITE_AUDIO_DATA;
- } else if (rv != *len) {
- PARA_DEBUG_LOG("partial %s write (%i/%zd) for slot %d\n",
- audio_formats[s->format], rv, *len, slot_num);
- *len -= rv;
- memmove(*buf, *buf + rv, *len);
- } else
- *len = 0;
- if (rv > 0)
- gettimeofday(&s->wtime, NULL);
- return rv;
+ /* only save away the current time for other users */
+ now = &s->now;
}
-static void slot_io(fd_set *wfds)
+static void init_audiod_task(struct task *t)
{
- int ret, i;
-
- FOR_EACH_SLOT(i) {
- struct slot_info *s = &slot[i];
- struct receiver_node *rn = s->receiver_node;
-
- if (rn && rn->loaded)
- gettimeofday(&s->rtime, NULL);
- if (s->format >= 0 && s->write_fd > 0 && s->fci) {
- ret = filter_io(s->fci);
- if (ret < 0)
- s->fci->error = -ret;
-// PARA_DEBUG_LOG("slot %d, filter io %d bytes, check write: %d, loaded: %d/%d, eof: %d\n",
-// i, ret, s->wcheck, rn->loaded, *s->fci->out_loaded, rn->eof);
- }
- if (s->write_fd <= 0 || !s->wcheck || !FD_ISSET(s->write_fd, wfds))
- continue;
- write_audio_data(i);
- }
+ t->pre_select = audiod_pre_select;
+ t->post_select = audiod_post_select;
+ t->event_handler = NULL;
+ t->private_data = t;
+ t->flags = 0;
+ sprintf(t->status, "audiod task");
}
static int parse_stream_command(const char *txt, char **cmd)
return filter_num;
}
-static int setup_default_filters(void)
+static int init_writers(void)
{
- int i, ret = 1;
+ int i, ret, nw;
+ char *cmd;
+ struct audio_format_info *a;
+ init_supported_writers();
+ nw = PARA_MAX(1, conf.writer_given);
+ PARA_INFO_LOG("maximal number of writers: %d\n", nw);
FOR_EACH_AUDIO_FORMAT(i) {
- struct audio_format_info *a = &afi[i];
- char *tmp;
- int j;
- if (a->num_filters)
- continue;
- /* add "dec" to audio format name */
- tmp = make_message("%sdec", audio_formats[i]);
- for (j = 0; filters[j].name; j++)
- if (!strcmp(tmp, filters[j].name))
- break;
- free(tmp);
- ret = -E_UNSUPPORTED_FILTER;
- if (!filters[j].name)
- goto out;
- tmp = para_strdup(filters[j].name);
- ret = add_filter(i, tmp);
- free(tmp);
+ a = &afi[i];
+ a->writer_conf = para_malloc(nw * sizeof(void *));
+ a->writers = para_malloc(nw * sizeof(struct writer *));
+ a->num_writers = 0;
+ }
+ for (i = 0; i < conf.writer_given; i++) {
+ void *wconf;
+ int writer_num;
+ ret = parse_stream_command(conf.writer_arg[i], &cmd);
if (ret < 0)
goto out;
- PARA_INFO_LOG("%s -> default filter: %s\n", audio_formats[i],
- filters[j].name);
- ret = add_filter(i, "wav");
- if (ret < 0)
+ a = &afi[ret];
+ nw = a->num_writers;
+ wconf = check_writer_arg(cmd, &writer_num);
+ if (!wconf) {
+ ret = writer_num;
goto out;
- PARA_INFO_LOG("%s -> default filter: wav\n", audio_formats[i]);
+ }
+ a->writers[nw] = &writers[writer_num];
+ a->writer_conf[nw] = wconf;
+ PARA_INFO_LOG("%s writer #%d: %s\n", audio_formats[ret],
+ nw, writer_names[writer_num]);
+ a->num_writers++;
}
+ ret = 1;
out:
return ret;
}
-static int init_stream_io(void)
+static int init_receivers(void)
{
- int i, ret, receiver_num, nf;
- char *cmd;
+ int i, ret, receiver_num;
+ char *cmd = NULL;
+ struct audio_format_info *a;
- for (i = 0; i < conf.stream_write_cmd_given; i++) {
- ret = parse_stream_command(conf.stream_write_cmd_arg[i], &cmd);
- if (ret < 0)
- goto out;
- afi[ret].write_cmd = para_strdup(cmd);
- PARA_INFO_LOG("%s write command: %s\n", audio_formats[ret], afi[ret].write_cmd);
- }
for (i = 0; receivers[i].name; i++) {
PARA_INFO_LOG("initializing %s receiver\n", receivers[i].name);
receivers[i].init(&receivers[i]);
*/
cmd = para_strdup(receivers[0].name);
FOR_EACH_AUDIO_FORMAT(i) {
- struct audio_format_info *a = &afi[i];
+ a = &afi[i];
if (a->receiver_conf)
continue;
a->receiver_conf = check_receiver_arg(cmd, &receiver_num);
return -E_RECV_SYNTAX;
a->receiver = &receivers[receiver_num];
}
+ ret = 1;
+out:
free(cmd);
- /* filters */
+ return ret;
+}
+
+static int init_default_filters(void)
+{
+ int i, ret = 1;
+
+ FOR_EACH_AUDIO_FORMAT(i) {
+ struct audio_format_info *a = &afi[i];
+ char *tmp;
+ int j;
+
+ if (a->num_filters)
+ continue; /* no default -- nothing to to */
+ /* add "dec" to audio format name */
+ tmp = make_message("%sdec", audio_formats[i]);
+ for (j = 0; filters[j].name; j++)
+ if (!strcmp(tmp, filters[j].name))
+ break;
+ free(tmp);
+ ret = -E_UNSUPPORTED_FILTER;
+ if (!filters[j].name)
+ goto out;
+ tmp = para_strdup(filters[j].name);
+ ret = add_filter(i, tmp);
+ free(tmp);
+ if (ret < 0)
+ goto out;
+ PARA_INFO_LOG("%s -> default filter: %s\n", audio_formats[i],
+ filters[j].name);
+ }
+out:
+ return ret;
+}
+
+static int init_filters(void)
+{
+ int i, ret, nf;
+
filter_init(filters);
- nf = PARA_MAX(2, conf.filter_given) + 1;
- PARA_INFO_LOG("allocating space for %d filters\n", nf);
+ nf = PARA_MAX(1, conf.filter_given);
+ PARA_INFO_LOG("maximal number of filters: %d\n", nf);
FOR_EACH_AUDIO_FORMAT(i) {
- afi[i].filter_conf = para_malloc(nf * sizeof(char *));
+ afi[i].filter_conf = para_malloc(nf * sizeof(void *));
afi[i].filters = para_malloc(nf * sizeof(struct filter *));
}
if (!conf.no_default_filters_given)
- return setup_default_filters();
+ return init_default_filters();
for (i = 0; i < conf.filter_given; i++) {
char *arg = conf.filter_arg[i];
char *filter_name = strchr(arg, ':');
if (ret < 0)
goto out;
}
- ret = 1;
+ ret = init_default_filters(); /* use default values for the rest */
out:
return ret;
}
+static int init_stream_io(void)
+{
+ int ret;
+
+ ret = init_writers();
+ if (ret < 0)
+ return ret;
+ ret = init_receivers();
+ if (ret < 0)
+ return ret;
+ ret = init_filters();
+ if (ret < 0)
+ return ret;
+ return 1;
+}
+
static int dump_commands(int fd)
{
char *buf = para_strdup(""), *tmp = NULL;
return ret;
}
-#if 0
-static char *list_filters(void)
-{
- int i, j;
- char *tmp, *msg = make_message("format\tnum\tcmd\n");
-
- FOR_EACH_AUDIO_FORMAT(i) {
- for (j = 0; j < afi[i].num_filters; j++) {
- tmp = make_message("%s\t%i\t%s\n",
- afi[i].name, j, afi[i].filter_cmds[j]);
- msg = para_strcat(msg, tmp);
- free(tmp);
- }
- tmp = make_message("%s\t%i\t%s\n", afi[i].name,
- j, afi[i].write_cmd);
- msg = para_strcat(msg, tmp);
- free(tmp);
- }
- return msg;
-}
-#endif
-
-
static int com_grab(int fd, char *cmdline)
{
struct grab_client *gc;
return -E_UCRED_PERM;
}
-static int handle_connect(void)
+static int handle_connect(int accept_fd)
{
int i, argc, ret, clifd = -1;
char *cmd = NULL, *p, *buf = para_calloc(MAXLINE), **argv = NULL;
struct sockaddr_un unix_addr;
- ret = para_accept(audiod_socket, &unix_addr, sizeof(struct sockaddr_un));
+ ret = para_accept(accept_fd, &unix_addr, sizeof(struct sockaddr_un));
if (ret < 0)
goto out;
clifd = ret;
return ret;
}
-static void audiod_get_socket(void)
+static int audiod_get_socket(void)
{
struct sockaddr_un unix_addr;
+ int fd;
if (conf.socket_given)
socket_name = para_strdup(conf.socket_arg);
hn);
free(hn);
}
- PARA_NOTICE_LOG("connecting to local socket %s\n", socket_name);
+ PARA_NOTICE_LOG("local socket: %s\n", socket_name);
if (conf.force_given)
unlink(socket_name);
- audiod_socket = create_pf_socket(socket_name, &unix_addr,
+ fd = create_pf_socket(socket_name, &unix_addr,
S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IWOTH);
- if (audiod_socket < 0) {
+ if (fd < 0) {
PARA_EMERG_LOG("%s", "can not connect to socket\n");
exit(EXIT_FAILURE); /* do not unlink socket */
}
- if (listen(audiod_socket, 5) < 0) {
+ if (listen(fd , 5) < 0) {
PARA_EMERG_LOG("%s", "can not listen on socket\n");
exit(EXIT_FAILURE); /* do not unlink socket */
}
- add_close_on_fork_list(audiod_socket);
+ add_close_on_fork_list(fd);
+ return fd;
}
static int open_stat_pipe(void)
return ret;
}
-static void audiod_pre_select(fd_set *rfds, fd_set *wfds, struct timeval *tv,
- int *max_fileno)
+void signal_event_handler(struct task *t)
{
- int i, ret;
+ struct signal_task *st = t->private_data;
+ handle_signal(st->signum);
+}
- FOR_EACH_SLOT(i) {
- struct slot_info *s = &slot[i];
- struct audio_format_info *a;
- struct receiver_node *rn = s->receiver_node;
- if (s->format < 0 || !rn)
- continue;
- a = &afi[s->format];
- ret = a->receiver->pre_select(rn, rfds, wfds, tv);
-// PARA_NOTICE_LOG("%s preselect: %d\n", a->receiver->name, ret);
- *max_fileno = PARA_MAX(*max_fileno, ret);
- }
+void signal_pre_select(struct sched *s, struct task *t)
+{
+ struct signal_task *st = t->private_data;
+ t->ret = 1;
+ para_fd_set(st->fd, &s->rfds, &s->max_fileno);
}
-static void audiod_post_select(int select_ret, fd_set *rfds, fd_set *wfds)
+
+void signal_post_select(struct sched *s, struct task *t)
{
- int i, ret;
+ struct signal_task *st = t->private_data;
+ t->ret = 1;
+ if (!FD_ISSET(st->fd, &s->rfds))
+ return;
+ t->ret = -E_SIGNAL_CAUGHT;
+ st->signum = para_next_signal();
+}
- FOR_EACH_SLOT(i) {
- struct slot_info *s = &slot[i];
- struct audio_format_info *a;
- struct receiver_node *rn = s->receiver_node;
- if (s->format < 0 || !rn || rn->eof)
- continue;
- a = &afi[s->format];
- ret = a->receiver->post_select(rn, select_ret, rfds, wfds);
- if (ret <= 0) {
- if (ret)
- PARA_ERROR_LOG("%s post select failed: %s (slot %d)\n",
- a->receiver->name, PARA_STRERROR(-ret), i);
- else
- PARA_INFO_LOG("eof in slot %d\n", i);
- rn->eof = 1;
- }
- if (ret < 0 && s->fci)
- s->fci->error = ret;
- }
+void signal_setup_default(struct signal_task *st)
+{
+ st->task.pre_select = signal_pre_select;
+ st->task.post_select = signal_post_select;
+ st->task.private_data = st;
+ st->task.flags = 0;
+ sprintf(st->task.status, "signal task");
}
-/* TODO: move everything before the select call to pre_select() */
-static void __noreturn audiod_mainloop(void)
+static void command_pre_select(struct sched *s, struct task *t)
{
- fd_set rfds, wfds;
- int ret, max_fileno, sbo = 0;
- char status_buf[STRINGSIZE] = "";
- struct timeval tv;
-repeat:
- FD_ZERO(&wfds);
- FD_ZERO(&rfds);
- max_fileno = -1;
- /* always check signal pipe and the local socket */
- para_fd_set(signal_pipe, &rfds, &max_fileno);
- para_fd_set(audiod_socket, &rfds, &max_fileno);
+ struct command_task *ct = t->private_data;
+ para_fd_set(ct->fd, &s->rfds, &s->max_fileno);
- if (audiod_status != AUDIOD_ON)
- kill_all_decoders();
- else if (playing)
- start_current_receiver();
+}
+
+static void command_post_select(struct sched *s, struct task *t)
+{
+ int ret;
+ struct command_task *ct = t->private_data;
- set_stream_fds(&wfds, &max_fileno);
- /* status pipe */
- if (stat_pipe >= 0 && audiod_status == AUDIOD_OFF)
- close_stat_pipe();
- if (stat_pipe < 0 && audiod_status != AUDIOD_OFF) {
- stat_pipe = open_stat_pipe();
- sbo = 0;
- status_buf[0] = '\0';
- }
- if (stat_pipe >= 0 && audiod_status != AUDIOD_OFF)
- para_fd_set(stat_pipe, &rfds, &max_fileno);
- /* local socket */
- tv.tv_sec = 0;
- tv.tv_usec = 200 * 1000;
- audiod_pre_select(&rfds, &wfds, &tv, &max_fileno);
- ret = para_select(max_fileno + 1, &rfds, &wfds, &tv);
- if (ret < 0)
- goto repeat;
if (audiod_status != AUDIOD_OFF)
audiod_status_dump();
- audiod_post_select(ret, &rfds, &wfds);
- /* read status pipe */
- if (stat_pipe >=0 && FD_ISSET(stat_pipe, &rfds)) {
- ret = read(stat_pipe, status_buf + sbo, STRINGSIZE - 1 - sbo);
- if (ret <= 0) {
- close_stat_pipe();
- /* avoid busy loop if server is down */
- while (sleep(1) > 0)
- ; /* try again*/
- } else {
- status_buf[ret + sbo] = '\0';
- sbo = for_each_line(status_buf, ret + sbo,
- &check_stat_line);
- }
- }
- slot_io(&wfds);
- if (FD_ISSET(audiod_socket, &rfds)) {
- ret = handle_connect();
- if (ret < 0) {
- PARA_ERROR_LOG("%s\n", PARA_STRERROR(-ret));
- }
+ t->ret = 1; /* always successful */
+ if (!FD_ISSET(ct->fd, &s->rfds))
+ return;
+ ret = handle_connect(ct->fd);
+ if (ret < 0)
+ PARA_ERROR_LOG("%s\n", PARA_STRERROR(-ret));
+}
+
+static void init_command_task(struct command_task *ct)
+{
+ ct->task.pre_select = command_pre_select;
+ ct->task.post_select = command_post_select;
+ ct->task.event_handler = NULL;
+ ct->task.private_data = ct;
+ ct->task.flags = 0;
+ ct->fd = audiod_get_socket(); /* doesn't return on errors */
+ sprintf(ct->task.status, "command task");
+}
+
+static void status_pre_select(struct sched *s, struct task *t)
+{
+ struct status_task *st = t->private_data;
+ t->ret = 1;
+ if (st->fd >= 0 && audiod_status == AUDIOD_OFF)
+ close_stat_pipe();
+ if (st->fd < 0 && audiod_status != AUDIOD_OFF) {
+ st->fd = open_stat_pipe();
+ st->loaded = 0;
+ st->buf[0] = '\0';
}
- /* signals */
- if (FD_ISSET(signal_pipe, &rfds)) {
- int sig_nr = para_next_signal();
- if (sig_nr > 0)
- handle_signal(sig_nr);
+ if (st->fd >= 0 && audiod_status != AUDIOD_OFF)
+ para_fd_set(st->fd, &s->rfds, &s->max_fileno);
+}
+
+static void status_post_select(struct sched *s, struct task *t)
+{
+ struct status_task *st = t->private_data;
+ int ret;
+
+ t->ret = 1;
+ if (st->fd < 0 || !FD_ISSET(st->fd, &s->rfds))
+ return;
+ ret = read(st->fd, st->buf + st->loaded,
+ STRINGSIZE - 1 - st->loaded);
+ if (ret <= 0) {
+ close_stat_pipe();
+ /* avoid busy loop if server is down */
+ while (sleep(1) > 0) /* FIXME */
+ ; /* try again*/
+ } else {
+ st->buf[ret + st->loaded] = '\0';
+ st->loaded = for_each_line(st->buf, ret + st->loaded,
+ &check_stat_line);
}
- goto repeat;
+}
+
+static void init_status_task(struct status_task *st)
+{
+ st->task.pre_select = status_pre_select;
+ st->task.post_select = status_post_select;
+ st->task.private_data = st;
+ st->task.flags = 0;
+ st->loaded = 0;
+ st->fd = -1;
+ st->buf[0] = '\0';
+ sprintf(st->task.status, "status task");
}
static void set_initial_status(void)
PARA_WARNING_LOG("%s", "invalid mode\n");
}
-int __noreturn main(int argc, char *argv[])
+int main(int argc, char *argv[])
{
char *cf;
- int i;
+ int ret, i;
+ struct sched s;
+ struct command_task command_task_struct, *cmd_task = &command_task_struct;
+ struct task audiod_task_struct, *audiod_task = &audiod_task_struct;
+
+ init_sched();
valid_fd_012();
hostname = para_hostname();
clear_slot(i);
init_grabbing();
setup_signal_handling();
+ signal_setup_default(sig_task);
+ sig_task->task.event_handler = signal_event_handler;
+
+ init_status_task(stat_task);
+ init_command_task(cmd_task);
+ init_audiod_task(audiod_task);
+
if (conf.daemon_given)
daemon_init();
- audiod_get_socket(); /* doesn't return on errors */
- audiod_mainloop();
+
+ register_task(&sig_task->task);
+ register_task(&cmd_task->task);
+ register_task(&stat_task->task);
+ register_task(audiod_task);
+ s.default_timeout.tv_sec = 0;
+ s.default_timeout.tv_usec = 99 * 1000;
+ ret = sched(&s);
+
+ PARA_EMERG_LOG("%s\n", PARA_STRERROR(-ret));
+ return EXIT_FAILURE;
}
string typestr="filter_spec" optional multiple
-option "stream_write_cmd" w
+option "writer" w
#~~~~~~~~~~~~~~~~~~~~~~~~~~
"Specify stream writer.
-May be given multiple times, once for each
-supported audio format. Default value is
-'para_write -w alsa' for both mp3 and ogg.
-You can use the START_TIME() macro for these
-commands. Each occurence of START_TIME()
-gets replaced at runtime by the stream start
-time announced by para_server, plus any
-offsets."
+May be given multiple times, even multiple
+times for the same audio format. Default
+value is 'alsa' for all supported audio
+formats. Example:
- string typestr="format:command"
+ -w 'aac:osx'
+
+"
+ string typestr="writer_spec"
optional
multiple
"Time to add to para_server's start_time.
-Amount of time to be added to the server
-stream start time for stream_write_cmd if
-START_TIME() was given. Useful for
-syncronizing the audio output of clients."
+Amount of time to be added to the server, before data is sent to
+the writer. Useful for syncronizing the audio output of clients."
int typestr="milliseconds"
default="200"
#include "para.h"
#include "compress_filter.cmdline.h"
#include "list.h"
+#include "sched.h"
#include "filter.h"
#include "string.h"
recv_cmdline_objs="recv.cmdline http_recv.cmdline dccp_recv.cmdline"
recv_errlist_objs="http_recv recv_common recv time string net dccp_recv
- dccp fd"
+ dccp fd sched stdout"
recv_ldflags=""
filter_cmdline_objs="filter.cmdline compress_filter.cmdline"
-filter_errlist_objs="filter_chain wav compress filter string"
+filter_errlist_objs="filter_chain wav compress filter string stdin stdout sched fd"
filter_ldflags=""
audiod_cmdline_objs="audiod.cmdline grab_client.cmdline compress_filter.cmdline
- http_recv.cmdline dccp_recv.cmdline"
+ http_recv.cmdline dccp_recv.cmdline file_write.cmdline"
audiod_errlist_objs="audiod exec close_on_fork signal string daemon stat net
- time grab_client filter_chain wav compress http_recv dccp dccp_recv recv_common fd"
+ time grab_client filter_chain wav compress http_recv dccp dccp_recv
+ recv_common fd sched write_common file_writer"
audiod_ldflags=""
server_cmdline_objs="server.cmdline"
ipc dccp dccp_send fd"
server_ldflags=""
-write_cmdline_objs="write.cmdline"
-write_errlist_objs="write write_common file_writer time fd string"
+write_cmdline_objs="write.cmdline file_write.cmdline"
+write_errlist_objs="write write_common file_writer time fd string sched stdin"
write_ldflags=""
write_writers="file"
fi
########################################################################### alsa
have_alsa="yes"
-msg="=> no alsa support for para_write"
+msg="=> no alsa support for para_audiod/para_write"
AC_CHECK_HEADERS([alsa/asoundlib.h], [], [
AC_MSG_WARN([no alsa/asoundlib $msg])
have_alsa="no"
have_alsa="no"
])
if test "$have_alsa" = "yes"; then
+ audiod_errlist_objs="$audiod_errlist_objs alsa_writer"
+ audiod_cmdline_objs="$audiod_cmdline_objs alsa_write.cmdline"
+ audiod_ldflags="$audiod_ldflags -lasound"
+
write_errlist_objs="$write_errlist_objs alsa_writer"
+ write_cmdline_objs="$write_cmdline_objs alsa_write.cmdline"
write_ldflags="$write_ldflags -lasound"
write_writers="$write_writers alsa"
fi
#include "para.h"
#include "error.h"
#include "dccp.h"
+#include "list.h"
+#include "sched.h"
#include "recv.h"
#include "string.h"
#include "net.h"
+#include "fd.h"
#include "dccp_recv.cmdline.h"
return NULL;
}
-static int dccp_recv_pre_select(struct receiver_node *rn, fd_set *rfds,
- __a_unused fd_set *wfds, __a_unused struct timeval *timeout)
+static void dccp_recv_pre_select(struct sched *s, struct task *t)
{
- struct private_dccp_recv_data *pdd = rn->private_data;
+ struct private_dccp_recv_data *pdd = t->private_data;
if (!pdd)
- return -1;
- FD_SET(pdd->fd, rfds);
- return pdd->fd;
+ return ;
+ para_fd_set(pdd->fd, &s->rfds, &s->max_fileno);
}
-static int dccp_recv_post_select(struct receiver_node *rn, int select_ret,
- fd_set *rfds, __a_unused fd_set *wfds)
+static void dccp_recv_post_select(struct sched *s, struct task *t)
{
- int ret;
+ struct receiver_node *rn = t->private_data;
struct private_dccp_recv_data *pdd = rn->private_data;
- if (!select_ret || !pdd || !FD_ISSET(pdd->fd, rfds))
- return 1; /* nothing to do */
+ t->ret = 1;
+ if (!s->select_ret || !pdd || !FD_ISSET(pdd->fd, &s->rfds))
+ return; /* nothing to do */
+ t->ret = -E_DCCP_OVERRUN;
if (rn->loaded >= DCCP_BUFSIZE)
- return -E_DCCP_OVERRUN;
- ret = recv_bin_buffer(pdd->fd, rn->buf + rn->loaded,
+ return;
+ t->ret = recv_bin_buffer(pdd->fd, rn->buf + rn->loaded,
DCCP_BUFSIZE - rn->loaded);
- if (ret <= 0) {
- PARA_INFO_LOG("%s\n", ret? PARA_STRERROR(-ret) : "eof");
- return ret;
+ if (t->ret <= 0) {
+ rn->eof = 1;
+ if (!t->ret)
+ t->ret = -E_DCCP_RECV_EOF;
+ return;
}
- rn->loaded += ret;
- return 1;
+ rn->loaded += t->ret;
}
/**
/** \cond list of all subsystems that support the shiny error facility */
enum para_subsystem {
+ SS_SCHED,
SS_GUI,
SS_TIME,
SS_WAV,
SS_ORTP_RECV,
SS_AUDIOD,
SS_EXEC,
+ SS_STDIN,
+ SS_STDOUT,
SS_SIGNAL,
SS_STRING,
SS_STAT,
#define ORTP_SEND_ERRORS
#define GUI_ERRORS
#define RINGBUFFER_ERRORS
+#define SCHED_ERRORS
extern const char **para_errlist[];
/** \endcond */
+#define STDIN_ERRORS \
+ PARA_ERROR(STDIN_READ, "failed to read from stdin"), \
+ PARA_ERROR(STDIN_EOF, "end of file"), \
+
+
+#define STDOUT_ERRORS \
+ PARA_ERROR(STDOUT_WRITE, "failed to write to stdout"), \
+ PARA_ERROR(STDOUT_EOF, "end of file"), \
+
+
#define NET_ERRORS \
PARA_ERROR(SEND, "send error"), \
PARA_ERROR(RECV, "receive error"), \
PARA_ERROR(TOO_MANY_BAD_CHUNKS, "too many consecutive bad chunks"), \
PARA_ERROR(INVALID_HEADER, "invalid header packet"), \
PARA_ERROR(OVERRUN, "outout buffer overrun"), \
+ PARA_ERROR(ORTP_RECV_EOF, "ortp_recv: end of file"), \
#define HTTP_RECV_ERRORS \
PARA_ERROR(SEND_HTTP_REQUEST, "failed to send http request"), \
PARA_ERROR(MISSING_OK, "did not receive OK message from peer"), \
- PARA_ERROR(HTTP_RECV_BUF, "did not receive buffer")
+ PARA_ERROR(HTTP_RECV_BUF, "did not receive buffer"), \
+ PARA_ERROR(HTTP_RECV_EOF, "http_recv: end of file"), \
#define RECV_ERRORS \
#define FILTER_CHAIN_ERRORS \
PARA_ERROR(UNSUPPORTED_FILTER, "given filter not supported"), \
PARA_ERROR(BAD_FILTER_OPTIONS, "invalid filter option given"), \
+ PARA_ERROR(FC_EOF, "filter chain: eof"), \
#define STAT_ERRORS \
PARA_ERROR(SIGNAL_READ, "read error from signal pipe"), \
PARA_ERROR(WAITPID, "waitpid error"), \
PARA_ERROR(SIGNAL_PIPE, "failed to setup signal pipe"), \
+ PARA_ERROR(SIGNAL_CAUGHT, "caught signal"), \
#define STRING_ERRORS \
#define AAC_COMMON_ERRORS \
- PARA_ERROR(AAC_BUF, "invalid buffer"), \
PARA_ERROR(ESDS, "did not find esds atom"), \
PARA_ERROR(STCO, "did not find stco atom"), \
PARA_ERROR(DCCP_SOCKET, "can not create dccp socket"), \
PARA_ERROR(DCCP_PACKET_SIZE, "failed to set dccp packet size"), \
PARA_ERROR(DCCP_SERVICE, "could not get service code"), \
+ PARA_ERROR(DCCP_RECV_EOF, "dccp_recv: end of file"), \
#define DCCP_RECV_ERRORS \
PARA_ERROR(DCCP_OVERRUN, "dccp output buffer buffer overrun"), \
PARA_ERROR(DCCP_CONNECT, "dccp connect error"), \
+
#define DCCP_SEND_ERRORS \
PARA_ERROR(DCCP_BIND, "dccp bind error"), \
PARA_ERROR(DCCP_LISTEN, "dccp listen error"), \
PARA_ERROR(DCCP_WRITE, "dccp write error"), \
+
#define FD_ERRORS \
PARA_ERROR(F_GETFL, "failed to get fd flags"), \
PARA_ERROR(F_SETFL, "failed to set fd flags"), \
#define WRITE_ERRORS \
- PARA_ERROR(READ_HDR, "failed to read audio file header"), \
- PARA_ERROR(READ_STDIN, "failed to read from stdin"), \
PARA_ERROR(WRITE_SYNTAX, "para_write syntax error"), \
- PARA_ERROR(WRITE_OVERRUN, "buffer overrun"), \
PARA_ERROR(PREMATURE_END, "premature end of audio file"), \
+ PARA_ERROR(NO_WAV_HEADER, "wave header not found"), \
+ PARA_ERROR(WAV_HEADER_SUCCESS, "successfully read wave header"), \
+ PARA_ERROR(NO_DELAY, "no initial delay"), \
+ PARA_ERROR(DELAY_TIMEOUT, "initial delay timeout"), \
#define ALSA_WRITER_ERRORS \
PARA_ERROR(SET_RATE, "snd_pcm_hw_params_set_rate_near failed"), \
PARA_ERROR(START_THRESHOLD, "snd_pcm_sw_params_set_start_threshold() failed"), \
PARA_ERROR(STOP_THRESHOLD, "snd_pcm_sw_params_set_stop_threshold() failed"), \
- PARA_ERROR(ALSA_LOG, "snd_output_stdio_attach() failed"), \
#define FILE_WRITER_ERRORS \
PARA_ERROR(FW_WRITE, "file writer write error"), \
PARA_ERROR(FW_OPEN, "file writer: can not open output file"), \
+ PARA_ERROR(FW_NO_FILE, "task started without open file"), \
#define WRITE_COMMON_ERRORS \
PARA_ERROR(WRITE_COMMON_SYNTAX, "syntax error in write option"), \
+ PARA_ERROR(WNG_EOF, "wng: end of file"), \
#define AACDEC_ERRORS \
PARA_ERROR(AACDEC_INIT, "failed to init aac decoder"), \
PARA_ERROR(AAC_DECODE, "aac decode error"), \
+
/**
* the subsystem shift
*
/** \cond popcorn time */
SS_ENUM(GUI);
+SS_ENUM(SCHED);
+SS_ENUM(STDIN);
+SS_ENUM(STDOUT);
SS_ENUM(WAV);
SS_ENUM(COMPRESS);
SS_ENUM(TIME);
--- /dev/null
+section "file writer options"
+
+option "filename" f
+#~~~~~~~~~~~~~~~~~~
+
+"select output file name. Defaults to a
+random filename in ~/.paraslash."
+
+ string typestr="filename"
+ optional
+
/** \file file_writer.c simple output plugin for testing purposes */
#include "para.h"
+#include "list.h"
+#include "sched.h"
#include "write.h"
#include "string.h"
+#include "fd.h"
+#include "file_write.cmdline.h"
#include "error.h"
/** data specific to the file writer */
struct private_file_writer_data {
-/** the file descriptor of the output file */
-int fd;
+ /** the file descriptor of the output file */
+ int fd;
+ /** non-zero if \a fd was added to the write fd set */
+ int check_fd;
};
-static int file_writer_open(struct writer_node *w)
+static int file_writer_open(struct writer_node *wn)
{
struct private_file_writer_data *pfwd = para_calloc(
sizeof(struct private_file_writer_data));
- char *tmp = para_tmpname(), *home = para_homedir(),
- *filename = make_message("%s/.paraslash/%s", home, tmp);
-
- free(home);
- free(tmp);
- w->private_data = pfwd;
+ struct file_write_args_info *conf = wn->conf;
+ char *filename;
+ if (conf->filename_given)
+ filename = conf->filename_arg;
+ else {
+ char *tmp = para_tmpname(), *home = para_homedir();
+ filename = make_message("%s/.paraslash/%s", home, tmp);
+ free(home);
+ free(tmp);
+ }
+ wn->private_data = pfwd;
pfwd->fd = open(filename, O_WRONLY | O_CREAT, S_IRUSR | S_IWUSR);
- free(filename);
+ if (!conf->filename_given)
+ free(filename);
if (pfwd->fd >= 0)
return 8192;
free(pfwd);
return ret;
}
+static void file_writer_pre_select(struct sched *s, struct task *t)
+{
+ struct writer_node *wn = t->private_data;
+ struct private_file_writer_data *pfwd = wn->private_data;
+ struct writer_node_group *wng = wn->wng;
+
+// PARA_INFO_LOG("task %p check_fd: %d\n", t, pfwd->check_fd);
+ pfwd->check_fd = 0;
+ t->ret = -E_FW_NO_FILE;
+ if (pfwd->fd <= 0)
+ return;
+ t->ret = 0;
+ if (!*wng->loaded)
+ return;
+ t->ret = 1;
+ para_fd_set(pfwd->fd, &s->wfds, &s->max_fileno);
+ pfwd->check_fd = 1;
+}
+
+static void file_writer_post_select(struct sched *s, struct task *t)
+{
+ struct writer_node *wn = t->private_data;
+ struct private_file_writer_data *pfwd = wn->private_data;
+ struct writer_node_group *wng = wn->wng;
+
+ t->ret = 0;
+ if (!pfwd->check_fd)
+ return;
+ if (!*wng->loaded)
+ return;
+ if (!FD_ISSET(pfwd->fd, &s->wfds))
+ return;
+// PARA_INFO_LOG("writing %zd\n", *wng->loaded);
+ t->ret = write(pfwd->fd, wng->buf, *wng->loaded);
+ if (t->ret < 0)
+ t->ret = -E_FW_WRITE;
+}
+
static void file_writer_close(struct writer_node *wn)
{
struct private_file_writer_data *pfwd = wn->private_data;
free(pfwd);
}
+__malloc void *file_writer_parse_config(char *options)
+{
+ PARA_INFO_LOG("options: %s\n", options);
+ struct file_write_args_info *conf
+ = para_calloc(sizeof(struct file_write_args_info));
+ int ret = file_cmdline_parser_string(options, conf, "file_write");
+ PARA_INFO_LOG("conf->filename_given: %d\n", conf->filename_given);
+ if (!ret)
+ return conf;
+ free(conf);
+ return NULL;
+}
+
/** the init function of the file writer */
void file_writer_init(struct writer *w)
{
w->open = file_writer_open;
w->write = file_writer_write;
+ w->pre_select = file_writer_pre_select;
+ w->post_select = file_writer_post_select;
+ w->parse_config = file_writer_parse_config;
w->close = file_writer_close;
w->shutdown = NULL; /* nothing to do */
}
#include "filter.cmdline.h"
#include "list.h"
+#include "sched.h"
#include "filter.h"
-#include "error.h"
#include "string.h"
+#include "stdin.h"
+#include "stdout.h"
+#include "error.h"
INIT_FILTER_ERRLISTS;
-#define INBUF_SIZE 32 * 1024
-
-static struct filter_chain_info filter_chain_info_struct;
-static struct filter_chain_info *fci = &filter_chain_info_struct;
+static struct stdin_task stdin_task_struct;
+static struct stdin_task *sit = &stdin_task_struct;
+static struct filter_chain filter_chain_struct;
+static struct filter_chain *fc = &filter_chain_struct;
+static struct stdout_task stdout_task_struct;
+static struct stdout_task *sot = &stdout_task_struct;
struct gengetopt_args_info conf;
va_end(argp);
}
-static char *inbuf;
-static size_t loaded;
-static int eof;
+void filter_event_handler(struct task *t)
+{
+ PARA_NOTICE_LOG("%s\n", PARA_STRERROR(-t->ret));
+ unregister_task(t);
+}
-static int init_active_filter_list(void)
+static void open_filters(void)
+{
+ struct filter_node *fn;
+
+ list_for_each_entry(fn, &fc->filters, node) {
+ fn->filter->open(fn);
+ PARA_INFO_LOG("opened %s filter\n", fn->filter->name);
+ fc->outbuf = fn->buf;
+ fc->out_loaded = &fn->loaded;
+ }
+}
+
+
+static int init_filter_chain(void)
{
int i, filter_num;
struct filter_node *fn;
- INIT_LIST_HEAD(&fci->filters);
+ INIT_LIST_HEAD(&fc->filters);
- fci->inbuf = inbuf;
- fci->in_loaded = &loaded;
- fci->eof = &eof;
+ fc->inbuf = sit->buf;
+ fc->in_loaded = &sit->loaded;
+ fc->input_eof = &sit->eof;
+ fc->eof = 0;
+ fc->output_eof = &sot->eof;
+ fc->task.private_data = fc;
+ fc->task.pre_select = filter_pre_select;
+ fc->task.event_handler = filter_event_handler;
+ sprintf(fc->task.status, "filter chain");
for (i = 0; i < conf.filter_given; i++) {
- char *fa = para_strdup(conf.filter_arg[i]);
+ char *fa = conf.filter_arg[i];
fn = para_calloc(sizeof(struct filter_node));
filter_num = check_filter_arg(fa, &fn->conf);
if (filter_num < 0) {
free(fn);
return filter_num;
}
- fn->fci = fci;
+ fn->fc = fc;
INIT_LIST_HEAD(&fn->callbacks);
fn->filter = &filters[filter_num];
PARA_DEBUG_LOG("adding %s to filter chain\n", fn->filter->name);
- list_add_tail(&fn->node, &fci->filters);
+ list_add_tail(&fn->node, &fc->filters);
}
- if (list_empty(&fci->filters))
+ if (list_empty(&fc->filters))
return -E_NO_FILTERS;
+ open_filters();
return 1;
}
-static void open_filters(void)
-{
- struct filter_node *fn;
-
- list_for_each_entry(fn, &fci->filters, node) {
- fn->filter->open(fn);
- PARA_INFO_LOG("opened %s filter\n", fn->filter->name);
- fci->outbuf = fn->buf;
- fci->out_loaded = &fn->loaded;
- }
-}
-
static int parse_config(int argc, char *argv[])
{
static char *cf; /* config file */
int main(int argc, char *argv[])
{
- int converted, ret;
- char *ib, *ob; /* input/output buffer */
- size_t *il, *ol; /* number of loaded bytes in input/output buffer */
+ int ret;
+ struct sched s;
+
+ init_sched();
+ stdin_set_defaults(sit);
+ sit->buf = para_malloc(sit->bufsize),
filter_init(filters);
ret = parse_config(argc, argv);
if (ret < 0)
goto out;
- inbuf = para_malloc(INBUF_SIZE);
- ret = init_active_filter_list();
- if (ret < 0)
- goto out;
- open_filters();
- ib = fci->inbuf;
- ob = fci->outbuf;
- il = fci->in_loaded;
- ol = fci->out_loaded;
- PARA_DEBUG_LOG("ib %p in, ob: %p\n", ib, ob);
-again:
- if (*il < INBUF_SIZE && !eof) {
- ret = read(STDIN_FILENO, ib + *il, INBUF_SIZE - *il);
- PARA_DEBUG_LOG("read %d/%zd\n", ret, INBUF_SIZE - *il);
- if (ret < 0)
- goto out;
- if (!ret)
- eof = 1;
- *il += ret;
- }
- ret = filter_io(fci);
+ ret = init_filter_chain();
if (ret < 0)
goto out;
- converted = ret;
- if (*ol) {
- ret = write(STDOUT_FILENO, ob, *ol);
- PARA_DEBUG_LOG("wrote %d/%zd\n", ret, *ol);
- if (ret <= 0)
- goto out;
- *ol -= ret;
- if (*ol) {
- PARA_NOTICE_LOG("short write: %zd bytes left\n", *ol);
- memmove(ob, ob + ret, *ol);
- }
- }
- if (!eof || converted)
- goto again;
- ret = 0;
+
+ stdout_set_defaults(sot);
+ PARA_EMERG_LOG("fc->output_eof: %d\n", *fc->output_eof);
+ sot->buf = fc->outbuf;
+ sot->loaded = fc->out_loaded;
+ sot->input_eof = &fc->eof;
+
+ register_task(&sot->task);
+ register_task(&fc->task);
+ register_task(&sit->task);
+ s.default_timeout.tv_sec = 1;
+ s.default_timeout.tv_usec = 0;
+ PARA_EMERG_LOG("fc->output_eof: %d\n", *fc->output_eof);
+ ret = sched(&s);
out:
+ free(sit->buf);
+ close_filters(fc);
if (ret < 0)
PARA_EMERG_LOG("%s\n", PARA_STRERROR(-ret));
- close_filters(fci);
- return ret;
+ return ret < 0? EXIT_FAILURE : EXIT_SUCCESS;
}
* describes one running instance of a chain of filters
*
*/
-struct filter_chain_info {
-/**
- *
- *
- * the number of channels of the current stream
- *
- * Set by the decoding filter
- */
- unsigned int channels;
-/**
- *
- *
- * current samplerate in Hz
- *
- * Set by the decoding filter
- */
- unsigned int samplerate;
-/**
- *
- *
- * the list containing all filter nodes in this filter chain
- */
- struct list_head filters;
-/**
- *
- *
- * the input buffer of the filter chain
- *
- * This is set to point to the output buffer of the receiving application (the
- * buffer used to read from stdin for para_filter; the output buffer of the
- * current receiver for para_audiod)
- */
- char *inbuf;
-/**
- *
- *
- * the output buffer of the filter chain
- *
- * Points to the output buffer of the last filter in the filter chain
-**/
- char *outbuf;
-/**
- *
- *
- * pointer to variable containing the number of bytes loaded in the input buffer
- */
- size_t *in_loaded;
-/**
- *
- *
- * pointer to variable containing the number of bytes loaded in the output buffer
- */
- size_t *out_loaded;
-/**
- *
- *
- * non-zero if end of file was encountered
- */
- int *eof;
-/**
- *
- *
- * non-zero if an error occured
- */
- int error;
+struct filter_chain {
+ /**
+ *
+ *
+ * the number of channels of the current stream
+ *
+ * Set by the decoding filter
+ */
+ unsigned int channels;
+ /**
+ *
+ *
+ * current samplerate in Hz
+ *
+ * Set by the decoding filter
+ */
+ unsigned int samplerate;
+ /**
+ *
+ *
+ * the list containing all filter nodes in this filter chain
+ */
+ struct list_head filters;
+ /**
+ *
+ *
+ * the input buffer of the filter chain
+ *
+ * This is set to point to the output buffer of the receiving application (the
+ * buffer used to read from stdin for para_filter; the output buffer of the
+ * current receiver for para_audiod)
+ */
+ char *inbuf;
+ /**
+ *
+ *
+ * the output buffer of the filter chain
+ *
+ * Points to the output buffer of the last filter in the filter chain
+ **/
+ char *outbuf;
+ /**
+ *
+ *
+ * pointer to variable containing the number of bytes loaded in the input buffer
+ */
+ size_t *in_loaded;
+ /**
+ *
+ *
+ * pointer to variable containing the number of bytes loaded in the output buffer
+ */
+ size_t *out_loaded;
+ /** non-zero if this filter wont' produce any more output */
+ int eof;
+ /** pointer to the eof flag of the receiving application */
+ int *input_eof;
+ /** pointer to the eof flag of the writing application */
+ int *output_eof;
+ /** the task associated with the filter chain */
+ struct task task;
};
/**
*
* the filter chain this filter node belongs to
*/
- struct filter_chain_info *fci;
+ struct filter_chain *fc;
/**
*
*
};
-void close_filters(struct filter_chain_info *fci);
-int filter_io(struct filter_chain_info *fci);
+void close_filters(struct filter_chain *fc);
+int filter_io(struct filter_chain *fc);
void filter_init(struct filter *all_filters);
int check_filter_arg(char *filter_arg, void **conf);
int del_filter_callback(struct filter_callback *fcb);
+void filter_pre_select(struct sched *s, struct task *t);
/**
* the structure associated with a paraslash filter
#include "para.h"
#include "list.h"
+#include "sched.h"
+#include "fd.h"
#include "filter.h"
#include "error.h"
#include "string.h"
{
struct filter_callback *fcb, *tmp;
- list_for_each_entry_safe(fcb, tmp, &fn->callbacks, node)
+ list_for_each_entry_safe(fcb, tmp, &fn->callbacks, node) {
+ PARA_INFO_LOG("closing %s filter callback\n",
+ fn->filter->name);
close_filter_callback(fcb);
+ }
}
static void call_callbacks(struct filter_node *fn, char *inbuf, size_t inlen,
/**
* call the convert function of each filter
*
- * \param fci the filter chain containing the list of filter nodes.
- *
* This is the core function of the filter subsystem. It loops over the list of
- * filter nodes determined by \a fci and calls the filter's convert function if
+ * filter nodes determined by \a t and calls the filter's convert function if
* there is input available for the filter node in question. If the convert
* function consumed some or all of its input data, all registered input
* callbacks are called. Similarly, if a convert function produced output, all
* registerd output callbacks get called.
*
- * \return The sum of output bytes produced by the convert functions on success,
- * negative return value on errors.
+ * \return The sum of output bytes produced by the convert functions on
+ * success, negative return value on errors (the return value is stored in
+ * t->ret).
*
* \sa filter_node, filter#convert, filter_callback
*/
-int filter_io(struct filter_chain_info *fci)
+void filter_pre_select(__a_unused struct sched *s, struct task *t)
{
+ struct filter_chain *fc = t->private_data;
struct filter_node *fn;
char *ib;
size_t *loaded;
int conv, conv_total = 0;
+
+ t->ret = -E_FC_EOF;
+ if (*fc->output_eof)
+ goto err_out;
again:
- ib = fci->inbuf;
- loaded = fci->in_loaded;
+ ib = fc->inbuf;
+ loaded = fc->in_loaded;
conv = 0;
- list_for_each_entry(fn, &fci->filters, node) {
- int ret;
+ list_for_each_entry(fn, &fc->filters, node) {
if (*loaded && fn->loaded < fn->bufsize) {
size_t old_fn_loaded = fn->loaded;
- PARA_DEBUG_LOG("fc %p loaded: %zd, calling %s convert\n", fci, *loaded, fn->filter->name);
- ret = fn->filter->convert(ib, *loaded, fn);
- if (ret < 0) {
- if (!fci->error)
- fci->error = -ret;
- return ret;
- }
- call_callbacks(fn, ib, ret, fn->buf + old_fn_loaded, fn->loaded - old_fn_loaded);
- *loaded -= ret;
- conv += ret;
- if (*loaded && ret) {
- PARA_DEBUG_LOG("moving %zd bytes in input buffer for %s filter\n",
+ PARA_DEBUG_LOG("fc %p loaded: %zd, calling %s convert\n",
+ fc, *loaded, fn->filter->name);
+ t->ret = fn->filter->convert(ib, *loaded, fn);
+ if (t->ret < 0)
+ goto err_out;
+ call_callbacks(fn, ib, t->ret, fn->buf + old_fn_loaded,
+ fn->loaded - old_fn_loaded);
+ *loaded -= t->ret;
+ conv += t->ret;
+ if (*loaded && t->ret) {
+ PARA_INFO_LOG("moving %zd bytes in input buffer for %s filter\n",
*loaded, fn->filter->name);
- memmove(ib, ib + ret, *loaded);
+ memmove(ib, ib + t->ret, *loaded);
}
}
ib = fn->buf;
loaded = &fn->loaded;
}
-// PARA_DEBUG_LOG("loaded: %d\n", *loaded);
conv_total += conv;
+ PARA_INFO_LOG("eof (in/out/fc): %d/%d/%d out_loaded: %d, conv: %d, conv_total: %d\n", *fc->input_eof,
+ *fc->output_eof, fc->eof, *fc->out_loaded, conv, conv_total);
if (conv)
goto again;
- return conv_total;
+ t->ret = 1;
+ if (!*fc->input_eof)
+ return;
+ if (*fc->out_loaded)
+ return;
+ if (*fc->in_loaded && conv_total)
+ return;
+ t->ret = -E_FC_EOF;
+err_out:
+ fc->eof = 1;
}
/**
* close all filter nodes and its callbacks
*
- * \param fci the filter chain to close
+ * \param fc the filter chain to close
*
- * For each filter node determined by \a fci, call the close function of each
+ * For each filter node determined by \a fc, call the close function of each
* registered filter callback as well as the close function of the
* corresponding filter. Free all resources and destroy all callback lists and
* the filter node list.
*
* \sa filter::close, filter_callback::close
*/
-void close_filters(struct filter_chain_info *fci)
+void close_filters(struct filter_chain *fc)
{
struct filter_node *fn, *tmp;
- if (!fci)
+ if (!fc)
return;
- PARA_DEBUG_LOG("closing filter chain %p\n", fci);
- list_for_each_entry_safe(fn, tmp, &fci->filters, node) {
- PARA_NOTICE_LOG("closing %s filter callbacks (fci %p, fn %p)\n", fn->filter->name, fci, fn);
+ PARA_NOTICE_LOG("closing filter chain %p\n", fc);
+ list_for_each_entry_safe(fn, tmp, &fc->filters, node) {
close_callbacks(fn);
- PARA_NOTICE_LOG("closing %s filter (fci %p, fn %p)\n", fn->filter->name, fci, fn);
+ PARA_INFO_LOG("closing %s filter\n", fn->filter->name);
fn->filter->close(fn);
list_del(&fn->node);
free(fn);
#include "close_on_fork.h"
#include "grab_client.cmdline.h"
#include "list.h"
+#include "sched.h"
#include "filter.h"
#include "grab_client.h"
#include "audiod.h"
#include "para.h"
#include "http.h"
+#include "list.h"
+#include "sched.h"
#include "recv.h"
#include "http_recv.cmdline.h"
#include "error.h"
#include "net.h"
#include "string.h"
+#include "fd.h"
/** the output buffer size of the http receiver */
#define BUFSIZE (32 * 1024)
return ret;
}
-static int http_pre_select(struct receiver_node *rn, fd_set *rfds, fd_set *wfds,
- __a_unused struct timeval *timeout)
+static void http_recv_pre_select(struct sched *s, struct task *t)
{
+ struct receiver_node *rn = t->private_data;
struct private_http_recv_data *phd = rn->private_data;
+ t->ret = 1;
if (phd->status == HTTP_CONNECTED)
- FD_SET(phd->fd, wfds);
+ para_fd_set(phd->fd, &s->wfds, &s->max_fileno);
else
- FD_SET(phd->fd, rfds);
- return phd->fd;
+ para_fd_set(phd->fd, &s->rfds, &s->max_fileno);
}
-static int http_post_select(struct receiver_node *rn, int select_ret,
- fd_set *rfds, fd_set *wfds)
+
+static void http_recv_post_select(struct sched *s, struct task *t)
{
- int ret;
+ struct receiver_node *rn = t->private_data;
struct private_http_recv_data *phd = rn->private_data;
- if (!select_ret) /* we're not interested in timeouts */
- return 1;
+ t->ret = 1;
+ if (!s->select_ret) /* we're not interested in timeouts */
+ return;
if (phd->status == HTTP_CONNECTED) {
char *rq;
- if (!FD_ISSET(phd->fd, wfds))
- return 1; /* nothing to do */
+ if (!FD_ISSET(phd->fd, &s->wfds))
+ return; /* nothing to do */
rq = make_request_msg();
PARA_NOTICE_LOG("%s", "sending http request\n");
- ret = send_va_buffer(phd->fd, "%s", rq);
+ t->ret = send_va_buffer(phd->fd, "%s", rq);
free(rq);
- if (ret < 0)
- return E_SEND_HTTP_REQUEST;
- phd->status = HTTP_SENT_GET_REQUEST;
- return 1;
+ if (t->ret > 0)
+ phd->status = HTTP_SENT_GET_REQUEST;
+ return;
}
- if (!FD_ISSET(phd->fd, rfds))
- return 1; /* nothing to do */
+ if (!FD_ISSET(phd->fd, &s->rfds))
+ return; /* nothing to do */
if (phd->status == HTTP_SENT_GET_REQUEST) {
- ret = recv_pattern(phd->fd, HTTP_OK_MSG, MAXLINE);
- if (ret < 0)
- return -E_MISSING_OK;
+ t->ret = recv_pattern(phd->fd, HTTP_OK_MSG, MAXLINE);
+ if (t->ret < 0)
+ return;
PARA_NOTICE_LOG("%s", "received ok msg, streaming\n");
phd->status = HTTP_STREAMING;
- return 1;
+ return;
}
+ t->ret = -E_OVERRUN;
/* already streaming */
- if (rn->loaded >= BUFSIZE) {
- PARA_ERROR_LOG("%s", "buffer overrun\n");
- return -E_OVERRUN;
+ if (rn->loaded >= BUFSIZE)
+ return;
+ t->ret = recv_bin_buffer(phd->fd, rn->buf + rn->loaded,
+ BUFSIZE - rn->loaded);
+ if (t->ret <= 0) {
+ rn->eof = 1;
+ if (!t->ret)
+ t->ret = -E_HTTP_RECV_EOF;
+ return;
}
- ret = recv_bin_buffer(phd->fd, rn->buf + rn->loaded, BUFSIZE - rn->loaded);
- if (ret <= 0) {
- PARA_NOTICE_LOG("recv returned %d/%zd\n", ret, BUFSIZE - rn->loaded);
- return ret < 0? -E_HTTP_RECV_BUF : 0;
- }
- rn->loaded += ret;
- return 1;
+ rn->loaded += t->ret;
}
static void http_recv_close(struct receiver_node *rn)
rn->buf = para_calloc(BUFSIZE);
rn->private_data = para_calloc(sizeof(struct private_http_recv_data));
phd = rn->private_data;
+ PARA_NOTICE_LOG("phd = %p, rn = %p\n", phd, rn);
ret = -E_HOST_INFO;
if (!(he = get_host_info(conf->host_arg)))
goto err_out;
{
r->open = http_recv_open;
r->close = http_recv_close;
- r->pre_select = http_pre_select;
- r->post_select = http_post_select;
+ r->pre_select = http_recv_pre_select;
+ r->post_select = http_recv_post_select;
r->shutdown = http_shutdown;
r->parse_config = http_recv_parse_config;
}
n = list_entry(pos->member.next, typeof(*pos), member); \
&pos->member != (head); \
pos = n, n = list_entry(n->member.next, typeof(*n), member))
+/**
+ * list_for_each_entry_safe_reverse - iterate backwards over list of given type safe against
+ * removal of list entry
+ * @pos: the type * to use as a loop counter.
+ * @n: another type * to use as temporary storage
+ * @head: the head for your list.
+ * @member: the name of the list_struct within the struct.
+ */
+#define list_for_each_entry_safe_reverse(pos, n, head, member) \
+ for (pos = list_entry((head)->prev, typeof(*pos), member), \
+ n = list_entry(pos->member.prev, typeof(*pos), member); \
+ &pos->member != (head); \
+ pos = n, n = list_entry(n->member.prev, typeof(*n), member))
+
#endif /* _LIST_H */
/** \file mp3dec.c paraslash's mp3 decoder */
#include "para.h"
-
#include "list.h"
+#include "sched.h"
#include "filter.h"
#include "error.h"
#include <mad.h>
goto out;
return FRAME_HEADER_SIZE;
}
- fn->fci->samplerate = pmd->frame.header.samplerate;
- fn->fci->channels = MAD_NCHANNELS(&pmd->frame.header);
+ fn->fc->samplerate = pmd->frame.header.samplerate;
+ fn->fc->channels = MAD_NCHANNELS(&pmd->frame.header);
ret = mad_frame_decode(&pmd->frame, &pmd->stream);
if (ret) {
if (MAD_RECOVERABLE(pmd->stream.error) || pmd->stream.error == MAD_ERROR_BUFLEN)
out:
if (pmd->stream.next_frame) { /* we still have some data */
size_t off = pmd->stream.bufend - pmd->stream.next_frame;
- PARA_DEBUG_LOG("converted %zd, rate: %u, returning %zd\n", len - off,
- fn->fci->samplerate, copy - off);
+// PARA_DEBUG_LOG("converted %zd, rate: %u, returning %zd\n", len - off,
+// fn->fc->samplerate, copy - off);
return copy - off;
}
return copy;
#include "oggdec_filter.cmdline.h"
#include "list.h"
+#include "sched.h"
#include "filter.h"
#include "error.h"
#include "string.h"
size_t ret, have = pod->inbuf_len - pod->converted;
char *p = pod->inbuf + pod->converted;
- if (*fn->fci->eof)
- return 0;
// PARA_DEBUG_LOG("pod = %p\n", pod);
// PARA_DEBUG_LOG("vorbis requests %d bytes, have %d\n", size * nmemb, have);
if (pod->inbuf_len < size) {
+ if (*fn->fc->input_eof)
+ return 0;
errno = EAGAIN;
return -1;
}
if (!pod->vf) {
int ib = 1024 * conf->initial_buffer_arg; /* initial buffer */
- if (len <ib && !*fn->fci->eof && !fn->fci->error) {
+ if (len <ib && !*fn->fc->input_eof) {
PARA_INFO_LOG("initial input buffer %zd/%d, waiting for more data\n",
len, ib);
return 0;
return -E_OGGDEC_BADHEADER;
if (ret < 0)
return -E_OGGDEC_FAULT;
- fn->fci->channels = ov_info(pod->vf, 0)->channels;
- fn->fci->samplerate = ov_info(pod->vf, 0)->rate;
- PARA_NOTICE_LOG("%d channels, %d Hz\n", fn->fci->channels, fn->fci->samplerate);
+ fn->fc->channels = ov_info(pod->vf, 0)->channels;
+ fn->fc->samplerate = ov_info(pod->vf, 0)->rate;
+ PARA_NOTICE_LOG("%d channels, %d Hz\n", fn->fc->channels, fn->fc->samplerate);
}
again:
ret = ov_read(pod->vf, fn->buf + fn->loaded, fn->bufsize - fn->loaded,
if (ret < 0)
return -E_OGGDEC_BADLINK;
fn->loaded += ret;
- if (!*fn->fci->eof && !fn->fci->error && fn->loaded < fn->bufsize)
+ if (!*fn->fc->input_eof && fn->loaded < fn->bufsize)
goto again;
return pod->converted;
}
#include "para.h"
#include "ortp.h"
+#include "list.h"
+#include "sched.h"
#include "recv.h"
#include "ortp_recv.cmdline.h"
* \sa receiver receiver_node
*/
struct private_ortp_recv_data {
-
/**
*
*
uint32_t chunk_ts;
};
-static int ortp_recv_pre_select(struct receiver_node *rn,
- __a_unused fd_set *rfds, __a_unused fd_set *wfds,
- struct timeval *timeout)
+static void ortp_recv_pre_select(struct sched *s, struct task *t)
{
+ struct receiver_node *rn = t->private_data;
struct private_ortp_recv_data *pord = rn->private_data;
- struct timeval now, tmp;
+ struct timeval tmp;
- gettimeofday(&now, NULL);
- if (tv_diff(&now, &pord->next_chunk, &tmp) >= 0) {
+ if (tv_diff(&s->now, &pord->next_chunk, &tmp) >= 0) {
tmp.tv_sec = 0;
tmp.tv_usec = 1000;
}
- if (tv_diff(timeout, &tmp, NULL) > 0)
- *timeout = tmp;
- return -1; /* we did not modify the fd sets */
+ if (tv_diff(&s->timeout, &tmp, NULL) > 0)
+ s->timeout = tmp;
+ t->ret = 1;
}
static void compute_next_chunk(unsigned chunk_time,
pord->next_chunk.tv_usec);
}
-static int ortp_recv_post_select(struct receiver_node *rn,
- __a_unused int select_ret, __a_unused fd_set *rfds,
- __a_unused fd_set *wfds)
+static void ortp_recv_post_select(struct sched *s, struct task *t)
{
+ struct receiver_node *rn = t->private_data;
struct private_ortp_recv_data *pord = rn->private_data;
mblk_t *mp;
- int ret, packet_type, stream_type;
+ int packet_type, stream_type;
char tmpbuf[CHUNK_SIZE + 3];
- struct timeval now;
unsigned chunk_time;
- gettimeofday(&now, NULL);
-// PARA_DEBUG_LOG("rn: %p, pord: %p, session: %p\n", rn, pord, pord->session);
- if (pord->start.tv_sec) {
- struct timeval diff;
- if (tv_diff(&now, &pord->next_chunk, &diff) < 0)
- return 1;
- }
+// PARA_INFO_LOG("rn: %p, pord: %p, session: %p\n", rn, pord, pord->session);
+ t->ret = 1;
+ if (pord->start.tv_sec)
+ if (tv_diff(&s->now, &pord->next_chunk, NULL) < 0)
+ return;
mp = rtp_session_recvm_with_ts(pord->session, pord->timestamp);
if (!mp) {
struct timeval min_delay = {0, 100};
// PARA_INFO_LOG("nope, chunk_ts = %d, loaded: %d, bad: %d\n",
// pord->timestamp, rn->loaded, pord->c_bad);
pord->c_bad++;
+ t->ret = -E_TOO_MANY_BAD_CHUNKS;
if ((pord->c_bad > 5000 && pord->start.tv_sec) || pord->c_bad > 10000)
- return -E_TOO_MANY_BAD_CHUNKS;
- tv_add(&now, &min_delay, &pord->next_chunk);
- return 1;
+ return;
+ t->ret = 1;
+ tv_add(&s->now, &min_delay, &pord->next_chunk);
+ return;
}
/* okay, we have a chunk of data */
if (!pord->start.tv_sec)
- pord->start = now;
- ret = msg_to_buf(mp, tmpbuf, CHUNK_SIZE);
-// PARA_DEBUG_LOG("have it ts = %d, chunk_ts = %d, loaded: %d, "
-// "bad: %d, len: %d\n", pord->timestamp, pord->chunk_ts,
-// rn->loaded, pord->c_bad, ret);
- if (ret < ORTP_AUDIO_HEADER_LEN) {
- if (ret < 0)
- ret = -E_MSG_TO_BUF;
+ pord->start = s->now;
+ t->ret = msg_to_buf(mp, tmpbuf, CHUNK_SIZE);
+ if (t->ret < ORTP_AUDIO_HEADER_LEN) {
+ rn->eof = 1;
+ if (t->ret < 0)
+ t->ret = -E_MSG_TO_BUF;
else
- ret = 0;
+ t->ret = -E_ORTP_RECV_EOF;
goto err_out;
}
packet_type = READ_PACKET_TYPE(tmpbuf);
switch (packet_type) {
unsigned header_len, payload_len;
case ORTP_EOF:
- ret = 0;
+ rn->eof = 1;
+ t->ret = -E_ORTP_RECV_EOF;
goto err_out;
case ORTP_BOF:
- PARA_INFO_LOG("bof (%d)\n", ret);
+ PARA_INFO_LOG("bof (%d)\n", t->ret);
pord->have_header = 1;
/* fall through */
case ORTP_DATA:
if (!pord->have_header && stream_type)
/* can't use the data, wait for header */
goto success;
- if (ret + rn->loaded >= CHUNK_SIZE + ORTP_AUDIO_HEADER_LEN) {
- ret = -E_OVERRUN;
+ if (t->ret + rn->loaded >= CHUNK_SIZE + ORTP_AUDIO_HEADER_LEN) {
+ t->ret = -E_OVERRUN;
goto err_out;
}
- if (ret > ORTP_AUDIO_HEADER_LEN) {
+ if (t->ret > ORTP_AUDIO_HEADER_LEN) {
memcpy(rn->buf + rn->loaded, tmpbuf + ORTP_AUDIO_HEADER_LEN,
- ret - ORTP_AUDIO_HEADER_LEN);
- rn->loaded += ret - ORTP_AUDIO_HEADER_LEN;
+ t->ret - ORTP_AUDIO_HEADER_LEN);
+ rn->loaded += t->ret - ORTP_AUDIO_HEADER_LEN;
}
goto success;
case ORTP_HEADER:
header_len = READ_HEADER_LEN(tmpbuf);
PARA_DEBUG_LOG("header packet (%d bytes), header len: %d\n",
- ret, header_len);
+ t->ret, header_len);
if (!pord->have_header) {
pord->have_header = 1;
memcpy(rn->buf, tmpbuf + ORTP_AUDIO_HEADER_LEN,
- ret - ORTP_AUDIO_HEADER_LEN);
- rn->loaded = ret - ORTP_AUDIO_HEADER_LEN;
+ t->ret - ORTP_AUDIO_HEADER_LEN);
+ rn->loaded = t->ret - ORTP_AUDIO_HEADER_LEN;
goto success;
}
- if (header_len + ORTP_AUDIO_HEADER_LEN > ret) {
- ret = -E_INVALID_HEADER;
+ if (header_len + ORTP_AUDIO_HEADER_LEN > t->ret) {
+ t->ret = -E_INVALID_HEADER;
goto err_out;
}
- payload_len = ret - ORTP_AUDIO_HEADER_LEN - header_len;
+ payload_len = t->ret - ORTP_AUDIO_HEADER_LEN - header_len;
// PARA_INFO_LOG("len: %d header_len: %d, payload_len: %d, loaded: %d\n", ret,
// header_len, payload_len, rn->loaded);
if (rn->loaded + payload_len > CHUNK_SIZE) {
- ret = -E_OVERRUN;
+ t->ret = -E_OVERRUN;
goto err_out;
}
if (payload_len)
memcpy(rn->buf + rn->loaded, tmpbuf
- + (ret - payload_len), payload_len);
+ + (t->ret - payload_len), payload_len);
rn->loaded += payload_len;
goto success;
}
success:
+ t->ret = 1;
freemsg(mp);
if (pord->c_bad) {
pord->c_bad = 0;
- pord->next_chunk = now;
+ pord->next_chunk = s->now;
}
compute_next_chunk(chunk_time, pord);
- return 1;
+ return;
err_out:
+ rn->eof = 1;
freemsg(mp);
- return ret;
}
static void ortp_shutdown(void)
*/
#include "para.h"
+#include "list.h"
+#include "sched.h"
#include "recv.h"
#include "recv.cmdline.h"
#include "fd.h"
#include "error.h"
+#include "stdout.h"
struct gengetopt_args_info conf;
return check_receiver_arg(conf.receiver_arg, receiver_num);
}
+void rn_event_handler(struct task *t)
+{
+ PARA_NOTICE_LOG("%s\n", PARA_STRERROR(-t->ret));
+ unregister_task(t);
+}
+
int main(int argc, char *argv[])
{
- int ret, eof = 0, max, r_opened = 0, receiver_num;
- struct timeval timeout;
+ int ret, r_opened = 0, receiver_num;
struct receiver *r = NULL;
- fd_set rfds, wfds;
struct receiver_node rn;
+ struct stdout_task sot;
+ struct sched s;
+
+ init_sched();
+ s.default_timeout.tv_sec = 1;
+ s.default_timeout.tv_usec = 0;
memset(&rn, 0, sizeof(struct receiver_node));
for (ret = 0; receivers[ret].name; ret++)
if (ret < 0)
goto out;
r_opened = 1;
-recv:
- FD_ZERO(&rfds);
- FD_ZERO(&wfds);
- timeout.tv_sec = 0;
- timeout.tv_usec = 999 * 1000;
- max = -1;
- ret = r->pre_select(&rn, &rfds, &wfds, &timeout);
- max = PARA_MAX(max, ret);
- PARA_DEBUG_LOG("timeout: %lums, max: %d\n", tv2ms(&timeout), max);
- ret = para_select(max + 1, &rfds, &wfds, &timeout);
- if (ret < 0) {
- ret = -E_RECV_SELECT;
- goto out;
- }
- ret = r->post_select(&rn, ret, &rfds, &wfds);
- if (ret < 0)
- goto out;
- if (!ret)
- eof = 1;
- if (!rn.loaded) {
- if (eof)
- goto out;
- goto recv;
- }
- ret = write(STDOUT_FILENO, rn.buf, rn.loaded);
- PARA_DEBUG_LOG("wrote %d/%zd\n", ret, rn.loaded);
- if (ret < 0) {
- ret = -E_WRITE_STDOUT;
- goto out;
- }
- if (ret != rn.loaded) {
- PARA_INFO_LOG("short write %d/%zd\n", ret, rn.loaded);
- memmove(rn.buf, rn.buf + ret, rn.loaded - ret);
- }
- rn.loaded -= ret;
- if (rn.loaded || !eof)
- goto recv;
+ stdout_set_defaults(&sot);
+ sot.buf = rn.buf;
+ sot.loaded = &rn.loaded;
+ sot.input_eof = &rn.eof;
+ register_task(&sot.task);
+
+ rn.task.private_data = &rn;
+ rn.task.pre_select = r->pre_select;
+ rn.task.post_select = r->post_select;
+ rn.task.event_handler = rn_event_handler;
+ rn.task.flags = 0;
+ sprintf(rn.task.status, "receiver node");
+ register_task(&rn.task);
+
+
+ ret = sched(&s);
out:
if (r_opened)
r->close(&rn);
int eof;
/** pointer to the configuration data for this instance */
void *conf;
+ /** the task associated with this instance */
+ struct task task;
};
/**
*
* The pre_select function gets called from the driving application before
* entering its select loop. The receiver may use this hook to add any file
- * descriptors to \a rfds and \a wfds in order to check the result later in the
- * post_select hook.
+ * descriptors to the sets of file descriptors given by \a s.
*
- * \a timeout is a value-result parameter, initially containing the timeout for
- * select() which was set by the application or by another receiver node. If
- * the receiver wants its pre_select function to be called at some earlier time
- * than the time determined by \a timeout, it may set \a timeout to an
- * appropriate smaller value. However, it must never increase this timeout.
*
- * This function must return the highest-numbered descriptor it wants to being
- * checked, or -1 if no file descriptors should be checked for this run.
- *
- * \sa select(2), receiver_node:private_data, time.c
+ * \sa select(2), time.c struct task, struct sched
*/
- int (*pre_select)(struct receiver_node *rn, fd_set *rfds,
- fd_set *wfds, struct timeval *timeout);
+ void (*pre_select)(struct sched *s, struct task *t);
/**
*
*
* evaluate the result from select()
*
- * If the call to select() was succesful, this hook gets called. It should
- * check all file descriptors which were added to any of the the fd sets during
- * the previous call to pre_select. According to the result, it may then use
- * any non-blocking I/O to establish a connection or to receive the audio data.
+ * This hook gets called after the call to select(). It should check all file
+ * descriptors which were added to any of the the fd sets during the previous
+ * call to pre_select. According to the result, it may then use any
+ * non-blocking I/O to establish a connection or to receive the audio data.
*
- * A negative return value is interpreted as an error.
*
* \sa select(2), struct receiver
*/
- int (*post_select)(struct receiver_node *rn, int select_ret,
- fd_set *rfds, fd_set *wfds);
+ void (*post_select)(struct sched *s, struct task *t);
};
#include "para.h"
+#include "list.h"
+#include "sched.h"
#include "recv.h"
#include "string.h"
--- /dev/null
+#include <sys/time.h>
+#include "para.h"
+#include "ipc.h"
+#include "fd.h"
+#include "list.h"
+#include "sched.h"
+#include "string.h"
+#include "error.h"
+
+struct list_head pre_select_list;
+struct list_head post_select_list;
+
+static void sched_preselect(struct sched *s)
+{
+ struct task *t, *tmp;
+again:
+ list_for_each_entry_safe(t, tmp, &pre_select_list, pre_select_node) {
+ t->pre_select(s, t);
+ if (t->ret > 0 || !t->event_handler)
+ continue;
+ t->event_handler(t);
+ goto again;
+ }
+}
+
+static void sched_post_select(struct sched *s)
+{
+ struct task *t, *tmp;
+
+ list_for_each_entry_safe(t, tmp, &post_select_list, post_select_node) {
+ t->post_select(s, t);
+ if (t->ret > 0 || !t->event_handler)
+ continue;
+ t->event_handler(t);
+ }
+}
+
+int sched(struct sched *s)
+{
+
+ gettimeofday(&s->now, NULL);
+again:
+ FD_ZERO(&s->rfds);
+ FD_ZERO(&s->wfds);
+ s->timeout = s->default_timeout;
+ s->max_fileno = -1;
+ sched_preselect(s);
+ s->select_ret = para_select(s->max_fileno + 1, &s->rfds,
+ &s->wfds, &s->timeout);
+ if (s->select_ret < 0)
+ return s->select_ret;
+ gettimeofday(&s->now, NULL);
+ sched_post_select(s);
+ if (list_empty(&pre_select_list) && list_empty(&post_select_list))
+ return 0;
+ goto again;
+}
+
+void *register_task(struct task *t)
+{
+ PARA_INFO_LOG("registering %s (%p)\n", t->status, t);
+ if (t->pre_select) {
+ PARA_DEBUG_LOG("pre_select: %p\n", &t->pre_select);
+ if (t->flags & PRE_ADD_TAIL)
+ list_add_tail(&t->pre_select_node, &pre_select_list);
+ else
+ list_add(&t->pre_select_node, &pre_select_list);
+ }
+ if (t->post_select) {
+ PARA_DEBUG_LOG("post_select: %p\n", &t->pre_select);
+ if (t->flags & POST_ADD_TAIL)
+ list_add_tail(&t->post_select_node, &post_select_list);
+ else
+ list_add(&t->post_select_node, &post_select_list);
+ }
+ return t;
+}
+
+void unregister_task(struct task *t)
+{
+ PARA_INFO_LOG("unregistering %s (%p)\n", t->status, t);
+ if (t->pre_select)
+ list_del(&t->pre_select_node);
+ if (t->post_select)
+ list_del(&t->post_select_node);
+};
+
+void init_sched(void)
+{
+ INIT_LIST_HEAD(&pre_select_list);
+ INIT_LIST_HEAD(&post_select_list);
+};
+
+void sched_shutdown(void)
+{
+ struct task *t, *tmp;
+
+ list_for_each_entry_safe(t, tmp, &pre_select_list, pre_select_node)
+ unregister_task(t);
+ /* remove tasks which do not have a pre_select hook */
+ list_for_each_entry_safe(t, tmp, &post_select_list, post_select_node)
+ unregister_task(t);
+};
+
+
+//char *get_tast_list();
--- /dev/null
+struct sched {
+ struct timeval now, timeout;
+ int max_fileno;
+ fd_set rfds, wfds;
+ int select_ret;
+ struct timeval default_timeout;
+};
+
+struct task {
+ void *private_data;
+ unsigned flags;
+ int ret;
+ void (*pre_select)(struct sched *s, struct task *t);
+ void (*post_select)(struct sched *s, struct task *t);
+ void (*event_handler)(struct task *t);
+ struct list_head pre_select_node;
+ struct list_head post_select_node;
+ char status[MAXLINE];
+};
+
+enum task_flags {
+ PRE_ADD_TAIL = 1,
+ POST_ADD_TAIL = 2,
+};
+
+void *register_task(struct task *t);
+void unregister_task(struct task *t);
+int sched(struct sched *s);
+void init_sched(void);
#include "db.h"
#include "server.h"
#include "afs.h"
-#include "afh.h" /* FIXME */
#include "config.h"
#include "close_on_fork.h"
#include "send.h"
--- /dev/null
+#include "para.h"
+#include "string.h"
+#include "list.h"
+#include "sched.h"
+#include "fd.h"
+#include "error.h"
+#include "stdin.h"
+
+void stdin_pre_select(struct sched *s, struct task *t)
+{
+ struct stdin_task *sit = t->private_data;
+ if (sit->loaded < sit->bufsize)
+ para_fd_set(STDIN_FILENO, &s->rfds, &s->max_fileno);
+ t->ret = 1; /* success */
+}
+
+static void stdin_default_event_handler(struct task *t)
+{
+ PARA_NOTICE_LOG("%p: %s\n", t, PARA_STRERROR(-t->ret));
+ unregister_task(t);
+}
+
+void stdin_post_select(struct sched *s, struct task *t)
+{
+ struct stdin_task *sit = t->private_data;
+ ssize_t ret;
+
+ t->ret = 1;
+ if (sit->loaded >= sit->bufsize)
+ return;
+ if (!FD_ISSET(STDIN_FILENO, &s->rfds))
+ return;
+ ret = read(STDIN_FILENO, sit->buf + sit->loaded, sit->bufsize - sit->loaded);
+ if (ret < 0)
+ t->ret = -E_STDIN_READ;
+ else if (ret > 0) {
+ sit->loaded += ret;
+ t->ret = ret;
+ } else
+ t->ret = -E_STDIN_EOF;
+ if (t->ret < 0)
+ sit->eof = 1;
+}
+
+void stdin_set_defaults(struct stdin_task *sit)
+{
+ sit->bufsize = 16 * 1024,
+ sit->loaded = 0,
+ sit->eof = 0,
+ sit->task.flags = 0,
+ sit->task.pre_select = stdin_pre_select;
+ sit->task.post_select = stdin_post_select;
+ sit->task.event_handler = stdin_default_event_handler;
+ sit->task.private_data = sit;
+ sprintf(sit->task.status, "stdin reader");
+}
--- /dev/null
+struct stdin_task {
+ char *buf;
+ size_t bufsize;
+ size_t loaded;
+ struct task task;
+ int eof;
+};
+
+void stdin_pre_select(struct sched *s, struct task *t);
+void stdin_post_select(struct sched *s, struct task *t);
+void stdin_set_defaults(struct stdin_task *sit);
--- /dev/null
+#include "para.h"
+#include "string.h"
+#include "list.h"
+#include "sched.h"
+#include "fd.h"
+#include "error.h"
+#include "stdout.h"
+
+void stdout_pre_select(struct sched *s, struct task *t)
+{
+ struct stdout_task *sot = t->private_data;
+
+ t->ret = 1;
+ sot->check_fd = 0;
+ if (!*sot->loaded)
+ return;
+ sot->check_fd = 1;
+ para_fd_set(STDOUT_FILENO, &s->wfds, &s->max_fileno);
+}
+
+void stdout_post_select(struct sched *s, struct task *t)
+{
+ struct stdout_task *sot = t->private_data;
+ ssize_t ret;
+
+ t->ret = 1;
+ if (!sot->check_fd) {
+ if (*sot->input_eof)
+ t->ret = -E_STDOUT_EOF;
+ return;
+ }
+ if (!FD_ISSET(STDOUT_FILENO, &s->wfds))
+ return;
+ t->ret = -E_STDOUT_WRITE;
+ ret = write(STDOUT_FILENO, sot->buf, *sot->loaded);
+ if (ret <= 0)
+ return;
+ *sot->loaded -= ret;
+ t->ret = 1;
+}
+
+void stdout_default_event_handler(struct task *t)
+{
+ PARA_NOTICE_LOG("%p: %s\n", t, PARA_STRERROR(-t->ret));
+ unregister_task(t);
+}
+
+
+void stdout_set_defaults(struct stdout_task *sot)
+{
+ sot->task.private_data = sot;
+ sot->task.pre_select = stdout_pre_select;
+ sot->task.post_select = stdout_post_select;
+ sot->task.event_handler = stdout_default_event_handler;
+ sot->task.flags = 0;
+ sot->eof = 0;
+ sprintf(sot->task.status, "stdout writer");
+}
--- /dev/null
+/** \file stdout.h common code for uitlities that write to stdout */
+struct stdout_task {
+ char *buf;
+ size_t *bufsize;
+ size_t *loaded;
+ int *input_eof;
+ int eof;
+ struct task task;
+ int check_fd;
+};
+
+void stdout_pre_select(struct sched *s, struct task *t);
+void stdout_post_select(struct sched *s, struct task *t);
+void stdout_set_defaults(struct stdout_task *sot);
#include "para.h"
#include "list.h"
+#include "sched.h"
#include "filter.h"
#include "string.h"
int *bof = fn->private_data;
if (*bof) {
- make_wav_header(fn->fci->channels, fn->fci->samplerate, fn);
+ make_wav_header(fn->fc->channels, fn->fc->samplerate, fn);
fn->loaded = WAV_HEADER_LEN;
*bof = 0;
// return 0;
fn->private_data = para_malloc(sizeof(int));
bof = fn->private_data;
*bof = 1;
- PARA_DEBUG_LOG("wav filter node: %p, output buffer: %p, loaded: %zd\n",
+ PARA_INFO_LOG("wav filter node: %p, output buffer: %p, loaded: %zd\n",
fn, fn->buf, fn->loaded);
}
#include "para.h"
#include "string.h"
#include "write.cmdline.h"
+#include "list.h"
+#include "sched.h"
+#include "stdin.h"
#include "write.h"
#include "write_common.h"
#include "fd.h"
-
-#include <sys/time.h> /* gettimeofday */
-
#include "error.h"
-#define WAV_HEADER_LEN 44
-
-static char *audiobuf;
-static struct timeval *start_time;
-struct gengetopt_args_info conf;
-
INIT_WRITE_ERRLISTS;
-void para_log(int ll, const char* fmt,...)
-{
- va_list argp;
+struct check_wav_task {
+ char *buf;
+ size_t *loaded;
+ int *eof;
+ unsigned channels;
+ unsigned sample_rate;
+ struct task task;
+};
+
+struct initial_delay_task {
+ struct timeval start_time;
+ struct task task;
+};
+
+static struct gengetopt_args_info conf;
+struct stdin_task sit;
+struct check_wav_task cwt;
+struct initial_delay_task idt;
+static struct writer_node_group *wng;
- if (ll < conf.loglevel_arg)
- return;
- va_start(argp, fmt);
- vfprintf(stderr, fmt, argp);
- va_end(argp);
-}
+#define WAV_HEADER_LEN 44
/**
- * read WAV_HEADER_LEN bytes from stdin to audio buffer
+ * test if audio buffer contains a valid wave header
*
- * \return -E_READ_HDR on errors and on eof before WAV_HEADER_LEN could be
- * read. A positive return value indicates success.
+ * \return If not, return -E_NO_WAV_HEADER, otherwise, return zero. If
+ * there is less than WAV_HEADER_LEN bytes awailable, return one.
*/
-static int read_wav_header(void)
+static void check_wav_pre_select(__a_unused struct sched *s, struct task *t)
{
- ssize_t ret, count = 0;
+ struct check_wav_task *cwt = t->private_data;
+ unsigned char *a;
- while (count < WAV_HEADER_LEN) {
- ret = read(STDIN_FILENO, audiobuf + count, WAV_HEADER_LEN - count);
- if (ret <= 0)
- return -E_READ_HDR;
- count += ret;
+ if (*cwt->loaded < WAV_HEADER_LEN) {
+ t->ret = *cwt->eof? -E_PREMATURE_END : 1;
+ return;
}
- return 1;
+ cwt->channels = 2;
+ cwt->sample_rate = 44100;
+ a = (unsigned char*)cwt->buf;
+ t->ret = -E_NO_WAV_HEADER;
+ if (a[0] != 'R' || a[1] != 'I' || a[2] != 'F' || a[3] != 'F')
+ return;
+ cwt->channels = (unsigned) a[22];
+ cwt->sample_rate = a[24] + (a[25] << 8) + (a[26] << 16) + (a[27] << 24);
+ *cwt->loaded -= WAV_HEADER_LEN;
+ memmove(cwt->buf, cwt->buf + WAV_HEADER_LEN, *cwt->loaded);
+ t->ret = -E_WAV_HEADER_SUCCESS;
+ PARA_INFO_LOG("channels: %d, sample_rate: %d\n", cwt->channels, cwt->sample_rate);
}
-/**
- * check if current time is later than start_time
- * \param diff pointer to write remaining time to
- *
- * If start_time was not given, or current time is later than given
- * start_time, return 0. Otherwise, return 1 and write the time
- * difference between current time and start_time to diff. diff may be
- * NULL.
- *
- */
-static int start_time_in_future(struct timeval *diff)
+static void initial_delay_pre_select(struct sched *s, struct task *t)
{
- struct timeval now;
+ struct initial_delay_task *idt = t->private_data;
+ struct timeval diff;
- if (!conf.start_time_given)
- return 0;
- gettimeofday(&now, NULL);
- return tv_diff(start_time, &now, diff) > 0? 1 : 0;
-}
-
-/**
- * sleep until time given at command line
- *
- * This is called if the initial buffer is filled. It returns
- * immediately if no start_time was given at the command line
- * or if the given start time is in the past.
- *
- */
-static void do_initial_delay(struct timeval *delay)
-{
- do
- para_select(1, NULL, NULL, delay);
- while (start_time_in_future(delay));
+ t->ret = -E_NO_DELAY;
+ if (!idt->start_time.tv_sec && !idt->start_time.tv_usec)
+ return;
+ t->ret = -E_DELAY_TIMEOUT;
+ if (tv_diff(&s->now, &idt->start_time, &diff) > 0)
+ return;
+ t->ret = 1;
+ if (tv_diff(&s->timeout , &diff, NULL) > 0)
+ s->timeout = diff;
}
-static int read_stdin(char *buf, size_t bytes_to_load, size_t *loaded)
-{
- ssize_t ret;
-
- while (*loaded < bytes_to_load) {
- ret = read(STDIN_FILENO, buf + *loaded, bytes_to_load - *loaded);
- if (ret <= 0) {
- if (ret < 0)
- ret = -E_READ_STDIN;
- return ret;
- }
- *loaded += ret;
- }
- return 1;
-}
-/**
- * play raw pcm data
- * \param loaded number of bytes already loaded
- *
- * If start_time was given, prebuffer data until buffer is full or
- * start_time is reached. In any case, do not start playing before
- * start_time.
- *
- * \return positive on success, negative on errors.
- */
-static int pcm_write(struct writer_node_group *wng, size_t loaded)
+void para_log(int ll, const char* fmt,...)
{
- size_t bufsize, prebuf_size, bytes_to_load;
- struct timeval delay;
- int ret, not_yet_started = 1;
+ va_list argp;
- ret = wng_open(wng);
- if (ret < 0)
- goto out;
- PARA_INFO_LOG("max chunk_bytes: %zd\n", wng->max_chunk_bytes);
- bufsize = (conf.bufsize_arg * 1024 / wng->max_chunk_bytes)
- * wng->max_chunk_bytes;
- audiobuf = para_realloc(audiobuf, bufsize);
- prebuf_size = conf.prebuffer_arg * bufsize / 100;
- bytes_to_load = PARA_MAX(prebuf_size, wng->max_chunk_bytes);
- ret = read_stdin(audiobuf, bytes_to_load, &loaded);
- if (ret <= 0 || loaded < bytes_to_load) {
- if (ret >= 0)
- ret = -E_PREMATURE_END;
- goto out;
- }
- if (not_yet_started && start_time && start_time_in_future(&delay))
- do_initial_delay(&delay);
- not_yet_started = 0;
-again:
- ret = wng_write(wng, audiobuf, &loaded);
- if (ret <= 0)
- goto out;
- ret = -E_WRITE_OVERRUN;
- if (loaded >= bufsize)
- goto out;
- bytes_to_load = PARA_MIN(wng->max_chunk_bytes, bufsize);
- ret = read_stdin(audiobuf, bytes_to_load, &loaded);
- if (ret < 0)
- goto out;
- if (!ret)
- wng->eof = 1;
- goto again;
-out:
- wng_close(wng);
- return ret;
+ if (ll < conf.loglevel_arg)
+ return;
+ va_start(argp, fmt);
+ vfprintf(stderr, fmt, argp);
+ va_end(argp);
}
static struct writer_node_group *check_args(void)
{
int i, ret = -E_WRITE_SYNTAX;
- static struct timeval tv;
struct writer_node_group *wng = NULL;
if (conf.list_writers_given) {
free(msg);
exit(EXIT_SUCCESS);
}
- if (conf.prebuffer_arg < 0 || conf.prebuffer_arg > 100)
- goto out;
+// if (conf.prebuffer_arg < 0 || conf.prebuffer_arg > 100)
+// goto out;
if (conf.start_time_given) {
long unsigned sec, usec;
if (sscanf(conf.start_time_arg, "%lu:%lu",
&sec, &usec) != 2)
goto out;
- tv.tv_sec = sec;
- tv.tv_usec = usec;
- start_time = &tv;
+ idt.start_time.tv_sec = sec;
+ idt.start_time.tv_usec = usec;
}
if (!conf.writer_given) {
wng = setup_default_wng();
goto out;
}
wng = wng_new(conf.writer_given);
+ ret = -E_WRITE_SYNTAX;
for (i = 0; i < conf.writer_given; i++) {
- ret = check_writer_arg(conf.writer_arg[i]);
- if (ret < 0)
+ int writer_num;
+ wng->writer_nodes[i].conf = check_writer_arg(
+ conf.writer_arg[i], &writer_num);
+ if (!wng->writer_nodes[i].conf)
goto out;
- wng->writer_nodes[i].writer = &writers[ret];
+ wng->writer_nodes[i].writer = &writers[writer_num];
+ sprintf(wng->writer_nodes[i].task.status, "%s",
+ writer_names[writer_num]);
}
ret = 1;
out:
return NULL;
}
-/**
- * test if audio buffer contains a valid wave header
- *
- * \return If not, return 0, otherwise, store number of channels and sample rate
- * in struct conf and return WAV_HEADER_LEN.
- */
-static size_t check_wave(void)
+static void wng_event_handler(struct task *t)
{
- unsigned char *a = (unsigned char*)audiobuf;
- if (a[0] != 'R' || a[1] != 'I' || a[2] != 'F' || a[3] != 'F')
- return WAV_HEADER_LEN;
- conf.channels_arg = (unsigned) a[22];
- conf.sample_rate_arg = a[24] + (a[25] << 8) + (a[26] << 16) + (a[27] << 24);
- return 0;
+ struct writer_node_group *g = t->private_data;
+
+ PARA_INFO_LOG("%s\n", PARA_STRERROR(-t->ret));
+ unregister_task(t);
+ wng_close(g);
+ wng_destroy(g);
+}
+
+
+static void idt_event_handler(struct task *t)
+{
+ int ret;
+
+ PARA_INFO_LOG("%s\n", PARA_STRERROR(-t->ret));
+ unregister_task(t);
+ wng->buf = sit.buf;
+ wng->loaded = &sit.loaded;
+ wng->input_eof = &sit.eof;
+ wng->task.event_handler = wng_event_handler;
+ ret = wng_open(wng);
+ if (ret < 0) {
+ PARA_ERROR_LOG("%s\n", PARA_STRERROR(-ret));
+ exit(EXIT_FAILURE);
+ }
+}
+
+static void cwt_event_handler(struct task *t)
+{
+ if (t->ret != -E_NO_WAV_HEADER && t->ret != -E_WAV_HEADER_SUCCESS) {
+ PARA_ERROR_LOG("%s\n", PARA_STRERROR(-t->ret));
+ exit(EXIT_FAILURE);
+ }
+ PARA_INFO_LOG("%s\n", PARA_STRERROR(-t->ret));
+ unregister_task(t);
+// if (t->ret == -E_WAV_HEADER_SUCCESS) {
+// conf.channels_arg = cwt.channels;
+// conf.sample_rate_arg = cwt.sample_rate;
+// }
+ idt.task.pre_select = initial_delay_pre_select;
+ idt.task.private_data = &idt;
+ idt.task.event_handler = idt_event_handler;
+ sprintf(idt.task.status, "initial_delay");
+ register_task(&idt.task);
}
int main(int argc, char *argv[])
{
int ret = -E_WRITE_SYNTAX;
- struct writer_node_group *wng = NULL;
+ struct sched s;
cmdline_parser(argc, argv, &conf);
+ init_supported_writers();
+ init_sched();
+
wng = check_args();
if (!wng)
goto out;
- init_supported_writers();
- audiobuf = para_malloc(WAV_HEADER_LEN);
- ret = read_wav_header();
- if (ret < 0)
- goto out;
- ret = pcm_write(wng, check_wave());
+ stdin_set_defaults(&sit);
+ if (conf.bufsize_given)
+ sit.bufsize = conf.bufsize_arg;
+ sit.buf = para_malloc(sit.bufsize),
+ register_task(&sit.task);
+
+ cwt.task.pre_select = check_wav_pre_select;
+ cwt.task.private_data = &cwt;
+ cwt.task.event_handler = cwt_event_handler;
+ cwt.buf = sit.buf;
+ cwt.loaded = &sit.loaded;
+ cwt.eof = &sit.eof;
+ sprintf(cwt.task.status, "check wav");
+ register_task(&cwt.task);
+
+ s.default_timeout.tv_sec = 1;
+ s.default_timeout.tv_usec = 0;
+ ret = sched(&s);
+
out:
- wng_destroy(wng);
- free(audiobuf);
- if (ret < 0)
+ if (ret < 0) {
PARA_ERROR_LOG("%s\n", PARA_STRERROR(-ret));
+ ret = EXIT_FAILURE;
+ } else
+ ret = EXIT_SUCCESS;
return ret;
}
default="64"
optional
-option "prebuffer" p
-#~~~~~~~~~~~~~~~~~~~
-"delay playback until buffer is filled"
-
- int typestr="percent"
- default="100"
- optional
-
option "writer" w
#~~~~~~~~~~~~~~~~
string typestr="timeval"
optional
-
-
-section "alsa options"
-######################
-
-option "device" d
-#~~~~~~~~~~~~~~~~
-"set PCM device"
- string typestr="device"
- default="plughw:0,0"
- optional
-
-option "channels" c
-#~~~~~~~~~~~~~~~~~~
-"number of channels (only neccessary for raw
-audio)"
-
- int typestr="num"
- default="2"
- optional
-
-option "sample_rate" s
-#~~~~~~~~~~~~~~~~~~~~~
-
-"force given sample rate (only neccessary for
-raw audio)"
-
- int typestr="num"
- default="44100"
- optional
-
* decbribes one running instance of a writer
*/
struct writer_node {
-/** points to the writer structure associated with this node */
+ /** points to the writer structure associated with this node */
struct writer *writer;
-/** writer-specific data */
+ /** writer-specific data */
void *private_data;
-/** send that many bytes in one go */
+ /** send that many bytes in one go */
int chunk_bytes;
+ struct task task;
+ struct writer_node_group *wng;
+ /** the writer-specific configuration of this node */
+ void *conf;
};
/** describes one supported writer */
*
*/
void (*init)(struct writer *w);
+/**
+ *
+ *
+ * the command line parser of the writer
+ *
+ * It should check whether the command line options given by \a options are
+ * valid. On success, it should return a pointer to the writer-specific
+ * configuration data determined by \a options. Note that this might be called
+ * more than once with different values of \a options.
+ *
+ */
+ void * (*parse_config)(char *options);
/**
*
* open one instance of this writer
*
*/
int (*write)(char *data, size_t nbytes, struct writer_node *);
+void (*pre_select)(struct sched *s, struct task *t);
+void (*post_select)(struct sched *s, struct task *t);
/**
* close one instance of the writer
*
* describes a set of writer nodes that all write the same stream.
*/
struct writer_node_group {
-/** number of nodes belonging to this group */
-unsigned num_writers;
-/** array of pointers to the corresponding writer nodes */
-struct writer_node *writer_nodes;
-/** keeps track of how many bytes have been written by each node */
-int *written;
-/** the maximum of the chunk_bytes values of the writer nodes in this group */
-size_t max_chunk_bytes;
-/** non-zero if end of file was encountered */
-int eof;
+ /** number of nodes belonging to this group */
+ unsigned num_writers;
+ /** array of pointers to the corresponding writer nodes */
+ struct writer_node *writer_nodes;
+ /** keeps track of how many bytes have been written by each node */
+ int *written;
+ /** the maximum of the chunk_bytes values of the writer nodes in this group */
+ size_t max_chunk_bytes;
+ /** non-zero if end of file was encountered */
+ int *input_eof;
+ int eof;
+ char *buf;
+ size_t *loaded;
+ struct task task;
};
/** loop over each writer node in a writer group */
#include "para.h"
#include "string.h"
+#include "list.h"
+#include "sched.h"
#include "write.h"
#include "error.h"
const char *writer_names[] ={WRITER_NAMES};
struct writer writers[NUM_SUPPORTED_WRITERS] = {WRITER_ARRAY};
-int wng_write(struct writer_node_group *g, char *buf, size_t *loaded)
+static void wng_post_select(__a_unused struct sched *s, struct task *t)
{
- int ret, i, need_more_writes = 1;
+ struct writer_node_group *g = t->private_data;
+ int i;
size_t min_written = 0;
- while (need_more_writes) {
- need_more_writes = 0;
- FOR_EACH_WRITER_NODE(i, g) {
- size_t w = g->written[i];
- int bytes_to_write;
- struct writer_node *wn = &g->writer_nodes[i];
- if (!i)
- min_written = w;
- else
- min_written = PARA_MIN(min_written, w);
- if (w == *loaded)
- continue;
- if (!g->eof && (*loaded < wn->chunk_bytes + w))
- continue;
- bytes_to_write = PARA_MIN(wn->chunk_bytes,
- *loaded - w);
- ret = wn->writer->write(buf + w, bytes_to_write, wn);
- if (ret < 0)
- goto out;
- if (ret != bytes_to_write)
- PARA_WARNING_LOG("short write: %d/%d\n", ret,
- bytes_to_write);
- g->written[i] += ret;
- need_more_writes = 1;
- }
+ FOR_EACH_WRITER_NODE(i, g) {
+ struct writer_node *wn = &g->writer_nodes[i];
+ t->ret = wn->task.ret;
+ if (t->ret < 0)
+ return;
+ if (!i)
+ min_written = t->ret;
+ else
+ min_written = PARA_MIN(min_written, t->ret);
}
- *loaded -= min_written;
- ret = 0;
- if (g->eof)
- goto out;
- if (*loaded)
- memmove(buf, buf + min_written, *loaded);
- FOR_EACH_WRITER_NODE(i, g)
- g->written[i] -= min_written;
- ret = 1;
-out:
- return ret;
+ *g->loaded -= min_written;
+ if (!*g->loaded && *g->input_eof) {
+ g->eof = 1;
+ t->ret = -E_WNG_EOF;
+ } else
+ t->ret = 1;
+ if (*g->loaded && min_written)
+ memmove(g->buf, g->buf + min_written, *g->loaded);
}
int wng_open(struct writer_node_group *g)
{
int i, ret = 1;
+ PARA_NOTICE_LOG("opening wng with %d writer(s)\n", g->num_writers);
FOR_EACH_WRITER_NODE(i, g) {
struct writer_node *wn = &g->writer_nodes[i];
ret = wn->writer->open(wn);
goto out;
wn->chunk_bytes = ret;
g->max_chunk_bytes = PARA_MAX(g->max_chunk_bytes, ret);
+ wn->wng = g;
+ PARA_DEBUG_LOG("pre_select: %p\n", &wn->writer->pre_select);
+ PARA_DEBUG_LOG("post_select: %p\n", &wn->writer->post_select);
+ wn->task.pre_select = wn->writer->pre_select;
+ wn->task.post_select = wn->writer->post_select;
+ wn->task.private_data = wn;
+ register_task(&wn->task);
}
+ sprintf(g->task.status, "%s", "writer node group");
+ g->eof = 0;
+ register_task(&g->task);
out:
return ret;
}
+void wng_destroy(struct writer_node_group *g)
+{
+ if (!g)
+ return;
+ free(g->written);
+ free(g->writer_nodes);
+ free(g);
+}
+
void wng_close(struct writer_node_group *g)
{
int i;
+ PARA_NOTICE_LOG("closing wng with %d writer(s)\n", g->num_writers);
FOR_EACH_WRITER_NODE(i, g) {
struct writer_node *wn = &g->writer_nodes[i];
+ unregister_task(&wn->task);
wn->writer->close(wn);
}
}
g->writer_nodes = para_calloc(num_writers
* sizeof(struct writer_node));
g->written = para_calloc(num_writers * sizeof(size_t));
+ g->task.private_data = g;
+ g->task.post_select = wng_post_select;
+ g->task.flags = POST_ADD_TAIL;
return g;
}
-void wng_destroy(struct writer_node_group *g)
-{
- if (!g)
- return;
- free(g->written);
- free(g->writer_nodes);
- free(g);
-}
-
void init_supported_writers(void)
{
int i;
writers[i].init(&writers[i]);
}
-int check_writer_arg(const char *arg)
+void *check_writer_arg(char *wa, int *writer_num)
{
- int i, ret = -E_WRITE_COMMON_SYNTAX;
- char *a = para_strdup(arg), *p = strchr(a, ':');
- if (p)
- *p = '\0';
- p++;
+ int i;
+
+ *writer_num = -E_WRITE_COMMON_SYNTAX;
+ PARA_INFO_LOG("checking %s\n", wa);
FOR_EACH_WRITER(i) {
- if (strcmp(writer_names[i], a))
+ const char *name = writer_names[i];
+ size_t len = strlen(name);
+ char c;
+ if (strlen(wa) < len)
+ continue;
+ if (strncmp(name, wa, len))
continue;
- ret = i;
- goto out;
+ c = wa[len];
+ if (c && c != ' ')
+ continue;
+ if (c && !writers[i].parse_config)
+ return NULL;
+ *writer_num = i;
+ return writers[i].parse_config(c? wa + len + 1 : "");
}
-out:
- free(a);
- return ret;
+ PARA_ERROR_LOG("%s", "writer not found\n");
+ return NULL;
}
struct writer_node_group *setup_default_wng(void)
else
default_writer = 1;
wng->writer_nodes[0].writer = &writers[default_writer];
- PARA_INFO_LOG("using default writer: %s\n",
+ sprintf(wng->writer_nodes[0].task.status, "%s",
writer_names[default_writer]);
+ PARA_INFO_LOG("using default writer: %s %p\n",
+ writer_names[default_writer], writers[default_writer].parse_config);
+ wng->writer_nodes[0].conf = writers[default_writer].parse_config("");
return wng;
}
struct writer_node_group *wng_new(unsigned num_writers);
void wng_destroy(struct writer_node_group *g);
void init_supported_writers(void);
-int check_writer_arg(const char *arg);
+void *check_writer_arg(char *wa, int *writer_num);
struct writer_node_group *setup_default_wng(void);