]> git.tuebingen.mpg.de Git - paraslash.git/blob - chunk_queue.c
chunk queue: Fix chunk dequeuing.
[paraslash.git] / chunk_queue.c
1 /*
2  * Copyright (C) 2007-2009 Andre Noll <maan@systemlinux.org>
3  *
4  * Licensed under the GPL v2. For licencing details see COPYING.
5  */
6
7 /** \file chunk_queue.c Queuing functions for paraslash senders. */
8
9 #include "para.h"
10 #include "list.h"
11 #include "afh.h"
12 #include "vss.h"
13 #include "string.h"
14 #include "error.h"
15
16 /**
17  * Senders may use the chunk queue facility to deal with laggy connections.  It
18  * allows them to enqueue chunks if they can not be sent out immediately.
19  *
20  * Chunk queues are "cheap" in the sense that only reference to the audio file
21  * data is stored, but not the data itself.
22  */
23 struct chunk_queue {
24         /** The list of pending chunks for this client. */
25         struct list_head q;
26         /** The number of pending bytes for this client. */
27         unsigned long num_pending;
28         /** More than that many bytes in the queue is considered an error. */
29         unsigned long max_pending;
30 };
31
32 /** Describes one queued chunk in a chunk queue. */
33 struct queued_chunk {
34         /** Pointer to the data to be queued. */
35         const char *buf;
36         /** The number of bytes of this chunk. */
37         size_t num_bytes;
38         /** Position of the chunk in the chunk queue. */
39         struct list_head node;
40 };
41
42 /**
43  * Add a chunk to the given queue.
44  *
45  * \param cq the queue to add the chunk to.
46  * \param buf Pointer to the data to be queued.
47  * \param num_bytes The size of \a buf.
48  *
49  * \return Standard.
50  */
51 int cq_enqueue(struct chunk_queue *cq, const char *buf, size_t num_bytes)
52 {
53         struct queued_chunk *qc;
54
55         if (cq->num_pending + num_bytes > cq->max_pending)
56                 return -E_QUEUE;
57         qc = para_malloc(sizeof(struct queued_chunk));
58         cq->num_pending += num_bytes;
59         qc->buf = buf;
60         qc->num_bytes = num_bytes;
61         list_add_tail(&qc->node, &cq->q);
62         PARA_DEBUG_LOG("%lu bytes queued for %p\n", cq->num_pending, &cq->q);
63         return 1;
64 }
65
66 /**
67  * Lookup the next chunk in the queue.
68  *
69  * \param cq The chunk queue.
70  *
71  * \return The next queued chunk, or \p NULL if there is no chunk available.
72  */
73 struct queued_chunk *cq_peek(struct chunk_queue *cq)
74 {
75         if (list_empty(&cq->q))
76                 return NULL;
77         return list_entry(cq->q.next, struct queued_chunk, node);
78 }
79
80 /**
81  * Remove the current chunk from the queue.
82  *
83  * \param cq The queue to remove from.
84  */
85 void cq_dequeue(struct chunk_queue *cq)
86 {
87         struct queued_chunk *qc = cq_peek(cq);
88         assert(qc);
89         assert(cq->num_pending >= qc->num_bytes);
90         cq->num_pending -= qc->num_bytes;
91         list_del(&qc->node);
92         free(qc);
93 }
94
95 /**
96  * Change the number of bytes sent for the current queued chunk.
97  *
98  * \param cq The chunk queue.
99  * \param sent Number of bytes successfully sent.
100  */
101 void cq_update(struct chunk_queue *cq, size_t sent)
102 {
103         struct queued_chunk *qc = cq_peek(cq);
104         assert(qc);
105         qc->num_bytes -= sent;
106         qc->buf += sent;
107         cq->num_pending -= sent;
108 }
109
110 /**
111  * Get a pointer to the given queued chunk.
112  *
113  * \param qc The queued chunk.
114  * \param buf Result pointer.
115  * \param num_bytes Number of bytes of \a buf.
116  *
117  * \return Positive on success, negative on errors.
118  */
119 int cq_get(struct queued_chunk *qc, const char **buf, size_t *num_bytes)
120 {
121         *buf = qc->buf;
122         *num_bytes = qc->num_bytes;
123         return 1;
124 }
125
126 /**
127  * Allocate and initialize a chunk queue.
128  *
129  * \param max_pending Maximal number of bytes that will be queued.
130  *
131  * \return A pointer to the new queue.
132  */
133 struct chunk_queue *cq_new(size_t max_pending)
134 {
135         struct chunk_queue *cq = para_malloc(sizeof(*cq));
136         INIT_LIST_HEAD(&cq->q);
137         cq->max_pending = max_pending;
138         cq->num_pending = 0;
139         return cq;
140 }
141
142 /**
143  * Deallocate all resources of this queue.
144  *
145  * \param cq The chunk queue.
146  */
147 void cq_destroy(struct chunk_queue *cq)
148 {
149         struct queued_chunk *qc, *tmp;
150         list_for_each_entry_safe(qc, tmp, &cq->q, node) {
151                 list_del(&qc->node);
152                 free(qc);
153         }
154         free(cq);
155 }