1 /*
2 * Copyright (c) 2024 Måns Ansgariusson <mansgariusson@gmail.com>
3 *
4 * SPDX-License-Identifier: Apache-2.0
5 */
6 #include <zephyr/init.h>
7 #include <zephyr/kernel.h>
8 #include <zephyr/internal/syscall_handler.h>
9 #include <ksched.h>
10 #include <kthread.h>
11 #include <wait_q.h>
12
13 #ifdef CONFIG_OBJ_CORE_PIPE
14 static struct k_obj_type obj_type_pipe;
15 #endif /* CONFIG_OBJ_CORE_PIPE */
16
pipe_closed(struct k_pipe * pipe)17 static inline bool pipe_closed(struct k_pipe *pipe)
18 {
19 return (pipe->flags & PIPE_FLAG_OPEN) == 0;
20 }
21
pipe_resetting(struct k_pipe * pipe)22 static inline bool pipe_resetting(struct k_pipe *pipe)
23 {
24 return (pipe->flags & PIPE_FLAG_RESET) != 0;
25 }
26
pipe_full(struct k_pipe * pipe)27 static inline bool pipe_full(struct k_pipe *pipe)
28 {
29 return ring_buf_space_get(&pipe->buf) == 0;
30 }
31
pipe_empty(struct k_pipe * pipe)32 static inline bool pipe_empty(struct k_pipe *pipe)
33 {
34 return ring_buf_is_empty(&pipe->buf);
35 }
36
wait_for(_wait_q_t * waitq,struct k_pipe * pipe,k_spinlock_key_t * key,k_timepoint_t time_limit,bool * need_resched)37 static int wait_for(_wait_q_t *waitq, struct k_pipe *pipe, k_spinlock_key_t *key,
38 k_timepoint_t time_limit, bool *need_resched)
39 {
40 k_timeout_t timeout = sys_timepoint_timeout(time_limit);
41 int rc;
42
43 if (K_TIMEOUT_EQ(timeout, K_NO_WAIT)) {
44 return -EAGAIN;
45 }
46
47 pipe->waiting++;
48 *need_resched = false;
49 if (waitq == &pipe->space) {
50 SYS_PORT_TRACING_OBJ_FUNC_BLOCKING(k_pipe, write, pipe, timeout);
51 } else {
52 SYS_PORT_TRACING_OBJ_FUNC_BLOCKING(k_pipe, read, pipe, timeout);
53 }
54 rc = z_pend_curr(&pipe->lock, *key, waitq, timeout);
55 *key = k_spin_lock(&pipe->lock);
56 pipe->waiting--;
57 if (unlikely(pipe_resetting(pipe))) {
58 if (pipe->waiting == 0) {
59 pipe->flags &= ~PIPE_FLAG_RESET;
60 }
61 rc = -ECANCELED;
62 }
63
64 return rc;
65 }
66
z_impl_k_pipe_init(struct k_pipe * pipe,uint8_t * buffer,size_t buffer_size)67 void z_impl_k_pipe_init(struct k_pipe *pipe, uint8_t *buffer, size_t buffer_size)
68 {
69 ring_buf_init(&pipe->buf, buffer_size, buffer);
70 pipe->flags = PIPE_FLAG_OPEN;
71 pipe->waiting = 0;
72
73 pipe->lock = (struct k_spinlock){};
74 z_waitq_init(&pipe->data);
75 z_waitq_init(&pipe->space);
76 k_object_init(pipe);
77
78 #ifdef CONFIG_POLL
79 sys_dlist_init(&pipe->poll_events);
80 #endif /* CONFIG_POLL */
81 #ifdef CONFIG_OBJ_CORE_PIPE
82 k_obj_core_init_and_link(K_OBJ_CORE(pipe), &obj_type_pipe);
83 #endif /* CONFIG_OBJ_CORE_PIPE */
84 SYS_PORT_TRACING_OBJ_INIT(k_pipe, pipe, buffer, buffer_size);
85 }
86
87 struct pipe_buf_spec {
88 uint8_t * const data;
89 const size_t len;
90 size_t used;
91 };
92
copy_to_pending_readers(struct k_pipe * pipe,bool * need_resched,const uint8_t * data,size_t len)93 static size_t copy_to_pending_readers(struct k_pipe *pipe, bool *need_resched,
94 const uint8_t *data, size_t len)
95 {
96 struct k_thread *reader = NULL;
97 struct pipe_buf_spec *reader_buf;
98 size_t copy_size, written = 0;
99
100 /*
101 * Attempt a direct data copy to waiting readers if any.
102 * The copy has to be done under the scheduler lock to ensure all the
103 * needed data is copied to the target thread whose buffer spec lives
104 * on that thread's stack, and then the thread unpended only if it
105 * received all the data it wanted, without racing with a potential
106 * thread timeout/cancellation event.
107 */
108 do {
109 LOCK_SCHED_SPINLOCK {
110 reader = _priq_wait_best(&pipe->data.waitq);
111 if (reader == NULL) {
112 K_SPINLOCK_BREAK;
113 }
114
115 reader_buf = reader->base.swap_data;
116 copy_size = MIN(len - written,
117 reader_buf->len - reader_buf->used);
118 memcpy(&reader_buf->data[reader_buf->used],
119 &data[written], copy_size);
120 written += copy_size;
121 reader_buf->used += copy_size;
122
123 if (reader_buf->used < reader_buf->len) {
124 /* This reader wants more: don't unpend. */
125 reader = NULL;
126 } else {
127 /*
128 * This reader has received all the data
129 * it was waiting for: wake it up with
130 * the scheduler lock still held.
131 */
132 unpend_thread_no_timeout(reader);
133 z_abort_thread_timeout(reader);
134 }
135 }
136 if (reader != NULL) {
137 /* rest of thread wake-up outside the scheduler lock */
138 z_thread_return_value_set_with_data(reader, 0, NULL);
139 z_ready_thread(reader);
140 *need_resched = true;
141 }
142 } while (reader != NULL && written < len);
143
144 return written;
145 }
146
z_impl_k_pipe_write(struct k_pipe * pipe,const uint8_t * data,size_t len,k_timeout_t timeout)147 int z_impl_k_pipe_write(struct k_pipe *pipe, const uint8_t *data, size_t len, k_timeout_t timeout)
148 {
149 int rc;
150 size_t written = 0;
151 k_timepoint_t end = sys_timepoint_calc(timeout);
152 k_spinlock_key_t key = k_spin_lock(&pipe->lock);
153 bool need_resched = false;
154
155 SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_pipe, write, pipe, data, len, timeout);
156
157 if (unlikely(pipe_resetting(pipe))) {
158 rc = -ECANCELED;
159 goto exit;
160 }
161
162 for (;;) {
163 if (unlikely(pipe_closed(pipe))) {
164 rc = -EPIPE;
165 break;
166 }
167
168 if (pipe_empty(pipe)) {
169 if (IS_ENABLED(CONFIG_KERNEL_COHERENCE)) {
170 /*
171 * Systems that enabled this option don't have
172 * their stacks in coherent memory. Given our
173 * pipe_buf_spec is stored on the stack, and
174 * readers may also have their destination
175 * buffer on their stack too, it is not worth
176 * supporting direct-to-readers copy with them.
177 * Simply wake up all pending readers instead.
178 */
179 need_resched = z_sched_wake_all(&pipe->data, 0, NULL);
180 } else if (pipe->waiting != 0) {
181 written += copy_to_pending_readers(pipe, &need_resched,
182 &data[written],
183 len - written);
184 if (written >= len) {
185 rc = written;
186 break;
187 }
188 }
189 }
190
191 #ifdef CONFIG_POLL
192 need_resched |= z_handle_obj_poll_events(&pipe->poll_events,
193 K_POLL_STATE_PIPE_DATA_AVAILABLE);
194 #endif /* CONFIG_POLL */
195
196 written += ring_buf_put(&pipe->buf, &data[written], len - written);
197 if (likely(written == len)) {
198 rc = written;
199 break;
200 }
201
202 rc = wait_for(&pipe->space, pipe, &key, end, &need_resched);
203 if (rc != 0) {
204 if (rc == -EAGAIN) {
205 rc = written ? written : -EAGAIN;
206 }
207 break;
208 }
209 }
210 exit:
211 SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, write, pipe, rc);
212 if (need_resched) {
213 z_reschedule(&pipe->lock, key);
214 } else {
215 k_spin_unlock(&pipe->lock, key);
216 }
217 return rc;
218 }
219
z_impl_k_pipe_read(struct k_pipe * pipe,uint8_t * data,size_t len,k_timeout_t timeout)220 int z_impl_k_pipe_read(struct k_pipe *pipe, uint8_t *data, size_t len, k_timeout_t timeout)
221 {
222 struct pipe_buf_spec buf = { data, len, 0 };
223 int rc;
224 k_timepoint_t end = sys_timepoint_calc(timeout);
225 k_spinlock_key_t key = k_spin_lock(&pipe->lock);
226 bool need_resched = false;
227
228 SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_pipe, read, pipe, data, len, timeout);
229
230 if (unlikely(pipe_resetting(pipe))) {
231 rc = -ECANCELED;
232 goto exit;
233 }
234
235 for (;;) {
236 if (pipe_full(pipe)) {
237 /* One or more pending writers may exist. */
238 need_resched = z_sched_wake_all(&pipe->space, 0, NULL);
239 }
240
241 buf.used += ring_buf_get(&pipe->buf, &data[buf.used], len - buf.used);
242 if (likely(buf.used == len)) {
243 rc = buf.used;
244 break;
245 }
246
247 if (unlikely(pipe_closed(pipe))) {
248 rc = buf.used ? buf.used : -EPIPE;
249 break;
250 }
251
252 /* provide our "direct copy" info to potential writers */
253 _current->base.swap_data = &buf;
254
255 rc = wait_for(&pipe->data, pipe, &key, end, &need_resched);
256 if (rc != 0) {
257 if (rc == -EAGAIN) {
258 rc = buf.used ? buf.used : -EAGAIN;
259 }
260 break;
261 }
262 }
263 exit:
264 SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, read, pipe, rc);
265 if (need_resched) {
266 z_reschedule(&pipe->lock, key);
267 } else {
268 k_spin_unlock(&pipe->lock, key);
269 }
270 return rc;
271 }
272
z_impl_k_pipe_reset(struct k_pipe * pipe)273 void z_impl_k_pipe_reset(struct k_pipe *pipe)
274 {
275 SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_pipe, reset, pipe);
276 K_SPINLOCK(&pipe->lock) {
277 ring_buf_reset(&pipe->buf);
278 if (likely(pipe->waiting != 0)) {
279 pipe->flags |= PIPE_FLAG_RESET;
280 z_sched_wake_all(&pipe->data, 0, NULL);
281 z_sched_wake_all(&pipe->space, 0, NULL);
282 }
283 }
284 SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, reset, pipe);
285 }
286
z_impl_k_pipe_close(struct k_pipe * pipe)287 void z_impl_k_pipe_close(struct k_pipe *pipe)
288 {
289 SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_pipe, close, pipe);
290 K_SPINLOCK(&pipe->lock) {
291 pipe->flags = 0;
292 z_sched_wake_all(&pipe->data, 0, NULL);
293 z_sched_wake_all(&pipe->space, 0, NULL);
294 }
295 SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, close, pipe);
296 }
297
298 #ifdef CONFIG_USERSPACE
z_vrfy_k_pipe_init(struct k_pipe * pipe,uint8_t * buffer,size_t buffer_size)299 void z_vrfy_k_pipe_init(struct k_pipe *pipe, uint8_t *buffer, size_t buffer_size)
300 {
301 K_OOPS(K_SYSCALL_OBJ(pipe, K_OBJ_PIPE));
302 K_OOPS(K_SYSCALL_MEMORY_WRITE(buffer, buffer_size));
303
304 z_impl_k_pipe_init(pipe, buffer, buffer_size);
305 }
306 #include <zephyr/syscalls/k_pipe_init_mrsh.c>
307
z_vrfy_k_pipe_read(struct k_pipe * pipe,uint8_t * data,size_t len,k_timeout_t timeout)308 int z_vrfy_k_pipe_read(struct k_pipe *pipe, uint8_t *data, size_t len, k_timeout_t timeout)
309 {
310 K_OOPS(K_SYSCALL_OBJ(pipe, K_OBJ_PIPE));
311 K_OOPS(K_SYSCALL_MEMORY_WRITE(data, len));
312
313 return z_impl_k_pipe_read(pipe, data, len, timeout);
314 }
315 #include <zephyr/syscalls/k_pipe_read_mrsh.c>
316
z_vrfy_k_pipe_write(struct k_pipe * pipe,const uint8_t * data,size_t len,k_timeout_t timeout)317 int z_vrfy_k_pipe_write(struct k_pipe *pipe, const uint8_t *data, size_t len, k_timeout_t timeout)
318 {
319 K_OOPS(K_SYSCALL_OBJ(pipe, K_OBJ_PIPE));
320 K_OOPS(K_SYSCALL_MEMORY_READ(data, len));
321
322 return z_impl_k_pipe_write(pipe, data, len, timeout);
323 }
324 #include <zephyr/syscalls/k_pipe_write_mrsh.c>
325
z_vrfy_k_pipe_reset(struct k_pipe * pipe)326 void z_vrfy_k_pipe_reset(struct k_pipe *pipe)
327 {
328 K_OOPS(K_SYSCALL_OBJ(pipe, K_OBJ_PIPE));
329 z_impl_k_pipe_reset(pipe);
330 }
331 #include <zephyr/syscalls/k_pipe_reset_mrsh.c>
332
z_vrfy_k_pipe_close(struct k_pipe * pipe)333 void z_vrfy_k_pipe_close(struct k_pipe *pipe)
334 {
335 K_OOPS(K_SYSCALL_OBJ(pipe, K_OBJ_PIPE));
336 z_impl_k_pipe_close(pipe);
337 }
338 #include <zephyr/syscalls/k_pipe_close_mrsh.c>
339 #endif /* CONFIG_USERSPACE */
340
341 #ifdef CONFIG_OBJ_CORE_PIPE
init_pipe_obj_core_list(void)342 static int init_pipe_obj_core_list(void)
343 {
344 /* Initialize pipe object type */
345 z_obj_type_init(&obj_type_pipe, K_OBJ_TYPE_PIPE_ID,
346 offsetof(struct k_pipe, obj_core));
347
348 /* Initialize and link statically defined pipes */
349 STRUCT_SECTION_FOREACH(k_pipe, pipe) {
350 k_obj_core_init_and_link(K_OBJ_CORE(pipe), &obj_type_pipe);
351 }
352
353 return 0;
354 }
355
356 SYS_INIT(init_pipe_obj_core_list, PRE_KERNEL_1,
357 CONFIG_KERNEL_INIT_PRIORITY_OBJECTS);
358 #endif /* CONFIG_OBJ_CORE_PIPE */
359