1 #include "iotx_cm_internal.h"
2 
3 #if defined(MQTT_COMM_ENABLED) || defined(MAL_ENABLED)
4 
5 static iotx_cm_connection_t *_mqtt_conncection = NULL;
6 static void iotx_cloud_conn_mqtt_event_handle(void *pcontext, void *pclient,
7                                               iotx_mqtt_event_msg_pt msg);
8 static int _mqtt_connect(uint32_t timeout);
9 static int _mqtt_publish(iotx_cm_ext_params_t *params, const char *topic,
10                          const char *payload, unsigned int payload_len);
11 static int _mqtt_sub(iotx_cm_ext_params_t *params, const char *topic,
12                      iotx_cm_data_handle_cb topic_handle_func, void *pcontext);
13 static iotx_mqtt_qos_t _get_mqtt_qos(iotx_cm_ack_types_t ack_type);
14 static int _mqtt_unsub(const char *topic);
15 static int _mqtt_close(void);
16 static void _set_common_handlers(void);
17 
iotx_cm_open_mqtt(iotx_cm_init_param_t * params)18 iotx_cm_connection_t *iotx_cm_open_mqtt(iotx_cm_init_param_t *params)
19 {
20     iotx_mqtt_param_t *mqtt_param = NULL;
21 
22     if (_mqtt_conncection != NULL) {
23         cm_warning("mqtt connection is opened already,return it");
24         return _mqtt_conncection;
25     }
26 
27     _mqtt_conncection =
28         (iotx_cm_connection_t *)cm_malloc(sizeof(iotx_cm_connection_t));
29     if (_mqtt_conncection == NULL) {
30         cm_err("_mqtt_conncection malloc failed!");
31         goto failed;
32     }
33     memset(_mqtt_conncection, 0, sizeof(iotx_cm_connection_t));
34     mqtt_param = (iotx_mqtt_param_t *)cm_malloc(sizeof(iotx_mqtt_param_t));
35     if (mqtt_param == NULL) {
36         cm_err("mqtt_param malloc failed!");
37         goto failed;
38     }
39 
40     memset(mqtt_param, 0, sizeof(iotx_mqtt_param_t));
41     mqtt_param->request_timeout_ms = params->request_timeout_ms;
42     mqtt_param->clean_session = 0;
43     mqtt_param->keepalive_interval_ms = params->keepalive_interval_ms;
44     mqtt_param->read_buf_size = params->read_buf_size;
45     mqtt_param->write_buf_size = params->write_buf_size;
46 
47     mqtt_param->handle_event.h_fp = iotx_cloud_conn_mqtt_event_handle;
48     mqtt_param->handle_event.pcontext = NULL;
49 
50     _mqtt_conncection->open_params = mqtt_param;
51     _mqtt_conncection->event_handler = params->handle_event;
52     _mqtt_conncection->cb_data = params->context;
53     _set_common_handlers();
54 
55     return _mqtt_conncection;
56 
57 failed:
58 
59     if (_mqtt_conncection != NULL) {
60         cm_free(_mqtt_conncection);
61         _mqtt_conncection = NULL;
62     }
63 
64     if (mqtt_param != NULL) {
65         cm_free(mqtt_param);
66     }
67 
68     return NULL;
69 }
70 
/**
 * MQTT client callback: translates MQTT-layer events into CM-layer events and
 * forwards them to the connection's event handler, or hands a received
 * publish to the per-topic data callback.
 *
 * @param pcontext  For IOTX_MQTT_EVENT_PUBLISH_RECEIVED this is cast to the
 *                  iotx_cm_data_handle_cb that _mqtt_sub() registered; it is
 *                  not read for any other event.
 * @param pclient   Not used.
 * @param msg       Event descriptor. For subscribe/unsubscribe/publish result
 *                  events msg->msg is treated as a packet id and forwarded as
 *                  the CM event payload; for PUBLISH_RECEIVED it is cast to
 *                  the topic info of the incoming message.
 */
static void iotx_cloud_conn_mqtt_event_handle(void *pcontext, void *pclient,
                                              iotx_mqtt_event_msg_pt msg)
{
    iotx_cm_event_msg_t event;

    (void)pclient;

    /* msg is dereferenced below, so validate it before first use. */
    if (msg == NULL || _mqtt_conncection == NULL) {
        return;
    }

    /* Each case that maps onto a CM event only fills in `event` and breaks;
     * the single dispatch after the switch delivers it. Cases that do not
     * produce a CM event return directly. */
    switch (msg->event_type) {
    case IOTX_MQTT_EVENT_DISCONNECT:
        cm_info("disconnected,fd = %d", _mqtt_conncection->fd);
        event.type = IOTX_CM_EVENT_CLOUD_DISCONNECT;
        event.msg = NULL;
        break;

    case IOTX_MQTT_EVENT_RECONNECT:
        cm_info("connected,fd = %d", _mqtt_conncection->fd);
        event.type = IOTX_CM_EVENT_CLOUD_CONNECTED;
        event.msg = NULL;
        break;

    case IOTX_MQTT_EVENT_SUBCRIBE_SUCCESS:
        event.type = IOTX_CM_EVENT_SUBCRIBE_SUCCESS;
        event.msg = (void *)(uintptr_t)msg->msg;
        break;

    case IOTX_MQTT_EVENT_SUBCRIBE_NACK:
    case IOTX_MQTT_EVENT_SUBCRIBE_TIMEOUT:
        event.type = IOTX_CM_EVENT_SUBCRIBE_FAILED;
        event.msg = (void *)(uintptr_t)msg->msg;
        break;

    case IOTX_MQTT_EVENT_UNSUBCRIBE_SUCCESS:
        event.type = IOTX_CM_EVENT_UNSUB_SUCCESS;
        event.msg = (void *)(uintptr_t)msg->msg;
        break;

    case IOTX_MQTT_EVENT_UNSUBCRIBE_NACK:
    case IOTX_MQTT_EVENT_UNSUBCRIBE_TIMEOUT:
        event.type = IOTX_CM_EVENT_UNSUB_FAILED;
        event.msg = (void *)(uintptr_t)msg->msg;
        break;

    case IOTX_MQTT_EVENT_PUBLISH_SUCCESS:
        event.type = IOTX_CM_EVENT_PUBLISH_SUCCESS;
        event.msg = (void *)(uintptr_t)msg->msg;
        break;

    case IOTX_MQTT_EVENT_PUBLISH_NACK:
    case IOTX_MQTT_EVENT_PUBLISH_TIMEOUT:
        event.type = IOTX_CM_EVENT_PUBLISH_FAILED;
        event.msg = (void *)(uintptr_t)msg->msg;
        break;

    case IOTX_MQTT_EVENT_PUBLISH_RECEIVED:
        {
            iotx_mqtt_topic_info_pt topic_info =
                (iotx_mqtt_topic_info_pt)msg->msg;
            iotx_cm_data_handle_cb topic_handle_func =
                (iotx_cm_data_handle_cb)pcontext;
#ifndef DEVICE_MODEL_ALINK2
            char *topic = NULL;
#endif
            if (topic_info == NULL) {
                cm_warning("publish received without topic info");
                return;
            }
            if (topic_handle_func == NULL) {
                cm_warning("bypass %d bytes on [%.*s]", topic_info->payload_len,
                           topic_info->topic_len, topic_info->ptopic);
                return;
            }
#ifdef DEVICE_MODEL_ALINK2
            /* This variant of the callback takes the topic with an explicit
             * length, so no terminated copy is needed. */
            topic_handle_func(_mqtt_conncection->fd, topic_info->ptopic,
                              topic_info->topic_len, topic_info->payload,
                              topic_info->payload_len, NULL);
#else
            /* The callback receives the topic without a length, so pass a
             * NUL-terminated copy of the topic_len bytes. */
            topic = cm_malloc(topic_info->topic_len + 1);
            if (topic == NULL) {
                cm_err("topic malloc failed");
                return;
            }
            memcpy(topic, topic_info->ptopic, topic_info->topic_len);
            topic[topic_info->topic_len] = '\0';

            topic_handle_func(_mqtt_conncection->fd, topic, topic_info->payload,
                              topic_info->payload_len, NULL);

            cm_free(topic);
#endif
        }
        return;

    case IOTX_MQTT_EVENT_BUFFER_OVERFLOW:
        cm_warning("buffer overflow");
        return;

    default:
        cm_warning("msg type unkown, type = %d", msg->event_type);
        return;
    }

    if (_mqtt_conncection->event_handler) {
        _mqtt_conncection->event_handler(_mqtt_conncection->fd, &event,
                                         _mqtt_conncection->cb_data);
    }
}
233 
234 extern sdk_impl_ctx_t g_sdk_impl_ctx;
_mqtt_connect(uint32_t timeout)235 static int _mqtt_connect(uint32_t timeout)
236 {
237     void *pclient;
238     iotx_time_t timer;
239     iotx_cm_event_msg_t event;
240 
241     char product_key[IOTX_PRODUCT_KEY_LEN + 1] = { 0 };
242     char device_name[IOTX_DEVICE_NAME_LEN + 1] = { 0 };
243     char device_secret[IOTX_DEVICE_SECRET_LEN + 1] = { 0 };
244 
245     if (_mqtt_conncection == NULL) {
246         return NULL_VALUE_ERROR;
247     }
248 
249     HAL_GetProductKey(product_key);
250     HAL_GetDeviceName(device_name);
251     HAL_GetDeviceSecret(device_secret);
252 
253     if (strlen(product_key) == 0 || strlen(device_name) == 0) {
254         return FAIL_RETURN;
255     }
256 
257     iotx_time_init(&timer);
258     utils_time_countdown_ms(&timer, timeout);
259 
260     if (g_sdk_impl_ctx.mqtt_customzie_info[0] != '\0') {
261         ((iotx_mqtt_param_t *)_mqtt_conncection->open_params)->customize_info =
262             g_sdk_impl_ctx.mqtt_customzie_info;
263     }
264     if (g_sdk_impl_ctx.mqtt_port_num != 0) {
265         ((iotx_mqtt_param_t *)_mqtt_conncection->open_params)->port =
266             g_sdk_impl_ctx.mqtt_port_num;
267     }
268 
269     do {
270         pclient = IOT_MQTT_Construct(
271             (iotx_mqtt_param_t *)_mqtt_conncection->open_params);
272         if (pclient != NULL) {
273             iotx_cm_event_msg_t event;
274             _mqtt_conncection->context = pclient;
275             event.type = IOTX_CM_EVENT_CLOUD_CONNECTED;
276             event.msg = NULL;
277 
278             if (_mqtt_conncection->event_handler) {
279                 _mqtt_conncection->event_handler(_mqtt_conncection->fd, &event,
280                                                  (void *)_mqtt_conncection);
281             }
282             return 0;
283         }
284         HAL_SleepMs(500);
285     } while (!utils_time_is_expired(&timer));
286 
287     event.type = IOTX_CM_EVENT_CLOUD_CONNECT_FAILED;
288     event.msg = NULL;
289 
290     if (_mqtt_conncection->event_handler) {
291         _mqtt_conncection->event_handler(_mqtt_conncection->fd, &event,
292                                          (void *)_mqtt_conncection);
293     }
294     cm_err("mqtt connect failed");
295     return -1;
296 }
297 
/**
 * Publish a payload on a topic through the connection's MQTT client.
 *
 * @param ext          Optional extra parameters; when non-NULL its ack_type
 *                     selects the QoS via _get_mqtt_qos(), otherwise QoS
 *                     value 0 is used.
 * @param topic        Topic to publish on; must not be NULL.
 * @param payload      Payload bytes, passed through to the MQTT layer.
 * @param payload_len  Number of payload bytes.
 * @return NULL_VALUE_ERROR when no connection is open or topic is NULL,
 *         otherwise whatever IOT_MQTT_Publish_Simple() returns.
 */
static int _mqtt_publish(iotx_cm_ext_params_t *ext, const char *topic,
                         const char *payload, unsigned int payload_len)
{
    int qos = 0;

    /* Same argument validation as _mqtt_sub(): a NULL topic is rejected here
     * rather than being handed to the MQTT layer. */
    if (_mqtt_conncection == NULL || topic == NULL) {
        return NULL_VALUE_ERROR;
    }

    if (ext != NULL) {
        qos = (int)_get_mqtt_qos(ext->ack_type);
    }

    /* The MQTT API takes a non-const payload pointer, hence the cast. */
    return IOT_MQTT_Publish_Simple(_mqtt_conncection->context, topic, qos,
                                   (void *)payload, payload_len);
}
313 
_mqtt_yield(uint32_t timeout)314 static int _mqtt_yield(uint32_t timeout)
315 {
316     if (_mqtt_conncection == NULL) {
317         return NULL_VALUE_ERROR;
318     }
319 
320     return IOT_MQTT_Yield(_mqtt_conncection->context, timeout);
321 }
322 
/**
 * Subscribe to a topic and register the data callback for it.
 *
 * The callback is handed to the MQTT layer as the subscription context;
 * iotx_cloud_conn_mqtt_event_handle() is installed as the actual MQTT topic
 * handler.
 *
 * @param ext                Optional extra parameters. When NULL the
 *                           subscription is asynchronous with
 *                           IOTX_MQTT_QOS0. Otherwise ack_type selects the
 *                           QoS, and any sync_mode other than IOTX_CM_ASYNC
 *                           selects the synchronous call with sync_timeout.
 * @param topic              Topic filter; must not be NULL.
 * @param topic_handle_func  Data callback; must not be NULL.
 * @param pcontext           Not used by this implementation.
 * @return NULL_VALUE_ERROR on a missing connection or NULL argument,
 *         otherwise the result of the underlying subscribe call.
 */
static int _mqtt_sub(iotx_cm_ext_params_t *ext, const char *topic,
                     iotx_cm_data_handle_cb topic_handle_func, void *pcontext)
{
    iotx_mqtt_qos_t level = IOTX_MQTT_QOS0;
    int sync_wait = 0;
    int blocking = 0;

    if (_mqtt_conncection == NULL) {
        return NULL_VALUE_ERROR;
    }
    if (topic == NULL) {
        return NULL_VALUE_ERROR;
    }
    if (topic_handle_func == NULL) {
        return NULL_VALUE_ERROR;
    }

    if (ext != NULL) {
        if (ext->sync_mode != IOTX_CM_ASYNC) {
            blocking = 1;
            sync_wait = ext->sync_timeout;
        }
        level = _get_mqtt_qos(ext->ack_type);
    }

    if (!blocking) {
        return IOT_MQTT_Subscribe(_mqtt_conncection->context, topic, level,
                                  iotx_cloud_conn_mqtt_event_handle,
                                  (void *)topic_handle_func);
    }

    return IOT_MQTT_Subscribe_Sync(_mqtt_conncection->context, topic, level,
                                   iotx_cloud_conn_mqtt_event_handle,
                                   (void *)topic_handle_func, sync_wait);
}
358 
_mqtt_unsub(const char * topic)359 static int _mqtt_unsub(const char *topic)
360 {
361     int ret;
362 
363     if (_mqtt_conncection == NULL) {
364         return NULL_VALUE_ERROR;
365     }
366 
367     ret = IOT_MQTT_Unsubscribe(_mqtt_conncection->context, topic);
368 
369     if (ret < 0) {
370         return -1;
371     }
372 
373     return ret;
374 }
375 
_mqtt_close()376 static int _mqtt_close()
377 {
378     if (_mqtt_conncection == NULL) {
379         return NULL_VALUE_ERROR;
380     }
381 
382     cm_free(_mqtt_conncection->open_params);
383     IOT_MQTT_Destroy(&_mqtt_conncection->context);
384     cm_free(_mqtt_conncection);
385     _mqtt_conncection = NULL;
386     return 0;
387 }
388 
_get_mqtt_qos(iotx_cm_ack_types_t ack_type)389 static iotx_mqtt_qos_t _get_mqtt_qos(iotx_cm_ack_types_t ack_type)
390 {
391     switch (ack_type) {
392     case IOTX_CM_MESSAGE_NO_ACK:
393         return IOTX_MQTT_QOS0;
394 
395     case IOTX_CM_MESSAGE_NEED_ACK:
396         return IOTX_MQTT_QOS1;
397 
398     case IOTX_CM_MESSAGE_SUB_LOCAL:
399         return IOTX_MQTT_QOS3_SUB_LOCAL;
400 
401     default:
402         return IOTX_MQTT_QOS0;
403     }
404 }
405 
_set_common_handlers()406 static void _set_common_handlers()
407 {
408     if (_mqtt_conncection != NULL) {
409         _mqtt_conncection->connect_func = _mqtt_connect;
410         _mqtt_conncection->sub_func = _mqtt_sub;
411         _mqtt_conncection->unsub_func = _mqtt_unsub;
412         _mqtt_conncection->pub_func = _mqtt_publish;
413         _mqtt_conncection->yield_func = (iotx_cm_yield_fp)_mqtt_yield;
414         _mqtt_conncection->close_func = _mqtt_close;
415     }
416 }
417 #endif
418