udp_send: Force chunk queuing.
authorAndre Noll <maan@systemlinux.org>
Tue, 6 Apr 2010 17:51:17 +0000 (19:51 +0200)
committerAndre Noll <maan@systemlinux.org>
Tue, 6 Apr 2010 17:51:17 +0000 (19:51 +0200)
This adds the public cq_force_enqueue() function to chunk_queue.c.
Unlike cq_enqueue() the new function does not return an error if the
queue is full, but drops old chunks from the queue in order to make
space for the new chunk.

The upd sender uses the new function to avoid kicking targets.

chunk_queue.c
chunk_queue.h
udp_send.c

index ad28190..5b102f2 100644 (file)
@@ -92,6 +92,34 @@ void cq_dequeue(struct chunk_queue *cq)
        free(qc);
 }
 
+/**
+ * Force to add a chunk to the given queue.
+ *
+ * \param cq See \ref cq_enqueue.
+ * \param buf See \ref cq_enqueue.
+ * \param num_bytes See \ref cq_enqueue.
+ *
+ * If queuing the given buffer would result in exceeding the maximal queue
+ * size, buffers are dropped from the beginning of the queue. Note that this
+ * function still might fail.
+ *
+ * \return Standard.
+ */
+int cq_force_enqueue(struct chunk_queue *cq, const char *buf, size_t num_bytes)
+{
+       int ret;
+
+       if (num_bytes > cq->max_pending)
+               return -E_QUEUE;
+       for (;;) {
+               ret = cq_enqueue(cq, buf, num_bytes);
+               if (ret >= 0)
+                       return ret;
+               cq_dequeue(cq);
+       }
+       /* never reached */
+}
+
 /**
  * Change the number of bytes sent for the current queued chunk.
  *
index 3c138eb..9e794ba 100644 (file)
@@ -16,3 +16,4 @@ void cq_update(struct chunk_queue *cq, size_t sent);
 int cq_get(struct queued_chunk *qc, const char **buf, size_t *len);
 struct chunk_queue *cq_new(size_t max_pending);
 void cq_destroy(struct chunk_queue *cq);
+int cq_force_enqueue(struct chunk_queue *cq, const char *buf, size_t num_bytes);
index 533af63..c84f416 100644 (file)
@@ -244,9 +244,8 @@ static int udp_send_fec(char *buf, size_t len, void *private_data)
        if (!len)
                return 0;
        if (!ret) { /* still data left in the queue */
-               ret = cq_enqueue(ut->cq, buf, len);
-               if (ret < 0)
-                       goto fail;
+               ret = cq_force_enqueue(ut->cq, buf, len);
+               assert(ret >= 0);
        }
        ret = write_nonblock(ut->fd, buf, len, 0);
        if (ret == -ERRNO_TO_PARA_ERROR(ECONNREFUSED))
@@ -254,9 +253,8 @@ static int udp_send_fec(char *buf, size_t len, void *private_data)
        if (ret < 0)
                goto fail;
        if (ret != len) {
-               ret = cq_enqueue(ut->cq, buf + ret, len - ret);
-               if (ret < 0)
-                       goto fail;
+               ret = cq_force_enqueue(ut->cq, buf + ret, len - ret);
+               assert(ret >= 0);
        }
        return 1;
 fail: