From: Andre Noll Date: Fri, 25 Nov 2011 12:13:35 +0000 (+0100) Subject: Merge branch 't/generic_match_count' X-Git-Tag: v0.4.9~3 X-Git-Url: http://git.tuebingen.mpg.de/?p=paraslash.git;a=commitdiff_plain;h=4867d9a4948aadde4dd59436e1c5fb0fc141ba89;hp=e65bdaa6b207d5cd409acaa5d0d33fe85dd7a28d Merge branch 't/generic_match_count' --- diff --git a/Makefile.in b/Makefile.in index 68aa6bc9..23ceda47 100644 --- a/Makefile.in +++ b/Makefile.in @@ -66,7 +66,6 @@ CPPFLAGS += -I$(cmdline_dir) CPPFLAGS += @osl_cppflags@ man_pages := $(patsubst %, man/man1/%.1, @executables@) -man_pages_in := $(patsubst %, web/%.man.in.html, @executables@) ggo_dir := ggo object_dir := objects @@ -120,8 +119,6 @@ tarball: $(tarball) @[ -z "$(Q)" ] || echo 'GEN $@' $(Q) ./command_util.sh man < $< > $@ -server_command_lists_ch = server_command_list.c afs_command_list.c \ - server_command_list.h afs_command_list.h server_command_lists_man = server_command_list.man afs_command_list.man man/man1/para_server.1: para_server $(server_command_lists_man) | $(man_dir) @[ -z "$(Q)" ] || echo 'MAN $<' @@ -255,7 +252,7 @@ para_gui: $(gui_objs) @[ -z "$(Q)" ] || echo 'LD $@' $(Q) $(CC) $(LDFLAGS) -o $@ $(gui_objs) -lcurses -para_audiod: audiod_command_list.c audiod_command_list.h $(audiod_objs) +para_audiod: $(audiod_objs) @[ -z "$(Q)" ] || echo 'LD $@' $(Q) $(CC) $(LDFLAGS) -o $@ $(audiod_objs) @audiod_ldflags@ @@ -267,7 +264,7 @@ para_fade: $(fade_objs) @[ -z "$(Q)" ] || echo 'LD $@' $(Q) $(CC) $(LDFLAGS) -o $@ $(fade_objs) @fade_ldflags@ -para_server: $(server_command_lists_ch) $(server_objs) +para_server: $(server_objs) @[ -z "$(Q)" ] || echo 'LD $@' $(Q) $(CC) $(LDFLAGS) -o $@ $(server_objs) @server_ldflags@ diff --git a/NEWS b/NEWS index a79cc29f..0b8b33e8 100644 --- a/NEWS +++ b/NEWS @@ -15,6 +15,11 @@ communication between para_server and para_audiod. - The alsa writer eats up less CPU cycles when configured to use the DMIX plugin. + - Simplified and unified receiver code. + - Makefile cleanups. + - Commands which print a list of matching audio files now + emit a meaningful error message if no audio file matched the + given pattern(s). -------------------------------------- 0.4.8 (2011-08-19) "nested assignment" diff --git a/audiod.c b/audiod.c index 4864c855..8008e2cb 100644 --- a/audiod.c +++ b/audiod.c @@ -1381,7 +1381,7 @@ int main(int argc, char *argv[]) init_command_task(cmd_task); if (conf.daemon_given) - daemonize(); + daemonize(false /* parent exits immediately */); register_task(&sig_task->task); register_task(&cmd_task->task); diff --git a/daemon.c b/daemon.c index ffdec4e3..0bf2f0ac 100644 --- a/daemon.c +++ b/daemon.c @@ -12,6 +12,7 @@ #include #include #include +#include #include "para.h" #include "daemon.h" @@ -142,14 +143,25 @@ static bool daemon_test_flag(unsigned flag) return me->flags & flag; } +static void dummy_sighandler(__a_unused int s) +{ +} + /** * Do the usual stuff to become a daemon. * - * Fork, become session leader, dup fd 0, 1, 2 to /dev/null. + * \param parent_waits Whether the parent process should pause before exit. * - * \sa fork(2), setsid(2), dup(2). + * Fork, become session leader, cd to /, and dup fd 0, 1, 2 to /dev/null. If \a + * parent_waits is false, the parent process terminates immediately. + * Otherwise, it calls pause() to sleep until it receives \p SIGTERM or \p + * SIGCHLD and exits successfully thereafter. This behaviour is useful if the + * daemon process should not detach from the console until the child process + * has completed its setup. + * + * \sa fork(2), setsid(2), dup(2), pause(2). */ -void daemonize(void) +void daemonize(bool parent_waits) { pid_t pid; int null; @@ -158,8 +170,14 @@ void daemonize(void) pid = fork(); if (pid < 0) goto err; - if (pid) + if (pid) { + if (parent_waits) { + signal(SIGTERM, dummy_sighandler); + signal(SIGCHLD, dummy_sighandler); + pause(); + } exit(EXIT_SUCCESS); /* parent exits */ + } /* become session leader */ if (setsid() < 0) goto err; diff --git a/daemon.h b/daemon.h index d5583f58..3fe72ea9 100644 --- a/daemon.h +++ b/daemon.h @@ -1,7 +1,7 @@ /** \file daemon.h exported symbols from daemon.c */ -void daemonize(void); +void daemonize(bool parent_waits); void daemon_open_log_or_die(void); void daemon_close_log(void); void log_welcome(const char *whoami); diff --git a/dccp_recv.c b/dccp_recv.c index a9eab006..41518539 100644 --- a/dccp_recv.c +++ b/dccp_recv.c @@ -27,33 +27,15 @@ #include "dccp_recv.cmdline.h" -/** - * data specific to the dccp receiver - * - * \sa receiver receiver_node - */ -struct private_dccp_recv_data { - /** the file descriptor for the dccp socket */ - int fd; - struct btr_pool *btrp; -}; - static void dccp_recv_close(struct receiver_node *rn) { - struct private_dccp_recv_data *pdd = rn->private_data; - - if (!pdd) - return; - if (pdd->fd > 0) - close(pdd->fd); - btr_pool_free(pdd->btrp); - free(pdd); - rn->private_data = NULL; + if (rn->fd > 0) + close(rn->fd); + btr_pool_free(rn->btrp); } static int dccp_recv_open(struct receiver_node *rn) { - struct private_dccp_recv_data *pdd; struct dccp_recv_args_info *conf = rn->conf; struct flowopts *fo = NULL; uint8_t *ccids = NULL; @@ -87,9 +69,8 @@ static int dccp_recv_open(struct receiver_node *rn) ret = mark_fd_nonblocking(fd); if (ret < 0) goto err; - rn->private_data = pdd = para_calloc(sizeof(struct private_dccp_recv_data)); - pdd->btrp = btr_pool_new("dccp_recv", 320 * 1024); - pdd->fd = fd; + rn->btrp = btr_pool_new("dccp_recv", 320 * 1024); + rn->fd = fd; return 1; err: close(fd); @@ -136,18 +117,16 @@ static void *dccp_recv_parse_config(int argc, char **argv) static void dccp_recv_pre_select(struct sched *s, struct task *t) { struct receiver_node *rn = container_of(t, struct receiver_node, task); - struct private_dccp_recv_data *pdd = rn->private_data; t->error = 0; if (generic_recv_pre_select(s, t) <= 0) return; - para_fd_set(pdd->fd, &s->rfds, &s->max_fileno); + para_fd_set(rn->fd, &s->rfds, &s->max_fileno); } static void dccp_recv_post_select(struct sched *s, struct task *t) { struct receiver_node *rn = container_of(t, struct receiver_node, task); - struct private_dccp_recv_data *pdd = rn->private_data; struct btr_node *btrn = rn->btrn; struct iovec iov[2]; int ret, iovcnt; @@ -156,18 +135,18 @@ static void dccp_recv_post_select(struct sched *s, struct task *t) ret = btr_node_status(btrn, 0, BTR_NT_ROOT); if (ret <= 0) goto out; - iovcnt = btr_pool_get_buffers(pdd->btrp, iov); + iovcnt = btr_pool_get_buffers(rn->btrp, iov); ret = -E_DCCP_OVERRUN; if (iovcnt == 0) goto out; - ret = readv_nonblock(pdd->fd, iov, iovcnt, &s->rfds, &num_bytes); + ret = readv_nonblock(rn->fd, iov, iovcnt, &s->rfds, &num_bytes); if (num_bytes == 0) goto out; if (num_bytes <= iov[0].iov_len) /* only the first buffer was filled */ - btr_add_output_pool(pdd->btrp, num_bytes, btrn); + btr_add_output_pool(rn->btrp, num_bytes, btrn); else { /* both buffers contain data */ - btr_add_output_pool(pdd->btrp, iov[0].iov_len, btrn); - btr_add_output_pool(pdd->btrp, num_bytes - iov[0].iov_len, btrn); + btr_add_output_pool(rn->btrp, iov[0].iov_len, btrn); + btr_add_output_pool(rn->btrp, num_bytes - iov[0].iov_len, btrn); } out: if (ret >= 0) diff --git a/http_recv.c b/http_recv.c index 5717692c..e6031e75 100644 --- a/http_recv.c +++ b/http_recv.c @@ -44,24 +44,6 @@ struct private_http_recv_data { * \sa receiver::open, receiver_node. */ enum http_recv_status status; - /** - * The file descriptor used for receiving the http stream. - * - * The pre_select function of the http receiver adds this file descriptor to - * the set of file descriptors which are checked for reading/writing (depending - * on the current status) by the select loop of the application (para_audiod or - * para_recv). - * - * The post_select function of the http receiver uses \a fd, if ready, to - * establish the http connection, and updates \a status according to the new - * state of the connection. As soon as \a status is \p HTTP_STREAMING, \a fd is - * going to be only checked for reading. If data is available, it is read into - * the output buffer of the receiver node by post_select. - * - * \sa receiver::pre_select receiver::post_select receiver_node, http_recv_status - */ - int fd; - struct btr_pool *btrp; }; static char *make_request_msg(void) @@ -82,43 +64,48 @@ static void http_recv_pre_select(struct sched *s, struct task *t) if (generic_recv_pre_select(s, t) <= 0) return; if (phd->status == HTTP_CONNECTED) - para_fd_set(phd->fd, &s->wfds, &s->max_fileno); + para_fd_set(rn->fd, &s->wfds, &s->max_fileno); else - para_fd_set(phd->fd, &s->rfds, &s->max_fileno); + para_fd_set(rn->fd, &s->rfds, &s->max_fileno); } +/* + * Establish the http connection. If already established, fill the buffer pool + * area with data read from the socket. In any case, update the state of the + * connection if necessary. + */ static void http_recv_post_select(struct sched *s, struct task *t) { struct receiver_node *rn = container_of(t, struct receiver_node, task); struct private_http_recv_data *phd = rn->private_data; struct btr_node *btrn = rn->btrn; - int ret; - char *buf; - size_t sz, n; + int ret, iovcnt; + struct iovec iov[2]; + size_t num_bytes; t->error = 0; ret = btr_node_status(btrn, 0, BTR_NT_ROOT); if (ret < 0) - goto err; + goto out; if (ret == 0) return; if (phd->status == HTTP_CONNECTED) { char *rq; - if (!FD_ISSET(phd->fd, &s->wfds)) + if (!FD_ISSET(rn->fd, &s->wfds)) return; rq = make_request_msg(); PARA_INFO_LOG("sending http request\n"); - ret = send_va_buffer(phd->fd, "%s", rq); + ret = send_va_buffer(rn->fd, "%s", rq); free(rq); if (ret < 0) - goto err; + goto out; phd->status = HTTP_SENT_GET_REQUEST; return; } if (phd->status == HTTP_SENT_GET_REQUEST) { - ret = read_pattern(phd->fd, HTTP_OK_MSG, strlen(HTTP_OK_MSG), &s->rfds); + ret = read_pattern(rn->fd, HTTP_OK_MSG, strlen(HTTP_OK_MSG), &s->rfds); if (ret < 0) - goto err; + goto out; if (ret == 0) return; PARA_INFO_LOG("received ok msg, streaming\n"); @@ -126,25 +113,29 @@ static void http_recv_post_select(struct sched *s, struct task *t) return; } ret = -E_HTTP_RECV_OVERRUN; - sz = btr_pool_get_buffer(phd->btrp, &buf); - if (sz == 0) - goto err; - ret = read_nonblock(phd->fd, buf, sz, &s->rfds, &n); - if (n > 0) - btr_add_output_pool(phd->btrp, n, btrn); + iovcnt = btr_pool_get_buffers(rn->btrp, iov); + if (iovcnt == 0) + goto out; + ret = readv_nonblock(rn->fd, iov, iovcnt, &s->rfds, &num_bytes); + if (num_bytes == 0) + goto out; + if (num_bytes <= iov[0].iov_len) /* only the first buffer was filled */ + btr_add_output_pool(rn->btrp, num_bytes, btrn); + else { /* both buffers contain data */ + btr_add_output_pool(rn->btrp, iov[0].iov_len, btrn); + btr_add_output_pool(rn->btrp, num_bytes - iov[0].iov_len, btrn); + } +out: if (ret >= 0) return; -err: btr_remove_node(rn->btrn); t->error = ret; } static void http_recv_close(struct receiver_node *rn) { - struct private_http_recv_data *phd = rn->private_data; - - close(phd->fd); - btr_pool_free(phd->btrp); + close(rn->fd); + btr_pool_free(rn->btrp); free(rn->private_data); } @@ -174,9 +165,9 @@ static int http_recv_open(struct receiver_node *rn) return ret; } rn->private_data = phd = para_calloc(sizeof(struct private_http_recv_data)); - phd->fd = fd; + rn->fd = fd; phd->status = HTTP_CONNECTED; - phd->btrp = btr_pool_new("http_recv", 320 * 1024); + rn->btrp = btr_pool_new("http_recv", 320 * 1024); return 1; } diff --git a/recv.h b/recv.h index 7555dfb4..92b63d27 100644 --- a/recv.h +++ b/recv.h @@ -20,6 +20,21 @@ struct receiver_node { struct task task; /** The receiver node is always the root of the buffer tree. */ struct btr_node *btrn; + /** Each receiver node maintains a buffer pool for the received data. */ + struct btr_pool *btrp; + /** + * The file descriptor to receive the stream. + * + * The pre_select function of the receiver adds this file descriptor to + * the set of file descriptors which are watched for readability or + * writability, depending on the state of the connection (if any). + * + * If \a fd is readable, the post_select function of the receiver reads + * data from this fd into the buffer pool area of \a btrp. + * + * \sa \ref receiver. + */ + int fd; }; /** diff --git a/server.c b/server.c index 0cfac607..7f020c8f 100644 --- a/server.c +++ b/server.c @@ -491,10 +491,9 @@ static void server_init(int argc, char **argv) gettimeofday(now, NULL); set_server_start_time(now); init_user_list(user_list_file); - init_server_command_task(argc, argv); /* become daemon */ if (conf.daemon_given) - daemonize(); + daemonize(true /* parent waits for SIGTERM */); PARA_NOTICE_LOG("initializing audio format handlers\n"); afh_init(); @@ -519,6 +518,9 @@ static void server_init(int argc, char **argv) para_unblock_signal(SIGCHLD); PARA_NOTICE_LOG("initializing virtual streaming system\n"); init_vss_task(afs_socket); + init_server_command_task(argc, argv); + if (conf.daemon_given) + kill(getppid(), SIGTERM); PARA_NOTICE_LOG("server init complete\n"); } diff --git a/udp_recv.c b/udp_recv.c index 45d24eae..964b60d4 100644 --- a/udp_recv.c +++ b/udp_recv.c @@ -22,25 +22,13 @@ #include "fd.h" #include "buffer_tree.h" -/** - * Data specific to the udp receiver. - * - * \sa \ref receiver, \ref receiver_node. - */ -struct private_udp_recv_data { - /** The socket file descriptor. */ - int fd; - struct btr_pool *btrp; -}; - static void udp_recv_pre_select(struct sched *s, struct task *t) { struct receiver_node *rn = container_of(t, struct receiver_node, task); - struct private_udp_recv_data *purd = rn->private_data; if (generic_recv_pre_select(s, t) <= 0) return; - para_fd_set(purd->fd, &s->rfds, &s->max_fileno); + para_fd_set(rn->fd, &s->rfds, &s->max_fileno); } static int udp_check_eof(size_t sz, struct iovec iov[2]) @@ -64,7 +52,6 @@ static int udp_check_eof(size_t sz, struct iovec iov[2]) static void udp_recv_post_select(__a_unused struct sched *s, struct task *t) { struct receiver_node *rn = container_of(t, struct receiver_node, task); - struct private_udp_recv_data *purd = rn->private_data; struct btr_node *btrn = rn->btrn; size_t num_bytes; struct iovec iov[2]; @@ -74,11 +61,11 @@ static void udp_recv_post_select(__a_unused struct sched *s, struct task *t) ret = btr_node_status(btrn, 0, BTR_NT_ROOT); if (ret <= 0) goto out; - iovcnt = btr_pool_get_buffers(purd->btrp, iov); + iovcnt = btr_pool_get_buffers(rn->btrp, iov); ret = -E_UDP_OVERRUN; if (iovcnt == 0) goto out; - ret = readv_nonblock(purd->fd, iov, iovcnt, &s->rfds, &num_bytes); + ret = readv_nonblock(rn->fd, iov, iovcnt, &s->rfds, &num_bytes); if (num_bytes == 0) goto out; readv_ret = ret; @@ -86,10 +73,10 @@ static void udp_recv_post_select(__a_unused struct sched *s, struct task *t) if (ret < 0) goto out; if (iov[0].iov_len >= num_bytes) - btr_add_output_pool(purd->btrp, num_bytes, btrn); + btr_add_output_pool(rn->btrp, num_bytes, btrn); else { /* both buffers contain data */ - btr_add_output_pool(purd->btrp, iov[0].iov_len, btrn); - btr_add_output_pool(purd->btrp, num_bytes - iov[0].iov_len, + btr_add_output_pool(rn->btrp, iov[0].iov_len, btrn); + btr_add_output_pool(rn->btrp, num_bytes - iov[0].iov_len, btrn); } ret = readv_ret; @@ -98,18 +85,15 @@ out: return; btr_remove_node(btrn); t->error = ret; - close(purd->fd); - purd->fd = -1; + close(rn->fd); + rn->fd = -1; } static void udp_recv_close(struct receiver_node *rn) { - struct private_udp_recv_data *purd = rn->private_data; - - if (purd->fd >= 0) - close(purd->fd); - btr_pool_free(purd->btrp); - free(rn->private_data); + if (rn->fd >= 0) + close(rn->fd); + btr_pool_free(rn->btrp); } static void *udp_recv_parse_config(int argc, char **argv) @@ -188,36 +172,31 @@ err: static int udp_recv_open(struct receiver_node *rn) { - struct private_udp_recv_data *purd; struct udp_recv_args_info *c = rn->conf; char *iface = c->iface_given ? c->iface_arg : NULL; int ret; - rn->private_data = para_calloc(sizeof(struct private_udp_recv_data)); - purd = rn->private_data; - ret = makesock(IPPROTO_UDP, 1, c->host_arg, c->port_arg, NULL); if (ret < 0) goto err; - purd->fd = ret; + rn->fd = ret; - ret = mcast_receiver_setup(purd->fd, iface); + ret = mcast_receiver_setup(rn->fd, iface); if (ret < 0) { - close(purd->fd); + close(rn->fd); goto err; } - ret = mark_fd_nonblocking(purd->fd); + ret = mark_fd_nonblocking(rn->fd); if (ret < 0) { - close(purd->fd); + close(rn->fd); goto err; } PARA_INFO_LOG("receiving from %s:%d, fd=%d\n", c->host_arg, - c->port_arg, purd->fd); - purd->btrp = btr_pool_new("udp_recv", 320 * 1024); - return purd->fd; + c->port_arg, rn->fd); + rn->btrp = btr_pool_new("udp_recv", 320 * 1024); + return rn->fd; err: - free(rn->private_data); return ret; } diff --git a/vss.c b/vss.c index e336a9e7..4a8aafa8 100644 --- a/vss.c +++ b/vss.c @@ -361,8 +361,17 @@ static void vss_get_chunk(int chunk_num, struct vss_task *vsst, static void compute_group_size(struct vss_task *vsst, struct fec_group *g, int max_bytes) { + char *buf; + size_t len; int i, max_chunks = PARA_MAX(1LU, 150 / tv2ms(vss_chunk_time())); + if (g->first_chunk == 0) { + g->num_chunks = 1; + vss_get_chunk(0, vsst, &buf, &len); + g->bytes = len; + return; + } + g->num_chunks = 0; g->bytes = 0; /* @@ -372,8 +381,6 @@ static void compute_group_size(struct vss_task *vsst, struct fec_group *g, * of exactly one chunk for these audio formats. */ for (i = 0;; i++) { - char *buf; - size_t len; int chunk_num = g->first_chunk + i; if (g->bytes > 0 && i >= max_chunks) /* duration limit */ @@ -502,7 +509,7 @@ static int setup_next_fec_group(struct fec_client *fc, struct vss_task *vsst) { int ret, i, k, n, data_slices; size_t len; - char *buf; + char *buf, *p; struct fec_group *g = &fc->group; if (fc->state == FEC_STATE_NONE) { @@ -562,16 +569,20 @@ static int setup_next_fec_group(struct fec_client *fc, struct vss_task *vsst) assert(i == g->num_header_slices - 1); } - /* setup data slices */ + /* + * Setup data slices. Note that for ogg streams chunk 0 points to a + * buffer on the heap rather than to the mapped audio file. + */ vss_get_chunk(g->first_chunk, vsst, &buf, &len); - for (; i < g->num_header_slices + data_slices; i++) { - if (buf + g->slice_bytes > vsst->map + mmd->size) { + for (p = buf; i < g->num_header_slices + data_slices; i++) { + if (p + g->slice_bytes > buf + g->bytes) { /* - * Can not use the memory mapped audio file for this - * slice as it goes beyond the map. + * We must make a copy for this slice since using p + * directly would exceed the buffer. */ - uint32_t payload_size = vsst->map + mmd->size - buf; - memcpy(fc->extra_src_buf, buf, payload_size); + uint32_t payload_size = buf + g->bytes - p; + assert(payload_size + FEC_HEADER_SIZE <= fc->mps); + memcpy(fc->extra_src_buf, p, payload_size); if (payload_size < g->slice_bytes) memset(fc->extra_src_buf + payload_size, 0, g->slice_bytes - payload_size); @@ -579,8 +590,8 @@ static int setup_next_fec_group(struct fec_client *fc, struct vss_task *vsst) i++; break; } - fc->src_data[i] = (const unsigned char *)buf; - buf += g->slice_bytes; + fc->src_data[i] = (const unsigned char *)p; + p += g->slice_bytes; } if (i < k) { /* use arbitrary data for all remaining slices */ @@ -1079,8 +1090,6 @@ static void vss_send(struct vss_task *vsst) senders[i].send(mmd->current_chunk, mmd->chunks_sent, buf, len, vsst->header_buf, vsst->header_len); } - mmd->chunks_sent++; - mmd->current_chunk++; /* * Prefault next chunk(s) * @@ -1093,11 +1102,15 @@ static void vss_send(struct vss_task *vsst) * eliminate the delays completely. Moreover, it is supported * only on Linux. So we do our own read-ahead here. */ - buf += len; - for (i = 0; i < 5 && buf < vsst->map + mmd->size; i++) { - __a_unused volatile char x = *buf; - buf += 4096; + if (mmd->current_chunk > 0) { /* chunk 0 might be on the heap */ + buf += len; + for (i = 0; i < 5 && buf < vsst->map + mmd->size; i++) { + __a_unused volatile char x = *buf; + buf += 4096; + } } + mmd->chunks_sent++; + mmd->current_chunk++; } }