From 5ae22aeedb43629fed30f64d025b9df3cf25cff8 Mon Sep 17 00:00:00 2001 From: "Suren A. Chilingaryan" Date: Mon, 3 Dec 2012 22:09:48 +0100 Subject: AIO support --- CMakeLists.txt | 17 ++++- config.h.in | 3 +- default.c | 231 ++++++++++++++++++++++++++++++++++++++++++++++++--------- 3 files changed, 214 insertions(+), 37 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 7a95a93..496864e 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -5,6 +5,7 @@ set(FASTWRITER_ABI_VERSION "0") cmake_minimum_required(VERSION 2.8) +set(DISABLE_AIO TRUE CACHE BOOL "Use kernel AIO writer") set(DISABLE_XFS_REALTIME FALSE CACHE BOOL "Disable support of RealTime XFS partition") @@ -26,7 +27,16 @@ include_directories( add_definitions("-fPIC --std=c99 -Wall -O2 -pthread") set(HEADERS fastwriter.h sysinfo.h default.h private.h) -add_library(fastwriter SHARED fastwriter.c sysinfo.c default.c) +set(SOURCES fastwriter.c sysinfo.c default.c) + +if (NOT DISABLE_AIO) + check_include_files("libaio.h" HAVE_LIBAIO_H) + if (NOT HAVE_LIBAIO_H) + message(FATAL_ERROR "error: libaio.h is not found...") + endif (NOT HAVE_LIBAIO_H) +endif (NOT DISABLE_AIO) + +add_library(fastwriter SHARED ${SOURCES}) set_target_properties(fastwriter PROPERTIES VERSION ${FASTWRITER_VERSION} @@ -34,6 +44,11 @@ set_target_properties(fastwriter PROPERTIES LINK_FLAGS "-pthread" ) +if (NOT DISABLE_AIO) + target_link_libraries(fastwriter aio) +endif (NOT DISABLE_AIO) + + set(TARNAME "fastwriter") set(PACKAGE_VERSION ${FASTWRITER_VERSION}) set(PACKAGE_NAME "${TARNAME}") diff --git a/config.h.in b/config.h.in index 3627160..9e02a42 100644 --- a/config.h.in +++ b/config.h.in @@ -1,2 +1,3 @@ #cmakedefine HAVE_LINUX_FALLOC_H -#cmakedefine DISABLE_XFS_REALTIME \ No newline at end of file +#cmakedefine DISABLE_XFS_REALTIME +#cmakedefine DISABLE_AIO diff --git a/default.c b/default.c index 5109608..71c6ab1 100644 --- a/default.c +++ b/default.c @@ -31,6 +31,7 @@ # include #endif /* !DISABLE_XFS_REALTIME */ + #include "fastwriter.h" #include "private.h" #include "sysinfo.h" @@ -40,18 +41,57 @@ #define HAVE_FALLOCATE #define EXT4_WRITEBLOCK 4194304 #define EXT4_PREALLOCATE 1073741824 +#define OCFS_WRITEBLOCK 262144 +#define AIO_QUEUE_LENGTH 4 +#define AIO_BUFFERS 8 + +#ifndef DISABLE_AIO +# include +# if AIO_QUEUE_LENGTH > AIO_BUFFERS +# error "AIO_QUEUE_LENGTH > AIO_BUFFERS" +# endif +#endif /* DISABLE_AIO */ + + +#ifndef DISABLE_AIO +typedef struct { + size_t offset; + size_t size; + int ios; + int ready; /**< 0 - unused, 1 - processing, 2 - done */ +} fastwriter_data_t; +#endif /* !DISABLE_AIO */ typedef struct { int fd; - int sync_mode; + int sync_mode; /**< Open with O_DIRECT flag to avoid caches */ + int aio_mode; /**< Use kernel AIO (libaio.h) */ size_t prior_size; /**< original size of file */ size_t preallocated; /**< preallocated bytes */ size_t wr_block; /**< minimal block of data to write */ size_t pa_block; /**< preallocation setp */ + +#ifndef DISABLE_AIO + io_context_t aio; + + int ios_ready_n; + int ios_ready[AIO_QUEUE_LENGTH]; + struct iocb ios[AIO_QUEUE_LENGTH]; + + int data_head, data_tail; + fastwriter_data_t data[AIO_BUFFERS]; + + int ios_status[AIO_QUEUE_LENGTH]; + + size_t sched; /**< how far we ahead of currently writted head */ + size_t fd_offset; /**< current file offset */ + + int page_size; +#endif /* !DISABLE_AIO */ } fastwriter_default_t; @@ -89,9 +129,6 @@ int fastwriter_default_open(fastwriter_t *fw, const char *name, fastwriter_flags ctx->wr_block = EXT4_WRITEBLOCK; ctx->pa_block = 0; ctx->prior_size = (size_t)-1; -#ifdef SYNC_MODE -// ctx->sync_mode = 0; -#endif /* SYNC_MODE */ open_flags &= ~(O_CREAT|O_NOATIME|O_LARGEFILE); } else if (!strcmp(fs, "ext4")) { ctx->wr_block = EXT4_WRITEBLOCK; @@ -102,22 +139,28 @@ int fastwriter_default_open(fastwriter_t *fw, const char *name, fastwriter_flags } else if (!strcmp(fs, "xfs")) { ctx->wr_block = EXT4_WRITEBLOCK; ctx->pa_block = EXT4_PREALLOCATE; + } else if (!strcmp(fs, "ocfs2")) { +#ifndef DISABLE_AIO + ctx->aio_mode = 1; + ctx->sync_mode = 0; + ctx->wr_block = OCFS_WRITEBLOCK; +#else /* !DISABLE_AIO */ + ctx->wr_block = EXT4_WRITEBLOCK; +#endif /* !DISABLE_AIO */ + ctx->pa_block = EXT4_PREALLOCATE; } else { ctx->wr_block = EXT4_WRITEBLOCK; ctx->pa_block = 0; } - -#ifdef SYNC_MODE + if (ctx->sync_mode) { open_flags |= O_DIRECT; } -#endif /* SYNC_MODE */ if (flags&FASTWRITER_FLAGS_OVERWRITE) open_flags |= O_TRUNC; ctx->fd = open(name, open_flags, open_mode); -#ifdef SYNC_MODE if (ctx->fd < 0) { // Running as normal user, try to disable direct mode if ((errno == EINVAL)&&(ctx->sync_mode)) { @@ -125,26 +168,23 @@ int fastwriter_default_open(fastwriter_t *fw, const char *name, fastwriter_flags open_flags &= ~O_DIRECT; ctx->fd = open(name, open_flags, open_mode); } -#endif /* SYNC_MODE */ if (ctx->fd < 0) return errno; -#ifdef SYNC_MODE } -#endif /* SYNC_MODE */ if (((open_flags&FASTWRITER_FLAGS_OVERWRITE)==0)&&(strcmp(fs, "raw"))) { - ctx->prior_size = lseek(ctx->fd, 0, SEEK_END); -# ifdef SYNC_MODE + ctx->prior_size = lseek64(ctx->fd, 0, SEEK_END); + if (ctx->prior_size%FASTWRITER_SYNCIO_ALIGN) { close(ctx->fd); ctx->fd = open(name, open_flags&~O_DIRECT, open_mode); if (ctx->fd < 0) return errno; - ctx->prior_size = lseek(ctx->fd, 0, SEEK_END); + ctx->prior_size = lseek64(ctx->fd, 0, SEEK_END); ctx->sync_mode = 0; + ctx->aio_mode = 0; } -# endif /* SYNC_MODE */ } #ifndef DISABLE_XFS_REALTIME @@ -153,11 +193,30 @@ int fastwriter_default_open(fastwriter_t *fw, const char *name, fastwriter_flags if (!err) { attr.fsx_xflags |= XFS_XFLAG_REALTIME; err = xfsctl (name, ctx->fd, XFS_IOC_FSSETXATTR, (void *) &attr); -// if (!err) puts("Real-time"); + if (err) fprintf(stderr, "Error initializing XFS real-time mode (%i), disabling...\n", err); } } #endif /* !DISABLE_XFS_REALTIME */ +#ifndef DISABLE_AIO + if (ctx->aio_mode) { + int i; + ctx->page_size = getpagesize(); + ctx->fd_offset = ctx->prior_size; + + ctx->ios_ready_n = AIO_QUEUE_LENGTH; + for (i = 0; i < AIO_QUEUE_LENGTH; i++) { + ctx->ios_ready[i] = i; + } + + err = io_queue_init(AIO_QUEUE_LENGTH, &ctx->aio); + if (err) { + fprintf(stderr, "Error initializing AIO mode (%i), disabling...\n", -err); + ctx->aio_mode = 0; + } + } +#endif /* !DISABLE_AIO */ + ctx->preallocated = 0; return 0; @@ -169,11 +228,31 @@ void fastwriter_default_close(fastwriter_t *fw) { fastwriter_default_t *ctx = (fastwriter_default_t*)fw->ctx; if (ctx->fd >= 0) { -#if defined(SYNC_MODE)||!defined(HAVE_LINUX_FALLOC_H) +#ifndef DISABLE_AIO + if ((ctx->aio_mode)&&(ctx->aio)) { + int n_ev; + struct io_event ev[AIO_QUEUE_LENGTH]; + + while (ctx->ios_ready_n < AIO_QUEUE_LENGTH) { + n_ev = io_getevents(ctx->aio, 1, AIO_QUEUE_LENGTH, &ev[0], NULL); + if (n_ev <= 0) { + fprintf(stderr, "AIO io_getevents have failed (%i)", -n_ev); + break; + } + ctx->ios_ready_n += n_ev; + } + + io_queue_release(ctx->aio); + } +#endif /* DISABLE_AIO */ + +#ifdef HAVE_LINUX_FALLOC_H if (ctx->prior_size != (size_t)-1) { +#else /* HAVE_LINUX_FALLOC_H */ + if ((ctx->prior_size != (size_t)-1)&&((ctx->sync_mode)||(ctx->aio_mode))) { +#endif /* HAVE_LINUX_FALLOC_H */ ftruncate(ctx->fd, ctx->prior_size + fw->written); } -#endif close(ctx->fd); } @@ -194,10 +273,9 @@ int fastwriter_default_write(fastwriter_t *fw, fastwriter_write_flags_t flags, s *written = 0; return 0; } - + size -= size % ctx->wr_block; } - if ((ctx->pa_block)&&((fw->written + size) > ctx->preallocated)) { #ifdef HAVE_LINUX_FALLOC_H @@ -211,27 +289,110 @@ int fastwriter_default_write(fastwriter_t *fw, fastwriter_write_flags_t flags, s } } -#ifdef SYNC_MODE // we expect this to happen only at last iteration (buffer is multiply of the required align) - if ((ctx->sync_mode)&&(size%FASTWRITER_SYNCIO_ALIGN)) { + if (((ctx->aio_mode)||(ctx->sync_mode))&&(size%FASTWRITER_SYNCIO_ALIGN)) { delta = FASTWRITER_SYNCIO_ALIGN - size%FASTWRITER_SYNCIO_ALIGN; } -#endif /* SYNC_MODE */ - do { - res = write(ctx->fd, data + sum, size + delta - sum); - if (res < 0) { - *written = sum; - return errno; - } +#ifndef DISABLE_AIO + if (ctx->aio_mode) { + int err; + size_t done = 0; + size_t sched = 0; + + fastwriter_data_t *iodata; + struct iocb *newio; + size_t wr_block = ctx->wr_block; + + do { + if (!ctx->ios_ready_n) { + int i, n_ev; + struct io_event ev[AIO_QUEUE_LENGTH]; + + n_ev = io_getevents(ctx->aio, 1, AIO_QUEUE_LENGTH, &ev[0], NULL); + if (n_ev <= 0) { + fprintf(stderr, "AIO io_getevents have failed (%i)", -n_ev); + return -n_ev; + } + + for (i = 0; i < n_ev; i++) { + fastwriter_data_t *ev_data = (fastwriter_data_t *)(ev[i].data); + if ((ev[i].res2)||(ev[i].res < ev_data->size)) { + fprintf(stderr, "AIO write failed (res: %li, res2: %li, expected: %zu), no handling data will be corrupted...\n", ev[i].res, ev[i].res2, ev_data->size); + return -ev[i].res2; + } + + ctx->ios_ready[ctx->ios_ready_n++] = ev_data->ios; +// printf("Data: %i (ios %i)\n", ev_data->ready, ev_data->ios); + ev_data->ready = 2; + } + + while (ctx->data[ctx->data_tail].ready > 1) { +// printf("Done: %i %zu\n", ctx->data_tail, ctx->data[ctx->data_tail].offset); + ctx->data[ctx->data_tail].ready = 0; + + done += ctx->data[ctx->data_tail].size; + if ((++ctx->data_tail) == AIO_BUFFERS) ctx->data_tail = 0; + } + } + + if ((ctx->sched + sched) < size) { + if ((ctx->data_head == ctx->data_tail)&&(ctx->data[ctx->data_head].ready)) continue; + + newio = (struct iocb*)&ctx->ios[ctx->ios_ready[--ctx->ios_ready_n]]; + iodata = &ctx->data[ctx->data_head]; + + if (wr_block > ((size + delta) - (ctx->sched + sched))) { + wr_block = (size + delta) - (ctx->sched + sched); + if (wr_block % ctx->page_size) { + fprintf(stderr, "We need to write incomplete page (%zu bytes). This is no supported yet...\n", wr_block); + return -1; + } + } + +// printf("Sched: %lu => %lu (%lu) [tail %lu, head %lu]\n", ctx->sched + sched, ctx->fd_offset, wr_block, ctx->data_tail, ctx->data_head); + + iodata->offset = ctx->fd_offset; + iodata->size = wr_block; + iodata->ios = ctx->ios_ready_n; + + io_prep_pwrite(newio, ctx->fd, data + ctx->sched + sched, wr_block, ctx->fd_offset); + io_set_callback(newio, (void*)iodata); + err = io_submit(ctx->aio, 1, &newio); + if (err != 1) { + fprintf(stderr, "Error submiting AIO job (%i)\n", -err); + return -err; + } + + iodata->ready = 1; + sched += wr_block; + ctx->fd_offset += wr_block; + if ((++ctx->data_head) == AIO_BUFFERS) ctx->data_head = 0; + } + } while (!done); + + ctx->sched += sched - done; + size = done; + } else { +#endif /* !DISABLE_AIO */ + do { + res = write(ctx->fd, data + sum, size + delta - sum); + if (res < 0) { + *written = sum; + return errno; + } - sum += res; - } while (sum < size); + sum += res; + } while (sum < size); +#ifndef DISABLE_AIO + } +#endif /* !DISABLE_AIO */ + + if ((ctx->sync_mode)||(ctx->aio_mode)) { + posix_fadvise(ctx->fd, fw->written, size, POSIX_FADV_DONTNEED); + } -#ifdef SYNC_MODE - posix_fadvise(ctx->fd, fw->written, size, POSIX_FADV_DONTNEED); -#endif /* SYNC_MODE */ - *written = size; + return 0; } -- cgit v1.2.1