Merge branch 'maint'
[paraslash.git] / chunk_queue.c
1 /*
2 * Copyright (C) 2007-2011 Andre Noll <maan@systemlinux.org>
3 *
4 * Licensed under the GPL v2. For licencing details see COPYING.
5 */
6
7 /** \file chunk_queue.c Queuing functions for paraslash senders. */
8
9 #include <regex.h>
10
11 #include "para.h"
12 #include "list.h"
13 #include "afh.h"
14 #include "string.h"
15 #include "error.h"
16
17 /**
18 * Senders may use the chunk queue facility to deal with laggy connections. It
19 * allows them to enqueue chunks if they can not be sent out immediately.
20 *
21 * Chunk queues are "cheap" in the sense that only reference to the audio file
22 * data is stored, but not the data itself.
23 */
24 struct chunk_queue {
25 /** The list of pending chunks for this client. */
26 struct list_head q;
27 /** The number of pending bytes for this client. */
28 unsigned long num_pending;
29 /** More than that many bytes in the queue is considered an error. */
30 unsigned long max_pending;
31 };
32
33 /** Describes one queued chunk in a chunk queue. */
34 struct queued_chunk {
35 /** Pointer to the data to be queued. */
36 const char *buf;
37 /** The number of bytes of this chunk. */
38 size_t num_bytes;
39 /** Position of the chunk in the chunk queue. */
40 struct list_head node;
41 };
42
43 /**
44 * Add a chunk to the given queue.
45 *
46 * \param cq the queue to add the chunk to.
47 * \param buf Pointer to the data to be queued.
48 * \param num_bytes The size of \a buf.
49 *
50 * \return Standard.
51 */
52 int cq_enqueue(struct chunk_queue *cq, const char *buf, size_t num_bytes)
53 {
54 struct queued_chunk *qc;
55
56 if (cq->num_pending + num_bytes > cq->max_pending)
57 return -E_QUEUE;
58 qc = para_malloc(sizeof(struct queued_chunk));
59 cq->num_pending += num_bytes;
60 qc->buf = buf;
61 qc->num_bytes = num_bytes;
62 list_add_tail(&qc->node, &cq->q);
63 PARA_DEBUG_LOG("%lu bytes queued for %p\n", cq->num_pending, &cq->q);
64 return 1;
65 }
66
67 /**
68 * Lookup the next chunk in the queue.
69 *
70 * \param cq The chunk queue.
71 *
72 * \return The next queued chunk, or \p NULL if there is no chunk available.
73 */
74 struct queued_chunk *cq_peek(struct chunk_queue *cq)
75 {
76 if (list_empty(&cq->q))
77 return NULL;
78 return list_entry(cq->q.next, struct queued_chunk, node);
79 }
80
81 /**
82 * Remove the current chunk from the queue.
83 *
84 * \param cq The queue to remove from.
85 */
86 void cq_dequeue(struct chunk_queue *cq)
87 {
88 struct queued_chunk *qc = cq_peek(cq);
89 assert(qc);
90 assert(cq->num_pending >= qc->num_bytes);
91 cq->num_pending -= qc->num_bytes;
92 list_del(&qc->node);
93 free(qc);
94 }
95
96 /**
97 * Change the number of bytes sent for the current queued chunk.
98 *
99 * \param cq The chunk queue.
100 * \param sent Number of bytes successfully sent.
101 */
102 void cq_update(struct chunk_queue *cq, size_t sent)
103 {
104 struct queued_chunk *qc = cq_peek(cq);
105 assert(qc);
106 qc->num_bytes -= sent;
107 qc->buf += sent;
108 cq->num_pending -= sent;
109 }
110
111 /**
112 * Get a pointer to the given queued chunk.
113 *
114 * \param qc The queued chunk.
115 * \param buf Result pointer.
116 * \param num_bytes Number of bytes of \a buf.
117 *
118 * \return Positive on success, negative on errors.
119 */
120 int cq_get(struct queued_chunk *qc, const char **buf, size_t *num_bytes)
121 {
122 *buf = qc->buf;
123 *num_bytes = qc->num_bytes;
124 return 1;
125 }
126
127 /**
128 * Allocate and initialize a chunk queue.
129 *
130 * \param max_pending Maximal number of bytes that will be queued.
131 *
132 * \return A pointer to the new queue.
133 */
134 struct chunk_queue *cq_new(size_t max_pending)
135 {
136 struct chunk_queue *cq = para_malloc(sizeof(*cq));
137 INIT_LIST_HEAD(&cq->q);
138 cq->max_pending = max_pending;
139 cq->num_pending = 0;
140 return cq;
141 }
142
143 /**
144 * Deallocate all resources of this queue.
145 *
146 * \param cq The chunk queue.
147 */
148 void cq_destroy(struct chunk_queue *cq)
149 {
150 struct queued_chunk *qc, *tmp;
151 list_for_each_entry_safe(qc, tmp, &cq->q, node) {
152 list_del(&qc->node);
153 free(qc);
154 }
155 free(cq);
156 }