1 /*
2  * Copyright (C) 2019-2020 Alibaba Group Holding Limited
3  */
4 
5 #include <stdlib.h>
6 #include <stdint.h>
7 #include <string.h>
8 
9 //#include <aos/osal_debug.h>
10 #include <aos/list.h>
11 #include <uservice/uservice.h>
12 #include <uservice/event.h>
13 #include <sys/socket.h>
14 
15 #include "internal.h"
16 
17 #define FD_MAX_STEMP 8
18 
19 static struct event_call {
20     uservice_t  *svr;
21     event_list_t event;
22     dlist_t      timeouts;
23     int          event_id;
24     int          is_event_service_inited;
25     void        *data;
26     aos_task_t   select_task;
27     aos_sem_t    select_sem;
28     aos_event_t  wait_event;
29 } ev_service;
30 
31 struct event_param {
32     uint32_t event_id;
33     union {
34         event_callback_t cb;
35         uint32_t         timeout;
36     };
37     void   *data;
38     dlist_t next;
39 };
40 
41 enum {
42     CMD_SUB_FD_EVENT,
43     CMD_SUB_EVENT,
44     CMD_REMOVE_FD_EVENT,
45     CMD_REMOVE_EVENT,
46     CMD_PUBLISH_EVENT,
47     CMD_PUBLISH_FD_EVENT,
48 };
49 
50 #define EVENT_SUBSCRIBE 0x0000FF00
51 #define EVENT_UNSUBSCRIBE 0x000000FF
52 
53 static void select_task_entry(void *arg);
54 
process_rpc(void * context,rpc_t * rpc)55 static int process_rpc(void *context, rpc_t *rpc)
56 {
57     int                 size = -1;
58     struct event_param *param = (struct event_param *)rpc_get_buffer(rpc, &size);
59 
60     aos_assert(size == sizeof(struct event_param));
61 
62     switch (rpc->cmd_id) {
63     case CMD_SUB_FD_EVENT:
64         eventlist_subscribe_fd(&ev_service.event, param->event_id, param->cb, param->data);
65         aos_sem_signal(&ev_service.select_sem);
66         break;
67 
68     case CMD_REMOVE_FD_EVENT:
69         eventlist_unsubscribe_fd(&ev_service.event, param->event_id, param->cb, param->data);
70         break;
71 
72     case CMD_PUBLISH_FD_EVENT:
73         eventlist_publish_fd(&ev_service.event, param->event_id, param->data);
74         break;
75 
76     case CMD_SUB_EVENT:
77         eventlist_subscribe(&ev_service.event, param->event_id, param->cb, param->data);
78         break;
79 
80     case CMD_REMOVE_EVENT:
81         eventlist_unsubscribe(&ev_service.event, param->event_id, param->cb, param->data);
82         break;
83 
84     case CMD_PUBLISH_EVENT:
85         if (param->timeout > 0) {
86             struct event_param *timer = aos_malloc(sizeof(struct event_param));
87             if (timer == NULL)
88                 break;
89 
90             timer->timeout = aos_now_ms() + param->timeout;
91             timer->event_id = param->event_id;
92             timer->data = param->data;
93 
94             struct event_param *node;
95             dlist_for_each_entry(&ev_service.timeouts, node, struct event_param, next) {
96                 if (timer->timeout < node->timeout)
97                     break;
98             }
99             dlist_add_tail(&timer->next, &node->next);
100             aos_sem_signal(&ev_service.select_sem);
101         } else {
102             eventlist_publish(&ev_service.event, param->event_id, param->data);
103         }
104 
105         break;
106     }
107 
108     rpc_reply(rpc);
109 
110     return 0;
111 }
112 
event_service_init(utask_t * task)113 int event_service_init(utask_t *task)
114 {
115     if (ev_service.is_event_service_inited != 0) {
116         return 0;
117     }
118     ev_service.is_event_service_inited = 1;
119     if (task == NULL)
120         task = utask_new("event_svr", 2*1024, QUEUE_MSG_COUNT * 5, AOS_DEFAULT_APP_PRI);
121 
122     if (task == NULL) {
123         ev_service.is_event_service_inited = 0;
124         return -1;
125     }
126     eventlist_init(&ev_service.event);
127     dlist_init(&ev_service.timeouts);
128     if (aos_sem_new(&ev_service.select_sem, 0) < 0) {
129         utask_destroy(task);
130         ev_service.is_event_service_inited = 0;
131         return -1;
132     }
133     if (aos_event_new(&ev_service.wait_event, 0) < 0) {
134         utask_destroy(task);
135         aos_sem_free(&ev_service.select_sem);
136         ev_service.is_event_service_inited = 0;
137         return -1;
138     }
139 
140     ev_service.svr = uservice_new("event_svr", process_rpc, NULL);
141     if (ev_service.svr == NULL) {
142         utask_destroy(task);
143         aos_sem_free(&ev_service.select_sem);
144         aos_event_free(&ev_service.select_sem);
145         ev_service.is_event_service_inited = 0;
146         return -1;
147     }
148 
149     aos_task_new_ext(&ev_service.select_task, "select", select_task_entry, NULL,
150                      1024, AOS_DEFAULT_APP_PRI);
151     utask_add(task, ev_service.svr);
152 
153     return 0;
154 }
155 
event_call(struct event_param * param,int cmd_id,int sync)156 static void event_call(struct event_param *param, int cmd_id, int sync)
157 {
158     rpc_t rpc;
159 
160     if (rpc_init(&rpc, cmd_id, sync ? AOS_WAIT_FOREVER : 0) == 0) {
161         rpc_put_buffer(&rpc, param, sizeof(struct event_param ));
162         uservice_call(ev_service.svr, &rpc);
163         rpc_deinit(&rpc);
164     }
165 }
166 
event_publish_fd(uint32_t event_id,void * data,int sync)167 void event_publish_fd(uint32_t event_id, void *data, int sync)
168 {
169     struct event_param param;
170 
171     param.event_id = event_id;
172     param.data     = data;
173     param.cb       = NULL;
174 
175     event_call(&param, CMD_PUBLISH_FD_EVENT, sync);
176 }
177 
event_subscribe_fd(uint32_t fd,event_callback_t cb,void * context)178 void event_subscribe_fd(uint32_t fd, event_callback_t cb, void *context)
179 {
180     aos_assert(cb);
181     struct event_param param;
182     param.event_id = fd;
183     param.cb       = cb;
184     param.data     = context;
185 
186     event_call(&param, CMD_SUB_FD_EVENT, 0);
187 }
188 
event_unsubscribe_fd(uint32_t event_id,event_callback_t cb,void * context)189 void event_unsubscribe_fd(uint32_t event_id, event_callback_t cb, void *context)
190 {
191     struct event_param param;
192     param.event_id = event_id;
193     param.cb       = cb;
194     param.data     = context;
195 
196     event_call(&param, CMD_REMOVE_FD_EVENT, 0);
197 }
198 
event_publish(uint32_t event_id,void * data)199 void event_publish(uint32_t event_id, void *data)
200 {
201     struct event_param param;
202 
203     param.event_id = event_id;
204     param.data     = data;
205     param.timeout  = 0;
206 
207     event_call(&param, CMD_PUBLISH_EVENT, 0);
208 }
209 
event_publish_delay(uint32_t event_id,void * data,int timeout)210 void event_publish_delay(uint32_t event_id, void *data, int timeout)
211 {
212     struct event_param param;
213     param.event_id = event_id;
214     param.data     = data;
215     param.timeout  = timeout;
216 
217     event_call(&param, CMD_PUBLISH_EVENT, 0);
218 }
219 
event_subscribe(uint32_t event_id,event_callback_t cb,void * context)220 void event_subscribe(uint32_t event_id, event_callback_t cb, void *context)
221 {
222     aos_assert(cb);
223     struct event_param param;
224     param.event_id = event_id;
225     param.cb       = cb;
226     param.data     = context;
227 
228     event_call(&param, CMD_SUB_EVENT, 0);
229 }
230 
event_unsubscribe(uint32_t event_id,event_callback_t cb,void * context)231 void event_unsubscribe(uint32_t event_id, event_callback_t cb, void *context)
232 {
233     aos_assert(cb);
234     struct event_param param;
235 
236     param.event_id = event_id;
237     param.cb       = cb;
238     param.data     = context;
239 
240     event_call(&param, CMD_REMOVE_EVENT, 0);
241 }
242 
do_time_event()243 static int do_time_event()
244 {
245     int delayed_ms = -1;
246 
247     struct event_param *node;
248     dlist_t            *tmp;
249 
250     uservice_lock(ev_service.svr);
251     dlist_for_each_entry_safe(&ev_service.timeouts, tmp, node, struct event_param, next) {
252         long long now = aos_now_ms();
253 
254         if (now >= node->timeout) {
255             event_publish(node->event_id, node->data);
256             dlist_del(&node->next);
257             aos_free(node);
258         } else {
259             delayed_ms = node->timeout - now;
260             break;
261         }
262     }
263     uservice_unlock(ev_service.svr);
264 
265     return delayed_ms;
266 }
267 
268 #define SELECT_TIMEOUT (10)
select_task_entry(void * arg)269 static void select_task_entry(void *arg)
270 {
271 #ifdef CONFIG_LWIP_V200
272     aos_task_exit(0);
273 #else
274     event_list_t *evlist = &ev_service.event;
275     utask_t *task = ev_service.svr->task;
276 
277 #if defined(CONFIG_SAL) || defined(CONFIG_TCPIP)
278     extern void sys_init(void);
279     sys_init();
280 #endif
281 
282     while (1) {
283         fd_set         readfds;
284         struct timeval timeout;
285 
286         int time_ms = do_time_event();
287 
288         int max_fd = eventlist_setfd(evlist, &readfds);
289 
290         timeout.tv_sec   = time_ms / 1000;
291         timeout.tv_usec  = (time_ms % 1000) * 1000;
292         int ret = select2(max_fd + 1, &readfds, NULL, NULL, time_ms == -1 ? NULL : &timeout, &ev_service.select_sem);
293         if (ret > 0) {
294             for (int fd = 0; fd <= max_fd; fd++) {
295                 if (FD_ISSET(fd, &readfds)) {
296                     if(aos_queue_get_count(&task->queue) < (task->queue_count*3/4)) {
297                         event_publish_fd(fd, NULL, 1);
298                         // eventlist_remove_fd(evlist, fd);
299                     }
300                 }
301             }
302         }
303     }
304     aos_task_exit(0);
305 #endif
306 }
307