1 // Copyright 2018 The Fuchsia Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include <lib/async-testutils/test_loop_dispatcher.h>
6 
7 #include <zircon/assert.h>
8 #include <zircon/status.h>
9 #include <zircon/syscalls.h>
10 
// Converts between an async operation (an object with a |state| member) and
// the list node stored in that member.
//
// TO_NODE yields the address of |ptr|'s |state| member viewed as a
// |list_node_t*|. |type| is unused but kept so both macros take the same
// arguments. |ptr| is parenthesized so that arbitrary pointer expressions
// (e.g. |p + 1|) expand correctly.
#define TO_NODE(type, ptr) ((list_node_t*)&(ptr)->state)
// FROM_NODE recovers the enclosing |type*| from a pointer to its |state|
// member by subtracting the member's offset.
#define FROM_NODE(type, ptr) ((type*)((char*)(ptr)-offsetof(type, state)))
13 
14 namespace async {
15 
16 namespace {
17 
// Port packets carrying this key announce a timer expiration rather than a
// completed wait.
constexpr uint64_t kTimerExpirationKey{0};
20 
21 // Convenience functions for task, wait, and list node management.
WaitToNode(async_wait_t * wait)22 inline list_node_t* WaitToNode(async_wait_t* wait) {
23     return TO_NODE(async_wait_t, wait);
24 }
25 
NodeToWait(list_node_t * node)26 inline async_wait_t* NodeToWait(list_node_t* node) {
27     return FROM_NODE(async_wait_t, node);
28 }
29 
TaskToNode(async_task_t * task)30 inline list_node_t* TaskToNode(async_task_t* task) {
31     return TO_NODE(async_task_t, task);
32 }
33 
NodeToTask(list_node_t * node)34 inline async_task_t* NodeToTask(list_node_t* node) {
35     return FROM_NODE(async_task_t, node);
36 }
37 
InsertTask(list_node_t * task_list,async_task_t * task)38 inline void InsertTask(list_node_t* task_list, async_task_t* task) {
39     list_node_t* node;
40     for (node = task_list->prev; node != task_list; node = node->prev) {
41         if (task->deadline >= NodeToTask(node)->deadline) {
42             break;
43         }
44     }
45     list_add_after(node, TaskToNode(task));
46 }
47 } // namespace
48 
TestLoopDispatcher(TimeKeeper * time_keeper)49 TestLoopDispatcher::TestLoopDispatcher(TimeKeeper* time_keeper)
50     : time_keeper_(time_keeper) {
51     ZX_DEBUG_ASSERT(time_keeper_);
52     list_initialize(&wait_list_);
53     list_initialize(&task_list_);
54     list_initialize(&due_list_);
55     zx_status_t status = zx::port::create(0u, &port_);
56     ZX_ASSERT_MSG(status == ZX_OK,
57                   "zx_port_create: %s",
58                   zx_status_get_string(status));
59 }
60 
~TestLoopDispatcher()61 TestLoopDispatcher::~TestLoopDispatcher() {
62     Shutdown();
63     time_keeper_->CancelTimers(this);
64 };
65 
Now()66 zx::time TestLoopDispatcher::Now() { return time_keeper_->Now(); }
67 
68 // TODO(ZX-2390): Return ZX_ERR_CANCELED if dispatcher is shutting down.
BeginWait(async_wait_t * wait)69 zx_status_t TestLoopDispatcher::BeginWait(async_wait_t* wait) {
70     ZX_DEBUG_ASSERT(wait);
71 
72     // Along with the above assertion, the following check guarantees that the
73     // packet to be sent to |port_| on completion of this wait will not be
74     // mistaken for a timer expiration.
75     static_assert(0u == kTimerExpirationKey,
76                   "Timer expirations must be signaled with a packet key of 0");
77 
78     list_add_head(&wait_list_, WaitToNode(wait));
79     zx_status_t status = zx_object_wait_async(wait->object, port_.get(),
80                                               reinterpret_cast<uintptr_t>(wait),
81                                               wait->trigger,
82                                               ZX_WAIT_ASYNC_ONCE);
83 
84     if (status != ZX_OK) {
85         // In this rare condition, the wait failed. Since a dispatched handler will
86         // never be invoked on the wait object, we remove it ourselves.
87         list_delete(WaitToNode(wait));
88     }
89     return status;
90 }
91 
CancelWait(async_wait_t * wait)92 zx_status_t TestLoopDispatcher::CancelWait(async_wait_t* wait) {
93     ZX_DEBUG_ASSERT(wait);
94 
95     list_node_t* node = WaitToNode(wait);
96     if (!list_in_list(node)) {
97         return ZX_ERR_NOT_FOUND;
98     }
99 
100     // |wait| already might be encoded in |due_packet_|.
101     if (due_packet_ && due_packet_->key != kTimerExpirationKey) {
102         if (wait == reinterpret_cast<async_wait_t*>(due_packet_->key)) {
103             due_packet_.reset();
104             list_delete(node);
105             return ZX_OK;
106         }
107     }
108 
109     zx_status_t status = port_.cancel(*zx::unowned_handle(wait->object),
110                                       reinterpret_cast<uintptr_t>(wait));
111     if (status == ZX_OK) {
112         list_delete(node);
113     }
114     return status;
115 }
116 
117 // TODO(ZX-2390): Return ZX_ERR_CANCELED if dispatcher is shutting down.
PostTask(async_task_t * task)118 zx_status_t TestLoopDispatcher::PostTask(async_task_t* task) {
119     ZX_DEBUG_ASSERT(task);
120 
121     InsertTask(&task_list_, task);
122     if (NodeToTask(list_peek_head(&task_list_)) == task) {
123         time_keeper_->RegisterTimer(GetNextTaskDueTime(), this);
124     }
125     return ZX_OK;
126 }
127 
CancelTask(async_task_t * task)128 zx_status_t TestLoopDispatcher::CancelTask(async_task_t* task) {
129     ZX_DEBUG_ASSERT(task);
130     list_node_t* node = TaskToNode(task);
131     if (!list_in_list(node)) {
132         return ZX_ERR_NOT_FOUND;
133     }
134     list_delete(node);
135     return ZX_OK;
136 }
137 
FireTimer()138 void TestLoopDispatcher::FireTimer() {
139     zx_port_packet_t timer_packet{};
140     timer_packet.key = kTimerExpirationKey;
141     timer_packet.type = ZX_PKT_TYPE_USER;
142     zx_status_t status = port_.queue(&timer_packet);
143     ZX_ASSERT_MSG(status == ZX_OK,
144                   "zx_port_queue: %s",
145                   zx_status_get_string(status));
146 }
147 
GetNextTaskDueTime()148 zx::time TestLoopDispatcher::GetNextTaskDueTime() {
149     list_node_t* node = list_is_empty(&due_list_) ?
150                         list_peek_head(&task_list_) :
151                         list_peek_head(&due_list_);
152     if (!node) {
153         return zx::time::infinite();
154     }
155     return zx::time(NodeToTask(node)->deadline);
156 }
157 
158 
ExtractNextDuePacket()159 void TestLoopDispatcher::ExtractNextDuePacket() {
160     ZX_DEBUG_ASSERT(!due_packet_);
161     bool tasks_are_due = GetNextTaskDueTime() <= Now();
162 
163     // If no tasks are due, flush all timer expiration packets until either
164     // there are no more packets to dequeue or a wait packet is reached.
165     do {
166         auto packet = fbl::make_unique<zx_port_packet_t>();
167         if (ZX_OK != port_.wait(zx::time(0), packet.get())) { return; }
168         due_packet_.swap(packet);
169     } while (!tasks_are_due && due_packet_->key == kTimerExpirationKey);
170 }
171 
HasPendingWork()172 bool TestLoopDispatcher::HasPendingWork() {
173     if (GetNextTaskDueTime() <= Now()) { return true; }
174     if (!due_packet_) { ExtractNextDuePacket(); }
175     return !!due_packet_;
176 }
177 
DispatchNextDueTask()178 void TestLoopDispatcher::DispatchNextDueTask() {
179     // if something is already in the due list, dispatch that.
180     list_node_t* node = list_peek_head(&due_list_);
181     if (node) {
182         list_delete(node);
183         async_task_t* task = NodeToTask(node);
184         task->handler(this, task, ZX_OK);
185 
186         // If the due list is now empty and there are still pending tasks,
187         // register a timer for the next due time.
188         if (list_is_empty(&due_list_) && !list_is_empty(&task_list_)) {
189             time_keeper_->RegisterTimer(GetNextTaskDueTime(), this);
190         }
191     }
192 }
193 
DispatchNextDueMessage()194 bool TestLoopDispatcher::DispatchNextDueMessage() {
195     if (!list_is_empty(&due_list_)) {
196         DispatchNextDueTask();
197         return true;
198     }
199 
200     if (!due_packet_) { ExtractNextDuePacket(); }
201 
202     if (!due_packet_) {
203         return false;
204     } else if (due_packet_->key == kTimerExpirationKey) {
205         ExtractDueTasks();
206         DispatchNextDueTask();
207         due_packet_.reset();
208     } else {  // |due_packet_| encodes a finished wait.
209         // Move the next due packet to the stack, as invoking the associated
210         // wait's handler might try to extract another.
211         zx_port_packet_t packet = *due_packet_;
212         due_packet_.reset();
213         async_wait_t* wait = reinterpret_cast<async_wait_t*>(packet.key);
214         list_delete(WaitToNode(wait));
215         wait->handler(this, wait, ZX_OK, &packet.signal);
216     }
217     return true;
218 }
219 
ExtractDueTasks()220 void TestLoopDispatcher::ExtractDueTasks() {
221     list_node_t* node;
222     list_node_t* tail = nullptr;
223     zx::time current_time = time_keeper_->Now();
224     list_for_every(&task_list_, node) {
225         if (NodeToTask(node)->deadline > current_time.get()) { break; }
226         tail = node;
227     }
228     if (tail) {
229         list_node_t* head = task_list_.next;
230         task_list_.next = tail->next;
231         tail->next->prev = &task_list_;
232         due_list_.next = head;
233         head->prev = &due_list_;
234         due_list_.prev = tail;
235         tail->next = &due_list_;
236     }
237 }
238 
Shutdown()239 void TestLoopDispatcher::Shutdown() {
240     list_node_t* node;
241     while ((node = list_remove_head(&wait_list_))) {
242         async_wait_t* wait = NodeToWait(node);
243         wait->handler(this, wait, ZX_ERR_CANCELED, nullptr);
244     }
245     while ((node = list_remove_head(&due_list_))) {
246         async_task_t* task = NodeToTask(node);
247         task->handler(this, task, ZX_ERR_CANCELED);
248     }
249     while ((node = list_remove_head(&task_list_))) {
250         async_task_t* task = NodeToTask(node);
251         task->handler(this, task, ZX_ERR_CANCELED);
252     }
253 }
254 
255 } // namespace async
256