]> git.tuebingen.mpg.de Git - paraslash.git/blobdiff - sched.c
paraslash 0.7.3
[paraslash.git] / sched.c
diff --git a/sched.c b/sched.c
index 8b68667bf453c481df6950ec727be3b134037f99..20822038b81aff04d8ee8c21f7f8d7773831ca37 100644 (file)
--- a/sched.c
+++ b/sched.c
@@ -1,13 +1,8 @@
-/*
- * Copyright (C) 2006-2014 Andre Noll <maan@systemlinux.org>
- *
- * Licensed under the GPL v2. For licencing details see COPYING.
- */
+/* Copyright (C) 2006 Andre Noll <maan@tuebingen.mpg.de>, see file COPYING. */
 
 /** \file sched.c Paraslash's scheduling functions. */
 
 #include <regex.h>
-#include <assert.h>
 
 #include "para.h"
 #include "ipc.h"
 #include "time.h"
 #include "error.h"
 
-static struct timeval now_struct;
-struct timeval *now = &now_struct;
-
-/*
- * Remove a task from the scheduler.
- *
- * \param t The task to remove.
- *
- * If the pre_select pointer of \a t is not \p NULL, it is removed from
- * the pre_select list of the scheduler. Same goes for \a post_select.
+/**
+ * The possible states of a task.
+ *
+ * In addition to the states listed here, a task may also enter zombie state.
+ * This happens when its ->post_monitor function returns negative, the ->status
+ * field is then set to this return value. Such tasks are not scheduled any
+ * more (i.e. ->pre_monitor() and ->post_monitor() are no longer called), but
+ * they stay on the scheduler task list until \ref task_reap() or
+ * \ref sched_shutdown() is called.
  */
-static void unregister_task(struct task *t)
-{
-       assert(t->error < 0);
-       PARA_INFO_LOG("unregistering %s (%s)\n", t->status,
-               para_strerror(-t->error));
-       if (t->pre_select)
-               list_del(&t->pre_select_node);
-       if (t->post_select)
-               list_del(&t->post_select_node);
-}
+enum task_status {
+       /** Task has been reaped and may be removed from the task list. */
+       TS_DEAD,
+       /** Task is active. */
+       TS_RUNNING,
+};
 
-static inline bool timeout_is_zero(struct sched *s)
-{
-       struct timeval *tv = &s->select_timeout;
-       return tv->tv_sec == 0 && tv->tv_usec == 0;
-}
+struct task {
+       /** A copy of the task name supplied when the task was registered. */
+       char *name;
+       /** Copied during task_register(). */
+       struct task_info info;
+       /* TS_RUNNING, TS_DEAD, or zombie (negative value). */
+       int status;
+       /** Position of the task in the task list of the scheduler. */
+       struct list_head node;
+       /** If less than zero, the task was notified by another task. */
+       int notification;
+};
 
-static void sched_preselect(struct sched *s)
+static struct timeval now_struct;
+const struct timeval *now = &now_struct;
+
+static void sched_pre_monitor(struct sched *s)
 {
        struct task *t, *tmp;
 
-       list_for_each_entry_safe(t, tmp, &s->pre_select_list, pre_select_node) {
+       list_for_each_entry_safe(t, tmp, &s->task_list, node) {
+               if (t->status < 0)
+                       continue;
                if (t->notification != 0)
                        sched_min_delay(s);
-               if (t->pre_select)
-                       t->pre_select(s, t);
+               if (t->info.pre_monitor)
+                       t->info.pre_monitor(s, t->info.context);
        }
 }
 
+static void unlink_and_free_task(struct task *t)
+{
+       list_del(&t->node);
+       free(t->name);
+       free(t);
+}
+
 //#define SCHED_DEBUG 1
-static inline void call_post_select(struct sched *s, struct task *t)
+static inline void call_post_monitor(struct sched *s, struct task *t)
 {
+       int ret;
+
 #ifndef SCHED_DEBUG
-       t->error = t->post_select(s, t);
+       ret = t->info.post_monitor(s, t->info.context);
 #else
        struct timeval t1, t2, diff;
        unsigned long pst;
 
        clock_get_realtime(&t1);
-       t->error = t->post_select(s, t);
+       ret = t->info.post_monitor(s, t->info.context);
        clock_get_realtime(&t2);
        tv_diff(&t1, &t2, &diff);
        pst = tv2ms(&diff);
        if (pst > 50)
-               PARA_WARNING_LOG("%s: post_select time: %lums\n",
-                       t->status, pst);
+               PARA_WARNING_LOG("%s: post_monitor time: %lums\n",
+                       t->name, pst);
 #endif
+       t->status = ret < 0? ret : TS_RUNNING;
 }
 
-static void sched_post_select(struct sched *s)
+static unsigned sched_post_monitor(struct sched *s)
 {
        struct task *t, *tmp;
+       unsigned num_running_tasks = 0;
 
-       list_for_each_entry_safe(t, tmp, &s->post_select_list, post_select_node) {
-               if (t->error >= 0)
-                       call_post_select(s, t);
-//             PARA_INFO_LOG("%s: %d\n", t->status, t->ret);
-               t->notification = 0;
-               if (t->error >= 0)
-                       continue;
-               unregister_task(t);
+       list_for_each_entry_safe(t, tmp, &s->task_list, node) {
+               if (t->status == TS_DEAD) /* task has been reaped */
+                       unlink_and_free_task(t);
+               else if (t->status == TS_RUNNING) {
+                       call_post_monitor(s, t); /* sets t->status */
+                       t->notification = 0;
+                       if (t->status == TS_RUNNING)
+                               num_running_tasks++;
+               }
        }
+       return num_running_tasks;
 }
 
 /**
- * The core function for all paraslash programs.
+ * The core function of all paraslash programs.
  *
  * \param s Pointer to the scheduler struct.
  *
- * This function updates the global \a now pointer, calls all registered
- * pre_select hooks which may set the timeout and add any file descriptors to
- * the fd sets of \a s.  Next, it calls para_select() and makes the result available
- * to the registered tasks by calling their post_select hook.
+ * This function updates the global now pointer, calls all registered
+ * pre_monitor hooks which may set the timeout and add any file descriptors to
+ * the pollfd array. Next, it calls the poll function and makes the result
+ * available to the registered tasks by calling their post_monitor hook.
  *
- * \return Zero if no more tasks are left in either of the two lists, negative
- * if para_select returned an error.
+ * \return Zero if no more tasks are left in the task list, negative if the
+ * poll function returned an error.
  *
- * \sa task, now.
+ * \sa \ref now.
  */
 int schedule(struct sched *s)
 {
        int ret;
+       unsigned num_running_tasks;
 
-       if (!s->select_function)
-               s->select_function = para_select;
+       if (!s->poll_function)
+               s->poll_function = xpoll;
 again:
-       FD_ZERO(&s->rfds);
-       FD_ZERO(&s->wfds);
-       s->select_timeout = s->default_timeout;
-       s->max_fileno = -1;
-       clock_get_realtime(now);
-       sched_preselect(s);
-       ret = s->select_function(s->max_fileno + 1, &s->rfds, &s->wfds,
-               &s->select_timeout);
+       s->num_pfds = 0;
+       if (s->pidx)
+               memset(s->pidx, 0xff, s->pidx_array_len * sizeof(unsigned));
+       s->timeout = s->default_timeout;
+       clock_get_realtime(&now_struct);
+       sched_pre_monitor(s);
+       ret = s->poll_function(s->pfd, s->num_pfds, s->timeout);
        if (ret < 0)
                return ret;
-       if (ret == 0) {
-               /*
-                * APUE: Be careful not to check the descriptor sets on return
-                * unless the return value is greater than zero. The return
-                * state of the descriptor sets is implementation dependent if
-                * either a signal is caught or the timer expires.
-                */
-               FD_ZERO(&s->rfds);
-               FD_ZERO(&s->wfds);
-       }
-       clock_get_realtime(now);
-       sched_post_select(s);
-       if (list_empty(&s->pre_select_list) && list_empty(&s->post_select_list))
+       clock_get_realtime(&now_struct);
+       num_running_tasks = sched_post_monitor(s);
+       if (num_running_tasks == 0)
                return 0;
        goto again;
 }
 
 /**
- * Add a task to the scheduler.
+ * Obtain the error status of a task and deallocate its resources.
  *
- * \param t The task to add.
- * \param s The scheduler instance to add the task to.
+ * \param tptr Identifies the task to reap.
  *
- * If the pre_select pointer of \a t is not \p NULL, it is added to
- * the pre_select list of the scheduler. Same goes for post_select.
+ * This function is similar to wait(2) in that it returns information about a
+ * terminated task which allows releasing the resources associated with the
+ * task. Until this function is called, the terminated task remains in a zombie
+ * state.
  *
- * \sa task::pre_select, task::post_select
+ * \return If \a tptr is \p NULL, or \a *tptr is \p NULL, the function does
+ * nothing and returns zero. Otherwise, it is checked whether the task
+ * identified by \a tptr is still running. If it is, the function returns zero
+ * and again, no action is taken. Otherwise the (negative) error code of the
+ * terminated task is returned and \a *tptr is set to \p NULL. The task will
+ * then be removed removed from the scheduler task list.
+ *
+ * \sa \ref sched_shutdown(), wait(2).
  */
-void register_task(struct sched *s, struct task *t)
+int task_reap(struct task **tptr)
 {
-       PARA_INFO_LOG("registering %s (%p)\n", t->status, t);
-       t->notification = 0;
-       if (!s->pre_select_list.next)
-               INIT_LIST_HEAD(&s->pre_select_list);
-       if (!s->post_select_list.next)
-               INIT_LIST_HEAD(&s->post_select_list);
-       if (t->pre_select) {
-               PARA_DEBUG_LOG("pre_select: %p\n", &t->pre_select);
-               list_add_tail(&t->pre_select_node, &s->pre_select_list);
-       }
-       if (t->post_select) {
-               PARA_DEBUG_LOG("post_select: %p\n", &t->post_select);
-               list_add_tail(&t->post_select_node, &s->post_select_list);
+       struct task *t;
+       int ret;
+
+       if (!tptr)
+               return 0;
+       t = *tptr;
+       if (!t)
+               return 0;
+       if (t->status >= 0)
+               return 0;
+       ret = t->status;
+       PARA_INFO_LOG("reaping %s: %s\n", t->name, para_strerror(-ret));
+       /*
+        * With list_for_each_entry_safe() it is only safe to remove the
+        * _current_ list item. Since we are being called from the loop in
+        * schedule() via some task's ->post_monitor() function, freeing the
+        * given task here would result in use-after-free bugs in schedule().
+        * So we only set the task status to TS_DEAD which tells schedule() to
+        * free the task in the next iteration of its loop.
+        */
+       t->status = TS_DEAD;
+
+       *tptr = NULL;
+       return ret;
+}
+
+/**
+ * Deallocate all resources of all tasks of a scheduler instance.
+ *
+ * \param s The scheduler instance.
+ *
+ * This should only be called after \ref schedule() has returned.
+ */
+void sched_shutdown(struct sched *s)
+{
+       struct task *t, *tmp;
+
+       list_for_each_entry_safe(t, tmp, &s->task_list, node) {
+               if (t->status == TS_RUNNING)
+                       /* The task list should contain only terminated tasks. */
+                       PARA_WARNING_LOG("shutting down running task %s\n",
+                               t->name);
+               unlink_and_free_task(t);
        }
+       free(s->pfd);
+       free(s->pidx);
+}
+
+/**
+ * Add a task to the scheduler task list.
+ *
+ * \param info Task information supplied by the caller.
+ * \param s The scheduler instance.
+ *
+ * \return A pointer to a newly allocated task structure. It will be
+ * freed by sched_shutdown().
+ */
+struct task *task_register(struct task_info *info, struct sched *s)
+{
+       struct task *t = alloc(sizeof(*t));
+
+       assert(info->post_monitor);
+
+       if (!s->task_list.next)
+               init_list_head(&s->task_list);
+
+       t->info = *info;
+       t->name = para_strdup(info->name);
+       t->notification = 0;
+       t->status = TS_RUNNING;
+       list_add_tail(&t->node, &s->task_list);
+       return t;
 }
 
 /**
@@ -186,21 +257,15 @@ char *get_task_list(struct sched *s)
        struct task *t, *tmp;
        char *msg = NULL;
 
-       list_for_each_entry_safe(t, tmp, &s->pre_select_list, pre_select_node) {
-               char *tmp_msg;
-               tmp_msg = make_message("%s%p\tpre\t%s\n", msg? msg : "", t, t->status);
-               free(msg);
-               msg = tmp_msg;
-       }
-       list_for_each_entry_safe(t, tmp, &s->post_select_list, post_select_node) {
+       list_for_each_entry_safe(t, tmp, &s->task_list, node) {
                char *tmp_msg;
-//             if (t->pre_select)
-//                     continue;
-               tmp_msg = make_message("%s%p\tpost\t%s\n", msg? msg : "", t, t->status);
+               tmp_msg = make_message("%s%p\t%s\t%s\n", msg? msg : "", t,
+                       t->status == TS_DEAD? "dead" :
+                               (t->status == TS_RUNNING? "running" : "zombie"),
+                       t->name);
                free(msg);
                msg = tmp_msg;
        }
-       //PARA_DEBUG_LOG("task list:\n%s", msg);
        return msg;
 }
 
@@ -211,14 +276,14 @@ char *get_task_list(struct sched *s)
  * \param err A positive error code.
  *
  * Tasks which honor notifications are supposed to call \ref
- * task_get_notification() in their post_select function and act on the
+ * task_get_notification() in their post_monitor function and act on the
  * returned notification value.
  *
- * If the scheduler detects during its pre_select loop that at least one task
- * has been notified, the loop terminates, and the post_select methods of all
+ * If the scheduler detects during its pre_monitor loop that at least one task
+ * has been notified, the loop terminates, and the post_monitor methods of all
  * taks are immediately called again.
  *
- * The notification for a task is reset after the call to its post_select
+ * The notification for a task is reset after the call to its post_monitor
  * method.
  *
  * \sa \ref task_get_notification().
@@ -228,7 +293,7 @@ void task_notify(struct task *t, int err)
        assert(err > 0);
        if (t->notification == -err) /* ignore subsequent notifications */
                return;
-       PARA_INFO_LOG("notifying task %s: %s\n", t->status, para_strerror(err));
+       PARA_INFO_LOG("notifying task %s: %s\n", t->name, para_strerror(err));
        t->notification = -err;
 }
 
@@ -239,15 +304,34 @@ void task_notify(struct task *t, int err)
  *
  * \return The notification value. If this is negative, the task has been
  * notified by another task. Tasks are supposed to check for notifications by
- * calling this function from their post_select method.
+ * calling this function from their post_monitor method.
  *
  * \sa \ref task_notify().
  */
-int task_get_notification(struct task *t)
+int task_get_notification(const struct task *t)
 {
        return t->notification;
 }
 
+/**
+ * Return the status value of a task.
+ *
+ * \param t The task to get the status value from.
+ *
+ * \return Zero if task does not exist, one if task is running, negative error
+ * code if task has terminated.
+ */
+int task_status(const struct task *t)
+{
+       if (!t)
+               return 0;
+       if (t->status == TS_DEAD) /* pretend dead tasks don't exist */
+               return 0;
+       if (t->status == TS_RUNNING)
+               return 1;
+       return t->status;
+}
+
 /**
  * Set the notification value of all tasks of a scheduler instance.
  *
@@ -261,50 +345,48 @@ void task_notify_all(struct sched *s, int err)
 {
        struct task *t;
 
-       list_for_each_entry(t, &s->pre_select_list, pre_select_node)
-               task_notify(t, err);
-       list_for_each_entry(t, &s->post_select_list, post_select_node)
+       list_for_each_entry(t, &s->task_list, node)
                task_notify(t, err);
 }
 
 /**
- * Set the select timeout to the minimal possible value.
+ * Set the I/O timeout to the minimal possible value.
  *
  * \param s Pointer to the scheduler struct.
  *
- * This causes the next select() call to return immediately.
+ * This causes the next poll() call to return immediately.
  */
 void sched_min_delay(struct sched *s)
 {
-       s->select_timeout.tv_sec = s->select_timeout.tv_usec = 0;
+       s->timeout = 0;
 }
 
 /**
- * Impose an upper bound for the timeout of the next select() call.
+ * Impose an upper bound for the I/O timeout.
  *
  * \param to Maximal allowed timeout.
  * \param s Pointer to the scheduler struct.
  *
- * If the current scheduler timeout is already smaller than \a to, this
- * function does nothing. Otherwise the timeout for the next select() call is
- * set to the given value.
+ * If the current I/O timeout is already smaller than to, this function does
+ * nothing. Otherwise the timeout is set to the given value.
  *
- * \sa sched_request_timeout_ms().
+ * \sa \ref sched_request_timeout_ms().
  */
 void sched_request_timeout(struct timeval *to, struct sched *s)
 {
-       if (tv_diff(&s->select_timeout, to, NULL) > 0)
-               s->select_timeout = *to;
+       long unsigned ms = tv2ms(to);
+       if (s->timeout > ms)
+               s->timeout = ms;
 }
 
 /**
- * Force the next select() call to return before the given amount of milliseconds.
+ * Bound the I/O timeout to at most the given amount of milliseconds.
  *
  * \param ms The maximal allowed timeout in milliseconds.
  * \param s Pointer to the scheduler struct.
  *
- * Like sched_request_timeout() this imposes an upper bound on the timeout
- * value for the next select() call.
+ * Like \ref sched_request_timeout() this imposes an upper bound on the I/O
+ * timeout.
  */
 void sched_request_timeout_ms(long unsigned ms, struct sched *s)
 {
@@ -314,15 +396,15 @@ void sched_request_timeout_ms(long unsigned ms, struct sched *s)
 }
 
 /**
- * Force the next select() call to return before the given future time.
+ * Bound the I/O timeout by an absolute time in the future.
  *
- * \param barrier Absolute time before select() should return.
+ * \param barrier Defines the upper bound for the timeout.
  * \param s Pointer to the scheduler struct.
  *
- * \return If \a barrier is in the past, this function does nothing and returns
- * zero. Otherwise it returns one.
+ * \return If the barrier is in the past, this function does nothing and
+ * returns zero. Otherwise it returns one.
  *
- * \sa sched_request_barrier_or_min_delay().
+ * \sa \ref sched_request_barrier_or_min_delay().
  */
 int sched_request_barrier(struct timeval *barrier, struct sched *s)
 {
@@ -335,15 +417,15 @@ int sched_request_barrier(struct timeval *barrier, struct sched *s)
 }
 
 /**
- * Force the next select() call to return before the given time.
+ * Bound the I/O timeout or request a minimal delay.
  *
- * \param barrier Absolute time before select() should return.
+ * \param barrier Absolute time as in \ref sched_request_barrier().
  * \param s Pointer to the scheduler struct.
  *
- * \return If \a barrier is in the past, this function requests a minimal
+ * \return If the barrier is in the past, this function requests a minimal
  * timeout and returns zero. Otherwise it returns one.
  *
- * \sa sched_min_delay(), sched_request_barrier().
+ * \sa \ref sched_min_delay(), \ref sched_request_barrier().
  */
 int sched_request_barrier_or_min_delay(struct timeval *barrier, struct sched *s)
 {
@@ -356,3 +438,126 @@ int sched_request_barrier_or_min_delay(struct timeval *barrier, struct sched *s)
        sched_request_timeout(&diff, s);
        return 1;
 }
+
+static void add_pollfd(int fd, struct sched *s, short events)
+{
+       assert(fd >= 0);
+#if 0
+       {
+               int flags = fcntl(fd, F_GETFL);
+               if (!(flags & O_NONBLOCK)) {
+                       PARA_EMERG_LOG("fd %d is a blocking file descriptor\n", fd);
+                       exit(EXIT_FAILURE);
+               }
+       }
+#endif
+       if (s->pidx_array_len > fd) { /* is fd already registered? */
+               if (s->pidx[fd] < s->pfd_array_len) { /* yes, it is */
+                       assert(s->pfd[s->pidx[fd]].fd == fd);
+                       s->pfd[s->pidx[fd]].events |= events;
+                       return;
+               }
+       } else { /* need to extend the index array */
+               unsigned old_len = s->pidx_array_len;
+               while (s->pidx_array_len <= fd)
+                       s->pidx_array_len = s->pidx_array_len * 2 + 1;
+               PARA_INFO_LOG("pidx array len: %u\n", s->pidx_array_len);
+               s->pidx = para_realloc(s->pidx,
+                       s->pidx_array_len * sizeof(unsigned));
+               memset(s->pidx + old_len, 0xff,
+                       (s->pidx_array_len - old_len) * sizeof(unsigned));
+       }
+       /*
+        * The given fd is not part of the pfd array yet. Initialize pidx[fd]
+        * to point at the next unused slot of this array and initialize the
+        * slot.
+        */
+       s->pidx[fd] = s->num_pfds;
+       if (s->pfd_array_len <= s->num_pfds) {
+               unsigned old_len = s->pfd_array_len;
+               s->pfd_array_len = old_len * 2 + 1;
+               PARA_INFO_LOG("pfd array len: %u\n", s->pfd_array_len);
+               s->pfd = para_realloc(s->pfd,
+                       s->pfd_array_len * sizeof(struct pollfd));
+               memset(s->pfd + old_len, 0,
+                       (s->pfd_array_len - old_len) * sizeof(struct pollfd));
+       }
+       s->pfd[s->num_pfds].fd = fd;
+       s->pfd[s->num_pfds].events = events;
+       s->pfd[s->num_pfds].revents = 0;
+       s->num_pfds++;
+}
+
+/**
+ * Instruct the scheduler to monitor an fd for readiness for reading.
+ *
+ * \param fd The file descriptor.
+ * \param s The scheduler.
+ *
+ * \sa \ref sched_monitor_writefd().
+ */
+void sched_monitor_readfd(int fd, struct sched *s)
+{
+       add_pollfd(fd, s, POLLIN);
+}
+
+/**
+ * Instruct the scheduler to monitor an fd for readiness for writing.
+ *
+ * \param fd The file descriptor.
+ * \param s The scheduler.
+ *
+ * \sa \ref sched_monitor_readfd().
+ */
+void sched_monitor_writefd(int fd, struct sched *s)
+{
+       add_pollfd(fd, s, POLLOUT);
+}
+
+static int get_revents(int fd, const struct sched *s)
+{
+       if (fd < 0)
+               return 0;
+       if (fd >= s->pidx_array_len)
+               return 0;
+       if (s->pidx[fd] >= s->num_pfds)
+               return 0;
+       if (s->pfd[s->pidx[fd]].fd != fd)
+               return 0;
+       assert((s->pfd[s->pidx[fd]].revents & POLLNVAL) == 0);
+       return s->pfd[s->pidx[fd]].revents;
+}
+
+/**
+ * Check whether there is data to read on the given fd.
+ *
+ * To be called from the ->post_monitor() method of a task.
+ *
+ * \param fd Should have been monitored with \ref sched_monitor_readfd().
+ * \param s The scheduler instance.
+ *
+ * \return True if the file descriptor is ready for reading, false otherwise.
+ * If fd is negative, or has not been monitored in the current iteration of the
+ * scheduler's main loop, the function also returns false.
+ *
+ * \sa \ref sched_write_ok().
+ */
+bool sched_read_ok(int fd, const struct sched *s)
+{
+       return get_revents(fd, s) & (POLLIN | POLLERR | POLLHUP);
+}
+
+/**
+ * Check whether writing is possible (i.e., does not block).
+ *
+ * \param fd Should have been monitored with \ref sched_monitor_writefd().
+ * \param s The scheduler instance.
+ *
+ * \return True if the file descriptor is ready for writing, false otherwise.
+ * The comment in \ref sched_read_ok() about invalid file descriptors applies
+ * to this function as well.
+ */
+bool sched_write_ok(int fd, const struct sched *s)
+{
+       return get_revents(fd, s) & (POLLOUT | POLLERR | POLLHUP);
+}