// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#ifndef LIB_SYNC_INTERNAL_CONDITION_TEMPLATE_H_
#define LIB_SYNC_INTERNAL_CONDITION_TEMPLATE_H_

#include <lib/sync/completion.h>
#include <lib/sync/mutex.h>
#include <zircon/syscalls.h>

namespace condition_impl_internal {

// A template implementation of a condition variable.
// The algorithm is borrowed from MUSL.
//
// The 'Condition' struct must contain the following fields:
//      int lock;
//      void* head;
//      void* tail;
//
// The following struct template must be specialized for the mutex type 'Mutex'
// in order to instantiate the template:
template <typename Mutex>
struct MutexOps {
    // Return a pointer to the futex that backs the |mutex|
    static zx_futex_t* get_futex(Mutex* mutex);

    // Lock the |mutex|. If an error occurs while locking the mutex,
    // ZX_ERR_BAD_STATE must be returned. An implementation-defined
    // error code can be returned via |mutex_lock_err| if it's not null.
    static zx_status_t lock(Mutex* mutex, int* mutex_lock_err);

    // Similar to lock(), but also update the waiter information in the mutex.
    // If the mutex implements waiter counting, then the count must be adjusted
    // by |waiters_delta|. Otherwise, the mutex must be marked as potentially
    // having waiters.
    static zx_status_t lock_with_waiters(
        Mutex* mutex, int waiters_delta, int* mutex_lock_err);

    // Unlock the mutex
    static void unlock(Mutex* mutex);
};
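
// Illustrative only: the shape of an instantiation, using hypothetical
// 'MyCondition' and 'MyMutex' types that are not part of this library:
//
//      struct MyCondition {
//          int lock = 0;          // Protects the waiter list.
//          void* head = nullptr;
//          void* tail = nullptr;
//      };
//
//      struct MyMutex {
//          zx_futex_t futex = 0;  // The futex word backing the lock.
//      };
//
//      template <>
//      struct MutexOps<MyMutex> {
//          static zx_futex_t* get_futex(MyMutex* mutex) {
//              return &mutex->futex;
//          }
//          static zx_status_t lock(MyMutex* mutex, int* mutex_lock_err) {
//              // Acquire the lock; on failure, fill *mutex_lock_err (if
//              // non-null) and return ZX_ERR_BAD_STATE.
//          }
//          static zx_status_t lock_with_waiters(MyMutex* mutex, int waiters_delta,
//                                               int* mutex_lock_err) {
//              // As lock(), but also adjust the waiter count by
//              // |waiters_delta| (or mark the mutex as contested).
//          }
//          static void unlock(MyMutex* mutex) {
//              // Release the lock, waking a waiter on the futex if needed.
//          }
//      };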

// Note that this library is used by libc, and as such needs to use
// '_zx_' function names for syscalls and not the regular 'zx_' names.

// Pause briefly inside a spin loop before re-checking shared state.
static inline void spin() {
#if defined(__x86_64__)
    __asm__ __volatile__("pause"
                         :
                         :
                         : "memory");
#elif defined(__aarch64__)
    __atomic_thread_fence(__ATOMIC_SEQ_CST);
#else
#error Please define spin() for your architecture
#endif
}

// Atomically compare *ptr against *expected and, if equal, store |desired|.
// On failure, *expected is updated to the value observed in *ptr.
static inline bool cas(int* ptr, int* expected, int desired) {
    return __atomic_compare_exchange_n(ptr, expected, desired, false,
                                       __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST);
}

// Wait until *futex changes away from |current_value|: spin a bounded
// number of times first, then block on the futex.
static inline void wait(int* futex, int current_value) {
    int spins = 100;
    while (spins--) {
        if (__atomic_load_n(futex, __ATOMIC_SEQ_CST) == current_value) {
            spin();
        } else {
            return;
        }
    }
    while (__atomic_load_n(futex, __ATOMIC_SEQ_CST) == current_value) {
        _zx_futex_wait(futex, current_value, ZX_HANDLE_INVALID, ZX_TIME_INFINITE);
    }
}

// States that a waiter node can be in.
enum {
    WAITING,   // Still waiting to be signaled.
    SIGNALED,  // Claimed by signal(); will be woken via |ready|.
    LEAVING,   // Timed out; the waiter unlinks itself from the list.
};

// A per-thread waiter node, linked into the condition's waiter list.
struct Waiter {
    Waiter* prev = nullptr;
    Waiter* next = nullptr;
    int state = WAITING;
    // Completion used to wake this waiter (possibly by requeueing it to
    // the mutex's futex).
    sync_completion_t ready;
    // Set by signal() when it observes this waiter in the LEAVING state;
    // the waiter decrements and wakes it once it has unlinked itself.
    int* notify = nullptr;
};
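
// Waiter nodes form a doubly-linked list protected by the condition's
// |lock|. timedwait() links new nodes in at |head|, and signal() walks
// from |tail| via |prev|, so waiters are signaled oldest-first:
//
//      head -> [newest] <-> ... <-> [oldest] <- tail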

// Return value:
//      - ZX_OK if the condition variable was signaled;
//      - ZX_ERR_TIMED_OUT if the deadline was reached;
//      - ZX_ERR_BAD_STATE if there was an error locking the mutex.
//        In this case, |mutex_lock_err|, if not null, will be populated with an error code
//        provided by the mutex implementation.
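//
// Illustrative only: callers typically wrap timedwait() in a predicate loop,
// shown here with the hypothetical 'MyCondition'/'MyMutex' types sketched above:
//
//      MutexOps<MyMutex>::lock(&mutex, nullptr);
//      zx_status_t status = ZX_OK;
//      while (!predicate && status == ZX_OK) {
//          // The mutex is re-acquired before timedwait() returns, except
//          // when it fails with ZX_ERR_BAD_STATE.
//          status = timedwait(&cond, &mutex, deadline, nullptr);
//      }
//      if (status != ZX_ERR_BAD_STATE) {
//          MutexOps<MyMutex>::unlock(&mutex);
//      }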
template <typename Condition, typename Mutex>
static inline zx_status_t timedwait(Condition* c, Mutex* mutex, zx_time_t deadline,
                                    int* mutex_lock_err)
    __TA_NO_THREAD_SAFETY_ANALYSIS {
    sync_mutex_lock(reinterpret_cast<sync_mutex_t*>(&c->lock));

    Waiter node;

    // Add our waiter node onto the condition's list.  We add the node to the
    // head of the list, but this is logically the end of the queue.
    node.next = static_cast<Waiter*>(c->head);
    c->head = &node;
    if (!c->tail) {
        c->tail = &node;
    } else {
        node.next->prev = &node;
    }

    sync_mutex_unlock(reinterpret_cast<sync_mutex_t*>(&c->lock));

    MutexOps<Mutex>::unlock(mutex);

    // Wait to be signaled.  There are multiple ways this wait could finish:
    //  1) After being woken by signal().
    //  2) After being woken by a mutex unlock, after we were
    //     requeued from the condition's futex to the mutex's futex (by
    //     timedwait() in another thread).
    //  3) After a timeout.
    // In the original Linux version of this algorithm, this could also exit
    // when interrupted by an asynchronous signal, but that does not apply on Zircon.
    sync_completion_wait_deadline(&node.ready, deadline);

    int oldstate = WAITING;
    if (cas(&node.state, &oldstate, LEAVING)) {
        // The wait timed out.  So far, this thread was not signaled by
        // signal() -- this thread was able to move node.state out of the
        // WAITING state before any signal() call could do that.
        //
        // This thread must therefore remove the waiter node from the
        // list itself.

        // Access to the cv object is valid because this waiter was not
        // yet signaled, and a new signal/broadcast cannot return
        // after seeing a LEAVING waiter without getting notified
        // via the futex notify below.

        sync_mutex_lock(reinterpret_cast<sync_mutex_t*>(&c->lock));

        // Remove our waiter node from the list.
        if (c->head == &node) {
            c->head = node.next;
        } else if (node.prev) {
            node.prev->next = node.next;
        }

        if (c->tail == &node) {
            c->tail = node.prev;
        } else if (node.next) {
            node.next->prev = node.prev;
        }

        sync_mutex_unlock(reinterpret_cast<sync_mutex_t*>(&c->lock));

        // It is possible that signal() saw our waiter node after we set
        // node.state to LEAVING but before we removed the node from the
        // list.  If so, it will have set node.notify and will be waiting
        // on it, and we need to wake it up.
        //
        // This is rather complex.  An alternative would be to eliminate
        // the |node.state| field and always claim |lock| if we could have
        // got a timeout.  However, that presumably has higher overhead
        // (since it contends |lock| and involves more atomic ops).
        if (node.notify) {
            if (__atomic_fetch_add(node.notify, -1, __ATOMIC_SEQ_CST) == 1) {
                _zx_futex_wake(node.notify, 1);
            }
        }

        // We don't need lock_with_waiters() here: we haven't been signaled, and will
        // never be since we managed to claim the state as LEAVING. This means that
        // we could not have been woken up by unlock_requeue() + mutex unlock().
        if (MutexOps<Mutex>::lock(mutex, mutex_lock_err) != ZX_OK) {
            return ZX_ERR_BAD_STATE;
        }
        return ZX_ERR_TIMED_OUT;
    }

    // Since the CAS above failed, we have been signaled.
    // It could still be the case that sync_completion_wait_deadline() above timed out,
    // so we need to make sure to wait for the completion to control the wake order.
    // If the completion has already been signaled, this will return immediately.
    sync_completion_wait_deadline(&node.ready, ZX_TIME_INFINITE);

    // By this point, our part of the waiter list cannot change further.
    // It has been unlinked from the condition by signal().
    // Any timed out waiters would have removed themselves from the list
    // before signal() signaled the first node.ready in our list.
    //
    // It is therefore safe now to read node.next and node.prev without
    // holding c->lock.

    // As an optimization, we only update the waiter count at the beginning
    // and end of the signaled list; the per-waiter increments and decrements
    // in between would cancel out, so the count simply stays raised while
    // the signaled list drains.
    int waiters_delta = 0;
    if (!node.prev) {
        waiters_delta++;
    }
    if (!node.next) {
        waiters_delta--;
    }

    // We must leave the mutex in the "locked with waiters" state here
    // (or adjust its waiter count, depending on the implementation).
    // There are two reasons for that:
    //  1) If we do the unlock_requeue() below, a condition waiter will be
    //     requeued to the mutex's futex.  We need to ensure that it will
    //     be signaled by mutex unlock() in future.
    //  2) If the current thread was woken via an unlock_requeue() +
    //     mutex unlock, there *might* be another thread waiting for
    //     the mutex after us in the queue.  We need to ensure that it
    //     will be signaled by mutex unlock() in future.
    zx_status_t status = MutexOps<Mutex>::lock_with_waiters(mutex, waiters_delta, mutex_lock_err);

    if (node.prev) {
        // Signal the completion that's holding back the next waiter, and
        // requeue it to the mutex so that it will be woken when the
        // mutex is unlocked.
        sync_completion_signal_requeue(&node.prev->ready, MutexOps<Mutex>::get_futex(mutex));
    }

    // Even if the first call to sync_completion_wait_deadline() timed out,
    // we still have been signaled. Thus we still return ZX_OK rather than
    // ZX_ERR_TIMED_OUT. This provides the following guarantee: if multiple
    // threads are waiting when signal() is called, at least one waiting
    // thread will be woken *and* get a ZX_OK from timedwait() (unless there
    // is an error locking the mutex). This property does not appear to be
    // required by pthread condvars, although an analogous property is
    // required for futex wake-ups. We also require this property for
    // sync_condition_t.
    return status;
}

// This will wake up to |n| threads that are waiting on the condition,
// or all waiting threads if |n| is set to -1.
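//
// Illustrative only: signal(&cond, 1) wakes a single waiter, while
// signal(&cond, -1) acts as a broadcast, since |n| never reaches zero in
// the loop below and every waiter on the list is therefore claimed.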
template <typename Condition>
static inline void signal(Condition* c, int n) {
    Waiter* p;
    Waiter* first = nullptr;
    int ref = 0;
    int cur;

    sync_mutex_lock(reinterpret_cast<sync_mutex_t*>(&c->lock));
    for (p = static_cast<Waiter*>(c->tail); n && p; p = p->prev) {
        int oldstate = WAITING;
        if (!cas(&p->state, &oldstate, SIGNALED)) {
            // This waiter timed out, and it marked itself as in the
            // LEAVING state.  However, it hasn't yet claimed |lock|
            // (since we claimed the lock first) and so it hasn't yet
            // removed itself from the list.  We will wait for the waiter
            // to remove itself from the list and to notify us of that.
            __atomic_fetch_add(&ref, 1, __ATOMIC_SEQ_CST);
            p->notify = &ref;
        } else {
            n--;
            if (!first) {
                first = p;
            }
        }
    }
    // Split the list, leaving any remainder on the cv.
    if (p) {
        if (p->next) {
            p->next->prev = nullptr;
        }
        p->next = nullptr;
    } else {
        c->head = nullptr;
    }
    c->tail = p;
    sync_mutex_unlock(reinterpret_cast<sync_mutex_t*>(&c->lock));

    // Wait for any waiters in the LEAVING state to remove
    // themselves from the list before returning or allowing
    // signaled threads to proceed.
    while ((cur = __atomic_load_n(&ref, __ATOMIC_SEQ_CST))) {
        wait(&ref, cur);
    }

    // Allow the first signaled waiter, if any, to proceed.
    if (first) {
        sync_completion_signal(&first->ready);
    }
}

} // namespace condition_impl_internal

#endif // LIB_SYNC_INTERNAL_CONDITION_TEMPLATE_H_