summary |
shortlog |
log |
commit | commitdiff |
tree
raw |
patch |
inline | side by side (from parent 1:
2b47137)
Storing the chunk number has the disadvantage that the
queuing code must call into vss to get the chunk. This
is unneccessary and requires the map pointer of vss.c
to be global.
/** Describes one queued chunk in a chunk queue. */
struct queued_chunk {
/** Describes one queued chunk in a chunk queue. */
struct queued_chunk {
- /** The number of the queued chunk, -1U means header. */
- unsigned chunk_num;
- /** The number of bytes already sent. */
- unsigned sent;
+ /** Pointer to the data to be queued. */
+ const char *buf;
+ /** The number of bytes of this chunk. */
+ size_t num_bytes;
/** Position of the chunk in the chunk queue. */
struct list_head node;
};
/** Position of the chunk in the chunk queue. */
struct list_head node;
};
* Add a chunk to the given queue.
*
* \param cq the queue to add the chunk to.
* Add a chunk to the given queue.
*
* \param cq the queue to add the chunk to.
- * \param chunk_num The number of the chunk to be queued.
- * \param sent The number of bytes of this chunk that the sender was able to
- * send.
+ * \param buf Pointer to the data to be queued.
+ * \param num_bytes The size of \a buf.
- * \return Positive on success, negative on errors.
-int cq_enqueue(struct chunk_queue *cq, long unsigned chunk_num,
- size_t sent)
+int cq_enqueue(struct chunk_queue *cq, const char *buf, size_t num_bytes)
{
struct queued_chunk *qc;
{
struct queued_chunk *qc;
- char *buf;
- size_t len;
- int ret;
- if (chunk_num != -1U) {
- ret = vss_get_chunk(chunk_num, &buf, &len);
- if (ret < 0)
- return ret;
- } else
- vss_get_header(&buf, &len);
- if (cq->num_pending + len > cq->max_pending)
+ if (cq->num_pending + num_bytes > cq->max_pending)
return -E_QUEUE;
qc = para_malloc(sizeof(struct queued_chunk));
return -E_QUEUE;
qc = para_malloc(sizeof(struct queued_chunk));
- cq->num_pending += len;
- qc->chunk_num = chunk_num;
- qc->sent = sent;
+ cq->num_pending += num_bytes;
+ qc->buf = buf;
+ qc->num_bytes = num_bytes;
list_add_tail(&qc->node, &cq->q);
PARA_DEBUG_LOG("%lu bytes queued for %p\n", cq->num_pending, &cq->q);
return 1;
list_add_tail(&qc->node, &cq->q);
PARA_DEBUG_LOG("%lu bytes queued for %p\n", cq->num_pending, &cq->q);
return 1;
{
struct queued_chunk *qc = cq_peek(cq);
assert(qc);
{
struct queued_chunk *qc = cq_peek(cq);
assert(qc);
+ qc->num_bytes -= sent;
+ qc->buf += sent;
cq->num_pending -= sent;
}
cq->num_pending -= sent;
}
*
* \return Positive on success, negative on errors.
*/
*
* \return Positive on success, negative on errors.
*/
-int cq_get(struct queued_chunk *qc, char **buf, size_t *len)
+int cq_get(struct queued_chunk *qc, const char **buf, size_t *num_bytes)
- int ret;
-
- if (qc->chunk_num != -1U) {
- ret = vss_get_chunk(qc->chunk_num, buf, len);
- if (ret < 0)
- return ret;
- } else
- vss_get_header(buf, len);
- assert(*len > qc->sent);
- *buf += qc->sent;
- *len -= qc->sent;
+ *buf = qc->buf;
+ *num_bytes = qc->num_bytes;
struct chunk_queue;
struct queued_chunk;
struct chunk_queue;
struct queued_chunk;
-int cq_enqueue(struct chunk_queue *cq, long unsigned chunk_num, size_t sent);
+int cq_enqueue(struct chunk_queue *cq, const char *buf, size_t num_bytes);
struct queued_chunk *cq_peek(struct chunk_queue *cq);
void cq_dequeue(struct chunk_queue *cq);
void cq_update(struct chunk_queue *cq, size_t sent);
struct queued_chunk *cq_peek(struct chunk_queue *cq);
void cq_dequeue(struct chunk_queue *cq);
void cq_update(struct chunk_queue *cq, size_t sent);
-int cq_get(struct queued_chunk *qc, char **buf, size_t *len);
+int cq_get(struct queued_chunk *qc, const char **buf, size_t *len);
struct chunk_queue *cq_new(size_t max_pending);
void cq_destroy(struct chunk_queue *cq);
struct chunk_queue *cq_new(size_t max_pending);
void cq_destroy(struct chunk_queue *cq);
}
static int queue_chunk_or_shutdown(struct sender_client *sc,
}
static int queue_chunk_or_shutdown(struct sender_client *sc,
- struct sender_status *ss, long unsigned chunk_num,
- size_t sent)
+ struct sender_status *ss, const char *buf, size_t num_bytes)
- int ret = cq_enqueue(sc->cq, chunk_num, sent);
+ int ret = cq_enqueue(sc->cq, buf, num_bytes);
if (ret < 0)
shutdown_client(sc, ss);
return ret;
if (ret < 0)
shutdown_client(sc, ss);
return ret;
{
struct queued_chunk *qc;
while ((qc = cq_peek(sc->cq))) {
{
struct queued_chunk *qc;
while ((qc = cq_peek(sc->cq))) {
size_t len;
int ret;
cq_get(qc, &buf, &len);
size_t len;
int ret;
cq_get(qc, &buf, &len);
vss_get_header(&header_buf, &header_len);
if (header_buf && header_len > 0) {
vss_get_header(&header_buf, &header_len);
if (header_buf && header_len > 0) {
- ret = queue_chunk_or_shutdown(sc, ss, -1U, 0);
+ ret = queue_chunk_or_shutdown(sc, ss, header_buf, header_len);
if (!len)
goto out;
if (!ret) { /* still data left in the queue */
if (!len)
goto out;
if (!ret) { /* still data left in the queue */
- ret = queue_chunk_or_shutdown(sc, ss, current_chunk, 0);
+ ret = queue_chunk_or_shutdown(sc, ss, buf, len);
goto out;
}
ret = write_nonblock(sc->fd, buf, len, max_bytes_per_write);
goto out;
}
ret = write_nonblock(sc->fd, buf, len, max_bytes_per_write);
goto out;
}
if (ret != len)
goto out;
}
if (ret != len)
- ret = queue_chunk_or_shutdown(sc, ss, current_chunk, ret);
+ ret = queue_chunk_or_shutdown(sc, ss, buf + ret, len - ret);
out:
if (ret < 0)
PARA_NOTICE_LOG("%s\n", para_strerror(-ret));
out:
if (ret < 0)
PARA_NOTICE_LOG("%s\n", para_strerror(-ret));