From: Andre Date: Tue, 30 May 2006 00:49:10 +0000 (+0200) Subject: more audiod fixes X-Git-Tag: v0.2.14~96 X-Git-Url: http://git.tuebingen.mpg.de/?p=paraslash.git;a=commitdiff_plain;h=f23be61d476e3f354e31072da0e0e77050a8dc56 more audiod fixes introduce output_eof in struct receiver_node to give the receivers a chance to see if the connected consumer of the receiver data died in case of an error. Some eof fixes and cleanups also on the writer side. We're coming closer. --- diff --git a/audiod.c b/audiod.c index 56014cad..0c8ee287 100644 --- a/audiod.c +++ b/audiod.c @@ -610,12 +610,13 @@ static void open_filters(int slot_num) s->fc->inbuf = s->receiver_node->buf; s->fc->in_loaded = &s->receiver_node->loaded; s->fc->input_eof = &s->receiver_node->eof; - s->fc->task.pre_select = filter_pre_select; s->fc->task.event_handler = filter_event_handler; s->fc->task.private_data = s->fc; s->fc->task.flags = 0; s->fc->eof = 0; + + s->receiver_node->output_eof = &s->fc->eof; sprintf(s->fc->task.status, "filter chain"); for (i = 0; i < nf; i++) { struct filter_node *fn = para_calloc(sizeof(struct filter_node)); @@ -661,8 +662,20 @@ static struct filter_node *find_filter_node(int slot_num, int format, int filter static void wng_event_handler(struct task *t) { - PARA_INFO_LOG("%s\n", PARA_STRERROR(-t->ret)); - unregister_task(t); + struct writer_node_group *wng = t->private_data; + int i; + + wng_unregister(wng); + FOR_EACH_SLOT(i) { + struct slot_info *s = &slot[i]; + if (s->wng != wng) + continue; +// if (s->fc) +// s->fc->eof = 1; +// if (s->receiver_node) +// s->receiver_node->eof = 1; + PARA_INFO_LOG("slot %d: %s\n", i, PARA_STRERROR(-t->ret)); + } } static void open_writers(int slot_num) @@ -884,7 +897,6 @@ static void try_to_close_slot(int slot_num) return; PARA_INFO_LOG("closing slot %d \n", slot_num); wng_close(s->wng); - wng_destroy(s->wng); close_filters(s->fc); free(s->fc); close_receiver(slot_num); diff --git a/dccp_recv.c b/dccp_recv.c index deca7162..a9e30e8e 100644 --- a/dccp_recv.c +++ b/dccp_recv.c @@ -133,6 +133,11 @@ static void dccp_recv_post_select(struct sched *s, struct task *t) struct receiver_node *rn = t->private_data; struct private_dccp_recv_data *pdd = rn->private_data; + t->ret = -E_DCCP_RECV_EOF; + if (rn->output_eof && *rn->output_eof) { + rn->eof = 1; + return; + } t->ret = 1; if (!s->select_ret || !pdd || !FD_ISSET(pdd->fd, &s->rfds)) return; /* nothing to do */ diff --git a/filter_chain.c b/filter_chain.c index 4797b256..db0a4c00 100644 --- a/filter_chain.c +++ b/filter_chain.c @@ -141,7 +141,8 @@ again: *loaded -= t->ret; conv += t->ret; if (*loaded && t->ret) { - PARA_INFO_LOG("moving %zd bytes in input buffer for %s filter\n", + PARA_DEBUG_LOG("moving %zd bytes in input " + "buffer for %s filter\n", *loaded, fn->filter->name); memmove(ib, ib + t->ret, *loaded); } @@ -150,8 +151,10 @@ again: loaded = &fn->loaded; } conv_total += conv; -// PARA_INFO_LOG("eof (in/out/fc): %d/%d/%d out_loaded: %d, conv: %d, conv_total: %d\n", *fc->input_eof, -// *fc->output_eof, fc->eof, *fc->out_loaded, conv, conv_total); + PARA_DEBUG_LOG("eof (in/out/fc): %d/%d/%d out_loaded: %d, " + "conv: %d, conv_total: %d\n", *fc->input_eof, + fc->output_eof? *fc->output_eof : -42, + fc->eof, *fc->out_loaded, conv, conv_total); if (conv) goto again; t->ret = 1; diff --git a/http_recv.c b/http_recv.c index b566acf3..a681a003 100644 --- a/http_recv.c +++ b/http_recv.c @@ -110,6 +110,11 @@ static void http_recv_post_select(struct sched *s, struct task *t) struct receiver_node *rn = t->private_data; struct private_http_recv_data *phd = rn->private_data; + t->ret = -E_HTTP_RECV_EOF; + if (rn->output_eof && *rn->output_eof) { + rn->eof = 1; + return; + } t->ret = 1; if (!s->select_ret) /* we're not interested in timeouts */ return; diff --git a/ortp_recv.c b/ortp_recv.c index 9c901364..d8432abc 100644 --- a/ortp_recv.c +++ b/ortp_recv.c @@ -106,6 +106,11 @@ static void ortp_recv_post_select(struct sched *s, struct task *t) unsigned chunk_time; // PARA_INFO_LOG("rn: %p, pord: %p, session: %p\n", rn, pord, pord->session); + t->ret = -E_ORTP_RECV_EOF; + if (rn->output_eof && *rn->output_eof) { + rn->eof = 1; + return; + } t->ret = 1; if (pord->start.tv_sec) if (tv_diff(&s->now, &pord->next_chunk, NULL) < 0) diff --git a/recv.h b/recv.h index 3c8b543b..7fb45286 100644 --- a/recv.h +++ b/recv.h @@ -32,6 +32,7 @@ struct receiver_node { void *private_data; /** set to 1 if end of file is reached */ int eof; + int *output_eof; /** pointer to the configuration data for this instance */ void *conf; /** the task associated with this instance */ diff --git a/sched.c b/sched.c index ea79a3e8..9356dd0c 100644 --- a/sched.c +++ b/sched.c @@ -32,7 +32,7 @@ static void sched_post_select(struct sched *s) list_for_each_entry_safe(t, tmp, &post_select_list, post_select_node) { t->post_select(s, t); -// PARA_INFO_LOG("%s \n", t->status); +// PARA_INFO_LOG("%s: %d\n", t->status, t->ret); if (t->ret > 0 || !t->event_handler) continue; t->event_handler(t); diff --git a/write.c b/write.c index e82e0be0..fc50040a 100644 --- a/write.c +++ b/write.c @@ -168,7 +168,6 @@ static void wng_event_handler(struct task *t) PARA_INFO_LOG("%s\n", PARA_STRERROR(-t->ret)); unregister_task(t); wng_close(g); - wng_destroy(g); } diff --git a/write_common.c b/write_common.c index 39f124f8..50784cec 100644 --- a/write_common.c +++ b/write_common.c @@ -37,8 +37,10 @@ static void wng_post_select(__a_unused struct sched *s, struct task *t) FOR_EACH_WRITER_NODE(i, g) { struct writer_node *wn = &g->writer_nodes[i]; t->ret = wn->task.ret; - if (t->ret < 0) + if (t->ret < 0) { + g->eof = 1; return; + } if (!i) min_written = t->ret; else @@ -67,8 +69,6 @@ int wng_open(struct writer_node_group *g) goto out; wn->chunk_bytes = ret; g->max_chunk_bytes = PARA_MAX(g->max_chunk_bytes, ret); - PARA_DEBUG_LOG("pre_select: %p\n", &wn->writer->pre_select); - PARA_DEBUG_LOG("post_select: %p\n", &wn->writer->post_select); wn->task.pre_select = wn->writer->pre_select; wn->task.post_select = wn->writer->post_select; wn->task.private_data = wn; @@ -81,13 +81,15 @@ out: return ret; } -void wng_destroy(struct writer_node_group *g) +void wng_unregister(struct writer_node_group *g) { - if (!g) - return; - free(g->written); - free(g->writer_nodes); - free(g); + int i; + + FOR_EACH_WRITER_NODE(i, g) { + struct writer_node *wn = &g->writer_nodes[i]; + unregister_task(&wn->task); + } + unregister_task(&g->task); } void wng_close(struct writer_node_group *g) @@ -99,9 +101,11 @@ void wng_close(struct writer_node_group *g) PARA_NOTICE_LOG("closing wng with %d writer(s)\n", g->num_writers); FOR_EACH_WRITER_NODE(i, g) { struct writer_node *wn = &g->writer_nodes[i]; - unregister_task(&wn->task); wn->writer->close(wn); } + free(g->written); + free(g->writer_nodes); + free(g); } struct writer_node_group *wng_new(unsigned num_writers) diff --git a/write_common.h b/write_common.h index d9a8b590..cdc2b66c 100644 --- a/write_common.h +++ b/write_common.h @@ -22,7 +22,7 @@ int wng_write(struct writer_node_group *g, char *buf, size_t *loaded); int wng_open(struct writer_node_group *g); void wng_close(struct writer_node_group *g); struct writer_node_group *wng_new(unsigned num_writers); -void wng_destroy(struct writer_node_group *g); +void wng_unregister(struct writer_node_group *g); void init_supported_writers(void); void *check_writer_arg(char *wa, int *writer_num); struct writer_node_group *setup_default_wng(void);