task_register() conversion: receivers
authorAndre Noll <maan@systemlinux.org>
Sat, 25 Jan 2014 18:41:45 +0000 (19:41 +0100)
committerAndre Noll <maan@systemlinux.org>
Sun, 25 May 2014 13:36:37 +0000 (15:36 +0200)
This adds a new public function, task_reap(), to sched.c. It is
called by para_audiod and para_play to free the memory occupied by
the receiver node after EOF. sched_shutdown() can not be used for
this purpose since the scheduler stays active during the life time of
these programs (i.e. schedule() never returns) while receiver nodes
come and go.

The new task_reap() has to face the problem that it is called
from another task's ->post_select() method, so removing the task
being reaped from the scheduler task list is not possible in
task_reap(). Hence this patch adds the new flag "dead" to struct
task. It is initially unset and is turned on in task_reap() to indicate
that (a) the task has exited (i.e. ->post_select() returned negative)
and (b) task_reap() has been called to fetch the exit status. Only
if this flag is set, the scheduler removes the task from the task list.

afh_recv.c
audiod.c
dccp_recv.c
http_recv.c
play.c
recv.c
recv.h
recv_common.c
sched.c
sched.h
udp_recv.c

index 941ec4e..e320fde 100644 (file)
@@ -152,7 +152,7 @@ static void afh_recv_close(struct receiver_node *rn)
 
 static void afh_recv_pre_select(struct sched *s, struct task *t)
 {
-       struct receiver_node *rn = container_of(t, struct receiver_node, task);
+       struct receiver_node *rn = task_context(t);
        struct private_afh_recv_data *pard = rn->private_data;
        struct afh_info *afhi = &pard->afhi;
        struct afh_recv_args_info *conf = rn->conf;
@@ -172,7 +172,7 @@ static void afh_recv_pre_select(struct sched *s, struct task *t)
 
 static int afh_recv_post_select(__a_unused struct sched *s, struct task *t)
 {
-       struct receiver_node *rn = container_of(t, struct receiver_node, task);
+       struct receiver_node *rn = task_context(t);
        struct afh_recv_args_info *conf = rn->conf;
        struct private_afh_recv_data *pard = rn->private_data;
        struct btr_node *btrn = rn->btrn;
index b7b6d2d..4b88ec5 100644 (file)
--- a/audiod.c
+++ b/audiod.c
@@ -391,6 +391,7 @@ static void close_receiver(int slot_num)
                audio_formats[s->format], slot_num);
        a->receiver->close(s->receiver_node);
        btr_remove_node(&s->receiver_node->btrn);
+       task_reap(&s->receiver_node->task);
        free(s->receiver_node);
        s->receiver_node = NULL;
        tv_add(now, &(struct timeval)EMBRACE(0, 200 * 1000),
@@ -459,7 +460,7 @@ static void notify_receivers(int error)
                        continue;
                if (!s->receiver_node)
                        continue;
-               task_notify(&s->receiver_node->task, error);
+               task_notify(s->receiver_node->task, error);
        }
 }
 
@@ -566,10 +567,12 @@ static int open_receiver(int format)
        s->receiver_node = rn;
        PARA_NOTICE_LOG("started %s: %s receiver in slot %d\n",
                audio_formats[format], r->name, slot_num);
-       rn->task.pre_select = r->pre_select;
-       rn->task.post_select = r->post_select;
-       sprintf(rn->task.status, "%s receiver node", r->name);
-       register_task(&sched, &rn->task);
+       rn->task = task_register(&(struct task_info) {
+               .name = r->name,
+               .pre_select = r->pre_select,
+               .post_select = r->post_select,
+               .context = rn,
+       }, &sched);
        return slot_num;
 }
 
@@ -584,7 +587,7 @@ static bool receiver_running(void)
 
                if (!s->receiver_node)
                        continue;
-               if (s->receiver_node->task.error >= 0)
+               if (s->receiver_node->task->error >= 0)
                        return true;
                if (ss1 == ss2)
                        return true;
@@ -611,7 +614,7 @@ struct btr_node *audiod_get_btr_root(void)
                struct timeval rstime;
                if (!s->receiver_node)
                        continue;
-               if (s->receiver_node->task.error < 0)
+               if (s->receiver_node->task->error < 0)
                        continue;
                btr_get_node_start(s->receiver_node->btrn, &rstime);
                if (newest_slot >= 0 && tv_diff(&rstime, &newest_rstime, NULL) < 0)
@@ -1103,7 +1106,7 @@ static bool must_close_slot(int slot_num)
 
        if (s->format < 0)
                return false;
-       if (s->receiver_node && s->receiver_node->task.error >= 0)
+       if (s->receiver_node && s->receiver_node->task->error >= 0)
                return false;
        for (i = 0; i < a->num_filters; i++)
                if (s->fns && s->fns[i].task.error >= 0)
@@ -1427,6 +1430,7 @@ int main(int argc, char *argv[])
        sched.default_timeout.tv_sec = 2;
        sched.default_timeout.tv_usec = 999 * 1000;
        ret = schedule(&sched);
+       sched_shutdown(&sched);
 
        PARA_EMERG_LOG("%s\n", para_strerror(-ret));
        return EXIT_FAILURE;
index 3d6588a..1c41fd3 100644 (file)
@@ -121,7 +121,7 @@ static void *dccp_recv_parse_config(int argc, char **argv)
 
 static void dccp_recv_pre_select(struct sched *s, struct task *t)
 {
-       struct receiver_node *rn = container_of(t, struct receiver_node, task);
+       struct receiver_node *rn = task_context(t);
 
        if (generic_recv_pre_select(s, t) <= 0)
                return;
@@ -130,7 +130,7 @@ static void dccp_recv_pre_select(struct sched *s, struct task *t)
 
 static int dccp_recv_post_select(struct sched *s, struct task *t)
 {
-       struct receiver_node *rn = container_of(t, struct receiver_node, task);
+       struct receiver_node *rn = task_context(t);
        struct btr_node *btrn = rn->btrn;
        struct iovec iov[2];
        int ret, iovcnt;
index 1f02e48..03fca4e 100644 (file)
@@ -62,7 +62,7 @@ static char *make_request_msg(void)
 
 static void http_recv_pre_select(struct sched *s, struct task *t)
 {
-       struct receiver_node *rn = container_of(t, struct receiver_node, task);
+       struct receiver_node *rn = task_context(t);
        struct private_http_recv_data *phd = rn->private_data;
 
        if (generic_recv_pre_select(s, t) <= 0)
@@ -80,7 +80,7 @@ static void http_recv_pre_select(struct sched *s, struct task *t)
  */
 static int http_recv_post_select(struct sched *s, struct task *t)
 {
-       struct receiver_node *rn = container_of(t, struct receiver_node, task);
+       struct receiver_node *rn = task_context(t);
        struct private_http_recv_data *phd = rn->private_data;
        struct btr_node *btrn = rn->btrn;
        int ret, iovcnt;
diff --git a/play.c b/play.c
index 5736a2d..2dcc37e 100644 (file)
--- a/play.c
+++ b/play.c
@@ -249,7 +249,7 @@ static int get_playback_error(struct play_task *pt)
                return 0;
        if (pt->fn.task.error >= 0)
                return 0;
-       if (pt->rn.task.error >= 0)
+       if (pt->rn.task->error >= 0)
                return 0;
        if (err == -E_BTR_EOF || err == -E_RECV_EOF || err == -E_EOF
                        || err == -E_WRITE_COMMON_EOF)
@@ -277,6 +277,7 @@ static int eof_cleanup(struct play_task *pt)
        free(pt->fn.conf);
        memset(&pt->fn, 0, sizeof(struct filter_node));
 
+       task_reap(&pt->rn.task);
        btr_remove_node(&pt->rn.btrn);
        /*
         * On eof (ret > 0), we do not wipe the receiver node struct until a
@@ -351,9 +352,6 @@ static int open_new_file(struct play_task *pt)
                free(tmp);
                tmp = NULL;
        }
-       pt->rn.task.pre_select = afh_recv->pre_select;
-       pt->rn.task.post_select = afh_recv->post_select;
-       sprintf(pt->rn.task.status, "%s receiver node", afh_recv->name);
        return 1;
 fail:
        wipe_receiver_node(pt);
@@ -405,7 +403,13 @@ static int load_file(struct play_task *pt)
        pt->wn.task.error = 0;
 
        /* success, register tasks */
-       register_task(&sched, &pt->rn.task);
+       pt->rn.task = task_register(
+               &(struct task_info) {
+                       .name = afh_recv->name,
+                       .pre_select = afh_recv->pre_select,
+                       .post_select = afh_recv->post_select,
+                       .context = &pt->rn
+               }, &sched);
        register_task(&sched, &pt->fn.task);
        register_writer_node(&pt->wn, pt->fn.btrn, &sched);
        return 1;
@@ -1250,6 +1254,7 @@ int main(int argc, char *argv[])
        sprintf(pt->task.status, "play task");
        register_task(&sched, &pt->task);
        ret = schedule(&sched);
+       sched_shutdown(&sched);
        if (ret < 0)
                PARA_ERROR_LOG("%s\n", para_strerror(-ret));
        return ret < 0? EXIT_FAILURE : EXIT_SUCCESS;
diff --git a/recv.c b/recv.c
index 4d8916f..0d7bb2c 100644 (file)
--- a/recv.c
+++ b/recv.c
@@ -65,6 +65,7 @@ int main(int argc, char *argv[])
        struct receiver_node rn;
        struct stdout_task sot;
        static struct sched s;
+       struct task_info ti;
 
        recv_cmdline_parser(argc, argv, &conf);
        loglevel = get_loglevel_by_name(conf.loglevel_arg);
@@ -93,10 +94,11 @@ int main(int argc, char *argv[])
                EMBRACE(.parent = rn.btrn, .name = "stdout"));
        stdout_task_register(&sot, &s);
 
-       rn.task.pre_select = r->pre_select;
-       rn.task.post_select = r->post_select;
-       sprintf(rn.task.status, "%s", r->name);
-       register_task(&s, &rn.task);
+       ti.name = r->name;
+       ti.pre_select = r->pre_select;
+       ti.post_select = r->post_select;
+       ti.context = &rn;
+       rn.task = task_register(&ti, &s);
 
        s.default_timeout.tv_sec = 1;
        s.default_timeout.tv_usec = 0;
diff --git a/recv.h b/recv.h
index a590aab..2b5e36d 100644 (file)
--- a/recv.h
+++ b/recv.h
@@ -17,7 +17,7 @@ struct receiver_node {
        /** Pointer to the configuration data for this instance. */
        void *conf;
        /** The task associated with this instance. */
-       struct task task;
+       struct task *task;
        /** The receiver node is always the root of the buffer tree. */
        struct btr_node *btrn;
        /** Each receiver node maintains a buffer pool for the received data. */
index eb5fe57..921d57a 100644 (file)
@@ -127,7 +127,7 @@ void print_receiver_helps(unsigned flags)
  */
 int generic_recv_pre_select(struct sched *s, struct task *t)
 {
-       struct receiver_node *rn = container_of(t, struct receiver_node, task);
+       struct receiver_node *rn = task_context(t);
        int ret = btr_node_status(rn->btrn, 0, BTR_NT_ROOT);
 
        if (ret < 0)
diff --git a/sched.c b/sched.c
index 101cc4c..6b8e091 100644 (file)
--- a/sched.c
+++ b/sched.c
@@ -75,8 +75,11 @@ static unsigned sched_post_select(struct sched *s)
        unsigned num_running_tasks = 0;
 
        list_for_each_entry_safe(t, tmp, &s->task_list, node) {
-               if (t->error < 0)
+               if (t->error < 0) {
+                       if (t->dead) /* task has been reaped */
+                               unlink_and_free_task(t);
                        continue;
+               }
                call_post_select(s, t);
                t->notification = 0;
                if (t->error < 0) {
@@ -138,6 +141,53 @@ again:
        goto again;
 }
 
+/**
+ * Obtain the error status of a task and deallocate its resources.
+ *
+ * \param tptr Identifies the task to reap.
+ *
+ * This function is similar to wait(2) in that it returns information about a
+ * terminated task and allows to release the resources associated with the
+ * task. Until this function is called, the terminated task remains in a zombie
+ * state.
+ *
+ * \return If \a tptr is \p NULL, or \a *tptr is \p NULL, the function does
+ * nothing and returns zero. Otherwise, it is checked whether the task
+ * identified by \a tptr is still running. If it is, the function returns zero
+ * and again, no action is taken. Otherwise the (negative) error code of the
+ * terminated task is returned and \a *tptr is set to \p NULL. The task will
+ * then be removed removed from the scheduler task list.
+ *
+ * \sa \ref sched_shutdown(), wait(2).
+ */
+int task_reap(struct task **tptr)
+{
+       struct task *t;
+
+       if (!tptr)
+               return 0;
+       t = *tptr;
+       if (!t)
+               return 0;
+       if (!t->owned_by_sched)
+               return 0;
+       if (t->error >= 0)
+               return 0;
+       if (t->dead) /* will be freed in sched_post_select() */
+               return 0;
+       /*
+        * With list_for_each_entry_safe() it is only safe to remove the
+        * _current_ list item. Since we are being called from the loop in
+        * schedule() via some task's ->post_select() function, freeing the
+        * given task here would result in use-after-free bugs in schedule().
+        * So we only set t->dead which tells schedule() to free the task in
+        * the next iteration of its loop.
+        */
+       t->dead = true;
+       *tptr = NULL;
+       return t->error;
+}
+
 /**
  * Deallocate all resources of all tasks of a scheduler instance.
  *
@@ -199,6 +249,7 @@ struct task *task_register(struct task_info *info, struct sched *s)
        t->status[sizeof(t->status) - 1] = '\0';
        t->notification = 0;
        t->error = 0;
+       t->dead = false;
        t->pre_select = info->pre_select;
        t->post_select = info->post_select;
        t->context = info->context;
@@ -239,7 +290,7 @@ char *get_task_list(struct sched *s)
        list_for_each_entry_safe(t, tmp, &s->task_list, node) {
                char *tmp_msg;
                tmp_msg = make_message("%s%p\t%s\t%s\n", msg? msg : "", t,
-                       t->error < 0? "zombie" : "running",
+                       t->error < 0? (t->dead? "dead" : "zombie") : "running",
                        t->status);
                free(msg);
                msg = tmp_msg;
diff --git a/sched.h b/sched.h
index 4052484..fd714b6 100644 (file)
--- a/sched.h
+++ b/sched.h
@@ -55,6 +55,8 @@ struct task {
        int notification;
        /** Whether the task structure should be freed in sched_shutdown(). */
        bool owned_by_sched;
+       /** True if task is in error state and exit status has been queried. */
+       bool dead;
        /** Usually a pointer to the struct containing this task. */
        void *context;
 };
@@ -104,6 +106,7 @@ char *get_task_list(struct sched *s);
 void task_notify(struct task *t, int err);
 void task_notify_all(struct sched *s, int err);
 int task_get_notification(const struct task *t);
+int task_reap(struct task **tptr);
 void sched_min_delay(struct sched *s);
 void sched_request_timeout(struct timeval *to, struct sched *s);
 void sched_request_timeout_ms(long unsigned ms, struct sched *s);
index 15cf73e..4d4b67f 100644 (file)
@@ -29,7 +29,7 @@
 
 static void udp_recv_pre_select(struct sched *s, struct task *t)
 {
-       struct receiver_node *rn = container_of(t, struct receiver_node, task);
+       struct receiver_node *rn = task_context(t);
 
        if (generic_recv_pre_select(s, t) <= 0)
                return;
@@ -56,7 +56,7 @@ static int udp_check_eof(size_t sz, struct iovec iov[2])
 
 static int udp_recv_post_select(__a_unused struct sched *s, struct task *t)
 {
-       struct receiver_node *rn = container_of(t, struct receiver_node, task);
+       struct receiver_node *rn = task_context(t);
        struct btr_node *btrn = rn->btrn;
        size_t num_bytes;
        struct iovec iov[2];