first version of the universal paraslash scheduler
authorAndre <maan@p133.(none)>
Tue, 23 May 2006 17:27:08 +0000 (19:27 +0200)
committerAndre <maan@p133.(none)>
Tue, 23 May 2006 17:27:08 +0000 (19:27 +0200)
13 files changed:
alsa_writer.c
audiod.c
configure.ac
error.h
file_writer.c
list.h
sched.c [new file with mode: 0644]
sched.h [new file with mode: 0644]
stdin.c [new file with mode: 0644]
stdin.h [new file with mode: 0644]
write.c
write.h
write_common.c

index 5aa9085..3a2226c 100644 (file)
@@ -27,6 +27,8 @@
 #include "para.h"
 #include "fd.h"
 #include "string.h"
+#include "list.h"
+#include "sched.h"
 #include "write.h"
 
 #include <alsa/asoundlib.h>
@@ -44,6 +46,7 @@ struct private_alsa_data {
 snd_pcm_t *handle;
 /** determined and set by alsa_open() */
 size_t bytes_per_frame;
+struct timeval next_chunk;
 };
 
 /*
@@ -60,14 +63,11 @@ static int alsa_open(struct writer_node *w)
        unsigned buffer_time = 0;
        int err;
        snd_pcm_info_t *info;
-       snd_output_t *log;
        snd_pcm_uframes_t period_size;
-       struct private_alsa_data *pad = para_malloc(sizeof(struct private_alsa_data));
+       struct private_alsa_data *pad = para_calloc(sizeof(struct private_alsa_data));
        w->private_data = pad;
 
        snd_pcm_info_alloca(&info);
-       if (snd_output_stdio_attach(&log, stderr, 0) < 0)
-               return -E_ALSA_LOG;
        err = snd_pcm_open(&pad->handle, conf.device_arg,
                SND_PCM_STREAM_PLAYBACK, 0);
        if (err < 0)
@@ -132,39 +132,68 @@ static int alsa_open(struct writer_node *w)
 //             PARA_ERROR_LOG("%s\n", "failed to set nonblock mode");
        return period_size * pad->bytes_per_frame;
 }
+static void alsa_write_pre_select(struct sched *s, struct task *t)
+{
+       struct writer_node *wn = t->private_data;
+       struct private_alsa_data *pad = wn->private_data;
+       struct writer_node_group *wng = wn->wng;
+       struct timeval diff;
 
-/**
- * push out pcm frames
- * \param data pointer do data to be written
- * \param nbytes number of bytes (not frames)
- *
- * \return Number of bytes written, -E_ALSA_WRITE on errors.
- */
-static int alsa_write(char *data, size_t nbytes, struct writer_node *wn)
+       t->ret = 0;
+       if (wng->eof && *wng->loaded < pad->bytes_per_frame)
+               return;
+       t->ret = 1;
+       if (*wng->loaded < pad->bytes_per_frame)
+               return;
+       if (tv_diff(&s->now, &pad->next_chunk, &diff) < 0) {
+               if (tv_diff(&s->timeout, &diff, NULL) < 0)
+                       s->timeout = diff;
+       } else {
+               s->timeout.tv_sec = 0;
+               s->timeout.tv_usec = 0;
+       }
+}
+
+static void alsa_write_post_select(struct sched *s, struct task *t)
 {
+       struct writer_node *wn = t->private_data;
        struct private_alsa_data *pad = wn->private_data;
-       size_t frames = nbytes / pad->bytes_per_frame;
-       unsigned char *d = (unsigned char*)data;
-       snd_pcm_sframes_t r, result = 0;
-
-       while (frames > 0) {
-               /* write interleaved frames */
-               r = snd_pcm_writei(pad->handle, d, frames);
-               if (r < 0)
-                       PARA_ERROR_LOG("write error: %s\n", snd_strerror(r));
-               if (r == -EAGAIN || (r >= 0 && r < frames))
-                       snd_pcm_wait(pad->handle, 1);
-               else if (r == -EPIPE)
+       struct writer_node_group *wng = wn->wng;
+       size_t frames = *wng->loaded / pad->bytes_per_frame;
+       snd_pcm_sframes_t ret, result = 0;
+       unsigned char *data = (unsigned char*)wng->buf;
+
+       t->ret = 0;
+       if (!frames)
+               return;
+       if (tv_diff(&s->now, &pad->next_chunk, NULL) < 0)
+               return;
+//     PARA_INFO_LOG("%zd frames\n", frames);
+//     while (frames > 0) {
+               ret = snd_pcm_writei(pad->handle, data, frames);
+               if (ret == -EAGAIN || (ret >= 0 && ret < frames)) {
+                       struct timeval tv = {0, 1000 * 10};
+                       PARA_INFO_LOG("EAGAIN. frames: %d, ret: %lu\n", frames, ret);
+                       tv_add(&s->now, &tv, &pad->next_chunk);
+//                     snd_pcm_wait(pad->handle, 1);
+               } else if (ret == -EPIPE) {
+                       PARA_INFO_LOG("%s", "EPIPE\n");
                        snd_pcm_prepare(pad->handle);
-               else if (r < 0)
-                       return -E_ALSA_WRITE;
-               if (r > 0) {
-                       result += r;
-                       frames -= r;
-                       d += r * pad->bytes_per_frame;
+               } else if (ret < 0) {
+                       PARA_INFO_LOG("ALSA ERR %d\n", frames);
+                       t->ret = -E_ALSA_WRITE;
+                       return;
                }
-       }
-       return result * pad->bytes_per_frame;
+               if (ret >= 0) {
+                       result += ret;
+                       frames -= ret;
+                       data += ret * pad->bytes_per_frame;
+               }
+//             if (ret == -EAGAIN)
+//                     break;
+//     }
+       t->ret = result * pad->bytes_per_frame;
+//     PARA_INFO_LOG("ret: %d, frames: %zd\n", t->ret, frames);
 }
 
 static void alsa_close(struct writer_node *wn)
@@ -180,7 +209,8 @@ static void alsa_close(struct writer_node *wn)
 void alsa_writer_init(struct writer *w)
 {
        w->open = alsa_open;
-       w->write = alsa_write;
        w->close = alsa_close;
+       w->pre_select = alsa_write_pre_select;
+       w->post_select = alsa_write_post_select;
        w->shutdown = NULL; /* nothing to do */
 }
index e155161..08b29e0 100644 (file)
--- a/audiod.c
+++ b/audiod.c
@@ -892,6 +892,7 @@ static void check_timeouts(void)
                                audio_formats[s->format], slot_num);
                        if (s->fci)
                                s->fci->error = 42;
+                       kill_stream_writer(slot_num);
                }
        }
 }
index 18d6e9c..74f5ae8 100644 (file)
@@ -77,7 +77,7 @@ server_errlist_objs="server mp3_afh afs command net string signal random_selecto
 server_ldflags=""
 
 write_cmdline_objs="write.cmdline"
-write_errlist_objs="write write_common file_writer time fd string"
+write_errlist_objs="write write_common file_writer time fd string sched stdin"
 write_ldflags=""
 write_writers="file"
 
diff --git a/error.h b/error.h
index 59c9554..84f4279 100644 (file)
--- a/error.h
+++ b/error.h
@@ -33,6 +33,8 @@ enum para_subsystem {
        SS_ORTP_RECV,
        SS_AUDIOD,
        SS_EXEC,
+       SS_SCHED,
+       SS_STDIN,
        SS_SIGNAL,
        SS_STRING,
        SS_STAT,
@@ -84,6 +86,17 @@ enum para_subsystem {
 extern const char **para_errlist[];
 /** \endcond */
 
+#define STDIN_ERRORS \
+       PARA_ERROR(STDIN_READ, "failed to read from stdin"), \
+
+
+#define SCHED_ERRORS \
+       PARA_ERROR(PRE_EOF, "pre_select returned zero"), \
+       PARA_ERROR(POST_EOF, "post_select returned zero"), \
+
+
+
+
 #define NET_ERRORS \
        PARA_ERROR(SEND, "send error"), \
        PARA_ERROR(RECV, "receive error"), \
@@ -319,11 +332,13 @@ extern const char **para_errlist[];
        PARA_ERROR(DCCP_OVERRUN, "dccp output buffer buffer overrun"), \
        PARA_ERROR(DCCP_CONNECT, "dccp connect error"), \
 
+
 #define DCCP_SEND_ERRORS \
        PARA_ERROR(DCCP_BIND, "dccp bind error"), \
        PARA_ERROR(DCCP_LISTEN, "dccp listen error"), \
        PARA_ERROR(DCCP_WRITE, "dccp write error"), \
 
+
 #define FD_ERRORS \
        PARA_ERROR(F_GETFL, "failed to get fd flags"), \
        PARA_ERROR(F_SETFL, "failed to set fd flags"), \
@@ -335,6 +350,8 @@ extern const char **para_errlist[];
        PARA_ERROR(WRITE_SYNTAX, "para_write syntax error"), \
        PARA_ERROR(WRITE_OVERRUN, "buffer overrun"), \
        PARA_ERROR(PREMATURE_END, "premature end of audio file"), \
+       PARA_ERROR(NO_WAV_HEADER, "wave header not found"), \
+       PARA_ERROR(NO_DELAY, "no initial delay"), \
 
 
 #define ALSA_WRITER_ERRORS \
@@ -361,6 +378,7 @@ extern const char **para_errlist[];
 #define FILE_WRITER_ERRORS \
        PARA_ERROR(FW_WRITE, "file writer write error"), \
        PARA_ERROR(FW_OPEN, "file writer: can not open output file"), \
+       PARA_ERROR(FW_NO_FILE, "task started without open file"), \
 
 
 #define WRITE_COMMON_ERRORS \
@@ -371,6 +389,7 @@ extern const char **para_errlist[];
        PARA_ERROR(AACDEC_INIT, "failed to init aac decoder"), \
        PARA_ERROR(AAC_DECODE, "aac decode error"), \
 
+
 /**
  * the subsystem shift
  *
@@ -445,6 +464,8 @@ extern const char **para_errlist[];
 
 /** \cond popcorn time */
 SS_ENUM(GUI);
+SS_ENUM(SCHED);
+SS_ENUM(STDIN);
 SS_ENUM(WAV);
 SS_ENUM(COMPRESS);
 SS_ENUM(TIME);
index a7a765e..d8fb3e5 100644 (file)
 /** \file file_writer.c simple output plugin for testing purposes */
 
 #include "para.h"
+#include "list.h"
+#include "sched.h"
 #include "write.h"
 #include "string.h"
+#include "fd.h"
 #include "error.h"
 
 /** data specific to the file writer */
 struct private_file_writer_data {
 /** the file descriptor of the output file */
 int fd;
+int check_fd;
 };
 
-static int file_writer_open(struct writer_node *w)
+static int file_writer_open(struct writer_node *wn)
 {
        struct private_file_writer_data *pfwd = para_calloc(
                sizeof(struct private_file_writer_data));
@@ -38,7 +42,7 @@ static int file_writer_open(struct writer_node *w)
 
        free(home);
        free(tmp);
-       w->private_data = pfwd;
+       wn->private_data = pfwd;
        pfwd->fd = open(filename, O_WRONLY | O_CREAT, S_IRUSR | S_IWUSR);
        free(filename);
        if (pfwd->fd >= 0)
@@ -56,6 +60,44 @@ static int file_writer_write(char *data, size_t nbytes, struct writer_node *wn)
        return ret;
 }
 
+static void file_writer_pre_select(struct sched *s, struct task *t)
+{
+       struct writer_node *wn = t->private_data;
+       struct private_file_writer_data *pfwd = wn->private_data;
+       struct writer_node_group *wng = wn->wng;
+
+//     PARA_INFO_LOG("task %p check_fd: %d\n", t, pfwd->check_fd);
+       pfwd->check_fd = 0;
+       t->ret = -E_FW_NO_FILE;
+       if (pfwd->fd <= 0)
+               return;
+       t->ret = 0;
+       if (!*wng->loaded)
+               return;
+       t->ret = 1;
+       para_fd_set(pfwd->fd, &s->wfds, &s->max_fileno);
+       pfwd->check_fd = 1;
+}
+
+static void file_writer_post_select(struct sched *s, struct task *t)
+{
+       struct writer_node *wn = t->private_data;
+       struct private_file_writer_data *pfwd = wn->private_data;
+       struct writer_node_group *wng = wn->wng;
+
+       t->ret = 0;
+       if (!pfwd->check_fd)
+               return;
+       if (!*wng->loaded)
+               return;
+       if (!FD_ISSET(pfwd->fd, &s->wfds))
+               return;
+//     PARA_INFO_LOG("writing %zd\n", *wng->loaded);
+       t->ret = write(pfwd->fd, wng->buf, *wng->loaded);
+       if (t->ret < 0)
+               t->ret = -E_FW_WRITE;
+}
+
 static void file_writer_close(struct writer_node *wn)
 {
        struct private_file_writer_data *pfwd = wn->private_data;
@@ -68,6 +110,8 @@ void file_writer_init(struct writer *w)
 {
        w->open = file_writer_open;
        w->write = file_writer_write;
+       w->pre_select = file_writer_pre_select;
+       w->post_select = file_writer_post_select;
        w->close = file_writer_close;
        w->shutdown = NULL; /* nothing to do */
 }
diff --git a/list.h b/list.h
index 0080c80..c85f50c 100644 (file)
--- a/list.h
+++ b/list.h
@@ -169,4 +169,18 @@ static inline int list_empty(const struct list_head *head)
                n = list_entry(pos->member.next, typeof(*pos), member); \
             &pos->member != (head);                                    \
             pos = n, n = list_entry(n->member.next, typeof(*n), member))
+/**
+ * list_for_each_entry_safe_reverse - iterate backwards over list of given type safe against
+ *                                   removal of list entry
+ * @pos:       the type * to use as a loop counter.
+ * @n:         another type * to use as temporary storage
+ * @head:      the head for your list.
+ * @member:    the name of the list_struct within the struct.
+ */
+#define list_for_each_entry_safe_reverse(pos, n, head, member)         \
+       for (pos = list_entry((head)->prev, typeof(*pos), member),      \
+               n = list_entry(pos->member.prev, typeof(*pos), member); \
+            &pos->member != (head);                                    \
+            pos = n, n = list_entry(n->member.prev, typeof(*n), member))
+
 #endif /* _LIST_H */
diff --git a/sched.c b/sched.c
new file mode 100644 (file)
index 0000000..fad7013
--- /dev/null
+++ b/sched.c
@@ -0,0 +1,120 @@
+#include <sys/time.h>
+#include "para.h"
+#include "ipc.h"
+#include "fd.h"
+#include "list.h"
+#include "sched.h"
+#include "string.h"
+#include "error.h"
+
+struct list_head pre_select_list;
+struct list_head post_select_list;
+
+static void sched_preselect(struct sched *s)
+{
+       struct task *t, *tmp;
+again:
+       list_for_each_entry_safe(t, tmp, &pre_select_list, pre_select_node) {
+               t->pre_select(s, t);
+               if (t->ret > 0 || !t->error_handler)
+                       continue;
+               if (t->ret < 0) {
+                       t->error_handler(t);
+                       goto again;
+               }
+               if (!(t->flags & PRE_EOF_IS_ERROR))
+                       continue;
+               t->ret = -E_PRE_EOF;
+               t->error_handler(t);
+               goto again;
+       }
+}
+
+static void sched_post_select(struct sched *s)
+{
+       struct task *t, *tmp;
+
+       list_for_each_entry_safe(t, tmp, &post_select_list, post_select_node) {
+               t->post_select(s, t);
+               if (t->ret > 0 || !t->error_handler)
+                       continue;
+               if (t->ret < 0) {
+                       t->error_handler(t);
+                       continue;
+               }
+               if (!(t->flags & POST_EOF_IS_ERROR))
+                       continue;
+               t->ret = -E_POST_EOF;
+               t->error_handler(t);
+       }
+}
+
+int sched(struct sched *s)
+{
+
+       gettimeofday(&s->now, NULL);
+again:
+       FD_ZERO(&s->rfds);
+       FD_ZERO(&s->wfds);
+       s->timeout = s->default_timeout;
+       s->max_fileno = -1;
+       sched_preselect(s);
+       s->select_ret = para_select(s->max_fileno + 1, &s->rfds,
+               &s->wfds, &s->timeout);
+       if (s->select_ret < 0)
+               return s->select_ret;
+       gettimeofday(&s->now, NULL);
+       sched_post_select(s);
+       if (list_empty(&pre_select_list) && list_empty(&post_select_list))
+               return 0;
+       goto again;
+}
+
+void *register_task(struct task *t)
+{
+       PARA_INFO_LOG("registering task %p\n", t);
+       if (t->pre_select) {
+               PARA_DEBUG_LOG("pre_select: %p\n", &t->pre_select);
+               if (t->flags & PRE_ADD_TAIL)
+                       list_add_tail(&t->pre_select_node, &pre_select_list);
+               else
+                       list_add(&t->pre_select_node, &pre_select_list);
+       }
+       if (t->post_select) {
+               PARA_DEBUG_LOG("post_select: %p\n", &t->pre_select);
+               if (t->flags & POST_ADD_TAIL)
+                       list_add_tail(&t->post_select_node, &post_select_list);
+               else
+                       list_add(&t->post_select_node, &post_select_list);
+       }
+       return t;
+}
+
+void unregister_task(struct task *t)
+{
+       PARA_INFO_LOG("unregistering task %p\n", t);
+       if (t->pre_select)
+               list_del(&t->pre_select_node);
+       if (t->post_select)
+               list_del(&t->post_select_node);
+};
+
+void init_sched(void)
+{
+       INIT_LIST_HEAD(&pre_select_list);
+       INIT_LIST_HEAD(&post_select_list);
+};
+
+void sched_shutdown(void)
+{
+       struct task *t, *tmp;
+
+       list_for_each_entry_safe(t, tmp, &pre_select_list, pre_select_node)
+               unregister_task(t);
+       /* remove tasks which do not have a pre_select hook */
+       list_for_each_entry_safe(t, tmp, &post_select_list, post_select_node)
+               unregister_task(t);
+};
+
+
+//char *get_tast_list();
diff --git a/sched.h b/sched.h
new file mode 100644 (file)
index 0000000..3de5e1c
--- /dev/null
+++ b/sched.h
@@ -0,0 +1,31 @@
+struct sched {
+       struct timeval now, timeout;
+       int max_fileno;
+       fd_set rfds, wfds;
+       int select_ret;
+       struct timeval default_timeout;
+};
+
+struct task {
+       void *private_data;
+       unsigned flags;
+       int ret;
+       void (*pre_select)(struct sched *s, struct task *t);
+       void (*post_select)(struct sched *s, struct task *t);
+       void (*error_handler)(struct task *t);
+       struct list_head pre_select_node;
+       struct list_head post_select_node;
+       char status[MAXLINE];
+};
+
+enum task_flags {
+       PRE_ADD_TAIL = 1,
+       POST_ADD_TAIL = 2,
+       PRE_EOF_IS_ERROR = 4,
+       POST_EOF_IS_ERROR = 8,
+};
+
+void *register_task(struct task *t);
+void unregister_task(struct task *t);
+int sched(struct sched *s);
+void init_sched(void);
diff --git a/stdin.c b/stdin.c
new file mode 100644 (file)
index 0000000..cecdcb7
--- /dev/null
+++ b/stdin.c
@@ -0,0 +1,48 @@
+#include "para.h"
+#include "string.h"
+#include "list.h"
+#include "sched.h"
+#include "fd.h"
+#include "error.h"
+#include "stdin.h"
+
+void stdin_pre_select(struct sched *s, struct task *t)
+{
+       struct stdin_task *sit = t->private_data;
+       if (sit->loaded < sit->bufsize)
+               para_fd_set(STDIN_FILENO, &s->rfds, &s->max_fileno);
+       t->ret = 1; /* success */
+}
+
+void stdin_post_select(struct sched *s, struct task *t)
+{
+       struct stdin_task *sit = t->private_data;
+       ssize_t ret;
+
+       t->ret = 1;
+       if (sit->loaded >= sit->bufsize)
+               return;
+       if (!FD_ISSET(STDIN_FILENO, &s->rfds))
+               return;
+       ret = read(STDIN_FILENO, sit->buf + sit->loaded, sit->bufsize - sit->loaded);
+       if (ret < 0)
+               t->ret = -E_STDIN_READ;
+       else {
+               sit->loaded += ret;
+               t->ret = ret;
+       }
+       sprintf(t->status,
+               "%p stdin reader: loaded = %d, ret = %d",
+               sit, sit->loaded, t->ret);
+}
+
+#if 0
+void stdin_init(struct stdin_task *sit)
+{
+       sit->task.private_data = sit;
+       sit->task.pre_select = stdin_pre_select;
+       sit->task.post_select = stdin_post_select;
+       sit->task.flags = POST_EOF_IS_ERROR;
+       sprintf(sit->task.status, "%p stdin reader: initialized", &sit->task);
+}
+#endif
diff --git a/stdin.h b/stdin.h
new file mode 100644 (file)
index 0000000..9213bf4
--- /dev/null
+++ b/stdin.h
@@ -0,0 +1,9 @@
+struct stdin_task {
+       char *buf;
+       size_t bufsize;
+       size_t loaded;
+       struct task task;
+};
+
+void stdin_pre_select(struct sched *s, struct task *t);
+void stdin_post_select(struct sched *s, struct task *t);
diff --git a/write.c b/write.c
index 43c0690..508aee7 100644 (file)
--- a/write.c
+++ b/write.c
 #include "para.h"
 #include "string.h"
 #include "write.cmdline.h"
+#include "list.h"
+#include "sched.h"
+#include "stdin.h"
 #include "write.h"
 #include "write_common.h"
 #include "fd.h"
-
-#include <sys/time.h> /* gettimeofday */
-
 #include "error.h"
 
-#define WAV_HEADER_LEN 44
+INIT_WRITE_ERRLISTS;
 
-static char *audiobuf;
-static struct timeval *start_time;
-struct gengetopt_args_info conf;
+struct check_wav_task {
+       char *buf;
+       size_t *loaded;
+       unsigned channels;
+       unsigned sample_rate;
+       struct task task;
+};
 
-INIT_WRITE_ERRLISTS;
+struct initial_delay_task {
+       struct timeval start_time;
+       struct task task;
+};
 
-void para_log(int ll, const char* fmt,...)
-{
-       va_list argp;
+struct gengetopt_args_info conf;
+struct stdin_task sit;
+struct check_wav_task cwt;
+struct initial_delay_task idt;
+static struct writer_node_group *wng;
 
-       if (ll < conf.loglevel_arg)
-               return;
-       va_start(argp, fmt);
-       vfprintf(stderr, fmt, argp);
-       va_end(argp);
-}
+#define WAV_HEADER_LEN 44
 
 /**
- * read WAV_HEADER_LEN bytes from stdin to audio buffer
+ * test if audio buffer contains a valid wave header
  *
- * \return -E_READ_HDR on errors and on eof before WAV_HEADER_LEN could be
- * read. A positive return value indicates success.
+ * \return If not, return 0, otherwise, store number of channels and sample rate
+ * in struct conf and return WAV_HEADER_LEN.
  */
-static int read_wav_header(void)
+static void check_wav_pre_select(struct sched *s, struct task *t)
 {
-       ssize_t ret, count = 0;
+       struct check_wav_task *cwt = t->private_data;
+       unsigned char *a;
 
-       while (count < WAV_HEADER_LEN) {
-               ret = read(STDIN_FILENO, audiobuf + count, WAV_HEADER_LEN - count);
-               if (ret <= 0)
-                       return -E_READ_HDR;
-               count += ret;
+       if (*cwt->loaded < WAV_HEADER_LEN) {
+               t->ret = 1;
+               return;
        }
-       return 1;
+       a = (unsigned char*)cwt->buf;
+       t->ret = -E_NO_WAV_HEADER;
+       if (a[0] != 'R' || a[1] != 'I' || a[2] != 'F' || a[3] != 'F')
+               return;
+       cwt->channels = (unsigned) a[22];
+       cwt->sample_rate = a[24] + (a[25] << 8) + (a[26] << 16) + (a[27] << 24);
+       *cwt->loaded -= WAV_HEADER_LEN;
+       memmove(cwt->buf, cwt->buf + WAV_HEADER_LEN, *cwt->loaded);
+       t->ret = 0;
+       PARA_INFO_LOG("channels: %d, sample_rate: %d\n", cwt->channels, cwt->sample_rate);
 }
 
-/**
- * check if current time is later than start_time
- * \param diff pointer to write remaining time to
- *
- * If start_time was not given, or current time is later than given
- * start_time, return 0. Otherwise, return 1 and write the time
- * difference between current time and start_time to diff. diff may be
- * NULL.
- *
- */
-static int start_time_in_future(struct timeval *diff)
+static void initial_delay_pre_select(struct sched *s, struct task *t)
 {
-       struct timeval now;
-
-       if (!conf.start_time_given)
-               return 0;
-       gettimeofday(&now, NULL);
-       return tv_diff(start_time, &now, diff) > 0? 1 : 0;
-}
+       struct initial_delay_task *idt = t->private_data;
+       struct timeval diff;
 
-/**
- * sleep until time given at command line
- *
- * This is called if the initial buffer is filled. It returns
- * immediately if no start_time was given at the command line
- * or if the given start time is in the past.
- *
- */
-static void do_initial_delay(struct timeval *delay)
-{
-       do
-               para_select(1, NULL, NULL, delay);
-       while (start_time_in_future(delay));
+       PARA_ERROR_LOG("task %p, ret: %d\n", t, t->ret);
+       t->ret = -E_NO_DELAY;
+       if (!idt->start_time.tv_sec && !idt->start_time.tv_usec)
+               return;
+       t->ret = 0; /* timeout */
+       if (tv_diff(&s->now, &idt->start_time, &diff) > 0)
+               return;
+       t->ret = 1;
+       if (tv_diff(&s->timeout , &diff, NULL) > 0)
+               s->timeout = diff;
 }
 
-static int read_stdin(char *buf, size_t bytes_to_load, size_t *loaded)
-{
-       ssize_t ret;
-
-       while (*loaded < bytes_to_load) {
-               ret = read(STDIN_FILENO, buf + *loaded, bytes_to_load - *loaded);
-               if (ret <= 0) {
-                       if (ret < 0)
-                               ret = -E_READ_STDIN;
-                       return ret;
-               }
-               *loaded += ret;
-       }
-       return 1;
-}
-/**
- * play raw pcm data
- * \param loaded number of bytes already loaded
- *
- * If start_time was given, prebuffer data until buffer is full or
- * start_time is reached. In any case, do not start playing before
- * start_time.
- *
- * \return positive on success, negative on errors.
- */
-static int pcm_write(struct writer_node_group *wng, size_t loaded)
+void para_log(int ll, const char* fmt,...)
 {
-       size_t bufsize, prebuf_size, bytes_to_load;
-       struct timeval delay;
-       int ret, not_yet_started = 1;
+       va_list argp;
 
-       ret = wng_open(wng);
-       if (ret < 0)
-               goto out;
-       PARA_INFO_LOG("max chunk_bytes: %zd\n", wng->max_chunk_bytes);
-       bufsize = (conf.bufsize_arg * 1024 / wng->max_chunk_bytes)
-               * wng->max_chunk_bytes;
-       audiobuf = para_realloc(audiobuf, bufsize);
-       prebuf_size = conf.prebuffer_arg * bufsize / 100;
-       bytes_to_load =  PARA_MIN(prebuf_size, wng->max_chunk_bytes);
-       ret = read_stdin(audiobuf, bytes_to_load, &loaded);
-       if (ret <= 0 || loaded < bytes_to_load) {
-               if (ret >= 0)
-                       ret = -E_PREMATURE_END;
-               goto out;
-       }
-       if (not_yet_started && start_time && start_time_in_future(&delay))
-               do_initial_delay(&delay);
-       not_yet_started = 0;
-again:
-       ret = wng_write(wng, audiobuf, &loaded);
-       if (ret <= 0)
-               goto out;
-       ret = -E_WRITE_OVERRUN;
-       if (loaded >= bufsize)
-               goto out;
-       bytes_to_load = PARA_MIN(wng->max_chunk_bytes, bufsize);
-       ret = read_stdin(audiobuf, bytes_to_load, &loaded);
-       if (ret < 0)
-               goto out;
-       if (!ret)
-               wng->eof = 1;
-       goto again;
-out:
-       wng_close(wng);
-       return ret;
+       if (ll < conf.loglevel_arg)
+               return;
+       va_start(argp, fmt);
+       vfprintf(stderr, fmt, argp);
+       va_end(argp);
 }
 
 static struct writer_node_group *check_args(void)
 {
        int i, ret = -E_WRITE_SYNTAX;
-       static struct timeval tv;
        struct writer_node_group *wng = NULL;
 
        if (conf.list_writers_given) {
@@ -195,9 +131,8 @@ static struct writer_node_group *check_args(void)
                if (sscanf(conf.start_time_arg, "%lu:%lu",
                                &sec, &usec) != 2)
                        goto out;
-               tv.tv_sec = sec;
-               tv.tv_usec = usec;
-               start_time = &tv;
+               idt.start_time.tv_sec = sec;
+               idt.start_time.tv_usec = usec;
        }
        if (!conf.writer_given) {
                wng = setup_default_wng();
@@ -219,41 +154,88 @@ out:
        return NULL;
 }
 
-/**
- * test if audio buffer contains a valid wave header
- *
- * \return If not, return 0, otherwise, store number of channels and sample rate
- * in struct conf and return WAV_HEADER_LEN.
- */
-static size_t check_wave(void)
+static void idt_error_handler(struct task *t)
 {
-       unsigned char *a = (unsigned char*)audiobuf;
-       if (a[0] != 'R' || a[1] != 'I' || a[2] != 'F' || a[3] != 'F')
-               return WAV_HEADER_LEN;
-       conf.channels_arg = (unsigned) a[22];
-       conf.sample_rate_arg = a[24] + (a[25] << 8) + (a[26] << 16) + (a[27] << 24);
-       return 0;
+       PARA_ERROR_LOG("task %p, ret: %d\n", t, t->ret);
+       int ret;
+       unregister_task(t);
+       wng->buf = sit.buf;
+       wng->loaded = &sit.loaded;
+       ret = wng_open(wng);
+       if (ret < 0) {
+               PARA_ERROR_LOG("%s\n", PARA_STRERROR(-ret));
+               exit(EXIT_FAILURE);
+       }
+}
+
+static void cwt_error_handler(struct task *t)
+{
+       PARA_ERROR_LOG("task %p, ret: %d\n", t, t->ret);
+       if (t->ret < 0) {
+               PARA_ERROR_LOG("%s\n", PARA_STRERROR(-t->ret));
+               if (t->ret != -E_NO_WAV_HEADER && t->ret != -E_PRE_EOF)
+                       exit(EXIT_FAILURE);
+               if (t->ret == -E_PRE_EOF) {
+                       conf.channels_arg = cwt.channels;
+                       conf.sample_rate_arg = cwt.sample_rate;
+               }
+       }
+       unregister_task(t);
+       idt.task.pre_select = initial_delay_pre_select;
+       idt.task.private_data = &idt;
+       idt.task.error_handler = idt_error_handler;
+       idt.task.flags = PRE_EOF_IS_ERROR;
+       register_task(&idt.task);
+}
+
+static void stdin_error_handler(struct task *t)
+{
+       unregister_task(t);
+       PARA_INFO_LOG("task %p, ret: %d\n", t, t->ret);
+       if (t->ret < 0)
+               PARA_ERROR_LOG("%s\n", PARA_STRERROR(-t->ret));
+       wng->eof = 1;
 }
 
 int main(int argc, char *argv[])
 {
        int ret = -E_WRITE_SYNTAX;
-       struct writer_node_group *wng = NULL;
+       struct sched s;
 
        cmdline_parser(argc, argv, &conf);
        wng = check_args();
        if (!wng)
                goto out;
        init_supported_writers();
-       audiobuf = para_malloc(WAV_HEADER_LEN);
-       ret = read_wav_header();
-       if (ret < 0)
-               goto out;
-       ret = pcm_write(wng, check_wave());
+       init_sched();
+
+       sit.bufsize = 16 * 1024,
+       sit.buf = para_malloc(16 * 1024),
+       sit.loaded = 0,
+       sit.task.pre_select = stdin_pre_select;
+       sit.task.post_select = stdin_post_select;
+       sit.task.error_handler = stdin_error_handler;
+       sit.task.flags = POST_EOF_IS_ERROR;
+       sit.task.private_data = &sit;
+       register_task(&sit.task);
+
+       cwt.task.pre_select = check_wav_pre_select;
+       cwt.task.private_data = &cwt;
+       cwt.task.error_handler = cwt_error_handler;
+       cwt.buf = sit.buf;
+       cwt.loaded = &sit.loaded;
+       cwt.task.flags = PRE_EOF_IS_ERROR;
+       register_task(&cwt.task);
+
+       s.default_timeout.tv_sec = 1;
+       s.default_timeout.tv_usec = 0;
+       ret = sched(&s);
+
 out:
-       wng_destroy(wng);
-       free(audiobuf);
-       if (ret < 0)
+       if (ret < 0) {
                PARA_ERROR_LOG("%s\n", PARA_STRERROR(-ret));
+               ret = EXIT_FAILURE;
+       } else
+               ret = EXIT_SUCCESS;
        return ret;
 }
diff --git a/write.h b/write.h
index 5f8ab3a..aece231 100644 (file)
--- a/write.h
+++ b/write.h
@@ -31,6 +31,8 @@ struct writer_node {
        void *private_data;
 /** send that many bytes in one go */
        int chunk_bytes;
+       struct task task;
+       struct writer_node_group *wng;
 };
 
 /** describes one supported writer */
@@ -62,6 +64,8 @@ int (*open)(struct writer_node *);
  *
  */
 int (*write)(char *data, size_t nbytes, struct writer_node *);
+void (*pre_select)(struct sched *s, struct task *t);
+void (*post_select)(struct sched *s, struct task *t);
 /**
  * close one instance of the writer
  *
@@ -91,6 +95,9 @@ int *written;
 size_t max_chunk_bytes;
 /** non-zero if end of file was encountered */
 int eof;
+char *buf;
+size_t *loaded;
+struct task task;
 };
 
 /** loop over each writer node in a writer group */
index e76d7f4..6ff778a 100644 (file)
@@ -20,6 +20,8 @@
 
 #include "para.h"
 #include "string.h"
+#include "list.h"
+#include "sched.h"
 #include "write.h"
 #include "error.h"
 
@@ -70,6 +72,31 @@ out:
        return ret;
 }
 
+static void wng_post_select(struct sched *s, struct task *t)
+{
+       struct writer_node_group *g = t->private_data;
+       int i;
+       size_t min_written = 0;
+
+       FOR_EACH_WRITER_NODE(i, g) {
+               struct writer_node *wn = &g->writer_nodes[i];
+               t->ret = wn->task.ret;
+               if (t->ret < 0)
+                       return;
+               if (!i)
+                       min_written = t->ret;
+               else
+                       min_written = PARA_MIN(min_written, t->ret);
+       }
+       *g->loaded -= min_written;
+       if (!*g->loaded && g->eof)
+               t->ret = 0;
+       else
+               t->ret = 1;
+       if (*g->loaded && min_written)
+               memmove(g->buf, g->buf + min_written, *g->loaded);
+}
+
 int wng_open(struct writer_node_group *g)
 {
        int i, ret = 1;
@@ -81,21 +108,49 @@ int wng_open(struct writer_node_group *g)
                        goto out;
                wn->chunk_bytes = ret;
                g->max_chunk_bytes = PARA_MAX(g->max_chunk_bytes, ret);
+               wn->wng = g;
+               PARA_DEBUG_LOG("pre_select: %p\n", &wn->writer->pre_select);
+               PARA_DEBUG_LOG("post_select: %p\n", &wn->writer->post_select);
+               wn->task.pre_select = wn->writer->pre_select;
+               wn->task.post_select = wn->writer->post_select;
+               wn->task.private_data = wn;
+               register_task(&wn->task);
        }
+       register_task(&g->task);
 out:
        return ret;
 }
 
+void wng_destroy(struct writer_node_group *g)
+{
+       if (!g)
+               return;
+       free(g->written);
+       free(g->writer_nodes);
+       free(g);
+}
+
 void wng_close(struct writer_node_group *g)
 {
        int i;
 
        FOR_EACH_WRITER_NODE(i, g) {
                struct writer_node *wn = &g->writer_nodes[i];
+               unregister_task(&wn->task);
                wn->writer->close(wn);
        }
 }
 
+static void wng_error_handler(struct task *t)
+{
+       struct writer_node_group *g = t->private_data;
+
+       PARA_INFO_LOG("%p: ret = %d\n", t, t->ret);
+       unregister_task(t);
+       wng_close(g);
+       wng_destroy(g);
+}
+
 struct writer_node_group *wng_new(unsigned num_writers)
 {
        struct writer_node_group *g = para_calloc(sizeof(struct writer_node_group));
@@ -103,18 +158,13 @@ struct writer_node_group *wng_new(unsigned num_writers)
        g->writer_nodes = para_calloc(num_writers
                * sizeof(struct writer_node));
        g->written = para_calloc(num_writers * sizeof(size_t));
+       g->task.private_data = g;
+       g->task.post_select = wng_post_select;
+       g->task.error_handler = wng_error_handler;
+       g->task.flags = POST_ADD_TAIL | POST_EOF_IS_ERROR;
        return g;
 }
 
-void wng_destroy(struct writer_node_group *g)
-{
-       if (!g)
-               return;
-       free(g->written);
-       free(g->writer_nodes);
-       free(g);
-}
-
 void init_supported_writers(void)
 {
        int i;