// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#ifndef LIB_SYNC_INTERNAL_CONDITION_TEMPLATE_H_
#define LIB_SYNC_INTERNAL_CONDITION_TEMPLATE_H_

#include <lib/sync/completion.h>
#include <lib/sync/mutex.h>
#include <zircon/compiler.h>
#include <zircon/syscalls.h>
namespace condition_impl_internal {

// A template implementation of a condition variable.
// The algorithm is borrowed from MUSL.
//
// The 'Condition' struct must contain the following fields:
//     int lock;
//     void* head;
//     void* tail;
//
// The following struct template must be specialized for the mutex type 'Mutex'
// in order to instantiate the template:
template <typename Mutex>
struct MutexOps {
    // Return a pointer to the futex that backs the |mutex|.
    static zx_futex_t* get_futex(Mutex* mutex);

    // Lock the |mutex|. If an error occurs while locking the mutex,
    // ZX_ERR_BAD_STATE must be returned. An implementation-defined
    // error code can be returned via |mutex_lock_err| if it's not null.
    static zx_status_t lock(Mutex* mutex, int* mutex_lock_err);

    // Similar to lock(), but also update the waiter information in the mutex.
    // If the mutex implements waiter counting, then the count must be adjusted
    // by |waiters_delta|. Otherwise, the mutex must be marked as potentially
    // having waiters.
    static zx_status_t lock_with_waiters(
        Mutex* mutex, int waiters_delta, int* mutex_lock_err);

    // Unlock the |mutex|.
    static void unlock(Mutex* mutex);
};
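
// Illustrative sketch only (not part of this library): a hypothetical
// specialization for a minimal futex-backed mutex. The 'FakeMutex' type, its
// 'futex' field, and its locking scheme are assumptions made for the sake of
// the example; real specializations (such as the ones backing libc's mutex and
// sync_mutex_t) will look different.
//
//     struct FakeMutex {
//         zx_futex_t futex = 0;  // 0 = unlocked, 1 = locked, 2 = locked with waiters
//     };
//
//     template <>
//     struct MutexOps<FakeMutex> {
//         static zx_futex_t* get_futex(FakeMutex* mutex) { return &mutex->futex; }
//         static zx_status_t lock(FakeMutex* mutex, int* mutex_lock_err) {
//             // ... acquire the futex; on failure, fill |mutex_lock_err| if it's
//             // not null and return ZX_ERR_BAD_STATE ...
//             return ZX_OK;
//         }
//         static zx_status_t lock_with_waiters(FakeMutex* mutex, int waiters_delta,
//                                              int* mutex_lock_err) {
//             // ... as lock(), but leave the futex in the "locked with waiters"
//             // state; |waiters_delta| is unused since there is no waiter count ...
//             return ZX_OK;
//         }
//         static void unlock(FakeMutex* mutex) {
//             // ... release the futex and wake a waiter if one may exist ...
//         }
//     };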

// Note that this library is used by libc, and as such needs to use
// '_zx_' function names for syscalls and not the regular 'zx_' names.

static inline void spin() {
#if defined(__x86_64__)
    __asm__ __volatile__("pause"
                         :
                         :
                         : "memory");
#elif defined(__aarch64__)
    __atomic_thread_fence(__ATOMIC_SEQ_CST);
#else
#error Please define spin() for your architecture
#endif
}

static inline bool cas(int* ptr, int* expected, int desired) {
    return __atomic_compare_exchange_n(ptr, expected, desired, false,
                                       __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST);
}

static inline void wait(int* futex, int current_value) {
    // Spin briefly in the hope that the value changes without a syscall.
    int spins = 100;
    while (spins--) {
        if (__atomic_load_n(futex, __ATOMIC_SEQ_CST) == current_value) {
            spin();
        } else {
            return;
        }
    }
    // Fall back to blocking in the kernel until the value changes.
    while (__atomic_load_n(futex, __ATOMIC_SEQ_CST) == current_value) {
        _zx_futex_wait(futex, current_value, ZX_HANDLE_INVALID, ZX_TIME_INFINITE);
    }
}

enum {
    WAITING,
    SIGNALED,
    LEAVING,
};
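
// Per-thread waiter node, linked into the condition's head/tail list while the
// thread is blocked in timedwait(). |state| moves from WAITING to either
// SIGNALED (set by signal()) or LEAVING (set by a timed-out waiter); |ready| is
// the completion the waiter blocks on; |notify| is set by signal() when it needs
// a LEAVING waiter to announce that it has unlinked itself from the list.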
struct Waiter {
    Waiter* prev = nullptr;
    Waiter* next = nullptr;
    int state = WAITING;
    sync_completion_t ready;
    int* notify = nullptr;
};

// Return value:
// - ZX_OK if the condition variable was signaled;
// - ZX_ERR_TIMED_OUT if the deadline was reached;
// - ZX_ERR_BAD_STATE if there was an error locking the mutex.
//   In this case, |mutex_lock_err|, if not null, will be populated with an
//   error code provided by the mutex implementation.
template <typename Condition, typename Mutex>
static inline zx_status_t timedwait(Condition* c, Mutex* mutex, zx_time_t deadline,
                                    int* mutex_lock_err)
    __TA_NO_THREAD_SAFETY_ANALYSIS {
    sync_mutex_lock(reinterpret_cast<sync_mutex_t*>(&c->lock));

    Waiter node;

    // Add our waiter node onto the condition's list. We add the node to the
    // head of the list, but this is logically the end of the queue.
    node.next = static_cast<Waiter*>(c->head);
    c->head = &node;
    if (!c->tail) {
        c->tail = &node;
    } else {
        node.next->prev = &node;
    }

    sync_mutex_unlock(reinterpret_cast<sync_mutex_t*>(&c->lock));

    MutexOps<Mutex>::unlock(mutex);

    // Wait to be signaled. There are multiple ways this wait could finish:
    // 1) After being woken by signal().
    // 2) After being woken by a mutex unlock, after we were
    //    requeued from the condition's futex to the mutex's futex (by
    //    timedwait() in another thread).
    // 3) After a timeout.
    // In the original Linux version of this algorithm, this could also exit
    // when interrupted by an asynchronous signal, but that does not apply on Zircon.
    sync_completion_wait_deadline(&node.ready, deadline);

    int oldstate = WAITING;
    if (cas(&node.state, &oldstate, LEAVING)) {
        // The wait timed out. So far, this thread was not signaled by
        // signal() -- this thread was able to move node.state out of the
        // WAITING state before any signal() call could do that.
        //
        // This thread must therefore remove the waiter node from the
        // list itself.

        // Access to the cv object is valid because this waiter was not
        // yet signaled and a new signal/broadcast cannot return
        // after seeing a LEAVING waiter without getting notified
        // via the futex notify below.

        sync_mutex_lock(reinterpret_cast<sync_mutex_t*>(&c->lock));

        // Remove our waiter node from the list.
        if (c->head == &node) {
            c->head = node.next;
        } else if (node.prev) {
            node.prev->next = node.next;
        }

        if (c->tail == &node) {
            c->tail = node.prev;
        } else if (node.next) {
            node.next->prev = node.prev;
        }

        sync_mutex_unlock(reinterpret_cast<sync_mutex_t*>(&c->lock));

        // It is possible that signal() saw our waiter node after we set
        // node.state to LEAVING but before we removed the node from the
        // list. If so, it will have set node.notify and will be waiting
        // on it, and we need to wake it up.
        //
        // This is rather complex. An alternative would be to eliminate
        // the |node.state| field and always claim |lock| whenever we might
        // have timed out. However, that presumably has higher overhead
        // (since it contends |lock| and involves more atomic ops).
        if (node.notify) {
            if (__atomic_fetch_add(node.notify, -1, __ATOMIC_SEQ_CST) == 1) {
                _zx_futex_wake(node.notify, 1);
            }
        }

        // We don't need lock_with_waiters() here: we haven't been signaled, and will
        // never be, since we managed to claim the state as LEAVING. This means that
        // we could not have been woken up by unlock_requeue() + mutex unlock().
        if (MutexOps<Mutex>::lock(mutex, mutex_lock_err) != ZX_OK) {
            return ZX_ERR_BAD_STATE;
        }
        return ZX_ERR_TIMED_OUT;
    }

    // Since the CAS above failed, we have been signaled.
    // It could still be the case that sync_completion_wait_deadline() above timed out,
    // so we need to make sure to wait for the completion to control the wake order.
    // If the completion has already been signaled, this will return immediately.
    sync_completion_wait_deadline(&node.ready, ZX_TIME_INFINITE);

    // By this point, our part of the waiter list cannot change further.
    // It has been unlinked from the condition by signal().
    // Any timed-out waiters would have removed themselves from the list
    // before signal() signaled the first node.ready in our list.
    //
    // It is therefore safe now to read node.next and node.prev without
    // holding c->lock.

    // As an optimization, we only update the waiter count at the beginning and
    // end of the signaled list.
    int waiters_delta = 0;
    if (!node.prev) {
        waiters_delta++;
    }
    if (!node.next) {
        waiters_delta--;
    }

    // We must leave the mutex in the "locked with waiters" state here
    // (or adjust its waiter count, depending on the implementation).
    // There are two reasons for that:
    // 1) If we do the unlock_requeue() below, a condition waiter will be
    //    requeued to the mutex's futex. We need to ensure that it will
    //    be signaled by mutex unlock() in future.
    // 2) If the current thread was woken via an unlock_requeue() +
    //    mutex unlock, there *might* be another thread waiting for
    //    the mutex after us in the queue. We need to ensure that it
    //    will be signaled by mutex unlock() in future.
    zx_status_t status = MutexOps<Mutex>::lock_with_waiters(mutex, waiters_delta, mutex_lock_err);

    if (node.prev) {
        // Signal the completion that's holding back the next waiter, and
        // requeue it to the mutex so that it will be woken when the
        // mutex is unlocked.
        sync_completion_signal_requeue(&node.prev->ready, MutexOps<Mutex>::get_futex(mutex));
    }

    // Even if the first call to sync_completion_wait_deadline() timed out,
    // we still have been signaled. Thus we still return ZX_OK rather than
    // ZX_ERR_TIMED_OUT. This provides the following guarantee: if multiple
    // threads are waiting when signal() is called, at least one waiting
    // thread will be woken *and* get a ZX_OK from timedwait() (unless there
    // is an error locking the mutex). This property does not appear to be
    // required by pthread condvars, although an analogous property is
    // required for futex wake-ups. We also require this property for
    // sync_condition_t.
    return status;
}
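
// Illustrative sketch only (not part of this library): a hypothetical condition
// type that satisfies the requirements above, and a wait call built on it.
// 'MyCondition', 'FakeMutex', and 'example_wait' are assumptions made for this
// example (see the MutexOps sketch above); real users such as libc's condition
// variable and sync_condition_t wire this up differently.
//
//     struct MyCondition {
//         int lock = 0;          // reinterpreted as a sync_mutex_t by the template
//         void* head = nullptr;  // newest waiter
//         void* tail = nullptr;  // oldest waiter
//     };
//
//     // Caller must hold |m|. Returns ZX_OK if signaled, ZX_ERR_TIMED_OUT if the
//     // deadline passed, or ZX_ERR_BAD_STATE if relocking |m| failed.
//     zx_status_t example_wait(MyCondition* c, FakeMutex* m, zx_time_t deadline) {
//         return timedwait(c, m, deadline, /*mutex_lock_err=*/nullptr);
//     }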

// This will wake up to |n| threads that are waiting on the condition,
// or all waiting threads if |n| is set to -1.
template <typename Condition>
static inline void signal(Condition* c, int n) {
    Waiter* p;
    Waiter* first = nullptr;
    int ref = 0;
    int cur;

    sync_mutex_lock(reinterpret_cast<sync_mutex_t*>(&c->lock));
    for (p = static_cast<Waiter*>(c->tail); n && p; p = p->prev) {
        int oldstate = WAITING;
        if (!cas(&p->state, &oldstate, SIGNALED)) {
            // This waiter timed out, and it marked itself as in the
            // LEAVING state. However, it hasn't yet claimed |lock|
            // (since we claimed the lock first) and so it hasn't yet
            // removed itself from the list. We will wait for the waiter
            // to remove itself from the list and to notify us of that.
            __atomic_fetch_add(&ref, 1, __ATOMIC_SEQ_CST);
            p->notify = &ref;
        } else {
            n--;
            if (!first) {
                first = p;
            }
        }
    }
    // Split the list, leaving any remainder on the cv.
    if (p) {
        if (p->next) {
            p->next->prev = nullptr;
        }
        p->next = nullptr;
    } else {
        c->head = nullptr;
    }
    c->tail = p;
    sync_mutex_unlock(reinterpret_cast<sync_mutex_t*>(&c->lock));

    // Wait for any waiters in the LEAVING state to remove
    // themselves from the list before returning or allowing
    // signaled threads to proceed.
    while ((cur = __atomic_load_n(&ref, __ATOMIC_SEQ_CST))) {
        wait(&ref, cur);
    }

    // Allow the first signaled waiter, if any, to proceed.
    if (first) {
        sync_completion_signal(&first->ready);
    }
}
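
// Illustrative sketch only: waking waiters on the hypothetical 'MyCondition'
// above. Passing -1 for |n| wakes every thread currently waiting (broadcast).
//
//     signal(c, 1);   // wake at most one waiting thread
//     signal(c, -1);  // wake all waiting threads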

} // namespace condition_impl_internal

#endif // LIB_SYNC_INTERNAL_CONDITION_TEMPLATE_H_