]> git.tuebingen.mpg.de Git - paraslash.git/commitdiff
Merge branch 'refs/heads/t/poll'
authorAndre Noll <maan@tuebingen.mpg.de>
Sun, 11 Sep 2022 14:34:12 +0000 (16:34 +0200)
committerAndre Noll <maan@tuebingen.mpg.de>
Sun, 11 Sep 2022 14:35:30 +0000 (16:35 +0200)
This series modifies all calls to select(2) to use poll(2) instead in
order to avoid the known shortcomings of the select API, in particular
its limit of at most 1024 file descriptors and the fact that fds above
1023 cannot be monitored with select(2) even if fewer than 1024 fds
are open.

The first patches of the series prepare this switch, converting the
easy cases, hiding select specific data structures such as fd sets,
and adjusting function names and documentation. The crucial commit
is the last one. See its rather verbose log message for details.

* refs/heads/t/poll:
  Switch from select(2) to poll(2).
  Rename ->{pre,post}_select methods to ->{pre,post}_monitor.
  Misc documentation cleanups related to select().
  stdin/stdout: Streamline documentation of {pre,post}_select().
  Consolidate receiver/filter/writer {pre,post}_select() docs.
  Hide implementation of para_fd_set().
  send: Avoid select-specific arguments in {pre,post}_select().
  sched: Introduce sched_{read,write}_ok().
  audiod: Rename handle_connect().
  net: Drop fd_set parameter from para_accept().
  fd: Drop fd_set parameter from read_nonblock() and friends.
  interactive: Avoid select(2) in input_available().
  fd.c: Prefer poll(2) over select(2) for write_ok().
  sched: Use integer value for select timeout.

67 files changed:
NEWS.md
aacdec_filter.c
afh_recv.c
afs.c
alsa_write.c
amp_filter.c
ao_write.c
audioc.c
audiod.c
audiod.h
audiod_command.c
buffer_tree.c
check_wav.c
check_wav.h
client.c
client_common.c
command.c
compress_filter.c
configure.ac
dccp_recv.c
dccp_send.c
fd.c
fd.h
fecdec_filter.c
file_write.c
filter.c
filter.h
filter_common.c
flacdec_filter.c
grab_client.c
gui.c
http_recv.c
http_send.c
interactive.c
interactive.h
mp3dec_filter.c
net.c
net.h
oggdec_filter.c
opusdec_filter.c
oss_write.c
para.h
play.c
prebuffer_filter.c
recv.c
recv.h
recv_common.c
resample_filter.c
sched.c
sched.h
send.h
send_common.c
server.c
signal.c
signal.h
spxdec_filter.c
stdin.c
stdout.c
sync_filter.c
udp_recv.c
udp_send.c
vss.c
wav_filter.c
wmadec_filter.c
write.c
write.h
write_common.c

diff --git a/NEWS.md b/NEWS.md
index e56aace3e37bbf67801808e2d26dfa878e9de27b..0f2eec0d5e232f5ff11a74bd11860e77ea0e7e56 100644 (file)
--- a/NEWS.md
+++ b/NEWS.md
@@ -13,6 +13,9 @@ NEWS
   to install faad from source to get support for aac/m4a files. The
   faad decoder package must still be installed.
 
+- All calls to select(2) have been replaced by calls to poll(2)
+  to avoid known shortcomings of the select API.
+
 [tarball](./releases/paraslash-git.tar.xz)
 
 ----------------------------------
index a2459d82b31991a8e9ac578e0559a398c83f4e10..36a783c5ecf6ae4538a55fb8c61669707bd4a9b6 100644 (file)
@@ -74,7 +74,7 @@ static void aacdec_close(struct filter_node *fn)
        fn->private_data = NULL;
 }
 
-static int aacdec_post_select(__a_unused struct sched *s, void *context)
+static int aacdec_post_monitor(__a_unused struct sched *s, void *context)
 {
        struct filter_node *fn = context;
        struct btr_node *btrn = fn->btrn;
@@ -158,7 +158,7 @@ err:
 const struct filter lsg_filter_cmd_com_aacdec_user_data = {
        .open = aacdec_open,
        .close = aacdec_close,
-       .pre_select = generic_filter_pre_select,
-       .post_select = aacdec_post_select,
+       .pre_monitor = generic_filter_pre_monitor,
+       .post_monitor = aacdec_post_monitor,
        .execute = aacdec_execute
 };
index 6a0ec239bbcbd120efd130b5f8f24c2b136d4516..889fdce821f4b9aec85c1b01976e4fae6e05e176 100644 (file)
@@ -142,14 +142,14 @@ static void afh_recv_close(struct receiver_node *rn)
        freep(&rn->private_data);
 }
 
-static void afh_recv_pre_select(struct sched *s, void *context)
+static void afh_recv_pre_monitor(struct sched *s, void *context)
 {
        struct receiver_node *rn = context;
        struct private_afh_recv_data *pard = rn->private_data;
        struct afh_info *afhi = &pard->afhi;
        struct lls_parse_result *lpr = rn->lpr;
        struct timeval chunk_time;
-       int state = generic_recv_pre_select(s, rn);
+       int state = generic_recv_pre_monitor(s, rn);
        unsigned j_given = RECV_CMD_OPT_GIVEN(AFH, JUST_IN_TIME, lpr);
 
        if (state <= 0)
@@ -163,7 +163,7 @@ static void afh_recv_pre_select(struct sched *s, void *context)
        sched_request_barrier_or_min_delay(&chunk_time, s);
 }
 
-static int afh_recv_post_select(__a_unused struct sched *s, void *context)
+static int afh_recv_post_monitor(__a_unused struct sched *s, void *context)
 {
        struct receiver_node *rn = context;
        struct lls_parse_result *lpr = rn->lpr;
@@ -242,7 +242,7 @@ out:
 const struct receiver lsg_recv_cmd_com_afh_user_data = {
        .open = afh_recv_open,
        .close = afh_recv_close,
-       .pre_select = afh_recv_pre_select,
-       .post_select = afh_recv_post_select,
+       .pre_monitor = afh_recv_pre_monitor,
+       .post_monitor = afh_recv_post_monitor,
        .execute = afh_execute,
 };
diff --git a/afs.c b/afs.c
index 710670255b2ec1cf67b9ab4e74823bfe19dd3a02..febe13b3e7c561cf2082ab9200c6578806ab8026 100644 (file)
--- a/afs.c
+++ b/afs.c
@@ -707,7 +707,7 @@ static int open_afs_tables(void)
        return ret;
 }
 
-static int afs_signal_post_select(struct sched *s, __a_unused void *context)
+static int afs_signal_post_monitor(struct sched *s, __a_unused void *context)
 {
        int signum, ret;
 
@@ -715,7 +715,7 @@ static int afs_signal_post_select(struct sched *s, __a_unused void *context)
                PARA_EMERG_LOG("para_server died\n");
                goto shutdown;
        }
-       signum = para_next_signal(&s->rfds);
+       signum = para_next_signal();
        if (signum == 0)
                return 0;
        if (signum == SIGHUP) {
@@ -743,8 +743,8 @@ static void register_signal_task(struct sched *s)
 
        signal_task->task = task_register(&(struct task_info) {
                .name = "signal",
-               .pre_select = signal_pre_select,
-               .post_select = afs_signal_post_select,
+               .pre_monitor = signal_pre_monitor,
+               .post_monitor = afs_signal_post_monitor,
                .context = signal_task,
 
        }, s);
@@ -762,15 +762,15 @@ struct afs_client {
        struct timeval connect_time;
 };
 
-static void command_pre_select(struct sched *s, void *context)
+static void command_pre_monitor(struct sched *s, void *context)
 {
        struct command_task *ct = context;
        struct afs_client *client;
 
-       para_fd_set(server_socket, &s->rfds, &s->max_fileno);
-       para_fd_set(ct->fd, &s->rfds, &s->max_fileno);
+       sched_monitor_readfd(server_socket, s);
+       sched_monitor_readfd(ct->fd, s);
        list_for_each_entry(client, &afs_client_list, node)
-               para_fd_set(client->fd, &s->rfds, &s->max_fileno);
+               sched_monitor_readfd(client->fd, s);
 }
 
 /**
@@ -862,11 +862,11 @@ static int call_callback(int fd, int query_shmid)
        return ret;
 }
 
-static int execute_server_command(fd_set *rfds)
+static int execute_server_command(void)
 {
        char buf[8];
        size_t n;
-       int ret = read_nonblock(server_socket, buf, sizeof(buf) - 1, rfds, &n);
+       int ret = read_nonblock(server_socket, buf, sizeof(buf) - 1, &n);
 
        if (ret < 0 || n == 0)
                return ret;
@@ -877,13 +877,13 @@ static int execute_server_command(fd_set *rfds)
 }
 
 /* returns 0 if no data available, 1 else */
-static int execute_afs_command(int fd, fd_set *rfds)
+static int execute_afs_command(int fd)
 {
        uint32_t cookie;
        int query_shmid;
        char buf[sizeof(cookie) + sizeof(query_shmid)];
        size_t n;
-       int ret = read_nonblock(fd, buf, sizeof(buf), rfds, &n);
+       int ret = read_nonblock(fd, buf, sizeof(buf), &n);
 
        if (ret < 0)
                goto err;
@@ -917,7 +917,7 @@ err:
 /** Shutdown connection if query has not arrived until this many seconds. */
 #define AFS_CLIENT_TIMEOUT 3
 
-static int command_post_select(struct sched *s, void *context)
+static int command_post_monitor(struct sched *s, void *context)
 {
        struct command_task *ct = context;
        struct sockaddr_un unix_addr;
@@ -927,7 +927,7 @@ static int command_post_select(struct sched *s, void *context)
        ret = task_get_notification(ct->task);
        if (ret < 0)
                return ret;
-       ret = execute_server_command(&s->rfds);
+       ret = execute_server_command();
        if (ret < 0) {
                PARA_EMERG_LOG("%s\n", para_strerror(-ret));
                task_notify_all(s, -ret);
@@ -935,7 +935,7 @@ static int command_post_select(struct sched *s, void *context)
        }
        /* Check the list of connected clients. */
        list_for_each_entry_safe(client, tmp, &afs_client_list, node) {
-               ret = execute_afs_command(client->fd, &s->rfds);
+               ret = execute_afs_command(client->fd);
                if (ret == 0) { /* prevent bogus connection flooding */
                        struct timeval diff;
                        tv_diff(now, &client->connect_time, &diff);
@@ -948,7 +948,7 @@ static int command_post_select(struct sched *s, void *context)
                free(client);
        }
        /* Accept connections on the local socket. */
-       ret = para_accept(ct->fd, &s->rfds, &unix_addr, sizeof(unix_addr), &fd);
+       ret = para_accept(ct->fd, &unix_addr, sizeof(unix_addr), &fd);
        if (ret < 0)
                PARA_NOTICE_LOG("%s\n", para_strerror(-ret));
        if (ret <= 0)
@@ -973,8 +973,8 @@ static void register_command_task(struct sched *s)
 
        ct->task = task_register(&(struct task_info) {
                .name = "afs command",
-               .pre_select = command_pre_select,
-               .post_select = command_post_select,
+               .pre_monitor = command_pre_monitor,
+               .post_monitor = command_post_monitor,
                .context = ct,
        }, s);
 }
@@ -1003,8 +1003,7 @@ __noreturn void afs_init(int socket_fd)
        PARA_INFO_LOG("server_socket: %d\n", server_socket);
        init_admissible_files(OPT_STRING_VAL(AFS_INITIAL_MODE));
        register_command_task(&s);
-       s.default_timeout.tv_sec = 0;
-       s.default_timeout.tv_usec = 999 * 1000;
+       s.default_timeout = 1000;
        ret = write(socket_fd, "\0", 1);
        if (ret != 1) {
                if (ret == 0)
index bbbf8b650ac86c4952ac46f22c166583018bfb88..2bf3fd0e9010539fa6784e554ae7dc8c21441d3d 100644 (file)
@@ -48,7 +48,7 @@ struct private_alsa_write_data {
        /* time until buffer underrun occurs, in milliseconds */
        unsigned buffer_time;
        struct timeval drain_barrier;
-       /* File descriptor for select(). */
+       /* File descriptor to monitor for reading. */
        int poll_fd;
 };
 
@@ -202,7 +202,7 @@ out:
        return ret;
 }
 
-static void alsa_write_pre_select(struct sched *s, void *context)
+static void alsa_write_pre_monitor(struct sched *s, void *context)
 {
        struct pollfd pfd;
        struct writer_node *wn = context;
@@ -230,7 +230,7 @@ static void alsa_write_pre_select(struct sched *s, void *context)
                return;
        }
        pad->poll_fd = pfd.fd;
-       para_fd_set(pfd.fd, &s->rfds, &s->max_fileno);
+       sched_monitor_readfd(pfd.fd, s);
 }
 
 static void alsa_close(struct writer_node *wn)
@@ -254,7 +254,7 @@ free_pad:
        free(pad);
 }
 
-static int alsa_write_post_select(__a_unused struct sched *s, void *context)
+static int alsa_write_post_monitor(__a_unused struct sched *s, void *context)
 {
        struct writer_node *wn = context;
        struct private_alsa_write_data *pad = wn->private_data;
@@ -321,7 +321,7 @@ again:
        frames = snd_pcm_writei(pad->handle, data, frames);
        if (frames == 0 || frames == -EAGAIN) {
                char buf[100];
-               if (pad->poll_fd >= 0 && FD_ISSET(pad->poll_fd, &s->rfds))
+               if (pad->poll_fd >= 0 && sched_read_ok(pad->poll_fd, s))
                        if (read(pad->poll_fd, buf, 100))
                                do_nothing;
                return 0;
@@ -349,7 +349,7 @@ err:
 
 struct writer lsg_write_cmd_com_alsa_user_data = {
 
-       .pre_select = alsa_write_pre_select,
-       .post_select = alsa_write_post_select,
+       .pre_monitor = alsa_write_pre_monitor,
+       .post_monitor = alsa_write_post_monitor,
        .close = alsa_close,
 };
index 61b1653ea069ce479b6b609da5703fc57bc717c1..9369e4bcbeb1f9c9eec251f5136a34d9e223f8bb 100644 (file)
@@ -43,7 +43,7 @@ static void amp_open(struct filter_node *fn)
                pad->amp, pad->amp / 64.0 + 1.0);
 }
 
-static int amp_post_select(__a_unused struct sched *s, void *context)
+static int amp_post_monitor(__a_unused struct sched *s, void *context)
 {
        struct filter_node *fn = context;
        struct private_amp_data *pad = fn->private_data;
@@ -100,6 +100,6 @@ err:
 const struct filter lsg_filter_cmd_com_amp_user_data = {
        .open = amp_open,
        .close = amp_close,
-       .pre_select = generic_filter_pre_select,
-       .post_select = amp_post_select,
+       .pre_monitor = generic_filter_pre_monitor,
+       .post_monitor = amp_post_monitor,
 };
index 037b92993325539552ddd625b881a843aa57c5a9..41e609b70ab089acc384fb48c4fc40c58657c711 100644 (file)
@@ -42,7 +42,7 @@ static void aow_close(struct writer_node *wn)
        ao_shutdown();
 }
 
-static void aow_pre_select(struct sched *s, void *context)
+static void aow_pre_monitor(struct sched *s, void *context)
 {
        struct writer_node *wn = context;
        struct private_aow_data *pawd = wn->private_data;
@@ -342,7 +342,7 @@ fail:
        return -E_AO_PTHREAD;
 }
 
-static int aow_post_select(__a_unused struct sched *s, void *context)
+static int aow_post_monitor(__a_unused struct sched *s, void *context)
 {
        struct writer_node *wn = context;
        struct private_aow_data *pawd = wn->private_data;
@@ -421,7 +421,7 @@ out:
 
 struct writer lsg_write_cmd_com_ao_user_data = {
        .close = aow_close,
-       .pre_select = aow_pre_select,
-       .post_select = aow_post_select,
+       .pre_monitor = aow_pre_monitor,
+       .post_monitor = aow_post_monitor,
 };
 
index af67063367044b675877173a1ed351bdd28f2dd1..2506c3f8ea6149df47bc84fcb6e5b1b608e70cc5 100644 (file)
--- a/audioc.c
+++ b/audioc.c
@@ -143,17 +143,17 @@ static struct i9e_completer audiod_completers[] = {
        {.name = NULL}
 };
 
-static void audioc_pre_select(struct sched *s, void *context)
+static void audioc_pre_monitor(struct sched *s, void *context)
 {
        struct audioc_task *at = context;
        int ret = btr_node_status(at->btrn, 0, BTR_NT_ROOT);
 
        if (ret < 0)
                sched_min_delay(s);
-       para_fd_set(at->fd, &s->rfds, &s->max_fileno);
+       sched_monitor_readfd(at->fd, s);
 }
 
-static int audioc_post_select(struct sched *s, void *context)
+static int audioc_post_monitor(struct sched *s, void *context)
 {
        char *buf = NULL;
        struct audioc_task *at = context;
@@ -162,7 +162,7 @@ static int audioc_post_select(struct sched *s, void *context)
 
        if (ret < 0)
                goto out;
-       if (!FD_ISSET(at->fd, &s->rfds))
+       if (!sched_read_ok(at->fd, s))
                return 0;
        bufsize = PARA_MAX(1024U, OPT_UINT32_VAL(BUFSIZE));
        buf = para_malloc(bufsize);
@@ -211,8 +211,8 @@ static int audioc_i9e_line_handler(char *line)
                EMBRACE(.name = "audioc line handler"));
        at->task = task_register(&(struct task_info) {
                .name = "audioc",
-               .pre_select = audioc_pre_select,
-               .post_select = audioc_post_select,
+               .pre_monitor = audioc_pre_monitor,
+               .post_monitor = audioc_post_monitor,
                .context = at,
        }, &sched);
        i9e_attach_to_stdout(at->btrn);
@@ -250,9 +250,9 @@ __noreturn static void interactive_session(void)
        sigemptyset(&act.sa_mask);
        act.sa_flags = 0;
        sigaction(SIGINT, &act, NULL);
-       sched.select_function = i9e_select;
+       sched.poll_function = i9e_poll;
 
-       sched.default_timeout.tv_sec = 1;
+       sched.default_timeout = 1000;
        ret = i9e_open(&ici, &sched);
        if (ret < 0)
                goto out;
index 119adbc0be2ddbf24a7d06de4b77a5a3d88bd230..a084558b167af829d2c0f6a0fbd71a2610c1f3b0 100644 (file)
--- a/audiod.c
+++ b/audiod.c
@@ -123,7 +123,7 @@ enum vss_status_flags {
  * This is needed also in audiod_command.c (for the tasks command), so it can
  * not be made static.
  */
-struct sched sched = {.max_fileno = 0};
+struct sched sched = {.timeout = 0};
 
 /* The task for obtaining para_server's status (para_client stat). */
 struct status_task {
@@ -584,8 +584,8 @@ static void open_filters(struct slot_info *s)
                sprintf(buf, "%s (slot %d)", name, (int)(s - slot));
                fn->task = task_register(&(struct task_info) {
                        .name = buf,
-                       .pre_select = f->pre_select,
-                       .post_select = f->post_select,
+                       .pre_monitor = f->pre_monitor,
+                       .post_monitor = f->post_monitor,
                        .context = fn,
                }, &sched);
                parent = fn->btrn;
@@ -648,8 +648,8 @@ static int open_receiver(int format)
                audio_formats[format], name, slot_num);
        rn->task = task_register(&(struct task_info) {
                .name = name,
-               .pre_select = r->pre_select,
-               .post_select = r->post_select,
+               .pre_monitor = r->pre_monitor,
+               .post_monitor = r->post_monitor,
                .context = rn,
        }, &sched);
        return slot_num;
@@ -1055,7 +1055,7 @@ static void init_local_socket(struct command_task *ct)
        exit(EXIT_FAILURE);
 }
 
-static int signal_post_select(struct sched *s, void *context)
+static int signal_post_monitor(struct sched *s, void *context)
 {
        struct signal_task *st = context;
        int ret, signum;
@@ -1063,7 +1063,7 @@ static int signal_post_select(struct sched *s, void *context)
        ret = task_get_notification(st->task);
        if (ret < 0)
                return ret;
-       signum = para_next_signal(&s->rfds);
+       signum = para_next_signal();
        switch (signum) {
        case SIGINT:
        case SIGTERM:
@@ -1075,13 +1075,13 @@ static int signal_post_select(struct sched *s, void *context)
        return 0;
 }
 
-static void command_pre_select(struct sched *s, void *context)
+static void command_pre_monitor(struct sched *s, void *context)
 {
        struct command_task *ct = context;
-       para_fd_set(ct->fd, &s->rfds, &s->max_fileno);
+       sched_monitor_readfd(ct->fd, s);
 }
 
-static int command_post_select(struct sched *s, void *context)
+static int command_post_monitor(struct sched *s, void *context)
 {
        int ret;
        struct command_task *ct = context;
@@ -1092,7 +1092,7 @@ static int command_post_select(struct sched *s, void *context)
        ret = task_get_notification(ct->task);
        if (ret < 0)
                return ret;
-       ret = handle_connect(ct->fd, &s->rfds);
+       ret = dispatch_local_connection(ct->fd);
        if (ret < 0) {
                PARA_NOTICE_LOG("%s\n", para_strerror(-ret));
                if (ret == -E_AUDIOD_TERM) {
@@ -1132,8 +1132,8 @@ static void init_command_task(struct command_task *ct)
 
        ct->task = task_register(&(struct task_info) {
                .name = "command",
-               .pre_select = command_pre_select,
-               .post_select = command_post_select,
+               .pre_monitor = command_pre_monitor,
+               .post_monitor = command_post_monitor,
                .context = ct,
        }, &sched);
 }
@@ -1254,7 +1254,7 @@ static void start_stop_decoders(void)
        audiod_status_dump(true);
 }
 
-static void status_pre_select(struct sched *s, void *context)
+static void status_pre_monitor(struct sched *s, void *context)
 {
        struct status_task *st = context;
        int i, ret, cafn = stat_task->current_audio_format_num;
@@ -1286,7 +1286,7 @@ min_delay:
 }
 
 /* restart the client task if necessary */
-static int status_post_select(struct sched *s, void *context)
+static int status_post_monitor(struct sched *s, void *context)
 {
        struct status_task *st = context;
        int ret;
@@ -1377,8 +1377,8 @@ static void init_status_task(struct status_task *st)
 
        stat_task->task = task_register(&(struct task_info) {
                .name = "stat",
-               .pre_select = status_pre_select,
-               .post_select = status_post_select,
+               .pre_monitor = status_pre_monitor,
+               .post_monitor = status_post_monitor,
                .context = stat_task,
        }, &sched);
 }
@@ -1505,13 +1505,12 @@ int main(int argc, char *argv[])
 
        signal_task->task = task_register(&(struct task_info) {
                .name = "signal",
-               .pre_select = signal_pre_select,
-               .post_select = signal_post_select,
+               .pre_monitor = signal_pre_monitor,
+               .post_monitor = signal_post_monitor,
                .context = signal_task,
        }, &sched);
 
-       sched.default_timeout.tv_sec = 2;
-       sched.default_timeout.tv_usec = 999 * 1000;
+       sched.default_timeout = 2999;
        ret = schedule(&sched);
        audiod_cleanup();
        sched_shutdown(&sched);
index b40fdd6743faa4650888c2bdf69756d6c12e40d8..dedb038fbf772848330b024b93efabce58c1c9d3 100644 (file)
--- a/audiod.h
+++ b/audiod.h
@@ -21,5 +21,5 @@ bool uid_is_whitelisted(uid_t uid);
 /* defined in audiod_command.c */
 void audiod_status_dump(bool force);
 void close_stat_clients(void);
-int handle_connect(int accept_fd, fd_set *rfds);
+int dispatch_local_connection(int accept_fd);
 void stat_client_write_item(int item_num);
index bb54dfab87f7965a18f6cccbe6bc4077c147fa29..795e2ac84657150a1a8d64947e30a7ba4a217796 100644 (file)
@@ -360,10 +360,9 @@ EXPORT_AUDIOD_CMD_HANDLER(version)
  * Handle arriving connections on the local socket.
  *
  * \param accept_fd The fd to accept connections on.
- * \param rfds If \a accept_fd is not set in \a rfds, do nothing.
  *
- * This is called in each iteration of the select loop. If there is an incoming
- * connection on \a accept_fd, this function reads the command sent by the peer,
+ * This is called in each iteration of the main loop of the scheduler. If there
+ * is an incoming connection, the function reads the command sent by the peer,
  * checks the connecting user's permissions by using unix socket credentials
  * (if supported by the OS) and calls the corresponding command handler if
  * permissions are OK.
@@ -372,8 +371,8 @@ EXPORT_AUDIOD_CMD_HANDLER(version)
  * connection to accept.
  *
  * \sa \ref para_accept(), \ref recv_cred_buffer().
- * */
-int handle_connect(int accept_fd, fd_set *rfds)
+ */
+int dispatch_local_connection(int accept_fd)
 {
        int argc, ret, clifd;
        char buf[MAXLINE], **argv = NULL;
@@ -384,7 +383,7 @@ int handle_connect(int accept_fd, fd_set *rfds)
        char *errctx = NULL;
        const struct audiod_command_info *aci;
 
-       ret = para_accept(accept_fd, rfds, &unix_addr, sizeof(struct sockaddr_un), &clifd);
+       ret = para_accept(accept_fd, &unix_addr, sizeof(struct sockaddr_un), &clifd);
        if (ret <= 0)
                return ret;
        ret = recv_cred_buffer(clifd, buf, sizeof(buf) - 1);
index f0d2002d13c28b31c17f7130a4f3c34fc858bd58..49e40fb952c2e6c36a62001b1bc0a46601f412e6 100644 (file)
@@ -570,7 +570,7 @@ bool btr_no_parent(struct btr_node *btrn)
  * buffer.
  *
  * Since the buffer tree may change at any time, this function should be called
- * during each post_select call.
+ * during each post_monitor call.
  *
  * \return True if \a btrn has no siblings.
  */
@@ -1181,7 +1181,7 @@ struct btr_node *btr_search_node(const char *name, struct btr_node *root)
  * \param type The supposed type of \a btrn.
  *
  * Most users of the buffer tree subsystem call this function from both
- * their pre_select and the post_select methods.
+ * their ->pre_monitor() and ->post_monitor() methods.
  *
  * \return Negative if an error condition was detected, zero if there
  * is nothing to do and positive otherwise.
index 89ebdacc0805e9575acea92d09ed3b2dd81ebac3..100975dc79c4b5a0daf78facfbc632d2e9972916 100644 (file)
@@ -39,15 +39,15 @@ struct check_wav_context {
 };
 
 /**
- * Set select timeout according to the given context.
+ * Request a minimal timeout if not idle.
  *
- * \param s Contains the timeval that should be set.
- * \param cwc Contains a pointer to the buffer tree node.
+ * \param s The scheduler instance.
+ * \param cwc The buffer tree node is derived from this.
  *
- * This requests a minimal timeout from the scheduler if btrn of \a cwc is not
- * idle.
+ * If no data is available and the buffer tree node is not in error state, the
+ * function does nothing.
  */
-void check_wav_pre_select(struct sched *s, struct check_wav_context *cwc)
+void check_wav_pre_monitor(struct sched *s, struct check_wav_context *cwc)
 {
        int ret = btr_node_status(cwc->btrn, cwc->min_iqs, BTR_NT_INTERNAL);
        if (ret != 0)
@@ -121,7 +121,7 @@ out:
  *
  * \return Standard.
  */
-int check_wav_post_select(struct check_wav_context *cwc)
+int check_wav_post_monitor(struct check_wav_context *cwc)
 {
        struct btr_node *btrn = cwc->btrn;
        unsigned char *a;
@@ -198,8 +198,8 @@ out:
  * children of this node can figure out channel count, sample rate, etc.
  *
  * \return The (opaque) handle of the newly created check_wav instance. It is
- * supposed to be passed to \ref check_wav_pre_select() and \ref
- * check_wav_post_select().
+ * supposed to be passed to \ref check_wav_pre_monitor() and \ref
+ * check_wav_post_monitor().
  *
  * \sa \ref btr_new_node.
  */
@@ -225,7 +225,7 @@ struct check_wav_context *check_wav_init(struct btr_node *parent,
  *
  * \param cwc Determines the instance to shut down.
  *
- * This function may only be called after check_wav_post_select() has returned
+ * This function may only be called after check_wav_post_monitor() has returned
  * negative.
  */
 void check_wav_shutdown(struct check_wav_context *cwc)
index 79b11962baea9b3fe743f08ad68bca6a70e552bc..e6188c5243db16d82c524fc47f4066c8f9aec9df 100644 (file)
@@ -42,6 +42,6 @@ struct wav_params {
 struct check_wav_context *check_wav_init(struct btr_node *parent,
                struct btr_node *child, struct wav_params *params,
                struct btr_node **cw_btrn);
-void check_wav_pre_select(struct sched *s, struct check_wav_context *cwc);
-int check_wav_post_select(struct check_wav_context *cwc);
+void check_wav_pre_monitor(struct sched *s, struct check_wav_context *cwc);
+int check_wav_post_monitor(struct check_wav_context *cwc);
 void check_wav_shutdown(struct check_wav_context *cwc);
index 8caf44839919fb33c1368715c528f507ebc7a5c9..ed0d5e02f7881409b4f5c5ce9735a5a42e486555 100644 (file)
--- a/client.c
+++ b/client.c
@@ -42,7 +42,7 @@ struct exec_task {
        size_t result_size;
 };
 
-static void exec_pre_select(struct sched *s, void *context)
+static void exec_pre_monitor(struct sched *s, void *context)
 {
        struct exec_task *et = context;
        int ret = btr_node_status(et->btrn, 0, BTR_NT_LEAF);
@@ -51,7 +51,7 @@ static void exec_pre_select(struct sched *s, void *context)
                sched_min_delay(s);
 }
 
-static int exec_post_select(__a_unused struct sched *s, void *context)
+static int exec_post_monitor(__a_unused struct sched *s, void *context)
 {
        struct exec_task *et = context;
        struct btr_node *btrn = et->btrn;
@@ -123,7 +123,7 @@ fail:
 static int execute_client_command(const char *cmd, char **result)
 {
        int ret;
-       struct sched command_sched = {.default_timeout = {.tv_sec = 1}};
+       struct sched command_sched = {.default_timeout = 1000};
        struct exec_task exec_task = {
                .result_buf = para_strdup(""),
                .result_size = 1,
@@ -138,8 +138,8 @@ static int execute_client_command(const char *cmd, char **result)
                EMBRACE(.name = "exec_collect"));
        exec_task.task = task_register(&(struct task_info) {
                .name = "client exec",
-               .pre_select = exec_pre_select,
-               .post_select = exec_post_select,
+               .pre_monitor = exec_pre_monitor,
+               .post_monitor = exec_post_monitor,
                .context = &exec_task,
        }, &command_sched);
        ret = client_connect(ct, &command_sched, NULL, exec_task.btrn);
@@ -532,7 +532,7 @@ __noreturn static void interactive_session(void)
        sigemptyset(&act.sa_mask);
        act.sa_flags = 0;
        sigaction(SIGINT, &act, NULL);
-       sched.select_function = i9e_select;
+       sched.poll_function = i9e_poll;
 
        ret = i9e_open(&ici, &sched);
        if (ret < 0)
@@ -578,7 +578,7 @@ struct supervisor_task {
        struct task *task;
 };
 
-static int supervisor_post_select(struct sched *s, void *context)
+static int supervisor_post_monitor(struct sched *s, void *context)
 {
        struct supervisor_task *svt = context;
        int ret = task_status(ct->task);
@@ -624,7 +624,7 @@ int main(int argc, char *argv[])
        int ret;
 
        crypt_init();
-       sched.default_timeout.tv_sec = 1;
+       sched.default_timeout = 1000;
 
        ret = client_parse_config(argc, argv, &ct, &client_loglevel);
        if (ret < 0)
@@ -648,7 +648,7 @@ int main(int argc, char *argv[])
                EMBRACE(.name = "stdout", .parent = ct->btrn[0]));
        supervisor_task.task = task_register(&(struct task_info) {
                .name = "supervisor",
-               .post_select = supervisor_post_select,
+               .post_monitor = supervisor_post_monitor,
                .context = &supervisor_task,
        }, &sched);
 
index f8ee80c9d7a01956769284eaa117f1606c59cd76..3beeed1f49effebe1d6f92ceea0fdebb589a7fe6 100644 (file)
@@ -57,7 +57,7 @@ void client_close(struct client_task *ct)
  * The context pointer is assumed to refer to a client task structure that was
  * initialized earlier by client_open().
  */
-static void client_pre_select(struct sched *s, void *context)
+static void client_pre_monitor(struct sched *s, void *context)
 {
        int ret;
        struct client_task *ct = context;
@@ -68,13 +68,13 @@ static void client_pre_select(struct sched *s, void *context)
        case CL_CONNECTED:
        case CL_SENT_AUTH:
        case CL_SENT_CH_RESPONSE:
-               para_fd_set(ct->scc.fd, &s->rfds, &s->max_fileno);
+               sched_monitor_readfd(ct->scc.fd, s);
                return;
 
        case CL_RECEIVED_WELCOME:
        case CL_RECEIVED_PROCEED:
        case CL_RECEIVED_CHALLENGE:
-               para_fd_set(ct->scc.fd, &s->wfds, &s->max_fileno);
+               sched_monitor_writefd(ct->scc.fd, s);
                return;
 
        case CL_SENDING:
@@ -83,7 +83,7 @@ static void client_pre_select(struct sched *s, void *context)
                        if (ret < 0)
                                sched_min_delay(s);
                        else if (ret > 0)
-                               para_fd_set(ct->scc.fd, &s->wfds, &s->max_fileno);
+                               sched_monitor_writefd(ct->scc.fd, s);
                }
                __attribute__ ((fallthrough));
        case CL_EXECUTING:
@@ -92,7 +92,7 @@ static void client_pre_select(struct sched *s, void *context)
                        if (ret < 0)
                                sched_min_delay(s);
                        else if (ret > 0)
-                               para_fd_set(ct->scc.fd, &s->rfds, &s->max_fileno);
+                               sched_monitor_readfd(ct->scc.fd, s);
                }
                return;
        }
@@ -125,8 +125,7 @@ static int send_sb(struct client_task *ct, int channel, void *buf, size_t numbyt
        return 0;
 }
 
-static int recv_sb(struct client_task *ct, fd_set *rfds,
-               struct sb_buffer *result)
+static int recv_sb(struct client_task *ct, struct sb_buffer *result)
 {
        int ret;
        size_t n;
@@ -134,8 +133,6 @@ static int recv_sb(struct client_task *ct, fd_set *rfds,
        void *trafo_context;
        struct iovec iov;
 
-       if (!FD_ISSET(ct->scc.fd, rfds))
-               return 0;
        if (ct->status < CL_SENT_CH_RESPONSE)
                trafo = trafo_context = NULL;
        else {
@@ -146,7 +143,7 @@ static int recv_sb(struct client_task *ct, fd_set *rfds,
                ct->sbc[0] = sb_new_recv(0, trafo, trafo_context);
 again:
        sb_get_recv_buffer(ct->sbc[0], &iov);
-       ret = read_nonblock(ct->scc.fd, iov.iov_base, iov.iov_len, rfds, &n);
+       ret = read_nonblock(ct->scc.fd, iov.iov_base, iov.iov_len, &n);
        if (ret < 0) {
                sb_free(ct->sbc[0]);
                ct->sbc[0] = NULL;
@@ -274,7 +271,7 @@ static bool has_feature(const char *feature, struct client_task *ct)
  * The context pointer refers to a client task structure that was initialized
  * earlier by client_open().
  */
-static int client_post_select(struct sched *s, void *context)
+static int client_post_monitor(struct sched *s, void *context)
 {
        struct client_task *ct = context;
        int ret = 0;
@@ -288,7 +285,7 @@ static int client_post_select(struct sched *s, void *context)
                return 0;
        switch (ct->status) {
        case CL_CONNECTED: /* receive welcome message */
-               ret = read_nonblock(ct->scc.fd, buf, sizeof(buf), &s->rfds, &n);
+               ret = read_nonblock(ct->scc.fd, buf, sizeof(buf), &n);
                if (ret < 0 || n == 0)
                        goto out;
                ct->features = parse_features(buf);
@@ -302,7 +299,7 @@ static int client_post_select(struct sched *s, void *context)
                 * 0.8.0 we no longer need to request the feature.
                 */
                bool has_sha256;
-               if (!FD_ISSET(ct->scc.fd, &s->wfds))
+               if (!sched_write_ok(ct->scc.fd, s))
                        return 0;
                has_sha256 = has_feature("sha256", ct);
                sprintf(buf, AUTH_REQUEST_MSG "%s%s", ct->user, has_sha256?
@@ -324,7 +321,7 @@ static int client_post_select(struct sched *s, void *context)
                unsigned char crypt_buf[1024];
                struct sb_buffer sbb;
 
-               ret = recv_sb(ct, &s->rfds, &sbb);
+               ret = recv_sb(ct, &sbb);
                if (ret <= 0)
                        goto out;
                if (sbb.band != SBD_CHALLENGE) {
@@ -371,7 +368,7 @@ static int client_post_select(struct sched *s, void *context)
        case CL_SENT_CH_RESPONSE: /* read server response */
                {
                struct sb_buffer sbb;
-               ret = recv_sb(ct, &s->rfds, &sbb);
+               ret = recv_sb(ct, &sbb);
                if (ret <= 0)
                        goto out;
                free(sbb.iov.iov_base);
@@ -383,7 +380,7 @@ static int client_post_select(struct sched *s, void *context)
                }
        case CL_RECEIVED_PROCEED: /* concat args and send command */
                {
-               if (!FD_ISSET(ct->scc.fd, &s->wfds))
+               if (!sched_write_ok(ct->scc.fd, s))
                        return 0;
                ret = send_sb_command(ct);
                if (ret <= 0)
@@ -405,7 +402,7 @@ static int client_post_select(struct sched *s, void *context)
                        }
                        if (ret < 0)
                                goto close1;
-                       if (ret > 0 && FD_ISSET(ct->scc.fd, &s->wfds)) {
+                       if (ret > 0 && sched_write_ok(ct->scc.fd, s)) {
                                sz = btr_next_buffer(ct->btrn[1], &buf2);
                                assert(sz);
                                ret = send_sb(ct, 1, buf2, sz, SBD_BLOB_DATA, true);
@@ -421,9 +418,9 @@ static int client_post_select(struct sched *s, void *context)
                        ret = btr_node_status(ct->btrn[0], 0, BTR_NT_ROOT);
                        if (ret < 0)
                                goto close0;
-                       if (ret > 0 && FD_ISSET(ct->scc.fd, &s->rfds)) {
+                       if (ret > 0 && sched_read_ok(ct->scc.fd, s)) {
                                struct sb_buffer sbb;
-                               ret = recv_sb(ct, &s->rfds, &sbb);
+                               ret = recv_sb(ct, &sbb);
                                if (ret < 0)
                                        goto close0;
                                if (ret > 0) {
@@ -503,8 +500,8 @@ int client_connect(struct client_task *ct, struct sched *s,
 
        ct->task = task_register(&(struct task_info) {
                .name = "client",
-               .pre_select = client_pre_select,
-               .post_select = client_post_select,
+               .pre_monitor = client_pre_monitor,
+               .post_monitor = client_post_monitor,
                .context = ct,
        }, s);
        return 1;
index e3f12931fd9d17fe5489636919a14379d60d0969..200ff054b35f0735cdb41cf09eda14b8b9b77826 100644 (file)
--- a/command.c
+++ b/command.c
@@ -22,8 +22,8 @@
 #include "net.h"
 #include "server.h"
 #include "list.h"
-#include "send.h"
 #include "sched.h"
+#include "send.h"
 #include "vss.h"
 #include "daemon.h"
 #include "fd.h"
index ff4ce6fb7663c5a0c35e1a42b433060b3b162b89..9f9d8515e3794139a65f9f3e7598d3d900829bcc 100644 (file)
@@ -37,7 +37,7 @@ static void compress_close(struct filter_node *fn)
        free(fn->private_data);
 }
 
-static int compress_post_select(__a_unused struct sched *s, void *context)
+static int compress_post_monitor(__a_unused struct sched *s, void *context)
 {
        struct filter_node *fn = context;
        struct private_compress_data *pcd = fn->private_data;
@@ -162,6 +162,6 @@ const struct filter lsg_filter_cmd_com_compress_user_data = {
        .setup = compress_setup,
        .open = compress_open,
        .close = compress_close,
-       .pre_select = generic_filter_pre_select,
-       .post_select = compress_post_select,
+       .pre_monitor = generic_filter_pre_monitor,
+       .post_monitor = compress_post_monitor,
 };
index 9056002acd7010349939e06c2b3ccc3f8cfc5b5b..c72581118326924a25b5ca9b020316439135d60b 100644 (file)
@@ -584,7 +584,7 @@ fi
 if test $HAVE_OSS = yes -o $HAVE_ALSA = yes; then
        build_mixer="yes"
        executables="$executables mixer"
-       mixer_errlist_objs="mixer exec string fd lsu version"
+       mixer_errlist_objs="mixer exec string fd time lsu version"
        if test $HAVE_OSS = yes; then
                mixer_errlist_objs="$mixer_errlist_objs oss_mix"
        fi
@@ -840,6 +840,7 @@ audioc_errlist_objs="
        lsu
        net
        fd
+       time
        version
 "
 if test $HAVE_READLINE = yes; then
@@ -847,7 +848,6 @@ if test $HAVE_READLINE = yes; then
                buffer_tree
                interactive
                sched
-               time
        "
 fi
 audioc_objs="$audioc_errlist_objs"
index 639c93fcbf99bc93ddc054ad43d03f30811270ac..faacd39f8fb0b4377f495a63d115ba5bd3de0336 100644 (file)
@@ -109,16 +109,16 @@ err:
        return ret;
 }
 
-static void dccp_recv_pre_select(struct sched *s, void *context)
+static void dccp_recv_pre_monitor(struct sched *s, void *context)
 {
        struct receiver_node *rn = context;
 
-       if (generic_recv_pre_select(s, rn) <= 0)
+       if (generic_recv_pre_monitor(s, rn) <= 0)
                return;
-       para_fd_set(rn->fd, &s->rfds, &s->max_fileno);
+       sched_monitor_readfd(rn->fd, s);
 }
 
-static int dccp_recv_post_select(struct sched *s, void *context)
+static int dccp_recv_post_monitor(__a_unused struct sched *s, void *context)
 {
        struct receiver_node *rn = context;
        struct btr_node *btrn = rn->btrn;
@@ -136,7 +136,7 @@ static int dccp_recv_post_select(struct sched *s, void *context)
        ret = -E_DCCP_OVERRUN;
        if (iovcnt == 0)
                goto out;
-       ret = readv_nonblock(rn->fd, iov, iovcnt, &s->rfds, &num_bytes);
+       ret = readv_nonblock(rn->fd, iov, iovcnt, &num_bytes);
        if (num_bytes == 0)
                goto out;
        if (num_bytes <= iov[0].iov_len) /* only the first buffer was filled */
@@ -154,6 +154,6 @@ out:
 const struct receiver lsg_recv_cmd_com_dccp_user_data = {
        .open = dccp_recv_open,
        .close = dccp_recv_close,
-       .pre_select = dccp_recv_pre_select,
-       .post_select = dccp_recv_post_select,
+       .pre_monitor = dccp_recv_pre_monitor,
+       .post_monitor = dccp_recv_post_monitor,
 };
index bca7ad6781e29d2bb1bea686d3971e02ede59909..9e9372715c5fac244ed5943d74e3d98089859831 100644 (file)
@@ -24,8 +24,8 @@
 #include "net.h"
 #include "server.h"
 #include "list.h"
-#include "send.h"
 #include "sched.h"
+#include "send.h"
 #include "vss.h"
 #include "fd.h"
 
@@ -36,14 +36,13 @@ struct dccp_fec_client {
        struct fec_client *fc;
 };
 
-static void dccp_pre_select(int *max_fileno, fd_set *rfds,
-               __a_unused fd_set *wfds)
+static void dccp_pre_monitor(struct sched *s)
 {
        unsigned n;
 
        FOR_EACH_LISTEN_FD(n, dss)
                if (dss->listen_fds[n] >= 0)
-                       para_fd_set(dss->listen_fds[n], rfds, max_fileno);
+                       sched_monitor_readfd(dss->listen_fds[n], s);
 }
 
 /**
@@ -119,14 +118,14 @@ static void dccp_send_fec(struct sender_client *sc, char *buf, size_t len)
                dccp_shutdown_client(sc);
 }
 
-static void dccp_post_select(fd_set *rfds, __a_unused fd_set *wfds)
+static void dccp_post_monitor(__a_unused struct sched *s)
 {
        struct sender_client *sc;
        struct dccp_fec_client *dfc;
        int tx_ccid;
        uint32_t k, n;
 
-       sc = accept_sender_client(dss, rfds);
+       sc = accept_sender_client(dss);
        if (!sc)
                return;
 
@@ -249,8 +248,8 @@ const struct sender dccp_sender = {
        .name = "dccp",
        .init = dccp_send_init,
        .shutdown = dccp_shutdown,
-       .pre_select = dccp_pre_select,
-       .post_select = dccp_post_select,
+       .pre_monitor = dccp_pre_monitor,
+       .post_monitor = dccp_post_monitor,
        .shutdown_clients = dccp_shutdown_clients,
        .client_cmds = {
                [SENDER_on] = dccp_com_on,
diff --git a/fd.c b/fd.c
index 33891d2e6c9f1c3568428b1df93b4b92e295ef61..800106e132b2bc21085f65803d70b6bad492f854 100644 (file)
--- a/fd.c
+++ b/fd.c
@@ -176,14 +176,11 @@ __printf_2_3 int write_va_buffer(int fd, const char *fmt, ...)
  * \param fd The file descriptor to read from.
  * \param iov Scatter/gather array used in readv().
  * \param iovcnt Number of elements in \a iov.
- * \param rfds An optional fd set pointer.
  * \param num_bytes Result pointer. Contains the number of bytes read from \a fd.
  *
- * If rfds is not NULL and the (non-blocking) file descriptor fd is not set in
- * rfds, this function returns early without doing anything. Otherwise it tries
- * to read up to sz bytes from fd, where sz is the sum of the lengths of all
- * vectors in iov. Like \ref xwrite(), EAGAIN and EINTR are not considered
- * error conditions. However, EOF is.
+ * This function tries to read up to sz bytes from fd, where sz is the sum of
+ * the lengths of all vectors in iov. Like \ref xwrite(), EAGAIN and EINTR are
+ * not considered error conditions. However, EOF is.
  *
  * \return Zero or a negative error code. If the underlying call to readv(2)
  * returned zero (indicating an end of file condition) or failed for some
@@ -197,24 +194,12 @@ __printf_2_3 int write_va_buffer(int fd, const char *fmt, ...)
  *
  * \sa \ref xwrite(), read(2), readv(2).
  */
-int readv_nonblock(int fd, struct iovec *iov, int iovcnt, fd_set *rfds,
-               size_t *num_bytes)
+int readv_nonblock(int fd, struct iovec *iov, int iovcnt, size_t *num_bytes)
 {
        int ret, i, j;
 
        *num_bytes = 0;
-       /*
-        * Avoid a shortcoming of select(): Reads from a non-blocking fd might
-        * return EAGAIN even if FD_ISSET() returns true. However, FD_ISSET()
-        * returning false definitely means that no data can currently be read.
-        * This is the common case, so it is worth to avoid the overhead of the
-        * read() system call in this case.
-        */
-       if (rfds && !FD_ISSET(fd, rfds))
-               return 0;
-
        for (i = 0, j = 0; i < iovcnt;) {
-
                /* fix up the first iov */
                assert(j < iov[i].iov_len);
                iov[i].iov_base += j;
@@ -251,7 +236,6 @@ int readv_nonblock(int fd, struct iovec *iov, int iovcnt, fd_set *rfds,
  * \param fd The file descriptor to read from.
  * \param buf The buffer to read data to.
  * \param sz The size of \a buf.
- * \param rfds \see \ref readv_nonblock().
  * \param num_bytes \see \ref readv_nonblock().
  *
  * This is a simple wrapper for readv_nonblock() which uses an iovec with a single
@@ -259,10 +243,10 @@ int readv_nonblock(int fd, struct iovec *iov, int iovcnt, fd_set *rfds,
  *
  * \return The return value of the underlying call to readv_nonblock().
  */
-int read_nonblock(int fd, void *buf, size_t sz, fd_set *rfds, size_t *num_bytes)
+int read_nonblock(int fd, void *buf, size_t sz, size_t *num_bytes)
 {
        struct iovec iov = {.iov_base = buf, .iov_len = sz};
-       return readv_nonblock(fd, &iov, 1, rfds, num_bytes);
+       return readv_nonblock(fd, &iov, 1, num_bytes);
 }
 
 /**
@@ -271,7 +255,6 @@ int read_nonblock(int fd, void *buf, size_t sz, fd_set *rfds, size_t *num_bytes)
  * \param fd The file descriptor to receive from.
  * \param pattern The expected pattern.
  * \param bufsize The size of the internal buffer.
- * \param rfds Passed to read_nonblock().
  *
  * This function tries to read at most \a bufsize bytes from the non-blocking
  * file descriptor \a fd. If at least \p strlen(\a pattern) bytes have been
@@ -283,11 +266,11 @@ int read_nonblock(int fd, void *buf, size_t sz, fd_set *rfds, size_t *num_bytes)
  *
  * \sa \ref read_nonblock(), \sa strncasecmp(3).
  */
-int read_pattern(int fd, const char *pattern, size_t bufsize, fd_set *rfds)
+int read_pattern(int fd, const char *pattern, size_t bufsize)
 {
        size_t n, len;
        char *buf = para_malloc(bufsize + 1);
-       int ret = read_nonblock(fd, buf, bufsize, rfds, &n);
+       int ret = read_nonblock(fd, buf, bufsize, &n);
 
        buf[n] = '\0';
        if (ret < 0)
@@ -325,36 +308,6 @@ bool file_exists(const char *fn)
        return !stat(fn, &statbuf);
 }
 
-/**
- * Paraslash's wrapper for select(2).
- *
- * It calls select(2) (with no exceptfds) and starts over if select() was
- * interrupted by a signal.
- *
- * \param n The highest-numbered descriptor in any of the two sets, plus 1.
- * \param readfds fds that should be checked for readability.
- * \param writefds fds that should be checked for writablility.
- * \param timeout_tv upper bound on the amount of time elapsed before select()
- * returns.
- *
- * \return The return value of the underlying select() call on success, the
- * negative system error code on errors.
- *
- * All arguments are passed verbatim to select(2).
- * \sa select(2) select_tut(2).
- */
-int para_select(int n, fd_set *readfds, fd_set *writefds,
-               struct timeval *timeout_tv)
-{
-       int ret;
-       do
-               ret = select(n, readfds, writefds, NULL, timeout_tv);
-       while (ret < 0 && errno == EINTR);
-       if (ret < 0)
-               return -ERRNO_TO_PARA_ERROR(errno);
-       return ret;
-}
-
 /**
  * Set a file descriptor to blocking mode.
  *
@@ -391,34 +344,6 @@ __must_check int mark_fd_nonblocking(int fd)
        return 1;
 }
 
-/**
- * Set a file descriptor in a fd_set.
- *
- * \param fd The file descriptor to be set.
- * \param fds The file descriptor set.
- * \param max_fileno Highest-numbered file descriptor.
- *
- * This wrapper for FD_SET() passes its first two arguments to \p FD_SET. Upon
- * return, \a max_fileno contains the maximum of the old_value and \a fd.
- *
- * \sa \ref para_select.
-*/
-void para_fd_set(int fd, fd_set *fds, int *max_fileno)
-{
-       assert(fd >= 0 && fd < FD_SETSIZE);
-#if 0
-       {
-               int flags = fcntl(fd, F_GETFL);
-               if (!(flags & O_NONBLOCK)) {
-                       PARA_EMERG_LOG("fd %d is a blocking file descriptor\n", fd);
-                       exit(EXIT_FAILURE);
-               }
-       }
-#endif
-       FD_SET(fd, fds);
-       *max_fileno = PARA_MAX(*max_fileno, fd);
-}
-
 /**
  * Paraslash's wrapper for mmap.
  *
@@ -642,6 +567,47 @@ int para_munmap(void *start, size_t length)
        return -ERRNO_TO_PARA_ERROR(err);
 }
 
+/**
+ * Simple wrapper for poll(2).
+ *
+ * It calls poll(2) and starts over if the call was interrupted by a signal.
+ *
+ * \param fds See poll(2).
+ * \param nfds See poll(2).
+ * \param timeout See poll(2).
+ *
+ * \return The return value of the underlying poll() call on success, the
+ * negative paraslash error code on errors.
+ *
+ * All arguments are passed verbatim to poll(2).
+ */
+int xpoll(struct pollfd *fds, nfds_t nfds, int timeout)
+{
+       int ret;
+
+       do
+               ret = poll(fds, nfds, timeout);
+       while (ret < 0 && errno == EINTR);
+       return ret < 0? -ERRNO_TO_PARA_ERROR(errno) : ret;
+}
+
+/**
+ * Check a file descriptor for readability.
+ *
+ * \param fd The file descriptor.
+ *
+ * \return positive if fd is ready for reading, zero if it isn't, negative if
+ * an error occurred.
+ *
+ * \sa \ref write_ok().
+ */
+int read_ok(int fd)
+{
+       struct pollfd pfd = {.fd = fd, .events = POLLIN};
+       int ret = xpoll(&pfd, 1, 0);
+       return ret < 0? ret : pfd.revents & POLLIN;
+}
+
 /**
  * Check a file descriptor for writability.
  *
@@ -649,18 +615,14 @@ int para_munmap(void *start, size_t length)
  *
  * \return positive if fd is ready for writing, zero if it isn't, negative if
  * an error occurred.
+ *
+ * \sa \ref read_ok().
  */
-
 int write_ok(int fd)
 {
-       struct timeval tv;
-       fd_set wfds;
-
-       FD_ZERO(&wfds);
-       FD_SET(fd, &wfds);
-       tv.tv_sec = 0;
-       tv.tv_usec = 0;
-       return para_select(fd + 1, NULL, &wfds, &tv);
+       struct pollfd pfd = {.fd = fd, .events = POLLOUT};
+       int ret = xpoll(&pfd, 1, 0);
+       return ret < 0? ret : pfd.revents & POLLOUT;
 }
 
 /**
diff --git a/fd.h b/fd.h
index c9e79426f5c27ebf621367ce24b1c073c4e97e23..270d0ce200df30f16333fcfbdb47173500cacddc 100644 (file)
--- a/fd.h
+++ b/fd.h
@@ -6,11 +6,9 @@ int xrename(const char *oldpath, const char *newpath);
 int write_all(int fd, const char *buf, size_t len);
 __printf_2_3 int write_va_buffer(int fd, const char *fmt, ...);
 bool file_exists(const char *);
-int para_select(int n, fd_set *readfds, fd_set *writefds,
-               struct timeval *timeout_tv);
+int xpoll(struct pollfd *fds, nfds_t nfds, int timeout);
 __must_check int mark_fd_nonblocking(int fd);
 __must_check int mark_fd_blocking(int fd);
-void para_fd_set(int fd, fd_set *fds, int *max_fileno);
 int para_mmap(size_t length, int prot, int flags, int fd, void *map);
 int para_open(const char *path, int flags, mode_t mode);
 int para_mkdir(const char *path, mode_t mode);
@@ -18,12 +16,12 @@ int para_chdir(const char *path);
 int mmap_full_file(const char *filename, int open_mode, void **map,
        size_t *size, int *fd_ptr);
 int para_munmap(void *start, size_t length);
+int read_ok(int fd);
 int write_ok(int fd);
 void valid_fd_012(void);
-int readv_nonblock(int fd, struct iovec *iov, int iovcnt, fd_set *rfds,
-               size_t *num_bytes);
-int read_nonblock(int fd, void *buf, size_t sz, fd_set *rfds, size_t *num_bytes);
-int read_pattern(int fd, const char *pattern, size_t bufsize, fd_set *rfds);
+int readv_nonblock(int fd, struct iovec *iov, int iovcnt, size_t *num_bytes);
+int read_nonblock(int fd, void *buf, size_t sz, size_t *num_bytes);
+int read_pattern(int fd, const char *pattern, size_t bufsize);
 int xwrite(int fd, const char *buf, size_t len);
 int xwritev(int fd, struct iovec *iov, int iovcnt);
 int for_each_file_in_dir(const char *dirname,
index 13d4f7b22f6a46da75ef4b629275d603fc5f39d1..d629603c4b11f37825cd71b7bf5335d65625fa83 100644 (file)
@@ -431,7 +431,7 @@ static void fecdec_close(struct filter_node *fn)
        fn->private_data = NULL;
 }
 
-static int fecdec_post_select(__a_unused struct sched *s, void *context)
+static int fecdec_post_monitor(__a_unused struct sched *s, void *context)
 {
        struct filter_node *fn = context;
        struct btr_node *btrn = fn->btrn;
@@ -478,7 +478,7 @@ static void fecdec_open(struct filter_node *fn)
 
 const struct filter lsg_filter_cmd_com_fecdec_user_data = {
        .open = fecdec_open,
-       .pre_select = generic_filter_pre_select,
-       .post_select = fecdec_post_select,
+       .pre_monitor = generic_filter_pre_monitor,
+       .post_monitor = fecdec_post_monitor,
        .close = fecdec_close,
 };
index 9a5ed5d7fab7dc01fe46fa5ecab6940ea3a28828..64153178e969191201b869fe6bfa34f37f61af23 100644 (file)
@@ -69,7 +69,7 @@ static int prepare_output_file(struct writer_node *wn)
        return 1;
 }
 
-static void file_write_pre_select(struct sched *s, void *context)
+static void file_write_pre_monitor(struct sched *s, void *context)
 {
        struct writer_node *wn = context;
        struct private_file_write_data *pfwd = wn->private_data;
@@ -79,7 +79,7 @@ static void file_write_pre_select(struct sched *s, void *context)
                return;
        if (ret < 0 || !pfwd)
                return sched_min_delay(s);
-       para_fd_set(pfwd->fd, &s->wfds, &s->max_fileno);
+       sched_monitor_writefd(pfwd->fd, s);
 }
 
 static void file_write_close(struct writer_node *wn)
@@ -92,7 +92,7 @@ static void file_write_close(struct writer_node *wn)
        free(pfwd);
 }
 
-static int file_write_post_select(__a_unused struct sched *s, void *context)
+static int file_write_post_monitor(__a_unused struct sched *s, void *context)
 {
        struct writer_node *wn = context;
        struct private_file_write_data *pfwd = wn->private_data;
@@ -111,7 +111,7 @@ static int file_write_post_select(__a_unused struct sched *s, void *context)
                ret = prepare_output_file(wn);
                goto out;
        }
-       if (!FD_ISSET(pfwd->fd, &s->wfds))
+       if (!sched_write_ok(pfwd->fd, s))
                return 0;
        bytes = btr_next_buffer(btrn, &buf);
        assert(bytes > 0);
@@ -128,7 +128,7 @@ out:
 
 /** the init function of the file writer */
 struct writer lsg_write_cmd_com_file_user_data = {
-       .pre_select = file_write_pre_select,
-       .post_select = file_write_post_select,
+       .pre_monitor = file_write_pre_monitor,
+       .post_monitor = file_write_post_monitor,
        .close = file_write_close,
 };
index d4a2423904ca96f87f1ec11600f1c67e2b101ae1..85d3da7e57270e21ec6897c3086c93397f9b5e31 100644 (file)
--- a/filter.c
+++ b/filter.c
@@ -137,8 +137,8 @@ int main(int argc, char *argv[])
                        EMBRACE(.name = name, .parent = parent,
                        .handler = f->execute, .context = fn));
                ti.name = name;
-               ti.pre_select = f->pre_select;
-               ti.post_select = f->post_select;
+               ti.pre_monitor = f->pre_monitor;
+               ti.post_monitor = f->post_monitor;
                ti.context = fn;
                if (f->open)
                        f->open(fn);
@@ -149,8 +149,7 @@ int main(int argc, char *argv[])
                EMBRACE(.name = "stdout", .parent = parent));
        stdout_task_register(sot, &s);
 
-       s.default_timeout.tv_sec = 1;
-       s.default_timeout.tv_usec = 0;
+       s.default_timeout = 1000;
        btr_log_tree(sit->btrn, LL_INFO);
        ret = schedule(&s);
        sched_shutdown(&s);
index 69d4dfee8534941ab74bb17d7d2a5ff8e23eed6b..77057e6a7d2b9a997219d907b3ddd459716283ad 100644 (file)
--- a/filter.h
+++ b/filter.h
@@ -28,16 +28,18 @@ struct filter_node {
 };
 
 /**
- * The structure associated with a paraslash filter.
+ * Describes a method to convert audio data.
  *
- * Paraslash filters are "modules" which transform an audio stream. struct
- * filter contains methods which are implemented by each filter.
+ * Paraslash filters are "modules" which transform the data of an audio stream.
+ * This structure contains the methods which have to be implemented by each
+ * filter.
  *
- * Note: As several instances of the same filter may be running at the same
- * time, all these filter functions must be reentrant; no static non-constant
- * variables may be used.
+ * As several instances of the same filter may be running at the same time, all
+ * filter methods must be reentrant and no static non-constant variables must
+ * be used.
  *
- * \sa \ref filter_node.
+ * \sa \ref filter_node, struct \ref receiver, struct \ref writer, struct \ref
+ * sched.
  */
 struct filter {
        /**
@@ -81,24 +83,10 @@ struct filter {
         * This should free whatever ->setup() has allocated.
         */
        void (*teardown)(const struct lls_parse_result *lpr, void *conf);
-       /**
-        * Set scheduler timeout and add file descriptors to fd sets.
-        *
-        * This function controls the timeout value for the next call to
-        * select(2). It may decrease the current timeout but shall never
-        * increase it. The second purpose of this function is to add file
-        * descriptors to the two fd sets of the sched structure. The
-        * descriptors in these sets will be watched by the subsequent
-        * select(2) call.
-        */
-       void (*pre_select)(struct sched *s, void *context);
-       /**
-        * Convert (filter) the given data.
-        *
-        * Pointer to the converting function of the filter. On errors, the
-        * post_select function is supposed to return a negative error code.
-        */
-       int (*post_select)(struct sched *s, void *context);
+       /** Force a zero timeout if data is available in the buffer tree. */
+       void (*pre_monitor)(struct sched *s, void *context);
+       /** Convert (filter) input data into output data. */
+       int (*post_monitor)(struct sched *s, void *context);
        /**
         * Answer a buffer tree query.
         *
@@ -124,7 +112,7 @@ int filter_setup(const char *fa, void **conf, struct lls_parse_result **lprp);
 #define FILTER_CMD_OPT_STRING_VAL(_cmd, _opt, _lpr) \
        (lls_string_val(0, FILTER_CMD_OPT_RESULT(_cmd, _opt, _lpr)))
 
-void generic_filter_pre_select(struct sched *s, void *context);
+void generic_filter_pre_monitor(struct sched *s, void *context);
 int decoder_execute(const char *cmd, unsigned sample_rate, unsigned channels,
                char **result);
 
index add788a8f30465251ff111822b7131cd2e94054b..f48e457005ca3510fb51d5e5f95405af15d927c1 100644 (file)
@@ -169,17 +169,16 @@ void print_filter_list(void)
 }
 
 /**
- * Set select timeout of the scheduler.
+ * Request a minimal timeout if not idle.
  *
- * \param s The scheduler.
- * \param context Pointer to the filter node (task context).
+ * \param s The scheduler instance.
+ * \param context Pointer to the filter node.
  *
- * This looks at the status of the btr node of the filter. If data is available
- * in the input queue of the filter, or if an error occurred, a minimal timeout
- * for the next select call is requested from the scheduler. Otherwise the
- * scheduler timeout is left unchanged.
+ * If the buffer tree node of the given filter node has data available (or is
+ * in error state) a minimal I/O timeout is requested from the scheduler.
+ * Otherwise the function does nothing.
  */
-void generic_filter_pre_select(struct sched *s, void *context)
+void generic_filter_pre_monitor(struct sched *s, void *context)
 {
        struct filter_node *fn = context;
 
index 6a3a8effaf83b3b9e333e429762f0ff2d3938afe..2c9f8607c0d7603664ef13acd3a418f13625b8f7 100644 (file)
@@ -205,7 +205,7 @@ static bool output_queue_full(struct btr_node *btrn)
        return btr_get_output_queue_size(btrn) > FLACDEC_MAX_OUTPUT_SIZE;
 }
 
-static void flacdec_pre_select(struct sched *s, void *context)
+static void flacdec_pre_monitor(struct sched *s, void *context)
 {
        struct filter_node *fn = context;
        struct private_flacdec_data *pfd = fn->private_data;
@@ -221,7 +221,7 @@ static void flacdec_pre_select(struct sched *s, void *context)
                return sched_min_delay(s);
 }
 
-static int flacdec_post_select(__a_unused struct sched *s, void *context)
+static int flacdec_post_monitor(__a_unused struct sched *s, void *context)
 {
        struct filter_node *fn = context;
        struct private_flacdec_data *pfd = fn->private_data;
@@ -294,7 +294,7 @@ static void flacdec_open(struct filter_node *fn)
 const struct filter lsg_filter_cmd_com_flacdec_user_data = {
        .open = flacdec_open,
        .close = flacdec_close,
-       .pre_select = flacdec_pre_select,
-       .post_select = flacdec_post_select,
+       .pre_monitor = flacdec_pre_monitor,
+       .post_monitor = flacdec_post_monitor,
        .execute = flacdec_execute,
 };
index 8370649301906eaef68a4c30d1b1f3975283dea8..393e2ce331117a6d29231f551b78103b6bfe9f83 100644 (file)
@@ -89,7 +89,7 @@ err:
        return -E_GC_WRITE;
 }
 
-static void gc_pre_select(struct sched *s, void *context)
+static void gc_pre_monitor(struct sched *s, void *context)
 {
        struct grab_client *gc = context;
        int ret = btr_node_status(gc->btrn, 0, BTR_NT_LEAF);
@@ -98,14 +98,14 @@ static void gc_pre_select(struct sched *s, void *context)
                return;
        if (ret < 0)
                sched_min_delay(s);
-       para_fd_set(gc->fd, &s->wfds, &s->max_fileno);
+       sched_monitor_writefd(gc->fd, s);
 }
 
 /*
- * We need this forward declaration as post_select() needs
+ * We need this forward declaration as gc_post_monitor() needs
  * activate_grab_client and vice versa.
  */
-static int gc_post_select(struct sched *s, void *context);
+static int gc_post_monitor(struct sched *s, void *context);
 
 /**
  * Move a grab client to the active list and start it.
@@ -129,8 +129,8 @@ static void gc_activate(struct grab_client *gc, struct sched *s)
 
        gc->task = task_register(&(struct task_info) {
                .name = name,
-               .pre_select = gc_pre_select,
-               .post_select = gc_post_select,
+               .pre_monitor = gc_pre_monitor,
+               .post_monitor = gc_post_monitor,
                .context = gc,
        }, s);
 }
@@ -171,7 +171,7 @@ static int gc_close(struct grab_client *gc, int err)
                /*
                 * We must not free the gc structure here as it contains ->task
                 * which is still used because this function is called from
-                * post_select().
+                * post_monitor().
                 */
                close(gc->fd);
                gc->fd = -1;
@@ -182,7 +182,7 @@ static int gc_close(struct grab_client *gc, int err)
        return 0;
 }
 
-static int gc_post_select(__a_unused struct sched *s, void *context)
+static int gc_post_monitor(__a_unused struct sched *s, void *context)
 {
        struct grab_client *gc = context;
        struct btr_node *btrn = gc->btrn;
diff --git a/gui.c b/gui.c
index d779ff864ddebfc38a34ba1a0d166b5c04089556..72908f23e46d0762ab4442975070dc5bd46c7269 100644 (file)
--- a/gui.c
+++ b/gui.c
@@ -609,19 +609,19 @@ static void clear_all_items(void)
        }
 }
 
-static void status_pre_select(struct sched *s, void *context)
+static void status_pre_monitor(struct sched *s, void *context)
 {
        struct status_task *st = context;
 
        if (st->fd >= 0)
-               para_fd_set(st->fd, &s->rfds, &s->max_fileno);
+               sched_monitor_readfd(st->fd, s);
        if (task_get_notification(st->task) < 0)
                return sched_min_delay(s);
        if (st->fd < 0)
                sched_request_barrier_or_min_delay(&st->next_exec, s);
 }
 
-static int status_post_select(struct sched *s, void *context)
+static int status_post_monitor(__a_unused struct sched *s, void *context)
 {
        struct status_task *st = context;
        size_t sz;
@@ -667,7 +667,7 @@ static int status_post_select(struct sched *s, void *context)
        }
        assert(st->loaded < st->bufsize);
        ret = read_nonblock(st->fd, st->buf + st->loaded,
-               st->bufsize - st->loaded, &s->rfds, &sz);
+               st->bufsize - st->loaded, &sz);
        st->loaded += sz;
        ret2 = for_each_stat_item(st->buf, st->loaded, update_item);
        if (ret < 0 || ret2 < 0) {
@@ -892,9 +892,9 @@ static void reread_conf(void)
 }
 
 /* React to various signal-related events. */
-static int signal_post_select(struct sched *s, __a_unused void *context)
+static int signal_post_monitor(struct sched *s, __a_unused void *context)
 {
-       int ret = para_next_signal(&s->rfds);
+       int ret = para_next_signal();
 
        if (ret <= 0)
                return 0;
@@ -931,18 +931,18 @@ static enum exec_status exec_status(void)
        return EXEC_IDLE;
 }
 
-static void exec_pre_select(struct sched *s, void *context)
+static void exec_pre_monitor(struct sched *s, void *context)
 {
        struct exec_task *et = context;
        if (exec_fds[0] >= 0)
-               para_fd_set(exec_fds[0], &s->rfds, &s->max_fileno);
+               sched_monitor_readfd(exec_fds[0], s);
        if (exec_fds[1] >= 0)
-               para_fd_set(exec_fds[1], &s->rfds, &s->max_fileno);
+               sched_monitor_readfd(exec_fds[1], s);
        if (task_get_notification(et->task) < 0)
                sched_min_delay(s);
 }
 
-static int exec_post_select(struct sched *s, void *context)
+static int exec_post_monitor(__a_unused struct sched *s, void *context)
 {
        struct exec_task *ct = context;
        int i, ret;
@@ -963,7 +963,7 @@ static int exec_post_select(struct sched *s, void *context)
                        continue;
                ret = read_nonblock(exec_fds[i],
                        ct->command_buf[i] + ct->cbo[i],
-                       COMMAND_BUF_SIZE - 1 - ct->cbo[i], &s->rfds, &sz);
+                       COMMAND_BUF_SIZE - 1 - ct->cbo[i], &sz);
                ct->cbo[i] += sz;
                sz = ct->cbo[i];
                ct->cbo[i] = for_each_line(ct->flags[i], ct->command_buf[i],
@@ -992,10 +992,10 @@ static int exec_post_select(struct sched *s, void *context)
        return 0;
 }
 
-static void input_pre_select(struct sched *s, __a_unused void *context)
+static void input_pre_monitor(struct sched *s, __a_unused void *context)
 {
        if (exec_status() != EXEC_XCMD)
-               para_fd_set(STDIN_FILENO, &s->rfds, &s->max_fileno);
+               sched_monitor_readfd(STDIN_FILENO, s);
        if (window_update_needed())
                sched_min_delay(s);
 }
@@ -1089,7 +1089,7 @@ static void handle_command(int c)
                keyname);
 }
 
-static int input_post_select(__a_unused struct sched *s,
+static int input_post_monitor(__a_unused struct sched *s,
                __a_unused void *context)
 {
        int ret;
@@ -1115,7 +1115,7 @@ static int input_post_select(__a_unused struct sched *s,
        ret = wgetch(top.win);
        if (ret == ERR)
                return 0;
-       if (ret == KEY_RESIZE) /* already handled in signal_post_select() */
+       if (ret == KEY_RESIZE) /* already handled in signal_post_monitor() */
                return 0;
        if (exs == EXEC_IDLE)
                handle_command(ret);
@@ -1391,26 +1391,26 @@ static int setup_tasks_and_schedule(void)
        struct status_task status_task = {.fd = -1};
        struct input_task input_task = {.task = NULL};
        struct signal_task *signal_task;
-       struct sched sched = {.default_timeout = {.tv_sec = 1}};
+       struct sched sched = {.default_timeout = 1000};
 
        exec_task.task = task_register(&(struct task_info) {
                .name = "exec",
-               .pre_select = exec_pre_select,
-               .post_select = exec_post_select,
+               .pre_monitor = exec_pre_monitor,
+               .post_monitor = exec_post_monitor,
                .context = &exec_task,
        }, &sched);
 
        status_task.task = task_register(&(struct task_info) {
                .name = "status",
-               .pre_select = status_pre_select,
-               .post_select = status_post_select,
+               .pre_monitor = status_pre_monitor,
+               .post_monitor = status_post_monitor,
                .context = &status_task,
        }, &sched);
 
        input_task.task = task_register(&(struct task_info) {
                .name = "input",
-               .pre_select = input_pre_select,
-               .post_select = input_post_select,
+               .pre_monitor = input_pre_monitor,
+               .post_monitor = input_post_monitor,
                .context = &input_task,
        }, &sched);
 
@@ -1422,8 +1422,8 @@ static int setup_tasks_and_schedule(void)
        para_install_sighandler(SIGWINCH);
        signal_task->task = task_register(&(struct task_info) {
                .name = "signal",
-               .pre_select = signal_pre_select,
-               .post_select = signal_post_select,
+               .pre_monitor = signal_pre_monitor,
+               .post_monitor = signal_post_monitor,
                .context = signal_task,
        }, &sched);
        ret = schedule(&sched);
index 1fb60bad810d86dbbb0098a3714c2caa3d9e835b..59e9696b47e082f09bf6c58ccf49eadb3c3153e9 100644 (file)
@@ -56,17 +56,17 @@ static char *make_request_msg(void)
        return ret;
 }
 
-static void http_recv_pre_select(struct sched *s, void *context)
+static void http_recv_pre_monitor(struct sched *s, void *context)
 {
        struct receiver_node *rn = context;
        struct private_http_recv_data *phd = rn->private_data;
 
-       if (generic_recv_pre_select(s, rn) <= 0)
+       if (generic_recv_pre_monitor(s, rn) <= 0)
                return;
        if  (phd->status == HTTP_CONNECTED)
-               para_fd_set(rn->fd, &s->wfds, &s->max_fileno);
+               sched_monitor_writefd(rn->fd, s);
        else
-               para_fd_set(rn->fd, &s->rfds, &s->max_fileno);
+               sched_monitor_readfd(rn->fd, s);
 }
 
 /*
@@ -74,7 +74,7 @@ static void http_recv_pre_select(struct sched *s, void *context)
  * area with data read from the socket. In any case, update the state of the
  * connection if necessary.
  */
-static int http_recv_post_select(struct sched *s, void *context)
+static int http_recv_post_monitor(struct sched *s, void *context)
 {
        struct receiver_node *rn = context;
        struct private_http_recv_data *phd = rn->private_data;
@@ -93,7 +93,7 @@ static int http_recv_post_select(struct sched *s, void *context)
                return 0;
        if (phd->status == HTTP_CONNECTED) {
                char *rq;
-               if (!FD_ISSET(rn->fd, &s->wfds))
+               if (!sched_write_ok(rn->fd, s))
                        return 0;
                rq = make_request_msg();
                PARA_INFO_LOG("sending http request\n");
@@ -105,7 +105,7 @@ static int http_recv_post_select(struct sched *s, void *context)
                return 0;
        }
        if (phd->status == HTTP_SENT_GET_REQUEST) {
-               ret = read_pattern(rn->fd, HTTP_OK_MSG, strlen(HTTP_OK_MSG), &s->rfds);
+               ret = read_pattern(rn->fd, HTTP_OK_MSG, strlen(HTTP_OK_MSG));
                if (ret < 0) {
                        PARA_ERROR_LOG("did not receive HTTP OK message\n");
                        goto out;
@@ -120,7 +120,7 @@ static int http_recv_post_select(struct sched *s, void *context)
        iovcnt = btr_pool_get_buffers(rn->btrp, iov);
        if (iovcnt == 0)
                goto out;
-       ret = readv_nonblock(rn->fd, iov, iovcnt, &s->rfds, &num_bytes);
+       ret = readv_nonblock(rn->fd, iov, iovcnt, &num_bytes);
        if (num_bytes == 0)
                goto out;
        if (num_bytes <= iov[0].iov_len) /* only the first buffer was filled */
@@ -170,6 +170,6 @@ static int http_recv_open(struct receiver_node *rn)
 const struct receiver lsg_recv_cmd_com_http_user_data = {
        .open = http_recv_open,
        .close = http_recv_close,
-       .pre_select = http_recv_pre_select,
-       .post_select = http_recv_post_select,
+       .pre_monitor = http_recv_pre_monitor,
+       .post_monitor = http_recv_post_monitor,
 };
index c6b9decca941489c82b629982ddb6d6dbdb2ea33..0a90e8840cdf76e683a276d569fba3bfcd710c2b 100644 (file)
@@ -20,8 +20,8 @@
 #include "server.h"
 #include "http.h"
 #include "list.h"
-#include "send.h"
 #include "sched.h"
+#include "send.h"
 #include "vss.h"
 #include "close_on_fork.h"
 #include "fd.h"
@@ -158,7 +158,7 @@ static void http_send(long unsigned current_chunk,
        }
 }
 
-static void http_post_select(fd_set *rfds, __a_unused fd_set *wfds)
+static void http_post_monitor(__a_unused struct sched *s)
 {
        struct sender_client *sc, *tmp;
        struct private_http_sender_data *phsd;
@@ -170,7 +170,7 @@ static void http_post_select(fd_set *rfds, __a_unused fd_set *wfds)
                case HTTP_STREAMING: /* nothing to do */
                        break;
                case HTTP_CONNECTED: /* need to recv get request */
-                       ret = read_pattern(sc->fd, HTTP_GET_MSG, MAXLINE, rfds);
+                       ret = read_pattern(sc->fd, HTTP_GET_MSG, MAXLINE);
                        if (ret < 0)
                                phsd->status = HTTP_INVALID_GET_REQUEST;
                        else if (ret > 0) {
@@ -188,7 +188,7 @@ static void http_post_select(fd_set *rfds, __a_unused fd_set *wfds)
                        break;
                }
        }
-       sc = accept_sender_client(hss, rfds);
+       sc = accept_sender_client(hss);
        if (!sc)
                return;
        phsd = para_malloc(sizeof(*phsd));
@@ -196,7 +196,7 @@ static void http_post_select(fd_set *rfds, __a_unused fd_set *wfds)
        phsd->status = HTTP_CONNECTED;
 }
 
-static void http_pre_select(int *max_fileno, fd_set *rfds, fd_set *wfds)
+static void http_pre_monitor(struct sched *s)
 {
        struct sender_client *sc, *tmp;
        unsigned n;
@@ -204,15 +204,15 @@ static void http_pre_select(int *max_fileno, fd_set *rfds, fd_set *wfds)
        FOR_EACH_LISTEN_FD(n, hss) {
                if (hss->listen_fds[n] < 0)
                        continue;
-               para_fd_set(hss->listen_fds[n], rfds, max_fileno);
+               sched_monitor_readfd(hss->listen_fds[n], s);
        }
        list_for_each_entry_safe(sc, tmp, &hss->client_list, node) {
                struct private_http_sender_data *phsd = sc->private_data;
                if (phsd->status == HTTP_CONNECTED) /* need to recv get request */
-                       para_fd_set(sc->fd, rfds, max_fileno);
+                       sched_monitor_readfd(sc->fd, s);
                if (phsd->status == HTTP_GOT_GET_REQUEST ||
                                phsd->status == HTTP_INVALID_GET_REQUEST)
-                       para_fd_set(sc->fd, wfds, max_fileno);
+                       sched_monitor_writefd(sc->fd, s);
        }
 }
 
@@ -274,8 +274,8 @@ const struct sender http_sender = {
        .name = "http",
        .init = http_send_init,
        .shutdown = http_shutdown,
-       .pre_select = http_pre_select,
-       .post_select = http_post_select,
+       .pre_monitor = http_pre_monitor,
+       .post_monitor = http_post_monitor,
        .send = http_send,
        .shutdown_clients = http_shutdown_clients,
        .client_cmds = {
index 041376a1057bcf2e98a4c23c2b23c42d73a68345..812a7d5bef8a37bd3ebdbd5a2484b735827a3395 100644 (file)
@@ -258,18 +258,6 @@ static void clear_bottom_line(void)
        rl_point = point;
 }
 
-static bool input_available(void)
-{
-       fd_set rfds;
-       struct timeval tv = {0, 0};
-       int ret;
-
-       FD_ZERO(&rfds);
-       FD_SET(i9ep->ici->fds[0], &rfds);
-       ret = para_select(1, &rfds, NULL, &tv);
-       return ret > 0;
-}
-
 static void i9e_line_handler(char *line)
 {
        int ret;
@@ -294,7 +282,7 @@ free_line:
        free(line);
 }
 
-static int i9e_post_select(__a_unused struct sched *s, __a_unused void *context)
+static int i9e_post_monitor(__a_unused struct sched *s, __a_unused void *context)
 {
        int ret;
        struct i9e_client_info *ici = i9ep->ici;
@@ -310,7 +298,7 @@ static int i9e_post_select(__a_unused struct sched *s, __a_unused void *context)
        ret = 0;
        if (i9ep->caught_sigint)
                goto rm_btrn;
-       while (input_available()) {
+       while (read_ok(i9ep->ici->fds[0]) > 0) {
                if (i9ep->stdout_btrn) {
                        while (i9ep->key_sequence_length < sizeof(i9ep->key_sequence) - 1) {
                                buf = i9ep->key_sequence + i9ep->key_sequence_length;
@@ -327,7 +315,7 @@ static int i9e_post_select(__a_unused struct sched *s, __a_unused void *context)
                                i9ep->key_sequence_length++;
                                rl_stuff_char((int)(unsigned char)*buf);
                                rl_callback_read_char();
-                               if (!input_available())
+                               if (read_ok(i9ep->ici->fds[0]) <= 0)
                                        break;
                        }
                        i9ep->key_sequence_length = 0;
@@ -374,7 +362,7 @@ out:
        return ret;
 }
 
-static void i9e_pre_select(struct sched *s, __a_unused void *context)
+static void i9e_pre_monitor(struct sched *s, __a_unused void *context)
 {
        int ret;
 
@@ -389,7 +377,7 @@ static void i9e_pre_select(struct sched *s, __a_unused void *context)
                        return;
                }
                if (ret > 0)
-                       para_fd_set(i9ep->ici->fds[1], &s->wfds, &s->max_fileno);
+                       sched_monitor_writefd(i9ep->ici->fds[1], s);
        }
        /*
         * fd[0] might have been reset to blocking mode if our job was moved to
@@ -400,7 +388,7 @@ static void i9e_pre_select(struct sched *s, __a_unused void *context)
        if (ret < 0)
                PARA_WARNING_LOG("set to nonblock failed: (fd0 %d, %s)\n",
                        i9ep->ici->fds[0], para_strerror(-ret));
-       para_fd_set(i9ep->ici->fds[0], &s->rfds, &s->max_fileno);
+       sched_monitor_readfd(i9ep->ici->fds[0], s);
 }
 
 static void update_winsize(void)
@@ -477,8 +465,8 @@ int i9e_open(struct i9e_client_info *ici, struct sched *s)
                return ret;
        i9ep->task = task_register(&(struct task_info) {
                .name = "i9e",
-               .pre_select = i9e_pre_select,
-               .post_select = i9e_post_select,
+               .pre_monitor = i9e_pre_monitor,
+               .post_monitor = i9e_post_monitor,
                .context = i9ep,
        }, s);
 
@@ -605,23 +593,21 @@ void i9e_signal_dispatch(int sig_num)
 }
 
 /**
- * Wrapper for select(2) which does not restart on interrupts.
+ * Wrapper for poll(2) which handles EINTR and returns paraslash error codes.
  *
- * \param n \sa \ref para_select().
- * \param readfds \sa \ref para_select().
- * \param writefds \sa \ref para_select().
- * \param timeout_tv \sa \ref para_select().
+ * \param fds See poll(2).
+ * \param nfds See poll(2).
+ * \param timeout See poll(2).
  *
- * \return \sa \ref para_select().
+ * \return See poll(2).
  *
- * The only difference between this function and \ref para_select() is that
- * \ref i9e_select() returns zero if the select call returned \p EINTR.
+ * The only difference between this function and \ref xpoll() is that \ref
+ * i9e_poll() returns zero if the system call was interrupted while xpoll()
+ * restarts the system call in this case.
  */
-int i9e_select(int n, fd_set *readfds, fd_set *writefds,
-               struct timeval *timeout_tv)
+int i9e_poll(struct pollfd *fds, nfds_t nfds, int timeout)
 {
-       int ret = select(n, readfds, writefds, NULL, timeout_tv);
-
+       int ret = poll(fds, nfds, timeout);
        if (ret < 0) {
                if (errno == EINTR)
                        ret = 0;
index ddf02d76d2bc44a71133844fa0dddf4d9db03778..53b1ad347e6f5b108814e8db309c5434272e9ab7 100644 (file)
@@ -84,8 +84,7 @@ void i9e_print_status_bar(char *buf, unsigned len);
 void i9e_close(void);
 void i9e_signal_dispatch(int sig_num);
 __printf_2_3 void i9e_log(int ll, const char* fmt,...);
-int i9e_select(int n, fd_set *readfds, fd_set *writefds,
-               struct timeval *timeout_tv);
+int i9e_poll(struct pollfd *fds, nfds_t nfds, int timeout);
 int i9e_extract_completions(const char *word, char **string_list,
                char ***result);
 char **i9e_complete_commands(const char *word, struct i9e_completer *completers);
index ccb1553b820357e13f2cafe2c0e8a7e0502ac8fd..6a196f3a06c06a42ecca40fde56de3f0d3cf4926 100644 (file)
@@ -73,7 +73,7 @@ static void mp3dec_close(struct filter_node *fn)
 
 #define MP3DEC_MAX_FRAME 8192
 
-static int mp3dec_post_select(__a_unused struct sched *s, void *context)
+static int mp3dec_post_monitor(__a_unused struct sched *s, void *context)
 {
        struct filter_node *fn = context;
        int i, ret;
@@ -93,7 +93,7 @@ next_buffer:
        btr_merge(btrn, fn->min_iqs);
        len = btr_next_buffer(btrn, &inbuffer);
        /*
-        * Decode at most 8K in one go to give the post_select() functions of
+        * Decode at most 8K in one go to give the post_monitor() functions of
         * other buffer tree nodes a chance to run. This is necessary to avoid
         * buffer underruns on slow machines.
         */
@@ -187,7 +187,7 @@ static int mp3dec_execute(struct btr_node *btrn, const char *cmd, char **result)
 const struct filter lsg_filter_cmd_com_mp3dec_user_data = {
        .open = mp3dec_open,
        .close = mp3dec_close,
-       .pre_select = generic_filter_pre_select,
-       .post_select = mp3dec_post_select,
+       .pre_monitor = generic_filter_pre_monitor,
+       .post_monitor = mp3dec_post_monitor,
        .execute = mp3dec_execute,
 };
diff --git a/net.c b/net.c
index e1951e5e8d87501b1ef9096d3f3df64117e904f0..4a6f9a63cd8475f8e7a0f583be3ac7e3d205c1a7 100644 (file)
--- a/net.c
+++ b/net.c
@@ -801,25 +801,21 @@ int recv_buffer(int fd, char *buf, size_t size)
  * Wrapper around the accept system call.
  *
  * \param fd The listening socket.
- * \param rfds An optional fd_set pointer.
  * \param addr Structure which is filled in with the address of the peer socket.
  * \param size Should contain the size of the structure pointed to by \a addr.
  * \param new_fd Result pointer.
  *
- * Accept incoming connections on \a addr, retry if interrupted. If \a rfds is
- * not \p NULL, return 0 if \a fd is not set in \a rfds without calling accept().
+ * Accept incoming connections on addr, retry if interrupted.
  *
  * \return Negative on errors, zero if no connections are present to be accepted,
  * one otherwise.
  *
  * \sa accept(2).
  */
-int para_accept(int fd, fd_set *rfds, void *addr, socklen_t size, int *new_fd)
+int para_accept(int fd, void *addr, socklen_t size, int *new_fd)
 {
        int ret;
 
-       if (rfds && !FD_ISSET(fd, rfds))
-               return 0;
        do
                ret = accept(fd, (struct sockaddr *) addr, &size);
        while (ret < 0 && errno == EINTR);
diff --git a/net.h b/net.h
index 2256f376497b89d3382e2bf80ce09725ef64ab02..fd89dc5db0695ae6285484e0755ae0d28b286693 100644 (file)
--- a/net.h
+++ b/net.h
@@ -143,7 +143,7 @@ extern int generic_max_transport_msg_size(int sockfd);
 int recv_bin_buffer(int fd, char *buf, size_t size);
 int recv_buffer(int fd, char *buf, size_t size);
 
-int para_accept(int fd, fd_set *rfds, void *addr, socklen_t size, int *new_fd);
+int para_accept(int fd, void *addr, socklen_t size, int *new_fd);
 int create_local_socket(const char *name);
 int connect_local_socket(const char *name);
 int recv_cred_buffer(int, char *, size_t);
index 708a27e52b68c1c2add01a3b3fe39f721485e39f..a6fa056aa9bc354082542563b9d53864e01f390a 100644 (file)
@@ -178,13 +178,13 @@ out:
 
 /**
   * Allocate chunks of this size and produce at most one chunk of output per
-  * ->post_select() invocation. If the buffer could only be filled partially
+  * ->post_monitor() invocation. If the buffer could only be filled partially
   * due to insufficient input being available, it is shrunk to the real output
   * size and the resized buffer is fed into the output queue.
   */
 #define OGGDEC_OUTPUT_CHUNK_SIZE (32 * 1024)
 
-static void ogg_pre_select(struct sched *s, void *context)
+static void ogg_pre_monitor(struct sched *s, void *context)
 {
        struct filter_node *fn = context;
        struct private_oggdec_data *pod = fn->private_data;
@@ -201,7 +201,7 @@ static void ogg_pre_select(struct sched *s, void *context)
        sched_min_delay(s);
 }
 
-static int ogg_post_select(__a_unused struct sched *s, void *context)
+static int ogg_post_monitor(__a_unused struct sched *s, void *context)
 {
        struct filter_node *fn = context;
        struct private_oggdec_data *pod = fn->private_data;
@@ -262,7 +262,7 @@ out:
 const struct filter lsg_filter_cmd_com_oggdec_user_data = {
        .open = ogg_open,
        .close = ogg_close,
-       .pre_select = ogg_pre_select,
-       .post_select = ogg_post_select,
+       .pre_monitor = ogg_pre_monitor,
+       .post_monitor = ogg_post_monitor,
        .execute = oggdec_execute
 };
index 10ed394d295072d909441fe211fbcdc695ff62d0..31f9640bbb915fc11ff1665c84fae0a61b902645 100644 (file)
@@ -207,7 +207,7 @@ static int decode_packet(struct opusdec_context *ctx, ogg_packet *op,
 
 #define OPUSDEC_MAX_OUTPUT_SIZE (1024 * 1024)
 
-static int opusdec_post_select(__a_unused struct sched *s, void *context)
+static int opusdec_post_monitor(__a_unused struct sched *s, void *context)
 {
        struct filter_node *fn = context;
        struct opusdec_context *ctx = fn->private_data;
@@ -269,7 +269,7 @@ out:
        return ret;
 }
 
-static void opusdec_pre_select(struct sched *s, void *context)
+static void opusdec_pre_monitor(struct sched *s, void *context)
 {
        struct filter_node *fn = context;
        struct opusdec_context *ctx = fn->private_data;
@@ -286,7 +286,7 @@ static void opusdec_pre_select(struct sched *s, void *context)
 const struct filter lsg_filter_cmd_com_opusdec_user_data = {
        .open = opusdec_open,
        .close = opusdec_close,
-       .pre_select = opusdec_pre_select,
-       .post_select = opusdec_post_select,
+       .pre_monitor = opusdec_pre_monitor,
+       .post_monitor = opusdec_post_monitor,
        .execute = opusdec_execute,
 };
index 0565167c256a43bfe86a6442f195a320e1458db8..1a837e5700c7cfdb7e86951f8ae999645c49bd67 100644 (file)
@@ -61,7 +61,7 @@ static int get_oss_format(enum sample_format sf)
        }
 }
 
-static void oss_pre_select(struct sched *s, void *context)
+static void oss_pre_monitor(struct sched *s, void *context)
 {
        struct writer_node *wn = context;
        struct private_oss_write_data *powd = wn->private_data;
@@ -71,7 +71,7 @@ static void oss_pre_select(struct sched *s, void *context)
                return;
        if (ret < 0 || !powd)
                return sched_min_delay(s);
-       para_fd_set(powd->fd, &s->wfds, &s->max_fileno);
+       sched_monitor_writefd(powd->fd, s);
 }
 
 static void oss_close(struct writer_node *wn)
@@ -178,7 +178,7 @@ err_free:
        return ret;
 }
 
-static int oss_post_select(__a_unused struct sched *s, void *context)
+static int oss_post_monitor(__a_unused struct sched *s, void *context)
 {
        struct writer_node *wn = context;
        struct private_oss_write_data *powd = wn->private_data;
@@ -222,7 +222,7 @@ static int oss_post_select(__a_unused struct sched *s, void *context)
                goto out;
        }
        ret = 0;
-       if (!FD_ISSET(powd->fd, &s->wfds))
+       if (!sched_write_ok(powd->fd, s))
                goto out;
        /* get maximal number of bytes that can be written */
        ret = ioctl(powd->fd, SNDCTL_DSP_GETOSPACE, &abi);
@@ -245,7 +245,7 @@ out:
 }
 
 const struct writer lsg_write_cmd_com_oss_user_data = {
-       .pre_select = oss_pre_select,
-       .post_select = oss_post_select,
+       .pre_monitor = oss_pre_monitor,
+       .post_monitor = oss_post_monitor,
        .close = oss_close,
 };
diff --git a/para.h b/para.h
index b406818bd52a27bfc1794e0c95d8f9c0e9ef12a0..2525bee6e3a276da039790c0f39e056aed808f94 100644 (file)
--- a/para.h
+++ b/para.h
@@ -20,6 +20,8 @@
 #include <stdbool.h>
 #include <inttypes.h>
 #include <sys/uio.h>
+#include <poll.h>
+
 #include "gcc-compat.h"
 
 /** used in various contexts */
diff --git a/play.c b/play.c
index ba9fff70190c7e150b136e15eb5375015897b2d0..66383ebe0b8cb6a742801cfd33cd08155ee857b1 100644 (file)
--- a/play.c
+++ b/play.c
@@ -50,7 +50,7 @@ static struct lls_parse_result *play_lpr;
  * Describes a request to change the state of para_play.
  *
  * There is only one variable of this type: \a rq of the global play task
- * structure. Command handlers only set this variable and the post_select()
+ * structure. Command handlers only set this variable and the post_monitor()
  * function of the play task investigates its value during each iteration of
  * the scheduler run and performs the actual work.
  */
@@ -117,7 +117,7 @@ INIT_STDERR_LOGGING(loglevel);
 
 char *stat_item_values[NUM_STAT_ITEMS] = {NULL};
 
-static struct sched sched = {.max_fileno = 0};
+static struct sched sched;
 static struct play_task play_task, *pt = &play_task;
 
 #define AFH_RECV_CMD (lls_cmd(LSG_RECV_CMD_CMD_AFH, recv_cmd_suite))
@@ -405,16 +405,16 @@ static int load_file(void)
        pt->rn.task = task_register(
                &(struct task_info) {
                        .name = lls_command_name(AFH_RECV_CMD),
-                       .pre_select = AFH_RECV->pre_select,
-                       .post_select = AFH_RECV->post_select,
+                       .pre_monitor = AFH_RECV->pre_monitor,
+                       .post_monitor = AFH_RECV->post_monitor,
                        .context = &pt->rn
                }, &sched);
        sprintf(buf, "%s decoder", af);
        pt->fn.task = task_register(
                &(struct task_info) {
                        .name = buf,
-                       .pre_select = decoder->pre_select,
-                       .post_select = decoder->post_select,
+                       .pre_monitor = decoder->pre_monitor,
+                       .post_monitor = decoder->post_monitor,
                        .context = &pt->fn
                }, &sched);
        register_writer_node(&pt->wn, pt->fn.btrn, &sched);
@@ -1075,7 +1075,7 @@ static void session_open(void)
        sigemptyset(&act.sa_mask);
        act.sa_flags = 0;
        sigaction(SIGWINCH, &act, NULL);
-       sched.select_function = i9e_select;
+       sched.poll_function = i9e_poll;
 
        ici.bound_keyseqs = get_mapped_keyseqs();
        pt->btrn = ici.producer = btr_new_node(&(struct btr_node_description)
@@ -1109,16 +1109,16 @@ static void session_update_time_string(char *str, unsigned len)
 /*
  * If we are about to die we must call i9e_close() to reset the terminal.
  * However, i9e_close() must be called in *this* context, i.e. from
- * play_task.post_select() rather than from i9e_post_select(), because
+ * play_task.post_monitor() rather than from i9e_post_monitor(), because
  * otherwise i9e would access freed memory upon return. So the play task must
  * stay alive until the i9e task terminates.
  *
  * We achieve this by sending a fake SIGTERM signal via i9e_signal_dispatch()
- * and reschedule. In the next iteration, i9e->post_select returns an error and
+ * and reschedule. In the next iteration, i9e->post_monitor returns an error and
  * terminates. Subsequent calls to i9e_get_error() then return negative and we
  * are allowed to call i9e_close() and terminate as well.
  */
-static int session_post_select(__a_unused struct sched *s)
+static int session_post_monitor(__a_unused struct sched *s)
 {
        int ret;
 
@@ -1141,11 +1141,11 @@ static int session_post_select(__a_unused struct sched *s)
 
 #else /* HAVE_READLINE */
 
-static int session_post_select(struct sched *s)
+static int session_post_monitor(struct sched *s)
 {
        char c;
 
-       if (!FD_ISSET(STDIN_FILENO, &s->rfds))
+       if (!sched_read_ok(STDIN_FILENO, s))
                return 0;
        if (read(STDIN_FILENO, &c, 1))
                do_nothing;
@@ -1164,11 +1164,11 @@ static void session_update_time_string(char *str, __a_unused unsigned len)
 }
 #endif /* HAVE_READLINE */
 
-static void play_pre_select(struct sched *s, __a_unused void *context)
+static void play_pre_monitor(struct sched *s, __a_unused void *context)
 {
        char state;
 
-       para_fd_set(STDIN_FILENO, &s->rfds, &s->max_fileno);
+       sched_monitor_readfd(STDIN_FILENO, s);
        state = get_playback_state();
        if (state == 'R' || state == 'F' || state == 'X')
                return sched_min_delay(s);
@@ -1202,7 +1202,7 @@ static unsigned get_time_string(char **result)
        );
 }
 
-static int play_post_select(struct sched *s, __a_unused void *context)
+static int play_post_monitor(struct sched *s, __a_unused void *context)
 {
        int ret;
 
@@ -1211,7 +1211,7 @@ static int play_post_select(struct sched *s, __a_unused void *context)
                pt->rq = CRT_TERM_RQ;
                return 0;
        }
-       ret = session_post_select(s);
+       ret = session_post_monitor(s);
        if (ret < 0)
                goto out;
        if (!pt->wn.btrn && !pt->fn.btrn) {
@@ -1255,7 +1255,7 @@ int main(int argc, char *argv[])
        int ret;
        unsigned num_inputs;
 
-       sched.default_timeout.tv_sec = 5;
+       sched.default_timeout = 5000;
        parse_config_or_die(argc, argv);
        session_open();
        num_inputs = lls_num_inputs(play_lpr);
@@ -1265,8 +1265,8 @@ int main(int argc, char *argv[])
        pt->playing = true;
        pt->task = task_register(&(struct task_info){
                .name = "play",
-               .pre_select = play_pre_select,
-               .post_select = play_post_select,
+               .pre_monitor = play_pre_monitor,
+               .post_monitor = play_post_monitor,
                .context = pt,
        }, &sched);
        ret = schedule(&sched);
index 9a801900c157e1da2da979c92d7bd60d03e21b9c..031aa47e6c57efa0157a241a974c11af8bf64882 100644 (file)
@@ -22,7 +22,7 @@ struct private_prebuffer_data {
        struct timeval barrier;
 };
 
-static void prebuffer_pre_select(struct sched *s, void *context)
+static void prebuffer_pre_monitor(struct sched *s, void *context)
 {
        struct filter_node *fn = context;
        struct btr_node *btrn = fn->btrn;
@@ -50,7 +50,7 @@ static void prebuffer_close(struct filter_node *fn)
        free(fn->private_data);
 }
 
-static int prebuffer_post_select(__a_unused struct sched *s, void *context)
+static int prebuffer_post_monitor(__a_unused struct sched *s, void *context)
 {
        struct filter_node *fn = context;
        struct btr_node *btrn = fn->btrn;
@@ -87,6 +87,6 @@ static void prebuffer_open(struct filter_node *fn)
 const struct filter lsg_filter_cmd_com_prebuffer_user_data = {
        .open = prebuffer_open,
        .close = prebuffer_close,
-       .pre_select = prebuffer_pre_select,
-       .post_select = prebuffer_post_select,
+       .pre_monitor = prebuffer_pre_monitor,
+       .post_monitor = prebuffer_post_monitor,
 };
diff --git a/recv.c b/recv.c
index 10d55d218071ccf6a314fc87d4e5f78001012db4..68417187348b058b375993b9a74eb6c2ca7d826b 100644 (file)
--- a/recv.c
+++ b/recv.c
@@ -97,13 +97,12 @@ int main(int argc, char *argv[])
        stdout_task_register(&sot, &s);
 
        ti.name = lls_command_name(cmd);
-       ti.pre_select = r->pre_select;
-       ti.post_select = r->post_select;
+       ti.pre_monitor = r->pre_monitor;
+       ti.post_monitor = r->post_monitor;
        ti.context = &rn;
        rn.task = task_register(&ti, &s);
 
-       s.default_timeout.tv_sec = 1;
-       s.default_timeout.tv_usec = 0;
+       s.default_timeout = 1000;
        ret = schedule(&s);
        sched_shutdown(&s);
        r->close(&rn);
diff --git a/recv.h b/recv.h
index 36b0f1db62e348c9ab7ccdfc3b32ba656d698c5f..391395b241d67abf12eb70996a10ee7950519087 100644 (file)
--- a/recv.h
+++ b/recv.h
@@ -21,11 +21,11 @@ struct receiver_node {
        /**
         * The file descriptor to receive the stream.
         *
-        * The pre_select function of the receiver adds this file descriptor to
+        * The pre_monitor function of the receiver adds this file descriptor to
         * the set of file descriptors which are watched for readability or
         * writability, depending on the state of the connection (if any).
         *
-        * If \a fd is readable, the post_select function of the receiver reads
+        * If \a fd is readable, the post_monitor function of the receiver reads
         * data from this fd into the buffer pool area of \a btrp.
         *
         * \sa \ref receiver.
@@ -34,9 +34,18 @@ struct receiver_node {
 };
 
 /**
- * Describes one supported paraslash receiver.
+ * Describes a possible data source for audio streams.
  *
- * \sa \ref http_recv.c, \ref udp_recv.c.
+ * A paraslash receiver is a modular piece of software which is capable of
+ * receiving an audio data stream from a data source. Received audio data is
+ * fed to consumers through the buffer tree mechanism.
+ *
+ * This structure contains the methods which have to be implemented by each
+ * receiver.
+ *
+ * \sa \ref http_recv.c, \ref udp_recv.c, \ref dccp_recv.c, \ref afh_recv.c,
+ * struct \ref receiver_node, struct \ref filter, struct \ref writer, struct
+ * \ref sched.
  */
 struct receiver {
        /**
@@ -45,8 +54,6 @@ struct receiver {
         * This should allocate the output buffer of the given receiver node
         * and prepare it for retrieving the audio stream according to the
         * configuration stored in rn->lpr.
-        *
-        * \sa struct \ref receiver_node.
         */
        int (*open)(struct receiver_node *rn);
        /**
@@ -58,31 +65,10 @@ struct receiver {
         * \sa \ref receiver_node.
         */
        void (*close)(struct receiver_node *rn);
-       /**
-        * Add file descriptors to fd_sets and compute timeout for select(2).
-        *
-        * If this is not NULL, the function is called in each iteration of the
-        * scheduler's select loop. The receiver may define it to add file
-        * descriptors to the file descriptor sets given by s. Those will be
-        * monitored in the subsequent call to select(2). The function may also
-        * lower the timeout value of s to make select(2) return earlier if no
-        * file descriptors are ready for I/O.
-        *
-        * \sa select(2), \ref time.c, struct \ref sched.
-        */
-       void (*pre_select)(struct sched *s, void *context);
-       /**
-        * Evaluate the result from select(2).
-        *
-        * This is called after the call to select(2). It should check all file
-        * descriptors which were added to any of the fd sets in the previous
-        * call to ->pre_select() and perform (non-blocking) I/O operations on
-        * those fds which are ready for I/O, for example in order to establish
-        * a connection or to receive a part of the audio stream.
-        *
-        * \sa select(2), struct \ref receiver.
-        */
-       int (*post_select)(struct sched *s, void *context);
+       /** Ask the scheduler to monitor receive fds. */
+       void (*pre_monitor)(struct sched *s, void *context);
+       /** Receive data and make it available to consumers. */
+       int (*post_monitor)(struct sched *s, void *context);
        /**
         * Answer a buffer tree query.
         *
@@ -110,4 +96,4 @@ struct receiver {
 
 int check_receiver_arg(const char *ra, struct lls_parse_result **lprp);
 void print_receiver_helps(bool detailed);
-int generic_recv_pre_select(struct sched *s, struct receiver_node *rn);
+int generic_recv_pre_monitor(struct sched *s, struct receiver_node *rn);
index 31fd81f1ec52c3d0b6204e8b3663a7d36e14fbb3..ad34991c6a5b281aa6c39e392befa4fe64422335 100644 (file)
@@ -98,19 +98,19 @@ void print_receiver_helps(bool detailed)
 }
 
 /**
- * Simple pre-select hook, used by all receivers.
+ * Request a minimal timeout in case of buffer tree errors.
  *
- * \param s Scheduler info.
- * \param rn The receiver node.
+ * \param s The scheduler instance.
+ * \param rn The buffer tree node is derived from this.
  *
- * This requests a minimal delay from the scheduler if the status of the buffer
- * tree node indicates an error/eof condition. No file descriptors are added to
- * the fd sets of \a s.
+ * If the buffer tree node of the given receiver node is in error or EOF state,
+ * a minimal I/O timeout is requested from the scheduler. Otherwise, the
+ * function does nothing. No file descriptors are asked to be monitored.
  *
- * \return The status of the btr node of the receiver node, i.e. the return
- * value of the underlying call to \ref btr_node_status().
+ * \return The status of of the receiver node's buffer tree node. That is, the
+ * return value of the underlying call to \ref btr_node_status().
  */
-int generic_recv_pre_select(struct sched *s, struct receiver_node *rn)
+int generic_recv_pre_monitor(struct sched *s, struct receiver_node *rn)
 {
        int ret = btr_node_status(rn->btrn, 0, BTR_NT_ROOT);
 
index bbdda51c525630c6d1411dfa547193f8ccdae9ce..01b4ac48ce11b97d3199520a6e6f1342db87d470 100644 (file)
@@ -62,7 +62,7 @@ static void resample_open(struct filter_node *fn)
        btr_log_tree(btr_parent(btr_parent(btrn)), LL_INFO);
 }
 
-static void resample_pre_select(struct sched *s, void *context)
+static void resample_pre_monitor(struct sched *s, void *context)
 {
        struct filter_node *fn = context;
        struct resample_context *ctx = fn->private_data;
@@ -70,7 +70,7 @@ static void resample_pre_select(struct sched *s, void *context)
 
        if (ret != 0)
                return sched_min_delay(s);
-       check_wav_pre_select(s, ctx->cwc);
+       check_wav_pre_monitor(s, ctx->cwc);
 }
 
 static int get_btr_val(const char *what, struct btr_node *btrn)
@@ -187,7 +187,7 @@ static int resample_frames(int16_t *in, size_t num_frames, bool have_more,
        return data.input_frames_used;
 }
 
-static int resample_post_select(__a_unused struct sched *s, void *context)
+static int resample_post_monitor(__a_unused struct sched *s, void *context)
 {
        int ret;
        struct filter_node *fn = context;
@@ -197,7 +197,7 @@ static int resample_post_select(__a_unused struct sched *s, void *context)
        size_t in_bytes, num_frames;
        bool have_more;
 
-       ret = check_wav_post_select(ctx->cwc);
+       ret = check_wav_post_monitor(ctx->cwc);
        if (ret < 0)
                goto out;
        ret = btr_node_status(btrn, fn->min_iqs, BTR_NT_INTERNAL);
@@ -236,7 +236,7 @@ out:
        if (ret < 0) {
                btr_remove_node(&fn->btrn);
                /* This releases the check_wav btr node */
-               check_wav_post_select(ctx->cwc);
+               check_wav_post_monitor(ctx->cwc);
        }
        return ret;
 }
@@ -277,8 +277,8 @@ static void resample_teardown(__a_unused const struct lls_parse_result *lpr,
 const struct filter lsg_filter_cmd_com_resample_user_data = {
        .setup = resample_setup,
        .open = resample_open,
-       .pre_select = resample_pre_select,
-       .post_select = resample_post_select,
+       .pre_monitor = resample_pre_monitor,
+       .post_monitor = resample_post_monitor,
        .close = resample_close,
        .teardown = resample_teardown,
        .execute = resample_execute
diff --git a/sched.c b/sched.c
index aac8efed1bcf8d897e6aef4d07973f2babc8d4f4..eb8d03c2bd99fdd1990d307aec977040c2611909 100644 (file)
--- a/sched.c
+++ b/sched.c
@@ -17,9 +17,9 @@
  * The possible states of a task.
  *
  * In addition to the states listed here, a task may also enter zombie state.
- * This happens when its ->post_select function returns negative, the ->status
+ * This happens when its ->post_monitor function returns negative, the ->status
  * field is then set to this return value. Such tasks are not scheduled any
- * more (i.e. ->pre_select() and ->post_select() are no longer called), but
+ * more (i.e. ->pre_monitor() and ->post_monitor() are no longer called), but
  * they stay on the scheduler task list until \ref task_reap() or
  * \ref sched_shutdown() is called.
  */
@@ -46,7 +46,7 @@ struct task {
 static struct timeval now_struct;
 const struct timeval *now = &now_struct;
 
-static void sched_preselect(struct sched *s)
+static void sched_pre_monitor(struct sched *s)
 {
        struct task *t, *tmp;
 
@@ -55,8 +55,8 @@ static void sched_preselect(struct sched *s)
                        continue;
                if (t->notification != 0)
                        sched_min_delay(s);
-               if (t->info.pre_select)
-                       t->info.pre_select(s, t->info.context);
+               if (t->info.pre_monitor)
+                       t->info.pre_monitor(s, t->info.context);
        }
 }
 
@@ -72,29 +72,29 @@ static void unlink_and_free_task(struct task *t)
 }
 
 //#define SCHED_DEBUG 1
-static inline void call_post_select(struct sched *s, struct task *t)
+static inline void call_post_monitor(struct sched *s, struct task *t)
 {
        int ret;
 
 #ifndef SCHED_DEBUG
-       ret = t->info.post_select(s, t->info.context);
+       ret = t->info.post_monitor(s, t->info.context);
 #else
        struct timeval t1, t2, diff;
        unsigned long pst;
 
        clock_get_realtime(&t1);
-       ret = t->info.post_select(s, t->info.context);
+       ret = t->info.post_monitor(s, t->info.context);
        clock_get_realtime(&t2);
        tv_diff(&t1, &t2, &diff);
        pst = tv2ms(&diff);
        if (pst > 50)
-               PARA_WARNING_LOG("%s: post_select time: %lums\n",
+               PARA_WARNING_LOG("%s: post_monitor time: %lums\n",
                        t->name, pst);
 #endif
        t->status = ret < 0? ret : TS_RUNNING;
 }
 
-static unsigned sched_post_select(struct sched *s)
+static unsigned sched_post_monitor(struct sched *s)
 {
        struct task *t, *tmp;
        unsigned num_running_tasks = 0;
@@ -103,7 +103,7 @@ static unsigned sched_post_select(struct sched *s)
                if (t->status == TS_DEAD) /* task has been reaped */
                        unlink_and_free_task(t);
                else if (t->status == TS_RUNNING) {
-                       call_post_select(s, t); /* sets t->status */
+                       call_post_monitor(s, t); /* sets t->status */
                        t->notification = 0;
                        if (t->status == TS_RUNNING)
                                num_running_tasks++;
@@ -117,13 +117,13 @@ static unsigned sched_post_select(struct sched *s)
  *
  * \param s Pointer to the scheduler struct.
  *
- * This function updates the global \a now pointer, calls all registered
- * pre_select hooks which may set the timeout and add any file descriptors to
- * the fd sets of \a s.  Next, it calls para_select() and makes the result available
- * to the registered tasks by calling their post_select hook.
+ * This function updates the global now pointer, calls all registered
+ * pre_monitor hooks which may set the timeout and add any file descriptors to
+ * the pollfd array. Next, it calls the poll function and makes the result
+ * available to the registered tasks by calling their post_monitor hook.
  *
  * \return Zero if no more tasks are left in the task list, negative if the
- * select function returned an error.
+ * poll function returned an error.
  *
  * \sa \ref now.
  */
@@ -132,31 +132,20 @@ int schedule(struct sched *s)
        int ret;
        unsigned num_running_tasks;
 
-       if (!s->select_function)
-               s->select_function = para_select;
+       if (!s->poll_function)
+               s->poll_function = xpoll;
 again:
-       FD_ZERO(&s->rfds);
-       FD_ZERO(&s->wfds);
-       s->select_timeout = s->default_timeout;
-       s->max_fileno = -1;
+       s->num_pfds = 0;
+       if (s->pidx)
+               memset(s->pidx, 0xff, s->pidx_array_len * sizeof(unsigned));
+       s->timeout = s->default_timeout;
        clock_get_realtime(&now_struct);
-       sched_preselect(s);
-       ret = s->select_function(s->max_fileno + 1, &s->rfds, &s->wfds,
-               &s->select_timeout);
+       sched_pre_monitor(s);
+       ret = s->poll_function(s->pfd, s->num_pfds, s->timeout);
        if (ret < 0)
                return ret;
-       if (ret == 0) {
-               /*
-                * APUE: Be careful not to check the descriptor sets on return
-                * unless the return value is greater than zero. The return
-                * state of the descriptor sets is implementation dependent if
-                * either a signal is caught or the timer expires.
-                */
-               FD_ZERO(&s->rfds);
-               FD_ZERO(&s->wfds);
-       }
        clock_get_realtime(&now_struct);
-       num_running_tasks = sched_post_select(s);
+       num_running_tasks = sched_post_monitor(s);
        if (num_running_tasks == 0)
                return 0;
        goto again;
@@ -197,7 +186,7 @@ int task_reap(struct task **tptr)
        /*
         * With list_for_each_entry_safe() it is only safe to remove the
         * _current_ list item. Since we are being called from the loop in
-        * schedule() via some task's ->post_select() function, freeing the
+        * schedule() via some task's ->post_monitor() function, freeing the
         * given task here would result in use-after-free bugs in schedule().
         * So we only set the task status to TS_DEAD which tells schedule() to
         * free the task in the next iteration of its loop.
@@ -226,6 +215,8 @@ void sched_shutdown(struct sched *s)
                                t->name);
                unlink_and_free_task(t);
        }
+       free(s->pfd);
+       free(s->pidx);
 }
 
 /**
@@ -241,7 +232,7 @@ struct task *task_register(struct task_info *info, struct sched *s)
 {
        struct task *t = para_malloc(sizeof(*t));
 
-       assert(info->post_select);
+       assert(info->post_monitor);
 
        if (!s->task_list.next)
                init_list_head(&s->task_list);
@@ -288,14 +279,14 @@ char *get_task_list(struct sched *s)
  * \param err A positive error code.
  *
  * Tasks which honor notifications are supposed to call \ref
- * task_get_notification() in their post_select function and act on the
+ * task_get_notification() in their post_monitor function and act on the
  * returned notification value.
  *
- * If the scheduler detects during its pre_select loop that at least one task
- * has been notified, the loop terminates, and the post_select methods of all
+ * If the scheduler detects during its pre_monitor loop that at least one task
+ * has been notified, the loop terminates, and the post_monitor methods of all
  * taks are immediately called again.
  *
- * The notification for a task is reset after the call to its post_select
+ * The notification for a task is reset after the call to its post_monitor
  * method.
  *
  * \sa \ref task_get_notification().
@@ -316,7 +307,7 @@ void task_notify(struct task *t, int err)
  *
  * \return The notification value. If this is negative, the task has been
  * notified by another task. Tasks are supposed to check for notifications by
- * calling this function from their post_select method.
+ * calling this function from their post_monitor method.
  *
  * \sa \ref task_notify().
  */
@@ -362,43 +353,43 @@ void task_notify_all(struct sched *s, int err)
 }
 
 /**
- * Set the select timeout to the minimal possible value.
+ * Set the I/O timeout to the minimal possible value.
  *
  * \param s Pointer to the scheduler struct.
  *
- * This causes the next select() call to return immediately.
+ * This causes the next poll() call to return immediately.
  */
 void sched_min_delay(struct sched *s)
 {
-       s->select_timeout.tv_sec = s->select_timeout.tv_usec = 0;
+       s->timeout = 0;
 }
 
 /**
- * Impose an upper bound for the timeout of the next select() call.
+ * Impose an upper bound for the I/O timeout.
  *
  * \param to Maximal allowed timeout.
  * \param s Pointer to the scheduler struct.
  *
- * If the current scheduler timeout is already smaller than \a to, this
- * function does nothing. Otherwise the timeout for the next select() call is
- * set to the given value.
+ * If the current I/O timeout is already smaller than to, this function does
+ * nothing. Otherwise the timeout is set to the given value.
  *
  * \sa \ref sched_request_timeout_ms().
  */
 void sched_request_timeout(struct timeval *to, struct sched *s)
 {
-       if (tv_diff(&s->select_timeout, to, NULL) > 0)
-               s->select_timeout = *to;
+       long unsigned ms = tv2ms(to);
+       if (s->timeout > ms)
+               s->timeout = ms;
 }
 
 /**
- * Force the next select() call to return before the given amount of milliseconds.
+ * Bound the I/O timeout to at most the given amount of milliseconds.
  *
  * \param ms The maximal allowed timeout in milliseconds.
  * \param s Pointer to the scheduler struct.
  *
- * Like sched_request_timeout() this imposes an upper bound on the timeout
- * value for the next select() call.
+ * Like \ref sched_request_timeout() this imposes an upper bound on the I/O
+ * timeout.
  */
 void sched_request_timeout_ms(long unsigned ms, struct sched *s)
 {
@@ -408,13 +399,13 @@ void sched_request_timeout_ms(long unsigned ms, struct sched *s)
 }
 
 /**
- * Force the next select() call to return before the given future time.
+ * Bound the I/O timeout by an absolute time in the future.
  *
- * \param barrier Absolute time before select() should return.
+ * \param barrier Defines the upper bound for the timeout.
  * \param s Pointer to the scheduler struct.
  *
- * \return If \a barrier is in the past, this function does nothing and returns
- * zero. Otherwise it returns one.
+ * \return If the barrier is in the past, this function does nothing and
+ * returns zero. Otherwise it returns one.
  *
  * \sa \ref sched_request_barrier_or_min_delay().
  */
@@ -429,12 +420,12 @@ int sched_request_barrier(struct timeval *barrier, struct sched *s)
 }
 
 /**
- * Force the next select() call to return before the given time.
+ * Bound the I/O timeout or request a minimal delay.
  *
- * \param barrier Absolute time before select() should return.
+ * \param barrier Absolute time as in \ref sched_request_barrier().
  * \param s Pointer to the scheduler struct.
  *
- * \return If \a barrier is in the past, this function requests a minimal
+ * \return If the barrier is in the past, this function requests a minimal
  * timeout and returns zero. Otherwise it returns one.
  *
  * \sa \ref sched_min_delay(), \ref sched_request_barrier().
@@ -450,3 +441,126 @@ int sched_request_barrier_or_min_delay(struct timeval *barrier, struct sched *s)
        sched_request_timeout(&diff, s);
        return 1;
 }
+
+static void add_pollfd(int fd, struct sched *s, short events)
+{
+       assert(fd >= 0);
+#if 0
+       {
+               int flags = fcntl(fd, F_GETFL);
+               if (!(flags & O_NONBLOCK)) {
+                       PARA_EMERG_LOG("fd %d is a blocking file descriptor\n", fd);
+                       exit(EXIT_FAILURE);
+               }
+       }
+#endif
+       if (s->pidx_array_len > fd) { /* is fd already registered? */
+               if (s->pidx[fd] < s->pfd_array_len) { /* yes, it is */
+                       assert(s->pfd[s->pidx[fd]].fd == fd);
+                       s->pfd[s->pidx[fd]].events |= events;
+                       return;
+               }
+       } else { /* need to extend the index array */
+               unsigned old_len = s->pidx_array_len;
+               while (s->pidx_array_len <= fd)
+                       s->pidx_array_len = s->pidx_array_len * 2 + 1;
+               PARA_INFO_LOG("pidx array len: %u\n", s->pidx_array_len);
+               s->pidx = para_realloc(s->pidx,
+                       s->pidx_array_len * sizeof(unsigned));
+               memset(s->pidx + old_len, 0xff,
+                       (s->pidx_array_len - old_len) * sizeof(unsigned));
+       }
+       /*
+        * The given fd is not part of the pfd array yet. Initialize pidx[fd]
+        * to point at the next unused slot of this array and initialize the
+        * slot.
+        */
+       s->pidx[fd] = s->num_pfds;
+       if (s->pfd_array_len <= s->num_pfds) {
+               unsigned old_len = s->pfd_array_len;
+               s->pfd_array_len = old_len * 2 + 1;
+               PARA_INFO_LOG("pfd array len: %u\n", s->pfd_array_len);
+               s->pfd = para_realloc(s->pfd,
+                       s->pfd_array_len * sizeof(struct pollfd));
+               memset(s->pfd + old_len, 0,
+                       (s->pfd_array_len - old_len) * sizeof(struct pollfd));
+       }
+       s->pfd[s->num_pfds].fd = fd;
+       s->pfd[s->num_pfds].events = events;
+       s->pfd[s->num_pfds].revents = 0;
+       s->num_pfds++;
+}
+
+/**
+ * Instruct the scheduler to monitor an fd for readiness for reading.
+ *
+ * \param fd The file descriptor.
+ * \param s The scheduler.
+ *
+ * \sa \ref sched_monitor_writefd().
+ */
+void sched_monitor_readfd(int fd, struct sched *s)
+{
+       add_pollfd(fd, s, POLLIN);
+}
+
+/**
+ * Instruct the scheduler to monitor an fd for readiness for writing.
+ *
+ * \param fd The file descriptor.
+ * \param s The scheduler.
+ *
+ * \sa \ref sched_monitor_readfd().
+ */
+void sched_monitor_writefd(int fd, struct sched *s)
+{
+       add_pollfd(fd, s, POLLOUT);
+}
+
+static int get_revents(int fd, const struct sched *s)
+{
+       if (fd < 0)
+               return 0;
+       if (fd >= s->pidx_array_len)
+               return 0;
+       if (s->pidx[fd] >= s->num_pfds)
+               return 0;
+       if (s->pfd[s->pidx[fd]].fd != fd)
+               return 0;
+       assert((s->pfd[s->pidx[fd]].revents & POLLNVAL) == 0);
+       return s->pfd[s->pidx[fd]].revents;
+}
+
+/**
+ * Check whether there is data to read on the given fd.
+ *
+ * To be called from the ->post_monitor() method of a task.
+ *
+ * \param fd Should have been monitored with \ref sched_monitor_readfd().
+ * \param s The scheduler instance.
+ *
+ * \return True if the file descriptor is ready for reading, false otherwise.
+ * If fd is negative, or has not been monitored in the current iteration of the
+ * scheduler's main loop, the function also returns false.
+ *
+ * \sa \ref sched_write_ok().
+ */
+bool sched_read_ok(int fd, const struct sched *s)
+{
+       return get_revents(fd, s) & (POLLIN | POLLERR | POLLHUP);
+}
+
+/**
+ * Check whether writing is possible (i.e., does not block).
+ *
+ * \param fd Should have been monitored with \ref sched_monitor_writefd().
+ * \param s The scheduler instance.
+ *
+ * \return True if the file descriptor is ready for writing, false otherwise.
+ * The comment in \ref sched_read_ok() about invalid file descriptors applies
+ * to this function as well.
+ */
+bool sched_write_ok(int fd, const struct sched *s)
+{
+       return get_revents(fd, s) & (POLLOUT | POLLERR | POLLHUP);
+}
diff --git a/sched.h b/sched.h
index 35e2503e383be3611fc302f5a732e45cf322d506..ede5e67ea2ac042c5765a92e6100bc00ab47e103 100644 (file)
--- a/sched.h
+++ b/sched.h
@@ -7,24 +7,30 @@
  * Paraslash's scheduler.
  *
  * Designed with KISS in mind. It maintains a list of task structures which is
- * extended when a new task is registered. Each task may define a pre_select
+ * extended when a new task is registered. Each task may define a pre_monitor
  * function which is called from the scheduler main loop before it calls
- * select(). Similarly, each task must define a post_select function which is
- * called after the select call.
+ * poll(2). Similarly, each task must define a post_monitor function which is
+ * called after poll(2) returns.
+ *
+ * \sa select(2), poll(2).
  */
 struct sched {
-       /** Initial value before any pre_select call. */
-       struct timeval default_timeout;
-       /** The current timeout for the upcoming select call. */
-       struct timeval select_timeout;
-       /** fds that should be watched for readability. */
-       fd_set rfds;
-       /** fds that should be watched for writability. */
-       fd_set wfds;
-       /** Highest numbered file descriptor in any of the above fd sets. */
-       int max_fileno;
-       /** If non-NULL, use this function instead of para_select. */
-       int (*select_function)(int, fd_set *, fd_set *, struct timeval *);
+       /** Initial value (in milliseconds) before any pre_monitor call. */
+       int default_timeout;
+       /** The timeout (also in milliseconds) for the next iteration. */
+       int timeout;
+       /** Passed to poll(2). */
+       struct pollfd *pfd;
+       /** Number of elements in the above array, passed to poll(2). */
+       unsigned pfd_array_len;
+       /** Number of fds registered for montitoring so far. */
+       unsigned num_pfds;
+       /** Maps fds to indices of the pfd array. */
+       unsigned *pidx;
+       /** Mumber of elements in the above pidx array. */
+       unsigned pidx_array_len;
+       /** If non-NULL, use this function instead of \ref xpoll(). */
+       int (*poll_function)(struct pollfd *fds, nfds_t nfds, int timeout);
        /** Tasks which have been registered to the scheduler. */
        struct list_head task_list;
 };
@@ -36,23 +42,32 @@ struct task_info {
        /** Used for log messages and by \ref get_task_list(). */
        const char *name;
        /**
-        * The optional pre select method.
+        * Configure watch fds and impose an upper bound on the I/O timeout.
+        *
+        * If this is not NULL, the function is called at each iteration of the
+        * scheduler's main loop. Its purpose is to tell the scheduler that
+        * certain file descriptors should be monitored for readiness for I/O.
+        * The function may also lower the scheduler's timeout value (but shall
+        * never increase it) to impose an upper bound on the waiting time in
+        * case no file descriptors happen to be ready.
         *
-        * Its purpose is to add file descriptors to the fd sets of the
-        * scheduler and to decrease the select timeout if necessary.
+        * \sa \ref time.c.
         */
-       void (*pre_select)(struct sched *s, void *context);
+       void (*pre_monitor)(struct sched *s, void *context);
        /**
-        * The mandatory post select method.
+        * Perform I/O on file descriptors which are ready for I/O.
+        *
+        * This mandatory hook is called after the system call which monitors
+        * file descriptors returns. The function should perform non-blocking
+        * I/O on those file descriptors which are reported as being ready.
         *
-        * Its purpose is to evaluate and act upon the results of the previous
-        * select call. If this function returns a negative value, the
-        * scheduler unregisters the task.
+        * If this function returns a negative value, the scheduler unregisters
+        * the task.
         */
-       int (*post_select)(struct sched *s, void *context);
+       int (*post_monitor)(struct sched *s, void *context);
        /**
         * This pointer is saved when the task is registered. It is passed to
-        * ->pre_select() and ->post_select(). Usually this is a pointer to the
+        * ->pre_monitor() and ->post_monitor(). Usually this is a pointer to the
         * struct owned by the caller which contains the task pointer.
         */
        void *context;
@@ -80,3 +95,7 @@ void sched_request_timeout(struct timeval *to, struct sched *s);
 void sched_request_timeout_ms(long unsigned ms, struct sched *s);
 int sched_request_barrier(struct timeval *barrier, struct sched *s);
 int sched_request_barrier_or_min_delay(struct timeval *barrier, struct sched *s);
+void sched_monitor_readfd(int fd, struct sched *s);
+void sched_monitor_writefd(int fd, struct sched *s);
+bool sched_read_ok(int fd, const struct sched *s);
+bool sched_write_ok(int fd, const struct sched *s);
diff --git a/send.h b/send.h
index f6aafbb41a2bf9b089f2ddcc9968278dea734fa1..dec5b0db1381694fcb35a98785a50068cb1f6330 100644 (file)
--- a/send.h
+++ b/send.h
@@ -76,27 +76,10 @@ struct sender {
        void (*send)(long unsigned current_chunk, long unsigned chunks_sent,
                const char *buf, size_t len, const char *header_buf,
                size_t header_len);
-       /**
-        * Add file descriptors to fd_sets.
-        *
-        * The pre_select function of each supported sender is called just before
-        * para_server enters its main select loop. Each sender may add its own
-        * file descriptors to the \a rfds or the \a wfds set.
-        *
-        * If a file descriptor was added, \a max_fileno must be increased by
-        * this function, if necessary.
-        *
-        * \sa select(2).
-        */
-       void (*pre_select)(int *max_fileno, fd_set *rfds, fd_set *wfds);
-       /**
-        * Handle the file descriptors which are ready for I/O.
-        *
-        * If the pre_select hook added one ore more file descriptors to the
-        * read or write set, this is the hook to check the result and do any
-        * I/O on those descriptors which are ready for reading/writing.
-        */
-       void (*post_select)(fd_set *rfds, fd_set *wfds);
+       /** Ask the scheduler to monitor file descriptors. */
+       void (*pre_monitor)(struct sched *s);
+       /** Perform I/O on the file descriptors which are ready. */
+       void (*post_monitor)(struct sched *s);
        /**
         * Terminate all connected clients.
         *
@@ -222,6 +205,6 @@ void generic_com_on(struct sender_status *ss, unsigned protocol);
 void generic_acl_deplete(struct list_head *acl);
 void generic_com_off(struct sender_status *ss);
 char *generic_sender_help(void);
-struct sender_client *accept_sender_client(struct sender_status *ss, fd_set *rfds);
+struct sender_client *accept_sender_client(struct sender_status *ss);
 int send_queued_chunks(int fd, struct chunk_queue *cq);
 int parse_fec_url(const char *arg, struct sender_command_data *scd);
index 90242d5c9b5ccb125617004e16fbf9b64d5484f6..ce167542c8c3178986037c806d88af6b0c4623f2 100644 (file)
 #include "afs.h"
 #include "server.h"
 #include "acl.h"
+#include "sched.h"
 #include "send.h"
 #include "close_on_fork.h"
 #include "chunk_queue.h"
-#include "sched.h"
 #include "vss.h"
 
 /** Clients will be kicked if there are more than that many bytes pending. */
@@ -343,7 +343,6 @@ void generic_com_off(struct sender_status *ss)
  * Accept a connection on the socket(s) this server is listening on.
  *
  * \param ss The sender whose listening fd is ready for reading.
- * \param rfds Passed to para_accept(),
  *
  * This accepts incoming connections on any of the listening sockets of the
  * server. If there is a connection pending, the function
@@ -367,7 +366,7 @@ void generic_com_off(struct sender_status *ss)
  * \sa \ref para_accept(), \ref mark_fd_nonblocking(), \ref acl_check_access(),
  * \ref cq_new(), \ref add_close_on_fork_list().
  */
-struct sender_client *accept_sender_client(struct sender_status *ss, fd_set *rfds)
+struct sender_client *accept_sender_client(struct sender_status *ss)
 {
        struct sender_client *sc;
        int fd, ret;
@@ -376,7 +375,7 @@ struct sender_client *accept_sender_client(struct sender_status *ss, fd_set *rfd
        FOR_EACH_LISTEN_FD(n, ss) {
                if (ss->listen_fds[n] < 0)
                        continue;
-               ret = para_accept(ss->listen_fds[n], rfds, NULL, 0, &fd);
+               ret = para_accept(ss->listen_fds[n], NULL, 0, &fd);
                if (ret < 0)
                        goto warn;
                if (ret == 0)
index e0df714be975b083f4d3cc7116ad0af2501a2b46..2c66cc279c9faa99e009ae747fd4d8a54738e2e8 100644 (file)
--- a/server.c
+++ b/server.c
@@ -24,8 +24,8 @@
 #include "net.h"
 #include "server.h"
 #include "list.h"
-#include "send.h"
 #include "sched.h"
+#include "send.h"
 #include "vss.h"
 #include "config.h"
 #include "close_on_fork.h"
@@ -250,14 +250,14 @@ static void handle_sighup(void)
                kill(afs_pid, SIGHUP);
 }
 
-static int signal_post_select(struct sched *s, __a_unused void *context)
+static int signal_post_monitor(struct sched *s, __a_unused void *context)
 {
        int ret, signum;
 
        ret = task_get_notification(signal_task->task);
        if (ret < 0)
                return ret;
-       signum = para_next_signal(&s->rfds);
+       signum = para_next_signal();
        switch (signum) {
        case 0:
                return 0;
@@ -313,20 +313,20 @@ static void init_signal_task(void)
        add_close_on_fork_list(signal_task->fd);
        signal_task->task = task_register(&(struct task_info) {
                .name = "signal",
-               .pre_select = signal_pre_select,
-               .post_select = signal_post_select,
+               .pre_monitor = signal_pre_monitor,
+               .post_monitor = signal_post_monitor,
                .context = signal_task,
 
        }, &sched);
 }
 
-static void command_pre_select(struct sched *s, void *context)
+static void command_pre_monitor(struct sched *s, void *context)
 {
        unsigned n;
        struct server_command_task *sct = context;
 
        for (n = 0; n < sct->num_listen_fds; n++)
-               para_fd_set(sct->listen_fds[n], &s->rfds, &s->max_fileno);
+               sched_monitor_readfd(sct->listen_fds[n], s);
 }
 
 static int command_task_accept(unsigned listen_idx, struct sched *s,
@@ -337,7 +337,7 @@ static int command_task_accept(unsigned listen_idx, struct sched *s,
        pid_t child_pid;
        uint32_t *chunk_table;
 
-       ret = para_accept(sct->listen_fds[listen_idx], &s->rfds, NULL, 0, &new_fd);
+       ret = para_accept(sct->listen_fds[listen_idx], NULL, 0, &new_fd);
        if (ret <= 0)
                goto out;
        mmd->num_connects++;
@@ -383,13 +383,13 @@ static int command_task_accept(unsigned listen_idx, struct sched *s,
        /*
         * After we return, the scheduler calls server_select() with a minimal
         * timeout value, because the remaining tasks have a notification
-        * pending. Next it calls the ->post_select method of these tasks,
+        * pending. Next it calls the ->post_monitor method of these tasks,
         * which will return negative in view of the notification. This causes
         * schedule() to return as there are no more runnable tasks.
         *
         * Note that semaphores are not inherited across a fork(), so we don't
-        * hold the lock at this point. Since server_select() drops the lock
-        * prior to calling para_select(), we need to acquire it here.
+        * hold the lock at this point. Since server_poll() drops the lock
+        * prior to calling poll(), we need to acquire it here.
         */
        mutex_lock(mmd_mutex);
        return -E_CHILD_CONTEXT;
@@ -399,7 +399,7 @@ out:
        return 0;
 }
 
-static int command_post_select(struct sched *s, void *context)
+static int command_post_monitor(struct sched *s, void *context)
 {
        struct server_command_task *sct = context;
        unsigned n;
@@ -459,8 +459,8 @@ static void init_server_command_task(struct server_command_task *sct,
 
        sct->task = task_register(&(struct task_info) {
                .name = "server command",
-               .pre_select = command_pre_select,
-               .post_select = command_post_select,
+               .pre_monitor = command_pre_monitor,
+               .post_monitor = command_post_monitor,
                .context = sct,
        }, &sched);
        /*
@@ -617,14 +617,13 @@ out:
        killpg(0, SIGUSR1);
 }
 
-static int server_select(int max_fileno, fd_set *readfds, fd_set *writefds,
-               struct timeval *timeout_tv)
+static int server_poll(struct pollfd *fds, nfds_t nfds, int timeout)
 {
        int ret;
 
        status_refresh();
        mutex_unlock(mmd_mutex);
-       ret = para_select(max_fileno + 1, readfds, writefds, timeout_tv);
+       ret = xpoll(fds, nfds, timeout);
        mutex_lock(mmd_mutex);
        return ret;
 }
@@ -658,15 +657,15 @@ int main(int argc, char *argv[])
        struct server_command_task server_command_task_struct,
                *sct = &server_command_task_struct;
 
-       sched.default_timeout.tv_sec = 1;
-       sched.select_function = server_select;
+       sched.default_timeout = 1000;
+       sched.poll_function = server_poll;
 
        server_init(argc, argv, sct);
        mutex_lock(mmd_mutex);
        ret = schedule(&sched);
        /*
-        * We hold the mmd lock: it was re-acquired in server_select()
-        * after the select call.
+        * We hold the mmd lock: it was re-acquired in server_poll()
+        * after the poll(2) call.
         */
        mutex_unlock(mmd_mutex);
        sched_shutdown(&sched);
index 32d6ab6624e5f493ac393b630a603c0968f11497..e5ef7a4119dc271320079d778313c5abf1d259d2 100644 (file)
--- a/signal.c
+++ b/signal.c
@@ -27,7 +27,7 @@ static int signal_pipe[2];
  * signal arrives, the signal handler writes the number of the signal received
  * to one end of the signal pipe. The application can test for pending signals
  * by checking if the file descriptor of the other end of the signal pipe is
- * ready for reading, see select(2).
+ * ready for reading.
  *
  * \return This function either succeeds or calls exit(3) to terminate the
  * current process. On success, a signal task structure is returned.
@@ -202,16 +202,14 @@ void para_unblock_signal(int sig)
 /**
  * Return the number of the next pending signal.
  *
- * \param rfds The fd_set containing the signal pipe.
- *
  * \return On success, the number of the received signal is returned. If there
  * is no signal currently pending, the function returns zero. On read errors
  * from the signal pipe, the process is terminated.
  */
-int para_next_signal(fd_set *rfds)
+int para_next_signal(void)
 {
        size_t n;
-       int s, ret = read_nonblock(signal_pipe[0], &s, sizeof(s), rfds, &n);
+       int s, ret = read_nonblock(signal_pipe[0], &s, sizeof(s), &n);
 
        if (ret < 0) {
                PARA_EMERG_LOG("%s\n", para_strerror(-ret));
index e5532ded5a9378619f7951708b49c5b38bf52d18..d9e98e78a635930577038d6090c9d5260772ddec 100644 (file)
--- a/signal.h
+++ b/signal.h
@@ -13,32 +13,31 @@ struct signal_task {
 };
 
 /**
- * A generic pre-select method for signal tasks.
+ * Monitor the signal fd for reading.
  *
- * \param s Passed to para_fd_set().
+ * \param s The scheduler instance.
  * \param context Signal task pointer.
  *
  * This convenience helper is called from several programs which need to handle
- * signals, including para_server and para_audiod. These programs define a
- * signal task structure and set its ->pre_select method to this function which
- * adds the file descriptor of the signal task to the set of descriptors to be
- * watched in the next select() call.
+ * signals, including para_server and para_audiod. These programs set up a
+ * signal pipe and a signal task structure, and use this function to tell the
+ * scheduler to monitor the read end of the pipe.
  *
  * Although the second parameter must be in fact a pointer to a signal_task
- * structure, the parameter is specified as void * here to match the
- * ->pre_select method of struct task.
+ * structure, the parameter is specified as void * here to match the signature
+ * declared in struct \ref task_info.
  */
-_static_inline_ void signal_pre_select(struct sched *s, void *context)
+_static_inline_ void signal_pre_monitor(struct sched *s, void *context)
 {
        struct signal_task *st = context;
-       para_fd_set(st->fd, &s->rfds, &s->max_fileno);
+       sched_monitor_readfd(st->fd, s);
 }
 
 struct signal_task *signal_init_or_die(void);
 void para_sigaction(int sig, void (*handler)(int));
 void para_install_sighandler(int);
 int para_reap_child(pid_t *pid);
-int para_next_signal(fd_set *rfds);
+int para_next_signal(void);
 void signal_shutdown(struct signal_task *st);
 void para_block_signal(int sig);
 void para_unblock_signal(int sig);
index 7be817ddaa49bf97e5d4025c43be63b42da9cd4c..94a9c78835e93157be51753bc910f251820501e8 100644 (file)
@@ -246,7 +246,7 @@ static int compute_skip_samples(ogg_page *og, struct private_spxdec_data *psd)
        return ret;
 }
 
-static int speexdec_post_select(__a_unused struct sched *s, void *context)
+static int speexdec_post_monitor(__a_unused struct sched *s, void *context)
 {
        struct filter_node *fn = context;
        struct private_spxdec_data *psd = fn->private_data;
@@ -305,7 +305,7 @@ fail:
 const struct filter lsg_filter_cmd_com_spxdec_user_data = {
        .open = spxdec_open,
        .close = speexdec_close,
-       .pre_select = generic_filter_pre_select,
-       .post_select = speexdec_post_select,
+       .pre_monitor = generic_filter_pre_monitor,
+       .post_monitor = speexdec_post_monitor,
        .execute = speexdec_execute,
 };
diff --git a/stdin.c b/stdin.c
index 9408235a045a6767e2acf443208d95196990d379..d025b949e8e8d2566abde3f2f214db7998ba44ff 100644 (file)
--- a/stdin.c
+++ b/stdin.c
 #include "string.h"
 
 /*
- * If there is space left in the buffer of the stdin task add STDIN_FILENO to
- * the read fd set of s.
+ * If there is space left in the buffer of the stdin task, ask the scheduler to
+ * monitor STDIN_FILENO.
  */
-static void stdin_pre_select(struct sched *s, void *context)
+static void stdin_pre_monitor(struct sched *s, void *context)
 {
        struct stdin_task *sit = context;
        int ret;
@@ -28,16 +28,15 @@ static void stdin_pre_select(struct sched *s, void *context)
        if (ret <= 0)
                return;
        if (btr_pool_unused(sit->btrp) > 0)
-               return para_fd_set(STDIN_FILENO, &s->rfds, &s->max_fileno);
+               return sched_monitor_readfd(STDIN_FILENO, s);
        sched_request_timeout_ms(100, s);
 }
 
 /*
- * This function checks if STDIN_FILENO was included by in the read fd set of s
- * during the previous pre_select call. If so, and if STDIN_FILENO is readable,
- * data is read from stdin and fed into the buffer tree.
+ * Feed data from stdin into the buffer tree if STDIN_FILENO is ready for
+ * reading.
  */
-static int stdin_post_select(struct sched *s, void *context)
+static int stdin_post_monitor(__a_unused struct sched *s, void *context)
 {
        struct stdin_task *sit = context;
        ssize_t ret;
@@ -64,7 +63,7 @@ static int stdin_post_select(struct sched *s, void *context)
         * reference can not be freed, we're stuck.
         */
        sz = PARA_MIN(sz, btr_pool_size(sit->btrp) / 2);
-       ret = read_nonblock(STDIN_FILENO, buf, sz, &s->rfds, &n);
+       ret = read_nonblock(STDIN_FILENO, buf, sz, &n);
        if (n > 0)
                btr_add_output_pool(sit->btrp, n, sit->btrn);
        if (ret >= 0)
@@ -91,8 +90,8 @@ void stdin_task_register(struct stdin_task *sit, struct sched *s)
        int ret;
        struct task_info ti = {
                .name = "stdin",
-               .pre_select = stdin_pre_select,
-               .post_select = stdin_post_select,
+               .pre_monitor = stdin_pre_monitor,
+               .post_monitor = stdin_post_monitor,
                .context = sit,
        };
 
index 1f7791094a33a961c193c273a4c09e9b16f2be07..ba5f19670fce6c3c04d256a2056477064f5df7d3 100644 (file)
--- a/stdout.c
+++ b/stdout.c
 #include "stdout.h"
 #include "buffer_tree.h"
 
-/* Add STDOUT_FILENO to the write fd set if there is input data available. */
-static void stdout_pre_select(struct sched *s, void *context)
+/* Monitor STDOUT_FILENO if there is input data available. */
+static void stdout_pre_monitor(struct sched *s, void *context)
 {
        struct stdout_task *sot = context;
        int ret;
 
        ret = btr_node_status(sot->btrn, 0, BTR_NT_LEAF);
        if (ret > 0)
-               para_fd_set(STDOUT_FILENO, &s->wfds, &s->max_fileno);
+               sched_monitor_writefd(STDOUT_FILENO, s);
        else if (ret < 0)
                sched_min_delay(s);
 }
 
 /*
- * This function writes input data from the buffer tree to stdout if
- * STDOUT_FILENO is writable.
+ * If input from the buffer tree is available and STDOUT_FILENO is ready, write
+ * as much as possible.
  */
-static int stdout_post_select(struct sched *s, void *context)
+static int stdout_post_monitor(struct sched *s, void *context)
 {
        struct stdout_task *sot = context;
        struct btr_node *btrn = sot->btrn;
@@ -40,7 +40,7 @@ static int stdout_post_select(struct sched *s, void *context)
                goto out;
        if (ret == 0)
                return 0;
-       if (!FD_ISSET(STDOUT_FILENO, &s->wfds))
+       if (!sched_write_ok(STDOUT_FILENO, s))
                return 0;
 
        if (sot->must_set_nonblock_flag) {
@@ -79,8 +79,8 @@ void stdout_task_register(struct stdout_task *sot, struct sched *s)
 {
        int ret;
        struct task_info ti = {
-               .pre_select = stdout_pre_select,
-               .post_select = stdout_post_select,
+               .pre_monitor = stdout_pre_monitor,
+               .post_monitor = stdout_post_monitor,
                .context = sot,
                .name = "stdout",
        };
index 8e9ff2c5de79a6385b6472453f3dec417b9ac5cc..3174a4ef741b0afe1107ff6bd9146f1c82f5f77e 100644 (file)
@@ -248,7 +248,7 @@ static void sync_set_timeout(struct sync_filter_context *ctx,
        tv_add(now, &to, &ctx->timeout);
 }
 
-static void sync_pre_select(struct sched *s, void *context)
+static void sync_pre_monitor(struct sched *s, void *context)
 {
        int ret;
        struct filter_node *fn = context;
@@ -261,7 +261,7 @@ static void sync_pre_select(struct sched *s, void *context)
        ret = btr_node_status(fn->btrn, 0, BTR_NT_INTERNAL);
        if (ret < 0)
                return sched_min_delay(s);
-       para_fd_set(ctx->listen_fd, &s->rfds, &s->max_fileno);
+       sched_monitor_readfd(ctx->listen_fd, s);
        if (ret == 0)
                return;
        if (ctx->timeout.tv_sec == 0) { /* must ping buddies */
@@ -284,7 +284,7 @@ static struct sync_buddy *sync_find_buddy(struct sockaddr *addr,
        return NULL;
 }
 
-static int sync_post_select(__a_unused struct sched *s, void *context)
+static int sync_post_monitor(__a_unused struct sched *s, void *context)
 {
        int ret;
        struct filter_node *fn = context;
@@ -324,7 +324,7 @@ static int sync_post_select(__a_unused struct sched *s, void *context)
                }
                ctx->ping_sent = true;
        }
-       if (FD_ISSET(ctx->listen_fd, &s->rfds)) {
+       if (sched_read_ok(ctx->listen_fd, s)) {
                char c;
                for (;;) {
                        struct sockaddr src_addr;
@@ -377,8 +377,8 @@ out:
 const struct filter lsg_filter_cmd_com_sync_user_data = {
        .setup = sync_setup,
        .open = sync_open,
-       .pre_select = sync_pre_select,
-       .post_select = sync_post_select,
+       .pre_monitor = sync_pre_monitor,
+       .post_monitor = sync_post_monitor,
        .close = sync_close,
        .teardown = sync_teardown
 };
index 58d45ab43d6508619d34186b3f4a23f6d455ecc7..8d1274bc1e919e703f3856cc8159269d33756f2c 100644 (file)
 #include "net.h"
 #include "fd.h"
 
-static void udp_recv_pre_select(struct sched *s, void *context)
+static void udp_recv_pre_monitor(struct sched *s, void *context)
 {
        struct receiver_node *rn = context;
 
-       if (generic_recv_pre_select(s, rn) <= 0)
+       if (generic_recv_pre_monitor(s, rn) <= 0)
                return;
-       para_fd_set(rn->fd, &s->rfds, &s->max_fileno);
+       sched_monitor_readfd(rn->fd, s);
 }
 
 static int udp_check_eof(size_t sz, struct iovec iov[2])
@@ -50,7 +50,7 @@ static int udp_check_eof(size_t sz, struct iovec iov[2])
        return -E_RECV_EOF;
 }
 
-static int udp_recv_post_select(__a_unused struct sched *s, void *context)
+static int udp_recv_post_monitor(__a_unused struct sched *s, void *context)
 {
        struct receiver_node *rn = context;
        struct btr_node *btrn = rn->btrn;
@@ -68,7 +68,7 @@ static int udp_recv_post_select(__a_unused struct sched *s, void *context)
        ret = -E_UDP_OVERRUN;
        if (iovcnt == 0)
                goto out;
-       ret = readv_nonblock(rn->fd, iov, iovcnt, &s->rfds, &num_bytes);
+       ret = readv_nonblock(rn->fd, iov, iovcnt, &num_bytes);
        if (num_bytes == 0)
                goto out;
        readv_ret = ret;
@@ -189,6 +189,6 @@ err:
 const struct receiver lsg_recv_cmd_com_udp_user_data = {
        .open = udp_recv_open,
        .close = udp_recv_close,
-       .pre_select = udp_recv_pre_select,
-       .post_select = udp_recv_post_select,
+       .pre_monitor = udp_recv_pre_monitor,
+       .post_monitor = udp_recv_post_monitor,
 };
index 68d75e3c3ef87dc089a3a1eec4d59fb647e73e44..ac656ff2cc02a47353fb73231fa96ad517dcd090 100644 (file)
@@ -21,8 +21,8 @@
 #include "net.h"
 #include "server.h"
 #include "list.h"
-#include "send.h"
 #include "sched.h"
+#include "send.h"
 #include "vss.h"
 #include "portable_io.h"
 #include "fd.h"
diff --git a/vss.c b/vss.c
index f9bf57b5575a7348fa5b5bb8a2db3c446d9dd92c..4f270c4a50b35df749c8f9c3cf2bbeba9464eb7b 100644 (file)
--- a/vss.c
+++ b/vss.c
@@ -28,8 +28,8 @@
 #include "net.h"
 #include "server.h"
 #include "list.h"
-#include "send.h"
 #include "sched.h"
+#include "send.h"
 #include "vss.h"
 #include "ipc.h"
 #include "fd.h"
@@ -43,7 +43,7 @@ const struct sender * const senders[] = {
 enum afs_socket_status {
        /** Socket is inactive. */
        AFS_SOCKET_READY,
-       /** Socket fd was included in the write fd set for select(). */
+       /** Socket fd was monitored for writing. */
        AFS_SOCKET_CHECK_FOR_WRITE,
        /** vss wrote a request to the socket and waits for reply from afs. */
        AFS_SOCKET_AFD_PENDING
@@ -827,7 +827,7 @@ static void vss_compute_timeout(struct sched *s, struct vss_task *vsst)
        if (sched_request_barrier(&vsst->data_send_barrier, s) == 1)
                return;
        /*
-        * Compute the select timeout as the minimal time until the next
+        * Compute the I/O timeout as the minimal time until the next
         * chunk/slice is due for any client.
         */
        compute_chunk_time(mmd->chunks_sent, &mmd->afd.afhi.chunk_tv,
@@ -892,21 +892,21 @@ static void set_mmd_offset(void)
        mmd->offset = tv2ms(&offset);
 }
 
-static void vss_pre_select(struct sched *s, void *context)
+static void vss_pre_monitor(struct sched *s, void *context)
 {
        int i;
        struct vss_task *vsst = context;
 
        if (need_to_request_new_audio_file(vsst)) {
                PARA_DEBUG_LOG("ready and playing, but no audio file\n");
-               para_fd_set(vsst->afs_socket, &s->wfds, &s->max_fileno);
+               sched_monitor_writefd(vsst->afs_socket, s);
                vsst->afsss = AFS_SOCKET_CHECK_FOR_WRITE;
        } else
-               para_fd_set(vsst->afs_socket, &s->rfds, &s->max_fileno);
+               sched_monitor_readfd(vsst->afs_socket, s);
        FOR_EACH_SENDER(i) {
-               if (!senders[i]->pre_select)
+               if (!senders[i]->pre_monitor)
                        continue;
-               senders[i]->pre_select(&s->max_fileno, &s->rfds, &s->wfds);
+               senders[i]->pre_monitor(s);
        }
        vss_compute_timeout(s, vsst);
 }
@@ -950,13 +950,13 @@ static int recv_afs_msg(int afs_socket, int *fd, uint32_t *code, uint32_t *data)
 #define MAP_POPULATE 0
 #endif
 
-static void recv_afs_result(struct vss_task *vsst, fd_set *rfds)
+static void recv_afs_result(struct vss_task *vsst, const struct sched *s)
 {
        int ret, passed_fd, shmid;
        uint32_t afs_code = 0, afs_data = 0;
        struct stat statbuf;
 
-       if (!FD_ISSET(vsst->afs_socket, rfds))
+       if (!sched_read_ok(vsst->afs_socket, s))
                return;
        ret = recv_afs_msg(vsst->afs_socket, &passed_fd, &afs_code, &afs_data);
        if (ret == -ERRNO_TO_PARA_ERROR(EAGAIN))
@@ -1016,7 +1016,7 @@ err:
 /**
  * Main sending function.
  *
- * This function gets called from vss_post_select(). It checks whether the next
+ * This function gets called from vss_post_monitor(). It checks whether the next
  * chunk of data should be pushed out. It obtains a pointer to the data to be
  * sent out as well as its length from mmd->afd.afhi. This information is then
  * passed to each supported sender's send() function as well as to the send()
@@ -1087,7 +1087,7 @@ static void vss_send(struct vss_task *vsst)
        mmd->current_chunk++;
 }
 
-static int vss_post_select(struct sched *s, void *context)
+static int vss_post_monitor(struct sched *s, void *context)
 {
        int ret, i;
        struct vss_task *vsst = context;
@@ -1137,8 +1137,8 @@ static int vss_post_select(struct sched *s, void *context)
                mmd->sender_cmd_data.cmd_num = -1;
        }
        if (vsst->afsss != AFS_SOCKET_CHECK_FOR_WRITE)
-               recv_afs_result(vsst, &s->rfds);
-       else if (FD_ISSET(vsst->afs_socket, &s->wfds)) {
+               recv_afs_result(vsst, s);
+       else if (sched_write_ok(vsst->afs_socket, s)) {
                PARA_INFO_LOG("requesting new fd from afs\n");
                ret = write_buffer(vsst->afs_socket, "new");
                if (ret < 0)
@@ -1147,9 +1147,9 @@ static int vss_post_select(struct sched *s, void *context)
                        vsst->afsss = AFS_SOCKET_AFD_PENDING;
        }
        FOR_EACH_SENDER(i) {
-               if (!senders[i]->post_select)
+               if (!senders[i]->post_monitor)
                        continue;
-               senders[i]->post_select(&s->rfds, &s->wfds);
+               senders[i]->post_monitor(s);
        }
        if ((vss_playing() && !(mmd->vss_status_flags & VSS_PLAYING)) ||
                        (vss_next() && vss_playing()))
@@ -1194,8 +1194,8 @@ void vss_init(int afs_socket, struct sched *s)
        }
        vsst->task = task_register(&(struct task_info) {
                .name = "vss",
-               .pre_select = vss_pre_select,
-               .post_select = vss_post_select,
+               .pre_monitor = vss_pre_monitor,
+               .post_monitor = vss_post_monitor,
                .context = vsst,
        }, s);
 }
index e749160d3337e87dc7c908c023ea100c955590e4..692306b541fcb28012b1a2a4d873a83d93855437 100644 (file)
@@ -58,7 +58,7 @@ static void wav_open(struct filter_node *fn)
        *bof = 1;
 }
 
-static void wav_pre_select(struct sched *s, void *context)
+static void wav_pre_monitor(struct sched *s, void *context)
 {
        struct filter_node *fn = context;
        size_t iqs = btr_get_input_queue_size(fn->btrn);
@@ -68,7 +68,7 @@ static void wav_pre_select(struct sched *s, void *context)
        sched_min_delay(s);
 }
 
-static int wav_post_select(__a_unused struct sched *s, void *context)
+static int wav_post_monitor(__a_unused struct sched *s, void *context)
 {
        struct filter_node *fn = context;
        struct btr_node *btrn = fn->btrn;
@@ -118,6 +118,6 @@ err:
 const struct filter lsg_filter_cmd_com_wav_user_data = {
        .close = wav_close,
        .open = wav_open,
-       .pre_select = wav_pre_select,
-       .post_select = wav_post_select,
+       .pre_monitor = wav_pre_monitor,
+       .post_monitor = wav_post_monitor,
 };
index edf50cb0b3834d8971fe9602f6528e2f2bd86e5d..8061f9aeedb32d6cd0b0cf2083eefd991ed8a1d8 100644 (file)
@@ -16,7 +16,6 @@
 
 #include <math.h>
 #include <regex.h>
-#include <sys/select.h>
 
 #include "para.h"
 #include "error.h"
@@ -1159,7 +1158,7 @@ static int wmadec_execute(struct btr_node *btrn, const char *cmd, char **result)
 
 #define WMA_OUTPUT_BUFFER_SIZE (128 * 1024)
 
-static int wmadec_post_select(__a_unused struct sched *s, void *context)
+static int wmadec_post_monitor(__a_unused struct sched *s, void *context)
 {
        struct filter_node *fn = context;
        int ret, converted, out_size;
@@ -1229,6 +1228,6 @@ const struct filter lsg_filter_cmd_com_wmadec_user_data = {
        .open = wmadec_open,
        .close = wmadec_close,
        .execute = wmadec_execute,
-       .pre_select = generic_filter_pre_select,
-       .post_select = wmadec_post_select,
+       .pre_monitor = generic_filter_pre_monitor,
+       .post_monitor = wmadec_post_monitor,
 };
diff --git a/write.c b/write.c
index acfb94605b53d259a89ea41e75507b81e2cb2ca4..9e2de3d8207d977d074546280304db085990e761 100644 (file)
--- a/write.c
+++ b/write.c
@@ -54,16 +54,16 @@ struct write_task {
        struct check_wav_context *cwc;
 };
 
-static void write_pre_select(struct sched *s, void *context)
+static void write_pre_monitor(struct sched *s, void *context)
 {
        struct write_task *wt = context;
-       check_wav_pre_select(s, wt->cwc);
+       check_wav_pre_monitor(s, wt->cwc);
 }
 
-static int write_post_select(__a_unused struct sched *s, void *context)
+static int write_post_monitor(__a_unused struct sched *s, void *context)
 {
        struct write_task *wt = context;
-       return check_wav_post_select(wt->cwc);
+       return check_wav_post_monitor(wt->cwc);
 }
 
 static int setup_and_schedule(struct lls_parse_result *lpr)
@@ -83,8 +83,8 @@ static int setup_and_schedule(struct lls_parse_result *lpr)
        wt.cwc = check_wav_init(sit.btrn, NULL, &wp, &cw_btrn);
        wt.task = task_register(&(struct task_info) {
                .name = "write",
-               .pre_select = write_pre_select,
-               .post_select = write_post_select,
+               .pre_monitor = write_pre_monitor,
+               .post_monitor = write_post_monitor,
                .context = &wt,
        }, &s);
 
@@ -96,8 +96,7 @@ static int setup_and_schedule(struct lls_parse_result *lpr)
                wns[i].wid = check_writer_arg_or_die(arg, &wns[i].lpr);
                register_writer_node(wns + i, cw_btrn, &s);
        }
-       s.default_timeout.tv_sec = 10;
-       s.default_timeout.tv_usec = 50000;
+       s.default_timeout = 10500;
        ret = schedule(&s);
        if (ret >= 0) {
                int j, ts;
diff --git a/write.h b/write.h
index 833cb69a5cb6373bae819d5b7c323e1d567c6104..35a8d29f1a3f7dd493b65c394bc8ce63aa7f9f8f 100644 (file)
--- a/write.h
+++ b/write.h
@@ -20,21 +20,23 @@ struct writer_node {
        size_t min_iqs;
 };
 
-/** Describes one supported writer. */
+/**
+ * Describes a data sink for audio streams.
+ *
+ * A paraslash writer obtains data via the buffer tree mechanism from its
+ * parent node. It consumes data without producing output on its own.
+ *
+ * This structure contains the methods which have to be implemented by each
+ * writer.
+ *
+ * \sa struct \ref writer_node, struct \ref receiver, struct \ref filter,
+ * struct \ref sched.
+ */
 struct writer {
-       /**
-        * Prepare the fd sets for select.
-        *
-        * This is called from scheduler. It may use the sched pointer to add
-        * any file descriptors or to decrease the select timeout.
-        */
-       void (*pre_select)(struct sched *s, void *context);
-       /**
-        * Write audio data.
-        *
-        * Called from the post_select function of the writer node's task.
-        */
-       int (*post_select)(struct sched *s, void *context);
+       /** Ask the scheduler to check whether data can be written. */
+       void (*pre_monitor)(struct sched *s, void *context);
+       /** Write audio data. */
+       int (*post_monitor)(struct sched *s, void *context);
        /**
         * Close one instance of the writer.
         *
index 41c3eb23728ab11cde71f837d4c58a1ef6908d24..806d682f7a4b914e019fe152945e13a0d623cd76 100644 (file)
@@ -139,8 +139,8 @@ void register_writer_node(struct writer_node *wn, struct btr_node *parent,
                .handler = w->execute, .context = wn));
        wn->task = task_register(&(struct task_info) {
                .name = writer_name(wn->wid),
-               .pre_select = w->pre_select,
-               .post_select = w->post_select,
+               .pre_monitor = w->pre_monitor,
+               .post_monitor = w->post_monitor,
                .context = wn,
        }, s);
 }