]> git.tuebingen.mpg.de Git - paraslash.git/blob - sched.c
paraslash 0.7.3
[paraslash.git] / sched.c
1 /* Copyright (C) 2006 Andre Noll <maan@tuebingen.mpg.de>, see file COPYING. */
2
3 /** \file sched.c Paraslash's scheduling functions. */
4
5 #include <regex.h>
6
7 #include "para.h"
8 #include "ipc.h"
9 #include "fd.h"
10 #include "list.h"
11 #include "sched.h"
12 #include "string.h"
13 #include "time.h"
14 #include "error.h"
15
16 /**
17  * The possible states of a task.
18  *
19  * In addition to the states listed here, a task may also enter zombie state.
20  * This happens when its ->post_monitor function returns negative, the ->status
21  * field is then set to this return value. Such tasks are not scheduled any
22  * more (i.e. ->pre_monitor() and ->post_monitor() are no longer called), but
23  * they stay on the scheduler task list until \ref task_reap() or
24  * \ref sched_shutdown() is called.
25  */
26 enum task_status {
27         /** Task has been reaped and may be removed from the task list. */
28         TS_DEAD,
29         /** Task is active. */
30         TS_RUNNING,
31 };
32
33 struct task {
34         /** A copy of the task name supplied when the task was registered. */
35         char *name;
36         /** Copied during task_register(). */
37         struct task_info info;
38         /* TS_RUNNING, TS_DEAD, or zombie (negative value). */
39         int status;
40         /** Position of the task in the task list of the scheduler. */
41         struct list_head node;
42         /** If less than zero, the task was notified by another task. */
43         int notification;
44 };
45
46 static struct timeval now_struct;
47 const struct timeval *now = &now_struct;
48
49 static void sched_pre_monitor(struct sched *s)
50 {
51         struct task *t, *tmp;
52
53         list_for_each_entry_safe(t, tmp, &s->task_list, node) {
54                 if (t->status < 0)
55                         continue;
56                 if (t->notification != 0)
57                         sched_min_delay(s);
58                 if (t->info.pre_monitor)
59                         t->info.pre_monitor(s, t->info.context);
60         }
61 }
62
63 static void unlink_and_free_task(struct task *t)
64 {
65         list_del(&t->node);
66         free(t->name);
67         free(t);
68 }
69
70 //#define SCHED_DEBUG 1
71 static inline void call_post_monitor(struct sched *s, struct task *t)
72 {
73         int ret;
74
75 #ifndef SCHED_DEBUG
76         ret = t->info.post_monitor(s, t->info.context);
77 #else
78         struct timeval t1, t2, diff;
79         unsigned long pst;
80
81         clock_get_realtime(&t1);
82         ret = t->info.post_monitor(s, t->info.context);
83         clock_get_realtime(&t2);
84         tv_diff(&t1, &t2, &diff);
85         pst = tv2ms(&diff);
86         if (pst > 50)
87                 PARA_WARNING_LOG("%s: post_monitor time: %lums\n",
88                         t->name, pst);
89 #endif
90         t->status = ret < 0? ret : TS_RUNNING;
91 }
92
93 static unsigned sched_post_monitor(struct sched *s)
94 {
95         struct task *t, *tmp;
96         unsigned num_running_tasks = 0;
97
98         list_for_each_entry_safe(t, tmp, &s->task_list, node) {
99                 if (t->status == TS_DEAD) /* task has been reaped */
100                         unlink_and_free_task(t);
101                 else if (t->status == TS_RUNNING) {
102                         call_post_monitor(s, t); /* sets t->status */
103                         t->notification = 0;
104                         if (t->status == TS_RUNNING)
105                                 num_running_tasks++;
106                 }
107         }
108         return num_running_tasks;
109 }
110
111 /**
112  * The core function of all paraslash programs.
113  *
114  * \param s Pointer to the scheduler struct.
115  *
116  * This function updates the global now pointer, calls all registered
117  * pre_monitor hooks which may set the timeout and add any file descriptors to
118  * the pollfd array. Next, it calls the poll function and makes the result
119  * available to the registered tasks by calling their post_monitor hook.
120  *
121  * \return Zero if no more tasks are left in the task list, negative if the
122  * poll function returned an error.
123  *
124  * \sa \ref now.
125  */
126 int schedule(struct sched *s)
127 {
128         int ret;
129         unsigned num_running_tasks;
130
131         if (!s->poll_function)
132                 s->poll_function = xpoll;
133 again:
134         s->num_pfds = 0;
135         if (s->pidx)
136                 memset(s->pidx, 0xff, s->pidx_array_len * sizeof(unsigned));
137         s->timeout = s->default_timeout;
138         clock_get_realtime(&now_struct);
139         sched_pre_monitor(s);
140         ret = s->poll_function(s->pfd, s->num_pfds, s->timeout);
141         if (ret < 0)
142                 return ret;
143         clock_get_realtime(&now_struct);
144         num_running_tasks = sched_post_monitor(s);
145         if (num_running_tasks == 0)
146                 return 0;
147         goto again;
148 }
149
150 /**
151  * Obtain the error status of a task and deallocate its resources.
152  *
153  * \param tptr Identifies the task to reap.
154  *
155  * This function is similar to wait(2) in that it returns information about a
156  * terminated task which allows releasing the resources associated with the
157  * task. Until this function is called, the terminated task remains in a zombie
158  * state.
159  *
160  * \return If \a tptr is \p NULL, or \a *tptr is \p NULL, the function does
161  * nothing and returns zero. Otherwise, it is checked whether the task
162  * identified by \a tptr is still running. If it is, the function returns zero
163  * and again, no action is taken. Otherwise the (negative) error code of the
164  * terminated task is returned and \a *tptr is set to \p NULL. The task will
165  * then be removed removed from the scheduler task list.
166  *
167  * \sa \ref sched_shutdown(), wait(2).
168  */
169 int task_reap(struct task **tptr)
170 {
171         struct task *t;
172         int ret;
173
174         if (!tptr)
175                 return 0;
176         t = *tptr;
177         if (!t)
178                 return 0;
179         if (t->status >= 0)
180                 return 0;
181         ret = t->status;
182         PARA_INFO_LOG("reaping %s: %s\n", t->name, para_strerror(-ret));
183         /*
184          * With list_for_each_entry_safe() it is only safe to remove the
185          * _current_ list item. Since we are being called from the loop in
186          * schedule() via some task's ->post_monitor() function, freeing the
187          * given task here would result in use-after-free bugs in schedule().
188          * So we only set the task status to TS_DEAD which tells schedule() to
189          * free the task in the next iteration of its loop.
190          */
191         t->status = TS_DEAD;
192
193         *tptr = NULL;
194         return ret;
195 }
196
197 /**
198  * Deallocate all resources of all tasks of a scheduler instance.
199  *
200  * \param s The scheduler instance.
201  *
202  * This should only be called after \ref schedule() has returned.
203  */
204 void sched_shutdown(struct sched *s)
205 {
206         struct task *t, *tmp;
207
208         list_for_each_entry_safe(t, tmp, &s->task_list, node) {
209                 if (t->status == TS_RUNNING)
210                         /* The task list should contain only terminated tasks. */
211                         PARA_WARNING_LOG("shutting down running task %s\n",
212                                 t->name);
213                 unlink_and_free_task(t);
214         }
215         free(s->pfd);
216         free(s->pidx);
217 }
218
219 /**
220  * Add a task to the scheduler task list.
221  *
222  * \param info Task information supplied by the caller.
223  * \param s The scheduler instance.
224  *
225  * \return A pointer to a newly allocated task structure. It will be
226  * freed by sched_shutdown().
227  */
228 struct task *task_register(struct task_info *info, struct sched *s)
229 {
230         struct task *t = alloc(sizeof(*t));
231
232         assert(info->post_monitor);
233
234         if (!s->task_list.next)
235                 init_list_head(&s->task_list);
236
237         t->info = *info;
238         t->name = para_strdup(info->name);
239         t->notification = 0;
240         t->status = TS_RUNNING;
241         list_add_tail(&t->node, &s->task_list);
242         return t;
243 }
244
245 /**
246  * Get the list of all registered tasks.
247  *
248  * \param s The scheduler instance to get the task list from.
249  *
250  * \return The task list.
251  *
252  * Each entry of the list contains an identifier which is simply a hex number.
253  * The result is dynamically allocated and must be freed by the caller.
254  */
255 char *get_task_list(struct sched *s)
256 {
257         struct task *t, *tmp;
258         char *msg = NULL;
259
260         list_for_each_entry_safe(t, tmp, &s->task_list, node) {
261                 char *tmp_msg;
262                 tmp_msg = make_message("%s%p\t%s\t%s\n", msg? msg : "", t,
263                         t->status == TS_DEAD? "dead" :
264                                 (t->status == TS_RUNNING? "running" : "zombie"),
265                         t->name);
266                 free(msg);
267                 msg = tmp_msg;
268         }
269         return msg;
270 }
271
272 /**
273  * Set the notification value of a task.
274  *
275  * \param t The task to notify.
276  * \param err A positive error code.
277  *
278  * Tasks which honor notifications are supposed to call \ref
279  * task_get_notification() in their post_monitor function and act on the
280  * returned notification value.
281  *
282  * If the scheduler detects during its pre_monitor loop that at least one task
283  * has been notified, the loop terminates, and the post_monitor methods of all
284  * taks are immediately called again.
285  *
286  * The notification for a task is reset after the call to its post_monitor
287  * method.
288  *
289  * \sa \ref task_get_notification().
290  */
291 void task_notify(struct task *t, int err)
292 {
293         assert(err > 0);
294         if (t->notification == -err) /* ignore subsequent notifications */
295                 return;
296         PARA_INFO_LOG("notifying task %s: %s\n", t->name, para_strerror(err));
297         t->notification = -err;
298 }
299
300 /**
301  * Return the notification value of a task.
302  *
303  * \param t The task to get the notification value from.
304  *
305  * \return The notification value. If this is negative, the task has been
306  * notified by another task. Tasks are supposed to check for notifications by
307  * calling this function from their post_monitor method.
308  *
309  * \sa \ref task_notify().
310  */
311 int task_get_notification(const struct task *t)
312 {
313         return t->notification;
314 }
315
316 /**
317  * Return the status value of a task.
318  *
319  * \param t The task to get the status value from.
320  *
321  * \return Zero if task does not exist, one if task is running, negative error
322  * code if task has terminated.
323  */
324 int task_status(const struct task *t)
325 {
326         if (!t)
327                 return 0;
328         if (t->status == TS_DEAD) /* pretend dead tasks don't exist */
329                 return 0;
330         if (t->status == TS_RUNNING)
331                 return 1;
332         return t->status;
333 }
334
335 /**
336  * Set the notification value of all tasks of a scheduler instance.
337  *
338  * \param s The scheduler instance whose tasks should be notified.
339  * \param err A positive error code.
340  *
341  * This simply iterates over all existing tasks of \a s and sets each
342  * task's notification value to \p -err.
343  */
344 void task_notify_all(struct sched *s, int err)
345 {
346         struct task *t;
347
348         list_for_each_entry(t, &s->task_list, node)
349                 task_notify(t, err);
350 }
351
352 /**
353  * Set the I/O timeout to the minimal possible value.
354  *
355  * \param s Pointer to the scheduler struct.
356  *
357  * This causes the next poll() call to return immediately.
358  */
359 void sched_min_delay(struct sched *s)
360 {
361         s->timeout = 0;
362 }
363
364 /**
365  * Impose an upper bound for the I/O timeout.
366  *
367  * \param to Maximal allowed timeout.
368  * \param s Pointer to the scheduler struct.
369  *
370  * If the current I/O timeout is already smaller than to, this function does
371  * nothing. Otherwise the timeout is set to the given value.
372  *
373  * \sa \ref sched_request_timeout_ms().
374  */
375 void sched_request_timeout(struct timeval *to, struct sched *s)
376 {
377         long unsigned ms = tv2ms(to);
378         if (s->timeout > ms)
379                 s->timeout = ms;
380 }
381
382 /**
383  * Bound the I/O timeout to at most the given amount of milliseconds.
384  *
385  * \param ms The maximal allowed timeout in milliseconds.
386  * \param s Pointer to the scheduler struct.
387  *
388  * Like \ref sched_request_timeout() this imposes an upper bound on the I/O
389  * timeout.
390  */
391 void sched_request_timeout_ms(long unsigned ms, struct sched *s)
392 {
393         struct timeval tv;
394         ms2tv(ms, &tv);
395         sched_request_timeout(&tv, s);
396 }
397
398 /**
399  * Bound the I/O timeout by an absolute time in the future.
400  *
401  * \param barrier Defines the upper bound for the timeout.
402  * \param s Pointer to the scheduler struct.
403  *
404  * \return If the barrier is in the past, this function does nothing and
405  * returns zero. Otherwise it returns one.
406  *
407  * \sa \ref sched_request_barrier_or_min_delay().
408  */
409 int sched_request_barrier(struct timeval *barrier, struct sched *s)
410 {
411         struct timeval diff;
412
413         if (tv_diff(now, barrier, &diff) > 0)
414                 return 0;
415         sched_request_timeout(&diff, s);
416         return 1;
417 }
418
419 /**
420  * Bound the I/O timeout or request a minimal delay.
421  *
422  * \param barrier Absolute time as in \ref sched_request_barrier().
423  * \param s Pointer to the scheduler struct.
424  *
425  * \return If the barrier is in the past, this function requests a minimal
426  * timeout and returns zero. Otherwise it returns one.
427  *
428  * \sa \ref sched_min_delay(), \ref sched_request_barrier().
429  */
430 int sched_request_barrier_or_min_delay(struct timeval *barrier, struct sched *s)
431 {
432         struct timeval diff;
433
434         if (tv_diff(now, barrier, &diff) > 0) {
435                 sched_min_delay(s);
436                 return 0;
437         }
438         sched_request_timeout(&diff, s);
439         return 1;
440 }
441
442 static void add_pollfd(int fd, struct sched *s, short events)
443 {
444         assert(fd >= 0);
445 #if 0
446         {
447                 int flags = fcntl(fd, F_GETFL);
448                 if (!(flags & O_NONBLOCK)) {
449                         PARA_EMERG_LOG("fd %d is a blocking file descriptor\n", fd);
450                         exit(EXIT_FAILURE);
451                 }
452         }
453 #endif
454         if (s->pidx_array_len > fd) { /* is fd already registered? */
455                 if (s->pidx[fd] < s->pfd_array_len) { /* yes, it is */
456                         assert(s->pfd[s->pidx[fd]].fd == fd);
457                         s->pfd[s->pidx[fd]].events |= events;
458                         return;
459                 }
460         } else { /* need to extend the index array */
461                 unsigned old_len = s->pidx_array_len;
462                 while (s->pidx_array_len <= fd)
463                         s->pidx_array_len = s->pidx_array_len * 2 + 1;
464                 PARA_INFO_LOG("pidx array len: %u\n", s->pidx_array_len);
465                 s->pidx = para_realloc(s->pidx,
466                         s->pidx_array_len * sizeof(unsigned));
467                 memset(s->pidx + old_len, 0xff,
468                         (s->pidx_array_len - old_len) * sizeof(unsigned));
469         }
470         /*
471          * The given fd is not part of the pfd array yet. Initialize pidx[fd]
472          * to point at the next unused slot of this array and initialize the
473          * slot.
474          */
475         s->pidx[fd] = s->num_pfds;
476         if (s->pfd_array_len <= s->num_pfds) {
477                 unsigned old_len = s->pfd_array_len;
478                 s->pfd_array_len = old_len * 2 + 1;
479                 PARA_INFO_LOG("pfd array len: %u\n", s->pfd_array_len);
480                 s->pfd = para_realloc(s->pfd,
481                         s->pfd_array_len * sizeof(struct pollfd));
482                 memset(s->pfd + old_len, 0,
483                         (s->pfd_array_len - old_len) * sizeof(struct pollfd));
484         }
485         s->pfd[s->num_pfds].fd = fd;
486         s->pfd[s->num_pfds].events = events;
487         s->pfd[s->num_pfds].revents = 0;
488         s->num_pfds++;
489 }
490
491 /**
492  * Instruct the scheduler to monitor an fd for readiness for reading.
493  *
494  * \param fd The file descriptor.
495  * \param s The scheduler.
496  *
497  * \sa \ref sched_monitor_writefd().
498  */
499 void sched_monitor_readfd(int fd, struct sched *s)
500 {
501         add_pollfd(fd, s, POLLIN);
502 }
503
504 /**
505  * Instruct the scheduler to monitor an fd for readiness for writing.
506  *
507  * \param fd The file descriptor.
508  * \param s The scheduler.
509  *
510  * \sa \ref sched_monitor_readfd().
511  */
512 void sched_monitor_writefd(int fd, struct sched *s)
513 {
514         add_pollfd(fd, s, POLLOUT);
515 }
516
517 static int get_revents(int fd, const struct sched *s)
518 {
519         if (fd < 0)
520                 return 0;
521         if (fd >= s->pidx_array_len)
522                 return 0;
523         if (s->pidx[fd] >= s->num_pfds)
524                 return 0;
525         if (s->pfd[s->pidx[fd]].fd != fd)
526                 return 0;
527         assert((s->pfd[s->pidx[fd]].revents & POLLNVAL) == 0);
528         return s->pfd[s->pidx[fd]].revents;
529 }
530
531 /**
532  * Check whether there is data to read on the given fd.
533  *
534  * To be called from the ->post_monitor() method of a task.
535  *
536  * \param fd Should have been monitored with \ref sched_monitor_readfd().
537  * \param s The scheduler instance.
538  *
539  * \return True if the file descriptor is ready for reading, false otherwise.
540  * If fd is negative, or has not been monitored in the current iteration of the
541  * scheduler's main loop, the function also returns false.
542  *
543  * \sa \ref sched_write_ok().
544  */
545 bool sched_read_ok(int fd, const struct sched *s)
546 {
547         return get_revents(fd, s) & (POLLIN | POLLERR | POLLHUP);
548 }
549
550 /**
551  * Check whether writing is possible (i.e., does not block).
552  *
553  * \param fd Should have been monitored with \ref sched_monitor_writefd().
554  * \param s The scheduler instance.
555  *
556  * \return True if the file descriptor is ready for writing, false otherwise.
557  * The comment in \ref sched_read_ok() about invalid file descriptors applies
558  * to this function as well.
559  */
560 bool sched_write_ok(int fd, const struct sched *s)
561 {
562         return get_revents(fd, s) & (POLLOUT | POLLERR | POLLHUP);
563 }