]> git.tuebingen.mpg.de Git - paraslash.git/blobdiff - udp_send.c
sender: add a hook to resolve target names
[paraslash.git] / udp_send.c
index 08429aa42601194df0304775fe853ea98eddcfe7..adac6a8db1e8e66bb2e68441c870390f59756324 100644 (file)
 #include "close_on_fork.h"
 #include "chunk_queue.h"
 
+/**
+ * Time window during which ICMP Destination/Port Unreachable messages are
+ * ignored, covering transient receiver problems such as restarting the
+ * client, rebooting, reconfiguration, or handover.
+ */
+#define UDP_MAX_UNREACHABLE_TIME 30
+
 /** Describes one entry in the list of targets for the udp sender. */
 struct udp_target {
        /** The hostname (DNS name or IPv4/v6 address string). */
        char host[MAX_HOSTLEN];
        /** The UDP port. */
        int port;
+       /** Track time (seconds) of last ICMP Port Unreachable error */
+       time_t last_unreachable;
        /** Common sender client data */
        struct sender_client *sc;
        /** The opaque structure returned by vss_add_fec_client(). */
@@ -178,6 +187,28 @@ static void udp_shutdown_targets(void)
                }
 }
 
+static int udp_resolve_target(const char *url, struct sender_command_data *scd)
+{
+       const char *result;
+       int ret, port;
+
+       ret = parse_fec_url(url, scd);
+       if (ret)
+               return ret;
+       port = scd->port > 0 ? scd->port : conf.udp_default_port_arg;
+
+       ret = para_connect_simple(IPPROTO_UDP, scd->host, port);
+       if (ret < 0)
+               return ret;
+
+       result = remote_name(ret);
+       close(ret);
+
+       if (!parse_url(result, scd->host, sizeof(scd->host), &scd->port))
+               return -E_ADDRESS_LOOKUP;
+       return 1;
+}
+
 static int udp_com_on(__a_unused struct sender_command_data *scd)
 {
        sender_status = SENDER_ON;
@@ -219,6 +250,38 @@ static int udp_init_fec(struct sender_client *sc)
        return mps;
 }
 
+/** Check and clear socket error if any. */
+static int udp_check_socket_state(struct udp_target *ut)
+{
+       int ret;
+       socklen_t errlen = sizeof(ret);
+
+       if (getsockopt(ut->sc->fd, SOL_SOCKET, SO_ERROR, &ret, &errlen) < 0) {
+               PARA_ERROR_LOG("SO_ERROR failed: %s\n", strerror(ret));
+               return 0;
+       } else if (ret == 0) {
+               return 0;
+       } else if (ret == ECONNREFUSED) {
+               time_t dist = now->tv_sec - ut->last_unreachable;
+
+               if (dist <= UDP_MAX_UNREACHABLE_TIME) {
+                       return 0;
+               } else if (dist > 2 * UDP_MAX_UNREACHABLE_TIME) {
+                       ut->last_unreachable = now->tv_sec;
+                       return 0;
+               } else {
+                       /*
+                        * unreachable_time < dist <= 2 * unreachable_time
+                        * No errors are allowed during this time window.
+                        */
+                       PARA_NOTICE_LOG("Evicting %s#%d after %d seconds "
+                                       "of connection errors.\n",
+                                       ut->host, ut->port, (int)dist);
+               }
+       }
+       return -ERRNO_TO_PARA_ERROR(ret);
+}
+
 static int udp_send_fec(struct sender_client *sc, char *buf, size_t len)
 {
        struct udp_target *ut = sc->private_data;
@@ -226,21 +289,27 @@ static int udp_send_fec(struct sender_client *sc, char *buf, size_t len)
 
        if (sender_status == SENDER_OFF)
                return 0;
+       if (len == 0 && !cq_peek(ut->sc->cq))
+               return 0;
+       ret = udp_check_socket_state(ut);
+       if (ret < 0)
+               goto fail;
        ret = send_queued_chunks(sc->fd, sc->cq);
-       if (ret == -ERRNO_TO_PARA_ERROR(ECONNREFUSED))
-               ret = 0;
        if (ret < 0)
                goto fail;
-       if (!len)
-               return 0;
        if (!ret) { /* still data left in the queue */
                ret = cq_force_enqueue(sc->cq, buf, len);
                assert(ret >= 0);
                return 0;
        }
        ret = write_nonblock(sc->fd, buf, len);
-       if (ret == -ERRNO_TO_PARA_ERROR(ECONNREFUSED))
+       if (ret == -ERRNO_TO_PARA_ERROR(ECONNREFUSED)) {
+               /*
+                * Happens if meanwhile an ICMP Destination / Port Unreachable
+                * has arrived. Ignore, persistent errors will be caught above.
+                */
                ret = 0;
+       }
        if (ret < 0)
                goto fail;
        if (ret != len) {
@@ -380,6 +449,7 @@ void udp_send_init(struct sender *s)
        s->pre_select = NULL;
        s->post_select = NULL;
        s->shutdown_clients = udp_shutdown_targets;
+       s->resolve_target = udp_resolve_target;
        s->client_cmds[SENDER_ON] = udp_com_on;
        s->client_cmds[SENDER_OFF] = udp_com_off;
        s->client_cmds[SENDER_DENY] = NULL;