]> git.tuebingen.mpg.de Git - paraslash.git/commitdiff
Merge branch 'sched'
authorAndre <maan@p133.(none)>
Mon, 29 May 2006 02:18:16 +0000 (04:18 +0200)
committerAndre <maan@p133.(none)>
Mon, 29 May 2006 02:18:16 +0000 (04:18 +0200)
38 files changed:
Makefile.in
aac_afh.c
aacdec.c
alsa_write.ggo [new file with mode: 0644]
alsa_writer.c
audiod.c
audiod.ggo
compress.c
configure.ac
dccp_recv.c
error.h
file_write.ggo [new file with mode: 0644]
file_writer.c
filter.c
filter.h
filter_chain.c
grab_client.c
http_recv.c
list.h
mp3dec.c
oggdec.c
ortp_recv.c
recv.c
recv.h
recv_common.c
sched.c [new file with mode: 0644]
sched.h [new file with mode: 0644]
server.c
stdin.c [new file with mode: 0644]
stdin.h [new file with mode: 0644]
stdout.c [new file with mode: 0644]
stdout.h [new file with mode: 0644]
wav.c
write.c
write.ggo
write.h
write_common.c
write_common.h

index c9eb46fd9f98578e93f1882fed74a0e0788bb70c..122103965e21f02b51c70337f0c69d2f1b70e08b 100644 (file)
@@ -133,6 +133,12 @@ grab_client.cmdline.h grab_client.cmdline.c: grab_client.ggo
                --arg-struct-name=$(subst .ggo,,$<)_args_info \
                --file-name=$(subst .ggo,,$<).cmdline \
                --func-name $(subst _filter.ggo,,$<)_cmdline_parser < $<
+%_write.cmdline.h %_write.cmdline.c: %_write.ggo
+       gengetopt -S $(module_ggo_opts) \
+               --set-package=$(subst .ggo,,$<) \
+               --arg-struct-name=$(subst .ggo,,$<)_args_info \
+               --file-name=$(subst .ggo,,$<).cmdline \
+               --func-name $(subst _write.ggo,,$<)_cmdline_parser < $<
 
 %.cmdline.h %.cmdline.c: %.ggo
        case $< in client.ggo) O="--unamed-opts=command";; \
index fdb37a7e0f3d8c89ac1ee6992aad2cfeb920f763..e6f99b4f51958b4ddced2d67d997ea2f2926f2ad 100644 (file)
--- a/aac_afh.c
+++ b/aac_afh.c
@@ -192,12 +192,11 @@ static int aac_get_file_info(FILE *file, char *info_str, long unsigned *frames,
 }
 
 /*
- * Simple stream reposition routine
+ * nothing to do as we'll seek to the correct offset in aac read_chunk() anyway
  */
-static int aac_reposition_stream(long unsigned request)
+static int aac_reposition_stream(__a_unused long unsigned request)
 {
        return 1;
-//     return -E_AAC_REPOS;
 }
 
 static char *aac_read_chunk(long unsigned current_chunk, ssize_t *len)
index c36f6877c901c2a6e9bf9fdc41284147c627045d..fe046b705e3d632304d9ba5ac4f5c6e77713f8e7 100644 (file)
--- a/aacdec.c
+++ b/aacdec.c
@@ -25,6 +25,7 @@
 #include "para.h"
 
 #include "list.h"
+#include "sched.h"
 #include "filter.h"
 #include "error.h"
 #include "string.h"
@@ -52,7 +53,7 @@ struct private_aacdec_data {
 static ssize_t aacdec(char *input_buffer, size_t len, struct filter_node *fn)
 {
        struct private_aacdec_data *padd = fn->private_data;
-       struct filter_chain_info *fci = fn->fci;
+       struct filter_chain *fc = fn->fc;
        int i, ret;
        unsigned char *p, *outbuffer;
        unsigned char *inbuf = (unsigned char*)input_buffer;
@@ -60,7 +61,7 @@ static ssize_t aacdec(char *input_buffer, size_t len, struct filter_node *fn)
 
        if (fn->loaded > fn->bufsize * 4 / 5)
                return 0;
-       if (len < 1000 && !*fci->eof)
+       if (len < 1000 && !*fc->input_eof)
                return 0;
 
        if (!padd->initialized) {
@@ -86,10 +87,10 @@ static ssize_t aacdec(char *input_buffer, size_t len, struct filter_node *fn)
                                        &channels) < 0)
                                goto out;
                }
-               fci->samplerate = rate;
-               fci->channels = channels;
+               fc->samplerate = rate;
+               fc->channels = channels;
                PARA_INFO_LOG("rate: %u, channels: %d\n",
-                       fci->samplerate, fci->channels);
+                       fc->samplerate, fc->channels);
                padd->initialized = 1;
        }
        if (padd->decoder_length > 0) {
diff --git a/alsa_write.ggo b/alsa_write.ggo
new file mode 100644 (file)
index 0000000..4863a53
--- /dev/null
@@ -0,0 +1,28 @@
+section "alsa options"
+######################
+
+option "device" d
+#~~~~~~~~~~~~~~~~
+"set PCM device"
+       string typestr="device"
+       default="plughw:0,0"
+       optional
+
+option "channels" c
+#~~~~~~~~~~~~~~~~~~
+"number of channels (only neccessary for raw
+audio)"
+
+       int typestr="num"
+       default="2"
+       optional
+
+option "sample_rate" s
+#~~~~~~~~~~~~~~~~~~~~~
+
+"force given sample rate (only neccessary for
+raw audio)"
+
+       int typestr="num"
+       default="44100"
+       optional
index 5aa908518b113305372d126ead651ec961243046..7c6e8ca73b2c3ee638aa6f7917da556b69fa4e8b 100644 (file)
 #include "para.h"
 #include "fd.h"
 #include "string.h"
+#include "list.h"
+#include "sched.h"
 #include "write.h"
 
 #include <alsa/asoundlib.h>
 
-#include "write.cmdline.h"
+#include "alsa_write.cmdline.h"
 #include "error.h"
 
-extern struct gengetopt_args_info conf;
 
 #define FORMAT SND_PCM_FORMAT_S16_LE
 
 /** data specific to the alsa writer */
 struct private_alsa_data {
-/** the alsa handle */
-snd_pcm_t *handle;
-/** determined and set by alsa_open() */
-size_t bytes_per_frame;
+       /** the alsa handle */
+       snd_pcm_t *handle;
+       /** determined and set by alsa_open() */
+       size_t bytes_per_frame;
+       /** don't write anything until this time */
+       struct timeval next_chunk;
+       /** the return value of snd_pcm_hw_params_get_buffer_time_max() */
+       unsigned buffer_time;
 };
 
 /*
@@ -57,18 +62,15 @@ static int alsa_open(struct writer_node *w)
        snd_pcm_sw_params_t *swparams;
        snd_pcm_uframes_t buffer_size, xfer_align, start_threshold,
                stop_threshold;
-       unsigned buffer_time = 0;
        int err;
        snd_pcm_info_t *info;
-       snd_output_t *log;
        snd_pcm_uframes_t period_size;
-       struct private_alsa_data *pad = para_malloc(sizeof(struct private_alsa_data));
-       w->private_data = pad;
+       struct private_alsa_data *pad = para_calloc(sizeof(struct private_alsa_data));
+       struct alsa_write_args_info *conf = w->conf;
 
+       w->private_data = pad;
        snd_pcm_info_alloca(&info);
-       if (snd_output_stdio_attach(&log, stderr, 0) < 0)
-               return -E_ALSA_LOG;
-       err = snd_pcm_open(&pad->handle, conf.device_arg,
+       err = snd_pcm_open(&pad->handle, conf->device_arg,
                SND_PCM_STREAM_PLAYBACK, 0);
        if (err < 0)
                return -E_PCM_OPEN;
@@ -85,23 +87,23 @@ static int alsa_open(struct writer_node *w)
        if (snd_pcm_hw_params_set_format(pad->handle, hwparams, FORMAT) < 0)
                return -E_SAMPLE_FORMAT;
        if (snd_pcm_hw_params_set_channels(pad->handle, hwparams,
-                       conf.channels_arg) < 0)
+                       conf->channels_arg) < 0)
                return -E_CHANNEL_COUNT;
        if (snd_pcm_hw_params_set_rate_near(pad->handle, hwparams,
-                       (unsigned int*) &conf.sample_rate_arg, 0) < 0)
+                       (unsigned int*) &conf->sample_rate_arg, 0) < 0)
                return -E_SET_RATE;
-       err = snd_pcm_hw_params_get_buffer_time_max(hwparams, &buffer_time, 0);
-       if (err < 0 || !buffer_time)
+       err = snd_pcm_hw_params_get_buffer_time_max(hwparams, &pad->buffer_time, 0);
+       if (err < 0 || !pad->buffer_time)
                return -E_GET_BUFFER_TIME;
-       PARA_DEBUG_LOG("buffer time: %d\n", buffer_time);
+       PARA_INFO_LOG("buffer time: %d\n", pad->buffer_time);
        if (snd_pcm_hw_params_set_buffer_time_near(pad->handle, hwparams,
-                       &buffer_time, 0) < 0)
+                       &pad->buffer_time, 0) < 0)
                return -E_SET_BUFFER_TIME;
        if (snd_pcm_hw_params(pad->handle, hwparams) < 0)
                return -E_HW_PARAMS;
        snd_pcm_hw_params_get_period_size(hwparams, &period_size, 0);
        snd_pcm_hw_params_get_buffer_size(hwparams, &buffer_size);
-       PARA_DEBUG_LOG("buffer size: %lu, period_size: %lu\n", buffer_size,
+       PARA_INFO_LOG("buffer size: %lu, period_size: %lu\n", buffer_size,
                period_size);
        if (period_size == buffer_size)
                return -E_BAD_PERIOD;
@@ -127,44 +129,75 @@ static int alsa_open(struct writer_node *w)
        if (snd_pcm_sw_params(pad->handle, swparams) < 0)
                return -E_SW_PARAMS;
        pad->bytes_per_frame = snd_pcm_format_physical_width(FORMAT)
-               * conf.channels_arg / 8;
-//     if (snd_pcm_nonblock(pad->handle, 1))
-//             PARA_ERROR_LOG("%s\n", "failed to set nonblock mode");
+               * conf->channels_arg / 8;
+       if (snd_pcm_nonblock(pad->handle, 1))
+               PARA_ERROR_LOG("%s\n", "failed to set nonblock mode");
        return period_size * pad->bytes_per_frame;
 }
+static void alsa_write_pre_select(struct sched *s, struct task *t)
+{
+       struct writer_node *wn = t->private_data;
+       struct private_alsa_data *pad = wn->private_data;
+       struct writer_node_group *wng = wn->wng;
+       struct timeval diff;
 
-/**
- * push out pcm frames
- * \param data pointer do data to be written
- * \param nbytes number of bytes (not frames)
- *
- * \return Number of bytes written, -E_ALSA_WRITE on errors.
- */
-static int alsa_write(char *data, size_t nbytes, struct writer_node *wn)
+       t->ret = 0;
+       if (*wng->input_eof && *wng->loaded < pad->bytes_per_frame)
+               return;
+       t->ret = 1;
+       if (*wng->loaded < pad->bytes_per_frame)
+               return;
+       if (tv_diff(&s->now, &pad->next_chunk, &diff) < 0) {
+               if (tv_diff(&s->timeout, &diff, NULL) > 0)
+                       s->timeout = diff;
+       } else {
+               s->timeout.tv_sec = 0;
+               s->timeout.tv_usec = 0;
+       }
+}
+
+static void alsa_write_post_select(struct sched *s, struct task *t)
 {
+       struct writer_node *wn = t->private_data;
        struct private_alsa_data *pad = wn->private_data;
-       size_t frames = nbytes / pad->bytes_per_frame;
-       unsigned char *d = (unsigned char*)data;
-       snd_pcm_sframes_t r, result = 0;
-
-       while (frames > 0) {
-               /* write interleaved frames */
-               r = snd_pcm_writei(pad->handle, d, frames);
-               if (r < 0)
-                       PARA_ERROR_LOG("write error: %s\n", snd_strerror(r));
-               if (r == -EAGAIN || (r >= 0 && r < frames))
-                       snd_pcm_wait(pad->handle, 1);
-               else if (r == -EPIPE)
+       struct writer_node_group *wng = wn->wng;
+       size_t frames = *wng->loaded / pad->bytes_per_frame;
+       snd_pcm_sframes_t ret, result = 0;
+       unsigned char *data = (unsigned char*)wng->buf;
+
+       t->ret = 0;
+       if (!frames) {
+               if (*wng->input_eof)
+                       t->ret = *wng->loaded;
+               return;
+       }
+       if (tv_diff(&s->now, &pad->next_chunk, NULL) < 0)
+               return;
+//     PARA_INFO_LOG("%zd frames\n", frames);
+//     while (frames > 0) {
+               ret = snd_pcm_writei(pad->handle, data, frames);
+               if (ret == -EAGAIN || (ret >= 0 && ret < frames)) {
+                       struct timeval tv;
+                       ms2tv(pad->buffer_time / 2000, &tv);
+                       PARA_DEBUG_LOG("EAGAIN. frames: %d, ret: %lu\n", frames, ret);
+                       tv_add(&s->now, &tv, &pad->next_chunk);
+               } else if (ret == -EPIPE) {
+                       PARA_WARNING_LOG("%s", "EPIPE\n");
                        snd_pcm_prepare(pad->handle);
-               else if (r < 0)
-                       return -E_ALSA_WRITE;
-               if (r > 0) {
-                       result += r;
-                       frames -= r;
-                       d += r * pad->bytes_per_frame;
+               } else if (ret < 0) {
+                       t->ret = -E_ALSA_WRITE;
+                       return;
                }
-       }
-       return result * pad->bytes_per_frame;
+               if (ret >= 0) {
+                       result += ret;
+                       frames -= ret;
+                       data += ret * pad->bytes_per_frame;
+               }
+//             if (ret == -EAGAIN)
+//                     break;
+//     }
+       t->ret = result * pad->bytes_per_frame;
+//     PARA_INFO_LOG("ret: %d, frames: %zd\n", t->ret, frames);
 }
 
 static void alsa_close(struct writer_node *wn)
@@ -176,11 +209,28 @@ static void alsa_close(struct writer_node *wn)
        free(pad);
 }
 
+__malloc void *alsa_parse_config(char *options)
+{
+       struct alsa_write_args_info *conf
+               = para_calloc(sizeof(struct alsa_write_args_info));
+       PARA_INFO_LOG("options: %s, %d\n", options, strcspn(options, " \t"));
+       int ret = alsa_cmdline_parser_string(options, conf, "alsa_write");
+       if (ret)
+               goto err_out;
+       PARA_INFO_LOG("help given: %d\n", conf->help_given);
+       return conf;
+err_out:
+       free(conf);
+       return NULL;
+}
+
 /** the init function of the alsa writer */
 void alsa_writer_init(struct writer *w)
 {
        w->open = alsa_open;
-       w->write = alsa_write;
        w->close = alsa_close;
+       w->pre_select = alsa_write_pre_select;
+       w->post_select = alsa_write_post_select;
+       w->parse_config = alsa_parse_config;
        w->shutdown = NULL; /* nothing to do */
 }
index e1551617b5af0aa827322be36105836099add3e7..d635fbfc334316b9dd78ab2b67e266dd757c828c 100644 (file)
--- a/audiod.c
+++ b/audiod.c
 
 /** \file audiod.c the paraslash's audio daemon */
 
-#include <sys/time.h> /* gettimeofday */
 #include "para.h"
 
 #include "audiod.cmdline.h"
 #include "list.h"
 #include "close_on_fork.h"
+#include "sched.h"
 #include "recv.h"
 #include "filter.h"
 #include "grab_client.cmdline.h"
@@ -35,6 +35,8 @@
 #include "daemon.h"
 #include "string.h"
 #include "fd.h"
+#include "write.h"
+#include "write_common.h"
 
 /** define the array of error lists needed by para_audiod */
 INIT_AUDIOD_ERRLISTS;
@@ -52,92 +54,125 @@ enum {AUDIOD_OFF, AUDIOD_ON, AUDIOD_STANDBY};
 
 /** defines how to handle one supported audio format */
 struct audio_format_info {
-/** pointer to the receiver for this audio format */
+       /** pointer to the receiver for this audio format */
        struct receiver *receiver;
-/** the receiver configuration */
+       /** the receiver configuration */
        void *receiver_conf;
-/** the number of filters that should be activated for this audio format */
+       /** the number of filters that should be activated for this audio format */
        unsigned int num_filters;
-/** pointer to the array of filters to be activated */
+       /** pointer to the array of filters to be activated */
        struct filter **filters;
-/** pointer to the array of filter configurations */
+       /** pointer to the array of filter configurations */
        void **filter_conf;
-/** output of the last filter is written to stdin of this command */
-       char *write_cmd;
-/** do not start receiver/filters/writer before this time */
+       /** the number of filters that should be activated for this audio format */
+       unsigned int num_writers;
+       /** pointer to the array of writers to be activated */
+       struct writer **writers;
+       /** pointer to the array of writer configurations */
+       void **writer_conf;
+       /** do not start receiver/filters/writer before this time */
        struct timeval restart_barrier;
 };
 
 /**
  * describes one instance of a receiver-filter-writer chain
  *
- * \sa receier_node, receiver, filter, filter_node, filter_chain_info
+ * \sa receier_node, receiver, filter, filter_node, filter_chain, writer,
+ * writer_node, writer_node_group.
   */
 struct slot_info {
-/** number of the audio format in this slot */
+       /** number of the audio format in this slot */
        int format;
-/** the file descriptor of the writer */
-       int write_fd;
-/** the process id of the writer */
-       pid_t wpid;
-/** time of the last successful read from the receiver */
-       struct timeval rtime;
-/** time the last write to the write fd happend */
-       struct timeval wtime;
-/** writer start time */
+       /** writer start time */
        struct timeval wstime;
-/** did we include \a write_fd in the fdset */
-       int  wcheck;
-/** set to one if we have sent the TERM signal to \a wpid */
-       int wkilled;
-/** the receiver info associated with this slot */
+       /** the receiver info associated with this slot */
        struct receiver_node *receiver_node;
-/** the active filter chain */
-       struct filter_chain_info *fci;
+       /** the active filter chain */
+       struct filter_chain *fc;
+       /** the active writer node group */
+       struct writer_node_group *wng;
 };
-
 static struct slot_info slot[MAX_STREAM_SLOTS];
 
-/** defines one command of para_audiod */
-struct audiod_command {
-/** the name of the command */
-const char *name;
-/** pointer to the function that handles the command */
-int (*handler)(int, int, char**);
-int (*line_handler)(int, char*);
-/** one-line description of the command */
-const char *description;
-/** summary of the command line options */
-const char *synopsis;
-/** the long help text */
-const char *help;
-};
-
 extern const char *status_item_list[NUM_STAT_ITEMS];
 
-static int com_grab(int, char *);
-static int com_cycle(int, int, char **);
-static int com_help(int, int, char **);
-static int com_off(int, int, char **);
-static int com_on(int, int, char **);
-static int com_sb(int, int, char **);
-static int com_stat(int, int, char **);
-static int com_term(int, int, char **);
-static int stat_pipe = -1, signal_pipe;
-
 static struct gengetopt_args_info conf;
 static struct timeval server_stream_start, sa_time_diff;
 static int playing, current_decoder = -1,
        audiod_status = AUDIOD_ON, offset_seconds, length_seconds,
-       sa_time_diff_sign = 1, audiod_socket = -1;
+       sa_time_diff_sign = 1;
 static char *af_status, /* the audio format announced in server status */
        *socket_name, *hostname;
 static char *stat_item_values[NUM_STAT_ITEMS];
 static FILE *logfile;
 static const struct timeval restart_delay = {0, 300 * 1000};
-
 static struct audio_format_info afi[NUM_AUDIO_FORMATS];
+static struct timeval *now;
+
+static struct signal_task signal_task_struct, *sig_task = &signal_task_struct;
+
+/**
+ * the task for handling audiod commands
+ *
+ * \sa struct task, struct sched
+ */
+struct command_task {
+       /** the local listening socket */
+       int fd;
+       /** the associated task structure */
+       struct task task;
+};
+
+/**
+ * the task for audiod's child (para_client stat)
+ *
+ * \sa struct task, struct sched
+ */
+struct status_task {
+       /** the output of the stat command is read from this fd */
+       int fd;
+       /** stat data is stored here */
+       char buf[STRINGSIZE];
+       /** number of bytes loaded in \a buf */
+       unsigned loaded;
+       /** the associated task structure */
+       struct task task;
+};
+static struct status_task status_task_struct, *stat_task = &status_task_struct;
+
+struct signal_task {
+       int fd;
+       int signum;
+       struct task task;
+};
 
+/** defines one command of para_audiod */
+struct audiod_command {
+       /** the name of the command */
+       const char *name;
+       /** pointer to the function that handles the command */
+       int (*handler)(int, int, char**);
+       /**
+        * if the command prefers to handle the full line (rather than the usual
+        * argv[] array), it stores a pointer to the corresponding line handling
+        * function here. In this case, the above \a handler pointer must be NULL.
+        */
+       int (*line_handler)(int, char*);
+       /** one-line description of the command */
+       const char *description;
+       /** summary of the command line options */
+       const char *synopsis;
+       /** the long help text */
+       const char *help;
+};
+static int com_grab(int, char *);
+static int com_cycle(int, int, char **);
+static int com_help(int, int, char **);
+static int com_off(int, int, char **);
+static int com_on(int, int, char **);
+static int com_sb(int, int, char **);
+static int com_stat(int, int, char **);
+static int com_term(int, int, char **);
 static struct audiod_command cmds[] = {
 {
 .name = "cycle",
@@ -234,7 +269,7 @@ static struct audiod_command cmds[] = {
 };
 
 /** iterate over all slots */
-#define FOR_EACH_SLOT(slot) for (slot = 0; slot < MAX_STREAM_SLOTS; slot++)
+#define FOR_EACH_SLOT(_slot) for (_slot = 0; _slot < MAX_STREAM_SLOTS; _slot++)
 /** iterate over all supported audio formats */
 #define FOR_EACH_AUDIO_FORMAT(af) for (af = 0; af < NUM_AUDIO_FORMATS; af++)
 /** iterate over the array of all audiod commands */
@@ -289,7 +324,7 @@ static int client_write(int fd, const char *buf)
 
 static char *get_time_string(struct timeval *newest_stime)
 {
-       struct timeval now, diff, adj_stream_start, tmp;
+       struct timeval diff, adj_stream_start, tmp;
        int total = 0, use_server_time = 1;
 
        if (!playing) {
@@ -313,8 +348,7 @@ static char *get_time_string(struct timeval *newest_stime)
                        use_server_time = 0;
                }
        }
-       gettimeofday(&now, NULL);
-       tv_diff(&now, &tmp, &diff);
+       tv_diff(now, &tmp, &diff);
        total = diff.tv_sec + offset_seconds;
        if (total > length_seconds)
                total = length_seconds;
@@ -349,7 +383,7 @@ static struct timeval *wstime(void)
        struct timeval *max = NULL;
        FOR_EACH_SLOT(i) {
                struct slot_info *s = &slot[i];
-               if (s->wpid <= 0)
+               if (!s->wng)
                        continue;
                if (max && tv_diff(&s->wstime, max, NULL) <= 0)
                        continue;
@@ -367,7 +401,7 @@ __malloc static char *decoder_flags(void)
                char flag = '0';
                if (s->receiver_node)
                        flag += 1;
-               if (s->wpid > 0)
+               if (s->wng)
                        flag += 2;
                decoder_flags[i] = flag;
        }
@@ -390,8 +424,8 @@ static char *configfile_exists(void)
 
 static void setup_signal_handling(void)
 {
-       signal_pipe = para_signal_init();
-       PARA_INFO_LOG("signal pipe: fd %d\n", signal_pipe);
+       sig_task->fd = para_signal_init();
+       PARA_INFO_LOG("signal pipe: fd %d\n", sig_task->fd);
        para_install_sighandler(SIGINT);
        para_install_sighandler(SIGTERM);
        para_install_sighandler(SIGCHLD);
@@ -440,30 +474,6 @@ static void clear_slot(int slot_num)
        s->format = -1;
 }
 
-static void kill_stream_writer(int slot_num)
-{
-       struct slot_info *s = &slot[slot_num];
-
-       if (s->format < 0 || s->wkilled || s->wpid <= 0)
-               return;
-       PARA_DEBUG_LOG("kill -TERM %d (%s stream writer in slot %d)\n",
-               s->wpid, audio_formats[s->format], slot_num);
-       kill(s->wpid, SIGTERM);
-       s->wkilled = 1;
-       s->fci->error = 1;
-}
-
-static void set_restart_barrier(int format, struct timeval *now)
-{
-       struct timeval tmp;
-
-       if (now)
-               tmp = *now;
-       else
-               gettimeofday(&tmp, NULL);
-       tv_add(&tmp, &restart_delay, &afi[format].restart_barrier);
-}
-
 static void close_receiver(int slot_num)
 {
        struct slot_info *s = &slot[slot_num];
@@ -472,53 +482,26 @@ static void close_receiver(int slot_num)
        if (s->format < 0 || !s->receiver_node)
                return;
        a = &afi[s->format];
-       PARA_NOTICE_LOG("closing %s recevier in slot %d\n",
-               audio_formats[s->format] , slot_num);
+       PARA_NOTICE_LOG("closing %s receiver in slot %d (eof = %d)\n",
+               audio_formats[s->format] , slot_num, s->receiver_node->eof);
+       if (!s->receiver_node->eof)
+               unregister_task(&s->receiver_node->task);
        a->receiver->close(s->receiver_node);
        free(s->receiver_node);
        s->receiver_node = NULL;
-       set_restart_barrier(s->format, NULL);
+       /* set restart barrier */
+       tv_add(now, &restart_delay, &afi[s->format].restart_barrier);
 }
 
 static void kill_all_decoders(void)
 {
        int i;
 
-       FOR_EACH_SLOT(i)
-               if (slot[i].format >= 0) {
-                       PARA_INFO_LOG("stopping decoder in slot %d\n", i);
-                       kill_stream_writer(i);
-               }
-}
-
-static void check_sigchld(void)
-{
-       pid_t pid;
-       int i;
-       struct timeval now;
-       gettimeofday(&now, NULL);
-
-reap_next_child:
-       pid = para_reap_child();
-       if (pid <= 0)
-               return;
        FOR_EACH_SLOT(i) {
                struct slot_info *s = &slot[i];
-               long lifetime;
-               if (s->format < 0)
-                       continue;
-               if (pid == s->wpid) {
-                       s->wpid = -1;
-                       lifetime = now.tv_sec - s->wstime.tv_sec;
-                       PARA_INFO_LOG("%s stream writer in slot %d died "
-                               "after %li secs\n",
-                               audio_formats[s->format], i, lifetime);
-                       set_restart_barrier(s->format, &now);
-                       goto reap_next_child;
-               }
+               if (s->receiver_node)
+                       s->receiver_node->eof = 1;
        }
-       PARA_CRIT_LOG("para_client died (pid %d)\n", pid);
-       goto reap_next_child;
 }
 
 static int get_empty_slot(void)
@@ -532,11 +515,7 @@ static int get_empty_slot(void)
                        clear_slot(i);
                        return i;
                }
-               if (s->write_fd > 0 || s->wpid > 0)
-                       continue;
-               if (s->receiver_node)
-                       continue;
-               if (s->fci)
+               if (s->wng || s->receiver_node || s->fc)
                        continue;
                clear_slot(i);
                return i;
@@ -553,7 +532,7 @@ static int decoder_running(int format)
                s = &slot[i];
                if (s->format == format && s->receiver_node)
                        ret |= 1;
-               if (s->format == format && s->wpid > 0)
+               if (s->format == format && s->wng)
                        ret |= 2;
        }
        return ret;
@@ -563,12 +542,12 @@ static void close_stat_pipe(void)
 {
        int i;
 
-       if (stat_pipe < 0)
+       if (stat_task->fd < 0)
                return;
        PARA_NOTICE_LOG("%s", "closing status pipe\n");
-       close(stat_pipe);
-       del_close_on_fork_list(stat_pipe);
-       stat_pipe = -1;
+       close(stat_task->fd);
+       del_close_on_fork_list(stat_task->fd);
+       stat_task->fd = -1;
        kill_all_decoders();
        for (i = 0; i < NUM_STAT_ITEMS; i++) {
                free(stat_item_values[i]);
@@ -579,7 +558,8 @@ static void close_stat_pipe(void)
        offset_seconds = 0;
        audiod_status_dump();
        playing = 0;
-       stat_item_values[SI_STATUS_BAR] = make_message("%s:no connection to para_server\n",
+       stat_item_values[SI_STATUS_BAR] = make_message(
+               "%s:no connection to para_server\n",
                status_item_list[SI_STATUS_BAR]);
        stat_client_write(stat_item_values[SI_STATUS_BAR], SI_STATUS_BAR);
 }
@@ -587,43 +567,31 @@ static void close_stat_pipe(void)
 static void __noreturn clean_exit(int status, const char *msg)
 {
        PARA_EMERG_LOG("%s\n", msg);
-       kill_all_decoders();
        if (socket_name)
                unlink(socket_name);
-       if (stat_pipe >= 0)
+       if (stat_task->fd >= 0)
                close_stat_pipe();
        exit(status);
 }
 
-__malloc static char *glob_cmd(char *cmd)
+/**
+ * get the number of filters
+ *
+ * \param audio_format_num the number identifying the audio format
+ *
+ * \return the number of filters for the given audio format
+ *
+ * \sa struct filter;
+ */
+int num_filters(int audio_format_num)
 {
-       char *ret, *replacement;
-       struct timeval tmp, delay, rss; /* real stream start */
-
-       delay.tv_sec = conf.stream_delay_arg / 1000;
-       delay.tv_usec = (conf.stream_delay_arg % 1000) * 1000;
-//     PARA_INFO_LOG("delay: %lu:%lu\n", delay.tv_sec, delay.tv_usec);
-       if (sa_time_diff_sign < 0)
-               tv_add(&server_stream_start, &sa_time_diff, &rss);
-       else
-               tv_diff(&server_stream_start, &sa_time_diff, &rss);
-       tv_add(&rss, &delay, &tmp);
-       replacement = make_message("%lu:%lu",
-               (long unsigned)tmp.tv_sec,
-               (long unsigned)tmp.tv_usec);
-       ret = s_a_r(cmd, "STREAM_START", replacement);
-       free(replacement);
-       if (!ret)
-               goto out;
-       PARA_INFO_LOG("cmd: %s, repl: %s\n", cmd, ret);
-out:
-       return ret;
+       return afi[audio_format_num].num_filters;
 }
 
-/** get the number of filters for the given audio format */
-int num_filters(int audio_format_num)
+static void filter_event_handler(struct task *t)
 {
-       return afi[audio_format_num].num_filters;
+       PARA_NOTICE_LOG("%s\n", PARA_STRERROR(-t->ret));
+       unregister_task(t);
 }
 
 static void open_filters(int slot_num)
@@ -633,31 +601,37 @@ static void open_filters(int slot_num)
        int nf = a->num_filters;
        int i;
 
-       s->fci = para_calloc(sizeof(struct filter_chain_info));
-       INIT_LIST_HEAD(&s->fci->filters);
+       s->fc = NULL;
        if (!nf)
                return;
-       s->fci->inbuf = s->receiver_node->buf;
-       s->fci->in_loaded = &s->receiver_node->loaded;
-       s->fci->outbuf = s->receiver_node->buf;
-       s->fci->out_loaded = &s->receiver_node->loaded;
-       s->fci->eof = &s->receiver_node->eof;
+       PARA_INFO_LOG("opening %s filters\n", audio_formats[s->format]);
+       s->fc = para_calloc(sizeof(struct filter_chain));
+       INIT_LIST_HEAD(&s->fc->filters);
+       s->fc->inbuf = s->receiver_node->buf;
+       s->fc->in_loaded = &s->receiver_node->loaded;
+       s->fc->input_eof = &s->receiver_node->eof;
+
+       s->fc->task.pre_select = filter_pre_select;
+       s->fc->task.event_handler = filter_event_handler;
+       s->fc->task.private_data = s->fc;
+       s->fc->task.flags = 0;
+       s->fc->eof = 0;
+       sprintf(s->fc->task.status, "filter chain");
        for (i = 0; i < nf; i++) {
                struct filter_node *fn = para_calloc(sizeof(struct filter_node));
                fn->conf = a->filter_conf[i];
-               fn->fci = s->fci;
+               fn->fc = s->fc;
                fn->filter = a->filters[i];
                INIT_LIST_HEAD(&fn->callbacks);
-               list_add_tail(&fn->node, &s->fci->filters);
+               list_add_tail(&fn->node, &s->fc->filters);
                fn->filter->open(fn);
                PARA_NOTICE_LOG("%s filter %d/%d (%s) started in slot %d\n",
                        audio_formats[s->format], i + 1,  nf,
                        fn->filter->name, slot_num);
-               s->fci->outbuf = fn->buf;
-               s->fci->out_loaded = &fn->loaded;
+               s->fc->outbuf = fn->buf;
+               s->fc->out_loaded = &fn->loaded;
        }
-       PARA_DEBUG_LOG("output buffer for filter chain %p: %p\n", s->fci,
-               s->fci->outbuf);
+       register_task(&s->fc->task);
 }
 
 static struct filter_node *find_filter_node(int slot_num, int format, int filternum)
@@ -667,7 +641,7 @@ static struct filter_node *find_filter_node(int slot_num, int format, int filter
 
        FOR_EACH_SLOT(i) {
                struct slot_info *s = &slot[i];
-               if (s->format < 0 || !s->fci)
+               if (s->format < 0 || !s->fc)
                        continue;
                if (slot_num >= 0 && slot_num != i)
                        continue;
@@ -677,7 +651,7 @@ static struct filter_node *find_filter_node(int slot_num, int format, int filter
                        continue;
                /* success */
                j = 1;
-               list_for_each_entry(fn, &s->fci->filters, node)
+               list_for_each_entry(fn, &s->fc->filters, node)
                        if (filternum <= 0 || j++ == filternum)
                                break;
                return fn;
@@ -685,32 +659,50 @@ static struct filter_node *find_filter_node(int slot_num, int format, int filter
        return NULL;
 }
 
-static void start_stream_writer(int slot_num)
+static void wng_event_handler(struct task *t)
 {
-       int ret, fds[3] = {1, -1, -1};
+       PARA_INFO_LOG("%s\n", PARA_STRERROR(-t->ret));
+       unregister_task(t);
+}
+
+static void open_writers(int slot_num)
+{
+       int ret, i;
        struct slot_info *s = &slot[slot_num];
        struct audio_format_info *a = &afi[s->format];
-       char *glob = NULL;
-
-       if (a->write_cmd)
-               glob = glob_cmd(a->write_cmd);
-       if (!glob)
-               glob = para_strdup("para_write -w alsa");
-       PARA_INFO_LOG("starting stream writer: %s\n", glob);
-       open_filters(slot_num);
-       ret = para_exec_cmdline_pid(&s->wpid, glob, fds);
-       free(glob);
-       if (ret < 0) {
-               PARA_ERROR_LOG("exec failed (%d)\n", ret);
-               return;
+
+       PARA_INFO_LOG("opening %s writers\n", audio_formats[s->format]);
+       if (!a->num_writers)
+               s->wng = setup_default_wng();
+       else
+               s->wng = wng_new(a->num_writers);
+       if (s->fc) {
+               s->wng->buf = s->fc->outbuf;
+               s->wng->loaded = s->fc->out_loaded;
+               s->wng->input_eof = &s->fc->eof;
+               s->fc->output_eof = &s->wng->eof;
+       } else {
+               s->wng->buf = s->receiver_node->buf;
+               s->wng->loaded = &s->receiver_node->loaded;
+               s->wng->input_eof = &s->receiver_node->eof;
+       }
+       s->wng->task.event_handler = wng_event_handler;
+       for (i = 0; i < a->num_writers; i++) {
+               s->wng->writer_nodes[i].conf = a->writer_conf[i];
+               s->wng->writer_nodes[i].writer = a->writers[i];
+               sprintf(s->wng->writer_nodes[i].task.status, "writer_node");
        }
-       s->write_fd = fds[0];
-       add_close_on_fork_list(s->write_fd);
-       /* we write to this fd in do_select, so we need non-blocking */
-       mark_fd_nonblock(s->write_fd);
-       gettimeofday(&s->wstime, NULL);
+       ret = wng_open(s->wng);
+       s->wstime = *now;
        current_decoder = slot_num;
-       activate_inactive_grab_clients(slot_num, s->format, &s->fci->filters);
+       activate_inactive_grab_clients(slot_num, s->format, &s->fc->filters);
+}
+
+static void rn_event_handler(struct task *t)
+{
+//     struct receiver_node *rn = t->private_data;
+       PARA_NOTICE_LOG("%s\n", PARA_STRERROR(-t->ret));
+       unregister_task(t);
 }
 
 static void open_receiver(int format)
@@ -718,16 +710,17 @@ static void open_receiver(int format)
        struct audio_format_info *a = &afi[format];
        struct slot_info *s;
        int ret, slot_num;
+       struct receiver_node *rn;
 
        slot_num = get_empty_slot();
        if (slot_num < 0)
                clean_exit(EXIT_FAILURE, PARA_STRERROR(-slot_num));
        s = &slot[slot_num];
        s->format = format;
-       gettimeofday(&s->rtime, NULL);
-       s->wtime = s->rtime;
        s->receiver_node = para_calloc(sizeof(struct receiver_node));
-       s->receiver_node->conf = a->receiver_conf;
+       rn = s->receiver_node;
+       rn->receiver = a->receiver;
+       rn->conf = a->receiver_conf;
        ret = a->receiver->open(s->receiver_node);
        if (ret < 0) {
                PARA_ERROR_LOG("failed to open receiver (%s)\n",
@@ -738,18 +731,23 @@ static void open_receiver(int format)
        }
        PARA_NOTICE_LOG("started %s: %s receiver in slot %d\n",
                audio_formats[s->format], a->receiver->name, slot_num);
+       rn->task.private_data = s->receiver_node;
+       rn->task.pre_select = a->receiver->pre_select;
+       rn->task.post_select = a->receiver->post_select;
+       rn->task.event_handler = rn_event_handler;
+       rn->task.flags = 0;
+       sprintf(rn->task.status, "receiver node");
+       register_task(&rn->task);
 }
 
 static int is_frozen(int format)
 {
-       struct timeval now;
        struct audio_format_info *a = &afi[format];
 
-       gettimeofday(&now, NULL);
-       return (tv_diff(&now, &a->restart_barrier, NULL) > 0)? 0 : 1;
+       return (tv_diff(now, &a->restart_barrier, NULL) > 0)? 0 : 1;
 }
 
-static void start_current_receiver(void)
+static void open_current_receiver(void)
 {
        int i;
 
@@ -758,21 +756,20 @@ static void start_current_receiver(void)
        i = get_audio_format_num(af_status);
        if (i < 0)
                return;
-       if ((decoder_running(i) & 1) || is_frozen(i))
+       if (decoder_running(i) || is_frozen(i))
                return;
        open_receiver(i);
 }
 
 static void compute_time_diff(const struct timeval *status_time)
 {
-       struct timeval now, tmp, diff;
+       struct timeval tmp, diff;
        static int count;
        int sign;
        const struct timeval max_deviation = {0, 500 * 1000};
        const int time_smooth = 5;
 
-       gettimeofday(&now, NULL);
-       sign = tv_diff(status_time, &now, &diff);
+       sign = tv_diff(status_time, now, &diff);
 //             PARA_NOTICE_LOG("%s: sign = %i, sa_time_diff_sign = %i\n", __func__,
 //                     sign, sa_time_diff_sign);
        if (!count) {
@@ -855,7 +852,12 @@ static void handle_signal(int sig)
 {
        switch (sig) {
        case SIGCHLD:
-               return check_sigchld();
+               for (;;) {
+                       pid_t pid = para_reap_child();
+                       if (pid <= 0)
+                               return;
+                       PARA_CRIT_LOG("para_client died (pid %d)\n", pid);
+               }
        case SIGINT:
        case SIGTERM:
        case SIGHUP:
@@ -865,179 +867,64 @@ static void handle_signal(int sig)
        }
 }
 
-static void check_timeouts(void)
-{
-       struct timeval now;
-       int slot_num, timeout = conf.stream_timeout_arg;
-
-       gettimeofday(&now, NULL);
-       FOR_EACH_SLOT(slot_num) {
-               struct slot_info *s = &slot[slot_num];
-               if (s->format < 0)
-                       continue;
-               /* check read time */
-               if (s->receiver_node &&
-                       now.tv_sec > s->rtime.tv_sec + timeout) {
-                       PARA_INFO_LOG("%s input buffer (slot %d) not ready\n",
-                               audio_formats[s->format], slot_num);
-                       if (s->fci)
-                               s->fci->error = 42;
-                       else
-                               close_receiver(slot_num);
-               }
-               /* check write time */
-               if (s->wpid > 0 && !s->wkilled &&
-                       now.tv_sec > s->wtime.tv_sec + timeout) {
-                       PARA_INFO_LOG("%s output buffer (slot %d) not ready\n",
-                               audio_formats[s->format], slot_num);
-                       if (s->fci)
-                               s->fci->error = 42;
-               }
-       }
-}
-
-static size_t get_loaded_bytes(int slot_num)
+static void try_to_close_slot(int slot_num)
 {
-       size_t loaded = 0;
        struct slot_info *s = &slot[slot_num];
-       struct receiver_node *rn = s->receiver_node;
-
-       if (s->format < 0)
-               goto out;
-
-       if (afi[s->format].num_filters) {
-               if (s->fci)
-                       loaded = *s->fci->out_loaded;
-       } else {
-               if (rn)
-                       loaded = rn->loaded;
-       }
-out:
-       return loaded;
-}
-
-
-static void close_decoder_if_idle(int slot_num)
-{
-       struct slot_info *s = &slot[slot_num];
-       struct receiver_node *rn = s->receiver_node;
 
        if (s->format < 0)
                return;
-       if (!s->fci)
+       if (s->receiver_node && !s->receiver_node->eof)
                return;
-       if (!rn->eof && !s->fci->error && s->wpid > 0)
+       if (s->fc && !s->fc->eof)
                return;
-       if (!s->fci->error && s->wpid > 0) { /* eof */
-               if (filter_io(s->fci) > 0)
-                       return;
-               if (get_loaded_bytes(slot_num))
-                       return;
-       }
-       if (s->write_fd > 0) {
-               PARA_INFO_LOG("err: %d\n", s->fci->error);
-               PARA_INFO_LOG("slot %d: closing write fd %d\n", slot_num,
-                       s->write_fd);
-               close(s->write_fd);
-               del_close_on_fork_list(s->write_fd);
-               s->write_fd = -1;
-       }
-       if (s->wpid > 0)
-               return; /* wait until writer dies before closing filters */
-       PARA_INFO_LOG("closing all filters in slot %d (filter_chain %p)\n",
-               slot_num, s->fci);
-       close_filters(s->fci);
-       free(s->fci);
+       if (s->wng && !s->wng->eof)
+               return;
+       PARA_INFO_LOG("closing slot %d \n", slot_num);
+       wng_close(s->wng);
+       wng_destroy(s->wng);
+       close_filters(s->fc);
+       free(s->fc);
        close_receiver(slot_num);
        clear_slot(slot_num);
 }
 
-static void set_stream_fds(fd_set *wfds, int *max_fileno)
+static void audiod_pre_select(struct sched *s, __a_unused struct task *t)
 {
        int i;
 
-       check_timeouts();
+       now = &s->now;
+       if (audiod_status != AUDIOD_ON)
+               kill_all_decoders();
+       else if (playing)
+               open_current_receiver();
        FOR_EACH_SLOT(i) {
-               struct slot_info *s = &slot[i];
-               struct audio_format_info *a;
                struct receiver_node *rn;
 
-               close_decoder_if_idle(i);
-               s->wcheck = 0;
-               if (s->format < 0)
+               try_to_close_slot(i);
+               if (slot[i].format < 0)
                        continue;
-               a = &afi[s->format];
-               rn = s->receiver_node;
-               if (rn && rn->loaded && !s->wpid) {
-                       PARA_INFO_LOG("no writer in slot %d\n", i);
-                       start_stream_writer(i);
+               rn = slot[i].receiver_node;
+               if (rn && rn->loaded && !slot[i].wng) {
+                       open_filters(i);
+                       open_writers(i);
                }
-               if (s->write_fd <= 0)
-                       continue;
-               if (!get_loaded_bytes(i))
-                       continue;
-               para_fd_set(s->write_fd, wfds, max_fileno);
-               s->wcheck = 1;
        }
 }
 
-static int write_audio_data(int slot_num)
+static void audiod_post_select(struct sched *s, __a_unused struct task *t)
 {
-       struct slot_info *s = &slot[slot_num];
-       struct audio_format_info *a = &afi[s->format];
-       struct receiver_node *rn = s->receiver_node;
-       int rv;
-       char **buf;
-       size_t *len;
-
-       if (a->num_filters) {
-               buf = &s->fci->outbuf;
-               len = s->fci->out_loaded;
-       } else {
-               buf = &rn->buf;
-               len = &rn->loaded;
-       }
-       PARA_DEBUG_LOG("writing %p (%zd bytes)\n", *buf, *len);
-       rv = write(s->write_fd, *buf, *len);
-       PARA_DEBUG_LOG("wrote %d/%zd\n", rv, *len);
-       if (rv < 0) {
-               PARA_WARNING_LOG("write error in slot %d (fd %d): %s\n",
-                       slot_num, s->write_fd, strerror(errno));
-               *len = 0;
-               s->fci->error = E_WRITE_AUDIO_DATA;
-       } else if (rv != *len) {
-               PARA_DEBUG_LOG("partial %s write (%i/%zd) for slot %d\n",
-                       audio_formats[s->format], rv, *len, slot_num);
-               *len -= rv;
-               memmove(*buf, *buf + rv, *len);
-       } else
-               *len = 0;
-       if (rv > 0)
-               gettimeofday(&s->wtime, NULL);
-       return rv;
+       /* only save away the current time for other users */
+       now = &s->now;
 }
 
-static void slot_io(fd_set *wfds)
+static void init_audiod_task(struct task *t)
 {
-       int ret, i;
-
-       FOR_EACH_SLOT(i) {
-               struct slot_info *s = &slot[i];
-               struct receiver_node *rn = s->receiver_node;
-
-               if (rn && rn->loaded)
-                       gettimeofday(&s->rtime, NULL);
-               if (s->format >= 0 && s->write_fd > 0 && s->fci) {
-                       ret = filter_io(s->fci);
-                       if (ret < 0)
-                               s->fci->error = -ret;
-//                     PARA_DEBUG_LOG("slot %d, filter io %d bytes, check write: %d, loaded: %d/%d, eof: %d\n",
-//                              i, ret, s->wcheck, rn->loaded, *s->fci->out_loaded, rn->eof);
-               }
-               if (s->write_fd <= 0 || !s->wcheck || !FD_ISSET(s->write_fd, wfds))
-                       continue;
-               write_audio_data(i);
-       }
+       t->pre_select = audiod_pre_select;
+       t->post_select = audiod_post_select;
+       t->event_handler = NULL;
+       t->private_data = t;
+       t->flags = 0;
+       sprintf(t->status, "audiod task");
 }
 
 static int parse_stream_command(const char *txt, char **cmd)
@@ -1072,53 +959,51 @@ static int add_filter(int format, char *cmdline)
        return filter_num;
 }
 
-static int setup_default_filters(void)
+static int init_writers(void)
 {
-       int i, ret = 1;
+       int i, ret, nw;
+       char *cmd;
+       struct audio_format_info *a;
 
+       init_supported_writers();
+       nw = PARA_MAX(1, conf.writer_given);
+       PARA_INFO_LOG("maximal number of writers: %d\n", nw);
        FOR_EACH_AUDIO_FORMAT(i) {
-               struct audio_format_info *a = &afi[i];
-               char *tmp;
-               int j;
-               if (a->num_filters)
-                       continue;
-               /* add "dec" to audio format name */
-               tmp = make_message("%sdec", audio_formats[i]);
-               for (j = 0; filters[j].name; j++)
-                       if (!strcmp(tmp, filters[j].name))
-                               break;
-               free(tmp);
-               ret = -E_UNSUPPORTED_FILTER;
-               if (!filters[j].name)
-                       goto out;
-               tmp = para_strdup(filters[j].name);
-               ret = add_filter(i, tmp);
-               free(tmp);
+               a = &afi[i];
+               a->writer_conf = para_malloc(nw * sizeof(void *));
+               a->writers = para_malloc(nw * sizeof(struct writer *));
+               a->num_writers = 0;
+       }
+       for (i = 0; i < conf.writer_given; i++) {
+               void *wconf;
+               int writer_num;
+               ret = parse_stream_command(conf.writer_arg[i], &cmd);
                if (ret < 0)
                        goto out;
-               PARA_INFO_LOG("%s -> default filter: %s\n", audio_formats[i],
-                       filters[j].name);
-               ret = add_filter(i, "wav");
-               if (ret < 0)
+               a = &afi[ret];
+               nw = a->num_writers;
+               wconf = check_writer_arg(cmd, &writer_num);
+               if (!wconf) {
+                       ret = writer_num;
                        goto out;
-               PARA_INFO_LOG("%s -> default filter: wav\n", audio_formats[i]);
+               }
+               a->writers[nw] = &writers[writer_num];
+               a->writer_conf[nw] = wconf;
+               PARA_INFO_LOG("%s writer #%d: %s\n", audio_formats[ret],
+                       nw, writer_names[writer_num]);
+               a->num_writers++;
        }
+       ret = 1;
 out:
        return ret;
 }
 
-static int init_stream_io(void)
+static int init_receivers(void)
 {
-       int i, ret, receiver_num, nf;
-       char *cmd;
+       int i, ret, receiver_num;
+       char *cmd = NULL;
+       struct audio_format_info *a;
 
-       for (i = 0; i < conf.stream_write_cmd_given; i++) {
-               ret = parse_stream_command(conf.stream_write_cmd_arg[i], &cmd);
-               if (ret < 0)
-                       goto out;
-               afi[ret].write_cmd = para_strdup(cmd);
-               PARA_INFO_LOG("%s write command: %s\n", audio_formats[ret], afi[ret].write_cmd);
-       }
        for (i = 0; receivers[i].name; i++) {
                PARA_INFO_LOG("initializing %s receiver\n", receivers[i].name);
                receivers[i].init(&receivers[i]);
@@ -1147,7 +1032,7 @@ static int init_stream_io(void)
         */
        cmd = para_strdup(receivers[0].name);
        FOR_EACH_AUDIO_FORMAT(i) {
-               struct audio_format_info *a = &afi[i];
+               a = &afi[i];
                if (a->receiver_conf)
                        continue;
                a->receiver_conf = check_receiver_arg(cmd, &receiver_num);
@@ -1155,17 +1040,57 @@ static int init_stream_io(void)
                        return -E_RECV_SYNTAX;
                a->receiver = &receivers[receiver_num];
        }
+       ret = 1;
+out:
        free(cmd);
-       /* filters */
+       return ret;
+}
+
+static int init_default_filters(void)
+{
+       int i, ret = 1;
+
+       FOR_EACH_AUDIO_FORMAT(i) {
+               struct audio_format_info *a = &afi[i];
+               char *tmp;
+               int j;
+
+               if (a->num_filters)
+                       continue; /* no default -- nothing to to */
+               /* add "dec" to audio format name */
+               tmp = make_message("%sdec", audio_formats[i]);
+               for (j = 0; filters[j].name; j++)
+                       if (!strcmp(tmp, filters[j].name))
+                               break;
+               free(tmp);
+               ret = -E_UNSUPPORTED_FILTER;
+               if (!filters[j].name)
+                       goto out;
+               tmp = para_strdup(filters[j].name);
+               ret = add_filter(i, tmp);
+               free(tmp);
+               if (ret < 0)
+                       goto out;
+               PARA_INFO_LOG("%s -> default filter: %s\n", audio_formats[i],
+                       filters[j].name);
+       }
+out:
+       return ret;
+}
+
+static int init_filters(void)
+{
+       int i, ret, nf;
+
        filter_init(filters);
-       nf = PARA_MAX(2,  conf.filter_given) + 1;
-       PARA_INFO_LOG("allocating space for %d filters\n", nf);
+       nf = PARA_MAX(1,  conf.filter_given);
+       PARA_INFO_LOG("maximal number of filters: %d\n", nf);
        FOR_EACH_AUDIO_FORMAT(i) {
-               afi[i].filter_conf = para_malloc(nf * sizeof(char *));
+               afi[i].filter_conf = para_malloc(nf * sizeof(void *));
                afi[i].filters = para_malloc(nf * sizeof(struct filter *));
        }
        if (!conf.no_default_filters_given)
-               return setup_default_filters();
+               return init_default_filters();
        for (i = 0; i < conf.filter_given; i++) {
                char *arg = conf.filter_arg[i];
                char *filter_name = strchr(arg, ':');
@@ -1181,11 +1106,27 @@ static int init_stream_io(void)
                if (ret < 0)
                        goto out;
        }
-       ret = 1;
+       ret = init_default_filters(); /* use default values for the rest */
 out:
        return ret;
 }
 
+static int init_stream_io(void)
+{
+       int ret;
+
+       ret = init_writers();
+       if (ret < 0)
+               return ret;
+       ret = init_receivers();
+       if (ret < 0)
+               return ret;
+       ret = init_filters();
+       if (ret < 0)
+               return ret;
+       return 1;
+}
+
 static int dump_commands(int fd)
 {
        char *buf = para_strdup(""), *tmp = NULL;
@@ -1313,29 +1254,6 @@ out:
        return ret;
 }
 
-#if 0
-static char *list_filters(void)
-{
-       int i, j;
-       char *tmp, *msg = make_message("format\tnum\tcmd\n");
-
-       FOR_EACH_AUDIO_FORMAT(i) {
-               for (j = 0; j < afi[i].num_filters; j++) {
-                       tmp = make_message("%s\t%i\t%s\n",
-                               afi[i].name, j, afi[i].filter_cmds[j]);
-                       msg = para_strcat(msg, tmp);
-                       free(tmp);
-               }
-               tmp = make_message("%s\t%i\t%s\n", afi[i].name,
-                       j, afi[i].write_cmd);
-               msg = para_strcat(msg, tmp);
-               free(tmp);
-       }
-       return msg;
-}
-#endif
-
-
 static int com_grab(int fd, char *cmdline)
 {
        struct grab_client *gc;
@@ -1429,13 +1347,13 @@ static int check_perms(uid_t uid)
        return -E_UCRED_PERM;
 }
 
-static int handle_connect(void)
+static int handle_connect(int accept_fd)
 {
        int i, argc, ret, clifd = -1;
        char *cmd = NULL, *p, *buf = para_calloc(MAXLINE), **argv = NULL;
        struct sockaddr_un unix_addr;
 
-       ret = para_accept(audiod_socket, &unix_addr, sizeof(struct sockaddr_un));
+       ret = para_accept(accept_fd, &unix_addr, sizeof(struct sockaddr_un));
        if (ret < 0)
                goto out;
        clifd = ret;
@@ -1486,9 +1404,10 @@ out:
        return ret;
 }
 
-static void audiod_get_socket(void)
+static int audiod_get_socket(void)
 {
        struct sockaddr_un unix_addr;
+       int fd;
 
        if (conf.socket_given)
                socket_name = para_strdup(conf.socket_arg);
@@ -1498,20 +1417,21 @@ static void audiod_get_socket(void)
                        hn);
                free(hn);
        }
-       PARA_NOTICE_LOG("connecting to local socket %s\n", socket_name);
+       PARA_NOTICE_LOG("local socket: %s\n", socket_name);
        if (conf.force_given)
                unlink(socket_name);
-       audiod_socket = create_pf_socket(socket_name, &unix_addr,
+       fd = create_pf_socket(socket_name, &unix_addr,
                        S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IWOTH);
-       if (audiod_socket < 0) {
+       if (fd < 0) {
                PARA_EMERG_LOG("%s", "can not connect to socket\n");
                exit(EXIT_FAILURE); /* do not unlink socket */
        }
-       if (listen(audiod_socket, 5) < 0) {
+       if (listen(fd , 5) < 0) {
                PARA_EMERG_LOG("%s", "can not listen on socket\n");
                exit(EXIT_FAILURE); /* do not unlink socket */
        }
-       add_close_on_fork_list(audiod_socket);
+       add_close_on_fork_list(fd);
+       return fd;
 }
 
 static int open_stat_pipe(void)
@@ -1528,117 +1448,118 @@ static int open_stat_pipe(void)
        return ret;
 }
 
-static void audiod_pre_select(fd_set *rfds, fd_set *wfds, struct timeval *tv,
-               int *max_fileno)
+void signal_event_handler(struct task *t)
 {
-       int i, ret;
+       struct signal_task *st = t->private_data;
+       handle_signal(st->signum);
+}
 
-       FOR_EACH_SLOT(i) {
-               struct slot_info *s = &slot[i];
-               struct audio_format_info *a;
-               struct receiver_node *rn = s->receiver_node;
-               if (s->format < 0 || !rn)
-                       continue;
-               a = &afi[s->format];
-               ret = a->receiver->pre_select(rn, rfds, wfds, tv);
-//             PARA_NOTICE_LOG("%s preselect: %d\n", a->receiver->name, ret);
-               *max_fileno = PARA_MAX(*max_fileno, ret);
-       }
+void signal_pre_select(struct sched *s, struct task *t)
+{
+       struct signal_task *st = t->private_data;
+       t->ret = 1;
+       para_fd_set(st->fd, &s->rfds, &s->max_fileno);
 }
-static void audiod_post_select(int select_ret, fd_set *rfds, fd_set *wfds)
+
+void signal_post_select(struct sched *s, struct task *t)
 {
-       int i, ret;
+       struct signal_task *st = t->private_data;
+       t->ret = 1;
+       if (!FD_ISSET(st->fd, &s->rfds))
+               return;
+       t->ret = -E_SIGNAL_CAUGHT;
+       st->signum = para_next_signal();
+}
 
-       FOR_EACH_SLOT(i) {
-               struct slot_info *s = &slot[i];
-               struct audio_format_info *a;
-               struct receiver_node *rn = s->receiver_node;
-               if (s->format < 0 || !rn || rn->eof)
-                       continue;
-               a = &afi[s->format];
-               ret = a->receiver->post_select(rn, select_ret, rfds, wfds);
-               if (ret <= 0) {
-                       if (ret)
-                               PARA_ERROR_LOG("%s post select failed: %s (slot %d)\n",
-                               a->receiver->name, PARA_STRERROR(-ret), i);
-                       else
-                               PARA_INFO_LOG("eof in slot %d\n", i);
-                       rn->eof = 1;
-               }
-               if (ret < 0 && s->fci)
-                       s->fci->error = ret;
-       }
+void signal_setup_default(struct signal_task *st)
+{
+       st->task.pre_select = signal_pre_select;
+       st->task.post_select = signal_post_select;
+       st->task.private_data = st;
+       st->task.flags = 0;
+       sprintf(st->task.status, "signal task");
 }
 
-/* TODO: move everything before the select call to pre_select() */
-static void __noreturn audiod_mainloop(void)
+static void command_pre_select(struct sched *s, struct task *t)
 {
-       fd_set rfds, wfds;
-       int ret, max_fileno, sbo = 0;
-       char status_buf[STRINGSIZE] = "";
-       struct timeval tv;
-repeat:
-       FD_ZERO(&wfds);
-       FD_ZERO(&rfds);
-       max_fileno = -1;
-       /* always check signal pipe and the local socket */
-       para_fd_set(signal_pipe, &rfds, &max_fileno);
-       para_fd_set(audiod_socket, &rfds, &max_fileno);
+       struct command_task *ct = t->private_data;
+       para_fd_set(ct->fd, &s->rfds, &s->max_fileno);
 
-       if (audiod_status != AUDIOD_ON)
-               kill_all_decoders();
-       else if (playing)
-               start_current_receiver();
+}
+
+static void command_post_select(struct sched *s, struct task *t)
+{
+       int ret;
+       struct command_task *ct = t->private_data;
 
-       set_stream_fds(&wfds, &max_fileno);
-       /* status pipe */
-       if (stat_pipe >= 0 && audiod_status == AUDIOD_OFF)
-               close_stat_pipe();
-       if (stat_pipe < 0 && audiod_status != AUDIOD_OFF) {
-               stat_pipe = open_stat_pipe();
-               sbo = 0;
-               status_buf[0] = '\0';
-       }
-       if (stat_pipe >= 0 && audiod_status != AUDIOD_OFF)
-               para_fd_set(stat_pipe, &rfds, &max_fileno);
-       /* local socket */
-       tv.tv_sec = 0;
-       tv.tv_usec = 200 * 1000;
-       audiod_pre_select(&rfds, &wfds, &tv, &max_fileno);
-       ret = para_select(max_fileno + 1, &rfds, &wfds, &tv);
-       if (ret < 0)
-               goto repeat;
        if (audiod_status != AUDIOD_OFF)
                audiod_status_dump();
-       audiod_post_select(ret, &rfds, &wfds);
-       /* read status pipe */
-       if (stat_pipe >=0 && FD_ISSET(stat_pipe, &rfds)) {
-               ret = read(stat_pipe, status_buf + sbo, STRINGSIZE - 1 - sbo);
-               if (ret <= 0) {
-                       close_stat_pipe();
-                       /* avoid busy loop if server is down */
-                       while (sleep(1) > 0)
-                               ; /* try again*/
-               } else {
-                       status_buf[ret + sbo] = '\0';
-                       sbo = for_each_line(status_buf, ret + sbo,
-                               &check_stat_line);
-               }
-       }
-       slot_io(&wfds);
-       if (FD_ISSET(audiod_socket, &rfds)) {
-               ret = handle_connect();
-               if (ret < 0) {
-                       PARA_ERROR_LOG("%s\n", PARA_STRERROR(-ret));
-               }
+       t->ret = 1; /* always successful */
+       if (!FD_ISSET(ct->fd, &s->rfds))
+               return;
+       ret = handle_connect(ct->fd);
+       if (ret < 0)
+               PARA_ERROR_LOG("%s\n", PARA_STRERROR(-ret));
+}
+
+static void init_command_task(struct command_task *ct)
+{
+       ct->task.pre_select = command_pre_select;
+       ct->task.post_select = command_post_select;
+       ct->task.event_handler = NULL;
+       ct->task.private_data = ct;
+       ct->task.flags = 0;
+       ct->fd = audiod_get_socket(); /* doesn't return on errors */
+       sprintf(ct->task.status, "command task");
+}
+
+static void status_pre_select(struct sched *s, struct task *t)
+{
+       struct status_task *st = t->private_data;
+       t->ret = 1;
+       if (st->fd >= 0 && audiod_status == AUDIOD_OFF)
+               close_stat_pipe();
+       if (st->fd < 0 && audiod_status != AUDIOD_OFF) {
+               st->fd = open_stat_pipe();
+               st->loaded = 0;
+               st->buf[0] = '\0';
        }
-       /* signals */
-       if (FD_ISSET(signal_pipe, &rfds)) {
-               int sig_nr = para_next_signal();
-               if (sig_nr > 0)
-                       handle_signal(sig_nr);
+       if (st->fd >= 0 && audiod_status != AUDIOD_OFF)
+               para_fd_set(st->fd, &s->rfds, &s->max_fileno);
+}
+
+static void status_post_select(struct sched *s, struct task *t)
+{
+       struct status_task *st = t->private_data;
+       int ret;
+
+       t->ret = 1;
+       if (st->fd < 0 || !FD_ISSET(st->fd, &s->rfds))
+               return;
+       ret = read(st->fd, st->buf + st->loaded,
+               STRINGSIZE - 1 - st->loaded);
+       if (ret <= 0) {
+               close_stat_pipe();
+               /* avoid busy loop if server is down */
+               while (sleep(1) > 0) /* FIXME */
+                       ; /* try again*/
+       } else {
+               st->buf[ret + st->loaded] = '\0';
+               st->loaded = for_each_line(st->buf, ret + st->loaded,
+                       &check_stat_line);
        }
-       goto repeat;
+}
+
+static void init_status_task(struct status_task *st)
+{
+       st->task.pre_select = status_pre_select;
+       st->task.post_select = status_post_select;
+       st->task.private_data = st;
+       st->task.flags = 0;
+       st->loaded = 0;
+       st->fd = -1;
+       st->buf[0] = '\0';
+       sprintf(st->task.status, "status task");
 }
 
 static void set_initial_status(void)
@@ -1658,10 +1579,15 @@ static void set_initial_status(void)
                PARA_WARNING_LOG("%s", "invalid mode\n");
 }
 
-int __noreturn main(int argc, char *argv[])
+int main(int argc, char *argv[])
 {
        char *cf;
-       int i;
+       int ret, i;
+       struct sched s;
+       struct command_task command_task_struct, *cmd_task = &command_task_struct;
+       struct task audiod_task_struct, *audiod_task = &audiod_task_struct;
+
+       init_sched();
 
        valid_fd_012();
        hostname = para_hostname();
@@ -1688,8 +1614,24 @@ int __noreturn main(int argc, char *argv[])
                clear_slot(i);
        init_grabbing();
        setup_signal_handling();
+       signal_setup_default(sig_task);
+       sig_task->task.event_handler = signal_event_handler;
+
+       init_status_task(stat_task);
+       init_command_task(cmd_task);
+       init_audiod_task(audiod_task);
+
        if (conf.daemon_given)
                daemon_init();
-       audiod_get_socket(); /* doesn't return on errors */
-       audiod_mainloop();
+
+       register_task(&sig_task->task);
+       register_task(&cmd_task->task);
+       register_task(&stat_task->task);
+       register_task(audiod_task);
+       s.default_timeout.tv_sec = 0;
+       s.default_timeout.tv_usec = 99 * 1000;
+       ret = sched(&s);
+
+       PARA_EMERG_LOG("%s\n", PARA_STRERROR(-ret));
+       return EXIT_FAILURE;
 }
index dd385a16c09d0ecc070d3ef9af396256fe70c963..94bf59d012a9837a29a3e6089555051700379e8c 100644 (file)
@@ -143,21 +143,20 @@ see --no_default_filters."
 string typestr="filter_spec" optional multiple
 
 
-option "stream_write_cmd" w
+option "writer" w
 #~~~~~~~~~~~~~~~~~~~~~~~~~~
 
 "Specify stream writer.
 
-May be given multiple times, once for each
-supported audio format. Default value is
-'para_write -w alsa' for both mp3 and ogg.
-You can use the START_TIME() macro for these
-commands.  Each occurence of START_TIME()
-gets replaced at runtime by the stream start
-time announced by para_server, plus any
-offsets."
+May be given multiple times, even multiple
+times for the same audio format.  Default
+value is 'alsa' for all supported audio
+formats. Example:
 
-       string typestr="format:command"
+       -w 'aac:osx'
+
+"
+       string typestr="writer_spec"
        optional
        multiple
 
@@ -167,10 +166,8 @@ option "stream_delay" -
 
 "Time to add to para_server's start_time.
 
-Amount of time to be added to the server
-stream start time for stream_write_cmd if
-START_TIME() was given. Useful for
-syncronizing the audio output of clients."
+Amount of time to be added to the server, before data is sent to
+the writer.  Useful for syncronizing the audio output of clients."
 
        int typestr="milliseconds"
        default="200"
index 3e605d0e091a2bf4ffcfb99f21adf19550188036..936ddc3afe2cf5449d03109aac87925818a3739b 100644 (file)
@@ -25,6 +25,7 @@
 #include "para.h"
 #include "compress_filter.cmdline.h"
 #include "list.h"
+#include "sched.h"
 #include "filter.h"
 #include "string.h"
 
index 18d6e9cb67cb4f9743f47504daf2941b385c6974..7c2c1ec56b0180fe1cbe66279f6fe738fc66964a 100644 (file)
@@ -57,17 +57,18 @@ AC_CHECK_LIB([menu], [new_menu], [extras="$extras para_dbadm"],
 
 recv_cmdline_objs="recv.cmdline http_recv.cmdline dccp_recv.cmdline"
 recv_errlist_objs="http_recv recv_common recv time string net dccp_recv
-       dccp fd"
+       dccp fd sched stdout"
 recv_ldflags=""
 
 filter_cmdline_objs="filter.cmdline compress_filter.cmdline"
-filter_errlist_objs="filter_chain wav compress filter string"
+filter_errlist_objs="filter_chain wav compress filter string stdin stdout sched fd"
 filter_ldflags=""
 
 audiod_cmdline_objs="audiod.cmdline grab_client.cmdline compress_filter.cmdline
-       http_recv.cmdline dccp_recv.cmdline"
+       http_recv.cmdline dccp_recv.cmdline file_write.cmdline"
 audiod_errlist_objs="audiod exec close_on_fork signal string daemon stat net
-       time grab_client filter_chain wav compress http_recv dccp dccp_recv recv_common fd"
+       time grab_client filter_chain wav compress http_recv dccp dccp_recv
+       recv_common fd sched write_common file_writer"
 audiod_ldflags=""
 
 server_cmdline_objs="server.cmdline"
@@ -76,8 +77,8 @@ server_errlist_objs="server mp3_afh afs command net string signal random_selecto
        ipc dccp dccp_send fd"
 server_ldflags=""
 
-write_cmdline_objs="write.cmdline"
-write_errlist_objs="write write_common file_writer time fd string"
+write_cmdline_objs="write.cmdline file_write.cmdline"
+write_errlist_objs="write write_common file_writer time fd string sched stdin"
 write_ldflags=""
 write_writers="file"
 
@@ -263,7 +264,7 @@ else
 fi
 ########################################################################### alsa
 have_alsa="yes"
-msg="=> no alsa support for para_write"
+msg="=> no alsa support for para_audiod/para_write"
 AC_CHECK_HEADERS([alsa/asoundlib.h], [], [
        AC_MSG_WARN([no alsa/asoundlib $msg])
        have_alsa="no"
@@ -273,7 +274,12 @@ AC_CHECK_LIB([asound], [snd_pcm_open], [], [
        have_alsa="no"
 ])
 if test "$have_alsa" = "yes"; then
+       audiod_errlist_objs="$audiod_errlist_objs alsa_writer"
+       audiod_cmdline_objs="$audiod_cmdline_objs alsa_write.cmdline"
+       audiod_ldflags="$audiod_ldflags -lasound"
+
        write_errlist_objs="$write_errlist_objs alsa_writer"
+       write_cmdline_objs="$write_cmdline_objs alsa_write.cmdline"
        write_ldflags="$write_ldflags -lasound"
        write_writers="$write_writers alsa"
 fi
index f7db6fc8ac8b17dfb8452e93b91b0155f26cb366..deca7162bfecc40e93dd0d7431f7bae9218f23ec 100644 (file)
 #include "para.h"
 #include "error.h"
 #include "dccp.h"
+#include "list.h"
+#include "sched.h"
 #include "recv.h"
 #include "string.h"
 #include "net.h"
+#include "fd.h"
 
 #include "dccp_recv.cmdline.h"
 
@@ -116,35 +119,35 @@ static void *dccp_recv_parse_config(int argc, char **argv)
        return NULL;
 }
 
-static int dccp_recv_pre_select(struct receiver_node *rn, fd_set *rfds,
-               __a_unused fd_set *wfds, __a_unused struct timeval *timeout)
+static void dccp_recv_pre_select(struct sched *s, struct task *t)
 {
-       struct private_dccp_recv_data *pdd = rn->private_data;
+       struct private_dccp_recv_data *pdd = t->private_data;
 
        if (!pdd)
-               return -1;
-       FD_SET(pdd->fd, rfds);
-       return pdd->fd;
+               return ;
+       para_fd_set(pdd->fd, &s->rfds, &s->max_fileno);
 }
 
-static int dccp_recv_post_select(struct receiver_node *rn, int select_ret,
-               fd_set *rfds, __a_unused fd_set *wfds)
+static void dccp_recv_post_select(struct sched *s, struct task *t)
 {
-       int ret;
+       struct receiver_node *rn = t->private_data;
        struct private_dccp_recv_data *pdd = rn->private_data;
 
-       if (!select_ret || !pdd || !FD_ISSET(pdd->fd, rfds))
-               return 1; /* nothing to do */
+       t->ret = 1;
+       if (!s->select_ret || !pdd || !FD_ISSET(pdd->fd, &s->rfds))
+               return; /* nothing to do */
+       t->ret = -E_DCCP_OVERRUN;
        if (rn->loaded >= DCCP_BUFSIZE)
-               return -E_DCCP_OVERRUN;
-       ret = recv_bin_buffer(pdd->fd, rn->buf + rn->loaded,
+               return;
+       t->ret = recv_bin_buffer(pdd->fd, rn->buf + rn->loaded,
                DCCP_BUFSIZE - rn->loaded);
-       if (ret <= 0) {
-               PARA_INFO_LOG("%s\n", ret? PARA_STRERROR(-ret) : "eof");
-               return ret;
+       if (t->ret <= 0) {
+               rn->eof = 1;
+               if (!t->ret)
+                       t->ret = -E_DCCP_RECV_EOF;
+               return;
        }
-       rn->loaded += ret;
-       return 1;
+       rn->loaded += t->ret;
 }
 
 /**
diff --git a/error.h b/error.h
index 59c9554a5bbaecc47780292b7b873c6f0ce76535..8458ae2bb1b7dea13c242949bb2b8763d1213ef2 100644 (file)
--- a/error.h
+++ b/error.h
@@ -20,6 +20,7 @@
 
 /** \cond list of all subsystems that support the shiny error facility */
 enum para_subsystem {
+       SS_SCHED,
        SS_GUI,
        SS_TIME,
        SS_WAV,
@@ -33,6 +34,8 @@ enum para_subsystem {
        SS_ORTP_RECV,
        SS_AUDIOD,
        SS_EXEC,
+       SS_STDIN,
+       SS_STDOUT,
        SS_SIGNAL,
        SS_STRING,
        SS_STAT,
@@ -79,11 +82,22 @@ enum para_subsystem {
 #define ORTP_SEND_ERRORS
 #define GUI_ERRORS
 #define RINGBUFFER_ERRORS
+#define SCHED_ERRORS
 
 
 extern const char **para_errlist[];
 /** \endcond */
 
+#define STDIN_ERRORS \
+       PARA_ERROR(STDIN_READ, "failed to read from stdin"), \
+       PARA_ERROR(STDIN_EOF, "end of file"), \
+
+
+#define STDOUT_ERRORS \
+       PARA_ERROR(STDOUT_WRITE, "failed to write to stdout"), \
+       PARA_ERROR(STDOUT_EOF, "end of file"), \
+
+
 #define NET_ERRORS \
        PARA_ERROR(SEND, "send error"), \
        PARA_ERROR(RECV, "receive error"), \
@@ -108,12 +122,14 @@ extern const char **para_errlist[];
        PARA_ERROR(TOO_MANY_BAD_CHUNKS, "too many consecutive bad chunks"), \
        PARA_ERROR(INVALID_HEADER, "invalid header packet"), \
        PARA_ERROR(OVERRUN, "outout buffer overrun"), \
+       PARA_ERROR(ORTP_RECV_EOF, "ortp_recv: end of file"), \
 
 
 #define HTTP_RECV_ERRORS \
        PARA_ERROR(SEND_HTTP_REQUEST, "failed to send http request"), \
        PARA_ERROR(MISSING_OK, "did not receive OK message from peer"), \
-       PARA_ERROR(HTTP_RECV_BUF, "did not receive buffer")
+       PARA_ERROR(HTTP_RECV_BUF, "did not receive buffer"), \
+       PARA_ERROR(HTTP_RECV_EOF, "http_recv: end of file"), \
 
 
 #define RECV_ERRORS \
@@ -138,6 +154,7 @@ extern const char **para_errlist[];
 #define FILTER_CHAIN_ERRORS \
        PARA_ERROR(UNSUPPORTED_FILTER, "given filter not supported"), \
        PARA_ERROR(BAD_FILTER_OPTIONS, "invalid filter option given"), \
+       PARA_ERROR(FC_EOF, "filter chain: eof"), \
 
 
 #define STAT_ERRORS \
@@ -179,6 +196,7 @@ extern const char **para_errlist[];
        PARA_ERROR(SIGNAL_READ, "read error from signal pipe"), \
        PARA_ERROR(WAITPID, "waitpid error"), \
        PARA_ERROR(SIGNAL_PIPE, "failed to setup signal pipe"), \
+       PARA_ERROR(SIGNAL_CAUGHT, "caught signal"), \
 
 
 #define STRING_ERRORS \
@@ -211,7 +229,6 @@ extern const char **para_errlist[];
 
 
 #define AAC_COMMON_ERRORS \
-       PARA_ERROR(AAC_BUF, "invalid buffer"), \
        PARA_ERROR(ESDS, "did not find esds atom"), \
        PARA_ERROR(STCO, "did not find stco atom"), \
 
@@ -312,6 +329,7 @@ extern const char **para_errlist[];
        PARA_ERROR(DCCP_SOCKET, "can not create dccp socket"), \
        PARA_ERROR(DCCP_PACKET_SIZE, "failed to set dccp packet size"), \
        PARA_ERROR(DCCP_SERVICE, "could not get service code"), \
+       PARA_ERROR(DCCP_RECV_EOF, "dccp_recv: end of file"), \
 
 
 #define DCCP_RECV_ERRORS \
@@ -319,22 +337,25 @@ extern const char **para_errlist[];
        PARA_ERROR(DCCP_OVERRUN, "dccp output buffer buffer overrun"), \
        PARA_ERROR(DCCP_CONNECT, "dccp connect error"), \
 
+
 #define DCCP_SEND_ERRORS \
        PARA_ERROR(DCCP_BIND, "dccp bind error"), \
        PARA_ERROR(DCCP_LISTEN, "dccp listen error"), \
        PARA_ERROR(DCCP_WRITE, "dccp write error"), \
 
+
 #define FD_ERRORS \
        PARA_ERROR(F_GETFL, "failed to get fd flags"), \
        PARA_ERROR(F_SETFL, "failed to set fd flags"), \
 
 
 #define WRITE_ERRORS \
-       PARA_ERROR(READ_HDR, "failed to read audio file header"), \
-       PARA_ERROR(READ_STDIN, "failed to read from stdin"), \
        PARA_ERROR(WRITE_SYNTAX, "para_write syntax error"), \
-       PARA_ERROR(WRITE_OVERRUN, "buffer overrun"), \
        PARA_ERROR(PREMATURE_END, "premature end of audio file"), \
+       PARA_ERROR(NO_WAV_HEADER, "wave header not found"), \
+       PARA_ERROR(WAV_HEADER_SUCCESS, "successfully read wave header"), \
+       PARA_ERROR(NO_DELAY, "no initial delay"), \
+       PARA_ERROR(DELAY_TIMEOUT, "initial delay timeout"), \
 
 
 #define ALSA_WRITER_ERRORS \
@@ -355,22 +376,24 @@ extern const char **para_errlist[];
        PARA_ERROR(SET_RATE, "snd_pcm_hw_params_set_rate_near failed"), \
        PARA_ERROR(START_THRESHOLD, "snd_pcm_sw_params_set_start_threshold() failed"), \
        PARA_ERROR(STOP_THRESHOLD, "snd_pcm_sw_params_set_stop_threshold() failed"), \
-       PARA_ERROR(ALSA_LOG, "snd_output_stdio_attach() failed"), \
 
 
 #define FILE_WRITER_ERRORS \
        PARA_ERROR(FW_WRITE, "file writer write error"), \
        PARA_ERROR(FW_OPEN, "file writer: can not open output file"), \
+       PARA_ERROR(FW_NO_FILE, "task started without open file"), \
 
 
 #define WRITE_COMMON_ERRORS \
        PARA_ERROR(WRITE_COMMON_SYNTAX, "syntax error in write option"), \
+       PARA_ERROR(WNG_EOF, "wng: end of file"), \
 
 
 #define AACDEC_ERRORS \
        PARA_ERROR(AACDEC_INIT, "failed to init aac decoder"), \
        PARA_ERROR(AAC_DECODE, "aac decode error"), \
 
+
 /**
  * the subsystem shift
  *
@@ -445,6 +468,9 @@ extern const char **para_errlist[];
 
 /** \cond popcorn time */
 SS_ENUM(GUI);
+SS_ENUM(SCHED);
+SS_ENUM(STDIN);
+SS_ENUM(STDOUT);
 SS_ENUM(WAV);
 SS_ENUM(COMPRESS);
 SS_ENUM(TIME);
diff --git a/file_write.ggo b/file_write.ggo
new file mode 100644 (file)
index 0000000..a172f75
--- /dev/null
@@ -0,0 +1,11 @@
+section "file writer options"
+
+option "filename" f
+#~~~~~~~~~~~~~~~~~~
+
+"select output file name. Defaults to a
+random filename in ~/.paraslash."
+
+       string typestr="filename"
+       optional
+
index a7a765e1175c6379e1881973faa9553255a3cd40..9145ef9fb7ae24432e7e975af0601e965691e398 100644 (file)
 /** \file file_writer.c simple output plugin for testing purposes */
 
 #include "para.h"
+#include "list.h"
+#include "sched.h"
 #include "write.h"
 #include "string.h"
+#include "fd.h"
+#include "file_write.cmdline.h"
 #include "error.h"
 
 /** data specific to the file writer */
 struct private_file_writer_data {
-/** the file descriptor of the output file */
-int fd;
+       /** the file descriptor of the output file */
+       int fd;
+       /** non-zero if \a fd was added to the write fd set */
+       int check_fd;
 };
 
-static int file_writer_open(struct writer_node *w)
+static int file_writer_open(struct writer_node *wn)
 {
        struct private_file_writer_data *pfwd = para_calloc(
                sizeof(struct private_file_writer_data));
-       char *tmp = para_tmpname(), *home = para_homedir(),
-               *filename = make_message("%s/.paraslash/%s", home, tmp);
-
-       free(home);
-       free(tmp);
-       w->private_data = pfwd;
+       struct file_write_args_info *conf = wn->conf;
+       char *filename;
+       if (conf->filename_given)
+               filename = conf->filename_arg;
+       else {
+               char *tmp = para_tmpname(), *home = para_homedir();
+               filename = make_message("%s/.paraslash/%s", home, tmp);
+               free(home);
+               free(tmp);
+       }
+       wn->private_data = pfwd;
        pfwd->fd = open(filename, O_WRONLY | O_CREAT, S_IRUSR | S_IWUSR);
-       free(filename);
+       if (!conf->filename_given)
+               free(filename);
        if (pfwd->fd >= 0)
                return 8192;
        free(pfwd);
@@ -56,6 +68,44 @@ static int file_writer_write(char *data, size_t nbytes, struct writer_node *wn)
        return ret;
 }
 
+static void file_writer_pre_select(struct sched *s, struct task *t)
+{
+       struct writer_node *wn = t->private_data;
+       struct private_file_writer_data *pfwd = wn->private_data;
+       struct writer_node_group *wng = wn->wng;
+
+//     PARA_INFO_LOG("task %p check_fd: %d\n", t, pfwd->check_fd);
+       pfwd->check_fd = 0;
+       t->ret = -E_FW_NO_FILE;
+       if (pfwd->fd <= 0)
+               return;
+       t->ret = 0;
+       if (!*wng->loaded)
+               return;
+       t->ret = 1;
+       para_fd_set(pfwd->fd, &s->wfds, &s->max_fileno);
+       pfwd->check_fd = 1;
+}
+
+static void file_writer_post_select(struct sched *s, struct task *t)
+{
+       struct writer_node *wn = t->private_data;
+       struct private_file_writer_data *pfwd = wn->private_data;
+       struct writer_node_group *wng = wn->wng;
+
+       t->ret = 0;
+       if (!pfwd->check_fd)
+               return;
+       if (!*wng->loaded)
+               return;
+       if (!FD_ISSET(pfwd->fd, &s->wfds))
+               return;
+//     PARA_INFO_LOG("writing %zd\n", *wng->loaded);
+       t->ret = write(pfwd->fd, wng->buf, *wng->loaded);
+       if (t->ret < 0)
+               t->ret = -E_FW_WRITE;
+}
+
 static void file_writer_close(struct writer_node *wn)
 {
        struct private_file_writer_data *pfwd = wn->private_data;
@@ -63,11 +113,27 @@ static void file_writer_close(struct writer_node *wn)
        free(pfwd);
 }
 
+__malloc void *file_writer_parse_config(char *options)
+{
+       PARA_INFO_LOG("options: %s\n", options);
+       struct file_write_args_info *conf
+               = para_calloc(sizeof(struct file_write_args_info));
+       int ret = file_cmdline_parser_string(options, conf, "file_write");
+       PARA_INFO_LOG("conf->filename_given: %d\n", conf->filename_given);
+       if (!ret)
+               return conf;
+       free(conf);
+       return NULL;
+}
+
 /** the init function of the file writer */
 void file_writer_init(struct writer *w)
 {
        w->open = file_writer_open;
        w->write = file_writer_write;
+       w->pre_select = file_writer_pre_select;
+       w->post_select = file_writer_post_select;
+       w->parse_config = file_writer_parse_config;
        w->close = file_writer_close;
        w->shutdown = NULL; /* nothing to do */
 }
index a035c6e652cdfebed3d59708d763768c5c5e3bc2..56afebf3a6ecfed020cf2c6dad065168a80277e0 100644 (file)
--- a/filter.c
+++ b/filter.c
 
 #include "filter.cmdline.h"
 #include "list.h"
+#include "sched.h"
 #include "filter.h"
-#include "error.h"
 #include "string.h"
+#include "stdin.h"
+#include "stdout.h"
+#include "error.h"
 
 INIT_FILTER_ERRLISTS;
 
-#define INBUF_SIZE 32 * 1024
-
-static struct filter_chain_info filter_chain_info_struct;
-static struct filter_chain_info *fci = &filter_chain_info_struct;
+static struct stdin_task stdin_task_struct;
+static struct stdin_task *sit = &stdin_task_struct;
+static struct filter_chain filter_chain_struct;
+static struct filter_chain *fc = &filter_chain_struct;
+static struct stdout_task stdout_task_struct;
+static struct stdout_task *sot = &stdout_task_struct;
 
 struct gengetopt_args_info conf;
 
@@ -46,52 +51,62 @@ __printf_2_3 void para_log(int ll, const char* fmt,...)
        va_end(argp);
 }
 
-static char *inbuf;
-static size_t loaded;
-static int eof;
+void filter_event_handler(struct task *t)
+{
+       PARA_NOTICE_LOG("%s\n", PARA_STRERROR(-t->ret));
+       unregister_task(t);
+}
 
-static int init_active_filter_list(void)
+static void open_filters(void)
+{
+       struct filter_node *fn;
+
+       list_for_each_entry(fn, &fc->filters, node) {
+               fn->filter->open(fn);
+               PARA_INFO_LOG("opened %s filter\n", fn->filter->name);
+               fc->outbuf = fn->buf;
+               fc->out_loaded = &fn->loaded;
+       }
+}
+
+
+static int init_filter_chain(void)
 {
        int i, filter_num;
        struct filter_node *fn;
 
-       INIT_LIST_HEAD(&fci->filters);
+       INIT_LIST_HEAD(&fc->filters);
 
-       fci->inbuf = inbuf;
-       fci->in_loaded = &loaded;
-       fci->eof = &eof;
+       fc->inbuf = sit->buf;
+       fc->in_loaded = &sit->loaded;
+       fc->input_eof = &sit->eof;
+       fc->eof = 0;
+       fc->output_eof = &sot->eof;
+       fc->task.private_data = fc;
+       fc->task.pre_select = filter_pre_select;
+       fc->task.event_handler = filter_event_handler;
+       sprintf(fc->task.status, "filter chain");
 
        for (i = 0; i < conf.filter_given; i++) {
-               char *fa = para_strdup(conf.filter_arg[i]);
+               char *fa = conf.filter_arg[i];
                fn = para_calloc(sizeof(struct filter_node));
                filter_num = check_filter_arg(fa, &fn->conf);
                if (filter_num < 0) {
                        free(fn);
                        return filter_num;
                }
-               fn->fci = fci;
+               fn->fc = fc;
                INIT_LIST_HEAD(&fn->callbacks);
                fn->filter = &filters[filter_num];
                PARA_DEBUG_LOG("adding %s to filter chain\n", fn->filter->name);
-               list_add_tail(&fn->node, &fci->filters);
+               list_add_tail(&fn->node, &fc->filters);
        }
-       if (list_empty(&fci->filters))
+       if (list_empty(&fc->filters))
                return -E_NO_FILTERS;
+       open_filters();
        return 1;
 }
 
-static void open_filters(void)
-{
-       struct filter_node *fn;
-
-       list_for_each_entry(fn, &fci->filters, node) {
-               fn->filter->open(fn);
-               PARA_INFO_LOG("opened %s filter\n", fn->filter->name);
-               fci->outbuf = fn->buf;
-               fci->out_loaded = &fn->loaded;
-       }
-}
-
 static int parse_config(int argc, char *argv[])
 {
        static char *cf; /* config file */
@@ -122,55 +137,38 @@ static int parse_config(int argc, char *argv[])
 
 int main(int argc, char *argv[])
 {
-       int converted, ret;
-       char *ib, *ob; /* input/output buffer */
-       size_t *il, *ol; /* number of loaded bytes in input/output buffer */
+       int ret;
+       struct sched s;
+
+       init_sched();
+       stdin_set_defaults(sit);
+       sit->buf = para_malloc(sit->bufsize),
 
        filter_init(filters);
        ret = parse_config(argc, argv);
        if (ret < 0)
                goto out;
-       inbuf = para_malloc(INBUF_SIZE);
-       ret = init_active_filter_list();
-       if (ret < 0)
-               goto out;
-       open_filters();
-       ib = fci->inbuf;
-       ob = fci->outbuf;
-       il = fci->in_loaded;
-       ol = fci->out_loaded;
-       PARA_DEBUG_LOG("ib %p in, ob: %p\n", ib, ob);
-again:
-       if (*il < INBUF_SIZE && !eof) {
-               ret  = read(STDIN_FILENO, ib + *il, INBUF_SIZE - *il);
-               PARA_DEBUG_LOG("read %d/%zd\n", ret, INBUF_SIZE - *il);
-               if (ret < 0)
-                       goto out;
-               if (!ret)
-                       eof = 1;
-               *il += ret;
-       }
-       ret = filter_io(fci);
+       ret = init_filter_chain();
        if (ret < 0)
                goto out;
-       converted = ret;
-       if (*ol) {
-               ret = write(STDOUT_FILENO, ob, *ol);
-               PARA_DEBUG_LOG("wrote %d/%zd\n", ret, *ol);
-               if (ret <= 0)
-                       goto out;
-               *ol -= ret;
-               if (*ol) {
-                       PARA_NOTICE_LOG("short write: %zd bytes left\n", *ol);
-                       memmove(ob, ob + ret, *ol);
-               }
-       }
-       if (!eof || converted)
-               goto again;
-       ret = 0;
+
+       stdout_set_defaults(sot);
+       PARA_EMERG_LOG("fc->output_eof: %d\n", *fc->output_eof);
+       sot->buf = fc->outbuf;
+       sot->loaded = fc->out_loaded;
+       sot->input_eof = &fc->eof;
+
+       register_task(&sot->task);
+       register_task(&fc->task);
+       register_task(&sit->task);
+       s.default_timeout.tv_sec = 1;
+       s.default_timeout.tv_usec = 0;
+       PARA_EMERG_LOG("fc->output_eof: %d\n", *fc->output_eof);
+       ret = sched(&s);
 out:
+       free(sit->buf);
+       close_filters(fc);
        if (ret < 0)
                PARA_EMERG_LOG("%s\n", PARA_STRERROR(-ret));
-       close_filters(fci);
-       return ret;
+       return ret < 0? EXIT_FAILURE : EXIT_SUCCESS;
 }
index 2fb9ad175fbeecaf816c4707e79d2fe96091461d..45ade8d3697f86001c900ea3c7fe6541a44d48f1 100644 (file)
--- a/filter.h
+++ b/filter.h
  * describes one running instance of a chain of filters
  *
  */
-struct filter_chain_info {
-/**
- *
- *
- * the number of channels of the current stream
- *
- * Set by the decoding filter
- */
-       unsigned int channels;
-/**
- *
- *
- * current samplerate in Hz
- *
- * Set by the decoding filter
- */
-       unsigned int samplerate;
-/**
- *
- *
- * the list containing all filter nodes in this filter chain
- */
-       struct list_head filters;
-/**
- *
- *
- * the input buffer of the filter chain
- *
- * This is set to point to the output buffer of the receiving application (the
- * buffer used to read from stdin for para_filter; the output buffer of the
- * current receiver for para_audiod)
- */
-       char *inbuf;
-/**
- *
- *
- * the output buffer of the filter chain
- *
- * Points to the output buffer of the last filter in the filter chain
-**/
-       char *outbuf;
-/**
- *
- *
- * pointer to variable containing the number of bytes loaded in the input buffer
- */
-       size_t *in_loaded;
-/**
- *
- *
- * pointer to variable containing the number of bytes loaded in the output buffer
- */
-       size_t *out_loaded;
-/**
- *
- *
- * non-zero if end of file was encountered
- */
-       int *eof;
-/**
- *
- *
- * non-zero if an error occured
- */
-       int error;
+struct filter_chain {
+       /**
+        *
+        *
+        * the number of channels of the current stream
+        *
+        * Set by the decoding filter
+        */
+               unsigned int channels;
+       /**
+        *
+        *
+        * current samplerate in Hz
+        *
+        * Set by the decoding filter
+        */
+               unsigned int samplerate;
+       /**
+        *
+        *
+        * the list containing all filter nodes in this filter chain
+        */
+               struct list_head filters;
+       /**
+        *
+        *
+        * the input buffer of the filter chain
+        *
+        * This is set to point to the output buffer of the receiving application (the
+        * buffer used to read from stdin for para_filter; the output buffer of the
+        * current receiver for para_audiod)
+        */
+               char *inbuf;
+       /**
+        *
+        *
+        * the output buffer of the filter chain
+        *
+        * Points to the output buffer of the last filter in the filter chain
+       **/
+               char *outbuf;
+       /**
+        *
+        *
+        * pointer to variable containing the number of bytes loaded in the input buffer
+        */
+               size_t *in_loaded;
+       /**
+        *
+        *
+        * pointer to variable containing the number of bytes loaded in the output buffer
+        */
+               size_t *out_loaded;
+       /** non-zero if this filter wont' produce any more output */
+       int eof;
+       /** pointer to the eof flag of the receiving application */
+       int *input_eof;
+       /** pointer to the eof flag of the writing application */
+       int *output_eof;
+       /** the task associated with the filter chain */
+       struct task task;
 };
 
 /**
@@ -104,7 +100,7 @@ struct filter_node {
  *
  * the filter chain this filter node belongs to
  */
-       struct filter_chain_info *fci;
+       struct filter_chain *fc;
 /**
  *
  *
@@ -212,11 +208,12 @@ struct filter_callback {
 };
 
 
-void close_filters(struct filter_chain_info *fci);
-int filter_io(struct filter_chain_info *fci);
+void close_filters(struct filter_chain *fc);
+int filter_io(struct filter_chain *fc);
 void filter_init(struct filter *all_filters);
 int check_filter_arg(char *filter_arg, void **conf);
 int del_filter_callback(struct filter_callback *fcb);
+void filter_pre_select(struct sched *s, struct task *t);
 
 /**
  * the structure associated with a paraslash filter
index 4c1f94bcb890526548e1702209625add432232e1..17f67058df88c9e128b7afdb5a2a3c0f801e90ad 100644 (file)
@@ -20,6 +20,8 @@
 
 #include "para.h"
 #include "list.h"
+#include "sched.h"
+#include "fd.h"
 #include "filter.h"
 #include "error.h"
 #include "string.h"
@@ -67,8 +69,11 @@ static void close_callbacks(struct filter_node *fn)
 {
        struct filter_callback *fcb, *tmp;
 
-       list_for_each_entry_safe(fcb, tmp, &fn->callbacks, node)
+       list_for_each_entry_safe(fcb, tmp, &fn->callbacks, node) {
+               PARA_INFO_LOG("closing %s filter callback\n",
+                       fn->filter->name);
                close_filter_callback(fcb);
+       }
 }
 
 static void call_callbacks(struct filter_node *fn, char *inbuf, size_t inlen,
@@ -95,83 +100,94 @@ static void call_callbacks(struct filter_node *fn, char *inbuf, size_t inlen,
 /**
  * call the convert function of each filter
  *
- * \param fci the filter chain containing the list of filter nodes.
- *
  * This is the core function of the filter subsystem. It loops over the list of
- * filter nodes determined by \a fci and calls the filter's convert function if
+ * filter nodes determined by \a t and calls the filter's convert function if
  * there is input available for the filter node in question. If the convert
  * function consumed some or all of its input data, all registered input
  * callbacks are called.  Similarly, if a convert function produced output, all
  * registerd output callbacks get called.
  *
- * \return The sum of output bytes produced by the convert functions on success,
- * negative return value on errors.
+ * \return The sum of output bytes produced by the convert functions on
+ * success, negative return value on errors (the return value is stored in
+ * t->ret).
  *
  * \sa filter_node, filter#convert, filter_callback
  */
-int filter_io(struct filter_chain_info *fci)
+void filter_pre_select(__a_unused struct sched *s, struct task *t)
 {
+       struct filter_chain *fc = t->private_data;
        struct filter_node *fn;
        char *ib;
        size_t *loaded;
        int conv, conv_total = 0;
+
+       t->ret = -E_FC_EOF;
+       if (*fc->output_eof)
+               goto err_out;
 again:
-       ib = fci->inbuf;
-       loaded = fci->in_loaded;
+       ib = fc->inbuf;
+       loaded = fc->in_loaded;
        conv = 0;
-       list_for_each_entry(fn, &fci->filters, node) {
-               int ret;
+       list_for_each_entry(fn, &fc->filters, node) {
                if (*loaded && fn->loaded < fn->bufsize) {
                        size_t old_fn_loaded = fn->loaded;
-                       PARA_DEBUG_LOG("fc %p loaded: %zd, calling %s convert\n", fci, *loaded, fn->filter->name);
-                       ret = fn->filter->convert(ib, *loaded, fn);
-                       if (ret < 0) {
-                               if (!fci->error)
-                                       fci->error = -ret;
-                               return ret;
-                       }
-                       call_callbacks(fn, ib, ret, fn->buf + old_fn_loaded, fn->loaded - old_fn_loaded);
-                       *loaded -= ret;
-                       conv += ret;
-                       if (*loaded && ret) {
-                               PARA_DEBUG_LOG("moving %zd bytes in input buffer for %s filter\n",
+                       PARA_DEBUG_LOG("fc %p loaded: %zd, calling %s convert\n",
+                               fc, *loaded, fn->filter->name);
+                       t->ret = fn->filter->convert(ib, *loaded, fn);
+                       if (t->ret < 0)
+                               goto err_out;
+                       call_callbacks(fn, ib, t->ret, fn->buf + old_fn_loaded,
+                               fn->loaded - old_fn_loaded);
+                       *loaded -= t->ret;
+                       conv += t->ret;
+                       if (*loaded && t->ret) {
+                               PARA_INFO_LOG("moving %zd bytes in input buffer for %s filter\n",
                                        *loaded,  fn->filter->name);
-                               memmove(ib, ib + ret, *loaded);
+                               memmove(ib, ib + t->ret, *loaded);
                        }
                }
                ib = fn->buf;
                loaded = &fn->loaded;
        }
-//     PARA_DEBUG_LOG("loaded: %d\n", *loaded);
        conv_total += conv;
+       PARA_INFO_LOG("eof (in/out/fc): %d/%d/%d out_loaded: %d, conv: %d, conv_total: %d\n", *fc->input_eof,
+               *fc->output_eof, fc->eof, *fc->out_loaded, conv, conv_total);
        if (conv)
                goto again;
-       return conv_total;
+       t->ret = 1;
+       if (!*fc->input_eof)
+               return;
+       if (*fc->out_loaded)
+               return;
+       if (*fc->in_loaded && conv_total)
+               return;
+       t->ret = -E_FC_EOF;
+err_out:
+       fc->eof = 1;
 }
 
 /**
  * close all filter nodes and its callbacks
  *
- * \param fci the filter chain to close
+ * \param fc the filter chain to close
  *
- * For each filter node determined by \a fci, call the close function of each
+ * For each filter node determined by \a fc, call the close function of each
  * registered filter callback as well as the close function of the
  * corresponding filter.  Free all resources and destroy all callback lists and
  * the filter node list.
  *
  * \sa filter::close, filter_callback::close
  */
-void close_filters(struct filter_chain_info *fci)
+void close_filters(struct filter_chain *fc)
 {
        struct filter_node *fn, *tmp;
 
-       if (!fci)
+       if (!fc)
                return;
-       PARA_DEBUG_LOG("closing filter chain %p\n", fci);
-       list_for_each_entry_safe(fn, tmp, &fci->filters, node) {
-               PARA_NOTICE_LOG("closing %s filter callbacks (fci %p, fn %p)\n", fn->filter->name, fci, fn);
+       PARA_NOTICE_LOG("closing filter chain %p\n", fc);
+       list_for_each_entry_safe(fn, tmp, &fc->filters, node) {
                close_callbacks(fn);
-               PARA_NOTICE_LOG("closing %s filter (fci %p, fn %p)\n", fn->filter->name, fci, fn);
+               PARA_INFO_LOG("closing %s filter\n", fn->filter->name);
                fn->filter->close(fn);
                list_del(&fn->node);
                free(fn);
index 4f90a27ab24066ec007dbdfd1a66f6af9dc430ff..f505fa8739070d4a9a01563c2e60a8395fdea70a 100644 (file)
@@ -27,6 +27,7 @@
 #include "close_on_fork.h"
 #include "grab_client.cmdline.h"
 #include "list.h"
+#include "sched.h"
 #include "filter.h"
 #include "grab_client.h"
 #include "audiod.h"
index 5f1d779991d60dfceefd446c54c0ef700e6ca523..b566acf3c0612a48d5796ca01dae5359bb93386e 100644 (file)
 #include "para.h"
 
 #include "http.h"
+#include "list.h"
+#include "sched.h"
 #include "recv.h"
 #include "http_recv.cmdline.h"
 #include "error.h"
 #include "net.h"
 #include "string.h"
+#include "fd.h"
 
 /** the output buffer size of the http receiver */
 #define BUFSIZE (32 * 1024)
@@ -89,61 +92,62 @@ static char *make_request_msg(void)
        return ret;
 }
 
-static int http_pre_select(struct receiver_node *rn, fd_set *rfds, fd_set *wfds,
-       __a_unused struct timeval *timeout)
+static void http_recv_pre_select(struct sched *s, struct task *t)
 {
+       struct receiver_node *rn = t->private_data;
        struct private_http_recv_data *phd = rn->private_data;
 
+       t->ret = 1;
        if  (phd->status == HTTP_CONNECTED)
-               FD_SET(phd->fd, wfds);
+               para_fd_set(phd->fd, &s->wfds, &s->max_fileno);
        else
-               FD_SET(phd->fd, rfds);
-       return phd->fd;
+               para_fd_set(phd->fd, &s->rfds, &s->max_fileno);
 }
 
-static int http_post_select(struct receiver_node *rn, int select_ret,
-               fd_set *rfds, fd_set *wfds)
+
+static void http_recv_post_select(struct sched *s, struct task *t)
 {
-       int ret;
+       struct receiver_node *rn = t->private_data;
        struct private_http_recv_data *phd = rn->private_data;
 
-       if (!select_ret) /* we're not interested in timeouts */
-               return 1;
+       t->ret = 1;
+       if (!s->select_ret) /* we're not interested in timeouts */
+               return;
        if  (phd->status == HTTP_CONNECTED) {
                char *rq;
-               if (!FD_ISSET(phd->fd, wfds))
-                       return 1; /* nothing to do */
+               if (!FD_ISSET(phd->fd, &s->wfds))
+                       return; /* nothing to do */
                rq = make_request_msg();
                PARA_NOTICE_LOG("%s", "sending http request\n");
-               ret = send_va_buffer(phd->fd, "%s", rq);
+               t->ret = send_va_buffer(phd->fd, "%s", rq);
                free(rq);
-               if (ret < 0)
-                       return E_SEND_HTTP_REQUEST;
-               phd->status = HTTP_SENT_GET_REQUEST;
-               return 1;
+               if (t->ret > 0)
+                       phd->status = HTTP_SENT_GET_REQUEST;
+               return;
        }
-       if (!FD_ISSET(phd->fd, rfds))
-               return 1; /* nothing to do */
+       if (!FD_ISSET(phd->fd, &s->rfds))
+               return; /* nothing to do */
        if (phd->status == HTTP_SENT_GET_REQUEST) {
-               ret = recv_pattern(phd->fd, HTTP_OK_MSG, MAXLINE);
-               if (ret < 0)
-                       return -E_MISSING_OK;
+               t->ret = recv_pattern(phd->fd, HTTP_OK_MSG, MAXLINE);
+               if (t->ret < 0)
+                       return;
                PARA_NOTICE_LOG("%s", "received ok msg, streaming\n");
                phd->status = HTTP_STREAMING;
-               return 1;
+               return;
        }
+       t->ret = -E_OVERRUN;
        /* already streaming */
-       if (rn->loaded >= BUFSIZE) {
-               PARA_ERROR_LOG("%s", "buffer overrun\n");
-               return -E_OVERRUN;
+       if (rn->loaded >= BUFSIZE)
+               return;
+       t->ret = recv_bin_buffer(phd->fd, rn->buf + rn->loaded,
+               BUFSIZE - rn->loaded);
+       if (t->ret <= 0) {
+               rn->eof = 1;
+               if (!t->ret)
+                       t->ret = -E_HTTP_RECV_EOF;
+               return;
        }
-       ret = recv_bin_buffer(phd->fd, rn->buf + rn->loaded, BUFSIZE - rn->loaded);
-       if (ret <= 0) {
-               PARA_NOTICE_LOG("recv returned %d/%zd\n", ret, BUFSIZE - rn->loaded);
-               return ret < 0? -E_HTTP_RECV_BUF : 0;
-       }
-       rn->loaded += ret;
-       return 1;
+       rn->loaded += t->ret;
 }
 
 static void http_recv_close(struct receiver_node *rn)
@@ -175,6 +179,7 @@ static int http_recv_open(struct receiver_node *rn)
        rn->buf = para_calloc(BUFSIZE);
        rn->private_data = para_calloc(sizeof(struct private_http_recv_data));
        phd = rn->private_data;
+       PARA_NOTICE_LOG("phd = %p, rn = %p\n", phd, rn);
        ret = -E_HOST_INFO;
        if (!(he = get_host_info(conf->host_arg)))
                goto err_out;
@@ -209,8 +214,8 @@ void http_recv_init(struct receiver *r)
 {
        r->open = http_recv_open;
        r->close = http_recv_close;
-       r->pre_select = http_pre_select;
-       r->post_select = http_post_select;
+       r->pre_select = http_recv_pre_select;
+       r->post_select = http_recv_post_select;
        r->shutdown = http_shutdown;
        r->parse_config = http_recv_parse_config;
 }
diff --git a/list.h b/list.h
index 0080c80c32df511a14ee25d099c32a5e3beddd19..c85f50c7e79c45f8ae47ab0e721763044e80d09f 100644 (file)
--- a/list.h
+++ b/list.h
@@ -169,4 +169,18 @@ static inline int list_empty(const struct list_head *head)
                n = list_entry(pos->member.next, typeof(*pos), member); \
             &pos->member != (head);                                    \
             pos = n, n = list_entry(n->member.next, typeof(*n), member))
+/**
+ * list_for_each_entry_safe_reverse - iterate backwards over list of given type safe against
+ *                                   removal of list entry
+ * @pos:       the type * to use as a loop counter.
+ * @n:         another type * to use as temporary storage
+ * @head:      the head for your list.
+ * @member:    the name of the list_struct within the struct.
+ */
+#define list_for_each_entry_safe_reverse(pos, n, head, member)         \
+       for (pos = list_entry((head)->prev, typeof(*pos), member),      \
+               n = list_entry(pos->member.prev, typeof(*pos), member); \
+            &pos->member != (head);                                    \
+            pos = n, n = list_entry(n->member.prev, typeof(*n), member))
+
 #endif /* _LIST_H */
index af9de5484a0f9f150a153500fef42fd6a2920f9e..f809daec441c1e5957c48171cdcd3eeb470977b0 100644 (file)
--- a/mp3dec.c
+++ b/mp3dec.c
@@ -19,8 +19,8 @@
 /** \file mp3dec.c paraslash's mp3 decoder */
 
 #include "para.h"
-
 #include "list.h"
+#include "sched.h"
 #include "filter.h"
 #include "error.h"
 #include <mad.h>
@@ -67,8 +67,8 @@ next_frame:
                        goto out;
                return FRAME_HEADER_SIZE;
        }
-       fn->fci->samplerate = pmd->frame.header.samplerate;
-       fn->fci->channels = MAD_NCHANNELS(&pmd->frame.header);
+       fn->fc->samplerate = pmd->frame.header.samplerate;
+       fn->fc->channels = MAD_NCHANNELS(&pmd->frame.header);
        ret = mad_frame_decode(&pmd->frame, &pmd->stream);
        if (ret) {
                if (MAD_RECOVERABLE(pmd->stream.error) || pmd->stream.error == MAD_ERROR_BUFLEN)
@@ -98,8 +98,8 @@ next_frame:
 out:
        if (pmd->stream.next_frame) { /* we still have some data */
                size_t off = pmd->stream.bufend - pmd->stream.next_frame;
-               PARA_DEBUG_LOG("converted %zd, rate: %u, returning %zd\n", len - off,
-                       fn->fci->samplerate, copy - off);
+//             PARA_DEBUG_LOG("converted %zd, rate: %u, returning %zd\n", len - off,
+//                     fn->fc->samplerate, copy - off);
                return copy - off;
        }
        return copy;
index 3554eb1b8ff3c6135d47e21c88e5abfd480b0dc5..e1c4517ec94e3cda5e5ba92d86485ec47666aef6 100644 (file)
--- a/oggdec.c
+++ b/oggdec.c
@@ -22,6 +22,7 @@
 
 #include "oggdec_filter.cmdline.h"
 #include "list.h"
+#include "sched.h"
 #include "filter.h"
 #include "error.h"
 #include "string.h"
@@ -53,11 +54,11 @@ static size_t cb_read(void *buf, size_t size, size_t nmemb, void *datasource)
        size_t ret, have = pod->inbuf_len - pod->converted;
        char *p = pod->inbuf + pod->converted;
 
-       if (*fn->fci->eof)
-               return 0;
 //     PARA_DEBUG_LOG("pod = %p\n", pod);
 //     PARA_DEBUG_LOG("vorbis requests %d bytes, have %d\n", size * nmemb, have);
        if (pod->inbuf_len < size) {
+               if (*fn->fc->input_eof)
+                       return 0;
                errno = EAGAIN;
                return -1;
        }
@@ -133,7 +134,7 @@ static ssize_t ogg_convert(char *inbuffer, size_t len, struct filter_node *fn)
 
        if (!pod->vf) {
                int ib = 1024 * conf->initial_buffer_arg; /* initial buffer */
-               if (len <ib && !*fn->fci->eof && !fn->fci->error) {
+               if (len <ib && !*fn->fc->input_eof) {
                        PARA_INFO_LOG("initial input buffer %zd/%d, waiting for more data\n",
                                len, ib);
                        return 0;
@@ -154,9 +155,9 @@ static ssize_t ogg_convert(char *inbuffer, size_t len, struct filter_node *fn)
                        return -E_OGGDEC_BADHEADER;
                if (ret < 0)
                        return -E_OGGDEC_FAULT;
-               fn->fci->channels = ov_info(pod->vf, 0)->channels;
-               fn->fci->samplerate = ov_info(pod->vf, 0)->rate;
-               PARA_NOTICE_LOG("%d channels, %d Hz\n", fn->fci->channels, fn->fci->samplerate);
+               fn->fc->channels = ov_info(pod->vf, 0)->channels;
+               fn->fc->samplerate = ov_info(pod->vf, 0)->rate;
+               PARA_NOTICE_LOG("%d channels, %d Hz\n", fn->fc->channels, fn->fc->samplerate);
        }
 again:
        ret = ov_read(pod->vf, fn->buf + fn->loaded, fn->bufsize - fn->loaded,
@@ -167,7 +168,7 @@ again:
        if (ret < 0)
                return -E_OGGDEC_BADLINK;
        fn->loaded += ret;
-       if (!*fn->fci->eof && !fn->fci->error && fn->loaded < fn->bufsize)
+       if (!*fn->fc->input_eof && fn->loaded < fn->bufsize)
                goto again;
        return pod->converted;
 }
index 905d580e1ba4dcf00872e40cf1119878e09794bb..9c9013648e794b12d6c6f0750eedc079048ebaf7 100644 (file)
@@ -21,6 +21,8 @@
 #include "para.h"
 
 #include "ortp.h"
+#include "list.h"
+#include "sched.h"
 #include "recv.h"
 #include "ortp_recv.cmdline.h"
 
@@ -38,7 +40,6 @@ extern int msg_to_buf(mblk_t *, char *, int);
  * \sa receiver receiver_node
  */
 struct private_ortp_recv_data {
-
 /**
  *
  *
@@ -66,21 +67,19 @@ uint32_t timestamp;
 uint32_t chunk_ts;
 };
 
-static int ortp_recv_pre_select(struct receiver_node *rn,
-               __a_unused fd_set *rfds, __a_unused fd_set *wfds,
-               struct timeval *timeout)
+static void ortp_recv_pre_select(struct sched *s, struct task *t)
 {
+       struct receiver_node *rn = t->private_data;
        struct private_ortp_recv_data *pord = rn->private_data;
-       struct timeval now, tmp;
+       struct timeval tmp;
 
-       gettimeofday(&now, NULL);
-       if (tv_diff(&now, &pord->next_chunk, &tmp) >= 0) {
+       if (tv_diff(&s->now, &pord->next_chunk, &tmp) >= 0) {
                tmp.tv_sec = 0;
                tmp.tv_usec = 1000;
        }
-       if (tv_diff(timeout, &tmp, NULL) > 0)
-               *timeout = tmp;
-       return -1; /* we did not modify the fd sets */
+       if (tv_diff(&s->timeout, &tmp, NULL) > 0)
+               s->timeout = tmp;
+       t->ret = 1;
 }
 
 static void compute_next_chunk(unsigned chunk_time,
@@ -97,47 +96,43 @@ static void compute_next_chunk(unsigned chunk_time,
                pord->next_chunk.tv_usec);
 }
 
-static int ortp_recv_post_select(struct receiver_node *rn,
-               __a_unused int select_ret, __a_unused fd_set *rfds,
-               __a_unused fd_set *wfds)
+static void ortp_recv_post_select(struct sched *s, struct task *t)
 {
+       struct receiver_node *rn = t->private_data;
        struct private_ortp_recv_data *pord = rn->private_data;
        mblk_t *mp;
-       int ret, packet_type, stream_type;
+       int packet_type, stream_type;
        char tmpbuf[CHUNK_SIZE + 3];
-       struct timeval now;
        unsigned chunk_time;
 
-       gettimeofday(&now, NULL);
-//     PARA_DEBUG_LOG("rn: %p, pord: %p, session: %p\n", rn, pord, pord->session);
-       if (pord->start.tv_sec) {
-               struct timeval diff;
-               if (tv_diff(&now, &pord->next_chunk, &diff) < 0)
-                               return 1;
-       }
+//     PARA_INFO_LOG("rn: %p, pord: %p, session: %p\n", rn, pord, pord->session);
+       t->ret = 1;
+       if (pord->start.tv_sec)
+               if (tv_diff(&s->now, &pord->next_chunk, NULL) < 0)
+                       return;
        mp = rtp_session_recvm_with_ts(pord->session, pord->timestamp);
        if (!mp) {
                struct timeval min_delay = {0, 100};
 //             PARA_INFO_LOG("nope, chunk_ts = %d, loaded: %d, bad: %d\n",
 //                     pord->timestamp, rn->loaded, pord->c_bad);
                pord->c_bad++;
+               t->ret = -E_TOO_MANY_BAD_CHUNKS;
                if ((pord->c_bad > 5000 && pord->start.tv_sec) || pord->c_bad > 10000)
-                       return -E_TOO_MANY_BAD_CHUNKS;
-               tv_add(&now, &min_delay, &pord->next_chunk);
-               return 1;
+                       return;
+               t->ret = 1;
+               tv_add(&s->now, &min_delay, &pord->next_chunk);
+               return;
        }
        /* okay, we have a chunk of data */
        if (!pord->start.tv_sec)
-               pord->start = now;
-       ret = msg_to_buf(mp, tmpbuf, CHUNK_SIZE);
-//     PARA_DEBUG_LOG("have it ts = %d, chunk_ts = %d, loaded: %d, "
-//             "bad: %d, len: %d\n", pord->timestamp, pord->chunk_ts,
-//             rn->loaded, pord->c_bad, ret);
-       if (ret < ORTP_AUDIO_HEADER_LEN) {
-               if (ret < 0)
-                       ret = -E_MSG_TO_BUF;
+               pord->start = s->now;
+       t->ret = msg_to_buf(mp, tmpbuf, CHUNK_SIZE);
+       if (t->ret < ORTP_AUDIO_HEADER_LEN) {
+               rn->eof = 1;
+               if (t->ret < 0)
+                       t->ret = -E_MSG_TO_BUF;
                else
-                       ret = 0;
+                       t->ret = -E_ORTP_RECV_EOF;
                goto err_out;
        }
        packet_type = READ_PACKET_TYPE(tmpbuf);
@@ -150,65 +145,67 @@ static int ortp_recv_post_select(struct receiver_node *rn,
        switch (packet_type) {
        unsigned header_len, payload_len;
        case ORTP_EOF:
-               ret = 0;
+               rn->eof = 1;
+               t->ret = -E_ORTP_RECV_EOF;
                goto err_out;
        case ORTP_BOF:
-               PARA_INFO_LOG("bof (%d)\n", ret);
+               PARA_INFO_LOG("bof (%d)\n", t->ret);
                pord->have_header = 1;
                /* fall through */
        case ORTP_DATA:
                if (!pord->have_header && stream_type)
                /* can't use the data, wait for header */
                        goto success;
-               if (ret + rn->loaded >= CHUNK_SIZE + ORTP_AUDIO_HEADER_LEN) {
-                       ret = -E_OVERRUN;
+               if (t->ret + rn->loaded >= CHUNK_SIZE + ORTP_AUDIO_HEADER_LEN) {
+                       t->ret = -E_OVERRUN;
                        goto err_out;
                }
-               if (ret > ORTP_AUDIO_HEADER_LEN) {
+               if (t->ret > ORTP_AUDIO_HEADER_LEN) {
                        memcpy(rn->buf + rn->loaded, tmpbuf + ORTP_AUDIO_HEADER_LEN,
-                               ret - ORTP_AUDIO_HEADER_LEN);
-                       rn->loaded += ret - ORTP_AUDIO_HEADER_LEN;
+                               t->ret - ORTP_AUDIO_HEADER_LEN);
+                       rn->loaded += t->ret - ORTP_AUDIO_HEADER_LEN;
                }
                goto success;
        case ORTP_HEADER:
                header_len = READ_HEADER_LEN(tmpbuf);
                PARA_DEBUG_LOG("header packet (%d bytes), header len: %d\n",
-                       ret, header_len);
+                       t->ret, header_len);
                if (!pord->have_header) {
                        pord->have_header = 1;
                        memcpy(rn->buf, tmpbuf + ORTP_AUDIO_HEADER_LEN,
-                               ret - ORTP_AUDIO_HEADER_LEN);
-                       rn->loaded = ret - ORTP_AUDIO_HEADER_LEN;
+                               t->ret - ORTP_AUDIO_HEADER_LEN);
+                       rn->loaded = t->ret - ORTP_AUDIO_HEADER_LEN;
                        goto success;
                }
-               if (header_len + ORTP_AUDIO_HEADER_LEN > ret) {
-                       ret = -E_INVALID_HEADER;
+               if (header_len + ORTP_AUDIO_HEADER_LEN > t->ret) {
+                       t->ret = -E_INVALID_HEADER;
                        goto err_out;
                }
-               payload_len = ret - ORTP_AUDIO_HEADER_LEN - header_len;
+               payload_len = t->ret - ORTP_AUDIO_HEADER_LEN - header_len;
 //             PARA_INFO_LOG("len: %d header_len: %d, payload_len: %d, loaded: %d\n", ret,
 //                     header_len, payload_len, rn->loaded);
                if (rn->loaded + payload_len > CHUNK_SIZE) {
-                       ret = -E_OVERRUN;
+                       t->ret = -E_OVERRUN;
                        goto err_out;
                }
                if (payload_len)
                        memcpy(rn->buf + rn->loaded, tmpbuf
-                               + (ret - payload_len), payload_len);
+                               + (t->ret - payload_len), payload_len);
                rn->loaded += payload_len;
                goto success;
        }
 success:
+       t->ret = 1;
        freemsg(mp);
        if (pord->c_bad) {
                pord->c_bad = 0;
-               pord->next_chunk = now;
+               pord->next_chunk = s->now;
        }
        compute_next_chunk(chunk_time, pord);
-       return 1;
+       return;
 err_out:
+       rn->eof = 1;
        freemsg(mp);
-       return ret;
 }
 
 static void ortp_shutdown(void)
diff --git a/recv.c b/recv.c
index af15eb07efb005f8b959e764d74b1bf2a27db60d..cf178d207d2ca400a5613fdb610cd35d7258dbc1 100644 (file)
--- a/recv.c
+++ b/recv.c
  */
 #include "para.h"
 
+#include "list.h"
+#include "sched.h"
 #include "recv.h"
 #include "recv.cmdline.h"
 #include "fd.h"
 #include "error.h"
+#include "stdout.h"
 
 struct gengetopt_args_info conf;
 
@@ -55,13 +58,23 @@ static void *parse_config(int argc, char *argv[], int *receiver_num)
        return check_receiver_arg(conf.receiver_arg, receiver_num);
 }
 
+void rn_event_handler(struct task *t)
+{
+       PARA_NOTICE_LOG("%s\n", PARA_STRERROR(-t->ret));
+       unregister_task(t);
+}
+
 int main(int argc, char *argv[])
 {
-       int ret, eof = 0, max, r_opened = 0, receiver_num;
-       struct timeval timeout;
+       int ret, r_opened = 0, receiver_num;
        struct  receiver *r = NULL;
-       fd_set rfds, wfds;
        struct receiver_node rn;
+       struct stdout_task sot;
+       struct sched s;
+
+       init_sched();
+       s.default_timeout.tv_sec = 1;
+       s.default_timeout.tv_usec = 0;
 
        memset(&rn, 0, sizeof(struct receiver_node));
        for (ret = 0; receivers[ret].name; ret++)
@@ -78,44 +91,23 @@ int main(int argc, char *argv[])
        if (ret < 0)
                goto out;
        r_opened = 1;
-recv:
-       FD_ZERO(&rfds);
-       FD_ZERO(&wfds);
-       timeout.tv_sec = 0;
-       timeout.tv_usec = 999 * 1000;
-       max = -1;
-       ret = r->pre_select(&rn, &rfds, &wfds, &timeout);
-       max = PARA_MAX(max, ret);
 
-       PARA_DEBUG_LOG("timeout: %lums, max: %d\n", tv2ms(&timeout), max);
-       ret = para_select(max + 1, &rfds, &wfds, &timeout);
-       if (ret < 0) {
-               ret = -E_RECV_SELECT;
-               goto out;
-       }
-       ret = r->post_select(&rn, ret, &rfds, &wfds);
-       if (ret < 0)
-               goto out;
-       if (!ret)
-               eof = 1;
-       if (!rn.loaded) {
-               if (eof)
-                       goto out;
-               goto recv;
-       }
-       ret = write(STDOUT_FILENO, rn.buf, rn.loaded);
-       PARA_DEBUG_LOG("wrote %d/%zd\n", ret, rn.loaded);
-       if (ret < 0) {
-               ret = -E_WRITE_STDOUT;
-               goto out;
-       }
-       if (ret != rn.loaded) {
-               PARA_INFO_LOG("short write %d/%zd\n", ret, rn.loaded);
-               memmove(rn.buf, rn.buf + ret, rn.loaded - ret);
-       }
-       rn.loaded -= ret;
-       if (rn.loaded || !eof)
-               goto recv;
+       stdout_set_defaults(&sot);
+       sot.buf = rn.buf;
+       sot.loaded = &rn.loaded;
+       sot.input_eof = &rn.eof;
+       register_task(&sot.task);
+
+       rn.task.private_data = &rn;
+       rn.task.pre_select = r->pre_select;
+       rn.task.post_select = r->post_select;
+       rn.task.event_handler = rn_event_handler;
+       rn.task.flags = 0;
+       sprintf(rn.task.status, "receiver node");
+       register_task(&rn.task);
+
+
+       ret = sched(&s);
 out:
        if (r_opened)
                r->close(&rn);
diff --git a/recv.h b/recv.h
index f7fc7259b17a3d1210fe65a9c8d874638d1948f5..3c8b543bda88f50d57bb1cc975dd95085d19a392 100644 (file)
--- a/recv.h
+++ b/recv.h
@@ -34,6 +34,8 @@ struct receiver_node {
        int eof;
        /** pointer to the configuration data for this instance */
        void *conf;
+       /** the task associated with this instance */
+       struct task task;
 };
 
 /**
@@ -111,38 +113,26 @@ struct receiver {
  *
  * The pre_select function gets called from the driving application before
  * entering its select loop. The receiver may use this hook to add any file
- * descriptors to \a rfds and \a wfds in order to check the result later in the
- * post_select hook.
+ * descriptors to the sets of file descriptors given by \a s.
  *
- * \a timeout is a value-result parameter, initially containing the timeout for
- * select() which was set by the application or by another receiver node. If
- * the receiver wants its pre_select function to be called at some earlier time
- * than the time determined by \a timeout, it may set \a timeout to an
- * appropriate smaller value. However, it must never increase this timeout.
  *
- * This function must return the highest-numbered descriptor it wants to being
- * checked, or -1 if no file descriptors should be checked for this run.
- *
- * \sa select(2), receiver_node:private_data, time.c
+ * \sa select(2), time.c struct task, struct sched
  */
-       int (*pre_select)(struct receiver_node *rn, fd_set *rfds,
-               fd_set *wfds, struct timeval *timeout);
+       void (*pre_select)(struct sched *s, struct task *t);
 /**
  *
  *
  * evaluate the result from select()
  *
- * If the call to select() was succesful, this hook gets called. It should
- * check all file descriptors which were added to any of the the fd sets during
- * the previous call to pre_select. According to the result, it may then use
- * any non-blocking I/O to establish a connection or to receive the audio data.
+ * This hook gets called after the call to select(). It should check all file
+ * descriptors which were added to any of the the fd sets during the previous
+ * call to pre_select. According to the result, it may then use any
+ * non-blocking I/O to establish a connection or to receive the audio data.
  *
- * A negative return value is interpreted as an error.
  *
  * \sa select(2), struct receiver
  */
-       int (*post_select)(struct receiver_node *rn, int select_ret,
-               fd_set *rfds, fd_set *wfds);
+       void (*post_select)(struct sched *s, struct task *t);
 };
 
 
index 078e44d17eee3dd97770b5694574c243244d5b30..e39719da37dde6ab5524fc5d81880f0e4edb326b 100644 (file)
@@ -20,6 +20,8 @@
 
 #include "para.h"
 
+#include "list.h"
+#include "sched.h"
 #include "recv.h"
 #include "string.h"
 
diff --git a/sched.c b/sched.c
new file mode 100644 (file)
index 0000000..9db7c09
--- /dev/null
+++ b/sched.c
@@ -0,0 +1,106 @@
+#include <sys/time.h>
+#include "para.h"
+#include "ipc.h"
+#include "fd.h"
+#include "list.h"
+#include "sched.h"
+#include "string.h"
+#include "error.h"
+
+struct list_head pre_select_list;
+struct list_head post_select_list;
+
+static void sched_preselect(struct sched *s)
+{
+       struct task *t, *tmp;
+again:
+       list_for_each_entry_safe(t, tmp, &pre_select_list, pre_select_node) {
+               t->pre_select(s, t);
+               if (t->ret > 0 || !t->event_handler)
+                       continue;
+               t->event_handler(t);
+               goto again;
+       }
+}
+
+static void sched_post_select(struct sched *s)
+{
+       struct task *t, *tmp;
+
+       list_for_each_entry_safe(t, tmp, &post_select_list, post_select_node) {
+               t->post_select(s, t);
+               if (t->ret > 0 || !t->event_handler)
+                       continue;
+               t->event_handler(t);
+       }
+}
+
+int sched(struct sched *s)
+{
+
+       gettimeofday(&s->now, NULL);
+again:
+       FD_ZERO(&s->rfds);
+       FD_ZERO(&s->wfds);
+       s->timeout = s->default_timeout;
+       s->max_fileno = -1;
+       sched_preselect(s);
+       s->select_ret = para_select(s->max_fileno + 1, &s->rfds,
+               &s->wfds, &s->timeout);
+       if (s->select_ret < 0)
+               return s->select_ret;
+       gettimeofday(&s->now, NULL);
+       sched_post_select(s);
+       if (list_empty(&pre_select_list) && list_empty(&post_select_list))
+               return 0;
+       goto again;
+}
+
+void *register_task(struct task *t)
+{
+       PARA_INFO_LOG("registering %s (%p)\n", t->status, t);
+       if (t->pre_select) {
+               PARA_DEBUG_LOG("pre_select: %p\n", &t->pre_select);
+               if (t->flags & PRE_ADD_TAIL)
+                       list_add_tail(&t->pre_select_node, &pre_select_list);
+               else
+                       list_add(&t->pre_select_node, &pre_select_list);
+       }
+       if (t->post_select) {
+               PARA_DEBUG_LOG("post_select: %p\n", &t->pre_select);
+               if (t->flags & POST_ADD_TAIL)
+                       list_add_tail(&t->post_select_node, &post_select_list);
+               else
+                       list_add(&t->post_select_node, &post_select_list);
+       }
+       return t;
+}
+
+void unregister_task(struct task *t)
+{
+       PARA_INFO_LOG("unregistering %s (%p)\n", t->status, t);
+       if (t->pre_select)
+               list_del(&t->pre_select_node);
+       if (t->post_select)
+               list_del(&t->post_select_node);
+};
+
+void init_sched(void)
+{
+       INIT_LIST_HEAD(&pre_select_list);
+       INIT_LIST_HEAD(&post_select_list);
+};
+
+void sched_shutdown(void)
+{
+       struct task *t, *tmp;
+
+       list_for_each_entry_safe(t, tmp, &pre_select_list, pre_select_node)
+               unregister_task(t);
+       /* remove tasks which do not have a pre_select hook */
+       list_for_each_entry_safe(t, tmp, &post_select_list, post_select_node)
+               unregister_task(t);
+};
+
+
+//char *get_tast_list();
diff --git a/sched.h b/sched.h
new file mode 100644 (file)
index 0000000..9855475
--- /dev/null
+++ b/sched.h
@@ -0,0 +1,29 @@
+struct sched {
+       struct timeval now, timeout;
+       int max_fileno;
+       fd_set rfds, wfds;
+       int select_ret;
+       struct timeval default_timeout;
+};
+
+struct task {
+       void *private_data;
+       unsigned flags;
+       int ret;
+       void (*pre_select)(struct sched *s, struct task *t);
+       void (*post_select)(struct sched *s, struct task *t);
+       void (*event_handler)(struct task *t);
+       struct list_head pre_select_node;
+       struct list_head post_select_node;
+       char status[MAXLINE];
+};
+
+enum task_flags {
+       PRE_ADD_TAIL = 1,
+       POST_ADD_TAIL = 2,
+};
+
+void *register_task(struct task *t);
+void unregister_task(struct task *t);
+int sched(struct sched *s);
+void init_sched(void);
index 1805c92b62de84d7131bf71c905dc3d8d23a0d93..2391e07c360d93f68e85a76c99bc783e1f2d4bc0 100644 (file)
--- a/server.c
+++ b/server.c
@@ -33,7 +33,6 @@
 #include "db.h"
 #include "server.h"
 #include "afs.h"
-#include "afh.h" /* FIXME */
 #include "config.h"
 #include "close_on_fork.h"
 #include "send.h"
diff --git a/stdin.c b/stdin.c
new file mode 100644 (file)
index 0000000..22f1dd9
--- /dev/null
+++ b/stdin.c
@@ -0,0 +1,56 @@
+#include "para.h"
+#include "string.h"
+#include "list.h"
+#include "sched.h"
+#include "fd.h"
+#include "error.h"
+#include "stdin.h"
+
+void stdin_pre_select(struct sched *s, struct task *t)
+{
+       struct stdin_task *sit = t->private_data;
+       if (sit->loaded < sit->bufsize)
+               para_fd_set(STDIN_FILENO, &s->rfds, &s->max_fileno);
+       t->ret = 1; /* success */
+}
+
+static void stdin_default_event_handler(struct task *t)
+{
+       PARA_NOTICE_LOG("%p: %s\n", t, PARA_STRERROR(-t->ret));
+       unregister_task(t);
+}
+
+void stdin_post_select(struct sched *s, struct task *t)
+{
+       struct stdin_task *sit = t->private_data;
+       ssize_t ret;
+
+       t->ret = 1;
+       if (sit->loaded >= sit->bufsize)
+               return;
+       if (!FD_ISSET(STDIN_FILENO, &s->rfds))
+               return;
+       ret = read(STDIN_FILENO, sit->buf + sit->loaded, sit->bufsize - sit->loaded);
+       if (ret < 0)
+               t->ret = -E_STDIN_READ;
+       else if (ret > 0) {
+               sit->loaded += ret;
+               t->ret = ret;
+       } else
+               t->ret = -E_STDIN_EOF;
+       if (t->ret < 0)
+               sit->eof = 1;
+}
+
+void stdin_set_defaults(struct stdin_task *sit)
+{
+       sit->bufsize = 16 * 1024,
+       sit->loaded = 0,
+       sit->eof = 0,
+       sit->task.flags = 0,
+       sit->task.pre_select = stdin_pre_select;
+       sit->task.post_select = stdin_post_select;
+       sit->task.event_handler = stdin_default_event_handler;
+       sit->task.private_data = sit;
+       sprintf(sit->task.status, "stdin reader");
+}
diff --git a/stdin.h b/stdin.h
new file mode 100644 (file)
index 0000000..cb6cbfb
--- /dev/null
+++ b/stdin.h
@@ -0,0 +1,11 @@
+struct stdin_task {
+       char *buf;
+       size_t bufsize;
+       size_t loaded;
+       struct task task;
+       int eof;
+};
+
+void stdin_pre_select(struct sched *s, struct task *t);
+void stdin_post_select(struct sched *s, struct task *t);
+void stdin_set_defaults(struct stdin_task *sit);
diff --git a/stdout.c b/stdout.c
new file mode 100644 (file)
index 0000000..1c60669
--- /dev/null
+++ b/stdout.c
@@ -0,0 +1,58 @@
+#include "para.h"
+#include "string.h"
+#include "list.h"
+#include "sched.h"
+#include "fd.h"
+#include "error.h"
+#include "stdout.h"
+
+void stdout_pre_select(struct sched *s, struct task *t)
+{
+       struct stdout_task *sot = t->private_data;
+
+       t->ret = 1;
+       sot->check_fd = 0;
+       if (!*sot->loaded)
+               return;
+       sot->check_fd = 1;
+       para_fd_set(STDOUT_FILENO, &s->wfds, &s->max_fileno);
+}
+
+void stdout_post_select(struct sched *s, struct task *t)
+{
+       struct stdout_task *sot = t->private_data;
+       ssize_t ret;
+
+       t->ret = 1;
+       if (!sot->check_fd) {
+               if (*sot->input_eof)
+                       t->ret = -E_STDOUT_EOF;
+               return;
+       }
+       if (!FD_ISSET(STDOUT_FILENO, &s->wfds))
+               return;
+       t->ret = -E_STDOUT_WRITE;
+       ret = write(STDOUT_FILENO, sot->buf, *sot->loaded);
+       if (ret <= 0)
+               return;
+       *sot->loaded -= ret;
+       t->ret = 1;
+}
+
+void stdout_default_event_handler(struct task *t)
+{
+       PARA_NOTICE_LOG("%p: %s\n", t, PARA_STRERROR(-t->ret));
+       unregister_task(t);
+}
+
+
+void stdout_set_defaults(struct stdout_task *sot)
+{
+       sot->task.private_data = sot;
+       sot->task.pre_select = stdout_pre_select;
+       sot->task.post_select = stdout_post_select;
+       sot->task.event_handler = stdout_default_event_handler;
+       sot->task.flags = 0;
+       sot->eof = 0;
+       sprintf(sot->task.status, "stdout writer");
+}
diff --git a/stdout.h b/stdout.h
new file mode 100644 (file)
index 0000000..4dafafd
--- /dev/null
+++ b/stdout.h
@@ -0,0 +1,14 @@
+/** \file stdout.h common code for uitlities that write to stdout */
+struct stdout_task {
+       char *buf;
+       size_t *bufsize;
+       size_t *loaded;
+       int *input_eof;
+       int eof;
+       struct task task;
+       int check_fd;
+};
+
+void stdout_pre_select(struct sched *s, struct task *t);
+void stdout_post_select(struct sched *s, struct task *t);
+void stdout_set_defaults(struct stdout_task *sot);
diff --git a/wav.c b/wav.c
index e229317dd6309380263673f359de4d4d8d50e59b..5ab3bb3937385a855e7dd5fecdcc50635e3d1ac2 100644 (file)
--- a/wav.c
+++ b/wav.c
@@ -21,6 +21,7 @@
 #include "para.h"
 
 #include "list.h"
+#include "sched.h"
 #include "filter.h"
 #include "string.h"
 
@@ -66,7 +67,7 @@ static ssize_t wav_convert(char *inbuf, size_t len, struct filter_node *fn)
        int *bof = fn->private_data;
 
        if (*bof) {
-               make_wav_header(fn->fci->channels, fn->fci->samplerate, fn);
+               make_wav_header(fn->fc->channels, fn->fc->samplerate, fn);
                fn->loaded = WAV_HEADER_LEN;
                *bof = 0;
 //             return 0;
@@ -95,7 +96,7 @@ static void wav_open(struct filter_node *fn)
        fn->private_data = para_malloc(sizeof(int));
        bof = fn->private_data;
        *bof = 1;
-       PARA_DEBUG_LOG("wav filter node: %p, output buffer: %p, loaded: %zd\n",
+       PARA_INFO_LOG("wav filter node: %p, output buffer: %p, loaded: %zd\n",
                fn, fn->buf, fn->loaded);
 }
 
diff --git a/write.c b/write.c
index 6e4ee0639654eac69ae76cb94e5a0bcffb316688..e42062038dc6d1054c80e84c2d3dd998b359bce9 100644 (file)
--- a/write.c
+++ b/write.c
 #include "para.h"
 #include "string.h"
 #include "write.cmdline.h"
+#include "list.h"
+#include "sched.h"
+#include "stdin.h"
 #include "write.h"
 #include "write_common.h"
 #include "fd.h"
-
-#include <sys/time.h> /* gettimeofday */
-
 #include "error.h"
 
-#define WAV_HEADER_LEN 44
-
-static char *audiobuf;
-static struct timeval *start_time;
-struct gengetopt_args_info conf;
-
 INIT_WRITE_ERRLISTS;
 
-void para_log(int ll, const char* fmt,...)
-{
-       va_list argp;
+struct check_wav_task {
+       char *buf;
+       size_t *loaded;
+       int *eof;
+       unsigned channels;
+       unsigned sample_rate;
+       struct task task;
+};
+
+struct initial_delay_task {
+       struct timeval start_time;
+       struct task task;
+};
+
+static struct gengetopt_args_info conf;
+struct stdin_task sit;
+struct check_wav_task cwt;
+struct initial_delay_task idt;
+static struct writer_node_group *wng;
 
-       if (ll < conf.loglevel_arg)
-               return;
-       va_start(argp, fmt);
-       vfprintf(stderr, fmt, argp);
-       va_end(argp);
-}
+#define WAV_HEADER_LEN 44
 
 /**
- * read WAV_HEADER_LEN bytes from stdin to audio buffer
+ * test if audio buffer contains a valid wave header
  *
- * \return -E_READ_HDR on errors and on eof before WAV_HEADER_LEN could be
- * read. A positive return value indicates success.
+ * \return If not, return -E_NO_WAV_HEADER, otherwise, return zero. If
+ * there is less than WAV_HEADER_LEN bytes awailable, return one.
  */
-static int read_wav_header(void)
+static void check_wav_pre_select(__a_unused struct sched *s, struct task *t)
 {
-       ssize_t ret, count = 0;
+       struct check_wav_task *cwt = t->private_data;
+       unsigned char *a;
 
-       while (count < WAV_HEADER_LEN) {
-               ret = read(STDIN_FILENO, audiobuf + count, WAV_HEADER_LEN - count);
-               if (ret <= 0)
-                       return -E_READ_HDR;
-               count += ret;
+       if (*cwt->loaded < WAV_HEADER_LEN) {
+               t->ret = *cwt->eof? -E_PREMATURE_END : 1;
+               return;
        }
-       return 1;
+       cwt->channels = 2;
+       cwt->sample_rate = 44100;
+       a = (unsigned char*)cwt->buf;
+       t->ret = -E_NO_WAV_HEADER;
+       if (a[0] != 'R' || a[1] != 'I' || a[2] != 'F' || a[3] != 'F')
+               return;
+       cwt->channels = (unsigned) a[22];
+       cwt->sample_rate = a[24] + (a[25] << 8) + (a[26] << 16) + (a[27] << 24);
+       *cwt->loaded -= WAV_HEADER_LEN;
+       memmove(cwt->buf, cwt->buf + WAV_HEADER_LEN, *cwt->loaded);
+       t->ret = -E_WAV_HEADER_SUCCESS;
+       PARA_INFO_LOG("channels: %d, sample_rate: %d\n", cwt->channels, cwt->sample_rate);
 }
 
-/**
- * check if current time is later than start_time
- * \param diff pointer to write remaining time to
- *
- * If start_time was not given, or current time is later than given
- * start_time, return 0. Otherwise, return 1 and write the time
- * difference between current time and start_time to diff. diff may be
- * NULL.
- *
- */
-static int start_time_in_future(struct timeval *diff)
+static void initial_delay_pre_select(struct sched *s, struct task *t)
 {
-       struct timeval now;
+       struct initial_delay_task *idt = t->private_data;
+       struct timeval diff;
 
-       if (!conf.start_time_given)
-               return 0;
-       gettimeofday(&now, NULL);
-       return tv_diff(start_time, &now, diff) > 0? 1 : 0;
-}
-
-/**
- * sleep until time given at command line
- *
- * This is called if the initial buffer is filled. It returns
- * immediately if no start_time was given at the command line
- * or if the given start time is in the past.
- *
- */
-static void do_initial_delay(struct timeval *delay)
-{
-       do
-               para_select(1, NULL, NULL, delay);
-       while (start_time_in_future(delay));
+       t->ret = -E_NO_DELAY;
+       if (!idt->start_time.tv_sec && !idt->start_time.tv_usec)
+               return;
+       t->ret = -E_DELAY_TIMEOUT;
+       if (tv_diff(&s->now, &idt->start_time, &diff) > 0)
+               return;
+       t->ret = 1;
+       if (tv_diff(&s->timeout , &diff, NULL) > 0)
+               s->timeout = diff;
 }
 
-static int read_stdin(char *buf, size_t bytes_to_load, size_t *loaded)
-{
-       ssize_t ret;
-
-       while (*loaded < bytes_to_load) {
-               ret = read(STDIN_FILENO, buf + *loaded, bytes_to_load - *loaded);
-               if (ret <= 0) {
-                       if (ret < 0)
-                               ret = -E_READ_STDIN;
-                       return ret;
-               }
-               *loaded += ret;
-       }
-       return 1;
-}
-/**
- * play raw pcm data
- * \param loaded number of bytes already loaded
- *
- * If start_time was given, prebuffer data until buffer is full or
- * start_time is reached. In any case, do not start playing before
- * start_time.
- *
- * \return positive on success, negative on errors.
- */
-static int pcm_write(struct writer_node_group *wng, size_t loaded)
+void para_log(int ll, const char* fmt,...)
 {
-       size_t bufsize, prebuf_size, bytes_to_load;
-       struct timeval delay;
-       int ret, not_yet_started = 1;
+       va_list argp;
 
-       ret = wng_open(wng);
-       if (ret < 0)
-               goto out;
-       PARA_INFO_LOG("max chunk_bytes: %zd\n", wng->max_chunk_bytes);
-       bufsize = (conf.bufsize_arg * 1024 / wng->max_chunk_bytes)
-               * wng->max_chunk_bytes;
-       audiobuf = para_realloc(audiobuf, bufsize);
-       prebuf_size = conf.prebuffer_arg * bufsize / 100;
-       bytes_to_load =  PARA_MAX(prebuf_size, wng->max_chunk_bytes);
-       ret = read_stdin(audiobuf, bytes_to_load, &loaded);
-       if (ret <= 0 || loaded < bytes_to_load) {
-               if (ret >= 0)
-                       ret = -E_PREMATURE_END;
-               goto out;
-       }
-       if (not_yet_started && start_time && start_time_in_future(&delay))
-               do_initial_delay(&delay);
-       not_yet_started = 0;
-again:
-       ret = wng_write(wng, audiobuf, &loaded);
-       if (ret <= 0)
-               goto out;
-       ret = -E_WRITE_OVERRUN;
-       if (loaded >= bufsize)
-               goto out;
-       bytes_to_load = PARA_MIN(wng->max_chunk_bytes, bufsize);
-       ret = read_stdin(audiobuf, bytes_to_load, &loaded);
-       if (ret < 0)
-               goto out;
-       if (!ret)
-               wng->eof = 1;
-       goto again;
-out:
-       wng_close(wng);
-       return ret;
+       if (ll < conf.loglevel_arg)
+               return;
+       va_start(argp, fmt);
+       vfprintf(stderr, fmt, argp);
+       va_end(argp);
 }
 
 static struct writer_node_group *check_args(void)
 {
        int i, ret = -E_WRITE_SYNTAX;
-       static struct timeval tv;
        struct writer_node_group *wng = NULL;
 
        if (conf.list_writers_given) {
@@ -188,16 +126,15 @@ static struct writer_node_group *check_args(void)
                free(msg);
                exit(EXIT_SUCCESS);
        }
-       if (conf.prebuffer_arg < 0 || conf.prebuffer_arg > 100)
-               goto out;
+//     if (conf.prebuffer_arg < 0 || conf.prebuffer_arg > 100)
+//             goto out;
        if (conf.start_time_given) {
                long unsigned sec, usec;
                if (sscanf(conf.start_time_arg, "%lu:%lu",
                                &sec, &usec) != 2)
                        goto out;
-               tv.tv_sec = sec;
-               tv.tv_usec = usec;
-               start_time = &tv;
+               idt.start_time.tv_sec = sec;
+               idt.start_time.tv_usec = usec;
        }
        if (!conf.writer_given) {
                wng = setup_default_wng();
@@ -205,11 +142,16 @@ static struct writer_node_group *check_args(void)
                goto out;
        }
        wng = wng_new(conf.writer_given);
+       ret = -E_WRITE_SYNTAX;
        for (i = 0; i < conf.writer_given; i++) {
-               ret = check_writer_arg(conf.writer_arg[i]);
-               if (ret < 0)
+               int writer_num;
+               wng->writer_nodes[i].conf = check_writer_arg(
+                       conf.writer_arg[i], &writer_num);
+               if (!wng->writer_nodes[i].conf)
                        goto out;
-               wng->writer_nodes[i].writer = &writers[ret];
+               wng->writer_nodes[i].writer = &writers[writer_num];
+               sprintf(wng->writer_nodes[i].task.status, "%s",
+                       writer_names[writer_num]);
        }
        ret = 1;
 out:
@@ -219,41 +161,89 @@ out:
        return NULL;
 }
 
-/**
- * test if audio buffer contains a valid wave header
- *
- * \return If not, return 0, otherwise, store number of channels and sample rate
- * in struct conf and return WAV_HEADER_LEN.
- */
-static size_t check_wave(void)
+static void wng_event_handler(struct task *t)
 {
-       unsigned char *a = (unsigned char*)audiobuf;
-       if (a[0] != 'R' || a[1] != 'I' || a[2] != 'F' || a[3] != 'F')
-               return WAV_HEADER_LEN;
-       conf.channels_arg = (unsigned) a[22];
-       conf.sample_rate_arg = a[24] + (a[25] << 8) + (a[26] << 16) + (a[27] << 24);
-       return 0;
+       struct writer_node_group *g = t->private_data;
+
+       PARA_INFO_LOG("%s\n", PARA_STRERROR(-t->ret));
+       unregister_task(t);
+       wng_close(g);
+       wng_destroy(g);
+}
+
+
+static void idt_event_handler(struct task *t)
+{
+       int ret;
+
+       PARA_INFO_LOG("%s\n", PARA_STRERROR(-t->ret));
+       unregister_task(t);
+       wng->buf = sit.buf;
+       wng->loaded = &sit.loaded;
+       wng->input_eof = &sit.eof;
+       wng->task.event_handler = wng_event_handler;
+       ret = wng_open(wng);
+       if (ret < 0) {
+               PARA_ERROR_LOG("%s\n", PARA_STRERROR(-ret));
+               exit(EXIT_FAILURE);
+       }
+}
+
+static void cwt_event_handler(struct task *t)
+{
+       if (t->ret != -E_NO_WAV_HEADER && t->ret != -E_WAV_HEADER_SUCCESS) {
+               PARA_ERROR_LOG("%s\n", PARA_STRERROR(-t->ret));
+               exit(EXIT_FAILURE);
+       }
+       PARA_INFO_LOG("%s\n", PARA_STRERROR(-t->ret));
+       unregister_task(t);
+//     if (t->ret == -E_WAV_HEADER_SUCCESS) {
+//             conf.channels_arg = cwt.channels;
+//             conf.sample_rate_arg = cwt.sample_rate;
+//     }
+       idt.task.pre_select = initial_delay_pre_select;
+       idt.task.private_data = &idt;
+       idt.task.event_handler = idt_event_handler;
+       sprintf(idt.task.status, "initial_delay");
+       register_task(&idt.task);
 }
 
 int main(int argc, char *argv[])
 {
        int ret = -E_WRITE_SYNTAX;
-       struct writer_node_group *wng = NULL;
+       struct sched s;
 
        cmdline_parser(argc, argv, &conf);
+       init_supported_writers();
+       init_sched();
+
        wng = check_args();
        if (!wng)
                goto out;
-       init_supported_writers();
-       audiobuf = para_malloc(WAV_HEADER_LEN);
-       ret = read_wav_header();
-       if (ret < 0)
-               goto out;
-       ret = pcm_write(wng, check_wave());
+       stdin_set_defaults(&sit);
+       if (conf.bufsize_given)
+               sit.bufsize = conf.bufsize_arg;
+       sit.buf = para_malloc(sit.bufsize),
+       register_task(&sit.task);
+
+       cwt.task.pre_select = check_wav_pre_select;
+       cwt.task.private_data = &cwt;
+       cwt.task.event_handler = cwt_event_handler;
+       cwt.buf = sit.buf;
+       cwt.loaded = &sit.loaded;
+       cwt.eof = &sit.eof;
+       sprintf(cwt.task.status, "check wav");
+       register_task(&cwt.task);
+
+       s.default_timeout.tv_sec = 1;
+       s.default_timeout.tv_usec = 0;
+       ret = sched(&s);
+
 out:
-       wng_destroy(wng);
-       free(audiobuf);
-       if (ret < 0)
+       if (ret < 0) {
                PARA_ERROR_LOG("%s\n", PARA_STRERROR(-ret));
+               ret = EXIT_FAILURE;
+       } else
+               ret = EXIT_SUCCESS;
        return ret;
 }
index 4afff0c02b610f1a880ad7218e5f1ce79ccae5ce..02d1a164bb06d031012d86a649dea44f32053abc 100644 (file)
--- a/write.ggo
+++ b/write.ggo
@@ -24,14 +24,6 @@ option "bufsize" b
        default="64"
        optional
 
-option "prebuffer" p
-#~~~~~~~~~~~~~~~~~~~
-"delay playback until buffer is filled"
-
-       int typestr="percent"
-       default="100"
-       optional
-
 option "writer" w
 #~~~~~~~~~~~~~~~~
 
@@ -52,34 +44,3 @@ denotes microseconds since the epoch"
 
        string typestr="timeval"
        optional
-
-
-section "alsa options"
-######################
-
-option "device" d
-#~~~~~~~~~~~~~~~~
-"set PCM device"
-       string typestr="device"
-       default="plughw:0,0"
-       optional
-
-option "channels" c
-#~~~~~~~~~~~~~~~~~~
-"number of channels (only neccessary for raw
-audio)"
-
-       int typestr="num"
-       default="2"
-       optional
-
-option "sample_rate" s
-#~~~~~~~~~~~~~~~~~~~~~
-
-"force given sample rate (only neccessary for
-raw audio)"
-
-       int typestr="num"
-       default="44100"
-       optional
-
diff --git a/write.h b/write.h
index 5f8ab3ab142eff3cbd89189ede339fd4ef413b40..f4c54ca31de240b13056e684986483a05c980bf0 100644 (file)
--- a/write.h
+++ b/write.h
@@ -25,12 +25,16 @@ enum writer_enum {WRITER_ENUM};
  * decbribes one running instance of a writer
  */
 struct writer_node {
-/** points to the writer structure associated with this node */
+       /** points to the writer structure associated with this node */
        struct writer *writer;
-/** writer-specific data */
+       /** writer-specific data */
        void *private_data;
-/** send that many bytes in one go */
+       /** send that many bytes in one go */
        int chunk_bytes;
+       struct task task;
+       struct writer_node_group *wng;
+       /** the writer-specific configuration of this node */
+       void *conf;
 };
 
 /** describes one supported writer */
@@ -43,6 +47,18 @@ struct writer {
  *
  */
 void (*init)(struct writer *w);
+/**
+ *
+ *
+ * the command line parser of the writer
+ *
+ * It should check whether the command line options given by \a options are
+ * valid.  On success, it should return a pointer to the writer-specific
+ * configuration data determined by \a options.  Note that this might be called
+ * more than once with different values of \a options.
+ *
+ */
+       void * (*parse_config)(char *options);
 /**
  *
  * open one instance of this writer
@@ -62,6 +78,8 @@ int (*open)(struct writer_node *);
  *
  */
 int (*write)(char *data, size_t nbytes, struct writer_node *);
+void (*pre_select)(struct sched *s, struct task *t);
+void (*post_select)(struct sched *s, struct task *t);
 /**
  * close one instance of the writer
  *
@@ -81,16 +99,20 @@ void (*shutdown)(struct writer_node *);
  * describes a set of writer nodes that all write the same stream.
  */
 struct writer_node_group {
-/** number of nodes belonging to this group */
-unsigned num_writers;
-/** array of pointers to the corresponding writer nodes */
-struct writer_node *writer_nodes;
-/** keeps track of how many bytes have been written by each node */
-int *written;
-/** the maximum of the chunk_bytes values of the writer nodes in this group */
-size_t max_chunk_bytes;
-/** non-zero if end of file was encountered */
-int eof;
+       /** number of nodes belonging to this group */
+       unsigned num_writers;
+       /** array of pointers to the corresponding writer nodes */
+       struct writer_node *writer_nodes;
+       /** keeps track of how many bytes have been written by each node */
+       int *written;
+       /** the maximum of the chunk_bytes values of the writer nodes in this group */
+       size_t max_chunk_bytes;
+       /** non-zero if end of file was encountered */
+       int *input_eof;
+       int eof;
+       char *buf;
+       size_t *loaded;
+       struct task task;
 };
 
 /** loop over each writer node in a writer group */
index e76d7f46a537a1c00f47dc055506d39e13cf9bc2..cfdabe4821bc6af60a12b99f3c9c5ff0b03d60d5 100644 (file)
 
 #include "para.h"
 #include "string.h"
+#include "list.h"
+#include "sched.h"
 #include "write.h"
 #include "error.h"
 
 const char *writer_names[] ={WRITER_NAMES};
 struct writer writers[NUM_SUPPORTED_WRITERS] = {WRITER_ARRAY};
 
-int wng_write(struct writer_node_group *g, char *buf, size_t *loaded)
+static void wng_post_select(__a_unused struct sched *s, struct task *t)
 {
-       int ret, i, need_more_writes = 1;
+       struct writer_node_group *g = t->private_data;
+       int i;
        size_t min_written = 0;
 
-       while (need_more_writes) {
-               need_more_writes = 0;
-               FOR_EACH_WRITER_NODE(i, g) {
-                       size_t w = g->written[i];
-                       int bytes_to_write;
-                       struct writer_node *wn = &g->writer_nodes[i];
-                       if (!i)
-                               min_written = w;
-                       else
-                               min_written = PARA_MIN(min_written, w);
-                       if (w == *loaded)
-                               continue;
-                       if (!g->eof && (*loaded < wn->chunk_bytes + w))
-                               continue;
-                       bytes_to_write = PARA_MIN(wn->chunk_bytes,
-                               *loaded - w);
-                       ret = wn->writer->write(buf + w, bytes_to_write, wn);
-                       if (ret < 0)
-                               goto out;
-                       if (ret != bytes_to_write)
-                               PARA_WARNING_LOG("short write: %d/%d\n", ret,
-                                       bytes_to_write);
-                       g->written[i] += ret;
-                       need_more_writes = 1;
-               }
+       FOR_EACH_WRITER_NODE(i, g) {
+               struct writer_node *wn = &g->writer_nodes[i];
+               t->ret = wn->task.ret;
+               if (t->ret < 0)
+                       return;
+               if (!i)
+                       min_written = t->ret;
+               else
+                       min_written = PARA_MIN(min_written, t->ret);
        }
-       *loaded -= min_written;
-       ret = 0;
-       if (g->eof)
-               goto out;
-       if (*loaded)
-               memmove(buf, buf + min_written, *loaded);
-       FOR_EACH_WRITER_NODE(i, g)
-               g->written[i] -= min_written;
-       ret = 1;
-out:
-       return ret;
+       *g->loaded -= min_written;
+       if (!*g->loaded && *g->input_eof) {
+               g->eof = 1;
+               t->ret = -E_WNG_EOF;
+       } else
+               t->ret = 1;
+       if (*g->loaded && min_written)
+               memmove(g->buf, g->buf + min_written, *g->loaded);
 }
 
 int wng_open(struct writer_node_group *g)
 {
        int i, ret = 1;
 
+       PARA_NOTICE_LOG("opening wng with %d writer(s)\n", g->num_writers);
        FOR_EACH_WRITER_NODE(i, g) {
                struct writer_node *wn = &g->writer_nodes[i];
                ret = wn->writer->open(wn);
@@ -81,17 +66,38 @@ int wng_open(struct writer_node_group *g)
                        goto out;
                wn->chunk_bytes = ret;
                g->max_chunk_bytes = PARA_MAX(g->max_chunk_bytes, ret);
+               wn->wng = g;
+               PARA_DEBUG_LOG("pre_select: %p\n", &wn->writer->pre_select);
+               PARA_DEBUG_LOG("post_select: %p\n", &wn->writer->post_select);
+               wn->task.pre_select = wn->writer->pre_select;
+               wn->task.post_select = wn->writer->post_select;
+               wn->task.private_data = wn;
+               register_task(&wn->task);
        }
+       sprintf(g->task.status, "%s", "writer node group");
+       g->eof = 0;
+       register_task(&g->task);
 out:
        return ret;
 }
 
+void wng_destroy(struct writer_node_group *g)
+{
+       if (!g)
+               return;
+       free(g->written);
+       free(g->writer_nodes);
+       free(g);
+}
+
 void wng_close(struct writer_node_group *g)
 {
        int i;
 
+       PARA_NOTICE_LOG("closing wng with %d writer(s)\n", g->num_writers);
        FOR_EACH_WRITER_NODE(i, g) {
                struct writer_node *wn = &g->writer_nodes[i];
+               unregister_task(&wn->task);
                wn->writer->close(wn);
        }
 }
@@ -103,18 +109,12 @@ struct writer_node_group *wng_new(unsigned num_writers)
        g->writer_nodes = para_calloc(num_writers
                * sizeof(struct writer_node));
        g->written = para_calloc(num_writers * sizeof(size_t));
+       g->task.private_data = g;
+       g->task.post_select = wng_post_select;
+       g->task.flags = POST_ADD_TAIL;
        return g;
 }
 
-void wng_destroy(struct writer_node_group *g)
-{
-       if (!g)
-               return;
-       free(g->written);
-       free(g->writer_nodes);
-       free(g);
-}
-
 void init_supported_writers(void)
 {
        int i;
@@ -123,22 +123,30 @@ void init_supported_writers(void)
                writers[i].init(&writers[i]);
 }
 
-int check_writer_arg(const char *arg)
+void *check_writer_arg(char *wa, int *writer_num)
 {
-       int i, ret = -E_WRITE_COMMON_SYNTAX;
-       char *a = para_strdup(arg), *p = strchr(a, ':');
-       if (p)
-               *p = '\0';
-       p++;
+       int i;
+
+       *writer_num = -E_WRITE_COMMON_SYNTAX;
+       PARA_INFO_LOG("checking  %s\n", wa);
        FOR_EACH_WRITER(i) {
-               if (strcmp(writer_names[i], a))
+               const char *name = writer_names[i];
+               size_t len = strlen(name);
+               char c;
+               if (strlen(wa) < len)
+                       continue;
+               if (strncmp(name, wa, len))
                        continue;
-               ret = i;
-               goto out;
+               c = wa[len];
+               if (c && c != ' ')
+                       continue;
+               if (c && !writers[i].parse_config)
+                       return NULL;
+               *writer_num = i;
+               return writers[i].parse_config(c? wa + len + 1 : "");
        }
-out:
-       free(a);
-       return ret;
+       PARA_ERROR_LOG("%s", "writer not found\n");
+       return NULL;
 }
 
 struct writer_node_group *setup_default_wng(void)
@@ -151,7 +159,10 @@ struct writer_node_group *setup_default_wng(void)
        else
                default_writer = 1;
        wng->writer_nodes[0].writer = &writers[default_writer];
-       PARA_INFO_LOG("using default writer: %s\n",
+       sprintf(wng->writer_nodes[0].task.status, "%s",
                writer_names[default_writer]);
+       PARA_INFO_LOG("using default writer: %s %p\n",
+               writer_names[default_writer], writers[default_writer].parse_config);
+       wng->writer_nodes[0].conf = writers[default_writer].parse_config("");
        return wng;
 }
index 0eb2ae715652af243dc3f02b200d3e27012fd3b7..d9a8b59095f278c76b82e520ec4f72ff51c14010 100644 (file)
@@ -24,5 +24,5 @@ void wng_close(struct writer_node_group *g);
 struct writer_node_group *wng_new(unsigned num_writers);
 void wng_destroy(struct writer_node_group *g);
 void init_supported_writers(void);
-int check_writer_arg(const char *arg);
+void *check_writer_arg(char *wa, int *writer_num);
 struct writer_node_group *setup_default_wng(void);