/* SPDX-License-Identifier: LGPL-2.1-only */ /* Copyright (C) 2020-2021 Gediminas Jakutis */ #include #include #include #include #include #include #include #include #include #include #include "stream.h" #include "defs.h" #include "datagen.h" #include "cache.h" #include "util.h" static int stream_open_out(struct stream * const in); static int stream_open_lite(struct stream * const in); static int stream_open_in(struct stream * const in); static int stream_flush(struct stream * const in); static int stream_flush_array(struct stream * const in); static int stream_flush_list(struct stream * const in); int stream_open(struct stream * const in) { int ret = 0; try(!in || in->fd > 0 || (!in->name && in->type != stream_randread), err, EINVAL, "invalid argunent"); switch (in->type) { case (stream_lite): ret = stream_open_lite(in); break; case (stream_out): try(!in->name, err, EINVAL, "no filename given"); ret = stream_open_out(in); break; case (stream_in): try(!in->name, err, EINVAL, "no filename given"); ret = stream_open_in(in); break; case (stream_randread): ; /* NOOP */ break; default: try(0, err, EINVAL, "cannot open stream: stream is invalid"); } err: return ret; } int stream_close(struct stream * const in) { int ret = 0; if (in->type != stream_out) { goto out; } if (in->name) { char path[PATH_MAX]; struct stat st; if (in->settings->access == cached) { try_s((ret = stream_flush(in)), err); } snprintf(path, PATH_MAX, "/proc/self/fd/%i", in->fd); if (!stat(in->name, &st)) { if (st.st_mode & S_IFREG) { unlink(in->name); } else { ret = EINVAL; rin_err("the given output file already exists and is not a regular file"); goto err; } } try(linkat(AT_FDCWD, path, AT_FDCWD, in->name, AT_SYMLINK_FOLLOW), err, errno, "error linking output file to filesystem: %s", strerror(ret)); } else { rin_err("no output filename given"); ret = EINVAL; goto err; } out: if (in->settings && in->settings->access == cached) { cache_destroy(in); } err: in->fd > 2 ? close(in->fd) : 0; in->fd = -1; return ret; } struct entry_l *file_get_array(struct stream * const in, struct entry_l * const store) { struct entry ent; struct entry_l *ret = NULL; ssize_t bytes_read; if (in->index < in->n) { do { try(0 > (bytes_read = pread(in->fd, &ent, sizeof(ent), in->index * in->settings->stride)), err, NULL, "Writing to stream failed with %i", errno); } while (bytes_read != sizeof(ent)); ++in->index; store->val = ent.val; ret = store; } err: return ret; } struct entry_l *file_get_list(struct stream * const in, struct entry_l * const store) { struct entry_l ent; struct entry_l *ret = NULL; ssize_t bytes_read; if (in->index < in->n) { in->cnode = in->cnode ? in->cnode : &in->cnode_f; do { try(0 > (bytes_read = pread(in->fd, &ent, sizeof(ent), in->cnode->file_offset)), err, NULL, "Writing to stream failed with %i", errno); } while (bytes_read != sizeof(ent)); in->cnode->file_offset = ent.file_offset; ++in->index; store->val = ent.val; ret = store; } err: return ret; } int file_put_array(struct stream * const in, const struct entry_l * const data) { int ret = 0; ssize_t bytes_written; if (in->index < in->n) { do { try(0 > (bytes_written = pwrite(in->fd, data, sizeof(data->val), in->index * in->settings->stride)), err, errno, "Writing to stream failed with %i", ret); } while (bytes_written != sizeof(data->val)); ++in->index; } err: return ret; } int file_put_list(struct stream * const in, const struct entry_l * const node) { int ret = 0; ssize_t bytes_written; struct entry_l tmp = {0}; try(in->index >= in->n, err, EINVAL, "can't add element: out of cache bounds"); if (!in->cnode) { /* if this is the very first one */ in->cnode = &in->cnode_f; } else { in->cnode->file_offset += sizeof(*node); /* writing the offset of the node being added to the current node's offset field */ do { try(0 > (bytes_written = pwrite(in->fd, &in->cnode->file_offset, sizeof(in->cnode->file_offset), in->cnode->file_offset - sizeof(*node) + sizeof(node->val))), err, errno, "Writing to stream failed with %i", ret); } while (bytes_written != sizeof(in->cnode->file_offset)); } tmp.val = node->val; if (in->index < in->n) { do { try(0 > (bytes_written = pwrite(in->fd, &tmp, sizeof(tmp), in->cnode->file_offset)), err, errno, "Writing to stream failed with %i", ret); } while (bytes_written != sizeof(tmp)); } ++in->index; err: return ret; } int file_transfer(struct stream * const src, struct stream * const dest) { int ret = 0; int tmp; /* just a simple switcheroo of file descriptors */ tmp = src->fd; src->fd = dest->fd; dest->fd = tmp; return ret; } /* generic, naïve and slow copy routine when an optimized one cannot be used */ int stream_copy_range(struct stream * const restrict src, struct stream * const restrict dest) { int ret = 0; size_t ss; struct entry_l tmp_store; try(src->n < dest->settings->to, err, EINVAL, "invalid copy size"); ss = dest->settings->ss; /* skip over to start position */ while (ss--) { src->get(src, &tmp_store); } do { dest->put(dest, src->get(src, &tmp_store)); } while (dest->index < (dest->n)); stream_rewind(src); stream_rewind(dest); err: return ret; } int stream_shallow_copy(struct stream const * const restrict src, struct stream * const dest) { int ret = 0; dest->n = src->n; dest->settings = src->settings; dest->index = 0; dest->get = src->get; dest->put = src->put; dest->copy = src->copy; if (src->settings->access == cached) { dest->fd = -1; dest->type = stream_lite; try_s((ret = cache_create(dest)), err); } else { dest->name = src->name; dest->type = stream_lite; try_s((ret = stream_open_lite(dest)), err); } err: return ret; } static int stream_open_out(struct stream * const in) { struct stat st; mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP; char *dname = NULL; char *tmp[2]; int ret = 0; tmp[0] = strdup(in->name); tmp[1] = dirname(tmp[0]); dname = strdup(tmp[1]); free(tmp[0]); try(stat(dname, &st), err, errno, "stat failed: %s", strerror(ret)); try(!(st.st_mode & S_IFDIR), err, EINVAL, "invalid output path"); if (!stat(in->name, &st)) { try(!(st.st_mode & S_IFREG), err, EINVAL, "the given output file already exists and is not a regular file"); mode = st.st_mode; } in->fd = open(dname, O_TMPFILE | O_RDWR, mode); try(in->fd < 0, err, errno, "failed creating temporary output file: %s", strerror(ret)); try(ftruncate(in->fd, in->settings->stride * in->n), err, errno, "failed setting output file size: %s", strerror(ret)); err: free(dname); return ret; } static int stream_open_lite(struct stream * const in) { struct stat st; char *dname = NULL; char *tmp[2]; int ret = 0; tmp[0] = strdup(in->name); tmp[1] = dirname(tmp[0]); dname = strdup(tmp[1]); free(tmp[0]); try(stat(dname, &st), err, errno, "stat failed: %s", strerror(ret)); try(!(st.st_mode & S_IFDIR), err, EINVAL, "invalid output path"); in->fd = open(dname, O_TMPFILE | O_RDWR, S_IRUSR | S_IWUSR); try(in->fd < 0, err, errno, "failed creating temporary output file: %s", strerror(ret)); try(ftruncate(in->fd, in->settings->stride * in->n), err, errno, "failed setting output file size: %s", strerror(ret)); err: free(dname); return ret; } static int stream_open_in(struct stream * const in) { struct stat st; int ret = 0; try(stat(in->name, &st), err, errno, "stat failed: %s", strerror(ret)); try(!(st.st_mode & S_IFREG) || !st.st_size || (st.st_size % in->settings->stride), err, EINVAL, "invalid input file"); in->n = st.st_size / in->settings->stride; in->fd = open(in->name, O_RDONLY | O_NOATIME); try(in->fd < 0, err, errno, "failed opening input file: %s", strerror(ret)); err: return ret; } static int stream_flush(struct stream * const in) { int ret = 0; try(in->settings->access != cached, err, EINVAL, "cannot flush an uncached stream"); try(in->type != stream_out, err, EINVAL, "cannot flush a non-output cache"); if (in->settings->format == array) { ret = stream_flush_array(in); } else { ret = stream_flush_list(in); } err: return ret; } static int stream_flush_array(struct stream * const in) { ssize_t ret = 0; size_t remaining = in->n * in->settings->stride; ssize_t written = 0; try(in->fd < 3, err, EINVAL, "can't dump without an open file"); do { ret = write(in->fd, in->cache + written, remaining); if (ret < 0) { try(errno != EAGAIN, err, errno, "Writing to stream failed with %zi", ret); } else { written += ret; remaining -= ret; ret = 0; } } while (remaining); err: return ret; } static int stream_flush_list(struct stream * const in) { ssize_t ret = 0; size_t remaining = in->n * in->settings->stride; ssize_t written = 0; size_t i; /* adjust pointers to have a base starting at 0, to cater to disk storage format. * File offsets are otherwise 1:1 to memory addresses, which is neat. * Just don't process the last offset, as that is always NULL/zero. * */ for (i = 0; i < (in->n - 1); ++i) { in->cache_l[i].offset = (char *) in->cache_l[i].next - in->cache; } /* dump the bloody list */ do { ret = write(in->fd, in->cache + written, remaining); if (ret < 0) { try(errno != EAGAIN, err, errno, "Writing to stream failed with %zi", ret); } else { written += ret; remaining -= ret; ret = 0; } } while (remaining); err: return ret; }