]> git.tuebingen.mpg.de Git - paraslash.git/blob - buffer_tree.c
udp_recv: Use buffer pool API.
[paraslash.git] / buffer_tree.c
1 #include <regex.h>
2 #include <stdbool.h>
3
4 #include "para.h"
5 #include "list.h"
6 #include "string.h"
7 #include "buffer_tree.h"
8 #include "error.h"
9 #include "sched.h"
10
11 struct btr_pool {
12         char *area_start;
13         char *area_end;
14         char *rhead;
15         char *whead;
16 };
17
18 enum btr_buffer_flags {
19         /* changes the way the buffer is deallocated */
20         BTR_BF_BTR_POOL = 1,
21 };
22
23 struct btr_buffer {
24         char *buf;
25         size_t size;
26         /** The number of references to this buffer. */
27         int refcount;
28         struct btr_pool *pool;
29 };
30
31 struct btr_buffer_reference {
32         struct btr_buffer *btrb;
33         size_t consumed;
34         /* Each buffer reference belongs to the buffer queue list of some buffer tree node. */
35         struct list_head node;
36         size_t wrap_count;
37 };
38
39 struct btr_node {
40         char *name;
41         struct btr_node *parent;
42         /* The position of this btr node in the buffer tree. */
43         struct list_head node;
44         /* The children nodes of this btr node are linked together in a list. */
45         struct list_head children;
46         /* Time of first data transfer. */
47         struct timeval start;
48         /**
49          * The input queue is a list of references to btr buffers. Each item on
50          * the list represents an input buffer which has not been completely
51          * used by this btr node.
52          */
53         struct list_head input_queue;
54         btr_command_handler execute;
55         void *context;
56 };
57
58 struct btr_pool *btr_pool_new(size_t area_size)
59 {
60         struct btr_pool *btrp = para_malloc(sizeof(*btrp));
61
62         btrp->area_start = para_malloc(area_size);
63         btrp->area_end = btrp->area_start + area_size;
64         btrp->rhead = btrp->area_start;
65         btrp->whead = btrp->area_start;
66         return btrp;
67 }
68
69 /* whead = NULL means area full */
70
71 void btr_pool_free(struct btr_pool *btrp)
72 {
73         if (!btrp)
74                 return;
75         free(btrp->area_start);
76         free(btrp);
77 }
78
79 size_t btr_pool_size(struct btr_pool *btrp)
80 {
81         return btrp->area_end - btrp->area_start;
82 }
83
84 size_t btr_pool_filled(struct btr_pool *btrp)
85 {
86         if (!btrp->whead)
87                 return btr_pool_size(btrp);
88         if (btrp->rhead <= btrp->whead)
89                 return  btrp->whead - btrp->rhead;
90         return btr_pool_size(btrp) - (btrp->rhead - btrp->whead);
91 }
92
93 size_t btr_pool_unused(struct btr_pool *btrp)
94 {
95         return btr_pool_size(btrp) - btr_pool_filled(btrp);
96 }
97
98 size_t btr_pool_available(struct btr_pool *btrp)
99 {
100         if (!btrp->whead)
101                 return 0;
102         if (btrp->rhead <= btrp->whead)
103                 return btrp->area_end - btrp->whead;
104         return btrp->rhead - btrp->whead;
105 }
106
107 size_t btr_pool_get_buffer(struct btr_pool *btrp, char **result)
108 {
109         if (result)
110                 *result = btrp->whead;
111         return btr_pool_available(btrp);
112 }
113
114 void btr_pool_allocate(struct btr_pool *btrp, size_t size)
115 {
116         char *end;
117
118         if (size == 0)
119                 return;
120         //PARA_CRIT_LOG("filled: %zu, alloc %zu\n", btr_pool_filled(btrp), size);
121         assert(size <= btr_pool_available(btrp));
122         end = btrp->whead + size;
123         assert(end <= btrp->area_end);
124
125         if (end == btrp->area_end) {
126                 PARA_DEBUG_LOG("end of pool area reached: %p\n", end);
127                 end = btrp->area_start;
128         }
129         if (end == btrp->rhead) {
130                 PARA_DEBUG_LOG("btrp buffer full\n");
131                 end = NULL; /* buffer full */
132         }
133         btrp->whead = end;
134         //PARA_CRIT_LOG("filled: %zu\n", btr_pool_filled(btrp));
135 }
136
137 static void btr_pool_deallocate(struct btr_pool *btrp, size_t size)
138 {
139         char *end = btrp->rhead + size;
140
141         //PARA_CRIT_LOG("filled: %zu, dealloc %zu\n", btr_pool_filled(btrp), size);
142         if (size == 0)
143                 return;
144         assert(end <= btrp->area_end);
145         assert(size <= btr_pool_filled(btrp));
146         if (end == btrp->area_end)
147                 end = btrp->area_start;
148         if (!btrp->whead)
149                 btrp->whead = btrp->rhead;
150         btrp->rhead = end;
151         if (btrp->rhead == btrp->whead)
152                 btrp->rhead = btrp->whead = btrp->area_start;
153         //PARA_CRIT_LOG("filled: %zu\n", btr_pool_filled(btrp));
154 }
155
156 #define FOR_EACH_CHILD(_tn, _btrn) list_for_each_entry((_tn), \
157         &((_btrn)->children), node)
158 #define FOR_EACH_CHILD_SAFE(_tn, _tmp, _btrn) \
159         list_for_each_entry_safe((_tn), (_tmp), &((_btrn)->children), node)
160
161 #define FOR_EACH_BUFFER_REF(_br, _btrn) \
162         list_for_each_entry((_br), &(_btrn)->input_queue, node)
163 #define FOR_EACH_BUFFER_REF_SAFE(_br, _tmp, _btrn) \
164         list_for_each_entry_safe((_br), (_tmp), &(_btrn)->input_queue, node)
165
166 struct btr_node *btr_new_node(const char *name, struct btr_node *parent,
167                 btr_command_handler handler, void *context)
168 {
169         struct btr_node *btrn = para_malloc(sizeof(*btrn));
170
171         btrn->name = para_strdup(name);
172         btrn->parent = parent;
173         btrn->execute = handler;
174         btrn->context = context;
175         btrn->start.tv_sec = 0;
176         btrn->start.tv_usec = 0;
177         if (parent)
178                 list_add_tail(&btrn->node, &parent->children);
179         INIT_LIST_HEAD(&btrn->children);
180         INIT_LIST_HEAD(&btrn->input_queue);
181         if (parent)
182                 PARA_INFO_LOG("added %s as child of %s\n", name, parent->name);
183         else
184                 PARA_INFO_LOG("added %s as btr root\n", name);
185         return btrn;
186 }
187
188 /*
189  * Allocate a new btr buffer.
190  *
191  * The freshly allocated buffer will have a zero refcount.
192  */
193 static struct btr_buffer *new_btrb(char *buf, size_t size)
194 {
195         struct btr_buffer *btrb = para_calloc(sizeof(*btrb));
196
197         btrb->buf = buf;
198         btrb->size = size;
199         return btrb;
200 }
201
202 static void dealloc_buffer(struct btr_buffer *btrb)
203 {
204         if (btrb->pool)
205                 btr_pool_deallocate(btrb->pool, btrb->size);
206         else
207                 free(btrb->buf);
208 }
209
210 static struct btr_buffer_reference *get_first_input_br(struct btr_node *btrn)
211 {
212         if (list_empty(&btrn->input_queue))
213                 return NULL;
214         return list_first_entry(&btrn->input_queue,
215                 struct btr_buffer_reference, node);
216 }
217
218 /*
219  * Deallocate the reference, release the resources if refcount drops to zero.
220  */
221 static void btr_drop_buffer_reference(struct btr_buffer_reference *br)
222 {
223         struct btr_buffer *btrb = br->btrb;
224
225         //PARA_CRIT_LOG("dropping buffer reference %p\n", br);
226         list_del(&br->node);
227         free(br);
228         btrb->refcount--;
229         if (btrb->refcount == 0) {
230                 dealloc_buffer(btrb);
231                 free(btrb);
232         }
233 }
234
235 static void add_btrb_to_children(struct btr_buffer *btrb,
236                 struct btr_node *btrn, size_t consumed)
237 {
238         struct btr_node *ch;
239
240         if (btrn->start.tv_sec == 0)
241                 btrn->start = *now;
242         FOR_EACH_CHILD(ch, btrn) {
243                 struct btr_buffer_reference *br = para_calloc(sizeof(*br));
244                 br->btrb = btrb;
245                 br->consumed = consumed;
246                 list_add_tail(&br->node, &ch->input_queue);
247                 btrb->refcount++;
248                 if (ch->start.tv_sec == 0)
249                         ch->start = *now;
250         }
251 }
252
253 void btr_add_output(char *buf, size_t size, struct btr_node *btrn)
254 {
255         struct btr_buffer *btrb;
256
257         assert(size != 0);
258         if (list_empty(&btrn->children)) {
259                 free(buf);
260                 return;
261         }
262         btrb = new_btrb(buf, size);
263         add_btrb_to_children(btrb, btrn, 0);
264 }
265
266 void btr_add_output_pool(struct btr_pool *btrp, char *buf, size_t size,
267         struct btr_node *btrn)
268 {
269         struct btr_buffer *btrb;
270
271         assert(size != 0);
272         if (list_empty(&btrn->children)) {
273                 btr_pool_deallocate(btrp, size);
274                 return;
275         }
276         btrb = new_btrb(buf, size);
277         btrb->pool = btrp;
278         add_btrb_to_children(btrb, btrn, 0);
279 }
280
281 static void btr_pushdown_br(struct btr_buffer_reference *br, struct btr_node *btrn)
282 {
283         add_btrb_to_children(br->btrb, btrn, br->consumed);
284         btr_drop_buffer_reference(br);
285 }
286
287 void btr_pushdown(struct btr_node *btrn)
288 {
289         struct btr_buffer_reference *br, *tmp;
290
291         FOR_EACH_BUFFER_REF_SAFE(br, tmp, btrn)
292                 btr_pushdown_br(br, btrn);
293 }
294
295 int btr_pushdown_one(struct btr_node *btrn)
296 {
297         struct btr_buffer_reference *br;
298
299         if (list_empty(&btrn->input_queue))
300                 return 0;
301         br = list_first_entry(&btrn->input_queue, struct btr_buffer_reference, node);
302         btr_pushdown_br(br, btrn);
303         return 1;
304 }
305
306 /* Return true if this node has no children. */
307 bool btr_no_children(struct btr_node *btrn)
308 {
309         return list_empty(&btrn->children);
310 }
311
312 bool btr_no_parent(struct btr_node *btrn)
313 {
314         return !btrn->parent;
315 }
316
317 bool btr_inplace_ok(struct btr_node *btrn)
318 {
319         if (!btrn->parent)
320                 return true;
321         return list_is_singular(&btrn->parent->children);
322 }
323
324 static inline size_t br_available_bytes(struct btr_buffer_reference *br)
325 {
326         return br->btrb->size - br->consumed;
327 }
328
329 size_t btr_get_buffer_by_reference(struct btr_buffer_reference *br, char **buf)
330 {
331         if (buf)
332                 *buf = br->btrb->buf + br->consumed;
333         return br_available_bytes(br);
334 }
335
336 /**
337  * \return zero if the input buffer queue is empty.
338  */
339 size_t btr_next_buffer(struct btr_node *btrn, char **bufp)
340 {
341         struct btr_buffer_reference *br;
342         char *buf, *result = NULL;
343         size_t sz, rv = 0;
344
345         FOR_EACH_BUFFER_REF(br, btrn) {
346                 sz = btr_get_buffer_by_reference(br, &buf);
347                 if (!result) {
348                         result = buf;
349                         rv = sz;
350                         if (!br->btrb->pool)
351                                 break;
352                         continue;
353                 }
354                 if (!br->btrb->pool)
355                         break;
356                 if (result + rv != buf) {
357                         PARA_DEBUG_LOG("%s: pool merge impossible: %p != %p\n",
358                                 btrn->name, result + rv, buf);
359                         break;
360                 }
361 //              PARA_CRIT_LOG("%s: inplace merge (%zu, %zu)->%zu\n", btrn->name,
362 //                      rv, sz, rv + sz);
363 //              PARA_CRIT_LOG("%s: inplace merge %p (%zu)\n", btrn->name,
364 //                      result, sz);
365                 rv += sz;
366         }
367         if (bufp)
368                 *bufp = result;
369         return rv;
370 }
371
372 void btr_consume(struct btr_node *btrn, size_t numbytes)
373 {
374         struct btr_buffer_reference *br, *tmp;
375         size_t sz;
376
377         if (numbytes == 0)
378                 return;
379         br = get_first_input_br(btrn);
380         assert(br);
381
382         //PARA_CRIT_LOG("wrap count: %zu\n", br->wrap_count);
383         if (br->wrap_count == 0) {
384                 /*
385                  * No wrap buffer. Drop buffer references whose buffer
386                  * has been fully used. */
387                 FOR_EACH_BUFFER_REF_SAFE(br, tmp, btrn) {
388                         if (br->consumed + numbytes <= br->btrb->size) {
389                                 br->consumed += numbytes;
390                                 if (br->consumed == br->btrb->size)
391                                         btr_drop_buffer_reference(br);
392                                 return;
393                         }
394                         numbytes -= br->btrb->size - br->consumed;
395                         btr_drop_buffer_reference(br);
396                 }
397                 assert(true);
398         }
399         /*
400
401         We have a wrap buffer, consume from it. If in total,
402         i.e. including previous calls to brt_consume(), less than
403         wrap_count has been consumed, there's nothing more we can do.
404
405         Otherwise we drop the wrap buffer and consume from subsequent
406         buffers of the input queue the correct amount of bytes. This
407         is the total number of bytes that have been consumed from the
408         wrap buffer.
409 */
410         PARA_DEBUG_LOG("consuming %zu/%zu bytes from wrap buffer\n", numbytes,
411                 br_available_bytes(br));
412
413         assert(numbytes <= br_available_bytes(br));
414         if (br->consumed + numbytes < br->wrap_count) {
415                 br->consumed += numbytes;
416                 return;
417         }
418         PARA_DEBUG_LOG("dropping wrap buffer (%zu bytes)\n", br->btrb->size);
419         /* get rid of the wrap buffer */
420         sz = br->consumed + numbytes;
421         btr_drop_buffer_reference(br);
422         return btr_consume(btrn, sz);
423 }
424
425 static void flush_input_queue(struct btr_node *btrn)
426 {
427         struct btr_buffer_reference *br, *tmp;
428         FOR_EACH_BUFFER_REF_SAFE(br, tmp, btrn)
429                 btr_drop_buffer_reference(br);
430 }
431
432 void btr_free_node(struct btr_node *btrn)
433 {
434         if (!btrn)
435                 return;
436         free(btrn->name);
437         free(btrn);
438 }
439
440 void btr_remove_node(struct btr_node *btrn)
441 {
442         struct btr_node *ch;
443
444         if (!btrn)
445                 return;
446         PARA_NOTICE_LOG("removing btr node %s from buffer tree\n", btrn->name);
447         FOR_EACH_CHILD(ch, btrn)
448                 ch->parent = NULL;
449         flush_input_queue(btrn);
450         if (btrn->parent)
451                 list_del(&btrn->node);
452 }
453
454 size_t btr_get_input_queue_size(struct btr_node *btrn)
455 {
456         struct btr_buffer_reference *br;
457         size_t size = 0;
458
459         FOR_EACH_BUFFER_REF(br, btrn) {
460                 //PARA_CRIT_LOG("size: %zu\n", size);
461                 size += br_available_bytes(br);
462         }
463         return size;
464 }
465
466 void btr_splice_out_node(struct btr_node *btrn)
467 {
468         struct btr_node *ch, *tmp;
469
470         assert(btrn);
471         PARA_NOTICE_LOG("splicing out %s\n", btrn->name);
472         btr_pushdown(btrn);
473         if (btrn->parent)
474                 list_del(&btrn->node);
475         FOR_EACH_CHILD_SAFE(ch, tmp, btrn) {
476                 PARA_INFO_LOG("parent(%s): %s\n", ch->name,
477                         btrn->parent? btrn->parent->name : "NULL");
478                 ch->parent = btrn->parent;
479                 if (btrn->parent)
480                         list_move(&ch->node, &btrn->parent->children);
481         }
482         assert(list_empty(&btrn->children));
483 }
484
485 /**
486  * Return the size of the largest input queue.
487  *
488  * Iterates over all children of the given node.
489  */
490 size_t btr_bytes_pending(struct btr_node *btrn)
491 {
492         size_t max_size = 0;
493         struct btr_node *ch;
494
495         FOR_EACH_CHILD(ch, btrn) {
496                 size_t size = btr_get_input_queue_size(ch);
497                 max_size = PARA_MAX(max_size, size);
498         }
499         return max_size;
500 }
501
502 int btr_exec(struct btr_node *btrn, const char *command, char **value_result)
503 {
504         if (!btrn)
505                 return -ERRNO_TO_PARA_ERROR(EINVAL);
506         if (!btrn->execute)
507                 return -ERRNO_TO_PARA_ERROR(ENOTSUP);
508         return btrn->execute(btrn, command, value_result);
509 }
510
511 int btr_exec_up(struct btr_node *btrn, const char *command, char **value_result)
512 {
513         int ret;
514
515         for (; btrn; btrn = btrn->parent) {
516                 struct btr_node *parent = btrn->parent;
517                 if (!parent)
518                         return -ERRNO_TO_PARA_ERROR(ENOTSUP);
519                 if (!parent->execute)
520                         continue;
521                 PARA_INFO_LOG("parent: %s, cmd: %s\n", parent->name, command);
522                 ret = parent->execute(parent, command, value_result);
523                 if (ret == -ERRNO_TO_PARA_ERROR(ENOTSUP))
524                         continue;
525                 if (ret < 0)
526                         return ret;
527                 if (value_result && *value_result)
528                         PARA_NOTICE_LOG("%s(%s): %s\n", command, parent->name,
529                                 *value_result);
530                 return 1;
531         }
532         return -ERRNO_TO_PARA_ERROR(ENOTSUP);
533 }
534
535 void *btr_context(struct btr_node *btrn)
536 {
537         return btrn->context;
538 }
539
540 static bool need_buffer_pool_merge(struct btr_node *btrn)
541 {
542         struct btr_buffer_reference *br = get_first_input_br(btrn);
543
544         if (!br)
545                 return false;
546         if (br->wrap_count != 0)
547                 return true;
548         if (br->btrb->pool)
549                 return true;
550         return false;
551 }
552
553 static void merge_input_pool(struct btr_node *btrn, size_t dest_size)
554 {
555         struct btr_buffer_reference *br, *wbr;
556         int num_refs; /* including wrap buffer */
557         char *buf, *buf1, *buf2 = NULL;
558         size_t sz, sz1, sz2 = 0, wsz;
559
560         if (list_empty(&btrn->input_queue))
561                 return;
562
563         num_refs = 0;
564         FOR_EACH_BUFFER_REF(br, btrn) {
565                 num_refs++;
566                 sz = btr_get_buffer_by_reference(br, &buf);
567                 if (br->wrap_count != 0) {
568                         assert(!wbr);
569                         assert(num_refs == 1);
570                         wbr = br;
571                         if (sz >= dest_size)
572                                 return;
573                         continue;
574                 }
575                 if (!buf1) {
576                         buf1 = buf;
577                         sz1 = sz;
578                         goto next;
579                 }
580                 if (buf1 + sz1 == buf) {
581                         sz1 += sz;
582                         goto next;
583                 }
584                 if (!buf2) {
585                         buf2 = buf;
586                         sz2 = sz;
587                         goto next;
588                 }
589                 assert(buf2 + sz2 == buf);
590                 sz2 += sz;
591 next:
592                 if (sz1 + sz2 >= dest_size)
593                         break;
594         }
595         if (!wbr) {
596                 assert(buf1);
597                 if (!buf2) /* nothing to do */
598                         return;
599                 /* make a new wrap buffer combining buf1 and buf 2. */
600                 sz = sz1 + sz2;
601                 buf = para_malloc(sz);
602                 PARA_DEBUG_LOG("merging input buffers: (%p:%zu, %p:%zu) -> %p:%zu\n",
603                         buf1, sz1, buf2, sz2, buf, sz);
604                 memcpy(buf, buf1, sz1);
605                 memcpy(buf + sz1, buf2, sz2);
606                 br = para_calloc(sizeof(*br));
607                 br->btrb = new_btrb(buf, sz);
608                 br->btrb->refcount = 1;
609                 br->consumed = 0;
610                 /* This is a wrap buffer */
611                 br->wrap_count = sz1;
612                 para_list_add(&br->node, &btrn->input_queue);
613                 return;
614         }
615         /*
616          * We already have a wrap buffer, but it is too small. It might be
617          * partially used.
618          */
619         wsz = br_available_bytes(wbr);
620         if (wbr->wrap_count == sz1 && wbr->btrb->size >= sz1 + sz2) /* nothing we can do about it */
621                 return;
622         assert(buf1 && buf2);
623         sz = sz1 + sz2 - wbr->btrb->size; /* amount of new data */
624         wbr->btrb->size += sz;
625         PARA_DEBUG_LOG("increasing wrap buffer to %zu\n", wbr->btrb->size);
626         wbr->btrb->buf = para_realloc(wbr->btrb->buf, wbr->btrb->size);
627         /* copy the new data to the end of the reallocated buffer */
628         assert(sz2 >= sz);
629         memcpy(wbr->btrb->buf + wbr->btrb->size - sz, buf2 + sz2 - sz, sz);
630 }
631
632 /**
633  * Merge the first two input buffers into one.
634  *
635  * This is a quite expensive operation.
636  *
637  * \return The number of buffers that have been available (zero, one or two).
638  */
639 static int merge_input(struct btr_node *btrn)
640 {
641         struct btr_buffer_reference *brs[2], *br;
642         char *bufs[2], *buf;
643         size_t szs[2], sz;
644         int i;
645
646         if (list_empty(&btrn->input_queue))
647                 return 0;
648         if (list_is_singular(&btrn->input_queue))
649                 return 1;
650         i = 0;
651         /* get references to the first two buffers */
652         FOR_EACH_BUFFER_REF(br, btrn) {
653                 brs[i] = br;
654                 szs[i] = btr_get_buffer_by_reference(brs[i], bufs + i);
655                 i++;
656                 if (i == 2)
657                         break;
658         }
659         /* make a new btrb that combines the two buffers and a br to it. */
660         sz = szs[0] + szs[1];
661         buf = para_malloc(sz);
662         PARA_DEBUG_LOG("memory merging input buffers: (%zu, %zu) -> %zu\n",
663                 szs[0], szs[1], sz);
664         memcpy(buf, bufs[0], szs[0]);
665         memcpy(buf + szs[0], bufs[1], szs[1]);
666
667         br = para_calloc(sizeof(*br));
668         br->btrb = new_btrb(buf, sz);
669         br->btrb->refcount = 1;
670
671         /* replace the first two refs by the new one */
672         btr_drop_buffer_reference(brs[0]);
673         btr_drop_buffer_reference(brs[1]);
674         para_list_add(&br->node, &btrn->input_queue);
675         return 2;
676 }
677
678 void btr_merge(struct btr_node *btrn, size_t dest_size)
679 {
680         if (need_buffer_pool_merge(btrn))
681                 return merge_input_pool(btrn, dest_size);
682         for (;;) {
683                 char *buf;
684                 size_t len = btr_next_buffer(btrn, &buf);
685                 if (len >= dest_size)
686                         return;
687                 PARA_DEBUG_LOG("input size = %zu < %zu = dest\n", len, dest_size);
688                 if (merge_input(btrn) < 2)
689                         return;
690         }
691 }
692
693 bool btr_eof(struct btr_node *btrn)
694 {
695         char *buf;
696         size_t len = btr_next_buffer(btrn, &buf);
697
698         return (len == 0 && btr_no_parent(btrn));
699 }
700
701 void log_tree_recursively(struct btr_node *btrn, int loglevel, int depth)
702 {
703         struct btr_node *ch;
704         const char spaces[] = "                 ", *space = spaces + 16 - depth;
705
706         if (depth > 16)
707                 return;
708         para_log(loglevel, "%s%s\n", space, btrn->name);
709         FOR_EACH_CHILD(ch, btrn)
710                 log_tree_recursively(ch, loglevel, depth + 1);
711 }
712
713 void btr_log_tree(struct btr_node *btrn, int loglevel)
714 {
715         return log_tree_recursively(btrn, loglevel, 0);
716 }
717
718 /** 640K ought to be enough for everybody ;) */
719 #define BTRN_MAX_PENDING (640 * 1024)
720
721 int btr_node_status(struct btr_node *btrn, size_t min_iqs,
722         enum btr_node_type type)
723 {
724         size_t iqs;
725
726         if (type != BTR_NT_LEAF) {
727                 if (btr_no_children(btrn))
728                         return -E_BTR_NO_CHILD;
729                 if (btr_bytes_pending(btrn) > BTRN_MAX_PENDING)
730                         return 0;
731         }
732         if (type != BTR_NT_ROOT) {
733                 if (btr_eof(btrn))
734                         return -E_BTR_EOF;
735                 iqs = btr_get_input_queue_size(btrn);
736                 if (iqs == 0) /* we have a parent, because not eof */
737                         return 0;
738                 if (iqs < min_iqs && !btr_no_parent(btrn))
739                         return 0;
740         }
741         return 1;
742 }
743
744 void btr_get_node_start(struct btr_node *btrn, struct timeval *tv)
745 {
746         *tv = btrn->start;
747 }