Merge branch 't/nonblock_api'
authorAndre Noll <maan@systemlinux.org>
Thu, 13 May 2010 11:08:59 +0000 (13:08 +0200)
committerAndre Noll <maan@systemlinux.org>
Thu, 13 May 2010 11:08:59 +0000 (13:08 +0200)
24 files changed:
afs.c
audioc.c
audiod.c
audiod.h
audiod_command.c
client_common.c
dccp_recv.c
dccp_send.c
error.h
fd.c
fd.h
gui.c
http_recv.c
http_send.c
net.c
net.h
send.h
send_common.c
server.c
signal.c
signal.h
stdin.c
udp_recv.c
vss.c

diff --git a/afs.c b/afs.c
index d738c3d..2b748f2 100644 (file)
--- a/afs.c
+++ b/afs.c
@@ -709,15 +709,16 @@ static void signal_pre_select(struct sched *s, struct task *t)
 
 static void afs_signal_post_select(struct sched *s, struct task *t)
 {
-       struct signal_task *st = container_of(t, struct signal_task, task);
+       int signum;
+
        if (getppid() == 1) {
                PARA_EMERG_LOG("para_server died\n");
                goto shutdown;
        }
-       if (!FD_ISSET(st->fd, &s->rfds))
+       signum = para_next_signal(&s->rfds);
+       if (signum == 0)
                return;
-       st->signum = para_next_signal();
-       if (st->signum == SIGHUP) {
+       if (signum == SIGHUP) {
                close_afs_tables();
                parse_config_or_die(1);
                t->error = open_afs_tables();
@@ -726,7 +727,7 @@ static void afs_signal_post_select(struct sched *s, struct task *t)
                init_admissible_files(current_mop);
                return;
        }
-       PARA_EMERG_LOG("terminating on signal %d\n", st->signum);
+       PARA_EMERG_LOG("terminating on signal %d\n", signum);
 shutdown:
        sched_shutdown();
        t->error = -E_AFS_SIGNAL;
@@ -842,57 +843,56 @@ static int call_callback(int fd, int query_shmid)
        return shm_detach(query_shm);
 }
 
-static int execute_server_command(void)
+static int execute_server_command(fd_set *rfds)
 {
        char buf[8];
-       int ret = recv_bin_buffer(server_socket, buf, sizeof(buf) - 1);
+       size_t n;
+       int ret = read_nonblock(server_socket, buf, sizeof(buf) - 1, rfds, &n);
 
-       if (ret <= 0) {
-               if (!ret)
-                       ret = -ERRNO_TO_PARA_ERROR(ECONNRESET);
-               goto err;
-       }
-       buf[ret] = '\0';
-       PARA_DEBUG_LOG("received: %s\n", buf);
-       ret = -E_BAD_CMD;
+       if (ret < 0 || n == 0)
+               return ret;
+       buf[n] = '\0';
        if (strcmp(buf, "new"))
-               goto err;
-       ret = open_next_audio_file();
-err:
-       return ret;
+               return -E_BAD_CMD;
+       return open_next_audio_file();
 }
 
-static void execute_afs_command(int fd, uint32_t expected_cookie)
+/* returns 0 if no data available, 1 else */
+static int execute_afs_command(int fd, fd_set *rfds, uint32_t expected_cookie)
 {
        uint32_t cookie;
        int query_shmid;
        char buf[sizeof(cookie) + sizeof(query_shmid)];
-       int ret = recv_bin_buffer(fd, buf, sizeof(buf));
+       size_t n;
+       int ret = read_nonblock(fd, buf, sizeof(buf), rfds, &n);
 
        if (ret < 0)
                goto err;
-       if (ret != sizeof(buf)) {
+       if (n == 0)
+               return 0;
+       if (n != sizeof(buf)) {
                PARA_NOTICE_LOG("short read (%d bytes, expected %lu)\n",
                        ret, (long unsigned) sizeof(buf));
-               return;
+               return 1;
        }
        cookie = *(uint32_t *)buf;
        if (cookie != expected_cookie) {
-               PARA_NOTICE_LOG("received invalid cookie(got %u, expected %u)\n",
+               PARA_NOTICE_LOG("received invalid cookie (got %u, expected %u)\n",
                        (unsigned)cookie, (unsigned)expected_cookie);
-               return;
+               return 1;
        }
        query_shmid = *(int *)(buf + sizeof(cookie));
        if (query_shmid < 0) {
                PARA_WARNING_LOG("received invalid query shmid %d)\n",
                        query_shmid);
-               return;
+               return 1;
        }
        ret = call_callback(fd, query_shmid);
        if (ret >= 0)
-               return;
+               return 1;
 err:
        PARA_NOTICE_LOG("%s\n", para_strerror(-ret));
+       return 1;
 }
 
 /** Shutdown connection if query has not arrived until this many seconds. */
@@ -905,20 +905,16 @@ static void command_post_select(struct sched *s, struct task *t)
        struct afs_client *client, *tmp;
        int fd, ret;
 
-       if (FD_ISSET(server_socket, &s->rfds)) {
-               ret = execute_server_command();
-               if (ret < 0) {
-                       PARA_EMERG_LOG("%s\n", para_strerror(-ret));
-                       sched_shutdown();
-                       return;
-               }
+       ret = execute_server_command(&s->rfds);
+       if (ret < 0) {
+               PARA_EMERG_LOG("%s\n", para_strerror(-ret));
+               sched_shutdown();
+               return;
        }
-
        /* Check the list of connected clients. */
        list_for_each_entry_safe(client, tmp, &afs_client_list, node) {
-               if (FD_ISSET(client->fd, &s->rfds))
-                       execute_afs_command(client->fd, ct->cookie);
-               else { /* prevent bogus connection flooding */
+               ret = execute_afs_command(client->fd, &s->rfds, ct->cookie);
+               if (ret == 0) { /* prevent bogus connection flooding */
                        struct timeval diff;
                        tv_diff(now, &client->connect_time, &diff);
                        if (diff.tv_sec < AFS_CLIENT_TIMEOUT)
@@ -930,14 +926,11 @@ static void command_post_select(struct sched *s, struct task *t)
                free(client);
        }
        /* Accept connections on the local socket. */
-       if (!FD_ISSET(ct->fd, &s->rfds))
-               return;
-       ret = para_accept(ct->fd, &unix_addr, sizeof(unix_addr));
-       if (ret < 0) {
+       ret = para_accept(ct->fd, &s->rfds, &unix_addr, sizeof(unix_addr), &fd);
+       if (ret < 0)
                PARA_NOTICE_LOG("%s\n", para_strerror(-ret));
+       if (ret <= 0)
                return;
-       }
-       fd = ret;
        ret = mark_fd_nonblocking(fd);
        if (ret < 0) {
                PARA_NOTICE_LOG("%s\n", para_strerror(-ret));
index 3134faa..f91f41f 100644 (file)
--- a/audioc.c
+++ b/audioc.c
@@ -71,7 +71,7 @@ int main(int argc, char *argv[])
 {
        int ret = -E_AUDIOC_SYNTAX, fd;
        char *cf, *buf = NULL, *args;
-       size_t bufsize, loaded = 0;
+       size_t bufsize;
 
        if (audioc_cmdline_parser(argc, argv, &conf))
                goto out;
@@ -93,72 +93,33 @@ int main(int argc, char *argv[])
        args = conf.inputs_num?
                concat_args(conf.inputs_num, conf.inputs) :
                para_strdup("stat");
-       bufsize = conf.bufsize_arg;
-       buf = para_malloc(bufsize);
 
-       if (conf.socket_given) {
+       if (conf.socket_given)
                ret = create_remote_socket(conf.socket_arg);
-       } else {
-               char *hn = para_hostname(),
-                    *socket_name = make_message("/var/paraslash/audiod_socket.%s", hn);
-
+       else {
+               char *hn = para_hostname(), *socket_name = make_message(
+                       "/var/paraslash/audiod_socket.%s", hn);
                ret = create_remote_socket(socket_name);
                free(hn);
                free(socket_name);
        }
-       if (ret < 0)
+       if (ret < 0) {
+               PARA_EMERG_LOG("failed to create remote socket\n");
                goto out;
+       }
        fd = ret;
-       ret = mark_fd_nonblocking(fd);
-       if (ret < 0)
-               goto out;
-       ret = mark_fd_nonblocking(STDOUT_FILENO);
-       if (ret < 0)
-               goto out;
        ret = send_cred_buffer(fd, args);
        if (ret < 0)
                goto out;
-       for (;;) {
-               int max_fileno = -1, check_write = 0;
-               ssize_t len;
-               fd_set rfd, wfd;
-               FD_ZERO(&rfd);
-               FD_ZERO(&wfd);
-               if (loaded < bufsize)
-                       para_fd_set(fd, &rfd, &max_fileno);
-               if (loaded > 0) {
-                       para_fd_set(STDOUT_FILENO, &wfd, &max_fileno);
-                       check_write = 1;
-               }
-               ret = -E_AUDIOC_OVERRUN;
-               if (max_fileno < 0)
-                       goto out;
-               ret = para_select(max_fileno + 1, &rfd, &wfd, NULL);
-               if (ret < 0)
-                       goto out;
-               if (loaded < bufsize && FD_ISSET(fd, &rfd)) {
-                       len = recv_bin_buffer(fd, buf + loaded,
-                               bufsize - loaded);
-                       if (len <= 0) {
-                               ret = len < 0? -E_AUDIOC_READ : 0;
-                               goto out;
-                       }
-                       loaded += len;
-               }
-               if (check_write && FD_ISSET(STDOUT_FILENO, &wfd)) {
-                       ret = write(STDOUT_FILENO, buf, loaded);
-                       if (ret < 0) {
-                               ret = -E_AUDIOC_WRITE;
-                               goto out;
-                       }
-                       loaded -= ret;
-                       if (loaded && ret)
-                               memmove(buf, buf + ret, loaded);
-               }
-       }
+       bufsize = conf.bufsize_arg;
+       buf = para_malloc(bufsize);
+       do {
+               size_t n = ret = recv_bin_buffer(fd, buf, bufsize);
+               if (ret <= 0)
+                       break;
+               ret = write_all(STDOUT_FILENO, buf, &n);
+       } while (ret >= 0);
 out:
-       if (!ret && loaded && buf)
-               ret = write(STDOUT_FILENO, buf, loaded);
        if (ret < 0)
                PARA_ERROR_LOG("%s\n", para_strerror(-ret));
        return ret < 0? EXIT_FAILURE : EXIT_SUCCESS;
index 4a4a2ae..778318c 100644 (file)
--- a/audiod.c
+++ b/audiod.c
@@ -980,19 +980,16 @@ static void signal_pre_select(struct sched *s, struct task *t)
        para_fd_set(st->fd, &s->rfds, &s->max_fileno);
 }
 
-static void signal_post_select(struct sched *s, struct task *t)
+static void signal_post_select(struct sched *s, __a_unused struct task *t)
 {
-       struct signal_task *st = container_of(t, struct signal_task, task);
-
-       if (!FD_ISSET(st->fd, &s->rfds))
-               return;
+       int signum;
 
-       st->signum = para_next_signal();
-       switch (st->signum) {
+       signum = para_next_signal(&s->rfds);
+       switch (signum) {
        case SIGINT:
        case SIGTERM:
        case SIGHUP:
-               PARA_EMERG_LOG("terminating on signal %d\n", st->signum);
+               PARA_EMERG_LOG("terminating on signal %d\n", signum);
                clean_exit(EXIT_FAILURE, "caught deadly signal");
        }
 }
@@ -1023,9 +1020,7 @@ static void command_post_select(struct sched *s, struct task *t)
                last_status_dump = *now;
        }
 
-       if (!FD_ISSET(ct->fd, &s->rfds))
-               return;
-       ret = handle_connect(ct->fd);
+       ret = handle_connect(ct->fd, &s->rfds);
        if (ret < 0)
                PARA_ERROR_LOG("%s\n", para_strerror(-ret));
        audiod_status_dump();
index f0cb6ca..44b430c 100644 (file)
--- a/audiod.h
+++ b/audiod.h
@@ -70,7 +70,7 @@ extern struct audiod_args_info conf;
 extern int audiod_status;
 
 void __noreturn clean_exit(int status, const char *msg);
-int handle_connect(int accept_fd);
+int handle_connect(int accept_fd, fd_set *rfds);
 void audiod_status_dump(void);
 char *get_time_string(int slot_num);
 struct btr_node *audiod_get_btr_root(void);
index 8024ec5..ce1aff6 100644 (file)
@@ -421,31 +421,32 @@ static int check_perms(uid_t uid)
 }
 
 /**
- * handle arriving connections on the local socket
+ * Handle arriving connections on the local socket.
  *
- * \param accept_fd the fd to call accept() on
+ * \param accept_fd The fd to accept connections on.
+ * \param rfds If \a accept_fd is not set in \a rfds, do nothing.
  *
- * This is called whenever para_audiod's main task detects an incoming
- * connection by the readability of \a accept_fd. This function reads the
- * command sent by the peer, checks the connecting user's permissions by using
- * unix socket credentials (if supported by the OS) and calls the corresponding
- * command handler if permissions are OK.
+ * This is called in each iteration of the select loop. If there is an incoming
+ * connection on \a accept_fd, this function reads the command sent by the peer,
+ * checks the connecting user's permissions by using unix socket credentials
+ * (if supported by the OS) and calls the corresponding command handler if
+ * permissions are OK.
  *
- * \return positive on success, negative on errors
+ * \return Positive on success, negative on errors, zero if there was no
+ * connection to accept.
  *
  * \sa para_accept(), recv_cred_buffer()
  * */
-int handle_connect(int accept_fd)
+int handle_connect(int accept_fd, fd_set *rfds)
 {
-       int i, argc, ret, clifd = -1;
+       int i, argc, ret, clifd;
        char buf[MAXLINE], **argv = NULL;
        struct sockaddr_un unix_addr;
        uid_t uid;
 
-       ret = para_accept(accept_fd, &unix_addr, sizeof(struct sockaddr_un));
-       if (ret < 0)
-               goto out;
-       clifd = ret;
+       ret = para_accept(accept_fd, rfds, &unix_addr, sizeof(struct sockaddr_un), &clifd);
+       if (ret <= 0)
+               return ret;
        ret = recv_cred_buffer(clifd, buf, sizeof(buf) - 1);
        if (ret < 0)
                goto out;
index f34f81b..593cb2c 100644 (file)
@@ -107,17 +107,29 @@ static void client_pre_select(struct sched *s, struct task *t)
        }
 }
 
-static ssize_t client_recv_buffer(struct client_task *ct, char *buf, size_t len)
+static int client_recv_buffer(struct client_task *ct, fd_set *rfds,
+               char *buf, size_t sz, size_t *n)
 {
-       ssize_t ret;
+       int ret;
 
        if (ct->status < CL_SENT_CH_RESPONSE)
-               ret = recv_buffer(ct->rc4c.fd, buf, len);
-       else
-               ret = rc4_recv_buffer(&ct->rc4c, buf, len);
+               return read_nonblock(ct->rc4c.fd, buf, sz, rfds, n);
+
+       *n = 0;
+       ret = rc4_recv_buffer(&ct->rc4c, buf, sz);
+       /*
+        * rc4_recv_buffer is used with blocking fds elsewhere, so it
+        * does not use the nonblock-API. Therefore we need to
+        * check for EOF and EAGAIN.
+        */
        if (ret == 0)
                return -E_SERVER_EOF;
-       return ret;
+       if (ret == -ERRNO_TO_PARA_ERROR(EAGAIN))
+               return 0;
+       if (ret < 0)
+               return ret;
+       *n = ret;
+       return 0;
 }
 
 /**
@@ -138,6 +150,7 @@ static void client_post_select(struct sched *s, struct task *t)
        struct client_task *ct = container_of(t, struct client_task, task);
        struct btr_node *btrn = ct->btrn;
        int ret = 0;
+       size_t n;
        char buf[CLIENT_BUFSIZE];
 
        t->error = 0;
@@ -145,11 +158,9 @@ static void client_post_select(struct sched *s, struct task *t)
                return;
        switch (ct->status) {
        case CL_CONNECTED: /* receive welcome message */
-               if (!FD_ISSET(ct->rc4c.fd, &s->rfds))
-                       return;
-               ret = client_recv_buffer(ct, buf, sizeof(buf));
-               if (ret < 0)
-                       goto err;
+               ret = client_recv_buffer(ct, &s->rfds, buf, sizeof(buf), &n);
+               if (ret < 0 || n == 0)
+                       goto out;
                ct->status = CL_RECEIVED_WELCOME;
                return;
        case CL_RECEIVED_WELCOME: /* send auth command */
@@ -173,14 +184,12 @@ static void client_post_select(struct sched *s, struct task *t)
                /* the SHA1 of the decrypted challenge */
                unsigned char challenge_sha1[HASH_SIZE];
 
-               if (!FD_ISSET(ct->rc4c.fd, &s->rfds))
-                       return;
-               ret = client_recv_buffer(ct, buf, sizeof(buf));
-               if (ret < 0)
-                       goto err;
-               PARA_INFO_LOG("<-- [challenge] (%d bytes)\n", ret);
+               ret = client_recv_buffer(ct, &s->rfds, buf, sizeof(buf), &n);
+               if (ret < 0 || n == 0)
+                       goto out;
+               PARA_INFO_LOG("<-- [challenge] (%zu bytes)\n", n);
                ret = para_decrypt_buffer(ct->key_file, crypt_buf,
-                       (unsigned char *)buf, ret);
+                       (unsigned char *)buf, n);
                if (ret < 0)
                        goto err;
                sha1_hash((char *)crypt_buf, CHALLENGE_SIZE, challenge_sha1);
@@ -199,19 +208,15 @@ static void client_post_select(struct sched *s, struct task *t)
                }
        case CL_SENT_CH_RESPONSE: /* read server response */
                {
-               size_t bytes_received;
-               if (!FD_ISSET(ct->rc4c.fd, &s->rfds))
-                       return;
-               ret = client_recv_buffer(ct, buf, sizeof(buf));
-               if (ret < 0)
-                       goto err;
-               bytes_received = ret;
+               ret = client_recv_buffer(ct, &s->rfds, buf, sizeof(buf), &n);
+               if (ret < 0 || n == 0)
+                       goto out;
                /* check if server has sent "Proceed" message */
                ret = -E_CLIENT_AUTH;
-               if (bytes_received < PROCEED_MSG_LEN)
-                       goto err;
+               if (n < PROCEED_MSG_LEN)
+                       goto out;
                if (!strstr(buf, PROCEED_MSG))
-                       goto err;
+                       goto out;
                ct->status = CL_RECEIVED_PROCEED;
                return;
                }
@@ -239,23 +244,20 @@ static void client_post_select(struct sched *s, struct task *t)
        case CL_SENT_COMMAND:
                {
                char *buf2;
-               if (!FD_ISSET(ct->rc4c.fd, &s->rfds))
-                       return;
                /* can not use "buf" here because we need a malloced buffer */
                buf2 = para_malloc(CLIENT_BUFSIZE);
-               ret = client_recv_buffer(ct, buf2, CLIENT_BUFSIZE);
-               if (ret < 0) {
+               ret = client_recv_buffer(ct, &s->rfds, buf2, CLIENT_BUFSIZE, &n);
+               if (n > 0) {
+                       if (strstr(buf2, AWAITING_DATA_MSG)) {
+                               free(buf2);
+                               ct->status = CL_SENDING;
+                               return;
+                       }
+                       ct->status = CL_RECEIVING;
+                       btr_add_output(buf2, n, btrn);
+               } else
                        free(buf2);
-                       goto err;
-               }
-               if (strstr(buf2, AWAITING_DATA_MSG)) {
-                       free(buf2);
-                       ct->status = CL_SENDING;
-                       return;
-               }
-               ct->status = CL_RECEIVING;
-               btr_add_output(buf2, ret, btrn);
-               return;
+               goto out;
                }
        case CL_SENDING:
                {
@@ -283,20 +285,24 @@ static void client_post_select(struct sched *s, struct task *t)
                        goto err;
                if (ret == 0)
                        return;
+               /*
+                * The FD_ISSET() is not strictly necessary, but is allows us
+                * to skip the malloc below if there is nothing to read anyway.
+                */
                if (!FD_ISSET(ct->rc4c.fd, &s->rfds))
                        return;
                buf2 = para_malloc(CLIENT_BUFSIZE);
-               ret = client_recv_buffer(ct, buf2, CLIENT_BUFSIZE);
-               if (ret < 0) {
+               ret = client_recv_buffer(ct, &s->rfds, buf2, CLIENT_BUFSIZE, &n);
+               if (n > 0) {
+                       buf2 = para_realloc(buf2, n);
+                       btr_add_output(buf2, n, btrn);
+               } else
                        free(buf2);
-                       goto err;
-               }
-               buf2 = para_realloc(buf2, ret);
-               btr_add_output(buf2, ret, btrn);
-               return;
+               goto out;
                }
        }
 err:
+out:
        t->error = ret;
        if (ret < 0) {
                if (ret != -E_SERVER_EOF && ret != -E_BTR_EOF)
index f71a725..2ab9fca 100644 (file)
@@ -153,34 +153,27 @@ static void dccp_recv_post_select(struct sched *s, struct task *t)
        struct btr_node *btrn = rn->btrn;
        struct iovec iov[2];
        int ret, iovcnt;
+       size_t num_bytes;
 
        ret = btr_node_status(btrn, 0, BTR_NT_ROOT);
-       if (ret < 0)
-               goto err;
-       if (ret == 0)
-               return;
-       if (!FD_ISSET(pdd->fd, &s->rfds))
-               return; /* nothing to do */
+       if (ret <= 0)
+               goto out;
        iovcnt = btr_pool_get_buffers(pdd->btrp, iov);
        ret = -E_DCCP_OVERRUN;
        if (iovcnt == 0)
-               goto err;
-       ret = para_readv(pdd->fd, iov, iovcnt);
-       /* EAGAIN is possible even if FD_ISSET */
-       if (ret < 0 && is_errno(-ret, EAGAIN))
-               return;
-       if (ret == 0)
-               ret = -E_RECV_EOF;
-       if (ret < 0)
-               goto err;
-       if (ret <= iov[0].iov_len) /* only the first buffer was filled */
-               btr_add_output_pool(pdd->btrp, ret, btrn);
+               goto out;
+       ret = readv_nonblock(pdd->fd, iov, iovcnt, &s->rfds, &num_bytes);
+       if (num_bytes == 0)
+               goto out;
+       if (num_bytes <= iov[0].iov_len) /* only the first buffer was filled */
+               btr_add_output_pool(pdd->btrp, num_bytes, btrn);
        else { /* both buffers contain data */
                btr_add_output_pool(pdd->btrp, iov[0].iov_len, btrn);
-               btr_add_output_pool(pdd->btrp, ret - iov[0].iov_len, btrn);
+               btr_add_output_pool(pdd->btrp, num_bytes - iov[0].iov_len, btrn);
        }
-       return;
-err:
+out:
+       if (ret >= 0)
+               return;
        btr_remove_node(rn->btrn);
        t->error = ret;
 }
index fb2eafc..6248ae8 100644 (file)
@@ -70,9 +70,7 @@ static void dccp_post_select(fd_set *rfds, __a_unused fd_set *wfds)
        struct sender_client *sc;
        int tx_ccid;
 
-       if (dss->listen_fd < 0 || !FD_ISSET(dss->listen_fd, rfds))
-               return;
-       sc = accept_sender_client(dss);
+       sc = accept_sender_client(dss, rfds);
        if (!sc)
                return;
 
diff --git a/error.h b/error.h
index d92c9d6..3190306 100644 (file)
--- a/error.h
+++ b/error.h
@@ -228,7 +228,6 @@ extern const char **para_errlist[];
        PARA_ERROR(SENDMSG, "sendmsg() failed"), \
        PARA_ERROR(RECVMSG, "recvmsg() failed"), \
        PARA_ERROR(SCM_CREDENTIALS, "did not receive SCM credentials"), \
-       PARA_ERROR(RECV_PATTERN, "did not receive expected pattern"), \
 
 
 #define UDP_RECV_ERRORS \
@@ -374,6 +373,8 @@ extern const char **para_errlist[];
 
 #define FD_ERRORS \
        PARA_ERROR(FGETS, "fgets error"), \
+       PARA_ERROR(EOF, "end of file"), \
+       PARA_ERROR(READ_PATTERN, "did not read expected pattern"), \
 
 
 #define WRITE_ERRORS \
diff --git a/fd.c b/fd.c
index 46be228..7336bd5 100644 (file)
--- a/fd.c
+++ b/fd.c
 #include <dirent.h>
 #include <sys/mman.h>
 #include <fcntl.h>
-#include <sys/select.h>
 #include <sys/uio.h>
 
 #include "para.h"
 #include "error.h"
 #include "string.h"
+#include "fd.h"
 
 /**
  * Write a buffer to a file descriptor, re-write on short writes.
@@ -83,23 +83,143 @@ int write_nonblock(int fd, const char *buf, size_t len,
 }
 
 /**
- * Simple wrapper for readv().
+ * Read from a non-blocking file descriptor into multiple buffers.
  *
  * \param fd The file descriptor to read from.
  * \param iov Scatter/gather array used in readv().
  * \param iovcnt Number of elements in \a iov.
+ * \param rfds An optional fd set pointer.
+ * \param num_bytes Result pointer. Contains the number of bytes read from \a fd.
+ *
+ * If \a rfds is not \p NULL and the (non-blocking) file descriptor \a fd is
+ * not set in \a rfds, this function returns early without doing anything.
+ * Otherwise The function tries to read up to \a sz bytes from \a fd. As for
+ * write_nonblock(), EAGAIN is not considered an error condition. However, EOF
+ * is.
+ *
+ * \return Zero or a negative error code. If the underlying call to readv(2)
+ * returned zero (indicating an end of file condition) or failed for some
+ * reason other than \p EAGAIN, a negative return value is returned.
+ *
+ * In any case, \a num_bytes contains the number of bytes that have been
+ * successfully read from \a fd (zero if the first readv() call failed with
+ * EAGAIN). Note that even if the function returns negative, some data might
+ * have been read before the error occured. In this case \a num_bytes is
+ * positive.
+ *
+ * \sa \ref write_nonblock(), read(2), readv(2).
+ */
+int readv_nonblock(int fd, struct iovec *iov, int iovcnt, fd_set *rfds,
+               size_t *num_bytes)
+{
+       int ret, i, j;
+
+       *num_bytes = 0;
+       /*
+        * Avoid a shortcoming of select(): Reads from a non-blocking fd might
+        * return EAGAIN even if FD_ISSET() returns true. However, FD_ISSET()
+        * returning false definitely means that no data can currently be read.
+        * This is the common case, so it is worth to avoid the overhead of the
+        * read() system call in this case.
+        */
+       if (rfds && !FD_ISSET(fd, rfds))
+               return 0;
+
+       for (i = 0, j = 0; i < iovcnt;) {
+
+               /* fix up the first iov */
+               assert(j < iov[i].iov_len);
+               iov[i].iov_base += j;
+               iov[i].iov_len -= j;
+               ret = readv(fd, iov + i, iovcnt - i);
+               iov[i].iov_base -= j;
+               iov[i].iov_len += j;
+
+               if (ret == 0)
+                       return -E_EOF;
+               if (ret < 0) {
+                       if (errno == EAGAIN)
+                               return 0;
+                       return -ERRNO_TO_PARA_ERROR(errno);
+               }
+               *num_bytes += ret;
+               while (ret > 0) {
+                       if (ret < iov[i].iov_len - j) {
+                               j += ret;
+                               break;
+                       }
+                       ret -= iov[i].iov_len - j;
+                       j = 0;
+                       if (++i >= iovcnt)
+                               break;
+               }
+       }
+       return 0;
+}
+
+/**
+ * Read from a non-blocking file descriptor into a single buffer.
+ *
+ * \param fd The file descriptor to read from.
+ * \param buf The buffer to read data to.
+ * \param sz The size of \a buf.
+ * \param rfds \see \ref readv_nonblock().
+ * \param num_bytes \see \ref readv_nonblock().
+ *
+ * This is a simple wrapper for readv_nonblock() which uses an iovec with a single
+ * buffer.
+ *
+ * \return The return value of the underlying call to readv_nonblock().
+ */
+int read_nonblock(int fd, void *buf, size_t sz, fd_set *rfds, size_t *num_bytes)
+{
+       struct iovec iov = {.iov_base = buf, .iov_len = sz};
+       return readv_nonblock(fd, &iov, 1, rfds, num_bytes);
+}
+
+/**
+ * Read a buffer and check its content for a pattern.
+ *
+ * \param fd The file descriptor to receive from.
+ * \param pattern The expected pattern.
+ * \param bufsize The size of the internal buffer.
+ * \param rfds Passed to read_nonblock().
  *
- * \return A negative error code on errors, the return value of the underlying
- * call to readv() otherwise.
+ * This function tries to read at most \a bufsize bytes from the non-blocking
+ * file descriptor \a fd. If at least \p strlen(\a pattern) bytes have been
+ * received, the beginning of the received buffer is compared with \a pattern,
+ * ignoring case.
  *
- * \sa readv(2).
+ * \return Positive if \a pattern was received, negative on errors, zero if no data
+ * was available to read.
+ *
+ * \sa \ref read_nonblock(), \sa strncasecmp(3).
  */
-int para_readv(int fd, struct iovec *iov, int iovcnt)
+int read_pattern(int fd, const char *pattern, size_t bufsize, fd_set *rfds)
 {
-       int ret = readv(fd, iov, iovcnt);
+       size_t n, len;
+       char *buf = para_malloc(bufsize + 1);
+       int ret = read_nonblock(fd, buf, bufsize, rfds, &n);
 
+       buf[n] = '\0';
        if (ret < 0)
-               return -ERRNO_TO_PARA_ERROR(errno);
+               goto out;
+       ret = 0;
+       if (n == 0)
+               goto out;
+       ret = -E_READ_PATTERN;
+       len = strlen(pattern);
+       if (n < len)
+               goto out;
+       if (strncasecmp(buf, pattern, len) != 0)
+               goto out;
+       ret = 1;
+out:
+       if (ret < 0) {
+               PARA_NOTICE_LOG("%s\n", para_strerror(-ret));
+               PARA_NOTICE_LOG("recvd %zu bytes: %s\n", n, buf);
+       }
+       free(buf);
        return ret;
 }
 
diff --git a/fd.h b/fd.h
index 6809230..c21a7d1 100644 (file)
--- a/fd.h
+++ b/fd.h
@@ -7,7 +7,6 @@
 /** \file fd.h exported symbols from fd.c */
 
 int write_all(int fd, const char *buf, size_t *len);
-int para_readv(int fd, struct iovec *iov, int iovcnt);
 int file_exists(const char *);
 int para_select(int n, fd_set *readfds, fd_set *writefds,
                struct timeval *timeout_tv);
@@ -27,6 +26,10 @@ int mmap_full_file(const char *filename, int open_mode, void **map,
 int para_munmap(void *start, size_t length);
 int write_ok(int fd);
 void valid_fd_012(void);
+int readv_nonblock(int fd, struct iovec *iov, int iovcnt, fd_set *rfds,
+               size_t *num_bytes);
+int read_nonblock(int fd, void *buf, size_t sz, fd_set *rfds, size_t *num_bytes);
+int read_pattern(int fd, const char *pattern, size_t bufsize, fd_set *rfds);
 int write_nonblock(int fd, const char *buf, size_t len,
                size_t max_bytes_per_write);
 int for_each_file_in_dir(const char *dirname,
diff --git a/gui.c b/gui.c
index 64fab61..fa1538b 100644 (file)
--- a/gui.c
+++ b/gui.c
@@ -54,7 +54,7 @@ static int cmd_died, curses_active;
 static pid_t cmd_pid;
 
 static int command_pipe = -1;
-static int audiod_pipe = -1;
+static int stat_pipe = -1;
 static struct gui_args_info conf;
 
 enum {GETCH_MODE, COMMAND_MODE, EXTERNAL_MODE};
@@ -191,20 +191,6 @@ static struct gui_command command_list[] = {
        }
 };
 
-static int para_open_audiod_pipe(char *cmd)
-{
-       int fds[3] = {0, 1, 0};
-       pid_t pid;
-       int ret = para_exec_cmdline_pid(&pid, cmd, fds);
-       if (ret < 0)
-               return ret;
-       ret = mark_fd_nonblocking(fds[1]);
-       if (ret > 0)
-               return fds[1];
-       close(fds[1]);
-       return ret;
-}
-
 static int find_cmd_byname(char *name)
 {
        int i;
@@ -708,12 +694,15 @@ print:
        return 1;
 }
 
-static int read_audiod_pipe(int fd)
+static int read_stat_pipe(fd_set *rfds)
 {
        static char *buf;
        static int bufsize, loaded;
-       int ret;
+       int ret, ret2;
+       size_t sz;
 
+       if (stat_pipe < 0)
+               return 0;
        if (loaded >= bufsize) {
                if (bufsize > 1000 * 1000) {
                        loaded = 0;
@@ -723,16 +712,18 @@ static int read_audiod_pipe(int fd)
                buf = para_realloc(buf, bufsize);
        }
        assert(loaded < bufsize);
-       ret = read(fd, buf + loaded, bufsize - loaded);
-       if (ret <= 0)
-               return ret;
-       loaded += ret;
-       ret = for_each_stat_item(buf, loaded, update_item);
-       if (ret < 0)
-               return ret;
-       if (ret > 0 && ret < loaded)
-               memmove(buf, buf + loaded - ret, ret);
-       loaded = ret;
+       ret = read_nonblock(stat_pipe, buf + loaded, bufsize - loaded,
+               rfds, &sz);
+       loaded += sz;
+       ret2 = for_each_stat_item(buf, loaded, update_item);
+       if (ret < 0 || ret2 < 0) {
+               loaded = 0;
+               return ret2 < 0? ret2 : ret;
+       }
+       sz = ret2; /* what is left */
+       if (sz > 0 && sz < loaded)
+               memmove(buf, buf + loaded - sz, sz);
+       loaded = sz;
        return 1;
 }
 
@@ -902,15 +893,24 @@ static void handle_signal(int sig)
        }
 }
 
-static int open_audiod_pipe(void)
+static int open_stat_pipe(void)
 {
        static int init = 1;
+       int ret, fds[3] = {0, 1, 0};
+       pid_t pid;
 
        if (init)
                init = 0;
        else
                sleep(1);
-       return para_open_audiod_pipe(conf.stat_cmd_arg);
+       ret = para_exec_cmdline_pid(&pid, conf.stat_cmd_arg, fds);
+       if (ret < 0)
+               return ret;
+       ret = mark_fd_nonblocking(fds[1]);
+       if (ret >= 0)
+               return fds[1];
+       close(fds[1]);
+       return ret;
 }
 
 /*
@@ -931,7 +931,7 @@ static int do_select(int mode)
 {
        fd_set rfds;
        int ret;
-       int max_fileno, cp_numread = 1;
+       int max_fileno;
        char command_buf[4096] = "";
        int cbo = 0; /* command buf offset */
        struct timeval tv;
@@ -941,11 +941,10 @@ repeat:
 //     ret = refresh_status();
        FD_ZERO(&rfds);
        max_fileno = 0;
-       /* audiod pipe */
-       if (audiod_pipe < 0)
-               audiod_pipe = open_audiod_pipe();
-       if (audiod_pipe >= 0)
-               para_fd_set(audiod_pipe, &rfds, &max_fileno);
+       if (stat_pipe < 0)
+               stat_pipe = open_stat_pipe();
+       if (stat_pipe >= 0)
+               para_fd_set(stat_pipe, &rfds, &max_fileno);
        /* signal pipe */
        para_fd_set(signal_pipe, &rfds, &max_fileno);
        /* command pipe only for COMMAND_MODE */
@@ -955,46 +954,41 @@ repeat:
        if (ret <= 0)
                goto check_return; /* skip fd checks */
        /* signals */
-       if (FD_ISSET(signal_pipe, &rfds)) {
-               int sig_nr = para_next_signal();
-               if (sig_nr > 0)
-                       handle_signal(sig_nr);
-       }
+       ret = para_next_signal(&rfds);
+       if (ret > 0)
+               handle_signal(ret);
        /* read command pipe if ready */
-       if (command_pipe >= 0 && mode == COMMAND_MODE &&
-                       FD_ISSET(command_pipe, &rfds)) {
-               cp_numread = read(command_pipe, command_buf + cbo,
-                       sizeof(command_buf) - 1 - cbo);
-               if (cp_numread >= 0)
-                       cbo += cp_numread;
-               else {
-                       if (cp_numread < 0)
-                               PARA_ERROR_LOG("read error (%d)", cp_numread);
+       if (command_pipe >= 0 && mode == COMMAND_MODE) {
+               size_t sz;
+               ret = read_nonblock(command_pipe, command_buf + cbo,
+                       sizeof(command_buf) - 1 - cbo, &rfds, &sz);
+               cbo += sz;
+               sz = cbo;
+               cbo = for_each_line(command_buf, cbo, &add_output_line, NULL);
+               if (sz != cbo)
+                       wrefresh(bot.win);
+               if (ret < 0) {
+                       PARA_NOTICE_LOG("closing command pipe: %s",
+                               para_strerror(-ret));
                        close(command_pipe);
                        command_pipe = -1;
+                       return 0;
                }
        }
-       if (audiod_pipe >= 0 && FD_ISSET(audiod_pipe, &rfds))
-               if (read_audiod_pipe(audiod_pipe) <= 0) {
-                       close(audiod_pipe);
-                       audiod_pipe = -1;
-                       clear_all_items();
-                       free(stat_content[SI_BASENAME]);
-                       stat_content[SI_BASENAME] =
-                               para_strdup("audiod not running!?");
-                       print_all_items();
-               }
+       ret = read_stat_pipe(&rfds);
+       if (ret < 0) {
+               PARA_NOTICE_LOG("closing stat pipe: %s\n", para_strerror(-ret));
+               close(stat_pipe);
+               stat_pipe = -1;
+               clear_all_items();
+               free(stat_content[SI_BASENAME]);
+               stat_content[SI_BASENAME] =
+                       para_strdup("stat command terminated!?");
+               print_all_items();
+       }
 check_return:
        switch (mode) {
        case COMMAND_MODE:
-               if (cp_numread <= 0 && !cbo) /* command complete */
-                       return 0;
-               if (cbo)
-                       cbo = for_each_line(command_buf, cbo,
-                               &add_output_line, NULL);
-               if (cp_numread <= 0)
-                       cbo = 0;
-               wrefresh(bot.win);
                ret = wgetch(top.win);
                if (ret != ERR && ret != KEY_RESIZE) {
                        if (command_pipe) {
index cc376dd..9ade8df 100644 (file)
@@ -95,7 +95,7 @@ static void http_recv_post_select(struct sched *s, struct task *t)
        struct btr_node *btrn = rn->btrn;
        int ret;
        char *buf;
-       size_t sz;
+       size_t sz, n;
 
        t->error = 0;
        ret = btr_node_status(btrn, 0, BTR_NT_ROOT);
@@ -116,12 +116,12 @@ static void http_recv_post_select(struct sched *s, struct task *t)
                phd->status = HTTP_SENT_GET_REQUEST;
                return;
        }
-       if (!FD_ISSET(phd->fd, &s->rfds))
-               return;
        if (phd->status == HTTP_SENT_GET_REQUEST) {
-               ret = recv_pattern(phd->fd, HTTP_OK_MSG, strlen(HTTP_OK_MSG));
+               ret = read_pattern(phd->fd, HTTP_OK_MSG, strlen(HTTP_OK_MSG), &s->rfds);
                if (ret < 0)
                        goto err;
+               if (ret == 0)
+                       return;
                PARA_INFO_LOG("received ok msg, streaming\n");
                phd->status = HTTP_STREAMING;
                return;
@@ -130,13 +130,11 @@ static void http_recv_post_select(struct sched *s, struct task *t)
        sz = btr_pool_get_buffer(phd->btrp, &buf);
        if (sz == 0)
                goto err;
-       ret = recv_bin_buffer(phd->fd, buf, sz);
-       if (ret == 0)
-               ret = -E_RECV_EOF;
-       if (ret < 0)
-               goto err;
-       btr_add_output_pool(phd->btrp, ret, btrn);
-       return;
+       ret = read_nonblock(phd->fd, buf, sz, &s->rfds, &n);
+       if (n > 0)
+               btr_add_output_pool(phd->btrp, n, btrn);
+       if (ret >= 0)
+               return;
 err:
        btr_remove_node(rn->btrn);
        t->error = ret;
index 828d99e..424d63b 100644 (file)
@@ -96,23 +96,20 @@ static void http_post_select(fd_set *rfds, __a_unused fd_set *wfds)
 {
        struct sender_client *sc, *tmp;
        struct private_http_sender_data *phsd;
+       int ret;
 
-       if (hss->listen_fd < 0)
-               return;
        list_for_each_entry_safe(sc, tmp, &hss->client_list, node) {
                phsd = sc->private_data;
                switch (phsd->status) {
                case HTTP_STREAMING: /* nothing to do */
                        break;
                case HTTP_CONNECTED: /* need to recv get request */
-                       if (FD_ISSET(sc->fd, rfds)) {
-                               if (recv_pattern(sc->fd, HTTP_GET_MSG, MAXLINE)
-                                               < 0) {
-                                       phsd->status = HTTP_INVALID_GET_REQUEST;
-                               } else {
-                                       phsd->status = HTTP_GOT_GET_REQUEST;
-                                       PARA_INFO_LOG("received get request\n");
-                               }
+                       ret = read_pattern(sc->fd, HTTP_GET_MSG, MAXLINE, rfds);
+                       if (ret < 0)
+                               phsd->status = HTTP_INVALID_GET_REQUEST;
+                       else if (ret > 0) {
+                               phsd->status = HTTP_GOT_GET_REQUEST;
+                               PARA_INFO_LOG("received get request\n");
                        }
                        break;
                case HTTP_GOT_GET_REQUEST: /* need to send ok msg */
@@ -125,9 +122,7 @@ static void http_post_select(fd_set *rfds, __a_unused fd_set *wfds)
                        break;
                }
        }
-       if (!FD_ISSET(hss->listen_fd, rfds))
-               return;
-       sc = accept_sender_client(hss);
+       sc = accept_sender_client(hss, rfds);
        if (!sc)
                return;
        phsd = para_malloc(sizeof(*phsd));
diff --git a/net.c b/net.c
index 92ae217..59b7f36 100644 (file)
--- a/net.c
+++ b/net.c
@@ -756,23 +756,36 @@ int recv_buffer(int fd, char *buf, size_t size)
  * Wrapper around the accept system call.
  *
  * \param fd The listening socket.
+ * \param rfds An optional fd_set pointer.
  * \param addr Structure which is filled in with the address of the peer socket.
  * \param size Should contain the size of the structure pointed to by \a addr.
+ * \param new_fd Result pointer.
  *
- * Accept incoming connections on \a addr. Retry if interrupted.
+ * Accept incoming connections on \a addr, retry if interrupted. If \a rfds is
+ * not \p NULL, return 0 if \a fd is not set in \a rfds without calling accept().
  *
- * \return The new file descriptor on success, negative on errors.
+ * \return Negative on errors, zero if no connections are present to be accepted,
+ * one otherwise.
  *
  * \sa accept(2).
  */
-int para_accept(int fd, void *addr, socklen_t size)
+int para_accept(int fd, fd_set *rfds, void *addr, socklen_t size, int *new_fd)
 {
-       int new_fd;
+       int ret;
 
+       if (rfds && !FD_ISSET(fd, rfds))
+               return 0;
        do
-               new_fd = accept(fd, (struct sockaddr *) addr, &size);
-       while (new_fd < 0 && errno == EINTR);
-       return new_fd < 0? -ERRNO_TO_PARA_ERROR(errno) : new_fd;
+               ret = accept(fd, (struct sockaddr *) addr, &size);
+       while (ret < 0 && errno == EINTR);
+
+       if (ret >= 0) {
+               *new_fd = ret;
+               return 1;
+       }
+       if (errno == EAGAIN || errno == EWOULDBLOCK)
+               return 0;
+       return -ERRNO_TO_PARA_ERROR(errno);
 }
 
 /**
@@ -1017,41 +1030,3 @@ int recv_cred_buffer(int fd, char *buf, size_t size)
        return result;
 }
 #endif /* HAVE_UCRED */
-
-/**
- * Receive a buffer and check for a pattern.
- *
- * \param fd The file descriptor to receive from.
- * \param pattern The expected pattern.
- * \param bufsize The size of the internal buffer.
- *
- * \return Positive if \a pattern was received, negative otherwise.
- *
- * This function tries to receive at most \a bufsize bytes from file descriptor
- * \a fd. If at least \p strlen(\a pattern) bytes were received, the beginning
- * of the received buffer is compared with \a pattern, ignoring case.
- *
- * \sa recv_buffer(), \sa strncasecmp(3).
- */
-int recv_pattern(int fd, const char *pattern, size_t bufsize)
-{
-       size_t len = strlen(pattern);
-       char *buf = para_malloc(bufsize + 1);
-       int ret = -E_RECV_PATTERN, n = recv_buffer(fd, buf, bufsize + 1);
-
-       if (n < len)
-               goto out;
-       if (strncasecmp(buf, pattern, len))
-               goto out;
-       ret = 1;
-out:
-       if (ret < 0) {
-               PARA_NOTICE_LOG("did not receive pattern '%s'\n", pattern);
-               if (n > 0)
-                       PARA_NOTICE_LOG("recvd %d bytes: %s\n", n, buf);
-               else if (n < 0)
-                       PARA_NOTICE_LOG("%s\n", para_strerror(-n));
-       }
-       free(buf);
-       return ret;
-}
diff --git a/net.h b/net.h
index 93c0c5e..457c24d 100644 (file)
--- a/net.h
+++ b/net.h
@@ -139,13 +139,12 @@ __printf_2_3 int send_va_buffer(int fd, const char *fmt, ...);
 int recv_bin_buffer(int fd, char *buf, size_t size);
 int recv_buffer(int fd, char *buf, size_t size);
 
-int para_accept(int, void *addr, socklen_t size);
+int para_accept(int fd, fd_set *rfds, void *addr, socklen_t size, int *new_fd);
 int create_local_socket(const char *name, struct sockaddr_un *unix_addr,
        mode_t mode);
 int create_remote_socket(const char *name);
 int recv_cred_buffer(int, char *, size_t);
 ssize_t send_cred_buffer(int, char*);
-int recv_pattern(int fd, const char *pattern, size_t bufsize);
 
 /**
  * Functions and definitions to support \p IPPROTO_DCCP
diff --git a/send.h b/send.h
index 85e5ed1..acf62db 100644 (file)
--- a/send.h
+++ b/send.h
@@ -140,7 +140,7 @@ void generic_com_deny(struct sender_command_data *scd,
 int generic_com_on(struct sender_status *ss, unsigned protocol);
 void generic_com_off(struct sender_status *ss);
 char *generic_sender_help(void);
-struct sender_client *accept_sender_client(struct sender_status *ss);
+struct sender_client *accept_sender_client(struct sender_status *ss, fd_set *rfds);
 int send_queued_chunks(int fd, struct chunk_queue *cq,
                size_t max_bytes_per_write);
 int parse_fec_url(const char *arg, struct sender_command_data *scd);
index f931fda..b44c813 100644 (file)
@@ -348,15 +348,18 @@ void generic_com_off(struct sender_status *ss)
  * \sa \ref para_accept(), \ref mark_fd_nonblocking(), \ref acl_check_access(),
  * \ref cq_new(), \ref add_close_on_fork_list().
  */
-struct sender_client *accept_sender_client(struct sender_status *ss)
+struct sender_client *accept_sender_client(struct sender_status *ss, fd_set *rfds)
 {
        struct sender_client *sc;
-       int fd, ret = para_accept(ss->listen_fd, NULL, 0);
-       if (ret < 0) {
+       int fd, ret;
+
+       if (ss->listen_fd < 0)
+               return NULL;
+       ret = para_accept(ss->listen_fd, rfds, NULL, 0, &fd);
+       if (ret < 0)
                PARA_ERROR_LOG("%s\n", para_strerror(-ret));
+       if (ret <= 0)
                return NULL;
-       }
-       fd = ret;
        ret = -E_MAX_CLIENTS;
        if (ss->max_clients > 0 && ss->num_clients >= ss->max_clients)
                goto err_out;
index 89a8137..bc14303 100644 (file)
--- a/server.c
+++ b/server.c
@@ -276,15 +276,13 @@ static void handle_sighup(void)
                kill(mmd->afs_pid, SIGHUP);
 }
 
-static void signal_post_select(struct sched *s, struct task *t)
+static void signal_post_select(struct sched *s, __a_unused struct task *t)
 {
-       struct signal_task *st = container_of(t, struct signal_task, task);
+       int signum = para_next_signal(&s->rfds);
 
-       if (!FD_ISSET(st->fd, &s->rfds))
+       switch (signum) {
+       case 0:
                return;
-
-       st->signum = para_next_signal();
-       switch (st->signum) {
        case SIGHUP:
                handle_sighup();
                break;
@@ -304,7 +302,7 @@ static void signal_post_select(struct sched *s, struct task *t)
        /* die on sigint/sigterm. Kill all children too. */
        case SIGINT:
        case SIGTERM:
-               PARA_EMERG_LOG("terminating on signal %d\n", st->signum);
+               PARA_EMERG_LOG("terminating on signal %d\n", signum);
                kill(0, SIGTERM);
                /*
                 * We must wait for afs because afs catches SIGINT/SIGTERM.
@@ -366,12 +364,9 @@ static void command_post_select(struct sched *s, struct task *t)
        pid_t child_pid;
        uint32_t *chunk_table;
 
-       if (!FD_ISSET(sct->listen_fd, &s->rfds))
-               return;
-       ret = para_accept(sct->listen_fd, NULL, 0);
-       if (ret < 0)
+       ret = para_accept(sct->listen_fd, &s->rfds, NULL, 0, &new_fd);
+       if (ret <= 0)
                goto out;
-       new_fd = ret;
        peer_name = remote_name(new_fd);
        PARA_INFO_LOG("got connection from %s, forking\n", peer_name);
        mmd->num_connects++;
index bded532..0b4b6f0 100644 (file)
--- a/signal.c
+++ b/signal.c
@@ -151,27 +151,24 @@ void para_install_sighandler(int sig)
 /**
  * Return the number of the next pending signal.
  *
- * This should be called if the fd for the signal pipe is ready for reading.
+ * \param rfds Th fd_set containing the signal pipe.
  *
- * \return On success, the number of the received signal is returned.  If the
- * read returned zero or was interrupted by another signal the function returns
- * 0.  Otherwise, a negative error value is returned.
+ * \return On success, the number of the received signal is returned. If there
+ * is no signal currently pending, the function returns zero. On read errors
+ * from the signal pipe, the process is terminated.
  */
-int para_next_signal(void)
+int para_next_signal(fd_set *rfds)
 {
-       int s;
-       ssize_t r = read(signal_pipe[0], &s, sizeof(s));
+       size_t n;
+       int s, ret = read_nonblock(signal_pipe[0], &s, sizeof(s), rfds, &n);
 
-       if (!r) {
-               PARA_CRIT_LOG("read from signal pipe returned zero\n");
-               return 0;
-       }
-       if (r < 0) {
-               if (errno == EAGAIN || errno == EINTR)
-                       return 0;
-               return -ERRNO_TO_PARA_ERROR(errno);
+       if (ret < 0) {
+               PARA_EMERG_LOG("%s\n", para_strerror(-ret));
+               exit(EXIT_FAILURE);
        }
-       assert(r == sizeof(s));
+       if (n == 0)
+               return 0;
+       assert(n == sizeof(s));
        PARA_DEBUG_LOG("next signal: %d\n", s);
        return s;
 }
index 7be960e..1dbfc98 100644 (file)
--- a/signal.h
+++ b/signal.h
@@ -12,8 +12,6 @@
 struct signal_task {
        /** The signal pipe. */
        int fd;
-       /** The number of the most recent signal. */
-       int signum;
        /** The associated task structure. */
        struct task task;
 };
@@ -22,5 +20,5 @@ int para_signal_init(void);
 void para_sigaction(int sig, void (*handler)(int));
 void para_install_sighandler(int);
 int para_reap_child(pid_t *pid);
-int para_next_signal(void);
+int para_next_signal(fd_set *rfds);
 void para_signal_shutdown(void);
diff --git a/stdin.c b/stdin.c
index 5fc91c9..ca5eb0e 100644 (file)
--- a/stdin.c
+++ b/stdin.c
@@ -61,7 +61,7 @@ static void stdin_post_select(struct sched *s, struct task *t)
 {
        struct stdin_task *sit = container_of(t, struct stdin_task, task);
        ssize_t ret;
-       size_t sz;
+       size_t sz, n;
        char *buf = NULL;
 
        t->error = 0;
@@ -70,8 +70,6 @@ static void stdin_post_select(struct sched *s, struct task *t)
                goto err;
        if (ret == 0)
                return;
-       if (!FD_ISSET(STDIN_FILENO, &s->rfds))
-               return;
        sz = btr_pool_get_buffer(sit->btrp, &buf);
        if (sz == 0)
                return;
@@ -81,15 +79,11 @@ static void stdin_post_select(struct sched *s, struct task *t)
         * reference can not be freed, we're stuck.
         */
        sz = PARA_MIN(sz, btr_pool_size(sit->btrp) / 2);
-       ret = read(STDIN_FILENO, buf, sz);
-       if (ret < 0)
-               ret = -ERRNO_TO_PARA_ERROR(errno);
-       if (ret == 0)
-               ret = -E_STDIN_EOF;
-       if (ret < 0)
-               goto err;
-       btr_add_output_pool(sit->btrp, ret, sit->btrn);
-       return;
+       ret = read_nonblock(STDIN_FILENO, buf, sz, &s->rfds, &n);
+       if (n > 0)
+               btr_add_output_pool(sit->btrp, n, sit->btrn);
+       if (ret >= 0)
+               return;
 err:
        btr_remove_node(sit->btrn);
        //btr_pool_free(sit->btrp);
index 00ad3e2..5520c6f 100644 (file)
@@ -67,43 +67,36 @@ static void udp_recv_post_select(__a_unused struct sched *s, struct task *t)
        struct receiver_node *rn = container_of(t, struct receiver_node, task);
        struct private_udp_recv_data *purd = rn->private_data;
        struct btr_node *btrn = rn->btrn;
-       size_t packet_size;
+       size_t num_bytes;
        struct iovec iov[2];
-       int ret, iovcnt;
+       int ret, readv_ret, iovcnt;
 
        t->error = 0;
        ret = btr_node_status(btrn, 0, BTR_NT_ROOT);
-       if (ret < 0)
-               goto err;
-       if (ret == 0)
-               return;
-       if (!FD_ISSET(purd->fd, &s->rfds))
-               return;
+       if (ret <= 0)
+               goto out;
        iovcnt = btr_pool_get_buffers(purd->btrp, iov);
        ret = -E_UDP_OVERRUN;
        if (iovcnt == 0)
-               goto err;
-       ret = para_readv(purd->fd, iov, iovcnt);
-       /* EAGAIN is possible even if FD_ISSET */
-       if (ret < 0 && is_errno(-ret, EAGAIN))
-               return;
-       if (ret == 0)
-               ret = -E_RECV_EOF;
+               goto out;
+       ret = readv_nonblock(purd->fd, iov, iovcnt, &s->rfds, &num_bytes);
+       if (num_bytes == 0)
+               goto out;
+       readv_ret = ret;
+       ret = udp_check_eof(num_bytes, iov);
        if (ret < 0)
-               goto err;
-       packet_size = ret;
-       ret = udp_check_eof(packet_size, iov);
-       if (ret < 0)
-               goto err;
-       if (iov[0].iov_len >= packet_size)
-               btr_add_output_pool(purd->btrp, packet_size, btrn);
+               goto out;
+       if (iov[0].iov_len >= num_bytes)
+               btr_add_output_pool(purd->btrp, num_bytes, btrn);
        else { /* both buffers contain data */
                btr_add_output_pool(purd->btrp, iov[0].iov_len, btrn);
-               btr_add_output_pool(purd->btrp, packet_size - iov[0].iov_len,
+               btr_add_output_pool(purd->btrp, num_bytes - iov[0].iov_len,
                        btrn);
        }
-       return;
-err:
+       ret = readv_ret;
+out:
+       if (ret >= 0)
+               return;
        btr_remove_node(btrn);
        t->error = ret;
        close(purd->fd);
diff --git a/vss.c b/vss.c
index bfb0f0a..898180c 100644 (file)
--- a/vss.c
+++ b/vss.c
@@ -781,16 +781,20 @@ static int recv_afs_msg(int afs_socket, int *fd, uint32_t *code, uint32_t *data)
        return 1;
 }
 
-static void recv_afs_result(struct vss_task *vsst)
+static void recv_afs_result(struct vss_task *vsst, fd_set *rfds)
 {
        int ret, passed_fd, shmid;
        uint32_t afs_code = 0, afs_data = 0;
        struct stat statbuf;
 
-       vsst->afsss = AFS_SOCKET_READY;
+       if (!FD_ISSET(vsst->afs_socket, rfds))
+               return;
        ret = recv_afs_msg(vsst->afs_socket, &passed_fd, &afs_code, &afs_data);
+       if (ret == -ERRNO_TO_PARA_ERROR(EAGAIN))
+               return;
        if (ret < 0)
                goto err;
+       vsst->afsss = AFS_SOCKET_READY;
        PARA_DEBUG_LOG("fd: %d, code: %u, shmid: %u\n", passed_fd, afs_code,
                afs_data);
        ret = -E_NOFD;
@@ -920,10 +924,9 @@ static void vss_post_select(struct sched *s, struct task *t)
                        senders[sender_num].client_cmds[num](&mmd->sender_cmd_data);
                mmd->sender_cmd_data.cmd_num = -1;
        }
-       if (vsst->afsss != AFS_SOCKET_CHECK_FOR_WRITE) {
-               if (FD_ISSET(vsst->afs_socket, &s->rfds))
-                       recv_afs_result(vsst);
-       } else if (FD_ISSET(vsst->afs_socket, &s->wfds)) {
+       if (vsst->afsss != AFS_SOCKET_CHECK_FOR_WRITE)
+               recv_afs_result(vsst, &s->rfds);
+       else if (FD_ISSET(vsst->afs_socket, &s->wfds)) {
                PARA_NOTICE_LOG("requesting new fd from afs\n");
                ret = send_buffer(vsst->afs_socket, "new");
                if (ret < 0)