From 29712a5098842ea3930ec00ddd1c0b9d264ba9b5 Mon Sep 17 00:00:00 2001 From: Gediminas Jakutis Date: Sun, 21 Feb 2021 13:16:54 +0200 Subject: in-memory array sorting: GET! lists need a bit more work and after that, in-file should shortly follow. Signed-off-by: Gediminas Jakutis --- src/stream.c | 293 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 293 insertions(+) create mode 100644 src/stream.c (limited to 'src/stream.c') diff --git a/src/stream.c b/src/stream.c new file mode 100644 index 0000000..9de9e83 --- /dev/null +++ b/src/stream.c @@ -0,0 +1,293 @@ +/* SPDX-License-Identifier: LGPL-2.1-only */ + +/* Copyright (C) 2020 Gediminas Jakutis */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "stream.h" +#include "defs.h" +#include "datagen.h" +#include "cache.h" + +static int stream_open_out(struct stream * const in); +static int stream_open_out_lite(struct stream * const in); +static int stream_open_in(struct stream * const in); +static int stream_flush(struct stream * const in); +static int stream_flush_array(struct stream * const in); +static int stream_flush_list(struct stream * const in); + +int stream_open(struct stream * const in) +{ + int ret = 0; + + try(!in || in->fd > 0 || (!in->name && in->type != stream_randread), err, EINVAL, "invalid argunent"); + + switch (in->type) { + case (stream_outlite): + ret = stream_open_out_lite(in); + break; + case (stream_out): + try(!in->name, err, EINVAL, "no filename given"); + ret = stream_open_out(in); + break; + case (stream_in): + try(!in->name, err, EINVAL, "no filename given"); + ret = stream_open_in(in); + break; + case (stream_randread): + ; /* NOOP */ + break; + default: + try(0, err, EINVAL, "cannot open stream: stream is invalid"); + } + +err: + return ret; +} + +int stream_close(struct stream * const in) +{ + int ret = 0; + + if (in->type != stream_out) { + goto out; + } + + if (in->name) { + char path[PATH_MAX]; + struct stat st; + + try_s((ret = stream_flush(in)), err); + + snprintf(path, PATH_MAX, "/proc/self/fd/%i", in->fd); + + if (!stat(in->name, &st)) { + if (st.st_mode & S_IFREG) { + unlink(in->name); + } else { + ret = EINVAL; + rin_err("the given output file already exists and is not a regular file"); + goto err; + } + } + + try(linkat(AT_FDCWD, path, AT_FDCWD, in->name, AT_SYMLINK_FOLLOW), err, errno, "error linking output file to filesystem: %s", strerror(ret)); + } else { + rin_err("no output filename given"); + ret = EINVAL; + goto err; + } + +out: + if (in->settings && in->settings->access == cached) { + cache_destroy(in); + } +err: + in->fd > 2 ? close(in->fd) : 0; + in->fd = -1; + return ret; +} + +int stream_readfile(struct stream * const in) +{ + ssize_t ret = 0; + size_t remaining = in->n * in->settings->stride; + ssize_t bytesread = 0; + size_t i; + + try(in->fd < 3, err, EINVAL, "no file open for reading"); + + do { + ret = read(in->fd, in->cache + bytesread, remaining); + if (ret < 0) { + try(errno != EAGAIN, err, errno, "Writing to stream failed with %zi", ret); + } else { + bytesread += ret; + remaining -= ret; + } + } while (ret); + + /* if this is a list, we need to adjust the link pointers from file offsets + * to buffer addresses. 'Cept for the last one, which needs to be NULL. + */ + if (in->settings->format == list) { + for (i = 0; i < (in->n - 1); ++i) { + in->cache_l[i].offset = (char *) in->cache_l[i].next - in->cache; + } + } + +err: + return ret; +} + +int stream_shallow_copy(struct stream const * const restrict src, struct stream * const dest) +{ + int ret = 0; + + dest->n = src->n; + dest->settings = src->settings; + dest->index = 0; + + if (src->settings->access == cached) { + dest->type = stream_cache; + dest->get_next_element_cache = src->get_next_element_cache; + dest->place_next_element_cache = src->place_next_element_cache; + dest->split = src->split; + dest->rewind = src->rewind; + try_s((ret = cache_create(dest)), err); + } else { + rin_warn("stub!"); + ret = ENOSYS; + } + +err: + return ret; +} + +static int stream_open_out(struct stream * const in) +{ + struct stat st; + mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP; + char *dname = NULL; + char *tmp[2]; + int ret = 0; + + tmp[0] = strdup(in->name); + tmp[1] = dirname(tmp[0]); + dname = strdup(tmp[1]); + free(tmp[0]); + + try(stat(dname, &st), err, errno, "stat failed: %s", strerror(ret)); + try(!(st.st_mode & S_IFDIR), err, EINVAL, "invalid output path"); + + if (!stat(in->name, &st)) { + try(!(st.st_mode & S_IFREG), err, EINVAL, "the given output file already exists and is not a regular file"); + mode = st.st_mode; + } + + in->fd = open(dname, O_TMPFILE | O_RDWR, mode); + try(in->fd < 0, err, errno, "failed creating temporary output file: %s", strerror(ret)); + try(ftruncate(in->fd, in->settings->stride * in->n), err, errno, "failed setting output file size: %s", strerror(ret)); + +err: + free(dname); + return ret; +} + +static int stream_open_out_lite(struct stream * const in) +{ + struct stat st; + char *dname = NULL; + char *tmp[2]; + int ret = 0; + + tmp[0] = strdup(in->name); + tmp[1] = dirname(tmp[0]); + dname = strdup(tmp[1]); + free(tmp[0]); + + try(stat(dname, &st), err, errno, "stat failed: %s", strerror(ret)); + try(!(st.st_mode & S_IFDIR), err, EINVAL, "invalid output path"); + + in->fd = open(dname, O_TMPFILE | O_RDWR, S_IRUSR | S_IWUSR); + try(in->fd < 0, err, errno, "failed creating temporary output file: %s", strerror(ret)); + try(ftruncate(in->fd, in->settings->stride * in->n), err, errno, "failed setting output file size: %s", strerror(ret)); + +err: + free(dname); + return ret; +} + +static int stream_open_in(struct stream * const in) +{ + struct stat st; + int ret = 0; + + try(stat(in->name, &st), err, errno, "stat failed: %s", strerror(ret)); + try(!(st.st_mode & S_IFREG) || !st.st_size || (st.st_size % in->settings->stride), err, EINVAL, "invalid input file"); + in->n = st.st_size / in->settings->stride; + in->fd = open(in->name, O_RDONLY | O_NOATIME); + try(in->fd < 0, err, errno, "failed opening input file: %s", strerror(ret)); + +err: + return ret; +} + +static int stream_flush(struct stream * const in) +{ + int ret = 0; + + try(in->settings->access != cached, err, EINVAL, "cannot flush an uncached stream"); + try(in->type != stream_out, err, EINVAL, "cannot flush a non-output cache"); + + if (in->settings->format == array) { + ret = stream_flush_array(in); + } else { + ret = stream_flush_list(in); + } +err: + return ret; +} + +static int stream_flush_array(struct stream * const in) +{ + ssize_t ret = 0; + size_t remaining = in->n * in->settings->stride; + ssize_t written = 0; + + try(in->fd < 3, err, EINVAL, "can't dump without an open file"); + + do { + ret = write(in->fd, in->cache + written, remaining); + if (ret < 0) { + try(errno != EAGAIN, err, errno, "Writing to stream failed with %zi", ret); + } else { + written += ret; + remaining -= ret; + ret = 0; + } + } while (remaining); + +err: + return ret; +} + +static int stream_flush_list(struct stream * const in) +{ + ssize_t ret = 0; + size_t remaining = in->n * in->settings->stride; + ssize_t written = 0; + size_t i; + + /* adjust pointers to have a base starting at 0, to cater to disk storage format. + * File offsets are otherwise 1:1 to memory addresses, which is neat. + * Just don't process the last offset, as that is always NULL/zero. + * */ + + for (i = 0; i < (in->n - 1); ++i) { + in->cache_l[i].offset = (char *) in->cache_l[i].next - in->cache; + } + + /* dump the bloody list */ + do { + ret = write(in->fd, in->cache + written, remaining); + if (ret < 0) { + try(errno != EAGAIN, err, errno, "Writing to stream failed with %zi", ret); + } else { + written += ret; + remaining -= ret; + ret = 0; + } + } while (remaining); + +err: + return ret; +} -- cgit v1.2.3