/alps/fastwriter

To get this branch, use:
bzr branch http://darksoft.org/webbzr/alps/fastwriter
1 by Suren A. Chilingaryan
Initial release
1
#define _FASTWRITER_DEFAULT_C
2
3
#define _GNU_SOURCE
4
#define _XOPEN_SOURCE 600
5
#define _POSIX_C_SOURCE 200112L
6
#define _LARGEFILE64_SOURCE
7
8
#include "config.h"
9
10
#include <stdio.h>
11
#include <stdlib.h>
12
#include <string.h>
13
#include <unistd.h>
14
#include <limits.h>
15
#include <errno.h>
16
17
#include <pthread.h>
18
19
#include <sys/types.h>
20
#include <sys/stat.h>
21
#include <sys/time.h>
22
23
#include <fcntl.h>
24
25
26
#ifdef HAVE_LINUX_FALLOC_H
27
# include <linux/falloc.h>
28
#endif /* HAVE_LINUX_FALLOC_H */
29
9 by Suren A. Chilingaryan
Support XFS RealTime partition
30
#ifndef DISABLE_XFS_REALTIME
31
# include <xfs/xfs.h>
32
#endif /* !DISABLE_XFS_REALTIME */
33
13 by Suren A. Chilingaryan
AIO support
34
1 by Suren A. Chilingaryan
Initial release
35
#include "fastwriter.h"
36
#include "private.h"
37
#include "sysinfo.h"
1.1.1 by Suren A. Chilingaryan
Compile-in default api descriptor
38
#include "default.h"
1 by Suren A. Chilingaryan
Initial release
39
40
/*
 * Tuning knobs of the default back-end.  Each value below is only a default
 * and may be overridden from the build system (e.g. -DEXT4_WRITEBLOCK=...).
 */

/* Open files with O_DIRECT unless the build defines DISABLE_SYNC_MODE. */
#if !defined(SYNC_MODE) && !defined(DISABLE_SYNC_MODE)
# define SYNC_MODE
#endif

/* NOTE(review): not referenced by the code in this file; kept for compatibility. */
#ifndef HAVE_FALLOCATE
# define HAVE_FALLOCATE
#endif

/** Minimal write block (bytes) for raw devices, ext4, btrfs, xfs and unknown file systems */
#ifndef EXT4_WRITEBLOCK
# define EXT4_WRITEBLOCK 4194304
#endif

/** Preallocation step (bytes) used on ext4, btrfs, xfs and ocfs2 */
#ifndef EXT4_PREALLOCATE
# define EXT4_PREALLOCATE 1073741824
#endif

/** Write block (bytes) used on ocfs2 when kernel AIO is enabled */
#ifndef OCFS_WRITEBLOCK
# define OCFS_WRITEBLOCK 262144
#endif

/** Number of iocbs (AIO requests) that can be outstanding at once */
#ifndef AIO_QUEUE_LENGTH
# define AIO_QUEUE_LENGTH 4
#endif

/** Number of block descriptors in the AIO ring (submitted or completed, not yet retired) */
#ifndef AIO_BUFFERS
# define AIO_BUFFERS 8
#endif

#ifndef DISABLE_AIO
# include <libaio.h>
# if AIO_QUEUE_LENGTH < 1
#  error "AIO_QUEUE_LENGTH must be at least 1"
# endif
# if AIO_QUEUE_LENGTH > AIO_BUFFERS
#  error "AIO_QUEUE_LENGTH > AIO_BUFFERS"
# endif
#endif /* DISABLE_AIO */
55
56
57
#ifndef DISABLE_AIO
/**
 * Descriptor of one block handed to the kernel in AIO mode.
 * Descriptors form a ring inside fastwriter_default_t (data_head/data_tail).
 */
typedef struct {
    size_t offset;      /**< file offset the block is written at */
    size_t size;        /**< length of the block in bytes */
    int ios;            /**< iocb slot pushed back onto the free stack on completion */
    int ready;          /**< 0 - unused, 1 - processing, 2 - done */
} fastwriter_data_t;
#endif /* !DISABLE_AIO */
1 by Suren A. Chilingaryan
Initial release
65
66
typedef struct {
67
    int fd;
4 by Suren A. Chilingaryan
Few synchronization and alignment related fixes
68
13 by Suren A. Chilingaryan
AIO support
69
    int sync_mode;		/**< Open with O_DIRECT flag to avoid caches */
70
    int aio_mode;		/**< Use kernel AIO (libaio.h) */
1 by Suren A. Chilingaryan
Initial release
71
    
72
    size_t prior_size;		/**< original size of file */
73
    size_t preallocated;	/**< preallocated bytes */
74
    
75
    size_t wr_block;		/**< minimal block of data to write */
76
    size_t pa_block;		/**< preallocation setp */
13 by Suren A. Chilingaryan
AIO support
77
78
#ifndef DISABLE_AIO
79
    io_context_t aio;
80
    
81
    int ios_ready_n;
82
    int ios_ready[AIO_QUEUE_LENGTH];
83
    struct iocb ios[AIO_QUEUE_LENGTH];
84
85
    int data_head, data_tail;
86
    fastwriter_data_t data[AIO_BUFFERS];
87
    
88
    int ios_status[AIO_QUEUE_LENGTH];
89
    
90
    size_t sched;		/**< how far we ahead of currently writted head */
91
    size_t fd_offset;		/**< current file offset */
92
93
    int page_size;
94
#endif /* !DISABLE_AIO */
1 by Suren A. Chilingaryan
Initial release
95
} fastwriter_default_t;
96
97
1.1.1 by Suren A. Chilingaryan
Compile-in default api descriptor
98
/**
 * Open @name for writing and attach a new back-end context to fw->ctx.
 *
 * The file system hosting @name (as reported by fastwriter_get_file_fs)
 * selects the write block, the preallocation step and whether O_DIRECT
 * and/or kernel AIO are used.  Unless FASTWRITER_FLAGS_OVERWRITE is set,
 * an existing regular file is appended to: the descriptor is positioned at
 * EOF and the original size is remembered in ctx->prior_size.
 *
 * @param fw    writer to attach the context to
 * @param name  path of the file or device to write
 * @param flags FASTWRITER_FLAGS_* open flags
 * @return 0 on success or an errno-style code.  If the failure happens after
 *         the context was allocated, it is released and fw->ctx is set to NULL.
 */
int fastwriter_default_open(fastwriter_t *fw, const char *name, fastwriter_flags_t flags) {
    int err;
    char fs[16] = {0};          /* zero-filled: stays NUL-terminated even if the callee does not terminate */
#ifndef DISABLE_XFS_REALTIME
    struct fsxattr attr;
#endif /* !DISABLE_XFS_REALTIME */
    int open_flags = (O_CREAT|O_WRONLY|O_NOATIME|O_LARGEFILE);
    int open_mode = (S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
    off64_t end;
    fastwriter_default_t *ctx;

    err = fastwriter_get_file_fs(name, sizeof(fs) - 1, fs);
    if (err) return err;

    ctx = calloc(1, sizeof(*ctx));      /* all counters/flags start at zero */
    if (!ctx) return ENOMEM;

    ctx->fd = -1;
    fw->ctx = ctx;

#ifdef SYNC_MODE
    ctx->sync_mode = 1;
#endif /* SYNC_MODE */

    if (!strcmp(fs, "raw")) {
        ctx->wr_block = EXT4_WRITEBLOCK;
        ctx->pa_block = 0;
        ctx->prior_size = (size_t)-1;   /* marks a raw device: never truncated on close */
        open_flags &= ~(O_CREAT|O_NOATIME|O_LARGEFILE);
    } else if (!strcmp(fs, "ext4") || !strcmp(fs, "btrfs") || !strcmp(fs, "xfs")) {
        ctx->wr_block = EXT4_WRITEBLOCK;
        ctx->pa_block = EXT4_PREALLOCATE;
    } else if (!strcmp(fs, "ocfs2")) {
#ifndef DISABLE_AIO
        ctx->aio_mode = 1;
        ctx->sync_mode = 0;
        ctx->wr_block = OCFS_WRITEBLOCK;
#else /* !DISABLE_AIO */
        ctx->wr_block = EXT4_WRITEBLOCK;
#endif /* !DISABLE_AIO */
        ctx->pa_block = EXT4_PREALLOCATE;
    } else {
        ctx->wr_block = EXT4_WRITEBLOCK;
        ctx->pa_block = 0;
    }

    if (ctx->sync_mode) {
        open_flags |= O_DIRECT;
    }

    if (flags&FASTWRITER_FLAGS_OVERWRITE)
        open_flags |= O_TRUNC;

    ctx->fd = open(name, open_flags, open_mode);
    if ((ctx->fd < 0)&&(errno == EINVAL)&&(ctx->sync_mode)) {
        /* O_DIRECT refused (seen when running as a normal user): retry buffered */
        ctx->sync_mode = 0;
        open_flags &= ~O_DIRECT;
        ctx->fd = open(name, open_flags, open_mode);
    }
    if (ctx->fd < 0) {
        err = errno;
        goto fail;
    }

    /* Append mode: test the caller's flags, not the O_* open flags */
    if (((flags&FASTWRITER_FLAGS_OVERWRITE)==0)&&(strcmp(fs, "raw"))) {
        end = lseek64(ctx->fd, 0, SEEK_END);
        if (end < 0) {
            err = errno;
            goto fail;
        }
        ctx->prior_size = (size_t)end;

        if (ctx->prior_size%FASTWRITER_SYNCIO_ALIGN) {
            /* unaligned EOF cannot be continued with aligned sync/AIO writes: reopen buffered */
            close(ctx->fd);

            ctx->fd = open(name, open_flags&~O_DIRECT, open_mode);
            if (ctx->fd < 0) {
                err = errno;
                goto fail;
            }

            end = lseek64(ctx->fd, 0, SEEK_END);
            if (end < 0) {
                err = errno;
                goto fail;
            }
            ctx->prior_size = (size_t)end;

            ctx->sync_mode = 0;
            ctx->aio_mode = 0;
        }
    }

#ifndef DISABLE_XFS_REALTIME
    if (!strcmp(fs, "xfs")) {
        /* best effort: failure only prints a message */
        err = xfsctl (name, ctx->fd, XFS_IOC_FSGETXATTR, (void *) &attr);
        if (!err) {
            attr.fsx_xflags |= XFS_XFLAG_REALTIME;
            err = xfsctl (name, ctx->fd, XFS_IOC_FSSETXATTR, (void *) &attr);
            if (err) fprintf(stderr, "Error initializing XFS real-time mode (%i), disabling...\n", err);
        }
    }
#endif /* !DISABLE_XFS_REALTIME */

#ifndef DISABLE_AIO
    if (ctx->aio_mode) {
        int i;
        ctx->page_size = getpagesize();
        ctx->fd_offset = ctx->prior_size;

        /* every iocb starts on the free stack */
        ctx->ios_ready_n = AIO_QUEUE_LENGTH;
        for (i = 0; i < AIO_QUEUE_LENGTH; i++) {
            ctx->ios_ready[i] = i;
        }

        err = io_queue_init(AIO_QUEUE_LENGTH, &ctx->aio);
        if (err) {
            /* fall back to plain write(2) */
            fprintf(stderr, "Error initializing AIO mode (%i), disabling...\n", -err);
            ctx->aio_mode = 0;
        }
    }
#endif /* !DISABLE_AIO */

    ctx->preallocated = 0;

    return 0;

fail:
    if (ctx->fd >= 0) close(ctx->fd);
    free(ctx);
    fw->ctx = NULL;
    return err;
}
224
225
1.1.1 by Suren A. Chilingaryan
Compile-in default api descriptor
226
/**
 * Finish writing and release the back-end context.
 *
 * In AIO mode it waits until every outstanding iocb has completed.  Regular
 * files are then truncated to prior_size + fw->written, which drops the
 * alignment padding and any space preallocated beyond the real data.  Raw
 * devices (prior_size == (size_t)-1) are never truncated.
 * Safe to call when fw->ctx is NULL.
 *
 * @param fw writer whose context is released; fw->ctx is set to NULL
 */
void fastwriter_default_close(fastwriter_t *fw) {
    fastwriter_default_t *ctx = (fastwriter_default_t*)fw->ctx;

    if (!ctx) return;

    if (ctx->fd >= 0) {
        int trim;

#ifndef DISABLE_AIO
        if ((ctx->aio_mode)&&(ctx->aio)) {
            struct io_event ev[AIO_QUEUE_LENGTH];

            /* drain: every iocb must be back on the free stack */
            while (ctx->ios_ready_n < AIO_QUEUE_LENGTH) {
                int n_ev = io_getevents(ctx->aio, 1, AIO_QUEUE_LENGTH, &ev[0], NULL);
                if (n_ev == -EINTR) continue;
                if (n_ev <= 0) {
                    fprintf(stderr, "AIO io_getevents have failed (%i)", -n_ev);
                    break;
                }
                ctx->ios_ready_n += n_ev;
            }

            io_queue_release(ctx->aio);
        }
#endif /* !DISABLE_AIO */

        trim = (ctx->prior_size != (size_t)-1);
#ifndef HAVE_LINUX_FALLOC_H
        /* posix_fallocate() extends the file size, so trim after preallocation too */
        trim = trim && ((ctx->sync_mode)||(ctx->aio_mode)||(ctx->preallocated));
#endif /* !HAVE_LINUX_FALLOC_H */

        if (trim && ftruncate(ctx->fd, ctx->prior_size + fw->written)) {
            fprintf(stderr, "Error truncating output file (%i)\n", errno);
        }

        close(ctx->fd);
    }

    free(ctx);
    fw->ctx = NULL;
}
263
264
1.1.1 by Suren A. Chilingaryan
Compile-in default api descriptor
265
int fastwriter_default_write(fastwriter_t *fw, fastwriter_write_flags_t flags, size_t size, void *data, size_t *written) {
1 by Suren A. Chilingaryan
Initial release
266
    size_t sum = 0;
4 by Suren A. Chilingaryan
Few synchronization and alignment related fixes
267
    size_t delta = 0;
1 by Suren A. Chilingaryan
Initial release
268
    ssize_t res;
269
    fastwriter_default_t *ctx = (fastwriter_default_t*)fw->ctx;
4 by Suren A. Chilingaryan
Few synchronization and alignment related fixes
270
1 by Suren A. Chilingaryan
Initial release
271
    if ((flags&FASTWRITER_WRITE_FLAG_FORCE)==0) {
272
	if (size < ctx->wr_block) {
273
	    *written = 0;
274
	    return 0;
275
	}
13 by Suren A. Chilingaryan
AIO support
276
1 by Suren A. Chilingaryan
Initial release
277
        size -= size % ctx->wr_block;
278
    }
279
280
    if ((ctx->pa_block)&&((fw->written + size) > ctx->preallocated)) {
281
#ifdef HAVE_LINUX_FALLOC_H
282
    	if (fallocate(ctx->fd, FALLOC_FL_KEEP_SIZE, ctx->preallocated, ctx->pa_block)) {
283
#else /* HAVE_LINUX_FALLOC_H */
284
    	if (posix_fallocate(ctx->fd, ctx->preallocated, ctx->pa_block)) {
285
#endif /* HAVE_LINUX_FALLOC_H */
286
	    ctx->pa_block = 0;
287
	} else {
288
	    ctx->preallocated += ctx->pa_block;
289
	}
290
    }
4 by Suren A. Chilingaryan
Few synchronization and alignment related fixes
291
292
	// we expect this to happen only at last iteration (buffer is multiply of the required align)
13 by Suren A. Chilingaryan
AIO support
293
    if (((ctx->aio_mode)||(ctx->sync_mode))&&(size%FASTWRITER_SYNCIO_ALIGN)) {
4 by Suren A. Chilingaryan
Few synchronization and alignment related fixes
294
	delta = FASTWRITER_SYNCIO_ALIGN - size%FASTWRITER_SYNCIO_ALIGN;
295
    }
1 by Suren A. Chilingaryan
Initial release
296
    
13 by Suren A. Chilingaryan
AIO support
297
#ifndef DISABLE_AIO
298
    if (ctx->aio_mode) {
299
	int err;
300
	size_t done = 0;
301
	size_t sched = 0;
302
303
	fastwriter_data_t *iodata;
304
	struct iocb *newio;
305
	size_t wr_block = ctx->wr_block;
306
307
	do {
308
	    if (!ctx->ios_ready_n) {
309
		int i, n_ev;
310
		struct io_event ev[AIO_QUEUE_LENGTH];
311
		
312
		n_ev = io_getevents(ctx->aio, 1, AIO_QUEUE_LENGTH, &ev[0], NULL);
313
		if (n_ev <= 0) {
314
		    fprintf(stderr, "AIO io_getevents have failed (%i)", -n_ev);
315
		    return -n_ev;
316
		}
317
		
318
		for (i = 0; i < n_ev; i++) {
319
		    fastwriter_data_t *ev_data = (fastwriter_data_t *)(ev[i].data);
320
		    if ((ev[i].res2)||(ev[i].res < ev_data->size)) {
321
			fprintf(stderr, "AIO write failed (res: %li, res2: %li, expected: %zu), no handling data will be corrupted...\n", ev[i].res, ev[i].res2, ev_data->size);
322
			return -ev[i].res2;
323
		    }
324
325
		    ctx->ios_ready[ctx->ios_ready_n++] = ev_data->ios;
326
//		    printf("Data: %i (ios %i)\n", ev_data->ready, ev_data->ios);
327
		    ev_data->ready = 2;
328
		}
329
		
330
		while (ctx->data[ctx->data_tail].ready > 1) {
331
//		    printf("Done: %i %zu\n", ctx->data_tail, ctx->data[ctx->data_tail].offset);
332
		    ctx->data[ctx->data_tail].ready = 0;
333
334
		    done += ctx->data[ctx->data_tail].size;
335
		    if ((++ctx->data_tail) == AIO_BUFFERS) ctx->data_tail = 0;
336
		}
337
	    }
338
	    
339
	    if ((ctx->sched + sched) < size) {
340
		if ((ctx->data_head == ctx->data_tail)&&(ctx->data[ctx->data_head].ready)) continue;
341
342
		newio = (struct iocb*)&ctx->ios[ctx->ios_ready[--ctx->ios_ready_n]];
343
	        iodata = &ctx->data[ctx->data_head];
344
345
		if (wr_block > ((size + delta) - (ctx->sched + sched))) {
346
		    wr_block = (size + delta) - (ctx->sched + sched);
347
		    if (wr_block % ctx->page_size) {
348
			fprintf(stderr, "We need to write incomplete page (%zu bytes). This is no supported yet...\n", wr_block);
349
			return -1;
350
		    }
351
		}
352
		
353
//		printf("Sched: %lu => %lu (%lu) [tail %lu, head %lu]\n", ctx->sched + sched, ctx->fd_offset, wr_block, ctx->data_tail, ctx->data_head);
354
355
		iodata->offset = ctx->fd_offset;
356
		iodata->size = wr_block;
357
		iodata->ios = ctx->ios_ready_n;
358
359
		io_prep_pwrite(newio, ctx->fd, data + ctx->sched + sched, wr_block, ctx->fd_offset);
360
		io_set_callback(newio, (void*)iodata);
361
		err = io_submit(ctx->aio, 1, &newio);
362
		if (err != 1) {
363
		    fprintf(stderr, "Error submiting AIO job (%i)\n", -err);
364
		    return -err;
365
		}
366
367
		iodata->ready = 1;
368
		sched += wr_block;
369
		ctx->fd_offset += wr_block;
370
		if ((++ctx->data_head) == AIO_BUFFERS) ctx->data_head = 0;
371
	    }
372
	} while (!done);
373
374
	ctx->sched += sched - done;
375
	size = done;
376
    } else {
377
#endif /* !DISABLE_AIO */
378
	do {
379
	    res = write(ctx->fd, data + sum, size + delta - sum);
380
	    if (res < 0) {
381
		*written = sum;
382
		return errno;
383
	    }
1 by Suren A. Chilingaryan
Initial release
384
	
13 by Suren A. Chilingaryan
AIO support
385
	    sum += res;
386
	} while (sum < size);
387
#ifndef DISABLE_AIO
388
    }
389
#endif /* !DISABLE_AIO */
390
391
    if ((ctx->sync_mode)||(ctx->aio_mode)) {
392
	posix_fadvise(ctx->fd, fw->written, size, POSIX_FADV_DONTNEED);
393
    }
394
1 by Suren A. Chilingaryan
Initial release
395
    *written = size;
13 by Suren A. Chilingaryan
AIO support
396
1 by Suren A. Chilingaryan
Initial release
397
    return 0;
398
}