]> git.tuebingen.mpg.de Git - paraslash.git/blobdiff - send_common.c
Intoduce send_common.c and use it from the dccp sender.
[paraslash.git] / send_common.c
diff --git a/send_common.c b/send_common.c
new file mode 100644 (file)
index 0000000..d6c734b
--- /dev/null
@@ -0,0 +1,191 @@
+/*
+ * Copyright (C) 2005-2008 Andre Noll <maan@systemlinux.org>
+ *
+ * Licensed under the GPL v2. For licencing details see COPYING.
+ */
+
+/** \file send_common.c Functions used by more than one paraslash sender. */
+
+#include <dirent.h>
+#include "para.h"
+#include "error.h"
+#include "string.h"
+#include "fd.h"
+#include "net.h"
+#include "list.h"
+#include "afh.h"
+#include "afs.h"
+#include "server.h"
+#include "send.h"
+#include "close_on_fork.h"
+#include "chunk_queue.h"
+#include "vss.h"
+
+
+/**
+ * Open a passive socket of given layer4 type.
+ *
+ * Set the resultig file descriptor to nonblocking mode and add it to the list
+ * of fds that are being closed in the child process when the server calls
+ * fork().
+ *
+ * \param l4type The transport-layer protocol.
+ * \param port The port number.
+ *
+ * \return The listening fd on success, negative on errors.
+ */
+int open_sender(unsigned l4type, int port)
+{
+       int fd, ret = para_listen(AF_UNSPEC, l4type, port);
+
+       if (ret < 0)
+               return ret;
+       fd = ret;
+       ret = mark_fd_nonblocking(fd);
+       if (ret < 0) {
+               close(fd);
+               return ret;
+       }
+       add_close_on_fork_list(fd);
+       return fd;
+}
+
+/**
+ * Shut down a connected client.
+ *
+ * \param sc The client to be shut down.
+ *
+ * Close the file descriptor given by \a sc, remove it from the close-on-fork
+ * list, destroy the chunk queue of this client, delete the client from the
+ * list of connected clients and free the sender_client struct.
+ */
+void shutdown_client(struct sender_client *sc)
+{
+       PARA_INFO_LOG("shutting down %s on fd %d\n", sc->name, sc->fd);
+       free(sc->name);
+       close(sc->fd);
+       del_close_on_fork_list(sc->fd);
+       cq_destroy(sc->cq);
+       list_del(&sc->node);
+       free(sc->private_data);
+       free(sc);
+}
+
+/**
+ * Write a buffer to a non-blocking file descriptor.
+ *
+ * \param fd The file descriptor.
+ * \param buf the buffer to write.
+ * \param len the number of bytes of \a buf.
+ * \param max_bytes_per_write Do not write more than that many bytes at once.
+ *
+ * If \a max_bytes_per_write is non-zero, do not send more than that many bytes
+ * per write().
+ *
+ * EAGAIN is not considered an error condition.  For example CCID3 has a
+ * sending wait queue which fills up and is emptied asynchronously. The EAGAIN
+ * case means that there is currently no space in the wait queue, but this can
+ * change at any moment.
+ *
+ * \return Negative on errors, number of bytes written else.
+ */
+static int write_nonblock(int fd, const char *buf, size_t len,
+               size_t max_bytes_per_write)
+{
+       size_t written = 0;
+       int ret = 0;
+
+       while (written < len) {
+               size_t num = len - written;
+
+               if (max_bytes_per_write && max_bytes_per_write < num)
+                       num = max_bytes_per_write;
+               ret = write(fd, buf + written, num);
+               if (ret < 0 && errno == EAGAIN)
+                       return written;
+               if (ret < 0)
+                       return -ERRNO_TO_PARA_ERROR(errno);
+               written += ret;
+       }
+       return written;
+}
+
+static int queue_chunk_or_shutdown(struct sender_client *sc,
+               long unsigned chunk_num, size_t sent)
+{
+       int ret = cq_enqueue(sc->cq, chunk_num, sent);
+       if (ret < 0)
+               shutdown_client(sc);
+       return ret;
+}
+
+/* return: negative on errors, zero if not everything was sent, one otherwise */
+static int send_queued_chunks(struct sender_client *sc,
+               size_t max_bytes_per_write)
+{
+       struct queued_chunk *qc;
+       while ((qc = cq_peek(sc->cq))) {
+               char *buf;
+               size_t len;
+               int ret;
+               cq_get(qc, &buf, &len);
+               ret = write_nonblock(sc->fd, buf, len, max_bytes_per_write);
+               if (ret < 0)
+                       return ret;
+               cq_update(sc->cq, ret);
+               if (ret != len)
+                       return 0;
+               cq_dequeue(sc->cq);
+       }
+       return 1;
+}
+
+/**
+ * Send one chunk of audio data to a connected client.
+ *
+ * \param sc The client.
+ * \param max_bytes_per_write Split writes to chunks of at most that many bytes.
+ * \param current_chunk The number of the chunk to write.
+ * \param buf The data to write.
+ * \param len The number of bytes of \a buf.
+ *
+ * On errors, the client is shut down. If only a part of the buffer could be
+ * written, the remainder is put into the chunk queue for that client.
+ */
+void send_chunk(struct sender_client *sc, size_t max_bytes_per_write,
+               long unsigned current_chunk, const char *buf, size_t len)
+{
+       int ret;
+
+       if (!sc->header_sent && current_chunk) {
+               size_t header_len;
+               char *header_buf = vss_get_header(&header_len);
+               if (header_buf && header_len > 0) {
+                       ret = queue_chunk_or_shutdown(sc, -1U, 0);
+                       if (ret < 0)
+                               goto out;
+               }
+               sc->header_sent = 1;
+       }
+       ret = send_queued_chunks(sc, max_bytes_per_write);
+       if (ret < 0) {
+               shutdown_client(sc);
+               goto out;
+       }
+       if (!len)
+               goto out;
+       if (!ret) { /* still data left in the queue */
+               ret = queue_chunk_or_shutdown(sc, current_chunk, 0);
+               goto out;
+       }
+       ret = write_nonblock(sc->fd, buf, len, max_bytes_per_write);
+       if (ret < 0) {
+               shutdown_client(sc);
+               goto out;
+       }
+       if (ret != len)
+               ret = queue_chunk_or_shutdown(sc, current_chunk, ret);
+out:
+       if (ret < 0)
+               PARA_NOTICE_LOG("%s\n", para_strerror(-ret));
+}