X-Git-Url: http://git.tuebingen.mpg.de/?p=paraslash.git;a=blobdiff_plain;f=send_common.c;h=250a2a0bc5a80f8973b33d43bcb2e12e3ee723e9;hp=f0ba168957c4a76a7eac9b969c6d71c3f36a0fcf;hb=23b121a85984baa9252f4b4c0b8c4f186e394bb7;hpb=bc8187d6a4ca3191b3c54226ba54e4f0c4cf4e6e diff --git a/send_common.c b/send_common.c index f0ba1689..250a2a0b 100644 --- a/send_common.c +++ b/send_common.c @@ -1,12 +1,14 @@ /* - * Copyright (C) 2005-2008 Andre Noll + * Copyright (C) 2005-2013 Andre Noll * * Licensed under the GPL v2. For licencing details see COPYING. */ /** \file send_common.c Functions used by more than one paraslash sender. */ -#include +#include +#include + #include "para.h" #include "error.h" #include "string.h" @@ -20,6 +22,7 @@ #include "send.h" #include "close_on_fork.h" #include "chunk_queue.h" +#include "sched.h" #include "vss.h" /** Clients will be kicked if there are more than that many bytes pending. */ @@ -28,7 +31,7 @@ /** * Open a passive socket of given layer4 type. * - * Set the resultig file descriptor to nonblocking mode and add it to the list + * Set the resulting file descriptor to nonblocking mode and add it to the list * of fds that are being closed in the child process when the server calls * fork(). * @@ -39,7 +42,7 @@ */ static int open_sender(unsigned l4type, int port) { - int fd, ret = para_listen(AF_UNSPEC, l4type, port); + int fd, ret = para_listen_simple(l4type, port); if (ret < 0) return ret; @@ -54,13 +57,16 @@ static int open_sender(unsigned l4type, int port) } /** - * Shut down a connected client. + * Shut down a client connected to a paraslash sender. * - * \param sc The client to be shut down. + * \param sc The client to shut down. + * \param ss The sender whose clients are to be shut down. * * Close the file descriptor given by \a sc, remove it from the close-on-fork * list, destroy the chunk queue of this client, delete the client from the * list of connected clients and free the sender_client struct. + * + * \sa shutdown_clients(). */ void shutdown_client(struct sender_client *sc, struct sender_status *ss) { @@ -75,6 +81,14 @@ void shutdown_client(struct sender_client *sc, struct sender_status *ss) ss->num_clients--; } +/** + * Shut down all clients connected to a paraslash sender. + * + * \param ss The sender whose clients are to be shut down. + * + * This just loops over all connected clients and calls shutdown_client() + * for each client. + */ void shutdown_clients(struct sender_status *ss) { struct sender_client *sc, *tmp; @@ -83,126 +97,43 @@ void shutdown_clients(struct sender_status *ss) } /** - * Write a buffer to a non-blocking file descriptor. + * Try to empty the chunk queue for this fd. * * \param fd The file descriptor. - * \param buf the buffer to write. - * \param len the number of bytes of \a buf. - * \param max_bytes_per_write Do not write more than that many bytes at once. - * - * If \a max_bytes_per_write is non-zero, do not send more than that many bytes - * per write(). - * - * EAGAIN is not considered an error condition. For example CCID3 has a - * sending wait queue which fills up and is emptied asynchronously. The EAGAIN - * case means that there is currently no space in the wait queue, but this can - * change at any moment. + * \param cq The list of queued chunks. * - * \return Negative on errors, number of bytes written else. + * \return Negative on errors, zero if not everything was sent, one otherwise. */ -static int write_nonblock(int fd, const char *buf, size_t len, - size_t max_bytes_per_write) -{ - size_t written = 0; - int ret = 0; - - while (written < len) { - size_t num = len - written; - - if (max_bytes_per_write && max_bytes_per_write < num) - num = max_bytes_per_write; - ret = write(fd, buf + written, num); - if (ret < 0 && errno == EAGAIN) - return written; - if (ret < 0) - return -ERRNO_TO_PARA_ERROR(errno); - written += ret; - } - return written; -} - -static int queue_chunk_or_shutdown(struct sender_client *sc, - struct sender_status *ss, long unsigned chunk_num, - size_t sent) -{ - int ret = cq_enqueue(sc->cq, chunk_num, sent); - if (ret < 0) - shutdown_client(sc, ss); - return ret; -} - -/* return: negative on errors, zero if not everything was sent, one otherwise */ -static int send_queued_chunks(struct sender_client *sc, - size_t max_bytes_per_write) +int send_queued_chunks(int fd, struct chunk_queue *cq) { struct queued_chunk *qc; - while ((qc = cq_peek(sc->cq))) { - char *buf; + while ((qc = cq_peek(cq))) { + const char *buf; size_t len; int ret; + cq_get(qc, &buf, &len); - ret = write_nonblock(sc->fd, buf, len, max_bytes_per_write); + ret = xwrite(fd, buf, len); if (ret < 0) return ret; - cq_update(sc->cq, ret); + cq_update(cq, ret); if (ret != len) return 0; - cq_dequeue(sc->cq); + cq_dequeue(cq); } return 1; } /** - * Send one chunk of audio data to a connected client. - * - * \param sc The client. - * \param max_bytes_per_write Split writes to chunks of at most that many bytes. - * \param current_chunk The number of the chunk to write. - * \param buf The data to write. - * \param len The number of bytes of \a buf. + * Initialize a struct sender status. * - * On errors, the client is shut down. If only a part of the buffer could be - * written, the remainder is put into the chunk queue for that client. + * \param ss The struct to initialize. + * \param access_arg The array of access arguments given at the command line. + * \param num_access_args The number of elements in \a access_arg. + * \param port The tcp or dccp port to listen on. + * \param max_clients The maximal number of simultaneous connections. + * \param default_deny Whether a blacklist should be used for access control. */ -void send_chunk(struct sender_client *sc, struct sender_status *ss, - size_t max_bytes_per_write, long unsigned current_chunk, - const char *buf, size_t len) -{ - int ret; - - if (!sc->header_sent && current_chunk) { - size_t header_len; - char *header_buf = vss_get_header(&header_len); - if (header_buf && header_len > 0) { - ret = queue_chunk_or_shutdown(sc, ss, -1U, 0); - if (ret < 0) - goto out; - } - sc->header_sent = 1; - } - ret = send_queued_chunks(sc, max_bytes_per_write); - if (ret < 0) { - shutdown_client(sc, ss); - goto out; - } - if (!len) - goto out; - if (!ret) { /* still data left in the queue */ - ret = queue_chunk_or_shutdown(sc, ss, current_chunk, 0); - goto out; - } - ret = write_nonblock(sc->fd, buf, len, max_bytes_per_write); - if (ret < 0) { - shutdown_client(sc, ss); - goto out; - } - if (ret != len) - ret = queue_chunk_or_shutdown(sc, ss, current_chunk, ret); -out: - if (ret < 0) - PARA_NOTICE_LOG("%s\n", para_strerror(-ret)); -} - void init_sender_status(struct sender_status *ss, char **access_arg, int num_access_args, int port, int max_clients, int default_deny) { @@ -215,7 +146,15 @@ void init_sender_status(struct sender_status *ss, char **access_arg, ss->default_deny = default_deny; } -char *get_sender_info(struct sender_status *ss, char *name) +/** + * Return a string containing the current status of a sender. + * + * \param ss The sender. + * \param name Used for printing the header line. + * + * \return The string printed in the "si" command. + */ +char *get_sender_info(struct sender_status *ss, const char *name) { char *clnts = NULL, *ret; struct sender_client *sc, *tmp_sc; @@ -229,14 +168,14 @@ char *get_sender_info(struct sender_status *ss, char *name) ret = make_message( "%s sender:\n" "\tstatus: %s\n" - "\tport: %d\n" + "\tport: %s\n" "\tnumber of connected clients: %d\n" "\tmaximal number of clients: %d%s\n" "\tconnected clients: %s\n" "\taccess %s list: %s\n", name, (ss->listen_fd >= 0)? "on" : "off", - ss->port, + stringify_port(ss->port, strcmp(name, "http") ? "dccp" : "tcp"), ss->num_clients, ss->max_clients, ss->max_clients > 0? "" : " (unlimited)", @@ -249,18 +188,42 @@ char *get_sender_info(struct sender_status *ss, char *name) return ret; } +/** + * Allow connections from the given range of IP addresses. + * + * \param scd Contains the IP and the netmask. + * \param ss The sender. + * + * \sa generic_com_deny(). + */ void generic_com_allow(struct sender_command_data *scd, struct sender_status *ss) { - acl_allow(scd->addr, scd->netmask, &ss->acl, ss->default_deny); + acl_allow(scd->host, scd->netmask, &ss->acl, ss->default_deny); } +/** + * Deny connections from the given range of IP addresses. + * + * \param scd see \ref generic_com_allow(). + * \param ss see \ref generic_com_allow(). + * + * \sa generic_com_allow(). + */ void generic_com_deny(struct sender_command_data *scd, struct sender_status *ss) { - acl_deny(scd->addr, scd->netmask, &ss->acl, ss->default_deny); + acl_deny(scd->host, scd->netmask, &ss->acl, ss->default_deny); } +/** + * Activate a paraslash sender. + * + * \param ss The sender to activate. + * \param protocol The symbolic name of the transport-layer protocol. + * + * \return Standard. + */ int generic_com_on(struct sender_status *ss, unsigned protocol) { int ret; @@ -274,6 +237,15 @@ int generic_com_on(struct sender_status *ss, unsigned protocol) return 1; } +/** + * Deactivate a paraslash sender. + * + * Shutdown all connected clients and stop listening on the TCP/DCCP socket. + * + * \param ss The sender to deactivate. + * + * \sa \ref del_close_on_fork_list(), shutdown_clients(). + */ void generic_com_off(struct sender_status *ss) { if (ss->listen_fd < 0) @@ -285,15 +257,47 @@ void generic_com_off(struct sender_status *ss) ss->listen_fd = -1; } -struct sender_client *accept_sender_client(struct sender_status *ss) +/** + * Accept a connection on the socket this server is listening on. + * + * \param ss The sender whose listening fd is ready for reading. + * \param rfds Passed to para_accept(), + * + * This must be called only if the socket fd of \a ss is ready for reading. It + * calls para_accept() to accept the connection and performs the following + * actions on the resulting file descriptor \a fd: + * + * - Checks whether the maximal number of connections are exceeded. + * - Sets \a fd to nonblocking mode. + * - Checks the acl of the sender to find out whether connections + * are allowed from the IP of the connecting peer. + * - Increases the number of connections for this sender. + * - Creates and initializes a new chunk queue for queuing network + * packets that can not be sent immediately. + * - Allocates a new struct sender_client and fills in its \a fd, \a cq + * and \a name members. + * - Adds \a fd to the list of connected clients for this sender. + * - Adds \a fd to the list of file descriptors that should be closed + * in the child process when the server calls fork(). + * + * \return A pointer to the allocated sender_client structure on success, \p + * NULL on errors. + * + * \sa \ref para_accept(), \ref mark_fd_nonblocking(), \ref acl_check_access(), + * \ref cq_new(), \ref add_close_on_fork_list(). + */ +struct sender_client *accept_sender_client(struct sender_status *ss, fd_set *rfds) { struct sender_client *sc; - int fd, ret = para_accept(ss->listen_fd, NULL, 0); - if (ret < 0) { + int fd, ret; + + if (ss->listen_fd < 0) + return NULL; + ret = para_accept(ss->listen_fd, rfds, NULL, 0, &fd); + if (ret < 0) PARA_ERROR_LOG("%s\n", para_strerror(-ret)); + if (ret <= 0) return NULL; - } - fd = ret; ret = -E_MAX_CLIENTS; if (ss->max_clients > 0 && ss->num_clients >= ss->max_clients) goto err_out; @@ -306,7 +310,7 @@ struct sender_client *accept_sender_client(struct sender_status *ss) ss->num_clients++; sc = para_calloc(sizeof(*sc)); sc->fd = fd; - sc->name = make_message("%s", remote_name(fd)); + sc->name = para_strdup(remote_name(fd)); sc->cq = cq_new(MAX_CQ_BYTES); para_list_add(&sc->node, &ss->client_list); add_close_on_fork_list(fd); @@ -319,11 +323,106 @@ err_out: return NULL; } +/** + * Get the generic help text. + * + * \return A dynamically allocated string containing the help text for + * a paraslash sender. + */ char *generic_sender_help(void) { return make_message( "usage: {on|off}\n" - "usage: {allow|deny} IP mask\n" - "example: allow 127.0.0.1 32\n" + "usage: {allow|deny} IP[/netmask]\n" + " where mask defaults to 32\n" + "example: allow 192.168.0.1/24\n" ); } + +static int parse_fec_parms(const char *arg, struct sender_command_data *scd) +{ + int32_t val; + char *a = para_strdup(arg), + *b = strchr(a, ':'), + *c = strrchr(a, ':'); + int ret = -E_COMMAND_SYNTAX; + + if (!b || !c) + goto out; + *b = *c = '\0'; + + ret = para_atoi32(a, &val); + if (ret < 0) + goto out; + + /* optional max_slice_bytes (0 means "use MTU") */ + if (b == c) { + scd->max_slice_bytes = 0; + } else { + if (val < 0 || val > 65535) + goto fec_einval; + scd->max_slice_bytes = val; + + ret = para_atoi32(b + 1, &val); + if (ret < 0) + goto out; + } + + /* k = data_slices_per_group */ + if (val < 0 || val > 255) + goto fec_einval; + scd->data_slices_per_group = val; + + /* n = slices_per_group */ + ret = para_atoi32(c + 1, &val); + if (ret < 0) + goto out; + if (val < 0 || val < scd->data_slices_per_group) + goto fec_einval; + scd->slices_per_group = val; + ret = 0; +out: + free(a); + return ret; +fec_einval: + ret = -ERRNO_TO_PARA_ERROR(EINVAL); + goto out; +} + +/** + * Parse a FEC URL string. + * + * \param arg the URL string to parse. + * \param scd The structure containing host, port and the FEC parameters. + * + * \return Standard. + * + * A FEC URL consists of an ordinary URL string according to RFC 3986, + * optionally followed by a slash and the three FEC parameters slice_size, + * data_slices_per_group and slices_per_group. The three FEC parameters are + * separated by colons. + * + * \sa \ref parse_url(). + */ +int parse_fec_url(const char *arg, struct sender_command_data *scd) +{ + char *a = para_strdup(arg), *p = strchr(a, '/'); + int ret = 0; + + /* default fec parameters */ + scd->max_slice_bytes = 0; + scd->data_slices_per_group = 14; + scd->slices_per_group = 16; + + if (p) { + *p = '\0'; + ret = parse_fec_parms(p + 1, scd); + if (ret < 0) + goto out; + } + if (!parse_url(a, scd->host, sizeof(scd->host), &scd->port)) + ret = -ERRNO_TO_PARA_ERROR(EINVAL); +out: + free(a); + return ret; +}