1 /**
2 * @file aiot_dm_api.c
3 * @brief 数据模型模块接口实现文件, 包含了支持物模型数据格式通信的所有接口实现
4 * @date 2020-01-20
5 *
6 * @copyright Copyright (C) 2015-2020 Alibaba Group Holding Limited
7 *
8 */
9
10 #include "dm_private.h"
11
12
13 static int32_t _dm_send_property_post(dm_handle_t *handle, const char *topic, const aiot_dm_msg_t *msg);
14 static int32_t _dm_send_event_post(dm_handle_t *handle, const char *topic, const aiot_dm_msg_t *msg);
15 static int32_t _dm_send_property_set_reply(dm_handle_t *handle, const char *topic, const aiot_dm_msg_t *msg);
16 static int32_t _dm_send_async_service_reply(dm_handle_t *handle, const char *topic, const aiot_dm_msg_t *msg);
17 static int32_t _dm_send_sync_service_reply(dm_handle_t *handle, const char *topic, const aiot_dm_msg_t *msg);
18 static int32_t _dm_send_raw_data(dm_handle_t *handle, const char *topic, const aiot_dm_msg_t *msg);
19 static int32_t _dm_send_raw_service_reply(dm_handle_t *handle, const char *topic, const aiot_dm_msg_t *msg);
20 static int32_t _dm_send_desired_get(dm_handle_t *handle, const char *topic, const aiot_dm_msg_t *msg);
21 static int32_t _dm_send_desired_delete(dm_handle_t *handle, const char *topic, const aiot_dm_msg_t *msg);
22 static int32_t _dm_send_property_batch_post(dm_handle_t *handle, const char *topic, const aiot_dm_msg_t *msg);
23
24 static void _dm_recv_generic_reply_handler(void *handle, const aiot_mqtt_recv_t *msg, void *userdata);
25 static void _dm_recv_property_set_handler(void *handle, const aiot_mqtt_recv_t *msg, void *userdata);
26 static void _dm_recv_async_service_invoke_handler(void *handle, const aiot_mqtt_recv_t *msg, void *userdata);
27 static void _dm_recv_sync_service_invoke_handler(void *handle, const aiot_mqtt_recv_t *msg, void *userdata);
28 static void _dm_recv_raw_data_handler(void *handle, const aiot_mqtt_recv_t *msg, void *userdata);
29 static void _dm_recv_raw_sync_service_invoke_handler(void *handle, const aiot_mqtt_recv_t *msg, void *userdata);
30 static void _dm_recv_up_raw_reply_data_handler(void *handle, const aiot_mqtt_recv_t *msg, void *userdata);
31
32 static const dm_send_topic_map_t g_dm_send_topic_mapping[AIOT_DMMSG_MAX] = {
33 {
34 "/sys/%s/%s/thing/event/property/post",
35 _dm_send_property_post
36 },
37 {
38 "/sys/%s/%s/thing/event/%s/post",
39 _dm_send_event_post
40 },
41 {
42 "/sys/%s/%s/thing/service/property/set_reply",
43 _dm_send_property_set_reply
44 },
45 {
46 "/sys/%s/%s/thing/service/%s_reply",
47 _dm_send_async_service_reply
48 },
49 {
50 "/ext/rrpc/%s/sys/%s/%s/thing/service/%s",
51 _dm_send_sync_service_reply
52 },
53 {
54 "/sys/%s/%s/thing/model/up_raw",
55 _dm_send_raw_data
56 },
57 {
58 "/ext/rrpc/%s/sys/%s/%s/thing/model/down_raw",
59 _dm_send_raw_service_reply
60 },
61 {
62 "/sys/%s/%s/thing/property/desired/get",
63 _dm_send_desired_get
64 },
65 {
66 "/sys/%s/%s/thing/property/desired/delete",
67 _dm_send_desired_delete
68 },
69 {
70 "/sys/%s/%s/thing/event/property/batch/post",
71 _dm_send_property_batch_post
72 },
73 };
74
75 static const dm_recv_topic_map_t g_dm_recv_topic_mapping[] = {
76 {
77 "/sys/+/+/thing/event/+/post_reply",
78 _dm_recv_generic_reply_handler,
79 },
80 {
81 "/sys/+/+/thing/service/property/set",
82 _dm_recv_property_set_handler,
83 },
84 {
85 "/sys/+/+/thing/service/+",
86 _dm_recv_async_service_invoke_handler,
87 },
88 {
89 "/ext/rrpc/+/sys/+/+/thing/service/+",
90 _dm_recv_sync_service_invoke_handler,
91 },
92 {
93 "/sys/+/+/thing/model/down_raw",
94 _dm_recv_raw_data_handler,
95 },
96 {
97 "/sys/+/+/thing/model/up_raw_reply",
98 _dm_recv_up_raw_reply_data_handler,
99 },
100 {
101 "/ext/rrpc/+/sys/+/+/thing/model/down_raw",
102 _dm_recv_raw_sync_service_invoke_handler,
103 },
104 {
105 "/sys/+/+/thing/property/desired/get_reply",
106 _dm_recv_generic_reply_handler,
107 },
108 {
109 "/sys/+/+/thing/property/desired/delete_reply",
110 _dm_recv_generic_reply_handler,
111 },
112 {
113 "/sys/+/+/thing/event/property/batch/post_reply",
114 _dm_recv_generic_reply_handler,
115 },
116 };
117
/*
 * Report one diagnostic record for a data-model message via core_diag().
 *
 * dm_handle: DM instance whose sysdep is handed to core_diag().
 * msg_type:  stored in byte 3 of the record.
 * msg_id:    stored big-endian in bytes 7..10 of the record.
 *
 * NOTE(review): the fixed bytes look like two tag/length/value fields
 * (0x0030 len 1, 0x0031 len 4) -- confirm against the diag module.
 */
static void _append_diag_data(dm_handle_t *dm_handle, uint8_t msg_type, int32_t msg_id)
{
    uint8_t record[] = {
        0x00, 0x30, 0x01, 0x00,
        0x00, 0x31, 0x04, 0x00, 0x00, 0x00, 0x00
    };
    uint8_t n = 0;

    record[3] = msg_type;

    /* most significant byte first */
    for (n = 0; n < 4; n++) {
        record[7 + n] = (msg_id >> (24 - 8 * n)) & 0xFF;
    }

    core_diag(dm_handle->sysdep, STATE_DM_BASE, record, sizeof(record));
}
129
_dm_setup_topic_mapping(void * mqtt_handle,void * dm_handle)130 static int32_t _dm_setup_topic_mapping(void *mqtt_handle, void *dm_handle)
131 {
132 uint32_t i = 0;
133 int32_t res = STATE_SUCCESS;
134
135 for (i = 0; i < sizeof(g_dm_recv_topic_mapping) / sizeof(dm_recv_topic_map_t); i++) {
136 aiot_mqtt_topic_map_t topic_mapping;
137 topic_mapping.topic = g_dm_recv_topic_mapping[i].topic;
138 topic_mapping.handler = g_dm_recv_topic_mapping[i].func;
139 topic_mapping.userdata = dm_handle;
140
141 res = aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_APPEND_TOPIC_MAP, &topic_mapping);
142 if (res < 0) {
143 break;
144 }
145 }
146 return res;
147 }
148
/*
 * Format the publish topic for an outgoing message.
 *
 * The product key and device name come from the message when set there,
 * otherwise from the MQTT handle. Depending on the message type an event
 * id, service id and/or rrpc id is added, in the order the %s placeholders
 * appear in g_dm_send_topic_mapping[msg->type].topic.
 *
 * dm_handle: DM instance (provides sysdep and the MQTT handle).
 * msg:       message to send; msg->type selects the topic format.
 * topic:     receives the string produced by core_sprintf().
 *
 * Returns the core_sprintf() result, or one of
 * STATE_USER_INPUT_MISSING_PRODUCT_KEY, STATE_USER_INPUT_MISSING_DEVICE_NAME,
 * STATE_DM_EVENT_ID_IS_NULL, STATE_DM_SERVICE_ID_IS_NULL,
 * STATE_DM_RRPC_ID_IS_NULL, STATE_USER_INPUT_OUT_RANGE.
 */
static int32_t _dm_prepare_send_topic(dm_handle_t *dm_handle, const aiot_dm_msg_t *msg, char **topic)
{
    char *fields[4];
    uint8_t field_num = 0;
    char *product_key = msg->product_key;
    char *device_name = msg->device_name;

    /* fall back to the identity held by the MQTT handle */
    if (product_key == NULL) {
        product_key = core_mqtt_get_product_key(dm_handle->mqtt_handle);
    }
    if (product_key == NULL) {
        return STATE_USER_INPUT_MISSING_PRODUCT_KEY;
    }

    if (device_name == NULL) {
        device_name = core_mqtt_get_device_name(dm_handle->mqtt_handle);
    }
    if (device_name == NULL) {
        return STATE_USER_INPUT_MISSING_DEVICE_NAME;
    }

    switch (msg->type) {
        case AIOT_DMMSG_PROPERTY_POST:
        case AIOT_DMMSG_PROPERTY_BATCH_POST:
        case AIOT_DMMSG_PROPERTY_SET_REPLY:
        case AIOT_DMMSG_GET_DESIRED:
        case AIOT_DMMSG_DELETE_DESIRED:
        case AIOT_DMMSG_RAW_DATA:
            fields[field_num++] = product_key;
            fields[field_num++] = device_name;
            break;

        case AIOT_DMMSG_EVENT_POST:
            if (NULL == msg->data.event_post.event_id) {
                return STATE_DM_EVENT_ID_IS_NULL;
            }
            fields[field_num++] = product_key;
            fields[field_num++] = device_name;
            fields[field_num++] = msg->data.event_post.event_id;
            break;

        case AIOT_DMMSG_ASYNC_SERVICE_REPLY:
            if (NULL == msg->data.async_service_reply.service_id) {
                return STATE_DM_SERVICE_ID_IS_NULL;
            }
            fields[field_num++] = product_key;
            fields[field_num++] = device_name;
            fields[field_num++] = msg->data.async_service_reply.service_id;
            break;

        case AIOT_DMMSG_SYNC_SERVICE_REPLY:
            if (NULL == msg->data.sync_service_reply.rrpc_id) {
                return STATE_DM_RRPC_ID_IS_NULL;
            }
            if (NULL == msg->data.sync_service_reply.service_id) {
                return STATE_DM_SERVICE_ID_IS_NULL;
            }
            /* the rrpc id is the first placeholder of the rrpc topics */
            fields[field_num++] = msg->data.sync_service_reply.rrpc_id;
            fields[field_num++] = product_key;
            fields[field_num++] = device_name;
            fields[field_num++] = msg->data.sync_service_reply.service_id;
            break;

        case AIOT_DMMSG_RAW_SERVICE_REPLY:
            if (NULL == msg->data.raw_service_reply.rrpc_id) {
                return STATE_DM_RRPC_ID_IS_NULL;
            }
            fields[field_num++] = msg->data.raw_service_reply.rrpc_id;
            fields[field_num++] = product_key;
            fields[field_num++] = device_name;
            break;

        default:
            return STATE_USER_INPUT_OUT_RANGE;
    }

    return core_sprintf(dm_handle->sysdep, topic, g_dm_send_topic_mapping[msg->type].topic,
                        fields, field_num, DATA_MODEL_MODULE_NAME);
}
229
230
/*
 * Wrap params into an Alink request (ALINK_REQUEST_FMT) and publish it.
 *
 * handle: DM instance.
 * topic:  destination topic.
 * params: text substituted as the second field of the request format;
 *         must not be NULL.
 *
 * The three format fields are: the message id taken from
 * core_global_alink_id_next(), params, and "0"/"1" depending on whether
 * handle->post_reply is zero.
 *
 * Returns the message id when aiot_mqtt_pub() returns STATE_SUCCESS,
 * otherwise the aiot_mqtt_pub()/core_sprintf() result, or
 * STATE_DM_MSG_PARAMS_IS_NULL.
 */
static int32_t _dm_send_alink_req(dm_handle_t *handle, const char *topic, char *params)
{
    int32_t msg_id = 0;
    int32_t ret = STATE_SUCCESS;
    char msg_id_str[11] = { 0 };
    char *fields[3] = { NULL };
    char *request = NULL;

    if (params == NULL) {
        return STATE_DM_MSG_PARAMS_IS_NULL;
    }

    core_global_alink_id_next(handle->sysdep, &msg_id);
    core_int2str(msg_id, msg_id_str, NULL);

    _append_diag_data(handle, DM_DIAG_MSG_TYPE_REQ, msg_id);

    fields[0] = msg_id_str;
    fields[1] = params;
    fields[2] = (handle->post_reply != 0) ? "1" : "0";

    ret = core_sprintf(handle->sysdep, &request, ALINK_REQUEST_FMT, fields,
                       sizeof(fields) / sizeof(char *), DATA_MODEL_MODULE_NAME);
    if (ret < 0) {
        return ret;
    }

    ret = aiot_mqtt_pub(handle->mqtt_handle, (char *)topic, (uint8_t *)request, strlen(request), 0);

    /* the request buffer came from core_sprintf(); release it either way */
    handle->sysdep->core_sysdep_free(request);

    return (ret == STATE_SUCCESS) ? msg_id : ret;
}
266
/*
 * Wrap data into an Alink response (ALINK_RESPONSE_FMT) and publish it.
 *
 * handle: DM instance.
 * topic:  destination topic.
 * msg_id: id of the request being answered, rendered as decimal text.
 * code:   result code, rendered as decimal text.
 * data:   text substituted as the third field of the response format;
 *         must not be NULL.
 *
 * Returns the aiot_mqtt_pub() result, a negative core_sprintf() result,
 * or STATE_DM_MSG_DATA_IS_NULL.
 */
static int32_t _dm_send_alink_rsp(dm_handle_t *handle, const char *topic, uint64_t msg_id, uint32_t code,
                                  char *data)
{
    int32_t ret = STATE_SUCCESS;
    char msg_id_str[21] = { 0 };
    char code_str[11] = { 0 };
    char *fields[3] = { NULL };
    char *response = NULL;

    if (data == NULL) {
        return STATE_DM_MSG_DATA_IS_NULL;
    }

    core_uint642str(msg_id, msg_id_str, NULL);
    core_uint2str(code, code_str, NULL);

    fields[0] = msg_id_str;
    fields[1] = code_str;
    fields[2] = data;

    ret = core_sprintf(handle->sysdep, &response, ALINK_RESPONSE_FMT, fields,
                       sizeof(fields) / sizeof(char *), DATA_MODEL_MODULE_NAME);
    if (ret < 0) {
        return ret;
    }

    ret = aiot_mqtt_pub(handle->mqtt_handle, (char *)topic, (uint8_t *)response, strlen(response), 0);

    /* the response buffer came from core_sprintf(); release it either way */
    handle->sysdep->core_sysdep_free(response);

    return ret;
}
298
299 /*** dm send function start ***/
/* Publish a property post: forwards property_post.params as an Alink request. */
static int32_t _dm_send_property_post(dm_handle_t *handle, const char *topic, const aiot_dm_msg_t *msg)
{
    char *params = msg->data.property_post.params;

    return _dm_send_alink_req(handle, topic, params);
}
304
/* Publish an event post: forwards event_post.params as an Alink request. */
static int32_t _dm_send_event_post(dm_handle_t *handle, const char *topic, const aiot_dm_msg_t *msg)
{
    char *params = msg->data.event_post.params;

    return _dm_send_alink_req(handle, topic, params);
}
309
/* Answer a property-set request with an Alink response built from property_set_reply. */
static int32_t _dm_send_property_set_reply(dm_handle_t *handle, const char *topic, const aiot_dm_msg_t *msg)
{
    uint64_t msg_id = msg->data.property_set_reply.msg_id;
    uint32_t code = msg->data.property_set_reply.code;
    char *data = msg->data.property_set_reply.data;

    return _dm_send_alink_rsp(handle, topic, msg_id, code, data);
}
316
/* Answer an asynchronous service call with an Alink response built from async_service_reply. */
static int32_t _dm_send_async_service_reply(dm_handle_t *handle, const char *topic, const aiot_dm_msg_t *msg)
{
    uint64_t msg_id = msg->data.async_service_reply.msg_id;
    uint32_t code = msg->data.async_service_reply.code;
    char *data = msg->data.async_service_reply.data;

    return _dm_send_alink_rsp(handle, topic, msg_id, code, data);
}
323
/* Answer a synchronous (rrpc) service call with an Alink response built from sync_service_reply. */
static int32_t _dm_send_sync_service_reply(dm_handle_t *handle, const char *topic, const aiot_dm_msg_t *msg)
{
    uint64_t msg_id = msg->data.sync_service_reply.msg_id;
    uint32_t code = msg->data.sync_service_reply.code;
    char *data = msg->data.sync_service_reply.data;

    return _dm_send_alink_rsp(handle, topic, msg_id, code, data);
}
330
/* Publish raw_data.data (raw_data.data_len bytes) unchanged, with the last aiot_mqtt_pub() argument 0. */
static int32_t _dm_send_raw_data(dm_handle_t *handle, const char *topic, const aiot_dm_msg_t *msg)
{
    int32_t res = aiot_mqtt_pub(handle->mqtt_handle, (char *)topic,
                                msg->data.raw_data.data, msg->data.raw_data.data_len, 0);

    return res;
}
335
/* Publish raw_service_reply.data (data_len bytes) unchanged, with the last aiot_mqtt_pub() argument 0. */
static int32_t _dm_send_raw_service_reply(dm_handle_t *handle, const char *topic, const aiot_dm_msg_t *msg)
{
    int32_t res = aiot_mqtt_pub(handle->mqtt_handle, (char *)topic,
                                msg->data.raw_service_reply.data,
                                msg->data.raw_service_reply.data_len, 0);

    return res;
}
341
/* Request desired property values: forwards get_desired.params as an Alink request. */
static int32_t _dm_send_desired_get(dm_handle_t *handle, const char *topic, const aiot_dm_msg_t *msg)
{
    char *params = msg->data.get_desired.params;

    return _dm_send_alink_req(handle, topic, params);
}
346
/* Delete desired property values: forwards delete_desired.params as an Alink request. */
static int32_t _dm_send_desired_delete(dm_handle_t *handle, const char *topic, const aiot_dm_msg_t *msg)
{
    char *params = msg->data.delete_desired.params;

    return _dm_send_alink_req(handle, topic, params);
}
351
/*
 * Publish a batch property post as an Alink request.
 * The params are read through the property_post member of msg->data.
 */
static int32_t _dm_send_property_batch_post(dm_handle_t *handle, const char *topic, const aiot_dm_msg_t *msg)
{
    char *params = msg->data.property_post.params;

    return _dm_send_alink_req(handle, topic, params);
}
356 /*** dm send function end ***/
357
358 /*** dm recv handler functions start ***/
/*
 * Copy one '/'-separated level of a topic into a newly allocated,
 * NUL-terminated string.
 *
 * Levels are counted by the '/' that precedes them: for "/sys/pk/dn/..."
 * level 1 is "sys", level 2 is "pk", and so on. The final character of the
 * topic is never treated as a separator; when no closing '/' is found the
 * level runs to the end of the topic.
 *
 * sysdep:     provides core_sysdep_malloc().
 * topic:      topic buffer; only topic_len bytes are read.
 * topic_len:  number of bytes in topic.
 * level:      1-based level index.
 * level_name: receives the allocated copy on success; the caller frees it.
 *
 * Returns STATE_SUCCESS, STATE_DM_INTERNAL_TOPIC_ERROR when the level does
 * not exist (or an argument is unusable), or STATE_SYS_DEPEND_MALLOC_FAILED.
 */
static int32_t _dm_get_topic_level(aiot_sysdep_portfile_t *sysdep, char *topic, uint32_t topic_len, uint8_t level,
                                   char **level_name)
{
    uint32_t i = 0;
    uint16_t level_curr = 0;
    char *p_open = NULL;
    char *p_close = NULL;
    char *p_name = NULL;
    uint32_t name_len = 0;

    /*
     * topic_len is unsigned: without this guard "topic_len - 1" wraps to
     * UINT32_MAX for an empty topic and the scan runs off the buffer.
     */
    if (topic == NULL || level_name == NULL || topic_len == 0) {
        return STATE_DM_INTERNAL_TOPIC_ERROR;
    }

    for (i = 0; i < (topic_len - 1); i++) {
        if (topic[i] != '/') {
            continue;
        }

        level_curr++;
        if (level_curr == level && p_open == NULL) {
            p_open = topic + i + 1;
        }
        if (level_curr == (level + 1) && p_close == NULL) {
            p_close = topic + i;
            break;  /* both ends are known, nothing left to look for */
        }
    }

    if (p_open == NULL) {
        return STATE_DM_INTERNAL_TOPIC_ERROR;
    }
    if (p_close == NULL) {
        p_close = topic + topic_len;
    }

    /* keep the full pointer difference; a 16-bit length could truncate */
    name_len = (uint32_t)(p_close - p_open);
    p_name = sysdep->core_sysdep_malloc(name_len + 1, DATA_MODEL_MODULE_NAME);
    if (p_name == NULL) {
        return STATE_SYS_DEPEND_MALLOC_FAILED;
    }
    memset(p_name, 0, name_len + 1);
    memcpy(p_name, p_open, name_len);
    *level_name = p_name;

    return STATE_SUCCESS;
}
400
_dm_parse_alink_request(const char * payload,uint32_t payload_len,uint64_t * msg_id,char ** params,uint32_t * params_len)401 static int32_t _dm_parse_alink_request(const char *payload, uint32_t payload_len, uint64_t *msg_id, char **params,
402 uint32_t *params_len)
403 {
404 char *value = NULL;
405 uint32_t value_len = 0;
406 int32_t res = STATE_SUCCESS;
407
408 if ((res = core_json_value((char *)payload, payload_len, ALINK_JSON_KEY_ID, strlen(ALINK_JSON_KEY_ID),
409 &value, &value_len)) < 0 ||
410 (res = core_str2uint64(value, value_len, msg_id) < 0)) {
411 return res;
412 }
413
414 if ((res = core_json_value((char *)payload, payload_len, ALINK_JSON_KEY_PARAMS, strlen(ALINK_JSON_KEY_PARAMS),
415 &value, &value_len)) < 0) {
416 return res;
417 }
418 *params = value;
419 *params_len = value_len;
420
421 return res;
422 }
423
_dm_recv_generic_reply_handler(void * handle,const aiot_mqtt_recv_t * msg,void * userdata)424 static void _dm_recv_generic_reply_handler(void *handle, const aiot_mqtt_recv_t *msg, void *userdata)
425 {
426 dm_handle_t *dm_handle = (dm_handle_t *)userdata;
427 aiot_dm_recv_t recv;
428 char *value = NULL;
429 uint32_t value_len = 0;
430 int32_t res = STATE_SUCCESS;
431
432 if (NULL == dm_handle->recv_handler) {
433 return;
434 }
435
436 /* construct recv message */
437 memset(&recv, 0, sizeof(aiot_dm_recv_t));
438 recv.type = AIOT_DMRECV_GENERIC_REPLY;
439
440 core_log(dm_handle->sysdep, STATE_DM_LOG_RECV, "DM recv generic reply\r\n");
441
442 do {
443 if (_dm_get_topic_level(dm_handle->sysdep, msg->data.pub.topic, msg->data.pub.topic_len, 2, &recv.product_key) < 0 ||
444 _dm_get_topic_level(dm_handle->sysdep, msg->data.pub.topic, msg->data.pub.topic_len, 3, &recv.device_name) < 0) {
445 break; /* must be malloc failed */
446 }
447
448 if ((res = core_json_value((char *)msg->data.pub.payload, msg->data.pub.payload_len,
449 ALINK_JSON_KEY_ID, strlen(ALINK_JSON_KEY_ID), &value, &value_len)) < 0 ||
450 (res = core_str2uint(value, value_len, &recv.data.generic_reply.msg_id)) < 0 ||
451 (res = core_json_value((char *)msg->data.pub.payload, msg->data.pub.payload_len,
452 ALINK_JSON_KEY_CODE, strlen(ALINK_JSON_KEY_CODE), &value, &value_len)) < 0 ||
453 (res = core_str2uint(value, value_len, &recv.data.generic_reply.code) < 0) ||
454 (res = core_json_value((char *)msg->data.pub.payload, msg->data.pub.payload_len,
455 ALINK_JSON_KEY_DATA, strlen(ALINK_JSON_KEY_DATA),
456 &recv.data.generic_reply.data,
457 &recv.data.generic_reply.data_len)) < 0) {
458
459 core_log(dm_handle->sysdep, SATAE_DM_LOG_PARSE_RECV_MSG_FAILED, "DM parse generic reply failed\r\n");
460 break;
461 }
462
463 res = core_json_value((char *)msg->data.pub.payload, msg->data.pub.payload_len,
464 ALINK_JSON_KEY_MESSAGE, strlen(ALINK_JSON_KEY_MESSAGE),
465 &recv.data.generic_reply.message,
466 &recv.data.generic_reply.message_len);
467
468 if (res < 0) {
469 core_log(dm_handle->sysdep, SATAE_DM_LOG_PARSE_RECV_MSG_FAILED, "DM parse generic reply failed\r\n");
470 break;
471 }
472 _append_diag_data(handle, DM_DIAG_MSG_TYPE_RSP, recv.data.generic_reply.msg_id);
473 dm_handle->recv_handler(dm_handle, &recv, dm_handle->userdata);
474 } while (0);
475
476 DM_FREE(recv.product_key);
477 DM_FREE(recv.device_name);
478 }
479
_dm_recv_property_set_handler(void * handle,const aiot_mqtt_recv_t * msg,void * userdata)480 static void _dm_recv_property_set_handler(void *handle, const aiot_mqtt_recv_t *msg, void *userdata)
481 {
482 dm_handle_t *dm_handle = (dm_handle_t *)userdata;
483 aiot_dm_recv_t recv;
484 int32_t res = STATE_SUCCESS;
485
486 if (NULL == dm_handle->recv_handler) {
487 return;
488 }
489
490 /* construct recv message */
491 memset(&recv, 0, sizeof(aiot_dm_recv_t));
492 recv.type = AIOT_DMRECV_PROPERTY_SET;
493
494 core_log(dm_handle->sysdep, STATE_DM_LOG_RECV, "DM recv property set\r\n");
495
496 do {
497 if (_dm_get_topic_level(dm_handle->sysdep, msg->data.pub.topic, msg->data.pub.topic_len, 2, &recv.product_key) < 0 ||
498 _dm_get_topic_level(dm_handle->sysdep, msg->data.pub.topic, msg->data.pub.topic_len, 3, &recv.device_name) < 0) {
499 break; /* must be malloc failed */
500 }
501
502 if ((res = _dm_parse_alink_request((char *)msg->data.pub.payload, msg->data.pub.payload_len,
503 &recv.data.property_set.msg_id,
504 &recv.data.property_set.params,
505 &recv.data.property_set.params_len)) < 0) {
506
507 core_log(dm_handle->sysdep, SATAE_DM_LOG_PARSE_RECV_MSG_FAILED, "DM parse property set failed\r\n");
508 break;
509 }
510 dm_handle->recv_handler(dm_handle, &recv, dm_handle->userdata);
511 } while (0);
512
513 DM_FREE(recv.product_key);
514 DM_FREE(recv.device_name);
515 }
516
_dm_recv_async_service_invoke_handler(void * handle,const aiot_mqtt_recv_t * msg,void * userdata)517 static void _dm_recv_async_service_invoke_handler(void *handle, const aiot_mqtt_recv_t *msg, void *userdata)
518 {
519 dm_handle_t *dm_handle = (dm_handle_t *)userdata;
520 aiot_dm_recv_t recv;
521 int32_t res = STATE_SUCCESS;
522
523 if (NULL == dm_handle->recv_handler) {
524 return;
525 }
526
527 memset(&recv, 0, sizeof(aiot_dm_recv_t));
528 recv.type = AIOT_DMRECV_ASYNC_SERVICE_INVOKE;
529
530 core_log(dm_handle->sysdep, STATE_DM_LOG_RECV, "DM recv async service invoke\r\n");
531
532 do {
533 if (_dm_get_topic_level(dm_handle->sysdep, msg->data.pub.topic, msg->data.pub.topic_len, 2, &recv.product_key) < 0 ||
534 _dm_get_topic_level(dm_handle->sysdep, msg->data.pub.topic, msg->data.pub.topic_len, 3, &recv.device_name) < 0 ||
535 _dm_get_topic_level(dm_handle->sysdep, msg->data.pub.topic, msg->data.pub.topic_len, 6,
536 &recv.data.async_service_invoke.service_id) < 0) {
537 break;
538 }
539 if ((res = _dm_parse_alink_request((char *)msg->data.pub.payload, msg->data.pub.payload_len,
540 &recv.data.async_service_invoke.msg_id,
541 &recv.data.async_service_invoke.params,
542 &recv.data.async_service_invoke.params_len)) < 0) {
543
544 /* core_log(dm_handle->sysdep, SATAE_DM_LOG_PARSE_RECV_MSG_FAILED, "DM parse async servicey failed\r\n"); */
545 break;
546 }
547
548 dm_handle->recv_handler(dm_handle, &recv, dm_handle->userdata);
549 } while (0);
550
551 DM_FREE(recv.product_key);
552 DM_FREE(recv.device_name);
553 DM_FREE(recv.data.async_service_invoke.service_id);
554 }
555
_dm_recv_sync_service_invoke_handler(void * handle,const aiot_mqtt_recv_t * msg,void * userdata)556 static void _dm_recv_sync_service_invoke_handler(void *handle, const aiot_mqtt_recv_t *msg, void *userdata)
557 {
558 dm_handle_t *dm_handle = (dm_handle_t *)userdata;
559 aiot_dm_recv_t recv;
560 int32_t res = STATE_SUCCESS;
561
562 if (NULL == dm_handle->recv_handler) {
563 return;
564 }
565
566 memset(&recv, 0, sizeof(aiot_dm_recv_t));
567 recv.type = AIOT_DMRECV_SYNC_SERVICE_INVOKE;
568
569 core_log(dm_handle->sysdep, STATE_DM_LOG_RECV, "DM recv sync service invoke\r\n");
570
571 do {
572 if (_dm_get_topic_level(dm_handle->sysdep, msg->data.pub.topic, msg->data.pub.topic_len, 5, &recv.product_key) < 0 ||
573 _dm_get_topic_level(dm_handle->sysdep, msg->data.pub.topic, msg->data.pub.topic_len, 6, &recv.device_name) < 0 ||
574 _dm_get_topic_level(dm_handle->sysdep, msg->data.pub.topic, msg->data.pub.topic_len, 3,
575 &recv.data.sync_service_invoke.rrpc_id) < 0 ||
576 _dm_get_topic_level(dm_handle->sysdep, msg->data.pub.topic, msg->data.pub.topic_len, 9,
577 &recv.data.sync_service_invoke.service_id) < 0) {
578 break;
579 }
580 if ((res = _dm_parse_alink_request((char *)msg->data.pub.payload, msg->data.pub.payload_len,
581 &recv.data.sync_service_invoke.msg_id,
582 &recv.data.sync_service_invoke.params,
583 &recv.data.sync_service_invoke.params_len)) < 0) {
584
585 core_log(dm_handle->sysdep, SATAE_DM_LOG_PARSE_RECV_MSG_FAILED, "DM parse sync service failed\r\n");
586 break;
587 }
588
589 dm_handle->recv_handler(dm_handle, &recv, dm_handle->userdata);
590 } while (0);
591
592 DM_FREE(recv.data.sync_service_invoke.rrpc_id);
593 DM_FREE(recv.product_key);
594 DM_FREE(recv.device_name);
595 DM_FREE(recv.data.sync_service_invoke.service_id);
596 }
597
_dm_recv_raw_data_handler(void * handle,const aiot_mqtt_recv_t * msg,void * userdata)598 static void _dm_recv_raw_data_handler(void *handle, const aiot_mqtt_recv_t *msg, void *userdata)
599 {
600 dm_handle_t *dm_handle = (dm_handle_t *)userdata;
601 aiot_dm_recv_t recv;
602
603 if (NULL == dm_handle->recv_handler) {
604 return;
605 }
606
607 memset(&recv, 0, sizeof(aiot_dm_recv_t));
608 recv.type = AIOT_DMRECV_RAW_DATA;
609
610 core_log(dm_handle->sysdep, STATE_DM_LOG_RECV, "DM recv raw data\r\n");
611
612 do {
613 if (_dm_get_topic_level(dm_handle->sysdep, msg->data.pub.topic, msg->data.pub.topic_len, 2, &recv.product_key) < 0 ||
614 _dm_get_topic_level(dm_handle->sysdep, msg->data.pub.topic, msg->data.pub.topic_len, 3, &recv.device_name) < 0) {
615 break;
616 }
617 recv.data.raw_data.data = msg->data.pub.payload;
618 recv.data.raw_data.data_len = msg->data.pub.payload_len;
619
620 dm_handle->recv_handler(dm_handle, &recv, dm_handle->userdata);
621 } while (0);
622
623 DM_FREE(recv.product_key);
624 DM_FREE(recv.device_name);
625 }
626
_dm_recv_up_raw_reply_data_handler(void * handle,const aiot_mqtt_recv_t * msg,void * userdata)627 static void _dm_recv_up_raw_reply_data_handler(void *handle, const aiot_mqtt_recv_t *msg, void *userdata)
628 {
629 dm_handle_t *dm_handle = (dm_handle_t *)userdata;
630 aiot_dm_recv_t recv;
631
632 if (NULL == dm_handle->recv_handler) {
633 return;
634 }
635
636 memset(&recv, 0, sizeof(aiot_dm_recv_t));
637 recv.type = AIOT_DMRECV_RAW_DATA_REPLY;
638
639 core_log(dm_handle->sysdep, STATE_DM_LOG_RECV, "DM recv raw data\r\n");
640
641 do {
642 if (_dm_get_topic_level(dm_handle->sysdep, msg->data.pub.topic, msg->data.pub.topic_len, 2, &recv.product_key) < 0 ||
643 _dm_get_topic_level(dm_handle->sysdep, msg->data.pub.topic, msg->data.pub.topic_len, 3, &recv.device_name) < 0) {
644 break;
645 }
646 recv.data.raw_data.data = msg->data.pub.payload;
647 recv.data.raw_data.data_len = msg->data.pub.payload_len;
648
649 dm_handle->recv_handler(dm_handle, &recv, dm_handle->userdata);
650 } while (0);
651
652 DM_FREE(recv.product_key);
653 DM_FREE(recv.device_name);
654 }
655
_dm_recv_raw_sync_service_invoke_handler(void * handle,const aiot_mqtt_recv_t * msg,void * userdata)656 static void _dm_recv_raw_sync_service_invoke_handler(void *handle, const aiot_mqtt_recv_t *msg, void *userdata)
657 {
658 dm_handle_t *dm_handle = (dm_handle_t *)userdata;
659 aiot_dm_recv_t recv;
660
661 if (NULL == dm_handle->recv_handler) {
662 return;
663 }
664
665 memset(&recv, 0, sizeof(aiot_dm_recv_t));
666 recv.type = AIOT_DMRECV_RAW_SYNC_SERVICE_INVOKE;
667
668 core_log(dm_handle->sysdep, STATE_DM_LOG_RECV, "DM recv raw sync service invoke\r\n");
669
670 do {
671 if (_dm_get_topic_level(dm_handle->sysdep, msg->data.pub.topic, msg->data.pub.topic_len, 3,
672 &recv.data.raw_service_invoke.rrpc_id) < 0 ||
673 _dm_get_topic_level(dm_handle->sysdep, msg->data.pub.topic, msg->data.pub.topic_len, 5, &recv.product_key) < 0 ||
674 _dm_get_topic_level(dm_handle->sysdep, msg->data.pub.topic, msg->data.pub.topic_len, 6, &recv.device_name) < 0) {
675 break;
676 }
677 recv.data.raw_service_invoke.data = msg->data.pub.payload;
678 recv.data.raw_service_invoke.data_len = msg->data.pub.payload_len;
679
680 dm_handle->recv_handler(dm_handle, &recv, dm_handle->userdata);
681 } while (0);
682
683 DM_FREE(recv.data.raw_service_invoke.rrpc_id);
684 DM_FREE(recv.product_key);
685 DM_FREE(recv.device_name);
686 }
687
/*
 * Callback registered with the MQTT core (see
 * _dm_core_mqtt_operate_process_handler()).
 *
 * On CORE_MQTTEVT_DEINIT the stored MQTT handle is cleared, after which
 * aiot_dm_send() returns STATE_DM_MQTT_HANDLE_IS_NULL. Other core events
 * and the public event argument are ignored.
 *
 * context:    the dm_handle_t given at registration.
 * event:      not used.
 * core_event: core event; may be NULL.
 */
static void _dm_core_mqtt_process_handler(void *context, aiot_mqtt_event_t *event, core_mqtt_event_t *core_event)
{
    dm_handle_t *dm = (dm_handle_t *)context;

    if (core_event == NULL) {
        return;
    }

    if (core_event->type == CORE_MQTTEVT_DEINIT) {
        dm->mqtt_handle = NULL;
    }
}
706
/*
 * Apply a core MQTT option to _dm_core_mqtt_process_handler, with
 * dm_handle as its context. The callers in this file use it to append and
 * to remove the process handler.
 *
 * dm_handle: DM instance; its mqtt_handle is the target of the option.
 * option:    core option forwarded to core_mqtt_setopt().
 *
 * Returns the core_mqtt_setopt() result.
 */
static int32_t _dm_core_mqtt_operate_process_handler(dm_handle_t *dm_handle, core_mqtt_option_t option)
{
    core_mqtt_process_data_t hook;
    int32_t res = STATE_SUCCESS;

    memset(&hook, 0, sizeof(core_mqtt_process_data_t));
    hook.context = dm_handle;
    hook.handler = _dm_core_mqtt_process_handler;

    res = core_mqtt_setopt(dm_handle->mqtt_handle, option, &hook);
    return res;
}
717
aiot_dm_init(void)718 void *aiot_dm_init(void)
719 {
720 aiot_sysdep_portfile_t *sysdep = aiot_sysdep_get_portfile();
721 dm_handle_t *dm_handle = NULL;
722
723 if (NULL == sysdep) {
724 return NULL;
725 }
726
727 dm_handle = sysdep->core_sysdep_malloc(sizeof(dm_handle_t), DATA_MODEL_MODULE_NAME);
728 if (NULL == dm_handle) {
729 return NULL;
730 }
731
732 memset(dm_handle, 0, sizeof(dm_handle_t));
733 dm_handle->sysdep = sysdep;
734 dm_handle->post_reply = 1;
735
736 core_global_init(sysdep);
737 return dm_handle;
738 }
739
/*
 * Configure a data-model instance.
 *
 * handle: instance returned by aiot_dm_init().
 * option: which setting to change.
 * data:   option value; must not be NULL for any option.
 *           AIOT_DMOPT_MQTT_HANDLE  - MQTT handle; also registers the
 *                                     receive topics and the core process
 *                                     handler on it.
 *           AIOT_DMOPT_RECV_HANDLER - receive callback (cast from data).
 *           AIOT_DMOPT_USERDATA     - pointer passed back to the callback.
 *           AIOT_DMOPT_POST_REPLY   - points to a uint8_t flag.
 *
 * Returns STATE_SUCCESS, STATE_USER_INPUT_NULL_POINTER,
 * STATE_USER_INPUT_OUT_RANGE, or the result of the MQTT registration calls.
 */
int32_t aiot_dm_setopt(void *handle, aiot_dm_option_t option, void *data)
{
    dm_handle_t *dm = (dm_handle_t *)handle;
    int32_t res = STATE_SUCCESS;

    if (handle == NULL || data == NULL) {
        return STATE_USER_INPUT_NULL_POINTER;
    }
    if (option >= AIOT_DMOPT_MAX) {
        return STATE_USER_INPUT_OUT_RANGE;
    }

    switch (option) {
        case AIOT_DMOPT_MQTT_HANDLE:
            dm->mqtt_handle = data;
            /* register the receive topics first, then the core process handler */
            res = _dm_setup_topic_mapping(data, dm);
            if (res < STATE_SUCCESS) {
                break;
            }
            res = _dm_core_mqtt_operate_process_handler(dm, CORE_MQTTOPT_APPEND_PROCESS_HANDLER);
            break;

        case AIOT_DMOPT_RECV_HANDLER:
            dm->recv_handler = (aiot_dm_recv_handler_t)data;
            break;

        case AIOT_DMOPT_USERDATA:
            dm->userdata = data;
            break;

        case AIOT_DMOPT_POST_REPLY:
            dm->post_reply = *(uint8_t *)data;
            break;

        default:
            break;
    }

    return res;
}
782
aiot_dm_send(void * handle,const aiot_dm_msg_t * msg)783 int32_t aiot_dm_send(void *handle, const aiot_dm_msg_t *msg)
784 {
785 dm_handle_t *dm_handle = NULL;
786 char *topic = NULL;
787 int32_t res = STATE_SUCCESS;
788
789 if (NULL == handle || NULL == msg) {
790 return STATE_USER_INPUT_NULL_POINTER;
791 }
792
793 if (msg->type >= AIOT_DMMSG_MAX) {
794 return STATE_USER_INPUT_OUT_RANGE;
795 }
796
797 dm_handle = (dm_handle_t *)handle;
798 if (NULL == dm_handle->mqtt_handle) {
799 return STATE_DM_MQTT_HANDLE_IS_NULL;
800 }
801
802 res = _dm_prepare_send_topic(dm_handle, msg, &topic);
803 if (res < 0) {
804 return res;
805 }
806
807 res = g_dm_send_topic_mapping[msg->type].func(dm_handle, topic, msg);
808 dm_handle->sysdep->core_sysdep_free(topic);
809 return res;
810 }
811
aiot_dm_deinit(void ** p_handle)812 int32_t aiot_dm_deinit(void **p_handle)
813 {
814 dm_handle_t *dm_handle = NULL;
815 aiot_sysdep_portfile_t *sysdep = NULL;
816 uint8_t i = 0;
817
818 if (NULL == p_handle || NULL == *p_handle) {
819 return STATE_USER_INPUT_NULL_POINTER;
820 }
821
822 dm_handle = *p_handle;
823 sysdep = dm_handle->sysdep;
824 *p_handle = NULL;
825
826 _dm_core_mqtt_operate_process_handler(dm_handle, CORE_MQTTOPT_REMOVE_PROCESS_HANDLER);
827
828 /* remove mqtt topic mapping */
829 for (i = 0; i < sizeof(g_dm_recv_topic_mapping) / sizeof(dm_recv_topic_map_t); i++) {
830 aiot_mqtt_topic_map_t topic_mapping;
831 memset(&topic_mapping, 0, sizeof(aiot_mqtt_topic_map_t));
832 topic_mapping.topic = g_dm_recv_topic_mapping[i].topic;
833 topic_mapping.handler = g_dm_recv_topic_mapping[i].func;
834
835 aiot_mqtt_setopt(dm_handle->mqtt_handle, AIOT_MQTTOPT_REMOVE_TOPIC_MAP, &topic_mapping);
836 }
837
838 sysdep->core_sysdep_free(dm_handle);
839
840 core_global_deinit(sysdep);
841 return STATE_SUCCESS;
842 }
843
844