/*
 * /alps/fwbench
 *
 * To get this branch, use:
 *   bzr branch http://darksoft.org/webbzr/alps/fwbench
 *
 * 18 by Suren A. Chilingaryan
 * EDF reading emulation (iteration1)
 */
#define _GNU_SOURCE

#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <unistd.h>
#include <dirent.h>
#include <fcntl.h>
#include <string.h>
#include <errno.h>

#include <libaio.h>

/* Alignment, in bytes, requested for the read buffer (see posix_memalign in main). */
#define FASTWRITER_SYNCIO_ALIGN 4096

/* Not referenced by the code visible in this file. */
#define SYNC_MODE
/* Number of iocbs, i.e. the maximum number of reads in flight at once. */
#define AIO_MODE 2
/* Buffer slots on top of the AIO_MODE ones that are being filled. */
#define EXTRA_BUFFERS 2
/* Minimum time between two progress reports, in seconds. */
#define WRITE_INTERVAL 1

/*
 * Run-time parameters, overridable from the command line.
 *   SKIP    - the directory is scanned SKIP times; pass number `run` only
 *             picks every SKIP-th entry, starting at entry `run`.
 *   SEGMENT - distance between the starts of two consecutive reads when
 *             LINE is non-zero.
 *   LINE    - number of bytes read at each position; 0 reads every file
 *             sequentially in file_block_size units.
 */
size_t SKIP = 1;
size_t SEGMENT = 0;
size_t LINE = 0;

/* RAID geometry the block size is derived from (strip size presumably in KB). */
#define RAID_STRIP_SIZE 	256
#define RAID_DISKS		8
#define STRIPS_AT_ONCE		2

/* A file whose st_blksize is below MIN_BLOCK_SIZE is read in BLOCK_SIZE units. */
#define MIN_BLOCK_SIZE (1024 * RAID_STRIP_SIZE)
/* Size of one buffer slot: STRIPS_AT_ONCE full stripes across all disks. */
#define BLOCK_SIZE (1024 * RAID_STRIP_SIZE * RAID_DISKS * STRIPS_AT_ONCE)
/* Total size of the read buffer: one slot per in-flight or pending block. */
#define BUFSIZE (BLOCK_SIZE * (AIO_MODE + EXTRA_BUFFERS))

#ifdef AIO_MODE
# define SYNC_MODE
#endif /* AIO_MODE */

41
DIR *dir; 
42
struct dirent *ent; 
43
int flags = O_RDONLY|O_NOATIME|O_LARGEFILE|O_DIRECT;
44
45
size_t run = 0; 	// file reading order
46
size_t skip = 0; 	// file reading order
47
48
int fd = -1;		// we are currently scheduling reads from this file
49
size_t offset = 0;	// we are schedulling reads at this offset
50
size_t size;		// size of block we are schedulling to read
51
size_t file_size; 	// total size of current file
52
size_t file_block_size; // access unit for current file
53
54
size_t curf = 0, schedf = 0;
55
size_t curio = 0, schedio = 0;
56
57
#define MAXLEN 128
58
int done[AIO_MODE + EXTRA_BUFFERS];
59
int done_finish[AIO_MODE + EXTRA_BUFFERS];
60
char done_file[AIO_MODE + EXTRA_BUFFERS][MAXLEN + 1];
61
size_t done_offset[AIO_MODE + EXTRA_BUFFERS];
62
size_t done_size[AIO_MODE + EXTRA_BUFFERS];
63
int done_fd[AIO_MODE + EXTRA_BUFFERS];
64
65
#define buf(io) (io%(AIO_MODE + EXTRA_BUFFERS))
66
#define max(a,b) (((a)>(b))?(a):(b))
67
#define min(a,b) (((a)<(b))?(a):(b))
68
#define page(size) (((size/4096)+(size%4096?1:0))*4096)
69
70
71
int next_file() {
72
    size_t size;
73
    struct stat st;
74
    
75
next:
76
    while ((ent = readdir(dir))) {
77
	skip += 1;
78
	if ((((skip) - 1)%SKIP) != run) continue;
79
	if (stat(ent->d_name, &st)) continue;
80
	if (!S_ISREG(st.st_mode)) continue;
81
	break;
82
    }
83
84
    if (ent) {
85
	size = st.st_blksize;
86
87
	int fd = open(ent->d_name, flags, 0);
88
	if (fd < 0) goto next;
89
90
	if (size < MIN_BLOCK_SIZE) size = BLOCK_SIZE;
91
92
	if (size > BUFSIZE) {
93
	    printf("Buffer too small\n");
94
	    exit(1);
95
	}
96
	
97
	file_block_size = size;
98
	file_size = st.st_size;
99
	offset = 0;
100
	
101
	return fd;
102
103
    } else {
104
	skip = 0; 
105
	run += 1;
106
	if (run < SKIP) {
107
	    closedir(dir);
108
	    dir = opendir(".");
109
	    goto next;
110
	}
111
    }
112
113
    return -1;
114
}
115
116
int next_block() {
117
    int io;
118
    size_t next_offset;
119
    
120
    if (fd >= 0) {
121
	if (LINE) offset += SEGMENT;
122
	else offset += size;
123
124
	if (offset >= file_size) {
125
	    fd = -1;
126
	}
127
    }
128
129
    if (fd < 0) {
130
	fd = next_file();
131
	if (fd < 0) return -1;
132
//	printf("open ===> %s (%i)\n", ent->d_name, fd);
133
    }
134
135
    if (LINE) size = LINE;
136
    else size = file_block_size;
137
138
    if ((offset + size) > file_size) size = file_size - offset;
139
    
140
    io = buf(schedio);
141
    strncpy(done_file[io], ent->d_name, MAXLEN);
142
    done_offset[io] = offset;
143
    done_size[io] = size;
144
    done_fd[io] = fd;
145
    done[io] = 0;
146
147
    if (LINE) next_offset = offset + SEGMENT;
148
    else next_offset = offset + size;
149
150
    if (next_offset >= file_size) done_finish[io] = 1;
151
    else done_finish[io] = 0;
152
153
    return 0;
154
}
155
156
157
158
int main(int argc, char *argv[]) {
159
    int err;
160
    int i, n, io;
161
    long double mcoef = 1000000. / (1024 * 1024);
162
163
    io_context_t aio;
164
    struct iocb ios[AIO_MODE], *ioptr[AIO_MODE];
165
166
    int events;
167
    struct io_event ev[AIO_MODE];
168
169
    int ready;
170
    void *buffer;
171
    struct timeval start, fstart, tv;
172
173
    if (argc < 2) {
174
	printf("Usage: %s <directory> [skip] [segment] [line]\n", argv[0]);
175
	exit(0);
176
    }
177
    
178
    chdir(argv[1]);
179
    dir = opendir(".");
180
    
181
    if (argc > 2) SKIP = atoi(argv[2]);
182
    if (argc > 3) SEGMENT = atoi(argv[3]);
183
    if (argc > 4) LINE = atoi(argv[4]);
184
185
    if (!SKIP) SKIP = 1;
186
    
187
    printf("%s: Skip %zu, Segment %zu, Line %zu\n", argv[1], SKIP, SEGMENT, LINE);
188
189
    posix_memalign((void**)&buffer, FASTWRITER_SYNCIO_ALIGN, BUFSIZE);
190
191
    memset(done, 0, sizeof(done));
192
    memset(&aio, 0, sizeof(aio));
193
    io_queue_init(AIO_MODE, &aio);
194
195
    for (schedio = 0; schedio < AIO_MODE; schedio++) {
196
	ioptr[schedio] = &ios[schedio];
197
	memset(ioptr[schedio], 0, sizeof(struct iocb));
198
	
199
	err = next_block();
200
	if (err) break;
201
	
202
	io_prep_pread(ioptr[schedio], fd, buffer + schedio * BLOCK_SIZE, page(size), offset);
203
	io_set_callback(ioptr[schedio], (void*)(uintptr_t)schedio);
204
//	printf("sched %zu: %zu (%zu %zu)\n", schedio, schedio, offset, size);
205
    }
206
207
    size_t us, fileus; 
208
    size_t last_write = 0;
209
//    size_t last_file_write = 0;
210
//    size_t last_file_size = 0;
211
    size_t cur_file_size = 0;
212
    size_t total_size = 0;
213
    size_t files = 0;
214
215
    gettimeofday(&start, NULL);
216
    gettimeofday(&fstart, NULL);
217
218
    n = io_submit(aio, schedio, ioptr);
219
    if (n != schedio) {
220
        printf("Failed to submit initial AIO job, io_submit returned %i\n", err);
221
        exit(-1);
222
    }
223
224
    curio = 0;
225
    events = 0;
226
227
228
    ready = 0;    
229
230
    while ((err >= 0)||(curio != schedio)) {
231
	io = buf(curio);
232
	
233
	if (!done[io]) {
234
//		printf("%i,%i - %i [%i %i %i %i]\n", curio, schedio, events, done[0], done[1], done[2], done[3]);
235
236
		if (curio < schedio) {
237
		    n = io_getevents(aio, 1, AIO_MODE + EXTRA_BUFFERS - events, &ev[events], NULL);
238
		    if (n < 0) {
239
			printf("Error waiting for AIO (%i)\n", -err);
240
			exit(-1);
241
		    }
242
		} else {
243
		    n = 0;
244
		}
245
		
246
		if ((!ready)&&(n > 1)) {
247
		    printf("*** Multiple read requests (%i of %i) are finished simultaneously. It is either:\n", err, AIO_MODE);
248
		    printf("      Small buffer size (%i KB)\n", BLOCK_SIZE/1024);
249
		    printf("      More parallel AIOs (%i) than supported by kernel, try %i\n", AIO_MODE, AIO_MODE - err);
250
		}
251
		
252
		for (i = 0; i < n; i++) {
253
		    struct io_event *ep = &ev[events + i];
254
		    int doneio = (uintptr_t)ep->data;
255
		    io = buf(doneio);
256
//		    printf("done %i: %lu %zu %zi\n", doneio, ep->res2, done_size[io], ep->res);
257
	    	    if (ep->res2 || (ep->res < done_size[io])) {
258
	    		printf("Error in async IO (ret: %li, ret size: %zi, expected %zu)\n", ep->res2, ep->res, page(done_size[io]));
259
	    		exit(-1);
260
	    	    }
261
		    done[io] = 1;
262
//		    printf("done (%i): %i\n", i, doneio);
263
		}
264
		
265
		events += n;
266
		
267
		for (i = events - 1; (i >= 0)&&((schedio - curio) < (AIO_MODE + EXTRA_BUFFERS)); i--) {
268
		    err = next_block();
269
		    if (err) break;
270
		    
271
//		    printf("sched %i: %zu (%i %zu %zu)\n", i, schedio, fd, offset, size);
272
		    struct iocb *newio = (struct iocb *)ev[i].obj;
273
		    memset(newio, 0, sizeof(struct iocb));
274
		    io_prep_pread(newio, fd, buffer + buf(schedio) * BLOCK_SIZE, page(size), offset);
275
		    io_set_callback(newio, (void*)(uintptr_t)schedio);
276
		    err = io_submit(aio, 1, &newio);
277
		    if (err != 1) {
278
			printf("Failed to submit AIO jobs %i\n", err);
279
			exit(-1);
280
		    }
281
		    schedio++;
282
		}
283
		events = i + 1;
284
		
285
		if ((events)&&(!err)) {
286
		    printf("*** Unprocessed events (%i), probably not enough buffer space...\n", events);
287
//		    printf("      curio (%zu), schedio (%zu)\n", curio, schedio);
288
		}
289
290
		ready = 1;
291
		continue;
292
	    }
293
294
	    io = buf(curio);
295
296
	    cur_file_size += done_size[io];
297
	    total_size += done_size[io];
298
299
	    gettimeofday(&tv, NULL);
300
	    us = (tv.tv_sec - start.tv_sec) * 1000000 + (tv.tv_usec - start.tv_usec);
301
	    fileus = (tv.tv_sec - fstart.tv_sec) * 1000000 + (tv.tv_usec - fstart.tv_usec);
302
	    
303
	    if (done_finish[io]) {
304
//		printf("closing %i ===> %s (%i)\n", io, done_file[io], done_fd[io]);
305
		close(done_fd[io]);
306
	        gettimeofday(&fstart, NULL);
307
		files++;
308
	    }
309
	    
310
	    if ((us - last_write) > WRITE_INTERVAL * 1000000) {
311
		last_write = us;
312
		printf("Read: %lu files (%lu GB) at %zu MB/s", files, total_size / 1024 / 1024 / 1024, (size_t)(mcoef * total_size / us));
313
		
314
		if (1) {//cur_file_size > BLOCK_SIZE) {
315
	    	    printf(", Last: %s (%lu MB) at %zu MB/s\n", done_file[io], cur_file_size / 1024 / 1024, (size_t)(mcoef * cur_file_size / fileus));
316
	    	} else {
317
	    	    printf("\n");
318
	    	}
319
	    }
320
321
	    if (done_finish[io]) {
322
		cur_file_size = 0;
323
	    }
324
	    
325
	    done[io] = 0;
326
327
	    curio++;
328
	}
329
330
        gettimeofday(&tv, NULL);
331
        us = (tv.tv_sec - start.tv_sec) * 1000000 + (tv.tv_usec - start.tv_usec);
332
	printf("Total: %lu files (%lu GB) at %zu MB/s\n", files, total_size / 1024 / 1024 / 1024, (size_t)(mcoef * total_size / us));
333
	
334
	free(buffer);
335
	closedir(dir);
336
}