]> git.tuebingen.mpg.de Git - paraslash.git/commitdiff
task_register() conversion: receivers
authorAndre Noll <maan@systemlinux.org>
Sat, 25 Jan 2014 18:41:45 +0000 (19:41 +0100)
committerAndre Noll <maan@systemlinux.org>
Sun, 25 May 2014 13:36:37 +0000 (15:36 +0200)
This adds a new public function, task_reap(), to sched.c. It is
called by para_audiod and para_play to free the memory occupied by
the receiver node after EOF. sched_shutdown() can not be used for
this purpose since the scheduler stays active during the life time of
these programs (i.e. schedule() never returns) while receiver nodes
come and go.

The new task_reap() has to face the problem that it is called
from another task's ->post_select() method, so removing the task
being reaped from the scheduler task list is not possible in
task_reap(). Hence this patch adds the new flag "dead" to struct
task. It is initially unset and is turned on in task_reap() to indicate
that (a) the task has exited (i.e. ->post_select() returned negative)
and (b) task_reap() has been called to fetch the exit status. Only
if this flag is set, the scheduler removes the task from the task list.

afh_recv.c
audiod.c
dccp_recv.c
http_recv.c
play.c
recv.c
recv.h
recv_common.c
sched.c
sched.h
udp_recv.c

index 941ec4e94416e1e130555de9c7956567044ae1f0..e320fdee65627665d1247da98ddc68ff0786b364 100644 (file)
@@ -152,7 +152,7 @@ static void afh_recv_close(struct receiver_node *rn)
 
 static void afh_recv_pre_select(struct sched *s, struct task *t)
 {
-       struct receiver_node *rn = container_of(t, struct receiver_node, task);
+       struct receiver_node *rn = task_context(t);
        struct private_afh_recv_data *pard = rn->private_data;
        struct afh_info *afhi = &pard->afhi;
        struct afh_recv_args_info *conf = rn->conf;
@@ -172,7 +172,7 @@ static void afh_recv_pre_select(struct sched *s, struct task *t)
 
 static int afh_recv_post_select(__a_unused struct sched *s, struct task *t)
 {
-       struct receiver_node *rn = container_of(t, struct receiver_node, task);
+       struct receiver_node *rn = task_context(t);
        struct afh_recv_args_info *conf = rn->conf;
        struct private_afh_recv_data *pard = rn->private_data;
        struct btr_node *btrn = rn->btrn;
index b7b6d2d76af88ecbbb2f4a2f892f0848a4d5ad37..4b88ec5bba61c53be43e901edbae2bc8c7c84a5e 100644 (file)
--- a/audiod.c
+++ b/audiod.c
@@ -391,6 +391,7 @@ static void close_receiver(int slot_num)
                audio_formats[s->format], slot_num);
        a->receiver->close(s->receiver_node);
        btr_remove_node(&s->receiver_node->btrn);
+       task_reap(&s->receiver_node->task);
        free(s->receiver_node);
        s->receiver_node = NULL;
        tv_add(now, &(struct timeval)EMBRACE(0, 200 * 1000),
@@ -459,7 +460,7 @@ static void notify_receivers(int error)
                        continue;
                if (!s->receiver_node)
                        continue;
-               task_notify(&s->receiver_node->task, error);
+               task_notify(s->receiver_node->task, error);
        }
 }
 
@@ -566,10 +567,12 @@ static int open_receiver(int format)
        s->receiver_node = rn;
        PARA_NOTICE_LOG("started %s: %s receiver in slot %d\n",
                audio_formats[format], r->name, slot_num);
-       rn->task.pre_select = r->pre_select;
-       rn->task.post_select = r->post_select;
-       sprintf(rn->task.status, "%s receiver node", r->name);
-       register_task(&sched, &rn->task);
+       rn->task = task_register(&(struct task_info) {
+               .name = r->name,
+               .pre_select = r->pre_select,
+               .post_select = r->post_select,
+               .context = rn,
+       }, &sched);
        return slot_num;
 }
 
@@ -584,7 +587,7 @@ static bool receiver_running(void)
 
                if (!s->receiver_node)
                        continue;
-               if (s->receiver_node->task.error >= 0)
+               if (s->receiver_node->task->error >= 0)
                        return true;
                if (ss1 == ss2)
                        return true;
@@ -611,7 +614,7 @@ struct btr_node *audiod_get_btr_root(void)
                struct timeval rstime;
                if (!s->receiver_node)
                        continue;
-               if (s->receiver_node->task.error < 0)
+               if (s->receiver_node->task->error < 0)
                        continue;
                btr_get_node_start(s->receiver_node->btrn, &rstime);
                if (newest_slot >= 0 && tv_diff(&rstime, &newest_rstime, NULL) < 0)
@@ -1103,7 +1106,7 @@ static bool must_close_slot(int slot_num)
 
        if (s->format < 0)
                return false;
-       if (s->receiver_node && s->receiver_node->task.error >= 0)
+       if (s->receiver_node && s->receiver_node->task->error >= 0)
                return false;
        for (i = 0; i < a->num_filters; i++)
                if (s->fns && s->fns[i].task.error >= 0)
@@ -1427,6 +1430,7 @@ int main(int argc, char *argv[])
        sched.default_timeout.tv_sec = 2;
        sched.default_timeout.tv_usec = 999 * 1000;
        ret = schedule(&sched);
+       sched_shutdown(&sched);
 
        PARA_EMERG_LOG("%s\n", para_strerror(-ret));
        return EXIT_FAILURE;
index 3d6588ac4b338ad26c99c76276d0dc05ede9d115..1c41fd3c0d0d9d09b745bbd48fdabc181c74fec4 100644 (file)
@@ -121,7 +121,7 @@ static void *dccp_recv_parse_config(int argc, char **argv)
 
 static void dccp_recv_pre_select(struct sched *s, struct task *t)
 {
-       struct receiver_node *rn = container_of(t, struct receiver_node, task);
+       struct receiver_node *rn = task_context(t);
 
        if (generic_recv_pre_select(s, t) <= 0)
                return;
@@ -130,7 +130,7 @@ static void dccp_recv_pre_select(struct sched *s, struct task *t)
 
 static int dccp_recv_post_select(struct sched *s, struct task *t)
 {
-       struct receiver_node *rn = container_of(t, struct receiver_node, task);
+       struct receiver_node *rn = task_context(t);
        struct btr_node *btrn = rn->btrn;
        struct iovec iov[2];
        int ret, iovcnt;
index 1f02e48d9ae5ad2e32ae3273110cb540912e67a0..03fca4e11bf519a0202e191b7df704c617fd570b 100644 (file)
@@ -62,7 +62,7 @@ static char *make_request_msg(void)
 
 static void http_recv_pre_select(struct sched *s, struct task *t)
 {
-       struct receiver_node *rn = container_of(t, struct receiver_node, task);
+       struct receiver_node *rn = task_context(t);
        struct private_http_recv_data *phd = rn->private_data;
 
        if (generic_recv_pre_select(s, t) <= 0)
@@ -80,7 +80,7 @@ static void http_recv_pre_select(struct sched *s, struct task *t)
  */
 static int http_recv_post_select(struct sched *s, struct task *t)
 {
-       struct receiver_node *rn = container_of(t, struct receiver_node, task);
+       struct receiver_node *rn = task_context(t);
        struct private_http_recv_data *phd = rn->private_data;
        struct btr_node *btrn = rn->btrn;
        int ret, iovcnt;
diff --git a/play.c b/play.c
index 5736a2ddbdfc04199f6d2bf6544f9ce2d35a56f8..2dcc37ea2bf0954ccea2776c8dd410b85c9804b2 100644 (file)
--- a/play.c
+++ b/play.c
@@ -249,7 +249,7 @@ static int get_playback_error(struct play_task *pt)
                return 0;
        if (pt->fn.task.error >= 0)
                return 0;
-       if (pt->rn.task.error >= 0)
+       if (pt->rn.task->error >= 0)
                return 0;
        if (err == -E_BTR_EOF || err == -E_RECV_EOF || err == -E_EOF
                        || err == -E_WRITE_COMMON_EOF)
@@ -277,6 +277,7 @@ static int eof_cleanup(struct play_task *pt)
        free(pt->fn.conf);
        memset(&pt->fn, 0, sizeof(struct filter_node));
 
+       task_reap(&pt->rn.task);
        btr_remove_node(&pt->rn.btrn);
        /*
         * On eof (ret > 0), we do not wipe the receiver node struct until a
@@ -351,9 +352,6 @@ static int open_new_file(struct play_task *pt)
                free(tmp);
                tmp = NULL;
        }
-       pt->rn.task.pre_select = afh_recv->pre_select;
-       pt->rn.task.post_select = afh_recv->post_select;
-       sprintf(pt->rn.task.status, "%s receiver node", afh_recv->name);
        return 1;
 fail:
        wipe_receiver_node(pt);
@@ -405,7 +403,13 @@ static int load_file(struct play_task *pt)
        pt->wn.task.error = 0;
 
        /* success, register tasks */
-       register_task(&sched, &pt->rn.task);
+       pt->rn.task = task_register(
+               &(struct task_info) {
+                       .name = afh_recv->name,
+                       .pre_select = afh_recv->pre_select,
+                       .post_select = afh_recv->post_select,
+                       .context = &pt->rn
+               }, &sched);
        register_task(&sched, &pt->fn.task);
        register_writer_node(&pt->wn, pt->fn.btrn, &sched);
        return 1;
@@ -1250,6 +1254,7 @@ int main(int argc, char *argv[])
        sprintf(pt->task.status, "play task");
        register_task(&sched, &pt->task);
        ret = schedule(&sched);
+       sched_shutdown(&sched);
        if (ret < 0)
                PARA_ERROR_LOG("%s\n", para_strerror(-ret));
        return ret < 0? EXIT_FAILURE : EXIT_SUCCESS;
diff --git a/recv.c b/recv.c
index 4d8916f9bc4f0061732ec0394690d40470d9d177..0d7bb2ce3ce0c8133cd120ebb36f23e0d3e88b71 100644 (file)
--- a/recv.c
+++ b/recv.c
@@ -65,6 +65,7 @@ int main(int argc, char *argv[])
        struct receiver_node rn;
        struct stdout_task sot;
        static struct sched s;
+       struct task_info ti;
 
        recv_cmdline_parser(argc, argv, &conf);
        loglevel = get_loglevel_by_name(conf.loglevel_arg);
@@ -93,10 +94,11 @@ int main(int argc, char *argv[])
                EMBRACE(.parent = rn.btrn, .name = "stdout"));
        stdout_task_register(&sot, &s);
 
-       rn.task.pre_select = r->pre_select;
-       rn.task.post_select = r->post_select;
-       sprintf(rn.task.status, "%s", r->name);
-       register_task(&s, &rn.task);
+       ti.name = r->name;
+       ti.pre_select = r->pre_select;
+       ti.post_select = r->post_select;
+       ti.context = &rn;
+       rn.task = task_register(&ti, &s);
 
        s.default_timeout.tv_sec = 1;
        s.default_timeout.tv_usec = 0;
diff --git a/recv.h b/recv.h
index a590aabdb02b3a79902f4ea3d02e8379bd81133f..2b5e36d7c260c9fddbbd091c269886b3ad6dbfd3 100644 (file)
--- a/recv.h
+++ b/recv.h
@@ -17,7 +17,7 @@ struct receiver_node {
        /** Pointer to the configuration data for this instance. */
        void *conf;
        /** The task associated with this instance. */
-       struct task task;
+       struct task *task;
        /** The receiver node is always the root of the buffer tree. */
        struct btr_node *btrn;
        /** Each receiver node maintains a buffer pool for the received data. */
index eb5fe57ee5da961c208c38a2933e9f09422320d2..921d57ae7f48838a4cb6412658e60fa072ab2a99 100644 (file)
@@ -127,7 +127,7 @@ void print_receiver_helps(unsigned flags)
  */
 int generic_recv_pre_select(struct sched *s, struct task *t)
 {
-       struct receiver_node *rn = container_of(t, struct receiver_node, task);
+       struct receiver_node *rn = task_context(t);
        int ret = btr_node_status(rn->btrn, 0, BTR_NT_ROOT);
 
        if (ret < 0)
diff --git a/sched.c b/sched.c
index 101cc4c0dad3706e841881b2a6d2989825fc58b0..6b8e09108308336676e48c4d8c9e84eb6e388431 100644 (file)
--- a/sched.c
+++ b/sched.c
@@ -75,8 +75,11 @@ static unsigned sched_post_select(struct sched *s)
        unsigned num_running_tasks = 0;
 
        list_for_each_entry_safe(t, tmp, &s->task_list, node) {
-               if (t->error < 0)
+               if (t->error < 0) {
+                       if (t->dead) /* task has been reaped */
+                               unlink_and_free_task(t);
                        continue;
+               }
                call_post_select(s, t);
                t->notification = 0;
                if (t->error < 0) {
@@ -138,6 +141,53 @@ again:
        goto again;
 }
 
+/**
+ * Obtain the error status of a task and deallocate its resources.
+ *
+ * \param tptr Identifies the task to reap.
+ *
+ * This function is similar to wait(2) in that it returns information about a
+ * terminated task and allows to release the resources associated with the
+ * task. Until this function is called, the terminated task remains in a zombie
+ * state.
+ *
+ * \return If \a tptr is \p NULL, or \a *tptr is \p NULL, the function does
+ * nothing and returns zero. Otherwise, it is checked whether the task
+ * identified by \a tptr is still running. If it is, the function returns zero
+ * and again, no action is taken. Otherwise the (negative) error code of the
+ * terminated task is returned and \a *tptr is set to \p NULL. The task will
+ * then be removed removed from the scheduler task list.
+ *
+ * \sa \ref sched_shutdown(), wait(2).
+ */
+int task_reap(struct task **tptr)
+{
+       struct task *t;
+
+       if (!tptr)
+               return 0;
+       t = *tptr;
+       if (!t)
+               return 0;
+       if (!t->owned_by_sched)
+               return 0;
+       if (t->error >= 0)
+               return 0;
+       if (t->dead) /* will be freed in sched_post_select() */
+               return 0;
+       /*
+        * With list_for_each_entry_safe() it is only safe to remove the
+        * _current_ list item. Since we are being called from the loop in
+        * schedule() via some task's ->post_select() function, freeing the
+        * given task here would result in use-after-free bugs in schedule().
+        * So we only set t->dead which tells schedule() to free the task in
+        * the next iteration of its loop.
+        */
+       t->dead = true;
+       *tptr = NULL;
+       return t->error;
+}
+
 /**
  * Deallocate all resources of all tasks of a scheduler instance.
  *
@@ -199,6 +249,7 @@ struct task *task_register(struct task_info *info, struct sched *s)
        t->status[sizeof(t->status) - 1] = '\0';
        t->notification = 0;
        t->error = 0;
+       t->dead = false;
        t->pre_select = info->pre_select;
        t->post_select = info->post_select;
        t->context = info->context;
@@ -239,7 +290,7 @@ char *get_task_list(struct sched *s)
        list_for_each_entry_safe(t, tmp, &s->task_list, node) {
                char *tmp_msg;
                tmp_msg = make_message("%s%p\t%s\t%s\n", msg? msg : "", t,
-                       t->error < 0? "zombie" : "running",
+                       t->error < 0? (t->dead? "dead" : "zombie") : "running",
                        t->status);
                free(msg);
                msg = tmp_msg;
diff --git a/sched.h b/sched.h
index 40524845fdaa63b898684c2d7b1abeb650f5b9b1..fd714b618b681720b4ee187401f57d39fbd001f7 100644 (file)
--- a/sched.h
+++ b/sched.h
@@ -55,6 +55,8 @@ struct task {
        int notification;
        /** Whether the task structure should be freed in sched_shutdown(). */
        bool owned_by_sched;
+       /** True if task is in error state and exit status has been queried. */
+       bool dead;
        /** Usually a pointer to the struct containing this task. */
        void *context;
 };
@@ -104,6 +106,7 @@ char *get_task_list(struct sched *s);
 void task_notify(struct task *t, int err);
 void task_notify_all(struct sched *s, int err);
 int task_get_notification(const struct task *t);
+int task_reap(struct task **tptr);
 void sched_min_delay(struct sched *s);
 void sched_request_timeout(struct timeval *to, struct sched *s);
 void sched_request_timeout_ms(long unsigned ms, struct sched *s);
index 15cf73eb210dd0354b27abf795e60fdd1959377b..4d4b67f9b772d9d178ddbd92bc9d10fbf9f347ec 100644 (file)
@@ -29,7 +29,7 @@
 
 static void udp_recv_pre_select(struct sched *s, struct task *t)
 {
-       struct receiver_node *rn = container_of(t, struct receiver_node, task);
+       struct receiver_node *rn = task_context(t);
 
        if (generic_recv_pre_select(s, t) <= 0)
                return;
@@ -56,7 +56,7 @@ static int udp_check_eof(size_t sz, struct iovec iov[2])
 
 static int udp_recv_post_select(__a_unused struct sched *s, struct task *t)
 {
-       struct receiver_node *rn = container_of(t, struct receiver_node, task);
+       struct receiver_node *rn = task_context(t);
        struct btr_node *btrn = rn->btrn;
        size_t num_bytes;
        struct iovec iov[2];