/*
- * Copyright (C) 2007 Andre Noll <maan@systemlinux.org>
+ * Copyright (C) 2007-2010 Andre Noll <maan@systemlinux.org>
*
* Licensed under the GPL v2. For licencing details see COPYING.
*/
/** \file chunk_queue.c Queuing functions for paraslash senders. */
+#include <regex.h>
+
#include "para.h"
#include "list.h"
#include "afh.h"
-#include "vss.h"
#include "string.h"
#include "error.h"
* allows them to enqueue chunks if they can not be sent out immediately.
*
* Chunk queues are "cheap" in the sense that only reference to the audio file
- * data is stored, but not the data itsself.
+ * data is stored, but not the data itself.
*/
struct chunk_queue {
/** The list of pending chunks for this client. */
struct list_head q;
/** The number of pending bytes for this client. */
unsigned long num_pending;
- /** Enqueueing more than that many bytes is an error. */
+ /** More than that many bytes in the queue is considered an error. */
unsigned long max_pending;
};
/** Describes one queued chunk in a chunk queue. */
struct queued_chunk {
- /** The number of the queued chunk, -1U means header. */
- unsigned chunk_num;
- /** The number of bytes already sent. */
- unsigned sent;
+ /** Pointer to the data to be queued. */
+ const char *buf;
+ /** The number of bytes of this chunk. */
+ size_t num_bytes;
/** Position of the chunk in the chunk queue. */
struct list_head node;
};
* Add a chunk to the given queue.
*
* \param cq the queue to add the chunk to.
- * \param chunk_num The number of the chunk to be queued.
- * \param sent The number of bytes of this chunk that the sender was able to
- * send.
+ * \param buf Pointer to the data to be queued.
+ * \param num_bytes The size of \a buf.
*
- * \return Positive on success, negative on errors.
+ * \return Standard.
*/
-int cq_enqueue(struct chunk_queue *cq, long unsigned chunk_num,
- size_t sent)
+int cq_enqueue(struct chunk_queue *cq, const char *buf, size_t num_bytes)
{
struct queued_chunk *qc;
- char *buf;
- size_t len;
- int ret;
- if (chunk_num != -1U) {
- ret = vss_get_chunk(chunk_num, &buf, &len);
- if (ret < 0)
- return ret;
- } else
- buf = vss_get_header(&len);
- if (cq->num_pending + len > cq->max_pending)
+ if (cq->num_pending + num_bytes > cq->max_pending)
return -E_QUEUE;
qc = para_malloc(sizeof(struct queued_chunk));
- cq->num_pending += len;
- qc->chunk_num = chunk_num;
- qc->sent = sent;
+ cq->num_pending += num_bytes;
+ qc->buf = buf;
+ qc->num_bytes = num_bytes;
list_add_tail(&qc->node, &cq->q);
PARA_DEBUG_LOG("%lu bytes queued for %p\n", cq->num_pending, &cq->q);
return 1;
*
* \param cq The chunk queue.
*
- * \return The next queued chunk, or \p NULL if there is no chunk awailable.
+ * \return The next queued chunk, or \p NULL if there is no chunk available.
*/
struct queued_chunk *cq_peek(struct chunk_queue *cq)
{
/**
* Remove the current chunk from the queue.
*
- * \param cq The chunk to remove.
+ * \param cq The queue to remove from.
*/
void cq_dequeue(struct chunk_queue *cq)
{
struct queued_chunk *qc = cq_peek(cq);
assert(qc);
+ assert(cq->num_pending >= qc->num_bytes);
+ cq->num_pending -= qc->num_bytes;
list_del(&qc->node);
free(qc);
}
/**
- * Change the number of bytes send for the current queued chunk.
+ * Force to add a chunk to the given queue.
+ *
+ * \param cq See \ref cq_enqueue.
+ * \param buf See \ref cq_enqueue.
+ * \param num_bytes See \ref cq_enqueue.
+ *
+ * If queuing the given buffer would result in exceeding the maximal queue
+ * size, buffers are dropped from the beginning of the queue. Note that this
+ * function still might fail.
+ *
+ * \return Standard.
+ */
+int cq_force_enqueue(struct chunk_queue *cq, const char *buf, size_t num_bytes)
+{
+ int ret;
+
+ if (num_bytes > cq->max_pending)
+ return -E_QUEUE;
+ for (;;) {
+ ret = cq_enqueue(cq, buf, num_bytes);
+ if (ret >= 0)
+ return ret;
+ cq_dequeue(cq);
+ }
+ /* never reached */
+}
+
+/**
+ * Change the number of bytes sent for the current queued chunk.
*
* \param cq The chunk queue.
* \param sent Number of bytes successfully sent.
{
struct queued_chunk *qc = cq_peek(cq);
assert(qc);
- qc->sent += sent;
+ qc->num_bytes -= sent;
+ qc->buf += sent;
cq->num_pending -= sent;
}
*
* \param qc The queued chunk.
* \param buf Result pointer.
- * \param len Number of bytes of \a buf.
+ * \param num_bytes Number of bytes of \a buf.
*
* \return Positive on success, negative on errors.
*/
-int cq_get(struct queued_chunk *qc, char **buf, size_t *len)
+int cq_get(struct queued_chunk *qc, const char **buf, size_t *num_bytes)
{
- int ret;
-
- if (qc->chunk_num != -1U) {
- ret = vss_get_chunk(qc->chunk_num, buf, len);
- if (ret < 0)
- return ret;
- } else
- *buf = vss_get_header(len);
- assert(*len > qc->sent);
- *buf += qc->sent;
- *len -= qc->sent;
+ *buf = qc->buf;
+ *num_bytes = qc->num_bytes;
return 1;
}