Introduce btr_node_status() and add btr support to the file writer.
authorAndre Noll <maan@systemlinux.org>
Tue, 5 Jan 2010 01:32:21 +0000 (02:32 +0100)
committerAndre Noll <maan@systemlinux.org>
Tue, 5 Jan 2010 01:32:21 +0000 (02:32 +0100)
16 files changed:
aacdec_filter.c
alsa_write.c
amp_filter.c
buffer_tree.c
buffer_tree.h
compress_filter.c
error.h
file_write.c
filter.h
filter_common.c
mp3dec_filter.c
oggdec_filter.c
wmadec_filter.c
write.c
write_common.c
write_common.h

index cb27633..ad1c06a 100644 (file)
@@ -220,11 +220,12 @@ static void aacdec_post_select(__a_unused struct sched *s, struct task *t)
 
 next_buffer:
        t->error = 0;
-       ret = prepare_filter_node(fn);
+       ret = btr_node_status(btrn, fn->min_iqs);
        if (ret < 0)
                goto err;
        if (ret == 0)
                return;
+       btr_merge(btrn, fn->min_iqs);
        len = btr_next_buffer(btrn, (char **)&inbuf);
        iqs = btr_get_input_queue_size(btrn);
        if (!padd->initialized) {
index 8b56f31..67c845d 100644 (file)
@@ -173,10 +173,10 @@ static int alsa_write_pre_select(struct sched *s, struct writer_node *wn)
        if (!pad->handle)
                return 1;
        if (wn->btrn) {
-               size_t sz = btr_get_input_queue_size(wn->btrn);
-               if (sz < pad->bytes_per_frame) {
-                       if (!btr_no_parent(wn->btrn))
-                               return 1;
+               int ret = btr_node_status(wn->btrn, wn->min_iqs);
+               if (ret == 0)
+                       return 1;
+               if (ret < 0) {
                        underrun = 10;
                        goto timeout;
                }
@@ -213,7 +213,7 @@ timeout:
 static void alsa_write_pre_select_btr(struct sched *s, struct task *t)
 {
        struct writer_node *wn = container_of(t, struct writer_node, task);
-       t->error = alsa_write_pre_select(s, wn);
+       alsa_write_pre_select(s, wn);
 }
 
 static void xrun(snd_pcm_t *handle)
@@ -291,6 +291,7 @@ static void alsa_close(struct writer_node *wn)
                snd_pcm_close(pad->handle);
                snd_config_update_free_global();
        }
+       alsa_cmdline_parser_free(wn->conf);
        free(pad);
 }
 
@@ -307,9 +308,10 @@ static void alsa_write_post_select_btr(__a_unused struct sched *s,
 
 again:
        t->error = 0;
-       ret = prepare_writer_node(wn);
+       ret = btr_node_status(btrn, wn->min_iqs);
        if (ret == 0)
                return;
+       btr_merge(btrn, wn->min_iqs);
        bytes = btr_next_buffer(btrn, &data);
        if (bytes < pad->bytes_per_frame) { /* eof */
                assert(btr_no_parent(btrn));
@@ -388,8 +390,6 @@ again:
        ret = -E_ALSA_WRITE;
 err:
        assert(ret < 0);
-       btr_del_node(btrn);
-       alsa_close(wn);
        t->error = ret;
 }
 
index c0835f8..1012246 100644 (file)
@@ -116,14 +116,18 @@ static void amp_post_select(__a_unused struct sched *s, struct task *t)
                return;
        }
 next_buffer:
-       ret = prepare_filter_node(fn);
-       in_bytes = btr_next_buffer(btrn, (char **)&in);
+       ret = btr_node_status(btrn, fn->min_iqs);
        if (ret < 0)
                goto err;
-       len = in_bytes / 2;
-       /* len == 0 happens if eof and in_bytes == 1. */
-       if (ret == 0 || len == 0)
+       if (ret == 0)
                return;
+       btr_merge(btrn, fn->min_iqs);
+       in_bytes = btr_next_buffer(btrn, (char **)&in);
+       len = in_bytes / 2;
+       if (len == 0) { /* eof and in_bytes == 1 */
+               ret = -E_AMP_EOF;
+               goto err;
+       }
 
        if (inplace)
                out = in;
index c826e8f..3b00154 100644 (file)
@@ -404,3 +404,22 @@ void btr_log_tree(struct btr_node *btrn, int loglevel)
 {
        return log_tree_recursively(btrn, loglevel, 0);
 }
+
+/** 640K ought to be enough for everybody ;) */
+#define BTRN_MAX_PENDING (640 * 1024)
+
+int btr_node_status(struct btr_node *btrn, size_t min_iqs)
+{
+       size_t iqs;
+
+       if (btr_eof(btrn))
+               return -E_BTR_EOF;
+       if (btr_bytes_pending(btrn) > BTRN_MAX_PENDING)
+               return 0;
+       iqs = btr_get_input_queue_size(btrn);
+       if (iqs == 0) /* we have a parent, because not eof */
+               return 0;
+       if (iqs < min_iqs && !btr_no_parent(btrn))
+               return 0;
+       return 1;
+}
index 785cd27..a95d167 100644 (file)
@@ -24,3 +24,4 @@ bool btr_eof(struct btr_node *btrn);
 void btr_log_tree(struct btr_node *btrn, int loglevel);
 int btr_pushdown_one(struct btr_node *btrn);
 bool btr_inplace_ok(struct btr_node *btrn);
+int btr_node_status(struct btr_node *btrn, size_t min_iqs);
index b17549c..142ad89 100644 (file)
@@ -103,11 +103,12 @@ static void compress_post_select(__a_unused struct sched *s, struct task *t)
        //inplace = false;
 next_buffer:
        t->error = 0;
-       ret = prepare_filter_node(fn);
+       ret = btr_node_status(btrn, fn->min_iqs);
        if (ret < 0)
                goto err;
        if (ret == 0)
                return;
+       btr_merge(btrn, fn->min_iqs);
        length = btr_next_buffer(btrn, &inbuf) & ~(size_t)1;
        ip = (int16_t *)inbuf;
        if (inplace)
diff --git a/error.h b/error.h
index bf633f5..739eb6c 100644 (file)
--- a/error.h
+++ b/error.h
@@ -33,11 +33,13 @@ DEFINE_ERRLIST_OBJECT_ENUM;
 #define SERVER_COMMAND_LIST_ERRORS
 #define AFS_COMMAND_LIST_ERRORS
 #define AUDIOD_COMMAND_LIST_ERRORS
-#define BUFFER_TREE_ERRORS
 
 
 extern const char **para_errlist[];
 
+#define BUFFER_TREE_ERRORS \
+       PARA_ERROR(BTR_EOF, "buffer tree: end of file"), \
+
 #define STDOUT_ERRORS \
        PARA_ERROR(ORPHAN, "orphaned (EOF)"), \
 
@@ -107,6 +109,7 @@ extern const char **para_errlist[];
 #define AMP_FILTER_ERRORS \
        PARA_ERROR(AMP_SYNTAX, "syntax error in amp filter config"), \
        PARA_ERROR(AMP_ZERO_AMP, "no amplification necessary"), \
+       PARA_ERROR(AMP_EOF, "amp: end of file"), \
 
 
 #define SEND_COMMON_ERRORS \
index 6ba8019..a9635ca 100644 (file)
@@ -18,6 +18,7 @@
 #include "ggo.h"
 #include "buffer_tree.h"
 #include "write.h"
+#include "write_common.h"
 #include "string.h"
 #include "fd.h"
 #include "file_write.cmdline.h"
@@ -87,6 +88,25 @@ static int file_write_pre_select(struct sched *s, struct writer_node *wn)
        return 1;
 }
 
+static void file_write_pre_select_btr(struct sched *s, struct task *t)
+{
+       struct writer_node *wn = container_of(t, struct writer_node, task);
+       struct private_file_write_data *pfwd = wn->private_data;
+       int ret;
+
+       t->error = 0;
+       pfwd->check_fd = 0;
+       ret = btr_node_status(wn->btrn, wn->min_iqs);
+       if (ret >= 0) {
+               para_fd_set(pfwd->fd, &s->wfds, &s->max_fileno);
+               pfwd->check_fd = 1;
+       }
+       if (ret != 0) {
+               s->timeout.tv_sec = 0;
+               s->timeout.tv_usec = 1;
+       }
+}
+
 static int file_write_post_select(struct sched *s, struct writer_node *wn)
 {
        struct private_file_write_data *pfwd = wn->private_data;
@@ -111,10 +131,45 @@ static int file_write_post_select(struct sched *s, struct writer_node *wn)
 static void file_write_close(struct writer_node *wn)
 {
        struct private_file_write_data *pfwd = wn->private_data;
+
        close(pfwd->fd);
+       file_cmdline_parser_free(wn->conf);
        free(pfwd);
 }
 
+static void file_write_post_select_btr(__a_unused struct sched *s,
+               struct task *t)
+{
+       struct writer_node *wn = container_of(t, struct writer_node, task);
+       struct private_file_write_data *pfwd = wn->private_data;
+       struct btr_node *btrn = wn->btrn;
+       int ret;
+       char *buf;
+       size_t bytes;
+
+       t->error = 0;
+       ret = btr_node_status(btrn, wn->min_iqs);
+       if (ret == 0)
+               return;
+       if (ret < 0)
+               goto err;
+       if (!pfwd->check_fd)
+               return;
+       if (!FD_ISSET(pfwd->fd, &s->wfds))
+               return;
+       bytes = btr_next_buffer(btrn, &buf);
+       assert(bytes > 0);
+       //PARA_INFO_LOG("writing %zu\n", bytes);
+       ret = write(pfwd->fd, buf, bytes);
+       if (ret < 0)
+               goto err;
+       btr_consume(btrn, ret);
+       return;
+err:
+       assert(ret < 0);
+       t->error = ret;
+}
+
 __malloc static void *file_write_parse_config(const char *options)
 {
        struct file_write_args_info *conf
@@ -136,7 +191,9 @@ void file_write_init(struct writer *w)
        file_cmdline_parser_init(&dummy);
        w->open = file_write_open;
        w->pre_select = file_write_pre_select;
+       w->pre_select_btr = file_write_pre_select_btr;
        w->post_select = file_write_post_select;
+       w->post_select_btr = file_write_post_select_btr;
        w->parse_config = file_write_parse_config;
        w->close = file_write_close;
        w->shutdown = NULL; /* nothing to do */
index b690181..ae0a11e 100644 (file)
--- a/filter.h
+++ b/filter.h
@@ -205,7 +205,6 @@ void filter_init(void);
 int check_filter_arg(char *filter_arg, void **conf);
 void filter_post_select(__a_unused struct sched *s, struct task *t);
 void print_filter_helps(int detailed);
-int prepare_filter_node(struct filter_node *fn);
 void generic_filter_pre_select(struct sched *s, struct task *t);
 
 static inline void write_int16_host_endian(char *buf, int val)
index a27c2d3..acd84d5 100644 (file)
@@ -276,38 +276,14 @@ void print_filter_helps(int detailed)
 
 }
 
-/** 640K ought to be enough for everybody ;) */
-#define FILTER_MAX_PENDING (640 * 1024)
-
-int prepare_filter_node(struct filter_node *fn)
-{
-       struct btr_node *btrn = fn->btrn;
-       size_t iqs;
-
-       if (btr_eof(btrn))
-               return -E_FC_EOF;
-       if (btr_bytes_pending(btrn) > FILTER_MAX_PENDING)
-               return 0;
-       iqs = btr_get_input_queue_size(btrn);
-       if (iqs < fn->min_iqs && !btr_no_parent(btrn))
-               return 0;
-       assert(iqs != 0);
-       /* avoid "buffer too small" errors from the decoder */
-       btr_merge(btrn, fn->min_iqs);
-       return 1;
-}
-
 void generic_filter_pre_select(struct sched *s, struct task *t)
 {
        struct filter_node *fn = container_of(t, struct filter_node, task);
-       size_t iqs = btr_get_input_queue_size(fn->btrn);
 
        t->error = 0;
-       if (iqs < fn->min_iqs)
-               return;
-       if (btr_bytes_pending(fn->btrn) > FILTER_MAX_PENDING)
-               return; /* FIXME, should use reasonable bound on timeout */
-       s->timeout.tv_sec = 0;
-       s->timeout.tv_usec = 1;
+       if (btr_node_status(fn->btrn, fn->min_iqs) != 0) {
+               s->timeout.tv_sec = 0;
+               s->timeout.tv_usec = 1;
+       }
 }
 
index 1e074a7..a6a628e 100644 (file)
@@ -198,11 +198,12 @@ next_buffer:
        iqs = btr_get_input_queue_size(btrn);
        if (need_bad_data_delay(pmd, iqs))
                return;
-       ret = prepare_filter_node(fn);
+       ret = btr_node_status(btrn, fn->min_iqs);
        if (ret < 0)
                goto err;
        if (ret == 0)
                return;
+       btr_merge(btrn, fn->min_iqs);
        len = btr_next_buffer(btrn, &inbuffer);
        mad_stream_buffer(&pmd->stream, (unsigned char *)inbuffer, len);
 next_frame:
index ccc4255..34a04db 100644 (file)
@@ -191,11 +191,12 @@ static void ogg_post_select(__a_unused struct sched *s, struct task *t)
        char *in;
 
        t->error = 0;
-       ret = prepare_filter_node(fn);
+       ret = btr_node_status(btrn, fn->min_iqs);
        if (ret < 0)
                goto err;
        if (ret == 0)
                return;
+       btr_merge(btrn, fn->min_iqs);
        len = btr_next_buffer(btrn, &in);
        iqs = btr_get_input_queue_size(btrn);
        if (!pod->vf) {
index 214ceb1..d214b42 100644 (file)
@@ -1238,11 +1238,12 @@ static void wmadec_post_select(__a_unused struct sched *s, struct task *t)
 
 next_buffer:
        t->error = 0;
-       ret = prepare_filter_node(fn);
+       ret = btr_node_status(btrn, fn->min_iqs);
        if (ret < 0)
                goto err;
        if (ret == 0)
                return;
+       btr_merge(btrn, fn->min_iqs);
        len = btr_next_buffer(btrn, (char **)&in);
        ret = -E_WMADEC_EOF;
        if (len < fn->min_iqs)
diff --git a/write.c b/write.c
index c384894..9577f04 100644 (file)
--- a/write.c
+++ b/write.c
@@ -176,22 +176,19 @@ static void check_wav_post_select_btr(__a_unused struct sched *s, struct task *t
                PARA_NOTICE_LOG("wav header not found\n");
                cwt->state = CWS_NO_HEADER;
                sprintf(t->status, "check wav: no header");
-               goto consume;
+               goto out;
        }
        PARA_INFO_LOG("found wav header\n");
        cwt->state = CWS_HAVE_HEADER;
        sprintf(t->status, "check wav: have header");
        cwt->channels = (unsigned) a[22];
        cwt->samplerate = a[24] + (a[25] << 8) + (a[26] << 16) + (a[27] << 24);
-consume:
        PARA_INFO_LOG("channels: %d, sample rate: %d\n", cwt->channels, cwt->samplerate);
        btr_consume(cwt->btrn, WAV_HEADER_LEN);
 out:
-       if (sz) {
+       if (sz)
                btr_pushdown(cwt->btrn);
-               s->timeout.tv_sec = 0;
-               s->timeout.tv_usec = 1;
-       } else {
+       else {
                if (btr_no_parent(cwt->btrn))
                        t->error = -E_WRITE_EOF;
        }
@@ -308,6 +305,7 @@ static int main_btr(struct sched *s)
                wns[0] = setup_writer_node(NULL, cwt->btrn);
                if (!wns[0])
                        goto out;
+               i = 1;
        } else {
                wns = para_malloc(conf.writer_given * sizeof(*wns));
                for (i = 0; i < conf.writer_given; i++) {
@@ -326,8 +324,10 @@ out:
        for (i--; i >= 0; i--) {
                struct writer_node *wn = wns[i];
                struct writer *w = writers + wn->writer_num;
+
                w->close(wn);
-               free(wn->conf); /* FIXME should call gengetopt cleanup funtion */
+               btr_del_node(wn->btrn);
+               free(wn->conf);
                free(wn);
        }
        free(wns);
index ff8999f..14ce819 100644 (file)
@@ -257,7 +257,7 @@ struct writer_node *setup_writer_node(const char *arg, struct btr_node *parent)
 {
        struct writer_node *wn = para_calloc(sizeof(*wn));
        struct writer *w;
-       const char *name;
+       char *name;
 
        if (arg)
                wn->conf = check_writer_arg(arg, &wn->writer_num);
@@ -270,9 +270,10 @@ struct writer_node *setup_writer_node(const char *arg, struct btr_node *parent)
                return NULL;
        }
        w = writers + wn->writer_num;
-       name = writer_names[wn->writer_num];
+       name = make_message("%s writer", writer_names[wn->writer_num]);
        wn->btrn = btr_new_node(name, parent, w->execute, wn);
-       sprintf(wn->task.status, "%s", name);
+       strcpy(wn->task.status, name);
+       free(name);
        w->open(wn);
        wn->task.post_select = w->post_select_btr;
        wn->task.pre_select = w->pre_select_btr;
@@ -303,20 +304,3 @@ void print_writer_helps(int detailed)
                ggo_print_help(&w->help, detailed);
        }
 }
-
-int prepare_writer_node(struct writer_node *wn)
-{
-       struct btr_node *btrn = wn->btrn;
-       size_t iqs;
-
-       if (btr_eof(btrn))
-               return -E_WRITE_COMMON_EOF;
-       iqs = btr_get_input_queue_size(btrn);
-       if (iqs < wn->min_iqs && !btr_no_parent(btrn))
-               return 0;
-       assert(iqs != 0);
-       /* avoid "buffer too small" errors from the decoder */
-       btr_merge(btrn, wn->min_iqs);
-       return 1;
-}
-
index 34e21ce..f34ad3f 100644 (file)
@@ -14,4 +14,3 @@ void *check_writer_arg(const char *wa, int *writer_num);
 struct writer_node_group *setup_default_wng(void);
 void print_writer_helps(int detailed);
 struct writer_node *setup_writer_node(const char *arg, struct btr_node *parent);
-int prepare_writer_node(struct writer_node *wn);