*/
#include "para.h"
+#include "list.h"
+#include "sched.h"
#include "recv.h"
#include "recv.cmdline.h"
+#include "fd.h"
#include "error.h"
+#include "stdout.h"
-struct gengetopt_args_info conf;
+struct recv_args_info conf;
INIT_RECV_ERRLISTS;
{
int i;
- if (cmdline_parser(argc, argv, &conf))
+ if (recv_cmdline_parser(argc, argv, &conf))
return NULL;
if (conf.list_receivers_given) {
printf("available receivers: ");
return check_receiver_arg(conf.receiver_arg, receiver_num);
}
+void rn_event_handler(struct task *t)
+{
+ struct receiver_node *rn = t->private_data;
+ PARA_NOTICE_LOG("%s\n", PARA_STRERROR(-t->ret));
+ rn->eof = 1;
+ unregister_task(t);
+}
+
int main(int argc, char *argv[])
{
- int ret, eof = 0, max, r_opened = 0, receiver_num;
- struct timeval timeout;
+ int ret, r_opened = 0, receiver_num;
struct receiver *r = NULL;
- fd_set rfds, wfds;
struct receiver_node rn;
+ struct stdout_task sot;
+ struct sched s;
+
+ s.default_timeout.tv_sec = 1;
+ s.default_timeout.tv_usec = 0;
memset(&rn, 0, sizeof(struct receiver_node));
for (ret = 0; receivers[ret].name; ret++)
if (ret < 0)
goto out;
r_opened = 1;
-recv:
- FD_ZERO(&rfds);
- FD_ZERO(&wfds);
- timeout.tv_sec = 0;
- timeout.tv_usec = 1000 * 1000;
- max = -1;
- ret = r->pre_select(&rn, &rfds, &wfds, &timeout);
- max = MAX(max, ret);
- PARA_DEBUG_LOG("timeout: %lums\n", tv2ms(&timeout));
- ret = select(max + 1, &rfds, &wfds, NULL, &timeout);
- if (ret < 0) {
- if (errno == EINTR || errno == EAGAIN)
- goto recv;
- ret = -E_RECV_SELECT;
- goto out;
- }
- ret = r->post_select(&rn, ret, &rfds, &wfds);
- if (ret < 0)
- goto out;
- if (!ret)
- eof = 1;
- if (!rn.loaded) {
- if (eof)
- goto out;
- goto recv;
- }
- ret = write(STDOUT_FILENO, rn.buf, rn.loaded);
- PARA_DEBUG_LOG("wrote %d/%zd\n", ret, rn.loaded);
- if (ret < 0) {
- ret = -E_WRITE_STDOUT;
- goto out;
- }
- if (ret != rn.loaded) {
- PARA_INFO_LOG("short write %d/%zd\n", ret, rn.loaded);
- memmove(rn.buf, rn.buf + ret, rn.loaded - ret);
- }
- rn.loaded -= ret;
- if (rn.loaded || !eof)
- goto recv;
+ stdout_set_defaults(&sot);
+ sot.buf = rn.buf;
+ sot.loaded = &rn.loaded;
+ sot.input_eof = &rn.eof;
+ register_task(&sot.task);
+
+ rn.task.private_data = &rn;
+ rn.task.pre_select = r->pre_select;
+ rn.task.post_select = r->post_select;
+ rn.task.event_handler = rn_event_handler;
+ sprintf(rn.task.status, "receiver node");
+ register_task(&rn.task);
+
+
+ ret = sched(&s);
out:
if (r_opened)
r->close(&rn);