From: Andre Date: Mon, 29 May 2006 02:18:16 +0000 (+0200) Subject: Merge branch 'sched' X-Git-Tag: v0.2.14~101 X-Git-Url: http://git.tuebingen.mpg.de/?p=paraslash.git;a=commitdiff_plain;h=2a8029478dfc65f0c935d864faf4aea9e3deb27d;hp=535b4586ea46b370b41e25daaecabff165f49722 Merge branch 'sched' --- diff --git a/Makefile.in b/Makefile.in index c9eb46fd..12210396 100644 --- a/Makefile.in +++ b/Makefile.in @@ -133,6 +133,12 @@ grab_client.cmdline.h grab_client.cmdline.c: grab_client.ggo --arg-struct-name=$(subst .ggo,,$<)_args_info \ --file-name=$(subst .ggo,,$<).cmdline \ --func-name $(subst _filter.ggo,,$<)_cmdline_parser < $< +%_write.cmdline.h %_write.cmdline.c: %_write.ggo + gengetopt -S $(module_ggo_opts) \ + --set-package=$(subst .ggo,,$<) \ + --arg-struct-name=$(subst .ggo,,$<)_args_info \ + --file-name=$(subst .ggo,,$<).cmdline \ + --func-name $(subst _write.ggo,,$<)_cmdline_parser < $< %.cmdline.h %.cmdline.c: %.ggo case $< in client.ggo) O="--unamed-opts=command";; \ diff --git a/aac_afh.c b/aac_afh.c index fdb37a7e..e6f99b4f 100644 --- a/aac_afh.c +++ b/aac_afh.c @@ -192,12 +192,11 @@ static int aac_get_file_info(FILE *file, char *info_str, long unsigned *frames, } /* - * Simple stream reposition routine + * nothing to do as we'll seek to the correct offset in aac read_chunk() anyway */ -static int aac_reposition_stream(long unsigned request) +static int aac_reposition_stream(__a_unused long unsigned request) { return 1; -// return -E_AAC_REPOS; } static char *aac_read_chunk(long unsigned current_chunk, ssize_t *len) diff --git a/aacdec.c b/aacdec.c index c36f6877..fe046b70 100644 --- a/aacdec.c +++ b/aacdec.c @@ -25,6 +25,7 @@ #include "para.h" #include "list.h" +#include "sched.h" #include "filter.h" #include "error.h" #include "string.h" @@ -52,7 +53,7 @@ struct private_aacdec_data { static ssize_t aacdec(char *input_buffer, size_t len, struct filter_node *fn) { struct private_aacdec_data *padd = fn->private_data; - struct filter_chain_info *fci = fn->fci; + struct filter_chain *fc = fn->fc; int i, ret; unsigned char *p, *outbuffer; unsigned char *inbuf = (unsigned char*)input_buffer; @@ -60,7 +61,7 @@ static ssize_t aacdec(char *input_buffer, size_t len, struct filter_node *fn) if (fn->loaded > fn->bufsize * 4 / 5) return 0; - if (len < 1000 && !*fci->eof) + if (len < 1000 && !*fc->input_eof) return 0; if (!padd->initialized) { @@ -86,10 +87,10 @@ static ssize_t aacdec(char *input_buffer, size_t len, struct filter_node *fn) &channels) < 0) goto out; } - fci->samplerate = rate; - fci->channels = channels; + fc->samplerate = rate; + fc->channels = channels; PARA_INFO_LOG("rate: %u, channels: %d\n", - fci->samplerate, fci->channels); + fc->samplerate, fc->channels); padd->initialized = 1; } if (padd->decoder_length > 0) { diff --git a/alsa_write.ggo b/alsa_write.ggo new file mode 100644 index 00000000..4863a53a --- /dev/null +++ b/alsa_write.ggo @@ -0,0 +1,28 @@ +section "alsa options" +###################### + +option "device" d +#~~~~~~~~~~~~~~~~ +"set PCM device" + string typestr="device" + default="plughw:0,0" + optional + +option "channels" c +#~~~~~~~~~~~~~~~~~~ +"number of channels (only neccessary for raw +audio)" + + int typestr="num" + default="2" + optional + +option "sample_rate" s +#~~~~~~~~~~~~~~~~~~~~~ + +"force given sample rate (only neccessary for +raw audio)" + + int typestr="num" + default="44100" + optional diff --git a/alsa_writer.c b/alsa_writer.c index 5aa90851..7c6e8ca7 100644 --- a/alsa_writer.c +++ b/alsa_writer.c @@ -27,23 +27,28 @@ #include "para.h" #include "fd.h" #include "string.h" +#include "list.h" +#include "sched.h" #include "write.h" #include -#include "write.cmdline.h" +#include "alsa_write.cmdline.h" #include "error.h" -extern struct gengetopt_args_info conf; #define FORMAT SND_PCM_FORMAT_S16_LE /** data specific to the alsa writer */ struct private_alsa_data { -/** the alsa handle */ -snd_pcm_t *handle; -/** determined and set by alsa_open() */ -size_t bytes_per_frame; + /** the alsa handle */ + snd_pcm_t *handle; + /** determined and set by alsa_open() */ + size_t bytes_per_frame; + /** don't write anything until this time */ + struct timeval next_chunk; + /** the return value of snd_pcm_hw_params_get_buffer_time_max() */ + unsigned buffer_time; }; /* @@ -57,18 +62,15 @@ static int alsa_open(struct writer_node *w) snd_pcm_sw_params_t *swparams; snd_pcm_uframes_t buffer_size, xfer_align, start_threshold, stop_threshold; - unsigned buffer_time = 0; int err; snd_pcm_info_t *info; - snd_output_t *log; snd_pcm_uframes_t period_size; - struct private_alsa_data *pad = para_malloc(sizeof(struct private_alsa_data)); - w->private_data = pad; + struct private_alsa_data *pad = para_calloc(sizeof(struct private_alsa_data)); + struct alsa_write_args_info *conf = w->conf; + w->private_data = pad; snd_pcm_info_alloca(&info); - if (snd_output_stdio_attach(&log, stderr, 0) < 0) - return -E_ALSA_LOG; - err = snd_pcm_open(&pad->handle, conf.device_arg, + err = snd_pcm_open(&pad->handle, conf->device_arg, SND_PCM_STREAM_PLAYBACK, 0); if (err < 0) return -E_PCM_OPEN; @@ -85,23 +87,23 @@ static int alsa_open(struct writer_node *w) if (snd_pcm_hw_params_set_format(pad->handle, hwparams, FORMAT) < 0) return -E_SAMPLE_FORMAT; if (snd_pcm_hw_params_set_channels(pad->handle, hwparams, - conf.channels_arg) < 0) + conf->channels_arg) < 0) return -E_CHANNEL_COUNT; if (snd_pcm_hw_params_set_rate_near(pad->handle, hwparams, - (unsigned int*) &conf.sample_rate_arg, 0) < 0) + (unsigned int*) &conf->sample_rate_arg, 0) < 0) return -E_SET_RATE; - err = snd_pcm_hw_params_get_buffer_time_max(hwparams, &buffer_time, 0); - if (err < 0 || !buffer_time) + err = snd_pcm_hw_params_get_buffer_time_max(hwparams, &pad->buffer_time, 0); + if (err < 0 || !pad->buffer_time) return -E_GET_BUFFER_TIME; - PARA_DEBUG_LOG("buffer time: %d\n", buffer_time); + PARA_INFO_LOG("buffer time: %d\n", pad->buffer_time); if (snd_pcm_hw_params_set_buffer_time_near(pad->handle, hwparams, - &buffer_time, 0) < 0) + &pad->buffer_time, 0) < 0) return -E_SET_BUFFER_TIME; if (snd_pcm_hw_params(pad->handle, hwparams) < 0) return -E_HW_PARAMS; snd_pcm_hw_params_get_period_size(hwparams, &period_size, 0); snd_pcm_hw_params_get_buffer_size(hwparams, &buffer_size); - PARA_DEBUG_LOG("buffer size: %lu, period_size: %lu\n", buffer_size, + PARA_INFO_LOG("buffer size: %lu, period_size: %lu\n", buffer_size, period_size); if (period_size == buffer_size) return -E_BAD_PERIOD; @@ -127,44 +129,75 @@ static int alsa_open(struct writer_node *w) if (snd_pcm_sw_params(pad->handle, swparams) < 0) return -E_SW_PARAMS; pad->bytes_per_frame = snd_pcm_format_physical_width(FORMAT) - * conf.channels_arg / 8; -// if (snd_pcm_nonblock(pad->handle, 1)) -// PARA_ERROR_LOG("%s\n", "failed to set nonblock mode"); + * conf->channels_arg / 8; + if (snd_pcm_nonblock(pad->handle, 1)) + PARA_ERROR_LOG("%s\n", "failed to set nonblock mode"); return period_size * pad->bytes_per_frame; } +static void alsa_write_pre_select(struct sched *s, struct task *t) +{ + struct writer_node *wn = t->private_data; + struct private_alsa_data *pad = wn->private_data; + struct writer_node_group *wng = wn->wng; + struct timeval diff; -/** - * push out pcm frames - * \param data pointer do data to be written - * \param nbytes number of bytes (not frames) - * - * \return Number of bytes written, -E_ALSA_WRITE on errors. - */ -static int alsa_write(char *data, size_t nbytes, struct writer_node *wn) + t->ret = 0; + if (*wng->input_eof && *wng->loaded < pad->bytes_per_frame) + return; + t->ret = 1; + if (*wng->loaded < pad->bytes_per_frame) + return; + if (tv_diff(&s->now, &pad->next_chunk, &diff) < 0) { + if (tv_diff(&s->timeout, &diff, NULL) > 0) + s->timeout = diff; + } else { + s->timeout.tv_sec = 0; + s->timeout.tv_usec = 0; + } +} + +static void alsa_write_post_select(struct sched *s, struct task *t) { + struct writer_node *wn = t->private_data; struct private_alsa_data *pad = wn->private_data; - size_t frames = nbytes / pad->bytes_per_frame; - unsigned char *d = (unsigned char*)data; - snd_pcm_sframes_t r, result = 0; - - while (frames > 0) { - /* write interleaved frames */ - r = snd_pcm_writei(pad->handle, d, frames); - if (r < 0) - PARA_ERROR_LOG("write error: %s\n", snd_strerror(r)); - if (r == -EAGAIN || (r >= 0 && r < frames)) - snd_pcm_wait(pad->handle, 1); - else if (r == -EPIPE) + struct writer_node_group *wng = wn->wng; + size_t frames = *wng->loaded / pad->bytes_per_frame; + snd_pcm_sframes_t ret, result = 0; + unsigned char *data = (unsigned char*)wng->buf; + + t->ret = 0; + if (!frames) { + if (*wng->input_eof) + t->ret = *wng->loaded; + return; + } + if (tv_diff(&s->now, &pad->next_chunk, NULL) < 0) + return; +// PARA_INFO_LOG("%zd frames\n", frames); +// while (frames > 0) { + ret = snd_pcm_writei(pad->handle, data, frames); + if (ret == -EAGAIN || (ret >= 0 && ret < frames)) { + struct timeval tv; + ms2tv(pad->buffer_time / 2000, &tv); + PARA_DEBUG_LOG("EAGAIN. frames: %d, ret: %lu\n", frames, ret); + tv_add(&s->now, &tv, &pad->next_chunk); + } else if (ret == -EPIPE) { + PARA_WARNING_LOG("%s", "EPIPE\n"); snd_pcm_prepare(pad->handle); - else if (r < 0) - return -E_ALSA_WRITE; - if (r > 0) { - result += r; - frames -= r; - d += r * pad->bytes_per_frame; + } else if (ret < 0) { + t->ret = -E_ALSA_WRITE; + return; } - } - return result * pad->bytes_per_frame; + if (ret >= 0) { + result += ret; + frames -= ret; + data += ret * pad->bytes_per_frame; + } +// if (ret == -EAGAIN) +// break; +// } + t->ret = result * pad->bytes_per_frame; +// PARA_INFO_LOG("ret: %d, frames: %zd\n", t->ret, frames); } static void alsa_close(struct writer_node *wn) @@ -176,11 +209,28 @@ static void alsa_close(struct writer_node *wn) free(pad); } +__malloc void *alsa_parse_config(char *options) +{ + struct alsa_write_args_info *conf + = para_calloc(sizeof(struct alsa_write_args_info)); + PARA_INFO_LOG("options: %s, %d\n", options, strcspn(options, " \t")); + int ret = alsa_cmdline_parser_string(options, conf, "alsa_write"); + if (ret) + goto err_out; + PARA_INFO_LOG("help given: %d\n", conf->help_given); + return conf; +err_out: + free(conf); + return NULL; +} + /** the init function of the alsa writer */ void alsa_writer_init(struct writer *w) { w->open = alsa_open; - w->write = alsa_write; w->close = alsa_close; + w->pre_select = alsa_write_pre_select; + w->post_select = alsa_write_post_select; + w->parse_config = alsa_parse_config; w->shutdown = NULL; /* nothing to do */ } diff --git a/audiod.c b/audiod.c index e1551617..d635fbfc 100644 --- a/audiod.c +++ b/audiod.c @@ -18,12 +18,12 @@ /** \file audiod.c the paraslash's audio daemon */ -#include /* gettimeofday */ #include "para.h" #include "audiod.cmdline.h" #include "list.h" #include "close_on_fork.h" +#include "sched.h" #include "recv.h" #include "filter.h" #include "grab_client.cmdline.h" @@ -35,6 +35,8 @@ #include "daemon.h" #include "string.h" #include "fd.h" +#include "write.h" +#include "write_common.h" /** define the array of error lists needed by para_audiod */ INIT_AUDIOD_ERRLISTS; @@ -52,92 +54,125 @@ enum {AUDIOD_OFF, AUDIOD_ON, AUDIOD_STANDBY}; /** defines how to handle one supported audio format */ struct audio_format_info { -/** pointer to the receiver for this audio format */ + /** pointer to the receiver for this audio format */ struct receiver *receiver; -/** the receiver configuration */ + /** the receiver configuration */ void *receiver_conf; -/** the number of filters that should be activated for this audio format */ + /** the number of filters that should be activated for this audio format */ unsigned int num_filters; -/** pointer to the array of filters to be activated */ + /** pointer to the array of filters to be activated */ struct filter **filters; -/** pointer to the array of filter configurations */ + /** pointer to the array of filter configurations */ void **filter_conf; -/** output of the last filter is written to stdin of this command */ - char *write_cmd; -/** do not start receiver/filters/writer before this time */ + /** the number of filters that should be activated for this audio format */ + unsigned int num_writers; + /** pointer to the array of writers to be activated */ + struct writer **writers; + /** pointer to the array of writer configurations */ + void **writer_conf; + /** do not start receiver/filters/writer before this time */ struct timeval restart_barrier; }; /** * describes one instance of a receiver-filter-writer chain * - * \sa receier_node, receiver, filter, filter_node, filter_chain_info + * \sa receier_node, receiver, filter, filter_node, filter_chain, writer, + * writer_node, writer_node_group. */ struct slot_info { -/** number of the audio format in this slot */ + /** number of the audio format in this slot */ int format; -/** the file descriptor of the writer */ - int write_fd; -/** the process id of the writer */ - pid_t wpid; -/** time of the last successful read from the receiver */ - struct timeval rtime; -/** time the last write to the write fd happend */ - struct timeval wtime; -/** writer start time */ + /** writer start time */ struct timeval wstime; -/** did we include \a write_fd in the fdset */ - int wcheck; -/** set to one if we have sent the TERM signal to \a wpid */ - int wkilled; -/** the receiver info associated with this slot */ + /** the receiver info associated with this slot */ struct receiver_node *receiver_node; -/** the active filter chain */ - struct filter_chain_info *fci; + /** the active filter chain */ + struct filter_chain *fc; + /** the active writer node group */ + struct writer_node_group *wng; }; - static struct slot_info slot[MAX_STREAM_SLOTS]; -/** defines one command of para_audiod */ -struct audiod_command { -/** the name of the command */ -const char *name; -/** pointer to the function that handles the command */ -int (*handler)(int, int, char**); -int (*line_handler)(int, char*); -/** one-line description of the command */ -const char *description; -/** summary of the command line options */ -const char *synopsis; -/** the long help text */ -const char *help; -}; - extern const char *status_item_list[NUM_STAT_ITEMS]; -static int com_grab(int, char *); -static int com_cycle(int, int, char **); -static int com_help(int, int, char **); -static int com_off(int, int, char **); -static int com_on(int, int, char **); -static int com_sb(int, int, char **); -static int com_stat(int, int, char **); -static int com_term(int, int, char **); -static int stat_pipe = -1, signal_pipe; - static struct gengetopt_args_info conf; static struct timeval server_stream_start, sa_time_diff; static int playing, current_decoder = -1, audiod_status = AUDIOD_ON, offset_seconds, length_seconds, - sa_time_diff_sign = 1, audiod_socket = -1; + sa_time_diff_sign = 1; static char *af_status, /* the audio format announced in server status */ *socket_name, *hostname; static char *stat_item_values[NUM_STAT_ITEMS]; static FILE *logfile; static const struct timeval restart_delay = {0, 300 * 1000}; - static struct audio_format_info afi[NUM_AUDIO_FORMATS]; +static struct timeval *now; + +static struct signal_task signal_task_struct, *sig_task = &signal_task_struct; + +/** + * the task for handling audiod commands + * + * \sa struct task, struct sched + */ +struct command_task { + /** the local listening socket */ + int fd; + /** the associated task structure */ + struct task task; +}; + +/** + * the task for audiod's child (para_client stat) + * + * \sa struct task, struct sched + */ +struct status_task { + /** the output of the stat command is read from this fd */ + int fd; + /** stat data is stored here */ + char buf[STRINGSIZE]; + /** number of bytes loaded in \a buf */ + unsigned loaded; + /** the associated task structure */ + struct task task; +}; +static struct status_task status_task_struct, *stat_task = &status_task_struct; + +struct signal_task { + int fd; + int signum; + struct task task; +}; +/** defines one command of para_audiod */ +struct audiod_command { + /** the name of the command */ + const char *name; + /** pointer to the function that handles the command */ + int (*handler)(int, int, char**); + /** + * if the command prefers to handle the full line (rather than the usual + * argv[] array), it stores a pointer to the corresponding line handling + * function here. In this case, the above \a handler pointer must be NULL. + */ + int (*line_handler)(int, char*); + /** one-line description of the command */ + const char *description; + /** summary of the command line options */ + const char *synopsis; + /** the long help text */ + const char *help; +}; +static int com_grab(int, char *); +static int com_cycle(int, int, char **); +static int com_help(int, int, char **); +static int com_off(int, int, char **); +static int com_on(int, int, char **); +static int com_sb(int, int, char **); +static int com_stat(int, int, char **); +static int com_term(int, int, char **); static struct audiod_command cmds[] = { { .name = "cycle", @@ -234,7 +269,7 @@ static struct audiod_command cmds[] = { }; /** iterate over all slots */ -#define FOR_EACH_SLOT(slot) for (slot = 0; slot < MAX_STREAM_SLOTS; slot++) +#define FOR_EACH_SLOT(_slot) for (_slot = 0; _slot < MAX_STREAM_SLOTS; _slot++) /** iterate over all supported audio formats */ #define FOR_EACH_AUDIO_FORMAT(af) for (af = 0; af < NUM_AUDIO_FORMATS; af++) /** iterate over the array of all audiod commands */ @@ -289,7 +324,7 @@ static int client_write(int fd, const char *buf) static char *get_time_string(struct timeval *newest_stime) { - struct timeval now, diff, adj_stream_start, tmp; + struct timeval diff, adj_stream_start, tmp; int total = 0, use_server_time = 1; if (!playing) { @@ -313,8 +348,7 @@ static char *get_time_string(struct timeval *newest_stime) use_server_time = 0; } } - gettimeofday(&now, NULL); - tv_diff(&now, &tmp, &diff); + tv_diff(now, &tmp, &diff); total = diff.tv_sec + offset_seconds; if (total > length_seconds) total = length_seconds; @@ -349,7 +383,7 @@ static struct timeval *wstime(void) struct timeval *max = NULL; FOR_EACH_SLOT(i) { struct slot_info *s = &slot[i]; - if (s->wpid <= 0) + if (!s->wng) continue; if (max && tv_diff(&s->wstime, max, NULL) <= 0) continue; @@ -367,7 +401,7 @@ __malloc static char *decoder_flags(void) char flag = '0'; if (s->receiver_node) flag += 1; - if (s->wpid > 0) + if (s->wng) flag += 2; decoder_flags[i] = flag; } @@ -390,8 +424,8 @@ static char *configfile_exists(void) static void setup_signal_handling(void) { - signal_pipe = para_signal_init(); - PARA_INFO_LOG("signal pipe: fd %d\n", signal_pipe); + sig_task->fd = para_signal_init(); + PARA_INFO_LOG("signal pipe: fd %d\n", sig_task->fd); para_install_sighandler(SIGINT); para_install_sighandler(SIGTERM); para_install_sighandler(SIGCHLD); @@ -440,30 +474,6 @@ static void clear_slot(int slot_num) s->format = -1; } -static void kill_stream_writer(int slot_num) -{ - struct slot_info *s = &slot[slot_num]; - - if (s->format < 0 || s->wkilled || s->wpid <= 0) - return; - PARA_DEBUG_LOG("kill -TERM %d (%s stream writer in slot %d)\n", - s->wpid, audio_formats[s->format], slot_num); - kill(s->wpid, SIGTERM); - s->wkilled = 1; - s->fci->error = 1; -} - -static void set_restart_barrier(int format, struct timeval *now) -{ - struct timeval tmp; - - if (now) - tmp = *now; - else - gettimeofday(&tmp, NULL); - tv_add(&tmp, &restart_delay, &afi[format].restart_barrier); -} - static void close_receiver(int slot_num) { struct slot_info *s = &slot[slot_num]; @@ -472,53 +482,26 @@ static void close_receiver(int slot_num) if (s->format < 0 || !s->receiver_node) return; a = &afi[s->format]; - PARA_NOTICE_LOG("closing %s recevier in slot %d\n", - audio_formats[s->format] , slot_num); + PARA_NOTICE_LOG("closing %s receiver in slot %d (eof = %d)\n", + audio_formats[s->format] , slot_num, s->receiver_node->eof); + if (!s->receiver_node->eof) + unregister_task(&s->receiver_node->task); a->receiver->close(s->receiver_node); free(s->receiver_node); s->receiver_node = NULL; - set_restart_barrier(s->format, NULL); + /* set restart barrier */ + tv_add(now, &restart_delay, &afi[s->format].restart_barrier); } static void kill_all_decoders(void) { int i; - FOR_EACH_SLOT(i) - if (slot[i].format >= 0) { - PARA_INFO_LOG("stopping decoder in slot %d\n", i); - kill_stream_writer(i); - } -} - -static void check_sigchld(void) -{ - pid_t pid; - int i; - struct timeval now; - gettimeofday(&now, NULL); - -reap_next_child: - pid = para_reap_child(); - if (pid <= 0) - return; FOR_EACH_SLOT(i) { struct slot_info *s = &slot[i]; - long lifetime; - if (s->format < 0) - continue; - if (pid == s->wpid) { - s->wpid = -1; - lifetime = now.tv_sec - s->wstime.tv_sec; - PARA_INFO_LOG("%s stream writer in slot %d died " - "after %li secs\n", - audio_formats[s->format], i, lifetime); - set_restart_barrier(s->format, &now); - goto reap_next_child; - } + if (s->receiver_node) + s->receiver_node->eof = 1; } - PARA_CRIT_LOG("para_client died (pid %d)\n", pid); - goto reap_next_child; } static int get_empty_slot(void) @@ -532,11 +515,7 @@ static int get_empty_slot(void) clear_slot(i); return i; } - if (s->write_fd > 0 || s->wpid > 0) - continue; - if (s->receiver_node) - continue; - if (s->fci) + if (s->wng || s->receiver_node || s->fc) continue; clear_slot(i); return i; @@ -553,7 +532,7 @@ static int decoder_running(int format) s = &slot[i]; if (s->format == format && s->receiver_node) ret |= 1; - if (s->format == format && s->wpid > 0) + if (s->format == format && s->wng) ret |= 2; } return ret; @@ -563,12 +542,12 @@ static void close_stat_pipe(void) { int i; - if (stat_pipe < 0) + if (stat_task->fd < 0) return; PARA_NOTICE_LOG("%s", "closing status pipe\n"); - close(stat_pipe); - del_close_on_fork_list(stat_pipe); - stat_pipe = -1; + close(stat_task->fd); + del_close_on_fork_list(stat_task->fd); + stat_task->fd = -1; kill_all_decoders(); for (i = 0; i < NUM_STAT_ITEMS; i++) { free(stat_item_values[i]); @@ -579,7 +558,8 @@ static void close_stat_pipe(void) offset_seconds = 0; audiod_status_dump(); playing = 0; - stat_item_values[SI_STATUS_BAR] = make_message("%s:no connection to para_server\n", + stat_item_values[SI_STATUS_BAR] = make_message( + "%s:no connection to para_server\n", status_item_list[SI_STATUS_BAR]); stat_client_write(stat_item_values[SI_STATUS_BAR], SI_STATUS_BAR); } @@ -587,43 +567,31 @@ static void close_stat_pipe(void) static void __noreturn clean_exit(int status, const char *msg) { PARA_EMERG_LOG("%s\n", msg); - kill_all_decoders(); if (socket_name) unlink(socket_name); - if (stat_pipe >= 0) + if (stat_task->fd >= 0) close_stat_pipe(); exit(status); } -__malloc static char *glob_cmd(char *cmd) +/** + * get the number of filters + * + * \param audio_format_num the number identifying the audio format + * + * \return the number of filters for the given audio format + * + * \sa struct filter; + */ +int num_filters(int audio_format_num) { - char *ret, *replacement; - struct timeval tmp, delay, rss; /* real stream start */ - - delay.tv_sec = conf.stream_delay_arg / 1000; - delay.tv_usec = (conf.stream_delay_arg % 1000) * 1000; -// PARA_INFO_LOG("delay: %lu:%lu\n", delay.tv_sec, delay.tv_usec); - if (sa_time_diff_sign < 0) - tv_add(&server_stream_start, &sa_time_diff, &rss); - else - tv_diff(&server_stream_start, &sa_time_diff, &rss); - tv_add(&rss, &delay, &tmp); - replacement = make_message("%lu:%lu", - (long unsigned)tmp.tv_sec, - (long unsigned)tmp.tv_usec); - ret = s_a_r(cmd, "STREAM_START", replacement); - free(replacement); - if (!ret) - goto out; - PARA_INFO_LOG("cmd: %s, repl: %s\n", cmd, ret); -out: - return ret; + return afi[audio_format_num].num_filters; } -/** get the number of filters for the given audio format */ -int num_filters(int audio_format_num) +static void filter_event_handler(struct task *t) { - return afi[audio_format_num].num_filters; + PARA_NOTICE_LOG("%s\n", PARA_STRERROR(-t->ret)); + unregister_task(t); } static void open_filters(int slot_num) @@ -633,31 +601,37 @@ static void open_filters(int slot_num) int nf = a->num_filters; int i; - s->fci = para_calloc(sizeof(struct filter_chain_info)); - INIT_LIST_HEAD(&s->fci->filters); + s->fc = NULL; if (!nf) return; - s->fci->inbuf = s->receiver_node->buf; - s->fci->in_loaded = &s->receiver_node->loaded; - s->fci->outbuf = s->receiver_node->buf; - s->fci->out_loaded = &s->receiver_node->loaded; - s->fci->eof = &s->receiver_node->eof; + PARA_INFO_LOG("opening %s filters\n", audio_formats[s->format]); + s->fc = para_calloc(sizeof(struct filter_chain)); + INIT_LIST_HEAD(&s->fc->filters); + s->fc->inbuf = s->receiver_node->buf; + s->fc->in_loaded = &s->receiver_node->loaded; + s->fc->input_eof = &s->receiver_node->eof; + + s->fc->task.pre_select = filter_pre_select; + s->fc->task.event_handler = filter_event_handler; + s->fc->task.private_data = s->fc; + s->fc->task.flags = 0; + s->fc->eof = 0; + sprintf(s->fc->task.status, "filter chain"); for (i = 0; i < nf; i++) { struct filter_node *fn = para_calloc(sizeof(struct filter_node)); fn->conf = a->filter_conf[i]; - fn->fci = s->fci; + fn->fc = s->fc; fn->filter = a->filters[i]; INIT_LIST_HEAD(&fn->callbacks); - list_add_tail(&fn->node, &s->fci->filters); + list_add_tail(&fn->node, &s->fc->filters); fn->filter->open(fn); PARA_NOTICE_LOG("%s filter %d/%d (%s) started in slot %d\n", audio_formats[s->format], i + 1, nf, fn->filter->name, slot_num); - s->fci->outbuf = fn->buf; - s->fci->out_loaded = &fn->loaded; + s->fc->outbuf = fn->buf; + s->fc->out_loaded = &fn->loaded; } - PARA_DEBUG_LOG("output buffer for filter chain %p: %p\n", s->fci, - s->fci->outbuf); + register_task(&s->fc->task); } static struct filter_node *find_filter_node(int slot_num, int format, int filternum) @@ -667,7 +641,7 @@ static struct filter_node *find_filter_node(int slot_num, int format, int filter FOR_EACH_SLOT(i) { struct slot_info *s = &slot[i]; - if (s->format < 0 || !s->fci) + if (s->format < 0 || !s->fc) continue; if (slot_num >= 0 && slot_num != i) continue; @@ -677,7 +651,7 @@ static struct filter_node *find_filter_node(int slot_num, int format, int filter continue; /* success */ j = 1; - list_for_each_entry(fn, &s->fci->filters, node) + list_for_each_entry(fn, &s->fc->filters, node) if (filternum <= 0 || j++ == filternum) break; return fn; @@ -685,32 +659,50 @@ static struct filter_node *find_filter_node(int slot_num, int format, int filter return NULL; } -static void start_stream_writer(int slot_num) +static void wng_event_handler(struct task *t) { - int ret, fds[3] = {1, -1, -1}; + PARA_INFO_LOG("%s\n", PARA_STRERROR(-t->ret)); + unregister_task(t); +} + +static void open_writers(int slot_num) +{ + int ret, i; struct slot_info *s = &slot[slot_num]; struct audio_format_info *a = &afi[s->format]; - char *glob = NULL; - - if (a->write_cmd) - glob = glob_cmd(a->write_cmd); - if (!glob) - glob = para_strdup("para_write -w alsa"); - PARA_INFO_LOG("starting stream writer: %s\n", glob); - open_filters(slot_num); - ret = para_exec_cmdline_pid(&s->wpid, glob, fds); - free(glob); - if (ret < 0) { - PARA_ERROR_LOG("exec failed (%d)\n", ret); - return; + + PARA_INFO_LOG("opening %s writers\n", audio_formats[s->format]); + if (!a->num_writers) + s->wng = setup_default_wng(); + else + s->wng = wng_new(a->num_writers); + if (s->fc) { + s->wng->buf = s->fc->outbuf; + s->wng->loaded = s->fc->out_loaded; + s->wng->input_eof = &s->fc->eof; + s->fc->output_eof = &s->wng->eof; + } else { + s->wng->buf = s->receiver_node->buf; + s->wng->loaded = &s->receiver_node->loaded; + s->wng->input_eof = &s->receiver_node->eof; + } + s->wng->task.event_handler = wng_event_handler; + for (i = 0; i < a->num_writers; i++) { + s->wng->writer_nodes[i].conf = a->writer_conf[i]; + s->wng->writer_nodes[i].writer = a->writers[i]; + sprintf(s->wng->writer_nodes[i].task.status, "writer_node"); } - s->write_fd = fds[0]; - add_close_on_fork_list(s->write_fd); - /* we write to this fd in do_select, so we need non-blocking */ - mark_fd_nonblock(s->write_fd); - gettimeofday(&s->wstime, NULL); + ret = wng_open(s->wng); + s->wstime = *now; current_decoder = slot_num; - activate_inactive_grab_clients(slot_num, s->format, &s->fci->filters); + activate_inactive_grab_clients(slot_num, s->format, &s->fc->filters); +} + +static void rn_event_handler(struct task *t) +{ +// struct receiver_node *rn = t->private_data; + PARA_NOTICE_LOG("%s\n", PARA_STRERROR(-t->ret)); + unregister_task(t); } static void open_receiver(int format) @@ -718,16 +710,17 @@ static void open_receiver(int format) struct audio_format_info *a = &afi[format]; struct slot_info *s; int ret, slot_num; + struct receiver_node *rn; slot_num = get_empty_slot(); if (slot_num < 0) clean_exit(EXIT_FAILURE, PARA_STRERROR(-slot_num)); s = &slot[slot_num]; s->format = format; - gettimeofday(&s->rtime, NULL); - s->wtime = s->rtime; s->receiver_node = para_calloc(sizeof(struct receiver_node)); - s->receiver_node->conf = a->receiver_conf; + rn = s->receiver_node; + rn->receiver = a->receiver; + rn->conf = a->receiver_conf; ret = a->receiver->open(s->receiver_node); if (ret < 0) { PARA_ERROR_LOG("failed to open receiver (%s)\n", @@ -738,18 +731,23 @@ static void open_receiver(int format) } PARA_NOTICE_LOG("started %s: %s receiver in slot %d\n", audio_formats[s->format], a->receiver->name, slot_num); + rn->task.private_data = s->receiver_node; + rn->task.pre_select = a->receiver->pre_select; + rn->task.post_select = a->receiver->post_select; + rn->task.event_handler = rn_event_handler; + rn->task.flags = 0; + sprintf(rn->task.status, "receiver node"); + register_task(&rn->task); } static int is_frozen(int format) { - struct timeval now; struct audio_format_info *a = &afi[format]; - gettimeofday(&now, NULL); - return (tv_diff(&now, &a->restart_barrier, NULL) > 0)? 0 : 1; + return (tv_diff(now, &a->restart_barrier, NULL) > 0)? 0 : 1; } -static void start_current_receiver(void) +static void open_current_receiver(void) { int i; @@ -758,21 +756,20 @@ static void start_current_receiver(void) i = get_audio_format_num(af_status); if (i < 0) return; - if ((decoder_running(i) & 1) || is_frozen(i)) + if (decoder_running(i) || is_frozen(i)) return; open_receiver(i); } static void compute_time_diff(const struct timeval *status_time) { - struct timeval now, tmp, diff; + struct timeval tmp, diff; static int count; int sign; const struct timeval max_deviation = {0, 500 * 1000}; const int time_smooth = 5; - gettimeofday(&now, NULL); - sign = tv_diff(status_time, &now, &diff); + sign = tv_diff(status_time, now, &diff); // PARA_NOTICE_LOG("%s: sign = %i, sa_time_diff_sign = %i\n", __func__, // sign, sa_time_diff_sign); if (!count) { @@ -855,7 +852,12 @@ static void handle_signal(int sig) { switch (sig) { case SIGCHLD: - return check_sigchld(); + for (;;) { + pid_t pid = para_reap_child(); + if (pid <= 0) + return; + PARA_CRIT_LOG("para_client died (pid %d)\n", pid); + } case SIGINT: case SIGTERM: case SIGHUP: @@ -865,179 +867,64 @@ static void handle_signal(int sig) } } -static void check_timeouts(void) -{ - struct timeval now; - int slot_num, timeout = conf.stream_timeout_arg; - - gettimeofday(&now, NULL); - FOR_EACH_SLOT(slot_num) { - struct slot_info *s = &slot[slot_num]; - if (s->format < 0) - continue; - /* check read time */ - if (s->receiver_node && - now.tv_sec > s->rtime.tv_sec + timeout) { - PARA_INFO_LOG("%s input buffer (slot %d) not ready\n", - audio_formats[s->format], slot_num); - if (s->fci) - s->fci->error = 42; - else - close_receiver(slot_num); - } - /* check write time */ - if (s->wpid > 0 && !s->wkilled && - now.tv_sec > s->wtime.tv_sec + timeout) { - PARA_INFO_LOG("%s output buffer (slot %d) not ready\n", - audio_formats[s->format], slot_num); - if (s->fci) - s->fci->error = 42; - } - } -} - -static size_t get_loaded_bytes(int slot_num) +static void try_to_close_slot(int slot_num) { - size_t loaded = 0; struct slot_info *s = &slot[slot_num]; - struct receiver_node *rn = s->receiver_node; - - if (s->format < 0) - goto out; - - if (afi[s->format].num_filters) { - if (s->fci) - loaded = *s->fci->out_loaded; - } else { - if (rn) - loaded = rn->loaded; - } -out: - return loaded; -} - - -static void close_decoder_if_idle(int slot_num) -{ - struct slot_info *s = &slot[slot_num]; - struct receiver_node *rn = s->receiver_node; if (s->format < 0) return; - if (!s->fci) + if (s->receiver_node && !s->receiver_node->eof) return; - if (!rn->eof && !s->fci->error && s->wpid > 0) + if (s->fc && !s->fc->eof) return; - if (!s->fci->error && s->wpid > 0) { /* eof */ - if (filter_io(s->fci) > 0) - return; - if (get_loaded_bytes(slot_num)) - return; - } - if (s->write_fd > 0) { - PARA_INFO_LOG("err: %d\n", s->fci->error); - PARA_INFO_LOG("slot %d: closing write fd %d\n", slot_num, - s->write_fd); - close(s->write_fd); - del_close_on_fork_list(s->write_fd); - s->write_fd = -1; - } - if (s->wpid > 0) - return; /* wait until writer dies before closing filters */ - PARA_INFO_LOG("closing all filters in slot %d (filter_chain %p)\n", - slot_num, s->fci); - close_filters(s->fci); - free(s->fci); + if (s->wng && !s->wng->eof) + return; + PARA_INFO_LOG("closing slot %d \n", slot_num); + wng_close(s->wng); + wng_destroy(s->wng); + close_filters(s->fc); + free(s->fc); close_receiver(slot_num); clear_slot(slot_num); } -static void set_stream_fds(fd_set *wfds, int *max_fileno) +static void audiod_pre_select(struct sched *s, __a_unused struct task *t) { int i; - check_timeouts(); + now = &s->now; + if (audiod_status != AUDIOD_ON) + kill_all_decoders(); + else if (playing) + open_current_receiver(); FOR_EACH_SLOT(i) { - struct slot_info *s = &slot[i]; - struct audio_format_info *a; struct receiver_node *rn; - close_decoder_if_idle(i); - s->wcheck = 0; - if (s->format < 0) + try_to_close_slot(i); + if (slot[i].format < 0) continue; - a = &afi[s->format]; - rn = s->receiver_node; - if (rn && rn->loaded && !s->wpid) { - PARA_INFO_LOG("no writer in slot %d\n", i); - start_stream_writer(i); + rn = slot[i].receiver_node; + if (rn && rn->loaded && !slot[i].wng) { + open_filters(i); + open_writers(i); } - if (s->write_fd <= 0) - continue; - if (!get_loaded_bytes(i)) - continue; - para_fd_set(s->write_fd, wfds, max_fileno); - s->wcheck = 1; } } -static int write_audio_data(int slot_num) +static void audiod_post_select(struct sched *s, __a_unused struct task *t) { - struct slot_info *s = &slot[slot_num]; - struct audio_format_info *a = &afi[s->format]; - struct receiver_node *rn = s->receiver_node; - int rv; - char **buf; - size_t *len; - - if (a->num_filters) { - buf = &s->fci->outbuf; - len = s->fci->out_loaded; - } else { - buf = &rn->buf; - len = &rn->loaded; - } - PARA_DEBUG_LOG("writing %p (%zd bytes)\n", *buf, *len); - rv = write(s->write_fd, *buf, *len); - PARA_DEBUG_LOG("wrote %d/%zd\n", rv, *len); - if (rv < 0) { - PARA_WARNING_LOG("write error in slot %d (fd %d): %s\n", - slot_num, s->write_fd, strerror(errno)); - *len = 0; - s->fci->error = E_WRITE_AUDIO_DATA; - } else if (rv != *len) { - PARA_DEBUG_LOG("partial %s write (%i/%zd) for slot %d\n", - audio_formats[s->format], rv, *len, slot_num); - *len -= rv; - memmove(*buf, *buf + rv, *len); - } else - *len = 0; - if (rv > 0) - gettimeofday(&s->wtime, NULL); - return rv; + /* only save away the current time for other users */ + now = &s->now; } -static void slot_io(fd_set *wfds) +static void init_audiod_task(struct task *t) { - int ret, i; - - FOR_EACH_SLOT(i) { - struct slot_info *s = &slot[i]; - struct receiver_node *rn = s->receiver_node; - - if (rn && rn->loaded) - gettimeofday(&s->rtime, NULL); - if (s->format >= 0 && s->write_fd > 0 && s->fci) { - ret = filter_io(s->fci); - if (ret < 0) - s->fci->error = -ret; -// PARA_DEBUG_LOG("slot %d, filter io %d bytes, check write: %d, loaded: %d/%d, eof: %d\n", -// i, ret, s->wcheck, rn->loaded, *s->fci->out_loaded, rn->eof); - } - if (s->write_fd <= 0 || !s->wcheck || !FD_ISSET(s->write_fd, wfds)) - continue; - write_audio_data(i); - } + t->pre_select = audiod_pre_select; + t->post_select = audiod_post_select; + t->event_handler = NULL; + t->private_data = t; + t->flags = 0; + sprintf(t->status, "audiod task"); } static int parse_stream_command(const char *txt, char **cmd) @@ -1072,53 +959,51 @@ static int add_filter(int format, char *cmdline) return filter_num; } -static int setup_default_filters(void) +static int init_writers(void) { - int i, ret = 1; + int i, ret, nw; + char *cmd; + struct audio_format_info *a; + init_supported_writers(); + nw = PARA_MAX(1, conf.writer_given); + PARA_INFO_LOG("maximal number of writers: %d\n", nw); FOR_EACH_AUDIO_FORMAT(i) { - struct audio_format_info *a = &afi[i]; - char *tmp; - int j; - if (a->num_filters) - continue; - /* add "dec" to audio format name */ - tmp = make_message("%sdec", audio_formats[i]); - for (j = 0; filters[j].name; j++) - if (!strcmp(tmp, filters[j].name)) - break; - free(tmp); - ret = -E_UNSUPPORTED_FILTER; - if (!filters[j].name) - goto out; - tmp = para_strdup(filters[j].name); - ret = add_filter(i, tmp); - free(tmp); + a = &afi[i]; + a->writer_conf = para_malloc(nw * sizeof(void *)); + a->writers = para_malloc(nw * sizeof(struct writer *)); + a->num_writers = 0; + } + for (i = 0; i < conf.writer_given; i++) { + void *wconf; + int writer_num; + ret = parse_stream_command(conf.writer_arg[i], &cmd); if (ret < 0) goto out; - PARA_INFO_LOG("%s -> default filter: %s\n", audio_formats[i], - filters[j].name); - ret = add_filter(i, "wav"); - if (ret < 0) + a = &afi[ret]; + nw = a->num_writers; + wconf = check_writer_arg(cmd, &writer_num); + if (!wconf) { + ret = writer_num; goto out; - PARA_INFO_LOG("%s -> default filter: wav\n", audio_formats[i]); + } + a->writers[nw] = &writers[writer_num]; + a->writer_conf[nw] = wconf; + PARA_INFO_LOG("%s writer #%d: %s\n", audio_formats[ret], + nw, writer_names[writer_num]); + a->num_writers++; } + ret = 1; out: return ret; } -static int init_stream_io(void) +static int init_receivers(void) { - int i, ret, receiver_num, nf; - char *cmd; + int i, ret, receiver_num; + char *cmd = NULL; + struct audio_format_info *a; - for (i = 0; i < conf.stream_write_cmd_given; i++) { - ret = parse_stream_command(conf.stream_write_cmd_arg[i], &cmd); - if (ret < 0) - goto out; - afi[ret].write_cmd = para_strdup(cmd); - PARA_INFO_LOG("%s write command: %s\n", audio_formats[ret], afi[ret].write_cmd); - } for (i = 0; receivers[i].name; i++) { PARA_INFO_LOG("initializing %s receiver\n", receivers[i].name); receivers[i].init(&receivers[i]); @@ -1147,7 +1032,7 @@ static int init_stream_io(void) */ cmd = para_strdup(receivers[0].name); FOR_EACH_AUDIO_FORMAT(i) { - struct audio_format_info *a = &afi[i]; + a = &afi[i]; if (a->receiver_conf) continue; a->receiver_conf = check_receiver_arg(cmd, &receiver_num); @@ -1155,17 +1040,57 @@ static int init_stream_io(void) return -E_RECV_SYNTAX; a->receiver = &receivers[receiver_num]; } + ret = 1; +out: free(cmd); - /* filters */ + return ret; +} + +static int init_default_filters(void) +{ + int i, ret = 1; + + FOR_EACH_AUDIO_FORMAT(i) { + struct audio_format_info *a = &afi[i]; + char *tmp; + int j; + + if (a->num_filters) + continue; /* no default -- nothing to to */ + /* add "dec" to audio format name */ + tmp = make_message("%sdec", audio_formats[i]); + for (j = 0; filters[j].name; j++) + if (!strcmp(tmp, filters[j].name)) + break; + free(tmp); + ret = -E_UNSUPPORTED_FILTER; + if (!filters[j].name) + goto out; + tmp = para_strdup(filters[j].name); + ret = add_filter(i, tmp); + free(tmp); + if (ret < 0) + goto out; + PARA_INFO_LOG("%s -> default filter: %s\n", audio_formats[i], + filters[j].name); + } +out: + return ret; +} + +static int init_filters(void) +{ + int i, ret, nf; + filter_init(filters); - nf = PARA_MAX(2, conf.filter_given) + 1; - PARA_INFO_LOG("allocating space for %d filters\n", nf); + nf = PARA_MAX(1, conf.filter_given); + PARA_INFO_LOG("maximal number of filters: %d\n", nf); FOR_EACH_AUDIO_FORMAT(i) { - afi[i].filter_conf = para_malloc(nf * sizeof(char *)); + afi[i].filter_conf = para_malloc(nf * sizeof(void *)); afi[i].filters = para_malloc(nf * sizeof(struct filter *)); } if (!conf.no_default_filters_given) - return setup_default_filters(); + return init_default_filters(); for (i = 0; i < conf.filter_given; i++) { char *arg = conf.filter_arg[i]; char *filter_name = strchr(arg, ':'); @@ -1181,11 +1106,27 @@ static int init_stream_io(void) if (ret < 0) goto out; } - ret = 1; + ret = init_default_filters(); /* use default values for the rest */ out: return ret; } +static int init_stream_io(void) +{ + int ret; + + ret = init_writers(); + if (ret < 0) + return ret; + ret = init_receivers(); + if (ret < 0) + return ret; + ret = init_filters(); + if (ret < 0) + return ret; + return 1; +} + static int dump_commands(int fd) { char *buf = para_strdup(""), *tmp = NULL; @@ -1313,29 +1254,6 @@ out: return ret; } -#if 0 -static char *list_filters(void) -{ - int i, j; - char *tmp, *msg = make_message("format\tnum\tcmd\n"); - - FOR_EACH_AUDIO_FORMAT(i) { - for (j = 0; j < afi[i].num_filters; j++) { - tmp = make_message("%s\t%i\t%s\n", - afi[i].name, j, afi[i].filter_cmds[j]); - msg = para_strcat(msg, tmp); - free(tmp); - } - tmp = make_message("%s\t%i\t%s\n", afi[i].name, - j, afi[i].write_cmd); - msg = para_strcat(msg, tmp); - free(tmp); - } - return msg; -} -#endif - - static int com_grab(int fd, char *cmdline) { struct grab_client *gc; @@ -1429,13 +1347,13 @@ static int check_perms(uid_t uid) return -E_UCRED_PERM; } -static int handle_connect(void) +static int handle_connect(int accept_fd) { int i, argc, ret, clifd = -1; char *cmd = NULL, *p, *buf = para_calloc(MAXLINE), **argv = NULL; struct sockaddr_un unix_addr; - ret = para_accept(audiod_socket, &unix_addr, sizeof(struct sockaddr_un)); + ret = para_accept(accept_fd, &unix_addr, sizeof(struct sockaddr_un)); if (ret < 0) goto out; clifd = ret; @@ -1486,9 +1404,10 @@ out: return ret; } -static void audiod_get_socket(void) +static int audiod_get_socket(void) { struct sockaddr_un unix_addr; + int fd; if (conf.socket_given) socket_name = para_strdup(conf.socket_arg); @@ -1498,20 +1417,21 @@ static void audiod_get_socket(void) hn); free(hn); } - PARA_NOTICE_LOG("connecting to local socket %s\n", socket_name); + PARA_NOTICE_LOG("local socket: %s\n", socket_name); if (conf.force_given) unlink(socket_name); - audiod_socket = create_pf_socket(socket_name, &unix_addr, + fd = create_pf_socket(socket_name, &unix_addr, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IWOTH); - if (audiod_socket < 0) { + if (fd < 0) { PARA_EMERG_LOG("%s", "can not connect to socket\n"); exit(EXIT_FAILURE); /* do not unlink socket */ } - if (listen(audiod_socket, 5) < 0) { + if (listen(fd , 5) < 0) { PARA_EMERG_LOG("%s", "can not listen on socket\n"); exit(EXIT_FAILURE); /* do not unlink socket */ } - add_close_on_fork_list(audiod_socket); + add_close_on_fork_list(fd); + return fd; } static int open_stat_pipe(void) @@ -1528,117 +1448,118 @@ static int open_stat_pipe(void) return ret; } -static void audiod_pre_select(fd_set *rfds, fd_set *wfds, struct timeval *tv, - int *max_fileno) +void signal_event_handler(struct task *t) { - int i, ret; + struct signal_task *st = t->private_data; + handle_signal(st->signum); +} - FOR_EACH_SLOT(i) { - struct slot_info *s = &slot[i]; - struct audio_format_info *a; - struct receiver_node *rn = s->receiver_node; - if (s->format < 0 || !rn) - continue; - a = &afi[s->format]; - ret = a->receiver->pre_select(rn, rfds, wfds, tv); -// PARA_NOTICE_LOG("%s preselect: %d\n", a->receiver->name, ret); - *max_fileno = PARA_MAX(*max_fileno, ret); - } +void signal_pre_select(struct sched *s, struct task *t) +{ + struct signal_task *st = t->private_data; + t->ret = 1; + para_fd_set(st->fd, &s->rfds, &s->max_fileno); } -static void audiod_post_select(int select_ret, fd_set *rfds, fd_set *wfds) + +void signal_post_select(struct sched *s, struct task *t) { - int i, ret; + struct signal_task *st = t->private_data; + t->ret = 1; + if (!FD_ISSET(st->fd, &s->rfds)) + return; + t->ret = -E_SIGNAL_CAUGHT; + st->signum = para_next_signal(); +} - FOR_EACH_SLOT(i) { - struct slot_info *s = &slot[i]; - struct audio_format_info *a; - struct receiver_node *rn = s->receiver_node; - if (s->format < 0 || !rn || rn->eof) - continue; - a = &afi[s->format]; - ret = a->receiver->post_select(rn, select_ret, rfds, wfds); - if (ret <= 0) { - if (ret) - PARA_ERROR_LOG("%s post select failed: %s (slot %d)\n", - a->receiver->name, PARA_STRERROR(-ret), i); - else - PARA_INFO_LOG("eof in slot %d\n", i); - rn->eof = 1; - } - if (ret < 0 && s->fci) - s->fci->error = ret; - } +void signal_setup_default(struct signal_task *st) +{ + st->task.pre_select = signal_pre_select; + st->task.post_select = signal_post_select; + st->task.private_data = st; + st->task.flags = 0; + sprintf(st->task.status, "signal task"); } -/* TODO: move everything before the select call to pre_select() */ -static void __noreturn audiod_mainloop(void) +static void command_pre_select(struct sched *s, struct task *t) { - fd_set rfds, wfds; - int ret, max_fileno, sbo = 0; - char status_buf[STRINGSIZE] = ""; - struct timeval tv; -repeat: - FD_ZERO(&wfds); - FD_ZERO(&rfds); - max_fileno = -1; - /* always check signal pipe and the local socket */ - para_fd_set(signal_pipe, &rfds, &max_fileno); - para_fd_set(audiod_socket, &rfds, &max_fileno); + struct command_task *ct = t->private_data; + para_fd_set(ct->fd, &s->rfds, &s->max_fileno); - if (audiod_status != AUDIOD_ON) - kill_all_decoders(); - else if (playing) - start_current_receiver(); +} + +static void command_post_select(struct sched *s, struct task *t) +{ + int ret; + struct command_task *ct = t->private_data; - set_stream_fds(&wfds, &max_fileno); - /* status pipe */ - if (stat_pipe >= 0 && audiod_status == AUDIOD_OFF) - close_stat_pipe(); - if (stat_pipe < 0 && audiod_status != AUDIOD_OFF) { - stat_pipe = open_stat_pipe(); - sbo = 0; - status_buf[0] = '\0'; - } - if (stat_pipe >= 0 && audiod_status != AUDIOD_OFF) - para_fd_set(stat_pipe, &rfds, &max_fileno); - /* local socket */ - tv.tv_sec = 0; - tv.tv_usec = 200 * 1000; - audiod_pre_select(&rfds, &wfds, &tv, &max_fileno); - ret = para_select(max_fileno + 1, &rfds, &wfds, &tv); - if (ret < 0) - goto repeat; if (audiod_status != AUDIOD_OFF) audiod_status_dump(); - audiod_post_select(ret, &rfds, &wfds); - /* read status pipe */ - if (stat_pipe >=0 && FD_ISSET(stat_pipe, &rfds)) { - ret = read(stat_pipe, status_buf + sbo, STRINGSIZE - 1 - sbo); - if (ret <= 0) { - close_stat_pipe(); - /* avoid busy loop if server is down */ - while (sleep(1) > 0) - ; /* try again*/ - } else { - status_buf[ret + sbo] = '\0'; - sbo = for_each_line(status_buf, ret + sbo, - &check_stat_line); - } - } - slot_io(&wfds); - if (FD_ISSET(audiod_socket, &rfds)) { - ret = handle_connect(); - if (ret < 0) { - PARA_ERROR_LOG("%s\n", PARA_STRERROR(-ret)); - } + t->ret = 1; /* always successful */ + if (!FD_ISSET(ct->fd, &s->rfds)) + return; + ret = handle_connect(ct->fd); + if (ret < 0) + PARA_ERROR_LOG("%s\n", PARA_STRERROR(-ret)); +} + +static void init_command_task(struct command_task *ct) +{ + ct->task.pre_select = command_pre_select; + ct->task.post_select = command_post_select; + ct->task.event_handler = NULL; + ct->task.private_data = ct; + ct->task.flags = 0; + ct->fd = audiod_get_socket(); /* doesn't return on errors */ + sprintf(ct->task.status, "command task"); +} + +static void status_pre_select(struct sched *s, struct task *t) +{ + struct status_task *st = t->private_data; + t->ret = 1; + if (st->fd >= 0 && audiod_status == AUDIOD_OFF) + close_stat_pipe(); + if (st->fd < 0 && audiod_status != AUDIOD_OFF) { + st->fd = open_stat_pipe(); + st->loaded = 0; + st->buf[0] = '\0'; } - /* signals */ - if (FD_ISSET(signal_pipe, &rfds)) { - int sig_nr = para_next_signal(); - if (sig_nr > 0) - handle_signal(sig_nr); + if (st->fd >= 0 && audiod_status != AUDIOD_OFF) + para_fd_set(st->fd, &s->rfds, &s->max_fileno); +} + +static void status_post_select(struct sched *s, struct task *t) +{ + struct status_task *st = t->private_data; + int ret; + + t->ret = 1; + if (st->fd < 0 || !FD_ISSET(st->fd, &s->rfds)) + return; + ret = read(st->fd, st->buf + st->loaded, + STRINGSIZE - 1 - st->loaded); + if (ret <= 0) { + close_stat_pipe(); + /* avoid busy loop if server is down */ + while (sleep(1) > 0) /* FIXME */ + ; /* try again*/ + } else { + st->buf[ret + st->loaded] = '\0'; + st->loaded = for_each_line(st->buf, ret + st->loaded, + &check_stat_line); } - goto repeat; +} + +static void init_status_task(struct status_task *st) +{ + st->task.pre_select = status_pre_select; + st->task.post_select = status_post_select; + st->task.private_data = st; + st->task.flags = 0; + st->loaded = 0; + st->fd = -1; + st->buf[0] = '\0'; + sprintf(st->task.status, "status task"); } static void set_initial_status(void) @@ -1658,10 +1579,15 @@ static void set_initial_status(void) PARA_WARNING_LOG("%s", "invalid mode\n"); } -int __noreturn main(int argc, char *argv[]) +int main(int argc, char *argv[]) { char *cf; - int i; + int ret, i; + struct sched s; + struct command_task command_task_struct, *cmd_task = &command_task_struct; + struct task audiod_task_struct, *audiod_task = &audiod_task_struct; + + init_sched(); valid_fd_012(); hostname = para_hostname(); @@ -1688,8 +1614,24 @@ int __noreturn main(int argc, char *argv[]) clear_slot(i); init_grabbing(); setup_signal_handling(); + signal_setup_default(sig_task); + sig_task->task.event_handler = signal_event_handler; + + init_status_task(stat_task); + init_command_task(cmd_task); + init_audiod_task(audiod_task); + if (conf.daemon_given) daemon_init(); - audiod_get_socket(); /* doesn't return on errors */ - audiod_mainloop(); + + register_task(&sig_task->task); + register_task(&cmd_task->task); + register_task(&stat_task->task); + register_task(audiod_task); + s.default_timeout.tv_sec = 0; + s.default_timeout.tv_usec = 99 * 1000; + ret = sched(&s); + + PARA_EMERG_LOG("%s\n", PARA_STRERROR(-ret)); + return EXIT_FAILURE; } diff --git a/audiod.ggo b/audiod.ggo index dd385a16..94bf59d0 100644 --- a/audiod.ggo +++ b/audiod.ggo @@ -143,21 +143,20 @@ see --no_default_filters." string typestr="filter_spec" optional multiple -option "stream_write_cmd" w +option "writer" w #~~~~~~~~~~~~~~~~~~~~~~~~~~ "Specify stream writer. -May be given multiple times, once for each -supported audio format. Default value is -'para_write -w alsa' for both mp3 and ogg. -You can use the START_TIME() macro for these -commands. Each occurence of START_TIME() -gets replaced at runtime by the stream start -time announced by para_server, plus any -offsets." +May be given multiple times, even multiple +times for the same audio format. Default +value is 'alsa' for all supported audio +formats. Example: - string typestr="format:command" + -w 'aac:osx' + +" + string typestr="writer_spec" optional multiple @@ -167,10 +166,8 @@ option "stream_delay" - "Time to add to para_server's start_time. -Amount of time to be added to the server -stream start time for stream_write_cmd if -START_TIME() was given. Useful for -syncronizing the audio output of clients." +Amount of time to be added to the server, before data is sent to +the writer. Useful for syncronizing the audio output of clients." int typestr="milliseconds" default="200" diff --git a/compress.c b/compress.c index 3e605d0e..936ddc3a 100644 --- a/compress.c +++ b/compress.c @@ -25,6 +25,7 @@ #include "para.h" #include "compress_filter.cmdline.h" #include "list.h" +#include "sched.h" #include "filter.h" #include "string.h" diff --git a/configure.ac b/configure.ac index 18d6e9cb..7c2c1ec5 100644 --- a/configure.ac +++ b/configure.ac @@ -57,17 +57,18 @@ AC_CHECK_LIB([menu], [new_menu], [extras="$extras para_dbadm"], recv_cmdline_objs="recv.cmdline http_recv.cmdline dccp_recv.cmdline" recv_errlist_objs="http_recv recv_common recv time string net dccp_recv - dccp fd" + dccp fd sched stdout" recv_ldflags="" filter_cmdline_objs="filter.cmdline compress_filter.cmdline" -filter_errlist_objs="filter_chain wav compress filter string" +filter_errlist_objs="filter_chain wav compress filter string stdin stdout sched fd" filter_ldflags="" audiod_cmdline_objs="audiod.cmdline grab_client.cmdline compress_filter.cmdline - http_recv.cmdline dccp_recv.cmdline" + http_recv.cmdline dccp_recv.cmdline file_write.cmdline" audiod_errlist_objs="audiod exec close_on_fork signal string daemon stat net - time grab_client filter_chain wav compress http_recv dccp dccp_recv recv_common fd" + time grab_client filter_chain wav compress http_recv dccp dccp_recv + recv_common fd sched write_common file_writer" audiod_ldflags="" server_cmdline_objs="server.cmdline" @@ -76,8 +77,8 @@ server_errlist_objs="server mp3_afh afs command net string signal random_selecto ipc dccp dccp_send fd" server_ldflags="" -write_cmdline_objs="write.cmdline" -write_errlist_objs="write write_common file_writer time fd string" +write_cmdline_objs="write.cmdline file_write.cmdline" +write_errlist_objs="write write_common file_writer time fd string sched stdin" write_ldflags="" write_writers="file" @@ -263,7 +264,7 @@ else fi ########################################################################### alsa have_alsa="yes" -msg="=> no alsa support for para_write" +msg="=> no alsa support for para_audiod/para_write" AC_CHECK_HEADERS([alsa/asoundlib.h], [], [ AC_MSG_WARN([no alsa/asoundlib $msg]) have_alsa="no" @@ -273,7 +274,12 @@ AC_CHECK_LIB([asound], [snd_pcm_open], [], [ have_alsa="no" ]) if test "$have_alsa" = "yes"; then + audiod_errlist_objs="$audiod_errlist_objs alsa_writer" + audiod_cmdline_objs="$audiod_cmdline_objs alsa_write.cmdline" + audiod_ldflags="$audiod_ldflags -lasound" + write_errlist_objs="$write_errlist_objs alsa_writer" + write_cmdline_objs="$write_cmdline_objs alsa_write.cmdline" write_ldflags="$write_ldflags -lasound" write_writers="$write_writers alsa" fi diff --git a/dccp_recv.c b/dccp_recv.c index f7db6fc8..deca7162 100644 --- a/dccp_recv.c +++ b/dccp_recv.c @@ -26,9 +26,12 @@ #include "para.h" #include "error.h" #include "dccp.h" +#include "list.h" +#include "sched.h" #include "recv.h" #include "string.h" #include "net.h" +#include "fd.h" #include "dccp_recv.cmdline.h" @@ -116,35 +119,35 @@ static void *dccp_recv_parse_config(int argc, char **argv) return NULL; } -static int dccp_recv_pre_select(struct receiver_node *rn, fd_set *rfds, - __a_unused fd_set *wfds, __a_unused struct timeval *timeout) +static void dccp_recv_pre_select(struct sched *s, struct task *t) { - struct private_dccp_recv_data *pdd = rn->private_data; + struct private_dccp_recv_data *pdd = t->private_data; if (!pdd) - return -1; - FD_SET(pdd->fd, rfds); - return pdd->fd; + return ; + para_fd_set(pdd->fd, &s->rfds, &s->max_fileno); } -static int dccp_recv_post_select(struct receiver_node *rn, int select_ret, - fd_set *rfds, __a_unused fd_set *wfds) +static void dccp_recv_post_select(struct sched *s, struct task *t) { - int ret; + struct receiver_node *rn = t->private_data; struct private_dccp_recv_data *pdd = rn->private_data; - if (!select_ret || !pdd || !FD_ISSET(pdd->fd, rfds)) - return 1; /* nothing to do */ + t->ret = 1; + if (!s->select_ret || !pdd || !FD_ISSET(pdd->fd, &s->rfds)) + return; /* nothing to do */ + t->ret = -E_DCCP_OVERRUN; if (rn->loaded >= DCCP_BUFSIZE) - return -E_DCCP_OVERRUN; - ret = recv_bin_buffer(pdd->fd, rn->buf + rn->loaded, + return; + t->ret = recv_bin_buffer(pdd->fd, rn->buf + rn->loaded, DCCP_BUFSIZE - rn->loaded); - if (ret <= 0) { - PARA_INFO_LOG("%s\n", ret? PARA_STRERROR(-ret) : "eof"); - return ret; + if (t->ret <= 0) { + rn->eof = 1; + if (!t->ret) + t->ret = -E_DCCP_RECV_EOF; + return; } - rn->loaded += ret; - return 1; + rn->loaded += t->ret; } /** diff --git a/error.h b/error.h index 59c9554a..8458ae2b 100644 --- a/error.h +++ b/error.h @@ -20,6 +20,7 @@ /** \cond list of all subsystems that support the shiny error facility */ enum para_subsystem { + SS_SCHED, SS_GUI, SS_TIME, SS_WAV, @@ -33,6 +34,8 @@ enum para_subsystem { SS_ORTP_RECV, SS_AUDIOD, SS_EXEC, + SS_STDIN, + SS_STDOUT, SS_SIGNAL, SS_STRING, SS_STAT, @@ -79,11 +82,22 @@ enum para_subsystem { #define ORTP_SEND_ERRORS #define GUI_ERRORS #define RINGBUFFER_ERRORS +#define SCHED_ERRORS extern const char **para_errlist[]; /** \endcond */ +#define STDIN_ERRORS \ + PARA_ERROR(STDIN_READ, "failed to read from stdin"), \ + PARA_ERROR(STDIN_EOF, "end of file"), \ + + +#define STDOUT_ERRORS \ + PARA_ERROR(STDOUT_WRITE, "failed to write to stdout"), \ + PARA_ERROR(STDOUT_EOF, "end of file"), \ + + #define NET_ERRORS \ PARA_ERROR(SEND, "send error"), \ PARA_ERROR(RECV, "receive error"), \ @@ -108,12 +122,14 @@ extern const char **para_errlist[]; PARA_ERROR(TOO_MANY_BAD_CHUNKS, "too many consecutive bad chunks"), \ PARA_ERROR(INVALID_HEADER, "invalid header packet"), \ PARA_ERROR(OVERRUN, "outout buffer overrun"), \ + PARA_ERROR(ORTP_RECV_EOF, "ortp_recv: end of file"), \ #define HTTP_RECV_ERRORS \ PARA_ERROR(SEND_HTTP_REQUEST, "failed to send http request"), \ PARA_ERROR(MISSING_OK, "did not receive OK message from peer"), \ - PARA_ERROR(HTTP_RECV_BUF, "did not receive buffer") + PARA_ERROR(HTTP_RECV_BUF, "did not receive buffer"), \ + PARA_ERROR(HTTP_RECV_EOF, "http_recv: end of file"), \ #define RECV_ERRORS \ @@ -138,6 +154,7 @@ extern const char **para_errlist[]; #define FILTER_CHAIN_ERRORS \ PARA_ERROR(UNSUPPORTED_FILTER, "given filter not supported"), \ PARA_ERROR(BAD_FILTER_OPTIONS, "invalid filter option given"), \ + PARA_ERROR(FC_EOF, "filter chain: eof"), \ #define STAT_ERRORS \ @@ -179,6 +196,7 @@ extern const char **para_errlist[]; PARA_ERROR(SIGNAL_READ, "read error from signal pipe"), \ PARA_ERROR(WAITPID, "waitpid error"), \ PARA_ERROR(SIGNAL_PIPE, "failed to setup signal pipe"), \ + PARA_ERROR(SIGNAL_CAUGHT, "caught signal"), \ #define STRING_ERRORS \ @@ -211,7 +229,6 @@ extern const char **para_errlist[]; #define AAC_COMMON_ERRORS \ - PARA_ERROR(AAC_BUF, "invalid buffer"), \ PARA_ERROR(ESDS, "did not find esds atom"), \ PARA_ERROR(STCO, "did not find stco atom"), \ @@ -312,6 +329,7 @@ extern const char **para_errlist[]; PARA_ERROR(DCCP_SOCKET, "can not create dccp socket"), \ PARA_ERROR(DCCP_PACKET_SIZE, "failed to set dccp packet size"), \ PARA_ERROR(DCCP_SERVICE, "could not get service code"), \ + PARA_ERROR(DCCP_RECV_EOF, "dccp_recv: end of file"), \ #define DCCP_RECV_ERRORS \ @@ -319,22 +337,25 @@ extern const char **para_errlist[]; PARA_ERROR(DCCP_OVERRUN, "dccp output buffer buffer overrun"), \ PARA_ERROR(DCCP_CONNECT, "dccp connect error"), \ + #define DCCP_SEND_ERRORS \ PARA_ERROR(DCCP_BIND, "dccp bind error"), \ PARA_ERROR(DCCP_LISTEN, "dccp listen error"), \ PARA_ERROR(DCCP_WRITE, "dccp write error"), \ + #define FD_ERRORS \ PARA_ERROR(F_GETFL, "failed to get fd flags"), \ PARA_ERROR(F_SETFL, "failed to set fd flags"), \ #define WRITE_ERRORS \ - PARA_ERROR(READ_HDR, "failed to read audio file header"), \ - PARA_ERROR(READ_STDIN, "failed to read from stdin"), \ PARA_ERROR(WRITE_SYNTAX, "para_write syntax error"), \ - PARA_ERROR(WRITE_OVERRUN, "buffer overrun"), \ PARA_ERROR(PREMATURE_END, "premature end of audio file"), \ + PARA_ERROR(NO_WAV_HEADER, "wave header not found"), \ + PARA_ERROR(WAV_HEADER_SUCCESS, "successfully read wave header"), \ + PARA_ERROR(NO_DELAY, "no initial delay"), \ + PARA_ERROR(DELAY_TIMEOUT, "initial delay timeout"), \ #define ALSA_WRITER_ERRORS \ @@ -355,22 +376,24 @@ extern const char **para_errlist[]; PARA_ERROR(SET_RATE, "snd_pcm_hw_params_set_rate_near failed"), \ PARA_ERROR(START_THRESHOLD, "snd_pcm_sw_params_set_start_threshold() failed"), \ PARA_ERROR(STOP_THRESHOLD, "snd_pcm_sw_params_set_stop_threshold() failed"), \ - PARA_ERROR(ALSA_LOG, "snd_output_stdio_attach() failed"), \ #define FILE_WRITER_ERRORS \ PARA_ERROR(FW_WRITE, "file writer write error"), \ PARA_ERROR(FW_OPEN, "file writer: can not open output file"), \ + PARA_ERROR(FW_NO_FILE, "task started without open file"), \ #define WRITE_COMMON_ERRORS \ PARA_ERROR(WRITE_COMMON_SYNTAX, "syntax error in write option"), \ + PARA_ERROR(WNG_EOF, "wng: end of file"), \ #define AACDEC_ERRORS \ PARA_ERROR(AACDEC_INIT, "failed to init aac decoder"), \ PARA_ERROR(AAC_DECODE, "aac decode error"), \ + /** * the subsystem shift * @@ -445,6 +468,9 @@ extern const char **para_errlist[]; /** \cond popcorn time */ SS_ENUM(GUI); +SS_ENUM(SCHED); +SS_ENUM(STDIN); +SS_ENUM(STDOUT); SS_ENUM(WAV); SS_ENUM(COMPRESS); SS_ENUM(TIME); diff --git a/file_write.ggo b/file_write.ggo new file mode 100644 index 00000000..a172f757 --- /dev/null +++ b/file_write.ggo @@ -0,0 +1,11 @@ +section "file writer options" + +option "filename" f +#~~~~~~~~~~~~~~~~~~ + +"select output file name. Defaults to a +random filename in ~/.paraslash." + + string typestr="filename" + optional + diff --git a/file_writer.c b/file_writer.c index a7a765e1..9145ef9f 100644 --- a/file_writer.c +++ b/file_writer.c @@ -19,28 +19,40 @@ /** \file file_writer.c simple output plugin for testing purposes */ #include "para.h" +#include "list.h" +#include "sched.h" #include "write.h" #include "string.h" +#include "fd.h" +#include "file_write.cmdline.h" #include "error.h" /** data specific to the file writer */ struct private_file_writer_data { -/** the file descriptor of the output file */ -int fd; + /** the file descriptor of the output file */ + int fd; + /** non-zero if \a fd was added to the write fd set */ + int check_fd; }; -static int file_writer_open(struct writer_node *w) +static int file_writer_open(struct writer_node *wn) { struct private_file_writer_data *pfwd = para_calloc( sizeof(struct private_file_writer_data)); - char *tmp = para_tmpname(), *home = para_homedir(), - *filename = make_message("%s/.paraslash/%s", home, tmp); - - free(home); - free(tmp); - w->private_data = pfwd; + struct file_write_args_info *conf = wn->conf; + char *filename; + if (conf->filename_given) + filename = conf->filename_arg; + else { + char *tmp = para_tmpname(), *home = para_homedir(); + filename = make_message("%s/.paraslash/%s", home, tmp); + free(home); + free(tmp); + } + wn->private_data = pfwd; pfwd->fd = open(filename, O_WRONLY | O_CREAT, S_IRUSR | S_IWUSR); - free(filename); + if (!conf->filename_given) + free(filename); if (pfwd->fd >= 0) return 8192; free(pfwd); @@ -56,6 +68,44 @@ static int file_writer_write(char *data, size_t nbytes, struct writer_node *wn) return ret; } +static void file_writer_pre_select(struct sched *s, struct task *t) +{ + struct writer_node *wn = t->private_data; + struct private_file_writer_data *pfwd = wn->private_data; + struct writer_node_group *wng = wn->wng; + +// PARA_INFO_LOG("task %p check_fd: %d\n", t, pfwd->check_fd); + pfwd->check_fd = 0; + t->ret = -E_FW_NO_FILE; + if (pfwd->fd <= 0) + return; + t->ret = 0; + if (!*wng->loaded) + return; + t->ret = 1; + para_fd_set(pfwd->fd, &s->wfds, &s->max_fileno); + pfwd->check_fd = 1; +} + +static void file_writer_post_select(struct sched *s, struct task *t) +{ + struct writer_node *wn = t->private_data; + struct private_file_writer_data *pfwd = wn->private_data; + struct writer_node_group *wng = wn->wng; + + t->ret = 0; + if (!pfwd->check_fd) + return; + if (!*wng->loaded) + return; + if (!FD_ISSET(pfwd->fd, &s->wfds)) + return; +// PARA_INFO_LOG("writing %zd\n", *wng->loaded); + t->ret = write(pfwd->fd, wng->buf, *wng->loaded); + if (t->ret < 0) + t->ret = -E_FW_WRITE; +} + static void file_writer_close(struct writer_node *wn) { struct private_file_writer_data *pfwd = wn->private_data; @@ -63,11 +113,27 @@ static void file_writer_close(struct writer_node *wn) free(pfwd); } +__malloc void *file_writer_parse_config(char *options) +{ + PARA_INFO_LOG("options: %s\n", options); + struct file_write_args_info *conf + = para_calloc(sizeof(struct file_write_args_info)); + int ret = file_cmdline_parser_string(options, conf, "file_write"); + PARA_INFO_LOG("conf->filename_given: %d\n", conf->filename_given); + if (!ret) + return conf; + free(conf); + return NULL; +} + /** the init function of the file writer */ void file_writer_init(struct writer *w) { w->open = file_writer_open; w->write = file_writer_write; + w->pre_select = file_writer_pre_select; + w->post_select = file_writer_post_select; + w->parse_config = file_writer_parse_config; w->close = file_writer_close; w->shutdown = NULL; /* nothing to do */ } diff --git a/filter.c b/filter.c index a035c6e6..56afebf3 100644 --- a/filter.c +++ b/filter.c @@ -21,16 +21,21 @@ #include "filter.cmdline.h" #include "list.h" +#include "sched.h" #include "filter.h" -#include "error.h" #include "string.h" +#include "stdin.h" +#include "stdout.h" +#include "error.h" INIT_FILTER_ERRLISTS; -#define INBUF_SIZE 32 * 1024 - -static struct filter_chain_info filter_chain_info_struct; -static struct filter_chain_info *fci = &filter_chain_info_struct; +static struct stdin_task stdin_task_struct; +static struct stdin_task *sit = &stdin_task_struct; +static struct filter_chain filter_chain_struct; +static struct filter_chain *fc = &filter_chain_struct; +static struct stdout_task stdout_task_struct; +static struct stdout_task *sot = &stdout_task_struct; struct gengetopt_args_info conf; @@ -46,52 +51,62 @@ __printf_2_3 void para_log(int ll, const char* fmt,...) va_end(argp); } -static char *inbuf; -static size_t loaded; -static int eof; +void filter_event_handler(struct task *t) +{ + PARA_NOTICE_LOG("%s\n", PARA_STRERROR(-t->ret)); + unregister_task(t); +} -static int init_active_filter_list(void) +static void open_filters(void) +{ + struct filter_node *fn; + + list_for_each_entry(fn, &fc->filters, node) { + fn->filter->open(fn); + PARA_INFO_LOG("opened %s filter\n", fn->filter->name); + fc->outbuf = fn->buf; + fc->out_loaded = &fn->loaded; + } +} + + +static int init_filter_chain(void) { int i, filter_num; struct filter_node *fn; - INIT_LIST_HEAD(&fci->filters); + INIT_LIST_HEAD(&fc->filters); - fci->inbuf = inbuf; - fci->in_loaded = &loaded; - fci->eof = &eof; + fc->inbuf = sit->buf; + fc->in_loaded = &sit->loaded; + fc->input_eof = &sit->eof; + fc->eof = 0; + fc->output_eof = &sot->eof; + fc->task.private_data = fc; + fc->task.pre_select = filter_pre_select; + fc->task.event_handler = filter_event_handler; + sprintf(fc->task.status, "filter chain"); for (i = 0; i < conf.filter_given; i++) { - char *fa = para_strdup(conf.filter_arg[i]); + char *fa = conf.filter_arg[i]; fn = para_calloc(sizeof(struct filter_node)); filter_num = check_filter_arg(fa, &fn->conf); if (filter_num < 0) { free(fn); return filter_num; } - fn->fci = fci; + fn->fc = fc; INIT_LIST_HEAD(&fn->callbacks); fn->filter = &filters[filter_num]; PARA_DEBUG_LOG("adding %s to filter chain\n", fn->filter->name); - list_add_tail(&fn->node, &fci->filters); + list_add_tail(&fn->node, &fc->filters); } - if (list_empty(&fci->filters)) + if (list_empty(&fc->filters)) return -E_NO_FILTERS; + open_filters(); return 1; } -static void open_filters(void) -{ - struct filter_node *fn; - - list_for_each_entry(fn, &fci->filters, node) { - fn->filter->open(fn); - PARA_INFO_LOG("opened %s filter\n", fn->filter->name); - fci->outbuf = fn->buf; - fci->out_loaded = &fn->loaded; - } -} - static int parse_config(int argc, char *argv[]) { static char *cf; /* config file */ @@ -122,55 +137,38 @@ static int parse_config(int argc, char *argv[]) int main(int argc, char *argv[]) { - int converted, ret; - char *ib, *ob; /* input/output buffer */ - size_t *il, *ol; /* number of loaded bytes in input/output buffer */ + int ret; + struct sched s; + + init_sched(); + stdin_set_defaults(sit); + sit->buf = para_malloc(sit->bufsize), filter_init(filters); ret = parse_config(argc, argv); if (ret < 0) goto out; - inbuf = para_malloc(INBUF_SIZE); - ret = init_active_filter_list(); - if (ret < 0) - goto out; - open_filters(); - ib = fci->inbuf; - ob = fci->outbuf; - il = fci->in_loaded; - ol = fci->out_loaded; - PARA_DEBUG_LOG("ib %p in, ob: %p\n", ib, ob); -again: - if (*il < INBUF_SIZE && !eof) { - ret = read(STDIN_FILENO, ib + *il, INBUF_SIZE - *il); - PARA_DEBUG_LOG("read %d/%zd\n", ret, INBUF_SIZE - *il); - if (ret < 0) - goto out; - if (!ret) - eof = 1; - *il += ret; - } - ret = filter_io(fci); + ret = init_filter_chain(); if (ret < 0) goto out; - converted = ret; - if (*ol) { - ret = write(STDOUT_FILENO, ob, *ol); - PARA_DEBUG_LOG("wrote %d/%zd\n", ret, *ol); - if (ret <= 0) - goto out; - *ol -= ret; - if (*ol) { - PARA_NOTICE_LOG("short write: %zd bytes left\n", *ol); - memmove(ob, ob + ret, *ol); - } - } - if (!eof || converted) - goto again; - ret = 0; + + stdout_set_defaults(sot); + PARA_EMERG_LOG("fc->output_eof: %d\n", *fc->output_eof); + sot->buf = fc->outbuf; + sot->loaded = fc->out_loaded; + sot->input_eof = &fc->eof; + + register_task(&sot->task); + register_task(&fc->task); + register_task(&sit->task); + s.default_timeout.tv_sec = 1; + s.default_timeout.tv_usec = 0; + PARA_EMERG_LOG("fc->output_eof: %d\n", *fc->output_eof); + ret = sched(&s); out: + free(sit->buf); + close_filters(fc); if (ret < 0) PARA_EMERG_LOG("%s\n", PARA_STRERROR(-ret)); - close_filters(fci); - return ret; + return ret < 0? EXIT_FAILURE : EXIT_SUCCESS; } diff --git a/filter.h b/filter.h index 2fb9ad17..45ade8d3 100644 --- a/filter.h +++ b/filter.h @@ -22,71 +22,67 @@ * describes one running instance of a chain of filters * */ -struct filter_chain_info { -/** - * - * - * the number of channels of the current stream - * - * Set by the decoding filter - */ - unsigned int channels; -/** - * - * - * current samplerate in Hz - * - * Set by the decoding filter - */ - unsigned int samplerate; -/** - * - * - * the list containing all filter nodes in this filter chain - */ - struct list_head filters; -/** - * - * - * the input buffer of the filter chain - * - * This is set to point to the output buffer of the receiving application (the - * buffer used to read from stdin for para_filter; the output buffer of the - * current receiver for para_audiod) - */ - char *inbuf; -/** - * - * - * the output buffer of the filter chain - * - * Points to the output buffer of the last filter in the filter chain -**/ - char *outbuf; -/** - * - * - * pointer to variable containing the number of bytes loaded in the input buffer - */ - size_t *in_loaded; -/** - * - * - * pointer to variable containing the number of bytes loaded in the output buffer - */ - size_t *out_loaded; -/** - * - * - * non-zero if end of file was encountered - */ - int *eof; -/** - * - * - * non-zero if an error occured - */ - int error; +struct filter_chain { + /** + * + * + * the number of channels of the current stream + * + * Set by the decoding filter + */ + unsigned int channels; + /** + * + * + * current samplerate in Hz + * + * Set by the decoding filter + */ + unsigned int samplerate; + /** + * + * + * the list containing all filter nodes in this filter chain + */ + struct list_head filters; + /** + * + * + * the input buffer of the filter chain + * + * This is set to point to the output buffer of the receiving application (the + * buffer used to read from stdin for para_filter; the output buffer of the + * current receiver for para_audiod) + */ + char *inbuf; + /** + * + * + * the output buffer of the filter chain + * + * Points to the output buffer of the last filter in the filter chain + **/ + char *outbuf; + /** + * + * + * pointer to variable containing the number of bytes loaded in the input buffer + */ + size_t *in_loaded; + /** + * + * + * pointer to variable containing the number of bytes loaded in the output buffer + */ + size_t *out_loaded; + /** non-zero if this filter wont' produce any more output */ + int eof; + /** pointer to the eof flag of the receiving application */ + int *input_eof; + /** pointer to the eof flag of the writing application */ + int *output_eof; + /** the task associated with the filter chain */ + struct task task; }; /** @@ -104,7 +100,7 @@ struct filter_node { * * the filter chain this filter node belongs to */ - struct filter_chain_info *fci; + struct filter_chain *fc; /** * * @@ -212,11 +208,12 @@ struct filter_callback { }; -void close_filters(struct filter_chain_info *fci); -int filter_io(struct filter_chain_info *fci); +void close_filters(struct filter_chain *fc); +int filter_io(struct filter_chain *fc); void filter_init(struct filter *all_filters); int check_filter_arg(char *filter_arg, void **conf); int del_filter_callback(struct filter_callback *fcb); +void filter_pre_select(struct sched *s, struct task *t); /** * the structure associated with a paraslash filter diff --git a/filter_chain.c b/filter_chain.c index 4c1f94bc..17f67058 100644 --- a/filter_chain.c +++ b/filter_chain.c @@ -20,6 +20,8 @@ #include "para.h" #include "list.h" +#include "sched.h" +#include "fd.h" #include "filter.h" #include "error.h" #include "string.h" @@ -67,8 +69,11 @@ static void close_callbacks(struct filter_node *fn) { struct filter_callback *fcb, *tmp; - list_for_each_entry_safe(fcb, tmp, &fn->callbacks, node) + list_for_each_entry_safe(fcb, tmp, &fn->callbacks, node) { + PARA_INFO_LOG("closing %s filter callback\n", + fn->filter->name); close_filter_callback(fcb); + } } static void call_callbacks(struct filter_node *fn, char *inbuf, size_t inlen, @@ -95,83 +100,94 @@ static void call_callbacks(struct filter_node *fn, char *inbuf, size_t inlen, /** * call the convert function of each filter * - * \param fci the filter chain containing the list of filter nodes. - * * This is the core function of the filter subsystem. It loops over the list of - * filter nodes determined by \a fci and calls the filter's convert function if + * filter nodes determined by \a t and calls the filter's convert function if * there is input available for the filter node in question. If the convert * function consumed some or all of its input data, all registered input * callbacks are called. Similarly, if a convert function produced output, all * registerd output callbacks get called. * - * \return The sum of output bytes produced by the convert functions on success, - * negative return value on errors. + * \return The sum of output bytes produced by the convert functions on + * success, negative return value on errors (the return value is stored in + * t->ret). * * \sa filter_node, filter#convert, filter_callback */ -int filter_io(struct filter_chain_info *fci) +void filter_pre_select(__a_unused struct sched *s, struct task *t) { + struct filter_chain *fc = t->private_data; struct filter_node *fn; char *ib; size_t *loaded; int conv, conv_total = 0; + + t->ret = -E_FC_EOF; + if (*fc->output_eof) + goto err_out; again: - ib = fci->inbuf; - loaded = fci->in_loaded; + ib = fc->inbuf; + loaded = fc->in_loaded; conv = 0; - list_for_each_entry(fn, &fci->filters, node) { - int ret; + list_for_each_entry(fn, &fc->filters, node) { if (*loaded && fn->loaded < fn->bufsize) { size_t old_fn_loaded = fn->loaded; - PARA_DEBUG_LOG("fc %p loaded: %zd, calling %s convert\n", fci, *loaded, fn->filter->name); - ret = fn->filter->convert(ib, *loaded, fn); - if (ret < 0) { - if (!fci->error) - fci->error = -ret; - return ret; - } - call_callbacks(fn, ib, ret, fn->buf + old_fn_loaded, fn->loaded - old_fn_loaded); - *loaded -= ret; - conv += ret; - if (*loaded && ret) { - PARA_DEBUG_LOG("moving %zd bytes in input buffer for %s filter\n", + PARA_DEBUG_LOG("fc %p loaded: %zd, calling %s convert\n", + fc, *loaded, fn->filter->name); + t->ret = fn->filter->convert(ib, *loaded, fn); + if (t->ret < 0) + goto err_out; + call_callbacks(fn, ib, t->ret, fn->buf + old_fn_loaded, + fn->loaded - old_fn_loaded); + *loaded -= t->ret; + conv += t->ret; + if (*loaded && t->ret) { + PARA_INFO_LOG("moving %zd bytes in input buffer for %s filter\n", *loaded, fn->filter->name); - memmove(ib, ib + ret, *loaded); + memmove(ib, ib + t->ret, *loaded); } } ib = fn->buf; loaded = &fn->loaded; } -// PARA_DEBUG_LOG("loaded: %d\n", *loaded); conv_total += conv; + PARA_INFO_LOG("eof (in/out/fc): %d/%d/%d out_loaded: %d, conv: %d, conv_total: %d\n", *fc->input_eof, + *fc->output_eof, fc->eof, *fc->out_loaded, conv, conv_total); if (conv) goto again; - return conv_total; + t->ret = 1; + if (!*fc->input_eof) + return; + if (*fc->out_loaded) + return; + if (*fc->in_loaded && conv_total) + return; + t->ret = -E_FC_EOF; +err_out: + fc->eof = 1; } /** * close all filter nodes and its callbacks * - * \param fci the filter chain to close + * \param fc the filter chain to close * - * For each filter node determined by \a fci, call the close function of each + * For each filter node determined by \a fc, call the close function of each * registered filter callback as well as the close function of the * corresponding filter. Free all resources and destroy all callback lists and * the filter node list. * * \sa filter::close, filter_callback::close */ -void close_filters(struct filter_chain_info *fci) +void close_filters(struct filter_chain *fc) { struct filter_node *fn, *tmp; - if (!fci) + if (!fc) return; - PARA_DEBUG_LOG("closing filter chain %p\n", fci); - list_for_each_entry_safe(fn, tmp, &fci->filters, node) { - PARA_NOTICE_LOG("closing %s filter callbacks (fci %p, fn %p)\n", fn->filter->name, fci, fn); + PARA_NOTICE_LOG("closing filter chain %p\n", fc); + list_for_each_entry_safe(fn, tmp, &fc->filters, node) { close_callbacks(fn); - PARA_NOTICE_LOG("closing %s filter (fci %p, fn %p)\n", fn->filter->name, fci, fn); + PARA_INFO_LOG("closing %s filter\n", fn->filter->name); fn->filter->close(fn); list_del(&fn->node); free(fn); diff --git a/grab_client.c b/grab_client.c index 4f90a27a..f505fa87 100644 --- a/grab_client.c +++ b/grab_client.c @@ -27,6 +27,7 @@ #include "close_on_fork.h" #include "grab_client.cmdline.h" #include "list.h" +#include "sched.h" #include "filter.h" #include "grab_client.h" #include "audiod.h" diff --git a/http_recv.c b/http_recv.c index 5f1d7799..b566acf3 100644 --- a/http_recv.c +++ b/http_recv.c @@ -21,11 +21,14 @@ #include "para.h" #include "http.h" +#include "list.h" +#include "sched.h" #include "recv.h" #include "http_recv.cmdline.h" #include "error.h" #include "net.h" #include "string.h" +#include "fd.h" /** the output buffer size of the http receiver */ #define BUFSIZE (32 * 1024) @@ -89,61 +92,62 @@ static char *make_request_msg(void) return ret; } -static int http_pre_select(struct receiver_node *rn, fd_set *rfds, fd_set *wfds, - __a_unused struct timeval *timeout) +static void http_recv_pre_select(struct sched *s, struct task *t) { + struct receiver_node *rn = t->private_data; struct private_http_recv_data *phd = rn->private_data; + t->ret = 1; if (phd->status == HTTP_CONNECTED) - FD_SET(phd->fd, wfds); + para_fd_set(phd->fd, &s->wfds, &s->max_fileno); else - FD_SET(phd->fd, rfds); - return phd->fd; + para_fd_set(phd->fd, &s->rfds, &s->max_fileno); } -static int http_post_select(struct receiver_node *rn, int select_ret, - fd_set *rfds, fd_set *wfds) + +static void http_recv_post_select(struct sched *s, struct task *t) { - int ret; + struct receiver_node *rn = t->private_data; struct private_http_recv_data *phd = rn->private_data; - if (!select_ret) /* we're not interested in timeouts */ - return 1; + t->ret = 1; + if (!s->select_ret) /* we're not interested in timeouts */ + return; if (phd->status == HTTP_CONNECTED) { char *rq; - if (!FD_ISSET(phd->fd, wfds)) - return 1; /* nothing to do */ + if (!FD_ISSET(phd->fd, &s->wfds)) + return; /* nothing to do */ rq = make_request_msg(); PARA_NOTICE_LOG("%s", "sending http request\n"); - ret = send_va_buffer(phd->fd, "%s", rq); + t->ret = send_va_buffer(phd->fd, "%s", rq); free(rq); - if (ret < 0) - return E_SEND_HTTP_REQUEST; - phd->status = HTTP_SENT_GET_REQUEST; - return 1; + if (t->ret > 0) + phd->status = HTTP_SENT_GET_REQUEST; + return; } - if (!FD_ISSET(phd->fd, rfds)) - return 1; /* nothing to do */ + if (!FD_ISSET(phd->fd, &s->rfds)) + return; /* nothing to do */ if (phd->status == HTTP_SENT_GET_REQUEST) { - ret = recv_pattern(phd->fd, HTTP_OK_MSG, MAXLINE); - if (ret < 0) - return -E_MISSING_OK; + t->ret = recv_pattern(phd->fd, HTTP_OK_MSG, MAXLINE); + if (t->ret < 0) + return; PARA_NOTICE_LOG("%s", "received ok msg, streaming\n"); phd->status = HTTP_STREAMING; - return 1; + return; } + t->ret = -E_OVERRUN; /* already streaming */ - if (rn->loaded >= BUFSIZE) { - PARA_ERROR_LOG("%s", "buffer overrun\n"); - return -E_OVERRUN; + if (rn->loaded >= BUFSIZE) + return; + t->ret = recv_bin_buffer(phd->fd, rn->buf + rn->loaded, + BUFSIZE - rn->loaded); + if (t->ret <= 0) { + rn->eof = 1; + if (!t->ret) + t->ret = -E_HTTP_RECV_EOF; + return; } - ret = recv_bin_buffer(phd->fd, rn->buf + rn->loaded, BUFSIZE - rn->loaded); - if (ret <= 0) { - PARA_NOTICE_LOG("recv returned %d/%zd\n", ret, BUFSIZE - rn->loaded); - return ret < 0? -E_HTTP_RECV_BUF : 0; - } - rn->loaded += ret; - return 1; + rn->loaded += t->ret; } static void http_recv_close(struct receiver_node *rn) @@ -175,6 +179,7 @@ static int http_recv_open(struct receiver_node *rn) rn->buf = para_calloc(BUFSIZE); rn->private_data = para_calloc(sizeof(struct private_http_recv_data)); phd = rn->private_data; + PARA_NOTICE_LOG("phd = %p, rn = %p\n", phd, rn); ret = -E_HOST_INFO; if (!(he = get_host_info(conf->host_arg))) goto err_out; @@ -209,8 +214,8 @@ void http_recv_init(struct receiver *r) { r->open = http_recv_open; r->close = http_recv_close; - r->pre_select = http_pre_select; - r->post_select = http_post_select; + r->pre_select = http_recv_pre_select; + r->post_select = http_recv_post_select; r->shutdown = http_shutdown; r->parse_config = http_recv_parse_config; } diff --git a/list.h b/list.h index 0080c80c..c85f50c7 100644 --- a/list.h +++ b/list.h @@ -169,4 +169,18 @@ static inline int list_empty(const struct list_head *head) n = list_entry(pos->member.next, typeof(*pos), member); \ &pos->member != (head); \ pos = n, n = list_entry(n->member.next, typeof(*n), member)) +/** + * list_for_each_entry_safe_reverse - iterate backwards over list of given type safe against + * removal of list entry + * @pos: the type * to use as a loop counter. + * @n: another type * to use as temporary storage + * @head: the head for your list. + * @member: the name of the list_struct within the struct. + */ +#define list_for_each_entry_safe_reverse(pos, n, head, member) \ + for (pos = list_entry((head)->prev, typeof(*pos), member), \ + n = list_entry(pos->member.prev, typeof(*pos), member); \ + &pos->member != (head); \ + pos = n, n = list_entry(n->member.prev, typeof(*n), member)) + #endif /* _LIST_H */ diff --git a/mp3dec.c b/mp3dec.c index af9de548..f809daec 100644 --- a/mp3dec.c +++ b/mp3dec.c @@ -19,8 +19,8 @@ /** \file mp3dec.c paraslash's mp3 decoder */ #include "para.h" - #include "list.h" +#include "sched.h" #include "filter.h" #include "error.h" #include @@ -67,8 +67,8 @@ next_frame: goto out; return FRAME_HEADER_SIZE; } - fn->fci->samplerate = pmd->frame.header.samplerate; - fn->fci->channels = MAD_NCHANNELS(&pmd->frame.header); + fn->fc->samplerate = pmd->frame.header.samplerate; + fn->fc->channels = MAD_NCHANNELS(&pmd->frame.header); ret = mad_frame_decode(&pmd->frame, &pmd->stream); if (ret) { if (MAD_RECOVERABLE(pmd->stream.error) || pmd->stream.error == MAD_ERROR_BUFLEN) @@ -98,8 +98,8 @@ next_frame: out: if (pmd->stream.next_frame) { /* we still have some data */ size_t off = pmd->stream.bufend - pmd->stream.next_frame; - PARA_DEBUG_LOG("converted %zd, rate: %u, returning %zd\n", len - off, - fn->fci->samplerate, copy - off); +// PARA_DEBUG_LOG("converted %zd, rate: %u, returning %zd\n", len - off, +// fn->fc->samplerate, copy - off); return copy - off; } return copy; diff --git a/oggdec.c b/oggdec.c index 3554eb1b..e1c4517e 100644 --- a/oggdec.c +++ b/oggdec.c @@ -22,6 +22,7 @@ #include "oggdec_filter.cmdline.h" #include "list.h" +#include "sched.h" #include "filter.h" #include "error.h" #include "string.h" @@ -53,11 +54,11 @@ static size_t cb_read(void *buf, size_t size, size_t nmemb, void *datasource) size_t ret, have = pod->inbuf_len - pod->converted; char *p = pod->inbuf + pod->converted; - if (*fn->fci->eof) - return 0; // PARA_DEBUG_LOG("pod = %p\n", pod); // PARA_DEBUG_LOG("vorbis requests %d bytes, have %d\n", size * nmemb, have); if (pod->inbuf_len < size) { + if (*fn->fc->input_eof) + return 0; errno = EAGAIN; return -1; } @@ -133,7 +134,7 @@ static ssize_t ogg_convert(char *inbuffer, size_t len, struct filter_node *fn) if (!pod->vf) { int ib = 1024 * conf->initial_buffer_arg; /* initial buffer */ - if (len fci->eof && !fn->fci->error) { + if (len fc->input_eof) { PARA_INFO_LOG("initial input buffer %zd/%d, waiting for more data\n", len, ib); return 0; @@ -154,9 +155,9 @@ static ssize_t ogg_convert(char *inbuffer, size_t len, struct filter_node *fn) return -E_OGGDEC_BADHEADER; if (ret < 0) return -E_OGGDEC_FAULT; - fn->fci->channels = ov_info(pod->vf, 0)->channels; - fn->fci->samplerate = ov_info(pod->vf, 0)->rate; - PARA_NOTICE_LOG("%d channels, %d Hz\n", fn->fci->channels, fn->fci->samplerate); + fn->fc->channels = ov_info(pod->vf, 0)->channels; + fn->fc->samplerate = ov_info(pod->vf, 0)->rate; + PARA_NOTICE_LOG("%d channels, %d Hz\n", fn->fc->channels, fn->fc->samplerate); } again: ret = ov_read(pod->vf, fn->buf + fn->loaded, fn->bufsize - fn->loaded, @@ -167,7 +168,7 @@ again: if (ret < 0) return -E_OGGDEC_BADLINK; fn->loaded += ret; - if (!*fn->fci->eof && !fn->fci->error && fn->loaded < fn->bufsize) + if (!*fn->fc->input_eof && fn->loaded < fn->bufsize) goto again; return pod->converted; } diff --git a/ortp_recv.c b/ortp_recv.c index 905d580e..9c901364 100644 --- a/ortp_recv.c +++ b/ortp_recv.c @@ -21,6 +21,8 @@ #include "para.h" #include "ortp.h" +#include "list.h" +#include "sched.h" #include "recv.h" #include "ortp_recv.cmdline.h" @@ -38,7 +40,6 @@ extern int msg_to_buf(mblk_t *, char *, int); * \sa receiver receiver_node */ struct private_ortp_recv_data { - /** * * @@ -66,21 +67,19 @@ uint32_t timestamp; uint32_t chunk_ts; }; -static int ortp_recv_pre_select(struct receiver_node *rn, - __a_unused fd_set *rfds, __a_unused fd_set *wfds, - struct timeval *timeout) +static void ortp_recv_pre_select(struct sched *s, struct task *t) { + struct receiver_node *rn = t->private_data; struct private_ortp_recv_data *pord = rn->private_data; - struct timeval now, tmp; + struct timeval tmp; - gettimeofday(&now, NULL); - if (tv_diff(&now, &pord->next_chunk, &tmp) >= 0) { + if (tv_diff(&s->now, &pord->next_chunk, &tmp) >= 0) { tmp.tv_sec = 0; tmp.tv_usec = 1000; } - if (tv_diff(timeout, &tmp, NULL) > 0) - *timeout = tmp; - return -1; /* we did not modify the fd sets */ + if (tv_diff(&s->timeout, &tmp, NULL) > 0) + s->timeout = tmp; + t->ret = 1; } static void compute_next_chunk(unsigned chunk_time, @@ -97,47 +96,43 @@ static void compute_next_chunk(unsigned chunk_time, pord->next_chunk.tv_usec); } -static int ortp_recv_post_select(struct receiver_node *rn, - __a_unused int select_ret, __a_unused fd_set *rfds, - __a_unused fd_set *wfds) +static void ortp_recv_post_select(struct sched *s, struct task *t) { + struct receiver_node *rn = t->private_data; struct private_ortp_recv_data *pord = rn->private_data; mblk_t *mp; - int ret, packet_type, stream_type; + int packet_type, stream_type; char tmpbuf[CHUNK_SIZE + 3]; - struct timeval now; unsigned chunk_time; - gettimeofday(&now, NULL); -// PARA_DEBUG_LOG("rn: %p, pord: %p, session: %p\n", rn, pord, pord->session); - if (pord->start.tv_sec) { - struct timeval diff; - if (tv_diff(&now, &pord->next_chunk, &diff) < 0) - return 1; - } +// PARA_INFO_LOG("rn: %p, pord: %p, session: %p\n", rn, pord, pord->session); + t->ret = 1; + if (pord->start.tv_sec) + if (tv_diff(&s->now, &pord->next_chunk, NULL) < 0) + return; mp = rtp_session_recvm_with_ts(pord->session, pord->timestamp); if (!mp) { struct timeval min_delay = {0, 100}; // PARA_INFO_LOG("nope, chunk_ts = %d, loaded: %d, bad: %d\n", // pord->timestamp, rn->loaded, pord->c_bad); pord->c_bad++; + t->ret = -E_TOO_MANY_BAD_CHUNKS; if ((pord->c_bad > 5000 && pord->start.tv_sec) || pord->c_bad > 10000) - return -E_TOO_MANY_BAD_CHUNKS; - tv_add(&now, &min_delay, &pord->next_chunk); - return 1; + return; + t->ret = 1; + tv_add(&s->now, &min_delay, &pord->next_chunk); + return; } /* okay, we have a chunk of data */ if (!pord->start.tv_sec) - pord->start = now; - ret = msg_to_buf(mp, tmpbuf, CHUNK_SIZE); -// PARA_DEBUG_LOG("have it ts = %d, chunk_ts = %d, loaded: %d, " -// "bad: %d, len: %d\n", pord->timestamp, pord->chunk_ts, -// rn->loaded, pord->c_bad, ret); - if (ret < ORTP_AUDIO_HEADER_LEN) { - if (ret < 0) - ret = -E_MSG_TO_BUF; + pord->start = s->now; + t->ret = msg_to_buf(mp, tmpbuf, CHUNK_SIZE); + if (t->ret < ORTP_AUDIO_HEADER_LEN) { + rn->eof = 1; + if (t->ret < 0) + t->ret = -E_MSG_TO_BUF; else - ret = 0; + t->ret = -E_ORTP_RECV_EOF; goto err_out; } packet_type = READ_PACKET_TYPE(tmpbuf); @@ -150,65 +145,67 @@ static int ortp_recv_post_select(struct receiver_node *rn, switch (packet_type) { unsigned header_len, payload_len; case ORTP_EOF: - ret = 0; + rn->eof = 1; + t->ret = -E_ORTP_RECV_EOF; goto err_out; case ORTP_BOF: - PARA_INFO_LOG("bof (%d)\n", ret); + PARA_INFO_LOG("bof (%d)\n", t->ret); pord->have_header = 1; /* fall through */ case ORTP_DATA: if (!pord->have_header && stream_type) /* can't use the data, wait for header */ goto success; - if (ret + rn->loaded >= CHUNK_SIZE + ORTP_AUDIO_HEADER_LEN) { - ret = -E_OVERRUN; + if (t->ret + rn->loaded >= CHUNK_SIZE + ORTP_AUDIO_HEADER_LEN) { + t->ret = -E_OVERRUN; goto err_out; } - if (ret > ORTP_AUDIO_HEADER_LEN) { + if (t->ret > ORTP_AUDIO_HEADER_LEN) { memcpy(rn->buf + rn->loaded, tmpbuf + ORTP_AUDIO_HEADER_LEN, - ret - ORTP_AUDIO_HEADER_LEN); - rn->loaded += ret - ORTP_AUDIO_HEADER_LEN; + t->ret - ORTP_AUDIO_HEADER_LEN); + rn->loaded += t->ret - ORTP_AUDIO_HEADER_LEN; } goto success; case ORTP_HEADER: header_len = READ_HEADER_LEN(tmpbuf); PARA_DEBUG_LOG("header packet (%d bytes), header len: %d\n", - ret, header_len); + t->ret, header_len); if (!pord->have_header) { pord->have_header = 1; memcpy(rn->buf, tmpbuf + ORTP_AUDIO_HEADER_LEN, - ret - ORTP_AUDIO_HEADER_LEN); - rn->loaded = ret - ORTP_AUDIO_HEADER_LEN; + t->ret - ORTP_AUDIO_HEADER_LEN); + rn->loaded = t->ret - ORTP_AUDIO_HEADER_LEN; goto success; } - if (header_len + ORTP_AUDIO_HEADER_LEN > ret) { - ret = -E_INVALID_HEADER; + if (header_len + ORTP_AUDIO_HEADER_LEN > t->ret) { + t->ret = -E_INVALID_HEADER; goto err_out; } - payload_len = ret - ORTP_AUDIO_HEADER_LEN - header_len; + payload_len = t->ret - ORTP_AUDIO_HEADER_LEN - header_len; // PARA_INFO_LOG("len: %d header_len: %d, payload_len: %d, loaded: %d\n", ret, // header_len, payload_len, rn->loaded); if (rn->loaded + payload_len > CHUNK_SIZE) { - ret = -E_OVERRUN; + t->ret = -E_OVERRUN; goto err_out; } if (payload_len) memcpy(rn->buf + rn->loaded, tmpbuf - + (ret - payload_len), payload_len); + + (t->ret - payload_len), payload_len); rn->loaded += payload_len; goto success; } success: + t->ret = 1; freemsg(mp); if (pord->c_bad) { pord->c_bad = 0; - pord->next_chunk = now; + pord->next_chunk = s->now; } compute_next_chunk(chunk_time, pord); - return 1; + return; err_out: + rn->eof = 1; freemsg(mp); - return ret; } static void ortp_shutdown(void) diff --git a/recv.c b/recv.c index af15eb07..cf178d20 100644 --- a/recv.c +++ b/recv.c @@ -17,10 +17,13 @@ */ #include "para.h" +#include "list.h" +#include "sched.h" #include "recv.h" #include "recv.cmdline.h" #include "fd.h" #include "error.h" +#include "stdout.h" struct gengetopt_args_info conf; @@ -55,13 +58,23 @@ static void *parse_config(int argc, char *argv[], int *receiver_num) return check_receiver_arg(conf.receiver_arg, receiver_num); } +void rn_event_handler(struct task *t) +{ + PARA_NOTICE_LOG("%s\n", PARA_STRERROR(-t->ret)); + unregister_task(t); +} + int main(int argc, char *argv[]) { - int ret, eof = 0, max, r_opened = 0, receiver_num; - struct timeval timeout; + int ret, r_opened = 0, receiver_num; struct receiver *r = NULL; - fd_set rfds, wfds; struct receiver_node rn; + struct stdout_task sot; + struct sched s; + + init_sched(); + s.default_timeout.tv_sec = 1; + s.default_timeout.tv_usec = 0; memset(&rn, 0, sizeof(struct receiver_node)); for (ret = 0; receivers[ret].name; ret++) @@ -78,44 +91,23 @@ int main(int argc, char *argv[]) if (ret < 0) goto out; r_opened = 1; -recv: - FD_ZERO(&rfds); - FD_ZERO(&wfds); - timeout.tv_sec = 0; - timeout.tv_usec = 999 * 1000; - max = -1; - ret = r->pre_select(&rn, &rfds, &wfds, &timeout); - max = PARA_MAX(max, ret); - PARA_DEBUG_LOG("timeout: %lums, max: %d\n", tv2ms(&timeout), max); - ret = para_select(max + 1, &rfds, &wfds, &timeout); - if (ret < 0) { - ret = -E_RECV_SELECT; - goto out; - } - ret = r->post_select(&rn, ret, &rfds, &wfds); - if (ret < 0) - goto out; - if (!ret) - eof = 1; - if (!rn.loaded) { - if (eof) - goto out; - goto recv; - } - ret = write(STDOUT_FILENO, rn.buf, rn.loaded); - PARA_DEBUG_LOG("wrote %d/%zd\n", ret, rn.loaded); - if (ret < 0) { - ret = -E_WRITE_STDOUT; - goto out; - } - if (ret != rn.loaded) { - PARA_INFO_LOG("short write %d/%zd\n", ret, rn.loaded); - memmove(rn.buf, rn.buf + ret, rn.loaded - ret); - } - rn.loaded -= ret; - if (rn.loaded || !eof) - goto recv; + stdout_set_defaults(&sot); + sot.buf = rn.buf; + sot.loaded = &rn.loaded; + sot.input_eof = &rn.eof; + register_task(&sot.task); + + rn.task.private_data = &rn; + rn.task.pre_select = r->pre_select; + rn.task.post_select = r->post_select; + rn.task.event_handler = rn_event_handler; + rn.task.flags = 0; + sprintf(rn.task.status, "receiver node"); + register_task(&rn.task); + + + ret = sched(&s); out: if (r_opened) r->close(&rn); diff --git a/recv.h b/recv.h index f7fc7259..3c8b543b 100644 --- a/recv.h +++ b/recv.h @@ -34,6 +34,8 @@ struct receiver_node { int eof; /** pointer to the configuration data for this instance */ void *conf; + /** the task associated with this instance */ + struct task task; }; /** @@ -111,38 +113,26 @@ struct receiver { * * The pre_select function gets called from the driving application before * entering its select loop. The receiver may use this hook to add any file - * descriptors to \a rfds and \a wfds in order to check the result later in the - * post_select hook. + * descriptors to the sets of file descriptors given by \a s. * - * \a timeout is a value-result parameter, initially containing the timeout for - * select() which was set by the application or by another receiver node. If - * the receiver wants its pre_select function to be called at some earlier time - * than the time determined by \a timeout, it may set \a timeout to an - * appropriate smaller value. However, it must never increase this timeout. * - * This function must return the highest-numbered descriptor it wants to being - * checked, or -1 if no file descriptors should be checked for this run. - * - * \sa select(2), receiver_node:private_data, time.c + * \sa select(2), time.c struct task, struct sched */ - int (*pre_select)(struct receiver_node *rn, fd_set *rfds, - fd_set *wfds, struct timeval *timeout); + void (*pre_select)(struct sched *s, struct task *t); /** * * * evaluate the result from select() * - * If the call to select() was succesful, this hook gets called. It should - * check all file descriptors which were added to any of the the fd sets during - * the previous call to pre_select. According to the result, it may then use - * any non-blocking I/O to establish a connection or to receive the audio data. + * This hook gets called after the call to select(). It should check all file + * descriptors which were added to any of the the fd sets during the previous + * call to pre_select. According to the result, it may then use any + * non-blocking I/O to establish a connection or to receive the audio data. * - * A negative return value is interpreted as an error. * * \sa select(2), struct receiver */ - int (*post_select)(struct receiver_node *rn, int select_ret, - fd_set *rfds, fd_set *wfds); + void (*post_select)(struct sched *s, struct task *t); }; diff --git a/recv_common.c b/recv_common.c index 078e44d1..e39719da 100644 --- a/recv_common.c +++ b/recv_common.c @@ -20,6 +20,8 @@ #include "para.h" +#include "list.h" +#include "sched.h" #include "recv.h" #include "string.h" diff --git a/sched.c b/sched.c new file mode 100644 index 00000000..9db7c092 --- /dev/null +++ b/sched.c @@ -0,0 +1,106 @@ +#include +#include "para.h" +#include "ipc.h" +#include "fd.h" +#include "list.h" +#include "sched.h" +#include "string.h" +#include "error.h" + +struct list_head pre_select_list; +struct list_head post_select_list; + +static void sched_preselect(struct sched *s) +{ + struct task *t, *tmp; +again: + list_for_each_entry_safe(t, tmp, &pre_select_list, pre_select_node) { + t->pre_select(s, t); + if (t->ret > 0 || !t->event_handler) + continue; + t->event_handler(t); + goto again; + } +} + +static void sched_post_select(struct sched *s) +{ + struct task *t, *tmp; + + list_for_each_entry_safe(t, tmp, &post_select_list, post_select_node) { + t->post_select(s, t); + if (t->ret > 0 || !t->event_handler) + continue; + t->event_handler(t); + } +} + +int sched(struct sched *s) +{ + + gettimeofday(&s->now, NULL); +again: + FD_ZERO(&s->rfds); + FD_ZERO(&s->wfds); + s->timeout = s->default_timeout; + s->max_fileno = -1; + sched_preselect(s); + s->select_ret = para_select(s->max_fileno + 1, &s->rfds, + &s->wfds, &s->timeout); + if (s->select_ret < 0) + return s->select_ret; + gettimeofday(&s->now, NULL); + sched_post_select(s); + if (list_empty(&pre_select_list) && list_empty(&post_select_list)) + return 0; + goto again; +} + +void *register_task(struct task *t) +{ + PARA_INFO_LOG("registering %s (%p)\n", t->status, t); + if (t->pre_select) { + PARA_DEBUG_LOG("pre_select: %p\n", &t->pre_select); + if (t->flags & PRE_ADD_TAIL) + list_add_tail(&t->pre_select_node, &pre_select_list); + else + list_add(&t->pre_select_node, &pre_select_list); + } + if (t->post_select) { + PARA_DEBUG_LOG("post_select: %p\n", &t->pre_select); + if (t->flags & POST_ADD_TAIL) + list_add_tail(&t->post_select_node, &post_select_list); + else + list_add(&t->post_select_node, &post_select_list); + } + return t; +} + +void unregister_task(struct task *t) +{ + PARA_INFO_LOG("unregistering %s (%p)\n", t->status, t); + if (t->pre_select) + list_del(&t->pre_select_node); + if (t->post_select) + list_del(&t->post_select_node); +}; + +void init_sched(void) +{ + INIT_LIST_HEAD(&pre_select_list); + INIT_LIST_HEAD(&post_select_list); +}; + +void sched_shutdown(void) +{ + struct task *t, *tmp; + + list_for_each_entry_safe(t, tmp, &pre_select_list, pre_select_node) + unregister_task(t); + /* remove tasks which do not have a pre_select hook */ + list_for_each_entry_safe(t, tmp, &post_select_list, post_select_node) + unregister_task(t); +}; + + +//char *get_tast_list(); diff --git a/sched.h b/sched.h new file mode 100644 index 00000000..9855475d --- /dev/null +++ b/sched.h @@ -0,0 +1,29 @@ +struct sched { + struct timeval now, timeout; + int max_fileno; + fd_set rfds, wfds; + int select_ret; + struct timeval default_timeout; +}; + +struct task { + void *private_data; + unsigned flags; + int ret; + void (*pre_select)(struct sched *s, struct task *t); + void (*post_select)(struct sched *s, struct task *t); + void (*event_handler)(struct task *t); + struct list_head pre_select_node; + struct list_head post_select_node; + char status[MAXLINE]; +}; + +enum task_flags { + PRE_ADD_TAIL = 1, + POST_ADD_TAIL = 2, +}; + +void *register_task(struct task *t); +void unregister_task(struct task *t); +int sched(struct sched *s); +void init_sched(void); diff --git a/server.c b/server.c index 1805c92b..2391e07c 100644 --- a/server.c +++ b/server.c @@ -33,7 +33,6 @@ #include "db.h" #include "server.h" #include "afs.h" -#include "afh.h" /* FIXME */ #include "config.h" #include "close_on_fork.h" #include "send.h" diff --git a/stdin.c b/stdin.c new file mode 100644 index 00000000..22f1dd97 --- /dev/null +++ b/stdin.c @@ -0,0 +1,56 @@ +#include "para.h" +#include "string.h" +#include "list.h" +#include "sched.h" +#include "fd.h" +#include "error.h" +#include "stdin.h" + +void stdin_pre_select(struct sched *s, struct task *t) +{ + struct stdin_task *sit = t->private_data; + if (sit->loaded < sit->bufsize) + para_fd_set(STDIN_FILENO, &s->rfds, &s->max_fileno); + t->ret = 1; /* success */ +} + +static void stdin_default_event_handler(struct task *t) +{ + PARA_NOTICE_LOG("%p: %s\n", t, PARA_STRERROR(-t->ret)); + unregister_task(t); +} + +void stdin_post_select(struct sched *s, struct task *t) +{ + struct stdin_task *sit = t->private_data; + ssize_t ret; + + t->ret = 1; + if (sit->loaded >= sit->bufsize) + return; + if (!FD_ISSET(STDIN_FILENO, &s->rfds)) + return; + ret = read(STDIN_FILENO, sit->buf + sit->loaded, sit->bufsize - sit->loaded); + if (ret < 0) + t->ret = -E_STDIN_READ; + else if (ret > 0) { + sit->loaded += ret; + t->ret = ret; + } else + t->ret = -E_STDIN_EOF; + if (t->ret < 0) + sit->eof = 1; +} + +void stdin_set_defaults(struct stdin_task *sit) +{ + sit->bufsize = 16 * 1024, + sit->loaded = 0, + sit->eof = 0, + sit->task.flags = 0, + sit->task.pre_select = stdin_pre_select; + sit->task.post_select = stdin_post_select; + sit->task.event_handler = stdin_default_event_handler; + sit->task.private_data = sit; + sprintf(sit->task.status, "stdin reader"); +} diff --git a/stdin.h b/stdin.h new file mode 100644 index 00000000..cb6cbfb6 --- /dev/null +++ b/stdin.h @@ -0,0 +1,11 @@ +struct stdin_task { + char *buf; + size_t bufsize; + size_t loaded; + struct task task; + int eof; +}; + +void stdin_pre_select(struct sched *s, struct task *t); +void stdin_post_select(struct sched *s, struct task *t); +void stdin_set_defaults(struct stdin_task *sit); diff --git a/stdout.c b/stdout.c new file mode 100644 index 00000000..1c60669a --- /dev/null +++ b/stdout.c @@ -0,0 +1,58 @@ +#include "para.h" +#include "string.h" +#include "list.h" +#include "sched.h" +#include "fd.h" +#include "error.h" +#include "stdout.h" + +void stdout_pre_select(struct sched *s, struct task *t) +{ + struct stdout_task *sot = t->private_data; + + t->ret = 1; + sot->check_fd = 0; + if (!*sot->loaded) + return; + sot->check_fd = 1; + para_fd_set(STDOUT_FILENO, &s->wfds, &s->max_fileno); +} + +void stdout_post_select(struct sched *s, struct task *t) +{ + struct stdout_task *sot = t->private_data; + ssize_t ret; + + t->ret = 1; + if (!sot->check_fd) { + if (*sot->input_eof) + t->ret = -E_STDOUT_EOF; + return; + } + if (!FD_ISSET(STDOUT_FILENO, &s->wfds)) + return; + t->ret = -E_STDOUT_WRITE; + ret = write(STDOUT_FILENO, sot->buf, *sot->loaded); + if (ret <= 0) + return; + *sot->loaded -= ret; + t->ret = 1; +} + +void stdout_default_event_handler(struct task *t) +{ + PARA_NOTICE_LOG("%p: %s\n", t, PARA_STRERROR(-t->ret)); + unregister_task(t); +} + + +void stdout_set_defaults(struct stdout_task *sot) +{ + sot->task.private_data = sot; + sot->task.pre_select = stdout_pre_select; + sot->task.post_select = stdout_post_select; + sot->task.event_handler = stdout_default_event_handler; + sot->task.flags = 0; + sot->eof = 0; + sprintf(sot->task.status, "stdout writer"); +} diff --git a/stdout.h b/stdout.h new file mode 100644 index 00000000..4dafafde --- /dev/null +++ b/stdout.h @@ -0,0 +1,14 @@ +/** \file stdout.h common code for uitlities that write to stdout */ +struct stdout_task { + char *buf; + size_t *bufsize; + size_t *loaded; + int *input_eof; + int eof; + struct task task; + int check_fd; +}; + +void stdout_pre_select(struct sched *s, struct task *t); +void stdout_post_select(struct sched *s, struct task *t); +void stdout_set_defaults(struct stdout_task *sot); diff --git a/wav.c b/wav.c index e229317d..5ab3bb39 100644 --- a/wav.c +++ b/wav.c @@ -21,6 +21,7 @@ #include "para.h" #include "list.h" +#include "sched.h" #include "filter.h" #include "string.h" @@ -66,7 +67,7 @@ static ssize_t wav_convert(char *inbuf, size_t len, struct filter_node *fn) int *bof = fn->private_data; if (*bof) { - make_wav_header(fn->fci->channels, fn->fci->samplerate, fn); + make_wav_header(fn->fc->channels, fn->fc->samplerate, fn); fn->loaded = WAV_HEADER_LEN; *bof = 0; // return 0; @@ -95,7 +96,7 @@ static void wav_open(struct filter_node *fn) fn->private_data = para_malloc(sizeof(int)); bof = fn->private_data; *bof = 1; - PARA_DEBUG_LOG("wav filter node: %p, output buffer: %p, loaded: %zd\n", + PARA_INFO_LOG("wav filter node: %p, output buffer: %p, loaded: %zd\n", fn, fn->buf, fn->loaded); } diff --git a/write.c b/write.c index 6e4ee063..e4206203 100644 --- a/write.c +++ b/write.c @@ -19,159 +19,97 @@ #include "para.h" #include "string.h" #include "write.cmdline.h" +#include "list.h" +#include "sched.h" +#include "stdin.h" #include "write.h" #include "write_common.h" #include "fd.h" - -#include /* gettimeofday */ - #include "error.h" -#define WAV_HEADER_LEN 44 - -static char *audiobuf; -static struct timeval *start_time; -struct gengetopt_args_info conf; - INIT_WRITE_ERRLISTS; -void para_log(int ll, const char* fmt,...) -{ - va_list argp; +struct check_wav_task { + char *buf; + size_t *loaded; + int *eof; + unsigned channels; + unsigned sample_rate; + struct task task; +}; + +struct initial_delay_task { + struct timeval start_time; + struct task task; +}; + +static struct gengetopt_args_info conf; +struct stdin_task sit; +struct check_wav_task cwt; +struct initial_delay_task idt; +static struct writer_node_group *wng; - if (ll < conf.loglevel_arg) - return; - va_start(argp, fmt); - vfprintf(stderr, fmt, argp); - va_end(argp); -} +#define WAV_HEADER_LEN 44 /** - * read WAV_HEADER_LEN bytes from stdin to audio buffer + * test if audio buffer contains a valid wave header * - * \return -E_READ_HDR on errors and on eof before WAV_HEADER_LEN could be - * read. A positive return value indicates success. + * \return If not, return -E_NO_WAV_HEADER, otherwise, return zero. If + * there is less than WAV_HEADER_LEN bytes awailable, return one. */ -static int read_wav_header(void) +static void check_wav_pre_select(__a_unused struct sched *s, struct task *t) { - ssize_t ret, count = 0; + struct check_wav_task *cwt = t->private_data; + unsigned char *a; - while (count < WAV_HEADER_LEN) { - ret = read(STDIN_FILENO, audiobuf + count, WAV_HEADER_LEN - count); - if (ret <= 0) - return -E_READ_HDR; - count += ret; + if (*cwt->loaded < WAV_HEADER_LEN) { + t->ret = *cwt->eof? -E_PREMATURE_END : 1; + return; } - return 1; + cwt->channels = 2; + cwt->sample_rate = 44100; + a = (unsigned char*)cwt->buf; + t->ret = -E_NO_WAV_HEADER; + if (a[0] != 'R' || a[1] != 'I' || a[2] != 'F' || a[3] != 'F') + return; + cwt->channels = (unsigned) a[22]; + cwt->sample_rate = a[24] + (a[25] << 8) + (a[26] << 16) + (a[27] << 24); + *cwt->loaded -= WAV_HEADER_LEN; + memmove(cwt->buf, cwt->buf + WAV_HEADER_LEN, *cwt->loaded); + t->ret = -E_WAV_HEADER_SUCCESS; + PARA_INFO_LOG("channels: %d, sample_rate: %d\n", cwt->channels, cwt->sample_rate); } -/** - * check if current time is later than start_time - * \param diff pointer to write remaining time to - * - * If start_time was not given, or current time is later than given - * start_time, return 0. Otherwise, return 1 and write the time - * difference between current time and start_time to diff. diff may be - * NULL. - * - */ -static int start_time_in_future(struct timeval *diff) +static void initial_delay_pre_select(struct sched *s, struct task *t) { - struct timeval now; + struct initial_delay_task *idt = t->private_data; + struct timeval diff; - if (!conf.start_time_given) - return 0; - gettimeofday(&now, NULL); - return tv_diff(start_time, &now, diff) > 0? 1 : 0; -} - -/** - * sleep until time given at command line - * - * This is called if the initial buffer is filled. It returns - * immediately if no start_time was given at the command line - * or if the given start time is in the past. - * - */ -static void do_initial_delay(struct timeval *delay) -{ - do - para_select(1, NULL, NULL, delay); - while (start_time_in_future(delay)); + t->ret = -E_NO_DELAY; + if (!idt->start_time.tv_sec && !idt->start_time.tv_usec) + return; + t->ret = -E_DELAY_TIMEOUT; + if (tv_diff(&s->now, &idt->start_time, &diff) > 0) + return; + t->ret = 1; + if (tv_diff(&s->timeout , &diff, NULL) > 0) + s->timeout = diff; } -static int read_stdin(char *buf, size_t bytes_to_load, size_t *loaded) -{ - ssize_t ret; - - while (*loaded < bytes_to_load) { - ret = read(STDIN_FILENO, buf + *loaded, bytes_to_load - *loaded); - if (ret <= 0) { - if (ret < 0) - ret = -E_READ_STDIN; - return ret; - } - *loaded += ret; - } - return 1; -} -/** - * play raw pcm data - * \param loaded number of bytes already loaded - * - * If start_time was given, prebuffer data until buffer is full or - * start_time is reached. In any case, do not start playing before - * start_time. - * - * \return positive on success, negative on errors. - */ -static int pcm_write(struct writer_node_group *wng, size_t loaded) +void para_log(int ll, const char* fmt,...) { - size_t bufsize, prebuf_size, bytes_to_load; - struct timeval delay; - int ret, not_yet_started = 1; + va_list argp; - ret = wng_open(wng); - if (ret < 0) - goto out; - PARA_INFO_LOG("max chunk_bytes: %zd\n", wng->max_chunk_bytes); - bufsize = (conf.bufsize_arg * 1024 / wng->max_chunk_bytes) - * wng->max_chunk_bytes; - audiobuf = para_realloc(audiobuf, bufsize); - prebuf_size = conf.prebuffer_arg * bufsize / 100; - bytes_to_load = PARA_MAX(prebuf_size, wng->max_chunk_bytes); - ret = read_stdin(audiobuf, bytes_to_load, &loaded); - if (ret <= 0 || loaded < bytes_to_load) { - if (ret >= 0) - ret = -E_PREMATURE_END; - goto out; - } - if (not_yet_started && start_time && start_time_in_future(&delay)) - do_initial_delay(&delay); - not_yet_started = 0; -again: - ret = wng_write(wng, audiobuf, &loaded); - if (ret <= 0) - goto out; - ret = -E_WRITE_OVERRUN; - if (loaded >= bufsize) - goto out; - bytes_to_load = PARA_MIN(wng->max_chunk_bytes, bufsize); - ret = read_stdin(audiobuf, bytes_to_load, &loaded); - if (ret < 0) - goto out; - if (!ret) - wng->eof = 1; - goto again; -out: - wng_close(wng); - return ret; + if (ll < conf.loglevel_arg) + return; + va_start(argp, fmt); + vfprintf(stderr, fmt, argp); + va_end(argp); } static struct writer_node_group *check_args(void) { int i, ret = -E_WRITE_SYNTAX; - static struct timeval tv; struct writer_node_group *wng = NULL; if (conf.list_writers_given) { @@ -188,16 +126,15 @@ static struct writer_node_group *check_args(void) free(msg); exit(EXIT_SUCCESS); } - if (conf.prebuffer_arg < 0 || conf.prebuffer_arg > 100) - goto out; +// if (conf.prebuffer_arg < 0 || conf.prebuffer_arg > 100) +// goto out; if (conf.start_time_given) { long unsigned sec, usec; if (sscanf(conf.start_time_arg, "%lu:%lu", &sec, &usec) != 2) goto out; - tv.tv_sec = sec; - tv.tv_usec = usec; - start_time = &tv; + idt.start_time.tv_sec = sec; + idt.start_time.tv_usec = usec; } if (!conf.writer_given) { wng = setup_default_wng(); @@ -205,11 +142,16 @@ static struct writer_node_group *check_args(void) goto out; } wng = wng_new(conf.writer_given); + ret = -E_WRITE_SYNTAX; for (i = 0; i < conf.writer_given; i++) { - ret = check_writer_arg(conf.writer_arg[i]); - if (ret < 0) + int writer_num; + wng->writer_nodes[i].conf = check_writer_arg( + conf.writer_arg[i], &writer_num); + if (!wng->writer_nodes[i].conf) goto out; - wng->writer_nodes[i].writer = &writers[ret]; + wng->writer_nodes[i].writer = &writers[writer_num]; + sprintf(wng->writer_nodes[i].task.status, "%s", + writer_names[writer_num]); } ret = 1; out: @@ -219,41 +161,89 @@ out: return NULL; } -/** - * test if audio buffer contains a valid wave header - * - * \return If not, return 0, otherwise, store number of channels and sample rate - * in struct conf and return WAV_HEADER_LEN. - */ -static size_t check_wave(void) +static void wng_event_handler(struct task *t) { - unsigned char *a = (unsigned char*)audiobuf; - if (a[0] != 'R' || a[1] != 'I' || a[2] != 'F' || a[3] != 'F') - return WAV_HEADER_LEN; - conf.channels_arg = (unsigned) a[22]; - conf.sample_rate_arg = a[24] + (a[25] << 8) + (a[26] << 16) + (a[27] << 24); - return 0; + struct writer_node_group *g = t->private_data; + + PARA_INFO_LOG("%s\n", PARA_STRERROR(-t->ret)); + unregister_task(t); + wng_close(g); + wng_destroy(g); +} + + +static void idt_event_handler(struct task *t) +{ + int ret; + + PARA_INFO_LOG("%s\n", PARA_STRERROR(-t->ret)); + unregister_task(t); + wng->buf = sit.buf; + wng->loaded = &sit.loaded; + wng->input_eof = &sit.eof; + wng->task.event_handler = wng_event_handler; + ret = wng_open(wng); + if (ret < 0) { + PARA_ERROR_LOG("%s\n", PARA_STRERROR(-ret)); + exit(EXIT_FAILURE); + } +} + +static void cwt_event_handler(struct task *t) +{ + if (t->ret != -E_NO_WAV_HEADER && t->ret != -E_WAV_HEADER_SUCCESS) { + PARA_ERROR_LOG("%s\n", PARA_STRERROR(-t->ret)); + exit(EXIT_FAILURE); + } + PARA_INFO_LOG("%s\n", PARA_STRERROR(-t->ret)); + unregister_task(t); +// if (t->ret == -E_WAV_HEADER_SUCCESS) { +// conf.channels_arg = cwt.channels; +// conf.sample_rate_arg = cwt.sample_rate; +// } + idt.task.pre_select = initial_delay_pre_select; + idt.task.private_data = &idt; + idt.task.event_handler = idt_event_handler; + sprintf(idt.task.status, "initial_delay"); + register_task(&idt.task); } int main(int argc, char *argv[]) { int ret = -E_WRITE_SYNTAX; - struct writer_node_group *wng = NULL; + struct sched s; cmdline_parser(argc, argv, &conf); + init_supported_writers(); + init_sched(); + wng = check_args(); if (!wng) goto out; - init_supported_writers(); - audiobuf = para_malloc(WAV_HEADER_LEN); - ret = read_wav_header(); - if (ret < 0) - goto out; - ret = pcm_write(wng, check_wave()); + stdin_set_defaults(&sit); + if (conf.bufsize_given) + sit.bufsize = conf.bufsize_arg; + sit.buf = para_malloc(sit.bufsize), + register_task(&sit.task); + + cwt.task.pre_select = check_wav_pre_select; + cwt.task.private_data = &cwt; + cwt.task.event_handler = cwt_event_handler; + cwt.buf = sit.buf; + cwt.loaded = &sit.loaded; + cwt.eof = &sit.eof; + sprintf(cwt.task.status, "check wav"); + register_task(&cwt.task); + + s.default_timeout.tv_sec = 1; + s.default_timeout.tv_usec = 0; + ret = sched(&s); + out: - wng_destroy(wng); - free(audiobuf); - if (ret < 0) + if (ret < 0) { PARA_ERROR_LOG("%s\n", PARA_STRERROR(-ret)); + ret = EXIT_FAILURE; + } else + ret = EXIT_SUCCESS; return ret; } diff --git a/write.ggo b/write.ggo index 4afff0c0..02d1a164 100644 --- a/write.ggo +++ b/write.ggo @@ -24,14 +24,6 @@ option "bufsize" b default="64" optional -option "prebuffer" p -#~~~~~~~~~~~~~~~~~~~ -"delay playback until buffer is filled" - - int typestr="percent" - default="100" - optional - option "writer" w #~~~~~~~~~~~~~~~~ @@ -52,34 +44,3 @@ denotes microseconds since the epoch" string typestr="timeval" optional - - -section "alsa options" -###################### - -option "device" d -#~~~~~~~~~~~~~~~~ -"set PCM device" - string typestr="device" - default="plughw:0,0" - optional - -option "channels" c -#~~~~~~~~~~~~~~~~~~ -"number of channels (only neccessary for raw -audio)" - - int typestr="num" - default="2" - optional - -option "sample_rate" s -#~~~~~~~~~~~~~~~~~~~~~ - -"force given sample rate (only neccessary for -raw audio)" - - int typestr="num" - default="44100" - optional - diff --git a/write.h b/write.h index 5f8ab3ab..f4c54ca3 100644 --- a/write.h +++ b/write.h @@ -25,12 +25,16 @@ enum writer_enum {WRITER_ENUM}; * decbribes one running instance of a writer */ struct writer_node { -/** points to the writer structure associated with this node */ + /** points to the writer structure associated with this node */ struct writer *writer; -/** writer-specific data */ + /** writer-specific data */ void *private_data; -/** send that many bytes in one go */ + /** send that many bytes in one go */ int chunk_bytes; + struct task task; + struct writer_node_group *wng; + /** the writer-specific configuration of this node */ + void *conf; }; /** describes one supported writer */ @@ -43,6 +47,18 @@ struct writer { * */ void (*init)(struct writer *w); +/** + * + * + * the command line parser of the writer + * + * It should check whether the command line options given by \a options are + * valid. On success, it should return a pointer to the writer-specific + * configuration data determined by \a options. Note that this might be called + * more than once with different values of \a options. + * + */ + void * (*parse_config)(char *options); /** * * open one instance of this writer @@ -62,6 +78,8 @@ int (*open)(struct writer_node *); * */ int (*write)(char *data, size_t nbytes, struct writer_node *); +void (*pre_select)(struct sched *s, struct task *t); +void (*post_select)(struct sched *s, struct task *t); /** * close one instance of the writer * @@ -81,16 +99,20 @@ void (*shutdown)(struct writer_node *); * describes a set of writer nodes that all write the same stream. */ struct writer_node_group { -/** number of nodes belonging to this group */ -unsigned num_writers; -/** array of pointers to the corresponding writer nodes */ -struct writer_node *writer_nodes; -/** keeps track of how many bytes have been written by each node */ -int *written; -/** the maximum of the chunk_bytes values of the writer nodes in this group */ -size_t max_chunk_bytes; -/** non-zero if end of file was encountered */ -int eof; + /** number of nodes belonging to this group */ + unsigned num_writers; + /** array of pointers to the corresponding writer nodes */ + struct writer_node *writer_nodes; + /** keeps track of how many bytes have been written by each node */ + int *written; + /** the maximum of the chunk_bytes values of the writer nodes in this group */ + size_t max_chunk_bytes; + /** non-zero if end of file was encountered */ + int *input_eof; + int eof; + char *buf; + size_t *loaded; + struct task task; }; /** loop over each writer node in a writer group */ diff --git a/write_common.c b/write_common.c index e76d7f46..cfdabe48 100644 --- a/write_common.c +++ b/write_common.c @@ -20,60 +20,45 @@ #include "para.h" #include "string.h" +#include "list.h" +#include "sched.h" #include "write.h" #include "error.h" const char *writer_names[] ={WRITER_NAMES}; struct writer writers[NUM_SUPPORTED_WRITERS] = {WRITER_ARRAY}; -int wng_write(struct writer_node_group *g, char *buf, size_t *loaded) +static void wng_post_select(__a_unused struct sched *s, struct task *t) { - int ret, i, need_more_writes = 1; + struct writer_node_group *g = t->private_data; + int i; size_t min_written = 0; - while (need_more_writes) { - need_more_writes = 0; - FOR_EACH_WRITER_NODE(i, g) { - size_t w = g->written[i]; - int bytes_to_write; - struct writer_node *wn = &g->writer_nodes[i]; - if (!i) - min_written = w; - else - min_written = PARA_MIN(min_written, w); - if (w == *loaded) - continue; - if (!g->eof && (*loaded < wn->chunk_bytes + w)) - continue; - bytes_to_write = PARA_MIN(wn->chunk_bytes, - *loaded - w); - ret = wn->writer->write(buf + w, bytes_to_write, wn); - if (ret < 0) - goto out; - if (ret != bytes_to_write) - PARA_WARNING_LOG("short write: %d/%d\n", ret, - bytes_to_write); - g->written[i] += ret; - need_more_writes = 1; - } + FOR_EACH_WRITER_NODE(i, g) { + struct writer_node *wn = &g->writer_nodes[i]; + t->ret = wn->task.ret; + if (t->ret < 0) + return; + if (!i) + min_written = t->ret; + else + min_written = PARA_MIN(min_written, t->ret); } - *loaded -= min_written; - ret = 0; - if (g->eof) - goto out; - if (*loaded) - memmove(buf, buf + min_written, *loaded); - FOR_EACH_WRITER_NODE(i, g) - g->written[i] -= min_written; - ret = 1; -out: - return ret; + *g->loaded -= min_written; + if (!*g->loaded && *g->input_eof) { + g->eof = 1; + t->ret = -E_WNG_EOF; + } else + t->ret = 1; + if (*g->loaded && min_written) + memmove(g->buf, g->buf + min_written, *g->loaded); } int wng_open(struct writer_node_group *g) { int i, ret = 1; + PARA_NOTICE_LOG("opening wng with %d writer(s)\n", g->num_writers); FOR_EACH_WRITER_NODE(i, g) { struct writer_node *wn = &g->writer_nodes[i]; ret = wn->writer->open(wn); @@ -81,17 +66,38 @@ int wng_open(struct writer_node_group *g) goto out; wn->chunk_bytes = ret; g->max_chunk_bytes = PARA_MAX(g->max_chunk_bytes, ret); + wn->wng = g; + PARA_DEBUG_LOG("pre_select: %p\n", &wn->writer->pre_select); + PARA_DEBUG_LOG("post_select: %p\n", &wn->writer->post_select); + wn->task.pre_select = wn->writer->pre_select; + wn->task.post_select = wn->writer->post_select; + wn->task.private_data = wn; + register_task(&wn->task); } + sprintf(g->task.status, "%s", "writer node group"); + g->eof = 0; + register_task(&g->task); out: return ret; } +void wng_destroy(struct writer_node_group *g) +{ + if (!g) + return; + free(g->written); + free(g->writer_nodes); + free(g); +} + void wng_close(struct writer_node_group *g) { int i; + PARA_NOTICE_LOG("closing wng with %d writer(s)\n", g->num_writers); FOR_EACH_WRITER_NODE(i, g) { struct writer_node *wn = &g->writer_nodes[i]; + unregister_task(&wn->task); wn->writer->close(wn); } } @@ -103,18 +109,12 @@ struct writer_node_group *wng_new(unsigned num_writers) g->writer_nodes = para_calloc(num_writers * sizeof(struct writer_node)); g->written = para_calloc(num_writers * sizeof(size_t)); + g->task.private_data = g; + g->task.post_select = wng_post_select; + g->task.flags = POST_ADD_TAIL; return g; } -void wng_destroy(struct writer_node_group *g) -{ - if (!g) - return; - free(g->written); - free(g->writer_nodes); - free(g); -} - void init_supported_writers(void) { int i; @@ -123,22 +123,30 @@ void init_supported_writers(void) writers[i].init(&writers[i]); } -int check_writer_arg(const char *arg) +void *check_writer_arg(char *wa, int *writer_num) { - int i, ret = -E_WRITE_COMMON_SYNTAX; - char *a = para_strdup(arg), *p = strchr(a, ':'); - if (p) - *p = '\0'; - p++; + int i; + + *writer_num = -E_WRITE_COMMON_SYNTAX; + PARA_INFO_LOG("checking %s\n", wa); FOR_EACH_WRITER(i) { - if (strcmp(writer_names[i], a)) + const char *name = writer_names[i]; + size_t len = strlen(name); + char c; + if (strlen(wa) < len) + continue; + if (strncmp(name, wa, len)) continue; - ret = i; - goto out; + c = wa[len]; + if (c && c != ' ') + continue; + if (c && !writers[i].parse_config) + return NULL; + *writer_num = i; + return writers[i].parse_config(c? wa + len + 1 : ""); } -out: - free(a); - return ret; + PARA_ERROR_LOG("%s", "writer not found\n"); + return NULL; } struct writer_node_group *setup_default_wng(void) @@ -151,7 +159,10 @@ struct writer_node_group *setup_default_wng(void) else default_writer = 1; wng->writer_nodes[0].writer = &writers[default_writer]; - PARA_INFO_LOG("using default writer: %s\n", + sprintf(wng->writer_nodes[0].task.status, "%s", writer_names[default_writer]); + PARA_INFO_LOG("using default writer: %s %p\n", + writer_names[default_writer], writers[default_writer].parse_config); + wng->writer_nodes[0].conf = writers[default_writer].parse_config(""); return wng; } diff --git a/write_common.h b/write_common.h index 0eb2ae71..d9a8b590 100644 --- a/write_common.h +++ b/write_common.h @@ -24,5 +24,5 @@ void wng_close(struct writer_node_group *g); struct writer_node_group *wng_new(unsigned num_writers); void wng_destroy(struct writer_node_group *g); void init_supported_writers(void); -int check_writer_arg(const char *arg); +void *check_writer_arg(char *wa, int *writer_num); struct writer_node_group *setup_default_wng(void);