Implement buffer tree pool support.
[paraslash.git] / buffer_tree.c
1 #include <regex.h>
2 #include <stdbool.h>
3
4 #include "para.h"
5 #include "list.h"
6 #include "string.h"
7 #include "buffer_tree.h"
8 #include "error.h"
9 #include "sched.h"
10
/*
 * A buffer pool: one contiguous memory area used as a circular buffer.
 *
 * Data between rhead (read head) and whead (write head) is currently in
 * use. whead == NULL encodes a completely full area (see
 * btr_pool_allocate()).
 */
struct btr_pool {
	/* Start of the allocated area. */
	char *area_start;
	/* One past the last byte of the allocated area. */
	char *area_end;
	/* Read head: oldest byte still in use. */
	char *rhead;
	/* Write head: next byte to hand out, or NULL if the pool is full. */
	char *whead;
};
17
enum btr_buffer_flags {
	/*
	 * Changes the way the buffer is deallocated. NOTE(review): this flag
	 * is not referenced in this file; deallocation currently dispatches
	 * on btr_buffer->pool instead (see dealloc_buffer()) — confirm
	 * whether the flag is used elsewhere or is vestigial.
	 */
	BTR_BF_BTR_POOL = 1,
};
22
/* A chunk of data shared, via references, by the children of a node. */
struct btr_buffer {
	/* The data, either heap-allocated or pointing into a pool area. */
	char *buf;
	/* Number of bytes of \a buf. */
	size_t size;
	/** The number of references to this buffer. */
	int refcount;
	/* Non-NULL iff \a buf lives inside a btr_pool area (see dealloc_buffer()). */
	struct btr_pool *pool;
};
30
/* One node's view of a (possibly shared) btr buffer. */
struct btr_buffer_reference {
	/* The buffer this reference points to. */
	struct btr_buffer *btrb;
	/* How many bytes of the buffer this node has consumed so far. */
	size_t consumed;
	/* Each buffer reference belongs to the buffer queue list of some buffer tree node. */
	struct list_head node;
	/*
	 * Non-zero iff this is a "wrap buffer" created by merge_input_pool();
	 * the value is the size of the first merged chunk (see btr_consume()).
	 */
	size_t wrap_count;
};
38
/* A node of the buffer tree. */
struct btr_node {
	/* The name of this node, mainly for logging. */
	char *name;
	/* NULL for a root node. */
	struct btr_node *parent;
	/* The position of this btr node in the buffer tree. */
	struct list_head node;
	/* The children nodes of this btr node are linked together in a list. */
	struct list_head children;
	/* Time of first data transfer. */
	struct timeval start;
	/**
	 * The input queue is a list of references to btr buffers. Each item on
	 * the list represents an input buffer which has not been completely
	 * used by this btr node.
	 */
	struct list_head input_queue;
	/* Optional command handler, see btr_exec() and btr_exec_up(). */
	btr_command_handler execute;
	/* Opaque pointer for the node owner, returned by btr_context(). */
	void *context;
};
57
58 struct btr_pool *btr_pool_new(size_t area_size)
59 {
60 struct btr_pool *btrp = para_malloc(sizeof(*btrp));
61
62 btrp->area_start = para_malloc(area_size);
63 btrp->area_end = btrp->area_start + area_size;
64 btrp->rhead = btrp->area_start;
65 btrp->whead = btrp->area_start;
66 return btrp;
67 }
68
69 /* whead = NULL means area full */
70
71 void btr_pool_free(struct btr_pool *btrp)
72 {
73 if (!btrp)
74 return;
75 free(btrp->area_start);
76 free(btrp);
77 }
78
79 size_t btr_pool_size(struct btr_pool *btrp)
80 {
81 return btrp->area_end - btrp->area_start;
82 }
83
84 size_t btr_pool_filled(struct btr_pool *btrp)
85 {
86 if (!btrp->whead)
87 return btr_pool_size(btrp);
88 if (btrp->rhead <= btrp->whead)
89 return btrp->whead - btrp->rhead;
90 return btr_pool_size(btrp) - (btrp->rhead - btrp->whead);
91 }
92
/* Return how many bytes of the pool area are currently unused (total - filled). */
size_t btr_pool_unused(struct btr_pool *btrp)
{
	size_t total = btr_pool_size(btrp);

	return total - btr_pool_filled(btrp);
}
97
98 size_t btr_pool_available(struct btr_pool *btrp)
99 {
100 if (!btrp->whead)
101 return 0;
102 if (btrp->rhead <= btrp->whead)
103 return btrp->area_end - btrp->whead;
104 return btrp->rhead - btrp->whead;
105 }
106
107 size_t btr_pool_get_buffer(struct btr_pool *btrp, char **result)
108 {
109 if (result)
110 *result = btrp->whead;
111 return btr_pool_available(btrp);
112 }
113
/*
 * Mark the next \a size bytes at the write head as used.
 *
 * Advances whead, wrapping it to the start of the area when it reaches
 * the end, and setting it to NULL when the pool becomes completely full.
 * Callers must not request more than btr_pool_available() bytes.
 */
void btr_pool_allocate(struct btr_pool *btrp, size_t size)
{
	char *end;

	if (size == 0)
		return;
	//PARA_CRIT_LOG("filled: %zu, alloc %zu\n", btr_pool_filled(btrp), size);
	assert(size <= btr_pool_available(btrp));
	end = btrp->whead + size;
	assert(end <= btrp->area_end);

	if (end == btrp->area_end) {
		/* Write head hit the end of the area: wrap around. */
		PARA_DEBUG_LOG("end of pool area reached: %p\n", end);
		end = btrp->area_start;
	}
	if (end == btrp->rhead) {
		/* Write head caught up with the read head: pool is full. */
		PARA_DEBUG_LOG("btrp buffer full\n");
		end = NULL; /* buffer full */
	}
	btrp->whead = end;
	//PARA_CRIT_LOG("filled: %zu\n", btr_pool_filled(btrp));
}
136
/*
 * Return the oldest \a size bytes of the pool area to the free space.
 *
 * Advances rhead, wrapping it at the end of the area. A full pool
 * (whead == NULL) becomes non-full again, and a pool that drains
 * completely has both heads reset to the start of the area.
 */
static void btr_pool_deallocate(struct btr_pool *btrp, size_t size)
{
	char *end = btrp->rhead + size;

	//PARA_CRIT_LOG("filled: %zu, dealloc %zu\n", btr_pool_filled(btrp), size);
	if (size == 0)
		return;
	assert(end <= btrp->area_end);
	assert(size <= btr_pool_filled(btrp));
	if (end == btrp->area_end)
		end = btrp->area_start; /* wrap around */
	if (!btrp->whead)
		/* Pool was full; freeing makes the old read head writable. */
		btrp->whead = btrp->rhead;
	btrp->rhead = end;
	if (btrp->rhead == btrp->whead)
		/* Pool is now empty: reset to a canonical state. */
		btrp->rhead = btrp->whead = btrp->area_start;
	//PARA_CRIT_LOG("filled: %zu\n", btr_pool_filled(btrp));
}
155
/* Iterate over all children of the given btr node. */
#define FOR_EACH_CHILD(_tn, _btrn) list_for_each_entry((_tn), \
	&((_btrn)->children), node)
/* Like FOR_EACH_CHILD(), but safe against removal of the current child. */
#define FOR_EACH_CHILD_SAFE(_tn, _tmp, _btrn) \
	list_for_each_entry_safe((_tn), (_tmp), &((_btrn)->children), node)

/* Iterate over all buffer references of the node's input queue. */
#define FOR_EACH_BUFFER_REF(_br, _btrn) \
	list_for_each_entry((_br), &(_btrn)->input_queue, node)
/* Like FOR_EACH_BUFFER_REF(), but safe against removal of the current entry. */
#define FOR_EACH_BUFFER_REF_SAFE(_br, _tmp, _btrn) \
	list_for_each_entry_safe((_br), (_tmp), &(_btrn)->input_queue, node)
165
166 struct btr_node *btr_new_node(const char *name, struct btr_node *parent,
167 btr_command_handler handler, void *context)
168 {
169 struct btr_node *btrn = para_malloc(sizeof(*btrn));
170
171 btrn->name = para_strdup(name);
172 btrn->parent = parent;
173 btrn->execute = handler;
174 btrn->context = context;
175 btrn->start.tv_sec = 0;
176 btrn->start.tv_usec = 0;
177 if (parent)
178 list_add_tail(&btrn->node, &parent->children);
179 INIT_LIST_HEAD(&btrn->children);
180 INIT_LIST_HEAD(&btrn->input_queue);
181 if (parent)
182 PARA_INFO_LOG("added %s as child of %s\n", name, parent->name);
183 else
184 PARA_INFO_LOG("added %s as btr root\n", name);
185 return btrn;
186 }
187
188 /*
189 * Allocate a new btr buffer.
190 *
191 * The freshly allocated buffer will have a zero refcount.
192 */
193 static struct btr_buffer *new_btrb(char *buf, size_t size)
194 {
195 struct btr_buffer *btrb = para_calloc(sizeof(*btrb));
196
197 btrb->buf = buf;
198 btrb->size = size;
199 return btrb;
200 }
201
202 static void dealloc_buffer(struct btr_buffer *btrb)
203 {
204 if (btrb->pool)
205 btr_pool_deallocate(btrb->pool, btrb->size);
206 else
207 free(btrb->buf);
208 }
209
210 static struct btr_buffer_reference *get_first_input_br(struct btr_node *btrn)
211 {
212 if (list_empty(&btrn->input_queue))
213 return NULL;
214 return list_first_entry(&btrn->input_queue,
215 struct btr_buffer_reference, node);
216 }
217
218 /*
219 * Deallocate the reference, release the resources if refcount drops to zero.
220 */
221 static void btr_drop_buffer_reference(struct btr_buffer_reference *br)
222 {
223 struct btr_buffer *btrb = br->btrb;
224
225 //PARA_CRIT_LOG("dropping buffer reference %p\n", br);
226 list_del(&br->node);
227 free(br);
228 btrb->refcount--;
229 if (btrb->refcount == 0) {
230 dealloc_buffer(btrb);
231 free(btrb);
232 }
233 }
234
235 static void add_btrb_to_children(struct btr_buffer *btrb,
236 struct btr_node *btrn, size_t consumed)
237 {
238 struct btr_node *ch;
239
240 if (btrn->start.tv_sec == 0)
241 btrn->start = *now;
242 FOR_EACH_CHILD(ch, btrn) {
243 struct btr_buffer_reference *br = para_calloc(sizeof(*br));
244 br->btrb = btrb;
245 br->consumed = consumed;
246 list_add_tail(&br->node, &ch->input_queue);
247 btrb->refcount++;
248 if (ch->start.tv_sec == 0)
249 ch->start = *now;
250 }
251 }
252
253 void btr_add_output(char *buf, size_t size, struct btr_node *btrn)
254 {
255 struct btr_buffer *btrb;
256
257 assert(size != 0);
258 if (list_empty(&btrn->children)) {
259 free(buf);
260 return;
261 }
262 btrb = new_btrb(buf, size);
263 add_btrb_to_children(btrb, btrn, 0);
264 }
265
266 void btr_add_output_pool(struct btr_pool *btrp, char *buf, size_t size,
267 struct btr_node *btrn)
268 {
269 struct btr_buffer *btrb;
270
271 assert(size != 0);
272 if (list_empty(&btrn->children)) {
273 btr_pool_deallocate(btrp, size);
274 return;
275 }
276 btrb = new_btrb(buf, size);
277 btrb->pool = btrp;
278 add_btrb_to_children(btrb, btrn, 0);
279 }
280
281 static void btr_pushdown_br(struct btr_buffer_reference *br, struct btr_node *btrn)
282 {
283 add_btrb_to_children(br->btrb, btrn, br->consumed);
284 btr_drop_buffer_reference(br);
285 }
286
287 void btr_pushdown(struct btr_node *btrn)
288 {
289 struct btr_buffer_reference *br, *tmp;
290
291 FOR_EACH_BUFFER_REF_SAFE(br, tmp, btrn)
292 btr_pushdown_br(br, btrn);
293 }
294
/*
 * Push only the first buffer of the input queue down to the children.
 *
 * \return One if a buffer was pushed down, zero if the queue was empty.
 */
int btr_pushdown_one(struct btr_node *btrn)
{
	struct btr_buffer_reference *first = get_first_input_br(btrn);

	if (!first)
		return 0;
	btr_pushdown_br(first, btrn);
	return 1;
}
305
306 /* Return true if this node has no children. */
307 bool btr_no_children(struct btr_node *btrn)
308 {
309 return list_empty(&btrn->children);
310 }
311
312 bool btr_no_parent(struct btr_node *btrn)
313 {
314 return !btrn->parent;
315 }
316
317 bool btr_inplace_ok(struct btr_node *btrn)
318 {
319 if (!btrn->parent)
320 return true;
321 return list_is_singular(&btrn->parent->children);
322 }
323
324 static inline size_t br_available_bytes(struct btr_buffer_reference *br)
325 {
326 return br->btrb->size - br->consumed;
327 }
328
329 size_t btr_get_buffer_by_reference(struct btr_buffer_reference *br, char **buf)
330 {
331 if (buf)
332 *buf = br->btrb->buf + br->consumed;
333 return br_available_bytes(br);
334 }
335
/**
 * Return (a pointer to) the next input buffer and its size.
 *
 * Consecutive pool-backed references whose memory happens to be contiguous
 * inside the pool area are merged, so the returned size may cover several
 * queue entries. Non-pool buffers are never merged.
 *
 * \return zero if the input buffer queue is empty.
 */
size_t btr_next_buffer(struct btr_node *btrn, char **bufp)
{
	struct btr_buffer_reference *br;
	char *buf, *result = NULL;
	size_t sz, rv = 0;

	FOR_EACH_BUFFER_REF(br, btrn) {
		sz = btr_get_buffer_by_reference(br, &buf);
		if (!result) {
			/* First entry of the queue. */
			result = buf;
			rv = sz;
			if (!br->btrb->pool) /* only pool buffers can merge */
				break;
			continue;
		}
		if (!br->btrb->pool)
			break;
		if (result + rv != buf) {
			/* Not adjacent in the pool area: stop merging here. */
			PARA_DEBUG_LOG("%s: pool merge impossible: %p != %p\n",
				btrn->name, result + rv, buf);
			break;
		}
		// PARA_CRIT_LOG("%s: inplace merge (%zu, %zu)->%zu\n", btrn->name,
		// rv, sz, rv + sz);
		// PARA_CRIT_LOG("%s: inplace merge %p (%zu)\n", btrn->name,
		// result, sz);
		rv += sz;
	}
	if (bufp)
		*bufp = result;
	return rv;
}
371
372 void btr_consume(struct btr_node *btrn, size_t numbytes)
373 {
374 struct btr_buffer_reference *br, *tmp;
375 size_t sz;
376
377 if (numbytes == 0)
378 return;
379 br = get_first_input_br(btrn);
380 assert(br);
381
382 //PARA_CRIT_LOG("wrap count: %zu\n", br->wrap_count);
383 if (br->wrap_count == 0) {
384 /*
385 * No wrap buffer. Drop buffer references whose buffer
386 * has been fully used. */
387 FOR_EACH_BUFFER_REF_SAFE(br, tmp, btrn) {
388 if (br->consumed + numbytes <= br->btrb->size) {
389 br->consumed += numbytes;
390 if (br->consumed == br->btrb->size)
391 btr_drop_buffer_reference(br);
392 return;
393 }
394 numbytes -= br->btrb->size - br->consumed;
395 btr_drop_buffer_reference(br);
396 }
397 assert(true);
398 }
399 /*
400
401 We have a wrap buffer, consume from it. If in total,
402 i.e. including previous calls to brt_consume(), less than
403 wrap_count has been consumed, there's nothing more we can do.
404
405 Otherwise we drop the wrap buffer and consume from subsequent
406 buffers of the input queue the correct amount of bytes. This
407 is the total number of bytes that have been consumed from the
408 wrap buffer.
409 */
410 PARA_DEBUG_LOG("consuming %zu/%zu bytes from wrap buffer\n", numbytes,
411 br_available_bytes(br));
412
413 assert(numbytes <= br_available_bytes(br));
414 if (br->consumed + numbytes < br->wrap_count) {
415 br->consumed += numbytes;
416 return;
417 }
418 PARA_DEBUG_LOG("dropping wrap buffer (%zu bytes)\n", br->btrb->size);
419 /* get rid of the wrap buffer */
420 sz = br->consumed + numbytes;
421 btr_drop_buffer_reference(br);
422 return btr_consume(btrn, sz);
423 }
424
425 static void flush_input_queue(struct btr_node *btrn)
426 {
427 struct btr_buffer_reference *br, *tmp;
428 FOR_EACH_BUFFER_REF_SAFE(br, tmp, btrn)
429 btr_drop_buffer_reference(br);
430 }
431
432 void btr_free_node(struct btr_node *btrn)
433 {
434 if (!btrn)
435 return;
436 free(btrn->name);
437 free(btrn);
438 }
439
440 void btr_remove_node(struct btr_node *btrn)
441 {
442 struct btr_node *ch;
443
444 if (!btrn)
445 return;
446 PARA_NOTICE_LOG("removing btr node %s from buffer tree\n", btrn->name);
447 FOR_EACH_CHILD(ch, btrn)
448 ch->parent = NULL;
449 flush_input_queue(btrn);
450 if (btrn->parent)
451 list_del(&btrn->node);
452 }
453
454 size_t btr_get_input_queue_size(struct btr_node *btrn)
455 {
456 struct btr_buffer_reference *br;
457 size_t size = 0;
458
459 FOR_EACH_BUFFER_REF(br, btrn) {
460 //PARA_CRIT_LOG("size: %zu\n", size);
461 size += br_available_bytes(br);
462 }
463 return size;
464 }
465
466 void btr_splice_out_node(struct btr_node *btrn)
467 {
468 struct btr_node *ch, *tmp;
469
470 assert(btrn);
471 PARA_NOTICE_LOG("splicing out %s\n", btrn->name);
472 btr_pushdown(btrn);
473 if (btrn->parent)
474 list_del(&btrn->node);
475 FOR_EACH_CHILD_SAFE(ch, tmp, btrn) {
476 PARA_INFO_LOG("parent(%s): %s\n", ch->name,
477 btrn->parent? btrn->parent->name : "NULL");
478 ch->parent = btrn->parent;
479 if (btrn->parent)
480 list_move(&ch->node, &btrn->parent->children);
481 }
482 assert(list_empty(&btrn->children));
483 }
484
485 /**
486 * Return the size of the largest input queue.
487 *
488 * Iterates over all children of the given node.
489 */
490 size_t btr_bytes_pending(struct btr_node *btrn)
491 {
492 size_t max_size = 0;
493 struct btr_node *ch;
494
495 FOR_EACH_CHILD(ch, btrn) {
496 size_t size = btr_get_input_queue_size(ch);
497 max_size = PARA_MAX(max_size, size);
498 }
499 return max_size;
500 }
501
502 int btr_exec(struct btr_node *btrn, const char *command, char **value_result)
503 {
504 if (!btrn)
505 return -ERRNO_TO_PARA_ERROR(EINVAL);
506 if (!btrn->execute)
507 return -ERRNO_TO_PARA_ERROR(ENOTSUP);
508 return btrn->execute(btrn, command, value_result);
509 }
510
511 int btr_exec_up(struct btr_node *btrn, const char *command, char **value_result)
512 {
513 int ret;
514
515 for (; btrn; btrn = btrn->parent) {
516 struct btr_node *parent = btrn->parent;
517 if (!parent)
518 return -ERRNO_TO_PARA_ERROR(ENOTSUP);
519 if (!parent->execute)
520 continue;
521 PARA_INFO_LOG("parent: %s, cmd: %s\n", parent->name, command);
522 ret = parent->execute(parent, command, value_result);
523 if (ret == -ERRNO_TO_PARA_ERROR(ENOTSUP))
524 continue;
525 if (ret < 0)
526 return ret;
527 if (value_result && *value_result)
528 PARA_NOTICE_LOG("%s(%s): %s\n", command, parent->name,
529 *value_result);
530 return 1;
531 }
532 return -ERRNO_TO_PARA_ERROR(ENOTSUP);
533 }
534
535 void *btr_context(struct btr_node *btrn)
536 {
537 return btrn->context;
538 }
539
540 static bool need_buffer_pool_merge(struct btr_node *btrn)
541 {
542 struct btr_buffer_reference *br = get_first_input_br(btrn);
543
544 if (!br)
545 return false;
546 if (br->wrap_count != 0)
547 return true;
548 if (br->btrb->pool)
549 return true;
550 return false;
551 }
552
553 static void merge_input_pool(struct btr_node *btrn, size_t dest_size)
554 {
555 struct btr_buffer_reference *br, *wbr;
556 int num_refs; /* including wrap buffer */
557 char *buf, *buf1, *buf2 = NULL;
558 size_t sz, sz1, sz2 = 0, wsz;
559
560 if (list_empty(&btrn->input_queue))
561 return;
562
563 num_refs = 0;
564 FOR_EACH_BUFFER_REF(br, btrn) {
565 num_refs++;
566 sz = btr_get_buffer_by_reference(br, &buf);
567 if (br->wrap_count != 0) {
568 assert(!wbr);
569 assert(num_refs == 1);
570 wbr = br;
571 if (sz >= dest_size)
572 return;
573 continue;
574 }
575 if (!buf1) {
576 buf1 = buf;
577 sz1 = sz;
578 goto next;
579 }
580 if (buf1 + sz1 == buf) {
581 sz1 += sz;
582 goto next;
583 }
584 if (!buf2) {
585 buf2 = buf;
586 sz2 = sz;
587 goto next;
588 }
589 assert(buf2 + sz2 == buf);
590 sz2 += sz;
591 next:
592 if (sz1 + sz2 >= dest_size)
593 break;
594 }
595 if (!wbr) {
596 assert(buf1);
597 if (!buf2) /* nothing to do */
598 return;
599 /* make a new wrap buffer combining buf1 and buf 2. */
600 sz = sz1 + sz2;
601 buf = para_malloc(sz);
602 PARA_DEBUG_LOG("merging input buffers: (%p:%zu, %p:%zu) -> %p:%zu\n",
603 buf1, sz1, buf2, sz2, buf, sz);
604 memcpy(buf, buf1, sz1);
605 memcpy(buf + sz1, buf2, sz2);
606 br = para_calloc(sizeof(*br));
607 br->btrb = new_btrb(buf, sz);
608 br->btrb->refcount = 1;
609 br->consumed = 0;
610 /* This is a wrap buffer */
611 br->wrap_count = sz1;
612 para_list_add(&br->node, &btrn->input_queue);
613 return;
614 }
615 /*
616 * We already have a wrap buffer, but it is too small. It might be
617 * partially used.
618 */
619 wsz = br_available_bytes(wbr);
620 if (wbr->wrap_count == sz1 && wbr->btrb->size >= sz1 + sz2) /* nothing we can do about it */
621 return;
622 assert(buf1 && buf2);
623 sz = sz1 + sz2 - wbr->btrb->size; /* amount of new data */
624 wbr->btrb->size += sz;
625 PARA_DEBUG_LOG("increasing wrap buffer to %zu\n", wbr->btrb->size);
626 wbr->btrb->buf = para_realloc(wbr->btrb->buf, wbr->btrb->size);
627 /* copy the new data to the end of the reallocated buffer */
628 assert(sz2 >= sz);
629 memcpy(wbr->btrb->buf + wbr->btrb->size - sz, buf2 + sz2 - sz, sz);
630 }
631
632 /**
633 * Merge the first two input buffers into one.
634 *
635 * This is a quite expensive operation.
636 *
637 * \return The number of buffers that have been available (zero, one or two).
638 */
639 static int merge_input(struct btr_node *btrn)
640 {
641 struct btr_buffer_reference *brs[2], *br;
642 char *bufs[2], *buf;
643 size_t szs[2], sz;
644 int i;
645
646 if (list_empty(&btrn->input_queue))
647 return 0;
648 if (list_is_singular(&btrn->input_queue))
649 return 1;
650 i = 0;
651 /* get references to the first two buffers */
652 FOR_EACH_BUFFER_REF(br, btrn) {
653 brs[i] = br;
654 szs[i] = btr_get_buffer_by_reference(brs[i], bufs + i);
655 i++;
656 if (i == 2)
657 break;
658 }
659 /* make a new btrb that combines the two buffers and a br to it. */
660 sz = szs[0] + szs[1];
661 buf = para_malloc(sz);
662 PARA_DEBUG_LOG("memory merging input buffers: (%zu, %zu) -> %zu\n",
663 szs[0], szs[1], sz);
664 memcpy(buf, bufs[0], szs[0]);
665 memcpy(buf + szs[0], bufs[1], szs[1]);
666
667 br = para_calloc(sizeof(*br));
668 br->btrb = new_btrb(buf, sz);
669 br->btrb->refcount = 1;
670
671 /* replace the first two refs by the new one */
672 btr_drop_buffer_reference(brs[0]);
673 btr_drop_buffer_reference(brs[1]);
674 para_list_add(&br->node, &btrn->input_queue);
675 return 2;
676 }
677
/*
 * Merge the node's input until at least \a dest_size contiguous bytes are
 * available, or until no further merging is possible.
 */
void btr_merge(struct btr_node *btrn, size_t dest_size)
{
	if (need_buffer_pool_merge(btrn)) {
		merge_input_pool(btrn, dest_size);
		return;
	}
	while (true) {
		char *buf;
		size_t len = btr_next_buffer(btrn, &buf);

		if (len >= dest_size)
			break;
		PARA_DEBUG_LOG("input size = %zu < %zu = dest\n", len, dest_size);
		if (merge_input(btrn) < 2) /* fewer than two buffers left */
			break;
	}
}
692
/* True iff the node has no pending input and no parent to produce more. */
bool btr_eof(struct btr_node *btrn)
{
	char *buf;

	return btr_next_buffer(btrn, &buf) == 0 && btr_no_parent(btrn);
}
700
701 void log_tree_recursively(struct btr_node *btrn, int loglevel, int depth)
702 {
703 struct btr_node *ch;
704 const char spaces[] = " ", *space = spaces + 16 - depth;
705
706 if (depth > 16)
707 return;
708 para_log(loglevel, "%s%s\n", space, btrn->name);
709 FOR_EACH_CHILD(ch, btrn)
710 log_tree_recursively(ch, loglevel, depth + 1);
711 }
712
/* Log the whole buffer tree rooted at \a btrn at the given loglevel. */
void btr_log_tree(struct btr_node *btrn, int loglevel)
{
	log_tree_recursively(btrn, loglevel, 0);
}
717
718 /** 640K ought to be enough for everybody ;) */
719 #define BTRN_MAX_PENDING (640 * 1024)
720
721 int btr_node_status(struct btr_node *btrn, size_t min_iqs,
722 enum btr_node_type type)
723 {
724 size_t iqs;
725
726 if (type != BTR_NT_LEAF) {
727 if (btr_no_children(btrn))
728 return -E_BTR_NO_CHILD;
729 if (btr_bytes_pending(btrn) > BTRN_MAX_PENDING)
730 return 0;
731 }
732 if (type != BTR_NT_ROOT) {
733 if (btr_eof(btrn))
734 return -E_BTR_EOF;
735 iqs = btr_get_input_queue_size(btrn);
736 if (iqs == 0) /* we have a parent, because not eof */
737 return 0;
738 if (iqs < min_iqs && !btr_no_parent(btrn))
739 return 0;
740 }
741 return 1;
742 }
743
744 void btr_get_node_start(struct btr_node *btrn, struct timeval *tv)
745 {
746 *tv = btrn->start;
747 }