1 /*
2 * Copyright (C) 2015-2019 Alibaba Group Holding Limited
3 */
4
5 #include "aiot_dm_api.h"
6 #include "aiot_mqtt_api.h"
7 #include "aiot_state_api.h"
8 #include "aiot_subdev_api.h"
9 #include "aiot_sysdep_api.h"
10 #include "amp_platform.h"
11 #include "amp_task.h"
12 #include "aos/kv.h"
13 #include "aos_system.h"
14 #include "py_defines.h"
15 // #include "be_inl.h"
16 #include "module_aiot.h"
17 #ifdef AOS_COMP_UAGENT
18 #include "uagent.h"
19 #endif
20 #define MOD_STR "AIOT_MQTT"
21
22 /* 位于portfiles/aiot_port文件夹下的系统适配函数集合 */
23 extern aiot_sysdep_portfile_t g_aiot_sysdep_portfile;
24
25 /* 位于external/ali_ca_cert.c中的服务器证书 */
26 extern const char *ali_ca_cert;
27
28 uint8_t pyamp_g_app_mqtt_process_thread_running = 0;
29 uint8_t pyamp_g_app_mqtt_recv_thread_running = 0;
30
/*
 * Copy the first `len` bytes of `src` into a newly allocated buffer and
 * append a terminating NUL.
 *
 * src: bytes to copy; does not have to be NUL-terminated.
 * len: number of bytes to copy from src.
 *
 * Returns the new buffer (to be released with aos_free()), or NULL when src
 * is NULL, len is negative, or the allocation fails.
 */
static char *__amp_strdup(char *src, int len)
{
    char *dst;

    /* A negative length would turn into a huge size_t inside memcpy(). */
    if (src == NULL || len < 0) {
        return NULL;
    }

    /* Compute the size in size_t so that len + 1 cannot overflow int. */
    dst = aos_malloc((size_t)len + 1);
    if (dst == NULL) {
        return NULL;
    }

    memcpy(dst, src, (size_t)len);
    dst[len] = '\0';
    return dst;
}
48
49 /* 执行aiot_mqtt_process的线程, 包含心跳发送和QoS1消息重发 */
pyamp_aiot_app_mqtt_process_thread(void * args)50 void pyamp_aiot_app_mqtt_process_thread(void *args)
51 {
52 int32_t res = STATE_SUCCESS;
53
54 while (pyamp_g_app_mqtt_process_thread_running) {
55 res = aiot_mqtt_process(args);
56 if (res == STATE_USER_INPUT_EXEC_DISABLED) {
57 break;
58 }
59 aos_msleep(1000);
60 }
61 aos_task_exit(0);
62 return;
63 }
64
65 /* 执行aiot_mqtt_recv的线程, 包含网络自动重连和从服务器收取MQTT消息 */
pyamp_aiot_app_mqtt_recv_thread(void * args)66 void pyamp_aiot_app_mqtt_recv_thread(void *args)
67 {
68 int32_t res = STATE_SUCCESS;
69
70 while (pyamp_g_app_mqtt_recv_thread_running) {
71 res = aiot_mqtt_recv(args);
72 if (res < STATE_SUCCESS) {
73 if (res == STATE_USER_INPUT_EXEC_DISABLED) {
74 break;
75 }
76 aos_msleep(1000);
77 }
78 }
79 aos_task_exit(0);
80 return;
81 }
82
83 /* MQTT默认消息处理回调, 当SDK从服务器收到MQTT消息时,
84 * 且无对应用户回调处理时被调用 */
pyamp_aiot_app_mqtt_recv_handler(void * handle,const aiot_mqtt_recv_t * packet,void * userdata)85 void pyamp_aiot_app_mqtt_recv_handler(void *handle,
86 const aiot_mqtt_recv_t *packet,
87 void *userdata)
88 {
89 void (*callback)(void *userdata) = (void (*)(void *))userdata;
90
91 switch (packet->type) {
92 case AIOT_MQTTRECV_HEARTBEAT_RESPONSE:
93 {
94 // amp_debug(MOD_STR, "heartbeat response");
95 /* TODO: 处理服务器对心跳的回应, 一般不处理 */
96 }
97 break;
98
99 case AIOT_MQTTRECV_SUB_ACK:
100 {
101 amp_debug(MOD_STR,
102 "suback, res: -0x%04X, packet id: %d, max qos: %d",
103 -packet->data.sub_ack.res, packet->data.sub_ack.packet_id,
104 packet->data.sub_ack.max_qos);
105 /* TODO: 处理服务器对订阅请求的回应, 一般不处理 */
106 }
107 break;
108
109 case AIOT_MQTTRECV_PUB:
110 {
111 amp_debug(MOD_STR, "pub, qos: %d, topic: %.*s",
112 packet->data.pub.qos, packet->data.pub.topic_len,
113 packet->data.pub.topic);
114 amp_debug(MOD_STR, "pub, payload: %.*s",
115 packet->data.pub.payload_len, packet->data.pub.payload);
116 /* TODO: 处理服务器下发的业务报文 */
117 iot_mqtt_userdata_t *udata = (iot_mqtt_userdata_t *)userdata;
118 iot_mqtt_message_t message;
119 memset(&message, 0, sizeof(iot_mqtt_message_t));
120 if (udata && udata->callback) {
121 message.option = AIOT_MQTTOPT_RECV_HANDLER;
122 message.recv.type = packet->type;
123 message.recv.code = AIOT_MQTT_MESSAGE;
124 message.recv.topic = __amp_strdup(packet->data.pub.topic,
125 packet->data.pub.topic_len);
126 message.recv.payload = __amp_strdup(
127 packet->data.pub.payload, packet->data.pub.payload_len);
128 message.recv.topic_len = packet->data.pub.topic_len;
129 message.recv.payload_len = packet->data.pub.payload_len;
130 udata->callback(&message, udata);
131 aos_free(message.recv.topic);
132 aos_free(message.recv.payload);
133 }
134 }
135 break;
136
137 case AIOT_MQTTRECV_PUB_ACK:
138 {
139 amp_debug(MOD_STR, "puback, packet id: %d",
140 packet->data.pub_ack.packet_id);
141 /* TODO: 处理服务器对QoS1上报消息的回应, 一般不处理 */
142 }
143 break;
144
145 default:
146 {
147 }
148 }
149 }
150
151 /* MQTT事件回调函数, 当网络连接/重连/断开时被触发,
152 * 事件定义见core/aiot_mqtt_api.h */
pyamp_aiot_app_mqtt_event_handler(void * handle,const aiot_mqtt_event_t * event,void * userdata)153 void pyamp_aiot_app_mqtt_event_handler(void *handle,
154 const aiot_mqtt_event_t *event,
155 void *userdata)
156 {
157 iot_mqtt_userdata_t *udata = (iot_mqtt_userdata_t *)userdata;
158 iot_mqtt_message_t message;
159
160 memset(&message, 0, sizeof(iot_mqtt_message_t));
161 message.option = AIOT_MQTTOPT_EVENT_HANDLER;
162 message.event.type = event->type;
163
164 switch (event->type) {
165 /* SDK因为用户调用了aiot_mqtt_connect()接口, 与mqtt服务器建立连接已成功
166 */
167 case AIOT_MQTTEVT_CONNECT:
168 {
169 amp_debug(MOD_STR, "AIOT_MQTTEVT_CONNECT");
170 /* TODO: 处理SDK建连成功, 不可以在这里调用耗时较长的阻塞函数 */
171 message.event.code = AIOT_MQTT_CONNECT;
172 }
173 break;
174
175 /* SDK因为网络状况被动断连后, 自动发起重连已成功 */
176 case AIOT_MQTTEVT_RECONNECT:
177 {
178 amp_debug(MOD_STR, "AIOT_MQTTEVT_RECONNECT");
179 /* TODO: 处理SDK重连成功, 不可以在这里调用耗时较长的阻塞函数 */
180 message.event.code = AIOT_MQTT_RECONNECT;
181 }
182 break;
183
184 /* SDK因为网络的状况而被动断开了连接, network是底层读写失败,
185 * heartbeat是没有按预期得到服务端心跳应答 */
186 case AIOT_MQTTEVT_DISCONNECT:
187 {
188 char *cause = (event->data.disconnect ==
189 AIOT_MQTTDISCONNEVT_NETWORK_DISCONNECT)
190 ? ("network disconnect")
191 : ("heartbeat disconnect");
192 amp_debug(MOD_STR, "AIOT_MQTTEVT_DISCONNECT: %s", cause);
193 /* TODO: 处理SDK被动断连, 不可以在这里调用耗时较长的阻塞函数 */
194 message.event.code = AIOT_MQTT_DISCONNECT;
195 }
196 break;
197
198 default:
199 {
200 return;
201 }
202 }
203
204 if (udata && udata->callback)
205 udata->callback(&message, udata);
206 }
207
208 /* rawdata 函数演示 */
aiot_app_send_rawdata_post(void * dm_handle,char * data,int32_t data_len)209 int32_t aiot_app_send_rawdata_post(void *dm_handle, char *data, int32_t data_len)
210 {
211 aiot_dm_msg_t msg;
212
213 memset(&msg, 0, sizeof(aiot_dm_msg_t));
214 msg.type = AIOT_DMMSG_RAW_DATA;
215 msg.data.raw_data.data = data;
216 msg.data.raw_data.data_len = data_len;
217 return aiot_dm_send(dm_handle, &msg);
218 }
219
220
221 /* 属性上报函数演示 */
pyamp_aiot_app_send_property_post(void * dm_handle,char * params)222 int32_t pyamp_aiot_app_send_property_post(void *dm_handle, char *params)
223 {
224 aiot_dm_msg_t msg;
225
226 memset(&msg, 0, sizeof(aiot_dm_msg_t));
227 msg.type = AIOT_DMMSG_PROPERTY_POST;
228 msg.data.property_post.params = params;
229
230 return aiot_dm_send(dm_handle, &msg);
231 }
232
233 /* 事件上报函数演示 */
pyamp_aiot_app_send_event_post(void * dm_handle,char * event_id,char * params)234 int32_t pyamp_aiot_app_send_event_post(void *dm_handle, char *event_id,
235 char *params)
236 {
237 aiot_dm_msg_t msg;
238
239 memset(&msg, 0, sizeof(aiot_dm_msg_t));
240 msg.type = AIOT_DMMSG_EVENT_POST;
241 msg.data.event_post.event_id = event_id;
242 msg.data.event_post.params = params;
243
244 return aiot_dm_send(dm_handle, &msg);
245 }
246
/*
 * Create an MQTT client, connect it to the Alibaba Cloud IoT platform
 * (Shanghai site) and start the two worker tasks that service it.
 *
 * handle:       out-parameter; receives the MQTT handle on success.
 *               Must not be NULL.
 * keepaliveSec: MQTT heartbeat interval in seconds.
 * userdata:     registered through AIOT_MQTTOPT_USERDATA, so it is what the
 *               receive and event handlers get as their userdata argument.
 *
 * The device triple (product key, device name, device secret) is read from
 * KV storage.
 *
 * Returns STATE_SUCCESS on success and -1 on failure.
 *
 * NOTE(review): the connect and task-creation failure paths call
 * aos_task_exit(0) before `return -1`. That looks like it terminates the
 * calling task instead of returning - confirm that callers run this function
 * in a dedicated task.
 */
int32_t pyamp_aiot_mqtt_client_start(void **handle, int keepaliveSec,
                                     iot_mqtt_userdata_t *userdata)
{
    int32_t res = STATE_SUCCESS;
    void *mqtt_handle = NULL;
    /* Domain suffix of the Alibaba Cloud platform, Shanghai site. */
    const char *url = "iot-as-mqtt.cn-shanghai.aliyuncs.com";
    /* Full site address, built as
     * ${productKey}.iot-as-mqtt.cn-shanghai.aliyuncs.com */
    char host[100] = { 0 };
    /* The destination port is 443 whether or not TLS is used. */
    uint16_t port = 443;
    /*
     * Per the SDK's option documentation, AIOT_MQTTOPT_KEEPALIVE_SEC takes a
     * uint16_t *. Handing it the address of an int only reads the intended
     * bytes on little-endian targets, so pass a correctly typed copy.
     */
    uint16_t keepalive = (uint16_t)keepaliveSec;
    /* Security credential; holds the CA certificate etc. when TLS is used. */
    aiot_sysdep_network_cred_t cred;

    /* Device triple and the buffer sizes handed to aos_kv_get(). */
    char product_key[IOTX_PRODUCT_KEY_LEN] = { 0 };
    char device_name[IOTX_DEVICE_NAME_LEN] = { 0 };
    char device_secret[IOTX_DEVICE_SECRET_LEN] = { 0 };
    int productkey_len = IOTX_PRODUCT_KEY_LEN;
    int devicename_len = IOTX_DEVICE_NAME_LEN;
    int devicesecret_len = IOTX_DEVICE_SECRET_LEN;

    if (handle == NULL) {
        amp_debug(MOD_STR, "mqtt client start: handle is NULL");
        return -1;
    }

    /* Read the device triple; a failed read is logged and the (empty)
     * value is used as before. */
    if (aos_kv_get(AMP_CUSTOMER_PRODUCTKEY, product_key,
                   &productkey_len) != 0) {
        amp_debug(MOD_STR, "read product key from kv failed");
    }
    if (aos_kv_get(AMP_CUSTOMER_DEVICENAME, device_name,
                   &devicename_len) != 0) {
        amp_debug(MOD_STR, "read device name from kv failed");
    }
    if (aos_kv_get(AMP_CUSTOMER_DEVICESECRET, device_secret,
                   &devicesecret_len) != 0) {
        amp_debug(MOD_STR, "read device secret from kv failed");
    }

    /* Configure the SDK's low-level dependencies. */
    aiot_sysdep_set_portfile(&g_aiot_sysdep_portfile);

    /* Build the security credential used for a TLS connection. */
    memset(&cred, 0, sizeof(aiot_sysdep_network_cred_t));
    /* Verify the MQTT server with an RSA certificate. */
    cred.option = AIOT_SYSDEP_NETWORK_CRED_SVRCERT_CA;
    /* Maximum fragment length 16K; other options are 4K, 2K, 1K, 0.5K. */
    cred.max_tls_fragment = 16384;
    /* Enable Server Name Indicator during the TLS handshake. */
    cred.sni_enabled = 1;
    /* RSA root certificate used to verify the MQTT server, and its length. */
    cred.x509_server_cert = ali_ca_cert;
    cred.x509_server_cert_len = strlen(ali_ca_cert);

    /* Create one MQTT client instance with default parameters. */
    mqtt_handle = aiot_mqtt_init();
    if (mqtt_handle == NULL) {
        amp_debug(MOD_STR, "aiot_mqtt_init failed");
        return -1;
    }

    /*
     * While this block is present the client connects over TCP instead of
     * TLS: it discards the TLS credential prepared above. Remove it to
     * connect with TLS.
     */
    {
        memset(&cred, 0, sizeof(aiot_sysdep_network_cred_t));
        cred.option = AIOT_SYSDEP_NETWORK_CRED_NONE;
    }

    snprintf(host, sizeof(host), "%s.%s", product_key, url);
    /* MQTT server address */
    aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_HOST, (void *)host);
    /* MQTT server port */
    aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_PORT, (void *)&port);
    /* Device productKey */
    aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_PRODUCT_KEY,
                     (void *)product_key);
    /* Device deviceName */
    aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_DEVICE_NAME,
                     (void *)device_name);
    /* Device deviceSecret */
    aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_DEVICE_SECRET,
                     (void *)device_secret);
    /* Network security credential prepared above */
    aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_NETWORK_CRED, (void *)&cred);
    /* MQTT heartbeat interval */
    aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_KEEPALIVE_SEC,
                     (void *)&keepalive);
    /* Argument passed to the callbacks */
    aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_USERDATA, userdata);
    /* Default message receive callback */
    aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_RECV_HANDLER,
                     (void *)pyamp_aiot_app_mqtt_recv_handler);
    /* Event callback */
    aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_EVENT_HANDLER,
                     (void *)pyamp_aiot_app_mqtt_event_handler);

    /* Establish the MQTT connection with the server. */
    res = aiot_mqtt_connect(mqtt_handle);
    if (res < STATE_SUCCESS) {
        /* Connection attempt failed: destroy the instance, free resources. */
        aiot_mqtt_deinit(&mqtt_handle);
        amp_debug(MOD_STR, "aiot_mqtt_connect failed: -0x%04X", -res);
        aos_task_exit(0);
        return -1;
    }

    /* Dedicated task for aiot_mqtt_process: sends heartbeats and resends
     * unacknowledged QoS1 messages. */
    pyamp_g_app_mqtt_process_thread_running = 1;

    aos_task_t mqtt_process_task;

    if (aos_task_new_ext(&mqtt_process_task, "mqtt_process",
                         pyamp_aiot_app_mqtt_process_thread, mqtt_handle,
                         1024 * 4, AOS_DEFAULT_APP_PRI) != 0) {
        amp_debug(MOD_STR, "management mqtt process task create failed!");
        /* No task was started, so do not leave the run flag set. */
        pyamp_g_app_mqtt_process_thread_running = 0;
        aiot_mqtt_deinit(&mqtt_handle);
        aos_task_exit(0);
        return -1;
    }
    amp_debug(MOD_STR, "app mqtt process start");

    /* Dedicated task for aiot_mqtt_recv: receives server messages in a loop
     * and reconnects automatically after a disconnect. */
    pyamp_g_app_mqtt_recv_thread_running = 1;

    aos_task_t mqtt_rec_task;

    if (aos_task_new_ext(&mqtt_rec_task, "mqtt_rec",
                         pyamp_aiot_app_mqtt_recv_thread, mqtt_handle, 1024 * 4,
                         AOS_DEFAULT_APP_PRI) != 0) {
        amp_debug(MOD_STR, "management mqtt rec task create failed!");
        /*
         * The process task is already running with this handle: tell it to
         * stop before the handle is destroyed so it does not keep using it.
         */
        pyamp_g_app_mqtt_process_thread_running = 0;
        pyamp_g_app_mqtt_recv_thread_running = 0;
        aiot_mqtt_deinit(&mqtt_handle);
        aos_task_exit(0);
        return -1;
    }
    amp_debug(MOD_STR, "app mqtt rec start");

    *handle = mqtt_handle;
#ifdef AOS_COMP_UAGENT
    res = uagent_mqtt_client_set(mqtt_handle);
    if (res != 0) {
        amp_debug(MOD_STR, "uAgent mqtt handle set failed ret = %d\n", res);
    }
    res = uagent_ext_comm_start(product_key, device_name);
    if (res != 0) {
        amp_debug(MOD_STR, "uAgent ext comm start failed ret = %d\n", res);
    }
#endif

    return STATE_SUCCESS;
}
391
392 /* mqtt stop */
pyamp_aiot_mqtt_client_stop(void ** handle)393 int32_t pyamp_aiot_mqtt_client_stop(void **handle)
394 {
395 int32_t res = STATE_SUCCESS;
396 void *mqtt_handle = NULL;
397
398 mqtt_handle = *handle;
399
400 pyamp_g_app_mqtt_process_thread_running = 0;
401 pyamp_g_app_mqtt_recv_thread_running = 0;
402
403 /* 断开MQTT连接 */
404 res = aiot_mqtt_disconnect(mqtt_handle);
405 if (res < STATE_SUCCESS) {
406 aiot_mqtt_deinit(&mqtt_handle);
407 amp_debug(MOD_STR, "aiot_mqtt_disconnect failed: -0x%04X", -res);
408 return -1;
409 }
410
411 /* 销毁MQTT实例 */
412 res = aiot_mqtt_deinit(&mqtt_handle);
413 if (res < STATE_SUCCESS) {
414 amp_debug(MOD_STR, "aiot_mqtt_deinit failed: -0x%04X", -res);
415 return -1;
416 }
417
418 return res;
419 }
420