X-Git-Url: http://git.tuebingen.mpg.de/?p=paraslash.git;a=blobdiff_plain;f=udp_send.c;h=24ebf5848ed3c2de5d671de98dab142ce9018581;hp=08429aa42601194df0304775fe853ea98eddcfe7;hb=4ebaf62d6bbd0498d93e19bc4f0a3c9564b1a8f5;hpb=0c395e45939ec272214ea14f3c07fc925a5697f5 diff --git a/udp_send.c b/udp_send.c index 08429aa4..24ebf584 100644 --- a/udp_send.c +++ b/udp_send.c @@ -32,12 +32,21 @@ #include "close_on_fork.h" #include "chunk_queue.h" +/** + * Time window during which ICMP Destination/Port Unreachable messages are + * ignored, covering transient receiver problems such as restarting the + * client, rebooting, reconfiguration, or handover. + */ +#define UDP_MAX_UNREACHABLE_TIME 30 + /** Describes one entry in the list of targets for the udp sender. */ struct udp_target { /** The hostname (DNS name or IPv4/v6 address string). */ char host[MAX_HOSTLEN]; /** The UDP port. */ int port; + /** Track time (seconds) of last ICMP Port Unreachable error */ + time_t last_unreachable; /** Common sender client data */ struct sender_client *sc; /** The opaque structure returned by vss_add_fec_client(). */ @@ -219,6 +228,38 @@ static int udp_init_fec(struct sender_client *sc) return mps; } +/** Check and clear socket error if any. */ +static int udp_check_socket_state(struct udp_target *ut) +{ + int ret; + socklen_t errlen = sizeof(ret); + + if (getsockopt(ut->sc->fd, SOL_SOCKET, SO_ERROR, &ret, &errlen) < 0) { + PARA_ERROR_LOG("SO_ERROR failed: %s\n", strerror(ret)); + return 0; + } else if (ret == 0) { + return 0; + } else if (ret == ECONNREFUSED) { + time_t dist = now->tv_sec - ut->last_unreachable; + + if (dist <= UDP_MAX_UNREACHABLE_TIME) { + return 0; + } else if (dist > 2 * UDP_MAX_UNREACHABLE_TIME) { + ut->last_unreachable = now->tv_sec; + return 0; + } else { + /* + * unreachable_time < dist <= 2 * unreachable_time + * No errors are allowed during this time window. + */ + PARA_NOTICE_LOG("Evicting %s#%d after %d seconds " + "of connection errors.\n", + ut->host, ut->port, (int)dist); + } + } + return -ERRNO_TO_PARA_ERROR(ret); +} + static int udp_send_fec(struct sender_client *sc, char *buf, size_t len) { struct udp_target *ut = sc->private_data; @@ -226,21 +267,27 @@ static int udp_send_fec(struct sender_client *sc, char *buf, size_t len) if (sender_status == SENDER_OFF) return 0; + if (len == 0 && !cq_peek(ut->sc->cq)) + return 0; + ret = udp_check_socket_state(ut); + if (ret < 0) + goto fail; ret = send_queued_chunks(sc->fd, sc->cq); - if (ret == -ERRNO_TO_PARA_ERROR(ECONNREFUSED)) - ret = 0; if (ret < 0) goto fail; - if (!len) - return 0; if (!ret) { /* still data left in the queue */ ret = cq_force_enqueue(sc->cq, buf, len); assert(ret >= 0); return 0; } ret = write_nonblock(sc->fd, buf, len); - if (ret == -ERRNO_TO_PARA_ERROR(ECONNREFUSED)) + if (ret == -ERRNO_TO_PARA_ERROR(ECONNREFUSED)) { + /* + * Happens if meanwhile an ICMP Destination / Port Unreachable + * has arrived. Ignore, persistent errors will be caught above. + */ ret = 0; + } if (ret < 0) goto fail; if (ret != len) {