1 // Copyright 2018 The Fuchsia Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 // Can't compile this for Zircon userspace yet since libstdc++ isn't available.
6 #ifndef FIT_NO_STD_FOR_ZIRCON_USERSPACE
7
8 #include <condition_variable>
9 #include <mutex>
10
11 #include <lib/fit/single_threaded_executor.h>
12 #include <lib/fit/thread_safety.h>
13
14 namespace fit {
15
16 // The dispatcher runs tasks and provides the suspended task resolver.
17 //
18 // The lifetime of this object is somewhat complex since there are pointers
19 // to it from multiple sources which are released in different ways.
20 //
21 // - |single_threaded_executor| holds a pointer in |dispatcher_| which it releases
22 // after calling |shutdown()| to inform the dispatcher of its own demise
23 // - |suspended_task| holds a pointer to the dispatcher's resolver
24 // interface and the number of outstanding pointers corresponds to the
25 // number of outstanding suspended task tickets tracked by |scheduler_|.
26 //
27 // The dispatcher deletes itself once all pointers have been released.
28 class single_threaded_executor::dispatcher_impl final
29 : public suspended_task::resolver {
30 public:
31 dispatcher_impl();
32
33 void shutdown();
34 void schedule_task(pending_task task);
35 void run(context_impl& context);
36 suspended_task suspend_current_task();
37
38 suspended_task::ticket duplicate_ticket(
39 suspended_task::ticket ticket) override;
40 void resolve_ticket(
41 suspended_task::ticket ticket, bool resume_task) override;
42
43 private:
44 ~dispatcher_impl() override;
45
46 void wait_for_runnable_tasks(
47 fit::subtle::scheduler::task_queue* out_tasks);
48 void run_task(pending_task* task, context& context);
49
50 suspended_task::ticket current_task_ticket_ = 0;
51 std::condition_variable wake_;
52
53 // A bunch of state that is guarded by a mutex.
54 struct {
55 std::mutex mutex_;
56 bool was_shutdown_ FIT_GUARDED(mutex_) = false;
57 bool need_wake_ FIT_GUARDED(mutex_) = false;
58 fit::subtle::scheduler scheduler_ FIT_GUARDED(mutex_);
59 } guarded_;
60 };
61
single_threaded_executor()62 single_threaded_executor::single_threaded_executor()
63 : context_(this), dispatcher_(new dispatcher_impl()) {}
64
~single_threaded_executor()65 single_threaded_executor::~single_threaded_executor() {
66 dispatcher_->shutdown();
67 }
68
schedule_task(pending_task task)69 void single_threaded_executor::schedule_task(pending_task task) {
70 assert(task);
71 dispatcher_->schedule_task(std::move(task));
72 }
73
run()74 void single_threaded_executor::run() {
75 dispatcher_->run(context_);
76 }
77
context_impl(single_threaded_executor * executor)78 single_threaded_executor::context_impl::context_impl(single_threaded_executor* executor)
79 : executor_(executor) {}
80
// Nothing to release: the context only stores a pointer back to its executor.
single_threaded_executor::context_impl::~context_impl() = default;
82
executor() const83 single_threaded_executor* single_threaded_executor::context_impl::executor() const {
84 return executor_;
85 }
86
suspend_task()87 suspended_task single_threaded_executor::context_impl::suspend_task() {
88 return executor_->dispatcher_->suspend_current_task();
89 }
90
// All members are set up by their in-class initializers.
single_threaded_executor::dispatcher_impl::dispatcher_impl() = default;
92
~dispatcher_impl()93 single_threaded_executor::dispatcher_impl::~dispatcher_impl() {
94 std::lock_guard<std::mutex> lock(guarded_.mutex_);
95 assert(guarded_.was_shutdown_);
96 assert(!guarded_.scheduler_.has_runnable_tasks());
97 assert(!guarded_.scheduler_.has_suspended_tasks());
98 assert(!guarded_.scheduler_.has_outstanding_tickets());
99 }
100
shutdown()101 void single_threaded_executor::dispatcher_impl::shutdown() {
102 fit::subtle::scheduler::task_queue tasks; // drop outside of the lock
103 {
104 std::lock_guard<std::mutex> lock(guarded_.mutex_);
105 assert(!guarded_.was_shutdown_);
106 guarded_.was_shutdown_ = true;
107 guarded_.scheduler_.take_all_tasks(&tasks);
108 if (guarded_.scheduler_.has_outstanding_tickets()) {
109 return; // can't delete self yet
110 }
111 }
112
113 // Must destroy self outside of the lock.
114 delete this;
115 }
116
schedule_task(pending_task task)117 void single_threaded_executor::dispatcher_impl::schedule_task(pending_task task) {
118 {
119 std::lock_guard<std::mutex> lock(guarded_.mutex_);
120 assert(!guarded_.was_shutdown_);
121 guarded_.scheduler_.schedule_task(std::move(task));
122 if (!guarded_.need_wake_) {
123 return; // don't need to wake
124 }
125 guarded_.need_wake_ = false;
126 }
127
128 // It is more efficient to notify outside the lock.
129 wake_.notify_one();
130 }
131
run(context_impl & context)132 void single_threaded_executor::dispatcher_impl::run(context_impl& context) {
133 fit::subtle::scheduler::task_queue tasks;
134 for (;;) {
135 wait_for_runnable_tasks(&tasks);
136 if (tasks.empty()) {
137 return; // all done!
138 }
139
140 do {
141 run_task(&tasks.front(), context);
142 tasks.pop(); // the task may be destroyed here if it was not suspended
143 } while (!tasks.empty());
144 }
145 }
146
147 // Must only be called while |run_task()| is running a task.
148 // This happens when the task's continuation calls |context::suspend_task()|
149 // upon the context it received as an argument.
suspend_current_task()150 suspended_task single_threaded_executor::dispatcher_impl::suspend_current_task() {
151 std::lock_guard<std::mutex> lock(guarded_.mutex_);
152 assert(!guarded_.was_shutdown_);
153 if (current_task_ticket_ == 0) {
154 current_task_ticket_ = guarded_.scheduler_.obtain_ticket(
155 2 /*initial_refs*/);
156 } else {
157 guarded_.scheduler_.duplicate_ticket(current_task_ticket_);
158 }
159 return suspended_task(this, current_task_ticket_);
160 }
161
162 // Unfortunately std::unique_lock does not support thread-safety annotations
wait_for_runnable_tasks(fit::subtle::scheduler::task_queue * out_tasks)163 void single_threaded_executor::dispatcher_impl::wait_for_runnable_tasks(
164 fit::subtle::scheduler::task_queue* out_tasks) FIT_NO_THREAD_SAFETY_ANALYSIS {
165 std::unique_lock<std::mutex> lock(guarded_.mutex_);
166 for (;;) {
167 assert(!guarded_.was_shutdown_);
168 guarded_.scheduler_.take_runnable_tasks(out_tasks);
169 if (!out_tasks->empty()) {
170 return; // got some tasks
171 }
172 if (!guarded_.scheduler_.has_suspended_tasks()) {
173 return; // all done!
174 }
175 guarded_.need_wake_ = true;
176 wake_.wait(lock);
177 guarded_.need_wake_ = false;
178 }
179 }
180
run_task(pending_task * task,context & context)181 void single_threaded_executor::dispatcher_impl::run_task(pending_task* task,
182 context& context) {
183 assert(current_task_ticket_ == 0);
184 const bool finished = (*task)(context);
185 assert(!*task == finished);
186 (void)finished;
187 if (current_task_ticket_ == 0) {
188 return; // task was not suspended, no ticket was produced
189 }
190
191 std::lock_guard<std::mutex> lock(guarded_.mutex_);
192 assert(!guarded_.was_shutdown_);
193 guarded_.scheduler_.finalize_ticket(current_task_ticket_, task);
194 current_task_ticket_ = 0;
195 }
196
duplicate_ticket(suspended_task::ticket ticket)197 suspended_task::ticket single_threaded_executor::dispatcher_impl::duplicate_ticket(
198 suspended_task::ticket ticket) {
199 std::lock_guard<std::mutex> lock(guarded_.mutex_);
200 guarded_.scheduler_.duplicate_ticket(ticket);
201 return ticket;
202 }
203
resolve_ticket(suspended_task::ticket ticket,bool resume_task)204 void single_threaded_executor::dispatcher_impl::resolve_ticket(
205 suspended_task::ticket ticket, bool resume_task) {
206 pending_task abandoned_task; // drop outside of the lock
207 bool do_wake = false;
208 {
209 std::lock_guard<std::mutex> lock(guarded_.mutex_);
210 if (resume_task) {
211 guarded_.scheduler_.resume_task_with_ticket(ticket);
212 } else {
213 abandoned_task = guarded_.scheduler_.release_ticket(ticket);
214 }
215 if (guarded_.was_shutdown_) {
216 assert(!guarded_.need_wake_);
217 if (guarded_.scheduler_.has_outstanding_tickets()) {
218 return; // can't shutdown yet
219 }
220 } else if (guarded_.need_wake_ &&
221 (guarded_.scheduler_.has_runnable_tasks() ||
222 !guarded_.scheduler_.has_suspended_tasks())) {
223 guarded_.need_wake_ = false;
224 do_wake = true;
225 } else {
226 return; // nothing else to do
227 }
228 }
229
230 // Must do this outside of the lock.
231 if (do_wake) {
232 wake_.notify_one();
233 } else {
234 delete this;
235 }
236 }
237
238 } // namespace fit
239
240 #endif // FIT_NO_STD_FOR_ZIRCON_USERSPACE
241