]> git.tuebingen.mpg.de Git - paraslash.git/blob - send_common.c
chunk_queue: Store a pointer to the data and the chunk size.
[paraslash.git] / send_common.c
1 /*
2  * Copyright (C) 2005-2008 Andre Noll <maan@systemlinux.org>
3  *
4  * Licensed under the GPL v2. For licencing details see COPYING.
5  */
6
7 /** \file send_common.c Functions used by more than one paraslash sender. */
8
9 #include <dirent.h>
10 #include "para.h"
11 #include "error.h"
12 #include "string.h"
13 #include "fd.h"
14 #include "net.h"
15 #include "list.h"
16 #include "afh.h"
17 #include "afs.h"
18 #include "server.h"
19 #include "acl.h"
20 #include "send.h"
21 #include "close_on_fork.h"
22 #include "chunk_queue.h"
23 #include "vss.h"
24
25 /** Clients will be kicked if there are more than that many bytes pending. */
26 #define MAX_CQ_BYTES 40000
27
28 /**
29  * Open a passive socket of given layer4 type.
30  *
31  * Set the resulting file descriptor to nonblocking mode and add it to the list
32  * of fds that are being closed in the child process when the server calls
33  * fork().
34  *
35  * \param l4type The transport-layer protocol.
36  * \param port The port number.
37  *
38  * \return The listening fd on success, negative on errors.
39  */
40 static int open_sender(unsigned l4type, int port)
41 {
42         int fd, ret = para_listen(AF_UNSPEC, l4type, port);
43
44         if (ret < 0)
45                 return ret;
46         fd = ret;
47         ret = mark_fd_nonblocking(fd);
48         if (ret < 0) {
49                 close(fd);
50                 return ret;
51         }
52         add_close_on_fork_list(fd);
53         return fd;
54 }
55
56 /**
57  * Shut down a client connected to a paraslash sender.
58  *
59  * \param sc The client to shut down.
60  * \param ss The sender whose clients are to be shut down.
61  *
62  * Close the file descriptor given by \a sc, remove it from the close-on-fork
63  * list, destroy the chunk queue of this client, delete the client from the
64  * list of connected clients and free the sender_client struct.
65  */
66 void shutdown_client(struct sender_client *sc, struct sender_status *ss)
67 {
68         PARA_INFO_LOG("shutting down %s on fd %d\n", sc->name, sc->fd);
69         free(sc->name);
70         close(sc->fd);
71         del_close_on_fork_list(sc->fd);
72         cq_destroy(sc->cq);
73         list_del(&sc->node);
74         free(sc->private_data);
75         free(sc);
76         ss->num_clients--;
77 }
78
79 void shutdown_clients(struct sender_status *ss)
80 {
81         struct sender_client *sc, *tmp;
82         list_for_each_entry_safe(sc, tmp, &ss->client_list, node)
83                 shutdown_client(sc, ss);
84 }
85
86 /**
87  * Write a buffer to a non-blocking file descriptor.
88  *
89  * \param fd The file descriptor.
90  * \param buf the buffer to write.
91  * \param len the number of bytes of \a buf.
92  * \param max_bytes_per_write Do not write more than that many bytes at once.
93  *
94  * If \a max_bytes_per_write is non-zero, do not send more than that many bytes
95  * per write().
96  *
97  * EAGAIN is not considered an error condition.  For example CCID3 has a
98  * sending wait queue which fills up and is emptied asynchronously. The EAGAIN
99  * case means that there is currently no space in the wait queue, but this can
100  * change at any moment.
101  *
102  * \return Negative on errors, number of bytes written else.
103  */
104 static int write_nonblock(int fd, const char *buf, size_t len,
105                 size_t max_bytes_per_write)
106 {
107         size_t written = 0;
108         int ret = 0;
109
110         while (written < len) {
111                 size_t num = len - written;
112
113                 if (max_bytes_per_write && max_bytes_per_write < num)
114                         num = max_bytes_per_write;
115                 ret = write(fd, buf + written, num);
116                 if (ret < 0 && errno == EAGAIN)
117                         return written;
118                 if (ret < 0)
119                         return -ERRNO_TO_PARA_ERROR(errno);
120                 written += ret;
121         }
122         return written;
123 }
124
125 static int queue_chunk_or_shutdown(struct sender_client *sc,
126                 struct sender_status *ss, const char *buf, size_t num_bytes)
127 {
128         int ret = cq_enqueue(sc->cq, buf, num_bytes);
129         if (ret < 0)
130                 shutdown_client(sc, ss);
131         return ret;
132 }
133
134 /* return: negative on errors, zero if not everything was sent, one otherwise */
135 static int send_queued_chunks(struct sender_client *sc,
136                 size_t max_bytes_per_write)
137 {
138         struct queued_chunk *qc;
139         while ((qc = cq_peek(sc->cq))) {
140                 const char *buf;
141                 size_t len;
142                 int ret;
143                 cq_get(qc, &buf, &len);
144                 ret = write_nonblock(sc->fd, buf, len, max_bytes_per_write);
145                 if (ret < 0)
146                         return ret;
147                 cq_update(sc->cq, ret);
148                 if (ret != len)
149                         return 0;
150                 cq_dequeue(sc->cq);
151         }
152         return 1;
153 }
154
155 /**
156  * Send one chunk of audio data to a connected client.
157  *
158  * \param sc The client.
159  * \param ss The sender.
160  * \param max_bytes_per_write Split writes to chunks of at most that many bytes.
161  * \param current_chunk The number of the chunk to write.
162  * \param buf The data to write.
163  * \param len The number of bytes of \a buf.
164  *
165  * On errors, the client is shut down. If only a part of the buffer could be
166  * written, the remainder is put into the chunk queue for that client.
167  */
168 void send_chunk(struct sender_client *sc, struct sender_status *ss,
169                 size_t max_bytes_per_write, long unsigned current_chunk,
170                 const char *buf, size_t len)
171 {
172         int ret;
173
174         if (!sc->header_sent && current_chunk) {
175                 size_t header_len;
176                 char *header_buf;
177
178                 vss_get_header(&header_buf, &header_len);
179                 if (header_buf && header_len > 0) {
180                         ret = queue_chunk_or_shutdown(sc, ss, header_buf, header_len);
181                         if (ret < 0)
182                                 goto out;
183                 }
184                 sc->header_sent = 1;
185         }
186         ret = send_queued_chunks(sc, max_bytes_per_write);
187         if (ret < 0) {
188                 shutdown_client(sc, ss);
189                 goto out;
190         }
191         if (!len)
192                 goto out;
193         if (!ret) { /* still data left in the queue */
194                 ret = queue_chunk_or_shutdown(sc, ss, buf, len);
195                 goto out;
196         }
197         ret = write_nonblock(sc->fd, buf, len, max_bytes_per_write);
198         if (ret < 0) {
199                 shutdown_client(sc, ss);
200                 goto out;
201         }
202         if (ret != len)
203                 ret = queue_chunk_or_shutdown(sc, ss, buf + ret, len - ret);
204 out:
205         if (ret < 0)
206                 PARA_NOTICE_LOG("%s\n", para_strerror(-ret));
207 }
208
209 /**
210  * Initialize a struct sender status.
211  *
212  * \param ss The struct to initialize.
213  * \param access_arg The array of access arguments given at the command line.
214  * \param num_access_args The number of elements in \a access_arg.
215  * \param port The tcp or dccp port to listen on.
216  * \param max_clients The maximal number of simultaneous connections.
217  * \param default_deny Whether a blacklist should be used for access control.
218  */
219 void init_sender_status(struct sender_status *ss, char **access_arg,
220         int num_access_args, int port, int max_clients, int default_deny)
221 {
222         ss->listen_fd = -1;
223         INIT_LIST_HEAD(&ss->client_list);
224         ss->port = port;
225         acl_init(&ss->acl, access_arg, num_access_args);
226         ss->num_clients = 0;
227         ss->max_clients = max_clients;
228         ss->default_deny = default_deny;
229 }
230
231 /**
232  * Return a string containing the current status of a sender.
233  *
234  * \param ss The sender.
235  * \param name Used for printing the header line.
236  *
237  * \return The string printed in the "si" command.
238  */
239 char *get_sender_info(struct sender_status *ss, char *name)
240 {
241         char *clnts = NULL, *ret;
242         struct sender_client *sc, *tmp_sc;
243
244         char *acl_contents = acl_get_contents(&ss->acl);
245         list_for_each_entry_safe(sc, tmp_sc, &ss->client_list, node) {
246                 char *tmp = make_message("%s%s ", clnts? clnts : "", sc->name);
247                 free(clnts);
248                 clnts = tmp;
249         }
250         ret = make_message(
251                 "%s sender:\n"
252                 "\tstatus: %s\n"
253                 "\tport: %d\n"
254                 "\tnumber of connected clients: %d\n"
255                 "\tmaximal number of clients: %d%s\n"
256                 "\tconnected clients: %s\n"
257                 "\taccess %s list: %s\n",
258                 name,
259                 (ss->listen_fd >= 0)? "on" : "off",
260                 ss->port,
261                 ss->num_clients,
262                 ss->max_clients,
263                 ss->max_clients > 0? "" : " (unlimited)",
264                 clnts? clnts : "(none)",
265                 ss->default_deny? "allow" : "deny",
266                 acl_contents? acl_contents : "(empty)"
267         );
268         free(acl_contents);
269         free(clnts);
270         return ret;
271 }
272
273 /**
274  * Allow connections from the given range of IP addresses.
275  *
276  * \param scd Contains the IP and the netmask.
277  * \param ss The sender.
278  *
279  * \sa generic_com_deny().
280  */
281 void generic_com_allow(struct sender_command_data *scd,
282                 struct sender_status *ss)
283 {
284         acl_allow(scd->addr, scd->netmask, &ss->acl, ss->default_deny);
285 }
286
287 /**
288  * Deny connections from the given range of IP addresses.
289  *
290  * \param scd see \ref generic_com_allow().
291  * \param ss see \ref generic_com_allow().
292  *
293  * \sa generic_com_allow().
294  */
295 void generic_com_deny(struct sender_command_data *scd,
296                 struct sender_status *ss)
297 {
298         acl_deny(scd->addr, scd->netmask, &ss->acl, ss->default_deny);
299 }
300
301 /**
302  * Activate a paraslash sender.
303  *
304  * \param ss The sender to activate.
305  * \param protocol The symbolic name of the transport-layer protocol.
306  *
307  * \return Standard.
308  */
309 int generic_com_on(struct sender_status *ss, unsigned protocol)
310 {
311         int ret;
312
313         if (ss->listen_fd >= 0)
314                 return 1;
315         ret = open_sender(protocol, ss->port);
316         if (ret < 0)
317                 return ret;
318         ss->listen_fd = ret;
319         return 1;
320 }
321
322 /**
323  * Deactivate a paraslash sender.
324  *
325  * Shutdown all connected clients and stop listening on the TCP/DCCP socket.
326  *
327  * \param ss The sender to deactivate.
328  *
329  * \sa \ref del_close_on_fork_list(), shutdown_clients().
330  */
331 void generic_com_off(struct sender_status *ss)
332 {
333         if (ss->listen_fd < 0)
334                 return;
335         PARA_NOTICE_LOG("closing port %d\n", ss->port);
336         close(ss->listen_fd);
337         del_close_on_fork_list(ss->listen_fd);
338         shutdown_clients(ss);
339         ss->listen_fd = -1;
340 }
341
342 /**
343  * Accept a connection on the socket this server is listening on.
344  *
345  * \param ss The sender whose listening fd is ready for reading.
346  *
347  * This must be called only if the socket fd of \a ss is ready for reading.  It
348  * calls para_accept() to accept the connection and performs the following
349  * actions on the resulting file descriptor \a fd:
350  *
351  *      - Checks whether the maximal number of connections are exceeded.
352  *      - Sets \a fd to nonblocking mode.
353  *      - Checks the acl of the sender to find out whether connections
354  *        are allowed from the IP of the connecting peer.
355  *      - Increases the number of connections for this sender.
356  *      - Creates and initializes a new chunk queue for queuing network
357  *        packets that can not be sent immediately.
358  *      - Allocates a new struct sender_client and fills in its \a fd, \a cq
359  *        and \a name members.
360  *      - Adds \a fd to the list of connected clients for this sender.
361  *      - Adds \a fd to the list of file descriptors that should be closed
362  *        in the child process when the server calls fork().
363  *
364  * \return A pointer to the allocated sender_client structure on success, \p
365  * NULL on errors.
366  *
367  * \sa \ref para_accept(), \ref mark_fd_nonblocking(), \ref acl_check_access(),
368  * \ref cq_new(), \ref add_close_on_fork_list().
369  */
370 struct sender_client *accept_sender_client(struct sender_status *ss)
371 {
372         struct sender_client *sc;
373         int fd, ret = para_accept(ss->listen_fd, NULL, 0);
374         if (ret < 0) {
375                 PARA_ERROR_LOG("%s\n", para_strerror(-ret));
376                 return NULL;
377         }
378         fd = ret;
379         ret = -E_MAX_CLIENTS;
380         if (ss->max_clients > 0 && ss->num_clients >= ss->max_clients)
381                 goto err_out;
382         ret = mark_fd_nonblocking(fd);
383         if (ret < 0)
384                 goto err_out;
385         ret = acl_check_access(fd, &ss->acl, ss->default_deny);
386         if (ret < 0)
387                 goto err_out;
388         ss->num_clients++;
389         sc = para_calloc(sizeof(*sc));
390         sc->fd = fd;
391         sc->name = make_message("%s", remote_name(fd));
392         sc->cq = cq_new(MAX_CQ_BYTES);
393         para_list_add(&sc->node, &ss->client_list);
394         add_close_on_fork_list(fd);
395         PARA_INFO_LOG("accepted client #%d: %s (fd %d)\n", ss->num_clients,
396                 sc->name, fd);
397         return sc;
398 err_out:
399         PARA_WARNING_LOG("%s\n", para_strerror(-ret));
400         close(fd);
401         return NULL;
402 }
403
404 /**
405  * Get the generic help text.
406  *
407  * \return A dynamically allocated string containing the help text for
408  * a paraslash sender.
409  */
410 char *generic_sender_help(void)
411 {
412         return make_message(
413                 "usage: {on|off}\n"
414                 "usage: {allow|deny} IP mask\n"
415                 "example: allow 127.0.0.1 32\n"
416         );
417 }