From: Andre Date: Tue, 23 May 2006 17:27:08 +0000 (+0200) Subject: first version of the universal paraslash scheduler X-Git-Tag: v0.2.14~101^2~31 X-Git-Url: http://git.tuebingen.mpg.de/?p=paraslash.git;a=commitdiff_plain;h=2290d9be0703d3f83f38c2f100b1b41ec0790bb3;ds=sidebyside first version of the universal paraslash scheduler --- diff --git a/alsa_writer.c b/alsa_writer.c index 5aa90851..3a2226cf 100644 --- a/alsa_writer.c +++ b/alsa_writer.c @@ -27,6 +27,8 @@ #include "para.h" #include "fd.h" #include "string.h" +#include "list.h" +#include "sched.h" #include "write.h" #include @@ -44,6 +46,7 @@ struct private_alsa_data { snd_pcm_t *handle; /** determined and set by alsa_open() */ size_t bytes_per_frame; +struct timeval next_chunk; }; /* @@ -60,14 +63,11 @@ static int alsa_open(struct writer_node *w) unsigned buffer_time = 0; int err; snd_pcm_info_t *info; - snd_output_t *log; snd_pcm_uframes_t period_size; - struct private_alsa_data *pad = para_malloc(sizeof(struct private_alsa_data)); + struct private_alsa_data *pad = para_calloc(sizeof(struct private_alsa_data)); w->private_data = pad; snd_pcm_info_alloca(&info); - if (snd_output_stdio_attach(&log, stderr, 0) < 0) - return -E_ALSA_LOG; err = snd_pcm_open(&pad->handle, conf.device_arg, SND_PCM_STREAM_PLAYBACK, 0); if (err < 0) @@ -132,39 +132,68 @@ static int alsa_open(struct writer_node *w) // PARA_ERROR_LOG("%s\n", "failed to set nonblock mode"); return period_size * pad->bytes_per_frame; } +static void alsa_write_pre_select(struct sched *s, struct task *t) +{ + struct writer_node *wn = t->private_data; + struct private_alsa_data *pad = wn->private_data; + struct writer_node_group *wng = wn->wng; + struct timeval diff; -/** - * push out pcm frames - * \param data pointer do data to be written - * \param nbytes number of bytes (not frames) - * - * \return Number of bytes written, -E_ALSA_WRITE on errors. - */ -static int alsa_write(char *data, size_t nbytes, struct writer_node *wn) + t->ret = 0; + if (wng->eof && *wng->loaded < pad->bytes_per_frame) + return; + t->ret = 1; + if (*wng->loaded < pad->bytes_per_frame) + return; + if (tv_diff(&s->now, &pad->next_chunk, &diff) < 0) { + if (tv_diff(&s->timeout, &diff, NULL) < 0) + s->timeout = diff; + } else { + s->timeout.tv_sec = 0; + s->timeout.tv_usec = 0; + } +} + +static void alsa_write_post_select(struct sched *s, struct task *t) { + struct writer_node *wn = t->private_data; struct private_alsa_data *pad = wn->private_data; - size_t frames = nbytes / pad->bytes_per_frame; - unsigned char *d = (unsigned char*)data; - snd_pcm_sframes_t r, result = 0; - - while (frames > 0) { - /* write interleaved frames */ - r = snd_pcm_writei(pad->handle, d, frames); - if (r < 0) - PARA_ERROR_LOG("write error: %s\n", snd_strerror(r)); - if (r == -EAGAIN || (r >= 0 && r < frames)) - snd_pcm_wait(pad->handle, 1); - else if (r == -EPIPE) + struct writer_node_group *wng = wn->wng; + size_t frames = *wng->loaded / pad->bytes_per_frame; + snd_pcm_sframes_t ret, result = 0; + unsigned char *data = (unsigned char*)wng->buf; + + t->ret = 0; + if (!frames) + return; + if (tv_diff(&s->now, &pad->next_chunk, NULL) < 0) + return; +// PARA_INFO_LOG("%zd frames\n", frames); +// while (frames > 0) { + ret = snd_pcm_writei(pad->handle, data, frames); + if (ret == -EAGAIN || (ret >= 0 && ret < frames)) { + struct timeval tv = {0, 1000 * 10}; + PARA_INFO_LOG("EAGAIN. frames: %d, ret: %lu\n", frames, ret); + tv_add(&s->now, &tv, &pad->next_chunk); +// snd_pcm_wait(pad->handle, 1); + } else if (ret == -EPIPE) { + PARA_INFO_LOG("%s", "EPIPE\n"); snd_pcm_prepare(pad->handle); - else if (r < 0) - return -E_ALSA_WRITE; - if (r > 0) { - result += r; - frames -= r; - d += r * pad->bytes_per_frame; + } else if (ret < 0) { + PARA_INFO_LOG("ALSA ERR %d\n", frames); + t->ret = -E_ALSA_WRITE; + return; } - } - return result * pad->bytes_per_frame; + if (ret >= 0) { + result += ret; + frames -= ret; + data += ret * pad->bytes_per_frame; + } +// if (ret == -EAGAIN) +// break; +// } + t->ret = result * pad->bytes_per_frame; +// PARA_INFO_LOG("ret: %d, frames: %zd\n", t->ret, frames); } static void alsa_close(struct writer_node *wn) @@ -180,7 +209,8 @@ static void alsa_close(struct writer_node *wn) void alsa_writer_init(struct writer *w) { w->open = alsa_open; - w->write = alsa_write; w->close = alsa_close; + w->pre_select = alsa_write_pre_select; + w->post_select = alsa_write_post_select; w->shutdown = NULL; /* nothing to do */ } diff --git a/audiod.c b/audiod.c index e1551617..08b29e00 100644 --- a/audiod.c +++ b/audiod.c @@ -892,6 +892,7 @@ static void check_timeouts(void) audio_formats[s->format], slot_num); if (s->fci) s->fci->error = 42; + kill_stream_writer(slot_num); } } } diff --git a/configure.ac b/configure.ac index 18d6e9cb..74f5ae86 100644 --- a/configure.ac +++ b/configure.ac @@ -77,7 +77,7 @@ server_errlist_objs="server mp3_afh afs command net string signal random_selecto server_ldflags="" write_cmdline_objs="write.cmdline" -write_errlist_objs="write write_common file_writer time fd string" +write_errlist_objs="write write_common file_writer time fd string sched stdin" write_ldflags="" write_writers="file" diff --git a/error.h b/error.h index 59c9554a..84f42798 100644 --- a/error.h +++ b/error.h @@ -33,6 +33,8 @@ enum para_subsystem { SS_ORTP_RECV, SS_AUDIOD, SS_EXEC, + SS_SCHED, + SS_STDIN, SS_SIGNAL, SS_STRING, SS_STAT, @@ -84,6 +86,17 @@ enum para_subsystem { extern const char **para_errlist[]; /** \endcond */ +#define STDIN_ERRORS \ + PARA_ERROR(STDIN_READ, "failed to read from stdin"), \ + + +#define SCHED_ERRORS \ + PARA_ERROR(PRE_EOF, "pre_select returned zero"), \ + PARA_ERROR(POST_EOF, "post_select returned zero"), \ + + + + #define NET_ERRORS \ PARA_ERROR(SEND, "send error"), \ PARA_ERROR(RECV, "receive error"), \ @@ -319,11 +332,13 @@ extern const char **para_errlist[]; PARA_ERROR(DCCP_OVERRUN, "dccp output buffer buffer overrun"), \ PARA_ERROR(DCCP_CONNECT, "dccp connect error"), \ + #define DCCP_SEND_ERRORS \ PARA_ERROR(DCCP_BIND, "dccp bind error"), \ PARA_ERROR(DCCP_LISTEN, "dccp listen error"), \ PARA_ERROR(DCCP_WRITE, "dccp write error"), \ + #define FD_ERRORS \ PARA_ERROR(F_GETFL, "failed to get fd flags"), \ PARA_ERROR(F_SETFL, "failed to set fd flags"), \ @@ -335,6 +350,8 @@ extern const char **para_errlist[]; PARA_ERROR(WRITE_SYNTAX, "para_write syntax error"), \ PARA_ERROR(WRITE_OVERRUN, "buffer overrun"), \ PARA_ERROR(PREMATURE_END, "premature end of audio file"), \ + PARA_ERROR(NO_WAV_HEADER, "wave header not found"), \ + PARA_ERROR(NO_DELAY, "no initial delay"), \ #define ALSA_WRITER_ERRORS \ @@ -361,6 +378,7 @@ extern const char **para_errlist[]; #define FILE_WRITER_ERRORS \ PARA_ERROR(FW_WRITE, "file writer write error"), \ PARA_ERROR(FW_OPEN, "file writer: can not open output file"), \ + PARA_ERROR(FW_NO_FILE, "task started without open file"), \ #define WRITE_COMMON_ERRORS \ @@ -371,6 +389,7 @@ extern const char **para_errlist[]; PARA_ERROR(AACDEC_INIT, "failed to init aac decoder"), \ PARA_ERROR(AAC_DECODE, "aac decode error"), \ + /** * the subsystem shift * @@ -445,6 +464,8 @@ extern const char **para_errlist[]; /** \cond popcorn time */ SS_ENUM(GUI); +SS_ENUM(SCHED); +SS_ENUM(STDIN); SS_ENUM(WAV); SS_ENUM(COMPRESS); SS_ENUM(TIME); diff --git a/file_writer.c b/file_writer.c index a7a765e1..d8fb3e54 100644 --- a/file_writer.c +++ b/file_writer.c @@ -19,17 +19,21 @@ /** \file file_writer.c simple output plugin for testing purposes */ #include "para.h" +#include "list.h" +#include "sched.h" #include "write.h" #include "string.h" +#include "fd.h" #include "error.h" /** data specific to the file writer */ struct private_file_writer_data { /** the file descriptor of the output file */ int fd; +int check_fd; }; -static int file_writer_open(struct writer_node *w) +static int file_writer_open(struct writer_node *wn) { struct private_file_writer_data *pfwd = para_calloc( sizeof(struct private_file_writer_data)); @@ -38,7 +42,7 @@ static int file_writer_open(struct writer_node *w) free(home); free(tmp); - w->private_data = pfwd; + wn->private_data = pfwd; pfwd->fd = open(filename, O_WRONLY | O_CREAT, S_IRUSR | S_IWUSR); free(filename); if (pfwd->fd >= 0) @@ -56,6 +60,44 @@ static int file_writer_write(char *data, size_t nbytes, struct writer_node *wn) return ret; } +static void file_writer_pre_select(struct sched *s, struct task *t) +{ + struct writer_node *wn = t->private_data; + struct private_file_writer_data *pfwd = wn->private_data; + struct writer_node_group *wng = wn->wng; + +// PARA_INFO_LOG("task %p check_fd: %d\n", t, pfwd->check_fd); + pfwd->check_fd = 0; + t->ret = -E_FW_NO_FILE; + if (pfwd->fd <= 0) + return; + t->ret = 0; + if (!*wng->loaded) + return; + t->ret = 1; + para_fd_set(pfwd->fd, &s->wfds, &s->max_fileno); + pfwd->check_fd = 1; +} + +static void file_writer_post_select(struct sched *s, struct task *t) +{ + struct writer_node *wn = t->private_data; + struct private_file_writer_data *pfwd = wn->private_data; + struct writer_node_group *wng = wn->wng; + + t->ret = 0; + if (!pfwd->check_fd) + return; + if (!*wng->loaded) + return; + if (!FD_ISSET(pfwd->fd, &s->wfds)) + return; +// PARA_INFO_LOG("writing %zd\n", *wng->loaded); + t->ret = write(pfwd->fd, wng->buf, *wng->loaded); + if (t->ret < 0) + t->ret = -E_FW_WRITE; +} + static void file_writer_close(struct writer_node *wn) { struct private_file_writer_data *pfwd = wn->private_data; @@ -68,6 +110,8 @@ void file_writer_init(struct writer *w) { w->open = file_writer_open; w->write = file_writer_write; + w->pre_select = file_writer_pre_select; + w->post_select = file_writer_post_select; w->close = file_writer_close; w->shutdown = NULL; /* nothing to do */ } diff --git a/list.h b/list.h index 0080c80c..c85f50c7 100644 --- a/list.h +++ b/list.h @@ -169,4 +169,18 @@ static inline int list_empty(const struct list_head *head) n = list_entry(pos->member.next, typeof(*pos), member); \ &pos->member != (head); \ pos = n, n = list_entry(n->member.next, typeof(*n), member)) +/** + * list_for_each_entry_safe_reverse - iterate backwards over list of given type safe against + * removal of list entry + * @pos: the type * to use as a loop counter. + * @n: another type * to use as temporary storage + * @head: the head for your list. + * @member: the name of the list_struct within the struct. + */ +#define list_for_each_entry_safe_reverse(pos, n, head, member) \ + for (pos = list_entry((head)->prev, typeof(*pos), member), \ + n = list_entry(pos->member.prev, typeof(*pos), member); \ + &pos->member != (head); \ + pos = n, n = list_entry(n->member.prev, typeof(*n), member)) + #endif /* _LIST_H */ diff --git a/sched.c b/sched.c new file mode 100644 index 00000000..fad70132 --- /dev/null +++ b/sched.c @@ -0,0 +1,120 @@ +#include +#include "para.h" +#include "ipc.h" +#include "fd.h" +#include "list.h" +#include "sched.h" +#include "string.h" +#include "error.h" + +struct list_head pre_select_list; +struct list_head post_select_list; + +static void sched_preselect(struct sched *s) +{ + struct task *t, *tmp; +again: + list_for_each_entry_safe(t, tmp, &pre_select_list, pre_select_node) { + t->pre_select(s, t); + if (t->ret > 0 || !t->error_handler) + continue; + if (t->ret < 0) { + t->error_handler(t); + goto again; + } + if (!(t->flags & PRE_EOF_IS_ERROR)) + continue; + t->ret = -E_PRE_EOF; + t->error_handler(t); + goto again; + } +} + +static void sched_post_select(struct sched *s) +{ + struct task *t, *tmp; + + list_for_each_entry_safe(t, tmp, &post_select_list, post_select_node) { + t->post_select(s, t); + if (t->ret > 0 || !t->error_handler) + continue; + if (t->ret < 0) { + t->error_handler(t); + continue; + } + if (!(t->flags & POST_EOF_IS_ERROR)) + continue; + t->ret = -E_POST_EOF; + t->error_handler(t); + } +} + +int sched(struct sched *s) +{ + + gettimeofday(&s->now, NULL); +again: + FD_ZERO(&s->rfds); + FD_ZERO(&s->wfds); + s->timeout = s->default_timeout; + s->max_fileno = -1; + sched_preselect(s); + s->select_ret = para_select(s->max_fileno + 1, &s->rfds, + &s->wfds, &s->timeout); + if (s->select_ret < 0) + return s->select_ret; + gettimeofday(&s->now, NULL); + sched_post_select(s); + if (list_empty(&pre_select_list) && list_empty(&post_select_list)) + return 0; + goto again; +} + +void *register_task(struct task *t) +{ + PARA_INFO_LOG("registering task %p\n", t); + if (t->pre_select) { + PARA_DEBUG_LOG("pre_select: %p\n", &t->pre_select); + if (t->flags & PRE_ADD_TAIL) + list_add_tail(&t->pre_select_node, &pre_select_list); + else + list_add(&t->pre_select_node, &pre_select_list); + } + if (t->post_select) { + PARA_DEBUG_LOG("post_select: %p\n", &t->pre_select); + if (t->flags & POST_ADD_TAIL) + list_add_tail(&t->post_select_node, &post_select_list); + else + list_add(&t->post_select_node, &post_select_list); + } + return t; +} + +void unregister_task(struct task *t) +{ + PARA_INFO_LOG("unregistering task %p\n", t); + if (t->pre_select) + list_del(&t->pre_select_node); + if (t->post_select) + list_del(&t->post_select_node); +}; + +void init_sched(void) +{ + INIT_LIST_HEAD(&pre_select_list); + INIT_LIST_HEAD(&post_select_list); +}; + +void sched_shutdown(void) +{ + struct task *t, *tmp; + + list_for_each_entry_safe(t, tmp, &pre_select_list, pre_select_node) + unregister_task(t); + /* remove tasks which do not have a pre_select hook */ + list_for_each_entry_safe(t, tmp, &post_select_list, post_select_node) + unregister_task(t); +}; + + +//char *get_tast_list(); diff --git a/sched.h b/sched.h new file mode 100644 index 00000000..3de5e1cc --- /dev/null +++ b/sched.h @@ -0,0 +1,31 @@ +struct sched { + struct timeval now, timeout; + int max_fileno; + fd_set rfds, wfds; + int select_ret; + struct timeval default_timeout; +}; + +struct task { + void *private_data; + unsigned flags; + int ret; + void (*pre_select)(struct sched *s, struct task *t); + void (*post_select)(struct sched *s, struct task *t); + void (*error_handler)(struct task *t); + struct list_head pre_select_node; + struct list_head post_select_node; + char status[MAXLINE]; +}; + +enum task_flags { + PRE_ADD_TAIL = 1, + POST_ADD_TAIL = 2, + PRE_EOF_IS_ERROR = 4, + POST_EOF_IS_ERROR = 8, +}; + +void *register_task(struct task *t); +void unregister_task(struct task *t); +int sched(struct sched *s); +void init_sched(void); diff --git a/stdin.c b/stdin.c new file mode 100644 index 00000000..cecdcb77 --- /dev/null +++ b/stdin.c @@ -0,0 +1,48 @@ +#include "para.h" +#include "string.h" +#include "list.h" +#include "sched.h" +#include "fd.h" +#include "error.h" +#include "stdin.h" + +void stdin_pre_select(struct sched *s, struct task *t) +{ + struct stdin_task *sit = t->private_data; + if (sit->loaded < sit->bufsize) + para_fd_set(STDIN_FILENO, &s->rfds, &s->max_fileno); + t->ret = 1; /* success */ +} + +void stdin_post_select(struct sched *s, struct task *t) +{ + struct stdin_task *sit = t->private_data; + ssize_t ret; + + t->ret = 1; + if (sit->loaded >= sit->bufsize) + return; + if (!FD_ISSET(STDIN_FILENO, &s->rfds)) + return; + ret = read(STDIN_FILENO, sit->buf + sit->loaded, sit->bufsize - sit->loaded); + if (ret < 0) + t->ret = -E_STDIN_READ; + else { + sit->loaded += ret; + t->ret = ret; + } + sprintf(t->status, + "%p stdin reader: loaded = %d, ret = %d", + sit, sit->loaded, t->ret); +} + +#if 0 +void stdin_init(struct stdin_task *sit) +{ + sit->task.private_data = sit; + sit->task.pre_select = stdin_pre_select; + sit->task.post_select = stdin_post_select; + sit->task.flags = POST_EOF_IS_ERROR; + sprintf(sit->task.status, "%p stdin reader: initialized", &sit->task); +} +#endif diff --git a/stdin.h b/stdin.h new file mode 100644 index 00000000..9213bf46 --- /dev/null +++ b/stdin.h @@ -0,0 +1,9 @@ +struct stdin_task { + char *buf; + size_t bufsize; + size_t loaded; + struct task task; +}; + +void stdin_pre_select(struct sched *s, struct task *t); +void stdin_post_select(struct sched *s, struct task *t); diff --git a/write.c b/write.c index 43c0690b..508aee79 100644 --- a/write.c +++ b/write.c @@ -19,159 +19,95 @@ #include "para.h" #include "string.h" #include "write.cmdline.h" +#include "list.h" +#include "sched.h" +#include "stdin.h" #include "write.h" #include "write_common.h" #include "fd.h" - -#include /* gettimeofday */ - #include "error.h" -#define WAV_HEADER_LEN 44 +INIT_WRITE_ERRLISTS; -static char *audiobuf; -static struct timeval *start_time; -struct gengetopt_args_info conf; +struct check_wav_task { + char *buf; + size_t *loaded; + unsigned channels; + unsigned sample_rate; + struct task task; +}; -INIT_WRITE_ERRLISTS; +struct initial_delay_task { + struct timeval start_time; + struct task task; +}; -void para_log(int ll, const char* fmt,...) -{ - va_list argp; +struct gengetopt_args_info conf; +struct stdin_task sit; +struct check_wav_task cwt; +struct initial_delay_task idt; +static struct writer_node_group *wng; - if (ll < conf.loglevel_arg) - return; - va_start(argp, fmt); - vfprintf(stderr, fmt, argp); - va_end(argp); -} +#define WAV_HEADER_LEN 44 /** - * read WAV_HEADER_LEN bytes from stdin to audio buffer + * test if audio buffer contains a valid wave header * - * \return -E_READ_HDR on errors and on eof before WAV_HEADER_LEN could be - * read. A positive return value indicates success. + * \return If not, return 0, otherwise, store number of channels and sample rate + * in struct conf and return WAV_HEADER_LEN. */ -static int read_wav_header(void) +static void check_wav_pre_select(struct sched *s, struct task *t) { - ssize_t ret, count = 0; + struct check_wav_task *cwt = t->private_data; + unsigned char *a; - while (count < WAV_HEADER_LEN) { - ret = read(STDIN_FILENO, audiobuf + count, WAV_HEADER_LEN - count); - if (ret <= 0) - return -E_READ_HDR; - count += ret; + if (*cwt->loaded < WAV_HEADER_LEN) { + t->ret = 1; + return; } - return 1; + a = (unsigned char*)cwt->buf; + t->ret = -E_NO_WAV_HEADER; + if (a[0] != 'R' || a[1] != 'I' || a[2] != 'F' || a[3] != 'F') + return; + cwt->channels = (unsigned) a[22]; + cwt->sample_rate = a[24] + (a[25] << 8) + (a[26] << 16) + (a[27] << 24); + *cwt->loaded -= WAV_HEADER_LEN; + memmove(cwt->buf, cwt->buf + WAV_HEADER_LEN, *cwt->loaded); + t->ret = 0; + PARA_INFO_LOG("channels: %d, sample_rate: %d\n", cwt->channels, cwt->sample_rate); } -/** - * check if current time is later than start_time - * \param diff pointer to write remaining time to - * - * If start_time was not given, or current time is later than given - * start_time, return 0. Otherwise, return 1 and write the time - * difference between current time and start_time to diff. diff may be - * NULL. - * - */ -static int start_time_in_future(struct timeval *diff) +static void initial_delay_pre_select(struct sched *s, struct task *t) { - struct timeval now; - - if (!conf.start_time_given) - return 0; - gettimeofday(&now, NULL); - return tv_diff(start_time, &now, diff) > 0? 1 : 0; -} + struct initial_delay_task *idt = t->private_data; + struct timeval diff; -/** - * sleep until time given at command line - * - * This is called if the initial buffer is filled. It returns - * immediately if no start_time was given at the command line - * or if the given start time is in the past. - * - */ -static void do_initial_delay(struct timeval *delay) -{ - do - para_select(1, NULL, NULL, delay); - while (start_time_in_future(delay)); + PARA_ERROR_LOG("task %p, ret: %d\n", t, t->ret); + t->ret = -E_NO_DELAY; + if (!idt->start_time.tv_sec && !idt->start_time.tv_usec) + return; + t->ret = 0; /* timeout */ + if (tv_diff(&s->now, &idt->start_time, &diff) > 0) + return; + t->ret = 1; + if (tv_diff(&s->timeout , &diff, NULL) > 0) + s->timeout = diff; } -static int read_stdin(char *buf, size_t bytes_to_load, size_t *loaded) -{ - ssize_t ret; - - while (*loaded < bytes_to_load) { - ret = read(STDIN_FILENO, buf + *loaded, bytes_to_load - *loaded); - if (ret <= 0) { - if (ret < 0) - ret = -E_READ_STDIN; - return ret; - } - *loaded += ret; - } - return 1; -} -/** - * play raw pcm data - * \param loaded number of bytes already loaded - * - * If start_time was given, prebuffer data until buffer is full or - * start_time is reached. In any case, do not start playing before - * start_time. - * - * \return positive on success, negative on errors. - */ -static int pcm_write(struct writer_node_group *wng, size_t loaded) +void para_log(int ll, const char* fmt,...) { - size_t bufsize, prebuf_size, bytes_to_load; - struct timeval delay; - int ret, not_yet_started = 1; + va_list argp; - ret = wng_open(wng); - if (ret < 0) - goto out; - PARA_INFO_LOG("max chunk_bytes: %zd\n", wng->max_chunk_bytes); - bufsize = (conf.bufsize_arg * 1024 / wng->max_chunk_bytes) - * wng->max_chunk_bytes; - audiobuf = para_realloc(audiobuf, bufsize); - prebuf_size = conf.prebuffer_arg * bufsize / 100; - bytes_to_load = PARA_MIN(prebuf_size, wng->max_chunk_bytes); - ret = read_stdin(audiobuf, bytes_to_load, &loaded); - if (ret <= 0 || loaded < bytes_to_load) { - if (ret >= 0) - ret = -E_PREMATURE_END; - goto out; - } - if (not_yet_started && start_time && start_time_in_future(&delay)) - do_initial_delay(&delay); - not_yet_started = 0; -again: - ret = wng_write(wng, audiobuf, &loaded); - if (ret <= 0) - goto out; - ret = -E_WRITE_OVERRUN; - if (loaded >= bufsize) - goto out; - bytes_to_load = PARA_MIN(wng->max_chunk_bytes, bufsize); - ret = read_stdin(audiobuf, bytes_to_load, &loaded); - if (ret < 0) - goto out; - if (!ret) - wng->eof = 1; - goto again; -out: - wng_close(wng); - return ret; + if (ll < conf.loglevel_arg) + return; + va_start(argp, fmt); + vfprintf(stderr, fmt, argp); + va_end(argp); } static struct writer_node_group *check_args(void) { int i, ret = -E_WRITE_SYNTAX; - static struct timeval tv; struct writer_node_group *wng = NULL; if (conf.list_writers_given) { @@ -195,9 +131,8 @@ static struct writer_node_group *check_args(void) if (sscanf(conf.start_time_arg, "%lu:%lu", &sec, &usec) != 2) goto out; - tv.tv_sec = sec; - tv.tv_usec = usec; - start_time = &tv; + idt.start_time.tv_sec = sec; + idt.start_time.tv_usec = usec; } if (!conf.writer_given) { wng = setup_default_wng(); @@ -219,41 +154,88 @@ out: return NULL; } -/** - * test if audio buffer contains a valid wave header - * - * \return If not, return 0, otherwise, store number of channels and sample rate - * in struct conf and return WAV_HEADER_LEN. - */ -static size_t check_wave(void) +static void idt_error_handler(struct task *t) { - unsigned char *a = (unsigned char*)audiobuf; - if (a[0] != 'R' || a[1] != 'I' || a[2] != 'F' || a[3] != 'F') - return WAV_HEADER_LEN; - conf.channels_arg = (unsigned) a[22]; - conf.sample_rate_arg = a[24] + (a[25] << 8) + (a[26] << 16) + (a[27] << 24); - return 0; + PARA_ERROR_LOG("task %p, ret: %d\n", t, t->ret); + int ret; + unregister_task(t); + wng->buf = sit.buf; + wng->loaded = &sit.loaded; + ret = wng_open(wng); + if (ret < 0) { + PARA_ERROR_LOG("%s\n", PARA_STRERROR(-ret)); + exit(EXIT_FAILURE); + } +} + +static void cwt_error_handler(struct task *t) +{ + PARA_ERROR_LOG("task %p, ret: %d\n", t, t->ret); + if (t->ret < 0) { + PARA_ERROR_LOG("%s\n", PARA_STRERROR(-t->ret)); + if (t->ret != -E_NO_WAV_HEADER && t->ret != -E_PRE_EOF) + exit(EXIT_FAILURE); + if (t->ret == -E_PRE_EOF) { + conf.channels_arg = cwt.channels; + conf.sample_rate_arg = cwt.sample_rate; + } + } + unregister_task(t); + idt.task.pre_select = initial_delay_pre_select; + idt.task.private_data = &idt; + idt.task.error_handler = idt_error_handler; + idt.task.flags = PRE_EOF_IS_ERROR; + register_task(&idt.task); +} + +static void stdin_error_handler(struct task *t) +{ + unregister_task(t); + PARA_INFO_LOG("task %p, ret: %d\n", t, t->ret); + if (t->ret < 0) + PARA_ERROR_LOG("%s\n", PARA_STRERROR(-t->ret)); + wng->eof = 1; } int main(int argc, char *argv[]) { int ret = -E_WRITE_SYNTAX; - struct writer_node_group *wng = NULL; + struct sched s; cmdline_parser(argc, argv, &conf); wng = check_args(); if (!wng) goto out; init_supported_writers(); - audiobuf = para_malloc(WAV_HEADER_LEN); - ret = read_wav_header(); - if (ret < 0) - goto out; - ret = pcm_write(wng, check_wave()); + init_sched(); + + sit.bufsize = 16 * 1024, + sit.buf = para_malloc(16 * 1024), + sit.loaded = 0, + sit.task.pre_select = stdin_pre_select; + sit.task.post_select = stdin_post_select; + sit.task.error_handler = stdin_error_handler; + sit.task.flags = POST_EOF_IS_ERROR; + sit.task.private_data = &sit; + register_task(&sit.task); + + cwt.task.pre_select = check_wav_pre_select; + cwt.task.private_data = &cwt; + cwt.task.error_handler = cwt_error_handler; + cwt.buf = sit.buf; + cwt.loaded = &sit.loaded; + cwt.task.flags = PRE_EOF_IS_ERROR; + register_task(&cwt.task); + + s.default_timeout.tv_sec = 1; + s.default_timeout.tv_usec = 0; + ret = sched(&s); + out: - wng_destroy(wng); - free(audiobuf); - if (ret < 0) + if (ret < 0) { PARA_ERROR_LOG("%s\n", PARA_STRERROR(-ret)); + ret = EXIT_FAILURE; + } else + ret = EXIT_SUCCESS; return ret; } diff --git a/write.h b/write.h index 5f8ab3ab..aece2313 100644 --- a/write.h +++ b/write.h @@ -31,6 +31,8 @@ struct writer_node { void *private_data; /** send that many bytes in one go */ int chunk_bytes; + struct task task; + struct writer_node_group *wng; }; /** describes one supported writer */ @@ -62,6 +64,8 @@ int (*open)(struct writer_node *); * */ int (*write)(char *data, size_t nbytes, struct writer_node *); +void (*pre_select)(struct sched *s, struct task *t); +void (*post_select)(struct sched *s, struct task *t); /** * close one instance of the writer * @@ -91,6 +95,9 @@ int *written; size_t max_chunk_bytes; /** non-zero if end of file was encountered */ int eof; +char *buf; +size_t *loaded; +struct task task; }; /** loop over each writer node in a writer group */ diff --git a/write_common.c b/write_common.c index e76d7f46..6ff778a9 100644 --- a/write_common.c +++ b/write_common.c @@ -20,6 +20,8 @@ #include "para.h" #include "string.h" +#include "list.h" +#include "sched.h" #include "write.h" #include "error.h" @@ -70,6 +72,31 @@ out: return ret; } +static void wng_post_select(struct sched *s, struct task *t) +{ + struct writer_node_group *g = t->private_data; + int i; + size_t min_written = 0; + + FOR_EACH_WRITER_NODE(i, g) { + struct writer_node *wn = &g->writer_nodes[i]; + t->ret = wn->task.ret; + if (t->ret < 0) + return; + if (!i) + min_written = t->ret; + else + min_written = PARA_MIN(min_written, t->ret); + } + *g->loaded -= min_written; + if (!*g->loaded && g->eof) + t->ret = 0; + else + t->ret = 1; + if (*g->loaded && min_written) + memmove(g->buf, g->buf + min_written, *g->loaded); +} + int wng_open(struct writer_node_group *g) { int i, ret = 1; @@ -81,21 +108,49 @@ int wng_open(struct writer_node_group *g) goto out; wn->chunk_bytes = ret; g->max_chunk_bytes = PARA_MAX(g->max_chunk_bytes, ret); + wn->wng = g; + PARA_DEBUG_LOG("pre_select: %p\n", &wn->writer->pre_select); + PARA_DEBUG_LOG("post_select: %p\n", &wn->writer->post_select); + wn->task.pre_select = wn->writer->pre_select; + wn->task.post_select = wn->writer->post_select; + wn->task.private_data = wn; + register_task(&wn->task); } + register_task(&g->task); out: return ret; } +void wng_destroy(struct writer_node_group *g) +{ + if (!g) + return; + free(g->written); + free(g->writer_nodes); + free(g); +} + void wng_close(struct writer_node_group *g) { int i; FOR_EACH_WRITER_NODE(i, g) { struct writer_node *wn = &g->writer_nodes[i]; + unregister_task(&wn->task); wn->writer->close(wn); } } +static void wng_error_handler(struct task *t) +{ + struct writer_node_group *g = t->private_data; + + PARA_INFO_LOG("%p: ret = %d\n", t, t->ret); + unregister_task(t); + wng_close(g); + wng_destroy(g); +} + struct writer_node_group *wng_new(unsigned num_writers) { struct writer_node_group *g = para_calloc(sizeof(struct writer_node_group)); @@ -103,18 +158,13 @@ struct writer_node_group *wng_new(unsigned num_writers) g->writer_nodes = para_calloc(num_writers * sizeof(struct writer_node)); g->written = para_calloc(num_writers * sizeof(size_t)); + g->task.private_data = g; + g->task.post_select = wng_post_select; + g->task.error_handler = wng_error_handler; + g->task.flags = POST_ADD_TAIL | POST_EOF_IS_ERROR; return g; } -void wng_destroy(struct writer_node_group *g) -{ - if (!g) - return; - free(g->written); - free(g->writer_nodes); - free(g); -} - void init_supported_writers(void) { int i;