]> git.tuebingen.mpg.de Git - paraslash.git/commitdiff
Merge branch 't/udp_robustness'
authorAndre Noll <maan@systemlinux.org>
Mon, 12 Apr 2010 16:21:23 +0000 (18:21 +0200)
committerAndre Noll <maan@systemlinux.org>
Mon, 12 Apr 2010 16:21:23 +0000 (18:21 +0200)
1  2 
chunk_queue.c
udp_send.c

diff --combined chunk_queue.c
index e8ea07d93ff1cc10cfe1888ce283630505642a24,5b102f286641d7b93418f3ae92e7dcdf6f71d0f6..3f5ac1d929262c116f6c7822f67f0ceb8cf1e7f6
@@@ -82,18 -82,44 +82,46 @@@ struct queued_chunk *cq_peek(struct chu
  /**
   * Remove the current chunk from the queue.
   *
 - * \param cq The chunk to remove.
 + * \param cq The queue to remove from.
   */
  void cq_dequeue(struct chunk_queue *cq)
  {
        struct queued_chunk *qc = cq_peek(cq);
        assert(qc);
 +      assert(cq->num_pending >= qc->num_bytes);
 +      cq->num_pending -= qc->num_bytes;
        list_del(&qc->node);
        free(qc);
  }
  
+ /**
+  * Force to add a chunk to the given queue.
+  *
+  * \param cq See \ref cq_enqueue.
+  * \param buf See \ref cq_enqueue.
+  * \param num_bytes See \ref cq_enqueue.
+  *
+  * If queuing the given buffer would result in exceeding the maximal queue
+  * size, buffers are dropped from the beginning of the queue. Note that this
+  * function still might fail.
+  *
+  * \return Standard.
+  */
+ int cq_force_enqueue(struct chunk_queue *cq, const char *buf, size_t num_bytes)
+ {
+       int ret;
+       if (num_bytes > cq->max_pending)
+               return -E_QUEUE;
+       for (;;) {
+               ret = cq_enqueue(cq, buf, num_bytes);
+               if (ret >= 0)
+                       return ret;
+               cq_dequeue(cq);
+       }
+       /* never reached */
+ }
  /**
   * Change the number of bytes sent for the current queued chunk.
   *
diff --combined udp_send.c
index 3f9059bb0b03f81c50ec2ef722e255effd41b1ce,c84f416a31ed3e5071c28969b6bd5057d9da0534..fc3bb2f5c5f657a45112cf2db3856407c8c0c611
@@@ -232,30 -232,29 +232,32 @@@ static int udp_com_delete(struct sender
  static int udp_send_fec(char *buf, size_t len, void *private_data)
  {
        struct udp_target *ut = private_data;
 -      int ret = udp_init_session(ut);
 +      int ret;
  
 +      if (sender_status == SENDER_OFF)
 +              return 0;
 +      ret = udp_init_session(ut);
        if (ret < 0)
                goto fail;
        ret = send_queued_chunks(ut->fd, ut->cq, 0);
+       if (ret == -ERRNO_TO_PARA_ERROR(ECONNREFUSED))
+               ret = 0;
        if (ret < 0)
                goto fail;
        if (!len)
                return 0;
        if (!ret) { /* still data left in the queue */
-               ret = cq_enqueue(ut->cq, buf, len);
-               if (ret < 0)
-                       goto fail;
+               ret = cq_force_enqueue(ut->cq, buf, len);
+               assert(ret >= 0);
        }
        ret = write_nonblock(ut->fd, buf, len, 0);
+       if (ret == -ERRNO_TO_PARA_ERROR(ECONNREFUSED))
+               ret = 0;
        if (ret < 0)
                goto fail;
        if (ret != len) {
-               ret = cq_enqueue(ut->cq, buf + ret, len - ret);
-               if (ret < 0)
-                       goto fail;
+               ret = cq_force_enqueue(ut->cq, buf + ret, len - ret);
+               assert(ret >= 0);
        }
        return 1;
  fail: