Make build of para_server optional.
[paraslash.git] / chunk_queue.c
1 /*
2  * Copyright (C) 2007-2010 Andre Noll <maan@systemlinux.org>
3  *
4  * Licensed under the GPL v2. For licencing details see COPYING.
5  */
6
7 /** \file chunk_queue.c Queuing functions for paraslash senders. */
8
9 #include <regex.h>
10
11 #include "para.h"
12 #include "list.h"
13 #include "afh.h"
14 #include "string.h"
15 #include "error.h"
16
17 /**
18  * Senders may use the chunk queue facility to deal with laggy connections.  It
19  * allows them to enqueue chunks if they can not be sent out immediately.
20  *
21  * Chunk queues are "cheap" in the sense that only reference to the audio file
22  * data is stored, but not the data itself.
23  */
24 struct chunk_queue {
25         /** The list of pending chunks for this client. */
26         struct list_head q;
27         /** The number of pending bytes for this client. */
28         unsigned long num_pending;
29         /** More than that many bytes in the queue is considered an error. */
30         unsigned long max_pending;
31 };
32
33 /** Describes one queued chunk in a chunk queue. */
34 struct queued_chunk {
35         /** Pointer to the data to be queued. */
36         const char *buf;
37         /** The number of bytes of this chunk. */
38         size_t num_bytes;
39         /** Position of the chunk in the chunk queue. */
40         struct list_head node;
41 };
42
43 /**
44  * Add a chunk to the given queue.
45  *
46  * \param cq the queue to add the chunk to.
47  * \param buf Pointer to the data to be queued.
48  * \param num_bytes The size of \a buf.
49  *
50  * \return Standard.
51  */
52 int cq_enqueue(struct chunk_queue *cq, const char *buf, size_t num_bytes)
53 {
54         struct queued_chunk *qc;
55
56         if (cq->num_pending + num_bytes > cq->max_pending)
57                 return -E_QUEUE;
58         qc = para_malloc(sizeof(struct queued_chunk));
59         cq->num_pending += num_bytes;
60         qc->buf = buf;
61         qc->num_bytes = num_bytes;
62         list_add_tail(&qc->node, &cq->q);
63         PARA_DEBUG_LOG("%lu bytes queued for %p\n", cq->num_pending, &cq->q);
64         return 1;
65 }
66
67 /**
68  * Lookup the next chunk in the queue.
69  *
70  * \param cq The chunk queue.
71  *
72  * \return The next queued chunk, or \p NULL if there is no chunk available.
73  */
74 struct queued_chunk *cq_peek(struct chunk_queue *cq)
75 {
76         if (list_empty(&cq->q))
77                 return NULL;
78         return list_entry(cq->q.next, struct queued_chunk, node);
79 }
80
81 /**
82  * Remove the current chunk from the queue.
83  *
84  * \param cq The queue to remove from.
85  */
86 void cq_dequeue(struct chunk_queue *cq)
87 {
88         struct queued_chunk *qc = cq_peek(cq);
89         assert(qc);
90         assert(cq->num_pending >= qc->num_bytes);
91         cq->num_pending -= qc->num_bytes;
92         list_del(&qc->node);
93         free(qc);
94 }
95
96 /**
97  * Force to add a chunk to the given queue.
98  *
99  * \param cq See \ref cq_enqueue.
100  * \param buf See \ref cq_enqueue.
101  * \param num_bytes See \ref cq_enqueue.
102  *
103  * If queuing the given buffer would result in exceeding the maximal queue
104  * size, buffers are dropped from the beginning of the queue. Note that this
105  * function still might fail.
106  *
107  * \return Standard.
108  */
109 int cq_force_enqueue(struct chunk_queue *cq, const char *buf, size_t num_bytes)
110 {
111         int ret;
112
113         if (num_bytes > cq->max_pending)
114                 return -E_QUEUE;
115         for (;;) {
116                 ret = cq_enqueue(cq, buf, num_bytes);
117                 if (ret >= 0)
118                         return ret;
119                 cq_dequeue(cq);
120         }
121         /* never reached */
122 }
123
124 /**
125  * Change the number of bytes sent for the current queued chunk.
126  *
127  * \param cq The chunk queue.
128  * \param sent Number of bytes successfully sent.
129  */
130 void cq_update(struct chunk_queue *cq, size_t sent)
131 {
132         struct queued_chunk *qc = cq_peek(cq);
133         assert(qc);
134         qc->num_bytes -= sent;
135         qc->buf += sent;
136         cq->num_pending -= sent;
137 }
138
139 /**
140  * Get a pointer to the given queued chunk.
141  *
142  * \param qc The queued chunk.
143  * \param buf Result pointer.
144  * \param num_bytes Number of bytes of \a buf.
145  *
146  * \return Positive on success, negative on errors.
147  */
148 int cq_get(struct queued_chunk *qc, const char **buf, size_t *num_bytes)
149 {
150         *buf = qc->buf;
151         *num_bytes = qc->num_bytes;
152         return 1;
153 }
154
155 /**
156  * Allocate and initialize a chunk queue.
157  *
158  * \param max_pending Maximal number of bytes that will be queued.
159  *
160  * \return A pointer to the new queue.
161  */
162 struct chunk_queue *cq_new(size_t max_pending)
163 {
164         struct chunk_queue *cq = para_malloc(sizeof(*cq));
165         INIT_LIST_HEAD(&cq->q);
166         cq->max_pending = max_pending;
167         cq->num_pending = 0;
168         return cq;
169 }
170
171 /**
172  * Deallocate all resources of this queue.
173  *
174  * \param cq The chunk queue.
175  */
176 void cq_destroy(struct chunk_queue *cq)
177 {
178         struct queued_chunk *qc, *tmp;
179         list_for_each_entry_safe(qc, tmp, &cq->q, node) {
180                 list_del(&qc->node);
181                 free(qc);
182         }
183         free(cq);
184 }