1 // Copyright 2016 The Fuchsia Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include <assert.h>
6 #include <errno.h>
7 #include <fcntl.h>
8 #include <stdbool.h>
9 #include <stdint.h>
10 #include <stdio.h>
11 #include <stdlib.h>
12 #include <string.h>
13 #include <threads.h>
14 #include <unistd.h>
15 
16 #include <zircon/compiler.h>
17 #include <zircon/listnode.h>
18 #include <lib/zircon-internal/xorshiftrand.h>
19 #include <zircon/types.h>
20 
21 #include "filesystems.h"
22 #include "misc.h"
23 
// Result codes returned by the worker step functions and by do_work().
#define FAIL -1  // the step hit an error
#define BUSY 0   // more work remains; call again
#define DONE 1   // all work is complete

// Size in bytes of each worker's content buffer.
#define FBUFSIZE 65536

// The buffer is also accessed as an array of uint64_t, so it must hold a
// whole number of them.
static_assert(FBUFSIZE == ((FBUFSIZE / sizeof(uint64_t)) * sizeof(uint64_t)),
              "FBUFSIZE not multiple of uint64_t");
32 
33 typedef struct worker worker_t;
34 // global environment variables
35 typedef struct env {
36     worker_t* all_workers;
37 
38     list_node_t threads;
39 } env_t;
40 
41 typedef struct worker {
42     env_t* env;
43 
44     worker_t* next;
45     int (*work)(worker_t* w);
46 
47     rand64_t rdata;
48     rand32_t rops;
49 
50     int fd;
51     int status;
52     uint32_t flags;
53     uint32_t size;
54     uint32_t pos;
55 
56     union {
57         uint8_t u8[FBUFSIZE];
58         uint64_t u64[FBUFSIZE / sizeof(uint64_t)];
59     };
60 
61     char name[256];
62 } worker_t;
63 #define F_RAND_IOSIZE 1
64 
65 
// Forward declarations for functions that are referenced before they are
// defined.

// Creates a worker for the file named by `where` followed by `fn` and links it
// into env.
bool worker_new(env_t* env, const char* where, const char* fn,
                int (*work)(worker_t* w), uint32_t size, uint32_t flags);
// Step function for the write pass of a worker.
int worker_writer(worker_t* w);
// Resets env and creates one worker per WORK entry.
static bool init_environment(env_t* env);
70 
71 typedef struct thread_list {
72     list_node_t node;
73     thrd_t t;
74 } thread_list_t;
75 
// Performs one step of I/O for worker w.
//
// When do_read is false the next chunk of generated data is written to w->fd;
// when do_read is true the next chunk is read from w->fd and compared against
// the regenerated data.
//
// Returns DONE once w->pos has reached w->size, FAIL on an I/O error, an
// unexpected end of file, or a verification mismatch, and BUSY otherwise (the
// caller is expected to call again).
int worker_rw(worker_t* w, bool do_read) {
    if (w->pos == w->size) {
        return DONE;
    }

    // offset into buffer
    uint32_t off = w->pos % FBUFSIZE;

    // Regenerate the content buffer whenever the position sits on a buffer
    // boundary; in between, the previously generated contents are reused.
    if (off == 0) {
        for (unsigned n = 0; n < (FBUFSIZE / sizeof(uint64_t)); n++) {
            w->u64[n] = rand64(&w->rdata);
        }
    }

    // data in buffer available to transfer
    uint32_t xfer = FBUFSIZE - off;

    // do not exceed our desired size
    if (xfer > (w->size - w->pos)) {
        xfer = w->size - w->pos;
    }

    // Optionally shrink the transfer to a random size in [3000, xfer).
    if ((w->flags & F_RAND_IOSIZE) && (xfer > 3000)) {
        xfer = 3000 + (rand32(&w->rops) % (xfer - 3000));
    }

    ssize_t r;
    if (do_read) {
        uint8_t buffer[FBUFSIZE];
        r = read(w->fd, buffer, xfer);
        if (r < 0) {
            fprintf(stderr, "worker('%s') read failed @%u: %d\n",
                    w->name, w->pos, errno);
            return FAIL;
        }
        // xfer is never zero here, so a zero-byte read means the file ended
        // early. Without this check pos would never advance and the caller
        // would spin forever on BUSY.
        if (r == 0) {
            fprintf(stderr, "worker('%s') unexpected EOF @%u\n",
                    w->name, w->pos);
            return FAIL;
        }
        // A short read is fine: only the bytes actually read are compared.
        if (memcmp(buffer, w->u8 + off, (size_t)r)) {
            fprintf(stderr, "worker('%s') verify failed @%u\n",
                    w->name, w->pos);
            return FAIL;
        }
    } else {
        r = write(w->fd, w->u8 + off, xfer);
        if (r < 0) {
            fprintf(stderr, "worker('%s') write failed @%u: %d\n",
                    w->name, w->pos, errno);
            return FAIL;
        }
    }

    // advance by however much was actually transferred
    w->pos += (uint32_t)r;
    return BUSY;
}
128 
// Step function for the verification pass: runs one read-and-compare step and
// closes the file once the whole file has been checked.
int worker_verify(worker_t* w) {
    const int result = worker_rw(w, true);
    if (result != DONE) {
        return result;
    }
    close(w->fd);
    return DONE;
}
136 
// Step function for the write pass: runs one write step and, once everything
// has been written, rewinds the file and switches the worker over to
// worker_verify.
//
// Returns BUSY while writing and right after the switch-over, FAIL if writing
// or rewinding failed.
int worker_writer(worker_t* w) {
    const int result = worker_rw(w, false);
    if (result != DONE) {
        return result;
    }

    // Everything is written: go back to the start of the file.
    const off_t where = lseek(w->fd, 0, SEEK_SET);
    if (where != 0) {
        fprintf(stderr, "worker('%s') seek failed: %s\n",
                w->name, strerror(errno));
        return FAIL;
    }

    // Restart the data generator from its original seed so that the
    // verification pass regenerates exactly the stream that was written.
    w->work = worker_verify;
    w->pos = 0;
    srand64(&w->rdata, w->name);
    return BUSY;
}
153 
// Allocates a worker, creates its backing file and pushes the worker onto the
// front of env->all_workers.
//
// where: path prefix, prepended to fn to form the file name.
// fn:    file name; the combined path also seeds both random generators.
// work:  initial step function of the worker.
// size:  number of bytes the worker will write and then verify.
// flags: F_* option bits.
//
// Returns true on success. On failure the unittest assertion records the
// error and the function returns false without leaking the worker.
bool worker_new(env_t* env, const char* where, const char* fn,
                int (*work)(worker_t* w), uint32_t size, uint32_t flags) {
    worker_t* w = calloc(1, sizeof(*w));
    ASSERT_NE(w, NULL, "");

    w->env = env;

    snprintf(w->name, sizeof(w->name), "%s%s", where, fn);
    srand64(&w->rdata, w->name);
    srand32(&w->rops, w->name);
    w->size = size;
    w->work = work;
    w->flags = flags;

    // O_EXCL: the file must not exist yet. Any non-negative descriptor is
    // valid, including 0.
    const int fd = open(w->name, O_RDWR | O_CREAT | O_EXCL, 0644);
    if (fd < 0) {
        // The assertion below returns early, so release the worker first.
        free(w);
    }
    ASSERT_GT(fd, -1, "");
    w->fd = fd;

    w->next = env->all_workers;
    env->all_workers = w;

    return true;
}
175 
// Gives every worker that is still busy one step of work.
//
// Returns FAIL as soon as any worker fails, BUSY if at least one worker was
// stepped during this call, and DONE if no worker was busy any more. The file
// of a worker is unlinked when that worker fails or finishes.
int do_work(env_t* env) {
    uint32_t stepped = 0;

    for (worker_t* w = env->all_workers; w != NULL; w = w->next) {
        w->env = env;
        if (w->status != BUSY) {
            continue;
        }

        stepped++;
        w->status = w->work(w);
        switch (w->status) {
        case FAIL:
            EXPECT_EQ(unlink(w->name), 0, "");
            return FAIL;
        case DONE:
            fprintf(stderr, "worker('%s') finished\n", w->name);
            EXPECT_EQ(unlink(w->name), 0, "");
            break;
        default:
            break;
        }
    }

    if (stepped != 0) {
        return BUSY;
    }
    return DONE;
}
194 
test_work_single_thread(void)195 bool test_work_single_thread(void) {
196     BEGIN_TEST;
197 
198     env_t env;
199     init_environment(&env);
200 
201     for (;;) {
202         int r = do_work(&env);
203         assert(r != FAIL);
204         if (r == DONE) {
205             break;
206         }
207     }
208 
209     worker_t* w = env.all_workers;
210     worker_t* next;
211     while (w != NULL) {
212         next = w->next;
213         free(w);
214         w = next;
215     }
216     END_TEST;
217 }
218 
219 #define KB(n) ((n)*1024)
220 #define MB(n) ((n)*1024 * 1024)
221 
222 static struct {
223     int (*work)(worker_t*);
224     const char* name;
225     uint32_t size;
226     uint32_t flags;
227 } WORK[] = {
228     {
229         worker_writer, "file0000", KB(512), F_RAND_IOSIZE,
230     },
231     {
232         worker_writer, "file0001", MB(10), F_RAND_IOSIZE,
233     },
234     {
235         worker_writer, "file0002", KB(512), F_RAND_IOSIZE,
236     },
237     {
238         worker_writer, "file0003", KB(512), F_RAND_IOSIZE,
239     },
240     {
241         worker_writer, "file0004", KB(512), 0,
242     },
243     {
244         worker_writer, "file0005", MB(20), 0,
245     },
246     {
247         worker_writer, "file0006", KB(512), 0,
248     },
249     {
250         worker_writer, "file0007", KB(512), 0,
251     },
252 };
253 
// Puts env into its initial state and creates one worker for every entry of
// the WORK table.
//
// Returns true on success; returns false (through the failing assertion) as
// soon as a worker cannot be created.
static bool init_environment(env_t* env) {
    // The tests run repeatedly, so start from a clean slate every time.
    list_initialize(&env->threads);
    env->all_workers = NULL;

    // Prefix put in front of every file name.
    const char* where = "::";

    unsigned n = 0;
    while (n < countof(WORK)) {
        ASSERT_TRUE(worker_new(env, where, WORK[n].name, WORK[n].work,
                               WORK[n].size, WORK[n].flags), "");
        n++;
    }
    return true;
}
269 
do_threaded_work(void * arg)270 static int do_threaded_work(void* arg) {
271     worker_t* w = arg;
272 
273     fprintf(stderr, "work thread(%s) started\n", w->name);
274     while ((w->status = w->work(w)) == BUSY) {
275         thrd_yield();
276     }
277 
278     fprintf(stderr, "work thread(%s) %s\n", w->name,
279             w->status == DONE ? "finished" : "failed");
280     EXPECT_EQ(unlink(w->name), 0, "");
281 
282     zx_status_t status = w->status;
283     free(w);
284     return status;
285 }
286 
test_work_concurrently(void)287 static bool test_work_concurrently(void) {
288     BEGIN_TEST;
289 
290     env_t env;
291     ASSERT_TRUE(init_environment(&env), "");
292 
293     for (worker_t* w = env.all_workers; w != NULL; w = w->next) {
294         // start the workers on separate threads
295         thrd_t t;
296         ASSERT_EQ(thrd_create(&t, do_threaded_work, w), thrd_success, "");
297         thread_list_t* thread = malloc(sizeof(thread_list_t));
298         ASSERT_NE(thread, NULL, "");
299         thread->t = t;
300         list_add_tail(&env.threads, &thread->node);
301     }
302 
303     thread_list_t* next;
304     thread_list_t* tmp;
305     list_for_every_entry_safe(&env.threads, next, tmp, thread_list_t, node) {
306         int rc;
307         ASSERT_EQ(thrd_join(next->t, &rc), thrd_success, "");
308         ASSERT_EQ(rc, DONE, "Thread joined, but failed");
309         free(next);
310     }
311 
312     END_TEST;
313 }
314 
// Disk geometry handed to the test registration below:
// 3 * 2^16 blocks of 2^9 (512) bytes each, i.e. 96 MiB in total, with a slice
// size of 2^23 bytes (8 MiB).
// NOTE(review): how slice_size is interpreted depends on test_disk_t, which is
// declared outside this file — confirm there.
const test_disk_t disk = {
    .block_count = 3 * (1LLU << 16),
    .block_size = 1LLU << 9,
    .slice_size = 1LLU << 23,
};
320 
// Registers both tests under the rw_workers_test case and passes `disk` along.
// NOTE(review): the RUN_* macros are defined outside this file; presumably the
// case is run once per supported filesystem — confirm in filesystems.h.
RUN_FOR_ALL_FILESYSTEMS_SIZE(rw_workers_test, disk,
    RUN_TEST_MEDIUM(test_work_single_thread)
    RUN_TEST_LARGE(test_work_concurrently)
)
325