1 /*
2  * Copyright (c) 2006-2023, RT-Thread Development Team
3  *
4  * SPDX-License-Identifier: Apache-2.0
5  *
6  * Change Logs:
7  * Date           Author       Notes
8  * 2023-11-20     Shell        Support of condition variable
9  */
10 #define DBG_TAG "ipc.condvar"
11 #define DBG_LVL DBG_INFO
12 #include <rtdbg.h>
13 
14 #include <rtdevice.h>
15 #include <rtatomic.h>
16 #include <rtthread.h>
17 
18 static struct rt_spinlock _local_cv_queue_lock = RT_SPINLOCK_INIT;
19 
20 #define CV_ASSERT_LOCKED(cv)                                       \
21     RT_ASSERT(!(cv)->waiting_mtx ||                                \
22               rt_mutex_get_owner((rt_mutex_t)(cv)->waiting_mtx) == \
23                   rt_thread_self())
24 
/**
 * @brief Reset a condition variable to its initial state: zero waiters, no
 *        mutex bound, and a freshly initialised wait queue.
 *
 * @param cv   condition variable to initialise
 * @param name not used by the code that is currently compiled in; the
 *             USING_RT_OBJECT section below is still a TODO placeholder
 */
void rt_condvar_init(rt_condvar_t cv, char *name)
{
#ifdef USING_RT_OBJECT
    /* TODO: support rt object */
    rt_object_init();
#endif

    /* nobody is waiting and no mutex is associated yet */
    rt_atomic_store(&cv->waiters_cnt, 0);
    rt_atomic_store(&cv->waiting_mtx, 0);

    /* queue on which waiting threads are parked */
    rt_wqueue_init(&cv->event);
}
36 
/**
 * @brief Suspend the thread recorded in @node and link the node onto @queue,
 *        optionally arming the thread's timer.
 *
 * Nothing is done when the queue flag already equals RT_WQ_FLAG_WAKEUP; the
 * function then reports success without suspending.
 *
 * @param queue        wait queue to join
 * @param node         wait-queue node; node->polling_thread is the thread
 *                     that gets suspended
 * @param timeout      timer value in ticks; RT_WAITING_FOREVER leaves the
 *                     thread timer untouched
 * @param suspend_flag forwarded to rt_thread_suspend_with_flag()
 *
 * @return RT_EOK when the thread was queued (or the queue was already flagged
 *         as woken), otherwise the error from rt_thread_suspend_with_flag().
 */
static int _waitq_inqueue(rt_wqueue_t *queue, struct rt_wqueue_node *node,
                          rt_tick_t timeout, int suspend_flag)
{
    rt_thread_t waiter = node->polling_thread;
    rt_err_t err;

    /* a wakeup is already flagged on the queue: do not go to sleep */
    if (queue->flag == RT_WQ_FLAG_WAKEUP)
    {
        return RT_EOK;
    }

    err = rt_thread_suspend_with_flag(waiter, suspend_flag);
    if (err != RT_EOK)
    {
        /* thread could not be suspended, so it must not be queued either */
        return err;
    }

    rt_wqueue_add(queue, node);

    if (timeout != RT_WAITING_FOREVER)
    {
        rt_timer_t wake_timer = &(waiter->thread_timer);

        /* bounded wait: program the thread timer and start it */
        rt_timer_control(wake_timer, RT_TIMER_CTRL_SET_TIME, &timeout);
        rt_timer_start(wake_timer);
    }

    return RT_EOK;
}
65 
66 #define INIT_WAITQ_NODE(node)                                  \
67     {                                                          \
68         .polling_thread = rt_thread_self(), .key = 0,          \
69         .wakeup = __wqueue_default_wake, .wqueue = &cv->event, \
70         .list = RT_LIST_OBJECT_INIT(node.list)                 \
71     }
72 
rt_condvar_timedwait(rt_condvar_t cv,rt_mutex_t mtx,int suspend_flag,rt_tick_t timeout)73 int rt_condvar_timedwait(rt_condvar_t cv, rt_mutex_t mtx, int suspend_flag,
74                          rt_tick_t timeout)
75 {
76     rt_err_t acq_mtx_succ, rc;
77     rt_atomic_t waiting_mtx;
78     struct rt_wqueue_node node = INIT_WAITQ_NODE(node);
79 
80     /* not allowed in IRQ & critical section */
81     RT_DEBUG_SCHEDULER_AVAILABLE(1);
82     CV_ASSERT_LOCKED(cv);
83 
84     /**
85      * for the worst case, this is racy with the following works to reset field
86      * before mutex is taken. The spinlock then comes to rescue.
87      */
88     rt_spin_lock(&_local_cv_queue_lock);
89     waiting_mtx = rt_atomic_load(&cv->waiting_mtx);
90     if (!waiting_mtx)
91         acq_mtx_succ = rt_atomic_compare_exchange_strong(
92             &cv->waiting_mtx, &waiting_mtx, (size_t)mtx);
93     else
94         acq_mtx_succ = 0;
95 
96     rt_spin_unlock(&_local_cv_queue_lock);
97 
98     if (acq_mtx_succ == 1 || waiting_mtx == (size_t)mtx)
99     {
100         rt_atomic_add(&cv->waiters_cnt, 1);
101 
102         rt_enter_critical();
103 
104         if (suspend_flag == RT_INTERRUPTIBLE)
105             rc = _waitq_inqueue(&cv->event, &node, timeout, RT_INTERRUPTIBLE);
106         else /* UNINTERRUPTIBLE is forbidden, since it's not safe for user space */
107             rc = _waitq_inqueue(&cv->event, &node, timeout, RT_KILLABLE);
108 
109         acq_mtx_succ = rt_mutex_release(mtx);
110         RT_ASSERT(acq_mtx_succ == 0);
111         rt_exit_critical();
112 
113         if (rc == RT_EOK)
114         {
115             rt_schedule();
116 
117             rc = rt_get_errno();
118             rc = rc > 0 ? -rc : rc;
119         }
120         else
121         {
122             LOG_D("%s() failed to suspend", __func__);
123         }
124 
125         rt_wqueue_remove(&node);
126 
127         rt_spin_lock(&_local_cv_queue_lock);
128         if (rt_atomic_add(&cv->waiters_cnt, -1) == 1)
129         {
130             waiting_mtx = (size_t)mtx;
131             acq_mtx_succ = rt_atomic_compare_exchange_strong(&cv->waiting_mtx,
132                                                       &waiting_mtx, 0);
133             RT_ASSERT(acq_mtx_succ == 1);
134         }
135         rt_spin_unlock(&_local_cv_queue_lock);
136 
137         acq_mtx_succ = rt_mutex_take(mtx, RT_WAITING_FOREVER);
138         RT_ASSERT(acq_mtx_succ == 0);
139     }
140     else
141     {
142         LOG_D("%s: conflict waiting mutex", __func__);
143         rc = -EBUSY;
144     }
145 
146     return rc;
147 }
148 
/** Keep in mind that we always operate while cv.waiting_mtx is taken */
150 
/**
 * @brief Signal a condition variable through rt_wqueue_wakeup().
 *
 * The wait queue is only touched when at least one waiter is registered in
 * cv->waiters_cnt. The queue flag is cleared afterwards in every case;
 * _waitq_inqueue() skips suspension while the flag reads RT_WQ_FLAG_WAKEUP,
 * so presumably this keeps a past signal from affecting later waiters.
 *
 * @param cv condition variable to signal; if it is bound to a mutex, the
 *           caller is asserted to own that mutex
 *
 * @return always 0
 */
int rt_condvar_signal(rt_condvar_t cv)
{
    CV_ASSERT_LOCKED(cv);

    /* to avoid spurious wakeups */
    if (rt_atomic_load(&cv->waiters_cnt) > 0)
    {
        rt_wqueue_wakeup(&cv->event, 0);
    }

    cv->event.flag = 0;

    return 0;
}
162 
/**
 * @brief Broadcast on a condition variable through rt_wqueue_wakeup_all().
 *
 * The wait queue is only touched when at least one waiter is registered in
 * cv->waiters_cnt. The queue flag is cleared afterwards in every case;
 * _waitq_inqueue() skips suspension while the flag reads RT_WQ_FLAG_WAKEUP,
 * so presumably this keeps a past broadcast from affecting later waiters.
 *
 * @param cv condition variable to broadcast on; if it is bound to a mutex,
 *           the caller is asserted to own that mutex
 *
 * @return always 0
 */
int rt_condvar_broadcast(rt_condvar_t cv)
{
    CV_ASSERT_LOCKED(cv);

    /* to avoid spurious wakeups */
    if (rt_atomic_load(&cv->waiters_cnt) > 0)
    {
        rt_wqueue_wakeup_all(&cv->event, 0);
    }

    cv->event.flag = 0;

    return 0;
}
174