]> git.tuebingen.mpg.de Git - paraslash.git/blobdiff - udp_recv.c
para_recv: handle EAGAIN on non-blocking socket
[paraslash.git] / udp_recv.c
index c768d3243ead5fa5e8be4c68ba276cc2193a7a49..316957d51050fc6e2751bfd6a48f9d1034a9e285 100644 (file)
@@ -32,6 +32,7 @@ struct private_udp_recv_data {
        /** The socket file descriptor. */
        int fd;
        struct btr_pool *btrp;
+       struct timeval last_read_time;
 };
 
 static void udp_recv_pre_select(struct sched *s, struct task *t)
@@ -77,13 +78,23 @@ static void udp_recv_post_select(__a_unused struct sched *s, struct task *t)
                goto err;
        if (ret == 0)
                return;
-       if (!FD_ISSET(purd->fd, &s->rfds))
+       if (!FD_ISSET(purd->fd, &s->rfds)) {
+               struct timeval tmp;
+               tv_add(&purd->last_read_time, &(struct timeval)EMBRACE(5, 0),
+                       &tmp);
+               ret = -E_UDP_TIMEOUT;
+               if (tv_diff(now, &tmp, NULL) > 0)
+                       goto err;
                return;
+       }
        iovcnt = btr_pool_get_buffers(purd->btrp, iov);
        ret = -E_UDP_OVERRUN;
        if (iovcnt == 0)
                goto err;
        ret = para_readv(purd->fd, iov, iovcnt);
+       /* EAGAIN is possible even if FD_ISSET */
+       if (ret < 0 && is_errno(-ret, EAGAIN))
+               return;
        if (ret == 0)
                ret = -E_RECV_EOF;
        if (ret < 0)
@@ -92,6 +103,7 @@ static void udp_recv_post_select(__a_unused struct sched *s, struct task *t)
        ret = udp_check_eof(packet_size, iov);
        if (ret < 0)
                goto err;
+       purd->last_read_time = *now;
        if (iov[0].iov_len >= packet_size)
                btr_add_output_pool(purd->btrp, packet_size, btrn);
        else { /* both buffers contain data */
@@ -209,7 +221,7 @@ static int udp_recv_open(struct receiver_node *rn)
        ret = mcast_receiver_setup(purd->fd, iface);
        if (ret < 0) {
                close(purd->fd);
-               return ret;
+               goto err;
        }
 
        ret = mark_fd_nonblocking(purd->fd);
@@ -220,6 +232,7 @@ static int udp_recv_open(struct receiver_node *rn)
        PARA_INFO_LOG("receiving from %s:%d, fd=%d\n", c->host_arg,
                c->port_arg, purd->fd);
        purd->btrp = btr_pool_new("udp_recv", 320 * 1024);
+       purd->last_read_time = *now;
        return purd->fd;
 err:
        free(rn->private_data);