1 // Copyright 2016 The Fuchsia Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include <assert.h>
6 #include <stdarg.h>
7 #include <stdatomic.h>
8 #include <stdbool.h>
9 #include <stdio.h>
10 #include <stdlib.h>
11 #include <string.h>
12 #include <threads.h>
13 #include <unistd.h>
14 
15 #include <zircon/syscalls.h>
16 #include <zircon/threads.h>
17 #include <unittest/unittest.h>
18 
19 #include <zircon/compiler.h>
20 
21 #define ASSERT_NOT_REACHED() \
22     assert(0)
23 
24 // We have to poll a thread's state as there is no way to wait for it to
25 // transition states. Wait this amount of time. Generally the thread won't
26 // take very long so this is a compromise between polling too frequently and
27 // waiting too long.
28 #define THREAD_BLOCKED_WAIT_DURATION ZX_MSEC(1)
29 
30 enum message {
31     MSG_EXIT,
32     MSG_EXITED,
33     MSG_WAIT_EVENT,
34     MSG_WAIT_EVENT_SIGNALED,
35     MSG_WAIT_EVENT_CANCELLED,
36     MSG_PING,
37     MSG_PONG,
38     MSG_READ_CANCELLED,
39 };
40 
41 enum wait_result {
42     WAIT_READABLE,
43     WAIT_SIGNALED,
44     WAIT_CLOSED,
45     WAIT_CANCELLED,
46 };
47 
48 typedef struct thread_data {
49     int thread_num;
50     zx_handle_t channel;
51 } thread_data_t;
52 
53 typedef struct wait_data {
54     zx_handle_t handle;
55     zx_handle_t signals;
56     zx_duration_t timeout;
57     zx_status_t status;
58 } wait_data_t;
59 
60 // [0] is used by main thread
61 // [1] is used by worker thread
62 static zx_handle_t thread1_channel[2];
63 static zx_handle_t thread2_channel[2];
64 
65 static atomic_int in_wait_event = ATOMIC_VAR_INIT(0);
66 static zx_handle_t event_handle;
67 
68 // Wait until |handle| is readable or peer is closed (or wait is cancelled).
69 
wait_readable(zx_handle_t handle,enum wait_result * result)70 static bool wait_readable(zx_handle_t handle, enum wait_result* result) {
71     zx_signals_t pending;
72     zx_signals_t signals = ZX_CHANNEL_READABLE | ZX_CHANNEL_PEER_CLOSED;
73     zx_time_t deadline = ZX_TIME_INFINITE;
74     zx_status_t status = zx_object_wait_one(handle, signals, deadline, &pending);
75     if (status == ZX_ERR_CANCELED) {
76         *result = WAIT_CANCELLED;
77         return true;
78     }
79     ASSERT_GE(status, 0, "handle wait one failed");
80     if ((pending & ZX_CHANNEL_READABLE) != 0) {
81         *result = WAIT_READABLE;
82         return true;
83     }
84     unittest_printf("wait_readable: peer closed\n");
85     *result = WAIT_CLOSED;
86     return true;
87 }
88 
89 // N.B. This must use zx_object_wait_one.
90 // See wait_thread_blocked_in_wait_event.
wait_event_worker(zx_handle_t handle,enum wait_result * result)91 static bool wait_event_worker(zx_handle_t handle, enum wait_result* result) {
92     zx_signals_t pending;
93     zx_signals_t signals = ZX_EVENT_SIGNALED;
94     zx_time_t deadline = ZX_TIME_INFINITE;
95     zx_status_t status = zx_object_wait_one(handle, signals, deadline, &pending);
96     if (status == ZX_ERR_CANCELED) {
97         *result = WAIT_CANCELLED;
98         return true;
99     }
100     ASSERT_GE(status, 0, "handle wait one failed");
101     ASSERT_NE(pending & ZX_EVENT_SIGNALED, 0u,
102               "unexpected return in wait_signaled");
103     *result = WAIT_SIGNALED;
104     return true;
105 }
106 
wait_event(enum wait_result * result)107 static bool wait_event(enum wait_result* result) {
108     atomic_store(&in_wait_event, 1);
109     bool pass = wait_event_worker(event_handle, result);
110     atomic_store(&in_wait_event, 0);
111     return pass;
112 }
113 
114 // Wait for |thread| to be blocked inside wait_event().
115 // We wait forever and let Unittest's watchdog handle errors.
116 // Returns true if |thread| successfully enters the blocked state,
117 // false if there's an error somewhere.
118 // N.B. We assume wait_event() uses zx_object_wait_one.
wait_thread_blocked_in_wait_event(zx_handle_t thread)119 static bool wait_thread_blocked_in_wait_event(zx_handle_t thread) {
120     while (true) {
121         if (atomic_load(&in_wait_event)) {
122             zx_info_thread_t info;
123             ASSERT_EQ(zx_object_get_info(thread, ZX_INFO_THREAD, &info, sizeof(info), NULL, NULL),
124                       ZX_OK, "");
125             if (info.state == ZX_THREAD_STATE_BLOCKED_WAIT_ONE)
126                 break;
127         }
128         zx_nanosleep(zx_deadline_after(THREAD_BLOCKED_WAIT_DURATION));
129     }
130 
131     return true;
132 }
133 
channel_create(zx_handle_t * handle0,zx_handle_t * handle1)134 static zx_status_t channel_create(zx_handle_t* handle0, zx_handle_t* handle1) {
135     return zx_channel_create(0, handle0, handle1);
136 }
137 
send_msg(zx_handle_t handle,enum message msg)138 static bool send_msg(zx_handle_t handle, enum message msg) {
139     uint64_t data = msg;
140     unittest_printf("sending message %d on handle %u\n", msg, handle);
141     zx_status_t status =
142         zx_channel_write(handle, 0, &data, sizeof(data), NULL, 0);
143     ASSERT_GE(status, 0, "message write failed");
144     return true;
145 }
146 
recv_msg(zx_handle_t handle,enum message * msg)147 static bool recv_msg(zx_handle_t handle, enum message* msg) {
148     uint64_t data;
149 
150     unittest_printf("waiting for message on handle %u\n", handle);
151     enum wait_result result;
152     ASSERT_TRUE(wait_readable(handle, &result), "Error during waiting for read call");
153     ASSERT_NE(result, (enum wait_result)WAIT_CLOSED, "peer closed while trying to read message");
154     switch (result) {
155     case WAIT_READABLE:
156         break;
157     case WAIT_CANCELLED:
158         unittest_printf("read wait cancelled\n");
159         *msg = MSG_READ_CANCELLED;
160         return true;
161     default:
162         ASSERT_TRUE(false, "Invalid read-wait status");
163     }
164 
165     uint32_t num_bytes = sizeof(data);
166 
167     ASSERT_GE(zx_channel_read(handle, 0, &data, NULL, num_bytes, 0, &num_bytes, NULL), 0,
168               "Error while reading message");
169     EXPECT_EQ(num_bytes, sizeof(data), "unexpected message size");
170     if (num_bytes != sizeof(data)) {
171         zx_thread_exit();
172     }
173     *msg = (enum message)data;
174     unittest_printf("received message %d\n", *msg);
175     return true;
176 }
177 
msg_loop(zx_handle_t channel)178 static bool msg_loop(zx_handle_t channel) {
179     bool my_done_tests = false;
180     while (!my_done_tests) {
181         enum message msg;
182         enum wait_result result;
183         ASSERT_TRUE(recv_msg(channel, &msg), "Error while receiving msg");
184         switch (msg) {
185         case MSG_EXIT:
186             my_done_tests = true;
187             break;
188         case MSG_PING:
189             send_msg(channel, MSG_PONG);
190             break;
191         case MSG_WAIT_EVENT:
192             ASSERT_TRUE(wait_event(&result), "Error during wait signal call");
193             switch (result) {
194             case WAIT_SIGNALED:
195                 send_msg(channel, MSG_WAIT_EVENT_SIGNALED);
196                 break;
197             case WAIT_CANCELLED:
198                 send_msg(channel, MSG_WAIT_EVENT_CANCELLED);
199                 break;
200             default:
201                 ASSERT_TRUE(false, "Invalid wait signal");
202             }
203             break;
204         default:
205             unittest_printf("unknown message received: %d", msg);
206             break;
207         }
208     }
209     return true;
210 }
211 
worker_thread_func(void * arg)212 static int worker_thread_func(void* arg) {
213     thread_data_t* data = arg;
214     msg_loop(data->channel);
215     unittest_printf("thread %d exiting\n", data->thread_num);
216     send_msg(data->channel, MSG_EXITED);
217     return 0;
218 }
219 
220 
wait_thread_func(void * arg)221 static int wait_thread_func(void* arg) {
222     wait_data_t* data = arg;
223     zx_signals_t observed;
224     data->status = zx_object_wait_one(data->handle, data->signals, zx_deadline_after(data->timeout),
225                                       &observed);
226     return 0;
227 }
228 
handle_wait_test(void)229 bool handle_wait_test(void) {
230     BEGIN_TEST;
231 
232     ASSERT_GE(channel_create(&thread1_channel[0], &thread1_channel[1]), 0, "channel creation failed");
233     ASSERT_GE(channel_create(&thread2_channel[0], &thread2_channel[1]), 0, "channel creation failed");
234 
235     thread_data_t thread1_data = {1, thread1_channel[1]};
236     thread_data_t thread2_data = {2, thread2_channel[1]};
237 
238     thrd_t thread1;
239     ASSERT_EQ(thrd_create(&thread1, worker_thread_func, &thread1_data), thrd_success,
240               "thread creation failed");
241     thrd_t thread2;
242     ASSERT_EQ(thrd_create(&thread2, worker_thread_func, &thread2_data), thrd_success,
243               "thread creation failed");
244     unittest_printf("threads started\n");
245 
246     event_handle = ZX_HANDLE_INVALID;
247     ASSERT_EQ(zx_event_create(0u, &event_handle), 0, "");
248     ASSERT_NE(event_handle, ZX_HANDLE_INVALID, "event creation failed");
249 
250     enum message msg;
251     send_msg(thread1_channel[0], MSG_PING);
252     ASSERT_TRUE(recv_msg(thread1_channel[0], &msg), "Error while receiving msg");
253     EXPECT_EQ(msg, (enum message)MSG_PONG, "unexpected reply to ping1");
254 
255     send_msg(thread1_channel[0], MSG_WAIT_EVENT);
256 
257     send_msg(thread2_channel[0], MSG_PING);
258     ASSERT_TRUE(recv_msg(thread2_channel[0], &msg), "Error while receiving msg");
259     EXPECT_EQ(msg, (enum message)MSG_PONG, "unexpected reply to ping2");
260 
261     // Verify thread 1 is woken up when we close the handle it's waiting on
262     // when there exists a duplicate of the handle.
263     // But first make sure the thread is waiting on |event_handle| before we
264     // close it.
265     zx_handle_t thread1_handle = thrd_get_zx_handle(thread1);
266     ASSERT_TRUE(wait_thread_blocked_in_wait_event(thread1_handle), "");
267 
268     zx_handle_t event_handle_dup = ZX_HANDLE_INVALID;
269     zx_status_t status = zx_handle_duplicate(event_handle, ZX_RIGHT_SAME_RIGHTS, &event_handle_dup);
270     ASSERT_EQ(status, ZX_OK, "");
271     ASSERT_NE(event_handle_dup, ZX_HANDLE_INVALID, "handle duplication failed");
272     ASSERT_EQ(zx_handle_close(event_handle), ZX_OK, "handle close failed");
273 
274     ASSERT_TRUE(recv_msg(thread1_channel[0], &msg), "Error while receiving msg");
275     ASSERT_EQ(msg, (enum message)MSG_WAIT_EVENT_CANCELLED,
276               "unexpected reply from thread1 (wait for event)");
277 
278     send_msg(thread1_channel[0], MSG_EXIT);
279     send_msg(thread2_channel[0], MSG_EXIT);
280     EXPECT_EQ(thrd_join(thread1, NULL), thrd_success, "failed to join thread");
281     EXPECT_EQ(thrd_join(thread2, NULL), thrd_success, "failed to join thread");
282     EXPECT_EQ(zx_handle_close(event_handle_dup), ZX_OK, "handle close failed");
283     END_TEST;
284 }
285 
286 BEGIN_TEST_CASE(handle_wait_tests)
287 RUN_TEST(handle_wait_test);
END_TEST_CASE(handle_wait_tests)288 END_TEST_CASE(handle_wait_tests)
289 
290 #ifndef BUILD_COMBINED_TESTS
291 int main(int argc, char** argv) {
292     bool success = unittest_run_all_tests(argc, argv);
293     return success ? 0 : -1;
294 }
295 #endif
296