if (fn->loaded > fn->bufsize * 3 / 5)
return 0;
- if (len < 2048 && !*fc->input_eof)
+ if (len < 2048 && !*fc->input_error)
return 0;
if (!padd->initialized) {
// PARA_INFO_LOG("%zd frames\n", frames);
if (!frames) {
- if (*wng->input_eof)
+ if (*wng->input_error)
wn->written = *wng->loaded;
return 1;
}
if (s->format < 0 || !s->receiver_node)
return;
a = &afi[s->format];
- PARA_NOTICE_LOG("closing %s receiver in slot %d (eof = %d)\n",
- audio_formats[s->format] , slot_num, s->receiver_node->eof);
+ PARA_NOTICE_LOG("closing %s receiver in slot %d\n",
+ audio_formats[s->format], slot_num);
a->receiver->close(s->receiver_node);
free(s->receiver_node);
s->receiver_node = NULL;
}
-static void kill_all_decoders(void)
+static void kill_all_decoders(int error)
{
int i;
FOR_EACH_SLOT(i) {
struct slot_info *s = &slot[i];
- if (s->wng && !s->wng->eof) {
+ if (s->wng && !s->wng->error) {
PARA_INFO_LOG("unregistering writer node group in slot %d\n",
i);
wng_unregister(s->wng);
- s->wng->eof = 1;
+ s->wng->error = error;
}
- if (s->fc && !s->fc->eof) {
+ if (s->fc && !s->fc->error) {
PARA_INFO_LOG("unregistering filter chain in slot %d\n", i);
unregister_task(&s->fc->task);
- s->fc->eof = 1;
+ s->fc->error = error;
}
- if (s->receiver_node && !s->receiver_node->eof) {
+ if (s->receiver_node && !s->receiver_node->error) {
PARA_INFO_LOG("unregistering receiver_node in slot %d\n", i);
unregister_task(&s->receiver_node->task);
- s->receiver_node->eof = 1;
+ s->receiver_node->error = error;
}
}
}
{
PARA_NOTICE_LOG("%s\n", PARA_STRERROR(-t->ret));
struct filter_chain *fc = t->private_data;
- fc->eof = 1;
+ fc->error = t->ret;
unregister_task(t);
}
INIT_LIST_HEAD(&s->fc->filters);
s->fc->inbuf = s->receiver_node->buf;
s->fc->in_loaded = &s->receiver_node->loaded;
- s->fc->input_eof = &s->receiver_node->eof;
+ s->fc->input_error = &s->receiver_node->error;
s->fc->task.pre_select = filter_pre_select;
s->fc->task.event_handler = filter_event_handler;
s->fc->task.private_data = s->fc;
- s->fc->eof = 0;
+ s->fc->error = 0;
- s->receiver_node->output_eof = &s->fc->eof;
+ s->receiver_node->output_error = &s->fc->error;
sprintf(s->fc->task.status, "filter chain");
for (i = 0; i < nf; i++) {
struct filter_node *fn = para_calloc(sizeof(struct filter_node));
struct writer_node_group *wng = t->private_data;
PARA_INFO_LOG("%s\n", PARA_STRERROR(-t->ret));
- wng->eof = 1;
+ wng->error = t->ret;
wng_unregister(wng);
}
if (s->fc) {
s->wng->buf = s->fc->outbuf;
s->wng->loaded = s->fc->out_loaded;
- s->wng->input_eof = &s->fc->eof;
+ s->wng->input_error = &s->fc->error;
s->wng->channels = &s->fc->channels;
s->wng->samplerate = &s->fc->samplerate;
- s->fc->output_eof = &s->wng->eof;
+ s->fc->output_error = &s->wng->error;
PARA_INFO_LOG("samplerate: %d\n", *s->wng->samplerate);
} else {
s->wng->buf = s->receiver_node->buf;
s->wng->loaded = &s->receiver_node->loaded;
- s->wng->input_eof = &s->receiver_node->eof;
+ s->wng->input_error = &s->receiver_node->error;
}
s->wng->task.event_handler = wng_event_handler;
for (i = 0; i < a->num_writers; i++) {
static void rn_event_handler(struct task *t)
{
struct receiver_node *rn = t->private_data;
- const struct timeval restart_delay = {0, 10 * 1000};
int i;
PARA_NOTICE_LOG("%s\n", PARA_STRERROR(-t->ret));
unregister_task(t);
- rn->eof = 1;
+ rn->error = t->ret;
/* set restart barrier */
FOR_EACH_SLOT(i) {
+ struct timeval restart_delay = {0, 10 * 1000};
if (slot[i].receiver_node != rn)
continue;
+ if (rn->error != -E_RECV_EOF)
+ /* don't reconnect immediately on errors */
+ restart_delay.tv_sec = 5;
tv_add(now, &restart_delay, &afi[slot[i].format].restart_barrier);
}
}
FOR_EACH_SLOT(i) {
struct slot_info *s = &slot[i];
if (s->format == format && s->receiver_node
- && !s->receiver_node->eof)
+ && !s->receiver_node->error)
return 1;
}
return 0;
if (s->format < 0)
return;
- if (s->receiver_node && !s->receiver_node->eof)
+ if (s->receiver_node && !s->receiver_node->error)
return;
- if (s->fc && !s->fc->eof)
+ if (s->fc && !s->fc->error)
return;
- if (s->wng && !s->wng->eof)
+ if (s->wng && !s->wng->error)
return;
- PARA_INFO_LOG("closing slot %d \n", slot_num);
+ PARA_INFO_LOG("closing slot %d\n", slot_num);
wng_close(s->wng);
close_filters(s->fc);
free(s->fc);
t->ret = 1;
if (audiod_status != AUDIOD_ON || !stat_task->playing)
- return kill_all_decoders();
+ return kill_all_decoders(-E_NOT_PLAYING);
if (open_current_receiver(s))
s->timeout = min_delay;
FOR_EACH_SLOT(i) {
register_task(&sit.task);
p->inbuf = sit.buf;
p->in_loaded = &sit.loaded;
- p->in_eof = &sit.eof;
+ p->in_error = &sit.error;
return;
}
stdout_set_defaults(&sot);
sot.buf = p->buf;
sot.loaded = &p->loaded;
- sot.input_eof = &p->eof;
+ sot.input_error = &p->eof;
register_task(&sot.task);
}
char *inbuf;
/** number of bytes loaded in \p inbuf */
size_t *in_loaded;
- /** non-zero if input task encountered an eof or an errro condition */
- int *in_eof;
+ /** Non-zero if input task encountered an eof or an error condition. */
+ int *in_error;
};
void client_close(struct private_client_data *pcd);
para_fd_set(pcd->fd, &s->wfds, &s->max_fileno);
pcd->check_w = 1;
} else {
- if (*pcd->in_eof) {
- t->ret = -E_INPUT_EOF;
+ if (*pcd->in_error) {
+ t->ret = *pcd->in_error;
s->timeout.tv_sec = 0;
s->timeout.tv_usec = 1;
}
struct receiver_node *rn = t->private_data;
struct private_dccp_recv_data *pdd = rn->private_data;
- t->ret = -E_DCCP_RECV_EOF;
- if (rn->output_eof && *rn->output_eof)
+ if (rn->output_error && *rn->output_error) {
+ t->ret = *rn->output_error;
goto out;
+ }
t->ret = 1;
if (!s->select_ret || !FD_ISSET(pdd->fd, &s->rfds))
goto out; /* nothing to do */
DCCP_BUFSIZE - rn->loaded);
if (t->ret <= 0) {
if (!t->ret)
- t->ret = -E_DCCP_RECV_EOF;
+ t->ret = -E_RECV_EOF;
goto out;
}
rn->loaded += t->ret;
return;
out:
if (t->ret < 0)
- rn->eof = 1;
+ rn->error = t->ret;
}
/**
#define RINGBUFFER_ERRORS
#define SCORE_ERRORS
#define SHA1_ERRORS
-#define RECV_ERRORS
#define AFH_COMMON_ERRORS
#define RBTREE_ERRORS
-
+#define RECV_ERRORS
extern const char **para_errlist[];
PARA_ERROR(NO_CONFIG, "config file not found"), \
PARA_ERROR(CLIENT_AUTH, "authentication failed"), \
PARA_ERROR(SERVER_EOF, "connection closed by para_server"), \
- PARA_ERROR(INPUT_EOF, "end of input"), \
PARA_ERROR(HANDSHAKE_COMPLETE, ""), /* not really an error */ \
#define STDOUT_ERRORS \
PARA_ERROR(STDOUT_WRITE, "failed to write to stdout"), \
- PARA_ERROR(STDOUT_EOF, "end of file"), \
#define NET_ERRORS \
#define HTTP_RECV_ERRORS \
- PARA_ERROR(HTTP_RECV_EOF, "http_recv: end of file"), \
PARA_ERROR(HTTP_RECV_OVERRUN, "http_recv: output buffer overrun"), \
#define RECV_COMMON_ERRORS \
PARA_ERROR(RECV_SYNTAX, "recv syntax error"), \
+ PARA_ERROR(RECV_EOF, "end of file"), \
+ PARA_ERROR(RECV_CONNECT, "connection failed"), \
+
#define AUDIOD_ERRORS \
PARA_ERROR(MISSING_COLON, "syntax error: missing colon"), \
PARA_ERROR(UNSUPPORTED_AUDIO_FORMAT, "given audio format not supported"), \
PARA_ERROR(SIGNAL_CAUGHT, "caught signal"), \
+ PARA_ERROR(NOT_PLAYING, "not playing"), \
#define AUDIOD_COMMAND_ERRORS \
#define DCCP_RECV_ERRORS \
PARA_ERROR(DCCP_OVERRUN, "dccp output buffer buffer overrun"), \
- PARA_ERROR(DCCP_RECV_EOF, "dccp_recv: end of file"), \
#define DCCP_SEND_ERRORS \
#define WRITE_COMMON_ERRORS \
PARA_ERROR(WRITE_COMMON_SYNTAX, "syntax error in write option"), \
- PARA_ERROR(WNG_EOF, "wng: end of file"), \
#define AACDEC_ERRORS \
fc->inbuf = sit->buf;
fc->in_loaded = &sit->loaded;
- fc->input_eof = &sit->eof;
- fc->eof = 0;
- fc->output_eof = &sot->eof;
+ fc->input_error = &sit->error;
+ fc->error = 0;
+ fc->output_error = &sot->error;
fc->task.private_data = fc;
fc->task.pre_select = filter_pre_select;
fc->task.event_handler = filter_event_handler;
stdout_set_defaults(sot);
sot->buf = fc->outbuf;
sot->loaded = fc->out_loaded;
- sot->input_eof = &fc->eof;
+ sot->input_error = &fc->error;
register_task(&sit->task);
register_task(&fc->task);
* pointer to variable containing the number of bytes loaded in the output buffer
*/
size_t *out_loaded;
- /** non-zero if this filter wont' produce any more output */
- int eof;
- /** pointer to the eof flag of the receiving application */
- int *input_eof;
- /** pointer to the eof flag of the writing application */
- int *output_eof;
+ /** Non-zero if this filter wont' produce any more output. */
+ int error;
+ /** Pointer to the error variable of the receiving application. */
+ int *input_error;
+ /** Pointer to the eof flag of the writing application. */
+ int *output_error;
/** the task associated with the filter chain */
struct task task;
};
size_t *loaded;
int conv, conv_total = 0;
- t->ret = -E_FC_EOF;
- if (fc->output_eof && *fc->output_eof)
+ if (fc->output_error && *fc->output_error) {
+ t->ret = *fc->output_error;
goto err_out;
+ }
again:
ib = fc->inbuf;
loaded = fc->in_loaded;
if (conv)
goto again;
t->ret = 1;
- if (!*fc->input_eof)
+ if (!*fc->input_error)
return;
if (*fc->out_loaded)
return;
return;
t->ret = -E_FC_EOF;
err_out:
- fc->eof = 1;
+ fc->error = t->ret;
}
/**
struct receiver_node *rn = t->private_data;
struct private_http_recv_data *phd = rn->private_data;
- t->ret = -E_HTTP_RECV_EOF;
- if (rn->output_eof && *rn->output_eof)
+ if (rn->output_error && *rn->output_error) {
+ t->ret = *rn->output_error;
goto out;
+ }
t->ret = 1;
if (!s->select_ret)
goto out;
BUFSIZE - rn->loaded);
if (t->ret <= 0) {
if (!t->ret)
- t->ret = -E_HTTP_RECV_EOF;
+ t->ret = -E_RECV_EOF;
goto out;
}
rn->loaded += t->ret;
out:
if (t->ret < 0)
- rn->eof = 1;
+ rn->error = t->ret;
}
static void http_recv_close(struct receiver_node *rn)
// PARA_DEBUG_LOG("pod = %p\n", pod);
// PARA_DEBUG_LOG("vorbis requests %d bytes, have %d\n", size * nmemb, have);
if (pod->inbuf_len < size) {
- if (*fn->fc->input_eof)
+ if (*fn->fc->input_error)
return 0;
errno = EAGAIN;
return (size_t)-1;
if (!pod->vf) {
int ib = 1024 * conf->initial_buffer_arg; /* initial buffer */
- if (len <ib && !*fn->fc->input_eof) {
+ if (len <ib && !*fn->fc->input_error) {
PARA_DEBUG_LOG("initial input buffer %zd/%d, "
"waiting for more data\n", len, ib);
return 0;
PARA_NOTICE_LOG("%d channels, %d Hz\n", fn->fc->channels,
fn->fc->samplerate);
}
- while (!*fn->fc->input_eof && fn->loaded < fn->bufsize) {
+ while (!*fn->fc->input_error && fn->loaded < fn->bufsize) {
int length = fn->bufsize - fn->loaded;
long read_ret = ov_read(pod->vf, fn->buf + fn->loaded, length,
ENDIAN, 2 /* 16 bit */, 1 /* signed */, NULL);
size_t packet_size;
// PARA_INFO_LOG("rn: %p, pord: %p, session: %p\n", rn, pord, pord->session);
- t->ret = -E_ORTP_RECV_EOF;
- if (rn->output_eof && *rn->output_eof) {
- rn->eof = 1;
+ if (rn->output_error && *rn->output_error) {
+ rn->error = *rn->output_error;
+ t->ret = rn->error;
return;
}
t->ret = 1;
pord->start = *now;
t->ret = msg_to_buf(mp, tmpbuf, CHUNK_SIZE);
if (t->ret < ORTP_AUDIO_HEADER_LEN) {
- rn->eof = 1;
if (t->ret < 0)
t->ret = -E_MSG_TO_BUF;
else
t->ret = -E_ORTP_RECV_EOF;
+ rn->error = t->ret;
goto err_out;
}
packet_size = t->ret;
switch (packet_type) {
unsigned header_len, payload_len;
case ORTP_EOF:
- rn->eof = 1;
- t->ret = -E_ORTP_RECV_EOF;
+ t->ret = -E_RECV_EOF;
+ rn->error = t->ret;
goto err_out;
case ORTP_BOF:
PARA_INFO_LOG("bof (%zu)\n", packet_size);
compute_next_chunk(chunk_time, pord);
return;
err_out:
- rn->eof = 1;
+ rn->error = t->ret;
freemsg(mp);
}
{
struct receiver_node *rn = t->private_data;
PARA_NOTICE_LOG("%s\n", PARA_STRERROR(-t->ret));
- rn->eof = 1;
+ rn->error = t->ret;
unregister_task(t);
}
stdout_set_defaults(&sot);
sot.buf = rn.buf;
sot.loaded = &rn.loaded;
- sot.input_eof = &rn.eof;
+ sot.input_error = &rn.error;
register_task(&sot.task);
rn.task.private_data = &rn;
size_t loaded;
/** receiver-specific data */
void *private_data;
- /** set to 1 if end of file is reached */
- int eof;
- /** pointer to the eof member of the consumer */
- int *output_eof;
+ /** Set to non-zero error value on errors or on end of file. */
+ int error;
+ /** Pointer to the error member of the consumer. */
+ int *output_error;
/** pointer to the configuration data for this instance */
void *conf;
/** the task associated with this instance */
} else
t->ret = -E_STDIN_EOF;
if (t->ret < 0)
- sit->eof = 1;
+ sit->error = t->ret;
}
/**
{
sit->bufsize = 16 * 1024,
sit->loaded = 0,
- sit->eof = 0,
+ sit->error = 0,
sit->task.pre_select = stdin_pre_select;
sit->task.post_select = stdin_post_select;
sit->task.event_handler = stdin_default_event_handler;
/** The task structure. */
struct task task;
/** Non-zero on read error, or if a read from stdin returned zero. */
- int eof;
+ int error;
};
void stdin_set_defaults(struct stdin_task *sit);
t->ret = 1;
sot->check_fd = 0;
if (!*sot->loaded) {
- if (*sot->input_eof) {
- t->ret = -E_STDOUT_EOF;
+ if (*sot->input_error) {
+ t->ret = *sot->input_error;
s->timeout.tv_sec = 0;
s->timeout.tv_usec = 1;
}
t->ret = 1;
if (!sot->check_fd) {
- if (*sot->input_eof)
- t->ret = -E_STDOUT_EOF;
+ if (*sot->input_error)
+ t->ret = *sot->input_error;
return;
}
if (!FD_ISSET(STDOUT_FILENO, &s->wfds))
sot->task.pre_select = stdout_pre_select;
sot->task.post_select = stdout_post_select;
sot->task.event_handler = stdout_default_event_handler;
- sot->eof = 0;
+ sot->error = 0;
mark_fd_nonblocking(STDOUT_FILENO);
sprintf(sot->task.status, "stdout writer");
}
char *buf;
/** Number of bytes loaded in \a buf. */
size_t *loaded;
- /** Üointer to the eof flag of the feeding task. */
- int *input_eof;
- /** Non-zero if a write error occured. */
- int eof;
+ /** Pointer to the error variable of the feeding task. */
+ int *input_error;
+ /** Non-zero if a write error occurred. */
+ int error;
/** The task structure. */
struct task task;
/** Whether \p STDOUT_FILENO was included in the write fd set. */
char *buf;
/** Number of bytes loaded in \a buf. */
size_t *loaded;
- /** Non-zero if end of file was reached. */
- int *eof;
+ /** Non-zero if an error occurred or end of file was reached. */
+ int *error;
/** Number of channels specified in wav header given by \a buf. */
unsigned channels;
- /** Samplerate specified in wav header given by \a buf. */
+ /** Sample rate specified in wav header given by \a buf. */
unsigned samplerate;
/** The task structure for this task. */
struct task task;
unsigned char *a;
if (*wt->loaded < WAV_HEADER_LEN) {
- t->ret = *wt->eof? -E_PREMATURE_END : 1;
+ t->ret = *wt->error? -E_PREMATURE_END : 1;
return;
}
wt->channels = 2;
unregister_task(t);
wng->buf = sit.buf;
wng->loaded = &sit.loaded;
- wng->input_eof = &sit.eof;
+ wng->input_error = &sit.error;
wng->task.event_handler = wng_event_handler;
wng->channels = &cwt.channels;
wng->samplerate = &cwt.samplerate;
cwt.task.event_handler = cwt_event_handler;
cwt.buf = sit.buf;
cwt.loaded = &sit.loaded;
- cwt.eof = &sit.eof;
+ cwt.error = &sit.error;
sprintf(cwt.task.status, "check wav");
register_task(&cwt.task);
struct writer_node *writer_nodes;
/** the maximum of the chunk_bytes values of the writer nodes in this group */
size_t max_chunk_bytes;
- /** non-zero if end of file was encountered by the feeding task */
- int *input_eof;
- /** non-zero if end of file was encountered */
- int eof;
+ /** Non-zero if an error or end of file was encountered by the feeding task. */
+ int *input_error;
+ /** Non-zero if an error occurred or end of file was encountered. */
+ int error;
/** current output buffer */
char *buf;
/** number of bytes loaded in the output buffer */
struct writer_node *wn = &g->writer_nodes[i];
t->ret = wn->writer->pre_select(s, wn);
if (t->ret < 0) {
- g->eof = 1;
+ g->error = t->ret;
return;
}
}
struct writer_node *wn = &g->writer_nodes[i];
t->ret = wn->writer->post_select(s, wn);
if (t->ret < 0) {
- g->eof = 1;
+ g->error = t->ret;
return;
}
if (!i)
FOR_EACH_WRITER_NODE(i, g)
g->writer_nodes[i].written -= min_written;
}
- if (!*g->loaded && *g->input_eof) {
- g->eof = 1;
- t->ret = -E_WNG_EOF;
+ if (!*g->loaded && *g->input_error) {
+ g->error = *g->input_error;
+ t->ret = g->error;
return;
}
t->ret = 1;
g->max_chunk_bytes = PARA_MAX(g->max_chunk_bytes, ret);
}
sprintf(g->task.status, "%s", "writer node group");
- g->eof = 0;
+ g->error = 0;
return 1;
err_out:
PARA_ERROR_LOG("%s\n", PARA_STRERROR(-ret));
wn->writer->close(wn);
}
g->num_writers = 0;
- g->eof = 1;
+ g->error = ret;
return ret;
}
void wng_unregister(struct writer_node_group *g)
{
unregister_task(&g->task);
- g->eof = 1;
}
/**