1 /*
2 * Copyright (c) 2006-2023, RT-Thread Development Team
3 *
4 * SPDX-License-Identifier: Apache-2.0
5 *
6 * Change Logs:
7 * Date Author Notes
8 * 2019-10-12 Jesven first version
9 * 2023-07-25 Shell Remove usage of rt_hw_interrupt API in the lwp
10 * 2023-09-16 zmq810150896 Increased versatility of some features on dfs v2
11 * 2024-01-25 Shell porting to susp_list API
12 */
13 #define __RT_IPC_SOURCE__
14
15 #define DBG_TAG "lwp.ipc"
16 #define DBG_LVL DBG_WARNING
17 #include <rtdbg.h>
18
19 #include <rtthread.h>
20 #include <rthw.h>
21
22 #include "lwp_internal.h"
23 #include "lwp_ipc.h"
24 #include "lwp_ipc_internal.h"
25
26 #include <dfs_file.h>
27 #include <poll.h>
28
29 #ifdef RT_USING_DFS_V2
30 #include <dfs_dentry.h>
31 #endif
32
33 /**
34 * the IPC channel states
35 */
36 enum
37 {
38 RT_IPC_STAT_IDLE, /* no suspended threads */
39 RT_IPC_STAT_WAIT, /* suspended receivers exist */
40 RT_IPC_STAT_ACTIVE, /* suspended senders exist */
41 };
42
43 /**
44 * IPC message structure.
45 *
46 * They are allocated and released in the similar way like 'rt_chfd'.
47 */
48 struct rt_ipc_msg
49 {
50 struct rt_channel_msg msg; /**< the payload of msg */
51 rt_list_t mlist; /**< the msg list */
52 rt_uint8_t need_reply; /**< whether msg wait reply*/
53 };
54 typedef struct rt_ipc_msg *rt_ipc_msg_t;
55
56 static rt_ipc_msg_t _ipc_msg_free_list = (rt_ipc_msg_t)RT_NULL; /* released chain */
57 static int rt_ipc_msg_used = 0; /* first unallocated entry */
58 static struct rt_ipc_msg ipc_msg_pool[RT_CH_MSG_MAX_NR]; /* initial message array */
59 static struct rt_mutex _chn_obj_lock;
60 static struct rt_spinlock _msg_list_lock; /* lock protect of _ipc_msg_free_list */
61
62 /**
63 * Allocate an IPC message from the statically-allocated array.
64 */
_ipc_msg_alloc(void)65 static rt_ipc_msg_t _ipc_msg_alloc(void)
66 {
67 rt_ipc_msg_t p = (rt_ipc_msg_t)RT_NULL;
68 rt_base_t level;
69
70 level = rt_spin_lock_irqsave(&_msg_list_lock);
71 if (_ipc_msg_free_list) /* use the released chain first */
72 {
73 p = _ipc_msg_free_list;
74 _ipc_msg_free_list = (rt_ipc_msg_t)p->msg.sender; /* emtry payload as a pointer */
75 }
76 else if (rt_ipc_msg_used < RT_CH_MSG_MAX_NR)
77 {
78 p = &ipc_msg_pool[rt_ipc_msg_used];
79 rt_ipc_msg_used++;
80 }
81 rt_spin_unlock_irqrestore(&_msg_list_lock, level);
82 return p;
83 }
84
85 /**
86 * Put a released IPC message back to the released chain.
87 */
_ipc_msg_free(rt_ipc_msg_t p_msg)88 static void _ipc_msg_free(rt_ipc_msg_t p_msg)
89 {
90 rt_base_t level;
91
92 level = rt_spin_lock_irqsave(&_msg_list_lock);
93 p_msg->msg.sender = (void *)_ipc_msg_free_list;
94 _ipc_msg_free_list = p_msg;
95 rt_spin_unlock_irqrestore(&_msg_list_lock, level);
96 }
97
98 /**
99 * Initialized the IPC message.
100 */
rt_ipc_msg_init(rt_ipc_msg_t msg,struct rt_channel_msg * data,rt_uint8_t need_reply)101 static void rt_ipc_msg_init(rt_ipc_msg_t msg, struct rt_channel_msg *data, rt_uint8_t need_reply)
102 {
103 RT_ASSERT(msg != RT_NULL);
104
105 msg->need_reply = need_reply;
106 msg->msg = *data;
107 msg->msg.sender = (void *)rt_thread_self();
108 rt_list_init(&msg->mlist);
109 }
110
111 /**
112 * Initialized the list of the waiting receivers on the IPC channel.
113 */
rt_channel_object_init(struct rt_ipc_object * ipc)114 rt_inline rt_err_t rt_channel_object_init(struct rt_ipc_object *ipc)
115 {
116 rt_list_init(&(ipc->suspend_thread)); /* receiver list */
117
118 return RT_EOK;
119 }
120
121 /**
122 * Wakeup the first suspened thread in the list.
123 */
rt_channel_list_resume(rt_list_t * list)124 rt_inline rt_err_t rt_channel_list_resume(rt_list_t *list)
125 {
126 struct rt_thread *thread;
127
128 /* get the first thread entry waiting for sending */
129 thread = rt_susp_list_dequeue(list, RT_THREAD_RESUME_RES_THR_ERR);
130
131 return thread ? RT_EOK : -RT_ERROR;
132 }
133
134 /**
135 * Wakeup all the suspended threads in the list.
136 */
_channel_list_resume_all_locked(rt_list_t * list)137 rt_inline rt_err_t _channel_list_resume_all_locked(rt_list_t *list)
138 {
139 /* wakeup all suspended threads for sending */
140 rt_susp_list_resume_all(list, RT_ERROR);
141
142 return RT_EOK;
143 }
144
145 /**
146 * Suspend the thread and chain it into the end of the list.
147 */
rt_channel_list_suspend(rt_list_t * list,struct rt_thread * thread)148 rt_inline rt_err_t rt_channel_list_suspend(rt_list_t *list, struct rt_thread *thread)
149 {
150 /* suspend thread */
151 rt_err_t ret = rt_thread_suspend_to_list(thread, list, RT_IPC_FLAG_FIFO, RT_INTERRUPTIBLE);
152
153 return ret;
154 }
155
_rt_channel_check_wq_wakup_locked(rt_channel_t ch)156 static void _rt_channel_check_wq_wakup_locked(rt_channel_t ch)
157 {
158 if (rt_list_isempty(&ch->wait_msg))
159 {
160 return;
161 }
162
163 rt_wqueue_wakeup(&ch->reader_queue, 0);
164 }
165
rt_channel_component_init(void)166 rt_err_t rt_channel_component_init(void)
167 {
168 return rt_mutex_init(&_chn_obj_lock, "rt_chnannel", RT_IPC_FLAG_PRIO);
169 }
170
171 /**
172 * Create a new or open an existing IPC channel.
173 */
rt_raw_channel_open(const char * name,int flags)174 rt_channel_t rt_raw_channel_open(const char *name, int flags)
175 {
176 rt_err_t err = RT_EOK;
177 rt_channel_t ch = RT_NULL;
178 rt_base_t level;
179 struct rt_object *object;
180 struct rt_list_node *node;
181 struct rt_object_information *information;
182
183 RT_DEBUG_NOT_IN_INTERRUPT;
184
185 /**
186 * Brief: Match an existing channel from object list with the same name
187 * If no such channel found, it will create a new channel if O_CREAT
188 * is set in the flag
189 *
190 * Note: Critical Section
191 * - Channel Object list (RW; this may write to a channel if needed, and
192 * the RCU operation of the routine should be atomic)
193 */
194 information = rt_object_get_information(RT_Object_Class_Channel);
195 RT_ASSERT(information != RT_NULL);
196
197 err = rt_mutex_take_interruptible(&_chn_obj_lock, RT_WAITING_FOREVER);
198 if (err != RT_EOK)
199 {
200 return RT_NULL;
201 }
202
203 for (node = information->object_list.next;
204 node != &(information->object_list);
205 node = node->next)
206 {
207 object = rt_list_entry(node, struct rt_object, list);
208 if (rt_strncmp(object->name, name, RT_NAME_MAX) == 0)
209 {
210 if ((flags & O_CREAT) && (flags & O_EXCL))
211 {
212 err = -RT_EFULL;
213 break;
214 }
215 /* find the IPC channel with the specific name */
216 ch = (rt_channel_t)object;
217 level = rt_spin_lock_irqsave(&ch->slock);
218 ch->ref++; /* increase the reference count */
219 rt_spin_unlock_irqrestore(&ch->slock, level);
220 break;
221 }
222 }
223
224 if (!ch && err == RT_EOK)
225 {
226 /* create a new IPC channel */
227 if (flags & O_CREAT)
228 {
229 /* allocate a real IPC channel structure */
230 ch = (rt_channel_t)rt_object_allocate(RT_Object_Class_Channel, name);
231 }
232
233 if (ch)
234 {
235 rt_channel_object_init(&ch->parent); /* suspended receivers */
236 rt_spin_lock_init(&ch->slock);
237 rt_list_init(&ch->wait_msg); /* unhandled messages */
238 rt_list_init(&ch->wait_thread); /* suspended senders */
239 rt_wqueue_init(&ch->reader_queue); /* reader poll queue */
240 ch->reply = RT_NULL;
241 ch->stat = RT_IPC_STAT_IDLE; /* no suspended threads */
242 ch->ref = 1;
243 }
244 }
245
246 rt_mutex_release(&_chn_obj_lock);
247
248 return ch;
249 }
250
251 /**
252 * Close an existiong IPC channel, release the resources.
253 */
rt_raw_channel_close(rt_channel_t ch)254 rt_err_t rt_raw_channel_close(rt_channel_t ch)
255 {
256 rt_err_t rc = -RT_EIO;
257 rt_base_t level;
258
259 RT_DEBUG_NOT_IN_INTERRUPT;
260
261 if (ch != RT_NULL)
262 {
263 rc = rt_mutex_take_interruptible(&_chn_obj_lock, RT_WAITING_FOREVER);
264 if (rc != RT_EOK)
265 {
266 return rc;
267 }
268 /**
269 * Brief: Remove the channel from object list
270 *
271 * Note: Critical Section
272 * - the channel
273 */
274 level = rt_spin_lock_irqsave(&ch->slock);
275
276 if (rt_object_get_type(&ch->parent.parent) != RT_Object_Class_Channel)
277 {
278 rc = -RT_EIO;
279 }
280 else if (rt_object_is_systemobject(&ch->parent.parent) != RT_FALSE)
281 {
282 rc = -RT_EIO;
283 }
284 else if (ch->ref == 0)
285 {
286 rc = -RT_EIO;
287 }
288 else
289 {
290 ch->ref--;
291
292 rc = RT_EOK;
293 }
294 rt_spin_unlock_irqrestore(&ch->slock, level);
295
296 if (rc == RT_EOK)
297 {
298 if (ch->ref == 0)
299 {
300 /* wakeup all the suspended receivers and senders */
301 _channel_list_resume_all_locked(&ch->parent.suspend_thread);
302 _channel_list_resume_all_locked(&ch->wait_thread);
303
304 /* all ipc msg will lost */
305 rt_list_init(&ch->wait_msg);
306
307 rt_object_delete(&ch->parent.parent); /* release the IPC channel structure */
308 }
309 }
310
311 rt_mutex_release(&_chn_obj_lock);
312 }
313
314 return rc;
315 }
316
wakeup_sender_wait_recv(void * object,struct rt_thread * thread)317 static rt_err_t wakeup_sender_wait_recv(void *object, struct rt_thread *thread)
318 {
319 rt_channel_t ch;
320
321 ch = (rt_channel_t)object;
322 if (ch->stat == RT_IPC_STAT_ACTIVE && ch->reply == thread)
323 {
324 ch->stat = RT_IPC_STAT_IDLE;
325 ch->reply = RT_NULL;
326 }
327 else
328 {
329 rt_ipc_msg_t msg;
330 rt_list_t *l;
331
332 l = ch->wait_msg.next;
333 while (l != &ch->wait_msg)
334 {
335 msg = rt_list_entry(l, struct rt_ipc_msg, mlist);
336 if (msg->need_reply && msg->msg.sender == thread)
337 {
338 rt_list_remove(&msg->mlist); /* remove the msg from the channel */
339 _ipc_msg_free(msg);
340 break;
341 }
342 l = l->next;
343 }
344 }
345 thread->error = -RT_EINTR;
346 return rt_thread_resume(thread); /* wake up the sender */
347 }
348
wakeup_sender_wait_reply(void * object,struct rt_thread * thread)349 static rt_err_t wakeup_sender_wait_reply(void *object, struct rt_thread *thread)
350 {
351 rt_channel_t ch;
352
353 ch = (rt_channel_t)object;
354 RT_ASSERT(ch->stat == RT_IPC_STAT_ACTIVE && ch->reply == thread);
355 ch->stat = RT_IPC_STAT_IDLE;
356 ch->reply = RT_NULL;
357 thread->error = -RT_EINTR;
358 return rt_thread_resume(thread); /* wake up the sender */
359 }
360
sender_timeout(void * parameter)361 static void sender_timeout(void *parameter)
362 {
363 rt_sched_lock_level_t slvl;
364 struct rt_thread *thread = (struct rt_thread *)parameter;
365 rt_channel_t ch;
366
367 rt_sched_lock(&slvl);
368
369 ch = (rt_channel_t)(thread->wakeup_handle.user_data);
370 if (ch->stat == RT_IPC_STAT_ACTIVE && ch->reply == thread)
371 {
372 ch->stat = RT_IPC_STAT_IDLE;
373 ch->reply = RT_NULL;
374 }
375 else
376 {
377 rt_ipc_msg_t msg;
378 rt_list_t *l;
379
380 l = ch->wait_msg.next;
381 while (l != &ch->wait_msg)
382 {
383 msg = rt_list_entry(l, struct rt_ipc_msg, mlist);
384 if (msg->need_reply && msg->msg.sender == thread)
385 {
386 rt_list_remove(&msg->mlist); /* remove the msg from the channel */
387 _ipc_msg_free(msg);
388 break;
389 }
390 l = l->next;
391 }
392 }
393
394 thread->wakeup_handle.func = RT_NULL;
395 thread->error = RT_ETIMEOUT;
396
397 /* insert to schedule ready list */
398 rt_sched_insert_thread(thread);
399 /* do schedule */
400 rt_sched_unlock_n_resched(slvl);
401 }
402
403 /**
404 * Get file vnode from fd.
405 */
_ipc_msg_get_file(int fd)406 static void *_ipc_msg_get_file(int fd)
407 {
408 struct dfs_file *d;
409
410 d = fd_get(fd);
411 if (d == RT_NULL)
412 return RT_NULL;
413
414 if (!d->vnode)
415 return RT_NULL;
416
417 return (void *)d;
418 }
419
420 /**
421 * Get fd from file vnode.
422 */
_ipc_msg_fd_new(void * file)423 static int _ipc_msg_fd_new(void *file)
424 {
425 int fd;
426 struct dfs_file *d;
427 struct dfs_file *df = RT_NULL;
428
429 if (file == RT_NULL)
430 {
431 return -1;
432 }
433
434 df = (struct dfs_file *)file;
435
436 fd = fd_new();
437 if (fd < 0)
438 {
439 return -1;
440 }
441
442 d = fd_get(fd);
443 if (!d)
444 {
445 fd_release(fd);
446 return -1;
447 }
448
449 d->vnode = df->vnode;
450 d->flags = df->flags;
451 d->data = df->data;
452 d->magic = df->magic;
453
454 #ifdef RT_USING_DFS_V2
455 d->fops = df->fops;
456 d->mode = df->mode;
457 d->dentry = df->dentry;
458 if (d->dentry)
459 rt_atomic_add(&(d->dentry->ref_count), 1);
460
461 if (d->vnode)
462 rt_atomic_add(&(d->vnode->ref_count), 1);
463 #else
464 if (d->vnode)
465 d->vnode->ref_count++;
466 #endif
467
468 return fd;
469 }
470
471 static rt_err_t _do_send_recv_timeout(rt_channel_t ch, rt_channel_msg_t data, int need_reply, rt_channel_msg_t data_ret, rt_int32_t time, rt_ipc_msg_t msg);
472
473 /**
474 * Send data through an IPC channel, wait for the reply or not.
475 */
_send_recv_timeout(rt_channel_t ch,rt_channel_msg_t data,int need_reply,rt_channel_msg_t data_ret,rt_int32_t time)476 static rt_err_t _send_recv_timeout(rt_channel_t ch, rt_channel_msg_t data, int need_reply, rt_channel_msg_t data_ret, rt_int32_t time)
477 {
478 rt_ipc_msg_t msg;
479 rt_err_t rc = -RT_ERROR;
480
481 if (need_reply)
482 {
483 RT_DEBUG_NOT_IN_INTERRUPT;
484 }
485
486 if (ch == RT_NULL)
487 {
488 rc = -RT_EIO;
489 }
490 else
491 {
492 if (rt_object_get_type(&ch->parent.parent) != RT_Object_Class_Channel)
493 {
494 rc = -RT_EIO;
495 }
496 else if (need_reply && time == 0)
497 {
498 rc = -RT_ETIMEOUT;
499 }
500 else
501 {
502 /* allocate an IPC message */
503 msg = _ipc_msg_alloc();
504 if (!msg)
505 rc = -RT_ENOMEM;
506 else
507 rc = _do_send_recv_timeout(ch, data, need_reply, data_ret, time, msg);
508 }
509 }
510
511 return rc;
512 }
513
_do_send_recv_timeout(rt_channel_t ch,rt_channel_msg_t data,int need_reply,rt_channel_msg_t data_ret,rt_int32_t time,rt_ipc_msg_t msg)514 static rt_err_t _do_send_recv_timeout(rt_channel_t ch, rt_channel_msg_t data, int need_reply, rt_channel_msg_t data_ret, rt_int32_t time, rt_ipc_msg_t msg)
515 {
516 LWP_DEF_RETURN_CODE(rc);
517 rt_thread_t thread_recv;
518 rt_thread_t thread_send = 0;
519 void (*old_timeout_func)(void *) = 0;
520 rt_base_t level;
521
522 /* IPC message : file descriptor */
523 if (data->type == RT_CHANNEL_FD)
524 {
525 data->u.fd.file = _ipc_msg_get_file(data->u.fd.fd);
526 }
527
528 rt_ipc_msg_init(msg, data, need_reply);
529
530 if (need_reply)
531 {
532 thread_send = rt_thread_self();
533 thread_send->error = RT_EOK;
534 }
535
536 rc = RT_EOK;
537
538 level = rt_spin_lock_irqsave(&ch->slock);
539
540 switch (ch->stat)
541 {
542 case RT_IPC_STAT_IDLE:
543 case RT_IPC_STAT_ACTIVE:
544 if (need_reply)
545 {
546 rc = rt_channel_list_suspend(&ch->wait_thread, thread_send);
547 if (rc != RT_EOK)
548 {
549 _ipc_msg_free(msg);
550 }
551 else
552 {
553 rt_thread_wakeup_set(thread_send, wakeup_sender_wait_recv, (void *)ch);
554 if (time > 0)
555 {
556 rt_timer_control(&(thread_send->thread_timer),
557 RT_TIMER_CTRL_GET_FUNC,
558 &old_timeout_func);
559 rt_timer_control(&(thread_send->thread_timer),
560 RT_TIMER_CTRL_SET_FUNC,
561 sender_timeout);
562 /* reset the timeout of thread timer and start it */
563 rt_timer_control(&(thread_send->thread_timer),
564 RT_TIMER_CTRL_SET_TIME,
565 &time);
566 rt_timer_start(&(thread_send->thread_timer));
567 }
568 }
569 }
570
571 /**
572 * If there is no thread waiting for messages, chain the message
573 * into the list.
574 */
575 if (rc == RT_EOK)
576 rt_list_insert_before(&ch->wait_msg, &msg->mlist);
577 break;
578 case RT_IPC_STAT_WAIT:
579 /**
580 * If there are suspended receivers on the IPC channel, transfer the
581 * pointer of the message to the first receiver directly and wake it
582 * up.
583 */
584 RT_ASSERT(ch->parent.suspend_thread.next != &ch->parent.suspend_thread);
585
586 if (need_reply)
587 {
588 rc = rt_channel_list_suspend(&ch->wait_thread, thread_send);
589 if (rc != RT_EOK)
590 {
591 _ipc_msg_free(msg);
592 }
593 else
594 {
595 ch->reply = thread_send; /* record the current waiting sender */
596 ch->stat = RT_IPC_STAT_ACTIVE;
597 rt_thread_wakeup_set(thread_send, wakeup_sender_wait_reply, (void *)ch);
598 if (time > 0)
599 {
600 rt_timer_control(&(thread_send->thread_timer),
601 RT_TIMER_CTRL_GET_FUNC,
602 &old_timeout_func);
603 rt_timer_control(&(thread_send->thread_timer),
604 RT_TIMER_CTRL_SET_FUNC,
605 sender_timeout);
606 /* reset the timeout of thread timer and start it */
607 rt_timer_control(&(thread_send->thread_timer),
608 RT_TIMER_CTRL_SET_TIME,
609 &time);
610 rt_timer_start(&(thread_send->thread_timer));
611 }
612 }
613 }
614 else
615 {
616 ch->stat = RT_IPC_STAT_IDLE;
617 }
618
619 if (!need_reply || rc == RT_EOK)
620 {
621 rt_sched_lock_level_t slvl;
622 rt_sched_lock(&slvl);
623 thread_recv = RT_THREAD_LIST_NODE_ENTRY(ch->parent.suspend_thread.next);
624 thread_recv->msg_ret = msg; /* to the first suspended receiver */
625 thread_recv->error = RT_EOK;
626 rt_sched_unlock(slvl);
627 rt_channel_list_resume(&ch->parent.suspend_thread);
628 }
629 break;
630 default:
631 break;
632 }
633
634 if (rc == RT_EOK)
635 {
636 if (ch->stat == RT_IPC_STAT_IDLE)
637 {
638 _rt_channel_check_wq_wakup_locked(ch);
639 }
640 rt_spin_unlock_irqrestore(&ch->slock, level);
641
642 /* reschedule in order to let the potential receivers run */
643 rt_schedule();
644
645 if (need_reply)
646 {
647 if (old_timeout_func)
648 {
649 rt_timer_control(&(thread_send->thread_timer),
650 RT_TIMER_CTRL_SET_FUNC,
651 old_timeout_func);
652 }
653 rc = thread_send->error;
654
655 if (rc == RT_EOK)
656 {
657 /* If the sender gets the chance to run, the requested reply must be valid. */
658 RT_ASSERT(data_ret != RT_NULL);
659 *data_ret = ((rt_ipc_msg_t)(thread_send->msg_ret))->msg; /* extract data */
660 _ipc_msg_free(thread_send->msg_ret); /* put back the message to kernel */
661
662 thread_send->msg_ret = RT_NULL;
663 }
664 }
665 }
666 else
667 {
668 rt_spin_unlock_irqrestore(&ch->slock, level);
669 }
670
671 return rc;
672 }
673
674 /**
675 * Send data through an IPC channel with no reply.
676 */
rt_raw_channel_send(rt_channel_t ch,rt_channel_msg_t data)677 rt_err_t rt_raw_channel_send(rt_channel_t ch, rt_channel_msg_t data)
678 {
679 return _send_recv_timeout(ch, data, 0, 0, RT_WAITING_FOREVER);
680 }
681
682 /**
683 * Send data through an IPC channel and wait for the relpy.
684 */
rt_raw_channel_send_recv(rt_channel_t ch,rt_channel_msg_t data,rt_channel_msg_t data_ret)685 rt_err_t rt_raw_channel_send_recv(rt_channel_t ch, rt_channel_msg_t data, rt_channel_msg_t data_ret)
686 {
687 return _send_recv_timeout(ch, data, 1, data_ret, RT_WAITING_FOREVER);
688 }
689
690 /**
691 * Send data through an IPC channel and wait for the relpy.
692 */
rt_raw_channel_send_recv_timeout(rt_channel_t ch,rt_channel_msg_t data,rt_channel_msg_t data_ret,rt_int32_t time)693 rt_err_t rt_raw_channel_send_recv_timeout(rt_channel_t ch, rt_channel_msg_t data, rt_channel_msg_t data_ret, rt_int32_t time)
694 {
695 return _send_recv_timeout(ch, data, 1, data_ret, time);
696 }
697
698 /**
699 * Reply to the waiting sender and wake it up.
700 */
rt_raw_channel_reply(rt_channel_t ch,rt_channel_msg_t data)701 rt_err_t rt_raw_channel_reply(rt_channel_t ch, rt_channel_msg_t data)
702 {
703 LWP_DEF_RETURN_CODE(rc);
704 rt_ipc_msg_t msg;
705 struct rt_thread *thread;
706 rt_base_t level;
707
708 if (ch == RT_NULL)
709 {
710 rc = -RT_EIO;
711 }
712 else
713 {
714 level = rt_spin_lock_irqsave(&ch->slock);
715
716 if (rt_object_get_type(&ch->parent.parent) != RT_Object_Class_Channel)
717 {
718 rc = -RT_EIO;
719 }
720 else if (ch->stat != RT_IPC_STAT_ACTIVE)
721 {
722 rc = -RT_ERROR;
723 }
724 else if (ch->reply == RT_NULL)
725 {
726 rc = -RT_ERROR;
727 }
728 else
729 {
730 /* allocate an IPC message */
731 msg = _ipc_msg_alloc();
732 if (!msg)
733 {
734 rc = -RT_ENOMEM;
735 }
736 else
737 {
738 rt_ipc_msg_init(msg, data, 0);
739
740 thread = ch->reply;
741 thread->msg_ret = msg; /* transfer the reply to the sender */
742 rt_thread_resume(thread); /* wake up the sender */
743 ch->stat = RT_IPC_STAT_IDLE;
744 ch->reply = RT_NULL;
745
746 _rt_channel_check_wq_wakup_locked(ch);
747 rc = RT_EOK;
748 }
749 }
750 rt_spin_unlock_irqrestore(&ch->slock, level);
751
752 rt_schedule();
753 }
754
755 LWP_RETURN(rc);
756 }
757
wakeup_receiver(void * object,struct rt_thread * thread)758 static rt_err_t wakeup_receiver(void *object, struct rt_thread *thread)
759 {
760 rt_channel_t ch;
761 rt_err_t ret;
762 rt_base_t level;
763
764 ch = (rt_channel_t)object;
765
766 level = rt_spin_lock_irqsave(&ch->slock);
767 ch->stat = RT_IPC_STAT_IDLE;
768 thread->error = -RT_EINTR;
769 ret = rt_channel_list_resume(&ch->parent.suspend_thread);
770 _rt_channel_check_wq_wakup_locked(ch);
771 rt_spin_unlock_irqrestore(&ch->slock, level);
772
773 return ret;
774 }
775
receiver_timeout(void * parameter)776 static void receiver_timeout(void *parameter)
777 {
778 struct rt_thread *thread = (struct rt_thread *)parameter;
779 rt_channel_t ch;
780 rt_sched_lock_level_t slvl;
781
782 rt_sched_lock(&slvl);
783
784 ch = (rt_channel_t)(thread->wakeup_handle.user_data);
785
786 thread->error = -RT_ETIMEOUT;
787 thread->wakeup_handle.func = RT_NULL;
788
789 rt_spin_lock(&ch->slock);
790 ch->stat = RT_IPC_STAT_IDLE;
791
792 rt_list_remove(&RT_THREAD_LIST_NODE(thread));
793 /* insert to schedule ready list */
794 rt_sched_insert_thread(thread);
795
796 _rt_channel_check_wq_wakup_locked(ch);
797 rt_spin_unlock(&ch->slock);
798
799 /* do schedule */
800 rt_sched_unlock_n_resched(slvl);
801 }
802
803 /**
804 * Fetch a message from the specified IPC channel.
805 */
_rt_raw_channel_recv_timeout(rt_channel_t ch,rt_channel_msg_t data,rt_int32_t time)806 static rt_err_t _rt_raw_channel_recv_timeout(rt_channel_t ch, rt_channel_msg_t data, rt_int32_t time)
807 {
808 LWP_DEF_RETURN_CODE(rc);
809 struct rt_thread *thread;
810 rt_ipc_msg_t msg_ret;
811 void (*old_timeout_func)(void *) = 0;
812 rt_base_t level;
813
814 RT_DEBUG_NOT_IN_INTERRUPT;
815
816 if (ch == RT_NULL)
817 {
818 return -RT_EIO;
819 }
820
821 level = rt_spin_lock_irqsave(&ch->slock);
822
823 if (rt_object_get_type(&ch->parent.parent) != RT_Object_Class_Channel)
824 {
825 rc = -RT_EIO;
826 }
827 else if (ch->stat != RT_IPC_STAT_IDLE)
828 {
829 rc = -RT_ERROR;
830 }
831 else
832 {
833 if (ch->wait_msg.next != &ch->wait_msg) /* there exist unhandled messages */
834 {
835 msg_ret = rt_list_entry(ch->wait_msg.next, struct rt_ipc_msg, mlist);
836 rt_list_remove(ch->wait_msg.next); /* remove the message from the channel */
837 if (msg_ret->need_reply)
838 {
839 rt_sched_lock_level_t slvl;
840 rt_sched_lock(&slvl);
841 RT_ASSERT(ch->wait_thread.next != &ch->wait_thread);
842 thread = RT_THREAD_LIST_NODE_ENTRY(ch->wait_thread.next);
843 rt_list_remove(ch->wait_thread.next);
844 rt_sched_unlock(slvl);
845 ch->reply = thread; /* record the waiting sender */
846 ch->stat = RT_IPC_STAT_ACTIVE; /* no valid suspened receivers */
847 }
848 *data = msg_ret->msg; /* extract the transferred data */
849 if (data->type == RT_CHANNEL_FD)
850 {
851 data->u.fd.fd = _ipc_msg_fd_new(data->u.fd.file);
852 }
853 _ipc_msg_free(msg_ret); /* put back the message to kernel */
854 rc = RT_EOK;
855 }
856 else if (time == 0)
857 {
858 rc = -RT_ETIMEOUT;
859 }
860 else
861 {
862 /* no valid message, we must wait */
863 thread = rt_thread_self();
864
865 rc = rt_channel_list_suspend(&ch->parent.suspend_thread, thread);
866 if (rc == RT_EOK)
867 {
868 rt_thread_wakeup_set(thread, wakeup_receiver, (void *)ch);
869 ch->stat = RT_IPC_STAT_WAIT; /* no valid suspended senders */
870 thread->error = RT_EOK;
871 if (time > 0)
872 {
873 rt_timer_control(&(thread->thread_timer),
874 RT_TIMER_CTRL_GET_FUNC,
875 &old_timeout_func);
876 rt_timer_control(&(thread->thread_timer),
877 RT_TIMER_CTRL_SET_FUNC,
878 receiver_timeout);
879 /* reset the timeout of thread timer and start it */
880 rt_timer_control(&(thread->thread_timer),
881 RT_TIMER_CTRL_SET_TIME,
882 &time);
883 rt_timer_start(&(thread->thread_timer));
884 }
885 rt_spin_unlock_irqrestore(&ch->slock, level);
886
887 rt_schedule(); /* let the senders run */
888
889 if (old_timeout_func)
890 {
891 rt_timer_control(&(thread->thread_timer),
892 RT_TIMER_CTRL_SET_FUNC,
893 old_timeout_func);
894 }
895 rc = thread->error;
896 if (rc == RT_EOK)
897 {
898 /* If waked up, the received message has been store into the thread. */
899 *data = ((rt_ipc_msg_t)(thread->msg_ret))->msg; /* extract data */
900 if (data->type == RT_CHANNEL_FD)
901 {
902 data->u.fd.fd = _ipc_msg_fd_new(data->u.fd.file);
903 }
904 _ipc_msg_free(thread->msg_ret); /* put back the message to kernel */
905 thread->msg_ret = RT_NULL;
906 }
907 level = rt_spin_lock_irqsave(&ch->slock);
908 }
909 }
910 }
911
912 rt_spin_unlock_irqrestore(&ch->slock, level);
913
914 LWP_RETURN(rc);
915 }
916
rt_raw_channel_recv(rt_channel_t ch,rt_channel_msg_t data)917 rt_err_t rt_raw_channel_recv(rt_channel_t ch, rt_channel_msg_t data)
918 {
919 return _rt_raw_channel_recv_timeout(ch, data, RT_WAITING_FOREVER);
920 }
921
rt_raw_channel_recv_timeout(rt_channel_t ch,rt_channel_msg_t data,rt_int32_t time)922 rt_err_t rt_raw_channel_recv_timeout(rt_channel_t ch, rt_channel_msg_t data, rt_int32_t time)
923 {
924 return _rt_raw_channel_recv_timeout(ch, data, time);
925 }
926 /**
927 * Peek a message from the specified IPC channel.
928 */
rt_raw_channel_peek(rt_channel_t ch,rt_channel_msg_t data)929 rt_err_t rt_raw_channel_peek(rt_channel_t ch, rt_channel_msg_t data)
930 {
931 return _rt_raw_channel_recv_timeout(ch, data, 0);
932 }
933
934 /* for API */
935
lwp_fd_new(int fdt_type)936 static int lwp_fd_new(int fdt_type)
937 {
938 struct dfs_fdtable *fdt;
939
940 if (fdt_type)
941 {
942 fdt = dfs_fdtable_get_global();
943 }
944 else
945 {
946 fdt = dfs_fdtable_get();
947 }
948 return fdt_fd_new(fdt);
949 }
950
lwp_fd_get(int fdt_type,int fd)951 static struct dfs_file *lwp_fd_get(int fdt_type, int fd)
952 {
953 struct dfs_fdtable *fdt;
954
955 if (fdt_type)
956 {
957 fdt = dfs_fdtable_get_global();
958 }
959 else
960 {
961 fdt = dfs_fdtable_get();
962 }
963 return fdt_get_file(fdt, fd);
964 }
965
lwp_fd_release(int fdt_type,int fd)966 static void lwp_fd_release(int fdt_type, int fd)
967 {
968 struct dfs_fdtable *fdt;
969
970 if (fdt_type)
971 {
972 fdt = dfs_fdtable_get_global();
973 }
974 else
975 {
976 fdt = dfs_fdtable_get();
977 }
978 fdt_fd_release(fdt, fd);
979 }
980
_chfd_alloc(int fdt_type)981 static int _chfd_alloc(int fdt_type)
982 {
983 /* create a BSD socket */
984 int fd;
985
986 /* allocate a fd */
987 fd = lwp_fd_new(fdt_type);
988
989 if (fd < 0)
990 {
991 return -1;
992 }
993 return fd;
994 }
995
_chfd_free(int fd,int fdt_type)996 static void _chfd_free(int fd, int fdt_type)
997 {
998 struct dfs_file *d;
999
1000 d = lwp_fd_get(fdt_type, fd);
1001 if (d == RT_NULL)
1002 {
1003 return;
1004 }
1005 lwp_fd_release(fdt_type, fd);
1006 }
1007
1008 /* for fops */
channel_fops_poll(struct dfs_file * file,struct rt_pollreq * req)1009 static int channel_fops_poll(struct dfs_file *file, struct rt_pollreq *req)
1010 {
1011 int mask = POLLOUT;
1012 rt_channel_t ch;
1013 rt_base_t level;
1014
1015 ch = (rt_channel_t)file->vnode->data;
1016
1017 level = rt_spin_lock_irqsave(&ch->slock);
1018 rt_poll_add(&(ch->reader_queue), req);
1019 if (ch->stat != RT_IPC_STAT_IDLE)
1020 {
1021 rt_spin_unlock_irqrestore(&ch->slock, level);
1022 return mask;
1023 }
1024 if (!rt_list_isempty(&ch->wait_msg))
1025 {
1026 mask |= POLLIN;
1027 }
1028 rt_spin_unlock_irqrestore(&ch->slock, level);
1029
1030 return mask;
1031 }
1032
channel_fops_close(struct dfs_file * file)1033 static int channel_fops_close(struct dfs_file *file)
1034 {
1035 rt_channel_t ch;
1036 rt_base_t level;
1037 RT_DEBUG_NOT_IN_INTERRUPT;
1038
1039 ch = (rt_channel_t)file->vnode->data;
1040
1041 level = rt_spin_lock_irqsave(&ch->slock);
1042
1043 if (file->vnode->ref_count == 1)
1044 {
1045 ch->ref--;
1046 if (ch->ref == 0)
1047 {
1048 /* wakeup all the suspended receivers and senders */
1049 _channel_list_resume_all_locked(&ch->parent.suspend_thread);
1050 _channel_list_resume_all_locked(&ch->wait_thread);
1051
1052 /* all ipc msg will lost */
1053 rt_list_init(&ch->wait_msg);
1054
1055 rt_spin_unlock_irqrestore(&ch->slock, level);
1056
1057 rt_object_delete(&ch->parent.parent); /* release the IPC channel structure */
1058 }
1059 else
1060 {
1061 rt_spin_unlock_irqrestore(&ch->slock, level);
1062 }
1063 }
1064 else
1065 {
1066 rt_spin_unlock_irqrestore(&ch->slock, level);
1067 }
1068
1069 return 0;
1070 }
1071
1072 static const struct dfs_file_ops channel_fops =
1073 {
1074 .close = channel_fops_close, /* close */
1075 .poll = channel_fops_poll, /* poll */
1076 };
1077
lwp_channel_open(int fdt_type,const char * name,int flags)1078 int lwp_channel_open(int fdt_type, const char *name, int flags)
1079 {
1080 int fd;
1081 rt_channel_t ch = RT_NULL;
1082 struct dfs_file *d;
1083
1084 fd = _chfd_alloc(fdt_type); /* allocate an IPC channel descriptor */
1085 if (fd == -1)
1086 {
1087 goto quit;
1088 }
1089
1090 d = lwp_fd_get(fdt_type, fd);
1091 d->vnode = (struct dfs_vnode *)rt_malloc(sizeof(struct dfs_vnode));
1092 if (!d->vnode)
1093 {
1094 _chfd_free(fd, fdt_type);
1095 fd = -1;
1096 goto quit;
1097 }
1098
1099 ch = rt_raw_channel_open(name, flags);
1100 if (ch)
1101 {
1102 /* initialize vnode */
1103 dfs_vnode_init(d->vnode, FT_USER, &channel_fops);
1104 d->flags = O_RDWR; /* set flags as read and write */
1105
1106 /* set socket to the data of dfs_file */
1107 d->vnode->data = (void *)ch;
1108 }
1109 else
1110 {
1111 rt_free(d->vnode);
1112 d->vnode = RT_NULL;
1113 _chfd_free(fd, fdt_type);
1114 fd = -1;
1115 }
1116 quit:
1117 return fd;
1118 }
1119
fd_2_channel(int fdt_type,int fd)1120 static rt_channel_t fd_2_channel(int fdt_type, int fd)
1121 {
1122 struct dfs_file *d;
1123
1124 d = lwp_fd_get(fdt_type, fd);
1125 if (d)
1126 {
1127 rt_channel_t ch;
1128
1129 ch = (rt_channel_t)d->vnode->data;
1130 if (ch)
1131 {
1132 return ch;
1133 }
1134 }
1135 return RT_NULL;
1136 }
1137
lwp_channel_close(int fdt_type,int fd)1138 rt_err_t lwp_channel_close(int fdt_type, int fd)
1139 {
1140 rt_channel_t ch;
1141 struct dfs_file *d;
1142 struct dfs_vnode *vnode;
1143
1144 d = lwp_fd_get(fdt_type, fd);
1145 if (!d)
1146 {
1147 return -RT_EIO;
1148 }
1149
1150 vnode = d->vnode;
1151 if (!vnode)
1152 {
1153 return -RT_EIO;
1154 }
1155
1156 ch = fd_2_channel(fdt_type, fd);
1157 if (!ch)
1158 {
1159 return -RT_EIO;
1160 }
1161 _chfd_free(fd, fdt_type);
1162 if (vnode->ref_count == 1)
1163 {
1164 rt_free(vnode);
1165 return rt_raw_channel_close(ch);
1166 }
1167
1168 return 0;
1169 }
1170
lwp_channel_send(int fdt_type,int fd,rt_channel_msg_t data)1171 rt_err_t lwp_channel_send(int fdt_type, int fd, rt_channel_msg_t data)
1172 {
1173 rt_channel_t ch;
1174
1175 ch = fd_2_channel(fdt_type, fd);
1176 if (ch)
1177 {
1178 return rt_raw_channel_send(ch, data);
1179 }
1180 return -RT_EIO;
1181 }
1182
lwp_channel_send_recv_timeout(int fdt_type,int fd,rt_channel_msg_t data,rt_channel_msg_t data_ret,rt_int32_t time)1183 rt_err_t lwp_channel_send_recv_timeout(int fdt_type, int fd, rt_channel_msg_t data, rt_channel_msg_t data_ret, rt_int32_t time)
1184 {
1185 rt_channel_t ch;
1186
1187 ch = fd_2_channel(fdt_type, fd);
1188 if (ch)
1189 {
1190 return rt_raw_channel_send_recv_timeout(ch, data, data_ret, time);
1191 }
1192 return -RT_EIO;
1193 }
1194
lwp_channel_reply(int fdt_type,int fd,rt_channel_msg_t data)1195 rt_err_t lwp_channel_reply(int fdt_type, int fd, rt_channel_msg_t data)
1196 {
1197 rt_channel_t ch;
1198
1199 ch = fd_2_channel(fdt_type, fd);
1200 if (ch)
1201 {
1202 return rt_raw_channel_reply(ch, data);
1203 }
1204 return -RT_EIO;
1205 }
1206
lwp_channel_recv_timeout(int fdt_type,int fd,rt_channel_msg_t data,rt_int32_t time)1207 rt_err_t lwp_channel_recv_timeout(int fdt_type, int fd, rt_channel_msg_t data, rt_int32_t time)
1208 {
1209 rt_channel_t ch;
1210
1211 ch = fd_2_channel(fdt_type, fd);
1212 if (ch)
1213 {
1214 return rt_raw_channel_recv_timeout(ch, data, time);
1215 }
1216 return -RT_EIO;
1217 }
1218
rt_channel_open(const char * name,int flags)1219 int rt_channel_open(const char *name, int flags)
1220 {
1221 return lwp_channel_open(FDT_TYPE_KERNEL, name, flags);
1222 }
1223
rt_channel_close(int fd)1224 rt_err_t rt_channel_close(int fd)
1225 {
1226 return lwp_channel_close(FDT_TYPE_KERNEL, fd);
1227 }
1228
rt_channel_send(int fd,rt_channel_msg_t data)1229 rt_err_t rt_channel_send(int fd, rt_channel_msg_t data)
1230 {
1231 return lwp_channel_send(FDT_TYPE_KERNEL, fd, data);
1232 }
1233
rt_channel_send_recv_timeout(int fd,rt_channel_msg_t data,rt_channel_msg_t data_ret,rt_int32_t time)1234 rt_err_t rt_channel_send_recv_timeout(int fd, rt_channel_msg_t data, rt_channel_msg_t data_ret, rt_int32_t time)
1235 {
1236 return lwp_channel_send_recv_timeout(FDT_TYPE_KERNEL, fd, data, data_ret, time);
1237 }
1238
rt_channel_send_recv(int fd,rt_channel_msg_t data,rt_channel_msg_t data_ret)1239 rt_err_t rt_channel_send_recv(int fd, rt_channel_msg_t data, rt_channel_msg_t data_ret)
1240 {
1241 return lwp_channel_send_recv_timeout(FDT_TYPE_KERNEL, fd, data, data_ret, RT_WAITING_FOREVER);
1242 }
1243
rt_channel_reply(int fd,rt_channel_msg_t data)1244 rt_err_t rt_channel_reply(int fd, rt_channel_msg_t data)
1245 {
1246 return lwp_channel_reply(FDT_TYPE_KERNEL, fd, data);
1247 }
1248
rt_channel_recv_timeout(int fd,rt_channel_msg_t data,rt_int32_t time)1249 rt_err_t rt_channel_recv_timeout(int fd, rt_channel_msg_t data, rt_int32_t time)
1250 {
1251 return lwp_channel_recv_timeout(FDT_TYPE_KERNEL, fd, data, time);
1252 }
1253
rt_channel_recv(int fd,rt_channel_msg_t data)1254 rt_err_t rt_channel_recv(int fd, rt_channel_msg_t data)
1255 {
1256 return lwp_channel_recv_timeout(FDT_TYPE_KERNEL, fd, data, RT_WAITING_FOREVER);
1257 }
1258
rt_channel_peek(int fd,rt_channel_msg_t data)1259 rt_err_t rt_channel_peek(int fd, rt_channel_msg_t data)
1260 {
1261 return lwp_channel_recv_timeout(FDT_TYPE_KERNEL, fd, data, 0);
1262 }
1263
list_channel(void)1264 static int list_channel(void)
1265 {
1266 rt_channel_t *channels;
1267 rt_ubase_t index, count;
1268 struct rt_object *object;
1269 struct rt_list_node *node;
1270 struct rt_object_information *information;
1271
1272 RT_DEBUG_NOT_IN_INTERRUPT;
1273
1274 const char *stat_strs[] = {"idle", "wait", "active"};
1275
1276 information = rt_object_get_information(RT_Object_Class_Channel);
1277 RT_ASSERT(information != RT_NULL);
1278
1279 count = 0;
1280 rt_mutex_take(&_chn_obj_lock, RT_WAITING_FOREVER);
1281 /* get the count of IPC channels */
1282 for (node = information->object_list.next;
1283 node != &(information->object_list);
1284 node = node->next)
1285 {
1286 count++;
1287 }
1288 rt_mutex_release(&_chn_obj_lock);
1289
1290 if (count == 0)
1291 return 0;
1292
1293 channels = (rt_channel_t *)rt_calloc(count, sizeof(rt_channel_t));
1294 if (channels == RT_NULL)
1295 return 0; /* out of memory */
1296
1297 rt_mutex_take(&_chn_obj_lock, RT_WAITING_FOREVER);
1298 /* retrieve pointer of IPC channels */
1299 for (index = 0, node = information->object_list.next;
1300 index < count && node != &(information->object_list);
1301 node = node->next)
1302 {
1303 object = rt_list_entry(node, struct rt_object, list);
1304
1305 channels[index] = (rt_channel_t)object;
1306 index++;
1307 }
1308 rt_mutex_release(&_chn_obj_lock);
1309
1310 rt_kprintf(" channel state\n");
1311 rt_kprintf("-------- -------\n");
1312 for (index = 0; index < count; index++)
1313 {
1314 if (channels[index] != RT_NULL)
1315 {
1316 rt_kprintf("%-*.s", RT_NAME_MAX, channels[index]->parent.parent.name);
1317 if (channels[index]->stat < 3)
1318 rt_kprintf(" %s\n", stat_strs[channels[index]->stat]);
1319 }
1320 }
1321
1322 rt_free(channels);
1323
1324 return 0;
1325 }
1326 MSH_CMD_EXPORT(list_channel, list IPC channel information);
1327