1 /*
2 * Copyright (C) 2019-2020 Alibaba Group Holding Limited
3 */
4
5 #include <stdlib.h>
6 #include <stdint.h>
7 #include <string.h>
8
9 //#include <aos/osal_debug.h>
10 #include <aos/list.h>
11 #include <uservice/uservice.h>
12 #include <uservice/event.h>
13 #include <sys/socket.h>
14
15 #include "internal.h"
16
17 #define FD_MAX_STEMP 8
18
19 static struct event_call {
20 uservice_t *svr;
21 event_list_t event;
22 dlist_t timeouts;
23 int event_id;
24 int is_event_service_inited;
25 void *data;
26 aos_task_t select_task;
27 aos_sem_t select_sem;
28 aos_event_t wait_event;
29 } ev_service;
30
31 struct event_param {
32 uint32_t event_id;
33 union {
34 event_callback_t cb;
35 uint32_t timeout;
36 };
37 void *data;
38 dlist_t next;
39 };
40
41 enum {
42 CMD_SUB_FD_EVENT,
43 CMD_SUB_EVENT,
44 CMD_REMOVE_FD_EVENT,
45 CMD_REMOVE_EVENT,
46 CMD_PUBLISH_EVENT,
47 CMD_PUBLISH_FD_EVENT,
48 };
49
50 #define EVENT_SUBSCRIBE 0x0000FF00
51 #define EVENT_UNSUBSCRIBE 0x000000FF
52
53 static void select_task_entry(void *arg);
54
process_rpc(void * context,rpc_t * rpc)55 static int process_rpc(void *context, rpc_t *rpc)
56 {
57 int size = -1;
58 struct event_param *param = (struct event_param *)rpc_get_buffer(rpc, &size);
59
60 aos_assert(size == sizeof(struct event_param));
61
62 switch (rpc->cmd_id) {
63 case CMD_SUB_FD_EVENT:
64 eventlist_subscribe_fd(&ev_service.event, param->event_id, param->cb, param->data);
65 aos_sem_signal(&ev_service.select_sem);
66 break;
67
68 case CMD_REMOVE_FD_EVENT:
69 eventlist_unsubscribe_fd(&ev_service.event, param->event_id, param->cb, param->data);
70 break;
71
72 case CMD_PUBLISH_FD_EVENT:
73 eventlist_publish_fd(&ev_service.event, param->event_id, param->data);
74 break;
75
76 case CMD_SUB_EVENT:
77 eventlist_subscribe(&ev_service.event, param->event_id, param->cb, param->data);
78 break;
79
80 case CMD_REMOVE_EVENT:
81 eventlist_unsubscribe(&ev_service.event, param->event_id, param->cb, param->data);
82 break;
83
84 case CMD_PUBLISH_EVENT:
85 if (param->timeout > 0) {
86 struct event_param *timer = aos_malloc(sizeof(struct event_param));
87 if (timer == NULL)
88 break;
89
90 timer->timeout = aos_now_ms() + param->timeout;
91 timer->event_id = param->event_id;
92 timer->data = param->data;
93
94 struct event_param *node;
95 dlist_for_each_entry(&ev_service.timeouts, node, struct event_param, next) {
96 if (timer->timeout < node->timeout)
97 break;
98 }
99 dlist_add_tail(&timer->next, &node->next);
100 aos_sem_signal(&ev_service.select_sem);
101 } else {
102 eventlist_publish(&ev_service.event, param->event_id, param->data);
103 }
104
105 break;
106 }
107
108 rpc_reply(rpc);
109
110 return 0;
111 }
112
event_service_init(utask_t * task)113 int event_service_init(utask_t *task)
114 {
115 if (ev_service.is_event_service_inited != 0) {
116 return 0;
117 }
118 ev_service.is_event_service_inited = 1;
119 if (task == NULL)
120 task = utask_new("event_svr", 2*1024, QUEUE_MSG_COUNT * 5, AOS_DEFAULT_APP_PRI);
121
122 if (task == NULL) {
123 ev_service.is_event_service_inited = 0;
124 return -1;
125 }
126 eventlist_init(&ev_service.event);
127 dlist_init(&ev_service.timeouts);
128 if (aos_sem_new(&ev_service.select_sem, 0) < 0) {
129 utask_destroy(task);
130 ev_service.is_event_service_inited = 0;
131 return -1;
132 }
133 if (aos_event_new(&ev_service.wait_event, 0) < 0) {
134 utask_destroy(task);
135 aos_sem_free(&ev_service.select_sem);
136 ev_service.is_event_service_inited = 0;
137 return -1;
138 }
139
140 ev_service.svr = uservice_new("event_svr", process_rpc, NULL);
141 if (ev_service.svr == NULL) {
142 utask_destroy(task);
143 aos_sem_free(&ev_service.select_sem);
144 aos_event_free(&ev_service.select_sem);
145 ev_service.is_event_service_inited = 0;
146 return -1;
147 }
148
149 aos_task_new_ext(&ev_service.select_task, "select", select_task_entry, NULL,
150 1024, AOS_DEFAULT_APP_PRI);
151 utask_add(task, ev_service.svr);
152
153 return 0;
154 }
155
event_call(struct event_param * param,int cmd_id,int sync)156 static void event_call(struct event_param *param, int cmd_id, int sync)
157 {
158 rpc_t rpc;
159
160 if (rpc_init(&rpc, cmd_id, sync ? AOS_WAIT_FOREVER : 0) == 0) {
161 rpc_put_buffer(&rpc, param, sizeof(struct event_param ));
162 uservice_call(ev_service.svr, &rpc);
163 rpc_deinit(&rpc);
164 }
165 }
166
event_publish_fd(uint32_t event_id,void * data,int sync)167 void event_publish_fd(uint32_t event_id, void *data, int sync)
168 {
169 struct event_param param;
170
171 param.event_id = event_id;
172 param.data = data;
173 param.cb = NULL;
174
175 event_call(¶m, CMD_PUBLISH_FD_EVENT, sync);
176 }
177
event_subscribe_fd(uint32_t fd,event_callback_t cb,void * context)178 void event_subscribe_fd(uint32_t fd, event_callback_t cb, void *context)
179 {
180 aos_assert(cb);
181 struct event_param param;
182 param.event_id = fd;
183 param.cb = cb;
184 param.data = context;
185
186 event_call(¶m, CMD_SUB_FD_EVENT, 0);
187 }
188
event_unsubscribe_fd(uint32_t event_id,event_callback_t cb,void * context)189 void event_unsubscribe_fd(uint32_t event_id, event_callback_t cb, void *context)
190 {
191 struct event_param param;
192 param.event_id = event_id;
193 param.cb = cb;
194 param.data = context;
195
196 event_call(¶m, CMD_REMOVE_FD_EVENT, 0);
197 }
198
event_publish(uint32_t event_id,void * data)199 void event_publish(uint32_t event_id, void *data)
200 {
201 struct event_param param;
202
203 param.event_id = event_id;
204 param.data = data;
205 param.timeout = 0;
206
207 event_call(¶m, CMD_PUBLISH_EVENT, 0);
208 }
209
event_publish_delay(uint32_t event_id,void * data,int timeout)210 void event_publish_delay(uint32_t event_id, void *data, int timeout)
211 {
212 struct event_param param;
213 param.event_id = event_id;
214 param.data = data;
215 param.timeout = timeout;
216
217 event_call(¶m, CMD_PUBLISH_EVENT, 0);
218 }
219
event_subscribe(uint32_t event_id,event_callback_t cb,void * context)220 void event_subscribe(uint32_t event_id, event_callback_t cb, void *context)
221 {
222 aos_assert(cb);
223 struct event_param param;
224 param.event_id = event_id;
225 param.cb = cb;
226 param.data = context;
227
228 event_call(¶m, CMD_SUB_EVENT, 0);
229 }
230
event_unsubscribe(uint32_t event_id,event_callback_t cb,void * context)231 void event_unsubscribe(uint32_t event_id, event_callback_t cb, void *context)
232 {
233 aos_assert(cb);
234 struct event_param param;
235
236 param.event_id = event_id;
237 param.cb = cb;
238 param.data = context;
239
240 event_call(¶m, CMD_REMOVE_EVENT, 0);
241 }
242
do_time_event()243 static int do_time_event()
244 {
245 int delayed_ms = -1;
246
247 struct event_param *node;
248 dlist_t *tmp;
249
250 uservice_lock(ev_service.svr);
251 dlist_for_each_entry_safe(&ev_service.timeouts, tmp, node, struct event_param, next) {
252 long long now = aos_now_ms();
253
254 if (now >= node->timeout) {
255 event_publish(node->event_id, node->data);
256 dlist_del(&node->next);
257 aos_free(node);
258 } else {
259 delayed_ms = node->timeout - now;
260 break;
261 }
262 }
263 uservice_unlock(ev_service.svr);
264
265 return delayed_ms;
266 }
267
268 #define SELECT_TIMEOUT (10)
select_task_entry(void * arg)269 static void select_task_entry(void *arg)
270 {
271 #ifdef CONFIG_LWIP_V200
272 aos_task_exit(0);
273 #else
274 event_list_t *evlist = &ev_service.event;
275 utask_t *task = ev_service.svr->task;
276
277 #if defined(CONFIG_SAL) || defined(CONFIG_TCPIP)
278 extern void sys_init(void);
279 sys_init();
280 #endif
281
282 while (1) {
283 fd_set readfds;
284 struct timeval timeout;
285
286 int time_ms = do_time_event();
287
288 int max_fd = eventlist_setfd(evlist, &readfds);
289
290 timeout.tv_sec = time_ms / 1000;
291 timeout.tv_usec = (time_ms % 1000) * 1000;
292 int ret = select2(max_fd + 1, &readfds, NULL, NULL, time_ms == -1 ? NULL : &timeout, &ev_service.select_sem);
293 if (ret > 0) {
294 for (int fd = 0; fd <= max_fd; fd++) {
295 if (FD_ISSET(fd, &readfds)) {
296 if(aos_queue_get_count(&task->queue) < (task->queue_count*3/4)) {
297 event_publish_fd(fd, NULL, 1);
298 // eventlist_remove_fd(evlist, fd);
299 }
300 }
301 }
302 }
303 }
304 aos_task_exit(0);
305 #endif
306 }
307