Introduce the new nonblock API.
[paraslash.git] / chunk_queue.c
index 17d70eace1d1ee1c9a4b3ae17bd80c562c1dc212..3f5ac1d929262c116f6c7822f67f0ceb8cf1e7f6 100644 (file)
@@ -1,11 +1,13 @@
 /*
- * Copyright (C) 2007 Andre Noll <maan@systemlinux.org>
+ * Copyright (C) 2007-2010 Andre Noll <maan@systemlinux.org>
  *
  * Licensed under the GPL v2. For licencing details see COPYING.
  */
 
 /** \file chunk_queue.c Queuing functions for paraslash senders. */
 
+#include <regex.h>
+
 #include "para.h"
 #include "list.h"
 #include "afh.h"
  * allows them to enqueue chunks if they can not be sent out immediately.
  *
  * Chunk queues are "cheap" in the sense that only reference to the audio file
- * data is stored, but not the data itsself.
+ * data is stored, but not the data itself.
  */
 struct chunk_queue {
        /** The list of pending chunks for this client. */
        struct list_head q;
        /** The number of pending bytes for this client. */
        unsigned long num_pending;
-       /** Enqueueing more than that many bytes is an error. */
+       /** More than that many bytes in the queue is considered an error. */
        unsigned long max_pending;
 };
 
 /** Describes one queued chunk in a chunk queue. */
 struct queued_chunk {
-       /** The number of the queued chunk, -1U means header. */
-       unsigned chunk_num;
-       /** The number of bytes already sent. */
-       unsigned sent;
+       /** Pointer to the data to be queued. */
+       const char *buf;
+       /** The number of bytes of this chunk. */
+       size_t num_bytes;
        /** Position of the chunk in the chunk queue. */
        struct list_head node;
 };
@@ -43,32 +45,21 @@ struct queued_chunk {
  * Add a chunk to the given queue.
  *
  * \param cq the queue to add the chunk to.
- * \param chunk_num The number of the chunk to be queued.
- * \param sent The number of bytes of this chunk that the sender was able to
- * send.
+ * \param buf Pointer to the data to be queued.
+ * \param num_bytes The size of \a buf.
  *
- * \return Positive on success, negative on errors.
+ * \return Standard.
  */
-int cq_enqueue(struct chunk_queue *cq, long unsigned chunk_num,
-               size_t sent)
+int cq_enqueue(struct chunk_queue *cq, const char *buf, size_t num_bytes)
 {
        struct queued_chunk *qc;
-       char *buf;
-       size_t len;
-       int ret;
 
-       if (chunk_num != -1U) {
-               ret = vss_get_chunk(chunk_num, &buf, &len);
-               if (ret < 0)
-                       return ret;
-       } else
-               buf = vss_get_header(&len);
-       if (cq->num_pending + len > cq->max_pending)
+       if (cq->num_pending + num_bytes > cq->max_pending)
                return -E_QUEUE;
        qc = para_malloc(sizeof(struct queued_chunk));
-       cq->num_pending += len;
-       qc->chunk_num = chunk_num;
-       qc->sent = sent;
+       cq->num_pending += num_bytes;
+       qc->buf = buf;
+       qc->num_bytes = num_bytes;
        list_add_tail(&qc->node, &cq->q);
        PARA_DEBUG_LOG("%lu bytes queued for %p\n", cq->num_pending, &cq->q);
        return 1;
@@ -79,7 +70,7 @@ int cq_enqueue(struct chunk_queue *cq, long unsigned chunk_num,
  *
  * \param cq The chunk queue.
  *
- * \return The next queued chunk, or \p NULL if there is no chunk awailable.
+ * \return The next queued chunk, or \p NULL if there is no chunk available.
  */
 struct queued_chunk *cq_peek(struct chunk_queue *cq)
 {
@@ -91,18 +82,48 @@ struct queued_chunk *cq_peek(struct chunk_queue *cq)
 /**
  * Remove the current chunk from the queue.
  *
- * \param cq The chunk to remove.
+ * \param cq The queue to remove from.
  */
 void cq_dequeue(struct chunk_queue *cq)
 {
        struct queued_chunk *qc = cq_peek(cq);
        assert(qc);
+       assert(cq->num_pending >= qc->num_bytes);
+       cq->num_pending -= qc->num_bytes;
        list_del(&qc->node);
        free(qc);
 }
 
 /**
- * Change the number of bytes send for the current queued chunk.
+ * Force to add a chunk to the given queue.
+ *
+ * \param cq See \ref cq_enqueue.
+ * \param buf See \ref cq_enqueue.
+ * \param num_bytes See \ref cq_enqueue.
+ *
+ * If queuing the given buffer would result in exceeding the maximal queue
+ * size, buffers are dropped from the beginning of the queue. Note that this
+ * function still might fail.
+ *
+ * \return Standard.
+ */
+int cq_force_enqueue(struct chunk_queue *cq, const char *buf, size_t num_bytes)
+{
+       int ret;
+
+       if (num_bytes > cq->max_pending)
+               return -E_QUEUE;
+       for (;;) {
+               ret = cq_enqueue(cq, buf, num_bytes);
+               if (ret >= 0)
+                       return ret;
+               cq_dequeue(cq);
+       }
+       /* never reached */
+}
+
+/**
+ * Change the number of bytes sent for the current queued chunk.
  *
  * \param cq The chunk queue.
  * \param sent Number of bytes successfully sent.
@@ -111,7 +132,8 @@ void cq_update(struct chunk_queue *cq, size_t sent)
 {
        struct queued_chunk *qc = cq_peek(cq);
        assert(qc);
-       qc->sent += sent;
+       qc->num_bytes -= sent;
+       qc->buf += sent;
        cq->num_pending -= sent;
 }
 
@@ -120,23 +142,14 @@ void cq_update(struct chunk_queue *cq, size_t sent)
  *
  * \param qc The queued chunk.
  * \param buf Result pointer.
- * \param len Number of bytes of \a buf.
+ * \param num_bytes Number of bytes of \a buf.
  *
  * \return Positive on success, negative on errors.
  */
-int cq_get(struct queued_chunk *qc, char **buf, size_t *len)
+int cq_get(struct queued_chunk *qc, const char **buf, size_t *num_bytes)
 {
-       int ret;
-
-       if (qc->chunk_num != -1U) {
-               ret = vss_get_chunk(qc->chunk_num, buf, len);
-               if (ret < 0)
-                       return ret;
-       } else
-               *buf = vss_get_header(len);
-       assert(*len > qc->sent);
-       *buf += qc->sent;
-       *len -= qc->sent;
+       *buf = qc->buf;
+       *num_bytes = qc->num_bytes;
        return 1;
 }