Replace eof by error in receivers/filters/writers.
authorAndre Noll <maan@systemlinux.org>
Sat, 15 Dec 2007 16:01:47 +0000 (17:01 +0100)
committerAndre Noll <maan@systemlinux.org>
Sat, 15 Dec 2007 16:01:47 +0000 (17:01 +0100)
This way it's possible to tell at a later time why
the receiver/filter/writer terminated.

This allows to increase the delay for reconnecting in case
the receiver failed to connect to para_server: Let the receivers
set the error value to -E_RECV_EOF in case a normal end of file
event occurred and check this value when calculating the restart
barrier.

23 files changed:
aacdec.c
alsa_write.c
audiod.c
client.c
client.h
client_common.c
dccp_recv.c
error.h
filter.c
filter.h
filter_chain.c
http_recv.c
oggdec.c
ortp_recv.c
recv.c
recv.h
stdin.c
stdin.h
stdout.c
stdout.h
write.c
write.h
write_common.c

index 2f2a1ad..258864a 100644 (file)
--- a/aacdec.c
+++ b/aacdec.c
@@ -61,7 +61,7 @@ static ssize_t aacdec(char *input_buffer, size_t len, struct filter_node *fn)
 
        if (fn->loaded > fn->bufsize * 3 / 5)
                return 0;
-       if (len < 2048 && !*fc->input_eof)
+       if (len < 2048 && !*fc->input_error)
                return 0;
 
        if (!padd->initialized) {
index faadec1..c856ef4 100644 (file)
@@ -176,7 +176,7 @@ static int alsa_write_post_select(__a_unused struct sched *s,
 
 //     PARA_INFO_LOG("%zd frames\n", frames);
        if (!frames) {
-               if (*wng->input_eof)
+               if (*wng->input_error)
                        wn->written = *wng->loaded;
                return 1;
        }
index dfb61db..f83943a 100644 (file)
--- a/audiod.c
+++ b/audiod.c
@@ -197,34 +197,34 @@ static void close_receiver(int slot_num)
        if (s->format < 0 || !s->receiver_node)
                return;
        a = &afi[s->format];
-       PARA_NOTICE_LOG("closing %s receiver in slot %d (eof = %d)\n",
-               audio_formats[s->format] , slot_num, s->receiver_node->eof);
+       PARA_NOTICE_LOG("closing %s receiver in slot %d\n",
+               audio_formats[s->format], slot_num);
        a->receiver->close(s->receiver_node);
        free(s->receiver_node);
        s->receiver_node = NULL;
 }
 
-static void kill_all_decoders(void)
+static void kill_all_decoders(int error)
 {
        int i;
 
        FOR_EACH_SLOT(i) {
                struct slot_info *s = &slot[i];
-               if (s->wng && !s->wng->eof) {
+               if (s->wng && !s->wng->error) {
                        PARA_INFO_LOG("unregistering writer node group in slot %d\n",
                                i);
                        wng_unregister(s->wng);
-                       s->wng->eof = 1;
+                       s->wng->error = error;
                }
-               if (s->fc && !s->fc->eof) {
+               if (s->fc && !s->fc->error) {
                        PARA_INFO_LOG("unregistering filter chain in slot %d\n", i);
                        unregister_task(&s->fc->task);
-                       s->fc->eof = 1;
+                       s->fc->error = error;
                }
-               if (s->receiver_node && !s->receiver_node->eof) {
+               if (s->receiver_node && !s->receiver_node->error) {
                        PARA_INFO_LOG("unregistering receiver_node in slot %d\n", i);
                        unregister_task(&s->receiver_node->task);
-                       s->receiver_node->eof = 1;
+                       s->receiver_node->error = error;
                }
        }
 }
@@ -266,7 +266,7 @@ static void filter_event_handler(struct task *t)
 {
        PARA_NOTICE_LOG("%s\n", PARA_STRERROR(-t->ret));
        struct filter_chain *fc = t->private_data;
-       fc->eof = 1;
+       fc->error = t->ret;
        unregister_task(t);
 }
 
@@ -285,13 +285,13 @@ static void open_filters(int slot_num)
        INIT_LIST_HEAD(&s->fc->filters);
        s->fc->inbuf = s->receiver_node->buf;
        s->fc->in_loaded = &s->receiver_node->loaded;
-       s->fc->input_eof = &s->receiver_node->eof;
+       s->fc->input_error = &s->receiver_node->error;
        s->fc->task.pre_select = filter_pre_select;
        s->fc->task.event_handler = filter_event_handler;
        s->fc->task.private_data = s->fc;
-       s->fc->eof = 0;
+       s->fc->error = 0;
 
-       s->receiver_node->output_eof = &s->fc->eof;
+       s->receiver_node->output_error = &s->fc->error;
        sprintf(s->fc->task.status, "filter chain");
        for (i = 0; i < nf; i++) {
                struct filter_node *fn = para_calloc(sizeof(struct filter_node));
@@ -315,7 +315,7 @@ static void wng_event_handler(struct task *t)
        struct writer_node_group *wng = t->private_data;
 
        PARA_INFO_LOG("%s\n", PARA_STRERROR(-t->ret));
-       wng->eof = 1;
+       wng->error = t->ret;
        wng_unregister(wng);
 }
 
@@ -333,15 +333,15 @@ static void open_writers(int slot_num)
        if (s->fc) {
                s->wng->buf = s->fc->outbuf;
                s->wng->loaded = s->fc->out_loaded;
-               s->wng->input_eof = &s->fc->eof;
+               s->wng->input_error = &s->fc->error;
                s->wng->channels = &s->fc->channels;
                s->wng->samplerate = &s->fc->samplerate;
-               s->fc->output_eof = &s->wng->eof;
+               s->fc->output_error = &s->wng->error;
                PARA_INFO_LOG("samplerate: %d\n", *s->wng->samplerate);
        } else {
                s->wng->buf = s->receiver_node->buf;
                s->wng->loaded = &s->receiver_node->loaded;
-               s->wng->input_eof = &s->receiver_node->eof;
+               s->wng->input_error = &s->receiver_node->error;
        }
        s->wng->task.event_handler = wng_event_handler;
        for (i = 0; i < a->num_writers; i++) {
@@ -360,16 +360,19 @@ static void open_writers(int slot_num)
 static void rn_event_handler(struct task *t)
 {
        struct receiver_node *rn = t->private_data;
-       const struct timeval restart_delay = {0, 10 * 1000};
        int i;
 
        PARA_NOTICE_LOG("%s\n", PARA_STRERROR(-t->ret));
        unregister_task(t);
-       rn->eof = 1;
+       rn->error = t->ret;
        /* set restart barrier */
        FOR_EACH_SLOT(i) {
+               struct timeval restart_delay = {0, 10 * 1000};
                if (slot[i].receiver_node != rn)
                        continue;
+               if (rn->error != -E_RECV_EOF)
+                       /* don't reconnect immediately on errors */
+                       restart_delay.tv_sec = 5;
                tv_add(now, &restart_delay, &afi[slot[i].format].restart_barrier);
        }
 }
@@ -420,7 +423,7 @@ static int receiver_running(int format)
        FOR_EACH_SLOT(i) {
                struct slot_info *s = &slot[i];
                if (s->format == format && s->receiver_node
-                               && !s->receiver_node->eof)
+                               && !s->receiver_node->error)
                        return 1;
        }
        return 0;
@@ -558,13 +561,13 @@ static void try_to_close_slot(int slot_num)
 
        if (s->format < 0)
                return;
-       if (s->receiver_node && !s->receiver_node->eof)
+       if (s->receiver_node && !s->receiver_node->error)
                return;
-       if (s->fc && !s->fc->eof)
+       if (s->fc && !s->fc->error)
                return;
-       if (s->wng && !s->wng->eof)
+       if (s->wng && !s->wng->error)
                return;
-       PARA_INFO_LOG("closing slot %d \n", slot_num);
+       PARA_INFO_LOG("closing slot %d\n", slot_num);
        wng_close(s->wng);
        close_filters(s->fc);
        free(s->fc);
@@ -585,7 +588,7 @@ static void audiod_pre_select(struct sched *s, __a_unused struct task *t)
 
        t->ret = 1;
        if (audiod_status != AUDIOD_ON || !stat_task->playing)
-               return kill_all_decoders();
+               return kill_all_decoders(-E_NOT_PLAYING);
        if (open_current_receiver(s))
                s->timeout = min_delay;
        FOR_EACH_SLOT(i) {
index 1506f65..a37bc7e 100644 (file)
--- a/client.c
+++ b/client.c
@@ -41,13 +41,13 @@ static void client_event_handler(struct task *t)
                register_task(&sit.task);
                p->inbuf = sit.buf;
                p->in_loaded = &sit.loaded;
-               p->in_eof = &sit.eof;
+               p->in_error = &sit.error;
                return;
        }
        stdout_set_defaults(&sot);
        sot.buf = p->buf;
        sot.loaded = &p->loaded;
-       sot.input_eof = &p->eof;
+       sot.input_error = &p->eof;
        register_task(&sot.task);
 }
 
index 06eeda3..199d5c9 100644 (file)
--- a/client.h
+++ b/client.h
@@ -73,8 +73,8 @@ struct private_client_data {
        char *inbuf;
        /** number of bytes loaded in \p inbuf */
        size_t *in_loaded;
-       /** non-zero if input task encountered an eof or an errro condition */
-       int *in_eof;
+       /** Non-zero if input task encountered an eof or an error condition. */
+       int *in_error;
 };
 
 void client_close(struct private_client_data *pcd);
index 7c31fcd..3501a5d 100644 (file)
@@ -224,8 +224,8 @@ void client_pre_select(struct sched *s, struct task *t)
                        para_fd_set(pcd->fd, &s->wfds, &s->max_fileno);
                        pcd->check_w = 1;
                } else {
-                       if (*pcd->in_eof) {
-                               t->ret = -E_INPUT_EOF;
+                       if (*pcd->in_error) {
+                               t->ret = *pcd->in_error;
                                s->timeout.tv_sec = 0;
                                s->timeout.tv_usec = 1;
                        }
index 755473c..eeb7c57 100644 (file)
@@ -99,9 +99,10 @@ static void dccp_recv_post_select(struct sched *s, struct task *t)
        struct receiver_node *rn = t->private_data;
        struct private_dccp_recv_data *pdd = rn->private_data;
 
-       t->ret = -E_DCCP_RECV_EOF;
-       if (rn->output_eof && *rn->output_eof)
+       if (rn->output_error && *rn->output_error) {
+               t->ret = *rn->output_error;
                goto out;
+       }
        t->ret = 1;
        if (!s->select_ret || !FD_ISSET(pdd->fd, &s->rfds))
                goto out; /* nothing to do */
@@ -112,14 +113,14 @@ static void dccp_recv_post_select(struct sched *s, struct task *t)
                DCCP_BUFSIZE - rn->loaded);
        if (t->ret <= 0) {
                if (!t->ret)
-                       t->ret = -E_DCCP_RECV_EOF;
+                       t->ret = -E_RECV_EOF;
                goto out;
        }
        rn->loaded += t->ret;
        return;
 out:
        if (t->ret < 0)
-               rn->eof = 1;
+               rn->error = t->ret;
 }
 
 /**
diff --git a/error.h b/error.h
index 534e7f9..469b85b 100644 (file)
--- a/error.h
+++ b/error.h
@@ -24,10 +24,9 @@ DEFINE_ERRLIST_OBJECT_ENUM;
 #define RINGBUFFER_ERRORS
 #define SCORE_ERRORS
 #define SHA1_ERRORS
-#define RECV_ERRORS
 #define AFH_COMMON_ERRORS
 #define RBTREE_ERRORS
-
+#define RECV_ERRORS
 
 extern const char **para_errlist[];
 
@@ -153,7 +152,6 @@ extern const char **para_errlist[];
        PARA_ERROR(NO_CONFIG, "config file not found"), \
        PARA_ERROR(CLIENT_AUTH, "authentication failed"), \
        PARA_ERROR(SERVER_EOF, "connection closed by para_server"), \
-       PARA_ERROR(INPUT_EOF, "end of input"), \
        PARA_ERROR(HANDSHAKE_COMPLETE, ""), /* not really an error */ \
 
 
@@ -170,7 +168,6 @@ extern const char **para_errlist[];
 
 #define STDOUT_ERRORS \
        PARA_ERROR(STDOUT_WRITE, "failed to write to stdout"), \
-       PARA_ERROR(STDOUT_EOF, "end of file"), \
 
 
 #define NET_ERRORS \
@@ -194,12 +191,14 @@ extern const char **para_errlist[];
 
 
 #define HTTP_RECV_ERRORS \
-       PARA_ERROR(HTTP_RECV_EOF, "http_recv: end of file"), \
        PARA_ERROR(HTTP_RECV_OVERRUN, "http_recv: output buffer overrun"), \
 
 
 #define RECV_COMMON_ERRORS \
        PARA_ERROR(RECV_SYNTAX, "recv syntax error"), \
+       PARA_ERROR(RECV_EOF, "end of file"), \
+       PARA_ERROR(RECV_CONNECT, "connection failed"), \
+
 
 
 #define AUDIOD_ERRORS \
@@ -207,6 +206,7 @@ extern const char **para_errlist[];
        PARA_ERROR(MISSING_COLON, "syntax error: missing colon"), \
        PARA_ERROR(UNSUPPORTED_AUDIO_FORMAT, "given audio format not supported"), \
        PARA_ERROR(SIGNAL_CAUGHT, "caught signal"), \
+       PARA_ERROR(NOT_PLAYING, "not playing"), \
 
 
 #define AUDIOD_COMMAND_ERRORS \
@@ -356,7 +356,6 @@ extern const char **para_errlist[];
 
 #define DCCP_RECV_ERRORS \
        PARA_ERROR(DCCP_OVERRUN, "dccp output buffer buffer overrun"), \
-       PARA_ERROR(DCCP_RECV_EOF, "dccp_recv: end of file"), \
 
 
 #define DCCP_SEND_ERRORS \
@@ -410,7 +409,6 @@ extern const char **para_errlist[];
 
 #define WRITE_COMMON_ERRORS \
        PARA_ERROR(WRITE_COMMON_SYNTAX, "syntax error in write option"), \
-       PARA_ERROR(WNG_EOF, "wng: end of file"), \
 
 
 #define AACDEC_ERRORS \
index 52a1ce5..27bbe63 100644 (file)
--- a/filter.c
+++ b/filter.c
@@ -66,9 +66,9 @@ static int init_filter_chain(void)
 
        fc->inbuf = sit->buf;
        fc->in_loaded = &sit->loaded;
-       fc->input_eof = &sit->eof;
-       fc->eof = 0;
-       fc->output_eof = &sot->eof;
+       fc->input_error = &sit->error;
+       fc->error = 0;
+       fc->output_error = &sot->error;
        fc->task.private_data = fc;
        fc->task.pre_select = filter_pre_select;
        fc->task.event_handler = filter_event_handler;
@@ -160,7 +160,7 @@ int main(int argc, char *argv[])
        stdout_set_defaults(sot);
        sot->buf = fc->outbuf;
        sot->loaded = fc->out_loaded;
-       sot->input_eof = &fc->eof;
+       sot->input_error = &fc->error;
 
        register_task(&sit->task);
        register_task(&fc->task);
index 9e197f6..09e4591 100644 (file)
--- a/filter.h
+++ b/filter.h
@@ -63,12 +63,12 @@ struct filter_chain {
         * pointer to variable containing the number of bytes loaded in the output buffer
         */
                size_t *out_loaded;
-       /** non-zero if this filter wont' produce any more output */
-       int eof;
-       /** pointer to the eof flag of the receiving application */
-       int *input_eof;
-       /** pointer to the eof flag of the writing application */
-       int *output_eof;
+       /** Non-zero if this filter wont' produce any more output. */
+       int error;
+       /** Pointer to the error variable of the receiving application. */
+       int *input_error;
+       /** Pointer to the eof flag of the writing application. */
+       int *output_error;
        /** the task associated with the filter chain */
        struct task task;
 };
index 7852220..295f2d0 100644 (file)
@@ -115,9 +115,10 @@ void filter_pre_select(__a_unused struct sched *s, struct task *t)
        size_t *loaded;
        int conv, conv_total = 0;
 
-       t->ret = -E_FC_EOF;
-       if (fc->output_eof && *fc->output_eof)
+       if (fc->output_error && *fc->output_error) {
+               t->ret =  *fc->output_error;
                goto err_out;
+       }
 again:
        ib = fc->inbuf;
        loaded = fc->in_loaded;
@@ -153,7 +154,7 @@ again:
        if (conv)
                goto again;
        t->ret = 1;
-       if (!*fc->input_eof)
+       if (!*fc->input_error)
                return;
        if (*fc->out_loaded)
                return;
@@ -161,7 +162,7 @@ again:
                return;
        t->ret = -E_FC_EOF;
 err_out:
-       fc->eof = 1;
+       fc->error = t->ret;
 }
 
 /**
index 4283db4..8c19d6d 100644 (file)
@@ -100,9 +100,10 @@ static void http_recv_post_select(struct sched *s, struct task *t)
        struct receiver_node *rn = t->private_data;
        struct private_http_recv_data *phd = rn->private_data;
 
-       t->ret = -E_HTTP_RECV_EOF;
-       if (rn->output_eof && *rn->output_eof)
+       if (rn->output_error && *rn->output_error) {
+               t->ret = *rn->output_error;
                goto out;
+       }
        t->ret = 1;
        if (!s->select_ret)
                goto out;
@@ -136,13 +137,13 @@ static void http_recv_post_select(struct sched *s, struct task *t)
                BUFSIZE - rn->loaded);
        if (t->ret <= 0) {
                if (!t->ret)
-                       t->ret = -E_HTTP_RECV_EOF;
+                       t->ret = -E_RECV_EOF;
                goto out;
        }
        rn->loaded += t->ret;
 out:
        if (t->ret < 0)
-               rn->eof = 1;
+               rn->error = t->ret;
 }
 
 static void http_recv_close(struct receiver_node *rn)
index 3dced8a..4c8aae5 100644 (file)
--- a/oggdec.c
+++ b/oggdec.c
@@ -46,7 +46,7 @@ static size_t cb_read(void *buf, size_t size, size_t nmemb, void *datasource)
 //     PARA_DEBUG_LOG("pod = %p\n", pod);
 //     PARA_DEBUG_LOG("vorbis requests %d bytes, have %d\n", size * nmemb, have);
        if (pod->inbuf_len < size) {
-               if (*fn->fc->input_eof)
+               if (*fn->fc->input_error)
                        return 0;
                errno = EAGAIN;
                return (size_t)-1;
@@ -123,7 +123,7 @@ static ssize_t ogg_convert(char *inbuffer, size_t len, struct filter_node *fn)
 
        if (!pod->vf) {
                int ib = 1024 * conf->initial_buffer_arg; /* initial buffer */
-               if (len <ib && !*fn->fc->input_eof) {
+               if (len <ib && !*fn->fc->input_error) {
                        PARA_DEBUG_LOG("initial input buffer %zd/%d, "
                                "waiting for more data\n", len, ib);
                        return 0;
@@ -149,7 +149,7 @@ static ssize_t ogg_convert(char *inbuffer, size_t len, struct filter_node *fn)
                PARA_NOTICE_LOG("%d channels, %d Hz\n", fn->fc->channels,
                        fn->fc->samplerate);
        }
-       while (!*fn->fc->input_eof && fn->loaded < fn->bufsize) {
+       while (!*fn->fc->input_error && fn->loaded < fn->bufsize) {
                int length = fn->bufsize - fn->loaded;
                long read_ret = ov_read(pod->vf, fn->buf + fn->loaded, length,
                        ENDIAN, 2 /* 16 bit */, 1 /* signed */, NULL);
index fca470e..b28d084 100644 (file)
@@ -123,9 +123,9 @@ static void ortp_recv_post_select(__a_unused struct sched *s, struct task *t)
        size_t packet_size;
 
 //     PARA_INFO_LOG("rn: %p, pord: %p, session: %p\n", rn, pord, pord->session);
-       t->ret = -E_ORTP_RECV_EOF;
-       if (rn->output_eof && *rn->output_eof) {
-               rn->eof = 1;
+       if (rn->output_error && *rn->output_error) {
+               rn->error = *rn->output_error;
+               t->ret = rn->error;
                return;
        }
        t->ret = 1;
@@ -150,11 +150,11 @@ static void ortp_recv_post_select(__a_unused struct sched *s, struct task *t)
                pord->start = *now;
        t->ret = msg_to_buf(mp, tmpbuf, CHUNK_SIZE);
        if (t->ret < ORTP_AUDIO_HEADER_LEN) {
-               rn->eof = 1;
                if (t->ret < 0)
                        t->ret = -E_MSG_TO_BUF;
                else
                        t->ret = -E_ORTP_RECV_EOF;
+               rn->error = t->ret;
                goto err_out;
        }
        packet_size = t->ret;
@@ -168,8 +168,8 @@ static void ortp_recv_post_select(__a_unused struct sched *s, struct task *t)
        switch (packet_type) {
        unsigned header_len, payload_len;
        case ORTP_EOF:
-               rn->eof = 1;
-               t->ret = -E_ORTP_RECV_EOF;
+               t->ret = -E_RECV_EOF;
+               rn->error = t->ret;
                goto err_out;
        case ORTP_BOF:
                PARA_INFO_LOG("bof (%zu)\n", packet_size);
@@ -222,7 +222,7 @@ success:
        compute_next_chunk(chunk_time, pord);
        return;
 err_out:
-       rn->eof = 1;
+       rn->error = t->ret;
        freemsg(mp);
 }
 
diff --git a/recv.c b/recv.c
index 0fe4641..d1909f2 100644 (file)
--- a/recv.c
+++ b/recv.c
@@ -49,7 +49,7 @@ static void rn_event_handler(struct task *t)
 {
        struct receiver_node *rn = t->private_data;
        PARA_NOTICE_LOG("%s\n", PARA_STRERROR(-t->ret));
-       rn->eof = 1;
+       rn->error = t->ret;
        unregister_task(t);
 }
 
@@ -94,7 +94,7 @@ int main(int argc, char *argv[])
        stdout_set_defaults(&sot);
        sot.buf = rn.buf;
        sot.loaded = &rn.loaded;
-       sot.input_eof = &rn.eof;
+       sot.input_error = &rn.error;
        register_task(&sot.task);
 
        rn.task.private_data = &rn;
diff --git a/recv.h b/recv.h
index e1ef4e2..6d318d6 100644 (file)
--- a/recv.h
+++ b/recv.h
@@ -18,10 +18,10 @@ struct receiver_node {
        size_t loaded;
        /** receiver-specific data */
        void *private_data;
-       /** set to 1 if end of file is reached */
-       int eof;
-       /** pointer to the eof member of the consumer */
-       int *output_eof;
+       /** Set to non-zero error value on errors or on end of file. */
+       int error;
+       /** Pointer to the error member of the consumer. */
+       int *output_error;
        /** pointer to the configuration data for this instance */
        void *conf;
        /** the task associated with this instance */
diff --git a/stdin.c b/stdin.c
index 649d464..25eca2e 100644 (file)
--- a/stdin.c
+++ b/stdin.c
@@ -75,7 +75,7 @@ static void stdin_post_select(struct sched *s, struct task *t)
        } else
                t->ret = -E_STDIN_EOF;
        if (t->ret < 0)
-               sit->eof = 1;
+               sit->error = t->ret;
 }
 
 /**
@@ -92,7 +92,7 @@ void stdin_set_defaults(struct stdin_task *sit)
 {
        sit->bufsize = 16 * 1024,
        sit->loaded = 0,
-       sit->eof = 0,
+       sit->error = 0,
        sit->task.pre_select = stdin_pre_select;
        sit->task.post_select = stdin_post_select;
        sit->task.event_handler = stdin_default_event_handler;
diff --git a/stdin.h b/stdin.h
index 4f5a611..515c4d2 100644 (file)
--- a/stdin.h
+++ b/stdin.h
@@ -19,7 +19,7 @@ struct stdin_task {
        /** The task structure. */
        struct task task;
        /** Non-zero on read error, or if a read from stdin returned zero. */
-       int eof;
+       int error;
 };
 
 void stdin_set_defaults(struct stdin_task *sit);
index fe188c9..babed75 100644 (file)
--- a/stdout.c
+++ b/stdout.c
@@ -33,8 +33,8 @@ static void stdout_pre_select(struct sched *s, struct task *t)
        t->ret = 1;
        sot->check_fd = 0;
        if (!*sot->loaded) {
-               if (*sot->input_eof) {
-                       t->ret = -E_STDOUT_EOF;
+               if (*sot->input_error) {
+                       t->ret = *sot->input_error;
                        s->timeout.tv_sec = 0;
                        s->timeout.tv_usec = 1;
                }
@@ -62,8 +62,8 @@ static void stdout_post_select(struct sched *s, struct task *t)
 
        t->ret = 1;
        if (!sot->check_fd) {
-               if (*sot->input_eof)
-                       t->ret = -E_STDOUT_EOF;
+               if (*sot->input_error)
+                       t->ret = *sot->input_error;
                return;
        }
        if (!FD_ISSET(STDOUT_FILENO, &s->wfds))
@@ -99,7 +99,7 @@ void stdout_set_defaults(struct stdout_task *sot)
        sot->task.pre_select = stdout_pre_select;
        sot->task.post_select = stdout_post_select;
        sot->task.event_handler = stdout_default_event_handler;
-       sot->eof = 0;
+       sot->error = 0;
        mark_fd_nonblocking(STDOUT_FILENO);
        sprintf(sot->task.status, "stdout writer");
 }
index ee4f35f..92e1cf4 100644 (file)
--- a/stdout.h
+++ b/stdout.h
@@ -14,10 +14,10 @@ struct stdout_task {
        char *buf;
        /** Number of bytes loaded in \a buf. */
        size_t *loaded;
-       /** ├ťointer to the eof flag of the feeding task. */
-       int *input_eof;
-       /** Non-zero if a write error occured. */
-       int eof;
+       /** Pointer to the error variable of the feeding task. */
+       int *input_error;
+       /** Non-zero if a write error occurred. */
+       int error;
        /** The task structure. */
        struct task task;
        /** Whether \p STDOUT_FILENO was included in the write fd set. */
diff --git a/write.c b/write.c
index 3b87335..125df1c 100644 (file)
--- a/write.c
+++ b/write.c
@@ -28,11 +28,11 @@ struct check_wav_task {
        char *buf;
        /** Number of bytes loaded in \a buf. */
        size_t *loaded;
-       /** Non-zero if end of file was reached. */
-       int *eof;
+       /** Non-zero if an error occurred or end of file was reached. */
+       int *error;
        /** Number of channels specified in wav header given by \a buf. */
        unsigned channels;
-       /** Samplerate specified in wav header given by \a buf. */
+       /** Sample rate specified in wav header given by \a buf. */
        unsigned samplerate;
        /** The task structure for this task. */
        struct task task;
@@ -67,7 +67,7 @@ static void check_wav_pre_select(__a_unused struct sched *s, struct task *t)
        unsigned char *a;
 
        if (*wt->loaded < WAV_HEADER_LEN) {
-               t->ret = *wt->eof? -E_PREMATURE_END : 1;
+               t->ret = *wt->error? -E_PREMATURE_END : 1;
                return;
        }
        wt->channels = 2;
@@ -170,7 +170,7 @@ static void idt_event_handler(struct task *t)
        unregister_task(t);
        wng->buf = sit.buf;
        wng->loaded = &sit.loaded;
-       wng->input_eof = &sit.eof;
+       wng->input_error = &sit.error;
        wng->task.event_handler = wng_event_handler;
        wng->channels = &cwt.channels;
        wng->samplerate = &cwt.samplerate;
@@ -234,7 +234,7 @@ int main(int argc, char *argv[])
        cwt.task.event_handler = cwt_event_handler;
        cwt.buf = sit.buf;
        cwt.loaded = &sit.loaded;
-       cwt.eof = &sit.eof;
+       cwt.error = &sit.error;
        sprintf(cwt.task.status, "check wav");
        register_task(&cwt.task);
 
diff --git a/write.h b/write.h
index 2eb4e3d..71c666d 100644 (file)
--- a/write.h
+++ b/write.h
@@ -101,10 +101,10 @@ struct writer_node_group {
        struct writer_node *writer_nodes;
        /** the maximum of the chunk_bytes values of the writer nodes in this group */
        size_t max_chunk_bytes;
-       /** non-zero if end of file was encountered by the feeding task */
-       int *input_eof;
-       /** non-zero if end of file was encountered */
-       int eof;
+       /** Non-zero if an error or end of file was encountered by the feeding task. */
+       int *input_error;
+       /** Non-zero if an error occurred or end of file was encountered. */
+       int error;
        /** current output buffer */
        char *buf;
        /** number of bytes loaded in the output buffer */
index b35773f..a8d2114 100644 (file)
@@ -28,7 +28,7 @@ static void wng_pre_select(__a_unused struct sched *s, struct task *t)
                struct writer_node *wn = &g->writer_nodes[i];
                t->ret = wn->writer->pre_select(s, wn);
                if (t->ret < 0) {
-                       g->eof = 1;
+                       g->error = t->ret;
                        return;
                }
        }
@@ -44,7 +44,7 @@ static void wng_post_select(struct sched *s, struct task *t)
                struct writer_node *wn = &g->writer_nodes[i];
                t->ret = wn->writer->post_select(s, wn);
                if (t->ret < 0) {
-                       g->eof = 1;
+                       g->error = t->ret;
                        return;
                }
                if (!i)
@@ -58,9 +58,9 @@ static void wng_post_select(struct sched *s, struct task *t)
                FOR_EACH_WRITER_NODE(i, g)
                        g->writer_nodes[i].written -= min_written;
        }
-       if (!*g->loaded && *g->input_eof) {
-               g->eof = 1;
-               t->ret = -E_WNG_EOF;
+       if (!*g->loaded && *g->input_error) {
+               g->error = *g->input_error;
+               t->ret = g->error;
                return;
        }
        t->ret = 1;
@@ -96,7 +96,7 @@ int wng_open(struct writer_node_group *g)
                g->max_chunk_bytes = PARA_MAX(g->max_chunk_bytes, ret);
        }
        sprintf(g->task.status, "%s", "writer node group");
-       g->eof = 0;
+       g->error = 0;
        return 1;
 err_out:
        PARA_ERROR_LOG("%s\n", PARA_STRERROR(-ret));
@@ -106,7 +106,7 @@ err_out:
                wn->writer->close(wn);
        }
        g->num_writers = 0;
-       g->eof = 1;
+       g->error = ret;
        return ret;
 }
 
@@ -119,7 +119,6 @@ err_out:
 void wng_unregister(struct writer_node_group *g)
 {
        unregister_task(&g->task);
-       g->eof = 1;
 }
 
 /**