compress.c: Cosmetics.
[paraslash.git] / send_common.c
1 /*
2  * Copyright (C) 2005-2008 Andre Noll <maan@systemlinux.org>
3  *
4  * Licensed under the GPL v2. For licencing details see COPYING.
5  */
6
7 /** \file send_common.c Functions used by more than one paraslash sender. */
8
9 #include <dirent.h>
10 #include "para.h"
11 #include "error.h"
12 #include "string.h"
13 #include "fd.h"
14 #include "net.h"
15 #include "list.h"
16 #include "afh.h"
17 #include "afs.h"
18 #include "server.h"
19 #include "send.h"
20 #include "close_on_fork.h"
21 #include "chunk_queue.h"
22 #include "vss.h"
23
24
25 /**
26  * Open a passive socket of given layer4 type.
27  *
28  * Set the resultig file descriptor to nonblocking mode and add it to the list
29  * of fds that are being closed in the child process when the server calls
30  * fork().
31  *
32  * \param l4type The transport-layer protocol.
33  * \param port The port number.
34  *
35  * \return The listening fd on success, negative on errors.
36  */
37 int open_sender(unsigned l4type, int port)
38 {
39         int fd, ret = para_listen(AF_UNSPEC, l4type, port);
40
41         if (ret < 0)
42                 return ret;
43         fd = ret;
44         ret = mark_fd_nonblocking(fd);
45         if (ret < 0) {
46                 close(fd);
47                 return ret;
48         }
49         add_close_on_fork_list(fd);
50         return fd;
51 }
52
53 /**
54  * Shut down a connected client.
55  *
56  * \param sc The client to be shut down.
57  *
58  * Close the file descriptor given by \a sc, remove it from the close-on-fork
59  * list, destroy the chunk queue of this client, delete the client from the
60  * list of connected clients and free the sender_client struct.
61  */
62 void shutdown_client(struct sender_client *sc)
63 {
64         PARA_INFO_LOG("shutting down %s on fd %d\n", sc->name, sc->fd);
65         free(sc->name);
66         close(sc->fd);
67         del_close_on_fork_list(sc->fd);
68         cq_destroy(sc->cq);
69         list_del(&sc->node);
70         free(sc->private_data);
71         free(sc);
72 }
73
74 /**
75  * Write a buffer to a non-blocking file descriptor.
76  *
77  * \param fd The file descriptor.
78  * \param buf the buffer to write.
79  * \param len the number of bytes of \a buf.
80  * \param max_bytes_per_write Do not write more than that many bytes at once.
81  *
82  * If \a max_bytes_per_write is non-zero, do not send more than that many bytes
83  * per write().
84  *
85  * EAGAIN is not considered an error condition.  For example CCID3 has a
86  * sending wait queue which fills up and is emptied asynchronously. The EAGAIN
87  * case means that there is currently no space in the wait queue, but this can
88  * change at any moment.
89  *
90  * \return Negative on errors, number of bytes written else.
91  */
92 static int write_nonblock(int fd, const char *buf, size_t len,
93                 size_t max_bytes_per_write)
94 {
95         size_t written = 0;
96         int ret = 0;
97
98         while (written < len) {
99                 size_t num = len - written;
100
101                 if (max_bytes_per_write && max_bytes_per_write < num)
102                         num = max_bytes_per_write;
103                 ret = write(fd, buf + written, num);
104                 if (ret < 0 && errno == EAGAIN)
105                         return written;
106                 if (ret < 0)
107                         return -ERRNO_TO_PARA_ERROR(errno);
108                 written += ret;
109         }
110         return written;
111 }
112
113 static int queue_chunk_or_shutdown(struct sender_client *sc,
114                 long unsigned chunk_num, size_t sent)
115 {
116         int ret = cq_enqueue(sc->cq, chunk_num, sent);
117         if (ret < 0)
118                 shutdown_client(sc);
119         return ret;
120 }
121
122 /* return: negative on errors, zero if not everything was sent, one otherwise */
123 static int send_queued_chunks(struct sender_client *sc,
124                 size_t max_bytes_per_write)
125 {
126         struct queued_chunk *qc;
127         while ((qc = cq_peek(sc->cq))) {
128                 char *buf;
129                 size_t len;
130                 int ret;
131                 cq_get(qc, &buf, &len);
132                 ret = write_nonblock(sc->fd, buf, len, max_bytes_per_write);
133                 if (ret < 0)
134                         return ret;
135                 cq_update(sc->cq, ret);
136                 if (ret != len)
137                         return 0;
138                 cq_dequeue(sc->cq);
139         }
140         return 1;
141 }
142
143 /**
144  * Send one chunk of audio data to a connected client.
145  *
146  * \param sc The client.
147  * \param max_bytes_per_write Split writes to chunks of at most that many bytes.
148  * \param current_chunk The number of the chunk to write.
149  * \param buf The data to write.
150  * \param len The number of bytes of \a buf.
151  *
152  * On errors, the client is shut down. If only a part of the buffer could be
153  * written, the remainder is put into the chunk queue for that client.
154  */
155 void send_chunk(struct sender_client *sc, size_t max_bytes_per_write,
156                 long unsigned current_chunk, const char *buf, size_t len)
157 {
158         int ret;
159
160         if (!sc->header_sent && current_chunk) {
161                 size_t header_len;
162                 char *header_buf = vss_get_header(&header_len);
163                 if (header_buf && header_len > 0) {
164                         ret = queue_chunk_or_shutdown(sc, -1U, 0);
165                         if (ret < 0)
166                                 goto out;
167                 }
168                 sc->header_sent = 1;
169         }
170         ret = send_queued_chunks(sc, max_bytes_per_write);
171         if (ret < 0) {
172                 shutdown_client(sc);
173                 goto out;
174         }
175         if (!len)
176                 goto out;
177         if (!ret) { /* still data left in the queue */
178                 ret = queue_chunk_or_shutdown(sc, current_chunk, 0);
179                 goto out;
180         }
181         ret = write_nonblock(sc->fd, buf, len, max_bytes_per_write);
182         if (ret < 0) {
183                 shutdown_client(sc);
184                 goto out;
185         }
186         if (ret != len)
187                 ret = queue_chunk_or_shutdown(sc, current_chunk, ret);
188 out:
189         if (ret < 0)
190                 PARA_NOTICE_LOG("%s\n", para_strerror(-ret));
191 }