From: Andre Noll Date: Sun, 11 Sep 2022 14:34:12 +0000 (+0200) Subject: Merge branch 'refs/heads/t/poll' X-Git-Tag: v0.7.1~6 X-Git-Url: http://git.tuebingen.mpg.de/?a=commitdiff_plain;h=b142089267ef501e438c3dc77ecf19bead3d4e58;hp=4314f515abc204cc1696427f34aa9822d1a7150b;p=paraslash.git Merge branch 'refs/heads/t/poll' This series modifies all calls to select(2) to use poll(2) instead in order to avoid the known shortcomings of the select API, in particular its limit of at most 1024 file descriptors and the fact that fds above 1023 cannot be monitored with select(2) even if fewer than 1024 fds are open. The first patches of the series prepare this switch, converting the easy cases, hiding select specific data structures such as fd sets, and adjusting function names and documentation. The crucial commit is the last one. See its rather verbose log message for details. * refs/heads/t/poll: Switch from select(2) to poll(2). Rename ->{pre,post}_select methods to ->{pre,post}_monitor. Misc documentation cleanups related to select(). stdin/stdout: Streamline documentation of {pre,post}_select(). Consolidate receiver/filter/writer {pre,post}_select() docs. Hide implementation of para_fd_set(). send: Avoid select-specific arguments in {pre,post}_select(). sched: Introduce sched_{read,write}_ok(). audiod: Rename handle_connect(). net: Drop fd_set parameter from para_accept(). fd: Drop fd_set parameter from read_nonblock() and friends. interactive: Avoid select(2) in input_available(). fd.c: Prefer poll(2) over select(2) for write_ok(). sched: Use integer value for select timeout. --- diff --git a/NEWS.md b/NEWS.md index e56aace3..0f2eec0d 100644 --- a/NEWS.md +++ b/NEWS.md @@ -13,6 +13,9 @@ NEWS to install faad from source to get support for aac/m4a files. The faad decoder package must still be installed. +- All calls to select(2) have been replaced by calls to poll(2) + to avoid known shortcomings of the select API. + [tarball](./releases/paraslash-git.tar.xz) ---------------------------------- diff --git a/aacdec_filter.c b/aacdec_filter.c index a2459d82..36a783c5 100644 --- a/aacdec_filter.c +++ b/aacdec_filter.c @@ -74,7 +74,7 @@ static void aacdec_close(struct filter_node *fn) fn->private_data = NULL; } -static int aacdec_post_select(__a_unused struct sched *s, void *context) +static int aacdec_post_monitor(__a_unused struct sched *s, void *context) { struct filter_node *fn = context; struct btr_node *btrn = fn->btrn; @@ -158,7 +158,7 @@ err: const struct filter lsg_filter_cmd_com_aacdec_user_data = { .open = aacdec_open, .close = aacdec_close, - .pre_select = generic_filter_pre_select, - .post_select = aacdec_post_select, + .pre_monitor = generic_filter_pre_monitor, + .post_monitor = aacdec_post_monitor, .execute = aacdec_execute }; diff --git a/afh_recv.c b/afh_recv.c index 6a0ec239..889fdce8 100644 --- a/afh_recv.c +++ b/afh_recv.c @@ -142,14 +142,14 @@ static void afh_recv_close(struct receiver_node *rn) freep(&rn->private_data); } -static void afh_recv_pre_select(struct sched *s, void *context) +static void afh_recv_pre_monitor(struct sched *s, void *context) { struct receiver_node *rn = context; struct private_afh_recv_data *pard = rn->private_data; struct afh_info *afhi = &pard->afhi; struct lls_parse_result *lpr = rn->lpr; struct timeval chunk_time; - int state = generic_recv_pre_select(s, rn); + int state = generic_recv_pre_monitor(s, rn); unsigned j_given = RECV_CMD_OPT_GIVEN(AFH, JUST_IN_TIME, lpr); if (state <= 0) @@ -163,7 +163,7 @@ static void afh_recv_pre_select(struct sched *s, void *context) sched_request_barrier_or_min_delay(&chunk_time, s); } -static int afh_recv_post_select(__a_unused struct sched *s, void *context) +static int afh_recv_post_monitor(__a_unused struct sched *s, void *context) { struct receiver_node *rn = context; struct lls_parse_result *lpr = rn->lpr; @@ -242,7 +242,7 @@ out: const struct receiver lsg_recv_cmd_com_afh_user_data = { .open = afh_recv_open, .close = afh_recv_close, - .pre_select = afh_recv_pre_select, - .post_select = afh_recv_post_select, + .pre_monitor = afh_recv_pre_monitor, + .post_monitor = afh_recv_post_monitor, .execute = afh_execute, }; diff --git a/afs.c b/afs.c index 71067025..febe13b3 100644 --- a/afs.c +++ b/afs.c @@ -707,7 +707,7 @@ static int open_afs_tables(void) return ret; } -static int afs_signal_post_select(struct sched *s, __a_unused void *context) +static int afs_signal_post_monitor(struct sched *s, __a_unused void *context) { int signum, ret; @@ -715,7 +715,7 @@ static int afs_signal_post_select(struct sched *s, __a_unused void *context) PARA_EMERG_LOG("para_server died\n"); goto shutdown; } - signum = para_next_signal(&s->rfds); + signum = para_next_signal(); if (signum == 0) return 0; if (signum == SIGHUP) { @@ -743,8 +743,8 @@ static void register_signal_task(struct sched *s) signal_task->task = task_register(&(struct task_info) { .name = "signal", - .pre_select = signal_pre_select, - .post_select = afs_signal_post_select, + .pre_monitor = signal_pre_monitor, + .post_monitor = afs_signal_post_monitor, .context = signal_task, }, s); @@ -762,15 +762,15 @@ struct afs_client { struct timeval connect_time; }; -static void command_pre_select(struct sched *s, void *context) +static void command_pre_monitor(struct sched *s, void *context) { struct command_task *ct = context; struct afs_client *client; - para_fd_set(server_socket, &s->rfds, &s->max_fileno); - para_fd_set(ct->fd, &s->rfds, &s->max_fileno); + sched_monitor_readfd(server_socket, s); + sched_monitor_readfd(ct->fd, s); list_for_each_entry(client, &afs_client_list, node) - para_fd_set(client->fd, &s->rfds, &s->max_fileno); + sched_monitor_readfd(client->fd, s); } /** @@ -862,11 +862,11 @@ static int call_callback(int fd, int query_shmid) return ret; } -static int execute_server_command(fd_set *rfds) +static int execute_server_command(void) { char buf[8]; size_t n; - int ret = read_nonblock(server_socket, buf, sizeof(buf) - 1, rfds, &n); + int ret = read_nonblock(server_socket, buf, sizeof(buf) - 1, &n); if (ret < 0 || n == 0) return ret; @@ -877,13 +877,13 @@ static int execute_server_command(fd_set *rfds) } /* returns 0 if no data available, 1 else */ -static int execute_afs_command(int fd, fd_set *rfds) +static int execute_afs_command(int fd) { uint32_t cookie; int query_shmid; char buf[sizeof(cookie) + sizeof(query_shmid)]; size_t n; - int ret = read_nonblock(fd, buf, sizeof(buf), rfds, &n); + int ret = read_nonblock(fd, buf, sizeof(buf), &n); if (ret < 0) goto err; @@ -917,7 +917,7 @@ err: /** Shutdown connection if query has not arrived until this many seconds. */ #define AFS_CLIENT_TIMEOUT 3 -static int command_post_select(struct sched *s, void *context) +static int command_post_monitor(struct sched *s, void *context) { struct command_task *ct = context; struct sockaddr_un unix_addr; @@ -927,7 +927,7 @@ static int command_post_select(struct sched *s, void *context) ret = task_get_notification(ct->task); if (ret < 0) return ret; - ret = execute_server_command(&s->rfds); + ret = execute_server_command(); if (ret < 0) { PARA_EMERG_LOG("%s\n", para_strerror(-ret)); task_notify_all(s, -ret); @@ -935,7 +935,7 @@ static int command_post_select(struct sched *s, void *context) } /* Check the list of connected clients. */ list_for_each_entry_safe(client, tmp, &afs_client_list, node) { - ret = execute_afs_command(client->fd, &s->rfds); + ret = execute_afs_command(client->fd); if (ret == 0) { /* prevent bogus connection flooding */ struct timeval diff; tv_diff(now, &client->connect_time, &diff); @@ -948,7 +948,7 @@ static int command_post_select(struct sched *s, void *context) free(client); } /* Accept connections on the local socket. */ - ret = para_accept(ct->fd, &s->rfds, &unix_addr, sizeof(unix_addr), &fd); + ret = para_accept(ct->fd, &unix_addr, sizeof(unix_addr), &fd); if (ret < 0) PARA_NOTICE_LOG("%s\n", para_strerror(-ret)); if (ret <= 0) @@ -973,8 +973,8 @@ static void register_command_task(struct sched *s) ct->task = task_register(&(struct task_info) { .name = "afs command", - .pre_select = command_pre_select, - .post_select = command_post_select, + .pre_monitor = command_pre_monitor, + .post_monitor = command_post_monitor, .context = ct, }, s); } @@ -1003,8 +1003,7 @@ __noreturn void afs_init(int socket_fd) PARA_INFO_LOG("server_socket: %d\n", server_socket); init_admissible_files(OPT_STRING_VAL(AFS_INITIAL_MODE)); register_command_task(&s); - s.default_timeout.tv_sec = 0; - s.default_timeout.tv_usec = 999 * 1000; + s.default_timeout = 1000; ret = write(socket_fd, "\0", 1); if (ret != 1) { if (ret == 0) diff --git a/alsa_write.c b/alsa_write.c index bbbf8b65..2bf3fd0e 100644 --- a/alsa_write.c +++ b/alsa_write.c @@ -48,7 +48,7 @@ struct private_alsa_write_data { /* time until buffer underrun occurs, in milliseconds */ unsigned buffer_time; struct timeval drain_barrier; - /* File descriptor for select(). */ + /* File descriptor to monitor for reading. */ int poll_fd; }; @@ -202,7 +202,7 @@ out: return ret; } -static void alsa_write_pre_select(struct sched *s, void *context) +static void alsa_write_pre_monitor(struct sched *s, void *context) { struct pollfd pfd; struct writer_node *wn = context; @@ -230,7 +230,7 @@ static void alsa_write_pre_select(struct sched *s, void *context) return; } pad->poll_fd = pfd.fd; - para_fd_set(pfd.fd, &s->rfds, &s->max_fileno); + sched_monitor_readfd(pfd.fd, s); } static void alsa_close(struct writer_node *wn) @@ -254,7 +254,7 @@ free_pad: free(pad); } -static int alsa_write_post_select(__a_unused struct sched *s, void *context) +static int alsa_write_post_monitor(__a_unused struct sched *s, void *context) { struct writer_node *wn = context; struct private_alsa_write_data *pad = wn->private_data; @@ -321,7 +321,7 @@ again: frames = snd_pcm_writei(pad->handle, data, frames); if (frames == 0 || frames == -EAGAIN) { char buf[100]; - if (pad->poll_fd >= 0 && FD_ISSET(pad->poll_fd, &s->rfds)) + if (pad->poll_fd >= 0 && sched_read_ok(pad->poll_fd, s)) if (read(pad->poll_fd, buf, 100)) do_nothing; return 0; @@ -349,7 +349,7 @@ err: struct writer lsg_write_cmd_com_alsa_user_data = { - .pre_select = alsa_write_pre_select, - .post_select = alsa_write_post_select, + .pre_monitor = alsa_write_pre_monitor, + .post_monitor = alsa_write_post_monitor, .close = alsa_close, }; diff --git a/amp_filter.c b/amp_filter.c index 61b1653e..9369e4bc 100644 --- a/amp_filter.c +++ b/amp_filter.c @@ -43,7 +43,7 @@ static void amp_open(struct filter_node *fn) pad->amp, pad->amp / 64.0 + 1.0); } -static int amp_post_select(__a_unused struct sched *s, void *context) +static int amp_post_monitor(__a_unused struct sched *s, void *context) { struct filter_node *fn = context; struct private_amp_data *pad = fn->private_data; @@ -100,6 +100,6 @@ err: const struct filter lsg_filter_cmd_com_amp_user_data = { .open = amp_open, .close = amp_close, - .pre_select = generic_filter_pre_select, - .post_select = amp_post_select, + .pre_monitor = generic_filter_pre_monitor, + .post_monitor = amp_post_monitor, }; diff --git a/ao_write.c b/ao_write.c index 037b9299..41e609b7 100644 --- a/ao_write.c +++ b/ao_write.c @@ -42,7 +42,7 @@ static void aow_close(struct writer_node *wn) ao_shutdown(); } -static void aow_pre_select(struct sched *s, void *context) +static void aow_pre_monitor(struct sched *s, void *context) { struct writer_node *wn = context; struct private_aow_data *pawd = wn->private_data; @@ -342,7 +342,7 @@ fail: return -E_AO_PTHREAD; } -static int aow_post_select(__a_unused struct sched *s, void *context) +static int aow_post_monitor(__a_unused struct sched *s, void *context) { struct writer_node *wn = context; struct private_aow_data *pawd = wn->private_data; @@ -421,7 +421,7 @@ out: struct writer lsg_write_cmd_com_ao_user_data = { .close = aow_close, - .pre_select = aow_pre_select, - .post_select = aow_post_select, + .pre_monitor = aow_pre_monitor, + .post_monitor = aow_post_monitor, }; diff --git a/audioc.c b/audioc.c index af670633..2506c3f8 100644 --- a/audioc.c +++ b/audioc.c @@ -143,17 +143,17 @@ static struct i9e_completer audiod_completers[] = { {.name = NULL} }; -static void audioc_pre_select(struct sched *s, void *context) +static void audioc_pre_monitor(struct sched *s, void *context) { struct audioc_task *at = context; int ret = btr_node_status(at->btrn, 0, BTR_NT_ROOT); if (ret < 0) sched_min_delay(s); - para_fd_set(at->fd, &s->rfds, &s->max_fileno); + sched_monitor_readfd(at->fd, s); } -static int audioc_post_select(struct sched *s, void *context) +static int audioc_post_monitor(struct sched *s, void *context) { char *buf = NULL; struct audioc_task *at = context; @@ -162,7 +162,7 @@ static int audioc_post_select(struct sched *s, void *context) if (ret < 0) goto out; - if (!FD_ISSET(at->fd, &s->rfds)) + if (!sched_read_ok(at->fd, s)) return 0; bufsize = PARA_MAX(1024U, OPT_UINT32_VAL(BUFSIZE)); buf = para_malloc(bufsize); @@ -211,8 +211,8 @@ static int audioc_i9e_line_handler(char *line) EMBRACE(.name = "audioc line handler")); at->task = task_register(&(struct task_info) { .name = "audioc", - .pre_select = audioc_pre_select, - .post_select = audioc_post_select, + .pre_monitor = audioc_pre_monitor, + .post_monitor = audioc_post_monitor, .context = at, }, &sched); i9e_attach_to_stdout(at->btrn); @@ -250,9 +250,9 @@ __noreturn static void interactive_session(void) sigemptyset(&act.sa_mask); act.sa_flags = 0; sigaction(SIGINT, &act, NULL); - sched.select_function = i9e_select; + sched.poll_function = i9e_poll; - sched.default_timeout.tv_sec = 1; + sched.default_timeout = 1000; ret = i9e_open(&ici, &sched); if (ret < 0) goto out; diff --git a/audiod.c b/audiod.c index 119adbc0..a084558b 100644 --- a/audiod.c +++ b/audiod.c @@ -123,7 +123,7 @@ enum vss_status_flags { * This is needed also in audiod_command.c (for the tasks command), so it can * not be made static. */ -struct sched sched = {.max_fileno = 0}; +struct sched sched = {.timeout = 0}; /* The task for obtaining para_server's status (para_client stat). */ struct status_task { @@ -584,8 +584,8 @@ static void open_filters(struct slot_info *s) sprintf(buf, "%s (slot %d)", name, (int)(s - slot)); fn->task = task_register(&(struct task_info) { .name = buf, - .pre_select = f->pre_select, - .post_select = f->post_select, + .pre_monitor = f->pre_monitor, + .post_monitor = f->post_monitor, .context = fn, }, &sched); parent = fn->btrn; @@ -648,8 +648,8 @@ static int open_receiver(int format) audio_formats[format], name, slot_num); rn->task = task_register(&(struct task_info) { .name = name, - .pre_select = r->pre_select, - .post_select = r->post_select, + .pre_monitor = r->pre_monitor, + .post_monitor = r->post_monitor, .context = rn, }, &sched); return slot_num; @@ -1055,7 +1055,7 @@ static void init_local_socket(struct command_task *ct) exit(EXIT_FAILURE); } -static int signal_post_select(struct sched *s, void *context) +static int signal_post_monitor(struct sched *s, void *context) { struct signal_task *st = context; int ret, signum; @@ -1063,7 +1063,7 @@ static int signal_post_select(struct sched *s, void *context) ret = task_get_notification(st->task); if (ret < 0) return ret; - signum = para_next_signal(&s->rfds); + signum = para_next_signal(); switch (signum) { case SIGINT: case SIGTERM: @@ -1075,13 +1075,13 @@ static int signal_post_select(struct sched *s, void *context) return 0; } -static void command_pre_select(struct sched *s, void *context) +static void command_pre_monitor(struct sched *s, void *context) { struct command_task *ct = context; - para_fd_set(ct->fd, &s->rfds, &s->max_fileno); + sched_monitor_readfd(ct->fd, s); } -static int command_post_select(struct sched *s, void *context) +static int command_post_monitor(struct sched *s, void *context) { int ret; struct command_task *ct = context; @@ -1092,7 +1092,7 @@ static int command_post_select(struct sched *s, void *context) ret = task_get_notification(ct->task); if (ret < 0) return ret; - ret = handle_connect(ct->fd, &s->rfds); + ret = dispatch_local_connection(ct->fd); if (ret < 0) { PARA_NOTICE_LOG("%s\n", para_strerror(-ret)); if (ret == -E_AUDIOD_TERM) { @@ -1132,8 +1132,8 @@ static void init_command_task(struct command_task *ct) ct->task = task_register(&(struct task_info) { .name = "command", - .pre_select = command_pre_select, - .post_select = command_post_select, + .pre_monitor = command_pre_monitor, + .post_monitor = command_post_monitor, .context = ct, }, &sched); } @@ -1254,7 +1254,7 @@ static void start_stop_decoders(void) audiod_status_dump(true); } -static void status_pre_select(struct sched *s, void *context) +static void status_pre_monitor(struct sched *s, void *context) { struct status_task *st = context; int i, ret, cafn = stat_task->current_audio_format_num; @@ -1286,7 +1286,7 @@ min_delay: } /* restart the client task if necessary */ -static int status_post_select(struct sched *s, void *context) +static int status_post_monitor(struct sched *s, void *context) { struct status_task *st = context; int ret; @@ -1377,8 +1377,8 @@ static void init_status_task(struct status_task *st) stat_task->task = task_register(&(struct task_info) { .name = "stat", - .pre_select = status_pre_select, - .post_select = status_post_select, + .pre_monitor = status_pre_monitor, + .post_monitor = status_post_monitor, .context = stat_task, }, &sched); } @@ -1505,13 +1505,12 @@ int main(int argc, char *argv[]) signal_task->task = task_register(&(struct task_info) { .name = "signal", - .pre_select = signal_pre_select, - .post_select = signal_post_select, + .pre_monitor = signal_pre_monitor, + .post_monitor = signal_post_monitor, .context = signal_task, }, &sched); - sched.default_timeout.tv_sec = 2; - sched.default_timeout.tv_usec = 999 * 1000; + sched.default_timeout = 2999; ret = schedule(&sched); audiod_cleanup(); sched_shutdown(&sched); diff --git a/audiod.h b/audiod.h index b40fdd67..dedb038f 100644 --- a/audiod.h +++ b/audiod.h @@ -21,5 +21,5 @@ bool uid_is_whitelisted(uid_t uid); /* defined in audiod_command.c */ void audiod_status_dump(bool force); void close_stat_clients(void); -int handle_connect(int accept_fd, fd_set *rfds); +int dispatch_local_connection(int accept_fd); void stat_client_write_item(int item_num); diff --git a/audiod_command.c b/audiod_command.c index bb54dfab..795e2ac8 100644 --- a/audiod_command.c +++ b/audiod_command.c @@ -360,10 +360,9 @@ EXPORT_AUDIOD_CMD_HANDLER(version) * Handle arriving connections on the local socket. * * \param accept_fd The fd to accept connections on. - * \param rfds If \a accept_fd is not set in \a rfds, do nothing. * - * This is called in each iteration of the select loop. If there is an incoming - * connection on \a accept_fd, this function reads the command sent by the peer, + * This is called in each iteration of the main loop of the scheduler. If there + * is an incoming connection, the function reads the command sent by the peer, * checks the connecting user's permissions by using unix socket credentials * (if supported by the OS) and calls the corresponding command handler if * permissions are OK. @@ -372,8 +371,8 @@ EXPORT_AUDIOD_CMD_HANDLER(version) * connection to accept. * * \sa \ref para_accept(), \ref recv_cred_buffer(). - * */ -int handle_connect(int accept_fd, fd_set *rfds) + */ +int dispatch_local_connection(int accept_fd) { int argc, ret, clifd; char buf[MAXLINE], **argv = NULL; @@ -384,7 +383,7 @@ int handle_connect(int accept_fd, fd_set *rfds) char *errctx = NULL; const struct audiod_command_info *aci; - ret = para_accept(accept_fd, rfds, &unix_addr, sizeof(struct sockaddr_un), &clifd); + ret = para_accept(accept_fd, &unix_addr, sizeof(struct sockaddr_un), &clifd); if (ret <= 0) return ret; ret = recv_cred_buffer(clifd, buf, sizeof(buf) - 1); diff --git a/buffer_tree.c b/buffer_tree.c index f0d2002d..49e40fb9 100644 --- a/buffer_tree.c +++ b/buffer_tree.c @@ -570,7 +570,7 @@ bool btr_no_parent(struct btr_node *btrn) * buffer. * * Since the buffer tree may change at any time, this function should be called - * during each post_select call. + * during each post_monitor call. * * \return True if \a btrn has no siblings. */ @@ -1181,7 +1181,7 @@ struct btr_node *btr_search_node(const char *name, struct btr_node *root) * \param type The supposed type of \a btrn. * * Most users of the buffer tree subsystem call this function from both - * their pre_select and the post_select methods. + * their ->pre_monitor() and ->post_monitor() methods. * * \return Negative if an error condition was detected, zero if there * is nothing to do and positive otherwise. diff --git a/check_wav.c b/check_wav.c index 89ebdacc..100975dc 100644 --- a/check_wav.c +++ b/check_wav.c @@ -39,15 +39,15 @@ struct check_wav_context { }; /** - * Set select timeout according to the given context. + * Request a minimal timeout if not idle. * - * \param s Contains the timeval that should be set. - * \param cwc Contains a pointer to the buffer tree node. + * \param s The scheduler instance. + * \param cwc The buffer tree node is derived from this. * - * This requests a minimal timeout from the scheduler if btrn of \a cwc is not - * idle. + * If no data is available and the buffer tree node is not in error state, the + * function does nothing. */ -void check_wav_pre_select(struct sched *s, struct check_wav_context *cwc) +void check_wav_pre_monitor(struct sched *s, struct check_wav_context *cwc) { int ret = btr_node_status(cwc->btrn, cwc->min_iqs, BTR_NT_INTERNAL); if (ret != 0) @@ -121,7 +121,7 @@ out: * * \return Standard. */ -int check_wav_post_select(struct check_wav_context *cwc) +int check_wav_post_monitor(struct check_wav_context *cwc) { struct btr_node *btrn = cwc->btrn; unsigned char *a; @@ -198,8 +198,8 @@ out: * children of this node can figure out channel count, sample rate, etc. * * \return The (opaque) handle of the newly created check_wav instance. It is - * supposed to be passed to \ref check_wav_pre_select() and \ref - * check_wav_post_select(). + * supposed to be passed to \ref check_wav_pre_monitor() and \ref + * check_wav_post_monitor(). * * \sa \ref btr_new_node. */ @@ -225,7 +225,7 @@ struct check_wav_context *check_wav_init(struct btr_node *parent, * * \param cwc Determines the instance to shut down. * - * This function may only be called after check_wav_post_select() has returned + * This function may only be called after check_wav_post_monitor() has returned * negative. */ void check_wav_shutdown(struct check_wav_context *cwc) diff --git a/check_wav.h b/check_wav.h index 79b11962..e6188c52 100644 --- a/check_wav.h +++ b/check_wav.h @@ -42,6 +42,6 @@ struct wav_params { struct check_wav_context *check_wav_init(struct btr_node *parent, struct btr_node *child, struct wav_params *params, struct btr_node **cw_btrn); -void check_wav_pre_select(struct sched *s, struct check_wav_context *cwc); -int check_wav_post_select(struct check_wav_context *cwc); +void check_wav_pre_monitor(struct sched *s, struct check_wav_context *cwc); +int check_wav_post_monitor(struct check_wav_context *cwc); void check_wav_shutdown(struct check_wav_context *cwc); diff --git a/client.c b/client.c index 8caf4483..ed0d5e02 100644 --- a/client.c +++ b/client.c @@ -42,7 +42,7 @@ struct exec_task { size_t result_size; }; -static void exec_pre_select(struct sched *s, void *context) +static void exec_pre_monitor(struct sched *s, void *context) { struct exec_task *et = context; int ret = btr_node_status(et->btrn, 0, BTR_NT_LEAF); @@ -51,7 +51,7 @@ static void exec_pre_select(struct sched *s, void *context) sched_min_delay(s); } -static int exec_post_select(__a_unused struct sched *s, void *context) +static int exec_post_monitor(__a_unused struct sched *s, void *context) { struct exec_task *et = context; struct btr_node *btrn = et->btrn; @@ -123,7 +123,7 @@ fail: static int execute_client_command(const char *cmd, char **result) { int ret; - struct sched command_sched = {.default_timeout = {.tv_sec = 1}}; + struct sched command_sched = {.default_timeout = 1000}; struct exec_task exec_task = { .result_buf = para_strdup(""), .result_size = 1, @@ -138,8 +138,8 @@ static int execute_client_command(const char *cmd, char **result) EMBRACE(.name = "exec_collect")); exec_task.task = task_register(&(struct task_info) { .name = "client exec", - .pre_select = exec_pre_select, - .post_select = exec_post_select, + .pre_monitor = exec_pre_monitor, + .post_monitor = exec_post_monitor, .context = &exec_task, }, &command_sched); ret = client_connect(ct, &command_sched, NULL, exec_task.btrn); @@ -532,7 +532,7 @@ __noreturn static void interactive_session(void) sigemptyset(&act.sa_mask); act.sa_flags = 0; sigaction(SIGINT, &act, NULL); - sched.select_function = i9e_select; + sched.poll_function = i9e_poll; ret = i9e_open(&ici, &sched); if (ret < 0) @@ -578,7 +578,7 @@ struct supervisor_task { struct task *task; }; -static int supervisor_post_select(struct sched *s, void *context) +static int supervisor_post_monitor(struct sched *s, void *context) { struct supervisor_task *svt = context; int ret = task_status(ct->task); @@ -624,7 +624,7 @@ int main(int argc, char *argv[]) int ret; crypt_init(); - sched.default_timeout.tv_sec = 1; + sched.default_timeout = 1000; ret = client_parse_config(argc, argv, &ct, &client_loglevel); if (ret < 0) @@ -648,7 +648,7 @@ int main(int argc, char *argv[]) EMBRACE(.name = "stdout", .parent = ct->btrn[0])); supervisor_task.task = task_register(&(struct task_info) { .name = "supervisor", - .post_select = supervisor_post_select, + .post_monitor = supervisor_post_monitor, .context = &supervisor_task, }, &sched); diff --git a/client_common.c b/client_common.c index f8ee80c9..3beeed1f 100644 --- a/client_common.c +++ b/client_common.c @@ -57,7 +57,7 @@ void client_close(struct client_task *ct) * The context pointer is assumed to refer to a client task structure that was * initialized earlier by client_open(). */ -static void client_pre_select(struct sched *s, void *context) +static void client_pre_monitor(struct sched *s, void *context) { int ret; struct client_task *ct = context; @@ -68,13 +68,13 @@ static void client_pre_select(struct sched *s, void *context) case CL_CONNECTED: case CL_SENT_AUTH: case CL_SENT_CH_RESPONSE: - para_fd_set(ct->scc.fd, &s->rfds, &s->max_fileno); + sched_monitor_readfd(ct->scc.fd, s); return; case CL_RECEIVED_WELCOME: case CL_RECEIVED_PROCEED: case CL_RECEIVED_CHALLENGE: - para_fd_set(ct->scc.fd, &s->wfds, &s->max_fileno); + sched_monitor_writefd(ct->scc.fd, s); return; case CL_SENDING: @@ -83,7 +83,7 @@ static void client_pre_select(struct sched *s, void *context) if (ret < 0) sched_min_delay(s); else if (ret > 0) - para_fd_set(ct->scc.fd, &s->wfds, &s->max_fileno); + sched_monitor_writefd(ct->scc.fd, s); } __attribute__ ((fallthrough)); case CL_EXECUTING: @@ -92,7 +92,7 @@ static void client_pre_select(struct sched *s, void *context) if (ret < 0) sched_min_delay(s); else if (ret > 0) - para_fd_set(ct->scc.fd, &s->rfds, &s->max_fileno); + sched_monitor_readfd(ct->scc.fd, s); } return; } @@ -125,8 +125,7 @@ static int send_sb(struct client_task *ct, int channel, void *buf, size_t numbyt return 0; } -static int recv_sb(struct client_task *ct, fd_set *rfds, - struct sb_buffer *result) +static int recv_sb(struct client_task *ct, struct sb_buffer *result) { int ret; size_t n; @@ -134,8 +133,6 @@ static int recv_sb(struct client_task *ct, fd_set *rfds, void *trafo_context; struct iovec iov; - if (!FD_ISSET(ct->scc.fd, rfds)) - return 0; if (ct->status < CL_SENT_CH_RESPONSE) trafo = trafo_context = NULL; else { @@ -146,7 +143,7 @@ static int recv_sb(struct client_task *ct, fd_set *rfds, ct->sbc[0] = sb_new_recv(0, trafo, trafo_context); again: sb_get_recv_buffer(ct->sbc[0], &iov); - ret = read_nonblock(ct->scc.fd, iov.iov_base, iov.iov_len, rfds, &n); + ret = read_nonblock(ct->scc.fd, iov.iov_base, iov.iov_len, &n); if (ret < 0) { sb_free(ct->sbc[0]); ct->sbc[0] = NULL; @@ -274,7 +271,7 @@ static bool has_feature(const char *feature, struct client_task *ct) * The context pointer refers to a client task structure that was initialized * earlier by client_open(). */ -static int client_post_select(struct sched *s, void *context) +static int client_post_monitor(struct sched *s, void *context) { struct client_task *ct = context; int ret = 0; @@ -288,7 +285,7 @@ static int client_post_select(struct sched *s, void *context) return 0; switch (ct->status) { case CL_CONNECTED: /* receive welcome message */ - ret = read_nonblock(ct->scc.fd, buf, sizeof(buf), &s->rfds, &n); + ret = read_nonblock(ct->scc.fd, buf, sizeof(buf), &n); if (ret < 0 || n == 0) goto out; ct->features = parse_features(buf); @@ -302,7 +299,7 @@ static int client_post_select(struct sched *s, void *context) * 0.8.0 we no longer need to request the feature. */ bool has_sha256; - if (!FD_ISSET(ct->scc.fd, &s->wfds)) + if (!sched_write_ok(ct->scc.fd, s)) return 0; has_sha256 = has_feature("sha256", ct); sprintf(buf, AUTH_REQUEST_MSG "%s%s", ct->user, has_sha256? @@ -324,7 +321,7 @@ static int client_post_select(struct sched *s, void *context) unsigned char crypt_buf[1024]; struct sb_buffer sbb; - ret = recv_sb(ct, &s->rfds, &sbb); + ret = recv_sb(ct, &sbb); if (ret <= 0) goto out; if (sbb.band != SBD_CHALLENGE) { @@ -371,7 +368,7 @@ static int client_post_select(struct sched *s, void *context) case CL_SENT_CH_RESPONSE: /* read server response */ { struct sb_buffer sbb; - ret = recv_sb(ct, &s->rfds, &sbb); + ret = recv_sb(ct, &sbb); if (ret <= 0) goto out; free(sbb.iov.iov_base); @@ -383,7 +380,7 @@ static int client_post_select(struct sched *s, void *context) } case CL_RECEIVED_PROCEED: /* concat args and send command */ { - if (!FD_ISSET(ct->scc.fd, &s->wfds)) + if (!sched_write_ok(ct->scc.fd, s)) return 0; ret = send_sb_command(ct); if (ret <= 0) @@ -405,7 +402,7 @@ static int client_post_select(struct sched *s, void *context) } if (ret < 0) goto close1; - if (ret > 0 && FD_ISSET(ct->scc.fd, &s->wfds)) { + if (ret > 0 && sched_write_ok(ct->scc.fd, s)) { sz = btr_next_buffer(ct->btrn[1], &buf2); assert(sz); ret = send_sb(ct, 1, buf2, sz, SBD_BLOB_DATA, true); @@ -421,9 +418,9 @@ static int client_post_select(struct sched *s, void *context) ret = btr_node_status(ct->btrn[0], 0, BTR_NT_ROOT); if (ret < 0) goto close0; - if (ret > 0 && FD_ISSET(ct->scc.fd, &s->rfds)) { + if (ret > 0 && sched_read_ok(ct->scc.fd, s)) { struct sb_buffer sbb; - ret = recv_sb(ct, &s->rfds, &sbb); + ret = recv_sb(ct, &sbb); if (ret < 0) goto close0; if (ret > 0) { @@ -503,8 +500,8 @@ int client_connect(struct client_task *ct, struct sched *s, ct->task = task_register(&(struct task_info) { .name = "client", - .pre_select = client_pre_select, - .post_select = client_post_select, + .pre_monitor = client_pre_monitor, + .post_monitor = client_post_monitor, .context = ct, }, s); return 1; diff --git a/command.c b/command.c index e3f12931..200ff054 100644 --- a/command.c +++ b/command.c @@ -22,8 +22,8 @@ #include "net.h" #include "server.h" #include "list.h" -#include "send.h" #include "sched.h" +#include "send.h" #include "vss.h" #include "daemon.h" #include "fd.h" diff --git a/compress_filter.c b/compress_filter.c index ff4ce6fb..9f9d8515 100644 --- a/compress_filter.c +++ b/compress_filter.c @@ -37,7 +37,7 @@ static void compress_close(struct filter_node *fn) free(fn->private_data); } -static int compress_post_select(__a_unused struct sched *s, void *context) +static int compress_post_monitor(__a_unused struct sched *s, void *context) { struct filter_node *fn = context; struct private_compress_data *pcd = fn->private_data; @@ -162,6 +162,6 @@ const struct filter lsg_filter_cmd_com_compress_user_data = { .setup = compress_setup, .open = compress_open, .close = compress_close, - .pre_select = generic_filter_pre_select, - .post_select = compress_post_select, + .pre_monitor = generic_filter_pre_monitor, + .post_monitor = compress_post_monitor, }; diff --git a/configure.ac b/configure.ac index 9056002a..c7258111 100644 --- a/configure.ac +++ b/configure.ac @@ -584,7 +584,7 @@ fi if test $HAVE_OSS = yes -o $HAVE_ALSA = yes; then build_mixer="yes" executables="$executables mixer" - mixer_errlist_objs="mixer exec string fd lsu version" + mixer_errlist_objs="mixer exec string fd time lsu version" if test $HAVE_OSS = yes; then mixer_errlist_objs="$mixer_errlist_objs oss_mix" fi @@ -840,6 +840,7 @@ audioc_errlist_objs=" lsu net fd + time version " if test $HAVE_READLINE = yes; then @@ -847,7 +848,6 @@ if test $HAVE_READLINE = yes; then buffer_tree interactive sched - time " fi audioc_objs="$audioc_errlist_objs" diff --git a/dccp_recv.c b/dccp_recv.c index 639c93fc..faacd39f 100644 --- a/dccp_recv.c +++ b/dccp_recv.c @@ -109,16 +109,16 @@ err: return ret; } -static void dccp_recv_pre_select(struct sched *s, void *context) +static void dccp_recv_pre_monitor(struct sched *s, void *context) { struct receiver_node *rn = context; - if (generic_recv_pre_select(s, rn) <= 0) + if (generic_recv_pre_monitor(s, rn) <= 0) return; - para_fd_set(rn->fd, &s->rfds, &s->max_fileno); + sched_monitor_readfd(rn->fd, s); } -static int dccp_recv_post_select(struct sched *s, void *context) +static int dccp_recv_post_monitor(__a_unused struct sched *s, void *context) { struct receiver_node *rn = context; struct btr_node *btrn = rn->btrn; @@ -136,7 +136,7 @@ static int dccp_recv_post_select(struct sched *s, void *context) ret = -E_DCCP_OVERRUN; if (iovcnt == 0) goto out; - ret = readv_nonblock(rn->fd, iov, iovcnt, &s->rfds, &num_bytes); + ret = readv_nonblock(rn->fd, iov, iovcnt, &num_bytes); if (num_bytes == 0) goto out; if (num_bytes <= iov[0].iov_len) /* only the first buffer was filled */ @@ -154,6 +154,6 @@ out: const struct receiver lsg_recv_cmd_com_dccp_user_data = { .open = dccp_recv_open, .close = dccp_recv_close, - .pre_select = dccp_recv_pre_select, - .post_select = dccp_recv_post_select, + .pre_monitor = dccp_recv_pre_monitor, + .post_monitor = dccp_recv_post_monitor, }; diff --git a/dccp_send.c b/dccp_send.c index bca7ad67..9e937271 100644 --- a/dccp_send.c +++ b/dccp_send.c @@ -24,8 +24,8 @@ #include "net.h" #include "server.h" #include "list.h" -#include "send.h" #include "sched.h" +#include "send.h" #include "vss.h" #include "fd.h" @@ -36,14 +36,13 @@ struct dccp_fec_client { struct fec_client *fc; }; -static void dccp_pre_select(int *max_fileno, fd_set *rfds, - __a_unused fd_set *wfds) +static void dccp_pre_monitor(struct sched *s) { unsigned n; FOR_EACH_LISTEN_FD(n, dss) if (dss->listen_fds[n] >= 0) - para_fd_set(dss->listen_fds[n], rfds, max_fileno); + sched_monitor_readfd(dss->listen_fds[n], s); } /** @@ -119,14 +118,14 @@ static void dccp_send_fec(struct sender_client *sc, char *buf, size_t len) dccp_shutdown_client(sc); } -static void dccp_post_select(fd_set *rfds, __a_unused fd_set *wfds) +static void dccp_post_monitor(__a_unused struct sched *s) { struct sender_client *sc; struct dccp_fec_client *dfc; int tx_ccid; uint32_t k, n; - sc = accept_sender_client(dss, rfds); + sc = accept_sender_client(dss); if (!sc) return; @@ -249,8 +248,8 @@ const struct sender dccp_sender = { .name = "dccp", .init = dccp_send_init, .shutdown = dccp_shutdown, - .pre_select = dccp_pre_select, - .post_select = dccp_post_select, + .pre_monitor = dccp_pre_monitor, + .post_monitor = dccp_post_monitor, .shutdown_clients = dccp_shutdown_clients, .client_cmds = { [SENDER_on] = dccp_com_on, diff --git a/fd.c b/fd.c index 33891d2e..800106e1 100644 --- a/fd.c +++ b/fd.c @@ -176,14 +176,11 @@ __printf_2_3 int write_va_buffer(int fd, const char *fmt, ...) * \param fd The file descriptor to read from. * \param iov Scatter/gather array used in readv(). * \param iovcnt Number of elements in \a iov. - * \param rfds An optional fd set pointer. * \param num_bytes Result pointer. Contains the number of bytes read from \a fd. * - * If rfds is not NULL and the (non-blocking) file descriptor fd is not set in - * rfds, this function returns early without doing anything. Otherwise it tries - * to read up to sz bytes from fd, where sz is the sum of the lengths of all - * vectors in iov. Like \ref xwrite(), EAGAIN and EINTR are not considered - * error conditions. However, EOF is. + * This function tries to read up to sz bytes from fd, where sz is the sum of + * the lengths of all vectors in iov. Like \ref xwrite(), EAGAIN and EINTR are + * not considered error conditions. However, EOF is. * * \return Zero or a negative error code. If the underlying call to readv(2) * returned zero (indicating an end of file condition) or failed for some @@ -197,24 +194,12 @@ __printf_2_3 int write_va_buffer(int fd, const char *fmt, ...) * * \sa \ref xwrite(), read(2), readv(2). */ -int readv_nonblock(int fd, struct iovec *iov, int iovcnt, fd_set *rfds, - size_t *num_bytes) +int readv_nonblock(int fd, struct iovec *iov, int iovcnt, size_t *num_bytes) { int ret, i, j; *num_bytes = 0; - /* - * Avoid a shortcoming of select(): Reads from a non-blocking fd might - * return EAGAIN even if FD_ISSET() returns true. However, FD_ISSET() - * returning false definitely means that no data can currently be read. - * This is the common case, so it is worth to avoid the overhead of the - * read() system call in this case. - */ - if (rfds && !FD_ISSET(fd, rfds)) - return 0; - for (i = 0, j = 0; i < iovcnt;) { - /* fix up the first iov */ assert(j < iov[i].iov_len); iov[i].iov_base += j; @@ -251,7 +236,6 @@ int readv_nonblock(int fd, struct iovec *iov, int iovcnt, fd_set *rfds, * \param fd The file descriptor to read from. * \param buf The buffer to read data to. * \param sz The size of \a buf. - * \param rfds \see \ref readv_nonblock(). * \param num_bytes \see \ref readv_nonblock(). * * This is a simple wrapper for readv_nonblock() which uses an iovec with a single @@ -259,10 +243,10 @@ int readv_nonblock(int fd, struct iovec *iov, int iovcnt, fd_set *rfds, * * \return The return value of the underlying call to readv_nonblock(). */ -int read_nonblock(int fd, void *buf, size_t sz, fd_set *rfds, size_t *num_bytes) +int read_nonblock(int fd, void *buf, size_t sz, size_t *num_bytes) { struct iovec iov = {.iov_base = buf, .iov_len = sz}; - return readv_nonblock(fd, &iov, 1, rfds, num_bytes); + return readv_nonblock(fd, &iov, 1, num_bytes); } /** @@ -271,7 +255,6 @@ int read_nonblock(int fd, void *buf, size_t sz, fd_set *rfds, size_t *num_bytes) * \param fd The file descriptor to receive from. * \param pattern The expected pattern. * \param bufsize The size of the internal buffer. - * \param rfds Passed to read_nonblock(). * * This function tries to read at most \a bufsize bytes from the non-blocking * file descriptor \a fd. If at least \p strlen(\a pattern) bytes have been @@ -283,11 +266,11 @@ int read_nonblock(int fd, void *buf, size_t sz, fd_set *rfds, size_t *num_bytes) * * \sa \ref read_nonblock(), \sa strncasecmp(3). */ -int read_pattern(int fd, const char *pattern, size_t bufsize, fd_set *rfds) +int read_pattern(int fd, const char *pattern, size_t bufsize) { size_t n, len; char *buf = para_malloc(bufsize + 1); - int ret = read_nonblock(fd, buf, bufsize, rfds, &n); + int ret = read_nonblock(fd, buf, bufsize, &n); buf[n] = '\0'; if (ret < 0) @@ -325,36 +308,6 @@ bool file_exists(const char *fn) return !stat(fn, &statbuf); } -/** - * Paraslash's wrapper for select(2). - * - * It calls select(2) (with no exceptfds) and starts over if select() was - * interrupted by a signal. - * - * \param n The highest-numbered descriptor in any of the two sets, plus 1. - * \param readfds fds that should be checked for readability. - * \param writefds fds that should be checked for writablility. - * \param timeout_tv upper bound on the amount of time elapsed before select() - * returns. - * - * \return The return value of the underlying select() call on success, the - * negative system error code on errors. - * - * All arguments are passed verbatim to select(2). - * \sa select(2) select_tut(2). - */ -int para_select(int n, fd_set *readfds, fd_set *writefds, - struct timeval *timeout_tv) -{ - int ret; - do - ret = select(n, readfds, writefds, NULL, timeout_tv); - while (ret < 0 && errno == EINTR); - if (ret < 0) - return -ERRNO_TO_PARA_ERROR(errno); - return ret; -} - /** * Set a file descriptor to blocking mode. * @@ -391,34 +344,6 @@ __must_check int mark_fd_nonblocking(int fd) return 1; } -/** - * Set a file descriptor in a fd_set. - * - * \param fd The file descriptor to be set. - * \param fds The file descriptor set. - * \param max_fileno Highest-numbered file descriptor. - * - * This wrapper for FD_SET() passes its first two arguments to \p FD_SET. Upon - * return, \a max_fileno contains the maximum of the old_value and \a fd. - * - * \sa \ref para_select. -*/ -void para_fd_set(int fd, fd_set *fds, int *max_fileno) -{ - assert(fd >= 0 && fd < FD_SETSIZE); -#if 0 - { - int flags = fcntl(fd, F_GETFL); - if (!(flags & O_NONBLOCK)) { - PARA_EMERG_LOG("fd %d is a blocking file descriptor\n", fd); - exit(EXIT_FAILURE); - } - } -#endif - FD_SET(fd, fds); - *max_fileno = PARA_MAX(*max_fileno, fd); -} - /** * Paraslash's wrapper for mmap. * @@ -642,6 +567,47 @@ int para_munmap(void *start, size_t length) return -ERRNO_TO_PARA_ERROR(err); } +/** + * Simple wrapper for poll(2). + * + * It calls poll(2) and starts over if the call was interrupted by a signal. + * + * \param fds See poll(2). + * \param nfds See poll(2). + * \param timeout See poll(2). + * + * \return The return value of the underlying poll() call on success, the + * negative paraslash error code on errors. + * + * All arguments are passed verbatim to poll(2). + */ +int xpoll(struct pollfd *fds, nfds_t nfds, int timeout) +{ + int ret; + + do + ret = poll(fds, nfds, timeout); + while (ret < 0 && errno == EINTR); + return ret < 0? -ERRNO_TO_PARA_ERROR(errno) : ret; +} + +/** + * Check a file descriptor for readability. + * + * \param fd The file descriptor. + * + * \return positive if fd is ready for reading, zero if it isn't, negative if + * an error occurred. + * + * \sa \ref write_ok(). + */ +int read_ok(int fd) +{ + struct pollfd pfd = {.fd = fd, .events = POLLIN}; + int ret = xpoll(&pfd, 1, 0); + return ret < 0? ret : pfd.revents & POLLIN; +} + /** * Check a file descriptor for writability. * @@ -649,18 +615,14 @@ int para_munmap(void *start, size_t length) * * \return positive if fd is ready for writing, zero if it isn't, negative if * an error occurred. + * + * \sa \ref read_ok(). */ - int write_ok(int fd) { - struct timeval tv; - fd_set wfds; - - FD_ZERO(&wfds); - FD_SET(fd, &wfds); - tv.tv_sec = 0; - tv.tv_usec = 0; - return para_select(fd + 1, NULL, &wfds, &tv); + struct pollfd pfd = {.fd = fd, .events = POLLOUT}; + int ret = xpoll(&pfd, 1, 0); + return ret < 0? ret : pfd.revents & POLLOUT; } /** diff --git a/fd.h b/fd.h index c9e79426..270d0ce2 100644 --- a/fd.h +++ b/fd.h @@ -6,11 +6,9 @@ int xrename(const char *oldpath, const char *newpath); int write_all(int fd, const char *buf, size_t len); __printf_2_3 int write_va_buffer(int fd, const char *fmt, ...); bool file_exists(const char *); -int para_select(int n, fd_set *readfds, fd_set *writefds, - struct timeval *timeout_tv); +int xpoll(struct pollfd *fds, nfds_t nfds, int timeout); __must_check int mark_fd_nonblocking(int fd); __must_check int mark_fd_blocking(int fd); -void para_fd_set(int fd, fd_set *fds, int *max_fileno); int para_mmap(size_t length, int prot, int flags, int fd, void *map); int para_open(const char *path, int flags, mode_t mode); int para_mkdir(const char *path, mode_t mode); @@ -18,12 +16,12 @@ int para_chdir(const char *path); int mmap_full_file(const char *filename, int open_mode, void **map, size_t *size, int *fd_ptr); int para_munmap(void *start, size_t length); +int read_ok(int fd); int write_ok(int fd); void valid_fd_012(void); -int readv_nonblock(int fd, struct iovec *iov, int iovcnt, fd_set *rfds, - size_t *num_bytes); -int read_nonblock(int fd, void *buf, size_t sz, fd_set *rfds, size_t *num_bytes); -int read_pattern(int fd, const char *pattern, size_t bufsize, fd_set *rfds); +int readv_nonblock(int fd, struct iovec *iov, int iovcnt, size_t *num_bytes); +int read_nonblock(int fd, void *buf, size_t sz, size_t *num_bytes); +int read_pattern(int fd, const char *pattern, size_t bufsize); int xwrite(int fd, const char *buf, size_t len); int xwritev(int fd, struct iovec *iov, int iovcnt); int for_each_file_in_dir(const char *dirname, diff --git a/fecdec_filter.c b/fecdec_filter.c index 13d4f7b2..d629603c 100644 --- a/fecdec_filter.c +++ b/fecdec_filter.c @@ -431,7 +431,7 @@ static void fecdec_close(struct filter_node *fn) fn->private_data = NULL; } -static int fecdec_post_select(__a_unused struct sched *s, void *context) +static int fecdec_post_monitor(__a_unused struct sched *s, void *context) { struct filter_node *fn = context; struct btr_node *btrn = fn->btrn; @@ -478,7 +478,7 @@ static void fecdec_open(struct filter_node *fn) const struct filter lsg_filter_cmd_com_fecdec_user_data = { .open = fecdec_open, - .pre_select = generic_filter_pre_select, - .post_select = fecdec_post_select, + .pre_monitor = generic_filter_pre_monitor, + .post_monitor = fecdec_post_monitor, .close = fecdec_close, }; diff --git a/file_write.c b/file_write.c index 9a5ed5d7..64153178 100644 --- a/file_write.c +++ b/file_write.c @@ -69,7 +69,7 @@ static int prepare_output_file(struct writer_node *wn) return 1; } -static void file_write_pre_select(struct sched *s, void *context) +static void file_write_pre_monitor(struct sched *s, void *context) { struct writer_node *wn = context; struct private_file_write_data *pfwd = wn->private_data; @@ -79,7 +79,7 @@ static void file_write_pre_select(struct sched *s, void *context) return; if (ret < 0 || !pfwd) return sched_min_delay(s); - para_fd_set(pfwd->fd, &s->wfds, &s->max_fileno); + sched_monitor_writefd(pfwd->fd, s); } static void file_write_close(struct writer_node *wn) @@ -92,7 +92,7 @@ static void file_write_close(struct writer_node *wn) free(pfwd); } -static int file_write_post_select(__a_unused struct sched *s, void *context) +static int file_write_post_monitor(__a_unused struct sched *s, void *context) { struct writer_node *wn = context; struct private_file_write_data *pfwd = wn->private_data; @@ -111,7 +111,7 @@ static int file_write_post_select(__a_unused struct sched *s, void *context) ret = prepare_output_file(wn); goto out; } - if (!FD_ISSET(pfwd->fd, &s->wfds)) + if (!sched_write_ok(pfwd->fd, s)) return 0; bytes = btr_next_buffer(btrn, &buf); assert(bytes > 0); @@ -128,7 +128,7 @@ out: /** the init function of the file writer */ struct writer lsg_write_cmd_com_file_user_data = { - .pre_select = file_write_pre_select, - .post_select = file_write_post_select, + .pre_monitor = file_write_pre_monitor, + .post_monitor = file_write_post_monitor, .close = file_write_close, }; diff --git a/filter.c b/filter.c index d4a24239..85d3da7e 100644 --- a/filter.c +++ b/filter.c @@ -137,8 +137,8 @@ int main(int argc, char *argv[]) EMBRACE(.name = name, .parent = parent, .handler = f->execute, .context = fn)); ti.name = name; - ti.pre_select = f->pre_select; - ti.post_select = f->post_select; + ti.pre_monitor = f->pre_monitor; + ti.post_monitor = f->post_monitor; ti.context = fn; if (f->open) f->open(fn); @@ -149,8 +149,7 @@ int main(int argc, char *argv[]) EMBRACE(.name = "stdout", .parent = parent)); stdout_task_register(sot, &s); - s.default_timeout.tv_sec = 1; - s.default_timeout.tv_usec = 0; + s.default_timeout = 1000; btr_log_tree(sit->btrn, LL_INFO); ret = schedule(&s); sched_shutdown(&s); diff --git a/filter.h b/filter.h index 69d4dfee..77057e6a 100644 --- a/filter.h +++ b/filter.h @@ -28,16 +28,18 @@ struct filter_node { }; /** - * The structure associated with a paraslash filter. + * Describes a method to convert audio data. * - * Paraslash filters are "modules" which transform an audio stream. struct - * filter contains methods which are implemented by each filter. + * Paraslash filters are "modules" which transform the data of an audio stream. + * This structure contains the methods which have to be implemented by each + * filter. * - * Note: As several instances of the same filter may be running at the same - * time, all these filter functions must be reentrant; no static non-constant - * variables may be used. + * As several instances of the same filter may be running at the same time, all + * filter methods must be reentrant and no static non-constant variables must + * be used. * - * \sa \ref filter_node. + * \sa \ref filter_node, struct \ref receiver, struct \ref writer, struct \ref + * sched. */ struct filter { /** @@ -81,24 +83,10 @@ struct filter { * This should free whatever ->setup() has allocated. */ void (*teardown)(const struct lls_parse_result *lpr, void *conf); - /** - * Set scheduler timeout and add file descriptors to fd sets. - * - * This function controls the timeout value for the next call to - * select(2). It may decrease the current timeout but shall never - * increase it. The second purpose of this function is to add file - * descriptors to the two fd sets of the sched structure. The - * descriptors in these sets will be watched by the subsequent - * select(2) call. - */ - void (*pre_select)(struct sched *s, void *context); - /** - * Convert (filter) the given data. - * - * Pointer to the converting function of the filter. On errors, the - * post_select function is supposed to return a negative error code. - */ - int (*post_select)(struct sched *s, void *context); + /** Force a zero timeout if data is available in the buffer tree. */ + void (*pre_monitor)(struct sched *s, void *context); + /** Convert (filter) input data into output data. */ + int (*post_monitor)(struct sched *s, void *context); /** * Answer a buffer tree query. * @@ -124,7 +112,7 @@ int filter_setup(const char *fa, void **conf, struct lls_parse_result **lprp); #define FILTER_CMD_OPT_STRING_VAL(_cmd, _opt, _lpr) \ (lls_string_val(0, FILTER_CMD_OPT_RESULT(_cmd, _opt, _lpr))) -void generic_filter_pre_select(struct sched *s, void *context); +void generic_filter_pre_monitor(struct sched *s, void *context); int decoder_execute(const char *cmd, unsigned sample_rate, unsigned channels, char **result); diff --git a/filter_common.c b/filter_common.c index add788a8..f48e4570 100644 --- a/filter_common.c +++ b/filter_common.c @@ -169,17 +169,16 @@ void print_filter_list(void) } /** - * Set select timeout of the scheduler. + * Request a minimal timeout if not idle. * - * \param s The scheduler. - * \param context Pointer to the filter node (task context). + * \param s The scheduler instance. + * \param context Pointer to the filter node. * - * This looks at the status of the btr node of the filter. If data is available - * in the input queue of the filter, or if an error occurred, a minimal timeout - * for the next select call is requested from the scheduler. Otherwise the - * scheduler timeout is left unchanged. + * If the buffer tree node of the given filter node has data available (or is + * in error state) a minimal I/O timeout is requested from the scheduler. + * Otherwise the function does nothing. */ -void generic_filter_pre_select(struct sched *s, void *context) +void generic_filter_pre_monitor(struct sched *s, void *context) { struct filter_node *fn = context; diff --git a/flacdec_filter.c b/flacdec_filter.c index 6a3a8eff..2c9f8607 100644 --- a/flacdec_filter.c +++ b/flacdec_filter.c @@ -205,7 +205,7 @@ static bool output_queue_full(struct btr_node *btrn) return btr_get_output_queue_size(btrn) > FLACDEC_MAX_OUTPUT_SIZE; } -static void flacdec_pre_select(struct sched *s, void *context) +static void flacdec_pre_monitor(struct sched *s, void *context) { struct filter_node *fn = context; struct private_flacdec_data *pfd = fn->private_data; @@ -221,7 +221,7 @@ static void flacdec_pre_select(struct sched *s, void *context) return sched_min_delay(s); } -static int flacdec_post_select(__a_unused struct sched *s, void *context) +static int flacdec_post_monitor(__a_unused struct sched *s, void *context) { struct filter_node *fn = context; struct private_flacdec_data *pfd = fn->private_data; @@ -294,7 +294,7 @@ static void flacdec_open(struct filter_node *fn) const struct filter lsg_filter_cmd_com_flacdec_user_data = { .open = flacdec_open, .close = flacdec_close, - .pre_select = flacdec_pre_select, - .post_select = flacdec_post_select, + .pre_monitor = flacdec_pre_monitor, + .post_monitor = flacdec_post_monitor, .execute = flacdec_execute, }; diff --git a/grab_client.c b/grab_client.c index 83706493..393e2ce3 100644 --- a/grab_client.c +++ b/grab_client.c @@ -89,7 +89,7 @@ err: return -E_GC_WRITE; } -static void gc_pre_select(struct sched *s, void *context) +static void gc_pre_monitor(struct sched *s, void *context) { struct grab_client *gc = context; int ret = btr_node_status(gc->btrn, 0, BTR_NT_LEAF); @@ -98,14 +98,14 @@ static void gc_pre_select(struct sched *s, void *context) return; if (ret < 0) sched_min_delay(s); - para_fd_set(gc->fd, &s->wfds, &s->max_fileno); + sched_monitor_writefd(gc->fd, s); } /* - * We need this forward declaration as post_select() needs + * We need this forward declaration as gc_post_monitor() needs * activate_grab_client and vice versa. */ -static int gc_post_select(struct sched *s, void *context); +static int gc_post_monitor(struct sched *s, void *context); /** * Move a grab client to the active list and start it. @@ -129,8 +129,8 @@ static void gc_activate(struct grab_client *gc, struct sched *s) gc->task = task_register(&(struct task_info) { .name = name, - .pre_select = gc_pre_select, - .post_select = gc_post_select, + .pre_monitor = gc_pre_monitor, + .post_monitor = gc_post_monitor, .context = gc, }, s); } @@ -171,7 +171,7 @@ static int gc_close(struct grab_client *gc, int err) /* * We must not free the gc structure here as it contains ->task * which is still used because this function is called from - * post_select(). + * post_monitor(). */ close(gc->fd); gc->fd = -1; @@ -182,7 +182,7 @@ static int gc_close(struct grab_client *gc, int err) return 0; } -static int gc_post_select(__a_unused struct sched *s, void *context) +static int gc_post_monitor(__a_unused struct sched *s, void *context) { struct grab_client *gc = context; struct btr_node *btrn = gc->btrn; diff --git a/gui.c b/gui.c index d779ff86..72908f23 100644 --- a/gui.c +++ b/gui.c @@ -609,19 +609,19 @@ static void clear_all_items(void) } } -static void status_pre_select(struct sched *s, void *context) +static void status_pre_monitor(struct sched *s, void *context) { struct status_task *st = context; if (st->fd >= 0) - para_fd_set(st->fd, &s->rfds, &s->max_fileno); + sched_monitor_readfd(st->fd, s); if (task_get_notification(st->task) < 0) return sched_min_delay(s); if (st->fd < 0) sched_request_barrier_or_min_delay(&st->next_exec, s); } -static int status_post_select(struct sched *s, void *context) +static int status_post_monitor(__a_unused struct sched *s, void *context) { struct status_task *st = context; size_t sz; @@ -667,7 +667,7 @@ static int status_post_select(struct sched *s, void *context) } assert(st->loaded < st->bufsize); ret = read_nonblock(st->fd, st->buf + st->loaded, - st->bufsize - st->loaded, &s->rfds, &sz); + st->bufsize - st->loaded, &sz); st->loaded += sz; ret2 = for_each_stat_item(st->buf, st->loaded, update_item); if (ret < 0 || ret2 < 0) { @@ -892,9 +892,9 @@ static void reread_conf(void) } /* React to various signal-related events. */ -static int signal_post_select(struct sched *s, __a_unused void *context) +static int signal_post_monitor(struct sched *s, __a_unused void *context) { - int ret = para_next_signal(&s->rfds); + int ret = para_next_signal(); if (ret <= 0) return 0; @@ -931,18 +931,18 @@ static enum exec_status exec_status(void) return EXEC_IDLE; } -static void exec_pre_select(struct sched *s, void *context) +static void exec_pre_monitor(struct sched *s, void *context) { struct exec_task *et = context; if (exec_fds[0] >= 0) - para_fd_set(exec_fds[0], &s->rfds, &s->max_fileno); + sched_monitor_readfd(exec_fds[0], s); if (exec_fds[1] >= 0) - para_fd_set(exec_fds[1], &s->rfds, &s->max_fileno); + sched_monitor_readfd(exec_fds[1], s); if (task_get_notification(et->task) < 0) sched_min_delay(s); } -static int exec_post_select(struct sched *s, void *context) +static int exec_post_monitor(__a_unused struct sched *s, void *context) { struct exec_task *ct = context; int i, ret; @@ -963,7 +963,7 @@ static int exec_post_select(struct sched *s, void *context) continue; ret = read_nonblock(exec_fds[i], ct->command_buf[i] + ct->cbo[i], - COMMAND_BUF_SIZE - 1 - ct->cbo[i], &s->rfds, &sz); + COMMAND_BUF_SIZE - 1 - ct->cbo[i], &sz); ct->cbo[i] += sz; sz = ct->cbo[i]; ct->cbo[i] = for_each_line(ct->flags[i], ct->command_buf[i], @@ -992,10 +992,10 @@ static int exec_post_select(struct sched *s, void *context) return 0; } -static void input_pre_select(struct sched *s, __a_unused void *context) +static void input_pre_monitor(struct sched *s, __a_unused void *context) { if (exec_status() != EXEC_XCMD) - para_fd_set(STDIN_FILENO, &s->rfds, &s->max_fileno); + sched_monitor_readfd(STDIN_FILENO, s); if (window_update_needed()) sched_min_delay(s); } @@ -1089,7 +1089,7 @@ static void handle_command(int c) keyname); } -static int input_post_select(__a_unused struct sched *s, +static int input_post_monitor(__a_unused struct sched *s, __a_unused void *context) { int ret; @@ -1115,7 +1115,7 @@ static int input_post_select(__a_unused struct sched *s, ret = wgetch(top.win); if (ret == ERR) return 0; - if (ret == KEY_RESIZE) /* already handled in signal_post_select() */ + if (ret == KEY_RESIZE) /* already handled in signal_post_monitor() */ return 0; if (exs == EXEC_IDLE) handle_command(ret); @@ -1391,26 +1391,26 @@ static int setup_tasks_and_schedule(void) struct status_task status_task = {.fd = -1}; struct input_task input_task = {.task = NULL}; struct signal_task *signal_task; - struct sched sched = {.default_timeout = {.tv_sec = 1}}; + struct sched sched = {.default_timeout = 1000}; exec_task.task = task_register(&(struct task_info) { .name = "exec", - .pre_select = exec_pre_select, - .post_select = exec_post_select, + .pre_monitor = exec_pre_monitor, + .post_monitor = exec_post_monitor, .context = &exec_task, }, &sched); status_task.task = task_register(&(struct task_info) { .name = "status", - .pre_select = status_pre_select, - .post_select = status_post_select, + .pre_monitor = status_pre_monitor, + .post_monitor = status_post_monitor, .context = &status_task, }, &sched); input_task.task = task_register(&(struct task_info) { .name = "input", - .pre_select = input_pre_select, - .post_select = input_post_select, + .pre_monitor = input_pre_monitor, + .post_monitor = input_post_monitor, .context = &input_task, }, &sched); @@ -1422,8 +1422,8 @@ static int setup_tasks_and_schedule(void) para_install_sighandler(SIGWINCH); signal_task->task = task_register(&(struct task_info) { .name = "signal", - .pre_select = signal_pre_select, - .post_select = signal_post_select, + .pre_monitor = signal_pre_monitor, + .post_monitor = signal_post_monitor, .context = signal_task, }, &sched); ret = schedule(&sched); diff --git a/http_recv.c b/http_recv.c index 1fb60bad..59e9696b 100644 --- a/http_recv.c +++ b/http_recv.c @@ -56,17 +56,17 @@ static char *make_request_msg(void) return ret; } -static void http_recv_pre_select(struct sched *s, void *context) +static void http_recv_pre_monitor(struct sched *s, void *context) { struct receiver_node *rn = context; struct private_http_recv_data *phd = rn->private_data; - if (generic_recv_pre_select(s, rn) <= 0) + if (generic_recv_pre_monitor(s, rn) <= 0) return; if (phd->status == HTTP_CONNECTED) - para_fd_set(rn->fd, &s->wfds, &s->max_fileno); + sched_monitor_writefd(rn->fd, s); else - para_fd_set(rn->fd, &s->rfds, &s->max_fileno); + sched_monitor_readfd(rn->fd, s); } /* @@ -74,7 +74,7 @@ static void http_recv_pre_select(struct sched *s, void *context) * area with data read from the socket. In any case, update the state of the * connection if necessary. */ -static int http_recv_post_select(struct sched *s, void *context) +static int http_recv_post_monitor(struct sched *s, void *context) { struct receiver_node *rn = context; struct private_http_recv_data *phd = rn->private_data; @@ -93,7 +93,7 @@ static int http_recv_post_select(struct sched *s, void *context) return 0; if (phd->status == HTTP_CONNECTED) { char *rq; - if (!FD_ISSET(rn->fd, &s->wfds)) + if (!sched_write_ok(rn->fd, s)) return 0; rq = make_request_msg(); PARA_INFO_LOG("sending http request\n"); @@ -105,7 +105,7 @@ static int http_recv_post_select(struct sched *s, void *context) return 0; } if (phd->status == HTTP_SENT_GET_REQUEST) { - ret = read_pattern(rn->fd, HTTP_OK_MSG, strlen(HTTP_OK_MSG), &s->rfds); + ret = read_pattern(rn->fd, HTTP_OK_MSG, strlen(HTTP_OK_MSG)); if (ret < 0) { PARA_ERROR_LOG("did not receive HTTP OK message\n"); goto out; @@ -120,7 +120,7 @@ static int http_recv_post_select(struct sched *s, void *context) iovcnt = btr_pool_get_buffers(rn->btrp, iov); if (iovcnt == 0) goto out; - ret = readv_nonblock(rn->fd, iov, iovcnt, &s->rfds, &num_bytes); + ret = readv_nonblock(rn->fd, iov, iovcnt, &num_bytes); if (num_bytes == 0) goto out; if (num_bytes <= iov[0].iov_len) /* only the first buffer was filled */ @@ -170,6 +170,6 @@ static int http_recv_open(struct receiver_node *rn) const struct receiver lsg_recv_cmd_com_http_user_data = { .open = http_recv_open, .close = http_recv_close, - .pre_select = http_recv_pre_select, - .post_select = http_recv_post_select, + .pre_monitor = http_recv_pre_monitor, + .post_monitor = http_recv_post_monitor, }; diff --git a/http_send.c b/http_send.c index c6b9decc..0a90e884 100644 --- a/http_send.c +++ b/http_send.c @@ -20,8 +20,8 @@ #include "server.h" #include "http.h" #include "list.h" -#include "send.h" #include "sched.h" +#include "send.h" #include "vss.h" #include "close_on_fork.h" #include "fd.h" @@ -158,7 +158,7 @@ static void http_send(long unsigned current_chunk, } } -static void http_post_select(fd_set *rfds, __a_unused fd_set *wfds) +static void http_post_monitor(__a_unused struct sched *s) { struct sender_client *sc, *tmp; struct private_http_sender_data *phsd; @@ -170,7 +170,7 @@ static void http_post_select(fd_set *rfds, __a_unused fd_set *wfds) case HTTP_STREAMING: /* nothing to do */ break; case HTTP_CONNECTED: /* need to recv get request */ - ret = read_pattern(sc->fd, HTTP_GET_MSG, MAXLINE, rfds); + ret = read_pattern(sc->fd, HTTP_GET_MSG, MAXLINE); if (ret < 0) phsd->status = HTTP_INVALID_GET_REQUEST; else if (ret > 0) { @@ -188,7 +188,7 @@ static void http_post_select(fd_set *rfds, __a_unused fd_set *wfds) break; } } - sc = accept_sender_client(hss, rfds); + sc = accept_sender_client(hss); if (!sc) return; phsd = para_malloc(sizeof(*phsd)); @@ -196,7 +196,7 @@ static void http_post_select(fd_set *rfds, __a_unused fd_set *wfds) phsd->status = HTTP_CONNECTED; } -static void http_pre_select(int *max_fileno, fd_set *rfds, fd_set *wfds) +static void http_pre_monitor(struct sched *s) { struct sender_client *sc, *tmp; unsigned n; @@ -204,15 +204,15 @@ static void http_pre_select(int *max_fileno, fd_set *rfds, fd_set *wfds) FOR_EACH_LISTEN_FD(n, hss) { if (hss->listen_fds[n] < 0) continue; - para_fd_set(hss->listen_fds[n], rfds, max_fileno); + sched_monitor_readfd(hss->listen_fds[n], s); } list_for_each_entry_safe(sc, tmp, &hss->client_list, node) { struct private_http_sender_data *phsd = sc->private_data; if (phsd->status == HTTP_CONNECTED) /* need to recv get request */ - para_fd_set(sc->fd, rfds, max_fileno); + sched_monitor_readfd(sc->fd, s); if (phsd->status == HTTP_GOT_GET_REQUEST || phsd->status == HTTP_INVALID_GET_REQUEST) - para_fd_set(sc->fd, wfds, max_fileno); + sched_monitor_writefd(sc->fd, s); } } @@ -274,8 +274,8 @@ const struct sender http_sender = { .name = "http", .init = http_send_init, .shutdown = http_shutdown, - .pre_select = http_pre_select, - .post_select = http_post_select, + .pre_monitor = http_pre_monitor, + .post_monitor = http_post_monitor, .send = http_send, .shutdown_clients = http_shutdown_clients, .client_cmds = { diff --git a/interactive.c b/interactive.c index 041376a1..812a7d5b 100644 --- a/interactive.c +++ b/interactive.c @@ -258,18 +258,6 @@ static void clear_bottom_line(void) rl_point = point; } -static bool input_available(void) -{ - fd_set rfds; - struct timeval tv = {0, 0}; - int ret; - - FD_ZERO(&rfds); - FD_SET(i9ep->ici->fds[0], &rfds); - ret = para_select(1, &rfds, NULL, &tv); - return ret > 0; -} - static void i9e_line_handler(char *line) { int ret; @@ -294,7 +282,7 @@ free_line: free(line); } -static int i9e_post_select(__a_unused struct sched *s, __a_unused void *context) +static int i9e_post_monitor(__a_unused struct sched *s, __a_unused void *context) { int ret; struct i9e_client_info *ici = i9ep->ici; @@ -310,7 +298,7 @@ static int i9e_post_select(__a_unused struct sched *s, __a_unused void *context) ret = 0; if (i9ep->caught_sigint) goto rm_btrn; - while (input_available()) { + while (read_ok(i9ep->ici->fds[0]) > 0) { if (i9ep->stdout_btrn) { while (i9ep->key_sequence_length < sizeof(i9ep->key_sequence) - 1) { buf = i9ep->key_sequence + i9ep->key_sequence_length; @@ -327,7 +315,7 @@ static int i9e_post_select(__a_unused struct sched *s, __a_unused void *context) i9ep->key_sequence_length++; rl_stuff_char((int)(unsigned char)*buf); rl_callback_read_char(); - if (!input_available()) + if (read_ok(i9ep->ici->fds[0]) <= 0) break; } i9ep->key_sequence_length = 0; @@ -374,7 +362,7 @@ out: return ret; } -static void i9e_pre_select(struct sched *s, __a_unused void *context) +static void i9e_pre_monitor(struct sched *s, __a_unused void *context) { int ret; @@ -389,7 +377,7 @@ static void i9e_pre_select(struct sched *s, __a_unused void *context) return; } if (ret > 0) - para_fd_set(i9ep->ici->fds[1], &s->wfds, &s->max_fileno); + sched_monitor_writefd(i9ep->ici->fds[1], s); } /* * fd[0] might have been reset to blocking mode if our job was moved to @@ -400,7 +388,7 @@ static void i9e_pre_select(struct sched *s, __a_unused void *context) if (ret < 0) PARA_WARNING_LOG("set to nonblock failed: (fd0 %d, %s)\n", i9ep->ici->fds[0], para_strerror(-ret)); - para_fd_set(i9ep->ici->fds[0], &s->rfds, &s->max_fileno); + sched_monitor_readfd(i9ep->ici->fds[0], s); } static void update_winsize(void) @@ -477,8 +465,8 @@ int i9e_open(struct i9e_client_info *ici, struct sched *s) return ret; i9ep->task = task_register(&(struct task_info) { .name = "i9e", - .pre_select = i9e_pre_select, - .post_select = i9e_post_select, + .pre_monitor = i9e_pre_monitor, + .post_monitor = i9e_post_monitor, .context = i9ep, }, s); @@ -605,23 +593,21 @@ void i9e_signal_dispatch(int sig_num) } /** - * Wrapper for select(2) which does not restart on interrupts. + * Wrapper for poll(2) which handles EINTR and returns paraslash error codes. * - * \param n \sa \ref para_select(). - * \param readfds \sa \ref para_select(). - * \param writefds \sa \ref para_select(). - * \param timeout_tv \sa \ref para_select(). + * \param fds See poll(2). + * \param nfds See poll(2). + * \param timeout See poll(2). * - * \return \sa \ref para_select(). + * \return See poll(2). * - * The only difference between this function and \ref para_select() is that - * \ref i9e_select() returns zero if the select call returned \p EINTR. + * The only difference between this function and \ref xpoll() is that \ref + * i9e_poll() returns zero if the system call was interrupted while xpoll() + * restarts the system call in this case. */ -int i9e_select(int n, fd_set *readfds, fd_set *writefds, - struct timeval *timeout_tv) +int i9e_poll(struct pollfd *fds, nfds_t nfds, int timeout) { - int ret = select(n, readfds, writefds, NULL, timeout_tv); - + int ret = poll(fds, nfds, timeout); if (ret < 0) { if (errno == EINTR) ret = 0; diff --git a/interactive.h b/interactive.h index ddf02d76..53b1ad34 100644 --- a/interactive.h +++ b/interactive.h @@ -84,8 +84,7 @@ void i9e_print_status_bar(char *buf, unsigned len); void i9e_close(void); void i9e_signal_dispatch(int sig_num); __printf_2_3 void i9e_log(int ll, const char* fmt,...); -int i9e_select(int n, fd_set *readfds, fd_set *writefds, - struct timeval *timeout_tv); +int i9e_poll(struct pollfd *fds, nfds_t nfds, int timeout); int i9e_extract_completions(const char *word, char **string_list, char ***result); char **i9e_complete_commands(const char *word, struct i9e_completer *completers); diff --git a/mp3dec_filter.c b/mp3dec_filter.c index ccb1553b..6a196f3a 100644 --- a/mp3dec_filter.c +++ b/mp3dec_filter.c @@ -73,7 +73,7 @@ static void mp3dec_close(struct filter_node *fn) #define MP3DEC_MAX_FRAME 8192 -static int mp3dec_post_select(__a_unused struct sched *s, void *context) +static int mp3dec_post_monitor(__a_unused struct sched *s, void *context) { struct filter_node *fn = context; int i, ret; @@ -93,7 +93,7 @@ next_buffer: btr_merge(btrn, fn->min_iqs); len = btr_next_buffer(btrn, &inbuffer); /* - * Decode at most 8K in one go to give the post_select() functions of + * Decode at most 8K in one go to give the post_monitor() functions of * other buffer tree nodes a chance to run. This is necessary to avoid * buffer underruns on slow machines. */ @@ -187,7 +187,7 @@ static int mp3dec_execute(struct btr_node *btrn, const char *cmd, char **result) const struct filter lsg_filter_cmd_com_mp3dec_user_data = { .open = mp3dec_open, .close = mp3dec_close, - .pre_select = generic_filter_pre_select, - .post_select = mp3dec_post_select, + .pre_monitor = generic_filter_pre_monitor, + .post_monitor = mp3dec_post_monitor, .execute = mp3dec_execute, }; diff --git a/net.c b/net.c index e1951e5e..4a6f9a63 100644 --- a/net.c +++ b/net.c @@ -801,25 +801,21 @@ int recv_buffer(int fd, char *buf, size_t size) * Wrapper around the accept system call. * * \param fd The listening socket. - * \param rfds An optional fd_set pointer. * \param addr Structure which is filled in with the address of the peer socket. * \param size Should contain the size of the structure pointed to by \a addr. * \param new_fd Result pointer. * - * Accept incoming connections on \a addr, retry if interrupted. If \a rfds is - * not \p NULL, return 0 if \a fd is not set in \a rfds without calling accept(). + * Accept incoming connections on addr, retry if interrupted. * * \return Negative on errors, zero if no connections are present to be accepted, * one otherwise. * * \sa accept(2). */ -int para_accept(int fd, fd_set *rfds, void *addr, socklen_t size, int *new_fd) +int para_accept(int fd, void *addr, socklen_t size, int *new_fd) { int ret; - if (rfds && !FD_ISSET(fd, rfds)) - return 0; do ret = accept(fd, (struct sockaddr *) addr, &size); while (ret < 0 && errno == EINTR); diff --git a/net.h b/net.h index 2256f376..fd89dc5d 100644 --- a/net.h +++ b/net.h @@ -143,7 +143,7 @@ extern int generic_max_transport_msg_size(int sockfd); int recv_bin_buffer(int fd, char *buf, size_t size); int recv_buffer(int fd, char *buf, size_t size); -int para_accept(int fd, fd_set *rfds, void *addr, socklen_t size, int *new_fd); +int para_accept(int fd, void *addr, socklen_t size, int *new_fd); int create_local_socket(const char *name); int connect_local_socket(const char *name); int recv_cred_buffer(int, char *, size_t); diff --git a/oggdec_filter.c b/oggdec_filter.c index 708a27e5..a6fa056a 100644 --- a/oggdec_filter.c +++ b/oggdec_filter.c @@ -178,13 +178,13 @@ out: /** * Allocate chunks of this size and produce at most one chunk of output per - * ->post_select() invocation. If the buffer could only be filled partially + * ->post_monitor() invocation. If the buffer could only be filled partially * due to insufficient input being available, it is shrunk to the real output * size and the resized buffer is fed into the output queue. */ #define OGGDEC_OUTPUT_CHUNK_SIZE (32 * 1024) -static void ogg_pre_select(struct sched *s, void *context) +static void ogg_pre_monitor(struct sched *s, void *context) { struct filter_node *fn = context; struct private_oggdec_data *pod = fn->private_data; @@ -201,7 +201,7 @@ static void ogg_pre_select(struct sched *s, void *context) sched_min_delay(s); } -static int ogg_post_select(__a_unused struct sched *s, void *context) +static int ogg_post_monitor(__a_unused struct sched *s, void *context) { struct filter_node *fn = context; struct private_oggdec_data *pod = fn->private_data; @@ -262,7 +262,7 @@ out: const struct filter lsg_filter_cmd_com_oggdec_user_data = { .open = ogg_open, .close = ogg_close, - .pre_select = ogg_pre_select, - .post_select = ogg_post_select, + .pre_monitor = ogg_pre_monitor, + .post_monitor = ogg_post_monitor, .execute = oggdec_execute }; diff --git a/opusdec_filter.c b/opusdec_filter.c index 10ed394d..31f9640b 100644 --- a/opusdec_filter.c +++ b/opusdec_filter.c @@ -207,7 +207,7 @@ static int decode_packet(struct opusdec_context *ctx, ogg_packet *op, #define OPUSDEC_MAX_OUTPUT_SIZE (1024 * 1024) -static int opusdec_post_select(__a_unused struct sched *s, void *context) +static int opusdec_post_monitor(__a_unused struct sched *s, void *context) { struct filter_node *fn = context; struct opusdec_context *ctx = fn->private_data; @@ -269,7 +269,7 @@ out: return ret; } -static void opusdec_pre_select(struct sched *s, void *context) +static void opusdec_pre_monitor(struct sched *s, void *context) { struct filter_node *fn = context; struct opusdec_context *ctx = fn->private_data; @@ -286,7 +286,7 @@ static void opusdec_pre_select(struct sched *s, void *context) const struct filter lsg_filter_cmd_com_opusdec_user_data = { .open = opusdec_open, .close = opusdec_close, - .pre_select = opusdec_pre_select, - .post_select = opusdec_post_select, + .pre_monitor = opusdec_pre_monitor, + .post_monitor = opusdec_post_monitor, .execute = opusdec_execute, }; diff --git a/oss_write.c b/oss_write.c index 0565167c..1a837e57 100644 --- a/oss_write.c +++ b/oss_write.c @@ -61,7 +61,7 @@ static int get_oss_format(enum sample_format sf) } } -static void oss_pre_select(struct sched *s, void *context) +static void oss_pre_monitor(struct sched *s, void *context) { struct writer_node *wn = context; struct private_oss_write_data *powd = wn->private_data; @@ -71,7 +71,7 @@ static void oss_pre_select(struct sched *s, void *context) return; if (ret < 0 || !powd) return sched_min_delay(s); - para_fd_set(powd->fd, &s->wfds, &s->max_fileno); + sched_monitor_writefd(powd->fd, s); } static void oss_close(struct writer_node *wn) @@ -178,7 +178,7 @@ err_free: return ret; } -static int oss_post_select(__a_unused struct sched *s, void *context) +static int oss_post_monitor(__a_unused struct sched *s, void *context) { struct writer_node *wn = context; struct private_oss_write_data *powd = wn->private_data; @@ -222,7 +222,7 @@ static int oss_post_select(__a_unused struct sched *s, void *context) goto out; } ret = 0; - if (!FD_ISSET(powd->fd, &s->wfds)) + if (!sched_write_ok(powd->fd, s)) goto out; /* get maximal number of bytes that can be written */ ret = ioctl(powd->fd, SNDCTL_DSP_GETOSPACE, &abi); @@ -245,7 +245,7 @@ out: } const struct writer lsg_write_cmd_com_oss_user_data = { - .pre_select = oss_pre_select, - .post_select = oss_post_select, + .pre_monitor = oss_pre_monitor, + .post_monitor = oss_post_monitor, .close = oss_close, }; diff --git a/para.h b/para.h index b406818b..2525bee6 100644 --- a/para.h +++ b/para.h @@ -20,6 +20,8 @@ #include #include #include +#include + #include "gcc-compat.h" /** used in various contexts */ diff --git a/play.c b/play.c index ba9fff70..66383ebe 100644 --- a/play.c +++ b/play.c @@ -50,7 +50,7 @@ static struct lls_parse_result *play_lpr; * Describes a request to change the state of para_play. * * There is only one variable of this type: \a rq of the global play task - * structure. Command handlers only set this variable and the post_select() + * structure. Command handlers only set this variable and the post_monitor() * function of the play task investigates its value during each iteration of * the scheduler run and performs the actual work. */ @@ -117,7 +117,7 @@ INIT_STDERR_LOGGING(loglevel); char *stat_item_values[NUM_STAT_ITEMS] = {NULL}; -static struct sched sched = {.max_fileno = 0}; +static struct sched sched; static struct play_task play_task, *pt = &play_task; #define AFH_RECV_CMD (lls_cmd(LSG_RECV_CMD_CMD_AFH, recv_cmd_suite)) @@ -405,16 +405,16 @@ static int load_file(void) pt->rn.task = task_register( &(struct task_info) { .name = lls_command_name(AFH_RECV_CMD), - .pre_select = AFH_RECV->pre_select, - .post_select = AFH_RECV->post_select, + .pre_monitor = AFH_RECV->pre_monitor, + .post_monitor = AFH_RECV->post_monitor, .context = &pt->rn }, &sched); sprintf(buf, "%s decoder", af); pt->fn.task = task_register( &(struct task_info) { .name = buf, - .pre_select = decoder->pre_select, - .post_select = decoder->post_select, + .pre_monitor = decoder->pre_monitor, + .post_monitor = decoder->post_monitor, .context = &pt->fn }, &sched); register_writer_node(&pt->wn, pt->fn.btrn, &sched); @@ -1075,7 +1075,7 @@ static void session_open(void) sigemptyset(&act.sa_mask); act.sa_flags = 0; sigaction(SIGWINCH, &act, NULL); - sched.select_function = i9e_select; + sched.poll_function = i9e_poll; ici.bound_keyseqs = get_mapped_keyseqs(); pt->btrn = ici.producer = btr_new_node(&(struct btr_node_description) @@ -1109,16 +1109,16 @@ static void session_update_time_string(char *str, unsigned len) /* * If we are about to die we must call i9e_close() to reset the terminal. * However, i9e_close() must be called in *this* context, i.e. from - * play_task.post_select() rather than from i9e_post_select(), because + * play_task.post_monitor() rather than from i9e_post_monitor(), because * otherwise i9e would access freed memory upon return. So the play task must * stay alive until the i9e task terminates. * * We achieve this by sending a fake SIGTERM signal via i9e_signal_dispatch() - * and reschedule. In the next iteration, i9e->post_select returns an error and + * and reschedule. In the next iteration, i9e->post_monitor returns an error and * terminates. Subsequent calls to i9e_get_error() then return negative and we * are allowed to call i9e_close() and terminate as well. */ -static int session_post_select(__a_unused struct sched *s) +static int session_post_monitor(__a_unused struct sched *s) { int ret; @@ -1141,11 +1141,11 @@ static int session_post_select(__a_unused struct sched *s) #else /* HAVE_READLINE */ -static int session_post_select(struct sched *s) +static int session_post_monitor(struct sched *s) { char c; - if (!FD_ISSET(STDIN_FILENO, &s->rfds)) + if (!sched_read_ok(STDIN_FILENO, s)) return 0; if (read(STDIN_FILENO, &c, 1)) do_nothing; @@ -1164,11 +1164,11 @@ static void session_update_time_string(char *str, __a_unused unsigned len) } #endif /* HAVE_READLINE */ -static void play_pre_select(struct sched *s, __a_unused void *context) +static void play_pre_monitor(struct sched *s, __a_unused void *context) { char state; - para_fd_set(STDIN_FILENO, &s->rfds, &s->max_fileno); + sched_monitor_readfd(STDIN_FILENO, s); state = get_playback_state(); if (state == 'R' || state == 'F' || state == 'X') return sched_min_delay(s); @@ -1202,7 +1202,7 @@ static unsigned get_time_string(char **result) ); } -static int play_post_select(struct sched *s, __a_unused void *context) +static int play_post_monitor(struct sched *s, __a_unused void *context) { int ret; @@ -1211,7 +1211,7 @@ static int play_post_select(struct sched *s, __a_unused void *context) pt->rq = CRT_TERM_RQ; return 0; } - ret = session_post_select(s); + ret = session_post_monitor(s); if (ret < 0) goto out; if (!pt->wn.btrn && !pt->fn.btrn) { @@ -1255,7 +1255,7 @@ int main(int argc, char *argv[]) int ret; unsigned num_inputs; - sched.default_timeout.tv_sec = 5; + sched.default_timeout = 5000; parse_config_or_die(argc, argv); session_open(); num_inputs = lls_num_inputs(play_lpr); @@ -1265,8 +1265,8 @@ int main(int argc, char *argv[]) pt->playing = true; pt->task = task_register(&(struct task_info){ .name = "play", - .pre_select = play_pre_select, - .post_select = play_post_select, + .pre_monitor = play_pre_monitor, + .post_monitor = play_post_monitor, .context = pt, }, &sched); ret = schedule(&sched); diff --git a/prebuffer_filter.c b/prebuffer_filter.c index 9a801900..031aa47e 100644 --- a/prebuffer_filter.c +++ b/prebuffer_filter.c @@ -22,7 +22,7 @@ struct private_prebuffer_data { struct timeval barrier; }; -static void prebuffer_pre_select(struct sched *s, void *context) +static void prebuffer_pre_monitor(struct sched *s, void *context) { struct filter_node *fn = context; struct btr_node *btrn = fn->btrn; @@ -50,7 +50,7 @@ static void prebuffer_close(struct filter_node *fn) free(fn->private_data); } -static int prebuffer_post_select(__a_unused struct sched *s, void *context) +static int prebuffer_post_monitor(__a_unused struct sched *s, void *context) { struct filter_node *fn = context; struct btr_node *btrn = fn->btrn; @@ -87,6 +87,6 @@ static void prebuffer_open(struct filter_node *fn) const struct filter lsg_filter_cmd_com_prebuffer_user_data = { .open = prebuffer_open, .close = prebuffer_close, - .pre_select = prebuffer_pre_select, - .post_select = prebuffer_post_select, + .pre_monitor = prebuffer_pre_monitor, + .post_monitor = prebuffer_post_monitor, }; diff --git a/recv.c b/recv.c index 10d55d21..68417187 100644 --- a/recv.c +++ b/recv.c @@ -97,13 +97,12 @@ int main(int argc, char *argv[]) stdout_task_register(&sot, &s); ti.name = lls_command_name(cmd); - ti.pre_select = r->pre_select; - ti.post_select = r->post_select; + ti.pre_monitor = r->pre_monitor; + ti.post_monitor = r->post_monitor; ti.context = &rn; rn.task = task_register(&ti, &s); - s.default_timeout.tv_sec = 1; - s.default_timeout.tv_usec = 0; + s.default_timeout = 1000; ret = schedule(&s); sched_shutdown(&s); r->close(&rn); diff --git a/recv.h b/recv.h index 36b0f1db..391395b2 100644 --- a/recv.h +++ b/recv.h @@ -21,11 +21,11 @@ struct receiver_node { /** * The file descriptor to receive the stream. * - * The pre_select function of the receiver adds this file descriptor to + * The pre_monitor function of the receiver adds this file descriptor to * the set of file descriptors which are watched for readability or * writability, depending on the state of the connection (if any). * - * If \a fd is readable, the post_select function of the receiver reads + * If \a fd is readable, the post_monitor function of the receiver reads * data from this fd into the buffer pool area of \a btrp. * * \sa \ref receiver. @@ -34,9 +34,18 @@ struct receiver_node { }; /** - * Describes one supported paraslash receiver. + * Describes a possible data source for audio streams. * - * \sa \ref http_recv.c, \ref udp_recv.c. + * A paraslash receiver is a modular piece of software which is capable of + * receiving an audio data stream from a data source. Received audio data is + * fed to consumers through the buffer tree mechanism. + * + * This structure contains the methods which have to be implemented by each + * receiver. + * + * \sa \ref http_recv.c, \ref udp_recv.c, \ref dccp_recv.c, \ref afh_recv.c, + * struct \ref receiver_node, struct \ref filter, struct \ref writer, struct + * \ref sched. */ struct receiver { /** @@ -45,8 +54,6 @@ struct receiver { * This should allocate the output buffer of the given receiver node * and prepare it for retrieving the audio stream according to the * configuration stored in rn->lpr. - * - * \sa struct \ref receiver_node. */ int (*open)(struct receiver_node *rn); /** @@ -58,31 +65,10 @@ struct receiver { * \sa \ref receiver_node. */ void (*close)(struct receiver_node *rn); - /** - * Add file descriptors to fd_sets and compute timeout for select(2). - * - * If this is not NULL, the function is called in each iteration of the - * scheduler's select loop. The receiver may define it to add file - * descriptors to the file descriptor sets given by s. Those will be - * monitored in the subsequent call to select(2). The function may also - * lower the timeout value of s to make select(2) return earlier if no - * file descriptors are ready for I/O. - * - * \sa select(2), \ref time.c, struct \ref sched. - */ - void (*pre_select)(struct sched *s, void *context); - /** - * Evaluate the result from select(2). - * - * This is called after the call to select(2). It should check all file - * descriptors which were added to any of the fd sets in the previous - * call to ->pre_select() and perform (non-blocking) I/O operations on - * those fds which are ready for I/O, for example in order to establish - * a connection or to receive a part of the audio stream. - * - * \sa select(2), struct \ref receiver. - */ - int (*post_select)(struct sched *s, void *context); + /** Ask the scheduler to monitor receive fds. */ + void (*pre_monitor)(struct sched *s, void *context); + /** Receive data and make it available to consumers. */ + int (*post_monitor)(struct sched *s, void *context); /** * Answer a buffer tree query. * @@ -110,4 +96,4 @@ struct receiver { int check_receiver_arg(const char *ra, struct lls_parse_result **lprp); void print_receiver_helps(bool detailed); -int generic_recv_pre_select(struct sched *s, struct receiver_node *rn); +int generic_recv_pre_monitor(struct sched *s, struct receiver_node *rn); diff --git a/recv_common.c b/recv_common.c index 31fd81f1..ad34991c 100644 --- a/recv_common.c +++ b/recv_common.c @@ -98,19 +98,19 @@ void print_receiver_helps(bool detailed) } /** - * Simple pre-select hook, used by all receivers. + * Request a minimal timeout in case of buffer tree errors. * - * \param s Scheduler info. - * \param rn The receiver node. + * \param s The scheduler instance. + * \param rn The buffer tree node is derived from this. * - * This requests a minimal delay from the scheduler if the status of the buffer - * tree node indicates an error/eof condition. No file descriptors are added to - * the fd sets of \a s. + * If the buffer tree node of the given receiver node is in error or EOF state, + * a minimal I/O timeout is requested from the scheduler. Otherwise, the + * function does nothing. No file descriptors are asked to be monitored. * - * \return The status of the btr node of the receiver node, i.e. the return - * value of the underlying call to \ref btr_node_status(). + * \return The status of of the receiver node's buffer tree node. That is, the + * return value of the underlying call to \ref btr_node_status(). */ -int generic_recv_pre_select(struct sched *s, struct receiver_node *rn) +int generic_recv_pre_monitor(struct sched *s, struct receiver_node *rn) { int ret = btr_node_status(rn->btrn, 0, BTR_NT_ROOT); diff --git a/resample_filter.c b/resample_filter.c index bbdda51c..01b4ac48 100644 --- a/resample_filter.c +++ b/resample_filter.c @@ -62,7 +62,7 @@ static void resample_open(struct filter_node *fn) btr_log_tree(btr_parent(btr_parent(btrn)), LL_INFO); } -static void resample_pre_select(struct sched *s, void *context) +static void resample_pre_monitor(struct sched *s, void *context) { struct filter_node *fn = context; struct resample_context *ctx = fn->private_data; @@ -70,7 +70,7 @@ static void resample_pre_select(struct sched *s, void *context) if (ret != 0) return sched_min_delay(s); - check_wav_pre_select(s, ctx->cwc); + check_wav_pre_monitor(s, ctx->cwc); } static int get_btr_val(const char *what, struct btr_node *btrn) @@ -187,7 +187,7 @@ static int resample_frames(int16_t *in, size_t num_frames, bool have_more, return data.input_frames_used; } -static int resample_post_select(__a_unused struct sched *s, void *context) +static int resample_post_monitor(__a_unused struct sched *s, void *context) { int ret; struct filter_node *fn = context; @@ -197,7 +197,7 @@ static int resample_post_select(__a_unused struct sched *s, void *context) size_t in_bytes, num_frames; bool have_more; - ret = check_wav_post_select(ctx->cwc); + ret = check_wav_post_monitor(ctx->cwc); if (ret < 0) goto out; ret = btr_node_status(btrn, fn->min_iqs, BTR_NT_INTERNAL); @@ -236,7 +236,7 @@ out: if (ret < 0) { btr_remove_node(&fn->btrn); /* This releases the check_wav btr node */ - check_wav_post_select(ctx->cwc); + check_wav_post_monitor(ctx->cwc); } return ret; } @@ -277,8 +277,8 @@ static void resample_teardown(__a_unused const struct lls_parse_result *lpr, const struct filter lsg_filter_cmd_com_resample_user_data = { .setup = resample_setup, .open = resample_open, - .pre_select = resample_pre_select, - .post_select = resample_post_select, + .pre_monitor = resample_pre_monitor, + .post_monitor = resample_post_monitor, .close = resample_close, .teardown = resample_teardown, .execute = resample_execute diff --git a/sched.c b/sched.c index aac8efed..eb8d03c2 100644 --- a/sched.c +++ b/sched.c @@ -17,9 +17,9 @@ * The possible states of a task. * * In addition to the states listed here, a task may also enter zombie state. - * This happens when its ->post_select function returns negative, the ->status + * This happens when its ->post_monitor function returns negative, the ->status * field is then set to this return value. Such tasks are not scheduled any - * more (i.e. ->pre_select() and ->post_select() are no longer called), but + * more (i.e. ->pre_monitor() and ->post_monitor() are no longer called), but * they stay on the scheduler task list until \ref task_reap() or * \ref sched_shutdown() is called. */ @@ -46,7 +46,7 @@ struct task { static struct timeval now_struct; const struct timeval *now = &now_struct; -static void sched_preselect(struct sched *s) +static void sched_pre_monitor(struct sched *s) { struct task *t, *tmp; @@ -55,8 +55,8 @@ static void sched_preselect(struct sched *s) continue; if (t->notification != 0) sched_min_delay(s); - if (t->info.pre_select) - t->info.pre_select(s, t->info.context); + if (t->info.pre_monitor) + t->info.pre_monitor(s, t->info.context); } } @@ -72,29 +72,29 @@ static void unlink_and_free_task(struct task *t) } //#define SCHED_DEBUG 1 -static inline void call_post_select(struct sched *s, struct task *t) +static inline void call_post_monitor(struct sched *s, struct task *t) { int ret; #ifndef SCHED_DEBUG - ret = t->info.post_select(s, t->info.context); + ret = t->info.post_monitor(s, t->info.context); #else struct timeval t1, t2, diff; unsigned long pst; clock_get_realtime(&t1); - ret = t->info.post_select(s, t->info.context); + ret = t->info.post_monitor(s, t->info.context); clock_get_realtime(&t2); tv_diff(&t1, &t2, &diff); pst = tv2ms(&diff); if (pst > 50) - PARA_WARNING_LOG("%s: post_select time: %lums\n", + PARA_WARNING_LOG("%s: post_monitor time: %lums\n", t->name, pst); #endif t->status = ret < 0? ret : TS_RUNNING; } -static unsigned sched_post_select(struct sched *s) +static unsigned sched_post_monitor(struct sched *s) { struct task *t, *tmp; unsigned num_running_tasks = 0; @@ -103,7 +103,7 @@ static unsigned sched_post_select(struct sched *s) if (t->status == TS_DEAD) /* task has been reaped */ unlink_and_free_task(t); else if (t->status == TS_RUNNING) { - call_post_select(s, t); /* sets t->status */ + call_post_monitor(s, t); /* sets t->status */ t->notification = 0; if (t->status == TS_RUNNING) num_running_tasks++; @@ -117,13 +117,13 @@ static unsigned sched_post_select(struct sched *s) * * \param s Pointer to the scheduler struct. * - * This function updates the global \a now pointer, calls all registered - * pre_select hooks which may set the timeout and add any file descriptors to - * the fd sets of \a s. Next, it calls para_select() and makes the result available - * to the registered tasks by calling their post_select hook. + * This function updates the global now pointer, calls all registered + * pre_monitor hooks which may set the timeout and add any file descriptors to + * the pollfd array. Next, it calls the poll function and makes the result + * available to the registered tasks by calling their post_monitor hook. * * \return Zero if no more tasks are left in the task list, negative if the - * select function returned an error. + * poll function returned an error. * * \sa \ref now. */ @@ -132,31 +132,20 @@ int schedule(struct sched *s) int ret; unsigned num_running_tasks; - if (!s->select_function) - s->select_function = para_select; + if (!s->poll_function) + s->poll_function = xpoll; again: - FD_ZERO(&s->rfds); - FD_ZERO(&s->wfds); - s->select_timeout = s->default_timeout; - s->max_fileno = -1; + s->num_pfds = 0; + if (s->pidx) + memset(s->pidx, 0xff, s->pidx_array_len * sizeof(unsigned)); + s->timeout = s->default_timeout; clock_get_realtime(&now_struct); - sched_preselect(s); - ret = s->select_function(s->max_fileno + 1, &s->rfds, &s->wfds, - &s->select_timeout); + sched_pre_monitor(s); + ret = s->poll_function(s->pfd, s->num_pfds, s->timeout); if (ret < 0) return ret; - if (ret == 0) { - /* - * APUE: Be careful not to check the descriptor sets on return - * unless the return value is greater than zero. The return - * state of the descriptor sets is implementation dependent if - * either a signal is caught or the timer expires. - */ - FD_ZERO(&s->rfds); - FD_ZERO(&s->wfds); - } clock_get_realtime(&now_struct); - num_running_tasks = sched_post_select(s); + num_running_tasks = sched_post_monitor(s); if (num_running_tasks == 0) return 0; goto again; @@ -197,7 +186,7 @@ int task_reap(struct task **tptr) /* * With list_for_each_entry_safe() it is only safe to remove the * _current_ list item. Since we are being called from the loop in - * schedule() via some task's ->post_select() function, freeing the + * schedule() via some task's ->post_monitor() function, freeing the * given task here would result in use-after-free bugs in schedule(). * So we only set the task status to TS_DEAD which tells schedule() to * free the task in the next iteration of its loop. @@ -226,6 +215,8 @@ void sched_shutdown(struct sched *s) t->name); unlink_and_free_task(t); } + free(s->pfd); + free(s->pidx); } /** @@ -241,7 +232,7 @@ struct task *task_register(struct task_info *info, struct sched *s) { struct task *t = para_malloc(sizeof(*t)); - assert(info->post_select); + assert(info->post_monitor); if (!s->task_list.next) init_list_head(&s->task_list); @@ -288,14 +279,14 @@ char *get_task_list(struct sched *s) * \param err A positive error code. * * Tasks which honor notifications are supposed to call \ref - * task_get_notification() in their post_select function and act on the + * task_get_notification() in their post_monitor function and act on the * returned notification value. * - * If the scheduler detects during its pre_select loop that at least one task - * has been notified, the loop terminates, and the post_select methods of all + * If the scheduler detects during its pre_monitor loop that at least one task + * has been notified, the loop terminates, and the post_monitor methods of all * taks are immediately called again. * - * The notification for a task is reset after the call to its post_select + * The notification for a task is reset after the call to its post_monitor * method. * * \sa \ref task_get_notification(). @@ -316,7 +307,7 @@ void task_notify(struct task *t, int err) * * \return The notification value. If this is negative, the task has been * notified by another task. Tasks are supposed to check for notifications by - * calling this function from their post_select method. + * calling this function from their post_monitor method. * * \sa \ref task_notify(). */ @@ -362,43 +353,43 @@ void task_notify_all(struct sched *s, int err) } /** - * Set the select timeout to the minimal possible value. + * Set the I/O timeout to the minimal possible value. * * \param s Pointer to the scheduler struct. * - * This causes the next select() call to return immediately. + * This causes the next poll() call to return immediately. */ void sched_min_delay(struct sched *s) { - s->select_timeout.tv_sec = s->select_timeout.tv_usec = 0; + s->timeout = 0; } /** - * Impose an upper bound for the timeout of the next select() call. + * Impose an upper bound for the I/O timeout. * * \param to Maximal allowed timeout. * \param s Pointer to the scheduler struct. * - * If the current scheduler timeout is already smaller than \a to, this - * function does nothing. Otherwise the timeout for the next select() call is - * set to the given value. + * If the current I/O timeout is already smaller than to, this function does + * nothing. Otherwise the timeout is set to the given value. * * \sa \ref sched_request_timeout_ms(). */ void sched_request_timeout(struct timeval *to, struct sched *s) { - if (tv_diff(&s->select_timeout, to, NULL) > 0) - s->select_timeout = *to; + long unsigned ms = tv2ms(to); + if (s->timeout > ms) + s->timeout = ms; } /** - * Force the next select() call to return before the given amount of milliseconds. + * Bound the I/O timeout to at most the given amount of milliseconds. * * \param ms The maximal allowed timeout in milliseconds. * \param s Pointer to the scheduler struct. * - * Like sched_request_timeout() this imposes an upper bound on the timeout - * value for the next select() call. + * Like \ref sched_request_timeout() this imposes an upper bound on the I/O + * timeout. */ void sched_request_timeout_ms(long unsigned ms, struct sched *s) { @@ -408,13 +399,13 @@ void sched_request_timeout_ms(long unsigned ms, struct sched *s) } /** - * Force the next select() call to return before the given future time. + * Bound the I/O timeout by an absolute time in the future. * - * \param barrier Absolute time before select() should return. + * \param barrier Defines the upper bound for the timeout. * \param s Pointer to the scheduler struct. * - * \return If \a barrier is in the past, this function does nothing and returns - * zero. Otherwise it returns one. + * \return If the barrier is in the past, this function does nothing and + * returns zero. Otherwise it returns one. * * \sa \ref sched_request_barrier_or_min_delay(). */ @@ -429,12 +420,12 @@ int sched_request_barrier(struct timeval *barrier, struct sched *s) } /** - * Force the next select() call to return before the given time. + * Bound the I/O timeout or request a minimal delay. * - * \param barrier Absolute time before select() should return. + * \param barrier Absolute time as in \ref sched_request_barrier(). * \param s Pointer to the scheduler struct. * - * \return If \a barrier is in the past, this function requests a minimal + * \return If the barrier is in the past, this function requests a minimal * timeout and returns zero. Otherwise it returns one. * * \sa \ref sched_min_delay(), \ref sched_request_barrier(). @@ -450,3 +441,126 @@ int sched_request_barrier_or_min_delay(struct timeval *barrier, struct sched *s) sched_request_timeout(&diff, s); return 1; } + +static void add_pollfd(int fd, struct sched *s, short events) +{ + assert(fd >= 0); +#if 0 + { + int flags = fcntl(fd, F_GETFL); + if (!(flags & O_NONBLOCK)) { + PARA_EMERG_LOG("fd %d is a blocking file descriptor\n", fd); + exit(EXIT_FAILURE); + } + } +#endif + if (s->pidx_array_len > fd) { /* is fd already registered? */ + if (s->pidx[fd] < s->pfd_array_len) { /* yes, it is */ + assert(s->pfd[s->pidx[fd]].fd == fd); + s->pfd[s->pidx[fd]].events |= events; + return; + } + } else { /* need to extend the index array */ + unsigned old_len = s->pidx_array_len; + while (s->pidx_array_len <= fd) + s->pidx_array_len = s->pidx_array_len * 2 + 1; + PARA_INFO_LOG("pidx array len: %u\n", s->pidx_array_len); + s->pidx = para_realloc(s->pidx, + s->pidx_array_len * sizeof(unsigned)); + memset(s->pidx + old_len, 0xff, + (s->pidx_array_len - old_len) * sizeof(unsigned)); + } + /* + * The given fd is not part of the pfd array yet. Initialize pidx[fd] + * to point at the next unused slot of this array and initialize the + * slot. + */ + s->pidx[fd] = s->num_pfds; + if (s->pfd_array_len <= s->num_pfds) { + unsigned old_len = s->pfd_array_len; + s->pfd_array_len = old_len * 2 + 1; + PARA_INFO_LOG("pfd array len: %u\n", s->pfd_array_len); + s->pfd = para_realloc(s->pfd, + s->pfd_array_len * sizeof(struct pollfd)); + memset(s->pfd + old_len, 0, + (s->pfd_array_len - old_len) * sizeof(struct pollfd)); + } + s->pfd[s->num_pfds].fd = fd; + s->pfd[s->num_pfds].events = events; + s->pfd[s->num_pfds].revents = 0; + s->num_pfds++; +} + +/** + * Instruct the scheduler to monitor an fd for readiness for reading. + * + * \param fd The file descriptor. + * \param s The scheduler. + * + * \sa \ref sched_monitor_writefd(). + */ +void sched_monitor_readfd(int fd, struct sched *s) +{ + add_pollfd(fd, s, POLLIN); +} + +/** + * Instruct the scheduler to monitor an fd for readiness for writing. + * + * \param fd The file descriptor. + * \param s The scheduler. + * + * \sa \ref sched_monitor_readfd(). + */ +void sched_monitor_writefd(int fd, struct sched *s) +{ + add_pollfd(fd, s, POLLOUT); +} + +static int get_revents(int fd, const struct sched *s) +{ + if (fd < 0) + return 0; + if (fd >= s->pidx_array_len) + return 0; + if (s->pidx[fd] >= s->num_pfds) + return 0; + if (s->pfd[s->pidx[fd]].fd != fd) + return 0; + assert((s->pfd[s->pidx[fd]].revents & POLLNVAL) == 0); + return s->pfd[s->pidx[fd]].revents; +} + +/** + * Check whether there is data to read on the given fd. + * + * To be called from the ->post_monitor() method of a task. + * + * \param fd Should have been monitored with \ref sched_monitor_readfd(). + * \param s The scheduler instance. + * + * \return True if the file descriptor is ready for reading, false otherwise. + * If fd is negative, or has not been monitored in the current iteration of the + * scheduler's main loop, the function also returns false. + * + * \sa \ref sched_write_ok(). + */ +bool sched_read_ok(int fd, const struct sched *s) +{ + return get_revents(fd, s) & (POLLIN | POLLERR | POLLHUP); +} + +/** + * Check whether writing is possible (i.e., does not block). + * + * \param fd Should have been monitored with \ref sched_monitor_writefd(). + * \param s The scheduler instance. + * + * \return True if the file descriptor is ready for writing, false otherwise. + * The comment in \ref sched_read_ok() about invalid file descriptors applies + * to this function as well. + */ +bool sched_write_ok(int fd, const struct sched *s) +{ + return get_revents(fd, s) & (POLLOUT | POLLERR | POLLHUP); +} diff --git a/sched.h b/sched.h index 35e2503e..ede5e67e 100644 --- a/sched.h +++ b/sched.h @@ -7,24 +7,30 @@ * Paraslash's scheduler. * * Designed with KISS in mind. It maintains a list of task structures which is - * extended when a new task is registered. Each task may define a pre_select + * extended when a new task is registered. Each task may define a pre_monitor * function which is called from the scheduler main loop before it calls - * select(). Similarly, each task must define a post_select function which is - * called after the select call. + * poll(2). Similarly, each task must define a post_monitor function which is + * called after poll(2) returns. + * + * \sa select(2), poll(2). */ struct sched { - /** Initial value before any pre_select call. */ - struct timeval default_timeout; - /** The current timeout for the upcoming select call. */ - struct timeval select_timeout; - /** fds that should be watched for readability. */ - fd_set rfds; - /** fds that should be watched for writability. */ - fd_set wfds; - /** Highest numbered file descriptor in any of the above fd sets. */ - int max_fileno; - /** If non-NULL, use this function instead of para_select. */ - int (*select_function)(int, fd_set *, fd_set *, struct timeval *); + /** Initial value (in milliseconds) before any pre_monitor call. */ + int default_timeout; + /** The timeout (also in milliseconds) for the next iteration. */ + int timeout; + /** Passed to poll(2). */ + struct pollfd *pfd; + /** Number of elements in the above array, passed to poll(2). */ + unsigned pfd_array_len; + /** Number of fds registered for montitoring so far. */ + unsigned num_pfds; + /** Maps fds to indices of the pfd array. */ + unsigned *pidx; + /** Mumber of elements in the above pidx array. */ + unsigned pidx_array_len; + /** If non-NULL, use this function instead of \ref xpoll(). */ + int (*poll_function)(struct pollfd *fds, nfds_t nfds, int timeout); /** Tasks which have been registered to the scheduler. */ struct list_head task_list; }; @@ -36,23 +42,32 @@ struct task_info { /** Used for log messages and by \ref get_task_list(). */ const char *name; /** - * The optional pre select method. + * Configure watch fds and impose an upper bound on the I/O timeout. + * + * If this is not NULL, the function is called at each iteration of the + * scheduler's main loop. Its purpose is to tell the scheduler that + * certain file descriptors should be monitored for readiness for I/O. + * The function may also lower the scheduler's timeout value (but shall + * never increase it) to impose an upper bound on the waiting time in + * case no file descriptors happen to be ready. * - * Its purpose is to add file descriptors to the fd sets of the - * scheduler and to decrease the select timeout if necessary. + * \sa \ref time.c. */ - void (*pre_select)(struct sched *s, void *context); + void (*pre_monitor)(struct sched *s, void *context); /** - * The mandatory post select method. + * Perform I/O on file descriptors which are ready for I/O. + * + * This mandatory hook is called after the system call which monitors + * file descriptors returns. The function should perform non-blocking + * I/O on those file descriptors which are reported as being ready. * - * Its purpose is to evaluate and act upon the results of the previous - * select call. If this function returns a negative value, the - * scheduler unregisters the task. + * If this function returns a negative value, the scheduler unregisters + * the task. */ - int (*post_select)(struct sched *s, void *context); + int (*post_monitor)(struct sched *s, void *context); /** * This pointer is saved when the task is registered. It is passed to - * ->pre_select() and ->post_select(). Usually this is a pointer to the + * ->pre_monitor() and ->post_monitor(). Usually this is a pointer to the * struct owned by the caller which contains the task pointer. */ void *context; @@ -80,3 +95,7 @@ void sched_request_timeout(struct timeval *to, struct sched *s); void sched_request_timeout_ms(long unsigned ms, struct sched *s); int sched_request_barrier(struct timeval *barrier, struct sched *s); int sched_request_barrier_or_min_delay(struct timeval *barrier, struct sched *s); +void sched_monitor_readfd(int fd, struct sched *s); +void sched_monitor_writefd(int fd, struct sched *s); +bool sched_read_ok(int fd, const struct sched *s); +bool sched_write_ok(int fd, const struct sched *s); diff --git a/send.h b/send.h index f6aafbb4..dec5b0db 100644 --- a/send.h +++ b/send.h @@ -76,27 +76,10 @@ struct sender { void (*send)(long unsigned current_chunk, long unsigned chunks_sent, const char *buf, size_t len, const char *header_buf, size_t header_len); - /** - * Add file descriptors to fd_sets. - * - * The pre_select function of each supported sender is called just before - * para_server enters its main select loop. Each sender may add its own - * file descriptors to the \a rfds or the \a wfds set. - * - * If a file descriptor was added, \a max_fileno must be increased by - * this function, if necessary. - * - * \sa select(2). - */ - void (*pre_select)(int *max_fileno, fd_set *rfds, fd_set *wfds); - /** - * Handle the file descriptors which are ready for I/O. - * - * If the pre_select hook added one ore more file descriptors to the - * read or write set, this is the hook to check the result and do any - * I/O on those descriptors which are ready for reading/writing. - */ - void (*post_select)(fd_set *rfds, fd_set *wfds); + /** Ask the scheduler to monitor file descriptors. */ + void (*pre_monitor)(struct sched *s); + /** Perform I/O on the file descriptors which are ready. */ + void (*post_monitor)(struct sched *s); /** * Terminate all connected clients. * @@ -222,6 +205,6 @@ void generic_com_on(struct sender_status *ss, unsigned protocol); void generic_acl_deplete(struct list_head *acl); void generic_com_off(struct sender_status *ss); char *generic_sender_help(void); -struct sender_client *accept_sender_client(struct sender_status *ss, fd_set *rfds); +struct sender_client *accept_sender_client(struct sender_status *ss); int send_queued_chunks(int fd, struct chunk_queue *cq); int parse_fec_url(const char *arg, struct sender_command_data *scd); diff --git a/send_common.c b/send_common.c index 90242d5c..ce167542 100644 --- a/send_common.c +++ b/send_common.c @@ -21,10 +21,10 @@ #include "afs.h" #include "server.h" #include "acl.h" +#include "sched.h" #include "send.h" #include "close_on_fork.h" #include "chunk_queue.h" -#include "sched.h" #include "vss.h" /** Clients will be kicked if there are more than that many bytes pending. */ @@ -343,7 +343,6 @@ void generic_com_off(struct sender_status *ss) * Accept a connection on the socket(s) this server is listening on. * * \param ss The sender whose listening fd is ready for reading. - * \param rfds Passed to para_accept(), * * This accepts incoming connections on any of the listening sockets of the * server. If there is a connection pending, the function @@ -367,7 +366,7 @@ void generic_com_off(struct sender_status *ss) * \sa \ref para_accept(), \ref mark_fd_nonblocking(), \ref acl_check_access(), * \ref cq_new(), \ref add_close_on_fork_list(). */ -struct sender_client *accept_sender_client(struct sender_status *ss, fd_set *rfds) +struct sender_client *accept_sender_client(struct sender_status *ss) { struct sender_client *sc; int fd, ret; @@ -376,7 +375,7 @@ struct sender_client *accept_sender_client(struct sender_status *ss, fd_set *rfd FOR_EACH_LISTEN_FD(n, ss) { if (ss->listen_fds[n] < 0) continue; - ret = para_accept(ss->listen_fds[n], rfds, NULL, 0, &fd); + ret = para_accept(ss->listen_fds[n], NULL, 0, &fd); if (ret < 0) goto warn; if (ret == 0) diff --git a/server.c b/server.c index e0df714b..2c66cc27 100644 --- a/server.c +++ b/server.c @@ -24,8 +24,8 @@ #include "net.h" #include "server.h" #include "list.h" -#include "send.h" #include "sched.h" +#include "send.h" #include "vss.h" #include "config.h" #include "close_on_fork.h" @@ -250,14 +250,14 @@ static void handle_sighup(void) kill(afs_pid, SIGHUP); } -static int signal_post_select(struct sched *s, __a_unused void *context) +static int signal_post_monitor(struct sched *s, __a_unused void *context) { int ret, signum; ret = task_get_notification(signal_task->task); if (ret < 0) return ret; - signum = para_next_signal(&s->rfds); + signum = para_next_signal(); switch (signum) { case 0: return 0; @@ -313,20 +313,20 @@ static void init_signal_task(void) add_close_on_fork_list(signal_task->fd); signal_task->task = task_register(&(struct task_info) { .name = "signal", - .pre_select = signal_pre_select, - .post_select = signal_post_select, + .pre_monitor = signal_pre_monitor, + .post_monitor = signal_post_monitor, .context = signal_task, }, &sched); } -static void command_pre_select(struct sched *s, void *context) +static void command_pre_monitor(struct sched *s, void *context) { unsigned n; struct server_command_task *sct = context; for (n = 0; n < sct->num_listen_fds; n++) - para_fd_set(sct->listen_fds[n], &s->rfds, &s->max_fileno); + sched_monitor_readfd(sct->listen_fds[n], s); } static int command_task_accept(unsigned listen_idx, struct sched *s, @@ -337,7 +337,7 @@ static int command_task_accept(unsigned listen_idx, struct sched *s, pid_t child_pid; uint32_t *chunk_table; - ret = para_accept(sct->listen_fds[listen_idx], &s->rfds, NULL, 0, &new_fd); + ret = para_accept(sct->listen_fds[listen_idx], NULL, 0, &new_fd); if (ret <= 0) goto out; mmd->num_connects++; @@ -383,13 +383,13 @@ static int command_task_accept(unsigned listen_idx, struct sched *s, /* * After we return, the scheduler calls server_select() with a minimal * timeout value, because the remaining tasks have a notification - * pending. Next it calls the ->post_select method of these tasks, + * pending. Next it calls the ->post_monitor method of these tasks, * which will return negative in view of the notification. This causes * schedule() to return as there are no more runnable tasks. * * Note that semaphores are not inherited across a fork(), so we don't - * hold the lock at this point. Since server_select() drops the lock - * prior to calling para_select(), we need to acquire it here. + * hold the lock at this point. Since server_poll() drops the lock + * prior to calling poll(), we need to acquire it here. */ mutex_lock(mmd_mutex); return -E_CHILD_CONTEXT; @@ -399,7 +399,7 @@ out: return 0; } -static int command_post_select(struct sched *s, void *context) +static int command_post_monitor(struct sched *s, void *context) { struct server_command_task *sct = context; unsigned n; @@ -459,8 +459,8 @@ static void init_server_command_task(struct server_command_task *sct, sct->task = task_register(&(struct task_info) { .name = "server command", - .pre_select = command_pre_select, - .post_select = command_post_select, + .pre_monitor = command_pre_monitor, + .post_monitor = command_post_monitor, .context = sct, }, &sched); /* @@ -617,14 +617,13 @@ out: killpg(0, SIGUSR1); } -static int server_select(int max_fileno, fd_set *readfds, fd_set *writefds, - struct timeval *timeout_tv) +static int server_poll(struct pollfd *fds, nfds_t nfds, int timeout) { int ret; status_refresh(); mutex_unlock(mmd_mutex); - ret = para_select(max_fileno + 1, readfds, writefds, timeout_tv); + ret = xpoll(fds, nfds, timeout); mutex_lock(mmd_mutex); return ret; } @@ -658,15 +657,15 @@ int main(int argc, char *argv[]) struct server_command_task server_command_task_struct, *sct = &server_command_task_struct; - sched.default_timeout.tv_sec = 1; - sched.select_function = server_select; + sched.default_timeout = 1000; + sched.poll_function = server_poll; server_init(argc, argv, sct); mutex_lock(mmd_mutex); ret = schedule(&sched); /* - * We hold the mmd lock: it was re-acquired in server_select() - * after the select call. + * We hold the mmd lock: it was re-acquired in server_poll() + * after the poll(2) call. */ mutex_unlock(mmd_mutex); sched_shutdown(&sched); diff --git a/signal.c b/signal.c index 32d6ab66..e5ef7a41 100644 --- a/signal.c +++ b/signal.c @@ -27,7 +27,7 @@ static int signal_pipe[2]; * signal arrives, the signal handler writes the number of the signal received * to one end of the signal pipe. The application can test for pending signals * by checking if the file descriptor of the other end of the signal pipe is - * ready for reading, see select(2). + * ready for reading. * * \return This function either succeeds or calls exit(3) to terminate the * current process. On success, a signal task structure is returned. @@ -202,16 +202,14 @@ void para_unblock_signal(int sig) /** * Return the number of the next pending signal. * - * \param rfds The fd_set containing the signal pipe. - * * \return On success, the number of the received signal is returned. If there * is no signal currently pending, the function returns zero. On read errors * from the signal pipe, the process is terminated. */ -int para_next_signal(fd_set *rfds) +int para_next_signal(void) { size_t n; - int s, ret = read_nonblock(signal_pipe[0], &s, sizeof(s), rfds, &n); + int s, ret = read_nonblock(signal_pipe[0], &s, sizeof(s), &n); if (ret < 0) { PARA_EMERG_LOG("%s\n", para_strerror(-ret)); diff --git a/signal.h b/signal.h index e5532ded..d9e98e78 100644 --- a/signal.h +++ b/signal.h @@ -13,32 +13,31 @@ struct signal_task { }; /** - * A generic pre-select method for signal tasks. + * Monitor the signal fd for reading. * - * \param s Passed to para_fd_set(). + * \param s The scheduler instance. * \param context Signal task pointer. * * This convenience helper is called from several programs which need to handle - * signals, including para_server and para_audiod. These programs define a - * signal task structure and set its ->pre_select method to this function which - * adds the file descriptor of the signal task to the set of descriptors to be - * watched in the next select() call. + * signals, including para_server and para_audiod. These programs set up a + * signal pipe and a signal task structure, and use this function to tell the + * scheduler to monitor the read end of the pipe. * * Although the second parameter must be in fact a pointer to a signal_task - * structure, the parameter is specified as void * here to match the - * ->pre_select method of struct task. + * structure, the parameter is specified as void * here to match the signature + * declared in struct \ref task_info. */ -_static_inline_ void signal_pre_select(struct sched *s, void *context) +_static_inline_ void signal_pre_monitor(struct sched *s, void *context) { struct signal_task *st = context; - para_fd_set(st->fd, &s->rfds, &s->max_fileno); + sched_monitor_readfd(st->fd, s); } struct signal_task *signal_init_or_die(void); void para_sigaction(int sig, void (*handler)(int)); void para_install_sighandler(int); int para_reap_child(pid_t *pid); -int para_next_signal(fd_set *rfds); +int para_next_signal(void); void signal_shutdown(struct signal_task *st); void para_block_signal(int sig); void para_unblock_signal(int sig); diff --git a/spxdec_filter.c b/spxdec_filter.c index 7be817dd..94a9c788 100644 --- a/spxdec_filter.c +++ b/spxdec_filter.c @@ -246,7 +246,7 @@ static int compute_skip_samples(ogg_page *og, struct private_spxdec_data *psd) return ret; } -static int speexdec_post_select(__a_unused struct sched *s, void *context) +static int speexdec_post_monitor(__a_unused struct sched *s, void *context) { struct filter_node *fn = context; struct private_spxdec_data *psd = fn->private_data; @@ -305,7 +305,7 @@ fail: const struct filter lsg_filter_cmd_com_spxdec_user_data = { .open = spxdec_open, .close = speexdec_close, - .pre_select = generic_filter_pre_select, - .post_select = speexdec_post_select, + .pre_monitor = generic_filter_pre_monitor, + .post_monitor = speexdec_post_monitor, .execute = speexdec_execute, }; diff --git a/stdin.c b/stdin.c index 9408235a..d025b949 100644 --- a/stdin.c +++ b/stdin.c @@ -14,10 +14,10 @@ #include "string.h" /* - * If there is space left in the buffer of the stdin task add STDIN_FILENO to - * the read fd set of s. + * If there is space left in the buffer of the stdin task, ask the scheduler to + * monitor STDIN_FILENO. */ -static void stdin_pre_select(struct sched *s, void *context) +static void stdin_pre_monitor(struct sched *s, void *context) { struct stdin_task *sit = context; int ret; @@ -28,16 +28,15 @@ static void stdin_pre_select(struct sched *s, void *context) if (ret <= 0) return; if (btr_pool_unused(sit->btrp) > 0) - return para_fd_set(STDIN_FILENO, &s->rfds, &s->max_fileno); + return sched_monitor_readfd(STDIN_FILENO, s); sched_request_timeout_ms(100, s); } /* - * This function checks if STDIN_FILENO was included by in the read fd set of s - * during the previous pre_select call. If so, and if STDIN_FILENO is readable, - * data is read from stdin and fed into the buffer tree. + * Feed data from stdin into the buffer tree if STDIN_FILENO is ready for + * reading. */ -static int stdin_post_select(struct sched *s, void *context) +static int stdin_post_monitor(__a_unused struct sched *s, void *context) { struct stdin_task *sit = context; ssize_t ret; @@ -64,7 +63,7 @@ static int stdin_post_select(struct sched *s, void *context) * reference can not be freed, we're stuck. */ sz = PARA_MIN(sz, btr_pool_size(sit->btrp) / 2); - ret = read_nonblock(STDIN_FILENO, buf, sz, &s->rfds, &n); + ret = read_nonblock(STDIN_FILENO, buf, sz, &n); if (n > 0) btr_add_output_pool(sit->btrp, n, sit->btrn); if (ret >= 0) @@ -91,8 +90,8 @@ void stdin_task_register(struct stdin_task *sit, struct sched *s) int ret; struct task_info ti = { .name = "stdin", - .pre_select = stdin_pre_select, - .post_select = stdin_post_select, + .pre_monitor = stdin_pre_monitor, + .post_monitor = stdin_post_monitor, .context = sit, }; diff --git a/stdout.c b/stdout.c index 1f779109..ba5f1967 100644 --- a/stdout.c +++ b/stdout.c @@ -10,24 +10,24 @@ #include "stdout.h" #include "buffer_tree.h" -/* Add STDOUT_FILENO to the write fd set if there is input data available. */ -static void stdout_pre_select(struct sched *s, void *context) +/* Monitor STDOUT_FILENO if there is input data available. */ +static void stdout_pre_monitor(struct sched *s, void *context) { struct stdout_task *sot = context; int ret; ret = btr_node_status(sot->btrn, 0, BTR_NT_LEAF); if (ret > 0) - para_fd_set(STDOUT_FILENO, &s->wfds, &s->max_fileno); + sched_monitor_writefd(STDOUT_FILENO, s); else if (ret < 0) sched_min_delay(s); } /* - * This function writes input data from the buffer tree to stdout if - * STDOUT_FILENO is writable. + * If input from the buffer tree is available and STDOUT_FILENO is ready, write + * as much as possible. */ -static int stdout_post_select(struct sched *s, void *context) +static int stdout_post_monitor(struct sched *s, void *context) { struct stdout_task *sot = context; struct btr_node *btrn = sot->btrn; @@ -40,7 +40,7 @@ static int stdout_post_select(struct sched *s, void *context) goto out; if (ret == 0) return 0; - if (!FD_ISSET(STDOUT_FILENO, &s->wfds)) + if (!sched_write_ok(STDOUT_FILENO, s)) return 0; if (sot->must_set_nonblock_flag) { @@ -79,8 +79,8 @@ void stdout_task_register(struct stdout_task *sot, struct sched *s) { int ret; struct task_info ti = { - .pre_select = stdout_pre_select, - .post_select = stdout_post_select, + .pre_monitor = stdout_pre_monitor, + .post_monitor = stdout_post_monitor, .context = sot, .name = "stdout", }; diff --git a/sync_filter.c b/sync_filter.c index 8e9ff2c5..3174a4ef 100644 --- a/sync_filter.c +++ b/sync_filter.c @@ -248,7 +248,7 @@ static void sync_set_timeout(struct sync_filter_context *ctx, tv_add(now, &to, &ctx->timeout); } -static void sync_pre_select(struct sched *s, void *context) +static void sync_pre_monitor(struct sched *s, void *context) { int ret; struct filter_node *fn = context; @@ -261,7 +261,7 @@ static void sync_pre_select(struct sched *s, void *context) ret = btr_node_status(fn->btrn, 0, BTR_NT_INTERNAL); if (ret < 0) return sched_min_delay(s); - para_fd_set(ctx->listen_fd, &s->rfds, &s->max_fileno); + sched_monitor_readfd(ctx->listen_fd, s); if (ret == 0) return; if (ctx->timeout.tv_sec == 0) { /* must ping buddies */ @@ -284,7 +284,7 @@ static struct sync_buddy *sync_find_buddy(struct sockaddr *addr, return NULL; } -static int sync_post_select(__a_unused struct sched *s, void *context) +static int sync_post_monitor(__a_unused struct sched *s, void *context) { int ret; struct filter_node *fn = context; @@ -324,7 +324,7 @@ static int sync_post_select(__a_unused struct sched *s, void *context) } ctx->ping_sent = true; } - if (FD_ISSET(ctx->listen_fd, &s->rfds)) { + if (sched_read_ok(ctx->listen_fd, s)) { char c; for (;;) { struct sockaddr src_addr; @@ -377,8 +377,8 @@ out: const struct filter lsg_filter_cmd_com_sync_user_data = { .setup = sync_setup, .open = sync_open, - .pre_select = sync_pre_select, - .post_select = sync_post_select, + .pre_monitor = sync_pre_monitor, + .post_monitor = sync_post_monitor, .close = sync_close, .teardown = sync_teardown }; diff --git a/udp_recv.c b/udp_recv.c index 58d45ab4..8d1274bc 100644 --- a/udp_recv.c +++ b/udp_recv.c @@ -23,13 +23,13 @@ #include "net.h" #include "fd.h" -static void udp_recv_pre_select(struct sched *s, void *context) +static void udp_recv_pre_monitor(struct sched *s, void *context) { struct receiver_node *rn = context; - if (generic_recv_pre_select(s, rn) <= 0) + if (generic_recv_pre_monitor(s, rn) <= 0) return; - para_fd_set(rn->fd, &s->rfds, &s->max_fileno); + sched_monitor_readfd(rn->fd, s); } static int udp_check_eof(size_t sz, struct iovec iov[2]) @@ -50,7 +50,7 @@ static int udp_check_eof(size_t sz, struct iovec iov[2]) return -E_RECV_EOF; } -static int udp_recv_post_select(__a_unused struct sched *s, void *context) +static int udp_recv_post_monitor(__a_unused struct sched *s, void *context) { struct receiver_node *rn = context; struct btr_node *btrn = rn->btrn; @@ -68,7 +68,7 @@ static int udp_recv_post_select(__a_unused struct sched *s, void *context) ret = -E_UDP_OVERRUN; if (iovcnt == 0) goto out; - ret = readv_nonblock(rn->fd, iov, iovcnt, &s->rfds, &num_bytes); + ret = readv_nonblock(rn->fd, iov, iovcnt, &num_bytes); if (num_bytes == 0) goto out; readv_ret = ret; @@ -189,6 +189,6 @@ err: const struct receiver lsg_recv_cmd_com_udp_user_data = { .open = udp_recv_open, .close = udp_recv_close, - .pre_select = udp_recv_pre_select, - .post_select = udp_recv_post_select, + .pre_monitor = udp_recv_pre_monitor, + .post_monitor = udp_recv_post_monitor, }; diff --git a/udp_send.c b/udp_send.c index 68d75e3c..ac656ff2 100644 --- a/udp_send.c +++ b/udp_send.c @@ -21,8 +21,8 @@ #include "net.h" #include "server.h" #include "list.h" -#include "send.h" #include "sched.h" +#include "send.h" #include "vss.h" #include "portable_io.h" #include "fd.h" diff --git a/vss.c b/vss.c index f9bf57b5..4f270c4a 100644 --- a/vss.c +++ b/vss.c @@ -28,8 +28,8 @@ #include "net.h" #include "server.h" #include "list.h" -#include "send.h" #include "sched.h" +#include "send.h" #include "vss.h" #include "ipc.h" #include "fd.h" @@ -43,7 +43,7 @@ const struct sender * const senders[] = { enum afs_socket_status { /** Socket is inactive. */ AFS_SOCKET_READY, - /** Socket fd was included in the write fd set for select(). */ + /** Socket fd was monitored for writing. */ AFS_SOCKET_CHECK_FOR_WRITE, /** vss wrote a request to the socket and waits for reply from afs. */ AFS_SOCKET_AFD_PENDING @@ -827,7 +827,7 @@ static void vss_compute_timeout(struct sched *s, struct vss_task *vsst) if (sched_request_barrier(&vsst->data_send_barrier, s) == 1) return; /* - * Compute the select timeout as the minimal time until the next + * Compute the I/O timeout as the minimal time until the next * chunk/slice is due for any client. */ compute_chunk_time(mmd->chunks_sent, &mmd->afd.afhi.chunk_tv, @@ -892,21 +892,21 @@ static void set_mmd_offset(void) mmd->offset = tv2ms(&offset); } -static void vss_pre_select(struct sched *s, void *context) +static void vss_pre_monitor(struct sched *s, void *context) { int i; struct vss_task *vsst = context; if (need_to_request_new_audio_file(vsst)) { PARA_DEBUG_LOG("ready and playing, but no audio file\n"); - para_fd_set(vsst->afs_socket, &s->wfds, &s->max_fileno); + sched_monitor_writefd(vsst->afs_socket, s); vsst->afsss = AFS_SOCKET_CHECK_FOR_WRITE; } else - para_fd_set(vsst->afs_socket, &s->rfds, &s->max_fileno); + sched_monitor_readfd(vsst->afs_socket, s); FOR_EACH_SENDER(i) { - if (!senders[i]->pre_select) + if (!senders[i]->pre_monitor) continue; - senders[i]->pre_select(&s->max_fileno, &s->rfds, &s->wfds); + senders[i]->pre_monitor(s); } vss_compute_timeout(s, vsst); } @@ -950,13 +950,13 @@ static int recv_afs_msg(int afs_socket, int *fd, uint32_t *code, uint32_t *data) #define MAP_POPULATE 0 #endif -static void recv_afs_result(struct vss_task *vsst, fd_set *rfds) +static void recv_afs_result(struct vss_task *vsst, const struct sched *s) { int ret, passed_fd, shmid; uint32_t afs_code = 0, afs_data = 0; struct stat statbuf; - if (!FD_ISSET(vsst->afs_socket, rfds)) + if (!sched_read_ok(vsst->afs_socket, s)) return; ret = recv_afs_msg(vsst->afs_socket, &passed_fd, &afs_code, &afs_data); if (ret == -ERRNO_TO_PARA_ERROR(EAGAIN)) @@ -1016,7 +1016,7 @@ err: /** * Main sending function. * - * This function gets called from vss_post_select(). It checks whether the next + * This function gets called from vss_post_monitor(). It checks whether the next * chunk of data should be pushed out. It obtains a pointer to the data to be * sent out as well as its length from mmd->afd.afhi. This information is then * passed to each supported sender's send() function as well as to the send() @@ -1087,7 +1087,7 @@ static void vss_send(struct vss_task *vsst) mmd->current_chunk++; } -static int vss_post_select(struct sched *s, void *context) +static int vss_post_monitor(struct sched *s, void *context) { int ret, i; struct vss_task *vsst = context; @@ -1137,8 +1137,8 @@ static int vss_post_select(struct sched *s, void *context) mmd->sender_cmd_data.cmd_num = -1; } if (vsst->afsss != AFS_SOCKET_CHECK_FOR_WRITE) - recv_afs_result(vsst, &s->rfds); - else if (FD_ISSET(vsst->afs_socket, &s->wfds)) { + recv_afs_result(vsst, s); + else if (sched_write_ok(vsst->afs_socket, s)) { PARA_INFO_LOG("requesting new fd from afs\n"); ret = write_buffer(vsst->afs_socket, "new"); if (ret < 0) @@ -1147,9 +1147,9 @@ static int vss_post_select(struct sched *s, void *context) vsst->afsss = AFS_SOCKET_AFD_PENDING; } FOR_EACH_SENDER(i) { - if (!senders[i]->post_select) + if (!senders[i]->post_monitor) continue; - senders[i]->post_select(&s->rfds, &s->wfds); + senders[i]->post_monitor(s); } if ((vss_playing() && !(mmd->vss_status_flags & VSS_PLAYING)) || (vss_next() && vss_playing())) @@ -1194,8 +1194,8 @@ void vss_init(int afs_socket, struct sched *s) } vsst->task = task_register(&(struct task_info) { .name = "vss", - .pre_select = vss_pre_select, - .post_select = vss_post_select, + .pre_monitor = vss_pre_monitor, + .post_monitor = vss_post_monitor, .context = vsst, }, s); } diff --git a/wav_filter.c b/wav_filter.c index e749160d..692306b5 100644 --- a/wav_filter.c +++ b/wav_filter.c @@ -58,7 +58,7 @@ static void wav_open(struct filter_node *fn) *bof = 1; } -static void wav_pre_select(struct sched *s, void *context) +static void wav_pre_monitor(struct sched *s, void *context) { struct filter_node *fn = context; size_t iqs = btr_get_input_queue_size(fn->btrn); @@ -68,7 +68,7 @@ static void wav_pre_select(struct sched *s, void *context) sched_min_delay(s); } -static int wav_post_select(__a_unused struct sched *s, void *context) +static int wav_post_monitor(__a_unused struct sched *s, void *context) { struct filter_node *fn = context; struct btr_node *btrn = fn->btrn; @@ -118,6 +118,6 @@ err: const struct filter lsg_filter_cmd_com_wav_user_data = { .close = wav_close, .open = wav_open, - .pre_select = wav_pre_select, - .post_select = wav_post_select, + .pre_monitor = wav_pre_monitor, + .post_monitor = wav_post_monitor, }; diff --git a/wmadec_filter.c b/wmadec_filter.c index edf50cb0..8061f9ae 100644 --- a/wmadec_filter.c +++ b/wmadec_filter.c @@ -16,7 +16,6 @@ #include #include -#include #include "para.h" #include "error.h" @@ -1159,7 +1158,7 @@ static int wmadec_execute(struct btr_node *btrn, const char *cmd, char **result) #define WMA_OUTPUT_BUFFER_SIZE (128 * 1024) -static int wmadec_post_select(__a_unused struct sched *s, void *context) +static int wmadec_post_monitor(__a_unused struct sched *s, void *context) { struct filter_node *fn = context; int ret, converted, out_size; @@ -1229,6 +1228,6 @@ const struct filter lsg_filter_cmd_com_wmadec_user_data = { .open = wmadec_open, .close = wmadec_close, .execute = wmadec_execute, - .pre_select = generic_filter_pre_select, - .post_select = wmadec_post_select, + .pre_monitor = generic_filter_pre_monitor, + .post_monitor = wmadec_post_monitor, }; diff --git a/write.c b/write.c index acfb9460..9e2de3d8 100644 --- a/write.c +++ b/write.c @@ -54,16 +54,16 @@ struct write_task { struct check_wav_context *cwc; }; -static void write_pre_select(struct sched *s, void *context) +static void write_pre_monitor(struct sched *s, void *context) { struct write_task *wt = context; - check_wav_pre_select(s, wt->cwc); + check_wav_pre_monitor(s, wt->cwc); } -static int write_post_select(__a_unused struct sched *s, void *context) +static int write_post_monitor(__a_unused struct sched *s, void *context) { struct write_task *wt = context; - return check_wav_post_select(wt->cwc); + return check_wav_post_monitor(wt->cwc); } static int setup_and_schedule(struct lls_parse_result *lpr) @@ -83,8 +83,8 @@ static int setup_and_schedule(struct lls_parse_result *lpr) wt.cwc = check_wav_init(sit.btrn, NULL, &wp, &cw_btrn); wt.task = task_register(&(struct task_info) { .name = "write", - .pre_select = write_pre_select, - .post_select = write_post_select, + .pre_monitor = write_pre_monitor, + .post_monitor = write_post_monitor, .context = &wt, }, &s); @@ -96,8 +96,7 @@ static int setup_and_schedule(struct lls_parse_result *lpr) wns[i].wid = check_writer_arg_or_die(arg, &wns[i].lpr); register_writer_node(wns + i, cw_btrn, &s); } - s.default_timeout.tv_sec = 10; - s.default_timeout.tv_usec = 50000; + s.default_timeout = 10500; ret = schedule(&s); if (ret >= 0) { int j, ts; diff --git a/write.h b/write.h index 833cb69a..35a8d29f 100644 --- a/write.h +++ b/write.h @@ -20,21 +20,23 @@ struct writer_node { size_t min_iqs; }; -/** Describes one supported writer. */ +/** + * Describes a data sink for audio streams. + * + * A paraslash writer obtains data via the buffer tree mechanism from its + * parent node. It consumes data without producing output on its own. + * + * This structure contains the methods which have to be implemented by each + * writer. + * + * \sa struct \ref writer_node, struct \ref receiver, struct \ref filter, + * struct \ref sched. + */ struct writer { - /** - * Prepare the fd sets for select. - * - * This is called from scheduler. It may use the sched pointer to add - * any file descriptors or to decrease the select timeout. - */ - void (*pre_select)(struct sched *s, void *context); - /** - * Write audio data. - * - * Called from the post_select function of the writer node's task. - */ - int (*post_select)(struct sched *s, void *context); + /** Ask the scheduler to check whether data can be written. */ + void (*pre_monitor)(struct sched *s, void *context); + /** Write audio data. */ + int (*post_monitor)(struct sched *s, void *context); /** * Close one instance of the writer. * diff --git a/write_common.c b/write_common.c index 41c3eb23..806d682f 100644 --- a/write_common.c +++ b/write_common.c @@ -139,8 +139,8 @@ void register_writer_node(struct writer_node *wn, struct btr_node *parent, .handler = w->execute, .context = wn)); wn->task = task_register(&(struct task_info) { .name = writer_name(wn->wid), - .pre_select = w->pre_select, - .post_select = w->post_select, + .pre_monitor = w->pre_monitor, + .post_monitor = w->post_monitor, .context = wn, }, s); }