X-Git-Url: http://git.tuebingen.mpg.de/?p=paraslash.git;a=blobdiff_plain;f=ortp_recv.c;h=287d7064acd4f2ec9cb67672ad6ed4835181224e;hp=9c9013648e794b12d6c6f0750eedc079048ebaf7;hb=be0b6d883628d8ff9043107f1ddd780838facd3d;hpb=41f13bf6cac519b46ed24892e2c84384ef5dc500 diff --git a/ortp_recv.c b/ortp_recv.c index 9c901364..287d7064 100644 --- a/ortp_recv.c +++ b/ortp_recv.c @@ -32,8 +32,6 @@ #define CHUNK_SIZE 128 * 1024 -extern int msg_to_buf(mblk_t *, char *, int); - /** * data specific to the ortp receiver * @@ -67,13 +65,44 @@ uint32_t timestamp; uint32_t chunk_ts; }; + +static int msg_to_buf(mblk_t *mp, char *buffer, int len) +{ + int rlen = len; + mblk_t *m, *mprev; + int mlen; + + m = mp->b_cont; + mprev = mp; + while (m != NULL) { + mlen = (int) (m->b_wptr - m->b_rptr); + if (mlen <= rlen) { + mblk_t *consumed = m; + memcpy (buffer, m->b_rptr, mlen); + /* go to next mblk_t */ + mprev->b_cont = m->b_cont; + m = m->b_cont; + consumed->b_cont = NULL; + freeb (consumed); + buffer += mlen; + rlen -= mlen; + } else { /*if mlen>rlen */ + memcpy (buffer, m->b_rptr, rlen); + m->b_rptr += rlen; + return len; + } + } + return len - rlen; +} + + static void ortp_recv_pre_select(struct sched *s, struct task *t) { struct receiver_node *rn = t->private_data; struct private_ortp_recv_data *pord = rn->private_data; struct timeval tmp; - if (tv_diff(&s->now, &pord->next_chunk, &tmp) >= 0) { + if (tv_diff(now, &pord->next_chunk, &tmp) >= 0) { tmp.tv_sec = 0; tmp.tv_usec = 1000; } @@ -91,12 +120,12 @@ static void compute_next_chunk(unsigned chunk_time, tv_add(&chunk_tv, &pord->next_chunk, &tmp); pord->next_chunk = tmp; pord->timestamp += pord->chunk_ts; - PARA_DEBUG_LOG("next chunk (ts = %d) due at %lu:%lu\n", - pord->timestamp, pord->next_chunk.tv_sec, - pord->next_chunk.tv_usec); +// PARA_DEBUG_LOG("next chunk (ts = %d) due at %lu:%lu\n", +// pord->timestamp, pord->next_chunk.tv_sec, +// pord->next_chunk.tv_usec); } -static void ortp_recv_post_select(struct sched *s, struct task *t) +static void ortp_recv_post_select(__a_unused struct sched *s, struct task *t) { struct receiver_node *rn = t->private_data; struct private_ortp_recv_data *pord = rn->private_data; @@ -106,9 +135,14 @@ static void ortp_recv_post_select(struct sched *s, struct task *t) unsigned chunk_time; // PARA_INFO_LOG("rn: %p, pord: %p, session: %p\n", rn, pord, pord->session); + t->ret = -E_ORTP_RECV_EOF; + if (rn->output_eof && *rn->output_eof) { + rn->eof = 1; + return; + } t->ret = 1; if (pord->start.tv_sec) - if (tv_diff(&s->now, &pord->next_chunk, NULL) < 0) + if (tv_diff(now, &pord->next_chunk, NULL) < 0) return; mp = rtp_session_recvm_with_ts(pord->session, pord->timestamp); if (!mp) { @@ -120,12 +154,12 @@ static void ortp_recv_post_select(struct sched *s, struct task *t) if ((pord->c_bad > 5000 && pord->start.tv_sec) || pord->c_bad > 10000) return; t->ret = 1; - tv_add(&s->now, &min_delay, &pord->next_chunk); + tv_add(now, &min_delay, &pord->next_chunk); return; } /* okay, we have a chunk of data */ if (!pord->start.tv_sec) - pord->start = s->now; + pord->start = *now; t->ret = msg_to_buf(mp, tmpbuf, CHUNK_SIZE); if (t->ret < ORTP_AUDIO_HEADER_LEN) { rn->eof = 1; @@ -199,7 +233,7 @@ success: freemsg(mp); if (pord->c_bad) { pord->c_bad = 0; - pord->next_chunk = s->now; + pord->next_chunk = *now; } compute_next_chunk(chunk_time, pord); return; @@ -239,20 +273,21 @@ static void *ortp_recv_parse_config(int argc, char **argv) static int ortp_recv_open(struct receiver_node *rn) { struct private_ortp_recv_data *pord; - struct ortp_recv_args_info *conf = rn->conf; + struct ortp_recv_args_info *c = rn->conf; rn->buf = para_calloc(CHUNK_SIZE); rn->private_data = para_calloc(sizeof(struct private_ortp_recv_data)); pord = rn->private_data; pord->session = rtp_session_new(RTP_SESSION_RECVONLY); - PARA_NOTICE_LOG("receiving from %s:%d\n", conf->host_arg, conf->port_arg); - rtp_session_set_local_addr(pord->session, conf->host_arg, conf->port_arg); + PARA_NOTICE_LOG("receiving from %s:%d\n", c->host_arg, c->port_arg); + rtp_session_set_local_addr(pord->session, c->host_arg, c->port_arg); + rtp_session_set_remote_addr(pord->session, c->host_arg, c->port_arg); rtp_session_set_payload_type(pord->session, PAYLOAD_AUDIO_CONTINUOUS); - if (conf->jitter_compensation_arg) { + if (c->jitter_compensation_arg) { rtp_session_enable_adaptive_jitter_compensation(pord->session, TRUE); rtp_session_set_jitter_compensation(pord->session, - conf->jitter_compensation_arg); + c->jitter_compensation_arg); } return 1; }