08f57e9d1ecf9bae1cadead0757fe29418c446c3
[paraslash.git] / chunk_queue.c
1 /* Copyright (C) 2007 Andre Noll <maan@tuebingen.mpg.de>, see file COPYING. */
2
3 /** \file chunk_queue.c Queuing functions for paraslash senders. */
4
5 #include <regex.h>
6
7 #include "para.h"
8 #include "list.h"
9 #include "afh.h"
10 #include "string.h"
11 #include "error.h"
12 #include "chunk_queue.h"
13
14 /**
15  * Senders may use the chunk queue facility to deal with laggy connections.  It
16  * allows them to enqueue chunks if they can not be sent out immediately.
17  *
18  * Chunk queues are "cheap" in the sense that only reference to the audio file
19  * data is stored, but not the data itself.
20  */
21 struct chunk_queue {
22         /** The list of pending chunks for this client. */
23         struct list_head q;
24         /** The number of pending bytes for this client. */
25         unsigned long num_pending;
26         /** More than that many bytes in the queue is considered an error. */
27         unsigned long max_pending;
28 };
29
30 /** Describes one queued chunk in a chunk queue. */
31 struct queued_chunk {
32         /** Pointer to the data to be queued. */
33         const char *buf;
34         /** The number of bytes of this chunk. */
35         size_t num_bytes;
36         /** Position of the chunk in the chunk queue. */
37         struct list_head node;
38 };
39
40 /**
41  * Add a chunk to the given queue.
42  *
43  * \param cq the queue to add the chunk to.
44  * \param buf Pointer to the data to be queued.
45  * \param num_bytes The size of \a buf.
46  *
47  * \return Standard.
48  */
49 int cq_enqueue(struct chunk_queue *cq, const char *buf, size_t num_bytes)
50 {
51         struct queued_chunk *qc;
52
53         if (cq->num_pending + num_bytes > cq->max_pending)
54                 return -E_QUEUE;
55         qc = para_malloc(sizeof(struct queued_chunk));
56         cq->num_pending += num_bytes;
57         qc->buf = buf;
58         qc->num_bytes = num_bytes;
59         list_add_tail(&qc->node, &cq->q);
60         PARA_DEBUG_LOG("%lu bytes queued for %p\n", cq->num_pending, &cq->q);
61         return 1;
62 }
63
64 /**
65  * Lookup the next chunk in the queue.
66  *
67  * \param cq The chunk queue.
68  *
69  * \return The next queued chunk, or \p NULL if there is no chunk available.
70  */
71 struct queued_chunk *cq_peek(struct chunk_queue *cq)
72 {
73         if (list_empty(&cq->q))
74                 return NULL;
75         return list_entry(cq->q.next, struct queued_chunk, node);
76 }
77
78 /**
79  * Remove the current chunk from the queue.
80  *
81  * \param cq The queue to remove from.
82  */
83 void cq_dequeue(struct chunk_queue *cq)
84 {
85         struct queued_chunk *qc = cq_peek(cq);
86         assert(qc);
87         assert(cq->num_pending >= qc->num_bytes);
88         cq->num_pending -= qc->num_bytes;
89         list_del(&qc->node);
90         free(qc);
91 }
92
93 /**
94  * Change the number of bytes sent for the current queued chunk.
95  *
96  * \param cq The chunk queue.
97  * \param sent Number of bytes successfully sent.
98  */
99 void cq_update(struct chunk_queue *cq, size_t sent)
100 {
101         struct queued_chunk *qc = cq_peek(cq);
102         assert(qc);
103         qc->num_bytes -= sent;
104         qc->buf += sent;
105         cq->num_pending -= sent;
106 }
107
108 /**
109  * Get a pointer to the given queued chunk.
110  *
111  * \param qc The queued chunk.
112  * \param buf Result pointer.
113  * \param num_bytes Number of bytes of \a buf.
114  *
115  * \return Positive on success, negative on errors.
116  */
117 int cq_get(struct queued_chunk *qc, const char **buf, size_t *num_bytes)
118 {
119         *buf = qc->buf;
120         *num_bytes = qc->num_bytes;
121         return 1;
122 }
123
124 /**
125  * Allocate and initialize a chunk queue.
126  *
127  * \param max_pending Maximal number of bytes that will be queued.
128  *
129  * \return A pointer to the new queue.
130  */
131 struct chunk_queue *cq_new(size_t max_pending)
132 {
133         struct chunk_queue *cq = para_malloc(sizeof(*cq));
134         INIT_LIST_HEAD(&cq->q);
135         cq->max_pending = max_pending;
136         cq->num_pending = 0;
137         return cq;
138 }
139
140 /**
141  * Deallocate all resources of this queue.
142  *
143  * \param cq The chunk queue.
144  */
145 void cq_destroy(struct chunk_queue *cq)
146 {
147         struct queued_chunk *qc, *tmp;
148         list_for_each_entry_safe(qc, tmp, &cq->q, node) {
149                 list_del(&qc->node);
150                 free(qc);
151         }
152         free(cq);
153 }