/*
 * Copyright (c) 2024 Måns Ansgariusson <mansgariusson@gmail.com>
 *
 * SPDX-License-Identifier: Apache-2.0
 */
#include <zephyr/init.h>
#include <zephyr/kernel.h>
#include <zephyr/internal/syscall_handler.h>
#include <ksched.h>
#include <kthread.h>
#include <wait_q.h>

#ifdef CONFIG_OBJ_CORE_PIPE
static struct k_obj_type obj_type_pipe;
#endif /* CONFIG_OBJ_CORE_PIPE */

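/*
 * Small state predicates. Their callers all evaluate them with
 * pipe->lock held; they exist to keep the conditions in the
 * read/write loops below easy to follow.
 */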
static inline bool pipe_closed(struct k_pipe *pipe)
{
	return (pipe->flags & PIPE_FLAG_OPEN) == 0;
}

static inline bool pipe_resetting(struct k_pipe *pipe)
{
	return (pipe->flags & PIPE_FLAG_RESET) != 0;
}

static inline bool pipe_full(struct k_pipe *pipe)
{
	return ring_buf_space_get(&pipe->buf) == 0;
}

static inline bool pipe_empty(struct k_pipe *pipe)
{
	return ring_buf_is_empty(&pipe->buf);
}

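/*
 * Pend the current thread on @a waitq until it is woken, the
 * @a time_limit expires, or the pipe is reset.
 *
 * Called with pipe->lock held; the lock is released while pending and
 * re-acquired before returning, with *key updated accordingly.
 *
 * Returns 0 on a normal wake-up, -EAGAIN if the timeout is K_NO_WAIT
 * or has expired, and -ECANCELED if the pipe was reset while waiting.
 */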
static int wait_for(_wait_q_t *waitq, struct k_pipe *pipe, k_spinlock_key_t *key,
		    k_timepoint_t time_limit, bool *need_resched)
{
	k_timeout_t timeout = sys_timepoint_timeout(time_limit);
	int rc;

	if (K_TIMEOUT_EQ(timeout, K_NO_WAIT)) {
		return -EAGAIN;
	}

	pipe->waiting++;
	*need_resched = false;
	if (waitq == &pipe->space) {
		SYS_PORT_TRACING_OBJ_FUNC_BLOCKING(k_pipe, write, pipe, timeout);
	} else {
		SYS_PORT_TRACING_OBJ_FUNC_BLOCKING(k_pipe, read, pipe, timeout);
	}
	rc = z_pend_curr(&pipe->lock, *key, waitq, timeout);
	*key = k_spin_lock(&pipe->lock);
	pipe->waiting--;
	if (unlikely(pipe_resetting(pipe))) {
		/* the last waiter out clears the reset flag */
		if (pipe->waiting == 0) {
			pipe->flags &= ~PIPE_FLAG_RESET;
		}
		rc = -ECANCELED;
	}

	return rc;
}

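/*
 * Initialize a pipe over a caller-supplied ring buffer. The pipe
 * starts out open, with no waiters and empty wait queues.
 */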
void z_impl_k_pipe_init(struct k_pipe *pipe, uint8_t *buffer, size_t buffer_size)
{
	ring_buf_init(&pipe->buf, buffer_size, buffer);
	pipe->flags = PIPE_FLAG_OPEN;
	pipe->waiting = 0;

	pipe->lock = (struct k_spinlock){};
	z_waitq_init(&pipe->data);
	z_waitq_init(&pipe->space);
	k_object_init(pipe);

#ifdef CONFIG_POLL
	sys_dlist_init(&pipe->poll_events);
#endif /* CONFIG_POLL */
#ifdef CONFIG_OBJ_CORE_PIPE
	k_obj_core_init_and_link(K_OBJ_CORE(pipe), &obj_type_pipe);
#endif /* CONFIG_OBJ_CORE_PIPE */
	SYS_PORT_TRACING_OBJ_INIT(k_pipe, pipe, buffer, buffer_size);
}

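/*
 * Destination buffer descriptor for the direct-to-reader copy path.
 * A pending reader stashes a pointer to this (stack-resident) struct
 * in its swap_data so that writers can fill the buffer in place.
 */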
struct pipe_buf_spec {
	uint8_t * const data;
	const size_t len;
	size_t used;
};

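/*
 * Copy up to @a len bytes directly into the buffers of threads pending
 * on the pipe's data wait queue, bypassing the ring buffer. Returns the
 * number of bytes consumed from @a data.
 */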
static size_t copy_to_pending_readers(struct k_pipe *pipe, bool *need_resched,
				      const uint8_t *data, size_t len)
{
	struct k_thread *reader = NULL;
	struct pipe_buf_spec *reader_buf;
	size_t copy_size, written = 0;

	/*
	 * Attempt a direct data copy to waiting readers, if any.
	 * The copy has to be done under the scheduler lock to ensure that
	 * all the needed data is copied to the target thread, whose buffer
	 * spec lives on that thread's stack, and that the thread is
	 * unpended only once it has received all the data it wanted,
	 * without racing against a potential thread timeout/cancellation
	 * event.
	 */
	do {
		LOCK_SCHED_SPINLOCK {
			reader = _priq_wait_best(&pipe->data.waitq);
			if (reader == NULL) {
				K_SPINLOCK_BREAK;
			}

			reader_buf = reader->base.swap_data;
			copy_size = MIN(len - written,
					reader_buf->len - reader_buf->used);
			memcpy(&reader_buf->data[reader_buf->used],
			       &data[written], copy_size);
			written += copy_size;
			reader_buf->used += copy_size;

			if (reader_buf->used < reader_buf->len) {
				/* This reader wants more: don't unpend. */
				reader = NULL;
			} else {
				/*
				 * This reader has received all the data
				 * it was waiting for: wake it up with
				 * the scheduler lock still held.
				 */
				unpend_thread_no_timeout(reader);
				z_abort_thread_timeout(reader);
			}
		}
		if (reader != NULL) {
			/* rest of thread wake-up outside the scheduler lock */
			z_thread_return_value_set_with_data(reader, 0, NULL);
			z_ready_thread(reader);
			*need_resched = true;
		}
	} while (reader != NULL && written < len);

	return written;
}

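/*
 * Write up to @a len bytes to the pipe, blocking until @a timeout for
 * buffer space as needed. Returns the number of bytes written, -EAGAIN
 * if nothing could be written before the timeout, -EPIPE if the pipe
 * is closed, or -ECANCELED if the pipe is reset while writing.
 */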
int z_impl_k_pipe_write(struct k_pipe *pipe, const uint8_t *data, size_t len, k_timeout_t timeout)
{
	int rc;
	size_t written = 0;
	k_timepoint_t end = sys_timepoint_calc(timeout);
	k_spinlock_key_t key = k_spin_lock(&pipe->lock);
	bool need_resched = false;

	SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_pipe, write, pipe, data, len, timeout);

	if (unlikely(pipe_resetting(pipe))) {
		rc = -ECANCELED;
		goto exit;
	}

	for (;;) {
		if (unlikely(pipe_closed(pipe))) {
			rc = -EPIPE;
			break;
		}

		if (pipe_empty(pipe)) {
			if (IS_ENABLED(CONFIG_KERNEL_COHERENCE)) {
				/*
				 * Systems that enable this option don't have
				 * their stacks in coherent memory. Given that
				 * our pipe_buf_spec is stored on the stack,
				 * and readers may have their destination
				 * buffer on their stack too, it is not worth
				 * supporting the direct-to-readers copy for
				 * them. Simply wake up all pending readers
				 * instead.
				 */
				need_resched = z_sched_wake_all(&pipe->data, 0, NULL);
			} else if (pipe->waiting != 0) {
				written += copy_to_pending_readers(pipe, &need_resched,
								   &data[written],
								   len - written);
				if (written >= len) {
					rc = written;
					break;
				}
			}
		}

#ifdef CONFIG_POLL
		need_resched |= z_handle_obj_poll_events(&pipe->poll_events,
							 K_POLL_STATE_PIPE_DATA_AVAILABLE);
#endif /* CONFIG_POLL */

		written += ring_buf_put(&pipe->buf, &data[written], len - written);
		if (likely(written == len)) {
			rc = written;
			break;
		}

		rc = wait_for(&pipe->space, pipe, &key, end, &need_resched);
		if (rc != 0) {
			if (rc == -EAGAIN) {
				rc = written ? written : -EAGAIN;
			}
			break;
		}
	}
exit:
	SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, write, pipe, rc);
	if (need_resched) {
		z_reschedule(&pipe->lock, key);
	} else {
		k_spin_unlock(&pipe->lock, key);
	}
	return rc;
}

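/*
 * Read up to @a len bytes from the pipe, blocking until @a timeout for
 * data as needed. Returns the number of bytes read, -EAGAIN if nothing
 * could be read before the timeout, -EPIPE if the pipe is closed and
 * already drained, or -ECANCELED if the pipe is reset while reading.
 */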
int z_impl_k_pipe_read(struct k_pipe *pipe, uint8_t *data, size_t len, k_timeout_t timeout)
{
	struct pipe_buf_spec buf = { data, len, 0 };
	int rc;
	k_timepoint_t end = sys_timepoint_calc(timeout);
	k_spinlock_key_t key = k_spin_lock(&pipe->lock);
	bool need_resched = false;

	SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_pipe, read, pipe, data, len, timeout);

	if (unlikely(pipe_resetting(pipe))) {
		rc = -ECANCELED;
		goto exit;
	}

	for (;;) {
		if (pipe_full(pipe)) {
			/* One or more pending writers may exist. */
			need_resched = z_sched_wake_all(&pipe->space, 0, NULL);
		}

		buf.used += ring_buf_get(&pipe->buf, &data[buf.used], len - buf.used);
		if (likely(buf.used == len)) {
			rc = buf.used;
			break;
		}

		if (unlikely(pipe_closed(pipe))) {
			rc = buf.used ? buf.used : -EPIPE;
			break;
		}

		/* provide our "direct copy" info to potential writers */
		_current->base.swap_data = &buf;

		rc = wait_for(&pipe->data, pipe, &key, end, &need_resched);
		if (rc != 0) {
			if (rc == -EAGAIN) {
				rc = buf.used ? buf.used : -EAGAIN;
			}
			break;
		}
	}
exit:
	SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, read, pipe, rc);
	if (need_resched) {
		z_reschedule(&pipe->lock, key);
	} else {
		k_spin_unlock(&pipe->lock, key);
	}
	return rc;
}

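/*
 * Discard all buffered data and wake every pending reader and writer
 * with -ECANCELED. PIPE_FLAG_RESET remains set until the last waiter
 * has left (see wait_for()).
 */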
void z_impl_k_pipe_reset(struct k_pipe *pipe)
{
	SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_pipe, reset, pipe);
	K_SPINLOCK(&pipe->lock) {
		ring_buf_reset(&pipe->buf);
		if (likely(pipe->waiting != 0)) {
			pipe->flags |= PIPE_FLAG_RESET;
			z_sched_wake_all(&pipe->data, 0, NULL);
			z_sched_wake_all(&pipe->space, 0, NULL);
		}
	}
	SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, reset, pipe);
}

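/*
 * Mark the pipe closed and wake all pending readers and writers.
 * Subsequent writes fail with -EPIPE; reads drain whatever data is
 * left in the ring buffer before also failing with -EPIPE.
 */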
void z_impl_k_pipe_close(struct k_pipe *pipe)
{
	SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_pipe, close, pipe);
	K_SPINLOCK(&pipe->lock) {
		pipe->flags = 0;
		z_sched_wake_all(&pipe->data, 0, NULL);
		z_sched_wake_all(&pipe->space, 0, NULL);
	}
	SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, close, pipe);
}

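/*
 * Illustrative usage sketch (not compiled here), assuming a 16-byte
 * backing buffer shared by one producer and one consumer thread:
 *
 *	static uint8_t storage[16];
 *	static struct k_pipe pipe;
 *
 *	k_pipe_init(&pipe, storage, sizeof(storage));
 *
 * Producer:
 *	int ret = k_pipe_write(&pipe, msg, msg_len, K_MSEC(100));
 *
 * Consumer:
 *	uint8_t rx[16];
 *	int ret = k_pipe_read(&pipe, rx, sizeof(rx), K_FOREVER);
 *
 * A negative return value is an error; otherwise it is the number of
 * bytes transferred, which may be short on timeout.
 */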
#ifdef CONFIG_USERSPACE
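/*
 * Syscall verification wrappers: validate the pipe object and the
 * user-supplied buffer before handing off to the z_impl_*() variants.
 */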
void z_vrfy_k_pipe_init(struct k_pipe *pipe, uint8_t *buffer, size_t buffer_size)
{
	K_OOPS(K_SYSCALL_OBJ(pipe, K_OBJ_PIPE));
	K_OOPS(K_SYSCALL_MEMORY_WRITE(buffer, buffer_size));

	z_impl_k_pipe_init(pipe, buffer, buffer_size);
}
#include <zephyr/syscalls/k_pipe_init_mrsh.c>

int z_vrfy_k_pipe_read(struct k_pipe *pipe, uint8_t *data, size_t len, k_timeout_t timeout)
{
	K_OOPS(K_SYSCALL_OBJ(pipe, K_OBJ_PIPE));
	K_OOPS(K_SYSCALL_MEMORY_WRITE(data, len));

	return z_impl_k_pipe_read(pipe, data, len, timeout);
}
#include <zephyr/syscalls/k_pipe_read_mrsh.c>

int z_vrfy_k_pipe_write(struct k_pipe *pipe, const uint8_t *data, size_t len, k_timeout_t timeout)
{
	K_OOPS(K_SYSCALL_OBJ(pipe, K_OBJ_PIPE));
	K_OOPS(K_SYSCALL_MEMORY_READ(data, len));

	return z_impl_k_pipe_write(pipe, data, len, timeout);
}
#include <zephyr/syscalls/k_pipe_write_mrsh.c>

void z_vrfy_k_pipe_reset(struct k_pipe *pipe)
{
	K_OOPS(K_SYSCALL_OBJ(pipe, K_OBJ_PIPE));
	z_impl_k_pipe_reset(pipe);
}
#include <zephyr/syscalls/k_pipe_reset_mrsh.c>

void z_vrfy_k_pipe_close(struct k_pipe *pipe)
{
	K_OOPS(K_SYSCALL_OBJ(pipe, K_OBJ_PIPE));
	z_impl_k_pipe_close(pipe);
}
#include <zephyr/syscalls/k_pipe_close_mrsh.c>
#endif /* CONFIG_USERSPACE */

#ifdef CONFIG_OBJ_CORE_PIPE
static int init_pipe_obj_core_list(void)
{
	/* Initialize pipe object type */
	z_obj_type_init(&obj_type_pipe, K_OBJ_TYPE_PIPE_ID,
			offsetof(struct k_pipe, obj_core));

	/* Initialize and link statically defined pipes */
	STRUCT_SECTION_FOREACH(k_pipe, pipe) {
		k_obj_core_init_and_link(K_OBJ_CORE(pipe), &obj_type_pipe);
	}

	return 0;
}

SYS_INIT(init_pipe_obj_core_list, PRE_KERNEL_1,
	 CONFIG_KERNEL_INIT_PRIORITY_OBJECTS);
#endif /* CONFIG_OBJ_CORE_PIPE */