1 // Copyright 2018 The Fuchsia Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 // Can't compile this for Zircon userspace yet since libstdc++ isn't available.
6 #ifndef FIT_NO_STD_FOR_ZIRCON_USERSPACE
7 
8 #include <condition_variable>
9 #include <mutex>
10 
11 #include <lib/fit/single_threaded_executor.h>
12 #include <lib/fit/thread_safety.h>
13 
14 namespace fit {
15 
16 // The dispatcher runs tasks and provides the suspended task resolver.
17 //
18 // The lifetime of this object is somewhat complex since there are pointers
19 // to it from multiple sources which are released in different ways.
20 //
21 // - |single_threaded_executor| holds a pointer in |dispatcher_| which it releases
22 //   after calling |shutdown()| to inform the dispatcher of its own demise
23 // - |suspended_task| holds a pointer to the dispatcher's resolver
24 //   interface and the number of outstanding pointers corresponds to the
25 //   number of outstanding suspended task tickets tracked by |scheduler_|.
26 //
27 // The dispatcher deletes itself once all pointers have been released.
28 class single_threaded_executor::dispatcher_impl final
29     : public suspended_task::resolver {
30 public:
31     dispatcher_impl();
32 
33     void shutdown();
34     void schedule_task(pending_task task);
35     void run(context_impl& context);
36     suspended_task suspend_current_task();
37 
38     suspended_task::ticket duplicate_ticket(
39         suspended_task::ticket ticket) override;
40     void resolve_ticket(
41         suspended_task::ticket ticket, bool resume_task) override;
42 
43 private:
44     ~dispatcher_impl() override;
45 
46     void wait_for_runnable_tasks(
47         fit::subtle::scheduler::task_queue* out_tasks);
48     void run_task(pending_task* task, context& context);
49 
50     suspended_task::ticket current_task_ticket_ = 0;
51     std::condition_variable wake_;
52 
53     // A bunch of state that is guarded by a mutex.
54     struct {
55         std::mutex mutex_;
56         bool was_shutdown_ FIT_GUARDED(mutex_) = false;
57         bool need_wake_ FIT_GUARDED(mutex_) = false;
58         fit::subtle::scheduler scheduler_ FIT_GUARDED(mutex_);
59     } guarded_;
60 };
61 
single_threaded_executor()62 single_threaded_executor::single_threaded_executor()
63     : context_(this), dispatcher_(new dispatcher_impl()) {}
64 
~single_threaded_executor()65 single_threaded_executor::~single_threaded_executor() {
66     dispatcher_->shutdown();
67 }
68 
schedule_task(pending_task task)69 void single_threaded_executor::schedule_task(pending_task task) {
70     assert(task);
71     dispatcher_->schedule_task(std::move(task));
72 }
73 
run()74 void single_threaded_executor::run() {
75     dispatcher_->run(context_);
76 }
77 
context_impl(single_threaded_executor * executor)78 single_threaded_executor::context_impl::context_impl(single_threaded_executor* executor)
79     : executor_(executor) {}
80 
81 single_threaded_executor::context_impl::~context_impl() = default;
82 
executor() const83 single_threaded_executor* single_threaded_executor::context_impl::executor() const {
84     return executor_;
85 }
86 
suspend_task()87 suspended_task single_threaded_executor::context_impl::suspend_task() {
88     return executor_->dispatcher_->suspend_current_task();
89 }
90 
91 single_threaded_executor::dispatcher_impl::dispatcher_impl() = default;
92 
~dispatcher_impl()93 single_threaded_executor::dispatcher_impl::~dispatcher_impl() {
94     std::lock_guard<std::mutex> lock(guarded_.mutex_);
95     assert(guarded_.was_shutdown_);
96     assert(!guarded_.scheduler_.has_runnable_tasks());
97     assert(!guarded_.scheduler_.has_suspended_tasks());
98     assert(!guarded_.scheduler_.has_outstanding_tickets());
99 }
100 
shutdown()101 void single_threaded_executor::dispatcher_impl::shutdown() {
102     fit::subtle::scheduler::task_queue tasks; // drop outside of the lock
103     {
104         std::lock_guard<std::mutex> lock(guarded_.mutex_);
105         assert(!guarded_.was_shutdown_);
106         guarded_.was_shutdown_ = true;
107         guarded_.scheduler_.take_all_tasks(&tasks);
108         if (guarded_.scheduler_.has_outstanding_tickets()) {
109             return; // can't delete self yet
110         }
111     }
112 
113     // Must destroy self outside of the lock.
114     delete this;
115 }
116 
schedule_task(pending_task task)117 void single_threaded_executor::dispatcher_impl::schedule_task(pending_task task) {
118     {
119         std::lock_guard<std::mutex> lock(guarded_.mutex_);
120         assert(!guarded_.was_shutdown_);
121         guarded_.scheduler_.schedule_task(std::move(task));
122         if (!guarded_.need_wake_) {
123             return; // don't need to wake
124         }
125         guarded_.need_wake_ = false;
126     }
127 
128     // It is more efficient to notify outside the lock.
129     wake_.notify_one();
130 }
131 
run(context_impl & context)132 void single_threaded_executor::dispatcher_impl::run(context_impl& context) {
133     fit::subtle::scheduler::task_queue tasks;
134     for (;;) {
135         wait_for_runnable_tasks(&tasks);
136         if (tasks.empty()) {
137             return; // all done!
138         }
139 
140         do {
141             run_task(&tasks.front(), context);
142             tasks.pop(); // the task may be destroyed here if it was not suspended
143         } while (!tasks.empty());
144     }
145 }
146 
147 // Must only be called while |run_task()| is running a task.
148 // This happens when the task's continuation calls |context::suspend_task()|
149 // upon the context it received as an argument.
suspend_current_task()150 suspended_task single_threaded_executor::dispatcher_impl::suspend_current_task() {
151     std::lock_guard<std::mutex> lock(guarded_.mutex_);
152     assert(!guarded_.was_shutdown_);
153     if (current_task_ticket_ == 0) {
154         current_task_ticket_ = guarded_.scheduler_.obtain_ticket(
155             2 /*initial_refs*/);
156     } else {
157         guarded_.scheduler_.duplicate_ticket(current_task_ticket_);
158     }
159     return suspended_task(this, current_task_ticket_);
160 }
161 
162 // Unfortunately std::unique_lock does not support thread-safety annotations
wait_for_runnable_tasks(fit::subtle::scheduler::task_queue * out_tasks)163 void single_threaded_executor::dispatcher_impl::wait_for_runnable_tasks(
164     fit::subtle::scheduler::task_queue* out_tasks) FIT_NO_THREAD_SAFETY_ANALYSIS {
165     std::unique_lock<std::mutex> lock(guarded_.mutex_);
166     for (;;) {
167         assert(!guarded_.was_shutdown_);
168         guarded_.scheduler_.take_runnable_tasks(out_tasks);
169         if (!out_tasks->empty()) {
170             return; // got some tasks
171         }
172         if (!guarded_.scheduler_.has_suspended_tasks()) {
173             return; // all done!
174         }
175         guarded_.need_wake_ = true;
176         wake_.wait(lock);
177         guarded_.need_wake_ = false;
178     }
179 }
180 
run_task(pending_task * task,context & context)181 void single_threaded_executor::dispatcher_impl::run_task(pending_task* task,
182                                                          context& context) {
183     assert(current_task_ticket_ == 0);
184     const bool finished = (*task)(context);
185     assert(!*task == finished);
186     (void)finished;
187     if (current_task_ticket_ == 0) {
188         return; // task was not suspended, no ticket was produced
189     }
190 
191     std::lock_guard<std::mutex> lock(guarded_.mutex_);
192     assert(!guarded_.was_shutdown_);
193     guarded_.scheduler_.finalize_ticket(current_task_ticket_, task);
194     current_task_ticket_ = 0;
195 }
196 
duplicate_ticket(suspended_task::ticket ticket)197 suspended_task::ticket single_threaded_executor::dispatcher_impl::duplicate_ticket(
198     suspended_task::ticket ticket) {
199     std::lock_guard<std::mutex> lock(guarded_.mutex_);
200     guarded_.scheduler_.duplicate_ticket(ticket);
201     return ticket;
202 }
203 
resolve_ticket(suspended_task::ticket ticket,bool resume_task)204 void single_threaded_executor::dispatcher_impl::resolve_ticket(
205     suspended_task::ticket ticket, bool resume_task) {
206     pending_task abandoned_task; // drop outside of the lock
207     bool do_wake = false;
208     {
209         std::lock_guard<std::mutex> lock(guarded_.mutex_);
210         if (resume_task) {
211             guarded_.scheduler_.resume_task_with_ticket(ticket);
212         } else {
213             abandoned_task = guarded_.scheduler_.release_ticket(ticket);
214         }
215         if (guarded_.was_shutdown_) {
216             assert(!guarded_.need_wake_);
217             if (guarded_.scheduler_.has_outstanding_tickets()) {
218                 return; // can't shutdown yet
219             }
220         } else if (guarded_.need_wake_ &&
221                    (guarded_.scheduler_.has_runnable_tasks() ||
222                     !guarded_.scheduler_.has_suspended_tasks())) {
223             guarded_.need_wake_ = false;
224             do_wake = true;
225         } else {
226             return; // nothing else to do
227         }
228     }
229 
230     // Must do this outside of the lock.
231     if (do_wake) {
232         wake_.notify_one();
233     } else {
234         delete this;
235     }
236 }
237 
238 } // namespace fit
239 
240 #endif // FIT_NO_STD_FOR_ZIRCON_USERSPACE
241