1 /**
2 * @file aiot_task_api.c
3 * @brief TASK模块接口实现文件, 其中包含了TASK的所有用户API
4 * @date 2019-12-27
5 *
6 * @copyright Copyright (C) 2015-2018 Alibaba Group Holding Limited
7 *
8 */
9
10 #include "core_mqtt.h"
11 #include "core_string.h"
12 #include "core_log.h"
13 #include "core_global.h"
14 #include "task_private.h"
15 #include "aiot_task_api.h"
16
17 static void _task_recv_notify_handler(void *handle, const aiot_mqtt_recv_t *msg, void *userdata);
18 static void _task_recv_get_reply_handler(void *handle, const aiot_mqtt_recv_t *msg, void *userdata);
19 static void _task_recv_update_reply_handler(void *handle, const aiot_mqtt_recv_t *msg, void *userdata);
20
21 static const task_status_map_t g_task_stauts_mapping[] = {
22 {"QUEUED", AIOT_TASK_STATUS_QUEUED},
23 {"SENT", AIOT_TASK_STATUS_SENT},
24 {"IN_PROGRESS", AIOT_TASK_STATUS_IN_PROGRESS},
25 {"SUCCEEDED", AIOT_TASK_STATUS_SUCCEEDED},
26 {"FAILED", AIOT_TASK_STATUS_FAILED},
27 {"REJECTED", AIOT_TASK_STATUS_REJECTED},
28 {"CANCELLED", AIOT_TASK_STATUS_CANCELLED},
29 {"REMOVED", AIOT_TASK_STATUS_REMOVED},
30 {"TIMED_OUT", AIOT_TASK_STATUS_TIMED_OUT}
31 };
32
33 static const task_recv_topic_map_t g_task_recv_topic_mapping[] = {
34 {
35 "/sys/+/+/thing/job/notify",
36 _task_recv_notify_handler,
37 },
38 {
39 "/sys/+/+/thing/job/get_reply",
40 _task_recv_get_reply_handler,
41 },
42 {
43 "/sys/+/+/thing/job/update_reply",
44 _task_recv_update_reply_handler,
45 }
46 };
47
_task_convert_status_to_str(aiot_task_status_t status)48 static char *_task_convert_status_to_str(aiot_task_status_t status)
49 {
50 uint32_t i = 0;
51
52 for (i = 0; i < sizeof(g_task_stauts_mapping) / sizeof(task_status_map_t); i++) {
53 if (g_task_stauts_mapping[i].status == status) {
54 return g_task_stauts_mapping[i].str;
55 }
56 }
57
58 return NULL;
59 }
60
_task_cover_status_to_enum(char * str)61 static aiot_task_status_t _task_cover_status_to_enum(char *str)
62 {
63 uint32_t i = 0;
64
65 if (NULL == str) {
66 return AIOT_TASK_STATUS_FAILED;
67 }
68
69 for (i = 0; i < sizeof(g_task_stauts_mapping) / sizeof(task_status_map_t); i++) {
70 if (strcmp(g_task_stauts_mapping[i].str, str) == 0) {
71 return g_task_stauts_mapping[i].status;
72 }
73 }
74
75 return AIOT_TASK_STATUS_FAILED;
76 }
77
_task_parse_json(aiot_sysdep_portfile_t * sysdep,const void * input,uint32_t input_len,char * key_word,char ** out)78 static int32_t _task_parse_json(aiot_sysdep_portfile_t *sysdep, const void *input, uint32_t input_len, char *key_word,
79 char **out)
80 {
81 int32_t res = STATE_SUCCESS;
82 char *value = NULL, *buffer = NULL;
83 uint32_t value_len = 0, buffer_len = 0;
84
85 res = core_json_value((const char *)input, input_len, key_word, strlen(key_word), &value, &value_len);
86 if (res != STATE_SUCCESS) {
87 return STATE_TASK_PARSE_JSON_ERROR;
88 }
89 buffer_len = value_len + 1;
90 buffer = sysdep->core_sysdep_malloc(buffer_len, TASK_MODULE_NAME);
91 if (NULL == buffer) {
92 return STATE_TASK_PARSE_JSON_MALLOC_FAILED;
93 }
94 memset(buffer, 0, buffer_len);
95 memcpy(buffer, value, value_len);
96 *out = buffer;
97 return res;
98 }
99
_task_parse_task_detail(task_handle_t * task_handle,const char * payload,uint32_t payload_len,task_desc_t * task)100 static int32_t _task_parse_task_detail(task_handle_t *task_handle, const char *payload, uint32_t payload_len,
101 task_desc_t *task)
102 {
103 int32_t res = STATE_SUCCESS;
104 if (STATE_SUCCESS != _task_parse_json(task_handle->sysdep, payload, payload_len, "taskId", &(task->task_id))) {
105 return STATE_TASK_PARSE_JSON_MALLOC_FAILED;
106 }
107
108 char *status = NULL;
109 if (STATE_SUCCESS != _task_parse_json(task_handle->sysdep, payload, payload_len, "status", &status)) {
110 return STATE_TASK_PARSE_JSON_MALLOC_FAILED;
111 }
112 task->status = _task_cover_status_to_enum(status);
113 task_handle->sysdep->core_sysdep_free(status);
114
115 _task_parse_json(task_handle->sysdep, payload, payload_len, "jobDocument", &(task->job_document));
116
117 char *job_file_key = "jobFile";
118 char *job_file_value = NULL;
119 uint32_t job_file_len = 0;
120 if ((res = core_json_value(payload, payload_len, job_file_key, (uint32_t)strlen(job_file_key),
121 &job_file_value, &job_file_len)) != STATE_SUCCESS) {
122 return STATE_SUCCESS;
123 }
124
125 if (STATE_SUCCESS != _task_parse_json(task_handle->sysdep, job_file_value, job_file_len, "signMethod",
126 &(task->sign_method))) {
127 return res;
128 }
129
130 if (STATE_SUCCESS != _task_parse_json(task_handle->sysdep, job_file_value, job_file_len, "sign", &(task->sign))) {
131 return res;
132 }
133
134 if (STATE_SUCCESS != _task_parse_json(task_handle->sysdep, job_file_value, job_file_len, "fileUrl",
135 &(task->document_file_url))) {
136 return res;
137 }
138
139 return res;
140 }
141
_task_free_task_list(task_handle_t * task_handle,task_get_list_reply_t * list)142 static void _task_free_task_list(task_handle_t *task_handle, task_get_list_reply_t *list)
143 {
144 int32_t i = 0;
145 if (NULL == list->tasks) {
146 return;
147 }
148
149 for (i = 0; i < list->number; i++) {
150 if (list->tasks[i].task_id) {
151 task_handle->sysdep->core_sysdep_free(list->tasks[i].task_id);
152 }
153 }
154 task_handle->sysdep->core_sysdep_free(list->tasks);
155 }
156
_task_free_task_detail(task_handle_t * task_handle,task_desc_t * task)157 static void _task_free_task_detail(task_handle_t *task_handle, task_desc_t *task)
158 {
159 aiot_sysdep_portfile_t *sysdep = aiot_sysdep_get_portfile();
160 if (NULL == task) {
161 return;
162 }
163
164 if (task->task_id) {
165 sysdep->core_sysdep_free(task->task_id);
166 }
167
168 if (task->job_document) {
169 sysdep->core_sysdep_free(task->job_document);
170 }
171
172 if (task->sign_method) {
173 sysdep->core_sysdep_free(task->sign_method);
174 }
175
176 if (task->sign) {
177 sysdep->core_sysdep_free(task->sign);
178 }
179
180 if (task->document_file_url) {
181 sysdep->core_sysdep_free(task->document_file_url);
182 }
183 }
184
_task_free_update_reply(task_update_reply_t * reply)185 static void _task_free_update_reply(task_update_reply_t *reply)
186 {
187 aiot_sysdep_portfile_t *sysdep = aiot_sysdep_get_portfile();
188 if (NULL == reply) {
189 return;
190 }
191
192 if (reply->task_id) {
193 sysdep->core_sysdep_free(reply->task_id);
194 }
195 }
196
_task_send_notify_reply(task_handle_t * task_handle,char * msg_id,uint32_t code)197 static int32_t _task_send_notify_reply(task_handle_t *task_handle, char *msg_id, uint32_t code)
198 {
199 char *topic = NULL, *payload = NULL;
200 char code_string[11] = { 0 };
201 char *src[2] = { NULL };
202 int32_t res = STATE_SUCCESS;
203
204 core_uint2str(code, code_string, NULL);
205
206 src[0] = msg_id;
207 src[1] = code_string;
208
209 res = core_sprintf(task_handle->sysdep, &payload, TASK_NOTIFY_REPLY_FMT, src, sizeof(src) / sizeof(char *),
210 TASK_MODULE_NAME);
211 if (res < 0) {
212 return res;
213 }
214
215 char *topic_src[] = { core_mqtt_get_product_key(task_handle->mqtt_handle), core_mqtt_get_device_name(task_handle->mqtt_handle) };
216 res = core_sprintf(task_handle->sysdep, &topic, TASK_NOTIFY_REPLY_TOPIC_FMT, topic_src,
217 sizeof(topic_src) / sizeof(char *),
218 TASK_MODULE_NAME);
219 if (res < STATE_SUCCESS) {
220 task_handle->sysdep->core_sysdep_free(payload);
221 return res;
222 }
223
224 res = aiot_mqtt_pub(task_handle->mqtt_handle, (char *)topic, (uint8_t *)payload, strlen(payload), 0);
225 task_handle->sysdep->core_sysdep_free(topic);
226 task_handle->sysdep->core_sysdep_free(payload);
227
228 return res;
229 }
230
_task_parse_notify(task_handle_t * task_handle,const char * payload,uint32_t payload_len,task_desc_t * task)231 static int32_t _task_parse_notify(task_handle_t *task_handle, const char *payload, uint32_t payload_len,
232 task_desc_t *task)
233 {
234 int32_t res = STATE_SUCCESS;
235
236 char *params_key = "params";
237 char *params_value = NULL;
238 uint32_t params_len = 0;
239 if ((res = core_json_value(payload, payload_len, params_key, (uint32_t)strlen(params_key),
240 ¶ms_value, ¶ms_len)) != STATE_SUCCESS) {
241 return res;
242 }
243
244 char *task_key = "task";
245 char *task_value = NULL;
246 uint32_t task_len = 0;
247 if ((res = core_json_value(params_value, params_len, task_key, (uint32_t)strlen(task_key),
248 &task_value, &task_len)) != STATE_SUCCESS) {
249 return res;
250 }
251
252 res = _task_parse_task_detail(task_handle, task_value, task_len, task);
253
254 return res;
255 }
256
_task_recv_notify_handler(void * handle,const aiot_mqtt_recv_t * msg,void * userdata)257 static void _task_recv_notify_handler(void *handle, const aiot_mqtt_recv_t *msg, void *userdata)
258 {
259 task_handle_t *task_handle = (task_handle_t *)userdata;
260 aiot_task_recv_t recv;
261 int32_t res = STATE_SUCCESS;
262 char *id = NULL;
263
264 if (NULL == task_handle->recv_handler) {
265 return;
266 }
267
268 memset(&recv, 0, sizeof(aiot_task_recv_t));
269 recv.type = AIOT_TASKRECV_NOTIFY;
270
271 core_log(task_handle->sysdep, STATE_TASK_RECV_NOTIFY, "task recv notify\r\n");
272
273 if (STATE_SUCCESS != _task_parse_json(task_handle->sysdep, (char *)msg->data.pub.payload, msg->data.pub.payload_len,
274 "id", &id)) {
275 return;
276 }
277
278 _task_send_notify_reply(task_handle, id, 200);
279 if (NULL != id) {
280 task_handle->sysdep->core_sysdep_free(id);
281 }
282
283 if ((res = _task_parse_notify(task_handle, (char *)msg->data.pub.payload, msg->data.pub.payload_len,
284 &(recv.data.notify))) == STATE_SUCCESS) {
285 task_handle->recv_handler(task_handle, &recv, task_handle->userdata);
286 } else {
287 core_log(task_handle->sysdep, STATE_TASK_PARSE_NOTIFY_FAILED, "task parse notify failed\r\n");
288 }
289 _task_free_task_detail(task_handle, &(recv.data.notify));
290 }
291
_task_parse_get_list_reply_array(char * str,int32_t str_len,task_list_json * array)292 static int32_t _task_parse_get_list_reply_array(char *str, int32_t str_len, task_list_json *array)
293 {
294 int32_t num = 0, len = 0;
295 int32_t i = 0, new = 0;
296
297 if ((NULL == str) || (str_len <= 2)) {
298 return 0;
299 }
300
301 str++;
302 if (*str == ']') { /* empty array */
303 return 0;
304 }
305
306 while ((i < str_len) && (num < TASK_GET_LIST_REPLY_ARRAY_MAX)) {
307 if (*str == '{') {
308 len = 1;
309 new = 1;
310 array[num].pos = str;
311 } else if (*str == '}') {
312 len++;
313 if (new == 1) {
314 array[num].len = len;
315 num++;
316 new = 0;
317 }
318 } else {
319 len++;
320 }
321 str++;
322 i++;
323 }
324 return num;
325 }
326
_task_parse_get_list_reply(task_handle_t * task_handle,void * input,uint32_t input_len,task_get_list_reply_t * reply)327 static uint32_t _task_parse_get_list_reply(task_handle_t *task_handle, void *input, uint32_t input_len,
328 task_get_list_reply_t *reply)
329 {
330 int32_t res = STATE_SUCCESS;
331 task_list_json json_array[TASK_GET_LIST_REPLY_ARRAY_MAX];
332 int32_t num = _task_parse_get_list_reply_array(input, input_len, json_array);
333 reply->number = num;
334 if (0 == num) {
335 return res;
336 }
337
338 task_summary_t *array = task_handle->sysdep->core_sysdep_malloc(sizeof(task_summary_t) * num, TASK_MODULE_NAME);
339
340 if (NULL == array) {
341 return STATE_TASK_PARSE_JSON_MALLOC_FAILED;
342 }
343
344 memset(array, 0, sizeof(task_summary_t)*num);
345
346 reply->tasks = array;
347
348 for (int32_t i = 0; i < num; i++) {
349 if (STATE_SUCCESS != _task_parse_json(task_handle->sysdep, json_array[i].pos, json_array[i].len, "taskId",
350 &(array[i].task_id))) {
351 return res;
352 }
353
354 char *status = NULL;
355 if (STATE_SUCCESS != _task_parse_json(task_handle->sysdep, json_array[i].pos, json_array[i].len, "status", &status)) {
356 return res;
357 }
358
359 array[i].status = _task_cover_status_to_enum(status);
360 task_handle->sysdep->core_sysdep_free(status);
361 }
362
363 return res;
364 }
365
_task_recv_get_reply_handler(void * handle,const aiot_mqtt_recv_t * msg,void * userdata)366 static void _task_recv_get_reply_handler(void *handle, const aiot_mqtt_recv_t *msg, void *userdata)
367 {
368 task_handle_t *task_handle = (task_handle_t *)userdata;
369 aiot_task_recv_t recv;
370 int32_t res = STATE_SUCCESS;
371
372 if (NULL == task_handle->recv_handler) {
373 return;
374 }
375
376 memset(&recv, 0, sizeof(aiot_task_recv_t));
377
378 char *code_key = "code";
379 char *code_value = NULL;
380 uint32_t code_len = 0;
381 if (core_json_value((char *)msg->data.pub.payload, msg->data.pub.payload_len, code_key, (uint32_t)strlen(code_key),
382 &code_value, &code_len) != STATE_SUCCESS) {
383 return;
384 }
385
386 if (core_str2uint(code_value, (uint8_t)code_len, &(recv.data.get_detail_reply.code)) != STATE_SUCCESS) {
387 return;
388 }
389
390 char *data_key = "data";
391 char *data_value = NULL;
392 uint32_t data_len = 0;
393 if (core_json_value((char *)msg->data.pub.payload, msg->data.pub.payload_len, data_key, (uint32_t)strlen(data_key),
394 &data_value, &data_len) != STATE_SUCCESS) {
395 return;
396 }
397
398 char *tasks_key = "tasks";
399 char *tasks_value = NULL;
400 uint32_t tasks_len = 0;
401 if (core_json_value(data_value, data_len, tasks_key, (uint32_t)strlen(tasks_key),
402 &tasks_value, &tasks_len) == STATE_SUCCESS) {
403 recv.type = AIOT_TASKRECV_GET_LIST_REPLY;
404 task_get_list_reply_t *reply = &(recv.data.get_list_reply);
405 if ((res = _task_parse_get_list_reply(task_handle, tasks_value, tasks_len, reply)) == STATE_SUCCESS) {
406 task_handle->recv_handler(task_handle, &recv, task_handle->userdata);
407 }
408 _task_free_task_list(task_handle, &(recv.data.get_list_reply));
409 return;
410 }
411
412 char *task_key = "task";
413 char *task_value = NULL;
414 uint32_t task_len = 0;
415 if (core_json_value(data_value, data_len, task_key, (uint32_t)strlen(task_key),
416 &task_value, &task_len) == STATE_SUCCESS) {
417 recv.type = AIOT_TASKRECV_GET_DETAIL_REPLY;
418 task_desc_t *task = &(recv.data.get_detail_reply.task);
419 if ((res = _task_parse_task_detail(task_handle, task_value, task_len, task)) == STATE_SUCCESS) {
420 task_handle->recv_handler(task_handle, &recv, task_handle->userdata);
421 }
422 _task_free_task_detail(task_handle, task);
423 return;
424 }
425
426 char *task_id_key = "taskId";
427 char *task_id_value = NULL;
428 uint32_t task_id_len = 0;
429 if (core_json_value(data_value, data_len, task_id_key, (uint32_t)strlen(task_id_key),
430 &task_id_value, &task_id_len) == STATE_SUCCESS) {
431 if (memcmp(task_id_value, TASK_GET_LIST_REPLY_TASK_ID, strlen(TASK_GET_LIST_REPLY_TASK_ID)) == 0) {
432 recv.type = AIOT_TASKRECV_GET_LIST_REPLY;
433 task_get_list_reply_t *reply = &(recv.data.get_list_reply);
434 reply->number = 0;
435 task_handle->recv_handler(task_handle, &recv, task_handle->userdata);
436 } else {
437 recv.type = AIOT_TASKRECV_GET_DETAIL_REPLY;
438 task_desc_t *task = &(recv.data.get_detail_reply.task);
439
440 char *id = task_handle->sysdep->core_sysdep_malloc(task_id_len + 1, TASK_MODULE_NAME);
441 memset(id, 0, task_id_len + 1);
442 memcpy(id, task_id_value, task_id_len);
443 task->task_id = id;
444 task->status = AIOT_TASK_STATUS_NOT_FOUND;
445 task_handle->recv_handler(task_handle, &recv, task_handle->userdata);
446 _task_free_task_detail(task_handle, task);
447 }
448 }
449 }
450
_task_recv_update_reply_handler(void * handle,const aiot_mqtt_recv_t * msg,void * userdata)451 static void _task_recv_update_reply_handler(void *handle, const aiot_mqtt_recv_t *msg, void *userdata)
452 {
453 task_handle_t *task_handle = (task_handle_t *)userdata;
454 aiot_task_recv_t recv;
455
456 if (NULL == task_handle->recv_handler) {
457 return;
458 }
459
460 memset(&recv, 0, sizeof(aiot_task_recv_t));
461 recv.type = AIOT_TASKRECV_UPDATE_REPLY;
462
463 char *code_key = "code";
464 char *code_value = NULL;
465 uint32_t code_len = 0;
466 if (core_json_value((char *)msg->data.pub.payload, msg->data.pub.payload_len, code_key, (uint32_t)strlen(code_key),
467 &code_value, &code_len) != STATE_SUCCESS) {
468 return;
469 }
470
471 if (core_str2uint(code_value, (uint8_t)code_len, &(recv.data.get_detail_reply.code)) != STATE_SUCCESS) {
472 return;
473 }
474
475 char *data_key = "data";
476 char *data_value = NULL;
477 uint32_t data_len = 0;
478 if (core_json_value((char *)msg->data.pub.payload, msg->data.pub.payload_len, data_key, (uint32_t)strlen(data_key),
479 &data_value, &data_len) != STATE_SUCCESS) {
480 return;
481 }
482
483 _task_parse_json(task_handle->sysdep, data_value, data_len, "taskId", &(recv.data.update_reply.task_id));
484
485 task_handle->recv_handler(task_handle, &recv, task_handle->userdata);
486
487 _task_free_update_reply(&(recv.data.update_reply));
488 }
489
_task_setup_topic_mapping(void * mqtt_handle,void * task_handle)490 static int32_t _task_setup_topic_mapping(void *mqtt_handle, void *task_handle)
491 {
492 uint32_t i = 0;
493 int32_t res = STATE_SUCCESS;
494 aiot_mqtt_topic_map_t topic_mapping;
495
496 for (i = 0; i < sizeof(g_task_recv_topic_mapping) / sizeof(task_recv_topic_map_t); i++) {
497 topic_mapping.topic = g_task_recv_topic_mapping[i].topic;
498 topic_mapping.handler = g_task_recv_topic_mapping[i].func;
499 topic_mapping.userdata = task_handle;
500
501 res = aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_APPEND_TOPIC_MAP, &topic_mapping);
502 if (STATE_SUCCESS != res) {
503 break;
504 }
505 }
506 return res;
507 }
508
_task_core_mqtt_process_handler(void * context,aiot_mqtt_event_t * event,core_mqtt_event_t * core_event)509 static void _task_core_mqtt_process_handler(void *context, aiot_mqtt_event_t *event, core_mqtt_event_t *core_event)
510 {
511 task_handle_t *task_handle = (task_handle_t *)context;
512
513 if (core_event != NULL) {
514 switch (core_event->type) {
515 case CORE_MQTTEVT_DEINIT: {
516 task_handle->mqtt_handle = NULL;
517 return;
518 }
519 break;
520 default: {
521
522 }
523 break;
524 }
525 }
526 }
527
_task_core_mqtt_operate_process_handler(task_handle_t * task_handle,core_mqtt_option_t option)528 static int32_t _task_core_mqtt_operate_process_handler(task_handle_t *task_handle, core_mqtt_option_t option)
529 {
530 core_mqtt_process_data_t process_data;
531
532 memset(&process_data, 0, sizeof(core_mqtt_process_data_t));
533 process_data.handler = _task_core_mqtt_process_handler;
534 process_data.context = task_handle;
535
536 return core_mqtt_setopt(task_handle->mqtt_handle, option, &process_data);
537 }
538
_task_send_query_task(task_handle_t * task_handle,char * task_id)539 static int32_t _task_send_query_task(task_handle_t *task_handle, char *task_id)
540 {
541 char *topic = NULL, *payload = NULL;
542 int32_t id = 0;
543 char id_string[11] = { 0 };
544 char *src[2] = { NULL };
545 int32_t res = STATE_SUCCESS;
546
547 if (NULL == task_id) {
548 return STATE_TASK_QUERY_TASK_ID_IS_NULL;
549 }
550
551 core_global_alink_id_next(task_handle->sysdep, &id);
552 core_int2str(id, id_string, NULL);
553
554 src[0] = id_string;
555 src[1] = task_id;
556
557 res = core_sprintf(task_handle->sysdep, &payload, TASK_REQUEST_QUERY_TASK_FMT, src, sizeof(src) / sizeof(char *),
558 TASK_MODULE_NAME);
559 if (res < 0) {
560 return res;
561 }
562
563 char *topic_src[] = { core_mqtt_get_product_key(task_handle->mqtt_handle), core_mqtt_get_device_name(task_handle->mqtt_handle) };
564 res = core_sprintf(task_handle->sysdep, &topic, TASK_QUERY_TASK_TOPIC_FMT, topic_src,
565 sizeof(topic_src) / sizeof(char *),
566 TASK_MODULE_NAME);
567 if (res < STATE_SUCCESS) {
568 task_handle->sysdep->core_sysdep_free(payload);
569 return res;
570 }
571
572 res = aiot_mqtt_pub(task_handle->mqtt_handle, (char *)topic, (uint8_t *)payload, strlen(payload), 0);
573 task_handle->sysdep->core_sysdep_free(payload);
574 task_handle->sysdep->core_sysdep_free(topic);
575
576 return res;
577 }
578
aiot_task_get_task_list(void * handle)579 int32_t aiot_task_get_task_list(void *handle)
580 {
581 task_handle_t *task_handle = (task_handle_t *)handle;
582
583 if (NULL == handle) {
584 return STATE_USER_INPUT_NULL_POINTER;
585 }
586 return _task_send_query_task(task_handle, "$list");
587 }
588
aiot_task_get_task_detail(void * handle,char * task_id)589 int32_t aiot_task_get_task_detail(void *handle, char *task_id)
590 {
591 int32_t res = STATE_SUCCESS;
592 task_handle_t *task_handle = (task_handle_t *)handle;
593
594 if (NULL == handle) {
595 return STATE_USER_INPUT_NULL_POINTER;
596 }
597
598 if (NULL == task_id) {
599 res = _task_send_query_task(task_handle, "$next");
600 } else {
601 res = _task_send_query_task(task_handle, task_id);
602 }
603
604 return res;
605 }
606
aiot_task_update(void * handle,task_desc_t * task)607 int32_t aiot_task_update(void *handle, task_desc_t *task)
608 {
609 int32_t res = STATE_SUCCESS;
610 char *topic = NULL, *payload = NULL, *status = NULL;
611 int32_t id = 0;
612 char id_string[11] = { 0 };
613 char progress_string[11] = { 0 };
614 char *src[5] = { NULL };
615 task_handle_t *task_handle = (task_handle_t *)handle;
616
617 if ((NULL == handle) || (NULL == task)) {
618 return STATE_USER_INPUT_NULL_POINTER;
619 }
620
621 if ((NULL != task->status_details) && (strcmp(task->status_details, "") == 0)) {
622 return STATE_TASK_UPDATE_STATUS_DETAILS_INVALID;
623 }
624
625 status = _task_convert_status_to_str(task->status);
626
627 if (NULL == status) {
628 return STATE_TASK_UPDATE_STATUS_INVALID;
629 }
630
631 core_global_alink_id_next(task_handle->sysdep, &id);
632 core_int2str(id, id_string, NULL);
633
634 char *topic_src[] = { core_mqtt_get_product_key(task_handle->mqtt_handle), core_mqtt_get_device_name(task_handle->mqtt_handle) };
635 res = core_sprintf(task_handle->sysdep, &topic, TASK_UPDATE_TASK_TOPIC_FMT, topic_src,
636 sizeof(topic_src) / sizeof(char *),
637 TASK_MODULE_NAME);
638 if (res < STATE_SUCCESS) {
639 return res;
640 }
641
642 core_uint2str(task->progress, progress_string, NULL);
643 src[0] = id_string;
644 src[1] = task->task_id;
645 src[2] = _task_convert_status_to_str(task->status);
646 src[3] = progress_string;
647
648 if (NULL == task->status_details) {
649 res = core_sprintf(task_handle->sysdep, &payload, TASK_REQUEST_UPDATE_TASK_NO_DETAIL_FMT, src, 4,
650 TASK_MODULE_NAME);
651 } else {
652 src[4] = task->status_details;
653 res = core_sprintf(task_handle->sysdep, &payload, TASK_REQUEST_UPDATE_TASK_FMT, src, sizeof(src) / sizeof(char *),
654 TASK_MODULE_NAME);
655 }
656
657 if (res < STATE_SUCCESS) {
658 task_handle->sysdep->core_sysdep_free(topic);
659 return res;
660 }
661
662 res = aiot_mqtt_pub(task_handle->mqtt_handle, (char *)topic, (uint8_t *)payload, strlen(payload), 0);
663 task_handle->sysdep->core_sysdep_free(topic);
664 task_handle->sysdep->core_sysdep_free(payload);
665
666 return res;
667 }
668
aiot_task_setopt(void * handle,aiot_task_option_t option,void * data)669 int32_t aiot_task_setopt(void *handle, aiot_task_option_t option, void *data)
670 {
671 int32_t res = STATE_SUCCESS;
672 task_handle_t *task_handle = (task_handle_t *)handle;
673 aiot_sysdep_portfile_t *sysdep = NULL;
674
675 if (NULL == task_handle) {
676 return STATE_TASK_SETOPT_HANDLE_IS_NULL;
677 }
678 if (NULL == data) {
679 return STATE_TASK_SETOPT_DATA_IS_NULL;
680 }
681
682 sysdep = task_handle->sysdep;
683 sysdep->core_sysdep_mutex_lock(task_handle->data_mutex);
684 switch (option) {
685 case AIOT_TASKOPT_RECV_HANDLER: {
686 task_handle->recv_handler = (aiot_task_recv_handler_t)data;
687 }
688 break;
689 case AIOT_TASKOPT_USERDATA: {
690 task_handle->userdata = data;
691 }
692 break;
693 case AIOT_TASKOPT_MQTT_HANDLE: {
694 task_handle->mqtt_handle = data;
695 res = _task_setup_topic_mapping(data, task_handle);
696 if (res >= STATE_SUCCESS) {
697 res = _task_core_mqtt_operate_process_handler(task_handle, CORE_MQTTOPT_APPEND_PROCESS_HANDLER);
698 }
699 }
700 break;
701 default: {
702 res = STATE_USER_INPUT_UNKNOWN_OPTION;
703 }
704 break;
705 }
706 sysdep->core_sysdep_mutex_unlock(task_handle->data_mutex);
707
708 return res;
709 }
710
aiot_task_init(void)711 void *aiot_task_init(void)
712 {
713 task_handle_t *task_handle = NULL;
714 aiot_sysdep_portfile_t *sysdep = NULL;
715
716 sysdep = aiot_sysdep_get_portfile();
717 if (sysdep == NULL) {
718 return NULL;
719 }
720
721 task_handle = sysdep->core_sysdep_malloc(sizeof(task_handle_t), TASK_MODULE_NAME);
722 if (task_handle == NULL) {
723 return NULL;
724 }
725
726 memset(task_handle, 0, sizeof(task_handle_t));
727 task_handle->sysdep = sysdep;
728
729 core_global_init(sysdep);
730 return task_handle;
731 }
732
aiot_task_deinit(void ** p_handle)733 int32_t aiot_task_deinit(void **p_handle)
734 {
735 task_handle_t *task_handle = NULL;
736 aiot_sysdep_portfile_t *sysdep = NULL;
737 uint8_t i = 0;
738
739 if (NULL == p_handle || NULL == *p_handle) {
740 return STATE_USER_INPUT_NULL_POINTER;
741 }
742
743 task_handle = *p_handle;
744 sysdep = task_handle->sysdep;
745 *p_handle = NULL;
746
747 _task_core_mqtt_operate_process_handler(task_handle, CORE_MQTTOPT_REMOVE_PROCESS_HANDLER);
748
749 /* remove mqtt topic mapping */
750 for (i = 0; i < sizeof(g_task_recv_topic_mapping) / sizeof(task_recv_topic_map_t); i++) {
751 aiot_mqtt_topic_map_t topic_mapping;
752 memset(&topic_mapping, 0, sizeof(aiot_mqtt_topic_map_t));
753 topic_mapping.topic = g_task_recv_topic_mapping[i].topic;
754 topic_mapping.handler = g_task_recv_topic_mapping[i].func;
755
756 aiot_mqtt_setopt(task_handle->mqtt_handle, AIOT_MQTTOPT_REMOVE_TOPIC_MAP, &topic_mapping);
757 }
758
759 sysdep->core_sysdep_free(task_handle);
760
761 core_global_deinit(sysdep);
762 return STATE_SUCCESS;
763 }
764