17
// Alignment, in bytes, requested from posix_memalign() for the read buffer.
// NOTE(review): files are opened with O_DIRECT, which normally needs aligned
// buffers; 4096 is presumably chosen for that reason — confirm for the target device.
#define FASTWRITER_SYNCIO_ALIGN 4096
21
// Number of buffer slots kept in addition to the AIO_MODE slots; the total slot
// count (AIO_MODE + EXTRA_BUFFERS) sizes BUFSIZE and the done_* bookkeeping arrays.
#define EXTRA_BUFFERS 2
22
// Minimum time between progress reports, in seconds (it is scaled by 1000000
// and compared against elapsed microseconds).
#define WRITE_INTERVAL 1
28
// Strip size used to derive the block sizes below. It is multiplied by 1024
// there, so it is presumably expressed in KB — TODO confirm against the array setup.
#define RAID_STRIP_SIZE 256
30
// Multiplier applied in BLOCK_SIZE; presumably the number of strips per disk
// covered by a single read — TODO confirm.
#define STRIPS_AT_ONCE 2
32
// 1024 * RAID_STRIP_SIZE. A per-file access size below this threshold is
// replaced by BLOCK_SIZE.
#define MIN_BLOCK_SIZE (1024 * RAID_STRIP_SIZE)
33
// Size of one buffer slot and the default read unit: each request's buffer is
// placed at a multiple of BLOCK_SIZE inside the read buffer.
// RAID_DISKS is not defined in this part of the file.
#define BLOCK_SIZE (1024 * RAID_STRIP_SIZE * RAID_DISKS * STRIPS_AT_ONCE)
34
// Total size of the read buffer passed to posix_memalign(): one BLOCK_SIZE
// slot for each of the AIO_MODE + EXTRA_BUFFERS slots.
#define BUFSIZE (BLOCK_SIZE * (AIO_MODE + EXTRA_BUFFERS))
43
// Flags passed to open() for every input file: read-only, do not update the
// access time, large-file support, and direct I/O (O_DIRECT).
int flags = O_RDONLY|O_NOATIME|O_LARGEFILE|O_DIRECT;
45
// File reading order: a directory entry is processed only when
// ((skip - 1) % SKIP) == run, so `run` selects which residue class of entries
// is read. NOTE(review): where `run` is advanced is not visible here.
size_t run = 0;
46
// File reading order: counter compared (minus one, modulo SKIP) against `run`
// when selecting directory entries. Presumably counts the entries seen so far;
// the increment is not visible here — TODO confirm.
size_t skip = 0;
48
int fd = -1; // we are currently scheduling reads from this file
49
size_t offset = 0; // we are scheduling reads at this offset
50
size_t size; // size of the block we are scheduling to read
51
size_t file_size; // total size of current file
52
size_t file_block_size; // access unit for current file
54
// NOTE(review): presumably file counters analogous to curio/schedio (current
// vs. scheduled); their use is not visible in this part of the file — confirm.
size_t curf = 0, schedf = 0;
55
// Read sequence numbers. schedio is the number given to the next read that is
// scheduled; it is stored as the request's callback data and mapped to a
// buffer slot with buf(). curio is presumably the next read to be processed;
// the main loop keeps running while curio != schedio.
size_t curio = 0, schedio = 0;
58
// One flag per buffer slot, zeroed with memset() at startup.
// NOTE(review): presumably set once the read for that slot has completed — confirm.
int done[AIO_MODE + EXTRA_BUFFERS];
59
// Per slot: 1 when the read in this slot is the last one of its file (the next
// offset would reach or pass file_size), 0 otherwise.
int done_finish[AIO_MODE + EXTRA_BUFFERS];
60
// Per slot: name of the file the read belongs to, copied from the directory
// entry with strncpy() limited to MAXLEN bytes (the extra byte leaves room for
// the terminator).
char done_file[AIO_MODE + EXTRA_BUFFERS][MAXLEN + 1];
61
// Per slot: file offset at which the read was scheduled.
size_t done_offset[AIO_MODE + EXTRA_BUFFERS];
62
// Per slot: number of bytes wanted from the read, before rounding up with
// page(); completed reads are checked against this value.
size_t done_size[AIO_MODE + EXTRA_BUFFERS];
63
// Per slot: NOTE(review) presumably the descriptor of the file the read was
// issued on, kept so it can be closed after the file's last read — confirm.
int done_fd[AIO_MODE + EXTRA_BUFFERS];
65
// Map a read sequence number onto its buffer slot index, in the range
// 0 .. AIO_MODE + EXTRA_BUFFERS - 1. The argument is parenthesized so that
// expression arguments such as buf(a + b) expand with the intended precedence.
#define buf(io) ((io) % (AIO_MODE + EXTRA_BUFFERS))
66
// Larger of two values; when they compare equal the second argument is the
// result. Function-like macro: an argument can be evaluated twice, so do not
// pass expressions with side effects.
#define max(a, b) (((b) < (a)) ? (a) : (b))
67
// Smaller of two values; when they compare equal the second argument is the
// result. Function-like macro: an argument can be evaluated twice, so do not
// pass expressions with side effects.
#define min(a, b) (((b) > (a)) ? (a) : (b))
68
// Round a byte count up to the next multiple of 4096 (0 stays 0). Used for
// the length of each read request. The argument is parenthesized so that
// expression arguments such as page(a + b) are rounded as a whole; it is
// evaluated twice, so it must not have side effects.
#define page(size) ((((size) / 4096) + (((size) % 4096) ? 1 : 0)) * 4096)
76
while ((ent = readdir(dir))) {
78
if ((((skip) - 1)%SKIP) != run) continue;
79
if (stat(ent->d_name, &st)) continue;
80
if (!S_ISREG(st.st_mode)) continue;
87
int fd = open(ent->d_name, flags, 0);
88
if (fd < 0) goto next;
90
if (size < MIN_BLOCK_SIZE) size = BLOCK_SIZE;
93
printf("Buffer too small\n");
97
file_block_size = size;
98
file_size = st.st_size;
121
if (LINE) offset += SEGMENT;
124
if (offset >= file_size) {
131
if (fd < 0) return -1;
132
// printf("open ===> %s (%i)\n", ent->d_name, fd);
135
if (LINE) size = LINE;
136
else size = file_block_size;
138
if ((offset + size) > file_size) size = file_size - offset;
141
strncpy(done_file[io], ent->d_name, MAXLEN);
142
done_offset[io] = offset;
143
done_size[io] = size;
147
if (LINE) next_offset = offset + SEGMENT;
148
else next_offset = offset + size;
150
if (next_offset >= file_size) done_finish[io] = 1;
151
else done_finish[io] = 0;
158
int main(int argc, char *argv[]) {
161
long double mcoef = 1000000. / (1024 * 1024);
164
struct iocb ios[AIO_MODE], *ioptr[AIO_MODE];
167
struct io_event ev[AIO_MODE];
171
struct timeval start, fstart, tv;
174
printf("Usage: %s <directory> [skip] [segment] [line]\n", argv[0]);
181
if (argc > 2) SKIP = atoi(argv[2]);
182
if (argc > 3) SEGMENT = atoi(argv[3]);
183
if (argc > 4) LINE = atoi(argv[4]);
187
printf("%s: Skip %zu, Segment %zu, Line %zu\n", argv[1], SKIP, SEGMENT, LINE);
189
posix_memalign((void**)&buffer, FASTWRITER_SYNCIO_ALIGN, BUFSIZE);
191
memset(done, 0, sizeof(done));
192
memset(&aio, 0, sizeof(aio));
193
io_queue_init(AIO_MODE, &aio);
195
for (schedio = 0; schedio < AIO_MODE; schedio++) {
196
ioptr[schedio] = &ios[schedio];
197
memset(ioptr[schedio], 0, sizeof(struct iocb));
202
io_prep_pread(ioptr[schedio], fd, buffer + schedio * BLOCK_SIZE, page(size), offset);
203
io_set_callback(ioptr[schedio], (void*)(uintptr_t)schedio);
204
// printf("sched %zu: %zu (%zu %zu)\n", schedio, schedio, offset, size);
208
size_t last_write = 0;
209
// size_t last_file_write = 0;
210
// size_t last_file_size = 0;
211
size_t cur_file_size = 0;
212
size_t total_size = 0;
215
gettimeofday(&start, NULL);
216
gettimeofday(&fstart, NULL);
218
n = io_submit(aio, schedio, ioptr);
220
printf("Failed to submit initial AIO job, io_submit returned %i\n", err);
230
while ((err >= 0)||(curio != schedio)) {
234
// printf("%i,%i - %i [%i %i %i %i]\n", curio, schedio, events, done[0], done[1], done[2], done[3]);
236
if (curio < schedio) {
237
n = io_getevents(aio, 1, AIO_MODE + EXTRA_BUFFERS - events, &ev[events], NULL);
239
printf("Error waiting for AIO (%i)\n", -err);
246
if ((!ready)&&(n > 1)) {
247
printf("*** Multiple read requests (%i of %i) are finished simultaneously. It is either:\n", err, AIO_MODE);
248
printf(" Small buffer size (%i KB)\n", BLOCK_SIZE/1024);
249
printf(" More parallel AIOs (%i) than supported by kernel, try %i\n", AIO_MODE, AIO_MODE - err);
252
for (i = 0; i < n; i++) {
253
struct io_event *ep = &ev[events + i];
254
int doneio = (uintptr_t)ep->data;
256
// printf("done %i: %lu %zu %zi\n", doneio, ep->res2, done_size[io], ep->res);
257
if (ep->res2 || (ep->res < done_size[io])) {
258
printf("Error in async IO (ret: %li, ret size: %zi, expected %zu)\n", ep->res2, ep->res, page(done_size[io]));
262
// printf("done (%i): %i\n", i, doneio);
267
for (i = events - 1; (i >= 0)&&((schedio - curio) < (AIO_MODE + EXTRA_BUFFERS)); i--) {
271
// printf("sched %i: %zu (%i %zu %zu)\n", i, schedio, fd, offset, size);
272
struct iocb *newio = (struct iocb *)ev[i].obj;
273
memset(newio, 0, sizeof(struct iocb));
274
io_prep_pread(newio, fd, buffer + buf(schedio) * BLOCK_SIZE, page(size), offset);
275
io_set_callback(newio, (void*)(uintptr_t)schedio);
276
err = io_submit(aio, 1, &newio);
278
printf("Failed to submit AIO jobs %i\n", err);
285
if ((events)&&(!err)) {
286
printf("*** Unprocessed events (%i), probably not enough buffer space...\n", events);
287
// printf(" curio (%zu), schedio (%zu)\n", curio, schedio);
296
cur_file_size += done_size[io];
297
total_size += done_size[io];
299
gettimeofday(&tv, NULL);
300
us = (tv.tv_sec - start.tv_sec) * 1000000 + (tv.tv_usec - start.tv_usec);
301
fileus = (tv.tv_sec - fstart.tv_sec) * 1000000 + (tv.tv_usec - fstart.tv_usec);
303
if (done_finish[io]) {
304
// printf("closing %i ===> %s (%i)\n", io, done_file[io], done_fd[io]);
306
gettimeofday(&fstart, NULL);
310
if ((us - last_write) > WRITE_INTERVAL * 1000000) {
312
printf("Read: %lu files (%lu GB) at %zu MB/s", files, total_size / 1024 / 1024 / 1024, (size_t)(mcoef * total_size / us));
314
if (1) {//cur_file_size > BLOCK_SIZE) {
315
printf(", Last: %s (%lu MB) at %zu MB/s\n", done_file[io], cur_file_size / 1024 / 1024, (size_t)(mcoef * cur_file_size / fileus));
321
if (done_finish[io]) {
330
gettimeofday(&tv, NULL);
331
us = (tv.tv_sec - start.tv_sec) * 1000000 + (tv.tv_usec - start.tv_usec);
332
printf("Total: %lu files (%lu GB) at %zu MB/s\n", files, total_size / 1024 / 1024 / 1024, (size_t)(mcoef * total_size / us));
b'\\ No newline at end of file'