1 /*
2 * Copyright (c) 2006-2022, RT-Thread Development Team
3 *
4 * SPDX-License-Identifier: Apache-2.0
5 *
6 * Change Logs:
7 * Date Author Notes
8 * 2017-02-27 Bernard fix the re-work issue.
9 * 2021-08-01 Meco Man remove rt_delayed_work_init()
10 * 2021-08-14 Jackistang add comments for function interface
11 * 2022-01-16 Meco Man add rt_work_urgent()
12 * 2023-09-15 xqyjlj perf rt_hw_interrupt_disable/enable
13 * 2024-12-21 yuqingli delete timer, using list
14 */
15
16 #include <rthw.h>
17 #include <rtdevice.h>
18
19 #ifdef RT_USING_HEAP
20
_workqueue_work_completion(struct rt_workqueue * queue)21 rt_inline rt_err_t _workqueue_work_completion(struct rt_workqueue *queue)
22 {
23 rt_err_t result;
24
25 while (1)
26 {
27 /* try to take condition semaphore */
28 result = rt_sem_trytake(&(queue->sem));
29 if (result == -RT_ETIMEOUT)
30 {
31 /* it's timeout, release this semaphore */
32 rt_sem_release(&(queue->sem));
33 }
34 else if (result == RT_EOK)
35 {
36 /* keep the sem value = 0 */
37 result = RT_EOK;
38 break;
39 }
40 else
41 {
42 result = -RT_ERROR;
43 break;
44 }
45 }
46
47 return result;
48 }
49
_workqueue_thread_entry(void * parameter)50 static void _workqueue_thread_entry(void *parameter)
51 {
52 rt_base_t level;
53 struct rt_work *work;
54 struct rt_workqueue *queue;
55 rt_tick_t current_tick;
56 rt_int32_t delay_tick;
57 void (*work_func)(struct rt_work *work, void *work_data);
58 void *work_data;
59
60 queue = (struct rt_workqueue *)parameter;
61 RT_ASSERT(queue != RT_NULL);
62
63 while (1)
64 {
65 level = rt_spin_lock_irqsave(&(queue->spinlock));
66
67 /* timer check */
68 current_tick = rt_tick_get();
69 delay_tick = RT_WAITING_FOREVER;
70 while (!rt_list_isempty(&(queue->delayed_list)))
71 {
72 work = rt_list_entry(queue->delayed_list.next, struct rt_work, list);
73 if ((current_tick - work->timeout_tick) < RT_TICK_MAX / 2)
74 {
75 rt_list_remove(&(work->list));
76 rt_list_insert_after(queue->work_list.prev, &(work->list));
77 work->flags &= ~RT_WORK_STATE_SUBMITTING;
78 work->flags |= RT_WORK_STATE_PENDING;
79 }
80 else
81 {
82 delay_tick = work->timeout_tick - current_tick;
83 break;
84 }
85 }
86
87 if (rt_list_isempty(&(queue->work_list)))
88 {
89 rt_spin_unlock_irqrestore(&(queue->spinlock), level);
90 /* wait for work completion */
91 rt_completion_wait(&(queue->wakeup_completion), delay_tick);
92 continue;
93 }
94
95 /* we have work to do with. */
96 work = rt_list_entry(queue->work_list.next, struct rt_work, list);
97 rt_list_remove(&(work->list));
98 queue->work_current = work;
99 work->flags &= ~RT_WORK_STATE_PENDING;
100 work->workqueue = RT_NULL;
101 work_func = work->work_func;
102 work_data = work->work_data;
103 rt_spin_unlock_irqrestore(&(queue->spinlock), level);
104
105 /* do work */
106 work_func(work, work_data);
107 /* clean current work */
108 queue->work_current = RT_NULL;
109
110 /* ack work completion */
111 _workqueue_work_completion(queue);
112 }
113 }
114
_workqueue_submit_work(struct rt_workqueue * queue,struct rt_work * work,rt_tick_t ticks)115 static rt_err_t _workqueue_submit_work(struct rt_workqueue *queue,
116 struct rt_work *work, rt_tick_t ticks)
117 {
118 rt_base_t level;
119 rt_err_t err = RT_EOK;
120 struct rt_work *work_tmp;
121 rt_list_t *list_tmp;
122
123 level = rt_spin_lock_irqsave(&(queue->spinlock));
124 /* remove list */
125 rt_list_remove(&(work->list));
126 work->flags = 0;
127
128 if (ticks == 0)
129 {
130 rt_list_insert_after(queue->work_list.prev, &(work->list));
131 work->flags |= RT_WORK_STATE_PENDING;
132 work->workqueue = queue;
133
134 rt_completion_done(&(queue->wakeup_completion));
135 err = RT_EOK;
136 }
137 else if (ticks < RT_TICK_MAX / 2)
138 {
139 /* insert delay work list */
140 work->flags |= RT_WORK_STATE_SUBMITTING;
141 work->workqueue = queue;
142 work->timeout_tick = rt_tick_get() + ticks;
143
144 list_tmp = &(queue->delayed_list);
145 for (work_tmp = rt_list_entry(list_tmp->next, struct rt_work, list);
146 &work_tmp->list != list_tmp;
147 work_tmp = rt_list_entry(work_tmp->list.next, struct rt_work, list))
148 {
149 if ((work_tmp->timeout_tick - work->timeout_tick) < RT_TICK_MAX / 2)
150 {
151 list_tmp = &(work_tmp->list);
152 break;
153 }
154 }
155 rt_list_insert_before(list_tmp, &(work->list));
156
157 rt_completion_done(&(queue->wakeup_completion));
158 err = RT_EOK;
159 }
160 else
161 {
162 err = -RT_ERROR;
163 }
164 rt_spin_unlock_irqrestore(&(queue->spinlock), level);
165 return err;
166 }
167
_workqueue_cancel_work(struct rt_workqueue * queue,struct rt_work * work)168 static rt_err_t _workqueue_cancel_work(struct rt_workqueue *queue, struct rt_work *work)
169 {
170 rt_base_t level;
171 rt_err_t err;
172
173 level = rt_spin_lock_irqsave(&(queue->spinlock));
174 rt_list_remove(&(work->list));
175 work->flags = 0;
176 err = queue->work_current != work ? RT_EOK : -RT_EBUSY;
177 work->workqueue = RT_NULL;
178 rt_spin_unlock_irqrestore(&(queue->spinlock), level);
179 return err;
180 }
181
182 /**
183 * @brief Initialize a work item, binding with a callback function.
184 *
185 * @param work is a pointer to the work item object.
186 *
187 * @param work_func is a callback function that will be called when this work item is executed.
188 *
189 * @param work_data is a user data passed to the callback function as the second parameter.
190 */
rt_work_init(struct rt_work * work,void (* work_func)(struct rt_work * work,void * work_data),void * work_data)191 void rt_work_init(struct rt_work *work,
192 void (*work_func)(struct rt_work *work, void *work_data),
193 void *work_data)
194 {
195 RT_ASSERT(work != RT_NULL);
196 RT_ASSERT(work_func != RT_NULL);
197
198 rt_list_init(&(work->list));
199 work->work_func = work_func;
200 work->work_data = work_data;
201 work->workqueue = RT_NULL;
202 work->flags = 0;
203 work->type = 0;
204 }
205
206 /**
207 * @brief Create a work queue with a thread inside.
208 *
209 * @param name is a name of the work queue thread.
210 *
211 * @param stack_size is stack size of the work queue thread.
212 *
213 * @param priority is a priority of the work queue thread.
214 *
215 * @return Return a pointer to the workqueue object. It will return RT_NULL if failed.
216 */
rt_workqueue_create(const char * name,rt_uint16_t stack_size,rt_uint8_t priority)217 struct rt_workqueue *rt_workqueue_create(const char *name, rt_uint16_t stack_size, rt_uint8_t priority)
218 {
219 struct rt_workqueue *queue = RT_NULL;
220
221 queue = (struct rt_workqueue *)RT_KERNEL_MALLOC(sizeof(struct rt_workqueue));
222 if (queue != RT_NULL)
223 {
224 /* initialize work list */
225 rt_list_init(&(queue->work_list));
226 rt_list_init(&(queue->delayed_list));
227 queue->work_current = RT_NULL;
228 rt_sem_init(&(queue->sem), "wqueue", 0, RT_IPC_FLAG_FIFO);
229 rt_completion_init(&(queue->wakeup_completion));
230
231 /* create the work thread */
232 queue->work_thread = rt_thread_create(name, _workqueue_thread_entry, queue, stack_size, priority, 10);
233 if (queue->work_thread == RT_NULL)
234 {
235 rt_sem_detach(&(queue->sem));
236 RT_KERNEL_FREE(queue);
237 return RT_NULL;
238 }
239
240 rt_spin_lock_init(&(queue->spinlock));
241 rt_thread_startup(queue->work_thread);
242 }
243
244 return queue;
245 }
246
247 /**
248 * @brief Destroy a work queue.
249 *
250 * @param queue is a pointer to the workqueue object.
251 *
252 * @return RT_EOK Success.
253 */
rt_workqueue_destroy(struct rt_workqueue * queue)254 rt_err_t rt_workqueue_destroy(struct rt_workqueue *queue)
255 {
256 RT_ASSERT(queue != RT_NULL);
257
258 rt_workqueue_cancel_all_work(queue);
259 rt_thread_delete(queue->work_thread);
260 rt_sem_detach(&(queue->sem));
261 RT_KERNEL_FREE(queue);
262
263 return RT_EOK;
264 }
265
266 /**
267 * @brief Submit a work item to the work queue without delay.
268 *
269 * @param queue is a pointer to the workqueue object.
270 *
271 * @param work is a pointer to the work item object.
272 *
273 * @return RT_EOK Success.
274 */
rt_workqueue_dowork(struct rt_workqueue * queue,struct rt_work * work)275 rt_err_t rt_workqueue_dowork(struct rt_workqueue *queue, struct rt_work *work)
276 {
277 RT_ASSERT(queue != RT_NULL);
278 RT_ASSERT(work != RT_NULL);
279
280 return _workqueue_submit_work(queue, work, 0);
281 }
282
283 /**
284 * @brief Submit a work item to the work queue with a delay.
285 *
286 * @param queue is a pointer to the workqueue object.
287 *
288 * @param work is a pointer to the work item object.
289 *
290 * @param ticks is the delay ticks for the work item to be submitted to the work queue.
291 *
292 * NOTE: The max timeout tick should be no more than (RT_TICK_MAX/2 - 1)
293 *
294 * @return RT_EOK Success.
295 * -RT_ERROR The ticks parameter is invalid.
296 */
rt_workqueue_submit_work(struct rt_workqueue * queue,struct rt_work * work,rt_tick_t ticks)297 rt_err_t rt_workqueue_submit_work(struct rt_workqueue *queue, struct rt_work *work, rt_tick_t ticks)
298 {
299 RT_ASSERT(queue != RT_NULL);
300 RT_ASSERT(work != RT_NULL);
301 RT_ASSERT(ticks < RT_TICK_MAX / 2);
302
303 return _workqueue_submit_work(queue, work, ticks);
304 }
305
306 /**
307 * @brief Submit a work item to the work queue without delay. This work item will be executed after the current work item.
308 *
309 * @param queue is a pointer to the workqueue object.
310 *
311 * @param work is a pointer to the work item object.
312 *
313 * @return RT_EOK Success.
314 */
rt_workqueue_urgent_work(struct rt_workqueue * queue,struct rt_work * work)315 rt_err_t rt_workqueue_urgent_work(struct rt_workqueue *queue, struct rt_work *work)
316 {
317 rt_base_t level;
318
319 RT_ASSERT(queue != RT_NULL);
320 RT_ASSERT(work != RT_NULL);
321
322 level = rt_spin_lock_irqsave(&(queue->spinlock));
323 /* NOTE: the work MUST be initialized firstly */
324 rt_list_remove(&(work->list));
325 rt_list_insert_after(&queue->work_list, &(work->list));
326
327 rt_completion_done(&(queue->wakeup_completion));
328 rt_spin_unlock_irqrestore(&(queue->spinlock), level);
329
330 return RT_EOK;
331 }
332
333 /**
334 * @brief Cancel a work item in the work queue.
335 *
336 * @param queue is a pointer to the workqueue object.
337 *
338 * @param work is a pointer to the work item object.
339 *
340 * @return RT_EOK Success.
341 * -RT_EBUSY This work item is executing.
342 */
rt_workqueue_cancel_work(struct rt_workqueue * queue,struct rt_work * work)343 rt_err_t rt_workqueue_cancel_work(struct rt_workqueue *queue, struct rt_work *work)
344 {
345 RT_ASSERT(work != RT_NULL);
346 RT_ASSERT(queue != RT_NULL);
347
348 return _workqueue_cancel_work(queue, work);
349 }
350
351 /**
352 * @brief Cancel a work item in the work queue. If the work item is executing, this function will block until it is done.
353 *
354 * @param queue is a pointer to the workqueue object.
355 *
356 * @param work is a pointer to the work item object.
357 *
358 * @return RT_EOK Success.
359 */
rt_workqueue_cancel_work_sync(struct rt_workqueue * queue,struct rt_work * work)360 rt_err_t rt_workqueue_cancel_work_sync(struct rt_workqueue *queue, struct rt_work *work)
361 {
362 RT_ASSERT(queue != RT_NULL);
363 RT_ASSERT(work != RT_NULL);
364
365 if (queue->work_current == work) /* it's current work in the queue */
366 {
367 /* wait for work completion */
368 rt_sem_take(&(queue->sem), RT_WAITING_FOREVER);
369 /* Note that because work items are automatically deleted after execution, they do not need to be deleted again */
370 }
371 else
372 {
373 _workqueue_cancel_work(queue, work);
374 }
375
376 return RT_EOK;
377 }
378
379 /**
380 * @brief This function will cancel all work items in work queue.
381 *
382 * @param queue is a pointer to the workqueue object.
383 *
384 * @return RT_EOK Success.
385 */
rt_workqueue_cancel_all_work(struct rt_workqueue * queue)386 rt_err_t rt_workqueue_cancel_all_work(struct rt_workqueue *queue)
387 {
388 struct rt_work *work;
389
390 RT_ASSERT(queue != RT_NULL);
391
392 /* cancel work */
393 rt_enter_critical();
394 while (rt_list_isempty(&queue->work_list) == RT_FALSE)
395 {
396 work = rt_list_first_entry(&queue->work_list, struct rt_work, list);
397 _workqueue_cancel_work(queue, work);
398 }
399 /* cancel delay work */
400 while (rt_list_isempty(&queue->delayed_list) == RT_FALSE)
401 {
402 work = rt_list_first_entry(&queue->delayed_list, struct rt_work, list);
403 _workqueue_cancel_work(queue, work);
404 }
405 rt_exit_critical();
406
407 return RT_EOK;
408 }
409
410 #ifdef RT_USING_SYSTEM_WORKQUEUE
411
412 static struct rt_workqueue *sys_workq; /* system work queue */
413
414 /**
415 * @brief Submit a work item to the system work queue with a delay.
416 *
417 * @param work is a pointer to the work item object.
418 *
419 * @param ticks is the delay OS ticks for the work item to be submitted to the work queue.
420 *
421 * NOTE: The max timeout tick should be no more than (RT_TICK_MAX/2 - 1)
422 *
423 * @return RT_EOK Success.
424 * -RT_ERROR The ticks parameter is invalid.
425 */
rt_work_submit(struct rt_work * work,rt_tick_t ticks)426 rt_err_t rt_work_submit(struct rt_work *work, rt_tick_t ticks)
427 {
428 return rt_workqueue_submit_work(sys_workq, work, ticks);
429 }
430
431 /**
432 * @brief Submit a work item to the system work queue without delay. This work item will be executed after the current work item.
433 *
434 * @param work is a pointer to the work item object.
435 *
436 * @return RT_EOK Success.
437 */
rt_work_urgent(struct rt_work * work)438 rt_err_t rt_work_urgent(struct rt_work *work)
439 {
440 return rt_workqueue_urgent_work(sys_workq, work);
441 }
442
443 /**
444 * @brief Cancel a work item in the system work queue.
445 *
446 * @param work is a pointer to the work item object.
447 *
448 * @return RT_EOK Success.
449 * -RT_EBUSY This work item is executing.
450 */
rt_work_cancel(struct rt_work * work)451 rt_err_t rt_work_cancel(struct rt_work *work)
452 {
453 return rt_workqueue_cancel_work(sys_workq, work);
454 }
455
rt_work_sys_workqueue_init(void)456 static int rt_work_sys_workqueue_init(void)
457 {
458 if (sys_workq != RT_NULL)
459 return RT_EOK;
460
461 sys_workq = rt_workqueue_create("sys workq", RT_SYSTEM_WORKQUEUE_STACKSIZE,
462 RT_SYSTEM_WORKQUEUE_PRIORITY);
463 RT_ASSERT(sys_workq != RT_NULL);
464
465 return RT_EOK;
466 }
467 INIT_PREV_EXPORT(rt_work_sys_workqueue_init);
468 #endif /* RT_USING_SYSTEM_WORKQUEUE */
469 #endif /* RT_USING_HEAP */
470