From b0cad48a864fe3f621138e717ff025060c396fad Mon Sep 17 00:00:00 2001
From: Andre <maan@p133.(none)>
Date: Thu, 25 May 2006 15:53:00 +0200
Subject: [PATCH] introduce input_eof and ouput_eof

para_filter/para_audiod needs this. For example, it is pointless to
convert more audiod data if the writing application is no longer active.
---
 aacdec.c       |  2 +-
 alsa_writer.c  |  4 +--
 audiod.c       |  6 ++--
 filter.c       | 18 +++++++-----
 filter.h       |  4 ++-
 filter_chain.c | 13 ++++++---
 oggdec.c       |  6 ++--
 recv.c         | 75 +-------------------------------------------------
 stdin.c        |  1 +
 stdout.c       |  3 +-
 stdout.h       |  3 +-
 write.c        |  6 ++--
 write.h        | 27 +++++++++---------
 write_common.c |  4 ++-
 14 files changed, 56 insertions(+), 116 deletions(-)

diff --git a/aacdec.c b/aacdec.c
index 25facd53..fe046b70 100644
--- a/aacdec.c
+++ b/aacdec.c
@@ -61,7 +61,7 @@ static ssize_t aacdec(char *input_buffer, size_t len, struct filter_node *fn)
 
 	if (fn->loaded > fn->bufsize * 4 / 5)
 		return 0;
-	if (len < 1000 && !*fc->reader_eof)
+	if (len < 1000 && !*fc->input_eof)
 		return 0;
 
 	if (!padd->initialized) {
diff --git a/alsa_writer.c b/alsa_writer.c
index ad65c92f..f05dbbd0 100644
--- a/alsa_writer.c
+++ b/alsa_writer.c
@@ -140,7 +140,7 @@ static void alsa_write_pre_select(struct sched *s, struct task *t)
 	struct timeval diff;
 
 	t->ret = 0;
-	if (*wng->eof && *wng->loaded < pad->bytes_per_frame)
+	if (*wng->input_eof && *wng->loaded < pad->bytes_per_frame)
 		return;
 	t->ret = 1;
 	if (*wng->loaded < pad->bytes_per_frame)
@@ -165,7 +165,7 @@ static void alsa_write_post_select(struct sched *s, struct task *t)
 
 	t->ret = 0;
 	if (!frames) {
-		if (*wng->eof)
+		if (*wng->input_eof)
 			t->ret = *wng->loaded;
 		return;
 	}
diff --git a/audiod.c b/audiod.c
index 1d2d69c1..db4d478c 100644
--- a/audiod.c
+++ b/audiod.c
@@ -451,7 +451,6 @@ static void kill_stream_writer(int slot_num)
 		s->wpid, audio_formats[s->format], slot_num);
 	kill(s->wpid, SIGTERM);
 	s->wkilled = 1;
-	s->fci->error = 1;
 }
 
 static void set_restart_barrier(int format, struct timeval *now)
@@ -928,16 +927,15 @@ static void close_decoder_if_idle(int slot_num)
 		return;
 	if (!s->fci)
 		return;
-	if (!rn->eof && !s->fci->error && s->wpid > 0)
+	if (!rn->eof && !s->fc->eof && s->wpid > 0)
 		return;
-	if (!s->fci->error && s->wpid > 0) { /* eof */
+	if (!s->fci->eof && s->wpid > 0) { /* eof */
 		if (filter_io(s->fci) > 0)
 			return;
 		if (get_loaded_bytes(slot_num))
 			return;
 	}
 	if (s->write_fd > 0) {
-		PARA_INFO_LOG("err: %d\n", s->fci->error);
 		PARA_INFO_LOG("slot %d: closing write fd %d\n", slot_num,
 			s->write_fd);
 		close(s->write_fd);
diff --git a/filter.c b/filter.c
index 79ae4dcb..cfa2e94f 100644
--- a/filter.c
+++ b/filter.c
@@ -55,7 +55,7 @@ __printf_2_3 void para_log(int ll, const char* fmt,...)
 
 void filter_event_handler(struct task *t)
 {
-	PARA_ERROR_LOG("%s\n", PARA_STRERROR(-t->ret));
+	PARA_NOTICE_LOG("%s\n", PARA_STRERROR(-t->ret));
 	unregister_task(t);
 }
 
@@ -81,7 +81,13 @@ static int init_filter_chain(void)
 
 	fc->inbuf = sit->buf;
 	fc->in_loaded = &sit->loaded;
-	fc->reader_eof = &sit->eof;
+	fc->input_eof = &sit->eof;
+	fc->eof = 0;
+	fc->output_eof = &sot->eof;
+	fc->task.private_data = fc;
+	fc->task.pre_select = filter_pre_select;
+	fc->task.event_handler = filter_event_handler;
+	sprintf(fc->task.status, "filter chain");
 
 	for (i = 0; i < conf.filter_given; i++) {
 		char *fa = conf.filter_arg[i];
@@ -99,10 +105,6 @@ static int init_filter_chain(void)
 	}
 	if (list_empty(&fc->filters))
 		return -E_NO_FILTERS;
-	fc->task.private_data = fc;
-	fc->task.pre_select = filter_pre_select;
-	fc->task.event_handler = filter_event_handler;
-	sprintf(fc->task.status, "filter chain");
 	open_filters();
 	return 1;
 }
@@ -153,15 +155,17 @@ int main(int argc, char *argv[])
 		goto out;
 
 	stdout_set_defaults(sot);
+	PARA_EMERG_LOG("fc->output_eof: %d\n", *fc->output_eof);
 	sot->buf = fc->outbuf;
 	sot->loaded = fc->out_loaded;
-	sot->eof = &fc->eof;
+	sot->input_eof = &fc->eof;
 
 	register_task(&sot->task);
 	register_task(&fc->task);
 	register_task(&sit->task);
 	s.default_timeout.tv_sec = 1;
 	s.default_timeout.tv_usec = 0;
+	PARA_EMERG_LOG("fc->output_eof: %d\n", *fc->output_eof);
 	ret = sched(&s);
 out:
 	free(sit->buf);
diff --git a/filter.h b/filter.h
index 10947b9e..45ade8d3 100644
--- a/filter.h
+++ b/filter.h
@@ -78,7 +78,9 @@ struct filter_chain {
 	/** non-zero if this filter wont' produce any more output */
 	int eof;
 	/** pointer to the eof flag of the receiving application */
-	int *reader_eof;
+	int *input_eof;
+	/** pointer to the eof flag of the writing application */
+	int *output_eof;
 	/** the task associated with the filter chain */
 	struct task task;
 };
diff --git a/filter_chain.c b/filter_chain.c
index 2510e49d..5b87b087 100644
--- a/filter_chain.c
+++ b/filter_chain.c
@@ -117,6 +117,10 @@ void filter_pre_select(__a_unused struct sched *s, struct task *t)
 	char *ib;
 	size_t *loaded;
 	int conv, conv_total = 0;
+
+	t->ret = -E_FC_EOF;
+	if (*fc->output_eof)
+		goto err_out;
 again:
 	ib = fc->inbuf;
 	loaded = fc->in_loaded;
@@ -128,7 +132,7 @@ again:
 				fc, *loaded, fn->filter->name);
 			t->ret = fn->filter->convert(ib, *loaded, fn);
 			if (t->ret < 0)
-				return;
+				goto err_out;
 			call_callbacks(fn, ib, t->ret, fn->buf + old_fn_loaded,
 				fn->loaded - old_fn_loaded);
 			*loaded -= t->ret;
@@ -143,18 +147,19 @@ again:
 		loaded = &fn->loaded;
 	}
 	conv_total += conv;
-	PARA_DEBUG_LOG("reader eof: %d, eof: %d out_loaded: %d, conv: %d, conv_total: %d\n", *fc->reader_eof,
-		fc->eof, *fc->out_loaded, conv, conv_total);
+	PARA_DEBUG_LOG("eof (in/out/fc): %d/%d/%d out_loaded: %d, conv: %d, conv_total: %d\n", *fc->input_eof,
+		*fc->output_eof, fc->eof, *fc->out_loaded, conv, conv_total);
 	if (conv)
 		goto again;
 	t->ret = 1;
-	if (!*fc->reader_eof)
+	if (!*fc->input_eof)
 		return;
 	if (*fc->out_loaded)
 		return;
 	if (*fc->in_loaded && conv_total)
 		return;
 	t->ret = -E_FC_EOF;
+err_out:
 	fc->eof = 1;
 }
 
diff --git a/oggdec.c b/oggdec.c
index 481e42ac..e1c4517e 100644
--- a/oggdec.c
+++ b/oggdec.c
@@ -57,7 +57,7 @@ static size_t cb_read(void *buf, size_t size, size_t nmemb, void *datasource)
 //	PARA_DEBUG_LOG("pod = %p\n", pod);
 //	PARA_DEBUG_LOG("vorbis requests %d bytes, have %d\n", size * nmemb, have);
 	if (pod->inbuf_len < size) {
-		if (*fn->fc->reader_eof)
+		if (*fn->fc->input_eof)
 			return 0;
 		errno = EAGAIN;
 		return -1;
@@ -134,7 +134,7 @@ static ssize_t ogg_convert(char *inbuffer, size_t len, struct filter_node *fn)
 
 	if (!pod->vf) {
 		int ib = 1024 * conf->initial_buffer_arg; /* initial buffer */
-		if (len <ib && !*fn->fc->reader_eof) {
+		if (len <ib && !*fn->fc->input_eof) {
 			PARA_INFO_LOG("initial input buffer %zd/%d, waiting for more data\n",
 				len, ib);
 			return 0;
@@ -168,7 +168,7 @@ again:
 	if (ret < 0)
 		return -E_OGGDEC_BADLINK;
 	fn->loaded += ret;
-	if (!*fn->fc->reader_eof && fn->loaded < fn->bufsize)
+	if (!*fn->fc->input_eof && fn->loaded < fn->bufsize)
 		goto again;
 	return pod->converted;
 }
diff --git a/recv.c b/recv.c
index f872fec8..cf178d20 100644
--- a/recv.c
+++ b/recv.c
@@ -58,79 +58,6 @@ static void *parse_config(int argc, char *argv[], int *receiver_num)
 	return check_receiver_arg(conf.receiver_arg, receiver_num);
 }
 
-#if 0
-int main(int argc, char *argv[])
-{
-	int ret, eof = 0, max, r_opened = 0, receiver_num;
-	struct timeval timeout;
-	struct	receiver *r = NULL;
-	fd_set rfds, wfds;
-	struct receiver_node rn;
-
-	memset(&rn, 0, sizeof(struct receiver_node));
-	for (ret = 0; receivers[ret].name; ret++)
-		receivers[ret].init(&receivers[ret]);
-	ret = -E_RECV_SYNTAX;
-	rn.conf = parse_config(argc, argv, &receiver_num);
-	if (!rn.conf) {
-		PARA_EMERG_LOG("%s", "parse failed\n");
-		goto out;
-	}
-	r = &receivers[receiver_num];
-	rn.receiver = r;
-	ret = r->open(&rn);
-	if (ret < 0)
-		goto out;
-	r_opened = 1;
-recv:
-	FD_ZERO(&rfds);
-	FD_ZERO(&wfds);
-	timeout.tv_sec = 0;
-	timeout.tv_usec = 999 * 1000;
-	max = -1;
-	ret = r->pre_select(&rn, &rfds, &wfds, &timeout);
-	max = PARA_MAX(max, ret);
-
-	PARA_DEBUG_LOG("timeout: %lums, max: %d\n", tv2ms(&timeout), max);
-	ret = para_select(max + 1, &rfds, &wfds, &timeout);
-	if (ret < 0) {
-		ret = -E_RECV_SELECT;
-		goto out;
-	}
-	ret = r->post_select(&rn, ret, &rfds, &wfds);
-	if (ret < 0)
-		goto out;
-	if (!ret)
-		eof = 1;
-	if (!rn.loaded) {
-		if (eof)
-			goto out;
-		goto recv;
-	}
-	ret = write(STDOUT_FILENO, rn.buf, rn.loaded);
-	PARA_DEBUG_LOG("wrote %d/%zd\n", ret, rn.loaded);
-	if (ret < 0) {
-		ret = -E_WRITE_STDOUT;
-		goto out;
-	}
-	if (ret != rn.loaded) {
-		PARA_INFO_LOG("short write %d/%zd\n", ret, rn.loaded);
-		memmove(rn.buf, rn.buf + ret, rn.loaded - ret);
-	}
-	rn.loaded -= ret;
-	if (rn.loaded || !eof)
-		goto recv;
-out:
-	if (r_opened)
-		r->close(&rn);
-	if (r)
-		r->shutdown();
-	if (ret < 0)
-		PARA_ERROR_LOG("%s\n", PARA_STRERROR(-ret));
-	return ret;
-}
-#endif
-
 void rn_event_handler(struct task *t)
 {
 	PARA_NOTICE_LOG("%s\n", PARA_STRERROR(-t->ret));
@@ -168,7 +95,7 @@ int main(int argc, char *argv[])
 	stdout_set_defaults(&sot);
 	sot.buf = rn.buf;
 	sot.loaded = &rn.loaded;
-	sot.eof = &rn.eof;
+	sot.input_eof = &rn.eof;
 	register_task(&sot.task);
 
 	rn.task.private_data = &rn;
diff --git a/stdin.c b/stdin.c
index ebfa6ccf..22f1dd97 100644
--- a/stdin.c
+++ b/stdin.c
@@ -46,6 +46,7 @@ void stdin_set_defaults(struct stdin_task *sit)
 {
 	sit->bufsize = 16 * 1024,
 	sit->loaded = 0,
+	sit->eof = 0,
 	sit->task.flags = 0,
 	sit->task.pre_select = stdin_pre_select;
 	sit->task.post_select = stdin_post_select;
diff --git a/stdout.c b/stdout.c
index af598070..1c60669a 100644
--- a/stdout.c
+++ b/stdout.c
@@ -25,7 +25,7 @@ void stdout_post_select(struct sched *s, struct task *t)
 
 	t->ret = 1;
 	if (!sot->check_fd) {
-		if (*sot->eof)
+		if (*sot->input_eof)
 			t->ret = -E_STDOUT_EOF;
 		return;
 	}
@@ -53,5 +53,6 @@ void stdout_set_defaults(struct stdout_task *sot)
 	sot->task.post_select = stdout_post_select;
 	sot->task.event_handler = stdout_default_event_handler;
 	sot->task.flags = 0;
+	sot->eof = 0;
 	sprintf(sot->task.status, "stdout writer");
 }
diff --git a/stdout.h b/stdout.h
index e467f41d..5e45ace8 100644
--- a/stdout.h
+++ b/stdout.h
@@ -2,7 +2,8 @@ struct stdout_task {
 	char *buf;
 	size_t *bufsize;
 	size_t *loaded;
-	int *eof;
+	int *input_eof;
+	int eof;
 	struct task task;
 	int check_fd;
 };
diff --git a/write.c b/write.c
index 13b15a1d..a3cd4b5d 100644
--- a/write.c
+++ b/write.c
@@ -150,9 +150,8 @@ static struct writer_node_group *check_args(void)
 	}
 	ret = 1;
 out:
-	if (ret > 0) {
+	if (ret > 0)
 		return wng;
-	}
 	free(wng);
 	return NULL;
 }
@@ -165,8 +164,7 @@ static void idt_event_handler(struct task *t)
 	unregister_task(t);
 	wng->buf = sit.buf;
 	wng->loaded = &sit.loaded;
-	wng->eof = &sit.eof;
-	sprintf(wng->task.status, "%s", "writer node group");
+	wng->input_eof = &sit.eof;
 	ret = wng_open(wng);
 	if (ret < 0) {
 		PARA_ERROR_LOG("%s\n", PARA_STRERROR(-ret));
diff --git a/write.h b/write.h
index e6e0a762..04ad09d5 100644
--- a/write.h
+++ b/write.h
@@ -85,19 +85,20 @@ void (*shutdown)(struct writer_node *);
  * describes a set of writer nodes that all write the same stream.
  */
 struct writer_node_group {
-/** number of nodes belonging to this group */
-unsigned num_writers;
-/** array of pointers to the corresponding writer nodes */
-struct writer_node *writer_nodes;
-/** keeps track of how many bytes have been written by each node */
-int *written;
-/** the maximum of the chunk_bytes values of the writer nodes in this group */
-size_t max_chunk_bytes;
-/** non-zero if end of file was encountered */
-int *eof;
-char *buf;
-size_t *loaded;
-struct task task;
+	/** number of nodes belonging to this group */
+	unsigned num_writers;
+	/** array of pointers to the corresponding writer nodes */
+	struct writer_node *writer_nodes;
+	/** keeps track of how many bytes have been written by each node */
+	int *written;
+	/** the maximum of the chunk_bytes values of the writer nodes in this group */
+	size_t max_chunk_bytes;
+	/** non-zero if end of file was encountered */
+	int *input_eof;
+	int eof;
+	char *buf;
+	size_t *loaded;
+	struct task task;
 };
 
 /** loop over each writer node in a writer group */
diff --git a/write_common.c b/write_common.c
index 0b2772ae..6c0faa7c 100644
--- a/write_common.c
+++ b/write_common.c
@@ -45,7 +45,7 @@ static void wng_post_select(struct sched *s, struct task *t)
 			min_written = PARA_MIN(min_written, t->ret);
 	}
 	*g->loaded -= min_written;
-	if (!*g->loaded && *g->eof)
+	if (!*g->loaded && *g->input_eof)
 		t->ret = -E_WNG_EOF;
 	else
 		t->ret = 1;
@@ -72,6 +72,8 @@ int wng_open(struct writer_node_group *g)
 		wn->task.private_data = wn;
 		register_task(&wn->task);
 	}
+	sprintf(g->task.status, "%s", "writer node group");
+	g->eof = 0;
 	register_task(&g->task);
 out:
 	return ret;
-- 
2.39.5