X-Git-Url: http://git.tuebingen.mpg.de/?p=paraslash.git;a=blobdiff_plain;f=send_common.c;h=354c9ed509deddc14f4528ce318e0d1ad90a8b7e;hp=388fde03bdd4399d40ef3d9ad127f81a8ea75cb3;hb=4744d937c4160898d1fe151257606430750e580c;hpb=8ea8abb73199b32fdd7afdf8825afa42ed8de244 diff --git a/send_common.c b/send_common.c index 388fde03..354c9ed5 100644 --- a/send_common.c +++ b/send_common.c @@ -1,5 +1,5 @@ /* - * Copyright (C) 2005-2009 Andre Noll + * Copyright (C) 2005-2012 Andre Noll * * Licensed under the GPL v2. For licencing details see COPYING. */ @@ -7,7 +7,6 @@ /** \file send_common.c Functions used by more than one paraslash sender. */ #include -#include #include #include "para.h" @@ -23,6 +22,7 @@ #include "send.h" #include "close_on_fork.h" #include "chunk_queue.h" +#include "sched.h" #include "vss.h" /** Clients will be kicked if there are more than that many bytes pending. */ @@ -42,7 +42,7 @@ */ static int open_sender(unsigned l4type, int port) { - int fd, ret = para_listen(AF_UNSPEC, l4type, port); + int fd, ret = para_listen_simple(l4type, port); if (ret < 0) return ret; @@ -96,34 +96,24 @@ void shutdown_clients(struct sender_status *ss) shutdown_client(sc, ss); } -static int queue_chunk_or_shutdown(struct sender_client *sc, - struct sender_status *ss, const char *buf, size_t num_bytes) -{ - int ret = cq_enqueue(sc->cq, buf, num_bytes); - if (ret < 0) - shutdown_client(sc, ss); - return ret; -} - /** * Try to empty the chunk queue for this fd. * * \param fd The file descriptor. * \param cq The list of queued chunks. - * \param max_bytes_per_write Do not send more than this in one go. * * \return Negative on errors, zero if not everything was sent, one otherwise. */ -int send_queued_chunks(int fd, struct chunk_queue *cq, - size_t max_bytes_per_write) +int send_queued_chunks(int fd, struct chunk_queue *cq) { struct queued_chunk *qc; while ((qc = cq_peek(cq))) { const char *buf; size_t len; int ret; + cq_get(qc, &buf, &len); - ret = write_nonblock(fd, buf, len, max_bytes_per_write); + ret = xwrite(fd, buf, len); if (ret < 0) return ret; cq_update(cq, ret); @@ -134,59 +124,6 @@ int send_queued_chunks(int fd, struct chunk_queue *cq, return 1; } -/** - * Send one chunk of audio data to a connected client. - * - * \param sc The client. - * \param ss The sender. - * \param max_bytes_per_write Split writes to chunks of at most that many bytes. - * \param current_chunk The number of the chunk to write. - * \param buf The data to write. - * \param len The number of bytes of \a buf. - * \param header_buf The audio file header. - * \param header_len The number of bytes of \a header_buf. - * - * On errors, the client is shut down. If only a part of the buffer could be - * written, the remainder is put into the chunk queue for that client. - */ -void send_chunk(struct sender_client *sc, struct sender_status *ss, - size_t max_bytes_per_write, long unsigned current_chunk, - const char *buf, size_t len, const char *header_buf, - size_t header_len) -{ - int ret; - - if (!sc->header_sent && current_chunk) { - if (header_buf && header_len > 0) { - ret = queue_chunk_or_shutdown(sc, ss, header_buf, header_len); - if (ret < 0) - goto out; - } - sc->header_sent = 1; - } - ret = send_queued_chunks(sc->fd, sc->cq, max_bytes_per_write); - if (ret < 0) { - shutdown_client(sc, ss); - goto out; - } - if (!len) - goto out; - if (!ret) { /* still data left in the queue */ - ret = queue_chunk_or_shutdown(sc, ss, buf, len); - goto out; - } - ret = write_nonblock(sc->fd, buf, len, max_bytes_per_write); - if (ret < 0) { - shutdown_client(sc, ss); - goto out; - } - if (ret != len) - ret = queue_chunk_or_shutdown(sc, ss, buf + ret, len - ret); -out: - if (ret < 0) - PARA_NOTICE_LOG("%s\n", para_strerror(-ret)); -} - /** * Initialize a struct sender status. * @@ -231,14 +168,14 @@ char *get_sender_info(struct sender_status *ss, const char *name) ret = make_message( "%s sender:\n" "\tstatus: %s\n" - "\tport: %d\n" + "\tport: %s\n" "\tnumber of connected clients: %d\n" "\tmaximal number of clients: %d%s\n" "\tconnected clients: %s\n" "\taccess %s list: %s\n", name, (ss->listen_fd >= 0)? "on" : "off", - ss->port, + stringify_port(ss->port, strcmp(name, "http") ? "dccp" : "tcp"), ss->num_clients, ss->max_clients, ss->max_clients > 0? "" : " (unlimited)", @@ -324,6 +261,7 @@ void generic_com_off(struct sender_status *ss) * Accept a connection on the socket this server is listening on. * * \param ss The sender whose listening fd is ready for reading. + * \param rfds Passed to para_accept(), * * This must be called only if the socket fd of \a ss is ready for reading. It * calls para_accept() to accept the connection and performs the following @@ -348,15 +286,18 @@ void generic_com_off(struct sender_status *ss) * \sa \ref para_accept(), \ref mark_fd_nonblocking(), \ref acl_check_access(), * \ref cq_new(), \ref add_close_on_fork_list(). */ -struct sender_client *accept_sender_client(struct sender_status *ss) +struct sender_client *accept_sender_client(struct sender_status *ss, fd_set *rfds) { struct sender_client *sc; - int fd, ret = para_accept(ss->listen_fd, NULL, 0); - if (ret < 0) { + int fd, ret; + + if (ss->listen_fd < 0) + return NULL; + ret = para_accept(ss->listen_fd, rfds, NULL, 0, &fd); + if (ret < 0) PARA_ERROR_LOG("%s\n", para_strerror(-ret)); + if (ret <= 0) return NULL; - } - fd = ret; ret = -E_MAX_CLIENTS; if (ss->max_clients > 0 && ss->num_clients >= ss->max_clients) goto err_out; @@ -369,7 +310,7 @@ struct sender_client *accept_sender_client(struct sender_status *ss) ss->num_clients++; sc = para_calloc(sizeof(*sc)); sc->fd = fd; - sc->name = make_message("%s", remote_name(fd)); + sc->name = para_strdup(remote_name(fd)); sc->cq = cq_new(MAX_CQ_BYTES); para_list_add(&sc->node, &ss->client_list); add_close_on_fork_list(fd); @@ -401,47 +342,51 @@ char *generic_sender_help(void) static int parse_fec_parms(const char *arg, struct sender_command_data *scd) { int32_t val; - char *a = para_strdup(arg), *b = a, *e = strchr(b, ':'); + char *a = para_strdup(arg), + *b = strchr(a, ':'), + *c = strrchr(a, ':'); int ret = -E_COMMAND_SYNTAX; - /* parse max slice bytes */ - if (!e) - goto out; - *e = '\0'; - ret = para_atoi32(b, &val); - if (ret < 0) - goto out; - ret = -ERRNO_TO_PARA_ERROR(EINVAL); - if (val < 0 || val > 65535) + if (!b || !c) goto out; - scd->max_slice_bytes = val; - /* parse data_slices_per_group */ - b = e + 1; - e = strchr(b, ':'); - ret = -E_COMMAND_SYNTAX; - if (!e) - goto out; - *e = '\0'; - ret = para_atoi32(b, &val); + *b = *c = '\0'; + + ret = para_atoi32(a, &val); if (ret < 0) goto out; - ret = -ERRNO_TO_PARA_ERROR(EINVAL); + + /* optional max_slice_bytes (0 means "use MTU") */ + if (b == c) { + scd->max_slice_bytes = 0; + } else { + if (val < 0 || val > 65535) + goto fec_einval; + scd->max_slice_bytes = val; + + ret = para_atoi32(b + 1, &val); + if (ret < 0) + goto out; + } + + /* k = data_slices_per_group */ if (val < 0 || val > 255) - goto out; + goto fec_einval; scd->data_slices_per_group = val; - /* parse slices_per_group */ - b = e + 1; - ret = para_atoi32(b, &val); + + /* n = slices_per_group */ + ret = para_atoi32(c + 1, &val); if (ret < 0) goto out; - ret = -ERRNO_TO_PARA_ERROR(EINVAL); if (val < 0 || val < scd->data_slices_per_group) - goto out; + goto fec_einval; scd->slices_per_group = val; ret = 0; out: free(a); return ret; +fec_einval: + ret = -ERRNO_TO_PARA_ERROR(EINVAL); + goto out; } /** @@ -461,26 +406,22 @@ out: */ int parse_fec_url(const char *arg, struct sender_command_data *scd) { - int ret; - ssize_t len = sizeof(scd->host); char *a = para_strdup(arg), *p = strchr(a, '/'); + int ret = 0; + + /* default fec parameters */ + scd->max_slice_bytes = 0; + scd->data_slices_per_group = 14; + scd->slices_per_group = 16; if (p) { *p = '\0'; - len = strlen(a); - } - ret = -ERRNO_TO_PARA_ERROR(EINVAL); - if (!parse_url(a, scd->host, len, &scd->port)) - goto out; - if (p) { ret = parse_fec_parms(p + 1, scd); - goto out; + if (ret < 0) + goto out; } - /* use default fec parameters. */ - scd->max_slice_bytes = 1490; - scd->slices_per_group = 16; - scd->data_slices_per_group = 14; - ret = 0; + if (!parse_url(a, scd->host, sizeof(scd->host), &scd->port)) + ret = -ERRNO_TO_PARA_ERROR(EINVAL); out: free(a); return ret;