1 /*
2  * Copyright (c) 2025 Intel Corporation
3  *
4  * SPDX-License-Identifier: Apache-2.0
5  */
6 
7 #include <zephyr/kernel.h>
8 #include <zephyr/rtio/rtio.h>
9 #include <zephyr/logging/log.h>
10 
11 LOG_MODULE_REGISTER(main, LOG_LEVEL_DBG);
12 
/* Producer device state. Our producer is driven by a timer (interrupt)
 * but could be a thread as well.
 */
struct producer {
	/* Lock-free multi-producer/single-consumer queue holding pending
	 * requests (struct rtio_iodev_sqe nodes, linked via their .q member)
	 * pushed by producer_submit() and drained by producer_periodic().
	 */
	struct mpsc io_q;
};
17 
18 /* Our producer function, could be done in a thread or interrupt, here
19  * we are producing cycle count values from a timer interrupt.
20  */
producer_periodic(struct k_timer * timer)21 static void producer_periodic(struct k_timer *timer)
22 {
23 	struct producer *p = timer->user_data;
24 
25 	LOG_INF("producer %p trigger", (void *)p);
26 
27 	struct mpsc_node *n = mpsc_pop(&p->io_q);
28 
29 	if (n == NULL) {
30 		LOG_WRN("producer overflowed consumers");
31 		return;
32 	}
33 
34 	struct rtio_iodev_sqe *iodev_sqe = CONTAINER_OF(n, struct rtio_iodev_sqe, q);
35 
36 	/* Only accept read/rx requests */
37 	if (iodev_sqe->sqe.op != RTIO_OP_RX) {
38 		rtio_iodev_sqe_err(iodev_sqe, -EINVAL);
39 		return;
40 	}
41 
42 	/* Get the rx buffer with a minimum/maximum size pair to fill with the current time */
43 	uint8_t *buf = NULL;
44 	uint32_t buf_len = 0;
45 	int rc = rtio_sqe_rx_buf(iodev_sqe, sizeof(uint64_t), sizeof(uint64_t), &buf, &buf_len);
46 
47 	LOG_INF("buf %p, len %u", (void *)buf, buf_len);
48 
49 	if (rc < 0) {
50 		/* buffer wasn't available or too small */
51 		rtio_iodev_sqe_err(iodev_sqe, -ENOMEM);
52 		return;
53 	}
54 
55 	/* We now have a buffer we can produce data into and then signal back to the consumer */
56 	uint64_t *cycle_count = (uint64_t *)buf;
57 
58 	/* "Produce" a timestamp */
59 	*cycle_count = k_cycle_get_64();
60 
61 	/* Signal read has completed  */
62 	rtio_iodev_sqe_ok(iodev_sqe, 0);
63 }
64 
65 /* Accept incoming commands (e.g. read requests), could come from multiple sources
66  * so the only real safe thing to do here is put it into the lock free queue
67  */
producer_submit(struct rtio_iodev_sqe * iodev_sqe)68 static void producer_submit(struct rtio_iodev_sqe *iodev_sqe)
69 {
70 	struct mpsc *producer_ioq = iodev_sqe->sqe.iodev->data;
71 
72 	mpsc_push(producer_ioq, &iodev_sqe->q);
73 }
74 
/* Timer acting as our interrupt-driven producer "task"; its user_data is
 * pointed at producer_data in main() before the timer is started.
 */
K_TIMER_DEFINE(producer_tmr, producer_periodic, NULL);

/* Backing state (the mpsc request queue) shared by the iodev and timer */
static struct producer producer_data;

/* iodev vtable: only a submit hook is needed for this simple device */
const struct rtio_iodev_api producer_api = {.submit = producer_submit};

/* Setup our i/o device, akin to a file handle we can read from */
RTIO_IODEV_DEFINE(producer_iodev, &producer_api, &producer_data);

/* Setup our pair of queues for our consumer, with 1 submission and 1 completion available */
RTIO_DEFINE(rconsumer, 1, 1);
84 
consumer_loop(struct rtio * consumer,struct rtio_iodev * producer)85 int consumer_loop(struct rtio *consumer, struct rtio_iodev *producer)
86 {
87 	/* We can share memory with kernel space without any work, to share
88 	 * between usermode threads we'd need a k_mem_partition added to
89 	 * both domains instead
90 	 */
91 	uint64_t cycle_count;
92 
93 	/* Our read submission and completion pair */
94 	struct rtio_sqe read_sqe;
95 	struct rtio_cqe read_cqe;
96 	struct rtio_sqe *read_sqe_handle;
97 
98 	/* Helper that sets up the submission to be a read request, reading *directly*
99 	 * into the given buffer pointer without copying
100 	 */
101 	rtio_sqe_prep_read(&read_sqe, producer, RTIO_PRIO_NORM, (uint8_t *)&cycle_count,
102 			   sizeof(cycle_count), NULL);
103 
104 	/* We can automatically have this read request resubmitted for us */
105 	read_sqe.flags |= RTIO_SQE_MULTISHOT;
106 
107 	/* A syscall to copy the control structure (sqe) into kernel mode, and get a handle out
108 	 * so we can cancel it later if we want
109 	 */
110 	rtio_sqe_copy_in_get_handles(consumer, &read_sqe, &read_sqe_handle, 1);
111 
112 	/* A syscall to submit the queued up requests (there could be many) to all iodevs */
113 	rtio_submit(consumer, 0);
114 
115 	/* A consumer loop that waits for read completions in a single syscall
116 	 *
117 	 * This never ends but to end the loop we'd cancel the requests to read.
118 	 *
119 	 * NOTE: There could be multiple read requests out to multiple producers we could
120 	 * be waiting on!
121 	 */
122 	while (true) {
123 		/* A syscall to consume a completion, waiting forever for it to arrive */
124 		rtio_cqe_copy_out(consumer, &read_cqe, 1, K_FOREVER);
125 
126 		/* The read has been completed, and its safe to read the value until
127 		 * we attach and submit a request to read into it again
128 		 */
129 		LOG_INF("read result %d, cycle count is %llu", read_cqe.result, cycle_count);
130 	}
131 
132 	return 0;
133 }
134 
main(void)135 int main(void)
136 {
137 	/* init stuff */
138 	mpsc_init(&producer_data.io_q);
139 	producer_tmr.user_data = &producer_data;
140 
141 	/* Start our producer task (timer interrupt based) */
142 	k_timer_start(&producer_tmr, K_MSEC(100), K_MSEC(100));
143 
144 	/* We can enter usermode here with a little work to setup k_objects for the iodev
145 	 * and struct rtio context
146 	 * E.g.
147 	 * k_object_access_grant(&producer, k_current_get());
148 	 * k_object_access_grant(&consumer, k_current_get());
149 	 * k_thread_user_mode_enter(consumer_loop, &producer, &consumer, NULL);
150 	 */
151 
152 	return consumer_loop(&rconsumer, &producer_iodev);
153 }
154