static void afh_recv_pre_select(struct sched *s, struct task *t)
{
- struct receiver_node *rn = container_of(t, struct receiver_node, task);
+ struct receiver_node *rn = task_context(t);
struct private_afh_recv_data *pard = rn->private_data;
struct afh_info *afhi = &pard->afhi;
struct afh_recv_args_info *conf = rn->conf;
static int afh_recv_post_select(__a_unused struct sched *s, struct task *t)
{
- struct receiver_node *rn = container_of(t, struct receiver_node, task);
+ struct receiver_node *rn = task_context(t);
struct afh_recv_args_info *conf = rn->conf;
struct private_afh_recv_data *pard = rn->private_data;
struct btr_node *btrn = rn->btrn;
audio_formats[s->format], slot_num);
a->receiver->close(s->receiver_node);
btr_remove_node(&s->receiver_node->btrn);
+ task_reap(&s->receiver_node->task);
free(s->receiver_node);
s->receiver_node = NULL;
tv_add(now, &(struct timeval)EMBRACE(0, 200 * 1000),
continue;
if (!s->receiver_node)
continue;
- task_notify(&s->receiver_node->task, error);
+ task_notify(s->receiver_node->task, error);
}
}
s->receiver_node = rn;
PARA_NOTICE_LOG("started %s: %s receiver in slot %d\n",
audio_formats[format], r->name, slot_num);
- rn->task.pre_select = r->pre_select;
- rn->task.post_select = r->post_select;
- sprintf(rn->task.status, "%s receiver node", r->name);
- register_task(&sched, &rn->task);
+ rn->task = task_register(&(struct task_info) {
+ .name = r->name,
+ .pre_select = r->pre_select,
+ .post_select = r->post_select,
+ .context = rn,
+ }, &sched);
return slot_num;
}
if (!s->receiver_node)
continue;
- if (s->receiver_node->task.error >= 0)
+ if (s->receiver_node->task->error >= 0)
return true;
if (ss1 == ss2)
return true;
struct timeval rstime;
if (!s->receiver_node)
continue;
- if (s->receiver_node->task.error < 0)
+ if (s->receiver_node->task->error < 0)
continue;
btr_get_node_start(s->receiver_node->btrn, &rstime);
if (newest_slot >= 0 && tv_diff(&rstime, &newest_rstime, NULL) < 0)
if (s->format < 0)
return false;
- if (s->receiver_node && s->receiver_node->task.error >= 0)
+ if (s->receiver_node && s->receiver_node->task->error >= 0)
return false;
for (i = 0; i < a->num_filters; i++)
if (s->fns && s->fns[i].task.error >= 0)
sched.default_timeout.tv_sec = 2;
sched.default_timeout.tv_usec = 999 * 1000;
ret = schedule(&sched);
+ sched_shutdown(&sched);
PARA_EMERG_LOG("%s\n", para_strerror(-ret));
return EXIT_FAILURE;
static void dccp_recv_pre_select(struct sched *s, struct task *t)
{
- struct receiver_node *rn = container_of(t, struct receiver_node, task);
+ struct receiver_node *rn = task_context(t);
if (generic_recv_pre_select(s, t) <= 0)
return;
static int dccp_recv_post_select(struct sched *s, struct task *t)
{
- struct receiver_node *rn = container_of(t, struct receiver_node, task);
+ struct receiver_node *rn = task_context(t);
struct btr_node *btrn = rn->btrn;
struct iovec iov[2];
int ret, iovcnt;
static void http_recv_pre_select(struct sched *s, struct task *t)
{
- struct receiver_node *rn = container_of(t, struct receiver_node, task);
+ struct receiver_node *rn = task_context(t);
struct private_http_recv_data *phd = rn->private_data;
if (generic_recv_pre_select(s, t) <= 0)
*/
static int http_recv_post_select(struct sched *s, struct task *t)
{
- struct receiver_node *rn = container_of(t, struct receiver_node, task);
+ struct receiver_node *rn = task_context(t);
struct private_http_recv_data *phd = rn->private_data;
struct btr_node *btrn = rn->btrn;
int ret, iovcnt;
return 0;
if (pt->fn.task.error >= 0)
return 0;
- if (pt->rn.task.error >= 0)
+ if (pt->rn.task->error >= 0)
return 0;
if (err == -E_BTR_EOF || err == -E_RECV_EOF || err == -E_EOF
|| err == -E_WRITE_COMMON_EOF)
free(pt->fn.conf);
memset(&pt->fn, 0, sizeof(struct filter_node));
+ task_reap(&pt->rn.task);
btr_remove_node(&pt->rn.btrn);
/*
* On eof (ret > 0), we do not wipe the receiver node struct until a
free(tmp);
tmp = NULL;
}
- pt->rn.task.pre_select = afh_recv->pre_select;
- pt->rn.task.post_select = afh_recv->post_select;
- sprintf(pt->rn.task.status, "%s receiver node", afh_recv->name);
return 1;
fail:
wipe_receiver_node(pt);
pt->wn.task.error = 0;
/* success, register tasks */
- register_task(&sched, &pt->rn.task);
+ pt->rn.task = task_register(
+ &(struct task_info) {
+ .name = afh_recv->name,
+ .pre_select = afh_recv->pre_select,
+ .post_select = afh_recv->post_select,
+ .context = &pt->rn
+ }, &sched);
register_task(&sched, &pt->fn.task);
register_writer_node(&pt->wn, pt->fn.btrn, &sched);
return 1;
sprintf(pt->task.status, "play task");
register_task(&sched, &pt->task);
ret = schedule(&sched);
+ sched_shutdown(&sched);
if (ret < 0)
PARA_ERROR_LOG("%s\n", para_strerror(-ret));
return ret < 0? EXIT_FAILURE : EXIT_SUCCESS;
struct receiver_node rn;
struct stdout_task sot;
static struct sched s;
+ struct task_info ti;
recv_cmdline_parser(argc, argv, &conf);
loglevel = get_loglevel_by_name(conf.loglevel_arg);
EMBRACE(.parent = rn.btrn, .name = "stdout"));
stdout_task_register(&sot, &s);
- rn.task.pre_select = r->pre_select;
- rn.task.post_select = r->post_select;
- sprintf(rn.task.status, "%s", r->name);
- register_task(&s, &rn.task);
+ ti.name = r->name;
+ ti.pre_select = r->pre_select;
+ ti.post_select = r->post_select;
+ ti.context = &rn;
+ rn.task = task_register(&ti, &s);
s.default_timeout.tv_sec = 1;
s.default_timeout.tv_usec = 0;
/** Pointer to the configuration data for this instance. */
void *conf;
/** The task associated with this instance. */
- struct task task;
+ struct task *task;
/** The receiver node is always the root of the buffer tree. */
struct btr_node *btrn;
/** Each receiver node maintains a buffer pool for the received data. */
*/
int generic_recv_pre_select(struct sched *s, struct task *t)
{
- struct receiver_node *rn = container_of(t, struct receiver_node, task);
+ struct receiver_node *rn = task_context(t);
int ret = btr_node_status(rn->btrn, 0, BTR_NT_ROOT);
if (ret < 0)
unsigned num_running_tasks = 0;
list_for_each_entry_safe(t, tmp, &s->task_list, node) {
- if (t->error < 0)
+ if (t->error < 0) {
+ if (t->dead) /* task has been reaped */
+ unlink_and_free_task(t);
continue;
+ }
call_post_select(s, t);
t->notification = 0;
if (t->error < 0) {
goto again;
}
+/**
+ * Obtain the error status of a task and deallocate its resources.
+ *
+ * \param tptr Identifies the task to reap.
+ *
+ * This function is similar to wait(2) in that it returns information about a
+ * terminated task and allows to release the resources associated with the
+ * task. Until this function is called, the terminated task remains in a zombie
+ * state.
+ *
+ * \return If \a tptr is \p NULL, or \a *tptr is \p NULL, the function does
+ * nothing and returns zero. Otherwise, it is checked whether the task
+ * identified by \a tptr is still running. If it is, the function returns zero
+ * and again, no action is taken. Otherwise the (negative) error code of the
+ * terminated task is returned and \a *tptr is set to \p NULL. The task will
+ * then be removed removed from the scheduler task list.
+ *
+ * \sa \ref sched_shutdown(), wait(2).
+ */
+int task_reap(struct task **tptr)
+{
+ struct task *t;
+
+ if (!tptr)
+ return 0;
+ t = *tptr;
+ if (!t)
+ return 0;
+ if (!t->owned_by_sched)
+ return 0;
+ if (t->error >= 0)
+ return 0;
+ if (t->dead) /* will be freed in sched_post_select() */
+ return 0;
+ /*
+ * With list_for_each_entry_safe() it is only safe to remove the
+ * _current_ list item. Since we are being called from the loop in
+ * schedule() via some task's ->post_select() function, freeing the
+ * given task here would result in use-after-free bugs in schedule().
+ * So we only set t->dead which tells schedule() to free the task in
+ * the next iteration of its loop.
+ */
+ t->dead = true;
+ *tptr = NULL;
+ return t->error;
+}
+
/**
* Deallocate all resources of all tasks of a scheduler instance.
*
t->status[sizeof(t->status) - 1] = '\0';
t->notification = 0;
t->error = 0;
+ t->dead = false;
t->pre_select = info->pre_select;
t->post_select = info->post_select;
t->context = info->context;
list_for_each_entry_safe(t, tmp, &s->task_list, node) {
char *tmp_msg;
tmp_msg = make_message("%s%p\t%s\t%s\n", msg? msg : "", t,
- t->error < 0? "zombie" : "running",
+ t->error < 0? (t->dead? "dead" : "zombie") : "running",
t->status);
free(msg);
msg = tmp_msg;
int notification;
/** Whether the task structure should be freed in sched_shutdown(). */
bool owned_by_sched;
+ /** True if task is in error state and exit status has been queried. */
+ bool dead;
/** Usually a pointer to the struct containing this task. */
void *context;
};
void task_notify(struct task *t, int err);
void task_notify_all(struct sched *s, int err);
int task_get_notification(const struct task *t);
+int task_reap(struct task **tptr);
void sched_min_delay(struct sched *s);
void sched_request_timeout(struct timeval *to, struct sched *s);
void sched_request_timeout_ms(long unsigned ms, struct sched *s);
static void udp_recv_pre_select(struct sched *s, struct task *t)
{
- struct receiver_node *rn = container_of(t, struct receiver_node, task);
+ struct receiver_node *rn = task_context(t);
if (generic_recv_pre_select(s, t) <= 0)
return;
static int udp_recv_post_select(__a_unused struct sched *s, struct task *t)
{
- struct receiver_node *rn = container_of(t, struct receiver_node, task);
+ struct receiver_node *rn = task_context(t);
struct btr_node *btrn = rn->btrn;
size_t num_bytes;
struct iovec iov[2];