1 // Copyright 2018 The Fuchsia Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include <lib/async-testutils/test_loop_dispatcher.h>
6
7 #include <zircon/assert.h>
8 #include <zircon/status.h>
9 #include <zircon/syscalls.h>
10
11 #define TO_NODE(type, ptr) ((list_node_t*)&ptr->state)
12 #define FROM_NODE(type, ptr) ((type*)((char*)(ptr)-offsetof(type, state)))
13
14 namespace async {
15
16 namespace {
17
18 // The packet key used to signal timer expirations.
19 constexpr uint64_t kTimerExpirationKey = 0u;
20
21 // Convenience functions for task, wait, and list node management.
WaitToNode(async_wait_t * wait)22 inline list_node_t* WaitToNode(async_wait_t* wait) {
23 return TO_NODE(async_wait_t, wait);
24 }
25
NodeToWait(list_node_t * node)26 inline async_wait_t* NodeToWait(list_node_t* node) {
27 return FROM_NODE(async_wait_t, node);
28 }
29
TaskToNode(async_task_t * task)30 inline list_node_t* TaskToNode(async_task_t* task) {
31 return TO_NODE(async_task_t, task);
32 }
33
NodeToTask(list_node_t * node)34 inline async_task_t* NodeToTask(list_node_t* node) {
35 return FROM_NODE(async_task_t, node);
36 }
37
InsertTask(list_node_t * task_list,async_task_t * task)38 inline void InsertTask(list_node_t* task_list, async_task_t* task) {
39 list_node_t* node;
40 for (node = task_list->prev; node != task_list; node = node->prev) {
41 if (task->deadline >= NodeToTask(node)->deadline) {
42 break;
43 }
44 }
45 list_add_after(node, TaskToNode(task));
46 }
47 } // namespace
48
TestLoopDispatcher(TimeKeeper * time_keeper)49 TestLoopDispatcher::TestLoopDispatcher(TimeKeeper* time_keeper)
50 : time_keeper_(time_keeper) {
51 ZX_DEBUG_ASSERT(time_keeper_);
52 list_initialize(&wait_list_);
53 list_initialize(&task_list_);
54 list_initialize(&due_list_);
55 zx_status_t status = zx::port::create(0u, &port_);
56 ZX_ASSERT_MSG(status == ZX_OK,
57 "zx_port_create: %s",
58 zx_status_get_string(status));
59 }
60
~TestLoopDispatcher()61 TestLoopDispatcher::~TestLoopDispatcher() {
62 Shutdown();
63 time_keeper_->CancelTimers(this);
64 };
65
Now()66 zx::time TestLoopDispatcher::Now() { return time_keeper_->Now(); }
67
68 // TODO(ZX-2390): Return ZX_ERR_CANCELED if dispatcher is shutting down.
BeginWait(async_wait_t * wait)69 zx_status_t TestLoopDispatcher::BeginWait(async_wait_t* wait) {
70 ZX_DEBUG_ASSERT(wait);
71
72 // Along with the above assertion, the following check guarantees that the
73 // packet to be sent to |port_| on completion of this wait will not be
74 // mistaken for a timer expiration.
75 static_assert(0u == kTimerExpirationKey,
76 "Timer expirations must be signaled with a packet key of 0");
77
78 list_add_head(&wait_list_, WaitToNode(wait));
79 zx_status_t status = zx_object_wait_async(wait->object, port_.get(),
80 reinterpret_cast<uintptr_t>(wait),
81 wait->trigger,
82 ZX_WAIT_ASYNC_ONCE);
83
84 if (status != ZX_OK) {
85 // In this rare condition, the wait failed. Since a dispatched handler will
86 // never be invoked on the wait object, we remove it ourselves.
87 list_delete(WaitToNode(wait));
88 }
89 return status;
90 }
91
CancelWait(async_wait_t * wait)92 zx_status_t TestLoopDispatcher::CancelWait(async_wait_t* wait) {
93 ZX_DEBUG_ASSERT(wait);
94
95 list_node_t* node = WaitToNode(wait);
96 if (!list_in_list(node)) {
97 return ZX_ERR_NOT_FOUND;
98 }
99
100 // |wait| already might be encoded in |due_packet_|.
101 if (due_packet_ && due_packet_->key != kTimerExpirationKey) {
102 if (wait == reinterpret_cast<async_wait_t*>(due_packet_->key)) {
103 due_packet_.reset();
104 list_delete(node);
105 return ZX_OK;
106 }
107 }
108
109 zx_status_t status = port_.cancel(*zx::unowned_handle(wait->object),
110 reinterpret_cast<uintptr_t>(wait));
111 if (status == ZX_OK) {
112 list_delete(node);
113 }
114 return status;
115 }
116
117 // TODO(ZX-2390): Return ZX_ERR_CANCELED if dispatcher is shutting down.
PostTask(async_task_t * task)118 zx_status_t TestLoopDispatcher::PostTask(async_task_t* task) {
119 ZX_DEBUG_ASSERT(task);
120
121 InsertTask(&task_list_, task);
122 if (NodeToTask(list_peek_head(&task_list_)) == task) {
123 time_keeper_->RegisterTimer(GetNextTaskDueTime(), this);
124 }
125 return ZX_OK;
126 }
127
CancelTask(async_task_t * task)128 zx_status_t TestLoopDispatcher::CancelTask(async_task_t* task) {
129 ZX_DEBUG_ASSERT(task);
130 list_node_t* node = TaskToNode(task);
131 if (!list_in_list(node)) {
132 return ZX_ERR_NOT_FOUND;
133 }
134 list_delete(node);
135 return ZX_OK;
136 }
137
FireTimer()138 void TestLoopDispatcher::FireTimer() {
139 zx_port_packet_t timer_packet{};
140 timer_packet.key = kTimerExpirationKey;
141 timer_packet.type = ZX_PKT_TYPE_USER;
142 zx_status_t status = port_.queue(&timer_packet);
143 ZX_ASSERT_MSG(status == ZX_OK,
144 "zx_port_queue: %s",
145 zx_status_get_string(status));
146 }
147
GetNextTaskDueTime()148 zx::time TestLoopDispatcher::GetNextTaskDueTime() {
149 list_node_t* node = list_is_empty(&due_list_) ?
150 list_peek_head(&task_list_) :
151 list_peek_head(&due_list_);
152 if (!node) {
153 return zx::time::infinite();
154 }
155 return zx::time(NodeToTask(node)->deadline);
156 }
157
158
ExtractNextDuePacket()159 void TestLoopDispatcher::ExtractNextDuePacket() {
160 ZX_DEBUG_ASSERT(!due_packet_);
161 bool tasks_are_due = GetNextTaskDueTime() <= Now();
162
163 // If no tasks are due, flush all timer expiration packets until either
164 // there are no more packets to dequeue or a wait packet is reached.
165 do {
166 auto packet = fbl::make_unique<zx_port_packet_t>();
167 if (ZX_OK != port_.wait(zx::time(0), packet.get())) { return; }
168 due_packet_.swap(packet);
169 } while (!tasks_are_due && due_packet_->key == kTimerExpirationKey);
170 }
171
HasPendingWork()172 bool TestLoopDispatcher::HasPendingWork() {
173 if (GetNextTaskDueTime() <= Now()) { return true; }
174 if (!due_packet_) { ExtractNextDuePacket(); }
175 return !!due_packet_;
176 }
177
DispatchNextDueTask()178 void TestLoopDispatcher::DispatchNextDueTask() {
179 // if something is already in the due list, dispatch that.
180 list_node_t* node = list_peek_head(&due_list_);
181 if (node) {
182 list_delete(node);
183 async_task_t* task = NodeToTask(node);
184 task->handler(this, task, ZX_OK);
185
186 // If the due list is now empty and there are still pending tasks,
187 // register a timer for the next due time.
188 if (list_is_empty(&due_list_) && !list_is_empty(&task_list_)) {
189 time_keeper_->RegisterTimer(GetNextTaskDueTime(), this);
190 }
191 }
192 }
193
DispatchNextDueMessage()194 bool TestLoopDispatcher::DispatchNextDueMessage() {
195 if (!list_is_empty(&due_list_)) {
196 DispatchNextDueTask();
197 return true;
198 }
199
200 if (!due_packet_) { ExtractNextDuePacket(); }
201
202 if (!due_packet_) {
203 return false;
204 } else if (due_packet_->key == kTimerExpirationKey) {
205 ExtractDueTasks();
206 DispatchNextDueTask();
207 due_packet_.reset();
208 } else { // |due_packet_| encodes a finished wait.
209 // Move the next due packet to the stack, as invoking the associated
210 // wait's handler might try to extract another.
211 zx_port_packet_t packet = *due_packet_;
212 due_packet_.reset();
213 async_wait_t* wait = reinterpret_cast<async_wait_t*>(packet.key);
214 list_delete(WaitToNode(wait));
215 wait->handler(this, wait, ZX_OK, &packet.signal);
216 }
217 return true;
218 }
219
ExtractDueTasks()220 void TestLoopDispatcher::ExtractDueTasks() {
221 list_node_t* node;
222 list_node_t* tail = nullptr;
223 zx::time current_time = time_keeper_->Now();
224 list_for_every(&task_list_, node) {
225 if (NodeToTask(node)->deadline > current_time.get()) { break; }
226 tail = node;
227 }
228 if (tail) {
229 list_node_t* head = task_list_.next;
230 task_list_.next = tail->next;
231 tail->next->prev = &task_list_;
232 due_list_.next = head;
233 head->prev = &due_list_;
234 due_list_.prev = tail;
235 tail->next = &due_list_;
236 }
237 }
238
Shutdown()239 void TestLoopDispatcher::Shutdown() {
240 list_node_t* node;
241 while ((node = list_remove_head(&wait_list_))) {
242 async_wait_t* wait = NodeToWait(node);
243 wait->handler(this, wait, ZX_ERR_CANCELED, nullptr);
244 }
245 while ((node = list_remove_head(&due_list_))) {
246 async_task_t* task = NodeToTask(node);
247 task->handler(this, task, ZX_ERR_CANCELED);
248 }
249 while ((node = list_remove_head(&task_list_))) {
250 async_task_t* task = NodeToTask(node);
251 task->handler(this, task, ZX_ERR_CANCELED);
252 }
253 }
254
255 } // namespace async
256