#include "close_on_fork.h"
#include "chunk_queue.h"
+/**
+ * Time window during which ICMP Destination/Port Unreachable messages are
+ * ignored, covering transient receiver problems such as restarting the
+ * client, rebooting, reconfiguration, or handover.
+ */
+#define UDP_MAX_UNREACHABLE_TIME 30
+
/** Describes one entry in the list of targets for the udp sender. */
struct udp_target {
/** The hostname (DNS name or IPv4/v6 address string). */
char host[MAX_HOSTLEN];
/** The UDP port. */
int port;
+ /** Track time (seconds) of last ICMP Port Unreachable error */
+ time_t last_unreachable;
/** Common sender client data */
struct sender_client *sc;
/** The opaque structure returned by vss_add_fec_client(). */
}
}
+static int udp_resolve_target(const char *url, struct sender_command_data *scd)
+{
+ const char *result;
+ int ret, port;
+
+ ret = parse_fec_url(url, scd);
+ if (ret)
+ return ret;
+ port = scd->port > 0 ? scd->port : conf.udp_default_port_arg;
+
+ ret = para_connect_simple(IPPROTO_UDP, scd->host, port);
+ if (ret < 0)
+ return ret;
+
+ result = remote_name(ret);
+ close(ret);
+
+ if (!parse_url(result, scd->host, sizeof(scd->host), &scd->port))
+ return -E_ADDRESS_LOOKUP;
+ return 1;
+}
+
static int udp_com_on(__a_unused struct sender_command_data *scd)
{
sender_status = SENDER_ON;
return mps;
}
+/** Check and clear socket error if any. */
+static int udp_check_socket_state(struct udp_target *ut)
+{
+ int ret;
+ socklen_t errlen = sizeof(ret);
+
+ if (getsockopt(ut->sc->fd, SOL_SOCKET, SO_ERROR, &ret, &errlen) < 0) {
+ PARA_ERROR_LOG("SO_ERROR failed: %s\n", strerror(ret));
+ return 0;
+ } else if (ret == 0) {
+ return 0;
+ } else if (ret == ECONNREFUSED) {
+ time_t dist = now->tv_sec - ut->last_unreachable;
+
+ if (dist <= UDP_MAX_UNREACHABLE_TIME) {
+ return 0;
+ } else if (dist > 2 * UDP_MAX_UNREACHABLE_TIME) {
+ ut->last_unreachable = now->tv_sec;
+ return 0;
+ } else {
+ /*
+ * unreachable_time < dist <= 2 * unreachable_time
+ * No errors are allowed during this time window.
+ */
+ PARA_NOTICE_LOG("Evicting %s#%d after %d seconds "
+ "of connection errors.\n",
+ ut->host, ut->port, (int)dist);
+ }
+ }
+ return -ERRNO_TO_PARA_ERROR(ret);
+}
+
static int udp_send_fec(struct sender_client *sc, char *buf, size_t len)
{
struct udp_target *ut = sc->private_data;
if (sender_status == SENDER_OFF)
return 0;
+ if (len == 0 && !cq_peek(ut->sc->cq))
+ return 0;
+ ret = udp_check_socket_state(ut);
+ if (ret < 0)
+ goto fail;
ret = send_queued_chunks(sc->fd, sc->cq);
- if (ret == -ERRNO_TO_PARA_ERROR(ECONNREFUSED))
- ret = 0;
if (ret < 0)
goto fail;
- if (!len)
- return 0;
if (!ret) { /* still data left in the queue */
ret = cq_force_enqueue(sc->cq, buf, len);
assert(ret >= 0);
return 0;
}
ret = write_nonblock(sc->fd, buf, len);
- if (ret == -ERRNO_TO_PARA_ERROR(ECONNREFUSED))
+ if (ret == -ERRNO_TO_PARA_ERROR(ECONNREFUSED)) {
+ /*
+ * Happens if meanwhile an ICMP Destination / Port Unreachable
+ * has arrived. Ignore, persistent errors will be caught above.
+ */
ret = 0;
+ }
if (ret < 0)
goto fail;
if (ret != len) {
static void udp_add_target(struct sender_command_data *scd)
{
- int ret, port = scd->port > 0 ? scd->port : conf.udp_default_port_arg;
+ int ret;
struct udp_target *ut = para_calloc(sizeof(*ut));
- strncpy(ut->host, scd->host, sizeof(ut->host));
- ut->port = scd->port > 0 ? scd->port : conf.udp_default_port_arg;
-
ut->fcp.slices_per_group = scd->slices_per_group;
ut->fcp.data_slices_per_group = scd->data_slices_per_group;
ut->fcp.max_slice_bytes = scd->max_slice_bytes;
ut->sc = para_calloc(sizeof(*ut->sc));
ut->sc->private_data = ut;
ut->sc->fd = -1;
- ret = para_connect_simple(IPPROTO_UDP, scd->host, port);
+ ret = para_connect_simple(IPPROTO_UDP, scd->host, scd->port);
if (ret < 0)
goto err;
ut->sc->fd = ret;
err:
if (ut->sc->fd >= 0)
close(ut->sc->fd);
- PARA_NOTICE_LOG("failed to set up %s#%d (%s)- not adding it\n",
- scd->host, port, para_strerror(-ret));
+ PARA_NOTICE_LOG("failed to set up %s:%d (%s)- not adding it\n",
+ scd->host, scd->port, para_strerror(-ret));
free(ut->sc);
free(ut);
}
INIT_LIST_HEAD(&targets);
for (i = 0; i < conf.udp_target_given; i++) {
- if (parse_fec_url(conf.udp_target_arg[i], &scd) < 0) {
- PARA_CRIT_LOG("syntax error for udp target option "
- "#%d, ignoring\n", i);
- continue;
- }
- udp_add_target(&scd);
+ if (udp_resolve_target(conf.udp_target_arg[i], &scd) < 0)
+ PARA_CRIT_LOG("not adding requested target '%s'\n",
+ conf.udp_target_arg[i]);
+ else
+ udp_add_target(&scd);
}
}
s->pre_select = NULL;
s->post_select = NULL;
s->shutdown_clients = udp_shutdown_targets;
+ s->resolve_target = udp_resolve_target;
s->client_cmds[SENDER_ON] = udp_com_on;
s->client_cmds[SENDER_OFF] = udp_com_off;
s->client_cmds[SENDER_DENY] = NULL;