Add forward error correction code to the udp sender/receiver.
[paraslash.git] / udp_send.c
1 /*
2 * Copyright (C) 2005-2009 Andre Noll <maan@systemlinux.org>
3 *
4 * Licensed under the GPL v2. For licencing details see COPYING.
5 */
6
7 /** \file udp_send.c Para_server's udp sender. */
8
9
10 #include <sys/time.h>
11 #include <dirent.h>
12 #include <net/if.h>
13
14 #include "server.cmdline.h"
15 #include "para.h"
16 #include "error.h"
17 #include "string.h"
18 #include "afh.h"
19 #include "afs.h"
20 #include "server.h"
21 #include "vss.h"
22 #include "list.h"
23 #include "send.h"
24 #include "portable_io.h"
25 #include "net.h"
26 #include "fd.h"
27 #include "sched.h"
28 #include "close_on_fork.h"
29 #include "chunk_queue.h"
30
31 /** Describes one entry in the list of targets for the udp sender. */
32 struct udp_target {
33 /** The position of this target in the list of targets. */
34 struct list_head node;
35 /** The hostname (DNS name or IPv4/v6 address string). */
36 char host[MAX_HOSTLEN];
37 /** The UDP port. */
38 int port;
39 /** The socket fd. */
40 int fd;
41 /** The list of queued chunks for this fd. */
42 struct chunk_queue *cq;
43 struct fec_client *fc;
44 struct fec_client_parms fcp;
45 };
46
47 static struct list_head targets;
48 static int sender_status;
49
50 static void udp_close_target(struct udp_target *ut)
51 {
52 if (ut->fd < 0)
53 return;
54 close(ut->fd);
55 del_close_on_fork_list(ut->fd);
56 cq_destroy(ut->cq);
57 ut->cq = NULL;
58 ut->fd = -1;
59 }
60
61 static void udp_delete_target(struct udp_target *ut, const char *msg)
62 {
63 PARA_NOTICE_LOG("deleting %s#%d (%s) from list\n", ut->host,
64 ut->port, msg);
65 udp_close_target(ut);
66 vss_del_fec_client(ut->fc);
67 list_del(&ut->node);
68 free(ut);
69 }
70
71 /**
72 * Perform AF-independent multicast sender setup.
73 *
74 * \param fd The connected socket descriptor.
75 * \param ttl UDPv4 multicast TTL or UDPv6 multicast number of hops.
76 * Use -1 to mean default, 0..255 otherwise.
77 * \param iface The outgoing multicast interface, or NULL for the default.
78 *
79 * \return Zero if okay, negative on error.
80 */
81 static int mcast_sender_setup(struct udp_target *ut, int ttl, char *iface)
82 {
83 struct sockaddr_storage ss;
84 socklen_t sslen = sizeof(ss);
85
86 const int on = 1;
87 int id = iface == NULL ? 0 : if_nametoindex(iface);
88
89 if (getpeername(ut->fd, (struct sockaddr *)&ss, &sslen) < 0)
90 goto err;
91
92 if (iface != NULL && id == 0)
93 PARA_WARNING_LOG("could not resolve interface %s, using default", iface);
94
95 /* RFC 3493, 5.2: -1 means 'use kernel default' */
96 if (ttl < 0 || ttl > 255)
97 ttl = -1;
98
99 switch (ss.ss_family) {
100 case AF_INET:
101 if (!IN_MULTICAST(htonl(((struct sockaddr_in *)&ss)->sin_addr.s_addr)))
102 return 0;
103 if (id != 0) {
104 #ifdef HAVE_IP_MREQN
105 struct ip_mreqn mn;
106
107 memset(&mn, 0, sizeof(mn));
108 mn.imr_ifindex = id;
109 if (setsockopt(ut->fd, IPPROTO_IP, IP_MULTICAST_IF, &mn, sizeof(mn)) < 0)
110 goto err;
111 #else
112 PARA_ERROR_LOG("No support for setting outgoing IPv4 mcast interface.");
113 #endif
114 }
115 /*
116 * Enable receiving multicast messages generated on the local host
117 * At least on Linux, this is enabled by default.
118 */
119 if (setsockopt(ut->fd, IPPROTO_IP, IP_MULTICAST_LOOP, &on, sizeof(on)) < 0)
120 break;
121
122 /* Default: use local subnet (do not flood out into the WAN) */
123 if (ttl == -1)
124 ttl = 1;
125 if (setsockopt(ut->fd, IPPROTO_IP, IP_MULTICAST_TTL, &ttl, sizeof(ttl)) < 0)
126 break;
127 return 0;
128 case AF_INET6:
129 if (!IN6_IS_ADDR_MULTICAST(&((struct sockaddr_in6 *)&ss)->sin6_addr))
130 return 0;
131 if (id != 0 &&
132 setsockopt(ut->fd, IPPROTO_IPV6, IPV6_MULTICAST_IF, &id, sizeof(id)) < 0)
133 break;
134 if (setsockopt(ut->fd, IPPROTO_IPV6, IPV6_MULTICAST_LOOP, &on, sizeof(on)) < 0)
135 break;
136 if (setsockopt(ut->fd, IPPROTO_IPV6, IPV6_MULTICAST_HOPS, &ttl, sizeof(ttl)) < 0)
137 break;
138 return 0;
139 default:
140 PARA_ERROR_LOG("address family %d not supported", ss.ss_family);
141 return -E_ADDRESS_LOOKUP;
142 }
143 err:
144 return -ERRNO_TO_PARA_ERROR(errno);
145 }
146
147 /** The maximal size of the per-target chunk queue. */
148 #define UDP_CQ_BYTES 40000
149
150 static int udp_init_session(struct udp_target *ut)
151 {
152 int ret;
153 char *iface = NULL;
154
155 if (ut->fd >= 0) /* nothing to do */
156 return 0;
157
158 ret = makesock(AF_UNSPEC, IPPROTO_UDP, 0, ut->host, ut->port);
159 if (ret < 0)
160 return ret;
161 ut->fd = ret;
162
163 if (conf.udp_mcast_iface_given)
164 iface = conf.udp_mcast_iface_arg;
165
166 ret = mcast_sender_setup(ut, conf.udp_ttl_arg, iface);
167 if (ret < 0) {
168 close(ut->fd);
169 return ret;
170 }
171
172 ret = mark_fd_nonblocking(ut->fd);
173 if (ret < 0) {
174 close(ut->fd);
175 return ret;
176 }
177 add_close_on_fork_list(ut->fd);
178 ut->cq = cq_new(UDP_CQ_BYTES);
179 PARA_NOTICE_LOG("sending to udp %s#%d\n", ut->host, ut->port);
180 return 1;
181 }
182
183 static void udp_shutdown_targets(void)
184 {
185 struct udp_target *ut, *tmp;
186 const char *buf = NULL;
187 size_t len = 0; /* STFU, gcc */
188
189 list_for_each_entry_safe(ut, tmp, &targets, node) {
190 if (ut->fd < 0)
191 continue;
192 if (!buf)
193 len = vss_get_fec_eof_packet(&buf);
194 write(ut->fd, buf, len);
195 udp_close_target(ut);
196 }
197 }
198
199 static int udp_com_on(__a_unused struct sender_command_data *scd)
200 {
201 sender_status = SENDER_ON;
202 return 1;
203 }
204
205 static int udp_com_off(__a_unused struct sender_command_data *scd)
206 {
207 udp_shutdown_targets();
208 sender_status = SENDER_OFF;
209 return 1;
210 }
211
212 static int udp_com_delete(struct sender_command_data *scd)
213 {
214 struct udp_target *ut, *tmp;
215
216 list_for_each_entry_safe(ut, tmp, &targets, node) {
217 /* Unspecified port means wildcard port match */
218 if (scd->port > 0 && scd->port != ut->port)
219 continue;
220 if (strcmp(ut->host, scd->host))
221 continue;
222 udp_delete_target(ut, "com_delete");
223 }
224 return 1;
225 }
226
227 static int udp_send_fec(char *buf, size_t len, void *private_data)
228 {
229 struct udp_target *ut = private_data;
230 int ret = udp_init_session(ut);
231
232 if (ret < 0)
233 goto fail;
234 ret = send_queued_chunks(ut->fd, ut->cq, 0);
235 if (ret < 0)
236 goto fail;
237 if (!len)
238 return 0;
239 if (!ret) { /* still data left in the queue */
240 ret = cq_enqueue(ut->cq, buf, len);
241 if (ret < 0)
242 goto fail;
243 }
244 ret = write_nonblock(ut->fd, buf, len, 0);
245 if (ret < 0)
246 goto fail;
247 if (ret != len) {
248 ret = cq_enqueue(ut->cq, buf + ret, len - ret);
249 if (ret < 0)
250 goto fail;
251 }
252 return 1;
253 fail:
254 udp_delete_target(ut, para_strerror(-ret));
255 return ret;
256 }
257
258 static void udp_add_target(const char *host, int port)
259 {
260 struct udp_target *ut = para_calloc(sizeof(struct udp_target));
261
262 strncpy(ut->host, host, sizeof(ut->host));
263 ut->port = port > 0 ? port : conf.udp_default_port_arg;
264 ut->fd = -1; /* not yet connected */
265 PARA_INFO_LOG("adding to target list (%s#%d)\n",
266 ut->host, ut->port);
267 para_list_add(&ut->node, &targets);
268 ut->fcp.slices_per_group = 16;
269 ut->fcp.data_slices_per_group = 14;
270 ut->fcp.max_slice_bytes = 1400;
271 ut->fcp.send = udp_send_fec;
272 ut->fcp.private_data = ut;
273 vss_add_fec_client(&ut->fcp, &ut->fc);
274 }
275
276 static int udp_com_add(struct sender_command_data *scd)
277 {
278 udp_add_target(scd->host, scd->port);
279 return 1;
280 }
281
282 static char *udp_info(void)
283 {
284 struct udp_target *ut;
285 char *ret, *tgts = NULL;
286
287 list_for_each_entry(ut, &targets, node) {
288 bool is_v6 = strchr(ut->host, ':') != NULL;
289 char *tmp = make_message("%s%s%s%s:%d ", tgts ? : "",
290 is_v6 ? "[" : "", ut->host,
291 is_v6 ? "]" : "", ut->port);
292 free(tgts);
293 tgts = tmp;
294 }
295 ret = make_message(
296 "udp sender:\n"
297 "\tstatus: %s\n"
298 "\tport: udp %d\n"
299 "\ttargets: %s\n",
300 (sender_status == SENDER_ON)? "on" : "off",
301 conf.udp_default_port_arg,
302 tgts? tgts : "(none)"
303 );
304 free(tgts);
305 return ret;
306 }
307
308 static void udp_init_target_list(void)
309 {
310 char host[MAX_HOSTLEN];
311 int port, i;
312
313 INIT_LIST_HEAD(&targets);
314 for (i = 0; i < conf.udp_target_given; i++)
315 if (parse_url(conf.udp_target_arg[i], host,
316 sizeof(host), &port) == NULL)
317 PARA_CRIT_LOG("syntax error for udp target option "
318 "#%d, ignoring\n", i);
319 else
320 udp_add_target(host, port);
321 }
322
/**
 * Compose the help text for the commands of the udp sender.
 *
 * \return A string allocated by make_message().
 */
static char *udp_help(void)
{
	char *usage = make_message(
		"usage: {on|off}\n"
		"usage: {add|delete} host[:port]\n"
		"examples: add 224.0.1.38:1500 (IPv4 multicast)\n"
		" add 10.10.1.42 (using default port)\n"
		" add [FF05::42]:1500 (IPv6 multicast)\n"
		" add [::1] (IPv6 localhost/default port)\n"
		" delete myhost.net (host with port wildcard)\n"
		" delete [badc0de::1] (IPv6 with port wildcard)\n"
	);

	return usage;
}
336
337 /**
338 * The init function of para_server's udp sender.
339 *
340 * \param s Pointer to the http sender struct.
341 *
342 * It initializes all function pointers of \a s and the list of udp targets.
343 */
344 void udp_send_init(struct sender *s)
345 {
346 INIT_LIST_HEAD(&targets);
347 s->info = udp_info;
348 s->help = udp_help;
349 s->send = NULL;
350 s->pre_select = NULL;
351 s->post_select = NULL;
352 s->shutdown_clients = udp_shutdown_targets;
353 s->client_cmds[SENDER_ON] = udp_com_on;
354 s->client_cmds[SENDER_OFF] = udp_com_off;
355 s->client_cmds[SENDER_DENY] = NULL;
356 s->client_cmds[SENDER_ALLOW] = NULL;
357 s->client_cmds[SENDER_ADD] = udp_com_add;
358 s->client_cmds[SENDER_DELETE] = udp_com_delete;
359 sender_status = SENDER_OFF;
360 udp_init_target_list();
361 if (!conf.udp_no_autostart_given)
362 sender_status = SENDER_ON;
363 PARA_DEBUG_LOG("udp sender init complete\n");
364 }