summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSuren A. Chilingaryan <csa@dside.dyndns.org>2012-11-25 08:09:41 +0100
committerSuren A. Chilingaryan <csa@dside.dyndns.org>2012-11-25 08:09:41 +0100
commit933e63eaac669c9b235a7619e8d9aae058090a77 (patch)
tree445737b1f2070792a311dfb23939e600b48f248c
parent28673c9220debee363a21cc7ecbcb87e2eaa49da (diff)
downloadfwbench-933e63eaac669c9b235a7619e8d9aae058090a77.tar.gz
fwbench-933e63eaac669c9b235a7619e8d9aae058090a77.tar.bz2
fwbench-933e63eaac669c9b235a7619e8d9aae058090a77.tar.xz
fwbench-933e63eaac669c9b235a7619e8d9aae058090a77.zip
Use kernel asynchronous i/o in seqreader
-rw-r--r--CMakeLists.txt2
-rw-r--r--seqreader.c130
2 files changed, 125 insertions, 7 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt
index b1adb70..9595514 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -42,4 +42,6 @@ else (USE_UFO_GENERATOR)
target_link_libraries(fwbench m fastwriter ${GLIB2_LIBRARIES} ${GTHREAD2_LIBRARIES})
endif (USE_UFO_GENERATOR)
+target_link_libraries(seqreader aio)
+
configure_file(config.h.in ${CMAKE_CURRENT_BINARY_DIR}/config.h)
diff --git a/seqreader.c b/seqreader.c
index 359f38e..ba4db51 100644
--- a/seqreader.c
+++ b/seqreader.c
@@ -2,6 +2,7 @@
#include <stdio.h>
#include <stdlib.h>
+#include <stdint.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/time.h>
@@ -11,16 +12,34 @@
#include <string.h>
#include <errno.h>
+#include <libaio.h>
+
#define FASTWRITER_SYNCIO_ALIGN 512
#define SYNC_MODE
-#define BUFSIZE 2097152
+#define AIO_MODE 2
+#define EXTRA_BUFFERS 2
+#define WRITE_INTERVAL 1
+
+#define RAID_STRIP_SIZE 256
+#define RAID_DISKS 8
+#define STRIPS_AT_ONCE 2
+
+#ifdef AIO_MODE
+# define SYNC_MODE
+#endif /* AIO_MODE */
+
#ifdef SYNC_MODE
-# define BLOCK_SIZE 2097152
+# define BLOCK_SIZE (1024 * RAID_STRIP_SIZE * RAID_DISKS * STRIPS_AT_ONCE)
#else /* SYNC_MODE */
# define BLOCK_SIZE 16384
#endif /* SYNC_MODE */
-#define WRITE_INTERVAL 1
+
+#ifdef AIO_MODE
+# define BUFSIZE (BLOCK_SIZE * (AIO_MODE + EXTRA_BUFFERS))
+#else /* AIO_MODE */
+# define BUFSIZE BLOCK_SIZE
+#endif /* AIO_MODE */
int main(int argc, char *argv[]) {
@@ -33,18 +52,34 @@ int main(int argc, char *argv[]) {
size_t files = 0;
size_t total_size = 0;
size_t last_write = 0;
+ size_t last_size = 0;
size_t skip;
size_t run;
+ size_t ready;
ssize_t res;
size_t max_size = (size_t)-1;
char *buffer;//[BUFSIZE];
long double mcoef = 1000000. / (1024 * 1024);
int flags = O_RDONLY|O_NOATIME|O_LARGEFILE;
+#ifdef AIO_MODE
+ int i;
+ size_t curio, schedio;
+ int done[AIO_MODE + EXTRA_BUFFERS];
+
+ io_context_t aio;
+ struct iocb io[AIO_MODE], *ioptr[AIO_MODE];
+
+ int events;
+ struct io_event ev[AIO_MODE];
+#endif /* AIO_MODE */
+
#ifdef SYNC_MODE
flags |= O_DIRECT;
#endif
-
+
+ printf("Used buffer: %i MB, Block: %i KB\n", BUFSIZE / 1024 / 1024, BLOCK_SIZE/1024);
+
posix_memalign((void**)&buffer, FASTWRITER_SYNCIO_ALIGN, BUFSIZE);
if (argc < 2) {
@@ -63,13 +98,87 @@ int main(int argc, char *argv[]) {
printf("Unable to open device %s\n", argv[1]);
exit(1);
}
-
+
size_t size = BLOCK_SIZE;
+
+#ifdef AIO_MODE
+ memset(done, 0, sizeof(done));
+ memset(&aio, 0, sizeof(aio));
+ io_queue_init(AIO_MODE, &aio);
+ for (i = 0; i < AIO_MODE; i++) {
+ ioptr[i] = &io[i];
+ memset(ioptr[i], 0, sizeof(struct iocb));
+ io_prep_pread(ioptr[i], fd, buffer + i * BLOCK_SIZE, BLOCK_SIZE, i * BLOCK_SIZE);
+ io_set_callback(ioptr[i], (void*)(uintptr_t)i);
+ }
+
+ curio = 0;
+ schedio = AIO_MODE;
+ events = 0;
+#endif /* AIO_MODE */
gettimeofday(&start, NULL);
-
+
+#ifdef AIO_MODE
+ err = io_submit(aio, AIO_MODE, ioptr);
+ if (err != AIO_MODE) {
+ printf("io_submit returned %i\n", err);
+ perror("Failed to submit initial AIO jobs");
+ }
+#endif /* AIO_MODE */
+
+#ifdef AIO_MODE
+ ready = 0;
+ while (1) {
+ if (!done[curio%(AIO_MODE + EXTRA_BUFFERS)]) {
+ err = io_getevents(aio, 1, AIO_MODE - events, &ev[events], NULL);
+ if (err < 0) perror("Error waiting for AIO\n");
+
+ if ((!ready)&&(err > 1)) {
+ printf("*** Multiple read requests (%i of %i) are finished simultaneously. It is either:\n", err, AIO_MODE);
+ printf(" Small buffer size (%i KB)\n", BLOCK_SIZE/1024);
+ printf(" More parallel AIOs (%i) than supported by kernel, try %i\n", AIO_MODE, AIO_MODE - err);
+ }
+
+ for (i = 0; i < err; i++) {
+ struct io_event *ep = &ev[events + i];
+ int doneio = (uintptr_t)ep->data;
+ if (ep->res2 || (ep->res != BLOCK_SIZE)) perror("Error in async IO");
+ done[doneio%(AIO_MODE + EXTRA_BUFFERS)] = 1;
+// printf("done (%i): %i\n", i, doneio);
+ }
+
+ events += err;
+
+ for (i = events - 1; (i >= 0)&&((schedio - curio) < (AIO_MODE + EXTRA_BUFFERS)); i--) {
+ struct iocb *newio = (struct iocb *)ev[i].obj;
+ memset(newio, 0, sizeof(struct iocb));
+ io_prep_pread(newio, fd, buffer + (schedio % (AIO_MODE + EXTRA_BUFFERS)) * BLOCK_SIZE, BLOCK_SIZE, schedio * BLOCK_SIZE);
+ io_set_callback(newio, (void*)(uintptr_t)schedio);
+ err = io_submit(aio, 1, &newio);
+ if (err != 1) perror("Failed to submit AIO jobs");
+ schedio++;
+ }
+ events = i + 1;
+
+ if (events) {
+ printf("*** Unprocessed events (%i), probably not enough buffer space...\n", events);
+// printf(" curio (%zu), schedio (%zu)\n", curio, schedio);
+ }
+
+ ready = 1;
+ continue;
+ }
+
+ done[curio%(AIO_MODE + EXTRA_BUFFERS)] = 0;
+ curio++;
+
+ res = BLOCK_SIZE;
+#else /* AIO_MODE */
res = read(fd, buffer, size);
while (res > 0) {
+#endif /* AIO_MODE */
+
if (res != size) {
printf("Incomplete read: %zu bytes read instead of %zu\n", res, size);
exit(-1);
@@ -79,8 +188,9 @@ int main(int argc, char *argv[]) {
gettimeofday(&tv, NULL);
us = (tv.tv_sec - start.tv_sec) * 1000000 + (tv.tv_usec - start.tv_usec);
if ((us - last_write) > WRITE_INTERVAL * 1000000) {
+ printf("Reading: %s (%lu GB), Measured speed: %zu MB/s, Current speed: %zu MB/s\n", argv[0], total_size / 1024 / 1024 / 1024, (size_t)(mcoef * total_size / us), (size_t)(mcoef * (total_size - last_size) / (us - last_write)));
last_write = us;
- printf("Reading: %s (%lu GB), Measured speed: %zu MB/s\n", argv[0], total_size / 1024 / 1024 / 1024, (size_t)(mcoef * total_size / us));
+ last_size = total_size;
}
if (total_size > max_size) {
@@ -88,9 +198,15 @@ int main(int argc, char *argv[]) {
break;
}
+#ifndef AIO_MODE
res = read(fd, buffer, size);
+#endif /* AIO_MODE */
}
+#ifdef AIO_MODE
+ io_queue_release(aio);
+#endif /* AIO_MODE */
+
close(fd);
if (res < 0) {