From: Andre Noll Date: Wed, 8 Dec 2010 12:45:42 +0000 (+0100) Subject: Merge branch 't/kill_udp_cq' X-Git-Tag: v0.4.5~2 X-Git-Url: http://git.tuebingen.mpg.de/?p=paraslash.git;a=commitdiff_plain;h=b5b5d00eea33eccd3d2d75057c5254bf85568b00;hp=924c3764da9cecc2bd72fa2cd0b7f97b5a24650f Merge branch 't/kill_udp_cq' --- diff --git a/chunk_queue.c b/chunk_queue.c index 468b1dc2..3928edd7 100644 --- a/chunk_queue.c +++ b/chunk_queue.c @@ -93,34 +93,6 @@ void cq_dequeue(struct chunk_queue *cq) free(qc); } -/** - * Force to add a chunk to the given queue. - * - * \param cq See \ref cq_enqueue. - * \param buf See \ref cq_enqueue. - * \param num_bytes See \ref cq_enqueue. - * - * If queuing the given buffer would result in exceeding the maximal queue - * size, buffers are dropped from the beginning of the queue. Note that this - * function still might fail. - * - * \return Standard. - */ -int cq_force_enqueue(struct chunk_queue *cq, const char *buf, size_t num_bytes) -{ - int ret; - - if (num_bytes > cq->max_pending) - return -E_QUEUE; - for (;;) { - ret = cq_enqueue(cq, buf, num_bytes); - if (ret >= 0) - return ret; - cq_dequeue(cq); - } - /* never reached */ -} - /** * Change the number of bytes sent for the current queued chunk. * diff --git a/chunk_queue.h b/chunk_queue.h index 9e794ba8..3c138eb5 100644 --- a/chunk_queue.h +++ b/chunk_queue.h @@ -16,4 +16,3 @@ void cq_update(struct chunk_queue *cq, size_t sent); int cq_get(struct queued_chunk *qc, const char **buf, size_t *len); struct chunk_queue *cq_new(size_t max_pending); void cq_destroy(struct chunk_queue *cq); -int cq_force_enqueue(struct chunk_queue *cq, const char *buf, size_t num_bytes); diff --git a/udp_send.c b/udp_send.c index 0343feb6..f39b4f14 100644 --- a/udp_send.c +++ b/udp_send.c @@ -30,7 +30,6 @@ #include "fd.h" #include "sched.h" #include "close_on_fork.h" -#include "chunk_queue.h" /** * Time window during which ICMP Destination/Port Unreachable messages are @@ -43,8 +42,6 @@ struct udp_target { /** Track time (seconds) of last ICMP Port Unreachable error */ time_t last_unreachable; - /** Common sender client data */ - struct sender_client *sc; /** The opaque structure returned by vss_add_fec_client(). */ struct fec_client *fc; /** The FEC parameters for this target. */ @@ -56,11 +53,11 @@ static int sender_status; static void udp_close_target(struct sender_client *sc) { - if (sc->cq != NULL) { - del_close_on_fork_list(sc->fd); - cq_destroy(sc->cq); - sc->cq = NULL; - } + const char *buf; + size_t len = vss_get_fec_eof_packet(&buf); + + /* ignore return value, closing the target anyway. */ + (void)write(sc->fd, buf, len); } static void udp_delete_target(struct sender_client *sc, const char *msg) @@ -69,6 +66,8 @@ static void udp_delete_target(struct sender_client *sc, const char *msg) PARA_NOTICE_LOG("deleting %s (%s) from list\n", sc->name, msg); udp_close_target(sc); + close(sc->fd); + del_close_on_fork_list(sc->fd); vss_del_fec_client(ut->fc); list_del(&sc->node); free(sc->name); @@ -152,30 +151,16 @@ err: return -ERRNO_TO_PARA_ERROR(errno); } -/** The maximal size of the per-target chunk queue. */ -#define UDP_CQ_BYTES 40000 - static void udp_init_session(struct sender_client *sc) { - if (sc->cq == NULL) { - sc->cq = cq_new(UDP_CQ_BYTES); - add_close_on_fork_list(sc->fd); - PARA_NOTICE_LOG("sending to udp %s\n", sc->name); - } + PARA_NOTICE_LOG("sending to udp %s\n", sc->name); } static void udp_shutdown_targets(void) { struct sender_client *sc, *tmp; - const char *buf; - size_t len = vss_get_fec_eof_packet(&buf); - list_for_each_entry_safe(sc, tmp, &targets, node) - if (sc->cq != NULL) { - /* ignore return value, closing the target anyway. */ - (void)write(sc->fd, buf, len); - udp_close_target(sc); - } + udp_close_target(sc); } static int udp_resolve_target(const char *url, struct sender_command_data *scd) @@ -294,19 +279,11 @@ static int udp_send_fec(struct sender_client *sc, char *buf, size_t len) if (sender_status == SENDER_OFF) return 0; - if (len == 0 && !cq_peek(sc->cq)) + if (len == 0) return 0; ret = udp_check_socket_state(sc); if (ret < 0) goto fail; - ret = send_queued_chunks(sc->fd, sc->cq); - if (ret < 0) - goto fail; - if (!ret) { /* still data left in the queue */ - ret = cq_force_enqueue(sc->cq, buf, len); - assert(ret >= 0); - return 0; - } ret = write_nonblock(sc->fd, buf, len); if (ret == -ERRNO_TO_PARA_ERROR(ECONNREFUSED)) { /* @@ -317,10 +294,6 @@ static int udp_send_fec(struct sender_client *sc, char *buf, size_t len) } if (ret < 0) goto fail; - if (ret != len) { - ret = cq_force_enqueue(sc->cq, buf + ret, len - ret); - assert(ret >= 0); - } return 1; fail: udp_delete_target(sc, para_strerror(-ret)); @@ -339,7 +312,7 @@ static int udp_com_add(struct sender_command_data *scd) return -E_TARGET_EXISTS; } ut = para_calloc(sizeof(*ut)); - sc = ut->sc = para_calloc(sizeof(*sc)); + sc = para_calloc(sizeof(*sc)); ut->fcp.slices_per_group = scd->slices_per_group; ut->fcp.data_slices_per_group = scd->data_slices_per_group; ut->fcp.init_fec = udp_init_fec; @@ -362,6 +335,7 @@ static int udp_com_add(struct sender_command_data *scd) PARA_INFO_LOG("adding to target list (%s)\n", sc->name); ut->fc = vss_add_fec_client(sc, &ut->fcp); para_list_add(&sc->node, &targets); + add_close_on_fork_list(sc->fd); return 1; err: if (sc->fd >= 0)