+
+static void add_pollfd(int fd, struct sched *s, short events)
+{
+ assert(fd >= 0);
+#if 0
+ {
+ int flags = fcntl(fd, F_GETFL);
+ if (!(flags & O_NONBLOCK)) {
+ PARA_EMERG_LOG("fd %d is a blocking file descriptor\n", fd);
+ exit(EXIT_FAILURE);
+ }
+ }
+#endif
+ if (s->pidx_array_len > fd) { /* is fd already registered? */
+ if (s->pidx[fd] < s->pfd_array_len) { /* yes, it is */
+ assert(s->pfd[s->pidx[fd]].fd == fd);
+ s->pfd[s->pidx[fd]].events |= events;
+ return;
+ }
+ } else { /* need to extend the index array */
+ unsigned old_len = s->pidx_array_len;
+ while (s->pidx_array_len <= fd)
+ s->pidx_array_len = s->pidx_array_len * 2 + 1;
+ PARA_INFO_LOG("pidx array len: %u\n", s->pidx_array_len);
+ s->pidx = para_realloc(s->pidx,
+ s->pidx_array_len * sizeof(unsigned));
+ memset(s->pidx + old_len, 0xff,
+ (s->pidx_array_len - old_len) * sizeof(unsigned));
+ }
+ /*
+ * The given fd is not part of the pfd array yet. Initialize pidx[fd]
+ * to point at the next unused slot of this array and initialize the
+ * slot.
+ */
+ s->pidx[fd] = s->num_pfds;
+ if (s->pfd_array_len <= s->num_pfds) {
+ unsigned old_len = s->pfd_array_len;
+ s->pfd_array_len = old_len * 2 + 1;
+ PARA_INFO_LOG("pfd array len: %u\n", s->pfd_array_len);
+ s->pfd = para_realloc(s->pfd,
+ s->pfd_array_len * sizeof(struct pollfd));
+ memset(s->pfd + old_len, 0,
+ (s->pfd_array_len - old_len) * sizeof(struct pollfd));
+ }
+ s->pfd[s->num_pfds].fd = fd;
+ s->pfd[s->num_pfds].events = events;
+ s->pfd[s->num_pfds].revents = 0;
+ s->num_pfds++;
+}
+
+/**
+ * Instruct the scheduler to monitor an fd for readiness for reading.
+ *
+ * \param fd The file descriptor.
+ * \param s The scheduler.
+ *
+ * \sa \ref sched_monitor_writefd().
+ */
+void sched_monitor_readfd(int fd, struct sched *s)
+{
+ add_pollfd(fd, s, POLLIN);
+}
+
+/**
+ * Instruct the scheduler to monitor an fd for readiness for writing.
+ *
+ * \param fd The file descriptor.
+ * \param s The scheduler.
+ *
+ * \sa \ref sched_monitor_readfd().
+ */
+void sched_monitor_writefd(int fd, struct sched *s)
+{
+ add_pollfd(fd, s, POLLOUT);
+}
+
+static int get_revents(int fd, const struct sched *s)
+{
+ if (fd < 0)
+ return 0;
+ if (fd >= s->pidx_array_len)
+ return 0;
+ if (s->pidx[fd] >= s->num_pfds)
+ return 0;
+ if (s->pfd[s->pidx[fd]].fd != fd)
+ return 0;
+ assert((s->pfd[s->pidx[fd]].revents & POLLNVAL) == 0);
+ return s->pfd[s->pidx[fd]].revents;
+}
+
+/**
+ * Check whether there is data to read on the given fd.
+ *
+ * To be called from the ->post_monitor() method of a task.
+ *
+ * \param fd Should have been monitored with \ref sched_monitor_readfd().
+ * \param s The scheduler instance.
+ *
+ * \return True if the file descriptor is ready for reading, false otherwise.
+ * If fd is negative, or has not been monitored in the current iteration of the
+ * scheduler's main loop, the function also returns false.
+ *
+ * \sa \ref sched_write_ok().
+ */
+bool sched_read_ok(int fd, const struct sched *s)
+{
+ return get_revents(fd, s) & (POLLIN | POLLERR | POLLHUP);
+}
+
+/**
+ * Check whether writing is possible (i.e., does not block).
+ *
+ * \param fd Should have been monitored with \ref sched_monitor_writefd().
+ * \param s The scheduler instance.
+ *
+ * \return True if the file descriptor is ready for writing, false otherwise.
+ * The comment in \ref sched_read_ok() about invalid file descriptors applies
+ * to this function as well.
+ */
+bool sched_write_ok(int fd, const struct sched *s)
+{
+ return get_revents(fd, s) & (POLLOUT | POLLERR | POLLHUP);
+}