From 29712a5098842ea3930ec00ddd1c0b9d264ba9b5 Mon Sep 17 00:00:00 2001 From: Gediminas Jakutis Date: Sun, 21 Feb 2021 13:16:54 +0200 Subject: in-memory array sorting: GET! lists need a bit more work and after that, in-file should shortly follow. Signed-off-by: Gediminas Jakutis --- src/cache.c | 15 +-- src/cache.h | 1 + src/defs.h | 10 +- src/io.c | 241 ---------------------------------------------- src/io.h | 19 ---- src/main.c | 74 ++++++++------ src/mergesort.c | 32 ++++--- src/mergesort.h | 1 - src/meson.build | 4 +- src/stream.c | 293 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/stream.h | 21 ++++ 11 files changed, 394 insertions(+), 317 deletions(-) delete mode 100644 src/io.c delete mode 100644 src/io.h create mode 100644 src/stream.c create mode 100644 src/stream.h diff --git a/src/cache.c b/src/cache.c index f8a2026..19c5a92 100644 --- a/src/cache.c +++ b/src/cache.c @@ -8,8 +8,8 @@ #include #include #include "defs.h" - -static int cache_rewind(struct stream * const in); +#include "stream.h" +#include "cache.h" int cache_create(struct stream * const in) { @@ -34,6 +34,7 @@ int cache_populate(struct stream * const in) struct entry_l *tmp; try(in->settings->access != cached, err, EINVAL, "cannot populate cache: stream is uncached"); + try(!in->cache, err, EINVAL, "stream has no cache allocated"); /* if reading a a randstream, fall back to the one-element-at-a-time mode */ if (in->type == stream_randread) { @@ -48,7 +49,7 @@ int cache_populate(struct stream * const in) } } } else if (in->type == stream_in) { - try(1, err, ENOSYS, "populating cache from file is a TODO"); + try_s((ret == stream_readfile(in)), err); } else { try(1, err, EINVAL, "cannot populate a non-reading stream cache"); } @@ -94,7 +95,7 @@ int cache_block_copy(struct stream const * const src, struct stream * const dest try(src->settings->access != cached || dest->settings->access != cached, err, EINVAL, "cannot cache-copy between uncached streams"); try(!src->cache, err, EINVAL, "no cache to transfer"); - try(src->n > dest->settings->to, err, EINVAL, "invalid copy size"); + try(src->n < dest->settings->to, err, EINVAL, "invalid copy size"); try(!dest->cache, err, EINVAL, "no cache to transfer to"); memcpy(dest->cache, src->cache_a + dest->settings->ss, (dest->settings->to - dest->settings->ss) * dest->settings->stride); @@ -135,13 +136,13 @@ int cache_block_split(struct stream * const src, struct stream * const A, struct A->n = src->n / 2; B->n = src->n / 2 + (src->n & 1ul); A->index = B->index = 0; - A->parentid = B->parentid = random(); /* generate random parent ID that needs to match between children */ - A->type = B->type = stream_invalid; /* disallow any stream operations other than split/merge on children */ + A->type = B->type = stream_cache; /* disallow any stream operations other than split/merge on children */ A->fd = B->fd = -1; /* if we're splitting, these are for holding cache only */ /* we only care about these three functions for these temporary streams */ A->get_next_element_cache = B->get_next_element_cache = src->get_next_element_cache; A->place_next_element_cache = B->place_next_element_cache = src->place_next_element_cache; A->split = B->split = src->split; + A->rewind = B->rewind = src->rewind; tmp_settings = *src->settings; A->settings = B->settings = &tmp_settings; @@ -158,7 +159,7 @@ int cache_block_split(struct stream * const src, struct stream * const A, struct try_s((ret = cache_create(B)), err); try_s((ret = cache_block_copy(src, B)), err); - A->settings = B->settings = &tmp_settings; + A->settings = B->settings = src->settings; err: return ret; diff --git a/src/cache.h b/src/cache.h index 63f5560..dc22836 100644 --- a/src/cache.h +++ b/src/cache.h @@ -19,6 +19,7 @@ int cache_block_copy(struct stream const * const src, struct stream * const dest int cache_list_copy(struct stream * const src, struct stream * const dest); int cache_block_split(struct stream * const src, struct stream * const A, struct stream * const B); int cache_list_split(struct stream * const src, struct stream * const A, struct stream * const B); +int cache_rewind(struct stream * const in); /* GET */ struct entry_l *cached_get_array(struct stream * const in); diff --git a/src/defs.h b/src/defs.h index 8396015..425e97c 100644 --- a/src/defs.h +++ b/src/defs.h @@ -45,10 +45,6 @@ #define get(in) (in->settings->access == cached ? in->get_next_element_cache(in) : in->get_next_element_direct(in)) #define put(in, data) (in->settings->access == cached ? in->place_next_element_cache(in, data) : in->place_next_element_direct(in, data)) -#define stream_blank .fd = -1, .settings = &settings, .get_next_element_direct = stub_getnext, \ - .get_next_element_cache = stub_getnext, .place_next_element_direct = stub_put, \ - .place_next_element_cache = stub_put, .split = stub_split, .flush = stub_flush - union nextoff { struct entry_l *next; ptrdiff_t offset; @@ -89,15 +85,15 @@ enum dataformat { }; enum streamtype { - stream_invalid = -1, + stream_invalid, stream_in, stream_out, stream_outlite, + stream_cache, stream_randread }; struct stream { - long int parentid; size_t n; int fd; enum streamtype type; @@ -112,7 +108,7 @@ struct stream { int (*place_next_element_direct)(struct stream * const, struct entry_l const * const); int (*place_next_element_cache)(struct stream * const, struct entry_l const * const); int (*split)(struct stream * const, struct stream * const, struct stream * const); - int (*flush)(struct stream * const); + int (*rewind)(struct stream * const); }; struct settings { diff --git a/src/io.c b/src/io.c deleted file mode 100644 index 5f86678..0000000 --- a/src/io.c +++ /dev/null @@ -1,241 +0,0 @@ -/* SPDX-License-Identifier: LGPL-2.1-only */ - -/* Copyright (C) 2020 Gediminas Jakutis */ - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include "io.h" -#include "defs.h" -#include "datagen.h" -#include "cache.h" - -static int stream_open_out(struct stream * const in); -static int stream_open_out_lite(struct stream * const in); -static int stream_open_in(struct stream * const in); -static int stream_flush(struct stream * const in); -static int stream_flush_array(struct stream * const in); -static int stream_flush_list(struct stream * const in); - -int stream_open(struct stream * const in) -{ - int ret = 0; - - try(!in || in->fd > 0 || (!in->name && in->type != stream_randread), err, EINVAL, "invalid argunent"); - - switch (in->type) { - case (stream_outlite): - ret = stream_open_out_lite(in); - break; - case (stream_out): - try(!in->name, err, EINVAL, "no filename given"); - ret = stream_open_out(in); - break; - case (stream_in): - try(!in->name, err, EINVAL, "no filename given"); - ret = stream_open_in(in); - break; - case (stream_randread): - ; /* NOOP */ - break; - default: - try(0, err, EINVAL, "cannot open stream: stream is invalid"); - } - -err: - return ret; -} - -int stream_close(struct stream * const in) -{ - int ret = 0; - - try(!in || (in->fd < 0 && in->type != stream_randread), early_err, EINVAL, "invalid argunent"); - - if (in->type != stream_out) { - goto out; - } - - if (in->name) { - char path[PATH_MAX]; - struct stat st; - - try_s((ret = stream_flush(in)), err); - - snprintf(path, PATH_MAX, "/proc/self/fd/%i", in->fd); - - if (!stat(in->name, &st)) { - if (st.st_mode & S_IFREG) { - unlink(in->name); - } else { - ret = EINVAL; - rin_err("the given output file already exists and is not a regular file"); - goto err; - } - } - - try(linkat(AT_FDCWD, path, AT_FDCWD, in->name, AT_SYMLINK_FOLLOW), err, errno, "error linking output file to filesystem: %s", strerror(ret)); - } else { - rin_err("no output filename given"); - ret = EINVAL; - goto err; - } - -out: - if (in->settings->access == cached) { - cache_destroy(in); - } -err: - close(in->fd); - in->fd = -1; -early_err: - return ret; -} - -static int stream_open_out(struct stream * const in) -{ - struct stat st; - mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP; - char *dname = NULL; - char *tmp[2]; - int ret = 0; - - tmp[0] = strdup(in->name); - tmp[1] = dirname(tmp[0]); - dname = strdup(tmp[1]); - free(tmp[0]); - - try(stat(dname, &st), err, errno, "stat failed: %s", strerror(ret)); - try(!(st.st_mode & S_IFDIR), err, EINVAL, "invalid output path"); - - if (!stat(in->name, &st)) { - try(!(st.st_mode & S_IFREG), err, EINVAL, "the given output file already exists and is not a regular file"); - mode = st.st_mode; - } - - in->fd = open(dname, O_TMPFILE | O_RDWR, mode); - try(in->fd < 0, err, errno, "failed creating temporary output file: %s", strerror(ret)); - try(ftruncate(in->fd, in->settings->stride * in->n), err, errno, "failed setting output file size: %s", strerror(ret)); - -err: - free(dname); - return ret; -} - -static int stream_open_out_lite(struct stream * const in) -{ - struct stat st; - char *dname = NULL; - char *tmp[2]; - int ret = 0; - - tmp[0] = strdup(in->name); - tmp[1] = dirname(tmp[0]); - dname = strdup(tmp[1]); - free(tmp[0]); - - try(stat(dname, &st), err, errno, "stat failed: %s", strerror(ret)); - try(!(st.st_mode & S_IFDIR), err, EINVAL, "invalid output path"); - - in->fd = open(dname, O_TMPFILE | O_RDWR, S_IRUSR | S_IWUSR); - try(in->fd < 0, err, errno, "failed creating temporary output file: %s", strerror(ret)); - try(ftruncate(in->fd, in->settings->stride * in->n), err, errno, "failed setting output file size: %s", strerror(ret)); - -err: - free(dname); - return ret; -} - -static int stream_open_in(struct stream * const in) -{ - struct stat st; - int ret = 0; - - try(stat(in->name, &st), err, errno, "stat failed: %s", strerror(ret)); - try(!(st.st_mode & S_IFREG) || !st.st_size || (st.st_size % in->settings->stride), err, EINVAL, "invalid input file"); - in->n = st.st_size / in->settings->stride; - in->fd = open(in->name, O_RDONLY | O_NOATIME); - try(in->fd < 0, err, errno, "failed opening input file: %s", strerror(ret)); - -err: - return ret; -} - -int stream_flush(struct stream * const in) -{ - int ret = 0; - - try(in->settings->access != cached, err, EINVAL, "cannot flush an uncached stream"); - try(in->type != stream_out, err, EINVAL, "cannot flush a non-output cache"); - - if (in->settings->format == array) { - ret = stream_flush_array(in); - } else { - ret = stream_flush_list(in); - } -err: - return ret; -} - -int stream_flush_array(struct stream * const in) -{ - ssize_t ret = 0; - size_t remaining = in->n * in->settings->stride; - ssize_t written = 0; - - do { - ret = write(in->fd, in->cache + written, remaining); - if (ret < 0) { - try(errno != EAGAIN, err, errno, "Writing to stream failed with %zi", ret); - } else { - written += ret; - remaining -= ret; - ret = 0; - } - } while (remaining); - -err: - return ret; -} - -int stream_flush_list(struct stream * const in) -{ - ssize_t ret = 0; - size_t remaining; - ssize_t written; - size_t i; - - /* adjust pointers to have a base starting at 0, to cater to disk storage format. - * File offsets are otherwise 1:1 to memory addresses, which is neat. - * Just don't process the last offset, as that is always NULL/zero. - * */ - - for (i = 0; i < (in->n - 1); ++i) { - in->cache_l[i].offset = (char *) in->cache_l[i].next - in->cache; - } - - /* dump the bloody list */ - written = 0; - remaining = in->n * in->settings->stride; - - do { - ret = write(in->fd, in->cache + written, remaining); - if (ret < 0) { - try(errno != EAGAIN, err, errno, "Writing to stream failed with %zi", ret); - } else { - written += ret; - remaining -= ret; - ret = 0; - } - } while (remaining); - -err: - return ret; -} diff --git a/src/io.h b/src/io.h deleted file mode 100644 index 91f1f88..0000000 --- a/src/io.h +++ /dev/null @@ -1,19 +0,0 @@ -/* SPDX-License-Identifier: LGPL-2.1-only */ - -/* Copyright (C) 2020 Gediminas Jakutis */ - -#ifndef ALGOS_IO_H_INCLUDED -#define ALGOS_IO_H_INCLUDED - -#include "defs.h" - -int stream_open(struct stream * const in); -int stream_close(struct stream * const in); - -int direct_get_array(struct stream * const restrict in, ssize_t idx, struct entry_l * const data); -int direct_get_list(struct stream * const restrict in, ssize_t idx, struct entry_l * const data); - -int direct_put_array(struct stream * const restrict in, ssize_t idx, const struct entry_l * const data); -int direct_put_list(struct stream * const restrict in, ssize_t idx, const struct entry_l * const data); - -#endif /* ALGOS_IO_H_INCLUDED */ diff --git a/src/main.c b/src/main.c index c2d24ce..b841d56 100644 --- a/src/main.c +++ b/src/main.c @@ -8,32 +8,48 @@ #include #include #include -#include "io.h" +#include +#include "stream.h" #include "defs.h" #include "cache.h" #include "datagen.h" #include "mergesort.h" -static struct settings settings = {0}; +static struct settings settings = {0, 0, 0, NULL, NULL, mode_normal, array, cached}; static int parseargs(int argc, char **argv, struct settings * settings); -int load_io_functions(struct settings const * const s, struct stream * const in); -void printhelp(const char * const name); -struct entry_l *stub_getnext(struct stream * const restrict in); +static int load_io_functions(struct settings const * const s, struct stream * const in); +static void printhelp(const char * const name); +static struct entry_l *stub_getnext(struct stream * const restrict in); static int stub_put(struct stream * const restrict in, struct entry_l const * const data); static int stub_split(struct stream * const in, struct stream * const a, struct stream * const b); -static int stub_flush(struct stream * const in); -/* these need to go AFTER the stub declarations */ -static struct stream file_in = {stream_blank, .type = stream_in}; -static struct stream file_out = {stream_blank, .type = stream_out}; -static struct stream file_tmp = {stream_blank, .type = stream_outlite}; +static const struct stream stream_blank = { + .n = 0, .type = stream_invalid, .fd = -1, .settings = &settings, + .name = NULL, .index = 0, .pnode = NULL, .cnode = NULL, .cache = NULL, + .get_next_element_direct = stub_getnext, + .get_next_element_cache = stub_getnext, + .place_next_element_direct = stub_put, + .place_next_element_cache = stub_put, .split = stub_split }; + +static struct stream file_in = stream_blank; +static struct stream file_out = stream_blank; +static struct stream file_tmp; + +static struct rin_bench_result bongholio; int main(int argc, char **argv) { int ret = 0; rin_diag_init(); + rin_diag_format(rin_diag_info, "C:mn"); + rin_diag_channel_set_enabled_state(rin_diag_err, 0); + + file_in.type = stream_in; + file_out.type = stream_out; + file_tmp.type = stream_outlite; + try_s((ret = parseargs(argc, argv, &settings)), early_out); if (settings.opmode == mode_generate) { @@ -47,7 +63,7 @@ int main(int argc, char **argv) file_out.name = settings.fileout ? settings.fileout : settings.filein; try_s((ret = stream_open(&file_in)), out); - file_out.n = file_in.n - settings.ss; + file_out.n = settings.opmode == mode_normal ? file_in.n : file_in.n - settings.ss; try_s((ret = stream_open(&file_out)), out); load_io_functions(&settings, &file_out); @@ -68,8 +84,18 @@ int main(int argc, char **argv) try_s((ret = cache_transfer(&file_in, &file_out)), out); break; case mode_normal: - try_s((ret = cache_create(&file_out)), out); - try_s((ret = merge_sort(&file_in, &file_out)), out); + + /* BENCHMARK STARTS HERE */ + rin_bench_start(); + try_s((ret = merge_sort(&file_in, &file_tmp)), out); + rin_bench_stop(&bongholio); + /* BENCHMARK ENDS HERE */ + rin_info("wall: %lus%6luµs", bongholio.wall.tv_sec, bongholio.wall.tv_nsec / 1000); + rin_info("system: %lus%6luµs", bongholio.system.tv_sec, bongholio.system.tv_usec); + rin_info("user: %lus%6luµs", bongholio.user.tv_sec, bongholio.user.tv_usec); + rin_info("total: %lus%6luµs", bongholio.total.tv_sec, bongholio.total.tv_usec); + + try_s((ret = cache_transfer(&file_tmp, &file_out)), out); } } else { /* uncached */ @@ -78,6 +104,7 @@ int main(int argc, char **argv) stream_close(&file_in); stream_close(&file_out); + stream_close(&file_tmp); while (0) { out: @@ -170,7 +197,7 @@ err: return ret; } -int load_io_functions(struct settings const * const s, struct stream * const in) +static int load_io_functions(struct settings const * const s, struct stream * const in) { int ret = 0; @@ -178,6 +205,8 @@ int load_io_functions(struct settings const * const s, struct stream * const in) if (s->format == array) { in->get_next_element_cache = cached_get_array; in->place_next_element_cache = cached_put_array; + in->split = cache_block_split; + in->rewind = cache_rewind; } else { /* if (s->format == list */ /* TODO */ } @@ -185,6 +214,8 @@ int load_io_functions(struct settings const * const s, struct stream * const in) if (s->format == array) { in->get_next_element_cache = cached_get_array; in->place_next_element_cache = cached_put_array; + in->split = cache_block_split; + in->rewind = cache_rewind; } else { /* if (s->format == list */ /* TODO */ } @@ -203,7 +234,7 @@ int load_io_functions(struct settings const * const s, struct stream * const in) return ret; } -void printhelp(const char * const name) +static void printhelp(const char * const name) { printf( "This is a mergesort program and such\n" "\n" @@ -226,7 +257,7 @@ void printhelp(const char * const name) return; } -struct entry_l *stub_getnext(struct stream * const restrict in) +static struct entry_l *stub_getnext(struct stream * const restrict in) { struct entry_l *ret = NULL; @@ -261,14 +292,3 @@ static int stub_split(struct stream * const in, struct stream * const a, struct return ret; } - -static int stub_flush(struct stream * const in) -{ - int ret = ENOSYS; - - (void) in; - - rin_warn("stub!"); - - return ret; -} diff --git a/src/mergesort.c b/src/mergesort.c index c431b31..7391544 100644 --- a/src/mergesort.c +++ b/src/mergesort.c @@ -5,42 +5,49 @@ #include #include #include "defs.h" -#include "io.h" +#include "stream.h" #include "mergesort.h" +static int merge(struct stream * const dest, struct stream * const A, struct stream * const B); + /* bottom-up variantas */ int merge_sort(struct stream * const src, struct stream * const dest) { int ret = 0; - struct stream tmp[4]; /* I can't into stream reuse, hence four. A yeet, followed by a dab. */ + struct stream tmp[4] = {0}; /* I can't into stream reuse, hence four. A yeet, followed by a dab. */ size_t i; try(!src || !dest, err, EINVAL, "cannot sort what's not there"); - if (src->n > 1) { /* stream of size one is inherently sorted */ + try_s((ret = stream_shallow_copy(src, dest)), err); + + if (src->n > 1) { src->split(src, tmp, tmp + 2); /* split stream in half */ - merge_sort(tmp + 1, tmp); /* recurse-sort first half */ - merge_sort(tmp + 3, tmp + 2); /* recurse-sort second half */ - merge(dest, tmp, tmp + 2); /* merge the two halves back */ - } + merge_sort(tmp, tmp + 1); /* recurse-sort first half */ + merge_sort(tmp + 2, tmp + 3); /* recurse-sort second half */ + merge(dest, tmp + 1, tmp + 3); /* merge the two halves back */ + dest->rewind(dest); - for (i = 0; i < (sizeof(tmp) / sizeof(*tmp)); ++i) { - stream_close(tmp + i); + for (i = 0; i < (sizeof(tmp) / sizeof(*tmp)); ++i) { + stream_close(tmp + i); + } + } else { /* stream of size one is inherently sorted, simply src with dest */ + *tmp = *dest; + *dest = *src; + *src = *tmp; } err: return ret; } -int merge(struct stream * const dest, struct stream * const A, struct stream * const B) +static int merge(struct stream * const dest, struct stream * const A, struct stream * const B) { int ret = 0; struct entry_l *a; struct entry_l *b; - try(A->parentid != B->parentid, err, EINVAL, "cannot merge blocks: uncommon parent!"); - a = get(A); b = get(B); @@ -54,7 +61,6 @@ int merge(struct stream * const dest, struct stream * const A, struct stream * c } } -err: return ret; } diff --git a/src/mergesort.h b/src/mergesort.h index ea66a8b..8842906 100644 --- a/src/mergesort.h +++ b/src/mergesort.h @@ -8,6 +8,5 @@ #include "defs.h" int merge_sort(struct stream * const src, struct stream * const dest); -int merge(struct stream * const dest, struct stream * const A, struct stream * const B); #endif /* ALGOS_MERGESORT_H_INCLUDED */ diff --git a/src/meson.build b/src/meson.build index b714823..aed222e 100644 --- a/src/meson.build +++ b/src/meson.build @@ -1,7 +1,7 @@ source_files = [ 'cache.c', 'datagen.c', - 'io.c', + 'stream.c', 'main.c', 'mergesort.c', ] @@ -10,7 +10,7 @@ header_files = [ 'cache.h', 'defs.h', 'datagen.h', - 'io.h', + 'stream.h', 'mergesort.h', ] diff --git a/src/stream.c b/src/stream.c new file mode 100644 index 0000000..9de9e83 --- /dev/null +++ b/src/stream.c @@ -0,0 +1,293 @@ +/* SPDX-License-Identifier: LGPL-2.1-only */ + +/* Copyright (C) 2020 Gediminas Jakutis */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "stream.h" +#include "defs.h" +#include "datagen.h" +#include "cache.h" + +static int stream_open_out(struct stream * const in); +static int stream_open_out_lite(struct stream * const in); +static int stream_open_in(struct stream * const in); +static int stream_flush(struct stream * const in); +static int stream_flush_array(struct stream * const in); +static int stream_flush_list(struct stream * const in); + +int stream_open(struct stream * const in) +{ + int ret = 0; + + try(!in || in->fd > 0 || (!in->name && in->type != stream_randread), err, EINVAL, "invalid argunent"); + + switch (in->type) { + case (stream_outlite): + ret = stream_open_out_lite(in); + break; + case (stream_out): + try(!in->name, err, EINVAL, "no filename given"); + ret = stream_open_out(in); + break; + case (stream_in): + try(!in->name, err, EINVAL, "no filename given"); + ret = stream_open_in(in); + break; + case (stream_randread): + ; /* NOOP */ + break; + default: + try(0, err, EINVAL, "cannot open stream: stream is invalid"); + } + +err: + return ret; +} + +int stream_close(struct stream * const in) +{ + int ret = 0; + + if (in->type != stream_out) { + goto out; + } + + if (in->name) { + char path[PATH_MAX]; + struct stat st; + + try_s((ret = stream_flush(in)), err); + + snprintf(path, PATH_MAX, "/proc/self/fd/%i", in->fd); + + if (!stat(in->name, &st)) { + if (st.st_mode & S_IFREG) { + unlink(in->name); + } else { + ret = EINVAL; + rin_err("the given output file already exists and is not a regular file"); + goto err; + } + } + + try(linkat(AT_FDCWD, path, AT_FDCWD, in->name, AT_SYMLINK_FOLLOW), err, errno, "error linking output file to filesystem: %s", strerror(ret)); + } else { + rin_err("no output filename given"); + ret = EINVAL; + goto err; + } + +out: + if (in->settings && in->settings->access == cached) { + cache_destroy(in); + } +err: + in->fd > 2 ? close(in->fd) : 0; + in->fd = -1; + return ret; +} + +int stream_readfile(struct stream * const in) +{ + ssize_t ret = 0; + size_t remaining = in->n * in->settings->stride; + ssize_t bytesread = 0; + size_t i; + + try(in->fd < 3, err, EINVAL, "no file open for reading"); + + do { + ret = read(in->fd, in->cache + bytesread, remaining); + if (ret < 0) { + try(errno != EAGAIN, err, errno, "Writing to stream failed with %zi", ret); + } else { + bytesread += ret; + remaining -= ret; + } + } while (ret); + + /* if this is a list, we need to adjust the link pointers from file offsets + * to buffer addresses. 'Cept for the last one, which needs to be NULL. + */ + if (in->settings->format == list) { + for (i = 0; i < (in->n - 1); ++i) { + in->cache_l[i].offset = (char *) in->cache_l[i].next - in->cache; + } + } + +err: + return ret; +} + +int stream_shallow_copy(struct stream const * const restrict src, struct stream * const dest) +{ + int ret = 0; + + dest->n = src->n; + dest->settings = src->settings; + dest->index = 0; + + if (src->settings->access == cached) { + dest->type = stream_cache; + dest->get_next_element_cache = src->get_next_element_cache; + dest->place_next_element_cache = src->place_next_element_cache; + dest->split = src->split; + dest->rewind = src->rewind; + try_s((ret = cache_create(dest)), err); + } else { + rin_warn("stub!"); + ret = ENOSYS; + } + +err: + return ret; +} + +static int stream_open_out(struct stream * const in) +{ + struct stat st; + mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP; + char *dname = NULL; + char *tmp[2]; + int ret = 0; + + tmp[0] = strdup(in->name); + tmp[1] = dirname(tmp[0]); + dname = strdup(tmp[1]); + free(tmp[0]); + + try(stat(dname, &st), err, errno, "stat failed: %s", strerror(ret)); + try(!(st.st_mode & S_IFDIR), err, EINVAL, "invalid output path"); + + if (!stat(in->name, &st)) { + try(!(st.st_mode & S_IFREG), err, EINVAL, "the given output file already exists and is not a regular file"); + mode = st.st_mode; + } + + in->fd = open(dname, O_TMPFILE | O_RDWR, mode); + try(in->fd < 0, err, errno, "failed creating temporary output file: %s", strerror(ret)); + try(ftruncate(in->fd, in->settings->stride * in->n), err, errno, "failed setting output file size: %s", strerror(ret)); + +err: + free(dname); + return ret; +} + +static int stream_open_out_lite(struct stream * const in) +{ + struct stat st; + char *dname = NULL; + char *tmp[2]; + int ret = 0; + + tmp[0] = strdup(in->name); + tmp[1] = dirname(tmp[0]); + dname = strdup(tmp[1]); + free(tmp[0]); + + try(stat(dname, &st), err, errno, "stat failed: %s", strerror(ret)); + try(!(st.st_mode & S_IFDIR), err, EINVAL, "invalid output path"); + + in->fd = open(dname, O_TMPFILE | O_RDWR, S_IRUSR | S_IWUSR); + try(in->fd < 0, err, errno, "failed creating temporary output file: %s", strerror(ret)); + try(ftruncate(in->fd, in->settings->stride * in->n), err, errno, "failed setting output file size: %s", strerror(ret)); + +err: + free(dname); + return ret; +} + +static int stream_open_in(struct stream * const in) +{ + struct stat st; + int ret = 0; + + try(stat(in->name, &st), err, errno, "stat failed: %s", strerror(ret)); + try(!(st.st_mode & S_IFREG) || !st.st_size || (st.st_size % in->settings->stride), err, EINVAL, "invalid input file"); + in->n = st.st_size / in->settings->stride; + in->fd = open(in->name, O_RDONLY | O_NOATIME); + try(in->fd < 0, err, errno, "failed opening input file: %s", strerror(ret)); + +err: + return ret; +} + +static int stream_flush(struct stream * const in) +{ + int ret = 0; + + try(in->settings->access != cached, err, EINVAL, "cannot flush an uncached stream"); + try(in->type != stream_out, err, EINVAL, "cannot flush a non-output cache"); + + if (in->settings->format == array) { + ret = stream_flush_array(in); + } else { + ret = stream_flush_list(in); + } +err: + return ret; +} + +static int stream_flush_array(struct stream * const in) +{ + ssize_t ret = 0; + size_t remaining = in->n * in->settings->stride; + ssize_t written = 0; + + try(in->fd < 3, err, EINVAL, "can't dump without an open file"); + + do { + ret = write(in->fd, in->cache + written, remaining); + if (ret < 0) { + try(errno != EAGAIN, err, errno, "Writing to stream failed with %zi", ret); + } else { + written += ret; + remaining -= ret; + ret = 0; + } + } while (remaining); + +err: + return ret; +} + +static int stream_flush_list(struct stream * const in) +{ + ssize_t ret = 0; + size_t remaining = in->n * in->settings->stride; + ssize_t written = 0; + size_t i; + + /* adjust pointers to have a base starting at 0, to cater to disk storage format. + * File offsets are otherwise 1:1 to memory addresses, which is neat. + * Just don't process the last offset, as that is always NULL/zero. + * */ + + for (i = 0; i < (in->n - 1); ++i) { + in->cache_l[i].offset = (char *) in->cache_l[i].next - in->cache; + } + + /* dump the bloody list */ + do { + ret = write(in->fd, in->cache + written, remaining); + if (ret < 0) { + try(errno != EAGAIN, err, errno, "Writing to stream failed with %zi", ret); + } else { + written += ret; + remaining -= ret; + ret = 0; + } + } while (remaining); + +err: + return ret; +} diff --git a/src/stream.h b/src/stream.h new file mode 100644 index 0000000..7a429a4 --- /dev/null +++ b/src/stream.h @@ -0,0 +1,21 @@ +/* SPDX-License-Identifier: LGPL-2.1-only */ + +/* Copyright (C) 2020 Gediminas Jakutis */ + +#ifndef ALGOS_IO_H_INCLUDED +#define ALGOS_IO_H_INCLUDED + +#include "defs.h" + +int stream_open(struct stream * const in); +int stream_close(struct stream * const in); +int stream_readfile(struct stream * const in); + +int direct_get_array(struct stream * const restrict in, ssize_t idx, struct entry_l * const data); +int direct_get_list(struct stream * const restrict in, ssize_t idx, struct entry_l * const data); + +int direct_put_array(struct stream * const restrict in, ssize_t idx, const struct entry_l * const data); +int direct_put_list(struct stream * const restrict in, ssize_t idx, const struct entry_l * const data); + +int stream_shallow_copy(struct stream const * const restrict src, struct stream * const dest); +#endif /* ALGOS_IO_H_INCLUDED */ -- cgit v1.2.3