Code which accesses the ->error field of another task has been
a source of bugs in the past. This patch is a first step to make
->error private to sched.c.
Currently the ->post_select() methods of all tasks are supposed
to set ->error to a negative value to indicate that the task
should be terminated, i.e. t->error being negative instructs the
scheduler to not call t->pre_select() or t->post_select() any more
in subsequent iterations. An equivalent way to achieve the same
is to let ->post_select() return an error code instead. Benefits
include
* It reduces the use of ->error outside of sched.c
* It's impossible to miss setting the error code
* It simplifies code slightly
Therefore we'd like to change the prototype of the ->post_select()
methods from
void (*post_select)(struct sched *s, struct task *t);
to
int (*post_select)(struct sched *s, struct task *t);
Changes to struct sched affects many parts of the code base,
hence converting all users at once would result in a very
large patch. Therefore we temporarily introduce an additional
->new_post_select() method according to the second declaration
above. The scheduler calls the new method if it is defined and falls
back to the old variant otherwise. This approach allows to switch
over one task after another without breaking things.
This patch introduces the infrastructure just described and switches
over the http receiver. Subsequent commits will change all other tasks
in the same way. After all tasks have been switched over we can get
rid of the old ->post_select variant and rename ->new_post_select()
back to ->post_select().
fn->filter_num = a->filter_nums[i];
fn->conf = a->filter_conf[i];
fn->task.pre_select = f->pre_select;
fn->filter_num = a->filter_nums[i];
fn->conf = a->filter_conf[i];
fn->task.pre_select = f->pre_select;
- fn->task.post_select = f->post_select;
-
+ if (f->new_post_select) {
+ fn->task.new_post_select = f->new_post_select;
+ fn->task.post_select = NULL;
+ } else {
+ fn->task.new_post_select = NULL;
+ fn->task.post_select = f->post_select;
+ }
fn->btrn = btr_new_node(&(struct btr_node_description)
EMBRACE(.name = f->name, .parent = parent,
.handler = f->execute, .context = fn));
fn->btrn = btr_new_node(&(struct btr_node_description)
EMBRACE(.name = f->name, .parent = parent,
.handler = f->execute, .context = fn));
PARA_NOTICE_LOG("started %s: %s receiver in slot %d\n",
audio_formats[format], r->name, slot_num);
rn->task.pre_select = r->pre_select;
PARA_NOTICE_LOG("started %s: %s receiver in slot %d\n",
audio_formats[format], r->name, slot_num);
rn->task.pre_select = r->pre_select;
- rn->task.post_select = r->post_select;
+ if (r->new_post_select) {
+ rn->task.new_post_select = r->new_post_select;
+ rn->task.post_select = NULL;
+ } else {
+ rn->task.new_post_select = NULL;
+ rn->task.post_select = r->post_select;
+ }
sprintf(rn->task.status, "%s receiver node", r->name);
register_task(&sched, &rn->task);
return slot_num;
sprintf(rn->task.status, "%s receiver node", r->name);
register_task(&sched, &rn->task);
return slot_num;
{
ct->task.pre_select = command_pre_select;
ct->task.post_select = command_post_select;
{
ct->task.pre_select = command_pre_select;
ct->task.post_select = command_post_select;
+ ct->task.new_post_select = NULL;
ct->task.error = 0;
ct->fd = audiod_get_socket(); /* doesn't return on errors */
sprintf(ct->task.status, "command task");
ct->task.error = 0;
ct->fd = audiod_get_socket(); /* doesn't return on errors */
sprintf(ct->task.status, "command task");
EMBRACE(.name = f->name, .parent = parent,
.handler = f->execute, .context = fn));
fn->task.pre_select = f->pre_select;
EMBRACE(.name = f->name, .parent = parent,
.handler = f->execute, .context = fn));
fn->task.pre_select = f->pre_select;
- fn->task.post_select = f->post_select;
+ if (f->new_post_select) {
+ fn->task.new_post_select = f->new_post_select;
+ fn->task.post_select = NULL;
+ } else {
+ fn->task.new_post_select = NULL;
+ fn->task.post_select = f->post_select;
+ }
f->open(fn);
register_task(&s, &fn->task);
parent = fn->btrn;
f->open(fn);
register_task(&s, &fn->task);
parent = fn->btrn;
* error code.
*/
void (*post_select)(struct sched *s, struct task *t);
* error code.
*/
void (*post_select)(struct sched *s, struct task *t);
+ /** New variant, see sched.h. */
+ int (*new_post_select)(struct sched *s, struct task *t);
/**
* Answer a buffer tree query.
*
/**
* Answer a buffer tree query.
*
struct receiver_node *rn = container_of(t, struct receiver_node, task);
struct private_http_recv_data *phd = rn->private_data;
struct receiver_node *rn = container_of(t, struct receiver_node, task);
struct private_http_recv_data *phd = rn->private_data;
if (generic_recv_pre_select(s, t) <= 0)
return;
if (phd->status == HTTP_CONNECTED)
if (generic_recv_pre_select(s, t) <= 0)
return;
if (phd->status == HTTP_CONNECTED)
* area with data read from the socket. In any case, update the state of the
* connection if necessary.
*/
* area with data read from the socket. In any case, update the state of the
* connection if necessary.
*/
-static void http_recv_post_select(struct sched *s, struct task *t)
+static int http_recv_post_select(struct sched *s, struct task *t)
{
struct receiver_node *rn = container_of(t, struct receiver_node, task);
struct private_http_recv_data *phd = rn->private_data;
{
struct receiver_node *rn = container_of(t, struct receiver_node, task);
struct private_http_recv_data *phd = rn->private_data;
if (ret < 0)
goto out;
if (ret == 0)
if (ret < 0)
goto out;
if (ret == 0)
if (phd->status == HTTP_CONNECTED) {
char *rq;
if (!FD_ISSET(rn->fd, &s->wfds))
if (phd->status == HTTP_CONNECTED) {
char *rq;
if (!FD_ISSET(rn->fd, &s->wfds))
rq = make_request_msg();
PARA_INFO_LOG("sending http request\n");
ret = write_va_buffer(rn->fd, "%s", rq);
rq = make_request_msg();
PARA_INFO_LOG("sending http request\n");
ret = write_va_buffer(rn->fd, "%s", rq);
if (ret < 0)
goto out;
phd->status = HTTP_SENT_GET_REQUEST;
if (ret < 0)
goto out;
phd->status = HTTP_SENT_GET_REQUEST;
}
if (phd->status == HTTP_SENT_GET_REQUEST) {
ret = read_pattern(rn->fd, HTTP_OK_MSG, strlen(HTTP_OK_MSG), &s->rfds);
if (ret < 0)
goto out;
if (ret == 0)
}
if (phd->status == HTTP_SENT_GET_REQUEST) {
ret = read_pattern(rn->fd, HTTP_OK_MSG, strlen(HTTP_OK_MSG), &s->rfds);
if (ret < 0)
goto out;
if (ret == 0)
PARA_INFO_LOG("received ok msg, streaming\n");
phd->status = HTTP_STREAMING;
PARA_INFO_LOG("received ok msg, streaming\n");
phd->status = HTTP_STREAMING;
}
ret = -E_HTTP_RECV_OVERRUN;
iovcnt = btr_pool_get_buffers(rn->btrp, iov);
}
ret = -E_HTTP_RECV_OVERRUN;
iovcnt = btr_pool_get_buffers(rn->btrp, iov);
btr_add_output_pool(rn->btrp, num_bytes - iov[0].iov_len, btrn);
}
out:
btr_add_output_pool(rn->btrp, num_bytes - iov[0].iov_len, btrn);
}
out:
- if (ret >= 0)
- return;
- btr_remove_node(&rn->btrn);
- t->error = ret;
+ if (ret < 0)
+ btr_remove_node(&rn->btrn);
+ return ret;
}
static void http_recv_close(struct receiver_node *rn)
}
static void http_recv_close(struct receiver_node *rn)
r->open = http_recv_open;
r->close = http_recv_close;
r->pre_select = http_recv_pre_select;
r->open = http_recv_open;
r->close = http_recv_close;
r->pre_select = http_recv_pre_select;
- r->post_select = http_recv_post_select;
+ r->post_select = NULL;
+ r->new_post_select = http_recv_post_select;
r->parse_config = http_recv_parse_config;
r->free_config = http_recv_free_config;
r->help = (struct ggo_help) {
r->parse_config = http_recv_parse_config;
r->free_config = http_recv_free_config;
r->help = (struct ggo_help) {
tmp = NULL;
}
pt->rn.task.pre_select = afh_recv->pre_select;
tmp = NULL;
}
pt->rn.task.pre_select = afh_recv->pre_select;
- pt->rn.task.post_select = afh_recv->post_select;
+ if (afh_recv->new_post_select) {
+ pt->rn.task.new_post_select = afh_recv->new_post_select;
+ pt->rn.task.post_select = NULL;
+ } else {
+ pt->rn.task.post_select = NULL;
+ pt->rn.task.new_post_select = afh_recv->new_post_select;
+ }
sprintf(pt->rn.task.status, "%s receiver node", afh_recv->name);
return 1;
fail:
sprintf(pt->rn.task.status, "%s receiver node", afh_recv->name);
return 1;
fail:
pt->fn.filter_num = ret;
decoder = filters + ret;
pt->fn.task.pre_select = decoder->pre_select;
pt->fn.filter_num = ret;
decoder = filters + ret;
pt->fn.task.pre_select = decoder->pre_select;
- pt->fn.task.post_select = decoder->post_select;
+ if (decoder->new_post_select) {
+ pt->fn.task.new_post_select = decoder->new_post_select;
+ pt->fn.task.post_select = NULL;
+ } else {
+ pt->fn.task.new_post_select = NULL;
+ pt->fn.task.post_select = decoder->post_select;
+ }
sprintf(pt->fn.task.status, "%s decoder", af);
pt->fn.btrn = btr_new_node(&(struct btr_node_description)
EMBRACE(.name = decoder->name, .parent = pt->rn.btrn,
sprintf(pt->fn.task.status, "%s decoder", af);
pt->fn.btrn = btr_new_node(&(struct btr_node_description)
EMBRACE(.name = decoder->name, .parent = pt->rn.btrn,
register_task(&s, &sot.task);
rn.task.pre_select = r->pre_select;
register_task(&s, &sot.task);
rn.task.pre_select = r->pre_select;
- rn.task.post_select = r->post_select;
+ if (r->new_post_select) {
+ rn.task.new_post_select = r->new_post_select;
+ rn.task.post_select = NULL;
+ } else {
+ rn.task.new_post_select = NULL;
+ rn.task.post_select = r->post_select;;
+ }
sprintf(rn.task.status, "%s", r->name);
register_task(&s, &rn.task);
sprintf(rn.task.status, "%s", r->name);
register_task(&s, &rn.task);
* \sa select(2), struct receiver.
*/
void (*post_select)(struct sched *s, struct task *t);
* \sa select(2), struct receiver.
*/
void (*post_select)(struct sched *s, struct task *t);
+ /** New variant, see sched.h. */
+ int (*new_post_select)(struct sched *s, struct task *t);
/** The two help texts of this receiver. */
struct ggo_help help;
/** The two help texts of this receiver. */
struct ggo_help help;
para_strerror(-t->error));
if (t->pre_select)
list_del(&t->pre_select_node);
para_strerror(-t->error));
if (t->pre_select)
list_del(&t->pre_select_node);
+ if (t->new_post_select || t->post_select)
list_del(&t->post_select_node);
}
list_del(&t->post_select_node);
}
static inline void call_post_select(struct sched *s, struct task *t)
{
#ifndef SCHED_DEBUG
static inline void call_post_select(struct sched *s, struct task *t)
{
#ifndef SCHED_DEBUG
+ if (t->new_post_select) {
+ t->error = t->new_post_select(s, t);
+ return;
+ }
+ return t->post_select(s, t);
#else
struct timeval t1, t2, diff;
unsigned long pst;
clock_get_realtime(&t1);
#else
struct timeval t1, t2, diff;
unsigned long pst;
clock_get_realtime(&t1);
+ if (t->new_post_select)
+ t->error = t->new_post_select(s, t);
+ else
+ t->post_select(s, t);
clock_get_realtime(&t2);
tv_diff(&t1, &t2, &diff);
pst = tv2ms(&diff);
clock_get_realtime(&t2);
tv_diff(&t1, &t2, &diff);
pst = tv2ms(&diff);
PARA_DEBUG_LOG("pre_select: %p\n", &t->pre_select);
list_add_tail(&t->pre_select_node, &s->pre_select_list);
}
PARA_DEBUG_LOG("pre_select: %p\n", &t->pre_select);
list_add_tail(&t->pre_select_node, &s->pre_select_list);
}
+ if (t->new_post_select) {
+ PARA_DEBUG_LOG("post_select: %p\n", &t->new_post_select);
+ list_add_tail(&t->post_select_node, &s->post_select_list);
+ } else if ((t->post_select)) {
PARA_DEBUG_LOG("post_select: %p\n", &t->post_select);
list_add_tail(&t->post_select_node, &s->post_select_list);
}
PARA_DEBUG_LOG("post_select: %p\n", &t->post_select);
list_add_tail(&t->post_select_node, &s->post_select_list);
}
* Evaluate and act upon the results of the previous select call.
*/
void (*post_select)(struct sched *s, struct task *t);
* Evaluate and act upon the results of the previous select call.
*/
void (*post_select)(struct sched *s, struct task *t);
+ /**
+ * The newer variant of the post select hook of \a t.
+ *
+ * This hook must return the error code rather than store it in
+ * t->error.
+ */
+ int (*new_post_select)(struct sched *s, struct task *t);
/** Whether this task is in error state. */
int error;
/** Position of the task in the pre_select list of the scheduler. */
/** Whether this task is in error state. */
int error;
/** Position of the task in the pre_select list of the scheduler. */
* Called from the post_select function of the writer node's task.
*/
void (*post_select)(struct sched *s, struct task *t);
* Called from the post_select function of the writer node's task.
*/
void (*post_select)(struct sched *s, struct task *t);
+ /** New variant, see sched.h. */
+ int (*new_post_select)(struct sched *s, struct task *t);
/**
* Close one instance of the writer.
*
/**
* Close one instance of the writer.
*
.handler = w->execute, .context = wn));
strcpy(wn->task.status, name);
free(name);
.handler = w->execute, .context = wn));
strcpy(wn->task.status, name);
free(name);
- wn->task.post_select = w->post_select;
wn->task.pre_select = w->pre_select;
wn->task.pre_select = w->pre_select;
+ if (w->new_post_select) {
+ wn->task.new_post_select = w->new_post_select;
+ wn->task.post_select = NULL;
+ } else {
+ wn->task.new_post_select = NULL;
+ wn->task.post_select = w->post_select;
+ }
register_task(s, &wn->task);
}
register_task(s, &wn->task);
}