]> git.tuebingen.mpg.de Git - paraslash.git/blob - chunk_queue.c
5b102f286641d7b93418f3ae92e7dcdf6f71d0f6
[paraslash.git] / chunk_queue.c
1 /*
2  * Copyright (C) 2007-2010 Andre Noll <maan@systemlinux.org>
3  *
4  * Licensed under the GPL v2. For licencing details see COPYING.
5  */
6
7 /** \file chunk_queue.c Queuing functions for paraslash senders. */
8
9 #include <regex.h>
10
11 #include "para.h"
12 #include "list.h"
13 #include "afh.h"
14 #include "vss.h"
15 #include "string.h"
16 #include "error.h"
17
18 /**
19  * Senders may use the chunk queue facility to deal with laggy connections.  It
20  * allows them to enqueue chunks if they can not be sent out immediately.
21  *
22  * Chunk queues are "cheap" in the sense that only reference to the audio file
23  * data is stored, but not the data itself.
24  */
25 struct chunk_queue {
26         /** The list of pending chunks for this client. */
27         struct list_head q;
28         /** The number of pending bytes for this client. */
29         unsigned long num_pending;
30         /** More than that many bytes in the queue is considered an error. */
31         unsigned long max_pending;
32 };
33
34 /** Describes one queued chunk in a chunk queue. */
35 struct queued_chunk {
36         /** Pointer to the data to be queued. */
37         const char *buf;
38         /** The number of bytes of this chunk. */
39         size_t num_bytes;
40         /** Position of the chunk in the chunk queue. */
41         struct list_head node;
42 };
43
44 /**
45  * Add a chunk to the given queue.
46  *
47  * \param cq the queue to add the chunk to.
48  * \param buf Pointer to the data to be queued.
49  * \param num_bytes The size of \a buf.
50  *
51  * \return Standard.
52  */
53 int cq_enqueue(struct chunk_queue *cq, const char *buf, size_t num_bytes)
54 {
55         struct queued_chunk *qc;
56
57         if (cq->num_pending + num_bytes > cq->max_pending)
58                 return -E_QUEUE;
59         qc = para_malloc(sizeof(struct queued_chunk));
60         cq->num_pending += num_bytes;
61         qc->buf = buf;
62         qc->num_bytes = num_bytes;
63         list_add_tail(&qc->node, &cq->q);
64         PARA_DEBUG_LOG("%lu bytes queued for %p\n", cq->num_pending, &cq->q);
65         return 1;
66 }
67
68 /**
69  * Lookup the next chunk in the queue.
70  *
71  * \param cq The chunk queue.
72  *
73  * \return The next queued chunk, or \p NULL if there is no chunk available.
74  */
75 struct queued_chunk *cq_peek(struct chunk_queue *cq)
76 {
77         if (list_empty(&cq->q))
78                 return NULL;
79         return list_entry(cq->q.next, struct queued_chunk, node);
80 }
81
82 /**
83  * Remove the current chunk from the queue.
84  *
85  * \param cq The chunk to remove.
86  */
87 void cq_dequeue(struct chunk_queue *cq)
88 {
89         struct queued_chunk *qc = cq_peek(cq);
90         assert(qc);
91         list_del(&qc->node);
92         free(qc);
93 }
94
95 /**
96  * Force to add a chunk to the given queue.
97  *
98  * \param cq See \ref cq_enqueue.
99  * \param buf See \ref cq_enqueue.
100  * \param num_bytes See \ref cq_enqueue.
101  *
102  * If queuing the given buffer would result in exceeding the maximal queue
103  * size, buffers are dropped from the beginning of the queue. Note that this
104  * function still might fail.
105  *
106  * \return Standard.
107  */
108 int cq_force_enqueue(struct chunk_queue *cq, const char *buf, size_t num_bytes)
109 {
110         int ret;
111
112         if (num_bytes > cq->max_pending)
113                 return -E_QUEUE;
114         for (;;) {
115                 ret = cq_enqueue(cq, buf, num_bytes);
116                 if (ret >= 0)
117                         return ret;
118                 cq_dequeue(cq);
119         }
120         /* never reached */
121 }
122
123 /**
124  * Change the number of bytes sent for the current queued chunk.
125  *
126  * \param cq The chunk queue.
127  * \param sent Number of bytes successfully sent.
128  */
129 void cq_update(struct chunk_queue *cq, size_t sent)
130 {
131         struct queued_chunk *qc = cq_peek(cq);
132         assert(qc);
133         qc->num_bytes -= sent;
134         qc->buf += sent;
135         cq->num_pending -= sent;
136 }
137
138 /**
139  * Get a pointer to the given queued chunk.
140  *
141  * \param qc The queued chunk.
142  * \param buf Result pointer.
143  * \param num_bytes Number of bytes of \a buf.
144  *
145  * \return Positive on success, negative on errors.
146  */
147 int cq_get(struct queued_chunk *qc, const char **buf, size_t *num_bytes)
148 {
149         *buf = qc->buf;
150         *num_bytes = qc->num_bytes;
151         return 1;
152 }
153
154 /**
155  * Allocate and initialize a chunk queue.
156  *
157  * \param max_pending Maximal number of bytes that will be queued.
158  *
159  * \return A pointer to the new queue.
160  */
161 struct chunk_queue *cq_new(size_t max_pending)
162 {
163         struct chunk_queue *cq = para_malloc(sizeof(*cq));
164         INIT_LIST_HEAD(&cq->q);
165         cq->max_pending = max_pending;
166         cq->num_pending = 0;
167         return cq;
168 }
169
170 /**
171  * Deallocate all resources of this queue.
172  *
173  * \param cq The chunk queue.
174  */
175 void cq_destroy(struct chunk_queue *cq)
176 {
177         struct queued_chunk *qc, *tmp;
178         list_for_each_entry_safe(qc, tmp, &cq->q, node) {
179                 list_del(&qc->node);
180                 free(qc);
181         }
182         free(cq);
183 }