From 0c395e45939ec272214ea14f3c07fc925a5697f5 Mon Sep 17 00:00:00 2001 From: Gerrit Renker Date: Sun, 30 May 2010 18:06:33 +0200 Subject: [PATCH] udp: initialize common structure only once When temporarily closing the UDP stream (e.g. due to 'next' or 'ff' command), the UDP file descriptor need not be closed. Hence this patch moves its initialization into the add_target() function. The patch also factors global variables out of mcast_sender_setup() function. --- udp_send.c | 122 +++++++++++++++++++++++++---------------------------- 1 file changed, 57 insertions(+), 65 deletions(-) diff --git a/udp_send.c b/udp_send.c index 49f84acd..08429aa4 100644 --- a/udp_send.c +++ b/udp_send.c @@ -51,13 +51,14 @@ static int sender_status; static void udp_close_target(struct sender_client *sc) { - if (sc->fd < 0) - return; - close(sc->fd); - del_close_on_fork_list(sc->fd); - cq_destroy(sc->cq); - sc->cq = NULL; - sc->fd = -1; + if (sc->cq != NULL) { + del_close_on_fork_list(sc->fd); + cq_destroy(sc->cq); + free(sc->name); + sc->name = NULL; + sc->cq = NULL; + + } } static void udp_delete_target(struct udp_target *ut, const char *msg) @@ -76,26 +77,28 @@ static void udp_delete_target(struct udp_target *ut, const char *msg) * Perform AF-independent multicast sender setup. * * \param sc The connected sender client. - * \param ttl UDPv4 multicast TTL or UDPv6 multicast number of hops. - * Use -1 to mean default, 0..255 otherwise. - * \param iface The outgoing multicast interface, or NULL for the default. * * \return Zero if okay, negative on error. */ -static int mcast_sender_setup(struct sender_client *sc, int ttl, char *iface) +static int mcast_sender_setup(struct sender_client *sc) { struct sockaddr_storage ss; socklen_t sslen = sizeof(ss); - + int ttl = conf.udp_ttl_arg, id = 0; const int on = 1; - int id = iface == NULL ? 0 : if_nametoindex(iface); + + if (conf.udp_mcast_iface_given) { + char *iface = conf.udp_mcast_iface_arg; + + id = if_nametoindex(iface); + if (id == 0) + PARA_WARNING_LOG("could not resolve interface '%s', " + "using default", iface); + } if (getpeername(sc->fd, (struct sockaddr *)&ss, &sslen) < 0) goto err; - if (iface != NULL && id == 0) - PARA_WARNING_LOG("could not resolve interface %s, using default", iface); - /* RFC 3493, 5.2: -1 means 'use kernel default' */ if (ttl < 0 || ttl > 255) ttl = -1; @@ -151,56 +154,28 @@ err: /** The maximal size of the per-target chunk queue. */ #define UDP_CQ_BYTES 40000 -static int udp_init_session(struct sender_client *sc) +static void udp_init_session(struct sender_client *sc) { - struct udp_target *ut = sc->private_data; - int ret; - char *iface = NULL; - - if (sc->fd >= 0) /* nothing to do */ - return 0; - - ret = para_connect_simple(IPPROTO_UDP, ut->host, ut->port); - if (ret < 0) - return ret; - sc->fd = ret; - - if (conf.udp_mcast_iface_given) - iface = conf.udp_mcast_iface_arg; - - ret = mcast_sender_setup(sc, conf.udp_ttl_arg, iface); - if (ret < 0) { - close(sc->fd); - return ret; - } - - ret = mark_fd_nonblocking(sc->fd); - if (ret < 0) { - close(sc->fd); - return ret; + if (sc->cq == NULL) { + add_close_on_fork_list(sc->fd); + sc->cq = cq_new(UDP_CQ_BYTES); + sc->name = para_strdup(remote_name(sc->fd)); + PARA_NOTICE_LOG("sending to udp %s\n", sc->name); } - add_close_on_fork_list(sc->fd); - sc->cq = cq_new(UDP_CQ_BYTES); - PARA_NOTICE_LOG("sending to udp %s#%d\n", ut->host, ut->port); - return 1; } static void udp_shutdown_targets(void) { struct sender_client *sc, *tmp; - const char *buf = NULL; - size_t len = 0; /* STFU, gcc */ - - list_for_each_entry_safe(sc, tmp, &targets, node) { - int ubuntu_glibc_headers_suck; - if (sc->fd < 0) - continue; - if (!buf) - len = vss_get_fec_eof_packet(&buf); - /* ignore return value, we're closing the target anyway. */ - ubuntu_glibc_headers_suck = write(sc->fd, buf, len); /* STFU */ - udp_close_target(sc); - } + const char *buf; + size_t len = vss_get_fec_eof_packet(&buf); + + list_for_each_entry_safe(sc, tmp, &targets, node) + if (sc->cq != NULL) { + /* ignore return value, closing the target anyway. */ + (void)write(sc->fd, buf, len); + udp_close_target(sc); + } } static int udp_com_on(__a_unused struct sender_command_data *scd) @@ -236,11 +211,9 @@ static int udp_com_delete(struct sender_command_data *scd) /** Initialize UDP session and set maximum payload size. */ static int udp_init_fec(struct sender_client *sc) { - int mps, ret = udp_init_session(sc); - - if (ret < 0) - return ret; + int mps; + udp_init_session(sc); mps = generic_max_transport_msg_size(sc->fd) - sizeof(struct udphdr); PARA_INFO_LOG("current MPS = %d bytes\n", mps); return mps; @@ -282,6 +255,7 @@ fail: static void udp_add_target(struct sender_command_data *scd) { + int ret, port = scd->port > 0 ? scd->port : conf.udp_default_port_arg; struct udp_target *ut = para_calloc(sizeof(*ut)); strncpy(ut->host, scd->host, sizeof(ut->host)); @@ -294,12 +268,30 @@ static void udp_add_target(struct sender_command_data *scd) ut->fcp.send_fec = udp_send_fec; ut->sc = para_calloc(sizeof(*ut->sc)); - ut->sc->fd = -1; /* not yet connected */ ut->sc->private_data = ut; - ut->fc = vss_add_fec_client(ut->sc, &ut->fcp); + ut->sc->fd = -1; + ret = para_connect_simple(IPPROTO_UDP, scd->host, port); + if (ret < 0) + goto err; + ut->sc->fd = ret; + ret = mcast_sender_setup(ut->sc); + if (ret < 0) + goto err; + ret = mark_fd_nonblocking(ut->sc->fd); + if (ret < 0) + goto err; PARA_INFO_LOG("adding to target list (%s#%d)\n", ut->host, ut->port); + ut->fc = vss_add_fec_client(ut->sc, &ut->fcp); para_list_add(&ut->sc->node, &targets); + return; +err: + if (ut->sc->fd >= 0) + close(ut->sc->fd); + PARA_NOTICE_LOG("failed to set up %s#%d (%s)- not adding it\n", + scd->host, port, para_strerror(-ret)); + free(ut->sc); + free(ut); } static int udp_com_add(struct sender_command_data *scd) -- 2.39.2