2 * Copyright (C) 2005-2009 Andre Noll <maan@systemlinux.org>
4 * Licensed under the GPL v2. For licencing details see COPYING.
6 /** \file udp_recv.c Paraslash's udp receiver */
12 #include "portable_io.h"
13 #include "udp_header.h"
18 #include "udp_recv.cmdline.h"
24 /** The size of the receiver node buffer. */
/*
 * Byte count (128 KiB). Besides sizing rn->buf in udp_recv_open(), it is also
 * the size of the on-stack buffer in udp_recv_post_select() and the length
 * passed to recv_bin_buffer() there.
 */
25 #define UDP_RECV_CHUNK_SIZE (128 * 1024)
/*
 * NOTE(review): the member declarations of this struct are not visible in
 * this view; only their descriptions are. The functions in this file access
 * the members have_header, fd, need_more and stream_type through it, which
 * match the four descriptions below in that order.
 */
28 * Data specific to the udp receiver.
30 * \sa \ref receiver, \ref receiver_node.
32 struct private_udp_recv_data
{
34 * Whether a header was received.
36 * A flag indicating whether this receiver already received a packet
37 * which contains the audio file header.
39 * This flag has no effect if the audio stream indicates that no extra
40 * headers will be sent (mp3, aac). Otherwise, all data packets are
41 * dropped until the header is received.
44 /** The socket file descriptor. */
46 /** Non-zero on short reads. */
48 /** Copied from the first audio header received. */
52 static void udp_recv_pre_select(struct sched
*s
, struct task
*t
)
54 struct receiver_node
*rn
= container_of(t
, struct receiver_node
, task
);
55 struct private_udp_recv_data
*purd
= rn
->private_data
;
57 para_fd_set(purd
->fd
, &s
->rfds
, &s
->max_fileno
);
60 static int enough_space(size_t nbytes
, size_t loaded
)
62 return nbytes
+ loaded
< UDP_RECV_CHUNK_SIZE
;
/*
 * examine_audio_header(): validate an already parsed udp audio header.
 *
 * purd:        receiver state; have_header, stream_type and need_more may be
 *              updated here.
 * uah:         the header to check (payload_len, header_len, packet_type and
 *              stream_type are inspected).
 * packet_size: number of bytes that were actually read for this packet.
 *
 * All visible failure paths return -E_UDP_BAD_HEADER or
 * -E_UDP_BAD_STREAM_TYPE.
 *
 * NOTE(review): several lines of this function are not visible in this view
 * (some case labels of the switch and the return statements that follow the
 * last two checks), so the 0/1 results mentioned below cannot be confirmed
 * from here.
 */
66 * Perform some sanity checks on an udp audio file header.
68 * return: negative on error, 0: discard data, 1: use data
70 static int examine_audio_header(struct private_udp_recv_data
*purd
,
71 struct udp_audio_header
*uah
, size_t packet_size
)
73 /* payload_len includes header */
74 if (uah
->payload_len
< uah
->header_len
)
75 return -E_UDP_BAD_HEADER
;
76 switch (uah
->packet_type
) {
/* NOTE(review): the case label(s) guarding this assignment are not visible here. */
80 purd
->have_header
= 1;
83 if (uah
->header_len
) /* header in no-header packet */
84 return -E_UDP_BAD_HEADER
;
86 case UDP_HEADER_PACKET
:
87 if (!uah
->header_len
) /** no header in header packet */
88 return -E_UDP_BAD_HEADER
;
90 default: /* bad packet type */
91 return -E_UDP_BAD_HEADER
;
93 /* check stream type */
94 if (uah
->stream_type
!= UDP_PLAIN_STREAM
&&
95 uah
->stream_type
!= UDP_HEADER_STREAM
)
96 return -E_UDP_BAD_STREAM_TYPE
;
/* The first valid header seen fixes the stream type of this receiver. */
97 if (purd
->stream_type
== UDP_UNKNOWN_STREAM
)
98 purd
->stream_type
= uah
->stream_type
;
99 /* stream type must not change */
100 if (uah
->stream_type
!= purd
->stream_type
)
101 return -E_UDP_BAD_STREAM_TYPE
;
102 if (!purd
->have_header
&& uah
->stream_type
== UDP_HEADER_STREAM
)
103 /* can't use the data, wait for header packet */
/*
 * Fewer bytes were read than the header announces: record how many bytes
 * are still outstanding so that the caller can collect them later.
 */
105 if (packet_size
< uah
->payload_len
+ UDP_AUDIO_HEADER_LEN
)
106 /* we read only a part of the packet */
107 purd
->need_more
= uah
->payload_len
108 + UDP_AUDIO_HEADER_LEN
- packet_size
;
/*
 * add_rn_output(): copy len bytes from buf into the receiver node buffer,
 * starting at offset rn->loaded.
 *
 * Fails with -E_UDP_OVERRUN when enough_space() reports that len bytes do not
 * fit behind the rn->loaded bytes already stored.
 *
 * NOTE(review): the lines before the space check and after the memcpy are
 * not visible in this view; presumably rn->loaded is advanced by len and a
 * success value is returned there -- confirm.
 */
112 static int add_rn_output(struct receiver_node
*rn
, char *buf
, size_t len
)
116 if (!enough_space(len
, rn
->loaded
))
117 return -E_UDP_OVERRUN
;
118 memcpy(rn
->buf
+ rn
->loaded
, buf
, len
);
/*
 * udp_recv_post_select(): the post_select hook of the udp receiver.
 *
 * Reads up to UDP_RECV_CHUNK_SIZE bytes from the receiver's socket into a
 * local buffer, parses and validates the udp audio header found there and
 * appends the resulting data to the receiver node buffer via
 * add_rn_output(). Failures are reported through t->error.
 *
 * NOTE(review): many lines of this function (declarations of ret,
 * packet_size, num, data_len and data_buf, several returns, labels and
 * gotos) are not visible in this view; the comments below describe only what
 * the visible statements do.
 */
123 static void udp_recv_post_select(__a_unused
struct sched
*s
, struct task
*t
)
125 struct receiver_node
*rn
= container_of(t
, struct receiver_node
, task
);
126 struct private_udp_recv_data
*purd
= rn
->private_data
;
/* NOTE(review): with UDP_RECV_CHUNK_SIZE at 128 KiB this buffer takes 128 KiB of stack per call. */
128 char tmpbuf
[UDP_RECV_CHUNK_SIZE
];
132 struct udp_audio_header uah
;
/* A negative value behind rn->output_error becomes this task's error. */
134 if (rn
->output_error
&& *rn
->output_error
< 0) {
135 t
->error
= *rn
->output_error
;
/* NOTE(review): s is marked __a_unused in the parameter list but is dereferenced here. */
138 if (!FD_ISSET(purd
->fd
, &s
->rfds
))
140 ret
= recv_bin_buffer(purd
->fd
, tmpbuf
, UDP_RECV_CHUNK_SIZE
);
/* EINTR and EAGAIN are singled out; the action taken for them is on a line not visible here. */
142 if (is_errno(ret
, EINTR
) || is_errno(ret
, EAGAIN
))
147 t
->error
= -E_RECV_EOF
;
/* No bytes outstanding from an earlier short read: parse a udp audio header from the start of tmpbuf. */
154 if (!purd
->need_more
) {
155 ret
= read_udp_audio_header(tmpbuf
, packet_size
, &uah
);
158 goto success
; /* drop data */
/*
 * Bytes of an earlier packet are still outstanding: append at most need_more
 * bytes of this read to the output.
 * NOTE(review): the (uint16_t) cast truncates packet_size if it ever exceeds
 * 65535 -- confirm this cannot happen.
 */
160 num
= PARA_MIN(purd
->need_more
, (uint16_t)packet_size
);
162 t
->error
= add_rn_output(rn
, tmpbuf
, num
);
165 purd
->need_more
-= num
;
166 if (packet_size
<= num
)
/*
 * Move the bytes that follow the consumed part to the front of tmpbuf.
 * NOTE(review): presumably packet_size was reduced by num on a line not
 * shown; otherwise this would read past the bytes received -- verify.
 */
169 memmove(tmpbuf
, tmpbuf
+ num
, packet_size
);
171 assert(!purd
->need_more
);
172 t
->error
= examine_audio_header(purd
, &uah
, packet_size
);
/* Default: pass on payload_len bytes located after the first UDP_AUDIO_HEADER_LEN bytes of tmpbuf. */
175 data_len
= uah
.payload_len
;
176 data_buf
= tmpbuf
+ UDP_AUDIO_HEADER_LEN
;
/* Header packets: either strip the audio file header or forward nothing but it. */
177 if (uah
.packet_type
== UDP_HEADER_PACKET
) {
178 if (purd
->have_header
) { /* skip header */
179 data_buf
+= uah
.header_len
;
180 data_len
-= uah
.header_len
;
181 } else { /* only use the header */
182 purd
->have_header
= 1;
183 data_len
= uah
.header_len
;
186 t
->error
= add_rn_output(rn
, data_buf
, data_len
);
/*
 * udp_shutdown(): installed as the receiver's shutdown hook by
 * udp_recv_init(). NOTE(review): the body is not visible in this view.
 */
192 static void udp_shutdown(void)
/*
 * udp_recv_close(): the close hook of the udp receiver.
 *
 * Frees the private data that udp_recv_open() attached to the receiver node.
 *
 * NOTE(review): purd is fetched but its use is on lines not visible in this
 * view (presumably the socket in purd->fd is closed there); likewise any
 * release of rn->buf cannot be confirmed from here.
 */
197 static void udp_recv_close(struct receiver_node
*rn
)
199 struct private_udp_recv_data
*purd
= rn
->private_data
;
203 free(rn
->private_data
);
/*
 * udp_recv_parse_config(): the parse_config hook of the udp receiver.
 *
 * Allocates a struct udp_recv_args_info with para_calloc() and runs
 * udp_recv_cmdline_parser() over argc/argv to fill it. A non-zero parser
 * result is mapped to -E_UDP_SYNTAX, success to 1.
 *
 * NOTE(review): the declaration of ret, the handling of a negative ret and
 * the return statement(s) are not visible in this view.
 */
207 static void *udp_recv_parse_config(int argc
, char **argv
)
210 struct udp_recv_args_info
*tmp
=
211 para_calloc(sizeof(struct udp_recv_args_info
));
213 ret
= udp_recv_cmdline_parser(argc
, argv
, tmp
)? -E_UDP_SYNTAX
: 1;
/*
 * udp_recv_open(): the open hook of the udp receiver.
 *
 * Allocates the receiver node buffer (UDP_RECV_CHUNK_SIZE bytes) and the
 * private data, creates the receiving socket from the configured host and
 * port, marks the descriptor non-blocking, resets the stream type to
 * UDP_UNKNOWN_STREAM and logs the endpoint.
 *
 * NOTE(review): the declaration of ret, the checks of both ret values, the
 * assignment to purd->fd, the labels and the return statements are on lines
 * not visible in this view.
 */
220 static int udp_recv_open(struct receiver_node
*rn
)
222 struct private_udp_recv_data
*purd
;
223 struct udp_recv_args_info
*c
= rn
->conf
;
226 rn
->buf
= para_calloc(UDP_RECV_CHUNK_SIZE
);
227 rn
->private_data
= para_calloc(sizeof(struct private_udp_recv_data
));
228 purd
= rn
->private_data
;
229 ret
= create_udp_recv_socket(c
->host_arg
, c
->port_arg
);
233 ret
= mark_fd_nonblocking(purd
->fd
);
/* No audio header has been seen yet; examine_audio_header() fills this in from the first one. */
236 purd
->stream_type
= UDP_UNKNOWN_STREAM
;
237 PARA_NOTICE_LOG("receiving from %s:%d, fd=%d\n", c
->host_arg
,
238 c
->port_arg
, purd
->fd
);
/* NOTE(review): presumably the error path; the label and any release of rn->buf are not visible here. */
241 free(rn
->private_data
);
/*
 * udp_recv_init(): fills in the shutdown, open, close, pre_select,
 * post_select and parse_config hooks of *r with the static functions of
 * this file and sets r->help to the two help arrays of the command line
 * parser. The parser's init function is first run on a local dummy
 * structure.
 *
 * NOTE(review): the end of the r->help initializer and of the function is
 * not visible in this view.
 */
247 * The init function of the udp receiver.
249 * \param r Pointer to the receiver struct to initialize.
251 * Initialize all function pointers of \a r.
253 void udp_recv_init(struct receiver
*r
)
255 struct udp_recv_args_info dummy
;
257 udp_recv_cmdline_parser_init(&dummy
);
258 r
->shutdown
= udp_shutdown
;
259 r
->open
= udp_recv_open
;
260 r
->close
= udp_recv_close
;
261 r
->pre_select
= udp_recv_pre_select
;
262 r
->post_select
= udp_recv_post_select
;
263 r
->parse_config
= udp_recv_parse_config
;
264 r
->help
= (struct ggo_help
) {
265 .short_help
= udp_recv_args_info_help
,
266 .detailed_help
= udp_recv_args_info_detailed_help