1 #include "iotx_cm_internal.h"
2
3 #if defined(MQTT_COMM_ENABLED) || defined(MAL_ENABLED)
4
5 static iotx_cm_connection_t *_mqtt_conncection = NULL;
6 static void iotx_cloud_conn_mqtt_event_handle(void *pcontext, void *pclient,
7 iotx_mqtt_event_msg_pt msg);
8 static int _mqtt_connect(uint32_t timeout);
9 static int _mqtt_publish(iotx_cm_ext_params_t *params, const char *topic,
10 const char *payload, unsigned int payload_len);
11 static int _mqtt_sub(iotx_cm_ext_params_t *params, const char *topic,
12 iotx_cm_data_handle_cb topic_handle_func, void *pcontext);
13 static iotx_mqtt_qos_t _get_mqtt_qos(iotx_cm_ack_types_t ack_type);
14 static int _mqtt_unsub(const char *topic);
15 static int _mqtt_close(void);
16 static void _set_common_handlers(void);
17
iotx_cm_open_mqtt(iotx_cm_init_param_t * params)18 iotx_cm_connection_t *iotx_cm_open_mqtt(iotx_cm_init_param_t *params)
19 {
20 iotx_mqtt_param_t *mqtt_param = NULL;
21
22 if (_mqtt_conncection != NULL) {
23 cm_warning("mqtt connection is opened already,return it");
24 return _mqtt_conncection;
25 }
26
27 _mqtt_conncection =
28 (iotx_cm_connection_t *)cm_malloc(sizeof(iotx_cm_connection_t));
29 if (_mqtt_conncection == NULL) {
30 cm_err("_mqtt_conncection malloc failed!");
31 goto failed;
32 }
33 memset(_mqtt_conncection, 0, sizeof(iotx_cm_connection_t));
34 mqtt_param = (iotx_mqtt_param_t *)cm_malloc(sizeof(iotx_mqtt_param_t));
35 if (mqtt_param == NULL) {
36 cm_err("mqtt_param malloc failed!");
37 goto failed;
38 }
39
40 memset(mqtt_param, 0, sizeof(iotx_mqtt_param_t));
41 mqtt_param->request_timeout_ms = params->request_timeout_ms;
42 mqtt_param->clean_session = 0;
43 mqtt_param->keepalive_interval_ms = params->keepalive_interval_ms;
44 mqtt_param->read_buf_size = params->read_buf_size;
45 mqtt_param->write_buf_size = params->write_buf_size;
46
47 mqtt_param->handle_event.h_fp = iotx_cloud_conn_mqtt_event_handle;
48 mqtt_param->handle_event.pcontext = NULL;
49
50 _mqtt_conncection->open_params = mqtt_param;
51 _mqtt_conncection->event_handler = params->handle_event;
52 _mqtt_conncection->cb_data = params->context;
53 _set_common_handlers();
54
55 return _mqtt_conncection;
56
57 failed:
58
59 if (_mqtt_conncection != NULL) {
60 cm_free(_mqtt_conncection);
61 _mqtt_conncection = NULL;
62 }
63
64 if (mqtt_param != NULL) {
65 cm_free(mqtt_param);
66 }
67
68 return NULL;
69 }
70
iotx_cloud_conn_mqtt_event_handle(void * pcontext,void * pclient,iotx_mqtt_event_msg_pt msg)71 static void iotx_cloud_conn_mqtt_event_handle(void *pcontext, void *pclient,
72 iotx_mqtt_event_msg_pt msg)
73 {
74 uintptr_t packet_id = (uintptr_t)msg->msg;
75 if (_mqtt_conncection == NULL) {
76 return;
77 }
78
79 switch (msg->event_type) {
80 case IOTX_MQTT_EVENT_DISCONNECT:
81 {
82 iotx_cm_event_msg_t event;
83 cm_info("disconnected,fd = %d", _mqtt_conncection->fd);
84 event.type = IOTX_CM_EVENT_CLOUD_DISCONNECT;
85 event.msg = NULL;
86 if (_mqtt_conncection->event_handler) {
87 _mqtt_conncection->event_handler(_mqtt_conncection->fd, &event,
88 _mqtt_conncection->cb_data);
89 }
90 }
91 break;
92
93 case IOTX_MQTT_EVENT_RECONNECT:
94 {
95 iotx_cm_event_msg_t event;
96 cm_info("connected,fd = %d", _mqtt_conncection->fd);
97 event.type = IOTX_CM_EVENT_CLOUD_CONNECTED;
98 event.msg = NULL;
99 /* cm_info(cm_log_info_MQTT_reconnect); */
100
101 if (_mqtt_conncection->event_handler) {
102 _mqtt_conncection->event_handler(_mqtt_conncection->fd, &event,
103 _mqtt_conncection->cb_data);
104 }
105 }
106 break;
107
108 case IOTX_MQTT_EVENT_SUBCRIBE_SUCCESS:
109 {
110 iotx_cm_event_msg_t event;
111 event.type = IOTX_CM_EVENT_SUBCRIBE_SUCCESS;
112 event.msg = (void *)packet_id;
113
114 if (_mqtt_conncection->event_handler) {
115 _mqtt_conncection->event_handler(_mqtt_conncection->fd, &event,
116 _mqtt_conncection->cb_data);
117 }
118 }
119 break;
120
121 case IOTX_MQTT_EVENT_SUBCRIBE_NACK:
122 case IOTX_MQTT_EVENT_SUBCRIBE_TIMEOUT:
123 {
124 iotx_cm_event_msg_t event;
125 event.type = IOTX_CM_EVENT_SUBCRIBE_FAILED;
126 event.msg = (void *)packet_id;
127
128 if (_mqtt_conncection->event_handler) {
129 _mqtt_conncection->event_handler(_mqtt_conncection->fd, &event,
130 _mqtt_conncection->cb_data);
131 }
132 }
133 break;
134
135 case IOTX_MQTT_EVENT_UNSUBCRIBE_SUCCESS:
136 {
137 iotx_cm_event_msg_t event;
138 event.type = IOTX_CM_EVENT_UNSUB_SUCCESS;
139 event.msg = (void *)packet_id;
140
141 if (_mqtt_conncection->event_handler) {
142 _mqtt_conncection->event_handler(_mqtt_conncection->fd, &event,
143 _mqtt_conncection->cb_data);
144 }
145 }
146 break;
147
148 case IOTX_MQTT_EVENT_UNSUBCRIBE_NACK:
149 case IOTX_MQTT_EVENT_UNSUBCRIBE_TIMEOUT:
150 {
151 iotx_cm_event_msg_t event;
152 event.type = IOTX_CM_EVENT_UNSUB_FAILED;
153 event.msg = (void *)packet_id;
154
155 if (_mqtt_conncection->event_handler) {
156 _mqtt_conncection->event_handler(_mqtt_conncection->fd, &event,
157 _mqtt_conncection->cb_data);
158 }
159 }
160 break;
161
162 case IOTX_MQTT_EVENT_PUBLISH_SUCCESS:
163 {
164 iotx_cm_event_msg_t event;
165 event.type = IOTX_CM_EVENT_PUBLISH_SUCCESS;
166 event.msg = (void *)packet_id;
167
168 if (_mqtt_conncection->event_handler) {
169 _mqtt_conncection->event_handler(_mqtt_conncection->fd, &event,
170 _mqtt_conncection->cb_data);
171 }
172 }
173 break;
174
175 case IOTX_MQTT_EVENT_PUBLISH_NACK:
176 case IOTX_MQTT_EVENT_PUBLISH_TIMEOUT:
177 {
178 iotx_cm_event_msg_t event;
179 event.type = IOTX_CM_EVENT_PUBLISH_FAILED;
180 event.msg = (void *)packet_id;
181
182 if (_mqtt_conncection->event_handler) {
183 _mqtt_conncection->event_handler(_mqtt_conncection->fd, &event,
184 _mqtt_conncection->cb_data);
185 }
186 }
187 break;
188
189 case IOTX_MQTT_EVENT_PUBLISH_RECEIVED:
190 {
191 iotx_mqtt_topic_info_pt topic_info =
192 (iotx_mqtt_topic_info_pt)msg->msg;
193 iotx_cm_data_handle_cb topic_handle_func =
194 (iotx_cm_data_handle_cb)pcontext;
195 #ifndef DEVICE_MODEL_ALINK2
196 char *topic = NULL;
197 #endif
198 if (topic_handle_func == NULL) {
199 cm_warning("bypass %d bytes on [%.*s]", topic_info->payload_len,
200 topic_info->topic_len, topic_info->ptopic);
201 return;
202 }
203 #ifdef DEVICE_MODEL_ALINK2
204 topic_handle_func(_mqtt_conncection->fd, topic_info->ptopic,
205 topic_info->topic_len, topic_info->payload,
206 topic_info->payload_len, NULL);
207 #else
208 topic = cm_malloc(topic_info->topic_len + 1);
209 if (topic == NULL) {
210 cm_err("topic malloc failed");
211 return;
212 }
213 memset(topic, 0, topic_info->topic_len + 1);
214 memcpy(topic, topic_info->ptopic, topic_info->topic_len);
215
216 topic_handle_func(_mqtt_conncection->fd, topic, topic_info->payload,
217 topic_info->payload_len, NULL);
218
219 cm_free(topic);
220 #endif
221 }
222 break;
223
224 case IOTX_MQTT_EVENT_BUFFER_OVERFLOW:
225 cm_warning("buffer overflow");
226 break;
227
228 default:
229 cm_warning("msg type unkown, type = %d", msg->event_type);
230 break;
231 }
232 }
233
234 extern sdk_impl_ctx_t g_sdk_impl_ctx;
_mqtt_connect(uint32_t timeout)235 static int _mqtt_connect(uint32_t timeout)
236 {
237 void *pclient;
238 iotx_time_t timer;
239 iotx_cm_event_msg_t event;
240
241 char product_key[IOTX_PRODUCT_KEY_LEN + 1] = { 0 };
242 char device_name[IOTX_DEVICE_NAME_LEN + 1] = { 0 };
243 char device_secret[IOTX_DEVICE_SECRET_LEN + 1] = { 0 };
244
245 if (_mqtt_conncection == NULL) {
246 return NULL_VALUE_ERROR;
247 }
248
249 HAL_GetProductKey(product_key);
250 HAL_GetDeviceName(device_name);
251 HAL_GetDeviceSecret(device_secret);
252
253 if (strlen(product_key) == 0 || strlen(device_name) == 0) {
254 return FAIL_RETURN;
255 }
256
257 iotx_time_init(&timer);
258 utils_time_countdown_ms(&timer, timeout);
259
260 if (g_sdk_impl_ctx.mqtt_customzie_info[0] != '\0') {
261 ((iotx_mqtt_param_t *)_mqtt_conncection->open_params)->customize_info =
262 g_sdk_impl_ctx.mqtt_customzie_info;
263 }
264 if (g_sdk_impl_ctx.mqtt_port_num != 0) {
265 ((iotx_mqtt_param_t *)_mqtt_conncection->open_params)->port =
266 g_sdk_impl_ctx.mqtt_port_num;
267 }
268
269 do {
270 pclient = IOT_MQTT_Construct(
271 (iotx_mqtt_param_t *)_mqtt_conncection->open_params);
272 if (pclient != NULL) {
273 iotx_cm_event_msg_t event;
274 _mqtt_conncection->context = pclient;
275 event.type = IOTX_CM_EVENT_CLOUD_CONNECTED;
276 event.msg = NULL;
277
278 if (_mqtt_conncection->event_handler) {
279 _mqtt_conncection->event_handler(_mqtt_conncection->fd, &event,
280 (void *)_mqtt_conncection);
281 }
282 return 0;
283 }
284 HAL_SleepMs(500);
285 } while (!utils_time_is_expired(&timer));
286
287 event.type = IOTX_CM_EVENT_CLOUD_CONNECT_FAILED;
288 event.msg = NULL;
289
290 if (_mqtt_conncection->event_handler) {
291 _mqtt_conncection->event_handler(_mqtt_conncection->fd, &event,
292 (void *)_mqtt_conncection);
293 }
294 cm_err("mqtt connect failed");
295 return -1;
296 }
297
_mqtt_publish(iotx_cm_ext_params_t * ext,const char * topic,const char * payload,unsigned int payload_len)298 static int _mqtt_publish(iotx_cm_ext_params_t *ext, const char *topic,
299 const char *payload, unsigned int payload_len)
300 {
301 int qos = 0;
302
303 if (_mqtt_conncection == NULL) {
304 return NULL_VALUE_ERROR;
305 }
306
307 if (ext != NULL) {
308 qos = (int)_get_mqtt_qos(ext->ack_type);
309 }
310 return IOT_MQTT_Publish_Simple(_mqtt_conncection->context, topic, qos,
311 (void *)payload, payload_len);
312 }
313
_mqtt_yield(uint32_t timeout)314 static int _mqtt_yield(uint32_t timeout)
315 {
316 if (_mqtt_conncection == NULL) {
317 return NULL_VALUE_ERROR;
318 }
319
320 return IOT_MQTT_Yield(_mqtt_conncection->context, timeout);
321 }
322
_mqtt_sub(iotx_cm_ext_params_t * ext,const char * topic,iotx_cm_data_handle_cb topic_handle_func,void * pcontext)323 static int _mqtt_sub(iotx_cm_ext_params_t *ext, const char *topic,
324 iotx_cm_data_handle_cb topic_handle_func, void *pcontext)
325 {
326 int sync = 0;
327 iotx_mqtt_qos_t qos = IOTX_MQTT_QOS0;
328 int timeout = 0;
329 int ret;
330
331 if (_mqtt_conncection == NULL || topic == NULL ||
332 topic_handle_func == NULL) {
333 return NULL_VALUE_ERROR;
334 }
335
336 if (ext != NULL) {
337 if (ext->sync_mode == IOTX_CM_ASYNC) {
338 sync = 0;
339 } else {
340 sync = 1;
341 timeout = ext->sync_timeout;
342 }
343 qos = (int)_get_mqtt_qos(ext->ack_type);
344 }
345
346 if (sync != 0) {
347 ret = IOT_MQTT_Subscribe_Sync(_mqtt_conncection->context, topic, qos,
348 iotx_cloud_conn_mqtt_event_handle,
349 (void *)topic_handle_func, timeout);
350 } else {
351 ret = IOT_MQTT_Subscribe(_mqtt_conncection->context, topic, qos,
352 iotx_cloud_conn_mqtt_event_handle,
353 (void *)topic_handle_func);
354 }
355
356 return ret;
357 }
358
_mqtt_unsub(const char * topic)359 static int _mqtt_unsub(const char *topic)
360 {
361 int ret;
362
363 if (_mqtt_conncection == NULL) {
364 return NULL_VALUE_ERROR;
365 }
366
367 ret = IOT_MQTT_Unsubscribe(_mqtt_conncection->context, topic);
368
369 if (ret < 0) {
370 return -1;
371 }
372
373 return ret;
374 }
375
_mqtt_close()376 static int _mqtt_close()
377 {
378 if (_mqtt_conncection == NULL) {
379 return NULL_VALUE_ERROR;
380 }
381
382 cm_free(_mqtt_conncection->open_params);
383 IOT_MQTT_Destroy(&_mqtt_conncection->context);
384 cm_free(_mqtt_conncection);
385 _mqtt_conncection = NULL;
386 return 0;
387 }
388
_get_mqtt_qos(iotx_cm_ack_types_t ack_type)389 static iotx_mqtt_qos_t _get_mqtt_qos(iotx_cm_ack_types_t ack_type)
390 {
391 switch (ack_type) {
392 case IOTX_CM_MESSAGE_NO_ACK:
393 return IOTX_MQTT_QOS0;
394
395 case IOTX_CM_MESSAGE_NEED_ACK:
396 return IOTX_MQTT_QOS1;
397
398 case IOTX_CM_MESSAGE_SUB_LOCAL:
399 return IOTX_MQTT_QOS3_SUB_LOCAL;
400
401 default:
402 return IOTX_MQTT_QOS0;
403 }
404 }
405
_set_common_handlers()406 static void _set_common_handlers()
407 {
408 if (_mqtt_conncection != NULL) {
409 _mqtt_conncection->connect_func = _mqtt_connect;
410 _mqtt_conncection->sub_func = _mqtt_sub;
411 _mqtt_conncection->unsub_func = _mqtt_unsub;
412 _mqtt_conncection->pub_func = _mqtt_publish;
413 _mqtt_conncection->yield_func = (iotx_cm_yield_fp)_mqtt_yield;
414 _mqtt_conncection->close_func = _mqtt_close;
415 }
416 }
417 #endif
418