1 /*
2  * Copyright (c) 2008, XenSource Inc.
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are met:
7  *     * Redistributions of source code must retain the above copyright
8  *       notice, this list of conditions and the following disclaimer.
9  *     * Redistributions in binary form must reproduce the above copyright
10  *       notice, this list of conditions and the following disclaimer in the
11  *       documentation and/or other materials provided with the distribution.
12  *     * Neither the name of XenSource Inc. nor the names of its contributors
13  *       may be used to endorse or promote products derived from this software
14  *       without specific prior written permission.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
17  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
18  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
19  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER
20  * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
21  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
22  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
23  * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
24  * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
25  * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
26  * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27 */
28 
29 #include <errno.h>
30 #include <stdlib.h>
31 #include <unistd.h>
32 #include <libaio.h>
33 #ifdef __linux__
34 #include <linux/version.h>
35 #endif
36 
37 #include "tapdisk.h"
38 #include "tapdisk-log.h"
39 #include "tapdisk-queue.h"
40 #include "tapdisk-filter.h"
41 #include "tapdisk-server.h"
42 #include "tapdisk-utils.h"
43 
44 #include "libaio-compat.h"
45 #include "atomicio.h"
46 
47 #define WARN(_f, _a...) tlog_write(TLOG_WARN, _f, ##_a)
48 #define DBG(_f, _a...) tlog_write(TLOG_DBG, _f, ##_a)
49 #define ERR(_err, _f, _a...) tlog_error(_err, _f, ##_a)
50 
51 /*
52  * We used a kernel patch to return an fd associated with the AIO context
53  * so that we can concurrently poll on synchronous and async descriptors.
54  * This is signalled by passing 1 as the io context to io_setup.
55  */
56 #define REQUEST_ASYNC_FD ((io_context_t)1)
57 
58 static inline void
queue_tiocb(struct tqueue * queue,struct tiocb * tiocb)59 queue_tiocb(struct tqueue *queue, struct tiocb *tiocb)
60 {
61 	struct iocb *iocb = &tiocb->iocb;
62 
63 	if (queue->queued) {
64 		struct tiocb *prev = (struct tiocb *)
65 			queue->iocbs[queue->queued - 1]->data;
66 		prev->next = tiocb;
67 	}
68 
69 	queue->iocbs[queue->queued++] = iocb;
70 }
71 
72 static inline int
deferred_tiocbs(struct tqueue * queue)73 deferred_tiocbs(struct tqueue *queue)
74 {
75 	return (queue->deferred.head != NULL);
76 }
77 
78 static inline void
defer_tiocb(struct tqueue * queue,struct tiocb * tiocb)79 defer_tiocb(struct tqueue *queue, struct tiocb *tiocb)
80 {
81 	struct tlist *list = &queue->deferred;
82 
83 	if (!list->head)
84 		list->head = list->tail = tiocb;
85 	else
86 		list->tail = list->tail->next = tiocb;
87 
88 	queue->tiocbs_deferred++;
89 	queue->deferrals++;
90 }
91 
92 static inline void
queue_deferred_tiocb(struct tqueue * queue)93 queue_deferred_tiocb(struct tqueue *queue)
94 {
95 	struct tlist *list = &queue->deferred;
96 
97 	if (list->head) {
98 		struct tiocb *tiocb = list->head;
99 
100 		list->head = tiocb->next;
101 		if (!list->head)
102 			list->tail = NULL;
103 
104 		queue_tiocb(queue, tiocb);
105 		queue->tiocbs_deferred--;
106 	}
107 }
108 
/* drain deferred tiocbs onto the queue while there is room */
static inline void
queue_deferred_tiocbs(struct tqueue *queue)
{
	while (deferred_tiocbs(queue) && !tapdisk_queue_full(queue))
		queue_deferred_tiocb(queue);
}
115 
/*
 * td_complete may queue more tiocbs
 */
/*
 * Map a raw AIO completion value to 0 / negative error and invoke the
 * tiocb's completion callback.  `res` is the kernel's io_event.res (or
 * the emulated result from the rwio path).
 */
static void
complete_tiocb(struct tqueue *queue, struct tiocb *tiocb, unsigned long res)
{
	int err;
	struct iocb *iocb = &tiocb->iocb;

	if (res == iocb->u.c.nbytes)
		err = 0;		/* full transfer: success */
	else if ((int)res < 0)
		err = (int)res;		/* negative errno stored as unsigned;
					 * int truncation recovers it */
	else
		err = -EIO;		/* short transfer: generic I/O error */

	tiocb->cb(tiocb->arg, tiocb, err);
}
134 
135 static int
cancel_tiocbs(struct tqueue * queue,int err)136 cancel_tiocbs(struct tqueue *queue, int err)
137 {
138 	int queued;
139 	struct tiocb *tiocb;
140 
141 	if (!queue->queued)
142 		return 0;
143 
144 	/*
145 	 * td_complete may queue more tiocbs, which
146 	 * will overwrite the contents of queue->iocbs.
147 	 * use a private linked list to keep track
148 	 * of the tiocbs we're cancelling.
149 	 */
150 	tiocb  = queue->iocbs[0]->data;
151 	queued = queue->queued;
152 	queue->queued = 0;
153 
154 	for (; tiocb != NULL; tiocb = tiocb->next)
155 		complete_tiocb(queue, tiocb, err);
156 
157 	return queued;
158 }
159 
160 static int
fail_tiocbs(struct tqueue * queue,int succeeded,int total,int err)161 fail_tiocbs(struct tqueue *queue, int succeeded, int total, int err)
162 {
163 	ERR(err, "io_submit error: %d of %d failed",
164 	    total - succeeded, total);
165 
166 	/* take any non-submitted, merged iocbs
167 	 * off of the queue, split them, and fail them */
168 	queue->queued = io_expand_iocbs(&queue->opioctx,
169 					queue->iocbs, succeeded, total);
170 
171 	return cancel_tiocbs(queue, err);
172 }
173 
/*
 * rwio
 */

/* per-queue state for the synchronous read/write fallback backend */
struct rwio {
	struct io_event *aio_events;	/* completion records, one per iocb */
};
181 
182 static void
tapdisk_rwio_destroy(struct tqueue * queue)183 tapdisk_rwio_destroy(struct tqueue *queue)
184 {
185 	struct rwio *rwio = queue->tio_data;
186 
187 	if (rwio->aio_events) {
188 		free(rwio->aio_events);
189 		rwio->aio_events = NULL;
190 	}
191 }
192 
193 static int
tapdisk_rwio_setup(struct tqueue * queue,int size)194 tapdisk_rwio_setup(struct tqueue *queue, int size)
195 {
196 	struct rwio *rwio = queue->tio_data;
197 	int err;
198 
199 	rwio->aio_events = calloc(size, sizeof(struct io_event));
200 	if (!rwio->aio_events)
201 		return -errno;
202 
203 	return 0;
204 }
205 
/*
 * Emulate one asynchronous iocb synchronously: seek to the iocb's
 * offset and transfer the full byte count with read(2) or vwrite.
 * Returns the number of bytes transferred, or -errno on failure.
 */
static inline ssize_t
tapdisk_rwio_rw(const struct iocb *iocb)
{
	int fd        = iocb->aio_fildes;
	char *buf     = iocb->u.c.buf;
	long long off = iocb->u.c.offset;
	size_t size   = iocb->u.c.nbytes;
	/* select the transfer function by opcode; vwrite shares read's
	 * (int, void *, size_t) shape for the common pointer type */
	ssize_t (*func)(int, void *, size_t) =
		(iocb->aio_lio_opcode == IO_CMD_PWRITE ? vwrite : read);

	if (lseek(fd, off, SEEK_SET) == (off_t)-1)
		return -errno;

	/* atomicio retries partial transfers; anything short is a failure */
	if (atomicio(func, fd, buf, size) != size)
		return -errno;

	return size;
}
224 
/*
 * Synchronous submit path: merge the queued iocbs, execute each one
 * with blocking reads/writes, then split the results back into
 * per-tiocb events and complete them.  Returns the number of tiocbs
 * completed.
 */
static int
tapdisk_rwio_submit(struct tqueue *queue)
{
	struct rwio *rwio = queue->tio_data;
	int i, merged, split;
	struct iocb *iocb;
	struct tiocb *tiocb;
	struct io_event *ep;

	if (!queue->queued)
		return 0;

	tapdisk_filter_iocbs(queue->filter, queue->iocbs, queue->queued);
	/* coalesce adjacent requests; `merged` <= queue->queued */
	merged = io_merge(&queue->opioctx, queue->iocbs, queue->queued);

	queue->queued = 0;

	/* execute each merged iocb synchronously, recording a fake
	 * io_event per iocb as the kernel would */
	for (i = 0; i < merged; i++) {
		ep      = rwio->aio_events + i;
		iocb    = queue->iocbs[i];
		ep->obj = iocb;
		ep->res = tapdisk_rwio_rw(iocb);
	}

	/* expand merged completions back to one event per original iocb */
	split = io_split(&queue->opioctx, rwio->aio_events, merged);
	tapdisk_filter_events(queue->filter, rwio->aio_events, split);

	for (i = split, ep = rwio->aio_events; i-- > 0; ep++) {
		iocb  = ep->obj;
		tiocb = iocb->data;
		complete_tiocb(queue, tiocb, ep->res);
	}

	/* completions may have freed queue slots; pull in deferred work */
	queue_deferred_tiocbs(queue);

	return split;
}
262 
263 static const struct tio td_tio_rwio = {
264 	.name        = "rwio",
265 	.data_size   = 0,
266 	.tio_setup   = NULL,
267 	.tio_destroy = NULL,
268 	.tio_submit  = tapdisk_rwio_submit
269 };
270 
/*
 * libaio
 */

/* per-queue state for the kernel AIO (libaio) backend */
struct lio {
	io_context_t     aio_ctx;	/* kernel AIO context; 0 when unset */
	struct io_event *aio_events;	/* completion buffer, queue->size entries */

	int              event_fd;	/* eventfd or aio-poll fd; -1 when unset */
	int              event_id;	/* server event registration; -1 when unset */

	int              flags;		/* LIO_FLAG_* bits */
};

/* set when event_fd is a mainline eventfd(2), not the aio-poll patch fd */
#define LIO_FLAG_EVENTFD        (1<<0)
286 
/*
 * Whether the kernel supports eventfd-based AIO completion
 * notification; on Linux this requires at least 2.6.22.
 */
static int
tapdisk_lio_check_resfd(void)
{
#ifndef __linux__
	return 1;
#else
	return tapdisk_linux_version() >= KERNEL_VERSION(2, 6, 22);
#endif
}
296 
297 static void
tapdisk_lio_destroy_aio(struct tqueue * queue)298 tapdisk_lio_destroy_aio(struct tqueue *queue)
299 {
300 	struct lio *lio = queue->tio_data;
301 
302 	if (lio->event_fd >= 0) {
303 		close(lio->event_fd);
304 		lio->event_fd = -1;
305 	}
306 
307 	if (lio->aio_ctx) {
308 		io_destroy(lio->aio_ctx);
309 		lio->aio_ctx = 0;
310 	}
311 }
312 
313 static int
__lio_setup_aio_poll(struct tqueue * queue,int qlen)314 __lio_setup_aio_poll(struct tqueue *queue, int qlen)
315 {
316 	struct lio *lio = queue->tio_data;
317 	int err, fd;
318 
319 	lio->aio_ctx = REQUEST_ASYNC_FD;
320 
321 	fd = io_setup(qlen, &lio->aio_ctx);
322 	if (fd < 0) {
323 		lio->aio_ctx = 0;
324 		err = -errno;
325 
326 		if (err == -EINVAL)
327 			goto fail_fd;
328 
329 		goto fail;
330 	}
331 
332 	lio->event_fd = fd;
333 
334 	return 0;
335 
336 fail_fd:
337 	DPRINTF("Couldn't get fd for AIO poll support. This is probably "
338 		"because your kernel does not have the aio-poll patch "
339 		"applied.\n");
340 fail:
341 	return err;
342 }
343 
344 static int
__lio_setup_aio_eventfd(struct tqueue * queue,int qlen)345 __lio_setup_aio_eventfd(struct tqueue *queue, int qlen)
346 {
347 	struct lio *lio = queue->tio_data;
348 	int err;
349 
350 	err = io_setup(qlen, &lio->aio_ctx);
351 	if (err < 0) {
352 		lio->aio_ctx = 0;
353 		return err;
354 	}
355 
356 	lio->event_fd = tapdisk_sys_eventfd(0);
357 	if (lio->event_fd < 0)
358 		return  -errno;
359 
360 	lio->flags |= LIO_FLAG_EVENTFD;
361 
362 	return 0;
363 }
364 
/*
 * Create the kernel AIO context and its completion notification fd.
 * Returns 0 on success, negative errno on failure.
 */
static int
tapdisk_lio_setup_aio(struct tqueue *queue, int qlen)
{
	struct lio *lio = queue->tio_data;
	int err;

	lio->aio_ctx  =  0;
	lio->event_fd = -1;

	/*
	 * prefer the mainline eventfd(2) api, if available.
	 * if not, fall back to the poll fd patch.
	 */

	/* err is primed non-zero when eventfd support is unavailable,
	 * so the poll-fd fallback below is taken directly */
	err = !tapdisk_lio_check_resfd();
	if (!err)
		err = __lio_setup_aio_eventfd(queue, qlen);
	if (err)
		err = __lio_setup_aio_poll(queue, qlen);

	/* -EAGAIN from io_setup typically means the system-wide
	 * aio-max-nr limit has been exhausted */
	if (err == -EAGAIN)
		goto fail_rsv;
fail:
	return err;

fail_rsv:
	DPRINTF("Couldn't setup AIO context. If you are trying to "
		"concurrently use a large number of blktap-based disks, you may "
		"need to increase the system-wide aio request limit. "
		"(e.g. 'echo 1048576 > /proc/sys/fs/aio-max-nr')\n");
	goto fail;
}
397 
398 
399 static void
tapdisk_lio_destroy(struct tqueue * queue)400 tapdisk_lio_destroy(struct tqueue *queue)
401 {
402 	struct lio *lio = queue->tio_data;
403 
404 	if (!lio)
405 		return;
406 
407 	if (lio->event_id >= 0) {
408 		tapdisk_server_unregister_event(lio->event_id);
409 		lio->event_id = -1;
410 	}
411 
412 	tapdisk_lio_destroy_aio(queue);
413 
414 	if (lio->aio_events) {
415 		free(lio->aio_events);
416 		lio->aio_events = NULL;
417 	}
418 }
419 
420 static void
tapdisk_lio_set_eventfd(struct tqueue * queue,int n,struct iocb ** iocbs)421 tapdisk_lio_set_eventfd(struct tqueue *queue, int n, struct iocb **iocbs)
422 {
423 	struct lio *lio = queue->tio_data;
424 	int i;
425 
426 	if (lio->flags & LIO_FLAG_EVENTFD)
427 		for (i = 0; i < n; ++i)
428 			__io_set_eventfd(iocbs[i], lio->event_fd);
429 }
430 
431 static void
tapdisk_lio_ack_event(struct tqueue * queue)432 tapdisk_lio_ack_event(struct tqueue *queue)
433 {
434 	struct lio *lio = queue->tio_data;
435 	uint64_t val;
436 
437 	if (lio->flags & LIO_FLAG_EVENTFD)
438 		read_exact(lio->event_fd, &val, sizeof(val));
439 }
440 
441 static void
tapdisk_lio_event(event_id_t id,char mode,void * private)442 tapdisk_lio_event(event_id_t id, char mode, void *private)
443 {
444 	struct tqueue *queue = private;
445 	struct lio *lio;
446 	int i, ret, split;
447 	struct iocb *iocb;
448 	struct tiocb *tiocb;
449 	struct io_event *ep;
450 
451 	tapdisk_lio_ack_event(queue);
452 
453 	lio   = queue->tio_data;
454 	ret   = io_getevents(lio->aio_ctx, 0,
455 			     queue->size, lio->aio_events, NULL);
456 	split = io_split(&queue->opioctx, lio->aio_events, ret);
457 	tapdisk_filter_events(queue->filter, lio->aio_events, split);
458 
459 	DBG("events: %d, tiocbs: %d\n", ret, split);
460 
461 	queue->iocbs_pending  -= ret;
462 	queue->tiocbs_pending -= split;
463 
464 	for (i = split, ep = lio->aio_events; i-- > 0; ep++) {
465 		iocb  = ep->obj;
466 		tiocb = iocb->data;
467 		complete_tiocb(queue, tiocb, ep->res);
468 	}
469 
470 	queue_deferred_tiocbs(queue);
471 }
472 
/*
 * Initialize the libaio backend: create the AIO context, register its
 * notification fd with the tapdisk server event loop, and allocate the
 * completion buffer.  Returns 0 on success, negative errno on failure;
 * partially-initialized state is torn down on failure.
 */
static int
tapdisk_lio_setup(struct tqueue *queue, int qlen)
{
	struct lio *lio = queue->tio_data;
	size_t sz;
	int err;

	lio->event_id = -1;

	err = tapdisk_lio_setup_aio(queue, qlen);
	if (err)
		goto fail;

	/* NOTE(review): the event is registered before aio_events is
	 * allocated; presumably safe because the event loop is
	 * single-threaded and not yet running here — confirm */
	lio->event_id =
		tapdisk_server_register_event(SCHEDULER_POLL_READ_FD,
					      lio->event_fd, 0,
					      tapdisk_lio_event,
					      queue);
	err = lio->event_id;
	if (err < 0)
		goto fail;

	lio->aio_events = calloc(qlen, sizeof(struct io_event));
	if (!lio->aio_events) {
		err = -errno;
		goto fail;
	}

	return 0;

fail:
	tapdisk_lio_destroy(queue);
	return err;
}
507 
/*
 * Submit the queued batch to the kernel: merge adjacent iocbs, tag
 * them for eventfd notification, io_submit, and fail any iocbs the
 * kernel did not accept.  Returns the number of iocbs submitted.
 */
static int
tapdisk_lio_submit(struct tqueue *queue)
{
	struct lio *lio = queue->tio_data;
	int merged, submitted, err = 0;

	if (!queue->queued)
		return 0;

	tapdisk_filter_iocbs(queue->filter, queue->iocbs, queue->queued);
	merged    = io_merge(&queue->opioctx, queue->iocbs, queue->queued);
	tapdisk_lio_set_eventfd(queue, merged, queue->iocbs);
	submitted = io_submit(lio->aio_ctx, merged, queue->iocbs);

	DBG("queued: %d, merged: %d, submitted: %d\n",
	    queue->queued, merged, submitted);

	if (submitted < 0) {
		/* io_submit failed outright: nothing was accepted */
		err = submitted;
		submitted = 0;
	} else if (submitted < merged)
		/* partial acceptance: fail the remainder below */
		err = -EIO;

	/* iocbs_pending counts merged iocbs in the kernel; tiocbs_pending
	 * counts the original (pre-merge) requests */
	queue->iocbs_pending  += submitted;
	queue->tiocbs_pending += queue->queued;
	queue->queued          = 0;

	if (err)
		queue->tiocbs_pending -=
			fail_tiocbs(queue, submitted, merged, err);

	return submitted;
}
541 
/* kernel AIO backend driver table (TIO_DRV_LIO) */
static const struct tio td_tio_lio = {
	.name        = "lio",
	.data_size   = sizeof(struct lio),
	.tio_setup   = tapdisk_lio_setup,
	.tio_destroy = tapdisk_lio_destroy,
	.tio_submit  = tapdisk_lio_submit,
};
549 
550 static void
tapdisk_queue_free_io(struct tqueue * queue)551 tapdisk_queue_free_io(struct tqueue *queue)
552 {
553 	if (queue->tio) {
554 		if (queue->tio->tio_destroy)
555 			queue->tio->tio_destroy(queue);
556 		queue->tio = NULL;
557 	}
558 
559 	if (queue->tio_data) {
560 		free(queue->tio_data);
561 		queue->tio_data = NULL;
562 	}
563 }
564 
565 static int
tapdisk_queue_init_io(struct tqueue * queue,int drv)566 tapdisk_queue_init_io(struct tqueue *queue, int drv)
567 {
568 	const struct tio *tio;
569 	int err;
570 
571 	switch (drv) {
572 	case TIO_DRV_LIO:
573 		tio = &td_tio_lio;
574 		break;
575 	case TIO_DRV_RWIO:
576 		tio = &td_tio_rwio;
577 		break;
578 	default:
579 		err = -EINVAL;
580 		goto fail;
581 	}
582 
583 	queue->tio_data = calloc(1, tio->data_size);
584 	if (!queue->tio_data) {
585 		PERROR("malloc(%zu)", tio->data_size);
586 		err = -errno;
587 		goto fail;
588 	}
589 
590 	queue->tio = tio;
591 
592 	if (tio->tio_setup) {
593 		err = tio->tio_setup(queue, queue->size);
594 		if (err)
595 			goto fail;
596 	}
597 
598 	DPRINTF("I/O queue driver: %s\n", tio->name);
599 
600 	return 0;
601 
602 fail:
603 	tapdisk_queue_free_io(queue);
604 	return err;
605 }
606 
607 int
tapdisk_init_queue(struct tqueue * queue,int size,int drv,struct tfilter * filter)608 tapdisk_init_queue(struct tqueue *queue, int size,
609 		   int drv, struct tfilter *filter)
610 {
611 	int i, err;
612 
613 	memset(queue, 0, sizeof(struct tqueue));
614 
615 	queue->size   = size;
616 	queue->filter = filter;
617 
618 	if (!size)
619 		return 0;
620 
621 	err = tapdisk_queue_init_io(queue, drv);
622 	if (err)
623 		goto fail;
624 
625 	queue->iocbs = calloc(size, sizeof(struct iocb *));
626 	if (!queue->iocbs) {
627 		err = -errno;
628 		goto fail;
629 	}
630 
631 	err = opio_init(&queue->opioctx, size);
632 	if (err)
633 		goto fail;
634 
635 	return 0;
636 
637  fail:
638 	tapdisk_free_queue(queue);
639 	return err;
640 }
641 
642 void
tapdisk_free_queue(struct tqueue * queue)643 tapdisk_free_queue(struct tqueue *queue)
644 {
645 	tapdisk_queue_free_io(queue);
646 
647 	free(queue->iocbs);
648 	queue->iocbs = NULL;
649 
650 	opio_free(&queue->opioctx);
651 }
652 
/* dump queue state and the deferred request list to the warning log */
void
tapdisk_debug_queue(struct tqueue *queue)
{
	struct tiocb *tiocb = queue->deferred.head;

	WARN("TAPDISK QUEUE:\n");
	/* NOTE(review): deferrals is a counter but is printed with
	 * PRIx64 (hex, no 0x prefix) — confirm this is intentional */
	WARN("size: %d, tio: %s, queued: %d, iocbs_pending: %d, "
	     "tiocbs_pending: %d, tiocbs_deferred: %d, deferrals: %"PRIx64"\n",
	     queue->size, queue->tio->name, queue->queued, queue->iocbs_pending,
	     queue->tiocbs_pending, queue->tiocbs_deferred, queue->deferrals);

	if (tiocb) {
		WARN("deferred:\n");
		for (; tiocb != NULL; tiocb = tiocb->next) {
			struct iocb *io = &tiocb->iocb;
			WARN("%s of %lu bytes at %lld\n",
			     (io->aio_lio_opcode == IO_CMD_PWRITE ?
			      "write" : "read"),
			     io->u.c.nbytes, io->u.c.offset);
		}
	}
}
675 
676 void
tapdisk_prep_tiocb(struct tiocb * tiocb,int fd,int rw,char * buf,size_t size,long long offset,td_queue_callback_t cb,void * arg)677 tapdisk_prep_tiocb(struct tiocb *tiocb, int fd, int rw, char *buf, size_t size,
678 		   long long offset, td_queue_callback_t cb, void *arg)
679 {
680 	struct iocb *iocb = &tiocb->iocb;
681 
682 	if (rw)
683 		io_prep_pwrite(iocb, fd, buf, size, offset);
684 	else
685 		io_prep_pread(iocb, fd, buf, size, offset);
686 
687 	iocb->data  = tiocb;
688 	tiocb->cb   = cb;
689 	tiocb->arg  = arg;
690 	tiocb->next = NULL;
691 }
692 
/* enqueue a prepared tiocb, deferring it when the queue is full */
void
tapdisk_queue_tiocb(struct tqueue *queue, struct tiocb *tiocb)
{
	if (tapdisk_queue_full(queue))
		defer_tiocb(queue, tiocb);
	else
		queue_tiocb(queue, tiocb);
}
701 
702 
703 /*
704  * fail_tiocbs may queue more tiocbs
705  */
706 int
tapdisk_submit_tiocbs(struct tqueue * queue)707 tapdisk_submit_tiocbs(struct tqueue *queue)
708 {
709 	return queue->tio->tio_submit(queue);
710 }
711 
/*
 * Submit repeatedly until the queue drains (deferred tiocbs can
 * refill it after each pass).  Returns the total iocbs submitted.
 */
int
tapdisk_submit_all_tiocbs(struct tqueue *queue)
{
	int total = 0;

	for (;;) {
		total += tapdisk_submit_tiocbs(queue);
		if (tapdisk_queue_empty(queue))
			break;
	}

	return total;
}
723 
/*
 * cancel_tiocbs may queue more tiocbs
 */
/* fail everything currently queued with -EIO; returns the count failed */
int
tapdisk_cancel_tiocbs(struct tqueue *queue)
{
	return cancel_tiocbs(queue, -EIO);
}
732 
/*
 * Cancel repeatedly until the queue drains (completion callbacks and
 * deferred tiocbs can refill it).  Returns the total tiocbs cancelled.
 */
int
tapdisk_cancel_all_tiocbs(struct tqueue *queue)
{
	int total = 0;

	for (;;) {
		total += tapdisk_cancel_tiocbs(queue);
		if (tapdisk_queue_empty(queue))
			break;
	}

	return total;
}
744