X-Git-Url: http://git.tuebingen.mpg.de/?p=paraslash.git;a=blobdiff_plain;f=send_common.c;h=43aeb2b3b7597d4d5e46dfc51b036c5868d0cda1;hp=d6c734b5c8bc7cc303252dc9a3debf132a5dc692;hb=335730538150250f32c0df0b184fb494e2bb0df3;hpb=2abba90245f87fab096edfc3faf7df61646b713f diff --git a/send_common.c b/send_common.c index d6c734b5..43aeb2b3 100644 --- a/send_common.c +++ b/send_common.c @@ -16,16 +16,19 @@ #include "afh.h" #include "afs.h" #include "server.h" +#include "acl.h" #include "send.h" #include "close_on_fork.h" #include "chunk_queue.h" #include "vss.h" +/** Clients will be kicked if there are more than that many bytes pending. */ +#define MAX_CQ_BYTES 40000 /** * Open a passive socket of given layer4 type. * - * Set the resultig file descriptor to nonblocking mode and add it to the list + * Set the resulting file descriptor to nonblocking mode and add it to the list * of fds that are being closed in the child process when the server calls * fork(). * @@ -34,7 +37,7 @@ * * \return The listening fd on success, negative on errors. */ -int open_sender(unsigned l4type, int port) +static int open_sender(unsigned l4type, int port) { int fd, ret = para_listen(AF_UNSPEC, l4type, port); @@ -51,15 +54,16 @@ int open_sender(unsigned l4type, int port) } /** - * Shut down a connected client. + * Shut down a client connected to a paraslash sender. * - * \param sc The client to be shut down. + * \param sc The client to shut down. + * \param ss The sender whose clients are to be shut down. * * Close the file descriptor given by \a sc, remove it from the close-on-fork * list, destroy the chunk queue of this client, delete the client from the * list of connected clients and free the sender_client struct. */ -void shutdown_client(struct sender_client *sc) +void shutdown_client(struct sender_client *sc, struct sender_status *ss) { PARA_INFO_LOG("shutting down %s on fd %d\n", sc->name, sc->fd); free(sc->name); @@ -69,6 +73,14 @@ void shutdown_client(struct sender_client *sc) list_del(&sc->node); free(sc->private_data); free(sc); + ss->num_clients--; +} + +void shutdown_clients(struct sender_status *ss) +{ + struct sender_client *sc, *tmp; + list_for_each_entry_safe(sc, tmp, &ss->client_list, node) + shutdown_client(sc, ss); } /** @@ -111,11 +123,12 @@ static int write_nonblock(int fd, const char *buf, size_t len, } static int queue_chunk_or_shutdown(struct sender_client *sc, - long unsigned chunk_num, size_t sent) + struct sender_status *ss, long unsigned chunk_num, + size_t sent) { int ret = cq_enqueue(sc->cq, chunk_num, sent); if (ret < 0) - shutdown_client(sc); + shutdown_client(sc, ss); return ret; } @@ -144,6 +157,7 @@ static int send_queued_chunks(struct sender_client *sc, * Send one chunk of audio data to a connected client. * * \param sc The client. + * \param ss The sender. * \param max_bytes_per_write Split writes to chunks of at most that many bytes. * \param current_chunk The number of the chunk to write. * \param buf The data to write. @@ -152,16 +166,19 @@ static int send_queued_chunks(struct sender_client *sc, * On errors, the client is shut down. If only a part of the buffer could be * written, the remainder is put into the chunk queue for that client. */ -void send_chunk(struct sender_client *sc, size_t max_bytes_per_write, - long unsigned current_chunk, const char *buf, size_t len) +void send_chunk(struct sender_client *sc, struct sender_status *ss, + size_t max_bytes_per_write, long unsigned current_chunk, + const char *buf, size_t len) { int ret; if (!sc->header_sent && current_chunk) { size_t header_len; - char *header_buf = vss_get_header(&header_len); + char *header_buf; + + vss_get_header(&header_buf, &header_len); if (header_buf && header_len > 0) { - ret = queue_chunk_or_shutdown(sc, -1U, 0); + ret = queue_chunk_or_shutdown(sc, ss, -1U, 0); if (ret < 0) goto out; } @@ -169,23 +186,233 @@ void send_chunk(struct sender_client *sc, size_t max_bytes_per_write, } ret = send_queued_chunks(sc, max_bytes_per_write); if (ret < 0) { - shutdown_client(sc); + shutdown_client(sc, ss); goto out; } if (!len) goto out; if (!ret) { /* still data left in the queue */ - ret = queue_chunk_or_shutdown(sc, current_chunk, 0); + ret = queue_chunk_or_shutdown(sc, ss, current_chunk, 0); goto out; } ret = write_nonblock(sc->fd, buf, len, max_bytes_per_write); if (ret < 0) { - shutdown_client(sc); + shutdown_client(sc, ss); goto out; } if (ret != len) - ret = queue_chunk_or_shutdown(sc, current_chunk, ret); + ret = queue_chunk_or_shutdown(sc, ss, current_chunk, ret); out: if (ret < 0) PARA_NOTICE_LOG("%s\n", para_strerror(-ret)); } + +/** + * Initialize a struct sender status. + * + * \param ss The struct to initialize. + * \param access_arg The array of access arguments given at the command line. + * \param num_access_args The number of elements in \a access_arg. + * \param port The tcp or dccp port to listen on. + * \param max_clients The maximal number of simultaneous connections. + * \param default_deny Whether a blacklist should be used for access control. + */ +void init_sender_status(struct sender_status *ss, char **access_arg, + int num_access_args, int port, int max_clients, int default_deny) +{ + ss->listen_fd = -1; + INIT_LIST_HEAD(&ss->client_list); + ss->port = port; + acl_init(&ss->acl, access_arg, num_access_args); + ss->num_clients = 0; + ss->max_clients = max_clients; + ss->default_deny = default_deny; +} + +/** + * Return a string containing the current status of a sender. + * + * \param ss The sender. + * \param name Used for printing the header line. + * + * \return The string printed in the "si" command. + */ +char *get_sender_info(struct sender_status *ss, char *name) +{ + char *clnts = NULL, *ret; + struct sender_client *sc, *tmp_sc; + + char *acl_contents = acl_get_contents(&ss->acl); + list_for_each_entry_safe(sc, tmp_sc, &ss->client_list, node) { + char *tmp = make_message("%s%s ", clnts? clnts : "", sc->name); + free(clnts); + clnts = tmp; + } + ret = make_message( + "%s sender:\n" + "\tstatus: %s\n" + "\tport: %d\n" + "\tnumber of connected clients: %d\n" + "\tmaximal number of clients: %d%s\n" + "\tconnected clients: %s\n" + "\taccess %s list: %s\n", + name, + (ss->listen_fd >= 0)? "on" : "off", + ss->port, + ss->num_clients, + ss->max_clients, + ss->max_clients > 0? "" : " (unlimited)", + clnts? clnts : "(none)", + ss->default_deny? "allow" : "deny", + acl_contents? acl_contents : "(empty)" + ); + free(acl_contents); + free(clnts); + return ret; +} + +/** + * Allow connections from the given range of IP addresses. + * + * \param scd Contains the IP and the netmask. + * \param ss The sender. + * + * \sa generic_com_deny(). + */ +void generic_com_allow(struct sender_command_data *scd, + struct sender_status *ss) +{ + acl_allow(scd->addr, scd->netmask, &ss->acl, ss->default_deny); +} + +/** + * Deny connections from the given range of IP addresses. + * + * \param scd see \ref generic_com_allow(). + * \param ss see \ref generic_com_allow(). + * + * \sa generic_com_allow(). + */ +void generic_com_deny(struct sender_command_data *scd, + struct sender_status *ss) +{ + acl_deny(scd->addr, scd->netmask, &ss->acl, ss->default_deny); +} + +/** + * Activate a paraslash sender. + * + * \param ss The sender to activate. + * \param protocol The symbolic name of the transport-layer protocol. + * + * \return Standard. + */ +int generic_com_on(struct sender_status *ss, unsigned protocol) +{ + int ret; + + if (ss->listen_fd >= 0) + return 1; + ret = open_sender(protocol, ss->port); + if (ret < 0) + return ret; + ss->listen_fd = ret; + return 1; +} + +/** + * Deactivate a paraslash sender. + * + * Shutdown all connected clients and stop listening on the TCP/DCCP socket. + * + * \param ss The sender to deactivate. + * + * \sa \ref del_close_on_fork_list(), shutdown_clients(). + */ +void generic_com_off(struct sender_status *ss) +{ + if (ss->listen_fd < 0) + return; + PARA_NOTICE_LOG("closing port %d\n", ss->port); + close(ss->listen_fd); + del_close_on_fork_list(ss->listen_fd); + shutdown_clients(ss); + ss->listen_fd = -1; +} + +/** + * Accept a connection on the socket this server is listening on. + * + * \param ss The sender whose listening fd is ready for reading. + * + * This must be called only if the socket fd of \a ss is ready for reading. It + * calls para_accept() to accept the connection and performs the following + * actions on the resulting file descriptor \a fd: + * + * - Checks whether the maximal number of connections are exceeded. + * - Sets \a fd to nonblocking mode. + * - Checks the acl of the sender to find out whether connections + * are allowed from the IP of the connecting peer. + * - Increases the number of connections for this sender. + * - Creates and initializes a new chunk queue for queuing network + * packets that can not be sent immediately. + * - Allocates a new struct sender_client and fills in its \a fd, \a cq + * and \a name members. + * - Adds \a fd to the list of connected clients for this sender. + * - Adds \a fd to the list of file descriptors that should be closed + * in the child process when the server calls fork(). + * + * \return A pointer to the allocated sender_client structure on success, \p + * NULL on errors. + * + * \sa \ref para_accept(), \ref mark_fd_nonblocking(), \ref acl_check_access(), + * \ref cq_new(), \ref add_close_on_fork_list(). + */ +struct sender_client *accept_sender_client(struct sender_status *ss) +{ + struct sender_client *sc; + int fd, ret = para_accept(ss->listen_fd, NULL, 0); + if (ret < 0) { + PARA_ERROR_LOG("%s\n", para_strerror(-ret)); + return NULL; + } + fd = ret; + ret = -E_MAX_CLIENTS; + if (ss->max_clients > 0 && ss->num_clients >= ss->max_clients) + goto err_out; + ret = mark_fd_nonblocking(fd); + if (ret < 0) + goto err_out; + ret = acl_check_access(fd, &ss->acl, ss->default_deny); + if (ret < 0) + goto err_out; + ss->num_clients++; + sc = para_calloc(sizeof(*sc)); + sc->fd = fd; + sc->name = make_message("%s", remote_name(fd)); + sc->cq = cq_new(MAX_CQ_BYTES); + para_list_add(&sc->node, &ss->client_list); + add_close_on_fork_list(fd); + PARA_INFO_LOG("accepted client #%d: %s (fd %d)\n", ss->num_clients, + sc->name, fd); + return sc; +err_out: + PARA_WARNING_LOG("%s\n", para_strerror(-ret)); + close(fd); + return NULL; +} + +/** + * Get the generic help text. + * + * \return A dynamically allocated string containing the help text for + * a paraslash sender. + */ +char *generic_sender_help(void) +{ + return make_message( + "usage: {on|off}\n" + "usage: {allow|deny} IP mask\n" + "example: allow 127.0.0.1 32\n" + ); +}