// Copyright 2016 The Fuchsia Authors
// Copyright (c) 2008-2015 Travis Geiselbrecht
//
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file or at
// https://opensource.org/licenses/MIT

#include <kernel/wait.h>

#include <err.h>
#include <kernel/sched.h>
#include <kernel/thread.h>
#include <kernel/timer.h>
#include <lib/ktrace.h>
#include <platform.h>
#include <trace.h>

#define LOCAL_TRACE 0

// add expensive code to do a full validation of the wait queue at various entry points
// to this module.
#define WAIT_QUEUE_VALIDATION (0 || (LK_DEBUGLEVEL > 2))

// Wait queues are building blocks that other locking primitives use to
// handle blocking threads.
//
// Implemented as a simple structure that contains a count of the number of threads
// blocked and a list of thread_ts acting as individual queue heads, one per priority.

// +----------------+
// |                |
// |  wait_queue_t  |
// |                |
// +-------+--------+
//         |
//         |
//   +-----v-------+    +-------------+   +-------------+
//   |             +---->             +--->             |
//   |   thread_t  |    |   thread_t  |   |   thread_t  |
//   |   pri 31    |    |   pri 17    |   |   pri 8     |
//   |             <----+             <---+             |
//   +---+----^----+    +-------------+   +----+---^----+
//       |    |                                |   |
//   +---v----+----+                      +----v---+----+
//   |             |                      |             |
//   |   thread_t  |                      |   thread_t  |
//   |   pri 31    |                      |   pri 8     |
//   |             |                      |             |
//   +---+----^----+                      +-------------+
//       |    |
//   +---v----+----+
//   |             |
//   |   thread_t  |
//   |   pri 31    |
//   |             |
//   +-------------+
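//
// For reference, the fields this file relies on look roughly like this
// (see kernel/wait.h and kernel/thread.h for the real definitions; this
// is a simplified sketch, not the actual declarations):
//
//   typedef struct wait_queue {
//       int magic;               // WAIT_QUEUE_MAGIC when live
//       int count;               // number of threads currently blocked
//       struct list_node heads;  // sorted list of per-priority queue heads
//   } wait_queue_t;
//
//   struct thread {
//       ...
//       int effec_priority;                      // effective priority
//       struct list_node queue_node;             // peers of equal priority
//       struct list_node wait_queue_heads_node;  // links heads across priorities
//       ...
//   };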

void wait_queue_init(wait_queue_t* wait) {
    *wait = (wait_queue_t)WAIT_QUEUE_INITIAL_VALUE(*wait);
}

void wait_queue_validate_queue(wait_queue_t* wait) {
    DEBUG_ASSERT(wait->magic == WAIT_QUEUE_MAGIC);
    DEBUG_ASSERT(arch_ints_disabled());
    DEBUG_ASSERT(spin_lock_held(&thread_lock));

    // validate that the queue is sorted properly
    thread_t* last = NULL;
    thread_t* temp;
    list_for_every_entry (&wait->heads, temp, thread_t, wait_queue_heads_node) {
        DEBUG_ASSERT(temp->magic == THREAD_MAGIC);

        // validate that the queue is sorted high to low priority
        if (last) {
            DEBUG_ASSERT_MSG(last->effec_priority > temp->effec_priority,
                             "%p:%d  %p:%d",
                             last, last->effec_priority,
                             temp, temp->effec_priority);
        }

        // walk any threads linked to this head, validating that they're the same priority
        thread_t* temp2;
        list_for_every_entry (&temp->queue_node, temp2, thread_t, queue_node) {
            DEBUG_ASSERT(temp2->magic == THREAD_MAGIC);
            DEBUG_ASSERT_MSG(temp->effec_priority == temp2->effec_priority,
                             "%p:%d  %p:%d",
                             temp, temp->effec_priority,
                             temp2, temp2->effec_priority);
        }

        last = temp;
    }
}

// add a thread to the tail of a wait queue, sorted by priority
static void wait_queue_insert(wait_queue_t* wait, thread_t* t) {
    if (likely(list_is_empty(&wait->heads))) {
        // we're the first thread
        list_initialize(&t->queue_node);
        list_add_head(&wait->heads, &t->wait_queue_heads_node);
    } else {
        int pri = t->effec_priority;

        // walk through the sorted list of wait queue heads
        thread_t* temp;
        list_for_every_entry (&wait->heads, temp, thread_t, wait_queue_heads_node) {
            if (pri > temp->effec_priority) {
                // insert ourself here as a new queue head
                list_initialize(&t->queue_node);
                list_add_before(&temp->wait_queue_heads_node, &t->wait_queue_heads_node);
                return;
            } else if (temp->effec_priority == pri) {
                // same priority, add ourself to the tail of this queue
                list_add_tail(&temp->queue_node, &t->queue_node);
                list_clear_node(&t->wait_queue_heads_node);
                return;
            }
        }

        // we walked off the end, add ourself as a new queue head at the end
        list_initialize(&t->queue_node);
        list_add_tail(&wait->heads, &t->wait_queue_heads_node);
    }
}
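
// Worked example using the diagram above: with heads [31, 17, 8],
// a new pri-20 thread becomes its own head between 31 and 17
// (list_add_before); a second pri-17 thread is appended to the tail of
// the existing pri-17 head's queue_node list; a pri-3 thread walks off
// the end of the loop and is added as a new tail head.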

// remove a thread from whatever wait queue it's in.
// the thread must be the head of a queue.
static void remove_queue_head(thread_t* t) {
    // are there any nodes in the queue for this priority?
    if (list_is_empty(&t->queue_node)) {
        // no, remove ourself from the queue list
        list_delete(&t->wait_queue_heads_node);
        list_clear_node(&t->queue_node);
    } else {
        // there are other threads in this list, make the next thread in the queue the head
        thread_t* newhead = list_peek_head_type(&t->queue_node, thread_t, queue_node);
        list_delete(&t->queue_node);

        // patch the new head into the queue head list
        list_replace_node(&t->wait_queue_heads_node, &newhead->wait_queue_heads_node);
    }
}
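
// Example, again using the diagram: removing the pri-31 head promotes
// the next pri-31 thread into its place in the heads list via
// list_replace_node(); removing the pri-17 head, which has no peers,
// simply deletes it from the heads list.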

// remove the head of the highest priority queue
static thread_t* wait_queue_pop_head(wait_queue_t* wait) {
    thread_t* t = NULL;

    t = list_peek_head_type(&wait->heads, thread_t, wait_queue_heads_node);
    if (!t) {
        return NULL;
    }

    remove_queue_head(t);

    return t;
}

// remove the thread from whatever wait queue it's in
static void wait_queue_remove_thread(thread_t* t) {
    if (!list_in_list(&t->wait_queue_heads_node)) {
        // we're just in a queue, not a head
        list_delete(&t->queue_node);
    } else {
        // we're the head of a queue
        remove_queue_head(t);
    }
}

// return the numeric priority of the highest priority thread queued
int wait_queue_blocked_priority(wait_queue_t* wait) {
    thread_t* t = list_peek_head_type(&wait->heads, thread_t, wait_queue_heads_node);
    if (!t) {
        return -1;
    }

    return t->effec_priority;
}

// returns a reference to the highest priority thread queued
thread_t* wait_queue_peek(wait_queue_t* wait) {
    return list_peek_head_type(&wait->heads, thread_t, wait_queue_heads_node);
}

static void wait_queue_timeout_handler(timer_t* timer, zx_time_t now,
                                       void* arg) {
    thread_t* thread = (thread_t*)arg;

    DEBUG_ASSERT(thread->magic == THREAD_MAGIC);

    // spin trylocking on the thread lock since the routine that set up the callback,
    // wait_queue_block, may be trying to simultaneously cancel this timer while holding the
    // thread_lock.
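    //
    // Without the trylock, this interleaving could deadlock:
    //   CPU 0: wait_queue_block_etc() holds thread_lock and waits in
    //          timer_cancel() for this handler to complete.
    //   CPU 1: this handler spins forever on thread_lock, held by CPU 0.
    // timer_trylock_or_cancel() breaks the cycle by returning true if the
    // timer was canceled while we were waiting for the lock, in which case
    // |thread| may no longer be blocked and must not be touched here.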
    if (timer_trylock_or_cancel(timer, &thread_lock)) {
        return;
    }

    wait_queue_unblock_thread(thread, ZX_ERR_TIMED_OUT);

    spin_unlock(&thread_lock);
}

/**
 * @brief  Block until a wait queue is notified, ignoring existing signals
 *         in |signal_mask|.
 *
 * This function puts the current thread at the end of a wait
 * queue and then blocks until some other thread wakes the queue
 * up again.
 *
 * @param  wait        The wait queue to enter
 * @param  deadline    The time at which to abort the wait
 * @param  slack       The amount of time it is acceptable to deviate from deadline
 * @param  signal_mask Mask of existing signals to ignore
 *
 * If the deadline is zero, this function returns immediately with
 * ZX_ERR_TIMED_OUT.  If the deadline is ZX_TIME_INFINITE, this function
 * waits indefinitely.  Otherwise, this function returns with
 * ZX_ERR_TIMED_OUT when the slack-adjusted deadline elapses.
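 *
 * For example, a caller that wants a pending suspend signal to defer
 * rather than abort the wait could pass it in |signal_mask| (a sketch,
 * assuming the surrounding setup; not taken from a real call site):
 *
 *   status = wait_queue_block_etc(wait, deadline, kNoSlack,
 *                                 THREAD_SIGNAL_SUSPEND);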
 *
 * @return ZX_ERR_TIMED_OUT on timeout, else returns the return
 * value specified when the queue was woken by wait_queue_wake_one().
 */
zx_status_t wait_queue_block_etc(wait_queue_t* wait,
                                 zx_time_t deadline,
                                 TimerSlack slack,
                                 uint signal_mask) TA_REQ(thread_lock) {
    timer_t timer;

    thread_t* current_thread = get_current_thread();

    DEBUG_ASSERT(wait->magic == WAIT_QUEUE_MAGIC);
    DEBUG_ASSERT(current_thread->state == THREAD_RUNNING);
    DEBUG_ASSERT(arch_ints_disabled());
    DEBUG_ASSERT(spin_lock_held(&thread_lock));

    if (WAIT_QUEUE_VALIDATION) {
        wait_queue_validate_queue(wait);
    }

    if (deadline != ZX_TIME_INFINITE && deadline <= current_time()) {
        return ZX_ERR_TIMED_OUT;
    }

    if (current_thread->interruptable &&
        (unlikely(current_thread->signals & ~signal_mask))) {
        if (current_thread->signals & THREAD_SIGNAL_KILL) {
            return ZX_ERR_INTERNAL_INTR_KILLED;
        } else if (current_thread->signals & THREAD_SIGNAL_SUSPEND) {
            return ZX_ERR_INTERNAL_INTR_RETRY;
        }
    }

    wait_queue_insert(wait, current_thread);
    wait->count++;
    current_thread->state = THREAD_BLOCKED;
    current_thread->blocking_wait_queue = wait;
    current_thread->blocked_status = ZX_OK;

    // if the deadline is finite, set a timer callback to yank us out of the queue
    if (deadline != ZX_TIME_INFINITE) {
        timer_init(&timer);
        timer_set(&timer, deadline, slack, wait_queue_timeout_handler, (void*)current_thread);
    }

    ktrace_ptr(TAG_KWAIT_BLOCK, wait, 0, 0);

    sched_block();

    ktrace_ptr(TAG_KWAIT_UNBLOCK, wait, current_thread->blocked_status, 0);

    // we don't really know if the timer fired or not, so it's safest to just try to cancel it
    if (deadline != ZX_TIME_INFINITE) {
        timer_cancel(&timer);
    }

    return current_thread->blocked_status;
}

/**
 * @brief  Block until a wait queue is notified.
 *
 * This function puts the current thread at the end of a wait
 * queue and then blocks until some other thread wakes the queue
 * up again.
 *
 * @param  wait     The wait queue to enter
 * @param  deadline The time at which to abort the wait
 *
 * If the deadline is zero, this function returns immediately with
 * ZX_ERR_TIMED_OUT.  If the deadline is ZX_TIME_INFINITE, this function
 * waits indefinitely.  Otherwise, this function returns with
 * ZX_ERR_TIMED_OUT when the deadline occurs.
 *
 * @return ZX_ERR_TIMED_OUT on timeout, else returns the return
 * value specified when the queue was woken by wait_queue_wake_one().
 */
zx_status_t wait_queue_block(wait_queue_t* wait, zx_time_t deadline) {
    return wait_queue_block_etc(wait, deadline, kNoSlack, 0);
}
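
// A minimal waiter-side sketch (hypothetical names: |queue| and the
// |condition| flag are assumptions for illustration, both protected by
// the thread_lock):
//
//   spin_lock_saved_state_t state;
//   spin_lock_irqsave(&thread_lock, state);
//   while (!condition) {
//       zx_status_t status = wait_queue_block(&queue, ZX_TIME_INFINITE);
//       if (status != ZX_OK) {
//           break;  // killed, suspended, or (with a finite deadline) timed out
//       }
//   }
//   spin_unlock_irqrestore(&thread_lock, state);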

/**
 * @brief  Wake up one thread sleeping on a wait queue
 *
 * This function removes one thread (if any) from the head of the wait queue and
 * makes it executable.  The new thread will be placed at the head of the
 * run queue.
 *
 * @param wait  The wait queue to wake
 * @param reschedule  If true, the newly-woken thread will run immediately.
 * @param wait_queue_error  The return value which the new thread will receive
 * from wait_queue_block().
 *
 * @return  The number of threads woken (zero or one)
 */
int wait_queue_wake_one(wait_queue_t* wait, bool reschedule, zx_status_t wait_queue_error) {
    thread_t* t;
    int ret = 0;

    DEBUG_ASSERT(wait->magic == WAIT_QUEUE_MAGIC);
    DEBUG_ASSERT(arch_ints_disabled());
    DEBUG_ASSERT(spin_lock_held(&thread_lock));

    if (WAIT_QUEUE_VALIDATION) {
        wait_queue_validate_queue(wait);
    }

    t = wait_queue_pop_head(wait);
    if (t) {
        wait->count--;
        DEBUG_ASSERT(t->state == THREAD_BLOCKED);
        t->blocked_status = wait_queue_error;
        t->blocking_wait_queue = NULL;

        ktrace_ptr(TAG_KWAIT_WAKE, wait, 0, 0);

        // wake up the new thread, putting it in a run queue on a cpu. reschedule if the local
        // cpu run queue was modified
        bool local_resched = sched_unblock(t);
        if (reschedule && local_resched) {
            sched_reschedule();
        }

        ret = 1;
    }

    return ret;
}
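
// The matching waker-side sketch (same hypothetical names as above):
// publish the condition before waking, all under the thread_lock, so the
// waiter cannot observe the wakeup without also seeing the update:
//
//   spin_lock_irqsave(&thread_lock, state);
//   condition = true;
//   wait_queue_wake_one(&queue, /* reschedule */ true, ZX_OK);
//   spin_unlock_irqrestore(&thread_lock, state);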

thread_t* wait_queue_dequeue_one(wait_queue_t* wait, zx_status_t wait_queue_error) {
    thread_t* t;

    if (WAIT_QUEUE_VALIDATION) {
        wait_queue_validate_queue(wait);
    }

    DEBUG_ASSERT(wait->magic == WAIT_QUEUE_MAGIC);
    DEBUG_ASSERT(arch_ints_disabled());
    DEBUG_ASSERT(spin_lock_held(&thread_lock));

    t = wait_queue_pop_head(wait);
    if (t) {
        wait->count--;
        DEBUG_ASSERT(t->state == THREAD_BLOCKED);
        t->blocked_status = wait_queue_error;
        t->blocking_wait_queue = NULL;
    }

    return t;
}

/**
 * @brief  Wake all threads sleeping on a wait queue
 *
 * This function removes all threads (if any) from the wait queue and
 * makes them executable.  The new threads will be placed at the head of the
 * run queue.
 *
 * @param wait  The wait queue to wake
 * @param reschedule  If true, the newly-woken threads will run immediately.
 * @param wait_queue_error  The return value which the new threads will receive
 * from wait_queue_block().
 *
 * @return  The number of threads woken
 */
int wait_queue_wake_all(wait_queue_t* wait, bool reschedule, zx_status_t wait_queue_error) {
    thread_t* t;
    int ret = 0;

    DEBUG_ASSERT(wait->magic == WAIT_QUEUE_MAGIC);
    DEBUG_ASSERT(arch_ints_disabled());
    DEBUG_ASSERT(spin_lock_held(&thread_lock));

    if (WAIT_QUEUE_VALIDATION) {
        wait_queue_validate_queue(wait);
    }

    if (wait->count == 0) {
        return 0;
    }

    struct list_node list = LIST_INITIAL_VALUE(list);

    // pop all the threads off the wait queue into the run queue
    // TODO: optimize with custom pop all routine
    while ((t = wait_queue_pop_head(wait))) {
        wait->count--;

        DEBUG_ASSERT(t->state == THREAD_BLOCKED);
        t->blocked_status = wait_queue_error;
        t->blocking_wait_queue = NULL;

        list_add_tail(&list, &t->queue_node);

        ret++;
    }

    DEBUG_ASSERT(ret > 0);
    DEBUG_ASSERT(wait->count == 0);

    ktrace_ptr(TAG_KWAIT_WAKE, wait, 0, 0);

    // wake up the new threads, putting them in run queues on cpus. reschedule if the local
    // cpu run queue was modified
    bool local_resched = sched_unblock_list(&list);
    if (reschedule && local_resched) {
        sched_reschedule();
    }

    return ret;
}

bool wait_queue_is_empty(wait_queue_t* wait) {
    DEBUG_ASSERT(wait->magic == WAIT_QUEUE_MAGIC);
    DEBUG_ASSERT(arch_ints_disabled());
    DEBUG_ASSERT(spin_lock_held(&thread_lock));

    return wait->count == 0;
}

/**
 * @brief  Tear down a wait queue
 *
 * This panics if any threads were waiting on this queue, because that
 * would indicate a race condition for most uses of wait queues.  If a
 * thread is currently waiting, it could have been scheduled later, in
 * which case it would have called wait_queue_block() on an invalid wait
 * queue.
 */
void wait_queue_destroy(wait_queue_t* wait) {
    DEBUG_ASSERT(wait->magic == WAIT_QUEUE_MAGIC);

    if (wait->count != 0) {
        panic("wait_queue_destroy() called on non-empty wait_queue_t\n");
    }

    wait->magic = 0;
}

/**
 * @brief  Wake a specific thread in a wait queue
 *
 * This function extracts a specific thread from a wait queue, wakes it,
 * puts it at the head of the run queue, and does a reschedule if
 * necessary.
 *
 * @param t  The thread to wake
 * @param wait_queue_error  The return value which the new thread will receive from wait_queue_block().
 *
 * @return ZX_ERR_BAD_STATE if thread was not in any wait queue.
 */
zx_status_t wait_queue_unblock_thread(thread_t* t, zx_status_t wait_queue_error) {
    DEBUG_ASSERT(t->magic == THREAD_MAGIC);
    DEBUG_ASSERT(arch_ints_disabled());
    DEBUG_ASSERT(spin_lock_held(&thread_lock));

    if (t->state != THREAD_BLOCKED) {
        return ZX_ERR_BAD_STATE;
    }

    DEBUG_ASSERT(t->blocking_wait_queue != NULL);
    DEBUG_ASSERT(t->blocking_wait_queue->magic == WAIT_QUEUE_MAGIC);
    DEBUG_ASSERT(list_in_list(&t->queue_node));

    if (WAIT_QUEUE_VALIDATION) {
        wait_queue_validate_queue(t->blocking_wait_queue);
    }

    wait_queue_remove_thread(t);
    t->blocking_wait_queue->count--;
    t->blocking_wait_queue = NULL;
    t->blocked_status = wait_queue_error;

    if (sched_unblock(t)) {
        sched_reschedule();
    }

    return ZX_OK;
}

void wait_queue_priority_changed(struct thread* t, int old_prio) {
    DEBUG_ASSERT(t->magic == THREAD_MAGIC);
    DEBUG_ASSERT(arch_ints_disabled());
    DEBUG_ASSERT(spin_lock_held(&thread_lock));

    DEBUG_ASSERT(t->state == THREAD_BLOCKED);
    DEBUG_ASSERT(t->blocking_wait_queue != NULL);
    DEBUG_ASSERT(t->blocking_wait_queue->magic == WAIT_QUEUE_MAGIC);

    LTRACEF("%p %d -> %d\n", t, old_prio, t->effec_priority);

    // simple algorithm: remove the thread from the queue and add it back
    // TODO: implement optimal algorithm depending on all the different edge
    // cases of how the thread was previously queued and what priority it's
    // switching to.
    wait_queue_remove_thread(t);
    wait_queue_insert(t->blocking_wait_queue, t);

    // TODO: find a way to call into wrapper mutex object if present and
    // have the holder inherit the new priority

    if (WAIT_QUEUE_VALIDATION) {
        wait_queue_validate_queue(t->blocking_wait_queue);
    }
}