#define _GNU_SOURCE #include #include #include #include #include #include #include #include #include #include #include #include #define FASTWRITER_SYNCIO_ALIGN 4096 #define SYNC_MODE #define AIO_MODE 2 #define EXTRA_BUFFERS 2 #define WRITE_INTERVAL 1 size_t SKIP = 1; size_t SEGMENT = 0; size_t LINE = 0; #define RAID_STRIP_SIZE 256 #define RAID_DISKS 8 #define STRIPS_AT_ONCE 2 #define MIN_BLOCK_SIZE (1024 * RAID_STRIP_SIZE) #define BLOCK_SIZE (1024 * RAID_STRIP_SIZE * RAID_DISKS * STRIPS_AT_ONCE) #define BUFSIZE (BLOCK_SIZE * (AIO_MODE + EXTRA_BUFFERS)) #ifdef AIO_MODE # define SYNC_MODE #endif /* AIO_MODE */ DIR *dir; struct dirent *ent; int flags = O_RDONLY|O_NOATIME|O_LARGEFILE|O_DIRECT; size_t run = 0; // file reading order size_t skip = 0; // file reading order int fd = -1; // we are currently scheduling reads from this file size_t offset = 0; // we are schedulling reads at this offset size_t size; // size of block we are schedulling to read size_t file_size; // total size of current file size_t file_block_size; // access unit for current file size_t curf = 0, schedf = 0; size_t curio = 0, schedio = 0; #define MAXLEN 128 int done[AIO_MODE + EXTRA_BUFFERS]; int done_finish[AIO_MODE + EXTRA_BUFFERS]; char done_file[AIO_MODE + EXTRA_BUFFERS][MAXLEN + 1]; size_t done_offset[AIO_MODE + EXTRA_BUFFERS]; size_t done_size[AIO_MODE + EXTRA_BUFFERS]; int done_fd[AIO_MODE + EXTRA_BUFFERS]; #define buf(io) (io%(AIO_MODE + EXTRA_BUFFERS)) #define max(a,b) (((a)>(b))?(a):(b)) #define min(a,b) (((a)<(b))?(a):(b)) #define page(size) (((size/4096)+(size%4096?1:0))*4096) int next_file() { size_t size; struct stat st; next: while ((ent = readdir(dir))) { skip += 1; if ((((skip) - 1)%SKIP) != run) continue; if (stat(ent->d_name, &st)) continue; if (!S_ISREG(st.st_mode)) continue; break; } if (ent) { size = st.st_blksize; int fd = open(ent->d_name, flags, 0); if (fd < 0) goto next; if (size < MIN_BLOCK_SIZE) size = BLOCK_SIZE; if (size > BUFSIZE) { printf("Buffer too small\n"); exit(1); } file_block_size = size; file_size = st.st_size; offset = 0; return fd; } else { skip = 0; run += 1; if (run < SKIP) { closedir(dir); dir = opendir("."); goto next; } } return -1; } int next_block() { int io; size_t next_offset; if (fd >= 0) { if (LINE) offset += SEGMENT; else offset += size; if (offset >= file_size) { fd = -1; } } if (fd < 0) { fd = next_file(); if (fd < 0) return -1; // printf("open ===> %s (%i)\n", ent->d_name, fd); } if (LINE) size = LINE; else size = file_block_size; if ((offset + size) > file_size) size = file_size - offset; io = buf(schedio); strncpy(done_file[io], ent->d_name, MAXLEN); done_offset[io] = offset; done_size[io] = size; done_fd[io] = fd; done[io] = 0; if (LINE) next_offset = offset + SEGMENT; else next_offset = offset + size; if (next_offset >= file_size) done_finish[io] = 1; else done_finish[io] = 0; return 0; } int main(int argc, char *argv[]) { int err; int i, n, io; long double mcoef = 1000000. / (1024 * 1024); io_context_t aio; struct iocb ios[AIO_MODE], *ioptr[AIO_MODE]; int events; struct io_event ev[AIO_MODE]; int ready; void *buffer; struct timeval start, fstart, tv; if (argc < 2) { printf("Usage: %s [skip] [segment] [line]\n", argv[0]); exit(0); } chdir(argv[1]); dir = opendir("."); if (argc > 2) SKIP = atoi(argv[2]); if (argc > 3) SEGMENT = atoi(argv[3]); if (argc > 4) LINE = atoi(argv[4]); if (!SKIP) SKIP = 1; printf("%s: Skip %zu, Segment %zu, Line %zu\n", argv[1], SKIP, SEGMENT, LINE); posix_memalign((void**)&buffer, FASTWRITER_SYNCIO_ALIGN, BUFSIZE); memset(done, 0, sizeof(done)); memset(&aio, 0, sizeof(aio)); io_queue_init(AIO_MODE, &aio); for (schedio = 0; schedio < AIO_MODE; schedio++) { ioptr[schedio] = &ios[schedio]; memset(ioptr[schedio], 0, sizeof(struct iocb)); err = next_block(); if (err) break; io_prep_pread(ioptr[schedio], fd, buffer + schedio * BLOCK_SIZE, page(size), offset); io_set_callback(ioptr[schedio], (void*)(uintptr_t)schedio); // printf("sched %zu: %zu (%zu %zu)\n", schedio, schedio, offset, size); } size_t us, fileus; size_t last_write = 0; // size_t last_file_write = 0; // size_t last_file_size = 0; size_t cur_file_size = 0; size_t total_size = 0; size_t files = 0; gettimeofday(&start, NULL); gettimeofday(&fstart, NULL); n = io_submit(aio, schedio, ioptr); if (n != schedio) { printf("Failed to submit initial AIO job, io_submit returned %i\n", err); exit(-1); } curio = 0; events = 0; ready = 0; while ((err >= 0)||(curio != schedio)) { io = buf(curio); if (!done[io]) { // printf("%i,%i - %i [%i %i %i %i]\n", curio, schedio, events, done[0], done[1], done[2], done[3]); if (curio < schedio) { n = io_getevents(aio, 1, AIO_MODE + EXTRA_BUFFERS - events, &ev[events], NULL); if (n < 0) { printf("Error waiting for AIO (%i)\n", -err); exit(-1); } } else { n = 0; } if ((!ready)&&(n > 1)) { printf("*** Multiple read requests (%i of %i) are finished simultaneously. It is either:\n", err, AIO_MODE); printf(" Small buffer size (%i KB)\n", BLOCK_SIZE/1024); printf(" More parallel AIOs (%i) than supported by kernel, try %i\n", AIO_MODE, AIO_MODE - err); } for (i = 0; i < n; i++) { struct io_event *ep = &ev[events + i]; int doneio = (uintptr_t)ep->data; io = buf(doneio); // printf("done %i: %lu %zu %zi\n", doneio, ep->res2, done_size[io], ep->res); if (ep->res2 || (ep->res < done_size[io])) { printf("Error in async IO (ret: %li, ret size: %zi, expected %zu)\n", ep->res2, ep->res, page(done_size[io])); exit(-1); } done[io] = 1; // printf("done (%i): %i\n", i, doneio); } events += n; for (i = events - 1; (i >= 0)&&((schedio - curio) < (AIO_MODE + EXTRA_BUFFERS)); i--) { err = next_block(); if (err) break; // printf("sched %i: %zu (%i %zu %zu)\n", i, schedio, fd, offset, size); struct iocb *newio = (struct iocb *)ev[i].obj; memset(newio, 0, sizeof(struct iocb)); io_prep_pread(newio, fd, buffer + buf(schedio) * BLOCK_SIZE, page(size), offset); io_set_callback(newio, (void*)(uintptr_t)schedio); err = io_submit(aio, 1, &newio); if (err != 1) { printf("Failed to submit AIO jobs %i\n", err); exit(-1); } schedio++; } events = i + 1; if ((events)&&(!err)) { printf("*** Unprocessed events (%i), probably not enough buffer space...\n", events); // printf(" curio (%zu), schedio (%zu)\n", curio, schedio); } ready = 1; continue; } io = buf(curio); cur_file_size += done_size[io]; total_size += done_size[io]; gettimeofday(&tv, NULL); us = (tv.tv_sec - start.tv_sec) * 1000000 + (tv.tv_usec - start.tv_usec); fileus = (tv.tv_sec - fstart.tv_sec) * 1000000 + (tv.tv_usec - fstart.tv_usec); if (done_finish[io]) { // printf("closing %i ===> %s (%i)\n", io, done_file[io], done_fd[io]); close(done_fd[io]); gettimeofday(&fstart, NULL); files++; } if ((us - last_write) > WRITE_INTERVAL * 1000000) { last_write = us; printf("Read: %lu files (%lu GB) at %zu MB/s", files, total_size / 1024 / 1024 / 1024, (size_t)(mcoef * total_size / us)); if (1) {//cur_file_size > BLOCK_SIZE) { printf(", Last: %s (%lu MB) at %zu MB/s\n", done_file[io], cur_file_size / 1024 / 1024, (size_t)(mcoef * cur_file_size / fileus)); } else { printf("\n"); } } if (done_finish[io]) { cur_file_size = 0; } done[io] = 0; curio++; } gettimeofday(&tv, NULL); us = (tv.tv_sec - start.tv_sec) * 1000000 + (tv.tv_usec - start.tv_usec); printf("Total: %lu files (%lu GB) at %zu MB/s\n", files, total_size / 1024 / 1024 / 1024, (size_t)(mcoef * total_size / us)); free(buffer); closedir(dir); }