From 141b0fd36edcad22ee07c65b101e90064d8567b6 Mon Sep 17 00:00:00 2001 From: Andre Noll Date: Sat, 10 Jan 2009 01:37:49 +0100 Subject: [PATCH] Replace the ortp sender/receiver by the generic udp sender/receiver. The ortp code was broken for quite some time. The new code is smaller and we drop the dependency on libortp. --- Doxyfile | 1 - FEATURES | 2 +- INSTALL | 9 +- Makefile.in | 6 - NEWS | 11 +- README | 8 +- REQUIREMENTS | 5 - configure.ac | 65 +------ error.h | 11 +- net.c | 139 ++++++++++++++ net.h | 3 + ortp.h | 50 ----- ortp_recv.c | 299 ----------------------------- ortp_send.c | 350 ---------------------------------- recv.h | 15 +- send.h | 2 +- server.c | 2 +- server.ggo | 46 ++--- udp_header.h | 138 ++++++++++++++ udp_recv.c | 225 ++++++++++++++++++++++ ortp_recv.ggo => udp_recv.ggo | 10 +- udp_send.c | 291 ++++++++++++++++++++++++++++ vss.c | 8 +- 23 files changed, 859 insertions(+), 837 deletions(-) delete mode 100644 ortp.h delete mode 100644 ortp_recv.c delete mode 100644 ortp_send.c create mode 100644 udp_header.h create mode 100644 udp_recv.c rename ortp_recv.ggo => udp_recv.ggo (53%) create mode 100644 udp_send.c diff --git a/Doxyfile b/Doxyfile index 45acb188..43209a52 100644 --- a/Doxyfile +++ b/Doxyfile @@ -1060,7 +1060,6 @@ INCLUDE_FILE_PATTERNS = # instead of the = operator. PREDEFINED = HAVE_MAD \ - HAVE_ORTP \ HAVE_FAAD \ HAVE_OGGVORBIS \ __GNUC__=4 \ diff --git a/FEATURES b/FEATURES index 100a49c5..8e540a8e 100644 --- a/FEATURES +++ b/FEATURES @@ -6,7 +6,7 @@ Features * Runs on Linux, Mac OS, FreeBSD, NetBSD, Solaris and probably other Unixes * Mp3, ogg vorbis, aac(m4a) support - * Local or remote http, dccp, and ortp network audio streaming + * Local or remote http, dccp, and udp network audio streaming * IPv6 support * Volume normalizer * Stream grabbing at any point in the filter chain diff --git a/INSTALL b/INSTALL index 9cc36f41..4d89153f 100644 --- a/INSTALL +++ b/INSTALL @@ -11,11 +11,10 @@ See << REQUIREMENTS >> -for a list of required software. You don't need everything listed -there. In particular, mp3, ogg vorbis and aac, ortp support is -optional. Autoconf will detect what is installed on your system -and will only try to build those executables that can be built with -your setup. +for a list of required software. You don't need everything listed there. In +particular, mp3, ogg vorbis and aac support are all optional. Autoconf will +detect what is installed on your system and will only try to build those +executables that can be built with your setup. Note that no special library (not even the mp3 decoding library libmad) is needed for para_server if you only want to stream mp3 files. diff --git a/Makefile.in b/Makefile.in index 4e6ef962..30bc63fb 100644 --- a/Makefile.in +++ b/Makefile.in @@ -171,12 +171,6 @@ web/%.man.in.html: man/man1/%.1 man2html $< | sed -e '/^<\/BODY>/,$$d' -e '1,/<\/HEAD>/d' > $@ -ortp_recv.o: ortp_recv.c - $(CC) -c $(CPPFLAGS) $(DEBUG_CPPFLAGS) @ortp_cppflags@ $< - -ortp_send.o: ortp_send.c - $(CC) -c $(CPPFLAGS) $(DEBUG_CPPFLAGS) @ortp_cppflags@ $< - oggdec_filter.o: oggdec_filter.c $(CC) -c $(CPPFLAGS) $(DEBUG_CPPFLAGS) @oggvorbis_cppflags@ $< ogg_afh.o: ogg_afh.c diff --git a/NEWS b/NEWS index 8418cd8a..1b0de7b9 100644 --- a/NEWS +++ b/NEWS @@ -4,11 +4,20 @@ NEWS ---------------------------------------------- 0.3.4 (to be announced) "elliptic inheritance" ---------------------------------------------- + +The new udp sender and several small improvements. + + - The udp sender replaces the ortp sender. The new code is + both smaller and cleaner than the old ortp sender/receiver + code. As the udp sender does not depend on any special + libraries, it is built unconditionally. The default port + for udp streaming now defaults to 8000, like for the http + and the dccp senders/receivers. - new options for mp3dec: --ignore-crc, --bufsize + - new audiod option: --config-file. - Improved help/man pages: The documentation of para_audiod, para_recv, para_filter and para_write now also contains all options of the available receivers/filters/writers. - - new audiod option: --config-file. -------------------------------------------- 0.3.3 (2008-12-01) "axiomatic perspectivity" diff --git a/README b/README index 3308d0b3..dd0dc76a 100644 --- a/README +++ b/README @@ -18,7 +18,7 @@ accepts commands such as play, stop, pause, next from authenticated clients. However, there are many more commands. It supports three built-in network streaming methods (senders): http, dccp, -or rtp. +or udp. * The http sender is recommended for public streams that can be played by any player like mpg123, xmms, itunes, winamp... @@ -26,7 +26,7 @@ or rtp. * The dccp sender requires kernel support for the datagram congestion control protocol. - * The ortp sender is recommended for multicast LAN streaming. + * The udp sender is recommended for multicast LAN streaming. It is possible to activate more than one sender simultaneously. @@ -58,7 +58,7 @@ Its features include Despite of all these features, paraslash is lightweight. The stripped binary of para_server with all its features compiled in -mp3/ogg/aac support, http/dccp/ortp support) is about 160K on i386 +mp3/ogg/aac support, http/dccp/udp support) is about 160K on i386 under Linux. para_audiod (see below) is even smaller. ----------- @@ -79,7 +79,7 @@ encrypted with a symmetric rc4 session key. para_recv --------- -A command line http/dccp/ortp stream grabber. The http mode of this +A command line http/dccp/udp stream grabber. The http mode of this tool can be used to receive data from any http streaming source. ----------- diff --git a/REQUIREMENTS b/REQUIREMENTS index b03c4c3f..889ad5f9 100644 --- a/REQUIREMENTS +++ b/REQUIREMENTS @@ -42,11 +42,6 @@ Optional features: http://www.audiocoding.com/. Debian package: libfaad-dev. - - *ortp*: - If you intend to use the optional ortp streamer, you'll - need libortp which is contained in the linphone source code: - http://www.linphone.org/index.php/eng - - On Linux, you'll need to have ALSA's development package installed. The Debian package is called libasound2-dev. diff --git a/configure.ac b/configure.ac index 664280ac..eb1cea63 100644 --- a/configure.ac +++ b/configure.ac @@ -83,17 +83,18 @@ daemon stat crypt http_send close_on_fork ipc acl afh fade amp_filter dccp_send fd user_list chunk_queue afs osl aft mood score attribute blob ringbuffer playlist sha1 rbtree sched audiod grab_client filter_common wav_filter compress_filter http_recv dccp_recv recv_common write_common file_write audiod_command -client_common recv stdout filter stdin audioc write client fsck exec send_common ggo" +client_common recv stdout filter stdin audioc write client fsck exec send_common ggo +udp_recv udp_send" all_executables="server recv filter audioc write client fsck afh" -recv_cmdline_objs="recv.cmdline http_recv.cmdline dccp_recv.cmdline" +recv_cmdline_objs="recv.cmdline http_recv.cmdline dccp_recv.cmdline udp_recv.cmdline" recv_errlist_objs="http_recv recv_common recv time string net dccp_recv - fd sched stdout ggo" + fd sched stdout ggo udp_recv" recv_ldflags="" -receivers=" http dccp" -senders=" http dccp" +receivers=" http dccp udp" +senders=" http dccp udp" filter_cmdline_objs="filter.cmdline compress_filter.cmdline amp_filter.cmdline" filter_errlist_objs="filter_common wav_filter compress_filter filter string stdin stdout sched fd amp_filter ggo" @@ -106,11 +107,11 @@ audioc_ldflags="" audiod_cmdline_objs="audiod.cmdline grab_client.cmdline compress_filter.cmdline http_recv.cmdline dccp_recv.cmdline file_write.cmdline client.cmdline - audiod_command_list amp_filter.cmdline" + audiod_command_list amp_filter.cmdline udp_recv.cmdline" audiod_errlist_objs="audiod signal string daemon stat net time grab_client filter_common wav_filter compress_filter amp_filter http_recv dccp_recv recv_common fd sched write_common file_write audiod_command crypt - client_common ggo" + client_common ggo udp_recv" audiod_ldflags="" audiod_audio_formats="" @@ -122,7 +123,7 @@ server_cmdline_objs="server.cmdline server_command_list afs_command_list" server_errlist_objs="server afh_common mp3_afh vss command net string signal time daemon stat crypt http_send close_on_fork ipc dccp_send fd user_list chunk_queue afs osl aft mood score attribute - blob playlist sha1 rbtree sched acl send_common" + blob playlist sha1 rbtree sched acl send_common udp_send" server_ldflags="" server_audio_formats=" mp3" @@ -500,52 +501,6 @@ fi CPPFLAGS="$OLD_CPPFLAGS" LDFLAGS="$OLD_LDFLAGS" LIBS="$OLD_LIBS" -########################################################################### ortp -have_ortp="yes" -OLD_CPPFLAGS="$CPPFLAGS" -OLD_LD_FLAGS="$LDFLAGS" -OLD_LIBS="$LIBS" -AC_ARG_WITH(ortp_headers, [AC_HELP_STRING(--with-ortp-headers=dir, - [look for ortp/ortp.h also in dir])]) -if test -n "$with_ortp_headers"; then - ortp_cppflags="-I$with_ortp_headers" - CPPFLAGS="$CPPFLAGS $ortp_cppflags" -fi -AC_ARG_WITH(ortp_libs, [AC_HELP_STRING(--with-ortp-libs=dir, - [look for libortp also in dir])]) -if test -n "$with_ortp_libs"; then - ortp_libs="-L$with_ortp_libs" - LDFLAGS="$LDFLAGS $ortp_libs" -fi -AC_CHECK_HEADERS([ortp/ortp.h], [], [have_ortp="no"]) -AC_CHECK_LIB([ortp], [ortp_init], [], [have_ortp="no"]) -if test "$have_ortp" = "yes"; then - all_errlist_objs="$all_errlist_objs ortp_recv ortp_send" - - recv_cmdline_objs="$recv_cmdline_objs ortp_recv.cmdline" - recv_errlist_objs="$recv_errlist_objs ortp_recv" - - audiod_cmdline_objs="$audiod_cmdline_objs ortp_recv.cmdline" - audiod_errlist_objs="$audiod_errlist_objs ortp_recv" - - server_errlist_objs="$server_errlist_objs ortp_send" - - recv_ldflags="$recv_ldflags $ortp_libs -lortp" - server_ldflags="$server_ldflags $ortp_libs -lortp" - audiod_ldflags="$audiod_ldflags $ortp_libs -lortp" - - receivers="$receivers ortp" - senders="$senders ortp" - AC_DEFINE(HAVE_ORTP, 1, [define to 1 to turn on ortp support]) - AC_SUBST(ortp_cppflags) - AC_SUBST(ortp_libs) -else - AC_MSG_NOTICE([deactivating ortp sender/receiver]) -fi -CPPFLAGS="$OLD_CPPFLAGS" -LDFLAGS="$OLD_LDFLAGS" -LIBS="$OLD_LIBS" - AC_SUBST(extra_binaries, [$extras]) AC_SUBST(extra_filter_objs, [$extra_filter_objs]) @@ -735,7 +690,7 @@ AC_DEFINE_UNQUOTED(AUDIOD_AUDIO_FORMAT_ARRAY, $names, array of audio formats sup AC_OUTPUT AC_MSG_NOTICE([creating Makefile.deps]) -gcc -MM -MG $faad_cppflags $mad_cppflags $ortp_cppflags $oggvorbis_cppflags *.c > Makefile.deps +gcc -MM -MG $faad_cppflags $mad_cppflags $oggvorbis_cppflags *.c > Makefile.deps AC_MSG_NOTICE([ paraslash configuration: ~~~~~~~~~~~~~~~~~~~~~~~~ diff --git a/error.h b/error.h index 37ff48a9..28c7e867 100644 --- a/error.h +++ b/error.h @@ -17,7 +17,7 @@ DEFINE_ERRLIST_OBJECT_ENUM; #define TIME_ERRORS #define CLOSE_ON_FORK_ERRORS #define DAEMON_ERRORS -#define ORTP_SEND_ERRORS +#define UDP_SEND_ERRORS #define GUI_ERRORS #define RINGBUFFER_ERRORS #define SCORE_ERRORS @@ -207,13 +207,12 @@ extern const char **para_errlist[]; PARA_ERROR(RECV_PATTERN, "did not receive expected pattern"), \ -#define ORTP_RECV_ERRORS \ - PARA_ERROR(MSG_TO_BUF, "failed to extract rtp packet"), \ - PARA_ERROR(ORTP_SYNTAX, "ortp syntax error"), \ - PARA_ERROR(TOO_MANY_BAD_CHUNKS, "too many consecutive bad chunks"), \ +#define UDP_RECV_ERRORS \ + PARA_ERROR(UDP_NO_MAGIC, "not enough magic"), \ + PARA_ERROR(UDP_SYNTAX, "udp_recv syntax error"), \ + PARA_ERROR(UDP_SHORT_PACKET, "udp data packet too short"), \ PARA_ERROR(INVALID_HEADER, "invalid header packet"), \ PARA_ERROR(OVERRUN, "output buffer overrun"), \ - PARA_ERROR(ORTP_RECV_EOF, "ortp_recv: end of file"), \ #define HTTP_RECV_ERRORS \ diff --git a/net.c b/net.c index 9309ac1f..99183030 100644 --- a/net.c +++ b/net.c @@ -755,3 +755,142 @@ out: free(buf); return ret; } + +static int resolve(const char *hostname, unsigned short port, + struct sockaddr_in *addr) +{ + struct hostent *host; + + assert(hostname); + host = gethostbyname(hostname); + if (!host) + return -ERRNO_TO_PARA_ERROR(h_errno); + if (addr) { + memcpy(&addr->sin_addr, host->h_addr_list[0], host->h_length); + addr->sin_port = port; + } + return 1; +} + +/* + * Create an UDP socket. + * + * If the given address is a multicast adress, the socket will be set + * to use the multicast TTL ttl and sets the datagrams to loop back. + * + * \return The fd of the socket on success, negative on errors. + */ +static int create_udp_socket(struct sockaddr_in *addr, + unsigned short port, unsigned char ttl) +{ + int ret, fd, yes = 1; + + assert(addr); + ret = socket(PF_INET, SOCK_DGRAM, 0); + if (ret < 0) + return -ERRNO_TO_PARA_ERROR(errno); + fd = ret; + /* reuse addresses */ + ret = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)); + if (ret < 0) { + ret = -ERRNO_TO_PARA_ERROR(errno); + goto err; + } + addr->sin_family = AF_INET; + addr->sin_port = htons(port); + /* set the TTL and turn on multicast loop */ + if (IN_MULTICAST(htonl(addr->sin_addr.s_addr))) { + unsigned char loop = 1; + ret = setsockopt(fd, IPPROTO_IP, IP_MULTICAST_TTL, &ttl, + sizeof(ttl)); + if (ret < 0) { + ret = -ERRNO_TO_PARA_ERROR(errno); + goto err; + } + ret = setsockopt(fd, IPPROTO_IP, IP_MULTICAST_LOOP, &loop, + sizeof(loop)); + if (ret < 0) { + ret = -ERRNO_TO_PARA_ERROR(errno); + goto err; + } + } + return fd; +err: + close(fd); + return ret; +} + +/** + * Create and connect a sending UDP socket. + * + * \param hostname Where to send to (name or IPv4 address). + * \param port The udp port to use. + * \param ttl Time to live (only relevant for multicast). + * + * \return The fd of the socket on success, negative on error. + */ +int create_udp_send_socket(char *hostname, unsigned short port, + unsigned char ttl) +{ + struct sockaddr_in addr; + int fd, ret = resolve(hostname, port, &addr); + + if (ret < 0) + return ret; + ret = create_udp_socket(&addr, port, ttl); + if (ret < 0) + return ret; + fd = ret; + ret = connect(fd, (struct sockaddr *)&addr, sizeof(addr)); + if (ret >= 0) + return fd; + ret = -ERRNO_TO_PARA_ERROR(errno); + close(fd); + return ret; +} + +/** + * Create and bind a receiving UDP socket. + * + * Bind the created UDP socket to \a hostname, and add multicast membership if + * hostname is a multicast hostname. + * + * \param hostname Name or IPv4 address to receive from. + * \param port The udp port. + * + * \return The fd of the socket on success, negative on errors. + */ +int create_udp_recv_socket(char *hostname, unsigned short port) +{ + struct sockaddr_in addr; + int fd, ret = resolve(hostname, port, &addr); + + if (ret < 0) + memset(&addr.sin_addr, 0, sizeof(addr.sin_addr)); + ret = create_udp_socket(&addr, port, 1); + if (ret < 0) + return ret; + fd = ret; + ret = bind(fd, (struct sockaddr *)&addr, sizeof(addr)); + if (ret < 0) { + ret = -ERRNO_TO_PARA_ERROR(errno); + goto err; + } + /* Add multicast membership */ + if (IN_MULTICAST(htonl(addr.sin_addr.s_addr))) { + struct ip_mreq mreq; + + mreq.imr_multiaddr.s_addr = addr.sin_addr.s_addr; + mreq.imr_interface.s_addr = htonl(INADDR_ANY); + ret = setsockopt(fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, + &mreq, sizeof(mreq)); + if (ret < 0) { + ret = -ERRNO_TO_PARA_ERROR(errno); + goto err; + } + } + return fd; +err: + close(fd); + return ret; +} diff --git a/net.h b/net.h index 0cb2767c..54657f8b 100644 --- a/net.h +++ b/net.h @@ -61,3 +61,6 @@ int recv_pattern(int fd, const char *pattern, size_t bufsize); void enable_crypt(int fd, crypt_function *recv_f, crypt_function *send_f, void *private_data); void disable_crypt(int fd); +int create_udp_recv_socket(char *hostname, unsigned short port); +int create_udp_send_socket(char *hostname, unsigned short port, + unsigned char ttl); diff --git a/ortp.h b/ortp.h deleted file mode 100644 index a3205b3d..00000000 --- a/ortp.h +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Copyright (C) 2006-2008 Andre Noll - * - * Licensed under the GPL v2. For licencing details see COPYING. - */ - -/** \file ortp.h some macros used by ortp_send.c and ortp_recv.c*/ - -/** the possible packet types */ -enum ortp_audio_packet_type {ORTP_EOF, ORTP_BOF, ORTP_HEADER, ORTP_DATA}; - -/** number of bytes of the paraslash ortp header */ -#define ORTP_AUDIO_HEADER_LEN 10 - -/** write type of this packet to \a buf */ -#define WRITE_PACKET_TYPE(buf, x) (buf)[0] = (unsigned char)((x)&0xff) -/** get type of this packet */ -#define READ_PACKET_TYPE(buf) (unsigned)(buf)[0] - -/** write the chunk time for this packet to \a buf */ -#define WRITE_CHUNK_TIME(buf, x) (buf)[1] = (unsigned char)((x)&0xff);\ - (buf)[2] = (unsigned char)(((x)>>8)&0xff);\ - (buf)[3] = (unsigned char)(((x)>>16)&0xff);\ - (buf)[4] = (unsigned char)(((x)>>24)&0xff); - -/** get the chunk time of this packet */ -#define READ_CHUNK_TIME(buf) (unsigned char)(buf)[1] + \ - ((unsigned char)(buf)[2] << 8) + \ - ((unsigned char)(buf)[3] << 16) + \ - ((unsigned char)(buf)[4] << 24) - -/** write the chunk timestamp */ -#define WRITE_CHUNK_TS(buf, x) (buf)[5] = (unsigned char)((x) & 0xff); \ - (buf)[6] = (unsigned char)(((x >> 8) & 0xff)); -/** get the chunk timestamp */ -#define READ_CHUNK_TS(buf) (unsigned char)(buf)[5] + \ - ((unsigned char)(buf)[6] << 8) - -/** write the stream type (header or headerless) */ -#define WRITE_STREAM_TYPE(buf, x) (buf)[7] = (unsigned char)((x)&0xff) -/** get the type of the stream (header or headerless) */ -#define READ_STREAM_TYPE(buf) (unsigned)(buf)[7] - -/** write the length of the header (only used for streams with header) */ -#define WRITE_HEADER_LEN(buf, x) (buf)[8] = (unsigned char)((x) & 0xff); \ - (buf)[9] = (unsigned char)(((x >> 8) & 0xff)); - -/** get the length of the header (only used for packets containing a header) */ -#define READ_HEADER_LEN(buf) (unsigned char)(buf)[8] + \ - ((unsigned char)(buf)[9] << 8) diff --git a/ortp_recv.c b/ortp_recv.c deleted file mode 100644 index 102fe9ac..00000000 --- a/ortp_recv.c +++ /dev/null @@ -1,299 +0,0 @@ -/* - * Copyright (C) 2005-2008 Andre Noll - * - * Licensed under the GPL v2. For licencing details see COPYING. - */ -/** \file ortp_recv.c paraslash's ortp receiver */ - -#include -#include "para.h" - -#include "ortp.h" -#include "list.h" -#include "sched.h" -#include "ggo.h" -#include "recv.h" -#include "ortp_recv.cmdline.h" - -#include "error.h" -#include "audiod.h" -#include "string.h" - -#define CHUNK_SIZE 128 * 1024 - -/** - * data specific to the ortp receiver - * - * \sa receiver receiver_node - */ -struct private_ortp_recv_data { -/** - * - * - * whether a header was received - * - * A flag indicating whether the ortp receiver already received a packet which - * contains the audio file header. - * - * This flag has no effect if the audio stream indicates that no extra headers - * will be sent (mp3). Otherwise, all data packets are dropped until the - * header is received. - */ -int have_header; -/** the ortp session this instance of the receiver belongs to */ -RtpSession *session; -/** the time the first data or header packet was received */ -struct timeval start; -/** ETA of the next chunk */ -struct timeval next_chunk; -/** number of consecutive bad chunks */ -int c_bad; -/** the current timestamp which is passed to the receiving function of libortp */ -uint32_t timestamp; -/** \a timestamp increases by this amount */ -uint32_t chunk_ts; -}; - - -static int msg_to_buf(mblk_t *mp, char *buffer, int len) -{ - mblk_t *m, *mprev; - size_t mlen, rlen = len; - - m = mp->b_cont; - mprev = mp; - while (m != NULL) { - mlen = m->b_wptr - m->b_rptr; - if (mlen <= rlen) { - mblk_t *consumed = m; - memcpy(buffer, m->b_rptr, mlen); - /* go to next mblk_t */ - mprev->b_cont = m->b_cont; - m = m->b_cont; - consumed->b_cont = NULL; - freeb (consumed); - buffer += mlen; - rlen -= mlen; - } else { - memcpy (buffer, m->b_rptr, rlen); - m->b_rptr += rlen; - return len; - } - } - return len - rlen; -} - - -static void ortp_recv_pre_select(struct sched *s, struct task *t) -{ - struct receiver_node *rn = container_of(t, struct receiver_node, task); - struct private_ortp_recv_data *pord = rn->private_data; - struct timeval tmp; - - if (tv_diff(now, &pord->next_chunk, &tmp) >= 0) { - tmp.tv_sec = 0; - tmp.tv_usec = 1000; - } - if (tv_diff(&s->timeout, &tmp, NULL) > 0) - s->timeout = tmp; -} - -static void compute_next_chunk(unsigned chunk_time, - struct private_ortp_recv_data *pord) -{ - struct timeval chunk_tv = {0, chunk_time}; - struct timeval tmp; - - tv_add(&chunk_tv, &pord->next_chunk, &tmp); - pord->next_chunk = tmp; - pord->timestamp += pord->chunk_ts; -// PARA_DEBUG_LOG("next chunk (ts = %d) due at %lu:%lu\n", -// pord->timestamp, pord->next_chunk.tv_sec, -// pord->next_chunk.tv_usec); -} - -static void ortp_recv_post_select(__a_unused struct sched *s, struct task *t) -{ - struct receiver_node *rn = container_of(t, struct receiver_node, task); - struct private_ortp_recv_data *pord = rn->private_data; - mblk_t *mp; - int packet_type, stream_type; - char tmpbuf[CHUNK_SIZE + 3]; - unsigned chunk_time; - size_t packet_size; - -// PARA_INFO_LOG("rn: %p, pord: %p, session: %p\n", rn, pord, pord->session); - if (rn->output_error && *rn->output_error < 0) { - t->error = *rn->output_error; - return; - } - if (pord->start.tv_sec) - if (tv_diff(now, &pord->next_chunk, NULL) < 0) - return; - mp = rtp_session_recvm_with_ts(pord->session, pord->timestamp); - if (!mp) { - struct timeval min_delay = {0, 100}; -// PARA_INFO_LOG("nope, chunk_ts = %d, loaded: %d, bad: %d\n", -// pord->timestamp, rn->loaded, pord->c_bad); - pord->c_bad++; - if ((pord->c_bad > 5000 && pord->start.tv_sec) || pord->c_bad > 10000) { - t->error = -E_TOO_MANY_BAD_CHUNKS; - return; - } - tv_add(now, &min_delay, &pord->next_chunk); - return; - } - /* okay, we have a chunk of data */ - if (!pord->start.tv_sec) - pord->start = *now; - t->error = msg_to_buf(mp, tmpbuf, CHUNK_SIZE); - if (t->error < ORTP_AUDIO_HEADER_LEN) { - if (t->error < 0) - t->error = -E_MSG_TO_BUF; - else - t->error = -E_ORTP_RECV_EOF; - goto err_out; - } - packet_size = t->error; - packet_type = READ_PACKET_TYPE(tmpbuf); - stream_type = READ_STREAM_TYPE(tmpbuf); - chunk_time = READ_CHUNK_TIME(tmpbuf); - pord->chunk_ts = READ_CHUNK_TS(tmpbuf); -// PARA_INFO_LOG("packet type: %d, stream type: %d, chunk time: %u," -// " chunk_ts: %u, loaded: %u, bad: %d\n", packet_type, -// stream_type, chunk_time, pord->chunk_ts, rn->loaded, pord->c_bad); - switch (packet_type) { - unsigned header_len, payload_len; - case ORTP_EOF: - t->error = -E_RECV_EOF; - goto err_out; - case ORTP_BOF: - PARA_INFO_LOG("bof (%zu)\n", packet_size); - pord->have_header = 1; - /* fall through */ - case ORTP_DATA: - if (!pord->have_header && stream_type) - /* can't use the data, wait for header */ - goto success; - if (packet_size + rn->loaded >= CHUNK_SIZE + ORTP_AUDIO_HEADER_LEN) { - t->error = -E_OVERRUN; - goto err_out; - } - if (packet_size > ORTP_AUDIO_HEADER_LEN) { - memcpy(rn->buf + rn->loaded, tmpbuf + ORTP_AUDIO_HEADER_LEN, - packet_size - ORTP_AUDIO_HEADER_LEN); - rn->loaded += packet_size - ORTP_AUDIO_HEADER_LEN; - } - goto success; - case ORTP_HEADER: - header_len = READ_HEADER_LEN(tmpbuf); - PARA_DEBUG_LOG("header packet (%zu bytes), header len: %d\n", - packet_size, header_len); - if (!pord->have_header) { - pord->have_header = 1; - memcpy(rn->buf, tmpbuf + ORTP_AUDIO_HEADER_LEN, - packet_size - ORTP_AUDIO_HEADER_LEN); - rn->loaded = packet_size - ORTP_AUDIO_HEADER_LEN; - goto success; - } - if (header_len + ORTP_AUDIO_HEADER_LEN > packet_size) { - t->error = -E_INVALID_HEADER; - goto err_out; - } - payload_len = packet_size - ORTP_AUDIO_HEADER_LEN - header_len; - if (rn->loaded + payload_len > CHUNK_SIZE) { - t->error = -E_OVERRUN; - goto err_out; - } - if (payload_len) - memcpy(rn->buf + rn->loaded, tmpbuf - + (packet_size - payload_len), payload_len); - rn->loaded += payload_len; - } -success: - t->error = 0; - freemsg(mp); - if (pord->c_bad) { - pord->c_bad = 0; - pord->next_chunk = *now; - } - compute_next_chunk(chunk_time, pord); - return; -err_out: - freemsg(mp); -} - -static void ortp_shutdown(void) -{ -// ortp_global_stats_display(); - ortp_exit(); -} - -static void ortp_recv_close(struct receiver_node *rn) -{ - struct private_ortp_recv_data *pord = rn->private_data; - - rtp_session_destroy(pord->session); - free(rn->private_data); - free(rn->buf); -} - -static void *ortp_recv_parse_config(int argc, char **argv) -{ - int ret; - - struct ortp_recv_args_info *tmp = - para_calloc(sizeof(struct ortp_recv_args_info)); - - ret = ortp_recv_cmdline_parser(argc, argv, tmp)? -E_ORTP_SYNTAX : 1; - if (ret > 0) - return tmp; - free(tmp); - return NULL; -} - -static int ortp_recv_open(struct receiver_node *rn) -{ - struct private_ortp_recv_data *pord; - struct ortp_recv_args_info *c = rn->conf; - - rn->buf = para_calloc(CHUNK_SIZE); - - rn->private_data = para_calloc(sizeof(struct private_ortp_recv_data)); - pord = rn->private_data; - pord->session = rtp_session_new(RTP_SESSION_RECVONLY); - PARA_NOTICE_LOG("receiving from %s:%d\n", c->host_arg, c->port_arg); - rtp_session_set_local_addr(pord->session, c->host_arg, c->port_arg); - rtp_session_set_remote_addr(pord->session, c->host_arg, c->port_arg); - rtp_session_set_payload_type(pord->session, PAYLOAD_AUDIO_CONTINUOUS); - rtp_session_enable_adaptive_jitter_compensation(pord->session, TRUE); - rtp_session_set_jitter_compensation(pord->session, - c->jitter_compensation_arg); - return 1; -} - -/** - * the init function of the ortp receiver - * - * \param r pointer to the receiver struct to initialize - * - * Initialize all function pointers of \a r and call libortp's ortp_init(). - */ -void ortp_recv_init(struct receiver *r) -{ - struct ortp_recv_args_info dummy; - - ortp_recv_cmdline_parser_init(&dummy); - r->shutdown = ortp_shutdown; - r->open = ortp_recv_open; - r->close = ortp_recv_close; - r->pre_select = ortp_recv_pre_select; - r->post_select = ortp_recv_post_select; - r->parse_config = ortp_recv_parse_config; - r->help = (struct ggo_help) { - .short_help = ortp_recv_args_info_help, - .detailed_help = ortp_recv_args_info_detailed_help - }; - - ortp_init(); -} diff --git a/ortp_send.c b/ortp_send.c deleted file mode 100644 index 17f14d17..00000000 --- a/ortp_send.c +++ /dev/null @@ -1,350 +0,0 @@ -/* - * Copyright (C) 2005-2008 Andre Noll - * - * Licensed under the GPL v2. For licencing details see COPYING. - */ - -/** \file ortp_send.c para_server's ortp sender */ - -#include -#include - -#include "server.cmdline.h" -#include "para.h" -#include "error.h" -#include "string.h" -#include "afh.h" -#include "afs.h" -#include "server.h" -#include "vss.h" -#include "list.h" -#include "send.h" -#include "ortp.h" - -/** Convert in_addr to ascii. */ -#define TARGET_ADDR(oc) inet_ntoa((oc)->addr) - -/** Describes one entry in the list of targets for the ortp sender. */ -struct ortp_target { - /** Address info. */ - struct in_addr addr; - /** Whether the ortp sender is activated. */ - int status; - /** The ortp timestamp increases by this amount. */ - uint32_t chunk_ts; - /** The currently used timestamp for this target. */ - uint32_t last_ts; - /** The position of this target in the list of targets. */ - struct list_head node; - /** The UDP port. */ - int port; - /** Non-zero if at least one chunk has been sent to this target. */ - int streaming; - /** The session pointer from libortp. */ - RtpSession *session; -}; - -static struct list_head targets; -static struct sender *self; -static int sender_status; - -static void ortp_delete_target(struct ortp_target *ot, const char *msg) -{ - PARA_NOTICE_LOG("deleting %s:%d (%s) from list\n", TARGET_ADDR(ot), - ot->port, msg); - if (ot->session) { - rtp_session_destroy(ot->session); - ot->session = NULL; - } - list_del(&ot->node); - free(ot); -} - -static void ortp_send_buf(char *buf, size_t len, long unsigned chunks_sent) -{ - struct ortp_target *ot, *tmp; - int ret, ortp_len = len; /* rtp_session_send_with_ts expects int */ - - if (ortp_len < 0) - return; - list_for_each_entry_safe(ot, tmp, &targets, node) { - uint32_t ts; - if (!ot->session) - continue; - WRITE_CHUNK_TS(buf, ot->chunk_ts); - ts = ot->chunk_ts * chunks_sent; - ret = rtp_session_send_with_ts(ot->session, - (unsigned char*)buf, ortp_len, ts); - ot->last_ts = ts; - if (ret < 0) - ortp_delete_target(ot, "send error"); - if (ret != len + 12) - PARA_NOTICE_LOG("short write %d\n", ret); - } -} - -static int set_multicast(RtpSession *s) -{ - unsigned char loop = 1; - int ret; - - ret = setsockopt(s->rtp.socket, - IPPROTO_IP, IP_MULTICAST_LOOP, &loop, sizeof(loop)); - if (ret < 0) { - PARA_ERROR_LOG("IP_MULTICAST_LOOP error %d\n", ret); - - } - return 1; -} - -static void ortp_init_session(struct ortp_target *ot) -{ - RtpSession *s; - int ret; - - PARA_NOTICE_LOG("sending to udp %s:%d\n", TARGET_ADDR(ot), ot->port); - ot->session = rtp_session_new(RTP_SESSION_SENDONLY); - if (!ot->session) - return; - s = ot->session; - if (conf.ortp_jitter_compensation_arg) { - rtp_session_enable_adaptive_jitter_compensation(ot->session, TRUE); - rtp_session_set_jitter_compensation(ot->session, - conf.ortp_jitter_compensation_arg); - } - /* always successful */ - rtp_session_set_send_payload_type(s, PAYLOAD_AUDIO_CONTINUOUS); - ret = rtp_session_set_remote_addr(s, TARGET_ADDR(ot), ot->port); - if (ret < 0) { - rtp_session_destroy(ot->session); - ot->session = NULL; - return; - } - set_multicast(s); -} - -/* called by vss */ -static void ortp_shutdown_targets(void) -{ - unsigned char buf[ORTP_AUDIO_HEADER_LEN]; - struct ortp_target *ot, *tmp; - - WRITE_PACKET_TYPE(buf, ORTP_EOF); - list_for_each_entry_safe(ot, tmp, &targets, node) { - if (!ot->session || !ot->streaming) - continue; - PARA_INFO_LOG("sending eof to ortp target %s:%d, ts = %d\n", - TARGET_ADDR(ot), ot->port, ot->last_ts); - rtp_session_send_with_ts(ot->session, buf, - ORTP_AUDIO_HEADER_LEN, ot->last_ts); - ot->streaming = 0; - ot->chunk_ts = 0; - rtp_session_reset(ot->session); - } -} - -static int need_extra_header(long unsigned current_chunk) -{ - static struct timeval last_header; - struct timeval now, diff; - - if (!current_chunk) - return 0; - gettimeofday(&now, NULL); - tv_diff(&now, &last_header, &diff); - if (tv2ms(&diff) < conf.ortp_header_interval_arg) - return 0; - last_header = now; - return 1; -} - -static void ortp_send(long unsigned current_chunk, long unsigned chunks_sent, - const char *buf, size_t len, const char *header_buf, - size_t header_len) -{ - struct ortp_target *ot, *tmp; - size_t sendbuf_len; - int packet_type = ORTP_DATA; - char *sendbuf; - struct timeval *chunk_tv; - - if (sender_status != SENDER_ON) - return; - -// PARA_NOTICE_LOG("sending %lu\n", current_chunk); - chunk_tv = vss_chunk_time(); - if (!chunk_tv) - return; - list_for_each_entry_safe(ot, tmp, &targets, node) { - if (!ot->session) { - ortp_init_session(ot); - if (!ot->session) - continue; - } - if (!ot->chunk_ts) - ot->chunk_ts = rtp_session_time_to_ts(ot->session, - (int)tv2ms(chunk_tv)); -// PARA_DEBUG_LOG("len: %d, ts: %lu, ts: %d\n", -// len, ot->chunk_ts * chunks_sent, ot->chunk_ts); - ot->streaming = 1; - } - if (list_empty(&targets)) - return; - if (!need_extra_header(current_chunk)) - header_len = 0; - sendbuf_len = ORTP_AUDIO_HEADER_LEN + header_len + len; - sendbuf = para_malloc(sendbuf_len); - if (!current_chunk) - packet_type = ORTP_BOF; - else if (header_len) - packet_type = ORTP_HEADER; - WRITE_PACKET_TYPE(sendbuf, packet_type); - WRITE_CHUNK_TIME(sendbuf, chunk_tv->tv_usec); - WRITE_STREAM_TYPE(sendbuf, header_buf? 1 : 0); - WRITE_HEADER_LEN(sendbuf, header_len); - if (header_len) - memcpy(sendbuf + ORTP_AUDIO_HEADER_LEN, header_buf, - header_len); - memcpy(sendbuf + ORTP_AUDIO_HEADER_LEN + header_len, buf, len); - ortp_send_buf(sendbuf, sendbuf_len, chunks_sent); - free(sendbuf); -} - -static int ortp_com_on(__a_unused struct sender_command_data *scd) -{ - - sender_status = SENDER_ON; - return 1; -} - -static int ortp_com_off(__a_unused struct sender_command_data *scd) -{ - ortp_shutdown_targets(); - sender_status = SENDER_OFF; - return 1; -} - -static int ortp_com_delete(struct sender_command_data *scd) -{ - char *a = para_strdup(inet_ntoa(scd->addr)); - struct ortp_target *ot, *tmp; - list_for_each_entry_safe(ot, tmp, &targets, node) { - if (scd->port != ot->port) - continue; - if (strcmp(TARGET_ADDR(ot), a)) - continue; - ortp_delete_target(ot, "com_delete"); - } - return 1; -} - -static void ortp_add_target(int port, struct in_addr *addr) -{ - struct ortp_target *ot = para_calloc(sizeof(struct ortp_target)); - ot->port = port; - ot->addr = *addr; - PARA_INFO_LOG("adding to target list (%s:%d)\n", - TARGET_ADDR(ot), ot->port); - para_list_add(&ot->node, &targets); -} - -static int ortp_com_add(struct sender_command_data *scd) -{ - int port = (scd->port > 0)? scd->port : conf.ortp_default_port_arg; - ortp_add_target(port, &scd->addr); - return 1; -} - -static char *ortp_info(void) -{ - struct ortp_target *ot; - char *ret, *tgts = NULL; - - list_for_each_entry(ot, &targets, node) { - char *tmp = make_message("%s%s:%d ", tgts? tgts : "", - TARGET_ADDR(ot), ot->port); - free(tgts); - tgts = tmp; - } - ret = make_message( - "ortp sender:\n" - "\tstatus: %s\n" - "\tport: udp %d\n" - "\ttargets: %s\n", - (sender_status == SENDER_ON)? "on" : "off", - conf.ortp_default_port_arg, - tgts? tgts : "(none)" - ); - free(tgts); - return ret; -} - -static void ortp_init_target_list(void) -{ - int i; - - INIT_LIST_HEAD(&targets); - for (i = 0; i < conf.ortp_target_given; i++) { - char *arg = para_strdup(conf.ortp_target_arg[i]); - char *p = strchr(arg, ':'); - int port; - struct in_addr addr; - - if (!p) - goto err; - *p = '\0'; - if (!inet_pton(AF_INET, arg, &addr)) - goto err; - port = atoi(++p); - if (port < 0 || port > 65535) - port = conf.ortp_default_port_arg; - ortp_add_target(port, &addr); - goto success; -err: - PARA_CRIT_LOG("syntax error for ortp_target option " - "#%d, ignoring\n", i); -success: - free(arg); - continue; - } -} - -static char *ortp_help(void) -{ - return make_message( - "usage: {on|off}\n" - "usage: {add|delete} IP port\n" - "example: add 224.0.1.38 1500 (LAN-streaming)\n" - ); -} - -/** - * the init function of para_server's ortp sender - * - * \param s pointer to the http sender struct - * - * It initializes all function pointers of \a s and the list of ortp targets. - */ -void ortp_send_init(struct sender *s) -{ - ortp_init(); - INIT_LIST_HEAD(&targets); - s->info = ortp_info; - s->help = ortp_help; - s->send = ortp_send; - s->pre_select = NULL; - s->post_select = NULL; - s->shutdown_clients = ortp_shutdown_targets; - s->client_cmds[SENDER_ON] = ortp_com_on; - s->client_cmds[SENDER_OFF] = ortp_com_off; - s->client_cmds[SENDER_DENY] = NULL; - s->client_cmds[SENDER_ALLOW] = NULL; - s->client_cmds[SENDER_ADD] = ortp_com_add; - s->client_cmds[SENDER_DELETE] = ortp_com_delete; - self = s; - sender_status = SENDER_OFF; - ortp_init_target_list(); - if (!conf.ortp_no_autostart_given) - sender_status = SENDER_ON; - PARA_DEBUG_LOG("ortp sender init complete\n"); -} diff --git a/recv.h b/recv.h index 3aed0180..0d1c180e 100644 --- a/recv.h +++ b/recv.h @@ -29,7 +29,7 @@ struct receiver_node { /** * Describes one supported paraslash receiver. * - * \sa http_recv.c, ortp_recv.c + * \sa http_recv.c, udp_recv.c */ struct receiver { /** @@ -41,7 +41,7 @@ struct receiver { * * It must fill in all other function pointers and is assumed to succeed. * - * \sa http_recv_init ortp_recv_init. + * \sa http_recv_init udp_recv_init. */ void (*init)(struct receiver *r); /** @@ -114,13 +114,8 @@ extern void http_recv_init(struct receiver *r); #define HTTP_RECEIVER {.name = "http", .init = http_recv_init}, extern void dccp_recv_init(struct receiver *r); #define DCCP_RECEIVER {.name = "dccp", .init = dccp_recv_init}, - -#ifdef HAVE_ORTP -extern void ortp_recv_init(struct receiver *r); -#define ORTP_RECEIVER {.name = "ortp", .init = ortp_recv_init}, -#else -#define ORTP_RECEIVER -#endif +extern void udp_recv_init(struct receiver *r); +#define UDP_RECEIVER {.name = "udp", .init = udp_recv_init}, extern struct receiver receivers[]; /** \endcond */ @@ -129,7 +124,7 @@ extern struct receiver receivers[]; #define DEFINE_RECEIVER_ARRAY struct receiver receivers[] = { \ HTTP_RECEIVER \ DCCP_RECEIVER \ - ORTP_RECEIVER \ + UDP_RECEIVER \ {.name = NULL}}; #define FOR_EACH_RECEIVER(i) for (i = 0; receivers[i].name; i++) diff --git a/send.h b/send.h index 2fe1cc56..bb4058a7 100644 --- a/send.h +++ b/send.h @@ -12,7 +12,7 @@ enum {SENDER_ADD, SENDER_DELETE, SENDER_ALLOW, SENDER_DENY, SENDER_ON, SENDER_OF /** * Describes one supported sender of para_server. * - * \sa http_send.c ortp_send.c, dccp_send.c. + * \sa http_send.c udp_send.c, dccp_send.c. */ struct sender { /** The name of the sender. */ diff --git a/server.c b/server.c index 0924ace4..77bbc883 100644 --- a/server.c +++ b/server.c @@ -27,7 +27,7 @@ * - Volume normalizer: \ref compress_filter.c, * - Output: \ref alsa_write.c, \ref osx_write.c, * - http: \ref http_recv.c, \ref http_send.c, - * - ortp: \ref ortp_recv.c, \ref ortp_send.c, + * - udp: \ref udp_recv.c, \ref udp_send.c, * - dccp: \ref dccp_recv.c, \ref dccp_send.c, * - Audio file selector: \ref afs.c, \ref aft.c, \ref mood.c, * - Afs structures: \ref afs_table, \ref audio_file_data, diff --git a/server.ggo b/server.ggo index 9f391203..230d1dd0 100644 --- a/server.ggo +++ b/server.ggo @@ -303,48 +303,48 @@ details=" See http_max_clients for details. " -##################### -section "ortp sender" -##################### +#################### +section "udp sender" +#################### -option "ortp_target" - -#~~~~~~~~~~~~~~~~~~~~~ -"add ortp target" +option "udp_target" - +#~~~~~~~~~~~~~~~~~~~~ +"add udp target" string typestr="a.b.c.d:p" optional multiple details=" Add given host/port to the list of targets. This option can be given multiple times. Example: '224.0.1.38:1500' - instructs the ortp sender to send to udp port 1500 on host + instructs the udp sender to send to udp port 1500 on host 224.0.1.38 (unassigned ip in the Local Network Control Block - 224.0.0/24). This is useful for LAN-streaming. + 224.0.0/24). This is useful for multicast streaming. " -option "ortp_no_autostart" - -#~~~~~~~~~~~~~~~~~~~~~~~~~~~ +option "udp_no_autostart" - +#~~~~~~~~~~~~~~~~~~~~~~~~~~ "do not start sending" flag off details=" - If this option is given, ortp streaming may be activated at + If this option is given, udp streaming may be activated at a later time by using the sender command. " -option "ortp_default_port" - -#~~~~~~~~~~~~~~~~~~~~~~~~~~~ +option "udp_default_port" - +#~~~~~~~~~~~~~~~~~~~~~~~~~~ "udp port to send to" int typestr="port" -default="1500" +default="8000" optional -option "ortp_header_interval" H -#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +option "udp_header_interval" H +#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ "duration for sending header" int typestr="ms" default="2000" optional details=" - As the ortp sender has no idea about connected clients it + As the udp sender has no idea about connected clients it sends the audio file header periodically if necessary. This option is used to specify the duration of the interval between sending the header. Shorter values decrease the average time @@ -353,15 +353,3 @@ details=" that this affects only ogg vorbis streams as this is the only audio format that needs an audio file header. " - -option "ortp_jitter_compensation" j -#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -"configure jitter compensation" -int typestr="ms" -default="400" -optional -details=" - ortp's adaptive jitter compensation gets activated whenever - this value is greater than zero. See the ortp documentation - about details on this feature. -" diff --git a/udp_header.h b/udp_header.h new file mode 100644 index 00000000..55518057 --- /dev/null +++ b/udp_header.h @@ -0,0 +1,138 @@ +/* + * Copyright (C) 2006-2009 Andre Noll + * + * Licensed under the GPL v2. For licencing details see COPYING. + */ + +/** \file udp_header.h some macros used by udp_send.c and udp_recv.c. */ + +/** + * Number of bytes of the paraslash udp header. + * + * The udp sender prepends a header at the beginning of each data chunk. Within + * this header, the type of the current audio stream and the * type of this + * data chunk is coded. + */ +#define UDP_AUDIO_HEADER_LEN 8 + +/** The possible stream types. */ +enum udp_stream_type { + /** Used for mp3 and aac streams. */ + UDP_PLAIN_STREAM, + /* Ogg vorbis streams. */ + UDP_HEADER_STREAM +}; + +/** The possible packet types. */ +enum udp_audio_packet_type { + /** End of file. */ + UDP_EOF_PACKET, + /** Beginning of file. */ + UDP_BOF_PACKET, + /** Combined header/data packet (ogg only). */ + UDP_HEADER_PACKET, + /** Packet contains only audio file data. */ + UDP_DATA_PACKET +}; + +/** + * Write the magic bytes to the beginning of a buffer. + * + * \param buf The buffer. + */ +_static_inline_ void udp_write_magic(char *buf) +{ + memcpy(buf, "UDPM", 4); +} + +/** + * Check whether this buffer contains the magic bytes. + * + * \param buf The buffer. + * \param len The number of bytes of \a buf. + * + * \return Positive if \a buf contains the magic bytes, + * -1 otherwise. + */ +_static_inline_ int udp_check_magic(char *buf, size_t len) +{ + if (len < 4) + return -1; + if (memcmp(buf, "UDPM", 4)) + return -1; + return 1; +} + +/** + * Write the type of the audio stream to a buffer. + * + * \param buf The buffer. + * \param type The type to be written. + * + * \sa \ref udp_stream_type. + */ +_static_inline_ void udp_write_stream_type(char *buf, uint8_t type) +{ + write_u8(buf + 4, type); +} + +/** + * Read the type of the audio stream from a buffer. + * + * \param buf The buffer. + * + * \return Either UDP_PLAIN_STREAM or UDP_HEADER_STREAM. + * \sa \ref udp_stream_type. + */ +_static_inline_ uint8_t udp_read_stream_type(char *buf) +{ + return read_u8(buf + 4); +} + +/** + * Write the type of this packet to a buffer. + * + * \param buf The buffer. + * \param type The type to be written. + * + * \sa \ref udp_audio_packet_type. + */ +_static_inline_ void udp_write_packet_type(char *buf, uint8_t type) +{ + write_u8(buf + 5, type); +} + +/** + * Read the type of this buffer. + * + * \param buf The buffer. + * + * \return One of the four differnt packet types. + * \sa \ref udp_stream_type. + */ +_static_inline_ uint8_t udp_read_packet_type(char *buf) +{ + return read_u8(buf + 5); +} + +/** + * Write the length of the header (non-zero only for ogg streams). + * + * \param buf The buffer. + * \param len The length of the header in bytes. + */ +_static_inline_ void udp_write_header_len(char *buf, uint16_t len) +{ + write_u16(buf + 6, len); +} + +/** + * Read the length of the header. + * + * \param buf The buffer. + * \return The header length in bytes. + */ +_static_inline_ uint16_t udp_read_header_len(char *buf) +{ + return read_u16(buf + 6); +} diff --git a/udp_recv.c b/udp_recv.c new file mode 100644 index 00000000..b2563726 --- /dev/null +++ b/udp_recv.c @@ -0,0 +1,225 @@ +/* + * Copyright (C) 2005-2008 Andre Noll + * + * Licensed under the GPL v2. For licencing details see COPYING. + */ +/** \file udp_recv.c Paraslash's udp receiver */ + +#include + +#include "para.h" +#include "portable_io.h" +#include "udp_header.h" +#include "list.h" +#include "sched.h" +#include "ggo.h" +#include "recv.h" +#include "udp_recv.cmdline.h" +#include "error.h" +#include "audiod.h" +#include "string.h" +#include "net.h" +#include "fd.h" + +/** The size of the receiver node buffer. */ +#define UDP_RECV_CHUNK_SIZE (128 * 1024) + +/** + * Data specific to the udp receiver. + * + * \sa \ref receiver, \ref receiver_node. + */ +struct private_udp_recv_data { + /** + * Whether a header was received. + * + * A flag indicating whether this receiver already received a packet + * which contains the audio file header. + * + * This flag has no effect if the audio stream indicates that no extra + * headers will be sent (mp3, aac). Otherwise, all data packets are + * dropped until the header is received. + */ + int have_header; + /** The socket file descriptor. */ + int fd; +}; + +static void udp_recv_pre_select(struct sched *s, struct task *t) +{ + struct receiver_node *rn = container_of(t, struct receiver_node, task); + struct private_udp_recv_data *purd = rn->private_data; + + para_fd_set(purd->fd, &s->rfds, &s->max_fileno); +} + +static int enough_space(size_t packet_size, size_t loaded) +{ + return packet_size + loaded < UDP_RECV_CHUNK_SIZE + UDP_AUDIO_HEADER_LEN; +} + +static void udp_recv_post_select(__a_unused struct sched *s, struct task *t) +{ + struct receiver_node *rn = container_of(t, struct receiver_node, task); + struct private_udp_recv_data *purd = rn->private_data; + int ret; + char tmpbuf[UDP_RECV_CHUNK_SIZE]; + size_t packet_size; + uint8_t stream_type, packet_type; + + if (rn->output_error && *rn->output_error < 0) { + t->error = *rn->output_error; + return; + } + if (!FD_ISSET(purd->fd, &s->rfds)) + return; + ret = recv_bin_buffer(purd->fd, tmpbuf, UDP_RECV_CHUNK_SIZE); + if (ret < 0) { + t->error = 0; + if (errno != EINTR && errno != EAGAIN) + t->error = -ERRNO_TO_PARA_ERROR(errno); + return; + } + packet_size = ret; + if (packet_size < UDP_AUDIO_HEADER_LEN) { + t->error = -E_UDP_SHORT_PACKET; /* FIXME: We shouldn't fail here */ + return; + } + if (udp_check_magic(tmpbuf, packet_size) < 0) { + t->error = -E_UDP_NO_MAGIC; + return; + } + stream_type = udp_read_stream_type(tmpbuf); + packet_type = udp_read_packet_type(tmpbuf); +// PARA_INFO_LOG("packet type: %d, stream type: %d," +// " loaded: %u\n", packet_type, +// (unsigned) stream_type, rn->loaded); + switch (packet_type) { + uint16_t header_len, payload_len; + case UDP_EOF_PACKET: + t->error = -E_RECV_EOF; + return; + case UDP_BOF_PACKET: + PARA_INFO_LOG("bof (%zu)\n", packet_size); + purd->have_header = 1; + /* fall through */ + case UDP_DATA_PACKET: + if (!purd->have_header && stream_type == UDP_HEADER_STREAM) + /* can't use the data, wait for header */ + goto success; + if (!enough_space(packet_size, rn->loaded)) { + t->error = -E_OVERRUN; + return; + } + if (packet_size > UDP_AUDIO_HEADER_LEN) { + memcpy(rn->buf + rn->loaded, tmpbuf + UDP_AUDIO_HEADER_LEN, + packet_size - UDP_AUDIO_HEADER_LEN); + rn->loaded += packet_size - UDP_AUDIO_HEADER_LEN; + } + goto success; + case UDP_HEADER_PACKET: + if (!enough_space(packet_size, rn->loaded)) { + t->error = -E_OVERRUN; + return; + } + header_len = udp_read_header_len(tmpbuf); + if (header_len + UDP_AUDIO_HEADER_LEN > packet_size) { + t->error = -E_INVALID_HEADER; + return; + } +// PARA_DEBUG_LOG("header packet (%zu bytes), header len: %d\n", +// packet_size, header_len); + if (!purd->have_header) { + purd->have_header = 1; + rn->loaded = header_len; + memcpy(rn->buf, tmpbuf + UDP_AUDIO_HEADER_LEN, + rn->loaded); +// sleep(1); + goto success; + } + payload_len = packet_size - UDP_AUDIO_HEADER_LEN - header_len; + if (rn->loaded + payload_len > UDP_RECV_CHUNK_SIZE) { + t->error = -E_OVERRUN; + return; + } + if (payload_len) + memcpy(rn->buf + rn->loaded, tmpbuf + + (packet_size - payload_len), payload_len); + rn->loaded += payload_len; + } +success: + t->error = 0; +} + +static void udp_shutdown(void) +{ + return; +} + +static void udp_recv_close(struct receiver_node *rn) +{ + struct private_udp_recv_data *purd = rn->private_data; + + if (purd->fd >= 0) + close(purd->fd); + free(rn->private_data); + free(rn->buf); +} + +static void *udp_recv_parse_config(int argc, char **argv) +{ + int ret; + struct udp_recv_args_info *tmp = + para_calloc(sizeof(struct udp_recv_args_info)); + + ret = udp_recv_cmdline_parser(argc, argv, tmp)? -E_UDP_SYNTAX : 1; + if (ret >= 0) + return tmp; + free(tmp); + return NULL; +} + +static int udp_recv_open(struct receiver_node *rn) +{ + struct private_udp_recv_data *purd; + struct udp_recv_args_info *c = rn->conf; + int ret; + + rn->buf = para_calloc(UDP_RECV_CHUNK_SIZE); + rn->private_data = para_calloc(sizeof(struct private_udp_recv_data)); + purd = rn->private_data; + ret = create_udp_recv_socket(c->host_arg, c->port_arg); + if (ret < 0) + return ret; + purd->fd = ret; + ret = mark_fd_nonblocking(purd->fd); + if (ret < 0) + return ret; + PARA_NOTICE_LOG("receiving from %s:%d, fd=%d\n", c->host_arg, + c->port_arg, purd->fd); + return purd->fd; +} + +/** + * The init function of the udp receiver. + * + * \param r Pointer to the receiver struct to initialize. + * + * Initialize all function pointers of \a r. + */ +void udp_recv_init(struct receiver *r) +{ + struct udp_recv_args_info dummy; + + udp_recv_cmdline_parser_init(&dummy); + r->shutdown = udp_shutdown; + r->open = udp_recv_open; + r->close = udp_recv_close; + r->pre_select = udp_recv_pre_select; + r->post_select = udp_recv_post_select; + r->parse_config = udp_recv_parse_config; + r->help = (struct ggo_help) { + .short_help = udp_recv_args_info_help, + .detailed_help = udp_recv_args_info_detailed_help + }; +} diff --git a/ortp_recv.ggo b/udp_recv.ggo similarity index 53% rename from ortp_recv.ggo rename to udp_recv.ggo index a706d543..6ad09f47 100644 --- a/ortp_recv.ggo +++ b/udp_recv.ggo @@ -1,5 +1,5 @@ option "host" i -"ip or host to receive rtp packets from" +"ip or host to receive udp packets from" string default="224.0.1.38" optional details=" @@ -9,11 +9,5 @@ details=" option "port" p "udp port" int typestr="portnumber" -default="1500" -optional - -option "jitter_compensation" j -"set ortp's adaptive jitter compensation" -int typestr="milliseconds" -default="0" +default="8000" optional diff --git a/udp_send.c b/udp_send.c new file mode 100644 index 00000000..8f5d796b --- /dev/null +++ b/udp_send.c @@ -0,0 +1,291 @@ +/* + * Copyright (C) 2005-2008 Andre Noll + * + * Licensed under the GPL v2. For licencing details see COPYING. + */ + +/** \file udp_send.c Para_server's udp sender. */ + + +#include +#include + +#include "server.cmdline.h" +#include "para.h" +#include "error.h" +#include "string.h" +#include "afh.h" +#include "afs.h" +#include "server.h" +#include "vss.h" +#include "list.h" +#include "send.h" +#include "portable_io.h" +#include "udp_header.h" +#include "net.h" +#include "fd.h" +#include "sched.h" + + +/** Convert in_addr to ascii. */ +#define TARGET_ADDR(oc) inet_ntoa((oc)->addr) + +/** Describes one entry in the list of targets for the udp sender. */ +struct udp_target { + /** Address info. */ + struct in_addr addr; + /** The position of this target in the list of targets. */ + struct list_head node; + /** The UDP port. */ + int port; + /** The socket fd. */ + int fd; +}; + +static struct list_head targets; +static int sender_status; + +static void udp_delete_target(struct udp_target *ut, const char *msg) +{ + PARA_NOTICE_LOG("deleting %s:%d (%s) from list\n", TARGET_ADDR(ut), + ut->port, msg); + if (ut->fd >= 0) + close(ut->fd); + list_del(&ut->node); + free(ut); +} + +static void udp_send_buf(char *buf, size_t len) +{ + struct udp_target *ut, *tmp; + int ret; + + list_for_each_entry_safe(ut, tmp, &targets, node) { + size_t written = len; + if (ut->fd < 0) + continue; + ret = write_all(ut->fd, buf, &written); + if (ret < 0) + return udp_delete_target(ut, "send error"); + if (written != len) + PARA_WARNING_LOG("short write %zu/%zu\n", written, len); + } +} + +static void udp_init_session(struct udp_target *ut) +{ + PARA_NOTICE_LOG("sending to udp %s:%d\n", TARGET_ADDR(ut), ut->port); + ut->fd = create_udp_send_socket(TARGET_ADDR(ut), ut->port, 10); +} + +static void udp_shutdown_targets(void) +{ + char buf[UDP_AUDIO_HEADER_LEN]; + struct udp_target *ut, *tmp; + + udp_write_packet_type(buf, UDP_EOF_PACKET); + list_for_each_entry_safe(ut, tmp, &targets, node) { + if (ut->fd < 0) + continue; + PARA_INFO_LOG("sending eof to udp target %s:%d\n", + TARGET_ADDR(ut), ut->port); + write(ut->fd, buf, UDP_AUDIO_HEADER_LEN); + } +} + +static int need_extra_header(long unsigned current_chunk) +{ + static struct timeval last_header; + struct timeval diff; + + if (!current_chunk) + return 0; + tv_diff(now, &last_header, &diff); + if (tv2ms(&diff) < conf.udp_header_interval_arg) + return 0; + last_header = *now; + return 1; +} + +static void udp_send(long unsigned current_chunk, __a_unused long unsigned chunks_sent, + const char *buf, size_t len, const char *header_buf, + size_t header_len) +{ + struct udp_target *ut, *tmp; + size_t sendbuf_len; + int packet_type = UDP_DATA_PACKET; + char *sendbuf; + struct timeval *chunk_tv; + uint8_t stream_type = header_len? UDP_HEADER_STREAM : UDP_PLAIN_STREAM; + +// PARA_NOTICE_LOG("header_len: %zd, header_buf: %p\n", header_len, +// header_buf); + if (sender_status != SENDER_ON) + return; + + /* we might not yet know the chunk time */ + chunk_tv = vss_chunk_time(); + if (!chunk_tv) + return; + if (list_empty(&targets)) + return; + list_for_each_entry_safe(ut, tmp, &targets, node) { + if (ut->fd < 0) + udp_init_session(ut); + } + if (!need_extra_header(current_chunk)) + header_len = 0; + if (!current_chunk) + packet_type = UDP_BOF_PACKET; + else if (header_len) + packet_type = UDP_HEADER_PACKET; + sendbuf_len = UDP_AUDIO_HEADER_LEN + header_len + len; + sendbuf = para_malloc(sendbuf_len); + udp_write_magic(sendbuf); + udp_write_stream_type(sendbuf, stream_type); + udp_write_packet_type(sendbuf, packet_type); + udp_write_header_len(sendbuf, header_len); + if (header_len) + memcpy(sendbuf + UDP_AUDIO_HEADER_LEN, header_buf, + header_len); + memcpy(sendbuf + UDP_AUDIO_HEADER_LEN + header_len, buf, len); + udp_send_buf(sendbuf, sendbuf_len); + free(sendbuf); +} + +static int udp_com_on(__a_unused struct sender_command_data *scd) +{ + sender_status = SENDER_ON; + return 1; +} + +static int udp_com_off(__a_unused struct sender_command_data *scd) +{ + udp_shutdown_targets(); + sender_status = SENDER_OFF; + return 1; +} + +static int udp_com_delete(struct sender_command_data *scd) +{ + char *a = para_strdup(inet_ntoa(scd->addr)); + struct udp_target *ut, *tmp; + list_for_each_entry_safe(ut, tmp, &targets, node) { + if (scd->port != ut->port) + continue; + if (strcmp(TARGET_ADDR(ut), a)) + continue; + udp_delete_target(ut, "com_delete"); + } + return 1; +} + +static void udp_add_target(int port, struct in_addr *addr) +{ + struct udp_target *ut = para_calloc(sizeof(struct udp_target)); + ut->port = port; + ut->addr = *addr; + ut->fd = -1; /* not yet connected */ + PARA_INFO_LOG("adding to target list (%s:%d)\n", + TARGET_ADDR(ut), ut->port); + para_list_add(&ut->node, &targets); +} + +static int udp_com_add(struct sender_command_data *scd) +{ + int port = (scd->port > 0)? scd->port : conf.udp_default_port_arg; + udp_add_target(port, &scd->addr); + return 1; +} + +static char *udp_info(void) +{ + struct udp_target *ut; + char *ret, *tgts = NULL; + + list_for_each_entry(ut, &targets, node) { + char *tmp = make_message("%s%s:%d ", tgts? tgts : "", + TARGET_ADDR(ut), ut->port); + free(tgts); + tgts = tmp; + } + ret = make_message( + "udp sender:\n" + "\tstatus: %s\n" + "\tport: udp %d\n" + "\ttargets: %s\n", + (sender_status == SENDER_ON)? "on" : "off", + conf.udp_default_port_arg, + tgts? tgts : "(none)" + ); + free(tgts); + return ret; +} + +static void udp_init_target_list(void) +{ + int i; + + INIT_LIST_HEAD(&targets); + for (i = 0; i < conf.udp_target_given; i++) { + char *arg = para_strdup(conf.udp_target_arg[i]); + char *p = strchr(arg, ':'); + int port; + struct in_addr addr; + + if (!p) + goto err; + *p = '\0'; + if (!inet_pton(AF_INET, arg, &addr)) + goto err; + port = atoi(++p); + if (port < 0 || port > 65535) + port = conf.udp_default_port_arg; + udp_add_target(port, &addr); + goto success; +err: + PARA_CRIT_LOG("syntax error for udp target option " + "#%d, ignoring\n", i); +success: + free(arg); + continue; + } +} + +static char *udp_help(void) +{ + return make_message( + "usage: {on|off}\n" + "usage: {add|delete} IP port\n" + "example: add 224.0.1.38 8000 (multicast streaming)\n" + ); +} + +/** + * The init function of para_server's udp sender. + * + * \param s Pointer to the http sender struct. + * + * It initializes all function pointers of \a s and the list of udp targets. + */ +void udp_send_init(struct sender *s) +{ + INIT_LIST_HEAD(&targets); + s->info = udp_info; + s->help = udp_help; + s->send = udp_send; + s->pre_select = NULL; + s->post_select = NULL; + s->shutdown_clients = udp_shutdown_targets; + s->client_cmds[SENDER_ON] = udp_com_on; + s->client_cmds[SENDER_OFF] = udp_com_off; + s->client_cmds[SENDER_DENY] = NULL; + s->client_cmds[SENDER_ALLOW] = NULL; + s->client_cmds[SENDER_ADD] = udp_com_add; + s->client_cmds[SENDER_DELETE] = udp_com_delete; + sender_status = SENDER_OFF; + udp_init_target_list(); + if (!conf.udp_no_autostart_given) + sender_status = SENDER_ON; + PARA_DEBUG_LOG("udp sender init complete\n"); +} diff --git a/vss.c b/vss.c index 4ed75792..ba330a82 100644 --- a/vss.c +++ b/vss.c @@ -32,7 +32,7 @@ extern struct misc_meta_data *mmd; extern void dccp_send_init(struct sender *); extern void http_send_init(struct sender *); -extern void ortp_send_init(struct sender *); +extern void udp_send_init(struct sender *); /** The list of supported senders. */ struct sender senders[] = { @@ -44,12 +44,10 @@ struct sender senders[] = { .name = "dccp", .init = dccp_send_init, }, -#ifdef HAVE_ORTP { - .name = "ortp", - .init = ortp_send_init, + .name = "udp", + .init = udp_send_init, }, -#endif { .name = NULL, } -- 2.39.2