/*
 * Copyright (c) 2006-2024, RT-Thread Development Team
 *
 * SPDX-License-Identifier: Apache-2.0
 *
 * Change Logs:
 * Date           Author       Notes
 * 2024-04-26     Shell        lockless rt_completion for MP system
 */

11 #define DBG_TAG           "drivers.ipc"
12 #define DBG_LVL           DBG_INFO
13 #include <rtdbg.h>
14 
15 #include <rtthread.h>
16 #include <rthw.h>
17 #include <rtdevice.h>
18 
/*
 * Values stored in rt_completion::susp_thread_n_flag.
 *
 *  RT_UNCOMPLETED  no completion is pending and no waiter is recorded.
 *  RT_COMPLETED    a completion was posted and has not been consumed yet.
 *  RT_WAKING       a waker has claimed the suspended thread and has not yet
 *                  put the word back to RT_UNCOMPLETED.
 *  RT_OCCUPIED     a waiter has claimed the object but has not yet published
 *                  its thread pointer.
 *
 * Any other value is a waiting state built by RT_COMPLETION_NEW_STAT().
 */
#define RT_COMPLETED        1
#define RT_UNCOMPLETED      0
#define RT_WAKING           (-1)
#define RT_OCCUPIED         (-2)

/*
 * Pack a thread pointer and a completed flag into one word: bit 0 carries
 * the flag, the remaining bits carry the thread address. Both arguments are
 * parenthesized so that expression arguments are converted as a whole.
 */
#define RT_COMPLETION_NEW_STAT(thread, flag) \
    (((flag) & 1) | (((rt_base_t)(thread)) & ~1))

26 /**
27  * The C11 atomic can be ~5% and even faster in testing on the arm64 platform
28  * compared to rt_atomic. So the C11 way is always preferred.
29  */
30 #ifdef RT_USING_STDC_ATOMIC
31 #include <stdatomic.h>
32 
33 #define IPC_STORE(dst, val, morder) atomic_store_explicit(dst, val, morder)
34 #define IPC_LOAD(dst, morder)       atomic_load_explicit(dst, morder)
35 #define IPC_BARRIER(morder)         atomic_thread_fence(morder)
36 #define IPC_CAS(dst, exp, desired, succ, fail) \
37     atomic_compare_exchange_strong_explicit(dst, exp, desired, succ, fail)
38 
39 #else /* !RT_USING_STDC_ATOMIC */
40 #include <rtatomic.h>
41 
42 #define IPC_STORE(dst, val, morder) rt_atomic_store(dst, val)
43 #define IPC_LOAD(dst, morder)       rt_atomic_load(dst)
44 #define IPC_BARRIER(morder)
45 #define IPC_CAS(dst, exp, desired, succ, fail) \
46     rt_atomic_compare_exchange_strong(dst, exp, desired)
47 #endif /* RT_USING_STDC_ATOMIC */
48 
49 static rt_err_t _comp_susp_thread(struct rt_completion *completion,
50                                   rt_thread_t thread, rt_int32_t timeout,
51                                   int suspend_flag);
52 
53 /**
54  * @brief This function will initialize a completion object.
55  *
56  * @param completion is a pointer to a completion object.
57  */
rt_completion_init(struct rt_completion * completion)58 void rt_completion_init(struct rt_completion *completion)
59 {
60     RT_ASSERT(completion != RT_NULL);
61 
62     IPC_STORE(&completion->susp_thread_n_flag, RT_UNCOMPLETED,
63               memory_order_relaxed);
64 }
65 RTM_EXPORT(rt_completion_init);
66 
67 /**
68  * @brief This is same as rt_completion_wait_flags(), except that this API is NOT
69  *        ISR-safe (you can NOT call completion_done() on isr routine).
70  *
71  * @param completion is a pointer to a completion object.
72  * @param timeout is a timeout period (unit: OS ticks). If the completion is unavailable, the thread will wait for
73  *                the completion done up to the amount of time specified by the argument.
74  *                NOTE: Generally, we use the macro RT_WAITING_FOREVER to set this parameter, which means that when the
75  *                completion is unavailable, the thread will be waitting forever.
76  * @param suspend_flag suspend flags. See rt_thread_suspend_with_flag()
77  *
78  * @return Return the operation status. ONLY when the return value is RT_EOK, the operation is successful.
79  *         If the return value is any other values, it means that the completion wait failed.
80  *
81  * @warning This function can ONLY be called in the thread context. It MUST NOT be called in interrupt context.
82  */
rt_completion_wait_flags_noisr(struct rt_completion * completion,rt_int32_t timeout,int suspend_flag)83 rt_err_t rt_completion_wait_flags_noisr(struct rt_completion *completion,
84                                         rt_int32_t timeout, int suspend_flag)
85 {
86     rt_err_t result = -RT_ERROR;
87     rt_thread_t thread;
88     rt_bool_t exchange_succ;
89     rt_base_t expected_value;
90     RT_ASSERT(completion != RT_NULL);
91 
92     /* current context checking */
93     RT_DEBUG_SCHEDULER_AVAILABLE(timeout != 0);
94 
95     thread = rt_thread_self();
96 
97     do
98     {
99         /* try to consume one completion */
100         expected_value = RT_COMPLETED;
101         exchange_succ =
102             IPC_CAS(&completion->susp_thread_n_flag, &expected_value,
103                     RT_UNCOMPLETED, memory_order_acquire, memory_order_relaxed);
104 
105         if (exchange_succ)
106         {
107             /* consume succeed, now return EOK */
108             result = RT_EOK;
109             break;
110         }
111         else if (expected_value == RT_WAKING)
112         {
113             /* previous wake is not done yet, yield thread & try again */
114             rt_thread_yield();
115         }
116         else
117         {
118             /**
119              * API rules say: only one thread can suspend on complete.
120              * So we assert if debug.
121              */
122             RT_ASSERT(expected_value == RT_UNCOMPLETED);
123 
124             if (timeout != 0)
125             {
126                 /**
127                  * try to occupy completion, noted that we are assuming that
128                  * `expected_value == RT_UNCOMPLETED`
129                  */
130                 exchange_succ = IPC_CAS(
131                     &completion->susp_thread_n_flag, &expected_value,
132                     RT_OCCUPIED, memory_order_relaxed, memory_order_relaxed);
133 
134                 if (exchange_succ)
135                 {
136                     /* complete waiting business and return result */
137                     result = _comp_susp_thread(completion, thread, timeout,
138                                                suspend_flag);
139 
140                     RT_ASSERT(rt_atomic_load(&completion->susp_thread_n_flag) !=
141                               RT_OCCUPIED);
142 
143                     break;
144                 }
145                 else
146                 {
147                     /* try again */
148                 }
149             }
150             else
151             {
152                 result = -RT_ETIMEOUT;
153                 break;
154             }
155         }
156     } while (1);
157 
158     return result;
159 }
160 
161 /**
162  * @brief This function will wait for a completion, if the completion is unavailable, the thread shall wait for
163  *        the completion up to a specified time.
164  *
165  * @param completion is a pointer to a completion object.
166  * @param timeout is a timeout period (unit: OS ticks). If the completion is unavailable, the thread will wait for
167  *                the completion done up to the amount of time specified by the argument.
168  *                NOTE: Generally, we use the macro RT_WAITING_FOREVER to set this parameter, which means that when the
169  *                completion is unavailable, the thread will be waitting forever.
170  * @param suspend_flag suspend flags. See rt_thread_suspend_with_flag()
171  *
172  * @return Return the operation status. ONLY when the return value is RT_EOK, the operation is successful.
173  *         If the return value is any other values, it means that the completion wait failed.
174  *
175  * @warning This function can ONLY be called in the thread context. It MUST NOT be called in interrupt context.
176  */
rt_completion_wait_flags(struct rt_completion * completion,rt_int32_t timeout,int suspend_flag)177 rt_err_t rt_completion_wait_flags(struct rt_completion *completion,
178                                   rt_int32_t timeout, int suspend_flag)
179 {
180     rt_err_t error;
181     rt_ubase_t level;
182     level = rt_hw_local_irq_disable();
183     error = rt_completion_wait_flags_noisr(completion, timeout, suspend_flag);
184     rt_hw_local_irq_enable(level);
185     return error;
186 }
187 
/**
 * Spin until the completion state word differs from `expected`.
 *
 * @param completion is a pointer to a completion object.
 * @param expected   the value the caller is waiting to see replaced.
 *
 * @return the first observed value that is not `expected`.
 */
static rt_base_t _wait_until_update(struct rt_completion *completion, rt_base_t expected)
{
    rt_base_t current_value;

    /* spinning for update */
    do
    {
        rt_hw_isb();
        current_value =
            IPC_LOAD(&completion->susp_thread_n_flag, memory_order_relaxed);
    } while (current_value == expected);

    return current_value;
}

203 /**
204  * Try to suspend thread and update completion
205  */
_comp_susp_thread(struct rt_completion * completion,rt_thread_t thread,rt_int32_t timeout,int suspend_flag)206 static rt_err_t _comp_susp_thread(struct rt_completion *completion,
207                                   rt_thread_t thread, rt_int32_t timeout,
208                                   int suspend_flag)
209 {
210     rt_err_t error = -RT_ERROR;
211     rt_base_t clevel;
212     rt_base_t comp_waiting;
213 
214     /* suspend thread */
215     clevel = rt_enter_critical();
216 
217     /* reset thread error number */
218     thread->error = RT_EOK;
219 
220     error = rt_thread_suspend_with_flag(thread, suspend_flag);
221 
222     if (error)
223     {
224         rt_exit_critical_safe(clevel);
225         RT_ASSERT(rt_atomic_load(&completion->susp_thread_n_flag) ==
226                   RT_OCCUPIED);
227         IPC_STORE(&completion->susp_thread_n_flag, RT_UNCOMPLETED,
228                   memory_order_relaxed);
229     }
230     else
231     {
232         /* set to waiting */
233         comp_waiting = RT_COMPLETION_NEW_STAT(thread, RT_UNCOMPLETED);
234         RT_ASSERT(rt_atomic_load(&completion->susp_thread_n_flag) ==
235                   RT_OCCUPIED);
236         IPC_STORE(&completion->susp_thread_n_flag, comp_waiting,
237                   memory_order_relaxed);
238 
239         /* current context checking */
240         RT_DEBUG_NOT_IN_INTERRUPT;
241 
242         /* start timer */
243         if (timeout > 0)
244         {
245             /* reset the timeout of thread timer and start it */
246             rt_timer_control(&(thread->thread_timer),
247                              RT_TIMER_CTRL_SET_TIME,
248                              &timeout);
249             rt_timer_start(&(thread->thread_timer));
250         }
251 
252         /* do schedule */
253         rt_schedule();
254 
255         rt_exit_critical_safe(clevel);
256 
257         /* thread is woken up */
258         error = thread->error;
259         error = error > 0 ? -error : error;
260 
261         /* clean completed flag & remove susp_thread on the case of waking by timeout */
262         if (!error)
263         {
264             /* completion done successfully */
265             RT_ASSERT(rt_atomic_load(&completion->susp_thread_n_flag) !=
266                       comp_waiting);
267 
268             /* the necessary barrier is done during thread sched */
269         }
270         else
271         {
272             /* try to cancel waiting if woken up expectedly or timeout */
273             if (!IPC_CAS(&completion->susp_thread_n_flag, &comp_waiting,
274                          RT_UNCOMPLETED, memory_order_relaxed,
275                          memory_order_relaxed))
276             {
277                 /* cancel failed, producer had woken us in the past, fix error */
278                 if (comp_waiting == RT_WAKING)
279                 {
280                     _wait_until_update(completion, RT_WAKING);
281                 }
282                 IPC_BARRIER(memory_order_acquire);
283                 error = RT_EOK;
284             }
285         }
286     }
287 
288     return error;
289 }
290 
291 /**
292  * @brief   This function indicates a completion has done and wakeup the thread
293  *          and update its errno. No update is applied if it's a negative value.
294  *
295  * @param   completion is a pointer to a completion object.
296  * @param   thread_errno is the errno set to waking thread.
297  * @return  RT_EOK if wakeup succeed.
298  *          RT_EEMPTY if wakeup failure and the completion is set to completed.
299  *          RT_EBUSY if the completion is still in completed state
300  */
rt_completion_wakeup_by_errno(struct rt_completion * completion,rt_err_t thread_errno)301 rt_err_t rt_completion_wakeup_by_errno(struct rt_completion *completion,
302                                        rt_err_t thread_errno)
303 {
304     rt_err_t error = -RT_ERROR;
305     rt_thread_t suspend_thread;
306     rt_bool_t exchange_succ;
307     rt_base_t expected_value;
308     RT_ASSERT(completion != RT_NULL);
309 
310     do
311     {
312         /* try to transform from uncompleted to completed */
313         expected_value = RT_UNCOMPLETED;
314 
315         exchange_succ =
316             IPC_CAS(&completion->susp_thread_n_flag, &expected_value,
317                     RT_COMPLETED, memory_order_release, memory_order_relaxed);
318 
319         if (exchange_succ)
320         {
321             error = -RT_EEMPTY;
322             break;
323         }
324         else
325         {
326             if (expected_value == RT_COMPLETED)
327             {
328                 /* completion still in completed state */
329                 error = -RT_EBUSY;
330                 break;
331             }
332             else if (expected_value == RT_OCCUPIED ||
333                      expected_value == RT_WAKING)
334             {
335                 continue;
336             }
337             else
338             {
339                 /* try to resume the thread and set uncompleted */
340                 exchange_succ = IPC_CAS(
341                     &completion->susp_thread_n_flag, &expected_value,
342                     RT_WAKING, memory_order_relaxed, memory_order_relaxed);
343 
344                 if (exchange_succ)
345                 {
346                     #define GET_THREAD(val) ((rt_thread_t)((val) & ~1))
347                     suspend_thread = GET_THREAD(expected_value);
348 
349                     if (thread_errno >= 0)
350                     {
351                         suspend_thread->error = thread_errno;
352                     }
353 
354                     /* safe to assume publication done even on resume failure */
355                     RT_ASSERT(rt_atomic_load(&completion->susp_thread_n_flag) ==
356                               RT_WAKING);
357                     IPC_STORE(&completion->susp_thread_n_flag, RT_UNCOMPLETED,
358                               memory_order_release);
359                     rt_thread_resume(suspend_thread);
360                     error = RT_EOK;
361                     break;
362                 }
363                 else
364                 {
365                     /* failed in racing to resume thread, try again */
366                 }
367             }
368         }
369     } while (1);
370 
371     return error;
372 }
373