1 /*
2 * Copyright (c) 2016 Wind River Systems, Inc.
3 *
4 * SPDX-License-Identifier: Apache-2.0
5 */
6
7 /**
8 * @file
9 * @brief Message queues.
10 */
11
12
13 #include <zephyr/kernel.h>
14 #include <zephyr/kernel_structs.h>
15
16 #include <zephyr/toolchain.h>
17 #include <zephyr/linker/sections.h>
18 #include <string.h>
19 #include <ksched.h>
20 #include <wait_q.h>
21 #include <zephyr/sys/dlist.h>
22 #include <zephyr/sys/math_extras.h>
23 #include <zephyr/init.h>
24 #include <zephyr/internal/syscall_handler.h>
25 #include <kernel_internal.h>
26 #include <zephyr/sys/check.h>
27
#ifdef CONFIG_OBJ_CORE_MSGQ
/*
 * Object-core type descriptor for message queues. It is passed to
 * k_obj_core_init_and_link() for each queue and set up by
 * z_obj_type_init() in this file's init hook.
 */
static struct k_obj_type obj_type_msgq;
#endif /* CONFIG_OBJ_CORE_MSGQ */
31
/**
 * @brief Notify pollers that the message queue has data available.
 *
 * With CONFIG_POLL enabled this forwards the queue's poll event list to
 * z_handle_obj_poll_events() with K_POLL_STATE_MSGQ_DATA_AVAILABLE and
 * passes its result back. Without CONFIG_POLL it does nothing.
 *
 * @param msgq Message queue whose poll events should be signaled.
 *
 * @return Result of z_handle_obj_poll_events(), or false when CONFIG_POLL
 *         is disabled. The caller uses it as its reschedule flag.
 */
static inline bool handle_poll_events(struct k_msgq *msgq)
{
#ifndef CONFIG_POLL
	ARG_UNUSED(msgq);

	return false;
#else
	bool signaled = z_handle_obj_poll_events(&msgq->poll_events,
						 K_POLL_STATE_MSGQ_DATA_AVAILABLE);

	return signaled;
#endif /* !CONFIG_POLL */
}
42
k_msgq_init(struct k_msgq * msgq,char * buffer,size_t msg_size,uint32_t max_msgs)43 void k_msgq_init(struct k_msgq *msgq, char *buffer, size_t msg_size,
44 uint32_t max_msgs)
45 {
46 msgq->msg_size = msg_size;
47 msgq->max_msgs = max_msgs;
48 msgq->buffer_start = buffer;
49 msgq->buffer_end = buffer + (max_msgs * msg_size);
50 msgq->read_ptr = buffer;
51 msgq->write_ptr = buffer;
52 msgq->used_msgs = 0;
53 msgq->flags = 0;
54 z_waitq_init(&msgq->wait_q);
55 msgq->lock = (struct k_spinlock) {};
56 #ifdef CONFIG_POLL
57 sys_dlist_init(&msgq->poll_events);
58 #endif /* CONFIG_POLL */
59
60 #ifdef CONFIG_OBJ_CORE_MSGQ
61 k_obj_core_init_and_link(K_OBJ_CORE(msgq), &obj_type_msgq);
62 #endif /* CONFIG_OBJ_CORE_MSGQ */
63
64 SYS_PORT_TRACING_OBJ_INIT(k_msgq, msgq);
65
66 k_object_init(msgq);
67 }
68
z_impl_k_msgq_alloc_init(struct k_msgq * msgq,size_t msg_size,uint32_t max_msgs)69 int z_impl_k_msgq_alloc_init(struct k_msgq *msgq, size_t msg_size,
70 uint32_t max_msgs)
71 {
72 void *buffer;
73 int ret;
74 size_t total_size;
75
76 SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_msgq, alloc_init, msgq);
77
78 if (size_mul_overflow(msg_size, max_msgs, &total_size)) {
79 ret = -EINVAL;
80 } else {
81 buffer = z_thread_malloc(total_size);
82 if (buffer != NULL) {
83 k_msgq_init(msgq, buffer, msg_size, max_msgs);
84 msgq->flags = K_MSGQ_FLAG_ALLOC;
85 ret = 0;
86 } else {
87 ret = -ENOMEM;
88 }
89 }
90
91 SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_msgq, alloc_init, msgq, ret);
92
93 return ret;
94 }
95
96 #ifdef CONFIG_USERSPACE
z_vrfy_k_msgq_alloc_init(struct k_msgq * msgq,size_t msg_size,uint32_t max_msgs)97 int z_vrfy_k_msgq_alloc_init(struct k_msgq *msgq, size_t msg_size,
98 uint32_t max_msgs)
99 {
100 K_OOPS(K_SYSCALL_OBJ_NEVER_INIT(msgq, K_OBJ_MSGQ));
101
102 return z_impl_k_msgq_alloc_init(msgq, msg_size, max_msgs);
103 }
104 #include <zephyr/syscalls/k_msgq_alloc_init_mrsh.c>
105 #endif /* CONFIG_USERSPACE */
106
k_msgq_cleanup(struct k_msgq * msgq)107 int k_msgq_cleanup(struct k_msgq *msgq)
108 {
109 SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_msgq, cleanup, msgq);
110
111 CHECKIF(z_waitq_head(&msgq->wait_q) != NULL) {
112 SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_msgq, cleanup, msgq, -EBUSY);
113
114 return -EBUSY;
115 }
116
117 if ((msgq->flags & K_MSGQ_FLAG_ALLOC) != 0U) {
118 k_free(msgq->buffer_start);
119 msgq->flags &= ~K_MSGQ_FLAG_ALLOC;
120 }
121
122 SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_msgq, cleanup, msgq, 0);
123
124 return 0;
125 }
126
/**
 * @brief Common implementation of k_msgq_put() and k_msgq_put_front().
 *
 * Runs under msgq->lock. If the queue has room, the message is either
 * handed directly to a thread pended on the queue or copied into the ring
 * buffer (at the back or at the front, depending on @a put_at_back). If
 * the queue is full, the caller either fails immediately (K_NO_WAIT) or
 * pends on the queue's wait queue.
 *
 * @param msgq        Message queue to write to.
 * @param data        Message to send; msgq->msg_size bytes are read.
 * @param timeout     How long to wait for space. Must be K_NO_WAIT when
 *                    called from an ISR (asserted).
 * @param put_at_back true to append at the back of the queue, false to
 *                    insert at the front. Also selects which tracing
 *                    hook (put / put_front) is emitted.
 *
 * @retval 0       Message delivered or queued.
 * @retval -ENOMSG Queue full and @a timeout is K_NO_WAIT.
 * @return Otherwise the value returned by z_pend_curr() on the blocking
 *         path. In this file a waiter is woken with 0 by
 *         z_impl_k_msgq_get() and with -ENOMSG by z_impl_k_msgq_purge();
 *         the timeout result is whatever z_pend_curr() reports.
 */
static inline int put_msg_in_queue(struct k_msgq *msgq, const void *data,
				   k_timeout_t timeout, bool put_at_back)
{
	/* Blocking is only permitted outside of interrupt context */
	__ASSERT(!arch_is_in_isr() || K_TIMEOUT_EQ(timeout, K_NO_WAIT), "");

	struct k_thread *pending_thread;
	k_spinlock_key_t key;
	int result;
	bool resched = false;

	key = k_spin_lock(&msgq->lock);

	if (put_at_back) {
		SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_msgq, put, msgq, timeout);
	} else {
		SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_msgq, put_front, msgq, timeout);
	}

	if (msgq->used_msgs < msgq->max_msgs) {
		/* message queue isn't full */
		pending_thread = z_unpend_first_thread(&msgq->wait_q);
		if (unlikely(pending_thread != NULL)) {
			resched = true;

			/*
			 * give message to waiting thread: a pended getter
			 * stored its destination buffer in swap_data, so the
			 * message bypasses the ring buffer entirely
			 */
			(void)memcpy(pending_thread->base.swap_data, data, msgq->msg_size);
			/* wake up waiting thread */
			arch_thread_return_value_set(pending_thread, 0);
			z_ready_thread(pending_thread);
		} else {
			__ASSERT_NO_MSG(msgq->write_ptr >= msgq->buffer_start &&
					msgq->write_ptr < msgq->buffer_end);
			if (put_at_back) {
				/*
				 * to write a message to the back of the queue,
				 * copy the message and increment write_ptr
				 */
				(void)memcpy(msgq->write_ptr, (char *)data, msgq->msg_size);
				msgq->write_ptr += msgq->msg_size;
				/* wrap around at the end of the ring buffer */
				if (msgq->write_ptr == msgq->buffer_end) {
					msgq->write_ptr = msgq->buffer_start;
				}
			} else {
				/*
				 * to write a message to the head of the queue,
				 * first decrement the read pointer (to open
				 * space at the front of the queue) then copy
				 * the message to the newly created space.
				 */
				if (msgq->read_ptr == msgq->buffer_start) {
					/* wrap backwards before stepping down */
					msgq->read_ptr = msgq->buffer_end;
				}
				msgq->read_ptr -= msgq->msg_size;
				(void)memcpy(msgq->read_ptr, (char *)data, msgq->msg_size);
			}
			msgq->used_msgs++;
			/* reschedule only if signaling pollers asks for it */
			resched = handle_poll_events(msgq);
		}
		result = 0;
	} else if (K_TIMEOUT_EQ(timeout, K_NO_WAIT)) {
		/* don't wait for message space to become available */
		result = -ENOMSG;
	} else {
		if (put_at_back) {
			SYS_PORT_TRACING_OBJ_FUNC_BLOCKING(k_msgq, put, msgq, timeout);
		} else {
			SYS_PORT_TRACING_OBJ_FUNC_BLOCKING(k_msgq, put_front, msgq, timeout);
		}

		/*
		 * wait for put message success, failure, or timeout.
		 * The message pointer is parked in swap_data so that the
		 * thread servicing this waiter can copy from it.
		 */
		_current->base.swap_data = (void *) data;

		/*
		 * The lock and key are handed to z_pend_curr(); this path
		 * returns without calling k_spin_unlock() itself.
		 */
		result = z_pend_curr(&msgq->lock, key, &msgq->wait_q, timeout);

		if (put_at_back) {
			SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_msgq, put, msgq, timeout, result);
		} else {
			SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_msgq, put_front, msgq, timeout, result);
		}

		return result;
	}

	if (put_at_back) {
		SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_msgq, put, msgq, timeout, result);
	} else {
		SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_msgq, put_front, msgq, timeout, result);
	}

	/* Both branches are given the lock and key taken above */
	if (resched) {
		z_reschedule(&msgq->lock, key);
	} else {
		k_spin_unlock(&msgq->lock, key);
	}

	return result;
}
224
225
/**
 * @brief Send a message to the back of a message queue.
 *
 * @param msgq    Message queue to write to.
 * @param data    Message to send.
 * @param timeout How long to wait for space in the queue.
 *
 * @return Result of put_msg_in_queue() with put_at_back set to true.
 */
int z_impl_k_msgq_put(struct k_msgq *msgq, const void *data, k_timeout_t timeout)
{
	const bool at_back = true;

	return put_msg_in_queue(msgq, data, timeout, at_back);
}
230
/**
 * @brief Send a message to the front of a message queue.
 *
 * @param msgq    Message queue to write to.
 * @param data    Message to send.
 * @param timeout How long to wait for space in the queue.
 *
 * @return Result of put_msg_in_queue() with put_at_back set to false.
 */
int z_impl_k_msgq_put_front(struct k_msgq *msgq, const void *data, k_timeout_t timeout)
{
	const bool at_back = false;

	return put_msg_in_queue(msgq, data, timeout, at_back);
}
235
236 #ifdef CONFIG_USERSPACE
/**
 * @brief Syscall verification wrapper for k_msgq_put().
 *
 * Passes the results of K_SYSCALL_OBJ() for @a msgq and of
 * K_SYSCALL_MEMORY_READ() for msgq->msg_size bytes at @a data to K_OOPS()
 * before forwarding the call.
 *
 * @return Result of z_impl_k_msgq_put().
 */
static inline int z_vrfy_k_msgq_put(struct k_msgq *msgq, const void *data,
				    k_timeout_t timeout)
{
	int ret;

	K_OOPS(K_SYSCALL_OBJ(msgq, K_OBJ_MSGQ));
	K_OOPS(K_SYSCALL_MEMORY_READ(data, msgq->msg_size));

	ret = z_impl_k_msgq_put(msgq, data, timeout);

	return ret;
}
245 #include <zephyr/syscalls/k_msgq_put_mrsh.c>
246
/**
 * @brief Syscall verification wrapper for k_msgq_put_front().
 *
 * Passes the results of K_SYSCALL_OBJ() for @a msgq and of
 * K_SYSCALL_MEMORY_READ() for msgq->msg_size bytes at @a data to K_OOPS()
 * before forwarding the call.
 *
 * @return Result of z_impl_k_msgq_put_front().
 */
static inline int z_vrfy_k_msgq_put_front(struct k_msgq *msgq, const void *data,
					  k_timeout_t timeout)
{
	int ret;

	K_OOPS(K_SYSCALL_OBJ(msgq, K_OBJ_MSGQ));
	K_OOPS(K_SYSCALL_MEMORY_READ(data, msgq->msg_size));

	ret = z_impl_k_msgq_put_front(msgq, data, timeout);

	return ret;
}
255 #include <zephyr/syscalls/k_msgq_put_front_mrsh.c>
256 #endif /* CONFIG_USERSPACE */
257
z_impl_k_msgq_get_attrs(struct k_msgq * msgq,struct k_msgq_attrs * attrs)258 void z_impl_k_msgq_get_attrs(struct k_msgq *msgq, struct k_msgq_attrs *attrs)
259 {
260 attrs->msg_size = msgq->msg_size;
261 attrs->max_msgs = msgq->max_msgs;
262 attrs->used_msgs = msgq->used_msgs;
263 }
264
265 #ifdef CONFIG_USERSPACE
z_vrfy_k_msgq_get_attrs(struct k_msgq * msgq,struct k_msgq_attrs * attrs)266 static inline void z_vrfy_k_msgq_get_attrs(struct k_msgq *msgq,
267 struct k_msgq_attrs *attrs)
268 {
269 K_OOPS(K_SYSCALL_OBJ(msgq, K_OBJ_MSGQ));
270 K_OOPS(K_SYSCALL_MEMORY_WRITE(attrs, sizeof(struct k_msgq_attrs)));
271 z_impl_k_msgq_get_attrs(msgq, attrs);
272 }
273 #include <zephyr/syscalls/k_msgq_get_attrs_mrsh.c>
274 #endif /* CONFIG_USERSPACE */
275
/**
 * @brief Receive a message from a message queue.
 *
 * Runs under msgq->lock. If a message is queued it is copied out and the
 * read pointer advances; a thread pended on the queue at that point (a
 * writer that found the queue full) then has its message copied into the
 * freed slot and is woken. If the queue is empty the caller either fails
 * immediately (K_NO_WAIT) or pends on the queue's wait queue.
 *
 * NOTE(review): a pended writer's message is always copied to write_ptr,
 * i.e. appended at the back, regardless of whether that thread blocked in
 * the put or the put_front path - confirm this is the intended ordering.
 *
 * @param msgq    Message queue to read from.
 * @param data    Destination buffer; msgq->msg_size bytes are written.
 * @param timeout How long to wait for a message. Must be K_NO_WAIT when
 *                called from an ISR (asserted).
 *
 * @retval 0       Message received.
 * @retval -ENOMSG Queue empty and @a timeout is K_NO_WAIT.
 * @return Otherwise the value returned by z_pend_curr() on the blocking
 *         path. In this file a waiter is woken with 0 by
 *         put_msg_in_queue() and with -ENOMSG by z_impl_k_msgq_purge();
 *         the timeout result is whatever z_pend_curr() reports.
 */
int z_impl_k_msgq_get(struct k_msgq *msgq, void *data, k_timeout_t timeout)
{
	/* Blocking is only permitted outside of interrupt context */
	__ASSERT(!arch_is_in_isr() || K_TIMEOUT_EQ(timeout, K_NO_WAIT), "");

	k_spinlock_key_t key;
	struct k_thread *pending_thread;
	int result;
	bool resched = false;

	key = k_spin_lock(&msgq->lock);

	SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_msgq, get, msgq, timeout);

	if (msgq->used_msgs > 0U) {
		/* take first available message from queue */
		(void)memcpy((char *)data, msgq->read_ptr, msgq->msg_size);
		msgq->read_ptr += msgq->msg_size;
		/* wrap around at the end of the ring buffer */
		if (msgq->read_ptr == msgq->buffer_end) {
			msgq->read_ptr = msgq->buffer_start;
		}
		msgq->used_msgs--;

		/* handle first thread waiting to write (if any) */
		pending_thread = z_unpend_first_thread(&msgq->wait_q);
		if (unlikely(pending_thread != NULL)) {
			SYS_PORT_TRACING_OBJ_FUNC_BLOCKING(k_msgq, get, msgq, timeout);

			/*
			 * add thread's message to queue: the pended writer
			 * parked its message pointer in swap_data
			 */
			__ASSERT_NO_MSG(msgq->write_ptr >= msgq->buffer_start &&
					msgq->write_ptr < msgq->buffer_end);
			(void)memcpy(msgq->write_ptr, (char *)pending_thread->base.swap_data,
				     msgq->msg_size);
			msgq->write_ptr += msgq->msg_size;
			if (msgq->write_ptr == msgq->buffer_end) {
				msgq->write_ptr = msgq->buffer_start;
			}
			msgq->used_msgs++;

			/* wake up waiting thread */
			arch_thread_return_value_set(pending_thread, 0);
			z_ready_thread(pending_thread);
			resched = true;
		}
		result = 0;
	} else if (K_TIMEOUT_EQ(timeout, K_NO_WAIT)) {
		/* don't wait for a message to become available */
		result = -ENOMSG;
	} else {
		SYS_PORT_TRACING_OBJ_FUNC_BLOCKING(k_msgq, get, msgq, timeout);

		/*
		 * wait for get message success or timeout.
		 * The destination buffer is parked in swap_data so that a
		 * later put can copy the message straight into it.
		 */
		_current->base.swap_data = data;

		/*
		 * The lock and key are handed to z_pend_curr(); this path
		 * returns without calling k_spin_unlock() itself.
		 */
		result = z_pend_curr(&msgq->lock, key, &msgq->wait_q, timeout);
		SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_msgq, get, msgq, timeout, result);
		return result;
	}

	SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_msgq, get, msgq, timeout, result);

	/* Both branches are given the lock and key taken above */
	if (resched) {
		z_reschedule(&msgq->lock, key);
	} else {
		k_spin_unlock(&msgq->lock, key);
	}

	return result;
}
344
345 #ifdef CONFIG_USERSPACE
/**
 * @brief Syscall verification wrapper for k_msgq_get().
 *
 * Passes the results of K_SYSCALL_OBJ() for @a msgq and of
 * K_SYSCALL_MEMORY_WRITE() for msgq->msg_size bytes at @a data to
 * K_OOPS() before forwarding the call.
 *
 * @return Result of z_impl_k_msgq_get().
 */
static inline int z_vrfy_k_msgq_get(struct k_msgq *msgq, void *data,
				    k_timeout_t timeout)
{
	int ret;

	K_OOPS(K_SYSCALL_OBJ(msgq, K_OBJ_MSGQ));
	K_OOPS(K_SYSCALL_MEMORY_WRITE(data, msgq->msg_size));

	ret = z_impl_k_msgq_get(msgq, data, timeout);

	return ret;
}
354 #include <zephyr/syscalls/k_msgq_get_mrsh.c>
355 #endif /* CONFIG_USERSPACE */
356
z_impl_k_msgq_peek(struct k_msgq * msgq,void * data)357 int z_impl_k_msgq_peek(struct k_msgq *msgq, void *data)
358 {
359 k_spinlock_key_t key;
360 int result;
361
362 key = k_spin_lock(&msgq->lock);
363
364 if (msgq->used_msgs > 0U) {
365 /* take first available message from queue */
366 (void)memcpy((char *)data, msgq->read_ptr, msgq->msg_size);
367 result = 0;
368 } else {
369 /* don't wait for a message to become available */
370 result = -ENOMSG;
371 }
372
373 SYS_PORT_TRACING_OBJ_FUNC(k_msgq, peek, msgq, result);
374
375 k_spin_unlock(&msgq->lock, key);
376
377 return result;
378 }
379
380 #ifdef CONFIG_USERSPACE
z_vrfy_k_msgq_peek(struct k_msgq * msgq,void * data)381 static inline int z_vrfy_k_msgq_peek(struct k_msgq *msgq, void *data)
382 {
383 K_OOPS(K_SYSCALL_OBJ(msgq, K_OBJ_MSGQ));
384 K_OOPS(K_SYSCALL_MEMORY_WRITE(data, msgq->msg_size));
385
386 return z_impl_k_msgq_peek(msgq, data);
387 }
388 #include <zephyr/syscalls/k_msgq_peek_mrsh.c>
389 #endif /* CONFIG_USERSPACE */
390
z_impl_k_msgq_peek_at(struct k_msgq * msgq,void * data,uint32_t idx)391 int z_impl_k_msgq_peek_at(struct k_msgq *msgq, void *data, uint32_t idx)
392 {
393 k_spinlock_key_t key;
394 int result;
395 uint32_t bytes_to_end;
396 uint32_t byte_offset;
397 char *start_addr;
398
399 key = k_spin_lock(&msgq->lock);
400
401 if (msgq->used_msgs > idx) {
402 bytes_to_end = (msgq->buffer_end - msgq->read_ptr);
403 byte_offset = idx * msgq->msg_size;
404 start_addr = msgq->read_ptr;
405 /* check item available in start/end of ring buffer */
406 if (bytes_to_end <= byte_offset) {
407 /* Tweak the values in case */
408 byte_offset -= bytes_to_end;
409 /* wrap-around is required */
410 start_addr = msgq->buffer_start;
411 }
412 (void)memcpy(data, start_addr + byte_offset, msgq->msg_size);
413 result = 0;
414 } else {
415 /* don't wait for a message to become available */
416 result = -ENOMSG;
417 }
418
419 SYS_PORT_TRACING_OBJ_FUNC(k_msgq, peek, msgq, result);
420
421 k_spin_unlock(&msgq->lock, key);
422
423 return result;
424 }
425
426 #ifdef CONFIG_USERSPACE
z_vrfy_k_msgq_peek_at(struct k_msgq * msgq,void * data,uint32_t idx)427 static inline int z_vrfy_k_msgq_peek_at(struct k_msgq *msgq, void *data, uint32_t idx)
428 {
429 K_OOPS(K_SYSCALL_OBJ(msgq, K_OBJ_MSGQ));
430 K_OOPS(K_SYSCALL_MEMORY_WRITE(data, msgq->msg_size));
431
432 return z_impl_k_msgq_peek_at(msgq, data, idx);
433 }
434 #include <zephyr/syscalls/k_msgq_peek_at_mrsh.c>
435 #endif /* CONFIG_USERSPACE */
436
z_impl_k_msgq_purge(struct k_msgq * msgq)437 void z_impl_k_msgq_purge(struct k_msgq *msgq)
438 {
439 k_spinlock_key_t key;
440 struct k_thread *pending_thread;
441 bool resched = false;
442
443 key = k_spin_lock(&msgq->lock);
444
445 SYS_PORT_TRACING_OBJ_FUNC(k_msgq, purge, msgq);
446
447 /* wake up any threads that are waiting to write */
448 for (pending_thread = z_unpend_first_thread(&msgq->wait_q);
449 pending_thread != NULL;
450 pending_thread = z_unpend_first_thread(&msgq->wait_q)) {
451 arch_thread_return_value_set(pending_thread, -ENOMSG);
452 z_ready_thread(pending_thread);
453 resched = true;
454 }
455
456 msgq->used_msgs = 0;
457 msgq->read_ptr = msgq->write_ptr;
458
459 if (resched) {
460 z_reschedule(&msgq->lock, key);
461 } else {
462 k_spin_unlock(&msgq->lock, key);
463 }
464 }
465
466 #ifdef CONFIG_USERSPACE
z_vrfy_k_msgq_purge(struct k_msgq * msgq)467 static inline void z_vrfy_k_msgq_purge(struct k_msgq *msgq)
468 {
469 K_OOPS(K_SYSCALL_OBJ(msgq, K_OBJ_MSGQ));
470 z_impl_k_msgq_purge(msgq);
471 }
472 #include <zephyr/syscalls/k_msgq_purge_mrsh.c>
473
z_vrfy_k_msgq_num_free_get(struct k_msgq * msgq)474 static inline uint32_t z_vrfy_k_msgq_num_free_get(struct k_msgq *msgq)
475 {
476 K_OOPS(K_SYSCALL_OBJ(msgq, K_OBJ_MSGQ));
477 return z_impl_k_msgq_num_free_get(msgq);
478 }
479 #include <zephyr/syscalls/k_msgq_num_free_get_mrsh.c>
480
z_vrfy_k_msgq_num_used_get(struct k_msgq * msgq)481 static inline uint32_t z_vrfy_k_msgq_num_used_get(struct k_msgq *msgq)
482 {
483 K_OOPS(K_SYSCALL_OBJ(msgq, K_OBJ_MSGQ));
484 return z_impl_k_msgq_num_used_get(msgq);
485 }
486 #include <zephyr/syscalls/k_msgq_num_used_get_mrsh.c>
487
488 #endif /* CONFIG_USERSPACE */
489
490 #ifdef CONFIG_OBJ_CORE_MSGQ
init_msgq_obj_core_list(void)491 static int init_msgq_obj_core_list(void)
492 {
493 /* Initialize msgq object type */
494
495 z_obj_type_init(&obj_type_msgq, K_OBJ_TYPE_MSGQ_ID,
496 offsetof(struct k_msgq, obj_core));
497
498 /* Initialize and link statically defined message queues */
499
500 STRUCT_SECTION_FOREACH(k_msgq, msgq) {
501 k_obj_core_init_and_link(K_OBJ_CORE(msgq), &obj_type_msgq);
502 }
503
504 return 0;
505 };
506
507 SYS_INIT(init_msgq_obj_core_list, PRE_KERNEL_1,
508 CONFIG_KERNEL_INIT_PRIORITY_OBJECTS);
509
510 #endif /* CONFIG_OBJ_CORE_MSGQ */
511