summaryrefslogtreecommitdiffstats
path: root/src/stream.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/stream.c')
-rw-r--r--src/stream.c293
1 files changed, 293 insertions, 0 deletions
diff --git a/src/stream.c b/src/stream.c
new file mode 100644
index 0000000..9de9e83
--- /dev/null
+++ b/src/stream.c
@@ -0,0 +1,293 @@
+/* SPDX-License-Identifier: LGPL-2.1-only */
+
+/* Copyright (C) 2020 Gediminas Jakutis */
+
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <linux/limits.h>
+#include <unistd.h>
+#include <fcntl.h>
+#include <errno.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <libgen.h>
+#include "stream.h"
+#include "defs.h"
+#include "datagen.h"
+#include "cache.h"
+
+static int stream_open_out(struct stream * const in); /* stream_out: unnamed temp file in the directory of in->name */
+static int stream_open_out_lite(struct stream * const in); /* stream_outlite: unnamed temp file, no check of an existing target */
+static int stream_open_in(struct stream * const in); /* stream_in: validate in->name and open it read-only */
+static int stream_flush(struct stream * const in); /* write a cached output stream to in->fd, by format */
+static int stream_flush_array(struct stream * const in); /* dump the cache to in->fd as-is */
+static int stream_flush_list(struct stream * const in); /* turn links into offsets, then dump the cache */
+
+/*
+ * Open the stream `in` according to in->type.
+ *
+ * stream_out and stream_outlite get an unnamed temporary output file,
+ * stream_in opens in->name for reading, stream_randread needs no file.
+ *
+ * Returns 0 on success. Returns EINVAL if `in` is NULL, already has an fd
+ * greater than 0, lacks a name where one is needed, or has an unknown type;
+ * otherwise returns whatever the type-specific opener returns.
+ *
+ * NOTE(review): try() comes from defs.h; from its uses in this file it is
+ * assumed to set `ret`, log, and jump to the label when its condition is true.
+ */
+int stream_open(struct stream * const in)
+{
+	int ret = 0;
+
+	try(!in || in->fd > 0 || (!in->name && in->type != stream_randread), err, EINVAL, "invalid argument");
+
+	switch (in->type) {
+	case (stream_outlite):
+		ret = stream_open_out_lite(in);
+		break;
+	case (stream_out):
+		try(!in->name, err, EINVAL, "no filename given");
+		ret = stream_open_out(in);
+		break;
+	case (stream_in):
+		try(!in->name, err, EINVAL, "no filename given");
+		ret = stream_open_in(in);
+		break;
+	case (stream_randread):
+		; /* NOOP */
+		break;
+	default:
+		/* The condition has to be true for try() to fire; with a constant 0
+		 * an unknown stream type was silently reported as success.
+		 */
+		try(1, err, EINVAL, "cannot open stream: stream is invalid");
+	}
+
+err:
+	return ret;
+}
+
+/*
+ * Close the stream `in`.
+ *
+ * For a stream_out stream the cache is flushed to the temporary file, which
+ * is then linked into the filesystem as in->name through its /proc/self/fd
+ * entry. An existing regular file at in->name is unlinked first; anything
+ * else at that path is refused with EINVAL.
+ *
+ * On the success path, and for every non-output stream, a cached stream has
+ * its cache destroyed. On every path a descriptor above 2 is closed and
+ * in->fd is reset to -1.
+ *
+ * Returns 0 on success, EINVAL for a missing filename or a non-regular
+ * target, the stream_flush() result if flushing fails, or errno from linkat().
+ */
+int stream_close(struct stream * const in)
+{
+	int ret = 0;
+
+	if (in->type != stream_out) {
+		goto out;
+	}
+
+	if (in->name) {
+		char path[PATH_MAX];
+		struct stat st;
+
+		try_s((ret = stream_flush(in)), err);
+
+		snprintf(path, PATH_MAX, "/proc/self/fd/%i", in->fd);
+
+		if (!stat(in->name, &st)) {
+			/* S_ISREG compares the whole file-type field. A plain
+			 * `& S_IFREG` bit test also matches sockets, which would
+			 * then have been unlinked.
+			 */
+			if (S_ISREG(st.st_mode)) {
+				unlink(in->name);
+			} else {
+				ret = EINVAL;
+				rin_err("the given output file already exists and is not a regular file");
+				goto err;
+			}
+		}
+
+		try(linkat(AT_FDCWD, path, AT_FDCWD, in->name, AT_SYMLINK_FOLLOW), err, errno, "error linking output file to filesystem: %s", strerror(ret));
+	} else {
+		rin_err("no output filename given");
+		ret = EINVAL;
+		goto err;
+	}
+
+out:
+	if (in->settings && in->settings->access == cached) {
+		cache_destroy(in);
+	}
+err:
+	/* never close stdin/stdout/stderr or an fd that was never opened */
+	if (in->fd > 2) {
+		close(in->fd);
+	}
+	in->fd = -1;
+	return ret;
+}
+
+/*
+ * Read in->n * in->settings->stride bytes from in->fd into in->cache.
+ *
+ * Reading continues until read() returns 0; EAGAIN is retried, any other
+ * read error ends the call with errno as the result. For list-formatted
+ * data the stored links are then converted from file offsets to addresses
+ * inside the cache.
+ *
+ * Returns 0 on success, EINVAL if no file above fd 2 is open, or the errno
+ * of a failed read().
+ */
+int stream_readfile(struct stream * const in)
+{
+	ssize_t ret = 0;
+	size_t remaining = in->n * in->settings->stride;
+	ssize_t bytesread = 0;
+	size_t i;
+
+	try(in->fd < 3, err, EINVAL, "no file open for reading");
+
+	do {
+		ret = read(in->fd, in->cache + bytesread, remaining);
+		if (ret < 0) {
+			try(errno != EAGAIN, err, errno, "Reading from stream failed with %zi", ret);
+		} else {
+			bytesread += ret;
+			remaining -= ret;
+		}
+	} while (ret);
+
+	/* if this is a list, we need to adjust the link pointers from file offsets
+	 * to buffer addresses. 'Cept for the last one, which needs to be NULL.
+	 * This is the inverse of the conversion done when the list is flushed:
+	 * there offset = next - cache, so here next = cache + offset.
+	 * NOTE(review): assumes `next` is a pointer member, as the cast in the
+	 * flush code suggests; confirm against the entry definition.
+	 */
+	if (in->settings->format == list) {
+		/* `i + 1 < n` rather than `i < n - 1`, so n == 0 cannot wrap around */
+		for (i = 0; i + 1 < in->n; ++i) {
+			in->cache_l[i].next = (void *) (in->cache + in->cache_l[i].offset);
+		}
+	}
+
+err:
+	return ret;
+}
+
+/*
+ * Prepare `dest` as a stream modelled on `src`.
+ *
+ * The element count and settings pointer are taken from `src` and the index
+ * is reset to 0. For a cached source, `dest` additionally becomes a
+ * stream_cache that reuses the callbacks of `src` and gets a cache of its
+ * own through cache_create(). Any other access mode is not implemented.
+ *
+ * Returns 0 on success, the cache_create() result on failure, or ENOSYS for
+ * a source that is not cached.
+ */
+int stream_shallow_copy(struct stream const * const restrict src, struct stream * const dest)
+{
+	int ret = 0;
+
+	dest->index = 0;
+	dest->settings = src->settings;
+	dest->n = src->n;
+
+	/* only cached streams can be copied so far */
+	if (src->settings->access != cached) {
+		rin_warn("stub!");
+		ret = ENOSYS;
+		goto err;
+	}
+
+	dest->type = stream_cache;
+	dest->rewind = src->rewind;
+	dest->split = src->split;
+	dest->place_next_element_cache = src->place_next_element_cache;
+	dest->get_next_element_cache = src->get_next_element_cache;
+	try_s((ret = cache_create(dest)), err);
+
+err:
+	return ret;
+}
+
+/*
+ * Create the backing file for a stream_out stream.
+ *
+ * An unnamed temporary file (O_TMPFILE) is created in the directory part of
+ * in->name and sized to in->settings->stride * in->n bytes. If a regular
+ * file already exists at in->name, its permission bits are reused for the
+ * new file; a non-regular file there is refused.
+ *
+ * Returns 0 on success, ENOMEM if the path cannot be duplicated, EINVAL for
+ * a bad directory or target, or errno from stat()/open()/ftruncate().
+ * On failure after open(), in->fd is left set to the opened descriptor.
+ */
+static int stream_open_out(struct stream * const in)
+{
+	struct stat st;
+	mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP;
+	char *dname = NULL;
+	char *tmp[2];
+	int ret = 0;
+
+	/* dirname() may modify its argument, so work on a private copy */
+	tmp[0] = strdup(in->name);
+	try(!tmp[0], err, ENOMEM, "out of memory");
+	tmp[1] = dirname(tmp[0]);
+	dname = strdup(tmp[1]);
+	free(tmp[0]);
+	try(!dname, err, ENOMEM, "out of memory");
+
+	try(stat(dname, &st), err, errno, "stat failed: %s", strerror(ret));
+	/* S_ISDIR/S_ISREG compare the whole file-type field; single-bit tests
+	 * against S_IFDIR/S_IFREG also match block devices and sockets.
+	 */
+	try(!S_ISDIR(st.st_mode), err, EINVAL, "invalid output path");
+
+	if (!stat(in->name, &st)) {
+		try(!S_ISREG(st.st_mode), err, EINVAL, "the given output file already exists and is not a regular file");
+		/* keep the permission bits only, not the file-type bits */
+		mode = st.st_mode & ~S_IFMT;
+	}
+
+	in->fd = open(dname, O_TMPFILE | O_RDWR, mode);
+	try(in->fd < 0, err, errno, "failed creating temporary output file: %s", strerror(ret));
+	try(ftruncate(in->fd, in->settings->stride * in->n), err, errno, "failed setting output file size: %s", strerror(ret));
+
+err:
+	free(dname);
+	return ret;
+}
+
+/*
+ * Create the backing file for a stream_outlite stream.
+ *
+ * An unnamed temporary file (O_TMPFILE, mode 0600) is created in the
+ * directory part of in->name and sized to in->settings->stride * in->n
+ * bytes. Nothing is checked about an existing file at in->name itself.
+ *
+ * Returns 0 on success, ENOMEM if the path cannot be duplicated, EINVAL if
+ * the directory part is not a directory, or errno from
+ * stat()/open()/ftruncate().
+ */
+static int stream_open_out_lite(struct stream * const in)
+{
+	struct stat st;
+	char *dname = NULL;
+	char *tmp[2];
+	int ret = 0;
+
+	/* dirname() may modify its argument, so work on a private copy */
+	tmp[0] = strdup(in->name);
+	try(!tmp[0], err, ENOMEM, "out of memory");
+	tmp[1] = dirname(tmp[0]);
+	dname = strdup(tmp[1]);
+	free(tmp[0]);
+	try(!dname, err, ENOMEM, "out of memory");
+
+	try(stat(dname, &st), err, errno, "stat failed: %s", strerror(ret));
+	/* S_ISDIR compares the whole file-type field; a single-bit test against
+	 * S_IFDIR also matches block devices.
+	 */
+	try(!S_ISDIR(st.st_mode), err, EINVAL, "invalid output path");
+
+	in->fd = open(dname, O_TMPFILE | O_RDWR, S_IRUSR | S_IWUSR);
+	try(in->fd < 0, err, errno, "failed creating temporary output file: %s", strerror(ret));
+	try(ftruncate(in->fd, in->settings->stride * in->n), err, errno, "failed setting output file size: %s", strerror(ret));
+
+err:
+	free(dname);
+	return ret;
+}
+
+/*
+ * Open in->name for reading and derive the element count from its size.
+ *
+ * The file has to be a regular, non-empty file whose size is a whole
+ * multiple of in->settings->stride; in->n is set to size / stride.
+ *
+ * Returns 0 on success, EINVAL for an unsuitable file, or errno from
+ * stat()/open().
+ */
+static int stream_open_in(struct stream * const in)
+{
+	struct stat st;
+	int ret = 0;
+
+	try(stat(in->name, &st), err, errno, "stat failed: %s", strerror(ret));
+	/* S_ISREG compares the whole file-type field; `& S_IFREG` also matches sockets */
+	try(!S_ISREG(st.st_mode) || !st.st_size || (st.st_size % in->settings->stride), err, EINVAL, "invalid input file");
+	in->n = st.st_size / in->settings->stride;
+
+	in->fd = open(in->name, O_RDONLY | O_NOATIME);
+	if (in->fd < 0 && errno == EPERM) {
+		/* O_NOATIME is refused with EPERM for files the caller does not
+		 * own. The flag is only an optimisation, so retry without it.
+		 */
+		in->fd = open(in->name, O_RDONLY);
+	}
+	try(in->fd < 0, err, errno, "failed opening input file: %s", strerror(ret));
+
+err:
+	return ret;
+}
+
+/*
+ * Write the cache of an output stream to its file.
+ *
+ * Only cached streams of type stream_out may be flushed; everything else is
+ * rejected with EINVAL. The actual write is delegated to the array or list
+ * flusher depending on in->settings->format.
+ *
+ * Returns 0 on success, EINVAL for a stream that cannot be flushed, or the
+ * result of the format-specific flusher.
+ */
+static int stream_flush(struct stream * const in)
+{
+	int ret = 0;
+
+	try(in->settings->access != cached, err, EINVAL, "cannot flush an uncached stream");
+	try(in->type != stream_out, err, EINVAL, "cannot flush a non-output cache");
+
+	/* anything that is not an array is handled as a list */
+	ret = (in->settings->format == array) ? stream_flush_array(in) : stream_flush_list(in);
+
+err:
+	return ret;
+}
+
+/*
+ * Write in->n * in->settings->stride bytes of in->cache to in->fd.
+ *
+ * Short writes are continued from where they stopped and EAGAIN is retried;
+ * any other write error ends the call.
+ *
+ * Returns 0 once everything is written, EINVAL if no file above fd 2 is
+ * open, or the errno of a failed write().
+ */
+static int stream_flush_array(struct stream * const in)
+{
+	ssize_t ret = 0;
+	size_t left = in->n * in->settings->stride;
+	ssize_t done = 0;
+
+	try(in->fd < 3, err, EINVAL, "can't dump without an open file");
+
+	/* write() is attempted at least once, even when nothing is left */
+	for (;;) {
+		ret = write(in->fd, in->cache + done, left);
+		if (ret >= 0) {
+			done += ret;
+			left -= ret;
+			ret = 0;
+		} else {
+			try(errno != EAGAIN, err, errno, "Writing to stream failed with %zi", ret);
+		}
+
+		if (!left) {
+			break;
+		}
+	}
+
+err:
+	return ret;
+}
+
+/*
+ * Write a list-formatted cache to in->fd.
+ *
+ * Every link except the last is first rewritten in place from a memory
+ * address to an offset relative to the start of the cache, then the whole
+ * cache (in->n * in->settings->stride bytes) is written out. Short writes
+ * are continued and EAGAIN is retried.
+ *
+ * Returns 0 once everything is written, EINVAL if no file above fd 2 is
+ * open, or the errno of a failed write().
+ */
+static int stream_flush_list(struct stream * const in)
+{
+	ssize_t ret = 0;
+	size_t remaining = in->n * in->settings->stride;
+	ssize_t written = 0;
+	size_t i;
+
+	/* Same guard as the array flusher, done before the links are touched so
+	 * a refused flush leaves the in-memory list intact.
+	 */
+	try(in->fd < 3, err, EINVAL, "can't dump without an open file");
+
+	/* adjust pointers to have a base starting at 0, to cater to disk storage format.
+	 * File offsets are otherwise 1:1 to memory addresses, which is neat.
+	 * Just don't process the last offset, as that is always NULL/zero.
+	 * `i + 1 < n` rather than `i < n - 1`, so n == 0 cannot wrap around.
+	 * */
+
+	for (i = 0; i + 1 < in->n; ++i) {
+		in->cache_l[i].offset = (char *) in->cache_l[i].next - in->cache;
+	}
+
+	/* dump the bloody list */
+	do {
+		ret = write(in->fd, in->cache + written, remaining);
+		if (ret < 0) {
+			try(errno != EAGAIN, err, errno, "Writing to stream failed with %zi", ret);
+		} else {
+			written += ret;
+			remaining -= ret;
+			ret = 0;
+		}
+	} while (remaining);
+
+err:
+	return ret;
+}