/*
 * Copyright (c) 2008, XenSource Inc.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in the
 *       documentation and/or other materials provided with the distribution.
 *     * Neither the name of XenSource Inc. nor the names of its contributors
 *       may be used to endorse or promote products derived from this software
 *       without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER
 * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */
28
#include <errno.h>
#include <inttypes.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <libaio.h>
#ifdef __linux__
#include <linux/version.h>
#endif

#include "tapdisk.h"
#include "tapdisk-log.h"
#include "tapdisk-queue.h"
#include "tapdisk-filter.h"
#include "tapdisk-server.h"
#include "tapdisk-utils.h"

#include "libaio-compat.h"
#include "atomicio.h"
46
/* logging shorthands over the tapdisk log facility */
#define WARN(_f, _a...) tlog_write(TLOG_WARN, _f, ##_a)
#define DBG(_f, _a...) tlog_write(TLOG_DBG, _f, ##_a)
#define ERR(_err, _f, _a...) tlog_error(_err, _f, ##_a)

/*
 * We used a kernel patch to return an fd associated with the AIO context
 * so that we can concurrently poll on synchronous and async descriptors.
 * This is signalled by passing 1 as the io context to io_setup.
 */
#define REQUEST_ASYNC_FD ((io_context_t)1)
57
58 static inline void
queue_tiocb(struct tqueue * queue,struct tiocb * tiocb)59 queue_tiocb(struct tqueue *queue, struct tiocb *tiocb)
60 {
61 struct iocb *iocb = &tiocb->iocb;
62
63 if (queue->queued) {
64 struct tiocb *prev = (struct tiocb *)
65 queue->iocbs[queue->queued - 1]->data;
66 prev->next = tiocb;
67 }
68
69 queue->iocbs[queue->queued++] = iocb;
70 }
71
72 static inline int
deferred_tiocbs(struct tqueue * queue)73 deferred_tiocbs(struct tqueue *queue)
74 {
75 return (queue->deferred.head != NULL);
76 }
77
78 static inline void
defer_tiocb(struct tqueue * queue,struct tiocb * tiocb)79 defer_tiocb(struct tqueue *queue, struct tiocb *tiocb)
80 {
81 struct tlist *list = &queue->deferred;
82
83 if (!list->head)
84 list->head = list->tail = tiocb;
85 else
86 list->tail = list->tail->next = tiocb;
87
88 queue->tiocbs_deferred++;
89 queue->deferrals++;
90 }
91
92 static inline void
queue_deferred_tiocb(struct tqueue * queue)93 queue_deferred_tiocb(struct tqueue *queue)
94 {
95 struct tlist *list = &queue->deferred;
96
97 if (list->head) {
98 struct tiocb *tiocb = list->head;
99
100 list->head = tiocb->next;
101 if (!list->head)
102 list->tail = NULL;
103
104 queue_tiocb(queue, tiocb);
105 queue->tiocbs_deferred--;
106 }
107 }
108
/* refill the submit queue from the deferral list until either is exhausted */
static inline void
queue_deferred_tiocbs(struct tqueue *queue)
{
	while (!tapdisk_queue_full(queue) && deferred_tiocbs(queue))
		queue_deferred_tiocb(queue);
}
115
116 /*
117 * td_complete may queue more tiocbs
118 */
119 static void
complete_tiocb(struct tqueue * queue,struct tiocb * tiocb,unsigned long res)120 complete_tiocb(struct tqueue *queue, struct tiocb *tiocb, unsigned long res)
121 {
122 int err;
123 struct iocb *iocb = &tiocb->iocb;
124
125 if (res == iocb->u.c.nbytes)
126 err = 0;
127 else if ((int)res < 0)
128 err = (int)res;
129 else
130 err = -EIO;
131
132 tiocb->cb(tiocb->arg, tiocb, err);
133 }
134
135 static int
cancel_tiocbs(struct tqueue * queue,int err)136 cancel_tiocbs(struct tqueue *queue, int err)
137 {
138 int queued;
139 struct tiocb *tiocb;
140
141 if (!queue->queued)
142 return 0;
143
144 /*
145 * td_complete may queue more tiocbs, which
146 * will overwrite the contents of queue->iocbs.
147 * use a private linked list to keep track
148 * of the tiocbs we're cancelling.
149 */
150 tiocb = queue->iocbs[0]->data;
151 queued = queue->queued;
152 queue->queued = 0;
153
154 for (; tiocb != NULL; tiocb = tiocb->next)
155 complete_tiocb(queue, tiocb, err);
156
157 return queued;
158 }
159
160 static int
fail_tiocbs(struct tqueue * queue,int succeeded,int total,int err)161 fail_tiocbs(struct tqueue *queue, int succeeded, int total, int err)
162 {
163 ERR(err, "io_submit error: %d of %d failed",
164 total - succeeded, total);
165
166 /* take any non-submitted, merged iocbs
167 * off of the queue, split them, and fail them */
168 queue->queued = io_expand_iocbs(&queue->opioctx,
169 queue->iocbs, succeeded, total);
170
171 return cancel_tiocbs(queue, err);
172 }
173
/*
 * rwio -- synchronous read/write fallback driver
 */

struct rwio {
	struct io_event *aio_events;	/* completion records, one per iocb */
};
181
182 static void
tapdisk_rwio_destroy(struct tqueue * queue)183 tapdisk_rwio_destroy(struct tqueue *queue)
184 {
185 struct rwio *rwio = queue->tio_data;
186
187 if (rwio->aio_events) {
188 free(rwio->aio_events);
189 rwio->aio_events = NULL;
190 }
191 }
192
193 static int
tapdisk_rwio_setup(struct tqueue * queue,int size)194 tapdisk_rwio_setup(struct tqueue *queue, int size)
195 {
196 struct rwio *rwio = queue->tio_data;
197 int err;
198
199 rwio->aio_events = calloc(size, sizeof(struct io_event));
200 if (!rwio->aio_events)
201 return -errno;
202
203 return 0;
204 }
205
206 static inline ssize_t
tapdisk_rwio_rw(const struct iocb * iocb)207 tapdisk_rwio_rw(const struct iocb *iocb)
208 {
209 int fd = iocb->aio_fildes;
210 char *buf = iocb->u.c.buf;
211 long long off = iocb->u.c.offset;
212 size_t size = iocb->u.c.nbytes;
213 ssize_t (*func)(int, void *, size_t) =
214 (iocb->aio_lio_opcode == IO_CMD_PWRITE ? vwrite : read);
215
216 if (lseek(fd, off, SEEK_SET) == (off_t)-1)
217 return -errno;
218
219 if (atomicio(func, fd, buf, size) != size)
220 return -errno;
221
222 return size;
223 }
224
225 static int
tapdisk_rwio_submit(struct tqueue * queue)226 tapdisk_rwio_submit(struct tqueue *queue)
227 {
228 struct rwio *rwio = queue->tio_data;
229 int i, merged, split;
230 struct iocb *iocb;
231 struct tiocb *tiocb;
232 struct io_event *ep;
233
234 if (!queue->queued)
235 return 0;
236
237 tapdisk_filter_iocbs(queue->filter, queue->iocbs, queue->queued);
238 merged = io_merge(&queue->opioctx, queue->iocbs, queue->queued);
239
240 queue->queued = 0;
241
242 for (i = 0; i < merged; i++) {
243 ep = rwio->aio_events + i;
244 iocb = queue->iocbs[i];
245 ep->obj = iocb;
246 ep->res = tapdisk_rwio_rw(iocb);
247 }
248
249 split = io_split(&queue->opioctx, rwio->aio_events, merged);
250 tapdisk_filter_events(queue->filter, rwio->aio_events, split);
251
252 for (i = split, ep = rwio->aio_events; i-- > 0; ep++) {
253 iocb = ep->obj;
254 tiocb = iocb->data;
255 complete_tiocb(queue, tiocb, ep->res);
256 }
257
258 queue_deferred_tiocbs(queue);
259
260 return split;
261 }
262
263 static const struct tio td_tio_rwio = {
264 .name = "rwio",
265 .data_size = 0,
266 .tio_setup = NULL,
267 .tio_destroy = NULL,
268 .tio_submit = tapdisk_rwio_submit
269 };
270
271 /*
272 * libaio
273 */
274
275 struct lio {
276 io_context_t aio_ctx;
277 struct io_event *aio_events;
278
279 int event_fd;
280 int event_id;
281
282 int flags;
283 };
284
285 #define LIO_FLAG_EVENTFD (1<<0)
286
/* eventfd-based aio completion (IOCB_FLAG_RESFD) needs linux >= 2.6.22 */
static int
tapdisk_lio_check_resfd(void)
{
#ifdef __linux__
	return tapdisk_linux_version() >= KERNEL_VERSION(2, 6, 22);
#else
	return 1;
#endif
}
296
297 static void
tapdisk_lio_destroy_aio(struct tqueue * queue)298 tapdisk_lio_destroy_aio(struct tqueue *queue)
299 {
300 struct lio *lio = queue->tio_data;
301
302 if (lio->event_fd >= 0) {
303 close(lio->event_fd);
304 lio->event_fd = -1;
305 }
306
307 if (lio->aio_ctx) {
308 io_destroy(lio->aio_ctx);
309 lio->aio_ctx = 0;
310 }
311 }
312
313 static int
__lio_setup_aio_poll(struct tqueue * queue,int qlen)314 __lio_setup_aio_poll(struct tqueue *queue, int qlen)
315 {
316 struct lio *lio = queue->tio_data;
317 int err, fd;
318
319 lio->aio_ctx = REQUEST_ASYNC_FD;
320
321 fd = io_setup(qlen, &lio->aio_ctx);
322 if (fd < 0) {
323 lio->aio_ctx = 0;
324 err = -errno;
325
326 if (err == -EINVAL)
327 goto fail_fd;
328
329 goto fail;
330 }
331
332 lio->event_fd = fd;
333
334 return 0;
335
336 fail_fd:
337 DPRINTF("Couldn't get fd for AIO poll support. This is probably "
338 "because your kernel does not have the aio-poll patch "
339 "applied.\n");
340 fail:
341 return err;
342 }
343
344 static int
__lio_setup_aio_eventfd(struct tqueue * queue,int qlen)345 __lio_setup_aio_eventfd(struct tqueue *queue, int qlen)
346 {
347 struct lio *lio = queue->tio_data;
348 int err;
349
350 err = io_setup(qlen, &lio->aio_ctx);
351 if (err < 0) {
352 lio->aio_ctx = 0;
353 return err;
354 }
355
356 lio->event_fd = tapdisk_sys_eventfd(0);
357 if (lio->event_fd < 0)
358 return -errno;
359
360 lio->flags |= LIO_FLAG_EVENTFD;
361
362 return 0;
363 }
364
365 static int
tapdisk_lio_setup_aio(struct tqueue * queue,int qlen)366 tapdisk_lio_setup_aio(struct tqueue *queue, int qlen)
367 {
368 struct lio *lio = queue->tio_data;
369 int err;
370
371 lio->aio_ctx = 0;
372 lio->event_fd = -1;
373
374 /*
375 * prefer the mainline eventfd(2) api, if available.
376 * if not, fall back to the poll fd patch.
377 */
378
379 err = !tapdisk_lio_check_resfd();
380 if (!err)
381 err = __lio_setup_aio_eventfd(queue, qlen);
382 if (err)
383 err = __lio_setup_aio_poll(queue, qlen);
384
385 if (err == -EAGAIN)
386 goto fail_rsv;
387 fail:
388 return err;
389
390 fail_rsv:
391 DPRINTF("Couldn't setup AIO context. If you are trying to "
392 "concurrently use a large number of blktap-based disks, you may "
393 "need to increase the system-wide aio request limit. "
394 "(e.g. 'echo 1048576 > /proc/sys/fs/aio-max-nr')\n");
395 goto fail;
396 }
397
398
399 static void
tapdisk_lio_destroy(struct tqueue * queue)400 tapdisk_lio_destroy(struct tqueue *queue)
401 {
402 struct lio *lio = queue->tio_data;
403
404 if (!lio)
405 return;
406
407 if (lio->event_id >= 0) {
408 tapdisk_server_unregister_event(lio->event_id);
409 lio->event_id = -1;
410 }
411
412 tapdisk_lio_destroy_aio(queue);
413
414 if (lio->aio_events) {
415 free(lio->aio_events);
416 lio->aio_events = NULL;
417 }
418 }
419
420 static void
tapdisk_lio_set_eventfd(struct tqueue * queue,int n,struct iocb ** iocbs)421 tapdisk_lio_set_eventfd(struct tqueue *queue, int n, struct iocb **iocbs)
422 {
423 struct lio *lio = queue->tio_data;
424 int i;
425
426 if (lio->flags & LIO_FLAG_EVENTFD)
427 for (i = 0; i < n; ++i)
428 __io_set_eventfd(iocbs[i], lio->event_fd);
429 }
430
431 static void
tapdisk_lio_ack_event(struct tqueue * queue)432 tapdisk_lio_ack_event(struct tqueue *queue)
433 {
434 struct lio *lio = queue->tio_data;
435 uint64_t val;
436
437 if (lio->flags & LIO_FLAG_EVENTFD)
438 read_exact(lio->event_fd, &val, sizeof(val));
439 }
440
441 static void
tapdisk_lio_event(event_id_t id,char mode,void * private)442 tapdisk_lio_event(event_id_t id, char mode, void *private)
443 {
444 struct tqueue *queue = private;
445 struct lio *lio;
446 int i, ret, split;
447 struct iocb *iocb;
448 struct tiocb *tiocb;
449 struct io_event *ep;
450
451 tapdisk_lio_ack_event(queue);
452
453 lio = queue->tio_data;
454 ret = io_getevents(lio->aio_ctx, 0,
455 queue->size, lio->aio_events, NULL);
456 split = io_split(&queue->opioctx, lio->aio_events, ret);
457 tapdisk_filter_events(queue->filter, lio->aio_events, split);
458
459 DBG("events: %d, tiocbs: %d\n", ret, split);
460
461 queue->iocbs_pending -= ret;
462 queue->tiocbs_pending -= split;
463
464 for (i = split, ep = lio->aio_events; i-- > 0; ep++) {
465 iocb = ep->obj;
466 tiocb = iocb->data;
467 complete_tiocb(queue, tiocb, ep->res);
468 }
469
470 queue_deferred_tiocbs(queue);
471 }
472
473 static int
tapdisk_lio_setup(struct tqueue * queue,int qlen)474 tapdisk_lio_setup(struct tqueue *queue, int qlen)
475 {
476 struct lio *lio = queue->tio_data;
477 size_t sz;
478 int err;
479
480 lio->event_id = -1;
481
482 err = tapdisk_lio_setup_aio(queue, qlen);
483 if (err)
484 goto fail;
485
486 lio->event_id =
487 tapdisk_server_register_event(SCHEDULER_POLL_READ_FD,
488 lio->event_fd, 0,
489 tapdisk_lio_event,
490 queue);
491 err = lio->event_id;
492 if (err < 0)
493 goto fail;
494
495 lio->aio_events = calloc(qlen, sizeof(struct io_event));
496 if (!lio->aio_events) {
497 err = -errno;
498 goto fail;
499 }
500
501 return 0;
502
503 fail:
504 tapdisk_lio_destroy(queue);
505 return err;
506 }
507
508 static int
tapdisk_lio_submit(struct tqueue * queue)509 tapdisk_lio_submit(struct tqueue *queue)
510 {
511 struct lio *lio = queue->tio_data;
512 int merged, submitted, err = 0;
513
514 if (!queue->queued)
515 return 0;
516
517 tapdisk_filter_iocbs(queue->filter, queue->iocbs, queue->queued);
518 merged = io_merge(&queue->opioctx, queue->iocbs, queue->queued);
519 tapdisk_lio_set_eventfd(queue, merged, queue->iocbs);
520 submitted = io_submit(lio->aio_ctx, merged, queue->iocbs);
521
522 DBG("queued: %d, merged: %d, submitted: %d\n",
523 queue->queued, merged, submitted);
524
525 if (submitted < 0) {
526 err = submitted;
527 submitted = 0;
528 } else if (submitted < merged)
529 err = -EIO;
530
531 queue->iocbs_pending += submitted;
532 queue->tiocbs_pending += queue->queued;
533 queue->queued = 0;
534
535 if (err)
536 queue->tiocbs_pending -=
537 fail_tiocbs(queue, submitted, merged, err);
538
539 return submitted;
540 }
541
542 static const struct tio td_tio_lio = {
543 .name = "lio",
544 .data_size = sizeof(struct lio),
545 .tio_setup = tapdisk_lio_setup,
546 .tio_destroy = tapdisk_lio_destroy,
547 .tio_submit = tapdisk_lio_submit,
548 };
549
550 static void
tapdisk_queue_free_io(struct tqueue * queue)551 tapdisk_queue_free_io(struct tqueue *queue)
552 {
553 if (queue->tio) {
554 if (queue->tio->tio_destroy)
555 queue->tio->tio_destroy(queue);
556 queue->tio = NULL;
557 }
558
559 if (queue->tio_data) {
560 free(queue->tio_data);
561 queue->tio_data = NULL;
562 }
563 }
564
565 static int
tapdisk_queue_init_io(struct tqueue * queue,int drv)566 tapdisk_queue_init_io(struct tqueue *queue, int drv)
567 {
568 const struct tio *tio;
569 int err;
570
571 switch (drv) {
572 case TIO_DRV_LIO:
573 tio = &td_tio_lio;
574 break;
575 case TIO_DRV_RWIO:
576 tio = &td_tio_rwio;
577 break;
578 default:
579 err = -EINVAL;
580 goto fail;
581 }
582
583 queue->tio_data = calloc(1, tio->data_size);
584 if (!queue->tio_data) {
585 PERROR("malloc(%zu)", tio->data_size);
586 err = -errno;
587 goto fail;
588 }
589
590 queue->tio = tio;
591
592 if (tio->tio_setup) {
593 err = tio->tio_setup(queue, queue->size);
594 if (err)
595 goto fail;
596 }
597
598 DPRINTF("I/O queue driver: %s\n", tio->name);
599
600 return 0;
601
602 fail:
603 tapdisk_queue_free_io(queue);
604 return err;
605 }
606
607 int
tapdisk_init_queue(struct tqueue * queue,int size,int drv,struct tfilter * filter)608 tapdisk_init_queue(struct tqueue *queue, int size,
609 int drv, struct tfilter *filter)
610 {
611 int i, err;
612
613 memset(queue, 0, sizeof(struct tqueue));
614
615 queue->size = size;
616 queue->filter = filter;
617
618 if (!size)
619 return 0;
620
621 err = tapdisk_queue_init_io(queue, drv);
622 if (err)
623 goto fail;
624
625 queue->iocbs = calloc(size, sizeof(struct iocb *));
626 if (!queue->iocbs) {
627 err = -errno;
628 goto fail;
629 }
630
631 err = opio_init(&queue->opioctx, size);
632 if (err)
633 goto fail;
634
635 return 0;
636
637 fail:
638 tapdisk_free_queue(queue);
639 return err;
640 }
641
642 void
tapdisk_free_queue(struct tqueue * queue)643 tapdisk_free_queue(struct tqueue *queue)
644 {
645 tapdisk_queue_free_io(queue);
646
647 free(queue->iocbs);
648 queue->iocbs = NULL;
649
650 opio_free(&queue->opioctx);
651 }
652
653 void
tapdisk_debug_queue(struct tqueue * queue)654 tapdisk_debug_queue(struct tqueue *queue)
655 {
656 struct tiocb *tiocb = queue->deferred.head;
657
658 WARN("TAPDISK QUEUE:\n");
659 WARN("size: %d, tio: %s, queued: %d, iocbs_pending: %d, "
660 "tiocbs_pending: %d, tiocbs_deferred: %d, deferrals: %"PRIx64"\n",
661 queue->size, queue->tio->name, queue->queued, queue->iocbs_pending,
662 queue->tiocbs_pending, queue->tiocbs_deferred, queue->deferrals);
663
664 if (tiocb) {
665 WARN("deferred:\n");
666 for (; tiocb != NULL; tiocb = tiocb->next) {
667 struct iocb *io = &tiocb->iocb;
668 WARN("%s of %lu bytes at %lld\n",
669 (io->aio_lio_opcode == IO_CMD_PWRITE ?
670 "write" : "read"),
671 io->u.c.nbytes, io->u.c.offset);
672 }
673 }
674 }
675
676 void
tapdisk_prep_tiocb(struct tiocb * tiocb,int fd,int rw,char * buf,size_t size,long long offset,td_queue_callback_t cb,void * arg)677 tapdisk_prep_tiocb(struct tiocb *tiocb, int fd, int rw, char *buf, size_t size,
678 long long offset, td_queue_callback_t cb, void *arg)
679 {
680 struct iocb *iocb = &tiocb->iocb;
681
682 if (rw)
683 io_prep_pwrite(iocb, fd, buf, size, offset);
684 else
685 io_prep_pread(iocb, fd, buf, size, offset);
686
687 iocb->data = tiocb;
688 tiocb->cb = cb;
689 tiocb->arg = arg;
690 tiocb->next = NULL;
691 }
692
/* queue a tiocb for submission, deferring it when the queue is full */
void
tapdisk_queue_tiocb(struct tqueue *queue, struct tiocb *tiocb)
{
	if (tapdisk_queue_full(queue))
		defer_tiocb(queue, tiocb);
	else
		queue_tiocb(queue, tiocb);
}
701
702
703 /*
704 * fail_tiocbs may queue more tiocbs
705 */
706 int
tapdisk_submit_tiocbs(struct tqueue * queue)707 tapdisk_submit_tiocbs(struct tqueue *queue)
708 {
709 return queue->tio->tio_submit(queue);
710 }
711
/* submit repeatedly until the queue drains (completions may promote
 * deferred tiocbs back onto it); returns total iocbs submitted */
int
tapdisk_submit_all_tiocbs(struct tqueue *queue)
{
	int submitted = 0;

	do {
		submitted += tapdisk_submit_tiocbs(queue);
	} while (!tapdisk_queue_empty(queue));

	return submitted;
}
723
/*
 * Fail everything currently queued with -EIO.
 * cancel_tiocbs may queue more tiocbs.
 */
int
tapdisk_cancel_tiocbs(struct tqueue *queue)
{
	return cancel_tiocbs(queue, -EIO);
}
732
/* cancel repeatedly until the queue drains (completion callbacks may
 * re-queue tiocbs); returns total tiocbs cancelled */
int
tapdisk_cancel_all_tiocbs(struct tqueue *queue)
{
	int cancelled = 0;

	do {
		cancelled += tapdisk_cancel_tiocbs(queue);
	} while (!tapdisk_queue_empty(queue));

	return cancelled;
}
744