1 /*
2  * Copyright (C) 2019-2020 Alibaba Group Holding Limited
3  */
4 
5 #include <stdint.h>
6 #include <string.h>
7 
8 #include <aos/kernel.h>
9 //#include <aos/osal_debug.h>
10 
11 #include <uservice/uservice.h>
12 
13 #include "internal.h"
14 
15 #define TAG "utask"
16 
17 static int g_utask_softwdt_timeout = 0;
18 static AOS_SLIST_HEAD(utask_list);
19 
20 #define LOOP_TIME_MS 1000
21 #define MIN_SOFTWDT_TIME 2000 /*ms*/
22 
23 #if 0
24 static void task_will(void *args)
25 {
26     utask_t *task = (utask_t*)args;
27 
28     LOGE(TAG, "utask: %s crash!", aos_task_get_name(&task->task));
29     if (task->current_rpc) {
30         LOGE(TAG, "uservice_name: %s, rpc_cmd: %d", task->current_rpc->srv->name, task->current_rpc->cmd_id);
31     }
32 }
33 #endif
34 
utask_entry(void * data)35 static void utask_entry(void *data)
36 {
37     utask_t *task = (utask_t *)data;
38 
39     rpc_t  rpc;
40     size_t size;
41 
42     while (task->running) {
43         if (aos_queue_recv(&task->queue, LOOP_TIME_MS, &rpc, &size) == 0) {
44 
45 #if defined(CONFIG_DEBUG) && defined(CONFIG_DEBUG_UTASK)
46             int count = aos_queue_get_count(&task->queue) + 1;
47             if (count > task->queue_max_used)
48                 task->queue_max_used = count;
49 
50             struct rpc_record *node;
51             int               found = 0;
52             slist_for_each_entry(&task->rpc_reclist, node, struct rpc_record, next) {
53                 if (node->cmd_id == rpc.cmd_id && node->srv == rpc.srv) {
54                     node->count++;
55                     found = 1;
56                     break;
57                 }
58             }
59 
60             if (found == 0) {
61                 node = aos_zalloc(sizeof(struct rpc_record));
62                 node->cmd_id = rpc.cmd_id;
63                 node->srv = rpc.srv;
64                 node->count = 1;
65                 slist_add(&node->next, &task->rpc_reclist);
66             }
67 #endif
68             if (rpc.srv->process_rpc) {
69                 task->current_rpc = &rpc;
70 
71                 if (g_utask_softwdt_timeout >= MIN_SOFTWDT_TIME) {
72                     //aos_task_wdt_attach(task_will, task);
73                     //aos_task_wdt_feed(g_utask_softwdt_timeout);
74                     // simulaton timeout
75                     // sleep(11);
76                     //uservice_lock(rpc.srv);
77                     rpc.srv->process_rpc(rpc.srv->context, &rpc);
78                     //uservice_unlock(rpc.srv);
79                     //aos_task_wdt_detach();
80                 } else {
81                     //uservice_lock(rpc.srv);
82                     rpc.srv->process_rpc(rpc.srv->context, &rpc);
83                     //uservice_unlock(rpc.srv);
84                 }
85                 task->current_rpc = NULL;
86             } else {
87                 rpc_reply(&rpc);
88             }
89         }
90     }
91 
92     aos_sem_signal(&task->running_wait);
93 }
94 
utask_new(const char * name,size_t stack_size,int queue_count,int prio)95 utask_t *utask_new(const char *name, size_t stack_size, int queue_count, int prio)
96 {
97     if (stack_size <= 0 || queue_count <= 0)
98         return NULL;
99     int queue_buffer_size = queue_count * sizeof(rpc_t);
100 
101     utask_t *task = aos_zalloc(sizeof(utask_t) + queue_buffer_size);
102 
103     if (task == NULL)
104         return NULL;
105 
106     task->running = 1;
107     slist_init(&task->uservice_lists);
108     slist_init(&task->rpc_buffer_gc_cache);
109     slist_add_tail(&task->node, &utask_list);
110 
111     task->queue_count = queue_count;
112     task->queue_buffer = (uint8_t*)task + sizeof(utask_t);
113 
114     if (aos_queue_new(&task->queue, task->queue_buffer, queue_buffer_size, sizeof(rpc_t)) != 0)
115         goto out0;
116 
117     if (aos_mutex_new(&task->mutex) != 0)
118         goto out1;
119 
120     if (aos_sem_new(&task->running_wait, 0) != 0)
121         goto out2;
122 
123     if (aos_task_new_ext(&task->task, name ? name : "utask", utask_entry, task, stack_size, prio) != 0)
124         goto out3;
125 
126     return task;
127 
128 out3:
129     aos_sem_free(&task->running_wait);
130 out2:
131     aos_mutex_free(&task->mutex);
132 out1:
133     aos_queue_free(&task->queue);
134 out0:
135     aos_free(task);
136 
137     return NULL;
138 }
139 
utask_join(utask_t * task)140 void utask_join(utask_t *task)
141 {
142     aos_assert(task);
143 
144     task->running = 0;
145     aos_sem_wait(&task->running_wait, AOS_WAIT_FOREVER);
146 
147     slist_del(&task->node, &utask_list);
148 
149     uservice_t *node;
150     slist_t *tmp;
151     slist_for_each_entry_safe(&task->uservice_lists, tmp, node, uservice_t, next) {
152         utask_remove(task, node);
153     }
154 
155     aos_sem_free(&task->running_wait);
156     aos_mutex_free(&task->mutex);
157     aos_queue_free(&task->queue);
158     aos_free(task);
159 }
160 
utask_destroy(utask_t * task)161 void utask_destroy(utask_t *task)
162 {
163     aos_assert(task);
164     utask_join(task);
165 }
166 
utask_add(utask_t * task,uservice_t * srv)167 void utask_add(utask_t *task, uservice_t *srv)
168 {
169     aos_assert(task && srv);
170 
171     if (srv->task == NULL) {
172         TASK_LOCK(task);
173         srv->task = task;
174         slist_add_tail(&srv->next, &task->uservice_lists);
175         TASK_UNLOCK(task);
176     }
177 }
178 
utask_remove(utask_t * task,uservice_t * srv)179 void utask_remove(utask_t *task, uservice_t *srv)
180 {
181     aos_assert(task && srv);
182 
183     TASK_LOCK(task);
184 
185 #if defined(CONFIG_DEBUG) && defined(CONFIG_DEBUG_UTASK)
186     struct rpc_record *node;
187     slist_t *tmp;
188     slist_for_each_entry_safe(&task->rpc_reclist, tmp, node, struct rpc_record, next) {
189         if (node->srv == srv) {
190             slist_del(&node->next, &task->rpc_reclist);
191             free(node);
192         }
193     }
194 #endif
195 
196     slist_del(&srv->next, &task->uservice_lists);
197     srv->task = NULL;
198 
199     TASK_UNLOCK(task);
200 }
201 
utask_lock(utask_t * task)202 void utask_lock(utask_t *task)
203 {
204     aos_assert(task);
205     TASK_LOCK(task);
206 }
207 
utask_unlock(utask_t * task)208 void utask_unlock(utask_t *task)
209 {
210     aos_assert(task);
211     TASK_UNLOCK(task);
212 }
213 
utask_set_softwdt_timeout(int ms)214 void utask_set_softwdt_timeout(int ms)
215 {
216     if (ms == 0) {
217         g_utask_softwdt_timeout = 0;
218     } else {
219         g_utask_softwdt_timeout = (ms < MIN_SOFTWDT_TIME) ? MIN_SOFTWDT_TIME : ms;
220     }
221 }
222 
task_get(utask_t * task,const char * name)223 uservice_t *task_get(utask_t *task, const char *name)
224 {
225     aos_assert(task);
226 
227     uservice_t *node;
228 
229     TASK_LOCK(task);
230 
231     slist_for_each_entry(&task->uservice_lists, node, uservice_t, next) {
232         if (strcmp(node->name, name) == 0) {
233             TASK_UNLOCK(task);
234             return node;
235         }
236     }
237 
238     TASK_UNLOCK(task);
239 
240     return NULL;
241 }
242 
tasks_debug()243 void tasks_debug()
244 {
245     utask_t *utask;
246     slist_for_each_entry(&utask_list, utask, utask_t, node) {
247 #if defined(CONFIG_DEBUG) && defined(CONFIG_DEBUG_UTASK)
248         struct rpc_record *rpc;
249 
250         printf("utask: %s, queue total count: %d, max used: %d\n------------------------------------------------------\n",
251                aos_task_get_name(&utask->task), utask->queue_count, utask->queue_max_used);
252         printf("\t%05s\t\%05s\tSERVER\n", "CMD_ID", "COUNT");
253         slist_for_each_entry(&utask->rpc_reclist, rpc, struct rpc_record, next) {
254             printf("\t%05d\t%05d\t%s\n", rpc->cmd_id, rpc->count, rpc->srv->name);
255         }
256         printf("\n");
257 #endif
258     }
259 }
260