1 /**
2  * @file aiot_dm_api.c
3  * @brief 数据模型模块接口实现文件, 包含了支持物模型数据格式通信的所有接口实现
4  * @date 2020-01-20
5  *
6  * @copyright Copyright (C) 2015-2020 Alibaba Group Holding Limited
7  *
8  */
9 
10 #include "dm_private.h"
11 
12 
13 static int32_t _dm_send_property_post(dm_handle_t *handle, const char *topic, const aiot_dm_msg_t *msg);
14 static int32_t _dm_send_event_post(dm_handle_t *handle, const char *topic, const aiot_dm_msg_t *msg);
15 static int32_t _dm_send_property_set_reply(dm_handle_t *handle, const char *topic, const aiot_dm_msg_t *msg);
16 static int32_t _dm_send_async_service_reply(dm_handle_t *handle, const char *topic, const aiot_dm_msg_t *msg);
17 static int32_t _dm_send_sync_service_reply(dm_handle_t *handle, const char *topic, const aiot_dm_msg_t *msg);
18 static int32_t _dm_send_raw_data(dm_handle_t *handle, const char *topic, const aiot_dm_msg_t *msg);
19 static int32_t _dm_send_raw_service_reply(dm_handle_t *handle, const char *topic, const aiot_dm_msg_t *msg);
20 static int32_t _dm_send_desired_get(dm_handle_t *handle, const char *topic, const aiot_dm_msg_t *msg);
21 static int32_t _dm_send_desired_delete(dm_handle_t *handle, const char *topic, const aiot_dm_msg_t *msg);
22 static int32_t _dm_send_property_batch_post(dm_handle_t *handle, const char *topic, const aiot_dm_msg_t *msg);
23 
24 static void _dm_recv_generic_reply_handler(void *handle, const aiot_mqtt_recv_t *msg, void *userdata);
25 static void _dm_recv_property_set_handler(void *handle, const aiot_mqtt_recv_t *msg, void *userdata);
26 static void _dm_recv_async_service_invoke_handler(void *handle, const aiot_mqtt_recv_t *msg, void *userdata);
27 static void _dm_recv_sync_service_invoke_handler(void *handle, const aiot_mqtt_recv_t *msg, void *userdata);
28 static void _dm_recv_raw_data_handler(void *handle, const aiot_mqtt_recv_t *msg, void *userdata);
29 static void _dm_recv_raw_sync_service_invoke_handler(void *handle, const aiot_mqtt_recv_t *msg, void *userdata);
30 static void _dm_recv_up_raw_reply_data_handler(void *handle, const aiot_mqtt_recv_t *msg, void *userdata);
31 
32 static const dm_send_topic_map_t g_dm_send_topic_mapping[AIOT_DMMSG_MAX] = {
33     {
34         "/sys/%s/%s/thing/event/property/post",
35         _dm_send_property_post
36     },
37     {
38         "/sys/%s/%s/thing/event/%s/post",
39         _dm_send_event_post
40     },
41     {
42         "/sys/%s/%s/thing/service/property/set_reply",
43         _dm_send_property_set_reply
44     },
45     {
46         "/sys/%s/%s/thing/service/%s_reply",
47         _dm_send_async_service_reply
48     },
49     {
50         "/ext/rrpc/%s/sys/%s/%s/thing/service/%s",
51         _dm_send_sync_service_reply
52     },
53     {
54         "/sys/%s/%s/thing/model/up_raw",
55         _dm_send_raw_data
56     },
57     {
58         "/ext/rrpc/%s/sys/%s/%s/thing/model/down_raw",
59         _dm_send_raw_service_reply
60     },
61     {
62         "/sys/%s/%s/thing/property/desired/get",
63         _dm_send_desired_get
64     },
65     {
66         "/sys/%s/%s/thing/property/desired/delete",
67         _dm_send_desired_delete
68     },
69     {
70         "/sys/%s/%s/thing/event/property/batch/post",
71         _dm_send_property_batch_post
72     },
73 };
74 
75 static const dm_recv_topic_map_t g_dm_recv_topic_mapping[] = {
76     {
77         "/sys/+/+/thing/event/+/post_reply",
78         _dm_recv_generic_reply_handler,
79     },
80     {
81         "/sys/+/+/thing/service/property/set",
82         _dm_recv_property_set_handler,
83     },
84     {
85         "/sys/+/+/thing/service/+",
86         _dm_recv_async_service_invoke_handler,
87     },
88     {
89         "/ext/rrpc/+/sys/+/+/thing/service/+",
90         _dm_recv_sync_service_invoke_handler,
91     },
92     {
93         "/sys/+/+/thing/model/down_raw",
94         _dm_recv_raw_data_handler,
95     },
96     {
97         "/sys/+/+/thing/model/up_raw_reply",
98         _dm_recv_up_raw_reply_data_handler,
99     },
100     {
101         "/ext/rrpc/+/sys/+/+/thing/model/down_raw",
102         _dm_recv_raw_sync_service_invoke_handler,
103     },
104     {
105         "/sys/+/+/thing/property/desired/get_reply",
106         _dm_recv_generic_reply_handler,
107     },
108     {
109         "/sys/+/+/thing/property/desired/delete_reply",
110         _dm_recv_generic_reply_handler,
111     },
112     {
113         "/sys/+/+/thing/event/property/batch/post_reply",
114         _dm_recv_generic_reply_handler,
115     },
116 };
117 
_append_diag_data(dm_handle_t * dm_handle,uint8_t msg_type,int32_t msg_id)118 static void _append_diag_data(dm_handle_t *dm_handle, uint8_t msg_type, int32_t msg_id)
119 {
120     /* append diagnose data */
121     uint8_t diag_data[] = { 0x00, 0x30, 0x01, 0x00, 0x00, 0x31, 0x04, 0x00, 0x00, 0x00, 0x00 };
122     diag_data[3] = msg_type;
123     diag_data[7] = (msg_id >> 24) & 0xFF;
124     diag_data[8] = (msg_id >> 16) & 0xFF;
125     diag_data[9] = (msg_id >> 8) & 0xFF;
126     diag_data[10] = msg_id & 0xFF;
127     core_diag(dm_handle->sysdep, STATE_DM_BASE, diag_data, sizeof(diag_data));
128 }
129 
_dm_setup_topic_mapping(void * mqtt_handle,void * dm_handle)130 static int32_t _dm_setup_topic_mapping(void *mqtt_handle, void *dm_handle)
131 {
132     uint32_t i = 0;
133     int32_t res = STATE_SUCCESS;
134 
135     for (i = 0; i < sizeof(g_dm_recv_topic_mapping) / sizeof(dm_recv_topic_map_t); i++) {
136         aiot_mqtt_topic_map_t topic_mapping;
137         topic_mapping.topic = g_dm_recv_topic_mapping[i].topic;
138         topic_mapping.handler = g_dm_recv_topic_mapping[i].func;
139         topic_mapping.userdata = dm_handle;
140 
141         res = aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_APPEND_TOPIC_MAP, &topic_mapping);
142         if (res < 0) {
143             break;
144         }
145     }
146     return res;
147 }
148 
_dm_prepare_send_topic(dm_handle_t * dm_handle,const aiot_dm_msg_t * msg,char ** topic)149 static int32_t _dm_prepare_send_topic(dm_handle_t *dm_handle, const aiot_dm_msg_t *msg, char **topic)
150 {
151     char *src[4];
152     uint8_t src_count = 0;
153     char *pk = NULL;
154     char *dn = NULL;
155 
156     if (NULL == msg->product_key && NULL == core_mqtt_get_product_key(dm_handle->mqtt_handle)) {
157         return STATE_USER_INPUT_MISSING_PRODUCT_KEY;
158     }
159     if (NULL == msg->device_name && NULL == core_mqtt_get_device_name(dm_handle->mqtt_handle)) {
160         return STATE_USER_INPUT_MISSING_DEVICE_NAME;
161     }
162 
163     pk = (msg->product_key != NULL) ? msg->product_key : core_mqtt_get_product_key(dm_handle->mqtt_handle);
164     dn = (msg->device_name != NULL) ? msg->device_name : core_mqtt_get_device_name(dm_handle->mqtt_handle);
165 
166     switch (msg->type) {
167         case AIOT_DMMSG_PROPERTY_POST:
168         case AIOT_DMMSG_PROPERTY_BATCH_POST:
169         case AIOT_DMMSG_PROPERTY_SET_REPLY:
170         case AIOT_DMMSG_GET_DESIRED:
171         case AIOT_DMMSG_DELETE_DESIRED:
172         case AIOT_DMMSG_RAW_DATA: {
173             src[0] = pk;
174             src[1] = dn;
175             src_count = 2;
176         }
177         break;
178         case AIOT_DMMSG_EVENT_POST: {
179             if (msg->data.event_post.event_id == NULL) {
180                 return STATE_DM_EVENT_ID_IS_NULL;
181             }
182             src[0] = pk;
183             src[1] = dn;
184             src[2] = msg->data.event_post.event_id;
185             src_count = 3;
186         }
187         break;
188         case AIOT_DMMSG_ASYNC_SERVICE_REPLY: {
189             if (msg->data.async_service_reply.service_id == NULL) {
190                 return STATE_DM_SERVICE_ID_IS_NULL;
191             }
192             src[0] = pk;
193             src[1] = dn;
194             src[2] = msg->data.async_service_reply.service_id;
195             src_count = 3;
196         }
197         break;
198         case AIOT_DMMSG_SYNC_SERVICE_REPLY: {
199             if (msg->data.sync_service_reply.rrpc_id == NULL) {
200                 return STATE_DM_RRPC_ID_IS_NULL;
201             }
202             if (msg->data.sync_service_reply.service_id == NULL) {
203                 return STATE_DM_SERVICE_ID_IS_NULL;
204             }
205             src[0] = msg->data.sync_service_reply.rrpc_id;
206             src[1] = pk;
207             src[2] = dn;
208             src[3] = msg->data.sync_service_reply.service_id;
209             src_count = 4;
210         }
211         break;
212         case AIOT_DMMSG_RAW_SERVICE_REPLY: {
213             if (msg->data.raw_service_reply.rrpc_id == NULL) {
214                 return STATE_DM_RRPC_ID_IS_NULL;
215             }
216             src[0] = msg->data.raw_service_reply.rrpc_id;
217             src[1] = pk;
218             src[2] = dn;
219             src_count = 3;
220         }
221         break;
222         default:
223             return STATE_USER_INPUT_OUT_RANGE;
224     }
225 
226     return core_sprintf(dm_handle->sysdep, topic, g_dm_send_topic_mapping[msg->type].topic, src, src_count,
227                         DATA_MODEL_MODULE_NAME);
228 }
229 
230 
_dm_send_alink_req(dm_handle_t * handle,const char * topic,char * params)231 static int32_t _dm_send_alink_req(dm_handle_t *handle, const char *topic, char *params)
232 {
233     char *payload = NULL;
234     int32_t id = 0;
235     char id_string[11] = { 0 };
236     char *src[3] = { NULL };
237     int32_t res = STATE_SUCCESS;
238 
239     if (NULL == params) {
240         return STATE_DM_MSG_PARAMS_IS_NULL;
241     }
242 
243     core_global_alink_id_next(handle->sysdep, &id);
244     core_int2str(id, id_string, NULL);
245 
246     _append_diag_data(handle, DM_DIAG_MSG_TYPE_REQ, id);
247 
248     src[0] = id_string;
249     src[1] = params;
250     src[2] = (0 == handle->post_reply) ? "0" : "1";
251 
252     res = core_sprintf(handle->sysdep, &payload, ALINK_REQUEST_FMT, src, sizeof(src) / sizeof(char *),
253                        DATA_MODEL_MODULE_NAME);
254     if (res < 0) {
255         return res;
256     }
257 
258     res = aiot_mqtt_pub(handle->mqtt_handle, (char *)topic, (uint8_t *)payload, strlen(payload), 0);
259     handle->sysdep->core_sysdep_free(payload);
260 
261     if (STATE_SUCCESS == res) {
262         return id;
263     }
264     return res;
265 }
266 
_dm_send_alink_rsp(dm_handle_t * handle,const char * topic,uint64_t msg_id,uint32_t code,char * data)267 static int32_t _dm_send_alink_rsp(dm_handle_t *handle, const char *topic, uint64_t msg_id, uint32_t code,
268                                   char *data)
269 {
270     char *payload = NULL;
271     char id_string[21] = { 0 };
272     char code_string[11] = { 0 };
273     char *src[3] = { NULL };
274     int32_t res = STATE_SUCCESS;
275 
276     if (NULL == data) {
277         return STATE_DM_MSG_DATA_IS_NULL;
278     }
279 
280     core_uint642str(msg_id, id_string, NULL);
281     core_uint2str(code, code_string, NULL);
282 
283     src[0] = id_string;
284     src[1] = code_string;
285     src[2] = data;
286 
287     res = core_sprintf(handle->sysdep, &payload, ALINK_RESPONSE_FMT, src, sizeof(src) / sizeof(char *),
288                        DATA_MODEL_MODULE_NAME);
289     if (res < 0) {
290         return res;
291     }
292 
293     res = aiot_mqtt_pub(handle->mqtt_handle, (char *)topic, (uint8_t *)payload, strlen(payload), 0);
294     handle->sysdep->core_sysdep_free(payload);
295 
296     return res;
297 }
298 
299 /*** dm send function start ***/
_dm_send_property_post(dm_handle_t * handle,const char * topic,const aiot_dm_msg_t * msg)300 static int32_t _dm_send_property_post(dm_handle_t *handle, const char *topic, const aiot_dm_msg_t *msg)
301 {
302     return _dm_send_alink_req(handle, topic, msg->data.property_post.params);
303 }
304 
_dm_send_event_post(dm_handle_t * handle,const char * topic,const aiot_dm_msg_t * msg)305 static int32_t _dm_send_event_post(dm_handle_t *handle, const char *topic, const aiot_dm_msg_t *msg)
306 {
307     return _dm_send_alink_req(handle, topic, msg->data.event_post.params);
308 }
309 
_dm_send_property_set_reply(dm_handle_t * handle,const char * topic,const aiot_dm_msg_t * msg)310 static int32_t _dm_send_property_set_reply(dm_handle_t *handle, const char *topic, const aiot_dm_msg_t *msg)
311 {
312     return _dm_send_alink_rsp(handle, topic, msg->data.property_set_reply.msg_id,
313                               msg->data.property_set_reply.code,
314                               msg->data.property_set_reply.data);
315 }
316 
_dm_send_async_service_reply(dm_handle_t * handle,const char * topic,const aiot_dm_msg_t * msg)317 static int32_t _dm_send_async_service_reply(dm_handle_t *handle, const char *topic, const aiot_dm_msg_t *msg)
318 {
319     return _dm_send_alink_rsp(handle, topic, msg->data.async_service_reply.msg_id,
320                               msg->data.async_service_reply.code,
321                               msg->data.async_service_reply.data);
322 }
323 
_dm_send_sync_service_reply(dm_handle_t * handle,const char * topic,const aiot_dm_msg_t * msg)324 static int32_t _dm_send_sync_service_reply(dm_handle_t *handle, const char *topic, const aiot_dm_msg_t *msg)
325 {
326     return _dm_send_alink_rsp(handle, topic, msg->data.sync_service_reply.msg_id,
327                               msg->data.sync_service_reply.code,
328                               msg->data.sync_service_reply.data);
329 }
330 
_dm_send_raw_data(dm_handle_t * handle,const char * topic,const aiot_dm_msg_t * msg)331 static int32_t _dm_send_raw_data(dm_handle_t *handle, const char *topic, const aiot_dm_msg_t *msg)
332 {
333     return aiot_mqtt_pub(handle->mqtt_handle, (char *)topic, msg->data.raw_data.data, msg->data.raw_data.data_len, 0);
334 }
335 
_dm_send_raw_service_reply(dm_handle_t * handle,const char * topic,const aiot_dm_msg_t * msg)336 static int32_t _dm_send_raw_service_reply(dm_handle_t *handle, const char *topic, const aiot_dm_msg_t *msg)
337 {
338     return aiot_mqtt_pub(handle->mqtt_handle, (char *)topic, msg->data.raw_service_reply.data,
339                          msg->data.raw_service_reply.data_len, 0);
340 }
341 
_dm_send_desired_get(dm_handle_t * handle,const char * topic,const aiot_dm_msg_t * msg)342 static int32_t _dm_send_desired_get(dm_handle_t *handle, const char *topic, const aiot_dm_msg_t *msg)
343 {
344     return _dm_send_alink_req(handle, topic, msg->data.get_desired.params);
345 }
346 
_dm_send_desired_delete(dm_handle_t * handle,const char * topic,const aiot_dm_msg_t * msg)347 static int32_t _dm_send_desired_delete(dm_handle_t *handle, const char *topic, const aiot_dm_msg_t *msg)
348 {
349     return _dm_send_alink_req(handle, topic, msg->data.delete_desired.params);
350 }
351 
_dm_send_property_batch_post(dm_handle_t * handle,const char * topic,const aiot_dm_msg_t * msg)352 static int32_t _dm_send_property_batch_post(dm_handle_t *handle, const char *topic, const aiot_dm_msg_t *msg)
353 {
354     return _dm_send_alink_req(handle, topic, msg->data.property_post.params);
355 }
356 /*** dm send function end ***/
357 
358 /*** dm recv handler functions start ***/
_dm_get_topic_level(aiot_sysdep_portfile_t * sysdep,char * topic,uint32_t topic_len,uint8_t level,char ** level_name)359 static int32_t _dm_get_topic_level(aiot_sysdep_portfile_t *sysdep, char *topic, uint32_t topic_len, uint8_t level,
360                                    char **level_name)
361 {
362     uint32_t i = 0;
363     uint16_t level_curr = 0;
364     char *p_open = NULL;
365     char *p_close = NULL;
366     char *p_name = NULL;
367     uint16_t name_len = 0;
368 
369     for (i = 0; i < (topic_len - 1); i++) {
370         if (topic[i] == '/') {
371             level_curr++;
372             if (level_curr == level && p_open == NULL) {
373                 p_open = topic + i + 1;
374             }
375 
376             if (level_curr == (level + 1) && p_close == NULL) {
377                 p_close = topic + i;
378             }
379         }
380     }
381 
382     if (p_open == NULL) {
383         return STATE_DM_INTERNAL_TOPIC_ERROR;
384     }
385     if (p_close == NULL) {
386         p_close = topic + topic_len;
387     }
388 
389     name_len = p_close - p_open;
390     p_name = sysdep->core_sysdep_malloc(name_len + 1, DATA_MODEL_MODULE_NAME);
391     if (p_name == NULL) {
392         return STATE_SYS_DEPEND_MALLOC_FAILED;
393     }
394     memset(p_name, 0, name_len + 1);
395     memcpy(p_name, p_open, name_len);
396     *level_name = p_name;
397 
398     return STATE_SUCCESS;
399 }
400 
_dm_parse_alink_request(const char * payload,uint32_t payload_len,uint64_t * msg_id,char ** params,uint32_t * params_len)401 static int32_t _dm_parse_alink_request(const char *payload, uint32_t payload_len, uint64_t *msg_id, char **params,
402                                        uint32_t *params_len)
403 {
404     char *value = NULL;
405     uint32_t value_len = 0;
406     int32_t res = STATE_SUCCESS;
407 
408     if ((res = core_json_value((char *)payload, payload_len, ALINK_JSON_KEY_ID, strlen(ALINK_JSON_KEY_ID),
409                                &value, &value_len)) < 0 ||
410         (res = core_str2uint64(value, value_len, msg_id) < 0)) {
411         return res;
412     }
413 
414     if ((res = core_json_value((char *)payload, payload_len, ALINK_JSON_KEY_PARAMS, strlen(ALINK_JSON_KEY_PARAMS),
415                                &value, &value_len)) < 0) {
416         return res;
417     }
418     *params = value;
419     *params_len = value_len;
420 
421     return res;
422 }
423 
_dm_recv_generic_reply_handler(void * handle,const aiot_mqtt_recv_t * msg,void * userdata)424 static void _dm_recv_generic_reply_handler(void *handle, const aiot_mqtt_recv_t *msg, void *userdata)
425 {
426     dm_handle_t *dm_handle = (dm_handle_t *)userdata;
427     aiot_dm_recv_t recv;
428     char *value = NULL;
429     uint32_t value_len = 0;
430     int32_t res = STATE_SUCCESS;
431 
432     if (NULL == dm_handle->recv_handler) {
433         return;
434     }
435 
436     /* construct recv message */
437     memset(&recv, 0, sizeof(aiot_dm_recv_t));
438     recv.type = AIOT_DMRECV_GENERIC_REPLY;
439 
440     core_log(dm_handle->sysdep, STATE_DM_LOG_RECV, "DM recv generic reply\r\n");
441 
442     do {
443         if (_dm_get_topic_level(dm_handle->sysdep, msg->data.pub.topic, msg->data.pub.topic_len, 2, &recv.product_key) < 0 ||
444             _dm_get_topic_level(dm_handle->sysdep, msg->data.pub.topic, msg->data.pub.topic_len, 3, &recv.device_name) < 0) {
445             break;  /* must be malloc failed */
446         }
447 
448         if ((res = core_json_value((char *)msg->data.pub.payload, msg->data.pub.payload_len,
449                                    ALINK_JSON_KEY_ID, strlen(ALINK_JSON_KEY_ID), &value, &value_len)) < 0 ||
450             (res = core_str2uint(value, value_len, &recv.data.generic_reply.msg_id)) < 0 ||
451             (res = core_json_value((char *)msg->data.pub.payload, msg->data.pub.payload_len,
452                                    ALINK_JSON_KEY_CODE, strlen(ALINK_JSON_KEY_CODE), &value, &value_len)) < 0 ||
453             (res = core_str2uint(value, value_len, &recv.data.generic_reply.code) < 0) ||
454             (res = core_json_value((char *)msg->data.pub.payload, msg->data.pub.payload_len,
455                                    ALINK_JSON_KEY_DATA, strlen(ALINK_JSON_KEY_DATA),
456                                    &recv.data.generic_reply.data,
457                                    &recv.data.generic_reply.data_len)) < 0) {
458 
459             core_log(dm_handle->sysdep, SATAE_DM_LOG_PARSE_RECV_MSG_FAILED, "DM parse generic reply failed\r\n");
460             break;
461         }
462 
463         res = core_json_value((char *)msg->data.pub.payload, msg->data.pub.payload_len,
464                         ALINK_JSON_KEY_MESSAGE, strlen(ALINK_JSON_KEY_MESSAGE),
465                         &recv.data.generic_reply.message,
466                         &recv.data.generic_reply.message_len);
467 
468         if (res < 0) {
469             core_log(dm_handle->sysdep, SATAE_DM_LOG_PARSE_RECV_MSG_FAILED, "DM parse generic reply failed\r\n");
470             break;
471         }
472         _append_diag_data(handle, DM_DIAG_MSG_TYPE_RSP, recv.data.generic_reply.msg_id);
473         dm_handle->recv_handler(dm_handle, &recv, dm_handle->userdata);
474     } while (0);
475 
476     DM_FREE(recv.product_key);
477     DM_FREE(recv.device_name);
478 }
479 
_dm_recv_property_set_handler(void * handle,const aiot_mqtt_recv_t * msg,void * userdata)480 static void _dm_recv_property_set_handler(void *handle, const aiot_mqtt_recv_t *msg, void *userdata)
481 {
482     dm_handle_t *dm_handle = (dm_handle_t *)userdata;
483     aiot_dm_recv_t recv;
484     int32_t res = STATE_SUCCESS;
485 
486     if (NULL == dm_handle->recv_handler) {
487         return;
488     }
489 
490     /* construct recv message */
491     memset(&recv, 0, sizeof(aiot_dm_recv_t));
492     recv.type = AIOT_DMRECV_PROPERTY_SET;
493 
494     core_log(dm_handle->sysdep, STATE_DM_LOG_RECV, "DM recv property set\r\n");
495 
496     do {
497         if (_dm_get_topic_level(dm_handle->sysdep, msg->data.pub.topic, msg->data.pub.topic_len, 2, &recv.product_key) < 0 ||
498             _dm_get_topic_level(dm_handle->sysdep, msg->data.pub.topic, msg->data.pub.topic_len, 3, &recv.device_name) < 0) {
499             break;     /* must be malloc failed */
500         }
501 
502         if ((res = _dm_parse_alink_request((char *)msg->data.pub.payload, msg->data.pub.payload_len,
503                                            &recv.data.property_set.msg_id,
504                                            &recv.data.property_set.params,
505                                            &recv.data.property_set.params_len)) < 0) {
506 
507             core_log(dm_handle->sysdep, SATAE_DM_LOG_PARSE_RECV_MSG_FAILED, "DM parse property set failed\r\n");
508             break;
509         }
510         dm_handle->recv_handler(dm_handle, &recv, dm_handle->userdata);
511     } while (0);
512 
513     DM_FREE(recv.product_key);
514     DM_FREE(recv.device_name);
515 }
516 
_dm_recv_async_service_invoke_handler(void * handle,const aiot_mqtt_recv_t * msg,void * userdata)517 static void _dm_recv_async_service_invoke_handler(void *handle, const aiot_mqtt_recv_t *msg, void *userdata)
518 {
519     dm_handle_t *dm_handle = (dm_handle_t *)userdata;
520     aiot_dm_recv_t recv;
521     int32_t res = STATE_SUCCESS;
522 
523     if (NULL == dm_handle->recv_handler) {
524         return;
525     }
526 
527     memset(&recv, 0, sizeof(aiot_dm_recv_t));
528     recv.type = AIOT_DMRECV_ASYNC_SERVICE_INVOKE;
529 
530     core_log(dm_handle->sysdep, STATE_DM_LOG_RECV, "DM recv async service invoke\r\n");
531 
532     do {
533         if (_dm_get_topic_level(dm_handle->sysdep, msg->data.pub.topic, msg->data.pub.topic_len, 2, &recv.product_key) < 0 ||
534             _dm_get_topic_level(dm_handle->sysdep, msg->data.pub.topic, msg->data.pub.topic_len, 3, &recv.device_name) < 0 ||
535             _dm_get_topic_level(dm_handle->sysdep, msg->data.pub.topic, msg->data.pub.topic_len, 6,
536                                 &recv.data.async_service_invoke.service_id) < 0) {
537             break;
538         }
539         if ((res = _dm_parse_alink_request((char *)msg->data.pub.payload, msg->data.pub.payload_len,
540                                            &recv.data.async_service_invoke.msg_id,
541                                            &recv.data.async_service_invoke.params,
542                                            &recv.data.async_service_invoke.params_len)) < 0) {
543 
544             /* core_log(dm_handle->sysdep, SATAE_DM_LOG_PARSE_RECV_MSG_FAILED, "DM parse async servicey failed\r\n"); */
545             break;
546         }
547 
548         dm_handle->recv_handler(dm_handle, &recv, dm_handle->userdata);
549     } while (0);
550 
551     DM_FREE(recv.product_key);
552     DM_FREE(recv.device_name);
553     DM_FREE(recv.data.async_service_invoke.service_id);
554 }
555 
_dm_recv_sync_service_invoke_handler(void * handle,const aiot_mqtt_recv_t * msg,void * userdata)556 static void _dm_recv_sync_service_invoke_handler(void *handle, const aiot_mqtt_recv_t *msg, void *userdata)
557 {
558     dm_handle_t *dm_handle = (dm_handle_t *)userdata;
559     aiot_dm_recv_t recv;
560     int32_t res = STATE_SUCCESS;
561 
562     if (NULL == dm_handle->recv_handler) {
563         return;
564     }
565 
566     memset(&recv, 0, sizeof(aiot_dm_recv_t));
567     recv.type = AIOT_DMRECV_SYNC_SERVICE_INVOKE;
568 
569     core_log(dm_handle->sysdep, STATE_DM_LOG_RECV, "DM recv sync service invoke\r\n");
570 
571     do {
572         if (_dm_get_topic_level(dm_handle->sysdep, msg->data.pub.topic, msg->data.pub.topic_len, 5, &recv.product_key) < 0 ||
573             _dm_get_topic_level(dm_handle->sysdep, msg->data.pub.topic, msg->data.pub.topic_len, 6, &recv.device_name) < 0 ||
574             _dm_get_topic_level(dm_handle->sysdep, msg->data.pub.topic, msg->data.pub.topic_len, 3,
575                                 &recv.data.sync_service_invoke.rrpc_id) < 0 ||
576             _dm_get_topic_level(dm_handle->sysdep, msg->data.pub.topic, msg->data.pub.topic_len, 9,
577                                 &recv.data.sync_service_invoke.service_id) < 0) {
578             break;
579         }
580         if ((res = _dm_parse_alink_request((char *)msg->data.pub.payload, msg->data.pub.payload_len,
581                                            &recv.data.sync_service_invoke.msg_id,
582                                            &recv.data.sync_service_invoke.params,
583                                            &recv.data.sync_service_invoke.params_len)) < 0) {
584 
585             core_log(dm_handle->sysdep, SATAE_DM_LOG_PARSE_RECV_MSG_FAILED, "DM parse sync service failed\r\n");
586             break;
587         }
588 
589         dm_handle->recv_handler(dm_handle, &recv, dm_handle->userdata);
590     } while (0);
591 
592     DM_FREE(recv.data.sync_service_invoke.rrpc_id);
593     DM_FREE(recv.product_key);
594     DM_FREE(recv.device_name);
595     DM_FREE(recv.data.sync_service_invoke.service_id);
596 }
597 
_dm_recv_raw_data_handler(void * handle,const aiot_mqtt_recv_t * msg,void * userdata)598 static void _dm_recv_raw_data_handler(void *handle, const aiot_mqtt_recv_t *msg, void *userdata)
599 {
600     dm_handle_t *dm_handle = (dm_handle_t *)userdata;
601     aiot_dm_recv_t recv;
602 
603     if (NULL == dm_handle->recv_handler) {
604         return;
605     }
606 
607     memset(&recv, 0, sizeof(aiot_dm_recv_t));
608     recv.type = AIOT_DMRECV_RAW_DATA;
609 
610     core_log(dm_handle->sysdep, STATE_DM_LOG_RECV, "DM recv raw data\r\n");
611 
612     do {
613         if (_dm_get_topic_level(dm_handle->sysdep, msg->data.pub.topic, msg->data.pub.topic_len, 2, &recv.product_key) < 0 ||
614             _dm_get_topic_level(dm_handle->sysdep, msg->data.pub.topic, msg->data.pub.topic_len, 3, &recv.device_name) < 0) {
615             break;
616         }
617         recv.data.raw_data.data = msg->data.pub.payload;
618         recv.data.raw_data.data_len = msg->data.pub.payload_len;
619 
620         dm_handle->recv_handler(dm_handle, &recv, dm_handle->userdata);
621     } while (0);
622 
623     DM_FREE(recv.product_key);
624     DM_FREE(recv.device_name);
625 }
626 
_dm_recv_up_raw_reply_data_handler(void * handle,const aiot_mqtt_recv_t * msg,void * userdata)627 static void _dm_recv_up_raw_reply_data_handler(void *handle, const aiot_mqtt_recv_t *msg, void *userdata)
628 {
629     dm_handle_t *dm_handle = (dm_handle_t *)userdata;
630     aiot_dm_recv_t recv;
631 
632     if (NULL == dm_handle->recv_handler) {
633         return;
634     }
635 
636     memset(&recv, 0, sizeof(aiot_dm_recv_t));
637     recv.type = AIOT_DMRECV_RAW_DATA_REPLY;
638 
639     core_log(dm_handle->sysdep, STATE_DM_LOG_RECV, "DM recv raw data\r\n");
640 
641     do {
642         if (_dm_get_topic_level(dm_handle->sysdep, msg->data.pub.topic, msg->data.pub.topic_len, 2, &recv.product_key) < 0 ||
643             _dm_get_topic_level(dm_handle->sysdep, msg->data.pub.topic, msg->data.pub.topic_len, 3, &recv.device_name) < 0) {
644             break;
645         }
646         recv.data.raw_data.data = msg->data.pub.payload;
647         recv.data.raw_data.data_len = msg->data.pub.payload_len;
648 
649         dm_handle->recv_handler(dm_handle, &recv, dm_handle->userdata);
650     } while (0);
651 
652     DM_FREE(recv.product_key);
653     DM_FREE(recv.device_name);
654 }
655 
_dm_recv_raw_sync_service_invoke_handler(void * handle,const aiot_mqtt_recv_t * msg,void * userdata)656 static void _dm_recv_raw_sync_service_invoke_handler(void *handle, const aiot_mqtt_recv_t *msg, void *userdata)
657 {
658     dm_handle_t *dm_handle = (dm_handle_t *)userdata;
659     aiot_dm_recv_t recv;
660 
661     if (NULL == dm_handle->recv_handler) {
662         return;
663     }
664 
665     memset(&recv, 0, sizeof(aiot_dm_recv_t));
666     recv.type = AIOT_DMRECV_RAW_SYNC_SERVICE_INVOKE;
667 
668     core_log(dm_handle->sysdep, STATE_DM_LOG_RECV, "DM recv raw sync service invoke\r\n");
669 
670     do {
671         if (_dm_get_topic_level(dm_handle->sysdep, msg->data.pub.topic, msg->data.pub.topic_len, 3,
672                                 &recv.data.raw_service_invoke.rrpc_id) < 0 ||
673             _dm_get_topic_level(dm_handle->sysdep, msg->data.pub.topic, msg->data.pub.topic_len, 5, &recv.product_key) < 0 ||
674             _dm_get_topic_level(dm_handle->sysdep, msg->data.pub.topic, msg->data.pub.topic_len, 6, &recv.device_name) < 0) {
675             break;
676         }
677         recv.data.raw_service_invoke.data = msg->data.pub.payload;
678         recv.data.raw_service_invoke.data_len = msg->data.pub.payload_len;
679 
680         dm_handle->recv_handler(dm_handle, &recv, dm_handle->userdata);
681     } while (0);
682 
683     DM_FREE(recv.data.raw_service_invoke.rrpc_id);
684     DM_FREE(recv.product_key);
685     DM_FREE(recv.device_name);
686 }
687 
_dm_core_mqtt_process_handler(void * context,aiot_mqtt_event_t * event,core_mqtt_event_t * core_event)688 static void _dm_core_mqtt_process_handler(void *context, aiot_mqtt_event_t *event, core_mqtt_event_t *core_event)
689 {
690     dm_handle_t *dm_handle = (dm_handle_t *)context;
691 
692     if (core_event != NULL) {
693         switch (core_event->type) {
694             case CORE_MQTTEVT_DEINIT: {
695                 dm_handle->mqtt_handle = NULL;
696                 return;
697             }
698             break;
699             default: {
700 
701             }
702             break;
703         }
704     }
705 }
706 
_dm_core_mqtt_operate_process_handler(dm_handle_t * dm_handle,core_mqtt_option_t option)707 static int32_t _dm_core_mqtt_operate_process_handler(dm_handle_t *dm_handle, core_mqtt_option_t option)
708 {
709     core_mqtt_process_data_t process_data;
710 
711     memset(&process_data, 0, sizeof(core_mqtt_process_data_t));
712     process_data.handler = _dm_core_mqtt_process_handler;
713     process_data.context = dm_handle;
714 
715     return core_mqtt_setopt(dm_handle->mqtt_handle, option, &process_data);
716 }
717 
aiot_dm_init(void)718 void *aiot_dm_init(void)
719 {
720     aiot_sysdep_portfile_t *sysdep = aiot_sysdep_get_portfile();
721     dm_handle_t *dm_handle = NULL;
722 
723     if (NULL == sysdep) {
724         return NULL;
725     }
726 
727     dm_handle = sysdep->core_sysdep_malloc(sizeof(dm_handle_t), DATA_MODEL_MODULE_NAME);
728     if (NULL == dm_handle) {
729         return NULL;
730     }
731 
732     memset(dm_handle, 0, sizeof(dm_handle_t));
733     dm_handle->sysdep = sysdep;
734     dm_handle->post_reply = 1;
735 
736     core_global_init(sysdep);
737     return dm_handle;
738 }
739 
aiot_dm_setopt(void * handle,aiot_dm_option_t option,void * data)740 int32_t aiot_dm_setopt(void *handle, aiot_dm_option_t option, void *data)
741 {
742     dm_handle_t *dm_handle;
743     int32_t res = STATE_SUCCESS;
744 
745     if (NULL == handle || NULL == data) {
746         return STATE_USER_INPUT_NULL_POINTER;
747     }
748     if (option >= AIOT_DMOPT_MAX) {
749         return STATE_USER_INPUT_OUT_RANGE;
750     }
751 
752     dm_handle = (dm_handle_t *)handle;
753 
754     switch (option) {
755         case AIOT_DMOPT_MQTT_HANDLE: {
756             dm_handle->mqtt_handle = data;
757             /* setup mqtt topic mapping */
758             res = _dm_setup_topic_mapping(data, dm_handle);
759             if (res >= STATE_SUCCESS) {
760                 res = _dm_core_mqtt_operate_process_handler(dm_handle, CORE_MQTTOPT_APPEND_PROCESS_HANDLER);
761             }
762         }
763         break;
764         case AIOT_DMOPT_RECV_HANDLER: {
765             dm_handle->recv_handler = (aiot_dm_recv_handler_t)data;
766         }
767         break;
768         case AIOT_DMOPT_USERDATA: {
769             dm_handle->userdata = data;
770         }
771         break;
772         case AIOT_DMOPT_POST_REPLY: {
773             dm_handle->post_reply = *(uint8_t *)data;
774         }
775         break;
776         default:
777             break;
778     }
779 
780     return res;
781 }
782 
aiot_dm_send(void * handle,const aiot_dm_msg_t * msg)783 int32_t aiot_dm_send(void *handle, const aiot_dm_msg_t *msg)
784 {
785     dm_handle_t *dm_handle = NULL;
786     char *topic = NULL;
787     int32_t res = STATE_SUCCESS;
788 
789     if (NULL == handle || NULL == msg) {
790         return STATE_USER_INPUT_NULL_POINTER;
791     }
792 
793     if (msg->type >= AIOT_DMMSG_MAX) {
794         return STATE_USER_INPUT_OUT_RANGE;
795     }
796 
797     dm_handle = (dm_handle_t *)handle;
798     if (NULL == dm_handle->mqtt_handle) {
799         return STATE_DM_MQTT_HANDLE_IS_NULL;
800     }
801 
802     res = _dm_prepare_send_topic(dm_handle, msg, &topic);
803     if (res < 0) {
804         return res;
805     }
806 
807     res = g_dm_send_topic_mapping[msg->type].func(dm_handle, topic, msg);
808     dm_handle->sysdep->core_sysdep_free(topic);
809     return res;
810 }
811 
aiot_dm_deinit(void ** p_handle)812 int32_t aiot_dm_deinit(void **p_handle)
813 {
814     dm_handle_t *dm_handle = NULL;
815     aiot_sysdep_portfile_t *sysdep = NULL;
816     uint8_t i = 0;
817 
818     if (NULL == p_handle || NULL == *p_handle) {
819         return STATE_USER_INPUT_NULL_POINTER;
820     }
821 
822     dm_handle = *p_handle;
823     sysdep = dm_handle->sysdep;
824     *p_handle = NULL;
825 
826     _dm_core_mqtt_operate_process_handler(dm_handle, CORE_MQTTOPT_REMOVE_PROCESS_HANDLER);
827 
828     /* remove mqtt topic mapping */
829     for (i = 0; i < sizeof(g_dm_recv_topic_mapping) / sizeof(dm_recv_topic_map_t); i++) {
830         aiot_mqtt_topic_map_t topic_mapping;
831         memset(&topic_mapping, 0, sizeof(aiot_mqtt_topic_map_t));
832         topic_mapping.topic = g_dm_recv_topic_mapping[i].topic;
833         topic_mapping.handler = g_dm_recv_topic_mapping[i].func;
834 
835         aiot_mqtt_setopt(dm_handle->mqtt_handle, AIOT_MQTTOPT_REMOVE_TOPIC_MAP, &topic_mapping);
836     }
837 
838     sysdep->core_sysdep_free(dm_handle);
839 
840     core_global_deinit(sysdep);
841     return STATE_SUCCESS;
842 }
843 
844