1 // Copyright 2016 The Fuchsia Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include <assert.h>
6 #include <errno.h>
7 #include <fcntl.h>
8 #include <stdbool.h>
9 #include <stdint.h>
10 #include <stdio.h>
11 #include <stdlib.h>
12 #include <string.h>
13 #include <threads.h>
14 #include <unistd.h>
15
16 #include <zircon/compiler.h>
17 #include <zircon/listnode.h>
18 #include <lib/zircon-internal/xorshiftrand.h>
19 #include <zircon/types.h>
20
21 #include "filesystems.h"
22 #include "misc.h"
23
// Result codes returned by a worker's work() callback and by do_work().
#define FAIL (-1)  // the operation failed
#define BUSY 0     // progress was made and more work remains
#define DONE 1     // nothing is left to do

// Size, in bytes, of the content buffer embedded in every worker.
#define FBUFSIZE 65536

// The buffer is also filled through a uint64_t view, so its size has to be
// a whole number of uint64_t elements.
static_assert((FBUFSIZE % sizeof(uint64_t)) == 0,
              "FBUFSIZE not multiple of uint64_t");
32
33 typedef struct worker worker_t;
34 // global environment variables
35 typedef struct env {
36 worker_t* all_workers;
37
38 list_node_t threads;
39 } env_t;
40
41 typedef struct worker {
42 env_t* env;
43
44 worker_t* next;
45 int (*work)(worker_t* w);
46
47 rand64_t rdata;
48 rand32_t rops;
49
50 int fd;
51 int status;
52 uint32_t flags;
53 uint32_t size;
54 uint32_t pos;
55
56 union {
57 uint8_t u8[FBUFSIZE];
58 uint64_t u64[FBUFSIZE / sizeof(uint64_t)];
59 };
60
61 char name[256];
62 } worker_t;
63 #define F_RAND_IOSIZE 1
64
65
66 bool worker_new(env_t* env, const char* where, const char* fn,
67 int (*work)(worker_t* w), uint32_t size, uint32_t flags);
68 int worker_writer(worker_t* w);
69 static bool init_environment(env_t* env);
70
71 typedef struct thread_list {
72 list_node_t node;
73 thrd_t t;
74 } thread_list_t;
75
// Performs one step of a worker's file I/O and advances w->pos by the number
// of bytes actually transferred.
//
// The file content is a pseudo-random stream generated from w->rdata one
// FBUFSIZE-sized buffer at a time. With do_read == false the next span of
// that stream is written to w->fd; with do_read == true the next span is
// read from w->fd and compared against the regenerated stream.
//
// Returns:
//   DONE  if w->pos already equals w->size on entry (no I/O is performed);
//   BUSY  after a transfer of at least one byte;
//   FAIL  on an I/O error, a transfer that makes no progress (including an
//         unexpected end of file while reading), or a content mismatch.
int worker_rw(worker_t* w, bool do_read) {
    if (w->pos == w->size) {
        return DONE;
    }

    // offset into buffer
    uint32_t off = w->pos % FBUFSIZE;

    // Generate the next buffer of content each time a buffer boundary is
    // reached.
    if (off == 0) {
        for (unsigned n = 0; n < (FBUFSIZE / sizeof(uint64_t)); n++) {
            w->u64[n] = rand64(&w->rdata);
        }
    }

    // data in buffer available to transfer
    uint32_t xfer = FBUFSIZE - off;

    // do not exceed our desired size
    if (xfer > (w->size - w->pos)) {
        xfer = w->size - w->pos;
    }

    // Optionally shorten the transfer to a pseudo-random length >= 3000.
    if ((w->flags & F_RAND_IOSIZE) && (xfer > 3000)) {
        xfer = 3000 + (rand32(&w->rops) % (xfer - 3000));
    }

    // xfer is always non-zero here, so a zero-byte result means no progress.
    // Retrying would loop forever, and at a buffer boundary it would also
    // regenerate the buffer and desynchronize the content stream.
    ssize_t r;
    if (do_read) {
        uint8_t buffer[FBUFSIZE];
        if ((r = read(w->fd, buffer, xfer)) < 0) {
            fprintf(stderr, "worker('%s') read failed @%u: %d\n",
                    w->name, w->pos, errno);
            return FAIL;
        }
        if (r == 0) {
            // The file is shorter than what was written.
            fprintf(stderr, "worker('%s') unexpected EOF @%u\n",
                    w->name, w->pos);
            return FAIL;
        }
        // A short read is fine: only the bytes actually read are compared.
        if (memcmp(buffer, w->u8 + off, (size_t)r)) {
            fprintf(stderr, "worker('%s') verify failed @%u\n",
                    w->name, w->pos);
            return FAIL;
        }
    } else {
        if ((r = write(w->fd, w->u8 + off, xfer)) < 0) {
            fprintf(stderr, "worker('%s') write failed @%u: %d\n",
                    w->name, w->pos, errno);
            return FAIL;
        }
        if (r == 0) {
            fprintf(stderr, "worker('%s') write made no progress @%u\n",
                    w->name, w->pos);
            return FAIL;
        }
    }

    // advance
    w->pos += (uint32_t)r;
    return BUSY;
}
128
// Verify-phase step function: reads back and checks the next span of the
// worker's file.
//
// Returns the result of worker_rw(): BUSY while data remains to be checked,
// DONE once the whole file has been verified, FAIL on error or mismatch.
int worker_verify(worker_t* w) {
    int r = worker_rw(w, true);
    if (r != BUSY) {
        // Neither do_work() nor do_threaded_work() runs a worker again after
        // DONE or FAIL, so release the descriptor in both cases rather than
        // leaking it when verification fails.
        close(w->fd);
    }
    return r;
}
136
// Write-phase step function: writes the next span of the worker's file.
//
// Once the whole file has been written, the file offset is rewound, the
// content generator is re-seeded with the same seed worker_new() used, and
// the worker is switched to worker_verify() so that later steps read the
// data back.
//
// Returns BUSY while writing (and on the step that switches to verifying),
// FAIL if the write or the rewind fails.
int worker_writer(worker_t* w) {
    int r = worker_rw(w, false);
    if (r == BUSY) {
        return BUSY;
    }

    if (r == DONE) {
        if (lseek(w->fd, 0, SEEK_SET) == 0) {
            // start at 0 and reset our data generator seed
            srand64(&w->rdata, w->name);
            w->pos = 0;
            w->work = worker_verify;
            return BUSY;
        }
        // Report before close() so errno still belongs to lseek().
        fprintf(stderr, "worker('%s') seek failed: %s\n",
                w->name, strerror(errno));
    }

    // The worker is not run again after a failure, so do not leak its
    // descriptor.
    close(w->fd);
    return FAIL;
}
153
// Allocates a worker, creates its file and pushes the worker at the head of
// env->all_workers.
//
//   env    environment that will own the worker
//   where  path prefix of the file
//   fn     file name, appended to |where|
//   work   initial step function
//   size   number of bytes the worker writes and verifies
//   flags  bitmask of F_* flags
//
// The file is created exclusively (O_CREAT | O_EXCL), so it must not exist
// yet. Returns true on success; on failure the assertion failure is reported
// and false is returned with nothing added to |env|.
bool worker_new(env_t* env, const char* where, const char* fn,
                int (*work)(worker_t* w), uint32_t size, uint32_t flags) {
    worker_t* w = calloc(1, sizeof(*w));
    ASSERT_NE(w, NULL, "");

    w->env = env;

    // The path doubles as the seed for both generators.
    snprintf(w->name, sizeof(w->name), "%s%s", where, fn);
    srand64(&w->rdata, w->name);
    srand32(&w->rops, w->name);
    w->size = size;
    w->work = work;
    w->flags = flags;

    // open() reports failure with a negative value; 0 is a valid descriptor.
    const int fd = open(w->name, O_RDWR | O_CREAT | O_EXCL, 0644);
    if (fd < 0) {
        fprintf(stderr, "worker('%s') open failed: %s\n",
                w->name, strerror(errno));
        // Do not leak the worker when its file cannot be created.
        free(w);
        // Known to fail here; used to record the failure with the test
        // framework.
        ASSERT_GT(fd, -1, "");
        return false;
    }
    w->fd = fd;

    w->next = env->all_workers;
    env->all_workers = w;

    return true;
}
175
// Runs one step of every worker in |env| whose status is still BUSY.
//
// A worker that finishes or fails has its file unlinked. Returns FAIL as
// soon as any worker fails (later workers are not stepped in that pass),
// BUSY if at least one worker was stepped, and DONE when no worker was
// BUSY anymore.
int do_work(env_t* env) {
    bool stepped_any = false;

    for (worker_t* w = env->all_workers; w != NULL; w = w->next) {
        w->env = env;
        if (w->status != BUSY) {
            continue;
        }
        stepped_any = true;

        w->status = w->work(w);
        if (w->status == FAIL) {
            EXPECT_EQ(unlink(w->name), 0, "");
            return FAIL;
        }
        if (w->status == DONE) {
            fprintf(stderr, "worker('%s') finished\n", w->name);
            EXPECT_EQ(unlink(w->name), 0, "");
        }
    }

    if (stepped_any) {
        return BUSY;
    }
    return DONE;
}
194
test_work_single_thread(void)195 bool test_work_single_thread(void) {
196 BEGIN_TEST;
197
198 env_t env;
199 init_environment(&env);
200
201 for (;;) {
202 int r = do_work(&env);
203 assert(r != FAIL);
204 if (r == DONE) {
205 break;
206 }
207 }
208
209 worker_t* w = env.all_workers;
210 worker_t* next;
211 while (w != NULL) {
212 next = w->next;
213 free(w);
214 w = next;
215 }
216 END_TEST;
217 }
218
219 #define KB(n) ((n)*1024)
220 #define MB(n) ((n)*1024 * 1024)
221
222 static struct {
223 int (*work)(worker_t*);
224 const char* name;
225 uint32_t size;
226 uint32_t flags;
227 } WORK[] = {
228 {
229 worker_writer, "file0000", KB(512), F_RAND_IOSIZE,
230 },
231 {
232 worker_writer, "file0001", MB(10), F_RAND_IOSIZE,
233 },
234 {
235 worker_writer, "file0002", KB(512), F_RAND_IOSIZE,
236 },
237 {
238 worker_writer, "file0003", KB(512), F_RAND_IOSIZE,
239 },
240 {
241 worker_writer, "file0004", KB(512), 0,
242 },
243 {
244 worker_writer, "file0005", MB(20), 0,
245 },
246 {
247 worker_writer, "file0006", KB(512), 0,
248 },
249 {
250 worker_writer, "file0007", KB(512), 0,
251 },
252 };
253
// Resets |env| to an empty state and creates one worker for every entry of
// the WORK table, each with its file under the "::" prefix.
//
// Returns true when all workers were created; a failing worker_new() trips
// the assertion below.
static bool init_environment(env_t* env) {
    // Tests run repeatedly, so start from a clean environment every time.
    env->all_workers = NULL;
    list_initialize(&env->threads);

    // Assemble the work.
    const char* where = "::";
    unsigned n = 0;
    while (n < countof(WORK)) {
        ASSERT_TRUE(worker_new(env, where, WORK[n].name, WORK[n].work,
                               WORK[n].size, WORK[n].flags), "");
        n++;
    }

    return true;
}
269
do_threaded_work(void * arg)270 static int do_threaded_work(void* arg) {
271 worker_t* w = arg;
272
273 fprintf(stderr, "work thread(%s) started\n", w->name);
274 while ((w->status = w->work(w)) == BUSY) {
275 thrd_yield();
276 }
277
278 fprintf(stderr, "work thread(%s) %s\n", w->name,
279 w->status == DONE ? "finished" : "failed");
280 EXPECT_EQ(unlink(w->name), 0, "");
281
282 zx_status_t status = w->status;
283 free(w);
284 return status;
285 }
286
test_work_concurrently(void)287 static bool test_work_concurrently(void) {
288 BEGIN_TEST;
289
290 env_t env;
291 ASSERT_TRUE(init_environment(&env), "");
292
293 for (worker_t* w = env.all_workers; w != NULL; w = w->next) {
294 // start the workers on separate threads
295 thrd_t t;
296 ASSERT_EQ(thrd_create(&t, do_threaded_work, w), thrd_success, "");
297 thread_list_t* thread = malloc(sizeof(thread_list_t));
298 ASSERT_NE(thread, NULL, "");
299 thread->t = t;
300 list_add_tail(&env.threads, &thread->node);
301 }
302
303 thread_list_t* next;
304 thread_list_t* tmp;
305 list_for_every_entry_safe(&env.threads, next, tmp, thread_list_t, node) {
306 int rc;
307 ASSERT_EQ(thrd_join(next->t, &rc), thrd_success, "");
308 ASSERT_EQ(rc, DONE, "Thread joined, but failed");
309 free(next);
310 }
311
312 END_TEST;
313 }
314
315 const test_disk_t disk = {
316 .block_count = 3 * (1LLU << 16),
317 .block_size = 1LLU << 9,
318 .slice_size = 1LLU << 23,
319 };
320
321 RUN_FOR_ALL_FILESYSTEMS_SIZE(rw_workers_test, disk,
322 RUN_TEST_MEDIUM(test_work_single_thread)
323 RUN_TEST_LARGE(test_work_concurrently)
324 )
325