introduce input_eof and ouput_eof
authorAndre <maan@p133.(none)>
Thu, 25 May 2006 13:53:00 +0000 (15:53 +0200)
committerAndre <maan@p133.(none)>
Thu, 25 May 2006 13:53:00 +0000 (15:53 +0200)
para_filter/para_audiod needs this. For example, it is pointless to
convert more audiod data if the writing application is no longer active.

14 files changed:
aacdec.c
alsa_writer.c
audiod.c
filter.c
filter.h
filter_chain.c
oggdec.c
recv.c
stdin.c
stdout.c
stdout.h
write.c
write.h
write_common.c

index 25facd5..fe046b7 100644 (file)
--- a/aacdec.c
+++ b/aacdec.c
@@ -61,7 +61,7 @@ static ssize_t aacdec(char *input_buffer, size_t len, struct filter_node *fn)
 
        if (fn->loaded > fn->bufsize * 4 / 5)
                return 0;
-       if (len < 1000 && !*fc->reader_eof)
+       if (len < 1000 && !*fc->input_eof)
                return 0;
 
        if (!padd->initialized) {
index ad65c92..f05dbbd 100644 (file)
@@ -140,7 +140,7 @@ static void alsa_write_pre_select(struct sched *s, struct task *t)
        struct timeval diff;
 
        t->ret = 0;
-       if (*wng->eof && *wng->loaded < pad->bytes_per_frame)
+       if (*wng->input_eof && *wng->loaded < pad->bytes_per_frame)
                return;
        t->ret = 1;
        if (*wng->loaded < pad->bytes_per_frame)
@@ -165,7 +165,7 @@ static void alsa_write_post_select(struct sched *s, struct task *t)
 
        t->ret = 0;
        if (!frames) {
-               if (*wng->eof)
+               if (*wng->input_eof)
                        t->ret = *wng->loaded;
                return;
        }
index 1d2d69c..db4d478 100644 (file)
--- a/audiod.c
+++ b/audiod.c
@@ -451,7 +451,6 @@ static void kill_stream_writer(int slot_num)
                s->wpid, audio_formats[s->format], slot_num);
        kill(s->wpid, SIGTERM);
        s->wkilled = 1;
-       s->fci->error = 1;
 }
 
 static void set_restart_barrier(int format, struct timeval *now)
@@ -928,16 +927,15 @@ static void close_decoder_if_idle(int slot_num)
                return;
        if (!s->fci)
                return;
-       if (!rn->eof && !s->fci->error && s->wpid > 0)
+       if (!rn->eof && !s->fc->eof && s->wpid > 0)
                return;
-       if (!s->fci->error && s->wpid > 0) { /* eof */
+       if (!s->fci->eof && s->wpid > 0) { /* eof */
                if (filter_io(s->fci) > 0)
                        return;
                if (get_loaded_bytes(slot_num))
                        return;
        }
        if (s->write_fd > 0) {
-               PARA_INFO_LOG("err: %d\n", s->fci->error);
                PARA_INFO_LOG("slot %d: closing write fd %d\n", slot_num,
                        s->write_fd);
                close(s->write_fd);
index 79ae4dc..cfa2e94 100644 (file)
--- a/filter.c
+++ b/filter.c
@@ -55,7 +55,7 @@ __printf_2_3 void para_log(int ll, const char* fmt,...)
 
 void filter_event_handler(struct task *t)
 {
-       PARA_ERROR_LOG("%s\n", PARA_STRERROR(-t->ret));
+       PARA_NOTICE_LOG("%s\n", PARA_STRERROR(-t->ret));
        unregister_task(t);
 }
 
@@ -81,7 +81,13 @@ static int init_filter_chain(void)
 
        fc->inbuf = sit->buf;
        fc->in_loaded = &sit->loaded;
-       fc->reader_eof = &sit->eof;
+       fc->input_eof = &sit->eof;
+       fc->eof = 0;
+       fc->output_eof = &sot->eof;
+       fc->task.private_data = fc;
+       fc->task.pre_select = filter_pre_select;
+       fc->task.event_handler = filter_event_handler;
+       sprintf(fc->task.status, "filter chain");
 
        for (i = 0; i < conf.filter_given; i++) {
                char *fa = conf.filter_arg[i];
@@ -99,10 +105,6 @@ static int init_filter_chain(void)
        }
        if (list_empty(&fc->filters))
                return -E_NO_FILTERS;
-       fc->task.private_data = fc;
-       fc->task.pre_select = filter_pre_select;
-       fc->task.event_handler = filter_event_handler;
-       sprintf(fc->task.status, "filter chain");
        open_filters();
        return 1;
 }
@@ -153,15 +155,17 @@ int main(int argc, char *argv[])
                goto out;
 
        stdout_set_defaults(sot);
+       PARA_EMERG_LOG("fc->output_eof: %d\n", *fc->output_eof);
        sot->buf = fc->outbuf;
        sot->loaded = fc->out_loaded;
-       sot->eof = &fc->eof;
+       sot->input_eof = &fc->eof;
 
        register_task(&sot->task);
        register_task(&fc->task);
        register_task(&sit->task);
        s.default_timeout.tv_sec = 1;
        s.default_timeout.tv_usec = 0;
+       PARA_EMERG_LOG("fc->output_eof: %d\n", *fc->output_eof);
        ret = sched(&s);
 out:
        free(sit->buf);
index 10947b9..45ade8d 100644 (file)
--- a/filter.h
+++ b/filter.h
@@ -78,7 +78,9 @@ struct filter_chain {
        /** non-zero if this filter wont' produce any more output */
        int eof;
        /** pointer to the eof flag of the receiving application */
-       int *reader_eof;
+       int *input_eof;
+       /** pointer to the eof flag of the writing application */
+       int *output_eof;
        /** the task associated with the filter chain */
        struct task task;
 };
index 2510e49..5b87b08 100644 (file)
@@ -117,6 +117,10 @@ void filter_pre_select(__a_unused struct sched *s, struct task *t)
        char *ib;
        size_t *loaded;
        int conv, conv_total = 0;
+
+       t->ret = -E_FC_EOF;
+       if (*fc->output_eof)
+               goto err_out;
 again:
        ib = fc->inbuf;
        loaded = fc->in_loaded;
@@ -128,7 +132,7 @@ again:
                                fc, *loaded, fn->filter->name);
                        t->ret = fn->filter->convert(ib, *loaded, fn);
                        if (t->ret < 0)
-                               return;
+                               goto err_out;
                        call_callbacks(fn, ib, t->ret, fn->buf + old_fn_loaded,
                                fn->loaded - old_fn_loaded);
                        *loaded -= t->ret;
@@ -143,18 +147,19 @@ again:
                loaded = &fn->loaded;
        }
        conv_total += conv;
-       PARA_DEBUG_LOG("reader eof: %d, eof: %d out_loaded: %d, conv: %d, conv_total: %d\n", *fc->reader_eof,
-               fc->eof, *fc->out_loaded, conv, conv_total);
+       PARA_DEBUG_LOG("eof (in/out/fc): %d/%d/%d out_loaded: %d, conv: %d, conv_total: %d\n", *fc->input_eof,
+               *fc->output_eof, fc->eof, *fc->out_loaded, conv, conv_total);
        if (conv)
                goto again;
        t->ret = 1;
-       if (!*fc->reader_eof)
+       if (!*fc->input_eof)
                return;
        if (*fc->out_loaded)
                return;
        if (*fc->in_loaded && conv_total)
                return;
        t->ret = -E_FC_EOF;
+err_out:
        fc->eof = 1;
 }
 
index 481e42a..e1c4517 100644 (file)
--- a/oggdec.c
+++ b/oggdec.c
@@ -57,7 +57,7 @@ static size_t cb_read(void *buf, size_t size, size_t nmemb, void *datasource)
 //     PARA_DEBUG_LOG("pod = %p\n", pod);
 //     PARA_DEBUG_LOG("vorbis requests %d bytes, have %d\n", size * nmemb, have);
        if (pod->inbuf_len < size) {
-               if (*fn->fc->reader_eof)
+               if (*fn->fc->input_eof)
                        return 0;
                errno = EAGAIN;
                return -1;
@@ -134,7 +134,7 @@ static ssize_t ogg_convert(char *inbuffer, size_t len, struct filter_node *fn)
 
        if (!pod->vf) {
                int ib = 1024 * conf->initial_buffer_arg; /* initial buffer */
-               if (len <ib && !*fn->fc->reader_eof) {
+               if (len <ib && !*fn->fc->input_eof) {
                        PARA_INFO_LOG("initial input buffer %zd/%d, waiting for more data\n",
                                len, ib);
                        return 0;
@@ -168,7 +168,7 @@ again:
        if (ret < 0)
                return -E_OGGDEC_BADLINK;
        fn->loaded += ret;
-       if (!*fn->fc->reader_eof && fn->loaded < fn->bufsize)
+       if (!*fn->fc->input_eof && fn->loaded < fn->bufsize)
                goto again;
        return pod->converted;
 }
diff --git a/recv.c b/recv.c
index f872fec..cf178d2 100644 (file)
--- a/recv.c
+++ b/recv.c
@@ -58,79 +58,6 @@ static void *parse_config(int argc, char *argv[], int *receiver_num)
        return check_receiver_arg(conf.receiver_arg, receiver_num);
 }
 
-#if 0
-int main(int argc, char *argv[])
-{
-       int ret, eof = 0, max, r_opened = 0, receiver_num;
-       struct timeval timeout;
-       struct  receiver *r = NULL;
-       fd_set rfds, wfds;
-       struct receiver_node rn;
-
-       memset(&rn, 0, sizeof(struct receiver_node));
-       for (ret = 0; receivers[ret].name; ret++)
-               receivers[ret].init(&receivers[ret]);
-       ret = -E_RECV_SYNTAX;
-       rn.conf = parse_config(argc, argv, &receiver_num);
-       if (!rn.conf) {
-               PARA_EMERG_LOG("%s", "parse failed\n");
-               goto out;
-       }
-       r = &receivers[receiver_num];
-       rn.receiver = r;
-       ret = r->open(&rn);
-       if (ret < 0)
-               goto out;
-       r_opened = 1;
-recv:
-       FD_ZERO(&rfds);
-       FD_ZERO(&wfds);
-       timeout.tv_sec = 0;
-       timeout.tv_usec = 999 * 1000;
-       max = -1;
-       ret = r->pre_select(&rn, &rfds, &wfds, &timeout);
-       max = PARA_MAX(max, ret);
-
-       PARA_DEBUG_LOG("timeout: %lums, max: %d\n", tv2ms(&timeout), max);
-       ret = para_select(max + 1, &rfds, &wfds, &timeout);
-       if (ret < 0) {
-               ret = -E_RECV_SELECT;
-               goto out;
-       }
-       ret = r->post_select(&rn, ret, &rfds, &wfds);
-       if (ret < 0)
-               goto out;
-       if (!ret)
-               eof = 1;
-       if (!rn.loaded) {
-               if (eof)
-                       goto out;
-               goto recv;
-       }
-       ret = write(STDOUT_FILENO, rn.buf, rn.loaded);
-       PARA_DEBUG_LOG("wrote %d/%zd\n", ret, rn.loaded);
-       if (ret < 0) {
-               ret = -E_WRITE_STDOUT;
-               goto out;
-       }
-       if (ret != rn.loaded) {
-               PARA_INFO_LOG("short write %d/%zd\n", ret, rn.loaded);
-               memmove(rn.buf, rn.buf + ret, rn.loaded - ret);
-       }
-       rn.loaded -= ret;
-       if (rn.loaded || !eof)
-               goto recv;
-out:
-       if (r_opened)
-               r->close(&rn);
-       if (r)
-               r->shutdown();
-       if (ret < 0)
-               PARA_ERROR_LOG("%s\n", PARA_STRERROR(-ret));
-       return ret;
-}
-#endif
-
 void rn_event_handler(struct task *t)
 {
        PARA_NOTICE_LOG("%s\n", PARA_STRERROR(-t->ret));
@@ -168,7 +95,7 @@ int main(int argc, char *argv[])
        stdout_set_defaults(&sot);
        sot.buf = rn.buf;
        sot.loaded = &rn.loaded;
-       sot.eof = &rn.eof;
+       sot.input_eof = &rn.eof;
        register_task(&sot.task);
 
        rn.task.private_data = &rn;
diff --git a/stdin.c b/stdin.c
index ebfa6cc..22f1dd9 100644 (file)
--- a/stdin.c
+++ b/stdin.c
@@ -46,6 +46,7 @@ void stdin_set_defaults(struct stdin_task *sit)
 {
        sit->bufsize = 16 * 1024,
        sit->loaded = 0,
+       sit->eof = 0,
        sit->task.flags = 0,
        sit->task.pre_select = stdin_pre_select;
        sit->task.post_select = stdin_post_select;
index af59807..1c60669 100644 (file)
--- a/stdout.c
+++ b/stdout.c
@@ -25,7 +25,7 @@ void stdout_post_select(struct sched *s, struct task *t)
 
        t->ret = 1;
        if (!sot->check_fd) {
-               if (*sot->eof)
+               if (*sot->input_eof)
                        t->ret = -E_STDOUT_EOF;
                return;
        }
@@ -53,5 +53,6 @@ void stdout_set_defaults(struct stdout_task *sot)
        sot->task.post_select = stdout_post_select;
        sot->task.event_handler = stdout_default_event_handler;
        sot->task.flags = 0;
+       sot->eof = 0;
        sprintf(sot->task.status, "stdout writer");
 }
index e467f41..5e45ace 100644 (file)
--- a/stdout.h
+++ b/stdout.h
@@ -2,7 +2,8 @@ struct stdout_task {
        char *buf;
        size_t *bufsize;
        size_t *loaded;
-       int *eof;
+       int *input_eof;
+       int eof;
        struct task task;
        int check_fd;
 };
diff --git a/write.c b/write.c
index 13b15a1..a3cd4b5 100644 (file)
--- a/write.c
+++ b/write.c
@@ -150,9 +150,8 @@ static struct writer_node_group *check_args(void)
        }
        ret = 1;
 out:
-       if (ret > 0) {
+       if (ret > 0)
                return wng;
-       }
        free(wng);
        return NULL;
 }
@@ -165,8 +164,7 @@ static void idt_event_handler(struct task *t)
        unregister_task(t);
        wng->buf = sit.buf;
        wng->loaded = &sit.loaded;
-       wng->eof = &sit.eof;
-       sprintf(wng->task.status, "%s", "writer node group");
+       wng->input_eof = &sit.eof;
        ret = wng_open(wng);
        if (ret < 0) {
                PARA_ERROR_LOG("%s\n", PARA_STRERROR(-ret));
diff --git a/write.h b/write.h
index e6e0a76..04ad09d 100644 (file)
--- a/write.h
+++ b/write.h
@@ -85,19 +85,20 @@ void (*shutdown)(struct writer_node *);
  * describes a set of writer nodes that all write the same stream.
  */
 struct writer_node_group {
-/** number of nodes belonging to this group */
-unsigned num_writers;
-/** array of pointers to the corresponding writer nodes */
-struct writer_node *writer_nodes;
-/** keeps track of how many bytes have been written by each node */
-int *written;
-/** the maximum of the chunk_bytes values of the writer nodes in this group */
-size_t max_chunk_bytes;
-/** non-zero if end of file was encountered */
-int *eof;
-char *buf;
-size_t *loaded;
-struct task task;
+       /** number of nodes belonging to this group */
+       unsigned num_writers;
+       /** array of pointers to the corresponding writer nodes */
+       struct writer_node *writer_nodes;
+       /** keeps track of how many bytes have been written by each node */
+       int *written;
+       /** the maximum of the chunk_bytes values of the writer nodes in this group */
+       size_t max_chunk_bytes;
+       /** non-zero if end of file was encountered */
+       int *input_eof;
+       int eof;
+       char *buf;
+       size_t *loaded;
+       struct task task;
 };
 
 /** loop over each writer node in a writer group */
index 0b2772a..6c0faa7 100644 (file)
@@ -45,7 +45,7 @@ static void wng_post_select(struct sched *s, struct task *t)
                        min_written = PARA_MIN(min_written, t->ret);
        }
        *g->loaded -= min_written;
-       if (!*g->loaded && *g->eof)
+       if (!*g->loaded && *g->input_eof)
                t->ret = -E_WNG_EOF;
        else
                t->ret = 1;
@@ -72,6 +72,8 @@ int wng_open(struct writer_node_group *g)
                wn->task.private_data = wn;
                register_task(&wn->task);
        }
+       sprintf(g->task.status, "%s", "writer node group");
+       g->eof = 0;
        register_task(&g->task);
 out:
        return ret;