1 /*
2  * Copyright (c) 2015 Carlos Pizano-Uribe  cpu@chromium.org
3  *
4  * Use of this source code is governed by a MIT-style
5  * license that can be found in the LICENSE file or at
6  * https://opensource.org/licenses/MIT
7  */
8 
/**
 * @file
 * @brief  Port object functions
 * @defgroup port Ports
 *
 */
15 
16 #include <kernel/port.h>
17 
18 #include <kernel/thread.h>
19 #include <lk/debug.h>
20 #include <lk/err.h>
21 #include <lk/list.h>
22 #include <lk/pow2.h>
23 #include <malloc.h>
24 #include <string.h>
25 
// write ports can be in two states, open and closed, which have a
// different magic number. port_create() stamps the open value and
// port_close() replaces it with the closed one.

#define WRITEPORT_MAGIC_W (0x70727477) // 'prtw'
#define WRITEPORT_MAGIC_X (0x70727478) // 'prtx'

#define READPORT_MAGIC  (0x70727472)  // 'prtr'
#define PORTGROUP_MAGIC (0x70727467)  // 'prtg'

// number of packets held by a regular circular buffer and by the one made
// for PORT_MODE_BIG_BUFFER ports. make_buf() stores log2 of the count, so
// both are presumably required to be powers of two.
#define PORT_BUFF_SIZE      8
#define PORT_BUFF_SIZE_BIG 64

// when non-zero, port_write() yields after waking at least one reader.
#define RESCHEDULE_POLICY 1

// upper bound on the number of read ports in a single port group.
#define MAX_PORT_GROUP_COUNT 256
41 
// circular buffer of packets shared by the write and read side of a port.
typedef struct {
    // log2 of the buffer capacity, in packets; used to wrap the indices.
    uint log2;
    // number of free slots; equals the capacity when the buffer is empty.
    uint avail;
    // index of the next packet to be read.
    uint head;
    // index of the next slot to be written.
    uint tail;
    // first packet slot; make_buf() over-allocates the struct so that the
    // remaining slots follow this one in memory.
    port_packet_t packet[1];
} port_buf_t;
49 
// the named, writable end of a port.
typedef struct {
    // WRITEPORT_MAGIC_W while open, WRITEPORT_MAGIC_X once closed, and 0
    // right before the object is freed by port_destroy().
    int magic;
    // link in the global |write_port_list|.
    struct list_node node;
    // packets queued while no read port is attached; handed to the first
    // read port by port_open() and NULL for as long as readers exist.
    port_buf_t *buf;
    // read ports attached to this write port, linked through their w_node.
    struct list_node rp_list;
    // mode flags given to port_create().
    port_mode_t mode;
    // NUL-terminated name used for lookup by port_create() and port_open().
    char name[PORT_NAME_LEN];
} write_port_t;
58 
// a set of read ports that can be waited on together through port_read().
typedef struct {
    // PORTGROUP_MAGIC while alive; zeroed by port_close().
    int magic;
    // threads blocked in port_read() on the group.
    wait_queue_t wait;
    // member read ports, linked through their g_node.
    struct list_node rp_list;
} port_group_t;
64 
// the readable end of a port, created by port_open().
typedef struct {
    // READPORT_MAGIC while alive; zeroed by port_close().
    int magic;
    // link in the owning write port's rp_list.
    struct list_node w_node;
    // link in the owning port group's rp_list, when part of a group.
    struct list_node g_node;
    // circular buffer that port_write() fills and port_read() drains.
    port_buf_t *buf;
    // opaque value given to port_open(); copied into every read result.
    void *ctx;
    // threads blocked in port_read() on this port.
    wait_queue_t wait;
    // write port this reader is attached to; set to NULL by port_destroy().
    write_port_t *wport;
    // port group this reader belongs to, or NULL when it is in no group.
    port_group_t *gport;
} read_port_t;
75 
76 
// every write port that was created and not yet destroyed, linked through
// write_port_t::node. set up by port_init(); afterwards it is walked and
// modified between THREAD_LOCK/THREAD_UNLOCK.
static struct list_node write_port_list;
78 
79 
make_buf(bool big)80 static port_buf_t *make_buf(bool big) {
81     uint pk_count = big ? PORT_BUFF_SIZE_BIG : PORT_BUFF_SIZE;
82     uint size = sizeof(port_buf_t) + ((pk_count - 1) * sizeof(port_packet_t));
83     port_buf_t *buf = (port_buf_t *) malloc(size);
84     if (!buf)
85         return NULL;
86     buf->log2 = log2_uint(pk_count);
87     buf->head = buf->tail = 0;
88     buf->avail = pk_count;
89     return buf;
90 }
91 
// true when every slot of |buf| is free, i.e. there is nothing to read.
static inline bool buf_is_empty(port_buf_t *buf) {
    // the buffer holds unread packets whenever fewer slots are available
    // than its full capacity.
    const bool has_unread = (buf->avail != valpow2(buf->log2));
    return !has_unread;
}
95 
buf_write(port_buf_t * buf,const port_packet_t * packets,size_t count)96 static status_t buf_write(port_buf_t *buf, const port_packet_t *packets, size_t count) {
97     if (buf->avail < count)
98         return ERR_NOT_ENOUGH_BUFFER;
99 
100     for (size_t ix = 0; ix != count; ix++) {
101         buf->packet[buf->tail] = packets[ix];
102         buf->tail = modpow2(++buf->tail, buf->log2);
103     }
104     buf->avail -= count;
105     return NO_ERROR;
106 }
107 
buf_read(port_buf_t * buf,port_result_t * pr)108 static status_t buf_read(port_buf_t *buf, port_result_t *pr) {
109     if (buf_is_empty(buf))
110         return ERR_NO_MSG;
111     pr->packet = buf->packet[buf->head];
112     buf->head = modpow2(++buf->head, buf->log2);
113     ++buf->avail;
114     return NO_ERROR;
115 }
116 
// must be called before any use of ports: it initializes the global list
// of write ports that port_create() and port_open() walk.
void port_init(void) {
    list_initialize(&write_port_list);
}
121 
port_create(const char * name,port_mode_t mode,port_t * port)122 status_t port_create(const char *name, port_mode_t mode, port_t *port) {
123     if (!name || !port)
124         return ERR_INVALID_ARGS;
125 
126     // only unicast ports can have a large buffer.
127     if (mode & PORT_MODE_BROADCAST) {
128         if (mode & PORT_MODE_BIG_BUFFER)
129             return ERR_INVALID_ARGS;
130     }
131 
132     if (strlen(name) >= PORT_NAME_LEN)
133         return ERR_INVALID_ARGS;
134 
135     // lookup for existing port, return that if found.
136     write_port_t *wp = NULL;
137     THREAD_LOCK(state1);
138     list_for_every_entry(&write_port_list, wp, write_port_t, node) {
139         if (strcmp(wp->name, name) == 0) {
140             // can't return closed ports.
141             if (wp->magic == WRITEPORT_MAGIC_X)
142                 wp = NULL;
143             THREAD_UNLOCK(state1);
144             if (wp) {
145                 *port = (void *) wp;
146                 return ERR_ALREADY_EXISTS;
147             } else {
148                 return ERR_BUSY;
149             }
150         }
151     }
152     THREAD_UNLOCK(state1);
153 
154     // not found, create the write port and the circular buffer.
155     wp = calloc(1, sizeof(write_port_t));
156     if (!wp)
157         return ERR_NO_MEMORY;
158 
159     wp->magic = WRITEPORT_MAGIC_W;
160     wp->mode = mode;
161     strlcpy(wp->name, name, sizeof(wp->name));
162     list_initialize(&wp->rp_list);
163 
164     wp->buf = make_buf(mode & PORT_MODE_BIG_BUFFER);
165     if (!wp->buf) {
166         free(wp);
167         return ERR_NO_MEMORY;
168     }
169 
170     // todo: race condtion! a port with the same name could have been created
171     // by another thread at is point.
172     THREAD_LOCK(state2);
173     list_add_tail(&write_port_list, &wp->node);
174     THREAD_UNLOCK(state2);
175 
176     *port = (void *)wp;
177     return NO_ERROR;
178 }
179 
port_open(const char * name,void * ctx,port_t * port)180 status_t port_open(const char *name, void *ctx, port_t *port) {
181     if (!name || !port)
182         return ERR_INVALID_ARGS;
183 
184     // assume success; create the read port and buffer now.
185     read_port_t *rp = calloc(1, sizeof(read_port_t));
186     if (!rp)
187         return ERR_NO_MEMORY;
188 
189     rp->magic = READPORT_MAGIC;
190     wait_queue_init(&rp->wait);
191     rp->ctx = ctx;
192 
193     // |buf| might not be needed, but we always allocate outside the lock.
194     // this buffer is only needed for broadcast ports, but we don't know
195     // that here.
196     port_buf_t *buf = make_buf(false);  // Small is enough.
197     if (!buf) {
198         free(rp);
199         return ERR_NO_MEMORY;
200     }
201 
202     // find the named write port and associate it with read port.
203     status_t rc = ERR_NOT_FOUND;
204 
205     THREAD_LOCK(state);
206     write_port_t *wp = NULL;
207     list_for_every_entry(&write_port_list, wp, write_port_t, node) {
208         if (strcmp(wp->name, name) == 0) {
209             // found; add read port to write port list.
210             rp->wport = wp;
211             if (wp->buf) {
212                 // this is the first read port; transfer the circular buffer.
213                 list_add_tail(&wp->rp_list, &rp->w_node);
214                 rp->buf = wp->buf;
215                 wp->buf = NULL;
216                 rc = NO_ERROR;
217             } else if (buf) {
218                 // not first read port.
219                 if (wp->mode & PORT_MODE_UNICAST) {
220                     // cannot add a second listener.
221                     rc = ERR_NOT_ALLOWED;
222                     break;
223                 }
224                 // use the new (small) circular buffer.
225                 list_add_tail(&wp->rp_list, &rp->w_node);
226                 rp->buf = buf;
227                 buf = NULL;
228                 rc = NO_ERROR;
229             } else {
230                 // |buf| allocation failed and the buffer was needed.
231                 rc = ERR_NO_MEMORY;
232             }
233             break;
234         }
235     }
236     THREAD_UNLOCK(state);
237 
238     if (buf)
239         free(buf);
240 
241     if (rc == NO_ERROR) {
242         *port = (void *)rp;
243     } else {
244         free(rp);
245     }
246     return rc;
247 }
248 
port_group(port_t * ports,size_t count,port_t * group)249 status_t port_group(port_t *ports, size_t count, port_t *group) {
250     if (count > MAX_PORT_GROUP_COUNT)
251         return ERR_TOO_BIG;
252 
253     // Allow empty port groups.
254     if (count && !ports)
255         return ERR_INVALID_ARGS;
256 
257     if (!group)
258         return ERR_INVALID_ARGS;
259 
260     // assume success; create port group now.
261     port_group_t *pg = calloc(1, sizeof(port_group_t));
262     if (!pg)
263         return ERR_NO_MEMORY;
264 
265     pg->magic = PORTGROUP_MAGIC;
266     wait_queue_init(&pg->wait);
267     list_initialize(&pg->rp_list);
268 
269     status_t rc = NO_ERROR;
270 
271     THREAD_LOCK(state);
272     for (size_t ix = 0; ix != count; ix++) {
273         read_port_t *rp = (read_port_t *)ports[ix];
274         if ((rp->magic != READPORT_MAGIC) || rp->gport) {
275             // wrong type of port, or port already part of a group,
276             // in any case, undo the changes to the previous read ports.
277             for (size_t jx = 0; jx != ix; jx++) {
278                 ((read_port_t *)ports[jx])->gport = NULL;
279             }
280             rc = ERR_BAD_HANDLE;
281             break;
282         }
283         // link port group and read port.
284         rp->gport = pg;
285         list_add_tail(&pg->rp_list, &rp->g_node);
286     }
287     THREAD_UNLOCK(state);
288 
289     if (rc == NO_ERROR) {
290         *group = (port_t *)pg;
291     } else {
292         free(pg);
293     }
294     return rc;
295 }
296 
port_group_add(port_t group,port_t port)297 status_t port_group_add(port_t group, port_t port) {
298     if (!port || !group)
299         return ERR_INVALID_ARGS;
300 
301     // Make sure the user has actually passed in a port group and a read-port.
302     port_group_t *pg = (port_group_t *)group;
303     if (pg->magic != PORTGROUP_MAGIC)
304         return ERR_INVALID_ARGS;
305 
306     read_port_t *rp = (read_port_t *)port;
307     if (rp->magic != READPORT_MAGIC || rp->gport)
308         return ERR_BAD_HANDLE;
309 
310     status_t rc = NO_ERROR;
311     THREAD_LOCK(state);
312 
313     if (list_length(&pg->rp_list) == MAX_PORT_GROUP_COUNT) {
314         rc = ERR_TOO_BIG;
315     } else {
316         rp->gport = pg;
317         list_add_tail(&pg->rp_list, &rp->g_node);
318 
319         // If the new read port being added has messages available, try to wake
320         // any readers that might be present.
321         if (!buf_is_empty(rp->buf)) {
322             wait_queue_wake_one(&pg->wait, false, NO_ERROR);
323         }
324     }
325 
326     THREAD_UNLOCK(state);
327 
328     return rc;
329 }
330 
port_group_remove(port_t group,port_t port)331 status_t port_group_remove(port_t group, port_t port) {
332     if (!port || !group)
333         return ERR_INVALID_ARGS;
334 
335     // Make sure the user has actually passed in a port group and a read-port.
336     port_group_t *pg = (port_group_t *)group;
337     if (pg->magic != PORTGROUP_MAGIC)
338         return ERR_INVALID_ARGS;
339 
340     read_port_t *rp = (read_port_t *)port;
341     if (rp->magic != READPORT_MAGIC || rp->gport != pg)
342         return ERR_BAD_HANDLE;
343 
344     THREAD_LOCK(state);
345 
346     bool found = false;
347     read_port_t *current_rp;
348     list_for_every_entry(&pg->rp_list, current_rp, read_port_t, g_node) {
349         if (current_rp == rp) {
350             found = true;
351         }
352     }
353 
354     if (!found)
355         return ERR_BAD_HANDLE;
356 
357     list_delete(&rp->g_node);
358 
359     THREAD_UNLOCK(state);
360 
361     return NO_ERROR;
362 }
363 
port_write(port_t port,const port_packet_t * pk,size_t count)364 status_t port_write(port_t port, const port_packet_t *pk, size_t count) {
365     if (!port || !pk)
366         return ERR_INVALID_ARGS;
367 
368     write_port_t *wp = (write_port_t *)port;
369     THREAD_LOCK(state);
370     if (wp->magic != WRITEPORT_MAGIC_W) {
371         // wrong port type.
372         THREAD_UNLOCK(state);
373         return ERR_BAD_HANDLE;
374     }
375 
376     status_t status = NO_ERROR;
377     int awake_count = 0;
378 
379     if (wp->buf) {
380         // there are no read ports, just write to the buffer.
381         status = buf_write(wp->buf, pk, count);
382     } else {
383         // there are read ports. for each, write and attempt to wake a thread
384         // from the port group or from the read port itself.
385         read_port_t *rp;
386         list_for_every_entry(&wp->rp_list, rp, read_port_t, w_node) {
387             if (buf_write(rp->buf, pk, count) < 0) {
388                 // buffer full.
389                 status = ERR_PARTIAL_WRITE;
390                 continue;
391             }
392 
393             int awaken = 0;
394             if (rp->gport) {
395                 awaken = wait_queue_wake_one(&rp->gport->wait, false, NO_ERROR);
396             }
397             if (!awaken) {
398                 awaken = wait_queue_wake_one(&rp->wait, false, NO_ERROR);
399             }
400 
401             awake_count += awaken;
402         }
403     }
404 
405     THREAD_UNLOCK(state);
406 
407 #if RESCHEDULE_POLICY
408     if (awake_count)
409         thread_yield();
410 #endif
411 
412     return status;
413 }
414 
read_no_lock(read_port_t * rp,lk_time_t timeout,port_result_t * result)415 static inline status_t read_no_lock(read_port_t *rp, lk_time_t timeout, port_result_t *result) {
416     status_t status = buf_read(rp->buf, result);
417     result->ctx = rp->ctx;
418 
419     if (status != ERR_NO_MSG)
420         return status;
421 
422     // early return allows compiler to elide the rest for the group read case.
423     if (!timeout)
424         return ERR_TIMED_OUT;
425 
426     status_t wr = wait_queue_block(&rp->wait, timeout);
427     if (wr != NO_ERROR)
428         return wr;
429     // recursive tail call is usually optimized away with a goto.
430     return read_no_lock(rp, timeout, result);
431 }
432 
port_read(port_t port,lk_time_t timeout,port_result_t * result)433 status_t port_read(port_t port, lk_time_t timeout, port_result_t *result) {
434     if (!port || !result)
435         return ERR_INVALID_ARGS;
436 
437     status_t rc = ERR_GENERIC;
438     read_port_t *rp = (read_port_t *)port;
439 
440     THREAD_LOCK(state);
441     if (rp->magic == READPORT_MAGIC) {
442         // dealing with a single port.
443         rc = read_no_lock(rp, timeout, result);
444     } else if (rp->magic == PORTGROUP_MAGIC) {
445         // dealing with a port group.
446         port_group_t *pg = (port_group_t *)port;
447         do {
448             // read each port with no timeout.
449             // todo: this order is fixed, probably a bad thing.
450             list_for_every_entry(&pg->rp_list, rp, read_port_t, g_node) {
451                 rc = read_no_lock(rp, 0, result);
452                 if (rc != ERR_TIMED_OUT)
453                     goto read_exit;
454             }
455             // no data, block on the group waitqueue.
456             rc = wait_queue_block(&pg->wait, timeout);
457         } while (rc == NO_ERROR);
458     } else {
459         // wrong port type.
460         rc = ERR_BAD_HANDLE;
461     }
462 
463 read_exit:
464     THREAD_UNLOCK(state);
465     return rc;
466 }
467 
port_destroy(port_t port)468 status_t port_destroy(port_t port) {
469     if (!port)
470         return ERR_INVALID_ARGS;
471 
472     write_port_t *wp = (write_port_t *) port;
473     port_buf_t *buf = NULL;
474 
475     THREAD_LOCK(state);
476     if (wp->magic != WRITEPORT_MAGIC_X) {
477         // wrong port type.
478         THREAD_UNLOCK(state);
479         return ERR_BAD_HANDLE;
480     }
481     // remove self from global named ports list.
482     list_delete(&wp->node);
483 
484     if (wp->buf) {
485         // we have no readers.
486         buf = wp->buf;
487     } else {
488         // for each reader:
489         read_port_t *rp;
490         list_for_every_entry(&wp->rp_list, rp, read_port_t, w_node) {
491             // wake the read and group ports.
492             wait_queue_wake_all(&rp->wait, false, ERR_CANCELLED);
493             if (rp->gport) {
494                 wait_queue_wake_all(&rp->gport->wait, false, ERR_CANCELLED);
495             }
496             // remove self from reader ports.
497             rp->wport = NULL;
498         }
499     }
500 
501     wp->magic = 0;
502     THREAD_UNLOCK(state);
503 
504     free(buf);
505     free(wp);
506     return NO_ERROR;
507 }
508 
port_close(port_t port)509 status_t port_close(port_t port) {
510     if (!port)
511         return ERR_INVALID_ARGS;
512 
513     read_port_t *rp = (read_port_t *) port;
514     port_buf_t *buf = NULL;
515 
516     THREAD_LOCK(state);
517     if (rp->magic == READPORT_MAGIC) {
518         // dealing with a read port.
519         if (rp->wport) {
520             // remove self from write port list and reassign the bufer if last.
521             list_delete(&rp->w_node);
522             if (list_is_empty(&rp->wport->rp_list)) {
523                 rp->wport->buf = rp->buf;
524                 rp->buf = NULL;
525             } else {
526                 buf = rp->buf;
527             }
528         }
529         if (rp->gport) {
530             // remove self from port group list.
531             list_delete(&rp->g_node);
532         }
533         // wake up waiters, the return code is ERR_OBJECT_DESTROYED.
534         wait_queue_destroy(&rp->wait, true);
535         rp->magic = 0;
536 
537     } else if (rp->magic == PORTGROUP_MAGIC) {
538         // dealing with a port group.
539         port_group_t *pg = (port_group_t *) port;
540         // wake up waiters.
541         wait_queue_destroy(&pg->wait, true);
542         // remove self from reader ports.
543         rp = NULL;
544         list_for_every_entry(&pg->rp_list, rp, read_port_t, g_node) {
545             rp->gport = NULL;
546         }
547         pg->magic = 0;
548 
549     } else if (rp->magic == WRITEPORT_MAGIC_W) {
550         // dealing with a write port.
551         write_port_t *wp = (write_port_t *) port;
552         // mark it as closed. Now it can be read but not written to.
553         wp->magic = WRITEPORT_MAGIC_X;
554         THREAD_UNLOCK(state);
555         return NO_ERROR;
556 
557     } else {
558         THREAD_UNLOCK(state);
559         return ERR_BAD_HANDLE;
560     }
561 
562     THREAD_UNLOCK(state);
563 
564     free(buf);
565     free(port);
566     return NO_ERROR;
567 }
568 
569