1 // Copyright 2016 The Fuchsia Authors
2 // Copyright (c) 2008-2015 Travis Geiselbrecht
3 //
4 // Use of this source code is governed by a MIT-style
5 // license that can be found in the LICENSE file or at
6 // https://opensource.org/licenses/MIT
7
8 #include <kernel/wait.h>
9
10 #include <err.h>
11 #include <kernel/sched.h>
12 #include <kernel/thread.h>
13 #include <kernel/timer.h>
14 #include <lib/ktrace.h>
15 #include <platform.h>
16 #include <trace.h>
17
18 #define LOCAL_TRACE 0
19
20 // add expensive code to do a full validation of the wait queue at various entry points
21 // to this module.
22 #define WAIT_QUEUE_VALIDATION (0 || (LK_DEBUGLEVEL > 2))
23
24 // Wait queues are building blocks that other locking primitives use to
25 // handle blocking threads.
26 //
27 // Implemented as a simple structure that contains a count of the number of threads
28 // blocked and a list of thread_ts acting as individual queue heads, one per priority.
29
30 // +----------------+
31 // | |
32 // | wait_queue_t |
33 // | |
34 // +-------+--------+
35 // |
36 // |
37 // +-----v-------+ +-------------+ +-------------+
38 // | +----> +---> |
39 // | thread_t | | thread_t | | thread_t |
40 // | pri 31 | | pri 17 | | pri 8 |
41 // | <----+ <---+ |
42 // +---+----^----+ +-------------+ +----+---^----+
43 // | | | |
44 // +---v----+----+ +----v---+----+
45 // | | | |
46 // | thread_t | | thread_t |
47 // | pri 31 | | pri 8 |
48 // | | | |
49 // +---+----^----+ +-------------+
50 // | |
51 // +---v----+----+
52 // | |
53 // | thread_t |
54 // | pri 31 |
55 // | |
56 // +-------------+
57
wait_queue_init(wait_queue_t * wait)58 void wait_queue_init(wait_queue_t* wait) {
59 *wait = (wait_queue_t)WAIT_QUEUE_INITIAL_VALUE(*wait);
60 }
61
wait_queue_validate_queue(wait_queue_t * wait)62 void wait_queue_validate_queue(wait_queue_t* wait) {
63 DEBUG_ASSERT(wait->magic == WAIT_QUEUE_MAGIC);
64 DEBUG_ASSERT(arch_ints_disabled());
65 DEBUG_ASSERT(spin_lock_held(&thread_lock));
66
67 // validate that the queue is sorted properly
68 thread_t* last = NULL;
69 thread_t* temp;
70 list_for_every_entry (&wait->heads, temp, thread_t, wait_queue_heads_node) {
71 DEBUG_ASSERT(temp->magic == THREAD_MAGIC);
72
73 // validate that the queue is sorted high to low priority
74 if (last) {
75 DEBUG_ASSERT_MSG(last->effec_priority > temp->effec_priority,
76 "%p:%d %p:%d",
77 last, last->effec_priority,
78 temp, temp->effec_priority);
79 }
80
81 // walk any threads linked to this head, validating that they're the same priority
82 thread_t* temp2;
83 list_for_every_entry (&temp->queue_node, temp2, thread_t, queue_node) {
84 DEBUG_ASSERT(temp2->magic == THREAD_MAGIC);
85 DEBUG_ASSERT_MSG(temp->effec_priority == temp2->effec_priority,
86 "%p:%d %p:%d",
87 temp, temp->effec_priority,
88 temp2, temp2->effec_priority);
89 }
90
91 last = temp;
92 }
93 }
94
95 // add a thread to the tail of a wait queue, sorted by priority
wait_queue_insert(wait_queue_t * wait,thread_t * t)96 static void wait_queue_insert(wait_queue_t* wait, thread_t* t) {
97 if (likely(list_is_empty(&wait->heads))) {
98 // we're the first thread
99 list_initialize(&t->queue_node);
100 list_add_head(&wait->heads, &t->wait_queue_heads_node);
101 } else {
102 int pri = t->effec_priority;
103
104 // walk through the sorted list of wait queue heads
105 thread_t* temp;
106 list_for_every_entry (&wait->heads, temp, thread_t, wait_queue_heads_node) {
107 if (pri > temp->effec_priority) {
108 // insert ourself here as a new queue head
109 list_initialize(&t->queue_node);
110 list_add_before(&temp->wait_queue_heads_node, &t->wait_queue_heads_node);
111 return;
112 } else if (temp->effec_priority == pri) {
113 // same priority, add ourself to the tail of this queue
114 list_add_tail(&temp->queue_node, &t->queue_node);
115 list_clear_node(&t->wait_queue_heads_node);
116 return;
117 }
118 }
119
120 // we walked off the end, add ourself as a new queue head at the end
121 list_initialize(&t->queue_node);
122 list_add_tail(&wait->heads, &t->wait_queue_heads_node);
123 }
124 }
125
126 // remove a thread from whatever wait queue its in
127 // thread must be the head of a queue
remove_queue_head(thread_t * t)128 static void remove_queue_head(thread_t* t) {
129 // are there any nodes in the queue for this priority?
130 if (list_is_empty(&t->queue_node)) {
131 // no, remove ourself from the queue list
132 list_delete(&t->wait_queue_heads_node);
133 list_clear_node(&t->queue_node);
134 } else {
135 // there are other threads in this list, make the next thread in the queue the head
136 thread_t* newhead = list_peek_head_type(&t->queue_node, thread_t, queue_node);
137 list_delete(&t->queue_node);
138
139 // patch in the new head into the queue head list
140 list_replace_node(&t->wait_queue_heads_node, &newhead->wait_queue_heads_node);
141 }
142 }
143
144 // remove the head of the highest priority queue
wait_queue_pop_head(wait_queue_t * wait)145 static thread_t* wait_queue_pop_head(wait_queue_t* wait) {
146 thread_t* t = NULL;
147
148 t = list_peek_head_type(&wait->heads, thread_t, wait_queue_heads_node);
149 if (!t) {
150 return NULL;
151 }
152
153 remove_queue_head(t);
154
155 return t;
156 }
157
158 // remove the thread from whatever wait queue its in
wait_queue_remove_thread(thread_t * t)159 static void wait_queue_remove_thread(thread_t* t) {
160 if (!list_in_list(&t->wait_queue_heads_node)) {
161 // we're just in a queue, not a head
162 list_delete(&t->queue_node);
163 } else {
164 // we're the head of a queue
165 remove_queue_head(t);
166 }
167 }
168
169 // return the numeric priority of the highest priority thread queued
wait_queue_blocked_priority(wait_queue_t * wait)170 int wait_queue_blocked_priority(wait_queue_t* wait) {
171 thread_t* t = list_peek_head_type(&wait->heads, thread_t, wait_queue_heads_node);
172 if (!t) {
173 return -1;
174 }
175
176 return t->effec_priority;
177 }
178
179 // returns a reference to the highest priority thread queued
wait_queue_peek(wait_queue_t * wait)180 thread_t* wait_queue_peek(wait_queue_t* wait) {
181 return list_peek_head_type(&wait->heads, thread_t, wait_queue_heads_node);
182 }
183
wait_queue_timeout_handler(timer_t * timer,zx_time_t now,void * arg)184 static void wait_queue_timeout_handler(timer_t* timer, zx_time_t now,
185 void* arg) {
186 thread_t* thread = (thread_t*)arg;
187
188 DEBUG_ASSERT(thread->magic == THREAD_MAGIC);
189
190 // spin trylocking on the thread lock since the routine that set up the callback,
191 // wait_queue_block, may be trying to simultaneously cancel this timer while holding the
192 // thread_lock.
193 if (timer_trylock_or_cancel(timer, &thread_lock)) {
194 return;
195 }
196
197 wait_queue_unblock_thread(thread, ZX_ERR_TIMED_OUT);
198
199 spin_unlock(&thread_lock);
200 }
201
202 /**
203 * @brief Block until a wait queue is notified, ignoring existing signals
204 * in |signal_mask|.
205 *
206 * This function puts the current thread at the end of a wait
207 * queue and then blocks until some other thread wakes the queue
208 * up again.
209 *
210 * @param wait The wait queue to enter
211 * @param deadline The time at which to abort the wait
212 * @param slack The amount of time it is acceptable to deviate from deadline
213 * @param signal_mask Mask of existing signals to ignore
214 *
215 * If the deadline is zero, this function returns immediately with
216 * ZX_ERR_TIMED_OUT. If the deadline is ZX_TIME_INFINITE, this function
217 * waits indefinitely. Otherwise, this function returns with
218 * ZX_ERR_TIMED_OUT when the slack-adjusted deadline elapses.
219 *
220 * @return ZX_ERR_TIMED_OUT on timeout, else returns the return
221 * value specified when the queue was woken by wait_queue_wake_one().
222 */
wait_queue_block_etc(wait_queue_t * wait,zx_time_t deadline,TimerSlack slack,uint signal_mask)223 zx_status_t wait_queue_block_etc(wait_queue_t* wait,
224 zx_time_t deadline,
225 TimerSlack slack,
226 uint signal_mask) TA_REQ(thread_lock) {
227 timer_t timer;
228
229 thread_t* current_thread = get_current_thread();
230
231 DEBUG_ASSERT(wait->magic == WAIT_QUEUE_MAGIC);
232 DEBUG_ASSERT(current_thread->state == THREAD_RUNNING);
233 DEBUG_ASSERT(arch_ints_disabled());
234 DEBUG_ASSERT(spin_lock_held(&thread_lock));
235
236 if (WAIT_QUEUE_VALIDATION) {
237 wait_queue_validate_queue(wait);
238 }
239
240 if (deadline != ZX_TIME_INFINITE && deadline <= current_time()) {
241 return ZX_ERR_TIMED_OUT;
242 }
243
244 if (current_thread->interruptable &&
245 (unlikely(current_thread->signals & ~signal_mask))) {
246 if (current_thread->signals & THREAD_SIGNAL_KILL) {
247 return ZX_ERR_INTERNAL_INTR_KILLED;
248 } else if (current_thread->signals & THREAD_SIGNAL_SUSPEND) {
249 return ZX_ERR_INTERNAL_INTR_RETRY;
250 }
251 }
252
253 wait_queue_insert(wait, current_thread);
254 wait->count++;
255 current_thread->state = THREAD_BLOCKED;
256 current_thread->blocking_wait_queue = wait;
257 current_thread->blocked_status = ZX_OK;
258
259 // if the deadline is nonzero or noninfinite, set a callback to yank us out of the queue
260 if (deadline != ZX_TIME_INFINITE) {
261 timer_init(&timer);
262 timer_set(&timer, deadline, slack, wait_queue_timeout_handler, (void*)current_thread);
263 }
264
265 ktrace_ptr(TAG_KWAIT_BLOCK, wait, 0, 0);
266
267 sched_block();
268
269 ktrace_ptr(TAG_KWAIT_UNBLOCK, wait, current_thread->blocked_status, 0);
270
271 // we don't really know if the timer fired or not, so it's better safe to try to cancel it
272 if (deadline != ZX_TIME_INFINITE) {
273 timer_cancel(&timer);
274 }
275
276 return current_thread->blocked_status;
277 }
278
279 /**
280 * @brief Block until a wait queue is notified.
281 *
282 * This function puts the current thread at the end of a wait
283 * queue and then blocks until some other thread wakes the queue
284 * up again.
285 *
286 * @param wait The wait queue to enter
287 * @param deadline The time at which to abort the wait
288 *
289 * If the deadline is zero, this function returns immediately with
290 * ZX_ERR_TIMED_OUT. If the deadline is ZX_TIME_INFINITE, this function
291 * waits indefinitely. Otherwise, this function returns with
292 * ZX_ERR_TIMED_OUT when the deadline occurs.
293 *
294 * @return ZX_ERR_TIMED_OUT on timeout, else returns the return
295 * value specified when the queue was woken by wait_queue_wake_one().
296 */
wait_queue_block(wait_queue_t * wait,zx_time_t deadline)297 zx_status_t wait_queue_block(wait_queue_t* wait, zx_time_t deadline) {
298 return wait_queue_block_etc(wait, deadline, kNoSlack, 0);
299 }
300
301 /**
302 * @brief Wake up one thread sleeping on a wait queue
303 *
304 * This function removes one thread (if any) from the head of the wait queue and
305 * makes it executable. The new thread will be placed at the head of the
306 * run queue.
307 *
308 * @param wait The wait queue to wake
309 * @param reschedule If true, the newly-woken thread will run immediately.
310 * @param wait_queue_error The return value which the new thread will receive
311 * from wait_queue_block().
312 *
313 * @return The number of threads woken (zero or one)
314 */
wait_queue_wake_one(wait_queue_t * wait,bool reschedule,zx_status_t wait_queue_error)315 int wait_queue_wake_one(wait_queue_t* wait, bool reschedule, zx_status_t wait_queue_error) {
316 thread_t* t;
317 int ret = 0;
318
319 DEBUG_ASSERT(wait->magic == WAIT_QUEUE_MAGIC);
320 DEBUG_ASSERT(arch_ints_disabled());
321 DEBUG_ASSERT(spin_lock_held(&thread_lock));
322
323 if (WAIT_QUEUE_VALIDATION) {
324 wait_queue_validate_queue(wait);
325 }
326
327 t = wait_queue_pop_head(wait);
328 if (t) {
329 wait->count--;
330 DEBUG_ASSERT(t->state == THREAD_BLOCKED);
331 t->blocked_status = wait_queue_error;
332 t->blocking_wait_queue = NULL;
333
334 ktrace_ptr(TAG_KWAIT_WAKE, wait, 0, 0);
335
336 // wake up the new thread, putting it in a run queue on a cpu. reschedule if the local
337 // cpu run queue was modified
338 bool local_resched = sched_unblock(t);
339 if (reschedule && local_resched) {
340 sched_reschedule();
341 }
342
343 ret = 1;
344 }
345
346 return ret;
347 }
348
wait_queue_dequeue_one(wait_queue_t * wait,zx_status_t wait_queue_error)349 thread_t* wait_queue_dequeue_one(wait_queue_t* wait, zx_status_t wait_queue_error) {
350 thread_t* t;
351
352 if (WAIT_QUEUE_VALIDATION) {
353 wait_queue_validate_queue(wait);
354 }
355
356 DEBUG_ASSERT(wait->magic == WAIT_QUEUE_MAGIC);
357 DEBUG_ASSERT(arch_ints_disabled());
358 DEBUG_ASSERT(spin_lock_held(&thread_lock));
359
360 t = wait_queue_pop_head(wait);
361 if (t) {
362 wait->count--;
363 DEBUG_ASSERT(t->state == THREAD_BLOCKED);
364 t->blocked_status = wait_queue_error;
365 t->blocking_wait_queue = NULL;
366 }
367
368 return t;
369 }
370
371 /**
372 * @brief Wake all threads sleeping on a wait queue
373 *
374 * This function removes all threads (if any) from the wait queue and
375 * makes them executable. The new threads will be placed at the head of the
376 * run queue.
377 *
378 * @param wait The wait queue to wake
379 * @param reschedule If true, the newly-woken threads will run immediately.
380 * @param wait_queue_error The return value which the new thread will receive
381 * from wait_queue_block().
382 *
383 * @return The number of threads woken
384 */
wait_queue_wake_all(wait_queue_t * wait,bool reschedule,zx_status_t wait_queue_error)385 int wait_queue_wake_all(wait_queue_t* wait, bool reschedule, zx_status_t wait_queue_error) {
386 thread_t* t;
387 int ret = 0;
388
389 DEBUG_ASSERT(wait->magic == WAIT_QUEUE_MAGIC);
390 DEBUG_ASSERT(arch_ints_disabled());
391 DEBUG_ASSERT(spin_lock_held(&thread_lock));
392
393 if (WAIT_QUEUE_VALIDATION) {
394 wait_queue_validate_queue(wait);
395 }
396
397 if (wait->count == 0) {
398 return 0;
399 }
400
401 struct list_node list = LIST_INITIAL_VALUE(list);
402
403 // pop all the threads off the wait queue into the run queue
404 // TODO: optimize with custom pop all routine
405 while ((t = wait_queue_pop_head(wait))) {
406 wait->count--;
407
408 DEBUG_ASSERT(t->state == THREAD_BLOCKED);
409 t->blocked_status = wait_queue_error;
410 t->blocking_wait_queue = NULL;
411
412 list_add_tail(&list, &t->queue_node);
413
414 ret++;
415 }
416
417 DEBUG_ASSERT(ret > 0);
418 DEBUG_ASSERT(wait->count == 0);
419
420 ktrace_ptr(TAG_KWAIT_WAKE, wait, 0, 0);
421
422 // wake up the new thread(s), putting it in a run queue on a cpu. reschedule if the local
423 // cpu run queue was modified
424 bool local_resched = sched_unblock_list(&list);
425 if (reschedule && local_resched) {
426 sched_reschedule();
427 }
428
429 return ret;
430 }
431
wait_queue_is_empty(wait_queue_t * wait)432 bool wait_queue_is_empty(wait_queue_t* wait) {
433 DEBUG_ASSERT(wait->magic == WAIT_QUEUE_MAGIC);
434 DEBUG_ASSERT(arch_ints_disabled());
435 DEBUG_ASSERT(spin_lock_held(&thread_lock));
436
437 return wait->count == 0;
438 }
439
440 /**
441 * @brief Tear down a wait queue
442 *
443 * This panics if any threads were waiting on this queue, because that
444 * would indicate a race condition for most uses of wait queues. If a
445 * thread is currently waiting, it could have been scheduled later, in
446 * which case it would have called wait_queue_block() on an invalid wait
447 * queue.
448 */
wait_queue_destroy(wait_queue_t * wait)449 void wait_queue_destroy(wait_queue_t* wait) {
450 DEBUG_ASSERT(wait->magic == WAIT_QUEUE_MAGIC);
451
452 if (wait->count != 0) {
453 panic("wait_queue_destroy() called on non-empty wait_queue_t\n");
454 }
455
456 wait->magic = 0;
457 }
458
459 /**
460 * @brief Wake a specific thread in a wait queue
461 *
462 * This function extracts a specific thread from a wait queue, wakes it,
463 * puts it at the head of the run queue, and does a reschedule if
464 * necessary.
465 *
466 * @param t The thread to wake
467 * @param wait_queue_error The return value which the new thread will receive from wait_queue_block().
468 *
469 * @return ZX_ERR_BAD_STATE if thread was not in any wait queue.
470 */
wait_queue_unblock_thread(thread_t * t,zx_status_t wait_queue_error)471 zx_status_t wait_queue_unblock_thread(thread_t* t, zx_status_t wait_queue_error) {
472 DEBUG_ASSERT(t->magic == THREAD_MAGIC);
473 DEBUG_ASSERT(arch_ints_disabled());
474 DEBUG_ASSERT(spin_lock_held(&thread_lock));
475
476 if (t->state != THREAD_BLOCKED) {
477 return ZX_ERR_BAD_STATE;
478 }
479
480 DEBUG_ASSERT(t->blocking_wait_queue != NULL);
481 DEBUG_ASSERT(t->blocking_wait_queue->magic == WAIT_QUEUE_MAGIC);
482 DEBUG_ASSERT(list_in_list(&t->queue_node));
483
484 if (WAIT_QUEUE_VALIDATION) {
485 wait_queue_validate_queue(t->blocking_wait_queue);
486 }
487
488 wait_queue_remove_thread(t);
489 t->blocking_wait_queue->count--;
490 t->blocking_wait_queue = NULL;
491 t->blocked_status = wait_queue_error;
492
493 if (sched_unblock(t)) {
494 sched_reschedule();
495 }
496
497 return ZX_OK;
498 }
499
wait_queue_priority_changed(struct thread * t,int old_prio)500 void wait_queue_priority_changed(struct thread* t, int old_prio) {
501 DEBUG_ASSERT(t->magic == THREAD_MAGIC);
502 DEBUG_ASSERT(arch_ints_disabled());
503 DEBUG_ASSERT(spin_lock_held(&thread_lock));
504
505 DEBUG_ASSERT(t->state == THREAD_BLOCKED);
506 DEBUG_ASSERT(t->blocking_wait_queue != NULL);
507 DEBUG_ASSERT(t->blocking_wait_queue->magic == WAIT_QUEUE_MAGIC);
508
509 LTRACEF("%p %d -> %d\n", t, old_prio, t->effec_priority);
510
511 // simple algorithm: remove the thread from the queue and add it back
512 // TODO: implement optimal algorithm depending on all the different edge
513 // cases of how the thread was previously queued and what priority its
514 // switching to.
515 wait_queue_remove_thread(t);
516 wait_queue_insert(t->blocking_wait_queue, t);
517
518 // TODO: find a way to call into wrapper mutex object if present and
519 // have the holder inherit the new priority
520
521 if (WAIT_QUEUE_VALIDATION) {
522 wait_queue_validate_queue(t->blocking_wait_queue);
523 }
524 }
525