Make para_recv use the new scheduler.
authorAndre <maan@p133.(none)>
Wed, 24 May 2006 01:29:27 +0000 (03:29 +0200)
committerAndre <maan@p133.(none)>
Wed, 24 May 2006 01:29:27 +0000 (03:29 +0200)
This was pretty straight-forward, but it broke audiod badly. The plan
is first to convert para_filter, then para_audiod. It this works out well,
para_server will be converted as well.

12 files changed:
audiod.c
configure.ac
dccp_recv.c
error.h
http_recv.c
ortp_recv.c
recv.c
recv.h
recv_common.c
stdout.c [new file with mode: 0644]
stdout.h [new file with mode: 0644]
write.c

index 08b29e0..1d2d69c 100644 (file)
--- a/audiod.c
+++ b/audiod.c
@@ -24,6 +24,7 @@
 #include "audiod.cmdline.h"
 #include "list.h"
 #include "close_on_fork.h"
+#include "sched.h"
 #include "recv.h"
 #include "filter.h"
 #include "grab_client.cmdline.h"
index 74f5ae8..1702893 100644 (file)
@@ -57,7 +57,7 @@ AC_CHECK_LIB([menu], [new_menu], [extras="$extras para_dbadm"],
 
 recv_cmdline_objs="recv.cmdline http_recv.cmdline dccp_recv.cmdline"
 recv_errlist_objs="http_recv recv_common recv time string net dccp_recv
-       dccp fd"
+       dccp fd sched stdout"
 recv_ldflags=""
 
 filter_cmdline_objs="filter.cmdline compress_filter.cmdline"
index f7db6fc..deca716 100644 (file)
 #include "para.h"
 #include "error.h"
 #include "dccp.h"
+#include "list.h"
+#include "sched.h"
 #include "recv.h"
 #include "string.h"
 #include "net.h"
+#include "fd.h"
 
 #include "dccp_recv.cmdline.h"
 
@@ -116,35 +119,35 @@ static void *dccp_recv_parse_config(int argc, char **argv)
        return NULL;
 }
 
-static int dccp_recv_pre_select(struct receiver_node *rn, fd_set *rfds,
-               __a_unused fd_set *wfds, __a_unused struct timeval *timeout)
+static void dccp_recv_pre_select(struct sched *s, struct task *t)
 {
-       struct private_dccp_recv_data *pdd = rn->private_data;
+       struct private_dccp_recv_data *pdd = t->private_data;
 
        if (!pdd)
-               return -1;
-       FD_SET(pdd->fd, rfds);
-       return pdd->fd;
+               return ;
+       para_fd_set(pdd->fd, &s->rfds, &s->max_fileno);
 }
 
-static int dccp_recv_post_select(struct receiver_node *rn, int select_ret,
-               fd_set *rfds, __a_unused fd_set *wfds)
+static void dccp_recv_post_select(struct sched *s, struct task *t)
 {
-       int ret;
+       struct receiver_node *rn = t->private_data;
        struct private_dccp_recv_data *pdd = rn->private_data;
 
-       if (!select_ret || !pdd || !FD_ISSET(pdd->fd, rfds))
-               return 1; /* nothing to do */
+       t->ret = 1;
+       if (!s->select_ret || !pdd || !FD_ISSET(pdd->fd, &s->rfds))
+               return; /* nothing to do */
+       t->ret = -E_DCCP_OVERRUN;
        if (rn->loaded >= DCCP_BUFSIZE)
-               return -E_DCCP_OVERRUN;
-       ret = recv_bin_buffer(pdd->fd, rn->buf + rn->loaded,
+               return;
+       t->ret = recv_bin_buffer(pdd->fd, rn->buf + rn->loaded,
                DCCP_BUFSIZE - rn->loaded);
-       if (ret <= 0) {
-               PARA_INFO_LOG("%s\n", ret? PARA_STRERROR(-ret) : "eof");
-               return ret;
+       if (t->ret <= 0) {
+               rn->eof = 1;
+               if (!t->ret)
+                       t->ret = -E_DCCP_RECV_EOF;
+               return;
        }
-       rn->loaded += ret;
-       return 1;
+       rn->loaded += t->ret;
 }
 
 /**
diff --git a/error.h b/error.h
index 1505fe2..aa5666b 100644 (file)
--- a/error.h
+++ b/error.h
@@ -35,6 +35,7 @@ enum para_subsystem {
        SS_AUDIOD,
        SS_EXEC,
        SS_STDIN,
+       SS_STDOUT,
        SS_SIGNAL,
        SS_STRING,
        SS_STAT,
@@ -91,6 +92,12 @@ extern const char **para_errlist[];
        PARA_ERROR(STDIN_READ, "failed to read from stdin"), \
        PARA_ERROR(STDIN_EOF, "end of file"), \
 
+
+#define STDOUT_ERRORS \
+       PARA_ERROR(STDOUT_WRITE, "failed to write to stdout"), \
+       PARA_ERROR(STDOUT_EOF, "end of file"), \
+
+
 #define NET_ERRORS \
        PARA_ERROR(SEND, "send error"), \
        PARA_ERROR(RECV, "receive error"), \
@@ -115,12 +122,14 @@ extern const char **para_errlist[];
        PARA_ERROR(TOO_MANY_BAD_CHUNKS, "too many consecutive bad chunks"), \
        PARA_ERROR(INVALID_HEADER, "invalid header packet"), \
        PARA_ERROR(OVERRUN, "outout buffer overrun"), \
+       PARA_ERROR(ORTP_RECV_EOF, "ortp_recv: end of file"), \
 
 
 #define HTTP_RECV_ERRORS \
        PARA_ERROR(SEND_HTTP_REQUEST, "failed to send http request"), \
        PARA_ERROR(MISSING_OK, "did not receive OK message from peer"), \
-       PARA_ERROR(HTTP_RECV_BUF, "did not receive buffer")
+       PARA_ERROR(HTTP_RECV_BUF, "did not receive buffer"), \
+       PARA_ERROR(HTTP_RECV_EOF, "http_recv: end of file"), \
 
 
 #define RECV_ERRORS \
@@ -318,6 +327,7 @@ extern const char **para_errlist[];
        PARA_ERROR(DCCP_SOCKET, "can not create dccp socket"), \
        PARA_ERROR(DCCP_PACKET_SIZE, "failed to set dccp packet size"), \
        PARA_ERROR(DCCP_SERVICE, "could not get service code"), \
+       PARA_ERROR(DCCP_RECV_EOF, "dccp_recv: end of file"), \
 
 
 #define DCCP_RECV_ERRORS \
@@ -458,6 +468,7 @@ extern const char **para_errlist[];
 SS_ENUM(GUI);
 SS_ENUM(SCHED);
 SS_ENUM(STDIN);
+SS_ENUM(STDOUT);
 SS_ENUM(WAV);
 SS_ENUM(COMPRESS);
 SS_ENUM(TIME);
index 5f1d779..b4d3860 100644 (file)
 #include "para.h"
 
 #include "http.h"
+#include "list.h"
+#include "sched.h"
 #include "recv.h"
 #include "http_recv.cmdline.h"
 #include "error.h"
 #include "net.h"
 #include "string.h"
+#include "fd.h"
 
 /** the output buffer size of the http receiver */
 #define BUFSIZE (32 * 1024)
@@ -89,61 +92,62 @@ static char *make_request_msg(void)
        return ret;
 }
 
-static int http_pre_select(struct receiver_node *rn, fd_set *rfds, fd_set *wfds,
-       __a_unused struct timeval *timeout)
+static void http_recv_pre_select(struct sched *s, struct task *t)
 {
+       struct receiver_node *rn = t->private_data;
        struct private_http_recv_data *phd = rn->private_data;
 
+       t->ret = 1;
        if  (phd->status == HTTP_CONNECTED)
-               FD_SET(phd->fd, wfds);
+               para_fd_set(phd->fd, &s->wfds, &s->max_fileno);
        else
-               FD_SET(phd->fd, rfds);
-       return phd->fd;
+               para_fd_set(phd->fd, &s->rfds, &s->max_fileno);
 }
 
-static int http_post_select(struct receiver_node *rn, int select_ret,
-               fd_set *rfds, fd_set *wfds)
+
+static void http_recv_post_select(struct sched *s, struct task *t)
 {
-       int ret;
+       struct receiver_node *rn = t->private_data;
        struct private_http_recv_data *phd = rn->private_data;
 
-       if (!select_ret) /* we're not interested in timeouts */
-               return 1;
+       t->ret = 1;
+       if (!s->select_ret) /* we're not interested in timeouts */
+               return;
        if  (phd->status == HTTP_CONNECTED) {
                char *rq;
-               if (!FD_ISSET(phd->fd, wfds))
-                       return 1; /* nothing to do */
+               if (!FD_ISSET(phd->fd, &s->wfds))
+                       return; /* nothing to do */
                rq = make_request_msg();
                PARA_NOTICE_LOG("%s", "sending http request\n");
-               ret = send_va_buffer(phd->fd, "%s", rq);
+               t->ret = send_va_buffer(phd->fd, "%s", rq);
                free(rq);
-               if (ret < 0)
-                       return E_SEND_HTTP_REQUEST;
-               phd->status = HTTP_SENT_GET_REQUEST;
-               return 1;
+               if (t->ret > 0)
+                       phd->status = HTTP_SENT_GET_REQUEST;
+               return;
        }
-       if (!FD_ISSET(phd->fd, rfds))
-               return 1; /* nothing to do */
+       if (!FD_ISSET(phd->fd, &s->rfds))
+               return; /* nothing to do */
        if (phd->status == HTTP_SENT_GET_REQUEST) {
-               ret = recv_pattern(phd->fd, HTTP_OK_MSG, MAXLINE);
-               if (ret < 0)
-                       return -E_MISSING_OK;
+               t->ret = recv_pattern(phd->fd, HTTP_OK_MSG, MAXLINE);
+               if (t->ret < 0)
+                       return;
                PARA_NOTICE_LOG("%s", "received ok msg, streaming\n");
                phd->status = HTTP_STREAMING;
-               return 1;
+               return;
        }
+       t->ret = -E_OVERRUN;
        /* already streaming */
-       if (rn->loaded >= BUFSIZE) {
-               PARA_ERROR_LOG("%s", "buffer overrun\n");
-               return -E_OVERRUN;
+       if (rn->loaded >= BUFSIZE)
+               return;
+       t->ret = recv_bin_buffer(phd->fd, rn->buf + rn->loaded,
+               BUFSIZE - rn->loaded);
+       if (t->ret <= 0) {
+               rn->eof = 1;
+               if (!t->ret)
+                       t->ret = -E_HTTP_RECV_EOF;
+               return;
        }
-       ret = recv_bin_buffer(phd->fd, rn->buf + rn->loaded, BUFSIZE - rn->loaded);
-       if (ret <= 0) {
-               PARA_NOTICE_LOG("recv returned %d/%zd\n", ret, BUFSIZE - rn->loaded);
-               return ret < 0? -E_HTTP_RECV_BUF : 0;
-       }
-       rn->loaded += ret;
-       return 1;
+       rn->loaded += t->ret;
 }
 
 static void http_recv_close(struct receiver_node *rn)
@@ -209,8 +213,8 @@ void http_recv_init(struct receiver *r)
 {
        r->open = http_recv_open;
        r->close = http_recv_close;
-       r->pre_select = http_pre_select;
-       r->post_select = http_post_select;
+       r->pre_select = http_recv_pre_select;
+       r->post_select = http_recv_post_select;
        r->shutdown = http_shutdown;
        r->parse_config = http_recv_parse_config;
 }
index 905d580..99b53ad 100644 (file)
@@ -21,6 +21,8 @@
 #include "para.h"
 
 #include "ortp.h"
+#include "list.h"
+#include "sched.h"
 #include "recv.h"
 #include "ortp_recv.cmdline.h"
 
@@ -66,21 +68,19 @@ uint32_t timestamp;
 uint32_t chunk_ts;
 };
 
-static int ortp_recv_pre_select(struct receiver_node *rn,
-               __a_unused fd_set *rfds, __a_unused fd_set *wfds,
-               struct timeval *timeout)
+static void ortp_recv_pre_select(struct sched *s, struct task *t)
 {
+       struct receiver_node *rn = t->private_data;
        struct private_ortp_recv_data *pord = rn->private_data;
-       struct timeval now, tmp;
+       struct timeval tmp;
 
-       gettimeofday(&now, NULL);
-       if (tv_diff(&now, &pord->next_chunk, &tmp) >= 0) {
+       if (tv_diff(&s->now, &pord->next_chunk, &tmp) >= 0) {
                tmp.tv_sec = 0;
                tmp.tv_usec = 1000;
        }
-       if (tv_diff(timeout, &tmp, NULL) > 0)
-               *timeout = tmp;
-       return -1; /* we did not modify the fd sets */
+       if (tv_diff(&s->timeout, &tmp, NULL) > 0)
+               s->timeout = tmp;
+       t->ret = 1;
 }
 
 static void compute_next_chunk(unsigned chunk_time,
@@ -97,47 +97,43 @@ static void compute_next_chunk(unsigned chunk_time,
                pord->next_chunk.tv_usec);
 }
 
-static int ortp_recv_post_select(struct receiver_node *rn,
-               __a_unused int select_ret, __a_unused fd_set *rfds,
-               __a_unused fd_set *wfds)
+static void ortp_recv_post_select(struct sched *s, struct task *t)
 {
+       struct receiver_node *rn = t->private_data;
        struct private_ortp_recv_data *pord = rn->private_data;
        mblk_t *mp;
-       int ret, packet_type, stream_type;
+       int packet_type, stream_type;
        char tmpbuf[CHUNK_SIZE + 3];
-       struct timeval now;
        unsigned chunk_time;
 
-       gettimeofday(&now, NULL);
 //     PARA_DEBUG_LOG("rn: %p, pord: %p, session: %p\n", rn, pord, pord->session);
-       if (pord->start.tv_sec) {
-               struct timeval diff;
-               if (tv_diff(&now, &pord->next_chunk, &diff) < 0)
-                               return 1;
-       }
+       t->ret = 1;
+       if (pord->start.tv_sec)
+               if (tv_diff(&s->now, &pord->next_chunk, NULL) < 0)
+                       return;
        mp = rtp_session_recvm_with_ts(pord->session, pord->timestamp);
        if (!mp) {
                struct timeval min_delay = {0, 100};
 //             PARA_INFO_LOG("nope, chunk_ts = %d, loaded: %d, bad: %d\n",
 //                     pord->timestamp, rn->loaded, pord->c_bad);
                pord->c_bad++;
+               t->ret = -E_TOO_MANY_BAD_CHUNKS;
                if ((pord->c_bad > 5000 && pord->start.tv_sec) || pord->c_bad > 10000)
-                       return -E_TOO_MANY_BAD_CHUNKS;
-               tv_add(&now, &min_delay, &pord->next_chunk);
-               return 1;
+                       return;
+               t->ret = 1;
+               tv_add(&s->now, &min_delay, &pord->next_chunk);
+               return;
        }
        /* okay, we have a chunk of data */
        if (!pord->start.tv_sec)
-               pord->start = now;
-       ret = msg_to_buf(mp, tmpbuf, CHUNK_SIZE);
-//     PARA_DEBUG_LOG("have it ts = %d, chunk_ts = %d, loaded: %d, "
-//             "bad: %d, len: %d\n", pord->timestamp, pord->chunk_ts,
-//             rn->loaded, pord->c_bad, ret);
-       if (ret < ORTP_AUDIO_HEADER_LEN) {
-               if (ret < 0)
-                       ret = -E_MSG_TO_BUF;
+               pord->start = s->now;
+       t->ret = msg_to_buf(mp, tmpbuf, CHUNK_SIZE);
+       if (t->ret < ORTP_AUDIO_HEADER_LEN) {
+               rn->eof = 1;
+               if (t->ret < 0)
+                       t->ret = -E_MSG_TO_BUF;
                else
-                       ret = 0;
+                       t->ret = -E_ORTP_RECV_EOF;
                goto err_out;
        }
        packet_type = READ_PACKET_TYPE(tmpbuf);
@@ -150,65 +146,67 @@ static int ortp_recv_post_select(struct receiver_node *rn,
        switch (packet_type) {
        unsigned header_len, payload_len;
        case ORTP_EOF:
-               ret = 0;
+               rn->eof = 1;
+               t->ret = -E_ORTP_RECV_EOF;
                goto err_out;
        case ORTP_BOF:
-               PARA_INFO_LOG("bof (%d)\n", ret);
+               PARA_INFO_LOG("bof (%d)\n", t->ret);
                pord->have_header = 1;
                /* fall through */
        case ORTP_DATA:
                if (!pord->have_header && stream_type)
                /* can't use the data, wait for header */
                        goto success;
-               if (ret + rn->loaded >= CHUNK_SIZE + ORTP_AUDIO_HEADER_LEN) {
-                       ret = -E_OVERRUN;
+               if (t->ret + rn->loaded >= CHUNK_SIZE + ORTP_AUDIO_HEADER_LEN) {
+                       t->ret = -E_OVERRUN;
                        goto err_out;
                }
-               if (ret > ORTP_AUDIO_HEADER_LEN) {
+               if (t->ret > ORTP_AUDIO_HEADER_LEN) {
                        memcpy(rn->buf + rn->loaded, tmpbuf + ORTP_AUDIO_HEADER_LEN,
-                               ret - ORTP_AUDIO_HEADER_LEN);
-                       rn->loaded += ret - ORTP_AUDIO_HEADER_LEN;
+                               t->ret - ORTP_AUDIO_HEADER_LEN);
+                       rn->loaded += t->ret - ORTP_AUDIO_HEADER_LEN;
                }
                goto success;
        case ORTP_HEADER:
                header_len = READ_HEADER_LEN(tmpbuf);
                PARA_DEBUG_LOG("header packet (%d bytes), header len: %d\n",
-                       ret, header_len);
+                       t->ret, header_len);
                if (!pord->have_header) {
                        pord->have_header = 1;
                        memcpy(rn->buf, tmpbuf + ORTP_AUDIO_HEADER_LEN,
-                               ret - ORTP_AUDIO_HEADER_LEN);
-                       rn->loaded = ret - ORTP_AUDIO_HEADER_LEN;
+                               t->ret - ORTP_AUDIO_HEADER_LEN);
+                       rn->loaded = t->ret - ORTP_AUDIO_HEADER_LEN;
                        goto success;
                }
-               if (header_len + ORTP_AUDIO_HEADER_LEN > ret) {
-                       ret = -E_INVALID_HEADER;
+               if (header_len + ORTP_AUDIO_HEADER_LEN > t->ret) {
+                       t->ret = -E_INVALID_HEADER;
                        goto err_out;
                }
-               payload_len = ret - ORTP_AUDIO_HEADER_LEN - header_len;
+               payload_len = t->ret - ORTP_AUDIO_HEADER_LEN - header_len;
 //             PARA_INFO_LOG("len: %d header_len: %d, payload_len: %d, loaded: %d\n", ret,
 //                     header_len, payload_len, rn->loaded);
                if (rn->loaded + payload_len > CHUNK_SIZE) {
-                       ret = -E_OVERRUN;
+                       t->ret = -E_OVERRUN;
                        goto err_out;
                }
                if (payload_len)
                        memcpy(rn->buf + rn->loaded, tmpbuf
-                               + (ret - payload_len), payload_len);
+                               + (t->ret - payload_len), payload_len);
                rn->loaded += payload_len;
                goto success;
        }
 success:
+       t->ret = 1;
        freemsg(mp);
        if (pord->c_bad) {
                pord->c_bad = 0;
-               pord->next_chunk = now;
+               pord->next_chunk = s->now;
        }
        compute_next_chunk(chunk_time, pord);
-       return 1;
+       return;
 err_out:
        freemsg(mp);
-       return ret;
+       return;
 }
 
 static void ortp_shutdown(void)
diff --git a/recv.c b/recv.c
index af15eb0..4b84e8b 100644 (file)
--- a/recv.c
+++ b/recv.c
  */
 #include "para.h"
 
+#include "list.h"
+#include "sched.h"
 #include "recv.h"
 #include "recv.cmdline.h"
 #include "fd.h"
 #include "error.h"
+#include "stdout.h"
 
 struct gengetopt_args_info conf;
 
@@ -55,6 +58,7 @@ static void *parse_config(int argc, char *argv[], int *receiver_num)
        return check_receiver_arg(conf.receiver_arg, receiver_num);
 }
 
+#if 0
 int main(int argc, char *argv[])
 {
        int ret, eof = 0, max, r_opened = 0, receiver_num;
@@ -125,3 +129,77 @@ out:
                PARA_ERROR_LOG("%s\n", PARA_STRERROR(-ret));
        return ret;
 }
+#endif
+
+void rn_event_handler(struct task *t)
+{
+       PARA_ERROR_LOG("%s\n", PARA_STRERROR(-t->ret));
+       unregister_task(t);
+}
+
+void stdout_event_handler(struct task *t)
+{
+       PARA_ERROR_LOG("%s\n", PARA_STRERROR(-t->ret));
+       unregister_task(t);
+}
+
+int main(int argc, char *argv[])
+{
+       int ret, eof = 0, max, r_opened = 0, receiver_num;
+       struct timeval timeout;
+       struct  receiver *r = NULL;
+       fd_set rfds, wfds;
+       struct receiver_node rn;
+       struct stdout_task sot;
+       struct sched s;
+
+       init_sched();
+       s.default_timeout.tv_sec = 1;
+       s.default_timeout.tv_usec = 0;
+
+       memset(&rn, 0, sizeof(struct receiver_node));
+       for (ret = 0; receivers[ret].name; ret++)
+               receivers[ret].init(&receivers[ret]);
+       ret = -E_RECV_SYNTAX;
+       rn.conf = parse_config(argc, argv, &receiver_num);
+       if (!rn.conf) {
+               PARA_EMERG_LOG("%s", "parse failed\n");
+               goto out;
+       }
+       r = &receivers[receiver_num];
+       rn.receiver = r;
+       ret = r->open(&rn);
+       if (ret < 0)
+               goto out;
+       r_opened = 1;
+
+       sot.task.private_data = &sot;
+       sot.task.pre_select = stdout_pre_select;
+       sot.task.post_select = stdout_post_select;
+       sot.task.event_handler = stdout_event_handler;
+       sot.task.flags = 0;
+       sprintf(sot.task.status, "stdout writer");
+       sot.buf = rn.buf;
+       sot.loaded = &rn.loaded;
+       sot.eof = &rn.eof;
+       register_task(&sot.task);
+
+       rn.task.private_data = &rn;
+       rn.task.pre_select = r->pre_select;
+       rn.task.post_select = r->post_select;
+       rn.task.event_handler = rn_event_handler;
+       rn.task.flags = 0;
+       sprintf(rn.task.status, "receiver node");
+       register_task(&rn.task);
+
+
+       ret = sched(&s);
+out:
+       if (r_opened)
+               r->close(&rn);
+       if (r)
+               r->shutdown();
+       if (ret < 0)
+               PARA_ERROR_LOG("%s\n", PARA_STRERROR(-ret));
+       return ret;
+}
diff --git a/recv.h b/recv.h
index f7fc725..3c8b543 100644 (file)
--- a/recv.h
+++ b/recv.h
@@ -34,6 +34,8 @@ struct receiver_node {
        int eof;
        /** pointer to the configuration data for this instance */
        void *conf;
+       /** the task associated with this instance */
+       struct task task;
 };
 
 /**
@@ -111,38 +113,26 @@ struct receiver {
  *
  * The pre_select function gets called from the driving application before
  * entering its select loop. The receiver may use this hook to add any file
- * descriptors to \a rfds and \a wfds in order to check the result later in the
- * post_select hook.
+ * descriptors to the sets of file descriptors given by \a s.
  *
- * \a timeout is a value-result parameter, initially containing the timeout for
- * select() which was set by the application or by another receiver node. If
- * the receiver wants its pre_select function to be called at some earlier time
- * than the time determined by \a timeout, it may set \a timeout to an
- * appropriate smaller value. However, it must never increase this timeout.
  *
- * This function must return the highest-numbered descriptor it wants to being
- * checked, or -1 if no file descriptors should be checked for this run.
- *
- * \sa select(2), receiver_node:private_data, time.c
+ * \sa select(2), time.c struct task, struct sched
  */
-       int (*pre_select)(struct receiver_node *rn, fd_set *rfds,
-               fd_set *wfds, struct timeval *timeout);
+       void (*pre_select)(struct sched *s, struct task *t);
 /**
  *
  *
  * evaluate the result from select()
  *
- * If the call to select() was succesful, this hook gets called. It should
- * check all file descriptors which were added to any of the the fd sets during
- * the previous call to pre_select. According to the result, it may then use
- * any non-blocking I/O to establish a connection or to receive the audio data.
+ * This hook gets called after the call to select(). It should check all file
+ * descriptors which were added to any of the the fd sets during the previous
+ * call to pre_select. According to the result, it may then use any
+ * non-blocking I/O to establish a connection or to receive the audio data.
  *
- * A negative return value is interpreted as an error.
  *
  * \sa select(2), struct receiver
  */
-       int (*post_select)(struct receiver_node *rn, int select_ret,
-               fd_set *rfds, fd_set *wfds);
+       void (*post_select)(struct sched *s, struct task *t);
 };
 
 
index 078e44d..e39719d 100644 (file)
@@ -20,6 +20,8 @@
 
 #include "para.h"
 
+#include "list.h"
+#include "sched.h"
 #include "recv.h"
 #include "string.h"
 
diff --git a/stdout.c b/stdout.c
new file mode 100644 (file)
index 0000000..8370305
--- /dev/null
+++ b/stdout.c
@@ -0,0 +1,40 @@
+#include "para.h"
+#include "string.h"
+#include "list.h"
+#include "sched.h"
+#include "fd.h"
+#include "error.h"
+#include "stdout.h"
+
+void stdout_pre_select(struct sched *s, struct task *t)
+{
+       struct stdout_task *sot = t->private_data;
+
+       t->ret = 1;
+       sot->check_fd = 0;
+       if (!*sot->loaded)
+               return;
+       sot->check_fd = 1;
+       para_fd_set(STDOUT_FILENO, &s->wfds, &s->max_fileno);
+}
+
+void stdout_post_select(struct sched *s, struct task *t)
+{
+       struct stdout_task *sot = t->private_data;
+       ssize_t ret;
+
+       t->ret = 1;
+       if (!sot->check_fd) {
+               if (*sot->eof)
+                       t->ret = -E_STDOUT_EOF;
+               return;
+       }
+       if (!FD_ISSET(STDOUT_FILENO, &s->wfds))
+               return;
+       t->ret = -E_STDOUT_WRITE;
+       ret = write(STDOUT_FILENO, sot->buf, *sot->loaded);
+       if (ret <= 0)
+               return;
+       *sot->loaded -= ret;
+       t->ret = 1;
+}
diff --git a/stdout.h b/stdout.h
new file mode 100644 (file)
index 0000000..f02483d
--- /dev/null
+++ b/stdout.h
@@ -0,0 +1,11 @@
+struct stdout_task {
+       char *buf;
+       size_t *bufsize;
+       size_t *loaded;
+       int *eof;
+       struct task task;
+       int check_fd;
+};
+
+void stdout_pre_select(struct sched *s, struct task *t);
+void stdout_post_select(struct sched *s, struct task *t);
diff --git a/write.c b/write.c
index 7b8ddd6..5bcd1f6 100644 (file)
--- a/write.c
+++ b/write.c
@@ -199,7 +199,7 @@ static void stdin_event_handler(struct task *t)
        if (t->ret != -E_STDIN_EOF)
                PARA_ERROR_LOG("%s\n", PARA_STRERROR(-t->ret));
        else
-               PARA_ERROR_LOG("%s\n", PARA_STRERROR(-t->ret));
+               PARA_INFO_LOG("%s\n", PARA_STRERROR(-t->ret));
 }
 
 int main(int argc, char *argv[])