]> git.tuebingen.mpg.de Git - osl.git/blob - osl.c
Always compile with -fvisibility=hidden.
[osl.git] / osl.c
1 /*
2  * Copyright (C) 2007-2008 Andre Noll <maan@systemlinux.org>
3  *
4  * Licensed under the GPL v2. For licencing details see COPYING.
5  */
6
7 /** \file osl.c Object storage layer functions. */
8 #include <dirent.h> /* readdir() */
9 #include <assert.h>
10
11
12 #include "log.h"
13 #include "error.h"
14 #include "fd.h"
15 #include "list.h"
16 #include "osl_core.h"
17
18 /**
19  * Allocate a sufficiently large string and print into it.
20  *
21  * \param fmt A usual format string.
22  *
23  * Produce output according to \p fmt. No artificial bound on the length of the
24  * resulting string is imposed.
25  *
26  * \return This function either returns a pointer to a string that must be
27  * freed by the caller or \p NULL if memory allocation failed.
28  *
29  * \sa printf(3).
30  */
31 static __must_check __printf_1_2 __malloc char *make_message(const char *fmt, ...)
32 {
33         int n;
34         size_t size = 100;
35         char *p = malloc(size);
36
37         if (!p)
38                 return NULL;
39         while (1) {
40                 char *q;
41                 va_list ap;
42                 /* Try to print in the allocated space. */
43                 va_start(ap, fmt);
44                 n = vsnprintf(p, size, fmt, ap);
45                 va_end(ap);
46                 /* If that worked, return the string. */
47                 if (n > -1 && n < size)
48                         break;
49                 /* Else try again with more space. */
50                 if (n > -1) /* glibc 2.1 */
51                         size = n + 1; /* precisely what is needed */
52                 else /* glibc 2.0 */
53                         size *= 2; /* twice the old size */
54                 q = realloc(p, size);
55                 if (!q) {
56                         free(p);
57                         return NULL;
58                 }
59         }
60         return p;
61 }
62
63 /**
64  * The log function.
65  *
66  * \param ll Loglevel.
67  * \param fmt Usual format string.
68  *
69  * All XXX_LOG() macros use this function.
70  */
71 __printf_2_3 void __log(int ll, const char* fmt,...)
72 {
73         va_list argp;
74         FILE *outfd;
75         struct tm *tm;
76         time_t t1;
77         char str[255] = "";
78
79         if (ll < 2)
80                 return;
81         outfd = stderr;
82         time(&t1);
83         tm = localtime(&t1);
84         strftime(str, sizeof(str), "%b %d %H:%M:%S", tm);
85         fprintf(outfd, "%s ", str);
86         va_start(argp, fmt);
87         vfprintf(outfd, fmt, argp);
88         va_end(argp);
89 }
90
91 /**
92  * A wrapper for lseek(2).
93  *
94  * \param fd The file descriptor whose offset is to be to repositioned.
95  * \param offset A value-result parameter.
96  * \param whence Usual repositioning directive.
97  *
98  * Reposition the offset of the file descriptor \a fd to the argument \a offset
99  * according to the directive \a whence. Upon successful return, \a offset
100  * contains the resulting offset location as measured in bytes from the
101  * beginning of the file.
102  *
103  * \return Positive on success. Otherwise, the function returns \p -E_LSEEK.
104  *
105  * \sa lseek(2).
106  */
107 static int __lseek(int fd, off_t *offset, int whence)
108 {
109         *offset = lseek(fd, *offset, whence);
110         int ret = -E_LSEEK;
111         if (*offset == -1)
112                 return ret;
113         return 1;
114 }
115
116 /**
117  * Wrapper for the write system call.
118  *
119  * \param fd The file descriptor to write to.
120  * \param buf The buffer to write.
121  * \param size The length of \a buf in bytes.
122  *
123  * This function writes out the given buffer and retries if an interrupt
124  * occurred during the write.
125  *
126  * \return On success, the number of bytes written is returned, otherwise, the
127  * function returns \p -E_WRITE.
128  *
129  * \sa write(2).
130  */
131 static ssize_t __write(int fd, const void *buf, size_t size)
132 {
133         ssize_t ret;
134
135         for (;;) {
136                 ret = write(fd, buf, size);
137                 if ((ret < 0) && (errno == EAGAIN || errno == EINTR))
138                         continue;
139                 return ret >= 0? ret : -E_WRITE;
140         }
141 }
142
143 /**
144  * Write the whole buffer to a file descriptor.
145  *
146  * \param fd The file descriptor to write to.
147  * \param buf The buffer to write.
148  * \param size The length of \a buf in bytes.
149  *
150  * This function writes the given buffer and continues on short writes and
151  * when interrupted by a signal.
152  *
153  * \return Positive on success, negative on errors. Possible errors: any
154  * errors returned by para_write().
155  *
156  * \sa para_write().
157  */
158 static ssize_t write_all(int fd, const void *buf, size_t size)
159 {
160 //      DEBUG_LOG("writing %zu bytes\n", size);
161         const char *b = buf;
162         while (size) {
163                 ssize_t ret = __write(fd, b, size);
164 //              DEBUG_LOG("ret: %zd\n", ret);
165                 if (ret < 0)
166                         return ret;
167                 b += ret;
168                 size -= ret;
169         }
170         return 1;
171 }
172 /**
173  * Open a file, write the given buffer and close the file.
174  *
175  * \param filename Full path to the file to open.
176  * \param buf The buffer to write to the file.
177  * \param size The size of \a buf.
178  *
179  * \return Standard.
180  */
181 static int write_file(const char *filename, const void *buf, size_t size)
182 {
183         int ret, fd;
184
185         ret = osl_open(filename, O_WRONLY | O_CREAT | O_EXCL, 0644);
186         if (ret < 0)
187                 return ret;
188         fd = ret;
189         ret = write_all(fd, buf, size);
190         if (ret < 0)
191                 goto out;
192         ret = 1;
193 out:
194         close(fd);
195         return ret;
196 }
197
198 static int append_file(const char *filename, char *header, size_t header_size,
199         char *data, size_t data_size, uint32_t *new_pos)
200 {
201         int ret, fd;
202
203 //      DEBUG_LOG("appending %zu  + %zu bytes\n", header_size, data_size);
204         ret = osl_open(filename, O_WRONLY | O_CREAT | O_APPEND, 0644);
205         if (ret < 0)
206                 return ret;
207         fd = ret;
208         if (header && header_size) {
209                 ret = write_all(fd, header, header_size);
210                 if (ret < 0)
211                         goto out;
212         }
213         ret = write_all(fd, data, data_size);
214         if (ret < 0)
215                 goto out;
216         if (new_pos) {
217                 off_t offset = 0;
218                 ret = __lseek(fd, &offset, SEEK_END);
219                 if (ret < 0)
220                         goto out;
221 //              DEBUG_LOG("new file size: " FMT_OFF_T "\n", offset);
222                 *new_pos = offset;
223         }
224         ret = 1;
225 out:
226         close(fd);
227         return ret;
228 }
229
230 static int verify_name(const char *name)
231 {
232         if (!name)
233                 return -E_BAD_NAME;
234         if (!*name)
235                 return -E_BAD_NAME;
236         if (strchr(name, '/'))
237                 return -E_BAD_NAME;
238         if (!strcmp(name, ".."))
239                 return -E_BAD_NAME;
240         if (!strcmp(name, "."))
241                 return -E_BAD_NAME;
242         return 1;
243 }
244
245 /**
246  * Compare two osl objects pointing to unsigned integers of 32 bit size.
247  *
248  * \param obj1 Pointer to the first integer.
249  * \param obj2 Pointer to the second integer.
250  *
251  * \return The values required for an osl compare function.
252  *
253  * \sa osl_compare_func, osl_hash_compare().
254  */
255 int uint32_compare(const struct osl_object *obj1, const struct osl_object *obj2)
256 {
257         uint32_t d1 = read_u32((const char *)obj1->data);
258         uint32_t d2 = read_u32((const char *)obj2->data);
259
260         if (d1 < d2)
261                 return 1;
262         if (d1 > d2)
263                 return -1;
264         return 0;
265 }
266
267 /**
268  * Compare two osl objects pointing to hash values.
269  *
270  * \param obj1 Pointer to the first hash object.
271  * \param obj2 Pointer to the second hash object.
272  *
273  * \return The values required for an osl compare function.
274  *
275  * \sa osl_compare_func, uint32_compare().
276  */
277 int osl_hash_compare(const struct osl_object *obj1, const struct osl_object *obj2)
278 {
279         return hash_compare((HASH_TYPE *)obj1->data, (HASH_TYPE *)obj2->data);
280 }
281
282 static char *disk_storage_dirname(const struct osl_table *t, unsigned col_num,
283                 const char *ds_name)
284 {
285         char *dirname, *column_name = column_filename(t, col_num);
286
287         if (!column_name)
288                 return NULL;
289         if (!(t->desc->flags & OSL_LARGE_TABLE))
290                 return column_name;
291         dirname = make_message("%s/%.2s", column_name, ds_name);
292         free(column_name);
293         return dirname;
294 }
295
296 static char *disk_storage_name_of_object(const struct osl_table *t,
297         const struct osl_object *obj)
298 {
299         HASH_TYPE hash[HASH_SIZE];
300         hash_object(obj, hash);
301         return disk_storage_name_of_hash(t, hash);
302 }
303
304 static int disk_storage_name_of_row(const struct osl_table *t,
305                 const struct osl_row *row, char **name)
306 {
307         struct osl_object obj;
308         int ret = osl_get_object(t, row, t->disk_storage_name_column, &obj);
309
310         if (ret < 0)
311                 return ret;
312         *name = disk_storage_name_of_object(t, &obj);
313         if (*name)
314                 return 1;
315         return -ERRNO_TO_ERROR(ENOMEM);
316 }
317
318 static void column_name_hash(const char *col_name, HASH_TYPE *hash)
319 {
320         hash_function(col_name, strlen(col_name), hash);
321 }
322
323 static int init_column_descriptions(struct osl_table *t)
324 {
325         int i, j, ret;
326         const struct osl_column_description *cd;
327
328         ret = -E_BAD_TABLE_DESC;
329         ret = verify_name(t->desc->name);
330         if (ret < 0)
331                 goto err;
332         ret = -E_BAD_DB_DIR;
333         if (!t->desc->dir && (t->num_disk_storage_columns || t->num_mapped_columns))
334                 goto err;
335         /* the size of the index header without column descriptions */
336         t->index_header_size = IDX_COLUMN_DESCRIPTIONS;
337         FOR_EACH_COLUMN(i, t->desc, cd) {
338                 struct osl_column *col = t->columns + i;
339                 if (cd->storage_flags & OSL_RBTREE) {
340                         if (!cd->compare_function)
341                                 return -E_NO_COMPARE_FUNC;
342                 }
343                 if (cd->storage_type == OSL_NO_STORAGE)
344                         continue;
345                 ret = -E_NO_COLUMN_NAME;
346                 if (!cd->name || !cd->name[0])
347                         goto err;
348                 ret = verify_name(cd->name);
349                 if (ret < 0)
350                         goto err;
351                 t->index_header_size += index_column_description_size(cd->name);
352                 column_name_hash(cd->name, col->name_hash);
353                 ret = -E_DUPLICATE_COL_NAME;
354                 for (j = i + 1; j < t->desc->num_columns; j++) {
355                         const char *name2 = get_column_description(t->desc,
356                                 j)->name;
357                         if (cd->name && name2 && !strcmp(cd->name, name2))
358                                 goto err;
359                 }
360         }
361         return 1;
362 err:
363         return ret;
364 }
365
366 /**
367  * Initialize a struct table from given table description.
368  *
369  * \param desc The description of the osl table.
370  * \param table_ptr Result is returned here.
371  *
372  * This function performs several sanity checks on \p desc and returns if any
373  * of these tests fail. On success, a struct \p osl_table is allocated and
374  * initialized with data derived from \p desc.
375  *
376  * \return Positive on success, negative on errors. Possible errors include: \p
377  * E_BAD_TABLE_DESC, \p E_NO_COLUMN_DESC, \p E_NO_COLUMNS, \p
378  * E_BAD_STORAGE_TYPE, \p E_BAD_STORAGE_FLAGS, \p E_BAD_STORAGE_SIZE, \p
379  * E_NO_UNIQUE_RBTREE_COLUMN, \p E_NO_RBTREE_COL.
380  *
381  * \sa struct osl_table.
382  */
383 int init_table_structure(const struct osl_table_description *desc,
384                 struct osl_table **table_ptr)
385 {
386         const struct osl_column_description *cd;
387         struct osl_table *t = calloc(1, sizeof(*t));
388         int i, ret = -ERRNO_TO_ERROR(ENOMEM), have_disk_storage_name_column = 0;
389
390         if (!t)
391                 return ret;
392         ret = -E_BAD_TABLE_DESC;
393         if (!desc)
394                 goto err;
395         DEBUG_LOG("creating table structure for '%s' from table "
396                 "description\n", desc->name);
397         ret = -E_NO_COLUMN_DESC;
398         if (!desc->column_descriptions)
399                 goto err;
400         ret = -E_NO_COLUMNS;
401         if (!desc->num_columns)
402                 goto err;
403         ret = -ERRNO_TO_ERROR(ENOMEM);
404         t->columns = calloc(desc->num_columns, sizeof(struct osl_column));
405         if (!t->columns)
406                 goto err;
407         t->desc = desc;
408         FOR_EACH_COLUMN(i, t->desc, cd) {
409                 enum osl_storage_type st = cd->storage_type;
410                 enum osl_storage_flags sf = cd->storage_flags;
411                 struct osl_column *col = &t->columns[i];
412
413                 ret = -E_BAD_STORAGE_TYPE;
414                 if (st != OSL_MAPPED_STORAGE && st != OSL_DISK_STORAGE
415                                 && st != OSL_NO_STORAGE)
416                         goto err;
417                 ret = -E_BAD_STORAGE_FLAGS;
418                 if (st == OSL_DISK_STORAGE && sf & OSL_RBTREE)
419                         goto err;
420                 ret = -E_BAD_STORAGE_SIZE;
421                 if (sf & OSL_FIXED_SIZE && !cd->data_size)
422                         goto err;
423                 switch (st) {
424                 case OSL_DISK_STORAGE:
425                         t->num_disk_storage_columns++;
426                         break;
427                 case OSL_MAPPED_STORAGE:
428                         t->num_mapped_columns++;
429                         col->index_offset = t->row_index_size;
430                         t->row_index_size += 8;
431                         break;
432                 case OSL_NO_STORAGE:
433                         col->volatile_num = t->num_volatile_columns;
434                         t->num_volatile_columns++;
435                         break;
436                 }
437                 if (sf & OSL_RBTREE) {
438                         col->rbtree_num = t->num_rbtrees;
439                         t->num_rbtrees++;
440                         if ((sf & OSL_UNIQUE) && (st == OSL_MAPPED_STORAGE)) {
441                                 if (!have_disk_storage_name_column)
442                                         t->disk_storage_name_column = i;
443                                 have_disk_storage_name_column = 1;
444                         }
445                 }
446         }
447         ret = -E_NO_UNIQUE_RBTREE_COLUMN;
448         if (t->num_disk_storage_columns && !have_disk_storage_name_column)
449                 goto err;
450         ret = -E_NO_RBTREE_COL;
451         if (!t->num_rbtrees)
452                 goto err;
453         /* success */
454         DEBUG_LOG("OK. Index entry size: %u\n", t->row_index_size);
455         ret = init_column_descriptions(t);
456         if (ret < 0)
457                 goto err;
458         *table_ptr = t;
459         return 1;
460 err:
461         free(t->columns);
462         free(t);
463         return ret;
464 }
465
466 /**
467  * Read the table description from index header.
468  *
469  * \param map The memory mapping of the index file.
470  * \param desc The values found in the index header are returned here.
471  *
472  * Read the index header, check for the paraslash magic string and the table version number.
473  * Read all information stored in the index header into \a desc.
474  *
475  * \return Positive on success, negative on errors.
476  *
477  * \sa struct osl_table_description, osl_create_table.
478  */
479 int read_table_desc(struct osl_object *map, struct osl_table_description *desc)
480 {
481         char *buf = map->data;
482         uint8_t version;
483         uint16_t header_size;
484         int ret, i;
485         unsigned offset;
486         struct osl_column_description *cd;
487
488         if (map->size < MIN_INDEX_HEADER_SIZE(1))
489                 return -E_SHORT_TABLE;
490         if (strncmp(buf + IDX_PARA_MAGIC, PARA_MAGIC, strlen(PARA_MAGIC)))
491                 return -E_NO_MAGIC;
492         version = read_u8(buf + IDX_VERSION);
493         if (version < MIN_TABLE_VERSION || version > MAX_TABLE_VERSION)
494                 return -E_VERSION_MISMATCH;
495         desc->flags = read_u8(buf + IDX_TABLE_FLAGS);
496         desc->num_columns = read_u16(buf + IDX_NUM_COLUMNS);
497         DEBUG_LOG("%u columns\n", desc->num_columns);
498         if (!desc->num_columns)
499                 return -E_NO_COLUMNS;
500         header_size = read_u16(buf + IDX_HEADER_SIZE);
501         if (map->size < header_size)
502                 return -E_BAD_SIZE;
503         desc->column_descriptions = calloc(desc->num_columns,
504                 sizeof(struct osl_column_description));
505         if (!desc->column_descriptions)
506                 return -ERRNO_TO_ERROR(ENOMEM);
507         offset = IDX_COLUMN_DESCRIPTIONS;
508         FOR_EACH_COLUMN(i, desc, cd) {
509                 char *null_byte;
510
511                 ret = -E_SHORT_TABLE;
512                 if (map->size < offset + MIN_IDX_COLUMN_DESCRIPTION_SIZE) {
513                         ERROR_LOG("map size = %zu < %u = offset + min desc size\n",
514                                 map->size, offset + MIN_IDX_COLUMN_DESCRIPTION_SIZE);
515                         goto err;
516                 }
517                 cd->storage_type = read_u16(buf + offset + IDX_CD_STORAGE_TYPE);
518                 cd->storage_flags = read_u16(buf + offset +
519                         IDX_CD_STORAGE_FLAGS);
520                 cd->data_size = read_u32(buf + offset + IDX_CD_DATA_SIZE);
521                 null_byte = memchr(buf + offset + IDX_CD_NAME, '\0',
522                         map->size - offset - IDX_CD_NAME);
523                 ret = -E_INDEX_CORRUPTION;
524                 if (!null_byte)
525                         goto err;
526                 ret = -ERRNO_TO_ERROR(ENOMEM);
527                 cd->name = strdup(buf + offset + IDX_CD_NAME);
528                 if (!cd->name)
529                         goto err;
530                 offset += index_column_description_size(cd->name);
531         }
532         if (offset != header_size) {
533                 ret = -E_INDEX_CORRUPTION;
534                 ERROR_LOG("real header size = %u != %u = stored header size\n",
535                         offset, header_size);
536                 goto err;
537         }
538         return 1;
539 err:
540         FOR_EACH_COLUMN(i, desc, cd)
541                 free(cd->name);
542         return ret;
543 }
544
545 /*
546  * check whether the table description given by \p t->desc matches the on-disk
547  * table structure stored in the index of \a t.
548  */
549 static int compare_table_descriptions(struct osl_table *t)
550 {
551         int i, ret;
552         struct osl_table_description desc;
553         const struct osl_column_description *cd1, *cd2;
554
555         /* read the on-disk structure into desc */
556         ret = read_table_desc(&t->index_map, &desc);
557         if (ret < 0)
558                 return ret;
559         ret = -E_BAD_TABLE_FLAGS;
560         if (desc.flags != t->desc->flags)
561                 goto out;
562         ret = -E_BAD_COLUMN_NUM;
563         if (desc.num_columns > t->desc->num_columns)
564                 goto out;
565         if (desc.num_columns < t->desc->num_columns) {
566                 struct osl_column_description *cd;
567                 unsigned diff = t->desc->num_columns - desc.num_columns;
568                 INFO_LOG("extending table by %u volatile columns\n", diff);
569                 ret = -ERRNO_TO_ERROR(ENOMEM);
570                 desc.column_descriptions = realloc(desc.column_descriptions,
571                         t->desc->num_columns * sizeof(struct osl_column_description));
572                 if (!desc.column_descriptions)
573                         goto out;
574                 for (i = desc.num_columns; i < t->desc->num_columns; i++) {
575                         cd = get_column_description(&desc, i);
576                         cd->storage_type = OSL_NO_STORAGE;
577                         cd->name = NULL;
578                 }
579                 desc.num_columns += diff;
580         }
581         FOR_EACH_COLUMN(i, t->desc, cd1) {
582                 cd2 = get_column_description(&desc, i);
583                 ret = -E_BAD_STORAGE_TYPE;
584                 if (cd1->storage_type != cd2->storage_type)
585                         goto out;
586                 if (cd1->storage_type == OSL_NO_STORAGE)
587                         continue;
588                 ret = -E_BAD_STORAGE_FLAGS;
589                 if (cd1->storage_flags != cd2->storage_flags) {
590                         ERROR_LOG("sf1 = %u != %u = sf2\n",
591                                 cd1->storage_flags, cd2->storage_flags);
592                         goto out;
593                 }
594                 ret = -E_BAD_DATA_SIZE;
595                 if (cd1->storage_flags & OSL_FIXED_SIZE)
596                         if (cd1->data_size != cd2->data_size)
597                                 goto out;
598                 ret = -E_BAD_COLUMN_NAME;
599                 if (strcmp(cd1->name, cd2->name))
600                         goto out;
601         }
602         DEBUG_LOG("table description of '%s' matches on-disk data, good\n",
603                 t->desc->name);
604         ret = 1;
605 out:
606         FOR_EACH_COLUMN(i, &desc, cd1)
607                 free(cd1->name);
608         free(desc.column_descriptions);
609         return ret;
610 }
611
612 static int create_table_index(struct osl_table *t)
613 {
614         char *buf, *filename;
615         int i, ret;
616         size_t size = t->index_header_size;
617         const struct osl_column_description *cd;
618         unsigned offset;
619
620         INFO_LOG("creating %zu byte index for table %s\n", size,
621                 t->desc->name);
622         buf = calloc(1, size);
623         if (!buf)
624                 return -ERRNO_TO_ERROR(ENOMEM);
625         sprintf(buf + IDX_PARA_MAGIC, "%s", PARA_MAGIC);
626         write_u8(buf + IDX_TABLE_FLAGS, t->desc->flags);
627         write_u8(buf + IDX_DIRTY_FLAG, 0);
628         write_u8(buf + IDX_VERSION, CURRENT_TABLE_VERSION);
629         write_u16(buf + IDX_NUM_COLUMNS, t->num_mapped_columns + t->num_disk_storage_columns);
630         write_u16(buf + IDX_HEADER_SIZE, t->index_header_size);
631         offset = IDX_COLUMN_DESCRIPTIONS;
632         FOR_EACH_COLUMN(i, t->desc, cd) {
633                 /* no need to store info about volatile storage */
634                 if (cd->storage_type == OSL_NO_STORAGE)
635                         continue;
636                 write_u16(buf + offset + IDX_CD_STORAGE_TYPE,
637                         cd->storage_type);
638                 write_u16(buf + offset + IDX_CD_STORAGE_FLAGS,
639                         cd->storage_flags);
640                 if (cd->storage_flags & OSL_FIXED_SIZE)
641                         write_u32(buf + offset + IDX_CD_DATA_SIZE,
642                                 cd->data_size);
643                 strcpy(buf + offset + IDX_CD_NAME, cd->name);
644                 offset += index_column_description_size(cd->name);
645         }
646         assert(offset = size);
647         filename = index_filename(t->desc);
648         if (filename)
649                 ret = write_file(filename, buf, size);
650         else
651                 ret = -ERRNO_TO_ERROR(ENOMEM);
652         free(buf);
653         free(filename);
654         return ret;
655 }
656
657 /**
658  * Create a new osl table.
659  *
660  * \param desc Pointer to the table description.
661  *
662  * \return Standard.
663  */
664 __export int osl_create_table(const struct osl_table_description *desc)
665 {
666         const struct osl_column_description *cd;
667         char *table_dir = NULL, *filename;
668         struct osl_table *t;
669         int i, ret = init_table_structure(desc, &t);
670
671         if (ret < 0)
672                 return ret;
673         INFO_LOG("creating %s\n", desc->name);
674         FOR_EACH_COLUMN(i, t->desc, cd) {
675                 if (cd->storage_type == OSL_NO_STORAGE)
676                         continue;
677                 if (!table_dir) {
678                         ret = para_mkdir(desc->dir, 0777);
679                         if (ret < 0 && !is_errno(-ret, EEXIST))
680                                 goto out;
681                         table_dir = make_message("%s/%s", desc->dir,
682                                 desc->name);
683                         ret = -ERRNO_TO_ERROR(ENOMEM);
684                         if (!table_dir)
685                                 goto out;
686                         ret = para_mkdir(table_dir, 0777);
687                         if (ret < 0)
688                                 goto out;
689                 }
690                 ret = -ERRNO_TO_ERROR(ENOMEM);
691                 filename = column_filename(t, i);
692                 if (!filename)
693                         goto out;
694                 INFO_LOG("filename: %s\n", filename);
695                 if (cd->storage_type == OSL_MAPPED_STORAGE) {
696                         ret = osl_open(filename, O_RDWR | O_CREAT | O_EXCL,
697                                 0644);
698                         free(filename);
699                         if (ret < 0)
700                                 goto out;
701                         close(ret);
702                         continue;
703                 }
704                 /* DISK STORAGE */
705                 ret = para_mkdir(filename, 0777);
706                 free(filename);
707                 if (ret < 0)
708                         goto out;
709         }
710         if (t->num_mapped_columns) {
711                 ret = create_table_index(t);
712                 if (ret < 0)
713                         goto out;
714         }
715         ret = 1;
716 out:
717         free(table_dir);
718         free(t->columns);
719         free(t);
720         return ret;
721 }
722
723 static int table_is_dirty(struct osl_table *t)
724 {
725         char *buf = (char *)t->index_map.data + IDX_DIRTY_FLAG;
726         uint8_t dirty = read_u8(buf) & 0x1;
727         return !!dirty;
728 }
729
730 static void mark_table_dirty(struct osl_table *t)
731 {
732         char *buf = (char *)t->index_map.data + IDX_DIRTY_FLAG;
733         write_u8(buf, read_u8(buf) | 1);
734 }
735
736 static void mark_table_clean(struct osl_table *t)
737 {
738         char *buf = (char *)t->index_map.data + IDX_DIRTY_FLAG;
739         write_u8(buf, read_u8(buf) & 0xfe);
740 }
741
742 static void unmap_column(struct osl_table *t, unsigned col_num)
743 {
744         struct osl_object map = t->columns[col_num].data_map;
745         int ret;
746         if (!map.data)
747                 return;
748         ret = para_munmap(map.data, map.size);
749         assert(ret > 0);
750         map.data = NULL;
751 }
752
753 /**
754  * Unmap all mapped files of an osl table.
755  *
756  * \param t Pointer to a mapped table.
757  * \param flags Options for unmapping.
758  *
759  * \return Positive on success, negative on errors.
760  *
761  * \sa map_table(), enum osl_close_flags, para_munmap().
762  */
763 int unmap_table(struct osl_table *t, enum osl_close_flags flags)
764 {
765         unsigned i;
766         const struct osl_column_description *cd;
767         int ret;
768
769         if (!t->num_mapped_columns) /* can this ever happen? */
770                 return 1;
771         DEBUG_LOG("unmapping table '%s'\n", t->desc->name);
772         if (!t->index_map.data)
773                 return -E_NOT_MAPPED;
774         if (flags & OSL_MARK_CLEAN)
775                 mark_table_clean(t);
776         ret = para_munmap(t->index_map.data, t->index_map.size);
777         if (ret < 0)
778                 return ret;
779         t->index_map.data = NULL;
780         if (!t->num_rows)
781                 return 1;
782         FOR_EACH_MAPPED_COLUMN(i, t, cd)
783                 unmap_column(t, i);
784         return 1;
785 }
786
787 static int map_column(struct osl_table *t, unsigned col_num)
788 {
789         struct stat statbuf;
790         char *filename = column_filename(t, col_num);
791         int ret = -E_STAT;
792
793         if (!filename)
794                 return -ERRNO_TO_ERROR(ENOMEM);
795         if (stat(filename, &statbuf) < 0) {
796                 free(filename);
797                 return ret;
798         }
799         if (!(S_IFREG & statbuf.st_mode)) {
800                 free(filename);
801                 return ret;
802         }
803         ret = mmap_full_file(filename, O_RDWR,
804                 &t->columns[col_num].data_map.data,
805                 &t->columns[col_num].data_map.size,
806                 NULL);
807         free(filename);
808         return ret;
809 }
810
811 /**
812  * Map the index file and all columns of type \p OSL_MAPPED_STORAGE into memory.
813  *
814  * \param t Pointer to an initialized table structure.
815  * \param flags Mapping options.
816  *
817  * \return Negative return value on errors; on success the number of rows
818  * (including invalid rows) is returned.
819  *
820  * \sa unmap_table(), enum map_table_flags, osl_open_table(), mmap(2).
821  */
822 int map_table(struct osl_table *t, enum map_table_flags flags)
823 {
824         char *filename;
825         const struct osl_column_description *cd;
826         int i = 0, ret, num_rows = 0;
827
828         if (!t->num_mapped_columns)
829                 return 0;
830         if (t->index_map.data)
831                 return -E_ALREADY_MAPPED;
832         filename = index_filename(t->desc);
833         if (!filename)
834                 return -ERRNO_TO_ERROR(ENOMEM);
835         DEBUG_LOG("mapping table '%s' (index: %s)\n", t->desc->name, filename);
836         ret = mmap_full_file(filename, flags & MAP_TBL_FL_MAP_RDONLY?
837                 O_RDONLY : O_RDWR, &t->index_map.data, &t->index_map.size, NULL);
838         free(filename);
839         if (ret < 0)
840                 return ret;
841         if (flags & MAP_TBL_FL_VERIFY_INDEX) {
842                 ret = compare_table_descriptions(t);
843                 if (ret < 0)
844                         goto err;
845         }
846         ret = -E_BUSY;
847         if (!(flags & MAP_TBL_FL_IGNORE_DIRTY)) {
848                 if (table_is_dirty(t)) {
849                         ERROR_LOG("%s is dirty\n", t->desc->name);
850                         goto err;
851                 }
852         }
853         mark_table_dirty(t);
854         num_rows = table_num_rows(t);
855         if (!num_rows)
856                 return num_rows;
857         /* map data files */
858         FOR_EACH_MAPPED_COLUMN(i, t, cd) {
859                 ret = map_column(t, i);
860                 if (ret < 0)
861                         goto err;
862         }
863         return num_rows;
864 err:    /* unmap what is already mapped */
865         for (i--; i >= 0; i--) {
866                 struct osl_object map = t->columns[i].data_map;
867                 para_munmap(map.data, map.size);
868                 map.data = NULL;
869         }
870         para_munmap(t->index_map.data, t->index_map.size);
871         t->index_map.data = NULL;
872         return ret;
873 }
874
875 /**
876  * Retrieve a mapped object by row and column number.
877  *
878  * \param t Pointer to an open osl table.
879  * \param col_num Number of the mapped column containing the object to retrieve.
880  * \param row_num Number of the row containing the object to retrieve.
881  * \param obj The result is returned here.
882  *
883  * It is considered an error if \a col_num does not refer to a column
884  * of storage type \p OSL_MAPPED_STORAGE.
885  *
886  * \return Positive on success, negative on errors. Possible errors include:
887  * \p E_BAD_ROW_NUM, \p E_INVALID_OBJECT.
888  *
889  * \sa osl_storage_type.
890  */
891 int get_mapped_object(const struct osl_table *t, unsigned col_num,
892         uint32_t row_num, struct osl_object *obj)
893 {
894         struct osl_column *col = &t->columns[col_num];
895         uint32_t offset;
896         char *header;
897         char *cell_index;
898         int ret;
899
900         if (t->num_rows <= row_num)
901                 return -E_BAD_ROW_NUM;
902         ret = get_cell_index(t, row_num, col_num, &cell_index);
903         if (ret < 0)
904                 return ret;
905         offset = read_u32(cell_index);
906         obj->size = read_u32(cell_index + 4) - 1;
907         header = col->data_map.data + offset;
908         obj->data = header + 1;
909         if (read_u8(header) == 0xff) {
910                 ERROR_LOG("col %u, size %zu, offset %u\n", col_num,
911                         obj->size, offset);
912                 return -E_INVALID_OBJECT;
913         }
914         return 1;
915 }
916
917 static int search_rbtree(const struct osl_object *obj,
918                 const struct osl_table *t, unsigned col_num,
919                 struct rb_node **result, struct rb_node ***rb_link)
920 {
921         struct osl_column *col = &t->columns[col_num];
922         struct rb_node **new = &col->rbtree.rb_node, *parent = NULL;
923         const struct osl_column_description *cd =
924                 get_column_description(t->desc, col_num);
925         enum osl_storage_type st = cd->storage_type;
926         while (*new) {
927                 struct osl_row *this_row = get_row_pointer(*new,
928                         col->rbtree_num);
929                 int ret;
930                 struct osl_object this_obj;
931                 parent = *new;
932                 if (st == OSL_MAPPED_STORAGE) {
933                         ret = get_mapped_object(t, col_num, this_row->num,
934                                 &this_obj);
935                         if (ret < 0)
936                                 return ret;
937                 } else
938                         this_obj = this_row->volatile_objects[col->volatile_num];
939                 ret = cd->compare_function(obj, &this_obj);
940                 if (!ret) {
941                         if (result)
942                                 *result = get_rb_node_pointer(this_row,
943                                         col->rbtree_num);
944                         return 1;
945                 }
946                 if (ret < 0)
947                         new = &((*new)->rb_left);
948                 else
949                         new = &((*new)->rb_right);
950         }
951         if (result)
952                 *result = parent;
953         if (rb_link)
954                 *rb_link = new;
955         return -E_RB_KEY_NOT_FOUND;
956 }
957
958 static int insert_rbtree(struct osl_table *t, unsigned col_num,
959         const struct osl_row *row, const struct osl_object *obj)
960 {
961         struct rb_node *parent, **rb_link;
962         unsigned rbtree_num;
963         struct rb_node *n;
964         int ret = search_rbtree(obj, t, col_num, &parent, &rb_link);
965
966         if (ret > 0)
967                 return -E_RB_KEY_EXISTS;
968         rbtree_num = t->columns[col_num].rbtree_num;
969         n = get_rb_node_pointer(row, rbtree_num);
970         rb_link_node(n, parent, rb_link);
971         rb_insert_color(n, &t->columns[col_num].rbtree);
972         return 1;
973 }
974
975 static void remove_rb_node(struct osl_table *t, unsigned col_num,
976                 const struct osl_row *row)
977 {
978         struct osl_column *col = &t->columns[col_num];
979         const struct osl_column_description *cd =
980                 get_column_description(t->desc, col_num);
981         enum osl_storage_flags sf = cd->storage_flags;
982         struct rb_node *victim, *splice_out_node, *tmp;
983         if (!(sf & OSL_RBTREE))
984                 return;
985         /*
986          * Which node is removed/spliced out actually depends on how many
987          * children the victim node has: If it has no children, it gets
988          * deleted. If it has one child, it gets spliced out. If it has two
989          * children, its successor (which has at most a right child) gets
990          * spliced out.
991          */
992         victim = get_rb_node_pointer(row, col->rbtree_num);
993         if (victim->rb_left && victim->rb_right)
994                 splice_out_node = rb_next(victim);
995         else
996                 splice_out_node = victim;
997         /* Go up to the root and decrement the size of each node in the path. */
998         for (tmp = splice_out_node; tmp; tmp = rb_parent(tmp))
999                 tmp->size--;
1000         rb_erase(victim, &col->rbtree);
1001 }
1002
1003 static int add_row_to_rbtrees(struct osl_table *t, uint32_t row_num,
1004                 struct osl_object *volatile_objs, struct osl_row **row_ptr)
1005 {
1006         unsigned i;
1007         int ret;
1008         struct osl_row *row = allocate_row(t->num_rbtrees);
1009         const struct osl_column_description *cd;
1010
1011         if (!row)
1012                 return -ERRNO_TO_ERROR(ENOMEM);
1013         row->num = row_num;
1014         row->volatile_objects = volatile_objs;
1015         FOR_EACH_RBTREE_COLUMN(i, t, cd) {
1016                 if (cd->storage_type == OSL_MAPPED_STORAGE) {
1017                         struct osl_object obj;
1018                         ret = get_mapped_object(t, i, row_num, &obj);
1019                         if (ret < 0)
1020                                 goto err;
1021                         ret = insert_rbtree(t, i, row, &obj);
1022                 } else { /* volatile */
1023                         const struct osl_object *obj
1024                                 = volatile_objs + t->columns[i].volatile_num;
1025                         ret = insert_rbtree(t, i, row, obj);
1026                 }
1027                 if (ret < 0)
1028                         goto err;
1029         }
1030         if (row_ptr)
1031                 *row_ptr = row;
1032         return 1;
1033 err: /* rollback changes, i.e. remove added entries from rbtrees */
1034         while (i)
1035                 remove_rb_node(t, i--, row);
1036         free(row);
1037         return ret;
1038 }
1039
1040 static void free_volatile_objects(const struct osl_table *t,
1041                 enum osl_close_flags flags)
1042 {
1043         int i, j;
1044         struct rb_node *n;
1045         struct osl_column *rb_col;
1046         const struct osl_column_description *cd;
1047
1048         if (!t->num_volatile_columns)
1049                 return;
1050         /* find the first rbtree column (any will do) */
1051         FOR_EACH_RBTREE_COLUMN(i, t, cd)
1052                 break;
1053         rb_col = t->columns + i;
1054         /* walk that rbtree and free all volatile objects */
1055         for (n = rb_first(&rb_col->rbtree); n; n = rb_next(n)) {
1056                 struct osl_row *r = get_row_pointer(n, rb_col->rbtree_num);
1057                 if (flags & OSL_FREE_VOLATILE)
1058                         FOR_EACH_VOLATILE_COLUMN(j, t, cd) {
1059                                 if (cd->storage_flags & OSL_DONT_FREE)
1060                                         continue;
1061                                 free(r->volatile_objects[
1062                                         t->columns[j].volatile_num].data);
1063                         }
1064 //                      for (j = 0; j < t->num_volatile_columns; j++)
1065 //                              free(r->volatile_objects[j].data);
1066                 free(r->volatile_objects);
1067         }
1068 }
1069
1070 /**
1071  * Erase all rbtree nodes and free resources.
1072  *
1073  * \param t Pointer to an open osl table.
1074  *
1075  * This function is called by osl_close_table().
1076  */
1077 void clear_rbtrees(struct osl_table *t)
1078 {
1079         const struct osl_column_description *cd;
1080         unsigned i, rbtrees_cleared = 0;
1081
1082         FOR_EACH_RBTREE_COLUMN(i, t, cd) {
1083                 struct osl_column *col = &t->columns[i];
1084                 struct rb_node *n;
1085                 rbtrees_cleared++;
1086                 for (n = rb_first(&col->rbtree); n;) {
1087                         struct osl_row *r;
1088                         rb_erase(n, &col->rbtree);
1089                         if (rbtrees_cleared == t->num_rbtrees) {
1090                                 r = get_row_pointer(n, col->rbtree_num);
1091                                 n = rb_next(n);
1092                                 free(r);
1093                         } else
1094                                 n = rb_next(n);
1095                 }
1096         }
1097
1098 }
1099
1100 /**
1101  * Close an osl table.
1102  *
1103  * \param t Pointer to the table to be closed.
1104  * \param flags Options for what should be cleaned up.
1105  *
1106  * If osl_open_table() succeeds, the resulting table pointer must later be
1107  * passed to this function in order to flush all changes to the file system and
1108  * to free the resources that were allocated by osl_open_table().
1109  *
1110  * \return Positive on success, negative on errors. Possible errors: \p E_BAD_TABLE,
1111  * errors returned by unmap_table().
1112  *
1113  * \sa osl_open_table(), unmap_table().
1114  */
1115 __export int osl_close_table(struct osl_table *t, enum osl_close_flags flags)
1116 {
1117         int ret;
1118
1119         if (!t)
1120                 return -E_BAD_TABLE;
1121         free_volatile_objects(t, flags);
1122         clear_rbtrees(t);
1123         ret = unmap_table(t, flags);
1124         if (ret < 0)
1125                 ERROR_LOG("unmap_table failed: %d\n", ret);
1126         free(t->columns);
1127         free(t);
1128         return ret;
1129 }
1130
1131 /**
1132  * Find out whether the given row number corresponds to an invalid row.
1133  *
1134  * \param t Pointer to the osl table.
1135  * \param row_num The number of the row in question.
1136  *
1137  * By definition, a row is considered invalid if all its index entries
1138  * are invalid.
1139  *
1140  * \return Positive if \a row_num corresponds to an invalid row,
1141  * zero if it corresponds to a valid row, negative on errors.
1142  */
1143 int row_is_invalid(struct osl_table *t, uint32_t row_num)
1144 {
1145         char *row_index;
1146         int i, ret = get_row_index(t, row_num, &row_index);
1147
1148         if (ret < 0)
1149                 return ret;
1150         for (i = 0; i < t->row_index_size; i++) {
1151                 if ((unsigned char)row_index[i] != 0xff)
1152                         return 0;
1153         }
1154         INFO_LOG("row %d is invalid\n", row_num);
1155         return 1;
1156 }
1157
1158 /**
1159  * Invalidate a row of an osl table.
1160  *
1161  * \param t Pointer to an open osl table.
1162  * \param row_num Number of the row to mark as invalid.
1163  *
1164  * This function marks each mapped object in the index entry of \a row as
1165  * invalid.
1166  *
1167  * \return Positive on success, negative on errors.
1168  */
1169 int mark_row_invalid(struct osl_table *t, uint32_t row_num)
1170 {
1171         char *row_index;
1172         int ret = get_row_index(t, row_num, &row_index);
1173
1174         if (ret < 0)
1175                 return ret;
1176         INFO_LOG("marking row %d as invalid\n", row_num);
1177         memset(row_index, 0xff, t->row_index_size);
1178         return 1;
1179 }
1180
1181 /**
1182  * Initialize all rbtrees and compute number of invalid rows.
1183  *
1184  * \param t The table containing the rbtrees to be initialized.
1185  *
1186  * \return Positive on success, negative on errors.
1187  */
1188 int init_rbtrees(struct osl_table *t)
1189 {
1190         int i, ret;
1191         const struct osl_column_description *cd;
1192
1193         /* create rbtrees */
1194         FOR_EACH_RBTREE_COLUMN(i, t, cd)
1195                 t->columns[i].rbtree = RB_ROOT;
1196         /* add valid rows to rbtrees */
1197         t->num_invalid_rows = 0;
1198         for (i = 0; i < t->num_rows; i++) {
1199                 ret = row_is_invalid(t, i);
1200                 if (ret < 0)
1201                         return ret;
1202                 if (ret) {
1203                         t->num_invalid_rows++;
1204                         continue;
1205                 }
1206                 ret = add_row_to_rbtrees(t, i, NULL, NULL);
1207                 if (ret < 0)
1208                         return ret;
1209         }
1210         return 1;
1211 }
1212
1213 /**
1214  * Open an osl table.
1215  *
1216  * Each osl table must be opened before its data can be accessed.
1217  *
1218  * \param table_desc Describes the table to be opened.
1219  * \param result Contains a pointer to the open table on success.
1220  *
1221  * The table description given by \a desc should coincide with the
1222  * description used at creation time.
1223  *
1224  * \return Standard.
1225  */
1226 __export int osl_open_table(const struct osl_table_description *table_desc,
1227                 struct osl_table **result)
1228 {
1229         int i, ret;
1230         struct osl_table *t;
1231         const struct osl_column_description *cd;
1232
1233         INFO_LOG("opening table %s\n", table_desc->name);
1234         ret = init_table_structure(table_desc, &t);
1235         if (ret < 0)
1236                 return ret;
1237         FOR_EACH_DISK_STORAGE_COLUMN(i, t, cd) {
1238                 struct stat statbuf;
1239                 char *dirname = column_filename(t, i);
1240
1241                 ret = -ERRNO_TO_ERROR(ENOMEM);
1242                 if (!dirname)
1243                         goto err;
1244                 /* check if directory exists */
1245                 ret = stat(dirname, &statbuf);
1246                 free(dirname);
1247                 if (ret < 0) {
1248                         ret = -ERRNO_TO_ERROR(errno);
1249                         goto err;
1250                 }
1251                 ret = -ERRNO_TO_ERROR(ENOTDIR);
1252                 if (!S_ISDIR(statbuf.st_mode))
1253                         goto err;
1254         }
1255         ret = map_table(t, MAP_TBL_FL_VERIFY_INDEX);
1256         if (ret < 0)
1257                 goto err;
1258         t->num_rows = ret;
1259         DEBUG_LOG("num rows: %d\n", t->num_rows);
1260         ret = init_rbtrees(t);
1261         if (ret < 0) {
1262                 osl_close_table(t, OSL_MARK_CLEAN); /* ignore further errors */
1263                 return ret;
1264         }
1265         *result = t;
1266         return 1;
1267 err:
1268         free(t->columns);
1269         free(t);
1270         return ret;
1271 }
1272
1273 static int create_disk_storage_object_dir(const struct osl_table *t,
1274                 unsigned col_num, const char *ds_name)
1275 {
1276         char *dirname;
1277         int ret;
1278
1279         if (!(t->desc->flags & OSL_LARGE_TABLE))
1280                 return 1;
1281         dirname = disk_storage_dirname(t, col_num, ds_name);
1282         if (!dirname)
1283                 return -ERRNO_TO_ERROR(ENOMEM);
1284         ret = para_mkdir(dirname, 0777);
1285         free(dirname);
1286         if (ret < 0 && !is_errno(-ret, EEXIST))
1287                 return ret;
1288         return 1;
1289 }
1290
1291 static int write_disk_storage_file(const struct osl_table *t, unsigned col_num,
1292         const struct osl_object *obj, const char *ds_name)
1293 {
1294         int ret;
1295         char *filename;
1296
1297         ret = create_disk_storage_object_dir(t, col_num, ds_name);
1298         if (ret < 0)
1299                 return ret;
1300         filename = disk_storage_path(t, col_num, ds_name);
1301         if (!filename)
1302                 return -ERRNO_TO_ERROR(ENOMEM);
1303         ret = write_file(filename, obj->data, obj->size);
1304         free(filename);
1305         return ret;
1306 }
1307
1308 static int append_map_file(const struct osl_table *t, unsigned col_num,
1309         const struct osl_object *obj, uint32_t *new_size)
1310 {
1311         char *filename = column_filename(t, col_num);
1312         int ret;
1313         char header = 0; /* zero means valid object */
1314
1315         if (!filename)
1316                 return -ERRNO_TO_ERROR(ENOMEM);
1317         ret = append_file(filename, &header, 1, obj->data, obj->size,
1318                 new_size);
1319         free(filename);
1320         return ret;
1321 }
1322
1323 static int append_row_index(const struct osl_table *t, char *row_index)
1324 {
1325         char *filename;
1326         int ret;
1327
1328         if (!t->num_mapped_columns)
1329                 return 1;
1330         filename = index_filename(t->desc);
1331         if (!filename)
1332                 return -ERRNO_TO_ERROR(ENOMEM);
1333         ret = append_file(filename, NULL, 0, row_index,
1334                 t->row_index_size, NULL);
1335         free(filename);
1336         return ret;
1337 }
1338
1339 /**
1340  * A wrapper for truncate(2)
1341  *
1342  * \param path Name of the regular file to truncate
1343  * \param size Number of bytes to \b shave \b off
1344  *
1345  * Truncate the regular file named by \a path by \a size bytes.
1346  *
1347  * \return Positive on success, negative on errors. Possible errors include: \p
1348  * E_STAT, \p E_BAD_SIZE, \p E_TRUNC.
1349  *
1350  * \sa truncate(2)
1351  */
1352 int para_truncate(const char *path, off_t size)
1353 {
1354         int ret;
1355         struct stat statbuf;
1356
1357         ret = -E_STAT;
1358         if (stat(path, &statbuf) < 0)
1359                 goto out;
1360         ret = -E_BAD_SIZE;
1361         if (statbuf.st_size < size)
1362                 goto out;
1363         ret = -E_TRUNC;
1364         if (truncate(path, statbuf.st_size - size) < 0)
1365                 goto out;
1366         ret = 1;
1367 out:
1368         return ret;
1369 }
1370
1371 static int truncate_mapped_file(const struct osl_table *t, unsigned col_num,
1372                 off_t size)
1373 {
1374         int ret;
1375         char *filename = column_filename(t, col_num);
1376
1377         if (!filename)
1378                 return -ERRNO_TO_ERROR(ENOMEM);
1379         ret = para_truncate(filename, size);
1380         free(filename);
1381         return ret;
1382 }
1383
1384 static int delete_disk_storage_file(const struct osl_table *t, unsigned col_num,
1385                 const char *ds_name)
1386 {
1387         char *dirname, *filename = disk_storage_path(t, col_num, ds_name);
1388         int ret, err;
1389
1390         if (!filename)
1391                 return -ERRNO_TO_ERROR(ENOMEM);
1392         ret = unlink(filename);
1393         err = errno;
1394         free(filename);
1395         if (ret < 0)
1396                 return -ERRNO_TO_ERROR(err);
1397         if (!(t->desc->flags & OSL_LARGE_TABLE))
1398                 return 1;
1399         dirname = disk_storage_dirname(t, col_num, ds_name);
1400         if (!dirname)
1401                 return -ERRNO_TO_ERROR(ENOMEM);
1402         rmdir(dirname);
1403         free(dirname);
1404         return 1;
1405 }
1406
1407 /**
1408  * Add a new row to an osl table and retrieve this row.
1409  *
1410  * \param t Pointer to an open osl table.
1411  * \param objects Array of objects to be added.
1412  * \param row Result pointer.
1413  *
1414  * The \a objects parameter must point to an array containing one object per
1415  * column.  The order of the objects in the array is given by the table
1416  * description of \a table. Several sanity checks are performed during object
1417  * insertion and the function returns without modifying the table if any of
1418  * these tests fail.  In fact, it is atomic in the sense that it either
1419  * succeeds or leaves the table unchanged (i.e. either all or none of the
1420  * objects are added to the table).
1421  *
1422  * It is considered an error if an object is added to a column with associated
1423  * rbtree if this object is equal to an object already contained in that column
1424  * (i.e. the compare function for the column's rbtree returns zero).
1425  *
1426  * Possible errors include: \p E_RB_KEY_EXISTS, \p E_BAD_DATA_SIZE.
1427  *
1428  * \return Positive on success, negative on errors.
1429  *
1430  * \sa struct osl_table_description, osl_compare_func, osl_add_row().
1431  */
1432 __export int osl_add_and_get_row(struct osl_table *t, struct osl_object *objects,
1433                 struct osl_row **row)
1434 {
1435         int i, ret;
1436         char *ds_name = NULL;
1437         struct rb_node **rb_parents = NULL, ***rb_links = NULL;
1438         char *new_row_index = NULL;
1439         struct osl_object *volatile_objs = NULL;
1440         const struct osl_column_description *cd;
1441
1442         if (!t)
1443                 return -E_BAD_TABLE;
1444         rb_parents = malloc(t->num_rbtrees * sizeof(struct rn_node*));
1445         if (!rb_parents)
1446                 return -ERRNO_TO_ERROR(ENOMEM);
1447         rb_links = malloc(t->num_rbtrees * sizeof(struct rn_node**));
1448         if (!rb_links) {
1449                 free(rb_parents);
1450                 return -ERRNO_TO_ERROR(ENOMEM);
1451         }
1452         if (t->num_mapped_columns) {
1453                 new_row_index = malloc(t->row_index_size);
1454                 if (!new_row_index) {
1455                         free(rb_links);
1456                         free(rb_parents);
1457                         return -ERRNO_TO_ERROR(ENOMEM);
1458                 }
1459         }
1460         /* pass 1: sanity checks */
1461 //      DEBUG_LOG("sanity tests: %p:%p\n", objects[0].data,
1462 //              objects[1].data);
1463         FOR_EACH_COLUMN(i, t->desc, cd) {
1464                 enum osl_storage_type st = cd->storage_type;
1465                 enum osl_storage_flags sf = cd->storage_flags;
1466
1467 //              ret = -E_NULL_OBJECT;
1468 //              if (!objects[i])
1469 //                      goto out;
1470                 if (st == OSL_DISK_STORAGE)
1471                         continue;
1472                 if (sf & OSL_RBTREE) {
1473                         unsigned rbtree_num = t->columns[i].rbtree_num;
1474                         ret = -E_RB_KEY_EXISTS;
1475 //                      DEBUG_LOG("checking whether %p exists\n",
1476 //                              objects[i].data);
1477                         if (search_rbtree(objects + i, t, i,
1478                                         &rb_parents[rbtree_num],
1479                                         &rb_links[rbtree_num]) > 0)
1480                                 goto out;
1481                 }
1482                 if (sf & OSL_FIXED_SIZE) {
1483 //                      DEBUG_LOG("fixed size. need: %zu, have: %d\n",
1484 //                              objects[i].size, cd->data_size);
1485                         ret = -E_BAD_DATA_SIZE;
1486                         if (objects[i].size != cd->data_size)
1487                                 goto out;
1488                 }
1489         }
1490         if (t->num_disk_storage_columns) {
1491                 ds_name = disk_storage_name_of_object(t,
1492                         &objects[t->disk_storage_name_column]);
1493                 ret = -ERRNO_TO_ERROR(ENOMEM);
1494                 if (!ds_name)
1495                         goto out;
1496         }
1497         ret = unmap_table(t, OSL_MARK_CLEAN);
1498         if (ret < 0)
1499                 goto out;
1500 //      DEBUG_LOG("sanity tests passed%s\n", "");
1501         /* pass 2: create data files, append map data */
1502         FOR_EACH_COLUMN(i, t->desc, cd) {
1503                 enum osl_storage_type st = cd->storage_type;
1504                 if (st == OSL_NO_STORAGE)
1505                         continue;
1506                 if (st == OSL_MAPPED_STORAGE) {
1507                         uint32_t new_size;
1508                         struct osl_column *col = &t->columns[i];
1509 //                      DEBUG_LOG("appending object of size %zu\n",
1510 //                              objects[i].size);
1511                         ret = append_map_file(t, i, objects + i, &new_size);
1512                         if (ret < 0)
1513                                 goto rollback;
1514                         update_cell_index(new_row_index, col, new_size,
1515                                 objects[i].size);
1516                         continue;
1517                 }
1518                 /* DISK_STORAGE */
1519                 ret = write_disk_storage_file(t, i, objects + i, ds_name);
1520                 if (ret < 0)
1521                         goto rollback;
1522         }
1523         ret = append_row_index(t, new_row_index);
1524         if (ret < 0)
1525                 goto rollback;
1526         ret = map_table(t, MAP_TBL_FL_VERIFY_INDEX);
1527         if (ret < 0) { /* truncate index and rollback changes */
1528                 char *filename = index_filename(t->desc);
1529                 if (filename)
1530                         para_truncate(filename, t->row_index_size);
1531                 free(filename);
1532                 goto rollback;
1533         }
1534         /* pass 3: add entry to rbtrees */
1535         if (t->num_volatile_columns) {
1536                 ret = -ERRNO_TO_ERROR(ENOMEM);
1537                 volatile_objs = calloc(t->num_volatile_columns,
1538                         sizeof(struct osl_object));
1539                 if (!volatile_objs)
1540                         goto out;
1541                 FOR_EACH_VOLATILE_COLUMN(i, t, cd)
1542                         volatile_objs[t->columns[i].volatile_num] = objects[i];
1543         }
1544         t->num_rows++;
1545 //      DEBUG_LOG("adding new entry as row #%d\n", t->num_rows - 1);
1546         ret = add_row_to_rbtrees(t, t->num_rows - 1, volatile_objs, row);
1547         if (ret < 0)
1548                 goto out;
1549 //      DEBUG_LOG("added new entry as row #%d\n", t->num_rows - 1);
1550         ret = 1;
1551         goto out;
1552 rollback: /* rollback all changes made, ignore further errors */
1553         for (i--; i >= 0; i--) {
1554                 cd = get_column_description(t->desc, i);
1555                 enum osl_storage_type st = cd->storage_type;
1556                 if (st == OSL_NO_STORAGE)
1557                         continue;
1558
1559                 if (st == OSL_MAPPED_STORAGE)
1560                         truncate_mapped_file(t, i, objects[i].size);
1561                 else /* disk storage */
1562                         delete_disk_storage_file(t, i, ds_name);
1563         }
1564         /* ignore error and return previous error value */
1565         map_table(t, MAP_TBL_FL_VERIFY_INDEX);
1566 out:
1567         free(new_row_index);
1568         free(ds_name);
1569         free(rb_parents);
1570         free(rb_links);
1571         return ret;
1572 }
1573
1574 /**
1575  * Add a new row to an osl table.
1576  *
1577  * \param t Same meaning as osl_add_and_get_row().
1578  * \param objects Same meaning as osl_add_and_get_row().
1579  *
1580  * \return The return value of the underlying call to osl_add_and_get_row().
1581  *
1582  * This is equivalent to osl_add_and_get_row(t, objects, NULL).
1583  */
1584 __export int osl_add_row(struct osl_table *t, struct osl_object *objects)
1585 {
1586         return osl_add_and_get_row(t, objects, NULL);
1587 }
1588
1589 /**
1590  * Retrieve an object identified by row and column
1591  *
1592  * \param t Pointer to an open osl table.
1593  * \param r Pointer to the row.
1594  * \param col_num The column number.
1595  * \param object The result pointer.
1596  *
1597  * The column determined by \a col_num must be of type \p OSL_MAPPED_STORAGE
1598  * or \p OSL_NO_STORAGE, i.e. no disk storage objects may be retrieved by this
1599  * function.
1600  *
1601  * \return Positive if object was found, negative on errors. Possible errors
1602  * include: \p E_BAD_TABLE, \p E_BAD_STORAGE_TYPE.
1603  *
1604  * \sa osl_storage_type, osl_open_disk_object().
1605  */
1606 __export int osl_get_object(const struct osl_table *t, const struct osl_row *r,
1607         unsigned col_num, struct osl_object *object)
1608 {
1609         const struct osl_column_description *cd;
1610
1611         if (!t)
1612                 return -E_BAD_TABLE;
1613         cd = get_column_description(t->desc, col_num);
1614         /* col must not be disk storage */
1615         if (cd->storage_type == OSL_DISK_STORAGE)
1616                 return -E_BAD_STORAGE_TYPE;
1617         if (cd->storage_type == OSL_MAPPED_STORAGE)
1618                 return get_mapped_object(t, col_num, r->num, object);
1619         /* volatile */
1620         *object = r->volatile_objects[t->columns[col_num].volatile_num];
1621         return 1;
1622 }
1623
1624 static int mark_mapped_object_invalid(const struct osl_table *t,
1625                 uint32_t row_num, unsigned col_num)
1626 {
1627         struct osl_object obj;
1628         char *p;
1629         int ret = get_mapped_object(t, col_num, row_num, &obj);
1630
1631         if (ret < 0)
1632                 return ret;
1633         p = obj.data;
1634         p--;
1635         *p = 0xff;
1636         return 1;
1637 }
1638
1639 /**
1640  * Delete a row from an osl table.
1641  *
1642  * \param t Pointer to an open osl table.
1643  * \param row Pointer to the row to delete.
1644  *
1645  * This removes all disk storage objects, removes all rbtree nodes,  and frees
1646  * all volatile objects belonging to the given row. For mapped columns, the
1647  * data is merely marked invalid and may be pruned from time to time by
1648  * para_fsck.
1649  *
1650  * \return Positive on success, negative on errors. Possible errors include:
1651  * \p E_BAD_TABLE, errors returned by osl_get_object().
1652  */
1653 __export int osl_del_row(struct osl_table *t, struct osl_row *row)
1654 {
1655         struct osl_row *r = row;
1656         int i, ret;
1657         const struct osl_column_description *cd;
1658
1659         if (!t)
1660                 return -E_BAD_TABLE;
1661         INFO_LOG("deleting row %p\n", row);
1662
1663         if (t->num_disk_storage_columns) {
1664                 char *ds_name;
1665                 ret = disk_storage_name_of_row(t, r, &ds_name);
1666                 if (ret < 0)
1667                         goto out;
1668                 FOR_EACH_DISK_STORAGE_COLUMN(i, t, cd)
1669                         delete_disk_storage_file(t, i, ds_name);
1670                 free(ds_name);
1671         }
1672         FOR_EACH_COLUMN(i, t->desc, cd) {
1673                 struct osl_column *col = t->columns + i;
1674                 enum osl_storage_type st = cd->storage_type;
1675                 remove_rb_node(t, i, r);
1676                 if (st == OSL_MAPPED_STORAGE) {
1677                         mark_mapped_object_invalid(t, r->num, i);
1678                         continue;
1679                 }
1680                 if (st == OSL_NO_STORAGE && !(cd->storage_flags & OSL_DONT_FREE))
1681                         free(r->volatile_objects[col->volatile_num].data);
1682         }
1683         if (t->num_mapped_columns) {
1684                 ret = mark_row_invalid(t, r->num);
1685                 if (ret < 0)
1686                         goto out;
1687                 t->num_invalid_rows++;
1688         } else
1689                 t->num_rows--;
1690         ret = 1;
1691 out:
1692         free(r->volatile_objects);
1693         free(r);
1694         return ret;
1695 }
1696
1697 /* test if column has an rbtree */
1698 static int check_rbtree_col(const struct osl_table *t, unsigned col_num,
1699                 struct osl_column **col)
1700 {
1701         if (!t)
1702                 return -E_BAD_TABLE;
1703         if (!(get_column_description(t->desc, col_num)->storage_flags & OSL_RBTREE))
1704                 return -E_BAD_STORAGE_FLAGS;
1705         *col = t->columns + col_num;
1706         return 1;
1707 }
1708
1709 /**
1710  * Get the row that contains the given object.
1711  *
1712  * \param t Pointer to an open osl table.
1713  * \param col_num The number of the column to be searched.
1714  * \param obj The object to be looked up.
1715  * \param result Points to the row containing \a obj.
1716  *
1717  * Lookup \a obj in \a t and return the row containing \a obj. The column
1718  * specified by \a col_num must have an associated rbtree.
1719  *
1720  * \return Positive on success, negative on errors. If an error occurred, \a
1721  * result is set to \p NULL. Possible errors include: \p E_BAD_TABLE, \p
1722  * E_BAD_STORAGE_FLAGS, errors returned by get_mapped_object(), \p
1723  * E_RB_KEY_NOT_FOUND.
1724  *
1725  * \sa osl_storage_flags
1726  */
1727 __export int osl_get_row(const struct osl_table *t, unsigned col_num,
1728                 const struct osl_object *obj, struct osl_row **result)
1729 {
1730         int ret;
1731         struct rb_node *node;
1732         struct osl_row *row;
1733         struct osl_column *col;
1734
1735         *result = NULL;
1736         ret = check_rbtree_col(t, col_num, &col);
1737         if (ret < 0)
1738                 return ret;
1739         ret = search_rbtree(obj, t, col_num, &node, NULL);
1740         if (ret < 0)
1741                 return ret;
1742         row = get_row_pointer(node, t->columns[col_num].rbtree_num);
1743         *result = row;
1744         return 1;
1745 }
1746
1747 static int rbtree_loop(struct osl_column *col,  void *private_data,
1748                 osl_rbtree_loop_func *func)
1749 {
1750         struct rb_node *n, *tmp;
1751
1752         /* this for-loop is safe against removal of an entry */
1753         for (n = rb_first(&col->rbtree), tmp = n? rb_next(n) : NULL;
1754                         n;
1755                         n = tmp, tmp = tmp? rb_next(tmp) : NULL) {
1756                 struct osl_row *r = get_row_pointer(n, col->rbtree_num);
1757                 int ret = func(r, private_data);
1758                 if (ret < 0)
1759                         return ret;
1760         }
1761         return 1;
1762 }
1763
1764 static int rbtree_loop_reverse(struct osl_column *col,  void *private_data,
1765                 osl_rbtree_loop_func *func)
1766 {
1767         struct rb_node *n, *tmp;
1768
1769         /* safe against removal of an entry */
1770         for (n = rb_last(&col->rbtree), tmp = n? rb_prev(n) : NULL;
1771                         n;
1772                         n = tmp, tmp = tmp? rb_prev(tmp) : NULL) {
1773                 struct osl_row *r = get_row_pointer(n, col->rbtree_num);
1774                 int ret = func(r, private_data);
1775                 if (ret < 0)
1776                         return ret;
1777         }
1778         return 1;
1779 }
1780
1781 /**
1782  * Loop over all nodes in an rbtree.
1783  *
1784  * \param t Pointer to an open osl table.
1785  * \param col_num The column to use for iterating over the elements.
1786  * \param private_data Pointer that gets passed to \a func.
1787  * \param func The function to be called for each node in the rbtree.
1788  *
1789  * This function does an in-order walk of the rbtree associated with \a
1790  * col_num. It is an error if the \p OSL_RBTREE flag is not set for this
1791  * column. For each node in the rbtree, the given function \a func is called
1792  * with two pointers as arguments: The first osl_row* argument points to the
1793  * row that contains the object corresponding to the rbtree node currently
1794  * traversed, and the \a private_data pointer is passed verbatim to \a func as the
1795  * second argument. The loop terminates either if \a func returns a negative
1796  * value, or if all nodes of the tree have been visited.
1797  *
1798  *
1799  * \return Positive on success, negative on errors. If the termination of the
1800  * loop was caused by \a func returning a negative value, this value is
1801  * returned.
1802  *
1803  * \sa osl_storage_flags, osl_rbtree_loop_reverse(), osl_compare_func.
1804  */
1805 __export int osl_rbtree_loop(const struct osl_table *t, unsigned col_num,
1806         void *private_data, osl_rbtree_loop_func *func)
1807 {
1808         struct osl_column *col;
1809
1810         int ret = check_rbtree_col(t, col_num, &col);
1811         if (ret < 0)
1812                 return ret;
1813         return rbtree_loop(col, private_data, func);
1814 }
1815
1816 /**
1817  * Loop over all nodes in an rbtree in reverse order.
1818  *
1819  * \param t Identical meaning as in \p osl_rbtree_loop().
1820  * \param col_num Identical meaning as in \p osl_rbtree_loop().
1821  * \param private_data Identical meaning as in \p osl_rbtree_loop().
1822  * \param func Identical meaning as in \p osl_rbtree_loop().
1823  *
1824  * This function is identical to \p osl_rbtree_loop(), the only difference
1825  * is that the tree is walked in reverse order.
1826  *
1827  * \return The same return value as \p osl_rbtree_loop().
1828  *
1829  * \sa osl_rbtree_loop().
1830  */
1831 __export int osl_rbtree_loop_reverse(const struct osl_table *t, unsigned col_num,
1832         void *private_data, osl_rbtree_loop_func *func)
1833 {
1834         struct osl_column *col;
1835
1836         int ret = check_rbtree_col(t, col_num, &col);
1837         if (ret < 0)
1838                 return ret;
1839         return rbtree_loop_reverse(col, private_data, func);
1840 }
1841
1842 /* TODO: Rollback changes on errors */
1843 static int rename_disk_storage_objects(struct osl_table *t,
1844                 struct osl_object *old_obj, struct osl_object *new_obj)
1845 {
1846         int i, ret;
1847         const struct osl_column_description *cd;
1848         char *old_ds_name, *new_ds_name;
1849
1850         if (!t->num_disk_storage_columns)
1851                 return 1; /* nothing to do */
1852         if (old_obj->size == new_obj->size && !memcmp(new_obj->data,
1853                         old_obj->data, new_obj->size))
1854                 return 1; /* object did not change */
1855         old_ds_name = disk_storage_name_of_object(t, old_obj);
1856         new_ds_name = disk_storage_name_of_object(t, new_obj);
1857         ret = -ERRNO_TO_ERROR(ENOMEM);
1858         if (!old_ds_name || ! new_ds_name)
1859                 goto out;
1860
1861         FOR_EACH_DISK_STORAGE_COLUMN(i, t, cd) {
1862                 char *old_filename, *new_filename;
1863                 ret = create_disk_storage_object_dir(t, i, new_ds_name);
1864                 if (ret < 0)
1865                         goto out;
1866                 old_filename = disk_storage_path(t, i, old_ds_name);
1867                 new_filename = disk_storage_path(t, i, new_ds_name);
1868                 if (!old_filename || !new_filename)
1869                         ret = -ERRNO_TO_ERROR(ENOMEM);
1870                 else
1871                         ret = para_rename(old_filename, new_filename);
1872                 free(old_filename);
1873                 free(new_filename);
1874                 if (ret < 0)
1875                         goto out;
1876         }
1877         ret = 1;
1878 out:
1879         free(old_ds_name);
1880         free(new_ds_name);
1881         return ret;
1882
1883 }
1884
1885 /**
1886  * Change an object in an osl table.
1887  *
1888  * \param t Pointer to an open osl table.
1889  * \param r Pointer to the row containing the object to be updated.
1890  * \param col_num Number of the column containing the object to be updated.
1891  * \param obj Pointer to the replacement object.
1892  *
1893  * This function  gets rid of all references to the old object. This includes
1894  * removal of the rbtree node in case there is an rbtree associated with \a
1895  * col_num. It then inserts \a obj into the table and the rbtree if necessary.
1896  *
1897  * If the \p OSL_RBTREE flag is set for \a col_num, you \b MUST call this
1898  * function in order to change the contents of an object, even for volatile or
1899  * mapped columns of constant size (which may be updated directly if \p
1900  * OSL_RBTREE is not set).  Otherwise the rbtree might become corrupted.
1901  *
1902  * \return Standard
1903  */
1904 __export int osl_update_object(struct osl_table *t, const struct osl_row *r,
1905                 unsigned col_num, struct osl_object *obj)
1906 {
1907         struct osl_column *col;
1908         const struct osl_column_description *cd;
1909         int ret;
1910
1911         if (!t)
1912                 return -E_BAD_TABLE;
1913         col = &t->columns[col_num];
1914         cd = get_column_description(t->desc, col_num);
1915         DEBUG_LOG("updating column %u of %s\n", col_num, t->desc->name);
1916         if (cd->storage_flags & OSL_RBTREE) {
1917                 if (search_rbtree(obj, t, col_num, NULL, NULL) > 0)
1918                         return -E_RB_KEY_EXISTS;
1919         }
1920         if (cd->storage_flags & OSL_FIXED_SIZE) {
1921                 if (obj->size != cd->data_size)
1922                         return -E_BAD_DATA_SIZE;
1923         }
1924         remove_rb_node(t, col_num, r);
1925         if (cd->storage_type == OSL_NO_STORAGE) { /* TODO: If fixed size, reuse object? */
1926                 free(r->volatile_objects[col->volatile_num].data);
1927                 r->volatile_objects[col->volatile_num] = *obj;
1928         } else if (cd->storage_type == OSL_DISK_STORAGE) {
1929                 char *ds_name;
1930                 ret = disk_storage_name_of_row(t, r, &ds_name);
1931                 if (ret < 0)
1932                         return ret;
1933                 ret = delete_disk_storage_file(t, col_num, ds_name);
1934                 if (ret < 0 && !is_errno(-ret, ENOENT)) {
1935                         free(ds_name);
1936                         return ret;
1937                 }
1938                 ret = write_disk_storage_file(t, col_num, obj, ds_name);
1939                 free(ds_name);
1940                 if (ret < 0)
1941                         return ret;
1942         } else { /* mapped storage */
1943                 struct osl_object old_obj;
1944                 ret = get_mapped_object(t, col_num, r->num, &old_obj);
1945                 if (ret < 0)
1946                         return ret;
1947                 /*
1948                  * If the updated column is the disk storage name column, the
1949                  * disk storage name changes, so we have to rename all disk
1950                  * storage objects accordingly.
1951                  */
1952                 if (col_num == t->disk_storage_name_column) {
1953                         ret = rename_disk_storage_objects(t, &old_obj, obj);
1954                         if (ret < 0)
1955                                 return ret;
1956                 }
1957                 if (cd->storage_flags & OSL_FIXED_SIZE)
1958                         memcpy(old_obj.data, obj->data, cd->data_size);
1959                 else { /* TODO: if the size doesn't change, use old space */
1960                         uint32_t new_data_map_size;
1961                         char *row_index;
1962                         ret = get_row_index(t, r->num, &row_index);
1963                         if (ret < 0)
1964                                 return ret;
1965                         ret = mark_mapped_object_invalid(t, r->num, col_num);
1966                         if (ret < 0)
1967                                 return ret;
1968                         unmap_column(t, col_num);
1969                         ret = append_map_file(t, col_num, obj,
1970                                 &new_data_map_size);
1971                         if (ret < 0)
1972                                 return ret;
1973                         ret = map_column(t, col_num);
1974                         if (ret < 0)
1975                                 return ret;
1976                         update_cell_index(row_index, col, new_data_map_size,
1977                                 obj->size);
1978                 }
1979         }
1980         if (cd->storage_flags & OSL_RBTREE) {
1981                 ret = insert_rbtree(t, col_num, r, obj);
1982                 if (ret < 0)
1983                         return ret;
1984         }
1985         return 1;
1986 }
1987
1988 /**
1989  * Retrieve an object of type \p OSL_DISK_STORAGE by row and column.
1990  *
1991  * \param t Pointer to an open osl table.
1992  * \param r Pointer to the row containing the object.
1993  * \param col_num The column number.
1994  * \param obj Points to the result upon successful return.
1995  *
1996  * For columns of type \p OSL_DISK_STORAGE, this function must be used to
1997  * retrieve one of its containing objects. Afterwards, osl_close_disk_object()
1998  * must be called in order to deallocate the resources.
1999  *
2000  * \return Positive on success, negative on errors. Possible errors include:
2001  * \p E_BAD_TABLE, \p E_BAD_STORAGE_TYPE, errors returned by osl_get_object().
2002  *
2003  * \sa osl_get_object(), osl_storage_type, osl_close_disk_object().
2004  */
2005 __export int osl_open_disk_object(const struct osl_table *t, const struct osl_row *r,
2006                 unsigned col_num, struct osl_object *obj)
2007 {
2008         const struct osl_column_description *cd;
2009         char *ds_name, *filename;
2010         int ret;
2011
2012         if (!t)
2013                 return -E_BAD_TABLE;
2014         cd = get_column_description(t->desc, col_num);
2015         if (cd->storage_type != OSL_DISK_STORAGE)
2016                 return -E_BAD_STORAGE_TYPE;
2017
2018         ret = disk_storage_name_of_row(t, r, &ds_name);
2019         if (ret < 0)
2020                 return ret;
2021         filename = disk_storage_path(t, col_num, ds_name);
2022         free(ds_name);
2023         if (!filename)
2024                 return -ERRNO_TO_ERROR(ENOMEM);
2025         DEBUG_LOG("filename: %s\n", filename);
2026         ret = mmap_full_file(filename, O_RDONLY, &obj->data, &obj->size, NULL);
2027         free(filename);
2028         return ret;
2029 }
2030
2031 /**
2032  * Free resources that were allocated during osl_open_disk_object().
2033  *
2034  * \param obj Pointer to the object previously returned by open_disk_object().
2035  *
2036  * \return The return value of the underlying call to para_munmap().
2037  *
2038  * \sa para_munmap().
2039  */
2040 __export int osl_close_disk_object(struct osl_object *obj)
2041 {
2042         return para_munmap(obj->data, obj->size);
2043 }
2044
2045 /**
2046  * Get the number of rows of the given table.
2047  *
2048  * \param t Pointer to an open osl table.
2049  * \param num_rows Result is returned here.
2050  *
2051  * The number of rows returned via \a num_rows excluding any invalid rows.
2052  *
2053  * \return Positive on success, \p -E_BAD_TABLE if \a t is \p NULL.
2054  */
2055 __export int osl_get_num_rows(const struct osl_table *t, unsigned *num_rows)
2056 {
2057         if (!t)
2058                 return -E_BAD_TABLE;
2059         assert(t->num_rows >= t->num_invalid_rows);
2060         *num_rows = t->num_rows - t->num_invalid_rows;
2061         return 1;
2062 }
2063
2064 /**
2065  * Get the rank of a row.
2066  *
2067  * \param t An open osl table.
2068  * \param r The row to get the rank of.
2069  * \param col_num The number of an rbtree column.
2070  * \param rank Result pointer.
2071  *
2072  * The rank is, by definition, the position of the row in the linear order
2073  * determined by an in-order tree walk of the rbtree associated with column
2074  * number \a col_num of \a table.
2075  *
2076  * \return Positive on success, negative on errors.
2077  *
2078  * \sa osl_get_nth_row().
2079  */
2080 __export int osl_get_rank(const struct osl_table *t, struct osl_row *r,
2081                 unsigned col_num, unsigned *rank)
2082 {
2083         struct osl_object obj;
2084         struct osl_column *col;
2085         struct rb_node *node;
2086         int ret = check_rbtree_col(t, col_num, &col);
2087
2088         if (ret < 0)
2089                 return ret;
2090         ret = osl_get_object(t, r, col_num, &obj);
2091         if (ret < 0)
2092                 return ret;
2093         ret = search_rbtree(&obj, t, col_num, &node, NULL);
2094         if (ret < 0)
2095                 return ret;
2096         ret = rb_rank(node, rank);
2097         if (ret < 0)
2098                 return -E_BAD_ROW;
2099         return 1;
2100 }
2101
2102 /**
2103  * Get the row with n-th greatest value.
2104  *
2105  * \param t Pointer to an open osl table.
2106  * \param col_num The column number.
2107  * \param n The rank of the desired row.
2108  * \param result Row is returned here.
2109  *
2110  * Retrieve the n-th order statistic with respect to the compare function
2111  * of the rbtree column \a col_num. In other words, get that row with
2112  * \a n th greatest value in column \a col_num. It's an error if
2113  * \a col_num is not a rbtree column, or if \a n is larger than the
2114  * number of rows in the table.
2115  *
2116  * \return Positive on success, negative on errors. Possible errors:
2117  * \p E_BAD_TABLE, \p E_BAD_STORAGE_FLAGS, \p E_RB_KEY_NOT_FOUND.
2118  *
2119  * \sa osl_storage_flags, osl_compare_func, osl_get_row(),
2120  * osl_rbtree_last_row(), osl_rbtree_first_row(), osl_get_rank().
2121  */
2122 __export int osl_get_nth_row(const struct osl_table *t, unsigned col_num,
2123                 unsigned n, struct osl_row **result)
2124 {
2125         struct osl_column *col;
2126         struct rb_node *node;
2127         unsigned num_rows;
2128         int ret;
2129
2130         if (n == 0)
2131                 return -E_RB_KEY_NOT_FOUND;
2132         ret = osl_get_num_rows(t, &num_rows);
2133         if (ret < 0)
2134                 return ret;
2135         if (n > num_rows)
2136                 return -E_RB_KEY_NOT_FOUND;
2137         ret = check_rbtree_col(t, col_num, &col);
2138         if (ret < 0)
2139                 return ret;
2140         node = rb_nth(col->rbtree.rb_node, n);
2141         if (!node)
2142                 return -E_RB_KEY_NOT_FOUND;
2143         *result = get_row_pointer(node, col->rbtree_num);
2144         return 1;
2145 }
2146
2147 /**
2148  * Get the row corresponding to the smallest rbtree node of a column.
2149  *
2150  * \param t An open rbtree table.
2151  * \param col_num The number of the rbtree column.
2152  * \param result A pointer to the first row is returned here.
2153  *
2154  * The rbtree node of the smallest object (with respect to the corresponding
2155  * compare function) is selected and the row containing this object is
2156  * returned. It is an error if \a col_num refers to a column without an
2157  * associated rbtree.
2158  *
2159  * \return Positive on success, negative on errors.
2160  *
2161  * \sa osl_get_nth_row(), osl_rbtree_last_row().
2162  */
2163 __export int osl_rbtree_first_row(const struct osl_table *t, unsigned col_num,
2164                 struct osl_row **result)
2165 {
2166         return osl_get_nth_row(t, col_num, 1, result);
2167 }
2168
2169 /**
2170  * Get the row corresponding to the greatest rbtree node of a column.
2171  *
2172  * \param t The same meaning as in \p osl_rbtree_first_row().
2173  * \param col_num The same meaning as in \p osl_rbtree_first_row().
2174  * \param result The same meaning as in \p osl_rbtree_first_row().
2175  *
2176  * This function works just like osl_rbtree_first_row(), the only difference
2177  * is that the row containing the greatest rather than the smallest object is
2178  * returned.
2179  *
2180  * \return Positive on success, negative on errors.
2181  *
2182  * \sa osl_get_nth_row(), osl_rbtree_first_row().
2183  */
2184 __export int osl_rbtree_last_row(const struct osl_table *t, unsigned col_num,
2185                 struct osl_row **result)
2186 {
2187         unsigned num_rows;
2188         int ret = osl_get_num_rows(t, &num_rows);
2189
2190         if (ret < 0)
2191                 return ret;
2192         return osl_get_nth_row(t, col_num, num_rows, result);
2193 }