udp_send: Force chunk queuing.
authorAndre Noll <maan@systemlinux.org>
Tue, 6 Apr 2010 17:51:17 +0000 (19:51 +0200)
committerAndre Noll <maan@systemlinux.org>
Tue, 6 Apr 2010 17:51:17 +0000 (19:51 +0200)
This adds the public cq_force_enqueue() function to chunk_queue.c.
Unlike cq_enqueue() the new function does not return an error if the
queue is full, but drops old chunks from the queue in order to make
space for the new chunk.

The upd sender uses the new function to avoid kicking targets.

chunk_queue.c
chunk_queue.h
udp_send.c

index ad28190a163d32c4731316a3bd609f626f7871ec..5b102f286641d7b93418f3ae92e7dcdf6f71d0f6 100644 (file)
@@ -92,6 +92,34 @@ void cq_dequeue(struct chunk_queue *cq)
        free(qc);
 }
 
+/**
+ * Force to add a chunk to the given queue.
+ *
+ * \param cq See \ref cq_enqueue.
+ * \param buf See \ref cq_enqueue.
+ * \param num_bytes See \ref cq_enqueue.
+ *
+ * If queuing the given buffer would result in exceeding the maximal queue
+ * size, buffers are dropped from the beginning of the queue. Note that this
+ * function still might fail.
+ *
+ * \return Standard.
+ */
+int cq_force_enqueue(struct chunk_queue *cq, const char *buf, size_t num_bytes)
+{
+       int ret;
+
+       if (num_bytes > cq->max_pending)
+               return -E_QUEUE;
+       for (;;) {
+               ret = cq_enqueue(cq, buf, num_bytes);
+               if (ret >= 0)
+                       return ret;
+               cq_dequeue(cq);
+       }
+       /* never reached */
+}
+
 /**
  * Change the number of bytes sent for the current queued chunk.
  *
index 3c138eb50f487b299e176da766786bd97b35d147..9e794ba8e8afd524f4837bf9be7ee891ecd8f4e3 100644 (file)
@@ -16,3 +16,4 @@ void cq_update(struct chunk_queue *cq, size_t sent);
 int cq_get(struct queued_chunk *qc, const char **buf, size_t *len);
 struct chunk_queue *cq_new(size_t max_pending);
 void cq_destroy(struct chunk_queue *cq);
+int cq_force_enqueue(struct chunk_queue *cq, const char *buf, size_t num_bytes);
index 533af63a271f190718c45d8073f988b7971bf338..c84f416a31ed3e5071c28969b6bd5057d9da0534 100644 (file)
@@ -244,9 +244,8 @@ static int udp_send_fec(char *buf, size_t len, void *private_data)
        if (!len)
                return 0;
        if (!ret) { /* still data left in the queue */
-               ret = cq_enqueue(ut->cq, buf, len);
-               if (ret < 0)
-                       goto fail;
+               ret = cq_force_enqueue(ut->cq, buf, len);
+               assert(ret >= 0);
        }
        ret = write_nonblock(ut->fd, buf, len, 0);
        if (ret == -ERRNO_TO_PARA_ERROR(ECONNREFUSED))
@@ -254,9 +253,8 @@ static int udp_send_fec(char *buf, size_t len, void *private_data)
        if (ret < 0)
                goto fail;
        if (ret != len) {
-               ret = cq_enqueue(ut->cq, buf + ret, len - ret);
-               if (ret < 0)
-                       goto fail;
+               ret = cq_force_enqueue(ut->cq, buf + ret, len - ret);
+               assert(ret >= 0);
        }
        return 1;
 fail: