Implement server side afs callbacks.
[paraslash.git] / chunk_queue.c
1 /*
2  * Copyright (C) 2007 Andre Noll <maan@systemlinux.org>
3  *
4  * Licensed under the GPL v2. For licencing details see COPYING.
5  */
6
7 /** \file chunk_queue.c Queuing functions for paraslash senders. */
8
9 #include "para.h"
10 #include "list.h"
11 #include "vss.h"
12 #include "string.h"
13 #include "error.h"
14
15 /**
16  * Senders may use the chunk queue facility to deal with laggy connections.  It
17  * allows them to enqueue chunks if they can not be sent out immediately.
18  *
19  * Chunk queues are "cheap" in the sense that only reference to the audio file
20  * data is stored, but not the data itsself.
21  */
22 struct chunk_queue {
23         /** The list of pending chunks for this client. */
24         struct list_head q;
25         /** The number of pending bytes for this client. */
26         unsigned long num_pending;
27         /** Enqueueing more than that many bytes is an error. */
28         unsigned long max_pending;
29 };
30
31 /** Describes one queued chunk in a chunk queue. */
32 struct queued_chunk {
33         /** The number of the queued chunk, -1U means header. */
34         unsigned chunk_num;
35         /** The number of bytes already sent. */
36         unsigned sent;
37         /** Position of the chunk in the chunk queue. */
38         struct list_head node;
39 };
40
41 /**
42  * Add a chunk to the given queue.
43  *
44  * \param cq the queue to add the chunk to.
45  * \param chunk_num The number of the chunk to be queued.
46  * \param sent The number of bytes of this chunk that the sender was able to
47  * send.
48  *
49  * \return Positive on success, negative on errors.
50  */
51 int cq_enqueue(struct chunk_queue *cq, long unsigned chunk_num,
52                 size_t sent)
53 {
54         struct queued_chunk *qc;
55         char *buf;
56         size_t len;
57         int ret;
58
59         if (chunk_num != -1U) {
60                 ret = vss_get_chunk(chunk_num, &buf, &len);
61                 if (ret < 0)
62                         return ret;
63         } else
64                 buf = vss_get_header(&len);
65         if (cq->num_pending + len > cq->max_pending)
66                 return -E_QUEUE;
67         qc = para_malloc(sizeof(struct queued_chunk));
68         cq->num_pending += len;
69         qc->chunk_num = chunk_num;
70         qc->sent = sent;
71         list_add_tail(&qc->node, &cq->q);
72         PARA_DEBUG_LOG("%lu bytes queued for %p\n", cq->num_pending, &cq->q);
73         return 1;
74 }
75
76 /**
77  * Lookup the next chunk in the queue.
78  *
79  * \param cq The chunk queue.
80  *
81  * \return The next queued chunk, or \p NULL if there is no chunk awailable.
82  */
83 struct queued_chunk *cq_peek(struct chunk_queue *cq)
84 {
85         if (list_empty(&cq->q))
86                 return NULL;
87         return list_entry(cq->q.next, struct queued_chunk, node);
88 }
89
90 /**
91  * Remove the current chunk from the queue.
92  *
93  * \param cq The chunk to remove.
94  */
95 void cq_dequeue(struct chunk_queue *cq)
96 {
97         struct queued_chunk *qc = cq_peek(cq);
98         assert(qc);
99         list_del(&qc->node);
100         free(qc);
101 }
102
103 /**
104  * Change the number of bytes send for the current queued chunk.
105  *
106  * \param cq The chunk queue.
107  * \param sent Number of bytes successfully sent.
108  */
109 void cq_update(struct chunk_queue *cq, size_t sent)
110 {
111         struct queued_chunk *qc = cq_peek(cq);
112         assert(qc);
113         qc->sent += sent;
114         cq->num_pending -= sent;
115 }
116
117 /**
118  * Get a pointer to the given queued chunk.
119  *
120  * \param qc The queued chunk.
121  * \param buf Result pointer.
122  * \param len Number of bytes of \a buf.
123  *
124  * \return Positive on success, negative on errors.
125  */
126 int cq_get(struct queued_chunk *qc, char **buf, size_t *len)
127 {
128         int ret;
129
130         if (qc->chunk_num != -1U) {
131                 ret = vss_get_chunk(qc->chunk_num, buf, len);
132                 if (ret < 0)
133                         return ret;
134         } else
135                 *buf = vss_get_header(len);
136         assert(*len > qc->sent);
137         *buf += qc->sent;
138         *len -= qc->sent;
139         return 1;
140 }
141
142 /**
143  * Allocate and initialize a chunk queue.
144  *
145  * \param max_pending Maximal number of bytes that will be queued.
146  *
147  * \return A pointer to the new queue.
148  */
149 struct chunk_queue *cq_new(size_t max_pending)
150 {
151         struct chunk_queue *cq = para_malloc(sizeof(*cq));
152         INIT_LIST_HEAD(&cq->q);
153         cq->max_pending = max_pending;
154         cq->num_pending = 0;
155         return cq;
156 }
157
158 /**
159  * Deallocate all resources of this queue.
160  *
161  * \param cq The chunk queue.
162  */
163 void cq_destroy(struct chunk_queue *cq)
164 {
165         struct queued_chunk *qc, *tmp;
166         list_for_each_entry_safe(qc, tmp, &cq->q, node) {
167                 list_del(&qc->node);
168                 free(qc);
169         }
170         free(cq);
171 }