1 /*
2  * Copyright (c) 2006-2022, RT-Thread Development Team
3  *
4  * SPDX-License-Identifier: Apache-2.0
5  *
6  * Change Logs:
7  * Date           Author       Notes
8  * 2017-02-27     Bernard      fix the re-work issue.
9  * 2021-08-01     Meco Man     remove rt_delayed_work_init()
10  * 2021-08-14     Jackistang   add comments for function interface
11  * 2022-01-16     Meco Man     add rt_work_urgent()
12  * 2023-09-15     xqyjlj       perf rt_hw_interrupt_disable/enable
13  * 2024-12-21     yuqingli     delete timer, using list
14  */
15 
16 #include <rthw.h>
17 #include <rtdevice.h>
18 
19 #ifdef RT_USING_HEAP
20 
_workqueue_work_completion(struct rt_workqueue * queue)21 rt_inline rt_err_t _workqueue_work_completion(struct rt_workqueue *queue)
22 {
23     rt_err_t result;
24 
25     while (1)
26     {
27         /* try to take condition semaphore */
28         result = rt_sem_trytake(&(queue->sem));
29         if (result == -RT_ETIMEOUT)
30         {
31             /* it's timeout, release this semaphore */
32             rt_sem_release(&(queue->sem));
33         }
34         else if (result == RT_EOK)
35         {
36             /* keep the sem value = 0 */
37             result = RT_EOK;
38             break;
39         }
40         else
41         {
42             result = -RT_ERROR;
43             break;
44         }
45     }
46 
47     return result;
48 }
49 
_workqueue_thread_entry(void * parameter)50 static void _workqueue_thread_entry(void *parameter)
51 {
52     rt_base_t            level;
53     struct rt_work      *work;
54     struct rt_workqueue *queue;
55     rt_tick_t            current_tick;
56     rt_int32_t           delay_tick;
57     void (*work_func)(struct rt_work *work, void *work_data);
58     void *work_data;
59 
60     queue = (struct rt_workqueue *)parameter;
61     RT_ASSERT(queue != RT_NULL);
62 
63     while (1)
64     {
65         level = rt_spin_lock_irqsave(&(queue->spinlock));
66 
67         /* timer check */
68         current_tick = rt_tick_get();
69         delay_tick   = RT_WAITING_FOREVER;
70         while (!rt_list_isempty(&(queue->delayed_list)))
71         {
72             work = rt_list_entry(queue->delayed_list.next, struct rt_work, list);
73             if ((current_tick - work->timeout_tick) < RT_TICK_MAX / 2)
74             {
75                 rt_list_remove(&(work->list));
76                 rt_list_insert_after(queue->work_list.prev, &(work->list));
77                 work->flags &= ~RT_WORK_STATE_SUBMITTING;
78                 work->flags |= RT_WORK_STATE_PENDING;
79             }
80             else
81             {
82                 delay_tick = work->timeout_tick - current_tick;
83                 break;
84             }
85         }
86 
87         if (rt_list_isempty(&(queue->work_list)))
88         {
89             rt_spin_unlock_irqrestore(&(queue->spinlock), level);
90             /* wait for work completion */
91             rt_completion_wait(&(queue->wakeup_completion), delay_tick);
92             continue;
93         }
94 
95         /* we have work to do with. */
96         work = rt_list_entry(queue->work_list.next, struct rt_work, list);
97         rt_list_remove(&(work->list));
98         queue->work_current  = work;
99         work->flags         &= ~RT_WORK_STATE_PENDING;
100         work->workqueue      = RT_NULL;
101         work_func            = work->work_func;
102         work_data            = work->work_data;
103         rt_spin_unlock_irqrestore(&(queue->spinlock), level);
104 
105         /* do work */
106         work_func(work, work_data);
107         /* clean current work */
108         queue->work_current = RT_NULL;
109 
110         /* ack work completion */
111         _workqueue_work_completion(queue);
112     }
113 }
114 
_workqueue_submit_work(struct rt_workqueue * queue,struct rt_work * work,rt_tick_t ticks)115 static rt_err_t _workqueue_submit_work(struct rt_workqueue *queue,
116                                        struct rt_work *work, rt_tick_t ticks)
117 {
118     rt_base_t       level;
119     rt_err_t        err = RT_EOK;
120     struct rt_work *work_tmp;
121     rt_list_t      *list_tmp;
122 
123     level = rt_spin_lock_irqsave(&(queue->spinlock));
124     /* remove list */
125     rt_list_remove(&(work->list));
126     work->flags = 0;
127 
128     if (ticks == 0)
129     {
130         rt_list_insert_after(queue->work_list.prev, &(work->list));
131         work->flags     |= RT_WORK_STATE_PENDING;
132         work->workqueue  = queue;
133 
134         rt_completion_done(&(queue->wakeup_completion));
135         err = RT_EOK;
136     }
137     else if (ticks < RT_TICK_MAX / 2)
138     {
139         /* insert delay work list */
140         work->flags        |= RT_WORK_STATE_SUBMITTING;
141         work->workqueue     = queue;
142         work->timeout_tick  = rt_tick_get() + ticks;
143 
144         list_tmp = &(queue->delayed_list);
145         for (work_tmp = rt_list_entry(list_tmp->next, struct rt_work, list);
146                 &work_tmp->list != list_tmp;
147                 work_tmp = rt_list_entry(work_tmp->list.next, struct rt_work, list))
148         {
149             if ((work_tmp->timeout_tick - work->timeout_tick) < RT_TICK_MAX / 2)
150             {
151                 list_tmp = &(work_tmp->list);
152                 break;
153             }
154         }
155         rt_list_insert_before(list_tmp, &(work->list));
156 
157         rt_completion_done(&(queue->wakeup_completion));
158         err = RT_EOK;
159     }
160     else
161     {
162         err = -RT_ERROR;
163     }
164     rt_spin_unlock_irqrestore(&(queue->spinlock), level);
165     return err;
166 }
167 
_workqueue_cancel_work(struct rt_workqueue * queue,struct rt_work * work)168 static rt_err_t _workqueue_cancel_work(struct rt_workqueue *queue, struct rt_work *work)
169 {
170     rt_base_t level;
171     rt_err_t  err;
172 
173     level = rt_spin_lock_irqsave(&(queue->spinlock));
174     rt_list_remove(&(work->list));
175     work->flags     = 0;
176     err             = queue->work_current != work ? RT_EOK : -RT_EBUSY;
177     work->workqueue = RT_NULL;
178     rt_spin_unlock_irqrestore(&(queue->spinlock), level);
179     return err;
180 }
181 
182 /**
183  * @brief Initialize a work item, binding with a callback function.
184  *
185  * @param work is a pointer to the work item object.
186  *
187  * @param work_func is a callback function that will be called when this work item is executed.
188  *
189  * @param work_data is a user data passed to the callback function as the second parameter.
190  */
rt_work_init(struct rt_work * work,void (* work_func)(struct rt_work * work,void * work_data),void * work_data)191 void rt_work_init(struct rt_work *work,
192                   void (*work_func)(struct rt_work *work, void *work_data),
193                   void *work_data)
194 {
195     RT_ASSERT(work != RT_NULL);
196     RT_ASSERT(work_func != RT_NULL);
197 
198     rt_list_init(&(work->list));
199     work->work_func = work_func;
200     work->work_data = work_data;
201     work->workqueue = RT_NULL;
202     work->flags     = 0;
203     work->type      = 0;
204 }
205 
206 /**
207  * @brief Create a work queue with a thread inside.
208  *
209  * @param name is a name of the work queue thread.
210  *
211  * @param stack_size is stack size of the work queue thread.
212  *
213  * @param priority is a priority of the work queue thread.
214  *
215  * @return Return a pointer to the workqueue object. It will return RT_NULL if failed.
216  */
rt_workqueue_create(const char * name,rt_uint16_t stack_size,rt_uint8_t priority)217 struct rt_workqueue *rt_workqueue_create(const char *name, rt_uint16_t stack_size, rt_uint8_t priority)
218 {
219     struct rt_workqueue *queue = RT_NULL;
220 
221     queue = (struct rt_workqueue *)RT_KERNEL_MALLOC(sizeof(struct rt_workqueue));
222     if (queue != RT_NULL)
223     {
224         /* initialize work list */
225         rt_list_init(&(queue->work_list));
226         rt_list_init(&(queue->delayed_list));
227         queue->work_current = RT_NULL;
228         rt_sem_init(&(queue->sem), "wqueue", 0, RT_IPC_FLAG_FIFO);
229         rt_completion_init(&(queue->wakeup_completion));
230 
231         /* create the work thread */
232         queue->work_thread = rt_thread_create(name, _workqueue_thread_entry, queue, stack_size, priority, 10);
233         if (queue->work_thread == RT_NULL)
234         {
235             rt_sem_detach(&(queue->sem));
236             RT_KERNEL_FREE(queue);
237             return RT_NULL;
238         }
239 
240         rt_spin_lock_init(&(queue->spinlock));
241         rt_thread_startup(queue->work_thread);
242     }
243 
244     return queue;
245 }
246 
247 /**
248  * @brief Destroy a work queue.
249  *
250  * @param queue is a pointer to the workqueue object.
251  *
252  * @return RT_EOK     Success.
253  */
rt_workqueue_destroy(struct rt_workqueue * queue)254 rt_err_t rt_workqueue_destroy(struct rt_workqueue *queue)
255 {
256     RT_ASSERT(queue != RT_NULL);
257 
258     rt_workqueue_cancel_all_work(queue);
259     rt_thread_delete(queue->work_thread);
260     rt_sem_detach(&(queue->sem));
261     RT_KERNEL_FREE(queue);
262 
263     return RT_EOK;
264 }
265 
266 /**
267  * @brief Submit a work item to the work queue without delay.
268  *
269  * @param queue is a pointer to the workqueue object.
270  *
271  * @param work is a pointer to the work item object.
272  *
273  * @return RT_EOK       Success.
274  */
rt_workqueue_dowork(struct rt_workqueue * queue,struct rt_work * work)275 rt_err_t rt_workqueue_dowork(struct rt_workqueue *queue, struct rt_work *work)
276 {
277     RT_ASSERT(queue != RT_NULL);
278     RT_ASSERT(work != RT_NULL);
279 
280     return _workqueue_submit_work(queue, work, 0);
281 }
282 
283 /**
284  * @brief Submit a work item to the work queue with a delay.
285  *
286  * @param queue is a pointer to the workqueue object.
287  *
288  * @param work is a pointer to the work item object.
289  *
290  * @param ticks is the delay ticks for the work item to be submitted to the work queue.
291  *
292  *             NOTE: The max timeout tick should be no more than (RT_TICK_MAX/2 - 1)
293  *
294  * @return RT_EOK       Success.
295  *         -RT_ERROR    The ticks parameter is invalid.
296  */
rt_workqueue_submit_work(struct rt_workqueue * queue,struct rt_work * work,rt_tick_t ticks)297 rt_err_t rt_workqueue_submit_work(struct rt_workqueue *queue, struct rt_work *work, rt_tick_t ticks)
298 {
299     RT_ASSERT(queue != RT_NULL);
300     RT_ASSERT(work != RT_NULL);
301     RT_ASSERT(ticks < RT_TICK_MAX / 2);
302 
303     return _workqueue_submit_work(queue, work, ticks);
304 }
305 
306 /**
307  * @brief Submit a work item to the work queue without delay. This work item will be executed after the current work item.
308  *
309  * @param queue is a pointer to the workqueue object.
310  *
311  * @param work is a pointer to the work item object.
312  *
313  * @return RT_EOK   Success.
314  */
rt_workqueue_urgent_work(struct rt_workqueue * queue,struct rt_work * work)315 rt_err_t rt_workqueue_urgent_work(struct rt_workqueue *queue, struct rt_work *work)
316 {
317     rt_base_t level;
318 
319     RT_ASSERT(queue != RT_NULL);
320     RT_ASSERT(work != RT_NULL);
321 
322     level = rt_spin_lock_irqsave(&(queue->spinlock));
323     /* NOTE: the work MUST be initialized firstly */
324     rt_list_remove(&(work->list));
325     rt_list_insert_after(&queue->work_list, &(work->list));
326 
327     rt_completion_done(&(queue->wakeup_completion));
328     rt_spin_unlock_irqrestore(&(queue->spinlock), level);
329 
330     return RT_EOK;
331 }
332 
333 /**
334  * @brief Cancel a work item in the work queue.
335  *
336  * @param queue is a pointer to the workqueue object.
337  *
338  * @param work is a pointer to the work item object.
339  *
340  * @return RT_EOK       Success.
341  *         -RT_EBUSY    This work item is executing.
342  */
rt_workqueue_cancel_work(struct rt_workqueue * queue,struct rt_work * work)343 rt_err_t rt_workqueue_cancel_work(struct rt_workqueue *queue, struct rt_work *work)
344 {
345     RT_ASSERT(work != RT_NULL);
346     RT_ASSERT(queue != RT_NULL);
347 
348     return _workqueue_cancel_work(queue, work);
349 }
350 
351 /**
352  * @brief Cancel a work item in the work queue. If the work item is executing, this function will block until it is done.
353  *
354  * @param queue is a pointer to the workqueue object.
355  *
356  * @param work is a pointer to the work item object.
357  *
358  * @return RT_EOK       Success.
359  */
rt_workqueue_cancel_work_sync(struct rt_workqueue * queue,struct rt_work * work)360 rt_err_t rt_workqueue_cancel_work_sync(struct rt_workqueue *queue, struct rt_work *work)
361 {
362     RT_ASSERT(queue != RT_NULL);
363     RT_ASSERT(work != RT_NULL);
364 
365     if (queue->work_current == work) /* it's current work in the queue */
366     {
367         /* wait for work completion */
368         rt_sem_take(&(queue->sem), RT_WAITING_FOREVER);
369         /* Note that because work items are automatically deleted after execution, they do not need to be deleted again */
370     }
371     else
372     {
373         _workqueue_cancel_work(queue, work);
374     }
375 
376     return RT_EOK;
377 }
378 
379 /**
380  * @brief This function will cancel all work items in work queue.
381  *
382  * @param queue is a pointer to the workqueue object.
383  *
384  * @return RT_EOK       Success.
385  */
rt_workqueue_cancel_all_work(struct rt_workqueue * queue)386 rt_err_t rt_workqueue_cancel_all_work(struct rt_workqueue *queue)
387 {
388     struct rt_work *work;
389 
390     RT_ASSERT(queue != RT_NULL);
391 
392     /* cancel work */
393     rt_enter_critical();
394     while (rt_list_isempty(&queue->work_list) == RT_FALSE)
395     {
396         work = rt_list_first_entry(&queue->work_list, struct rt_work, list);
397         _workqueue_cancel_work(queue, work);
398     }
399     /* cancel delay work */
400     while (rt_list_isempty(&queue->delayed_list) == RT_FALSE)
401     {
402         work = rt_list_first_entry(&queue->delayed_list, struct rt_work, list);
403         _workqueue_cancel_work(queue, work);
404     }
405     rt_exit_critical();
406 
407     return RT_EOK;
408 }
409 
410 #ifdef RT_USING_SYSTEM_WORKQUEUE
411 
static struct rt_workqueue *sys_workq; /* system work queue; stays RT_NULL until rt_work_sys_workqueue_init() creates it */
413 
414 /**
415  * @brief Submit a work item to the system work queue with a delay.
416  *
417  * @param work is a pointer to the work item object.
418  *
419  * @param ticks is the delay OS ticks for the work item to be submitted to the work queue.
420  *
421  *             NOTE: The max timeout tick should be no more than (RT_TICK_MAX/2 - 1)
422  *
423  * @return RT_EOK       Success.
424  *         -RT_ERROR    The ticks parameter is invalid.
425  */
rt_work_submit(struct rt_work * work,rt_tick_t ticks)426 rt_err_t rt_work_submit(struct rt_work *work, rt_tick_t ticks)
427 {
428     return rt_workqueue_submit_work(sys_workq, work, ticks);
429 }
430 
431 /**
432  * @brief Submit a work item to the system work queue without delay. This work item will be executed after the current work item.
433  *
434  * @param work is a pointer to the work item object.
435  *
436  * @return RT_EOK   Success.
437  */
rt_work_urgent(struct rt_work * work)438 rt_err_t rt_work_urgent(struct rt_work *work)
439 {
440     return rt_workqueue_urgent_work(sys_workq, work);
441 }
442 
443 /**
444  * @brief Cancel a work item in the system work queue.
445  *
446  * @param work is a pointer to the work item object.
447  *
448  * @return RT_EOK       Success.
449  *         -RT_EBUSY    This work item is executing.
450  */
rt_work_cancel(struct rt_work * work)451 rt_err_t rt_work_cancel(struct rt_work *work)
452 {
453     return rt_workqueue_cancel_work(sys_workq, work);
454 }
455 
rt_work_sys_workqueue_init(void)456 static int rt_work_sys_workqueue_init(void)
457 {
458     if (sys_workq != RT_NULL)
459         return RT_EOK;
460 
461     sys_workq = rt_workqueue_create("sys workq", RT_SYSTEM_WORKQUEUE_STACKSIZE,
462                                     RT_SYSTEM_WORKQUEUE_PRIORITY);
463     RT_ASSERT(sys_workq != RT_NULL);
464 
465     return RT_EOK;
466 }
467 INIT_PREV_EXPORT(rt_work_sys_workqueue_init);
468 #endif /* RT_USING_SYSTEM_WORKQUEUE */
469 #endif /* RT_USING_HEAP */
470