]> git.tuebingen.mpg.de Git - paraslash.git/commitdiff
Merge branch 't/udp_robustness'
authorAndre Noll <maan@systemlinux.org>
Mon, 12 Apr 2010 16:21:23 +0000 (18:21 +0200)
committerAndre Noll <maan@systemlinux.org>
Mon, 12 Apr 2010 16:21:23 +0000 (18:21 +0200)
chunk_queue.c
chunk_queue.h
udp_send.c

index e8ea07d93ff1cc10cfe1888ce283630505642a24..3f5ac1d929262c116f6c7822f67f0ceb8cf1e7f6 100644 (file)
@@ -94,6 +94,34 @@ void cq_dequeue(struct chunk_queue *cq)
        free(qc);
 }
 
+/**
+ * Force to add a chunk to the given queue.
+ *
+ * \param cq See \ref cq_enqueue.
+ * \param buf See \ref cq_enqueue.
+ * \param num_bytes See \ref cq_enqueue.
+ *
+ * If queuing the given buffer would result in exceeding the maximal queue
+ * size, buffers are dropped from the beginning of the queue. Note that this
+ * function still might fail.
+ *
+ * \return Standard.
+ */
+int cq_force_enqueue(struct chunk_queue *cq, const char *buf, size_t num_bytes)
+{
+       int ret;
+
+       if (num_bytes > cq->max_pending)
+               return -E_QUEUE;
+       for (;;) {
+               ret = cq_enqueue(cq, buf, num_bytes);
+               if (ret >= 0)
+                       return ret;
+               cq_dequeue(cq);
+       }
+       /* never reached */
+}
+
 /**
  * Change the number of bytes sent for the current queued chunk.
  *
index 3c138eb50f487b299e176da766786bd97b35d147..9e794ba8e8afd524f4837bf9be7ee891ecd8f4e3 100644 (file)
@@ -16,3 +16,4 @@ void cq_update(struct chunk_queue *cq, size_t sent);
 int cq_get(struct queued_chunk *qc, const char **buf, size_t *len);
 struct chunk_queue *cq_new(size_t max_pending);
 void cq_destroy(struct chunk_queue *cq);
+int cq_force_enqueue(struct chunk_queue *cq, const char *buf, size_t num_bytes);
index 3f9059bb0b03f81c50ec2ef722e255effd41b1ce..fc3bb2f5c5f657a45112cf2db3856407c8c0c611 100644 (file)
@@ -240,22 +240,24 @@ static int udp_send_fec(char *buf, size_t len, void *private_data)
        if (ret < 0)
                goto fail;
        ret = send_queued_chunks(ut->fd, ut->cq, 0);
+       if (ret == -ERRNO_TO_PARA_ERROR(ECONNREFUSED))
+               ret = 0;
        if (ret < 0)
                goto fail;
        if (!len)
                return 0;
        if (!ret) { /* still data left in the queue */
-               ret = cq_enqueue(ut->cq, buf, len);
-               if (ret < 0)
-                       goto fail;
+               ret = cq_force_enqueue(ut->cq, buf, len);
+               assert(ret >= 0);
        }
        ret = write_nonblock(ut->fd, buf, len, 0);
+       if (ret == -ERRNO_TO_PARA_ERROR(ECONNREFUSED))
+               ret = 0;
        if (ret < 0)
                goto fail;
        if (ret != len) {
-               ret = cq_enqueue(ut->cq, buf + ret, len - ret);
-               if (ret < 0)
-                       goto fail;
+               ret = cq_force_enqueue(ut->cq, buf + ret, len - ret);
+               assert(ret >= 0);
        }
        return 1;
 fail: