/**
* Remove the current chunk from the queue.
*
- * \param cq The chunk to remove.
+ * \param cq The queue to remove from.
*/
void cq_dequeue(struct chunk_queue *cq)
{
struct queued_chunk *qc = cq_peek(cq);
assert(qc);
+ assert(cq->num_pending >= qc->num_bytes);
+ cq->num_pending -= qc->num_bytes;
list_del(&qc->node);
free(qc);
}
+ /**
+ * Force to add a chunk to the given queue.
+ *
+ * \param cq See \ref cq_enqueue.
+ * \param buf See \ref cq_enqueue.
+ * \param num_bytes See \ref cq_enqueue.
+ *
+ * If queuing the given buffer would result in exceeding the maximal queue
+ * size, buffers are dropped from the beginning of the queue. Note that this
+ * function still might fail.
+ *
+ * \return Standard.
+ */
+ int cq_force_enqueue(struct chunk_queue *cq, const char *buf, size_t num_bytes)
+ {
+ int ret;
+
+ if (num_bytes > cq->max_pending)
+ return -E_QUEUE;
+ for (;;) {
+ ret = cq_enqueue(cq, buf, num_bytes);
+ if (ret >= 0)
+ return ret;
+ cq_dequeue(cq);
+ }
+ /* never reached */
+ }
+
/**
* Change the number of bytes sent for the current queued chunk.
*
static int udp_send_fec(char *buf, size_t len, void *private_data)
{
struct udp_target *ut = private_data;
- int ret = udp_init_session(ut);
+ int ret;
+ if (sender_status == SENDER_OFF)
+ return 0;
+ ret = udp_init_session(ut);
if (ret < 0)
goto fail;
ret = send_queued_chunks(ut->fd, ut->cq, 0);
+ if (ret == -ERRNO_TO_PARA_ERROR(ECONNREFUSED))
+ ret = 0;
if (ret < 0)
goto fail;
if (!len)
return 0;
if (!ret) { /* still data left in the queue */
- ret = cq_enqueue(ut->cq, buf, len);
- if (ret < 0)
- goto fail;
+ ret = cq_force_enqueue(ut->cq, buf, len);
+ assert(ret >= 0);
}
ret = write_nonblock(ut->fd, buf, len, 0);
+ if (ret == -ERRNO_TO_PARA_ERROR(ECONNREFUSED))
+ ret = 0;
if (ret < 0)
goto fail;
if (ret != len) {
- ret = cq_enqueue(ut->cq, buf + ret, len - ret);
- if (ret < 0)
- goto fail;
+ ret = cq_force_enqueue(ut->cq, buf + ret, len - ret);
+ assert(ret >= 0);
}
return 1;
fail: