* The possible states of a task.
*
* In addition to the states listed here, a task may also enter zombie state.
- * This happens when its ->post_select function returns negative, the ->status
+ * This happens when its ->post_monitor function returns negative, the ->status
* field is then set to this return value. Such tasks are not scheduled any
- * more (i.e. ->pre_select() and ->post_select() are no longer called), but
+ * more (i.e. ->pre_monitor() and ->post_monitor() are no longer called), but
* they stay on the scheduler task list until \ref task_reap() or
* \ref sched_shutdown() is called.
*/
static struct timeval now_struct;
const struct timeval *now = &now_struct;
-static void sched_preselect(struct sched *s)
+static void sched_pre_monitor(struct sched *s)
{
struct task *t, *tmp;
continue;
if (t->notification != 0)
sched_min_delay(s);
- if (t->info.pre_select)
- t->info.pre_select(s, t->info.context);
+ if (t->info.pre_monitor)
+ t->info.pre_monitor(s, t->info.context);
}
}
}
//#define SCHED_DEBUG 1
+/*
+ * Execute the ->post_monitor() hook of the given task and update its status.
+ *
+ * A negative return value of the hook turns the task into a zombie
+ * (t->status < 0); otherwise the task stays in TS_RUNNING state. If
+ * SCHED_DEBUG is defined, the time spent in the hook is measured and a
+ * warning is logged when it exceeds 50ms.
+ */
-static inline void call_post_select(struct sched *s, struct task *t)
+static inline void call_post_monitor(struct sched *s, struct task *t)
{
int ret;
#ifndef SCHED_DEBUG
- ret = t->info.post_select(s, t->info.context);
+ ret = t->info.post_monitor(s, t->info.context);
#else
struct timeval t1, t2, diff;
unsigned long pst;
clock_get_realtime(&t1);
- ret = t->info.post_select(s, t->info.context);
+ ret = t->info.post_monitor(s, t->info.context);
clock_get_realtime(&t2);
tv_diff(&t1, &t2, &diff);
pst = tv2ms(&diff);
if (pst > 50)
- PARA_WARNING_LOG("%s: post_select time: %lums\n",
+ PARA_WARNING_LOG("%s: post_monitor time: %lums\n",
t->name, pst);
#endif
+ /* A negative return value turns the task into a zombie. */
t->status = ret < 0? ret : TS_RUNNING;
}
-static unsigned sched_post_select(struct sched *s)
+static unsigned sched_post_monitor(struct sched *s)
{
struct task *t, *tmp;
unsigned num_running_tasks = 0;
if (t->status == TS_DEAD) /* task has been reaped */
unlink_and_free_task(t);
else if (t->status == TS_RUNNING) {
- call_post_select(s, t); /* sets t->status */
+ call_post_monitor(s, t); /* sets t->status */
t->notification = 0;
if (t->status == TS_RUNNING)
num_running_tasks++;
*
* \param s Pointer to the scheduler struct.
*
- * This function updates the global \a now pointer, calls all registered
- * pre_select hooks which may set the timeout and add any file descriptors to
- * the fd sets of \a s. Next, it calls para_select() and makes the result available
- * to the registered tasks by calling their post_select hook.
+ * This function updates the global now pointer, calls all registered
+ * pre_monitor hooks which may set the timeout and add any file descriptors to
+ * the pollfd array. Next, it calls the poll function and makes the result
+ * available to the registered tasks by calling their post_monitor hook.
*
* \return Zero if no more tasks are left in the task list, negative if the
- * select function returned an error.
+ * poll function returned an error.
*
* \sa \ref now.
*/
int ret;
unsigned num_running_tasks;
- if (!s->select_function)
- s->select_function = para_select;
+ if (!s->poll_function)
+ s->poll_function = xpoll;
again:
- FD_ZERO(&s->rfds);
- FD_ZERO(&s->wfds);
- s->select_timeout = s->default_timeout;
- s->max_fileno = -1;
+ s->num_pfds = 0;
+ if (s->pidx)
+ memset(s->pidx, 0xff, s->pidx_array_len * sizeof(unsigned));
+ s->timeout = s->default_timeout;
clock_get_realtime(&now_struct);
- sched_preselect(s);
- ret = s->select_function(s->max_fileno + 1, &s->rfds, &s->wfds,
- &s->select_timeout);
+ sched_pre_monitor(s);
+ ret = s->poll_function(s->pfd, s->num_pfds, s->timeout);
if (ret < 0)
return ret;
- if (ret == 0) {
- /*
- * APUE: Be careful not to check the descriptor sets on return
- * unless the return value is greater than zero. The return
- * state of the descriptor sets is implementation dependent if
- * either a signal is caught or the timer expires.
- */
- FD_ZERO(&s->rfds);
- FD_ZERO(&s->wfds);
- }
clock_get_realtime(&now_struct);
- num_running_tasks = sched_post_select(s);
+ num_running_tasks = sched_post_monitor(s);
if (num_running_tasks == 0)
return 0;
goto again;
/*
* With list_for_each_entry_safe() it is only safe to remove the
* _current_ list item. Since we are being called from the loop in
- * schedule() via some task's ->post_select() function, freeing the
+ * schedule() via some task's ->post_monitor() function, freeing the
* given task here would result in use-after-free bugs in schedule().
* So we only set the task status to TS_DEAD which tells schedule() to
* free the task in the next iteration of its loop.
t->name);
unlink_and_free_task(t);
}
+ free(s->pfd);
+ free(s->pidx);
}
/**
*/
struct task *task_register(struct task_info *info, struct sched *s)
{
- struct task *t = para_malloc(sizeof(*t));
+ struct task *t = alloc(sizeof(*t));
- assert(info->post_select);
+ assert(info->post_monitor);
if (!s->task_list.next)
init_list_head(&s->task_list);
* \param err A positive error code.
*
* Tasks which honor notifications are supposed to call \ref
- * task_get_notification() in their post_select function and act on the
+ * task_get_notification() in their post_monitor function and act on the
* returned notification value.
*
- * If the scheduler detects during its pre_select loop that at least one task
- * has been notified, the loop terminates, and the post_select methods of all
+ * If the scheduler detects during its pre_monitor loop that at least one task
+ * has been notified, the loop terminates, and the post_monitor methods of all
* tasks are immediately called again.
*
- * The notification for a task is reset after the call to its post_select
+ * The notification for a task is reset after the call to its post_monitor
* method.
*
* \sa \ref task_get_notification().
*
* \return The notification value. If this is negative, the task has been
* notified by another task. Tasks are supposed to check for notifications by
- * calling this function from their post_select method.
+ * calling this function from their post_monitor method.
*
* \sa \ref task_notify().
*/
}
/**
- * Set the select timeout to the minimal possible value.
+ * Set the I/O timeout to the minimal possible value.
*
* \param s Pointer to the scheduler struct.
*
- * This causes the next select() call to return immediately.
+ * This causes the next poll() call to return immediately.
*/
void sched_min_delay(struct sched *s)
{
- s->select_timeout.tv_sec = s->select_timeout.tv_usec = 0;
+ /* The timeout is in milliseconds; zero makes the poll call return at once. */
+ s->timeout = 0;
}
/**
- * Impose an upper bound for the timeout of the next select() call.
+ * Impose an upper bound for the I/O timeout.
*
* \param to Maximal allowed timeout.
* \param s Pointer to the scheduler struct.
*
- * If the current scheduler timeout is already smaller than \a to, this
- * function does nothing. Otherwise the timeout for the next select() call is
- * set to the given value.
+ * If the current I/O timeout is already smaller than to, this function does
+ * nothing. Otherwise the timeout is set to the given value.
*
* \sa \ref sched_request_timeout_ms().
*/
void sched_request_timeout(struct timeval *to, struct sched *s)
{
- if (tv_diff(&s->select_timeout, to, NULL) > 0)
- s->select_timeout = *to;
+ long unsigned ms = tv2ms(to);
+ /* Only ever lower the timeout, so competing requests take the minimum. */
+ if (s->timeout > ms)
+ s->timeout = ms;
}
/**
- * Force the next select() call to return before the given amount of milliseconds.
+ * Bound the I/O timeout to at most the given amount of milliseconds.
*
* \param ms The maximal allowed timeout in milliseconds.
* \param s Pointer to the scheduler struct.
*
- * Like sched_request_timeout() this imposes an upper bound on the timeout
- * value for the next select() call.
+ * Like \ref sched_request_timeout() this imposes an upper bound on the I/O
+ * timeout.
*/
void sched_request_timeout_ms(long unsigned ms, struct sched *s)
{
}
/**
- * Force the next select() call to return before the given future time.
+ * Bound the I/O timeout by an absolute time in the future.
*
- * \param barrier Absolute time before select() should return.
+ * \param barrier Defines the upper bound for the timeout.
* \param s Pointer to the scheduler struct.
*
- * \return If \a barrier is in the past, this function does nothing and returns
- * zero. Otherwise it returns one.
+ * \return If the barrier is in the past, this function does nothing and
+ * returns zero. Otherwise it returns one.
*
* \sa \ref sched_request_barrier_or_min_delay().
*/
}
/**
- * Force the next select() call to return before the given time.
+ * Bound the I/O timeout or request a minimal delay.
*
- * \param barrier Absolute time before select() should return.
+ * \param barrier Absolute time as in \ref sched_request_barrier().
* \param s Pointer to the scheduler struct.
*
- * \return If \a barrier is in the past, this function requests a minimal
+ * \return If the barrier is in the past, this function requests a minimal
* timeout and returns zero. Otherwise it returns one.
*
* \sa \ref sched_min_delay(), \ref sched_request_barrier().
sched_request_timeout(&diff, s);
return 1;
}
+
+/*
+ * Register an fd for monitoring, or merge the events mask into the entry
+ * of an already registered fd.
+ *
+ * The pidx array maps file descriptors to slots of the pfd (pollfd)
+ * array. Both arrays grow on demand, doubling in size as needed. Unused
+ * pidx entries are filled with 0xff bytes so that the range checks below
+ * treat them as invalid.
+ */
+static void add_pollfd(int fd, struct sched *s, short events)
+{
+ assert(fd >= 0);
+#if 0
+ /* Debug aid: flip to #if 1 to abort when a task monitors a blocking fd. */
+ {
+ int flags = fcntl(fd, F_GETFL);
+ if (!(flags & O_NONBLOCK)) {
+ PARA_EMERG_LOG("fd %d is a blocking file descriptor\n", fd);
+ exit(EXIT_FAILURE);
+ }
+ }
+#endif
+ if (s->pidx_array_len > fd) { /* is fd already registered? */
+ if (s->pidx[fd] < s->pfd_array_len) { /* yes, it is */
+ assert(s->pfd[s->pidx[fd]].fd == fd);
+ s->pfd[s->pidx[fd]].events |= events;
+ return;
+ }
+ } else { /* need to extend the index array */
+ unsigned old_len = s->pidx_array_len;
+ while (s->pidx_array_len <= fd)
+ s->pidx_array_len = s->pidx_array_len * 2 + 1;
+ PARA_INFO_LOG("pidx array len: %u\n", s->pidx_array_len);
+ s->pidx = para_realloc(s->pidx,
+ s->pidx_array_len * sizeof(unsigned));
+ /* Mark the fresh part of the index array as invalid. */
+ memset(s->pidx + old_len, 0xff,
+ (s->pidx_array_len - old_len) * sizeof(unsigned));
+ }
+ /*
+ * The given fd is not part of the pfd array yet. Initialize pidx[fd]
+ * to point at the next unused slot of this array and initialize the
+ * slot.
+ */
+ s->pidx[fd] = s->num_pfds;
+ if (s->pfd_array_len <= s->num_pfds) {
+ unsigned old_len = s->pfd_array_len;
+ s->pfd_array_len = old_len * 2 + 1;
+ PARA_INFO_LOG("pfd array len: %u\n", s->pfd_array_len);
+ s->pfd = para_realloc(s->pfd,
+ s->pfd_array_len * sizeof(struct pollfd));
+ memset(s->pfd + old_len, 0,
+ (s->pfd_array_len - old_len) * sizeof(struct pollfd));
+ }
+ s->pfd[s->num_pfds].fd = fd;
+ s->pfd[s->num_pfds].events = events;
+ s->pfd[s->num_pfds].revents = 0;
+ s->num_pfds++;
+}
+
+/**
+ * Instruct the scheduler to monitor an fd for readiness for reading.
+ *
+ * \param fd The file descriptor.
+ * \param s The scheduler.
+ *
+ * \sa \ref sched_monitor_writefd().
+ */
+void sched_monitor_readfd(int fd, struct sched *s)
+{
+ /* Repeated calls for the same fd OR the event masks together. */
+ add_pollfd(fd, s, POLLIN);
+}
+
+/**
+ * Instruct the scheduler to monitor an fd for readiness for writing.
+ *
+ * \param fd The file descriptor.
+ * \param s The scheduler.
+ *
+ * \sa \ref sched_monitor_readfd().
+ */
+void sched_monitor_writefd(int fd, struct sched *s)
+{
+ /* Repeated calls for the same fd OR the event masks together. */
+ add_pollfd(fd, s, POLLOUT);
+}
+
+/*
+ * Return the revents field of the pollfd entry which corresponds to the
+ * given fd, or 0 (no events) if the fd is negative or has not been
+ * monitored in the current iteration of the scheduler loop.
+ */
+static int get_revents(int fd, const struct sched *s)
+{
+ if (fd < 0)
+ return 0;
+ if (fd >= s->pidx_array_len)
+ return 0;
+ /* The 0xff-initialized (unused) pidx entries fail this check. */
+ if (s->pidx[fd] >= s->num_pfds)
+ return 0;
+ /* Reject index entries which point at the slot of a different fd. */
+ if (s->pfd[s->pidx[fd]].fd != fd)
+ return 0;
+ /* POLLNVAL would indicate that an invalid (e.g. closed) fd was polled. */
+ assert((s->pfd[s->pidx[fd]].revents & POLLNVAL) == 0);
+ return s->pfd[s->pidx[fd]].revents;
+}
+
+/**
+ * Check whether there is data to read on the given fd.
+ *
+ * To be called from the ->post_monitor() method of a task.
+ *
+ * \param fd Should have been monitored with \ref sched_monitor_readfd().
+ * \param s The scheduler instance.
+ *
+ * \return True if the file descriptor is ready for reading, false otherwise.
+ * If fd is negative, or has not been monitored in the current iteration of the
+ * scheduler's main loop, the function also returns false.
+ *
+ * \sa \ref sched_write_ok().
+ */
+bool sched_read_ok(int fd, const struct sched *s)
+{
+ /* Error and hangup conditions count as readable as well. */
+ return get_revents(fd, s) & (POLLIN | POLLERR | POLLHUP);
+}
+
+/**
+ * Check whether writing is possible (i.e., does not block).
+ *
+ * \param fd Should have been monitored with \ref sched_monitor_writefd().
+ * \param s The scheduler instance.
+ *
+ * \return True if the file descriptor is ready for writing, false otherwise.
+ * The comment in \ref sched_read_ok() about invalid file descriptors applies
+ * to this function as well.
+ */
+bool sched_write_ok(int fd, const struct sched *s)
+{
+ /* Error and hangup conditions count as ready for writing as well. */
+ return get_revents(fd, s) & (POLLOUT | POLLERR | POLLHUP);
+}