1 /*
2 * Copyright (c) 2025 Intel Corporation
3 *
4 * SPDX-License-Identifier: Apache-2.0
5 */
6
#include <string.h>

#include <zephyr/kernel.h>
#include <zephyr/rtio/rtio.h>
#include <zephyr/logging/log.h>
10
11 LOG_MODULE_REGISTER(main, LOG_LEVEL_DBG);
12
13 /* Our producer is a timer (interrupt) but could be a thread as well */
14 struct producer {
15 struct mpsc io_q;
16 };
17
18 /* Our producer function, could be done in a thread or interrupt, here
19 * we are producing cycle count values from a timer interrupt.
20 */
producer_periodic(struct k_timer * timer)21 static void producer_periodic(struct k_timer *timer)
22 {
23 struct producer *p = timer->user_data;
24
25 LOG_INF("producer %p trigger", (void *)p);
26
27 struct mpsc_node *n = mpsc_pop(&p->io_q);
28
29 if (n == NULL) {
30 LOG_WRN("producer overflowed consumers");
31 return;
32 }
33
34 struct rtio_iodev_sqe *iodev_sqe = CONTAINER_OF(n, struct rtio_iodev_sqe, q);
35
36 /* Only accept read/rx requests */
37 if (iodev_sqe->sqe.op != RTIO_OP_RX) {
38 rtio_iodev_sqe_err(iodev_sqe, -EINVAL);
39 return;
40 }
41
42 /* Get the rx buffer with a minimum/maximum size pair to fill with the current time */
43 uint8_t *buf = NULL;
44 uint32_t buf_len = 0;
45 int rc = rtio_sqe_rx_buf(iodev_sqe, sizeof(uint64_t), sizeof(uint64_t), &buf, &buf_len);
46
47 LOG_INF("buf %p, len %u", (void *)buf, buf_len);
48
49 if (rc < 0) {
50 /* buffer wasn't available or too small */
51 rtio_iodev_sqe_err(iodev_sqe, -ENOMEM);
52 return;
53 }
54
55 /* We now have a buffer we can produce data into and then signal back to the consumer */
56 uint64_t *cycle_count = (uint64_t *)buf;
57
58 /* "Produce" a timestamp */
59 *cycle_count = k_cycle_get_64();
60
61 /* Signal read has completed */
62 rtio_iodev_sqe_ok(iodev_sqe, 0);
63 }
64
65 /* Accept incoming commands (e.g. read requests), could come from multiple sources
66 * so the only real safe thing to do here is put it into the lock free queue
67 */
producer_submit(struct rtio_iodev_sqe * iodev_sqe)68 static void producer_submit(struct rtio_iodev_sqe *iodev_sqe)
69 {
70 struct mpsc *producer_ioq = iodev_sqe->sqe.iodev->data;
71
72 mpsc_push(producer_ioq, &iodev_sqe->q);
73 }
74
75 K_TIMER_DEFINE(producer_tmr, producer_periodic, NULL);
76 static struct producer producer_data;
77 const struct rtio_iodev_api producer_api = {.submit = producer_submit};
78
79 /* Setup our i/o device, akin to a file handle we can read from */
80 RTIO_IODEV_DEFINE(producer_iodev, &producer_api, &producer_data);
81
82 /* Setup our pair of queues for our consumer, with 1 submission and 1 completion available */
83 RTIO_DEFINE(rconsumer, 1, 1);
84
consumer_loop(struct rtio * consumer,struct rtio_iodev * producer)85 int consumer_loop(struct rtio *consumer, struct rtio_iodev *producer)
86 {
87 /* We can share memory with kernel space without any work, to share
88 * between usermode threads we'd need a k_mem_partition added to
89 * both domains instead
90 */
91 uint64_t cycle_count;
92
93 /* Our read submission and completion pair */
94 struct rtio_sqe read_sqe;
95 struct rtio_cqe read_cqe;
96 struct rtio_sqe *read_sqe_handle;
97
98 /* Helper that sets up the submission to be a read request, reading *directly*
99 * into the given buffer pointer without copying
100 */
101 rtio_sqe_prep_read(&read_sqe, producer, RTIO_PRIO_NORM, (uint8_t *)&cycle_count,
102 sizeof(cycle_count), NULL);
103
104 /* We can automatically have this read request resubmitted for us */
105 read_sqe.flags |= RTIO_SQE_MULTISHOT;
106
107 /* A syscall to copy the control structure (sqe) into kernel mode, and get a handle out
108 * so we can cancel it later if we want
109 */
110 rtio_sqe_copy_in_get_handles(consumer, &read_sqe, &read_sqe_handle, 1);
111
112 /* A syscall to submit the queued up requests (there could be many) to all iodevs */
113 rtio_submit(consumer, 0);
114
115 /* A consumer loop that waits for read completions in a single syscall
116 *
117 * This never ends but to end the loop we'd cancel the requests to read.
118 *
119 * NOTE: There could be multiple read requests out to multiple producers we could
120 * be waiting on!
121 */
122 while (true) {
123 /* A syscall to consume a completion, waiting forever for it to arrive */
124 rtio_cqe_copy_out(consumer, &read_cqe, 1, K_FOREVER);
125
126 /* The read has been completed, and its safe to read the value until
127 * we attach and submit a request to read into it again
128 */
129 LOG_INF("read result %d, cycle count is %llu", read_cqe.result, cycle_count);
130 }
131
132 return 0;
133 }
134
main(void)135 int main(void)
136 {
137 /* init stuff */
138 mpsc_init(&producer_data.io_q);
139 producer_tmr.user_data = &producer_data;
140
141 /* Start our producer task (timer interrupt based) */
142 k_timer_start(&producer_tmr, K_MSEC(100), K_MSEC(100));
143
144 /* We can enter usermode here with a little work to setup k_objects for the iodev
145 * and struct rtio context
146 * E.g.
147 * k_object_access_grant(&producer, k_current_get());
148 * k_object_access_grant(&consumer, k_current_get());
149 * k_thread_user_mode_enter(consumer_loop, &producer, &consumer, NULL);
150 */
151
152 return consumer_loop(&rconsumer, &producer_iodev);
153 }
154