+static int cq_enqueue(struct chunk_queue *cq, long unsigned chunk_num,
+ size_t sent)
+{
+ struct queued_chunk *qc;
+ char *buf;
+ size_t len;
+ int ret;
+
+ if (chunk_num != -1U) {
+ ret = vss_get_chunk(chunk_num, &buf, &len);
+ if (ret < 0)
+ return ret;
+ } else
+ buf = vss_get_header(&len);
+ if (cq->num_pending + len > cq->max_pending)
+ return -E_QUEUE;
+ qc = para_malloc(sizeof(struct queued_chunk));
+ cq->num_pending += len;
+ qc->chunk_num = chunk_num;
+ qc->sent = sent;
+ list_add_tail(&qc->node, &cq->q);
+ PARA_DEBUG_LOG("%lu bytes queued for %p\n", cq->num_pending, &cq->q);
+ return 1;
+}
+
+static struct queued_chunk *cq_peek(struct chunk_queue *cq)
+{
+ if (list_empty(&cq->q))
+ return NULL;
+ return list_entry(cq->q.next, struct queued_chunk, node);
+}
+
+int cq_dequeue(struct chunk_queue *cq)
+{
+ struct queued_chunk *qc = cq_peek(cq);
+ assert(qc);
+ list_del(&qc->node);
+ free(qc);
+ return 1;
+}
+
+void cq_update(struct chunk_queue *cq, size_t sent)
+{
+ struct queued_chunk *qc = cq_peek(cq);
+ assert(qc);
+ qc->sent += sent;
+ cq->num_pending -= sent;
+}
+
+int cq_get(struct queued_chunk *qc, char **buf, size_t *len)
+{
+ int ret;
+
+ if (qc->chunk_num != -1U) {
+ ret = vss_get_chunk(qc->chunk_num, buf, len);
+ if (ret < 0)
+ return ret;
+ } else
+ *buf = vss_get_header(len);
+ assert(*len > qc->sent);
+ *buf += qc->sent;
+ *len -= qc->sent;
+ return 1;
+}
+
+void cq_init(struct chunk_queue *cq, size_t max_pending)
+{
+ INIT_LIST_HEAD(&cq->q);
+ cq->max_pending = max_pending;
+ cq->num_pending = 0;
+}
+
+void cq_destroy(struct chunk_queue *cq)
+{
+ struct queued_chunk *qc, *tmp;
+ list_for_each_entry_safe(qc, tmp, &cq->q, node) {
+ list_del(&qc->node);
+ free(qc);
+ }
+}
+