Lines Matching refs:queue
59 queue_tiocb(struct tqueue *queue, struct tiocb *tiocb) in queue_tiocb() argument
63 if (queue->queued) { in queue_tiocb()
65 queue->iocbs[queue->queued - 1]->data; in queue_tiocb()
69 queue->iocbs[queue->queued++] = iocb; in queue_tiocb()
73 deferred_tiocbs(struct tqueue *queue) in deferred_tiocbs() argument
75 return (queue->deferred.head != NULL); in deferred_tiocbs()
79 defer_tiocb(struct tqueue *queue, struct tiocb *tiocb) in defer_tiocb() argument
81 struct tlist *list = &queue->deferred; in defer_tiocb()
88 queue->tiocbs_deferred++; in defer_tiocb()
89 queue->deferrals++; in defer_tiocb()
93 queue_deferred_tiocb(struct tqueue *queue) in queue_deferred_tiocb() argument
95 struct tlist *list = &queue->deferred; in queue_deferred_tiocb()
104 queue_tiocb(queue, tiocb); in queue_deferred_tiocb()
105 queue->tiocbs_deferred--; in queue_deferred_tiocb()
110 queue_deferred_tiocbs(struct tqueue *queue) in queue_deferred_tiocbs() argument
112 while (!tapdisk_queue_full(queue) && deferred_tiocbs(queue)) in queue_deferred_tiocbs()
113 queue_deferred_tiocb(queue); in queue_deferred_tiocbs()
120 complete_tiocb(struct tqueue *queue, struct tiocb *tiocb, unsigned long res) in complete_tiocb() argument
136 cancel_tiocbs(struct tqueue *queue, int err) in cancel_tiocbs() argument
141 if (!queue->queued) in cancel_tiocbs()
150 tiocb = queue->iocbs[0]->data; in cancel_tiocbs()
151 queued = queue->queued; in cancel_tiocbs()
152 queue->queued = 0; in cancel_tiocbs()
155 complete_tiocb(queue, tiocb, err); in cancel_tiocbs()
161 fail_tiocbs(struct tqueue *queue, int succeeded, int total, int err) in fail_tiocbs() argument
168 queue->queued = io_expand_iocbs(&queue->opioctx, in fail_tiocbs()
169 queue->iocbs, succeeded, total); in fail_tiocbs()
171 return cancel_tiocbs(queue, err); in fail_tiocbs()
183 tapdisk_rwio_destroy(struct tqueue *queue) in tapdisk_rwio_destroy() argument
185 struct rwio *rwio = queue->tio_data; in tapdisk_rwio_destroy()
194 tapdisk_rwio_setup(struct tqueue *queue, int size) in tapdisk_rwio_setup() argument
196 struct rwio *rwio = queue->tio_data; in tapdisk_rwio_setup()
226 tapdisk_rwio_submit(struct tqueue *queue) in tapdisk_rwio_submit() argument
228 struct rwio *rwio = queue->tio_data; in tapdisk_rwio_submit()
234 if (!queue->queued) in tapdisk_rwio_submit()
237 tapdisk_filter_iocbs(queue->filter, queue->iocbs, queue->queued); in tapdisk_rwio_submit()
238 merged = io_merge(&queue->opioctx, queue->iocbs, queue->queued); in tapdisk_rwio_submit()
240 queue->queued = 0; in tapdisk_rwio_submit()
244 iocb = queue->iocbs[i]; in tapdisk_rwio_submit()
249 split = io_split(&queue->opioctx, rwio->aio_events, merged); in tapdisk_rwio_submit()
250 tapdisk_filter_events(queue->filter, rwio->aio_events, split); in tapdisk_rwio_submit()
255 complete_tiocb(queue, tiocb, ep->res); in tapdisk_rwio_submit()
258 queue_deferred_tiocbs(queue); in tapdisk_rwio_submit()
298 tapdisk_lio_destroy_aio(struct tqueue *queue) in tapdisk_lio_destroy_aio() argument
300 struct lio *lio = queue->tio_data; in tapdisk_lio_destroy_aio()
314 __lio_setup_aio_poll(struct tqueue *queue, int qlen) in __lio_setup_aio_poll() argument
316 struct lio *lio = queue->tio_data; in __lio_setup_aio_poll()
345 __lio_setup_aio_eventfd(struct tqueue *queue, int qlen) in __lio_setup_aio_eventfd() argument
347 struct lio *lio = queue->tio_data; in __lio_setup_aio_eventfd()
366 tapdisk_lio_setup_aio(struct tqueue *queue, int qlen) in tapdisk_lio_setup_aio() argument
368 struct lio *lio = queue->tio_data; in tapdisk_lio_setup_aio()
381 err = __lio_setup_aio_eventfd(queue, qlen); in tapdisk_lio_setup_aio()
383 err = __lio_setup_aio_poll(queue, qlen); in tapdisk_lio_setup_aio()
400 tapdisk_lio_destroy(struct tqueue *queue) in tapdisk_lio_destroy() argument
402 struct lio *lio = queue->tio_data; in tapdisk_lio_destroy()
412 tapdisk_lio_destroy_aio(queue); in tapdisk_lio_destroy()
421 tapdisk_lio_set_eventfd(struct tqueue *queue, int n, struct iocb **iocbs) in tapdisk_lio_set_eventfd() argument
423 struct lio *lio = queue->tio_data; in tapdisk_lio_set_eventfd()
432 tapdisk_lio_ack_event(struct tqueue *queue) in tapdisk_lio_ack_event() argument
434 struct lio *lio = queue->tio_data; in tapdisk_lio_ack_event()
444 struct tqueue *queue = private; in tapdisk_lio_event() local
451 tapdisk_lio_ack_event(queue); in tapdisk_lio_event()
453 lio = queue->tio_data; in tapdisk_lio_event()
455 queue->size, lio->aio_events, NULL); in tapdisk_lio_event()
456 split = io_split(&queue->opioctx, lio->aio_events, ret); in tapdisk_lio_event()
457 tapdisk_filter_events(queue->filter, lio->aio_events, split); in tapdisk_lio_event()
461 queue->iocbs_pending -= ret; in tapdisk_lio_event()
462 queue->tiocbs_pending -= split; in tapdisk_lio_event()
467 complete_tiocb(queue, tiocb, ep->res); in tapdisk_lio_event()
470 queue_deferred_tiocbs(queue); in tapdisk_lio_event()
474 tapdisk_lio_setup(struct tqueue *queue, int qlen) in tapdisk_lio_setup() argument
476 struct lio *lio = queue->tio_data; in tapdisk_lio_setup()
482 err = tapdisk_lio_setup_aio(queue, qlen); in tapdisk_lio_setup()
490 queue); in tapdisk_lio_setup()
504 tapdisk_lio_destroy(queue); in tapdisk_lio_setup()
509 tapdisk_lio_submit(struct tqueue *queue) in tapdisk_lio_submit() argument
511 struct lio *lio = queue->tio_data; in tapdisk_lio_submit()
514 if (!queue->queued) in tapdisk_lio_submit()
517 tapdisk_filter_iocbs(queue->filter, queue->iocbs, queue->queued); in tapdisk_lio_submit()
518 merged = io_merge(&queue->opioctx, queue->iocbs, queue->queued); in tapdisk_lio_submit()
519 tapdisk_lio_set_eventfd(queue, merged, queue->iocbs); in tapdisk_lio_submit()
520 submitted = io_submit(lio->aio_ctx, merged, queue->iocbs); in tapdisk_lio_submit()
523 queue->queued, merged, submitted); in tapdisk_lio_submit()
531 queue->iocbs_pending += submitted; in tapdisk_lio_submit()
532 queue->tiocbs_pending += queue->queued; in tapdisk_lio_submit()
533 queue->queued = 0; in tapdisk_lio_submit()
536 queue->tiocbs_pending -= in tapdisk_lio_submit()
537 fail_tiocbs(queue, submitted, merged, err); in tapdisk_lio_submit()
551 tapdisk_queue_free_io(struct tqueue *queue) in tapdisk_queue_free_io() argument
553 if (queue->tio) { in tapdisk_queue_free_io()
554 if (queue->tio->tio_destroy) in tapdisk_queue_free_io()
555 queue->tio->tio_destroy(queue); in tapdisk_queue_free_io()
556 queue->tio = NULL; in tapdisk_queue_free_io()
559 if (queue->tio_data) { in tapdisk_queue_free_io()
560 free(queue->tio_data); in tapdisk_queue_free_io()
561 queue->tio_data = NULL; in tapdisk_queue_free_io()
566 tapdisk_queue_init_io(struct tqueue *queue, int drv) in tapdisk_queue_init_io() argument
583 queue->tio_data = calloc(1, tio->data_size); in tapdisk_queue_init_io()
584 if (!queue->tio_data) { in tapdisk_queue_init_io()
590 queue->tio = tio; in tapdisk_queue_init_io()
593 err = tio->tio_setup(queue, queue->size); in tapdisk_queue_init_io()
603 tapdisk_queue_free_io(queue); in tapdisk_queue_init_io()
608 tapdisk_init_queue(struct tqueue *queue, int size, in tapdisk_init_queue() argument
613 memset(queue, 0, sizeof(struct tqueue)); in tapdisk_init_queue()
615 queue->size = size; in tapdisk_init_queue()
616 queue->filter = filter; in tapdisk_init_queue()
621 err = tapdisk_queue_init_io(queue, drv); in tapdisk_init_queue()
625 queue->iocbs = calloc(size, sizeof(struct iocb *)); in tapdisk_init_queue()
626 if (!queue->iocbs) { in tapdisk_init_queue()
631 err = opio_init(&queue->opioctx, size); in tapdisk_init_queue()
638 tapdisk_free_queue(queue); in tapdisk_init_queue()
643 tapdisk_free_queue(struct tqueue *queue) in tapdisk_free_queue() argument
645 tapdisk_queue_free_io(queue); in tapdisk_free_queue()
647 free(queue->iocbs); in tapdisk_free_queue()
648 queue->iocbs = NULL; in tapdisk_free_queue()
650 opio_free(&queue->opioctx); in tapdisk_free_queue()
654 tapdisk_debug_queue(struct tqueue *queue) in tapdisk_debug_queue() argument
656 struct tiocb *tiocb = queue->deferred.head; in tapdisk_debug_queue()
661 queue->size, queue->tio->name, queue->queued, queue->iocbs_pending, in tapdisk_debug_queue()
662 queue->tiocbs_pending, queue->tiocbs_deferred, queue->deferrals); in tapdisk_debug_queue()
694 tapdisk_queue_tiocb(struct tqueue *queue, struct tiocb *tiocb) in tapdisk_queue_tiocb() argument
696 if (!tapdisk_queue_full(queue)) in tapdisk_queue_tiocb()
697 queue_tiocb(queue, tiocb); in tapdisk_queue_tiocb()
699 defer_tiocb(queue, tiocb); in tapdisk_queue_tiocb()
707 tapdisk_submit_tiocbs(struct tqueue *queue) in tapdisk_submit_tiocbs() argument
709 return queue->tio->tio_submit(queue); in tapdisk_submit_tiocbs()
713 tapdisk_submit_all_tiocbs(struct tqueue *queue) in tapdisk_submit_all_tiocbs() argument
718 submitted += tapdisk_submit_tiocbs(queue); in tapdisk_submit_all_tiocbs()
719 } while (!tapdisk_queue_empty(queue)); in tapdisk_submit_all_tiocbs()
728 tapdisk_cancel_tiocbs(struct tqueue *queue) in tapdisk_cancel_tiocbs() argument
730 return cancel_tiocbs(queue, -EIO); in tapdisk_cancel_tiocbs()
734 tapdisk_cancel_all_tiocbs(struct tqueue *queue) in tapdisk_cancel_all_tiocbs() argument
739 cancelled += tapdisk_cancel_tiocbs(queue); in tapdisk_cancel_all_tiocbs()
740 } while (!tapdisk_queue_empty(queue)); in tapdisk_cancel_all_tiocbs()