1 /*
2 * Copyright (c) 2006-2024, RT-Thread Development Team
3 *
4 * SPDX-License-Identifier: Apache-2.0
5 *
6 * Change Logs:
7 * Date Author Notes
8 * 2024-04-26 Shell lockless rt_completion for MP system
9 */
10
11 #define DBG_TAG "drivers.ipc"
12 #define DBG_LVL DBG_INFO
13 #include <rtdbg.h>
14
15 #include <rtthread.h>
16 #include <rthw.h>
17 #include <rtdevice.h>
18
19 #define RT_COMPLETED 1
20 #define RT_UNCOMPLETED 0
21 #define RT_WAKING (-1)
22 #define RT_OCCUPIED (-2)
23
24 #define RT_COMPLETION_NEW_STAT(thread, flag) (((flag) & 1) | (((rt_base_t)thread) & ~1))
25
26 /**
27 * The C11 atomic can be ~5% and even faster in testing on the arm64 platform
28 * compared to rt_atomic. So the C11 way is always preferred.
29 */
30 #ifdef RT_USING_STDC_ATOMIC
31 #include <stdatomic.h>
32
33 #define IPC_STORE(dst, val, morder) atomic_store_explicit(dst, val, morder)
34 #define IPC_LOAD(dst, morder) atomic_load_explicit(dst, morder)
35 #define IPC_BARRIER(morder) atomic_thread_fence(morder)
36 #define IPC_CAS(dst, exp, desired, succ, fail) \
37 atomic_compare_exchange_strong_explicit(dst, exp, desired, succ, fail)
38
39 #else /* !RT_USING_STDC_ATOMIC */
40 #include <rtatomic.h>
41
42 #define IPC_STORE(dst, val, morder) rt_atomic_store(dst, val)
43 #define IPC_LOAD(dst, morder) rt_atomic_load(dst)
44 #define IPC_BARRIER(morder)
45 #define IPC_CAS(dst, exp, desired, succ, fail) \
46 rt_atomic_compare_exchange_strong(dst, exp, desired)
47 #endif /* RT_USING_STDC_ATOMIC */
48
49 static rt_err_t _comp_susp_thread(struct rt_completion *completion,
50 rt_thread_t thread, rt_int32_t timeout,
51 int suspend_flag);
52
53 /**
54 * @brief This function will initialize a completion object.
55 *
56 * @param completion is a pointer to a completion object.
57 */
rt_completion_init(struct rt_completion * completion)58 void rt_completion_init(struct rt_completion *completion)
59 {
60 RT_ASSERT(completion != RT_NULL);
61
62 IPC_STORE(&completion->susp_thread_n_flag, RT_UNCOMPLETED,
63 memory_order_relaxed);
64 }
65 RTM_EXPORT(rt_completion_init);
66
67 /**
68 * @brief This is same as rt_completion_wait_flags(), except that this API is NOT
69 * ISR-safe (you can NOT call completion_done() on isr routine).
70 *
71 * @param completion is a pointer to a completion object.
72 * @param timeout is a timeout period (unit: OS ticks). If the completion is unavailable, the thread will wait for
73 * the completion done up to the amount of time specified by the argument.
74 * NOTE: Generally, we use the macro RT_WAITING_FOREVER to set this parameter, which means that when the
75 * completion is unavailable, the thread will be waitting forever.
76 * @param suspend_flag suspend flags. See rt_thread_suspend_with_flag()
77 *
78 * @return Return the operation status. ONLY when the return value is RT_EOK, the operation is successful.
79 * If the return value is any other values, it means that the completion wait failed.
80 *
81 * @warning This function can ONLY be called in the thread context. It MUST NOT be called in interrupt context.
82 */
rt_completion_wait_flags_noisr(struct rt_completion * completion,rt_int32_t timeout,int suspend_flag)83 rt_err_t rt_completion_wait_flags_noisr(struct rt_completion *completion,
84 rt_int32_t timeout, int suspend_flag)
85 {
86 rt_err_t result = -RT_ERROR;
87 rt_thread_t thread;
88 rt_bool_t exchange_succ;
89 rt_base_t expected_value;
90 RT_ASSERT(completion != RT_NULL);
91
92 /* current context checking */
93 RT_DEBUG_SCHEDULER_AVAILABLE(timeout != 0);
94
95 thread = rt_thread_self();
96
97 do
98 {
99 /* try to consume one completion */
100 expected_value = RT_COMPLETED;
101 exchange_succ =
102 IPC_CAS(&completion->susp_thread_n_flag, &expected_value,
103 RT_UNCOMPLETED, memory_order_acquire, memory_order_relaxed);
104
105 if (exchange_succ)
106 {
107 /* consume succeed, now return EOK */
108 result = RT_EOK;
109 break;
110 }
111 else if (expected_value == RT_WAKING)
112 {
113 /* previous wake is not done yet, yield thread & try again */
114 rt_thread_yield();
115 }
116 else
117 {
118 /**
119 * API rules say: only one thread can suspend on complete.
120 * So we assert if debug.
121 */
122 RT_ASSERT(expected_value == RT_UNCOMPLETED);
123
124 if (timeout != 0)
125 {
126 /**
127 * try to occupy completion, noted that we are assuming that
128 * `expected_value == RT_UNCOMPLETED`
129 */
130 exchange_succ = IPC_CAS(
131 &completion->susp_thread_n_flag, &expected_value,
132 RT_OCCUPIED, memory_order_relaxed, memory_order_relaxed);
133
134 if (exchange_succ)
135 {
136 /* complete waiting business and return result */
137 result = _comp_susp_thread(completion, thread, timeout,
138 suspend_flag);
139
140 RT_ASSERT(rt_atomic_load(&completion->susp_thread_n_flag) !=
141 RT_OCCUPIED);
142
143 break;
144 }
145 else
146 {
147 /* try again */
148 }
149 }
150 else
151 {
152 result = -RT_ETIMEOUT;
153 break;
154 }
155 }
156 } while (1);
157
158 return result;
159 }
160
161 /**
162 * @brief This function will wait for a completion, if the completion is unavailable, the thread shall wait for
163 * the completion up to a specified time.
164 *
165 * @param completion is a pointer to a completion object.
166 * @param timeout is a timeout period (unit: OS ticks). If the completion is unavailable, the thread will wait for
167 * the completion done up to the amount of time specified by the argument.
168 * NOTE: Generally, we use the macro RT_WAITING_FOREVER to set this parameter, which means that when the
169 * completion is unavailable, the thread will be waitting forever.
170 * @param suspend_flag suspend flags. See rt_thread_suspend_with_flag()
171 *
172 * @return Return the operation status. ONLY when the return value is RT_EOK, the operation is successful.
173 * If the return value is any other values, it means that the completion wait failed.
174 *
175 * @warning This function can ONLY be called in the thread context. It MUST NOT be called in interrupt context.
176 */
rt_completion_wait_flags(struct rt_completion * completion,rt_int32_t timeout,int suspend_flag)177 rt_err_t rt_completion_wait_flags(struct rt_completion *completion,
178 rt_int32_t timeout, int suspend_flag)
179 {
180 rt_err_t error;
181 rt_ubase_t level;
182 level = rt_hw_local_irq_disable();
183 error = rt_completion_wait_flags_noisr(completion, timeout, suspend_flag);
184 rt_hw_local_irq_enable(level);
185 return error;
186 }
187
_wait_until_update(struct rt_completion * completion,rt_base_t expected)188 static rt_base_t _wait_until_update(struct rt_completion *completion, rt_base_t expected)
189 {
190 rt_base_t current_value;
191
192 /* spinning for update */
193 do
194 {
195 rt_hw_isb();
196 current_value =
197 IPC_LOAD(&completion->susp_thread_n_flag, memory_order_relaxed);
198 } while (current_value == expected);
199
200 return current_value;
201 }
202
203 /**
204 * Try to suspend thread and update completion
205 */
_comp_susp_thread(struct rt_completion * completion,rt_thread_t thread,rt_int32_t timeout,int suspend_flag)206 static rt_err_t _comp_susp_thread(struct rt_completion *completion,
207 rt_thread_t thread, rt_int32_t timeout,
208 int suspend_flag)
209 {
210 rt_err_t error = -RT_ERROR;
211 rt_base_t clevel;
212 rt_base_t comp_waiting;
213
214 /* suspend thread */
215 clevel = rt_enter_critical();
216
217 /* reset thread error number */
218 thread->error = RT_EOK;
219
220 error = rt_thread_suspend_with_flag(thread, suspend_flag);
221
222 if (error)
223 {
224 rt_exit_critical_safe(clevel);
225 RT_ASSERT(rt_atomic_load(&completion->susp_thread_n_flag) ==
226 RT_OCCUPIED);
227 IPC_STORE(&completion->susp_thread_n_flag, RT_UNCOMPLETED,
228 memory_order_relaxed);
229 }
230 else
231 {
232 /* set to waiting */
233 comp_waiting = RT_COMPLETION_NEW_STAT(thread, RT_UNCOMPLETED);
234 RT_ASSERT(rt_atomic_load(&completion->susp_thread_n_flag) ==
235 RT_OCCUPIED);
236 IPC_STORE(&completion->susp_thread_n_flag, comp_waiting,
237 memory_order_relaxed);
238
239 /* current context checking */
240 RT_DEBUG_NOT_IN_INTERRUPT;
241
242 /* start timer */
243 if (timeout > 0)
244 {
245 /* reset the timeout of thread timer and start it */
246 rt_timer_control(&(thread->thread_timer),
247 RT_TIMER_CTRL_SET_TIME,
248 &timeout);
249 rt_timer_start(&(thread->thread_timer));
250 }
251
252 /* do schedule */
253 rt_schedule();
254
255 rt_exit_critical_safe(clevel);
256
257 /* thread is woken up */
258 error = thread->error;
259 error = error > 0 ? -error : error;
260
261 /* clean completed flag & remove susp_thread on the case of waking by timeout */
262 if (!error)
263 {
264 /* completion done successfully */
265 RT_ASSERT(rt_atomic_load(&completion->susp_thread_n_flag) !=
266 comp_waiting);
267
268 /* the necessary barrier is done during thread sched */
269 }
270 else
271 {
272 /* try to cancel waiting if woken up expectedly or timeout */
273 if (!IPC_CAS(&completion->susp_thread_n_flag, &comp_waiting,
274 RT_UNCOMPLETED, memory_order_relaxed,
275 memory_order_relaxed))
276 {
277 /* cancel failed, producer had woken us in the past, fix error */
278 if (comp_waiting == RT_WAKING)
279 {
280 _wait_until_update(completion, RT_WAKING);
281 }
282 IPC_BARRIER(memory_order_acquire);
283 error = RT_EOK;
284 }
285 }
286 }
287
288 return error;
289 }
290
291 /**
292 * @brief This function indicates a completion has done and wakeup the thread
293 * and update its errno. No update is applied if it's a negative value.
294 *
295 * @param completion is a pointer to a completion object.
296 * @param thread_errno is the errno set to waking thread.
297 * @return RT_EOK if wakeup succeed.
298 * RT_EEMPTY if wakeup failure and the completion is set to completed.
299 * RT_EBUSY if the completion is still in completed state
300 */
rt_completion_wakeup_by_errno(struct rt_completion * completion,rt_err_t thread_errno)301 rt_err_t rt_completion_wakeup_by_errno(struct rt_completion *completion,
302 rt_err_t thread_errno)
303 {
304 rt_err_t error = -RT_ERROR;
305 rt_thread_t suspend_thread;
306 rt_bool_t exchange_succ;
307 rt_base_t expected_value;
308 RT_ASSERT(completion != RT_NULL);
309
310 do
311 {
312 /* try to transform from uncompleted to completed */
313 expected_value = RT_UNCOMPLETED;
314
315 exchange_succ =
316 IPC_CAS(&completion->susp_thread_n_flag, &expected_value,
317 RT_COMPLETED, memory_order_release, memory_order_relaxed);
318
319 if (exchange_succ)
320 {
321 error = -RT_EEMPTY;
322 break;
323 }
324 else
325 {
326 if (expected_value == RT_COMPLETED)
327 {
328 /* completion still in completed state */
329 error = -RT_EBUSY;
330 break;
331 }
332 else if (expected_value == RT_OCCUPIED ||
333 expected_value == RT_WAKING)
334 {
335 continue;
336 }
337 else
338 {
339 /* try to resume the thread and set uncompleted */
340 exchange_succ = IPC_CAS(
341 &completion->susp_thread_n_flag, &expected_value,
342 RT_WAKING, memory_order_relaxed, memory_order_relaxed);
343
344 if (exchange_succ)
345 {
346 #define GET_THREAD(val) ((rt_thread_t)((val) & ~1))
347 suspend_thread = GET_THREAD(expected_value);
348
349 if (thread_errno >= 0)
350 {
351 suspend_thread->error = thread_errno;
352 }
353
354 /* safe to assume publication done even on resume failure */
355 RT_ASSERT(rt_atomic_load(&completion->susp_thread_n_flag) ==
356 RT_WAKING);
357 IPC_STORE(&completion->susp_thread_n_flag, RT_UNCOMPLETED,
358 memory_order_release);
359 rt_thread_resume(suspend_thread);
360 error = RT_EOK;
361 break;
362 }
363 else
364 {
365 /* failed in racing to resume thread, try again */
366 }
367 }
368 }
369 } while (1);
370
371 return error;
372 }
373