From: Andre Noll Date: Mon, 12 Apr 2010 16:21:23 +0000 (+0200) Subject: Merge branch 't/udp_robustness' X-Git-Tag: v0.4.2~11 X-Git-Url: http://git.tuebingen.mpg.de/?p=paraslash.git;a=commitdiff_plain;h=ce20ee3c755b47f753f122b0fb58a481a0a9d7b9;hp=-c Merge branch 't/udp_robustness' --- ce20ee3c755b47f753f122b0fb58a481a0a9d7b9 diff --combined chunk_queue.c index e8ea07d9,5b102f28..3f5ac1d9 --- a/chunk_queue.c +++ b/chunk_queue.c @@@ -82,18 -82,44 +82,46 @@@ struct queued_chunk *cq_peek(struct chu /** * Remove the current chunk from the queue. * - * \param cq The chunk to remove. + * \param cq The queue to remove from. */ void cq_dequeue(struct chunk_queue *cq) { struct queued_chunk *qc = cq_peek(cq); assert(qc); + assert(cq->num_pending >= qc->num_bytes); + cq->num_pending -= qc->num_bytes; list_del(&qc->node); free(qc); } + /** + * Force to add a chunk to the given queue. + * + * \param cq See \ref cq_enqueue. + * \param buf See \ref cq_enqueue. + * \param num_bytes See \ref cq_enqueue. + * + * If queuing the given buffer would result in exceeding the maximal queue + * size, buffers are dropped from the beginning of the queue. Note that this + * function still might fail. + * + * \return Standard. + */ + int cq_force_enqueue(struct chunk_queue *cq, const char *buf, size_t num_bytes) + { + int ret; + + if (num_bytes > cq->max_pending) + return -E_QUEUE; + for (;;) { + ret = cq_enqueue(cq, buf, num_bytes); + if (ret >= 0) + return ret; + cq_dequeue(cq); + } + /* never reached */ + } + /** * Change the number of bytes sent for the current queued chunk. * diff --combined udp_send.c index 3f9059bb,c84f416a..fc3bb2f5 --- a/udp_send.c +++ b/udp_send.c @@@ -232,30 -232,29 +232,32 @@@ static int udp_com_delete(struct sender static int udp_send_fec(char *buf, size_t len, void *private_data) { struct udp_target *ut = private_data; - int ret = udp_init_session(ut); + int ret; + if (sender_status == SENDER_OFF) + return 0; + ret = udp_init_session(ut); if (ret < 0) goto fail; ret = send_queued_chunks(ut->fd, ut->cq, 0); + if (ret == -ERRNO_TO_PARA_ERROR(ECONNREFUSED)) + ret = 0; if (ret < 0) goto fail; if (!len) return 0; if (!ret) { /* still data left in the queue */ - ret = cq_enqueue(ut->cq, buf, len); - if (ret < 0) - goto fail; + ret = cq_force_enqueue(ut->cq, buf, len); + assert(ret >= 0); } ret = write_nonblock(ut->fd, buf, len, 0); + if (ret == -ERRNO_TO_PARA_ERROR(ECONNREFUSED)) + ret = 0; if (ret < 0) goto fail; if (ret != len) { - ret = cq_enqueue(ut->cq, buf + ret, len - ret); - if (ret < 0) - goto fail; + ret = cq_force_enqueue(ut->cq, buf + ret, len - ret); + assert(ret >= 0); } return 1; fail: