1 // Copyright 2018 The Fuchsia Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include <unistd.h>
6 
7 #include <string>
8 #include <thread>
9 
10 #include <lib/fit/bridge.h>
11 #include <lib/fit/sequencer.h>
12 #include <lib/fit/single_threaded_executor.h>
13 #include <unittest/unittest.h>
14 
15 namespace {
16 
sequencing_tasks()17 bool sequencing_tasks() {
18     BEGIN_TEST;
19 
20     fit::sequencer seq;
21     std::string str;
22 
23     // This promise writes ":a" sequentially then writes ":a2" later.
24     auto a = fit::make_promise([&] { str += ":a"; })
25                  .wrap_with(seq)
26                  .then([&](fit::result<>) { str += ":a2"; });
27 
28     // This promise writes ":b" sequentially then writes ":b2" and ":b3" later.
29     // Also schedules another sequential task that writes ":e".
30     auto b = fit::make_promise([&](fit::context& context) {
31                  str += ":b";
32                  context.executor()->schedule_task(
33                      fit::make_promise([&] { str += ":e"; })
34                          .wrap_with(seq));
35              })
36                  .wrap_with(seq)
37                  .then([&, count = 0](fit::context& context, fit::result<>) mutable
38                        -> fit::result<> {
39                      if (++count == 5) {
40                          str += ":b3";
41                          return fit::error();
42                      }
43                      str += ":b2";
44                      context.suspend_task().resume_task(); // immediately resume
45                      return fit::pending();
46                  });
47 
48     // This promise writes ":c" sequentially then abandons itself.
49     auto c = fit::make_promise([&](fit::context& context) {
50                  str += ":c";
51                  context.suspend_task(); // abandon result
52                  return fit::pending();
53              })
54                  .wrap_with(seq)
55                  .then([&](fit::result<>) { str += ":c2"; });
56 
57     // This promise writes ":d" sequentially.
58     auto d = fit::make_promise([&] { str += ":d"; })
59                  .wrap_with(seq);
60 
61     // These promises just write ":z1" and ":z2" whenever they happen to run.
62     auto z1 = fit::make_promise([&] { str += ":z1"; });
63     auto z2 = fit::make_promise([&] { str += ":z2"; });
64 
65     // Schedule the promises in an order which intentionally does not
66     // match the sequencing.
67     fit::single_threaded_executor executor;
68     executor.schedule_task(std::move(z1));
69     executor.schedule_task(std::move(b));
70     executor.schedule_task(std::move(c));
71     executor.schedule_task(std::move(a));
72     executor.schedule_task(std::move(d));
73     executor.schedule_task(std::move(z2));
74     executor.run();
75 
76     // Evaluate the promises and check the execution order.
77     EXPECT_STR_EQ(":z1:a:a2:z2:b:b2:c:b2:d:b2:e:b2:b3", str.c_str());
78 
79     END_TEST;
80 }
81 
thread_safety()82 bool thread_safety() {
83     BEGIN_TEST;
84 
85     fit::sequencer seq;
86     fit::single_threaded_executor executor;
87     uint64_t run_count = 0;
88 
89     // Schedule work from a few threads, just to show that we can.
90     constexpr int num_threads = 4;
91     constexpr int num_tasks_per_thread = 100;
92     std::thread threads[num_threads];
93     for (int i = 0; i < num_threads; i++) {
94         fit::bridge bridge;
95         threads[i] =
96             std::thread([&, completer = std::move(bridge.completer)]() mutable {
97                 for (int j = 0; j < num_tasks_per_thread; j++) {
98                     executor.schedule_task(
99                         fit::make_promise([&] { run_count++; }).wrap_with(seq));
100                     usleep(1);
101                 }
102                 completer.complete_ok();
103             });
104         executor.schedule_task(bridge.consumer.promise());
105     }
106 
107     // Run the tasks.
108     executor.run();
109     for (int i = 0; i < num_threads; i++)
110         threads[i].join();
111 
112     // We expect all tasks to have run.
113     EXPECT_EQ(num_threads * num_tasks_per_thread, run_count);
114 
115     END_TEST;
116 }
117 
118 } // namespace
119 
120 BEGIN_TEST_CASE(sequencer_tests)
121 RUN_TEST(sequencing_tasks)
122 RUN_TEST(thread_safety)
123 END_TEST_CASE(sequencer_tests)
124