1 // Copyright 2018 The Fuchsia Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include <unistd.h>
6 
7 #include <thread>
8 
9 #include <lib/fit/bridge.h>
10 #include <lib/fit/defer.h>
11 #include <lib/fit/scope.h>
12 #include <lib/fit/single_threaded_executor.h>
13 #include <unittest/unittest.h>
14 
15 #include "unittest_utils.h"
16 
17 namespace {
18 
19 class fake_context : public fit::context {
20 public:
executor() const21     fit::executor* executor() const override {
22         ASSERT_CRITICAL(false);
23     }
suspend_task()24     fit::suspended_task suspend_task() override {
25         ASSERT_CRITICAL(false);
26     }
27 };
28 
29 // Asynchronously accumulates a sum.
30 // This is an example of an object that offers promises that captures
31 // the "this" pointer, thereby needing a scope to prevent dangling pointers
32 // in case it is destroyed before the promises complete.
33 class accumulator {
34 public:
35     // Adds a value to the counter then returns it.
36     // Takes time proportional to the value being added.
add(uint32_t value)37     fit::promise<uint32_t> add(uint32_t value) {
38         return fit::make_promise(
39                    [this, cycles = value](fit::context& context) mutable
40                    -> fit::result<uint32_t> {
41                        if (cycles == 0)
42                            return fit::ok(counter_);
43                        counter_++;
44                        cycles--;
45                        context.suspend_task().resume_task();
46                        return fit::pending();
47                    })
48             .wrap_with(scope_);
49     }
50 
51     // Gets the current count, immediately.
count() const52     uint32_t count() const { return counter_; }
53 
54 private:
55     fit::scope scope_;
56     uint32_t counter_ = 0;
57 };
58 
scoping_tasks()59 bool scoping_tasks() {
60     BEGIN_TEST;
61 
62     auto acc = std::make_unique<accumulator>();
63     fit::single_threaded_executor executor;
64     uint32_t sums[4] = {};
65 
66     // Schedule some tasks which accumulate values asynchronously.
67     executor.schedule_task(acc->add(2).and_then(
68         [&](uint32_t value) { sums[0] = value; }));
69     executor.schedule_task(acc->add(1).and_then(
70         [&](uint32_t value) { sums[1] = value; }));
71     executor.schedule_task(acc->add(5).and_then(
72         [&](uint32_t value) { sums[2] = value; }));
73 
74     // Schedule a task which accumulates and then destroys the accumulator
75     // so that the scope is exited.  Any remaining promises will be aborted.
76     uint32_t last_count = 0;
77     executor.schedule_task(acc->add(3).and_then(
78         [&](uint32_t value) {
79             sums[3] = value;
80             // Schedule destruction in another task to avoid re-entrance.
81             executor.schedule_task(fit::make_promise([&] {
82                 last_count = acc->count();
83                 acc.reset();
84             }));
85         }));
86 
87     // Run the tasks.
88     executor.run();
89 
90     // The counts reflect the fact that the scope is exited part-way through
91     // the cycle.  For example, the sums[2] task doesn't get to run since
92     // it only runs after 5 cycles and the scope is exited on the third.
93     EXPECT_EQ(11, last_count);
94     EXPECT_EQ(7, sums[0]);
95     EXPECT_EQ(5, sums[1]);
96     EXPECT_EQ(0, sums[2]);
97     EXPECT_EQ(10, sums[3]);
98 
99     END_TEST;
100 }
101 
exit_destroys_wrapped_promises()102 bool exit_destroys_wrapped_promises() {
103     BEGIN_TEST;
104 
105     fit::scope scope;
106     EXPECT_FALSE(scope.exited());
107 
108     // Set up three wrapped promises.
109     bool destroyed[4] = {};
110     auto p0 = scope.wrap(fit::make_promise(
111         [d = fit::defer([&] { destroyed[0] = true; })] { return fit::ok(); }));
112     auto p1 = scope.wrap(fit::make_promise(
113         [d = fit::defer([&] { destroyed[1] = true; })] { return fit::ok(); }));
114     auto p2 = scope.wrap(fit::make_promise(
115         [d = fit::defer([&] { destroyed[2] = true; })] { return fit::ok(); }));
116     EXPECT_FALSE(destroyed[0]);
117     EXPECT_FALSE(destroyed[1]);
118     EXPECT_FALSE(destroyed[2]);
119 
120     // Execute one of them to completion, causing it to be destroyed.
121     EXPECT_TRUE(fit::run_single_threaded(std::move(p1)).is_ok());
122     EXPECT_FALSE(destroyed[0]);
123     EXPECT_TRUE(destroyed[1]);
124     EXPECT_FALSE(destroyed[2]);
125 
126     // Exit the scope, causing the wrapped promise to be destroyed
127     // while still leaving the wrapper alive (but aborted).
128     scope.exit();
129     EXPECT_TRUE(scope.exited());
130     EXPECT_TRUE(destroyed[0]);
131     EXPECT_TRUE(destroyed[1]);
132     EXPECT_TRUE(destroyed[2]);
133 
134     // Wrapping another promise causes the wrapped promise to be immediately
135     // destroyed.
136     auto p3 = scope.wrap(fit::make_promise(
137         [d = fit::defer([&] { destroyed[3] = true; })] { return fit::ok(); }));
138     EXPECT_TRUE(destroyed[3]);
139 
140     // Executing the wrapped promises returns pending.
141     EXPECT_TRUE(fit::run_single_threaded(std::move(p0)).is_pending());
142     EXPECT_TRUE(fit::run_single_threaded(std::move(p2)).is_pending());
143     EXPECT_TRUE(fit::run_single_threaded(std::move(p3)).is_pending());
144 
145     // Exiting again has no effect.
146     scope.exit();
147     EXPECT_TRUE(scope.exited());
148 
149     END_TEST;
150 }
151 
double_wrap()152 bool double_wrap() {
153     BEGIN_TEST;
154 
155     fit::scope scope;
156     fake_context context;
157 
158     // Here we wrap a task that's already been wrapped to see what happens
159     // when the scope is exited.  This is interesting because it means that
160     // the destruction of one wrapped promise will cause the destruction of
161     // another wrapped promise and could uncover re-entrance issues.
162     uint32_t run_count = 0;
163     bool destroyed = false;
164     auto promise =
165         fit::make_promise(
166             [&, d = fit::defer([&] { destroyed = true; })](fit::context& context) {
167                 run_count++;
168                 return fit::pending();
169             })
170             .wrap_with(scope)
171             .wrap_with(scope); // wrap again!
172 
173     // Run the promise once to show that we can.
174     EXPECT_EQ(fit::result_state::pending, promise(context).state());
175     EXPECT_EQ(1, run_count);
176     EXPECT_FALSE(destroyed);
177 
178     // Now exit the scope, which should cause the promise to be destroyed.
179     scope.exit();
180     EXPECT_EQ(1, run_count);
181     EXPECT_TRUE(destroyed);
182 
183     // Running the promise again should do nothing.
184     EXPECT_EQ(fit::result_state::pending, promise(context).state());
185     EXPECT_EQ(1, run_count);
186     EXPECT_TRUE(destroyed);
187 
188     END_TEST;
189 }
190 
thread_safety()191 bool thread_safety() {
192     BEGIN_TEST;
193 
194     fit::scope scope;
195     fit::single_threaded_executor executor;
196     uint64_t run_count = 0;
197 
198     // Schedule work from a few threads, just to show that we can.
199     // Part way through, exit the scope.
200     constexpr int num_threads = 4;
201     constexpr int num_tasks_per_thread = 100;
202     constexpr int exit_threshold = 75;
203     std::thread threads[num_threads];
204     for (int i = 0; i < num_threads; i++) {
205         fit::bridge bridge;
206         threads[i] =
207             std::thread([&, completer = std::move(bridge.completer)]() mutable {
208                 for (int j = 0; j < num_tasks_per_thread; j++) {
209                     if (j == exit_threshold) {
210                         executor.schedule_task(fit::make_promise([&] {
211                             scope.exit();
212                         }));
213                     }
214 
215                     executor.schedule_task(
216                         fit::make_promise([&] {
217                             run_count++;
218                         }).wrap_with(scope));
219                 }
220                 completer.complete_ok();
221             });
222         executor.schedule_task(bridge.consumer.promise());
223     }
224 
225     // Run the tasks.
226     executor.run();
227     for (int i = 0; i < num_threads; i++)
228         threads[i].join();
229 
230     // We expect some non-deterministic number of tasks to have run
231     // related to the exit threshold.
232     // We scheduled num_threads * num_tasks_per_thread tasks, but on each thread
233     // we exited the (common) scope after scheduling its first exit_threshold
234     // tasks.  Once one of those threads exits the scope, no more tasks
235     // (scheduled by any thread) will run within the scope, so the number of
236     // executed tasks cannot increase any further.  Therefore we know that at
237     // least exit_threshold tasks have run but we could have run as many as
238     // num_threads * exit_threshold in a perfect world where all of the threads
239     // called scope.exit() at the same time.
240     EXPECT_GE(run_count, exit_threshold);
241     EXPECT_LE(run_count, num_threads * exit_threshold);
242 
243     END_TEST;
244 }
245 
246 } // namespace
247 
// Registers the tests defined above as the |scope_tests| test case.
BEGIN_TEST_CASE(scope_tests)
RUN_TEST(scoping_tasks)
RUN_TEST(exit_destroys_wrapped_promises)
RUN_TEST(double_wrap)
RUN_TEST(thread_safety)
END_TEST_CASE(scope_tests)
254