/*
 * Copyright (c) 2020 Nordic Semiconductor ASA
 *
 * SPDX-License-Identifier: Apache-2.0
 */

/**
 * @file
 *
 * Second generation work queue implementation
 */

#include <zephyr/kernel.h>
#include <zephyr/kernel_structs.h>
#include <wait_q.h>
#include <zephyr/spinlock.h>
#include <errno.h>
#include <ksched.h>
#include <zephyr/sys/printk.h>
#include <zephyr/logging/log.h>

LOG_MODULE_DECLARE(os, CONFIG_KERNEL_LOG_LEVEL);

static inline void flag_clear(uint32_t *flagp,
			      uint32_t bit)
{
	*flagp &= ~BIT(bit);
}

static inline void flag_set(uint32_t *flagp,
			    uint32_t bit)
{
	*flagp |= BIT(bit);
}

static inline bool flag_test(const uint32_t *flagp,
			     uint32_t bit)
{
	return (*flagp & BIT(bit)) != 0U;
}

static inline bool flag_test_and_clear(uint32_t *flagp,
				       int bit)
{
	bool ret = flag_test(flagp, bit);

	flag_clear(flagp, bit);

	return ret;
}

static inline void flags_set(uint32_t *flagp,
			     uint32_t flags)
{
	*flagp = flags;
}

static inline uint32_t flags_get(const uint32_t *flagp)
{
	return *flagp;
}

/* Lock to protect the internal state of all work items, work queues,
 * and pending_cancels.
 */
static struct k_spinlock lock;

/* Invoked by work thread */
static void handle_flush(struct k_work *work) { }

static inline void init_flusher(struct z_work_flusher *flusher)
{
	struct k_work *work = &flusher->work;

	k_sem_init(&flusher->sem, 0, 1);
	k_work_init(&flusher->work, handle_flush);
	flag_set(&work->flags, K_WORK_FLUSHING_BIT);
}

/* List of pending cancellations. */
static sys_slist_t pending_cancels;

/* Initialize a canceler record and add it to the list of pending
 * cancels.
 *
 * Invoked with work lock held.
 *
 * @param canceler the structure used to notify a waiting process.
 * @param work the work structure that is to be canceled
 */
static inline void init_work_cancel(struct z_work_canceller *canceler,
				    struct k_work *work)
{
	k_sem_init(&canceler->sem, 0, 1);
	canceler->work = work;
	sys_slist_append(&pending_cancels, &canceler->node);
}

/* Complete flushing of a work item.
 *
 * Invoked with work lock held.
 *
 * Invoked from a work queue thread.
 *
 * Reschedules.
 *
 * @param work the work structure that has completed flushing.
 */
static void finalize_flush_locked(struct k_work *work)
{
	struct z_work_flusher *flusher
		= CONTAINER_OF(work, struct z_work_flusher, work);

	flag_clear(&work->flags, K_WORK_FLUSHING_BIT);

	k_sem_give(&flusher->sem);
}

/* Complete cancellation of a work item.
 *
 * Invoked with work lock held.
 *
 * Invoked from a work queue thread.
 *
 * Reschedules.
 *
 * @param work the work structure that has completed cancellation
 */
static void finalize_cancel_locked(struct k_work *work)
{
	struct z_work_canceller *wc, *tmp;
	sys_snode_t *prev = NULL;

	/* Clear this first, so released high-priority threads don't
	 * see it when doing things.
	 */
	flag_clear(&work->flags, K_WORK_CANCELING_BIT);

	/* Search for and remove the matching container, and release
	 * what's waiting for the completion. The same work item can
	 * appear multiple times in the list if multiple threads
	 * attempt to cancel it.
	 */
	SYS_SLIST_FOR_EACH_CONTAINER_SAFE(&pending_cancels, wc, tmp, node) {
		if (wc->work == work) {
			sys_slist_remove(&pending_cancels, prev, &wc->node);
			k_sem_give(&wc->sem);
			break;
		}
		prev = &wc->node;
	}
}

void k_work_init(struct k_work *work,
		 k_work_handler_t handler)
{
	__ASSERT_NO_MSG(work != NULL);
	__ASSERT_NO_MSG(handler != NULL);

	*work = (struct k_work)Z_WORK_INITIALIZER(handler);

	SYS_PORT_TRACING_OBJ_INIT(k_work, work);
}
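
/* Usage sketch (illustrative only, not part of the kernel implementation;
 * the names below are made up for the example): defining a work item and
 * binding it to a handler with k_work_init().
 *
 *	static void report_handler(struct k_work *work)
 *	{
 *		printk("report work executed\n");
 *	}
 *
 *	static struct k_work report_work;
 *
 *	void report_setup(void)
 *	{
 *		k_work_init(&report_work, report_handler);
 *	}
 *
 * Statically allocated items can use K_WORK_DEFINE() instead and skip the
 * runtime initialization call.
 */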

static inline int work_busy_get_locked(const struct k_work *work)
{
	return flags_get(&work->flags) & K_WORK_MASK;
}

int k_work_busy_get(const struct k_work *work)
{
	k_spinlock_key_t key = k_spin_lock(&lock);
	int ret = work_busy_get_locked(work);

	k_spin_unlock(&lock, key);

	return ret;
}

/* Add a flusher work item to the queue.
 *
 * Invoked with work lock held.
 *
 * Caller must notify queue of pending work.
 *
 * @param queue queue on which a work item may appear.
 * @param work the work item that is either queued or running on @p
 * queue
 * @param flusher an uninitialized/unused flusher object
 */
static void queue_flusher_locked(struct k_work_q *queue,
				 struct k_work *work,
				 struct z_work_flusher *flusher)
{
	init_flusher(flusher);

	if ((flags_get(&work->flags) & K_WORK_QUEUED) != 0U) {
		sys_slist_insert(&queue->pending, &work->node,
				 &flusher->work.node);
	} else {
		sys_slist_prepend(&queue->pending, &flusher->work.node);
	}
}

/* Try to remove a work item from the given queue.
 *
 * Invoked with work lock held.
 *
 * @param queue the queue from which the work should be removed
 * @param work work that may be on the queue
 */
static inline void queue_remove_locked(struct k_work_q *queue,
				       struct k_work *work)
{
	if (flag_test_and_clear(&work->flags, K_WORK_QUEUED_BIT)) {
		(void)sys_slist_find_and_remove(&queue->pending, &work->node);
	}
}

/* Potentially notify a queue that it needs to look for pending work.
 *
 * This may make the work queue thread ready, but as the lock is held it
 * will not be a reschedule point. Callers should yield after the lock is
 * released where appropriate (generally if this returns true).
 *
 * @param queue to be notified. If this is null no notification is required.
 *
 * @return true if and only if the queue was notified and woken, i.e. a
 * reschedule is pending.
 */
static inline bool notify_queue_locked(struct k_work_q *queue)
{
	bool rv = false;

	if (queue != NULL) {
		rv = z_sched_wake(&queue->notifyq, 0, NULL);
	}

	return rv;
}

/* Submit a work item to a queue if queue state allows new work.
 *
 * Submission is rejected if no queue is provided, or if the queue is
 * draining and the work isn't being submitted from the queue's
 * thread (chained submission).
 *
 * Invoked with work lock held.
 * Conditionally notifies queue.
 *
 * @param queue the queue to which work should be submitted. This may
 * be null, in which case the submission will fail.
 *
 * @param work to be submitted
 *
 * @retval 1 if successfully queued
 * @retval -EINVAL if no queue is provided
 * @retval -ENODEV if the queue is not started
 * @retval -EBUSY if the submission was rejected (draining, plugged)
 */
static inline int queue_submit_locked(struct k_work_q *queue,
				      struct k_work *work)
{
	if (queue == NULL) {
		return -EINVAL;
	}

	int ret;
	bool chained = (_current == queue->thread_id) && !k_is_in_isr();
	bool draining = flag_test(&queue->flags, K_WORK_QUEUE_DRAIN_BIT);
	bool plugged = flag_test(&queue->flags, K_WORK_QUEUE_PLUGGED_BIT);

	/* Test for acceptability, in priority order:
	 *
	 * * -ENODEV if the queue isn't running.
	 * * -EBUSY if draining and not chained
	 * * -EBUSY if plugged and not draining
	 * * otherwise OK
	 */
	if (!flag_test(&queue->flags, K_WORK_QUEUE_STARTED_BIT)) {
		ret = -ENODEV;
	} else if (draining && !chained) {
		ret = -EBUSY;
	} else if (plugged && !draining) {
		ret = -EBUSY;
	} else {
		sys_slist_append(&queue->pending, &work->node);
		ret = 1;
		(void)notify_queue_locked(queue);
	}

	return ret;
}

/* Attempt to submit work to a queue.
 *
 * The submission can fail if:
 * * the work is cancelling,
 * * no candidate queue can be identified;
 * * the candidate queue rejects the submission.
 *
 * Invoked with work lock held.
 * Conditionally notifies queue.
 *
 * @param work the work structure to be submitted
 *
 * @param queuep pointer to a queue reference. On input this should
 * dereference to the proposed queue (which may be null); after completion it
 * will be null if the work was not submitted or if submitted will reference
 * the queue it was submitted to. That may or may not be the queue provided
 * on input.
 *
 * @retval 0 if work was already submitted to a queue
 * @retval 1 if work was not submitted and has been queued to @p queue
 * @retval 2 if work was running and has been queued to the queue that was
 * running it
 * @retval -EBUSY if canceling or submission was rejected by queue
 * @retval -EINVAL if no queue is provided
 * @retval -ENODEV if the queue is not started
 */
static int submit_to_queue_locked(struct k_work *work,
				  struct k_work_q **queuep)
{
	int ret = 0;

	if (flag_test(&work->flags, K_WORK_CANCELING_BIT)) {
		/* Disallowed */
		ret = -EBUSY;
	} else if (!flag_test(&work->flags, K_WORK_QUEUED_BIT)) {
		/* Not currently queued */
		ret = 1;

		/* If no queue specified resubmit to last queue.
		 */
		if (*queuep == NULL) {
			*queuep = work->queue;
		}

		/* If the work is currently running we have to use the
		 * queue it's running on to prevent handler
		 * re-entrancy.
		 */
		if (flag_test(&work->flags, K_WORK_RUNNING_BIT)) {
			__ASSERT_NO_MSG(work->queue != NULL);
			*queuep = work->queue;
			ret = 2;
		}

		int rc = queue_submit_locked(*queuep, work);

		if (rc < 0) {
			ret = rc;
		} else {
			flag_set(&work->flags, K_WORK_QUEUED_BIT);
			work->queue = *queuep;
		}
	} else {
		/* Already queued, do nothing. */
	}

	if (ret <= 0) {
		*queuep = NULL;
	}

	return ret;
}

/* Submit work to a queue but do not yield the current thread.
 *
 * Intended for internal use.
 *
 * See also submit_to_queue_locked().
 *
 * @param queue the queue to which the work should be submitted
 * @param work the work structure to be submitted
 *
 * @return as with submit_to_queue_locked()
 */
int z_work_submit_to_queue(struct k_work_q *queue,
			   struct k_work *work)
{
	__ASSERT_NO_MSG(work != NULL);
	__ASSERT_NO_MSG(work->handler != NULL);

	k_spinlock_key_t key = k_spin_lock(&lock);

	int ret = submit_to_queue_locked(work, &queue);

	k_spin_unlock(&lock, key);

	return ret;
}

int k_work_submit_to_queue(struct k_work_q *queue,
			   struct k_work *work)
{
	SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_work, submit_to_queue, queue, work);

	int ret = z_work_submit_to_queue(queue, work);

	/* submit_to_queue_locked() won't reschedule on its own
	 * (really it should, otherwise this process will result in
	 * spurious calls to z_swap() due to the race), so do it here
	 * if the queue state changed.
	 */
	if (ret > 0) {
		z_reschedule_unlocked();
	}

	SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_work, submit_to_queue, queue, work, ret);

	return ret;
}

int k_work_submit(struct k_work *work)
{
	SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_work, submit, work);

	int ret = k_work_submit_to_queue(&k_sys_work_q, work);

	SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_work, submit, work, ret);

	return ret;
}
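
/* Usage sketch (illustrative only, not part of the kernel implementation;
 * reuses the hypothetical report_work item from the earlier sketch):
 * submitting a work item to the system work queue, typically from an ISR
 * or another thread. Resubmission while the handler is running is safe;
 * the item is queued again on the queue that is currently running it.
 *
 *	void report_trigger(void)
 *	{
 *		int ret = k_work_submit(&report_work);
 *
 *		if (ret < 0) {
 *			printk("submit failed: %d\n", ret);
 *		}
 *	}
 *
 * k_work_submit_to_queue() behaves the same way but targets a caller
 * supplied queue instead of the system work queue.
 */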

/* Flush the work item if necessary.
 *
 * Flushing is necessary only if the work is either queued or running.
 *
 * Invoked with work lock held by key.
 * Sleeps.
 *
 * @param work the work item that is to be flushed
 * @param flusher state used to synchronize the flush
 *
 * @retval true if work is queued or running. If this happens the
 * caller must take the flusher semaphore after releasing the lock.
 *
 * @retval false otherwise. No wait required.
 */
static bool work_flush_locked(struct k_work *work,
			      struct z_work_flusher *flusher)
{
	bool need_flush = (flags_get(&work->flags)
			   & (K_WORK_QUEUED | K_WORK_RUNNING)) != 0U;

	if (need_flush) {
		struct k_work_q *queue = work->queue;

		__ASSERT_NO_MSG(queue != NULL);

		queue_flusher_locked(queue, work, flusher);
		notify_queue_locked(queue);
	}

	return need_flush;
}

bool k_work_flush(struct k_work *work,
		  struct k_work_sync *sync)
{
	__ASSERT_NO_MSG(work != NULL);
	__ASSERT_NO_MSG(!flag_test(&work->flags, K_WORK_DELAYABLE_BIT));
	__ASSERT_NO_MSG(!k_is_in_isr());
	__ASSERT_NO_MSG(sync != NULL);
#ifdef CONFIG_KERNEL_COHERENCE
	__ASSERT_NO_MSG(arch_mem_coherent(sync));
#endif /* CONFIG_KERNEL_COHERENCE */

	SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_work, flush, work);

	struct z_work_flusher *flusher = &sync->flusher;
	k_spinlock_key_t key = k_spin_lock(&lock);

	bool need_flush = work_flush_locked(work, flusher);

	k_spin_unlock(&lock, key);

	/* If necessary wait until the flusher item completes */
	if (need_flush) {
		SYS_PORT_TRACING_OBJ_FUNC_BLOCKING(k_work, flush, work, K_FOREVER);

		k_sem_take(&flusher->sem, K_FOREVER);
	}

	SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_work, flush, work, need_flush);

	return need_flush;
}
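
/* Usage sketch (illustrative only, not part of the kernel implementation):
 * waiting for a previously submitted item to finish before tearing down
 * the resources its handler touches. Must be called from a thread, not an
 * ISR, and should not be invoked from the handler being flushed (it would
 * wait on itself).
 *
 *	void report_teardown(void)
 *	{
 *		struct k_work_sync sync;
 *
 *		(void)k_work_flush(&report_work, &sync);
 *	}
 *
 * On return the handler is guaranteed not to be running.
 */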

/* Execute the non-waiting steps necessary to cancel a work item.
 *
 * Invoked with work lock held.
 *
 * @param work the work item to be canceled.
 *
 * @retval true if we need to wait for the work item to finish canceling
 * @retval false if the work item is idle
 *
 * @return k_work_busy_get() state captured under lock
 */
static int cancel_async_locked(struct k_work *work)
{
	/* If we haven't already started canceling, do it now. */
	if (!flag_test(&work->flags, K_WORK_CANCELING_BIT)) {
		/* Remove it from the queue, if it's queued. */
		queue_remove_locked(work->queue, work);
	}

	/* If it's still busy after it's been dequeued, then flag it
	 * as canceling.
	 */
	int ret = work_busy_get_locked(work);

	if (ret != 0) {
		flag_set(&work->flags, K_WORK_CANCELING_BIT);
		ret = work_busy_get_locked(work);
	}

	return ret;
}

/* Complete the cancellation bookkeeping; the caller releases the work
 * lock and waits if necessary.
 *
 * Invoked with work lock held by key.
 * Sleeps.
 *
 * @param work work that is being canceled
 * @param canceller state used to synchronize the cancellation
 *
 * @retval true if and only if the work was still active on entry. The caller
 * must wait on the canceller semaphore after releasing the lock.
 *
 * @retval false if work was idle on entry. The caller need not wait.
 */
static bool cancel_sync_locked(struct k_work *work,
			       struct z_work_canceller *canceller)
{
	bool ret = flag_test(&work->flags, K_WORK_CANCELING_BIT);

	/* If something's still running then we have to wait for
	 * completion, which is indicated when finalize_cancel_locked()
	 * gets invoked.
	 */
	if (ret) {
		init_work_cancel(canceller, work);
	}

	return ret;
}

int k_work_cancel(struct k_work *work)
{
	__ASSERT_NO_MSG(work != NULL);
	__ASSERT_NO_MSG(!flag_test(&work->flags, K_WORK_DELAYABLE_BIT));

	SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_work, cancel, work);

	k_spinlock_key_t key = k_spin_lock(&lock);
	int ret = cancel_async_locked(work);

	k_spin_unlock(&lock, key);

	SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_work, cancel, work, ret);

	return ret;
}

bool k_work_cancel_sync(struct k_work *work,
			struct k_work_sync *sync)
{
	__ASSERT_NO_MSG(work != NULL);
	__ASSERT_NO_MSG(sync != NULL);
	__ASSERT_NO_MSG(!flag_test(&work->flags, K_WORK_DELAYABLE_BIT));
	__ASSERT_NO_MSG(!k_is_in_isr());
#ifdef CONFIG_KERNEL_COHERENCE
	__ASSERT_NO_MSG(arch_mem_coherent(sync));
#endif /* CONFIG_KERNEL_COHERENCE */

	SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_work, cancel_sync, work, sync);

	struct z_work_canceller *canceller = &sync->canceller;
	k_spinlock_key_t key = k_spin_lock(&lock);
	bool pending = (work_busy_get_locked(work) != 0U);
	bool need_wait = false;

	if (pending) {
		(void)cancel_async_locked(work);
		need_wait = cancel_sync_locked(work, canceller);
	}

	k_spin_unlock(&lock, key);

	if (need_wait) {
		SYS_PORT_TRACING_OBJ_FUNC_BLOCKING(k_work, cancel_sync, work, sync);

		k_sem_take(&canceller->sem, K_FOREVER);
	}

	SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_work, cancel_sync, work, sync, pending);
	return pending;
}
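
/* Usage sketch (illustrative only, not part of the kernel implementation):
 * making sure a work item is neither queued nor running before freeing
 * the data its handler uses.
 *
 *	void report_shutdown(void)
 *	{
 *		struct k_work_sync sync;
 *
 *		if (k_work_cancel_sync(&report_work, &sync)) {
 *			printk("work was still pending and has been stopped\n");
 *		}
 *	}
 *
 * For a non-blocking variant, k_work_cancel() only removes a queued item
 * and marks a running one as canceling; it does not wait.
 */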

#if defined(CONFIG_WORKQUEUE_WORK_TIMEOUT)
static void work_timeout_handler(struct _timeout *record)
{
	struct k_work_q *queue = CONTAINER_OF(record, struct k_work_q, work_timeout_record);
	struct k_work *work;
	k_work_handler_t handler;
	const char *name;
	const char *space = " ";

	K_SPINLOCK(&lock) {
		work = queue->work;
		handler = work->handler;
	}

	name = k_thread_name_get(queue->thread_id);
	if (name == NULL) {
		name = "";
		space = "";
	}

	LOG_ERR("queue %p%s%s blocked by work %p with handler %p",
		queue, space, name, work, handler);

	k_thread_abort(queue->thread_id);
}

static void work_timeout_start_locked(struct k_work_q *queue, struct k_work *work)
{
	if (K_TIMEOUT_EQ(queue->work_timeout, K_FOREVER)) {
		return;
	}

	queue->work = work;
	z_add_timeout(&queue->work_timeout_record, work_timeout_handler, queue->work_timeout);
}

static void work_timeout_stop_locked(struct k_work_q *queue)
{
	if (K_TIMEOUT_EQ(queue->work_timeout, K_FOREVER)) {
		return;
	}

	z_abort_timeout(&queue->work_timeout_record);
}
#endif /* defined(CONFIG_WORKQUEUE_WORK_TIMEOUT) */

/* Loop executed by a work queue thread.
 *
 * @param workq_ptr pointer to the work queue structure
 */
static void work_queue_main(void *workq_ptr, void *p2, void *p3)
{
	ARG_UNUSED(p2);
	ARG_UNUSED(p3);

	struct k_work_q *queue = (struct k_work_q *)workq_ptr;

	while (true) {
		sys_snode_t *node;
		struct k_work *work = NULL;
		k_work_handler_t handler = NULL;
		k_spinlock_key_t key = k_spin_lock(&lock);
		bool yield;

		/* Check for and prepare any new work. */
		node = sys_slist_get(&queue->pending);
		if (node != NULL) {
			/* Mark that there's some work active that's
			 * not on the pending list.
			 */
			flag_set(&queue->flags, K_WORK_QUEUE_BUSY_BIT);
			work = CONTAINER_OF(node, struct k_work, node);
			flag_set(&work->flags, K_WORK_RUNNING_BIT);
			flag_clear(&work->flags, K_WORK_QUEUED_BIT);

			/* Static code analysis tools can raise a false-positive
			 * violation on the line below, claiming that 'work' is
			 * checked for null after being dereferenced.
			 *
			 * 'work' is recovered by CONTAINER_OF as the container
			 * of type struct k_work that holds the node. The only
			 * way for it to be NULL is if the node were a member of
			 * a struct k_work object placed at address NULL, which
			 * should never happen. This means that if node is not
			 * NULL, then work will not be NULL.
			 */
			handler = work->handler;
		} else if (flag_test_and_clear(&queue->flags,
					       K_WORK_QUEUE_DRAIN_BIT)) {
			/* Not busy and draining: move threads waiting for
			 * drain to ready state. The held spinlock inhibits
			 * immediate reschedule; released threads get their
			 * chance when this invokes z_sched_wait() below.
			 *
			 * We don't touch K_WORK_QUEUE_PLUGGED, so getting
			 * here doesn't mean that the queue will allow new
			 * submissions.
			 */
			(void)z_sched_wake_all(&queue->drainq, 1, NULL);
		} else if (flag_test(&queue->flags, K_WORK_QUEUE_STOP_BIT)) {
			/* User has requested that the queue stop. Clear the
			 * status flags and exit.
			 */
			flags_set(&queue->flags, 0);
			k_spin_unlock(&lock, key);
			return;
		} else {
			/* No work is available and no queue state requires
			 * special handling.
			 */
			;
		}

		if (work == NULL) {
			/* Nothing's had a chance to add work since we took
			 * the lock, and we didn't find work nor got asked to
			 * stop. Just go to sleep: when something happens the
			 * work thread will be woken and we can check again.
			 */

			(void)z_sched_wait(&lock, key, &queue->notifyq,
					   K_FOREVER, NULL);
			continue;
		}

#if defined(CONFIG_WORKQUEUE_WORK_TIMEOUT)
		work_timeout_start_locked(queue, work);
#endif /* defined(CONFIG_WORKQUEUE_WORK_TIMEOUT) */

		k_spin_unlock(&lock, key);

		__ASSERT_NO_MSG(handler != NULL);
		handler(work);

		/* Mark the work item as no longer running and deal
		 * with any cancellation and flushing issued while it
		 * was running. Clear the BUSY flag and optionally
		 * yield to prevent starving other threads.
		 */
		key = k_spin_lock(&lock);

#if defined(CONFIG_WORKQUEUE_WORK_TIMEOUT)
		work_timeout_stop_locked(queue);
#endif /* defined(CONFIG_WORKQUEUE_WORK_TIMEOUT) */

		flag_clear(&work->flags, K_WORK_RUNNING_BIT);
		if (flag_test(&work->flags, K_WORK_FLUSHING_BIT)) {
			finalize_flush_locked(work);
		}
		if (flag_test(&work->flags, K_WORK_CANCELING_BIT)) {
			finalize_cancel_locked(work);
		}

		flag_clear(&queue->flags, K_WORK_QUEUE_BUSY_BIT);
		yield = !flag_test(&queue->flags, K_WORK_QUEUE_NO_YIELD_BIT);
		k_spin_unlock(&lock, key);

		/* Optionally yield to prevent the work queue from
		 * starving other threads.
		 */
		if (yield) {
			k_yield();
		}
	}
}

void k_work_queue_init(struct k_work_q *queue)
{
	__ASSERT_NO_MSG(queue != NULL);

	*queue = (struct k_work_q) {
		.flags = 0,
	};

	SYS_PORT_TRACING_OBJ_INIT(k_work_queue, queue);
}

void k_work_queue_run(struct k_work_q *queue, const struct k_work_queue_config *cfg)
{
	__ASSERT_NO_MSG(!flag_test(&queue->flags, K_WORK_QUEUE_STARTED_BIT));

	uint32_t flags = K_WORK_QUEUE_STARTED;

	if ((cfg != NULL) && cfg->no_yield) {
		flags |= K_WORK_QUEUE_NO_YIELD;
	}

	if ((cfg != NULL) && (cfg->name != NULL)) {
		k_thread_name_set(_current, cfg->name);
	}

#if defined(CONFIG_WORKQUEUE_WORK_TIMEOUT)
	if ((cfg != NULL) && (cfg->work_timeout_ms)) {
		queue->work_timeout = K_MSEC(cfg->work_timeout_ms);
	} else {
		queue->work_timeout = K_FOREVER;
	}
#endif /* defined(CONFIG_WORKQUEUE_WORK_TIMEOUT) */

	sys_slist_init(&queue->pending);
	z_waitq_init(&queue->notifyq);
	z_waitq_init(&queue->drainq);
	queue->thread_id = _current;
	flags_set(&queue->flags, flags);
	work_queue_main(queue, NULL, NULL);
}
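
/* Usage sketch (illustrative only, not part of the kernel implementation):
 * turning the calling thread into a work queue with k_work_queue_run().
 * The call does not return until the queue is stopped.
 *
 *	static struct k_work_q my_inline_q;
 *
 *	void queue_thread_entry(void *p1, void *p2, void *p3)
 *	{
 *		struct k_work_queue_config cfg = {
 *			.name = "inline_q",
 *		};
 *
 *		k_work_queue_init(&my_inline_q);
 *		k_work_queue_run(&my_inline_q, &cfg);
 *	}
 */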

void k_work_queue_start(struct k_work_q *queue,
			k_thread_stack_t *stack,
			size_t stack_size,
			int prio,
			const struct k_work_queue_config *cfg)
{
	__ASSERT_NO_MSG(queue);
	__ASSERT_NO_MSG(stack);
	__ASSERT_NO_MSG(!flag_test(&queue->flags, K_WORK_QUEUE_STARTED_BIT));

	uint32_t flags = K_WORK_QUEUE_STARTED;

	SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_work_queue, start, queue);

	sys_slist_init(&queue->pending);
	z_waitq_init(&queue->notifyq);
	z_waitq_init(&queue->drainq);

	if ((cfg != NULL) && cfg->no_yield) {
		flags |= K_WORK_QUEUE_NO_YIELD;
	}

	/* It hasn't actually been started yet, but all the state is in place
	 * so we can submit things and once the thread gets control it's ready
	 * to roll.
	 */
	flags_set(&queue->flags, flags);

	(void)k_thread_create(&queue->thread, stack, stack_size,
			      work_queue_main, queue, NULL, NULL,
			      prio, 0, K_FOREVER);

	if ((cfg != NULL) && (cfg->name != NULL)) {
		k_thread_name_set(&queue->thread, cfg->name);
	}

	if ((cfg != NULL) && (cfg->essential)) {
		queue->thread.base.user_options |= K_ESSENTIAL;
	}

#if defined(CONFIG_WORKQUEUE_WORK_TIMEOUT)
	if ((cfg != NULL) && (cfg->work_timeout_ms)) {
		queue->work_timeout = K_MSEC(cfg->work_timeout_ms);
	} else {
		queue->work_timeout = K_FOREVER;
	}
#endif /* defined(CONFIG_WORKQUEUE_WORK_TIMEOUT) */

	k_thread_start(&queue->thread);
	queue->thread_id = &queue->thread;

	SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_work_queue, start, queue);
}
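
/* Usage sketch (illustrative only, not part of the kernel implementation;
 * names and sizes are made up): creating a dedicated work queue with its
 * own thread and stack.
 *
 *	#define MY_Q_STACK_SIZE 1024
 *	#define MY_Q_PRIORITY   5
 *
 *	K_THREAD_STACK_DEFINE(my_q_stack, MY_Q_STACK_SIZE);
 *	static struct k_work_q my_q;
 *
 *	void my_q_setup(void)
 *	{
 *		struct k_work_queue_config cfg = {
 *			.name = "my_q",
 *			.no_yield = false,
 *		};
 *
 *		k_work_queue_init(&my_q);
 *		k_work_queue_start(&my_q, my_q_stack,
 *				   K_THREAD_STACK_SIZEOF(my_q_stack),
 *				   MY_Q_PRIORITY, &cfg);
 *	}
 */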

int k_work_queue_drain(struct k_work_q *queue,
		       bool plug)
{
	__ASSERT_NO_MSG(queue);
	__ASSERT_NO_MSG(!k_is_in_isr());

	SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_work_queue, drain, queue);

	int ret = 0;
	k_spinlock_key_t key = k_spin_lock(&lock);

	if (((flags_get(&queue->flags)
	      & (K_WORK_QUEUE_BUSY | K_WORK_QUEUE_DRAIN)) != 0U)
	    || plug
	    || !sys_slist_is_empty(&queue->pending)) {
		flag_set(&queue->flags, K_WORK_QUEUE_DRAIN_BIT);
		if (plug) {
			flag_set(&queue->flags, K_WORK_QUEUE_PLUGGED_BIT);
		}

		notify_queue_locked(queue);
		ret = z_sched_wait(&lock, key, &queue->drainq,
				   K_FOREVER, NULL);
	} else {
		k_spin_unlock(&lock, key);
	}

	SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_work_queue, drain, queue, ret);

	return ret;
}
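
/* Usage sketch (illustrative only, not part of the kernel implementation;
 * continues the hypothetical my_q example above): waiting for everything
 * already submitted to the queue to finish while blocking new submissions,
 * then opening the queue again.
 *
 *	void my_q_quiesce(void)
 *	{
 *		(void)k_work_queue_drain(&my_q, true);
 *
 *		// Queue is now empty and plugged; reopen it.
 *		(void)k_work_queue_unplug(&my_q);
 *	}
 */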

int k_work_queue_unplug(struct k_work_q *queue)
{
	__ASSERT_NO_MSG(queue);

	SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_work_queue, unplug, queue);

	int ret = -EALREADY;
	k_spinlock_key_t key = k_spin_lock(&lock);

	if (flag_test_and_clear(&queue->flags, K_WORK_QUEUE_PLUGGED_BIT)) {
		ret = 0;
	}

	k_spin_unlock(&lock, key);

	SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_work_queue, unplug, queue, ret);

	return ret;
}

int k_work_queue_stop(struct k_work_q *queue, k_timeout_t timeout)
{
	__ASSERT_NO_MSG(queue);

	SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_work_queue, stop, queue, timeout);
	k_spinlock_key_t key = k_spin_lock(&lock);

	if (!flag_test(&queue->flags, K_WORK_QUEUE_STARTED_BIT)) {
		k_spin_unlock(&lock, key);
		SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_work_queue, stop, queue, timeout, -EALREADY);
		return -EALREADY;
	}

	if (!flag_test(&queue->flags, K_WORK_QUEUE_PLUGGED_BIT)) {
		k_spin_unlock(&lock, key);
		SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_work_queue, stop, queue, timeout, -EBUSY);
		return -EBUSY;
	}

	flag_set(&queue->flags, K_WORK_QUEUE_STOP_BIT);
	notify_queue_locked(queue);
	k_spin_unlock(&lock, key);
	SYS_PORT_TRACING_OBJ_FUNC_BLOCKING(k_work_queue, stop, queue, timeout);
	if (k_thread_join(queue->thread_id, timeout)) {
		key = k_spin_lock(&lock);
		flag_clear(&queue->flags, K_WORK_QUEUE_STOP_BIT);
		k_spin_unlock(&lock, key);
		SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_work_queue, stop, queue, timeout, -ETIMEDOUT);
		return -ETIMEDOUT;
	}

	SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_work_queue, stop, queue, timeout, 0);
	return 0;
}
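
/* Usage sketch (illustrative only, not part of the kernel implementation):
 * shutting the hypothetical my_q down for good. The queue must be drained
 * with plug=true first; otherwise k_work_queue_stop() returns -EBUSY.
 *
 *	int my_q_shutdown(void)
 *	{
 *		(void)k_work_queue_drain(&my_q, true);
 *
 *		return k_work_queue_stop(&my_q, K_SECONDS(1));
 *	}
 */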

#ifdef CONFIG_SYS_CLOCK_EXISTS

/* Timeout handler for delayable work.
 *
 * Invoked by timeout infrastructure.
 * Takes and releases work lock.
 * Conditionally reschedules.
 */
static void work_timeout(struct _timeout *to)
{
	struct k_work_delayable *dw
		= CONTAINER_OF(to, struct k_work_delayable, timeout);
	struct k_work *wp = &dw->work;
	k_spinlock_key_t key = k_spin_lock(&lock);
	struct k_work_q *queue = NULL;

	/* If the work is still marked delayed (should be) then clear that
	 * state and submit it to the queue. If successful the queue will be
	 * notified of new work at the next reschedule point.
	 *
	 * If not successful there is no notification that the work has been
	 * abandoned. Sorry.
	 */
	if (flag_test_and_clear(&wp->flags, K_WORK_DELAYED_BIT)) {
		queue = dw->queue;
		(void)submit_to_queue_locked(wp, &queue);
	}

	k_spin_unlock(&lock, key);
}

void k_work_init_delayable(struct k_work_delayable *dwork,
			   k_work_handler_t handler)
{
	__ASSERT_NO_MSG(dwork != NULL);
	__ASSERT_NO_MSG(handler != NULL);

	*dwork = (struct k_work_delayable){
		.work = {
			.handler = handler,
			.flags = K_WORK_DELAYABLE,
		},
	};
	z_init_timeout(&dwork->timeout);

	SYS_PORT_TRACING_OBJ_INIT(k_work_delayable, dwork);
}
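
/* Usage sketch (illustrative only, not part of the kernel implementation;
 * the names are made up): a delayable work item. Inside the handler the
 * containing delayable structure can be recovered with
 * k_work_delayable_from_work().
 *
 *	static struct k_work_delayable poll_dwork;
 *
 *	static void poll_handler(struct k_work *work)
 *	{
 *		struct k_work_delayable *dwork = k_work_delayable_from_work(work);
 *
 *		// ... do the periodic job, then rearm ...
 *		(void)k_work_schedule(dwork, K_SECONDS(1));
 *	}
 *
 *	void poll_setup(void)
 *	{
 *		k_work_init_delayable(&poll_dwork, poll_handler);
 *		(void)k_work_schedule(&poll_dwork, K_NO_WAIT);
 *	}
 *
 * K_WORK_DELAYABLE_DEFINE() provides an equivalent static initializer.
 */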

static inline int work_delayable_busy_get_locked(const struct k_work_delayable *dwork)
{
	return flags_get(&dwork->work.flags) & K_WORK_MASK;
}

int k_work_delayable_busy_get(const struct k_work_delayable *dwork)
{
	__ASSERT_NO_MSG(dwork != NULL);

	k_spinlock_key_t key = k_spin_lock(&lock);
	int ret = work_delayable_busy_get_locked(dwork);

	k_spin_unlock(&lock, key);
	return ret;
}

/* Attempt to schedule a work item for future (maybe immediate)
 * submission.
 *
 * Invoked with work lock held.
 *
 * See also submit_to_queue_locked(), which implements this for a no-wait
 * delay.
 *
 * @param queuep pointer to a pointer to a queue. On input this
 * should dereference to the proposed queue (which may be null); after
 * completion it will be null if the work was not submitted or if
 * submitted will reference the queue it was submitted to. That may
 * or may not be the queue provided on input.
 *
 * @param dwork the delayed work structure
 *
 * @param delay the delay to use before scheduling.
 *
 * @retval from submit_to_queue_locked() if delay is K_NO_WAIT; otherwise
 * @retval 1 to indicate successfully scheduled.
 */
static int schedule_for_queue_locked(struct k_work_q **queuep,
				     struct k_work_delayable *dwork,
				     k_timeout_t delay)
{
	int ret = 1;
	struct k_work *work = &dwork->work;

	if (K_TIMEOUT_EQ(delay, K_NO_WAIT)) {
		return submit_to_queue_locked(work, queuep);
	}

	flag_set(&work->flags, K_WORK_DELAYED_BIT);
	dwork->queue = *queuep;

	/* Add timeout */
	z_add_timeout(&dwork->timeout, work_timeout, delay);

	return ret;
}

/* Unschedule delayable work.
 *
 * If the work is delayed, cancel the timeout and clear the delayed
 * flag.
 *
 * Invoked with work lock held.
 *
 * @param dwork pointer to delayable work structure.
 *
 * @return true if and only if work had been delayed so the timeout
 * was cancelled.
 */
static inline bool unschedule_locked(struct k_work_delayable *dwork)
{
	bool ret = false;
	struct k_work *work = &dwork->work;

	/* If scheduled, try to cancel. If it fails, that means the
	 * callback has been dequeued and will inevitably run (or has
	 * already run), so treat that as "undelayed" and return
	 * false.
	 */
	if (flag_test_and_clear(&work->flags, K_WORK_DELAYED_BIT)) {
		ret = z_abort_timeout(&dwork->timeout) == 0;
	}

	return ret;
}

/* Full cancellation of a delayable work item.
 *
 * Unschedules the delayed part then delegates to standard work
 * cancellation.
 *
 * Invoked with work lock held.
 *
 * @param dwork delayable work item
 *
 * @return k_work_busy_get() flags
 */
static int cancel_delayable_async_locked(struct k_work_delayable *dwork)
{
	(void)unschedule_locked(dwork);

	return cancel_async_locked(&dwork->work);
}

int k_work_schedule_for_queue(struct k_work_q *queue, struct k_work_delayable *dwork,
			      k_timeout_t delay)
{
	__ASSERT_NO_MSG(queue != NULL);
	__ASSERT_NO_MSG(dwork != NULL);

	SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_work, schedule_for_queue, queue, dwork, delay);

	struct k_work *work = &dwork->work;
	int ret = 0;
	k_spinlock_key_t key = k_spin_lock(&lock);

	/* Schedule the work item if it's idle or running. */
	if ((work_busy_get_locked(work) & ~K_WORK_RUNNING) == 0U) {
		ret = schedule_for_queue_locked(&queue, dwork, delay);
	}

	k_spin_unlock(&lock, key);

	SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_work, schedule_for_queue, queue, dwork, delay, ret);

	return ret;
}

int k_work_schedule(struct k_work_delayable *dwork, k_timeout_t delay)
{
	SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_work, schedule, dwork, delay);

	int ret = k_work_schedule_for_queue(&k_sys_work_q, dwork, delay);

	SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_work, schedule, dwork, delay, ret);

	return ret;
}

int k_work_reschedule_for_queue(struct k_work_q *queue, struct k_work_delayable *dwork,
				k_timeout_t delay)
{
	__ASSERT_NO_MSG(queue != NULL);
	__ASSERT_NO_MSG(dwork != NULL);

	SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_work, reschedule_for_queue, queue, dwork, delay);

	int ret;
	k_spinlock_key_t key = k_spin_lock(&lock);

	/* Remove any active scheduling. */
	(void)unschedule_locked(dwork);

	/* Schedule the work item with the new parameters. */
	ret = schedule_for_queue_locked(&queue, dwork, delay);

	k_spin_unlock(&lock, key);

	SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_work, reschedule_for_queue, queue, dwork, delay, ret);

	return ret;
}

int k_work_reschedule(struct k_work_delayable *dwork, k_timeout_t delay)
{
	SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_work, reschedule, dwork, delay);

	int ret = k_work_reschedule_for_queue(&k_sys_work_q, dwork, delay);

	SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_work, reschedule, dwork, delay, ret);

	return ret;
}
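
/* Usage sketch (illustrative only, not part of the kernel implementation):
 * k_work_schedule() only arms an idle (or currently running) item, so
 * calling it again while the item is still scheduled has no effect;
 * k_work_reschedule() always restarts the countdown. A debounce pattern
 * therefore uses the latter, reusing the hypothetical poll_dwork item:
 *
 *	void button_event(void)
 *	{
 *		// Handler runs 50 ms after the most recent event.
 *		(void)k_work_reschedule(&poll_dwork, K_MSEC(50));
 *	}
 */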

int k_work_cancel_delayable(struct k_work_delayable *dwork)
{
	__ASSERT_NO_MSG(dwork != NULL);

	SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_work, cancel_delayable, dwork);

	k_spinlock_key_t key = k_spin_lock(&lock);
	int ret = cancel_delayable_async_locked(dwork);

	k_spin_unlock(&lock, key);

	SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_work, cancel_delayable, dwork, ret);

	return ret;
}

bool k_work_cancel_delayable_sync(struct k_work_delayable *dwork,
				  struct k_work_sync *sync)
{
	__ASSERT_NO_MSG(dwork != NULL);
	__ASSERT_NO_MSG(sync != NULL);
	__ASSERT_NO_MSG(!k_is_in_isr());
#ifdef CONFIG_KERNEL_COHERENCE
	__ASSERT_NO_MSG(arch_mem_coherent(sync));
#endif /* CONFIG_KERNEL_COHERENCE */

	SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_work, cancel_delayable_sync, dwork, sync);

	struct z_work_canceller *canceller = &sync->canceller;
	k_spinlock_key_t key = k_spin_lock(&lock);
	bool pending = (work_delayable_busy_get_locked(dwork) != 0U);
	bool need_wait = false;

	if (pending) {
		(void)cancel_delayable_async_locked(dwork);
		need_wait = cancel_sync_locked(&dwork->work, canceller);
	}

	k_spin_unlock(&lock, key);

	if (need_wait) {
		k_sem_take(&canceller->sem, K_FOREVER);
	}

	SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_work, cancel_delayable_sync, dwork, sync, pending);
	return pending;
}
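
/* Usage sketch (illustrative only, not part of the kernel implementation):
 * stopping the hypothetical periodic job above and guaranteeing the
 * handler is no longer running before releasing its resources.
 *
 *	void poll_stop(void)
 *	{
 *		struct k_work_sync sync;
 *
 *		(void)k_work_cancel_delayable_sync(&poll_dwork, &sync);
 *	}
 */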

bool k_work_flush_delayable(struct k_work_delayable *dwork,
			    struct k_work_sync *sync)
{
	__ASSERT_NO_MSG(dwork != NULL);
	__ASSERT_NO_MSG(sync != NULL);
	__ASSERT_NO_MSG(!k_is_in_isr());
#ifdef CONFIG_KERNEL_COHERENCE
	__ASSERT_NO_MSG(arch_mem_coherent(sync));
#endif /* CONFIG_KERNEL_COHERENCE */

	SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_work, flush_delayable, dwork, sync);

	struct k_work *work = &dwork->work;
	struct z_work_flusher *flusher = &sync->flusher;
	k_spinlock_key_t key = k_spin_lock(&lock);

	/* If it's idle release the lock and return immediately. */
	if (work_busy_get_locked(work) == 0U) {
		k_spin_unlock(&lock, key);

		SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_work, flush_delayable, dwork, sync, false);

		return false;
	}

	/* If unscheduling did something then submit it. Ignore a
	 * failed submission (e.g. when cancelling).
	 */
	if (unschedule_locked(dwork)) {
		struct k_work_q *queue = dwork->queue;

		(void)submit_to_queue_locked(work, &queue);
	}

	/* Wait for it to finish */
	bool need_flush = work_flush_locked(work, flusher);

	k_spin_unlock(&lock, key);

	/* If necessary wait until the flusher item completes */
	if (need_flush) {
		k_sem_take(&flusher->sem, K_FOREVER);
	}

	SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_work, flush_delayable, dwork, sync, need_flush);

	return need_flush;
}
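
/* Usage sketch (illustrative only, not part of the kernel implementation):
 * forcing a pending delayed item to run now and waiting for it to finish,
 * rather than cancelling it.
 *
 *	void poll_force_now(void)
 *	{
 *		struct k_work_sync sync;
 *
 *		(void)k_work_flush_delayable(&poll_dwork, &sync);
 *	}
 */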

#endif /* CONFIG_SYS_CLOCK_EXISTS */