static void afs_signal_post_select(struct sched *s, struct task *t)
{
- struct signal_task *st = container_of(t, struct signal_task, task);
+ int signum;
+
if (getppid() == 1) {
PARA_EMERG_LOG("para_server died\n");
goto shutdown;
}
- if (!FD_ISSET(st->fd, &s->rfds))
+ signum = para_next_signal(&s->rfds);
+ if (signum == 0)
return;
- st->signum = para_next_signal();
- if (st->signum == SIGHUP) {
+ if (signum == SIGHUP) {
close_afs_tables();
parse_config_or_die(1);
t->error = open_afs_tables();
init_admissible_files(current_mop);
return;
}
- PARA_EMERG_LOG("terminating on signal %d\n", st->signum);
+ PARA_EMERG_LOG("terminating on signal %d\n", signum);
shutdown:
sched_shutdown();
t->error = -E_AFS_SIGNAL;
return shm_detach(query_shm);
}
-static int execute_server_command(void)
+static int execute_server_command(fd_set *rfds)
{
char buf[8];
- int ret = recv_bin_buffer(server_socket, buf, sizeof(buf) - 1);
+ size_t n;
+ int ret = read_nonblock(server_socket, buf, sizeof(buf) - 1, rfds, &n);
- if (ret <= 0) {
- if (!ret)
- ret = -ERRNO_TO_PARA_ERROR(ECONNRESET);
- goto err;
- }
- buf[ret] = '\0';
- PARA_DEBUG_LOG("received: %s\n", buf);
- ret = -E_BAD_CMD;
+ if (ret < 0 || n == 0)
+ return ret;
+ buf[n] = '\0';
if (strcmp(buf, "new"))
- goto err;
- ret = open_next_audio_file();
-err:
- return ret;
+ return -E_BAD_CMD;
+ return open_next_audio_file();
}
-static void execute_afs_command(int fd, uint32_t expected_cookie)
+/* returns 0 if no data available, 1 else */
+static int execute_afs_command(int fd, fd_set *rfds, uint32_t expected_cookie)
{
uint32_t cookie;
int query_shmid;
char buf[sizeof(cookie) + sizeof(query_shmid)];
- int ret = recv_bin_buffer(fd, buf, sizeof(buf));
+ size_t n;
+ int ret = read_nonblock(fd, buf, sizeof(buf), rfds, &n);
if (ret < 0)
goto err;
- if (ret != sizeof(buf)) {
+ if (n == 0)
+ return 0;
+ if (n != sizeof(buf)) {
PARA_NOTICE_LOG("short read (%d bytes, expected %lu)\n",
ret, (long unsigned) sizeof(buf));
- return;
+ return 1;
}
cookie = *(uint32_t *)buf;
if (cookie != expected_cookie) {
- PARA_NOTICE_LOG("received invalid cookie(got %u, expected %u)\n",
+ PARA_NOTICE_LOG("received invalid cookie (got %u, expected %u)\n",
(unsigned)cookie, (unsigned)expected_cookie);
- return;
+ return 1;
}
query_shmid = *(int *)(buf + sizeof(cookie));
if (query_shmid < 0) {
PARA_WARNING_LOG("received invalid query shmid %d)\n",
query_shmid);
- return;
+ return 1;
}
ret = call_callback(fd, query_shmid);
if (ret >= 0)
- return;
+ return 1;
err:
PARA_NOTICE_LOG("%s\n", para_strerror(-ret));
+ return 1;
}
/** Shutdown connection if query has not arrived until this many seconds. */
struct afs_client *client, *tmp;
int fd, ret;
- if (FD_ISSET(server_socket, &s->rfds)) {
- ret = execute_server_command();
- if (ret < 0) {
- PARA_EMERG_LOG("%s\n", para_strerror(-ret));
- sched_shutdown();
- return;
- }
+ ret = execute_server_command(&s->rfds);
+ if (ret < 0) {
+ PARA_EMERG_LOG("%s\n", para_strerror(-ret));
+ sched_shutdown();
+ return;
}
-
/* Check the list of connected clients. */
list_for_each_entry_safe(client, tmp, &afs_client_list, node) {
- if (FD_ISSET(client->fd, &s->rfds))
- execute_afs_command(client->fd, ct->cookie);
- else { /* prevent bogus connection flooding */
+ ret = execute_afs_command(client->fd, &s->rfds, ct->cookie);
+ if (ret == 0) { /* prevent bogus connection flooding */
struct timeval diff;
tv_diff(now, &client->connect_time, &diff);
if (diff.tv_sec < AFS_CLIENT_TIMEOUT)
free(client);
}
/* Accept connections on the local socket. */
- if (!FD_ISSET(ct->fd, &s->rfds))
- return;
- ret = para_accept(ct->fd, &unix_addr, sizeof(unix_addr));
- if (ret < 0) {
+ ret = para_accept(ct->fd, &s->rfds, &unix_addr, sizeof(unix_addr), &fd);
+ if (ret < 0)
PARA_NOTICE_LOG("%s\n", para_strerror(-ret));
+ if (ret <= 0)
return;
- }
- fd = ret;
ret = mark_fd_nonblocking(fd);
if (ret < 0) {
PARA_NOTICE_LOG("%s\n", para_strerror(-ret));
{
int ret = -E_AUDIOC_SYNTAX, fd;
char *cf, *buf = NULL, *args;
- size_t bufsize, loaded = 0;
+ size_t bufsize;
if (audioc_cmdline_parser(argc, argv, &conf))
goto out;
args = conf.inputs_num?
concat_args(conf.inputs_num, conf.inputs) :
para_strdup("stat");
- bufsize = conf.bufsize_arg;
- buf = para_malloc(bufsize);
- if (conf.socket_given) {
+ if (conf.socket_given)
ret = create_remote_socket(conf.socket_arg);
- } else {
- char *hn = para_hostname(),
- *socket_name = make_message("/var/paraslash/audiod_socket.%s", hn);
-
+ else {
+ char *hn = para_hostname(), *socket_name = make_message(
+ "/var/paraslash/audiod_socket.%s", hn);
ret = create_remote_socket(socket_name);
free(hn);
free(socket_name);
}
- if (ret < 0)
+ if (ret < 0) {
+ PARA_EMERG_LOG("failed to create remote socket\n");
goto out;
+ }
fd = ret;
- ret = mark_fd_nonblocking(fd);
- if (ret < 0)
- goto out;
- ret = mark_fd_nonblocking(STDOUT_FILENO);
- if (ret < 0)
- goto out;
ret = send_cred_buffer(fd, args);
if (ret < 0)
goto out;
- for (;;) {
- int max_fileno = -1, check_write = 0;
- ssize_t len;
- fd_set rfd, wfd;
- FD_ZERO(&rfd);
- FD_ZERO(&wfd);
- if (loaded < bufsize)
- para_fd_set(fd, &rfd, &max_fileno);
- if (loaded > 0) {
- para_fd_set(STDOUT_FILENO, &wfd, &max_fileno);
- check_write = 1;
- }
- ret = -E_AUDIOC_OVERRUN;
- if (max_fileno < 0)
- goto out;
- ret = para_select(max_fileno + 1, &rfd, &wfd, NULL);
- if (ret < 0)
- goto out;
- if (loaded < bufsize && FD_ISSET(fd, &rfd)) {
- len = recv_bin_buffer(fd, buf + loaded,
- bufsize - loaded);
- if (len <= 0) {
- ret = len < 0? -E_AUDIOC_READ : 0;
- goto out;
- }
- loaded += len;
- }
- if (check_write && FD_ISSET(STDOUT_FILENO, &wfd)) {
- ret = write(STDOUT_FILENO, buf, loaded);
- if (ret < 0) {
- ret = -E_AUDIOC_WRITE;
- goto out;
- }
- loaded -= ret;
- if (loaded && ret)
- memmove(buf, buf + ret, loaded);
- }
- }
+ bufsize = conf.bufsize_arg;
+ buf = para_malloc(bufsize);
+ do {
+ size_t n = ret = recv_bin_buffer(fd, buf, bufsize);
+ if (ret <= 0)
+ break;
+ ret = write_all(STDOUT_FILENO, buf, &n);
+ } while (ret >= 0);
out:
- if (!ret && loaded && buf)
- ret = write(STDOUT_FILENO, buf, loaded);
if (ret < 0)
PARA_ERROR_LOG("%s\n", para_strerror(-ret));
return ret < 0? EXIT_FAILURE : EXIT_SUCCESS;
para_fd_set(st->fd, &s->rfds, &s->max_fileno);
}
-static void signal_post_select(struct sched *s, struct task *t)
+static void signal_post_select(struct sched *s, __a_unused struct task *t)
{
- struct signal_task *st = container_of(t, struct signal_task, task);
-
- if (!FD_ISSET(st->fd, &s->rfds))
- return;
+ int signum;
- st->signum = para_next_signal();
- switch (st->signum) {
+ signum = para_next_signal(&s->rfds);
+ switch (signum) {
case SIGINT:
case SIGTERM:
case SIGHUP:
- PARA_EMERG_LOG("terminating on signal %d\n", st->signum);
+ PARA_EMERG_LOG("terminating on signal %d\n", signum);
clean_exit(EXIT_FAILURE, "caught deadly signal");
}
}
last_status_dump = *now;
}
- if (!FD_ISSET(ct->fd, &s->rfds))
- return;
- ret = handle_connect(ct->fd);
+ ret = handle_connect(ct->fd, &s->rfds);
if (ret < 0)
PARA_ERROR_LOG("%s\n", para_strerror(-ret));
audiod_status_dump();
extern int audiod_status;
void __noreturn clean_exit(int status, const char *msg);
-int handle_connect(int accept_fd);
+int handle_connect(int accept_fd, fd_set *rfds);
void audiod_status_dump(void);
char *get_time_string(int slot_num);
struct btr_node *audiod_get_btr_root(void);
}
/**
- * handle arriving connections on the local socket
+ * Handle arriving connections on the local socket.
*
- * \param accept_fd the fd to call accept() on
+ * \param accept_fd The fd to accept connections on.
+ * \param rfds If \a accept_fd is not set in \a rfds, do nothing.
*
- * This is called whenever para_audiod's main task detects an incoming
- * connection by the readability of \a accept_fd. This function reads the
- * command sent by the peer, checks the connecting user's permissions by using
- * unix socket credentials (if supported by the OS) and calls the corresponding
- * command handler if permissions are OK.
+ * This is called in each iteration of the select loop. If there is an incoming
+ * connection on \a accept_fd, this function reads the command sent by the peer,
+ * checks the connecting user's permissions by using unix socket credentials
+ * (if supported by the OS) and calls the corresponding command handler if
+ * permissions are OK.
*
- * \return positive on success, negative on errors
+ * \return Positive on success, negative on errors, zero if there was no
+ * connection to accept.
*
* \sa para_accept(), recv_cred_buffer()
* */
-int handle_connect(int accept_fd)
+int handle_connect(int accept_fd, fd_set *rfds)
{
- int i, argc, ret, clifd = -1;
+ int i, argc, ret, clifd;
char buf[MAXLINE], **argv = NULL;
struct sockaddr_un unix_addr;
uid_t uid;
- ret = para_accept(accept_fd, &unix_addr, sizeof(struct sockaddr_un));
- if (ret < 0)
- goto out;
- clifd = ret;
+ ret = para_accept(accept_fd, rfds, &unix_addr, sizeof(struct sockaddr_un), &clifd);
+ if (ret <= 0)
+ return ret;
ret = recv_cred_buffer(clifd, buf, sizeof(buf) - 1);
if (ret < 0)
goto out;
}
}
-static ssize_t client_recv_buffer(struct client_task *ct, char *buf, size_t len)
+static int client_recv_buffer(struct client_task *ct, fd_set *rfds,
+ char *buf, size_t sz, size_t *n)
{
- ssize_t ret;
+ int ret;
if (ct->status < CL_SENT_CH_RESPONSE)
- ret = recv_buffer(ct->rc4c.fd, buf, len);
- else
- ret = rc4_recv_buffer(&ct->rc4c, buf, len);
+ return read_nonblock(ct->rc4c.fd, buf, sz, rfds, n);
+
+ *n = 0;
+ ret = rc4_recv_buffer(&ct->rc4c, buf, sz);
+ /*
+ * rc4_recv_buffer is used with blocking fds elsewhere, so it
+ * does not use the nonblock-API. Therefore we need to
+ * check for EOF and EAGAIN.
+ */
if (ret == 0)
return -E_SERVER_EOF;
- return ret;
+ if (ret == -ERRNO_TO_PARA_ERROR(EAGAIN))
+ return 0;
+ if (ret < 0)
+ return ret;
+ *n = ret;
+ return 0;
}
/**
struct client_task *ct = container_of(t, struct client_task, task);
struct btr_node *btrn = ct->btrn;
int ret = 0;
+ size_t n;
char buf[CLIENT_BUFSIZE];
t->error = 0;
return;
switch (ct->status) {
case CL_CONNECTED: /* receive welcome message */
- if (!FD_ISSET(ct->rc4c.fd, &s->rfds))
- return;
- ret = client_recv_buffer(ct, buf, sizeof(buf));
- if (ret < 0)
- goto err;
+ ret = client_recv_buffer(ct, &s->rfds, buf, sizeof(buf), &n);
+ if (ret < 0 || n == 0)
+ goto out;
ct->status = CL_RECEIVED_WELCOME;
return;
case CL_RECEIVED_WELCOME: /* send auth command */
/* the SHA1 of the decrypted challenge */
unsigned char challenge_sha1[HASH_SIZE];
- if (!FD_ISSET(ct->rc4c.fd, &s->rfds))
- return;
- ret = client_recv_buffer(ct, buf, sizeof(buf));
- if (ret < 0)
- goto err;
- PARA_INFO_LOG("<-- [challenge] (%d bytes)\n", ret);
+ ret = client_recv_buffer(ct, &s->rfds, buf, sizeof(buf), &n);
+ if (ret < 0 || n == 0)
+ goto out;
+ PARA_INFO_LOG("<-- [challenge] (%zu bytes)\n", n);
ret = para_decrypt_buffer(ct->key_file, crypt_buf,
- (unsigned char *)buf, ret);
+ (unsigned char *)buf, n);
if (ret < 0)
goto err;
sha1_hash((char *)crypt_buf, CHALLENGE_SIZE, challenge_sha1);
}
case CL_SENT_CH_RESPONSE: /* read server response */
{
- size_t bytes_received;
- if (!FD_ISSET(ct->rc4c.fd, &s->rfds))
- return;
- ret = client_recv_buffer(ct, buf, sizeof(buf));
- if (ret < 0)
- goto err;
- bytes_received = ret;
+ ret = client_recv_buffer(ct, &s->rfds, buf, sizeof(buf), &n);
+ if (ret < 0 || n == 0)
+ goto out;
/* check if server has sent "Proceed" message */
ret = -E_CLIENT_AUTH;
- if (bytes_received < PROCEED_MSG_LEN)
- goto err;
+ if (n < PROCEED_MSG_LEN)
+ goto out;
if (!strstr(buf, PROCEED_MSG))
- goto err;
+ goto out;
ct->status = CL_RECEIVED_PROCEED;
return;
}
case CL_SENT_COMMAND:
{
char *buf2;
- if (!FD_ISSET(ct->rc4c.fd, &s->rfds))
- return;
/* can not use "buf" here because we need a malloced buffer */
buf2 = para_malloc(CLIENT_BUFSIZE);
- ret = client_recv_buffer(ct, buf2, CLIENT_BUFSIZE);
- if (ret < 0) {
+ ret = client_recv_buffer(ct, &s->rfds, buf2, CLIENT_BUFSIZE, &n);
+ if (n > 0) {
+ if (strstr(buf2, AWAITING_DATA_MSG)) {
+ free(buf2);
+ ct->status = CL_SENDING;
+ return;
+ }
+ ct->status = CL_RECEIVING;
+ btr_add_output(buf2, n, btrn);
+ } else
free(buf2);
- goto err;
- }
- if (strstr(buf2, AWAITING_DATA_MSG)) {
- free(buf2);
- ct->status = CL_SENDING;
- return;
- }
- ct->status = CL_RECEIVING;
- btr_add_output(buf2, ret, btrn);
- return;
+ goto out;
}
case CL_SENDING:
{
goto err;
if (ret == 0)
return;
+ /*
+ * The FD_ISSET() is not strictly necessary, but is allows us
+ * to skip the malloc below if there is nothing to read anyway.
+ */
if (!FD_ISSET(ct->rc4c.fd, &s->rfds))
return;
buf2 = para_malloc(CLIENT_BUFSIZE);
- ret = client_recv_buffer(ct, buf2, CLIENT_BUFSIZE);
- if (ret < 0) {
+ ret = client_recv_buffer(ct, &s->rfds, buf2, CLIENT_BUFSIZE, &n);
+ if (n > 0) {
+ buf2 = para_realloc(buf2, n);
+ btr_add_output(buf2, n, btrn);
+ } else
free(buf2);
- goto err;
- }
- buf2 = para_realloc(buf2, ret);
- btr_add_output(buf2, ret, btrn);
- return;
+ goto out;
}
}
err:
+out:
t->error = ret;
if (ret < 0) {
if (ret != -E_SERVER_EOF && ret != -E_BTR_EOF)
struct btr_node *btrn = rn->btrn;
struct iovec iov[2];
int ret, iovcnt;
+ size_t num_bytes;
ret = btr_node_status(btrn, 0, BTR_NT_ROOT);
- if (ret < 0)
- goto err;
- if (ret == 0)
- return;
- if (!FD_ISSET(pdd->fd, &s->rfds))
- return; /* nothing to do */
+ if (ret <= 0)
+ goto out;
iovcnt = btr_pool_get_buffers(pdd->btrp, iov);
ret = -E_DCCP_OVERRUN;
if (iovcnt == 0)
- goto err;
- ret = para_readv(pdd->fd, iov, iovcnt);
- /* EAGAIN is possible even if FD_ISSET */
- if (ret < 0 && is_errno(-ret, EAGAIN))
- return;
- if (ret == 0)
- ret = -E_RECV_EOF;
- if (ret < 0)
- goto err;
- if (ret <= iov[0].iov_len) /* only the first buffer was filled */
- btr_add_output_pool(pdd->btrp, ret, btrn);
+ goto out;
+ ret = readv_nonblock(pdd->fd, iov, iovcnt, &s->rfds, &num_bytes);
+ if (num_bytes == 0)
+ goto out;
+ if (num_bytes <= iov[0].iov_len) /* only the first buffer was filled */
+ btr_add_output_pool(pdd->btrp, num_bytes, btrn);
else { /* both buffers contain data */
btr_add_output_pool(pdd->btrp, iov[0].iov_len, btrn);
- btr_add_output_pool(pdd->btrp, ret - iov[0].iov_len, btrn);
+ btr_add_output_pool(pdd->btrp, num_bytes - iov[0].iov_len, btrn);
}
- return;
-err:
+out:
+ if (ret >= 0)
+ return;
btr_remove_node(rn->btrn);
t->error = ret;
}
struct sender_client *sc;
int tx_ccid;
- if (dss->listen_fd < 0 || !FD_ISSET(dss->listen_fd, rfds))
- return;
- sc = accept_sender_client(dss);
+ sc = accept_sender_client(dss, rfds);
if (!sc)
return;
PARA_ERROR(SENDMSG, "sendmsg() failed"), \
PARA_ERROR(RECVMSG, "recvmsg() failed"), \
PARA_ERROR(SCM_CREDENTIALS, "did not receive SCM credentials"), \
- PARA_ERROR(RECV_PATTERN, "did not receive expected pattern"), \
#define UDP_RECV_ERRORS \
#define FD_ERRORS \
PARA_ERROR(FGETS, "fgets error"), \
+ PARA_ERROR(EOF, "end of file"), \
+ PARA_ERROR(READ_PATTERN, "did not read expected pattern"), \
#define WRITE_ERRORS \
#include <dirent.h>
#include <sys/mman.h>
#include <fcntl.h>
-#include <sys/select.h>
#include <sys/uio.h>
#include "para.h"
#include "error.h"
#include "string.h"
+#include "fd.h"
/**
* Write a buffer to a file descriptor, re-write on short writes.
}
/**
- * Simple wrapper for readv().
+ * Read from a non-blocking file descriptor into multiple buffers.
*
* \param fd The file descriptor to read from.
* \param iov Scatter/gather array used in readv().
* \param iovcnt Number of elements in \a iov.
+ * \param rfds An optional fd set pointer.
+ * \param num_bytes Result pointer. Contains the number of bytes read from \a fd.
+ *
+ * If \a rfds is not \p NULL and the (non-blocking) file descriptor \a fd is
+ * not set in \a rfds, this function returns early without doing anything.
+ * Otherwise The function tries to read up to \a sz bytes from \a fd. As for
+ * write_nonblock(), EAGAIN is not considered an error condition. However, EOF
+ * is.
+ *
+ * \return Zero or a negative error code. If the underlying call to readv(2)
+ * returned zero (indicating an end of file condition) or failed for some
+ * reason other than \p EAGAIN, a negative return value is returned.
+ *
+ * In any case, \a num_bytes contains the number of bytes that have been
+ * successfully read from \a fd (zero if the first readv() call failed with
+ * EAGAIN). Note that even if the function returns negative, some data might
+ * have been read before the error occured. In this case \a num_bytes is
+ * positive.
+ *
+ * \sa \ref write_nonblock(), read(2), readv(2).
+ */
+int readv_nonblock(int fd, struct iovec *iov, int iovcnt, fd_set *rfds,
+ size_t *num_bytes)
+{
+ int ret, i, j;
+
+ *num_bytes = 0;
+ /*
+ * Avoid a shortcoming of select(): Reads from a non-blocking fd might
+ * return EAGAIN even if FD_ISSET() returns true. However, FD_ISSET()
+ * returning false definitely means that no data can currently be read.
+ * This is the common case, so it is worth to avoid the overhead of the
+ * read() system call in this case.
+ */
+ if (rfds && !FD_ISSET(fd, rfds))
+ return 0;
+
+ for (i = 0, j = 0; i < iovcnt;) {
+
+ /* fix up the first iov */
+ assert(j < iov[i].iov_len);
+ iov[i].iov_base += j;
+ iov[i].iov_len -= j;
+ ret = readv(fd, iov + i, iovcnt - i);
+ iov[i].iov_base -= j;
+ iov[i].iov_len += j;
+
+ if (ret == 0)
+ return -E_EOF;
+ if (ret < 0) {
+ if (errno == EAGAIN)
+ return 0;
+ return -ERRNO_TO_PARA_ERROR(errno);
+ }
+ *num_bytes += ret;
+ while (ret > 0) {
+ if (ret < iov[i].iov_len - j) {
+ j += ret;
+ break;
+ }
+ ret -= iov[i].iov_len - j;
+ j = 0;
+ if (++i >= iovcnt)
+ break;
+ }
+ }
+ return 0;
+}
+
+/**
+ * Read from a non-blocking file descriptor into a single buffer.
+ *
+ * \param fd The file descriptor to read from.
+ * \param buf The buffer to read data to.
+ * \param sz The size of \a buf.
+ * \param rfds \see \ref readv_nonblock().
+ * \param num_bytes \see \ref readv_nonblock().
+ *
+ * This is a simple wrapper for readv_nonblock() which uses an iovec with a single
+ * buffer.
+ *
+ * \return The return value of the underlying call to readv_nonblock().
+ */
+int read_nonblock(int fd, void *buf, size_t sz, fd_set *rfds, size_t *num_bytes)
+{
+ struct iovec iov = {.iov_base = buf, .iov_len = sz};
+ return readv_nonblock(fd, &iov, 1, rfds, num_bytes);
+}
+
+/**
+ * Read a buffer and check its content for a pattern.
+ *
+ * \param fd The file descriptor to receive from.
+ * \param pattern The expected pattern.
+ * \param bufsize The size of the internal buffer.
+ * \param rfds Passed to read_nonblock().
*
- * \return A negative error code on errors, the return value of the underlying
- * call to readv() otherwise.
+ * This function tries to read at most \a bufsize bytes from the non-blocking
+ * file descriptor \a fd. If at least \p strlen(\a pattern) bytes have been
+ * received, the beginning of the received buffer is compared with \a pattern,
+ * ignoring case.
*
- * \sa readv(2).
+ * \return Positive if \a pattern was received, negative on errors, zero if no data
+ * was available to read.
+ *
+ * \sa \ref read_nonblock(), \sa strncasecmp(3).
*/
-int para_readv(int fd, struct iovec *iov, int iovcnt)
+int read_pattern(int fd, const char *pattern, size_t bufsize, fd_set *rfds)
{
- int ret = readv(fd, iov, iovcnt);
+ size_t n, len;
+ char *buf = para_malloc(bufsize + 1);
+ int ret = read_nonblock(fd, buf, bufsize, rfds, &n);
+ buf[n] = '\0';
if (ret < 0)
- return -ERRNO_TO_PARA_ERROR(errno);
+ goto out;
+ ret = 0;
+ if (n == 0)
+ goto out;
+ ret = -E_READ_PATTERN;
+ len = strlen(pattern);
+ if (n < len)
+ goto out;
+ if (strncasecmp(buf, pattern, len) != 0)
+ goto out;
+ ret = 1;
+out:
+ if (ret < 0) {
+ PARA_NOTICE_LOG("%s\n", para_strerror(-ret));
+ PARA_NOTICE_LOG("recvd %zu bytes: %s\n", n, buf);
+ }
+ free(buf);
return ret;
}
/** \file fd.h exported symbols from fd.c */
int write_all(int fd, const char *buf, size_t *len);
-int para_readv(int fd, struct iovec *iov, int iovcnt);
int file_exists(const char *);
int para_select(int n, fd_set *readfds, fd_set *writefds,
struct timeval *timeout_tv);
int para_munmap(void *start, size_t length);
int write_ok(int fd);
void valid_fd_012(void);
+int readv_nonblock(int fd, struct iovec *iov, int iovcnt, fd_set *rfds,
+ size_t *num_bytes);
+int read_nonblock(int fd, void *buf, size_t sz, fd_set *rfds, size_t *num_bytes);
+int read_pattern(int fd, const char *pattern, size_t bufsize, fd_set *rfds);
int write_nonblock(int fd, const char *buf, size_t len,
size_t max_bytes_per_write);
int for_each_file_in_dir(const char *dirname,
static pid_t cmd_pid;
static int command_pipe = -1;
-static int audiod_pipe = -1;
+static int stat_pipe = -1;
static struct gui_args_info conf;
enum {GETCH_MODE, COMMAND_MODE, EXTERNAL_MODE};
}
};
-static int para_open_audiod_pipe(char *cmd)
-{
- int fds[3] = {0, 1, 0};
- pid_t pid;
- int ret = para_exec_cmdline_pid(&pid, cmd, fds);
- if (ret < 0)
- return ret;
- ret = mark_fd_nonblocking(fds[1]);
- if (ret > 0)
- return fds[1];
- close(fds[1]);
- return ret;
-}
-
static int find_cmd_byname(char *name)
{
int i;
return 1;
}
-static int read_audiod_pipe(int fd)
+static int read_stat_pipe(fd_set *rfds)
{
static char *buf;
static int bufsize, loaded;
- int ret;
+ int ret, ret2;
+ size_t sz;
+ if (stat_pipe < 0)
+ return 0;
if (loaded >= bufsize) {
if (bufsize > 1000 * 1000) {
loaded = 0;
buf = para_realloc(buf, bufsize);
}
assert(loaded < bufsize);
- ret = read(fd, buf + loaded, bufsize - loaded);
- if (ret <= 0)
- return ret;
- loaded += ret;
- ret = for_each_stat_item(buf, loaded, update_item);
- if (ret < 0)
- return ret;
- if (ret > 0 && ret < loaded)
- memmove(buf, buf + loaded - ret, ret);
- loaded = ret;
+ ret = read_nonblock(stat_pipe, buf + loaded, bufsize - loaded,
+ rfds, &sz);
+ loaded += sz;
+ ret2 = for_each_stat_item(buf, loaded, update_item);
+ if (ret < 0 || ret2 < 0) {
+ loaded = 0;
+ return ret2 < 0? ret2 : ret;
+ }
+ sz = ret2; /* what is left */
+ if (sz > 0 && sz < loaded)
+ memmove(buf, buf + loaded - sz, sz);
+ loaded = sz;
return 1;
}
}
}
-static int open_audiod_pipe(void)
+static int open_stat_pipe(void)
{
static int init = 1;
+ int ret, fds[3] = {0, 1, 0};
+ pid_t pid;
if (init)
init = 0;
else
sleep(1);
- return para_open_audiod_pipe(conf.stat_cmd_arg);
+ ret = para_exec_cmdline_pid(&pid, conf.stat_cmd_arg, fds);
+ if (ret < 0)
+ return ret;
+ ret = mark_fd_nonblocking(fds[1]);
+ if (ret >= 0)
+ return fds[1];
+ close(fds[1]);
+ return ret;
}
/*
{
fd_set rfds;
int ret;
- int max_fileno, cp_numread = 1;
+ int max_fileno;
char command_buf[4096] = "";
int cbo = 0; /* command buf offset */
struct timeval tv;
// ret = refresh_status();
FD_ZERO(&rfds);
max_fileno = 0;
- /* audiod pipe */
- if (audiod_pipe < 0)
- audiod_pipe = open_audiod_pipe();
- if (audiod_pipe >= 0)
- para_fd_set(audiod_pipe, &rfds, &max_fileno);
+ if (stat_pipe < 0)
+ stat_pipe = open_stat_pipe();
+ if (stat_pipe >= 0)
+ para_fd_set(stat_pipe, &rfds, &max_fileno);
/* signal pipe */
para_fd_set(signal_pipe, &rfds, &max_fileno);
/* command pipe only for COMMAND_MODE */
if (ret <= 0)
goto check_return; /* skip fd checks */
/* signals */
- if (FD_ISSET(signal_pipe, &rfds)) {
- int sig_nr = para_next_signal();
- if (sig_nr > 0)
- handle_signal(sig_nr);
- }
+ ret = para_next_signal(&rfds);
+ if (ret > 0)
+ handle_signal(ret);
/* read command pipe if ready */
- if (command_pipe >= 0 && mode == COMMAND_MODE &&
- FD_ISSET(command_pipe, &rfds)) {
- cp_numread = read(command_pipe, command_buf + cbo,
- sizeof(command_buf) - 1 - cbo);
- if (cp_numread >= 0)
- cbo += cp_numread;
- else {
- if (cp_numread < 0)
- PARA_ERROR_LOG("read error (%d)", cp_numread);
+ if (command_pipe >= 0 && mode == COMMAND_MODE) {
+ size_t sz;
+ ret = read_nonblock(command_pipe, command_buf + cbo,
+ sizeof(command_buf) - 1 - cbo, &rfds, &sz);
+ cbo += sz;
+ sz = cbo;
+ cbo = for_each_line(command_buf, cbo, &add_output_line, NULL);
+ if (sz != cbo)
+ wrefresh(bot.win);
+ if (ret < 0) {
+ PARA_NOTICE_LOG("closing command pipe: %s",
+ para_strerror(-ret));
close(command_pipe);
command_pipe = -1;
+ return 0;
}
}
- if (audiod_pipe >= 0 && FD_ISSET(audiod_pipe, &rfds))
- if (read_audiod_pipe(audiod_pipe) <= 0) {
- close(audiod_pipe);
- audiod_pipe = -1;
- clear_all_items();
- free(stat_content[SI_BASENAME]);
- stat_content[SI_BASENAME] =
- para_strdup("audiod not running!?");
- print_all_items();
- }
+ ret = read_stat_pipe(&rfds);
+ if (ret < 0) {
+ PARA_NOTICE_LOG("closing stat pipe: %s\n", para_strerror(-ret));
+ close(stat_pipe);
+ stat_pipe = -1;
+ clear_all_items();
+ free(stat_content[SI_BASENAME]);
+ stat_content[SI_BASENAME] =
+ para_strdup("stat command terminated!?");
+ print_all_items();
+ }
check_return:
switch (mode) {
case COMMAND_MODE:
- if (cp_numread <= 0 && !cbo) /* command complete */
- return 0;
- if (cbo)
- cbo = for_each_line(command_buf, cbo,
- &add_output_line, NULL);
- if (cp_numread <= 0)
- cbo = 0;
- wrefresh(bot.win);
ret = wgetch(top.win);
if (ret != ERR && ret != KEY_RESIZE) {
if (command_pipe) {
struct btr_node *btrn = rn->btrn;
int ret;
char *buf;
- size_t sz;
+ size_t sz, n;
t->error = 0;
ret = btr_node_status(btrn, 0, BTR_NT_ROOT);
phd->status = HTTP_SENT_GET_REQUEST;
return;
}
- if (!FD_ISSET(phd->fd, &s->rfds))
- return;
if (phd->status == HTTP_SENT_GET_REQUEST) {
- ret = recv_pattern(phd->fd, HTTP_OK_MSG, strlen(HTTP_OK_MSG));
+ ret = read_pattern(phd->fd, HTTP_OK_MSG, strlen(HTTP_OK_MSG), &s->rfds);
if (ret < 0)
goto err;
+ if (ret == 0)
+ return;
PARA_INFO_LOG("received ok msg, streaming\n");
phd->status = HTTP_STREAMING;
return;
sz = btr_pool_get_buffer(phd->btrp, &buf);
if (sz == 0)
goto err;
- ret = recv_bin_buffer(phd->fd, buf, sz);
- if (ret == 0)
- ret = -E_RECV_EOF;
- if (ret < 0)
- goto err;
- btr_add_output_pool(phd->btrp, ret, btrn);
- return;
+ ret = read_nonblock(phd->fd, buf, sz, &s->rfds, &n);
+ if (n > 0)
+ btr_add_output_pool(phd->btrp, n, btrn);
+ if (ret >= 0)
+ return;
err:
btr_remove_node(rn->btrn);
t->error = ret;
{
struct sender_client *sc, *tmp;
struct private_http_sender_data *phsd;
+ int ret;
- if (hss->listen_fd < 0)
- return;
list_for_each_entry_safe(sc, tmp, &hss->client_list, node) {
phsd = sc->private_data;
switch (phsd->status) {
case HTTP_STREAMING: /* nothing to do */
break;
case HTTP_CONNECTED: /* need to recv get request */
- if (FD_ISSET(sc->fd, rfds)) {
- if (recv_pattern(sc->fd, HTTP_GET_MSG, MAXLINE)
- < 0) {
- phsd->status = HTTP_INVALID_GET_REQUEST;
- } else {
- phsd->status = HTTP_GOT_GET_REQUEST;
- PARA_INFO_LOG("received get request\n");
- }
+ ret = read_pattern(sc->fd, HTTP_GET_MSG, MAXLINE, rfds);
+ if (ret < 0)
+ phsd->status = HTTP_INVALID_GET_REQUEST;
+ else if (ret > 0) {
+ phsd->status = HTTP_GOT_GET_REQUEST;
+ PARA_INFO_LOG("received get request\n");
}
break;
case HTTP_GOT_GET_REQUEST: /* need to send ok msg */
break;
}
}
- if (!FD_ISSET(hss->listen_fd, rfds))
- return;
- sc = accept_sender_client(hss);
+ sc = accept_sender_client(hss, rfds);
if (!sc)
return;
phsd = para_malloc(sizeof(*phsd));
* Wrapper around the accept system call.
*
* \param fd The listening socket.
+ * \param rfds An optional fd_set pointer.
* \param addr Structure which is filled in with the address of the peer socket.
* \param size Should contain the size of the structure pointed to by \a addr.
+ * \param new_fd Result pointer.
*
- * Accept incoming connections on \a addr. Retry if interrupted.
+ * Accept incoming connections on \a addr, retry if interrupted. If \a rfds is
+ * not \p NULL, return 0 if \a fd is not set in \a rfds without calling accept().
*
- * \return The new file descriptor on success, negative on errors.
+ * \return Negative on errors, zero if no connections are present to be accepted,
+ * one otherwise.
*
* \sa accept(2).
*/
-int para_accept(int fd, void *addr, socklen_t size)
+int para_accept(int fd, fd_set *rfds, void *addr, socklen_t size, int *new_fd)
{
- int new_fd;
+ int ret;
+ if (rfds && !FD_ISSET(fd, rfds))
+ return 0;
do
- new_fd = accept(fd, (struct sockaddr *) addr, &size);
- while (new_fd < 0 && errno == EINTR);
- return new_fd < 0? -ERRNO_TO_PARA_ERROR(errno) : new_fd;
+ ret = accept(fd, (struct sockaddr *) addr, &size);
+ while (ret < 0 && errno == EINTR);
+
+ if (ret >= 0) {
+ *new_fd = ret;
+ return 1;
+ }
+ if (errno == EAGAIN || errno == EWOULDBLOCK)
+ return 0;
+ return -ERRNO_TO_PARA_ERROR(errno);
}
/**
return result;
}
#endif /* HAVE_UCRED */
-
-/**
- * Receive a buffer and check for a pattern.
- *
- * \param fd The file descriptor to receive from.
- * \param pattern The expected pattern.
- * \param bufsize The size of the internal buffer.
- *
- * \return Positive if \a pattern was received, negative otherwise.
- *
- * This function tries to receive at most \a bufsize bytes from file descriptor
- * \a fd. If at least \p strlen(\a pattern) bytes were received, the beginning
- * of the received buffer is compared with \a pattern, ignoring case.
- *
- * \sa recv_buffer(), \sa strncasecmp(3).
- */
-int recv_pattern(int fd, const char *pattern, size_t bufsize)
-{
- size_t len = strlen(pattern);
- char *buf = para_malloc(bufsize + 1);
- int ret = -E_RECV_PATTERN, n = recv_buffer(fd, buf, bufsize + 1);
-
- if (n < len)
- goto out;
- if (strncasecmp(buf, pattern, len))
- goto out;
- ret = 1;
-out:
- if (ret < 0) {
- PARA_NOTICE_LOG("did not receive pattern '%s'\n", pattern);
- if (n > 0)
- PARA_NOTICE_LOG("recvd %d bytes: %s\n", n, buf);
- else if (n < 0)
- PARA_NOTICE_LOG("%s\n", para_strerror(-n));
- }
- free(buf);
- return ret;
-}
int recv_bin_buffer(int fd, char *buf, size_t size);
int recv_buffer(int fd, char *buf, size_t size);
-int para_accept(int, void *addr, socklen_t size);
+int para_accept(int fd, fd_set *rfds, void *addr, socklen_t size, int *new_fd);
int create_local_socket(const char *name, struct sockaddr_un *unix_addr,
mode_t mode);
int create_remote_socket(const char *name);
int recv_cred_buffer(int, char *, size_t);
ssize_t send_cred_buffer(int, char*);
-int recv_pattern(int fd, const char *pattern, size_t bufsize);
/**
* Functions and definitions to support \p IPPROTO_DCCP
int generic_com_on(struct sender_status *ss, unsigned protocol);
void generic_com_off(struct sender_status *ss);
char *generic_sender_help(void);
-struct sender_client *accept_sender_client(struct sender_status *ss);
+struct sender_client *accept_sender_client(struct sender_status *ss, fd_set *rfds);
int send_queued_chunks(int fd, struct chunk_queue *cq,
size_t max_bytes_per_write);
int parse_fec_url(const char *arg, struct sender_command_data *scd);
* \sa \ref para_accept(), \ref mark_fd_nonblocking(), \ref acl_check_access(),
* \ref cq_new(), \ref add_close_on_fork_list().
*/
-struct sender_client *accept_sender_client(struct sender_status *ss)
+struct sender_client *accept_sender_client(struct sender_status *ss, fd_set *rfds)
{
struct sender_client *sc;
- int fd, ret = para_accept(ss->listen_fd, NULL, 0);
- if (ret < 0) {
+ int fd, ret;
+
+ if (ss->listen_fd < 0)
+ return NULL;
+ ret = para_accept(ss->listen_fd, rfds, NULL, 0, &fd);
+ if (ret < 0)
PARA_ERROR_LOG("%s\n", para_strerror(-ret));
+ if (ret <= 0)
return NULL;
- }
- fd = ret;
ret = -E_MAX_CLIENTS;
if (ss->max_clients > 0 && ss->num_clients >= ss->max_clients)
goto err_out;
kill(mmd->afs_pid, SIGHUP);
}
-static void signal_post_select(struct sched *s, struct task *t)
+static void signal_post_select(struct sched *s, __a_unused struct task *t)
{
- struct signal_task *st = container_of(t, struct signal_task, task);
+ int signum = para_next_signal(&s->rfds);
- if (!FD_ISSET(st->fd, &s->rfds))
+ switch (signum) {
+ case 0:
return;
-
- st->signum = para_next_signal();
- switch (st->signum) {
case SIGHUP:
handle_sighup();
break;
/* die on sigint/sigterm. Kill all children too. */
case SIGINT:
case SIGTERM:
- PARA_EMERG_LOG("terminating on signal %d\n", st->signum);
+ PARA_EMERG_LOG("terminating on signal %d\n", signum);
kill(0, SIGTERM);
/*
* We must wait for afs because afs catches SIGINT/SIGTERM.
pid_t child_pid;
uint32_t *chunk_table;
- if (!FD_ISSET(sct->listen_fd, &s->rfds))
- return;
- ret = para_accept(sct->listen_fd, NULL, 0);
- if (ret < 0)
+ ret = para_accept(sct->listen_fd, &s->rfds, NULL, 0, &new_fd);
+ if (ret <= 0)
goto out;
- new_fd = ret;
peer_name = remote_name(new_fd);
PARA_INFO_LOG("got connection from %s, forking\n", peer_name);
mmd->num_connects++;
/**
* Return the number of the next pending signal.
*
- * This should be called if the fd for the signal pipe is ready for reading.
+ * \param rfds Th fd_set containing the signal pipe.
*
- * \return On success, the number of the received signal is returned. If the
- * read returned zero or was interrupted by another signal the function returns
- * 0. Otherwise, a negative error value is returned.
+ * \return On success, the number of the received signal is returned. If there
+ * is no signal currently pending, the function returns zero. On read errors
+ * from the signal pipe, the process is terminated.
*/
-int para_next_signal(void)
+int para_next_signal(fd_set *rfds)
{
- int s;
- ssize_t r = read(signal_pipe[0], &s, sizeof(s));
+ size_t n;
+ int s, ret = read_nonblock(signal_pipe[0], &s, sizeof(s), rfds, &n);
- if (!r) {
- PARA_CRIT_LOG("read from signal pipe returned zero\n");
- return 0;
- }
- if (r < 0) {
- if (errno == EAGAIN || errno == EINTR)
- return 0;
- return -ERRNO_TO_PARA_ERROR(errno);
+ if (ret < 0) {
+ PARA_EMERG_LOG("%s\n", para_strerror(-ret));
+ exit(EXIT_FAILURE);
}
- assert(r == sizeof(s));
+ if (n == 0)
+ return 0;
+ assert(n == sizeof(s));
PARA_DEBUG_LOG("next signal: %d\n", s);
return s;
}
struct signal_task {
/** The signal pipe. */
int fd;
- /** The number of the most recent signal. */
- int signum;
/** The associated task structure. */
struct task task;
};
void para_sigaction(int sig, void (*handler)(int));
void para_install_sighandler(int);
int para_reap_child(pid_t *pid);
-int para_next_signal(void);
+int para_next_signal(fd_set *rfds);
void para_signal_shutdown(void);
{
struct stdin_task *sit = container_of(t, struct stdin_task, task);
ssize_t ret;
- size_t sz;
+ size_t sz, n;
char *buf = NULL;
t->error = 0;
goto err;
if (ret == 0)
return;
- if (!FD_ISSET(STDIN_FILENO, &s->rfds))
- return;
sz = btr_pool_get_buffer(sit->btrp, &buf);
if (sz == 0)
return;
* reference can not be freed, we're stuck.
*/
sz = PARA_MIN(sz, btr_pool_size(sit->btrp) / 2);
- ret = read(STDIN_FILENO, buf, sz);
- if (ret < 0)
- ret = -ERRNO_TO_PARA_ERROR(errno);
- if (ret == 0)
- ret = -E_STDIN_EOF;
- if (ret < 0)
- goto err;
- btr_add_output_pool(sit->btrp, ret, sit->btrn);
- return;
+ ret = read_nonblock(STDIN_FILENO, buf, sz, &s->rfds, &n);
+ if (n > 0)
+ btr_add_output_pool(sit->btrp, n, sit->btrn);
+ if (ret >= 0)
+ return;
err:
btr_remove_node(sit->btrn);
//btr_pool_free(sit->btrp);
struct receiver_node *rn = container_of(t, struct receiver_node, task);
struct private_udp_recv_data *purd = rn->private_data;
struct btr_node *btrn = rn->btrn;
- size_t packet_size;
+ size_t num_bytes;
struct iovec iov[2];
- int ret, iovcnt;
+ int ret, readv_ret, iovcnt;
t->error = 0;
ret = btr_node_status(btrn, 0, BTR_NT_ROOT);
- if (ret < 0)
- goto err;
- if (ret == 0)
- return;
- if (!FD_ISSET(purd->fd, &s->rfds))
- return;
+ if (ret <= 0)
+ goto out;
iovcnt = btr_pool_get_buffers(purd->btrp, iov);
ret = -E_UDP_OVERRUN;
if (iovcnt == 0)
- goto err;
- ret = para_readv(purd->fd, iov, iovcnt);
- /* EAGAIN is possible even if FD_ISSET */
- if (ret < 0 && is_errno(-ret, EAGAIN))
- return;
- if (ret == 0)
- ret = -E_RECV_EOF;
+ goto out;
+ ret = readv_nonblock(purd->fd, iov, iovcnt, &s->rfds, &num_bytes);
+ if (num_bytes == 0)
+ goto out;
+ readv_ret = ret;
+ ret = udp_check_eof(num_bytes, iov);
if (ret < 0)
- goto err;
- packet_size = ret;
- ret = udp_check_eof(packet_size, iov);
- if (ret < 0)
- goto err;
- if (iov[0].iov_len >= packet_size)
- btr_add_output_pool(purd->btrp, packet_size, btrn);
+ goto out;
+ if (iov[0].iov_len >= num_bytes)
+ btr_add_output_pool(purd->btrp, num_bytes, btrn);
else { /* both buffers contain data */
btr_add_output_pool(purd->btrp, iov[0].iov_len, btrn);
- btr_add_output_pool(purd->btrp, packet_size - iov[0].iov_len,
+ btr_add_output_pool(purd->btrp, num_bytes - iov[0].iov_len,
btrn);
}
- return;
-err:
+ ret = readv_ret;
+out:
+ if (ret >= 0)
+ return;
btr_remove_node(btrn);
t->error = ret;
close(purd->fd);
return 1;
}
-static void recv_afs_result(struct vss_task *vsst)
+static void recv_afs_result(struct vss_task *vsst, fd_set *rfds)
{
int ret, passed_fd, shmid;
uint32_t afs_code = 0, afs_data = 0;
struct stat statbuf;
- vsst->afsss = AFS_SOCKET_READY;
+ if (!FD_ISSET(vsst->afs_socket, rfds))
+ return;
ret = recv_afs_msg(vsst->afs_socket, &passed_fd, &afs_code, &afs_data);
+ if (ret == -ERRNO_TO_PARA_ERROR(EAGAIN))
+ return;
if (ret < 0)
goto err;
+ vsst->afsss = AFS_SOCKET_READY;
PARA_DEBUG_LOG("fd: %d, code: %u, shmid: %u\n", passed_fd, afs_code,
afs_data);
ret = -E_NOFD;
senders[sender_num].client_cmds[num](&mmd->sender_cmd_data);
mmd->sender_cmd_data.cmd_num = -1;
}
- if (vsst->afsss != AFS_SOCKET_CHECK_FOR_WRITE) {
- if (FD_ISSET(vsst->afs_socket, &s->rfds))
- recv_afs_result(vsst);
- } else if (FD_ISSET(vsst->afs_socket, &s->wfds)) {
+ if (vsst->afsss != AFS_SOCKET_CHECK_FOR_WRITE)
+ recv_afs_result(vsst, &s->rfds);
+ else if (FD_ISSET(vsst->afs_socket, &s->wfds)) {
PARA_NOTICE_LOG("requesting new fd from afs\n");
ret = send_buffer(vsst->afs_socket, "new");
if (ret < 0)