Intoduce send_common.c and use it from the dccp sender.
[paraslash.git] / send_common.c
1 /*
2 * Copyright (C) 2005-2008 Andre Noll <maan@systemlinux.org>
3 *
4 * Licensed under the GPL v2. For licencing details see COPYING.
5 */
6
7 /** \file send_common.c Functions used by more than one paraslash sender. */
8
9 #include <dirent.h>
10 #include "para.h"
11 #include "error.h"
12 #include "string.h"
13 #include "fd.h"
14 #include "net.h"
15 #include "list.h"
16 #include "afh.h"
17 #include "afs.h"
18 #include "server.h"
19 #include "send.h"
20 #include "close_on_fork.h"
21 #include "chunk_queue.h"
22 #include "vss.h"
23
24
25 /**
26 * Open a passive socket of given layer4 type.
27 *
28 * Set the resultig file descriptor to nonblocking mode and add it to the list
29 * of fds that are being closed in the child process when the server calls
30 * fork().
31 *
32 * \param l4type The transport-layer protocol.
33 * \param port The port number.
34 *
35 * \return The listening fd on success, negative on errors.
36 */
37 int open_sender(unsigned l4type, int port)
38 {
39 int fd, ret = para_listen(AF_UNSPEC, l4type, port);
40
41 if (ret < 0)
42 return ret;
43 fd = ret;
44 ret = mark_fd_nonblocking(fd);
45 if (ret < 0) {
46 close(fd);
47 return ret;
48 }
49 add_close_on_fork_list(fd);
50 return fd;
51 }
52
53 /**
54 * Shut down a connected client.
55 *
56 * \param sc The client to be shut down.
57 *
58 * Close the file descriptor given by \a sc, remove it from the close-on-fork
59 * list, destroy the chunk queue of this client, delete the client from the
60 * list of connected clients and free the sender_client struct.
61 */
62 void shutdown_client(struct sender_client *sc)
63 {
64 PARA_INFO_LOG("shutting down %s on fd %d\n", sc->name, sc->fd);
65 free(sc->name);
66 close(sc->fd);
67 del_close_on_fork_list(sc->fd);
68 cq_destroy(sc->cq);
69 list_del(&sc->node);
70 free(sc->private_data);
71 free(sc);
72 }
73
74 /**
75 * Write a buffer to a non-blocking file descriptor.
76 *
77 * \param fd The file descriptor.
78 * \param buf the buffer to write.
79 * \param len the number of bytes of \a buf.
80 * \param max_bytes_per_write Do not write more than that many bytes at once.
81 *
82 * If \a max_bytes_per_write is non-zero, do not send more than that many bytes
83 * per write().
84 *
85 * EAGAIN is not considered an error condition. For example CCID3 has a
86 * sending wait queue which fills up and is emptied asynchronously. The EAGAIN
87 * case means that there is currently no space in the wait queue, but this can
88 * change at any moment.
89 *
90 * \return Negative on errors, number of bytes written else.
91 */
92 static int write_nonblock(int fd, const char *buf, size_t len,
93 size_t max_bytes_per_write)
94 {
95 size_t written = 0;
96 int ret = 0;
97
98 while (written < len) {
99 size_t num = len - written;
100
101 if (max_bytes_per_write && max_bytes_per_write < num)
102 num = max_bytes_per_write;
103 ret = write(fd, buf + written, num);
104 if (ret < 0 && errno == EAGAIN)
105 return written;
106 if (ret < 0)
107 return -ERRNO_TO_PARA_ERROR(errno);
108 written += ret;
109 }
110 return written;
111 }
112
113 static int queue_chunk_or_shutdown(struct sender_client *sc,
114 long unsigned chunk_num, size_t sent)
115 {
116 int ret = cq_enqueue(sc->cq, chunk_num, sent);
117 if (ret < 0)
118 shutdown_client(sc);
119 return ret;
120 }
121
122 /* return: negative on errors, zero if not everything was sent, one otherwise */
123 static int send_queued_chunks(struct sender_client *sc,
124 size_t max_bytes_per_write)
125 {
126 struct queued_chunk *qc;
127 while ((qc = cq_peek(sc->cq))) {
128 char *buf;
129 size_t len;
130 int ret;
131 cq_get(qc, &buf, &len);
132 ret = write_nonblock(sc->fd, buf, len, max_bytes_per_write);
133 if (ret < 0)
134 return ret;
135 cq_update(sc->cq, ret);
136 if (ret != len)
137 return 0;
138 cq_dequeue(sc->cq);
139 }
140 return 1;
141 }
142
143 /**
144 * Send one chunk of audio data to a connected client.
145 *
146 * \param sc The client.
147 * \param max_bytes_per_write Split writes to chunks of at most that many bytes.
148 * \param current_chunk The number of the chunk to write.
149 * \param buf The data to write.
150 * \param len The number of bytes of \a buf.
151 *
152 * On errors, the client is shut down. If only a part of the buffer could be
153 * written, the remainder is put into the chunk queue for that client.
154 */
155 void send_chunk(struct sender_client *sc, size_t max_bytes_per_write,
156 long unsigned current_chunk, const char *buf, size_t len)
157 {
158 int ret;
159
160 if (!sc->header_sent && current_chunk) {
161 size_t header_len;
162 char *header_buf = vss_get_header(&header_len);
163 if (header_buf && header_len > 0) {
164 ret = queue_chunk_or_shutdown(sc, -1U, 0);
165 if (ret < 0)
166 goto out;
167 }
168 sc->header_sent = 1;
169 }
170 ret = send_queued_chunks(sc, max_bytes_per_write);
171 if (ret < 0) {
172 shutdown_client(sc);
173 goto out;
174 }
175 if (!len)
176 goto out;
177 if (!ret) { /* still data left in the queue */
178 ret = queue_chunk_or_shutdown(sc, current_chunk, 0);
179 goto out;
180 }
181 ret = write_nonblock(sc->fd, buf, len, max_bytes_per_write);
182 if (ret < 0) {
183 shutdown_client(sc);
184 goto out;
185 }
186 if (ret != len)
187 ret = queue_chunk_or_shutdown(sc, current_chunk, ret);
188 out:
189 if (ret < 0)
190 PARA_NOTICE_LOG("%s\n", para_strerror(-ret));
191 }