udp_send: add a time window for errors
[paraslash.git] / udp_send.c
index 08429aa..24ebf58 100644 (file)
 #include "close_on_fork.h"
 #include "chunk_queue.h"
 
+/**
+ * Time window during which ICMP Destination/Port Unreachable messages are
+ * ignored, covering transient receiver problems such as restarting the
+ * client, rebooting, reconfiguration, or handover.
+ */
+#define UDP_MAX_UNREACHABLE_TIME 30
+
 /** Describes one entry in the list of targets for the udp sender. */
 struct udp_target {
        /** The hostname (DNS name or IPv4/v6 address string). */
        char host[MAX_HOSTLEN];
        /** The UDP port. */
        int port;
+       /** Track time (seconds) of last ICMP Port Unreachable error */
+       time_t last_unreachable;
        /** Common sender client data */
        struct sender_client *sc;
        /** The opaque structure returned by vss_add_fec_client(). */
@@ -219,6 +228,38 @@ static int udp_init_fec(struct sender_client *sc)
        return mps;
 }
 
+/** Check and clear socket error if any. */
+static int udp_check_socket_state(struct udp_target *ut)
+{
+       int ret;
+       socklen_t errlen = sizeof(ret);
+
+       if (getsockopt(ut->sc->fd, SOL_SOCKET, SO_ERROR, &ret, &errlen) < 0) {
+               PARA_ERROR_LOG("SO_ERROR failed: %s\n", strerror(ret));
+               return 0;
+       } else if (ret == 0) {
+               return 0;
+       } else if (ret == ECONNREFUSED) {
+               time_t dist = now->tv_sec - ut->last_unreachable;
+
+               if (dist <= UDP_MAX_UNREACHABLE_TIME) {
+                       return 0;
+               } else if (dist > 2 * UDP_MAX_UNREACHABLE_TIME) {
+                       ut->last_unreachable = now->tv_sec;
+                       return 0;
+               } else {
+                       /*
+                        * unreachable_time < dist <= 2 * unreachable_time
+                        * No errors are allowed during this time window.
+                        */
+                       PARA_NOTICE_LOG("Evicting %s#%d after %d seconds "
+                                       "of connection errors.\n",
+                                       ut->host, ut->port, (int)dist);
+               }
+       }
+       return -ERRNO_TO_PARA_ERROR(ret);
+}
+
 static int udp_send_fec(struct sender_client *sc, char *buf, size_t len)
 {
        struct udp_target *ut = sc->private_data;
@@ -226,21 +267,27 @@ static int udp_send_fec(struct sender_client *sc, char *buf, size_t len)
 
        if (sender_status == SENDER_OFF)
                return 0;
+       if (len == 0 && !cq_peek(ut->sc->cq))
+               return 0;
+       ret = udp_check_socket_state(ut);
+       if (ret < 0)
+               goto fail;
        ret = send_queued_chunks(sc->fd, sc->cq);
-       if (ret == -ERRNO_TO_PARA_ERROR(ECONNREFUSED))
-               ret = 0;
        if (ret < 0)
                goto fail;
-       if (!len)
-               return 0;
        if (!ret) { /* still data left in the queue */
                ret = cq_force_enqueue(sc->cq, buf, len);
                assert(ret >= 0);
                return 0;
        }
        ret = write_nonblock(sc->fd, buf, len);
-       if (ret == -ERRNO_TO_PARA_ERROR(ECONNREFUSED))
+       if (ret == -ERRNO_TO_PARA_ERROR(ECONNREFUSED)) {
+               /*
+                * Happens if meanwhile an ICMP Destination / Port Unreachable
+                * has arrived. Ignore, persistent errors will be caught above.
+                */
                ret = 0;
+       }
        if (ret < 0)
                goto fail;
        if (ret != len) {