Intoduce send_common.c and use it from the dccp sender.
authorAndre Noll <maan@systemlinux.org>
Sat, 2 Feb 2008 20:28:51 +0000 (21:28 +0100)
committerAndre Noll <maan@systemlinux.org>
Sat, 2 Feb 2008 20:28:51 +0000 (21:28 +0100)
This new file contains code that was identical for the
dccp and the http sender. A subsequent patch will convert
the http sender to also use the shared code in send_common.c

command.c
configure.ac
dccp_send.c
error.h
http_send.c
ortp_send.c
send.h
send_common.c [new file with mode: 0644]
server.c
vss.c

index 3185533..995dc4a 100644 (file)
--- a/command.c
+++ b/command.c
 #include "afs.h"
 #include "server.h"
 #include "vss.h"
+#include "list.h"
 #include "send.h"
 #include "rc4.h"
 #include "net.h"
 #include "daemon.h"
 #include "fd.h"
-#include "list.h"
 #include "user_list.h"
 #include "server_command_list.h"
 #include "afs_command_list.h"
index a8efcc1..c612a54 100644 (file)
@@ -83,7 +83,7 @@ daemon stat crypt http_send close_on_fork ipc acl
 dccp_send fd user_list chunk_queue afs osl aft mood score attribute blob ringbuffer
 playlist sha1 rbtree sched audiod grab_client filter_chain wav compress
 http_recv dccp_recv recv_common write_common file_write audiod_command
-client_common recv stdout filter stdin audioc write client fsck exec"
+client_common recv stdout filter stdin audioc write client fsck exec send_common"
 all_executables="server audiod recv filter audioc write client fsck"
 
 recv_cmdline_objs="recv.cmdline http_recv.cmdline dccp_recv.cmdline"
@@ -117,7 +117,7 @@ server_cmdline_objs="server.cmdline server_command_list afs_command_list"
 server_errlist_objs="server afh_common mp3_afh vss command net string signal
        time daemon stat crypt http_send close_on_fork
        ipc dccp_send fd user_list chunk_queue afs osl aft mood score attribute
-       blob playlist sha1 rbtree sched acl"
+       blob playlist sha1 rbtree sched acl send_common"
 server_ldflags=""
 server_audio_formats=" mp3"
 
index 887801a..c68f577 100644 (file)
@@ -36,32 +36,19 @@ static int listen_fd = -1;
 /** Maximal number of bytes in a chunk queue. */
 #define DCCP_MAX_PENDING_BYTES 40000
 
-/** describes one connected client */
-struct dccp_client {
-       /** the dccp socket */
-       int fd;
-       /** The socket `name' of the client. */
-       char *name;
-       /** the position of this client in the client list */
-       struct list_head node;
-       /** non-zero if audio file header has been sent */
-       int header_sent;
-       /** The list of pending chunks for this client. */
-       struct chunk_queue *cq;
-};
+/** Do not write more than that many bytes at once. */
+#define DCCP_MAX_BYTES_PER_WRITE 1024
 
 static void dccp_pre_select(int *max_fileno, fd_set *rfds,
                __a_unused fd_set *wfds)
 {
-       if (listen_fd < 0)
-               return;
-       FD_SET(listen_fd, rfds);
-       *max_fileno = PARA_MAX(*max_fileno, listen_fd);
+       if (listen_fd >= 0)
+               para_fd_set(listen_fd, rfds, max_fileno);
 }
 
 static void dccp_post_select(fd_set *rfds, __a_unused fd_set *wfds)
 {
-       struct dccp_client *dc;
+       struct sender_client *sc;
        int ret, fd;
 
        if (listen_fd < 0 || !FD_ISSET(listen_fd, rfds))
@@ -78,161 +65,51 @@ static void dccp_post_select(fd_set *rfds, __a_unused fd_set *wfds)
         * reduce processing costs a bit. See analogous comment in dccp_recv.c.
         */
        if (shutdown(fd, SHUT_RD) < 0) {
-               PARA_ERROR_LOG("shutdown(SHUT_RD): %s\n", strerror(errno));
+               ret = -ERRNO_TO_PARA_ERROR(errno);
                goto err;
        }
        ret = mark_fd_nonblocking(fd);
-       if (ret < 0) {
-               PARA_ERROR_LOG("%s\n", para_strerror(-ret));
+       if (ret < 0)
                goto err;
-       }
-       dc = para_calloc(sizeof(struct dccp_client));
-       dc->fd = fd;
-       dc->name = make_message("%s", remote_name(dc->fd));
-       PARA_NOTICE_LOG("connection from %s\n", dc->name);
-       para_list_add(&dc->node, &clients);
-       add_close_on_fork_list(dc->fd);
-       dc->cq = cq_new(DCCP_MAX_PENDING_BYTES);
+       sc = para_calloc(sizeof(*sc));
+       sc->fd = fd;
+       sc->name = make_message("%s", remote_name(sc->fd));
+       PARA_NOTICE_LOG("connection from %s\n", sc->name);
+       para_list_add(&sc->node, &clients);
+       add_close_on_fork_list(sc->fd);
+       sc->cq = cq_new(DCCP_MAX_PENDING_BYTES);
        return;
 err:
+       PARA_ERROR_LOG("%s\n", para_strerror(-ret));
        close(fd);
 }
 
-static int dccp_open(void)
-{
-       int ret = para_listen(AF_UNSPEC, IPPROTO_DCCP, conf.dccp_port_arg);
-
-       if (ret < 0)
-               return ret;
-       listen_fd = ret;
-       ret = mark_fd_nonblocking(listen_fd);
-       if (ret < 0) {
-               PARA_EMERG_LOG("%s\n", para_strerror(-ret));
-               exit(EXIT_FAILURE);
-       }
-       add_close_on_fork_list(listen_fd);
-       return 1;
-}
-
-static void dccp_shutdown_client(struct dccp_client *dc)
-{
-       PARA_DEBUG_LOG("shutting down %s (fd %d)\n", dc->name, dc->fd);
-       free(dc->name);
-       close(dc->fd);
-       del_close_on_fork_list(dc->fd);
-       cq_destroy(dc->cq);
-       list_del(&dc->node);
-       free(dc);
-}
-
-/*
- * ret: Negative on errors, zero if nothing was written and write returned
- * EAGAIN, number of bytes written else.
- */
-static int dccp_write(int fd, const char *buf, size_t len)
-{
-       size_t written = 0;
-       int ret = 0;
-
-       while (written < len) {
-               ret = write(fd, buf + written, PARA_MIN(1024, len - written));
-               /*
-                * Error handling: CCID3 has a sending wait queue which fills
-                * up and is emptied asynchronously. The EAGAIN case means that
-                * there is currently no space in the wait queue, but this can
-                * change at any moment and is thus not an error condition.
-                */
-               if (ret < 0 && errno == EAGAIN)
-                       return written;
-               if (ret < 0)
-                       return -ERRNO_TO_PARA_ERROR(errno);
-               written += ret;
-       }
-       return written;
-}
-
-static int queue_chunk_or_shutdown(struct dccp_client *dc, long unsigned chunk_num,
-       size_t sent)
-{
-       int ret = cq_enqueue(dc->cq, chunk_num, sent);
-       if (ret < 0) {
-               PARA_NOTICE_LOG("enqueue error\n");
-               dccp_shutdown_client(dc);
-       }
-       return ret;
-}
-
-static int send_queued_chunks(struct dccp_client *dc)
-{
-       struct queued_chunk *qc;
-       while ((qc = cq_peek(dc->cq))) {
-               char *buf;
-               size_t len;
-               int ret;
-               cq_get(qc, &buf, &len);
-               ret = dccp_write(dc->fd, buf, len);
-               if (ret < 0)
-                       return ret;
-               cq_update(dc->cq, ret);
-               if (ret != len)
-                       return 1;
-               cq_dequeue(dc->cq);
-       }
-       return 1;
-}
-
 static void dccp_send(long unsigned current_chunk,
                __a_unused long unsigned chunks_sent, const char *buf, size_t len)
 {
-       struct dccp_client *dc, *tmp;
-       int ret;
-       char *header_buf;
+       struct sender_client *sc, *tmp;
 
-       list_for_each_entry_safe(dc, tmp, &clients, node) {
-               if (!dc->header_sent && current_chunk) {
-                       size_t header_len;
-                       header_buf = vss_get_header(&header_len);
-                       if (header_buf && header_len > 0) {
-                               if (queue_chunk_or_shutdown(dc, -1U, 0) < 0)
-                                       continue;
-                       }
-                       dc->header_sent = 1;
-               }
-               ret = send_queued_chunks(dc);
-               if (ret < 0) {
-                       dccp_shutdown_client(dc);
-                       continue;
-               }
-               if (!len)
-                       continue;
-//             PARA_DEBUG_LOG("writing %d bytes to fd %d\n", len, dc->fd);
-               ret = dccp_write(dc->fd, buf, len);
-               if (ret < 0) {
-                       PARA_NOTICE_LOG("%s\n", para_strerror(-ret));
-                       dccp_shutdown_client(dc);
-                       continue;
-               }
-               if (ret != len)
-                       queue_chunk_or_shutdown(dc, current_chunk, ret);
-       }
+       list_for_each_entry_safe(sc, tmp, &clients, node)
+               send_chunk(sc, DCCP_MAX_BYTES_PER_WRITE, current_chunk, buf,
+                       len);
 }
 
 static void dccp_shutdown_clients(void)
 {
-       struct dccp_client *dc, *tmp;
+       struct sender_client *sc, *tmp;
 
-       list_for_each_entry_safe(dc, tmp, &clients, node)
-               dccp_shutdown_client(dc);
+       list_for_each_entry_safe(sc, tmp, &clients, node)
+               shutdown_client(sc);
 }
 
 static char *dccp_info(void)
 {
        static char *buf;
        int num_clients = 0;
-       struct dccp_client *dc, *tmp;
+       struct sender_client *sc, *tmp;
 
        free(buf);
-       list_for_each_entry_safe(dc, tmp, &clients, node)
+       list_for_each_entry_safe(sc, tmp, &clients, node)
                num_clients++;
        buf = make_message("dccp connected clients: %d\n", num_clients);
        return buf;
@@ -244,9 +121,9 @@ static char *dccp_help(void)
 }
 
 /**
- * the init function of the dccp sender
+ * The init function of the dccp sender.
  *
- * \param s pointer to the dccp sender struct
+ * \param s pointer to the dccp sender struct.
  *
  * It initializes all function pointers of \a s and starts
  * listening on the given port.
@@ -268,7 +145,9 @@ void dccp_send_init(struct sender *s)
        s->client_cmds[SENDER_ALLOW] = NULL;
        s->client_cmds[SENDER_ADD] = NULL;
        s->client_cmds[SENDER_DELETE] = NULL;
-       ret = dccp_open();
+       ret = open_sender(IPPROTO_DCCP, conf.dccp_port_arg);
        if (ret < 0)
                PARA_ERROR_LOG("%s\n", para_strerror(-ret));
+       else
+               listen_fd = ret;
 }
diff --git a/error.h b/error.h
index ece054b..3a05fe0 100644 (file)
--- a/error.h
+++ b/error.h
@@ -28,6 +28,7 @@ DEFINE_ERRLIST_OBJECT_ENUM;
 #define RBTREE_ERRORS
 #define RECV_ERRORS
 #define ACL_ERRORS
+#define SEND_COMMON_ERRORS
 
 extern const char **para_errlist[];
 
index 2646ebb..b629d95 100644 (file)
@@ -18,8 +18,8 @@
 #include "server.h"
 #include "http.h"
 #include "vss.h"
-#include "send.h"
 #include "list.h"
+#include "send.h"
 #include "close_on_fork.h"
 #include "net.h"
 #include "fd.h"
index 6a9d19c..96f4b40 100644 (file)
@@ -17,8 +17,8 @@
 #include "afs.h"
 #include "server.h"
 #include "vss.h"
-#include "send.h"
 #include "list.h"
+#include "send.h"
 #include "ortp.h"
 
 /** Convert in_addr to ascii. */
diff --git a/send.h b/send.h
index 94d8afd..69ebd37 100644 (file)
--- a/send.h
+++ b/send.h
@@ -87,3 +87,24 @@ struct sender {
         */
        int (*client_cmds[NUM_SENDER_CMDS])(struct sender_command_data*);
 };
+
+/** Describes one client, connected to a paraslash sender. */
+struct sender_client {
+       /** The file descriptor of the client. */
+       int fd;
+       /** The socket "name" of the client. */
+       char *name;
+       /** The position of this client in the client list. */
+       struct list_head node;
+       /** Non-zero if audio file header has been sent. */
+       int header_sent;
+       /** The list of pending chunks for this client. */
+       struct chunk_queue *cq;
+       /** Data specific to the particular sender. */
+       void *private_data;
+};
+
+int open_sender(unsigned l4type, int port);
+void shutdown_client(struct sender_client *sc);
+void send_chunk(struct sender_client *sc, size_t max_bytes_per_write,
+               long unsigned current_chunk, const char *buf, size_t len);
diff --git a/send_common.c b/send_common.c
new file mode 100644 (file)
index 0000000..d6c734b
--- /dev/null
@@ -0,0 +1,191 @@
+/*
+ * Copyright (C) 2005-2008 Andre Noll <maan@systemlinux.org>
+ *
+ * Licensed under the GPL v2. For licencing details see COPYING.
+ */
+
+/** \file send_common.c Functions used by more than one paraslash sender. */
+
+#include <dirent.h>
+#include "para.h"
+#include "error.h"
+#include "string.h"
+#include "fd.h"
+#include "net.h"
+#include "list.h"
+#include "afh.h"
+#include "afs.h"
+#include "server.h"
+#include "send.h"
+#include "close_on_fork.h"
+#include "chunk_queue.h"
+#include "vss.h"
+
+
+/**
+ * Open a passive socket of given layer4 type.
+ *
+ * Set the resultig file descriptor to nonblocking mode and add it to the list
+ * of fds that are being closed in the child process when the server calls
+ * fork().
+ *
+ * \param l4type The transport-layer protocol.
+ * \param port The port number.
+ *
+ * \return The listening fd on success, negative on errors.
+ */
+int open_sender(unsigned l4type, int port)
+{
+       int fd, ret = para_listen(AF_UNSPEC, l4type, port);
+
+       if (ret < 0)
+               return ret;
+       fd = ret;
+       ret = mark_fd_nonblocking(fd);
+       if (ret < 0) {
+               close(fd);
+               return ret;
+       }
+       add_close_on_fork_list(fd);
+       return fd;
+}
+
+/**
+ * Shut down a connected client.
+ *
+ * \param sc The client to be shut down.
+ *
+ * Close the file descriptor given by \a sc, remove it from the close-on-fork
+ * list, destroy the chunk queue of this client, delete the client from the
+ * list of connected clients and free the sender_client struct.
+ */
+void shutdown_client(struct sender_client *sc)
+{
+       PARA_INFO_LOG("shutting down %s on fd %d\n", sc->name, sc->fd);
+       free(sc->name);
+       close(sc->fd);
+       del_close_on_fork_list(sc->fd);
+       cq_destroy(sc->cq);
+       list_del(&sc->node);
+       free(sc->private_data);
+       free(sc);
+}
+
+/**
+ * Write a buffer to a non-blocking file descriptor.
+ *
+ * \param fd The file descriptor.
+ * \param buf the buffer to write.
+ * \param len the number of bytes of \a buf.
+ * \param max_bytes_per_write Do not write more than that many bytes at once.
+ *
+ * If \a max_bytes_per_write is non-zero, do not send more than that many bytes
+ * per write().
+ *
+ * EAGAIN is not considered an error condition.  For example CCID3 has a
+ * sending wait queue which fills up and is emptied asynchronously. The EAGAIN
+ * case means that there is currently no space in the wait queue, but this can
+ * change at any moment.
+ *
+ * \return Negative on errors, number of bytes written else.
+ */
+static int write_nonblock(int fd, const char *buf, size_t len,
+               size_t max_bytes_per_write)
+{
+       size_t written = 0;
+       int ret = 0;
+
+       while (written < len) {
+               size_t num = len - written;
+
+               if (max_bytes_per_write && max_bytes_per_write < num)
+                       num = max_bytes_per_write;
+               ret = write(fd, buf + written, num);
+               if (ret < 0 && errno == EAGAIN)
+                       return written;
+               if (ret < 0)
+                       return -ERRNO_TO_PARA_ERROR(errno);
+               written += ret;
+       }
+       return written;
+}
+
+static int queue_chunk_or_shutdown(struct sender_client *sc,
+               long unsigned chunk_num, size_t sent)
+{
+       int ret = cq_enqueue(sc->cq, chunk_num, sent);
+       if (ret < 0)
+               shutdown_client(sc);
+       return ret;
+}
+
+/* return: negative on errors, zero if not everything was sent, one otherwise */
+static int send_queued_chunks(struct sender_client *sc,
+               size_t max_bytes_per_write)
+{
+       struct queued_chunk *qc;
+       while ((qc = cq_peek(sc->cq))) {
+               char *buf;
+               size_t len;
+               int ret;
+               cq_get(qc, &buf, &len);
+               ret = write_nonblock(sc->fd, buf, len, max_bytes_per_write);
+               if (ret < 0)
+                       return ret;
+               cq_update(sc->cq, ret);
+               if (ret != len)
+                       return 0;
+               cq_dequeue(sc->cq);
+       }
+       return 1;
+}
+
+/**
+ * Send one chunk of audio data to a connected client.
+ *
+ * \param sc The client.
+ * \param max_bytes_per_write Split writes to chunks of at most that many bytes.
+ * \param current_chunk The number of the chunk to write.
+ * \param buf The data to write.
+ * \param len The number of bytes of \a buf.
+ *
+ * On errors, the client is shut down. If only a part of the buffer could be
+ * written, the remainder is put into the chunk queue for that client.
+ */
+void send_chunk(struct sender_client *sc, size_t max_bytes_per_write,
+               long unsigned current_chunk, const char *buf, size_t len)
+{
+       int ret;
+
+       if (!sc->header_sent && current_chunk) {
+               size_t header_len;
+               char *header_buf = vss_get_header(&header_len);
+               if (header_buf && header_len > 0) {
+                       ret = queue_chunk_or_shutdown(sc, -1U, 0);
+                       if (ret < 0)
+                               goto out;
+               }
+               sc->header_sent = 1;
+       }
+       ret = send_queued_chunks(sc, max_bytes_per_write);
+       if (ret < 0) {
+               shutdown_client(sc);
+               goto out;
+       }
+       if (!len)
+               goto out;
+       if (!ret) { /* still data left in the queue */
+               ret = queue_chunk_or_shutdown(sc, current_chunk, 0);
+               goto out;
+       }
+       ret = write_nonblock(sc->fd, buf, len, max_bytes_per_write);
+       if (ret < 0) {
+               shutdown_client(sc);
+               goto out;
+       }
+       if (ret != len)
+               ret = queue_chunk_or_shutdown(sc, current_chunk, ret);
+out:
+       if (ret < 0)
+               PARA_NOTICE_LOG("%s\n", para_strerror(-ret));
+}
index f6bc07d..aa8fdda 100644 (file)
--- a/server.c
+++ b/server.c
 #include "vss.h"
 #include "config.h"
 #include "close_on_fork.h"
+#include "list.h"
 #include "send.h"
 #include "net.h"
 #include "daemon.h"
 #include "ipc.h"
 #include "fd.h"
-#include "list.h"
 #include "sched.h"
 #include "signal.h"
 #include "user_list.h"
diff --git a/vss.c b/vss.c
index 1f1f6c1..d74cec3 100644 (file)
--- a/vss.c
+++ b/vss.c
@@ -25,6 +25,7 @@
 #include "net.h"
 #include "server.cmdline.h"
 #include "vss.h"
+#include "list.h"
 #include "send.h"
 #include "ipc.h"
 #include "fd.h"