From: Andre Noll Date: Sat, 15 Dec 2007 16:01:47 +0000 (+0100) Subject: Replace eof by error in receivers/filters/writers. X-Git-Tag: v0.3.0~57 X-Git-Url: http://git.tuebingen.mpg.de/?p=paraslash.git;a=commitdiff_plain;h=19d9318abf42debb15d833d4e56ab636893285c3 Replace eof by error in receivers/filters/writers. This way it's possible to tell at a later time why the receiver/filter/writer terminated. This allows to increase the delay for reconnecting in case the receiver failed to connect to para_server: Let the receivers set the error value to -E_RECV_EOF in case a normal end of file event occurred and check this value when calculating the restart barrier. --- diff --git a/aacdec.c b/aacdec.c index 2f2a1ad4..258864a0 100644 --- a/aacdec.c +++ b/aacdec.c @@ -61,7 +61,7 @@ static ssize_t aacdec(char *input_buffer, size_t len, struct filter_node *fn) if (fn->loaded > fn->bufsize * 3 / 5) return 0; - if (len < 2048 && !*fc->input_eof) + if (len < 2048 && !*fc->input_error) return 0; if (!padd->initialized) { diff --git a/alsa_write.c b/alsa_write.c index faadec1f..c856ef4a 100644 --- a/alsa_write.c +++ b/alsa_write.c @@ -176,7 +176,7 @@ static int alsa_write_post_select(__a_unused struct sched *s, // PARA_INFO_LOG("%zd frames\n", frames); if (!frames) { - if (*wng->input_eof) + if (*wng->input_error) wn->written = *wng->loaded; return 1; } diff --git a/audiod.c b/audiod.c index dfb61db4..f83943a6 100644 --- a/audiod.c +++ b/audiod.c @@ -197,34 +197,34 @@ static void close_receiver(int slot_num) if (s->format < 0 || !s->receiver_node) return; a = &afi[s->format]; - PARA_NOTICE_LOG("closing %s receiver in slot %d (eof = %d)\n", - audio_formats[s->format] , slot_num, s->receiver_node->eof); + PARA_NOTICE_LOG("closing %s receiver in slot %d\n", + audio_formats[s->format], slot_num); a->receiver->close(s->receiver_node); free(s->receiver_node); s->receiver_node = NULL; } -static void kill_all_decoders(void) +static void kill_all_decoders(int error) { int i; FOR_EACH_SLOT(i) { struct slot_info *s = &slot[i]; - if (s->wng && !s->wng->eof) { + if (s->wng && !s->wng->error) { PARA_INFO_LOG("unregistering writer node group in slot %d\n", i); wng_unregister(s->wng); - s->wng->eof = 1; + s->wng->error = error; } - if (s->fc && !s->fc->eof) { + if (s->fc && !s->fc->error) { PARA_INFO_LOG("unregistering filter chain in slot %d\n", i); unregister_task(&s->fc->task); - s->fc->eof = 1; + s->fc->error = error; } - if (s->receiver_node && !s->receiver_node->eof) { + if (s->receiver_node && !s->receiver_node->error) { PARA_INFO_LOG("unregistering receiver_node in slot %d\n", i); unregister_task(&s->receiver_node->task); - s->receiver_node->eof = 1; + s->receiver_node->error = error; } } } @@ -266,7 +266,7 @@ static void filter_event_handler(struct task *t) { PARA_NOTICE_LOG("%s\n", PARA_STRERROR(-t->ret)); struct filter_chain *fc = t->private_data; - fc->eof = 1; + fc->error = t->ret; unregister_task(t); } @@ -285,13 +285,13 @@ static void open_filters(int slot_num) INIT_LIST_HEAD(&s->fc->filters); s->fc->inbuf = s->receiver_node->buf; s->fc->in_loaded = &s->receiver_node->loaded; - s->fc->input_eof = &s->receiver_node->eof; + s->fc->input_error = &s->receiver_node->error; s->fc->task.pre_select = filter_pre_select; s->fc->task.event_handler = filter_event_handler; s->fc->task.private_data = s->fc; - s->fc->eof = 0; + s->fc->error = 0; - s->receiver_node->output_eof = &s->fc->eof; + s->receiver_node->output_error = &s->fc->error; sprintf(s->fc->task.status, "filter chain"); for (i = 0; i < nf; i++) { struct filter_node *fn = para_calloc(sizeof(struct filter_node)); @@ -315,7 +315,7 @@ static void wng_event_handler(struct task *t) struct writer_node_group *wng = t->private_data; PARA_INFO_LOG("%s\n", PARA_STRERROR(-t->ret)); - wng->eof = 1; + wng->error = t->ret; wng_unregister(wng); } @@ -333,15 +333,15 @@ static void open_writers(int slot_num) if (s->fc) { s->wng->buf = s->fc->outbuf; s->wng->loaded = s->fc->out_loaded; - s->wng->input_eof = &s->fc->eof; + s->wng->input_error = &s->fc->error; s->wng->channels = &s->fc->channels; s->wng->samplerate = &s->fc->samplerate; - s->fc->output_eof = &s->wng->eof; + s->fc->output_error = &s->wng->error; PARA_INFO_LOG("samplerate: %d\n", *s->wng->samplerate); } else { s->wng->buf = s->receiver_node->buf; s->wng->loaded = &s->receiver_node->loaded; - s->wng->input_eof = &s->receiver_node->eof; + s->wng->input_error = &s->receiver_node->error; } s->wng->task.event_handler = wng_event_handler; for (i = 0; i < a->num_writers; i++) { @@ -360,16 +360,19 @@ static void open_writers(int slot_num) static void rn_event_handler(struct task *t) { struct receiver_node *rn = t->private_data; - const struct timeval restart_delay = {0, 10 * 1000}; int i; PARA_NOTICE_LOG("%s\n", PARA_STRERROR(-t->ret)); unregister_task(t); - rn->eof = 1; + rn->error = t->ret; /* set restart barrier */ FOR_EACH_SLOT(i) { + struct timeval restart_delay = {0, 10 * 1000}; if (slot[i].receiver_node != rn) continue; + if (rn->error != -E_RECV_EOF) + /* don't reconnect immediately on errors */ + restart_delay.tv_sec = 5; tv_add(now, &restart_delay, &afi[slot[i].format].restart_barrier); } } @@ -420,7 +423,7 @@ static int receiver_running(int format) FOR_EACH_SLOT(i) { struct slot_info *s = &slot[i]; if (s->format == format && s->receiver_node - && !s->receiver_node->eof) + && !s->receiver_node->error) return 1; } return 0; @@ -558,13 +561,13 @@ static void try_to_close_slot(int slot_num) if (s->format < 0) return; - if (s->receiver_node && !s->receiver_node->eof) + if (s->receiver_node && !s->receiver_node->error) return; - if (s->fc && !s->fc->eof) + if (s->fc && !s->fc->error) return; - if (s->wng && !s->wng->eof) + if (s->wng && !s->wng->error) return; - PARA_INFO_LOG("closing slot %d \n", slot_num); + PARA_INFO_LOG("closing slot %d\n", slot_num); wng_close(s->wng); close_filters(s->fc); free(s->fc); @@ -585,7 +588,7 @@ static void audiod_pre_select(struct sched *s, __a_unused struct task *t) t->ret = 1; if (audiod_status != AUDIOD_ON || !stat_task->playing) - return kill_all_decoders(); + return kill_all_decoders(-E_NOT_PLAYING); if (open_current_receiver(s)) s->timeout = min_delay; FOR_EACH_SLOT(i) { diff --git a/client.c b/client.c index 1506f656..a37bc7e2 100644 --- a/client.c +++ b/client.c @@ -41,13 +41,13 @@ static void client_event_handler(struct task *t) register_task(&sit.task); p->inbuf = sit.buf; p->in_loaded = &sit.loaded; - p->in_eof = &sit.eof; + p->in_error = &sit.error; return; } stdout_set_defaults(&sot); sot.buf = p->buf; sot.loaded = &p->loaded; - sot.input_eof = &p->eof; + sot.input_error = &p->eof; register_task(&sot.task); } diff --git a/client.h b/client.h index 06eeda38..199d5c9a 100644 --- a/client.h +++ b/client.h @@ -73,8 +73,8 @@ struct private_client_data { char *inbuf; /** number of bytes loaded in \p inbuf */ size_t *in_loaded; - /** non-zero if input task encountered an eof or an errro condition */ - int *in_eof; + /** Non-zero if input task encountered an eof or an error condition. */ + int *in_error; }; void client_close(struct private_client_data *pcd); diff --git a/client_common.c b/client_common.c index 7c31fcdd..3501a5df 100644 --- a/client_common.c +++ b/client_common.c @@ -224,8 +224,8 @@ void client_pre_select(struct sched *s, struct task *t) para_fd_set(pcd->fd, &s->wfds, &s->max_fileno); pcd->check_w = 1; } else { - if (*pcd->in_eof) { - t->ret = -E_INPUT_EOF; + if (*pcd->in_error) { + t->ret = *pcd->in_error; s->timeout.tv_sec = 0; s->timeout.tv_usec = 1; } diff --git a/dccp_recv.c b/dccp_recv.c index 755473c9..eeb7c57a 100644 --- a/dccp_recv.c +++ b/dccp_recv.c @@ -99,9 +99,10 @@ static void dccp_recv_post_select(struct sched *s, struct task *t) struct receiver_node *rn = t->private_data; struct private_dccp_recv_data *pdd = rn->private_data; - t->ret = -E_DCCP_RECV_EOF; - if (rn->output_eof && *rn->output_eof) + if (rn->output_error && *rn->output_error) { + t->ret = *rn->output_error; goto out; + } t->ret = 1; if (!s->select_ret || !FD_ISSET(pdd->fd, &s->rfds)) goto out; /* nothing to do */ @@ -112,14 +113,14 @@ static void dccp_recv_post_select(struct sched *s, struct task *t) DCCP_BUFSIZE - rn->loaded); if (t->ret <= 0) { if (!t->ret) - t->ret = -E_DCCP_RECV_EOF; + t->ret = -E_RECV_EOF; goto out; } rn->loaded += t->ret; return; out: if (t->ret < 0) - rn->eof = 1; + rn->error = t->ret; } /** diff --git a/error.h b/error.h index 534e7f97..469b85be 100644 --- a/error.h +++ b/error.h @@ -24,10 +24,9 @@ DEFINE_ERRLIST_OBJECT_ENUM; #define RINGBUFFER_ERRORS #define SCORE_ERRORS #define SHA1_ERRORS -#define RECV_ERRORS #define AFH_COMMON_ERRORS #define RBTREE_ERRORS - +#define RECV_ERRORS extern const char **para_errlist[]; @@ -153,7 +152,6 @@ extern const char **para_errlist[]; PARA_ERROR(NO_CONFIG, "config file not found"), \ PARA_ERROR(CLIENT_AUTH, "authentication failed"), \ PARA_ERROR(SERVER_EOF, "connection closed by para_server"), \ - PARA_ERROR(INPUT_EOF, "end of input"), \ PARA_ERROR(HANDSHAKE_COMPLETE, ""), /* not really an error */ \ @@ -170,7 +168,6 @@ extern const char **para_errlist[]; #define STDOUT_ERRORS \ PARA_ERROR(STDOUT_WRITE, "failed to write to stdout"), \ - PARA_ERROR(STDOUT_EOF, "end of file"), \ #define NET_ERRORS \ @@ -194,12 +191,14 @@ extern const char **para_errlist[]; #define HTTP_RECV_ERRORS \ - PARA_ERROR(HTTP_RECV_EOF, "http_recv: end of file"), \ PARA_ERROR(HTTP_RECV_OVERRUN, "http_recv: output buffer overrun"), \ #define RECV_COMMON_ERRORS \ PARA_ERROR(RECV_SYNTAX, "recv syntax error"), \ + PARA_ERROR(RECV_EOF, "end of file"), \ + PARA_ERROR(RECV_CONNECT, "connection failed"), \ + #define AUDIOD_ERRORS \ @@ -207,6 +206,7 @@ extern const char **para_errlist[]; PARA_ERROR(MISSING_COLON, "syntax error: missing colon"), \ PARA_ERROR(UNSUPPORTED_AUDIO_FORMAT, "given audio format not supported"), \ PARA_ERROR(SIGNAL_CAUGHT, "caught signal"), \ + PARA_ERROR(NOT_PLAYING, "not playing"), \ #define AUDIOD_COMMAND_ERRORS \ @@ -356,7 +356,6 @@ extern const char **para_errlist[]; #define DCCP_RECV_ERRORS \ PARA_ERROR(DCCP_OVERRUN, "dccp output buffer buffer overrun"), \ - PARA_ERROR(DCCP_RECV_EOF, "dccp_recv: end of file"), \ #define DCCP_SEND_ERRORS \ @@ -410,7 +409,6 @@ extern const char **para_errlist[]; #define WRITE_COMMON_ERRORS \ PARA_ERROR(WRITE_COMMON_SYNTAX, "syntax error in write option"), \ - PARA_ERROR(WNG_EOF, "wng: end of file"), \ #define AACDEC_ERRORS \ diff --git a/filter.c b/filter.c index 52a1ce5b..27bbe63e 100644 --- a/filter.c +++ b/filter.c @@ -66,9 +66,9 @@ static int init_filter_chain(void) fc->inbuf = sit->buf; fc->in_loaded = &sit->loaded; - fc->input_eof = &sit->eof; - fc->eof = 0; - fc->output_eof = &sot->eof; + fc->input_error = &sit->error; + fc->error = 0; + fc->output_error = &sot->error; fc->task.private_data = fc; fc->task.pre_select = filter_pre_select; fc->task.event_handler = filter_event_handler; @@ -160,7 +160,7 @@ int main(int argc, char *argv[]) stdout_set_defaults(sot); sot->buf = fc->outbuf; sot->loaded = fc->out_loaded; - sot->input_eof = &fc->eof; + sot->input_error = &fc->error; register_task(&sit->task); register_task(&fc->task); diff --git a/filter.h b/filter.h index 9e197f6c..09e45913 100644 --- a/filter.h +++ b/filter.h @@ -63,12 +63,12 @@ struct filter_chain { * pointer to variable containing the number of bytes loaded in the output buffer */ size_t *out_loaded; - /** non-zero if this filter wont' produce any more output */ - int eof; - /** pointer to the eof flag of the receiving application */ - int *input_eof; - /** pointer to the eof flag of the writing application */ - int *output_eof; + /** Non-zero if this filter wont' produce any more output. */ + int error; + /** Pointer to the error variable of the receiving application. */ + int *input_error; + /** Pointer to the eof flag of the writing application. */ + int *output_error; /** the task associated with the filter chain */ struct task task; }; diff --git a/filter_chain.c b/filter_chain.c index 7852220a..295f2d06 100644 --- a/filter_chain.c +++ b/filter_chain.c @@ -115,9 +115,10 @@ void filter_pre_select(__a_unused struct sched *s, struct task *t) size_t *loaded; int conv, conv_total = 0; - t->ret = -E_FC_EOF; - if (fc->output_eof && *fc->output_eof) + if (fc->output_error && *fc->output_error) { + t->ret = *fc->output_error; goto err_out; + } again: ib = fc->inbuf; loaded = fc->in_loaded; @@ -153,7 +154,7 @@ again: if (conv) goto again; t->ret = 1; - if (!*fc->input_eof) + if (!*fc->input_error) return; if (*fc->out_loaded) return; @@ -161,7 +162,7 @@ again: return; t->ret = -E_FC_EOF; err_out: - fc->eof = 1; + fc->error = t->ret; } /** diff --git a/http_recv.c b/http_recv.c index 4283db4b..8c19d6d7 100644 --- a/http_recv.c +++ b/http_recv.c @@ -100,9 +100,10 @@ static void http_recv_post_select(struct sched *s, struct task *t) struct receiver_node *rn = t->private_data; struct private_http_recv_data *phd = rn->private_data; - t->ret = -E_HTTP_RECV_EOF; - if (rn->output_eof && *rn->output_eof) + if (rn->output_error && *rn->output_error) { + t->ret = *rn->output_error; goto out; + } t->ret = 1; if (!s->select_ret) goto out; @@ -136,13 +137,13 @@ static void http_recv_post_select(struct sched *s, struct task *t) BUFSIZE - rn->loaded); if (t->ret <= 0) { if (!t->ret) - t->ret = -E_HTTP_RECV_EOF; + t->ret = -E_RECV_EOF; goto out; } rn->loaded += t->ret; out: if (t->ret < 0) - rn->eof = 1; + rn->error = t->ret; } static void http_recv_close(struct receiver_node *rn) diff --git a/oggdec.c b/oggdec.c index 3dced8a0..4c8aae5f 100644 --- a/oggdec.c +++ b/oggdec.c @@ -46,7 +46,7 @@ static size_t cb_read(void *buf, size_t size, size_t nmemb, void *datasource) // PARA_DEBUG_LOG("pod = %p\n", pod); // PARA_DEBUG_LOG("vorbis requests %d bytes, have %d\n", size * nmemb, have); if (pod->inbuf_len < size) { - if (*fn->fc->input_eof) + if (*fn->fc->input_error) return 0; errno = EAGAIN; return (size_t)-1; @@ -123,7 +123,7 @@ static ssize_t ogg_convert(char *inbuffer, size_t len, struct filter_node *fn) if (!pod->vf) { int ib = 1024 * conf->initial_buffer_arg; /* initial buffer */ - if (len fc->input_eof) { + if (len fc->input_error) { PARA_DEBUG_LOG("initial input buffer %zd/%d, " "waiting for more data\n", len, ib); return 0; @@ -149,7 +149,7 @@ static ssize_t ogg_convert(char *inbuffer, size_t len, struct filter_node *fn) PARA_NOTICE_LOG("%d channels, %d Hz\n", fn->fc->channels, fn->fc->samplerate); } - while (!*fn->fc->input_eof && fn->loaded < fn->bufsize) { + while (!*fn->fc->input_error && fn->loaded < fn->bufsize) { int length = fn->bufsize - fn->loaded; long read_ret = ov_read(pod->vf, fn->buf + fn->loaded, length, ENDIAN, 2 /* 16 bit */, 1 /* signed */, NULL); diff --git a/ortp_recv.c b/ortp_recv.c index fca470ed..b28d084a 100644 --- a/ortp_recv.c +++ b/ortp_recv.c @@ -123,9 +123,9 @@ static void ortp_recv_post_select(__a_unused struct sched *s, struct task *t) size_t packet_size; // PARA_INFO_LOG("rn: %p, pord: %p, session: %p\n", rn, pord, pord->session); - t->ret = -E_ORTP_RECV_EOF; - if (rn->output_eof && *rn->output_eof) { - rn->eof = 1; + if (rn->output_error && *rn->output_error) { + rn->error = *rn->output_error; + t->ret = rn->error; return; } t->ret = 1; @@ -150,11 +150,11 @@ static void ortp_recv_post_select(__a_unused struct sched *s, struct task *t) pord->start = *now; t->ret = msg_to_buf(mp, tmpbuf, CHUNK_SIZE); if (t->ret < ORTP_AUDIO_HEADER_LEN) { - rn->eof = 1; if (t->ret < 0) t->ret = -E_MSG_TO_BUF; else t->ret = -E_ORTP_RECV_EOF; + rn->error = t->ret; goto err_out; } packet_size = t->ret; @@ -168,8 +168,8 @@ static void ortp_recv_post_select(__a_unused struct sched *s, struct task *t) switch (packet_type) { unsigned header_len, payload_len; case ORTP_EOF: - rn->eof = 1; - t->ret = -E_ORTP_RECV_EOF; + t->ret = -E_RECV_EOF; + rn->error = t->ret; goto err_out; case ORTP_BOF: PARA_INFO_LOG("bof (%zu)\n", packet_size); @@ -222,7 +222,7 @@ success: compute_next_chunk(chunk_time, pord); return; err_out: - rn->eof = 1; + rn->error = t->ret; freemsg(mp); } diff --git a/recv.c b/recv.c index 0fe46411..d1909f25 100644 --- a/recv.c +++ b/recv.c @@ -49,7 +49,7 @@ static void rn_event_handler(struct task *t) { struct receiver_node *rn = t->private_data; PARA_NOTICE_LOG("%s\n", PARA_STRERROR(-t->ret)); - rn->eof = 1; + rn->error = t->ret; unregister_task(t); } @@ -94,7 +94,7 @@ int main(int argc, char *argv[]) stdout_set_defaults(&sot); sot.buf = rn.buf; sot.loaded = &rn.loaded; - sot.input_eof = &rn.eof; + sot.input_error = &rn.error; register_task(&sot.task); rn.task.private_data = &rn; diff --git a/recv.h b/recv.h index e1ef4e21..6d318d6a 100644 --- a/recv.h +++ b/recv.h @@ -18,10 +18,10 @@ struct receiver_node { size_t loaded; /** receiver-specific data */ void *private_data; - /** set to 1 if end of file is reached */ - int eof; - /** pointer to the eof member of the consumer */ - int *output_eof; + /** Set to non-zero error value on errors or on end of file. */ + int error; + /** Pointer to the error member of the consumer. */ + int *output_error; /** pointer to the configuration data for this instance */ void *conf; /** the task associated with this instance */ diff --git a/stdin.c b/stdin.c index 649d464b..25eca2ef 100644 --- a/stdin.c +++ b/stdin.c @@ -75,7 +75,7 @@ static void stdin_post_select(struct sched *s, struct task *t) } else t->ret = -E_STDIN_EOF; if (t->ret < 0) - sit->eof = 1; + sit->error = t->ret; } /** @@ -92,7 +92,7 @@ void stdin_set_defaults(struct stdin_task *sit) { sit->bufsize = 16 * 1024, sit->loaded = 0, - sit->eof = 0, + sit->error = 0, sit->task.pre_select = stdin_pre_select; sit->task.post_select = stdin_post_select; sit->task.event_handler = stdin_default_event_handler; diff --git a/stdin.h b/stdin.h index 4f5a6112..515c4d26 100644 --- a/stdin.h +++ b/stdin.h @@ -19,7 +19,7 @@ struct stdin_task { /** The task structure. */ struct task task; /** Non-zero on read error, or if a read from stdin returned zero. */ - int eof; + int error; }; void stdin_set_defaults(struct stdin_task *sit); diff --git a/stdout.c b/stdout.c index fe188c9a..babed752 100644 --- a/stdout.c +++ b/stdout.c @@ -33,8 +33,8 @@ static void stdout_pre_select(struct sched *s, struct task *t) t->ret = 1; sot->check_fd = 0; if (!*sot->loaded) { - if (*sot->input_eof) { - t->ret = -E_STDOUT_EOF; + if (*sot->input_error) { + t->ret = *sot->input_error; s->timeout.tv_sec = 0; s->timeout.tv_usec = 1; } @@ -62,8 +62,8 @@ static void stdout_post_select(struct sched *s, struct task *t) t->ret = 1; if (!sot->check_fd) { - if (*sot->input_eof) - t->ret = -E_STDOUT_EOF; + if (*sot->input_error) + t->ret = *sot->input_error; return; } if (!FD_ISSET(STDOUT_FILENO, &s->wfds)) @@ -99,7 +99,7 @@ void stdout_set_defaults(struct stdout_task *sot) sot->task.pre_select = stdout_pre_select; sot->task.post_select = stdout_post_select; sot->task.event_handler = stdout_default_event_handler; - sot->eof = 0; + sot->error = 0; mark_fd_nonblocking(STDOUT_FILENO); sprintf(sot->task.status, "stdout writer"); } diff --git a/stdout.h b/stdout.h index ee4f35ff..92e1cf49 100644 --- a/stdout.h +++ b/stdout.h @@ -14,10 +14,10 @@ struct stdout_task { char *buf; /** Number of bytes loaded in \a buf. */ size_t *loaded; - /** Üointer to the eof flag of the feeding task. */ - int *input_eof; - /** Non-zero if a write error occured. */ - int eof; + /** Pointer to the error variable of the feeding task. */ + int *input_error; + /** Non-zero if a write error occurred. */ + int error; /** The task structure. */ struct task task; /** Whether \p STDOUT_FILENO was included in the write fd set. */ diff --git a/write.c b/write.c index 3b87335f..125df1c1 100644 --- a/write.c +++ b/write.c @@ -28,11 +28,11 @@ struct check_wav_task { char *buf; /** Number of bytes loaded in \a buf. */ size_t *loaded; - /** Non-zero if end of file was reached. */ - int *eof; + /** Non-zero if an error occurred or end of file was reached. */ + int *error; /** Number of channels specified in wav header given by \a buf. */ unsigned channels; - /** Samplerate specified in wav header given by \a buf. */ + /** Sample rate specified in wav header given by \a buf. */ unsigned samplerate; /** The task structure for this task. */ struct task task; @@ -67,7 +67,7 @@ static void check_wav_pre_select(__a_unused struct sched *s, struct task *t) unsigned char *a; if (*wt->loaded < WAV_HEADER_LEN) { - t->ret = *wt->eof? -E_PREMATURE_END : 1; + t->ret = *wt->error? -E_PREMATURE_END : 1; return; } wt->channels = 2; @@ -170,7 +170,7 @@ static void idt_event_handler(struct task *t) unregister_task(t); wng->buf = sit.buf; wng->loaded = &sit.loaded; - wng->input_eof = &sit.eof; + wng->input_error = &sit.error; wng->task.event_handler = wng_event_handler; wng->channels = &cwt.channels; wng->samplerate = &cwt.samplerate; @@ -234,7 +234,7 @@ int main(int argc, char *argv[]) cwt.task.event_handler = cwt_event_handler; cwt.buf = sit.buf; cwt.loaded = &sit.loaded; - cwt.eof = &sit.eof; + cwt.error = &sit.error; sprintf(cwt.task.status, "check wav"); register_task(&cwt.task); diff --git a/write.h b/write.h index 2eb4e3db..71c666d6 100644 --- a/write.h +++ b/write.h @@ -101,10 +101,10 @@ struct writer_node_group { struct writer_node *writer_nodes; /** the maximum of the chunk_bytes values of the writer nodes in this group */ size_t max_chunk_bytes; - /** non-zero if end of file was encountered by the feeding task */ - int *input_eof; - /** non-zero if end of file was encountered */ - int eof; + /** Non-zero if an error or end of file was encountered by the feeding task. */ + int *input_error; + /** Non-zero if an error occurred or end of file was encountered. */ + int error; /** current output buffer */ char *buf; /** number of bytes loaded in the output buffer */ diff --git a/write_common.c b/write_common.c index b35773f7..a8d21149 100644 --- a/write_common.c +++ b/write_common.c @@ -28,7 +28,7 @@ static void wng_pre_select(__a_unused struct sched *s, struct task *t) struct writer_node *wn = &g->writer_nodes[i]; t->ret = wn->writer->pre_select(s, wn); if (t->ret < 0) { - g->eof = 1; + g->error = t->ret; return; } } @@ -44,7 +44,7 @@ static void wng_post_select(struct sched *s, struct task *t) struct writer_node *wn = &g->writer_nodes[i]; t->ret = wn->writer->post_select(s, wn); if (t->ret < 0) { - g->eof = 1; + g->error = t->ret; return; } if (!i) @@ -58,9 +58,9 @@ static void wng_post_select(struct sched *s, struct task *t) FOR_EACH_WRITER_NODE(i, g) g->writer_nodes[i].written -= min_written; } - if (!*g->loaded && *g->input_eof) { - g->eof = 1; - t->ret = -E_WNG_EOF; + if (!*g->loaded && *g->input_error) { + g->error = *g->input_error; + t->ret = g->error; return; } t->ret = 1; @@ -96,7 +96,7 @@ int wng_open(struct writer_node_group *g) g->max_chunk_bytes = PARA_MAX(g->max_chunk_bytes, ret); } sprintf(g->task.status, "%s", "writer node group"); - g->eof = 0; + g->error = 0; return 1; err_out: PARA_ERROR_LOG("%s\n", PARA_STRERROR(-ret)); @@ -106,7 +106,7 @@ err_out: wn->writer->close(wn); } g->num_writers = 0; - g->eof = 1; + g->error = ret; return ret; } @@ -119,7 +119,6 @@ err_out: void wng_unregister(struct writer_node_group *g) { unregister_task(&g->task); - g->eof = 1; } /**