summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSuren A. Chilingaryan <csa@dside.dyndns.org>2012-11-29 11:43:08 +0100
committerSuren A. Chilingaryan <csa@dside.dyndns.org>2012-11-29 11:43:08 +0100
commitce7895919b2379de37b4004e7b17ed192cae30c4 (patch)
treea0d86846bb1b28f02db740d3f3a78be102b279dd
parent8cf73663783503d8e9ebef42e193bd90d6c84313 (diff)
downloadfwbench-ce7895919b2379de37b4004e7b17ed192cae30c4.tar.gz
fwbench-ce7895919b2379de37b4004e7b17ed192cae30c4.tar.bz2
fwbench-ce7895919b2379de37b4004e7b17ed192cae30c4.tar.xz
fwbench-ce7895919b2379de37b4004e7b17ed192cae30c4.zip
EDF reading emulation (iteration1)
-rw-r--r--.bzrignore1
-rw-r--r--CMakeLists.txt2
-rw-r--r--rndreader.c336
3 files changed, 339 insertions, 0 deletions
diff --git a/.bzrignore b/.bzrignore
index f0d8bc2..5c1c5bd 100644
--- a/.bzrignore
+++ b/.bzrignore
@@ -5,3 +5,4 @@ config.h
cmake_install.cmake
fwbench
seqreader
+rndreader
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 9595514..41f1068 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -35,6 +35,7 @@ include_directories(
add_executable(fwbench fwbench.c)
add_executable(seqreader seqreader.c)
+add_executable(rndreader rndreader.c)
if (USE_UFO_GENERATOR)
target_link_libraries(fwbench m ufo fastwriter ${GLIB2_LIBRARIES} ${GTHREAD2_LIBRARIES})
@@ -43,5 +44,6 @@ else (USE_UFO_GENERATOR)
endif (USE_UFO_GENERATOR)
target_link_libraries(seqreader aio)
+target_link_libraries(rndreader aio)
configure_file(config.h.in ${CMAKE_CURRENT_BINARY_DIR}/config.h)
diff --git a/rndreader.c b/rndreader.c
new file mode 100644
index 0000000..e1199d4
--- /dev/null
+++ b/rndreader.c
@@ -0,0 +1,336 @@
+#define _GNU_SOURCE
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <stdint.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <sys/time.h>
+#include <unistd.h>
+#include <dirent.h>
+#include <fcntl.h>
+#include <string.h>
+#include <errno.h>
+
+#include <libaio.h>
+
+#define FASTWRITER_SYNCIO_ALIGN 4096
+
+#define SYNC_MODE
+#define AIO_MODE 2
+#define EXTRA_BUFFERS 2
+#define WRITE_INTERVAL 1
+
+size_t SKIP = 1;
+size_t SEGMENT = 0;
+size_t LINE = 0;
+
+#define RAID_STRIP_SIZE 256
+#define RAID_DISKS 8
+#define STRIPS_AT_ONCE 2
+
+#define MIN_BLOCK_SIZE (1024 * RAID_STRIP_SIZE)
+#define BLOCK_SIZE (1024 * RAID_STRIP_SIZE * RAID_DISKS * STRIPS_AT_ONCE)
+#define BUFSIZE (BLOCK_SIZE * (AIO_MODE + EXTRA_BUFFERS))
+
+#ifdef AIO_MODE
+# define SYNC_MODE
+#endif /* AIO_MODE */
+
+
+DIR *dir;
+struct dirent *ent;
+int flags = O_RDONLY|O_NOATIME|O_LARGEFILE|O_DIRECT;
+
+size_t run = 0; // file reading order
+size_t skip = 0; // file reading order
+
+int fd = -1; // we are currently scheduling reads from this file
+size_t offset = 0; // we are schedulling reads at this offset
+size_t size; // size of block we are schedulling to read
+size_t file_size; // total size of current file
+size_t file_block_size; // access unit for current file
+
+size_t curf = 0, schedf = 0;
+size_t curio = 0, schedio = 0;
+
+#define MAXLEN 128
+int done[AIO_MODE + EXTRA_BUFFERS];
+int done_finish[AIO_MODE + EXTRA_BUFFERS];
+char done_file[AIO_MODE + EXTRA_BUFFERS][MAXLEN + 1];
+size_t done_offset[AIO_MODE + EXTRA_BUFFERS];
+size_t done_size[AIO_MODE + EXTRA_BUFFERS];
+int done_fd[AIO_MODE + EXTRA_BUFFERS];
+
+#define buf(io) (io%(AIO_MODE + EXTRA_BUFFERS))
+#define max(a,b) (((a)>(b))?(a):(b))
+#define min(a,b) (((a)<(b))?(a):(b))
+#define page(size) (((size/4096)+(size%4096?1:0))*4096)
+
+
+int next_file() {
+ size_t size;
+ struct stat st;
+
+next:
+ while ((ent = readdir(dir))) {
+ skip += 1;
+ if ((((skip) - 1)%SKIP) != run) continue;
+ if (stat(ent->d_name, &st)) continue;
+ if (!S_ISREG(st.st_mode)) continue;
+ break;
+ }
+
+ if (ent) {
+ size = st.st_blksize;
+
+ int fd = open(ent->d_name, flags, 0);
+ if (fd < 0) goto next;
+
+ if (size < MIN_BLOCK_SIZE) size = BLOCK_SIZE;
+
+ if (size > BUFSIZE) {
+ printf("Buffer too small\n");
+ exit(1);
+ }
+
+ file_block_size = size;
+ file_size = st.st_size;
+ offset = 0;
+
+ return fd;
+
+ } else {
+ skip = 0;
+ run += 1;
+ if (run < SKIP) {
+ closedir(dir);
+ dir = opendir(".");
+ goto next;
+ }
+ }
+
+ return -1;
+}
+
+int next_block() {
+ int io;
+ size_t next_offset;
+
+ if (fd >= 0) {
+ if (LINE) offset += SEGMENT;
+ else offset += size;
+
+ if (offset >= file_size) {
+ fd = -1;
+ }
+ }
+
+ if (fd < 0) {
+ fd = next_file();
+ if (fd < 0) return -1;
+// printf("open ===> %s (%i)\n", ent->d_name, fd);
+ }
+
+ if (LINE) size = LINE;
+ else size = file_block_size;
+
+ if ((offset + size) > file_size) size = file_size - offset;
+
+ io = buf(schedio);
+ strncpy(done_file[io], ent->d_name, MAXLEN);
+ done_offset[io] = offset;
+ done_size[io] = size;
+ done_fd[io] = fd;
+ done[io] = 0;
+
+ if (LINE) next_offset = offset + SEGMENT;
+ else next_offset = offset + size;
+
+ if (next_offset >= file_size) done_finish[io] = 1;
+ else done_finish[io] = 0;
+
+ return 0;
+}
+
+
+
+int main(int argc, char *argv[]) {
+ int err;
+ int i, n, io;
+ long double mcoef = 1000000. / (1024 * 1024);
+
+ io_context_t aio;
+ struct iocb ios[AIO_MODE], *ioptr[AIO_MODE];
+
+ int events;
+ struct io_event ev[AIO_MODE];
+
+ int ready;
+ void *buffer;
+ struct timeval start, fstart, tv;
+
+ if (argc < 2) {
+ printf("Usage: %s <directory> [skip] [segment] [line]\n", argv[0]);
+ exit(0);
+ }
+
+ chdir(argv[1]);
+ dir = opendir(".");
+
+ if (argc > 2) SKIP = atoi(argv[2]);
+ if (argc > 3) SEGMENT = atoi(argv[3]);
+ if (argc > 4) LINE = atoi(argv[4]);
+
+ if (!SKIP) SKIP = 1;
+
+ printf("%s: Skip %zu, Segment %zu, Line %zu\n", argv[1], SKIP, SEGMENT, LINE);
+
+ posix_memalign((void**)&buffer, FASTWRITER_SYNCIO_ALIGN, BUFSIZE);
+
+ memset(done, 0, sizeof(done));
+ memset(&aio, 0, sizeof(aio));
+ io_queue_init(AIO_MODE, &aio);
+
+ for (schedio = 0; schedio < AIO_MODE; schedio++) {
+ ioptr[schedio] = &ios[schedio];
+ memset(ioptr[schedio], 0, sizeof(struct iocb));
+
+ err = next_block();
+ if (err) break;
+
+ io_prep_pread(ioptr[schedio], fd, buffer + schedio * BLOCK_SIZE, page(size), offset);
+ io_set_callback(ioptr[schedio], (void*)(uintptr_t)schedio);
+// printf("sched %zu: %zu (%zu %zu)\n", schedio, schedio, offset, size);
+ }
+
+ size_t us, fileus;
+ size_t last_write = 0;
+// size_t last_file_write = 0;
+// size_t last_file_size = 0;
+ size_t cur_file_size = 0;
+ size_t total_size = 0;
+ size_t files = 0;
+
+ gettimeofday(&start, NULL);
+ gettimeofday(&fstart, NULL);
+
+ n = io_submit(aio, schedio, ioptr);
+ if (n != schedio) {
+ printf("Failed to submit initial AIO job, io_submit returned %i\n", err);
+ exit(-1);
+ }
+
+ curio = 0;
+ events = 0;
+
+
+ ready = 0;
+
+ while ((err >= 0)||(curio != schedio)) {
+ io = buf(curio);
+
+ if (!done[io]) {
+// printf("%i,%i - %i [%i %i %i %i]\n", curio, schedio, events, done[0], done[1], done[2], done[3]);
+
+ if (curio < schedio) {
+ n = io_getevents(aio, 1, AIO_MODE + EXTRA_BUFFERS - events, &ev[events], NULL);
+ if (n < 0) {
+ printf("Error waiting for AIO (%i)\n", -err);
+ exit(-1);
+ }
+ } else {
+ n = 0;
+ }
+
+ if ((!ready)&&(n > 1)) {
+ printf("*** Multiple read requests (%i of %i) are finished simultaneously. It is either:\n", err, AIO_MODE);
+ printf(" Small buffer size (%i KB)\n", BLOCK_SIZE/1024);
+ printf(" More parallel AIOs (%i) than supported by kernel, try %i\n", AIO_MODE, AIO_MODE - err);
+ }
+
+ for (i = 0; i < n; i++) {
+ struct io_event *ep = &ev[events + i];
+ int doneio = (uintptr_t)ep->data;
+ io = buf(doneio);
+// printf("done %i: %lu %zu %zi\n", doneio, ep->res2, done_size[io], ep->res);
+ if (ep->res2 || (ep->res < done_size[io])) {
+ printf("Error in async IO (ret: %li, ret size: %zi, expected %zu)\n", ep->res2, ep->res, page(done_size[io]));
+ exit(-1);
+ }
+ done[io] = 1;
+// printf("done (%i): %i\n", i, doneio);
+ }
+
+ events += n;
+
+ for (i = events - 1; (i >= 0)&&((schedio - curio) < (AIO_MODE + EXTRA_BUFFERS)); i--) {
+ err = next_block();
+ if (err) break;
+
+// printf("sched %i: %zu (%i %zu %zu)\n", i, schedio, fd, offset, size);
+ struct iocb *newio = (struct iocb *)ev[i].obj;
+ memset(newio, 0, sizeof(struct iocb));
+ io_prep_pread(newio, fd, buffer + buf(schedio) * BLOCK_SIZE, page(size), offset);
+ io_set_callback(newio, (void*)(uintptr_t)schedio);
+ err = io_submit(aio, 1, &newio);
+ if (err != 1) {
+ printf("Failed to submit AIO jobs %i\n", err);
+ exit(-1);
+ }
+ schedio++;
+ }
+ events = i + 1;
+
+ if ((events)&&(!err)) {
+ printf("*** Unprocessed events (%i), probably not enough buffer space...\n", events);
+// printf(" curio (%zu), schedio (%zu)\n", curio, schedio);
+ }
+
+ ready = 1;
+ continue;
+ }
+
+ io = buf(curio);
+
+ cur_file_size += done_size[io];
+ total_size += done_size[io];
+
+ gettimeofday(&tv, NULL);
+ us = (tv.tv_sec - start.tv_sec) * 1000000 + (tv.tv_usec - start.tv_usec);
+ fileus = (tv.tv_sec - fstart.tv_sec) * 1000000 + (tv.tv_usec - fstart.tv_usec);
+
+ if (done_finish[io]) {
+// printf("closing %i ===> %s (%i)\n", io, done_file[io], done_fd[io]);
+ close(done_fd[io]);
+ gettimeofday(&fstart, NULL);
+ files++;
+ }
+
+ if ((us - last_write) > WRITE_INTERVAL * 1000000) {
+ last_write = us;
+ printf("Read: %lu files (%lu GB) at %zu MB/s", files, total_size / 1024 / 1024 / 1024, (size_t)(mcoef * total_size / us));
+
+ if (1) {//cur_file_size > BLOCK_SIZE) {
+ printf(", Last: %s (%lu MB) at %zu MB/s\n", done_file[io], cur_file_size / 1024 / 1024, (size_t)(mcoef * cur_file_size / fileus));
+ } else {
+ printf("\n");
+ }
+ }
+
+ if (done_finish[io]) {
+ cur_file_size = 0;
+ }
+
+ done[io] = 0;
+
+ curio++;
+ }
+
+ gettimeofday(&tv, NULL);
+ us = (tv.tv_sec - start.tv_sec) * 1000000 + (tv.tv_usec - start.tv_usec);
+ printf("Total: %lu files (%lu GB) at %zu MB/s\n", files, total_size / 1024 / 1024 / 1024, (size_t)(mcoef * total_size / us));
+
+ free(buffer);
+ closedir(dir);
+} \ No newline at end of file