/alps/fwbench

To get this branch, use:
bzr branch http://darksoft.org/webbzr/alps/fwbench

« back to all changes in this revision

Viewing changes to rndreader.c

  • Committer: Suren A. Chilingaryan
  • Date: 2012-11-29 10:43:08 UTC
  • Revision ID: csa@dside.dyndns.org-20121129104308-tdjkr69asp33nctf
EDF reading emulation (iteration1)

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
#define _GNU_SOURCE
 
2
 
 
3
#include <stdio.h>
 
4
#include <stdlib.h>
 
5
#include <stdint.h>
 
6
#include <sys/types.h>
 
7
#include <sys/stat.h>
 
8
#include <sys/time.h>
 
9
#include <unistd.h>
 
10
#include <dirent.h>
 
11
#include <fcntl.h>
 
12
#include <string.h>
 
13
#include <errno.h>
 
14
 
 
15
#include <libaio.h>
 
16
 
 
17
#define FASTWRITER_SYNCIO_ALIGN 4096
 
18
 
 
19
#define SYNC_MODE
 
20
#define AIO_MODE 2
 
21
#define EXTRA_BUFFERS 2
 
22
#define WRITE_INTERVAL 1
 
23
 
 
24
size_t SKIP = 1;
 
25
size_t SEGMENT = 0;
 
26
size_t LINE = 0;
 
27
 
 
28
#define RAID_STRIP_SIZE         256
 
29
#define RAID_DISKS              8
 
30
#define STRIPS_AT_ONCE          2
 
31
 
 
32
#define MIN_BLOCK_SIZE (1024 * RAID_STRIP_SIZE)
 
33
#define BLOCK_SIZE (1024 * RAID_STRIP_SIZE * RAID_DISKS * STRIPS_AT_ONCE)
 
34
#define BUFSIZE (BLOCK_SIZE * (AIO_MODE + EXTRA_BUFFERS))
 
35
 
 
36
#ifdef AIO_MODE
 
37
# define SYNC_MODE
 
38
#endif /* AIO_MODE */
 
39
 
 
40
 
 
41
DIR *dir; 
 
42
struct dirent *ent; 
 
43
int flags = O_RDONLY|O_NOATIME|O_LARGEFILE|O_DIRECT;
 
44
 
 
45
size_t run = 0;         // file reading order
 
46
size_t skip = 0;        // file reading order
 
47
 
 
48
int fd = -1;            // we are currently scheduling reads from this file
 
49
size_t offset = 0;      // we are schedulling reads at this offset
 
50
size_t size;            // size of block we are schedulling to read
 
51
size_t file_size;       // total size of current file
 
52
size_t file_block_size; // access unit for current file
 
53
 
 
54
size_t curf = 0, schedf = 0;
 
55
size_t curio = 0, schedio = 0;
 
56
 
 
57
#define MAXLEN 128
 
58
int done[AIO_MODE + EXTRA_BUFFERS];
 
59
int done_finish[AIO_MODE + EXTRA_BUFFERS];
 
60
char done_file[AIO_MODE + EXTRA_BUFFERS][MAXLEN + 1];
 
61
size_t done_offset[AIO_MODE + EXTRA_BUFFERS];
 
62
size_t done_size[AIO_MODE + EXTRA_BUFFERS];
 
63
int done_fd[AIO_MODE + EXTRA_BUFFERS];
 
64
 
 
65
#define buf(io) (io%(AIO_MODE + EXTRA_BUFFERS))
 
66
#define max(a,b) (((a)>(b))?(a):(b))
 
67
#define min(a,b) (((a)<(b))?(a):(b))
 
68
#define page(size) (((size/4096)+(size%4096?1:0))*4096)
 
69
 
 
70
 
 
71
int next_file() {
 
72
    size_t size;
 
73
    struct stat st;
 
74
    
 
75
next:
 
76
    while ((ent = readdir(dir))) {
 
77
        skip += 1;
 
78
        if ((((skip) - 1)%SKIP) != run) continue;
 
79
        if (stat(ent->d_name, &st)) continue;
 
80
        if (!S_ISREG(st.st_mode)) continue;
 
81
        break;
 
82
    }
 
83
 
 
84
    if (ent) {
 
85
        size = st.st_blksize;
 
86
 
 
87
        int fd = open(ent->d_name, flags, 0);
 
88
        if (fd < 0) goto next;
 
89
 
 
90
        if (size < MIN_BLOCK_SIZE) size = BLOCK_SIZE;
 
91
 
 
92
        if (size > BUFSIZE) {
 
93
            printf("Buffer too small\n");
 
94
            exit(1);
 
95
        }
 
96
        
 
97
        file_block_size = size;
 
98
        file_size = st.st_size;
 
99
        offset = 0;
 
100
        
 
101
        return fd;
 
102
 
 
103
    } else {
 
104
        skip = 0; 
 
105
        run += 1;
 
106
        if (run < SKIP) {
 
107
            closedir(dir);
 
108
            dir = opendir(".");
 
109
            goto next;
 
110
        }
 
111
    }
 
112
 
 
113
    return -1;
 
114
}
 
115
 
 
116
int next_block() {
 
117
    int io;
 
118
    size_t next_offset;
 
119
    
 
120
    if (fd >= 0) {
 
121
        if (LINE) offset += SEGMENT;
 
122
        else offset += size;
 
123
 
 
124
        if (offset >= file_size) {
 
125
            fd = -1;
 
126
        }
 
127
    }
 
128
 
 
129
    if (fd < 0) {
 
130
        fd = next_file();
 
131
        if (fd < 0) return -1;
 
132
//      printf("open ===> %s (%i)\n", ent->d_name, fd);
 
133
    }
 
134
 
 
135
    if (LINE) size = LINE;
 
136
    else size = file_block_size;
 
137
 
 
138
    if ((offset + size) > file_size) size = file_size - offset;
 
139
    
 
140
    io = buf(schedio);
 
141
    strncpy(done_file[io], ent->d_name, MAXLEN);
 
142
    done_offset[io] = offset;
 
143
    done_size[io] = size;
 
144
    done_fd[io] = fd;
 
145
    done[io] = 0;
 
146
 
 
147
    if (LINE) next_offset = offset + SEGMENT;
 
148
    else next_offset = offset + size;
 
149
 
 
150
    if (next_offset >= file_size) done_finish[io] = 1;
 
151
    else done_finish[io] = 0;
 
152
 
 
153
    return 0;
 
154
}
 
155
 
 
156
 
 
157
 
 
158
int main(int argc, char *argv[]) {
 
159
    int err;
 
160
    int i, n, io;
 
161
    long double mcoef = 1000000. / (1024 * 1024);
 
162
 
 
163
    io_context_t aio;
 
164
    struct iocb ios[AIO_MODE], *ioptr[AIO_MODE];
 
165
 
 
166
    int events;
 
167
    struct io_event ev[AIO_MODE];
 
168
 
 
169
    int ready;
 
170
    void *buffer;
 
171
    struct timeval start, fstart, tv;
 
172
 
 
173
    if (argc < 2) {
 
174
        printf("Usage: %s <directory> [skip] [segment] [line]\n", argv[0]);
 
175
        exit(0);
 
176
    }
 
177
    
 
178
    chdir(argv[1]);
 
179
    dir = opendir(".");
 
180
    
 
181
    if (argc > 2) SKIP = atoi(argv[2]);
 
182
    if (argc > 3) SEGMENT = atoi(argv[3]);
 
183
    if (argc > 4) LINE = atoi(argv[4]);
 
184
 
 
185
    if (!SKIP) SKIP = 1;
 
186
    
 
187
    printf("%s: Skip %zu, Segment %zu, Line %zu\n", argv[1], SKIP, SEGMENT, LINE);
 
188
 
 
189
    posix_memalign((void**)&buffer, FASTWRITER_SYNCIO_ALIGN, BUFSIZE);
 
190
 
 
191
    memset(done, 0, sizeof(done));
 
192
    memset(&aio, 0, sizeof(aio));
 
193
    io_queue_init(AIO_MODE, &aio);
 
194
 
 
195
    for (schedio = 0; schedio < AIO_MODE; schedio++) {
 
196
        ioptr[schedio] = &ios[schedio];
 
197
        memset(ioptr[schedio], 0, sizeof(struct iocb));
 
198
        
 
199
        err = next_block();
 
200
        if (err) break;
 
201
        
 
202
        io_prep_pread(ioptr[schedio], fd, buffer + schedio * BLOCK_SIZE, page(size), offset);
 
203
        io_set_callback(ioptr[schedio], (void*)(uintptr_t)schedio);
 
204
//      printf("sched %zu: %zu (%zu %zu)\n", schedio, schedio, offset, size);
 
205
    }
 
206
 
 
207
    size_t us, fileus; 
 
208
    size_t last_write = 0;
 
209
//    size_t last_file_write = 0;
 
210
//    size_t last_file_size = 0;
 
211
    size_t cur_file_size = 0;
 
212
    size_t total_size = 0;
 
213
    size_t files = 0;
 
214
 
 
215
    gettimeofday(&start, NULL);
 
216
    gettimeofday(&fstart, NULL);
 
217
 
 
218
    n = io_submit(aio, schedio, ioptr);
 
219
    if (n != schedio) {
 
220
        printf("Failed to submit initial AIO job, io_submit returned %i\n", err);
 
221
        exit(-1);
 
222
    }
 
223
 
 
224
    curio = 0;
 
225
    events = 0;
 
226
 
 
227
 
 
228
    ready = 0;    
 
229
 
 
230
    while ((err >= 0)||(curio != schedio)) {
 
231
        io = buf(curio);
 
232
        
 
233
        if (!done[io]) {
 
234
//              printf("%i,%i - %i [%i %i %i %i]\n", curio, schedio, events, done[0], done[1], done[2], done[3]);
 
235
 
 
236
                if (curio < schedio) {
 
237
                    n = io_getevents(aio, 1, AIO_MODE + EXTRA_BUFFERS - events, &ev[events], NULL);
 
238
                    if (n < 0) {
 
239
                        printf("Error waiting for AIO (%i)\n", -err);
 
240
                        exit(-1);
 
241
                    }
 
242
                } else {
 
243
                    n = 0;
 
244
                }
 
245
                
 
246
                if ((!ready)&&(n > 1)) {
 
247
                    printf("*** Multiple read requests (%i of %i) are finished simultaneously. It is either:\n", err, AIO_MODE);
 
248
                    printf("      Small buffer size (%i KB)\n", BLOCK_SIZE/1024);
 
249
                    printf("      More parallel AIOs (%i) than supported by kernel, try %i\n", AIO_MODE, AIO_MODE - err);
 
250
                }
 
251
                
 
252
                for (i = 0; i < n; i++) {
 
253
                    struct io_event *ep = &ev[events + i];
 
254
                    int doneio = (uintptr_t)ep->data;
 
255
                    io = buf(doneio);
 
256
//                  printf("done %i: %lu %zu %zi\n", doneio, ep->res2, done_size[io], ep->res);
 
257
                    if (ep->res2 || (ep->res < done_size[io])) {
 
258
                        printf("Error in async IO (ret: %li, ret size: %zi, expected %zu)\n", ep->res2, ep->res, page(done_size[io]));
 
259
                        exit(-1);
 
260
                    }
 
261
                    done[io] = 1;
 
262
//                  printf("done (%i): %i\n", i, doneio);
 
263
                }
 
264
                
 
265
                events += n;
 
266
                
 
267
                for (i = events - 1; (i >= 0)&&((schedio - curio) < (AIO_MODE + EXTRA_BUFFERS)); i--) {
 
268
                    err = next_block();
 
269
                    if (err) break;
 
270
                    
 
271
//                  printf("sched %i: %zu (%i %zu %zu)\n", i, schedio, fd, offset, size);
 
272
                    struct iocb *newio = (struct iocb *)ev[i].obj;
 
273
                    memset(newio, 0, sizeof(struct iocb));
 
274
                    io_prep_pread(newio, fd, buffer + buf(schedio) * BLOCK_SIZE, page(size), offset);
 
275
                    io_set_callback(newio, (void*)(uintptr_t)schedio);
 
276
                    err = io_submit(aio, 1, &newio);
 
277
                    if (err != 1) {
 
278
                        printf("Failed to submit AIO jobs %i\n", err);
 
279
                        exit(-1);
 
280
                    }
 
281
                    schedio++;
 
282
                }
 
283
                events = i + 1;
 
284
                
 
285
                if ((events)&&(!err)) {
 
286
                    printf("*** Unprocessed events (%i), probably not enough buffer space...\n", events);
 
287
//                  printf("      curio (%zu), schedio (%zu)\n", curio, schedio);
 
288
                }
 
289
 
 
290
                ready = 1;
 
291
                continue;
 
292
            }
 
293
 
 
294
            io = buf(curio);
 
295
 
 
296
            cur_file_size += done_size[io];
 
297
            total_size += done_size[io];
 
298
 
 
299
            gettimeofday(&tv, NULL);
 
300
            us = (tv.tv_sec - start.tv_sec) * 1000000 + (tv.tv_usec - start.tv_usec);
 
301
            fileus = (tv.tv_sec - fstart.tv_sec) * 1000000 + (tv.tv_usec - fstart.tv_usec);
 
302
            
 
303
            if (done_finish[io]) {
 
304
//              printf("closing %i ===> %s (%i)\n", io, done_file[io], done_fd[io]);
 
305
                close(done_fd[io]);
 
306
                gettimeofday(&fstart, NULL);
 
307
                files++;
 
308
            }
 
309
            
 
310
            if ((us - last_write) > WRITE_INTERVAL * 1000000) {
 
311
                last_write = us;
 
312
                printf("Read: %lu files (%lu GB) at %zu MB/s", files, total_size / 1024 / 1024 / 1024, (size_t)(mcoef * total_size / us));
 
313
                
 
314
                if (1) {//cur_file_size > BLOCK_SIZE) {
 
315
                    printf(", Last: %s (%lu MB) at %zu MB/s\n", done_file[io], cur_file_size / 1024 / 1024, (size_t)(mcoef * cur_file_size / fileus));
 
316
                } else {
 
317
                    printf("\n");
 
318
                }
 
319
            }
 
320
 
 
321
            if (done_finish[io]) {
 
322
                cur_file_size = 0;
 
323
            }
 
324
            
 
325
            done[io] = 0;
 
326
 
 
327
            curio++;
 
328
        }
 
329
 
 
330
        gettimeofday(&tv, NULL);
 
331
        us = (tv.tv_sec - start.tv_sec) * 1000000 + (tv.tv_usec - start.tv_usec);
 
332
        printf("Total: %lu files (%lu GB) at %zu MB/s\n", files, total_size / 1024 / 1024 / 1024, (size_t)(mcoef * total_size / us));
 
333
        
 
334
        free(buffer);
 
335
        closedir(dir);
 
336
}
 
 
b'\\ No newline at end of file'