-/*
- * Copyright (C) 2006-2014 Andre Noll <maan@systemlinux.org>
- *
- * Licensed under the GPL v2. For licencing details see COPYING.
- */
+/* Copyright (C) 2006 Andre Noll <maan@tuebingen.mpg.de>, see file COPYING. */
/** \file sched.c Paraslash's scheduling functions. */
#include <regex.h>
-#include <assert.h>
#include "para.h"
#include "ipc.h"
#include "time.h"
#include "error.h"
+/**
+ * The possible states of a task.
+ *
+ * In addition to the states listed here, a task may also enter zombie state.
+ * This happens when its ->post_monitor function returns negative; the ->status
+ * field is then set to this return value. Such tasks are not scheduled any
+ * more (i.e. ->pre_monitor() and ->post_monitor() are no longer called), but
+ * they stay on the scheduler task list until \ref task_reap() or
+ * \ref sched_shutdown() is called.
+ */
+enum task_status {
+ /** Task has been reaped and may be removed from the task list. */
+ TS_DEAD,
+ /** Task is active. */
+ TS_RUNNING,
+};
+
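+/*
+ * Sketch of the zombie life cycle (E_EXAMPLE is a made-up error code; the
+ * task pointer t is assumed to have been obtained at registration time):
+ *
+ *	static int foo_post_monitor(struct sched *s, void *context)
+ *	{
+ *		return -E_EXAMPLE; // task enters zombie state
+ *	}
+ *
+ *	// later, from another task's post_monitor:
+ *	ret = task_reap(&t); // returns -E_EXAMPLE, sets t to NULL
+ */
+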
struct task {
- /** Copied from the task_info struct during task_register(). */
- void (*pre_select)(struct sched *s, struct task *t);
- /** Copied from the task_info struct during task_register(). */
- int (*post_select)(struct sched *s, struct task *t);
- /** Whether this task is active (>=0) or in error state (<0). */
- int error;
+ /** A copy of the task name supplied when the task was registered. */
+ char *name;
+ /** Copied during task_register(). */
+ struct task_info info;
+ /* TS_RUNNING, TS_DEAD, or zombie (negative value). */
+ int status;
/** Position of the task in the task list of the scheduler. */
struct list_head node;
- /** The task name supplied when the task was registered(). */
- char status[255];
/** If less than zero, the task was notified by another task. */
int notification;
- /** True if task is in error state and exit status has been queried. */
- bool dead;
- /** Usually a pointer to the struct containing this task. */
- void *context;
};
static struct timeval now_struct;
-struct timeval *now = &now_struct;
+const struct timeval *now = &now_struct;
-static inline bool timeout_is_zero(struct sched *s)
-{
- struct timeval *tv = &s->select_timeout;
- return tv->tv_sec == 0 && tv->tv_usec == 0;
-}
-
-static void sched_preselect(struct sched *s)
+static void sched_pre_monitor(struct sched *s)
{
struct task *t, *tmp;
list_for_each_entry_safe(t, tmp, &s->task_list, node) {
- if (t->error < 0)
+ if (t->status < 0)
continue;
if (t->notification != 0)
sched_min_delay(s);
- if (t->pre_select)
- t->pre_select(s, t);
+ if (t->info.pre_monitor)
+ t->info.pre_monitor(s, t->info.context);
}
}
static void unlink_and_free_task(struct task *t)
{
- PARA_INFO_LOG("freeing task %s\n", t->status);
+ PARA_INFO_LOG("freeing task %s (%s)\n", t->name, t->status < 0?
+ para_strerror(-t->status) :
+ (t->status == TS_DEAD? "[dead]" : "[running]"));
+
list_del(&t->node);
+ free(t->name);
free(t);
}
//#define SCHED_DEBUG 1
-static inline void call_post_select(struct sched *s, struct task *t)
+static inline void call_post_monitor(struct sched *s, struct task *t)
{
+ int ret;
+
#ifndef SCHED_DEBUG
- t->error = t->post_select(s, t);
+ ret = t->info.post_monitor(s, t->info.context);
#else
struct timeval t1, t2, diff;
unsigned long pst;
clock_get_realtime(&t1);
- t->error = t->post_select(s, t);
+ ret = t->info.post_monitor(s, t->info.context);
clock_get_realtime(&t2);
tv_diff(&t1, &t2, &diff);
pst = tv2ms(&diff);
if (pst > 50)
- PARA_WARNING_LOG("%s: post_select time: %lums\n",
- t->status, pst);
+ PARA_WARNING_LOG("%s: post_monitor time: %lums\n",
+ t->name, pst);
#endif
+ t->status = ret < 0? ret : TS_RUNNING;
}
-static unsigned sched_post_select(struct sched *s)
+static unsigned sched_post_monitor(struct sched *s)
{
struct task *t, *tmp;
unsigned num_running_tasks = 0;
list_for_each_entry_safe(t, tmp, &s->task_list, node) {
- if (t->error < 0) {
- if (t->dead) /* task has been reaped */
- unlink_and_free_task(t);
- continue;
+ if (t->status == TS_DEAD) /* task has been reaped */
+ unlink_and_free_task(t);
+ else if (t->status == TS_RUNNING) {
+ call_post_monitor(s, t); /* sets t->status */
+ t->notification = 0;
+ if (t->status == TS_RUNNING)
+ num_running_tasks++;
}
- call_post_select(s, t);
- t->notification = 0;
- if (t->error >= 0)
- num_running_tasks++;
}
return num_running_tasks;
}
*
* \param s Pointer to the scheduler struct.
*
- * This function updates the global \a now pointer, calls all registered
- * pre_select hooks which may set the timeout and add any file descriptors to
- * the fd sets of \a s. Next, it calls para_select() and makes the result available
- * to the registered tasks by calling their post_select hook.
+ * This function updates the global now pointer, then calls all registered
+ * pre_monitor hooks, which may shorten the timeout and add file descriptors
+ * to the pollfd array. Next, it calls the poll function and makes the result
+ * available to the registered tasks by calling their post_monitor hook.
*
* \return Zero if no more tasks are left in the task list, negative if the
- * select function returned an error.
+ * poll function returned an error.
*
- * \sa \ref task, \ref now.
+ * \sa \ref now.
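+ *
+ * A sketch of the typical call pattern, assuming that default_timeout is
+ * given in milliseconds like the timeout field (task set-up omitted):
+ *
+ *	struct sched s = {.default_timeout = 1000};
+ *	int ret;
+ *
+ *	// ... register tasks with task_register() ...
+ *	ret = schedule(&s);
+ *	sched_shutdown(&s);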
*/
int schedule(struct sched *s)
{
int ret;
unsigned num_running_tasks;
- if (!s->select_function)
- s->select_function = para_select;
+ if (!s->poll_function)
+ s->poll_function = xpoll;
again:
- FD_ZERO(&s->rfds);
- FD_ZERO(&s->wfds);
- s->select_timeout = s->default_timeout;
- s->max_fileno = -1;
- clock_get_realtime(now);
- sched_preselect(s);
- ret = s->select_function(s->max_fileno + 1, &s->rfds, &s->wfds,
- &s->select_timeout);
+ s->num_pfds = 0;
+ if (s->pidx)
+ memset(s->pidx, 0xff, s->pidx_array_len * sizeof(unsigned));
+ s->timeout = s->default_timeout;
+ clock_get_realtime(&now_struct);
+ sched_pre_monitor(s);
+ ret = s->poll_function(s->pfd, s->num_pfds, s->timeout);
if (ret < 0)
return ret;
- if (ret == 0) {
- /*
- * APUE: Be careful not to check the descriptor sets on return
- * unless the return value is greater than zero. The return
- * state of the descriptor sets is implementation dependent if
- * either a signal is caught or the timer expires.
- */
- FD_ZERO(&s->rfds);
- FD_ZERO(&s->wfds);
- }
- clock_get_realtime(now);
- num_running_tasks = sched_post_select(s);
+ clock_get_realtime(&now_struct);
+ num_running_tasks = sched_post_monitor(s);
if (num_running_tasks == 0)
return 0;
goto again;
* \param tptr Identifies the task to reap.
*
* This function is similar to wait(2) in that it returns information about a
- * terminated task and allows to release the resources associated with the
+ * terminated task, which allows releasing the resources associated with the
* task. Until this function is called, the terminated task remains in a zombie
* state.
*
int task_reap(struct task **tptr)
{
struct task *t;
+ int ret;
if (!tptr)
return 0;
t = *tptr;
if (!t)
return 0;
- if (t->error >= 0)
- return 0;
- if (t->dead) /* will be freed in sched_post_select() */
+ if (t->status >= 0)
return 0;
+ ret = t->status;
/*
* With list_for_each_entry_safe() it is only safe to remove the
* _current_ list item. Since we are being called from the loop in
- * schedule() via some task's ->post_select() function, freeing the
+ * schedule() via some task's ->post_monitor() function, freeing the
* given task here would result in use-after-free bugs in schedule().
- * So we only set t->dead which tells schedule() to free the task in
- * the next iteration of its loop.
+ * So we only set the task status to TS_DEAD which tells schedule() to
+ * free the task in the next iteration of its loop.
*/
- t->dead = true;
+ t->status = TS_DEAD;
+
*tptr = NULL;
- return t->error;
+ return ret;
}
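+
+/*
+ * Reaping sketch (made-up names; task_status() is assumed to be the status
+ * query helper whose body appears further down): a supervising task frees a
+ * terminated child task from its post_monitor.
+ *
+ *	if (task_status(ft->task) < 0) {
+ *		ret = task_reap(&ft->task); // sets ft->task to NULL
+ *		if (ret < 0)
+ *			PARA_WARNING_LOG("%s\n", para_strerror(-ret));
+ *	}
+ */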
/**
struct task *t, *tmp;
list_for_each_entry_safe(t, tmp, &s->task_list, node) {
- if (t->error >= 0)
+ if (t->status == TS_RUNNING)
/* The task list should contain only terminated tasks. */
PARA_WARNING_LOG("shutting down running task %s\n",
- t->status);
+ t->name);
unlink_and_free_task(t);
}
+ free(s->pfd);
+ free(s->pidx);
}
/**
{
struct task *t = para_malloc(sizeof(*t));
- assert(info->post_select);
+ assert(info->post_monitor);
if (!s->task_list.next)
- INIT_LIST_HEAD(&s->task_list);
+ init_list_head(&s->task_list);
- snprintf(t->status, sizeof(t->status) - 1, "%s", info->name);
- t->status[sizeof(t->status) - 1] = '\0';
+ t->info = *info;
+ t->name = para_strdup(info->name);
t->notification = 0;
- t->error = 0;
- t->dead = false;
- t->pre_select = info->pre_select;
- t->post_select = info->post_select;
- t->context = info->context;
+ t->status = TS_RUNNING;
list_add_tail(&t->node, &s->task_list);
return t;
}
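+
+/*
+ * Registration sketch, assuming the two-argument form task_register(info, s);
+ * the "foo" names are made up:
+ *
+ *	struct foo_task *ft = para_malloc(sizeof(*ft)); // made-up context
+ *	struct task_info ti = {
+ *		.name = "foo",
+ *		.pre_monitor = foo_pre_monitor,
+ *		.post_monitor = foo_post_monitor,
+ *		.context = ft,
+ *	};
+ *
+ *	ft->task = task_register(&ti, s);
+ */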
-/**
- * Obtain the context pointer of a task.
- *
- * \param t Return this task's context pointer.
- *
- * \return A pointer to the memory location specified previously as \a
- * task_info->context when the task was registered with \ref task_register().
- */
-void *task_context(struct task *t)
-{
- return t->context;
-}
-
/**
* Get the list of all registered tasks.
*
list_for_each_entry_safe(t, tmp, &s->task_list, node) {
char *tmp_msg;
tmp_msg = make_message("%s%p\t%s\t%s\n", msg? msg : "", t,
- t->error < 0? (t->dead? "dead" : "zombie") : "running",
- t->status);
+ t->status == TS_DEAD? "dead" :
+ (t->status == TS_RUNNING? "running" : "zombie"),
+ t->name);
free(msg);
msg = tmp_msg;
}
* \param err A positive error code.
*
* Tasks which honor notifications are supposed to call \ref
- * task_get_notification() in their post_select function and act on the
+ * task_get_notification() in their post_monitor function and act on the
* returned notification value.
*
- * If the scheduler detects during its pre_select loop that at least one task
- * has been notified, the loop terminates, and the post_select methods of all
+ * If the scheduler detects during its pre_monitor loop that at least one task
+ * has been notified, the loop terminates, and the post_monitor methods of all
 * tasks are immediately called again.
*
- * The notification for a task is reset after the call to its post_select
+ * The notification for a task is reset after the call to its post_monitor
* method.
*
* \sa \ref task_get_notification().
assert(err > 0);
if (t->notification == -err) /* ignore subsequent notifications */
return;
- PARA_INFO_LOG("notifying task %s: %s\n", t->status, para_strerror(err));
+ PARA_INFO_LOG("notifying task %s: %s\n", t->name, para_strerror(err));
t->notification = -err;
}
*
* \return The notification value. If this is negative, the task has been
* notified by another task. Tasks are supposed to check for notifications by
- * calling this function from their post_select method.
+ * calling this function from their post_monitor method.
*
* \sa \ref task_notify().
*/
{
if (!t)
return 0;
- if (t->dead)
+ if (t->status == TS_DEAD) /* pretend dead tasks don't exist */
return 0;
- if (t->error >= 0)
+ if (t->status == TS_RUNNING)
return 1;
- return t->error;
+ return t->status;
}
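+
+/*
+ * Notification sketch (made-up task): check for notifications first thing
+ * in post_monitor and terminate if one arrived.
+ *
+ *	static int foo_post_monitor(struct sched *s, void *context)
+ *	{
+ *		struct foo_task *ft = context;
+ *		int ret = task_get_notification(ft->task);
+ *
+ *		if (ret < 0)
+ *			return ret; // enter zombie state
+ *		// ... regular work ...
+ *		return 0;
+ *	}
+ */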
/**
}
/**
- * Set the select timeout to the minimal possible value.
+ * Set the I/O timeout to the minimal possible value.
*
* \param s Pointer to the scheduler struct.
*
- * This causes the next select() call to return immediately.
+ * This causes the next poll() call to return immediately.
*/
void sched_min_delay(struct sched *s)
{
- s->select_timeout.tv_sec = s->select_timeout.tv_usec = 0;
+ s->timeout = 0;
}
/**
- * Impose an upper bound for the timeout of the next select() call.
+ * Impose an upper bound for the I/O timeout.
*
* \param to Maximal allowed timeout.
* \param s Pointer to the scheduler struct.
*
- * If the current scheduler timeout is already smaller than \a to, this
- * function does nothing. Otherwise the timeout for the next select() call is
- * set to the given value.
+ * If the current I/O timeout is already smaller than to, this function does
+ * nothing. Otherwise the timeout is set to the given value.
*
- * \sa sched_request_timeout_ms().
+ * \sa \ref sched_request_timeout_ms().
*/
void sched_request_timeout(struct timeval *to, struct sched *s)
{
- if (tv_diff(&s->select_timeout, to, NULL) > 0)
- s->select_timeout = *to;
+ long unsigned ms = tv2ms(to);
+ if (s->timeout > ms)
+ s->timeout = ms;
}
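+
+/*
+ * Sketch (hypothetical task): a pre_monitor which does not want the
+ * scheduler to sleep longer than 100ms. Equivalently, one could call
+ * sched_request_timeout_ms(100, s).
+ *
+ *	static void foo_pre_monitor(struct sched *s, void *context)
+ *	{
+ *		struct timeval to = {.tv_usec = 100 * 1000};
+ *
+ *		sched_request_timeout(&to, s);
+ *	}
+ */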
/**
- * Force the next select() call to return before the given amount of milliseconds.
+ * Bound the I/O timeout to at most the given amount of milliseconds.
*
* \param ms The maximal allowed timeout in milliseconds.
* \param s Pointer to the scheduler struct.
*
- * Like sched_request_timeout() this imposes an upper bound on the timeout
- * value for the next select() call.
+ * Like \ref sched_request_timeout() this imposes an upper bound on the I/O
+ * timeout.
*/
void sched_request_timeout_ms(long unsigned ms, struct sched *s)
{
}
/**
- * Force the next select() call to return before the given future time.
+ * Bound the I/O timeout by an absolute time in the future.
*
- * \param barrier Absolute time before select() should return.
+ * \param barrier Defines the upper bound for the timeout.
* \param s Pointer to the scheduler struct.
*
- * \return If \a barrier is in the past, this function does nothing and returns
- * zero. Otherwise it returns one.
+ * \return If the barrier is in the past, this function does nothing and
+ * returns zero. Otherwise it returns one.
*
- * \sa sched_request_barrier_or_min_delay().
+ * \sa \ref sched_request_barrier_or_min_delay().
*/
int sched_request_barrier(struct timeval *barrier, struct sched *s)
{
}
/**
- * Force the next select() call to return before the given time.
+ * Bound the I/O timeout or request a minimal delay.
*
- * \param barrier Absolute time before select() should return.
+ * \param barrier Absolute time as in \ref sched_request_barrier().
* \param s Pointer to the scheduler struct.
*
- * \return If \a barrier is in the past, this function requests a minimal
+ * \return If the barrier is in the past, this function requests a minimal
* timeout and returns zero. Otherwise it returns one.
*
- * \sa sched_min_delay(), sched_request_barrier().
+ * \sa \ref sched_min_delay(), \ref sched_request_barrier().
*/
int sched_request_barrier_or_min_delay(struct timeval *barrier, struct sched *s)
{
sched_request_timeout(&diff, s);
return 1;
}
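+
+/*
+ * Barrier sketch (made-up names; tv_add() is assumed to be the timeval
+ * addition helper of time.c): do not sleep past the next scheduled event,
+ * and return immediately if it is already overdue.
+ *
+ *	struct timeval delay = {.tv_sec = 1}, barrier;
+ *
+ *	tv_add(now, &delay, &barrier);
+ *	sched_request_barrier_or_min_delay(&barrier, s);
+ */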
+
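+/*
+ * The scheduler maintains two dynamically grown arrays: pfd, the array of
+ * struct pollfd which is handed to the poll function, and pidx, which maps
+ * file descriptor numbers to slots of the pfd array. Unused pidx entries
+ * have all bits set (the 0xff memset pattern), so stale mappings are
+ * detected by bounds-checking pidx[fd] before it is dereferenced.
+ */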
+static void add_pollfd(int fd, struct sched *s, short events)
+{
+ assert(fd >= 0);
+#if 0
+ {
+ int flags = fcntl(fd, F_GETFL);
+ if (!(flags & O_NONBLOCK)) {
+ PARA_EMERG_LOG("fd %d is a blocking file descriptor\n", fd);
+ exit(EXIT_FAILURE);
+ }
+ }
+#endif
+ if (s->pidx_array_len > fd) { /* is fd already registered? */
+ if (s->pidx[fd] < s->pfd_array_len) { /* yes, it is */
+ assert(s->pfd[s->pidx[fd]].fd == fd);
+ s->pfd[s->pidx[fd]].events |= events;
+ return;
+ }
+ } else { /* need to extend the index array */
+ unsigned old_len = s->pidx_array_len;
+ while (s->pidx_array_len <= fd)
+ s->pidx_array_len = s->pidx_array_len * 2 + 1;
+ PARA_INFO_LOG("pidx array len: %u\n", s->pidx_array_len);
+ s->pidx = para_realloc(s->pidx,
+ s->pidx_array_len * sizeof(unsigned));
+ memset(s->pidx + old_len, 0xff,
+ (s->pidx_array_len - old_len) * sizeof(unsigned));
+ }
+ /*
+ * The given fd is not part of the pfd array yet. Initialize pidx[fd]
+ * to point at the next unused slot of this array and initialize the
+ * slot.
+ */
+ s->pidx[fd] = s->num_pfds;
+ if (s->pfd_array_len <= s->num_pfds) {
+ unsigned old_len = s->pfd_array_len;
+ s->pfd_array_len = old_len * 2 + 1;
+ PARA_INFO_LOG("pfd array len: %u\n", s->pfd_array_len);
+ s->pfd = para_realloc(s->pfd,
+ s->pfd_array_len * sizeof(struct pollfd));
+ memset(s->pfd + old_len, 0,
+ (s->pfd_array_len - old_len) * sizeof(struct pollfd));
+ }
+ s->pfd[s->num_pfds].fd = fd;
+ s->pfd[s->num_pfds].events = events;
+ s->pfd[s->num_pfds].revents = 0;
+ s->num_pfds++;
+}
+
+/**
+ * Instruct the scheduler to monitor an fd for readiness for reading.
+ *
+ * \param fd The file descriptor.
+ * \param s The scheduler.
+ *
+ * \sa \ref sched_monitor_writefd().
+ */
+void sched_monitor_readfd(int fd, struct sched *s)
+{
+ add_pollfd(fd, s, POLLIN);
+}
+
+/**
+ * Instruct the scheduler to monitor an fd for readiness for writing.
+ *
+ * \param fd The file descriptor.
+ * \param s The scheduler.
+ *
+ * \sa \ref sched_monitor_readfd().
+ */
+void sched_monitor_writefd(int fd, struct sched *s)
+{
+ add_pollfd(fd, s, POLLOUT);
+}
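+
+/*
+ * The intended usage pattern (sketch, names made up): declare interest in a
+ * file descriptor in pre_monitor, then check readiness in post_monitor of
+ * the same task.
+ *
+ *	static void foo_pre_monitor(struct sched *s, void *context)
+ *	{
+ *		struct foo_task *ft = context;
+ *
+ *		sched_monitor_readfd(ft->fd, s);
+ *	}
+ *
+ *	static int foo_post_monitor(struct sched *s, void *context)
+ *	{
+ *		struct foo_task *ft = context;
+ *
+ *		if (!sched_read_ok(ft->fd, s))
+ *			return 0;
+ *		// ... read from ft->fd ...
+ *		return 0;
+ *	}
+ */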
+
+static int get_revents(int fd, const struct sched *s)
+{
+ if (fd < 0)
+ return 0;
+ if (fd >= s->pidx_array_len)
+ return 0;
+ if (s->pidx[fd] >= s->num_pfds)
+ return 0;
+ if (s->pfd[s->pidx[fd]].fd != fd)
+ return 0;
+ assert((s->pfd[s->pidx[fd]].revents & POLLNVAL) == 0);
+ return s->pfd[s->pidx[fd]].revents;
+}
+
+/**
+ * Check whether there is data to read on the given fd.
+ *
+ * To be called from the ->post_monitor() method of a task.
+ *
+ * \param fd Should have been monitored with \ref sched_monitor_readfd().
+ * \param s The scheduler instance.
+ *
+ * \return True if the file descriptor is ready for reading, false otherwise.
+ * If fd is negative, or has not been monitored in the current iteration of the
+ * scheduler's main loop, the function also returns false.
+ *
+ * \sa \ref sched_write_ok().
+ */
+bool sched_read_ok(int fd, const struct sched *s)
+{
+ return get_revents(fd, s) & (POLLIN | POLLERR | POLLHUP);
+}
+
+/**
+ * Check whether writing is possible (i.e., a write will not block).
+ *
+ * \param fd Should have been monitored with \ref sched_monitor_writefd().
+ * \param s The scheduler instance.
+ *
+ * \return True if the file descriptor is ready for writing, false otherwise.
+ * The comment in \ref sched_read_ok() about invalid file descriptors applies
+ * to this function as well.
+ */
+bool sched_write_ok(int fd, const struct sched *s)
+{
+ return get_revents(fd, s) & (POLLOUT | POLLERR | POLLHUP);
+}