Introduce the new nonblock API.
[paraslash.git] / chunk_queue.c
1 /*
2 * Copyright (C) 2007-2010 Andre Noll <maan@systemlinux.org>
3 *
4 * Licensed under the GPL v2. For licencing details see COPYING.
5 */
6
7 /** \file chunk_queue.c Queuing functions for paraslash senders. */
8
9 #include <regex.h>
10
11 #include "para.h"
12 #include "list.h"
13 #include "afh.h"
14 #include "vss.h"
15 #include "string.h"
16 #include "error.h"
17
18 /**
19 * Senders may use the chunk queue facility to deal with laggy connections. It
20 * allows them to enqueue chunks if they can not be sent out immediately.
21 *
22 * Chunk queues are "cheap" in the sense that only reference to the audio file
23 * data is stored, but not the data itself.
24 */
25 struct chunk_queue {
26 /** The list of pending chunks for this client. */
27 struct list_head q;
28 /** The number of pending bytes for this client. */
29 unsigned long num_pending;
30 /** More than that many bytes in the queue is considered an error. */
31 unsigned long max_pending;
32 };
33
34 /** Describes one queued chunk in a chunk queue. */
35 struct queued_chunk {
36 /** Pointer to the data to be queued. */
37 const char *buf;
38 /** The number of bytes of this chunk. */
39 size_t num_bytes;
40 /** Position of the chunk in the chunk queue. */
41 struct list_head node;
42 };
43
44 /**
45 * Add a chunk to the given queue.
46 *
47 * \param cq the queue to add the chunk to.
48 * \param buf Pointer to the data to be queued.
49 * \param num_bytes The size of \a buf.
50 *
51 * \return Standard.
52 */
53 int cq_enqueue(struct chunk_queue *cq, const char *buf, size_t num_bytes)
54 {
55 struct queued_chunk *qc;
56
57 if (cq->num_pending + num_bytes > cq->max_pending)
58 return -E_QUEUE;
59 qc = para_malloc(sizeof(struct queued_chunk));
60 cq->num_pending += num_bytes;
61 qc->buf = buf;
62 qc->num_bytes = num_bytes;
63 list_add_tail(&qc->node, &cq->q);
64 PARA_DEBUG_LOG("%lu bytes queued for %p\n", cq->num_pending, &cq->q);
65 return 1;
66 }
67
68 /**
69 * Lookup the next chunk in the queue.
70 *
71 * \param cq The chunk queue.
72 *
73 * \return The next queued chunk, or \p NULL if there is no chunk available.
74 */
75 struct queued_chunk *cq_peek(struct chunk_queue *cq)
76 {
77 if (list_empty(&cq->q))
78 return NULL;
79 return list_entry(cq->q.next, struct queued_chunk, node);
80 }
81
82 /**
83 * Remove the current chunk from the queue.
84 *
85 * \param cq The queue to remove from.
86 */
87 void cq_dequeue(struct chunk_queue *cq)
88 {
89 struct queued_chunk *qc = cq_peek(cq);
90 assert(qc);
91 assert(cq->num_pending >= qc->num_bytes);
92 cq->num_pending -= qc->num_bytes;
93 list_del(&qc->node);
94 free(qc);
95 }
96
97 /**
98 * Force to add a chunk to the given queue.
99 *
100 * \param cq See \ref cq_enqueue.
101 * \param buf See \ref cq_enqueue.
102 * \param num_bytes See \ref cq_enqueue.
103 *
104 * If queuing the given buffer would result in exceeding the maximal queue
105 * size, buffers are dropped from the beginning of the queue. Note that this
106 * function still might fail.
107 *
108 * \return Standard.
109 */
110 int cq_force_enqueue(struct chunk_queue *cq, const char *buf, size_t num_bytes)
111 {
112 int ret;
113
114 if (num_bytes > cq->max_pending)
115 return -E_QUEUE;
116 for (;;) {
117 ret = cq_enqueue(cq, buf, num_bytes);
118 if (ret >= 0)
119 return ret;
120 cq_dequeue(cq);
121 }
122 /* never reached */
123 }
124
125 /**
126 * Change the number of bytes sent for the current queued chunk.
127 *
128 * \param cq The chunk queue.
129 * \param sent Number of bytes successfully sent.
130 */
131 void cq_update(struct chunk_queue *cq, size_t sent)
132 {
133 struct queued_chunk *qc = cq_peek(cq);
134 assert(qc);
135 qc->num_bytes -= sent;
136 qc->buf += sent;
137 cq->num_pending -= sent;
138 }
139
140 /**
141 * Get a pointer to the given queued chunk.
142 *
143 * \param qc The queued chunk.
144 * \param buf Result pointer.
145 * \param num_bytes Number of bytes of \a buf.
146 *
147 * \return Positive on success, negative on errors.
148 */
149 int cq_get(struct queued_chunk *qc, const char **buf, size_t *num_bytes)
150 {
151 *buf = qc->buf;
152 *num_bytes = qc->num_bytes;
153 return 1;
154 }
155
156 /**
157 * Allocate and initialize a chunk queue.
158 *
159 * \param max_pending Maximal number of bytes that will be queued.
160 *
161 * \return A pointer to the new queue.
162 */
163 struct chunk_queue *cq_new(size_t max_pending)
164 {
165 struct chunk_queue *cq = para_malloc(sizeof(*cq));
166 INIT_LIST_HEAD(&cq->q);
167 cq->max_pending = max_pending;
168 cq->num_pending = 0;
169 return cq;
170 }
171
172 /**
173 * Deallocate all resources of this queue.
174 *
175 * \param cq The chunk queue.
176 */
177 void cq_destroy(struct chunk_queue *cq)
178 {
179 struct queued_chunk *qc, *tmp;
180 list_for_each_entry_safe(qc, tmp, &cq->q, node) {
181 list_del(&qc->node);
182 free(qc);
183 }
184 free(cq);
185 }