/*
- * Copyright (C) 2005-2009 Andre Noll <maan@systemlinux.org>
+ * Copyright (C) 2005-2010 Andre Noll <maan@systemlinux.org>
*
* Licensed under the GPL v2. For licencing details see COPYING.
*/
#include "afh.h"
#include "afs.h"
#include "server.h"
-#include "vss.h"
#include "list.h"
#include "send.h"
+#include "vss.h"
#include "portable_io.h"
#include "net.h"
#include "fd.h"
static struct list_head targets;
static int sender_status;
+static struct sender *self;
static void udp_close_target(struct udp_target *ut)
{
/** The maximal size of the per-target chunk queue. */
#define UDP_CQ_BYTES 40000
-static int udp_init_session(struct udp_target *ut)
+static int udp_open(void *client, struct fec_client_parms **fcp)
{
+ struct udp_target *ut = client;
int ret;
char *iface = NULL;
if (ut->fd >= 0) /* nothing to do */
return 0;
- ret = makesock(AF_UNSPEC, IPPROTO_UDP, 0, ut->host, ut->port);
+ ret = para_connect_simple(IPPROTO_UDP, ut->host, ut->port);
if (ret < 0)
return ret;
ut->fd = ret;
}
add_close_on_fork_list(ut->fd);
ut->cq = cq_new(UDP_CQ_BYTES);
- PARA_NOTICE_LOG("sending to udp %s#%d using fec parms %d:%d:%d\n",
- ut->host, ut->port , ut->fcp.max_slice_bytes,
- ut->fcp.data_slices_per_group, ut->fcp.slices_per_group);
+ ut->fcp.max_slice_bytes = 1472; /* FIXME */
+ *fcp = &ut->fcp;
+ PARA_NOTICE_LOG("sending to udp %s#%d\n", ut->host, ut->port);
return 1;
}
size_t len = 0; /* STFU, gcc */
list_for_each_entry_safe(ut, tmp, &targets, node) {
+ int ubuntu_glibc_headers_suck;
if (ut->fd < 0)
continue;
if (!buf)
len = vss_get_fec_eof_packet(&buf);
- write(ut->fd, buf, len);
+ /* ignore return value, we're closing the target anyway. */
+ ubuntu_glibc_headers_suck = write(ut->fd, buf, len); /* STFU */
udp_close_target(ut);
}
}
static int udp_send_fec(char *buf, size_t len, void *private_data)
{
struct udp_target *ut = private_data;
- int ret = udp_init_session(ut);
+ int ret;
- if (ret < 0)
- goto fail;
+ if (sender_status == SENDER_OFF)
+ return 0;
ret = send_queued_chunks(ut->fd, ut->cq, 0);
+ if (ret == -ERRNO_TO_PARA_ERROR(ECONNREFUSED))
+ ret = 0;
if (ret < 0)
goto fail;
if (!len)
return 0;
if (!ret) { /* still data left in the queue */
- ret = cq_enqueue(ut->cq, buf, len);
- if (ret < 0)
- goto fail;
+ ret = cq_force_enqueue(ut->cq, buf, len);
+ assert(ret >= 0);
}
ret = write_nonblock(ut->fd, buf, len, 0);
+ if (ret == -ERRNO_TO_PARA_ERROR(ECONNREFUSED))
+ ret = 0;
if (ret < 0)
goto fail;
if (ret != len) {
- ret = cq_enqueue(ut->cq, buf + ret, len - ret);
- if (ret < 0)
- goto fail;
+ ret = cq_force_enqueue(ut->cq, buf + ret, len - ret);
+ assert(ret >= 0);
}
return 1;
fail:
para_list_add(&ut->node, &targets);
ut->fcp.slices_per_group = scd->slices_per_group;
ut->fcp.data_slices_per_group = scd->data_slices_per_group;
- ut->fcp.max_slice_bytes = scd->max_slice_bytes;
- ut->fcp.send = udp_send_fec;
- ut->fcp.private_data = ut;
- vss_add_fec_client(&ut->fcp, &ut->fc);
+ vss_add_fec_client(self, ut, &ut->fc);
}
static int udp_com_add(struct sender_command_data *scd)
ret = make_message(
"udp sender:\n"
"\tstatus: %s\n"
- "\tport: udp %d\n"
+ "\tport: %s\n"
"\ttargets: %s\n",
(sender_status == SENDER_ON)? "on" : "off",
- conf.udp_default_port_arg,
+ stringify_port(conf.udp_default_port_arg, "udp"),
tgts? tgts : "(none)"
);
free(tgts);
s->info = udp_info;
s->help = udp_help;
s->send = NULL;
+ s->send_fec = udp_send_fec;
+ s->open = udp_open;
s->pre_select = NULL;
s->post_select = NULL;
s->shutdown_clients = udp_shutdown_targets;
udp_init_target_list();
if (!conf.udp_no_autostart_given)
sender_status = SENDER_ON;
+ self = s;
PARA_DEBUG_LOG("udp sender init complete\n");
}