+/*
+ * Copyright (C) 2007 Andre Noll <maan@systemlinux.org>
+ *
+ * Licensed under the GPL v2. For licencing details see COPYING.
+ */
+
+/** \file chunk_queue.c Queuing functions for paraslash senders. */
+
+#include "para.h"
+#include "list.h"
+#include "vss.h"
+#include "string.h"
+#include "error.h"
+
+/**
+ * Senders may use the chunk queue facility to deal with laggy connections. It
+ * allows them to enqueue chunks if they can not be sent out immediately.
+ *
+ * Chunk queues are "cheap" in the sense that only reference to the audio file
+ * data is stored, but not the data itsself.
+ */
+struct chunk_queue {
+ /** The list of pending chunks for this client. */
+ struct list_head q;
+ /** The number of pending bytes for this client. */
+ unsigned long num_pending;
+ /** Enqueueing more than that many bytes is an error. */
+ unsigned long max_pending;
+};
+
+/** Describes one queued chunk in a chunk queue. */
+struct queued_chunk {
+ /** The number of the queued chunk, -1U means header. */
+ unsigned chunk_num;
+ /** The number of bytes already sent. */
+ unsigned sent;
+ /** Position of the chunk in the chunk queue. */
+ struct list_head node;
+};
+
+/**
+ * Add a chunk to the given queue.
+ *
+ * \param cq the queue to add the chunk to.
+ * \param chunk_num The number of the chunk to be queued.
+ * \param sent The number of bytes of this chunk that the sender was able to
+ * send.
+ *
+ * \return Positive on success, negative on errors.
+ */
+int cq_enqueue(struct chunk_queue *cq, long unsigned chunk_num,
+ size_t sent)
+{
+ struct queued_chunk *qc;
+ char *buf;
+ size_t len;
+ int ret;
+
+ if (chunk_num != -1U) {
+ ret = vss_get_chunk(chunk_num, &buf, &len);
+ if (ret < 0)
+ return ret;
+ } else
+ buf = vss_get_header(&len);
+ if (cq->num_pending + len > cq->max_pending)
+ return -E_QUEUE;
+ qc = para_malloc(sizeof(struct queued_chunk));
+ cq->num_pending += len;
+ qc->chunk_num = chunk_num;
+ qc->sent = sent;
+ list_add_tail(&qc->node, &cq->q);
+ PARA_DEBUG_LOG("%lu bytes queued for %p\n", cq->num_pending, &cq->q);
+ return 1;
+}
+
+/**
+ * Lookup the next chunk in the queue.
+ *
+ * \param cq The chunk queue.
+ *
+ * \return The next queued chunk, or \p NULL if there is no chunk awailable.
+ */
+struct queued_chunk *cq_peek(struct chunk_queue *cq)
+{
+ if (list_empty(&cq->q))
+ return NULL;
+ return list_entry(cq->q.next, struct queued_chunk, node);
+}
+
+/**
+ * Remove the current chunk from the queue.
+ *
+ * \param cq The chunk to remove.
+ */
+void cq_dequeue(struct chunk_queue *cq)
+{
+ struct queued_chunk *qc = cq_peek(cq);
+ assert(qc);
+ list_del(&qc->node);
+ free(qc);
+}
+
+/**
+ * Change the number of bytes send for the current queued chunk.
+ *
+ * \param cq The chunk queue.
+ * \param sent Number of bytes successfully sent.
+ */
+void cq_update(struct chunk_queue *cq, size_t sent)
+{
+ struct queued_chunk *qc = cq_peek(cq);
+ assert(qc);
+ qc->sent += sent;
+ cq->num_pending -= sent;
+}
+
+/**
+ * Get a pointer to the given queued chunk.
+ *
+ * \param qc The queued chunk.
+ * \param buf Result pointer.
+ * \param len Number of bytes of \a buf.
+ *
+ * \return Positive on success, negative on errors.
+ */
+int cq_get(struct queued_chunk *qc, char **buf, size_t *len)
+{
+ int ret;
+
+ if (qc->chunk_num != -1U) {
+ ret = vss_get_chunk(qc->chunk_num, buf, len);
+ if (ret < 0)
+ return ret;
+ } else
+ *buf = vss_get_header(len);
+ assert(*len > qc->sent);
+ *buf += qc->sent;
+ *len -= qc->sent;
+ return 1;
+}
+
+/**
+ * Allocate and initialize a chunk queue.
+ *
+ * \param max_pending Maximal number of bytes that will be queued.
+ *
+ * \return A pointer to the new queue.
+ */
+struct chunk_queue *cq_new(size_t max_pending)
+{
+ struct chunk_queue *cq = para_malloc(sizeof(*cq));
+ INIT_LIST_HEAD(&cq->q);
+ cq->max_pending = max_pending;
+ cq->num_pending = 0;
+ return cq;
+}
+
+/**
+ * Deallocate all resources of this queue.
+ *
+ * \param cq The chunk queue.
+ */
+void cq_destroy(struct chunk_queue *cq)
+{
+ struct queued_chunk *qc, *tmp;
+ list_for_each_entry_safe(qc, tmp, &cq->q, node) {
+ list_del(&qc->node);
+ free(qc);
+ }
+ free(cq);
+}