Implement afs events.
[paraslash.git] / chunk_queue.c
1 /*
2 * Copyright (C) 2007 Andre Noll <maan@systemlinux.org>
3 *
4 * Licensed under the GPL v2. For licencing details see COPYING.
5 */
6
7 /** \file chunk_queue.c Queuing functions for paraslash senders. */
8
9 #include "para.h"
10 #include "list.h"
11 #include "afh.h"
12 #include "vss.h"
13 #include "string.h"
14 #include "error.h"
15
16 /**
17 * Senders may use the chunk queue facility to deal with laggy connections. It
18 * allows them to enqueue chunks if they can not be sent out immediately.
19 *
20 * Chunk queues are "cheap" in the sense that only reference to the audio file
21 * data is stored, but not the data itsself.
22 */
23 struct chunk_queue {
24 /** The list of pending chunks for this client. */
25 struct list_head q;
26 /** The number of pending bytes for this client. */
27 unsigned long num_pending;
28 /** Enqueueing more than that many bytes is an error. */
29 unsigned long max_pending;
30 };
31
32 /** Describes one queued chunk in a chunk queue. */
33 struct queued_chunk {
34 /** The number of the queued chunk, -1U means header. */
35 unsigned chunk_num;
36 /** The number of bytes already sent. */
37 unsigned sent;
38 /** Position of the chunk in the chunk queue. */
39 struct list_head node;
40 };
41
42 /**
43 * Add a chunk to the given queue.
44 *
45 * \param cq the queue to add the chunk to.
46 * \param chunk_num The number of the chunk to be queued.
47 * \param sent The number of bytes of this chunk that the sender was able to
48 * send.
49 *
50 * \return Positive on success, negative on errors.
51 */
52 int cq_enqueue(struct chunk_queue *cq, long unsigned chunk_num,
53 size_t sent)
54 {
55 struct queued_chunk *qc;
56 char *buf;
57 size_t len;
58 int ret;
59
60 if (chunk_num != -1U) {
61 ret = vss_get_chunk(chunk_num, &buf, &len);
62 if (ret < 0)
63 return ret;
64 } else
65 buf = vss_get_header(&len);
66 if (cq->num_pending + len > cq->max_pending)
67 return -E_QUEUE;
68 qc = para_malloc(sizeof(struct queued_chunk));
69 cq->num_pending += len;
70 qc->chunk_num = chunk_num;
71 qc->sent = sent;
72 list_add_tail(&qc->node, &cq->q);
73 PARA_DEBUG_LOG("%lu bytes queued for %p\n", cq->num_pending, &cq->q);
74 return 1;
75 }
76
77 /**
78 * Lookup the next chunk in the queue.
79 *
80 * \param cq The chunk queue.
81 *
82 * \return The next queued chunk, or \p NULL if there is no chunk awailable.
83 */
84 struct queued_chunk *cq_peek(struct chunk_queue *cq)
85 {
86 if (list_empty(&cq->q))
87 return NULL;
88 return list_entry(cq->q.next, struct queued_chunk, node);
89 }
90
91 /**
92 * Remove the current chunk from the queue.
93 *
94 * \param cq The chunk to remove.
95 */
96 void cq_dequeue(struct chunk_queue *cq)
97 {
98 struct queued_chunk *qc = cq_peek(cq);
99 assert(qc);
100 list_del(&qc->node);
101 free(qc);
102 }
103
104 /**
105 * Change the number of bytes send for the current queued chunk.
106 *
107 * \param cq The chunk queue.
108 * \param sent Number of bytes successfully sent.
109 */
110 void cq_update(struct chunk_queue *cq, size_t sent)
111 {
112 struct queued_chunk *qc = cq_peek(cq);
113 assert(qc);
114 qc->sent += sent;
115 cq->num_pending -= sent;
116 }
117
118 /**
119 * Get a pointer to the given queued chunk.
120 *
121 * \param qc The queued chunk.
122 * \param buf Result pointer.
123 * \param len Number of bytes of \a buf.
124 *
125 * \return Positive on success, negative on errors.
126 */
127 int cq_get(struct queued_chunk *qc, char **buf, size_t *len)
128 {
129 int ret;
130
131 if (qc->chunk_num != -1U) {
132 ret = vss_get_chunk(qc->chunk_num, buf, len);
133 if (ret < 0)
134 return ret;
135 } else
136 *buf = vss_get_header(len);
137 assert(*len > qc->sent);
138 *buf += qc->sent;
139 *len -= qc->sent;
140 return 1;
141 }
142
143 /**
144 * Allocate and initialize a chunk queue.
145 *
146 * \param max_pending Maximal number of bytes that will be queued.
147 *
148 * \return A pointer to the new queue.
149 */
150 struct chunk_queue *cq_new(size_t max_pending)
151 {
152 struct chunk_queue *cq = para_malloc(sizeof(*cq));
153 INIT_LIST_HEAD(&cq->q);
154 cq->max_pending = max_pending;
155 cq->num_pending = 0;
156 return cq;
157 }
158
159 /**
160 * Deallocate all resources of this queue.
161 *
162 * \param cq The chunk queue.
163 */
164 void cq_destroy(struct chunk_queue *cq)
165 {
166 struct queued_chunk *qc, *tmp;
167 list_for_each_entry_safe(qc, tmp, &cq->q, node) {
168 list_del(&qc->node);
169 free(qc);
170 }
171 free(cq);
172 }