1 /*
2 * Copyright (c) 2018 Intel Corporation
3 * Copyright (c) 2024 BayLibre, SAS
4 *
5 * SPDX-License-Identifier: Apache-2.0
6 */
7
8 #include "posix_clock.h"
9
10 #include <zephyr/kernel.h>
11 #include <errno.h>
12 #include <string.h>
13 #include <zephyr/sys/atomic.h>
14 #include <zephyr/posix/mqueue.h>
15 #include <zephyr/posix/pthread.h>
16
17 #define SIGEV_MASK (SIGEV_NONE | SIGEV_SIGNAL | SIGEV_THREAD)
18
19 typedef struct mqueue_object {
20 sys_snode_t snode;
21 char *mem_buffer;
22 char *mem_obj;
23 struct k_msgq queue;
24 atomic_t ref_count;
25 char *name;
26 struct sigevent not;
27 } mqueue_object;
28
29 typedef struct mqueue_desc {
30 char *mem_desc;
31 mqueue_object *mqueue;
32 uint32_t flags;
33 } mqueue_desc;
34
35 K_SEM_DEFINE(mq_sem, 1, 1);
36
37 /* Initialize the list */
38 sys_slist_t mq_list = SYS_SLIST_STATIC_INIT(&mq_list);
39
40 static mqueue_object *find_in_list(const char *name);
41 static int32_t send_message(mqueue_desc *mqd, const char *msg_ptr, size_t msg_len,
42 k_timeout_t timeout);
43 static int32_t receive_message(mqueue_desc *mqd, char *msg_ptr, size_t msg_len,
44 k_timeout_t timeout);
45 static void remove_notification(mqueue_object *msg_queue);
46 static void remove_mq(mqueue_object *msg_queue);
47 static void *mq_notify_thread(void *arg);
48
49 /**
50 * @brief Open a message queue.
51 *
52 * Number of message queue and descriptor to message queue are limited by
53 * heap size. increase the size through CONFIG_HEAP_MEM_POOL_SIZE.
54 *
55 * See IEEE 1003.1
56 */
mq_open(const char * name,int oflags,...)57 mqd_t mq_open(const char *name, int oflags, ...)
58 {
59 va_list va;
60 mode_t mode;
61 struct mq_attr *attrs = NULL;
62 long msg_size = 0U, max_msgs = 0U;
63 mqueue_object *msg_queue;
64 mqueue_desc *msg_queue_desc = NULL, *mqd = (mqueue_desc *)(-1);
65 char *mq_desc_ptr, *mq_obj_ptr, *mq_buf_ptr, *mq_name_ptr;
66
67 va_start(va, oflags);
68 if ((oflags & O_CREAT) != 0) {
69 BUILD_ASSERT(sizeof(mode_t) <= sizeof(int));
70 mode = va_arg(va, unsigned int);
71 attrs = va_arg(va, struct mq_attr*);
72 }
73 va_end(va);
74
75 if (attrs != NULL) {
76 msg_size = attrs->mq_msgsize;
77 max_msgs = attrs->mq_maxmsg;
78 }
79
80 if ((name == NULL) || ((oflags & O_CREAT) != 0 && (msg_size <= 0 ||
81 max_msgs <= 0))) {
82 errno = EINVAL;
83 return (mqd_t)mqd;
84 }
85
86 if ((strlen(name) + 1) > CONFIG_MQUEUE_NAMELEN_MAX) {
87 errno = ENAMETOOLONG;
88 return (mqd_t)mqd;
89 }
90
91 /* Check if queue already exists */
92 k_sem_take(&mq_sem, K_FOREVER);
93 msg_queue = find_in_list(name);
94 k_sem_give(&mq_sem);
95
96 if ((msg_queue != NULL) && (oflags & O_CREAT) != 0 &&
97 (oflags & O_EXCL) != 0) {
98 /* Message queue has already been opened and O_EXCL is set */
99 errno = EEXIST;
100 return (mqd_t)mqd;
101 }
102
103 if ((msg_queue == NULL) && (oflags & O_CREAT) == 0) {
104 errno = ENOENT;
105 return (mqd_t)mqd;
106 }
107
108 mq_desc_ptr = k_malloc(sizeof(struct mqueue_desc));
109 if (mq_desc_ptr != NULL) {
110 (void)memset(mq_desc_ptr, 0, sizeof(struct mqueue_desc));
111 msg_queue_desc = (struct mqueue_desc *)mq_desc_ptr;
112 msg_queue_desc->mem_desc = mq_desc_ptr;
113 } else {
114 goto free_mq_desc;
115 }
116
117
118 /* Allocate mqueue object for new message queue */
119 if (msg_queue == NULL) {
120
121 /* Check for message quantity and size in message queue */
122 if (attrs->mq_msgsize > CONFIG_MSG_SIZE_MAX &&
123 attrs->mq_maxmsg > CONFIG_POSIX_MQ_OPEN_MAX) {
124 goto free_mq_desc;
125 }
126
127 mq_obj_ptr = k_malloc(sizeof(mqueue_object));
128 if (mq_obj_ptr != NULL) {
129 (void)memset(mq_obj_ptr, 0, sizeof(mqueue_object));
130 msg_queue = (mqueue_object *)mq_obj_ptr;
131 msg_queue->mem_obj = mq_obj_ptr;
132
133 } else {
134 goto free_mq_object;
135 }
136
137 mq_name_ptr = k_malloc(strlen(name) + 1);
138 if (mq_name_ptr != NULL) {
139 (void)memset(mq_name_ptr, 0, strlen(name) + 1);
140 msg_queue->name = mq_name_ptr;
141
142 } else {
143 goto free_mq_name;
144 }
145
146 strcpy(msg_queue->name, name);
147
148 mq_buf_ptr = k_malloc(msg_size * max_msgs * sizeof(uint8_t));
149 if (mq_buf_ptr != NULL) {
150 (void)memset(mq_buf_ptr, 0,
151 msg_size * max_msgs * sizeof(uint8_t));
152 msg_queue->mem_buffer = mq_buf_ptr;
153 } else {
154 goto free_mq_buffer;
155 }
156
157 (void)atomic_set(&msg_queue->ref_count, 1);
158 /* initialize zephyr message queue */
159 k_msgq_init(&msg_queue->queue, msg_queue->mem_buffer, msg_size,
160 max_msgs);
161 k_sem_take(&mq_sem, K_FOREVER);
162 sys_slist_append(&mq_list, (sys_snode_t *)&(msg_queue->snode));
163 k_sem_give(&mq_sem);
164
165 } else {
166 atomic_inc(&msg_queue->ref_count);
167 }
168
169 msg_queue_desc->mqueue = msg_queue;
170 msg_queue_desc->flags = (oflags & O_NONBLOCK) != 0 ? O_NONBLOCK : 0;
171 return (mqd_t)msg_queue_desc;
172
173 free_mq_buffer:
174 k_free(mq_name_ptr);
175 free_mq_name:
176 k_free(mq_obj_ptr);
177 free_mq_object:
178 k_free(mq_desc_ptr);
179 free_mq_desc:
180 errno = ENOSPC;
181 return (mqd_t)mqd;
182 }
183
184 /**
185 * @brief Close a message queue descriptor.
186 *
187 * See IEEE 1003.1
188 */
mq_close(mqd_t mqdes)189 int mq_close(mqd_t mqdes)
190 {
191 mqueue_desc *mqd = (mqueue_desc *)mqdes;
192
193 if (mqd == NULL) {
194 errno = EBADF;
195 return -1;
196 }
197
198 atomic_dec(&mqd->mqueue->ref_count);
199
200 /* remove mq if marked for unlink */
201 if (mqd->mqueue->name == NULL) {
202 remove_mq(mqd->mqueue);
203 }
204
205 k_free(mqd->mem_desc);
206 return 0;
207 }
208
209 /**
210 * @brief Remove a message queue.
211 *
212 * See IEEE 1003.1
213 */
mq_unlink(const char * name)214 int mq_unlink(const char *name)
215 {
216 mqueue_object *msg_queue;
217
218 k_sem_take(&mq_sem, K_FOREVER);
219 msg_queue = find_in_list(name);
220
221 if (msg_queue == NULL) {
222 k_sem_give(&mq_sem);
223 errno = EBADF;
224 return -1;
225 }
226
227 k_free(msg_queue->name);
228 msg_queue->name = NULL;
229 k_sem_give(&mq_sem);
230 remove_mq(msg_queue);
231 return 0;
232 }
233
234 /**
235 * @brief Send a message to a message queue.
236 *
237 * All messages in message queue are of equal priority.
238 *
239 * See IEEE 1003.1
240 */
mq_send(mqd_t mqdes,const char * msg_ptr,size_t msg_len,unsigned int msg_prio)241 int mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len,
242 unsigned int msg_prio)
243 {
244 mqueue_desc *mqd = (mqueue_desc *)mqdes;
245
246 return send_message(mqd, msg_ptr, msg_len, K_FOREVER);
247 }
248
249 /**
250 * @brief Send message to a message queue within abstime time.
251 *
252 * All messages in message queue are of equal priority.
253 *
254 * See IEEE 1003.1
255 */
mq_timedsend(mqd_t mqdes,const char * msg_ptr,size_t msg_len,unsigned int msg_prio,const struct timespec * abstime)256 int mq_timedsend(mqd_t mqdes, const char *msg_ptr, size_t msg_len,
257 unsigned int msg_prio, const struct timespec *abstime)
258 {
259 mqueue_desc *mqd = (mqueue_desc *)mqdes;
260
261 if ((abstime == NULL) || !timespec_is_valid(abstime)) {
262 errno = EINVAL;
263 return -1;
264 }
265
266 return send_message(mqd, msg_ptr, msg_len,
267 K_MSEC(timespec_to_timeoutms(CLOCK_REALTIME, abstime)));
268 }
269
270 /**
271 * @brief Receive a message from a message queue.
272 *
273 * All messages in message queue are of equal priority.
274 *
275 * See IEEE 1003.1
276 */
mq_receive(mqd_t mqdes,char * msg_ptr,size_t msg_len,unsigned int * msg_prio)277 int mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len,
278 unsigned int *msg_prio)
279 {
280 mqueue_desc *mqd = (mqueue_desc *)mqdes;
281
282 return receive_message(mqd, msg_ptr, msg_len, K_FOREVER);
283 }
284
285 /**
286 * @brief Receive message from a message queue within abstime time.
287 *
288 * All messages in message queue are of equal priority.
289 *
290 * See IEEE 1003.1
291 */
mq_timedreceive(mqd_t mqdes,char * msg_ptr,size_t msg_len,unsigned int * msg_prio,const struct timespec * abstime)292 int mq_timedreceive(mqd_t mqdes, char *msg_ptr, size_t msg_len,
293 unsigned int *msg_prio, const struct timespec *abstime)
294 {
295 mqueue_desc *mqd = (mqueue_desc *)mqdes;
296
297 if ((abstime == NULL) || !timespec_is_valid(abstime)) {
298 errno = EINVAL;
299 return -1;
300 }
301
302 return receive_message(mqd, msg_ptr, msg_len,
303 K_MSEC(timespec_to_timeoutms(CLOCK_REALTIME, abstime)));
304 }
305
306 /**
307 * @brief Get message queue attributes.
308 *
309 * See IEEE 1003.1
310 */
mq_getattr(mqd_t mqdes,struct mq_attr * mqstat)311 int mq_getattr(mqd_t mqdes, struct mq_attr *mqstat)
312 {
313 mqueue_desc *mqd = (mqueue_desc *)mqdes;
314 struct k_msgq_attrs attrs;
315
316 if (mqd == NULL) {
317 errno = EBADF;
318 return -1;
319 }
320
321 k_sem_take(&mq_sem, K_FOREVER);
322 k_msgq_get_attrs(&mqd->mqueue->queue, &attrs);
323 mqstat->mq_flags = mqd->flags;
324 mqstat->mq_maxmsg = attrs.max_msgs;
325 mqstat->mq_msgsize = attrs.msg_size;
326 mqstat->mq_curmsgs = attrs.used_msgs;
327 k_sem_give(&mq_sem);
328 return 0;
329 }
330
331 /**
332 * @brief Set message queue attributes.
333 *
334 * See IEEE 1003.1
335 */
mq_setattr(mqd_t mqdes,const struct mq_attr * mqstat,struct mq_attr * omqstat)336 int mq_setattr(mqd_t mqdes, const struct mq_attr *mqstat,
337 struct mq_attr *omqstat)
338 {
339 mqueue_desc *mqd = (mqueue_desc *)mqdes;
340
341 if (mqd == NULL) {
342 errno = EBADF;
343 return -1;
344 }
345
346 if (mqstat->mq_flags != 0 && mqstat->mq_flags != O_NONBLOCK) {
347 errno = EINVAL;
348 return -1;
349 }
350
351 if (omqstat != NULL) {
352 mq_getattr(mqdes, omqstat);
353 }
354
355 k_sem_take(&mq_sem, K_FOREVER);
356 mqd->flags = mqstat->mq_flags;
357 k_sem_give(&mq_sem);
358
359 return 0;
360 }
361
362 /**
363 * @brief Notify process that a message is available.
364 *
365 * See IEEE 1003.1
366 */
mq_notify(mqd_t mqdes,const struct sigevent * notification)367 int mq_notify(mqd_t mqdes, const struct sigevent *notification)
368 {
369 mqueue_desc *mqd = (mqueue_desc *)mqdes;
370
371 if (mqd == NULL) {
372 errno = EBADF;
373 return -1;
374 }
375
376 mqueue_object *msg_queue = mqd->mqueue;
377
378 if (notification == NULL) {
379 if ((msg_queue->not.sigev_notify & SIGEV_MASK) == 0) {
380 errno = EINVAL;
381 return -1;
382 }
383 remove_notification(msg_queue);
384 return 0;
385 }
386
387 if ((msg_queue->not.sigev_notify & SIGEV_MASK) != 0) {
388 errno = EBUSY;
389 return -1;
390 }
391 if (notification->sigev_notify == SIGEV_SIGNAL) {
392 errno = ENOSYS;
393 return -1;
394 }
395 if (notification->sigev_notify_attributes != NULL) {
396 int ret = pthread_attr_setdetachstate(notification->sigev_notify_attributes,
397 PTHREAD_CREATE_DETACHED);
398 if (ret != 0) {
399 errno = ret;
400 return -1;
401 }
402 }
403
404 k_sem_take(&mq_sem, K_FOREVER);
405 memcpy(&msg_queue->not, notification, sizeof(struct sigevent));
406 k_sem_give(&mq_sem);
407
408 return 0;
409 }
410
mq_notify_thread(void * arg)411 static void *mq_notify_thread(void *arg)
412 {
413 mqueue_object *mqueue = (mqueue_object *)arg;
414 struct sigevent *sevp = &mqueue->not;
415
416 pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
417
418 if (sevp->sigev_notify_attributes == NULL) {
419 pthread_detach(pthread_self());
420 }
421
422 sevp->sigev_notify_function(sevp->sigev_value);
423
424 remove_notification(mqueue);
425
426 return NULL;
427 }
428
429 /* Internal functions */
find_in_list(const char * name)430 static mqueue_object *find_in_list(const char *name)
431 {
432 sys_snode_t *mq;
433 mqueue_object *msg_queue;
434
435 mq = mq_list.head;
436
437 while (mq != NULL) {
438 msg_queue = (mqueue_object *)mq;
439 if ((msg_queue->name != NULL) && (strcmp(msg_queue->name, name) == 0)) {
440 return msg_queue;
441 }
442
443 mq = mq->next;
444 }
445
446 return NULL;
447 }
448
send_message(mqueue_desc * mqd,const char * msg_ptr,size_t msg_len,k_timeout_t timeout)449 static int32_t send_message(mqueue_desc *mqd, const char *msg_ptr, size_t msg_len,
450 k_timeout_t timeout)
451 {
452 int32_t ret = -1;
453
454 if (mqd == NULL) {
455 errno = EBADF;
456 return ret;
457 }
458
459 if ((mqd->flags & O_NONBLOCK) != 0U) {
460 timeout = K_NO_WAIT;
461 }
462
463 if (msg_len > mqd->mqueue->queue.msg_size) {
464 errno = EMSGSIZE;
465 return ret;
466 }
467
468 uint32_t msgq_num = k_msgq_num_used_get(&mqd->mqueue->queue);
469
470 if (k_msgq_put(&mqd->mqueue->queue, (void *)msg_ptr, timeout) != 0) {
471 errno = K_TIMEOUT_EQ(timeout, K_NO_WAIT) ? EAGAIN : ETIMEDOUT;
472 return ret;
473 }
474
475 if (k_msgq_num_used_get(&mqd->mqueue->queue) - msgq_num > 0) {
476 struct sigevent *sevp = &mqd->mqueue->not;
477
478 if (sevp->sigev_notify == SIGEV_NONE) {
479 sevp->sigev_notify_function(sevp->sigev_value);
480 } else if (sevp->sigev_notify == SIGEV_THREAD) {
481 pthread_t th;
482
483 ret = pthread_create(&th,
484 sevp->sigev_notify_attributes,
485 mq_notify_thread,
486 mqd->mqueue);
487 }
488 }
489
490 return 0;
491 }
492
receive_message(mqueue_desc * mqd,char * msg_ptr,size_t msg_len,k_timeout_t timeout)493 static int32_t receive_message(mqueue_desc *mqd, char *msg_ptr, size_t msg_len,
494 k_timeout_t timeout)
495 {
496 int ret = -1;
497
498 if (mqd == NULL) {
499 errno = EBADF;
500 return ret;
501 }
502
503 if (msg_len < mqd->mqueue->queue.msg_size) {
504 errno = EMSGSIZE;
505 return ret;
506 }
507
508 if ((mqd->flags & O_NONBLOCK) != 0U) {
509 timeout = K_NO_WAIT;
510 }
511
512 if (k_msgq_get(&mqd->mqueue->queue, (void *)msg_ptr, timeout) != 0) {
513 errno = K_TIMEOUT_EQ(timeout, K_NO_WAIT) ? EAGAIN : ETIMEDOUT;
514 } else {
515 ret = mqd->mqueue->queue.msg_size;
516 }
517
518 return ret;
519 }
520
remove_mq(mqueue_object * msg_queue)521 static void remove_mq(mqueue_object *msg_queue)
522 {
523 if (atomic_cas(&msg_queue->ref_count, 0, 0)) {
524 k_sem_take(&mq_sem, K_FOREVER);
525 sys_slist_find_and_remove(&mq_list, (sys_snode_t *) msg_queue);
526 k_sem_give(&mq_sem);
527
528 /* Free mq buffer and pbject */
529 k_free(msg_queue->mem_buffer);
530 k_free(msg_queue->mem_obj);
531 }
532 }
533
remove_notification(mqueue_object * msg_queue)534 static void remove_notification(mqueue_object *msg_queue)
535 {
536 k_sem_take(&mq_sem, K_FOREVER);
537 memset(&msg_queue->not, 0, sizeof(struct sigevent));
538 k_sem_give(&mq_sem);
539 }
540