diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/cache.c | 63 | ||||
-rw-r--r-- | src/cache.h | 7 | ||||
-rw-r--r-- | src/defs.h | 15 | ||||
-rw-r--r-- | src/io.c | 108 | ||||
-rw-r--r-- | src/io.h | 2 | ||||
-rw-r--r-- | src/main.c | 32 |
6 files changed, 140 insertions, 87 deletions
diff --git a/src/cache.c b/src/cache.c index 4fe0c46..bb41689 100644 --- a/src/cache.c +++ b/src/cache.c @@ -9,13 +9,13 @@ #include <rin/diagnostic.h> #include "defs.h" -int cache_create(struct stream * const in, const struct settings * const restrict s) +int cache_create(struct stream * const in) { int ret = 0; void *cache; - try(!in->cached, err, EINVAL, "cannot create cache: stream is uncached"); - try(!(cache = calloc(in->n, s->stride)), err, ENOMEM, "out of memory"); + try(in->settings->access != cached, err, EINVAL, "cannot create cache: stream is uncached"); + try(!(cache = calloc(in->n, in->settings->stride)), err, ENOMEM, "out of memory"); /* yeah... */ in->cache = cache; @@ -31,33 +31,22 @@ int cache_populate(struct stream * const in) size_t i; struct entry_l *tmp; - try(!in->cached, err, EINVAL, "cannot populate cache: stream is uncached"); - - for (i = 0; i < in->n && !ret; ++i) { - errno = 0; - tmp = in->get_next_element_direct(in); - if (tmp) { /* non-cache reads CAN fail */ - put(in, tmp); - } else { - ret = errno; - break; + try(in->settings->access != cached, err, EINVAL, "cannot populate cache: stream is uncached"); + + /* if reading a chardev, fall back to the one-element-at-a-time mode */ + if (in->type == stream_chardev) { + for (i = 0; i < in->n && !ret; ++i) { + errno = 0; + tmp = in->get_next_element_direct(in); + if (tmp) { /* non-cache reads CAN fail */ + put(in, tmp); + } else { + ret = errno; + break; + } } - } - -err: - return ret; -} - -int cache_flush(struct stream * const in) -{ - int ret = 0; - struct entry_l *tmp; - - try(!in->cached, err, EINVAL, "no cache to flush: stream is uncached"); - try(in->out != 1, err, EINVAL, "cannot flush a non-output cache"); - - while((tmp = get(in))) { - in->place_next_element_direct(in, tmp); + } else { + /* TODO */ } err: @@ -68,7 +57,7 @@ int cache_destroy(struct stream * const in) { int ret = 0; - try(!in->cached, err, EINVAL, "no cache to destroy: stream uncached"); + try(in->settings->access != cached, err, EINVAL, "cannot destroy cache: stream is uncached"); free(in->cache); in->cache_l = NULL; in->cache_a = NULL; @@ -81,7 +70,7 @@ int cache_transfer(struct stream * const src, struct stream * const dest) { int ret = 0; - try(!src->cached || !dest->cached, err, EINVAL, "cannot transfer caches of uncached streams"); + try(src->settings->access != cached || dest->settings->access != cached, err, EINVAL, "cannot transfer caches of uncached streams"); try(!src->cache, err, EINVAL, "no cache to transfer"); try(dest->cache, err, EINVAL, "cannot transfer cache: recipient cache already exists"); @@ -95,28 +84,28 @@ err: return ret; } -int cache_block_copy(struct stream const * const src, struct stream * const dest, const struct settings * const s) +int cache_block_copy(struct stream const * const src, struct stream * const dest) { int ret = 0; - try(!src->cached || !dest->cached, err, EINVAL, "cannot cache-copy between uncached streams"); + try(src->settings->access != cached || dest->settings->access != cached, err, EINVAL, "cannot cache-copy between uncached streams"); try(!src->cache, err, EINVAL, "no cache to transfer"); - try(!(src->n < s->to), err, EINVAL, "invalid copy size"); + try(!(src->n < dest->settings->to), err, EINVAL, "invalid copy size"); try(!dest->cache, err, EINVAL, "no cache to transfer to"); - memcpy(dest->cache, src->cache_a + s->ss, (s->to - s->ss) * s->stride); + memcpy(dest->cache, src->cache_a + dest->settings->ss, (dest->settings->to - dest->settings->ss) * dest->settings->stride); + dest->n = dest->settings->to - dest->settings->ss; err: return ret; } -int cache_list_copy(struct stream const * const src, struct stream * const dest, const struct settings * const s) +int cache_list_copy(struct stream const * const src, struct stream * const dest) { int ret = ENOSYS; (void) src; (void) dest; - (void) s; rin_warn("stub!"); diff --git a/src/cache.h b/src/cache.h index 9f333d3..055a686 100644 --- a/src/cache.h +++ b/src/cache.h @@ -9,15 +9,14 @@ #include "defs.h" /* INIT|DESTROY */ -int cache_create(struct stream * const in, const struct settings * const restrict s); +int cache_create(struct stream * const in); int cache_populate(struct stream * const in); -int cache_flush(struct stream * const in); int cache_destroy(struct stream * const in); /* BLOCKMANIP */ int cache_transfer(struct stream * const src, struct stream * const dest); -int cache_block_copy(struct stream const * const src, struct stream * const dest, const struct settings * const s); -int cache_list_copy(struct stream const * const src, struct stream * const dest, const struct settings * const s); +int cache_block_copy(struct stream const * const src, struct stream * const dest); +int cache_list_copy(struct stream const * const src, struct stream * const dest); int cache_block_split(struct stream * const src, struct stream * const A, struct stream * const B); int cache_list_split(struct stream * const src, struct stream * const A, struct stream * const B); @@ -23,8 +23,8 @@ }} while (0); -#define get(in) (in->cached ? in->get_next_element_cache(in) : in->get_next_element_direct(in)) -#define put(in, data) (in->cached ? in->place_next_element_cache(in, data) : in->place_next_element_direct(in, data)) +#define get(in) (in->settings->access == cached ? in->get_next_element_cache(in) : in->get_next_element_direct(in)) +#define put(in, data) (in->settings->access == cached ? in->place_next_element_cache(in, data) : in->place_next_element_direct(in, data)) #define stream_blank .fd = -1, .settings = &settings, .get_next_element_direct = stub_getnext, \ .get_next_element_cache = stub_getnext, .place_next_element_direct = stub_put, \ @@ -57,12 +57,19 @@ enum dataformat { list }; +enum streamtype { + stream_invalid = -1, + stream_in, + stream_out, + stream_outlite, + stream_chardev +}; + struct stream { struct stream *parent; size_t n; int fd; - int out; - int cached; + enum streamtype type; char *name; size_t stride; size_t index; @@ -17,27 +17,37 @@ #include "datagen.h" #include "cache.h" -static int stream_open_out(struct stream * const in, const struct settings * const s); -static int stream_open_out_lite(struct stream * const in, const struct settings * const s); -static int stream_open_in(struct stream * const in, const struct settings * const s); -static int stream_open_special(struct stream * const in); - -int stream_open(struct stream * const in, const struct settings * const s) +static int stream_open_out(struct stream * const in); +static int stream_open_out_lite(struct stream * const in); +static int stream_open_in(struct stream * const in); +static int stream_open_chardev(struct stream * const in); +static int stream_flush(struct stream * const in); +static int stream_flush_array(struct stream * const in); +static int stream_flush_list(struct stream * const in); + +int stream_open(struct stream * const in) { int ret = 0; - try(!in || in->fd > 0 || !in->name || !s, err, EINVAL, "invalid argunent"); + try(!in || in->fd > 0 || !in->name, err, EINVAL, "invalid argunent"); - if (in->out == 2) { - ret = stream_open_out_lite(in, s); - } else if (in->out == 1) { + switch (in->type) { + case (stream_outlite): + ret = stream_open_out_lite(in); + break; + case (stream_out): try(!in->name, err, EINVAL, "no filename given"); - ret = stream_open_out(in, s); - } else if (!in->out) { + ret = stream_open_out(in); + break; + case (stream_in): try(!in->name, err, EINVAL, "no filename given"); - ret = stream_open_in(in, s); - } else { - ret = stream_open_special(in); + ret = stream_open_in(in); + break; + case (stream_chardev): + ret = stream_open_chardev(in); + break; + default: + try(0, err, EINVAL, "cannot open stream: stream is invalid"); } err: @@ -50,7 +60,7 @@ int stream_close(struct stream * const in) try(!in || in->fd < 0, early_err, EINVAL, "invalid argunent"); - if (in->out != 1) { + if (in->type != stream_out) { goto out; } @@ -58,6 +68,8 @@ int stream_close(struct stream * const in) char path[PATH_MAX]; struct stat st; + try_s((ret = stream_flush(in)), err); + snprintf(path, PATH_MAX, "/proc/self/fd/%i", in->fd); if (!stat(in->name, &st)) { @@ -85,7 +97,7 @@ early_err: return ret; } -static int stream_open_out(struct stream * const in, const struct settings * const s) +static int stream_open_out(struct stream * const in) { struct stat st; mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP; @@ -108,14 +120,14 @@ static int stream_open_out(struct stream * const in, const struct settings * con in->fd = open(dname, O_TMPFILE | O_RDWR, mode); try(in->fd < 0, err, errno, "failed creating temporary output file: %s", strerror(ret)); - try(ftruncate(in->fd, s->stride * in->n), err, errno, "failed setting output file size: %s", strerror(ret)); + try(ftruncate(in->fd, in->settings->stride * in->n), err, errno, "failed setting output file size: %s", strerror(ret)); err: free(dname); return ret; } -static int stream_open_out_lite(struct stream * const in, const struct settings * const s) +static int stream_open_out_lite(struct stream * const in) { struct stat st; char *dname = NULL; @@ -132,21 +144,21 @@ static int stream_open_out_lite(struct stream * const in, const struct settings in->fd = open(dname, O_TMPFILE | O_RDWR, S_IRUSR | S_IWUSR); try(in->fd < 0, err, errno, "failed creating temporary output file: %s", strerror(ret)); - try(ftruncate(in->fd, s->stride * in->n), err, errno, "failed setting output file size: %s", strerror(ret)); + try(ftruncate(in->fd, in->settings->stride * in->n), err, errno, "failed setting output file size: %s", strerror(ret)); err: free(dname); return ret; } -static int stream_open_in(struct stream * const in, const struct settings * const s) +static int stream_open_in(struct stream * const in) { struct stat st; int ret = 0; try(stat(in->name, &st), err, errno, "stat failed: %s", strerror(ret)); - try(!(st.st_mode & S_IFREG) || !st.st_size || (st.st_size % s->stride), err, EINVAL, "invalid input file"); - in->n = st.st_size / s->stride; + try(!(st.st_mode & S_IFREG) || !st.st_size || (st.st_size % in->settings->stride), err, EINVAL, "invalid input file"); + in->n = st.st_size / in->settings->stride; in->fd = open(in->name, O_RDONLY | O_NOATIME); try(in->fd < 0, err, errno, "failed opening input file: %s", strerror(ret)); @@ -154,7 +166,7 @@ err: return ret; } -static int stream_open_special(struct stream * const in) +static int stream_open_chardev(struct stream * const in) { struct stat st; int ret = 0; @@ -166,3 +178,51 @@ static int stream_open_special(struct stream * const in) err: return ret; } + +int stream_flush(struct stream * const in) +{ + int ret = 0; + + try(in->settings->access != cached, err, EINVAL, "cannot flush an uncached stream"); + try(in->type != stream_out, err, EINVAL, "cannot flush a non-output cache"); + + if (in->settings->format == array) { + ret = stream_flush_array(in); + } else { + ret = stream_flush_list(in); + } +err: + return ret; +} + +int stream_flush_array(struct stream * const in) +{ + ssize_t ret = 0; + size_t remaining = in->n * in->settings->stride; + ssize_t written = 0; + + do { + ret = write(in->fd, in->cache + written, remaining); + if (ret < 0) { + try(errno != EAGAIN, err, errno, "Writing to stream failed with %zi", ret); + } else { + written += ret; + remaining -= ret; + ret = 0; + } + } while (remaining); + +err: + return ret; +} + +int stream_flush_list(struct stream * const in) +{ + int ret = ENOSYS; + + (void) in; + + rin_warn("stub!"); + + return ret; +} @@ -7,7 +7,7 @@ #include "defs.h" -int stream_open(struct stream * const in, const struct settings * const s); +int stream_open(struct stream * const in); int stream_close(struct stream * const in); int direct_get_array(struct stream * const restrict in, ssize_t idx, struct entry_l * const data); @@ -26,9 +26,9 @@ static int stub_split(struct stream * const in, struct stream * const a, struct static int stub_flush(struct stream * const in); /* these need to go AFTER the stub declarations */ -static struct stream file_in = {stream_blank}; -static struct stream file_out = {stream_blank, .out = 1}; -static struct stream file_tmp = {stream_blank, .out = 2}; +static struct stream file_in = {stream_blank, .type = stream_in}; +static struct stream file_out = {stream_blank, .type = stream_out}; +static struct stream file_tmp = {stream_blank, .type = stream_outlite}; int main(int argc, char **argv) { @@ -39,10 +39,8 @@ int main(int argc, char **argv) if (settings.opmode == mode_generate) { file_in.name = randfile; - file_in.out = -1; + file_in.type = stream_chardev; file_in.n = settings.to; - file_in.cached = 1; - file_out.cached = 1; } else { file_in.name = settings.filein; } @@ -50,22 +48,22 @@ int main(int argc, char **argv) load_io_functions(&settings, &file_in); file_out.name = settings.fileout ? settings.fileout : settings.filein; - try_s((ret = stream_open(&file_in, &settings)), out); + try_s((ret = stream_open(&file_in)), out); file_out.n = file_in.n - settings.ss; - try_s((ret = stream_open(&file_out, &settings)), out); + try_s((ret = stream_open(&file_out)), out); load_io_functions(&settings, &file_out); if (settings.access == cached) { - try_s((ret = cache_create(&file_in, &settings)), out); + try_s((ret = cache_create(&file_in)), out); try_s((ret = cache_populate(&file_in)), out); switch (settings.opmode) { case mode_fetch: if (settings.format == array) { - try_s((ret = cache_block_copy(&file_in, &file_out, &settings)), out); + try_s((ret = cache_block_copy(&file_in, &file_out)), out); } else { /* settings.format == list */ - try_s((ret = cache_list_copy(&file_in, &file_out, &settings)), out); + try_s((ret = cache_list_copy(&file_in, &file_out)), out); } break; case mode_generate: @@ -76,8 +74,6 @@ int main(int argc, char **argv) ; } - try_s((ret = cache_flush(&file_out)), out); - } else { /* uncached */ /* TODO */ } @@ -88,7 +84,7 @@ int main(int argc, char **argv) while (0) { out: stream_close(&file_in); - file_out.out = 2; /* in case of error-exit, just close, don't link */ + file_out.type = stream_invalid; /* in case of error-exit, just close, don't link */ stream_close(&file_out); stream_close(&file_tmp); } @@ -184,19 +180,19 @@ int load_io_functions(struct settings const * const s, struct stream * const in) { int ret = 0; - if (in->out == 1) { + if (in->type == stream_out) { if (s->format == array) { /* TODO */ } else { /* TODO */ } - } else if (!in->out) { /* reading streams do not support dumping */ + } else if (in->type == stream_in) { /* reading streams do not support dumping */ if (s->format == array) { /* TODO */ } else { /* TODO */ } - } else { /* data generation streams do not support dumping nor any cache I/O beyond initial data generation */ + } else if (in->type == stream_chardev) { /* data generation streams do not support dumping nor any cache I/O beyond initial data generation */ if (s->format == array) { in->get_next_element_direct = gen_get_array; in->place_next_element_cache = cached_put_array; @@ -204,6 +200,8 @@ int load_io_functions(struct settings const * const s, struct stream * const in) in->get_next_element_direct = gen_get_list; in->place_next_element_cache = cached_put_list; } + } else { + ret = EINVAL; } return ret; |