]> git.tuebingen.mpg.de Git - paraslash.git/commitdiff
more audiod fixes
authorAndre <maan@p133.(none)>
Tue, 30 May 2006 00:49:10 +0000 (02:49 +0200)
committerAndre <maan@p133.(none)>
Tue, 30 May 2006 00:49:10 +0000 (02:49 +0200)
introduce output_eof in struct receiver_node to give the receivers
a chance to see if the connected consumer of the receiver data
died in case of an error.

Some eof fixes and cleanups also on the writer side.

We're coming closer.

audiod.c
dccp_recv.c
filter_chain.c
http_recv.c
ortp_recv.c
recv.h
sched.c
write.c
write_common.c
write_common.h

index 56014cad0e832931613abc6ac879dcde5c67c9b7..0c8ee2875873ef5c86b392448d0c80e6ec6e4540 100644 (file)
--- a/audiod.c
+++ b/audiod.c
@@ -610,12 +610,13 @@ static void open_filters(int slot_num)
        s->fc->inbuf = s->receiver_node->buf;
        s->fc->in_loaded = &s->receiver_node->loaded;
        s->fc->input_eof = &s->receiver_node->eof;
-
        s->fc->task.pre_select = filter_pre_select;
        s->fc->task.event_handler = filter_event_handler;
        s->fc->task.private_data = s->fc;
        s->fc->task.flags = 0;
        s->fc->eof = 0;
+
+       s->receiver_node->output_eof = &s->fc->eof;
        sprintf(s->fc->task.status, "filter chain");
        for (i = 0; i < nf; i++) {
                struct filter_node *fn = para_calloc(sizeof(struct filter_node));
@@ -661,8 +662,20 @@ static struct filter_node *find_filter_node(int slot_num, int format, int filter
 
 static void wng_event_handler(struct task *t)
 {
-       PARA_INFO_LOG("%s\n", PARA_STRERROR(-t->ret));
-       unregister_task(t);
+       struct writer_node_group *wng = t->private_data;
+       int i;
+
+       wng_unregister(wng);
+       FOR_EACH_SLOT(i) {
+               struct slot_info *s = &slot[i];
+               if (s->wng != wng)
+                       continue;
+//             if (s->fc)
+//                     s->fc->eof = 1;
+//             if (s->receiver_node)
+//                     s->receiver_node->eof = 1;
+               PARA_INFO_LOG("slot %d: %s\n", i, PARA_STRERROR(-t->ret));
+       }
 }
 
 static void open_writers(int slot_num)
@@ -884,7 +897,6 @@ static void try_to_close_slot(int slot_num)
                return;
        PARA_INFO_LOG("closing slot %d \n", slot_num);
        wng_close(s->wng);
-       wng_destroy(s->wng);
        close_filters(s->fc);
        free(s->fc);
        close_receiver(slot_num);
index deca7162bfecc40e93dd0d7431f7bae9218f23ec..a9e30e8e5676006f4e35f96668c6dc831a7cab7c 100644 (file)
@@ -133,6 +133,11 @@ static void dccp_recv_post_select(struct sched *s, struct task *t)
        struct receiver_node *rn = t->private_data;
        struct private_dccp_recv_data *pdd = rn->private_data;
 
+       t->ret = -E_DCCP_RECV_EOF;
+       if (rn->output_eof && *rn->output_eof) {
+               rn->eof = 1;
+               return;
+       }
        t->ret = 1;
        if (!s->select_ret || !pdd || !FD_ISSET(pdd->fd, &s->rfds))
                return; /* nothing to do */
index 4797b256caf9d6af2c868ee232f733aad19ec18f..db0a4c00bd3e50e127e00c3dccc0cbc0236e3421 100644 (file)
@@ -141,7 +141,8 @@ again:
                        *loaded -= t->ret;
                        conv += t->ret;
                        if (*loaded && t->ret) {
-                               PARA_INFO_LOG("moving %zd bytes in input buffer for %s filter\n",
+                               PARA_DEBUG_LOG("moving %zd bytes in input "
+                                       "buffer for %s filter\n",
                                        *loaded,  fn->filter->name);
                                memmove(ib, ib + t->ret, *loaded);
                        }
@@ -150,8 +151,10 @@ again:
                loaded = &fn->loaded;
        }
        conv_total += conv;
-//     PARA_INFO_LOG("eof (in/out/fc): %d/%d/%d out_loaded: %d, conv: %d, conv_total: %d\n", *fc->input_eof,
-//             *fc->output_eof, fc->eof, *fc->out_loaded, conv, conv_total);
+       PARA_DEBUG_LOG("eof (in/out/fc): %d/%d/%d out_loaded: %d, "
+               "conv: %d, conv_total: %d\n", *fc->input_eof,
+               fc->output_eof? *fc->output_eof : -42,
+               fc->eof, *fc->out_loaded, conv, conv_total);
        if (conv)
                goto again;
        t->ret = 1;
index b566acf3c0612a48d5796ca01dae5359bb93386e..a681a003dadf83f4d8ce3763f794e5215e3a8de1 100644 (file)
@@ -110,6 +110,11 @@ static void http_recv_post_select(struct sched *s, struct task *t)
        struct receiver_node *rn = t->private_data;
        struct private_http_recv_data *phd = rn->private_data;
 
+       t->ret = -E_HTTP_RECV_EOF;
+       if (rn->output_eof && *rn->output_eof) {
+               rn->eof = 1;
+               return;
+       }
        t->ret = 1;
        if (!s->select_ret) /* we're not interested in timeouts */
                return;
index 9c9013648e794b12d6c6f0750eedc079048ebaf7..d8432abc0b9dbfdc74c2f6279babfb265b318f11 100644 (file)
@@ -106,6 +106,11 @@ static void ortp_recv_post_select(struct sched *s, struct task *t)
        unsigned chunk_time;
 
 //     PARA_INFO_LOG("rn: %p, pord: %p, session: %p\n", rn, pord, pord->session);
+       t->ret = -E_ORTP_RECV_EOF;
+       if (rn->output_eof && *rn->output_eof) {
+               rn->eof = 1;
+               return;
+       }
        t->ret = 1;
        if (pord->start.tv_sec)
                if (tv_diff(&s->now, &pord->next_chunk, NULL) < 0)
diff --git a/recv.h b/recv.h
index 3c8b543bda88f50d57bb1cc975dd95085d19a392..7fb45286cd61b93fb88f3ac2a0936cbfea5a6dfd 100644 (file)
--- a/recv.h
+++ b/recv.h
@@ -32,6 +32,7 @@ struct receiver_node {
        void *private_data;
        /** set to 1 if end of file is reached */
        int eof;
+       int *output_eof;
        /** pointer to the configuration data for this instance */
        void *conf;
        /** the task associated with this instance */
diff --git a/sched.c b/sched.c
index ea79a3e854ca61bd9bf24b09be1a0e8aebf74b1b..9356dd0c4d14be11fb1284eae7227a8489acf907 100644 (file)
--- a/sched.c
+++ b/sched.c
@@ -32,7 +32,7 @@ static void sched_post_select(struct sched *s)
 
        list_for_each_entry_safe(t, tmp, &post_select_list, post_select_node) {
                t->post_select(s, t);
-//             PARA_INFO_LOG("%s \n", t->status);
+//             PARA_INFO_LOG("%s: %d\n", t->status, t->ret);
                if (t->ret > 0 || !t->event_handler)
                        continue;
                t->event_handler(t);
diff --git a/write.c b/write.c
index e82e0be04aadd0a243e4c68d919cb7aa588f5c08..fc50040a28c07df608c551a56824188b7a383bf9 100644 (file)
--- a/write.c
+++ b/write.c
@@ -168,7 +168,6 @@ static void wng_event_handler(struct task *t)
        PARA_INFO_LOG("%s\n", PARA_STRERROR(-t->ret));
        unregister_task(t);
        wng_close(g);
-       wng_destroy(g);
 }
 
 
index 39f124f832dddb169ace064443629bead6be0f7e..50784cec6b8b7815c6969eb85b8088e35dd83dd1 100644 (file)
@@ -37,8 +37,10 @@ static void wng_post_select(__a_unused struct sched *s, struct task *t)
        FOR_EACH_WRITER_NODE(i, g) {
                struct writer_node *wn = &g->writer_nodes[i];
                t->ret = wn->task.ret;
-               if (t->ret < 0)
+               if (t->ret < 0) {
+                       g->eof = 1;
                        return;
+               }
                if (!i)
                        min_written = t->ret;
                else
@@ -67,8 +69,6 @@ int wng_open(struct writer_node_group *g)
                        goto out;
                wn->chunk_bytes = ret;
                g->max_chunk_bytes = PARA_MAX(g->max_chunk_bytes, ret);
-               PARA_DEBUG_LOG("pre_select: %p\n", &wn->writer->pre_select);
-               PARA_DEBUG_LOG("post_select: %p\n", &wn->writer->post_select);
                wn->task.pre_select = wn->writer->pre_select;
                wn->task.post_select = wn->writer->post_select;
                wn->task.private_data = wn;
@@ -81,13 +81,15 @@ out:
        return ret;
 }
 
-void wng_destroy(struct writer_node_group *g)
+void wng_unregister(struct writer_node_group *g)
 {
-       if (!g)
-               return;
-       free(g->written);
-       free(g->writer_nodes);
-       free(g);
+       int i;
+
+       FOR_EACH_WRITER_NODE(i, g) {
+               struct writer_node *wn = &g->writer_nodes[i];
+               unregister_task(&wn->task);
+       }
+       unregister_task(&g->task);
 }
 
 void wng_close(struct writer_node_group *g)
@@ -99,9 +101,11 @@ void wng_close(struct writer_node_group *g)
        PARA_NOTICE_LOG("closing wng with %d writer(s)\n", g->num_writers);
        FOR_EACH_WRITER_NODE(i, g) {
                struct writer_node *wn = &g->writer_nodes[i];
-               unregister_task(&wn->task);
                wn->writer->close(wn);
        }
+       free(g->written);
+       free(g->writer_nodes);
+       free(g);
 }
 
 struct writer_node_group *wng_new(unsigned num_writers)
index d9a8b59095f278c76b82e520ec4f72ff51c14010..cdc2b66c802c2de37379701c0422dfc165dd4dd0 100644 (file)
@@ -22,7 +22,7 @@ int wng_write(struct writer_node_group *g, char *buf, size_t *loaded);
 int wng_open(struct writer_node_group *g);
 void wng_close(struct writer_node_group *g);
 struct writer_node_group *wng_new(unsigned num_writers);
-void wng_destroy(struct writer_node_group *g);
+void wng_unregister(struct writer_node_group *g);
 void init_supported_writers(void);
 void *check_writer_arg(char *wa, int *writer_num);
 struct writer_node_group *setup_default_wng(void);