X-Git-Url: http://git.tuebingen.mpg.de/?a=blobdiff_plain;f=udp_send.c;h=9e8bc0482cdf79153ded26ad08777bff044f938b;hb=b74c8941e36860a34483782cf91e449212401664;hp=8f5d796bee64ba602a59c0b066b6aa0c2c98f2fb;hpb=141b0fd36edcad22ee07c65b101e90064d8567b6;p=paraslash.git diff --git a/udp_send.c b/udp_send.c index 8f5d796b..9e8bc048 100644 --- a/udp_send.c +++ b/udp_send.c @@ -1,5 +1,5 @@ /* - * Copyright (C) 2005-2008 Andre Noll + * Copyright (C) 2005-2009 Andre Noll * * Licensed under the GPL v2. For licencing details see COPYING. */ @@ -25,6 +25,7 @@ #include "net.h" #include "fd.h" #include "sched.h" +#include "close_on_fork.h" /** Convert in_addr to ascii. */ @@ -49,8 +50,10 @@ static void udp_delete_target(struct udp_target *ut, const char *msg) { PARA_NOTICE_LOG("deleting %s:%d (%s) from list\n", TARGET_ADDR(ut), ut->port, msg); - if (ut->fd >= 0) + if (ut->fd >= 0) { close(ut->fd); + del_close_on_fork_list(ut->fd); + } list_del(&ut->node); free(ut); } @@ -61,21 +64,35 @@ static void udp_send_buf(char *buf, size_t len) int ret; list_for_each_entry_safe(ut, tmp, &targets, node) { - size_t written = len; if (ut->fd < 0) continue; - ret = write_all(ut->fd, buf, &written); - if (ret < 0) + ret = write_nonblock(ut->fd, buf, len, len); + if (ret < 0) /* TODO: Use chunk queueing */ return udp_delete_target(ut, "send error"); - if (written != len) - PARA_WARNING_LOG("short write %zu/%zu\n", written, len); + if (ret != len) + PARA_WARNING_LOG("short write %zu/%zu\n", ret, len); } } -static void udp_init_session(struct udp_target *ut) +static int udp_init_session(struct udp_target *ut) { + int ret; + + if (ut->fd >= 0) /* nothing to do */ + return 0; PARA_NOTICE_LOG("sending to udp %s:%d\n", TARGET_ADDR(ut), ut->port); - ut->fd = create_udp_send_socket(TARGET_ADDR(ut), ut->port, 10); + ret = create_udp_send_socket(TARGET_ADDR(ut), ut->port, + conf.udp_ttl_arg); + if (ret < 0) + return ret; + ut->fd = ret; + ret = mark_fd_nonblocking(ut->fd); + if (ret < 0) { + close(ut->fd); + return ret; + } + add_close_on_fork_list(ut->fd); + return 1; } static void udp_shutdown_targets(void) @@ -84,12 +101,14 @@ static void udp_shutdown_targets(void) struct udp_target *ut, *tmp; udp_write_packet_type(buf, UDP_EOF_PACKET); + udp_write_magic(buf); list_for_each_entry_safe(ut, tmp, &targets, node) { if (ut->fd < 0) continue; - PARA_INFO_LOG("sending eof to udp target %s:%d\n", - TARGET_ADDR(ut), ut->port); write(ut->fd, buf, UDP_AUDIO_HEADER_LEN); + close(ut->fd); + del_close_on_fork_list(ut->fd); + ut->fd = -1; } } @@ -113,7 +132,8 @@ static void udp_send(long unsigned current_chunk, __a_unused long unsigned chunk { struct udp_target *ut, *tmp; size_t sendbuf_len; - int packet_type = UDP_DATA_PACKET; + uint8_t packet_type = UDP_DATA_PACKET; + int ret; char *sendbuf; struct timeval *chunk_tv; uint8_t stream_type = header_len? UDP_HEADER_STREAM : UDP_PLAIN_STREAM; @@ -130,8 +150,9 @@ static void udp_send(long unsigned current_chunk, __a_unused long unsigned chunk if (list_empty(&targets)) return; list_for_each_entry_safe(ut, tmp, &targets, node) { - if (ut->fd < 0) - udp_init_session(ut); + ret = udp_init_session(ut); + if (ret < 0) + udp_delete_target(ut, para_strerror(-ret)); } if (!need_extra_header(current_chunk)) header_len = 0;