convert para_filter to the new scheduler
authorAndre <maan@p133.(none)>
Thu, 25 May 2006 12:49:03 +0000 (14:49 +0200)
committerAndre <maan@p133.(none)>
Thu, 25 May 2006 12:49:03 +0000 (14:49 +0200)
This was again straight forward.

16 files changed:
aacdec.c
compress.c
configure.ac
error.h
filter.c
filter.h
filter_chain.c
mp3dec.c
oggdec.c
recv.c
stdin.c
stdin.h
stdout.c
stdout.h
wav.c
write.c

index e07c146..25facd5 100644 (file)
--- a/aacdec.c
+++ b/aacdec.c
@@ -25,6 +25,7 @@
 #include "para.h"
 
 #include "list.h"
+#include "sched.h"
 #include "filter.h"
 #include "error.h"
 #include "string.h"
@@ -60,7 +61,7 @@ static ssize_t aacdec(char *input_buffer, size_t len, struct filter_node *fn)
 
        if (fn->loaded > fn->bufsize * 4 / 5)
                return 0;
-       if (len < 1000 && !*fc->eof)
+       if (len < 1000 && !*fc->reader_eof)
                return 0;
 
        if (!padd->initialized) {
index 3e605d0..936ddc3 100644 (file)
@@ -25,6 +25,7 @@
 #include "para.h"
 #include "compress_filter.cmdline.h"
 #include "list.h"
+#include "sched.h"
 #include "filter.h"
 #include "string.h"
 
index 1702893..5b3e2b0 100644 (file)
@@ -61,7 +61,7 @@ recv_errlist_objs="http_recv recv_common recv time string net dccp_recv
 recv_ldflags=""
 
 filter_cmdline_objs="filter.cmdline compress_filter.cmdline"
-filter_errlist_objs="filter_chain wav compress filter string"
+filter_errlist_objs="filter_chain wav compress filter string stdin stdout sched fd"
 filter_ldflags=""
 
 audiod_cmdline_objs="audiod.cmdline grab_client.cmdline compress_filter.cmdline
diff --git a/error.h b/error.h
index aa5666b..c4d9e5f 100644 (file)
--- a/error.h
+++ b/error.h
@@ -154,6 +154,7 @@ extern const char **para_errlist[];
 #define FILTER_CHAIN_ERRORS \
        PARA_ERROR(UNSUPPORTED_FILTER, "given filter not supported"), \
        PARA_ERROR(BAD_FILTER_OPTIONS, "invalid filter option given"), \
+       PARA_ERROR(FC_EOF, "filter chain: eof"), \
 
 
 #define STAT_ERRORS \
index c84b482..79ae4dc 100644 (file)
--- a/filter.c
+++ b/filter.c
 
 #include "filter.cmdline.h"
 #include "list.h"
+#include "sched.h"
 #include "filter.h"
-#include "error.h"
 #include "string.h"
+#include "stdin.h"
+#include "stdout.h"
+#include "error.h"
 
 INIT_FILTER_ERRLISTS;
 
 #define INBUF_SIZE 32 * 1024
 
+static struct stdin_task stdin_task_struct;
+static struct stdin_task *sit = &stdin_task_struct;
 static struct filter_chain filter_chain_struct;
 static struct filter_chain *fc = &filter_chain_struct;
+static struct stdout_task stdout_task_struct;
+static struct stdout_task *sot = &stdout_task_struct;
 
 struct gengetopt_args_info conf;
 
@@ -46,23 +53,38 @@ __printf_2_3 void para_log(int ll, const char* fmt,...)
        va_end(argp);
 }
 
-static char *inbuf;
-static size_t loaded;
-static int eof;
+void filter_event_handler(struct task *t)
+{
+       PARA_ERROR_LOG("%s\n", PARA_STRERROR(-t->ret));
+       unregister_task(t);
+}
 
-static int init_active_filter_list(void)
+static void open_filters(void)
+{
+       struct filter_node *fn;
+
+       list_for_each_entry(fn, &fc->filters, node) {
+               fn->filter->open(fn);
+               PARA_INFO_LOG("opened %s filter\n", fn->filter->name);
+               fc->outbuf = fn->buf;
+               fc->out_loaded = &fn->loaded;
+       }
+}
+
+
+static int init_filter_chain(void)
 {
        int i, filter_num;
        struct filter_node *fn;
 
        INIT_LIST_HEAD(&fc->filters);
 
-       fc->inbuf = inbuf;
-       fc->in_loaded = &loaded;
-       fc->eof = &eof;
+       fc->inbuf = sit->buf;
+       fc->in_loaded = &sit->loaded;
+       fc->reader_eof = &sit->eof;
 
        for (i = 0; i < conf.filter_given; i++) {
-               char *fa = para_strdup(conf.filter_arg[i]);
+               char *fa = conf.filter_arg[i];
                fn = para_calloc(sizeof(struct filter_node));
                filter_num = check_filter_arg(fa, &fn->conf);
                if (filter_num < 0) {
@@ -77,21 +99,14 @@ static int init_active_filter_list(void)
        }
        if (list_empty(&fc->filters))
                return -E_NO_FILTERS;
+       fc->task.private_data = fc;
+       fc->task.pre_select = filter_pre_select;
+       fc->task.event_handler = filter_event_handler;
+       sprintf(fc->task.status, "filter chain");
+       open_filters();
        return 1;
 }
 
-static void open_filters(void)
-{
-       struct filter_node *fn;
-
-       list_for_each_entry(fn, &fc->filters, node) {
-               fn->filter->open(fn);
-               PARA_INFO_LOG("opened %s filter\n", fn->filter->name);
-               fc->outbuf = fn->buf;
-               fc->out_loaded = &fn->loaded;
-       }
-}
-
 static int parse_config(int argc, char *argv[])
 {
        static char *cf; /* config file */
@@ -122,55 +137,36 @@ static int parse_config(int argc, char *argv[])
 
 int main(int argc, char *argv[])
 {
-       int converted, ret;
-       char *ib, *ob; /* input/output buffer */
-       size_t *il, *ol; /* number of loaded bytes in input/output buffer */
+       int ret;
+       struct sched s;
+
+       init_sched();
+       stdin_set_defaults(sit);
+       sit->buf = para_malloc(sit->bufsize),
 
        filter_init(filters);
        ret = parse_config(argc, argv);
        if (ret < 0)
                goto out;
-       inbuf = para_malloc(INBUF_SIZE);
-       ret = init_active_filter_list();
+       ret = init_filter_chain();
        if (ret < 0)
                goto out;
-       open_filters();
-       ib = fc->inbuf;
-       ob = fc->outbuf;
-       il = fc->in_loaded;
-       ol = fc->out_loaded;
-       PARA_DEBUG_LOG("ib %p in, ob: %p\n", ib, ob);
-again:
-       if (*il < INBUF_SIZE && !eof) {
-               ret  = read(STDIN_FILENO, ib + *il, INBUF_SIZE - *il);
-               PARA_DEBUG_LOG("read %d/%zd\n", ret, INBUF_SIZE - *il);
-               if (ret < 0)
-                       goto out;
-               if (!ret)
-                       eof = 1;
-               *il += ret;
-       }
-       ret = filter_io(fc);
-       if (ret < 0)
-               goto out;
-       converted = ret;
-       if (*ol) {
-               ret = write(STDOUT_FILENO, ob, *ol);
-               PARA_DEBUG_LOG("wrote %d/%zd\n", ret, *ol);
-               if (ret <= 0)
-                       goto out;
-               *ol -= ret;
-               if (*ol) {
-                       PARA_NOTICE_LOG("short write: %zd bytes left\n", *ol);
-                       memmove(ob, ob + ret, *ol);
-               }
-       }
-       if (!eof || converted)
-               goto again;
-       ret = 0;
+
+       stdout_set_defaults(sot);
+       sot->buf = fc->outbuf;
+       sot->loaded = fc->out_loaded;
+       sot->eof = &fc->eof;
+
+       register_task(&sot->task);
+       register_task(&fc->task);
+       register_task(&sit->task);
+       s.default_timeout.tv_sec = 1;
+       s.default_timeout.tv_usec = 0;
+       ret = sched(&s);
 out:
+       free(sit->buf);
        if (ret < 0)
                PARA_EMERG_LOG("%s\n", PARA_STRERROR(-ret));
        close_filters(fc);
-       return ret;
+       return ret < 0? EXIT_FAILURE : EXIT_SUCCESS;
 }
index a6b3907..10947b9 100644 (file)
--- a/filter.h
+++ b/filter.h
@@ -75,18 +75,12 @@ struct filter_chain {
         * pointer to variable containing the number of bytes loaded in the output buffer
         */
                size_t *out_loaded;
-       /**
-        *
-        *
-        * non-zero if end of file was encountered
-        */
-               int *eof;
-       /**
-        *
-        *
-        * non-zero if an error occured
-        */
-               int error;
+       /** non-zero if this filter wont' produce any more output */
+       int eof;
+       /** pointer to the eof flag of the receiving application */
+       int *reader_eof;
+       /** the task associated with the filter chain */
+       struct task task;
 };
 
 /**
@@ -217,6 +211,7 @@ int filter_io(struct filter_chain *fc);
 void filter_init(struct filter *all_filters);
 int check_filter_arg(char *filter_arg, void **conf);
 int del_filter_callback(struct filter_callback *fcb);
+void filter_pre_select(struct sched *s, struct task *t);
 
 /**
  * the structure associated with a paraslash filter
index d755e43..2510e49 100644 (file)
@@ -20,6 +20,8 @@
 
 #include "para.h"
 #include "list.h"
+#include "sched.h"
+#include "fd.h"
 #include "filter.h"
 #include "error.h"
 #include "string.h"
@@ -95,22 +97,22 @@ static void call_callbacks(struct filter_node *fn, char *inbuf, size_t inlen,
 /**
  * call the convert function of each filter
  *
- * \param fc the filter chain containing the list of filter nodes.
- *
  * This is the core function of the filter subsystem. It loops over the list of
- * filter nodes determined by \a fc and calls the filter's convert function if
+ * filter nodes determined by \a t and calls the filter's convert function if
  * there is input available for the filter node in question. If the convert
  * function consumed some or all of its input data, all registered input
  * callbacks are called.  Similarly, if a convert function produced output, all
  * registerd output callbacks get called.
  *
- * \return The sum of output bytes produced by the convert functions on success,
- * negative return value on errors.
+ * \return The sum of output bytes produced by the convert functions on
+ * success, negative return value on errors (the return value is stored in
+ * t->ret).
  *
  * \sa filter_node, filter#convert, filter_callback
  */
-int filter_io(struct filter_chain *fc)
+void filter_pre_select(__a_unused struct sched *s, struct task *t)
 {
+       struct filter_chain *fc = t->private_data;
        struct filter_node *fn;
        char *ib;
        size_t *loaded;
@@ -120,35 +122,40 @@ again:
        loaded = fc->in_loaded;
        conv = 0;
        list_for_each_entry(fn, &fc->filters, node) {
-               int ret;
                if (*loaded && fn->loaded < fn->bufsize) {
                        size_t old_fn_loaded = fn->loaded;
                        PARA_DEBUG_LOG("fc %p loaded: %zd, calling %s convert\n",
                                fc, *loaded, fn->filter->name);
-                       ret = fn->filter->convert(ib, *loaded, fn);
-                       if (ret < 0) {
-                               if (!fc->error)
-                                       fc->error = -ret;
-                               return ret;
-                       }
-                       call_callbacks(fn, ib, ret, fn->buf + old_fn_loaded,
+                       t->ret = fn->filter->convert(ib, *loaded, fn);
+                       if (t->ret < 0)
+                               return;
+                       call_callbacks(fn, ib, t->ret, fn->buf + old_fn_loaded,
                                fn->loaded - old_fn_loaded);
-                       *loaded -= ret;
-                       conv += ret;
-                       if (*loaded && ret) {
+                       *loaded -= t->ret;
+                       conv += t->ret;
+                       if (*loaded && t->ret) {
                                PARA_DEBUG_LOG("moving %zd bytes in input buffer for %s filter\n",
                                        *loaded,  fn->filter->name);
-                               memmove(ib, ib + ret, *loaded);
+                               memmove(ib, ib + t->ret, *loaded);
                        }
                }
                ib = fn->buf;
                loaded = &fn->loaded;
        }
-//     PARA_DEBUG_LOG("loaded: %d\n", *loaded);
        conv_total += conv;
+       PARA_DEBUG_LOG("reader eof: %d, eof: %d out_loaded: %d, conv: %d, conv_total: %d\n", *fc->reader_eof,
+               fc->eof, *fc->out_loaded, conv, conv_total);
        if (conv)
                goto again;
-       return conv_total;
+       t->ret = 1;
+       if (!*fc->reader_eof)
+               return;
+       if (*fc->out_loaded)
+               return;
+       if (*fc->in_loaded && conv_total)
+               return;
+       t->ret = -E_FC_EOF;
+       fc->eof = 1;
 }
 
 /**
index c02432e..dd7887d 100644 (file)
--- a/mp3dec.c
+++ b/mp3dec.c
@@ -19,8 +19,8 @@
 /** \file mp3dec.c paraslash's mp3 decoder */
 
 #include "para.h"
-
 #include "list.h"
+#include "sched.h"
 #include "filter.h"
 #include "error.h"
 #include <mad.h>
index 637fa96..481e42a 100644 (file)
--- a/oggdec.c
+++ b/oggdec.c
@@ -22,6 +22,7 @@
 
 #include "oggdec_filter.cmdline.h"
 #include "list.h"
+#include "sched.h"
 #include "filter.h"
 #include "error.h"
 #include "string.h"
@@ -53,11 +54,11 @@ static size_t cb_read(void *buf, size_t size, size_t nmemb, void *datasource)
        size_t ret, have = pod->inbuf_len - pod->converted;
        char *p = pod->inbuf + pod->converted;
 
-       if (*fn->fc->eof)
-               return 0;
 //     PARA_DEBUG_LOG("pod = %p\n", pod);
 //     PARA_DEBUG_LOG("vorbis requests %d bytes, have %d\n", size * nmemb, have);
        if (pod->inbuf_len < size) {
+               if (*fn->fc->reader_eof)
+                       return 0;
                errno = EAGAIN;
                return -1;
        }
@@ -133,7 +134,7 @@ static ssize_t ogg_convert(char *inbuffer, size_t len, struct filter_node *fn)
 
        if (!pod->vf) {
                int ib = 1024 * conf->initial_buffer_arg; /* initial buffer */
-               if (len <ib && !*fn->fc->eof && !fn->fc->error) {
+               if (len <ib && !*fn->fc->reader_eof) {
                        PARA_INFO_LOG("initial input buffer %zd/%d, waiting for more data\n",
                                len, ib);
                        return 0;
@@ -167,7 +168,7 @@ again:
        if (ret < 0)
                return -E_OGGDEC_BADLINK;
        fn->loaded += ret;
-       if (!*fn->fc->eof && !fn->fc->error && fn->loaded < fn->bufsize)
+       if (!*fn->fc->reader_eof && fn->loaded < fn->bufsize)
                goto again;
        return pod->converted;
 }
diff --git a/recv.c b/recv.c
index 4b84e8b..f872fec 100644 (file)
--- a/recv.c
+++ b/recv.c
@@ -133,22 +133,14 @@ out:
 
 void rn_event_handler(struct task *t)
 {
-       PARA_ERROR_LOG("%s\n", PARA_STRERROR(-t->ret));
-       unregister_task(t);
-}
-
-void stdout_event_handler(struct task *t)
-{
-       PARA_ERROR_LOG("%s\n", PARA_STRERROR(-t->ret));
+       PARA_NOTICE_LOG("%s\n", PARA_STRERROR(-t->ret));
        unregister_task(t);
 }
 
 int main(int argc, char *argv[])
 {
-       int ret, eof = 0, max, r_opened = 0, receiver_num;
-       struct timeval timeout;
+       int ret, r_opened = 0, receiver_num;
        struct  receiver *r = NULL;
-       fd_set rfds, wfds;
        struct receiver_node rn;
        struct stdout_task sot;
        struct sched s;
@@ -173,12 +165,7 @@ int main(int argc, char *argv[])
                goto out;
        r_opened = 1;
 
-       sot.task.private_data = &sot;
-       sot.task.pre_select = stdout_pre_select;
-       sot.task.post_select = stdout_post_select;
-       sot.task.event_handler = stdout_event_handler;
-       sot.task.flags = 0;
-       sprintf(sot.task.status, "stdout writer");
+       stdout_set_defaults(&sot);
        sot.buf = rn.buf;
        sot.loaded = &rn.loaded;
        sot.eof = &rn.eof;
diff --git a/stdin.c b/stdin.c
index 299c326..ebfa6cc 100644 (file)
--- a/stdin.c
+++ b/stdin.c
@@ -14,6 +14,12 @@ void stdin_pre_select(struct sched *s, struct task *t)
        t->ret = 1; /* success */
 }
 
+static void stdin_default_event_handler(struct task *t)
+{
+       PARA_NOTICE_LOG("%p: %s\n", t, PARA_STRERROR(-t->ret));
+       unregister_task(t);
+}
+
 void stdin_post_select(struct sched *s, struct task *t)
 {
        struct stdin_task *sit = t->private_data;
@@ -35,3 +41,15 @@ void stdin_post_select(struct sched *s, struct task *t)
        if (t->ret < 0)
                sit->eof = 1;
 }
+
+void stdin_set_defaults(struct stdin_task *sit)
+{
+       sit->bufsize = 16 * 1024,
+       sit->loaded = 0,
+       sit->task.flags = 0,
+       sit->task.pre_select = stdin_pre_select;
+       sit->task.post_select = stdin_post_select;
+       sit->task.event_handler = stdin_default_event_handler;
+       sit->task.private_data = sit;
+       sprintf(sit->task.status, "stdin reader");
+}
diff --git a/stdin.h b/stdin.h
index 753cef5..cb6cbfb 100644 (file)
--- a/stdin.h
+++ b/stdin.h
@@ -8,3 +8,4 @@ struct stdin_task {
 
 void stdin_pre_select(struct sched *s, struct task *t);
 void stdin_post_select(struct sched *s, struct task *t);
+void stdin_set_defaults(struct stdin_task *sit);
index 8370305..af59807 100644 (file)
--- a/stdout.c
+++ b/stdout.c
@@ -38,3 +38,20 @@ void stdout_post_select(struct sched *s, struct task *t)
        *sot->loaded -= ret;
        t->ret = 1;
 }
+
+void stdout_default_event_handler(struct task *t)
+{
+       PARA_NOTICE_LOG("%p: %s\n", t, PARA_STRERROR(-t->ret));
+       unregister_task(t);
+}
+
+
+void stdout_set_defaults(struct stdout_task *sot)
+{
+       sot->task.private_data = sot;
+       sot->task.pre_select = stdout_pre_select;
+       sot->task.post_select = stdout_post_select;
+       sot->task.event_handler = stdout_default_event_handler;
+       sot->task.flags = 0;
+       sprintf(sot->task.status, "stdout writer");
+}
index f02483d..e467f41 100644 (file)
--- a/stdout.h
+++ b/stdout.h
@@ -9,3 +9,4 @@ struct stdout_task {
 
 void stdout_pre_select(struct sched *s, struct task *t);
 void stdout_post_select(struct sched *s, struct task *t);
+void stdout_set_defaults(struct stdout_task *sot);
diff --git a/wav.c b/wav.c
index b955695..0b24971 100644 (file)
--- a/wav.c
+++ b/wav.c
@@ -21,6 +21,7 @@
 #include "para.h"
 
 #include "list.h"
+#include "sched.h"
 #include "filter.h"
 #include "string.h"
 
diff --git a/write.c b/write.c
index 5bcd1f6..13b15a1 100644 (file)
--- a/write.c
+++ b/write.c
@@ -193,15 +193,6 @@ static void cwt_event_handler(struct task *t)
        register_task(&idt.task);
 }
 
-static void stdin_event_handler(struct task *t)
-{
-       unregister_task(t);
-       if (t->ret != -E_STDIN_EOF)
-               PARA_ERROR_LOG("%s\n", PARA_STRERROR(-t->ret));
-       else
-               PARA_INFO_LOG("%s\n", PARA_STRERROR(-t->ret));
-}
-
 int main(int argc, char *argv[])
 {
        int ret = -E_WRITE_SYNTAX;
@@ -214,14 +205,8 @@ int main(int argc, char *argv[])
        init_supported_writers();
        init_sched();
 
-       sit.bufsize = 16 * 1024,
-       sit.buf = para_malloc(16 * 1024),
-       sit.loaded = 0,
-       sit.task.pre_select = stdin_pre_select;
-       sit.task.post_select = stdin_post_select;
-       sit.task.event_handler = stdin_event_handler;
-       sit.task.private_data = &sit;
-       sprintf(sit.task.status, "stdin reader");
+       stdin_set_defaults(&sit);
+       sit.buf = para_malloc(sit.bufsize),
        register_task(&sit.task);
 
        cwt.task.pre_select = check_wav_pre_select;