1 /*
2  * Copyright (c) 2018 Intel Corporation
3  * Copyright (c) 2024 BayLibre, SAS
4  *
5  * SPDX-License-Identifier: Apache-2.0
6  */
7 
8 #include "posix_clock.h"
9 
10 #include <zephyr/kernel.h>
11 #include <errno.h>
12 #include <string.h>
13 #include <zephyr/sys/atomic.h>
14 #include <zephyr/posix/mqueue.h>
15 #include <zephyr/posix/pthread.h>
16 
/*
 * Union of the sigev_notify values this file recognises. mq_notify() tests a
 * queue's stored sigev_notify against this mask to decide whether a
 * notification is currently registered.
 */
#define SIGEV_MASK (SIGEV_NONE | SIGEV_SIGNAL | SIGEV_THREAD)
18 
/*
 * A named message queue shared by every descriptor that has it open.
 */
typedef struct mqueue_object {
	/* Link in mq_list. The list code casts node pointers directly to
	 * mqueue_object pointers, so this must remain the first member.
	 */
	sys_snode_t snode;
	/* Heap storage backing the k_msgq ring buffer */
	char *mem_buffer;
	/* Heap allocation holding this object; passed to k_free() on removal */
	char *mem_obj;
	/* Underlying kernel message queue */
	struct k_msgq queue;
	/* Number of descriptors currently referring to this queue */
	atomic_t ref_count;
	/* Heap copy of the queue name; set to NULL once the queue is unlinked */
	char *name;
	/* Registered notification; all-zero when none is registered */
	struct sigevent not;
} mqueue_object;
28 
/*
 * Per-open descriptor; its address is what mq_open() hands back as mqd_t.
 */
typedef struct mqueue_desc {
	/* Heap allocation holding this descriptor; passed to k_free() on close */
	char *mem_desc;
	/* Queue this descriptor refers to */
	mqueue_object *mqueue;
	/* O_NONBLOCK or 0 */
	uint32_t  flags;
} mqueue_desc;
34 
/* Binary semaphore used as a lock around mq_list and other shared queue state */
K_SEM_DEFINE(mq_sem, 1, 1);

/* List of all message queue objects created by mq_open(), initially empty */
sys_slist_t mq_list = SYS_SLIST_STATIC_INIT(&mq_list);
39 
/* Forward declarations of the file-local helpers defined after the public API */
static mqueue_object *find_in_list(const char *name);
static int32_t send_message(mqueue_desc *mqd, const char *msg_ptr, size_t msg_len,
			  k_timeout_t timeout);
static int32_t receive_message(mqueue_desc *mqd, char *msg_ptr, size_t msg_len,
			   k_timeout_t timeout);
static void remove_notification(mqueue_object *msg_queue);
static void remove_mq(mqueue_object *msg_queue);
static void *mq_notify_thread(void *arg);
48 
49 /**
50  * @brief Open a message queue.
51  *
52  * Number of message queue and descriptor to message queue are limited by
53  * heap size. increase the size through CONFIG_HEAP_MEM_POOL_SIZE.
54  *
55  * See IEEE 1003.1
56  */
mq_open(const char * name,int oflags,...)57 mqd_t mq_open(const char *name, int oflags, ...)
58 {
59 	va_list va;
60 	mode_t mode;
61 	struct mq_attr *attrs = NULL;
62 	long msg_size = 0U, max_msgs = 0U;
63 	mqueue_object *msg_queue;
64 	mqueue_desc *msg_queue_desc = NULL, *mqd = (mqueue_desc *)(-1);
65 	char *mq_desc_ptr, *mq_obj_ptr, *mq_buf_ptr, *mq_name_ptr;
66 
67 	va_start(va, oflags);
68 	if ((oflags & O_CREAT) != 0) {
69 		BUILD_ASSERT(sizeof(mode_t) <= sizeof(int));
70 		mode = va_arg(va, unsigned int);
71 		attrs = va_arg(va, struct mq_attr*);
72 	}
73 	va_end(va);
74 
75 	if (attrs != NULL) {
76 		msg_size = attrs->mq_msgsize;
77 		max_msgs = attrs->mq_maxmsg;
78 	}
79 
80 	if ((name == NULL) || ((oflags & O_CREAT) != 0 && (msg_size <= 0 ||
81 						      max_msgs <= 0))) {
82 		errno = EINVAL;
83 		return (mqd_t)mqd;
84 	}
85 
86 	if ((strlen(name) + 1)  > CONFIG_MQUEUE_NAMELEN_MAX) {
87 		errno = ENAMETOOLONG;
88 		return (mqd_t)mqd;
89 	}
90 
91 	/* Check if queue already exists */
92 	k_sem_take(&mq_sem, K_FOREVER);
93 	msg_queue = find_in_list(name);
94 	k_sem_give(&mq_sem);
95 
96 	if ((msg_queue != NULL) && (oflags & O_CREAT) != 0 &&
97 	    (oflags & O_EXCL) != 0) {
98 		/* Message queue has already been opened and O_EXCL is set */
99 		errno = EEXIST;
100 		return (mqd_t)mqd;
101 	}
102 
103 	if ((msg_queue == NULL) && (oflags & O_CREAT) == 0) {
104 		errno = ENOENT;
105 		return (mqd_t)mqd;
106 	}
107 
108 	mq_desc_ptr = k_malloc(sizeof(struct mqueue_desc));
109 	if (mq_desc_ptr != NULL) {
110 		(void)memset(mq_desc_ptr, 0, sizeof(struct mqueue_desc));
111 		msg_queue_desc = (struct mqueue_desc *)mq_desc_ptr;
112 		msg_queue_desc->mem_desc = mq_desc_ptr;
113 	} else {
114 		goto free_mq_desc;
115 	}
116 
117 
118 	/* Allocate mqueue object for new message queue */
119 	if (msg_queue == NULL) {
120 
121 		/* Check for message quantity and size in message queue */
122 		if (attrs->mq_msgsize > CONFIG_MSG_SIZE_MAX &&
123 		    attrs->mq_maxmsg > CONFIG_POSIX_MQ_OPEN_MAX) {
124 			goto free_mq_desc;
125 		}
126 
127 		mq_obj_ptr = k_malloc(sizeof(mqueue_object));
128 		if (mq_obj_ptr != NULL) {
129 			(void)memset(mq_obj_ptr, 0, sizeof(mqueue_object));
130 			msg_queue = (mqueue_object *)mq_obj_ptr;
131 			msg_queue->mem_obj = mq_obj_ptr;
132 
133 		} else {
134 			goto free_mq_object;
135 		}
136 
137 		mq_name_ptr = k_malloc(strlen(name) + 1);
138 		if (mq_name_ptr != NULL) {
139 			(void)memset(mq_name_ptr, 0, strlen(name) + 1);
140 			msg_queue->name = mq_name_ptr;
141 
142 		} else {
143 			goto free_mq_name;
144 		}
145 
146 		strcpy(msg_queue->name, name);
147 
148 		mq_buf_ptr = k_malloc(msg_size * max_msgs * sizeof(uint8_t));
149 		if (mq_buf_ptr != NULL) {
150 			(void)memset(mq_buf_ptr, 0,
151 				     msg_size * max_msgs * sizeof(uint8_t));
152 			msg_queue->mem_buffer = mq_buf_ptr;
153 		} else {
154 			goto free_mq_buffer;
155 		}
156 
157 		(void)atomic_set(&msg_queue->ref_count, 1);
158 		/* initialize zephyr message queue */
159 		k_msgq_init(&msg_queue->queue, msg_queue->mem_buffer, msg_size,
160 			    max_msgs);
161 		k_sem_take(&mq_sem, K_FOREVER);
162 		sys_slist_append(&mq_list, (sys_snode_t *)&(msg_queue->snode));
163 		k_sem_give(&mq_sem);
164 
165 	} else {
166 		atomic_inc(&msg_queue->ref_count);
167 	}
168 
169 	msg_queue_desc->mqueue = msg_queue;
170 	msg_queue_desc->flags = (oflags & O_NONBLOCK) != 0 ? O_NONBLOCK : 0;
171 	return (mqd_t)msg_queue_desc;
172 
173 free_mq_buffer:
174 	k_free(mq_name_ptr);
175 free_mq_name:
176 	k_free(mq_obj_ptr);
177 free_mq_object:
178 	k_free(mq_desc_ptr);
179 free_mq_desc:
180 	errno = ENOSPC;
181 	return (mqd_t)mqd;
182 }
183 
184 /**
185  * @brief Close a message queue descriptor.
186  *
187  * See IEEE 1003.1
188  */
mq_close(mqd_t mqdes)189 int mq_close(mqd_t mqdes)
190 {
191 	mqueue_desc *mqd = (mqueue_desc *)mqdes;
192 
193 	if (mqd == NULL) {
194 		errno = EBADF;
195 		return -1;
196 	}
197 
198 	atomic_dec(&mqd->mqueue->ref_count);
199 
200 	/* remove mq if marked for unlink */
201 	if (mqd->mqueue->name == NULL) {
202 		remove_mq(mqd->mqueue);
203 	}
204 
205 	k_free(mqd->mem_desc);
206 	return 0;
207 }
208 
209 /**
210  * @brief Remove a message queue.
211  *
212  * See IEEE 1003.1
213  */
mq_unlink(const char * name)214 int mq_unlink(const char *name)
215 {
216 	mqueue_object *msg_queue;
217 
218 	k_sem_take(&mq_sem, K_FOREVER);
219 	msg_queue = find_in_list(name);
220 
221 	if (msg_queue == NULL) {
222 		k_sem_give(&mq_sem);
223 		errno = EBADF;
224 		return -1;
225 	}
226 
227 	k_free(msg_queue->name);
228 	msg_queue->name = NULL;
229 	k_sem_give(&mq_sem);
230 	remove_mq(msg_queue);
231 	return 0;
232 }
233 
234 /**
235  * @brief Send a message to a message queue.
236  *
237  * All messages in message queue are of equal priority.
238  *
239  * See IEEE 1003.1
240  */
mq_send(mqd_t mqdes,const char * msg_ptr,size_t msg_len,unsigned int msg_prio)241 int mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len,
242 	    unsigned int msg_prio)
243 {
244 	mqueue_desc *mqd = (mqueue_desc *)mqdes;
245 
246 	return send_message(mqd, msg_ptr, msg_len, K_FOREVER);
247 }
248 
249 /**
250  * @brief Send message to a message queue within abstime time.
251  *
252  * All messages in message queue are of equal priority.
253  *
254  * See IEEE 1003.1
255  */
mq_timedsend(mqd_t mqdes,const char * msg_ptr,size_t msg_len,unsigned int msg_prio,const struct timespec * abstime)256 int mq_timedsend(mqd_t mqdes, const char *msg_ptr, size_t msg_len,
257 		 unsigned int msg_prio, const struct timespec *abstime)
258 {
259 	mqueue_desc *mqd = (mqueue_desc *)mqdes;
260 
261 	if ((abstime == NULL) || !timespec_is_valid(abstime)) {
262 		errno = EINVAL;
263 		return -1;
264 	}
265 
266 	return send_message(mqd, msg_ptr, msg_len,
267 			    K_MSEC(timespec_to_timeoutms(CLOCK_REALTIME, abstime)));
268 }
269 
270 /**
271  * @brief Receive a message from a message queue.
272  *
273  * All messages in message queue are of equal priority.
274  *
275  * See IEEE 1003.1
276  */
mq_receive(mqd_t mqdes,char * msg_ptr,size_t msg_len,unsigned int * msg_prio)277 int mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len,
278 		   unsigned int *msg_prio)
279 {
280 	mqueue_desc *mqd = (mqueue_desc *)mqdes;
281 
282 	return receive_message(mqd, msg_ptr, msg_len, K_FOREVER);
283 }
284 
285 /**
286  * @brief Receive message from a message queue within abstime time.
287  *
288  * All messages in message queue are of equal priority.
289  *
290  * See IEEE 1003.1
291  */
mq_timedreceive(mqd_t mqdes,char * msg_ptr,size_t msg_len,unsigned int * msg_prio,const struct timespec * abstime)292 int mq_timedreceive(mqd_t mqdes, char *msg_ptr, size_t msg_len,
293 			unsigned int *msg_prio, const struct timespec *abstime)
294 {
295 	mqueue_desc *mqd = (mqueue_desc *)mqdes;
296 
297 	if ((abstime == NULL) || !timespec_is_valid(abstime)) {
298 		errno = EINVAL;
299 		return -1;
300 	}
301 
302 	return receive_message(mqd, msg_ptr, msg_len,
303 			       K_MSEC(timespec_to_timeoutms(CLOCK_REALTIME, abstime)));
304 }
305 
306 /**
307  * @brief Get message queue attributes.
308  *
309  * See IEEE 1003.1
310  */
mq_getattr(mqd_t mqdes,struct mq_attr * mqstat)311 int mq_getattr(mqd_t mqdes, struct mq_attr *mqstat)
312 {
313 	mqueue_desc *mqd = (mqueue_desc *)mqdes;
314 	struct k_msgq_attrs attrs;
315 
316 	if (mqd == NULL) {
317 		errno = EBADF;
318 		return -1;
319 	}
320 
321 	k_sem_take(&mq_sem, K_FOREVER);
322 	k_msgq_get_attrs(&mqd->mqueue->queue, &attrs);
323 	mqstat->mq_flags = mqd->flags;
324 	mqstat->mq_maxmsg = attrs.max_msgs;
325 	mqstat->mq_msgsize = attrs.msg_size;
326 	mqstat->mq_curmsgs = attrs.used_msgs;
327 	k_sem_give(&mq_sem);
328 	return 0;
329 }
330 
331 /**
332  * @brief Set message queue attributes.
333  *
334  * See IEEE 1003.1
335  */
mq_setattr(mqd_t mqdes,const struct mq_attr * mqstat,struct mq_attr * omqstat)336 int mq_setattr(mqd_t mqdes, const struct mq_attr *mqstat,
337 	       struct mq_attr *omqstat)
338 {
339 	mqueue_desc *mqd = (mqueue_desc *)mqdes;
340 
341 	if (mqd == NULL) {
342 		errno = EBADF;
343 		return -1;
344 	}
345 
346 	if (mqstat->mq_flags != 0 && mqstat->mq_flags != O_NONBLOCK) {
347 		errno = EINVAL;
348 		return -1;
349 	}
350 
351 	if (omqstat != NULL) {
352 		mq_getattr(mqdes, omqstat);
353 	}
354 
355 	k_sem_take(&mq_sem, K_FOREVER);
356 	mqd->flags = mqstat->mq_flags;
357 	k_sem_give(&mq_sem);
358 
359 	return 0;
360 }
361 
362 /**
363  * @brief Notify process that a message is available.
364  *
365  * See IEEE 1003.1
366  */
mq_notify(mqd_t mqdes,const struct sigevent * notification)367 int mq_notify(mqd_t mqdes, const struct sigevent *notification)
368 {
369 	mqueue_desc *mqd = (mqueue_desc *)mqdes;
370 
371 	if (mqd == NULL) {
372 		errno = EBADF;
373 		return -1;
374 	}
375 
376 	mqueue_object *msg_queue = mqd->mqueue;
377 
378 	if (notification == NULL) {
379 		if ((msg_queue->not.sigev_notify & SIGEV_MASK) == 0) {
380 			errno = EINVAL;
381 			return -1;
382 		}
383 		remove_notification(msg_queue);
384 		return 0;
385 	}
386 
387 	if ((msg_queue->not.sigev_notify & SIGEV_MASK) != 0) {
388 		errno = EBUSY;
389 		return -1;
390 	}
391 	if (notification->sigev_notify == SIGEV_SIGNAL) {
392 		errno = ENOSYS;
393 		return -1;
394 	}
395 	if (notification->sigev_notify_attributes != NULL) {
396 		int ret = pthread_attr_setdetachstate(notification->sigev_notify_attributes,
397 						      PTHREAD_CREATE_DETACHED);
398 		if (ret != 0) {
399 			errno = ret;
400 			return -1;
401 		}
402 	}
403 
404 	k_sem_take(&mq_sem, K_FOREVER);
405 	memcpy(&msg_queue->not, notification, sizeof(struct sigevent));
406 	k_sem_give(&mq_sem);
407 
408 	return 0;
409 }
410 
mq_notify_thread(void * arg)411 static void *mq_notify_thread(void *arg)
412 {
413 	mqueue_object *mqueue = (mqueue_object *)arg;
414 	struct sigevent *sevp = &mqueue->not;
415 
416 	pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
417 
418 	if (sevp->sigev_notify_attributes == NULL) {
419 		pthread_detach(pthread_self());
420 	}
421 
422 	sevp->sigev_notify_function(sevp->sigev_value);
423 
424 	remove_notification(mqueue);
425 
426 	return NULL;
427 }
428 
429 /* Internal functions */
find_in_list(const char * name)430 static mqueue_object *find_in_list(const char *name)
431 {
432 	sys_snode_t *mq;
433 	mqueue_object *msg_queue;
434 
435 	mq = mq_list.head;
436 
437 	while (mq != NULL) {
438 		msg_queue = (mqueue_object *)mq;
439 		if ((msg_queue->name != NULL) && (strcmp(msg_queue->name, name) == 0)) {
440 			return msg_queue;
441 		}
442 
443 		mq = mq->next;
444 	}
445 
446 	return NULL;
447 }
448 
/*
 * Put one message on the queue behind mqd and fire any registered
 * notification.
 *
 * The timeout is replaced by K_NO_WAIT when the descriptor has O_NONBLOCK
 * set. Returns 0 on success, or -1 with errno set to EBADF (NULL descriptor),
 * EMSGSIZE (msg_len larger than the queue's message size), EAGAIN
 * (non-blocking and the put failed) or ETIMEDOUT (the put failed otherwise).
 *
 * NOTE(review): k_msgq_put() is given only the pointer, so it presumably
 * copies the queue's full msg_size bytes even when msg_len is smaller.
 * Confirm callers always pass a buffer of at least msg_size bytes.
 */
static int32_t send_message(mqueue_desc *mqd, const char *msg_ptr, size_t msg_len,
			  k_timeout_t timeout)
{
	struct k_msgq *queue;
	uint32_t used_before;

	if (mqd == NULL) {
		errno = EBADF;
		return -1;
	}

	queue = &mqd->mqueue->queue;

	if ((mqd->flags & O_NONBLOCK) != 0U) {
		timeout = K_NO_WAIT;
	}

	if (msg_len > queue->msg_size) {
		errno = EMSGSIZE;
		return -1;
	}

	used_before = k_msgq_num_used_get(queue);

	if (k_msgq_put(queue, (void *)msg_ptr, timeout) != 0) {
		errno = K_TIMEOUT_EQ(timeout, K_NO_WAIT) ? EAGAIN : ETIMEDOUT;
		return -1;
	}

	/* Notify only when the fill level changed across the put */
	if (k_msgq_num_used_get(queue) != used_before) {
		struct sigevent *sevp = &mqd->mqueue->not;

		if (sevp->sigev_notify == SIGEV_NONE) {
			/* Runs in the sender's context; the callback is optional */
			if (sevp->sigev_notify_function != NULL) {
				sevp->sigev_notify_function(sevp->sigev_value);
			}
		} else if (sevp->sigev_notify == SIGEV_THREAD) {
			pthread_t th;

			/*
			 * Best effort: the message is already queued, so a
			 * failure to start the notifier is not reported.
			 */
			(void)pthread_create(&th,
					     sevp->sigev_notify_attributes,
					     mq_notify_thread,
					     mqd->mqueue);
		}
	}

	return 0;
}
492 
/*
 * Take one message from the queue behind mqd into msg_ptr.
 *
 * The timeout is replaced by K_NO_WAIT when the descriptor has O_NONBLOCK
 * set. Returns the queue's message size on success, or -1 with errno set to
 * EBADF (NULL descriptor), EMSGSIZE (msg_len smaller than the queue's message
 * size), EAGAIN (non-blocking and the get failed) or ETIMEDOUT (the get
 * failed otherwise).
 */
static int32_t receive_message(mqueue_desc *mqd, char *msg_ptr, size_t msg_len,
			     k_timeout_t timeout)
{
	struct k_msgq *source;

	if (mqd == NULL) {
		errno = EBADF;
		return -1;
	}

	source = &mqd->mqueue->queue;

	/* The caller's buffer must be able to hold a full message */
	if (msg_len < source->msg_size) {
		errno = EMSGSIZE;
		return -1;
	}

	if ((mqd->flags & O_NONBLOCK) != 0U) {
		timeout = K_NO_WAIT;
	}

	if (k_msgq_get(source, (void *)msg_ptr, timeout) != 0) {
		errno = K_TIMEOUT_EQ(timeout, K_NO_WAIT) ? EAGAIN : ETIMEDOUT;
		return -1;
	}

	return source->msg_size;
}
520 
/*
 * Release msg_queue if no descriptor references it any more.
 *
 * The reference-count test and the removal from mq_list happen in one mq_sem
 * critical section, and the memory is freed only by the caller that actually
 * unlinked the node. This keeps two callers that both observe a zero count
 * (for example a concurrent mq_close() and mq_unlink()) from both freeing
 * the queue.
 */
static void remove_mq(mqueue_object *msg_queue)
{
	int unlinked = 0;

	k_sem_take(&mq_sem, K_FOREVER);
	if (atomic_get(&msg_queue->ref_count) == 0) {
		unlinked = sys_slist_find_and_remove(&mq_list, &msg_queue->snode);
	}
	k_sem_give(&mq_sem);

	if (unlinked) {
		/* Free mq buffer and object */
		k_free(msg_queue->mem_buffer);
		k_free(msg_queue->mem_obj);
	}
}
533 
/*
 * Clear the notification registered on msg_queue by zeroing its sigevent
 * while holding mq_sem.
 */
static void remove_notification(mqueue_object *msg_queue)
{
	struct sigevent *const registration = &msg_queue->not;

	k_sem_take(&mq_sem, K_FOREVER);
	memset(registration, 0, sizeof(*registration));
	k_sem_give(&mq_sem);
}
540