summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorGravatar Gediminas Jakutis <gediminas@varciai.lt> 2021-02-11 08:17:03 +0200
committerGravatar Gediminas Jakutis <gediminas@varciai.lt> 2021-02-11 08:17:03 +0200
commit4a14ab7ab48e3fd591dde33d59c6d29fc39f1c5d (patch)
tree8d4e527e4a8ad76819e3baa1b89441eeb1081f15
parent41efe7b8f9f67d5956ab677f3631478c48114ac1 (diff)
downloadalgos-ld1-4a14ab7ab48e3fd591dde33d59c6d29fc39f1c5d.tar.gz
algos-ld1-4a14ab7ab48e3fd591dde33d59c6d29fc39f1c5d.tar.bz2
algos-ld1-4a14ab7ab48e3fd591dde33d59c6d29fc39f1c5d.zip
continue the overhaul.
we can finally create a basic array input data file too! Signed-off-by: Gediminas Jakutis <gediminas@varciai.lt>
-rw-r--r--src/cache.c63
-rw-r--r--src/cache.h7
-rw-r--r--src/defs.h15
-rw-r--r--src/io.c108
-rw-r--r--src/io.h2
-rw-r--r--src/main.c32
6 files changed, 140 insertions, 87 deletions
diff --git a/src/cache.c b/src/cache.c
index 4fe0c46..bb41689 100644
--- a/src/cache.c
+++ b/src/cache.c
@@ -9,13 +9,13 @@
#include <rin/diagnostic.h>
#include "defs.h"
-int cache_create(struct stream * const in, const struct settings * const restrict s)
+int cache_create(struct stream * const in)
{
int ret = 0;
void *cache;
- try(!in->cached, err, EINVAL, "cannot create cache: stream is uncached");
- try(!(cache = calloc(in->n, s->stride)), err, ENOMEM, "out of memory");
+ try(in->settings->access != cached, err, EINVAL, "cannot create cache: stream is uncached");
+ try(!(cache = calloc(in->n, in->settings->stride)), err, ENOMEM, "out of memory");
/* yeah... */
in->cache = cache;
@@ -31,33 +31,22 @@ int cache_populate(struct stream * const in)
size_t i;
struct entry_l *tmp;
- try(!in->cached, err, EINVAL, "cannot populate cache: stream is uncached");
-
- for (i = 0; i < in->n && !ret; ++i) {
- errno = 0;
- tmp = in->get_next_element_direct(in);
- if (tmp) { /* non-cache reads CAN fail */
- put(in, tmp);
- } else {
- ret = errno;
- break;
+ try(in->settings->access != cached, err, EINVAL, "cannot populate cache: stream is uncached");
+
+ /* if reading a chardev, fall back to the one-element-at-a-time mode */
+ if (in->type == stream_chardev) {
+ for (i = 0; i < in->n && !ret; ++i) {
+ errno = 0;
+ tmp = in->get_next_element_direct(in);
+ if (tmp) { /* non-cache reads CAN fail */
+ put(in, tmp);
+ } else {
+ ret = errno;
+ break;
+ }
}
- }
-
-err:
- return ret;
-}
-
-int cache_flush(struct stream * const in)
-{
- int ret = 0;
- struct entry_l *tmp;
-
- try(!in->cached, err, EINVAL, "no cache to flush: stream is uncached");
- try(in->out != 1, err, EINVAL, "cannot flush a non-output cache");
-
- while((tmp = get(in))) {
- in->place_next_element_direct(in, tmp);
+ } else {
+ /* TODO */
}
err:
@@ -68,7 +57,7 @@ int cache_destroy(struct stream * const in)
{
int ret = 0;
- try(!in->cached, err, EINVAL, "no cache to destroy: stream uncached");
+ try(in->settings->access != cached, err, EINVAL, "cannot destroy cache: stream is uncached");
free(in->cache);
in->cache_l = NULL;
in->cache_a = NULL;
@@ -81,7 +70,7 @@ int cache_transfer(struct stream * const src, struct stream * const dest)
{
int ret = 0;
- try(!src->cached || !dest->cached, err, EINVAL, "cannot transfer caches of uncached streams");
+ try(src->settings->access != cached || dest->settings->access != cached, err, EINVAL, "cannot transfer caches of uncached streams");
try(!src->cache, err, EINVAL, "no cache to transfer");
try(dest->cache, err, EINVAL, "cannot transfer cache: recipient cache already exists");
@@ -95,28 +84,28 @@ err:
return ret;
}
-int cache_block_copy(struct stream const * const src, struct stream * const dest, const struct settings * const s)
+int cache_block_copy(struct stream const * const src, struct stream * const dest)
{
int ret = 0;
- try(!src->cached || !dest->cached, err, EINVAL, "cannot cache-copy between uncached streams");
+ try(src->settings->access != cached || dest->settings->access != cached, err, EINVAL, "cannot cache-copy between uncached streams");
try(!src->cache, err, EINVAL, "no cache to transfer");
- try(!(src->n < s->to), err, EINVAL, "invalid copy size");
+ try(!(src->n < dest->settings->to), err, EINVAL, "invalid copy size");
try(!dest->cache, err, EINVAL, "no cache to transfer to");
- memcpy(dest->cache, src->cache_a + s->ss, (s->to - s->ss) * s->stride);
+ memcpy(dest->cache, src->cache_a + dest->settings->ss, (dest->settings->to - dest->settings->ss) * dest->settings->stride);
+ dest->n = dest->settings->to - dest->settings->ss;
err:
return ret;
}
-int cache_list_copy(struct stream const * const src, struct stream * const dest, const struct settings * const s)
+int cache_list_copy(struct stream const * const src, struct stream * const dest)
{
int ret = ENOSYS;
(void) src;
(void) dest;
- (void) s;
rin_warn("stub!");
diff --git a/src/cache.h b/src/cache.h
index 9f333d3..055a686 100644
--- a/src/cache.h
+++ b/src/cache.h
@@ -9,15 +9,14 @@
#include "defs.h"
/* INIT|DESTROY */
-int cache_create(struct stream * const in, const struct settings * const restrict s);
+int cache_create(struct stream * const in);
int cache_populate(struct stream * const in);
-int cache_flush(struct stream * const in);
int cache_destroy(struct stream * const in);
/* BLOCKMANIP */
int cache_transfer(struct stream * const src, struct stream * const dest);
-int cache_block_copy(struct stream const * const src, struct stream * const dest, const struct settings * const s);
-int cache_list_copy(struct stream const * const src, struct stream * const dest, const struct settings * const s);
+int cache_block_copy(struct stream const * const src, struct stream * const dest);
+int cache_list_copy(struct stream const * const src, struct stream * const dest);
int cache_block_split(struct stream * const src, struct stream * const A, struct stream * const B);
int cache_list_split(struct stream * const src, struct stream * const A, struct stream * const B);
diff --git a/src/defs.h b/src/defs.h
index cb03c81..39960d9 100644
--- a/src/defs.h
+++ b/src/defs.h
@@ -23,8 +23,8 @@
}} while (0);
-#define get(in) (in->cached ? in->get_next_element_cache(in) : in->get_next_element_direct(in))
-#define put(in, data) (in->cached ? in->place_next_element_cache(in, data) : in->place_next_element_direct(in, data))
+#define get(in) (in->settings->access == cached ? in->get_next_element_cache(in) : in->get_next_element_direct(in))
+#define put(in, data) (in->settings->access == cached ? in->place_next_element_cache(in, data) : in->place_next_element_direct(in, data))
#define stream_blank .fd = -1, .settings = &settings, .get_next_element_direct = stub_getnext, \
.get_next_element_cache = stub_getnext, .place_next_element_direct = stub_put, \
@@ -57,12 +57,19 @@ enum dataformat {
list
};
+enum streamtype {
+ stream_invalid = -1,
+ stream_in,
+ stream_out,
+ stream_outlite,
+ stream_chardev
+};
+
struct stream {
struct stream *parent;
size_t n;
int fd;
- int out;
- int cached;
+ enum streamtype type;
char *name;
size_t stride;
size_t index;
diff --git a/src/io.c b/src/io.c
index d323573..41c5880 100644
--- a/src/io.c
+++ b/src/io.c
@@ -17,27 +17,37 @@
#include "datagen.h"
#include "cache.h"
-static int stream_open_out(struct stream * const in, const struct settings * const s);
-static int stream_open_out_lite(struct stream * const in, const struct settings * const s);
-static int stream_open_in(struct stream * const in, const struct settings * const s);
-static int stream_open_special(struct stream * const in);
-
-int stream_open(struct stream * const in, const struct settings * const s)
+static int stream_open_out(struct stream * const in);
+static int stream_open_out_lite(struct stream * const in);
+static int stream_open_in(struct stream * const in);
+static int stream_open_chardev(struct stream * const in);
+static int stream_flush(struct stream * const in);
+static int stream_flush_array(struct stream * const in);
+static int stream_flush_list(struct stream * const in);
+
+int stream_open(struct stream * const in)
{
int ret = 0;
- try(!in || in->fd > 0 || !in->name || !s, err, EINVAL, "invalid argunent");
+ try(!in || in->fd > 0 || !in->name, err, EINVAL, "invalid argunent");
- if (in->out == 2) {
- ret = stream_open_out_lite(in, s);
- } else if (in->out == 1) {
+ switch (in->type) {
+ case (stream_outlite):
+ ret = stream_open_out_lite(in);
+ break;
+ case (stream_out):
try(!in->name, err, EINVAL, "no filename given");
- ret = stream_open_out(in, s);
- } else if (!in->out) {
+ ret = stream_open_out(in);
+ break;
+ case (stream_in):
try(!in->name, err, EINVAL, "no filename given");
- ret = stream_open_in(in, s);
- } else {
- ret = stream_open_special(in);
+ ret = stream_open_in(in);
+ break;
+ case (stream_chardev):
+ ret = stream_open_chardev(in);
+ break;
+ default:
+ try(0, err, EINVAL, "cannot open stream: stream is invalid");
}
err:
@@ -50,7 +60,7 @@ int stream_close(struct stream * const in)
try(!in || in->fd < 0, early_err, EINVAL, "invalid argunent");
- if (in->out != 1) {
+ if (in->type != stream_out) {
goto out;
}
@@ -58,6 +68,8 @@ int stream_close(struct stream * const in)
char path[PATH_MAX];
struct stat st;
+ try_s((ret = stream_flush(in)), err);
+
snprintf(path, PATH_MAX, "/proc/self/fd/%i", in->fd);
if (!stat(in->name, &st)) {
@@ -85,7 +97,7 @@ early_err:
return ret;
}
-static int stream_open_out(struct stream * const in, const struct settings * const s)
+static int stream_open_out(struct stream * const in)
{
struct stat st;
mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP;
@@ -108,14 +120,14 @@ static int stream_open_out(struct stream * const in, const struct settings * con
in->fd = open(dname, O_TMPFILE | O_RDWR, mode);
try(in->fd < 0, err, errno, "failed creating temporary output file: %s", strerror(ret));
- try(ftruncate(in->fd, s->stride * in->n), err, errno, "failed setting output file size: %s", strerror(ret));
+ try(ftruncate(in->fd, in->settings->stride * in->n), err, errno, "failed setting output file size: %s", strerror(ret));
err:
free(dname);
return ret;
}
-static int stream_open_out_lite(struct stream * const in, const struct settings * const s)
+static int stream_open_out_lite(struct stream * const in)
{
struct stat st;
char *dname = NULL;
@@ -132,21 +144,21 @@ static int stream_open_out_lite(struct stream * const in, const struct settings
in->fd = open(dname, O_TMPFILE | O_RDWR, S_IRUSR | S_IWUSR);
try(in->fd < 0, err, errno, "failed creating temporary output file: %s", strerror(ret));
- try(ftruncate(in->fd, s->stride * in->n), err, errno, "failed setting output file size: %s", strerror(ret));
+ try(ftruncate(in->fd, in->settings->stride * in->n), err, errno, "failed setting output file size: %s", strerror(ret));
err:
free(dname);
return ret;
}
-static int stream_open_in(struct stream * const in, const struct settings * const s)
+static int stream_open_in(struct stream * const in)
{
struct stat st;
int ret = 0;
try(stat(in->name, &st), err, errno, "stat failed: %s", strerror(ret));
- try(!(st.st_mode & S_IFREG) || !st.st_size || (st.st_size % s->stride), err, EINVAL, "invalid input file");
- in->n = st.st_size / s->stride;
+ try(!(st.st_mode & S_IFREG) || !st.st_size || (st.st_size % in->settings->stride), err, EINVAL, "invalid input file");
+ in->n = st.st_size / in->settings->stride;
in->fd = open(in->name, O_RDONLY | O_NOATIME);
try(in->fd < 0, err, errno, "failed opening input file: %s", strerror(ret));
@@ -154,7 +166,7 @@ err:
return ret;
}
-static int stream_open_special(struct stream * const in)
+static int stream_open_chardev(struct stream * const in)
{
struct stat st;
int ret = 0;
@@ -166,3 +178,51 @@ static int stream_open_special(struct stream * const in)
err:
return ret;
}
+
+int stream_flush(struct stream * const in)
+{
+ int ret = 0;
+
+ try(in->settings->access != cached, err, EINVAL, "cannot flush an uncached stream");
+ try(in->type != stream_out, err, EINVAL, "cannot flush a non-output cache");
+
+ if (in->settings->format == array) {
+ ret = stream_flush_array(in);
+ } else {
+ ret = stream_flush_list(in);
+ }
+err:
+ return ret;
+}
+
+int stream_flush_array(struct stream * const in)
+{
+ ssize_t ret = 0;
+ size_t remaining = in->n * in->settings->stride;
+ ssize_t written = 0;
+
+ do {
+ ret = write(in->fd, in->cache + written, remaining);
+ if (ret < 0) {
+ try(errno != EAGAIN, err, errno, "Writing to stream failed with %zi", ret);
+ } else {
+ written += ret;
+ remaining -= ret;
+ ret = 0;
+ }
+ } while (remaining);
+
+err:
+ return ret;
+}
+
+int stream_flush_list(struct stream * const in)
+{
+ int ret = ENOSYS;
+
+ (void) in;
+
+ rin_warn("stub!");
+
+ return ret;
+}
diff --git a/src/io.h b/src/io.h
index eca6aec..91f1f88 100644
--- a/src/io.h
+++ b/src/io.h
@@ -7,7 +7,7 @@
#include "defs.h"
-int stream_open(struct stream * const in, const struct settings * const s);
+int stream_open(struct stream * const in);
int stream_close(struct stream * const in);
int direct_get_array(struct stream * const restrict in, ssize_t idx, struct entry_l * const data);
diff --git a/src/main.c b/src/main.c
index 352c531..1733f84 100644
--- a/src/main.c
+++ b/src/main.c
@@ -26,9 +26,9 @@ static int stub_split(struct stream * const in, struct stream * const a, struct
static int stub_flush(struct stream * const in);
/* these need to go AFTER the stub declarations */
-static struct stream file_in = {stream_blank};
-static struct stream file_out = {stream_blank, .out = 1};
-static struct stream file_tmp = {stream_blank, .out = 2};
+static struct stream file_in = {stream_blank, .type = stream_in};
+static struct stream file_out = {stream_blank, .type = stream_out};
+static struct stream file_tmp = {stream_blank, .type = stream_outlite};
int main(int argc, char **argv)
{
@@ -39,10 +39,8 @@ int main(int argc, char **argv)
if (settings.opmode == mode_generate) {
file_in.name = randfile;
- file_in.out = -1;
+ file_in.type = stream_chardev;
file_in.n = settings.to;
- file_in.cached = 1;
- file_out.cached = 1;
} else {
file_in.name = settings.filein;
}
@@ -50,22 +48,22 @@ int main(int argc, char **argv)
load_io_functions(&settings, &file_in);
file_out.name = settings.fileout ? settings.fileout : settings.filein;
- try_s((ret = stream_open(&file_in, &settings)), out);
+ try_s((ret = stream_open(&file_in)), out);
file_out.n = file_in.n - settings.ss;
- try_s((ret = stream_open(&file_out, &settings)), out);
+ try_s((ret = stream_open(&file_out)), out);
load_io_functions(&settings, &file_out);
if (settings.access == cached) {
- try_s((ret = cache_create(&file_in, &settings)), out);
+ try_s((ret = cache_create(&file_in)), out);
try_s((ret = cache_populate(&file_in)), out);
switch (settings.opmode) {
case mode_fetch:
if (settings.format == array) {
- try_s((ret = cache_block_copy(&file_in, &file_out, &settings)), out);
+ try_s((ret = cache_block_copy(&file_in, &file_out)), out);
} else { /* settings.format == list */
- try_s((ret = cache_list_copy(&file_in, &file_out, &settings)), out);
+ try_s((ret = cache_list_copy(&file_in, &file_out)), out);
}
break;
case mode_generate:
@@ -76,8 +74,6 @@ int main(int argc, char **argv)
;
}
- try_s((ret = cache_flush(&file_out)), out);
-
} else { /* uncached */
/* TODO */
}
@@ -88,7 +84,7 @@ int main(int argc, char **argv)
while (0) {
out:
stream_close(&file_in);
- file_out.out = 2; /* in case of error-exit, just close, don't link */
+ file_out.type = stream_invalid; /* in case of error-exit, just close, don't link */
stream_close(&file_out);
stream_close(&file_tmp);
}
@@ -184,19 +180,19 @@ int load_io_functions(struct settings const * const s, struct stream * const in)
{
int ret = 0;
- if (in->out == 1) {
+ if (in->type == stream_out) {
if (s->format == array) {
/* TODO */
} else {
/* TODO */
}
- } else if (!in->out) { /* reading streams do not support dumping */
+ } else if (in->type == stream_in) { /* reading streams do not support dumping */
if (s->format == array) {
/* TODO */
} else {
/* TODO */
}
- } else { /* data generation streams do not support dumping nor any cache I/O beyond initial data generation */
+ } else if (in->type == stream_chardev) { /* data generation streams do not support dumping nor any cache I/O beyond initial data generation */
if (s->format == array) {
in->get_next_element_direct = gen_get_array;
in->place_next_element_cache = cached_put_array;
@@ -204,6 +200,8 @@ int load_io_functions(struct settings const * const s, struct stream * const in)
in->get_next_element_direct = gen_get_list;
in->place_next_element_cache = cached_put_list;
}
+ } else {
+ ret = EINVAL;
}
return ret;