]> git.tuebingen.mpg.de Git - paraslash.git/blob - sched.c
Merge branch 'maint'
[paraslash.git] / sched.c
1 /* Copyright (C) 2006 Andre Noll <maan@tuebingen.mpg.de>, see file COPYING. */
2
3 /** \file sched.c Paraslash's scheduling functions. */
4
5 #include <regex.h>
6
7 #include "para.h"
8 #include "ipc.h"
9 #include "fd.h"
10 #include "list.h"
11 #include "sched.h"
12 #include "string.h"
13 #include "time.h"
14 #include "error.h"
15
16 /**
17  * The possible states of a task.
18  *
19  * In addition to the states listed here, a task may also enter zombie state.
20  * This happens when its ->post_monitor function returns negative, the ->status
21  * field is then set to this return value. Such tasks are not scheduled any
22  * more (i.e. ->pre_monitor() and ->post_monitor() are no longer called), but
23  * they stay on the scheduler task list until \ref task_reap() or
24  * \ref sched_shutdown() is called.
25  */
26 enum task_status {
27         /** Task has been reaped and may be removed from the task list. */
28         TS_DEAD,
29         /** Task is active. */
30         TS_RUNNING,
31 };
32
33 struct task {
34         /** A copy of the task name supplied when the task was registered. */
35         char *name;
36         /** Copied during task_register(). */
37         struct task_info info;
38         /* TS_RUNNING, TS_DEAD, or zombie (negative value). */
39         int status;
40         /** Position of the task in the task list of the scheduler. */
41         struct list_head node;
42         /** If less than zero, the task was notified by another task. */
43         int notification;
44 };
45
46 static struct timeval now_struct;
47 const struct timeval *now = &now_struct;
48
49 static void sched_pre_monitor(struct sched *s)
50 {
51         struct task *t, *tmp;
52
53         list_for_each_entry_safe(t, tmp, &s->task_list, node) {
54                 if (t->status < 0)
55                         continue;
56                 if (t->notification != 0)
57                         sched_min_delay(s);
58                 if (t->info.pre_monitor)
59                         t->info.pre_monitor(s, t->info.context);
60         }
61 }
62
63 static void unlink_and_free_task(struct task *t)
64 {
65         PARA_INFO_LOG("freeing task %s (%s)\n", t->name, t->status < 0?
66                 para_strerror(-t->status) :
67                 (t->status == TS_DEAD? "[dead]" : "[running]"));
68
69         list_del(&t->node);
70         free(t->name);
71         free(t);
72 }
73
74 //#define SCHED_DEBUG 1
75 static inline void call_post_monitor(struct sched *s, struct task *t)
76 {
77         int ret;
78
79 #ifndef SCHED_DEBUG
80         ret = t->info.post_monitor(s, t->info.context);
81 #else
82         struct timeval t1, t2, diff;
83         unsigned long pst;
84
85         clock_get_realtime(&t1);
86         ret = t->info.post_monitor(s, t->info.context);
87         clock_get_realtime(&t2);
88         tv_diff(&t1, &t2, &diff);
89         pst = tv2ms(&diff);
90         if (pst > 50)
91                 PARA_WARNING_LOG("%s: post_monitor time: %lums\n",
92                         t->name, pst);
93 #endif
94         t->status = ret < 0? ret : TS_RUNNING;
95 }
96
97 static unsigned sched_post_monitor(struct sched *s)
98 {
99         struct task *t, *tmp;
100         unsigned num_running_tasks = 0;
101
102         list_for_each_entry_safe(t, tmp, &s->task_list, node) {
103                 if (t->status == TS_DEAD) /* task has been reaped */
104                         unlink_and_free_task(t);
105                 else if (t->status == TS_RUNNING) {
106                         call_post_monitor(s, t); /* sets t->status */
107                         t->notification = 0;
108                         if (t->status == TS_RUNNING)
109                                 num_running_tasks++;
110                 }
111         }
112         return num_running_tasks;
113 }
114
115 /**
116  * The core function of all paraslash programs.
117  *
118  * \param s Pointer to the scheduler struct.
119  *
120  * This function updates the global now pointer, calls all registered
121  * pre_monitor hooks which may set the timeout and add any file descriptors to
122  * the pollfd array. Next, it calls the poll function and makes the result
123  * available to the registered tasks by calling their post_monitor hook.
124  *
125  * \return Zero if no more tasks are left in the task list, negative if the
126  * poll function returned an error.
127  *
128  * \sa \ref now.
129  */
130 int schedule(struct sched *s)
131 {
132         int ret;
133         unsigned num_running_tasks;
134
135         if (!s->poll_function)
136                 s->poll_function = xpoll;
137 again:
138         s->num_pfds = 0;
139         if (s->pidx)
140                 memset(s->pidx, 0xff, s->pidx_array_len * sizeof(unsigned));
141         s->timeout = s->default_timeout;
142         clock_get_realtime(&now_struct);
143         sched_pre_monitor(s);
144         ret = s->poll_function(s->pfd, s->num_pfds, s->timeout);
145         if (ret < 0)
146                 return ret;
147         clock_get_realtime(&now_struct);
148         num_running_tasks = sched_post_monitor(s);
149         if (num_running_tasks == 0)
150                 return 0;
151         goto again;
152 }
153
154 /**
155  * Obtain the error status of a task and deallocate its resources.
156  *
157  * \param tptr Identifies the task to reap.
158  *
159  * This function is similar to wait(2) in that it returns information about a
160  * terminated task which allows releasing the resources associated with the
161  * task. Until this function is called, the terminated task remains in a zombie
162  * state.
163  *
164  * \return If \a tptr is \p NULL, or \a *tptr is \p NULL, the function does
165  * nothing and returns zero. Otherwise, it is checked whether the task
166  * identified by \a tptr is still running. If it is, the function returns zero
167  * and again, no action is taken. Otherwise the (negative) error code of the
168  * terminated task is returned and \a *tptr is set to \p NULL. The task will
169  * then be removed removed from the scheduler task list.
170  *
171  * \sa \ref sched_shutdown(), wait(2).
172  */
173 int task_reap(struct task **tptr)
174 {
175         struct task *t;
176         int ret;
177
178         if (!tptr)
179                 return 0;
180         t = *tptr;
181         if (!t)
182                 return 0;
183         if (t->status >= 0)
184                 return 0;
185         ret = t->status;
186         /*
187          * With list_for_each_entry_safe() it is only safe to remove the
188          * _current_ list item. Since we are being called from the loop in
189          * schedule() via some task's ->post_monitor() function, freeing the
190          * given task here would result in use-after-free bugs in schedule().
191          * So we only set the task status to TS_DEAD which tells schedule() to
192          * free the task in the next iteration of its loop.
193          */
194         t->status = TS_DEAD;
195
196         *tptr = NULL;
197         return ret;
198 }
199
200 /**
201  * Deallocate all resources of all tasks of a scheduler instance.
202  *
203  * \param s The scheduler instance.
204  *
205  * This should only be called after \ref schedule() has returned.
206  */
207 void sched_shutdown(struct sched *s)
208 {
209         struct task *t, *tmp;
210
211         list_for_each_entry_safe(t, tmp, &s->task_list, node) {
212                 if (t->status == TS_RUNNING)
213                         /* The task list should contain only terminated tasks. */
214                         PARA_WARNING_LOG("shutting down running task %s\n",
215                                 t->name);
216                 unlink_and_free_task(t);
217         }
218         free(s->pfd);
219         free(s->pidx);
220 }
221
222 /**
223  * Add a task to the scheduler task list.
224  *
225  * \param info Task information supplied by the caller.
226  * \param s The scheduler instance.
227  *
228  * \return A pointer to a newly allocated task structure. It will be
229  * freed by sched_shutdown().
230  */
231 struct task *task_register(struct task_info *info, struct sched *s)
232 {
233         struct task *t = alloc(sizeof(*t));
234
235         assert(info->post_monitor);
236
237         if (!s->task_list.next)
238                 init_list_head(&s->task_list);
239
240         t->info = *info;
241         t->name = para_strdup(info->name);
242         t->notification = 0;
243         t->status = TS_RUNNING;
244         list_add_tail(&t->node, &s->task_list);
245         return t;
246 }
247
248 /**
249  * Get the list of all registered tasks.
250  *
251  * \param s The scheduler instance to get the task list from.
252  *
253  * \return The task list.
254  *
255  * Each entry of the list contains an identifier which is simply a hex number.
256  * The result is dynamically allocated and must be freed by the caller.
257  */
258 char *get_task_list(struct sched *s)
259 {
260         struct task *t, *tmp;
261         char *msg = NULL;
262
263         list_for_each_entry_safe(t, tmp, &s->task_list, node) {
264                 char *tmp_msg;
265                 tmp_msg = make_message("%s%p\t%s\t%s\n", msg? msg : "", t,
266                         t->status == TS_DEAD? "dead" :
267                                 (t->status == TS_RUNNING? "running" : "zombie"),
268                         t->name);
269                 free(msg);
270                 msg = tmp_msg;
271         }
272         return msg;
273 }
274
275 /**
276  * Set the notification value of a task.
277  *
278  * \param t The task to notify.
279  * \param err A positive error code.
280  *
281  * Tasks which honor notifications are supposed to call \ref
282  * task_get_notification() in their post_monitor function and act on the
283  * returned notification value.
284  *
285  * If the scheduler detects during its pre_monitor loop that at least one task
286  * has been notified, the loop terminates, and the post_monitor methods of all
287  * taks are immediately called again.
288  *
289  * The notification for a task is reset after the call to its post_monitor
290  * method.
291  *
292  * \sa \ref task_get_notification().
293  */
294 void task_notify(struct task *t, int err)
295 {
296         assert(err > 0);
297         if (t->notification == -err) /* ignore subsequent notifications */
298                 return;
299         PARA_INFO_LOG("notifying task %s: %s\n", t->name, para_strerror(err));
300         t->notification = -err;
301 }
302
303 /**
304  * Return the notification value of a task.
305  *
306  * \param t The task to get the notification value from.
307  *
308  * \return The notification value. If this is negative, the task has been
309  * notified by another task. Tasks are supposed to check for notifications by
310  * calling this function from their post_monitor method.
311  *
312  * \sa \ref task_notify().
313  */
314 int task_get_notification(const struct task *t)
315 {
316         return t->notification;
317 }
318
319 /**
320  * Return the status value of a task.
321  *
322  * \param t The task to get the status value from.
323  *
324  * \return Zero if task does not exist, one if task is running, negative error
325  * code if task has terminated.
326  */
327 int task_status(const struct task *t)
328 {
329         if (!t)
330                 return 0;
331         if (t->status == TS_DEAD) /* pretend dead tasks don't exist */
332                 return 0;
333         if (t->status == TS_RUNNING)
334                 return 1;
335         return t->status;
336 }
337
338 /**
339  * Set the notification value of all tasks of a scheduler instance.
340  *
341  * \param s The scheduler instance whose tasks should be notified.
342  * \param err A positive error code.
343  *
344  * This simply iterates over all existing tasks of \a s and sets each
345  * task's notification value to \p -err.
346  */
347 void task_notify_all(struct sched *s, int err)
348 {
349         struct task *t;
350
351         list_for_each_entry(t, &s->task_list, node)
352                 task_notify(t, err);
353 }
354
355 /**
356  * Set the I/O timeout to the minimal possible value.
357  *
358  * \param s Pointer to the scheduler struct.
359  *
360  * This causes the next poll() call to return immediately.
361  */
362 void sched_min_delay(struct sched *s)
363 {
364         s->timeout = 0;
365 }
366
367 /**
368  * Impose an upper bound for the I/O timeout.
369  *
370  * \param to Maximal allowed timeout.
371  * \param s Pointer to the scheduler struct.
372  *
373  * If the current I/O timeout is already smaller than to, this function does
374  * nothing. Otherwise the timeout is set to the given value.
375  *
376  * \sa \ref sched_request_timeout_ms().
377  */
378 void sched_request_timeout(struct timeval *to, struct sched *s)
379 {
380         long unsigned ms = tv2ms(to);
381         if (s->timeout > ms)
382                 s->timeout = ms;
383 }
384
385 /**
386  * Bound the I/O timeout to at most the given amount of milliseconds.
387  *
388  * \param ms The maximal allowed timeout in milliseconds.
389  * \param s Pointer to the scheduler struct.
390  *
391  * Like \ref sched_request_timeout() this imposes an upper bound on the I/O
392  * timeout.
393  */
394 void sched_request_timeout_ms(long unsigned ms, struct sched *s)
395 {
396         struct timeval tv;
397         ms2tv(ms, &tv);
398         sched_request_timeout(&tv, s);
399 }
400
401 /**
402  * Bound the I/O timeout by an absolute time in the future.
403  *
404  * \param barrier Defines the upper bound for the timeout.
405  * \param s Pointer to the scheduler struct.
406  *
407  * \return If the barrier is in the past, this function does nothing and
408  * returns zero. Otherwise it returns one.
409  *
410  * \sa \ref sched_request_barrier_or_min_delay().
411  */
412 int sched_request_barrier(struct timeval *barrier, struct sched *s)
413 {
414         struct timeval diff;
415
416         if (tv_diff(now, barrier, &diff) > 0)
417                 return 0;
418         sched_request_timeout(&diff, s);
419         return 1;
420 }
421
422 /**
423  * Bound the I/O timeout or request a minimal delay.
424  *
425  * \param barrier Absolute time as in \ref sched_request_barrier().
426  * \param s Pointer to the scheduler struct.
427  *
428  * \return If the barrier is in the past, this function requests a minimal
429  * timeout and returns zero. Otherwise it returns one.
430  *
431  * \sa \ref sched_min_delay(), \ref sched_request_barrier().
432  */
433 int sched_request_barrier_or_min_delay(struct timeval *barrier, struct sched *s)
434 {
435         struct timeval diff;
436
437         if (tv_diff(now, barrier, &diff) > 0) {
438                 sched_min_delay(s);
439                 return 0;
440         }
441         sched_request_timeout(&diff, s);
442         return 1;
443 }
444
445 static void add_pollfd(int fd, struct sched *s, short events)
446 {
447         assert(fd >= 0);
448 #if 0
449         {
450                 int flags = fcntl(fd, F_GETFL);
451                 if (!(flags & O_NONBLOCK)) {
452                         PARA_EMERG_LOG("fd %d is a blocking file descriptor\n", fd);
453                         exit(EXIT_FAILURE);
454                 }
455         }
456 #endif
457         if (s->pidx_array_len > fd) { /* is fd already registered? */
458                 if (s->pidx[fd] < s->pfd_array_len) { /* yes, it is */
459                         assert(s->pfd[s->pidx[fd]].fd == fd);
460                         s->pfd[s->pidx[fd]].events |= events;
461                         return;
462                 }
463         } else { /* need to extend the index array */
464                 unsigned old_len = s->pidx_array_len;
465                 while (s->pidx_array_len <= fd)
466                         s->pidx_array_len = s->pidx_array_len * 2 + 1;
467                 PARA_INFO_LOG("pidx array len: %u\n", s->pidx_array_len);
468                 s->pidx = para_realloc(s->pidx,
469                         s->pidx_array_len * sizeof(unsigned));
470                 memset(s->pidx + old_len, 0xff,
471                         (s->pidx_array_len - old_len) * sizeof(unsigned));
472         }
473         /*
474          * The given fd is not part of the pfd array yet. Initialize pidx[fd]
475          * to point at the next unused slot of this array and initialize the
476          * slot.
477          */
478         s->pidx[fd] = s->num_pfds;
479         if (s->pfd_array_len <= s->num_pfds) {
480                 unsigned old_len = s->pfd_array_len;
481                 s->pfd_array_len = old_len * 2 + 1;
482                 PARA_INFO_LOG("pfd array len: %u\n", s->pfd_array_len);
483                 s->pfd = para_realloc(s->pfd,
484                         s->pfd_array_len * sizeof(struct pollfd));
485                 memset(s->pfd + old_len, 0,
486                         (s->pfd_array_len - old_len) * sizeof(struct pollfd));
487         }
488         s->pfd[s->num_pfds].fd = fd;
489         s->pfd[s->num_pfds].events = events;
490         s->pfd[s->num_pfds].revents = 0;
491         s->num_pfds++;
492 }
493
494 /**
495  * Instruct the scheduler to monitor an fd for readiness for reading.
496  *
497  * \param fd The file descriptor.
498  * \param s The scheduler.
499  *
500  * \sa \ref sched_monitor_writefd().
501  */
502 void sched_monitor_readfd(int fd, struct sched *s)
503 {
504         add_pollfd(fd, s, POLLIN);
505 }
506
507 /**
508  * Instruct the scheduler to monitor an fd for readiness for writing.
509  *
510  * \param fd The file descriptor.
511  * \param s The scheduler.
512  *
513  * \sa \ref sched_monitor_readfd().
514  */
515 void sched_monitor_writefd(int fd, struct sched *s)
516 {
517         add_pollfd(fd, s, POLLOUT);
518 }
519
520 static int get_revents(int fd, const struct sched *s)
521 {
522         if (fd < 0)
523                 return 0;
524         if (fd >= s->pidx_array_len)
525                 return 0;
526         if (s->pidx[fd] >= s->num_pfds)
527                 return 0;
528         if (s->pfd[s->pidx[fd]].fd != fd)
529                 return 0;
530         assert((s->pfd[s->pidx[fd]].revents & POLLNVAL) == 0);
531         return s->pfd[s->pidx[fd]].revents;
532 }
533
534 /**
535  * Check whether there is data to read on the given fd.
536  *
537  * To be called from the ->post_monitor() method of a task.
538  *
539  * \param fd Should have been monitored with \ref sched_monitor_readfd().
540  * \param s The scheduler instance.
541  *
542  * \return True if the file descriptor is ready for reading, false otherwise.
543  * If fd is negative, or has not been monitored in the current iteration of the
544  * scheduler's main loop, the function also returns false.
545  *
546  * \sa \ref sched_write_ok().
547  */
548 bool sched_read_ok(int fd, const struct sched *s)
549 {
550         return get_revents(fd, s) & (POLLIN | POLLERR | POLLHUP);
551 }
552
553 /**
554  * Check whether writing is possible (i.e., does not block).
555  *
556  * \param fd Should have been monitored with \ref sched_monitor_writefd().
557  * \param s The scheduler instance.
558  *
559  * \return True if the file descriptor is ready for writing, false otherwise.
560  * The comment in \ref sched_read_ok() about invalid file descriptors applies
561  * to this function as well.
562  */
563 bool sched_write_ok(int fd, const struct sched *s)
564 {
565         return get_revents(fd, s) & (POLLOUT | POLLERR | POLLHUP);
566 }