#include <sys/time.h>
#include <dirent.h>
+#include <net/if.h>
#include "server.cmdline.h"
#include "para.h"
#include "list.h"
#include "send.h"
#include "portable_io.h"
-#include "udp_header.h"
#include "net.h"
#include "fd.h"
#include "sched.h"
int fd;
/** The list of queued chunks for this fd. */
struct chunk_queue *cq;
+ struct fec_client *fc;
+ struct fec_client_parms fcp;
};
static struct list_head targets;
PARA_NOTICE_LOG("deleting %s#%d (%s) from list\n", ut->host,
ut->port, msg);
udp_close_target(ut);
+ vss_del_fec_client(ut->fc);
list_del(&ut->node);
free(ut);
}
return 1;
}
-static void udp_send_buf(char *buf, size_t len)
-{
- struct udp_target *ut, *tmp;
- int ret;
-
- list_for_each_entry_safe(ut, tmp, &targets, node) {
- ret = udp_init_session(ut);
- if (ret < 0) {
- udp_delete_target(ut, para_strerror(-ret));
- continue;
- }
- ret = send_queued_chunks(ut->fd, ut->cq, 0);
- if (ret < 0) {
- udp_delete_target(ut, para_strerror(-ret));
- continue;
- }
- if (!len)
- continue;
- if (!ret) { /* still data left in the queue */
- ret = cq_enqueue(ut->cq, buf, len);
- if (ret < 0) {
- udp_delete_target(ut, para_strerror(-ret));
- continue;
- }
- }
- ret = write_nonblock(ut->fd, buf, len, 0);
- if (ret < 0) {
- udp_delete_target(ut, para_strerror(-ret));
- continue;
- }
- if (ret != len) {
- ret = cq_enqueue(ut->cq, buf + ret, len - ret);
- if (ret < 0) {
- udp_delete_target(ut, para_strerror(-ret));
- continue;
- }
- }
- }
-}
-
static void udp_shutdown_targets(void)
{
- char buf[UDP_AUDIO_HEADER_LEN];
struct udp_target *ut, *tmp;
- struct udp_audio_header uah = {
- .stream_type = UDP_UNKNOWN_STREAM,
- .packet_type = UDP_EOF_PACKET,
- };
+ const char *buf = NULL;
+ size_t len = 0; /* STFU, gcc */
- write_udp_audio_header(buf, &uah);
list_for_each_entry_safe(ut, tmp, &targets, node) {
if (ut->fd < 0)
continue;
- write(ut->fd, buf, UDP_AUDIO_HEADER_LEN);
+ if (!buf)
+ len = vss_get_fec_eof_packet(&buf);
+ write(ut->fd, buf, len);
udp_close_target(ut);
}
}
-static int need_extra_header(long unsigned current_chunk)
-{
- static struct timeval last_header;
- struct timeval diff;
-
- if (!current_chunk)
- return 0;
- tv_diff(now, &last_header, &diff);
- if (tv2ms(&diff) < conf.udp_header_interval_arg)
- return 0;
- last_header = *now;
- return 1;
-}
-
-static void udp_send(long unsigned current_chunk, __a_unused long unsigned chunks_sent,
- const char *buf, size_t len, const char *header_buf,
- size_t header_len)
-{
- char *sendbuf;
- size_t sendbuf_len;
- struct timeval *chunk_tv;
- struct udp_audio_header uah;
-
-// PARA_NOTICE_LOG("len: %zd, header_len: %zd\n", len, header_len);
- if (sender_status != SENDER_ON)
- return;
-
- /* we might not yet know the chunk time */
- chunk_tv = vss_chunk_time();
- if (!chunk_tv)
- return;
- if (list_empty(&targets))
- return;
- uah.stream_type = header_len? UDP_HEADER_STREAM : UDP_PLAIN_STREAM;
- uah.header_len = need_extra_header(current_chunk)? header_len : 0;
- if (!current_chunk)
- uah.packet_type = UDP_BOF_PACKET;
- else if (uah.header_len)
- uah.packet_type = UDP_HEADER_PACKET;
- else
- uah.packet_type = UDP_DATA_PACKET;
- uah.payload_len = uah.header_len + len;
- sendbuf_len = UDP_AUDIO_HEADER_LEN + uah.payload_len;
- sendbuf = para_malloc(sendbuf_len);
- write_udp_audio_header(sendbuf, &uah);
- if (uah.header_len)
- memcpy(sendbuf + UDP_AUDIO_HEADER_LEN, header_buf,
- uah.header_len);
- memcpy(sendbuf + UDP_AUDIO_HEADER_LEN + uah.header_len, buf, len);
- udp_send_buf(sendbuf, sendbuf_len);
- free(sendbuf);
-}
-
static int udp_com_on(__a_unused struct sender_command_data *scd)
{
sender_status = SENDER_ON;
return 1;
}
+static int udp_send_fec(char *buf, size_t len, void *private_data)
+{
+ struct udp_target *ut = private_data;
+ int ret = udp_init_session(ut);
+
+ if (ret < 0)
+ goto fail;
+ ret = send_queued_chunks(ut->fd, ut->cq, 0);
+ if (ret < 0)
+ goto fail;
+ if (!len)
+ return 0;
+ if (!ret) { /* still data left in the queue */
+ ret = cq_enqueue(ut->cq, buf, len);
+ if (ret < 0)
+ goto fail;
+ }
+ ret = write_nonblock(ut->fd, buf, len, 0);
+ if (ret < 0)
+ goto fail;
+ if (ret != len) {
+ ret = cq_enqueue(ut->cq, buf + ret, len - ret);
+ if (ret < 0)
+ goto fail;
+ }
+ return 1;
+fail:
+ udp_delete_target(ut, para_strerror(-ret));
+ return ret;
+}
+
static void udp_add_target(const char *host, int port)
{
struct udp_target *ut = para_calloc(sizeof(struct udp_target));
PARA_INFO_LOG("adding to target list (%s#%d)\n",
ut->host, ut->port);
para_list_add(&ut->node, &targets);
+ ut->fcp.slices_per_group = 16;
+ ut->fcp.data_slices_per_group = 14;
+ ut->fcp.max_slice_bytes = 1400;
+ ut->fcp.send = udp_send_fec;
+ ut->fcp.private_data = ut;
+ vss_add_fec_client(&ut->fcp, &ut->fc);
}
static int udp_com_add(struct sender_command_data *scd)
INIT_LIST_HEAD(&targets);
s->info = udp_info;
s->help = udp_help;
- s->send = udp_send;
+ s->send = NULL;
s->pre_select = NULL;
s->post_select = NULL;
s->shutdown_clients = udp_shutdown_targets;