1 /*
2 * Copyright (C) 2019-2020 Alibaba Group Holding Limited
3 */
4
5 #include <stdint.h>
6 #include <string.h>
7
8 #include <aos/kernel.h>
9 //#include <aos/osal_debug.h>
10
11 #include <uservice/uservice.h>
12
13 #include "internal.h"
14
15 #define TAG "utask"
16
17 static int g_utask_softwdt_timeout = 0;
18 static AOS_SLIST_HEAD(utask_list);
19
20 #define LOOP_TIME_MS 1000
21 #define MIN_SOFTWDT_TIME 2000 /*ms*/
22
23 #if 0
24 static void task_will(void *args)
25 {
26 utask_t *task = (utask_t*)args;
27
28 LOGE(TAG, "utask: %s crash!", aos_task_get_name(&task->task));
29 if (task->current_rpc) {
30 LOGE(TAG, "uservice_name: %s, rpc_cmd: %d", task->current_rpc->srv->name, task->current_rpc->cmd_id);
31 }
32 }
33 #endif
34
utask_entry(void * data)35 static void utask_entry(void *data)
36 {
37 utask_t *task = (utask_t *)data;
38
39 rpc_t rpc;
40 size_t size;
41
42 while (task->running) {
43 if (aos_queue_recv(&task->queue, LOOP_TIME_MS, &rpc, &size) == 0) {
44
45 #if defined(CONFIG_DEBUG) && defined(CONFIG_DEBUG_UTASK)
46 int count = aos_queue_get_count(&task->queue) + 1;
47 if (count > task->queue_max_used)
48 task->queue_max_used = count;
49
50 struct rpc_record *node;
51 int found = 0;
52 slist_for_each_entry(&task->rpc_reclist, node, struct rpc_record, next) {
53 if (node->cmd_id == rpc.cmd_id && node->srv == rpc.srv) {
54 node->count++;
55 found = 1;
56 break;
57 }
58 }
59
60 if (found == 0) {
61 node = aos_zalloc(sizeof(struct rpc_record));
62 node->cmd_id = rpc.cmd_id;
63 node->srv = rpc.srv;
64 node->count = 1;
65 slist_add(&node->next, &task->rpc_reclist);
66 }
67 #endif
68 if (rpc.srv->process_rpc) {
69 task->current_rpc = &rpc;
70
71 if (g_utask_softwdt_timeout >= MIN_SOFTWDT_TIME) {
72 //aos_task_wdt_attach(task_will, task);
73 //aos_task_wdt_feed(g_utask_softwdt_timeout);
74 // simulaton timeout
75 // sleep(11);
76 //uservice_lock(rpc.srv);
77 rpc.srv->process_rpc(rpc.srv->context, &rpc);
78 //uservice_unlock(rpc.srv);
79 //aos_task_wdt_detach();
80 } else {
81 //uservice_lock(rpc.srv);
82 rpc.srv->process_rpc(rpc.srv->context, &rpc);
83 //uservice_unlock(rpc.srv);
84 }
85 task->current_rpc = NULL;
86 } else {
87 rpc_reply(&rpc);
88 }
89 }
90 }
91
92 aos_sem_signal(&task->running_wait);
93 }
94
utask_new(const char * name,size_t stack_size,int queue_count,int prio)95 utask_t *utask_new(const char *name, size_t stack_size, int queue_count, int prio)
96 {
97 if (stack_size <= 0 || queue_count <= 0)
98 return NULL;
99 int queue_buffer_size = queue_count * sizeof(rpc_t);
100
101 utask_t *task = aos_zalloc(sizeof(utask_t) + queue_buffer_size);
102
103 if (task == NULL)
104 return NULL;
105
106 task->running = 1;
107 slist_init(&task->uservice_lists);
108 slist_init(&task->rpc_buffer_gc_cache);
109 slist_add_tail(&task->node, &utask_list);
110
111 task->queue_count = queue_count;
112 task->queue_buffer = (uint8_t*)task + sizeof(utask_t);
113
114 if (aos_queue_new(&task->queue, task->queue_buffer, queue_buffer_size, sizeof(rpc_t)) != 0)
115 goto out0;
116
117 if (aos_mutex_new(&task->mutex) != 0)
118 goto out1;
119
120 if (aos_sem_new(&task->running_wait, 0) != 0)
121 goto out2;
122
123 if (aos_task_new_ext(&task->task, name ? name : "utask", utask_entry, task, stack_size, prio) != 0)
124 goto out3;
125
126 return task;
127
128 out3:
129 aos_sem_free(&task->running_wait);
130 out2:
131 aos_mutex_free(&task->mutex);
132 out1:
133 aos_queue_free(&task->queue);
134 out0:
135 aos_free(task);
136
137 return NULL;
138 }
139
utask_join(utask_t * task)140 void utask_join(utask_t *task)
141 {
142 aos_assert(task);
143
144 task->running = 0;
145 aos_sem_wait(&task->running_wait, AOS_WAIT_FOREVER);
146
147 slist_del(&task->node, &utask_list);
148
149 uservice_t *node;
150 slist_t *tmp;
151 slist_for_each_entry_safe(&task->uservice_lists, tmp, node, uservice_t, next) {
152 utask_remove(task, node);
153 }
154
155 aos_sem_free(&task->running_wait);
156 aos_mutex_free(&task->mutex);
157 aos_queue_free(&task->queue);
158 aos_free(task);
159 }
160
utask_destroy(utask_t * task)161 void utask_destroy(utask_t *task)
162 {
163 aos_assert(task);
164 utask_join(task);
165 }
166
utask_add(utask_t * task,uservice_t * srv)167 void utask_add(utask_t *task, uservice_t *srv)
168 {
169 aos_assert(task && srv);
170
171 if (srv->task == NULL) {
172 TASK_LOCK(task);
173 srv->task = task;
174 slist_add_tail(&srv->next, &task->uservice_lists);
175 TASK_UNLOCK(task);
176 }
177 }
178
utask_remove(utask_t * task,uservice_t * srv)179 void utask_remove(utask_t *task, uservice_t *srv)
180 {
181 aos_assert(task && srv);
182
183 TASK_LOCK(task);
184
185 #if defined(CONFIG_DEBUG) && defined(CONFIG_DEBUG_UTASK)
186 struct rpc_record *node;
187 slist_t *tmp;
188 slist_for_each_entry_safe(&task->rpc_reclist, tmp, node, struct rpc_record, next) {
189 if (node->srv == srv) {
190 slist_del(&node->next, &task->rpc_reclist);
191 free(node);
192 }
193 }
194 #endif
195
196 slist_del(&srv->next, &task->uservice_lists);
197 srv->task = NULL;
198
199 TASK_UNLOCK(task);
200 }
201
utask_lock(utask_t * task)202 void utask_lock(utask_t *task)
203 {
204 aos_assert(task);
205 TASK_LOCK(task);
206 }
207
utask_unlock(utask_t * task)208 void utask_unlock(utask_t *task)
209 {
210 aos_assert(task);
211 TASK_UNLOCK(task);
212 }
213
utask_set_softwdt_timeout(int ms)214 void utask_set_softwdt_timeout(int ms)
215 {
216 if (ms == 0) {
217 g_utask_softwdt_timeout = 0;
218 } else {
219 g_utask_softwdt_timeout = (ms < MIN_SOFTWDT_TIME) ? MIN_SOFTWDT_TIME : ms;
220 }
221 }
222
task_get(utask_t * task,const char * name)223 uservice_t *task_get(utask_t *task, const char *name)
224 {
225 aos_assert(task);
226
227 uservice_t *node;
228
229 TASK_LOCK(task);
230
231 slist_for_each_entry(&task->uservice_lists, node, uservice_t, next) {
232 if (strcmp(node->name, name) == 0) {
233 TASK_UNLOCK(task);
234 return node;
235 }
236 }
237
238 TASK_UNLOCK(task);
239
240 return NULL;
241 }
242
tasks_debug()243 void tasks_debug()
244 {
245 utask_t *utask;
246 slist_for_each_entry(&utask_list, utask, utask_t, node) {
247 #if defined(CONFIG_DEBUG) && defined(CONFIG_DEBUG_UTASK)
248 struct rpc_record *rpc;
249
250 printf("utask: %s, queue total count: %d, max used: %d\n------------------------------------------------------\n",
251 aos_task_get_name(&utask->task), utask->queue_count, utask->queue_max_used);
252 printf("\t%05s\t\%05s\tSERVER\n", "CMD_ID", "COUNT");
253 slist_for_each_entry(&utask->rpc_reclist, rpc, struct rpc_record, next) {
254 printf("\t%05d\t%05d\t%s\n", rpc->cmd_id, rpc->count, rpc->srv->name);
255 }
256 printf("\n");
257 #endif
258 }
259 }
260