-static int cq_enqueue(struct chunk_queue *cq, long unsigned chunk_num,
- size_t sent)
-{
- struct queued_chunk *qc;
- char *buf;
- size_t len;
- int ret;
-
- if (chunk_num != -1U) {
- ret = vss_get_chunk(chunk_num, &buf, &len);
- if (ret < 0)
- return ret;
- } else
- buf = vss_get_header(&len);
- if (cq->num_pending + len > cq->max_pending)
- return -E_QUEUE;
- qc = para_malloc(sizeof(struct queued_chunk));
- cq->num_pending += len;
- qc->chunk_num = chunk_num;
- qc->sent = sent;
- list_add_tail(&qc->node, &cq->q);
- PARA_DEBUG_LOG("%lu bytes queued for %p\n", cq->num_pending, &cq->q);
- return 1;
-}
-
-static struct queued_chunk *cq_peek(struct chunk_queue *cq)
-{
- if (list_empty(&cq->q))
- return NULL;
- return list_entry(cq->q.next, struct queued_chunk, node);
-}
-
-int cq_dequeue(struct chunk_queue *cq)
-{
- struct queued_chunk *qc = cq_peek(cq);
- assert(qc);
- list_del(&qc->node);
- free(qc);
- return 1;
-}
-
-void cq_update(struct chunk_queue *cq, size_t sent)
-{
- struct queued_chunk *qc = cq_peek(cq);
- assert(qc);
- qc->sent += sent;
- cq->num_pending -= sent;
-}
-
-int cq_get(struct queued_chunk *qc, char **buf, size_t *len)
-{
- int ret;
-
- if (qc->chunk_num != -1U) {
- ret = vss_get_chunk(qc->chunk_num, buf, len);
- if (ret < 0)
- return ret;
- } else
- *buf = vss_get_header(len);
- assert(*len > qc->sent);
- *buf += qc->sent;
- *len -= qc->sent;
- return 1;
-}
-
-void cq_init(struct chunk_queue *cq, size_t max_pending)
-{
- INIT_LIST_HEAD(&cq->q);
- cq->max_pending = max_pending;
- cq->num_pending = 0;
-}
-
-void cq_destroy(struct chunk_queue *cq)
-{
- struct queued_chunk *qc, *tmp;
- list_for_each_entry_safe(qc, tmp, &cq->q, node) {
- list_del(&qc->node);
- free(qc);
- }
-}
-