From 933e63eaac669c9b235a7619e8d9aae058090a77 Mon Sep 17 00:00:00 2001 From: "Suren A. Chilingaryan" Date: Sun, 25 Nov 2012 08:09:41 +0100 Subject: Use kernel asynchronous i/o in seqreader --- CMakeLists.txt | 2 + seqreader.c | 130 +++++++++++++++++++++++++++++++++++++++++++++++++++++---- 2 files changed, 125 insertions(+), 7 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index b1adb70..9595514 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -42,4 +42,6 @@ else (USE_UFO_GENERATOR) target_link_libraries(fwbench m fastwriter ${GLIB2_LIBRARIES} ${GTHREAD2_LIBRARIES}) endif (USE_UFO_GENERATOR) +target_link_libraries(seqreader aio) + configure_file(config.h.in ${CMAKE_CURRENT_BINARY_DIR}/config.h) diff --git a/seqreader.c b/seqreader.c index 359f38e..ba4db51 100644 --- a/seqreader.c +++ b/seqreader.c @@ -2,6 +2,7 @@ #include #include +#include #include #include #include @@ -11,16 +12,34 @@ #include #include +#include + #define FASTWRITER_SYNCIO_ALIGN 512 #define SYNC_MODE -#define BUFSIZE 2097152 +#define AIO_MODE 2 +#define EXTRA_BUFFERS 2 +#define WRITE_INTERVAL 1 + +#define RAID_STRIP_SIZE 256 +#define RAID_DISKS 8 +#define STRIPS_AT_ONCE 2 + +#ifdef AIO_MODE +# define SYNC_MODE +#endif /* AIO_MODE */ + #ifdef SYNC_MODE -# define BLOCK_SIZE 2097152 +# define BLOCK_SIZE (1024 * RAID_STRIP_SIZE * RAID_DISKS * STRIPS_AT_ONCE) #else /* SYNC_MODE */ # define BLOCK_SIZE 16384 #endif /* SYNC_MODE */ -#define WRITE_INTERVAL 1 + +#ifdef AIO_MODE +# define BUFSIZE (BLOCK_SIZE * (AIO_MODE + EXTRA_BUFFERS)) +#else /* AIO_MODE */ +# define BUFSIZE BLOCK_SIZE +#endif /* AIO_MODE */ int main(int argc, char *argv[]) { @@ -33,18 +52,34 @@ int main(int argc, char *argv[]) { size_t files = 0; size_t total_size = 0; size_t last_write = 0; + size_t last_size = 0; size_t skip; size_t run; + size_t ready; ssize_t res; size_t max_size = (size_t)-1; char *buffer;//[BUFSIZE]; long double mcoef = 1000000. / (1024 * 1024); int flags = O_RDONLY|O_NOATIME|O_LARGEFILE; +#ifdef AIO_MODE + int i; + size_t curio, schedio; + int done[AIO_MODE + EXTRA_BUFFERS]; + + io_context_t aio; + struct iocb io[AIO_MODE], *ioptr[AIO_MODE]; + + int events; + struct io_event ev[AIO_MODE]; +#endif /* AIO_MODE */ + #ifdef SYNC_MODE flags |= O_DIRECT; #endif - + + printf("Used buffer: %i MB, Block: %i KB\n", BUFSIZE / 1024 / 1024, BLOCK_SIZE/1024); + posix_memalign((void**)&buffer, FASTWRITER_SYNCIO_ALIGN, BUFSIZE); if (argc < 2) { @@ -63,13 +98,87 @@ int main(int argc, char *argv[]) { printf("Unable to open device %s\n", argv[1]); exit(1); } - + size_t size = BLOCK_SIZE; + +#ifdef AIO_MODE + memset(done, 0, sizeof(done)); + memset(&aio, 0, sizeof(aio)); + io_queue_init(AIO_MODE, &aio); + for (i = 0; i < AIO_MODE; i++) { + ioptr[i] = &io[i]; + memset(ioptr[i], 0, sizeof(struct iocb)); + io_prep_pread(ioptr[i], fd, buffer + i * BLOCK_SIZE, BLOCK_SIZE, i * BLOCK_SIZE); + io_set_callback(ioptr[i], (void*)(uintptr_t)i); + } + + curio = 0; + schedio = AIO_MODE; + events = 0; +#endif /* AIO_MODE */ gettimeofday(&start, NULL); - + +#ifdef AIO_MODE + err = io_submit(aio, AIO_MODE, ioptr); + if (err != AIO_MODE) { + printf("io_submit returned %i\n", err); + perror("Failed to submit initial AIO jobs"); + } +#endif /* AIO_MODE */ + +#ifdef AIO_MODE + ready = 0; + while (1) { + if (!done[curio%(AIO_MODE + EXTRA_BUFFERS)]) { + err = io_getevents(aio, 1, AIO_MODE - events, &ev[events], NULL); + if (err < 0) perror("Error waiting for AIO\n"); + + if ((!ready)&&(err > 1)) { + printf("*** Multiple read requests (%i of %i) are finished simultaneously. It is either:\n", err, AIO_MODE); + printf(" Small buffer size (%i KB)\n", BLOCK_SIZE/1024); + printf(" More parallel AIOs (%i) than supported by kernel, try %i\n", AIO_MODE, AIO_MODE - err); + } + + for (i = 0; i < err; i++) { + struct io_event *ep = &ev[events + i]; + int doneio = (uintptr_t)ep->data; + if (ep->res2 || (ep->res != BLOCK_SIZE)) perror("Error in async IO"); + done[doneio%(AIO_MODE + EXTRA_BUFFERS)] = 1; +// printf("done (%i): %i\n", i, doneio); + } + + events += err; + + for (i = events - 1; (i >= 0)&&((schedio - curio) < (AIO_MODE + EXTRA_BUFFERS)); i--) { + struct iocb *newio = (struct iocb *)ev[i].obj; + memset(newio, 0, sizeof(struct iocb)); + io_prep_pread(newio, fd, buffer + (schedio % (AIO_MODE + EXTRA_BUFFERS)) * BLOCK_SIZE, BLOCK_SIZE, schedio * BLOCK_SIZE); + io_set_callback(newio, (void*)(uintptr_t)schedio); + err = io_submit(aio, 1, &newio); + if (err != 1) perror("Failed to submit AIO jobs"); + schedio++; + } + events = i + 1; + + if (events) { + printf("*** Unprocessed events (%i), probably not enough buffer space...\n", events); +// printf(" curio (%zu), schedio (%zu)\n", curio, schedio); + } + + ready = 1; + continue; + } + + done[curio%(AIO_MODE + EXTRA_BUFFERS)] = 0; + curio++; + + res = BLOCK_SIZE; +#else /* AIO_MODE */ res = read(fd, buffer, size); while (res > 0) { +#endif /* AIO_MODE */ + if (res != size) { printf("Incomplete read: %zu bytes read instead of %zu\n", res, size); exit(-1); @@ -79,8 +188,9 @@ int main(int argc, char *argv[]) { gettimeofday(&tv, NULL); us = (tv.tv_sec - start.tv_sec) * 1000000 + (tv.tv_usec - start.tv_usec); if ((us - last_write) > WRITE_INTERVAL * 1000000) { + printf("Reading: %s (%lu GB), Measured speed: %zu MB/s, Current speed: %zu MB/s\n", argv[0], total_size / 1024 / 1024 / 1024, (size_t)(mcoef * total_size / us), (size_t)(mcoef * (total_size - last_size) / (us - last_write))); last_write = us; - printf("Reading: %s (%lu GB), Measured speed: %zu MB/s\n", argv[0], total_size / 1024 / 1024 / 1024, (size_t)(mcoef * total_size / us)); + last_size = total_size; } if (total_size > max_size) { @@ -88,9 +198,15 @@ int main(int argc, char *argv[]) { break; } +#ifndef AIO_MODE res = read(fd, buffer, size); +#endif /* AIO_MODE */ } +#ifdef AIO_MODE + io_queue_release(aio); +#endif /* AIO_MODE */ + close(fd); if (res < 0) { -- cgit v1.2.1