/*
 * Copyright (c) 2006-2023, RT-Thread Development Team
 *
 * SPDX-License-Identifier: Apache-2.0
 *
 * Change Logs:
 * Date           Author       Notes
 * 2023-11-20     Shell        Support of condition variable
 */
#define DBG_TAG "ipc.condvar"
#define DBG_LVL DBG_INFO
#include <rtdbg.h>

#include <rtdevice.h>
#include <rtatomic.h>
#include <rtthread.h>

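/* serializes the waiting_mtx bind/unbind bookkeeping in rt_condvar_timedwait() */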
static struct rt_spinlock _local_cv_queue_lock = RT_SPINLOCK_INIT;

#define CV_ASSERT_LOCKED(cv)                                            \
    RT_ASSERT(!(cv)->waiting_mtx ||                                     \
              rt_mutex_get_owner((rt_mutex_t)(cv)->waiting_mtx) ==      \
                  rt_thread_self())

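/**
 * Initialize the condition variable: an empty wait queue, zero waiters and
 * no mutex bound yet. @name is reserved for future rt_object support.
 */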
void rt_condvar_init(rt_condvar_t cv, char *name)
{
#ifdef USING_RT_OBJECT
    /* TODO: support rt object */
    rt_object_init();
#endif

    rt_wqueue_init(&cv->event);
    rt_atomic_store(&cv->waiters_cnt, 0);
    rt_atomic_store(&cv->waiting_mtx, 0);
}

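/**
 * Enqueue the current thread on the wait queue and suspend it, arming the
 * per-thread timer when a bounded @timeout is requested. If the queue has
 * already been signaled (RT_WQ_FLAG_WAKEUP), return immediately without
 * suspending so that wakeup is not lost.
 */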
static int _waitq_inqueue(rt_wqueue_t *queue, struct rt_wqueue_node *node,
                          rt_tick_t timeout, int suspend_flag)
{
    rt_thread_t tcb = node->polling_thread;
    rt_timer_t timer = &(tcb->thread_timer);
    rt_err_t ret;

    if (queue->flag != RT_WQ_FLAG_WAKEUP)
    {
        ret = rt_thread_suspend_with_flag(tcb, suspend_flag);
        if (ret == RT_EOK)
        {
            rt_wqueue_add(queue, node);
            if (timeout != RT_WAITING_FOREVER)
            {
                rt_timer_control(timer, RT_TIMER_CTRL_SET_TIME, &timeout);
                rt_timer_start(timer);
            }
        }
    }
    else
    {
        ret = RT_EOK;
    }

    return ret;
}

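/* build an on-stack wait queue node for the calling thread */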
#define INIT_WAITQ_NODE(node)                                      \
    {                                                              \
        .polling_thread = rt_thread_self(), .key = 0,              \
        .wakeup = __wqueue_default_wake, .wqueue = &cv->event,     \
        .list = RT_LIST_OBJECT_INIT(node.list)                     \
    }

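/**
 * Atomically release @mtx and block on @cv until the condition variable is
 * signaled, the thread is interrupted/killed, or @timeout ticks elapse; the
 * mutex is re-acquired before returning. All concurrent waiters must pass
 * the same mutex, otherwise -EBUSY is returned. Returns RT_EOK on a normal
 * wakeup, or a negative errno. A typical wait loop looks like the sketch
 * below (`cond`, `queue_lock` and `data_ready()` are hypothetical
 * caller-side names):
 *
 *     rt_mutex_take(&queue_lock, RT_WAITING_FOREVER);
 *     while (!data_ready())
 *         rt_condvar_timedwait(&cond, &queue_lock, RT_INTERRUPTIBLE,
 *                              RT_WAITING_FOREVER);
 *     ... consume the data while queue_lock is still held ...
 *     rt_mutex_release(&queue_lock);
 */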
int rt_condvar_timedwait(rt_condvar_t cv, rt_mutex_t mtx, int suspend_flag,
                         rt_tick_t timeout)
{
    rt_err_t acq_mtx_succ, rc;
    rt_atomic_t waiting_mtx;
    struct rt_wqueue_node node = INIT_WAITQ_NODE(node);

    /* not allowed in IRQ & critical section */
    RT_DEBUG_SCHEDULER_AVAILABLE(1);
    CV_ASSERT_LOCKED(cv);

    /**
     * In the worst case this races with a waiter that concurrently resets
     * the field before the mutex is retaken; the spinlock below serializes
     * the two.
     */
    rt_spin_lock(&_local_cv_queue_lock);
    waiting_mtx = rt_atomic_load(&cv->waiting_mtx);
    if (!waiting_mtx)
        acq_mtx_succ = rt_atomic_compare_exchange_strong(
            &cv->waiting_mtx, &waiting_mtx, (size_t)mtx);
    else
        acq_mtx_succ = 0;

    rt_spin_unlock(&_local_cv_queue_lock);

    if (acq_mtx_succ == 1 || waiting_mtx == (size_t)mtx)
    {
        rt_atomic_add(&cv->waiters_cnt, 1);

        rt_enter_critical();
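        /*
         * stay in a critical section so no wakeup can slip in between
         * enqueuing on the wait queue and releasing the mutex; this is what
         * makes the unlock-and-wait step effectively atomic
         */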

        if (suspend_flag == RT_INTERRUPTIBLE)
            rc = _waitq_inqueue(&cv->event, &node, timeout, RT_INTERRUPTIBLE);
        else /* UNINTERRUPTIBLE is forbidden, since it's not safe for user space */
            rc = _waitq_inqueue(&cv->event, &node, timeout, RT_KILLABLE);

        acq_mtx_succ = rt_mutex_release(mtx);
        RT_ASSERT(acq_mtx_succ == 0);
        rt_exit_critical();

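        /*
         * suspended successfully: yield the CPU and, once woken, translate
         * the wakeup reason recorded in errno (e.g. timeout or interruption)
         * into a negative return code
         */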
        if (rc == RT_EOK)
        {
            rt_schedule();

            rc = rt_get_errno();
            rc = rc > 0 ? -rc : rc;
        }
        else
        {
            LOG_D("%s() failed to suspend", __func__);
        }

        rt_wqueue_remove(&node);

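        /* the last waiter leaving unbinds the mutex from the condvar */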
        rt_spin_lock(&_local_cv_queue_lock);
        if (rt_atomic_add(&cv->waiters_cnt, -1) == 1)
        {
            waiting_mtx = (size_t)mtx;
            acq_mtx_succ = rt_atomic_compare_exchange_strong(&cv->waiting_mtx,
                                                             &waiting_mtx, 0);
            RT_ASSERT(acq_mtx_succ == 1);
        }
        rt_spin_unlock(&_local_cv_queue_lock);

        acq_mtx_succ = rt_mutex_take(mtx, RT_WAITING_FOREVER);
        RT_ASSERT(acq_mtx_succ == 0);
    }
    else
    {
        LOG_D("%s: conflict waiting mutex", __func__);
        rc = -EBUSY;
    }

    return rc;
}

/** Keep in mind that we are always operating while cv->waiting_mtx is held */

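/**
 * Wake up at most one thread blocked on @cv. Resetting event.flag afterwards
 * makes later waiters suspend again instead of returning immediately (see
 * the RT_WQ_FLAG_WAKEUP fast path in _waitq_inqueue()).
 */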
int rt_condvar_signal(rt_condvar_t cv)
{
    CV_ASSERT_LOCKED(cv);

    /* to avoid spurious wakeups */
    if (rt_atomic_load(&cv->waiters_cnt) > 0)
        rt_wqueue_wakeup(&cv->event, 0);

    cv->event.flag = 0;
    return 0;
}

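/** Same as rt_condvar_signal(), except that every waiting thread is woken */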
int rt_condvar_broadcast(rt_condvar_t cv)
{
    CV_ASSERT_LOCKED(cv);

    /* to avoid spurious wakeups */
    if (rt_atomic_load(&cv->waiters_cnt) > 0)
        rt_wqueue_wakeup_all(&cv->event, 0);

    cv->event.flag = 0;
    return 0;
}