From ce7895919b2379de37b4004e7b17ed192cae30c4 Mon Sep 17 00:00:00 2001
From: "Suren A. Chilingaryan"
Date: Thu, 29 Nov 2012 11:43:08 +0100
Subject: EDF reading emulation (iteration1)

---
 .bzrignore     |   1 +
 CMakeLists.txt |   2 +
 rndreader.c    | 395 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 398 insertions(+)
 create mode 100644 rndreader.c

diff --git a/.bzrignore b/.bzrignore
index f0d8bc2..5c1c5bd 100644
--- a/.bzrignore
+++ b/.bzrignore
@@ -5,3 +5,4 @@ config.h
 cmake_install.cmake
 fwbench
 seqreader
+rndreader
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 9595514..41f1068 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -35,6 +35,7 @@ include_directories(
 
 add_executable(fwbench fwbench.c)
 add_executable(seqreader seqreader.c)
+add_executable(rndreader rndreader.c)
 
 if (USE_UFO_GENERATOR)
     target_link_libraries(fwbench m ufo fastwriter ${GLIB2_LIBRARIES} ${GTHREAD2_LIBRARIES})
@@ -43,5 +44,6 @@ else (USE_UFO_GENERATOR)
 endif (USE_UFO_GENERATOR)
 
 target_link_libraries(seqreader aio)
+target_link_libraries(rndreader aio)
 
 configure_file(config.h.in ${CMAKE_CURRENT_BINARY_DIR}/config.h)
diff --git a/rndreader.c b/rndreader.c
new file mode 100644
index 0000000..e1199d4
--- /dev/null
+++ b/rndreader.c
@@ -0,0 +1,395 @@
+#define _GNU_SOURCE
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <stdint.h>
+#include <string.h>
+#include <unistd.h>
+#include <dirent.h>
+#include <fcntl.h>
+#include <errno.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <sys/time.h>
+
+#include <libaio.h>
+
+/*
+ * EDF reading emulation: walks the regular files of a directory and reads
+ * them with O_DIRECT through libaio, either block by block or, if LINE is
+ * set, LINE bytes at every SEGMENT bytes. Throughput is reported on stdout.
+ */
+
+#define FASTWRITER_SYNCIO_ALIGN 4096
+
+#define SYNC_MODE
+#define AIO_MODE 2
+#define EXTRA_BUFFERS 2
+#define WRITE_INTERVAL 1
+
+size_t SKIP = 1;        // number of directory passes, pass N takes entries N, N + SKIP, ...
+size_t SEGMENT = 0;     // distance between the starts of two reads (used only if LINE is set)
+size_t LINE = 0;        // bytes read at each position, 0 to read whole blocks
+
+#define RAID_STRIP_SIZE 256
+#define RAID_DISKS 8
+#define STRIPS_AT_ONCE 2
+
+#define MIN_BLOCK_SIZE (1024 * RAID_STRIP_SIZE)
+#define BLOCK_SIZE (1024 * RAID_STRIP_SIZE * RAID_DISKS * STRIPS_AT_ONCE)
+#define BUFSIZE (BLOCK_SIZE * (AIO_MODE + EXTRA_BUFFERS))
+
+#ifdef AIO_MODE
+# define SYNC_MODE
+#endif /* AIO_MODE */
+
+
+DIR *dir;
+struct dirent *ent;
+int flags = O_RDONLY|O_NOATIME|O_LARGEFILE|O_DIRECT;
+
+size_t run = 0;         // index of the current directory pass
+size_t skip = 0;        // number of entries seen in the current pass
+
+int fd = -1;            // we are currently scheduling reads from this file
+size_t offset = 0;      // we are scheduling reads at this offset
+size_t size;            // size of block we are scheduling to read
+size_t file_size;       // total size of current file
+size_t file_block_size; // access unit for current file
+
+size_t curf = 0, schedf = 0;
+size_t curio = 0, schedio = 0;
+
+#define MAXLEN 128
+int done[AIO_MODE + EXTRA_BUFFERS];
+int done_finish[AIO_MODE + EXTRA_BUFFERS];
+char done_file[AIO_MODE + EXTRA_BUFFERS][MAXLEN + 1];
+size_t done_offset[AIO_MODE + EXTRA_BUFFERS];
+size_t done_size[AIO_MODE + EXTRA_BUFFERS];
+int done_fd[AIO_MODE + EXTRA_BUFFERS];
+
+#define buf(io) ((io) % (AIO_MODE + EXTRA_BUFFERS))
+#define max(a,b) (((a)>(b))?(a):(b))
+#define min(a,b) (((a)<(b))?(a):(b))
+#define page(size) ((((size) / FASTWRITER_SYNCIO_ALIGN) + (((size) % FASTWRITER_SYNCIO_ALIGN) ? 1 : 0)) * FASTWRITER_SYNCIO_ALIGN)
+
+
+/* Converts a non-negative decimal command line argument, exits on bad input. */
+static size_t parse_size(const char *text, const char *what) {
+    char *end;
+    unsigned long long value;
+
+    errno = 0;
+    value = strtoull(text, &end, 10);
+    if ((end == text)||(*end)||(errno == ERANGE)||(strchr(text, '-'))||(value > SIZE_MAX)) {
+        printf("Invalid %s value: %s\n", what, text);
+        exit(1);
+    }
+
+    return (size_t)value;
+}
+
+/* Microseconds elapsed between two gettimeofday() samples. */
+static size_t elapsed_us(const struct timeval *from, const struct timeval *to) {
+    return (to->tv_sec - from->tv_sec) * 1000000 + (to->tv_usec - from->tv_usec);
+}
+
+/* Throughput in MB/s, 0 if no measurable time has passed yet. */
+static size_t throughput(size_t bytes, size_t us) {
+    const long double mcoef = 1000000. / (1024 * 1024);
+
+    if (!us) return 0;
+    return (size_t)(mcoef * bytes / us);
+}
+
+/*
+ * Opens the next regular file of the current directory pass and sets
+ * file_block_size, file_size and offset for it; entries which can not be
+ * stat'ed or opened are skipped. At the end of a pass the directory is
+ * re-opened for the next one, until SKIP passes are made.
+ * Returns the file descriptor or -1 if there are no more files.
+ */
+int next_file(void) {
+    size_t block;
+    struct stat st;
+
+    for (;;) {
+        while ((ent = readdir(dir))) {
+            skip += 1;
+            if (((skip - 1) % SKIP) != run) continue;
+            if (stat(ent->d_name, &st)) continue;
+            if (!S_ISREG(st.st_mode)) continue;
+            break;
+        }
+
+        if (!ent) {
+            skip = 0;
+            run += 1;
+            if (run >= SKIP) return -1;
+
+            closedir(dir);
+            dir = opendir(".");
+            if (!dir) {
+                printf("Failed to re-open the directory (%s)\n", strerror(errno));
+                exit(1);
+            }
+            continue;
+        }
+
+        int file = open(ent->d_name, flags, 0);
+        if (file < 0) continue;
+
+        block = st.st_blksize;
+        if (block < MIN_BLOCK_SIZE) block = BLOCK_SIZE;
+
+        // every request is read into its own BLOCK_SIZE slice of the buffer
+        if (block > BLOCK_SIZE) {
+            printf("Buffer too small\n");
+            exit(1);
+        }
+
+        file_block_size = block;
+        file_size = st.st_size;
+        offset = 0;
+
+        return file;
+    }
+}
+
+/*
+ * Advances the read position (fd, offset, size) to the next request, moving
+ * to the next file when the current one is exhausted, and records the
+ * request in the done_* slot of schedio.
+ * Returns 0 on success or -1 if there is nothing left to read.
+ */
+int next_block(void) {
+    int io;
+    size_t next_offset;
+
+    if (fd >= 0) {
+        offset += LINE ? SEGMENT : size;
+        if (offset >= file_size) fd = -1;
+    }
+
+    if (fd < 0) {
+        fd = next_file();
+        if (fd < 0) return -1;
+    }
+
+    size = LINE ? LINE : file_block_size;
+    if ((offset + size) > file_size) size = file_size - offset;
+
+    io = buf(schedio);
+    snprintf(done_file[io], sizeof(done_file[io]), "%s", ent->d_name);
+    done_offset[io] = offset;
+    done_size[io] = size;
+    done_fd[io] = fd;
+    done[io] = 0;
+
+    next_offset = offset + (LINE ? SEGMENT : size);
+    done_finish[io] = (next_offset >= file_size) ? 1 : 0;
+
+    return 0;
+}
+
+/*
+ * Usage: rndreader <directory> [skip] [segment] [line]
+ *
+ * Keeps up to AIO_MODE reads in flight over AIO_MODE + EXTRA_BUFFERS buffer
+ * slices, consumes the completed requests in the order they were scheduled
+ * and prints the throughput every WRITE_INTERVAL seconds and at the end.
+ */
+int main(int argc, char *argv[]) {
+    int err;
+    int i, n, io;
+
+    io_context_t aio;
+    struct iocb ios[AIO_MODE], *ioptr[AIO_MODE];
+
+    int events;
+    struct io_event ev[AIO_MODE];
+
+    int ready;
+    void *buffer;
+    struct timeval start, fstart, tv;
+
+    if (argc < 2) {
+        printf("Usage: %s <directory> [skip] [segment] [line]\n", argv[0]);
+        exit(0);
+    }
+
+    if (chdir(argv[1])) {
+        printf("Failed to enter %s (%s)\n", argv[1], strerror(errno));
+        exit(1);
+    }
+
+    dir = opendir(".");
+    if (!dir) {
+        printf("Failed to open %s (%s)\n", argv[1], strerror(errno));
+        exit(1);
+    }
+
+    if (argc > 2) SKIP = parse_size(argv[2], "skip");
+    if (argc > 3) SEGMENT = parse_size(argv[3], "segment");
+    if (argc > 4) LINE = parse_size(argv[4], "line");
+
+    if (!SKIP) SKIP = 1;
+
+    // each request has to fit into a single BLOCK_SIZE slice of the buffer
+    if (LINE > BLOCK_SIZE) {
+        printf("Line is too long, at most %i bytes are supported\n", BLOCK_SIZE);
+        exit(1);
+    }
+
+    // with a zero step the same line would be scheduled forever
+    if ((LINE)&&(!SEGMENT)) {
+        printf("Segment should be specified if line is set\n");
+        exit(1);
+    }
+
+    printf("%s: Skip %zu, Segment %zu, Line %zu\n", argv[1], SKIP, SEGMENT, LINE);
+
+    err = posix_memalign(&buffer, FASTWRITER_SYNCIO_ALIGN, BUFSIZE);
+    if (err) {
+        printf("Failed to allocate %i bytes of aligned memory (%s)\n", BUFSIZE, strerror(err));
+        exit(1);
+    }
+
+    memset(done, 0, sizeof(done));
+    memset(&aio, 0, sizeof(aio));
+    err = io_queue_init(AIO_MODE, &aio);
+    if (err) {
+        printf("Failed to initialize AIO queue (%i)\n", -err);
+        exit(1);
+    }
+
+    for (schedio = 0; schedio < AIO_MODE; schedio++) {
+        ioptr[schedio] = &ios[schedio];
+        memset(ioptr[schedio], 0, sizeof(struct iocb));
+
+        err = next_block();
+        if (err) break;
+
+        io_prep_pread(ioptr[schedio], fd, (char*)buffer + schedio * BLOCK_SIZE, page(size), offset);
+        // the request number comes back in io_event.data on completion
+        ioptr[schedio]->data = (void*)(uintptr_t)schedio;
+    }
+
+    size_t us, fileus;
+    size_t last_write = 0;
+    size_t cur_file_size = 0;
+    size_t total_size = 0;
+    size_t files = 0;
+
+    gettimeofday(&start, NULL);
+    gettimeofday(&fstart, NULL);
+
+    n = io_submit(aio, schedio, ioptr);
+    if (n != (int)schedio) {
+        printf("Failed to submit initial AIO job, io_submit returned %i\n", n);
+        exit(-1);
+    }
+
+    curio = 0;
+    events = 0;
+    ready = 0;
+
+    while ((err >= 0)||(curio != schedio)) {
+        io = buf(curio);
+
+        if (!done[io]) {
+            if (curio < schedio) {
+                // ev[] has AIO_MODE entries and the first 'events' of them are still in use
+                n = io_getevents(aio, 1, AIO_MODE - events, &ev[events], NULL);
+                if (n < 0) {
+                    printf("Error waiting for AIO (%i)\n", -n);
+                    exit(-1);
+                }
+            } else {
+                n = 0;
+            }
+
+            if ((!ready)&&(n > 1)) {
+                printf("*** Multiple read requests (%i of %i) are finished simultaneously. It is either:\n", n, AIO_MODE);
+                printf("      Small buffer size (%i KB)\n", BLOCK_SIZE/1024);
+                printf("      More parallel AIOs (%i) than supported by kernel, try %i\n", AIO_MODE, max(1, AIO_MODE - n));
+            }
+
+            for (i = 0; i < n; i++) {
+                struct io_event *ep = &ev[events + i];
+                int doneio = (uintptr_t)ep->data;
+                // libaio declares res as unsigned long, failures are negative error codes
+                long res = (long)ep->res;
+
+                io = buf(doneio);
+                if ((ep->res2)||(res < 0)||((size_t)res < done_size[io])) {
+                    printf("Error in async IO (ret: %li, ret size: %li, expected %zu)\n", (long)ep->res2, res, done_size[io]);
+                    exit(-1);
+                }
+                done[io] = 1;
+            }
+
+            events += n;
+
+            for (i = events - 1; (i >= 0)&&((schedio - curio) < (AIO_MODE + EXTRA_BUFFERS)); i--) {
+                err = next_block();
+                if (err) break;
+
+                struct iocb *newio = (struct iocb *)ev[i].obj;
+                memset(newio, 0, sizeof(struct iocb));
+                io_prep_pread(newio, fd, (char*)buffer + buf(schedio) * BLOCK_SIZE, page(size), offset);
+                newio->data = (void*)(uintptr_t)schedio;
+
+                n = io_submit(aio, 1, &newio);
+                if (n != 1) {
+                    printf("Failed to submit AIO jobs %i\n", n);
+                    exit(-1);
+                }
+                schedio++;
+            }
+            events = i + 1;
+
+            // err is only set by next_block() in this loop, 0 means it ran out of free buffers
+            if ((events)&&(!err)) {
+                printf("*** Unprocessed events (%i), probably not enough buffer space...\n", events);
+            }
+
+            ready = 1;
+            continue;
+        }
+
+        cur_file_size += done_size[io];
+        total_size += done_size[io];
+
+        gettimeofday(&tv, NULL);
+        us = elapsed_us(&start, &tv);
+        fileus = elapsed_us(&fstart, &tv);
+
+        if (done_finish[io]) {
+            close(done_fd[io]);
+            gettimeofday(&fstart, NULL);
+            files++;
+        }
+
+        if ((us - last_write) > WRITE_INTERVAL * 1000000) {
+            last_write = us;
+            printf("Read: %zu files (%zu GB) at %zu MB/s", files, total_size / 1024 / 1024 / 1024, throughput(total_size, us));
+            printf(", Last: %s (%zu MB) at %zu MB/s\n", done_file[io], cur_file_size / 1024 / 1024, throughput(cur_file_size, fileus));
+        }
+
+        if (done_finish[io]) cur_file_size = 0;
+
+        done[io] = 0;
+        curio++;
+    }
+
+    gettimeofday(&tv, NULL);
+    us = elapsed_us(&start, &tv);
+    printf("Total: %zu files (%zu GB) at %zu MB/s\n", files, total_size / 1024 / 1024 / 1024, throughput(total_size, us));
+
+    io_queue_release(aio);
+    free(buffer);
+    closedir(dir);
+
+    return 0;
+}
-- 
cgit v1.2.1