1 /*
2 * Copyright (c) 2015 Carlos Pizano-Uribe cpu@chromium.org
3 *
4 * Use of this source code is governed by a MIT-style
5 * license that can be found in the LICENSE file or at
6 * https://opensource.org/licenses/MIT
7 */
8
9 /**
10 * @file
11 * @brief Port object functions
12 * @defgroup event Events
13 *
14 */
15
16 #include <kernel/port.h>
17
18 #include <kernel/thread.h>
19 #include <lk/debug.h>
20 #include <lk/err.h>
21 #include <lk/list.h>
22 #include <lk/pow2.h>
23 #include <malloc.h>
24 #include <string.h>
25
26 // write ports can be in two states, open and closed, which have a
27 // different magic number.
28
29 #define WRITEPORT_MAGIC_W (0x70727477) // 'prtw'
30 #define WRITEPORT_MAGIC_X (0x70727478) // 'prtx'
31
32 #define READPORT_MAGIC (0x70727472) // 'prtr'
33 #define PORTGROUP_MAGIC (0x70727467) // 'prtg'
34
35 #define PORT_BUFF_SIZE 8
36 #define PORT_BUFF_SIZE_BIG 64
37
38 #define RESCHEDULE_POLICY 1
39
40 #define MAX_PORT_GROUP_COUNT 256
41
// Circular packet buffer. The slot count is always a power of two so
// head/tail can wrap cheaply with modpow2().
typedef struct {
    uint log2;               // log2 of the total number of slots.
    uint avail;              // number of free slots.
    uint head;               // index of the next packet to read.
    uint tail;               // index of the next free slot to write.
    port_packet_t packet[1]; // actually |1 << log2| entries; see make_buf().
} port_buf_t;
49
// A named write port. |buf| holds packets only while no reader is
// attached; the first port_open() transfers the buffer to the reader.
typedef struct {
    int magic;                // WRITEPORT_MAGIC_W (open) or WRITEPORT_MAGIC_X (closed).
    struct list_node node;    // entry in the global write_port_list.
    port_buf_t *buf;          // packet buffer; NULL once readers exist.
    struct list_node rp_list; // attached read ports, linked via read_port_t.w_node.
    port_mode_t mode;         // unicast/broadcast and big-buffer flags.
    char name[PORT_NAME_LEN];
} write_port_t;
58
// A group of read ports that can be waited on as a unit via port_read().
typedef struct {
    int magic;                // PORTGROUP_MAGIC.
    wait_queue_t wait;        // threads blocked on the whole group.
    struct list_node rp_list; // member read ports, linked via read_port_t.g_node.
} port_group_t;
64
// A read port attached to one write port and, optionally, one port group.
typedef struct {
    int magic;              // READPORT_MAGIC.
    struct list_node w_node; // entry in the write port's rp_list.
    struct list_node g_node; // entry in the group's rp_list (if grouped).
    port_buf_t *buf;        // this reader's packet buffer.
    void *ctx;              // opaque caller cookie, returned in port_result_t.
    wait_queue_t wait;      // threads blocked on this single port.
    write_port_t *wport;    // owning write port; NULL after it is destroyed.
    port_group_t *gport;    // owning group; NULL when not in a group.
} read_port_t;
75
76
// Global list of all named write ports, guarded by the thread lock.
static struct list_node write_port_list;
78
79
make_buf(bool big)80 static port_buf_t *make_buf(bool big) {
81 uint pk_count = big ? PORT_BUFF_SIZE_BIG : PORT_BUFF_SIZE;
82 uint size = sizeof(port_buf_t) + ((pk_count - 1) * sizeof(port_packet_t));
83 port_buf_t *buf = (port_buf_t *) malloc(size);
84 if (!buf)
85 return NULL;
86 buf->log2 = log2_uint(pk_count);
87 buf->head = buf->tail = 0;
88 buf->avail = pk_count;
89 return buf;
90 }
91
// True when every slot in the circular buffer is free.
static inline bool buf_is_empty(port_buf_t *buf) {
    const uint capacity = valpow2(buf->log2);
    return capacity == buf->avail;
}
95
buf_write(port_buf_t * buf,const port_packet_t * packets,size_t count)96 static status_t buf_write(port_buf_t *buf, const port_packet_t *packets, size_t count) {
97 if (buf->avail < count)
98 return ERR_NOT_ENOUGH_BUFFER;
99
100 for (size_t ix = 0; ix != count; ix++) {
101 buf->packet[buf->tail] = packets[ix];
102 buf->tail = modpow2(++buf->tail, buf->log2);
103 }
104 buf->avail -= count;
105 return NO_ERROR;
106 }
107
buf_read(port_buf_t * buf,port_result_t * pr)108 static status_t buf_read(port_buf_t *buf, port_result_t *pr) {
109 if (buf_is_empty(buf))
110 return ERR_NO_MSG;
111 pr->packet = buf->packet[buf->head];
112 buf->head = modpow2(++buf->head, buf->log2);
113 ++buf->avail;
114 return NO_ERROR;
115 }
116
// must be called before any use of ports.
// Initializes the global write-port registry; not thread safe, so call
// it once during kernel init before any other port_* function.
void port_init(void) {
    list_initialize(&write_port_list);
}
121
port_create(const char * name,port_mode_t mode,port_t * port)122 status_t port_create(const char *name, port_mode_t mode, port_t *port) {
123 if (!name || !port)
124 return ERR_INVALID_ARGS;
125
126 // only unicast ports can have a large buffer.
127 if (mode & PORT_MODE_BROADCAST) {
128 if (mode & PORT_MODE_BIG_BUFFER)
129 return ERR_INVALID_ARGS;
130 }
131
132 if (strlen(name) >= PORT_NAME_LEN)
133 return ERR_INVALID_ARGS;
134
135 // lookup for existing port, return that if found.
136 write_port_t *wp = NULL;
137 THREAD_LOCK(state1);
138 list_for_every_entry(&write_port_list, wp, write_port_t, node) {
139 if (strcmp(wp->name, name) == 0) {
140 // can't return closed ports.
141 if (wp->magic == WRITEPORT_MAGIC_X)
142 wp = NULL;
143 THREAD_UNLOCK(state1);
144 if (wp) {
145 *port = (void *) wp;
146 return ERR_ALREADY_EXISTS;
147 } else {
148 return ERR_BUSY;
149 }
150 }
151 }
152 THREAD_UNLOCK(state1);
153
154 // not found, create the write port and the circular buffer.
155 wp = calloc(1, sizeof(write_port_t));
156 if (!wp)
157 return ERR_NO_MEMORY;
158
159 wp->magic = WRITEPORT_MAGIC_W;
160 wp->mode = mode;
161 strlcpy(wp->name, name, sizeof(wp->name));
162 list_initialize(&wp->rp_list);
163
164 wp->buf = make_buf(mode & PORT_MODE_BIG_BUFFER);
165 if (!wp->buf) {
166 free(wp);
167 return ERR_NO_MEMORY;
168 }
169
170 // todo: race condtion! a port with the same name could have been created
171 // by another thread at is point.
172 THREAD_LOCK(state2);
173 list_add_tail(&write_port_list, &wp->node);
174 THREAD_UNLOCK(state2);
175
176 *port = (void *)wp;
177 return NO_ERROR;
178 }
179
// Attaches a new read port to the named write port. |ctx| is an opaque
// caller cookie returned in every port_result_t read from this port.
// The first reader takes over the write port's buffer (with any packets
// already written); later readers get a fresh small buffer.
status_t port_open(const char *name, void *ctx, port_t *port) {
    if (!name || !port)
        return ERR_INVALID_ARGS;

    // assume success; create the read port and buffer now.
    read_port_t *rp = calloc(1, sizeof(read_port_t));
    if (!rp)
        return ERR_NO_MEMORY;

    rp->magic = READPORT_MAGIC;
    wait_queue_init(&rp->wait);
    rp->ctx = ctx;

    // |buf| might not be needed, but we always allocate outside the lock.
    // this buffer is only needed for broadcast ports, but we don't know
    // that here.
    port_buf_t *buf = make_buf(false); // Small is enough.
    if (!buf) {
        free(rp);
        return ERR_NO_MEMORY;
    }

    // find the named write port and associate it with read port.
    status_t rc = ERR_NOT_FOUND;

    THREAD_LOCK(state);
    write_port_t *wp = NULL;
    list_for_every_entry(&write_port_list, wp, write_port_t, node) {
        if (strcmp(wp->name, name) == 0) {
            // found; add read port to write port list.
            rp->wport = wp;
            if (wp->buf) {
                // this is the first read port; transfer the circular buffer.
                list_add_tail(&wp->rp_list, &rp->w_node);
                rp->buf = wp->buf;
                wp->buf = NULL;
                rc = NO_ERROR;
            } else if (buf) {
                // not first read port.
                if (wp->mode & PORT_MODE_UNICAST) {
                    // cannot add a second listener.
                    rc = ERR_NOT_ALLOWED;
                    break;
                }
                // use the new (small) circular buffer.
                list_add_tail(&wp->rp_list, &rp->w_node);
                rp->buf = buf;
                buf = NULL;
                rc = NO_ERROR;
            } else {
                // |buf| allocation failed and the buffer was needed.
                // NOTE(review): unreachable as written — |buf| was
                // null-checked right after make_buf() above.
                rc = ERR_NO_MEMORY;
            }
            break;
        }
    }
    THREAD_UNLOCK(state);

    // the pre-allocated buffer wasn't consumed; release it.
    if (buf)
        free(buf);

    if (rc == NO_ERROR) {
        *port = (void *)rp;
    } else {
        free(rp);
    }
    return rc;
}
248
port_group(port_t * ports,size_t count,port_t * group)249 status_t port_group(port_t *ports, size_t count, port_t *group) {
250 if (count > MAX_PORT_GROUP_COUNT)
251 return ERR_TOO_BIG;
252
253 // Allow empty port groups.
254 if (count && !ports)
255 return ERR_INVALID_ARGS;
256
257 if (!group)
258 return ERR_INVALID_ARGS;
259
260 // assume success; create port group now.
261 port_group_t *pg = calloc(1, sizeof(port_group_t));
262 if (!pg)
263 return ERR_NO_MEMORY;
264
265 pg->magic = PORTGROUP_MAGIC;
266 wait_queue_init(&pg->wait);
267 list_initialize(&pg->rp_list);
268
269 status_t rc = NO_ERROR;
270
271 THREAD_LOCK(state);
272 for (size_t ix = 0; ix != count; ix++) {
273 read_port_t *rp = (read_port_t *)ports[ix];
274 if ((rp->magic != READPORT_MAGIC) || rp->gport) {
275 // wrong type of port, or port already part of a group,
276 // in any case, undo the changes to the previous read ports.
277 for (size_t jx = 0; jx != ix; jx++) {
278 ((read_port_t *)ports[jx])->gport = NULL;
279 }
280 rc = ERR_BAD_HANDLE;
281 break;
282 }
283 // link port group and read port.
284 rp->gport = pg;
285 list_add_tail(&pg->rp_list, &rp->g_node);
286 }
287 THREAD_UNLOCK(state);
288
289 if (rc == NO_ERROR) {
290 *group = (port_t *)pg;
291 } else {
292 free(pg);
293 }
294 return rc;
295 }
296
port_group_add(port_t group,port_t port)297 status_t port_group_add(port_t group, port_t port) {
298 if (!port || !group)
299 return ERR_INVALID_ARGS;
300
301 // Make sure the user has actually passed in a port group and a read-port.
302 port_group_t *pg = (port_group_t *)group;
303 if (pg->magic != PORTGROUP_MAGIC)
304 return ERR_INVALID_ARGS;
305
306 read_port_t *rp = (read_port_t *)port;
307 if (rp->magic != READPORT_MAGIC || rp->gport)
308 return ERR_BAD_HANDLE;
309
310 status_t rc = NO_ERROR;
311 THREAD_LOCK(state);
312
313 if (list_length(&pg->rp_list) == MAX_PORT_GROUP_COUNT) {
314 rc = ERR_TOO_BIG;
315 } else {
316 rp->gport = pg;
317 list_add_tail(&pg->rp_list, &rp->g_node);
318
319 // If the new read port being added has messages available, try to wake
320 // any readers that might be present.
321 if (!buf_is_empty(rp->buf)) {
322 wait_queue_wake_one(&pg->wait, false, NO_ERROR);
323 }
324 }
325
326 THREAD_UNLOCK(state);
327
328 return rc;
329 }
330
port_group_remove(port_t group,port_t port)331 status_t port_group_remove(port_t group, port_t port) {
332 if (!port || !group)
333 return ERR_INVALID_ARGS;
334
335 // Make sure the user has actually passed in a port group and a read-port.
336 port_group_t *pg = (port_group_t *)group;
337 if (pg->magic != PORTGROUP_MAGIC)
338 return ERR_INVALID_ARGS;
339
340 read_port_t *rp = (read_port_t *)port;
341 if (rp->magic != READPORT_MAGIC || rp->gport != pg)
342 return ERR_BAD_HANDLE;
343
344 THREAD_LOCK(state);
345
346 bool found = false;
347 read_port_t *current_rp;
348 list_for_every_entry(&pg->rp_list, current_rp, read_port_t, g_node) {
349 if (current_rp == rp) {
350 found = true;
351 }
352 }
353
354 if (!found)
355 return ERR_BAD_HANDLE;
356
357 list_delete(&rp->g_node);
358
359 THREAD_UNLOCK(state);
360
361 return NO_ERROR;
362 }
363
port_write(port_t port,const port_packet_t * pk,size_t count)364 status_t port_write(port_t port, const port_packet_t *pk, size_t count) {
365 if (!port || !pk)
366 return ERR_INVALID_ARGS;
367
368 write_port_t *wp = (write_port_t *)port;
369 THREAD_LOCK(state);
370 if (wp->magic != WRITEPORT_MAGIC_W) {
371 // wrong port type.
372 THREAD_UNLOCK(state);
373 return ERR_BAD_HANDLE;
374 }
375
376 status_t status = NO_ERROR;
377 int awake_count = 0;
378
379 if (wp->buf) {
380 // there are no read ports, just write to the buffer.
381 status = buf_write(wp->buf, pk, count);
382 } else {
383 // there are read ports. for each, write and attempt to wake a thread
384 // from the port group or from the read port itself.
385 read_port_t *rp;
386 list_for_every_entry(&wp->rp_list, rp, read_port_t, w_node) {
387 if (buf_write(rp->buf, pk, count) < 0) {
388 // buffer full.
389 status = ERR_PARTIAL_WRITE;
390 continue;
391 }
392
393 int awaken = 0;
394 if (rp->gport) {
395 awaken = wait_queue_wake_one(&rp->gport->wait, false, NO_ERROR);
396 }
397 if (!awaken) {
398 awaken = wait_queue_wake_one(&rp->wait, false, NO_ERROR);
399 }
400
401 awake_count += awaken;
402 }
403 }
404
405 THREAD_UNLOCK(state);
406
407 #if RESCHEDULE_POLICY
408 if (awake_count)
409 thread_yield();
410 #endif
411
412 return status;
413 }
414
read_no_lock(read_port_t * rp,lk_time_t timeout,port_result_t * result)415 static inline status_t read_no_lock(read_port_t *rp, lk_time_t timeout, port_result_t *result) {
416 status_t status = buf_read(rp->buf, result);
417 result->ctx = rp->ctx;
418
419 if (status != ERR_NO_MSG)
420 return status;
421
422 // early return allows compiler to elide the rest for the group read case.
423 if (!timeout)
424 return ERR_TIMED_OUT;
425
426 status_t wr = wait_queue_block(&rp->wait, timeout);
427 if (wr != NO_ERROR)
428 return wr;
429 // recursive tail call is usually optimized away with a goto.
430 return read_no_lock(rp, timeout, result);
431 }
432
// Reads one packet from either a single read port or a port group,
// blocking up to |timeout|. The handle type is dispatched on the magic.
// For a group, each member is polled in list order; if all are empty the
// caller blocks on the group wait queue and re-polls when woken.
status_t port_read(port_t port, lk_time_t timeout, port_result_t *result) {
    if (!port || !result)
        return ERR_INVALID_ARGS;

    status_t rc = ERR_GENERIC;
    read_port_t *rp = (read_port_t *)port;

    THREAD_LOCK(state);
    if (rp->magic == READPORT_MAGIC) {
        // dealing with a single port.
        rc = read_no_lock(rp, timeout, result);
    } else if (rp->magic == PORTGROUP_MAGIC) {
        // dealing with a port group.
        port_group_t *pg = (port_group_t *)port;
        do {
            // read each port with no timeout.
            // todo: this order is fixed, probably a bad thing.
            list_for_every_entry(&pg->rp_list, rp, read_port_t, g_node) {
                rc = read_no_lock(rp, 0, result);
                if (rc != ERR_TIMED_OUT)
                    goto read_exit;
            }
            // no data, block on the group waitqueue.
            rc = wait_queue_block(&pg->wait, timeout);
        } while (rc == NO_ERROR);
    } else {
        // wrong port type.
        rc = ERR_BAD_HANDLE;
    }

read_exit:
    THREAD_UNLOCK(state);
    return rc;
}
467
// Frees a write port. The port must already have been closed via
// port_close() (magic WRITEPORT_MAGIC_X). Any attached readers are woken
// with ERR_CANCELLED and detached; their read ports remain valid until
// they call port_close() themselves.
status_t port_destroy(port_t port) {
    if (!port)
        return ERR_INVALID_ARGS;

    write_port_t *wp = (write_port_t *) port;
    port_buf_t *buf = NULL;

    THREAD_LOCK(state);
    if (wp->magic != WRITEPORT_MAGIC_X) {
        // wrong port type (or not closed yet).
        THREAD_UNLOCK(state);
        return ERR_BAD_HANDLE;
    }
    // remove self from global named ports list.
    list_delete(&wp->node);

    if (wp->buf) {
        // we have no readers; free our buffer after dropping the lock.
        buf = wp->buf;
    } else {
        // for each reader:
        read_port_t *rp;
        list_for_every_entry(&wp->rp_list, rp, read_port_t, w_node) {
            // wake the read and group ports.
            wait_queue_wake_all(&rp->wait, false, ERR_CANCELLED);
            if (rp->gport) {
                wait_queue_wake_all(&rp->gport->wait, false, ERR_CANCELLED);
            }
            // remove self from reader ports.
            rp->wport = NULL;
        }
    }

    // invalidate the handle before freeing the memory.
    wp->magic = 0;
    THREAD_UNLOCK(state);

    free(buf);
    free(wp);
    return NO_ERROR;
}
508
// Closes any kind of port handle, dispatching on the magic:
//  - read port: detaches from its write port (handing the buffer back if
//    it was the last reader) and group, wakes waiters, and frees itself.
//  - port group: wakes waiters, detaches all members, and frees itself.
//  - write port: only marks it closed (WRITEPORT_MAGIC_X); reads still
//    drain, and port_destroy() does the actual free.
status_t port_close(port_t port) {
    if (!port)
        return ERR_INVALID_ARGS;

    read_port_t *rp = (read_port_t *) port;
    port_buf_t *buf = NULL;

    THREAD_LOCK(state);
    if (rp->magic == READPORT_MAGIC) {
        // dealing with a read port.
        if (rp->wport) {
            // remove self from write port list and reassign the bufer if last.
            list_delete(&rp->w_node);
            if (list_is_empty(&rp->wport->rp_list)) {
                // last reader: buffered packets go back to the write port.
                rp->wport->buf = rp->buf;
                rp->buf = NULL;
            } else {
                buf = rp->buf;
            }
        }
        if (rp->gport) {
            // remove self from port group list.
            list_delete(&rp->g_node);
        }
        // wake up waiters, the return code is ERR_OBJECT_DESTROYED.
        wait_queue_destroy(&rp->wait, true);
        rp->magic = 0;

    } else if (rp->magic == PORTGROUP_MAGIC) {
        // dealing with a port group.
        port_group_t *pg = (port_group_t *) port;
        // wake up waiters.
        wait_queue_destroy(&pg->wait, true);
        // remove self from reader ports.
        rp = NULL;
        list_for_every_entry(&pg->rp_list, rp, read_port_t, g_node) {
            rp->gport = NULL;
        }
        pg->magic = 0;

    } else if (rp->magic == WRITEPORT_MAGIC_W) {
        // dealing with a write port.
        write_port_t *wp = (write_port_t *) port;
        // mark it as closed. Now it can be read but not written to.
        wp->magic = WRITEPORT_MAGIC_X;
        THREAD_UNLOCK(state);
        return NO_ERROR;

    } else {
        THREAD_UNLOCK(state);
        return ERR_BAD_HANDLE;
    }

    THREAD_UNLOCK(state);

    // free(NULL) is a no-op, so |buf| may legitimately still be NULL here.
    free(buf);
    free(port);
    return NO_ERROR;
}
568
569