1 #include <string.h>
2 #include "linkkit/infra/infra_types.h"
3 #include "linkkit/infra/infra_defs.h"
4 #include "linkkit/infra/infra_string.h"
5 #include "linkkit/infra/infra_list.h"
6 #include "linkkit/infra/infra_report.h"
7 #include "linkkit/infra/infra_sha256.h"
8 #include "linkkit/infra/infra_compat.h"
9 #include "mqtt_wrapper.h"
10 #include "linkkit/dev_sign_api.h"
11 #include "linkkit/mqtt_api.h"
12 
13 #ifdef PLATFORM_HAS_DYNMEM
14 #ifdef INFRA_MEM_STATS
15 #include "linkkit/infra/infra_mem_stats.h"
16 #define mqtt_api_malloc(size) LITE_malloc(size, MEM_MAGIC, "mqtt-api")
17 #define mqtt_api_free(ptr)    LITE_free(ptr)
18 #else
19 #define mqtt_api_malloc(size) HAL_Malloc(size)
20 #define mqtt_api_free(ptr)     \
21     {                          \
22         HAL_Free((void *)ptr); \
23         ptr = NULL;            \
24     }
25 #endif
26 
27 #else
28 static iotx_mqtt_param_t g_iotx_mqtt_param;
29 #endif
30 
31 #ifdef INFRA_LOG
32 #include "linkkit/infra/infra_log.h"
33 #define mqtt_emerg(...)   log_emerg("MQTT", __VA_ARGS__)
34 #define mqtt_crit(...)    log_crit("MQTT", __VA_ARGS__)
35 #define mqtt_err(...)     log_err("MQTT", __VA_ARGS__)
36 #define mqtt_warning(...) log_warning("MQTT", __VA_ARGS__)
37 #define mqtt_info(...)    log_info("MQTT", __VA_ARGS__)
38 #define mqtt_debug(...)   log_debug("MQTT", __VA_ARGS__)
39 #else
40 #define mqtt_emerg(...)          \
41     do {                         \
42         HAL_Printf(__VA_ARGS__); \
43         HAL_Printf("\r\n");      \
44     } while (0)
45 #define mqtt_crit(...)           \
46     do {                         \
47         HAL_Printf(__VA_ARGS__); \
48         HAL_Printf("\r\n");      \
49     } while (0)
50 #define mqtt_err(...)            \
51     do {                         \
52         HAL_Printf(__VA_ARGS__); \
53         HAL_Printf("\r\n");      \
54     } while (0)
55 #define mqtt_warning(...)        \
56     do {                         \
57         HAL_Printf(__VA_ARGS__); \
58         HAL_Printf("\r\n");      \
59     } while (0)
60 #define mqtt_info(...)           \
61     do {                         \
62         HAL_Printf(__VA_ARGS__); \
63         HAL_Printf("\r\n");      \
64     } while (0)
65 #define mqtt_debug(...)          \
66     do {                         \
67         HAL_Printf(__VA_ARGS__); \
68         HAL_Printf("\r\n");      \
69     } while (0)
70 #endif
71 
72 static void *g_mqtt_client = NULL;
73 iotx_sign_mqtt_t g_default_sign;
74 
75 /* Handle structure of subscribed topic */
iotx_mqtt_report_funcs(void * pclient)76 static void iotx_mqtt_report_funcs(void *pclient)
77 {
78     /* report firmware version */
79 #if !defined(ATHOST_MQTT_REPORT_DISBALED) && !defined(MUTE_VERSION_REPORT)
80     int err;
81     iotx_set_report_func(IOT_MQTT_Publish_Simple);
82     err = iotx_report_firmware_version(pclient);
83     if (SUCCESS_RETURN != err) {
84         mqtt_err("failed to report firmware version");
85     }
86 #endif
87 #ifdef BUILD_AOS
88     activation_report();
89 #endif
90 }
91 
92 #ifdef DYNAMIC_REGISTER
93 #include "linkkit/dynreg_api.h"
94 int HAL_SetDeviceSecret(char *device_secret);
95 int HAL_GetProductSecret(char *product_secret);
96 int HAL_Kv_Set(const char *key, const void *val, int len, int sync);
97 int HAL_Kv_Get(const char *key, void *val, int *buffer_len);
98 
99 #define DYNAMIC_REG_KV_PREFIX     "DYNAMIC_REG_"
100 #define DYNAMIC_REG_KV_PREFIX_LEN 12
101 
_iotx_dynamic_register(iotx_http_region_types_t region,iotx_dev_meta_info_t * meta_info)102 static int _iotx_dynamic_register(iotx_http_region_types_t region,
103                                   iotx_dev_meta_info_t *meta_info)
104 {
105     int res = FAIL_RETURN;
106 
107     HAL_GetDeviceSecret(meta_info->device_secret);
108     /* Check if Device Secret exist in KV */
109     if (strlen(meta_info->device_secret) == 0) {
110         /* KV not exit, goto dynamic register */
111         mqtt_info("DeviceSecret KV not exist, Now We Need Dynamic Register...");
112 
113         res = IOT_Dynamic_Register(region, meta_info);
114         if (res != SUCCESS_RETURN) {
115             mqtt_err("Dynamic Register Failed");
116             return FAIL_RETURN;
117         }
118         res = HAL_SetDeviceSecret(meta_info->device_secret);
119         if (res < 0) {
120             mqtt_err("Save Device Secret to KV Failed");
121             return FAIL_RETURN;
122         }
123     }
124 
125     return SUCCESS_RETURN;
126 }
127 #endif /* #ifdef DYNAMIC_REGISTER */
128 
129 #ifdef MQTT_PRE_AUTH
130 #include "linkkit/infra/infra_preauth.h"
131 extern int _iotx_generate_sign_string(const char *device_id,
132                                       const char *device_name,
133                                       const char *product_key,
134                                       const char *device_secret,
135                                       char *sign_string);
136 
_iotx_preauth(iotx_mqtt_region_types_t region,iotx_dev_meta_info_t * meta,iotx_sign_mqtt_t * preauth_out)137 static int _iotx_preauth(iotx_mqtt_region_types_t region,
138                          iotx_dev_meta_info_t *meta,
139                          iotx_sign_mqtt_t *preauth_out)
140 {
141     uint16_t length = 0;
142     char device_id[IOTX_PRODUCT_KEY_LEN + IOTX_DEVICE_NAME_LEN + 1] = { 0 };
143     char sign_string[65] = { 0 };
144     int res;
145 
146     memset(preauth_out, 0, sizeof(iotx_sign_mqtt_t));
147 
148     /* setup device_id */
149     memcpy(device_id, meta->product_key, strlen(meta->product_key));
150     memcpy(device_id + strlen(device_id), ".", strlen("."));
151     memcpy(device_id + strlen(device_id), meta->device_name,
152            strlen(meta->device_name));
153 
154     /* setup sign_string */
155     res = _iotx_generate_sign_string(device_id, meta->device_name,
156                                      meta->product_key, meta->device_secret,
157                                      sign_string);
158     if (res < SUCCESS_RETURN) {
159         return res;
160     }
161 
162     return preauth_get_connection_info(region, meta, sign_string, device_id,
163                                        preauth_out);
164 }
165 #endif /* #ifdef MQTT_PRE_AUTH */
166 
167 extern int _sign_get_clientid(char *clientid_string, const char *device_id,
168                               const char *custom_kv, uint8_t enable_itls);
169 
170 /************************  Public Interface ************************/
IOT_MQTT_Construct(iotx_mqtt_param_t * pInitParams)171 void *IOT_MQTT_Construct(iotx_mqtt_param_t *pInitParams)
172 {
173     void *pclient;
174     iotx_dev_meta_info_t meta_info;
175     iotx_mqtt_param_t mqtt_params;
176     char device_id[IOTX_PRODUCT_KEY_LEN + IOTX_DEVICE_NAME_LEN + 1] = { 0 };
177     iotx_mqtt_region_types_t region = IOTX_CLOUD_REGION_SHANGHAI;
178     int dynamic = 0;
179     uint8_t enalbe_itls = 0;
180     int ret;
181     void *callback;
182 
183     if (g_mqtt_client != NULL) {
184         mqtt_err(
185             "Already exist default MQTT connection, won't proceed another one");
186         return g_mqtt_client;
187     }
188 
189     /* get region */
190     IOT_Ioctl(IOTX_IOCTL_GET_REGION, (void *)&region);
191 
192     /* get dynamic option */
193     IOT_Ioctl(IOTX_IOCTL_GET_DYNAMIC_REGISTER, (void *)&dynamic);
194 
195     /* get meta_info from hal */
196     memset(&meta_info, 0, sizeof(iotx_dev_meta_info_t));
197     HAL_GetProductKey(meta_info.product_key);
198     HAL_GetDeviceName(meta_info.device_name);
199 
200     if (meta_info.product_key[0] == '\0' ||
201         meta_info.product_key[IOTX_PRODUCT_KEY_LEN] != '\0') {
202         mqtt_err("Invalid product key, abort!");
203         return NULL;
204     }
205     if (meta_info.device_name[0] == '\0' ||
206         meta_info.device_name[IOTX_DEVICE_NAME_LEN] != '\0') {
207         mqtt_err("Invalid device name, abort!");
208         return NULL;
209     }
210 
211     /* reconfig clientid, append custome clientKV and itls switch flag */
212     if (pInitParams != NULL && pInitParams->customize_info != NULL) {
213         if (strstr(pInitParams->customize_info, "authtype=id2") != NULL) {
214             enalbe_itls = 1;
215         } else {
216             enalbe_itls = 0;
217         }
218     }
219 #ifdef DYNAMIC_REGISTER /* get device secret through https dynamic register */
220     if (dynamic) {
221         HAL_GetProductSecret(meta_info.product_secret);
222         if (meta_info.product_secret[0] == '\0' ||
223             meta_info.product_secret[IOTX_PRODUCT_SECRET_LEN] != '\0') {
224             mqtt_err("Product Secret doesn't exist");
225             return NULL;
226         }
227 
228         ret = _iotx_dynamic_register(region, &meta_info);
229         if (ret < SUCCESS_RETURN) {
230             mqtt_err("ret = _iotx_dynamic_register() = %d, abort", ret);
231             return NULL;
232         }
233     } else {
234         HAL_GetDeviceSecret(meta_info.device_secret);
235         if (meta_info.device_secret[0] == '\0' ||
236             meta_info.device_secret[IOTX_DEVICE_SECRET_LEN] != '\0') {
237             mqtt_err("Invalid device secret, abort!");
238             return NULL;
239         }
240     }
241 #else  /* get device secret from hal */
242     if (enalbe_itls == 0) {
243         HAL_GetDeviceSecret(meta_info.device_secret);
244         if (meta_info.device_secret[0] == '\0' ||
245             meta_info.device_secret[IOTX_DEVICE_SECRET_LEN] != '\0') {
246             mqtt_err("Invalid device secret, abort!");
247             return NULL;
248         }
249     }
250 #endif /* #ifdef DYNAMIC_REGISTER */
251 
252 #ifdef MQTT_PRE_AUTH /* preauth mode through https */
253     ret = _iotx_preauth(region, &meta_info,
254                         (iotx_sign_mqtt_t *)&g_default_sign); /* type convert */
255     if (ret < SUCCESS_RETURN) {
256         mqtt_err("ret = _iotx_preauth() = %d, abort", ret);
257         return NULL;
258     }
259 #else  /* direct mode */
260     ret = IOT_Sign_MQTT(region, &meta_info, &g_default_sign);
261     if (ret < SUCCESS_RETURN) {
262         mqtt_err("ret = IOT_Sign_MQTT() = %d, abort", ret);
263         return NULL;
264     }
265 #endif /* #ifdef MQTT_PRE_AUTH */
266 
267     /* setup device_id */
268     memcpy(device_id, meta_info.product_key, strlen(meta_info.product_key));
269     memcpy(device_id + strlen(device_id), ".", strlen("."));
270     memcpy(device_id + strlen(device_id), meta_info.device_name,
271            strlen(meta_info.device_name));
272 
273     if (_sign_get_clientid(
274             g_default_sign.clientid, device_id,
275             (pInitParams != NULL) ? pInitParams->customize_info : NULL,
276             enalbe_itls) != SUCCESS_RETURN) {
277         return NULL;
278     }
279 
280     /* Initialize MQTT parameter */
281     memset(&mqtt_params, 0x0, sizeof(iotx_mqtt_param_t));
282 
283 #ifdef SUPPORT_TLS
284     {
285         extern const char *iotx_ca_crt;
286         if (enalbe_itls == 0) {
287             mqtt_params.pub_key = iotx_ca_crt;
288         }
289     }
290 #endif
291     mqtt_params.request_timeout_ms = CONFIG_MQTT_REQUEST_TIMEOUT;
292     mqtt_params.clean_session = 0;
293     mqtt_params.keepalive_interval_ms = CONFIG_MQTT_KEEPALIVE_INTERVAL * 1000;
294     mqtt_params.read_buf_size = CONFIG_MQTT_MESSAGE_MAXLEN;
295     mqtt_params.write_buf_size = CONFIG_MQTT_MESSAGE_MAXLEN;
296     mqtt_params.handle_event.h_fp = NULL;
297     mqtt_params.handle_event.pcontext = NULL;
298 
299     /* optional configuration */
300     if (pInitParams != NULL) {
301         if (pInitParams->host && strlen(pInitParams->host)) {
302             mqtt_params.host = pInitParams->host;
303         } else {
304             mqtt_warning("Using default hostname: '%s'",
305                          g_default_sign.hostname);
306             mqtt_params.host = g_default_sign.hostname;
307         }
308 
309         if (pInitParams->port) {
310             mqtt_params.port = pInitParams->port;
311         } else {
312             mqtt_warning("Using default port: [%d]", g_default_sign.port);
313             mqtt_params.port = g_default_sign.port;
314         }
315 
316         if (pInitParams->client_id && strlen(pInitParams->client_id)) {
317             mqtt_params.client_id = pInitParams->client_id;
318         } else {
319             mqtt_warning("Using default client_id: %s",
320                          g_default_sign.clientid);
321             mqtt_params.client_id = g_default_sign.clientid;
322         }
323 
324         if (pInitParams->username && strlen(pInitParams->username)) {
325             mqtt_params.username = pInitParams->username;
326         } else {
327             mqtt_warning("Using default username: %s", g_default_sign.username);
328             mqtt_params.username = g_default_sign.username;
329         }
330 
331         if (pInitParams->password && strlen(pInitParams->password)) {
332             mqtt_params.password = pInitParams->password;
333         } else {
334 #if 1
335             mqtt_warning("Using default password: %s", "******");
336 #else
337             mqtt_warning("Using default password: %s", g_default_sign.password);
338 #endif
339             mqtt_params.password = g_default_sign.password;
340         }
341 
342         if (pInitParams->request_timeout_ms < CONFIG_MQTT_REQ_TIMEOUT_MIN ||
343             pInitParams->request_timeout_ms > CONFIG_MQTT_REQ_TIMEOUT_MAX) {
344             mqtt_warning(
345                 "Using default request_timeout_ms: %d, configured value(%d) "
346                 "out of [%d, %d]",
347                 mqtt_params.request_timeout_ms, pInitParams->request_timeout_ms,
348                 CONFIG_MQTT_REQ_TIMEOUT_MIN, CONFIG_MQTT_REQ_TIMEOUT_MAX);
349         } else {
350             mqtt_params.request_timeout_ms = pInitParams->request_timeout_ms;
351         }
352 
353         if (pInitParams->clean_session == 0 ||
354             pInitParams->clean_session == 1) {
355             mqtt_params.clean_session = pInitParams->clean_session;
356         }
357 
358         if (pInitParams->keepalive_interval_ms <
359                 CONFIG_MQTT_KEEPALIVE_INTERVAL_MIN * 1000 ||
360             pInitParams->keepalive_interval_ms >
361                 CONFIG_MQTT_KEEPALIVE_INTERVAL_MAX * 1000) {
362             mqtt_warning(
363                 "Using default keepalive_interval_ms: %d, configured value(%d) "
364                 "out of [%d, %d]",
365                 mqtt_params.keepalive_interval_ms,
366                 pInitParams->keepalive_interval_ms,
367                 CONFIG_MQTT_KEEPALIVE_INTERVAL_MIN * 1000,
368                 CONFIG_MQTT_KEEPALIVE_INTERVAL_MAX * 1000);
369         } else {
370             mqtt_params.keepalive_interval_ms =
371                 pInitParams->keepalive_interval_ms;
372         }
373 
374         if (!pInitParams->read_buf_size) {
375             mqtt_warning("Using default read_buf_size: %d",
376                          mqtt_params.read_buf_size);
377         } else {
378             mqtt_params.read_buf_size = pInitParams->read_buf_size;
379         }
380 
381         if (!pInitParams->write_buf_size) {
382             mqtt_warning("Using default write_buf_size: %d",
383                          mqtt_params.write_buf_size);
384         } else {
385             mqtt_params.write_buf_size = pInitParams->write_buf_size;
386         }
387 
388         if (pInitParams->handle_event.h_fp != NULL) {
389             mqtt_params.handle_event.h_fp = pInitParams->handle_event.h_fp;
390         }
391 
392         if (pInitParams->handle_event.pcontext != NULL) {
393             mqtt_params.handle_event.pcontext =
394                 pInitParams->handle_event.pcontext;
395         }
396     } else {
397         mqtt_warning("Using default port: [%d]", g_default_sign.port);
398         mqtt_params.port = g_default_sign.port;
399 
400         mqtt_warning("Using default hostname: '%s'", g_default_sign.hostname);
401         mqtt_params.host = g_default_sign.hostname;
402 
403         mqtt_warning("Using default client_id: %s", g_default_sign.clientid);
404         mqtt_params.client_id = g_default_sign.clientid;
405 
406         mqtt_warning("Using default username: %s", g_default_sign.username);
407         mqtt_params.username = g_default_sign.username;
408 
409 #if 1
410         mqtt_warning("Using default password: %s", "******");
411 #else
412         mqtt_warning("Using default password: %s", g_default_sign.password);
413 #endif
414         mqtt_params.password = g_default_sign.password;
415     }
416 
417     pclient = wrapper_mqtt_init(&mqtt_params);
418     if (pclient == NULL) {
419         mqtt_err("wrapper_mqtt_init error");
420         return NULL;
421     }
422 
423     ret = wrapper_mqtt_connect(pclient);
424     if (SUCCESS_RETURN != ret) {
425         if (MQTT_CONNECT_BLOCK != ret) {
426             mqtt_err("wrapper_mqtt_connect failed");
427             wrapper_mqtt_release(&pclient);
428             return NULL;
429         }
430     }
431 
432 #ifndef ASYNC_PROTOCOL_STACK
433     iotx_mqtt_report_funcs(pclient);
434 #endif
435 
436     /* report device info */
437     ret = iotx_report_devinfo(pclient);
438     if (SUCCESS_RETURN != ret) {
439 #ifdef DEBUG_REPORT_MID_DEVINFO_FIRMWARE
440         mqtt_err("failed to report devinfo");
441 #endif
442     }
443     g_mqtt_client = pclient;
444 
445     /* Mqtt Connect Callback */
446     callback = iotx_event_callback(ITE_MQTT_CONNECT_SUCC);
447     if (callback) {
448         ((int (*)(void))callback)();
449     }
450     return pclient;
451 }
452 
IOT_MQTT_Destroy(void ** phandler)453 int IOT_MQTT_Destroy(void **phandler)
454 {
455     void *client;
456     if (phandler != NULL) {
457         client = *phandler;
458         *phandler = NULL;
459     } else {
460         client = g_mqtt_client;
461     }
462 
463     if (client == NULL) {
464         mqtt_err("handler is null");
465         return NULL_VALUE_ERROR;
466     }
467 
468     wrapper_mqtt_release(&client);
469     g_mqtt_client = NULL;
470 
471     return SUCCESS_RETURN;
472 }
473 
IOT_MQTT_Yield(void * handle,int timeout_ms)474 int IOT_MQTT_Yield(void *handle, int timeout_ms)
475 {
476     void *pClient = (handle ? handle : g_mqtt_client);
477     return wrapper_mqtt_yield(pClient, timeout_ms);
478 }
479 
480 /* check whether MQTT connection is established or not */
IOT_MQTT_CheckStateNormal(void * handle)481 int IOT_MQTT_CheckStateNormal(void *handle)
482 {
483     void *pClient = (handle ? handle : g_mqtt_client);
484     if (pClient == NULL) {
485         mqtt_err("handler is null");
486         return NULL_VALUE_ERROR;
487     }
488 
489     return wrapper_mqtt_check_state(pClient);
490 }
491 
IOT_MQTT_Subscribe(void * handle,const char * topic_filter,iotx_mqtt_qos_t qos,iotx_mqtt_event_handle_func_fpt topic_handle_func,void * pcontext)492 int IOT_MQTT_Subscribe(void *handle, const char *topic_filter,
493                        iotx_mqtt_qos_t qos,
494                        iotx_mqtt_event_handle_func_fpt topic_handle_func,
495                        void *pcontext)
496 {
497     void *client = handle ? handle : g_mqtt_client;
498 
499     if (topic_filter == NULL || strlen(topic_filter) == 0 ||
500         topic_handle_func == NULL) {
501         mqtt_err("params err");
502         return NULL_VALUE_ERROR;
503     }
504 
505     if (qos > IOTX_MQTT_QOS3_SUB_LOCAL) {
506         mqtt_warning("Invalid qos(%d) out of [%d, %d], using %d", qos,
507                      IOTX_MQTT_QOS0, IOTX_MQTT_QOS3_SUB_LOCAL, IOTX_MQTT_QOS0);
508         qos = IOTX_MQTT_QOS0;
509     }
510 
511     return wrapper_mqtt_subscribe(client, topic_filter, qos, topic_handle_func,
512                                   pcontext);
513 }
514 
515 #define SUBSCRIBE_SYNC_TIMEOUT_MAX 10000
IOT_MQTT_Subscribe_Sync(void * handle,const char * topic_filter,iotx_mqtt_qos_t qos,iotx_mqtt_event_handle_func_fpt topic_handle_func,void * pcontext,int timeout_ms)516 int IOT_MQTT_Subscribe_Sync(void *handle, const char *topic_filter,
517                             iotx_mqtt_qos_t qos,
518                             iotx_mqtt_event_handle_func_fpt topic_handle_func,
519                             void *pcontext, int timeout_ms)
520 {
521     void *client = handle ? handle : g_mqtt_client;
522 
523     if (timeout_ms > SUBSCRIBE_SYNC_TIMEOUT_MAX) {
524         timeout_ms = SUBSCRIBE_SYNC_TIMEOUT_MAX;
525     }
526 
527     if (topic_filter == NULL || strlen(topic_filter) == 0 ||
528         topic_handle_func == NULL) {
529         mqtt_err("params err");
530         return NULL_VALUE_ERROR;
531     }
532 
533     if (qos > IOTX_MQTT_QOS3_SUB_LOCAL) {
534         mqtt_warning("Invalid qos(%d) out of [%d, %d], using %d", qos,
535                      IOTX_MQTT_QOS0, IOTX_MQTT_QOS3_SUB_LOCAL, IOTX_MQTT_QOS0);
536         qos = IOTX_MQTT_QOS0;
537     }
538 
539     return wrapper_mqtt_subscribe_sync(client, topic_filter, qos,
540                                        topic_handle_func, pcontext, timeout_ms);
541 }
542 
IOT_MQTT_Unsubscribe(void * handle,const char * topic_filter)543 int IOT_MQTT_Unsubscribe(void *handle, const char *topic_filter)
544 {
545     void *client = handle ? handle : g_mqtt_client;
546 
547     if (client == NULL || topic_filter == NULL || strlen(topic_filter) == 0) {
548         mqtt_err("params err");
549         return NULL_VALUE_ERROR;
550     }
551 
552     return wrapper_mqtt_unsubscribe(client, topic_filter);
553 }
554 
IOT_MQTT_Publish(void * handle,const char * topic_name,iotx_mqtt_topic_info_pt topic_msg)555 int IOT_MQTT_Publish(void *handle, const char *topic_name,
556                      iotx_mqtt_topic_info_pt topic_msg)
557 {
558     void *client = handle ? handle : g_mqtt_client;
559     int rc = -1;
560 
561     if (client == NULL || topic_name == NULL || strlen(topic_name) == 0) {
562         mqtt_err("params err");
563         return NULL_VALUE_ERROR;
564     }
565 
566     rc = wrapper_mqtt_publish(client, topic_name, topic_msg);
567     return rc;
568 }
569 
IOT_MQTT_Publish_Simple(void * handle,const char * topic_name,int qos,void * data,int len)570 int IOT_MQTT_Publish_Simple(void *handle, const char *topic_name, int qos,
571                             void *data, int len)
572 {
573     iotx_mqtt_topic_info_t mqtt_msg;
574     void *client = handle ? handle : g_mqtt_client;
575     int rc = -1;
576 
577     if (client == NULL || topic_name == NULL || strlen(topic_name) == 0) {
578         mqtt_err("params err");
579         return NULL_VALUE_ERROR;
580     }
581 
582     memset(&mqtt_msg, 0x0, sizeof(iotx_mqtt_topic_info_t));
583 
584     mqtt_msg.qos = qos;
585     mqtt_msg.retain = 0;
586     mqtt_msg.dup = 0;
587     mqtt_msg.payload = (void *)data;
588     mqtt_msg.payload_len = len;
589 
590     rc = wrapper_mqtt_publish(client, topic_name, &mqtt_msg);
591 
592     if (rc < 0) {
593         mqtt_err("IOT_MQTT_Publish failed\n");
594         return -1;
595     }
596 
597     return rc;
598 }
599 
IOT_MQTT_Nwk_Event_Handler(void * handle,iotx_mqtt_nwk_event_t event,iotx_mqtt_nwk_param_t * param)600 int IOT_MQTT_Nwk_Event_Handler(void *handle, iotx_mqtt_nwk_event_t event,
601                                iotx_mqtt_nwk_param_t *param)
602 {
603 #ifdef ASYNC_PROTOCOL_STACK
604     void *client = handle ? handle : g_mqtt_client;
605     int rc = -1;
606 
607     if (client == NULL || event >= IOTX_MQTT_SOC_MAX || param == NULL) {
608         mqtt_err("params err");
609         return NULL_VALUE_ERROR;
610     }
611 
612     rc = wrapper_mqtt_nwk_event_handler(client, event, param);
613 
614     if (rc < 0) {
615         mqtt_err("IOT_MQTT_Nwk_Event_Handler failed\n");
616         return -1;
617     }
618 
619     switch (event) {
620     case IOTX_MQTT_SOC_CONNECTED:
621         {
622             iotx_mqtt_report_funcs(client);
623         }
624         break;
625     default:
626         {
627         }
628         break;
629     }
630 
631     return rc;
632 #else
633     return -1;
634 #endif
635 }
636