]> git.tuebingen.mpg.de Git - osl.git/blob - osl.c
Switch to the new error code handling.
[osl.git] / osl.c
1 /*
2  * Copyright (C) 2007-2008 Andre Noll <maan@systemlinux.org>
3  *
4  * Licensed under the GPL v2. For licencing details see COPYING.
5  */
6
7 /** \file osl.c Object storage layer functions. */
8 #include <dirent.h> /* readdir() */
9 #include <assert.h>
10
11
12 #include "log.h"
13 #include "osl.h"
14 #include "error.h"
15 #include "fd.h"
16 #include "list.h"
17 #include "osl_core.h"
18
19 /**
20  * Allocate a sufficiently large string and print into it.
21  *
22  * \param fmt A usual format string.
23  *
24  * Produce output according to \p fmt. No artificial bound on the length of the
25  * resulting string is imposed.
26  *
27  * \return This function either returns a pointer to a string that must be
28  * freed by the caller or \p NULL if memory allocation failed.
29  *
30  * \sa printf(3).
31  */
32 static __must_check __printf_1_2 __malloc char *make_message(const char *fmt, ...)
33 {
34         int n;
35         size_t size = 100;
36         char *p = malloc(size);
37
38         if (!p)
39                 return NULL;
40         while (1) {
41                 char *q;
42                 va_list ap;
43                 /* Try to print in the allocated space. */
44                 va_start(ap, fmt);
45                 n = vsnprintf(p, size, fmt, ap);
46                 va_end(ap);
47                 /* If that worked, return the string. */
48                 if (n > -1 && n < size)
49                         break;
50                 /* Else try again with more space. */
51                 if (n > -1) /* glibc 2.1 */
52                         size = n + 1; /* precisely what is needed */
53                 else /* glibc 2.0 */
54                         size *= 2; /* twice the old size */
55                 q = realloc(p, size);
56                 if (!q) {
57                         free(p);
58                         return NULL;
59                 }
60         }
61         return p;
62 }
63
64 /* Taken from Drepper: How to write shared libraries, Appendix B. */
65 #include <stddef.h>
66 #define MSGSTRFIELD(line) MSGSTRFIELD1(line)
67 #define MSGSTRFIELD1(line) str##line
68 static const union msgstr_t {
69         struct {
70 #define _S(n, s) char MSGSTRFIELD(__LINE__)[sizeof(s)];
71 #include "errtab.h"
72 #undef _S
73         };
74         char str[0];
75 } msgstr = { {
76 #define _S(n, s) s,
77 #include "errtab.h"
78 #undef _S
79 } };
80 static const unsigned int errmsgidx[] = {
81 #define _S(n, s) [n] = offsetof(union msgstr_t, MSGSTRFIELD(__LINE__)),
82 #include "errtab.h"
83 #undef _S
84 };
85 __export const char *osl_strerror(int num)
86 {
87         if (IS_SYSTEM_ERROR(num))
88                 return strerror((num) & ((1 << SYSTEM_ERROR_BIT) - 1));
89         return msgstr.str + errmsgidx[num];
90 }
91
92 /**
93  * The log function.
94  *
95  * \param ll Loglevel.
96  * \param fmt Usual format string.
97  *
98  * All XXX_LOG() macros use this function.
99  */
100 __printf_2_3 void __log(int ll, const char* fmt,...)
101 {
102         va_list argp;
103         FILE *outfd;
104         struct tm *tm;
105         time_t t1;
106         char str[255] = "";
107
108         if (ll < 2)
109                 return;
110         outfd = stderr;
111         time(&t1);
112         tm = localtime(&t1);
113         strftime(str, sizeof(str), "%b %d %H:%M:%S", tm);
114         fprintf(outfd, "%s ", str);
115         va_start(argp, fmt);
116         vfprintf(outfd, fmt, argp);
117         va_end(argp);
118 }
119
120 /**
121  * A wrapper for lseek(2).
122  *
123  * \param fd The file descriptor whose offset is to be to repositioned.
124  * \param offset A value-result parameter.
125  * \param whence Usual repositioning directive.
126  *
127  * Reposition the offset of the file descriptor \a fd to the argument \a offset
128  * according to the directive \a whence. Upon successful return, \a offset
129  * contains the resulting offset location as measured in bytes from the
130  * beginning of the file.
131  *
132  * \return Positive on success. Otherwise, the function returns \p -E_OSL_LSEEK.
133  *
134  * \sa lseek(2).
135  */
136 static int __lseek(int fd, off_t *offset, int whence)
137 {
138         *offset = lseek(fd, *offset, whence);
139         int ret = -E_OSL_LSEEK;
140         if (*offset == -1)
141                 return ret;
142         return 1;
143 }
144
145 /**
146  * Wrapper for the write system call.
147  *
148  * \param fd The file descriptor to write to.
149  * \param buf The buffer to write.
150  * \param size The length of \a buf in bytes.
151  *
152  * This function writes out the given buffer and retries if an interrupt
153  * occurred during the write.
154  *
155  * \return On success, the number of bytes written is returned, otherwise, the
156  * function returns \p -E_OSL_WRITE.
157  *
158  * \sa write(2).
159  */
160 static ssize_t __write(int fd, const void *buf, size_t size)
161 {
162         ssize_t ret;
163
164         for (;;) {
165                 ret = write(fd, buf, size);
166                 if ((ret < 0) && (errno == EAGAIN || errno == EINTR))
167                         continue;
168                 return ret >= 0? ret : -E_OSL_WRITE;
169         }
170 }
171
172 /**
173  * Write the whole buffer to a file descriptor.
174  *
175  * \param fd The file descriptor to write to.
176  * \param buf The buffer to write.
177  * \param size The length of \a buf in bytes.
178  *
179  * This function writes the given buffer and continues on short writes and
180  * when interrupted by a signal.
181  *
182  * \return Positive on success, negative on errors. Possible errors: any
183  * errors returned by para_write().
184  *
185  * \sa para_write().
186  */
187 static ssize_t write_all(int fd, const void *buf, size_t size)
188 {
189 //      DEBUG_LOG("writing %zu bytes\n", size);
190         const char *b = buf;
191         while (size) {
192                 ssize_t ret = __write(fd, b, size);
193 //              DEBUG_LOG("ret: %zd\n", ret);
194                 if (ret < 0)
195                         return ret;
196                 b += ret;
197                 size -= ret;
198         }
199         return 1;
200 }
201 /**
202  * Open a file, write the given buffer and close the file.
203  *
204  * \param filename Full path to the file to open.
205  * \param buf The buffer to write to the file.
206  * \param size The size of \a buf.
207  *
208  * \return Standard.
209  */
210 static int write_file(const char *filename, const void *buf, size_t size)
211 {
212         int ret, fd;
213
214         ret = osl_open(filename, O_WRONLY | O_CREAT | O_EXCL, 0644);
215         if (ret < 0)
216                 return ret;
217         fd = ret;
218         ret = write_all(fd, buf, size);
219         if (ret < 0)
220                 goto out;
221         ret = 1;
222 out:
223         close(fd);
224         return ret;
225 }
226
227 static int append_file(const char *filename, char *header, size_t header_size,
228         char *data, size_t data_size, uint32_t *new_pos)
229 {
230         int ret, fd;
231
232 //      DEBUG_LOG("appending %zu  + %zu bytes\n", header_size, data_size);
233         ret = osl_open(filename, O_WRONLY | O_CREAT | O_APPEND, 0644);
234         if (ret < 0)
235                 return ret;
236         fd = ret;
237         if (header && header_size) {
238                 ret = write_all(fd, header, header_size);
239                 if (ret < 0)
240                         goto out;
241         }
242         ret = write_all(fd, data, data_size);
243         if (ret < 0)
244                 goto out;
245         if (new_pos) {
246                 off_t offset = 0;
247                 ret = __lseek(fd, &offset, SEEK_END);
248                 if (ret < 0)
249                         goto out;
250 //              DEBUG_LOG("new file size: " FMT_OFF_T "\n", offset);
251                 *new_pos = offset;
252         }
253         ret = 1;
254 out:
255         close(fd);
256         return ret;
257 }
258
259 static int verify_name(const char *name)
260 {
261         if (!name)
262                 return -E_OSL_BAD_NAME;
263         if (!*name)
264                 return -E_OSL_BAD_NAME;
265         if (strchr(name, '/'))
266                 return -E_OSL_BAD_NAME;
267         if (!strcmp(name, ".."))
268                 return -E_OSL_BAD_NAME;
269         if (!strcmp(name, "."))
270                 return -E_OSL_BAD_NAME;
271         return 1;
272 }
273
274 /**
275  * Compare two osl objects pointing to unsigned integers of 32 bit size.
276  *
277  * \param obj1 Pointer to the first integer.
278  * \param obj2 Pointer to the second integer.
279  *
280  * \return The values required for an osl compare function.
281  *
282  * \sa osl_compare_func, osl_hash_compare().
283  */
284 int uint32_compare(const struct osl_object *obj1, const struct osl_object *obj2)
285 {
286         uint32_t d1 = read_u32((const char *)obj1->data);
287         uint32_t d2 = read_u32((const char *)obj2->data);
288
289         if (d1 < d2)
290                 return 1;
291         if (d1 > d2)
292                 return -1;
293         return 0;
294 }
295
296 /**
297  * Compare two osl objects pointing to hash values.
298  *
299  * \param obj1 Pointer to the first hash object.
300  * \param obj2 Pointer to the second hash object.
301  *
302  * \return The values required for an osl compare function.
303  *
304  * \sa osl_compare_func, uint32_compare().
305  */
306 int osl_hash_compare(const struct osl_object *obj1, const struct osl_object *obj2)
307 {
308         return hash_compare((HASH_TYPE *)obj1->data, (HASH_TYPE *)obj2->data);
309 }
310
311 static char *disk_storage_dirname(const struct osl_table *t, unsigned col_num,
312                 const char *ds_name)
313 {
314         char *dirname, *column_name = column_filename(t, col_num);
315
316         if (!column_name)
317                 return NULL;
318         if (!(t->desc->flags & OSL_LARGE_TABLE))
319                 return column_name;
320         dirname = make_message("%s/%.2s", column_name, ds_name);
321         free(column_name);
322         return dirname;
323 }
324
325 static char *disk_storage_name_of_object(const struct osl_table *t,
326         const struct osl_object *obj)
327 {
328         HASH_TYPE hash[HASH_SIZE];
329         hash_object(obj, hash);
330         return disk_storage_name_of_hash(t, hash);
331 }
332
333 static int disk_storage_name_of_row(const struct osl_table *t,
334                 const struct osl_row *row, char **name)
335 {
336         struct osl_object obj;
337         int ret = osl_get_object(t, row, t->disk_storage_name_column, &obj);
338
339         if (ret < 0)
340                 return ret;
341         *name = disk_storage_name_of_object(t, &obj);
342         if (*name)
343                 return 1;
344         return -ERRNO_TO_ERROR(ENOMEM);
345 }
346
347 static void column_name_hash(const char *col_name, HASH_TYPE *hash)
348 {
349         hash_function(col_name, strlen(col_name), hash);
350 }
351
352 static int init_column_descriptions(struct osl_table *t)
353 {
354         int i, j, ret;
355         const struct osl_column_description *cd;
356
357         ret = -E_OSL_BAD_TABLE_DESC;
358         ret = verify_name(t->desc->name);
359         if (ret < 0)
360                 goto err;
361         ret = -E_OSL_BAD_DB_DIR;
362         if (!t->desc->dir && (t->num_disk_storage_columns || t->num_mapped_columns))
363                 goto err;
364         /* the size of the index header without column descriptions */
365         t->index_header_size = IDX_COLUMN_DESCRIPTIONS;
366         FOR_EACH_COLUMN(i, t->desc, cd) {
367                 struct osl_column *col = t->columns + i;
368                 if (cd->storage_flags & OSL_RBTREE) {
369                         if (!cd->compare_function)
370                                 return -E_OSL_NO_COMPARE_FUNC;
371                 }
372                 if (cd->storage_type == OSL_NO_STORAGE)
373                         continue;
374                 ret = -E_OSL_NO_COLUMN_NAME;
375                 if (!cd->name || !cd->name[0])
376                         goto err;
377                 ret = verify_name(cd->name);
378                 if (ret < 0)
379                         goto err;
380                 t->index_header_size += index_column_description_size(cd->name);
381                 column_name_hash(cd->name, col->name_hash);
382                 ret = -E_OSL_DUPLICATE_COL_NAME;
383                 for (j = i + 1; j < t->desc->num_columns; j++) {
384                         const char *name2 = get_column_description(t->desc,
385                                 j)->name;
386                         if (cd->name && name2 && !strcmp(cd->name, name2))
387                                 goto err;
388                 }
389         }
390         return 1;
391 err:
392         return ret;
393 }
394
395 /**
396  * Initialize a struct table from given table description.
397  *
398  * \param desc The description of the osl table.
399  * \param table_ptr Result is returned here.
400  *
401  * This function performs several sanity checks on \p desc and returns if any
402  * of these tests fail. On success, a struct \p osl_table is allocated and
403  * initialized with data derived from \p desc.
404  *
405  * \return Standard.
406  *
407  * \sa struct osl_table.
408  */
409 int init_table_structure(const struct osl_table_description *desc,
410                 struct osl_table **table_ptr)
411 {
412         const struct osl_column_description *cd;
413         struct osl_table *t = calloc(1, sizeof(*t));
414         int i, ret = -ERRNO_TO_ERROR(ENOMEM), have_disk_storage_name_column = 0;
415
416         if (!t)
417                 return ret;
418         ret = -E_OSL_BAD_TABLE_DESC;
419         if (!desc)
420                 goto err;
421         DEBUG_LOG("creating table structure for '%s' from table "
422                 "description\n", desc->name);
423         ret = -E_OSL_NO_COLUMN_DESC;
424         if (!desc->column_descriptions)
425                 goto err;
426         ret = -E_OSL_NO_COLUMNS;
427         if (!desc->num_columns)
428                 goto err;
429         ret = -ERRNO_TO_ERROR(ENOMEM);
430         t->columns = calloc(desc->num_columns, sizeof(struct osl_column));
431         if (!t->columns)
432                 goto err;
433         t->desc = desc;
434         FOR_EACH_COLUMN(i, t->desc, cd) {
435                 enum osl_storage_type st = cd->storage_type;
436                 enum osl_storage_flags sf = cd->storage_flags;
437                 struct osl_column *col = &t->columns[i];
438
439                 ret = -E_OSL_BAD_STORAGE_TYPE;
440                 if (st != OSL_MAPPED_STORAGE && st != OSL_DISK_STORAGE
441                                 && st != OSL_NO_STORAGE)
442                         goto err;
443                 ret = -E_OSL_BAD_STORAGE_FLAGS;
444                 if (st == OSL_DISK_STORAGE && sf & OSL_RBTREE)
445                         goto err;
446                 ret = -E_OSL_BAD_STORAGE_SIZE;
447                 if (sf & OSL_FIXED_SIZE && !cd->data_size)
448                         goto err;
449                 switch (st) {
450                 case OSL_DISK_STORAGE:
451                         t->num_disk_storage_columns++;
452                         break;
453                 case OSL_MAPPED_STORAGE:
454                         t->num_mapped_columns++;
455                         col->index_offset = t->row_index_size;
456                         t->row_index_size += 8;
457                         break;
458                 case OSL_NO_STORAGE:
459                         col->volatile_num = t->num_volatile_columns;
460                         t->num_volatile_columns++;
461                         break;
462                 }
463                 if (sf & OSL_RBTREE) {
464                         col->rbtree_num = t->num_rbtrees;
465                         t->num_rbtrees++;
466                         if ((sf & OSL_UNIQUE) && (st == OSL_MAPPED_STORAGE)) {
467                                 if (!have_disk_storage_name_column)
468                                         t->disk_storage_name_column = i;
469                                 have_disk_storage_name_column = 1;
470                         }
471                 }
472         }
473         ret = -E_OSL_NO_UNIQUE_RBTREE_COLUMN;
474         if (t->num_disk_storage_columns && !have_disk_storage_name_column)
475                 goto err;
476         ret = -E_OSL_NO_RBTREE_COL;
477         if (!t->num_rbtrees)
478                 goto err;
479         /* success */
480         DEBUG_LOG("OK. Index entry size: %u\n", t->row_index_size);
481         ret = init_column_descriptions(t);
482         if (ret < 0)
483                 goto err;
484         *table_ptr = t;
485         return 1;
486 err:
487         free(t->columns);
488         free(t);
489         return ret;
490 }
491
492 /**
493  * Read the table description from index header.
494  *
495  * \param map The memory mapping of the index file.
496  * \param desc The values found in the index header are returned here.
497  *
498  * Read the index header, check for the paraslash magic string and the table version number.
499  * Read all information stored in the index header into \a desc.
500  *
501  * \return Standard.
502  *
503  * \sa struct osl_table_description, osl_create_table.
504  */
505 int read_table_desc(struct osl_object *map, struct osl_table_description *desc)
506 {
507         char *buf = map->data;
508         uint8_t version;
509         uint16_t header_size;
510         int ret, i;
511         unsigned offset;
512         struct osl_column_description *cd;
513
514         if (map->size < MIN_INDEX_HEADER_SIZE(1))
515                 return -E_OSL_SHORT_TABLE;
516         if (strncmp(buf + IDX_PARA_MAGIC, PARA_MAGIC, strlen(PARA_MAGIC)))
517                 return -E_OSL_NO_MAGIC;
518         version = read_u8(buf + IDX_VERSION);
519         if (version < MIN_TABLE_VERSION || version > MAX_TABLE_VERSION)
520                 return -E_OSL_VERSION_MISMATCH;
521         desc->flags = read_u8(buf + IDX_TABLE_FLAGS);
522         desc->num_columns = read_u16(buf + IDX_NUM_COLUMNS);
523         DEBUG_LOG("%u columns\n", desc->num_columns);
524         if (!desc->num_columns)
525                 return -E_OSL_NO_COLUMNS;
526         header_size = read_u16(buf + IDX_HEADER_SIZE);
527         if (map->size < header_size)
528                 return -E_OSL_BAD_SIZE;
529         desc->column_descriptions = calloc(desc->num_columns,
530                 sizeof(struct osl_column_description));
531         if (!desc->column_descriptions)
532                 return -ERRNO_TO_ERROR(ENOMEM);
533         offset = IDX_COLUMN_DESCRIPTIONS;
534         FOR_EACH_COLUMN(i, desc, cd) {
535                 char *null_byte;
536
537                 ret = -E_OSL_SHORT_TABLE;
538                 if (map->size < offset + MIN_IDX_COLUMN_DESCRIPTION_SIZE) {
539                         ERROR_LOG("map size = %zu < %u = offset + min desc size\n",
540                                 map->size, offset + MIN_IDX_COLUMN_DESCRIPTION_SIZE);
541                         goto err;
542                 }
543                 cd->storage_type = read_u16(buf + offset + IDX_CD_STORAGE_TYPE);
544                 cd->storage_flags = read_u16(buf + offset +
545                         IDX_CD_STORAGE_FLAGS);
546                 cd->data_size = read_u32(buf + offset + IDX_CD_DATA_SIZE);
547                 null_byte = memchr(buf + offset + IDX_CD_NAME, '\0',
548                         map->size - offset - IDX_CD_NAME);
549                 ret = -E_OSL_INDEX_CORRUPTION;
550                 if (!null_byte)
551                         goto err;
552                 ret = -ERRNO_TO_ERROR(ENOMEM);
553                 cd->name = strdup(buf + offset + IDX_CD_NAME);
554                 if (!cd->name)
555                         goto err;
556                 offset += index_column_description_size(cd->name);
557         }
558         if (offset != header_size) {
559                 ret = -E_OSL_INDEX_CORRUPTION;
560                 ERROR_LOG("real header size = %u != %u = stored header size\n",
561                         offset, header_size);
562                 goto err;
563         }
564         return 1;
565 err:
566         FOR_EACH_COLUMN(i, desc, cd)
567                 free(cd->name);
568         return ret;
569 }
570
571 /*
572  * check whether the table description given by \p t->desc matches the on-disk
573  * table structure stored in the index of \a t.
574  */
575 static int compare_table_descriptions(struct osl_table *t)
576 {
577         int i, ret;
578         struct osl_table_description desc;
579         const struct osl_column_description *cd1, *cd2;
580
581         /* read the on-disk structure into desc */
582         ret = read_table_desc(&t->index_map, &desc);
583         if (ret < 0)
584                 return ret;
585         ret = -E_OSL_BAD_TABLE_FLAGS;
586         if (desc.flags != t->desc->flags)
587                 goto out;
588         ret = -E_OSL_BAD_COLUMN_NUM;
589         if (desc.num_columns > t->desc->num_columns)
590                 goto out;
591         if (desc.num_columns < t->desc->num_columns) {
592                 struct osl_column_description *cd;
593                 unsigned diff = t->desc->num_columns - desc.num_columns;
594                 INFO_LOG("extending table by %u volatile columns\n", diff);
595                 ret = -ERRNO_TO_ERROR(ENOMEM);
596                 desc.column_descriptions = realloc(desc.column_descriptions,
597                         t->desc->num_columns * sizeof(struct osl_column_description));
598                 if (!desc.column_descriptions)
599                         goto out;
600                 for (i = desc.num_columns; i < t->desc->num_columns; i++) {
601                         cd = get_column_description(&desc, i);
602                         cd->storage_type = OSL_NO_STORAGE;
603                         cd->name = NULL;
604                 }
605                 desc.num_columns += diff;
606         }
607         FOR_EACH_COLUMN(i, t->desc, cd1) {
608                 cd2 = get_column_description(&desc, i);
609                 ret = -E_OSL_BAD_STORAGE_TYPE;
610                 if (cd1->storage_type != cd2->storage_type)
611                         goto out;
612                 if (cd1->storage_type == OSL_NO_STORAGE)
613                         continue;
614                 ret = -E_OSL_BAD_STORAGE_FLAGS;
615                 if (cd1->storage_flags != cd2->storage_flags) {
616                         ERROR_LOG("sf1 = %u != %u = sf2\n",
617                                 cd1->storage_flags, cd2->storage_flags);
618                         goto out;
619                 }
620                 ret = -E_OSL_BAD_DATA_SIZE;
621                 if (cd1->storage_flags & OSL_FIXED_SIZE)
622                         if (cd1->data_size != cd2->data_size)
623                                 goto out;
624                 ret = -E_OSL_BAD_COLUMN_NAME;
625                 if (strcmp(cd1->name, cd2->name))
626                         goto out;
627         }
628         DEBUG_LOG("table description of '%s' matches on-disk data, good\n",
629                 t->desc->name);
630         ret = 1;
631 out:
632         FOR_EACH_COLUMN(i, &desc, cd1)
633                 free(cd1->name);
634         free(desc.column_descriptions);
635         return ret;
636 }
637
638 static int create_table_index(struct osl_table *t)
639 {
640         char *buf, *filename;
641         int i, ret;
642         size_t size = t->index_header_size;
643         const struct osl_column_description *cd;
644         unsigned offset;
645
646         INFO_LOG("creating %zu byte index for table %s\n", size,
647                 t->desc->name);
648         buf = calloc(1, size);
649         if (!buf)
650                 return -ERRNO_TO_ERROR(ENOMEM);
651         sprintf(buf + IDX_PARA_MAGIC, "%s", PARA_MAGIC);
652         write_u8(buf + IDX_TABLE_FLAGS, t->desc->flags);
653         write_u8(buf + IDX_DIRTY_FLAG, 0);
654         write_u8(buf + IDX_VERSION, CURRENT_TABLE_VERSION);
655         write_u16(buf + IDX_NUM_COLUMNS, t->num_mapped_columns + t->num_disk_storage_columns);
656         write_u16(buf + IDX_HEADER_SIZE, t->index_header_size);
657         offset = IDX_COLUMN_DESCRIPTIONS;
658         FOR_EACH_COLUMN(i, t->desc, cd) {
659                 /* no need to store info about volatile storage */
660                 if (cd->storage_type == OSL_NO_STORAGE)
661                         continue;
662                 write_u16(buf + offset + IDX_CD_STORAGE_TYPE,
663                         cd->storage_type);
664                 write_u16(buf + offset + IDX_CD_STORAGE_FLAGS,
665                         cd->storage_flags);
666                 if (cd->storage_flags & OSL_FIXED_SIZE)
667                         write_u32(buf + offset + IDX_CD_DATA_SIZE,
668                                 cd->data_size);
669                 strcpy(buf + offset + IDX_CD_NAME, cd->name);
670                 offset += index_column_description_size(cd->name);
671         }
672         assert(offset = size);
673         filename = index_filename(t->desc);
674         if (filename)
675                 ret = write_file(filename, buf, size);
676         else
677                 ret = -ERRNO_TO_ERROR(ENOMEM);
678         free(buf);
679         free(filename);
680         return ret;
681 }
682
683 /**
684  * Create a new osl table.
685  *
686  * \param desc Pointer to the table description.
687  *
688  * \return Standard.
689  */
690 __export int osl_create_table(const struct osl_table_description *desc)
691 {
692         const struct osl_column_description *cd;
693         char *table_dir = NULL, *filename;
694         struct osl_table *t;
695         int i, ret = init_table_structure(desc, &t);
696
697         if (ret < 0)
698                 return ret;
699         INFO_LOG("creating %s\n", desc->name);
700         FOR_EACH_COLUMN(i, t->desc, cd) {
701                 if (cd->storage_type == OSL_NO_STORAGE)
702                         continue;
703                 if (!table_dir) {
704                         ret = para_mkdir(desc->dir, 0777);
705                         if (ret < 0 && !is_errno(-ret, EEXIST))
706                                 goto out;
707                         table_dir = make_message("%s/%s", desc->dir,
708                                 desc->name);
709                         ret = -ERRNO_TO_ERROR(ENOMEM);
710                         if (!table_dir)
711                                 goto out;
712                         ret = para_mkdir(table_dir, 0777);
713                         if (ret < 0)
714                                 goto out;
715                 }
716                 ret = -ERRNO_TO_ERROR(ENOMEM);
717                 filename = column_filename(t, i);
718                 if (!filename)
719                         goto out;
720                 INFO_LOG("filename: %s\n", filename);
721                 if (cd->storage_type == OSL_MAPPED_STORAGE) {
722                         ret = osl_open(filename, O_RDWR | O_CREAT | O_EXCL,
723                                 0644);
724                         free(filename);
725                         if (ret < 0)
726                                 goto out;
727                         close(ret);
728                         continue;
729                 }
730                 /* DISK STORAGE */
731                 ret = para_mkdir(filename, 0777);
732                 free(filename);
733                 if (ret < 0)
734                         goto out;
735         }
736         if (t->num_mapped_columns) {
737                 ret = create_table_index(t);
738                 if (ret < 0)
739                         goto out;
740         }
741         ret = 1;
742 out:
743         free(table_dir);
744         free(t->columns);
745         free(t);
746         return ret;
747 }
748
749 static int table_is_dirty(struct osl_table *t)
750 {
751         char *buf = (char *)t->index_map.data + IDX_DIRTY_FLAG;
752         uint8_t dirty = read_u8(buf) & 0x1;
753         return !!dirty;
754 }
755
756 static void mark_table_dirty(struct osl_table *t)
757 {
758         char *buf = (char *)t->index_map.data + IDX_DIRTY_FLAG;
759         write_u8(buf, read_u8(buf) | 1);
760 }
761
762 static void mark_table_clean(struct osl_table *t)
763 {
764         char *buf = (char *)t->index_map.data + IDX_DIRTY_FLAG;
765         write_u8(buf, read_u8(buf) & 0xfe);
766 }
767
768 static void unmap_column(struct osl_table *t, unsigned col_num)
769 {
770         struct osl_object map = t->columns[col_num].data_map;
771         int ret;
772         if (!map.data)
773                 return;
774         ret = para_munmap(map.data, map.size);
775         assert(ret > 0);
776         map.data = NULL;
777 }
778
779 /**
780  * Unmap all mapped files of an osl table.
781  *
782  * \param t Pointer to a mapped table.
783  * \param flags Options for unmapping.
784  *
785  * \return Positive on success, negative on errors.
786  *
787  * \sa map_table(), enum osl_close_flags, para_munmap().
788  */
789 int unmap_table(struct osl_table *t, enum osl_close_flags flags)
790 {
791         unsigned i;
792         const struct osl_column_description *cd;
793         int ret;
794
795         if (!t->num_mapped_columns) /* can this ever happen? */
796                 return 1;
797         DEBUG_LOG("unmapping table '%s'\n", t->desc->name);
798         if (!t->index_map.data)
799                 return -E_OSL_NOT_MAPPED;
800         if (flags & OSL_MARK_CLEAN)
801                 mark_table_clean(t);
802         ret = para_munmap(t->index_map.data, t->index_map.size);
803         if (ret < 0)
804                 return ret;
805         t->index_map.data = NULL;
806         if (!t->num_rows)
807                 return 1;
808         FOR_EACH_MAPPED_COLUMN(i, t, cd)
809                 unmap_column(t, i);
810         return 1;
811 }
812
813 static int map_column(struct osl_table *t, unsigned col_num)
814 {
815         struct stat statbuf;
816         char *filename = column_filename(t, col_num);
817         int ret = -E_OSL_STAT;
818
819         if (!filename)
820                 return -ERRNO_TO_ERROR(ENOMEM);
821         if (stat(filename, &statbuf) < 0) {
822                 free(filename);
823                 return ret;
824         }
825         if (!(S_IFREG & statbuf.st_mode)) {
826                 free(filename);
827                 return ret;
828         }
829         ret = mmap_full_file(filename, O_RDWR,
830                 &t->columns[col_num].data_map.data,
831                 &t->columns[col_num].data_map.size,
832                 NULL);
833         free(filename);
834         return ret;
835 }
836
837 /**
838  * Map the index file and all columns of type \p OSL_MAPPED_STORAGE into memory.
839  *
840  * \param t Pointer to an initialized table structure.
841  * \param flags Mapping options.
842  *
843  * \return Negative return value on errors; on success the number of rows
844  * (including invalid rows) is returned.
845  *
846  * \sa unmap_table(), enum map_table_flags, osl_open_table(), mmap(2).
847  */
848 int map_table(struct osl_table *t, enum map_table_flags flags)
849 {
850         char *filename;
851         const struct osl_column_description *cd;
852         int i = 0, ret, num_rows = 0;
853
854         if (!t->num_mapped_columns)
855                 return 0;
856         if (t->index_map.data)
857                 return -E_OSL_ALREADY_MAPPED;
858         filename = index_filename(t->desc);
859         if (!filename)
860                 return -ERRNO_TO_ERROR(ENOMEM);
861         DEBUG_LOG("mapping table '%s' (index: %s)\n", t->desc->name, filename);
862         ret = mmap_full_file(filename, flags & MAP_TBL_FL_MAP_RDONLY?
863                 O_RDONLY : O_RDWR, &t->index_map.data, &t->index_map.size, NULL);
864         free(filename);
865         if (ret < 0)
866                 return ret;
867         if (flags & MAP_TBL_FL_VERIFY_INDEX) {
868                 ret = compare_table_descriptions(t);
869                 if (ret < 0)
870                         goto err;
871         }
872         ret = -E_OSL_BUSY;
873         if (!(flags & MAP_TBL_FL_IGNORE_DIRTY)) {
874                 if (table_is_dirty(t)) {
875                         ERROR_LOG("%s is dirty\n", t->desc->name);
876                         goto err;
877                 }
878         }
879         mark_table_dirty(t);
880         num_rows = table_num_rows(t);
881         if (!num_rows)
882                 return num_rows;
883         /* map data files */
884         FOR_EACH_MAPPED_COLUMN(i, t, cd) {
885                 ret = map_column(t, i);
886                 if (ret < 0)
887                         goto err;
888         }
889         return num_rows;
890 err:    /* unmap what is already mapped */
891         for (i--; i >= 0; i--) {
892                 struct osl_object map = t->columns[i].data_map;
893                 para_munmap(map.data, map.size);
894                 map.data = NULL;
895         }
896         para_munmap(t->index_map.data, t->index_map.size);
897         t->index_map.data = NULL;
898         return ret;
899 }
900
901 /**
902  * Retrieve a mapped object by row and column number.
903  *
904  * \param t Pointer to an open osl table.
905  * \param col_num Number of the mapped column containing the object to retrieve.
906  * \param row_num Number of the row containing the object to retrieve.
907  * \param obj The result is returned here.
908  *
909  * It is considered an error if \a col_num does not refer to a column
910  * of storage type \p OSL_MAPPED_STORAGE.
911  *
912  * \return Standard.
913  *
914  * \sa osl_storage_type.
915  */
916 int get_mapped_object(const struct osl_table *t, unsigned col_num,
917         uint32_t row_num, struct osl_object *obj)
918 {
919         struct osl_column *col = &t->columns[col_num];
920         uint32_t offset;
921         char *header;
922         char *cell_index;
923         int ret;
924
925         if (t->num_rows <= row_num)
926                 return -E_OSL_BAD_ROW_NUM;
927         ret = get_cell_index(t, row_num, col_num, &cell_index);
928         if (ret < 0)
929                 return ret;
930         offset = read_u32(cell_index);
931         obj->size = read_u32(cell_index + 4) - 1;
932         header = col->data_map.data + offset;
933         obj->data = header + 1;
934         if (read_u8(header) == 0xff) {
935                 ERROR_LOG("col %u, size %zu, offset %u\n", col_num,
936                         obj->size, offset);
937                 return -E_OSL_INVALID_OBJECT;
938         }
939         return 1;
940 }
941
942 static int search_rbtree(const struct osl_object *obj,
943                 const struct osl_table *t, unsigned col_num,
944                 struct rb_node **result, struct rb_node ***rb_link)
945 {
946         struct osl_column *col = &t->columns[col_num];
947         struct rb_node **new = &col->rbtree.rb_node, *parent = NULL;
948         const struct osl_column_description *cd =
949                 get_column_description(t->desc, col_num);
950         enum osl_storage_type st = cd->storage_type;
951         while (*new) {
952                 struct osl_row *this_row = get_row_pointer(*new,
953                         col->rbtree_num);
954                 int ret;
955                 struct osl_object this_obj;
956                 parent = *new;
957                 if (st == OSL_MAPPED_STORAGE) {
958                         ret = get_mapped_object(t, col_num, this_row->num,
959                                 &this_obj);
960                         if (ret < 0)
961                                 return ret;
962                 } else
963                         this_obj = this_row->volatile_objects[col->volatile_num];
964                 ret = cd->compare_function(obj, &this_obj);
965                 if (!ret) {
966                         if (result)
967                                 *result = get_rb_node_pointer(this_row,
968                                         col->rbtree_num);
969                         return 1;
970                 }
971                 if (ret < 0)
972                         new = &((*new)->rb_left);
973                 else
974                         new = &((*new)->rb_right);
975         }
976         if (result)
977                 *result = parent;
978         if (rb_link)
979                 *rb_link = new;
980         return -E_OSL_RB_KEY_NOT_FOUND;
981 }
982
983 static int insert_rbtree(struct osl_table *t, unsigned col_num,
984         const struct osl_row *row, const struct osl_object *obj)
985 {
986         struct rb_node *parent, **rb_link;
987         unsigned rbtree_num;
988         struct rb_node *n;
989         int ret = search_rbtree(obj, t, col_num, &parent, &rb_link);
990
991         if (ret > 0)
992                 return -E_OSL_RB_KEY_EXISTS;
993         rbtree_num = t->columns[col_num].rbtree_num;
994         n = get_rb_node_pointer(row, rbtree_num);
995         rb_link_node(n, parent, rb_link);
996         rb_insert_color(n, &t->columns[col_num].rbtree);
997         return 1;
998 }
999
1000 static void remove_rb_node(struct osl_table *t, unsigned col_num,
1001                 const struct osl_row *row)
1002 {
1003         struct osl_column *col = &t->columns[col_num];
1004         const struct osl_column_description *cd =
1005                 get_column_description(t->desc, col_num);
1006         enum osl_storage_flags sf = cd->storage_flags;
1007         struct rb_node *victim, *splice_out_node, *tmp;
1008         if (!(sf & OSL_RBTREE))
1009                 return;
1010         /*
1011          * Which node is removed/spliced out actually depends on how many
1012          * children the victim node has: If it has no children, it gets
1013          * deleted. If it has one child, it gets spliced out. If it has two
1014          * children, its successor (which has at most a right child) gets
1015          * spliced out.
1016          */
1017         victim = get_rb_node_pointer(row, col->rbtree_num);
1018         if (victim->rb_left && victim->rb_right)
1019                 splice_out_node = rb_next(victim);
1020         else
1021                 splice_out_node = victim;
1022         /* Go up to the root and decrement the size of each node in the path. */
1023         for (tmp = splice_out_node; tmp; tmp = rb_parent(tmp))
1024                 tmp->size--;
1025         rb_erase(victim, &col->rbtree);
1026 }
1027
1028 static int add_row_to_rbtrees(struct osl_table *t, uint32_t row_num,
1029                 struct osl_object *volatile_objs, struct osl_row **row_ptr)
1030 {
1031         unsigned i;
1032         int ret;
1033         struct osl_row *row = allocate_row(t->num_rbtrees);
1034         const struct osl_column_description *cd;
1035
1036         if (!row)
1037                 return -ERRNO_TO_ERROR(ENOMEM);
1038         row->num = row_num;
1039         row->volatile_objects = volatile_objs;
1040         FOR_EACH_RBTREE_COLUMN(i, t, cd) {
1041                 if (cd->storage_type == OSL_MAPPED_STORAGE) {
1042                         struct osl_object obj;
1043                         ret = get_mapped_object(t, i, row_num, &obj);
1044                         if (ret < 0)
1045                                 goto err;
1046                         ret = insert_rbtree(t, i, row, &obj);
1047                 } else { /* volatile */
1048                         const struct osl_object *obj
1049                                 = volatile_objs + t->columns[i].volatile_num;
1050                         ret = insert_rbtree(t, i, row, obj);
1051                 }
1052                 if (ret < 0)
1053                         goto err;
1054         }
1055         if (row_ptr)
1056                 *row_ptr = row;
1057         return 1;
1058 err: /* rollback changes, i.e. remove added entries from rbtrees */
1059         while (i)
1060                 remove_rb_node(t, i--, row);
1061         free(row);
1062         return ret;
1063 }
1064
1065 static void free_volatile_objects(const struct osl_table *t,
1066                 enum osl_close_flags flags)
1067 {
1068         int i, j;
1069         struct rb_node *n;
1070         struct osl_column *rb_col;
1071         const struct osl_column_description *cd;
1072
1073         if (!t->num_volatile_columns)
1074                 return;
1075         /* find the first rbtree column (any will do) */
1076         FOR_EACH_RBTREE_COLUMN(i, t, cd)
1077                 break;
1078         rb_col = t->columns + i;
1079         /* walk that rbtree and free all volatile objects */
1080         for (n = rb_first(&rb_col->rbtree); n; n = rb_next(n)) {
1081                 struct osl_row *r = get_row_pointer(n, rb_col->rbtree_num);
1082                 if (flags & OSL_FREE_VOLATILE)
1083                         FOR_EACH_VOLATILE_COLUMN(j, t, cd) {
1084                                 if (cd->storage_flags & OSL_DONT_FREE)
1085                                         continue;
1086                                 free(r->volatile_objects[
1087                                         t->columns[j].volatile_num].data);
1088                         }
1089 //                      for (j = 0; j < t->num_volatile_columns; j++)
1090 //                              free(r->volatile_objects[j].data);
1091                 free(r->volatile_objects);
1092         }
1093 }
1094
1095 /**
1096  * Erase all rbtree nodes and free resources.
1097  *
1098  * \param t Pointer to an open osl table.
1099  *
1100  * This function is called by osl_close_table().
1101  */
1102 void clear_rbtrees(struct osl_table *t)
1103 {
1104         const struct osl_column_description *cd;
1105         unsigned i, rbtrees_cleared = 0;
1106
1107         FOR_EACH_RBTREE_COLUMN(i, t, cd) {
1108                 struct osl_column *col = &t->columns[i];
1109                 struct rb_node *n;
1110                 rbtrees_cleared++;
1111                 for (n = rb_first(&col->rbtree); n;) {
1112                         struct osl_row *r;
1113                         rb_erase(n, &col->rbtree);
1114                         if (rbtrees_cleared == t->num_rbtrees) {
1115                                 r = get_row_pointer(n, col->rbtree_num);
1116                                 n = rb_next(n);
1117                                 free(r);
1118                         } else
1119                                 n = rb_next(n);
1120                 }
1121         }
1122
1123 }
1124
1125 /**
1126  * Close an osl table.
1127  *
1128  * \param t Pointer to the table to be closed.
1129  * \param flags Options for what should be cleaned up.
1130  *
1131  * If osl_open_table() succeeds, the resulting table pointer must later be
1132  * passed to this function in order to flush all changes to the file system and
1133  * to free the resources that were allocated by osl_open_table().
1134  *
1135  * \return Standard.
1136  *
1137  * \sa osl_open_table(), unmap_table().
1138  */
1139 __export int osl_close_table(struct osl_table *t, enum osl_close_flags flags)
1140 {
1141         int ret;
1142
1143         if (!t)
1144                 return -E_OSL_BAD_TABLE;
1145         free_volatile_objects(t, flags);
1146         clear_rbtrees(t);
1147         ret = unmap_table(t, flags);
1148         if (ret < 0)
1149                 ERROR_LOG("unmap_table failed: %d\n", ret);
1150         free(t->columns);
1151         free(t);
1152         return ret;
1153 }
1154
1155 /**
1156  * Find out whether the given row number corresponds to an invalid row.
1157  *
1158  * \param t Pointer to the osl table.
1159  * \param row_num The number of the row in question.
1160  *
1161  * By definition, a row is considered invalid if all its index entries
1162  * are invalid.
1163  *
1164  * \return Positive if \a row_num corresponds to an invalid row,
1165  * zero if it corresponds to a valid row, negative on errors.
1166  */
1167 int row_is_invalid(struct osl_table *t, uint32_t row_num)
1168 {
1169         char *row_index;
1170         int i, ret = get_row_index(t, row_num, &row_index);
1171
1172         if (ret < 0)
1173                 return ret;
1174         for (i = 0; i < t->row_index_size; i++) {
1175                 if ((unsigned char)row_index[i] != 0xff)
1176                         return 0;
1177         }
1178         INFO_LOG("row %d is invalid\n", row_num);
1179         return 1;
1180 }
1181
1182 /**
1183  * Invalidate a row of an osl table.
1184  *
1185  * \param t Pointer to an open osl table.
1186  * \param row_num Number of the row to mark as invalid.
1187  *
1188  * This function marks each mapped object in the index entry of \a row as
1189  * invalid.
1190  *
1191  * \return Standard.
1192  */
1193 int mark_row_invalid(struct osl_table *t, uint32_t row_num)
1194 {
1195         char *row_index;
1196         int ret = get_row_index(t, row_num, &row_index);
1197
1198         if (ret < 0)
1199                 return ret;
1200         INFO_LOG("marking row %d as invalid\n", row_num);
1201         memset(row_index, 0xff, t->row_index_size);
1202         return 1;
1203 }
1204
1205 /**
1206  * Initialize all rbtrees and compute number of invalid rows.
1207  *
1208  * \param t The table containing the rbtrees to be initialized.
1209  *
1210  * \return Standard.
1211  */
1212 int init_rbtrees(struct osl_table *t)
1213 {
1214         int i, ret;
1215         const struct osl_column_description *cd;
1216
1217         /* create rbtrees */
1218         FOR_EACH_RBTREE_COLUMN(i, t, cd)
1219                 t->columns[i].rbtree = RB_ROOT;
1220         /* add valid rows to rbtrees */
1221         t->num_invalid_rows = 0;
1222         for (i = 0; i < t->num_rows; i++) {
1223                 ret = row_is_invalid(t, i);
1224                 if (ret < 0)
1225                         return ret;
1226                 if (ret) {
1227                         t->num_invalid_rows++;
1228                         continue;
1229                 }
1230                 ret = add_row_to_rbtrees(t, i, NULL, NULL);
1231                 if (ret < 0)
1232                         return ret;
1233         }
1234         return 1;
1235 }
1236
1237 /**
1238  * Open an osl table.
1239  *
1240  * Each osl table must be opened before its data can be accessed.
1241  *
1242  * \param table_desc Describes the table to be opened.
1243  * \param result Contains a pointer to the open table on success.
1244  *
1245  * The table description given by \a desc should coincide with the
1246  * description used at creation time.
1247  *
1248  * \return Standard.
1249  */
1250 __export int osl_open_table(const struct osl_table_description *table_desc,
1251                 struct osl_table **result)
1252 {
1253         int i, ret;
1254         struct osl_table *t;
1255         const struct osl_column_description *cd;
1256
1257         INFO_LOG("opening table %s\n", table_desc->name);
1258         ret = init_table_structure(table_desc, &t);
1259         if (ret < 0)
1260                 return ret;
1261         FOR_EACH_DISK_STORAGE_COLUMN(i, t, cd) {
1262                 struct stat statbuf;
1263                 char *dirname = column_filename(t, i);
1264
1265                 ret = -ERRNO_TO_ERROR(ENOMEM);
1266                 if (!dirname)
1267                         goto err;
1268                 /* check if directory exists */
1269                 ret = stat(dirname, &statbuf);
1270                 free(dirname);
1271                 if (ret < 0) {
1272                         ret = -ERRNO_TO_ERROR(errno);
1273                         goto err;
1274                 }
1275                 ret = -ERRNO_TO_ERROR(ENOTDIR);
1276                 if (!S_ISDIR(statbuf.st_mode))
1277                         goto err;
1278         }
1279         ret = map_table(t, MAP_TBL_FL_VERIFY_INDEX);
1280         if (ret < 0)
1281                 goto err;
1282         t->num_rows = ret;
1283         DEBUG_LOG("num rows: %d\n", t->num_rows);
1284         ret = init_rbtrees(t);
1285         if (ret < 0) {
1286                 osl_close_table(t, OSL_MARK_CLEAN); /* ignore further errors */
1287                 return ret;
1288         }
1289         *result = t;
1290         return 1;
1291 err:
1292         free(t->columns);
1293         free(t);
1294         return ret;
1295 }
1296
1297 static int create_disk_storage_object_dir(const struct osl_table *t,
1298                 unsigned col_num, const char *ds_name)
1299 {
1300         char *dirname;
1301         int ret;
1302
1303         if (!(t->desc->flags & OSL_LARGE_TABLE))
1304                 return 1;
1305         dirname = disk_storage_dirname(t, col_num, ds_name);
1306         if (!dirname)
1307                 return -ERRNO_TO_ERROR(ENOMEM);
1308         ret = para_mkdir(dirname, 0777);
1309         free(dirname);
1310         if (ret < 0 && !is_errno(-ret, EEXIST))
1311                 return ret;
1312         return 1;
1313 }
1314
1315 static int write_disk_storage_file(const struct osl_table *t, unsigned col_num,
1316         const struct osl_object *obj, const char *ds_name)
1317 {
1318         int ret;
1319         char *filename;
1320
1321         ret = create_disk_storage_object_dir(t, col_num, ds_name);
1322         if (ret < 0)
1323                 return ret;
1324         filename = disk_storage_path(t, col_num, ds_name);
1325         if (!filename)
1326                 return -ERRNO_TO_ERROR(ENOMEM);
1327         ret = write_file(filename, obj->data, obj->size);
1328         free(filename);
1329         return ret;
1330 }
1331
1332 static int append_map_file(const struct osl_table *t, unsigned col_num,
1333         const struct osl_object *obj, uint32_t *new_size)
1334 {
1335         char *filename = column_filename(t, col_num);
1336         int ret;
1337         char header = 0; /* zero means valid object */
1338
1339         if (!filename)
1340                 return -ERRNO_TO_ERROR(ENOMEM);
1341         ret = append_file(filename, &header, 1, obj->data, obj->size,
1342                 new_size);
1343         free(filename);
1344         return ret;
1345 }
1346
1347 static int append_row_index(const struct osl_table *t, char *row_index)
1348 {
1349         char *filename;
1350         int ret;
1351
1352         if (!t->num_mapped_columns)
1353                 return 1;
1354         filename = index_filename(t->desc);
1355         if (!filename)
1356                 return -ERRNO_TO_ERROR(ENOMEM);
1357         ret = append_file(filename, NULL, 0, row_index,
1358                 t->row_index_size, NULL);
1359         free(filename);
1360         return ret;
1361 }
1362
1363 /**
1364  * A wrapper for truncate(2)
1365  *
1366  * \param path Name of the regular file to truncate
1367  * \param size Number of bytes to \b shave \b off
1368  *
1369  * Truncate the regular file named by \a path by \a size bytes.
1370  *
1371  * \return Standard.
1372  *
1373  * \sa truncate(2)
1374  */
1375 int para_truncate(const char *path, off_t size)
1376 {
1377         int ret;
1378         struct stat statbuf;
1379
1380         ret = -E_OSL_STAT;
1381         if (stat(path, &statbuf) < 0)
1382                 goto out;
1383         ret = -E_OSL_BAD_SIZE;
1384         if (statbuf.st_size < size)
1385                 goto out;
1386         ret = -E_OSL_TRUNC;
1387         if (truncate(path, statbuf.st_size - size) < 0)
1388                 goto out;
1389         ret = 1;
1390 out:
1391         return ret;
1392 }
1393
1394 static int truncate_mapped_file(const struct osl_table *t, unsigned col_num,
1395                 off_t size)
1396 {
1397         int ret;
1398         char *filename = column_filename(t, col_num);
1399
1400         if (!filename)
1401                 return -ERRNO_TO_ERROR(ENOMEM);
1402         ret = para_truncate(filename, size);
1403         free(filename);
1404         return ret;
1405 }
1406
1407 static int delete_disk_storage_file(const struct osl_table *t, unsigned col_num,
1408                 const char *ds_name)
1409 {
1410         char *dirname, *filename = disk_storage_path(t, col_num, ds_name);
1411         int ret, err;
1412
1413         if (!filename)
1414                 return -ERRNO_TO_ERROR(ENOMEM);
1415         ret = unlink(filename);
1416         err = errno;
1417         free(filename);
1418         if (ret < 0)
1419                 return -ERRNO_TO_ERROR(err);
1420         if (!(t->desc->flags & OSL_LARGE_TABLE))
1421                 return 1;
1422         dirname = disk_storage_dirname(t, col_num, ds_name);
1423         if (!dirname)
1424                 return -ERRNO_TO_ERROR(ENOMEM);
1425         rmdir(dirname);
1426         free(dirname);
1427         return 1;
1428 }
1429
1430 /**
1431  * Add a new row to an osl table and retrieve this row.
1432  *
1433  * \param t Pointer to an open osl table.
1434  * \param objects Array of objects to be added.
1435  * \param row Result pointer.
1436  *
1437  * The \a objects parameter must point to an array containing one object per
1438  * column.  The order of the objects in the array is given by the table
1439  * description of \a table. Several sanity checks are performed during object
1440  * insertion and the function returns without modifying the table if any of
1441  * these tests fail.  In fact, it is atomic in the sense that it either
1442  * succeeds or leaves the table unchanged (i.e. either all or none of the
1443  * objects are added to the table).
1444  *
1445  * It is considered an error if an object is added to a column with associated
1446  * rbtree if this object is equal to an object already contained in that column
1447  * (i.e. the compare function for the column's rbtree returns zero).
1448  *
1449  * \return Standard.
1450  *
1451  * \sa struct osl_table_description, osl_compare_func, osl_add_row().
1452  */
1453 __export int osl_add_and_get_row(struct osl_table *t, struct osl_object *objects,
1454                 struct osl_row **row)
1455 {
1456         int i, ret;
1457         char *ds_name = NULL;
1458         struct rb_node **rb_parents = NULL, ***rb_links = NULL;
1459         char *new_row_index = NULL;
1460         struct osl_object *volatile_objs = NULL;
1461         const struct osl_column_description *cd;
1462
1463         if (!t)
1464                 return -E_OSL_BAD_TABLE;
1465         rb_parents = malloc(t->num_rbtrees * sizeof(struct rn_node*));
1466         if (!rb_parents)
1467                 return -ERRNO_TO_ERROR(ENOMEM);
1468         rb_links = malloc(t->num_rbtrees * sizeof(struct rn_node**));
1469         if (!rb_links) {
1470                 free(rb_parents);
1471                 return -ERRNO_TO_ERROR(ENOMEM);
1472         }
1473         if (t->num_mapped_columns) {
1474                 new_row_index = malloc(t->row_index_size);
1475                 if (!new_row_index) {
1476                         free(rb_links);
1477                         free(rb_parents);
1478                         return -ERRNO_TO_ERROR(ENOMEM);
1479                 }
1480         }
1481         /* pass 1: sanity checks */
1482 //      DEBUG_LOG("sanity tests: %p:%p\n", objects[0].data,
1483 //              objects[1].data);
1484         FOR_EACH_COLUMN(i, t->desc, cd) {
1485                 enum osl_storage_type st = cd->storage_type;
1486                 enum osl_storage_flags sf = cd->storage_flags;
1487
1488 //              ret = -E_OSL_NULL_OBJECT;
1489 //              if (!objects[i])
1490 //                      goto out;
1491                 if (st == OSL_DISK_STORAGE)
1492                         continue;
1493                 if (sf & OSL_RBTREE) {
1494                         unsigned rbtree_num = t->columns[i].rbtree_num;
1495                         ret = -E_OSL_RB_KEY_EXISTS;
1496 //                      DEBUG_LOG("checking whether %p exists\n",
1497 //                              objects[i].data);
1498                         if (search_rbtree(objects + i, t, i,
1499                                         &rb_parents[rbtree_num],
1500                                         &rb_links[rbtree_num]) > 0)
1501                                 goto out;
1502                 }
1503                 if (sf & OSL_FIXED_SIZE) {
1504 //                      DEBUG_LOG("fixed size. need: %zu, have: %d\n",
1505 //                              objects[i].size, cd->data_size);
1506                         ret = -E_OSL_BAD_DATA_SIZE;
1507                         if (objects[i].size != cd->data_size)
1508                                 goto out;
1509                 }
1510         }
1511         if (t->num_disk_storage_columns) {
1512                 ds_name = disk_storage_name_of_object(t,
1513                         &objects[t->disk_storage_name_column]);
1514                 ret = -ERRNO_TO_ERROR(ENOMEM);
1515                 if (!ds_name)
1516                         goto out;
1517         }
1518         ret = unmap_table(t, OSL_MARK_CLEAN);
1519         if (ret < 0)
1520                 goto out;
1521 //      DEBUG_LOG("sanity tests passed%s\n", "");
1522         /* pass 2: create data files, append map data */
1523         FOR_EACH_COLUMN(i, t->desc, cd) {
1524                 enum osl_storage_type st = cd->storage_type;
1525                 if (st == OSL_NO_STORAGE)
1526                         continue;
1527                 if (st == OSL_MAPPED_STORAGE) {
1528                         uint32_t new_size;
1529                         struct osl_column *col = &t->columns[i];
1530 //                      DEBUG_LOG("appending object of size %zu\n",
1531 //                              objects[i].size);
1532                         ret = append_map_file(t, i, objects + i, &new_size);
1533                         if (ret < 0)
1534                                 goto rollback;
1535                         update_cell_index(new_row_index, col, new_size,
1536                                 objects[i].size);
1537                         continue;
1538                 }
1539                 /* DISK_STORAGE */
1540                 ret = write_disk_storage_file(t, i, objects + i, ds_name);
1541                 if (ret < 0)
1542                         goto rollback;
1543         }
1544         ret = append_row_index(t, new_row_index);
1545         if (ret < 0)
1546                 goto rollback;
1547         ret = map_table(t, MAP_TBL_FL_VERIFY_INDEX);
1548         if (ret < 0) { /* truncate index and rollback changes */
1549                 char *filename = index_filename(t->desc);
1550                 if (filename)
1551                         para_truncate(filename, t->row_index_size);
1552                 free(filename);
1553                 goto rollback;
1554         }
1555         /* pass 3: add entry to rbtrees */
1556         if (t->num_volatile_columns) {
1557                 ret = -ERRNO_TO_ERROR(ENOMEM);
1558                 volatile_objs = calloc(t->num_volatile_columns,
1559                         sizeof(struct osl_object));
1560                 if (!volatile_objs)
1561                         goto out;
1562                 FOR_EACH_VOLATILE_COLUMN(i, t, cd)
1563                         volatile_objs[t->columns[i].volatile_num] = objects[i];
1564         }
1565         t->num_rows++;
1566 //      DEBUG_LOG("adding new entry as row #%d\n", t->num_rows - 1);
1567         ret = add_row_to_rbtrees(t, t->num_rows - 1, volatile_objs, row);
1568         if (ret < 0)
1569                 goto out;
1570 //      DEBUG_LOG("added new entry as row #%d\n", t->num_rows - 1);
1571         ret = 1;
1572         goto out;
1573 rollback: /* rollback all changes made, ignore further errors */
1574         for (i--; i >= 0; i--) {
1575                 cd = get_column_description(t->desc, i);
1576                 enum osl_storage_type st = cd->storage_type;
1577                 if (st == OSL_NO_STORAGE)
1578                         continue;
1579
1580                 if (st == OSL_MAPPED_STORAGE)
1581                         truncate_mapped_file(t, i, objects[i].size);
1582                 else /* disk storage */
1583                         delete_disk_storage_file(t, i, ds_name);
1584         }
1585         /* ignore error and return previous error value */
1586         map_table(t, MAP_TBL_FL_VERIFY_INDEX);
1587 out:
1588         free(new_row_index);
1589         free(ds_name);
1590         free(rb_parents);
1591         free(rb_links);
1592         return ret;
1593 }
1594
1595 /**
1596  * Add a new row to an osl table.
1597  *
1598  * \param t Same meaning as osl_add_and_get_row().
1599  * \param objects Same meaning as osl_add_and_get_row().
1600  *
1601  * \return The return value of the underlying call to osl_add_and_get_row().
1602  *
1603  * This is equivalent to osl_add_and_get_row(t, objects, NULL).
1604  */
1605 __export int osl_add_row(struct osl_table *t, struct osl_object *objects)
1606 {
1607         return osl_add_and_get_row(t, objects, NULL);
1608 }
1609
1610 /**
1611  * Retrieve an object identified by row and column
1612  *
1613  * \param t Pointer to an open osl table.
1614  * \param r Pointer to the row.
1615  * \param col_num The column number.
1616  * \param object The result pointer.
1617  *
1618  * The column determined by \a col_num must be of type \p OSL_MAPPED_STORAGE
1619  * or \p OSL_NO_STORAGE, i.e. no disk storage objects may be retrieved by this
1620  * function.
1621  *
1622  * \return Standard.
1623  *
1624  * \sa osl_storage_type, osl_open_disk_object().
1625  */
1626 __export int osl_get_object(const struct osl_table *t, const struct osl_row *r,
1627         unsigned col_num, struct osl_object *object)
1628 {
1629         const struct osl_column_description *cd;
1630
1631         if (!t)
1632                 return -E_OSL_BAD_TABLE;
1633         cd = get_column_description(t->desc, col_num);
1634         /* col must not be disk storage */
1635         if (cd->storage_type == OSL_DISK_STORAGE)
1636                 return -E_OSL_BAD_STORAGE_TYPE;
1637         if (cd->storage_type == OSL_MAPPED_STORAGE)
1638                 return get_mapped_object(t, col_num, r->num, object);
1639         /* volatile */
1640         *object = r->volatile_objects[t->columns[col_num].volatile_num];
1641         return 1;
1642 }
1643
1644 static int mark_mapped_object_invalid(const struct osl_table *t,
1645                 uint32_t row_num, unsigned col_num)
1646 {
1647         struct osl_object obj;
1648         char *p;
1649         int ret = get_mapped_object(t, col_num, row_num, &obj);
1650
1651         if (ret < 0)
1652                 return ret;
1653         p = obj.data;
1654         p--;
1655         *p = 0xff;
1656         return 1;
1657 }
1658
1659 /**
1660  * Delete a row from an osl table.
1661  *
1662  * \param t Pointer to an open osl table.
1663  * \param row Pointer to the row to delete.
1664  *
1665  * This removes all disk storage objects, removes all rbtree nodes,  and frees
1666  * all volatile objects belonging to the given row. For mapped columns, the
1667  * data is merely marked invalid and may be pruned from time to time by
1668  * para_fsck.
1669  *
1670  * \return Standard.
1671  */
1672 __export int osl_del_row(struct osl_table *t, struct osl_row *row)
1673 {
1674         struct osl_row *r = row;
1675         int i, ret;
1676         const struct osl_column_description *cd;
1677
1678         if (!t)
1679                 return -E_OSL_BAD_TABLE;
1680         INFO_LOG("deleting row %p\n", row);
1681
1682         if (t->num_disk_storage_columns) {
1683                 char *ds_name;
1684                 ret = disk_storage_name_of_row(t, r, &ds_name);
1685                 if (ret < 0)
1686                         goto out;
1687                 FOR_EACH_DISK_STORAGE_COLUMN(i, t, cd)
1688                         delete_disk_storage_file(t, i, ds_name);
1689                 free(ds_name);
1690         }
1691         FOR_EACH_COLUMN(i, t->desc, cd) {
1692                 struct osl_column *col = t->columns + i;
1693                 enum osl_storage_type st = cd->storage_type;
1694                 remove_rb_node(t, i, r);
1695                 if (st == OSL_MAPPED_STORAGE) {
1696                         mark_mapped_object_invalid(t, r->num, i);
1697                         continue;
1698                 }
1699                 if (st == OSL_NO_STORAGE && !(cd->storage_flags & OSL_DONT_FREE))
1700                         free(r->volatile_objects[col->volatile_num].data);
1701         }
1702         if (t->num_mapped_columns) {
1703                 ret = mark_row_invalid(t, r->num);
1704                 if (ret < 0)
1705                         goto out;
1706                 t->num_invalid_rows++;
1707         } else
1708                 t->num_rows--;
1709         ret = 1;
1710 out:
1711         free(r->volatile_objects);
1712         free(r);
1713         return ret;
1714 }
1715
1716 /* test if column has an rbtree */
1717 static int check_rbtree_col(const struct osl_table *t, unsigned col_num,
1718                 struct osl_column **col)
1719 {
1720         if (!t)
1721                 return -E_OSL_BAD_TABLE;
1722         if (!(get_column_description(t->desc, col_num)->storage_flags & OSL_RBTREE))
1723                 return -E_OSL_BAD_STORAGE_FLAGS;
1724         *col = t->columns + col_num;
1725         return 1;
1726 }
1727
1728 /**
1729  * Get the row that contains the given object.
1730  *
1731  * \param t Pointer to an open osl table.
1732  * \param col_num The number of the column to be searched.
1733  * \param obj The object to be looked up.
1734  * \param result Points to the row containing \a obj.
1735  *
1736  * Lookup \a obj in \a t and return the row containing \a obj. The column
1737  * specified by \a col_num must have an associated rbtree.
1738  *
1739  * \return Standard.
1740  *
1741  * \sa osl_storage_flags
1742  */
1743 __export int osl_get_row(const struct osl_table *t, unsigned col_num,
1744                 const struct osl_object *obj, struct osl_row **result)
1745 {
1746         int ret;
1747         struct rb_node *node;
1748         struct osl_row *row;
1749         struct osl_column *col;
1750
1751         *result = NULL;
1752         ret = check_rbtree_col(t, col_num, &col);
1753         if (ret < 0)
1754                 return ret;
1755         ret = search_rbtree(obj, t, col_num, &node, NULL);
1756         if (ret < 0)
1757                 return ret;
1758         row = get_row_pointer(node, t->columns[col_num].rbtree_num);
1759         *result = row;
1760         return 1;
1761 }
1762
1763 static int rbtree_loop(struct osl_column *col,  void *private_data,
1764                 osl_rbtree_loop_func *func)
1765 {
1766         struct rb_node *n, *tmp;
1767
1768         /* this for-loop is safe against removal of an entry */
1769         for (n = rb_first(&col->rbtree), tmp = n? rb_next(n) : NULL;
1770                         n;
1771                         n = tmp, tmp = tmp? rb_next(tmp) : NULL) {
1772                 struct osl_row *r = get_row_pointer(n, col->rbtree_num);
1773                 int ret = func(r, private_data);
1774                 if (ret < 0)
1775                         return ret;
1776         }
1777         return 1;
1778 }
1779
1780 static int rbtree_loop_reverse(struct osl_column *col,  void *private_data,
1781                 osl_rbtree_loop_func *func)
1782 {
1783         struct rb_node *n, *tmp;
1784
1785         /* safe against removal of an entry */
1786         for (n = rb_last(&col->rbtree), tmp = n? rb_prev(n) : NULL;
1787                         n;
1788                         n = tmp, tmp = tmp? rb_prev(tmp) : NULL) {
1789                 struct osl_row *r = get_row_pointer(n, col->rbtree_num);
1790                 int ret = func(r, private_data);
1791                 if (ret < 0)
1792                         return ret;
1793         }
1794         return 1;
1795 }
1796
1797 /**
1798  * Loop over all nodes in an rbtree.
1799  *
1800  * \param t Pointer to an open osl table.
1801  * \param col_num The column to use for iterating over the elements.
1802  * \param private_data Pointer that gets passed to \a func.
1803  * \param func The function to be called for each node in the rbtree.
1804  *
1805  * This function does an in-order walk of the rbtree associated with \a
1806  * col_num. It is an error if the \p OSL_RBTREE flag is not set for this
1807  * column. For each node in the rbtree, the given function \a func is called
1808  * with two pointers as arguments: The first osl_row* argument points to the
1809  * row that contains the object corresponding to the rbtree node currently
1810  * traversed, and the \a private_data pointer is passed verbatim to \a func as the
1811  * second argument. The loop terminates either if \a func returns a negative
1812  * value, or if all nodes of the tree have been visited.
1813  *
1814  *
1815  * \return Standard. If the termination of the loop was caused by \a func
1816  * returning a negative value, this value is returned.
1817  *
1818  * \sa osl_storage_flags, osl_rbtree_loop_reverse(), osl_compare_func.
1819  */
1820 __export int osl_rbtree_loop(const struct osl_table *t, unsigned col_num,
1821         void *private_data, osl_rbtree_loop_func *func)
1822 {
1823         struct osl_column *col;
1824
1825         int ret = check_rbtree_col(t, col_num, &col);
1826         if (ret < 0)
1827                 return ret;
1828         return rbtree_loop(col, private_data, func);
1829 }
1830
1831 /**
1832  * Loop over all nodes in an rbtree in reverse order.
1833  *
1834  * \param t Identical meaning as in \p osl_rbtree_loop().
1835  * \param col_num Identical meaning as in \p osl_rbtree_loop().
1836  * \param private_data Identical meaning as in \p osl_rbtree_loop().
1837  * \param func Identical meaning as in \p osl_rbtree_loop().
1838  *
1839  * This function is identical to \p osl_rbtree_loop(), the only difference
1840  * is that the tree is walked in reverse order.
1841  *
1842  * \return The same return value as \p osl_rbtree_loop().
1843  *
1844  * \sa osl_rbtree_loop().
1845  */
1846 __export int osl_rbtree_loop_reverse(const struct osl_table *t, unsigned col_num,
1847         void *private_data, osl_rbtree_loop_func *func)
1848 {
1849         struct osl_column *col;
1850
1851         int ret = check_rbtree_col(t, col_num, &col);
1852         if (ret < 0)
1853                 return ret;
1854         return rbtree_loop_reverse(col, private_data, func);
1855 }
1856
1857 /* TODO: Rollback changes on errors */
1858 static int rename_disk_storage_objects(struct osl_table *t,
1859                 struct osl_object *old_obj, struct osl_object *new_obj)
1860 {
1861         int i, ret;
1862         const struct osl_column_description *cd;
1863         char *old_ds_name, *new_ds_name;
1864
1865         if (!t->num_disk_storage_columns)
1866                 return 1; /* nothing to do */
1867         if (old_obj->size == new_obj->size && !memcmp(new_obj->data,
1868                         old_obj->data, new_obj->size))
1869                 return 1; /* object did not change */
1870         old_ds_name = disk_storage_name_of_object(t, old_obj);
1871         new_ds_name = disk_storage_name_of_object(t, new_obj);
1872         ret = -ERRNO_TO_ERROR(ENOMEM);
1873         if (!old_ds_name || ! new_ds_name)
1874                 goto out;
1875
1876         FOR_EACH_DISK_STORAGE_COLUMN(i, t, cd) {
1877                 char *old_filename, *new_filename;
1878                 ret = create_disk_storage_object_dir(t, i, new_ds_name);
1879                 if (ret < 0)
1880                         goto out;
1881                 old_filename = disk_storage_path(t, i, old_ds_name);
1882                 new_filename = disk_storage_path(t, i, new_ds_name);
1883                 if (!old_filename || !new_filename)
1884                         ret = -ERRNO_TO_ERROR(ENOMEM);
1885                 else
1886                         ret = para_rename(old_filename, new_filename);
1887                 free(old_filename);
1888                 free(new_filename);
1889                 if (ret < 0)
1890                         goto out;
1891         }
1892         ret = 1;
1893 out:
1894         free(old_ds_name);
1895         free(new_ds_name);
1896         return ret;
1897
1898 }
1899
1900 /**
1901  * Change an object in an osl table.
1902  *
1903  * \param t Pointer to an open osl table.
1904  * \param r Pointer to the row containing the object to be updated.
1905  * \param col_num Number of the column containing the object to be updated.
1906  * \param obj Pointer to the replacement object.
1907  *
1908  * This function  gets rid of all references to the old object. This includes
1909  * removal of the rbtree node in case there is an rbtree associated with \a
1910  * col_num. It then inserts \a obj into the table and the rbtree if necessary.
1911  *
1912  * If the \p OSL_RBTREE flag is set for \a col_num, you \b MUST call this
1913  * function in order to change the contents of an object, even for volatile or
1914  * mapped columns of constant size (which may be updated directly if \p
1915  * OSL_RBTREE is not set).  Otherwise the rbtree might become corrupted.
1916  *
1917  * \return Standard
1918  */
1919 __export int osl_update_object(struct osl_table *t, const struct osl_row *r,
1920                 unsigned col_num, struct osl_object *obj)
1921 {
1922         struct osl_column *col;
1923         const struct osl_column_description *cd;
1924         int ret;
1925
1926         if (!t)
1927                 return -E_OSL_BAD_TABLE;
1928         col = &t->columns[col_num];
1929         cd = get_column_description(t->desc, col_num);
1930         DEBUG_LOG("updating column %u of %s\n", col_num, t->desc->name);
1931         if (cd->storage_flags & OSL_RBTREE) {
1932                 if (search_rbtree(obj, t, col_num, NULL, NULL) > 0)
1933                         return -E_OSL_RB_KEY_EXISTS;
1934         }
1935         if (cd->storage_flags & OSL_FIXED_SIZE) {
1936                 if (obj->size != cd->data_size)
1937                         return -E_OSL_BAD_DATA_SIZE;
1938         }
1939         remove_rb_node(t, col_num, r);
1940         if (cd->storage_type == OSL_NO_STORAGE) { /* TODO: If fixed size, reuse object? */
1941                 free(r->volatile_objects[col->volatile_num].data);
1942                 r->volatile_objects[col->volatile_num] = *obj;
1943         } else if (cd->storage_type == OSL_DISK_STORAGE) {
1944                 char *ds_name;
1945                 ret = disk_storage_name_of_row(t, r, &ds_name);
1946                 if (ret < 0)
1947                         return ret;
1948                 ret = delete_disk_storage_file(t, col_num, ds_name);
1949                 if (ret < 0 && !is_errno(-ret, ENOENT)) {
1950                         free(ds_name);
1951                         return ret;
1952                 }
1953                 ret = write_disk_storage_file(t, col_num, obj, ds_name);
1954                 free(ds_name);
1955                 if (ret < 0)
1956                         return ret;
1957         } else { /* mapped storage */
1958                 struct osl_object old_obj;
1959                 ret = get_mapped_object(t, col_num, r->num, &old_obj);
1960                 if (ret < 0)
1961                         return ret;
1962                 /*
1963                  * If the updated column is the disk storage name column, the
1964                  * disk storage name changes, so we have to rename all disk
1965                  * storage objects accordingly.
1966                  */
1967                 if (col_num == t->disk_storage_name_column) {
1968                         ret = rename_disk_storage_objects(t, &old_obj, obj);
1969                         if (ret < 0)
1970                                 return ret;
1971                 }
1972                 if (cd->storage_flags & OSL_FIXED_SIZE)
1973                         memcpy(old_obj.data, obj->data, cd->data_size);
1974                 else { /* TODO: if the size doesn't change, use old space */
1975                         uint32_t new_data_map_size;
1976                         char *row_index;
1977                         ret = get_row_index(t, r->num, &row_index);
1978                         if (ret < 0)
1979                                 return ret;
1980                         ret = mark_mapped_object_invalid(t, r->num, col_num);
1981                         if (ret < 0)
1982                                 return ret;
1983                         unmap_column(t, col_num);
1984                         ret = append_map_file(t, col_num, obj,
1985                                 &new_data_map_size);
1986                         if (ret < 0)
1987                                 return ret;
1988                         ret = map_column(t, col_num);
1989                         if (ret < 0)
1990                                 return ret;
1991                         update_cell_index(row_index, col, new_data_map_size,
1992                                 obj->size);
1993                 }
1994         }
1995         if (cd->storage_flags & OSL_RBTREE) {
1996                 ret = insert_rbtree(t, col_num, r, obj);
1997                 if (ret < 0)
1998                         return ret;
1999         }
2000         return 1;
2001 }
2002
2003 /**
2004  * Retrieve an object of type \p OSL_DISK_STORAGE by row and column.
2005  *
2006  * \param t Pointer to an open osl table.
2007  * \param r Pointer to the row containing the object.
2008  * \param col_num The column number.
2009  * \param obj Points to the result upon successful return.
2010  *
2011  * For columns of type \p OSL_DISK_STORAGE, this function must be used to
2012  * retrieve one of its containing objects. Afterwards, osl_close_disk_object()
2013  * must be called in order to deallocate the resources.
2014  *
2015  * \return Standard.
2016  *
2017  * \sa osl_get_object(), osl_storage_type, osl_close_disk_object().
2018  */
2019 __export int osl_open_disk_object(const struct osl_table *t, const struct osl_row *r,
2020                 unsigned col_num, struct osl_object *obj)
2021 {
2022         const struct osl_column_description *cd;
2023         char *ds_name, *filename;
2024         int ret;
2025
2026         if (!t)
2027                 return -E_OSL_BAD_TABLE;
2028         cd = get_column_description(t->desc, col_num);
2029         if (cd->storage_type != OSL_DISK_STORAGE)
2030                 return -E_OSL_BAD_STORAGE_TYPE;
2031
2032         ret = disk_storage_name_of_row(t, r, &ds_name);
2033         if (ret < 0)
2034                 return ret;
2035         filename = disk_storage_path(t, col_num, ds_name);
2036         free(ds_name);
2037         if (!filename)
2038                 return -ERRNO_TO_ERROR(ENOMEM);
2039         DEBUG_LOG("filename: %s\n", filename);
2040         ret = mmap_full_file(filename, O_RDONLY, &obj->data, &obj->size, NULL);
2041         free(filename);
2042         return ret;
2043 }
2044
2045 /**
2046  * Free resources that were allocated during osl_open_disk_object().
2047  *
2048  * \param obj Pointer to the object previously returned by open_disk_object().
2049  *
2050  * \return The return value of the underlying call to para_munmap().
2051  *
2052  * \sa para_munmap().
2053  */
2054 __export int osl_close_disk_object(struct osl_object *obj)
2055 {
2056         return para_munmap(obj->data, obj->size);
2057 }
2058
2059 /**
2060  * Get the number of rows of the given table.
2061  *
2062  * \param t Pointer to an open osl table.
2063  * \param num_rows Result is returned here.
2064  *
2065  * The number of rows returned via \a num_rows excluding any invalid rows.
2066  *
2067  * \return Positive on success, \p -E_OSL_BAD_TABLE if \a t is \p NULL.
2068  */
2069 __export int osl_get_num_rows(const struct osl_table *t, unsigned *num_rows)
2070 {
2071         if (!t)
2072                 return -E_OSL_BAD_TABLE;
2073         assert(t->num_rows >= t->num_invalid_rows);
2074         *num_rows = t->num_rows - t->num_invalid_rows;
2075         return 1;
2076 }
2077
2078 /**
2079  * Get the rank of a row.
2080  *
2081  * \param t An open osl table.
2082  * \param r The row to get the rank of.
2083  * \param col_num The number of an rbtree column.
2084  * \param rank Result pointer.
2085  *
2086  * The rank is, by definition, the position of the row in the linear order
2087  * determined by an in-order tree walk of the rbtree associated with column
2088  * number \a col_num of \a table.
2089  *
2090  * \return Standard.
2091  *
2092  * \sa osl_get_nth_row().
2093  */
2094 __export int osl_get_rank(const struct osl_table *t, struct osl_row *r,
2095                 unsigned col_num, unsigned *rank)
2096 {
2097         struct osl_object obj;
2098         struct osl_column *col;
2099         struct rb_node *node;
2100         int ret = check_rbtree_col(t, col_num, &col);
2101
2102         if (ret < 0)
2103                 return ret;
2104         ret = osl_get_object(t, r, col_num, &obj);
2105         if (ret < 0)
2106                 return ret;
2107         ret = search_rbtree(&obj, t, col_num, &node, NULL);
2108         if (ret < 0)
2109                 return ret;
2110         ret = rb_rank(node, rank);
2111         if (ret < 0)
2112                 return -E_OSL_BAD_ROW;
2113         return 1;
2114 }
2115
2116 /**
2117  * Get the row with n-th greatest value.
2118  *
2119  * \param t Pointer to an open osl table.
2120  * \param col_num The column number.
2121  * \param n The rank of the desired row.
2122  * \param result Row is returned here.
2123  *
2124  * Retrieve the n-th order statistic with respect to the compare function
2125  * of the rbtree column \a col_num. In other words, get that row with
2126  * \a n th greatest value in column \a col_num. It's an error if
2127  * \a col_num is not a rbtree column, or if \a n is larger than the
2128  * number of rows in the table.
2129  *
2130  * \return Standard.
2131  *
2132  * \sa osl_storage_flags, osl_compare_func, osl_get_row(),
2133  * osl_rbtree_last_row(), osl_rbtree_first_row(), osl_get_rank().
2134  */
2135 __export int osl_get_nth_row(const struct osl_table *t, unsigned col_num,
2136                 unsigned n, struct osl_row **result)
2137 {
2138         struct osl_column *col;
2139         struct rb_node *node;
2140         unsigned num_rows;
2141         int ret;
2142
2143         if (n == 0)
2144                 return -E_OSL_RB_KEY_NOT_FOUND;
2145         ret = osl_get_num_rows(t, &num_rows);
2146         if (ret < 0)
2147                 return ret;
2148         if (n > num_rows)
2149                 return -E_OSL_RB_KEY_NOT_FOUND;
2150         ret = check_rbtree_col(t, col_num, &col);
2151         if (ret < 0)
2152                 return ret;
2153         node = rb_nth(col->rbtree.rb_node, n);
2154         if (!node)
2155                 return -E_OSL_RB_KEY_NOT_FOUND;
2156         *result = get_row_pointer(node, col->rbtree_num);
2157         return 1;
2158 }
2159
2160 /**
2161  * Get the row corresponding to the smallest rbtree node of a column.
2162  *
2163  * \param t An open rbtree table.
2164  * \param col_num The number of the rbtree column.
2165  * \param result A pointer to the first row is returned here.
2166  *
2167  * The rbtree node of the smallest object (with respect to the corresponding
2168  * compare function) is selected and the row containing this object is
2169  * returned. It is an error if \a col_num refers to a column without an
2170  * associated rbtree.
2171  *
2172  * \return Standard.
2173  *
2174  * \sa osl_get_nth_row(), osl_rbtree_last_row().
2175  */
2176 __export int osl_rbtree_first_row(const struct osl_table *t, unsigned col_num,
2177                 struct osl_row **result)
2178 {
2179         return osl_get_nth_row(t, col_num, 1, result);
2180 }
2181
2182 /**
2183  * Get the row corresponding to the greatest rbtree node of a column.
2184  *
2185  * \param t The same meaning as in \p osl_rbtree_first_row().
2186  * \param col_num The same meaning as in \p osl_rbtree_first_row().
2187  * \param result The same meaning as in \p osl_rbtree_first_row().
2188  *
2189  * This function works just like osl_rbtree_first_row(), the only difference
2190  * is that the row containing the greatest rather than the smallest object is
2191  * returned.
2192  *
2193  * \return Standard.
2194  *
2195  * \sa osl_get_nth_row(), osl_rbtree_first_row().
2196  */
2197 __export int osl_rbtree_last_row(const struct osl_table *t, unsigned col_num,
2198                 struct osl_row **result)
2199 {
2200         unsigned num_rows;
2201         int ret = osl_get_num_rows(t, &num_rows);
2202
2203         if (ret < 0)
2204                 return ret;
2205         return osl_get_nth_row(t, col_num, num_rows, result);
2206 }