From c9f109a9e7f2d6116b7906a852afc339c858c275 Mon Sep 17 00:00:00 2001 From: Andre Date: Thu, 9 Mar 2006 00:04:15 +0100 Subject: [PATCH] First draft of the dccp sender/receiver pair. --- configure.ac | 16 ++--- dccp.c | 28 ++++++++ dccp.h | 8 +++ dccp_recv.c | 137 ++++++++++++++++++++++++++++++++++++++++ dccp_recv.ggo | 3 + dccp_send.c | 172 ++++++++++++++++++++++++++++++++++++++++++++++++++ error.h | 23 ++++++- http_recv.ggo | 1 + recv.c | 2 +- recv.h | 3 + send.h | 19 ++++++ server.c | 7 +- server.ggo | 3 + 13 files changed, 411 insertions(+), 11 deletions(-) create mode 100644 dccp.c create mode 100644 dccp.h create mode 100644 dccp_recv.c create mode 100644 dccp_recv.ggo create mode 100644 dccp_send.c diff --git a/configure.ac b/configure.ac index fac1ab57..224d840a 100644 --- a/configure.ac +++ b/configure.ac @@ -56,24 +56,24 @@ AC_CHECK_LIB([readline], [readline], [], AC_CHECK_LIB([menu], [new_menu], [extras="$extras para_dbadm"], [AC_MSG_WARN([libmenu not found, cannot build para_dbadm])]) -########################################################################### -recv_cmdline_objs="recv.cmdline http_recv.cmdline" -recv_errlist_objs="http_recv recv_common recv time string net" + +recv_cmdline_objs="recv.cmdline http_recv.cmdline dccp_recv.cmdline" +recv_errlist_objs="http_recv recv_common recv time string net dccp_recv dccp" recv_ldflags="" filter_cmdline_objs="filter.cmdline compress_filter.cmdline" filter_errlist_objs="filter_chain wav compress filter string" filter_ldflags="" -audiod_cmdline_objs="audiod.cmdline grab_client.cmdline compress_filter.cmdline - http_recv.cmdline" -audiod_errlist_objs="audiod exec close_on_fork signal string daemon stat net - time grab_client filter_chain wav compress http_recv recv_common ringbuffer" +audiod_cmdline_objs="audiod.cmdline grab_client.cmdline compress_filter.cmdline + http_recv.cmdline dccp_recv.cmdline" +audiod_errlist_objs="audiod exec close_on_fork signal string daemon stat net + time grab_client filter_chain wav compress http_recv dccp dccp_recv recv_common ringbuffer" audiod_ldflags="" server_cmdline_objs="server.cmdline" server_errlist_objs="server mp3 afs command net string signal random_dbtool time daemon stat - crypt http_send db close_on_fork plm_dbtool ipc" + crypt http_send dccp dccp_send db close_on_fork plm_dbtool ipc" server_ldflags="" ########################################################################### ssl diff --git a/dccp.c b/dccp.c new file mode 100644 index 00000000..5e769be2 --- /dev/null +++ b/dccp.c @@ -0,0 +1,28 @@ +#include "para.h" +#include "error.h" +#include "dccp.h" + +int dccp_get_socket(void) +{ + int s = socket(AF_INET, SOCK_DCCP, IPPROTO_DCCP); + + if (s < 0) + return -E_DCCP_SOCKET; + return s; +} + +int dccp_set_socket(int fd) +{ + int pkt_size = 256, ret; + + /* hack to get a service code */ + ret = setsockopt(fd, SOL_DCCP, DCCP_SOCKOPT_PACKET_SIZE, + (char*)&pkt_size, sizeof(pkt_size)); + if (ret < 0) + return -E_DCCP_PACKET_SIZE; + ret = setsockopt(fd, SOL_DCCP, DCCP_SOCKOPT_SERVICE, + (char*)&pkt_size, sizeof(pkt_size)); + if (ret < 0) + return -E_DCCP_SERVICE; + return 1; +} diff --git a/dccp.h b/dccp.h new file mode 100644 index 00000000..6687bf32 --- /dev/null +++ b/dccp.h @@ -0,0 +1,8 @@ +#define SOL_DCCP 269 +#define SOCK_DCCP 6 +#define IPPROTO_DCCP 33 +#define DCCP_SOCKOPT_PACKET_SIZE 1 +#define DCCP_SOCKOPT_SERVICE 2 + +int dccp_get_socket(void); +int dccp_set_socket(int fd); diff --git a/dccp_recv.c b/dccp_recv.c new file mode 100644 index 00000000..f56f58cb --- /dev/null +++ b/dccp_recv.c @@ -0,0 +1,137 @@ +#include "para.h" +#include "error.h" +#include "dccp.h" +#include "recv.h" +#include "string.h" +#include "net.h" + +#include "dccp_recv.cmdline.h" + +/* needed by getaddrinfo */ +#include +#include +#include + + +#define DCCP_BUFSIZE 4096 + +/** + * data specific to the dccp receiver + * + * \sa receiver receiver_node + */ +struct private_dccp_recv_data { + /** the file descriptor for the dccp socket */ + int fd; +}; + + +static void dccp_recv_close(struct receiver_node *rn) +{ + + struct private_dccp_recv_data *pdd = rn->private_data; + + if (pdd && pdd->fd > 0) + close(pdd->fd); + free(rn->buf); + rn->buf = NULL; + free(rn->private_data); + rn->private_data = NULL; +} + + +static int dccp_recv_open(struct receiver_node *rn) +{ + struct private_dccp_recv_data *pdd; + struct gengetopt_args_info *conf = rn->conf; + int ret; + struct addrinfo *ai; + char *tmp; + + rn->buf = para_calloc(DCCP_BUFSIZE); + rn->private_data = para_calloc(sizeof(struct private_dccp_recv_data)); + pdd = rn->private_data; + ret = dccp_get_socket(); + if (ret < 0) + goto err_out; + pdd->fd = ret; + + tmp = make_message("%d", conf->port_arg); + ret = getaddrinfo(conf->host_arg, tmp, NULL, &ai); + free(tmp); + if (ret) { + ret = -E_ADDR_INFO; + goto err_out; + } + ret = dccp_set_socket(pdd->fd); + if (ret < 0) + goto err_out; + PARA_NOTICE_LOG("connecting to %s:%d\n", conf->host_arg, conf->port_arg); + ret = -E_DCCP_CONNECT; + if (connect(pdd->fd, ai->ai_addr, ai->ai_addrlen) < 0) + goto err_out; + return 1; +err_out: + dccp_recv_close(rn); + return ret; +} + +static void dccp_shutdown(void) +{ + ; /* nothing to do */ +} + +static void *dccp_recv_parse_config(int argc, char **argv) +{ + struct gengetopt_args_info *tmp = para_calloc(sizeof(struct gengetopt_args_info)); + + if (!dccp_recv_cmdline_parser(argc, argv, tmp)) + return tmp; + free(tmp); + return NULL; +} + +static int dccp_recv_pre_select(struct receiver_node *rn, fd_set *rfds, + __unused fd_set *wfds, __unused struct timeval *timeout) +{ + struct private_dccp_recv_data *pdd = rn->private_data; + + if (pdd) + FD_SET(pdd->fd, rfds); + return pdd->fd; +} + +static int dccp_recv_post_select(struct receiver_node *rn, int select_ret, + fd_set *rfds, __unused fd_set *wfds) +{ + int ret; + struct private_dccp_recv_data *pdd = rn->private_data; + + if (!select_ret || !pdd || !FD_ISSET(pdd->fd, rfds)) + return 1; /* nothing to do */ + if (rn->loaded >= DCCP_BUFSIZE) + return -E_DCCP_OVERRUN; + ret = recv_bin_buffer(pdd->fd, rn->buf + rn->loaded, + DCCP_BUFSIZE - rn->loaded); + if (ret <= 0) + return ret; + rn->loaded += ret; + return 1; +} + +/** + * the init function of the dccp receiver + * + * \param r pointer to the receiver struct to initialize + * + * Initialize all function pointers of \a r + */ +void dccp_recv_init(struct receiver *r) +{ + r->shutdown = dccp_shutdown; + r->open = dccp_recv_open; + r->close = dccp_recv_close; + r->pre_select = dccp_recv_pre_select; + r->post_select = dccp_recv_post_select; + r->parse_config = dccp_recv_parse_config; +} diff --git a/dccp_recv.ggo b/dccp_recv.ggo new file mode 100644 index 00000000..481dcb31 --- /dev/null +++ b/dccp_recv.ggo @@ -0,0 +1,3 @@ +section "options of the dccp receiver" +option "host" i "ip or host" string default="localhost" no +option "port" p "port to connect to" int default="5001" no diff --git a/dccp_send.c b/dccp_send.c new file mode 100644 index 00000000..ace92dee --- /dev/null +++ b/dccp_send.c @@ -0,0 +1,172 @@ +#include "server.h" +#include "net.h" +#include "list.h" +#include "afs.h" +#include "send.h" +#include "dccp.h" +#include "error.h" +#include "string.h" +#include "server.cmdline.h" +extern struct gengetopt_args_info conf; +/** the list of connected clients **/ +static struct list_head clients; +static int listen_fd = -1; +static struct sender *self; + +/** describes one connected client */ +struct dccp_client { + /** the dccp socket */ + int fd; + /** address information about the client */ + struct sockaddr_in addr; + /** the position of this client in the client list */ + struct list_head node; + int header_sent; /* non-zero if audio file header has been sent */ +}; + +static void dccp_pre_select(__unused struct audio_format *af, int *max_fileno, fd_set *rfds, + __unused fd_set *wfds) +{ + if (listen_fd < 0) + return; + FD_SET(listen_fd, rfds); + *max_fileno = MAX(*max_fileno, listen_fd); +} + +static void dccp_post_select(__unused struct audio_format *af, fd_set *rfds, + __unused fd_set *wfds) +{ + struct dccp_client *dc; + int ret; + + if (!FD_ISSET(listen_fd, rfds)) + return; + PARA_NOTICE_LOG("%s", "accepting...\n"); + dc = para_calloc(sizeof(struct dccp_client)); + ret = para_accept(listen_fd, &dc->addr, sizeof(struct sockaddr_in)); + if (ret < 0) { + PARA_ERROR_LOG("%s", PARA_STRERROR(-ret)); + return; + } + PARA_NOTICE_LOG("%s", "connection\n"); + dc->fd = ret; + list_add(&dc->node, &clients); +} + +static int dccp_open(void) +{ + struct sockaddr_in servaddr; + int ret; + + ret = dccp_get_socket(); + if (ret < 0) + return ret; + listen_fd = ret; + + bzero(&servaddr, sizeof(servaddr)); + servaddr.sin_family = AF_INET; + servaddr.sin_addr.s_addr = htonl(INADDR_ANY); + servaddr.sin_port = htons(conf.dccp_port_arg); + ret = bind(listen_fd, (struct sockaddr *)&servaddr, sizeof(servaddr)); + if (ret < 0) + return -E_DCCP_BIND; + ret = dccp_set_socket(listen_fd); + if (ret < 0) + return ret; + ret = listen(listen_fd, 0); + if (ret < 0) + return -E_DCCP_LISTEN; + PARA_DEBUG_LOG("listening on fd %d\n", listen_fd); + return 1; +} + +static void dccp_shutdown_client(struct dccp_client *dc) +{ + close(dc->fd); + list_del(&dc->node); + free(dc); +} + +static void dccp_send(__unused struct audio_format *af, + long unsigned current_chunk, + __unused long unsigned chunks_sent, const char *buf, size_t len) +{ + struct dccp_client *dc, *tmp; + int ret, header_len; + char *header_buf; + + if (listen_fd < 0 || !len) + return; + + list_for_each_entry_safe(dc, tmp, &clients, node) { + if (!_write_ok(dc->fd)) + continue; + if (!dc->header_sent && af->get_header_info && current_chunk) { + header_buf = af->get_header_info(&header_len); + if (!header_buf || header_len <= 0) + continue; /* header not yet available */ + ret = write(dc->fd, header_buf, header_len); + if (ret != header_len) { + dccp_shutdown_client(dc); + continue; + } + if (!_write_ok(dc->fd)) + continue; + } + PARA_DEBUG_LOG("writing %d bytes to fd %d\n", len, dc->fd); + ret = write(dc->fd, buf, len); + if (ret != len) + dccp_shutdown_client(dc); + } +} + +static void dccp_shutdown_clients(void) +{ + struct dccp_client *dc, *tmp; + + list_for_each_entry_safe(dc, tmp, &clients, node) + dccp_shutdown_client(dc); +} + +static char *dccp_info(void) +{ + static char *buf; + int num_clients = 0; + struct dccp_client *dc, *tmp; + + free(buf); + list_for_each_entry_safe(dc, tmp, &clients, node) + num_clients++; + buf = make_message("%d connected clients\n", num_clients); + return buf; +} + +static char *dccp_help(void) +{ + return make_message("no help available\n"); +} + +void dccp_send_init(struct sender *s) +{ + int ret; + + INIT_LIST_HEAD(&clients); + s->info = dccp_info; + s->send = dccp_send; + s->pre_select = dccp_pre_select; + s->post_select = dccp_post_select; + s->shutdown_clients = dccp_shutdown_clients; + s->help = dccp_help; + s->client_cmds[SENDER_ON] = NULL; + s->client_cmds[SENDER_OFF] = NULL; + s->client_cmds[SENDER_DENY] = NULL; + s->client_cmds[SENDER_ALLOW] = NULL; + s->client_cmds[SENDER_ADD] = NULL; + s->client_cmds[SENDER_DELETE] = NULL; + self = s; + ret = dccp_open(); + if (ret < 0) + PARA_ERROR_LOG("%s\n", PARA_STRERROR(-ret)); + else + s->status = SENDER_ON; +} diff --git a/error.h b/error.h index 98ca4ec6..9ed59c15 100644 --- a/error.h +++ b/error.h @@ -24,7 +24,8 @@ enum para_subsystem {SS_RECV, SS_STRING, SS_DAEMON, SS_STAT, SS_TIME, SS_GRAB_CLIENT, SS_HTTP_RECV, SS_RECV_COMMON, SS_FILTER_CHAIN, SS_WAV, SS_COMPRESS, SS_OGGDEC, SS_FILTER, SS_COMMAND, SS_RANDOM_DBTOOL, SS_PLM_DBTOOL, SS_CRYPT, SS_HTTP_SEND, SS_ORTP_SEND, SS_DB, SS_OGG, - SS_MP3, SS_MP3DEC, SS_SERVER, SS_AFS, SS_MYSQL, SS_IPC, SS_RINGBUFFER}; + SS_MP3, SS_MP3DEC, SS_SERVER, SS_AFS, SS_MYSQL, SS_IPC, SS_DCCP, SS_DCCP_RECV, + SS_DCCP_SEND, SS_RINGBUFFER}; #define NUM_SS (SS_RINGBUFFER + 1) extern const char **para_errlist[]; /** \endcond */ @@ -236,6 +237,23 @@ extern const char **para_errlist[]; PARA_ERROR(SHM_ATTACH, "can not attach shared memory area"), \ PARA_ERROR(SHM_DETACH, "can not detach shared memory area"), \ + +#define DCCP_ERRORS \ + PARA_ERROR(DCCP_SOCKET, "can not create dccp socket"), \ + PARA_ERROR(DCCP_PACKET_SIZE, "failed to set dccp packet size"), \ + PARA_ERROR(DCCP_SERVICE, "could not get service code"), \ + + +#define DCCP_RECV_ERRORS \ + PARA_ERROR(ADDR_INFO, "getaddrinfo error"), \ + PARA_ERROR(DCCP_OVERRUN, "dccp output buffer buffer overrun"), \ + PARA_ERROR(DCCP_CONNECT, "dccp connect error"), \ + +#define DCCP_SEND_ERRORS \ + PARA_ERROR(DCCP_BIND, "dccp bind error"), \ + PARA_ERROR(DCCP_LISTEN, "dccp listen error"), \ + + /* these do not need error handling (yet) */ #define SERVER_ERRORS #define WAV_ERRORS @@ -350,6 +368,9 @@ SS_ENUM(ORTP_SEND); SS_ENUM(DB); SS_ENUM(MYSQL); SS_ENUM(IPC); +SS_ENUM(DCCP); +SS_ENUM(DCCP_RECV); +SS_ENUM(DCCP_SEND); SS_ENUM(RINGBUFFER); /** \endcond */ #undef PARA_ERROR diff --git a/http_recv.ggo b/http_recv.ggo index 5c4ef4ec..925e4e64 100644 --- a/http_recv.ggo +++ b/http_recv.ggo @@ -1,2 +1,3 @@ +section "options of the http receiver" option "host" i "ip or host" string default="localhost" no option "port" p "tcp port to connect to" int default="8000" no diff --git a/recv.c b/recv.c index 31c18a31..c7bca83a 100644 --- a/recv.c +++ b/recv.c @@ -122,6 +122,6 @@ out: if (r) r->shutdown(); if (ret < 0) - PARA_NOTICE_LOG("%d: (%s)\n", ret, PARA_STRERROR(-ret)); + PARA_ERROR_LOG("%s\n", PARA_STRERROR(-ret)); return ret; } diff --git a/recv.h b/recv.h index 3284c29a..f7fc7259 100644 --- a/recv.h +++ b/recv.h @@ -149,6 +149,8 @@ struct receiver { /** \cond */ extern void http_recv_init(struct receiver *r); #define HTTP_RECEIVER {.name = "http", .init = http_recv_init}, +extern void dccp_recv_init(struct receiver *r); +#define DCCP_RECEIVER {.name = "dccp", .init = dccp_recv_init}, #ifdef HAVE_ORTP extern void ortp_recv_init(struct receiver *r); @@ -166,6 +168,7 @@ extern void (*crypt_function_send)(unsigned long len, const unsigned char *indat #define DEFINE_RECEIVER_ARRAY struct receiver receivers[] = { \ HTTP_RECEIVER \ + DCCP_RECEIVER \ ORTP_RECEIVER \ {.name = NULL}}; diff --git a/send.h b/send.h index 2c6c7274..75c13731 100644 --- a/send.h +++ b/send.h @@ -84,3 +84,22 @@ struct sender { */ int (*client_cmds[NUM_SENDER_CMDS])(struct sender_command_data*); }; + + + +static inline int _write_ok(int fd) +{ + struct timeval tv = {0, 0}; + fd_set wfds; + int ret; +again: + FD_ZERO(&wfds); + FD_SET(fd, &wfds); + ret = select(fd + 1, NULL, &wfds, NULL, &tv); + if (ret < 0 && errno == EINTR) + goto again; + if (ret < 0) + ret = 0; + return ret; +} + diff --git a/server.c b/server.c index 538d201b..67d9bce4 100644 --- a/server.c +++ b/server.c @@ -56,6 +56,7 @@ struct misc_meta_data *mmd; */ struct gengetopt_args_info conf; char *user_list = NULL; +extern void dccp_send_init(struct sender *); extern void http_send_init(struct sender *); extern void ortp_send_init(struct sender *); extern struct audio_format afl[]; @@ -94,6 +95,10 @@ struct sender senders[] = { .name = "http", .init = http_send_init, }, + { + .name = "dccp", + .init = dccp_send_init, + }, #ifdef HAVE_ORTP { .name = "ortp", @@ -316,7 +321,7 @@ static void init_random_seed(void) int fd, ret = -1, len = sizeof(unsigned int); unsigned int seed; - fd = open("/dev/random", O_RDONLY); + fd = open("/dev/urandom", O_RDONLY); if (fd < 0) goto out; ret = -2; diff --git a/server.ggo b/server.ggo index a7db080a..418b2b4a 100644 --- a/server.ggo +++ b/server.ggo @@ -35,6 +35,9 @@ option "http_access" - "Add given host/network to access control list (whitelist option "http_no_autostart" - "do not open tcp port on server startup" flag off option "http_max_clients" - "maximal simultaneous connections, non-positive value means unlimited" int typestr="number" default="-1" no +section "Dccp sender options" +option "dccp_port" - "port for http streaming" int typestr="portnumber" default="5001" no + section "Ortp sender options" option "ortp_target" - "Add given host/port to the list of targets. This option can be given multiple times. Example: '224.0.1.38:1500' instructs the ortp sender to send to udp port 1500 on host 224.0.1.38 (unassigned ip in the Local Network Control Block 224.0.0/24). This is useful for LAN-streaming." string typestr="a.b.c.d:p" no multiple option "ortp_no_autostart" - "do not start to send automatically" flag off -- 2.39.2