more audiod fixes
authorAndre <maan@p133.(none)>
Tue, 30 May 2006 00:49:10 +0000 (02:49 +0200)
committerAndre <maan@p133.(none)>
Tue, 30 May 2006 00:49:10 +0000 (02:49 +0200)
introduce output_eof in struct receiver_node to give the receivers
a chance to see if the connected consumer of the receiver data
died in case of an error.

Some eof fixes and cleanups also on the writer side.

We're coming closer.

audiod.c
dccp_recv.c
filter_chain.c
http_recv.c
ortp_recv.c
recv.h
sched.c
write.c
write_common.c
write_common.h

index 56014ca..0c8ee28 100644 (file)
--- a/audiod.c
+++ b/audiod.c
@@ -610,12 +610,13 @@ static void open_filters(int slot_num)
        s->fc->inbuf = s->receiver_node->buf;
        s->fc->in_loaded = &s->receiver_node->loaded;
        s->fc->input_eof = &s->receiver_node->eof;
-
        s->fc->task.pre_select = filter_pre_select;
        s->fc->task.event_handler = filter_event_handler;
        s->fc->task.private_data = s->fc;
        s->fc->task.flags = 0;
        s->fc->eof = 0;
+
+       s->receiver_node->output_eof = &s->fc->eof;
        sprintf(s->fc->task.status, "filter chain");
        for (i = 0; i < nf; i++) {
                struct filter_node *fn = para_calloc(sizeof(struct filter_node));
@@ -661,8 +662,20 @@ static struct filter_node *find_filter_node(int slot_num, int format, int filter
 
 static void wng_event_handler(struct task *t)
 {
-       PARA_INFO_LOG("%s\n", PARA_STRERROR(-t->ret));
-       unregister_task(t);
+       struct writer_node_group *wng = t->private_data;
+       int i;
+
+       wng_unregister(wng);
+       FOR_EACH_SLOT(i) {
+               struct slot_info *s = &slot[i];
+               if (s->wng != wng)
+                       continue;
+//             if (s->fc)
+//                     s->fc->eof = 1;
+//             if (s->receiver_node)
+//                     s->receiver_node->eof = 1;
+               PARA_INFO_LOG("slot %d: %s\n", i, PARA_STRERROR(-t->ret));
+       }
 }
 
 static void open_writers(int slot_num)
@@ -884,7 +897,6 @@ static void try_to_close_slot(int slot_num)
                return;
        PARA_INFO_LOG("closing slot %d \n", slot_num);
        wng_close(s->wng);
-       wng_destroy(s->wng);
        close_filters(s->fc);
        free(s->fc);
        close_receiver(slot_num);
index deca716..a9e30e8 100644 (file)
@@ -133,6 +133,11 @@ static void dccp_recv_post_select(struct sched *s, struct task *t)
        struct receiver_node *rn = t->private_data;
        struct private_dccp_recv_data *pdd = rn->private_data;
 
+       t->ret = -E_DCCP_RECV_EOF;
+       if (rn->output_eof && *rn->output_eof) {
+               rn->eof = 1;
+               return;
+       }
        t->ret = 1;
        if (!s->select_ret || !pdd || !FD_ISSET(pdd->fd, &s->rfds))
                return; /* nothing to do */
index 4797b25..db0a4c0 100644 (file)
@@ -141,7 +141,8 @@ again:
                        *loaded -= t->ret;
                        conv += t->ret;
                        if (*loaded && t->ret) {
-                               PARA_INFO_LOG("moving %zd bytes in input buffer for %s filter\n",
+                               PARA_DEBUG_LOG("moving %zd bytes in input "
+                                       "buffer for %s filter\n",
                                        *loaded,  fn->filter->name);
                                memmove(ib, ib + t->ret, *loaded);
                        }
@@ -150,8 +151,10 @@ again:
                loaded = &fn->loaded;
        }
        conv_total += conv;
-//     PARA_INFO_LOG("eof (in/out/fc): %d/%d/%d out_loaded: %d, conv: %d, conv_total: %d\n", *fc->input_eof,
-//             *fc->output_eof, fc->eof, *fc->out_loaded, conv, conv_total);
+       PARA_DEBUG_LOG("eof (in/out/fc): %d/%d/%d out_loaded: %d, "
+               "conv: %d, conv_total: %d\n", *fc->input_eof,
+               fc->output_eof? *fc->output_eof : -42,
+               fc->eof, *fc->out_loaded, conv, conv_total);
        if (conv)
                goto again;
        t->ret = 1;
index b566acf..a681a00 100644 (file)
@@ -110,6 +110,11 @@ static void http_recv_post_select(struct sched *s, struct task *t)
        struct receiver_node *rn = t->private_data;
        struct private_http_recv_data *phd = rn->private_data;
 
+       t->ret = -E_HTTP_RECV_EOF;
+       if (rn->output_eof && *rn->output_eof) {
+               rn->eof = 1;
+               return;
+       }
        t->ret = 1;
        if (!s->select_ret) /* we're not interested in timeouts */
                return;
index 9c90136..d8432ab 100644 (file)
@@ -106,6 +106,11 @@ static void ortp_recv_post_select(struct sched *s, struct task *t)
        unsigned chunk_time;
 
 //     PARA_INFO_LOG("rn: %p, pord: %p, session: %p\n", rn, pord, pord->session);
+       t->ret = -E_ORTP_RECV_EOF;
+       if (rn->output_eof && *rn->output_eof) {
+               rn->eof = 1;
+               return;
+       }
        t->ret = 1;
        if (pord->start.tv_sec)
                if (tv_diff(&s->now, &pord->next_chunk, NULL) < 0)
diff --git a/recv.h b/recv.h
index 3c8b543..7fb4528 100644 (file)
--- a/recv.h
+++ b/recv.h
@@ -32,6 +32,7 @@ struct receiver_node {
        void *private_data;
        /** set to 1 if end of file is reached */
        int eof;
+       int *output_eof;
        /** pointer to the configuration data for this instance */
        void *conf;
        /** the task associated with this instance */
diff --git a/sched.c b/sched.c
index ea79a3e..9356dd0 100644 (file)
--- a/sched.c
+++ b/sched.c
@@ -32,7 +32,7 @@ static void sched_post_select(struct sched *s)
 
        list_for_each_entry_safe(t, tmp, &post_select_list, post_select_node) {
                t->post_select(s, t);
-//             PARA_INFO_LOG("%s \n", t->status);
+//             PARA_INFO_LOG("%s: %d\n", t->status, t->ret);
                if (t->ret > 0 || !t->event_handler)
                        continue;
                t->event_handler(t);
diff --git a/write.c b/write.c
index e82e0be..fc50040 100644 (file)
--- a/write.c
+++ b/write.c
@@ -168,7 +168,6 @@ static void wng_event_handler(struct task *t)
        PARA_INFO_LOG("%s\n", PARA_STRERROR(-t->ret));
        unregister_task(t);
        wng_close(g);
-       wng_destroy(g);
 }
 
 
index 39f124f..50784ce 100644 (file)
@@ -37,8 +37,10 @@ static void wng_post_select(__a_unused struct sched *s, struct task *t)
        FOR_EACH_WRITER_NODE(i, g) {
                struct writer_node *wn = &g->writer_nodes[i];
                t->ret = wn->task.ret;
-               if (t->ret < 0)
+               if (t->ret < 0) {
+                       g->eof = 1;
                        return;
+               }
                if (!i)
                        min_written = t->ret;
                else
@@ -67,8 +69,6 @@ int wng_open(struct writer_node_group *g)
                        goto out;
                wn->chunk_bytes = ret;
                g->max_chunk_bytes = PARA_MAX(g->max_chunk_bytes, ret);
-               PARA_DEBUG_LOG("pre_select: %p\n", &wn->writer->pre_select);
-               PARA_DEBUG_LOG("post_select: %p\n", &wn->writer->post_select);
                wn->task.pre_select = wn->writer->pre_select;
                wn->task.post_select = wn->writer->post_select;
                wn->task.private_data = wn;
@@ -81,13 +81,15 @@ out:
        return ret;
 }
 
-void wng_destroy(struct writer_node_group *g)
+void wng_unregister(struct writer_node_group *g)
 {
-       if (!g)
-               return;
-       free(g->written);
-       free(g->writer_nodes);
-       free(g);
+       int i;
+
+       FOR_EACH_WRITER_NODE(i, g) {
+               struct writer_node *wn = &g->writer_nodes[i];
+               unregister_task(&wn->task);
+       }
+       unregister_task(&g->task);
 }
 
 void wng_close(struct writer_node_group *g)
@@ -99,9 +101,11 @@ void wng_close(struct writer_node_group *g)
        PARA_NOTICE_LOG("closing wng with %d writer(s)\n", g->num_writers);
        FOR_EACH_WRITER_NODE(i, g) {
                struct writer_node *wn = &g->writer_nodes[i];
-               unregister_task(&wn->task);
                wn->writer->close(wn);
        }
+       free(g->written);
+       free(g->writer_nodes);
+       free(g);
 }
 
 struct writer_node_group *wng_new(unsigned num_writers)
index d9a8b59..cdc2b66 100644 (file)
@@ -22,7 +22,7 @@ int wng_write(struct writer_node_group *g, char *buf, size_t *loaded);
 int wng_open(struct writer_node_group *g);
 void wng_close(struct writer_node_group *g);
 struct writer_node_group *wng_new(unsigned num_writers);
-void wng_destroy(struct writer_node_group *g);
+void wng_unregister(struct writer_node_group *g);
 void init_supported_writers(void);
 void *check_writer_arg(char *wa, int *writer_num);
 struct writer_node_group *setup_default_wng(void);